Te doy la bienvenida a nuestro sitio web, ahora encontrarás la respuesta de lo que buscabas.
Solución:
Deberías poder usar BashOperator
. Manteniendo el resto de su código como está, importe los paquetes de clase y sistema requeridos:
from airflow.operators.bash_operator import BashOperator
import os
import sys
establecer las rutas requeridas:
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
y agregar operador:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class params.class params.jar ',
params='class': 'MainClassName', 'jar': '/path/to/your.jar',
dag=dag
)
Puede extender esto fácilmente para proporcionar argumentos adicionales usando plantillas Jinja.
Por supuesto, puede ajustar esto para el escenario que no sea Spark reemplazando bash_command
con una plantilla adecuada en tu caso, por ejemplo:
bash_command = 'java -jar params.jar '
y ajustando params
.
Airflow a partir de la versión 1.8 (lanzada hoy), tiene
- SparkSqlOperator – https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py;
Código SparkSQLHook: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
- SparkSubmitOperator: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
Código SparkSubmitHook: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
Tenga en cuenta que estos dos nuevos operadores/ganchos de Spark están en la rama “contrib” a partir de la versión 1.8, por lo que no están (bien) documentados.
Entonces puede usar SparkSubmitOperator para enviar su código Java para la ejecución de Spark.
Hay un ejemplo de SparkSubmitOperator
uso de Spark 2.3.1 en kubernetes (instancia de minikube):
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args =
'owner': '[email protected]',
'depends_on_past': False,
'start_date': datetime(2018, 7, 27),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2018, 7, 29),
dag = DAG(
'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job',
conn_id='spark_default',
java_class='com.ibm.cdopoc.DataLoaderDB2COS',
application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
total_executor_cores='1',
executor_cores='1',
executor_memory='2g',
num_executors='2',
name='airflowspark-DataLoaderDB2COS',
verbose=True,
driver_memory='1g',
conf=
'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
'spark.COS_BUCKET': 'data-ingestion-poc',
'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
,
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
El código que usa variables almacenadas en Airflow variables:
Además, debe crear una nueva conexión de chispa o editar ‘spark_default’ existente con un diccionario adicional "queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"
:
valoraciones y reseñas
Eres capaz de reafirmar nuestro quehacer escribiendo un comentario y valorándolo te estamos agradecidos.