Saltar al contenido

¿Cuándo almacenar en caché un DataFrame?

Solución:

¿cuándo debo hacer dataframe.cache () y cuándo es útil?

cache lo que va a utilizar en las consultas (y antes y, a menudo, hasta la memoria disponible). Realmente no importa qué lenguaje de programación use (Python o Scala o Java o SQL o R) ya que la mecánica subyacente es la misma.

Puede ver si un DataFrame se almacenó en caché en su plan físico usando explain operador (donde InMemoryRelation las entidades reflejan los conjuntos de datos almacenados en caché con su nivel de almacenamiento):

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))

Después de ti cache (o persist) su DataFrame, la primera consulta puede volverse más lenta, pero valdrá la pena para las siguientes consultas.

Puede verificar si un conjunto de datos se almacenó en caché o no con el siguiente código:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false

Además, en mi código, ¿debería almacenar en caché los marcos de datos en las líneas comentadas?

Si y no. Almacene en caché lo que representa conjuntos de datos externos para no pagar el precio adicional de transmitir datos a través de la red (mientras accede al almacenamiento externo) cada vez que los consulta.

No almacene en caché lo que usa solo una vez o es fácil de calcular. De lo contrario, cache.


Tenga cuidado con lo que guarda en caché, es decir, lo que Dataset se almacena en caché, ya que proporciona diferentes consultas en caché.

// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)

Hay una sorpresa con el almacenamiento en caché en Spark SQL. El almacenamiento en caché es lento y es por eso que paga el precio adicional para que las filas se almacenen en caché la primera acción, pero eso solo sucede con la API de DataFrame. En SQL, el almacenamiento en caché es ansioso, lo que marca una gran diferencia en el rendimiento de la consulta, ya que no tiene que llamar a una acción para activar el almacenamiento en caché.

De hecho en tu caso .cache() no ayudará en absoluto. No está ejecutando ninguna acción en su marco de datos (al menos no en la función proporcionada). .cache() es una buena idea si va a utilizar datos varias veces como:

data = sub_tax_transfer_pricing_eur_aux(...).cache()
one_use_case = data.groupBy(...).agg(...).show()
another_use_case = data.groupBy(...).agg(...).show()

De esta manera, obtendrá datos solo una vez (cuando se llame a la primera acción .show() y luego el siguiente uso de data el marco de datos debería ser más rápido. Sin embargo, use esto con precaución: en ocasiones, recuperar datos nuevamente es más rápido. Además, desaconsejaría nombrar el mismo nombre a su marco de datos una y otra vez. Los marcos de datos son objetos inmutables, después de todo.

Espero que esto sea de ayuda.

Almacenamiento en caché de RDD en Spark: es un mecanismo para acelerar las aplicaciones que acceden al mismo RDD varias veces. Un RDD que no está almacenado en caché, ni con puntos de control, se vuelve a evaluar cada vez que se invoca una acción en ese RDD. Hay dos llamadas a funciones para almacenar en caché un RDD: cache() y persist(level: StorageLevel). La diferencia entre ellos es que cache() almacenará en caché el RDD en la memoria, mientras que persist(level) puede almacenar en caché en la memoria, en el disco o fuera del montón de memoria de acuerdo con la estrategia de almacenamiento en caché especificada por nivel.
persist() sin un argumento es equivalente a cache(). Discutiremos las estrategias de almacenamiento en caché más adelante en esta publicación. La liberación de espacio de la memoria de almacenamiento se realiza mediante unpersist().

Cuándo usar el almacenamiento en caché: como se sugiere en esta publicación, se recomienda usar el almacenamiento en caché en las siguientes situaciones:

  • Reutilización de RDD en aplicaciones iterativas de aprendizaje automático
  • Reutilización de RDD en aplicaciones Spark independientes
  • Cuando el cálculo de RDD es costoso, el almacenamiento en caché puede ayudar a reducir el costo de recuperación en caso de que falle un ejecutor
¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *