Saltar al contenido

Usando monotonically_increasing_id () para asignar el número de fila al marco de datos de pyspark

Solución:

De la documentación

Una columna que genera enteros de 64 bits que aumentan monótonamente.

Se garantiza que la ID generada aumentará de forma monótona y será única, pero no consecutiva. La implementación actual coloca el ID de la partición en los 31 bits superiores y el número de registro dentro de cada partición en los 33 bits inferiores. El supuesto es que el marco de datos tiene menos de mil millones de particiones y cada partición tiene menos de 8 mil millones de registros.

Por lo tanto, no es como una identificación de incremento automático en RDB y es no confiable para fusionar.

Si necesita un comportamiento de incremento automático como en los RDB y sus datos se pueden ordenar, puede usar row_number

df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by "some_column") as num, * from df')
+---+-----------+
|num|some_column|
+---+-----------+
|  1|   ....... |
|  2|   ....... |
|  3| ..........|
+---+-----------+

Si sus datos no se pueden ordenar y no le importa usar rdds para crear los índices y luego recurrir a los marcos de datos, puede usar rdd.zipWithIndex()

Un ejemplo puede ser encontrado aquí

En breve:

# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()

df.show()

# your data           | indexes
+---------------------+---+
|         _1          | _2| 
+-----------=---------+---+
|[data col1,data col2]|  0|
|[data col1,data col2]|  1|
|[data col1,data col2]|  2|
+---------------------+---+

Probablemente necesitará algunas transformaciones más después de eso para que su marco de datos sea lo que necesita. Nota: no es una solución muy eficaz.

Espero que esto ayude. ¡Buena suerte!

Editar:
Ahora que lo pienso, puede combinar el monotonically_increasing_id usar el row_number:

# create a monotonically increasing id 
df = df.withColumn("idx", monotonically_increasing_id())

# then since the id is increasing but not consecutive, it means you can sort by it, so you can use the `row_number`
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by "idx") as num, * from df')

Sin embargo, no estoy seguro del rendimiento.

Puede encontrar ejemplos completos de las formas de hacer esto y los riesgos aquí.

usando funciones api puede hacer simplemente lo siguiente

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
df1 = df1.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
df1.withColumn("idx", F.row_number().over(windowSpec)).show()

Espero que la respuesta sea de ayuda

Encontré útil la solución de @mkaran, pero para mí no había una columna de pedido mientras usaba la función de ventana. Quería mantener el orden de las filas del marco de datos como sus índices (lo que verías en un marco de datos de pandas). Por lo tanto, la solución en la sección de edición llegó a ser útil. Dado que es una buena solución (si el rendimiento no es una preocupación), me gustaría compartirla como una respuesta separada.

# Add a increasing data column 
df_index = df.withColumn("idx", monotonically_increasing_id())

# Create the window specification
w = Window.orderBy("idx")

# Use row number with the window specification
df_index = df_index.withColumn("index", F.row_number().over(w))

# Drop the created increasing data column
df2_index = df2_index.drop("idx")

df es su marco de datos original y df_index es un nuevo marco de datos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *