欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JMS

程序员文章站 2024-03-24 09:07:46
...

JMS

一、介绍

1、关于JMS

JMS(Java Message Server):Java 消息服务,当两个程序使用JMS进行通信时,它们并不是直接相连的,而是通过一个共同的消息收发服务连接起来,达到解耦的效果。JMS为标准消息协议和消息服务提供一组通用接口,包括创建、发送、读取消息等。

优势:

  • 异步:客户端不用发送请求,JMS自动将消息发送给客户端。
  • 可靠:JMS保证消息只会递送一次。

2、消息传送模型

1)点对点

点对点消息传送模型:应用程序由消息队列、发送者、接收者组成,每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,除了消费掉的和过期的消息。

特性:

  • 每一个消息只有一个接收者;
  • 消息发送者和接收者并没有时间依赖性;
  • 当消息发送者发送消息时,无论接收者程序在不在运行,都能获取消息;
  • 当接收者收到消息时,会发送确认收到通知。

JMS

2)发布/订阅

发布/订阅消息传送模型:发布者发布一个消息,该消息通过topic传递给所有订阅的客户端,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic。topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。

特性:

  • 一个消息可以传递给多个订阅者;
  • 发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。
  • 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被**(运行),它也能接收到发布者的消息。

JMS

3、消息接收方式

  • 同步:消息订阅者调用receive()方法,消息未到达或或在到达指定时间之前,方法阻塞,直到消息可用。
  • 异步:消息订阅者需要注册一个消息监听者,只要消息到达,JMS服务提供者就会调用监听器的onMessage()方法递送消息。

4、消息结构

JMS客户端使用JMS消息与系统通讯,JMS消息虽然格式简单但是非常灵活,由消息头、消息属性、消息体三部分组成。

1)消息头

JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

  • JMSDestination
  • JMSDeliveryMode
  • JMSMessageID
  • JMSTimestamp
  • JMSCorrelationID
  • JMSReplyTo
  • JMSRedelivered
  • JMSType
  • JMSExpiration
  • JMSPriority

2)消息属性

我们可以给消息设置自定义属性,提供给应用程序,用于实现消息过滤功能。

3)消息体

在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接收消息,并提供了对已有消息格式的兼容。不同的消息类型如下:

  • Text message : javax.jms.TextMessage,表示一个文本对象。
  • Object message : javax.jms.ObjectMessage,表示一个JAVA对象。
  • Bytes message : javax.jms.BytesMessage,表示字节数据。
  • Stream message :javax.jms.StreamMessage,表示java原始值数据流。
  • Map message : javax.jms.MapMessage,表示键值对。

5、基本模块

JMS应用程序由如下基本模块组成:

JMS

  • 管理对象(Administered objects)是预先配置的JMS对象,由JMS系统管理员通过使用Application Server管理控制台创建,存储在应用程序服务器的JNDI名字空间或JNDI注册表,主要有两个被管理的对象:

    • 连接工厂(ConnectionFactory):客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息。让我们用一个例子来理解如何发送消息:
    • 目的地(Destination):目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。如下代码指定了一个队列和话题。
  • 连接对象:封装了与JMS提供者之间的虚拟连接,如果我们有一个ConnectionFactory对象,可以使用它来创建一个连接。

  • 会话Session:是一个单线程上下文,用于生产和消费消息,可以创建出消息生产者和消息消费者。Session对象实现了Session接口,在创建完连接后,我们可以使用它创建Session。
  • 消息生产者:由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地、队列或话题创建生产者。
  • 消息消费者:由Session创建,用于接受目的地发送的消息。消费者实现MessageConsumer接口,我们可以为目的地、队列或话题创建消费者。
  • 消息监听器:消息的默认事件处理者,他实现了MessageListener接口,该接口包含一个onMessage方法,在该方法中需要定义消息达到后的具体动作。通过调用setMessageListener方法我们给指定消费者定义了消息监听器。

二、ActiveMQ

1、介绍

ActiveMQ:是Apache提供的一个开源的消息系统,采用java实现,能够很好的支持JMS规范

ActiveMQ组成:
JMS

ActiveMQ接发送消息流程图:
JMS

ActiveMQ官网下载ActiveMQ,解压文件,点击apache-activemq-5.15.4\bin\activemq.bat文件启动ActiveMQ,然后就可以通过http://127.0.0.1:8161/admin/index.jsp进行访问,默认账号:admin,默认密码:admin。

2、Queue

Queue:实现的是基于消息队列的点对点传送模型,每个消息被定到一个特定的队列,接收者从队列中去除发送给它的消息,队列将保留所有的消息直到消息被取走或过期。

  • 每个消息只有一个接收者。
  • 消息发送者和接收者没有时间上的依赖。
  • 接收者需要确认成功处理了消息。

1)连接信息

//ActiveMq 的默认用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默认登录密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的连接地址
private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

2)连接对象

//创建一个连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
//从工厂中创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();

3)会话

ActiveMQConnection源码:

/*
* @param transacted 表示session是否是可提交事务的
* @param acknowledgeMode 表示消费者是否对接收到的消息进行确认,有三个取值
*       Session.AUTO_ACKNOWLEDGE
*       Session.CLIENT_ACKNOWLEDGE
*     Session.DUPS_OK_ACKNOWLEDGE
*/
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    checkClosedOrFailed();
    ensureConnectionInfoSent();
    if (!transacted) {
        if (acknowledgeMode == Session.SESSION_TRANSACTED) {
            throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
        } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
            throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
                    "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
        }
    }
    return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : acknowledgeMode, isDispatchAsync(), isAlwaysSessionAsync());
}

创建会话

//生产者会话
session = connection.createSession(true,Session.SESSION_TRANSACTED);

//消费者会话
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

4)目的地

//消息队列
Queue queue = session.createQueue("ActiveMQ");

//创建生产者
MessageProducer producer=session.createProducer(queue);

//创建消费者
MessageConsumer consumer=session.createConsumer(queue);

5)发送消息

//创建消息
TextMessage message = session.createTextMessage("Message");
//发送消息
producer.send(message);
//提交事务
session.commit();

6)接收消息

  • 阻塞式:等待接收消息,消息没到时方法阻塞。
TextMessage message = (TextMessage) consumer.receive();
if(message!=null) {
    message.acknowledge();
    System.out.println(message.getText());
}else {
    break;
}
  • 监听式:可以执行后面的代码,只要消息到达,JMS服务提供者就会调用监听器的onMessage()方法传送消息。
consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        if(message instanceof TextMessage) {
            TextMessage textMessage=(TextMessage) message;
            textMessage.acknowledge();
            System.out.println(textMessage.getText());
        }
    }
});

3、Queue实例

消息生产者

public class Producter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的连接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);
    //连接工厂
    ConnectionFactory connectionFactory;
    //连接对象
    Connection connection;
    //会话
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个连接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个连接
            connection  = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(msg.getText());
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消息消费者

public class Consumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer正在消费Msg:"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void getListenerMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;
            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(message instanceof TextMessage) {
                        TextMessage textMessage=(TextMessage) message;
                        try {
                            textMessage.acknowledge();
                            System.out.println(Thread.currentThread().getName()+": Consumer正在消费Msg:"+textMessage.getText()+"--->"+count.getAndIncrement());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            System.out.println("use lietener!");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

生产者测试

public class TestProducter {
    public static void main(String[] args){
        Producter producter = new Producter();
        producter.init();
        TestProducter testProducter = new TestProducter();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(testProducter.new ProductorMq(producter)).start();
        new Thread(testProducter.new ProductorMq(producter)).start();
        new Thread(testProducter.new ProductorMq(producter)).start();
        new Thread(testProducter.new ProductorMq(producter)).start();
        new Thread(testProducter.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable{
        Producter producter;
        public ProductorMq(Producter producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者测试

public class TestConsumer {
    public static void main(String[] args){
        Consumer consumer = new Consumer();
        consumer.init();
        TestConsumer testConsumer = new TestConsumer();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Consumer consumer;
        public ConsumerMq(Consumer consumer){
            this.consumer = consumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    consumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4、Topic

Topic:基于Topic的发布/订阅消息传送模型,发布者和订阅者匿名,可以动态的发布或订阅内容,消息只会发送给当前订阅者,然后就失效,新的订阅者无法接收失效的消息。

  • 每个消息可以有多个消费者。
  • 发布者和订阅者有时间依赖,只有订阅了某个主题的订阅者才能接收消息,订阅者必须保持活跃以获取消息。
  • 当一个消息有多个接收者,使用发布/订阅模式。

目的地与Queue有所不同,其他皆类似:

//消息Topic
Topic topic=session.createTopic("ActiveMQ");

//创建生产者
MessageProducer producer=session.createProducer(topic);

//创建消费者
MessageConsumer consumer=session.createConsumer(topic);

四、spring整合

1、基本使用

1)pom.xml

<spring-version>5.0.7.RELEASE</spring-version>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring-version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.4</version>
</dependency>

2)active-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
                    http://www.springframework.org/schema/jms
                    http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">
    <context:component-scan base-package="com.literature.activemp.spring" />

    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
        <property name="userName" value="admin"/>
        <property name="password" value="admin"/>
    </bean>
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory"/>  
    </bean>

    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->      
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <description>队列模式模型</description>
        <constructor-arg ref="pooledConnectionFactory" />
        <property name="receiveTimeout" value="10000" />
        <!--区别它采用的模式,false是p2p,true是订阅-->
        <property name="pubSubDomain" value="false" />
    </bean>
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <description>发布/订阅模式模型</description>
        <constructor-arg ref="pooledConnectionFactory" />
        <property name="receiveTimeout" value="10000" />
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue"
        container-type="default" connection-factory="pooledConnectionFactory"
        acknowledge="auto">
        <!-- 可写多个监听器,监听spring-mq队列 -->
        <jms:listener destination="spring-mq" ref="queueReceiver" />
        <jms:listener destination="spring-mq" ref="queueReceiver2" />
    </jms:listener-container>
    <!-- 定义Topic监听器 -->
    <!-- <jms:listener-container destination-type="topic"
        container-type="default" connection-factory="pooledConnectionFactory"
        acknowledge="auto">
        <jms:listener destination="spring-mq" ref="topicReceiver" />
    </jms:listener-container> -->
</beans>

3)QueueSender

@Service
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message) {
        jmsTemplate.send(queueName,new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

4)QueueReceiver

@Service
public class QueueReceiver implements MessageListener{
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage=(TextMessage)message;
        try {
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
@Service
public class QueueReceiver2 implements SessionAwareMessageListener{
    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        TextMessage textMessage=(TextMessage)message;
        try {
            System.out.println("QueueReceiver2:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

当队列中有消息时,两个监听器都会执行。

2、监听方式

1)MessageListener

实现MessageListener接口,该接口由原生JMS规范定义,纯粹是用来接收消息的。

@Service
public class QueueReceiver implements MessageListener{
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage=(TextMessage)message;
        try {
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

2)SessionAwareMessageListener

实现SessionAwareMessageListener接口,该接口由spring提供,方法中含有两个参数,不光可以用来接收消息,还可以根据session在接收到消息后发送一个回复的消息。

@Service
public class QueueReceiver2 implements SessionAwareMessageListener<TextMessage>{
    @Autowired
    @Qualifier("responseDestination")
    private Destination destination;

    @Override
    public void onMessage(TextMessage message, Session session) throws JMSException {
        try {
            System.out.println("QueueReceiver2:"+message.getText());
            MessageProducer producer = session.createProducer(destination);  
            Message textMessage = session.createTextMessage("QueueReceiver2 return message:"+message.getText());  
            producer.send(textMessage);  
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
<bean id="responseDestination" class="org.apache.activemq.command.ActiveMQQueue">  
    <constructor-arg>  
        <value>sessionAwareQueue</value>  
    </constructor-arg>  
</bean> 

3)MessageListenerAdapte

关于MessageListenerAdapterr,它实际上是实现了MessageListener和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的java类进行处理。

public class MessageListenerAdapter extends AbstractAdaptableMessageListener implements SubscriptionNameProvider {
    public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";//默认调用该方法
    //...
}

public abstract class AbstractAdaptableMessageListener implements MessageListener, SessionAwareMessageListener<Message> {}

MessageListenerAdapter会把接收到的消息做如下转换:

  • TextMessage转换为String对象;
  • BytesMessage转换为byte数组;
  • MapMessage转换为Map对象;
  • ObjectMessage转换为对应的Serializable对象。
@Service
public class QueueReceiver3 {
    public void handleMessage(String message){
        System.out.println("QueueReceiver3-handleMessage:"+message);
    }

    public void receiveMessage(String message){
        System.out.println("QueueReceiver3-receiveMessage:"+message);
    }
}
<bean id="queueReceiver3" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate">  
        <bean class="com.literature.activemp.spring.QueueReceiver3"/>  
    </property>
    <!--指定收到消息调用的方法,不指定默认调用handleMessage方法-->
    <property name="defaultListenerMethod" value="receiveMessage"/>
</bean>

3、原理分析

public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
    @Override
    public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
        execute(new SessionCallback<Object>() {
            @Override
            public Object doInJms(Session session) throws JMSException {
                //根据session得到目的地
                Destination destination = resolveDestinationName(session, destinationName);
                //发送消息
                doSend(session, destination, messageCreator);
                return null;
            }
        }, false);
    }

    /*
    * 创建session,并使用session执行给定action
    * action session回掉函数
    * startConnection 是否打开连接,对于receive方法需要打开
    */
    public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
        Assert.notNull(action, "Callback object must not be null");
        Connection conToClose = null;
        Session sessionToClose = null;
        try {
            Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
                getConnectionFactory(), this.transactionalResourceFactory, startConnection);
            if (sessionToUse == null) {
                conToClose = createConnection();
                sessionToClose = createSession(conToClose);
                if (startConnection) {
                    conToClose.start();
                }
                sessionToUse = sessionToClose;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Executing callback on JMS Session: " + sessionToUse);
            }
            //传入session执行给定action
            return action.doInJms(sessionToUse);
        }catch (JMSException ex) {
            throw convertJmsAccessException(ex);
        }finally {
            JmsUtils.closeSession(sessionToClose);
            ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
        }
    }

    //发送消息
    protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException {
        Assert.notNull(messageCreator, "MessageCreator must not be null");
        //创建生产者
        MessageProducer producer = createProducer(session, destination);
        try {
            //根据内部类中自定义的方法实现来创建Message
            Message message = messageCreator.createMessage(session);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending created message: " + message);
            }
            //发送消息
            doSend(producer, message);
            // 检查提交,避免在JTA事务中的提交调用
            if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                //如果不是JTA事务则提交session
                JmsUtils.commitIfNecessary(session);
            }
        }finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }
}
相关标签: JMS Active MQ