Hemos estado buscando por diferentes espacios para así traerte la solución para tu problema, en caso de alguna duda puedes dejarnos la duda y te respondemos con mucho gusto, porque estamos para ayudarte.
Solución:
depende de lo que quieras hacer con las excepciones del lado del productor. si se lanzará una excepción en el productor (p. ej., debido a una falla en la red o el agente kafka ha muerto), la transmisión morirá de forma predeterminada. y con kafka-streams versión 1.1.0, puede anular el comportamiento predeterminado implementando ProductionExceptionHandler
como el siguiente:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord record,
final Exception exception)
log.error("Kafka message marked as processed although it failed. Message: [], destination topic: []", new String(record.value()), record.topic(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
@Override
public void configure(final Map configs)
desde el método de manejo, puede devolver cualquiera CONTINUE
si no desea que las transmisiones mueran por excepción, al regresar FAIL
en caso de que desee detener la transmisión (FAIL es el predeterminado). y necesita especificar esta clase en la configuración de transmisión:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
También presta atención a que ProductionExceptionHandler
maneja solo excepciones en el productor, y no manejará excepciones durante el procesamiento de mensajes con métodos de transmisión mapValues(..)
, filter(..)
, branch(..)
etc., debe envolver esta lógica de método con bloques de prueba / captura (coloque toda la lógica de su método en el bloque de prueba para garantizar que manejará todos los casos excepcionales):
.filter((key, value) -> try .. catch (Exception e) .. )
como sé, no necesitamos manejar excepciones en el lado del consumidor explícitamente, ya que las transmisiones de kafka volverán a intentar consumir automáticamente más tarde (ya que el desplazamiento no se cambiará hasta que los mensajes se consuman y procesen); Por ejemplo, si no se puede acceder al agente de kafka durante algún tiempo, obtendrá excepciones de las transmisiones de kafka y, cuando se interrumpa, la transmisión de kafka consumirá todos los mensajes. entonces, en este caso, solo tendremos demoras y nada dañado/perdido.
con setUncaughtExceptionHandler
no podrá cambiar el comportamiento predeterminado como con ProductionExceptionHandler
con él solo podría registrar un error o enviar un mensaje al tema de falla.
Para el manejo de excepciones en el lado del consumidor,
1) Puede agregar un controlador de excepciones predeterminado en el productor con la siguiente propiedad.
"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";
Básicamente, apache proporciona tres clases de manejo de excepciones como
1) LogAndContiuneExceptionHandler que puede tomar como
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
2) LogAndFailExceptionHandler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
3) LogAndSkipOnInvalidTimestamp
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndSkipOnInvalidTimestamp.class);
Para el manejo personalizado de excepciones,
1) puede implementar la interfaz DeserializationExceptionHandler y anular el método handle().
2) O puede extender las clases mencionadas anteriormente.
Comentarios y puntuaciones
Si te gustó nuestro trabajo, tienes el poder dejar una crónica acerca de qué te ha gustado de este artículo.