微服务架构-利用事件驱动实现最终一致性
事务一致性
首先,我们来回顾一下acid原则:
- atomicity:原子性,改变数据状态要么是一起完成,要么一起失败
- consistency:一致性,数据的状态是完整一致的
- isolation:隔离线,即使有并发事务,互相之间也不影响
- durability:持久性, 一旦事务提交,不可撤销
在单体应用中,我们可以利用关系型数据库的特性去完成事务一致性,但是一旦应用往微服务发展,根据业务拆分成不用的模块,而且每个模块的数据库已经分离开了,这时候,我们要面对的就是分布式事务了,需要自己在代码里头完成acid了。比较流行的解决方案有:两阶段提交、补偿机制、本地消息表(利用本地事务和mq)、mq的事务消息(rocketmq)。
大家可以到此篇文章去了解一下:分布式事务的四种解决方案
cap定理
1998年,加州大学的计算机科学家 eric brewer 提出,分布式系统有三个指标。
- consistency:一致性
- availability:可用性
- partition tolerance:分区容错
eric brewer 说,这三个指标不可能同时做到。这个结论就叫做 cap 定理。
微服务中,不同模块之间使用的数据库是不同的,不同模块之间部署的服务去也有可能是不用的,那么分区容错是无法避免的,因为服务之间的调用不能保证百分百的没问题,所以系统设计必须考虑这种情况。因此,我们可以认为cap的p总是成立的,剩下的c和a无法同时做到。
实际上根据分布式系统中cap原则,当p(分区容忍)发生的时候,强行追求c(一致性),会导致(a)可用性、吞吐量下降,此时我们一般用最终一致性来保证我们系统的ap能力。当然不是放弃c,而是放弃强一致性,而且在一般情况下cap都能保证,只是在发生分区容错的情况下,我们可以通过最终一致性来保证数据一致。
事件驱动实现最终一致性
事件驱动架构在领域对象之间通过异步的消息来同步状态,有些消息也可以同时发布给多个服务,在消息引起了一个服务的同步后可能会引起另外消息,事件会扩散开。严格意义上的事件驱动是没有同步调用的。
例子:
在电商里面,用户下单必须根据库存来确定订单是否成交。
项目架构:springboot2+mybatis+tk-mybatis+activemq【因为小例子,不做成spring cloud架构】
首先,我们来看看正常的服务之间调用:
代码:
@override @transactional(rollbackfor = exception.class) public result placeorder(orderquery query) { result result = new result(); // 先远程调用stock-service去减少库存 resttemplate resttemplate = new resttemplate(); //请求头 httpheaders headers = new httpheaders(); headers.setcontenttype(mediatype.application_json); //封装成一个请求对象 httpentity entity = new httpentity(query, headers); // 同步调用库存服务的接口 result stockresult = resttemplate.postforobject("http://127.0.0.1:8081/stock/reducestock",entity,result.class); if (stockresult.getcode() == result.resultconstants.success){ order order = new order(); beanutils.copyproperties(query,order); order.setorderstatus(1); integer insertcount = ordermapper.insertselective(order); if (insertcount == 1){ result.setmsg("下单成功"); }else { result.setmsg("下单失败"); } }else { result.setcode(result.resultconstants.fail); result.setmsg("下单失败:"+stockresult.getmsg()); } return result; }
我们可以看到,这样的服务调用的弊端多多:
1、订单服务需同步等待库存服务的返回结果,接口结果返回延误。 2、订单服务直接依赖于库存服务,只要库存服务崩了,订单服务不能再正常运行。 3、订单服务需考虑并发问题,库存最后可能为负。
下面开始利用事件驱动实现最终一致性
1、在订单服务新增订单后,订单的状态是“已开启”,然后发布一个order created事件到消息队列上
代码:
@transactional(rollbackfor = exception.class) public result placeorderbymq(orderquery query) { result result = new result(); // 先创建订单,状态为下单0 order order = new order(); beanutils.copyproperties(query,order); order.setorderstatus(0); integer insertcount = ordermapper.insertselective(order); if (insertcount == 1){ // 发送 订单消息 mqordermsg mqordermsg = new mqordermsg(); mqordermsg.setid(order.getid()); mqordermsg.setgoodcount(query.getgoodcount()); mqordermsg.setgoodname(query.getgoodname()); mqordermsg.setstockid(query.getstockid()); jmsproducer.sendordercreatedmsg(mqordermsg); // 此时的订单只是开启状态 result.setmsg("下单成功"); } return result; }
2、库存服务在监听到消息队列ordercreated中的消息,将库存表中商品的库存减去下单数量,然后再发送一个stock locked事件给消息队列。
代码:
/** * 接收下单消息 * @param message 接收到的消息 * @param session 上下文 */ @jmslistener(destination = order_create,containerfactory = "mylistenercontainerfactory") @transactional(rollbackfor = exception.class) public void receiveordercreatedmsg(message message, session session){ try { if (message instanceof activemqobjectmessage){ mqstockmsg result = new mqstockmsg(); activemqobjectmessage objectmessage=(activemqobjectmessage)message; mqordermsg msg = (mqordermsg)objectmessage.getobject(); integer updatecount = stockmapper.updatenumbystockid(msg.getstockid(),msg.getgoodcount()); if (updatecount >= 1){ result.setsuccess(true); result.setorderid(msg.getid()); }else { result.setsuccess(false); } // 手动ack,使消息出队列,不然会不断消费 message.acknowledge(); // 发送库存锁定消息到mq jmsproducer.sendstocklockedmsg(result); } } catch (jmsexception e) { log.error("接收订单创建消息报错:"+e.getmessage()); } }
仔细的朋友可能会看到:message.acknowledge(),即手动确认消息。因为在保证库存服务的逻辑能正常执行后再确认消息已消费,可以保证消息的投递可靠性,万一在库存服务执行时报出异常,我们可以做到重新消费该下单消息。
3、订单服务接收到stock locked事件,将订单的状态改为“已确认”
代码:
/** * 判断是否还有库存,有库存更新订单状态为1,无库存更新订单状态为2,并且通知用户(websocket) * @param message */ @jmslistener(destination = stock_locked,containerfactory = "mylistenercontainerfactory") @transactional(rollbackfor = exception.class) public void receivestocklockedmsg(message message, session session){ try { if (message instanceof activemqobjectmessage){ activemqobjectmessage objectmessage=(activemqobjectmessage)message; mqstockmsg msg = (mqstockmsg)objectmessage.getobject(); if (msg.issuccess()){ order updateorder = new order(); updateorder.setid(msg.getorderid()); updateorder.setorderstatus(1); ordermapper.updatebyprimarykeyselective(updateorder); log.info("订单【"+msg.getorderid()+"】下单成功"); }else { order updateorder = new order(); updateorder.setid(msg.getorderid()); updateorder.setorderstatus(2); ordermapper.updatebyprimarykeyselective(updateorder); // 通知用户库存不足,订单被取消 log.error("订单【"+msg.getorderid()+"】因库存不足被取消"); } // 手动ack,使消息出队列,不然会不断消费 message.acknowledge(); } } catch (jmsexception e) { log.error("接收库存锁定消息报错:"+e.getmessage()); } }
同样,这里我们也是会利用手动确认消息来保证消息的投递可靠性。
至此,已经全部搞定了。我们看一下和正常的服务调用对比如何:
1、订单服务不再直接依赖于库存服务,而是将下单事件发送到mq中,让库存监听。 2、订单服务能真正的作为一个模块独立运行。 3、解决了并发问题,而且mq的队列处理效率非常的高。
但是也存在下面的问题:
1、用户体验改变了:因为使用事件机制,订单是立即生成的,可是很有可能过一会,系统会提醒你没货了。。这就像是排队抢购一样,排着排着就被通知没货了,不用再排队了。 2、数据库可能会存在很对没有完成下单的订单。
最后,如果真的要考虑用户体验,并且不想数据库存在很多不必要的数据,该怎么办?
那就把订单服务和库存服务聚合在一起吧。解决当前的问题应当是首先要考虑的,我们设计微服务的目的是本想是解决业务并发量。而现在面临的却是用户体验的问题,所以架构设计也是需要妥协的。
最主要是,我们是经过思考和分析的,每个方案能做到哪种程度,能应用到哪种场景。正所谓,技术要和实际场景结合,我们不能为了追求新技术而生搬硬套。