欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring AMQP 整合RabbitMQ是如何消费消息的

程序员文章站 2022-07-12 12:31:29
...

                      Spring AMQP 整合RabbitMQ是如何消费消息的

 

先看下我们的消费者配置

 <bean id="eacct2219MessageConsumer" class="cpcn.payment.eaccountbatch.mq.Eacct2219Consumer" />
    <bean id="eacct2219MessageAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="eacct2219MessageConsumer" />
        <property name="defaultListenerMethod" value="handleTxMsg"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>
    <bean id="eacct2219MessagelistenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="queueNames" value="E1174b_Eacct2219Message_Eacct"></property>
        <property name="connectionFactory" ref="cxConsumerConnectionFactory"></property>
        <property name="messageListener" ref="eacct2219MessageAdapter"></property>
        <property name="maxConcurrentConsumers" value="100" />
        <property name="concurrentConsumers" value="20" />
        <property name="adviceChain">
            <array>
                <ref bean="retryInterceptor" />
            </array>
        </property> 
        <property name="autoStartup" value="false" />
    </bean>

我们定义了一个Consumer ,这个consumer有一个处理消息的方法   handleTxMsg 

public abstract class SmartMQConsumer {

    protected static final Loggerx LOGGER = Loggerx.getLogger("dao");

    public void handleTxMsg(Object message) {
        try {
            if (message instanceof TxRequest) {
                TxRequest baseRequest = (TxRequest) message;
                LogContextVO baseContext = baseRequest.getLogContext();

                if (baseContext != null) {
                    String transferID = baseContext.getTransferID();
                    baseContext.setCurrentTransferID(transferID);
                    baseContext.setNextTransferID(0);
                    LogContextHolder.getInstance().setLogContext(baseContext);
                }
            }

            LOGGER.info(LogType.MQ, "接收MQ消息内容:" + message.getClass().getSimpleName() + ObjectTranslateUtils.getDesensitizationJSON(message));
            handler(message);
        } catch (Exception e) {
            LOGGER.error(LogType.EX, e, e);
        } finally {
            LogContextHolder.getInstance().removeLogContext();
        }
    }

    public abstract void handler(Object message);
}

 

容器启动之后有个 run()方法,重点关注 run方法中的,

continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();

这一行里面的receiveAndExecute方法。

 

@Override
		public void run() {

			boolean aborted = false;

			int consecutiveIdles = 0;

			int consecutiveMessages = 0;

			try {

				try {
					SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
					this.consumer.start();
					this.start.countDown();
				}
				catch (QueuesNotAvailableException e) {
					if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
						throw e;
					}
					else {
						this.start.countDown();
						handleStartupFailure(e);
						throw e;
					}
				}
				catch (FatalListenerStartupException ex) {
					throw ex;
				}
				catch (Throwable t) {
					this.start.countDown();
					handleStartupFailure(t);
					throw t;
				}

				if (SimpleMessageListenerContainer.this.transactionManager != null) {
					/*
					 * Register the consumer's channel so it will be used by the transaction manager
					 * if it's an instance of RabbitTransactionManager.
					 */
					ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory());
				}

				// Always better to stop receiving as soon as possible if
				// transactional
				boolean continuable = false;
				while (isActive(this.consumer) || continuable) {
					try {
						// Will come back false when the queue is drained
						continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();
						if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
							if (continuable) {
								consecutiveIdles = 0;
								if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
									considerAddingAConsumer();
									consecutiveMessages = 0;
								}
							}
							else {
								consecutiveMessages = 0;
								if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
									considerStoppingAConsumer(this.consumer);
									consecutiveIdles = 0;
								}
							}
						}
					}
					catch (ListenerExecutionFailedException ex) {
						// Continue to process, otherwise re-throw
					}
					catch (AmqpRejectAndDontRequeueException rejectEx) {
						/*
						 *  These will normally be wrapped by an LEFE if thrown by the
						 *  listener, but we will also honor it if thrown by an
						 *  error handler.
						 */
					}
				}

			}

查看这个方法中有判断事务还是不是事务的方法,关注

doReceiveAndExecute(consumer);

	private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {

		if (transactionManager != null) {
			try {
				return new TransactionTemplate(transactionManager, transactionAttribute)
						.execute(new TransactionCallback<Boolean>() {
							@Override
							public Boolean doInTransaction(TransactionStatus status) {
								ConnectionFactoryUtils.bindResourceToTransaction(
										new RabbitResourceHolder(consumer.getChannel(), false), getConnectionFactory(), true);
								try {
									return doReceiveAndExecute(consumer);
								} catch (RuntimeException e) {
									throw e;
								} catch (Throwable e) {
									throw new WrappedTransactionException(e);
								}
							}
						});
			} catch (WrappedTransactionException e) {
				throw e.getCause();
			}
		}

		return doReceiveAndExecute(consumer);

	}

 

查看doReceiveAndExecute()方法

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {

		Channel channel = consumer.getChannel();

		for (int i = 0; i < txSize; i++) {

			logger.trace("Waiting for message from consumer.");
			Message message = consumer.nextMessage(receiveTimeout);
			if (message == null) {
				break;
			}
			try {
				executeListener(channel, message);
			} catch (ImmediateAcknowledgeAmqpException e) {
				break;
			} catch (Throwable ex) {
				consumer.rollbackOnExceptionIfNecessary(ex);
				throw ex;
			}

		}

		return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));

	}

 

查看 consumer.commitIfNecessary(isChannelLocallyTransacted(channel)); 就知道为啥,每次eacct2219MessageConsumer处理完之后 ,队列上的消息就自动删除了

public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {

		if (deliveryTags.isEmpty()) {
			return false;
		}

		try {

			boolean ackRequired = !acknowledgeMode.isAutoAck() && !acknowledgeMode.isManual();

			if (ackRequired) {

				if (transactional && !locallyTransacted) {

					// Not locally transacted but it is transacted so it
					// could be synchronized with an external transaction
					for (Long deliveryTag : deliveryTags) {
						ConnectionFactoryUtils.registerDeliveryTag(connectionFactory, channel, deliveryTag);
					}

				} else {
					long deliveryTag = new ArrayList<Long>(deliveryTags).get(deliveryTags.size() - 1);
					channel.basicAck(deliveryTag, true);
				}
			}

			if (locallyTransacted) {
				// For manual acks we still need to commit
				RabbitUtils.commitIfNecessary(channel);
			}

		} finally {
			deliveryTags.clear();
		}

		return true;

	}

 

咱配置的默认是Auto 的,又不是事务,走的是这个方法

	long deliveryTag = new ArrayList<Long>(deliveryTags).get(deliveryTags.size() - 1);
					channel.basicAck(deliveryTag, true);
				}

channel.basicAck(deliverTag,true) 这个方法,批量应答,应答完之后,消息就删除了

 

如果你想把消费者改改, 改成ChannelAwareMessageListener 这个类型的

可以这样干

xml 配置

<!-- 声明消息转换器为SimpleMessageConverter -->
    <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>

<bean id="iFM1000MessageConsumer" class="cpcn.payment.eaccountbatch.mq.IFM1000Consumer" >
	     <property name="messageConverter" ref="messageConverter"></property>
</bean>
/**
 * 版权所有 (c) 2018,xiaoming有限公司  
 */
package cpcn.payment.tools.middleware.mq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;

import com.rabbitmq.client.Channel;

import cpcn.payment.tools.frame.ObjectTranslateUtils;
import cpcn.payment.tools.log.LogType;

/**
 * 类说明
 * 
 * <pre>
 * Modify Information:
 * Author        Date          Description
 * ============ =========== ============================
 * DELL          2018年8月6日    Create this file
 * </pre>
 * 
 */

public abstract class PubSubConsumer extends SmartMQConsumer implements ChannelAwareMessageListener {
    private MessageConverter messageConverter;
    
    protected MessageConverter getMessageConverter() {
        return this.messageConverter;
    }
    
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
    
    protected Object extractMessage(Message message) throws Exception {
        MessageConverter converter = getMessageConverter();
        if (converter != null) {
            return converter.fromMessage(message);
        }
        return message;
    }
    
    public abstract boolean isReject(String routingKey);

    /**
     * @see cpcn.payment.tools.middleware.mq.SmartMQConsumer#handler(java.lang.Object)
     */
    @Override
    public void handler(Object message) {
        System.out.println(message instanceof PubSubConsumer);
        if(message instanceof PubSubMessage){
            LOGGER.info(LogType.MQ, "接收MQ消息内容:" + message.getClass().getSimpleName() + ObjectTranslateUtils.getDesensitizationJSON(message));
            handlerSub(message);
        }

    }

    /**
     * @param message
     */
    public abstract void handlerSub(Object message) ;
    
    /**
     * @see org.springframework.amqp.rabbit.core.ChannelAwareMessageListener#onMessage(org.springframework.amqp.core.Message, com.rabbitmq.client.Channel)
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(message);
        if(isReject(message.getMessageProperties().getReceivedRoutingKey())){
          channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }else{
            Object msg = extractMessage(message);
            if(msg instanceof PubSubMessage){
                LOGGER.info(LogType.MQ, "接收MQ消息内容:" + msg.getClass().getSimpleName() + ObjectTranslateUtils.getDesensitizationJSON(msg));
                handlerSub(msg);
            }
         
        }
    }

}