RabbitMQ实战篇10-消息确认机制之消息的准确发布
上节讲了消息的持久化,是为了防止在RabbitMQ服务挂掉的情况下消息丢失。在实际的生产中,我们也可能会遇到这样的情况:比如由于网络原因我们的消息并没有发送到消息交换机或者路由到队列,再或者消费者接受到消息后处理消息失败。那么,如何保证消息的正确传递以及消费就是我们需要关注的问题。
这节,我们就来讲解消息确认机制一 :消息的准确发布
消息发布是基于生产者的,所以我们需要在order服务中进行一些配置(因为代码中有很全的注释,所以我在这里就不多啰嗦了哈)
配置yml
首先,在application.yml文件中进行以下配置:
spring:
rabbitmq:
host: xxx.xx.xx.xx
port: 5672
username: xxxx
password: xxxxx
publisher-confirms: true # 消息发送到交换机确认机制,是否确认回调
publisher-returns: true # 消息发送到交换机确认机制,是否返回回馈
声明回调方法
同时,我们需要两个回调方法
package com.space.rbq.order.mqcallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* 消息发送到交换机确认机制
* @author zhuzhe
* @date 2018/5/25 15:53
* @email aaa@qq.com
*/
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 当消息发送到交换机(exchange)时,该方法被调用.
* 1.如果消息没有到exchange,则 ack=false
* 2.如果消息到达exchange,则 ack=true
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
if (ack) {
System.out.println("消息发送到exchange成功");
// TODO 删除 msgId 与 Message 的关系
} else {
System.err.println("消息发送到exchange失败");
// TODO 消息发送到exchange失败 , 重新发送
}
}
}
package com.space.rbq.order.mqcallback;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author zhuzhe
* @date 2018/5/25 15:54
* @email aaa@qq.com
*/
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
/**
* 当消息从交换机到队列失败时,该方法被调用。(若成功,则不调用)
* 需要注意的是:该方法调用后,MsgSendConfirmCallBack中的confirm方法也会被调用,且ack = true
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("MsgSendReturnCallback [消息从交换机到队列失败] message:"+message);
// TODO 消息从交换机到队列失败,重新发送
}
}
关于这两个回调方法中的TODO所述不理解的,暂时先不用管,下面的标记消息会说到。
可以看到,RabbitTemplate中提供了两个接口,其中各有一个方法时做回调的。我们只需要实现这两个接口,自定义回调之后需要做什么操作就行。(注释有详细说明)
将回调方法配置到RabbitMqConfig中
之前我们在RabbitMqConfig中配置了交换机和队列的绑定关系。
现在,我们将刚才实现的两个回调方法配置到其中,这样,在我们发送消息的时候就可以根据回调判断消息是否正确的发布,以及做后续处理。详细看代码:
package com.space.rbq.order.config;
import com.space.rbq.order.mqcallback.MsgSendConfirmCallBack;
import com.space.rbq.order.mqcallback.MsgSendReturnCallback;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq配置
* @author zhuzhe
* @date 2018/5/25 13:37
* @email aaa@qq.com
*/
@Configuration
public class RabbitMqConfig {
/**
* key: queue在该direct-exchange中的key值,当消息发送给direct-exchange中指定key为设置值时,
* 消息将会转发给queue参数指定的消息队列
*/
/** 队列key1*/
public static final String ROUTING_KEY_1 = "queue_one_key1";
public static final String ROUTING_KEY_2 = "queue_one_key2";
@Autowired
private QueueConfig queueConfig;
@Autowired
private ExchangeConfig exchangeConfig;
@Autowired
private ConnectionFactory connectionFactory;
/**
* 将消息队列1和交换机1进行绑定,指定队列key1
*/
@Bean
public Binding binding_one() {
return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_1);
}
/**
* 将消息队列2和交换机1进行绑定,指定队列key2
*/
@Bean
public Binding binding_two() {
return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_2);
}
/**
* 定义rabbit template用于数据的接收和发送
* 可以设置消息确认机制和回调
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// template.setMessageConverter(); 可以自定义消息转换器 默认使用的JDK的,所以消息对象需要实现Serializable
// template.setMessageConverter(new Jackson2JsonMessageConverter());
/**若使用confirm-callback或return-callback,
* 必须要配置publisherConfirms或publisherReturns为true
* 每个rabbitTemplate只能有一个confirm-callback和return-callback
*/
template.setConfirmCallback(msgSendConfirmCallBack());
/**
* 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,
* 可针对每次请求的消息去确定’mandatory’的boolean值,
* 只能在提供’return -callback’时使用,与mandatory互斥
*/
template.setReturnCallback(msgSendReturnCallback());
template.setMandatory(true);
return template;
}
/**
* 关于 msgSendConfirmCallBack 和 msgSendReturnCallback 的回调说明:
* 1.如果消息没有到exchange,则confirm回调,ack=false
* 2.如果消息到达exchange,则confirm回调,ack=true
* 3.exchange到queue成功,则不回调return
* 4.exchange到queue失败,则回调return(需设置mandatory=true,否则不回调,消息就丢了)
*/
/**
* 消息确认机制
* Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
* 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
* 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
* 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack(){
return new MsgSendConfirmCallBack();
}
@Bean
public MsgSendReturnCallback msgSendReturnCallback(){
return new MsgSendReturnCallback();
}
}
标记消息
上面我们已经成功实现了消息发送后的回调,根据回调我们可以知道消息是否正确发布了。
但是还是不够完善,在MsgSendConfirmCallBack.java中,当confirm方法被回调时,我们只能知道消息发送成功或失败了,至于是哪条消息发送失败,我们并不知道。所以,这就需要我们完善了
我们可以看到,在confirm方法中,有correlationData这个参数,同样的,在我们发送消息的时候,也传入了这么一个参数。其实呢,这两个就是同一个。我们可以测试下:
可以看到,这两个correlationData的确是同一个。
所以,我们只需要将correlationData和所发送的消息通过某个东西进行绑定,这样,当调用confirm方法时,我们就可以通过correlationData知道是哪个消息发送成功或是失败了。
接下来我们就将他们两个进行绑定:
package com.space.rbq.order.controller;
import com.google.gson.Gson;
import com.space.rbq.order.bean.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @author zhuzhe
* @date 2018/6/7 9:48
* @email aaa@qq.com
*/
@Slf4j
@RequestMapping("/order")
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 保存order , 同时需要向store服务发送通知减库存
* @param order
* @return
*/
@PostMapping("/save")
public Order saveOrder(Order order){
log.info(order.toString());
Gson gson = new Gson();
String json = gson.toJson(order);
String msgId = UUID.randomUUID().toString();
/**
* 构建Message ,主要是使用 msgId 将 message 和 CorrelationData 关联起来。
* 这样当消息发送到交换机失败的时候,在 MsgSendConfirmCallBack 中就可以根据
* correlationData.getId()即 msgId,知道具体是哪个message发送失败,进而进行处理。
*/
/*将 msgId和 message绑定*/
Message message = MessageBuilder.withBody(json.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setCorrelationId(msgId).build();
/*将 msgId和 CorrelationData绑定*/
CorrelationData correlationData = new CorrelationData(msgId);
// TODO 将 msgId 与 Message 的关系保存起来
/**
* 将 msgId 与 Message 的关系保存起来,例如放到缓存中.
* 当 MsgSendReturnCallback回调时(消息从交换机到队列失败),进行处理 {@code MsgSendReturnCallback}.
* 当 MsgSendConfirmCallBack回调时,进行处理 {@code MsgSendConfirmCallBack}.
* 定时检查这个绑定关系列表,如果发现一些已经超时(自己设定的超时时间)未被处理,则手动处理这些消息.
*/
/**
* 发送消息
* 指定消息交换机 "first_exchange"
* 指定队列key "queue_one_key1"
*/
rabbitTemplate.convertAndSend("first_exchange", "queue_one_key1",
message, correlationData);
return order;
}
}
如此,我们就将消息进行标记了。当发送消息执行回调时,就可以根据回调结果知道消息是否发送成功,以确保消息的准确发布。
在下一节,我们将讨论如何确保消息的正确消费。
源码:https://github.com/zhuzhegithub/rabbitmq
转载请务必保留此出处(原作者):https://blog.csdn.net/zhuzhezhuzhe1
版权声明:本文为原创文章,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。
https://blog.csdn.net/zhuzhezhuzhe1