Saltar al contenido

Inicie CloudSQL Proxy en Python Dataflow/Apache Beam

Rosa, parte de nuestro staff, nos hizo el favor de crear este enunciado ya que controla perfectamente el tema.

Solución:

Solución alternativa:

Finalmente encontré una solución. Tomé la idea de conectarme a través de la IP pública de la instancia de CloudSQL. Para eso necesitabas permitir conexiones a tu instancia de CloudSQL desde cada IP:

  1. Ve a la página de descripción general de tu instancia de CloudSQL en GCP
  2. Haga clic en el Authorization pestaña
  3. Haga clic en Add network y añadir 0.0.0.0/0 (!! ¡esto permitirá que cada dirección IP se conecte a su instancia!)

Para agregar seguridad al proceso, usé SSL keys y solo permitió conexiones SSL a la instancia:

  1. Haga clic en SSL pestaña
  2. Haga clic en Create a new certificate para crear un certificado SSL para su servidor
  3. Haga clic en Create a client certificate para crear un certificado SSL para su cliente
  4. Haga clic en Allow only SSL connections para rechazar todos los intentos de conexión sin SSL

Después de eso, almacené los certificados en un depósito de Google Cloud Storage y los cargué antes de conectarme dentro del trabajo de Dataflow, es decir:

import psycopg2
import psycopg2.extensions
import os
import stat
from google.cloud import storage

# Function to wait for open connection when processing parallel
def wait(conn):
    while 1:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            pass
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            pass
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)

# Function which returns a connection which can be used for queries
def connect_to_db(host, hostaddr, dbname, user, password, sslmode = 'verify-full'):

    # Get keys from GCS
    client = storage.Client()

    bucket = client.get_bucket()

    bucket.get_blob('PATH_TO/server-ca.pem').download_to_filename('server-ca.pem')
    bucket.get_blob('PATH_TO/client-key.pem').download_to_filename('client-key.pem')
    os.chmod("client-key.pem", stat.S_IRWXU)
    bucket.get_blob('PATH_TO/client-cert.pem').download_to_filename('client-cert.pem')

    sslrootcert = 'server-ca.pem'
    sslkey = 'client-key.pem'
    sslcert = 'client-cert.pem'

    con = psycopg2.connect(
        host = host,
        hostaddr = hostaddr,
        dbname = dbname,
        user = user,
        password = password,
        sslmode=sslmode,
        sslrootcert = sslrootcert,
        sslcert = sslcert,
        sslkey = sslkey)
    return con

Luego uso estas funciones en una costumbre ParDo para realizar consultas.
Ejemplo mínimo:

import apache_beam as beam

class ReadSQLTableNames(beam.DoFn):
    '''
    parDo class to get all table names of a given cloudSQL database.
    It will return each table name.
    '''
    def __init__(self, host, hostaddr, dbname, username, password):
        super(ReadSQLTableNames, self).__init__()
        self.host = host
        self.hostaddr = hostaddr
        self.dbname = dbname
        self.username = username
        self.password = password

    def process(self, element):

        # Connect do database
        con = connect_to_db(host = self.host,
            hostaddr = self.hostaddr,
            dbname = self.dbname,
            user = self.username,
            password = self.password)
        # Wait for free connection
        wait_select(con)
        # Create cursor to query data
        cur = con.cursor(cursor_factory=RealDictCursor)

        # Get all table names
        cur.execute(
        """
        SELECT
        tablename as table
        FROM pg_tables
        WHERE schemaname = 'public'
        """
        )
        table_names = cur.fetchall()

        cur.close()
        con.close()
        for table_name in table_names:
            yield table_name["table"]

Una parte de la canalización podría verse así:

# Current workaround to query all tables: 
# Create a dummy initiator PCollection with one element
init = p        |'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])

tables = init   |'Get table names' >> beam.ParDo(ReadSQLTableNames(
                                                host = known_args.host,
                                                hostaddr = known_args.hostaddr,
                                                dbname = known_args.db_name,
                                                username = known_args.user,
                                                password = known_args.password))

Espero que esta solución ayude a otros con problemas similares.

Logré encontrar una solución mejor o al menos más fácil. En la función de configuración de DoFn, use el proxy en la nube para configurar la conexión previa

class MyDoFn(beam.DoFn):
 def setup(self):
    os.system("wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy")
    os.system("chmod +x cloud_sql_proxy")
    os.system(f"./cloud_sql_proxy -instances=self.sql_args['cloud_sql_connection_name']=tcp:3306 &")

Te mostramos comentarios y puntuaciones

Si eres capaz, eres capaz de dejar una crónica acerca de qué te ha parecido este tutorial.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *