RabbitMq简介
1,mq产生背景
1.在网络通讯中,Http请求默认采用同步请求方式,基于请求与响应模式
2.在客户端与服务器进行通讯时,客户端调用服务端接口后,必须等待服务端完成处理后返回结果给客户端才能继续执行,这种情况属于同步调用方式。
3.如果服务器端发生网络延迟、不可达的情况,可能客户端也会受到影响。
2,能做什么?
解决应用解耦,异步消息,流量削峰等问题,实现高可用,可伸缩性和最终一致性的架构;
1).异步处理
2).应用解耦
3).流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
4).消息通讯
3,ActiveMQ的通讯模式
基于队列(点对点)或者主题(发布订阅)
4,中间件的对比
ActiveMQ:基于java语言
RabbitMQ: 基于Erlang语言(高并发,稳定性高)
5,Erlang编程语言最初目的是进行大型电信交换设备的软件开发,是一种适用于大规模并行处理环境的高可靠性编程语言。随着多核处理器技术的日渐普及,以及互联网、云计算等技术的发展,该语言的应用范围也有逐渐扩大之势。初衷理念实现抗高并发语言
6,Virtual Hosts:
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
7,五种队列模式
1).点对点(简单)的队列
2).工作(公平性)队列模式
3).发布订阅模式
4).路由模式Routing
5).通配符模式Topics
长链接:减少三次握手,但是占据带宽
8,RabbitMQ关键名词
1)AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;
涉及概念解释:
2) Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程;
3) Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue。
4) Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。
5) ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
6) Message Queue:消息队列,用于存储还未被消费者消费的消息;
7) Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;
8)BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。
9,消息队列RabbitMQ应答模式
案例:
生产者端代码不变,消费者端代码这部分就是用于开启手动应答模式的。
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。
在处理完消息时,返回应答状态,true表示为自动应答模式。
channel.basicAck(envelope.getDeliveryTag(), false);
消息队列RabbitMQ应答模式:
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
RabbitMQ消息确认机制:
问题产生背景:
生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。
解决方案:
1.AMQP 事务机制
2.Confirm 模式
事务模式:
txSelect 将当前channel设置为transaction模式
txCommit 提交当前事务
txRollback 事务回滚
如果rabbitmq服务器宕机,消息会丢失吗?
queueDeclare(String queue,boolean durable,boolean execusive,boolean autoDelete ,Map<String,Object> arguments)
rabbitMQ支持消息持久化机制,会把消息持久化到硬盘上 durable=true
10,RabbitMQ的公平转发
目前消息转发机制是平均分配,这样就会出现俩个消费者,奇数的任务很耗时,偶数的任何工作量很小,造成的原因就是近当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。
为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount= 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。
换句话说,只有在消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完毕并自己对刚刚处理的消息进行确认之后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去清闲。
通过 设置channel.basicQos(1);
11,RabbitMQ交换机的作用
生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习Nginx有点类似。
交换机的作用根据具体的路由策略分发到不同的队列中,交换机有四种类型。
1) Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
2)Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
3) Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列
4) Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
12,发布/订阅模式Publish/Subscribe
这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。
功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
思路解读(重点理解):
(1)一个生产者,多个消费者
(2)每一个消费者都有自己的一个队列
(3)生产者没有直接发消息到队列中,而是发送到交换机
(4)每个消费者的队列都绑定到交换机上
(5)消息通过交换机到达每个消费者的队列
该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
以用户发邮件案例讲解
注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。
13,路由模式RoutingKey
生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
例如:我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。
采用交换机direct模式
14,通配符模式Topics
说明:此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
15,一个项目有多个消费者一个可以有多个生产者
16,RabbitMQ消息重试机制(补偿)
消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?
答案:使用消息重试机制。(演示重试机制)
如何合适选择重试机制(Mq重试需要注意的问题):
情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? (需要重试机制)
情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)需要发布进行解决。
如何实现重试机制
总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查(查询日志表,没有消费的数据进行xxl)+人工进行补偿
@RabbitListener(queues = “new_fanout_sms_queue”)
底层使用Aop进行拦截,如果程序没有出现异常,自动提交事务,如果抛出异常自动实现补偿机制,该消息会缓存到rabbitmq服务端存放,一直重试到不抛出异常为止,这种做法肯定有问题,可以修改重试机制,
默认情况下五秒重试一次
17,消费者如果保证消息幂等性,不被重复消费
产生原因:网络延迟传输中,消费出现异常或者是消费延迟消费,会造成MQ进行重试补偿,在重试过程中,可能会造成重复消费。
消费者如何保证消息幂等性,不被重复消费
解决办法:
①使用全局MessageID判断消费方使用同一个,解决幂等性。(UUID生成)
//生产者发送消息的时候设置消息Id
Message message = MessageBuilder
.withBody(jsonString.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding(“utf-8”)
.setMessageId(UUID.randomUUID() + “”).build();
②或者使用业务逻辑保证唯一(比如订单号码)
18,手动ack
消费者 yml写入
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /
listener:
simple:
retry:
####开启消费者重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔时间
initial-interval: 3000
####手动开启ack
acknowledge-mode: manual
代码
// @RabbitHandler()
@RabbitListener(queues = “new_fanout_email_queue”)
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
//业务逻辑
// 手动ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
System.out.println("调用第三方成功,result" + retult + "程序结束");
}
19,RabbitMQ死信队列
死信队列 是 当消息在一个队列 因为下列原因:
1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.
2.消息TTL过期(消息过期)
3.队列达到最大长度(队列满了,无法再添加数据到mq中,会丢失数据,)
应用场景分析
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息
注意事项:如果之前没有绑定死信交换机,中间绑定的,需要把之前的队列的删掉,重新绑定;
20,RabbitMQ解决分布式事务思路
案例:
经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。
RabbitMQ解决分布式事务原理: 采用最终一致性原理。
需要保证以下三要素
1、确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
2、MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)
3、如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。
http协议形式的传统派单不支持高并发:订单服务过大,导致派单服务堆积
21,部分思路问题
1)如果消费者消费消息失败,生产者是不需要回滚事务
解决思路:消费采用手动ack应答模式,采用mq进行补偿重试机制,注意mq补偿幂等性问题
2)如果生产者发送失败到mq服务器?
解决思路:使用生产者重试机制进行发送消息
3)如何保证第一个事务先执行,生产者投递消息到mq服务端成功,消费者消费消息成功,但是生产者回滚,采用补单机制;
22,代码详见[email protected]:qinixiangyang111/RabbitMQStudy.git