最最最最最简单系列四——springboot和kafka的整合
开头先吐槽一下,网上复制粘贴的憨憨,就知道复制粘贴。
网上你能搜到很多什么xml+注解两种整合配置的方案,结局是代码又臭又长,还跑不起来(不否认网上搜的绝对是可以启动的,不过他们隐藏掉了很多步骤,而且这都9102年了,是springboot,springboot看中的就是零配置/少配置)
本帖子需要内容
- 会搭建一个springboot的项目
- 有个装着kafka的服务器/虚拟机 假设ip为192.168.132.14
- 安装kafka的传送门,怎么搭建springboot项目等我写了再放传送门
pom文件大放送
<!--父项目-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!--依赖项-->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
然后我们再springboot的配置文件(application.properties)中添加这些配置项
#这是最简最简的配置了没办法再简 =.=
#消费者地址
spring.kafka.consumer.bootstrap-servers=192.168.132.15:9092
#消费者组默认id
spring.kafka.consumer.group-id=consumer_group_1
#生产者地址地址
spring.kafka.producer.bootstrap-servers=192.168.132.15:9092
然后我们创建几个文件
controller(用于发送消息,当然你直接创建类来也可以)
package com.zaiji.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @Description:
* @ClassName:TestController
* @Author: zaiji
* @CreateDate: 2019/11/28 15:23
* @Version 1.0
*/
@Controller
@RequestMapping("/")
public class TestController {
@Autowired
private KafkaTemplate kafka;
@RequestMapping("send")
@ResponseBody
public void sendMessage(String msg) {
//"zaiji-topic"是在kafka中已经创建的topic名称,msg是消息内容
kafka.send("zaiji-topic", msg);
}
}
然后我们再创建一个消费者,并交给spring管理
package com.zaiji.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Description:
* @ClassName:KafkaConsumerListener
* @Author: zaiji
* @CreateDate: 2019/11/28 13:54
* @Version 1.0
*/
@Component
public class KafkaConsumerListener {
//这里的topics是你订阅的消息的topic,是个数组
@KafkaListener(topics = {"zaiji-topic"})
void listener(ConsumerRecord<String, String> data) {
System.out.println("*********************************接受消息************************************");
System.out.println("[ 消息 来自:" + data.topic() + ",分区:" + data.partition() + " ,委托时间:" + data.timestamp() + "]");
System.out.println("**********消息内容开始**********");
System.out.println(data.value());
System.out.println("**********消息内容结束**********");
System.out.println("*********************************接受完毕************************************");
}
}
最后我们创建启动类,并开启kafka功能
package com.zaiji;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
/**
* @Description:
* @ClassName:Application
* @Author: zaiji
* @CreateDate: 2019/11/28 15:40
* @Version 1.0
*/
@SpringBootApplication
@EnableKafka
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
然后我们依次启动,zookeeper,kafka,项目,然后去浏览器/postman等工具访问 http://localhost:8080/send?msg=最最最最最简单系列四——springboot和kafka的整合
然后你的控制台可以看到如下内容
恭喜你得到了人生第一个由springboot+kafka推送的消息
一个小bug的提醒
这个错先检查kafka启动没有,确认启动之后这个错可能是kafka的配置文件(server.properties)中的listen属性没有配置,或者配置出错了
至此,springboot和kafka的最最最最最简单的整合就结束
结束语:很气,明明这么简单的一个整合,百度到的教程都差不多,挂羊头卖狗肉,标题是springboot+kafka,结果全是用spring的各种配置来整合kafka,显得不伦不类,一眼就能看出来很多博客就是在复制粘贴,我才大家和我一样在入门的时候可能都想快速得到一些结果,来让增加自己学下去的勇气和信心????,结果各个博客看完,动手实践的时候发现反复出错反复出错,根本出不了结果,也失去了学下去的兴趣。因此一气之下写了这几个最最最最最简单的教程,之后要坚持更博啊????
再见????,欢迎点赞关注
上一篇: python检测是文件还是目录的方法
下一篇: 2013技术博客汇总贴