RabbitMQ整合SpringBoot
程序员文章站
2022-06-16 20:39:11
RabbitMQ:消息发送确认 与 消息接收确认(ACK):https://www.jianshu.com/p/2c5eebfd0e95 ;springboot2.3.1整合RabbitMQ多种工作模式 发送确认,手动应答:https://blog.csdn.net/leilei1366615/article/details/107373033/ 。之前一直以为只有消息接收确认,看到上篇博文让我茅塞顿开,原来还有发送确认;其中消息接收确认又可以分为手工和自动。1.RabbitmqConfig.ja...
SpringBoot+RabbitMQ发送确认和消费手动确认机制:https://www.jianshu.com/p/fae8fca98522
RabbitMQ:消息发送确认 与 消息接收确认(ACK):https://www.jianshu.com/p/2c5eebfd0e95
springboot2.3.1整合RabbitMQ多种工作模式 发送确认,手动应答:https://blog.csdn.net/leilei1366615/article/details/107373033/
之前一直以为只有消息接收确认,看到上篇博文让我茅塞顿开,原来还有发送确认;其中消息接收确认又可以分为手工确认和自动确认。
1.RabbitmqConfig.java 配置类
package com.zdxf.ecommerce.rabbitmq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
 * Spring configuration that builds the RabbitMQ connection factory, template and
 * listener container factory from the {@code zdxf.quene.rabbitmq.*} properties,
 * and exposes those property values through getters for other beans.
 * <p>
 * The class is guarded by {@link ConditionalOnProperty} on
 * {@code zdxf.quene.rabbitmq.address}.
 * NOTE(review): beans that inject this class unconditionally will fail to start
 * when that property is absent - confirm that is intended.
 */
@Configuration
@ConditionalOnProperty("zdxf.quene.rabbitmq.address")
public class RabbitmqConfig {

    // ---- connection settings -------------------------------------------------

    @Value("${zdxf.quene.rabbitmq.address}")
    private String address;

    // Injected but not applied by queneConnectionFactory(), which only uses 'address'.
    @Value("${zdxf.quene.rabbitmq.port}")
    private Integer port;

    @Value("${zdxf.quene.rabbitmq.username}")
    private String username;

    @Value("${zdxf.quene.rabbitmq.password}")
    private String password;

    // ---- confirmation / acknowledgement settings -----------------------------

    @Value("${zdxf.quene.rabbitmq.listener.acknowledge-mode}")
    private String acknowledge_mode;

    @Value("${zdxf.quene.rabbitmq.publisher-confirms}")
    private Boolean publisher_confirms;

    @Value("${zdxf.quene.rabbitmq.publisher-returns}")
    private Boolean publisher_returns;

    @Value("${zdxf.quene.rabbitmq.template.mandatory}")
    private Boolean template_mandatory;

    // ---- queue / exchange names ----------------------------------------------

    @Value("${zdxf.quene.rabbitmq.pay.ttl.queue_name}")
    private String pay_ttl_queue_name;

    @Value("${zdxf.quene.rabbitmq.pay.ttl.exchange_name}")
    private String pay_ttl_exchange_name;

    @Value("${zdxf.quene.rabbitmq.pay.consumer.queue_name}")
    private String pay_consumer_queue_name;

    @Value("${zdxf.quene.rabbitmq.pay.consumer.exchange_name}")
    private String pay_consumer_exchange_name;

    @Value("${zdxf.quene.rabbitmq.queue_name}")
    private String queue_name;

    @Value("${zdxf.quene.rabbitmq.exchange_name}")
    private String exchange_name;

    /**
     * Creates the caching connection factory used by the template and the listeners.
     * Publisher confirms and publisher returns are switched on or off from configuration.
     *
     * @return the configured connection factory
     */
    @Bean(name = "queneConnectionFactory")
    public ConnectionFactory queneConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // The configured address is handed to setAddresses() as-is; the separate
        // 'port' property is not applied (the earlier setHost/setPort calls were disabled).
        connectionFactory.setAddresses(address);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(publisher_confirms);
        connectionFactory.setPublisherReturns(publisher_returns);
        return connectionFactory;
    }

    /**
     * Creates the {@link RabbitTemplate} used for publishing.
     *
     * @return the configured template
     */
    @Bean(name = "queneRabbitTemplate")
    public RabbitTemplate queneRabbitTemplate() {
        RabbitTemplate firstRabbitTemplate = new RabbitTemplate(queneConnectionFactory());
        // Per the Spring AMQP reference, 'mandatory' is what lets unroutable messages
        // be handed back to the return callback; it is driven by configuration here.
        firstRabbitTemplate.setMandatory(template_mandatory);
        return firstRabbitTemplate;
    }

    /**
     * Creates the listener container factory with the configured acknowledge mode.
     *
     * @return the configured listener container factory
     */
    @Bean(name = "queneRabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory queneRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(queneConnectionFactory());
        // The mode comes from configuration (AUTO, MANUAL or NONE); it is not fixed to AUTO.
        factory.setAcknowledgeMode(resolveAcknowledgeMode(acknowledge_mode));
        return factory;
    }

    /**
     * Maps the configured acknowledge-mode text to an {@link AcknowledgeMode}.
     * Matching ignores case and surrounding whitespace, so {@code "manual"} no longer
     * silently degrades to {@code NONE}. Any other value, including {@code null},
     * still resolves to {@link AcknowledgeMode#NONE} as before.
     *
     * @param configured the raw property value, may be {@code null}
     * @return the resolved acknowledge mode, never {@code null}
     */
    private static AcknowledgeMode resolveAcknowledgeMode(String configured) {
        String normalized = configured == null ? "" : configured.trim();
        if ("AUTO".equalsIgnoreCase(normalized)) {
            return AcknowledgeMode.AUTO;
        }
        if ("MANUAL".equalsIgnoreCase(normalized)) {
            return AcknowledgeMode.MANUAL;
        }
        return AcknowledgeMode.NONE;
    }

    // ---- plain accessors (names kept as-is because other classes call them) ----

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getAcknowledge_mode() {
        return acknowledge_mode;
    }

    public void setAcknowledge_mode(String acknowledge_mode) {
        this.acknowledge_mode = acknowledge_mode;
    }

    public Boolean getPublisher_confirms() {
        return publisher_confirms;
    }

    public void setPublisher_confirms(Boolean publisher_confirms) {
        this.publisher_confirms = publisher_confirms;
    }

    public Boolean getPublisher_returns() {
        return publisher_returns;
    }

    public void setPublisher_returns(Boolean publisher_returns) {
        this.publisher_returns = publisher_returns;
    }

    public Boolean getTemplate_mandatory() {
        return template_mandatory;
    }

    public void setTemplate_mandatory(Boolean template_mandatory) {
        this.template_mandatory = template_mandatory;
    }

    public String getQueue_name() {
        return queue_name;
    }

    public void setQueue_name(String queue_name) {
        this.queue_name = queue_name;
    }

    public String getExchange_name() {
        return exchange_name;
    }

    public void setExchange_name(String exchange_name) {
        this.exchange_name = exchange_name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getPay_ttl_queue_name() {
        return pay_ttl_queue_name;
    }

    public void setPay_ttl_queue_name(String pay_ttl_queue_name) {
        this.pay_ttl_queue_name = pay_ttl_queue_name;
    }

    public String getPay_ttl_exchange_name() {
        return pay_ttl_exchange_name;
    }

    public void setPay_ttl_exchange_name(String pay_ttl_exchange_name) {
        this.pay_ttl_exchange_name = pay_ttl_exchange_name;
    }

    public String getPay_consumer_queue_name() {
        return pay_consumer_queue_name;
    }

    public void setPay_consumer_queue_name(String pay_consumer_queue_name) {
        this.pay_consumer_queue_name = pay_consumer_queue_name;
    }

    public String getPay_consumer_exchange_name() {
        return pay_consumer_exchange_name;
    }

    public void setPay_consumer_exchange_name(String pay_consumer_exchange_name) {
        this.pay_consumer_exchange_name = pay_consumer_exchange_name;
    }
}
2.RabbitSenderConfig 定义类(交换机、队列、路由键自定义和绑定关系)
package com.zdxf.ecommerce.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, exchanges and bindings used by the application, taking
 * every name from {@link RabbitmqConfig}.
 * <p>
 * Three groups are declared: the main order queue, the payment-timeout consumer
 * queue, and the payment-timeout delay (TTL) queue whose dead letters are
 * forwarded to the consumer exchange.
 */
@Configuration
public class RabbitSenderConfig {

    @Autowired
    private RabbitmqConfig rabbitmqConfig;

    /** Main queue, declared durable. */
    @Bean
    public Queue ecSoQueue() {
        final String queueName = rabbitmqConfig.getQueue_name();
        return new Queue(queueName, true);
    }

    /** Main topic exchange. */
    @Bean
    public TopicExchange topicExchange() {
        final String exchangeName = rabbitmqConfig.getExchange_name();
        return new TopicExchange(exchangeName);
    }

    /** Binds the main queue to the main exchange, using the queue name as routing key. */
    @Bean
    public Binding bindingExchangeMessage(Queue ecSoQueue, TopicExchange topicExchange) {
        final String routingKey = rabbitmqConfig.getQueue_name();
        return BindingBuilder
                .bind(ecSoQueue)
                .to(topicExchange)
                .with(routingKey);
    }

    /** Admin bean created on top of the {@code queneConnectionFactory} connection factory. */
    @Bean
    public RabbitAdmin rabbitAdmin(@Qualifier("queneConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        return admin;
    }

    /** Payment-timeout consumer queue, declared durable. */
    @Bean
    public Queue orderQueue() {
        final String queueName = rabbitmqConfig.getPay_consumer_queue_name();
        return new Queue(queueName, true);
    }

    /** Payment-timeout consumer topic exchange. */
    @Bean
    public TopicExchange topicOrderExchange() {
        final String exchangeName = rabbitmqConfig.getPay_consumer_exchange_name();
        return new TopicExchange(exchangeName);
    }

    /** Binds the consumer queue to the consumer exchange, using the queue name as routing key. */
    @Bean
    public Binding bindingOrderExchangeMessage(Queue orderQueue, TopicExchange topicOrderExchange) {
        final String routingKey = rabbitmqConfig.getPay_consumer_queue_name();
        return BindingBuilder
                .bind(orderQueue)
                .to(topicOrderExchange)
                .with(routingKey);
    }

    /** Exchange for the payment-timeout delay queue (a topic exchange, despite the method name). */
    @Bean
    public TopicExchange orderTtlDirect() {
        final String exchangeName = rabbitmqConfig.getPay_ttl_exchange_name();
        return new TopicExchange(exchangeName);
    }

    /** Binds the delay queue to its exchange, using the delay queue name as routing key. */
    @Bean
    Binding orderTtlBinding(TopicExchange orderTtlDirect, Queue orderTtlQueue) {
        final String routingKey = rabbitmqConfig.getPay_ttl_queue_name();
        return BindingBuilder
                .bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(routingKey);
    }

    /**
     * Durable delay queue. Its dead-letter arguments point at the consumer exchange
     * and use the consumer queue name as routing key.
     */
    @Bean
    public Queue orderTtlQueue() {
        final String delayQueueName = rabbitmqConfig.getPay_ttl_queue_name();
        // Exchange that receives messages once they are dead-lettered
        final String deadLetterExchange = rabbitmqConfig.getPay_consumer_exchange_name();
        // Routing key applied when dead-lettered messages are forwarded
        final String deadLetterRoutingKey = rabbitmqConfig.getPay_consumer_queue_name();

        QueueBuilder builder = QueueBuilder.durable(delayQueueName);
        builder = builder.withArgument("x-dead-letter-exchange", deadLetterExchange);
        builder = builder.withArgument("x-dead-letter-routing-key", deadLetterRoutingKey);
        return builder.build();
    }
}
3.RabbitSender 消息发送类。这里配置了两个消息发送确认回调:confirm() 只确认消息是否正确到达 Exchange;returnedMessage() 则在消息无法路由到队列(也就是没有到达队列)时触发。
package com.zdxf.ecommerce.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * Publishes messages and receives the broker's publish confirmations.
 * <p>
 * ConfirmCallback only reports whether the message reached the exchange.
 * ReturnCallback fires only when the message could not be routed to a queue.
 * <p>
 * 1. Message did not reach the exchange: confirm callback with ack=false.
 * 2. Message reached the exchange: confirm callback with ack=true.
 * 3. Exchange-to-queue routing succeeded: no return callback.
 * 4. Exchange-to-queue routing failed: return callback is invoked.
 */
@Component
public class RabbitSender implements ConfirmCallback, ReturnCallback {

    private static final Logger logger = LoggerFactory.getLogger(RabbitSender.class);

    @Autowired
    private RabbitmqConfig rabbitmqConfig;

    @Resource(name = "queneRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    /** Registers this bean as both the confirm and the return callback of the template. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Publishes a message to the main exchange, using the main queue name as routing key.
     * The message text itself is used as the correlation id.
     *
     * @param context the message payload
     */
    public void send(String context) {
        logger.debug("RabbitSender send : {}" , context);
        CorrelationData correlationId = new CorrelationData(context);
        MessagePostProcessor messagePost = message -> {
            message.getMessageProperties().setHeader("x-ha-policy", "all");
            return message;
        };
        rabbitTemplate.convertAndSend(rabbitmqConfig.getExchange_name(), rabbitmqConfig.getQueue_name(),
                context, messagePost, correlationId);
    }

    /**
     * Publishes a message to the payment-timeout delay exchange with a per-message expiration.
     *
     * @param context    the message payload
     * @param delayTimes the value set as the message expiration
     *                   (presumably milliseconds - confirm against the broker configuration)
     */
    public void sendTTL(String context, long delayTimes) {
        logger.debug("RabbitSender send ttl : {}" , context);
        CorrelationData correlationId = new CorrelationData(context);
        MessagePostProcessor messagePost = message -> {
            message.getMessageProperties().setHeader("x-ha-policy", "all");
            message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
            return message;
        };
        rabbitTemplate.convertAndSend(rabbitmqConfig.getPay_ttl_exchange_name(), rabbitmqConfig.getPay_ttl_queue_name(),
                context, messagePost, correlationId);
    }

    /**
     * Return callback: invoked when a published message could not be routed to a queue.
     * Logged at WARN because it represents a delivery failure.
     *
     * @param message    the returned message
     * @param replyCode  the broker reply code
     * @param replyText  the broker reply text
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key that was used
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // One placeholder per argument: the earlier format string had no slot for replyText,
        // which shifted every following value by one and dropped the body.
        logger.warn("RabbitSender returnedMessage : CorrelationId : {} , replyCode : {} , replyText : {} , exchange : {} , routingKey : {} , body : {} "
                , message.getMessageProperties().getCorrelationIdString(), replyCode, replyText, exchange, routingKey
                , new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Confirm callback: invoked once the broker has accepted or rejected a published message,
     * i.e. it only tells whether the message reached the exchange.
     *
     * @param correlationData the correlation data supplied when sending, may be {@code null}
     * @param ack             {@code true} when the broker acknowledged the message
     * @param cause           the failure reason when {@code ack} is {@code false}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.debug("RabbitSender send success : {}" , correlationData);
        } else {
            // A nack is a failure, so it must be visible at default log levels and
            // identify the affected message.
            logger.warn("RabbitSender send failed : {} , correlationData : {}" , cause, correlationData);
        }
    }
}
4.RabbitReciver 消息接收处理类,负责接收到消息之后的逻辑处理。本项目没有使用手工应答(手动 ACK 确认),所以可能会存在一些问题,比如消息可能会丢失。
package com.zdxf.ecommerce.rabbitmq;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.reflect.TypeToken;
import com.zdxf.ecommerce.model.order.PaymentOrderInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.zdxf.ecommerce.ec.service.common.OrderJobReviverService;
import com.zdxf.ecommerce.util.RedisUtil;
import java.lang.reflect.Type;
import java.util.List;
@Component
@RabbitListener(queues = "${RABBIT_MQ_QUEUE_NAME}", containerFactory = "queneRabbitListenerContainerFactory")
public class RabbitReciver {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private RedisUtil redisUtil;
@Autowired
private OrderJobReviverService orderJobReciverService;
private final static String key="ecmmerce_redis_key_rabbit_receiver_order_sn";
@RabbitHandler
public void process(String content) {
// 参数校验
logger.info("RabbitReceiver receive : {}", content);
if (content == null || content.isEmpty()) {
logger.error("RabbitReceiver receive content is empty");
return;
}
Type type = new TypeToken<List<PaymentOrderInfo>>() {}.getType();
List<PaymentOrderInfo> paymentOrderInfos = JSONObject.parseObject(content, type);
if (paymentOrderInfos == null) {
logger.error("RabbitReceiver orderInfo is null");
return;
}
// 防止重复执行验证
for (PaymentOrderInfo paymentOrderInfo : paymentOrderInfos) {
String order_sn = paymentOrderInfo.getOrderSn();
boolean flag = redisUtil.lock(key + order_sn, order_sn);
if (!flag) {
logger.error("RabbitReceiver order_sn have handled : {}", order_sn);
return;
}
redisUtil.expire(key + order_sn, 10);// 设置10秒过期
}
// 执行订单处理逻辑
orderJobReciverService.submitMQReciver(paymentOrderInfos);
logger.info("RabbitReceiver handle success : {}");
}
}
最后,在下单的时候发送一条消息到mq中
// Serialise the payment orders to a JSON string and publish it to the message queue
rabbitSender.send(JSONObject.toJSONString(paymentOrderInfos));
本文地址:https://blog.csdn.net/zhangleiyes123/article/details/109625929
上一篇: 测试守护重构