Active MQ 消息队列
有什么用?
对于一个系统来说,少不了通信。如果要有通信的和返回就涉及到了同步和异步的问题了。
讲个小故事:
现在有AB两个人一起做生意,A主要负责收钱,B负责发货。一开始人不多的时候,A收完钱要去给B说发货,两个人合作愉快;可是有时候B不在,A收了钱以后,就要等着B回来才能给B说发货的事情。久而久之,影响到了效率。
有一天,A给B说,“老哥,咱们这样:我收钱写订单,让后我把订单放在一个盒子里。你呢发货的时候也别来找我了,直接去盒子里看有没有订单就行。”
一开始的两个人通信就是同步通信,只有A得到了B的回应才会继续记性;后来A提出的“放入盒子”就是一种异步操作。订单信息我们可以看作是数据,这个盒子就是我们的消息中间件。
当然一个这个盒子只是一个简单的模型,我们使用的时候还要考虑别的事情,像是:订单会不会丢了、订单会不会被修改等等。
小结:消息中间件键的作用小结一下集中:
- 削峰:面对大流量的时,率先开始就收消息,这样将压力从io移走,使得系统可用。
- 填谷:面对“来就来的多,要不没什么人来”的环境,中间件可以让数据处理在小流量时候依然执行。削峰填谷,可以通过消息异步的方式增强了系统面对大流量时候的适应性,将流量分摊开,更好的使用系统的资源。
- 结构改变:消息请求在消息队列中,处理的只要不停的取就可以了,不用在乎现在现在是不是大流量。使得产生请求和处理请求分离。编程时候只要面向中间件编写就行,消息提供方或者接收方的改变不会影响到另一个模块。
activeMQ的安装
解压安装就OK了
默认账号admin 默认密码admin
用java实现连接
添加xml依赖
<!--本地的版本对应-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.3</version>
</dependency>
<!--spring的activemq的支持-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.17</version>
</dependency>
JMS规范
什么是jms
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。------来自百度百科
jms的编码架构
图片来自百度
消息发送分两类,一类是点对点的消息队列queue,另一个是点对多的订阅 主题topic。
基本使用(非持久化)
队列queue
成员:发送方、接收方、发送队列、消息
特点:发送方和接收方互相独立,各自运行各自的。同时消息被消费后不会就消失(有别于订阅)。
模式:这种模式更像是生产者消费者模式。------生产者往“中介”放入数据,消费者消费“中介”的数据。
所以java实现就存在两个身份即 生产者(provider)和消费者(customer)。
- 配置地址
- 连接工厂 & 创建连接 (端口61616)
- 创建一个会话(session)
- 创建目的地(queue/topic)
- 确定身份(provide还是customer)
- 确定消息的类型(它们分别携带:
- 简单文本(TextMessage)、
- 可序列化的对象 (ObjectMessage)、
- 属性集合 (MapMessage)、
- 字节流 (BytesMessage)、
- 原始值流 (StreamMessage),
- 还有无有效负载的消息 (Message)。)
- 放入消息
- 发送消息
- 消息关闭
代码如下:
public class JMSProvider {
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
// public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
private final static int TEST_TIME = 5;
public static void main(String[] args) throws JMSException {
// 建立一个连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 主题还是队列
Queue ubw = session.createQueue("ubw");
// session.createTopic("ubw");
// 确定生产者
MessageProducer producer = session.createProducer(ubw);
// MessageConsumer consumer = session.createConsumer(ubw);
// 创建类型
TextMessage textMessage = session.createTextMessage();
// 输入值
// 注意不能写成这样 一个textMessage算一个消息
// for(int i = 0; i < TEST_TIME ; i++){
// textMessage.setText(i+"");
// }
for (int i = 0; i < TEST_TIME; i++) {
textMessage.setText(i+"");
// 提交
producer.send(textMessage);
}
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
消费发送成功能看到下面的这个图:
因为只有生产者没有消费者,我们再写一个消费者。
只是session创建的时候创建一个消费者就行
public static void main(String[] args) throws JMSException {
// 建立一个连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 主题还是队列
Queue ubw = session.createQueue("ubw");
// session.createTopic("ubw");
// 确定生产者
// MessageProducer producer = session.createProducer(ubw);
MessageConsumer consumer = session.createConsumer(ubw);
// 第一种方式 循环阻塞的等待
while (true){
TextMessage receive = (TextMessage)consumer.receive();
String text = receive.getText();
if(StringUtils.isEmpty(text)){
break;
}
System.out.println("消息是: " + text);
}
// 第二种方式 监听状态 这种多用于主题订阅 订阅里用的是这个方法
consumer.setMessageListener((message)->{
if (message!=null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到主题消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 关闭资源
consumer.close();
session.close();
connection.close();
}
结果为:
第一种是循环阻塞的等待
consumer.receive(1000)
这里等待1s后就关闭
主题topic
订阅模式
生产者
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
// public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
private final static int TEST_TIME = 5;
public static void main(String[] args) throws JMSException {
// 建立一个连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 主题还是队列
// Queue ubw = session.createQueue("ubw-topic");
Topic ubw = session.createTopic("ubw-topic");
// 确定生产者
MessageProducer producer = session.createProducer(ubw);
// 创建类型
TextMessage textMessage = session.createTextMessage();
for (int i = 0; i < TEST_TIME; i++) {
textMessage.setText(i+"");
// 提交
producer.send(textMessage);
}
// 关闭资源
producer.close();
session.close();
connection.close();
}
消费者
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
// public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
private final static int TEST_TIME = 5;
public static void main(String[] args) throws JMSException {
// 建立一个连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 主题还是队列
// Queue ubw = session.createQueue("ubw-topic");
Topic ubw = session.createTopic("ubw-topic");
// 确定生产者
// MessageProducer producer = session.createProducer(ubw);
MessageConsumer consumer = session.createConsumer(ubw);
//
consumer.setMessageListener((message)->{
if (message!=null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到主题消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 卡住等待
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
// 关闭资源
consumer.close();
session.close();
connection.close();
}
注意:这里一定要先监听,后发送,如果顺序反了就不会接收到消息。
持久化
持久化,顾名思义就是将消息队列里面的数据进行持久化,也就是当消息队列挂了重启以后队列中的数据不会消失。
我们先看看java代码里面怎么实现:
queue持久化
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
// public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
private final static int TEST_TIME = 5;
public static void main(String[] args) throws JMSException {
// 建立一个连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue ubw = session.createQueue("ubw-persistence");
MessageProducer producer = session.createProducer(ubw);
// 创建类型
TextMessage textMessage = session.createTextMessage();
for (int i = 0; i < TEST_TIME; i++) {
textMessage.setText(i+"");
// 在这里持久化
textMessage.setJMSDeliveryMode(Message.DEFAULT_PRIORITY);
// 提交
producer.send(textMessage);
}
// 关闭资源
producer.close();
session.close();
connection.close();
}
重启activeMQ以后启动消费者依然可以接收到消息。
topic持久化
主题的持久化有别的与队列,并不是发送的消息持久化,客户端随时上来就能取到。
主题(topic)客户端的持久化是需要订阅的。
消费者
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
// public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
private final static int TEST_TIME = 5;
public static void main(String[] args) throws JMSException {
// 建立一个连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
// 注意这个id要写在创建session会话的前面
connection.setClientID("clientID");
// 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 主题还是队列
Topic ubw = session.createTopic("ubw-topic-p");
// 消费者的topic模式下,只能用订阅的方式进行持久化,
// 即才能实现即便是离线了,上线以后还是能看到消息
// 设置完持久化在连接
// 确定生产者
MessageConsumer consumer = session.createConsumer(ubw);
// Subscriber 设置name 注意这里是用session创建订阅
TopicSubscriber sessionID = session.createDurableSubscriber(ubw, "sessionID");
// 进行连接
connection.start();
while (true){
// 接受消息
Message receive = sessionID.receive();
if (receive!=null) {
TextMessage message = (TextMessage)receive;
System.out.println(message.getText());
}else {
break;
}
}
// 关闭资源
consumer.close();
session.close();
connection.close();
}
这样就是订阅上了 后面provider向这个里面发送消息就可以同步接收了。
这时候关闭客户端这边就显示下线了
生产者:
需要注意生产者的连接需要放在持久化配置之后,不然不能生效:
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
// public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
private final static int TEST_TIME = 5;
public static void main(String[] args) throws JMSException {
// 建立一个连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
// 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 主题还是队列
// Queue ubw = session.createQueue("ubw-topic");
Topic ubw = session.createTopic("ubw-topic-p");
// 确定生产者
MessageProducer producer = session.createProducer(ubw);
// producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 这里也可以持久化
// 创建类型
TextMessage textMessage = session.createTextMessage();
TextMessage textMessage2 = session.createTextMessage();
textMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT); // 这里持久化 这两个持久化其实只是作用域不用而已
connection.start();
for (int i = 0; i < TEST_TIME; i++) {
textMessage.setText(i+"");
// 提交
producer.send(textMessage);
}
for (int i = 5; i < TEST_TIME + 5 ; i++) {
textMessage2.setText(i+"");
// 提交
producer.send(textMessage2);
}
// 关闭资源
producer.close();
session.close();
connection.close();
}
执行生产者,发送消息可以看到订阅的客户端多了几条消息。重启activemq以后可以获得数据
分析:这里都进行了持久化,也就说持久化设置在MessageProducer
和TextMessage
区别不大。同时create的生产者不是单例。也就是说,订阅模式本身就实现了持久化,并不需要加入setJMSDeliveryMode去设置持久化。
消息的队列的事务
对于生产者而言
影响:用send并不能发送消息,需要用commit才能提交事务。
for (int i = 0; i < TEST_TIME; i++) {
textMessage.setText(i+"");
textMessage.setJMSDeliveryMode(Message.DEFAULT_PRIORITY);
// 提交
producer.send(textMessage);
}
// session.commit(); 这里没有启动commit
for (int i = 0; i < TEST_TIME; i++) {
textMessage.setText(i+"");
textMessage.setJMSDeliveryMode(Message.DEFAULT_PRIORITY);
// 提交
producer.send(textMessage);
}
session.commit();
启动commit以后
对于消费者而言
影响:如果开启了事务而接收到以后不提交就会重复消费。
// 创建session
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//加入commit
System.out.println("commit");
session.commit();
提交了事务以后就没有重复消费了。
上一篇: MQ(消息队列)的安装和使用
下一篇: 5、服务器配置及日常运行