Saltar al contenido

Spark SQL y MySQL- SaveMode.Overwrite sin insertar datos modificados

Entiende el código de forma correcta antes de usarlo a tu trabajo si tquieres aportar algo puedes dejarlo en la sección de comentarios.

Solución:

El problema está en tu código. Debido a que sobrescribe una tabla desde la que está tratando de leer, borra efectivamente todos los datos antes de que Spark pueda acceder a ellos.

Recuerda que Spark es perezoso. Cuando creas un Dataset Spark obtiene los metadatos necesarios, pero no carga los datos. Por lo tanto, no existe un caché mágico que conserve el contenido original. Los datos se cargarán cuando realmente se requieran. Aquí está cuando ejecutas write acción y cuando comienza a escribir no hay más datos para buscar.

Lo que necesitas es algo como esto:

  • Crear un Dataset.
  • Aplique las transformaciones requeridas y escriba datos en una tabla MySQL intermedia.

  • TRUNCATE la entrada original y INSERT INTO ... SELECT de la mesa intermedia o DROP la tabla original y RENAME mesa intermedia.

Un enfoque alternativo, pero menos favorable, sería:

  • Crear un Dataset.
  • Aplicar las transformaciones requeridas y escribir datos en una tabla Spark persistente (df.write.saveAsTable(...) o equivalente)
  • TRUNCATE la entrada original.
  • Leer datos y guardar (spark.table(...).write.jdbc(...))
  • Caída de la mesa Spark.

No podemos enfatizar lo suficiente que usar Spark cache / persist no es el camino a seguir. Incluso con el conservador StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2) los datos almacenados en caché se pueden perder (fallas de nodo), lo que lleva a errores de corrección silenciosos.

Creo que todos los pasos anteriores son innecesarios. Esto es lo que debe hacer:

  • Crear un conjunto de datos A me gusta val A = spark.read.parquet("....")

  • Lea la tabla que se actualizará, como marco de datos B. Asegúrese de que habilitar el almacenamiento en caché esté habilitado para el marco de datos B. val B = spark.read.jdbc("mytable").cache

  • Forzar un count sobre B – esto forzará la ejecución y almacenará en caché la tabla según el elegido StorageLevelB.count

  • Ahora, puedes hacer una transformación como val C = A.union(B)

  • Y luego escribe C volver a la base de datos como C.write.mode(SaveMode.Overwrite).jdbc("mytable")

Si te ha resultado de utilidad nuestro artículo, te agradeceríamos que lo compartas con otros juniors de esta manera contrubuyes a dar difusión a este contenido.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *