Sample code for reading from and writing to Kafka in Spring Boot
Spring Boot integrates the Kafka client very well and is convenient to use. This article walks through a demo that reads from and writes to Kafka in a Spring Boot application.
1. pom configuration
You only need to add the spring-kafka dependency to the dependencies section. The complete pom looks like this:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
</parent>

<properties>
    <java.version>1.8</java.version>
    <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <!-- spring-kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>${spring-kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>${spring-kafka.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>
2. Producer
Producer configuration class. Its parameters are defined in the yml file and injected via @Value.
package com.dhb.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
Message sender class:
package com.dhb.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        this.kafkaTemplate.send(topic, payload);
    }
}
3. Consumer
Consumer configuration class:
package com.dhb.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloword");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Message receiver class:
package com.dhb.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.concurrent.CountDownLatch;

@Slf4j
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.helloworld}")
    public void receive(String payload) {
        log.info("received payload='{}'", payload);
        latch.countDown();
    }
}
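The CountDownLatch in Receiver exists mainly so that tests can wait for a message to arrive. As a minimal sketch (the test class and its name are not part of the original article, and it assumes the broker configured in application.yml is reachable), an end-to-end test could look like this:

package com.dhb.kafka;

import com.dhb.kafka.consumer.Receiver;
import com.dhb.kafka.producer.Sender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

// Hypothetical test class, not part of the original article.
// Assumes the Kafka broker configured in application.yml is reachable.
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendReceiveTest {

    @Autowired
    private Sender sender;

    @Autowired
    private Receiver receiver;

    @Test
    public void sendAndReceive() throws Exception {
        // Send a message to the topic the listener is subscribed to.
        sender.send("testtopic", "hello kafka");
        // Wait until the listener counts the latch down, i.e. the message arrived.
        receiver.getLatch().await(10, TimeUnit.SECONDS);
        assertEquals(0, receiver.getLatch().getCount());
    }
}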
4. Web test class
Defines an HTTP endpoint for testing:
package com.dhb.kafka.web;

import com.dhb.kafka.producer.Sender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@RestController
@Slf4j
public class KafkaProducer {

    @Autowired
    Sender sender;

    @RequestMapping(value = "/sender.action", method = RequestMethod.POST)
    public void exec(HttpServletRequest request, HttpServletResponse response, String data) throws IOException {
        this.sender.send("testtopic", data);
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/json");
        response.getWriter().write("success");
        response.getWriter().flush();
        response.getWriter().close();
    }
}
5. Application class and configuration
package com.dhb.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
application.yml
kafka:
  bootstrap-servers: 192.168.162.239:9092
  topic:
    helloworld: testtopic
Program structure (the package-layout screenshot is not reproduced here).
6. Read/write test
Start the application by running the main method of KafkaApplication, then send a request with Postman (or from the command line, as sketched below):
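As an alternative to Postman, an equivalent request can be made with curl. This is a sketch; the host and port (localhost:8080, the Spring Boot default) are assumptions, so adjust them to your environment:

curl -X POST -d "data=hello kafka" http://localhost:8080/sender.action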
The request returns success.
Producer and consumer logs (the screenshots are not reproduced here).
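Based on the log statements in Sender and Receiver, the relevant log lines should look roughly like the following (the payload value is just an example, and timestamps/logger prefixes are omitted):

Producer side: sending payload='hello kafka' to topic='testtopic'
Consumer side: received payload='hello kafka'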
That is all for this article. Hopefully it is helpful for your learning.