Solución:
Si tiene una columna de marca de tiempo en su tabla (p. Ej. last_updated
), debería utilizarlo preferiblemente en lugar del ID. De modo que cuando se actualiza un registro, también modifica esa marca de tiempo y la jdbc
el complemento de entrada recogerá el registro (es decir, la columna de ID no cambiará su valor y el registro actualizado no se recogerá)
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from mytable where last_updated > :sql_last_value"
}
}
No obstante, si decide quedarse con la columna ID, debe eliminar la $HOME/.logstash_jdbc_last_run
archivo e inténtelo de nuevo.
Hay algunas cosas de las que debe ocuparse:
-
Si ha ejecutado Logstash anteriormente sin la programación, antes de ejecutar Logstash con programación, elimine el archivo:
$HOME/.logstash_jdbc_last_run
En Windows, este archivo se encuentra en:
C:Users<Username>.logstash_jdbc_last_run
-
La “declaración =>” en la configuración de Logstash debe tener “orden por” la columna de seguimiento.
-
tracking_column debe proporcionarse correctamente.
A continuación, se muestra un ejemplo del archivo de configuración de Logstash:
input {
jdbc {
# MySQL DB jdbc connection string to our database, softwaredevelopercentral
jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
# The user we wish to execute our statement as
jdbc_user => "root"
# The user password
jdbc_password => ""
# The path to our downloaded jdbc driver
jdbc_driver_library => "D:ProgramsMySQLJavamysql-connector-java-6.0.6.jar"
# The name of the driver class for MySQL DB
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# our query
schedule => "* * * * *"
statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"
use_column_value => true
tracking_column => "studentid"
}
}
output {
stdout { codec => json_lines }
elasticsearch {
hosts => ["localhost:9200"]
index => "students"
document_type => "student"
document_id => "%{studentid}"
}
}
Para ver un ejemplo funcional del mismo, puede consultar la publicación de mi blog: http://softwaredevelopercentral.blogspot.com/2017/10/elasticsearch-logstash-kibana-tutorial.html
En palabras simples, sql_last_value
le permite conservar los datos de su última ejecución de sql como su nombre sugiere.
Este valor es especialmente útil cuando programar su consulta. Pero por qué … ? Porque puede crear su condición de declaración SQL basada en el valor almacenado en sql_last_value
y evitar recuperar filas que ya fueron ingeridas para su entrada de logstash o actualizadas después de la última ejecución de la canalización.
Cosas a tener en cuenta al usar sql_last_value
- De forma predeterminada, esta variable almacena una marca de tiempo de la última ejecución. Útil cuando necesita ingerir datos basados en columnas como
creation_date
last_update
etc. - Puede definir el valor de
sql_last_value
rastreándolo con el valor de columna de una tabla específica. Útil cuando necesita ingerir datos basados en incrementos automáticos. Para eso, debe especificaruse_column_value => true
ytracking_column => "column_name_to_track"
.
El siguiente ejemplo almacenará la última fila de mytable identificación dentro :sql_last_value
para ingerir en la siguiente ejecución las filas que no fueron ingeridas previamente, significa las filas cuyo id es mayor que el último id ingerido.
input {
jdbc {
# ...
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
}
}
Extremadamente importante !!!
Cuando usa múltiples entradas en su canalización, cada bloque de entrada sobrescribirá el valor de sql_last_value
del último. Para evitar ese comportamiento, puede usar last_run_metadata_path => "/path/to/sql_last_value/of_your_pipeline.yml"
opción, lo que significa que cada tubería almacenará su propio valor en un archivo diferente.