欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spring boot与kafka集成的简单实例

程序员文章站 2024-02-28 23:06:52
本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖

本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下:

引入相关依赖

<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter</artifactid>
</dependency>

<dependency>
  <groupid>org.springframework.kafka</groupid>
  <artifactid>spring-kafka</artifactid>
  <version>1.1.1.release</version>
</dependency>

从依赖项的引入即可看出,当前spring boot(1.4.2)还不支持完全以配置项的配置来实现与kafka的无缝集成。也就意味着必须通过java config的方式进行手工配置。

定义kafka基础配置

与redistemplate及jdbctemplate等类似。spring同样提供了org.springframework.kafka.core.kafkatemplate作为kafka相关api操作的入口。

import java.util.hashmap;
import java.util.map;

import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.serialization.stringserializer;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;

@configuration
@enablekafka
public class kafkaproducerconfig {

  public map<string, object> producerconfigs() {
    map<string, object> props = new hashmap<>();
    props.put(producerconfig.bootstrap_servers_config, "192.168.179.200:9092");
    props.put(producerconfig.retries_config, 0);
    props.put(producerconfig.batch_size_config, 4096);
    props.put(producerconfig.linger_ms_config, 1);
    props.put(producerconfig.buffer_memory_config, 40960);
    props.put(producerconfig.key_serializer_class_config, stringserializer.class);
    props.put(producerconfig.value_serializer_class_config, stringserializer.class);
    return props;
  }

  public producerfactory<string, string> producerfactory() {
    return new defaultkafkaproducerfactory<>(producerconfigs());
  }

  @bean
  public kafkatemplate<string, string> kafkatemplate() {
    return new kafkatemplate<string, string>(producerfactory());
  }
}

kafkatemplate依赖于producerfactory,而创建producerfactory时则通过一个map指定kafka相关配置参数。通过kafkatemplate对象即可实现消息发送。

kafkatemplate.send("test-topic", "hello");
or
kafkatemplate.send("test-topic", "key-1", "hello");

监听消息配置

import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.config.kafkalistenercontainerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.listener.concurrentmessagelistenercontainer;

import java.util.hashmap;
import java.util.map;

@configuration
@enablekafka
public class kafkaconsumerconfig {

  @bean
  public kafkalistenercontainerfactory<concurrentmessagelistenercontainer<string, string>> kafkalistenercontainerfactory() {
    concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>();
    factory.setconsumerfactory(consumerfactory());
    factory.setconcurrency(3);
    factory.getcontainerproperties().setpolltimeout(3000);
    return factory;
  }

  public consumerfactory<string, string> consumerfactory() {
    return new defaultkafkaconsumerfactory<>(consumerconfigs());
  }


  public map<string, object> consumerconfigs() {
    map<string, object> propsmap = new hashmap<>();
    propsmap.put(consumerconfig.bootstrap_servers_config, "192.168.179.200:9092");
    propsmap.put(consumerconfig.enable_auto_commit_config, false);
    propsmap.put(consumerconfig.auto_commit_interval_ms_config, "100");
    propsmap.put(consumerconfig.session_timeout_ms_config, "15000");
    propsmap.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
    propsmap.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
    propsmap.put(consumerconfig.group_id_config, "test-group");
    propsmap.put(consumerconfig.auto_offset_reset_config, "latest");
    return propsmap;
  }

  @bean
  public listener listener() {
    return new listener();
  }
}

实现消息监听的最终目标是得到监听器对象。该监听器对象自行实现。

import org.apache.kafka.clients.consumer.consumerrecord;
  import org.springframework.kafka.annotation.kafkalistener;

  import java.util.optional;

  public class listener {

  @kafkalistener(topics = {"test-topic"})
  public void listen(consumerrecord<?, ?> record) {
    optional<?> kafkamessage = optional.ofnullable(record.value());
    if (kafkamessage.ispresent()) {
      object message = kafkamessage.get();
      system.out.println("listen1 " + message);
    }
  }
}

只需用@kafkalistener指定哪个方法处理消息即可。同时指定该方法用于监听kafka中哪些topic。

注意事项

定义监听消息配置时,group_id_config配置项的值用于指定消费者组的名称,如果同组中存在多个监听器对象则只有一个监听器对象能收到消息。

@kafkalistener中topics属性用于指定kafka topic名称,topic名称由消息生产者指定,也就是由kafkatemplate在发送消息时指定。

key_deserializer_class_config与value_deserializer_class_config指定key和value的编码、解码策略。kafka用key值确定value存放在哪个分区中。

后记

时间是解决问题的有效手段之一。

spring boot与kafka集成的简单实例

在spring boot 1.5版本中即可实现spring boot与kafka auto-configuration

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。