SpringBoot Kafka 整合 实例 源码
程序员文章站
2022-06-28 12:55:51
1、使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer。 工程POM文件代码如下: 注释部分为手动添加的 gson、lombok 依赖。 2、创建消息实体类 3、创建消息生产类 4、编辑资源配置文件 application.properties 5、启 ......
1、使用idea新建工程引导方式,创建消息生产工程 springboot-kafka-producer。
工程pom文件代码如下:
1 <?xml version="1.0" encoding="utf-8"?> 2 <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" 3 xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelversion>4.0.0</modelversion> 5 6 <groupid>com.miniooc</groupid> 7 <artifactid>springboot-kafka-producer</artifactid> 8 <version>1.0.0-snapshot</version> 9 <packaging>jar</packaging> 10 11 <name>springboot-kafka-producer</name> 12 <description>demo project for spring boot</description> 13 14 <parent> 15 <groupid>org.springframework.boot</groupid> 16 <artifactid>spring-boot-starter-parent</artifactid> 17 <version>2.0.3.release</version> 18 <relativepath/> 19 </parent> 20 21 <properties> 22 <spring-cloud.version>finchley.release</spring-cloud.version> 23 </properties> 24 25 <dependencies> 26 <dependency> 27 <groupid>org.springframework.boot</groupid> 28 <artifactid>spring-boot-starter-web</artifactid> 29 </dependency> 30 <dependency> 31 <groupid>org.springframework.boot</groupid> 32 <artifactid>spring-boot-starter-actuator</artifactid> 33 </dependency> 34 <dependency> 35 <groupid>org.springframework.kafka</groupid> 36 <artifactid>spring-kafka</artifactid> 37 </dependency> 38 <dependency> 39 <groupid>org.springframework.boot</groupid> 40 <artifactid>spring-boot-starter-test</artifactid> 41 <scope>test</scope> 42 </dependency> 43 44 <!-- 添加 gson 依赖 --> 45 <dependency> 46 <groupid>com.google.code.gson</groupid> 47 <artifactid>gson</artifactid> 48 <version>2.8.5</version> 49 </dependency> 50 <!-- 添加 lombok 依赖 --> 51 <dependency> 52 <groupid>org.projectlombok</groupid> 53 <artifactid>lombok</artifactid> 54 <version>1.16.22</version> 55 <scope>provided</scope> 56 </dependency> 57 </dependencies> 58 59 <dependencymanagement> 60 <dependencies> 61 <dependency> 62 <groupid>org.springframework.cloud</groupid> 63 <artifactid>spring-cloud-dependencies</artifactid> 64 <version>${spring-cloud.version}</version> 65 <type>pom</type> 66 <scope>import</scope> 67 </dependency> 68 </dependencies> 69 </dependencymanagement> 70 71 <build> 72 <plugins> 73 <plugin> 74 <groupid>org.springframework.boot</groupid> 75 <artifactid>spring-boot-maven-plugin</artifactid> 76 </plugin> 77 </plugins> 78 </build> 79 80 81 </project>
注释部分为手动添加的 gson、lombok 依赖。
2、创建消息实体类
1 package com.miniooc.kafka.message; 2 3 import lombok.data; 4 5 import java.io.serializable; 6 import java.util.date; 7 import java.util.list; 8 9 @data 10 public class orderbasic implements serializable { 11 12 /** 13 * 订单id 14 */ 15 private string orderid; 16 /** 17 * 订单编号 18 */ 19 private string ordernumber; 20 /** 21 * 订单日期 22 */ 23 private date date; 24 /** 25 * 订单信息 26 */ 27 private list<string> desc; 28 29 }
3、创建消息生产类
1 /** 2 * 3 */ 4 package com.miniooc.kafka.producer; 5 6 import com.google.gson.gsonbuilder; 7 import com.miniooc.kafka.message.orderbasic; 8 import lombok.extern.java.log; 9 import org.springframework.beans.factory.annotation.value; 10 import org.springframework.kafka.core.kafkatemplate; 11 import org.springframework.stereotype.component; 12 13 import javax.annotation.resource; 14 15 /** 16 * kafka消息生产类 17 */ 18 @log 19 @component 20 public class kafkaproducer { 21 22 @resource 23 private kafkatemplate<string, string> kafkatemplate; 24 25 @value("${kafka.topic.order}") 26 private string topicorder; 27 28 /** 29 * 发送订单消息 30 * 31 * @param orderbasic 订单信息 32 */ 33 public void sendordermessage(orderbasic orderbasic) { 34 gsonbuilder builder = new gsonbuilder(); 35 builder.setprettyprinting(); 36 builder.setdateformat("yyyy-mm-dd hh:mm:ss"); 37 string message = builder.create().tojson(orderbasic); 38 kafkatemplate.send(topicorder, message); 39 log.info("\n" + message); 40 } 41 }
4、编辑资源配置文件 application.properties
1 server.port=9526 2 spring.application.name=kafka-producer 3 kafka.bootstrap.servers=localhost:9092 4 kafka.topic.order=topic-order 5 kafka.group.id=group-order
5、启动 zookeeper
d:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
6、启动 kafka
d:\kafka>bin\windows\kafka-server-start.bat config\server.properties
7、运行工程,通过控制器调用消息生产类,创建一条消息到kafka
看到红框内容,说明消息发送成功。
8、再使用idea新建工程引导方式,创建消息消费工程 springboot-kafka-producer。
9、创建消息消费类,并监听topic。
1 package com.miniooc.kafka.consumer; 2 3 import com.google.gson.gson; 4 import com.google.gson.gsonbuilder; 5 import com.google.gson.reflect.typetoken; 6 import com.miniooc.kafka.message.orderbasic; 7 import lombok.extern.java.log; 8 import org.springframework.kafka.annotation.kafkalistener; 9 import org.springframework.messaging.handler.annotation.payload; 10 import org.springframework.stereotype.component; 11 12 @log 13 @component 14 public class kafkaconsumer { 15 16 @kafkalistener(topics = "${kafka.topic.order}", containerfactory = "kafkalistenercontainerfactory") 17 public void consume(@payload string message) { 18 gsonbuilder builder = new gsonbuilder(); 19 builder.setprettyprinting(); 20 builder.setdateformat("yyyy-mm-dd hh:mm:ss"); 21 gson gson = builder.create(); 22 orderbasic orderbasic = gson.fromjson(message, new typetoken<orderbasic>() { 23 }.gettype()); 24 string json = gson.tojson(orderbasic); 25 log.info("\n接受并消费消息\n" + json); 26 } 27 }
10、运行工程。
看到红框内容,说明消息消费成功。
springboot kafka 整合完成!
推荐阅读
-
springboot activiti 整合项目框架源码 druid 数据库连接池 shiro 安全框架
-
SpringBoot整合Elasticsearch详细步骤以及代码示例(附源码)
-
windows10风格 springboot vue.js html 跨域 前后分离 activiti 整合项目框架源码
-
Springboot 整合通用mapper和pagehelper展示分页数据的问题(附github源码)
-
springboot windows10风格 activiti 整合项目框架源码 shiro 安全框架 druid 数据库连接池
-
SpringBoot整合模板引擎过程代码实例
-
SpringBoot2.x系列二:整合第三方组件Mybatis、JPA、Redis、Elasticsearch、ActiveMQ、Kafka、Logback
-
Flink+Kafka整合的实例
-
SpringBoot2.0源码分析(二):整合ActiveMQ分析
-
Springboot 1.5.7整合Kafka-client代码示例