Saltar al contenido

¿Cómo convertir pyspark.rdd.PipelinedRDD en un marco de datos sin usar el método collect () en Pyspark?

Nuestros mejores desarrolladores agotaron sus reservas de café, en su búsqueda a tiempo completo por la solución, hasta que Emanuel halló la solución en GitLab así que en este momento la compartimos aquí.

Solución:

Quiere hacer dos cosas aquí: 1. aplanar sus datos 2. ponerlos en un marco de datos

Una forma de hacerlo es la siguiente:

Primero, aplanemos el diccionario:

rdd2 = Rdd1.flatMapValues(lambda x : [ (k, x[k]) for k in x.keys()])

Al recopilar los datos, obtienes algo como esto:

[(10, (3, 3.616726727464709)), (10, (4, 2.9996439803387602)), ...

Then we can format the data and turn it into a dataframe:

rdd2.map(lambda x : (x[0]X[1][0]X[1][1])) .toDF(("CId", "IID", "Puntuación")) .show()

que te da esto:

+---+---+-------------------+
|CId|IID|              Score|
+---+---+-------------------+
| 10|  3|  3.616726727464709|
| 10|  4| 2.9996439803387602|
| 10|  5| 1.6767412921625855|
|  1|  3|  2.016527311459324|
|  1|  4|-1.5271512313750577|
|  1|  5| 1.9665475696370045|
|  2|  3|  6.230272144805092|
|  2|  4|  4.033642544526678|
|  2|  5| 3.1517805604906313|
|  3|  3|-0.3924680103722977|
|  3|  4| 2.9757316477407443|
|  3|  5|-1.5689126834176417|
+---+---+-------------------+

Hay una solución aún más fácil y elegante que evita las expresiones lambda de python como en la respuesta de @oli que se basa en Spark DataFrames explode que se adapta perfectamente a sus necesidades. También debería ser más rápido porque no hay necesidad de usar python lambda dos veces. Vea abajo:

from pyspark.sql.functions import explode

# dummy data
data = [(10, 3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855),
        (1, 3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045),
        (2, 3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313),
        (3, 3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417)]

# create your rdd
rdd = sc.parallelize(data)

# convert to spark data frame
df = rdd.toDF(["CId", "Values"])

# use explode
df.select("CId", explode("Values").alias("IID", "Score")).show()

+---+---+-------------------+
|CId|IID|              Score|
+---+---+-------------------+
| 10|  3|  3.616726727464709|
| 10|  4| 2.9996439803387602|
| 10|  5| 1.6767412921625855|
|  1|  3|  2.016527311459324|
|  1|  4|-1.5271512313750577|
|  1|  5| 1.9665475696370045|
|  2|  3|  6.230272144805092|
|  2|  4|  4.033642544526678|
|  2|  5| 3.1517805604906313|
|  3|  3|-0.3924680103722977|
|  3|  4| 2.9757316477407443|
|  3|  5|-1.5689126834176417|
+---+---+-------------------+

Si para ti ha sido de utilidad nuestro artículo, nos gustaría que lo compartas con otros entusiastas de la programación y nos ayudes a difundir esta información.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *