Esta noticia fue probado por especialistas para garantizar la veracidad de este artículo.
Solución:
Una posible mejora es crear un Transformer
, que manejará la normalización Unicode y el contenedor Python correspondiente. Debería reducir la sobrecarga general de pasar datos entre JVM y Python y no requiere ninguna modificación en Spark ni acceso a la API privada.
En el lado de JVM, necesitará un transformador similar a este:
package net.zero323.spark.ml.feature
import java.text.Normalizer
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.types.DataType, StringType
class UnicodeNormalizer (override val uid: String)
extends UnaryTransformer[String, String, UnicodeNormalizer]
def this() = this(Identifiable.randomUID("unicode_normalizer"))
private val forms = Map(
"NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD,
"NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD
)
val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)",
ParamValidators.inArray(forms.keys.toArray))
def setN(value: String): this.type = set(form, value)
def getForm: String = $(form)
setDefault(form -> "NFKD")
override protected def createTransformFunc: String => String =
val normalizerForm = forms($(form))
(s: String) => Normalizer.normalize(s, normalizerForm)
override protected def validateInputType(inputType: DataType): Unit =
require(inputType == StringType, s"Input type must be string type but got $inputType.")
override protected def outputDataType: DataType = StringType
Definición de compilación correspondiente (ajuste las versiones de Spark y Scala para que coincidan con su implementación de Spark):
name := "unicode-normalization"
version := "1.0"
crossScalaVersions := Seq("2.11.12", "2.12.8")
organization := "net.zero323"
val sparkVersion = "2.4.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
En el lado de Python, necesitará un contenedor similar a este.
from pyspark.ml.param.shared import *
# from pyspark.ml.util import keyword_only # in Spark < 2.0
from pyspark import keyword_only
from pyspark.ml.wrapper import JavaTransformer
class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, form="NFKD", inputCol=None, outputCol=None):
super(UnicodeNormalizer, self).__init__()
self._java_obj = self._new_java_obj(
"net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid)
self.form = Param(self, "form",
"unicode form (one of NFC, NFD, NFKC, NFKD)")
# kwargs = self.__init__._input_kwargs # in Spark < 2.0
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, form="NFKD", inputCol=None, outputCol=None):
# kwargs = self.setParams._input_kwargs # in Spark < 2.0
kwargs = self._input_kwargs
return self._set(**kwargs)
def setForm(self, value):
return self._set(form=value)
def getForm(self):
return self.getOrDefault(self.form)
Construya el paquete Scala:
sbt +package
inclúyalo cuando inicie shell o envíe. Por ejemplo, para la compilación de Spark con Scala 2.11:
bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar
--driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar
y debería estar listo para comenzar. Todo lo que queda es un poco de magia de expresiones regulares:
from pyspark.sql.functions import regexp_replace
normalizer = UnicodeNormalizer(form="NFKD",
inputCol="text", outputCol="text_normalized")
df = sc.parallelize([
(1, "Maracaibó"), (2, "New York"),
(3, " São Paulo "), (4, "~Madrid")
]).toDF(["id", "text"])
(normalizer
.transform(df)
.select(regexp_replace("text_normalized", "pM", ""))
.show())
## +--------------------------------------+
## |regexp_replace(text_normalized,pM,)|
## +--------------------------------------+
## | Maracaibo|
## | New York|
## | Sao Paulo |
## | ~Madrid|
## +--------------------------------------+
Tenga en cuenta que esto sigue las mismas convenciones que los transformadores de texto integrados y no es null a salvo. Puede corregirlo fácilmente comprobando null
en createTransformFunc
.
Otra forma de hacerlo usando Python Unicode Database:
import unicodedata
import sys
from pyspark.sql.functions import translate, regexp_replace
def make_trans():
matching_string = ""
replace_string = ""
for i in range(ord(" "), sys.maxunicode):
name = unicodedata.name(chr(i), "")
if "WITH" in name:
try:
base = unicodedata.lookup(name.split(" WITH")[0])
matching_string += chr(i)
replace_string += base
except KeyError:
pass
return matching_string, replace_string
def clean_text(c):
matching_string, replace_string = make_trans()
return translate(
regexp_replace(c, "pM", ""),
matching_string, replace_string
).alias(c)
Así que ahora probémoslo:
df = sc.parallelize([
(1, "Maracaibó"), (2, "New York"),
(3, " São Paulo "), (4, "~Madrid"),
(5, "São Paulo"), (6, "Maracaibó")
]).toDF(["id", "text"])
df.select(clean_text("text")).show()
## +---------------+
## | text|
## +---------------+
## | Maracaibo|
## | New York|
## | Sao Paulo |
## | ~Madrid|
## | Sao Paulo|
## | Maracaibo|
## +---------------+
reconocer @ zero323
Esta solución es solo para Python, pero solo es útil si el número de acentos posibles es bajo (por ejemplo, un solo idioma como el español) y los reemplazos de caracteres se especifican manualmente.
Parece que no hay una forma incorporada de hacer lo que solicitó directamente sin UDF, sin embargo, puede encadenar muchos regexp_replace
llamadas para reemplazar cada posible carácter acentuado. Probé el rendimiento de esta solución y resultó que solo se ejecuta más rápido si tiene un conjunto muy limitado de acentos para reemplazar. Si ese es el caso, puede ser más rápido que las UDF porque está optimizado fuera de Python.
from pyspark.sql.functions import col, regexp_replace
accent_replacements_spanish = [
(u'á', 'a'), (u'Á', 'A'),
(u'é', 'e'), (u'É', 'E'),
(u'í', 'i'), (u'Í', 'I'),
(u'ò', 'o'), (u'Ó', 'O'),
(u'ú|ü', 'u'), (u'Ú|Ű', 'U'),
(u'ñ', 'n'),
# see http://stackoverflow.com/a/18123985/3810493 for other characters
# this will convert other non ASCII characters to a question mark:
('[^x00-x7F]', '?')
]
def remove_accents(column):
r = col(column)
for a, b in accent_replacements_spanish:
r = regexp_replace(r, a, b)
return r.alias('remove_accents(' + column + ')')
df = sqlContext.createDataFrame([['Olà'], ['Olé'], ['Núñez']], ['str'])
df.select(remove_accents('str')).show()
No he comparado el rendimiento con las otras respuestas y esta función no es tan general, pero al menos vale la pena considerarla porque no necesita agregar Scala o Java a su proceso de compilación.
Al final de todo puedes encontrar las notas de otros sys admins, tú además puedes insertar el tuyo si lo crees conveniente.