Esta es la respuesta más exacta que encomtrarás brindar, pero obsérvala detenidamente y valora si se puede adaptar a tu proyecto.
Solución:
Esto sucede porque Spark intenta hacer Broadcast Hash Join y uno de los DataFrames es muy grande, por lo que enviarlo consume mucho tiempo.
Puede:
- Establecer más alto
spark.sql.broadcastTimeout
para aumentar el tiempo de espera –spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
persist()
ambos DataFrames, luego Spark usará Shuffle Join – referencia desde aquí
PySpark
En PySpark, puede establecer la configuración cuando crea el contexto de chispa de la siguiente manera:
spark = SparkSession
.builder
.appName("Your App")
.config("spark.sql.broadcastTimeout", "36000")
.getOrCreate()
Solo para agregar algo de contexto de código a la respuesta muy concisa de @T. Gawęda.
En su aplicación Spark, Spark SQL eligió un unión hash de difusión para la unión porque “libriFirstTable50Plus3DF tiene 766,151 registros” que resultó ser menor que el llamado umbral de difusión (predeterminado en 10 MB).
Puede controlar el umbral de transmisión mediante la propiedad de configuración spark.sql.autoBroadcastJoinThreshold.
spark.sql.autoBroadcastJoinThreshold Configura el tamaño máximo en bytes para una tabla que se transmitirá a todos los nodos trabajadores al realizar una unión. Al establecer este valor en -1, se puede deshabilitar la transmisión. Tenga en cuenta que actualmente las estadísticas solo son compatibles con las tablas de Hive Metastore donde se ha ejecutado el comando ANALYZE TABLE COMPUTE STATISTICS noscan.
Puede encontrar ese tipo particular de combinación en el seguimiento de la pila:
org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
BroadcastHashJoin
El operador físico en Spark SQL usa una variable de transmisión para distribuir el conjunto de datos más pequeño a los ejecutores de Spark (en lugar de enviar una copia con cada tarea).
si usaste explain
para revisar el plan de consulta física, notará que la consulta usa el operador físico BroadcastExchangeExec. Aquí es donde puede ver la maquinaria subyacente para transmitir la tabla más pequeña (y el tiempo de espera).
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] =
ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
doExecuteBroadcast
es parte de SparkPlan
contrato que sigue cada operador físico en Spark SQL que permite la transmisión si es necesario. BroadcastExchangeExec
pasa a necesitarlo.
El parámetro de tiempo de espera es lo que está buscando.
private val timeout: Duration =
val timeoutValue = sqlContext.conf.broadcastTimeout
if (timeoutValue < 0)
Duration.Inf
else
timeoutValue.seconds
Como puede ver, puede deshabilitarlo por completo (usando un valor negativo), lo que implicaría esperar a que la variable de transmisión se envíe a los ejecutores indefinidamente o usar sqlContext.conf.broadcastTimeout
que es exactamente la propiedad de configuración spark.sql.broadcastTimeout. El valor predeterminado es 5 * 60
segundos que puedes ver en el stacktrace:
java.util.concurrent.TimeoutException: los futuros expiraron después de [300 seconds]
Al final de todo puedes encontrar las explicaciones de otros creadores, tú aún tienes la opción de insertar el tuyo si te gusta.