欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

在springboot中对kafka进行读写的示例代码

程序员文章站 2024-03-31 12:38:22
springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo。 1.pom配置 只...

springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo。

1.pom配置

只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:

<parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>1.5.4.release</version>
  </parent>

  <properties>
    <java.version>1.8</java.version>
     <spring-kafka.version>1.2.2.release</spring-kafka.version>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
  </properties>

  <dependencies>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-web</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-test</artifactid>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-aop</artifactid>
    </dependency>
   <!-- spring-kafka -->
      <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka</artifactid>
      <version>${spring-kafka.version}</version>
      </dependency>
      <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka-test</artifactid>
      <version>${spring-kafka.version}</version>
      <scope>test</scope>
      </dependency>
   </dependencies>

2.生产者

参数配置类,其参数卸载yml文件中,通过@value注入

package com.dhb.kafka.producer;

import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.serialization.stringserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;

import java.util.hashmap;
import java.util.map;

@configuration
public class senderconfig {

  @value("${kafka.bootstrap-servers}")
  private string bootstrapservers;

  @bean
  public map<string,object> producerconfigs() {
    map<string,object> props = new hashmap<>();
    props.put(producerconfig.bootstrap_servers_config,this.bootstrapservers);
    props.put(producerconfig.key_serializer_class_config, stringserializer.class);
    props.put(producerconfig.value_serializer_class_config,stringserializer.class);
    props.put(producerconfig.acks_config,"0");
    return props;
  }

  @bean
  public producerfactory<string,string> producerfactory() {
    return new defaultkafkaproducerfactory<>(producerconfigs());
  }

  @bean
  public kafkatemplate<string,string> kafkatemplate() {
    return new kafkatemplate<string, string>(producerfactory());
  }

  @bean
  public sender sender() {
    return new sender();
  }
}

消息发送类

package com.dhb.kafka.producer;

import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;

@slf4j
public class sender {

  @autowired
  private kafkatemplate<string,string> kafkatemplate;

  public void send(string topic,string payload) {
    log.info("sending payload='{}' to topic='{}'",payload,topic);
    this.kafkatemplate.send(topic,payload);
  }
}

3.消费者

参数配置类

package com.dhb.kafka.consumer;

import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;

import java.util.hashmap;
import java.util.map;

@configuration
@enablekafka
public class receiverconfig {

  @value("${kafka.bootstrap-servers}")
  private string bootstrapservers;

  public map<string,object> consumerconfigs() {
    map<string,object> props = new hashmap<>();
    props.put(consumerconfig.bootstrap_servers_config,bootstrapservers);
    props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
    props.put(consumerconfig.value_deserializer_class_config,stringdeserializer.class);
    props.put(consumerconfig.group_id_config,"helloword");
    return props;
  }

  @bean
  public consumerfactory<string,string> consumerfactory() {
    return new defaultkafkaconsumerfactory<>(consumerconfigs());
  }

  @bean
  public concurrentkafkalistenercontainerfactory<string,string> kafkalistenercontainerfactory() {
    concurrentkafkalistenercontainerfactory<string,string> factory =
        new concurrentkafkalistenercontainerfactory<>();
    factory.setconsumerfactory(consumerfactory());
    return factory;
  }

  @bean
  public receiver receiver() {
    return new receiver();
  }

}

消息接受类

package com.dhb.kafka.consumer;

import lombok.extern.slf4j.slf4j;
import org.springframework.kafka.annotation.kafkalistener;

import java.util.concurrent.countdownlatch;

@slf4j
public class receiver {

  private countdownlatch latch = new countdownlatch(1);

  public countdownlatch getlatch() {
    return latch;
  }

  @kafkalistener(topics = "${kafka.topic.helloworld}")
  public void receive(string payload) {
    log.info("received payload='{}'",payload);
    latch.countdown();
  }
}

3.web测试类

定义了一个基于http的web测试接口

package com.dhb.kafka.web;

import com.dhb.kafka.producer.sender;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.requestmethod;
import org.springframework.web.bind.annotation.restcontroller;

import javax.servlet.http.httpservletrequest;
import javax.servlet.http.httpservletresponse;
import java.io.ioexception;

@restcontroller
@slf4j
public class kafkaproducer {

  @autowired
  sender sender;

  @requestmapping(value = "/sender.action", method = requestmethod.post)
  public void exec(httpservletrequest request, httpservletresponse response,string data) throws ioexception{
    this.sender.send("testtopic",data);
    response.setcharacterencoding("utf-8");
    response.setcontenttype("text/json");
    response.getwriter().write("success");
    response.getwriter().flush();
    response.getwriter().close();
  }

}

4.启动类及配置

package com.dhb.kafka;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;

@springbootapplication
public class kafkaapplication {


  public static void main(string[] args) {
    springapplication.run(kafkaapplication.class,args);

  }
}

application.yml

kafka:
 bootstrap-servers: 192.168.162.239:9092
 topic:
  helloworld: testtopic

程序结构:

在springboot中对kafka进行读写的示例代码

包结构

5.读写测试

通过执行kafkaapplication的main方法启动程序。然后打开postman进行测试:

在springboot中对kafka进行读写的示例代码

运行后返回success

在springboot中对kafka进行读写的示例代码

生产者日志:

在springboot中对kafka进行读写的示例代码

消费者日志:

在springboot中对kafka进行读写的示例代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。