Solución:
Primero, realmente evitaría usar coalesce
, ya que esto a menudo se empuja hacia arriba en la cadena de transformación y puede destruir el paralelismo de su trabajo (pregunté sobre este problema aquí: Coalesce reduce el paralelismo de toda la etapa (chispa))
Escribir 1 archivo por partición de parquet es realmente fácil (consulte el método de escritura de marco de datos de Spark para escribir muchos archivos pequeños):
data.repartition($"key").write.partitionBy("key").parquet("/location")
Si desea establecer una cantidad arbitraria de archivos (o archivos que tienen todos el mismo tamaño), debe volver a particionar sus datos utilizando otro atributo que podría usarse (no puedo decirle cuál podría ser esto en su caso):
data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")
another_key
podría ser otro atributo de su conjunto de datos, o un atributo derivado usando algún módulo o operaciones de redondeo en atributos existentes. Incluso podrías usar funciones de ventana con row_number
sobre key
y luego redondear esto con algo como
data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")
Esto te pondría N
registros en 1 archivo de parquet
usando orderBy
También puede controlar la cantidad de archivos sin volver a particionar ordenando su marco de datos en consecuencia:
data.orderBy($"key").write.partitionBy("key").parquet("/location")
Esto conducirá a un total de (al menos, pero no mucho más) spark.sql.shuffle.partitions
archivos en todas las particiones (por defecto 200). Incluso es beneficioso agregar una segunda columna de pedidos después $key
, ya que parquet recordará el orden del marco de datos y escribirá las estadísticas en consecuencia. Por ejemplo, puede ordenar por ID:
data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")
Esto no cambiará el número de archivos, pero mejorará el rendimiento cuando consulte su archivo de parquet para un determinado key
y id
. Consulte, por ejemplo, https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide y https://db-blog.web.cern.ch/blog/luca-canali/2017-06- Buceo-chispa-y-parquet-cargas-de-trabajo-ejemplo
Spark 2.2+
Desde Spark 2.2 en adelante, también puedes jugar con la nueva opción maxRecordsPerFile
para limitar el número de registros por archivo si tiene archivos demasiado grandes. Aún obtendrá al menos N archivos si tiene N particiones, pero puede dividir el archivo escrito por 1 partición (tarea) en trozos más pequeños:
df.write
.option("maxRecordsPerFile", 10000)
...
Ver, por ejemplo, http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/ y escribir en disco con N archivos de menos de N particiones
Ampliemos la respuesta de Raphael Roth con un enfoque adicional que creará un límite superior en la cantidad de archivos que puede contener cada partición, como se explica en esta respuesta:
import org.apache.spark.sql.functions.rand
df.repartition(numPartitions, $"some_col", rand)
.write.partitionBy("some_col")
.parquet("partitioned_lake")
Esta publicación de blog explica en detalle todas las opciones de particionamiento para usar junto con partitionBy.
Esto me está funcionando muy bien:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
Produce N archivos en cada partición de salida (directorio) y es (anecdóticamente) más rápido que usar coalesce
y (de nuevo, anecdóticamente, en mi conjunto de datos) más rápido que solo reparticionar en la salida.
Si está trabajando con S3, también recomiendo hacer todo en unidades locales (Spark crea muchos archivos / renombra / elimina durante las escrituras) y una vez que todo esté resuelto, use hadoop FileUtil
(o simplemente el aws cli) para copiar todo:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
def copy(
in : String,
out : String,
sparkSession: SparkSession
) = {
FileUtil.copy(
FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
new Path(in),
FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
new Path(out),
false,
sparkSession.sparkContext.hadoopConfiguration
)
}
Editar: según la discusión en los comentarios:
Tiene un conjunto de datos con una columna de partición de AÑO, pero cada AÑO determinado tiene cantidades de datos muy diferentes. Entonces, un año podría tener 1 GB de datos, pero otro podría tener 100 GB.
Aquí está psuedocode para una forma de manejar esto:
val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct
distinctGroupByValues.each((yearVal) -> {
val subDf = df.filter(s"YEAR = $yearVal")
val numPartitionsToUse = subDf.count / partitionSize
subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")
})
Pero, en realidad, no sé qué funcionará. Es posible que Spark tenga problemas para leer una cantidad variable de archivos por partición de columna.
Otra forma de hacerlo sería escribir su propio particionador personalizado, pero no tengo idea de lo que implica eso, así que no puedo proporcionar ningún código.