分布式事务框架tcc_transaction第二篇——源码详解
框架思想简介
主要通过spring-aop拦截器CompensableTransactionInterceptor、ResourceCoordinatorInterceptor
CompensableTransactionInterceptor : 用于tcc事务的流程执行begin(try)、commit(confirm)、rollback(cancel)
ResourceCoordinatorInterceptor : 用于记录tcc事务的Participant(参与方)
回顾下TCC事务模型:
try阶段是由业务方发起调用,而confirm和cancel由事务管理器自动调用。在这里,事务管理器就是CompensableTransactionInterceptor,confirm和cancel方法由它自动调用
源码分析
具体http rest模式下使用可参考第一篇,本文以此作为讲解
使用入口主要是注解
org.mengyun.tcctransaction.api.Compensable
框架对这个注解配置了两个拦截器,在tcc-transaction.xml中(本人开发的spring-boot模式,在TccTransactionAutoConfiguration内)
<bean id="compensableTransactionAspect" class="org.mengyun.tcctransaction.spring.ConfigurableTransactionAspect"
init-method="init">
<property name="transactionConfigurator" ref="transactionConfigurator"/>
</bean>
<bean id="resourceCoordinatorAspect" class="org.mengyun.tcctransaction.spring.ConfigurableCoordinatorAspect"
init-method="init">
<property name="transactionConfigurator" ref="transactionConfigurator"/>
</bean>
ConfigurableTransactionAspect的优先级比ConfigurableCoordinatorAspect高,也就是执行时先走CompensableTransactionInterceptor再走ResourceCoordinatorInterceptor
CompensableTransactionInterceptor
注意:下面源码解析中的参与者,实际包含:事务发起方、事务参与方、远程事务参与方
事务发起方(本身也是一个参与者)使用
//开启tcc事务,并且当前order是第一个参与者
@Compensable(confirmMethod = "confirmOrder", cancelMethod = "cancelOrder")
public String orderTcc(String tccTrace) {
tccMember(null, tccTrace);
}
事务参与方(事务参与方与事务发起方在一个系统,就在事务发起方的try方法里)使用,并且方法第一个入参是TransactionContext
@Compensable(confirmMethod = "tccMember", cancelMethod = "tccMember", propagation = Propagation.SUPPORTS)
public TmcResponse<String> tccMember(TransactionContext transactionContext, String tccTrace) {
//发起远程调用
//到memberTry(transactionContext, tccTrace);
}
远程事务参与方(在另外一个系统,下面源码讲解注意与事务参与方区分)使用,并且方法第一个参数是TransactionContext
@Compensable(confirmMethod = "memberConfirm", cancelMethod = "memberCancel")
@Transactional
public void memberTry(TransactionContext transactionContext, String tccTrace){
log.info("member try starting...");
}
==========================================================
看下CompensableTransactionAspect#interceptCompensableMethod为拦截器执行入口
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
到CompensableTransactionInterceptor#interceptCompensableMethod
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
//compensableMethodContext包含transactionContext,从被拦截的方法入参中获取,通过.Compensable#transactionContextEditor(默认DefaultTransactionContextEditor)
//如果是事务发起方transactionContext就为null,事务参与方就取方法参数里的transactionContext
//(从代码里看,也不一定非要transactionContext在方法参数第一位,只要有这个参数就行)
CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp);
//当前线程是否绑定事务Transaction,事务发起方调用为null,远程事务参与方一开始也为null
boolean isTransactionActive = transactionManager.isTransactionActive();
//看下面
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName());
}
switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
case ROOT://事务发起方走这里
return rootMethodProceed(compensableMethodContext); //看下面
case PROVIDER://远程事务参与方走这里
return providerMethodProceed(compensableMethodContext); //看下面
default://事务参与方走这里,会走到ResourceCoordinatorInterceptor
return pjp.proceed();
}
}
CompensableMethodContext#getMethodRole
public MethodRole getMethodRole(boolean isTransactionActive) {
//Compensable.propagation默认是REQUIRED,也就是事务发起方为REQUIRED
//事务参与方配置了为SUPPORTS
//远程事务参与方没有配置默认REQUIRED
if ((propagation.equals(Propagation.REQUIRED) && !isTransactionActive && transactionContext == null) ||
propagation.equals(Propagation.REQUIRES_NEW)) {
return MethodRole.ROOT;//事务发起方
} else if ((propagation.equals(Propagation.REQUIRED) || propagation.equals(Propagation.MANDATORY)) && !isTransactionActive && transactionContext != null) {
return MethodRole.PROVIDER;//远程事务参与方(服务提供者)
} else {
return MethodRole.NORMAL;//事务参与方(也就是官方文档为什么要说配置成SUPPORTS)
}
}
CompensableMethodContext#rootMethodProceed,这就是框架的核心,事务流程代码,其实可以发现,它和spring本身的事务管理挺像得,两者的思想基本一致吧
private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {
Object returnValue = null;
Transaction transaction = null;
//是否开启异步confirm模式,这样可以提高性能
//Compensable#asyncConfirm是否为true
boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
//是否开启异步confirm模式,这样可以提高性能
//Compensable#asyncCancel是否为true
boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();
Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>();
allDelayCancelExceptions.addAll(this.delayCancelExceptions);
allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions()));
try {
//创建事务,并持久化,绑定事务到当前线程,看下面
transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());
try {
//这里直接走到ResourceCoordinatorInterceptor
//其实这里最后,就是执行参与者的try方法
returnValue = compensableMethodContext.proceed();
} catch (Throwable tryingException) {
if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) {
logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
//如果try失败,就执行参与者的cancel方法
transactionManager.rollback(asyncCancel);
}
throw tryingException;
}
//try成功就执行参与者的confirm方法
transactionManager.commit(asyncConfirm); //rollback和commit,就看下commit,看下面
} finally {
//清除线程中缓存的Transaction
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}
TransactionManager#begin(java.lang.Object)
public Transaction begin(Object uniqueIdentify) {
Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT);
//事务持久化,这里支持jdbc\redis\zookeeper\file等
transactionRepository.create(transaction);
//将Transaction绑定到当前线程
registerTransaction(transaction);
return transaction;
}
事务提交,也就是事务管理器自动调用confirm方法,rollback方法同理,参考TransactionManager#commit
public void commit(boolean asyncCommit) {
//获取当前线程绑定的事务
final Transaction transaction = getCurrentTransaction();
//事务状态变更为确认中(2)
transaction.changeStatus(TransactionStatus.CONFIRMING);
//持久化更新
transactionRepository.update(transaction);
//如果开启异步confirm
if (asyncCommit) {
try {
Long statTime = System.currentTimeMillis();
executorService.submit(new Runnable() {
@Override
public void run() {
commitTransaction(transaction);
}
});
logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
} catch (Throwable commitException) {
logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
} else {
//没有开启异步
commitTransaction(transaction);
}
}
再贴下commitTransaction
private void commitTransaction(Transaction transaction) {
try {
//这里就是去执行事务参与者的confirm方法
transaction.commit();
//执行成功就把持久化的事务删除掉
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
}
Transaction#commit
public void commit() {
for (Participant participant : participants) {
participant.commit();(参与者的commit方法)
}
}
Participant#commit(participant怎么来的,看ResourceCoordinatorInterceptor)
public void commit() {
//反射调用confirm方法
Terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}
==========================================================
看下远程事务参与方(服务提供者)的执行代码:CompensableTransactionInterceptor#providerMethodProceed
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {
Transaction transaction = null;
//同rootMethodProceed
boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
//同rootMethodProceed
boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();
try {
//上面说了远程事务参与方的transactionContext不为null,由事务发起方传过来
switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
case TRYING: //事务发起方begin的时候,transaction状态为TRYING
//通过传过来的TransactionContext,在服务提供者创建Transaction并保持Transaction.xid一致,代表一个全局事务
transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());
//调用远程事务参与方的try方法
return compensableMethodContext.proceed();
case CONFIRMING: //事务发起方commit的时候,transaction状态为CONFIRMING
try {
transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
//调用远程事务参与方confirm方法
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING: //事务发起方rollback的时候,transaction状态为CANCELLING
try {
transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
//调用远程事务参与方的cancel方法
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
Method method = compensableMethodContext.getMethod();
return ReflectionUtils.getNullValue(method.getReturnType());
}
ResourceCoordinatorInterceptor
CompensableTransactionInterceptor#interceptCompensableMethod执行完到ResourceCoordinatorInterceptor#interceptTransactionContextMethod
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
//获取当前事务,事务发起方(rootMethodProceed中绑定线程)和远程事务参与方(providerMethodProceed中绑定线程)
Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null) {
//判断当前事务状态
switch (transaction.getStatus()) {
case TRYING:
//这里再说明下
//事务发起方和事务参与方在一个系统,然后他们在一个Transaction中,下面这个方法就是把它们抽象化为Participant放到Transaction中
//远程事务参与方在另外一个系统,会有自己的Transaction,只不过它的Transaction.xid与发起方系统的Transaction一致,上面已经解释
//Participant包括了try\confirm\cancel等方法,供事务管理器CompensableTransactionInterceptor调用
enlistParticipant(pjp);
break;
case CONFIRMING:
break; //直接走下去调用
case CANCELLING:
break;
}
}
//接着就去调用真正的try、confirm、cancel方法了
return pjp.proceed(pjp.getArgs());
}
补偿机制——重试
主要是TCC的confirm或这cancel阶段出现异常,会去重试调用。
主要逻辑在RecoverScheduledJob.TransactionRecovery中。
TransactionRecovery#startRecover
public void startRecover() {
//从持久化中查询未完成的事务
List<Transaction> transactions = loadErrorTransactions();
//事务恢复操作
recoverErrorTransactions(transactions);
}
看下如何获取未完成事务
TransactionRecovery#loadErrorTransactions
====>
TransactionRepository#findAllUnmodifiedSince
====>假设持久化方式为jdbc
JdbcTransactionRepository#doFindAllUnmodifiedSince
查询sql如下:(这里的表明是【“TCC_TRANSACTION” + tbSuffix】)
SELECT GLOBAL_TX_ID, BRANCH_QUALIFIER, CONTENT,STATUS,TRANSACTION_TYPE,CREATE_TIME,LAST_UPDATE_TIME,RETRIED_COUNT,VERSION FROM TCC_TRANSACTION WHERE LAST_UPDATE_TIME < ? AND IS_DELETE = 0
也就是获取(当前时间-recoverConfig.getRecoverDuration() * 1000)之前所有IS_DELETE=0的事务,正常情况事务执行完记录都会被删除,所以这个表记录肯定很小
上一篇: computeIfAbsent的使用
下一篇: J.U.C之AQS:阻塞和唤醒线程