Saltar al contenido

Transmita la tabla de BigQuery a Google Pub/Sub

Posteriormente a consultar con expertos en este tema, programadores de deferentes áreas y maestros hemos dado con la respuesta al problema y la dejamos plasmada en esta publicación.

Solución:

Actualización 2019:

Ahora es realmente fácil con una opción de clic para hacer una consulta grande en Pub/Sub:

ingrese la descripción de la imagen aquí

Encuéntralo en: https://console.cloud.google.com/cloudpubsub/topicList


La forma más fácil que conozco es usar Google Cloud Dataflow, que sabe de forma nativa cómo acceder a BigQuery y Pub/Sub.

En teoría, debería ser tan fácil como las siguientes líneas de Python:

p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
  beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
  beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))

Esta combinación de Python/Dataflow/BigQuery/PubSub no funciona hoy (Python Dataflow está en versión beta, pero esté atento al registro de cambios).

Podemos hacer lo mismo con Java, y funciona bien. Lo acabo de probar. Se ejecuta localmente y también en el corredor de Dataflow alojado:

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn() 
    @Override
    public void processElement(DoFn.ProcessContext c) throws Exception 
        c.output(c.element().toString());
    
)).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();

Prueba si los mensajes están ahí con:

gcloud --project myproject beta pubsub subscriptions  pull --auto-ack sub1

Captura de pantalla de flujo de datos alojado:

Flujo de datos alojado en el trabajo

Eso realmente depende del tamaño de la mesa.

Si se trata de una tabla pequeña (unos pocos miles de registros, un par de columnas), entonces podría configurar un proceso para consultar toda la tabla, convertir la respuesta en un JSON arrayy empujar a pub-sub.

Si se trata de una tabla grande (millones/billones de registros, cientos de columnas), tendría que exportar a un archivo y luego preparar/enviar a pub-sub

También depende de su política de particionamiento: si sus tablas están configuradas para particionar por fecha, es posible que pueda, nuevamente, consultar en lugar de exportar.

Por último, pero no menos importante, también depende de la frecuencia: ¿es un trato único (luego exportar) o un proceso continuo (luego usar decoradores de tablas para consultar solo los datos más recientes)?

Necesita más información si desea una respuesta realmente útil.

Editar

Según sus comentarios sobre el tamaño de la tabla, creo que la mejor manera sería tener un script que:

  1. Exporte la tabla a GCS como JSON delimitado por saltos de línea

  2. Procese el archivo (lea línea por línea) y envíelo a pub-sub

Hay bibliotecas cliente para la mayoría de los lenguajes de programación. He hecho cosas similares con Python, y es bastante sencillo.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *