SpringBoot集成RocketMQ
实战,用案例来说话
前面已经说了jms和rocketmq一些概念和安装,下面使用springboot来亲身操作一下.
生产者的操作
- springboot项目创建完成,引入依赖是第一步:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> <dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-client</artifactid> <version>4.3.0</version> </dependency>
- 创建生产者是第二步,生产者必须依赖于生产组,而且需要指定nameserver
@component public class payproducer { /** * 生产组,生产者必须在生产组内 */ private string producergroup = "pay_group"; /** * 端口 */ private string nameserver = "39.106.214.179:9876"; private defaultmqproducer producer; public payproducer() { producer = new defaultmqproducer(producergroup); // 指定nameserver地址,多个地址之间以 ; 隔开 producer.setnamesrvaddr(nameserver); start(); } public defaultmqproducer getproducer() { return producer; } /** * 对象在使用之前必须调用一次,并且只能初始化一次 */ public void start() { try { this.producer.start(); } catch (mqclientexception e) { e.printstacktrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown() { producer.shutdown(); } }
- 创建controller进行测试发送消息,必须要指定topic,消息依赖于主题
@restcontroller public class paycontroller { @autowired private payproducer payproducer; /** * topic,消息依赖于topic */ private static final string topic = "pay_test_topic"; @requestmapping("/api/v1/pay_cb") public object callback(string text) throws interruptedexception, remotingexception, mqclientexception, mqbrokerexception { // 创建消息 主题 二级分类 消息内容好的字节数组 message message = new message(topic, "taga", ("hello rocketmq " + text).getbytes()); sendresult send = payproducer.getproducer().send(message); system.out.println(send); return new hashmap<>(); } }
- 采坑记录
- 上面完成就可以启动项目了,访问之后报错了:
mqclientexception: no route info of this topic, topictest1 这个的原因就是broker禁止自动创建topic且用户没有通过手动方式创建topic, 或者是broker与nameserver网络不通 解决: 使用手动创建topic,在rocketmq控制台的主题中创建就好,最主要的是指定topic name,如下图 出现创建不了的情况往下看 如果还出现这个问题,请关闭防火墙
这次说下上面可能创建不了的问题,前面说了安装开放安全组,这次就是因为rocketmq虚拟的端口问题,需要开放10909,也就是说ecs最终开放的端口号: 8080,10911,9876,10909
- 继续采坑
org.apache.rocketmq.remoting.exception.remotingtoomuchrequestexception: senddefaultimpl call timeout 这个问题是阿里云服务器存在多个网卡,rocketmq会根据当前网卡选择一个ip使用,我们需要制定一个ip: 路径是: /usr/local/software/rocketmq/distribution/target/apache-rocketmq vim ./conf/broker.conf 添加配置: brokerip1=公网ip 重新启动: nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf & tail -f nohup.out
- 其他问题
https://blog.csdn.net/qq_14853889/article/details/81053145 https://blog.csdn.net/wangmx1993328/article/details/81588217#%e5%bc%82%e5%b8%b8%e8%af%b4%e6%98%8e https://www.jianshu.com/p/bfd6d849f156 https://blog.csdn.net/wangmx1993328/article/details/81588217
消费者操作
-
在前一个项目的基础上,将公共内容提取出来,创建一个jsmconfig的类,来声明公共内容:
```
public class jmsconfig {/** * 端口 */ public static final string name_server = "39.106.214.179:9876"; /** * topic,消息依赖于topic */ public static final string topic = "pay_test_topic";
}
``` -
生产者内容变为
```
@component
public class payproducer {/** * 生产组,生产者必须在生产组内 */ private string producergroup = "pay_producer_group"; private defaultmqproducer producer; public payproducer() { producer = new defaultmqproducer(producergroup); // 指定nameserver地址,多个地址之间以 ; 隔开 producer.setnamesrvaddr(jmsconfig.name_server); start(); } public defaultmqproducer getproducer() { return producer; } /** * 对象在使用之前必须调用一次,并且只能初始化一次 */ public void start() { try { this.producer.start(); } catch (mqclientexception e) { e.printstacktrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown() { producer.shutdown(); }
}
``` -
创建消费者
```
@component
public class payconsumer {private defaultmqpushconsumer consumer; private string consumergroup = "pay_consumer_group"; public payconsumer() throws mqclientexception { consumer = new defaultmqpushconsumer(consumergroup); consumer.setnamesrvaddr(jmsconfig.name_server); // 设置消费地点,从最后一个进行消费(其实就是消费策略) consumer.setconsumefromwhere(consumefromwhere.consume_from_last_offset); // 订阅主题的哪些标签 consumer.subscribe(jmsconfig.topic, "*"); // 注册监听器 consumer.registermessagelistener((messagelistenerconcurrently) (msgs, context) -> { try { // 获取message message msg = msgs.get(0); system.out.printf("%s receive new messages: %s %n", thread.currentthread().getname(), new string(msgs.get(0).getbody())); string topic = msg.gettopic(); string body = new string(msg.getbody(), "utf-8"); // 标签 string tags = msg.gettags(); string keys = msg.getkeys(); system.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body); return consumeconcurrentlystatus.consume_success; } catch (unsupportedencodingexception e) { e.printstacktrace(); return consumeconcurrentlystatus.reconsume_later; } }); consumer.start(); system.out.println("consumer listener"); }
}
``` -
controller的变化:
```
@restcontroller
public class paycontroller {@autowired private payproducer payproducer; @requestmapping("/api/v1/pay_cb") public object callback(string text) throws interruptedexception, remotingexception, mqclientexception, mqbrokerexception { // 创建消息 主题 二级分类 消息内容好的字节数组 message message = new message(jmsconfig.topic, "taga", ("hello rocketmq " + text).getbytes()); sendresult send = payproducer.getproducer().send(message); system.out.println(send); return new hashmap<>(); }
}
```
梳理一下整个流程,生产者存在于生产组,所以生产组很重要,创建生产者需要指定生产组.消费者同理,创建消费者也需要指定消费组. 并且二者都需要指定nameserver. 有了生产者就要发送消息,也就是message,创建message需要指定topic,二级分类和消息体等信息. 那消费者如何获取呢? 无非就是绑定topic和二级分类就可以,这就是整个流程. 中间少说了消息的存放,消息是在broker中,这个相当于仓库,所以就是生产者生产消息到broker,consumer从broker中获取消息进行消费.
上一篇: 浅谈RESTful风格下的API接口设计
下一篇: js中的数组对象排序分析