Saltar al contenido

¿Cuál es la mejor manera de eliminar los acentos con los marcos de datos de Apache Spark en PySpark?

Esta noticia fue probado por especialistas para garantizar la veracidad de este artículo.

Solución:

Una posible mejora es crear un Transformer, que manejará la normalización Unicode y el contenedor Python correspondiente. Debería reducir la sobrecarga general de pasar datos entre JVM y Python y no requiere ninguna modificación en Spark ni acceso a la API privada.

En el lado de JVM, necesitará un transformador similar a este:

package net.zero323.spark.ml.feature

import java.text.Normalizer
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.types.DataType, StringType

class UnicodeNormalizer (override val uid: String)
  extends UnaryTransformer[String, String, UnicodeNormalizer] 

  def this() = this(Identifiable.randomUID("unicode_normalizer"))

  private val forms = Map(
    "NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD,
    "NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD
  )

  val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)",
    ParamValidators.inArray(forms.keys.toArray))

  def setN(value: String): this.type = set(form, value)

  def getForm: String = $(form)

  setDefault(form -> "NFKD")

  override protected def createTransformFunc: String => String = 
    val normalizerForm = forms($(form))
    (s: String) => Normalizer.normalize(s, normalizerForm)
  

  override protected def validateInputType(inputType: DataType): Unit = 
    require(inputType == StringType, s"Input type must be string type but got $inputType.")
  

  override protected def outputDataType: DataType = StringType

Definición de compilación correspondiente (ajuste las versiones de Spark y Scala para que coincidan con su implementación de Spark):

name := "unicode-normalization"

version := "1.0"

crossScalaVersions := Seq("2.11.12", "2.12.8")

organization := "net.zero323"

val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

En el lado de Python, necesitará un contenedor similar a este.

from pyspark.ml.param.shared import *
# from pyspark.ml.util import keyword_only  # in Spark < 2.0
from pyspark import keyword_only 
from pyspark.ml.wrapper import JavaTransformer

class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, form="NFKD", inputCol=None, outputCol=None):
        super(UnicodeNormalizer, self).__init__()
        self._java_obj = self._new_java_obj(
            "net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid)
        self.form = Param(self, "form",
            "unicode form (one of NFC, NFD, NFKC, NFKD)")
        # kwargs = self.__init__._input_kwargs  # in Spark < 2.0
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, form="NFKD", inputCol=None, outputCol=None):
        # kwargs = self.setParams._input_kwargs  # in Spark < 2.0
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setForm(self, value):
        return self._set(form=value)

    def getForm(self):
        return self.getOrDefault(self.form)

Construya el paquete Scala:

sbt +package

inclúyalo cuando inicie shell o envíe. Por ejemplo, para la compilación de Spark con Scala 2.11:

bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar 
 --driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar

y debería estar listo para comenzar. Todo lo que queda es un poco de magia de expresiones regulares:

from pyspark.sql.functions import regexp_replace

normalizer = UnicodeNormalizer(form="NFKD",
    inputCol="text", outputCol="text_normalized")

df = sc.parallelize([
    (1, "Maracaibó"), (2, "New York"),
    (3, "   São Paulo   "), (4, "~Madrid")
]).toDF(["id", "text"])

(normalizer
    .transform(df)
    .select(regexp_replace("text_normalized", "pM", ""))
    .show())

## +--------------------------------------+
## |regexp_replace(text_normalized,pM,)|
## +--------------------------------------+
## |                             Maracaibo|
## |                              New York|
## |                          Sao Paulo   |
## |                               ~Madrid|
## +--------------------------------------+

Tenga en cuenta que esto sigue las mismas convenciones que los transformadores de texto integrados y no es null a salvo. Puede corregirlo fácilmente comprobando null en createTransformFunc.

Otra forma de hacerlo usando Python Unicode Database:

import unicodedata
import sys

from pyspark.sql.functions import translate, regexp_replace

def make_trans():
    matching_string = ""
    replace_string = ""

    for i in range(ord(" "), sys.maxunicode):
        name = unicodedata.name(chr(i), "")
        if "WITH" in name:
            try:
                base = unicodedata.lookup(name.split(" WITH")[0])
                matching_string += chr(i)
                replace_string += base
            except KeyError:
                pass

    return matching_string, replace_string

def clean_text(c):
    matching_string, replace_string = make_trans()
    return translate(
        regexp_replace(c, "pM", ""), 
        matching_string, replace_string
    ).alias(c)

Así que ahora probémoslo:

df = sc.parallelize([
(1, "Maracaibó"), (2, "New York"),
(3, "   São Paulo   "), (4, "~Madrid"),
(5, "São Paulo"), (6, "Maracaibó")
]).toDF(["id", "text"])

df.select(clean_text("text")).show()
## +---------------+
## |           text|
## +---------------+
## |      Maracaibo|
## |       New York|
## |   Sao Paulo   |
## |        ~Madrid|
## |      Sao Paulo|
## |      Maracaibo|
## +---------------+

reconocer @ zero323

Esta solución es solo para Python, pero solo es útil si el número de acentos posibles es bajo (por ejemplo, un solo idioma como el español) y los reemplazos de caracteres se especifican manualmente.

Parece que no hay una forma incorporada de hacer lo que solicitó directamente sin UDF, sin embargo, puede encadenar muchos regexp_replace llamadas para reemplazar cada posible carácter acentuado. Probé el rendimiento de esta solución y resultó que solo se ejecuta más rápido si tiene un conjunto muy limitado de acentos para reemplazar. Si ese es el caso, puede ser más rápido que las UDF porque está optimizado fuera de Python.

from pyspark.sql.functions import col, regexp_replace

accent_replacements_spanish = [
    (u'á', 'a'), (u'Á', 'A'),
    (u'é', 'e'), (u'É', 'E'),
    (u'í', 'i'), (u'Í', 'I'),
    (u'ò', 'o'), (u'Ó', 'O'),
    (u'ú|ü', 'u'), (u'Ú|Ű', 'U'),
    (u'ñ', 'n'),
    # see http://stackoverflow.com/a/18123985/3810493 for other characters

    # this will convert other non ASCII characters to a question mark:
    ('[^x00-x7F]', '?') 
]

def remove_accents(column):
    r = col(column)
    for a, b in accent_replacements_spanish:
        r = regexp_replace(r, a, b)
    return r.alias('remove_accents(' + column + ')')

df = sqlContext.createDataFrame([['Olà'], ['Olé'], ['Núñez']], ['str'])
df.select(remove_accents('str')).show()

No he comparado el rendimiento con las otras respuestas y esta función no es tan general, pero al menos vale la pena considerarla porque no necesita agregar Scala o Java a su proceso de compilación.

Al final de todo puedes encontrar las notas de otros sys admins, tú además puedes insertar el tuyo si lo crees conveniente.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *