Saltar al contenido

Flujo de aire: DAG marcado como “éxito” si una tarea falla, debido a la regla de activación ALL_DONE

Este grupo redactor ha pasado horas buscando la solución a tu interrogante, te compartimos la soluciones y nuestro objetivo es serte de gran ayuda.

Solución:

Pensé que era una pregunta interesante y dediqué un tiempo a descubrir cómo lograrlo sin una tarea ficticia adicional. Se convirtió en una tarea un poco superflua, pero aquí está el resultado final:

Este es el DAG completo:

import airflow
from airflow import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

default_args = "owner": "airflow", "start_date": airflow.utils.dates.days_ago(3)

dag = DAG(
    dag_id="finally_task_set_end_state",
    default_args=default_args,
    schedule_interval="0 0 * * *",
    description="Answer for question https://stackoverflow.com/questions/51728441",
)

start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)


@provide_session
def _finally(task, execution_date, dag, session=None, **_):
    upstream_task_instances = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag.dag_id,
            TaskInstance.execution_date == execution_date,
            TaskInstance.task_id.in_(task.upstream_task_ids),
        )
        .all()
    )
    upstream_states = [ti.state for ti in upstream_task_instances]
    fail_this_task = State.FAILED in upstream_states

    print("Do logic here...")

    if fail_this_task:
        raise AirflowException("Failing task because one or more upstream tasks failed.")


finally_ = PythonOperator(
    task_id="finally",
    python_callable=_finally,
    trigger_rule=TriggerRule.ALL_DONE,
    provide_context=True,
    dag=dag,
)

succesful_task = DummyOperator(task_id="succesful_task", dag=dag)

start >> [failing_task, succesful_task] >> finally_

mira el _finally función, que es llamada por PythonOperator. Hay algunos key puntos aquí:

  1. anotar con @provide_session y agregar argumento session=Nonepara que pueda consultar Airflow DB con session.
  2. Consulta todas las instancias de tareas ascendentes para la tarea actual:
upstream_task_instances = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id.in_(task.upstream_task_ids),
    )
    .all()
)
  1. De las instancias de tareas devueltas, obtenga los estados y verifique si State.FAILED está ahí:
upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
  1. Realiza tu propia lógica:
print("Do logic here...")
  1. Y finalmente, fallar la tarea si fail_this_task=True:
if fail_this_task:
    raise AirflowException("Failing task because one or more upstream tasks failed.")

El final resulto:

ingrese la descripción de la imagen aquí

Como explicó @JustinasMarozas en un comentario, una solución es crear una tarea ficticia como:

dummy = DummyOperator(
    task_id='test',
    dag=dag
)

y unirlo aguas abajo a special_task :

failing_task.set_downstream(dummy)

Por lo tanto, el DAG se marca como fallido y el dummy la tarea se marca como upstream_failed.

Espero que haya una solución lista para usar, pero esperando eso, esta solución hace el trabajo.

Sección de Reseñas y Valoraciones

Recuerda mostrar este escrito si te valió la pena.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *