欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ActiveMQ简介及实例

程序员文章站 2022-05-18 14:47:57
...

一、ActiveMQ简介

      ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

二、特性列表

     ⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

      ⒉ 完全支持JMS1.1和J2EE1.4规范 (持久化,XA消息,事务)

      ⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

      ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

      ⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

      ⒍ 支持通过JDBC和journal提供高速的消息持久化

      ⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点

      ⒏ 支持Ajax

      ⒐ 支持与Axis的整合

      ⒑ 可以很容易的调用内嵌JMS provider,进行测试

三、消息发送方式形式

  1、ActiveMQ点对点消息实现

  (1)、直接Receive方式

        Session.AUTO_ACKNOWLEDGE。 当客户成功的从receive方法返回的时候, 或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
        Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。
        Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。

    (2)、使用Listener监听方式

   2.ActiveMQ发布-订阅消息模式实现

四、简单实例

1.下载安装 apache-activemq-5.15.5-bin.zip

     网址:http://activemq.apache.org/activemq-5155-release.html

windows下启动:

ActiveMQ简介及实例

登录客户端:http://127.0.0.1:8161/admin ,用户名和密码都是admin

ActiveMQ简介及实例

2.创建maven工程实现简单的消息发送

添加依赖pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.activemq.demo</groupId>
    <artifactId>activemq-master</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.5</version>
        </dependency>

    </dependencies>

</project>

<一>、ActiveMQ点对点消息实现

创建消息生产者 JMSProducer.java

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 路径:com.activemq.demo
 * 类名:JMSProducer
 * 功能:消息生产者
 * 备注:
 * 创建人:typ
 * 创建时间:2018/8/14 21:35
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class JMSProducer {

    //默认的连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认的连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认的连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = null;
        // 连接
        Connection connection = null;
        // 会话,接受或者发送消息的线程
        Session session = null;
        // 消息的目的地
        Destination destination = null;
        // 消息的生产者
        MessageProducer messageProducer = null;

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL);

        try{
            // 通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建Session
            session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            // 创建消息队列
            destination = session.createQueue("FirstQueue");
            // 创建消息生产者
            messageProducer = session.createProducer(destination);
            // 发送消息
            sendMessage(session,messageProducer);
            // 提交事务
            session.commit();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(connection != null){
                try {
                    connection.close();
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 方法名:sendMessage
     * 功能:发送消息
     * 描述:
     * 创建人:typ
     * 创建时间:2018/8/14 21:47
     * 修改人:
     * 修改描述:
     * 修改时间:
     */
    public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
        for(int i=0;i<JMSProducer.SENDNUM;i++){
            TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
            System.out.println("来自 ActiveMQ 发送的消息"+i);
            messageProducer.send(message);
        }
    }
}

创建消息消费者(直接Receive方式)JMSConsumer.java

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 路径:com.activemq.demo
 * 类名:JMSConsumer 
 * 功能:消息消费者
 * 备注:直接Receive方式
 * 创建人:typ
 * 创建时间:2018/8/14 21:53
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class JMSConsumer {

    //默认的连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认的连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认的连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = null;
        // 连接
        Connection connection = null;
        // 会话,接受或者发送消息的线程
        Session session = null;
        // 消息的目的地
        Destination destination = null;
        // 消息的消费者
        MessageConsumer messageConsumer = null;

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.BROKEURL);

        try {
            // 通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建Session
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建连接的消息队列
            destination = session.createQueue("FirstQueue");
            // 创建消息消费者
            messageConsumer = session.createConsumer(destination);

            while (true){
                TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
                if(textMessage != null){
                    System.out.println("接受到的消息:"+textMessage.getText());
                }else{
                    break;
                }
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

创建消息监听Listener.java 

package com.activemq.demo;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 路径:com.activemq.demo
 * 类名:Listener
 * 功能:消息监听
 * 备注:
 * 创建人:typ
 * 创建时间:2018/8/14 22:07
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class Listener implements MessageListener{

	public void onMessage(Message message) {
		try {
			System.out.println("接收到的消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

创建消息消费者(使用Listener监听方式)JMSListenerConsumer.java

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 路径:com.activemq.demo
 * 类名:JMSListenerConsumer
 * 功能:消息消费者
 * 备注:使用Listener监听方式
 * 创建人:typ
 * 创建时间:2018/8/14 22:07
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class JMSListenerConsumer {

    //默认的连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认的连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认的连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = null;
        // 连接
        Connection connection = null;
        // 会话,接受或者发送消息的线程
        Session session = null;
        // 消息的目的地
        Destination destination = null;
        // 消息的消费者
        MessageConsumer messageConsumer = null;

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSListenerConsumer.USERNAME,JMSListenerConsumer.PASSWORD,JMSListenerConsumer.BROKEURL);

        try {
            // 通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建Session
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建连接的消息队列
            destination = session.createQueue("FirstQueue");
            // 创建消息消费者
            messageConsumer = session.createConsumer(destination);
            // 注册消息监听
            messageConsumer.setMessageListener(new Listener());
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

先运行生产者,再运行消费者,客户端效果如图:

ActiveMQ简介及实例

<二>、ActiveMQ发布-订阅消息模式实现

创建消费者JMSProducer.java

package com.activemq.demo1;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 路径:com.activemq.demo1
 * 类名:JMSProducer
 * 功能:消息生产者-消息发布者
 * 备注:发布-订阅消息模式实现
 * 创建人:typ
 * 创建时间:2018/8/14 21:35
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class JMSProducer {

    //默认的连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认的连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认的连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = null;
        // 连接
        Connection connection = null;
        // 会话,接受或者发送消息的线程
        Session session = null;
        // 消息的目的地
        Destination destination = null;
        // 消息的生产者
        MessageProducer messageProducer = null;

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

        try{
            // 通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建Session
            session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            // 创建消息队列
            destination = session.createTopic("FirstQueue");
            // 创建消息生产者
            messageProducer = session.createProducer(destination);
            // 发送消息
            sendMessage(session,messageProducer);
            // 提交事务
            session.commit();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(connection != null){
                try {
                    connection.close();
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 方法名:sendMessage
     * 功能:发送消息
     * 描述:
     * 创建人:typ
     * 创建时间:2018/8/14 21:47
     * 修改人:
     * 修改描述:
     * 修改时间:
     */
    public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
        for(int i = 0; i< JMSProducer.SENDNUM; i++){
            TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
            System.out.println("来自 ActiveMQ 发送的消息"+i);
            messageProducer.send(message);
        }
    }
}

创建监听一 Listener1.java

package com.activemq.demo1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 路径:com.activemq.demo1
 * 类名:Listener1
 * 功能:消息监听-订阅者一
 * 备注:
 * 创建人:typ
 * 创建时间:2018/8/14 22:07
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class Listener1 implements MessageListener{

	public void onMessage(Message message) {
		try {
			System.out.println("订阅者一接收到的消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

创建监听二 Listener2.java

package com.activemq.demo1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 路径:com.activemq.demo1
 * 类名:Listener2
 * 功能:消息监听-订阅者二
 * 备注:
 * 创建人:typ
 * 创建时间:2018/8/14 22:07
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class Listener2 implements MessageListener{

	public void onMessage(Message message) {
		try {
			System.out.println("订阅者二接收到的消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

创建消息订阅者一  JMSListenerConsumer1.java

package com.activemq.demo1;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 路径:com.activemq.demo
 * 类名:JMSListenerConsumer1
 * 功能:消息消费者-消息订阅者一
 * 备注:使用Listener监听方式,发布-订阅消息模式实现
 * 创建人:typ
 * 创建时间:2018/8/14 22:07
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class JMSListenerConsumer1 {

    //默认的连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认的连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认的连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = null;
        // 连接
        Connection connection = null;
        // 会话,接受或者发送消息的线程
        Session session = null;
        // 消息的目的地
        Destination destination = null;
        // 消息的消费者
        MessageConsumer messageConsumer = null;

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSListenerConsumer1.USERNAME, JMSListenerConsumer1.PASSWORD, JMSListenerConsumer1.BROKEURL);

        try {
            // 通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建Session
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建连接的消息队列
            destination = session.createTopic("FirstQueue");
            // 创建消息消费者
            messageConsumer = session.createConsumer(destination);
            // 注册消息监听
            messageConsumer.setMessageListener(new Listener1());
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

创建消息订阅者二   JMSListenerConsumer2.java

package com.activemq.demo1;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 路径:com.activemq.demo1
 * 类名:JMSListenerConsumer2
 * 功能:消息消费者-消息订阅者二
 * 备注:使用Listener监听方式,发布-订阅消息模式实现
 * 创建人:typ
 * 创建时间:2018/8/14 22:07
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class JMSListenerConsumer2 {

    //默认的连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认的连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认的连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = null;
        // 连接
        Connection connection = null;
        // 会话,接受或者发送消息的线程
        Session session = null;
        // 消息的目的地
        Destination destination = null;
        // 消息的消费者
        MessageConsumer messageConsumer = null;

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSListenerConsumer2.USERNAME, JMSListenerConsumer2.PASSWORD, JMSListenerConsumer2.BROKEURL);

        try {
            // 通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建Session
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建连接的消息队列
            destination = session.createTopic("FirstQueue");
            // 创建消息消费者
            messageConsumer = session.createConsumer(destination);
            // 注册消息监听
            messageConsumer.setMessageListener(new Listener2());
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

依次运行订阅者一、订阅者二、消息消费者,客户端效果如图:

ActiveMQ简介及实例

ActiveMQ简单实例结束!!!

源码:https://download.csdn.net/download/typ1805/10604206

相关标签: ActiveMQ