欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot整合Kafka(3)

程序员文章站 2022-04-19 21:32:57
...

1. send发送消息

1. sendDefault发送消息

1.1 创建defaultKafkaTemplate

    //kafkaTemplate实现了Kafka发送接收等功能
    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
        return template;
    }

    @Bean("defaultKafkaTemplate")
    public KafkaTemplate<Integer, String> defaultKafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
        template.setDefaultTopic("topic.quick.default");
        r

1.2 消息生产者producer

package zhw.example.zhw.kafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

@Controller
@RequestMapping(value = "kafka")
public class producer {

    @Resource
    private KafkaTemplate defaultKafkaTemplate;

    @ResponseBody
    @RequestMapping(value = "sendMsg")
    public void sendMsg() throws InterruptedException {
        defaultKafkaTemplate.sendDefault("send default data");
        return;
    }
}

1.3 消息消费者consumer

package zhw.example.zhw.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class consumer {
    private static final Logger log= LoggerFactory.getLogger(consumer.class);
    //声明consumerID为demo,监听topicName为topic.quick.default的Topic
    @KafkaListener(id = "demo", topics = "topic.quick.default")
    public void listen(String msgData) {
        log.info("demo receive:"+msgData);
    }
}

1.4 测试

SpringBoot整合Kafka(3)

2. send带有时间戳的消息

 //发送带有时间戳的消息
defaultKafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");

3. 使用ProducerRecord发送消息

ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
defaultKafkaTemplate.send(record);

4. 使用Message发送消息

Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);

2. KafkaTemplate异步发送消息

2.1 消息结果回调类KafkaSendResultHandler

package zhw.example.zhw.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaSendResultHandler implements ProducerListener {
    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send error : " + producerRecord.toString());
    }
}

2.2 消息生产者producer

package zhw.example.zhw.kafka;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Controller
@RequestMapping(value = "kafka")
public class producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private KafkaSendResultHandler kafkaSendResultHandler;

    @ResponseBody
    @RequestMapping(value = "sendMsg")
    public void sendMsg() throws InterruptedException {

        kafkaTemplate.setProducerListener(kafkaSendResultHandler);

        //发送带有时间戳的消息
        kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");
        ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
        kafkaTemplate.send(record);

        Map map = new HashMap();
        map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
        map.put(KafkaHeaders.PARTITION_ID, 0);
        map.put(KafkaHeaders.MESSAGE_KEY, 0);
        GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
        kafkaTemplate.send(message);

        return;
    }
}


2.3 测试

SpringBoot整合Kafka(3)

3. KafkaTemplate同步发送消息

KafkaTemplate异步发送消息大大的提升了生产者的并发能力,但某些场景下我们并不需要异步发送消息,这个时候我们可以采取同步发送方式,实现也是非常简单的,我们只需要在send方法后面调用get方法

        //发送带有时间戳的消息
        kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp").get();
        ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
        kafkaTemplate.send(record).get();

        Map map = new HashMap();
        map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
        map.put(KafkaHeaders.PARTITION_ID, 0);
        map.put(KafkaHeaders.MESSAGE_KEY, 0);
        GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
        kafkaTemplate.send(message).get();
相关标签: # Kafka