SpringBoot+RabbitMq具体使用的几种姿势
目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰。那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适。那么我们就来看看如何使用它吧。
环境准备
本案例基于springboot集成rabbitmq,本案例主要侧重要实际的code,对于基础理论知识请自行百度。
jdk-version:1.8
rabbitmq-version:3.7
springboot-version:2.1.4.release
pom文件
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency>
yml配置文件
spring: rabbitmq: password: guest username: guest port: 5672 addresses: 127.0.0.1 #开启发送失败返回 publisher-returns: true #开启发送确认 publisher-confirms: true listener: simple: #指定最小的消费者数量. concurrency: 2 #指定最大的消费者数量. max-concurrency: 2 #开启ack acknowledge-mode: auto #开启ack direct: acknowledge-mode: auto #支持消息的确认与返回 template: mandatory: true
配置rabbitmq的姿势
姿势一
基于javaconfig
package com.lly.order.message; import org.springframework.amqp.core.*; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; /** * @classname rabbitmqconfig * @description rabbitmq配置类 * @author lly * @date 2019-05-13 15:05 * @version 1.0 **/ @configuration public class rabbitmqconfig { public final static string direct_queue = "directqueue"; public final static string topic_queue_one = "topic_queue_one"; public final static string topic_queue_two = "topic_queue_two"; public final static string fanout_queue_one = "fanout_queue_one"; public final static string fanout_queue_two = "fanout_queue_two"; public final static string topic_exchange = "topic_exchange"; public final static string fanout_exchange = "fanout_exchange"; public final static string topic_routingkey_one = "common_key"; public final static string topic_routingkey_two = "*.key"; // direct模式队列 @bean public queue directqueue() { return new queue(direct_queue, true); } // topic 订阅者模式队列 @bean public queue topicqueueone() { return new queue(topic_queue_one, true); } @bean public queue topicqueuetwo() { return new queue(topic_queue_two, true); } // fanout 广播者模式队列 @bean public queue fanoutqueueone() { return new queue(fanout_queue_one, true); } @bean public queue fanoutqueuetwo() { return new queue(fanout_queue_two, true); } // topic 交换器 @bean public topicexchange topexchange() { return new topicexchange(topic_exchange); } // fanout 交换器 @bean public fanoutexchange fanoutexchange() { return new fanoutexchange(fanout_exchange); } // 订阅者模式绑定 @bean public binding topexchangebingingone() { return bindingbuilder.bind(topicqueueone()).to(topexchange()).with(topic_routingkey_one); } @bean public binding topicexchangebingingtwo() { return bindingbuilder.bind(topicqueuetwo()).to(topexchange()).with(topic_routingkey_two); } // 广播模式绑定 @bean public binding fanoutexchangebingingone() { return bindingbuilder.bind(fanoutqueueone()).to(fanoutexchange()); } @bean public binding fanoutexchangebingingtwo() { return bindingbuilder.bind(fanoutqueuetwo()).to(fanoutexchange()); } }
姿势二
基于注解
package com.lly.order.message; import com.rabbitmq.client.channel; import lombok.extern.slf4j.slf4j; import org.springframework.amqp.core.amqptemplate; import org.springframework.amqp.core.exchangetypes; import org.springframework.amqp.core.message; import org.springframework.amqp.rabbit.annotation.exchange; import org.springframework.amqp.rabbit.annotation.queue; import org.springframework.amqp.rabbit.annotation.queuebinding; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.amqp.rabbit.connection.correlationdata; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import java.io.ioexception; import java.time.localtime; import java.util.uuid; /** * @classname mqtest * @description 消息队列测试 * @author lly * @date 2019-05-13 10:50 * @version 1.0 **/ @component @slf4j public class mqtest implements rabbittemplate.confirmcallback, rabbittemplate.returncallback { private final static string queue = "test_queue"; @autowired private amqptemplate amqptemplate; @autowired private rabbittemplate rabbittemplate; public mqtest(rabbittemplate rabbittemplate) { rabbittemplate.setconfirmcallback(this); rabbittemplate.setreturncallback(this); } public void sendmq() { rabbittemplate.convertandsend("test_queue", "test_queue" + localtime.now()); log.info("发送消息:{}", "test_queue" + localtime.now()); } public void sendmqrabbit() { //回调id correlationdata cid = new correlationdata(uuid.randomuuid().tostring()); // rabbittemplate.convertandsend(rabbitmqconfig.fanout_exchange, "", "广播者模式测试",cid); object object = rabbittemplate.convertsendandreceive(rabbitmqconfig.fanout_exchange, "", "广播者模式测试", cid); log.info("发送消息:{},object:{}", "广播者模式测试" + localtime.now(), object); } //发送订阅者模式 public void sendmqexchange() { correlationdata cid = new correlationdata(uuid.randomuuid().tostring()); correlationdata cid01 = new correlationdata(uuid.randomuuid().tostring()); log.info("订阅者模式->发送消息:routing_key_one"); rabbittemplate.convertsendandreceive("topic_exchange", "routing_key_one", "routing_key_one" + localtime.now(), cid); log.info("订阅者模式->发送消息routing_key_two"); rabbittemplate.convertsendandreceive("topic_exchange", "routing_key_two", "routing_key_two" + localtime.now(), cid01); } //如果不存在,自动创建队列 @rabbitlistener(queuestodeclare = @queue("test_queue")) public void receivermq(string msg) { log.info("接收到队列消息:{}", msg); } //如果不存在,自动创建队列和交换器并且绑定 @rabbitlistener(bindings = { @queuebinding(value = @queue(value = "topic_queue01", durable = "true"), exchange = @exchange(value = "topic_exchange", type = exchangetypes.topic), key = "routing_key_one")}) public void receivermqexchage(string msg, channel channel, message message) throws ioexception { long deliverytag = message.getmessageproperties().getdeliverytag(); try { log.info("接收到topic_routing_key_one消息:{}", msg); //发生异常 log.error("发生异常"); int i = 1 / 0; //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicack(deliverytag, false); } catch (exception e) { log.error("接收消息失败,重新放回队列"); //requeu,为true,代表重新放入队列多次失败重新放回会导致队列堵塞或死循环问题, // 解决方案,剔除此消息,然后记录到db中去补偿 //channel.basicnack(deliverytag, false, true); //拒绝消息 //channel.basicreject(deliverytag, true); } } @rabbitlistener(bindings = { @queuebinding(value = @queue(value = "topic_queue02", durable = "true"), exchange = @exchange(value = "topic_exchange", type = exchangetypes.topic), key = "routing_key_two")}) public void receivermqexchagetwo(string msg) { log.info("接收到topic_routing_key_two消息:{}", msg); } @rabbitlistener(queues = rabbitmqconfig.fanout_queue_one) public void receivermqfanout(string msg, channel channel, message message) throws ioexception { long deliverytag = message.getmessageproperties().getdeliverytag(); try { log.info("接收到队列fanout_queue_one消息:{}", msg); channel.basicack(deliverytag, false); } catch (exception e) { e.printstacktrace(); //多次失败重新放回会导致队列堵塞或死循环问题 丢弃这条消息 // channel.basicnack(message.getmessageproperties().getdeliverytag(), false, false); log.error("接收消息失败"); } } @rabbitlistener(queues = rabbitmqconfig.fanout_queue_two) public void receivermqfanouttwo(string msg) { log.info("接收到队列fanout_queue_two消息:{}", msg); } /** * @return * @author lly * @description 确认消息是否发送到exchange * @date 2019-05-14 15:36 * @param [correlationdata, ack, cause] **/ @override public void confirm(correlationdata correlationdata, boolean ack, string cause) { log.info("消息唯一标识id:{}", correlationdata); log.info("消息确认结果!"); log.error("消息失败原因,cause:{}", cause); } /** * @return * @author lly * @description 消息消费发生异常时返回 * @date 2019-05-14 16:22 * @param [message, replycode, replytext, exchange, routingkey] **/ @override public void returnedmessage(message message, int replycode, string replytext, string exchange, string routingkey) { log.info("消息发送失败id:{}", message.getmessageproperties().getcorrelationid()); log.info("消息主体 message : ", message); log.info("消息主体 message : ", replycode); log.info("描述:" + replytext); log.info("消息使用的交换器 exchange : ", exchange); log.info("消息使用的路由键 routing : ", routingkey); } }
rabbitmq消息确认的三种方式
# 发送消息后直接确认消息 acknowledge-mode:none # 根据消息消费的情况,智能判定消息的确认情况 acknowledge-mode:auto # 手动确认消息的情况 acknowledge-mode:manual
我们以topic模式来试验下消息的ack
自动确认消息模式
手动确认消息模式
然后我们再次消费消息,发现消息是没有被确认的,所以可以被再次消费
发现同样的消息还是存在的没有被队列删除,必须手动去ack,我们修改队列1的手动ack看看效果
channel.basicack(deliverytag, false);
重启项目再次消费消息
再次查看队列里的消息,发现队列01里的消息被删除了,队列02的还是存在。
消费消息发生异常的情况,修改代码 模拟发生异常的情况下发生了什么, 异常发生了,消息被重放进了队列
但是会导致消息不停的循环消费,然后失败,致死循环调用大量服务器资源
所以我们正确的处理方式是,发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。
总结
通过实际的code我们了解的rabbitmq在项目的具体的整合情况,消息ack的几种情况,方便在实际的场景中选择合适的方案来使用。如有不足,还望不吝赐教。希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: JSP迅速入门
下一篇: 推荐几款非常实用的IDEA插件小结