xxl-job源码分析
xxl-job
系统说明
安装
安装部署参考文档:
功能
定时调度、服务解耦、灵活控制跑批时间(停止、开启、重新设定时间、手动触发)
xxl-job是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用
概念
执行器列表:一个执行器是一个项目
任务:一个任务是一个项目中的 jobhandler
一个xxl-job服务可以有多个执行器(项目),一个项目下可以有多个任务(jobhandler),他们是如何关联的?
页面操作:
- 在管理平台可以新增执行器(项目)
- 在任务列表可以指定执行器(项目)下新增多个任务(jobhandler)
代码操作:
- 项目配置中增加 xxl.job.executor.appname = "执行器名称"
- 在实现类中增加 @jobhandler(value="xxl-job-demo") 注解,并继承 ijobhandler
架构图
抛出疑问
- 调度中心启动过程?
- 执行器启动过程?
- 执行器如何注册到调度中心?
- 调度中心怎么调用执行器?
- 集群调度时如何控制一个任务在该时刻不会重复执行
- 集群部署应该注意什么?
系统分析
执行器依赖jar包
com.xuxueli:xxl-job-core:2.1.0
com.xuxueli:xxl-registry-client:1.0.2
com.xuxueli:xxl-rpc-core:1.4.1
调度中心启动过程
// 1. 加载 xxljobadminconfig,adminconfig = this xxljobadminconfig.java // 启动过程代码 @component public class xxljobscheduler implements initializingbean, disposablebean { private static final logger logger = loggerfactory.getlogger(xxljobscheduler.class); @override public void afterpropertiesset() throws exception { // init i18n initi18n(); // admin registry monitor run // 2. 启动注册监控器(将注册到register表中的ip加载到group表)/ 30执行一次 jobregistrymonitorhelper.getinstance().start(); // admin monitor run // 3. 启动失败日志监控器(失败重试,失败邮件发送) jobfailmonitorhelper.getinstance().start(); // admin-server // 4. 初始化rpc服务 initrpcprovider(); // start-schedule // 5. 启动定时任务调度器(执行任务,缓存任务) jobschedulehelper.getinstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); } ...... }
执行器启动过程
@override public void start() throws exception { // init jobhandler repository // 将执行 jobhandler 注册到缓存中 jobhandlerrepository(concurrentmap) initjobhandlerrepository(applicationcontext); // refresh gluefactory // 刷新glue gluefactory.refreshinstance(1); // super start // 核心启动项 super.start(); } public void start() throws exception { // 初始化日志路径 // private static string logbasepath = "/data/applogs/xxl-job/jobhandler"; xxljobfileappender.initlogpath(this.logpath); // 初始化注册中心列表 (把注册地址放到 list) this.initadminbizlist(this.adminaddresses, this.accesstoken); // 启动日志文件清理线程 (一天清理一次) // 每天清理一次过期日志,配置参数必须大于3才有效 joblogfilecleanthread.getinstance().start((long)this.logretentiondays); // 开启触发器回调线程 triggercallbackthread.getinstance().start(); // 指定端口 this.port = this.port > 0 ? this.port : netutil.findavailableport(9999); // 指定ip this.ip = this.ip != null && this.ip.trim().length() > 0 ? this.ip : iputil.getip(); // 初始化rpc 将执行器注册到调度中心 30秒一次 this.initrpcprovider(this.ip, this.port, this.appname, this.accesstoken); }
执行器注册到调度中心
执行器
// 注册执行器入口 xxljobexecutor.java->initrpcprovider()->xxlrpcproviderfactory.start(); // 开启注册 xxlrpcproviderfactory.java->start(); // 执行注册 executorregistrythread.java->start(); // rpc 注册代码 for (adminbiz adminbiz: xxljobexecutor.getadminbizlist()) { try { returnt<string> registryresult = adminbiz.registry(registryparam); if (registryresult!=null && returnt.success_code == registryresult.getcode()) { registryresult = returnt.success; logger.debug(">>>>>>>>>>> xxl-job registry success, registryparam:{}, registryresult:{}", new object[]{registryparam, registryresult}); break; } else { logger.info(">>>>>>>>>>> xxl-job registry fail, registryparam:{}, registryresult:{}", new object[]{registryparam, registryresult}); } } catch (exception e) { logger.info(">>>>>>>>>>> xxl-job registry error, registryparam:{}", registryparam, e); } }
调度中心
// rpc 注册服务 adminbizimpl.java->registry();
数据库
调度中心调用执行器
/* 调度中心执行步骤 */ // 1. 调用执行器 xxljobtrigger.java->runexecutor(); // 2. 获取执行器 xxljobscheduler.java->getexecutorbiz(); // 3. 调用 executorbizimpl.java->run(); /* 执行器执行步骤 */ // 1. 执行器接口 executorbiz.java->run(); // 2. 执行器实现 executorbizimpl.java->run(); // 3. 把jobinfo 从 jobthreadrepository (concurrentmap) 中获取一个新线程,并开启新线程 xxljobexecutor.java->registjobthread(); // 4. 保存到当前线程队列 jobthread.java->pushtriggerqueue(); // 5. 执行 jobthread.java->handler.execute(triggerparam.getexecutorparams());
调度中心(admin)
实现 org.springframework.beans.factory.initializingbean类,重写 afterpropertiesset 方法,在初始化bean的时候都会执行该方法
disposablebean spring停止时执行
结束加载项
- 停止定时任务调度器(中断schedulethread,中断ringthread)
- 停止触发线程池(jobtriggerpoolhelper)
- 停止注册监控器(registrythread)
- 停止失败日志监控器(monitorthread)
- 停止rpc服务(stoprpcprovider)
手动执行方式
jobinfocontroller.java
@requestmapping("/trigger") @responsebody //@permissionlimit(limit = false) public returnt<string> triggerjob(int id, string executorparam) { // force cover job param if (executorparam == null) { executorparam = ""; } jobtriggerpoolhelper.trigger(id, triggertypeenum.manual, -1, null, executorparam); return returnt.success; }
定时调度策略
调度策略执行图
调度策略源码
jobschedulehelper.java->start();
路由策略
第一个
固定选择第一个机器
executorroutefirst.java->route();
最后一个
固定选择最后一个机器
executorroutelast.java->route();
轮询
随机选择在线的机器
executorrouteround.java->route(); private static int count(int jobid) { // cache clear if (system.currenttimemillis() > cache_valid_time) { routecounteachjob.clear(); cache_valid_time = system.currenttimemillis() + 1000*60*60*24; } // count++ integer count = routecounteachjob.get(jobid); count = (count==null || count>1000000)?(new random().nextint(100)):++count; // 初始化时主动random一次,缓解首次压力 routecounteachjob.put(jobid, count); return count; }
随机
随机获取地址列表中的一个
executorrouterandom.java->route();
一致性hash
一个job通过hash算法固定使用一台机器,且所有任务均匀散列在不同机器
executorrouteconsistenthash.java->route(); public string hashjob(int jobid, list<string> addresslist) { // ------a1------a2-------a3------ // -----------j1------------------ treemap<long, string> addressring = new treemap<long, string>(); for (string address: addresslist) { for (int i = 0; i < virtual_node_num; i++) { long addresshash = hash("shard-" + address + "-node-" + i); addressring.put(addresshash, address); } } long jobhash = hash(string.valueof(jobid)); // 取出键值 >= jobhash sortedmap<long, string> lastring = addressring.tailmap(jobhash); if (!lastring.isempty()) { return lastring.get(lastring.firstkey()); } return addressring.firstentry().getvalue(); }
最不经常使用
使用频率最低的机器优先被选举
把地址列表加入到内存中,等下次执行时剔除无效的地址,判断地址列表中执行次数最少的地址取出
频率、次数
executorroutelfu.java->route(); public string route(int jobid, list<string> addresslist) { // cache clear if (system.currenttimemillis() > cache_valid_time) { joblfumap.clear(); cache_valid_time = system.currenttimemillis() + 1000*60*60*24; } // lfu item init hashmap<string, integer> lfuitemmap = joblfumap.get(jobid); // key排序可以用treemap+构造入参compare;value排序暂时只能通过arraylist; if (lfuitemmap == null) { lfuitemmap = new hashmap<string, integer>(); joblfumap.putifabsent(jobid, lfuitemmap); // 避免重复覆盖 } // put new for (string address: addresslist) { if (!lfuitemmap.containskey(address) || lfuitemmap.get(address) >1000000 ) { // 0-n随机数,包括0不包括n lfuitemmap.put(address, new random().nextint(addresslist.size())); // 初始化时主动random一次,缓解首次压力 } } // remove old list<string> delkeys = new arraylist<>(); for (string existkey: lfuitemmap.keyset()) { if (!addresslist.contains(existkey)) { delkeys.add(existkey); } } if (delkeys.size() > 0) { for (string delkey: delkeys) { lfuitemmap.remove(delkey); } } /*********************** 优化 start ***********************/ // 优化 remove old部分 iterator<string> iterable = lfuitemmap.keyset().iterator(); while (iterable.hasnext()) { string address = iterable.next(); if (!addresslist.contains(address)) { iterable.remove(); } } /*********************** 优化 start ***********************/ // load least userd count address // 从小到大排序 list<map.entry<string, integer>> lfuitemlist = new arraylist<map.entry<string, integer>>(lfuitemmap.entryset()); collections.sort(lfuitemlist, new comparator<map.entry<string, integer>>() { @override public int compare(map.entry<string, integer> o1, map.entry<string, integer> o2) { return o1.getvalue().compareto(o2.getvalue()); } }); map.entry<string, integer> addressitem = lfuitemlist.get(0); string minaddress = addressitem.getkey(); addressitem.setvalue(addressitem.getvalue() + 1); return addressitem.getkey(); }
最近最久未使用
最久未使用的机器优先被选举
用链表的方式存储地址,第一个地址使用后下次该任务过来使用第二个地址,依次类推(ps:有点类似轮询策略)
与轮询策略的区别:
- 轮询策略是第一次随机找一台机器执行,后续执行会将索引加1取余
- 轮询策略依赖 addresslist 的顺序,如果这个顺序变了,索引到下一次的机器可能不是期望的顺序
- lru算法第一次执行会把所有地址加载进来并缓存,从第一个地址开始执行,即使 addresslist 地址顺序变了也不影响
次数
executorroutelru.java->route(); public string route(int jobid, list<string> addresslist) { // cache clear if (system.currenttimemillis() > cache_valid_time) { joblrumap.clear(); cache_valid_time = system.currenttimemillis() + 1000*60*60*24; } // init lru linkedhashmap<string, string> lruitem = joblrumap.get(jobid); if (lruitem == null) { /** * linkedhashmap * a、accessorder:ture=访问顺序排序(get/put时排序);false=插入顺序排期; * b、removeeldestentry:新增元素时将会调用,返回true时会删除最老元素;可封装linkedhashmap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的lru算法; */ lruitem = new linkedhashmap<string, string>(16, 0.75f, true); joblrumap.putifabsent(jobid, lruitem); } /*********************** 举个例子 start ***********************/ // 如果accessorder为true的话,则会把访问过的元素放在链表后面,放置顺序是访问的顺序 // 如果accessorder为flase的话,则按插入顺序来遍历 linkedhashmap<string, string> lruitem = new linkedhashmap<string, string>(16, 0.75f, true); joblrumap.putifabsent(1, lruitem); lruitem.put("192.168.0.1", "192.168.0.1"); lruitem.put("192.168.0.2", "192.168.0.2"); lruitem.put("192.168.0.3", "192.168.0.3"); string eldestkey = lruitem.entryset().iterator().next().getkey(); string eldestvalue = lruitem.get(eldestkey); system.out.println(eldestvalue + ": " + lruitem); eldestkey = lruitem.entryset().iterator().next().getkey(); eldestvalue = lruitem.get(eldestkey); system.out.println(eldestvalue + ": " + lruitem); // 输出结果: 192.168.0.1: {192.168.0.2=192.168.0.2, 192.168.0.3=192.168.0.3, 192.168.0.1=192.168.0.1} 192.168.0.2: {192.168.0.3=192.168.0.3, 192.168.0.1=192.168.0.1, 192.168.0.2=192.168.0.2} /*********************** 举个例子 end ***********************/ // put new for (string address: addresslist) { if (!lruitem.containskey(address)) { lruitem.put(address, address); } } // remove old list<string> delkeys = new arraylist<>(); for (string existkey: lruitem.keyset()) { if (!addresslist.contains(existkey)) { delkeys.add(existkey); } } if (delkeys.size() > 0) { for (string delkey: delkeys) { lruitem.remove(delkey); } } // load string eldestkey = lruitem.entryset().iterator().next().getkey(); string eldestvalue = lruitem.get(eldestkey); return eldestvalue; }
故障转移
按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
executorroutefailover.java->route();
忙碌转移
按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
executorroutebusyover.java->route();
分片广播
广播触发对应集群中所有机器执行一次任务,同时传递分片参数;可根据分片参数开发分片任务
阻塞处理策略
为了解决执行线程因并发问题、执行效率慢、任务多等原因而做的一种线程处理机制,主要包括 串行、丢弃后续调度、覆盖之前调度,一般常用策略是串行机制
executorblockstrategyenum.java serial_execution("serial execution"), // 串行 discard_later("discard later"), // 丢弃后续调度 cover_early("cover early"); // 覆盖之前调度 executorbizimpl.java->run(); // executor block strategy if (jobthread != null) { executorblockstrategyenum blockstrategy = executorblockstrategyenum.match(triggerparam.getexecutorblockstrategy(), null); if (executorblockstrategyenum.discard_later == blockstrategy) { // discard when running if (jobthread.isrunningorhasqueue()) { return new returnt<string>(returnt.fail_code, "block strategy effect:"+executorblockstrategyenum.discard_later.gettitle()); } } else if (executorblockstrategyenum.cover_early == blockstrategy) { // kill running jobthread if (jobthread.isrunningorhasqueue()) { removeoldreason = "block strategy effect:" + executorblockstrategyenum.cover_early.gettitle(); jobthread = null; } } else { // just queue trigger } }
单机串行
对当前线程不做任何处理,并在当前线程的队列里增加一个执行任务
丢弃后续调度
如果当前线程阻塞,后续任务不再执行,直接返回失败
覆盖之前调度
创建一个移除原因,新建一个线程去执行后续任务
运行模式
executorbizimpl.java->run();
bean
java里的bean对象
glue(java)
利用java的反射机制,通过代码字符串生成实体类
ijobhandler originjobhandler = gluefactory.getinstance().loadnewinstance(triggerparam.getgluesource()); groovyclassloader
glue(shell python php nodejs powershell)
按照文件命名规则创建一个执行脚本文件和一个日志输出文件,通过脚本执行器执行
失败重试次数
任务失败后记录到 xxl_job_log 中,由失败监控线程查询处理失败的任务且失败次数大于0,继续执行
任务超时时间
把超时时间给 triggerparam 触发参数,在调用执行器的任务时超时时间,有点类似httpclient的超时时间
执行器(exector)
注册自己的机器地址
注册项目中的 jobhandler
-
提供被调度中心调用的接口
public interface executorbiz { /** * 供调度中心检测机器是否存活 * * beat * @return */ public returnt<string> beat(); /** * 供调度中心检测机器是否空闲 * * @param jobid * @return */ public returnt<string> idlebeat(int jobid); /** * kill * @param jobid * @return */ public returnt<string> kill(int jobid); /** * log * @param logdatetim * @param logid * @param fromlinenum * @return */ public returnt<logresult> log(long logdatetim, long logid, int fromlinenum); /** * 执行触发器 * * @param triggerparam * @return */ public returnt<string> run(triggerparam triggerparam); }
总结
学到了什么
- 算法(lfu、lru、轮询等)
- jdk动态代理对象(详细研究)
- 用到了netty(详细研究)
- futuretask
- groovyclassloader