欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ整合SpringBoot

程序员文章站 2022-06-16 20:39:11
RabbitMQ:消息发送确认 与 消息接收确认(ACK):https://www.jianshu.com/p/2c5eebfd0e95springboot2.3.1整合RabbitMQ多种工作模式 发送确认,手动应答:https://blog.csdn.net/leilei1366615/article/details/107373033/之前一直以为只有一个消息接受确认,看到上篇博文让我茅塞顿开,还分一个发送确认;其中消息接受确认可以分为手工和自动。1.RabbitmqConfig.ja...

SpringBoot+RabbitMQ发送确认和消费手动确认机制:https://www.jianshu.com/p/fae8fca98522

RabbitMQ:消息发送确认 与 消息接收确认(ACK):https://www.jianshu.com/p/2c5eebfd0e95

springboot2.3.1整合RabbitMQ多种工作模式 发送确认,手动应答:https://blog.csdn.net/leilei1366615/article/details/107373033/

之前一直以为只有一个消息接受确认,看到上篇博文让我茅塞顿开,还分一个发送确认;其中消息接受确认可以分为手工和自动。

RabbitMQ整合SpringBoot

1.RabbitmqConfig.java 配置类

package com.zdxf.ecommerce.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;


@Configuration
@ConditionalOnProperty("zdxf.quene.rabbitmq.address")
public class RabbitmqConfig {

	@Value("${zdxf.quene.rabbitmq.address}")
	private String address;

	@Value("${zdxf.quene.rabbitmq.port}")
	private Integer port;

	@Value("${zdxf.quene.rabbitmq.username}")
	private String username;

	@Value("${zdxf.quene.rabbitmq.password}")
	private String password;
	
	@Value("${zdxf.quene.rabbitmq.listener.acknowledge-mode}")
	private String acknowledge_mode;
	
	@Value("${zdxf.quene.rabbitmq.publisher-confirms}")
	private Boolean publisher_confirms;
	
	@Value("${zdxf.quene.rabbitmq.publisher-returns}")
	private Boolean publisher_returns;
	
	@Value("${zdxf.quene.rabbitmq.template.mandatory}")
	private Boolean template_mandatory;
	
	@Value("${zdxf.quene.rabbitmq.pay.ttl.queue_name}")
	private String pay_ttl_queue_name;
	
	@Value("${zdxf.quene.rabbitmq.pay.ttl.exchange_name}")
	private String pay_ttl_exchange_name;
	
	@Value("${zdxf.quene.rabbitmq.pay.consumer.queue_name}")
	private String pay_consumer_queue_name;
	
	@Value("${zdxf.quene.rabbitmq.pay.consumer.exchange_name}")
	private String pay_consumer_exchange_name;
	
	@Value("${zdxf.quene.rabbitmq.queue_name}")
	private String queue_name;
	
	@Value("${zdxf.quene.rabbitmq.exchange_name}")
	private String exchange_name;
	
	@Bean(name = "queneConnectionFactory")
	public ConnectionFactory queneConnectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setAddresses(address);
		//connectionFactory.setHost(address);
		//connectionFactory.setPort(port);
		connectionFactory.setUsername(username);
		connectionFactory.setPassword(password);
		connectionFactory.setPublisherConfirms(publisher_confirms);
		connectionFactory.setPublisherReturns(publisher_returns);
		return connectionFactory;
	}

	@Bean(name = "queneRabbitTemplate")
	public RabbitTemplate queneRabbitTemplate() {
		RabbitTemplate firstRabbitTemplate = new RabbitTemplate(queneConnectionFactory());
		//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
		firstRabbitTemplate.setMandatory(template_mandatory);
		return firstRabbitTemplate;
	}

	@Bean(name = "queneRabbitListenerContainerFactory")
	public SimpleRabbitListenerContainerFactory queneRabbitListenerContainerFactory() {
		
		AcknowledgeMode mode ;
		if("AUTO".equals(acknowledge_mode)) {
			mode = AcknowledgeMode.AUTO;
		}else if("MANUAL".equals(acknowledge_mode)){
			mode = AcknowledgeMode.MANUAL;
		}else {
			mode = AcknowledgeMode.NONE;
		}
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(queneConnectionFactory());
		factory.setAcknowledgeMode(mode);//这块是auto自动确认

		return factory;
	}

	public Integer getPort() {
		return port;
	}
	public void setPort(Integer port) {
		this.port = port;
	}
	public String getUsername() {
		return username;
	}
	public void setUsername(String username) {
		this.username = username;
	}
	public String getPassword() {
		return password;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	public String getAcknowledge_mode() {
		return acknowledge_mode;
	}
	public void setAcknowledge_mode(String acknowledge_mode) {
		this.acknowledge_mode = acknowledge_mode;
	}
	public Boolean getPublisher_confirms() {
		return publisher_confirms;
	}
	public void setPublisher_confirms(Boolean publisher_confirms) {
		this.publisher_confirms = publisher_confirms;
	}
	public Boolean getPublisher_returns() {
		return publisher_returns;
	}
	public void setPublisher_returns(Boolean publisher_returns) {
		this.publisher_returns = publisher_returns;
	}
	public Boolean getTemplate_mandatory() {
		return template_mandatory;
	}
	public void setTemplate_mandatory(Boolean template_mandatory) {
		this.template_mandatory = template_mandatory;
	}
	public String getQueue_name() {
		return queue_name;
	}
	public void setQueue_name(String queue_name) {
		this.queue_name = queue_name;
	}
	public String getExchange_name() {
		return exchange_name;
	}
	public void setExchange_name(String exchange_name) {
		this.exchange_name = exchange_name;
	}

	public String getAddress() {
		return address;
	}

	public void setAddress(String address) {
		this.address = address;
	}

	public String getPay_ttl_queue_name() {
		return pay_ttl_queue_name;
	}

	public void setPay_ttl_queue_name(String pay_ttl_queue_name) {
		this.pay_ttl_queue_name = pay_ttl_queue_name;
	}

	public String getPay_ttl_exchange_name() {
		return pay_ttl_exchange_name;
	}

	public void setPay_ttl_exchange_name(String pay_ttl_exchange_name) {
		this.pay_ttl_exchange_name = pay_ttl_exchange_name;
	}

	public String getPay_consumer_queue_name() {
		return pay_consumer_queue_name;
	}

	public void setPay_consumer_queue_name(String pay_consumer_queue_name) {
		this.pay_consumer_queue_name = pay_consumer_queue_name;
	}

	public String getPay_consumer_exchange_name() {
		return pay_consumer_exchange_name;
	}

	public void setPay_consumer_exchange_name(String pay_consumer_exchange_name) {
		this.pay_consumer_exchange_name = pay_consumer_exchange_name;
	}
	
	
	
}

 2.RabbitSenderConfig  定义类(交换机、队列、路由键自定义和绑定关系)

package com.zdxf.ecommerce.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;  
  
@Configuration
public class RabbitSenderConfig {  
	
	@Autowired
	private RabbitmqConfig rabbitmqConfig;
  
	//定义Quene
    @Bean  
    public Queue ecSoQueue() {  
        return new Queue(rabbitmqConfig.getQueue_name(),true);  
    }  
    
    //定义交互器
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(rabbitmqConfig.getExchange_name());
    }
    
    //绑定交互器和Quene
    @Bean
    public Binding bindingExchangeMessage(Queue ecSoQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(ecSoQueue).to(topicExchange).with(rabbitmqConfig.getQueue_name());
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(@Qualifier("queneConnectionFactory") ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }
    
    
    //定义支付超时消费Quene
    @Bean  
    public Queue orderQueue() {  
        return new Queue(rabbitmqConfig.getPay_consumer_queue_name(),true);  
    }  
    
    //定义支付超时消费交互器
    @Bean
    public TopicExchange topicOrderExchange() {
        return new TopicExchange(rabbitmqConfig.getPay_consumer_exchange_name());
    }
    
    //绑定支付超时延时消费交互器和Quene
    @Bean
    public Binding bindingOrderExchangeMessage(Queue orderQueue, TopicExchange topicOrderExchange) {
        return BindingBuilder.bind(orderQueue).to(topicOrderExchange).with(rabbitmqConfig.getPay_consumer_queue_name());
    }
     
	// 支付超时延时队列绑定交换机
	@Bean
	public TopicExchange orderTtlDirect() {
		return new TopicExchange(rabbitmqConfig.getPay_ttl_exchange_name());
	}
        
    //将支付延迟队列绑定到交换机
	@Bean
	Binding orderTtlBinding(TopicExchange orderTtlDirect, Queue orderTtlQueue) {
		return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect)
				.with(rabbitmqConfig.getPay_ttl_queue_name());
	}
	
	// 订单延迟队列(死信队列)
	@Bean
	public Queue orderTtlQueue() {
		return QueueBuilder.durable(rabbitmqConfig.getPay_ttl_queue_name())
				.withArgument("x-dead-letter-exchange", rabbitmqConfig.getPay_consumer_exchange_name())//到期后转发的交换机
				.withArgument("x-dead-letter-routing-key",rabbitmqConfig.getPay_consumer_queue_name())//到期后转发的路由键
				.build();
	}

}  

3.RabbitSender   消息发送,这里面配置了消息发送的确认回调,一个是只确认是否正确到达 Exchange 中(confirm()),另一个是比如路由不到队列时触发回调,也就是没有到达队列时 触发此回调(returnedMessage())

package com.zdxf.ecommerce.rabbitmq;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;


/**
 * @Description 消息发送确认
 * <p>
 * ConfirmCallback  只确认消息是否正确到达 Exchange 中
 * ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行
 * <p>
 * 1. 如果消息没有到exchange,则confirm回调,ack=false
 * 2. 如果消息到达exchange,则confirm回调,ack=true
 * 3. exchange到queue成功,则不回调return
 * 4. exchange到queue失败,则回调return
 */
@Component
public class RabbitSender implements ConfirmCallback, ReturnCallback {  
	
	@Autowired
	private RabbitmqConfig rabbitmqConfig;
      
    @Resource(name = "queneRabbitTemplate")
    private RabbitTemplate rabbitTemplate;  
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

	/**
	 * 发送消息方法调用,构建message消息
	 * @param context
	 */
	public void send(String context) {
    	logger.debug("RabbitSender send : {}" , context);
        CorrelationData correlationId = new CorrelationData(context);
        MessagePostProcessor messagePost = message-> {
        	message.getMessageProperties().setHeader("x-ha-policy", "all");
        	return message;
		};
		rabbitTemplate.convertAndSend(rabbitmqConfig.getExchange_name(),rabbitmqConfig.getQueue_name(),context,messagePost,correlationId);
	    
    }

    public void sendTTL(String context,long delayTimes) {  
    	logger.debug("RabbitSender send ttl : {}" , context);
        CorrelationData correlationId = new CorrelationData(context);
        MessagePostProcessor messagePost = message-> {
        	message.getMessageProperties().setHeader("x-ha-policy", "all");
        	message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
        	return message;
		};

		rabbitTemplate.convertAndSend(rabbitmqConfig.getPay_ttl_exchange_name(),rabbitmqConfig.getPay_ttl_queue_name(),context,messagePost,correlationId);    
    }


	/**
	 * 回调函数  return返回
	 * 通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
	 * 也就是没有到达队列时 触发此回调
	 * @param message
	 * @param replyCode
	 * @param replyText
	 * @param exchange
	 * @param routingKey
	 */
	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		logger.debug("RabbitSender returnedMessage : CorrelationId : {} , replyCode : {} ,exchange : {} , routingKey : {} , body : {} "
				,message.getMessageProperties().getCorrelationIdString(),replyCode,replyText,exchange,routingKey,new String(message.getBody()));
	}


	/**
	 * 回调函数  confirm确认
	 * 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,
	 * 确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
	 * @param correlationData
	 * @param ack
	 * @param cause
	 */
	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		if (ack) {  
			logger.debug("RabbitSender send success : {}" , correlationData);
        } else {  
        	logger.debug("RabbitSender send failed : {}" , cause);
        	
        }
	}  
}

4.RabbitReciver  消息接收处理类,接收到消息之后的逻辑处理,本项目使用的不是手工应答(ACK确认),所以可能会存在诸多不好的地方,比如消息可能会丢失。

package com.zdxf.ecommerce.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.google.gson.reflect.TypeToken;
import com.zdxf.ecommerce.model.order.PaymentOrderInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.google.gson.Gson;
import com.zdxf.ecommerce.ec.service.common.OrderJobReviverService;
import com.zdxf.ecommerce.util.RedisUtil;

import java.lang.reflect.Type;
import java.util.List;

@Component
@RabbitListener(queues = "${RABBIT_MQ_QUEUE_NAME}", containerFactory = "queneRabbitListenerContainerFactory")  
public class RabbitReciver {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	@Autowired
	private RedisUtil redisUtil;
	
	@Autowired
	private OrderJobReviverService orderJobReciverService;
	
	private final static String key="ecmmerce_redis_key_rabbit_receiver_order_sn";
	
	
	@RabbitHandler
	public void process(String content) {

		// 参数校验
		logger.info("RabbitReceiver receive : {}", content);
		if (content == null || content.isEmpty()) {
			logger.error("RabbitReceiver receive content is empty");
			return;
		}


		Type type = new TypeToken<List<PaymentOrderInfo>>() {}.getType();
		List<PaymentOrderInfo> paymentOrderInfos = JSONObject.parseObject(content, type);
		if (paymentOrderInfos == null) {
			logger.error("RabbitReceiver orderInfo is null");
			return;
		}

		// 防止重复执行验证
		for (PaymentOrderInfo paymentOrderInfo : paymentOrderInfos) {
			String order_sn = paymentOrderInfo.getOrderSn();
			boolean flag = redisUtil.lock(key + order_sn, order_sn);
			if (!flag) {
				logger.error("RabbitReceiver order_sn have handled : {}", order_sn);
				return;
			}
			redisUtil.expire(key + order_sn, 10);// 设置10秒过期
		}


		// 执行订单处理逻辑
		orderJobReciverService.submitMQReciver(paymentOrderInfos);
		
		logger.info("RabbitReceiver handle success : {}");

	}
	
	
	
}

最后,在下单的时候发送一条消息到mq中

// 写入消息队列
rabbitSender.send(JSONObject.toJSONString(paymentOrderInfos));

 

本文地址:https://blog.csdn.net/zhangleiyes123/article/details/109625929