Saltar al contenido

Estrategia eficaz para evitar mensajes duplicados en el consumidor de apache kafka

Solución:

La respuesta corta es no.

Lo que está buscando es un procesamiento exactamente una vez. Si bien a menudo puede parecer factible, nunca se debe confiar en él porque siempre hay advertencias.

Incluso para intentar evitar duplicados, necesitaría utilizar el consumidor simple. El funcionamiento de este enfoque es para cada consumidor, cuando se consume un mensaje desde alguna partición, se escribe la partición y el desplazamiento del mensaje consumido en el disco. Cuando el consumidor se reinicia después de una falla, lea el último desplazamiento consumido para cada partición del disco.

Pero incluso con este patrón, el consumidor no puede garantizar que no volverá a procesar un mensaje después de una falla. ¿Qué pasa si el consumidor consume un mensaje y luego falla antes de que el desplazamiento se vacíe en el disco? Si escribe en el disco antes de procesar el mensaje, ¿qué pasa si escribe el desplazamiento y luego falla antes de procesar el mensaje? Este mismo problema existiría incluso si tuviera que enviar compensaciones a ZooKeeper después de cada mensaje.

Sin embargo, hay algunos casos en los que el procesamiento exactamente una vez es más alcanzable, pero solo para ciertos casos de uso. Esto simplemente requiere que su compensación se almacene en la misma ubicación que la salida de la aplicación de la unidad. Por ejemplo, si escribe un consumidor que cuenta mensajes, al almacenar el último desplazamiento contado con cada recuento, puede garantizar que el desplazamiento se almacena al mismo tiempo que el estado del consumidor. Por supuesto, para garantizar el procesamiento de una sola vez, esto requeriría consumir exactamente un mensaje y actualizar el estado exactamente una vez para cada mensaje, y eso es completamente impráctico para la mayoría de las aplicaciones de consumo de Kafka. Por su naturaleza, Kafka consume mensajes en lotes por motivos de rendimiento.

Por lo general, su tiempo estará mejor invertido y su aplicación será mucho más confiable si simplemente la diseña para que sea idempotente.

Esto es lo que Kafka FAQ tiene que decir sobre el tema de exactamente una vez:

¿Cómo obtengo mensajes exactamente una vez de Kafka?

Exactamente una vez, la semántica tiene dos partes: evitar la duplicación durante la producción de datos y evitar duplicados durante el consumo de datos.

Hay dos enfoques para obtener semántica exactamente una vez durante la producción de datos:

  • Use un solo escritor por partición y cada vez que obtenga un error de red, verifique el último mensaje en esa partición para ver si su última escritura fue exitosa
  • Incluya una clave principal (UUID o algo así) en el mensaje y deduplica en el consumidor.

Si realiza una de estas cosas, el registro que aloja Kafka no tendrá duplicados. Sin embargo, la lectura sin duplicados también depende de la cooperación del consumidor. Si el consumidor está marcando periódicamente su posición, si falla y se reinicia, se reiniciará desde la posición marcada. Por lo tanto, si la salida de datos y el punto de control no se escriben de forma atómica, también será posible obtener duplicados aquí. Este problema es particular de su sistema de almacenamiento. Por ejemplo, si está utilizando una base de datos, puede comprometerlos juntos en una transacción. El cargador de HDFS Camus que escribió LinkedIn hace algo como esto para las cargas de Hadoop. La otra alternativa que no requiere una transacción es almacenar el desplazamiento con los datos cargados y deduplicar usando la combinación de tema / partición / desplazamiento.

Creo que hay dos mejoras que harían esto mucho más fácil:

  • La idempotencia del productor se podría hacer de forma automática y mucho más económica integrando opcionalmente el soporte para esto en el servidor.
  • El consumidor de alto nivel existente no expone mucho del control más fino de las compensaciones (por ejemplo, para restablecer su posición). Estaremos trabajando en eso pronto

Estoy de acuerdo con la deduplicación de RaGe en el lado del consumidor. Y usamos Redis para deduplicar el mensaje de Kafka.

Suponga que la clase Message tiene un miembro llamado ‘uniqId’, que es llenado por el lado del productor y se garantiza que es único. Usamos una cadena aleatoria de 12 longitudes. (regexp es '^[A-Za-z0-9]{12}$')

El lado del consumidor usa SETNX de Redis para deduplicar y EXPIRE para purgar las claves caducadas automáticamente. Código de muestra:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
    jedis.expire(key, 7200); // 2 hours is ok for production environment;
}

El código anterior detectó mensajes duplicados varias veces cuando Kafka (versión 0.8.x) tuvo situaciones. Con nuestro registro de auditoría de balance de entrada / salida, no se perdió ningún mensaje o se duplicó.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *