Saltar al contenido

¿Cómo obtengo un equivalente de SQL row_number para un Spark RDD?

Nuestros mejores desarrolladores han agotado sus reservas de café, investigando noche y día por la respuesta, hasta que Marco encontró el hallazgo en Gitea por lo tanto ahora la comparte aquí.

El row_number() over (partition by ... order by ...) Se agregó funcionalidad a Spark 1.4. Esta respuesta usa PySpark/DataFrames.

Cree un marco de datos de prueba:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

Agregue el número de fila particionada:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+

Es un problema interesante el que planteas. Lo responderé en Python, pero estoy seguro de que podrá traducirlo sin problemas a Scala.

Así es como lo abordaría:

1- Simplifica tus datos:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))

temp2 es ahora un “real” key-par de valores. Se parece a eso:

[
((3, 4), (5, 5, 5)),  
((3, 4), (5, 5, 9)),   
((3, 4), (7, 5, 5)),   
((1, 2), (1, 2, 3)),  
((1, 2), (1, 4, 7)),   
((1, 2), (2, 2, 3))

]

2- Luego, usa la función group-by para reproducir el efecto de PARTITION BY:

temp3 = temp2.groupByKey()

temp3 ahora es un RDD con 2 filas:

[((1, 2), ),  
 ((3, 4), )]

3- Ahora, necesitas aplicar una función de rango para cada valor del RDD. En python, usaría la función ordenada simple (la enumeración creará su columna de número de fila):

 temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)

Tenga en cuenta que para implementar su pedido en particular, deberá alimentar el derecho “key”argumento (en python, simplemente crearía una función lambda como esas:

lambda tuple : (tuple[0],-tuple[1],tuple[2])

Al final (sin el key función de argumento, se ve así):

[
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2))

]

¡Espero que ayude!

Buena suerte.

Sección de Reseñas y Valoraciones

Recuerda que puedes dar recomendación a este tutorial si te fue útil.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)


Tags :

Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *