
Configuring Kafka producers and consumers in Spring Boot: a detailed guide

程序员文章站 2022-10-30 14:40:46

Add the following Kafka dependencies (jars) to the existing pom.xml:

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
</dependency>

 

application.properties:

# Read the raw data from Kafka
# Kafka consumer cluster: comma-separated ip:port pairs
kafka.consumer.servers=ip:9092,ip:9092
# Whether offsets are committed automatically
kafka.consumer.enable.auto.commit=true
# Session timeout in milliseconds
kafka.consumer.session.timeout=20000
kafka.consumer.auto.commit.interval=100
# Consume new messages as they are produced; do not start from the beginning of the topic
kafka.consumer.auto.offset.reset=latest
# Topic to consume
kafka.consumer.topic=result
# Consumer group
kafka.consumer.group.id=test
# Number of consumer threads
kafka.consumer.concurrency=10

# Store the protocol-converted data in Kafka
# Kafka producer cluster: comma-separated ip:port pairs
kafka.producer.servers=ip:9092,ip:9092
# Topic to produce to
kafka.producer.topic=result
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
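Explanatory notes in a .properties file belong on their own `#` lines, because anything written after a value on the same line becomes part of that value. The following is a minimal sketch using plain `java.util.Properties`; the class name `InlineNoteDemo` is hypothetical, and it assumes Spring Boot applies the same parsing rules when it loads application.properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical demo class; not part of the original project.
class InlineNoteDemo {

    // Parses .properties text the way java.util.Properties reads a file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string is not expected to fail.
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text =
                "# a note on its own line is ignored\n"
              + "kafka.consumer.enable.auto.commit=true\n"
              + "kafka.consumer.session.timeout=20000(session timeout)\n";
        Properties props = parse(text);
        // The first value is the clean string "true".
        System.out.println(props.getProperty("kafka.consumer.enable.auto.commit"));
        // The second value still carries the trailing note.
        System.out.println(props.getProperty("kafka.consumer.session.timeout"));
    }
}
```

A value such as `20000(session timeout)` cannot be converted to a number, so a field injected from it with `@Value` would be expected to fail at startup.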

 

Spring Boot producer configuration:

package com.mapbar.track_storage.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer configuration
 * @author lvjiapeng
 *
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    // Copies the values from application.properties into the native Kafka producer settings.
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // The template that other beans inject in order to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

 

Spring Boot consumer configuration:

package com.mapbar.track_storage.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration
 * @author lvjiapeng
 *
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Copies the values from application.properties into the native Kafka consumer settings.
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    /**
     * Kafka listener bean.
     * RawDataListener is the listener class from the consumer test; import it
     * from its own package if it does not live in this one. That class is also
     * annotated with @Component, so if component scanning picks it up as well,
     * keep only one of the two registrations to avoid registering it twice.
     * @return the listener instance
     */
    @Bean
    public RawDataListener listener() {
        return new RawDataListener();
    }

}

 

Producer test:

package com.mapbar.track_storage.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@RequestMapping(value = "/kafka")
@Controller
public class ProducerController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // GET /kafka/producer sends the sample payload to the "result" topic 500 times.
    @RequestMapping(value = "/producer", method = RequestMethod.GET)
    public void produce(HttpServletRequest request, HttpServletResponse response) throws IOException {
        String value = "{\"code\":200,\"dataversion\":\"17q1\",\"message\":\"\",\"id\":\"364f79f28eea48eefeca8c85477a10d3\",\"source\":\"didi\",\"triplist\":[{\"subtriplist\":[{\"starttimestamp\":1519879598,\"schemelist\":[{\"distance\":0.0,\"ids\":\"94666702,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879598,\"subtripid\":0},{\"starttimestamp\":1519879727,\"schemelist\":[{\"distance\":1395.0,\"ids\":\"94666729,7298838,7291709,7291706,88613298,88613297,7297542,7297541,94698785,94698786,94698778,94698780,94698779,94698782,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879812,\"subtripid\":1},{\"starttimestamp\":1519879836,\"schemelist\":[{\"distance\":0.0,\"ids\":\"54123007,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879904,\"subtripid\":2},{\"starttimestamp\":1519879959,\"schemelist\":[{\"distance\":0.0,\"ids\":\"54190443,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879959,\"subtripid\":3},{\"starttimestamp\":1519880088,\"schemelist\":[{\"distance\":2885.0,\"ids\":\"94698824,94698822,94698789,94698786,54123011,54123012,54123002,94698763,94698727,94698722,94698765,54123006,54123004,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519880300,\"subtripid\":4},{\"starttimestamp\":1519880393,\"schemelist\":[{\"distance\":2398.0,\"ids\":\"7309441,7303680,54123061,54123038,7309478,7309477,94698204,94698203,94698273,94698274,94698288,94698296,94698295,94698289,94698310,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519880636,\"subtripid\":5},{\"starttimestamp\":1519881064,\"schemelist\":[{\"distance\":35.0,\"ids\":\"7309474,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519881204,\"subtripid\":6},{\"starttimestamp\":1519881204,\"schemelist\":[{\"distance\":28.0,\"ids\":\"7309476,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519881266,\"subtripid\":7},{\"starttimestamp\":1519881291,\"schemelist\":[{\"distance\":463.0,\"ids\":\"7303683,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519881329,\"subtripid\":8}],\"starttimestamp\":1519879350,\"unusetime\":1201,\"totaltime\":2049,\"endtimestamp\":1519881399,\"tripid\":0}]}";
        for (int i = 1; i <= 500; i++) {
            kafkaTemplate.send("result", value);
        }
    }
}

 

Consumer test:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Kafka listener
 * @author shangzz
 *
 */
@Component
public class RawDataListener {
    Logger logger = Logger.getLogger(RawDataListener.class);
    // MatchRoadService is a service class from the author's project; it is not shown in this article.
    @Autowired
    private MatchRoadService matchRoadService;

    /**
     * Receives Kafka data in real time: each message produced to the
     * monitored topic is consumed automatically.
     * @param record the consumed record
     * @throws IOException
     */
    @KafkaListener(topics = {"${kafka.consumer.topic}"})
    public void listen(ConsumerRecord<?, ?> record) throws IOException {
        String value = (String) record.value();
        System.out.println(value);
    }

}

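Summary:

         ①  Once the producer configuration class is in place, inject KafkaTemplate with @Autowired and call its send method to produce messages.

         ②  Once the consumer configuration class is in place, annotate a method with @KafkaListener(topics = {"${kafka.consumer.topic}"}) and give it a ConsumerRecord<?, ?> record parameter; the topic is then consumed automatically.

         ③  To add, change, or remove a Kafka setting, follow the same pattern in application.properties and make the matching change in the configuration class.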
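The pattern in point ③ amounts to a mapping from the article's custom property names to the native Kafka client keys that the `ProducerConfig` constants stand for. The following is a minimal sketch of that mapping for the producer settings, using only the JDK; the class `ProducerKeyMapping` and its method `toNative` are hypothetical helpers introduced for illustration and are not part of spring-kafka or of the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class ProducerKeyMapping {

    // Custom key used in this article -> native Kafka producer key.
    static final Map<String, String> NATIVE_KEYS = new LinkedHashMap<>();
    static {
        NATIVE_KEYS.put("kafka.producer.servers", "bootstrap.servers");
        NATIVE_KEYS.put("kafka.producer.retries", "retries");
        NATIVE_KEYS.put("kafka.producer.batch.size", "batch.size");
        NATIVE_KEYS.put("kafka.producer.linger", "linger.ms");
        NATIVE_KEYS.put("kafka.producer.buffer.memory", "buffer.memory");
    }

    // Translates application settings into a map keyed by native Kafka names.
    // Keys without a mapping (for example kafka.producer.topic) are skipped,
    // because they are application-level settings, not Kafka client settings.
    static Map<String, Object> toNative(Map<String, String> appSettings) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : appSettings.entrySet()) {
            String nativeKey = NATIVE_KEYS.get(entry.getKey());
            if (nativeKey != null) {
                out.put(nativeKey, entry.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> app = new LinkedHashMap<>();
        app.put("kafka.producer.servers", "ip:9092,ip:9092");
        app.put("kafka.producer.linger", "1");
        app.put("kafka.producer.topic", "result");
        System.out.println(toNative(app));
    }
}
```

Adding a new setting then means one new line in application.properties plus one new entry in the mapping, which is the same two-step change the configuration classes require with `@Value` fields and `props.put` calls.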