RabbitMQ异常处理方案设计
程序员文章站
2022-07-15 13:06:56
...
导语:根据业务给MQ异常处理设置优先级:如低、中等、紧急,当MQ发生异常时通过告警邮件通知和记录到数据库中,对于低和中等的异常采用定时任务轮询去重新投递,紧急的异常例如订单支付等则需要开发者尽快去手动处理最佳。对于MQ中发生的异常有以下三种: confirm异常、returnCallBack异常、队列监听消费异常,在此次实际项目中有监控模块(死信队列的监控,根据业务类型发送告警邮件;是否将异常写入数据库等待定时任务重新投递)和定时任务(创建重新投递任务,告知监控模块投递触发时机)模块协同处理,以下只是目前所想的方案,其实有着更多的可靠的方案吧,您怎么看?
confirm异常
- confirm确认消息是否投递到MQ服务器。处理逻辑:当未投递成功时,发送到告警队列,发送报警邮件。
returnCallBack异常
- 消息投递到队列发生异常时回调(当消息在交换机根据routeKey找不到投递的队列时发生异常),处理逻辑:发送报警邮件,异常写入数据库; sendErrorMessage()方法中,消息内容message优先级设置为低,根据业务需求设置
队列监听消费异常
- 队列初始化配置死信队列
- 消费的确认机制为手动确认,队列监听消费处理业务代码块做try/catch处理,发生异常nack,拒绝重新入队
- 消费监听发生异常投递重新投递死信队列
- 业务队列发生异常设置处理优先级为中等,订单支付设置为紧急,根据业务需求设置,处理逻辑:发送报警邮件,异常写入数据库。
- MQ发送消息时需要给消息设置请求头属性,包含当前投递exchange、routeKey和异常处理优先级
定时任务
- 通过定时任务轮询表status 1-异常 和优先级为(0-低 1-中等)的数据,重新投递到原始队列 (在原始监听队列 消费成功逻辑中补充,将消息标志id的status设置为2-已解决);
- 重新投递需要知道消息绑定的原始exchange和routeKey, rabbitListenerErrorHandler回调时取不到相关信息,因此在发送消息时在请求头携带;根据业务在MQ发送消息时指定消息的优先级(0-低 1-中等 2-紧急);
- status1-异常和优先级为紧急(订单支付等业务)的异常数据手动处理;
- 重新投递给消息加上消息头属性设置是否重新投递标志,用于MQ监听消费后将数据库该消息ID的修复状态设置为2-已解决
部分实现
- 通发送消息时header设置当前exchenge等属性:
CorrelationData correlationData = new CorrelationData(messageId); rabbitTemplate.convertAndSend(exchange, routeKey, MessageHelper.objToMsg(message.getContent()), message1 -> { message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.HEAD_EXCHANGE, exchange); message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.HEAD_ROUTE, routeKey); message1.getMessageProperties().setPriority(priority.intValue()); message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.IS_RETRY_DELIVER, true); return message1; }, correlationData);
- 监听消费确认:
@RabbitListener(queues = MqConstants.QaInfo.QA_INFO_TIMEOUT_QUEUE_NAME) public void consume(Message message, Channel channel, @Header(value = MqConstants.MQ_HEADER_CORRELATION_ID_KEY, required = false) String correlationId) { if (this.lockToConsume(correlationId)) { Boolean flag = true; try { consumer.consume(message); } catch (Exception e) { flag = false; e.printStackTrace(); } finally { if (flag) { this.consumeConfirm(channel, correlationId, message.getMessageProperties().getDeliveryTag()); String status = message.getMessageProperties().getHeader(MqConstants.MessageHeadProperties.IS_RETRY_DELIVER); if (StringUtils.isNotBlank(status) && Boolean.parseBoolean(status)) { //将数据库记录设置为已解决,此处需要确保代码处理无异常,省略, } } else { this.nack(channel, correlationId, message.getMessageProperties().getDeliveryTag()); } } } else { log.warn("[MQ]消息重复消费, ud: {}", correlationId); this.nack(channel, correlationId, message.getMessageProperties().getDeliveryTag()); } }