activemq消息持久化方式(activemq和kafka区别)
程序员文章站
2023-11-17 23:11:46
队列模式(点对点模式,p2p)特点:1、客户端包括生产者和消费者;2、队列中的消息只能被一个消费者消费;3、消费者可以随时消费队列中的消息;队列模式和主题模式的区别:1、提前订阅,队列模式:消费者不需...
队列模式(点对点模式,p2p)特点:
1、客户端包括生产者和消费者;
2、队列中的消息只能被一个消费者消费;
3、消费者可以随时消费队列中的消息;
队列模式和主题模式的区别:
1、提前订阅,队列模式:消费者不需要提前订阅也可以消费消息;主题模式:只有提前进行订阅的消费者才能成功消费消息;
2、多个消费者分配消息:队列模式:只能平均消费消息,被别的消费者消费的消息不能重复被其他的消费者消费;主题模式:每个订阅者都可以消费主题模式中的每一条消息;
案例代码:
生产者:
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
public class activemqproducer {
public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final string queue_name = "queue01";
public static void main(string[] args) throws jmsexception {
//创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
//通过连接工厂获取connection连接 并启动访问
connection connection = activemqconnectionfactory.createconnection();
connection.start();
//创建会话session 需要两个参数,第一个事务,第二个签收
session session = connection.createsession(false, session.auto_acknowledge);
//创建目的地(选择是队列还是主题)
queue queue = session.createqueue(queue_name);
//创建消息的生产者
messageproducer messageproducer = session.createproducer(queue);
//通过使用消息生产者messageproducer生产3条消息发送到队列中
for (int i = 1; i <= 7; i++) {
//创建消息 一个字符串消息
textmessage textmessage = session.createtextmessage("msg---->" + i);
//通过messageproducer 发布消息
messageproducer.send(textmessage);
}
//关闭资源
messageproducer.close();
session.close();
connection.close();
system.out.println("消息发送到mq成功");
}
}
消费者1:
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
public class activemqconsumer {
public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final string queue_name="queue01";
public static void main(string[] args) throws jmsexception {
activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
//通过连接工厂获取connection连接 并启动访问
connection connection = activemqconnectionfactory.createconnection();
connection.start();
//创建会话session 需要两个参数,第一个事务,第二个签收
session session = connection.createsession(false, session.auto_acknowledge);
//创建目的地(选择是队列还是主题)
queue queue = session.createqueue(queue_name);
//创建消息的消费者
messageconsumer messageconsumer = session.createconsumer(queue);
while (true){
//从队列中获取消息 receive未设置最大时间 是阻塞的,
textmessage textmessage = (textmessage) messageconsumer.receive();
if (textmessage !=null){
system.out.println("消费者接受到消息---->"+textmessage.gettext());
}else {
break;
}
}
messageconsumer.close();
session.close();
connection.close();
}
}
输出:
info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->2
消费者接受到消息---->msg---->4
消费者接受到消息---->msg---->6
消费者2:
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
import java.io.ioexception;
public class activemqconsumerlistener {
public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final string queue_name = "queue01";
public static void main(string[] args) throws jmsexception, ioexception {
activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
//通过连接工厂获取connection连接 并启动访问
connection connection = activemqconnectionfactory.createconnection();
connection.start();
//创建会话session 需要两个参数,第一个事务,第二个签收
session session = connection.createsession(false, session.auto_acknowledge);
//创建目的地(选择是队列还是主题)
queue queue = session.createqueue(queue_name);
//创建消息的消费者
messageconsumer messageconsumer = session.createconsumer(queue);
//通过监听的机制消费消息
messageconsumer.setmessagelistener((message) -> {
if (message != null && message instanceof textmessage) {
textmessage textmessage = (textmessage) message;
try {
system.out.println("消费者接受到消息---->" + textmessage.gettext());
} catch (jmsexception e) {
e.printstacktrace();
}
}
});
//不关闭控制台 如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
system.in.read();
messageconsumer.close();
session.close();
connection.close();
}
}
输出:
info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->1
消费者接受到消息---->msg---->3
消费者接受到消息---->msg---->5
消费者接受到消息---->msg---->7
number of consumers:表示消费者数量;
number of pending messages:等待消费的消息,这个是当前未出队列的数量;
messages enqueued:进入队列的消息;( 这个数量只增不减,重启后会清零);
messages dequeued:出了队列的消息 可以理解为是消费者消费掉的数量 (重启后会清零);
持久化案例代码:
activemq持久化,生产者产生的数据,在没有被消费者消费时,先保存到数据库中,当数据被消费者消费后,再从数据库中删除。
生产者:
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
public class activemqproducer {
public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final string queue_name = "queue02";
public static void main(string[] args) throws jmsexception {
//创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
//通过连接工厂获取connection连接 并启动访问
connection connection = activemqconnectionfactory.createconnection();
connection.start();
//创建会话session 需要两个参数,第一个事务,第二个签收
session session = connection.createsession(false, session.auto_acknowledge);
//创建目的地(选择是队列还是主题)
queue queue = session.createqueue(queue_name);
//创建消息的生产者
messageproducer messageproducer = session.createproducer(queue);
// 消息持久化
messageproducer.setdeliverymode(deliverymode.persistent);
//通过使用消息生产者messageproducer生产3条消息发送到队列中
for (int i = 1; i <= 7; i++) {
//创建消息 一个字符串消息
textmessage textmessage = session.createtextmessage("msg---->" + i);
//通过messageproducer 发布消息
messageproducer.send(textmessage);
}
//关闭资源
messageproducer.close();
session.close();
connection.close();
system.out.println("消息发送到mq成功");
}
}
代码:
messageproducer.setdeliverymode(deliverymode.persistent);
消费者:
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
public class activemqconsumer {
public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final string queue_name="queue02";
public static void main(string[] args) throws jmsexception {
activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
//通过连接工厂获取connection连接 并启动访问
connection connection = activemqconnectionfactory.createconnection();
connection.setclientid("client-queue02-01");
connection.start();
//创建会话session 需要两个参数,第一个事务,第二个签收
session session = connection.createsession(false, session.auto_acknowledge);
//创建目的地(选择是队列还是主题)
queue queue = session.createqueue(queue_name);
//创建消息的消费者
messageconsumer messageconsumer = session.createconsumer(queue);
while (true){
//从队列中获取消息 receive未设置最大时间 是阻塞的,
textmessage textmessage = (textmessage) messageconsumer.receive();
if (textmessage !=null){
system.out.println("消费者接受到消息---->"+textmessage.gettext());
}else {
break;
}
}
messageconsumer.close();
session.close();
connection.close();
}
}
测试:
1、先运行生产者,activemqproducer
2、查看数据库:
3、在运行消费者,activemqconsumer,输出:
info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->1
消费者接受到消息---->msg---->2
消费者接受到消息---->msg---->3
消费者接受到消息---->msg---->4
消费者接受到消息---->msg---->5
消费者接受到消息---->msg---->6
消费者接受到消息---->msg---->7
4、再次查看数据库,消息已删除。
推荐阅读
-
activemq消息持久化方式(activemq和kafka区别)
-
消息队列的作用以及kafka和activemq的对比
-
用于处理日志的分布式消息系统Kafka(吞吐量高于ActiveMQ和RabbitMQ) activemqkafkaMQ
-
将redis发布订阅模式用做消息队列和rabbitmq的区别?Redis禁用持久化功能的设置?想想为什么要使用MQ?使用了消息队列会有什么缺点?
-
JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中.
-
JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中.
-
JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中_MySQL
-
消息队列的作用以及kafka和activemq的对比
-
JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中_MySQL
-
将redis发布订阅模式用做消息队列和rabbitmq的区别?Redis禁用持久化功能的设置?想想为什么要使用MQ?使用了消息队列会有什么缺点?