13、Springboot集成Kafka,最简化消息队列通信的实现
程序员文章站
2022-03-26 21:41:42
...
相关依赖的引入
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.6.RELEASE</version>
</dependency>
这里的版本号是1.0.6.RELEASE ,是以我服务器上安装的Kafka版本所决定的,如果版本不对应则会发生各种意想不到的问题,就当人类未解之谜好了,
服务器端Kafka版本为kafka_2.11-1.0.0, 根据下图,请自行根据实际情况甄别不同版本
kafka-clients的版本号,在Springboot项目运行的时候,在运行日志中可见到
编辑配置文件application.yml
spring:
kafka:
bootstrap-servers: XX.XXX.XX.XX:9092
consumer:
group-id: 0
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
batch-size: 65536
buffer-memory: 524288
创建 生产者 KafkaProducer
package com.zyr.ws.kafka;
import com.zyr.ws.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import java.util.Date;
@Component
@EnableScheduling
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
/**
* 发送消息到Kafka
* @param channel Topic
* @param message 数据包
*/
public void sendChannelMess(String channel, String message){
ListenableFuture future = kafkaTemplate.send(channel, message);
future.addCallback(new SuccessCallback() {
@Override
public void onSuccess(Object o) {
LogUtils.writeLogger(logger, o, "发送成功");
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable throwable) {
LogUtils.writeErrorException(logger, "发送异常", throwable);
}
});
}
@Scheduled(cron = "0/5 * * * * ?")
public void task() {
String message = new Date().toString();
sendChannelMess("match", message);
}
}
创建 消费者 KafkaConsumer
package com.zyr.ws.kafka;
import com.zyr.ws.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
/**
* 监听比赛主题,有消息就读取
* @param message 数据包
*/
@KafkaListener(topics = {"match"})
public void receiveMessage(String message){
LogUtils.writeLogger(logger, message, "接收数据");
// 后续操作待完善
}
}
在 生产者中 定义了一个每五秒发送消息的定时任务,启动项目,测试即可!!!
至于Kafka的安装,在我其它日志中有相关描述,此处省略……