Al fin después de tanto trabajar pudimos dar con la contestación de este atasco que muchos lectores de este sitio tienen. Si tienes algún dato que aportar puedes aportar tu comentario.
Solución:
ExternalTaskSensor
asume que usted depende de una tarea en una ejecución dag con la misma fecha de ejecución.
Esto significa que en tu caso dags a
y b
necesita ejecutarse en el mismo horario (por ejemplo, todos los días a las 9:00 am o w/e).
De lo contrario, debe usar el execution_delta
o execution_date_fn
cuando instancias un ExternalTaskSensor
.
Aquí está la documentación dentro del propio operador para ayudar a aclarar más:
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
execution_delta or execution_date_fn can be passed to
ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
and returns the desired execution date to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
Para aclarar algo que he visto aquí y en otras preguntas relacionadas, los dags no necesariamente tienen que ejecutarse en el mismo horario, como se indica en la respuesta aceptada. Los dags tampoco necesitan tener el mismo start_date
. Si creas tu ExternalTaskSensor
tarea sin la execution_delta
o execution_date_fn
entonces los dos dags deben tener el mismo fecha de ejecución. Sucede que si dos dags tienen la misma programación, las ejecuciones programadas en cada intervalo tendrán la misma fecha de ejecución. No estoy seguro de cuál sería la fecha de ejecución para las ejecuciones activadas manualmente de dags programados.
Para que este ejemplo funcione, dag b
‘s ExternalTaskSensor
la tarea necesita un execution_delta
o execution_date_fn
parámetro. Si usa un execution_delta
parámetro, debe ser tal que b
fecha de ejecución – execution_delta
= a
fecha de ejecución. si usa execution_date_fn
entonces esa función debería devolver a
fecha de ejecución.
Si estuviera usando el TriggerDagRunOperator
luego usando un ExternalTaskSensor
para detectar cuándo se completó ese dag, puede hacer algo como pasar la fecha de ejecución del dag principal al desencadenado con el TriggerDagRunOperator
‘s execution_date
parámetro, como execution_date=' execution_date '
. Entonces, la fecha de ejecución de ambos dags sería la misma, y no necesitarías que los horarios fueran los mismos para cada dag, o usar el execution_delta
o execution_date_fn
parámetros de los sensores.
Lo anterior fue escrito y probado en Airflow 1.10.9
Si te ha sido de provecho este post, nos gustaría que lo compartas con otros juniors y nos ayudes a difundir nuestro contenido.