
XxlJob Source Code Analysis

程序员文章站 (Programmer Article Station), 2022-06-20 20:30:00

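Git repository: https://github.com/xuxueli/xxl-job

Version: v2.1.1

Executor initialization flow

Once the Spring container has created the xxlJobExecutor bean, it invokes the bean's initMethod, which is the start method.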

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appName;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

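    /**
     * For hosts with multiple network interfaces, in-container deployments and similar cases,
     * the "InetUtils" component provided by "spring-cloud-commons" can be used to customize the registered IP:
     *
     *      1. Add the dependency:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2. Set it in the configuration file, or as a container startup variable:
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3. Obtain the IP:
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */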


}

 

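The start method performs its startup in three steps:

1. Initialize the job handlers and cache all of them. This looks up every bean in the Spring context that is annotated with @JobHandler and stores it in a map named jobHandlerRepository, using the annotation's value as the key and the bean instance as the value.

2. Refresh the GLUE factory.

3. Call the parent class's start method.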

XxlJobSpringExecutor.java
@Override
public void start() throws Exception {

    // 1. Initialize the job handlers and cache all of them
    initJobHandlerRepository(applicationContext);

    // 2. Refresh the GLUE factory
    GlueFactory.refreshInstance(1);

    // 3. Call the parent class's start method
    super.start();
}

private void initJobHandlerRepository(ApplicationContext applicationContext){
    if (applicationContext == null) {
        return;
    }

    // init job handler action
    // Get every bean annotated with @JobHandler
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

    if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
        for (Object serviceBean : serviceBeanMap.values()) {
            if (serviceBean instanceof IJobHandler){
                String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                IJobHandler handler = (IJobHandler) serviceBean;
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler["+ name +"] naming conflicts.");
                }
                // Cache the handler bean under its name
                registJobHandler(name, handler);
            }
        }
    }
}

Every bean annotated with @JobHandler is cached in jobHandlerRepository.

 

XxlJobExecutor.java
// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}
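
The registration logic above boils down to a name-keyed concurrent map plus a duplicate-name check. Below is a minimal sketch of that idea. `Handler` and `HandlerRegistry` are hypothetical stand-ins for `IJobHandler` and the repository in `XxlJobExecutor`; they are not xxl-job classes. The sketch also uses `putIfAbsent`, so the check and the insert happen in one atomic step, which differs from the separate load-then-register calls shown above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for IJobHandler; not part of xxl-job.
interface Handler {
    String execute(String param);
}

// Hypothetical registry that mirrors the "reject duplicate names" rule.
final class HandlerRegistry {
    private final ConcurrentMap<String, Handler> repository = new ConcurrentHashMap<>();

    // Registers a handler; a second handler under the same name is rejected.
    void register(String name, Handler handler) {
        Handler previous = repository.putIfAbsent(name, handler);
        if (previous != null) {
            throw new IllegalStateException("jobhandler[" + name + "] naming conflicts.");
        }
    }

    // Returns the handler registered under the name, or null if there is none.
    Handler load(String name) {
        return repository.get(name);
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.register("demoJobHandler", param -> "handled " + param);
        System.out.println(registry.load("demoJobHandler").execute("p1"));
    }
}
```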

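The parent class's start method then runs the following flow:

1. Initialize the local log path.

2. Initialize the list of admin (scheduling center) addresses and create the adminBiz instances, which are the clients of the scheduling center.

3. Start the log file cleanup thread.

4. Start the trigger callback thread, through which child jobs get triggered.

5. Resolve the IP and port of the executor server.

6. Initialize the RPC provider.

Let us look at step 4 in detail.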

XxlJobExecutor.java
/**
 * ---------------------- start + stop ----------------------
 * @throws Exception
 */
public void start() throws Exception {

    // 1. Initialize the local log path
    XxlJobFileAppender.initLogPath(logPath);

    // 2. Initialize the list of admin addresses and create the adminBiz instances (admin clients)
    initAdminBizList(adminAddresses, accessToken);


    // 3. Start the job log file cleanup thread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // 4. Start the trigger callback thread, through which child jobs get triggered
    TriggerCallbackThread.getInstance().start();

    // 5. Resolve the port and IP of the executor server
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    // 6. Initialize the RPC provider
    initRpcProvider(ip, port, appName, accessToken);
}

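Step 4 starts two threads, both marked as daemon threads, so neither of them prevents the JVM from exiting.

One thread consumes callback parameters from a blocking queue as soon as they arrive; the other retries callbacks that previously failed, once every 30 seconds.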
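
The consuming half of this design is a "take one, then drain the rest" pattern on a blocking queue. The sketch below shows it in isolation. `CallbackBatcher` and its methods are hypothetical names, and plain strings stand in for the callback parameters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper that illustrates take-then-drain batching.
final class CallbackBatcher {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: enqueue one callback parameter.
    void push(String callback) {
        queue.add(callback);
    }

    // Consumer side: blocks until at least one item is available, then
    // returns it together with everything else that is already queued.
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        try {
            batch.add(queue.take());      // blocks while the queue is empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return batch;                 // interrupted: return an empty batch
        }
        queue.drainTo(batch);             // non-blocking: grabs the rest
        return batch;
    }

    public static void main(String[] args) {
        CallbackBatcher batcher = new CallbackBatcher();
        batcher.push("log-1");
        batcher.push("log-2");
        System.out.println(batcher.nextBatch());
    }
}
```

Batching this way means that a burst of finished jobs results in one callback request rather than one request per job.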

 

TriggerCallbackThread.java
public void start() {

    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    // callback
    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // Normal callback loop
            while(!toStop){
                try {
                    // A blocking queue is used here: take() blocks until a callback parameter has been pushed, then consumes it
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {

                        // Batch of callback parameters
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);

                        // Do the callback; on failure the parameters are written to a fail-callback file and retried later
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            // Final callback: flush whatever is left in the queue before the thread exits
            try {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();


    // Retry thread
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while(!toStop){
                try {
                    // Retry the callbacks stored in the fail-callback files
                    retryFailCallbackFile();
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }
                try {
                    // Run once every 30s
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
        }
    });
    triggerRetryCallbackThread.setDaemon(true);
    // Start the retry callback thread
    triggerRetryCallbackThread.start();

}
public void toStop(){
    toStop = true;
    // stop callback, interrupt and wait
    if (triggerCallbackThread != null) {    // support empty admin address
        triggerCallbackThread.interrupt();
        try {
            triggerCallbackThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

    // stop retry, interrupt and wait
    if (triggerRetryCallbackThread != null) {
        triggerRetryCallbackThread.interrupt();
        try {
            triggerRetryCallbackThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

}

/**
 * Perform the callback; it will be retried if an error occurs
 * @param callbackParamList
 */
private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean callbackRet = false;
    // callback, will retry if error
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
            if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                callbackRet = true;
                break;
            } else {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
            }
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }
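    // If the callback failed on every admin address, write it to the fail-callback file; the retry thread will pick it up
    if (!callbackRet) {
        // Append to the fail-callback file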
        appendFailCallbackFile(callbackParamList);
    }
}
AdminBizImpl.java
/**
 * Callback
 * @param callbackParamList the batch of callback parameters
 * @return
 */
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    for (HandleCallbackParam handleCallbackParam: callbackParamList) {
        ReturnT<String> callbackResult = callback(handleCallbackParam);
        logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
                (callbackResult.getCode()==IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult);
    }

    return ReturnT.SUCCESS;
}

 

For each callback, the admin side triggers the child jobs in a loop and saves the log.

AdminBizImpl.java
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
    // valid log item
    XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
    if (log == null) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
    }
    if (log.getHandleCode() > 0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
    }

    // Trigger child jobs
    String callbackMsg = null;
    if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) {
        // Look up the XxlJobInfo by jobId
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
        if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
            callbackMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
            // Split the child job ids into an array
            String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
            for (int i = 0; i < childJobIds.length; i++) {
                // Validate the id: non-empty and numeric, otherwise -1
                int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
                if (childJobId > 0) {
                    // Trigger the child job
                    JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null);
                    ReturnT<String> triggerChildResult = ReturnT.SUCCESS;

                    // add msg
                    callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
                            (i+1),
                            childJobIds.length,
                            childJobIds[i],
                            (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
                            triggerChildResult.getMsg());
                } else {
                    callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
                            (i+1),
                            childJobIds.length,
                            childJobIds[i]);
                }
            }

        }
    }

    // Build the handle message
    StringBuffer handleMsg = new StringBuffer();
    if (log.getHandleMsg()!=null) {
        handleMsg.append(log.getHandleMsg()).append("<br>");
    }
    if (handleCallbackParam.getExecuteResult().getMsg() != null) {
        handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
    }
    if (callbackMsg != null) {
        handleMsg.append(callbackMsg);
    }

    // Success: save the log
    log.setHandleTime(new Date());
    log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
    log.setHandleMsg(handleMsg.toString());
    xxlJobLogDao.updateHandleInfo(log);

    return ReturnT.SUCCESS;
}
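
The child-job handling above hinges on turning a comma-separated string into a list of valid positive ids. Here is a small sketch of that parsing step on its own. `ChildJobIds.parse` is a hypothetical helper, and unlike the listing above it trims each entry before parsing, which is an assumption of this sketch rather than the library's behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of xxl-job.
final class ChildJobIds {

    // Parses a comma-separated id list and keeps only positive integers.
    static List<Integer> parse(String childJobId) {
        List<Integer> ids = new ArrayList<>();
        if (childJobId == null || childJobId.trim().isEmpty()) {
            return ids;
        }
        for (String part : childJobId.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;                       // skip empty entries such as ",,"
            }
            try {
                int id = Integer.parseInt(trimmed);
                if (id > 0) {
                    ids.add(id);                // only positive ids are valid
                }
            } catch (NumberFormatException e) {
                // not a number: ignored, like the "-1" branch above
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(parse("3, 5,abc,,7"));
    }
}
```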

Next, let us look at step 6.

6. Initialize the RPC provider

XxlJobExecutor.java
/**
 * Initialize the RPC provider
 * @param ip
 * @param port
 * @param appName
 * @param accessToken
 * @throws Exception
 */
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {

    // init, provider factory
    String address = IpUtil.getIpPort(ip, port);
    Map<String, String> serviceRegistryParam = new HashMap<String, String>();
    serviceRegistryParam.put("appName", appName);
    serviceRegistryParam.put("address", address);

    xxlRpcProviderFactory = new XxlRpcProviderFactory();
    xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

    // Add the service
    xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

    // Start the RPC provider factory
    xxlRpcProviderFactory.start();

}

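When the provider factory starts, its started-callback instantiates the registry class passed in from XxlJobExecutor (ExecutorServiceRegistry) and calls its start method.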

XxlRpcProviderFactory.java
public void start() throws Exception {
    this.serviceAddress = IpUtil.getIpPort(this.ip, this.port);
    this.server = (Server)this.netType.serverClass.newInstance();
    this.server.setStartedCallback(new BaseCallback() {
        public void run() throws Exception {
            if (XxlRpcProviderFactory.this.serviceRegistryClass != null) {
                XxlRpcProviderFactory.this.serviceRegistry = (ServiceRegistry)XxlRpcProviderFactory.this.serviceRegistryClass.newInstance();
                XxlRpcProviderFactory.this.serviceRegistry.start(XxlRpcProviderFactory.this.serviceRegistryParam);
                if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
                    XxlRpcProviderFactory.this.serviceRegistry.registry(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
                }
            }

        }
    });
    this.server.setStopedCallback(new BaseCallback() {
        public void run() {
            if (XxlRpcProviderFactory.this.serviceRegistry != null) {
                if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
                    XxlRpcProviderFactory.this.serviceRegistry.remove(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
                }

                XxlRpcProviderFactory.this.serviceRegistry.stop();
                XxlRpcProviderFactory.this.serviceRegistry = null;
            }

        }
    });
    this.server.start(this);
}

The registry class starts and stops the executor's registration:

XxlJobExecutor.java
// This is called while the executor starts up and initializes
public static class ExecutorServiceRegistry extends ServiceRegistry {

    @Override
    public void start(Map<String, String> param) {
        // Start registration
        ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
    }
    @Override
    public void stop() {
        // Stop registration
        ExecutorRegistryThread.getInstance().toStop();
    }

    @Override
    public boolean registry(Set<String> keys, String value) {
        return false;
    }
    @Override
    public boolean remove(Set<String> keys, String value) {
        return false;
    }
    @Override
    public Map<String, TreeSet<String>> discovery(Set<String> keys) {
        return null;
    }
    @Override
    public TreeSet<String> discovery(String key) {
        return null;
    }

}

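The executor registry thread contains the actual registration logic.

It starts a daemon thread, ExecutorRegistryThread, that registers the executor (or refreshes its registration) every 30 seconds.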

ExecutorRegistryThread.java
public void start(final String appName, final String address){

    // valid
    if (appName==null || appName.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    if (!toStop) {
                        // Register once every 30 seconds
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }

            // registry remove
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}
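
Stripped of logging and RPC details, the thread above follows a simple lifecycle: beat in a loop, sleep between beats, and run a final "remove" action once it is told to stop. The sketch below shows that lifecycle under stated assumptions: `HeartbeatLoop` is a hypothetical class, the beat and the removal are passed in as plain `Runnable`s, and the interval is a parameter rather than `RegistryConfig.BEAT_TIMEOUT`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical class that illustrates the register / sleep / deregister lifecycle.
final class HeartbeatLoop {
    private volatile boolean toStop = false;
    private Thread thread;

    // Starts a daemon thread that beats until stop() is called, then runs onStop once.
    void start(Runnable beat, Runnable onStop, long intervalMillis) {
        thread = new Thread(() -> {
            while (!toStop) {
                try {
                    beat.run();                       // e.g. register or refresh
                } catch (RuntimeException e) {
                    // a failed beat must not kill the loop
                }
                try {
                    if (!toStop) {
                        TimeUnit.MILLISECONDS.sleep(intervalMillis);
                    }
                } catch (InterruptedException e) {
                    // woken up by stop(); the loop condition ends the loop
                }
            }
            onStop.run();                             // e.g. remove the registration
        });
        thread.setDaemon(true);                       // does not block JVM exit
        thread.start();
    }

    // Sets the stop flag, interrupts the sleep and waits for the thread to finish.
    void stop() {
        toStop = true;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Setting the flag before interrupting matters: the interrupt only shortens the sleep, while the flag is what actually ends the loop.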

Scheduler startup flow

 

/**
 * @author xuxueli 2018-10-28 00:18:17
 */
@Component
// xxlJobAdminConfig must be initialized first
@DependsOn("xxlJobAdminConfig")
public class XxlJobScheduler implements InitializingBean, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);


    @Override
    public void afterPropertiesSet() throws Exception {
        // init i18n
        initI18n();

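        /**
         * 1. Start the job registry monitor helper, which runs every 30s.
         * It deletes the machines whose registration has not been updated within 90 seconds,
         * then queries the machines that were updated within the last 90s
         * and writes their latest addresses into the XxlJobGroup table,
         * with multiple addresses separated by commas.
         */
        JobRegistryMonitorHelper.getInstance().start();

        /**
         * 2. Start the job failure monitor helper, which runs every 10s.
         * It re-triggers failed jobs and updates the trigger log.
         */
        JobFailMonitorHelper.getInstance().start();

        // 3. Initialize the RPC provider
        initRpcProvider();

        // 4. Start job scheduling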
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
}

1. The job registry monitor helper

JobRegistryMonitorHelper.java
public void start(){
   registryThread = new Thread(new Runnable() {
      @Override
      public void run() {
         while (!toStop) {
            try {
               // auto registry group
               // Executor address type: 0 = auto registered, 1 = entered manually. Query all executor groups that use auto registration
               List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
               if (groupList!=null && !groupList.isEmpty()) {

                  // remove dead address (admin/executor)
                  // Machines that have not sent a heartbeat for 90 seconds are considered faulty and are removed
                  List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT);
                  if (ids!=null && ids.size()>0) {
                     XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                  }

                  // Refresh the online addresses (admin/executor)
                  HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                  // Query the machines that were updated within the last 90 seconds
                  List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
                  if (list != null) {
                     // Loop over the machine list and group the addresses by executor appName
                     for (XxlJobRegistry item: list) {
                        if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                           String appName = item.getRegistryKey();
                           List<String> registryList = appAddressMap.get(appName);
                           if (registryList == null) {
                              registryList = new ArrayList<String>();
                           }

                           if (!registryList.contains(item.getRegistryValue())) {
                              registryList.add(item.getRegistryValue());
                           }
                           appAddressMap.put(appName, registryList);
                        }
                     }
                  }

                  // fresh group address
                  // Iterate over the executor groups
                  for (XxlJobGroup group: groupList) {
                     // Use the group's appName to look up the addresses of its cluster machines
                     List<String> registryList = appAddressMap.get(group.getAppName());
                     String addressListStr = null;
                     if (registryList!=null && !registryList.isEmpty()) {
                        Collections.sort(registryList);
                        addressListStr = "";
                        for (String item:registryList) {
                           addressListStr += item + ",";
                        }
                        addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                     }
                     // Set the group's address list, comma-separated (null when no machine is online)
                     group.setAddressList(addressListStr);
                     XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                  }
               }
            } catch (Exception e) {
               if (!toStop) {
                  logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
               }
            }
            try {
               // Run once every 30s
               TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
               if (!toStop) {
                  logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
               }
            }
         }
         logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
      }
   });
   // Mark it as a daemon thread
   registryThread.setDaemon(true);
   registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
   // Start the thread
   registryThread.start();
}
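
The core of the monitor loop is a grouping step: registry rows are collected per appName, de-duplicated, sorted and joined with commas. A compact sketch of just that step follows. `AddressGrouping` is a hypothetical helper that takes `{appName, address}` pairs instead of `XxlJobRegistry` rows, and it uses a `TreeSet` in place of the `contains` check plus `Collections.sort` shown above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper; entries are {appName, address} pairs.
final class AddressGrouping {

    // Groups addresses by appName and joins each group into a sorted,
    // de-duplicated, comma-separated string.
    static Map<String, String> group(List<String[]> entries) {
        Map<String, TreeSet<String>> byApp = new HashMap<>();
        for (String[] entry : entries) {
            // TreeSet keeps the addresses sorted and drops duplicates
            byApp.computeIfAbsent(entry[0], key -> new TreeSet<>()).add(entry[1]);
        }
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, TreeSet<String>> e : byApp.entrySet()) {
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }
}
```

An appName with no live entries is simply absent from the result, which corresponds to the null address list in the listing above.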

2. The job failure monitor helper

It runs every 10s; it re-triggers failed jobs and updates the trigger log.

JobFailMonitorHelper.java
public void start(){
   monitorThread = new Thread(new Runnable() {

      @Override
      public void run() {

         // Monitor loop
         while (!toStop) {
            try {
               // Query the ids of up to 1000 failed job logs
               List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
               if (failLogIds!=null && !failLogIds.isEmpty()) {
                  for (long failLogId: failLogIds) {

                     // lock log
                     // Change alarm_status from 0 to -1 (default to locked). Alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm succeeded, 3 = alarm failed
                     int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                     // Nothing was updated, so the log is already locked or handled: skip it
                     if (lockRet < 1) {
                        continue;
                     }
                     XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                     XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                     // 1. Fail-retry monitor: retry if the remaining retry count is greater than 0
                     if (log.getExecutorFailRetryCount() > 0) {
                        // Trigger the job again with the retry count reduced by one
                        JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam());
                        String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                        log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                        // Update the XxlJobLog
                        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                     }

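                     // 2. Failure alarm monitor
                     int newAlarmStatus = 0;       // Alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm succeeded, 3 = alarm failed
                     // Check whether an alarm email address is configured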
                     if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
                        boolean alarmResult = true;
                        try {
                           // Send the failure alarm email
                           alarmResult = failAlarm(info, log);
                        } catch (Exception e) {
                           alarmResult = false;
                           logger.error(e.getMessage(), e);
                        }
                        newAlarmStatus = alarmResult?2:3;
                     } else {
                        // No alarm needed
                        newAlarmStatus = 1;
                     }
                     // Update the alarm status field
                     XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                  }
               }
               // Run once every 10s
               TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
               if (!toStop) {
                  logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
               }
            }
         }

         logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

      }
   });
   monitorThread.setDaemon(true);
   monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
   monitorThread.start();
}
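
The "lock log" step above works like a compare-and-set: the status only moves from 0 to -1 if it is still 0, and the returned row count tells the caller whether it won. The sketch below models that with an in-memory map. `AlarmStatusStore` is hypothetical, and it assumes the DAO performs a conditional update of the form "set the new status where id matches and the status equals the old one"; the mapper SQL itself is not shown in this article.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of the alarm_status column; not part of xxl-job.
final class AlarmStatusStore {
    private final ConcurrentMap<Long, Integer> statusByLogId = new ConcurrentHashMap<>();

    // Inserts a log row with the default alarm status 0.
    void insert(long logId) {
        statusByLogId.put(logId, 0);
    }

    // Conditional update: succeeds only if the current status equals oldStatus.
    // Returns the number of "rows" updated (1 or 0), like a JDBC update count.
    int updateAlarmStatus(long logId, int oldStatus, int newStatus) {
        return statusByLogId.replace(logId, oldStatus, newStatus) ? 1 : 0;
    }

    // Returns the current status, or null if the log id is unknown.
    Integer get(long logId) {
        return statusByLogId.get(logId);
    }

    public static void main(String[] args) {
        AlarmStatusStore store = new AlarmStatusStore();
        store.insert(1L);
        System.out.println(store.updateAlarmStatus(1L, 0, -1)); // first caller locks the row
        System.out.println(store.updateAlarmStatus(1L, 0, -1)); // second caller gets 0 and skips
    }
}
```

Because only one caller can observe an update count of 1, each failed log is retried and alarmed at most once per lock, even if several monitors scan the same rows.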

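4. How jobs are actually scheduled

Two threads are involved:

1. The schedule thread, which scans for due jobs and runs roughly every 5 seconds.

2. The time-ring thread, which fires the jobs that the schedule thread pre-read for the next 5 seconds, and runs once per second.

For each job it reads, the schedule thread distinguishes three cases:

1. Jobs that are overdue by more than 5 seconds are not executed; only their next trigger time is updated.

2. Jobs that are overdue by less than 5 seconds are handed to the trigger thread pool immediately, and their next trigger time is updated. If that next trigger time falls within the coming 5 seconds, the job is also put into the time ring's map and its next trigger time is updated once more.

3. Jobs that are due within the next 5 seconds (the 5s pre-read window) are put into the ringData map according to the second at which they fire: the key is the next trigger timestamp divided by 1000, modulo 60, and the value is the job id. Their next trigger time is then updated from the cron expression.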
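
The three cases can be expressed as a small classification function. This is only a sketch: `TriggerClassifier` and its `Action` enum are hypothetical names, and `PRE_READ_MS` is assumed to be 5000 based on the 5-second window described above.

```java
// Hypothetical classifier for the three cases described above.
final class TriggerClassifier {
    // Assumed pre-read window of 5 seconds, in milliseconds.
    static final long PRE_READ_MS = 5000;

    enum Action { SKIP_EXPIRED, TRIGGER_NOW, PUSH_TO_RING }

    // Decides what to do with a job, given the current time and its next trigger time.
    static Action classify(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            return Action.SKIP_EXPIRED;   // overdue by more than 5s: skip this run
        } else if (nowTime > triggerNextTime) {
            return Action.TRIGGER_NOW;    // overdue by at most 5s: trigger immediately
        } else {
            return Action.PUSH_TO_RING;   // due within the pre-read window: use the time ring
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(classify(now, now - 6000));
        System.out.println(classify(now, now - 2000));
        System.out.println(classify(now, now + 3000));
    }
}
```

The third branch relies on the query having already limited the result to jobs whose next trigger time is below now + 5s, so no upper bound check is needed there.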

 

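The time-ring thread runs once per second:

1. It removes the lists for the current second and for the previous second from the ring and hands them to the trigger thread pool immediately. It looks back one second in case the previous second's jobs were never fired. For example, if the current second is 59 and handling the jobs of second 57 took longer than one second, the jobs of second 58 may have been missed, so at second 59 the thread takes the jobs of both 58 and 59.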
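
To make the ring arithmetic concrete, here is a minimal sketch of a 60-slot time ring with the "current second plus previous second" lookup. `TimeRing`, `slotOf`, `push` and `pollDue` are hypothetical names, and the sketch is meant for single-threaded illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical 60-slot time ring keyed by the second of the minute.
final class TimeRing {
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    // Slot = (trigger timestamp in seconds) modulo 60.
    static int slotOf(long triggerTimeMillis) {
        return (int) ((triggerTimeMillis / 1000) % 60);
    }

    // Puts a job id into the slot of its trigger second.
    void push(long triggerTimeMillis, int jobId) {
        ringData.computeIfAbsent(slotOf(triggerTimeMillis), key -> new ArrayList<>()).add(jobId);
    }

    // Removes and returns the job ids of the given second and of the second before it.
    List<Integer> pollDue(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            // adding 60 before the modulo keeps the slot non-negative at second 0
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Because slots are removed when they are read, a job picked up by the look-back at second 59 cannot be fired again when the ring wraps around.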

JobScheduleHelper.java
public void start(){

    // Schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {

            try {
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
            // Loop until the schedule thread is asked to stop
            while (!scheduleThreadToStop) {

                // Scan Job
                long start = System.currentTimeMillis();

                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;

                boolean preReadSuc = true;
                try {

                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);

                    preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();

                    // tx start

                    // 1、pre read
                    long nowTime = System.currentTimeMillis();
                    // query every XxlJobInfo whose next trigger time falls before now + 5s
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
                    if (scheduleList!=null && scheduleList.size()>0) {
                        // 2、push time-ring
                        for (XxlJobInfo jobInfo: scheduleList) {

                            // time-ring jump
                            // has the job's next trigger time been missed by more than 5s?
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // 2.1、trigger-expire > 5s:pass && make next-trigger-time

                                // fresh next: compute the next trigger time from the cron expression
                                Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date());
                                if (nextValidTime != null) {
                                    // the old next-trigger time becomes the last-trigger time
                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                    // store the new next-trigger time
                                    jobInfo.setTriggerNextTime(nextValidTime.getTime());
                                } else {
                                    jobInfo.setTriggerStatus(0);
                                    jobInfo.setTriggerLastTime(0);
                                    jobInfo.setTriggerNextTime(0);
                                }
                            // the trigger time has passed, but by no more than 5s at this point
                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
                                long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();

                                // 1、trigger
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, shecule push trigger : jobId = " + jobInfo.getId() );

                                // 2、fresh next
                                jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                jobInfo.setTriggerNextTime(nextTime);


                                // next-trigger-time in 5s, pre-read again
                                if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) {

                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
                                    if (nextValidTime != null) {
                                        jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                        jobInfo.setTriggerNextTime(nextValidTime.getTime());
                                    } else {
                                        jobInfo.setTriggerStatus(0);
                                        jobInfo.setTriggerLastTime(0);
                                        jobInfo.setTriggerNextTime(0);
                                    }

                                }

                            } else {
                                // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                // 1、make ring second
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                // 2、push time ring
                                pushTimeRing(ringSecond, jobInfo.getId());

                                // 3、fresh next
                                Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
                                if (nextValidTime != null) {
                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                    jobInfo.setTriggerNextTime(nextValidTime.getTime());
                                } else {
                                    jobInfo.setTriggerStatus(0);
                                    jobInfo.setTriggerLastTime(0);
                                    jobInfo.setTriggerNextTime(0);
                                }

                            }

                        }

                        // 3、persist the updated XxlJobInfo records
                        for (XxlJobInfo jobInfo: scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }

                    } else {
                        preReadSuc = false;
                    }

                    // tx stop


                } catch (Exception e) {
                    if (!scheduleThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                    }
                } finally {

                    // commit
                    if (conn != null) {
                        try {
                            conn.commit();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.setAutoCommit(connAutoCommit);
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                    // close PreparedStatement
                    if (null != preparedStatement) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException ignore) {
                            if (!scheduleThreadToStop) {
                                logger.error(ignore.getMessage(), ignore);
                            }
                        }
                    }
                }
                long cost = System.currentTimeMillis()-start;


                // Wait seconds, align second
                if (cost < 1000) {  // scan-overtime, not wait
                    try {
                        // pre-read period: success > scan each second; fail > skip this period;
                        TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
        }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();


    // ring thread
    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // align second
            try {
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!ringThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }

            while (!ringThreadToStop) {

                try {
                    // second data
                    List<Integer> ringItemData = new ArrayList<>();
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // processing may overrun and skip a tick, so the previous tick is checked as well
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }

                    // ring trigger
                    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData!=null && ringItemData.size()>0) {
                        // do trigger
                        for (int jobId: ringItemData) {
                            // trigger the job
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
                        }
                        // clear
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }

                // next second, align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
}
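Both threads use the expression `period - System.currentTimeMillis()%1000` for their sleeps, which makes each wake-up land on a whole-second boundary instead of drifting. The helper below isolates that arithmetic so it can be checked with fixed inputs; the class and method names are hypothetical.

```java
/** Illustrative helper for the second-aligned sleep; not xxl-job API. */
final class AlignSketch {

    /**
     * Milliseconds to sleep so that the wake-up lands on a whole second.
     *
     * @param nowMs    current time in epoch milliseconds
     * @param periodMs nominal period, for example 1000 or 5000
     */
    static long millisToAlignedWakeup(long nowMs, long periodMs) {
        // subtracting the millisecond part of "now" snaps the wake-up time to a second boundary
        return periodMs - nowMs % 1000;
    }
}
```

For example, at 12.345 s a period of 1000 ms yields a sleep of 655 ms, so the thread wakes at 13.000 s.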

The trigger method itself

JobTriggerPoolHelper.java
/**
 * @param jobId
 * @param triggerType
 * @param failRetryCount
 *           >=0: use this param
 *           <0: use param from job info config
 * @param executorShardingParam
 * @param executorParam
 *          null: use job param
 *          not null: cover job param
 */
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}

 

/**
 * Add a trigger
 * @param jobId job id
 * @param triggerType trigger type
 * @param failRetryCount fail-retry count
 * @param executorShardingParam sharding parameter of the job
 * @param executorParam job parameter
 */
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {

    // choose the thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // ob-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
    });
}
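The pool selection in addTrigger boils down to a per-minute counter of slow triggers. The sketch below reproduces that bookkeeping in isolation so the thresholds (more than 500 ms per trigger, more than 10 slow triggers within the same minute) are easier to see. `SlowJobTrackerSketch` is a made-up name, and the clock is passed in as a parameter purely to keep the example deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative per-minute slow-trigger counter; not the xxl-job class. */
final class SlowJobTrackerSketch {

    private volatile long currentMinute = -1;
    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();

    /** True once a job has been slow more than 10 times in the current minute. */
    boolean useSlowPool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return count != null && count.get() > 10;
    }

    /** Records one finished trigger that took costMs and ended at nowMs. */
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60000;
        if (minute != currentMinute) {
            // a new minute has started: forget all previous counts
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > 500) {
            AtomicInteger previous = timeoutCounts.putIfAbsent(jobId, new AtomicInteger(1));
            if (previous != null) {
                previous.incrementAndGet();
            }
        }
    }
}
```

The design keeps jobs whose triggers are repeatedly slow from occupying the fast pool, and the reset each minute lets a job return to the fast pool once it recovers.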

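XxlJobTrigger.trigger checks whether the job uses the sharding broadcast route strategy. If it does, it loops over the executor group's registered address list and triggers the job once per address; otherwise it triggers the job a single time.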

XxlJobTrigger.java
/**
 * trigger job
 *
 * @param jobId job id
 * @param triggerType trigger type
 * @param failRetryCount fail-retry count
 *           >=0: use this param
 *           <0: use param from job info config
 * @param executorShardingParam sharding parameter of the job
 * @param executorParam job parameter
 *          null: use job param
 *          not null: cover job param
 */
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
    // load the XxlJobInfo by id
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    // override the job parameter if one was passed in
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    // fail-retry count
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    // load the executor group
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // sharding parameter
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // is the executor route strategy sharding broadcast?
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        // trigger once for every registered executor address
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        // single trigger
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}
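The sharding parameter travels as a string of the form "index/total". A stand-alone sketch of the parsing step in trigger might look like the following; `ShardingParamSketch` is a hypothetical class, and its `isNumeric` is a stand-in written for this example rather than the helper used by xxl-job.

```java
/** Illustrative parser for an "index/total" sharding parameter; not xxl-job API. */
final class ShardingParamSketch {

    /** Returns {index, total}, or null when the input is absent or malformed. */
    static int[] parse(String executorShardingParam) {
        if (executorShardingParam == null) {
            return null;
        }
        String[] parts = executorShardingParam.split("/");
        if (parts.length == 2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        }
        return null;
    }

    /** Stand-in numeric check: true when the string parses as an int. */
    static boolean isNumeric(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

A null result corresponds to the fallback in the listing above, where the trigger either broadcasts to every address or uses the default {0, 1}.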
XxlJobTrigger.java
/**
 * @param group                     job group, registry list may be empty
 * @param jobInfo
 * @param finalFailRetryCount
 * @param triggerType
 * @param index                     sharding index
 * @param total                     sharding index
 */
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // resolve the block strategy; the default is serial execution
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    // resolve the executor route strategy; the default is null
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    // sharding parameter
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save the XxlJobLog record
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger param
    TriggerParam triggerParam = new TriggerParam();
    // job id
    triggerParam.setJobId(jobInfo.getId());
    // name of the job handler on the executor
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    // job parameter
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    // block strategy
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    // execution timeout of the job, in seconds
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    // log id
    triggerParam.setLogId(jobLog.getId());
    // trigger time recorded in the log
    triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
    // GLUE type  #com.xxl.job.core.glue.GlueTypeEnum
    triggerParam.setGlueType(jobInfo.getGlueType());
    // GLUE source code
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    // GLUE update time
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    // sharding index
    triggerParam.setBroadcastIndex(index);
    // sharding total
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    // is the executor address list non-empty?
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        // sharding broadcast strategy?
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                // pick the executor address at the sharding index
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger the remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        // run
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5、collect trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save the trigger info to the log
    jobLog.setExecutorAddress(address);
    // name of the job handler on the executor
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    // job parameter
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    // sharding parameter
    jobLog.setExecutorShardingParam(shardingParam);
    // final fail-retry count
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    // result code of the trigger
    jobLog.setTriggerCode(triggerResult.getCode());
    // trigger message
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
XxlJobTrigger.java
/**
 * Run the executor
 * @param triggerParam
 * @param address
 * @return
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

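1. First load the job handler held by the existing job thread and compare it with the handler in the cache, which is the one registered when the executor service was initialized. If the two differ, the cached handler takes precedence and the old job thread is discarded.

2. Then check the glue type, validate the handler, and apply the block strategy.

3. Then push the TriggerParam onto the triggerQueue.

The job thread (jobThread) keeps polling that queue and executes whatever it takes out.

4. If there is no job thread for the job, one is created, cached in the jobThreadRepository map under the job id, and started.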

ExecutorBizImpl.java
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    // is the glue type BEAN?
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // look up the handler instance in the cache by handler name
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // validate the job thread against the cached handler
        if (jobThread!=null && jobHandler != newJobHandler) {
            // the handler has changed, so drop the old jobThread and jobHandler
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // validate the handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // serial execution: just enqueue behind the running job
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        // no usable job thread is left, so register a new one
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push the trigger param onto the queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
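The block-strategy part of run can be read as a small decision table. The sketch below restates it with hypothetical names (`BlockStrategySketch`, `Decision`); it leaves out handler validation and only models what happens to an incoming trigger, assuming that registering a new job thread replaces the old one.

```java
/** Illustrative decision table for the block strategy; not xxl-job API. */
final class BlockStrategySketch {

    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    /** Hypothetical outcome names for an incoming trigger. */
    enum Decision { ENQUEUE_ON_EXISTING_THREAD, REJECT, ENQUEUE_ON_NEW_THREAD }

    /**
     * @param strategy          block strategy configured for the job
     * @param threadExists      whether a valid job thread already exists
     * @param runningOrHasQueue whether that thread is running a job or has queued triggers
     */
    static Decision decide(Strategy strategy, boolean threadExists, boolean runningOrHasQueue) {
        if (!threadExists) {
            // nothing to block on: a fresh job thread is registered
            return Decision.ENQUEUE_ON_NEW_THREAD;
        }
        if (strategy == Strategy.DISCARD_LATER && runningOrHasQueue) {
            // the later trigger is dropped while the earlier one is still busy
            return Decision.REJECT;
        }
        if (strategy == Strategy.COVER_EARLY && runningOrHasQueue) {
            // the earlier work is abandoned and a replacement thread takes over
            return Decision.ENQUEUE_ON_NEW_THREAD;
        }
        // serial execution, or the thread is idle: queue behind existing work
        return Decision.ENQUEUE_ON_EXISTING_THREAD;
    }
}
```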

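The run method of the job thread

This is the worker thread that executes triggered jobs.

It keeps polling the triggerQueue for trigger requests and executes them.

After more than 30 consecutive idle polls (each poll waits up to 3 seconds) with the queue still empty, the thread removes itself from jobThreadRepository.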

JobThread.java
@Override
public void run() {

       // init
       try {
      handler.init();
   } catch (Throwable e) {
          logger.error(e.getMessage(), e);
   }

   // execute
   while(!toStop){
      running = false;
      // idle poll counter
      idleTimes++;

           TriggerParam triggerParam = null;
           ReturnT<String> executeResult = null;
           try {
         // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
         // poll the queue for a trigger, waiting at most 3 seconds
         triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
         if (triggerParam!=null) {
            running = true;
            idleTimes = 0;
            triggerLogIdSet.remove(triggerParam.getLogId());

            // log file name, like "logPath/yyyy-MM-dd/9999.log"
            String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
            XxlJobFileAppender.contextHolder.set(logFileName);
            // set the sharding index and total
            ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

            XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
            // is an execution timeout configured for this job?
            if (triggerParam.getExecutorTimeout() > 0) {
               // limit timeout
               Thread futureThread = null;
               try {
                  final TriggerParam triggerParamTmp = triggerParam;
                  FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                     @Override
                     public ReturnT<String> call() throws Exception {
                        return handler.execute(triggerParamTmp.getExecutorParams());
                     }
                  });
                  futureThread = new Thread(futureTask);
                  futureThread.start();
                  // wait for the result, up to the configured timeout
                  executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
               } catch (TimeoutException e) {

                  XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                  XxlJobLogger.log(e);

                  executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
               } finally {
                  futureThread.interrupt();
               }
            } else {
               // no timeout configured: just execute the job
               executeResult = handler.execute(triggerParam.getExecutorParams());
            }

            if (executeResult == null) {
               executeResult = IJobHandler.FAIL;
            } else {
               executeResult.setMsg(
                     (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                           ?executeResult.getMsg().substring(0, 50000).concat("...")
                           :executeResult.getMsg());
               executeResult.setContent(null);    // limit obj size
            }
            XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

         } else {
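            // after more than 30 consecutive idle polls (about 90 seconds at up to 3 seconds per poll), remove this job thread from the repository, which sets its stop flag and ends the polling loop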
            if (idleTimes > 30) {
               if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
                  XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
               }
            }
         }
      } catch (Throwable e) {
         if (toStop) {
            XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
         }

         StringWriter stringWriter = new StringWriter();
         e.printStackTrace(new PrintWriter(stringWriter));
         String errorMsg = stringWriter.toString();
         executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

         XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
      } finally {
               if(triggerParam != null) {
                   // callback handler info
                   if (!toStop) {
                       // commonm
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
                   } else {
                       // is killed
                       ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]");
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
                   }
               }
           }
       }

   // callback trigger request in queue
   while(triggerQueue !=null && triggerQueue.size()>0){
      TriggerParam triggerParam = triggerQueue.poll();
      if (triggerParam!=null) {
         // is killed
         ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
         TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
      }
   }

   // destroy
   try {
      handler.destroy();
   } catch (Throwable e) {
      logger.error(e.getMessage(), e);
   }

   logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
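The timeout branch of run relies on a standard JDK pattern: wrap the job in a FutureTask, run it on its own thread, and wait with a bounded get. A reduced sketch of that pattern follows. The class `TimeoutRunSketch` and the string results are placeholders invented for this example; the real code returns ReturnT objects.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative bounded execution with FutureTask; not the xxl-job class. */
final class TimeoutRunSketch {

    /** Runs the job on a separate thread and waits at most the given timeout. */
    static String runWithTimeout(Callable<String> job, long timeout, TimeUnit unit) {
        FutureTask<String> task = new FutureTask<>(job);
        Thread worker = new Thread(task);
        worker.start();
        try {
            return task.get(timeout, unit);
        } catch (TimeoutException e) {
            // the job did not finish in time
            return "TIMEOUT";
        } catch (ExecutionException e) {
            // the job itself threw an exception
            return "FAILED";
        } catch (InterruptedException e) {
            // the waiting thread was interrupted: restore the flag and give up
            Thread.currentThread().interrupt();
            return "FAILED";
        } finally {
            // ask the worker to stop; this only has an effect if the job responds to interruption
            worker.interrupt();
        }
    }
}
```

Note that interrupt is only a request: a job that never checks for interruption keeps running after the timeout has been reported.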

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 
