浅谈Springboot整合RocketMQ使用心得
一、阿里云官网---帮助文档
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
按照官网步骤,创建topic、申请发布(生产者)、申请订阅(消费者)
二、代码
1、配置:
public class mqconfig { /** * 启动测试之前请替换如下 xxx 为您的配置 */ public static final string public_topic = "test";//公网测试 public static final string public_producer_id = "pid_scheduler"; public static final string public_consumer_id = "cid_service"; public static final string access_key = "123"; public static final string secret_key = "123"; public static final string tag = ""; public static final string thread_num = "25";//消费端线程数 /** * onsaddr 请根据不同region进行配置 * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */ public static final string onsaddr = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"; }
onsaddr 阿里云用 公有云生产,测试用公网
不同的业务可以设置不同的tag,但是如果发送消息量大的话,建议新建topic
2、生产者
方式1:
配置文件:producer.xml
<?xml version="1.0" encoding="utf-8"?> <!doctype beans public "-//spring//dtd bean//en" "http://www.springframework.org/dtd/spring-beans.dtd"> <beans> <bean id="producer" class="com.aliyun.openservices.ons.api.bean.producerbean" init-method="start" destroy-method="shutdown"> <property name="properties"> <map> <entry key="producerid" value="" /> <!-- pid,请替换 --> <entry key="accesskey" value="" /> <!-- access_key,请替换 --> <entry key="secretkey" value="" /> <!-- secret_key,请替换 --> <!--propertykeyconst.onsaddr 请根据不同region进行配置 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal --> <entry key="onsaddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </bean> </beans>
启动方式1,在使用类的全局里设置:
//初始化生产者 private applicationcontext ctx; private producerbean producer; @value("${producerconfig.enabled}")//开关,spring配置项,true为开启,false关闭 private boolean producerconfigenabled; @postconstruct public void init(){ if (true == producerconfigenabled) { ctx = new classpathxmlapplicationcontext("producer.xml"); producer = (producerbean) ctx.getbean("producer"); } }
ps:最近发现一个坑,如果producer用上面这种方式启动的话,一旦启动的多了,会造成fullgc,所以可以换成下面这种注解方式启动,在用到的地方手动start、shutdown
方式2:配置类(不需要xml)
@configuration public class producerbeanconfig { @value("${openservices.ons.producerbean.producerid}") private string producerid; @value("${openservices.ons.producerbean.accesskey}") private string accesskey; @value("${openservices.ons.producerbean.secretkey}") private string secretkey; private producerbean producerbean; @value("${openservices.ons.producerbean.onsaddr}") private string onsaddr; @bean public producerbean oneproducer() { producerbean producerbean = new producerbean(); properties properties = new properties(); properties.setproperty(propertykeyconst.producerid, producerid); properties.setproperty(propertykeyconst.accesskey, accesskey); properties.setproperty(propertykeyconst.secretkey, secretkey); properties.setproperty(propertykeyconst.onsaddr, onsaddr); producerbean.setproperties(properties); return producerbean; } }
ps:经过这次双11发现,以上2种方式在大数据量,多线程情况下都不太适用, 性能很差,所以推荐用3
方式3:(不需要xml)
@component public class producerbeansingleton { @value("${openservices.ons.producerbean.producerid}") private string producerid; @value("${openservices.ons.producerbean.accesskey}") private string accesskey; @value("${openservices.ons.producerbean.secretkey}") private string secretkey; @value("${openservices.ons.producerbean.onsaddr}") private string onsaddr; private static producer producer; private static class singletonholder { private static final producerbeansingleton instance = new producerbeansingleton(); } private producerbeansingleton (){} public static final producerbeansingleton getinstance() { return singletonholder.instance; } @postconstruct public void init(){ // producer 实例配置初始化 properties properties = new properties(); //您在控制台创建的producer id properties.setproperty(propertykeyconst.producerid, producerid); // accesskey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.setproperty(propertykeyconst.accesskey, accesskey); // secretkey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.setproperty(propertykeyconst.secretkey, secretkey); //设置发送超时时间,单位毫秒 properties.setproperty(propertykeyconst.sendmsgtimeoutmillis, "3000"); // 设置 tcp 接入域名(此处以公共云生产环境为例) properties.setproperty(propertykeyconst.onsaddr, onsaddr); producer = onsfactory.createproducer(properties); // 在发送消息前,必须调用start方法来启动producer,只需调用一次即可 producer.start(); } public producer getproducer(){ return producer; } }
spring配置
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialect consumerconfig.enabled = true producerconfig.enabled = true #方式1: scheduling.enabled = false #方式2、3:rocketmq \u516c\u7f51\u914d\u7f6e openservices.ons.producerbean.producerid = pid openservices.ons.producerbean.accesskey = openservices.ons.producerbean.secretkey = openservices.ons.producerbean.onsaddr = 公网、杭州公有云生产
方式1投递消息代码:
try { string jsonc = jsonutils.tojson(elevenmessage); message message = new message(mqconfig.topic, mqconfig.tag, jsonc.getbytes()); sendresult sendresult = producer.send(message); if (sendresult != null) { logger.info(".send mq message success!”; } else { logger.warn(".sendresult is null........."); } } catch (exception e) { logger.warn("doubleelevenallpreservice"); thread.sleep(1000);//如果有异常,休眠1秒 }
方式2投递消息代码:(可以每发1000个启动/关闭一次)
producerbean.start(); try { string jsonc = jsonutils.tojson(elevenmessage); message message = new message(mqconfig.topic, mqconfig.tag, jsonc.getbytes()); sendresult sendresult = producer.send(message); if (sendresult != null) { logger.info(".send mq message success!”; } else { logger.warn(".sendresult is null........."); } } catch (exception e) { logger.warn("doubleelevenallpreservice"); thread.sleep(1000);//如果有异常,休眠1秒 } producerbean.shutdown();
方式3:投递消息
try { string jsonc = jsonutils.tojson(elevenmessage); message message = new message(mqconfig.topic, mqconfig.tag, jsonc.getbytes()); producer producer = producerbeansingleton.getinstance().getproducer(); sendresult sendresult = producer.send(message); if (sendresult != null) { logger.info("doubleelevenmidservice.send mq message success! topic is:"”; } else { logger.warn("doubleelevenmidservice.sendresult is null........."); } } catch (exception e) { logger.error("doubleelevenmidservice thread.sleep 1 s___error is "+e.getmessage(), e); thread.sleep(1000);//如果有异常,休眠1秒 }
发送消息的代码一定要捕获异常,不然会重复发送。
这里的topic用自己创建的,elevenmessage是要发送的内容,我这里是自己建的对象
3、消费者
配置启动类:
@configuration @conditionalonproperty(value = "consumerconfig.enabled", havingvalue = "true", matchifmissing = true) public class consumerconfig { private logger logger = loggerfactory.getlogger(loggerappendertype.smsdist.name()); @bean public consumer consumerfactory(){//不同消费者 这里不能重名 properties consumerproperties = new properties(); consumerproperties.setproperty(propertykeyconst.consumerid, mqconfig.consumer_id); consumerproperties.setproperty(propertykeyconst.accesskey, mqconfig.access_key); consumerproperties.setproperty(propertykeyconst.secretkey, mqconfig.secret_key); //consumerproperties.setproperty(propertykeyconst.consumethreadnums,mqconfig.thread_num); consumerproperties.setproperty(propertykeyconst.onsaddr, mqconfig.onsaddr); consumer consumer = onsfactory.createconsumer(consumerproperties); consumer.subscribe(mqconfig.topic, mqconfig.tag, new doubleelevenmessagelistener());//new对应的监听器 consumer.start(); logger.info("consumerconfig start success."); return consumer; } }
cid和onsaddr一点要选对,用自己的,消费者线程数等可以在这里配置
创建消息监听器类,消费消息:
@component public class messagelistener implements messagelistener { private logger logger = loggerfactory.getlogger("remind"); protected static elevenreposity elevenreposity; @resource public void setelevenreposity(elevenreposity elevenreposity){ messagelistener .elevenreposity=elevenreposity; } @override public action consume(message message, consumecontext consumecontext) { if(message.gettopic().equals("自己的topic")){//避免消费到其他消息 json转换报错 try { byte[] body = message.getbody(); string res = new string(body); //res 是生产者传过来的消息内容 //业务代码 }else{ logger.warn("!"); } } catch (exception e) { logger.error("messagelistener.consume error:" + e.getmessage(), e); } logger.info("messagelistener.receive message”); //如果想测试消息重投的功能,可以将action.commitmessage 替换成action.reconsumelater return action.commitmessage; }else{ logger.warn(); return action.reconsumelater; } }
注意,由于消费者是多线程的,所以对象要用static+set注入,把对象的级别提升到进程,这样多个线程就可以共用,但是无法调用父类的方法和变量
消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等
重置消费位点可以清空所有消息
三、注意事项
1、发送的消息体 最大为256kb
2、消息最多存在3天
3、消费端默认线程数是20
4、如果运行过程中出现java挂掉或者cpu占用异常高,可以在发送消息的时候,每发送1000条让线程休息1s
5、本地测试或启动的时候,把onsaddr换成公网,不然报错无法启动
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
-
从安装到使用springboot集成RocketMq4.5.2
-
从安装到使用springboot集成RocketMq4.5.2
-
浅谈ng-zorro使用心得
-
浅谈PDF.js使用心得
-
SpringBoot如何优雅的使用RocketMQ
-
SpringBoot整合Netty并使用Protobuf进行数据传输(附工程)
-
每天学点SpringCloud(一):使用SpringBoot2.0.3整合SpringCloud
-
Springboot整合Mybatis使用分页 PageHelper分页插件
-
SpringBoot整合使用tk.mybatis配置及测试
-
SpringBoot RocketMQ docker整合使用