Saltar al contenido

¿Cómo puedo crear una instancia de un tema de Mock Kafka para pruebas junit?

Celeste, miembro de este equipo, nos hizo el favor de escribir este tutorial ya que controla a la perfección este tema.

Solución:

https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

Este ejemplo se actualizó para que funcione en la nueva versión 0.8.2.2. Aquí está el fragmento de código con dependencias de maven:

pom.xml:


    
      junit
      junit
      4.12
    
    
      org.apache.kafka
      kafka_2.11
      0.8.2.2
    
    
      org.apache.kafka
      kafka_2.11
      0.8.2.2
      test
    
    
      org.apache.kafka
      kafka-clients
      0.8.2.2
    

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import kafka.admin.TopicCommand;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;

/**
 * For online documentation
 * see
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
 */
public class KafkaProducerTest 

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException 

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port, true);

        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        String [] arguments = new String[]"--topic", topic, "--partitions", "1","--replication-factor", "1";
        // create topic
        TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));

        List servers = new ArrayList();
        servers.add(kafkaServer);
        TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port);
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer producer = new Producer(producerConfig);

        // setup simple consumer
        Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));

        // send message
        KeyedMessage data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8));

        List messages = new ArrayList();
        messages.add(data);

        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
        producer.close();

        // deleting zookeeper information to make sure the consumer starts from the beginning
        // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
        zkClient.delete("/consumers/group0");

        // starting consumer
        Map topicCountMap = new HashMap();
        topicCountMap.put(topic, 1);
        Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream stream = consumerMap.get(topic).get(0);
        ConsumerIterator iterator = stream.iterator();

        if(iterator.hasNext()) 
            String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println(msg);
            assertEquals("test-message", msg);
         else 
            fail();
        

        // cleanup
        consumer.shutdown();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    

Asegúrese de verificar su mvn dependency: tree para ver si hay bibliotecas en conflicto. Tuve que agregar exclusiones para slf y log4j:


    org.apache.kafka
    kafka_2.11
    0.8.2.2
    
        
            org.slf4j
            slf4j-log4j12
        
        
            log4j
            log4j
        
    


    org.apache.kafka
    kafka_2.11
    0.8.2.2
    test
    
        
            org.slf4j
            slf4j-log4j12
        
        
            log4j
            log4j
        
    


    org.apache.kafka
    kafka-clients
    0.8.2.2
    
        
            org.slf4j
            slf4j-log4j12
        
        
            log4j
            log4j
        
    

Otra opción que estoy investigando es usar apache curator: ¿es posible iniciar una instancia de servidor zookeeper en proceso, por ejemplo, para pruebas unitarias?


    org.apache.curator
    curator-test
    2.2.0-incubating
    test

TestingServer zkTestServer;

@Before
public void startZookeeper() throws Exception 
    zkTestServer = new TestingServer(2181);
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000));


@After
public void stopZookeeper() throws IOException 
    cli.close();
    zkTestServer.stop();

¿Ha intentado burlarse de los objetos de consumo de Kafka utilizando un marco de burla como Mockito?

Sección de Reseñas y Valoraciones

Si tienes alguna perplejidad o capacidad de limar nuestro post te recordamos escribir una explicación y con mucho gusto lo leeremos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *