Spring Cloud Stream消费失败后的处理策略 :自定义错误处理逻辑
前言
之前我们介绍了会生效的消息重试功能,对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。
但是对于诸如代码本身存在的逻辑错误等,无论重试多少次都不可能成功的问题,是无法修复的。对于这样的情况,前文中说了可以利用日志记录消息内容,配合告警来做补救,但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。
针对该类问题的一种处理方法:自定义错误处理逻辑。
自定义错误处理逻辑
示例
准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}
内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。
在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,跟上一篇文章的结果一样,消息消费失败,记录了日志,消息信息丢弃。
下面,针对消息消费失败,在TestListener中针对消息消费逻辑创建一段错误处理逻辑,比如:
@Slf4j
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
/**
* 消息消费失败的降级处理逻辑
*
* @param message
*/
@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
public void error(Message<?> message) {
log.info("Message consumer failed, call fallback!");
}
}
通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:
-
test-topic
:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.example-topic-input.destination
的配置) -
stream-exception-handler
:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.example-topic-input.group
的配置)
再启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中,此时可以看到日志如下:
2018-12-11 12:00:35.500 INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512 INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Received: hello,
2018-12-11 12:00:38.541 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Message consumer failed, call fallback!
使用DLQ队列(RabbitMQ)
之前的例子抛出异常后,我们需要如下配置来完成加入死信队列
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
这里加入了一个重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
,用来开启DLQ(死信队列)。完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello
接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,消息消费失败,记录了日志。此时,可以查看RabbitMQ的控制台如下:
其中,test-topic.stream-exception-handler.dlq
队列就是test-topic.stream-exception-handler
的dlq(死信)队列,当test-topic.stream-exception-handler
队列中的消息消费时候之后,就会将这条消息原封不动的转存到dlq队列中。这样这些没有得到妥善处理的消息就通过简单的配置实现了存储,之后,我们还可以通过简单的操作对这些消息进行重新消费。我们只需要在控制台中点击test-topic.stream-exception-handler.dlq
队列的名字进入到详情页面之后,使用Move messages功能,直接将这些消息移动回test-topic.stream-exception-handler
队列,这样这些消息就能重新被消费一次。
深入思考
场景一:有些消息在业务上存在时效性,进入死信队列之后,过一段时间再处理已经没有意义,这个时候如何过滤这些消息呢?
只需要配置一个参数即可:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000
场景二:可能进入DLQ队列的消息存在各种不同的原因(不同异常造成的),此时如果在做补救措施的时候,还希望根据这些异常做不同的处理时候,我们如何区分这些消息进入DLQ的原因呢?
pring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true
该参数默认是false,如果设置了死信队列的时候,会将消息原封不动的发送到死信队列(也就是上面例子中的实现),此时大家可以在RabbitMQ控制台中通过Get message(s)功能来看看队列中的消息,应该如下图所示:如果我们该配置设置为true的时候,那么该消息在进入到死信队列的时候,会在headers中加入错误信息,如下图所示: