Te doy la bienvenida a nuestra página web, en este sitio vas a hallar la respuesta que estás buscando.
Solución:
Este es un ejemplo básico. No lo he probado con múltiples particiones / temas.
// Código de productor de muestra
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;
public class ProducerTest
void producer(Schema schema) throws IOException
Properties props = new Properties();
props.put("metadata.broker.list", "0:9092");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer(config);
GenericRecord payload1 = new GenericData.Record(schema);
//Step2 : Put data in that genericrecord object
payload1.put("desc", "'testdata'");
//payload1.put("name", "अasa");
payload1.put("name", "dbevent1");
payload1.put("id", 111);
System.out.println("Original Message : "+ payload1);
//Step3 : Serialize the object to a bytearray
DatumWriterwriter = new SpecificDatumWriter(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(payload1, encoder);
encoder.flush();
out.close();
byte[] serializedBytes = out.toByteArray();
System.out.println("Sending message in bytes : " + serializedBytes);
//String serializedHex = Hex.encodeHexString(serializedBytes);
//System.out.println("Serialized Hex String : " + serializedHex);
KeyedMessage message = new KeyedMessage("page_views", serializedBytes);
producer.send(message);
producer.close();
public static void main(String[] args) throws IOException, DecoderException
ProducerTest test = new ProducerTest();
Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
test.producer(schema);
// Código de consumidor de muestra
Parte 1: Código de grupo de consumidores: ya que puede tener más de varios consumidores para múltiples particiones / temas.
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by on 9/1/15.
*/
public class ConsumerGroupExample
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic)
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
public void shutdown()
if (consumer!=null) consumer.shutdown();
if (executor!=null) executor.shutdown();
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
try
if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
catch(InterruptedException e)
System.out.println("Interrupted");
public void run(int a_numThreads)
//Make a map of topic as key and no. of threads for that topic
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(a_numThreads));
//Create message streams for each topic
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
List> streams = consumerMap.get(topic);
//initialize thread pool
executor = Executors.newFixedThreadPool(a_numThreads);
//start consuming from thread
int threadNumber = 0;
for (final KafkaStream stream : streams)
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
public static void main(String[] args)
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
try
Thread.sleep(10000);
catch (InterruptedException ie)
example.shutdown();
Parte 2: Consumidor individual que realmente consume los mensajes.
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.codec.binary.Hex;
import java.io.File;
import java.io.IOException;
public class ConsumerTest implements Runnable
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber)
m_threadNumber = a_threadNumber;
m_stream = a_stream;
public void run()
ConsumerIteratorit = m_stream.iterator();
while(it.hasNext())
try
//System.out.println("Encoded Message received : " + message_received);
//byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray());
//System.out.println("Deserializied Byte array : " + input);
byte[] received_message = it.next().message();
System.out.println(received_message);
Schema schema = null;
schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
DatumReader reader = new SpecificDatumReader(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
GenericRecord payload2 = null;
payload2 = reader.read(null, decoder);
System.out.println("Message received : " + payload2);
catch (Exception e)
e.printStackTrace();
System.out.println(e);
Pruebe el esquema AVRO:
"namespace": "xyz.test",
"type": "record",
"name": "payload",
"fields":[
"name": "name", "type": "string"
,
"name": "id", "type": ["int", "null"]
,
"name": "desc", "type": ["string", "null"]
]
Las cosas importantes a tener en cuenta son:
-
Necesitará los frascos estándar kafka y avro para ejecutar este código de fábrica.
-
Es muy importante props.put (“serializer.class”, “kafka.serializer.DefaultEncoder”); Don
t use stringEncoder as that won
No funciona si está enviando un byte array como mensaje. -
Puedes convertir el byte[] a un maleficio string y envía eso y en el consumidor reconvierte hexadecimal string a byte[] y luego al mensaje original.
-
Ejecute el guardián del zoológico y el corredor como se menciona aquí: – http://kafka.apache.org/documentation.html#quickstart y cree un tema llamado “page_views” o lo que quiera.
-
Ejecute ProducerTest.java y luego ConsumerGroupExample.java y vea los datos avro que se producen y consumen.
Finalmente me acordé de preguntarle a la lista de correo de Kafka y obtuve la siguiente respuesta, que funcionó a la perfección.
Sí, puede enviar mensajes como matrices de bytes. Si observa el constructor de la clase Message, verá:
def this (bytes: Array[Byte])
Ahora, mirando la API de Producer send () –
def enviar (ProducerData: ProducerData[K,V]*)
Puede configurar V para que sea del tipo Mensaje y K para lo que desee key ser. Si no le importa particionar usando un key, luego configúrelo en Tipo de mensaje también.
Gracias, neha
Si quieres obtener un byte array desde un mensaje de Avro (la parte kafka ya está respondida), use el codificador binario:
GenericDatumWriter writer = new GenericDatumWriter(schema);
ByteArrayOutputStream os = new ByteArrayOutputStream();
try
Encoder e = EncoderFactory.get().binaryEncoder(os, null);
writer.write(record, e);
e.flush();
byte[] byteData = os.toByteArray();
finally
os.close();
Sección de Reseñas y Valoraciones
Nos puedes añadir valor a nuestro contenido tributando tu veteranía en las notas.