Saltar al contenido

Conexión de Airflow s3 mediante la interfaz de usuario

Estate atento porque en esta reseña vas a hallar el arreglo que buscas.

Solución:

EDITAR: Esta respuesta almacena tu secreto key en Texto sin formato que puede ser un riesgo de seguridad y no es recomendable. La mejor manera es poner acceso key y secreto key en los campos de inicio de sesión/contraseña, como se menciona en otras respuestas a continuación. FIN DE EDITAR

Es difícil encontrar referencias, pero después de investigar un poco pude hacerlo funcionar.

TLDR

Cree una nueva conexión con lo siguiente attributes:

ID de conexión: my_conn_S3

Tipo de conexión: S3

Extra:

"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"

Versión larga, configurando la conexión de la interfaz de usuario:

  • En Airflow UI, vaya a Admin > Conexiones
  • Cree una nueva conexión con lo siguiente attributes:
  • ID de conexión: my_conn_S3
  • Tipo de conexión: S3
  • Extra: "aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"
  • Deje todos los demás campos (Host, Esquema, Inicio de sesión) en blanco.

Para usar esta conexión, a continuación puede encontrar una prueba de sensor S3 simple. La idea de esta prueba es configurar un sensor que observe los archivos en S3 (tarea T1) y una vez que se cumpla la siguiente condición, active un comando bash (tarea T2).

Pruebas

  • Antes de ejecutar el DAG, asegúrese de tener un depósito S3 llamado “S3-Bucket-To-Watch”.
  • Agregue a continuación s3_dag_test.py a la carpeta airflow dags (~/airflow/dags)
  • Comienzo airflow webserver.
  • Vaya a la interfaz de usuario de Airflow (http://localhost:8383/)
  • Comienzo airflow scheduler.
  • Active el DAG ‘s3_dag_test’ en la vista principal de DAG.
  • Seleccione ‘s3_dag_test’ para mostrar los detalles de dag.
  • En la vista de gráfico, debería poder ver su estado actual.
  • La tarea ‘check_s3_for_file_in_s3’ debe estar activa y ejecutándose.
  • Ahora, agregue un archivo llamado ‘file-to-watch-1’ a su ‘S3-Bucket-To-Watch’.
  • Las primeras tareas deberían haberse completado, las segundas deberían comenzar y terminar.

El intervalo_programación en la definición de dag se establece en ‘@once’, para facilitar la depuración.

Para volver a ejecutarlo, déjelo todo como está, elimine los archivos del depósito y vuelva a intentarlo seleccionando la primera tarea (en la vista de gráfico) y seleccionando ‘Borrar’ todo ‘Pasado’, ‘Futuro’, ‘Upstream’, ‘Downstream’ …. actividad. Esto debería iniciar el DAG nuevamente.

Déjame saber cómo te fue.

s3_dag_test.py;

"""
S3 Sensor Connection Test
"""

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta

default_args = 
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)


dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')

t1 = BashOperator(
    task_id='bash_test',
    bash_command='echo "hello, it should work" > s3_conn_test.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    s3_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)

Principales Referencias:

  • https://gitter.im/apache/incubator-airflow
  • https://groups.google.com/forum/#!topic/airbnb_airflow/TXsJNOBBfig
  • https://github.com/apache/incubator-airflow

Suponiendo que Airflow está alojado en un servidor EC2.

simplemente cree la conexión según otras respuestas, pero deje todo en blanco en la configuración, aparte del tipo de conexión, que debe permanecer como S3

El S3hook se establecerá de manera predeterminada en boto y esto se establecerá de manera predeterminada en la función del servidor EC2 en el que está ejecutando airflow. suponiendo que este rol tenga derechos para S3, su tarea podrá acceder al depósito.

esta es una forma mucho más segura que usar y almacenar credenciales.

Si le preocupa exponer las credenciales en la interfaz de usuario, otra forma es pasar la ubicación del archivo de credenciales en el parámetro Extra en la interfaz de usuario. Solo el usuario funcional tiene privilegios de lectura para el archivo. Se ve algo como a continuación

Extra:  
    "profile": "", 
    "s3_config_file": "/home//creds/s3_credentials", 
    "s3_config_format": "aws" 

expediente “/home//creds/s3_credentials” tiene debajo de las entradas

[]
aws_access_key_id = 
aws_secret_access_key = 

Valoraciones y reseñas

Tienes la opción de asentar nuestra función escribiendo un comentario o dejando una valoración te damos las gracias.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *