Saltar al contenido

¿Por qué falla la unión con “java.util.concurrent.TimeoutException: Futures agotó el tiempo de espera después de [300 seconds]”?

Esta es la respuesta más exacta que encomtrarás brindar, pero obsérvala detenidamente y valora si se puede adaptar a tu proyecto.

Solución:

Esto sucede porque Spark intenta hacer Broadcast Hash Join y uno de los DataFrames es muy grande, por lo que enviarlo consume mucho tiempo.

Puede:

  1. Establecer más alto spark.sql.broadcastTimeout para aumentar el tiempo de espera – spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist() ambos DataFrames, luego Spark usará Shuffle Join – referencia desde aquí

PySpark

En PySpark, puede establecer la configuración cuando crea el contexto de chispa de la siguiente manera:

spark = SparkSession
  .builder
  .appName("Your App")
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()

Solo para agregar algo de contexto de código a la respuesta muy concisa de @T. Gawęda.


En su aplicación Spark, Spark SQL eligió un unión hash de difusión para la unión porque “libriFirstTable50Plus3DF tiene 766,151 registros” que resultó ser menor que el llamado umbral de difusión (predeterminado en 10 MB).

Puede controlar el umbral de transmisión mediante la propiedad de configuración spark.sql.autoBroadcastJoinThreshold.

spark.sql.autoBroadcastJoinThreshold Configura el tamaño máximo en bytes para una tabla que se transmitirá a todos los nodos trabajadores al realizar una unión. Al establecer este valor en -1, se puede deshabilitar la transmisión. Tenga en cuenta que actualmente las estadísticas solo son compatibles con las tablas de Hive Metastore donde se ha ejecutado el comando ANALYZE TABLE COMPUTE STATISTICS noscan.

Puede encontrar ese tipo particular de combinación en el seguimiento de la pila:

org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)

BroadcastHashJoin El operador físico en Spark SQL usa una variable de transmisión para distribuir el conjunto de datos más pequeño a los ejecutores de Spark (en lugar de enviar una copia con cada tarea).

si usaste explain para revisar el plan de consulta física, notará que la consulta usa el operador físico BroadcastExchangeExec. Aquí es donde puede ver la maquinaria subyacente para transmitir la tabla más pequeña (y el tiempo de espera).

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = 
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]

doExecuteBroadcast es parte de SparkPlan contrato que sigue cada operador físico en Spark SQL que permite la transmisión si es necesario. BroadcastExchangeExec pasa a necesitarlo.

El parámetro de tiempo de espera es lo que está buscando.

private val timeout: Duration = 
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) 
    Duration.Inf
   else 
    timeoutValue.seconds
  

Como puede ver, puede deshabilitarlo por completo (usando un valor negativo), lo que implicaría esperar a que la variable de transmisión se envíe a los ejecutores indefinidamente o usar sqlContext.conf.broadcastTimeout que es exactamente la propiedad de configuración spark.sql.broadcastTimeout. El valor predeterminado es 5 * 60 segundos que puedes ver en el stacktrace:

java.util.concurrent.TimeoutException: los futuros expiraron después de [300 seconds]

Al final de todo puedes encontrar las explicaciones de otros creadores, tú aún tienes la opción de insertar el tuyo si te gusta.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *