Integrating spring-kafka with Spring Boot: example code for sending and receiving messages
程序员文章站 (Programmer Article Site)
2023-12-15 12:38:10
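Preface
Our new project uses spring-boot, and the data created in the new project has to be synchronized to the old system. Part of the synchronization code already existed, and it used Kafka. Since this is only data synchronization, I don't think Kafka is a necessary choice of MQ. For one thing, the data volume is small; for another, running Kafka means setting up a cluster and ZooKeeper. For simple data synchronization it is overkill.
There was nothing for it: I'm just an employee, so if the boss says use it, I use it. At first I found spring-integration-kafka, whose description says it is a rewrite built on top of spring-kafka. After reading the official documentation, though, I was thoroughly confused and never got it working. The documentation is not well written either; perhaps the project was only just getting started and still had many problems. In the end I gave up on it and used spring-kafka directly.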
Implementation
The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.linuxsogood.sync</groupId>
  <artifactId>linuxsogood-sync</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.0.RELEASE</version>
  </parent>
  <properties>
    <java.version>1.8</java.version>
    <!-- dependency versions -->
    <mybatis.version>3.3.1</mybatis.version>
    <mybatis.spring.version>1.2.4</mybatis.spring.version>
    <mapper.version>3.3.6</mapper.version>
    <pagehelper.version>4.1.1</pagehelper.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <!--<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-kafka</artifactId>
      <version>2.0.1.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>4.3.1.RELEASE</version>
      <scope>compile</scope>
    </dependency>-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.1.0.RELEASE</version>
    </dependency>
    <!--<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>1.1.0.RELEASE</version>
    </dependency>-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>3.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>4.2.3.RELEASE</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>com.microsoft.sqlserver</groupId>
      <artifactId>sqljdbc4</artifactId>
      <version>4.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.11</version>
    </dependency>
    <!-- MyBatis -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>${mybatis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>${mybatis.spring.version}</version>
    </dependency>
    <!--<dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>1.1.1</version>
    </dependency>-->
    <!-- MyBatis Generator -->
    <dependency>
      <groupId>org.mybatis.generator</groupId>
      <artifactId>mybatis-generator-core</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
      <optional>true</optional>
    </dependency>
    <!-- pagination plugin -->
    <dependency>
      <groupId>com.github.pagehelper</groupId>
      <artifactId>pagehelper</artifactId>
      <version>${pagehelper.version}</version>
    </dependency>
    <!-- common Mapper -->
    <dependency>
      <groupId>tk.mybatis</groupId>
      <artifactId>mapper</artifactId>
      <version>${mapper.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>repo.spring.io.milestone</id>
      <name>Spring Framework Maven Milestone Repository</name>
      <url>https://repo.spring.io/libs-milestone</url>
    </repository>
  </repositories>
  <build>
    <finalName>mybatis_generator</finalName>
    <plugins>
      <plugin>
        <groupId>org.mybatis.generator</groupId>
        <artifactId>mybatis-generator-maven-plugin</artifactId>
        <version>1.3.2</version>
        <configuration>
          <verbose>true</verbose>
          <overwrite>true</overwrite>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <mainClass>org.linuxsogood.sync.Starter</mainClass>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
The ORM layer uses MyBatis, together with the common Mapper (tk.mybatis) and the PageHelper pagination plugin.
Kafka consumer configuration
import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    // Container factory picked up by @KafkaListener methods
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
Producer configuration
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // Template used by application code to send messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
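The configuration above only defines the KafkaTemplate bean; the sending side itself is not shown. The following is a minimal sketch of what a sender could look like, not the project's actual code. SyncMessageSender, the hand-built JSON envelope, and the "STORE" type value are assumptions for illustration (the envelope fields type and data are inferred from the getters the listener calls on MessageWrapper). The transport is a plain BiConsumer so the class can be tried without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/**
 * Hypothetical sender for the sync messages (not part of the original project).
 * The transport receives (topic, payload). In the Spring application it could be
 * backed by the KafkaTemplate bean, for example:
 *   new SyncMessageSender((topic, payload) -> kafkaTemplate.send(topic, payload));
 */
class SyncMessageSender {
    // Topic the listener in this article subscribes to
    static final String TOPIC = "linuxsogood-topic";

    private final BiConsumer<String, String> transport;

    SyncMessageSender(BiConsumer<String, String> transport) {
        this.transport = transport;
    }

    /**
     * Wraps already-serialized JSON data in a {"type": ..., "data": ...} envelope
     * and hands it to the transport. The envelope shape is an assumption.
     */
    void send(String type, String dataJson) {
        String payload = "{\"type\":\"" + type + "\",\"data\":" + dataJson + "}";
        transport.accept(TOPIC, payload);
    }

    public static void main(String[] args) {
        // Stand-in transport that records messages instead of talking to Kafka
        List<String> sent = new ArrayList<>();
        SyncMessageSender sender =
                new SyncMessageSender((topic, payload) -> sent.add(topic + " <- " + payload));
        sender.send("STORE", "{\"storeName\":\"demo\"}");
        System.out.println(sent.get(0));
    }
}
```

Keeping the transport behind a small functional interface also makes the sender easy to unit test; in production code the payload would more likely be serialized with fastjson than concatenated by hand.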
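The listener. This is where the business logic lives: what to do with the data once it has been received from Kafka. If you need Kafka to deliver messages in broadcast mode, the listeners must belong to different groups, i.e. the group in the @KafkaListener annotation on each method must be different. If several listeners use the same group, only one of them handles any given message and the others never see it. This is Kafka's distributed message processing model.
Listeners in the same group share the incoming messages, which are divided among them by a partition assignment algorithm. Listeners that are in different groups but subscribe to the same topic each receive every message, which gives broadcast behaviour. One caveat: as far as I can tell, in spring-kafka 1.1.0 (the version used here) the group attribute of @KafkaListener names a container group, a bean that collects listener containers, rather than the Kafka consumer group. The consumer group is the group.id in the consumer configuration (ConsumerConfig.GROUP_ID_CONFIG, "firehome-group" above), so for broadcast behaviour each listener needs a container factory whose consumer configuration uses a different group.id. Later spring-kafka versions add a groupId attribute to @KafkaListener for this purpose.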
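The difference between sharing within a group and broadcasting across groups can be illustrated with a toy model. This is only a sketch: it does not use Kafka, the modulo rule stands in for Kafka's real partition assignment, and GroupDeliveryDemo and the "audit-group" name are made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group delivery; it is not Kafka's real assignment algorithm. */
class GroupDeliveryDemo {
    // group id -> (consumer name -> messages that consumer received)
    private final Map<String, Map<String, List<String>>> groups = new LinkedHashMap<>();

    void subscribe(String groupId, String consumerName) {
        groups.computeIfAbsent(groupId, g -> new LinkedHashMap<>())
              .put(consumerName, new ArrayList<>());
    }

    /**
     * Every group receives the message once. Inside a group, the (non-negative)
     * partition number decides which single consumer gets it.
     */
    void publish(int partition, String message) {
        for (Map<String, List<String>> members : groups.values()) {
            List<List<String>> inboxes = new ArrayList<>(members.values());
            inboxes.get(partition % inboxes.size()).add(message);
        }
    }

    List<String> received(String groupId, String consumerName) {
        return groups.get(groupId).get(consumerName);
    }

    public static void main(String[] args) {
        GroupDeliveryDemo topic = new GroupDeliveryDemo();
        topic.subscribe("sync-group", "listenerA");
        topic.subscribe("sync-group", "listenerB");   // shares work with listenerA
        topic.subscribe("audit-group", "listenerC");  // separate group: sees everything
        topic.publish(0, "m0");
        topic.publish(1, "m1");
        System.out.println("listenerA: " + topic.received("sync-group", "listenerA"));
        System.out.println("listenerB: " + topic.received("sync-group", "listenerB"));
        System.out.println("listenerC: " + topic.received("audit-group", "listenerC"));
    }
}
```

In this model listenerA and listenerB split the two messages between them, while listenerC, being alone in its own group, receives both.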
import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private StoreMapper storeMapper;

    /**
     * Listens for Kafka messages and consumes them, syncing the data
     * into the new Fenghuo (烽火) database.
     * @param record the received message record
     */
    @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            try {
                MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
                CupMessageType type = messageWrapper.getType();
                // Check the message type; different types go into different tables.
                // (The casing of the enum constant was lost in the source; STORE is assumed.)
                if (CupMessageType.STORE == type) {
                    proceedStore(messageWrapper);
                }
            } catch (Exception e) {
                // A trailing Throwable is logged by SLF4J with its stack trace
                LOGGER.error("Failed to save the received message to the database, message: {}", message, e);
            }
        }
    }

    /**
     * Handles a store-type message and writes the store to the database.
     * @param messageWrapper the message received from Kafka
     */
    private void proceedStore(MessageWrapper messageWrapper) {
        Object data = messageWrapper.getData();
        Store cupStore = JSON.parseObject(data.toString(), Store.class);
        StoreExample storeExample = new StoreExample();
        // Look the store up by its old name if one is given, otherwise by its current name
        String storeName = StringUtils.isBlank(cupStore.getStoreOldName())
                ? cupStore.getStoreName() : cupStore.getStoreOldName();
        storeExample.createCriteria().andStoreNameEqualTo(storeName);
        List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
        org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
        org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
        if (stores.isEmpty()) {
            // No existing record: insert a new one
            storeMapper.insert(store);
        } else {
            // Record exists: update it, keeping its primary key
            store.setStoreId(stores.get(0).getStoreId());
            storeMapper.updateByPrimaryKey(store);
        }
    }
}
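The insert-or-update decision in the listener (look up by the old name when one is given, otherwise by the current name; insert when nothing matches; otherwise update the existing row and keep its ID) can be tried in isolation. The sketch below replaces the MyBatis mapper with an in-memory map; StoreUpsertDemo and its methods are hypothetical, and the blank check only approximates StringUtils.isBlank.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the store table, keyed by store name (for illustration only). */
class StoreUpsertDemo {
    private final Map<String, Integer> idByName = new HashMap<>();
    private int nextId = 1;

    /**
     * Looks the store up by its old name when one is given, otherwise by its current name.
     * Inserts a new row if nothing matches; otherwise keeps the existing ID and stores
     * the row under the current name. Returns the ID of the affected row.
     */
    int upsert(String storeName, String storeOldName) {
        boolean noOldName = storeOldName == null || storeOldName.trim().isEmpty();
        String lookupName = noOldName ? storeName : storeOldName;
        Integer existingId = idByName.remove(lookupName);
        int id = (existingId == null) ? nextId++ : existingId;
        idByName.put(storeName, id);
        return id;
    }

    int size() {
        return idByName.size();
    }

    public static void main(String[] args) {
        StoreUpsertDemo table = new StoreUpsertDemo();
        int created = table.upsert("Shop A", null);      // no match: insert
        int renamed = table.upsert("Shop B", "Shop A");  // matches old name: update
        System.out.println(created == renamed);
        System.out.println(table.size());
    }
}
```

Matching on the old name first is what lets a renamed store update its existing row instead of creating a duplicate.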
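Summary
That is all for this article. I hope it is of some help in your study or work. If you have any questions, feel free to leave a comment. Thank you for your support.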