欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式事务框架tcc_transaction第二篇——源码详解

程序员文章站 2022-05-04 20:59:30
...

框架思想简介

主要通过spring-aop拦截器CompensableTransactionInterceptor、ResourceCoordinatorInterceptor
CompensableTransactionInterceptor : 用于tcc事务的流程执行begin(try)、commit(confirm)、rollback(cancel)
ResourceCoordinatorInterceptor : 用于记录tcc事务的Participant(参与方)

回顾下TCC事务模型:
分布式事务框架tcc_transaction第二篇——源码详解
try阶段是由业务方发起调用,而confirm和cancel由事务管理器自动调用。在这里,事务管理器就是CompensableTransactionInterceptor,confirm和cancel方法由它自动调用

源码分析

具体http rest模式下使用可参考第一篇,本文以此作为讲解

使用入口主要是注解

org.mengyun.tcctransaction.api.Compensable

框架对这个注解配置了两个拦截器,在tcc-transaction.xml中(本人开发的spring-boot模式,在TccTransactionAutoConfiguration内)

	<bean id="compensableTransactionAspect" class="org.mengyun.tcctransaction.spring.ConfigurableTransactionAspect"
          init-method="init">
        <property name="transactionConfigurator" ref="transactionConfigurator"/>
    </bean>

    <bean id="resourceCoordinatorAspect" class="org.mengyun.tcctransaction.spring.ConfigurableCoordinatorAspect"
          init-method="init">
        <property name="transactionConfigurator" ref="transactionConfigurator"/>
    </bean>

ConfigurableTransactionAspect的优先级比ConfigurableCoordinatorAspect高,也就是执行时先走CompensableTransactionInterceptor再走ResourceCoordinatorInterceptor

CompensableTransactionInterceptor

注意:下面源码解析中的参与者,实际包含:事务发起方、事务参与方、远程事务参与方
事务发起方(本身也是一个参与者)使用

	//开启tcc事务,并且当前order是第一个参与者
    @Compensable(confirmMethod = "confirmOrder", cancelMethod = "cancelOrder")
    public String orderTcc(String tccTrace) {
    	tccMember(null, tccTrace);
    }

事务参与方(事务参与方与事务发起方在一个系统,就在事务发起方的try方法里)使用,并且方法第一个入参是TransactionContext

	@Compensable(confirmMethod = "tccMember", cancelMethod = "tccMember", propagation = Propagation.SUPPORTS)
	public TmcResponse<String> tccMember(TransactionContext transactionContext, String tccTrace) {
		//发起远程调用
		//到memberTry(transactionContext, tccTrace);
	}

远程事务参与方(在另外一个系统,下面源码讲解注意与事务参与方区分)使用,并且方法第一个参数是TransactionContext

	@Compensable(confirmMethod = "memberConfirm", cancelMethod = "memberCancel")
    @Transactional
    public void memberTry(TransactionContext transactionContext, String tccTrace){
        log.info("member try starting...");
    }

==========================================================
看下CompensableTransactionAspect#interceptCompensableMethod为拦截器执行入口

@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

    return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}

到CompensableTransactionInterceptor#interceptCompensableMethod

public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
	//compensableMethodContext包含transactionContext,从被拦截的方法入参中获取,通过.Compensable#transactionContextEditor(默认DefaultTransactionContextEditor)
	//如果是事务发起方transactionContext就为null,事务参与方就取方法参数里的transactionContext
	//(从代码里看,也不一定非要transactionContext在方法参数第一位,只要有这个参数就行)
    CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp);

	//当前线程是否绑定事务Transaction,事务发起方调用为null,远程事务参与方一开始也为null
    boolean isTransactionActive = transactionManager.isTransactionActive();
	//看下面
    if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) {
        throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName());
    }

    switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
        case ROOT://事务发起方走这里
            return rootMethodProceed(compensableMethodContext); //看下面
        case PROVIDER://远程事务参与方走这里
            return providerMethodProceed(compensableMethodContext); //看下面
        default://事务参与方走这里,会走到ResourceCoordinatorInterceptor
            return pjp.proceed();
    }
}

CompensableMethodContext#getMethodRole

public MethodRole getMethodRole(boolean isTransactionActive) {
	//Compensable.propagation默认是REQUIRED,也就是事务发起方为REQUIRED
	//事务参与方配置了为SUPPORTS
	//远程事务参与方没有配置默认REQUIRED
    if ((propagation.equals(Propagation.REQUIRED) && !isTransactionActive && transactionContext == null) ||
            propagation.equals(Propagation.REQUIRES_NEW)) {
        return MethodRole.ROOT;//事务发起方
    } else if ((propagation.equals(Propagation.REQUIRED) || propagation.equals(Propagation.MANDATORY)) && !isTransactionActive && transactionContext != null) {
        return MethodRole.PROVIDER;//远程事务参与方(服务提供者)
    } else {
        return MethodRole.NORMAL;//事务参与方(也就是官方文档为什么要说配置成SUPPORTS)
    }
}

CompensableMethodContext#rootMethodProceed,这就是框架的核心,事务流程代码,其实可以发现,它和spring本身的事务管理挺像得,两者的思想基本一致吧

private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

        Object returnValue = null;

        Transaction transaction = null;
		//是否开启异步confirm模式,这样可以提高性能
		//Compensable#asyncConfirm是否为true
        boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
		//是否开启异步confirm模式,这样可以提高性能
		//Compensable#asyncCancel是否为true
        boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

        Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>();
        allDelayCancelExceptions.addAll(this.delayCancelExceptions);
        allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions()));

        try {
			//创建事务,并持久化,绑定事务到当前线程,看下面
            transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());

            try {
            //这里直接走到ResourceCoordinatorInterceptor
            //其实这里最后,就是执行参与者的try方法
                returnValue = compensableMethodContext.proceed(); 
            } catch (Throwable tryingException) {

                if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) {

                    logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);

            		//如果try失败,就执行参与者的cancel方法
                    transactionManager.rollback(asyncCancel);
                }

                throw tryingException;
            }
			//try成功就执行参与者的confirm方法
            transactionManager.commit(asyncConfirm); //rollback和commit,就看下commit,看下面

        } finally {
       		//清除线程中缓存的Transaction
            transactionManager.cleanAfterCompletion(transaction);
        }

        return returnValue;
    }

TransactionManager#begin(java.lang.Object)

public Transaction begin(Object uniqueIdentify) {
        Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT);
        //事务持久化,这里支持jdbc\redis\zookeeper\file等
        transactionRepository.create(transaction);
        //将Transaction绑定到当前线程
        registerTransaction(transaction);
        return transaction;
    }

事务提交,也就是事务管理器自动调用confirm方法,rollback方法同理,参考TransactionManager#commit

public void commit(boolean asyncCommit) {
	//获取当前线程绑定的事务
   final Transaction transaction = getCurrentTransaction();
	//事务状态变更为确认中(2)
   transaction.changeStatus(TransactionStatus.CONFIRMING);
	//持久化更新
   transactionRepository.update(transaction);
	//如果开启异步confirm
   if (asyncCommit) {
       try {
           Long statTime = System.currentTimeMillis();

           executorService.submit(new Runnable() {
               @Override
               public void run() {
                   commitTransaction(transaction);
               }
           });
           logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
       } catch (Throwable commitException) {
           logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
           throw new ConfirmingException(commitException);
       }
   } else {
	   //没有开启异步
       commitTransaction(transaction);
   }
}
再贴下commitTransaction
private void commitTransaction(Transaction transaction) {
    try {
    	//这里就是去执行事务参与者的confirm方法
        transaction.commit();
       	//执行成功就把持久化的事务删除掉
        transactionRepository.delete(transaction);
    } catch (Throwable commitException) {
        logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
        throw new ConfirmingException(commitException);
    }
}

Transaction#commit

public void commit() {

    for (Participant participant : participants) {
        participant.commit();(参与者的commit方法)
    }
}

Participant#commit(participant怎么来的,看ResourceCoordinatorInterceptor)

public void commit() {
	//反射调用confirm方法
    Terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}

==========================================================
看下远程事务参与方(服务提供者)的执行代码:CompensableTransactionInterceptor#providerMethodProceed

private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

   Transaction transaction = null;
   //同rootMethodProceed
   boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
	//同rootMethodProceed
   boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

   try {
		//上面说了远程事务参与方的transactionContext不为null,由事务发起方传过来
       switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
           case TRYING: //事务发起方begin的时候,transaction状态为TRYING
           		//通过传过来的TransactionContext,在服务提供者创建Transaction并保持Transaction.xid一致,代表一个全局事务
               transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());
               //调用远程事务参与方的try方法
               return compensableMethodContext.proceed();
           case CONFIRMING: //事务发起方commit的时候,transaction状态为CONFIRMING
               try {
                   transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                   //调用远程事务参与方confirm方法
                   transactionManager.commit(asyncConfirm);
               } catch (NoExistedTransactionException excepton) {
                   //the transaction has been commit,ignore it.
               }
               break;
           case CANCELLING: //事务发起方rollback的时候,transaction状态为CANCELLING

               try {
                   transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                   //调用远程事务参与方的cancel方法
                   transactionManager.rollback(asyncCancel);
               } catch (NoExistedTransactionException exception) {
                   //the transaction has been rollback,ignore it.
               }
               break;
       }

   } finally {
       transactionManager.cleanAfterCompletion(transaction);
   }

   Method method = compensableMethodContext.getMethod();

   return ReflectionUtils.getNullValue(method.getReturnType());
}

ResourceCoordinatorInterceptor

CompensableTransactionInterceptor#interceptCompensableMethod执行完到ResourceCoordinatorInterceptor#interceptTransactionContextMethod

public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
		//获取当前事务,事务发起方(rootMethodProceed中绑定线程)和远程事务参与方(providerMethodProceed中绑定线程)
        Transaction transaction = transactionManager.getCurrentTransaction();

        if (transaction != null) {
			//判断当前事务状态
            switch (transaction.getStatus()) {
                case TRYING:
                	//这里再说明下
                	//事务发起方和事务参与方在一个系统,然后他们在一个Transaction中,下面这个方法就是把它们抽象化为Participant放到Transaction中
                	//远程事务参与方在另外一个系统,会有自己的Transaction,只不过它的Transaction.xid与发起方系统的Transaction一致,上面已经解释
                	//Participant包括了try\confirm\cancel等方法,供事务管理器CompensableTransactionInterceptor调用
                    enlistParticipant(pjp);
                    break;
                case CONFIRMING:
                    break; //直接走下去调用
                case CANCELLING:
                    break;
            }
        }
		//接着就去调用真正的try、confirm、cancel方法了
        return pjp.proceed(pjp.getArgs());
    }

补偿机制——重试

主要是TCC的confirm或这cancel阶段出现异常,会去重试调用。
主要逻辑在RecoverScheduledJob.TransactionRecovery中。
TransactionRecovery#startRecover

public void startRecover() {
	//从持久化中查询未完成的事务
     List<Transaction> transactions = loadErrorTransactions();
	//事务恢复操作
     recoverErrorTransactions(transactions);
 }

看下如何获取未完成事务

TransactionRecovery#loadErrorTransactions
====>
TransactionRepository#findAllUnmodifiedSince
====>假设持久化方式为jdbc
JdbcTransactionRepository#doFindAllUnmodifiedSince
查询sql如下:(这里的表明是【“TCC_TRANSACTION” + tbSuffix】)
SELECT GLOBAL_TX_ID, BRANCH_QUALIFIER, CONTENT,STATUS,TRANSACTION_TYPE,CREATE_TIME,LAST_UPDATE_TIME,RETRIED_COUNT,VERSION FROM  TCC_TRANSACTION  WHERE LAST_UPDATE_TIME < ?  AND IS_DELETE = 0
也就是获取(当前时间-recoverConfig.getRecoverDuration() * 1000)之前所有IS_DELETE=0的事务,正常情况事务执行完记录都会被删除,所以这个表记录肯定很小

相关标签: 并发