Saltar al contenido

¿Cómo utilizo varios consumidores en Kafka?

Te sugerimos que revises esta solución en un entorno controlado antes de enviarlo a producción, saludos.

Solución:

Creo que su problema radica en la propiedad auto.offset.reset. Cuando un nuevo consumidor lee desde una partición y no hay un desplazamiento comprometido previo, la propiedad auto.offset.reset se usa para decidir cuál debería ser el desplazamiento inicial. Si lo configura en “más grande” (el predeterminado), comienza a leer en el último (último) mensaje. Si lo configura como “más pequeño”, obtendrá el primer mensaje disponible.

Entonces agregue:

properties.put("auto.offset.reset", "smallest");

e intenta de nuevo.

* editar *

“más pequeño” y “más grande” se desaprobaron hace un tiempo. Debería utilizar “más temprano” o “más reciente” ahora. Cualquier duda, consulte la documentación.

Si desea que varios consumidores consuman los mismos mensajes (como una transmisión), puede generarlos con un grupo de consumidores diferente y también establecer auto.offset.reset en el más pequeño en la configuración del consumidor. Si desea que varios consumidores terminen de consumir en paralelo (divida el trabajo entre ellos), debe crear número de particiones> = número de consumidores. Una partición solo puede ser consumida por como máximo un proceso de consumidor. Pero un consumidor puede consumir más de una partición.

En la documentación aquí dice: “si proporciona más subprocesos que particiones sobre el tema, algunos subprocesos nunca verán un mensaje”. ¿Puedes agregar particiones a tu tema? Tengo el recuento de subprocesos de mi grupo de consumidores igual al número de particiones en mi tema, y ​​cada subproceso recibe mensajes.

Aquí está mi configuración de tema:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
Topic:recent-wins   PartitionCount:3    ReplicationFactor:1 Configs:
Topic: recent-wins  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic: recent-wins  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: recent-wins  Partition: 2    Leader: 0   Replicas: 0 Isr: 0

Y mi consumidor:

package com.cie.dispatcher.services;

import com.cie.dispatcher.model.WinNotification;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * This will create three threads, assign them to a "group" and listen for  notifications on a topic.
 * Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by
 * the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the
 * lifecycle manager in dropwizard.
 * 

* Created by aakture on 6/15/15. */ public class KafkaTopicListener implements Managed private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class); private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; private int threadCount; private WinNotificationWorkflow winNotificationWorkflow; private ObjectMapper objectMapper; @Inject public KafkaTopicListener(String a_zookeeper, String a_groupId, String a_topic, int threadCount, WinNotificationWorkflow winNotificationWorkflow, ObjectMapper objectMapper) consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; this.threadCount = threadCount; this.winNotificationWorkflow = winNotificationWorkflow; this.objectMapper = objectMapper; /** * Creates the config for a connection * * @param zookeeper the host:port for zookeeper, "localhost:2181" for example. * @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads. * @return the config props */ private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) Properties props = new Properties(); props.put("zookeeper.connect", zookeeper); props.put("group.id", groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); public void stop() if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); catch (InterruptedException e) LOG.info("Interrupted during shutdown, exiting uncleanly"); LOG.info(" shutdown successfully", this.getClass().getName()); /** * Starts the listener */ public void start() Map topicCountMap = new HashMap<>(); topicCountMap.put(topic, new Integer(threadCount)); Map>> consumerMap = consumer.createMessageStreams(topicCountMap); List> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(threadCount); int threadNumber = 0; for (final KafkaStream stream : streams) executor.submit(new ListenerThread(stream, threadNumber)); threadNumber++; private class ListenerThread implements Runnable private KafkaStream m_stream; private int m_threadNumber; public ListenerThread(KafkaStream a_stream, int a_threadNumber) m_threadNumber = a_threadNumber; m_stream = a_stream; public void run() try String message = null; LOG.info("started listener thread: ", m_threadNumber); ConsumerIterator it = m_stream.iterator(); while (it.hasNext()) try message = new String(it.next().message()); LOG.info("receive message by " + m_threadNumber + " : " + message); WinNotification winNotification = objectMapper.readValue(message, WinNotification.class); winNotificationWorkflow.process(winNotification); catch (Exception ex) LOG.error("error processing queue for message: " + message, ex); LOG.info("Shutting down listener thread: " + m_threadNumber); catch (Exception ex) LOG.error("error:", ex);

Valoraciones y reseñas

Si guardas alguna indecisión y forma de beneficiar nuestro artículo eres capaz de ejecutar una disquisición y con gusto lo analizaremos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *