Springboot 整合 RocketMQ 收发消息
程序员文章站
2022-03-07 11:27:14
springboot 整合 rocketmq 收发消息创建springboot项目pom.xml添加rocketmq-spring-boot-starter依赖。...
springboot 整合 rocketmq 收发消息
创建springboot项目
在 pom.xml 中添加 rocketmq-spring-boot-starter 依赖。
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>
yml 配置
application.yml
rocketmq:
  name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq:
  producer:
    group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq:
  producer:
    group: producer-demo2
测试
demo 1
- 发送普通消息
- 发送 spring 的通用 message 对象
- 发送异步消息
- 发送顺序消息
生产者
package cn.tedu.demo2.m1; import org.apache.rocketmq.client.producer.sendcallback; import org.apache.rocketmq.client.producer.sendresult; import org.apache.rocketmq.spring.core.rocketmqtemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.messaging.message; import org.springframework.messaging.support.messagebuilder; import org.springframework.stereotype.component; @component public class producer { @autowired private rocketmqtemplate t ; public void send(){ //发送同步消息 t.convertandsend("topic1:taga", "hello world! "); //发送spring的message message<string> message = messagebuilder.withpayload("hello spring message! ").build(); t.send("topic1:taga",message); //发送异步消息 t.asyncsend("topic1:taga", "hello world asyn", new sendcallback() { @override public void onsuccess(sendresult sendresult) { system.out.println("发送成功"); } @override public void onexception(throwable throwable) { system.out.println("发送失败"); } }); //发送顺序消息 t.syncsendorderly("topic1", "98456237,创建", "98456237"); t.syncsendorderly("topic1", "98456237,支付", "98456237"); t.syncsendorderly("topic1", "98456237,完成", "98456237"); } }
消费者
package cn.tedu.demo2.m1; import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener; import org.apache.rocketmq.spring.core.rocketmqlistener; import org.springframework.stereotype.component; @component @rocketmqmessagelistener(topic = "topic1",consumergroup = "consumer-demo1") public class consumer implements rocketmqlistener<string> { @override public void onmessage(string s) { system.out.println("收到"+s); } }
主类
package cn.tedu.demo2.m1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/** Spring Boot entry point for demo 1. */
@SpringBootApplication
public class Main {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
测试类
需要放在 test 文件夹
激活 demo1 profile：@ActiveProfiles("demo1")
package cn.tedu.demo2.m1;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

/**
 * Integration test for demo 1. Runs with the {@code demo1} profile active so that
 * the settings from {@code application-demo1.yml} are applied.
 */
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {

    @Autowired
    private Producer producer;

    /** Sends the demo messages, then keeps the context alive for five seconds. */
    @Test
    public void test1() {
        producer.send();
        try {
            // Give the asynchronous send callback and the consumer time to run.
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the test runner can still observe it.
            Thread.currentThread().interrupt();
        }
    }
}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.rocketmqtransactionlistener; import org.apache.rocketmq.spring.core.rocketmqlocaltransactionlistener; import org.apache.rocketmq.spring.core.rocketmqlocaltransactionstate; import org.apache.rocketmq.spring.core.rocketmqtemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.messaging.message; import org.springframework.messaging.support.messagebuilder; import org.springframework.stereotype.component; @component public class producer { @autowired private rocketmqtemplate t; public void send(){ message<string> message = messagebuilder.withpayload("hello world").build(); //一旦发送消息,则执行监听器 t.sendmessageintransaction("topic2",message,null); } @rocketmqtransactionlistener class lis implements rocketmqlocaltransactionlistener { @override public rocketmqlocaltransactionstate executelocaltransaction(message message, object o) { system.out.println("执行本地事务"); return rocketmqlocaltransactionstate.unknown; } @override public rocketmqlocaltransactionstate checklocaltransaction(message message) { system.out.println("执行事务回查"); return rocketmqlocaltransactionstate.commit; } } }
消费者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener; import org.apache.rocketmq.spring.core.rocketmqlistener; import org.springframework.stereotype.component; @component @rocketmqmessagelistener(topic = "topic2",consumergroup = "consumer-demo2") public class consumer implements rocketmqlistener<string> { @override public void onmessage(string s) { system.out.println("收到"+s); } }
主类
package cn.tedu.demo2.m2;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/** Spring Boot entry point for demo 2. */
@SpringBootApplication
public class Main {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
测试类
package cn.tedu.demo2.m2; import org.junit.jupiter.api.test; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.test.context.springboottest; import org.springframework.test.context.activeprofiles; @springboottest @activeprofiles("demo2") public class test2 { @autowired private producer producer; @test public void test1(){ producer.send(); //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间 try { thread.sleep(30000); } catch (interruptedexception e) { e.printstacktrace(); } } }
到此这篇关于springboot 整合 rocketmq 收发消息的文章就介绍到这了,更多相关springboot 整合 rocketmq 收发消息内容请搜索以前的文章或继续浏览下面的相关文章,希望大家以后多多支持!
推荐阅读
-
SpringBoot RocketMQ docker整合使用
-
springboot 整合 RocketMQ
-
SpringBoot整合阿里RocketMQ
-
springboot 整合rocketmq
-
SpringBoot RocketMQ 整合使用和监控
-
SpringBoot RocketMQ 整合使用和监控
-
rocketmq(2): springboot 整合rocketmq
-
SpringBoot 整合 RocketMQ 实现消息生产消费(RocketMQTemplate实现)
-
SpringBoot2.x系列教程59--SpringBoot整合消息队列之JMS简介
-
SpringBoot2.x系列教程63--SpringBoot整合消息队列之RabbitMQ详解