【RabbitMQ-6】消费端获取消息(SpringBoot2.0版本)
文章目录
上一节我们学习rabbitmq的推拉模式的理论,那这一节我们学习一下消费者获取消息的代码以及整合SpringBoot的配置吧。
消费者有两种接收消息的方法:
- poll consumer,即拉模式,消费者主动去消息队列拉取消息。
- push consumer,即推模式,消息队列主动往消费者推送消息。
public enum AcknowledgeMode {
NONE, //无应答。
MANUAL, //手动应答
AUTO; //自动应答
}
- NONE:无应答,rabbitmq默认consumer正确处理所有请求。
-
AUTO:consumer自动应答,处理成功(注意:此处的成功确认是没有发生异常)发出ack,处理失败发出nack。rabbitmq发出消息后会等待consumer端应答,只有收到
ack
确定信息后才会将消息在rabbitmq清除掉。收到nack
异常信息的处理方法由setDefaultRequeueReject()
方法设置,这种模式下,发送错误的消息可以恢复。 - MANUAL:基本等同于AUTO模式,区别是需要人为调用方法确认。
1 消费者通过推(PUSH)方式获取消息
实现push模式最简单的方式就是使用
@aaa@qq.com
注解来指定某方法作为消息消费的方法。例如监听某个Queue
的方法。
1.1 配置RabbitListenerContainerFactory
这个bean只会在consumer
端通过@RabbitListener
注解的方式接收消息的时候使用。每个@RabbitListener
注解方法都会由RabbitListenerContainerFactory
创建一个MessageListenerContainer
,负责接收消息。
@Bean( name = "singleListenerContainer" )
public SimpleRabbitListenerContainerFactory listenerContainerFactory()
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
/* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */
factory.setConnectionFactory( connectionFactory() );
/* 消息序列化类型 */
factory.setMessageConverter( new Jackson2JsonMessageConverter() );
/* setConcurrentConsumers:设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。 */
factory.setConcurrentConsumers( 1 );
factory.setMaxConcurrentConsumers( 1 );
/* setPrefetchCount:设置每次请求发送给每个Consumer的消息数量。 */
factory.setPrefetchCount( 1 );
/* 是否设置Channel的事务。 */
factory.setChannelTransacted( false );
/* setTxSize:设置事务当中可以处理的消息数量。 */
factory.setTxSize( 1 );
/* 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。 */
factory.setDefaultRequeueRejected( true );
/*
* setErrorHandler:实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。
* factory.setErrorHandler();
*/
factory.setAcknowledgeMode(AcknowledgeMode.AUTO );
return(factory);
}
1.2 配置@RabbitListener
上面配置了singleListenerContainer
信息,将其加入到containerFactory
中。
@RabbitListener(queues = "queue_direct",
containerFactory = "singleListenerContainer")
public void receive02(Message message, long deliveryTag, Channel channel) {
//获取头信息
int i = 1 / 0;
System.out.println(message.getMessageProperties().getHeaders());
}
注意:queues中声明绑定的队列必须存在。
@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_direct", durable = "true"),
arguments = {}, //可用于headers类型的exchange
exchange = @Exchange(value = "exchange_direct" ,type = ExchangeTypes.DIRECT), //声明交换机的类型
key = "ord" //声明路由主键
), containerFactory = "singleListenerContainer")
public void rec(Message message, long deliveryTag, Channel channel) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
System.out.println("消息体--------->" + message.getBody());
//foreach遍历循环
for (Map.Entry<String, Object> entry : headers.entrySet()) {
System.out.println("消息头:" + entry.getKey() + "---" + entry.getValue());
}
}
(1)在2.0版本之后,可以指定多个routingkey
即key={"ord","con"}
。
(2)exchange属性中,可以使用type = ExchangeTypes.DIRECT
指定不同类型的交换机。
(3)arguments属性,可以用于指定headers类型的exchange。arguments = @Argument(name = "x-message-ttl", value = "10000", type= "java.lang.Integer")),
(4)queue属性中exclusive
,排他队列,只对创建这个queue
的Connection
可见,Connection
关闭,那么这个queue
删除。
(5)queue属性中的autoDelete
,若是这个consumer
下线,那么这个queue
队列将会删除。
bindings注意事项:
- 对于(4)(5)这两种情况,
durable=true
队列持久化是不起作用的。 - 注意
bindings
属性不能和queues
属性同时使用。 - 如果注解声明的
queue
和xchange
以及binging
关系都存在的情况下,但是我们在bindings
属性中又进行配置,那么bindings
新增或者修改的参数都不会生效。但是queue
存在,exchange
存在但是没有binding
,那么应用程序启动后,会自动创建binding
关系。
1.3 @Payload和@Headers
这两个注解可以获取信息体和信息头。
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer")
public void handleMessage(@Payload Book body, @Headers Map<String, Object> headers) {
System.out.println("-->信息域的值"+body);
for (Map.Entry<String, Object> entry : headers.entrySet()) {
System.out.println("消息头:" + entry.getKey() + "---" + entry.getValue());
}
}
1.4 @RabbitListener和 @RabbitHandler
@RabbitListener可以标注在类上,需要配合@RabbitHandler注解一起使用。当标注在类方法上时表示收到消息后,就转交给@RabbitHander的方法处理。但是具体那个方法,要看MessageConverter
转换后的参数。
@Service
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer")
public class BookService {
// @RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer")
@RabbitHandler
public void handleBook(@Payload Book body) {
System.out.println("-->信息域的值" + body);
}
@RabbitHandler
public void handleStr(@Payload HashMap<String, Object> body) {
System.out.println("-->信息域2的值" + body);
}
}
1.5 序列化方式MessageConverter
在RabbitTemplate
配置了MessageConverter
后,MQ发送和接收消息时候就能自动完成Message和自定义java对象类的转换。
@Bean
public MessageConverter messageConverter() {
//使用Jackson序列化消息
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,MessageConverter messageConverter) {
//客户端开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式
//true:RabbitMQ会调用Basic.Return命令将消息返回给生产者
//false:RabbitMQ会把消息直接丢弃(默认)
rabbitTemplate.setMandatory(true);
//定义消息转换器
rabbitTemplate.setMessageConverter(messageConverter);
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
});
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
我们可以在RabbitTemplate源码中看到:
private volatile MessageConverter messageConverter = new SimpleMessageConverter();
默认采用的是SimpleMessageConverter
他就直接将java对象序列化。但是并不推荐直接使用,因为会只限于java平台。
推荐使用JsonMessageConverter
、Jackson2JsonMessageConverter
,这两个是都将java对象转化为json再转为byte[]来构造Message对象,前一个用的是jackson json lib
,后一个用的是jackson 2 json lib
。
1.6 consumer端的异常处理
有两个error handler类可以对@RabbitListener注解方法中抛出的异常进行异常处理。
RabbitListenerErrorHandler接口
配置类代码:
@Bean
public RabbitListenerErrorHandler rabbitListenerErrorHandler(){
return new RabbitListenerErrorHandler() {
@Override
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {
System.out.println("-------------------------------------"+message);
throw exception;
}
};
}
监听类代码:
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer",errorHandler = "rabbitListenerErrorHandler")
public void handleBook(@Payload Book body) throws Exception {
int i = 1 / 0;
System.out.println("-->信息域的值" + body);
}
效果图:
ErrorHandler接口
这一个值是设置在SimpleMessageListenerContainer
的。
生产者生产了Book类型的消息:
@Test
public void contextLoads() {
Book book = new Book("西游记", "120.00");
rabbitTemplate.convertAndSend("exchange_direct", "ord", book);
}
配置类配置了ErrorHandler处理:
注意消息到达ErrorHandler
则意味着处理失败,不需要在抛出异常。并且这个含有ConditionalRejectingErrorHandler
默认配置,可以识别特定的不可挽回的异常拒绝requeue
队列,防止消息处理的死循环。
//该参数是自定义的一个pojo对象,里面存储Mq的连接信息
public static SimpleMessageListenerContainer newContainer(RabbitConsumer consumer) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
//监听的队列
container.setQueueNames(consumer.getQueueName());
//自动ACK
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setAmqpAdmin(RabbitContextHolder.getRabbitAdmin(consumer.getVhost()));
container.setConnectionFactory(RabbitContextHolder.getConnectionFactory(consumer.getVhost()));
//预取的数量
container.setPrefetchCount(consumer.getPerfetch());
//是否自动声明 队列、交换机、绑定
container.setAutoDeclare(false);
//设置初始化后是否自动启动容器。
container.setAutoStartup(true);
//是否设置排他性,即该消费者独享该队列
container.setExclusive(false);
//消费者开启几个线程
container.setConcurrency(consumer.getConcurrency());
//消费者的监听
container.setMessageListener(new NormalListener(consumer));
//设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。
container.setDefaultRequeueRejected(false);
//发送异常时,会回调该方法
container.setErrorHandler(e->log.error("丢弃消息啦!"));
//初始化这个类
container.afterPropertiesSet();
return container;
}
执行结果:
在发送异常后并未重新放入队列,而是直接丢弃消息。
注意事项:
-
@RabbitListener
和@RabbitHandler
组合使用时,RabbitListenerErrorHandler
配置无效。 -
@RabbitListenerErrorHandler
作用域只是配置@RabbitListener
注解上的,这个注解只对当前方法发生异常时有效。而ErrorHandler
对所有@RabbitListener
注解方法有效。 -
@RabbitListener
注解的方法中抛出的异常,首先会进入RabbitListenerErrorHandler
,这里如果没有能力处理这个异常,需要将其重新抛出(否则不会进入rrorHandler
),然后异常将会进入ErrorHandler
,一旦异常进入ErrorHandler
就意味着消息消费失败了(所以不需要重新抛出异常)。 -
RabbitListenerErrorHandler
没有默认配置,而ErrorHandler
有一个默认的ConditionalRejectingErrorHandler
类,他的作用__打印日志,辨别特定的异常。__将其包装成AmqpRejectAndDontRequeueException
抛出,这个异常的作用是,忽略defaultRequeueRejected
(前文已经讲过)的设置,强制让rabbitmq
丢弃此条处理失败消息,不放回queue
。
需要丢弃的异常:
o.s.amqp...MessageConversionException
o.s.messaging...MessageConversionException
o.s.messaging...MethodArgumentNotValidException
o.s.messaging...MethodArgumentTypeMismatchException
java.lang.NoSuchMethodException
java.lang.ClassCastException
1.7 手动确认-将异常信息放入死信队列
为了避免消息异常造成的死循环,也可以将requeue
(上文配置参数)设置为false
。消息被拒绝(basic.reject/ basic.nack)并且requeue=false时,消息会进入死信队列。于是我们可以监听死信队列来处理异常消息。
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("consumer_queue"); // 监听的队列
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //消息处理
System.out.println("====接收到消息=====");
System.out.println(new String(message.getBody()));
if(message.getMessageProperties().getHeaders().get("error") == null){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息已经确认");
}else {
//第二个参数是requeue=false,即不重新会队列,会进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息拒绝");
}
});
return container;
}
消息进入死信队列的途径:
- 消息被拒绝(basic.reject/ basic.nack)并且requeue=false。
- 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))。
- 队列达到最大长度。
1.8 保证消息的不丢失
-
自动确认模式下,可以使用
1.1.6
方式的异常处理机制即可。 -
手动确认模式下,推荐是使用死信队列的方式,即
1.1.7
处理。 - 需要注意
1.1.6
在手动确认模式下,若是异常未被捕获,也是可以生效的。
手动确定模式:
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer")
public void handleBook(Message message, Book book, Channel channel) throws Exception {
try {
int i = 1 / 0;
} catch (Exception e) {
//告诉MQ删除这一条消息,若是true,则是删除所有小于tags的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("book------>" + book);
}
System.out.println("---->信息域的值" + book);
}
1.9 保证消息唯一性
我们在生产者保证消息不丢失 中,可以知道,为了保证消息不丢失,生产者会将ack=false
的消息重新发送,那么可能会导致消费端的消息重复,那怎么去重?
可以为每一条消息设置一个messageId
,用于消费者端的去重。
生产者代码:
@Autowired
private MessageConverter messageConverter;
@Test
public void contextLoads() {
Map<String, Object> map = new HashMap<>();
Book book = new Book("西游记", "120.00");
MessageProperties messageProperties = new MessageProperties();
//设置messageId
messageProperties.setMessageId("123456");
Message message = messageConverter.toMessage(book, messageProperties);
////////////////////
try {
rabbitTemplate.convertAndSend("exchange_direct", "ord", message);
} catch (AmqpConnectException e) {
System.out.println("保存信息编号:" + correlationData);
}
}
消费者代码:
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer", errorHandler = "rabbitListenerErrorHandler")
public void handleBook(Message message, Book book, Channel channel) throws Exception {
System.out.println("book------>" + book);
System.out.println("------>"+message.getMessageProperties().getMessageId());
//告诉MQ删除这一条消息,若是true,则是删除所有小于tags的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("----->信息域的值" + book);
}
效果图:
2 消费者通过拉(PULL)方式获取消息
可以通过AmqpTemplate
或者RabbitMqTemplate
拉取消息,当queue没有消息时,会立刻返回null,传入timeoutMillis参数可阻塞等待一段时间。
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
若是想直接在queue获取到java对象,可以调用receiveAndConvert
方法。
测试代码:
@Test
public void receive() {
Object o = rabbitTemplate.receiveAndConvert("queue_direct");
System.out.println(o.hashCode());
System.out.println(o);
}