Saltar al contenido

¿Cómo usar AirFlow para ejecutar una carpeta de archivos python?

El paso a paso o código que hallarás en este post es la resolución más eficiente y efectiva que hallamos a tus dudas o dilema.

Solución:

Para ejecutar el archivo python como un todo, usando el BashOperator (Como en la respuesta de liferacer):

from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='bash_task',
    bash_command='python file1.py',
    dag=dag
)

Entonces, para hacerlo usando el PythonOperator llama a tu main función. Ya deberías tener un __main__ bloque, así que ponga lo que sucede allí en un main función, tal que su file1.py se ve así:

def main():
    """This gets executed if `python file1` gets called."""
    # my code

if __name__ == '__main__':
    main() 

Entonces tu definición dag:

from airflow.operators.python_operator import PythonOperator

import file1

python_task = PythonOperator(
    task_id='python_task',
    python_callable=file1.main,
    dag=dag
)

Puede usar BashOperator para ejecutar archivos de python como una tarea

    from airflow import DAG
    from airflow.operators import BashOperator,PythonOperator
    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
      )

    dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
    task_id='testairflow',
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
    dag=dag)

Sé que está preguntando que “le gustaría ejecutar esos archivos de Python (no la función de Python a través del Operador de Python)”. pero veo que esto probablemente usa Airflow de manera menos efectiva de lo que podría ser. También veo confusión en las respuestas escritas anteriormente, así que esta es la forma en que quería y la forma en que recomendaría realizar las tareas:

Asumiendo:

dags/
    my_dag_for_task_1_and_2.py
    tasks/
         file1.py
         file2.py

Su solicitud para evitar la PythonOperator:

#  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator

with DAG(
    'my_dag_for_task_1_and_2',
    default_args=
        'owner': 'me',
        'start_date': datetime(…),
        …,
    , 
    schedule_interval='8 * * * *',
) as dag:
    task_1 = BashOperator(
        task_id='task_1', 
        bash_command='/path/to/python /path/to/dags/tasks/file1.py',
    )
    task_2 = BashOperator(
        task_id='task_2', 
        bash_command='/path/to/python /path/to/dags/tasks/file2.py',
    )
    task_1 >> task_2

No escribiste Python desde cero para Airflow, pero con PythonOperator:

#  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2

with DAG(
    'my_dag_for_task_1_and_2',
    default_args=
        'owner': 'me',
        'start_date': datetime(…),
        …,
    , 
    schedule_interval='8 * * * *',
) as dag:
    task_1 = PythonOperator(
        task_id='task_1', 
        python_callable=file1.function_in_file1,
    )
    task_2 = PythonOperator(
        task_id='task_2', 
        python_callable=file2.function_in_file2,  # maybe main?
    )
    task_1 >> task_2

Si para ti ha sido de utilidad este post, sería de mucha ayuda si lo compartes con otros entusiastas de la programación y nos ayudes a extender esta información.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *