JMS 2.0的新变化
自JMS 1.1于2002年发布以来,JMS规范今年进行了第一次更新——发布了JMS 2.0。
在JMS 2.0里,主要进行了易用性方面的提升、简化了开发(这终于追赶上EJB、JPA等Java EE里其他子规范的脚步了)。另一方面,消息处理本身也增加了一些新特性,比如多个消费者共享同一个主题订阅、延迟发送、异步发送消息、JMS提供者必须设置JMSXDeliveryCount消息属性等。
接下来看看具体的变化和内容。
简化的API
JMS 2.0里最大的变化是引入了一组新的API,用来发送、接收消息,减少了开发人员的编码量。对运行在Java EE应用服务器里的应用来说,新的API也支持资源注入,这样的话,JMS对象的创建和管理就可以由应用服务器负责,应用也可以进一步简化。
JMS 2.0是Java EE 7的一部分,可以用在Web应用或EJB应用里,当然,它也可以直接在Java SE环境里使用。
新的API叫做“简化的API”,物如其名,它比JMS 1.1的API更简单、更易用。简化的API包含三个新的接口:
- JMSContext:综合了1.1里的Connection和Session;
- JMSProducer:1.1里MessageProducer的替换者。支持消息发送选项、头信息,还可以用方法链配置属性;
- JMSConsumer:替换1.1里的MessageConsumer,用法和MessageConsumer类似。
简化的API包括1.1 API的所有特性,而且新增了一些功能。但2.0向后兼容,并没有废弃1.1的API,开发人员仍然可以继续使用1.1的API。
下面我们来比较一下新旧API的代码示例。
发送消息
JMS 1.1
public void sendMessage11(ConnectionFactory connectionFactory, Queue queue, String msg) { Connection connection = null; try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer messageProducer = session.createProducer(queue); TextMessage textMessage = session.createTextMessage(msg); messageProducer.send(textMessage); } catch (JMSException ex) { // handle exception } finally { if (connection != null) { try { connection.close(); } catch (JMSException ex) { } } } }
JMS 2.0
public void sendMessage20(ConnectionFactory connectionFactory, Queue queue, String msg) { try (JMSContext context = connectionFactory.createContext();){ context.createProducer().send(queue, msg); } catch (JMSRuntimeException ex) { // handle exception } }
显而易见,使用2.0 API的代码更为简洁。具体来说:
1、1.1要分别创建Connection和Session对象,而2.0只创建一个JMSContext对象。
2、1.1要在发送完消息之后释放连接,稳妥的做法就是在finally块里调用Connection的close方法(MessageProducer和Session不用分别close,连接关闭后会被删除)。但2.0里的JMSContext实现了Java SE 7里的java.lang.AutoCloseable接口,我们就可以使用Java SE 7里的try-with-resources块(也是Java SE 7里的一个新特性),这个块执行完之后会自动调用JMSContext的close方法释放资源,而不用我们在代码里显式关闭。
实际上,2.0里所有有close方法的接口(包括Connection、Session、MessageProducer、MessageConsumer)都实现了java.lang.AutoCloseable接口,它们都可以用在try-with-resources块里。不过这只能用在Java SE 7的环境里。
3、1.1里创建Session对象时需要指定会话是否使用本地事务,以及消息确认方式。2.0提供了默认配置,也提供了API去设置其他的会话模式。
4、上面的例子是发送一个文本消息。1.1里需要创建消息对象(TextMessage),将消息体设置为指定的字符串。2.0里直接把字符串传给send方法就可以了,JMS提供者会自动完成1.1里需要开发人员做的事情。
5、1.1的API抛出的都是受检查的异常JMSException。2.0里新的API则会抛出运行时异常JMSRuntimeException,所以我们的代码里可以不再显式地进行捕获。
同步接收消息
JMS 1.1
public String receiveMessage11(ConnectionFactory connectionFactory, Queue queue) { String msg = ""; Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(queue); TextMessage textMessage = (TextMessage) messageConsumer.receive(); msg = textMessage.getText(); } catch (JMSException ex) { // handle exception } finally { if (connection != null) { try { connection.close(); } catch (JMSException ex) { } } } return msg; }
JMS 2.0
public String receiveMessage20(ConnectionFactory connectionFactory, Queue queue){ String msg = null; try (JMSContext context = connectionFactory.createContext();){ JMSConsumer consumer = session.createConsumer(queue); msg = consumer.receiveBody(String.class); } catch (JMSRuntimeException ex) { // handle exception } return msg; }
接收消息的代码互相对比,简化的部分和发送消息是类似的。需要特别说明的是:
1、1.1需要调用connection.start()把消息分发到消费者。2.0里连接会自动启动。
2、1.1接收的是Message对象,需要显式进行类型转换,再进一步获取消息体,而且在1.1里,针对不同的消息类型,获取消息体的方法也是各自特定的。2.0里统一调用receiveBody方法就可以直接返回消息体,或者获取消息后统一用getBody方法获取消息体。
异步接收消息
前面接收消息的例子是同步的,接收方法在接收到消息之前会一直阻塞,除非超时。
如果想异步接收消息,需要给MessageConsumer或JMSConsumer对象设置MessageListener接口的实现类,让MessageListener去处理消息(onMessage方法)。
不同之处仍然是1.1需要显式调用connection.start()。
JMS 1.1
MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(msgListener); connection.start();
JMS 2.0
JMSConsumer consumer = context.createConsumer(queue); consumer.setMessageListener(messageListener);
注意,上面的示例代码是在Java客户端代码里的写法。如果要在Web应用或EJB应用里异步接收消息,需要使用消息驱动Bean。
将JMSContext注入Java EE应用
编写Web应用或EJB应用时,使用JMS 2.0简化的API会更加简单。因为Java EE 7的应用服务器会负责JMSContext的创建和关闭,开发人员只需要把它注入应用就可以了。
// @Inject注解告诉容器创建JMSContext @Inject // @JMSConnectionFactory注解告诉容器应用使用的连接工厂的JNDI名 @JMSConnectionFactory("jms/connectionFactory") private JMSContext context; @Resource(lookup="jms/queue") private Queue queue; public void sendMessage(String msg) { context.send(queue, msg); }
更简单的资源配置
对JMS应用来说,需要使用连接工厂和目的地(队列或主题)。连接工厂需要知道JMS提供者所在的主机名和监听端口,应用通过连接工厂创建到JMS提供者的连接;队列或主题则要知道作为消息端点的物理队列或物理主题。由于每个JMS提供者创建、配置它们的方式不尽相同,所以JMS推荐单独创建、配置,然后绑定到JNDI树上,应用使用时再通过JNDI查找、直接获取连接工厂或目的地对象。Java EE规范推荐把代码和配置分开,是为了保证应用的可移植性,代码不需要知道具体的物理细节(像主机名、端口、物理名称等)。
但对于大规模集群上部署的应用来说,代码和配置分离反而可能会增加额外的工作量,尤其是在集群不能统一配置的情形下。而且在这样的生产环境下,应用重部署和迁移的情况相对较少,那能否在应用部署的过程中就一起把应用要使用的资源对象给创建出来、省去管理员单独配置的环节呢?
JMS 2.0简化了JMS资源的配置,就适用于前面说的情形。应用开发人员可以在代码里使用2.0新增的JMS资源定义注解,也可以在部署描述符里定义,也可以两者结合。
JMS资源定义注解
JMS资源定义注解主要有javax.jms.JMSConnectionFactoryDefinition和javax.jms.JMSDestinationDefinition,可以直接定义在EJB或Servlet里。
@JMSConnectionFactoryDefinition( name="java:global/jms/connectionFactory", maxPoolSize = 30, minPoolSize= 20, properties = { "addressList=mq://localhost:7676", "reconnectEnabled=true" } ) @JMSDestinationDefinition( name = "java:global/jms/queue", interfaceName = "javax.jms.Queue", destinationName = "queue" ) public class TestServlet extends HttpServlet { ... }
如果需要定义多个连接工厂或目的地,JMSConnectionFactoryDefinition和JMSDestinationDefinition可以分别包含在MSConnectionFactoryDefinitions和JMSDestinationDefinitions注解里。类似于EJBs和EJB。
@JMSConnectionFactoryDefinitions({ @JMSConnectionFactoryDefinition( name="java:global/jms/connectionFactory1", maxPoolSize = 30, minPoolSize= 20, properties = { "addressList=mq://localhost:7676", "reconnectEnabled=true" } ), @JMSConnectionFactoryDefinition( name="java:global/jms/connectionFactory2", maxPoolSize = 30, minPoolSize= 20, properties = { "addressList=mq://localhost:7677", "reconnectEnabled=true" } ) }) @JMSDestinationDefinitions({ @JMSDestinationDefinition( name="java:global/jms/queue1", interfaceName = "javax.jms.Queue", destinationName = "queue1" ), @JMSDestinationDefinition( name="java:global/jms/queue2", interfaceName = "javax.jms.Queue", destinationName = "queue2" ) }) public class TestServlet extends HttpServlet { ... }
需要注意的是,使用JMS资源定义注解定义的连接工厂和目的地必须放在java:comp、java:module、java:app或者java:global名字空间里,生命周期和应用保持一致,即部署应用时生成,解部署应用时就会删除。
部署描述符
如果不想让资源的配置侵入代码,或者开发的时候还不确定JMS提供者的真实信息(比如主机地址和端口),那可以选择在应用的部署描述里定义连接工厂和目的地,比如web.xml或ejb-jar.xml。
<jms-connection-factory> <name>java:global/jms/connectionFactory</name> <max-pool-size>30</max-pool-size> <min-pool-size>20</min-pool-size> <property> <name>addressList</name> <value>mq://localhost:7676</value> </property> <property> <name>reconnectEnabled</name> <value>true</value> </property> </jms-connection-factory> <jms-destination> <name>java:global/jms/queue</name> <interfaceName>javax.jms.Queue</interfaceName> <destinationName>queue</destinationName> </jms-destination>
延迟发送
2.0里,消息生产者可以指定一个延迟发送时间,JMS提供者等过了这么长时间之后再发送消息。
这个功能具体体现在MessageProducer和JMSProducer的API里。
MessageProducer
MessageProducer messageProducer = session.createProducer(queue); messageProducer.setDeliveryDelay(20000); // 20000毫秒,发送消息之前设置 TextMessage textMessage = session.createTextMessage("Hello World!"); messageProducer.send(textMessage);
JMSProducer
try (JMSContext context = connectionFactory.createContext();){ context.createProducer().setDeliveryDelay(20000).send(queue, "Hello World!"); // 20000毫秒,也是在发送消息之前设置 }
异步发送消息
2.0里新的send方法允许应用异步发送消息。
传统的消息发送方式是阻塞的,消息安全发送到JMS提供者之后,发送方法才会返回。如果消息是持久化的,那中间的阻塞时间还要算上持久化花费的时间(写文件或写数据库,或者其他的存储介质)。
异步发送在发送完消息之后并不会等待JMS提供者的应答,而是继续做其它事情。JMS提供者收到消息、做完持久化之后,会回调应用指定的CompletionListener的onCompletion方法,通知客户端应用消息发送的结果。这样的话,客户端应用的执行效率就可以大大提升。
新的JMSProducer和以前就有的MessageProducer都支持异步发送。我们来看看具体的例子。
CompletionListener
class SampleCompletionListener implements CompletionListener { CountDownLatch latch; Exception exception; public SampleCompletionListener(CountDownLatch latch) { this.latch=latch; } @Override public void onCompletion(Message message) { latch.countDown(); } @Override public void onException(Message message, Exception exception) { latch.countDown(); this.exception = exception; } public Exception getException(){ return exception; } }
MessageProducer
private void asyncSend11(ConnectionFactory connectionFactory, Queue queue) throws Exception { try (Connection connection = connectionFactory.createConnection();){ Session session = connection.createSession(); MessageProducer messageProducer = session.createProducer(queue); TextMessage textMessage = session.createTextMessage("Hello World!"); CountDownLatch latch = new CountDownLatch(1); SampleCompletionListener completionListener = new SampleCompletionListener(latch); messageProducer.send(textMessage, completionListener); System.out.println("Waiting for a reply..."); // 做其他事情,不让应用闲等 latch.await(); Exception exception = completionListener.getException(); if (exception == null){ System.out.println("Send message successfully."); } else { System.out.println("Failed to send message. " + exception.getMessage()); } } }
JMSProducer
private void asyncSend20(ConnectionFactory connectionFactory, Queue queue) throws Exception { try (JMSContext context = connectionFactory.createContext();){ CountDownLatch latch = new CountDownLatch(1); SampleCompletionListener completionListener = new SampleCompletionListener(latch); context.createProducer().setAsync(completionListener).send(queue, "Hello World!"); System.out.println("Waiting for a reply..."); // 做其他事情,不让应用闲等 latch.await(); Exception exception = completionListener.getException(); if (exception == null){ System.out.println("Send message successfully."); } else { System.out.println("Failed to send message. " + exception.getMessage()); } } }
不过这个特性不适用于Java EE的Web容器和EJB容器,只可用于Java SE应用、或者是Java EE Application Client容器。
多个消费者共享同一个主题订阅
在JMS 1.1里,主题订阅和消费者是一一对应的,也就是在同一时间,一个主题订阅只能有一个消费者。
private void createConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException { Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(topic); connection.start(); Message message = messageConsumer.receive(10000); while (message != null) { System.out.println("Message received: " + ((TextMessage) message).getText()); message = messageConsumer.receive(10000); } connection.close(); }
如果我们想调用createConsumer创建多个消费者,那每个消费者各有一个订阅,每个消费者都会从主题接收到一份消息拷贝。举例来说,有两个消费者A和B,主题topic上有三条消息x、y、z,那A会拿到x、y、z的一份拷贝,B也会拿到x、y、z的一份拷贝。要是想A和B能合起来消费x、y、z,即A消费x、z,B消费y,那只能是A和B共享同一个订阅了,可惜1.1并不支持共享的订阅。
2.0里可以指定订阅是“可共享的”,不论订阅是持久的还是非持久的,而且Session和JMSContext都支持这个特性。这样的话,消息的处理就可以由多线程、或者多个不同的连接、甚至多个Java进程同时进行,显而易见,这很利于提升应用的可伸缩性。
private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException { Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createSharedConsumer(topic, "ourSubscription"); // 需要指定共享订阅的名称,以便多个消费者能确定彼此共享的订阅 connection.start(); Message message = messageConsumer.receive(10000); while (message != null) { System.out.println("Message received: " + ((TextMessage) message).getText()); message = messageConsumer.receive(10000); } connection.close(); }
前面的例子创建的是非持久的订阅,同样的,持久订阅在2.0里也是可以共享的,而且Session和JMSContext也都支持。
MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic, "ourDurableSubscription");
当然,2.0依旧支持不共享的订阅。
JMS提供者必须设置消息属性JMSXDeliveryCount
JMS消息由三部分组成:
- 消息头:消息头是所有消息都支持的,客户端和JMS提供者都能使用,用于消息的区分和路由。
- 属性:消息头的补充。其中一部分是JMS定义的标准属性(JMSX开头),其他的可以是应用特定的,也可以是JMS提供者特定的。
- 消息体
标准属性里,有一个JMSXDeliveryCount(int类型),它由JMS提供者设置,表示JMS提供者给消费者发送消息的尝试次数。第一次发送消息的时候,该属性的值设置为1,然后每尝试发送一次,值就加一。如果该属性的值为N,那就说明前N-1次都发送失败了。消费者如果不做特殊的处理,那会一直重复发送。
消费者可以根据这个值确认消息是否被重复发送了,进而可以进行特殊的处理,比如把这条消息放入死信队列等。
JMSXDeliveryCount在1.1里就有,只是是可选的,2.0里则变成了必需的。下面是一个借助JMSXDeliveryCount属性的值进行不同处理的例子:
class SampleMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { int deliveryCount = message.getIntProperty("JMSXDeliveryCount"); if (deliveryCount < 10){ // 故意抛出运行时一场,模拟消息处理失败的情形,使得JMS提供者能重发消息 throw new RuntimeException("Exception thrown to simulate a bad message"); } else { // 消息已经被发送了10次,放弃重发,进行其他的处理 } } catch (JMSException e) { throw new RuntimeException(e); } } }
JMS 2.0主要的新特性就介绍完了,其实在细节上还有不少变化,大家可以参考JMS 2.0规范和API文档理解更多内容。