用AOP与Threadlocal实现超简单TCC事务框架 aoptcc
程序员文章站
2022-03-13 09:21:48
...
用AOP与Threadlocal实现一个mini的TCC事务框架
TCC是处理分布式事务的一种技术,每个服务提供者提供TRY/CONFIRM/CANCEL三个接口,分别对应资源锁定,提交,取消操作。看到github上有些复杂完善的TCC框架,本着简单用AOP与ThreadLocal来做一个简单的框架,验证下自己的想法是否可行,同时练练手。
其中的TCC三调用的方法切换,以及考虑后续要使用try返回值处理,本人采用了**一种投机取巧的方式**来实现。
## 一、主要目标
只考虑几个简单的目标
### 1. TCC调用的关系信息
每个TCC服务调用一般都要包装一下,而且要先定义好谁是TRY,谁是CONFIRM/CANCEL。比如在spring cloud的hystrix中,有注解中设定降级方法,另外写好降级方法,在扫描时,相关信息都记录下来备用。比如try失败了要找到执行哪个cancel。
### 2. 具体调用的参数
在运行中,具体的调用与参数,都需要在事务过程中存下来,可以通过在调用过程中增加AOP,得到相关信息。不仅当时的调用参数,包括try的结果要记录下来。
### 3. 事务中相关的调用
因为要么所有的try都成功,再全部执行confirm操作,如果有一个失败,所有已经成功的都调用cancel操作。所以已经执行的都要记录下来
### 4. 完整性保障
如果confirm/cancel操作失败了,可能记录,可能重试,要考虑。
## 二、设计方案
一般是对Try方法进行注解,而且注解中指明成功与失败执行什么。可是这些注解解析,保存,工作量不少,这个先不考虑。这里要删繁就简的处理。
在执行方法时,肯定有公共的信息要存,所以spring AOP来切入保存数据。对每个参与者方法与聚合方法都要处理,所以使用两个注解进行AOP即可。一个总控,一个在具体执行中。
由于在AOP的@Around过程中,可以得到当前调用的ProceedingJoinPoint对象,它包含了当前调用的所有信息的封装,可以很好的利用一下。
比如ProceedingJoinPoint可以得到当前调用的参数,而参数是可以修改后再执行的,那么切换TCC方法就可以利用这个特点。于是对一个参与者的TCC调用包装成一个单一方法,并用一个TccType的参数进行区分,switch内部一分为三,对应TCC。为了方便修改参数,这个参数排第一位置。
另外,Try方法可能返回参与者自己系统的ID,用于根据ID进行确认或者取消,这个参数正好也利用一下ProceedingJoinPoint。为了方便设置这个参数,就定在单一方法的第二个参数位置,Try调用时,这个位置置null即可。
至于每个调用都的调用保存,就把它们的ProceedingJoinPoint放在一个list中,这个list保存在ThreadLocal中就行了,一开始产生一个空的list,成功了就放进去,如果都成功,就取出来修改参数运行,失败了也是修改参数再运行。
另外考虑本地事务如果与多个参与者在一起怎么办?本地就用@Transactional吧,不管哪个步骤有问题,都抛出异常,让本地事务也回滚。
最后提到完整性,如果Confirm/Cancel失败了怎么办?同样为了简化,提供一个接口,把ProceedingJoinPoint传出来,由使用都来实现,是先记录呢,还是再重试呢,还是同时要分发进行不同的处理都不管了,外部实现。
## 三、主要代码
核心方法都已经注解
## 四、结果
聚合方法及其中一个参与者Service
正常与回滚的运行结果
## 五、源码
https://github.com/herriman76/tcc-mini
TCC是处理分布式事务的一种技术,每个服务提供者提供TRY/CONFIRM/CANCEL三个接口,分别对应资源锁定,提交,取消操作。看到github上有些复杂完善的TCC框架,本着简单用AOP与ThreadLocal来做一个简单的框架,验证下自己的想法是否可行,同时练练手。
其中的TCC三调用的方法切换,以及考虑后续要使用try返回值处理,本人采用了**一种投机取巧的方式**来实现。
## 一、主要目标
只考虑几个简单的目标
### 1. TCC调用的关系信息
每个TCC服务调用一般都要包装一下,而且要先定义好谁是TRY,谁是CONFIRM/CANCEL。比如在spring cloud的hystrix中,有注解中设定降级方法,另外写好降级方法,在扫描时,相关信息都记录下来备用。比如try失败了要找到执行哪个cancel。
### 2. 具体调用的参数
在运行中,具体的调用与参数,都需要在事务过程中存下来,可以通过在调用过程中增加AOP,得到相关信息。不仅当时的调用参数,包括try的结果要记录下来。
### 3. 事务中相关的调用
因为要么所有的try都成功,再全部执行confirm操作,如果有一个失败,所有已经成功的都调用cancel操作。所以已经执行的都要记录下来
### 4. 完整性保障
如果confirm/cancel操作失败了,可能记录,可能重试,要考虑。
## 二、设计方案
一般是对Try方法进行注解,而且注解中指明成功与失败执行什么。可是这些注解解析,保存,工作量不少,这个先不考虑。这里要删繁就简的处理。
在执行方法时,肯定有公共的信息要存,所以spring AOP来切入保存数据。对每个参与者方法与聚合方法都要处理,所以使用两个注解进行AOP即可。一个总控,一个在具体执行中。
由于在AOP的@Around过程中,可以得到当前调用的ProceedingJoinPoint对象,它包含了当前调用的所有信息的封装,可以很好的利用一下。
比如ProceedingJoinPoint可以得到当前调用的参数,而参数是可以修改后再执行的,那么切换TCC方法就可以利用这个特点。于是对一个参与者的TCC调用包装成一个单一方法,并用一个TccType的参数进行区分,switch内部一分为三,对应TCC。为了方便修改参数,这个参数排第一位置。
另外,Try方法可能返回参与者自己系统的ID,用于根据ID进行确认或者取消,这个参数正好也利用一下ProceedingJoinPoint。为了方便设置这个参数,就定在单一方法的第二个参数位置,Try调用时,这个位置置null即可。
至于每个调用都的调用保存,就把它们的ProceedingJoinPoint放在一个list中,这个list保存在ThreadLocal中就行了,一开始产生一个空的list,成功了就放进去,如果都成功,就取出来修改参数运行,失败了也是修改参数再运行。
另外考虑本地事务如果与多个参与者在一起怎么办?本地就用@Transactional吧,不管哪个步骤有问题,都抛出异常,让本地事务也回滚。
最后提到完整性,如果Confirm/Cancel失败了怎么办?同样为了简化,提供一个接口,把ProceedingJoinPoint传出来,由使用都来实现,是先记录呢,还是再重试呢,还是同时要分发进行不同的处理都不管了,外部实现。
## 三、主要代码
核心方法都已经注解
```java //两个AOP切点 @Pointcut("@annotation(com.so_mini.tcc.annotation.Tcc)") public void tccAspect() { System.out.println("Tcc");//注解聚合方法 } @Pointcut("@annotation(com.so_mini.tcc.annotation.TccEach)") public void tccEachAspect() { System.out.println("TccEach");//注解每一个TCC参与者方法 } //对应的环绕AOP处理 @Around("tccAspect()") public Object aroundMethod(ProceedingJoinPoint pjd) { Object result = null; String methodName = pjd.getSignature().getName(); Map<String,Object> tccInfo=new HashMap<String,Object>(); tccInfo.put("list", new ArrayList<ProceedingJoinPoint>()); threadLocal.set(tccInfo);//当前线程局部变量中保存 try { result = pjd.proceed(); Map<String,Object> tccInfoRe=(Map<String, Object>) threadLocal.get(); List<ProceedingJoinPoint> participant=(List<ProceedingJoinPoint>)(tccInfoRe.get("list")); System.out.println("【正常,需确认数】"+participant.size()); //没有异常,执行所有try对应的confirm广场 for(ProceedingJoinPoint pj:participant){ toConfirm(pj, _CCFailListenerList); } // participant.forEach(joinPoint-->(org.aspectj.lang.JoinPoint)joinPoint); return result; } catch (Throwable e) { Map<String,Object> tccInfoRe=(Map<String, Object>) threadLocal.get(); List<ProceedingJoinPoint> participant=(List<ProceedingJoinPoint>)(tccInfoRe.get("list")); System.out.println("【有问题,取消已完成数】"+participant.size()); //有异常,已经成功的try方法,都要执行对应的cancel方法 for(ProceedingJoinPoint pj:participant){ toCancle(pj,_CCFailListenerList); } throw new RuntimeException("TCC异常");//抛异常,本地DB事务回滚。 } finally{ threadLocal.remove(); System.out.println("【threadLocal removed!】"); } } @Around("tccEachAspect()") public Object aroundEachMethod(ProceedingJoinPoint pjd) { Object result = null; String methodName = pjd.getSignature().getName(); Map tccInfo=(Map) threadLocal.get(); List participant=(List)(tccInfo.get("list")); try { participant.add(pjd);//预存下来 result = pjd.proceed(); System.out.println("["+methodName+"] return value:"+result); setTryResult(pjd,result);//【设置try执行的结果】 return result; } catch (Throwable e) { participant.remove(pjd);//从成功的列表中移除,抛异常让整体cancel throw new RuntimeException(e); } finally{ } } //正常try,第二个参数记录结果 public static void setTryResult(ProceedingJoinPoint point,Object result) { int i=0; Object[] args = point.getArgs(); args[1]=result; } //需要确认时,第一个参数修改为confirm public static void toConfirm(ProceedingJoinPoint point,List<CCFailListener> cCFailListenerList) { Object[] args = point.getArgs(); args[0]=(Object)(TccPhaseEnum.Confirm); try { point.proceed(args); } catch (Throwable e) { cCFailListenerList.forEach(listener->listener.notify(point, TccPhaseEnum.Confirm));//出错由监听器处理,用户自己实现 } } ```
## 四、结果
聚合方法及其中一个参与者Service
```java @Transactional @Tcc public void createTripTrans(String orderId){ System.out.println("BEGIN db-trans..."); staffService.addStaff(orderId);//简单的本地数据库操作 System.out.println("BEGIN remote..."); eatBook.bookEat(TccPhaseEnum.Try,null,orderId);//前两个参数固定!!! houseBook.bookHouse(TccPhaseEnum.Try,null,orderId); planeBook.bookPlane(TccPhaseEnum.Try,null,orderId); } @Service public class HouseBook { @TccEach public String bookHouse(TccPhaseEnum type,Object tryResult,String orderId){ switch (type) { case Try: System.out.println("house book...!"); if("222".equals(orderId)) throw new RuntimeException("house failure"); return "HS100083";//模拟try的返回值 case Confirm: confirmHouse(tryResult);//必须使用前面的返回值,否则异常 return null; case Cancel: System.out.println("house cancel...!"+tryResult); return null; } return null; } private void confirmHouse(Object houseId){ if("HS100083".equals(houseId)) System.out.println("house confirm...!"+houseId); else{ throw new RuntimeException("houseId missing and confirme failure"); } } } ```
正常与回滚的运行结果
```java //第三个TCC出错,前两个执行CANCEL,本地DB事务回滚 BEGIN db-trans... Hibernate: select staffbo0_.id ..... BEGIN remote... Eat book...!//第一个参与者 try [bookEat] return value:null//try 的 返回值 house book...!//第二个参与者 try [bookHouse] return value:HS100083//try 的 返回值 【有问题,取消已完成数】2//第三个失败了,取消前两个成功的try Eat cnacel...!//第一个参与者 house cancel...!HS100083//第二个参与者 cancel使用了try的结果 【threadLocal removed!】//all finished java.lang.RuntimeException: TCC异常 ``` ```java //正常情况 BEGIN db-trans... Hibernate: select staffbo0_.id ... BEGIN remote... Eat book...! [bookEat] return value:null house book...! [bookHouse] return value:HS100083 plane booked! [bookPlane] return value:null 【正常,需确认数】3 Eat confirm...! house confirm...!HS100083 plane confirmed! 【threadLocal removed!】 Hibernate: insert into staff (age, name, id) values (?, ?, ?) ```
## 五、源码
https://github.com/herriman76/tcc-mini