Kafka入门
kafka下载:http://kafka.apache.org/downloads
解压下载下来的文件,bin目录下是常用命令,config目录下是配置文件。
kafka已经内置了一个zookeeper环境,可以直接使用。
建议kafka在linux环境下使用。
Kafka的简单测试
以下是在windows环境下的一个kafka测试:
配置则直接使用默认的配置即可。
1,启动zookeeper:
进入bin/windows目录,运行命令:
>zookeeper-server-start.bat ../../config/zookeeper.properties
2,启动kafka:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-server-start.bat ../../config/server.properties
3,创建一个Topic:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-topics.bat -create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 -topic test
创建Topic完成后,可通过命令查看Topic:
>kafka-topics.bat -list --zookeeper localhost:2181
4,创建一个消息消费者:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
执行命令后,暂时不会有任何信息返回,消费者在等待信息。
5,创建一个消息生产者:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-console-producer.bat --broker-list localhost:9092 --topic test
此时输入信息,然后按回车键。则完成了一条消息的生产,此时可看到在消息消费者窗口,会打印出刚刚生产的消息。
此时一个Kafka的使用实例完成。
Java代码完成Kafka测试
也可以直接通过Java代码来完成上述过程:
如上面步骤3:创建一个Topic,Java代码如下:
public static void main(String[] args) { // TODO Auto-generated method stub Properties prop = new Properties(); prop.put("bootstrap.servers", "localhost:9092"); AdminClient ac = AdminClient.create(prop); ArrayList<NewTopic> topics = new ArrayList<NewTopic>(); NewTopic topic = new NewTopic("topic-test2",1,(short)1); topics.add(topic); CreateTopicsResult result = ac.createTopics(topics); try { System.out.println("show create topics result:"); result.all().get(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
然后通过命令
>kafka-topics.bat -list --zookeeper localhost:2181
可查看刚刚通过程序新增的Topic。
创建一个消息生产者,
public void createMsg(){ prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> p = new KafkaProducer<String,String>(prop); for(int i=0;i<50;i++){ p.send(new ProducerRecord<String,String>("topic-test1","topictest message" + i)); } p.close(); }
生产的消息能够通过命令查看:
>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-test1 --from-beginning
创建一个消息消费者,把收到的消息信息展示出来:
public void doconsume(){ prop.put("group.id", "test"); prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); final KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop); consumer.subscribe(Arrays.asList("topic-test1"),new ConsumerRebalanceListener(){ public void onPartitionsRevoked( Collection<TopicPartition> partitions) { // TODO Auto-generated method stub } public void onPartitionsAssigned( Collection<TopicPartition> partitions) { // TODO Auto-generated method stub consumer.seekToBeginning(partitions); } }); while(true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100)); for(ConsumerRecord<String,String> record: records) System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value()); } }