欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

动态设置Spring DefaultMessageListenerContainer 的messageSelector

程序员文章站 2022-05-13 10:36:48
...

Spring JMS可以帮助开发人员快速的使用MQ的发送与接收,在异步接收方面,Spring 提供了MessageListenerContainer的容器接收消息。通过研究源码发现DefaultMessageListenerContainer是支持动态改变messageSelector的。在DefaultMessageListenerContainer 中有个cacheLevel的属性默认是4,把它改动到2或1或0,数字分表代表

public static final int CACHE_NONE = 0;
public static final int CACHE_CONNECTION = 1;
public static final int CACHE_SESSION = 2;
public static final int CACHE_CONSUMER = 3;
public static final int CACHE_AUTO = 4;

在设置完cacheLevel后就可以动态设置messageSelector,Container就能用上最新的selector了。

Spring配置如下

<bean id="messageListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsConnectionFactory" />  
        <property name="destination" ref="receiverQueue" />  
        <property name="messageListener" ref="jmsReceiver" />  
        <property name="concurrentConsumers" value="10" />           
        <property name="messageSelector" value="CLIENT='DEMO'" />  
        <property name="cacheLevel" value="2"/>
    </bean> 

 修改messageSelector代码如下

DefaultMessageListenerContainer messageListenerContainer = (DefaultMessageListenerContainer) ac.getBean("messageListenerContainer");
		messageListenerContainer.setMessageSelector("CLIENT='DEMO2'");

 

源码分析:

//DefaultMessageListenerContainer类中, 每次收消息都会call的方法
private boolean invokeListener() throws JMSException {
initResourcesIfNecessary();
boolean messageReceived = DefaultMessageListenerContainer.this.receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
 }

//这里就是使用cacheLevel的地方,由于需要动态selector,所以需要每次重新生成consumer,//当cacheLevel<3的时候,this.consumer会为null
private void initResourcesIfNecessary() throws JMSException {
       if (DefaultMessageListenerContainer.this.getCacheLevel() <= 1) {
         updateRecoveryMarker();
       }
       else {
         if ((this.session == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 2)) {
           updateRecoveryMarker();
           this.session = DefaultMessageListenerContainer.this.createSession(DefaultMessageListenerContainer.this.getSharedConnection());
         }
         if ((this.consumer == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 3))
           this.consumer = DefaultMessageListenerContainer.this.createListenerConsumer(this.session);
       }
     }


//在这个方法中可以发现当传入consumer为null时,会生成一个新的consumer
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
     throws JMSException
   {
     Connection conToClose = null;
     Session sessionToClose = null;
     MessageConsumer consumerToClose = null;
     try {
       Session sessionToUse = session;
       boolean transactional = false;
       if (sessionToUse == null) {
         sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
           getConnectionFactory(), this.transactionalResourceFactory, true);
         transactional = sessionToUse != null;
       }
       if (sessionToUse == null) {
         Connection conToUse = null;
         if (sharedConnectionEnabled()) {
           conToUse = getSharedConnection();
         }
         else {
           conToUse = createConnection();
           conToClose = conToUse;
           conToUse.start();
         }
         sessionToUse = createSession(conToUse);
         sessionToClose = sessionToUse;
       }
       MessageConsumer consumerToUse = consumer;
       if (consumerToUse == null) {
         consumerToUse = createListenerConsumer(sessionToUse);
         consumerToClose = consumerToUse;
       }
       Message message = receiveMessage(consumerToUse);
       if (message != null) {
         if (this.logger.isDebugEnabled()) {
           this.logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 
             consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" + 
             sessionToUse + "]");
         }
         messageReceived(invoker, sessionToUse);
         boolean exposeResource = (!(transactional)) && (isExposeListenerSession()) && 
           (!(TransactionSynchronizationManager.hasResource(getConnectionFactory())));
         if (exposeResource)
           TransactionSynchronizationManager.bindResource(
             getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
         try
         {
           doExecuteListener(sessionToUse, message);
         }
         catch (Throwable ex) {
           if (status != null) {
             if (this.logger.isDebugEnabled()) {
               this.logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
             }
             status.setRollbackOnly();
           }
           handleListenerException(ex);
 
           if (ex instanceof JMSException)
             throw ((JMSException)ex);
         }
         finally
         {
           if (exposeResource)
             TransactionSynchronizationManager.unbindResource(getConnectionFactory());
         }
         return true;
       }
 
       if (this.logger.isTraceEnabled()) {
         this.logger.trace("Consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") + 
           "session [" + sessionToUse + "] did not receive a message");
       }
       noMessageReceived(invoker, sessionToUse);
       return false;
     }
     finally
     {
       JmsUtils.closeMessageConsumer(consumerToClose);
       JmsUtils.closeSession(sessionToClose);
       ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
     }
   }