欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot集成Kafka的步骤

程序员文章站 2022-06-22 09:13:00
springboot集成kafka本篇主要讲解springboot 如何集成kafka ,并且简单的 编写了一个demo 来测试 发送和消费功能前言选择的版本如下:springboot : 2.3.4...

springboot集成kafka

本篇主要讲解springboot 如何集成kafka ,并且简单的 编写了一个demo 来测试 发送和消费功能

前言

选择的版本如下:

springboot : 2.3.4.release

spring-kafka : 2.5.6.release

kafka : 2.5.1

zookeeper : 3.4.14

本demo 使用的是 springboot 比较高的版本 springboot 2.3.4.release 它会引入 spring-kafka 2.5.6 release ,对应了版本关系中的
spring boot 2.3 users should use 2.5.x (boot dependency management will use the correct version).

spring和 kafka 的版本 关系

...

1.搭建kafka 和 zookeeper 环境

搭建kafka 和 zookeeper 环境 并且启动 它们

2.创建demo 项目引入spring-kafka

2.1 pom 文件

<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
  <groupid>org.springframework.kafka</groupid>
  <artifactid>spring-kafka</artifactid>
</dependency>

<dependency>
  <groupid>com.google.code.gson</groupid>
  <artifactid>gson</artifactid>
</dependency>

2.2 配置application.yml

spring:
 kafka:
  bootstrap-servers: 192.168.25.6:9092 #bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
  consumer:
   group-id: mygroup
   enable-auto-commit: true
   auto-commit-interval: 100ms
   properties:
    session.timeout.ms: 15000
   key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
   value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
   auto-offset-reset: earliest
  producer:
   retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
   batch-size: 16384 #当将多个记录被发送到同一个分区时, producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
   buffer-memory: 33554432 #producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
   key-serializer: org.apache.kafka.common.serialization.stringserializer #关键字的序列化类
   value-serializer: org.apache.kafka.common.serialization.stringserializer #值的序列化类

2.3 定义消息体message

/**
 * @author johnny
 * @create 2020-09-23 上午9:21
 **/
@data
public class message {


  private long id;

  private string msg;

  private date sendtime;
}

2.4 定义kafkasender

主要利用 kafkatemplate 来发送消息 ,将消息封装成message 并且进行 转化成json串 发送到kafka中

@component
@slf4j
public class kafkasender {

  private final kafkatemplate<string, string> kafkatemplate;

  //构造器方式注入 kafkatemplate
  public kafkasender(kafkatemplate<string, string> kafkatemplate) {
    this.kafkatemplate = kafkatemplate;
  }

  private gson gson = new gsonbuilder().create();

  public void send(string msg) {
    message message = new message();

    message.setid(system.currenttimemillis());
    message.setmsg(msg);
    message.setsendtime(new date());
    log.info("【++++++++++++++++++ message :{}】", gson.tojson(message));
    //对 topic = hello2 的发送消息
    kafkatemplate.send("hello2",gson.tojson(message));
  }

}

2.5 定义kafkaconsumer

在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic
kafka的消息再接收端会被封装成consumerrecord对象返回,它内部的value属性就是实际的消息。

@component
@slf4j
public class kafkaconsumer {


  @kafkalistener(topics = {"hello2"})
  public void listen(consumerrecord<?, ?> record) {

    optional.ofnullable(record.value())
        .ifpresent(message -> {
          log.info("【+++++++++++++++++ record = {} 】", record);
          log.info("【+++++++++++++++++ message = {}】", message);
        });
  }

}

3.测试 效果

提供一个 http接口调用 kafkasender 去发送消息

3.1 提供http 测试接口

@restcontroller
@slf4j
public class testcontroller {


  @autowired
  private kafkasender kafkasender;


  @getmapping("sendmessage/{msg}")
  public void sendmessage(@pathvariable("msg") string msg){
    kafkasender.send(msg);
  }
}

3.2 启动项目

监听8080 端口

kafkamessagelistenercontainer中有 consumer group = mygroup 有一个 监听 hello2-0 topic 的 消费者

SpringBoot集成Kafka的步骤

3.3 调用http接口

http://localhost:8080/sendmessage/kafkatestmsg

SpringBoot集成Kafka的步骤

至此 springboot集成kafka 结束 。。

以上就是springboot集成kafka的步骤的详细内容,更多关于springboot集成kafka的资料请关注其它相关文章!

相关标签: SpringBoot Kafka