Saltar al contenido

Cómo manejar errores con EasyNetQ / RabbitMQ

Solución:

El problema con el que se está encontrando con EasyNetQ / RabbitMQ es que es mucho más “crudo” en comparación con otros servicios de mensajería como SQS o Azure Service Bus / Queues, pero haré todo lo posible para indicarle la dirección correcta.

Pregunta 1.

Esto dependerá de ti. La forma más sencilla es que puede No-Ack de un mensaje en RabbitMQ / EasyNetQ, y se colocará al principio de la cola para que vuelva a intentarlo. Esto no es realmente recomendable porque se reintentará casi de inmediato (sin demora de tiempo) y también bloqueará el procesamiento de otros mensajes (si tiene un solo suscriptor con un recuento de captación previa de 1).

He visto otras implementaciones del uso de un “MessageEnvelope”. Entonces, una clase contenedora que cuando un mensaje falla, incrementa una variable de reintento en el MessageEnvelope y vuelve a enviar el mensaje a la cola. USTED tendría que hacer esto y escribir el código envolvente alrededor de sus manejadores de mensajes, no sería una función de EasyNetQ.

Usando lo anterior, también he visto a personas usar sobres, pero permiten que el mensaje tenga letras muertas. Una vez que está en la cola de mensajes no entregados, hay otra aplicación / trabajador leyendo elementos de la cola de mensajes no entregados.

Todos estos enfoques anteriores tienen un pequeño problema, ya que no hay realmente una buena manera de tener un retraso logarítmico / exponencial / cualquier tipo de aumento en el procesamiento del mensaje. Puede “retener” el mensaje en código durante algún tiempo antes de devolverlo a la cola, pero no es una buena forma de evitarlo.

De todas estas opciones, su propia aplicación personalizada que lee la cola de mensajes no entregados y decide si redirigir el mensaje basándose en un sobre que contiene el recuento de reintentos es probablemente la mejor manera.

Pregunta 2.

Puede especificar un intercambio de mensajes no entregados por cola utilizando la API avanzada. (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues). Sin embargo, esto significa que tendrá que usar la API avanzada prácticamente en todas partes, ya que el uso de la implementación simple de IBus de suscripción / publicación busca colas que se nombran según el tipo de mensaje y el nombre del suscriptor. El uso de una declaración de cola personalizada significa que usted mismo manejará el nombre de sus colas, lo que significa que cuando se suscriba, necesitará saber el nombre de lo que desea, etc. ¡No más suscripciones automáticas para usted!

Pregunta 3

Una Cola de errores / Cola de mensajes no entregados es solo otra cola. Puede escuchar esta cola y hacer lo que tenga que hacer con ella. Pero realmente no hay ninguna solución lista para usar que parezca que se ajuste a sus necesidades.

Implementé exactamente lo que usted describe. Aquí hay algunos consejos basados ​​en mi experiencia y relacionados con cada una de sus preguntas.

Q1 (cómo reintentar X veces):

Para esto, puedes usar IMessage.Body.BasicProperties.Headers. Cuando consume un mensaje de una cola de error, simplemente agregue un encabezado con el nombre que elija. Busque este encabezado en cada mensaje que llegue a la cola de errores e increméntelo. Esto le dará un recuento continuo de reintentos.

Es muy importante que tiene una estrategia sobre qué hacer cuando un mensaje excede el límite de reintentos de X. No quiere perder ese mensaje. En mi caso, escribo el mensaje en el disco en ese momento. Le brinda mucha información de depuración útil para volver a consultar más adelante, porque EasyNetQ envuelve automáticamente su mensaje de origen con información de error. También tiene el mensaje original para que usted pueda, si lo desea, manualmente (o tal vez automatizado, a través de algún código de reprocesamiento por lotes) volver a enviar el mensaje más tarde de alguna manera controlada.

Puede mirar el código en la utilidad Hosepipe para ver una buena forma de hacerlo. De hecho, si sigue el patrón que ve allí, incluso puede usar Hosepipe más tarde para poner en cola los mensajes si es necesario.

Q2 (cómo crear una cola de error por cola de origen):

Puede utilizar EasyNetQ Advanced Bus para hacer esto de forma limpia. Usar IBus.Advanced.Container.Resolve<IConventions> para acceder a la interfaz de convenciones. Luego puede establecer las convenciones para la nomenclatura de la cola de error con conventions.ErrorExchangeNamingConvention y conventions.ErrorQueueNamingConvention. En mi caso, configuro la convención para que se base en el nombre de la cola de origen para obtener un par de colas queue / queue_error cada vez que creo una cola.

Q3 (cómo procesar mensajes en las colas de errores):

Puede declarar un consumidor para la cola de errores de la misma manera que lo hace con cualquier otra cola. Nuevamente, AdvancedBus le permite hacer esto limpiamente especificando que el tipo que sale de la cola es EasyNetQ.SystemMessage.Error. Entonces, IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>() te llevará allí. Reintentar simplemente significa volver a publicar en el intercambio original (prestando atención al recuento de reintentos que colocó en el encabezado (consulte mi respuesta a la P1, más arriba), y la información en el mensaje de error que consumió de la cola de errores puede ayudarlo a encontrar el objetivo para republicación.

Sé que esta es una publicación antigua, pero, en caso de que ayude a otra persona, aquí está mi pregunta de respuesta automática (necesitaba hacerla porque la ayuda existente no era suficiente) que explica cómo implementé el reintento de mensajes fallidos en sus colas originales. Lo siguiente debería responder a su pregunta n. ° 1 y n. ° 3. Para el n. ° 2, es posible que deba usar la API avanzada, que no he usado (y creo que frustra el propósito de FácilNetQ; también se podría usar el cliente RabbitMQ directamente). Sin embargo, también considere implementar IConsumerErrorStrategy.

1) Dado que puede haber varios consumidores de un mensaje y es posible que no todos tengan que volver a intentarlo, tengo un Dictionary<consumerId, RetryInfo> en el cuerpo del mensaje, ya que EasyNetQ no admite (de fábrica) tipos complejos en los encabezados de los mensajes.

public interface IMessageType
{
    int MsgTypeId { get; }

    Dictionary<string, TryInfo> MsgTryInfo {get; set;}

}

2) He implementado un class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer que solo actualiza TryCount y otra información cada vez que el marco lo llama. Adjunto este serializador personalizado al marco por consumidor a través del soporte de IoC proporcionado por EasyNetQ.

 public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

Y en mi clase contenedora EasyNetQ:

    public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
    {
        if (enableRetry)
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString,
                                                        serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
                                                );
        }
        else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString);
        }
    }

    public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
    {
        IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
        // Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
        // The mediator can transmit the retried msg or choose to ignore it
        return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
    }

3) Una vez que el mensaje se agrega a la cola de errores predeterminada, puede tener una aplicación de consola simple / servicio de Windows que republica periódicamente los mensajes de error existentes en sus colas originales. Algo como:

var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("https://foroayuda.es/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
    var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
    var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
    pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
    pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
    pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
    var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}

4) Tengo una clase MessageHandler que contiene una función de devolución de llamada. Siempre que se entrega un mensaje al consumidor, se envía al MessageHandler, que decide si el intento del mensaje es válido y, si es así, llama a la devolución de llamada real. Si el intento no es válido (maxRetriesExceeded / el consumidor no necesita volver a intentarlo de todos modos), ignoro el mensaje. Puede optar por Dead Letter el mensaje en este caso.

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only 
                                                      // if Retry is valid
}

Aquí está la función de mediador en MsgHandler que invoca la devolución de llamada:

    public async Task InvokeMsgCallbackFunc(T msg)
    {
        if (IsTryValid(msg, CurrSubscriptionId))
        {
            await this.MsgCallbackFunc(msg);
        }
        else
        {
            // Do whatever you want
        }
    }
¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *