SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
一、rocketmq
1、架构图片
2、角色分类
(1)、broker
rocketmq 的核心,接收 producer 发过来的消息、处理 consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。
(2)、nameserver
消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似微服务中注册中心的服务注册,发现,下线,上线的概念。
热备份:
namserver可以部署多个,相互之间独立,其他角色同时向多个nameserver 机器上报状态信息。
心跳机制:
nameserver 中的 broker、 topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话, nameserver会认为某个机器出故障不可用。
(3)、producer
消息的生成者,最常用的producer类就是defaultmqproducer。
(4)、consumer
消息的消费者,常用consumer类
defaultmqpushconsumer
收到消息后自动调用传入的处理方法来处理,实时性高
defaultmqpullconsumer
用户自主控制 ,灵活性更高。
3、通信机制
(1)、broker启动后需要完成一次将自己注册至nameserver的操作;随后每隔30s时间定时向nameserver更新topic路由信息。
(2)、producer发送消息时候,需要根据消息的topic从本地缓存的获取路由信息。如果没有则更新路由信息会从nameserver重新拉取,同时producer会默认每隔30s向nameserver拉取一次路由信息。
(3)、consumer消费消息时候,从nameserver获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费。
二、代码实现案例
1、项目结构图
版本描述
<spring-boot.version>2.1.3.release</spring-boot.version> <rocketmq.version>4.3.0</rocketmq.version>
2、配置文件
rocketmq: # 生产者配置 producer: isonoff: on # 发送同一类消息的设置为同一个group,保证唯一 groupname: feeplatgroup # 服务地址 namesrvaddr: 10.1.1.207:9876 # 消息最大长度 默认1024*4(4m) maxmessagesize: 4096 # 发送消息超时时间,默认3000 sendmsgtimeout: 3000 # 发送消息失败重试次数,默认2 retrytimeswhensendfailed: 2 # 消费者配置 consumer: isonoff: on # 官方建议:确保同一组中的每个消费者订阅相同的主题。 groupname: feeplatgroup # 服务地址 namesrvaddr: 10.1.1.207:9876 # 接收该 topic 下所有 tag topics: feeplattopic~*; consumethreadmin: 20 consumethreadmax: 64 # 设置一次消费消息的条数,默认为1条 consumemessagebatchmaxsize: 1 # 配置 group topic tag fee-plat: fee-plat-group: feeplatgroup fee-plat-topic: feeplattopic fee-account-tag: feeaccounttag
3、生产者配置
import org.apache.rocketmq.client.exception.mqclientexception; import org.apache.rocketmq.client.producer.defaultmqproducer; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; /** * rocketmq 生产者配置 */ @configuration public class producerconfig { private static final logger log = loggerfactory.getlogger(producerconfig.class) ; @value("${rocketmq.producer.groupname}") private string groupname; @value("${rocketmq.producer.namesrvaddr}") private string namesrvaddr; @value("${rocketmq.producer.maxmessagesize}") private integer maxmessagesize ; @value("${rocketmq.producer.sendmsgtimeout}") private integer sendmsgtimeout; @value("${rocketmq.producer.retrytimeswhensendfailed}") private integer retrytimeswhensendfailed; @bean public defaultmqproducer getrocketmqproducer() { defaultmqproducer producer; producer = new defaultmqproducer(this.groupname); producer.setnamesrvaddr(this.namesrvaddr); //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instancename if(this.maxmessagesize!=null){ producer.setmaxmessagesize(this.maxmessagesize); } if(this.sendmsgtimeout!=null){ producer.setsendmsgtimeout(this.sendmsgtimeout); } //如果发送消息失败,设置重试次数,默认为2次 if(this.retrytimeswhensendfailed!=null){ producer.setretrytimeswhensendfailed(this.retrytimeswhensendfailed); } try { producer.start(); } catch (mqclientexception e) { e.printstacktrace(); } return producer; } }
4、消费者配置
import org.apache.rocketmq.client.consumer.defaultmqpushconsumer; import org.apache.rocketmq.client.exception.mqclientexception; import org.apache.rocketmq.common.consumer.consumefromwhere; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import javax.annotation.resource; /** * rocketmq 消费者配置 */ @configuration public class consumerconfig { private static final logger log = loggerfactory.getlogger(consumerconfig.class) ; @value("${rocketmq.consumer.namesrvaddr}") private string namesrvaddr; @value("${rocketmq.consumer.groupname}") private string groupname; @value("${rocketmq.consumer.consumethreadmin}") private int consumethreadmin; @value("${rocketmq.consumer.consumethreadmax}") private int consumethreadmax; @value("${rocketmq.consumer.topics}") private string topics; @value("${rocketmq.consumer.consumemessagebatchmaxsize}") private int consumemessagebatchmaxsize; @resource private rocketmsglistener msglistener; @bean public defaultmqpushconsumer getrocketmqconsumer(){ defaultmqpushconsumer consumer = new defaultmqpushconsumer(groupname); consumer.setnamesrvaddr(namesrvaddr); consumer.setconsumethreadmin(consumethreadmin); consumer.setconsumethreadmax(consumethreadmax); consumer.registermessagelistener(msglistener); consumer.setconsumefromwhere(consumefromwhere.consume_from_last_offset); consumer.setconsumemessagebatchmaxsize(consumemessagebatchmaxsize); try { string[] topictagsarr = topics.split(";"); for (string topictags : topictagsarr) { string[] topictag = topictags.split("~"); consumer.subscribe(topictag[0],topictag[1]); } consumer.start(); }catch (mqclientexception e){ e.printstacktrace(); } return consumer; } }
5、消息监听配置
import com.rocket.queue.service.impl.paramconfigservice; import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlycontext; import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus; import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently; import org.apache.rocketmq.common.message.messageext; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.stereotype.component; import org.springframework.util.collectionutils; import javax.annotation.resource; import java.util.list; /** * 消息消费监听 */ @component public class rocketmsglistener implements messagelistenerconcurrently { private static final logger log = loggerfactory.getlogger(rocketmsglistener.class) ; @resource private paramconfigservice paramconfigservice ; @override public consumeconcurrentlystatus consumemessage(list<messageext> list, consumeconcurrentlycontext context) { if (collectionutils.isempty(list)){ return consumeconcurrentlystatus.consume_success; } messageext messageext = list.get(0); log.info("接受到的消息为:"+new string(messageext.getbody())); int reconsume = messageext.getreconsumetimes(); // 消息已经重试了3次,如果不需要再次消费,则返回成功 if(reconsume ==3){ return consumeconcurrentlystatus.consume_success; } if(messageext.gettopic().equals(paramconfigservice.feeplattopic)){ string tags = messageext.gettags() ; switch (tags){ case "feeaccounttag": log.info("开户 tag == >>"+tags); break ; default: log.info("未匹配到tag == >>"+tags); break; } } // 消息消费成功 return consumeconcurrentlystatus.consume_success; } }
6、配置参数绑定
import org.springframework.beans.factory.annotation.value; import org.springframework.stereotype.service; @service public class paramconfigservice { @value("${fee-plat.fee-plat-group}") public string feeplatgroup ; @value("${fee-plat.fee-plat-topic}") public string feeplattopic ; @value("${fee-plat.fee-account-tag}") public string feeaccounttag ; }
7、消息发送测试
import com.rocket.queue.service.feeplatmqservice; import org.apache.rocketmq.client.producer.defaultmqproducer; import org.apache.rocketmq.client.producer.sendresult; import org.apache.rocketmq.common.message.message; import org.springframework.stereotype.service; import javax.annotation.resource; @service public class feeplatmqserviceimpl implements feeplatmqservice { @resource private defaultmqproducer defaultmqproducer; @resource private paramconfigservice paramconfigservice ; @override public sendresult openaccountmsg(string msginfo) { // 可以不使用config中的group defaultmqproducer.setproducergroup(paramconfigservice.feeplatgroup); sendresult sendresult = null; try { message sendmsg = new message(paramconfigservice.feeplattopic, paramconfigservice.feeaccounttag, "fee_open_account_key", msginfo.getbytes()); sendresult = defaultmqproducer.send(sendmsg); } catch (exception e) { e.printstacktrace(); } return sendresult ; } }
三、项目源码
github:知了一笑 https://github.com/cicadasmile/middle-ware-parent
上一篇: 一些Java基础方面问题的总结
下一篇: 分享一个比较通用的Makefile