
Configuring Dual Kafka in Spring Boot

程序员文章站 2024-01-09 18:25:22

Recently a requirement came up at work: the Kafka data in our system has to be forwarded to a third-party Kafka cluster, so the application needs two Kafka configurations. While setting this up I consulted a few blog posts; the main reference is listed below.

Reference: https://blog.csdn.net/qq_16116549/article/details/95465169

First, note that Spring Boot ships with Kafka auto-configuration out of the box; the relevant source looks like this:

package org.springframework.boot.autoconfigure.kafka;

@ConfigurationProperties(
    prefix = "spring.kafka"
)
public class KafkaProperties {
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
    private String clientId;
    private final Map<String, String> properties = new HashMap<>();
    private final KafkaProperties.Consumer consumer = new KafkaProperties.Consumer();
    private final KafkaProperties.Producer producer = new KafkaProperties.Producer();
    // ... remainder omitted ...
}
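Because of this auto-configuration, a single cluster needs no hand-written Java config at all: Boot builds a KafkaTemplate and a listener container factory from the spring.kafka.* properties, and they can be injected directly. A minimal sketch (the class and topic names here are illustrative, not part of the original project):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Single-cluster case: Spring Boot auto-configures this template
// from spring.kafka.*; no hand-written KafkaConfig is required.
@Component
public class SingleClusterSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String payload) {
        kafkaTemplate.send("DEMO_TOPIC", payload); // topic name is illustrative
    }
}
```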

The requirement here, however, is two Kafka clusters, so we have to configure the second one ourselves as Java beans. (The spring.outkafka.* keys used below are a custom prefix that Boot does not bind automatically; the config class reads them with @Value.)

Note: names with the inner prefix refer to our own Kafka, and names with the out prefix refer to the third-party Kafka.

The main pieces to configure are:

  1. KafkaListenerContainerFactory (kafkaListenerContainerFactory listens to messages from our own Kafka; kafkaListenerContainerFactoryOutSchedule listens to the third-party Kafka)
  2. KafkaTemplate (kafkaTemplate sends messages to our Kafka; kafkaOutTemplate sends to the third-party Kafka)

Preparation

Add the dependency

 <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.4.RELEASE</version>
 </dependency>

Spring configuration file application.properties

# Kafka configuration
# First Kafka broker address (multiple addresses allowed, comma-separated)
spring.kafka.bootstrap-servers=10.128.97.83:11202
spring.kafka.producer.acks=all
spring.kafka.consumer.group-id=DATA_0X0001
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=50
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.concurrency=10
spring.kafka.properties.max.poll.interval.ms=3600000

# Kafka configuration
# Second Kafka broker address (multiple addresses allowed, comma-separated)
spring.outkafka.bootstrap-servers=192.168.58.10:9092
spring.outkafka.producer.acks=all
spring.outkafka.consumer.group-id=DATA_0X0002
spring.outkafka.consumer.enable-auto-commit=false
spring.outkafka.consumer.max-poll-records=50
spring.outkafka.listener.ack-mode=manual_immediate
spring.outkafka.listener.concurrency=10
spring.outkafka.properties.max.poll.interval.ms=3600000

Kafka configuration

Create the KafkaConfig class


@SuppressWarnings("all")
@Configuration
@EnableKafka
public class KafkaConfig {

    /*------ inner Kafka properties ------*/
    @Value("${spring.kafka.bootstrap-servers}")
    private String innerServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String innerGroupid;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String innerEnableAutoCommit;
    @Value("${spring.kafka.listener.concurrency}")
    private String innerConcurrency;
    @Value("${spring.kafka.producer.acks}")
    private String innerAcks;
    @Value("${spring.kafka.properties.max.poll.interval.ms}")
    private String innePollTimeout;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String innerPollRecogs;
    @Value("${spring.kafka.listener.ack-mode}")
    private String innerAckModel;

    /*------ outer (third-party) Kafka properties ------*/
    @Value("${spring.outkafka.bootstrap-servers}")
    private String outServers;
    @Value("${spring.outkafka.consumer.group-id}")
    private String outGroupid;
    @Value("${spring.outkafka.consumer.enable-auto-commit}")
    private String outEnableAutoCommit;
    @Value("${spring.outkafka.listener.concurrency}")
    private String outConcurrency;
    @Value("${spring.outkafka.producer.acks}")
    private String outAcks;
    @Value("${spring.outkafka.properties.max.poll.interval.ms}")
    private String outPollTimeout;
    @Value("${spring.outkafka.consumer.max-poll-records}")
    private String outPollRecogs;
    @Value("${spring.outkafka.listener.ack-mode}")
    private String outAckModel;
    /*------ inner Kafka beans ------*/
    @Bean
    @Primary /* default Kafka configuration */
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(innerConcurrency));
        factory.getContainerProperties().setPollTimeout(Long.parseLong(innePollTimeout));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(innerAckModel.toUpperCase()));
        return factory;
    }
    @Bean // consumer factory for the first (inner) cluster
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,innerPollRecogs);
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
//        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    // inner producer configuration
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
        props.put(ProducerConfig.ACKS_CONFIG, innerAcks);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    /*------ end of inner Kafka beans ------*/


    /*------ outer Kafka beans ------*/

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryOutSchedule());
        factory.setConcurrency(Integer.parseInt(outConcurrency));
        factory.getContainerProperties().setPollTimeout(Long.parseLong(outPollTimeout));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(outAckModel.toUpperCase()));
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule());
    }
    @Bean
    public Map<String, Object> consumerConfigsOutSchedule() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,outPollRecogs);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean // producer factory for the third-party cluster
    public ProducerFactory<String, String> producerOutFactory() {
        return new DefaultKafkaProducerFactory<>(senderOutProps());
    }
    @Bean // template for sending to the third-party cluster
    public KafkaTemplate<String, String> kafkaOutTemplate() {
        return new KafkaTemplate<String, String>(producerOutFactory());
    }
    private Map<String, Object> senderOutProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
        props.put(ProducerConfig.ACKS_CONFIG, outAcks);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    /*------ end of outer Kafka beans ------*/
}
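One detail worth emphasizing in the factory methods above: `Integer.parseInt` is the right way to convert the @Value-injected concurrency string. The similarly named `Integer.getInteger` does something entirely different: it looks up a JVM system property with that name and returns null when it is unset, which would make `setConcurrency` fail with a NullPointerException. A standalone demonstration of the difference:

```java
public class ParseVsGetInteger {
    public static void main(String[] args) {
        // parseInt converts the string itself
        System.out.println(Integer.parseInt("10"));   // prints 10

        // getInteger treats "10" as the NAME of a system property;
        // no such property is set here, so it returns null
        System.out.println(Integer.getInteger("10")); // prints null
    }
}
```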

Listener demo

In the listener demo, the @KafkaListener on consume only specifies a topic, so it uses the default Kafka configuration, i.e. the bean marked @Primary in KafkaConfig. The @KafkaListener on consume2 sets containerFactory = "kafkaListenerContainerFactoryOutSchedule", binding it to the bean of that name in KafkaConfig, so it listens to the servers configured there. In short, pointing containerFactory at different beans lets us listen to different Kafka clusters.

@Slf4j
@Component
public class KafkaSubscriber {

 
    /**
     * @param consumerRecord the record received from our own cluster
     * @param ack            manual acknowledgment handle
     * @throws ConsumeException
     */
    @KafkaListener(topics = "DETAIL")
    public void consume(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) throws ConsumeException {
        String content = consumerRecord.value();
        if (StringUtils.isBlank(content)) {
            ack.acknowledge(); // acknowledge blank records too, so they are not redelivered
            return;
        }
        log.info("Received subscribed kafka data, message={}", content);
        ack.acknowledge();
    }
    @KafkaListener(topics = "MSG", containerFactory = "kafkaListenerContainerFactoryOutSchedule")
    public void consume2(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) throws ConsumeException {
        String content = consumerRecord.value();
        log.info("Received third-party kafka data, message={}", content);
        ack.acknowledge();
    }
}

Sending demo

In the sending demo, kafkaOutTemplate sends to the third-party Kafka and kafkaTemplate sends to our own; they correspond to the two beans registered manually in KafkaConfig. Because their bootstrap-servers differ, they publish to different clusters. (With two beans of the same KafkaTemplate type, Spring resolves each @Autowired field by matching the field name against the bean name.)

@Component
@Slf4j
public class StorageService {


    @Autowired
    private KafkaTemplate<String,String> kafkaOutTemplate;
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Transactional(rollbackFor = Exception.class)
    public void execute(Map<String,List<RecordMessage>> map){
        log.info("Sending message to third-party kafka ------->{}", JSONObject.toJSONString(map.get("data")));
        if (!CollectionUtils.isEmpty(map.get("data"))){
            try {
                this.kafkaOutTemplate.send(TopicType.MSG.getTopicCode(),
                        JSONObject.toJSONString(map));
                log.info("Message sent to third-party kafka successfully");
            } catch (Exception e) {
                log.error("Failed to send message to third-party kafka, error ------> {}", e.getMessage());
            }
        }

    }
}
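Putting the two halves together, the original requirement (forwarding data from our Kafka to the third-party one) can be sketched as a single relay component: a listener on the default inner factory that republishes each record through kafkaOutTemplate. This is a minimal sketch using the topic names from the demos above, not the project's actual relay code:

```java
@Slf4j
@Component
public class KafkaRelay {

    @Autowired
    private KafkaTemplate<String, String> kafkaOutTemplate;

    // Listens on the inner cluster (default @Primary factory, no
    // containerFactory attribute) and forwards to the third-party cluster.
    @KafkaListener(topics = "DETAIL")
    public void relay(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String content = record.value();
        if (StringUtils.isNotBlank(content)) {
            kafkaOutTemplate.send("MSG", content);
            log.info("Relayed message to third-party kafka, message={}", content);
        }
        // manual ack matches listener.ack-mode=manual_immediate
        ack.acknowledge();
    }
}
```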

Appendix: project structure

[Figure: project structure screenshot]