El paso a paso o código que hallarás en este post es la resolución más eficiente y efectiva que hallamos a tus dudas o dilema.
Solución:
Para ejecutar el archivo python como un todo, usando el BashOperator
(Como en la respuesta de liferacer):
from airflow.operators.bash_operator import BashOperator
bash_task = BashOperator(
task_id='bash_task',
bash_command='python file1.py',
dag=dag
)
Entonces, para hacerlo usando el PythonOperator
llama a tu main
función. Ya deberías tener un __main__
bloque, así que ponga lo que sucede allí en un main
función, tal que su file1.py
se ve así:
def main():
"""This gets executed if `python file1` gets called."""
# my code
if __name__ == '__main__':
main()
Entonces tu definición dag:
from airflow.operators.python_operator import PythonOperator
import file1
python_task = PythonOperator(
task_id='python_task',
python_callable=file1.main,
dag=dag
)
Puede usar BashOperator para ejecutar archivos de python como una tarea
from airflow import DAG
from airflow.operators import BashOperator,PythonOperator
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': seven_days_ago,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
)
dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
task_id='testairflow',
bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
dag=dag)
Sé que está preguntando que “le gustaría ejecutar esos archivos de Python (no la función de Python a través del Operador de Python)”. pero veo que esto probablemente usa Airflow de manera menos efectiva de lo que podría ser. También veo confusión en las respuestas escritas anteriormente, así que esta es la forma en que quería y la forma en que recomendaría realizar las tareas:
Asumiendo:
dags/
my_dag_for_task_1_and_2.py
tasks/
file1.py
file2.py
Su solicitud para evitar la PythonOperator
:
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator
with DAG(
'my_dag_for_task_1_and_2',
default_args=
'owner': 'me',
'start_date': datetime(…),
…,
,
schedule_interval='8 * * * *',
) as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='/path/to/python /path/to/dags/tasks/file1.py',
)
task_2 = BashOperator(
task_id='task_2',
bash_command='/path/to/python /path/to/dags/tasks/file2.py',
)
task_1 >> task_2
No escribiste Python desde cero para Airflow, pero con PythonOperator
:
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2
with DAG(
'my_dag_for_task_1_and_2',
default_args=
'owner': 'me',
'start_date': datetime(…),
…,
,
schedule_interval='8 * * * *',
) as dag:
task_1 = PythonOperator(
task_id='task_1',
python_callable=file1.function_in_file1,
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=file2.function_in_file2, # maybe main?
)
task_1 >> task_2
Si para ti ha sido de utilidad este post, sería de mucha ayuda si lo compartes con otros entusiastas de la programación y nos ayudes a extender esta información.