Indagamos en diferentes espacios para mostrarte la respuesta para tu dilema, si continúas con inquietudes puedes dejar tu inquietud y contestaremos con mucho gusto.
Solución:
En este momento, Kafka Streams ofrece solo capacidades limitadas de manejo de errores. Se está trabajando para simplificar esto. Por ahora, su enfoque general parece ser un buen camino a seguir.
Un comentario sobre el manejo de errores de deserialización: manejar esos errores manualmente, requiere que realice la deserialización “manualmente”. Esto significa que debe configurar ByteArraySerde
s para key y valor para su tema de entrada / salida de su aplicación Streams y agregue un map()
que hace la deserialización (es decir, KStream
– o al revés si también desea detectar excepciones de serialización). De lo contrario, no puede try-catch
excepciones de deserialización.
Con su enfoque actual, “solo” valida que el string representa un documento válido, pero podría darse el caso de que el mensaje en sí esté dañado y no se pueda convertir en un String
en el operador de origen en primer lugar. Por lo tanto, en realidad no cubre la excepción de deserialización con su código. Sin embargo, si está seguro de que nunca se producirá una excepción de deserialización, su enfoque también sería suficiente.
Actualizar
Estos problemas se abordan a través de KIP-161 y se incluirán en la próxima versión 1.0.0. Le permite registrar una devolución de llamada a través de un parámetro default.deserialization.exception.handler
. El controlador se invocará cada vez que se produzca una excepción durante la deserialización y le permitirá devolver un DeserializationResponse
(CONTINUE
-> suelta el registro y sigue adelante, o FAIL
que es el predeterminado).
Actualización 2
Con KIP-210 (será parte de Kafka 1.1) también es posible manejar errores en el lado del productor, similar a la parte del consumidor, registrando un ProductionExceptionHandler
a través de la configuración default.production.exception.handler
que puede volver CONTINUE
.
Actualización 23 de marzo de 2018: Kafka 1.0 proporciona un manejo mucho mejor y más fácil para los mensajes de error incorrectos (“píldoras venenosas”) a través de KIP-161 que lo que describí a continuación. Consulte default.deserialization.exception.handler en los documentos de Kafka 1.0.
Potencialmente, esto podría ser cosas como mensajes que no puedo deserializar correctamente […]
Ok, mi respuesta aquí se centra en los problemas de (des) serialización, ya que este podría ser el escenario más complicado de manejar para la mayoría de los usuarios.
[…] o tal vez la lógica de procesamiento / filtrado falla de alguna manera inesperada (no tengo dependencias externas, por lo que no debería haber errores transitorios de ese tipo).
El mismo pensamiento (para la deserialización) también se puede aplicar a fallas en la lógica de procesamiento. Aquí, la mayoría de la gente tiende a gravitar hacia la opción 2 a continuación (menos la parte de deserialización), pero YMMV.
Estaba considerando envolver todo mi código de procesamiento / filtrado en una captura de prueba y, si se generaba una excepción, enrutarlo a un “tema de error”. Luego puedo estudiar el mensaje y modificarlo o corregir mi código según corresponda y luego reproducirlo en master. Si dejo que se propaguen las excepciones, la transmisión parece atascarse y no se recogen más mensajes.
- ¿Este enfoque se considera la mejor práctica?
Sí, por el momento este es el camino a seguir. Esencialmente, los dos patrones más comunes son (1) omitir mensajes corruptos o (2) enviar registros corruptos a un tema en cuarentena, también conocido como cola de mensajes no entregados.
- ¿Existe una forma conveniente de Kafka Streams para manejar esto? No creo que haya un concepto de DLQ …
Sí, hay una forma de manejar esto, incluido el uso de una cola de mensajes no entregados. Sin embargo, (al menos en mi humilde opinión) no es tan conveniente todavía. Si tiene algún comentario sobre cómo la API debería permitirle manejar esto, por ejemplo, a través de un método nuevo o actualizado, una configuración (“si la serialización / deserialización falla, envíe el registro problemático a ESTE tema de cuarentena”), por favor déjenos saber. 🙂
- ¿Cuáles son las formas alternativas de evitar que Kafka interfiera en un “mensaje negativo”?
- ¿Qué enfoques alternativos para el manejo de errores existen?
Vea mis ejemplos a continuación.
FWIW, la comunidad de Kafka también está discutiendo la adición de una nueva herramienta CLI que le permite omitir mensajes corruptos. Sin embargo, como usuario de la API de Kafka Streams, creo que idealmente desea manejar estos escenarios directamente en su código y recurrir a las utilidades CLI solo como último recurso.
Aquí hay algunos patrones para que Kafka Streams DSL maneje registros / mensajes corruptos, también conocidos como “píldoras venenosas”. Esto está tomado de http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
Opción 1: omita los registros dañados con flatMap
Podría decirse que esto es lo que a la mayoría de los usuarios les gustaría hacer.
- Usamos
flatMap
porque le permite generar cero, uno o más registros de salida por registro de entrada. En el caso de un registro dañado, no mostramos nada (cero registros), ignorando / omitiendo el registro dañado. - Benefíciese de este enfoque en comparación con los otros que se enumeran aquí: ¡Necesitamos deserializar manualmente un registro solo una vez!
- Inconveniente de este enfoque:
flatMap
“marca” el flujo de entrada para una posible nueva partición de datos, es decir, si realiza una key-operación basada en agrupaciones (groupBy
/groupByKey
) o se une posteriormente, sus datos se volverán a particionar detrás de escena. Dado que este puede ser un paso costoso, no queremos que suceda innecesariamente. Si SABE que el registro keys son siempre válidos O que no es necesario operar en el keys (manteniéndolos como “crudos” keys enbyte[]
formato), puede cambiar deflatMap
paraflatMapValues
, lo que no resultará en una nueva partición de los datos incluso si se une / agrupa / agrega la transmisión más tarde.
Ejemplo de código:
Serde bytesSerde = Serdes.ByteArray();
Serde stringSerde = Serdes.String();
Serde longSerde = Serdes.Long();
// Input topic, which might contain corrupted messages
KStream input = builder.stream(bytesSerde, bytesSerde, inputTopic);
// Note how the returned stream is of type KStream,
// rather than KStream.
KStream doubled = input.flatMap(
(k, v) ->
try
// Attempt deserialization
String key = stringSerde.deserializer().deserialize(inputTopic, k);
long value = longSerde.deserializer().deserialize(inputTopic, v);
// Ok, the record is valid (not corrupted). Let's take the
// opportunity to also process the record in some way so that
// we haven't paid the deserialization cost just for "poison pill"
// checking.
return Collections.singletonList(KeyValue.pair(key, 2 * value));
catch (SerializationException e)
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
return Collections.emptyList();
);
Opción 2: cola de mensajes no entregados con branch
En comparación con la opción 1 (que ignora los registros corruptos), la opción 2 retiene los mensajes corruptos filtrándolos fuera del flujo de entrada “principal” y escribiéndolos en un tema de cuarentena (piense: cola de mensajes no entregados). El inconveniente es que, para registros válidos, debemos pagar dos veces el costo de deserialización manual.
KStream input = ...;
KStream[] partitioned = input.branch(
(k, v) ->
boolean isValidRecord = false;
try
stringSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
catch (SerializationException ignored)
return isValidRecord;
,
(k, v) -> true
);
// partitioned[0] is the KStream that contains
// only valid records. partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream doubled = partitioned[0].map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");
Opción 3: omita los registros dañados con filter
Solo menciono esto para completar. Esta opción parece una combinación de las opciones 1 y 2, pero es peor que cualquiera de ellas. En comparación con la opción 1, debe pagar el costo de deserialización manual por registros válidos dos veces (¡malo!). En comparación con la opción 2, pierde la capacidad de retener registros corruptos en una cola de mensajes no entregados.
KStream validRecordsOnly = input.filter(
(k, v) ->
boolean isValidRecord = false;
try
bytesSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
catch (SerializationException e)
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
return isValidRecord;
);
KStream doubled = validRecordsOnly.map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
Cualquier ayuda muy apreciada.
Espero poder ayudar. En caso afirmativo, agradecería sus comentarios sobre cómo podríamos mejorar la API de Kafka Streams para manejar fallas / excepciones de una manera mejor / más conveniente que en la actualidad. 🙂
Si tienes algún recelo y disposición de reformar nuestro artículo puedes escribir un paráfrasis y con placer lo estudiaremos.