springboot配置kafka生产者和消费者详解
程序员文章站
2022-10-30 14:40:46
在原有pom.xml依赖下新添加一下kafka依赖ar包 application.properties: springboot生产者配置: springboot消费者配置: 生产者测试: 消费者测试: 总结: ① 生产者环境类配置好以后,@Autowired自动注入KafkaTemplate类,使用 ......
在原有pom.xml依赖下新添加一下kafka依赖ar包
<!--kafka--> <dependency> <groupid>org.springframework.kafka</groupid> <artifactid>spring-kafka</artifactid> <version>1.1.1.release</version> </dependency> <dependency> <groupid>org.apache.kafka</groupid> <artifactid>kafka_2.10</artifactid> <version>0.10.0.1</version> </dependency>
application.properties:
1 #原始数据kafka读取 2 kafka.consumer.servers=ip:9092,ip:9092(kafka消费集群ip+port端口) 3 kafka.consumer.enable.auto.commit=true(是否自动提交) 4 kafka.consumer.session.timeout=20000(连接超时时间) 5 kafka.consumer.auto.commit.interval=100 6 kafka.consumer.auto.offset.reset=latest(实时生产,实时消费,不会从头开始消费) 7 kafka.consumer.topic=result(消费的topic) 8 kafka.consumer.group.id=test(消费组) 9 kafka.consumer.concurrency=10(设置消费线程数) 10 11 #协议转换后存储kafka 12 kafka.producer.servers=ip:9092,ip:9092(kafka生产集群ip+port端口) 13 kafka.producer.topic=result(生产的topic) 14 kafka.producer.retries=0 15 kafka.producer.batch.size=4096 16 kafka.producer.linger=1 17 kafka.producer.buffer.memory=40960
springboot生产者配置:
1 package com.mapbar.track_storage.config; 2 3 import org.apache.kafka.clients.producer.producerconfig; 4 import org.apache.kafka.common.serialization.stringserializer; 5 import org.springframework.beans.factory.annotation.value; 6 import org.springframework.context.annotation.bean; 7 import org.springframework.context.annotation.configuration; 8 import org.springframework.kafka.annotation.enablekafka; 9 import org.springframework.kafka.core.defaultkafkaproducerfactory; 10 import org.springframework.kafka.core.kafkatemplate; 11 import org.springframework.kafka.core.producerfactory; 12 13 import java.util.hashmap; 14 import java.util.map; 15 16 /** 17 * kafka生产配置 18 * @author lvjiapeng 19 * 20 */ 21 @configuration 22 @enablekafka 23 public class kafkaproducerconfig { 24 @value("${kafka.producer.servers}") 25 private string servers; 26 @value("${kafka.producer.retries}") 27 private int retries; 28 @value("${kafka.producer.batch.size}") 29 private int batchsize; 30 @value("${kafka.producer.linger}") 31 private int linger; 32 @value("${kafka.producer.buffer.memory}") 33 private int buffermemory; 34 35 public map<string, object> producerconfigs() { 36 map<string, object> props = new hashmap<>(); 37 props.put(producerconfig.bootstrap_servers_config, servers); 38 props.put(producerconfig.retries_config, retries); 39 props.put(producerconfig.batch_size_config, batchsize); 40 props.put(producerconfig.linger_ms_config, linger); 41 props.put(producerconfig.buffer_memory_config, buffermemory); 42 props.put(producerconfig.key_serializer_class_config, stringserializer.class); 43 props.put(producerconfig.value_serializer_class_config, stringserializer.class); 44 return props; 45 } 46 47 public producerfactory<string, string> producerfactory() { 48 return new defaultkafkaproducerfactory<>(producerconfigs()); 49 } 50 51 @bean 52 public kafkatemplate<string, string> kafkatemplate() { 53 return new kafkatemplate<string, string>(producerfactory()); 54 } 55 }
springboot消费者配置:
1 package com.mapbar.track_storage.config; 2 3 import org.apache.kafka.clients.consumer.consumerconfig; 4 import org.apache.kafka.common.serialization.stringdeserializer; 5 import org.springframework.beans.factory.annotation.value; 6 import org.springframework.context.annotation.bean; 7 import org.springframework.context.annotation.configuration; 8 import org.springframework.kafka.annotation.enablekafka; 9 import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory; 10 import org.springframework.kafka.config.kafkalistenercontainerfactory; 11 import org.springframework.kafka.core.consumerfactory; 12 import org.springframework.kafka.core.defaultkafkaconsumerfactory; 13 import org.springframework.kafka.listener.concurrentmessagelistenercontainer; 14 15 import java.util.hashmap; 16 import java.util.map; 17 18 /** 19 * kafka消费者配置 20 * @author lvjiapeng 21 * 22 */ 23 @configuration 24 @enablekafka 25 public class kafkaconsumerconfig { 26 27 @value("${kafka.consumer.servers}") 28 private string servers; 29 @value("${kafka.consumer.enable.auto.commit}") 30 private boolean enableautocommit; 31 @value("${kafka.consumer.session.timeout}") 32 private string sessiontimeout; 33 @value("${kafka.consumer.auto.commit.interval}") 34 private string autocommitinterval; 35 @value("${kafka.consumer.group.id}") 36 private string groupid; 37 @value("${kafka.consumer.auto.offset.reset}") 38 private string autooffsetreset; 39 @value("${kafka.consumer.concurrency}") 40 private int concurrency; 41 42 @bean 43 public kafkalistenercontainerfactory<concurrentmessagelistenercontainer<string, string>> kafkalistenercontainerfactory() { 44 concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>(); 45 factory.setconsumerfactory(consumerfactory()); 46 factory.setconcurrency(concurrency); 47 factory.getcontainerproperties().setpolltimeout(1500); 48 return factory; 49 } 50 51 public consumerfactory<string, string> consumerfactory() { 52 return new defaultkafkaconsumerfactory<>(consumerconfigs()); 53 } 54 55 56 public map<string, object> consumerconfigs() { 57 map<string, object> propsmap = new hashmap<>(); 58 propsmap.put(consumerconfig.bootstrap_servers_config, servers); 59 propsmap.put(consumerconfig.enable_auto_commit_config, enableautocommit); 60 propsmap.put(consumerconfig.auto_commit_interval_ms_config, autocommitinterval); 61 propsmap.put(consumerconfig.session_timeout_ms_config, sessiontimeout); 62 propsmap.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class); 63 propsmap.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class); 64 propsmap.put(consumerconfig.group_id_config, groupid); 65 propsmap.put(consumerconfig.auto_offset_reset_config, autooffsetreset); 66 return propsmap; 67 } 68 /** 69 * kafka监听 70 * @return 71 */ 72 @bean 73 public rawdatalistener listener() { 74 return new rawdatalistener(); 75 } 76 77 }
生产者测试:
1 package com.mapbar.track_storage.controller; 2 3 import org.springframework.beans.factory.annotation.autowired; 4 import org.springframework.kafka.core.kafkatemplate; 5 import org.springframework.stereotype.controller; 6 import org.springframework.web.bind.annotation.requestmapping; 7 import org.springframework.web.bind.annotation.requestmethod; 8 9 import javax.servlet.http.httpservletrequest; 10 import javax.servlet.http.httpservletresponse; 11 import java.io.ioexception; 12 13 @requestmapping(value = "/kafka") 14 @controller 15 public class producercontroller { 16 @autowired 17 private kafkatemplate kafkatemplate; 18 19 @requestmapping(value = "/producer",method = requestmethod.get) 20 public void consume(httpservletrequest request, httpservletresponse response) throws ioexception{ 21 string value = "{\"code\":200,\"dataversion\":\"17q1\",\"message\":\"\",\"id\":\"364f79f28eea48eefeca8c85477a10d3\",\"source\":\"didi\",\"triplist\":[{\"subtriplist\":[{\"starttimestamp\":1519879598,\"schemelist\":[{\"distance\":0.0,\"ids\":\"94666702,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879598,\"subtripid\":0},{\"starttimestamp\":1519879727,\"schemelist\":[{\"distance\":1395.0,\"ids\":\"94666729,7298838,7291709,7291706,88613298,88613297,7297542,7297541,94698785,94698786,94698778,94698780,94698779,94698782,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879812,\"subtripid\":1},{\"starttimestamp\":1519879836,\"schemelist\":[{\"distance\":0.0,\"ids\":\"54123007,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879904,\"subtripid\":2},{\"starttimestamp\":1519879959,\"schemelist\":[{\"distance\":0.0,\"ids\":\"54190443,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519879959,\"subtripid\":3},{\"starttimestamp\":1519880088,\"schemelist\":[{\"distance\":2885.0,\"ids\":\"94698824,94698822,94698789,94698786,54123011,54123012,54123002,94698763,94698727,94698722,94698765,54123006,54123004,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519880300,\"subtripid\":4},{\"starttimestamp\":1519880393,\"schemelist\":[{\"distance\":2398.0,\"ids\":\"7309441,7303680,54123061,54123038,7309478,7309477,94698204,94698203,94698273,94698274,94698288,94698296,94698295,94698289,94698310,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519880636,\"subtripid\":5},{\"starttimestamp\":1519881064,\"schemelist\":[{\"distance\":35.0,\"ids\":\"7309474,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519881204,\"subtripid\":6},{\"starttimestamp\":1519881204,\"schemelist\":[{\"distance\":28.0,\"ids\":\"7309476,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519881266,\"subtripid\":7},{\"starttimestamp\":1519881291,\"schemelist\":[{\"distance\":463.0,\"ids\":\"7303683,\",\"schemeid\":0,\"linklist\":[{\"score\":72,\"distance\":1,\"gpslist\":[{\"origlonlat\":\"116.321343,40.43242\",\"grablonlat\":\"112.32312,40.32132\",\"timestamp\":1515149926000}]}]}],\"endtimestamp\":1519881329,\"subtripid\":8}],\"starttimestamp\":1519879350,\"unusetime\":1201,\"totaltime\":2049,\"endtimestamp\":1519881399,\"tripid\":0}]}"; 22 for (int i = 1; i<=500; i++){ 23 kafkatemplate.send("result",value); 24 } 25 } 26 }
消费者测试:
1 import net.sf.json.jsonobject; 2 import org.apache.kafka.clients.consumer.consumerrecord; 3 import org.apache.log4j.logger; 4 import org.springframework.beans.factory.annotation.autowired; 5 import org.springframework.kafka.annotation.kafkalistener; 6 import org.springframework.stereotype.component; 7 8 import java.io.ioexception; 9 import java.util.list; 10 11 /** 12 * kafka监听 13 * @author shangzz 14 * 15 */ 16 @component 17 public class rawdatalistener { 18 logger logger=logger.getlogger(rawdatalistener.class); 19 @autowired 20 private matchroadservice matchroadservice; 21 22 /** 23 * 实时获取kafka数据(生产一条,监听生产topic自动消费一条) 24 * @param record 25 * @throws ioexception 26 */ 27 @kafkalistener(topics = {"${kafka.consumer.topic}"}) 28 public void listen(consumerrecord<?, ?> record) throws ioexception { 29 string value = (string) record.value(); 30 system.out.println(value); 31 } 32 33 }
总结:
① 生产者环境类配置好以后,@autowired自动注入kafkatemplate类,使用send方法生产消息
② 消费者环境类配置好以后,方法头前使用@kafkalistener(topics = {"${kafka.consumer.topic}"})注解监听topic并传入consumerrecord<?, ?> record对象即可自动消费topic
③ 相关kafka配置只需在application.properties照葫芦画瓢添加,修改或者删除配置并在环境配置类中做出相应修改即可