欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Kafka简单客户端编程实例

程序员文章站 2023-12-21 10:44:58
今天,我们给大家带来一篇如何利用kafka的api进行客户端编程的文章,这篇文章很简单,就是利用kafka的api创建一个生产者和消费者,生产者不断向kafka写入消息,消...

今天,我们给大家带来一篇如何利用kafka的api进行客户端编程的文章,这篇文章很简单,就是利用kafka的api创建一个生产者和消费者,生产者不断向kafka写入消息,消费者则不断消费kafka的消息。下面是具体的实例代码。

一、创建配置类config

这个类很简单,只是存放了两个常量,一个是话题topic,一个是线程数threads

package com.lya.kafka; 
 
/** 
 * 配置项 
 * @author liuyazhuang 
 * 
 */ 
public class config { 
  
 /** 
  * 话题 
  */ 
 public static final string topic = "wordcount"; 
 /** 
  * 线程数 
  */ 
 public static final integer threads = 1; 
} 

二、编程生产者类producerdemo

这个类的主要作用就是向kafka写入相应的消息,并且将消息写入wordcount话题。

package com.lya.kafka; 
 
import java.util.properties; 
 
import kafka.javaapi.producer.producer; 
import kafka.producer.keyedmessage; 
import kafka.producer.producerconfig; 
 
/** 
 * 生产者实例 
 * @author liuyazhuang 
 * 
 */ 
public class producerdemo { 
 public static void main(string[] args) throws exception { 
  properties props = new properties(); 
  props.put("zk.connect", "192.168.209.121:2181"); 
  props.put("metadata.broker.list","192.168.209.121:9092"); 
  props.put("serializer.class", "kafka.serializer.stringencoder"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
  producerconfig config = new producerconfig(props); 
  producer<string, string> producer = new producer<string, string>(config); 
 
  // 发送业务消息 
  // 读取文件 读取内存数据库 读socket端口 
  for (int i = 1; i <= 100; i++) { 
   thread.sleep(500); 
   producer.send(new keyedmessage<string, string>(config.topic, 
     "this number ===>>> " + i)); 
  } 
 
 } 
} 

三、编写消息者类consumerdemo

这个类的主要作用就是消费kafka中wordcount话题的消息。

package com.lya.kafka; 
 
import java.util.hashmap; 
import java.util.list; 
import java.util.map; 
import java.util.properties; 
 
import kafka.consumer.consumer; 
import kafka.consumer.consumerconfig; 
import kafka.consumer.kafkastream; 
import kafka.javaapi.consumer.consumerconnector; 
import kafka.message.messageandmetadata; 
 
/** 
 * 消费者实例 
 * @author liuyazhuang 
 * 
 */ 
public class consumerdemo { 
  
 
 public static void main(string[] args) { 
   
  properties props = new properties(); 
  props.put("zookeeper.connect", "192.168.209.121:2181"); 
  props.put("group.id", "1111"); 
  props.put("auto.offset.reset", "smallest"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
 
  consumerconfig config = new consumerconfig(props); 
  consumerconnector consumer =consumer.createjavaconsumerconnector(config); 
  map<string, integer> topiccountmap = new hashmap<string, integer>(); 
  topiccountmap.put(config.topic, config.threads); 
  map<string, list<kafkastream<byte[], byte[]>>> consumermap = consumer.createmessagestreams(topiccountmap); 
  list<kafkastream<byte[], byte[]>> streams = consumermap.get(config.topic); 
   
  for(final kafkastream<byte[], byte[]> kafkastream : streams){ 
   new thread(new runnable() { 
    @override 
    public void run() { 
     for(messageandmetadata<byte[], byte[]> mm : kafkastream){ 
      string msg = new string(mm.message()); 
      system.out.println(msg); 
     } 
    } 
    
   }).start(); 
   
  } 
 } 
} 

四、运行实例

首先,运行消费者类consumerdemo
运行结果如下:

Kafka简单客户端编程实例

没有打印任何信息。
此时,我们运行生产者类producerdemo
我们再次打开消费者的控制台查看如下:

Kafka简单客户端编程实例

打印出了生产者生产的消息。
至此,kafka简单客户端编程实例结束。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

上一篇:

下一篇: