springboot 集成activeMQ实现消息队列和双向队列
一. 认识JMS
1.概述
对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
简短来说,JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。
JMS1.0是jsr 194里规定的规范(关于jsr规范,请点击)。目前最新的规范是JSR 343,JMS2.0。
好了,说了这么多,其实只是在说,JMS只是sun公司为了统一厂商的接口规范,而定义出的一组api接口。
2. JMS体系结构
描述如下:
- JMS提供者(JMS的实现者,比如activemq jbossmq等)
- JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
- JMS生产者,JMS消费者(生产者及负责创建并发送消息的客户,消费者是负责接收并处理消息的客户)
- JMS消息(在JMS客户之间传递数据的对象)
- JMS队列(一个容纳那些被发送的等待阅读的消息的区域)
- JMS主题(一种支持发送消息给多个订阅者的机制)
3. JMS对象模型
- 连接工厂(connectionfactory)客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
- JMS连接 表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
- JMS会话 session 标识JMS客户端和服务端的会话状态。会话建立在JMS连接上,标识客户与服务器之间的一个会话进程。
- JMS目的 Destinatio 又称为消息队列,是实际的消息源
- 生产者和消费者
- 消息类型,分为队列类型(优先先进先出)以及订阅类型
二. ActiveMQ
1. ActiveMQ的安装
- 从官网下载安装包, http://activemq.apache.org/download.html
- 赋予运行权限 chmod +x,windows可以忽略此步
- 运行 ./active start | stop
启动后,activeMQ会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。这里有个问题是启动mq后,很长时间管理界面才可以显示出来。
==》启动MQ服务器:
根据操作系统不同,进入相应win64/win32位目录,双击activemq.bat启动MQ。控制台部分显示:
ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用
进入ActionMQ服务监控地址:浏览器输入http://127.0.0.1:8161/admin
登录成功,页面显示:
重点关注菜单栏:
【1】Queues:队列
【2】Topics
一、引入依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
二、application.properties配置
# ActiveMQ--------------------------------------------------------------------------------------------------------------
# Specify if the default broker URL should be in memory. Ignored if an explicit broker has been specified.
#spring.activemq.in-memory=false
# URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
spring.activemq.broker-url=tcp://127.0.0.1:61616
# Login user of the broker.
spring.activemq.user=admin
# Login password of the broker.
spring.activemq.password=admin
# Trust all packages.
#spring.activemq.packages.trust-all=false
# Comma-separated list of specific packages to trust (when not trusting all packages).
#spring.activemq.packages.trusted=
# See PooledConnectionFactory.
#spring.activemq.pool.configuration.*=
# Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory.
spring.activemq.pool.enabled=true
# Maximum number of pooled connections.
spring.activemq.pool.max-connections=50
# Connection expiration timeout in milliseconds.
spring.activemq.pool.expiry-timeout=10000
# Connection idle timeout in milliseconds.
spring.activemq.pool.idle-timeout=30000
spring.jms.pub-sub-domain=false
三、新建消息生产者类
package ykf.mattress.sys.main.activemq;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
/**
* 测试消息 生成者
*/
@Service("messageProduce")
public class MessageProduce {
private Logger logger = LoggerFactory.getLogger(MessageProduce.class);
@Resource
private JmsTemplate jmsTemplate;
/**
* 发送消息
*
* @param destination
* 发送到的队列
* @param message
* 待发送的消息
*/
public void convertAndSend(Destination destination, final String message) {
jmsTemplate.convertAndSend(destination, message);
logger.info("消息发送成功!");
}
}
四、新建消息消费者one two
package ykf.mattress.sys.main.activemq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumeNone {
private Logger logger = LoggerFactory.getLogger(MessageConsumeNone.class);
/**
* 使用JmsListener配置消费者监听的队列
*
* @param text
* 接收到的消息
*/
@JmsListener(destination = "suimh_queue")
public void receiveQueue(String text) {
logger.info("MessageConsumeNone : 收到的报文为:" + text);
}
}
package ykf.mattress.sys.main.activemq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumeNtwo {
private Logger logger = LoggerFactory.getLogger(MessageConsumeNtwo.class);
/**
* 使用JmsListener配置消费者监听的队列
*
* @param text
* 接收到的消息
*/
@JmsListener(destination = "suimh_queue")
@SendTo("out.queue")
public String receiveQueue(String text) {
logger.info("MessageConsumeNtwo : 收到的报文为:" + text);
logger.info("MessageConsumeNtwo : 收到的报文为:返回数据到out.queue");
return text;
}
}
五、新建返回报文接收类
package ykf.mattress.sys.main.activemq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 接收消费者 返回数据
*/
@Component
public class RxCousResultProduce {
private Logger logger = LoggerFactory.getLogger(RxCousResultProduce.class);
/**
* 使用JmsListener配置消费者监听的队列
*
* @param text
* 接收到的消息
*/
@JmsListener(destination = "out.queue")
public void consumerMessage(String text) {
logger.info("RxCousResultProduce : 从out.queue队列收到的回复报文为:" + text);
}
}
六、写控制层测试
@Resource
MessageProduce messageProduce;
@GetMapping(value = "/activeMqSendMes")
@ResponseBody
public String activeMqSendMes() {
int num = 10;
try {
//ActiveMQQueue 队列模式
//ActiveMQTopic 订阅模式
Destination destinationQueue = new ActiveMQQueue("suimh_queue");
for (int i = 1; i <= num; i++) {
messageProduce.convertAndSend(destinationQueue, "这是queueProducer发送的第" + i + "个消息!");
}
return "activeMQ生产成功!";
} catch (Exception e) {
return "activeMQ生产失败!";
}
}
七、结果
整体 是结合了网上的几篇 帖子然后整合一起的
推荐阅读
-
springboot 集成activeMQ实现消息队列和双向队列
-
SpringBoot2.X 整合RedisTemplate 简单实现消息队列
-
Springboot项目redisTemplate实现轻量级消息队列
-
PHP消息队列实现及应用详解【队列处理订单系统和配送系统】
-
消息队列的作用以及kafka和activemq的对比
-
PHP消息队列实现及应用详解【队列处理订单系统和配送系统】
-
Springboot项目redisTemplate实现轻量级消息队列
-
PHP使用ActiveMQ实现消息队列的方法详解
-
springboot 整合 RabbitMQ实现消息队列
-
php和redis怎么实现消息队列