rabbitmq一些实例应用
rabbitmq特点
* 异步处理
* 应用解耦:只关心消息,不关心参数
* 流量控制(削锋):把流量请求放到队列,实现流量控制
消息队列主要的两种形式目的地
* 队列(queue):点对点(point to point)信息通信
* 点对点式:
* 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
* 消息只有唯一的发送者和接收者,但并不是说只有一个接收者
* 主题(topic):发布(public)/订阅(subcribe)消息通信
* 发布订阅式:
* 发送者发送消息到主题,多个接收者监听这个主题,那么就在消息到达时同时收到消息
rabbitmq概念
* Message:消息,有消息头和消息体组成,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包含routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
* Publisher:消息生产者,也是一个向交换器发布消息的客户端应用程序
* Exchange:交换器,用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。
* Exchange有四种类型:
* direct:默认
* fanout
* topic
* headers
* Queue:消息队列,用来保存消息知道发送给消费者,它是消息容器,也是消息的终点,一个消息可投入一个或者多个队列。消息一直在队列里,等待消费者连接到这个队列将其取走
* Binding:绑定,用于消息队列和交换器之间的关联,一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将减缓器理解成一个由绑定构成的路由表
* Connection:网络连接,比如一个tcp连接
* channel:信道,多路复用连接一条独立的双向的数据流通道。信道是建立在真实的tcp连接的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁tcp都是非常昂贵的开销,所以引入信道的概念,以复用一条tcp连接(一个客户端只会建立一条长连接)
* Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
* Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的Rabbitmq服务器,拥有自己的队列。交换器、绑定和权限机制。vhost是AMQP的基础,必须在连接时指定rabbitmq的默认vhost是/
* Broker:消息队列服务器
Docker安装Rabbitmqdocker run --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management#4369,25672(Erlang发现&集群端口)#5672,5671(AMQP端口)#15672(web管理后台端口)#61613,61614(STOMP协议端口)#1883,8883(MQTT协议端口)https://www.rabbitmq.com/networking.htmlExchange类型
* direct:消息中的路由键(routing key)如果和binding key一致,交换器就将消息发到对应的队列中;路由键要与队列名完全匹配
* headers:和direct交换器方式一致,但匹配的是header而不是routing key
* fanout:每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的
* topic:通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将了路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符#、*;#匹配0个或者多个单词;*号匹配一个单词。
RabbitMQ与SpringBoot整合//1、引入spring-boot-starter-amqp//2、application.yml配置//3、组件 //3.1、AmqpAdmin:管理组件 //3.2、RabbitTemplate:消息发送处理组件/*** 使用rabbitmq配置* 1、引入amqp场景:RabbitAutoConfiguration自动生效* 2、给容器中自动配置* 2.1、CachingConnectionFactory* 2.2、RabbitMessagingTemplate* 2.3、RabbitTemplate* 2.4、AmqpAdmin:消息队列管理组件** 2.5/@ConfigurationProperties(* * prefix = “spring.rabbitmq”* * )所有信息从这里开始* 2.6、给配置文件配置host、port、vhost** 3、@EnableRabbit:开启rabbitmq* 4、监听消息:@RabbitListener注解,必须开启@EnableRabbit后才能使用* @RabbitListener注解:可以加载类+方法上* @RabbitHandler:只能加在方法上,重载区分不同的消息**/##配置中加rabbitmq: host: 192.168.56.10 port: 5672 virtual-host: /#測試創建交换机、队列、绑定关系@[email protected] GulimallOrderApplicationTests { @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; /** * 1、如何创建Exchange、Queue、Binding * 1.1、使用AmqpAdmin进行创建 * 2、如何收发消息 * 使用RabbitTemplate发送消息 * 使用@RabbitListener监听消息 / @Test public void sendMessage() { for (int i = 0; i < 10; i++) { if (i%20) { //1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去,对象必须实现序列化 OrderReturnReasonEntity returnReasonEntity = new OrderReturnReasonEntity(); returnReasonEntity.setId(1L); returnReasonEntity.setCreateTime(new Date()); returnReasonEntity.setName(“哈哈”+i); //2、发送的对象类型也可以是一个json—》创建一个消息转换器,转换json格式,config包 rabbitTemplate.convertAndSend(“direct-Exchange”, “hello.java”, returnReasonEntity); log.info(“消息[{}]发送完成”, returnReasonEntity); }else { //1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去,对象必须实现序列化 OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); //2、发送的对象类型也可以是一个json—》创建一个消息转换器,转换json格式,config包 rabbitTemplate.convertAndSend(“direct-Exchange”, “hello.java”, orderEntity); log.info(“消息[{}]发送完成”, orderEntity); } } } @Test public void createExchange() { /* * String name:交换机名字 * boolean durable:是否持久化 * boolean autoDelete:是否自动删除 * Map<String, Object> arguments:参数 / DirectExchange directExchange = new DirectExchange(“direct-Exchange”, true, false); amqpAdmin.declareExchange(directExchange); log.info(“创建了[{}]交换机”,“direct-Exchange”); } @Test public void createQueue() { /* * String name:队列名字 * boolean durable:队列持久化 * boolean exclusive:是否排他(只能被声明的连接使用,其他连接不能使用) * boolean autoDelete:队列自动删除 * @Nullable Map<String, Object> arguments:参数 / Queue queue = new Queue(“hello-java-queue”,true,false,false); amqpAdmin.declareQueue(queue); log.info(“创建了[{}]队列”,“hello-java-queue”); } //交换机与队列绑定 @Test public void createBinding() { /* * String destination:目的地-》 * Binding.DestinationType destinationType:目的地类型-》绑定队列还是交换机 * String exchange:交换机 * String routingKey:路由键 * @Nullable Map<String, Object> arguments:参数 / Binding binding = new Binding(“hello-java-queue”, Binding.DestinationType.QUEUE,“direct-Exchange”,“hello.java”,null); amqpAdmin.declareBinding(binding); log.info(“创建了[{}]绑定关系”,“hello-java-binding”); }}@Configurationpublic class RabbitConfig { //消息转换器,容器中有就用容器中的,没有就默认一个SimpleMessageConverter @Bean public MessageConverter messageConverter(){ //将消息转为json格式 return new Jackson2JsonMessageConverter(); }}@RabbitListener(queues = {“hello-java-queue”})@[email protected](“orderItemService”)public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService { /* * queues:声明监听的队列数组 * *参数可以写一下类型 * 1、Message message:原生消息详细信息。头+体 * 2、T<发送的消息类型> * 3、Channel channel:当前传输数据的通道 * * Queue可以很多人都来监听,只要收到消息,队列删除消息,而且只能有一个人收到此消息 * 场景: * 1)、订单服务启动多个;同一个消息只能有一个客户收到 * 2)、只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息 */ @RabbitHandler public void recieveMessage01(Message message, OrderReturnReasonEntity content, Channel channel){ //消息体 byte[] body = message.getBody(); //OrderReturnReasonEntity orderReturnReasonEntity = JSON.parseObject(body, OrderReturnReasonEntity.class); //消息头 MessageProperties properties = message.getMessageProperties(); System.out.println(“接收到消息[{}]”+message+">类型[{}]"+content); } @RabbitHandler public void recieveMessage02(Message message, OrderEntity content, Channel channel){ //消息体 byte[] body = message.getBody(); //OrderReturnReasonEntity orderReturnReasonEntity = JSON.parseObject(body, OrderReturnReasonEntity.class); //消息头 MessageProperties properties = message.getMessageProperties(); System.out.println(“接收到消息[{}]”+message+"==>类型[{}]"+content); }}RabbitMQ消息确认机制-可靠性抵达
1. publisher confirmCallback:确认模式
2. publisher returnCallback:未投递到queue退回模式
3. consumer ack机制
* 可靠抵达-ConfirmCallback
* spring.rabbitmq.publisher-confirms=true(配置开启)新版本:publisher-confirm-type: correlated
* 在创建connectionFactory的时候设置PublisherConfirms(true)选项,开启confirmCallback
* CorrelationData:用来表示当前消息的唯一性
* 消息只要被broker接收到就会执行confirmCallback,如果是cluster模式,需要所有的broker接收到才会调用confirmCallback
* 被broker接收到只能表示message已经到达服务器,并不能保证消息一定会被投递到目标queue里。所以需要用到接下来的returnCallback
p->b阶段####配置spring.rabbitmq.publisher-confirms=true新版本:publisher-confirm-type: correlated###配置类设置消息确认回调/*** 定制RabbitTemplate*/@PostConstruct //RabbitConfig创建完之后执行方法public void initRabbitTemplate(){ //设置确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id) * @param ack 消息是否成功收到,只要消息抵达服务器ack=true * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(“confirm…correlationData[”+correlationData+"]==>ack["+ack+"]=>cause["+cause+"]"); } });}
* 可靠抵达-ReturnCallback
* spring.rabbitmq.publisher-returns=true
* spring.rabbitmq.teplate.mandatory=true
* confirm模式只能保证消息到达broker,不能保证消息准确投递到目标queue里。在有些业务场景下,我们需要保证消息一定要投递到目标queue里,此时需要用到return回退模式
* 这样如果未能投递到目标queue里将调用returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据
- 2、消息正确抵达队列进行回调* 1、spring.rabbitmq.publisher-returns=true* spring.rabbitmq.template.mandatory=true* 2、设置确认回调[email protected] class RabbitConfig { @Autowired RabbitTemplate rabbitTemplate; //消息转换器,容器中有就用容器中的,没有就默认一个SimpleMessageConverter @Bean public MessageConverter messageConverter(){ //将消息转为json格式 return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate * 1、服务器收到消息就回调 * 1、spring.rabbitmq.publisher-confirms=true * 2、设置确认回调setConfirmCallback * * 2、消息正确抵达队列进行回调 * 1、spring.rabbitmq.publisher-returns=true * spring.rabbitmq.template.mandatory=true * 2、设置确认回调returnCallback * * 3、消费端确认,保证每一个消息正确消费,保证可以删除消息 / @PostConstruct //RabbitConfig创建完之后执行方法 public void initRabbitTemplate(){ //设置确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /* * * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id) * @param ack 消息是否成功收到,只要消息抵达服务器ack=true * @param cause 失败的原因 / @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(“confirm…correlationData[”+correlationData+"]>ack["+ack+"]=>cause["+cause+"]"); } }); //设置消息抵达队列的确认回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /* * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复状态码 * @param replyText 回复文本内容 * @param exchange 当时这个消息发给哪个交换机 * @param routingKey 当时这个消息用哪个路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(“Fail Message[”+message+"]>replyCode["+replyCode+"]>replyText["+exchange+"]===>["+routingKey+"]"); } }}
-
可靠抵达-Ack消息确认机制
-
spring.rabbitmq.listener.simple.acknowledge-mode
-
消费者获取到消息,成功处理,可以回复Ack给Broker
- -basic.ack:用于肯定确认;broker将移除此消息
- -basic.nack:用于否定确认;可以指定broker是否丢弃此消息,可以批量
- -basic.reject:用于否定确认;同上,但不能批量
-
默认,消息被消费者收到,就会从broker的queue中移除
-
queue无消费者,消息依然会被存储,知道消费者消费
-
消费者收到消息,默认就会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
- 消息处理成功,ack(),接受下一个消息,此消息就会被broker移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack()/nack(),broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息就不会被broker移除,回投递给别人
-
-
channel.basicAck(deliveryTag, false);签收获取channel.basicNack(deliveryTag,false,true);拒签
* RabbitMQ演示队列(实现定时任务)
* 场景:
* 比如未付款订单,超过一定时间后,系统自动取消并释放占有物品
* 常用解决方案:spring的schedule定时任务轮询数据库
* 缺点:消耗内存、增加数据库的压力、存在较大的时间误差
* 解决:rabbitmq的消息ttl和私信Exchange结合
* 订单创建与支付
* 订单创建前需要预览订单,选择收货信息等
* 订单创建需要锁定库存,库存有才能创建,否则不能创建
* 订单创建成功后超市未支付需要解锁库存
* 支付成功后,需要进行进行拆单,根据商品打包方式,所在仓库,物流等进行拆单
* 支付的每笔流水都需要记录,一点查账
* 订单创建,支付成功等状态都需要给MQ发送消息,方便其他系统感知订阅
* 逆向流程
* 修改订单,用户没有提交订单,可以对订单一些信息进行修改,比如配送信息,优惠信息,及其他一些订单可修改范围的内容,此时只需要对数据进行变更即可
* 订单取消,用户主动取消订单和用户超时未支付,两种情况下订单都会取消,而超时情况是系统自动关闭订单,所以在订单支付的响应机制上面要做支付的
* 幂等性处理
* 订单业务
*
* feign远程调用丢失请求头问题(同步)
* 解决
@Configurationpublic class FeignConfig { //添加拦截器 @Bean(“requestInterceptor”) public RequestInterceptor requestInterceptor(){ return new RequestInterceptor(){ @Override public void apply(RequestTemplate requestTemplate) { //RequestContextHolder获取当前的请求 ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest();//老请求 //同步请求头数据,Cookie String cookie = request.getHeader(“Cookie”); //给新请求同步了老请求的cookie requestTemplate.header(“Cookie”,cookie); } }; }}
* feign远程调用丢失请求头问题(异步)
@Configurationpublic class FeignConfig { //添加拦截器 @Bean(“requestInterceptor”) public RequestInterceptor requestInterceptor(){ return new RequestInterceptor(){ @Override public void apply(RequestTemplate requestTemplate) { //RequestContextHolder获取当前的请求 ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest();//老请求 if (request != null) { //同步请求头数据,Cookie String cookie = request.getHeader(“Cookie”); //给新请求同步了老请求的cookie requestTemplate.header(“Cookie”, cookie); } } }; }}######RequestContextHolder是ThreadLocal共享的异步时拿到threadlocal中的RequestAttributes,每个线程设置同一个请求头@Override public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException { OrderConfirmVo confirmVo = new OrderConfirmVo(); MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get(); RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); //从拦截器获取登录会员信息 CompletableFuture getAddressFuture = CompletableFuture.runAsync(() -> { RequestContextHolder.setRequestAttributes(attributes); //1、远程获取会员的收货地址列表 List address = memberFeignService.getAddress(memberRespVo.getId()); confirmVo.setAddress(address); }, executor); CompletableFuture getOrderItemFuture = CompletableFuture.runAsync(() -> { RequestContextHolder.setRequestAttributes(attributes); //2、远程查询购物车所有选中的购物项 List items = cartFeignService.getCurrentUserCartItems(); confirmVo.setItems(items); //feign在远程调用之前要构造请求,调用很多的拦截器 //RequestInterceptor interceptor : requestInterceptor }, executor); //3、查询用户积分 Integer integration = memberRespVo.getIntegration(); confirmVo.setIntegration(integration); //4、其他数据自动计算 //TODO 5、 防重令牌 CompletableFuture.allOf(getAddressFuture,getOrderItemFuture).get(); return confirmVo; }}
* 保证提交订单的幂等性
* 接口幂等性就是用户对于同意操作发起的一次请求或者多次请求结果是一致的,不会因为多次点而产生了副作用。
* 出现幂等性的情况
* 用户多次点击按钮
* 用户页面回退再次提交
* 微服务互相调用,由于网络问题,导致请求失败。feign触发重试机制
* 幂等解决方案 (token机制)
1. 服务端提供了发送token的接口。我们拆分业务的时候,哪些业务是存在幂等问题的,就必须执行业务前,先去获取token,服务器会把token保存到redis中
2. 然后调用业务接口请求时,把token携带过去,一般在请求头部
3. 服务器判断token是否存在redis中,存在表示第一次请求,然后是删除token,继续执行业务
* token机制危险性
* 先删除token还是后删除token
* 先删除可能导致,业务确实没有执行,重试还带上之前的token,由于防重设计导致,请求还是不能执行
* 后删除可能导致,业务执行成功,但是服务器闪断,出现超时,没有删除token,别人继续重试,导致业务被执行两遍
* 设计为先删除token,如果业务调用失败,就重新获取token再次请求。
* token的获取、比较和删除必须是原子性的
* redis.get(token)、token.equals、redis.del(token)如果这两个操作不是与啊你在,可能导致,高并发下,都get到同样的数据,判断都成功,继续业务并发执行
* 可以在redis使用lua脚本完成这个操作
if redis.call(‘get’,KEYS[1])==ARGV[1] then retrun redis.call(‘del’,KEYS[1]) else return 0 end
* 幂等解决方案 (各种锁机制)
1. 数据库悲观锁:select *from xxx where id=1 for update;悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。另外要注意的是,id字段一定是主键或者唯一索引,不然那可能造成锁表的结果,处理起来会非常麻烦。
2. 数据库乐观锁:
3. 业务层分布式锁:如果多个机器可能在同一时间同时处理相同的数据,比如多态机器定时任务都拿到了相同数据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断这个数据是否被处理过
* 幂等解决方案 (各种唯一约束)
1. 数据库唯一约束:插入数据,应按照唯一索引进行插入,比如订单号,相同的订单就不可能有两条记录插入
2. redis set防重:很多数据需要处理,只能被处理一次,比如我们可以计算数据的MD5将其放入redis的set,每次处理数据,先看这个MD5是否已经存在,存在就不处理
* 幂等解决方案 (防重表)
1. 使用订单号orderNo做为去重表的唯一索引,吧唯一索引插入去重表,再进行业务操作,且他们在同一个事务中。这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题。这里要注意的是,去重表和业务表应该在同一个库中,这样就保证了字啊同一个事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好保证了数据一致性。
* 幂等解决方案 (全局请求唯一id)
1. 调用接口时,生成一个唯一id,redis将数据保存到集合中(去重),存在即处理过。
2. 可以使用nginx设置每一个请求的唯一 id:proxy_set_header X-Request-Id $request_id
* 锁库存
* 事务
* 本地事务特性
* 原子性、一致性、隔离性、持久性(ACID)
* 原子性:一系列操作整体不可分割,要么同时成功,要么同时失败
* 一致性:数据在事务的前后,业务整体一致
* 转账:A 100 B 100 A给B转20 A:80 B:120 一致
* 隔离性:事务之间相互隔离
* 持久性:事务一旦成功,数据一定会罗盘在数据库
* 事务的隔离性
* READ UNCOMMITIED:读未提交-》该隔离级别的事务会读到其它未提交事务的数据,此现象也称为脏读
* READ COMMITED:读提交-》一个事务可以读取另一个已提交事务,多次读取会造成不一样的结果,此现象称为不可重复读
* REPEATABLE READ:可重复读-》该隔离级别是mysql默认的隔离级别,同一个事务里select的结果是事务开始时间点的状态,因此,同样的select操作读到的结果会是一致的。但是,会有会有幻读现象。Mysql的innodb引擎可以通过next-key locks机制来避免幻读。
* SERIALIZABLE:序列化-》该隔离级别下的事务都是串行顺序执行的,Mysql数据库的innodb引擎会给读操作隐式加一把读共享锁,从而避免脏读、不可重复读和幻读问题
* 事务的传播机制
@Transactional(timeout = 30)//a事务的所有设置就传播到了和他公用一个事务的方法public void a(){ //b、c做任何设置都没用,都是和a共用同一个事务,在同一个service情况下 //原因是事务是代理队里对象来控制的 //同一个对象内事务方法互调失效,原因染过了代理对象 b();//a事务,b共所有事务设置用a的 c();//新事物(不回滚) int i =10/0;}@Transactional(propagation = Propagation.REQUIRED,timeout = 2)public void b(){}@Transactional(propagation = Propagation.REQUIRES_NEW)//新事物public void c(){}5、本地事务失效问题* 原因是事务是代理队列对象来控制的,同一个对象内事务方法互调失效,原因染过了代理对象* 解决:使用代理对象来调用事务方法* 1)、引入aop-starter;引入了aspectj* 2)、启动类中加@EnableAspectJAutoProxy(exposeProxy = true),开启aspectj动态代理功能。* exposeProxy = true:对外暴露代理对象* 以后所有的动态代理都是aspectj创建的,即使没有接口也可以创建动态代理* 3)、用代理对象在本类进行互调@Transactional(timeout = 30)//a事务的所有设置就传播到了和他公用一个事务的方法 public void a(){ //b、c做任何设置都没用,都是和a共用同一个事务,在同一个service情况下 //原因是事务是代理队里对象来控制的 //同一个对象内事务方法互调失效,原因染过了代理对象// b();//a事务,b共所有事务设置用a的// c();//新事物(不回滚) OrderServiceImpl orderService = (OrderServiceImpl) AopContext.currentProxy(); orderService.b();//这样b、c事务就可以设置属于自己的隔离级别和传播机制和超时时间等 orderService.c(); int i =10/0; } @Transactional(propagation = Propagation.REQUIRED,timeout = 2) public void b(){ } @Transactional(propagation = Propagation.REQUIRES_NEW)//新事物 public void c(){ }
* 分布式事务
* CAP定理(三者区其二,分区容错必选,一致和可用二选一)
* 一致性:在分布式系统中的所有数据备份,在同一时刻是否同样的值(等同于所有节点访问同一份最新的数据副本)raft算法保证一致性(自旋时间选领导,心跳时间发日志)
* 可用性:在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求(对数据更新具备高可用性)
* 分区容错性:大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition),分区容错意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信
* 一般来说分区容错无法避免,因此可以认为P总是成立的。C和A无法同时做到
* 面临问题:
* 对于多数大型互联网应用的场景,主机众多,部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到99.99999%(N个9),即保证P和A,舍弃C。
* BASE理论:对CAP理论的延伸,思想是即使无法做到强一致性(CAP的一致性是强一致性),但可以采用适当的弱一致性,即最终一致性
* 基本可用:基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分的可用性。需要注意的是,基本可用绝不等价系统不可用
* 响应时间上的损失:正常情况下搜索引擎要在0.5秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到1~2秒
* 功能上的损失:购物网在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
* 软状态:软状态是指允许系统存在中间状态,而该 中间状态不会影响系统的整个可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication的异步复制也是一种体现。
* 最终一致性:最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。软一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况(P286)
* 强一致性、弱一致性、最终一致性
* 从客户端角度,多进程并发访问时,更新过的数据在不同的进程如何获取的不同策略,决定了不同的一致性。对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这就是强一致性。如果能容忍后续的部分或者全部访问不到,则是弱一致性。如果经过一段时间后要求能访问到更新数据,则是最终一致性
* 分布式事务几种方案
* 2PC模式:二阶提交
* 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是都可以提交
* 第二阶段:事务协调器要求每个数据库提交数据
* 其中,任何一个数据库否决此次提交,那么所有数据库都会被要去回滚他们再次事务中的那部分信息
* XA协议比较简单,而且一旦商业数据库实现了XA协议,使用分布式事务的成本也比较低
* XA协议性能不理想,特别是在交易下单链路,往往并发量高,XA无法满足高并发场景
* XA目前在商业数据库支持的比较理想,在mysql数据库中支持的不太理想,mysql的XA实现,没有记录prepare的阶段日志,主备切换导致主库和从库数据不一致
* 许多nosql也没有支持XA,这让XA的应用场景变得非常狭隘
* 也有3PC,引入了超时机制(无论协调者还是参与者,字啊向对方发送请求后,若长时间未收到回应则做出相应处理)
* 柔性事务-TCC事务补偿型方案(有远程调用适合AT,有高并发情况适合TCC)
* 刚性事务:遵循ACID原则,强一致性
* 柔性事务:遵循BASE理论,最终一致性
* 与刚性事务不同柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致
* 一阶段prepare行为:调用自定义的prepare逻辑
* 二阶段commit行为:调用自定义的commit逻辑
* 三阶段rollback行为:调用自定义的rollback逻辑
* 所谓TCC模式,是指支持把自定义的分支事务纳入到全局事务管理中
* 柔性事务-最大努力通知方案
* 按规律进行通知,不包含智能数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要结合与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合MQ进行实现,例如:通过MQ发送http请求,设置最大通知次数。达到通知次数后即不在通知
* 案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调。
* 柔性事务-可靠消息—最终一致性方案(异步确保型)
* 实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在确认发送指令后,实时消息服务才会真正发送
* 阿里巴巴的Seata分布式事务(http://seata.io/zh-cn/docs/overview/what-is-seata.html)
* 术语:
* TC:事务协调者-》维护全局和分支事务的状态,驱动去哪聚事务提交和回滚
* TM:事务管理器-》定义全局事务的范围:开始全局事务、提交和回滚全局事务
* RM:资源管理器-》管理分支事务处理资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚
* SEATA AT模式需要 UNDO_LOG表
CREATE TABLE undo_log
( id
bigint(20) NOT NULL AUTO_INCREMENT, branch_id
bigint(20) NOT NULL, xid
varchar(100) NOT NULL, context
varchar(128) NOT NULL, rollback_info
longblob NOT NULL, log_status
int(11) NOT NULL, log_created
datetime NOT NULL, log_modified
datetime NOT NULL, ext
varchar(100) DEFAULT NULL, PRIMARY KEY (id
), UNIQUE KEY ux_undo_log
(xid
,branch_id
) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
* AT模式
* 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
* 二阶段:
* 提交异步化,非常快速地完成
* 回滚通过一阶段日志进行反向补偿
* 写隔离
* 一阶段本地事务提交前,需要确保先拿到全局锁
* 拿不到全局锁,不能提交本地事务
* 那全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,方式本地所
以一个示例来说明:两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
* 读隔离
* 在数据库本地事务隔离级别读一提交(read commited)或以上基本上,seata(AT模式)的默认全局隔离级别是读未提交。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
* 库存解锁
* RabbitMQ的延时队列(实现定时任务)
* 场景:比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品
* 常用解决方案:spring的schedule定时任务轮询数据库
* 缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差
* 解决:rabbitmq的消息TTL和死信Exchange结合
* 消息的TTL就是消息的存活时间。
* rabbitmq可以对队列和消息分别设置TTL
* 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
* 如果队列队列设置了,消息也设置了,那么会取小。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果
* 死信(DLX):一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列
* 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/basic.nack)requeue=false
* 上面的消息的ttl到了,消息过期了
* 死信队列其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange 的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
* 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合两者,其实就可以实现一个延时队列
队列设置过期时间(推荐)消息设置过期时间延时队列模拟延时队列
* 库存解锁场景
* 下订单成功,订单过期没有支付被系统自动取消、被用户手动取消,都需要解锁库存
* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁
* 保证消息可靠性
* 消息丢失
* 消息发出去,由于网络原因没有抵达服务器
* 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期三秒重发的方式
* 做好日志记录,每个消息状态是否被服务器收到都应记录
* 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发
* 消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机
* publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态
* 自动ACK的状态下。消费者收到消息,但为来得及消费消息宕机
* 一定开启手动ack,消费成功才移除,失败或者没有来得及处理就noAck并重新入队
* 消息重复
* 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
* 消息消费失败,由于重试机制,自动又将消息发送出去
* 成功消息,ack时宕机,消息由unack变为ready,Broker有重新发送
* 消息者的业务消费接口应该设计为幂等性的。比如扣库存有工作但的状态标志
* 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
* rabbitmq的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递
* 消息积压
* 消费者宕机积压
* 消费者消费能力不足积压
* 发送者发送流量过大
* 上线更多的消费者,进行正常消费
* 上线专门的队列消费服务,将消息先批量去出来,记录数据库,离线慢慢处理
* 支付宝收单
上一篇: 记录JavaWeb项目部署的一些坑
下一篇: Postman是什么 怎么用