Saltar al contenido

partición y sobrescribir la estrategia en un Azure DataLake usando PySpark en Databricks

Solución:

Vi que estás usando databricks en la pila azul. Creo que el método más viable y recomendado que puede utilizar sería utilizar el nuevo proyecto delta lake en databricks:

Proporciona opciones para varios reafirma, fusiona y transacciones ácidas a almacenes de objetos como s3 o almacenamiento de lago de datos azul. Básicamente, proporciona la gestión, la seguridad, el aislamiento y las actualizaciones / fusiones que proporcionan los almacenes de datos a los datalakes. Para una tubería, Apple realmente reemplazó sus almacenes de datos para que se ejecutaran únicamente en databricks delta debido a su funcionalidad y flexibilidad. Para su caso de uso y muchos otros que usan parquet, es solo una simple cambio de reemplazar ‘parquet’ por ‘delta’, para utilizar su funcionalidad (si tiene databricks). Delta es básicamente un evolución natural de parquet y databricks ha hecho un gran trabajo al proporcionar funcionalidad adicional y al mismo tiempo que la fuente abierta.

Para su caso, le sugiero que pruebe el reemplazar donde opción proporcionada en delta. Antes de hacer esto actualización dirigida, la tabla de destino debe tener el formato delta

En lugar de esto:

dataset.repartition(1).write.mode('overwrite')
                         .partitionBy('Year','Week').parquet('curataeddataset')

De https://docs.databricks.com/delta/delta-batch.html:

‘Usted puede sobrescribir selectivamente solo los datos que coincide con predicados sobre columnas de partición

Podrías probar esto:

dataset.write.repartition(1)
       .format("delta")
       .mode("overwrite")
       .partitionBy('Year','Week')
       .option("replaceWhere", "Year == '2019' AND Week >='01' AND Week <='02'") #to avoid overwriting Week3
       .save("curataeddataset")

Además, si desea llevar las particiones a 1, ¿por qué no usa fusionar (1) ya que evitará una reproducción aleatoria completa.

Desde https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/:

reemplazar donde es particularmente útil cuando tienes que ejecutar un algoritmo computacionalmente costoso, pero solo en ciertas particiones

Por lo tanto, personalmente creo que usar replacewhere para especificar manualmente su sobrescritura será más específico y computacionalmente eficiente que simplemente confiar en:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Databricks proporciona optimizaciones en las tablas delta, lo que lo convierte en una opción más rápida y mucho más eficiente para el parquet (por lo tanto, una evolución natural) mediante el empaquetado en contenedores y el pedido en z:

Desde el enlace: https: //docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

  • DÓNDE(binpacking)

‘Optimice el subconjunto de filas que coinciden con el predicado de partición dado. Solo se admiten los filtros que involucran atributos de clave de partición ‘.

  • ZORDER POR

‘Coloca la información de la columna en el mismo conjunto de archivos. Los algoritmos de omisión de datos de Delta Lake utilizan la co-localidad para reducir drásticamente la cantidad de datos que deben leerse ‘.

  • Ejecución de consultas más rápida con compatibilidad con indexación, estadísticas y almacenamiento en caché automático

  • Fiabilidad de los datos con validación de esquemas enriquecida y garantías transaccionales

  • Canalización de datos simplificada con UPSERT flexible soporte y transmisión estructurada unificada + procesamiento por lotes en una sola fuente de datos

También puede consultar la documentación completa del proyecto de código abierto: https://docs.delta.io/latest/index.html

.. También quiero decir que no trabajo para databricks / delta lake. Acabo de ver que sus mejoras y funcionalidad me benefician en mi trabajo.

ACTUALIZAR:

los esencia de la pregunta es “reemplazar datos que existen y crear nuevas carpetas para nuevos datos” y hacerlo de manera altamente escalable y eficaz.

El uso de sobrescritura de partición dinámica en parquet funciona, sin embargo, siento que la evolución natural de ese método es usar operaciones de combinación de tablas delta que básicamente se crearon para ‘integrar datos de Spark DataFrames en Delta Lake’. Le brindan funcionalidad adicional y optimizaciones para fusionar sus datos en función de cómo le gustaría que eso sucediera y mantienen un registro de todas las acciones en una tabla para que pueda revertir las versiones si es necesario.

API de Delta Lake Python(para fusionar): https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder

optimización de databricks: https://kb.databricks.com/delta/delta-merge-into.html#discussion

Usando una sola operación de combinación, puede especificar la condición de combinación, en este caso podría ser una combinación del año y la semana y la identificación, y luego si los registros coinciden (lo que significa que existen en su marco de datos de chispa y tabla delta, semana1 y semana2 ), actualícelos con los datos en su marco de datos Spark y deje los otros registros sin cambios:

#you can also add additional condition if the records match, but not required
.whenMatchedUpdateAll(condition=None)

En algunos casos, si nada coincide, es posible que desee insertar y crear nuevas filas y particiones, para eso puede usar:

.whenNotMatchedInsertAll(condition=None)

Puedes usar .converttodelta operación https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.convertToDelta, para convertir su tabla de parquet en una tabla delta para que pueda realizar operaciones delta en ella usando la api .

‘Ahora puede convertir una tabla de Parquet en su lugar en una tabla de Delta Lake sin volver a escribir ninguno de los datos. Esto es ideal para convertir tablas de Parquet muy grandes que sería costoso reescribir como una tabla Delta. Además, este proceso es reversible ‘

Tu fusionar caso(reemplazar datos donde existen y crear nuevos registros cuando no existen) podría ser así:

(no lo he probado, consulte ejemplos + api para conocer la sintaxis)

%python  
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`curataeddataset`")

deltaTable.alias("target").merge(dataset, "target.Year= dataset.Year  AND target.Week = dataset.Week") 
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()

Si la tabla delta está particionada correctamente (año, semana) y usó la cláusula whenmatched correctamente, estas operaciones estarán altamente optimizadas y podrían tomar segundos en su caso. También le proporciona consistencia, atomicidad e integridad de datos con opción de reversión.

Algunas funciones más proporcionadas es que puede especificar el conjunto de columnas para actualizar si se realiza la coincidencia (si solo necesita actualizar ciertas columnas). También puede habilitar spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"), de modo que delta utilice particiones específicas mínimas para llevar a cabo la fusión (actualizar, eliminar, crear).

En general, creo que usar este enfoque es una forma muy nueva e innovadora de llevar a cabo actualizaciones específicas, ya que le brinda más control al mismo tiempo que mantiene las operaciones altamente eficientes. El uso de parquet con el modo de sobrescritura de partición dinámica también funcionará bien, sin embargo, las características delta lake brindan calidad de los datos a su lago de datos que no tiene parangón.

Mi recomendación:
Yo diría que por ahora, use el modo de sobrescritura de partición dinámica para que los archivos de parquet hagan sus actualizaciones, y podría experimentar e intentar usar la combinación delta en una sola tabla con la optimización de databricks de spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true") y .whenMatchedUpdateAll() y comparar el rendimiento de ambos (sus archivos son pequeños, así que no creo que haya una gran diferencia). El artículo sobre optimización de la poda de particiones de databricks para fusiones se publicó en febrero, por lo que es realmente nuevo y posiblemente podría cambiar las reglas del juego para las operaciones de fusión delta generales en las que incurren (ya que, bajo el capó, solo crean nuevos archivos, pero la poda de particiones podría acelerarlo)

Combinar ejemplos en python, scala, sql: https://docs.databricks.com/delta/delta-update.html#merge-examples

https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *