欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ActiveMQ学习总结------原生实战操作(下)03

程序员文章站 2022-06-24 08:10:41
本篇将继续延续上一篇的内容,作为知识补充篇,为接下来我们学习spring整合ActiveMQ打好基础 本篇主要学习内容: 1.ActiveMQ 队列服务监听 2.ActiveMQ Topic模型 回顾下上一篇ActiveMQ学习总结我们学习到了: 1.ActiveMQ术语及API介绍 2.Activ ......

本篇将继续延续上一篇的内容,作为知识补充篇,为接下来我们学习spring整合activemq打好基础

本篇主要学习内容:

  1.activemq 队列服务监听

  2.activemq topic模型


 

回顾下上一篇activemq学习总结我们学习到了:

  1.activemq术语及api介绍

  2.activemq 文本消息处理

  3.activemq 对象消息处理

相信大现在对activemq的一些简单操作已经很轻松掌握了

上一篇文章地址:


 

 

一 activemq实现队列服务监听

在我们上一篇的练习中,所有的消费者都是接收一次消息即断开连接,这样是不是很不方便。

试想一下,如果我们的provider在consumer接收完第一条消息后又继续发送了一条消息,那么consumer已经断开连接了,是不是就不能连接不间断的实时获取消息?

解决方案:

  很容易,用我们的队列服务监听即可

 

*:根据上一章的学习,大家对环境搭建使用配置,肯定都已经相当清楚了,这里就不过多阐述,直接进行代码实战

 

1 消息生产者

相比之下,我么你的生产者照之前是没有任何变化的,主要的变化还是在cosumer身上

package cn.arebirth.mq;

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqqueuelistenerproducer {
    public static void sendtextactivemq(string txt) {
        //定义链接工厂
        connectionfactory connectionfactory = null;

        //定义链接对象
        connection connection = null;

        //定义会话
        session session = null;

        //目的地
        destination destination = null;

        //定义消息的发送者
        messageproducer producer = null;

        //定义消息
        message message = null;

        try {
            //创建链接工厂
            connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接诶对象
            connection = connectionfactory.createconnection();

            //启动链接
            connection.start();

            //创建会话
            session = connection.createsession(false, session.auto_acknowledge);

            //创建目的地
            destination = session.createqueue("queue-listener");

            //创建消息生产者
            producer = session.createproducer(destination);

            //创建消息对象
            message = session.createtextmessage(txt);

            //发送消息
            producer.send(message);
        } catch (exception ex) {
            ex.printstacktrace();
        } finally {
            //回收资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
        }
    }
}

 

2 消息消费者

package cn.arebirth.mq;

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqqueuelistenerconsumer {
    public static void receivetextactivemq() {
        // 定义链接工厂
        connectionfactory connectionfactory = null;
        // 定义链接对象
        connection connection = null;
        // 定义会话
        session session = null;
        // 目的地
        destination destination = null;
        // 定义消息的发送者
        messageconsumer consumer = null;
        // 定义消息
        message message = null;

        try {
            //创建链接工厂
            connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接对象
            connection = connectionfactory.createconnection();

            //启动链接
            connection.start();

            //创建会话
            session = connection.createsession(false, session.auto_acknowledge);

            //创建目的地
            destination = session.createqueue("queue-listener");

            //创建消息消费者
            consumer = session.createconsumer(destination);

            //队列服务监听
            consumer.setmessagelistener(new messagelistener() {
                //activemq回调方法。通过该方法将消息传递到consumer
                @override
                public void onmessage(message message) {
                    //处理消息
                    string msg = null;
                    try {
                        msg = ((textmessage) message).gettext();
                    } catch (jmsexception e) {
                        e.printstacktrace();
                    }
                    system.out.println("producer say:" + msg);
                }
            });
        } catch (exception ex) {
            ex.printstacktrace();
        }
    }
}

 

3 测试

3.1 provider测试

package cn.arebirth.mq;

public class producertest {
    public static void main(string[] args) {
        activemqqueuelistenerproducer.sendtextactivemq("hello,consumer!");
    }
}

观察我们的控制台可以发现已经成功发布到队列

ActiveMQ学习总结------原生实战操作(下)03

 

 

 

3.2 consumer测试

package cn.arebirth.mq;

public class consumertest {
    public static void main(string[] args) {
        activemqqueuelistenerconsumer.receivetextactivemq();
    }
}

我们运行后可以发现,它接收到了消息,但是它的进程并没有关闭,

ActiveMQ学习总结------原生实战操作(下)03

 

 

我们用provider继续发布一条消息,看看consumer能不能接收到

ActiveMQ学习总结------原生实战操作(下)03

 

 

可以看到,consumer持续在后台监听我们发布的消息,

ActiveMQ学习总结------原生实战操作(下)03

 

 

 

 

 

 

通过上面代码,不难发现,provider没有任何改动,只是consumer修改了一部分

通过调用匿名内部类的方法来实现持续监听

 consumer.setmessagelistener(new messagelistener() {
    @override
                public void onmessage(message message) {

        }
    
}

注意:因为涉及到队列持续监听,所以我们不能在finally处给资源回收,否则还在监听状态,资源都回收没了,也就无从监听啦。

 


 

 

二 topic模型

在本系列文章第一篇也有介绍过一些topic模型的概念,那么这里我们将以原理+实战的方式来带领大家掌握

 

1 publish/subscribe处理模式(topic)

消息生产者(发布)消息到topic中,同时有多个消息消费者(订阅)消费该消息。

ActiveMQ学习总结------原生实战操作(下)03

和点对点方式不同,发布到topic的消息会被所有的订阅者消费,而点对点的只能是指定的消费者去消费

当生产者发布消息,不管是否有消费者,都不会保存消息,也就是说它是发完就啥也不管了那种,

所以要注意:一定要有消费者,然后在有生产者,否则生产者不发完消息什么也不管了,你消费者在生产者之后才有,那么你是接收不到消息的。

 

接下来我们就以实战的方式鼓捣下。

 

2 创建生产者

package cn.arebirth.mq;

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqtopicproducer {
    public static void sendtextactivemq(string txt){
        //定义链接工厂
        connectionfactory connectionfactory = null;

        //定义链接对象
        connection connection = null;

        //定义会话
        session session = null;

        //目的地
        destination destination = null;

        //定义消息的发送者
        messageproducer producer = null;

        //定义消息
        message message = null;

        try {
            //创建链接工厂
            connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接诶对象
            connection = connectionfactory.createconnection();

            //启动链接
            connection.start();

            //创建会话
            session = connection.createsession(false, session.auto_acknowledge);

            //创建目的地
            destination = session.createtopic("topic-test");

            //创建消息生产者
            producer = session.createproducer(destination);

            //创建消息对象
            message = session.createtextmessage(txt);

            //发送消息
            producer.send(message);
        } catch (exception ex) {
            ex.printstacktrace();
        } finally {
            //回收资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
        }
    }
}

我们可以发现,在创建目的地destination的时候代码有了变动

destination = session.createtopic("topic-test");

变成了createtopic,对这就是topic模式了。

 

3 创建消费者

package cn.arebirth.mq;

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqtopicconsumer implements runnable {


    public static void receivetextactivemq(string threadname) {
        // 定义链接工厂
        connectionfactory connectionfactory = null;
        // 定义链接对象
        connection connection = null;
        // 定义会话
        session session = null;
        // 目的地
        destination destination = null;
        // 定义消息的发送者
        messageconsumer consumer = null;
        // 定义消息
        message message = null;

        try {
            //创建链接工厂
            connectionfactory = new activemqconnectionfactory("admin", "admin", "tcp://169.254.18.20:61616");

            //创建链接对象
            connection = connectionfactory.createconnection();

            //启动链接
            connection.start();

            //创建会话
            session = connection.createsession(false, session.auto_acknowledge);

            //创建目的地
            destination = session.createtopic("topic-test");

            //创建消息的消费者
            consumer = session.createconsumer(destination);

            //服务监听
            consumer.setmessagelistener(new messagelistener() {
                //activemq回调方法。通过该方法将消息传递到consumer
                @override
                public void onmessage(message message) {
                    //处理消息
                    string msg = null;
                    try {
                        msg = ((textmessage) message).gettext();
                    } catch (jmsexception e) {
                        e.printstacktrace();
                    }
                    system.out.println(threadname + "--producer say:" + msg);
                }
            });
        } catch (exception ex) {
            ex.printstacktrace();
        }
    }

    @override
    public void run() {
        receivetextactivemq(thread.currentthread().getname());
    }
}

 

我们可以发现,在创建目的地destination的时候代码有了变动

destination = session.createtopic("topic-test");

还有实现了runnable这个是为了一会测试的时候,多线程启动,看效果,是否多个都会接受到,(如果看着糊涂的话,你也可以去掉线程的部分,单独复制多个对象,并启动,效果也是一样的)

 

4 测试(要先启动消费者,否则消费者是接收不到消息的!当然,你自己可以试一下

4.1 测试消费者

package cn.arebirth.mq;

public class consumertest {
    public static void main(string[] args) {
        activemqtopicconsumer a1 = new activemqtopicconsumer();
        thread t1 = new thread(a1,"a1");

        activemqtopicconsumer a2 = new activemqtopicconsumer();
        thread t2 = new thread(a2,"a2");

        activemqtopicconsumer a3 = new activemqtopicconsumer();
        thread t3 = new thread(a3,"a3");

        t1.start();
        t2.start();
        t3.start();
    }
}

 

可以看到,我们的消费者已经启动了,三个线程。并以监听服务的方式启动

ActiveMQ学习总结------原生实战操作(下)03

 

 

4.2 测试生产者

package cn.arebirth.mq;

public class producertest {
    public static void main(string[] args) {
        activemqtopicproducer.sendtextactivemq("hello,topic");
    }
}

 

可以看到,在topics下面,我们发布的内容已经有记录了ActiveMQ学习总结------原生实战操作(下)03

 

 

 

然后我们在看下,我们的consumer

 

ActiveMQ学习总结------原生实战操作(下)03

 

 

可以发现,三个consumer都已经接收到了

 

ps:

  如果你对activemq原理性的东西感到困惑,可以看下我们前面的文章: