Saltar al contenido

Cómo verificar el número de particiones de un Spark DataFrame sin incurrir en el costo de .rdd

Solución:

No hay un costo inherente de rdd componente en rdd.getNumPartitions, porque regresó RDD nunca se evalúa.

Si bien puede determinar esto fácilmente de manera empírica, utilizando el depurador (dejaré esto como un ejercicio para el lector) o estableciendo que no se activan trabajos en el escenario del caso base

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /___/ .__/_,_/_/ /_/_   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

puede que no sea suficiente para convencerte. Así que abordemos esto de una manera más sistemática:

  • rdd devuelve un MapPartitionRDD (ds como se define arriba):

    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    
  • RDD.getNumPartitions invoca RDD.partitions.

  • En un escenario sin puntos de control RDD.partitions invoca getPartitions (siéntase libre de rastrear también la ruta del punto de control).
  • RDD.getPartitions es abstracto.
  • Entonces, la implementación real utilizada en este caso es MapPartitionsRDD.getPartitions, que simplemente delega la llamada al padre.
  • Solo hay MapPartitionsRDD Entre rdd y la fuente.

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    

    Similarmente si Dataset contenía un intercambio que seguiríamos a los padres hasta la mezcla más cercana:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    Tenga en cuenta que este caso es particularmente interesante, porque en realidad activamos un trabajo:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    Esto se debe a que nos hemos encontrado con un escenario en el que las particiones no se pueden determinar estáticamente (consulte ¿Número de particiones de marco de datos después de ordenar? Y ¿Por qué la transformación sortBy activa un trabajo de Spark?).

    En tal escenario getNumPartitions también activará un trabajo:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    sin embargo, no significa que el costo observado esté relacionado de alguna manera con .rdd llama. En cambio, es un costo intrínseco de encontrar partitions en caso de que no haya una fórmula estática (algunos formatos de entrada de Hadoop, por ejemplo, donde se requiere un escaneo completo de los datos).

Tenga en cuenta que los puntos señalados aquí no deben extrapolarse a otras aplicaciones de Dataset.rdd. Por ejemplo ds.rdd.count sería realmente caro y derrochador.

En mi experiencia df.rdd.getNumPartitions es muy rápido, nunca me encontré con tomar esto más de un segundo más o menos.

Alternativamente, también puedes probar

val numPartitions: Long = df
      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()

que evitaría usar .rdd

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *