XxlJob源码解析
git地址:https://github.com/xuxueli/xxl-job
版本:v2.1.1
执行器初始化流程
在容器初始化好xxlJobExecutor后会执行其initMethod,也就是start方法
/**
 * Spring configuration that builds the xxl-job executor bean from the
 * {@code xxl.job.*} properties.
 */
@Configuration
public class XxlJobConfig {

    /** One logger for the class; it carries no per-instance state. */
    private static final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    /** Value of {@code xxl.job.admin.addresses}, handed to the executor unchanged. */
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    /** Value of {@code xxl.job.executor.appname}. */
    @Value("${xxl.job.executor.appname}")
    private String appName;

    /** Value of {@code xxl.job.executor.ip}. */
    @Value("${xxl.job.executor.ip}")
    private String ip;

    /** Value of {@code xxl.job.executor.port}. */
    @Value("${xxl.job.executor.port}")
    private int port;

    /** Value of {@code xxl.job.accessToken}. */
    @Value("${xxl.job.accessToken}")
    private String accessToken;

    /** Value of {@code xxl.job.executor.logpath}. */
    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    /** Value of {@code xxl.job.executor.logretentiondays}. */
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    /**
     * Creates the executor and copies every injected property onto it.
     *
     * <p>The bean definition declares {@code start} as its init method and
     * {@code destroy} as its destroy method, so the container drives the
     * executor's lifecycle.
     *
     * @return the configured executor
     */
    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

    /*
     * For multi-NIC hosts or in-container deployments, the "InetUtils" component
     * provided by "spring-cloud-commons" can be used to customise the registered IP.
     *
     * 1. Add the dependency:
     *      <dependency>
     *          <groupId>org.springframework.cloud</groupId>
     *          <artifactId>spring-cloud-commons</artifactId>
     *          <version>${version}</version>
     *      </dependency>
     *
     * 2. In the configuration file, or as a container start-up variable:
     *      spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     * 3. Obtain the IP:
     *      String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */
}
这是start方法的一些启动步骤
启动分为3步,
1.初始化作业执行器,并且全部缓存起来
初始化作业执行器,就是从spring上下文中获取所有使用了@JobHandler注解的bean,然后以这个注解的value作为key、bean实例作为value,缓存到名为jobHandlerRepository的map中
2.刷新GLUE工厂
3.调用父类启动方法
XxlJobSpringExecutor.java
/**
 * Starts the Spring-based executor: registers the job handlers found in the
 * application context, refreshes the GLUE factory, then runs the parent start-up.
 *
 * @throws Exception if any start-up step fails
 */
@Override
public void start() throws Exception {
// 1. Initialise the job handlers and cache all of them
initJobHandlerRepository(applicationContext);
// 2. Refresh the GLUE factory
GlueFactory.refreshInstance(1);
// 3. Call the parent class start method
super.start();
}
/**
 * Scans the Spring context for beans annotated with {@code @JobHandler} and registers
 * each one that implements {@code IJobHandler} under the annotation's value.
 *
 * @param applicationContext context to scan; the method is a no-op when it is {@code null}
 * @throws RuntimeException if a handler is already registered under the same name
 */
private void initJobHandlerRepository(ApplicationContext applicationContext){
    if (applicationContext == null) {
        return;
    }

    // every bean carrying the @JobHandler annotation
    Map<String, Object> annotatedBeans = applicationContext.getBeansWithAnnotation(JobHandler.class);
    if (annotatedBeans == null || annotatedBeans.size() <= 0) {
        return;
    }

    for (Object candidate : annotatedBeans.values()) {
        // annotated beans that are not job handlers are ignored
        if (!(candidate instanceof IJobHandler)) {
            continue;
        }

        String handlerName = candidate.getClass().getAnnotation(JobHandler.class).value();
        IJobHandler jobHandler = (IJobHandler) candidate;

        // a name may be registered only once
        if (loadJobHandler(handlerName) != null) {
            throw new RuntimeException("xxl-job jobhandler["+ handlerName +"] naming conflicts.");
        }

        // cache the handler bean
        registJobHandler(handlerName, jobHandler);
    }
}
把加了@JobHandler注解的bean全部缓存到jobHandlerRepository中
XxlJobExecutor.java
// ---------------------- job handler repository ----------------------
// Job handlers keyed by name; shared by every executor instance because it is static.
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
/**
 * Stores a handler under the given name and logs the registration.
 *
 * @param name key to store the handler under
 * @param jobHandler handler to store
 * @return the result of the map's {@code put}, i.e. the handler previously stored
 *         under {@code name}, or {@code null} if there was none
 */
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
/**
 * Looks up a handler by name.
 *
 * @param name key the handler was registered under
 * @return the handler, or {@code null} when nothing is registered under {@code name}
 */
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
下面就是父类的start方法的一个流程
1.会先初始化本地日志路径,
2.初始化调度中心的地址列表,创建好adminBiz实例,调度中心客户端
3.启动日志文件清理的线程
4.启动回调子任务的线程
5.初始化执行服务器
6.初始化Rpc提供程序
下面详细看看这个第4步
XxlJobExecutor.java
/**
 * ---------------------- start + stop ----------------------
 *
 * Boots the executor: log path, admin clients, the two background threads,
 * and finally the RPC provider.
 *
 * @throws Exception if any start-up step fails
 */
public void start() throws Exception {

    // 1. prepare the local log path
    XxlJobFileAppender.initLogPath(logPath);

    // 2. build the admin (scheduling centre) client list from the configured addresses
    initAdminBizList(adminAddresses, accessToken);

    // 3. start the thread that cleans up job log files
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // 4. start the trigger-callback thread
    TriggerCallbackThread.getInstance().start();

    // 5. resolve the address the executor server listens on:
    //    keep a configured positive port / non-blank ip, otherwise detect one
    if (port <= 0) {
        port = NetUtil.findAvailablePort(9999);
    }
    if (ip == null || ip.trim().length() == 0) {
        ip = IpUtil.getIp();
    }

    // 6. start the RPC provider
    initRpcProvider(ip, port, appName, accessToken);
}
这边主要启动了2个线程,并且都设置为了守护线程,就是说这2个线程不会影响jvm的退出
1个线程是实时去消费阻塞队列里面的回调参数,另一个线程主要是负责运行重试回调失败的调度任务,30秒执行一次
TriggerCallbackThread.java
/**
 * Starts the two daemon threads that deliver job callbacks to the admin side.
 *
 * <p>The first thread consumes {@code callBackQueue} and sends what it finds through
 * {@code doCallback}; the second thread periodically calls {@code retryFailCallbackFile()}.
 * Nothing is started when {@code XxlJobExecutor.getAdminBizList()} is {@code null}.
 */
public void start() {
// valid
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
// callback
triggerCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
// normal callback loop
while(!toStop){
try {
// take() waits until a callback parameter is available in the queue
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// batch of callback parameters: everything else currently queued, plus the one just taken
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// send the batch; doCallback writes it to the fail-callback file when delivery fails
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// last callback: flush whatever is still queued once the loop has been stopped
try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
}
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();
// retry
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
while(!toStop){
try {
// retry the callbacks recorded in the fail-callback file
retryFailCallbackFile();
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
// pause RegistryConfig.BEAT_TIMEOUT seconds between rounds (30s according to the original note)
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
}
});
triggerRetryCallbackThread.setDaemon(true);
// start the retry-callback thread
triggerRetryCallbackThread.start();
}
/**
 * Sets the stop flag, then interrupts each callback thread and waits for it to finish.
 *
 * <p>An {@code InterruptedException} raised while waiting is logged and the method
 * carries on with the next thread.
 */
public void toStop(){
toStop = true;
// stop callback, interrupt and wait
if (triggerCallbackThread != null) { // support empty admin address
triggerCallbackThread.interrupt();
try {
triggerCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// stop retry, interrupt and wait
if (triggerRetryCallbackThread != null) {
triggerRetryCallbackThread.interrupt();
try {
triggerRetryCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
/**
 * Sends a batch of callbacks to the admin side, trying each admin client in turn
 * until one accepts it. A batch that no client accepted is appended to the
 * fail-callback file.
 *
 * @param callbackParamList batch of callback parameters to deliver
 */
private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean delivered = false;

    // try the admin clients one after another; stop at the first success
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            ReturnT<String> result = adminBiz.callback(callbackParamList);
            boolean accepted = result != null && result.getCode() == ReturnT.SUCCESS_CODE;
            if (!accepted) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + result);
                continue;
            }
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
            delivered = true;
            break;
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }

    if (delivered) {
        return;
    }
    // nobody accepted the batch: record it in the fail-callback file
    appendFailCallbackFile(callbackParamList);
}
TriggerCallbackThread.java
/**
 * Delivers a batch of callbacks to the admin side.
 *
 * <p>Each admin client from {@code XxlJobExecutor.getAdminBizList()} is tried in order
 * and the loop stops at the first one that answers with {@code ReturnT.SUCCESS_CODE}.
 * When none succeeds, the batch is handed to {@code appendFailCallbackFile}.
 *
 * @param callbackParamList batch of callback parameters to deliver
 */
private void doCallback(List<HandleCallbackParam> callbackParamList){
boolean callbackRet = false;
// callback, will retry if error
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true;
break;
} else {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
}
}
// delivery failed on every admin client: write the batch to the fail-callback file
if (!callbackRet) {
// append to the fail-callback file
appendFailCallbackFile(callbackParamList);
}
}
AdminBizImpl.java
/**
 * Processes a batch of executor callbacks one at a time.
 *
 * <p>Every entry goes through the single-item {@code callback} overload and its
 * outcome is written to the debug log. Individual outcomes do not affect the
 * value returned here.
 *
 * @param callbackParamList callbacks to process
 * @return always {@code ReturnT.SUCCESS}
 */
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    for (HandleCallbackParam param : callbackParamList) {
        ReturnT<String> result = callback(param);
        boolean succeeded = result.getCode() == IJobHandler.SUCCESS.getCode();
        String outcome = succeeded ? "success" : "fail";
        logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
                outcome, param, result);
    }
    return ReturnT.SUCCESS;
}
循环调度子任务,并保存日志
AdminBizImpl.java
/**
 * Handles a single executor callback: validates the log entry, triggers the child jobs
 * when the execution succeeded, and stores the handle result on the log entry.
 *
 * @param handleCallbackParam callback reported by the executor
 * @return {@code ReturnT.SUCCESS} once the log entry is updated; a {@code FAIL_CODE} result
 *         when the log entry does not exist or already carries a handle code
 */
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
    // valid log item
    XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
    if (log == null) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
    }
    if (log.getHandleCode() > 0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc
    }

    // trigger child jobs, only when the execution itself succeeded
    String callbackMsg = null;
    if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) {
        // load the job definition that owns this log entry
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
        if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
            // method-local buffer: no synchronisation needed, and no repeated String copies in the loop
            StringBuilder childMsg = new StringBuilder("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>");

            // child job ids are stored as a comma-separated list
            String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
            for (int i = 0; i < childJobIds.length; i++) {
                // validate the trimmed id, so that "1, 2" is accepted instead of failing on " 2"
                String childJobIdStr = (childJobIds[i] != null) ? childJobIds[i].trim() : "";
                int childJobId = (childJobIdStr.length() > 0 && isNumeric(childJobIdStr)) ? Integer.parseInt(childJobIdStr) : -1;
                if (childJobId > 0) {
                    // trigger the child job
                    JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null);
                    ReturnT<String> triggerChildResult = ReturnT.SUCCESS;

                    // add msg
                    childMsg.append(MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
                            (i+1),
                            childJobIds.length,
                            childJobIds[i],
                            (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
                            triggerChildResult.getMsg()));
                } else {
                    childMsg.append(MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
                            (i+1),
                            childJobIds.length,
                            childJobIds[i]));
                }
            }
            callbackMsg = childMsg.toString();
        }
    }

    // assemble the handle message: previous message, executor message, child-trigger message
    StringBuilder handleMsg = new StringBuilder();
    if (log.getHandleMsg()!=null) {
        handleMsg.append(log.getHandleMsg()).append("<br>");
    }
    if (handleCallbackParam.getExecuteResult().getMsg() != null) {
        handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
    }
    if (callbackMsg != null) {
        handleMsg.append(callbackMsg);
    }

    // save the handle result on the log entry
    log.setHandleTime(new Date());
    log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
    log.setHandleMsg(handleMsg.toString());
    xxlJobLogDao.updateHandleInfo(log);
    return ReturnT.SUCCESS;
}
然后看看第6步
6.初始化Rpc提供程序
XxlJobExecutor.java
/**
 * Creates, configures and starts the RPC provider factory that exposes
 * {@code ExecutorBiz} to remote callers.
 *
 * @param ip          address the provider binds to
 * @param port        port the provider binds to
 * @param appName     executor app name, passed on as the "appName" registry parameter
 * @param accessToken token handed to the provider configuration
 * @throws Exception if the provider cannot be configured or started
 */
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {

    // parameters handed to the service registry: who we are and where we listen
    Map<String, String> registryParams = new HashMap<>();
    String registryAddress = IpUtil.getIpPort(ip, port);
    registryParams.put("appName", appName);
    registryParams.put("address", registryAddress);

    // init, provider factory
    xxlRpcProviderFactory = new XxlRpcProviderFactory();
    xxlRpcProviderFactory.initConfig(
            NetEnum.NETTY_HTTP,
            Serializer.SerializeEnum.HESSIAN.getSerializer(),
            ip,
            port,
            accessToken,
            ExecutorServiceRegistry.class,
            registryParams);

    // add the executor service
    xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

    // start the provider factory
    xxlRpcProviderFactory.start();
}
这里在RPC服务启动完成后的回调中,会实例化XxlJobExecutor.ExecutorServiceRegistry并调用其start方法,从而启动执行器注册
XxlRpcProviderFactory.java
/**
 * Creates the server for the configured net type, installs its started/stopped
 * callbacks, and starts it.
 *
 * <p>The started callback instantiates and starts the service registry (when a registry
 * class is configured) and registers the known service keys under this provider's
 * address; the stopped callback removes those keys again and stops the registry.
 *
 * @throws Exception if the server cannot be instantiated or started
 */
public void start() throws Exception {
this.serviceAddress = IpUtil.getIpPort(this.ip, this.port);
this.server = (Server)this.netType.serverClass.newInstance();
// started callback: bring up the service registry and publish the services
this.server.setStartedCallback(new BaseCallback() {
public void run() throws Exception {
if (XxlRpcProviderFactory.this.serviceRegistryClass != null) {
XxlRpcProviderFactory.this.serviceRegistry = (ServiceRegistry)XxlRpcProviderFactory.this.serviceRegistryClass.newInstance();
XxlRpcProviderFactory.this.serviceRegistry.start(XxlRpcProviderFactory.this.serviceRegistryParam);
if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
XxlRpcProviderFactory.this.serviceRegistry.registry(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
}
}
}
});
// stopped callback: withdraw the services and shut the registry down
this.server.setStopedCallback(new BaseCallback() {
public void run() {
if (XxlRpcProviderFactory.this.serviceRegistry != null) {
if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
XxlRpcProviderFactory.this.serviceRegistry.remove(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
}
XxlRpcProviderFactory.this.serviceRegistry.stop();
XxlRpcProviderFactory.this.serviceRegistry = null;
}
}
});
this.server.start(this);
}
调用任务的注册和删除
XxlJobExecutor.java
// Called during executor start-up.
/**
 * Service registry whose only real work is starting and stopping the
 * {@code ExecutorRegistryThread}; the registry/remove/discovery operations are stubs.
 */
public static class ExecutorServiceRegistry extends ServiceRegistry {
/**
 * Starts the registry thread with the "appName" and "address" entries of {@code param}.
 */
@Override
public void start(Map<String, String> param) {
// start registering
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
}
/**
 * Stops the registry thread.
 */
@Override
public void stop() {
// stop registering
ExecutorRegistryThread.getInstance().toStop();
}
// Stub: does nothing and always returns false.
@Override
public boolean registry(Set<String> keys, String value) {
return false;
}
// Stub: does nothing and always returns false.
@Override
public boolean remove(Set<String> keys, String value) {
return false;
}
// Stub: always returns null.
@Override
public Map<String, TreeSet<String>> discovery(Set<String> keys) {
return null;
}
// Stub: always returns null.
@Override
public TreeSet<String> discovery(String key) {
return null;
}
}
执行器注册表线程,具体注册方法,
启动一个ExecutorRegistryThread守护线程,每30秒去注册或更新一次
ExecutorRegistryThread.java
/**
 * Starts the daemon thread that keeps this executor registered with the admin side.
 *
 * <p>While the stop flag is not set, the thread sends a registry request to the admin
 * clients (stopping at the first that answers with {@code ReturnT.SUCCESS_CODE}) and then
 * sleeps {@code RegistryConfig.BEAT_TIMEOUT} seconds. Once stopped, it sends a
 * registry-remove request in the same first-success fashion.
 *
 * @param appName executor app name; nothing is started when it is {@code null} or blank
 * @param address executor address sent in the registry request
 */
public void start(final String appName, final String address){
// valid
if (appName==null || appName.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
return;
}
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
if (!toStop) {
// wait BEAT_TIMEOUT seconds before the next registration (30s according to the original note)
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// registry remove
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
调度器启动流程
/**
 * Admin-side scheduler bootstrap: once its properties are set it starts the registry
 * monitor, the fail monitor, the RPC provider and the schedule helper, in that order.
 *
 * @author xuxueli 2018-10-28 00:18:17
 */
@Component
// xxlJobAdminConfig must be initialised before this bean
@DependsOn("xxlJobAdminConfig")
public class XxlJobScheduler implements InitializingBean, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
/**
 * Starts every admin-side background component.
 *
 * @throws Exception if any component fails to start
 */
@Override
public void afterPropertiesSet() throws Exception {
// init i18n
initI18n();
/*
 * 1. Start the job registry monitor (runs every 30s according to the original note).
 * It removes machines whose registration has not been refreshed within 90 seconds,
 * then queries the machines that were refreshed within 90 seconds
 * and writes their current addresses into the XxlJobGroup table,
 * multiple addresses separated by commas.
 */
JobRegistryMonitorHelper.getInstance().start();
/*
 * 2. Start the job fail monitor (runs every 10s according to the original note).
 * It re-triggers failed jobs and updates the trigger log.
 */
JobFailMonitorHelper.getInstance().start();
// 3. Initialise the RPC provider
initRpcProvider();
// 4. Start scheduling jobs
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
}
调度任务注册监控助手运行
JobRegistryMonitorHelper.java
/**
 * Starts the daemon thread that keeps the address list of auto-registered executor
 * groups up to date.
 *
 * <p>Each round removes dead registry rows, groups the live executor registrations by
 * app name, and writes the sorted, comma-separated address list back to every group.
 * The thread sleeps {@code RegistryConfig.BEAT_TIMEOUT} seconds between rounds.
 */
public void start(){
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// auto registry group
// address type: 0 = auto registry, 1 = manual entry; load every auto-registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
// rows not refreshed within DEAD_TIMEOUT seconds (90s according to the original note)
// are treated as machines that have gone away and are removed
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT);
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
// registrations that were refreshed within DEAD_TIMEOUT seconds
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
if (list != null) {
// group the executor registrations by app name, skipping duplicate addresses
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appName = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appName);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appName, registryList);
}
}
}
// fresh group address
// walk through the executor groups
for (XxlJobGroup group: groupList) {
// look up the addresses registered under this group's app name
List<String> registryList = appAddressMap.get(group.getAppName());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
addressListStr = "";
for (String item:registryList) {
addressListStr += item + ",";
}
// drop the trailing comma
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
// store the comma-separated address list (null when no address is online) and persist the group
group.setAddressList(addressListStr);
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
// wait BEAT_TIMEOUT seconds before the next round (30s according to the original note)
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
// run as a daemon thread
registryThread.setDaemon(true);
registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
// start the thread
registryThread.start();
}
2.调度任务失败监控助手运行
每10s运行一次, 主要负责处理失败的调度任务重新执行,并且更新调度日志
JobFailMonitorHelper.java
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// 监控
while (!toStop) {
try {
// 从队列中拿出所有失败的 jobLogIds
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log
//alarm_status从0修改为-1(从默认修改为锁定状态) 告警状态:0-默认、-1=锁定状态 1-无需告警、2-告警成功、3-告警失败
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
//没有更新到就跳过此次循环
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、失败重试监视器的次数如果大于0就继续执行
if (log.getExecutorFailRetryCount() > 0) {
//重新调度任务
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam());
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
//更新XxlJobLog
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、故障警报监视器
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
//判断报警邮箱是否为空
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
boolean alarmResult = true;
try {
//发送邮件失败警报
alarmResult = failAlarm(info, log);
} catch (Exception e) {
alarmResult = false;
logger.error(e.getMessage(), e);
}
newAlarmStatus = alarmResult?2:3;
} else {
//无需告警
newAlarmStatus = 1;
}
//修改告警状态字段
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
//每10s运行一次
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}
4.执行调度任务的具体方法
有2个线程,
1.一个是调度线程(scheduleThread):预读到任务时约每1秒扫描一次,没有预读到任务时约每5秒扫描一次
2.另一个是时间轮线程(ringThread):每1秒执行一次,负责触发已放入时间轮的任务
1. 超时未调度(超过调度时间5秒)的任务不再执行,修改下次执行时间。
2. 超过调度时间但未超时(超过调度时间不足5秒)的任务,立即放入执行线程池,再修改下次执行时间;接着判断新的下次执行时间若在5秒之内,则加入时间轮(ringData)的map后再次修改下次执行时间。
3. 调度时间在未来5秒之内的(预读5s),根据5秒内即将执行的任务的执行时间的秒数,将其放到ringData的map里面去,key为下次执行的时间戳除以1000然后取模60,value为任务id,并根据表达式修改下次执行时间。
时间轮执行线程定时每1秒执行一次
1. 删除并取出当前秒数的list和前一秒的list立即放入执行线程池。(往前取1秒防止前1秒的任务未执行,比如当前秒数是59,如果57的任务执行时间大于1秒,可能58的任务就没有被执行过,所以59秒的时候取58和59的任务防止这种情况)。
JobScheduleHelper.java
/**
 * Starts the two daemon threads that drive cron scheduling.
 *
 * <p>{@code scheduleThread} repeatedly reads, under the {@code schedule_lock} row lock,
 * the jobs whose next trigger time falls before now + {@code PRE_READ_MS}. Depending on
 * how late or early a job is, it is skipped, triggered directly, or pushed onto the
 * time ring; the refreshed trigger times are then written back.
 *
 * <p>{@code ringThread} wakes up on each second boundary, takes the job ids stored for
 * the current and the previous second out of the ring, and triggers them.
 */
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// keep scanning until the schedule thread is asked to stop
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// "for update" on the schedule_lock row; the transaction is committed in the finally block
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
// jobs whose next trigger time is before now + PRE_READ_MS (5s according to the original note)
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
// the trigger time was missed by more than PRE_READ_MS
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
// fresh next: compute the next trigger time from the cron expression
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date());
if (nextValidTime != null) {
// the old next-trigger time becomes the last-trigger time
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
// set the new next-trigger time
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}
// the trigger time has passed, but by no more than PRE_READ_MS
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
logger.debug(">>>>>>>>>>> xxl-job, shecule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextTime);
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}
}
}
// 3、persist the refreshed trigger info of every job that was read
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException ignore) {
if (!scheduleThreadToStop) {
logger.error(ignore.getMessage(), ignore);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // processing may run long and skip a slot, so the previous slot is checked as well
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData!=null && ringItemData.size()>0) {
// do trigger
for (int jobId: ringItemData) {
// trigger the job
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// next second, align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
具体的调度方法
JobTriggerPoolHelper.java
/**
 * Static entry point for triggering a job; delegates to {@code helper.addTrigger}.
 *
 * @param jobId id of the job to trigger
 * @param triggerType what caused the trigger
 * @param failRetryCount
 * 			>=0: use this param
 * 			<0: use param from job info config
 * @param executorShardingParam sharding parameter, passed through unchanged
 * @param executorParam
 *          null: use job param
 *          not null: cover job param
 */
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}
/**
 * Submits a trigger request to one of the two trigger pools.
 *
 * <p>A job that has more than 10 recorded timeouts goes to the slow pool, every other
 * job goes to the fast pool. After each run the task records a timeout for the job when
 * the trigger took longer than 500ms; the timeout counters are cleared whenever the
 * wall-clock minute changes.
 *
 * @param jobId                 job id
 * @param triggerType           trigger type
 * @param failRetryCount        fail retry count
 * @param executorShardingParam sharding parameter
 * @param executorParam         job parameter
 */
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {

    // choose the thread pool
    AtomicInteger recordedTimeouts = jobTimeoutCountMap.get(jobId);
    boolean slowJob = recordedTimeouts != null && recordedTimeouts.get() > 10;      // job-timeout 10 times in 1 min
    ThreadPoolExecutor pool = slowJob ? slowTriggerPool : fastTriggerPool;

    // trigger
    pool.execute(new Runnable() {
        @Override
        public void run() {
            long startedAt = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // check timeout-count-map: start counting afresh when the minute changes
                long currentMinute = System.currentTimeMillis()/60000;
                if (minTim != currentMinute) {
                    minTim = currentMinute;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long elapsed = System.currentTimeMillis()-startedAt;
                if (elapsed > 500) {       // job-timeout threshold 500ms
                    AtomicInteger existing = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (existing != null) {
                        existing.incrementAndGet();
                    }
                }
            }
        }
    });
}
判断是不是分片,如果是分片就循环执行器地址列表(系统注册)一个一个服务器去调用
XxlJobTrigger.java
/**
 * trigger job
 *
 * <p>Loads the job and its executor group, resolves the retry count and sharding
 * parameter, then hands over to {@code processTrigger}: once per registered address for
 * the sharding-broadcast route strategy, otherwise exactly once.
 *
 * @param jobId                 job id
 * @param triggerType           trigger type
 * @param failRetryCount        fail retry count
 * 			>=0: use this param
 * 			<0: use param from job info config
 * @param executorShardingParam sharding parameter in the form "index/total"
 * @param executorParam         job parameter
 *          null: use job param
 *          not null: cover job param
 */
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
    // load the job definition
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }

    // an explicit job parameter overrides the configured one
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }

    // fail retry count: explicit value when non-negative, otherwise the job's own setting
    int finalFailRetryCount;
    if (failRetryCount >= 0) {
        finalFailRetryCount = failRetryCount;
    } else {
        finalFailRetryCount = jobInfo.getExecutorFailRetryCount();
    }

    // load the executor group
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // sharding param: used only when it has the form "<number>/<number>"
    int[] shardingParam = null;
    if (executorShardingParam != null) {
        String[] parts = executorShardingParam.split("/");
        if (parts.length==2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(parts[0]);
            shardingParam[1] = Integer.valueOf(parts[1]);
        }
    }

    // sharding broadcast applies only without an explicit sharding param and with at least one registered address
    boolean broadcast = ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null;

    if (!broadcast) {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        // single trigger
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        return;
    }

    // one trigger per registered executor address
    for (int shardIndex = 0; shardIndex < group.getRegistryList().size(); shardIndex++) {
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardIndex, group.getRegistryList().size());
    }
}
XxlJobTrigger.java
/**
 * Performs one trigger of a job against a single executor address and records the outcome
 * in the job log.
 *
 * Steps: save an initial log row, build the {@code TriggerParam}, choose the target address,
 * invoke the remote executor, assemble the HTML trigger message, and update the log row.
 *
 * @param group               job group; its registry list may be null or empty
 * @param jobInfo             job definition being triggered
 * @param finalFailRetryCount fail-retry count already resolved by the caller
 * @param triggerType         trigger type, used for the trigger message
 * @param index               sharding index
 * @param total               sharding total
 */
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    // Block strategy, defaulting to serial execution when the configured value does not match.
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
    // Route strategy, defaulting to null when the configured value does not match.
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
    // "index/total" is only recorded for the sharding-broadcast strategy.
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1. Save the initial job log row.
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    // NOTE(review): the placeholder is labelled "jobId" but the value passed is the log id.
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2. Build the trigger parameter sent to the executor.
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    // Handler name and job parameter.
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    // Block strategy and execution timeout as configured on the job.
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    // Log id and trigger time of the log row saved above.
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
    // GLUE type, source and update time.
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    // Shard index and shard total.
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3. Choose the executor address.
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            // Sharding broadcast: the shard index selects the address; an out-of-range index uses the first one.
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // Any other strategy: let the strategy's router pick the address.
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        // No registered address: record a failed routing result.
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4. Trigger the remote executor, or fail when no address was chosen.
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5. Assemble the trigger message (HTML). A method-local buffer needs no synchronization,
    //    so StringBuilder is used.
    StringBuilder triggerMsgSb = new StringBuilder();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("(").append(shardingParam).append(")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6. Store the trigger information on the log row.
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    // Result code of the trigger call and the assembled message.
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
XxlJobTrigger.java
/**
 * Invokes the executor at the given address and wraps the outcome in a descriptive message.
 *
 * Any exception raised while obtaining the executor client or calling it is logged and
 * converted into a failed result carrying the stack trace text.
 *
 * @param triggerParam parameters sent to the executor
 * @param address      executor address to call
 * @return the executor's result, with its message replaced by an HTML summary of
 *         address, code and original message
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    // Method-local buffer: StringBuilder avoids StringBuffer's unnecessary synchronization.
    StringBuilder runResultSB = new StringBuilder(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}
1.先加载线程中的执行器,然后和缓存的执行器做比较,如果不一样就以缓存的执行器为主,缓存的执行器就是 执行器服务初始化的时候放入的
2.然后判断运行模式,校验执行器,并应用阻塞处理策略
3.然后把TriggerParam数据推送到triggerQueue队列
并且通过工作线程jobThread实时去监控队列,取出来执行
4.如果jobThread工作线程为空就去创建一个工作线程,并且缓存到jobThreadRepository的map里面,key为调度任务的id并启动工作线程
ExecutorBizImpl.java
/**
* Handles a trigger request on the executor side.
*
* Resolves the job handler according to the glue type, decides whether the existing job thread
* for this job id can be reused, applies the block strategy, registers a new job thread when
* none is usable, and finally pushes the trigger onto that thread's queue.
*
* @param triggerParam trigger request
* @return a FAIL_CODE result when the handler cannot be resolved, the glue type is invalid, or the
*         trigger is discarded by the block strategy; otherwise the result of pushing to the queue
*/
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
// BEAN mode: the handler is looked up by name
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// look up the handler instance under the configured handler name
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// the existing thread runs a different handler instance than the one looked up now
if (jobThread!=null && jobHandler != newJobHandler) {
// drop the old jobThread and jobHandler so a new thread gets registered below
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// no usable handler yet: take the looked-up one, or fail when none was found
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread: reusable only if it runs a GlueJobHandler with the same glue update time
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler: build a new handler from the glue source carried in the request
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// valid old jobThread: reusable only if it runs a ScriptJobHandler with the same glue update time
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// block strategy: only relevant when an existing job thread is being reused
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// serial execution: nothing to do here, the trigger is simply queued below
}
}
// replace thread (new or exists invalid)
// no usable job thread at this point means one has to be registered
if (jobThread == null) {
// register a job thread for this job id, passing the reason the old one is replaced (may be null)
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// push the trigger onto the job thread's queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
工作线程的run方法
执行调度任务的工作线程
实时去取triggerQueue队列里面的调度任务,然后执行
连续空闲轮询超过30次(每次最多等待3秒)并且队列没数据时,就调用XxlJobExecutor.removeJobThread移除该任务对应的jobThread
JobThread.java
/**
* Main loop of the job thread.
*
* Initializes the handler, then polls the trigger queue until the stop flag is set. Each
* polled trigger is executed (optionally time-limited) and its result is pushed to the
* callback thread. After leaving the loop, triggers still in the queue are reported as
* failed and the handler is destroyed.
*/
@Override
public void run() {
// init
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while(!toStop){
running = false;
// count this iteration as idle; reset to 0 below when a trigger is actually polled
idleTimes++;
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try {
// to check the toStop signal we need to cycle, so we cannot use queue.take(); use poll(timeout) instead
// wait up to 3 seconds for the next trigger
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// log file name, e.g. "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
// expose the shard index and shard total of this trigger
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
// a positive executor timeout (in seconds) means the execution is time-limited
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout: run the handler on a separate thread and wait for it with a deadline
Thread futureThread = null;
try {
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call() throws Exception {
return handler.execute(triggerParamTmp.getExecutorParams());
}
});
futureThread = new Thread(futureTask);
futureThread.start();
// wait for the execution result, at most the configured timeout
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
XxlJobLogger.log(e);
executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
} finally {
// interrupt the helper thread whether the task finished, failed or timed out
futureThread.interrupt();
}
} else {
// no timeout configured: execute directly on this thread
executeResult = handler.execute(triggerParam.getExecutorParams());
}
// a null result counts as failure; otherwise cap the message at 50000 chars and drop the content
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
} else {
executeResult.setMsg(
(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
?executeResult.getMsg().substring(0, 50000).concat("...")
:executeResult.getMsg());
executeResult.setContent(null); // limit obj size
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} else {
// after more than 30 consecutive empty polls (each waits up to 3 seconds) and with an empty queue,
// call XxlJobExecutor.removeJobThread for this job id
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
}
} catch (Throwable e) {
if (toStop) {
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
// turn the stack trace into the failure message of this execution
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {
// only report back when a trigger was actually polled in this iteration
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// common case: report the execution result
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
}
}
}
}
// callback trigger request in queue: report every trigger still queued as failed
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
}
}
// destroy the handler
try {
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// NOTE(review): the placeholder is labelled "hashCode" but the value passed is the current Thread object
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}