Celeste, miembro de este equipo, nos hizo el favor de escribir este tutorial ya que controla a la perfección este tema.
Solución:
https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java
Este ejemplo se actualizó para que funcione en la nueva versión 0.8.2.2. Aquí está el fragmento de código con dependencias de maven:
pom.xml:
junit
junit
4.12
org.apache.kafka
kafka_2.11
0.8.2.2
org.apache.kafka
kafka_2.11
0.8.2.2
test
org.apache.kafka
kafka-clients
0.8.2.2
KafkaProducerTest.java:
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import kafka.admin.TopicCommand;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;
/**
* For online documentation
* see
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
* https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
*/
public class KafkaProducerTest
private int brokerId = 0;
private String topic = "test";
@Test
public void producerTest() throws InterruptedException
// setup Zookeeper
String zkConnect = TestZKUtils.zookeeperConnect();
EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
// setup Broker
int port = TestUtils.choosePort();
Properties props = TestUtils.createBrokerConfig(brokerId, port, true);
KafkaConfig config = new KafkaConfig(props);
Time mock = new MockTime();
KafkaServer kafkaServer = TestUtils.createServer(config, mock);
String [] arguments = new String[]"--topic", topic, "--partitions", "1","--replication-factor", "1";
// create topic
TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));
List servers = new ArrayList();
servers.add(kafkaServer);
TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);
// setup producer
Properties properties = TestUtils.getProducerConfig("localhost:" + port);
ProducerConfig producerConfig = new ProducerConfig(properties);
Producer producer = new Producer(producerConfig);
// setup simple consumer
Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
// send message
KeyedMessage data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8));
List messages = new ArrayList();
messages.add(data);
producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
producer.close();
// deleting zookeeper information to make sure the consumer starts from the beginning
// see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
zkClient.delete("/consumers/group0");
// starting consumer
Map topicCountMap = new HashMap();
topicCountMap.put(topic, 1);
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream stream = consumerMap.get(topic).get(0);
ConsumerIterator iterator = stream.iterator();
if(iterator.hasNext())
String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
System.out.println(msg);
assertEquals("test-message", msg);
else
fail();
// cleanup
consumer.shutdown();
kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
Asegúrese de verificar su mvn dependency: tree para ver si hay bibliotecas en conflicto. Tuve que agregar exclusiones para slf y log4j:
org.apache.kafka
kafka_2.11
0.8.2.2
org.slf4j
slf4j-log4j12
log4j
log4j
org.apache.kafka
kafka_2.11
0.8.2.2
test
org.slf4j
slf4j-log4j12
log4j
log4j
org.apache.kafka
kafka-clients
0.8.2.2
org.slf4j
slf4j-log4j12
log4j
log4j
Otra opción que estoy investigando es usar apache curator: ¿es posible iniciar una instancia de servidor zookeeper en proceso, por ejemplo, para pruebas unitarias?
org.apache.curator
curator-test
2.2.0-incubating
test
TestingServer zkTestServer;
@Before
public void startZookeeper() throws Exception
zkTestServer = new TestingServer(2181);
cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000));
@After
public void stopZookeeper() throws IOException
cli.close();
zkTestServer.stop();
¿Ha intentado burlarse de los objetos de consumo de Kafka utilizando un marco de burla como Mockito?
Sección de Reseñas y Valoraciones
Si tienes alguna perplejidad o capacidad de limar nuestro post te recordamos escribir una explicación y con mucho gusto lo leeremos.