欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

分布式事务之解决方案(XA和2PC)

程序员文章站 2022-06-23 23:44:32
3. 分布式事务解决方案之2PC(两阶段提交) 针对不同的分布式场景业界常见的解决方案有2PC、TCC、可靠消息最终一致性、最大努力通知这几种。 3.1. 什么是2PC 2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase) ......

 

3. 分布式事务解决方案之2pc(两阶段提交)

针对不同的分布式场景业界常见的解决方案有2pc、tcc、可靠消息最终一致性、最大努力通知这几种。

3.1. 什么是2pc

2pc即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(prepare phase)、提交阶段(commit phase),2是指两阶段,p是指准备阶段,c是提交阶段。
举例 :张三和李四好久不见,老友约起聚餐,饭店老板要求先买单,才能出票。这时张三和李四分别抱怨近况不如意,囊肿羞涩,都不愿意请客,这时只能aa。只有张三和李四都付款,老板才能出票安排就餐。但由于张三和李四都是铁公鸡,形成两尴尬的一幕 :
准备阶段 :老板要求张三付款,张三付款。老板要求李四付款,李四付款。
提交阶段 :老板出票,两人拿票纷纷落座就餐。
例子中形成两一个事务,若张三或李四其中一个拒绝付款,或钱不够,店老板都不会给出票,并且会把已收款退回。
整个事务过程由事务管理器和参与者组成,店老板就是事务管理器,张三、李四就是事务参与者,事务管理器负责决策整个分布式事务的提交和回滚,事务参与者负责自己本地事务的提交和回滚。
在计算机中部分关系数据库如oracle、mysql支持两阶段提交协议,如下图 :
1. 准备阶段(prepare phase):事务管理器给每个参与者发送prepare消息,每个数据库参与者在本地执行事务,并写本地的undo/redo日志,此时事务没有提交。
(undo日志是记录修改前的数据,用于数据库回滚,redo日志是记录修改后的数据,用于提交事务后写入数据文件)
2. 提交阶段(commit phase):如果事务管理器收到两参与者的执行失败或者超时消息时,直接给每个参与者发送回滚(rollback)消息;否则,发送提交(commit)消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。注意 :必须在最后阶段释放锁资源。
下图展示两2pc的两个阶段,分成功和失败两个情况说明 :
成功情况 :
分布式事务之解决方案(XA和2PC)
失败情况 :
分布式事务之解决方案(XA和2PC)

3.2. 解决方案

3.2.1 xa方案

2pc的传统方案是在数据库层面实现的,如oracle、mysql都支持2pc协议,为了统一标准减少行业内不必要的对接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织open group定义分布式事务处理模型dtp(distributed transaction processing reference model)。
为了让大家更明确xa方案的内容,下面新用户注册送积分为例来说明 :
分布式事务之解决方案(XA和2PC)
执行流程如下 :
1、应用程序(ap)持有用户库和积分库两个数据源。
2、应用程序(ap)通过tm通知用户库rm新增用户,同时通知积分库rm为该用户新增积分,rm此时并未提交事务,此时用户和积分资源锁定。
3、tm收到执行回复,只要有一方失败则分别向其他rm发起回滚事务,回滚完毕,资源锁释放。
4、tm收到执行回复,全部成功,此时向所有rm发起提交事务,提交完毕,资源锁释放。
dtp模型定义如下角色 :

  • ap(application program) : 既应用程序,可以理解为使用dtp分布式事务的程序。
  • rm(resource manager) : 即资源管理器,可以理解为事务的参与者,一般情况下是指一个数据库实例,通过资源管理器对该数据库进行控制,资源管理器控制着分支事务。
  • tm(transaction manager) : 事务管理器,负责协调和管理事务,事务管理器控制着全局事务,管理事务生命周期,并协调各个rm。全局事务是指分布式事务处理环境中,需要操作多个数据库共同完成一个工作,这个工作即是一个全局事务。
  • dtp模型定义tm和rm之间通讯的接口规范叫xa,简单理解为数据库提供的2pc接口协议,基于数据库的xa协议来实现2pc又称为xa方案
  • 以上三个角色之间的交互方式如下 :
    1)tm向ap提供应用程序编程接口,ap通过tm提交及回滚事务。
    2)tm交易中间件通过xa接口来通知rm数据库事务的开始、结束以及提交、回滚等。
    总结 :
    整个2pc的事务流程涉及到三个角色ap、rm、tm。ap指的是使用2pc分布式事务的应用程序;rm指的是资源管理器,它控制着分支事务;tm指的是事务管理器,它控制着整个全局事务。
    1)在准备阶段rm执行实际的业务操作,但不提交事务,资源锁定;
    2)在提交阶段tm会接收rm在准备阶段的执行回复,只要有任一个rm执行失败,tm会通知所有rm执行回滚操作,否则,tm将会通知所有rm提交该事务。提交阶段结束资源锁释放。
    xa方案的问题 :
    1、需要本地数据库支持xa协议。
    2、资源锁需要等到两个阶段结束才释放,性能较差。

3.2.2 seata方案

seata是阿里中间件团队发起的开源项目fescar,后更名seata,它是一个是开源的分布式事务框架。传统2pc的问题在seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0入侵的方式解决微服务场景下面临的分布式事务问题,它目前提供at模式(即2pc)及tcc模式的分布式事务解决方案。
seata的设计思想如下 :
seata的设计目标其一是对业务无入侵,因此从业务无入侵的2pc方案着手,在传统2pc的基础上演进,并解决2pc方案面临的问题。
seata把一个分布式事务理解成一个包含来若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务,下图是全局事务与分支事务的关系图 :
分布式事务之解决方案(XA和2PC)
与传统2pc的模型类似,seata定义了三个组件来协议分布式事务的处理过程 :
分布式事务之解决方案(XA和2PC)

  • transaction coordinator(tc):事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收tm指令发起全局事务的提交与回滚,负责与rm通信协调各个分支事务的提交或回滚。
  • transaction manager(tm):事务管理器,tm需要嵌入应用程序中工作,它负责开启一个全局事务,并最终向tc发起全局提交或全局回滚的指令。
  • resource manager(rm):控制分支事务,负责分支注册、状态汇报,并接收事务协调器tc的指令,驱动分支(本地)事务的提交和回滚。
    还拿新用户注册送积分举例seata的分布式事务过程 :
    分布式事务之解决方案(XA和2PC)
    具体的执行流程如下 :
  1. 用户服务的tm向tc申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的xid。
  2. 用户服务的rm向tc注册分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入xid对应全局事务的管辖。
  3. 用户服务执行分支事务,向用户表插入一条记录。
  4. 逻辑执行到远程调用积分服务时(xid在微服务调用链路的上下文中传播)。积分服务的rm向tc注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入xid对应全局事务的管辖。
  5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。
  6. 用户服务分支事务执行完毕。
  7. tm向tc发起针对xid的全局提交或回滚决议。
  8. tc调度xid下管辖的全部分支事务完成提交或回滚请求。

seata实现2pc与传统2pc的差别 :
架构层次方面,传统2pc方案的rm实际上是在数据库层,rm本质上就是数据库自身,通过xa协议实现,而seata的rm是以jar包的形式作为中间件层部署在应用程序的这一侧的。
两阶段提交方面,传统2pc无论第二阶段的决议是commit还是rollbcak,事务性资源的锁都要保持到phase2完成才释放。而seata的做法是在phase1就将本地事务提交,这样就可以省去phase2持锁的时间,整体提高效率。

3.3. seata实现2pc事务

3.3.1. 业务说明

本实例通过seata中间件实现分布式事务,模拟两个账户的转账交易过程。两个账户在两个不同的银行(张三在bank1、李四在bank2),bank1和bank2是两个微服务。交易过程中,张三给李四转账制定金额。
上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。
分布式事务之解决方案(XA和2PC)

3.3.2.程序组成部分

本实例程序组成 部分如下 :
数据库 :mysql-5.7.25
包括bank1和bank2两个数据库。
jdk:1.8
微服务框架 :spring-boot-2.1.3、spring-cloud-greenwich.release
seata客户端(rm、tm):spring-cloud-alibaba-seata-2.1.0release
seata服务端(tc):seata-server-0.7.1
微服务及数据库的关系 :
dtx/dtx-seata-demo/seata-demo-bank1 银行1,操作张三账户,链接数据库bank1
dtx/dtx-seata-demo/seata-demo-bank2 银行2,操作李四账户,链接数据库bank2
服务注册中兴 :dtx/discover-server
本实例程序技术架构如下 :
分布式事务之解决方案(XA和2PC)
交互流程如下 :
1、请求bank1进行转账,传入转账金额。
2、bank1减少转账金额,调用bank2,传入转账金额。

3.3.3.创建数据库

bank1库,包含张三账户

create database /*!32312 if not exists*/`bank1` /*!40100 default character set utf8 */;

use `bank1`;

/*table structure for table `account_info` */

drop table if exists `account_info`;

create table `account_info` (
  `id` bigint(20) not null auto_increment,
  `account_name` varchar(100) collate utf8_bin default null comment '户主姓名',
  `account_no` varchar(100) collate utf8_bin default null comment '银行卡号',
  `account_password` varchar(100) collate utf8_bin default null comment '帐户密码',
  `account_balance` double default null comment '帐户余额',
  primary key (`id`) using btree
) engine=innodb auto_increment=3 default charset=utf8 collate=utf8_bin row_format=dynamic;

/*data for the table `account_info` */

insert  into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'张三','1',null,1000);

/*table structure for table `de_duplication` */

drop table if exists `de_duplication`;

create table `de_duplication` (
  `tx_no` varchar(64) collate utf8_bin not null,
  `create_time` datetime default null,
  primary key (`tx_no`) using btree
) engine=innodb default charset=utf8 collate=utf8_bin row_format=dynamic;

/*data for the table `de_duplication` */

/*table structure for table `local_cancel_log` */

drop table if exists `local_cancel_log`;

create table `local_cancel_log` (
  `tx_no` varchar(64) not null comment '事务id',
  `create_time` datetime default null,
  primary key (`tx_no`)
) engine=innodb default charset=utf8;

/*data for the table `local_cancel_log` */

/*table structure for table `local_confirm_log` */

drop table if exists `local_confirm_log`;

create table `local_confirm_log` (
  `tx_no` varchar(64) not null comment '事务id',
  `create_time` datetime default null,
  primary key (`tx_no`)
) engine=innodb default charset=utf8;

/*data for the table `local_confirm_log` */

/*table structure for table `local_trade_log` */

drop table if exists `local_trade_log`;

create table `local_trade_log` (
  `tx_no` bigint(20) not null,
  `create_time` datetime default null,
  primary key (`tx_no`) using btree
) engine=innodb default charset=utf8 collate=utf8_bin row_format=dynamic;
drop table if exists `local_try_log`;

create table `local_try_log` (
  `tx_no` varchar(64) not null comment '事务id',
  `create_time` datetime default null,
  primary key (`tx_no`)
) engine=innodb default charset=utf8;

/*data for the table `local_try_log` */

/*table structure for table `undo_log` */

drop table if exists `undo_log`;

create table `undo_log` (
  `id` bigint(20) not null auto_increment,
  `branch_id` bigint(20) not null,
  `xid` varchar(100) not null,
  `context` varchar(128) not null,
  `rollback_info` longblob not null,
  `log_status` int(11) not null,
  `log_created` datetime not null,
  `log_modified` datetime not null,
  `ext` varchar(100) default null,
  primary key (`id`),
  unique key `ux_undo_log` (`xid`,`branch_id`)
) engine=innodb auto_increment=167 default charset=utf8;

/*data for the table `undo_log` */

insert  into `undo_log`(`id`,`branch_id`,`xid`,`context`,`rollback_info`,`log_status`,`log_created`,`log_modified`,`ext`) values (166,2019228885,'192.168.1.101:8888:2019228047','serializer=jackson','{}',1,'2019-08-11 15:16:43','2019-08-11 15:16:43',null);

bank2库,包含李四账户

create database /*!32312 if not exists*/`bank2` /*!40100 default character set utf8 */;

use `bank2`;

/*table structure for table `account_info` */

drop table if exists `account_info`;

create table `account_info` (
  `id` bigint(20) not null auto_increment,
  `account_name` varchar(100) collate utf8_bin default null comment '户主姓名',
  `account_no` varchar(100) collate utf8_bin default null comment '银行卡号',
  `account_password` varchar(100) collate utf8_bin default null comment '帐户密码',
  `account_balance` double default null comment '帐户余额',
  primary key (`id`) using btree
) engine=innodb auto_increment=4 default charset=utf8 collate=utf8_bin row_format=dynamic;

/*data for the table `account_info` */

insert  into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (3,'李四的账户','2',null,0);

/*table structure for table `de_duplication` */

drop table if exists `de_duplication`;

create table `de_duplication` (
  `tx_no` varchar(64) collate utf8_bin not null,
  `create_time` datetime default null,
  primary key (`tx_no`) using btree
) engine=innodb default charset=utf8 collate=utf8_bin row_format=dynamic;

/*data for the table `de_duplication` */

/*table structure for table `local_cancel_log` */

drop table if exists `local_cancel_log`;

create table `local_cancel_log` (
  `tx_no` varchar(64) not null comment '事务id',
  `create_time` datetime default null,
  primary key (`tx_no`)
) engine=innodb default charset=utf8;

/*data for the table `local_cancel_log` */

/*table structure for table `local_confirm_log` */

drop table if exists `local_confirm_log`;

create table `local_confirm_log` (
  `tx_no` varchar(64) not null comment '事务id',
  `create_time` datetime default null
) engine=innodb default charset=utf8;

/*data for the table `local_confirm_log` */

/*table structure for table `local_trade_log` */

drop table if exists `local_trade_log`;

create table `local_trade_log` (
  `tx_no` bigint(20) not null,
  `create_time` datetime default null,
  primary key (`tx_no`) using btree
) engine=innodb default charset=utf8 collate=utf8_bin row_format=dynamic;
drop table if exists `local_try_log`;

create table `local_try_log` (
  `tx_no` varchar(64) not null comment '事务id',
  `create_time` datetime default null,
  primary key (`tx_no`)
) engine=innodb default charset=utf8;

/*data for the table `local_try_log` */

/*table structure for table `undo_log` */

drop table if exists `undo_log`;

create table `undo_log` (
  `id` bigint(20) not null auto_increment,
  `branch_id` bigint(20) not null,
  `xid` varchar(100) not null,
  `context` varchar(128) not null,
  `rollback_info` longblob not null,
  `log_status` int(11) not null,
  `log_created` datetime not null,
  `log_modified` datetime not null,
  `ext` varchar(100) default null,
  primary key (`id`),
  unique key `ux_undo_log` (`xid`,`branch_id`)
) engine=innodb default charset=utf8;

3.3.4.启动tc(事务协调器)

(1)下载seata服务器
下载地址 :
(2)解压并启动
winodws :【seata服务端解压路径】/bin/seata-server.bat -p 8888 -m file
mac/linux : 【seata服务端解压路径】nohup sh seata-server.sh -p 8888 -h 127.0.0.1 -m file &> seata.log &
注 :其中8888为服务端口号;file为启动模式,这里指seata服务将采用文件的方式存储信息。
分布式事务之解决方案(XA和2PC)
如上图出现“server started。。。“的字样则表示启动成功。

3.3.5 discover-server

discover-server是服务注册中心,测试工程将自己注册至discover-server。

3.3.6 创建dtx-seata-demo

dtx-seata-demo是seata的测试工程,根据业务需求需要创建两个dex-seata-demo工程。
(1)父工程maven依赖说明
在dtx父工程中指定了springboot和springcloud版本
分布式事务之解决方案(XA和2PC)
在dtx-seata-demo父工程中指定了spring-cloud-alibaba-dependencies的版本。
分布式事务之解决方案(XA和2PC)
(3)配置seata
在src/main/resource中,新增registry.conf、file.conf文件,内容可拷贝seata-server-0.7.1中的配置文件子。 在registry.conf中registry.type使用file:
分布式事务之解决方案(XA和2PC)
在file.conf中更改service.vgroup_mapping.[springcloud服务名]-fescar-service-group = “default”,并修改 service.default.grouplist =[seata服务端地址]
分布式事务之解决方案(XA和2PC)
关于vgroup_mapping的配置:
vgroup_mapping.事务分组服务名=seata server集群名称(默认名称为default) default.grouplist = seata server集群地址
在 org.springframework.cloud:spring-cloud-starter-alibaba-seata 的 org.springframework.cloud.alibaba.seata.globaltransactionautoconfiguration 类中,默认会使用 ${spring.application.name}-fescar-service-group 作为事务分组服务名注册到 seata server上,如果和 file.conf 中的配置不一致,会提示 no available server to connect 错误
也可以通过配置 spring.cloud.alibaba.seata.tx-service-group 修改后缀,但是必须和 file.conf 中的配置保持 一致。
(4)创建代理数据源
新增databaseconfiguration.java,seata的rm通过datasourceproxy才能在业务代码的事务提交时,通过这个切 入点,与tc进行通信交互、记录undo_log等。

@configuration
public class databaseconfiguration { 
	@bean
	@configurationproperties(prefix = "spring.datasource.ds0") 
	public 	druiddatasource ds0() {
	druiddatasource druiddatasource = new druiddatasource(); 
	return 		druiddatasource;
}
	@primary
	@bean
	public datasource datasource(druiddatasource ds0) {
	datasourceproxy pds0 = new datasourceproxy(ds0);
	return pds0; 
	}
}

3.3.7 seata执行流程

1、正常提交流程
分布式事务之解决方案(XA和2PC)
2、回滚流程
回滚流程省略前的rm注册过程。
分布式事务之解决方案(XA和2PC)
要点说明 :
1、每个rm使用datasourceproxy连接数据库,其目的是使用connectionproxy,使用数据源和数据连接代理的目的就是第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有undo_log.
2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。
3、tm开启全局事务开始,将xid全局事务id放在事务上下午中,通过feign调用也将xid传入下游分支事务,每个分支事务将自己的branch id分支事务id与xid关联。
4、第二阶段全局事务提交,tc会通知各个分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各个参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚,tc会通知各个分支参与者回滚分支事务,通过xid和branch id找到相应的回滚日志,通过回滚日志生成反向的sql并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操作。

3.3.8 dtx-seata-demo-bank1

dtx-seata-demo-bank1实现如下功能:
1、张三账户减少金额,开启全局事务。
2、远程调用bank2向李四转账。
(1)dao

@mapper
@component
public interface accountinfodao {

	//更新账户金额
@update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountno}")
int updateaccountbalance(@param("accountno") string accountno, @param("amount") double amount);

}

(2) feignclient
远程调用bank2的客户端

 @feignclient(value = "seata‐demo‐bank2",fallback = bank2clientfallback.class) public interface bank2client {
@getmapping("/bank2/transfer")
string transfer(@requestparam("amount") double amount); 
}
@component
public class bank2clientfallback implements bank2client{
@override
public string transfer(double amount) {
return "fallback"; }
}

(3)service

@service
public class accountinfoserviceimpl implements accountinfoservice {
private logger logger = loggerfactory.getlogger(accountinfoserviceimpl.class);
@autowired
accountinfodao accountinfodao;
@autowired
bank2client bank2client;
//张三转账
@override
@globaltransactional
@transactional
public void updateaccountbalance(string accountno, double amount) {
	logger.info("******** bank1 service begin ... xid: {}" , rootcontext.getxid()); //张三扣减金额
	accountinfodao.updateaccountbalance(accountno,amount*‐1);
	//向李四转账
	string remoterst = bank2client.transfer(amount); //远程调用失败
	if(remoterst.equals("fallback")){
	throw new runtimeexception("bank1 下游服务异常"); }
	//人为制造错误 if(amount==3){
	throw new runtimeexception("bank1 make exception 3"); }
	} 
}

将@globaltransactional注解标注在全局事务发起的service实现方法上,开启全局事务 :
globaltransactionalinterceptor会拦截@globaltransactional注解的方法,生成全局事务id(xid),xid会在整个分布式事务中传递。
在远程调用时,spring-cloud-alibaba-seata会拦截feign调用将xid传递到下游服务。
(6)controller

@restcontroller
public class bank1controller {
@autowired
accountinfoservice accountinfoservice;
//转账
@getmapping("/transfer")
public string transfer(double amount){
accountinfoservice.updateaccountbalance("1",amount);
return "bank1"+amount; }
}

3.3.9 dtx-seata-demo-bank2

dtx-seata-demo-bank2实现如下功能:
1、李四账户增加金额。
dtx-seata-demo-bank2在本账户事务中作为分支事务不使用@globaltransactional。
(1)dao

 @mapper
@component
public interface accountinfodao {
//向李四转账
@update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountno}")
int updateaccountbalance(@param("accountno") string accountno, @param("amount") double amount);
}

(2)service

@service
public class accountinfoserviceimpl implements accountinfoservice {
private logger logger = loggerfactory.getlogger(accountinfoserviceimpl.class);
@autowired
accountinfodao accountinfodao;
@override 
@transactional
public void updateaccountbalance(string accountno, double amount) { logger.info("******** bank2 service begin ... xid: {}" , rootcontext.getxid()); //李四增加金额
accountinfodao.updateaccountbalance(accountno,amount);
//制造异常
if(amount==2){
throw new runtimeexception("bank1 make exception 2"); }
} 
}

(3)controller

 @restcontroller
public class bank2controller {
@autowired
accountinfoservice accountinfoservice;
@getmapping("/transfer")
public string transfer(double amount){
accountinfoservice.updateaccountbalance("2",amount);
return "bank2"+amount; 
}
}

3.3.10 测试场景

  • 张三向李四转账成功。
  • 李四事务失败,张三事务回滚成功。
  • 张三事务失败,李四事务回滚成功。
  • 分支事务超时测试。

3.4. 小结

传统2pc(基于数据库xa协议)和seata实现2pc的两种2pc方案,由于seata的零入侵并且解决了传统2pc长期锁资源的问题,所以推荐采用seata实现2pc。
seata实现2pc要点 :
1、全局事务开始使用globaltransactional标识。
2、每个本地事务方案仍然使用@transactional标识。
3、每个数据都需要创建undo_log表,此表是seata保证本地事务一致性的关键。