Spring Boot整合Kafka的简单用例(@KafkaListener注解)
程序员文章站
2024-03-24 16:41:10
...
第一步、启动zookeeper server和kafka server
启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties
启动两个kafka server:bin/kafka-server-start.sh config/server-1.properties;
bin/kafka-server-start.sh config/server.properties
zookeeper会从这两个broker中选举一个作为controller;leader和follower是针对每个分区的副本而言的,并不是整个broker固定作为leader或slave
第二步、创建一个maven项目
这一篇中修改了Spring Boot的版本为2.0.0,pom.xml如下:
<!-- Imports the Spring Boot 2.0.0.RELEASE dependencies BOM (type=pom, scope=import)
     into this project's dependency management section. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.0.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Direct dependencies of the example application. -->
<dependencies>
    <!-- Spring MVC starter; no version is declared here. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka, pinned explicitly to 2.1.4.RELEASE. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>
</dependencies>
第三步、kafka配置
/**
 * Spring configuration for the Kafka example: declares the producer and
 * consumer settings, their factories, the listener container factory, the
 * listener bean and the {@link KafkaTemplate}.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Broker list used by both the producer and the consumer settings. */
    private static final String BROKERS = "localhost:9092,localhost:9093";

    // ---------------------------------------------------------------
    // Producer
    // ---------------------------------------------------------------

    /**
     * Builds the producer property map.
     *
     * @return a mutable map of producer settings keyed by {@link ProducerConfig} names
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return settings;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return a factory producing String/String Kafka producers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // ---------------------------------------------------------------
    // Consumer
    // ---------------------------------------------------------------

    /**
     * Builds the consumer property map (group id "0", auto-commit enabled).
     *
     * @return a mutable map of consumer settings keyed by {@link ConsumerConfig} names
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return settings;
    }

    /**
     * Creates the listener container factory backed by {@link #consumerFactory()}.
     *
     * @return the container factory for String/String listeners
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a factory producing String/String Kafka consumers
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Registers the message listener as a bean.
     *
     * @return a new {@link MyListener}
     */
    @Bean
    public MyListener myListener() {
        return new MyListener();
    }

    // ---------------------------------------------------------------
    // Kafka template
    // ---------------------------------------------------------------

    /**
     * Creates the template used to send messages, backed by {@link #producerFactory()}.
     *
     * @return a String/String {@link KafkaTemplate}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
第四步、topic的配置
自动创建的topic默认分区数是1,复制因子是1(对应broker配置num.partitions和default.replication.factor的默认值)
@Configuration
@EnableKafka
public class TopicConfig {
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic foo() {
/第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数
//当broker个数为1个时会创建topic失败,
//提示:replication factor: 2 larger than available brokers: 1
//只有在集群中才能使用kafka的备份功能
return new NewTopic("foo", 10, (short) 2);
}
@Bean
public NewTopic bar() {
return new NewTopic("bar", 10, (short) 2);
}
@Bean
public NewTopic topic1(){
return new NewTopic("topic1", 10, (short) 2);
}
@Bean
public NewTopic topic2(){
return new NewTopic("topic2", 10, (short) 2);
}
}
第五步、使用@KafkaListener注解
topicPartitions和topics、topicPattern不能同时使用
/**
 * Kafka listener bean with two {@code @KafkaListener} methods: one bound to
 * explicit topic partitions and one subscribed to whole topics. Each received
 * record's topic, key and value are printed to standard output.
 */
public class MyListener {

    /**
     * Listener container "myContainer1", using explicit partition assignment:
     * <ul>
     *   <li>topic1: partitions 0 and 3</li>
     *   <li>topic2: partition 0, plus partition 1 with initial offset 4</li>
     * </ul>
     *
     * @param record the received record
     */
    @KafkaListener(id = "myContainer1",
            topicPartitions = {
                @TopicPartition(topic = "topic1", partitions = { "0", "3" }),
                @TopicPartition(topic = "topic2", partitions = "0",
                        partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
            })
    public void listen(ConsumerRecord<?, ?> record) {
        printRecord(record);
    }

    /**
     * Listener container "myContainer2", subscribed to topics "foo" and "bar".
     *
     * @param record the received record
     */
    @KafkaListener(id = "myContainer2", topics = {"foo", "bar"})
    public void listen2(ConsumerRecord<?, ?> record) {
        printRecord(record);
    }

    /** Prints the topic, key and value of a record, one labelled line each. */
    private static void printRecord(ConsumerRecord<?, ?> record) {
        System.out.println("topic:" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:" + record.value());
    }
}
第六步、创建发送消息的接口
/**
 * REST controller exposing a GET endpoint that sends a fixed test message
 * (key "你", value "好") to the requested topic and partition.
 */
@RestController
public class KafkaController {

    private static final Logger log = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Handles {@code GET /{topic}/send}.
     *
     * @param topic     target topic, taken from the request path
     * @param partition target partition, taken from the {@code partition}
     *                  query parameter (defaults to 0)
     */
    @RequestMapping(value = "/{topic}/send", method = RequestMethod.GET)
    public void sendMeessageTotopic1(
            @PathVariable String topic,
            @RequestParam(value = "partition", defaultValue = "0") int partition) {
        log.info("start send message to {}", topic);
        final String messageKey = "你";
        final String messageValue = "好";
        kafkaTemplate.send(topic, partition, messageKey, messageValue);
    }
}
第七步、启动程序、调用接口
消息监听器只监听订阅的topic的特定分区的消息
源码:https://github.com/NapWells/java_framework_learn/tree/master/springkafka2