欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

最最最最最简单系列四——springboot和kafka的整合

程序员文章站 2022-05-06 13:26:13
...

开头先吐槽一下,网上复制粘贴的憨憨,就知道复制粘贴。

网上你能搜到很多什么xml+注解两种整合配置的方案,结局是代码又臭又长,还跑不起来(不否认网上搜的绝对是可以启动的,不过他们隐藏掉了很多步骤,而且这都9102年了,是springboot,springboot看中的就是零配置/少配置)

本帖子需要内容

  • 会搭建一个springboot的项目
  • 有个装着kafka的服务器/虚拟机 假设ip为192.168.132.14
    • 安装kafka的传送门,怎么搭建springboot项目等我写了再放传送门

pom文件大放送

    <!--父项目-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!--依赖项-->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

然后我们再springboot的配置文件(application.properties)中添加这些配置项

#这是最简最简的配置了没办法再简 =.=
#消费者地址
spring.kafka.consumer.bootstrap-servers=192.168.132.15:9092

#消费者组默认id
spring.kafka.consumer.group-id=consumer_group_1

#生产者地址地址
spring.kafka.producer.bootstrap-servers=192.168.132.15:9092

然后我们创建几个文件

controller(用于发送消息,当然你直接创建类来也可以)

package com.zaiji.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @Description:
 * @ClassName:TestController
 * @Author: zaiji
 * @CreateDate: 2019/11/28 15:23
 * @Version 1.0
 */
@Controller
@RequestMapping("/")
public class TestController {

    @Autowired
    private KafkaTemplate kafka;

    @RequestMapping("send")
    @ResponseBody
    public void sendMessage(String msg) {
        //"zaiji-topic"是在kafka中已经创建的topic名称,msg是消息内容
        kafka.send("zaiji-topic", msg);
    }
}

然后我们再创建一个消费者,并交给spring管理

package com.zaiji.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * @ClassName:KafkaConsumerListener
 * @Author: zaiji
 * @CreateDate: 2019/11/28 13:54
 * @Version 1.0
 */
@Component
public class KafkaConsumerListener {

    //这里的topics是你订阅的消息的topic,是个数组
    @KafkaListener(topics = {"zaiji-topic"})
    void listener(ConsumerRecord<String, String> data) {
        System.out.println("*********************************接受消息************************************");
        System.out.println("[ 消息 来自:" + data.topic() + ",分区:" + data.partition() + " ,委托时间:" + data.timestamp() + "]");
        System.out.println("**********消息内容开始**********");
        System.out.println(data.value());
        System.out.println("**********消息内容结束**********");
        System.out.println("*********************************接受完毕************************************");
    }
}

最后我们创建启动类,并开启kafka功能

package com.zaiji;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * @Description:
 * @ClassName:Application
 * @Author: zaiji
 * @CreateDate: 2019/11/28 15:40
 * @Version 1.0
 */
@SpringBootApplication
@EnableKafka
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

然后我们依次启动,zookeeper,kafka,项目,然后去浏览器/postman等工具访问 http://localhost:8080/send?msg=最最最最最简单系列四——springboot和kafka的整合

然后你的控制台可以看到如下内容

最最最最最简单系列四——springboot和kafka的整合

恭喜你得到了人生第一个由springboot+kafka推送的消息

一个小bug的提醒最最最最最简单系列四——springboot和kafka的整合

这个错先检查kafka启动没有,确认启动之后这个错可能是kafka的配置文件(server.properties)中的listen属性没有配置,或者配置出错了

至此,springboot和kafka的最最最最最简单的整合就结束

结束语:很气,明明这么简单的一个整合,百度到的教程都差不多,挂羊头卖狗肉,标题是springboot+kafka,结果全是用spring的各种配置来整合kafka,显得不伦不类,一眼就能看出来很多博客就是在复制粘贴,我才大家和我一样在入门的时候可能都想快速得到一些结果,来让增加自己学下去的勇气和信心????,结果各个博客看完,动手实践的时候发现反复出错反复出错,根本出不了结果,也失去了学下去的兴趣。因此一气之下写了这几个最最最最最简单的教程,之后要坚持更博啊????

再见????,欢迎点赞关注