Saltar al contenido

Cómo eliminar objetos XCOM una vez que el DAG finaliza su ejecución en Airflow

Solución:

Puede realizar la limpieza mediante programación a través de sqlalchemy para que su solución no se rompa si cambia la estructura de la base de datos:

from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()

También puede purgar datos antiguos de XCom:

from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

Si desea purgar el XCom una vez que finalice el dag, creo que la solución más limpia es usar la propiedad “on_success_callback” de la clase de modelo DAG:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

Debe agregar una tarea que depende de su metadatadb (sqllite, PostgreSql, MySql ..) que elimine XCOM una vez que finalice la ejecución del DAG.

delete_xcom_task = PostgresOperator(
      task_id='delete-xcom-task',
      postgres_conn_id='airflow_db',
      sql="delete from xcom where dag_id=dag.dag_id and 
           task_id='your_task_id' and execution_date={{ ds }}",
      dag=dag)

Puede verificar su consulta antes de ejecutar el dag.

Creación de perfiles de datos -> Consulta ad hoc -> airflow_db -> consulta -> Ejecutar.

metadatos de xcom

A continuación se muestra el código que funcionó para mí, esto eliminará xcom de todas las tareas en DAG (agregue task_id a SQL si es necesario eliminar xcom de solo una tarea específica):

Como dag_id es dinámico y fechas debe seguir la sintaxis respectiva de SQL.

from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
                                            postgres_conn_id='your_conn_id',
                                            sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')"
                                            )
¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *