Solución:
No hay un costo inherente de rdd
componente en rdd.getNumPartitions
, porque regresó RDD
nunca se evalúa.
Si bien puede determinar esto fácilmente de manera empírica, utilizando el depurador (dejaré esto como un ejercicio para el lector) o estableciendo que no se activan trabajos en el escenario del caso base
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ '_/
/___/ .__/_,_/_/ /_/_ version 2.4.0
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]
scala> ds.rdd.getNumPartitions
res0: Int = 1
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
puede que no sea suficiente para convencerte. Así que abordemos esto de una manera más sistemática:
-
rdd
devuelve unMapPartitionRDD
(ds
como se define arriba):scala> ds.rdd.getClass res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
-
RDD.getNumPartitions
invocaRDD.partitions
. - En un escenario sin puntos de control
RDD.partitions
invocagetPartitions
(siéntase libre de rastrear también la ruta del punto de control). -
RDD.getPartitions
es abstracto. - Entonces, la implementación real utilizada en este caso es
MapPartitionsRDD.getPartitions
, que simplemente delega la llamada al padre. -
Solo hay
MapPartitionsRDD
Entrerdd
y la fuente.scala> ds.rdd.toDebugString res3: String = (1) MapPartitionsRDD[3] at rdd at <console>:26 [] | MapPartitionsRDD[2] at rdd at <console>:26 [] | MapPartitionsRDD[1] at rdd at <console>:26 [] | FileScanRDD[0] at rdd at <console>:26 []
Similarmente si
Dataset
contenía un intercambio que seguiríamos a los padres hasta la mezcla más cercana:scala> ds.orderBy("value").rdd.toDebugString res4: String = (67) MapPartitionsRDD[13] at rdd at <console>:26 [] | MapPartitionsRDD[12] at rdd at <console>:26 [] | MapPartitionsRDD[11] at rdd at <console>:26 [] | ShuffledRowRDD[10] at rdd at <console>:26 [] +-(1) MapPartitionsRDD[9] at rdd at <console>:26 [] | MapPartitionsRDD[5] at rdd at <console>:26 [] | FileScanRDD[4] at rdd at <console>:26 []
Tenga en cuenta que este caso es particularmente interesante, porque en realidad activamos un trabajo:
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty res5: Boolean = false scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) res6: Array[Int] = Array(0)
Esto se debe a que nos hemos encontrado con un escenario en el que las particiones no se pueden determinar estáticamente (consulte ¿Número de particiones de marco de datos después de ordenar? Y ¿Por qué la transformación sortBy activa un trabajo de Spark?).
En tal escenario
getNumPartitions
también activará un trabajo:scala> ds.orderBy("value").rdd.getNumPartitions res7: Int = 67 scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id res8: Array[Int] = Array(1, 0)
sin embargo, no significa que el costo observado esté relacionado de alguna manera con
.rdd
llama. En cambio, es un costo intrínseco de encontrarpartitions
en caso de que no haya una fórmula estática (algunos formatos de entrada de Hadoop, por ejemplo, donde se requiere un escaneo completo de los datos).
Tenga en cuenta que los puntos señalados aquí no deben extrapolarse a otras aplicaciones de Dataset.rdd
. Por ejemplo ds.rdd.count
sería realmente caro y derrochador.
En mi experiencia df.rdd.getNumPartitions
es muy rápido, nunca me encontré con tomar esto más de un segundo más o menos.
Alternativamente, también puedes probar
val numPartitions: Long = df
.select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()
que evitaría usar .rdd