Queremos brindarte la mejor solución que encontramos por todo internet. Deseamos que te sea de utilidad y si puedes aportar algo que nos pueda ayudar a crecer hazlo libremente.
Solución:
En 0.11 o superior puede ejecutar el bin/kafka-delete-records.sh
Comando para marcar mensajes para su eliminación.
https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
Por ejemplo, publicar 100 mensajes
seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
luego borre 90 de esos 100 mensajes con el nuevo kafka-delete-records.sh
herramienta de línea de comandos
./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
donde offsetfile.json
contiene
"partitions": ["topic": "mytest", "partition": 0, "offset": 90], "version":1
y luego consuma los mensajes desde el principio para verificar que 90 de los 100 mensajes estén marcados como eliminados.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100
Para eliminar todos los mensajes en un tema específico, puede ejecutar kafka-delete-records.sh
Por ejemplo, tengo un tema llamado test
que tiene 4partitions
.
Crear un Json
archivo, por ejemplo j.json
:
"partitions": [
"topic": "test",
"partition": 0,
"offset": -1
,
"topic": "test",
"partition": 1,
"offset": -1
,
"topic": "test",
"partition": 2,
"offset": -1
,
"topic": "test",
"partition": 3,
"offset": -1
],
"version": 1
ahora elimine todos los mensajes con este comando:
/opt/kafka/confluent-4.1.1/bin/kafdelete-records --bootstrap-server 192.168.XX.XX:9092 --offset-json-file j.json
Después de ejecutar el comando, se mostrará este mensaje
Records delete operation completed:
partition: test-0 low_watermark: 7
partition: test-1 low_watermark: 7
partition: test-2 low_watermark: 7
partition: test-3 low_watermark: 7
Te mostramos las reseñas y valoraciones de los usuarios
Recuerda comunicar esta reseña si te valió la pena.