欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

自定义事务管理器的实现 以应对同一个系统多个数据源@Transactional事务 失效问题

程序员文章站 2022-06-28 17:18:27
一 问题由来?在同一个系统中如果使用多个数据源 当一个服务方法里面 另一个服务类的方法 而这两个服务操作的不是同一个同一个数据库 那么@Tranactional将会失效 我们常用的DataSourceTransactionManager事务管理器就不管用了 这种情况无法实现事务的原子性了 归根到底的最终原因:是这两个服务的无法拿到无法拿到同一个Connection 也就是无法拿到同一个数据库连接 在源码中 拿连接的逻辑是根据数据源来拿取的 它保存在一个Thread......

一  问题由来?

在同一个系统中如果使用多个数据源  当一个服务方法里面调用另一个服务类的方法  而这两个服务操作的不是同一个同一个数据库    那么@Tranactional将会失效    我们常用的DataSourceTransactionManager事务管理器就不管用了  这种情况无法实现事务的原子性了   

归根到底的最终原因:

是这两个服务的无法拿到无法拿到同一个Connection 也就是无法拿到同一个数据库连接   在源码中 拿连接的逻辑是根据数据源来拿取的  它保存在一个ThreadLocal的Map变量里面 。

当一个线程第一个服务第一次进入的时候拿到一个新的连接   然后进入到第二个服务  但是这个时候因为这两个服务的数据库不一样  所以spring的事务处理拦截方法无法根据数据库从线程绑定变量ThreadLocal拿到第一次的的连接   又会开一个新的连接    spring设计的事务提交的条件就是 如果是新的事务(如果拿不到同一个连接意味着要开一个新的事务 )  最后才会提交  很明显 服务二开了一个新的事务   所以  如果 服务二不出错 那么 服务二已经被提交了  但是这个时候服务一还没结束! 如果服务二在调用服务一之后出个错 那么 服务一虽然回滚了但是服务二是没法控制的 

例如:下面这个方法  shopServiceSlove 增加的商品无法被回滚 

@Override
public void testCodeTransaction() {


        int index=5;
        Shop shop=new Shop();
        shop.setId(index);
        shop.setName("product"+index);
        int insert = shopMapperMaster.insert(shop);
        Shop shop2=new Shop();
        shop2.setId(index);
        shop2.setName("product"+index);
        shopServiceSlove.addShops(shop2);
        int a=2/0;
}

对上面这种情况传统的事务管理器已经无法满足需求了  所以我自定义了一个新的事务管理器

我设计的思路是 :

把第一个事务开启的方法开始  直到结束 我们这中间用到的所有的数据库连接缓存起来 然后把第一个事务方法所设计的所有服务方法的执行结果是否出现了错误保存在一个线程变量里   最后第一个事务方法根据这个线程变量情况看看是全部回滚还是全部提交      然后我们就可以拿到所有数据库连接执行相关操作。

举个例子 就拿下面这个方法testCodeTransaction():

这个服务方法会跨越数据源调用  shopMapperMaster会调用数据源1    shopServiceSlove这个服务会调用数据源2       

传统的事务管理是无法器无法在   shopServiceSlove.addShops(shop2);   这行代码之后出错之后 (也就是int a=2/0);     shopServiceSlove增加的数据进行回滚(假如它没出错) 原因上面已经讲过了 。

我如果把testCodeTransaction()开始直到这个方法结束  就把用到的数据库连接缓存到一个线程变量里   知道第一个方法结束时才把所有数据库连接拿出来 根据方法是否出错 在集体实现回滚还是集体提交。

@Override
public void testCodeTransaction() {


        int index=5;
        Shop shop=new Shop();
        shop.setId(index);
        shop.setName("product"+index);
        int insert = shopMapperMaster.insert(shop);
        Shop shop2=new Shop();
        shop2.setId(index);
        shop2.setName("product"+index);
        shopServiceSlove.addShops(shop2);
        int a=2/0;


}

这中间会涉及到一个问题 就是方法的嵌套  怎么判定某一个方法是最开始进入的那个方法   因为我们的拦截方法肯定是每一个服务的方法都要进入的。 我是这样设计的(有点借鉴了可重入锁的设计):

拦截方法:

设计一个一个计数器 初始值为:0

 

1 拿到现在的执行器的值 并加一

try{

我们的方法执行

}finally{

减去1

}

根据计数器是否为初始值 就可以判断这个方法是否是第一次进入的方法  也就是全局事务发起的方法:

 

二  撸代码

1 定义一个注解  类似spring的@tranactional  方便我们扫描spring容器然后代理

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE,ElementType.METHOD})
public @interface MutilDbTransactional {
    String dataBase() default "dataSource";
    String []  ignoreMethod() default {"equals","hashCode","toString"};
    boolean readOnly() default false;
    boolean autoCommit() default  false;
    IsolationLevel isolationLevel() default IsolationLevel.TRANSACTION_DEFAULT;
}

2  启动注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(MutilDbProcessor.class)
public @interface EnableMutilDbTranactionManagement {
}

 

3 注解处理器 (扫描容器中的我们注解 然后代理它)


@Slf4j
public class MutilDbProcessor implements BeanPostProcessor, BeanFactoryAware {

    private BeanFactory beanFactory;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class<?> aClass = bean.getClass();
        if(aClass.isAnnotationPresent(MutilDbTransactional.class)){
            log.info("{} 被MutilDbTransactional注解修饰  现在为它生成代理对象-------------",beanName);
            try {
                return   new MethodInvocationHander(bean,null,beanFactory).createProxy();
            } catch (Exception e) {
                e.printStackTrace();
                throw  new RuntimeException(bean+":创建代理失败!!");
            }

        }else{
            Method[] declaredMethods = aClass.getDeclaredMethods();
            Map<String,MutilDbTransactional> methodMutilDbTransactionalMap=new HashMap<>();
            for(Method m:declaredMethods){
                if(m.isAnnotationPresent(MutilDbTransactional.class)) {
                    methodMutilDbTransactionalMap.put(MutiDbTransactionUtil.keyGenarate(m),m.getAnnotation(MutilDbTransactional.class));
                }
            }
            if(methodMutilDbTransactionalMap.keySet().size()>0){
                log.info("{} 类 有方法: 被MutilDbTransactional注解修饰  现在为它生成代理对象-------------",beanName);
                try {
                    return   new MethodInvocationHander(bean,methodMutilDbTransactionalMap,beanFactory).createProxy();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw  new RuntimeException("创建代理对象失败!");
                }
            }


        }

        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

      return  bean;
    }


    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory=beanFactory;
    }


}

4 核心的拦截方法

@Slf4j
public class MutiDbTransactionInterceptor {

    private static final ThreadLocal<Map<Connection,Boolean>>  conExeResult=ThreadLocal.withInitial(()->new HashMap<>());
    private static final ThreadLocal<List<DataSource>>  dataSourceUseList=ThreadLocal.withInitial(()->new ArrayList<>());

    private Invocation invocation;
    private BeanFactory beanFactory;
    private Map<String,MutilDbTransactional> methodMutilDbTransactionalMap;
    private  MutilDbTransactional mutilDbTransactional;
    public  MutiDbTransactionInterceptor(Invocation invocation,BeanFactory beanFactory){
        this.invocation=invocation;
        this.beanFactory=beanFactory;
        this.methodMutilDbTransactionalMap=invocation.getMethodMutilDbTransactionalMap();
        this.mutilDbTransactional = invocation.getMutilDbTransactional();
    }
    public Object interceptor() throws Throwable{

        boolean classTypeAnnation=true;
        invocation.getMethod();
        if(mutilDbTransactional==null){//说明是方法加了注解
            MutilDbTransactional mutilDbTransactional = methodMutilDbTransactionalMap.get(MutiDbTransactionUtil.keyGenarate(invocation.getMethod()));
            if(mutilDbTransactional!=null){
                this.mutilDbTransactional =mutilDbTransactional;
                classTypeAnnation=false;
            }else {
                return  invocation.execute();
            }
        }
        if(classTypeAnnation) {
            String[] strings = mutilDbTransactional.ignoreMethod();
            String name = invocation.getMethod().getName();
            boolean ingore = Arrays.stream(strings).anyMatch(p -> p.equals(name));
            if (ingore) {
                return invocation.execute();
            }
            if (invocation.getMethod().isAnnotationPresent(IngnoreMethod.class)) {
                return invocation.execute();
            }
        }
        MutiDbTransactionUtil.enterMutlTransaction();
        String qualifier = mutilDbTransactional.dataBase();
        DataSource dataSource = BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, DataSource.class, qualifier);
        if(dataSource==null){
            throw  new RuntimeException(qualifier+"数据源不存在容器中----------------");
        }
        Map<Connection, Boolean> connectionBooleanMap = conExeResult.get();
        Connection connection=null;
        Object resource = TransactionSynchronizationManager.getResource(dataSource);
        if(resource!=null){
            connection=((ConnectionHolder)resource).getConnection();
        }else {
            connection = dataSource.getConnection();
            ConnectionHolder connectionHolder=new ConnectionHolder(connection);
            TransactionSynchronizationManager.bindResource(dataSource,connectionHolder);
            dataSourceUseList.get().add(dataSource);
        }

        connection.setReadOnly(mutilDbTransactional.readOnly());
        if(mutilDbTransactional.isolationLevel()!= IsolationLevel.TRANSACTION_DEFAULT) {
            connection.setTransactionIsolation(mutilDbTransactional.isolationLevel().getLevel());
        }
        connection.setAutoCommit(mutilDbTransactional.autoCommit());
        Object result = null;
        Boolean error=false;
        try {
            result = invocation.execute();
        } catch (Throwable throwable) {
            error=true;
            throwable.printStackTrace();
        }finally {
            MutiDbTransactionUtil.releaseMutlTransaction();
        }
        connectionBooleanMap.put(connection,error);

        //事务发起方法结束
        if(MutiDbTransactionUtil.isFirstEnterMethod()){

            try {
                boolean errorHappen = connectionBooleanMap.values().stream().anyMatch(p -> p == true);
                if (errorHappen) {
                    log.info("发生错误  全部回滚------------------");
                    connectionBooleanMap.keySet().forEach(con -> {
                        try {
                            con.rollback();
                            log.info("数据库连接:{} 已回滚",con);
                        } catch (SQLException e) {
                            throw new RuntimeException("数据库连接回滚错误!!");
                        }
                    });
                    log.info("全部回滚结束------------------");
                } else {
                    log.info("没有发生错误  全部提交------------------");
                    connectionBooleanMap.keySet().forEach(con -> {
                        try {
                            con.commit();
                            log.info("数据库连接:{} 已提交",con);
                        } catch (SQLException e) {
                            throw new RuntimeException("数据库连接提交错误!!");
                        }
                    });
                    log.info("全部提交结束------------------");
                }
            }finally {
                log.info("归还数据库连接------------------");
                conExeResult.get().keySet().forEach(con->{
                    try {
                        con.close();
                    } catch (SQLException e) {
                        log.info("数据库连接:{} 关闭错误",con);
                    }
                });
                log.info("归还数据库连接结束------------------");
                conExeResult.remove();
                MutiDbTransactionUtil.reset();
                dataSourceUseList.get().forEach(d->{
                    TransactionSynchronizationManager.unbindResource(d);
                });
                dataSourceUseList.remove();
            }


        }


        return result;




    }




}

 

5 附上其他的类

public class Invocation {
    private Method method;
    private Object[] args;
    private  Object target;
    private MutilDbTransactional mutilDbTransactional;
    private Map<String,MutilDbTransactional> methodMutilDbTransactionalMap;
    public Invocation(Object t, MutilDbTransactional mutilDbTransactional,Map<String,MutilDbTransactional> methodMutilDbTransactionalMap, Method method, Object[] args) {
        target=t;
        this.method = method;
        this.args = args;
        this.mutilDbTransactional=mutilDbTransactional;
        this.methodMutilDbTransactionalMap=methodMutilDbTransactionalMap;
    }

    public MutilDbTransactional getMutilDbTransactional() {
        return mutilDbTransactional;
    }

    public Object execute() throws Throwable {
        return  method.invoke(target,args);
    }

    public Method getMethod() {
        return method;
    }

    public Object[] getArgs() {
        return args;
    }

    public Object getTarget() {
        return target;
    }

    public Map<String, MutilDbTransactional> getMethodMutilDbTransactionalMap() {
        return methodMutilDbTransactionalMap;
    }
}
/**
 * @author
 * @discription;
 * @time 2020/11/5 13:51
 */
public class MethodInvocationHander implements InvocationHandler {

    private Object target;
    private Class [] interfaceClasses;
    private ClassLoader classLoader;
    private MutilDbTransactional mutilDbTransactional;
    private BeanFactory beanFactory;
    private Map<String,MutilDbTransactional> methodMutilDbTransactionalMap;
    public MethodInvocationHander(Object targetO, Map<String,MutilDbTransactional> methodMutilDbTransactionalMap,BeanFactory beanFactory) throws IllegalAccessException, InstantiationException {
        Class<?> targetClass = targetO.getClass();
        Class[] interfaces = targetClass.getInterfaces();
        interfaceClasses=interfaces;
        mutilDbTransactional=targetClass.getDeclaredAnnotation(MutilDbTransactional.class);
        target= targetO;
        classLoader=targetClass.getClassLoader();
        this.beanFactory=beanFactory;
        this.methodMutilDbTransactionalMap=methodMutilDbTransactionalMap;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        Invocation invocation=new Invocation(target,mutilDbTransactional,methodMutilDbTransactionalMap,method,args);
        MutiDbTransactionInterceptor mutiDbTransactionInterceptor = new MutiDbTransactionInterceptor(invocation,beanFactory);
        return mutiDbTransactionInterceptor.interceptor();

    }

    public Object createProxy(){

      return  Proxy.newProxyInstance(classLoader,interfaceClasses,this);

    }




}

 

public class MutiDbTransactionUtil {

    private static final ThreadLocal<Integer> transactionStartFlag=ThreadLocal.withInitial(()->0);

    public static void enterMutlTransaction(){
        Integer integer = transactionStartFlag.get();
        transactionStartFlag.set(++integer);
    }

    public static void releaseMutlTransaction(){
        Integer integer = transactionStartFlag.get();
        transactionStartFlag.set(--integer);
    }

    public static boolean isFirstEnterMethod(){
        Integer integer = transactionStartFlag.get();
        return  integer==0;
    };

    public static void reset() {
        transactionStartFlag.remove();
    }

    public static String keyGenarate(Method method){

        String name = method.getName();
        String collect = Arrays.stream(method.getParameterTypes()).map(Class::getName).collect(Collectors.joining());
        return  name+collect;

    }
}
public enum IsolationLevel {

   TRANSACTION_NONE(0),
   TRANSACTION_DEFAULT(-1),
   TRANSACTION_READ_UNCOMMITTED(1),
   TRANSACTION_READ_COMMITTED(2),
   TRANSACTION_REPEATABLE_READ(4),
   TRANSACTION_SERIALIZABLE(8);

    private int level;

    IsolationLevel(int level){
        this.level=level;
    }

    public int getLevel(){
        return  this.level;
    }


}

 

三  实战 服务一调用服务二

服务一 :

@Override
@MutilDbTransactional(dataBase = "data-source1")
public void testCodeTransaction() {
   
        int index=5;
        Shop shop=new Shop();
        shop.setId(index);
        shop.setName("product"+index);
        int insert = shopMapperMaster.insert(shop);
        Shop shop2=new Shop();
        shop2.setId(index);
        shop2.setName("product"+index);
       shopServiceSlove.addShops(shop2);//服务二

        int a=2/0;

}

服务二:

@Override
@MutilDbTransactional(dataBase = "data-source2")
public int addShops(Shop shop) {
    return  shopMapperSlove.insert(shop);
}

 

启动情况:

自定义事务管理器的实现  以应对同一个系统多个数据源@Transactional事务 失效问题

 

数据库1情况:

自定义事务管理器的实现  以应对同一个系统多个数据源@Transactional事务 失效问题

 

数据库2:

自定义事务管理器的实现  以应对同一个系统多个数据源@Transactional事务 失效问题

 

测试:

自定义事务管理器的实现  以应对同一个系统多个数据源@Transactional事务 失效问题

数据库1情况:

自定义事务管理器的实现  以应对同一个系统多个数据源@Transactional事务 失效问题

数据库2:

 

自定义事务管理器的实现  以应对同一个系统多个数据源@Transactional事务 失效问题

 

圆满解决了上述问题!!  请注意如果使用了我的事务注解就不能使用spring自带的了 会冲突!!!!

 

 

本文地址:https://blog.csdn.net/qq_41082092/article/details/109533928