欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot整合Kafka的简单用例(@KafkaListener注解)

程序员文章站 2024-03-24 16:41:10
...

第一步、启动zookeeper server和kafka server

启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties
启动两个kafka server:bin/kafka-server-start.sh config/server-1.properties;
bin/kafka-server-start.sh config/server.properties
zookeeper会选举一个作为leader,另外一个作为slave

第二步、创建一个maven项目

这一篇中修改了Spring Boot的版本为2.0.0,pom.xml如下:

<dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.0.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

第三步、kafka配置

@Configuration
@EnableKafka
public class KafkaConfig {

    /* --------------producer configuration-----------------**/
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /* --------------consumer configuration-----------------**/
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }


    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean//消息监听器
    public MyListener myListener() {
        return new MyListener();
    }


    /* --------------kafka template configuration-----------------**/
    @Bean
    public KafkaTemplate<String,String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }

}

第四步、topic的配置

自动创建的topic分区数是1,复制因子是0

@Configuration
@EnableKafka
public class TopicConfig {
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic foo() {
        /第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数
        //当broker个数为1个时会创建topic失败,
        //提示:replication factor: 2 larger than available brokers: 1
        //只有在集群中才能使用kafka的备份功能
        return new NewTopic("foo", 10, (short) 2);
    }

    @Bean
    public NewTopic bar() {
        return new NewTopic("bar", 10, (short) 2);
    }

    @Bean
    public NewTopic topic1(){
        return new NewTopic("topic1", 10, (short) 2);
    }

    @Bean
    public NewTopic topic2(){
        return new NewTopic("topic2", 10, (short) 2);
    }
}

第五步、使用@KafkaListener注解

topicPartitions和topics、topicPattern不能同时使用

public class MyListener {
    @KafkaListener(id = "myContainer1",//id是消费者监听容器
            topicPartitions =//配置topic和分区:监听两个topic,分别为topic1、topic2,topic1只接收分区0,3的消息,
                    //topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为5
            { @TopicPartition(topic = "topic1", partitions = { "0", "3" }),
                    @TopicPartition(topic = "topic2", partitions = "0",
                            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
            })
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("topic" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:"+record.value());
    }


    @KafkaListener(id = "myContainer2",topics = {"foo","bar"})
    public void listen2(ConsumerRecord<?, ?> record){
        System.out.println("topic:" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:"+record.value());
    }
}

第六步、创建发送消息的接口

@RestController
public class KafkaController {
    private final static Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private  KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping(value = "/{topic}/send",method = RequestMethod.GET)
    public void sendMeessageTotopic1(@PathVariable String topic,@RequestParam(value = "partition",defaultValue = "0") int partition) {
        logger.info("start send message to {}",topic);
        kafkaTemplate.send(topic,partition,"你","好");
    }
}

第七步、启动程序、调用接口

消息监听器只监听订阅的topic的特定分区的消息
Spring Boot整合Kafka的简单用例(@KafkaListener注解)
源码:https://github.com/NapWells/java_framework_learn/tree/master/springkafka2