Saltar al contenido

¿Cómo debo usar sql_last_value en logstash?

Solución:

Si tiene una columna de marca de tiempo en su tabla (p. Ej. last_updated), debería utilizarlo preferiblemente en lugar del ID. De modo que cuando se actualiza un registro, también modifica esa marca de tiempo y la jdbc el complemento de entrada recogerá el registro (es decir, la columna de ID no cambiará su valor y el registro actualizado no se recogerá)

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db" 
        jdbc_user => "root"
        jdbc_password => "root"
        jdbc_validate_connection => true
        jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        schedule => "* * * * *"
        statement => "SELECT * from mytable where last_updated > :sql_last_value"
    }
}

No obstante, si decide quedarse con la columna ID, debe eliminar la $HOME/.logstash_jdbc_last_run archivo e inténtelo de nuevo.

Hay algunas cosas de las que debe ocuparse:

  1. Si ha ejecutado Logstash anteriormente sin la programación, antes de ejecutar Logstash con programación, elimine el archivo:

    $HOME/.logstash_jdbc_last_run
    

    En Windows, este archivo se encuentra en:

    C:Users<Username>.logstash_jdbc_last_run
    
  2. La “declaración =>” en la configuración de Logstash debe tener “orden por” la columna de seguimiento.

  3. tracking_column debe proporcionarse correctamente.

A continuación, se muestra un ejemplo del archivo de configuración de Logstash:

    input {
jdbc {
    # MySQL DB jdbc connection string to our database, softwaredevelopercentral
    jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
    # The user we wish to execute our statement as
    jdbc_user => "root"
    # The user password
    jdbc_password => ""
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "D:ProgramsMySQLJavamysql-connector-java-6.0.6.jar"
    # The name of the driver class for MySQL DB
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # our query
    schedule => "* * * * *"
    statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"
    use_column_value => true
    tracking_column => "studentid"
}
}
output {
stdout { codec => json_lines }
elasticsearch { 
   hosts => ["localhost:9200"]
   index => "students"
   document_type => "student"
   document_id => "%{studentid}"
   }

}

Para ver un ejemplo funcional del mismo, puede consultar la publicación de mi blog: http://softwaredevelopercentral.blogspot.com/2017/10/elasticsearch-logstash-kibana-tutorial.html

En palabras simples, sql_last_value le permite conservar los datos de su última ejecución de sql como su nombre sugiere.

Este valor es especialmente útil cuando programar su consulta. Pero por qué … ? Porque puede crear su condición de declaración SQL basada en el valor almacenado en sql_last_value y evitar recuperar filas que ya fueron ingeridas para su entrada de logstash o actualizadas después de la última ejecución de la canalización.

Cosas a tener en cuenta al usar sql_last_value

  • De forma predeterminada, esta variable almacena una marca de tiempo de la última ejecución. Útil cuando necesita ingerir datos basados ​​en columnas como creation_date last_update etc.
  • Puede definir el valor de sql_last_value rastreándolo con el valor de columna de una tabla específica. Útil cuando necesita ingerir datos basados ​​en incrementos automáticos. Para eso, debe especificar use_column_value => true y tracking_column => "column_name_to_track".

El siguiente ejemplo almacenará la última fila de mytable identificación dentro :sql_last_value para ingerir en la siguiente ejecución las filas que no fueron ingeridas previamente, significa las filas cuyo id es mayor que el último id ingerido.

input {
    jdbc {
        # ...
        schedule => "* * * * *"
        statement => "SELECT * from mytable where id > :sql_last_value"
        use_column_value => true
        tracking_column => id
    }
}

Extremadamente importante !!!

Cuando usa múltiples entradas en su canalización, cada bloque de entrada sobrescribirá el valor de sql_last_value del último. Para evitar ese comportamiento, puede usar last_run_metadata_path => "/path/to/sql_last_value/of_your_pipeline.yml" opción, lo que significa que cada tubería almacenará su propio valor en un archivo diferente.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *