kafka之java编程模型
程序员文章站
2022-03-31 16:52:11
...
package com.ganglia.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Demo producer built on the legacy {@code kafka.javaapi.producer} API.
 *
 * <p>Sends one string message to the {@code "test"} topic, sleeps 100 ms, prints
 * the message with a timestamp, and repeats until the thread is interrupted or
 * sending fails. Each message has the form
 * {@code <reversed counter>:test-kafka_<tag>_<counter>}.
 *
 * <p>Usage: {@code ProducerTest2 <tag>}
 *
 * <p>NOTE(review): {@code DateUtil} is not defined in this file; it is presumably
 * a helper in the same package - confirm it is on the classpath.
 */
public class ProducerTest2 {

    /** Topic every message is published to. */
    private static final String TOPIC = "test";

    /** Pause between two consecutive sends, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 100L;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is a free-form tag embedded in every message
     */
    public static void main(String[] args) {
        // Validate up front instead of failing with ArrayIndexOutOfBoundsException
        // inside the send loop after the producer has already been created.
        if (args.length < 1) {
            System.err.println("Usage: ProducerTest2 <tag>");
            return;
        }
        String tag = args[0];

        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "bfdbjc1:9092,test1:9092,test2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        try {
            int i = 1;
            while (true) {
                // Incremented before use, so the first message carries counter value 2.
                i++;
                String text = new StringBuilder(String.valueOf(i)).reverse()
                        + ":test-kafka_" + tag + "_" + i;
                producer.send(new KeyedMessage<String, String>(TOPIC, text));
                Thread.sleep(SEND_INTERVAL_MS);
                System.out.println(DateUtil.fmtDateToYMDHMS(new Date()) + "\t" + text);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer, whichever way the loop ended.
            producer.close();
        }
    }
}
1.安装zookeeper.
2.启动zookeeper.
3.启动kafka服务, 在zk1,zk2,zk3上分别运行:
kafka-server-start.sh ../config/server.properties   # 启动kafka
4. 新建一个TOPIC(这里 replication-factor 取 broker 的数量 3;该值不能大于 broker 数量)
kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
5.假设我们在zk2上,开一个终端,发送消息至kafka
kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
在发送消息的终端输入:Hello Kafka
6.假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
package com.ganglia.kafka;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Demo consumer built on the legacy ZooKeeper-based high-level consumer API.
 *
 * <p>Opens a single stream for the configured topic and prints every received
 * message payload to standard error.
 */
public class TestConsumer extends Thread {

    /**
     * Charset used to decode message payloads. Decoding explicitly avoids
     * depending on the platform default charset.
     * NOTE(review): UTF-8 is assumed to match the producer side (Kafka's
     * StringEncoder default) - confirm if a different encoding is configured.
     */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final ConsumerConnector consumer;
    private final String topic;

    /** Starts one consumer thread for the {@code "test"} topic. */
    public static void main(String[] args) {
        TestConsumer consumerThread = new TestConsumer("test");
        consumerThread.start();
    }

    /**
     * Creates the consumer connector for the given topic.
     *
     * @param topic name of the topic to consume
     */
    public TestConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    /** Builds the consumer configuration (ZooKeeper quorum, group id, session timeout). */
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "test1:2181,test2:2181,bfdbjc1:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    /**
     * Requests one stream for the topic and prints each message until the
     * iterator reports that no more messages will arrive, then shuts the
     * connector down.
     */
    @Override
    public void run() {
        try {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1); // ask for exactly one stream for this topic
            Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                    consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            System.out.println("*********Results********");
            // Loop on hasNext() directly: wrapping it in while(true) would spin at
            // full CPU once hasNext() starts returning false.
            while (it.hasNext()) {
                System.err.println("get data:" + new String(it.next().message(), UTF_8));
            }
        } finally {
            // Release the connector's resources when the thread ends.
            consumer.shutdown();
        }
    }
}
上一篇: JS原生对象和正则表达式详解
下一篇: linux ISO本地源