欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员计算机编程知识。
您现在的位置是: 首页

kafka之java编程模型

程序员文章站 2022-03-31 18:10:19
...
package com.ganglia.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
   
/**
 * Demo producer that publishes an endless stream of numbered text messages to the
 * {@code test} topic, pausing 100 ms after each send and echoing every message to
 * standard output together with a timestamp.
 *
 * <p>Usage: {@code ProducerTest2 <tag>}. The tag is embedded in every message so that
 * output from several concurrently running producers can be told apart.
 */
public class ProducerTest2 {

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is a free-form tag inserted into each message; required
     */
    public static void main(String[] args) {
        // Fail fast with a usage hint instead of hitting an ArrayIndexOutOfBoundsException
        // inside the send loop after the producer has already been created.
        if (args.length < 1) {
            System.err.println("Usage: ProducerTest2 <tag>");
            return;
        }
        String tag = args[0];

        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "bfdbjc1:9092,test1:9092,test2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        try {
            int i = 1;
            while (true) {
                // The counter is incremented before use, so the first message carries 2.
                i++;
                // Message layout: <decimal digits of i reversed>:test-kafka_<tag>_<i>
                String text = new StringBuilder(String.valueOf(i)).reverse()
                        + ":test-kafka_" + tag + "_" + i;
                producer.send(new KeyedMessage<String, String>("test", text));
                Thread.sleep(100);
                System.out.println(DateUtil.fmtDateToYMDHMS(new Date()) + "\t" + text);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it, then stop sending.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer, whatever ended the loop.
            producer.close();
        }
    }
}

 

1.安装zookeeper.

2.启动zookeeper.

3.启动kafka服务, 在zk1,zk2,zk3上分别运行:
   kafka-server-start.sh ../config/server.properties   # 启动kafka
4. 新建一个TOPIC(本例中 replication-factor 取 broker 的数量; 该值不能超过 broker 的数量)
   kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
5.假设我们在zk2上,开一个终端,发送消息至kafka
   kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
  在发送消息的终端输入:Hello Kafka
6.假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
   kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning

 

 

package com.ganglia.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
   
/**
 * Demo consumer thread that reads a single stream of one topic through the ZooKeeper-based
 * high-level consumer and prints every message payload to standard error.
 */
public class TestConsumer extends Thread {

    /**
     * Charset used to decode message payloads. UTF-8 is chosen explicitly rather than
     * relying on the platform default; NOTE(review): this assumes producers encode text as
     * UTF-8 (e.g. via kafka.serializer.StringEncoder with default settings) - confirm.
     */
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.Charset.forName("UTF-8");

    private final ConsumerConnector consumer;
    private final String topic;

    /** Starts one consumer thread on the {@code test} topic. */
    public static void main(String[] args) {
        TestConsumer consumerThread = new TestConsumer("test");
        consumerThread.start();
    }

    /**
     * Creates the consumer connector for the given topic.
     *
     * @param topic name of the topic whose messages will be printed by {@link #run()}
     */
    public TestConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    /** Builds the consumer configuration: ZooKeeper quorum, group id and session timeout. */
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "test1:2181,test2:2181,bfdbjc1:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    /**
     * Requests one stream for the topic and prints each message as it arrives.
     *
     * <p>The loop ends when the iterator reports no further messages; the connector is then
     * shut down. Previously the loop kept polling {@code hasNext()} forever in that state,
     * which would busy-spin the thread.
     */
    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // request exactly one stream for this topic
        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                    consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("*********Results********");
            while (it.hasNext()) {
                System.err.println("get data:" + new String(it.next().message(), UTF_8));
            }
        } finally {
            // Release the connector once consumption stops or fails.
            consumer.shutdown();
        }
    }
}

 

  

 

 

 

相关标签: kafka java