SpringBoot配置双kafka
SpringBoot配置双kafka
最近因为工作上的需求,要将我们系统上的kafka数据,发到第三方kafka上,所以需要配置两个kafka,配置过程中,也在网上参考 了一些博客,参考博客如下
参考博客:https://blog.csdn.net/qq_16116549/article/details/95465169
首先我们要知道,springboot支持kafka的自动配置,源码如下:
package org.springframework.boot.autoconfigure.kafka
@ConfigurationProperties(
prefix = "spring.kafka"
)
public class KafkaProperties {
private List<String> bootstrapServers = new ArrayList(Collections.singletonList("localhost:9092"));
private String clientId;
private final Map<String, String> properties = new HashMap();
private final KafkaProperties.Consumer consumer = new KafkaProperties.Consumer();
private final KafkaProperties.Producer producer = new KafkaProperties.Producer();
.....
省略
.....
}
但是这里的需求是配置两个kafka,所以需要我们自己,通过java bean的形式进行配置
补充:以下inner前缀的代表我们自己的kafka,out前缀的代表第三方的kafka
我们需要配置的主要有一下几项:
-
KafkaListenerContainerFactory(kafkaListenerContainerFactory监听我们服务器kafka消息,kafkaListenerContainerFactoryOutSchedule监听第三方kafka消息)
-
KafkaTemplate(kafkaTemplate往我们kafka上发送消息,kafkaOutTemplate往第三方kafka发送消息)
准备工作
配置依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
spring配置文件application.properties
#kafka配置
# 第一个kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=10.128.97.83:11202
spring.kafka.producer.acks=all
spring.kafka.consumer.group-id=DATA_0X0001
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=50
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.concurrency=10
spring.kafka.properties.max.poll.interval.ms=3600000
#kafka配置
# 第二个kafka 代理地址,可以多个
spring.outkafka.bootstrap-servers=192.168.58.10:9092
spring.outkafka.producer.acks=all
spring.outkafka.consumer.group-id=DATA_0X0002
spring.outkafka.consumer.enable-auto-commit=false
spring.outkafka.consumer.max-poll-records=50
spring.outkafka.listener.ack-mode=manual_immediate
spring.outkafka.listener.concurrency=10
spring.outkafka.properties.max.poll.interval.ms=3600000
kafka配置
创建kafkaConfig类
@SuppressWarnings("all")
@Configuration
@EnableKafka
public class KafkaConfig {
/*------INTER_KAFKA配置------*/
@Value("${spring.kafka.bootstrap-servers}")
private String innerServers;
@Value("${spring.kafka.consumer.group-id}")
private String innerGroupid;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String innerEnableAutoCommit;
@Value("${spring.kafka.listener.concurrency}")
private String innerConcurrency;
@Value("${spring.kafka.producer.acks}")
private String innerAcks;
@Value("${spring.kafka.properties.max.poll.interval.ms}")
private String innePollTimeout;
@Value("${spring.kafka.consumer.max-poll-records}")
private String innerPollRecogs;
@Value("${spring.kafka.listener.ack-mode}")
private String innerAckModel;
/*------OUTER_KAFKA配置------*/
@Value("${spring.outkafka.bootstrap-servers}")
private String outServers;
@Value("${spring.outkafka.consumer.group-id}")
private String outGroupid;
@Value("${spring.outkafka.consumer.enable-auto-commit}")
private String outEnableAutoCommit;
@Value("${spring.outkafka.listener.concurrency}")
private String outConcurrency;
@Value("${spring.outkafka.producer.acks}")
private String outAcks;
@Value("${spring.outkafka.properties.max.poll.interval.ms}")
private String outPollTimeout;
@Value("${spring.outkafka.consumer.max-poll-records}")
private String outPollRecogs;
@Value("${spring.outkafka.listener.ack-mode}")
private String outAckModel;
/*------INTER_KAFKA配置开始------*/
@Bean
@Primary /*默认的kafka配置*/
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.getInteger(innerConcurrency));
factory.getContainerProperties().setPollTimeout(Long.parseLong(innePollTimeout));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(innerAckModel.toUpperCase()));
return factory;
}
@Bean//第一个消费者工厂的bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,innerPollRecogs);
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
//inner生产者配置
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
props.put(ProducerConfig.ACKS_CONFIG, innerAcks);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/*------INTER_KAFKA配置结束------*/
/*------OUT_KAFKA配置开始------*/
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryOutSchedule());
factory.setConcurrency(Integer.getInteger(outConcurrency));
factory.getContainerProperties().setPollTimeout(Long.parseLong(outPollTimeout));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(outAckModel.toUpperCase()));
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() {
return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule());
}
@Bean
public Map<String, Object> consumerConfigsOutSchedule() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,outPollRecogs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean //生产者工厂配置
public ProducerFactory<String, String> producerOutFactory() {
return new DefaultKafkaProducerFactory<>(senderOutProps());
}
@Bean //kafka发送消息模板
public KafkaTemplate<String, String> kafkaOutTemplate() {
return new KafkaTemplate<String, String>(producerOutFactory());
}
private Map<String, Object> senderOutProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
props.put(ProducerConfig.ACKS_CONFIG, outAcks);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/*------OUT_KAFKA配置结束-----*/
}
监听消息demo
监听消息中,consume方法上的@KafkaListener只指定了监听的topic,所以使用默认的kafka配置,即我们kafkaConfig中加了@Primary 的那个bean。consume2方法上,通过containerFactory = "kafkaListenerContainerFactoryOutSchedule"指定了他的factory为kafkaConfig中的kafkaListenerContainerFactoryOutSchedule这个bean,所以会监听kafkaListenerContainerFactoryOutSchedule配置的服务器信息。因此我们通过containerFactory等于不同的值,就可以监听不同的kafka集群
@Slf4j
@Component
public class KafkaSubscriber {
/**
* @param consumerRecord
* @param ack
* @throws ConsumeException
*/
@Override
@KafkaListener(topics = "DETAIL")
public void consume(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) throws ConsumeException {
String content = consumerRecord.value();
if (StringUtils.isBlank(content)) {
return;
}
log.info("收到订阅的kafka数据,message={}", content);
ack.acknowledge();
}
@KafkaListener(topics = "MSG",containerFactory = "kafkaListenerContainerFactoryOutSchedule")
public void consume2(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) throws ConsumeException {
String content = consumerRecord.value();
log.info("收到第三方kafka数据,massage={}",content);
ack.acknowledge();
}
}
发送消息demo
发送消息demo中使用kafkaOutTemplate是往三方系统的kafka上发送消息,使用 kafkaTemplate则是往我们kafka上发送消息,他们分别对应的就是我们kafkaConfig中手动注入的两个bean,因为设置的bootstrap-servers不一样,所以会向不同的kafka集群发送消息
@Component
@Slf4j
public class StorageService {
@Autowired
private KafkaTemplate<String,String> kafkaOutTemplate;
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Transactional(rollbackFor = Exception.class)
public void execute(Map<String,List<RecordMessage>> map){
log.info("发送消息到第三方kafka开始------->{}",JSONObject.toJSONString(map.get("data")));
if (!CollectionUtils.isEmpty(map.get("data"))){
try {
this.kafkaOutTemplate.send(TopicType.MSG.getTopicCode(),
JSONObject.toJSONString(map));
log.info("发送消息到第三方kafka成功");
} catch (Exception e) {
log.info("发送消息到第三方kafka失败 错误 ------> {}", e.getMessage());
}
}
}
}
补充,项目结构
如下图:
推荐阅读
-
SpringBoot(二十一)IntelliJ IDEA配置Quartz启动项
-
Springboot的application.properties或者application.yml环境的指定运行与配置
-
springboot redis使用lettuce配置多数据源的实现
-
SpringBoot(二、 多环境配置文件)
-
SpringBoot配置加载,各配置文件优先级对比方式
-
Springboot2.1.x配置Activiti7单独数据源问题
-
Springboot全局异常配置超简单
-
kafka配置及启动等命令总结
-
SpringBoot开发案例之整合Kafka实现消息队列
-
Kafka系列8:一网打尽常用脚本及配置,宜收藏落灰!