欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring Boot集成Kafka的示例代码

程序员文章站 2023-12-04 16:56:40
本文介绍了spring boot集成kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 ubuntu 16.0...

本文介绍了spring boot集成kafka的示例代码,分享给大家,也给自己留个笔记

系统环境

使用远程服务器上搭建的kafka服务

  1. ubuntu 16.04 lts
  2. kafka_2.12-0.11.0.0.tgz
  3. zookeeper-3.5.2-alpha.tar.gz

集成过程

1.创建spring boot工程,添加相关依赖:

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
     xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelversion>4.0.0</modelversion>

  <groupid>com.laravelshao.springboot</groupid>
  <artifactid>spring-boot-integration-kafka</artifactid>
  <version>0.0.1-snapshot</version>
  <packaging>jar</packaging>

  <name>spring-boot-integration-kafka</name>
  <description>demo project for spring boot</description>

  <parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>2.0.0.release</version>
    <relativepath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
    <project.reporting.outputencoding>utf-8</project.reporting.outputencoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter</artifactid>
    </dependency>
    <!--kafka-->
    <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-json</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-test</artifactid>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-maven-plugin</artifactid>
      </plugin>
    </plugins>
  </build>
</project>

2.添加配置信息,这里使用yml文件

spring:
 kafka:
  bootstrap-servers:x.x.x.x:9092
  producer:
   value-serializer: org.springframework.kafka.support.serializer.jsonserializer
  consumer:
   group-id: test
   auto-offset-reset: earliest
   value-deserializer: org.springframework.kafka.support.serializer.jsondeserializer
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

3.创建消息对象

public class message {
  private integer id;
  private string msg;

  public message() {
  }

  public message(integer id, string msg) {
    this.id = id;
    this.msg = msg;
  }

  public integer getid() {
    return id;
  }

  public void setid(integer id) {
    this.id = id;
  }

  public string getmsg() {
    return msg;
  }

  public void setmsg(string msg) {
    this.msg = msg;
  }

  @override
  public string tostring() {
    return "message{" +
        "id=" + id +
        ", msg='" + msg + '\'' +
        '}';
  }
}

4.创建生产者

package com.laravelshao.springboot.kafka;

import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.component;

/**
 * created by shaoqinghua on 2018/3/23.
 */
@component
public class producer {
  private static logger log = loggerfactory.getlogger(producer.class);

  @autowired
  private kafkatemplate kafkatemplate;

  public void send(string topic, message message) {
    kafkatemplate.send(topic, message);
    log.info("producer->topic:{}, message:{}", topic, message);
  }

}

5.创建消费者,使用@ kafkalistener注解监听主题

package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.consumerrecord;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.component;

/**
 * created by shaoqinghua on 2018/3/23.
 */
@component
public class consumer {
  private static logger log = loggerfactory.getlogger(consumer.class);

  @kafkalistener(topics = "test_topic")
  public void receive(consumerrecord<string, message> consumerrecord) {
    log.info("consumer->topic:{}, value:{}", consumerrecord.topic(), consumerrecord.value());
  }

}

6.发送消费测试

package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.message;
import com.laravelshao.springboot.kafka.producer;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.applicationcontext;

@springbootapplication
public class integrationkafkaapplication {

  public static void main(string[] args) throws interruptedexception {
    applicationcontext context = springapplication.run(integrationkafkaapplication.class, args);
    producer producer = context.getbean(producer.class);

    for (int i = 1; i < 10; i++) {
      producer.send("test_topic", new message(i, "test topic message " + i));
      thread.sleep(2000);
    }
  }

}

可以依次看到发送消息,消费消息

Spring Boot集成Kafka的示例代码

异常问题

反序列化异常(自定义的消息对象不在kafka信任的包路径下)?

[org.springframework.kafka.kafkalistenerendpointcontainer#0-0-c-1] error org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer.719 container exception
org.apache.kafka.common.errors.serializationexception: error deserializing key/value for partition test_topic-0 at offset 9. if needed, please seek past the record to continue consumption.
caused by: java.lang.illegalargumentexception: the class 'com.laravelshao.springboot.kafka.message' is not in the trusted packages: [java.util, java.lang]. if you believe this class is safe to deserialize, please provide its name. if the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getclassidtype(defaultjackson2javatypemapper.java:139)
 at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype(defaultjackson2javatypemapper.java:113)
 at org.springframework.kafka.support.serializer.jsondeserializer.deserialize(jsondeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.fetcher.parserecord(fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.fetcher.access$2600(fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.fetcher$partitionrecords.fetchrecords(fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.fetcher$partitionrecords.access$1200(fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.fetcher.fetchrecords(fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.fetcher.fetchedrecords(fetcher.java:531)
 at org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer.java:1146)
 at org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer.java:1103)
 at org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer.run(kafkamessagelistenercontainer.java:667)
 at java.util.concurrent.executors$runnableadapter.call(executors.java:511)
 at java.util.concurrent.futuretask.run(futuretask.java:266)
 at java.lang.thread.run(thread.java:745)

解决方法:将当前包添加到kafka信任的包路径下

spring:
 kafka:
  consumer:
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。