欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

activemq消息持久化方式(activemq和kafka区别)

程序员文章站 2023-11-17 23:11:46
队列模式(点对点模式,p2p)特点:1、客户端包括生产者和消费者;2、队列中的消息只能被一个消费者消费;3、消费者可以随时消费队列中的消息;队列模式和主题模式的区别:1、提前订阅,队列模式:消费者不需...

队列模式(点对点模式,p2p)特点:

1、客户端包括生产者和消费者;

2、队列中的消息只能被一个消费者消费;

3、消费者可以随时消费队列中的消息;

activemq消息持久化方式(activemq和kafka区别)

队列模式和主题模式的区别:

1、提前订阅,队列模式:消费者不需要提前订阅也可以消费消息;主题模式:只有提前进行订阅的消费者才能成功消费消息;

2、多个消费者分配消息:队列模式:只能平均消费消息,被别的消费者消费的消息不能重复被其他的消费者消费;主题模式:每个订阅者都可以消费主题模式中的每一条消息;

案例代码:

生产者:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqproducer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name = "queue01";

    public static void main(string[] args) throws jmsexception {
        //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的生产者
        messageproducer messageproducer = session.createproducer(queue);
        //通过使用消息生产者messageproducer生产3条消息发送到队列中
        for (int i = 1; i <= 7; i++) {
            //创建消息   一个字符串消息
            textmessage textmessage = session.createtextmessage("msg---->" + i);
            //通过messageproducer 发布消息
            messageproducer.send(textmessage);
        }
        //关闭资源
        messageproducer.close();
        session.close();
        connection.close();
        system.out.println("消息发送到mq成功");
    }

}

消费者1:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqconsumer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name="queue01";
    public static void main(string[] args) throws jmsexception {
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的消费者
        messageconsumer messageconsumer = session.createconsumer(queue);
        while (true){
            //从队列中获取消息  receive未设置最大时间 是阻塞的,
            textmessage textmessage = (textmessage) messageconsumer.receive();
            if (textmessage !=null){
                system.out.println("消费者接受到消息---->"+textmessage.gettext());
            }else {
                break;
            }
        }
        messageconsumer.close();
        session.close();
        connection.close();
    }

}

输出:

 info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->2
消费者接受到消息---->msg---->4
消费者接受到消息---->msg---->6

消费者2:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;
import java.io.ioexception;

public class activemqconsumerlistener {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name = "queue01";

    public static void main(string[] args) throws jmsexception, ioexception {
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的消费者
        messageconsumer messageconsumer = session.createconsumer(queue);
        //通过监听的机制消费消息
        messageconsumer.setmessagelistener((message) -> {
            if (message != null && message instanceof textmessage) {
                textmessage textmessage = (textmessage) message;
                try {
                    system.out.println("消费者接受到消息---->" + textmessage.gettext());
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
        });
        //不关闭控制台  如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
        system.in.read();
        messageconsumer.close();
        session.close();
        connection.close();
    }

}

输出:

 info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->1
消费者接受到消息---->msg---->3
消费者接受到消息---->msg---->5
消费者接受到消息---->msg---->7
activemq消息持久化方式(activemq和kafka区别)

number of consumers:表示消费者数量;

number of pending messages:等待消费的消息,这个是当前未出队列的数量;

messages enqueued:进入队列的消息;( 这个数量只增不减,重启后会清零);

messages dequeued:出了队列的消息 可以理解为是消费者消费掉的数量 (重启后会清零);

持久化案例代码:

activemq持久化,生产者产生的数据,在没有被消费者消费时,先保存到数据库中,当数据被消费者消费后,再从数据库中删除。

生产者:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqproducer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name = "queue02";

    public static void main(string[] args) throws jmsexception {
        //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的生产者
        messageproducer messageproducer = session.createproducer(queue);
        // 消息持久化
        messageproducer.setdeliverymode(deliverymode.persistent);
        //通过使用消息生产者messageproducer生产3条消息发送到队列中
        for (int i = 1; i <= 7; i++) {
            //创建消息   一个字符串消息
            textmessage textmessage = session.createtextmessage("msg---->" + i);
            //通过messageproducer 发布消息
            messageproducer.send(textmessage);
        }
        //关闭资源
        messageproducer.close();
        session.close();
        connection.close();
        system.out.println("消息发送到mq成功");
    }

}

代码:
messageproducer.setdeliverymode(deliverymode.persistent);

消费者:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqconsumer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name="queue02";
    public static void main(string[] args) throws jmsexception {
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.setclientid("client-queue02-01");
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的消费者
        messageconsumer messageconsumer = session.createconsumer(queue);
        while (true){
            //从队列中获取消息  receive未设置最大时间 是阻塞的,
            textmessage textmessage = (textmessage) messageconsumer.receive();
            if (textmessage !=null){
                system.out.println("消费者接受到消息---->"+textmessage.gettext());
            }else {
                break;
            }
        }
        messageconsumer.close();
        session.close();
        connection.close();
    }

}

测试:

1、先运行生产者,activemqproducer

2、查看数据库:

activemq消息持久化方式(activemq和kafka区别)

3、在运行消费者,activemqconsumer,输出:

 info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->1
消费者接受到消息---->msg---->2
消费者接受到消息---->msg---->3
消费者接受到消息---->msg---->4
消费者接受到消息---->msg---->5
消费者接受到消息---->msg---->6
消费者接受到消息---->msg---->7

4、再次查看数据库,消息已删除。