Spring AMQP 整合RabbitMQ是如何消费消息的
程序员文章站
2022-07-12 12:31:29
...
Spring AMQP 整合RabbitMQ是如何消费消息的
先看下我们的消费者配置
<!-- Consumer bean; its handleTxMsg method is the one named as the adapter's default listener method below. -->
<bean id="eacct2219MessageConsumer" class="cpcn.payment.eaccountbatch.mq.Eacct2219Consumer" />
<!-- Listener adapter wrapping the consumer: configured with handleTxMsg as the default listener method
     and with the shared messageConverter bean. -->
<bean id="eacct2219MessageAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="eacct2219MessageConsumer" />
<property name="defaultListenerMethod" value="handleTxMsg"></property>
<property name="messageConverter" ref="messageConverter"></property>
</bean>
<!-- Listener container bound to queue E1174b_Eacct2219Message_Eacct, using the adapter above as its listener.
     No acknowledgeMode property is set here, so the container's default applies. -->
<bean id="eacct2219MessagelistenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="queueNames" value="E1174b_Eacct2219Message_Eacct"></property>
<property name="connectionFactory" ref="cxConsumerConnectionFactory"></property>
<property name="messageListener" ref="eacct2219MessageAdapter"></property>
<!-- Consumer count: 20 configured as concurrentConsumers, 100 as maxConcurrentConsumers. -->
<property name="maxConcurrentConsumers" value="100" />
<property name="concurrentConsumers" value="20" />
<!-- Advice chain applied around listener invocation.
     NOTE(review): if Eacct2219Consumer extends SmartMQConsumer, handleTxMsg catches every Exception,
     so retryInterceptor would presumably never observe a failure - confirm. -->
<property name="adviceChain">
<array>
<ref bean="retryInterceptor" />
</array>
</property>
<!-- autoStartup is false, so the container is presumably started explicitly elsewhere - TODO confirm. -->
<property name="autoStartup" value="false" />
</bean>
我们定义了一个Consumer ,这个consumer有一个处理消息的方法 handleTxMsg
/**
 * Base class for MQ consumers: binds the log context carried by the message,
 * logs the desensitized payload, and delegates to {@link #handler(Object)}.
 */
public abstract class SmartMQConsumer {

    protected static final Loggerx LOGGER = Loggerx.getLogger("dao");

    /**
     * Entry point invoked for each received message.
     *
     * <p>Any {@link Exception} raised while binding the context, logging or handling
     * is logged and not rethrown; the log context is always removed afterwards.
     *
     * @param message the converted message payload
     */
    public void handleTxMsg(Object message) {
        try {
            bindLogContext(message);
            LOGGER.info(LogType.MQ, "接收MQ消息内容:" + message.getClass().getSimpleName() + ObjectTranslateUtils.getDesensitizationJSON(message));
            handler(message);
        } catch (Exception e) {
            LOGGER.error(LogType.EX, e, e);
        } finally {
            LogContextHolder.getInstance().removeLogContext();
        }
    }

    /**
     * Installs the message's log context into the holder when the message is a
     * {@code TxRequest} that carries one; otherwise does nothing.
     */
    private void bindLogContext(Object message) {
        if (!(message instanceof TxRequest)) {
            return;
        }
        LogContextVO context = ((TxRequest) message).getLogContext();
        if (context == null) {
            return;
        }
        // The incoming transfer id becomes the current one and the next-id value is reset to 0.
        context.setCurrentTransferID(context.getTransferID());
        context.setNextTransferID(0);
        LogContextHolder.getInstance().setLogContext(context);
    }

    /**
     * Processes one message; implemented by concrete consumers.
     *
     * @param message the converted message payload
     */
    public abstract void handler(Object message);
}
容器启动之后有个 run()方法,重点关注 run方法中的,
continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();
这一行里面的receiveAndExecute方法。
/*
 * Consumer loop (excerpt: the outer try's remaining clauses and the method's
 * closing brace are not included in this snippet).
 *
 * Starts the consumer, optionally registers its channel for the transaction
 * manager, then keeps calling receiveAndExecute while the consumer is active,
 * using the result to decide whether to add or stop consumers.
 */
@Override
public void run() {
boolean aborted = false;
// Counts of consecutive idle / busy loop iterations; compared against the
// container's triggers below to consider stopping or adding a consumer.
int consecutiveIdles = 0;
int consecutiveMessages = 0;
try {
try {
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
this.consumer.start();
// Signal startup completion (presumably a CountDownLatch awaited by the starter - confirm).
this.start.countDown();
}
catch (QueuesNotAvailableException e) {
// When missing queues are fatal the exception is rethrown without counting down;
// otherwise the latch is released and the failure is handled before rethrowing.
if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
throw e;
}
else {
this.start.countDown();
handleStartupFailure(e);
throw e;
}
}
catch (FatalListenerStartupException ex) {
// Rethrown as-is, without counting down or calling handleStartupFailure.
throw ex;
}
catch (Throwable t) {
this.start.countDown();
handleStartupFailure(t);
throw t;
}
if (SimpleMessageListenerContainer.this.transactionManager != null) {
/*
* Register the consumer's channel so it will be used by the transaction manager
* if it's an instance of RabbitTransactionManager.
*/
ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory());
}
// Always better to stop receiving as soon as possible if
// transactional
boolean continuable = false;
while (isActive(this.consumer) || continuable) {
try {
// Will come back false when the queue is drained
continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();
// Dynamic scaling is only considered when maxConcurrentConsumers is configured.
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (continuable) {
// Busy iteration: reset the idle counter and, once the active trigger is exceeded,
// consider adding a consumer.
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
else {
// Idle iteration: reset the busy counter and, once the idle trigger is exceeded,
// consider stopping this consumer.
consecutiveMessages = 0;
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
// (in this excerpt the catch body is empty, so the loop simply continues)
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
/*
* These will normally be wrapped by an LEFE if thrown by the
* listener, but we will also honor it if thrown by an
* error handler.
*/
}
}
}
receiveAndExecute 方法会先判断是否配置了事务管理器(transactionManager):配置了就在事务模板中执行,否则直接执行。两种情况最终都会调用下面这个方法,重点关注:
doReceiveAndExecute(consumer);
/**
 * Receives and processes messages, inside a transaction when a transaction
 * manager is configured and directly otherwise.
 *
 * @param consumer the consumer to receive from
 * @return the result of {@code doReceiveAndExecute}
 * @throws Throwable whatever the listener execution throws; a checked failure raised
 *         inside the transaction callback is unwrapped before being rethrown
 */
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
    // No transaction manager: run the receive/execute cycle directly.
    if (transactionManager == null) {
        return doReceiveAndExecute(consumer);
    }

    final TransactionCallback<Boolean> transactionalWork = new TransactionCallback<Boolean>() {
        @Override
        public Boolean doInTransaction(TransactionStatus status) {
            // Bind the consumer's channel to the current transaction before processing.
            RabbitResourceHolder holder = new RabbitResourceHolder(consumer.getChannel(), false);
            ConnectionFactoryUtils.bindResourceToTransaction(holder, getConnectionFactory(), true);
            try {
                return doReceiveAndExecute(consumer);
            } catch (RuntimeException unchecked) {
                throw unchecked;
            } catch (Throwable checked) {
                // The callback cannot declare checked exceptions, so tunnel them out.
                throw new WrappedTransactionException(checked);
            }
        }
    };

    try {
        TransactionTemplate template = new TransactionTemplate(transactionManager, transactionAttribute);
        return template.execute(transactionalWork);
    } catch (WrappedTransactionException wrapped) {
        // Restore the original throwable for the caller.
        throw wrapped.getCause();
    }
}
查看doReceiveAndExecute()方法
/**
 * Pulls up to {@code txSize} messages from the consumer, runs the listener for
 * each, and finally commits/acks through the consumer.
 *
 * <p>The batch ends early when no message arrives within {@code receiveTimeout}
 * or when the listener throws {@code ImmediateAcknowledgeAmqpException}. Any other
 * throwable triggers {@code rollbackOnExceptionIfNecessary} and is rethrown.
 *
 * @param consumer the consumer to receive from
 * @return the value returned by {@code consumer.commitIfNecessary}
 * @throws Throwable any failure raised by the listener
 */
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
    final Channel consumerChannel = consumer.getChannel();
    int handled = 0;
    while (handled < txSize) {
        logger.trace("Waiting for message from consumer.");
        final Message received = consumer.nextMessage(receiveTimeout);
        if (received == null) {
            break;
        }
        try {
            executeListener(consumerChannel, received);
        } catch (ImmediateAcknowledgeAmqpException immediateAck) {
            // Stop the batch; the commit below still runs.
            break;
        } catch (Throwable failure) {
            consumer.rollbackOnExceptionIfNecessary(failure);
            throw failure;
        }
        handled++;
    }
    return consumer.commitIfNecessary(isChannelLocallyTransacted(consumerChannel));
}
查看 consumer.commitIfNecessary(isChannelLocallyTransacted(channel)); 就知道为啥,每次eacct2219MessageConsumer处理完之后 ,队列上的消息就自动删除了
/**
 * Acknowledges (and, when locally transacted, commits) the delivery tags
 * collected so far, then clears them.
 *
 * @param locallyTransacted whether the channel is locally transacted
 * @return {@code false} when there were no delivery tags, {@code true} otherwise
 * @throws IOException if acking or committing on the channel fails
 */
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
    if (!deliveryTags.isEmpty()) {
        try {
            // The ack is sent here only when the mode is neither auto-ack nor manual.
            final boolean mustAck = !(acknowledgeMode.isAutoAck() || acknowledgeMode.isManual());
            if (mustAck) {
                final boolean externallyTransacted = transactional && !locallyTransacted;
                if (!externallyTransacted) {
                    // Ack the last collected tag with multiple=true.
                    ArrayList<Long> snapshot = new ArrayList<Long>(deliveryTags);
                    long lastTag = snapshot.get(deliveryTags.size() - 1);
                    channel.basicAck(lastTag, true);
                } else {
                    // Not locally transacted but it is transacted so it
                    // could be synchronized with an external transaction
                    for (Long tag : deliveryTags) {
                        ConnectionFactoryUtils.registerDeliveryTag(connectionFactory, channel, tag);
                    }
                }
            }
            if (locallyTransacted) {
                // For manual acks we still need to commit
                RabbitUtils.commitIfNecessary(channel);
            }
        } finally {
            deliveryTags.clear();
        }
        return true;
    }
    return false;
}
我们没有显式配置 acknowledgeMode,默认是 AUTO(由容器负责应答),又没有使用事务,所以走的是下面这个 else 分支:
long deliveryTag = new ArrayList<Long>(deliveryTags).get(deliveryTags.size() - 1);
channel.basicAck(deliveryTag, true);
}
channel.basicAck(deliveryTag, true) 这个方法的第二个参数 multiple 为 true,表示批量应答:一次确认该 deliveryTag 及其之前所有未应答的消息;应答完之后,消息就从队列中删除了
如果你想把消费者改改, 改成ChannelAwareMessageListener 这个类型的
可以这样干
xml 配置
<!-- Declare the message converter as a SimpleMessageConverter -->
<bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>
<!-- Consumer bean; the shared messageConverter bean is injected through its messageConverter property. -->
<bean id="iFM1000MessageConsumer" class="cpcn.payment.eaccountbatch.mq.IFM1000Consumer" >
<property name="messageConverter" ref="messageConverter"></property>
</bean>
/**
* 版权所有 (c) 2018,xiaoming有限公司
*/
package cpcn.payment.tools.middleware.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import com.rabbitmq.client.Channel;
import cpcn.payment.tools.frame.ObjectTranslateUtils;
import cpcn.payment.tools.log.LogType;
/**
 * Base class for publish/subscribe consumers that receive raw AMQP deliveries
 * together with their channel.
 *
 * <p>A delivery whose routing key is rejected by {@link #isReject(String)} is
 * rejected on the channel without requeue; any other delivery is converted with
 * the configured {@link MessageConverter} and, when the payload is a
 * {@code PubSubMessage}, logged (desensitized) and passed to
 * {@link #handlerSub(Object)}.
 *
 * <pre>
 * Modify Information:
 * Author Date Description
 * ============ =========== ============================
 * DELL 2018-08-06 Create this file
 * </pre>
 *
 */
public abstract class PubSubConsumer extends SmartMQConsumer implements ChannelAwareMessageListener {

    /** Converter used to turn an AMQP message into a payload; may be {@code null}. */
    private MessageConverter messageConverter;

    /**
     * @return the configured converter, or {@code null} when none was set
     */
    protected MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * @param messageConverter the converter used by {@link #extractMessage(Message)}
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    /**
     * Converts the AMQP message into its payload.
     *
     * @param message the raw AMQP message
     * @return the converted payload, or the message itself when no converter is configured
     * @throws Exception if the conversion fails
     */
    protected Object extractMessage(Message message) throws Exception {
        MessageConverter converter = getMessageConverter();
        if (converter != null) {
            return converter.fromMessage(message);
        }
        return message;
    }

    /**
     * Decides whether a delivery must be rejected based on its routing key.
     *
     * @param routingKey the received routing key of the delivery
     * @return {@code true} to reject the delivery without requeue
     */
    public abstract boolean isReject(String routingKey);

    /**
     * Handles a payload delivered through {@code handleTxMsg}; payloads that are
     * not {@code PubSubMessage} instances are ignored.
     *
     * @see cpcn.payment.tools.middleware.mq.SmartMQConsumer#handler(java.lang.Object)
     */
    @Override
    public void handler(Object message) {
        dispatchPubSubMessage(message);
    }

    /**
     * Processes a {@code PubSubMessage} payload; implemented by concrete consumers.
     *
     * @param message the payload, already checked to be a {@code PubSubMessage}
     */
    public abstract void handlerSub(Object message);

    /**
     * Receives a raw delivery together with its channel.
     *
     * @see org.springframework.amqp.rabbit.core.ChannelAwareMessageListener#onMessage(org.springframework.amqp.core.Message, com.rabbitmq.client.Channel)
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // The raw message is deliberately not printed to stdout: only the
        // desensitized form written by dispatchPubSubMessage should reach the logs.
        if (isReject(message.getMessageProperties().getReceivedRoutingKey())) {
            // NOTE(review): this rejects explicitly on the channel; if the container is not
            // in MANUAL acknowledge mode it may also ack this delivery tag afterwards - confirm
            // the container's acknowledgeMode for consumers of this type.
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        dispatchPubSubMessage(extractMessage(message));
    }

    /**
     * Logs the desensitized payload and forwards it to {@link #handlerSub(Object)}
     * when it is a {@code PubSubMessage}; any other payload is ignored.
     */
    private void dispatchPubSubMessage(Object payload) {
        if (payload instanceof PubSubMessage) {
            LOGGER.info(LogType.MQ, "接收MQ消息内容:" + payload.getClass().getSimpleName() + ObjectTranslateUtils.getDesensitizationJSON(payload));
            handlerSub(payload);
        }
    }
}