欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spring boot 使用kafka

程序员文章站 2022-06-03 17:59:46
启用spring-kafka 在spring boot 配置类上添加 配置kafka 在application.properties中添加配置 生产者 消费者者 项目中使用 ......
<dependency>
<groupid>org.springframework.kafka</groupid>
<artifactid>spring-kafka</artifactid>
</dependency>
spring-kafka version会使用spring boot 对应版本

启用spring-kafka

在spring boot 配置类上添加

@enablekafka

配置kafka

在application.properties中添加配置

      生产者

        

#kafka producer
spring.kafka.producer.bootstrap-servers=168.61.2.47:9092,168.61.2.48:9092,168.61.2.49:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.stringserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.bytearrayserializer
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

      消费者者

#kafka consumer
spring.kafka.consumer.group-id=thfx00
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.bootstrap-servers=168.61.2.47:9092,168.61.2.48:9092,168.61.2.49:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.stringdeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.bytearraydeserializer

 

项目中使用

package com.htsc.thfx.kafka;

import com.alibaba.fastjson.jsonobject;
import com.google.protobuf.invalidprotocolbufferexception;
import com.htsc.mdc.model.mdsecurityrecordprotos;
import com.htsc.mdc.model.mdstockrecordprotos;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.common.serialization.bytearraydeserializer;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.scheduling.annotation.scheduled;
import org.springframework.stereotype.component;

import javax.annotation.resource;

@component
public class kafkatest {
// 泛型与生产者key,value配置对应
 @resource
 kafkatemplate<string,byte[]> template;

@scheduled(fixedrate = 1000*10)
public void send() {
template.send("thfx-test00", "2", "你好00".getbytes());
template.send("thfx-test01", "2","你好01".getbytes());
}

// 泛型与消费者key,value配置对应
 @kafkalistener(topics = {"thfx-test00","thfx-test01"})
public void consumerrecord00(consumerrecord<string, byte[]> record) {
string s = new string(record.value());
system.out.println(s);
}

@kafkalistener(topics = {"pt-mdc-xshg-indextype"})
public void consumerrecord01(consumerrecord<string, byte[]> record) throws invalidprotocolbufferexception {
mdsecurityrecordprotos.mdsecurityrecord mdsecurityrecord = mdsecurityrecordprotos.mdsecurityrecord.parsefrom(record.value());
string s = mdsecurityrecord.tostring();
system.out.println("pt-mdc-xshg-indextype : " +s);
}
}