欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

XXL-Job启动源码详解

程序员文章站 2022-07-10 20:50:32
从官方将源码下载下来,当前发布版本为2.2.0,该版本下载的为最新版本2.2.1,。代码相差不大,接下来对xxl-job进行一步一步拆解github地址 https://github.com/xuxueli/xxl-job码云地址 https://gitee.com/xuxueli0323/xxl-job该框架有如下优点1、使用层面简单、好用,源码易读,修改简单2、功能层面1、有可视化界面进行操作,可以集中化管理任务2、通过可视化可以对任务的管理,执行,调度,任务......

目录

1、xxl目录结构

1.1 xxl-job-admin 

1.2 xxl-job-core

1.3 xxl-job-executor-samples

2、架构设计

2.1  整体架构设计

2.2 服务集成

3、启动源码分析


从官方将源码下载下来,当前发布版本为2.2.0,该版本下载的为最新版本2.2.1,。代码相差不大,接下来对xxl-job进行一步一步拆解

github地址    https://github.com/xuxueli/xxl-job

码云地址    https://gitee.com/xuxueli0323/xxl-job

该框架有如下优点

1、使用层面

简单、好用,源码易读,修改简单

2、功能层面

1、有可视化界面进行操作,可以集中化管理任务

2、通过可视化可以对任务的管理,执行,调度,任务生命周期管理

3、任务执行支持手动和corn表达式定时触发

4、任务调度支持配置任务链以及任务执行失败的阻断策略

5、任务执行时间,执行信息等记录,可以更好地分析任务瓶颈以及执行分布时间

等等

1、xxl目录结构

XXL-Job启动源码详解

xxl-job的目录如下

1.1 xxl-job-admin 

该目录为调度中心源码,包括任务的管理,执行,调度,以及运行监控报表。喏,就下面这个

XXL-Job启动源码详解

1.2 xxl-job-core

该代码为服务集成的核心源码包,包括重要注解,执行器等。

1.3 xxl-job-executor-samples

该包为服务集成测试demo,支持frameless,jfinal,spring,SpringBoot等多个框架集成。

2、架构设计

2.1  整体架构设计

通过服务注册的方式,将job注册到任务调度中心,通过调度中心进行统一的任务管理

XXL-Job启动源码详解

2.2 服务集成

以SpringBoot集成xxl-job为例,集成直接运行demo即可,可以参考01.XXL-JOB这个文章。

1、配置文件

# web port spring web容器端口
server.port=8083
# no web  
#spring.main.web-environment=false

# log config
logging.config=classpath:logback.xml
### xxl-job注册中心地址
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job access token
xxl.job.accessToken=
### xxl-job 执行器名称
xxl.job.executor.appname=omsJob
### xxl-job executor 执行器ip
xxl.job.executor.ip=
### xxl-job executor 执行器端口
xxl.job.executor.port=9999
### xxl-job executor log-path 日志配置
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days 日志定时清理时间
xxl.job.executor.logretentiondays=30

2、初始化执行器

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppname(appname);
    xxlJobSpringExecutor.setAddress(address);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

    return xxlJobSpringExecutor;
}

3、使用xxl-job注解配置任务

/**
 * 1、简单任务示例(Bean模式)
 */
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
    XxlJobLogger.log("XXL-JOB, Hello World.");

    for (int i = 0; i < 5; i++) {
        XxlJobLogger.log("beat at:" + i);
        TimeUnit.SECONDS.sleep(2);
    }
    return ReturnT.SUCCESS;
}

4、启动成功后,服务会开启两个端口,一个是业务端口,一个是调度中心下发job执行的端口。(此处要优化,服务和job触发其实使用一个端口即可)

XXL-Job启动源码详解

3、启动源码分析

1、服务启动流程

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppname(appname);
    xxlJobSpringExecutor.setAddress(address);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

    return xxlJobSpringExecutor;
}

服务主要启动是初始化XxlJobSpringExecutor这个bean对象,该对象定义了执行器xxlJobSpringExecutor的相关配置,如注册中心地址,服务提供地址,以及授权token等。该对象类图如下,

 

XxlJobSpringExecutor执行器继承XxlJobExecutor并实现Spring的ApplicationContextAware等类,对该bean进行了增强。主要核心类为XxlJobExecutor。

因为继承SmartInitializingSingleton,所以bean初始化完成之后会执行afterSingletonsInstantiated方法,该类主要为initJobHandlerMethodRepository这个方法,用于扫描xxl-job注解,进行任务加载和管理

 

 

// start
@Override
public void afterSingletonsInstantiated() {

    // 扫描xxl-job注解,进行任务加载和管理
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
initJobHandlerMethodRepository方法主要如下

该方法主要是通过获取Spring管理的容器bean,然后扫描带有xxljob注解的方法,将他们保存在jobHandlerRepository对象中

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // 扫描Spring管理的bean
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
			
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
 							// 获取注解为XxlJob的方法,并保存在annotatedMethods
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }
		
		//获取方法属性,并存储在jobHandlerRepository对象中
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method method = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            if (xxlJob == null) {
                continue;
            }

            String name = xxlJob.value();
            if (name.trim().length() == 0) {
                throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
            }
            if (loadJobHandler(name) != null) {
                throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
            }

            // execute method
            if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            method.setAccessible(true);

            // init and destory
            Method initMethod = null;
            Method destroyMethod = null;

            if (xxlJob.init().trim().length() > 0) {
                try {
                    initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                    initMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }
            if (xxlJob.destroy().trim().length() > 0) {
                try {
                    destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                    destroyMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }
            // registry jobhandler
		    //将任务存储在jobHandlerRepository对象中,后续下发任务使用
            registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
        }
    }
}

之后执行super.start(),执行父类XxlJobExecutor的start方法。该方法主要有日志初始化,日志清理任务初始化,RPC调用触发器回调线程启动,调度中心列表初始化以及执行器端口初始化。

public void start() throws Exception { // init logpath
	//初始化任务执行日志路径
    XxlJobFileAppender.initLogPath(logPath);

    // 日志定时清理任务
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // 初始化触发器回调线程(用RPC回调调度中心接口)
    TriggerCallbackThread.getInstance().start();
	//初始化调度中心列表
    initAdminBizList( adminAddresses,  accessToken);
    // init executor-server  执行器端口启动
    initEmbedServer(address, ip, port, appname, accessToken);
}
 XxlJobFileAppender.initLogPath(logPath)和JobLogFileCleanThread.getInstance().start(logRetentionDays)主要对执行日志进行初始化,就不多解释了,直接往下看。
TriggerCallbackThread.getInstance().start();
public void start() {

    // 调度中心注册表会否为空
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    // callback
    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // 监听阻塞队列
            while(!toStop){
                try {
					
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {

                        // 组装callback返回的参数
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);

                        // 执行回调
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            // last callback
            try {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();


    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while(!toStop){
                try {
                    retryFailCallbackFile();
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
        }
    });
    triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();
}
 doCallback(callbackParamList)如下
/**
 * do callback, will retry if error
 * @param callbackParamList
 */
private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean callbackRet = false;
    // 向所有的调度中心发送回调信息
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
		//本质上是调用注册中心的api/callback接口。记录调用结果。
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
            if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                callbackRet = true;
                break;
            } else {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
            }
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }
    if (!callbackRet) {
        appendFailCallbackFile(callbackParamList);
    }
}
adminBiz.callback(callbackParamList) 

调用注册中心api接口

@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
initAdminBizList( adminAddresses, accessToken); 初始化注册中心列表,用于后期和注册中心交互
//扫描xxl.job.admin.addresses配置,将他们加入注册中心列表adminBizList对象中。用于后期发送回调
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {

                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                adminBizList.add(adminBiz);
            }
        }
    }
}
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);<核心>

 

//初始化xxljob执行器服务
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

 //初始化ip和端口,如果没有ip则自动获取本地ip
 port = port>0?port: NetUtil.findAvailablePort(9999);
 ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

 // generate address
 if (address==null || address.trim().length()==0) {
 String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
 address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
 }

 // 启动服务
 embedServer = new EmbedServer();
 embedServer.start(address, port, appname, accessToken);
}
embedServer.start(address, port, appname, accessToken); 本质上是一个Netty服务,标准的Netty服务启动,我们只看EmbedHttpServerHandler,Netty处理请求的handler
public void start(final String address, final int port, final String appname, final String accessToken) {
 executorBiz = new ExecutorBizImpl();
 thread = new Thread(new Runnable() {

 @Override
 public void run() {

 // param
 EventLoopGroup bossGroup = new NioEventLoopGroup();
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
 0,
 200,
 60L,
 TimeUnit.SECONDS,
 new LinkedBlockingQueue<Runnable>(2000),
 new ThreadFactory() {
 @Override
 public Thread newThread(Runnable r) {
 return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
 }
 },
 new RejectedExecutionHandler() {
 @Override
 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
 }
 });


 try {
 // start server
 ServerBootstrap bootstrap = new ServerBootstrap();
 bootstrap.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
 @Override
 public void initChannel(SocketChannel channel) throws Exception {
 channel.pipeline()
 .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
 .addLast(new HttpServerCodec())
 .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
 .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
 }
 })
 .childOption(ChannelOption.SO_KEEPALIVE, true);

 // bind
 ChannelFuture future = bootstrap.bind(port).sync();

 logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

 //注册到调度中心
 startRegistry(appname, address);

 // wait util stop
 future.channel().closeFuture().sync();

 } catch (InterruptedException e) {
 if (e instanceof InterruptedException) {
 logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
 } else {
 logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
 }
 } finally {
 // stop
 try {
 workerGroup.shutdownGracefully();
 bossGroup.shutdownGracefully();
 } catch (Exception e) {
 logger.error(e.getMessage(), e);
 }
 }

 }

 });
 thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
 thread.start();
}

 

 

public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
 private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);
 
 private ExecutorBiz executorBiz; //执行器
 private String accessToken; //token
 private ThreadPoolExecutor bizThreadPool;//执行器线程池
 public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
 this.executorBiz = executorBiz;
 this.accessToken = accessToken;
 this.bizThreadPool = bizThreadPool;
 }

 @Override
 protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

 // request parse
 //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
 String requestData = msg.content().toString(CharsetUtil.UTF_8);//获取请求数据
 String uri = msg.uri();
 HttpMethod httpMethod = msg.method();
 boolean keepAlive = HttpUtil.isKeepAlive(msg);
 String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

 // invoke
 bizThreadPool.execute(new Runnable() {
 @Override
 public void run() {
 // 处理请求
 Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

 // 格式化为JSON
 String responseJson = GsonTool.toJson(responseObj);

 // 写回客户端
 writeResponse(ctx, keepAlive, responseJson);
 }
 });
 }
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

 // valid
 if (HttpMethod.POST != httpMethod) {
 return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
 }
 if (uri==null || uri.trim().length()==0) {
 return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
 }
 if (accessToken!=null
 && accessToken.trim().length()>0
 && !accessToken.equals(accessTokenReq)) {
 return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
 }

 // services mapping
 try {
 //接收注册中心请求接口处理
 if ("/beat".equals(uri)) {
 return executorBiz.beat();
 } else if ("/idleBeat".equals(uri)) {
 IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
 return executorBiz.idleBeat(idleBeatParam);
 } else if ("/run".equals(uri)) { //注册中心执行接口
 TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
 return executorBiz.run(triggerParam);
 } else if ("/kill".equals(uri)) {
 KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
 return executorBiz.kill(killParam);
 } else if ("/log".equals(uri)) {
 LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
 return executorBiz.log(logParam);
 } else {
 return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
 }
 } catch (Exception e) {
 logger.error(e.getMessage(), e);
 return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
 }
}
我们主要看下run方法的执行过程
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
 // 根据jobid加载对应的job执行信息,第一次执行为null
 JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
 IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;//根绝jobThread获取job处理handler
 String removeOldReason = null;

 // valid:jobHandler + jobThread
 GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); //获取任务类型
 if (GlueTypeEnum.BEAN == glueTypeEnum) {

 // new jobhandler
 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());//获取任务的执行器

 // 校验新*ob是否一致,不一致将老的进行初始化。有可能任务更新。通过jobid获取的是老的
 if (jobThread!=null && jobHandler != newJobHandler) {
 // change handler, need kill old thread
 removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

 jobThread = null;
 jobHandler = null;
 }

 // valid handler
 if (jobHandler == null) {
 jobHandler = newJobHandler; //将新处理handler赋值给老的
 if (jobHandler == null) {
 return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
 }
 }

 } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

 // valid old jobThread
 if (jobThread != null &&
 !(jobThread.getHandler() instanceof GlueJobHandler
 && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
 // change handler or gluesource updated, need kill old thread
 removeOldReason = "change job source or glue type, and terminate the old job thread.";

 jobThread = null;
 jobHandler = null;
 }

 
 if (jobHandler == null) {
 try {
 IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
 jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
 } catch (Exception e) {
 logger.error(e.getMessage(), e);
 return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
 }
 }
 } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

 // valid old jobThread
 if (jobThread != null &&
 !(jobThread.getHandler() instanceof ScriptJobHandler
 && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
 // change script or gluesource updated, need kill old thread
 removeOldReason = "change job source or glue type, and terminate the old job thread.";

 jobThread = null;
 jobHandler = null;
 }

 // valid handler
 if (jobHandler == null) {
 jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
 }
 } else {
 return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
 }

 // executor block strategy
 if (jobThread != null) {
 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
 if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
 // discard when running
 if (jobThread.isRunningOrHasQueue()) {
 return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
 }
 } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
 // kill running jobThread
 if (jobThread.isRunningOrHasQueue()) {
 removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

 jobThread = null;
 }
 } else {
 // just queue trigger
 }
 }

  // 如果jobThread 为null,则将任务信息注册到jobThreadRepository对象进行缓存,并启动线程
 if (jobThread == null) {
 jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
 }

 //将任务推送到自己现成的任务队列中区
 ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
 return pushResult;
}
看下registJobThread方法,该方法主要是根据任务信息,创建一个jobThread,之后启动该线程。然后将其缓存到jobThreadRepository中。如果存在老的任务,则将老的任务停掉。
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
 JobThread newJobThread = new JobThread(jobId, handler);//创建新得jobThread对象
 newJobThread.start();//启动线程
 logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
 //将新的jobThread放入map中,并弹出老的。
 JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
 if (oldJobThread != null) {
//将老的停掉
 oldJobThread.toStop(removeOldReason);
 oldJobThread.interrupt();
 }

 return newJobThread;
}

该线程执行如下,主要就是获取队列中的任务,然后通过新建FutureTask线程执行任务。之后将执行结果推到TriggerCallbackThread的队列中。通过TriggerCallbackThread推到任务调度中心,进行记录结果信息。

 @Override
public void run() {

       // init
       try {
      handler.init();
   } catch (Throwable e) {
          logger.error(e.getMessage(), e);
   }

   // execute
   while(!toStop){
      running = false;
      idleTimes++;

           TriggerParam triggerParam = null;
           ReturnT<String> executeResult = null;
           try {
         // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
		//弹出队列任务
         triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
         if (triggerParam!=null) {
            running = true;
            idleTimes = 0;
            triggerLogIdSet.remove(triggerParam.getLogId());

            // log filename, like "logPath/yyyy-MM-dd/9999.log"
            String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
            XxlJobContext.setXxlJobContext(new XxlJobContext(
                  triggerParam.getLogId(),
                  logFileName,
                  triggerParam.getBroadcastIndex(),
                  triggerParam.getBroadcastTotal()));

            // execute
            XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

            if (triggerParam.getExecutorTimeout() > 0) {
               // limit timeout
               Thread futureThread = null;
               try {
                  final TriggerParam triggerParamTmp = triggerParam;
				//创建futureTask线程任务,并执行
                  FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                     @Override
                     public ReturnT<String> call() throws Exception {
                        return handler.execute(triggerParamTmp.getExecutorParams());
                     }
                  });
                  futureThread = new Thread(futureTask);
                  futureThread.start();
					//获取执行结果
                  executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
               } catch (TimeoutException e) {

                  XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                  XxlJobLogger.log(e);

                  executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
               } finally {
                  futureThread.interrupt();
               }
            } else {
               // just execute
               executeResult = handler.execute(triggerParam.getExecutorParams());
            }

            if (executeResult == null) {
               executeResult = IJobHandler.FAIL;
            } else {
               executeResult.setMsg(
                     (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                           ?executeResult.getMsg().substring(0, 50000).concat("...")
                           :executeResult.getMsg());
               executeResult.setContent(null);    // limit obj size
            }
            XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

         } else {
            if (idleTimes > 30) {
               if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
                  XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
               }
            }
         }
      } catch (Throwable e) {
         if (toStop) {
            XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
         }

         StringWriter stringWriter = new StringWriter();
         e.printStackTrace(new PrintWriter(stringWriter));
         String errorMsg = stringWriter.toString();
         executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

         XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
      } finally {
		//结果添加到回调队列中
               if(triggerParam != null) {
                   // callback handler info
                   if (!toStop) {
                       // commonm
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                   } else {
                       // is killed
                       ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                   }
               }
           }
       }

   // callback trigger request in queue
   while(triggerQueue !=null && triggerQueue.size()>0){
      TriggerParam triggerParam = triggerQueue.poll();
      if (triggerParam!=null) {
         // is killed
         ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
         TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
      }
   }

   // destroy
   try {
      handler.destroy();
   } catch (Throwable e) {
      logger.error(e.getMessage(), e);
   }

   logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

 

至此,整个XXlJOb的启动和接收请求的处理大致梳理完了。代码整体很简单。就不细致展开了。了解整个设计思想即可。

本文地址:https://blog.csdn.net/b379685397/article/details/111034785