欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

13、Springboot集成Kafka,最简化消息队列通信的实现

程序员文章站 2022-03-26 21:41:42
...

相关依赖的引入

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.6.RELEASE</version>
</dependency>

这里的版本号是1.0.6.RELEASE ,是以我服务器上安装的Kafka版本所决定的,如果版本不对应则会发生各种意想不到的问题,就当人类未解之谜好了,

服务器端Kafka版本为kafka_2.11-1.0.0, 根据下图,请自行根据实际情况甄别不同版本

13、Springboot集成Kafka,最简化消息队列通信的实现

kafka-clients的版本号,在Springboot项目运行的时候,在运行日志中可见到

13、Springboot集成Kafka,最简化消息队列通信的实现

编辑配置文件application.yml

spring: 
    kafka:
        bootstrap-servers: XX.XXX.XX.XX:9092
        consumer:
            group-id: 0
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-deserializer: org.apache.kafka.common.serialization.StringSerializer
            batch-size: 65536
            buffer-memory: 524288

创建 生产者 KafkaProducer

package com.zyr.ws.kafka;

import com.zyr.ws.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.Date;

@Component
@EnableScheduling
public class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    /**
     * 发送消息到Kafka
     * @param channel Topic
     * @param message 数据包
     */
    public void sendChannelMess(String channel, String message){
        ListenableFuture future = kafkaTemplate.send(channel, message);
        future.addCallback(new SuccessCallback() {
            @Override
            public void onSuccess(Object o) {
                LogUtils.writeLogger(logger, o, "发送成功");
            }
        }, new FailureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                LogUtils.writeErrorException(logger, "发送异常", throwable);
            }
        });
    }

    @Scheduled(cron = "0/5 * * * * ?")
    public void task() {
        String message = new Date().toString();
        sendChannelMess("match", message);
    }
}

创建 消费者 KafkaConsumer

package com.zyr.ws.kafka;

import com.zyr.ws.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    /**
     * 监听比赛主题,有消息就读取
     * @param message 数据包
     */
    @KafkaListener(topics = {"match"})
    public void receiveMessage(String message){
        LogUtils.writeLogger(logger, message, "接收数据");
        // 后续操作待完善
    }
}

在 生产者中 定义了一个每五秒发送消息的定时任务,启动项目,测试即可!!!

至于Kafka的安装,在我其它日志中有相关描述,此处省略……

相关标签: kafka