Java实现saga分布式事务模型框架
程序员文章站
2022-07-12 16:16:31
...
调用第三方接口操作业务的时候经常要用到分布式事务,以保证事务的完整性,我们常用的分布式事务模型有saga、tcc、2pc等。2pc不属于接口调用的场景,所以我们调用第三方接口常用的模型有saga、tcc
tcc要保证comfirm时报所有dml操作请求写在comfirm实现里面,这个有一定的局限性,回滚的的实现写在cancel接口里面,这个对业务有一定的局限性
saga更适合长事务、业务比较复杂的场景
本文实现saga事务异常处理,如果需要处理那种接口comfirm超时的情况,需要将事务id及comfirm调用链持久化保存起来,异步检查,也可以写在缓存里面定时检查。这里没实现,应该比较容易实现。
- saga的模型如下
由上图模型不难看出我们在methodA就可以实现saga事务,只要serviceB、serviceC配合实现对应的cancel方法;本文实现java简易版本的sata框架模型代码。当然一定要实现阿里的seata框架或者其它也可以的,就是要另外搭建seata服务器跟相关数据库;
实现技术点:
1、lamdba
2、gclib动态代理
3、ThreadLocal
- saga使用测试
public class SagaTest { public void testFunc(){ System.out.println("**********先测试 Func方法测试 开始***********"); String failStr = "调用失败"; //lambda表达式,jdk8才能执行。 Test test1 = Func.proxy(new Test(),3, Callback::success,//Callback::success方法调用 (aa)->System.out.println(aa+failStr) ); // try{ // System.out.println("======== test start 测试动态代理,代理方法内部调用代理方法========="); test1.test(); System.out.println("======== test end 测试动态代理,代理方法内部调用代理方法========="); System.out.println(); System.out.println("======== successWithRetry3 start 测试动态代理失败重试三次========="); test1.successWithRetry3(); System.out.println("======== successWithRetry3 end 测试动态代理失败重试三次========="); System.out.println(); // System.out.println("======== successWithRetry4 start 测试动态代理失败重试四次========="); // 因为动态代理对象test1最多重试4次,所以successWithRetry4重试4次后会报异常,不再往下执行。 // test1.successWithRetry4(); // System.out.println("======== successWithRetry4 end 测试动态代理失败重试四次========="); // }catch(Exception e){ // // } } /** * 测试saga事务 * @author giant * @date 2021年12月17日 */ public void methodA(){ System.out.println("**********Gaga 测试 真正开始***********"); Saga.begin();//saga开始。 Test sagaTest = new Test(); // 每个saga事务节点有两个实现, // 如下:1、业务实现comfirm方法:sagaTest.test()方法;2、cancel方法为lambda方法实现 Saga.service(sagaTest, (a)->{ System.out.println("回滚t1"); }).test(); Saga.service(sagaTest, (a)->{ System.out.println("回滚t2"); }).test1(); System.out.println("准备报错混滚:"); Saga.service(sagaTest, (a)->{ System.out.println("回滚t3"); }).fail(); // sagaTest.fail()方法抛出异常,会执行Saga.rollback(throwable)方法, // rollback方法会执行所有执行过commit方法的cancel方法,回滚所有执行过的comfirm; Saga.end();//结束本次分布式事务 // test1.test2(); // Test test2 = Fun.proxy(new Test(),null,null); // test2.test1(); // test2.test2(); //successWithRetry4重试四次失败抛出异常,不往下跑了 System.out.println("不抛出异常跑完;"); } public static void main(String[] args) { new SagaTest().methodA(); } } class Test{ int execCount = 0 ; public String test(){ System.out.println("test"); this.test1(); return "test success"; } public String test1(){ String innerParam = "innerParam1"; System.out.println("test1 exec,方法内部参数:"+innerParam); return "test1 success"; } public void test2(String arg){ System.out.println("test2:"+arg); } /** * 跑第三次成功 * @author giant * @date 2021年12月15日 * @return */ public String successWithRetry3(){ if(execCount>=2){ String str = "第"+(++execCount)+"次success"; execCount = 0; return str; }else{ System.out.println("fail:"+(++execCount)); throw new RuntimeException("fail"); } } /** * 跑第第次成功 * @author giant * @date 2021年12月15日 * @return */ public String successWithRetry4(){ execCount = 0 ; if(execCount>=3){ String str = "第"+(++execCount)+"次success"; execCount = 0; return str; }else{ System.out.println("fail:"+(++execCount)); throw new RuntimeException("fail"); } } public String fail(){ System.out.println("fail:"); throw new RuntimeException("fail"); } public String fail(String msg){ System.out.println("fail:"+msg); throw new RuntimeException("fail"); } } class Callback{ public Callback(){} public static void success(Object obj){ System.out.println(obj+"成功"); } public void fail(Throwable e){ System.out.println("执行失败"); } }
最终执行fail方法异常报错:回滚Saga.service执行过cancel方法,输出内容如下:
**********Gaga 测试 真正开始*********** test test1 exec,方法内部参数:innerParam1 test1 exec,方法内部参数:innerParam1 准备报错混滚: fail: 回滚t1 回滚t2 回滚t3 Exception in thread "main" java.lang.RuntimeException: fail at com.framework.utils.func.Test.fail(SagaTest.java:132) at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.CGLIB$fail$4(<generated>) at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a$$FastClassByCGLIB$$c8387779.invoke(<generated>) at net.sf.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228) at com.framework.utils.func.Func.lambda$0(Func.java:51) at com.framework.utils.func.Func.retry(Func.java:73) at com.framework.utils.func.Func.intercept(Func.java:48) at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.fail(<generated>) at com.framework.utils.func.SagaTest.methodA(SagaTest.java:58) at com.framework.utils.func.SagaTest.main(SagaTest.java:75)
- 代码实现
maven导入gclib动态代理
<dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>3.2.12</version> </dependency>
1、saga事务重试接口
/** * lambda表达式实现,方法失败重试接口 * @author giant * @date 2021年12月15日 * @param <T> 返回对象 */ @FunctionalInterface interface IRetry<T>{ public T run() throws Throwable; }
2、成功或者失败后的回调接口
/** * lambda表达式实现,方法成功或者失败的回调接口 * @author giant * @date 2021年12月15日 * @param <T> 参数对象 */ @FunctionalInterface public interface ICallback<T> { public void run(T obj); }
3、通过动态代理service中所有的方法,实现service的重试、成功及错误的回调
import java.lang.reflect.Method; import net.sf.cglib.proxy.Enhancer; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; /** * * 方法执行动态代理,实现代理类方法重试,方法调用成功或者失败后的反调函数实现 * 代理对象调用方法时,方法与方法直接可嵌套使用。 * @author giant * @date 2021年12月15日 */ public class Func implements MethodInterceptor{ /** * 是否saga模式,如果是saga模式,建在报错的时候统一执行Saga.sagaFailCallBacks集合里面的方法 * */ private boolean isSaga=false; private ICallback<Throwable> fail = null; private ICallback<Object> success = null; private int retry = 1; private Object t = null; /** * 不允许外表创建该对象 * @param t * @param success * @param fail * @param retry */ Func(Object t,ICallback<Object> success,ICallback<Throwable> fail,int retry,boolean isSaga) { // TODO Auto-generated constructor stub this.t = t; this.success = success; this.fail = fail; this.retry = retry; this.isSaga = isSaga; } @Override public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { Object o2 = retry(()->{ Object o1 = null; try{ o1 = methodProxy.invokeSuper(o, objects); return o1; }catch(Throwable e){ throw e; } // return null; }, Object.class,this.retry,this.success,this.fail); return o2; } /** * 失败重试三次 * @param run * @param clz * @return */ private <T> T retry(IRetry<T> run,Class <T> clz,int count,ICallback<Object> success,ICallback<Throwable>fail){ int i=0; T result = null; boolean isSuccess = false; do{ try { result = run.run(); isSuccess = true; if(success!=null){ success.run(result); } }catch(Throwable e){ if(fail!=null&&!isSaga){ fail.run(e); } if(i==retry-1){ //向外抛出的异常只认最后一次 if(isSaga){//如果是saga处理,统一最后回滚事务 Saga.rollback(e); } throw e; } } }while((!isSuccess)&&++i<count); return result; } //生成代理对象 Object get(){ //new 一个Enhancer对象 Enhancer enhancer = new Enhancer(); //指定他的父类(注意这 是实现类,不是接口) enhancer.setSuperclass(t.getClass()); //指定真正做事情的回调方法 enhancer.setCallback(this); //生成代理类对象 Object o = enhancer.create(); //返回 return o; } /** * 给代理对象赋能,支持方法重试、成功及失败的回调 * @author giant * @date 2021年12月15日 * @param obj 代理对象 * @param retry 代理对象失败重试次数 * @param success 代理对象所有方法成功回调函数(不报异常就算成功) * @param fail 代理对象所有方法失败回调函数。(报异常就算失败) * @return 返回代理 */ @SuppressWarnings("unchecked") public static <T2> T2 proxy(T2 obj,int retry,ICallback<Object> success,ICallback<Throwable> fail){ try { Func p = new Func(obj,success,fail,retry,false); return (T2)p.get(); } catch (Exception e) { throw new RuntimeException(e); } } /** * 给代理对象赋能,支持方法成功及失败的回调 * @author giant * @date 2021年12月15日 * @param obj 代理对象 * @param success 代理对象所有方法成功回调函数(不报异常就算成功) * @param fail 代理对象所有方法失败回调函数(报异常就算失败) * @return 返回代理 */ public static <T2> T2 proxy(T2 obj,ICallback <Object>success,ICallback<Throwable> fail){ return proxy(obj, 1, success, fail); } }
4、Saga模型框架基于Func类实现
package com.framework.utils.func; import java.util.ArrayList; import java.util.List; /** * saga分布式事务服务类 * 所有Gaga.service()调用的方法必须在Gaga.begin()和Gaga.begin()中间。 * xid就是threadid,可以通过Gaga.begin()获取 * @author giant * @date 2021年12月15日 */ public class Saga { /** * * 记录事务所有saga节点回滚函数,该对象代替了事件表,每次Gaga分布式事务必须调用Saga.begin()跟Saga.end(); * */ static ThreadLocal<List<ICallback<Throwable>>> sagaFailCallBacks = new ThreadLocal<>(); /** * saga事务开始;每次Gaga分布式事务必须调用Saga.begin()跟Saga.end(); * @author giant * @date 2021年12月15日 */ public static long begin() { long xid = Thread.currentThread().getId(); List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get(); if (fails == null) { fails = new ArrayList<ICallback<Throwable>>(); Saga.sagaFailCallBacks.set(fails); } else { throw new SagaException("程序未关闭saga,请在finally方法执行Saga.end()"); } return xid; } /** * 代理事务节点,该节点执行失败,会根据参数重试,如果方法报错,会回滚其它所有事务的failRollback方法 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次) * @author giant * @date 2021年12月15日 * @param obj 事务节点对象 * @param retry 事务失败重试次数 * @param failRollback 如果其中一个service模块失败,将在异常的时候回归,或者调用Gaga.rollback(null)回滚。 * @return */ @SuppressWarnings("unchecked") public static <T2> T2 service(T2 obj, int retry, ICallback<Throwable> failRollback) { try { List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get(); if (fails == null) { throw new SagaException("请先开启Saga,调用Saga.begin()"); } fails.add(failRollback); Func p = new Func(obj, null, failRollback, retry, true); return (T2) p.get(); } catch (Exception e) { throw new SagaException(e); } } /** * 代理事务节点,如果报错方法报错,会回滚其它所有事务的failRollback方法 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次) * @author giant * @date 2021年12月15日 * @param obj 事务节点对象 * @param failRollback * @return */ @SuppressWarnings("unchecked") public static <T2> T2 service(T2 obj, ICallback<Throwable> failRollback) { return service(obj, 1, failRollback); } /** * 回滚所有saga执行的链路,在本地方方法抛异常时也可调用,回滚所有方法 * * @author giant * @date 2021年12月15日 * @param e */ public static void rollback(Throwable e) { List<ICallback<Throwable>> failCallbacks = sagaFailCallBacks.get(); if (failCallbacks != null) { // 这里认为所有方法fail调用都是成功的,如果失败,就是人工处理了。 failCallbacks.forEach(f -> f.run(e)); failCallbacks.clear(); } sagaFailCallBacks.set(null); } /** * 结束saga,这个方法是要在saga事务结束的时候调用,调用了Saga.begin()就必须调用该方法 * @author giant * @date 2021年12月15日 */ public static void end() { List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get(); if (fails != null) { fails.clear(); } Saga.sagaFailCallBacks.set(null); } }
saga异常实现
package com.framework.utils.func; public class SagaException extends RuntimeException { /** * */ private static final long serialVersionUID = 1L; public SagaException(String e){ super(e); } public SagaException(Throwable e){ super(e); } }