Java按时间梯度实现异步回调接口的方法
1. 背景
在业务处理完之后,需要调用其他系统的接口,将相应的处理结果通知给对方,若是同步请求,假如调用的系统出现异常或是宕机等事件,会导致自身业务受到影响,事务会一直阻塞,数据库连接不够用等异常现象,可以通过异步回调来防止阻塞,但异步的情况还存在一个问题,若调用一次不成功的话接下来怎么处理?这个地方就需要按时间梯度回调,比如前期按10s间隔回调,回调3次,若不成功按30s回调,回调2次,再不成功按分钟回调,依次类推……相当于给了对方系统恢复的时间,不可能一直处于异常或宕机等异常状态,若是再不成功可以再通过人工干预的手段去处理了,具体业务具体实现。
2. 技术实现
大体实现思路如下图,此过程用到两个队列,当前队列和next队列,当前队列用来存放第一次需要回调的数据对象,如果调用不成功则放入next队列,按照制定的时间策略再继续回调,直到成功或最终持久化后人工接入处理。
用到的技术如下:
•http请求库,retrofit2
•队列,linkedblockingqueue
•调度线程池,scheduledexecutorservice
3. 主要代码说明
3.1 回调时间梯度的策略设计
采用枚举来对策略规则进行处理,便于代码上的维护,该枚举设计三个参数,级别、回调间隔、回调次数;
/** * 回调策略 */ public enum callbacktype { //等级1,10s执行3次 seconds_10(1, 10, 3), //等级2,30s执行2次 seconds_30(2, 30, 2), //等级3,60s执行2次 minute_1(3, 60, 2), //等级4,5min执行1次 minute_5(4, 300, 1), //等级5,30min执行1次 minute_30(5, 30*60, 1), //等级6,1h执行2次 hour_1(6, 60*60, 1), //等级7,3h执行2次 hour_3(7, 60*60*3, 1), //等级8,6h执行2次 hour_6(8, 60*60*6, 1); //级别 private int level; //回调间隔时间 秒 private int intervaltime; //回调次数 private int count; }
3.2 数据传输对象设计
声明抽象父类,便于其他对象调用传输继承。
/** * 消息对象父类 */ public abstract class messageinfo { //开始时间 private long starttime; //更新时间 private long updatetime; //是否回调成功 private boolean issuccess=false; //回调次数 private int count=0; //回调策略 private callbacktype callbacktype; }
要传输的对象,继承消息父类;
/** * 工单回调信息 */ public class workordermessage extends messageinfo { //车架号 private string vin; //工单号 private string workorderno; //工单状态 private integer status; //工单原因 private string reason; //操作用户 private integer userid; }
3.3 调度线程池的使用
//声明线程池,大小为16 private scheduledexecutorservice pool = executors.newscheduledthreadpool(16); ...略 while (true){ //从队列获取数据,交给定时器执行 try { workordermessage message = messagequeue.getmessagefromnext(); long excuetime = message.getupdatetime()+message.getcallbacktype().getintervaltime()* 1000; long t = excuetime - system.currenttimemillis(); if (t/1000 < 5) {//5s之内将要执行的数据提交给调度线程池 system.out.println("messagehandlenext-满足定时器执行条件"+jsonobject.tojsonstring(message)); pool.schedule(new callable<boolean>() { @override public boolean call() throws exception { remotecallback(message); return true; } }, t, timeunit.milliseconds); }else { messagequeue.putmessagetonext(message); } } catch (interruptedexception e) { system.out.println(e); } }
3.4 retrofit2的使用,方便好用。
具体可查看官网相关文档进行了解,用起来还是比较方便的。
retrofit初始化:
import retrofit2.retrofit; import retrofit2.converter.gson.gsonconverterfactory; public class retrofithelper { private static final string http_url = "http://baidu.com/"; private static retrofit retrofit; public static retrofit instance(){ if (retrofit == null){ retrofit = new retrofit.builder() .baseurl(http_url) .addconverterfactory(gsonconverterfactory.create()) .build(); } return retrofit; } }
如果需要修改超时时间,连接时间等可以这样初始话,retrofit采用okhttpclient
import okhttp3.okhttpclient; import retrofit2.retrofit; import retrofit2.converter.gson.gsonconverterfactory; import java.util.concurrent.timeunit; public class retrofithelper { private static final string http_url = "http://baidu.com/"; private static retrofit retrofit; public static retrofit instance(){ if (retrofit == null){ retrofit = new retrofit.builder() .baseurl(http_url) .client(new okhttpclient.builder() .connecttimeout(30, timeunit.seconds)//连接时间 .readtimeout(30, timeunit.seconds)//读时间 .writetimeout(30, timeunit.seconds)//写时间 .build()) .addconverterfactory(gsonconverterfactory.create()) .build(); } return retrofit; } }
retrofit使用通过接口调用,要先声明一个接口;
import com.alibaba.fastjson.jsonobject; import com.woasis.callbackdemo.bean.workordermessage; import retrofit2.call; import retrofit2.http.body; import retrofit2.http.post; public interface workordermessageinterface { @post("/api") call<jsonobject> updatebatteryinfo(@body workordermessage message); }
接口和实例对象准备好了,接下来就是调用;
private void remotecallback(workordermessage message){ //实例接口对象 workordermessageinterface workordermessageinterface = retrofithelper.instance().create(workordermessageinterface.class); //调用接口方法 call<jsonobject> objectcall = workordermessageinterface.updatebatteryinfo(message); system.out.println("远程调用执行:"+new date()); //异步调用执行 objectcall.enqueue(new callback<jsonobject>() { @override public void onresponse(call<jsonobject> call, response<jsonobject> response) { system.out.println("messagehandlenext****调用成功"+thread.currentthread().getid()); message.setsuccess(true); system.out.println("messagehandlenext-回调成功"+jsonobject.tojsonstring(message)); } @override public void onfailure(call<jsonobject> call, throwable throwable) { system.out.println("messagehandlenext++++调用失败"+thread.currentthread().getid()); //失败后再将数据放入队列 try { //对回调策略初始化 long currenttime = system.currenttimemillis(); message.setupdatetime(currenttime); message.setsuccess(false); callbacktype callbacktype = message.getcallbacktype(); //获取等级 int level = callbacktype.getlevel(callbacktype); //获取次数 int count = callbacktype.getcount(callbacktype); //如果等级已经最高,则不再回调 if (callbacktype.hour_6.getlevel() == callbacktype.getlevel() && count == message.getcount()){ system.out.println("messagehandlenext-等级最高,不再回调, 线下处理:"+jsonobject.tojsonstring(message)); }else { //看count是否最大,count次数最大则增加level if (message.getcount()<callbacktype.getcount()){ message.setcount(message.getcount()+1); }else {//如果不小,则增加level message.setcount(1); level += 1; message.setcallbacktype(callbacktype.gettypebylevel(level)); } messagequeue.putmessagetonext(message); } } catch (interruptedexception e) { e.printstacktrace(); system.out.println("messagehandlenext-放入队列数据失败"); } } }); }
3.5结果实现
4.总结
本次实现了按照时间梯度去相应其他系统的接口,不再导致本身业务因其他系统的异常而阻塞。
源码:
以上所述是小编给大家介绍的java按时间梯度实现异步回调接口,希望对大家有所帮助
推荐阅读