Saltar al contenido

¿Cómo ejecutar código Spark en Airflow?

Te doy la bienvenida a nuestro sitio web, ahora encontrarás la respuesta de lo que buscabas.

Solución:

Deberías poder usar BashOperator. Manteniendo el resto de su código como está, importe los paquetes de clase y sistema requeridos:

from airflow.operators.bash_operator import BashOperator

import os
import sys

establecer las rutas requeridas:

os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

y agregar operador:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class  params.class   params.jar ',
    params='class': 'MainClassName', 'jar': '/path/to/your.jar',
    dag=dag
)

Puede extender esto fácilmente para proporcionar argumentos adicionales usando plantillas Jinja.

Por supuesto, puede ajustar esto para el escenario que no sea Spark reemplazando bash_command con una plantilla adecuada en tu caso, por ejemplo:

bash_command = 'java -jar  params.jar '

y ajustando params.

Airflow a partir de la versión 1.8 (lanzada hoy), tiene

  • SparkSqlOperator – https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py;

Código SparkSQLHook: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

  • SparkSubmitOperator: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py

Código SparkSubmitHook: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

Tenga en cuenta que estos dos nuevos operadores/ganchos de Spark están en la rama “contrib” a partir de la versión 1.8, por lo que no están (bien) documentados.

Entonces puede usar SparkSubmitOperator para enviar su código Java para la ejecución de Spark.

Hay un ejemplo de SparkSubmitOperator uso de Spark 2.3.1 en kubernetes (instancia de minikube):

"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta

default_args = 
    'owner': '[email protected]',
    'depends_on_past': False,
    'start_date': datetime(2018, 7, 27),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    'end_date': datetime(2018, 7, 29),


dag = DAG(
    'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_default',
    java_class='com.ibm.cdopoc.DataLoaderDB2COS',
    application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='2g',
    num_executors='2',
    name='airflowspark-DataLoaderDB2COS',
    verbose=True,
    driver_memory='1g',
    conf=
        'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
        'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
        'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
        'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
        'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
        'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
        'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
        'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
        'spark.COS_BUCKET': 'data-ingestion-poc',
        'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
        'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
        'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
        ,
    dag=dag,
)

t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)

El código que usa variables almacenadas en Airflow variables:
ingrese la descripción de la imagen aquí

Además, debe crear una nueva conexión de chispa o editar ‘spark_default’ existente con un diccionario adicional "queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default":
ingrese la descripción de la imagen aquí

valoraciones y reseñas

Eres capaz de reafirmar nuestro quehacer escribiendo un comentario y valorándolo te estamos agradecidos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *