Este grupo redactor ha pasado horas buscando la solución a tu interrogante, te compartimos la soluciones y nuestro objetivo es serte de gran ayuda.
Solución:
Pensé que era una pregunta interesante y dediqué un tiempo a descubrir cómo lograrlo sin una tarea ficticia adicional. Se convirtió en una tarea un poco superflua, pero aquí está el resultado final:
Este es el DAG completo:
import airflow
from airflow import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
default_args = "owner": "airflow", "start_date": airflow.utils.dates.days_ago(3)
dag = DAG(
dag_id="finally_task_set_end_state",
default_args=default_args,
schedule_interval="0 0 * * *",
description="Answer for question https://stackoverflow.com/questions/51728441",
)
start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
@provide_session
def _finally(task, execution_date, dag, session=None, **_):
upstream_task_instances = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id.in_(task.upstream_task_ids),
)
.all()
)
upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
print("Do logic here...")
if fail_this_task:
raise AirflowException("Failing task because one or more upstream tasks failed.")
finally_ = PythonOperator(
task_id="finally",
python_callable=_finally,
trigger_rule=TriggerRule.ALL_DONE,
provide_context=True,
dag=dag,
)
succesful_task = DummyOperator(task_id="succesful_task", dag=dag)
start >> [failing_task, succesful_task] >> finally_
mira el _finally
función, que es llamada por PythonOperator. Hay algunos key puntos aquí:
- anotar con
@provide_session
y agregar argumentosession=None
para que pueda consultar Airflow DB consession
. - Consulta todas las instancias de tareas ascendentes para la tarea actual:
upstream_task_instances = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id.in_(task.upstream_task_ids),
)
.all()
)
- De las instancias de tareas devueltas, obtenga los estados y verifique si
State.FAILED
está ahí:
upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
- Realiza tu propia lógica:
print("Do logic here...")
- Y finalmente, fallar la tarea si
fail_this_task=True
:
if fail_this_task:
raise AirflowException("Failing task because one or more upstream tasks failed.")
El final resulto:
Como explicó @JustinasMarozas en un comentario, una solución es crear una tarea ficticia como:
dummy = DummyOperator(
task_id='test',
dag=dag
)
y unirlo aguas abajo a special_task
:
failing_task.set_downstream(dummy)
Por lo tanto, el DAG se marca como fallido y el dummy
la tarea se marca como upstream_failed
.
Espero que haya una solución lista para usar, pero esperando eso, esta solución hace el trabajo.
Sección de Reseñas y Valoraciones
Recuerda mostrar este escrito si te valió la pena.