欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ActiveMQ——消息队列基础篇 企业应用JMSActiveMQ消息队列MQ 

程序员文章站 2022-07-13 12:22:48
...

简介

ActiveMQ Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

特性

  1.  多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2.  完全支持JMS1.1J2EE 1.4规范 (持久化,XA消息,事务)
  3. Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器( Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBCjournal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9.  支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

JMS简介

     JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

JMS服务提供者

     JMS服务提供者实现消息队列和通知,同时实现消息管理的APIJMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。

消息管理对象

     消息管理对象提供对消息进行操作的APIJMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactoryTopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

消息的生产者消费者

     消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。

JMS消息

     息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:消息头(header)、属性(property)和消息体(body)。

消息标头

消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其

中一些字段的值,其它值则由JMS提供程序填写。

JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表

示消息、设置优先权和失效时间等等,并且为消息确定路由。

 

JMSDestination Send方法设置。指定消息的目的地,由JMS提供程序填写

JMSDeliveryMode Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。

JMSMessageID Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写

JMSTimeStamp Send 方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写

JMSCorrelationID 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID

JMSReplyTo 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段

JMSRedelivered JMS提供程序设置。指出该消息先前被发送过

JMSType 由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求

JMSExpiration Send 方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息

JMSPriority Send 方法设置。包含客户端在发送消息时所设置有限级值

消息属性

消息属性,用来添加删除消息头以外的附加信息。除了上面的属性,还可以自定义属

性,以便进行消息的选择

一般通过setXXXProperty方法来定义消息属性,XXX取值为:BooleanByte

DoubleFloatIntLongObjectShortString

每一属性均由字符串名字和相关的值组成 ,例如:

TextMessage msg = tsession.createTextMessage();

msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);

String customer = msg.getStringProperty(“CUSTOMER_NAME”);

其中的”CUSTOMER_NAME””MyCustomer”就是消息当中对应的keyvalue

消息主体

消息主体包含了消息的核心数据。

JMS 定义了5中消息类型: TextMessageMapMessageBytesMessage

StreamMessageObjectMessage

选择最合适的消息类型可以使JMS最有效 的处理消息。

 

  • TextMessage(文本消息)

将数据作为简单字符串存放在主体中(XML就可以作为字符串发)

TextMessage msg = session.createTextMessage();

msg.setText(text);

有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响

移植性。

只自己定义了两个方法setText(String s)getText()

 

  • MapMessage(映射消息)

使用一张映射表来存放其主体内容(参照Jms API

MapMessage msg = session.createMapMessage();

msg.setString(“CUSTOMER_NAME”,”John”);

msg.setInt(“CUSTOMER_AGE”,12);

String s = msg.getString(“CUSTOMER_NAME”);

int age = msg.getInt(“CUSTOMER_AGE”);

 

  • BytesMessage(字节消息)

将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有

消息格式保持一致等(参照Jms API

byte[] data;

BytesMessage msg = session.createBytesMessage();

msg.wirte(data);

byte[] msgData = new byte[256];

int bytesRead = msg.readBytes(msgData);

 

  •  StreamMessage(流消息)

用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这种

消息格式时,收发双方事先协商好字段的顺序,以保证写读顺序相同(参照Jms API

StringMessage msg = session.createStreamMessage();

msg.writeString(“John”);

msg.writeInt(12);

String s = msg.readString();

Int age = msg.readInt();

PS:个人认为有点像socket的信息收发)

 

  •  ObjectMessage(对象消息)

用于往消息中写入可序列化的对象。

消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个

集合写入消息。

客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写

message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写

自己只单独定义了两个方法:getObject()setObject(Serializable s)

ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessagebody无效 (从两个方法可以看出,这种消息已经强制要你实现java.io. Serializable接口)

Message只读时被set抛出MessageNotWriteableException;

setget时,如果对象序列化失败抛出MessageFormatException

消息的通信方式(点对点通信和发布/订阅方式)

点对点方式(point-to-point

点对点的消息发送方式主要建立在 Message QueueSenderReceiver上,

Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue , Receiver ClientQueue中接收消息和"发送消息已接受"Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。

发布/订阅方式(publish/subscriber Messaging

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户

端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destinationreceive方法,和实现message listener 接口的onMessage 方法。

编程模式

消息产生者向JMS发送消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection建立会话Session
  4. 使用会话Session和管理对象Destination创建消息生产者MessageSender
  5. 使用消息生产者MessageSender发送消息

消息消费者从JMS接受消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection 建立会话Session
  4. 使用会话Session和管理对象Destination创建消息消费者MessageReceiver
  5. 使用消息消费者MessageReceiver接受消息,需要用setMessageListenerMessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

ActiveMQ运行

ActiveMQ5.3版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于

监控ActiveMQadmin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。

打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有

消息,如下截图


ActiveMQ——消息队列基础篇
            
    
    
        企业应用JMSActiveMQ消息队列MQ 
  

简单的例子

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Test {
	public static void main(String[] args) throws JMSException {
		new Receiver().beginReceiveMsg();
		new Sender().send();
	}
}

class Sender {

	static String subject = "TEST";
	static String user = ActiveMQConnection.DEFAULT_USER;
	static String password = ActiveMQConnection.DEFAULT_PASSWORD;
	// failover://tcp://localhost:61616
	static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	boolean transacted = true;
	boolean persistent = true;
	int ackMode = Session.AUTO_ACKNOWLEDGE;

	Session session;
	Destination destination;
	MessageProducer producer;

	public Sender() throws JMSException {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		Connection connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(transacted, ackMode);
		destination = session.createQueue(subject);
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
	}

	public void send() throws JMSException {
		MapMessage message = session.createMapMessage();
		message.setString("test", "test");
		message.setStringProperty("RECEIVER_CHAR", " TEST ");
		producer.send(message);
		session.commit();
	}
}

class Receiver implements MessageListener {

	static String subject = "TEST";
	static String user = ActiveMQConnection.DEFAULT_USER;
	static String password = ActiveMQConnection.DEFAULT_PASSWORD;
	// failover://tcp://localhost:61616
	static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	boolean transacted = true;
	int ackMode = Session.AUTO_ACKNOWLEDGE;

	Session session;
	Destination destination;
	MessageConsumer consumer;

	public Receiver() throws JMSException {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		Connection connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(transacted, ackMode);
		destination = session.createQueue(subject);
		consumer = session.createConsumer(destination, "RECEIVER_CHAR = TEST");
	}

	public void beginReceiveMsg() throws JMSException {
		consumer.setMessageListener(this);
	}

	@Override
	public void onMessage(Message message) {
		if (message instanceof MapMessage) {
			MapMessage msg = (MapMessage) message;
			try {
				System.out.println(msg.getString("test"));
			} catch (JMSException e) {
				try {
					session.rollback();
				} catch (JMSException e1) {
					e1.printStackTrace();
				}
				e.printStackTrace();
			} finally {
				if (session != null) {
					try {
						session.close();
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		}
	}
}

其他产品

1、开源JMS供应商

  • jbossmq(jboss 4)
  • jboss messaging (jboss 5)
  • joram-4.3.21 2006-09-22
  • openjms-0.7.7-alpha-3.zip December 26, 2005
  • mantamq
  • ubermq
  • SomnifugiJMS 2005-7-27

2、商业JMS供应商

  • IBM WebSphere MQ
  • BEA WebLogic JMS
  • Oracle AQ
  • NonStop Server for Java Message Service(JMS)
  • Sun Java System Message Queue
  • Sonic jms
  • TIBCO Enterprise For JMS
  • iLinkMQ (国内)
  • TongLink/Q(北京东方通科技)

 

 

  • ActiveMQ——消息队列基础篇
            
    
    
        企业应用JMSActiveMQ消息队列MQ 
  • 大小: 33.4 KB