欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java实现saga分布式事务模型框架

程序员文章站 2022-07-12 16:16:31
...

 

调用第三方接口操作业务的时候经常要用到分布式事务,以保证事务的完整性,我们常用的分布式事务模型有saga、tcc、2pc等。2pc不属于接口调用的场景,所以我们调用第三方接口常用的模型有saga、tcc

 

tcc要保证comfirm时报所有dml操作请求写在comfirm实现里面,这个有一定的局限性,回滚的的实现写在cancel接口里面,这个对业务有一定的局限性

saga更适合长事务、业务比较复杂的场景

 

本文实现saga事务异常处理,如果需要处理那种接口comfirm超时的情况,需要将事务id及comfirm调用链持久化保存起来,异步检查,也可以写在缓存里面定时检查。这里没实现,应该比较容易实现。

 

  • saga的模型如下

Java实现saga分布式事务模型框架
            
    
    博客分类: javasaga java saga seata 

 

 

由上图模型不难看出我们在methodA就可以实现saga事务,只要serviceB、serviceC配合实现对应的cancel方法;本文实现java简易版本的sata框架模型代码。当然一定要实现阿里的seata框架或者其它也可以的,就是要另外搭建seata服务器跟相关数据库;

实现技术点:

1、lamdba

2、gclib动态代理

3、ThreadLocal

 

 

 

  •  saga使用测试

 

public class SagaTest {
	
	public void testFunc(){
	System.out.println("**********先测试 Func方法测试 开始***********");
	String failStr = "调用失败";
	//lambda表达式,jdk8才能执行。
	Test test1 = Func.proxy(new Test(),3, 
			Callback::success,//Callback::success方法调用
			(aa)->System.out.println(aa+failStr)
			);
//	try{
//	System.out.println("======== test start 测试动态代理,代理方法内部调用代理方法=========");
		test1.test();
	System.out.println("======== test end 测试动态代理,代理方法内部调用代理方法=========");
	System.out.println();
	System.out.println("======== successWithRetry3 start  测试动态代理失败重试三次=========");
		test1.successWithRetry3();
	System.out.println("======== successWithRetry3 end  测试动态代理失败重试三次=========");
	System.out.println();
//	System.out.println("======== successWithRetry4 start  测试动态代理失败重试四次=========");
//          因为动态代理对象test1最多重试4次,所以successWithRetry4重试4次后会报异常,不再往下执行。
//		test1.successWithRetry4();
//	System.out.println("======== successWithRetry4 end  测试动态代理失败重试四次=========");
//	}catch(Exception e){
//		
//	}
}	

/**
 * 测试saga事务
 * @author giant
 * @date 2021年12月17日
 */
public void methodA(){
		
		
		
		System.out.println("**********Gaga 测试 真正开始***********");
		Saga.begin();//saga开始。
		Test sagaTest = new Test();
//               每个saga事务节点有两个实现,
//               如下:1、业务实现comfirm方法:sagaTest.test()方法;2、cancel方法为lambda方法实现
		 Saga.service(sagaTest, (a)->{
			System.out.println("回滚t1");
		}).test();
		
		Saga.service(sagaTest, (a)->{
			System.out.println("回滚t2");
		}).test1();
		
		System.out.println("准备报错混滚:");
		Saga.service(sagaTest, (a)->{
			System.out.println("回滚t3");
		}).fail();
//         sagaTest.fail()方法抛出异常,会执行Saga.rollback(throwable)方法,
 //        rollback方法会执行所有执行过commit方法的cancel方法,回滚所有执行过的comfirm;
		
		Saga.end();//结束本次分布式事务
			
//			test1.test2();
//			Test test2 = Fun.proxy(new Test(),null,null);
//			test2.test1();
//			test2.test2();
		//successWithRetry4重试四次失败抛出异常,不往下跑了
		System.out.println("不抛出异常跑完;");
		
	}
	public static void main(String[] args) {
		new SagaTest().methodA();
	}
}
class Test{
	 int execCount = 0 ;
	public String test(){
		System.out.println("test");
		this.test1();
		return "test success";
	}
	public String test1(){
		String innerParam = "innerParam1";
		System.out.println("test1 exec,方法内部参数:"+innerParam);
		return "test1 success";
	}
	public void test2(String arg){
		System.out.println("test2:"+arg);
	}
	
	/**
	 * 跑第三次成功
	 * @author giant
	 * @date 2021年12月15日
	 * @return
	 */
	public String successWithRetry3(){
		
		if(execCount>=2){
			String str = "第"+(++execCount)+"次success";
			execCount = 0;
			return str;
		}else{
			System.out.println("fail:"+(++execCount));
			throw new RuntimeException("fail");
		}
	}
	
	
	/**
	 * 跑第第次成功
	 * @author giant
	 * @date 2021年12月15日
	 * @return
	 */
	public String successWithRetry4(){
		 execCount = 0 ;
		if(execCount>=3){
			String str = "第"+(++execCount)+"次success";
			execCount = 0;
			return str;
		}else{
			System.out.println("fail:"+(++execCount));
			throw new RuntimeException("fail");
		}
	}
	public String fail(){
			System.out.println("fail:");
			throw new RuntimeException("fail");
	}
	
	public String fail(String msg){
		System.out.println("fail:"+msg);
		throw new RuntimeException("fail");
	}
	
	
	
}



class Callback{
	 
	 public Callback(){}
		public static void success(Object obj){
			System.out.println(obj+"成功");
		}
		
		public void fail(Throwable e){
			System.out.println("执行失败");
		}
	}

 

 

 

最终执行fail方法异常报错:回滚Saga.service执行过cancel方法,输出内容如下:

 

**********Gaga 测试 真正开始***********
test
test1 exec,方法内部参数:innerParam1
test1 exec,方法内部参数:innerParam1
准备报错混滚:
fail:
回滚t1
回滚t2
回滚t3
Exception in thread "main" java.lang.RuntimeException: fail
	at com.framework.utils.func.Test.fail(SagaTest.java:132)
	at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.CGLIB$fail$4(<generated>)
	at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a$$FastClassByCGLIB$$c8387779.invoke(<generated>)
	at net.sf.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228)
	at com.framework.utils.func.Func.lambda$0(Func.java:51)
	at com.framework.utils.func.Func.retry(Func.java:73)
	at com.framework.utils.func.Func.intercept(Func.java:48)
	at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.fail(<generated>)
	at com.framework.utils.func.SagaTest.methodA(SagaTest.java:58)
	at com.framework.utils.func.SagaTest.main(SagaTest.java:75)

 

 

 

 

  • 代码实现

 

 

 maven导入gclib动态代理

 

<dependency>
	<groupId>cglib</groupId>
	<artifactId>cglib</artifactId>
	<version>3.2.12</version>
</dependency>

 

 

1、saga事务重试接口

 

/**
 * lambda表达式实现,方法失败重试接口
 * @author giant
 * @date 2021年12月15日
 * @param <T> 返回对象
 */
@FunctionalInterface
interface IRetry<T>{

	public T run() throws Throwable;
}

 

 

 2、成功或者失败后的回调接口

 

/**
 * lambda表达式实现,方法成功或者失败的回调接口
 * @author giant
 * @date 2021年12月15日
 * @param <T> 参数对象
 */
@FunctionalInterface
public interface ICallback<T> {

	public void run(T obj);
}

 

 

3、通过动态代理service中所有的方法,实现service的重试、成功及错误的回调

 

 

import java.lang.reflect.Method;

import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;


/**
 * * 方法执行动态代理,实现代理类方法重试,方法调用成功或者失败后的反调函数实现
 * 代理对象调用方法时,方法与方法直接可嵌套使用。
 * @author giant
 * @date 2021年12月15日
 */
public class Func implements MethodInterceptor{
	
	/**
	 * 是否saga模式,如果是saga模式,建在报错的时候统一执行Saga.sagaFailCallBacks集合里面的方法
	 * 
	 */
	private boolean isSaga=false;
	
	private ICallback<Throwable> fail = null;
	private ICallback<Object> success  = null;
	private int retry = 1;
	private Object t = null;
	
	/**
	 * 不允许外表创建该对象
	 * @param t
	 * @param success
	 * @param fail
	 * @param retry
	 */
	 Func(Object t,ICallback<Object> success,ICallback<Throwable> fail,int retry,boolean isSaga) {
		// TODO Auto-generated constructor stub
		this.t = t;
		this.success = success;
		this.fail = fail;
		this.retry = retry;
		this.isSaga = isSaga;
	}
	

	
	@Override
	public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
		Object o2 = retry(()->{
			Object o1 = null;
			try{
				 o1 = methodProxy.invokeSuper(o, objects);
				return o1;
			}catch(Throwable e){
				throw e;
			}
//			return null;
		}, Object.class,this.retry,this.success,this.fail);
		return o2;
	}
	
	/**
	 * 失败重试三次
	 * @param run
	 * @param clz
	 * @return
	 */
	private <T> T retry(IRetry<T> run,Class <T> clz,int count,ICallback<Object> success,ICallback<Throwable>fail){
		int i=0;
		T result = null;
		boolean isSuccess = false;
		do{
			try {
				result = run.run();
				isSuccess = true;
				 if(success!=null){
					success.run(result);
				 }
			}catch(Throwable e){
				if(fail!=null&&!isSaga){
					fail.run(e);
				}
				if(i==retry-1){
					//向外抛出的异常只认最后一次
					if(isSaga){//如果是saga处理,统一最后回滚事务
						Saga.rollback(e);
					}
					throw e;
				}
			}
		}while((!isSuccess)&&++i<count);
		return result;
	}
	
	 //生成代理对象
	Object get(){
	  //new 一个Enhancer对象
	  Enhancer enhancer = new Enhancer();
	  //指定他的父类(注意这 是实现类,不是接口)
	  enhancer.setSuperclass(t.getClass());
	  //指定真正做事情的回调方法
	  enhancer.setCallback(this);
	  //生成代理类对象
	  Object o =  enhancer.create();
	  //返回
	  return o;
	 }
	 
	/**
	 * 给代理对象赋能,支持方法重试、成功及失败的回调
	 * @author giant
	 * @date 2021年12月15日
	 * @param obj 代理对象
	 * @param retry 代理对象失败重试次数
	 * @param success 代理对象所有方法成功回调函数(不报异常就算成功)
	 * @param fail 代理对象所有方法失败回调函数。(报异常就算失败)
	 * @return 返回代理
	 */
	 @SuppressWarnings("unchecked")
	public static <T2> T2 proxy(T2 obj,int retry,ICallback<Object> success,ICallback<Throwable> fail){
		 try {
			Func p = new Func(obj,success,fail,retry,false);
			return (T2)p.get();
		} catch (Exception e) {
			throw new RuntimeException(e);
			
		}
	 }
	 
	 /**
	  * 给代理对象赋能,支持方法成功及失败的回调
	  * @author giant
	  * @date 2021年12月15日
	  * @param obj 代理对象
	  * @param success 代理对象所有方法成功回调函数(不报异常就算成功)
	  * @param fail 代理对象所有方法失败回调函数(报异常就算失败)
	  * @return 返回代理
	  */
	 public static <T2> T2 proxy(T2 obj,ICallback <Object>success,ICallback<Throwable> fail){
		 return proxy(obj, 1, success, fail);
	 }
	 
	
}

 

 

4、Saga模型框架基于Func类实现

 

package com.framework.utils.func;

import java.util.ArrayList;
import java.util.List;
/**
 * saga分布式事务服务类
 * 所有Gaga.service()调用的方法必须在Gaga.begin()和Gaga.begin()中间。
 * xid就是threadid,可以通过Gaga.begin()获取
 * @author giant
 * @date 2021年12月15日
 */
public class Saga {
	/**
         * 
	 * 记录事务所有saga节点回滚函数,该对象代替了事件表,每次Gaga分布式事务必须调用Saga.begin()跟Saga.end();
         *
	 */
	static ThreadLocal<List<ICallback<Throwable>>> sagaFailCallBacks = new ThreadLocal<>();
	
        /**
	 * saga事务开始;每次Gaga分布式事务必须调用Saga.begin()跟Saga.end();
	 * @author giant
	 * @date 2021年12月15日
	 */
	public static long begin() {
		long xid = Thread.currentThread().getId();
		List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get();
		if (fails == null) {
			fails = new ArrayList<ICallback<Throwable>>();
			Saga.sagaFailCallBacks.set(fails);
		} else {
			throw new SagaException("程序未关闭saga,请在finally方法执行Saga.end()");
		}
		return xid;
	}

	
	
	/**
	 * 代理事务节点,该节点执行失败,会根据参数重试,如果方法报错,会回滚其它所有事务的failRollback方法
	 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次)
	 * @author giant
	 * @date 2021年12月15日
	 * @param obj 事务节点对象
	 * @param retry 事务失败重试次数
	 * @param failRollback 如果其中一个service模块失败,将在异常的时候回归,或者调用Gaga.rollback(null)回滚。
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public static <T2> T2 service(T2 obj, int retry, ICallback<Throwable> failRollback) {
		try {
			List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get();
			if (fails == null) {
				throw new SagaException("请先开启Saga,调用Saga.begin()");
			}
			fails.add(failRollback);
			Func p = new Func(obj, null, failRollback, retry, true);
			return (T2) p.get();
		} catch (Exception e) {
			throw new SagaException(e);

		}
	}

	/**
	 * 代理事务节点,如果报错方法报错,会回滚其它所有事务的failRollback方法
	 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次)
	 * @author giant
	 * @date 2021年12月15日
	 * @param obj 事务节点对象
	 * @param failRollback
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public static <T2> T2 service(T2 obj, ICallback<Throwable> failRollback) {
		return service(obj, 1, failRollback);
	}

	/**
	 * 回滚所有saga执行的链路,在本地方方法抛异常时也可调用,回滚所有方法
	 * 
	 * @author giant
	 * @date 2021年12月15日
	 * @param e
	 */
	public static void rollback(Throwable e) {
		List<ICallback<Throwable>> failCallbacks = sagaFailCallBacks.get();
		if (failCallbacks != null) {
			// 这里认为所有方法fail调用都是成功的,如果失败,就是人工处理了。
			failCallbacks.forEach(f -> f.run(e));
			failCallbacks.clear();
		}
		sagaFailCallBacks.set(null);
	}
	
	/**
	 * 结束saga,这个方法是要在saga事务结束的时候调用,调用了Saga.begin()就必须调用该方法
	 * @author giant
	 * @date 2021年12月15日
	 */
	public static void end() {
		List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get();
		if (fails != null) {
			fails.clear();
		}
		Saga.sagaFailCallBacks.set(null);
	}
}

 

 

saga异常实现

 

package com.framework.utils.func;

public class SagaException extends RuntimeException {
	
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public SagaException(String e){
		super(e);
	}
	public SagaException(Throwable e){
		super(e);
	}
}

 

 

 

 

 

 

相关标签: java saga seata