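Example Code for Integrating Kafka with Spring Boot
This article walks through example code for integrating Kafka with Spring Boot. I am sharing it here and also keeping it as a note for myself.
System environment
The examples use a Kafka service set up on a remote server:
- Ubuntu 16.04 LTS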
- kafka_2.12-0.11.0.0.tgz
- zookeeper-3.5.2-alpha.tar.gz
Integration steps
1. Create a Spring Boot project and add the required dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.laravelshao.springboot</groupId>
    <artifactId>spring-boot-integration-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-integration-kafka</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
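2. Add the configuration. A YAML file is used here:
spring:
  kafka:
    # Address of the remote Kafka broker
    bootstrap-servers: x.x.x.x:9092
    producer:
      # Serialize message values as JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: test
      auto-offset-reset: earliest
      # Deserialize message values from JSON
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # Package that contains the custom message class
              packages: com.laravelshao.springboot.kafka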
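The nested keys under `spring.kafka.consumer.properties` in the configuration above are just a YAML spelling of the dotted consumer property `spring.json.trusted.packages`, which is the name the `JsonDeserializer` looks up. The following is a minimal, dependency-free sketch of that flattening, not Spring Boot's actual binding code; the class `YamlKeyFlattener` and its helper method are hypothetical names introduced only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class, not part of Spring Boot or Spring Kafka.
final class YamlKeyFlattener {

    // Recursively joins nested map keys with dots, mirroring how nested
    // YAML keys correspond to dotted property names.
    static Map<String, String> flatten(String prefix, Map<String, ?> node) {
        Map<String, String> flat = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flat.putAll(flatten(key, child));
            } else {
                flat.put(key, String.valueOf(value));
            }
        }
        return flat;
    }

    // Builds the subtree found under spring.kafka.consumer.properties
    // in the YAML file (hand-built here instead of parsing YAML).
    static Map<String, Object> consumerPropertiesSubtree() {
        Map<String, Object> trusted = new LinkedHashMap<>();
        trusted.put("packages", "com.laravelshao.springboot.kafka");
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("trusted", trusted);
        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("json", json);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);
        return root;
    }

    public static void main(String[] args) {
        // Prints {spring.json.trusted.packages=com.laravelshao.springboot.kafka}
        System.out.println(flatten("", consumerPropertiesSubtree()));
    }
}
```

Writing the key in one line as `spring.json.trusted.packages: com.laravelshao.springboot.kafka` under `properties:` should therefore be equivalent to the nested form.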
3. Create the message class
package com.laravelshao.springboot.kafka;

/**
 * Message payload that is serialized to and from JSON.
 */
public class Message {

    private Integer id;
    private String msg;

    // No-argument constructor
    public Message() {
    }

    public Message(Integer id, String msg) {
        this.id = id;
        this.msg = msg;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", msg='" + msg + '\'' +
                '}';
    }
}
4. Create the producer
package com.laravelshao.springboot.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Producer {

    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    // Auto-configured by Spring Boot from the spring.kafka.* settings
    @Autowired
    private KafkaTemplate<String, Message> kafkaTemplate;

    // Sends the message to the given topic and logs what was sent
    public void send(String topic, Message message) {
        kafkaTemplate.send(topic, message);
        log.info("Producer->topic:{}, message:{}", topic, message);
    }
}
5. Create the consumer, using the @KafkaListener annotation to listen on the topic
package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    // Called for every record that arrives on test_topic
    @KafkaListener(topics = "test_topic")
    public void receive(ConsumerRecord<String, Message> consumerRecord) {
        log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
    }
}
6. Test sending and consuming
package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.Message;
import com.laravelshao.springboot.kafka.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class IntegrationKafkaApplication {

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
        Producer producer = context.getBean(Producer.class);
        // Send nine messages (ids 1 to 9), one every two seconds
        for (int i = 1; i < 10; i++) {
            producer.send("test_topic", new Message(i, "test topic message " + i));
            Thread.sleep(2000);
        }
    }
}
In the log output you can see each message being sent and then consumed in turn.
Troubleshooting
Deserialization exception (the custom message class is not in a package trusted by Spring Kafka's JsonDeserializer):
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
Solution: add the package that contains the message class to the deserializer's trusted packages:
spring:
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: com.laravelshao.springboot.kafka
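To make the fix less of a black box, here is a simplified sketch of the kind of check that produces the exception above: the package of the target class is looked up in a set of trusted packages, and `*` trusts everything. It is an illustration written for this article, not Spring Kafka's actual implementation. The class `TrustedPackageCheck` and its methods are hypothetical, and the defaults `java.util` and `java.lang` are taken from the error message.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration class, not Spring Kafka's real type mapper.
final class TrustedPackageCheck {

    // Defaults as listed in the error message: [java.util, java.lang]
    private final Set<String> trusted =
            new LinkedHashSet<>(Arrays.asList("java.util", "java.lang"));

    // Registers additional trusted packages (or "*" to trust all).
    void addTrustedPackages(String... packages) {
        trusted.addAll(Arrays.asList(packages));
    }

    // Returns true when the class's package is trusted; this sketch only
    // does exact package matching plus the "*" wildcard.
    boolean isTrusted(String className) {
        if (trusted.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(packageName);
    }

    public static void main(String[] args) {
        String messageClass = "com.laravelshao.springboot.kafka.Message";
        TrustedPackageCheck check = new TrustedPackageCheck();

        // Before the fix: the package is unknown, so the class is rejected.
        System.out.println(check.isTrusted(messageClass)); // prints false

        // After the fix: the package is registered, so the class is accepted.
        check.addTrustedPackages("com.laravelshao.springboot.kafka");
        System.out.println(check.isTrusted(messageClass)); // prints true
    }
}
```

Listing the specific package is the narrower choice. As the error message itself notes, trusting all packages with `*` is only appropriate when every producer writing to the topic is trusted.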
That is all for this article. I hope it helps with your learning, and thank you for your support.