ActiveMQ学习总结------原生实战操作(下)03
本篇将继续延续上一篇的内容,作为知识补充篇,为接下来我们学习spring整合activemq打好基础
本篇主要学习内容:
1.activemq 队列服务监听
2.activemq topic模型
回顾下上一篇activemq学习总结我们学习到了:
1.activemq术语及api介绍
2.activemq 文本消息处理
3.activemq 对象消息处理
相信大现在对activemq的一些简单操作已经很轻松掌握了
上一篇文章地址:
一 activemq实现队列服务监听
在我们上一篇的练习中,所有的消费者都是接收一次消息即断开连接,这样是不是很不方便。
试想一下,如果我们的provider在consumer接收完第一条消息后又继续发送了一条消息,那么consumer已经断开连接了,是不是就不能连接不间断的实时获取消息?
解决方案:
很容易,用我们的队列服务监听即可
注*:根据上一章的学习,大家对环境搭建使用配置,肯定都已经相当清楚了,这里就不过多阐述,直接进行代码实战
1 消息生产者
相比之下,我么你的生产者照之前是没有任何变化的,主要的变化还是在cosumer身上
package cn.arebirth.mq; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; public class activemqqueuelistenerproducer { public static void sendtextactivemq(string txt) { //定义链接工厂 connectionfactory connectionfactory = null; //定义链接对象 connection connection = null; //定义会话 session session = null; //目的地 destination destination = null; //定义消息的发送者 messageproducer producer = null; //定义消息 message message = null; try { //创建链接工厂 connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616"); //创建链接诶对象 connection = connectionfactory.createconnection(); //启动链接 connection.start(); //创建会话 session = connection.createsession(false, session.auto_acknowledge); //创建目的地 destination = session.createqueue("queue-listener"); //创建消息生产者 producer = session.createproducer(destination); //创建消息对象 message = session.createtextmessage(txt); //发送消息 producer.send(message); } catch (exception ex) { ex.printstacktrace(); } finally { //回收资源 if (producer != null) { try { producer.close(); } catch (jmsexception e) { e.printstacktrace(); } } if (session != null) { try { session.close(); } catch (jmsexception e) { e.printstacktrace(); } } if (connection != null) { try { connection.close(); } catch (jmsexception e) { e.printstacktrace(); } } } } }
2 消息消费者
package cn.arebirth.mq; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; public class activemqqueuelistenerconsumer { public static void receivetextactivemq() { // 定义链接工厂 connectionfactory connectionfactory = null; // 定义链接对象 connection connection = null; // 定义会话 session session = null; // 目的地 destination destination = null; // 定义消息的发送者 messageconsumer consumer = null; // 定义消息 message message = null; try { //创建链接工厂 connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616"); //创建链接对象 connection = connectionfactory.createconnection(); //启动链接 connection.start(); //创建会话 session = connection.createsession(false, session.auto_acknowledge); //创建目的地 destination = session.createqueue("queue-listener"); //创建消息消费者 consumer = session.createconsumer(destination); //队列服务监听 consumer.setmessagelistener(new messagelistener() { //activemq回调方法。通过该方法将消息传递到consumer @override public void onmessage(message message) { //处理消息 string msg = null; try { msg = ((textmessage) message).gettext(); } catch (jmsexception e) { e.printstacktrace(); } system.out.println("producer say:" + msg); } }); } catch (exception ex) { ex.printstacktrace(); } } }
3 测试
3.1 provider测试
package cn.arebirth.mq; public class producertest { public static void main(string[] args) { activemqqueuelistenerproducer.sendtextactivemq("hello,consumer!"); } }观察我们的控制台可以发现已经成功发布到队列
3.2 consumer测试
package cn.arebirth.mq; public class consumertest { public static void main(string[] args) { activemqqueuelistenerconsumer.receivetextactivemq(); } }我们运行后可以发现,它接收到了消息,但是它的进程并没有关闭,
我们用provider继续发布一条消息,看看consumer能不能接收到
可以看到,consumer持续在后台监听我们发布的消息,
通过上面代码,不难发现,provider没有任何改动,只是consumer修改了一部分
通过调用匿名内部类的方法来实现持续监听
consumer.setmessagelistener(new messagelistener() { @override public void onmessage(message message) { } }
注意:因为涉及到队列持续监听,所以我们不能在finally处给资源回收,否则还在监听状态,资源都回收没了,也就无从监听啦。
二 topic模型
在本系列文章第一篇也有介绍过一些topic模型的概念,那么这里我们将以原理+实战的方式来带领大家掌握
1 publish/subscribe处理模式(topic)
消息生产者(发布)消息到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有的订阅者消费,而点对点的只能是指定的消费者去消费
当生产者发布消息,不管是否有消费者,都不会保存消息,也就是说它是发完就啥也不管了那种,
所以要注意:一定要有消费者,然后在有生产者,否则生产者不发完消息什么也不管了,你消费者在生产者之后才有,那么你是接收不到消息的。
接下来我们就以实战的方式鼓捣下。
2 创建生产者
package cn.arebirth.mq; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; public class activemqtopicproducer { public static void sendtextactivemq(string txt){ //定义链接工厂 connectionfactory connectionfactory = null; //定义链接对象 connection connection = null; //定义会话 session session = null; //目的地 destination destination = null; //定义消息的发送者 messageproducer producer = null; //定义消息 message message = null; try { //创建链接工厂 connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616"); //创建链接诶对象 connection = connectionfactory.createconnection(); //启动链接 connection.start(); //创建会话 session = connection.createsession(false, session.auto_acknowledge); //创建目的地 destination = session.createtopic("topic-test"); //创建消息生产者 producer = session.createproducer(destination); //创建消息对象 message = session.createtextmessage(txt); //发送消息 producer.send(message); } catch (exception ex) { ex.printstacktrace(); } finally { //回收资源 if (producer != null) { try { producer.close(); } catch (jmsexception e) { e.printstacktrace(); } } if (session != null) { try { session.close(); } catch (jmsexception e) { e.printstacktrace(); } } if (connection != null) { try { connection.close(); } catch (jmsexception e) { e.printstacktrace(); } } } } }
我们可以发现,在创建目的地destination的时候代码有了变动
destination = session.createtopic("topic-test");
变成了createtopic,对这就是topic模式了。
3 创建消费者
package cn.arebirth.mq; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; public class activemqtopicconsumer implements runnable { public static void receivetextactivemq(string threadname) { // 定义链接工厂 connectionfactory connectionfactory = null; // 定义链接对象 connection connection = null; // 定义会话 session session = null; // 目的地 destination destination = null; // 定义消息的发送者 messageconsumer consumer = null; // 定义消息 message message = null; try { //创建链接工厂 connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616"); //创建链接对象 connection = connectionfactory.createconnection(); //启动链接 connection.start(); //创建会话 session = connection.createsession(false, session.auto_acknowledge); //创建目的地 destination = session.createtopic("topic-test"); //创建消息的消费者 consumer = session.createconsumer(destination); //服务监听 consumer.setmessagelistener(new messagelistener() { //activemq回调方法。通过该方法将消息传递到consumer @override public void onmessage(message message) { //处理消息 string msg = null; try { msg = ((textmessage) message).gettext(); } catch (jmsexception e) { e.printstacktrace(); } system.out.println(threadname + "--producer say:" + msg); } }); } catch (exception ex) { ex.printstacktrace(); } } @override public void run() { receivetextactivemq(thread.currentthread().getname()); } }
我们可以发现,在创建目的地destination的时候代码有了变动
destination = session.createtopic("topic-test");
还有实现了runnable这个是为了一会测试的时候,多线程启动,看效果,是否多个都会接受到,(如果看着糊涂的话,你也可以去掉线程的部分,单独复制多个对象,并启动,效果也是一样的)
4 测试(要先启动消费者,否则消费者是接收不到消息的!当然,你自己可以试一下)
4.1 测试消费者
package cn.arebirth.mq; public class consumertest { public static void main(string[] args) { activemqtopicconsumer a1 = new activemqtopicconsumer(); thread t1 = new thread(a1,"a1"); activemqtopicconsumer a2 = new activemqtopicconsumer(); thread t2 = new thread(a2,"a2"); activemqtopicconsumer a3 = new activemqtopicconsumer(); thread t3 = new thread(a3,"a3"); t1.start(); t2.start(); t3.start(); } }
可以看到,我们的消费者已经启动了,三个线程。并以监听服务的方式启动
4.2 测试生产者
package cn.arebirth.mq; public class producertest { public static void main(string[] args) { activemqtopicproducer.sendtextactivemq("hello,topic"); } }
可以看到,在topics下面,我们发布的内容已经有记录了
然后我们在看下,我们的consumer
可以发现,三个consumer都已经接收到了
ps:
如果你对activemq原理性的东西感到困惑,可以看下我们前面的文章:
上一篇: 沈义人吐槽iOS 14.5.1:稳定性最差 也是醉了
下一篇: 莱卡红酒价格,一款性价比超高的红酒