Подтвердить что ты не робот

Каков наилучший способ удаления акцентов с фреймами данных Apache Spark в PySpark?

Мне нужно удалить акценты с символов на испанском и других языках из разных наборов данных.

Я уже сделал функцию, основанную на коде, представленном в этом посте, который удаляет специальные акценты. Проблема в том, что функция медленная, потому что она использует UDF. Мне просто интересно, смогу ли я улучшить производительность своей функции, чтобы получить результаты за меньшее время, потому что это хорошо для небольших кадров данных, но не для больших.

Заранее спасибо.

Вот код, вы сможете запустить его так, как он представлен:

# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf, col
import unicodedata

# Building a simple dataframe:
schema = StructType([StructField("city", StringType(), True),
                     StructField("country", StringType(), True),
                     StructField("population", IntegerType(), True)])

countries = ['Venezuela', '[email protected]', 'Brazil', 'Spain']
cities = ['Maracaibó', 'New York', '   São Paulo   ', '~Madrid']
population = [37800000,19795791,12341418,6489162]

# Dataframe:
df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)

df.show()

class Test():
    def __init__(self, df):
        self.df = df

    def clearAccents(self, columns):
        """This function deletes accents in strings column dataFrames, 
        it does not eliminate main characters, but only deletes special tildes.

        :param columns  String or a list of column names.
        """
        # Filters all string columns in dataFrame
        validCols = [c for (c, t) in filter(lambda t: t[1] == 'string', self.df.dtypes)]

        # If None or [] is provided with column parameter:
        if (columns == "*"): columns = validCols[:]

        # Receives  a string as an argument
        def remove_accents(inputStr):
            # first, normalize strings:
            nfkdStr = unicodedata.normalize('NFKD', inputStr)
            # Keep chars that has no other char combined (i.e. accents chars)
            withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)])
            return withOutAccents

        function = udf(lambda x: remove_accents(x) if x != None else x, StringType())
        exprs = [function(col(c)).alias(c) if (c in columns) and (c in validCols) else c for c in self.df.columns]
        self.df = self.df.select(*exprs)

foo = Test(df)
foo.clearAccents(columns="*")
foo.df.show()
4b9b3361

Ответ 1

Одним из возможных улучшений является создание собственного Transformer, который будет обрабатывать нормализацию Unicode и соответствующую оболочку Python. Это должно снизить общую нагрузку при передаче данных между JVM и Python и не требует каких-либо изменений в самой Spark или доступа к частному API.

На стороне JVM вам понадобится трансформатор, похожий на этот:

package net.zero323.spark.ml.feature

import java.text.Normalizer
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.types.{DataType, StringType}

class UnicodeNormalizer (override val uid: String)
  extends UnaryTransformer[String, String, UnicodeNormalizer] {

  def this() = this(Identifiable.randomUID("unicode_normalizer"))

  private val forms = Map(
    "NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD,
    "NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD
  )

  val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)",
    ParamValidators.inArray(forms.keys.toArray))

  def setN(value: String): this.type = set(form, value)

  def getForm: String = $(form)

  setDefault(form -> "NFKD")

  override protected def createTransformFunc: String => String = {
    val normalizerForm = forms($(form))
    (s: String) => Normalizer.normalize(s, normalizerForm)
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType, s"Input type must be string type but got $inputType.")
  }

  override protected def outputDataType: DataType = StringType
}

Соответствующее определение сборки (настройте версии Spark и Scala в соответствии с вашим развертыванием Spark):

name := "unicode-normalization"

version := "1.0"

crossScalaVersions := Seq("2.11.12", "2.12.8")

organization := "net.zero323"

val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

На стороне Python вам понадобится оболочка, похожая на эту.

from pyspark.ml.param.shared import *
# from pyspark.ml.util import keyword_only  # in Spark < 2.0
from pyspark import keyword_only 
from pyspark.ml.wrapper import JavaTransformer

class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, form="NFKD", inputCol=None, outputCol=None):
        super(UnicodeNormalizer, self).__init__()
        self._java_obj = self._new_java_obj(
            "net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid)
        self.form = Param(self, "form",
            "unicode form (one of NFC, NFD, NFKC, NFKD)")
        # kwargs = self.__init__._input_kwargs  # in Spark < 2.0
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, form="NFKD", inputCol=None, outputCol=None):
        # kwargs = self.setParams._input_kwargs  # in Spark < 2.0
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setForm(self, value):
        return self._set(form=value)

    def getForm(self):
        return self.getOrDefault(self.form)

Сборка пакета Scala:

sbt +package

включите его при запуске оболочки или отправке. Например, для сборки Spark с Scala 2.11:

bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar \
 --driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar

и ты должен быть готов к работе. Все, что осталось, это немного магии регулярных выражений:

from pyspark.sql.functions import regexp_replace

normalizer = UnicodeNormalizer(form="NFKD",
    inputCol="text", outputCol="text_normalized")

df = sc.parallelize([
    (1, "Maracaibó"), (2, "New York"),
    (3, "   São Paulo   "), (4, "~Madrid")
]).toDF(["id", "text"])

(normalizer
    .transform(df)
    .select(regexp_replace("text_normalized", "\p{M}", ""))
    .show())

## +--------------------------------------+
## |regexp_replace(text_normalized,\p{M},)|
## +--------------------------------------+
## |                             Maracaibo|
## |                              New York|
## |                          Sao Paulo   |
## |                               ~Madrid|
## +--------------------------------------+

Обратите внимание, что это соответствует тем же соглашениям, что и встроенные преобразователи текста, и не является нулевым. Вы можете легко исправить это путем проверки на null в createTransformFunc.

Ответ 2

Другой способ использования python Unicode Database:

import unicodedata
import sys

from pyspark.sql.functions import translate, regexp_replace

def make_trans():
    matching_string = ""
    replace_string = ""

    for i in range(ord(" "), sys.maxunicode):
        name = unicodedata.name(chr(i), "")
        if "WITH" in name:
            try:
                base = unicodedata.lookup(name.split(" WITH")[0])
                matching_string += chr(i)
                replace_string += base
            except KeyError:
                pass

    return matching_string, replace_string

def clean_text(c):
    matching_string, replace_string = make_trans()
    return translate(
        regexp_replace(c, "\p{M}", ""), 
        matching_string, replace_string
    ).alias(c)

Теперь давайте протестируем его:

df = sc.parallelize([
(1, "Maracaibó"), (2, "New York"),
(3, "   São Paulo   "), (4, "~Madrid"),
(5, "São Paulo"), (6, "Maracaibó")
]).toDF(["id", "text"])

df.select(clean_text("text")).show()
## +---------------+
## |           text|
## +---------------+
## |      Maracaibo|
## |       New York|
## |   Sao Paulo   |
## |        ~Madrid|
## |      Sao Paulo|
## |      Maracaibo|
## +---------------+

подтвердить @zero323

Ответ 3

Это решение только для Python, но полезно только в том случае, если количество возможных акцентов низкое (например, один отдельный язык, например, испанский), а замены символов указаны вручную.

Кажется, нет встроенного способа делать то, что вы просили напрямую без UDF, однако вы можете связать много вызовов regexp_replace, чтобы заменить каждый возможный акцентный символ. Я тестировал производительность этого решения, и выясняется, что он работает только быстрее, если у вас есть очень ограниченный набор акцентов для замены. Если это так, это может быть быстрее, чем UDF, потому что он оптимизирован за пределами Python.

  из pyspark.sql.functions import col, regexp_replace

accent_replacements_spanish = [   (u'a ',' a '), (u'Á', 'A'),   (u'e ',' e '), (u'É', 'E'),   (u'' ',' я '), (u'Í', 'I'),   (u'ò ',' o '), (u'Ó', 'O'),   (u'ú | ü ',' u '), (u'Ú | Ű', 'U'),   (u''n ',' n ')   # см. http://stackoverflow.com/a/18123985/3810493 для других персонажей
   # это преобразует другие символы не ASCII в знак вопроса:   ('[^\x00-\x7F]', '?')
]

def remove_accents (столбец):   r = col (столбец)   для a, b в accent_replacements_spanish:       r = regexp_replace (r, a, b)   return r.alias('remove_accents (' + column + ')')

df = sqlContext.createDataFrame([['Olà'], ['Olé']], ['str'])
df.select(remove_accents ( 'ул')). Шоу()
Код>

Я не сравнивал производительность с другими ответами, и эта функция не является общей, но, по крайней мере, стоит ее учесть, потому что вам не нужно добавлять Scala или Java в ваш процесс сборки.