ActiveMQ简介及实例
一、ActiveMQ简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
二、特性列表
⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE1.4规范 (持久化,XA消息,事务)
⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通过JDBC和journal提供高速的消息持久化
⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点
⒏ 支持Ajax
⒐ 支持与Axis的整合
⒑ 可以很容易的调用内嵌JMS provider,进行测试
三、消息发送方式形式
1、ActiveMQ点对点消息实现
(1)、直接Receive方式
Session.AUTO_ACKNOWLEDGE。 当客户成功的从receive方法返回的时候, 或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。
(2)、使用Listener监听方式
2.ActiveMQ发布-订阅消息模式实现
四、简单实例
1.下载安装 apache-activemq-5.15.5-bin.zip
网址:http://activemq.apache.org/activemq-5155-release.html
windows下启动:
登录客户端:http://127.0.0.1:8161/admin ,用户名和密码都是admin
2.创建maven工程实现简单的消息发送
添加依赖pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.activemq.demo</groupId>
<artifactId>activemq-master</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.5</version>
</dependency>
</dependencies>
</project>
<一>、ActiveMQ点对点消息实现
创建消息生产者 JMSProducer.java
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 路径:com.activemq.demo
* 类名:JMSProducer
* 功能:消息生产者
* 备注:
* 创建人:typ
* 创建时间:2018/8/14 21:35
* 修改人:
* 修改备注:
* 修改时间:
*/
public class JMSProducer {
//默认的连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认的连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认的连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = null;
// 连接
Connection connection = null;
// 会话,接受或者发送消息的线程
Session session = null;
// 消息的目的地
Destination destination = null;
// 消息的生产者
MessageProducer messageProducer = null;
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL);
try{
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建Session
session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 创建消息队列
destination = session.createQueue("FirstQueue");
// 创建消息生产者
messageProducer = session.createProducer(destination);
// 发送消息
sendMessage(session,messageProducer);
// 提交事务
session.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
if(connection != null){
try {
connection.close();
}catch (JMSException e){
e.printStackTrace();
}
}
}
}
/**
* 方法名:sendMessage
* 功能:发送消息
* 描述:
* 创建人:typ
* 创建时间:2018/8/14 21:47
* 修改人:
* 修改描述:
* 修改时间:
*/
public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("来自 ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
}
}
创建消息消费者(直接Receive方式)JMSConsumer.java
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 路径:com.activemq.demo
* 类名:JMSConsumer
* 功能:消息消费者
* 备注:直接Receive方式
* 创建人:typ
* 创建时间:2018/8/14 21:53
* 修改人:
* 修改备注:
* 修改时间:
*/
public class JMSConsumer {
//默认的连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认的连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认的连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = null;
// 连接
Connection connection = null;
// 会话,接受或者发送消息的线程
Session session = null;
// 消息的目的地
Destination destination = null;
// 消息的消费者
MessageConsumer messageConsumer = null;
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建Session
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建连接的消息队列
destination = session.createQueue("FirstQueue");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
while (true){
TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
if(textMessage != null){
System.out.println("接受到的消息:"+textMessage.getText());
}else{
break;
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
创建消息监听Listener.java
package com.activemq.demo;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 路径:com.activemq.demo
* 类名:Listener
* 功能:消息监听
* 备注:
* 创建人:typ
* 创建时间:2018/8/14 22:07
* 修改人:
* 修改备注:
* 修改时间:
*/
public class Listener implements MessageListener{
public void onMessage(Message message) {
try {
System.out.println("接收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
创建消息消费者(使用Listener监听方式)JMSListenerConsumer.java
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 路径:com.activemq.demo
* 类名:JMSListenerConsumer
* 功能:消息消费者
* 备注:使用Listener监听方式
* 创建人:typ
* 创建时间:2018/8/14 22:07
* 修改人:
* 修改备注:
* 修改时间:
*/
public class JMSListenerConsumer {
//默认的连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认的连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认的连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = null;
// 连接
Connection connection = null;
// 会话,接受或者发送消息的线程
Session session = null;
// 消息的目的地
Destination destination = null;
// 消息的消费者
MessageConsumer messageConsumer = null;
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSListenerConsumer.USERNAME,JMSListenerConsumer.PASSWORD,JMSListenerConsumer.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建Session
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建连接的消息队列
destination = session.createQueue("FirstQueue");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
// 注册消息监听
messageConsumer.setMessageListener(new Listener());
}catch(Exception e){
e.printStackTrace();
}
}
}
先运行生产者,再运行消费者,客户端效果如图:
<二>、ActiveMQ发布-订阅消息模式实现
创建消费者JMSProducer.java
package com.activemq.demo1;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 路径:com.activemq.demo1
* 类名:JMSProducer
* 功能:消息生产者-消息发布者
* 备注:发布-订阅消息模式实现
* 创建人:typ
* 创建时间:2018/8/14 21:35
* 修改人:
* 修改备注:
* 修改时间:
*/
public class JMSProducer {
//默认的连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认的连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认的连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = null;
// 连接
Connection connection = null;
// 会话,接受或者发送消息的线程
Session session = null;
// 消息的目的地
Destination destination = null;
// 消息的生产者
MessageProducer messageProducer = null;
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
try{
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建Session
session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 创建消息队列
destination = session.createTopic("FirstQueue");
// 创建消息生产者
messageProducer = session.createProducer(destination);
// 发送消息
sendMessage(session,messageProducer);
// 提交事务
session.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
if(connection != null){
try {
connection.close();
}catch (JMSException e){
e.printStackTrace();
}
}
}
}
/**
* 方法名:sendMessage
* 功能:发送消息
* 描述:
* 创建人:typ
* 创建时间:2018/8/14 21:47
* 修改人:
* 修改描述:
* 修改时间:
*/
public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
for(int i = 0; i< JMSProducer.SENDNUM; i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("来自 ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
}
}
创建监听一 Listener1.java
package com.activemq.demo1;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 路径:com.activemq.demo1
* 类名:Listener1
* 功能:消息监听-订阅者一
* 备注:
* 创建人:typ
* 创建时间:2018/8/14 22:07
* 修改人:
* 修改备注:
* 修改时间:
*/
public class Listener1 implements MessageListener{
public void onMessage(Message message) {
try {
System.out.println("订阅者一接收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
创建监听二 Listener2.java
package com.activemq.demo1;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 路径:com.activemq.demo1
* 类名:Listener2
* 功能:消息监听-订阅者二
* 备注:
* 创建人:typ
* 创建时间:2018/8/14 22:07
* 修改人:
* 修改备注:
* 修改时间:
*/
public class Listener2 implements MessageListener{
public void onMessage(Message message) {
try {
System.out.println("订阅者二接收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
创建消息订阅者一 JMSListenerConsumer1.java
package com.activemq.demo1;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 路径:com.activemq.demo
* 类名:JMSListenerConsumer1
* 功能:消息消费者-消息订阅者一
* 备注:使用Listener监听方式,发布-订阅消息模式实现
* 创建人:typ
* 创建时间:2018/8/14 22:07
* 修改人:
* 修改备注:
* 修改时间:
*/
public class JMSListenerConsumer1 {
//默认的连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认的连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认的连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = null;
// 连接
Connection connection = null;
// 会话,接受或者发送消息的线程
Session session = null;
// 消息的目的地
Destination destination = null;
// 消息的消费者
MessageConsumer messageConsumer = null;
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSListenerConsumer1.USERNAME, JMSListenerConsumer1.PASSWORD, JMSListenerConsumer1.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建Session
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建连接的消息队列
destination = session.createTopic("FirstQueue");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
// 注册消息监听
messageConsumer.setMessageListener(new Listener1());
}catch(Exception e){
e.printStackTrace();
}
}
}
创建消息订阅者二 JMSListenerConsumer2.java
package com.activemq.demo1;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 路径:com.activemq.demo1
* 类名:JMSListenerConsumer2
* 功能:消息消费者-消息订阅者二
* 备注:使用Listener监听方式,发布-订阅消息模式实现
* 创建人:typ
* 创建时间:2018/8/14 22:07
* 修改人:
* 修改备注:
* 修改时间:
*/
public class JMSListenerConsumer2 {
//默认的连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认的连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认的连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = null;
// 连接
Connection connection = null;
// 会话,接受或者发送消息的线程
Session session = null;
// 消息的目的地
Destination destination = null;
// 消息的消费者
MessageConsumer messageConsumer = null;
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSListenerConsumer2.USERNAME, JMSListenerConsumer2.PASSWORD, JMSListenerConsumer2.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建Session
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建连接的消息队列
destination = session.createTopic("FirstQueue");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
// 注册消息监听
messageConsumer.setMessageListener(new Listener2());
}catch(Exception e){
e.printStackTrace();
}
}
}
依次运行订阅者一、订阅者二、消息消费者,客户端效果如图:
ActiveMQ简单实例结束!!!