Saltar al contenido

¿Hay alguna manera de eliminar todos los datos de un tema o eliminar el tema antes de cada ejecución?

Te recomendamos que revises esta respuesta en un ambiente controlado antes de pasarlo a producción, saludos.

Solución:

Como mencioné aquí Purge Kafka Queue:

Probado en Kafka 0.8.2, para el ejemplo de inicio rápido: Primero, agregue una línea al archivo server.properties en la carpeta de configuración:

delete.topic.enable=true

entonces, puedes ejecutar este comando:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

No creo que sea compatible todavía. Eche un vistazo a este problema de JIRA “Agregar eliminar soporte de tema”.

Para eliminar manualmente:

  1. Cerrar el clúster
  2. Limpiar el directorio de registro de kafka (especificado por el log.dir attribute en el archivo de configuración de kafka) así como los datos del cuidador del zoológico
  3. Reiniciar el clúster

Para cualquier tema dado, lo que puede hacer es

  1. Detener Kafka
  2. Limpie el registro de kafka específico de la partición, kafka almacena su archivo de registro en un formato de “logDir/topic-partition”, por lo que para un tema llamado “MyTopic”, el registro de la partición id 0 se almacenará en /tmp/kafka-logs/MyTopic-0 donde /tmp/kafka-logs es especificado por el log.dir attribute
  3. reiniciar kafka

Este es NOT un enfoque bueno y recomendado, pero debería funcionar. En el archivo de configuración del agente de Kafka, el log.retention.hours.per.topic attribute se utiliza para definir The number of hours to keep a log file before deleting it for some specific topic

Además, ¿hay alguna forma de que los mensajes se eliminen tan pronto como el consumidor los lea?

De la documentación de Kafka:

El clúster de Kafka conserva todos los mensajes publicados, ya sea que se hayan consumido o no, durante un período de tiempo configurable. Por ejemplo, si la retención de registros se establece en dos días, durante los dos días posteriores a la publicación de un mensaje, estará disponible para su consumo, después de lo cual se descartará para liberar espacio. El rendimiento de Kafka es efectivamente constante con respecto al tamaño de los datos, por lo que retener muchos datos no es un problema.

De hecho, los únicos metadatos retenidos por consumidor son la posición del consumidor en el registro, lo que se denomina “compensación”. Este desplazamiento lo controla el consumidor: normalmente, un consumidor avanzará su desplazamiento linealmente a medida que lee los mensajes, pero de hecho, la posición está controlada por el consumidor y puede consumir mensajes en el orden que desee. Por ejemplo, un consumidor puede restablecer una compensación anterior para reprocesar.

Para encontrar el desplazamiento de inicio para leer en el ejemplo de Kafka 0.8 Simple Consumer, dicen

Kafka incluye dos constantes para ayudar, kafka.api.OffsetRequest.EarliestTime() encuentra el comienzo de los datos en los registros y comienza a transmitir desde allí, kafka.api.OffsetRequest.LatestTime() solo transmitirá nuevos mensajes.

También puede encontrar el código de ejemplo allí para administrar la compensación en el lado del consumidor.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) 
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map requestInfo = new HashMap();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) 
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];

Probado con kafka 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Nota: si está eliminando carpetas de temas dentro de los registros de kafka pero no de la carpeta de datos de zookeeper, verá que los temas todavía están allí.

Tienes la opción de corroborar nuestro quehacer añadiendo un comentario y valorándolo te damos las gracias.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *