ActiveMQ——消息队列基础篇 企业应用JMSActiveMQ消息队列MQ
简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
特性
-
多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
-
完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
-
对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
-
通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
-
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
-
支持通过JDBC和journal提供高速的消息持久化
-
从设计上保证了高性能的集群,客户端-服务器,点对点
-
支持Ajax
-
支持与Axis的整合
-
可以很容易得调用内嵌JMS provider,进行测试
JMS简介
JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。
JMS服务提供者
JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。
消息管理对象
消息管理对象提供对消息进行操作的API。JMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。
消息的生产者消费者
消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。
JMS消息
息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:消息头(header)、属性(property)和消息体(body)。
消息标头
消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其
中一些字段的值,其它值则由JMS提供程序填写。
JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表
示消息、设置优先权和失效时间等等,并且为消息确定路由。
JMSDestination: 由Send方法设置。指定消息的目的地,由JMS提供程序填写
JMSDeliveryMode: 由Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。
JMSMessageID: 由Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写
JMSTimeStamp: 由Send 方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写
JMSCorrelationID: 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID
JMSReplyTo: 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段
JMSRedelivered: 由JMS提供程序设置。指出该消息先前被发送过
JMSType: 由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求
JMSExpiration: Send 方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息
JMSPriority: Send 方法设置。包含客户端在发送消息时所设置有限级值
消息属性
消息属性,用来添加删除消息头以外的附加信息。除了上面的属性,还可以自定义属
性,以便进行消息的选择 。
一般通过setXXXProperty方法来定义消息属性,XXX取值为:Boolean、Byte、
Double、Float、Int、Long、Object、Short及String。
每一属性均由字符串名字和相关的值组成 ,例如:
TextMessage msg = tsession.createTextMessage();
msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);
String customer = msg.getStringProperty(“CUSTOMER_NAME”);
其中的”CUSTOMER_NAME”和”MyCustomer”就是消息当中对应的key和value。
消息主体
消息主体包含了消息的核心数据。
JMS 定义了5中消息类型: TextMessage、MapMessage、BytesMessage、
StreamMessage和ObjectMessage
选择最合适的消息类型可以使JMS最有效 的处理消息。
-
TextMessage(文本消息)
将数据作为简单字符串存放在主体中(XML就可以作为字符串发)
TextMessage msg = session.createTextMessage();
msg.setText(text);
有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响
移植性。
只自己定义了两个方法setText(String s)、getText()
-
MapMessage(映射消息)
使用一张映射表来存放其主体内容(参照Jms API)
MapMessage msg = session.createMapMessage();
msg.setString(“CUSTOMER_NAME”,”John”);
msg.setInt(“CUSTOMER_AGE”,12);
String s = msg.getString(“CUSTOMER_NAME”);
int age = msg.getInt(“CUSTOMER_AGE”);
-
BytesMessage(字节消息)
将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有
消息格式保持一致等(参照Jms API)
byte[] data;
BytesMessage msg = session.createBytesMessage();
msg.wirte(data);
byte[] msgData = new byte[256];
int bytesRead = msg.readBytes(msgData);
-
StreamMessage(流消息)
用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这种
消息格式时,收发双方事先协商好字段的顺序,以保证写读顺序相同(参照Jms API)
StringMessage msg = session.createStreamMessage();
msg.writeString(“John”);
msg.writeInt(12);
String s = msg.readString();
Int age = msg.readInt();
(PS:个人认为有点像socket的信息收发)
-
ObjectMessage(对象消息)
用于往消息中写入可序列化的对象。
消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个
集合写入消息。
客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写
message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写
自己只单独定义了两个方法:getObject()和setObject(Serializable s)
ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessage的body无效 (从两个方法可以看出,这种消息已经强制要你实现java.io. Serializable接口)
Message只读时被set抛出MessageNotWriteableException;
set和get时,如果对象序列化失败抛出MessageFormatException
消息的通信方式(点对点通信和发布/订阅方式)
点对点方式(point-to-point)
点对点的消息发送方式主要建立在 Message Queue、Sender、Receiver上,
Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue ,而 Receiver Client从Queue中接收消息和"发送消息已接受"到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。
发布/订阅方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户
端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。
编程模式
消息产生者向JMS发送消息的步骤
-
创建连接使用的工厂类JMS ConnectionFactory
-
使用管理对象JMS ConnectionFactory建立连接Connection
-
使用连接Connection建立会话Session
-
使用会话Session和管理对象Destination创建消息生产者MessageSender
-
使用消息生产者MessageSender发送消息
消息消费者从JMS接受消息的步骤
-
创建连接使用的工厂类JMS ConnectionFactory
-
使用管理对象JMS ConnectionFactory建立连接Connection
-
使用连接Connection 建立会话Session
-
使用会话Session和管理对象Destination创建消息消费者MessageReceiver
-
使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。
ActiveMQ运行
ActiveMQ5.3版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于
监控ActiveMQ的admin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。
打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有
消息,如下截图
简单的例子
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Test { public static void main(String[] args) throws JMSException { new Receiver().beginReceiveMsg(); new Sender().send(); } } class Sender { static String subject = "TEST"; static String user = ActiveMQConnection.DEFAULT_USER; static String password = ActiveMQConnection.DEFAULT_PASSWORD; // failover://tcp://localhost:61616 static String url = ActiveMQConnection.DEFAULT_BROKER_URL; boolean transacted = true; boolean persistent = true; int ackMode = Session.AUTO_ACKNOWLEDGE; Session session; Destination destination; MessageProducer producer; public Sender() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); Connection connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(transacted, ackMode); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); } public void send() throws JMSException { MapMessage message = session.createMapMessage(); message.setString("test", "test"); message.setStringProperty("RECEIVER_CHAR", " TEST "); producer.send(message); session.commit(); } } class Receiver implements MessageListener { static String subject = "TEST"; static String user = ActiveMQConnection.DEFAULT_USER; static String password = ActiveMQConnection.DEFAULT_PASSWORD; // failover://tcp://localhost:61616 static String url = ActiveMQConnection.DEFAULT_BROKER_URL; boolean transacted = true; int ackMode = Session.AUTO_ACKNOWLEDGE; Session session; Destination destination; MessageConsumer consumer; public Receiver() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); Connection connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(transacted, ackMode); destination = session.createQueue(subject); consumer = session.createConsumer(destination, "RECEIVER_CHAR = TEST"); } public void beginReceiveMsg() throws JMSException { consumer.setMessageListener(this); } @Override public void onMessage(Message message) { if (message instanceof MapMessage) { MapMessage msg = (MapMessage) message; try { System.out.println(msg.getString("test")); } catch (JMSException e) { try { session.rollback(); } catch (JMSException e1) { e1.printStackTrace(); } e.printStackTrace(); } finally { if (session != null) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } }
其他产品
1、开源JMS供应商
-
jbossmq(jboss 4)
-
jboss messaging (jboss 5)
-
joram-4.3.21 2006-09-22
-
openjms-0.7.7-alpha-3.zip December 26, 2005
-
mantamq
-
ubermq
-
SomnifugiJMS 2005-7-27
2、商业JMS供应商
-
IBM WebSphere MQ
-
BEA WebLogic JMS
-
Oracle AQ
-
NonStop Server for Java Message Service(JMS)
-
Sun Java System Message Queue
-
Sonic jms
-
TIBCO Enterprise For JMS
-
iLinkMQ (国内)
-
TongLink/Q(北京东方通科技)