SpringBoot整合Kafka(3)
程序员文章站
2022-04-19 21:32:57
...
SpringBoot整合Kafka(3)
1. send发送消息
1. sendDefault发送消息
1.1 创建defaultKafkaTemplate
//kafkaTemplate实现了Kafka发送接收等功能
@Bean
@Primary
public KafkaTemplate<Integer, String> kafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
return template;
}
@Bean("defaultKafkaTemplate")
public KafkaTemplate<Integer, String> defaultKafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
template.setDefaultTopic("topic.quick.default");
r
1.2 消息生产者producer
package zhw.example.zhw.kafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
@Controller
@RequestMapping(value = "kafka")
public class producer {
@Resource
private KafkaTemplate defaultKafkaTemplate;
@ResponseBody
@RequestMapping(value = "sendMsg")
public void sendMsg() throws InterruptedException {
defaultKafkaTemplate.sendDefault("send default data");
return;
}
}
1.3 消息消费者consumer
package zhw.example.zhw.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class consumer {
private static final Logger log= LoggerFactory.getLogger(consumer.class);
//声明consumerID为demo,监听topicName为topic.quick.default的Topic
@KafkaListener(id = "demo", topics = "topic.quick.default")
public void listen(String msgData) {
log.info("demo receive:"+msgData);
}
}
1.4 测试
2. send带有时间戳的消息
//发送带有时间戳的消息
defaultKafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");
3. 使用ProducerRecord发送消息
ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
defaultKafkaTemplate.send(record);
4. 使用Message发送消息
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);
2. KafkaTemplate异步发送消息
2.1 消息结果回调类KafkaSendResultHandler
package zhw.example.zhw.kafka;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Message send success : " + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send error : " + producerRecord.toString());
}
}
2.2 消息生产者producer
package zhw.example.zhw.kafka;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping(value = "kafka")
public class producer {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private KafkaSendResultHandler kafkaSendResultHandler;
@ResponseBody
@RequestMapping(value = "sendMsg")
public void sendMsg() throws InterruptedException {
kafkaTemplate.setProducerListener(kafkaSendResultHandler);
//发送带有时间戳的消息
kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");
ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
kafkaTemplate.send(record);
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);
return;
}
}
2.3 测试
3. KafkaTemplate同步发送消息
KafkaTemplate异步发送消息大大的提升了生产者的并发能力,但某些场景下我们并不需要异步发送消息,这个时候我们可以采取同步发送方式,实现也是非常简单的,我们只需要在send方法后面调用get方法
//发送带有时间戳的消息
kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp").get();
ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
kafkaTemplate.send(record).get();
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message).get();
推荐阅读
-
SpringBoot中整合knife4j接口文档
-
SpringBoot缓存详解并整合Redis架构
-
springboot整合freemarker模板
-
详解SpringBoot开发案例之整合定时任务(Scheduled)
-
SpringBoot整合Elasticsearch7.2.0的实现方法
-
SpringBoot整合UEditor的示例代码
-
完整SpringBoot Cache整合redis缓存(二)
-
详解mall整合SpringBoot+MyBatis搭建基本骨架
-
SpringBoot整合Redis、ApachSolr和SpringSession的示例
-
前端笔记知识点整合之HTML5&CSS3(上)新特性&音频视频&本地存储