
tcc-transaction execution flow: source code analysis

程序员文章站 2022-04-21 23:51:22

1. Basic usage

See the following link:
tcc-transaction demo sample source code

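2. Flow analysis

From the demo code we can infer that the transaction flow is implemented with Spring AOP.
ResourceCoordinatorAspect mainly records each participant's confirm and cancel invocations so that they can be executed later; its order is HIGHEST_PRECEDENCE + 1: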

@Aspect
public abstract class ResourceCoordinatorAspect {

    private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void transactionContextCall() {

    }

    @Around("transactionContextCall()")
    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        // The compensableTransactionInterceptor set up by ConfigurableTransactionAspect has already done the transaction initialization; now run the method logic
        return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
    }
}

Next is the interceptor logic that runs before the business method:

public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        // Get the transaction bound to the current thread (ThreadLocal)
        Transaction transaction = transactionManager.getCurrentTransaction();
        if (transaction != null) {
            // Branch on the transaction status: a participant is enlisted only in the TRYING phase
            switch (transaction.getStatus()) {
                case TRYING:
                    // Enlist the participant and persist it
                    enlistParticipant(pjp);
                    break;
                case CONFIRMING:
                    break;
                case CANCELLING:
                    break;
            }
        }

        return pjp.proceed(pjp.getArgs());
    }

    private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
        // Get the annotated method, i.e. record() in the demo
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);
        Compensable compensable = method.getAnnotation(Compensable.class);
        // Read the confirm and cancel method names from the annotation
        String confirmMethodName = compensable.confirmMethod();
        String cancelMethodName = compensable.cancelMethod();
        // Get the current transaction
        Transaction transaction = transactionManager.getCurrentTransaction();
        // Create this participant's xid, which shares the global transaction id
        TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());

        if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
            FactoryBuilder
            .factoryOf(compensable.transactionContextEditor())
            .getInstance()
            // Set the transaction context through the context editor so that it can be retrieved later
            .set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
        }

        Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());

        InvocationContext confirmInvocation = new InvocationContext(targetClass,
                confirmMethodName,
                method.getParameterTypes(), pjp.getArgs());

        InvocationContext cancelInvocation = new InvocationContext(targetClass,
                cancelMethodName,
                method.getParameterTypes(), pjp.getArgs());

        // Assemble the participant object
        Participant participant =
                new Participant(
                        xid,
                        confirmInvocation,
                        cancelInvocation,
                        compensable.transactionContextEditor());

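        // Persist the custom transaction object (now including this participant) to the db, then return so that the method logic can run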
        transactionManager.enlistParticipant(participant);

    }
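
To make the annotation lookup in enlistParticipant more concrete, here is a minimal, self-contained sketch. It is not the library's code: the `Compensable` annotation below is a simplified local stand-in with only two attributes, and `DemoService`, `confirmRecord`, `cancelRecord`, `findCompensableMethod`, and `invokePhase` are hypothetical names chosen for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Simplified stand-in for the real @Compensable annotation (assumption: two attributes only).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Compensable {
    String confirmMethod() default "";
    String cancelMethod() default "";
}

// Hypothetical service: a try method plus its confirm and cancel counterparts.
class DemoService {
    @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord")
    public String record(String orderNo) { return "try:" + orderNo; }

    public String confirmRecord(String orderNo) { return "confirm:" + orderNo; }

    public String cancelRecord(String orderNo) { return "cancel:" + orderNo; }
}

class CompensableReflectionSketch {
    // Find the first public method carrying the annotation.
    static Method findCompensableMethod(Class<?> type) {
        for (Method m : type.getMethods()) {
            if (m.isAnnotationPresent(Compensable.class)) {
                return m;
            }
        }
        throw new IllegalArgumentException("no @Compensable method on " + type);
    }

    // Resolve the confirm (or cancel) method named on the annotation and call it
    // with the same parameter types and arguments as the try method.
    static Object invokePhase(Object target, Method tryMethod, boolean confirm, Object... args) {
        Compensable compensable = tryMethod.getAnnotation(Compensable.class);
        String phaseName = confirm ? compensable.confirmMethod() : compensable.cancelMethod();
        try {
            Method phaseMethod = target.getClass().getMethod(phaseName, tryMethod.getParameterTypes());
            return phaseMethod.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + phaseName, e);
        }
    }
}
```

The point of the sketch is that the confirm and cancel methods are looked up by name with the try method's parameter types, which is why they must share the try method's signature.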

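CompensableTransactionAspect handles committing and rolling back the transaction. Its order is HIGHEST_PRECEDENCE, which in Spring is a higher precedence than HIGHEST_PRECEDENCE + 1, so it runs before (and wraps) ResourceCoordinatorAspect.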

// Declare the aspect
@Aspect
public abstract class CompensableTransactionAspect {

    // The interceptor that does the actual work
    private CompensableTransactionInterceptor compensableTransactionInterceptor;

    // Declare the pointcut
    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableService() {
    }
    // Around advice for the pointcut
    @Around("compensableService()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
        return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
    }

}
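
Since both aspects intercept the same @Compensable pointcut, their relative order decides which one wraps the other. In Spring, Ordered.HIGHEST_PRECEDENCE equals Integer.MIN_VALUE and a lower order value means higher precedence. The following sketch simulates that rule without Spring; the `Advice` class and the `trace` method are hypothetical helpers, not part of Spring or tcc-transaction.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class AspectOrderSketch {
    // Same value as Spring's Ordered.HIGHEST_PRECEDENCE.
    static final int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;

    // Hypothetical model of an around advice: a name plus its order value.
    static final class Advice {
        final String name;
        final int order;

        Advice(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Sort by ascending order value (lower value = higher precedence = outermost)
    // and record the call sequence around the target method.
    static List<String> trace(List<Advice> advices) {
        List<Advice> sorted = new ArrayList<Advice>(advices);
        sorted.sort(Comparator.comparingInt((Advice a) -> a.order));
        List<String> log = new ArrayList<String>();
        invoke(sorted, 0, log);
        return log;
    }

    private static void invoke(List<Advice> chain, int index, List<String> log) {
        if (index == chain.size()) {
            log.add("target method");
            return;
        }
        Advice advice = chain.get(index);
        log.add("enter " + advice.name);
        invoke(chain, index + 1, log); // plays the role of pjp.proceed()
        log.add("exit " + advice.name);
    }
}
```

Tracing the two aspects with orders HIGHEST_PRECEDENCE and HIGHEST_PRECEDENCE + 1 enters CompensableTransactionAspect first, so a transaction already exists by the time ResourceCoordinatorAspect tries to enlist a participant.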

Then, through inheritance, a subclass creates and sets the CompensableTransactionInterceptor; its order is HIGHEST_PRECEDENCE:

    public void init() {

        TransactionManager transactionManager = transactionConfigurator.getTransactionManager();

        CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor();
        compensableTransactionInterceptor.setTransactionManager(transactionManager);
        compensableTransactionInterceptor.setDelayCancelExceptions(transactionConfigurator.getRecoverConfig().getDelayCancelExceptions());

        this.setCompensableTransactionInterceptor(compensableTransactionInterceptor);
    }

CompensableTransactionInterceptor is a concrete class; the source of its interceptCompensableMethod method is as follows:

    // Logic executed at the pointcut
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
        // Get the method annotated with @Compensable, i.e. the record method in the demo
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);
        // Get the annotation so that its configuration can be read
        Compensable compensable = method.getAnnotation(Compensable.class);
        // Get the propagation level; the default is REQUIRED
        Propagation propagation = compensable.propagation();
        // Obtain the transaction context
        TransactionContext transactionContext = FactoryBuilder
                .factoryOf(compensable.transactionContextEditor())
                // double-check
                .getInstance()
                .get(pjp.getTarget(), method, pjp.getArgs());
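        // Check whether a transaction is active
        boolean isTransactionActive = transactionManager.isTransactionActive();
        // Check legality: returns false when no transaction is active && propagation is MANDATORY && transactionContext is null
        // MANDATORY: a transaction must already exist, otherwise an exception is thrown
        if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
            throw new SystemException(...);
        }
        // Determine whether the method is the root transaction (ROOT) or a branch transaction (PROVIDER); anything else is NORMAL
        // ROOT -> (propagation is REQUIRED && no transaction is active && transactionContext is null) || propagation is REQUIRES_NEW (always start a new transaction)
        // PROVIDER -> (propagation is REQUIRED || propagation is MANDATORY) && no transaction is active && transactionContext is not null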
        MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
        // 分流程处理
        switch (methodType) {
            case ROOT:
                return rootMethodProceed(pjp);
            case PROVIDER:
                return providerMethodProceed(pjp, transactionContext);
            default: // NORMAL: run the method as-is, with no transaction association
                return pjp.proceed();
        }
    }
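
The ROOT / PROVIDER / NORMAL rules described in the comments above can be written out as a small decision function. This is a sketch reconstructed from those comments, not a copy of the library's CompensableMethodUtils or TransactionUtils: the enums are local stand-ins limited to the values the article mentions, and the boolean `hasTransactionContext` replaces the `transactionContext != null` check.

```java
class MethodTypeSketch {
    // Local stand-ins; only the propagation values mentioned in the article are modeled.
    enum Propagation { REQUIRED, MANDATORY, REQUIRES_NEW }

    enum MethodType { ROOT, PROVIDER, NORMAL }

    // Illegal only when MANDATORY is requested but there is neither an active
    // transaction nor a propagated transaction context.
    static boolean isLegalTransactionContext(boolean isTransactionActive,
                                             Propagation propagation,
                                             boolean hasTransactionContext) {
        return !(propagation == Propagation.MANDATORY && !isTransactionActive && !hasTransactionContext);
    }

    static MethodType calculateMethodType(Propagation propagation,
                                          boolean isTransactionActive,
                                          boolean hasTransactionContext) {
        boolean startsNewRoot = propagation == Propagation.REQUIRED
                && !isTransactionActive && !hasTransactionContext;
        if (startsNewRoot || propagation == Propagation.REQUIRES_NEW) {
            return MethodType.ROOT;
        }
        boolean joinsRemote = (propagation == Propagation.REQUIRED || propagation == Propagation.MANDATORY)
                && !isTransactionActive && hasTransactionContext;
        if (joinsRemote) {
            return MethodType.PROVIDER;
        }
        return MethodType.NORMAL;
    }
}
```

Under these rules, a REQUIRED method called while a transaction is already active on the thread is NORMAL: it simply proceeds, and only the resource coordinator enlists it as a participant.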

Handling of the root transaction (begun through the TCC TransactionManager):

    private Object rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable {
        Object returnValue = null;
        Transaction transaction = null;
        try {
            // Begin the root transaction
            transaction = transactionManager.begin();
            try {
                // Run the intercepted method (the try phase)
                returnValue = pjp.proceed();
            } catch (Throwable tryingException) {
                if (isDelayCancelException(tryingException)) {
                    // Delay-cancel exception: do not roll back here
                } else {
                    transactionManager.rollback();
                }
                throw tryingException;
            }
            // Commit the transaction
            transactionManager.commit();
        } finally {
            // Clean up: pop the transaction from the thread-local transaction stack
            transactionManager.cleanAfterCompletion(transaction);
        }
        // Return the result
        return returnValue;
    }
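
The control flow of rootMethodProceed can be traced with a small simulation that records which manager operations run in each outcome. Everything here is illustrative: `RootFlowSketch` is a hypothetical class, the manager calls are replaced by log entries, and the instance check in `isDelayCancelException` is only an assumption about how the configured delay-cancel exceptions might be matched, since the article does not show that method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class RootFlowSketch {
    // Records the manager operations in the order they would be called.
    final List<String> calls = new ArrayList<String>();
    private final Class<?>[] delayCancelExceptions;

    RootFlowSketch(Class<?>... delayCancelExceptions) {
        this.delayCancelExceptions = delayCancelExceptions;
    }

    // Assumption: an exception counts as delay-cancel if it is an instance of a configured class.
    boolean isDelayCancelException(Throwable throwable) {
        for (Class<?> type : delayCancelExceptions) {
            if (type.isInstance(throwable)) {
                return true;
            }
        }
        return false;
    }

    Object rootMethodProceed(Supplier<Object> tryMethod) {
        calls.add("begin");
        try {
            Object result;
            try {
                result = tryMethod.get(); // the try phase
            } catch (RuntimeException tryingException) {
                if (!isDelayCancelException(tryingException)) {
                    calls.add("rollback");
                }
                throw tryingException;
            }
            calls.add("commit");
            return result;
        } finally {
            calls.add("cleanAfterCompletion");
        }
    }
}
```

A successful try phase records begin, commit, cleanAfterCompletion; an ordinary exception records begin, rollback, cleanAfterCompletion; a delay-cancel exception skips the rollback and only cleans up.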

Handling of the branch (provider) transaction:

    private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext) throws Throwable {
        Transaction transaction = null;
        try {
            switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
                case TRYING: // phase one
                    // Begin a new branch transaction and run the method
                    transaction = transactionManager.propagationNewBegin(transactionContext);
                    return pjp.proceed();
                case CONFIRMING:
                    try {
                        // Look up the existing transaction that was already begun
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                        // Commit the transaction
                        transactionManager.commit();
                    } catch (NoExistedTransactionException exception) {
                        // The transaction has already been committed; ignore it.
                    }
                    break;
                case CANCELLING:
                    try {
                        // Look up the existing transaction that was already begun
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                        // Roll back
                        transactionManager.rollback();
                    } catch (NoExistedTransactionException exception) {
                        // The transaction has already been rolled back; ignore it.
                    }
                    }
                    break;
            }
        } finally {
            // Clean up the resources held for the transaction
            transactionManager.cleanAfterCompletion(transaction);
        }
        // Reflect on the return type and return its null value (null by default)
        Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
        return ReflectionUtils.getNullValue(method.getReturnType());
    }
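
In the CONFIRMING and CANCELLING branches the business method itself is not run, so the advice has to return some placeholder of the right type. The article only notes that the default is null. The sketch below shows one way such a helper could look; the handling of primitive return types is an assumption on my part (returning null from around advice for a primitive return type fails in Spring AOP), and `NullValueSketch` is a hypothetical name.

```java
class NullValueSketch {
    // Returns a neutral value for the given return type:
    // zero/false for primitives (assumed behavior), null for everything else.
    static Object getNullValue(Class<?> type) {
        if (type == boolean.class) return Boolean.FALSE;
        if (type == char.class) return Character.valueOf('\0');
        if (type == byte.class) return Byte.valueOf((byte) 0);
        if (type == short.class) return Short.valueOf((short) 0);
        if (type == int.class) return Integer.valueOf(0);
        if (type == long.class) return Long.valueOf(0L);
        if (type == float.class) return Float.valueOf(0f);
        if (type == double.class) return Double.valueOf(0d);
        return null; // reference types and void
    }
}
```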

TransactionManager implements a custom transaction stack using Transaction objects bound to the current thread:

public class TransactionManager {
    private TransactionRepository transactionRepository;

    private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();

    public void setTransactionRepository(TransactionRepository transactionRepository) {
        this.transactionRepository = transactionRepository;
    }

    public Transaction begin() {

        Transaction transaction = new Transaction(TransactionType.ROOT);
        transactionRepository.create(transaction);
        registerTransaction(transaction);
        return transaction;
    }

    public Transaction propagationNewBegin(TransactionContext transactionContext) {

        Transaction transaction = new Transaction(transactionContext);
        transactionRepository.create(transaction);

        registerTransaction(transaction);
        return transaction;
    }

    public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
        Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());

        if (transaction != null) {
            transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
            registerTransaction(transaction);
            return transaction;
        } else {
            throw new NoExistedTransactionException();
        }
    }

    public void commit() {

        Transaction transaction = getCurrentTransaction();

        transaction.changeStatus(TransactionStatus.CONFIRMING);

        transactionRepository.update(transaction);

        try {
            transaction.commit();
            transactionRepository.delete(transaction);
        } catch (Throwable commitException) {
            logger.error("compensable transaction confirm failed.", commitException);
            throw new ConfirmingException(commitException);
        }
    }

    public Transaction getCurrentTransaction() {
        if (isTransactionActive()) {
            return CURRENT.get().peek();
        }
        return null;
    }

    public boolean isTransactionActive() {
        Deque<Transaction> transactions = CURRENT.get();
        return transactions != null && !transactions.isEmpty();
    }

    public void rollback() {

        Transaction transaction = getCurrentTransaction();
        transaction.changeStatus(TransactionStatus.CANCELLING);

        transactionRepository.update(transaction);

        try {
            transaction.rollback();
            transactionRepository.delete(transaction);
        } catch (Throwable rollbackException) {
            logger.error("compensable transaction rollback failed.", rollbackException);
            throw new CancellingException(rollbackException);
        }
    }

    private void registerTransaction(Transaction transaction) {

        if (CURRENT.get() == null) {
            CURRENT.set(new LinkedList<Transaction>());
        }

        CURRENT.get().push(transaction);
    }

    public void cleanAfterCompletion(Transaction transaction) {
        if (isTransactionActive() && transaction != null) {
            Transaction currentTransaction = getCurrentTransaction();
            if (currentTransaction == transaction) {
                CURRENT.get().pop();
            } else {
                throw new SystemException("Illegal transaction when clean after completion");
            }
        }
    }


    public void enlistParticipant(Participant participant) {
        Transaction transaction = this.getCurrentTransaction();
        transaction.enlistParticipant(participant);
        transactionRepository.update(transaction);
    }
}
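
The thread-bound stack used by registerTransaction, getCurrentTransaction, and cleanAfterCompletion can be isolated into a few lines. This is a reduced sketch, not the library class: transactions are plain strings, the repository is left out, `TransactionStackSketch` and `isActiveInNewThread` are hypothetical names, and a mismatched cleanup is silently ignored here rather than raising an exception as the code above does.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class TransactionStackSketch {
    // One stack per thread; nested transactions are pushed on top of outer ones.
    private static final ThreadLocal<Deque<String>> CURRENT = new ThreadLocal<Deque<String>>();

    static void register(String transaction) {
        if (CURRENT.get() == null) {
            CURRENT.set(new ArrayDeque<String>());
        }
        CURRENT.get().push(transaction);
    }

    static boolean isActive() {
        Deque<String> stack = CURRENT.get();
        return stack != null && !stack.isEmpty();
    }

    static String current() {
        return isActive() ? CURRENT.get().peek() : null;
    }

    // Pop only if the given transaction is the one on top of this thread's stack.
    static void cleanAfterCompletion(String transaction) {
        if (transaction != null && transaction.equals(current())) {
            CURRENT.get().pop();
        }
    }

    // Shows that another thread does not see this thread's transactions.
    static boolean isActiveInNewThread() {
        final boolean[] seen = new boolean[1];
        Thread worker = new Thread(() -> seen[0] = isActive());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

Because the stack lives in a ThreadLocal, skipping cleanAfterCompletion on a pooled thread would leak the transaction into the next request handled by that thread, which is why both proceed methods call it in a finally block.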

The custom Transaction and Participant classes:

// Transaction:
    private List<Participant> participants = new ArrayList<Participant>();
    public void enlistParticipant(Participant participant) {
        participants.add(participant);
    }

public class Participant implements Serializable {

    // ... invoke the cancel method
    public void rollback() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
    }

    // ... invoke the confirm method
    public void commit() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
    }
}

That is, the confirm logic runs when the transaction is committed.
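
The excerpt does not show Transaction.commit() and Transaction.rollback(). A plausible reading, given the participants list and the Participant methods above, is that they fan out to every enlisted participant. The sketch below illustrates that assumption only; `TransactionSketch`, its nested `Participant` interface, and the `recording` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class TransactionSketch {
    // Minimal stand-in for a participant with a confirm and a cancel action.
    interface Participant {
        void commit();

        void rollback();
    }

    private final List<Participant> participants = new ArrayList<Participant>();

    void enlistParticipant(Participant participant) {
        participants.add(participant);
    }

    // Assumed behavior: committing confirms every enlisted participant in order.
    void commit() {
        for (Participant participant : participants) {
            participant.commit();
        }
    }

    // Assumed behavior: rolling back cancels every enlisted participant in order.
    void rollback() {
        for (Participant participant : participants) {
            participant.rollback();
        }
    }

    // Helper that builds a participant which appends what it did to a shared log.
    static Participant recording(final String name, final List<String> log) {
        return new Participant() {
            public void commit() {
                log.add(name + ".confirm");
            }

            public void rollback() {
                log.add(name + ".cancel");
            }
        };
    }
}
```

Enlisting two recording participants and calling commit() would log both confirm calls in enlistment order, which matches the two-phase picture: the try phase only enlists participants, and the second phase replays their confirm or cancel invocations.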