Saltar al contenido

pyspark: tenga partición de manera eficiente Al escribir en la misma cantidad de particiones totales que la tabla original

No olvides que en las ciencias un problema suele tener más de una soluciones, pero aquí enseñaremos lo mejor y más eficiente.

Solución:

Tienes varias opciones. En mi código a continuación, asumiré que desea escribir en parquet, pero, por supuesto, puede cambiar eso.

(1) df.repartition(numPartitions, *cols).write.partitionBy(*cols).parquet(writePath)

Esto primero utilizará la partición basada en hash para garantizar que una cantidad limitada de valores de COL lleguen a cada partición. Dependiendo del valor que elija para numPartitions, algunas particiones pueden estar vacías mientras que otras pueden estar llenas de valores; si no está seguro de por qué, lea esto. Entonces, cuando llames partitionBy en DataFrameWriter, cada valor único en cada partición se colocará en su propio archivo individual.

Advertencia: este enfoque puede generar tamaños de partición desequilibrados y tiempos de ejecución de tareas desequilibrados. Esto sucede cuando los valores de su columna están asociados con muchas filas (p. ej., una columna de ciudad; el archivo de la ciudad de Nueva York puede tener muchas filas), mientras que otros valores son menos numerosos (p. ej., valores para pueblos pequeños).

(2) df.sort(sortCols).write.parquet(writePath)

Esta opción funciona muy bien cuando desea (1) que los archivos que escribe tengan casi el mismo tamaño (2) un control exacto sobre la cantidad de archivos escritos. Este enfoque primero ordena globalmente sus datos y luego encuentra divisiones que dividen los datos en k particiones de tamaño uniforme, donde k se especifica en la configuración de chispa spark.sql.shuffle.partitions. Esto significa que todos los valores con los mismos valores de su tipo key son adyacentes entre sí, pero a veces abarcan una división y están en archivos diferentes. Esto, si su caso de uso requiere todas las filas con el mismo key estar en la misma partición, entonces no use este enfoque.

Hay dos bonificaciones adicionales: (1) al ordenar sus datos, su tamaño en el disco a menudo se puede reducir (por ejemplo, ordenar todos los eventos por ID de usuario y luego por tiempo generará muchas repeticiones en los valores de columna, lo que ayuda a la compresión) y (2 ) si escribe en un formato de archivo que lo admita (como Parquet), los lectores posteriores pueden leer los datos de manera óptima mediante el predicado pushdown, porque el escritor de parquet escribirá los valores MAX y MIN de cada columna en los metadatos, lo que permite la lector para omitir filas si la consulta especifica valores fuera del rango (mínimo, máximo) de la partición.

Tenga en cuenta que ordenar en Spark es más costoso que solo volver a particionar y requiere una etapa adicional. Detrás de escena, Spark primero determinará las divisiones en una etapa y luego mezclará los datos en esas divisiones en otra etapa.

(3) df.rdd.partitionBy(partición personalizada).toDF().write.parquet(writePath)

Si está utilizando Spark en Scala, entonces puede escribir un particionador de clientes, que puede superar los molestos errores del particionador basado en hash. Desafortunadamente, no es una opción en pySpark. Si realmente desea escribir un particionador personalizado en pySpark, descubrí que esto es posible, aunque un poco incómodo, usando rdd.repartitionAndSortWithinPartitions:

df.rdd 
  .keyBy(sort_key_function)   # Convert to key-value pairs
  .repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS, 
                                      partitionFunc=part_func) 
  .values() # get rid of keys 
.toDF().write.parquet(writePath)

¿Quizás alguien más conoce una forma más fácil de usar un particionador personalizado en un marco de datos en pyspark?

df.write().repartition(COL).partitionBy(COL) escribirá un archivo por partición. Esto no funcionará bien si una de sus particiones contiene muchos datos. por ejemplo, si una partición contiene 100 GB de datos, Spark intentará escribir un archivo de 100 GB y su trabajo probablemente explote.

df.write().repartition(2, COL).partitionBy(COL) escribirá un máximo de dos archivos por partición, como se describe en esta respuesta. Este enfoque funciona bien para conjuntos de datos que no están muy sesgados (porque la cantidad óptima de archivos por partición es aproximadamente la misma para todas las particiones).

Esta respuesta explica cómo escribir más archivos para las particiones que tienen muchos datos y menos archivos para las particiones pequeñas.

Recuerda dar visibilidad a esta crónica si te valió la pena.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *