Entiende el código de forma correcta antes de usarlo a tu trabajo si tquieres aportar algo puedes dejarlo en la sección de comentarios.
Solución:
El problema está en tu código. Debido a que sobrescribe una tabla desde la que está tratando de leer, borra efectivamente todos los datos antes de que Spark pueda acceder a ellos.
Recuerda que Spark es perezoso. Cuando creas un Dataset
Spark obtiene los metadatos necesarios, pero no carga los datos. Por lo tanto, no existe un caché mágico que conserve el contenido original. Los datos se cargarán cuando realmente se requieran. Aquí está cuando ejecutas write
acción y cuando comienza a escribir no hay más datos para buscar.
Lo que necesitas es algo como esto:
- Crear un
Dataset
. -
Aplique las transformaciones requeridas y escriba datos en una tabla MySQL intermedia.
-
TRUNCATE
la entrada original yINSERT INTO ... SELECT
de la mesa intermedia oDROP
la tabla original yRENAME
mesa intermedia.
Un enfoque alternativo, pero menos favorable, sería:
- Crear un
Dataset
. - Aplicar las transformaciones requeridas y escribir datos en una tabla Spark persistente (
df.write.saveAsTable(...)
o equivalente) TRUNCATE
la entrada original.- Leer datos y guardar (
spark.table(...).write.jdbc(...)
) - Caída de la mesa Spark.
No podemos enfatizar lo suficiente que usar Spark cache
/ persist
no es el camino a seguir. Incluso con el conservador StorageLevel
(MEMORY_AND_DISK_2
/ MEMORY_AND_DISK_SER_2
) los datos almacenados en caché se pueden perder (fallas de nodo), lo que lleva a errores de corrección silenciosos.
Creo que todos los pasos anteriores son innecesarios. Esto es lo que debe hacer:
-
Crear un conjunto de datos
A
me gustaval A = spark.read.parquet("....")
-
Lea la tabla que se actualizará, como marco de datos
B
. Asegúrese de que habilitar el almacenamiento en caché esté habilitado para el marco de datosB
.val B = spark.read.jdbc("mytable").cache
-
Forzar un
count
sobreB
– esto forzará la ejecución y almacenará en caché la tabla según el elegidoStorageLevel
–B.count
-
Ahora, puedes hacer una transformación como
val C = A.union(B)
-
Y luego escribe
C
volver a la base de datos comoC.write.mode(SaveMode.Overwrite).jdbc("mytable")
Si te ha resultado de utilidad nuestro artículo, te agradeceríamos que lo compartas con otros juniors de esta manera contrubuyes a dar difusión a este contenido.