欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

XXL分布式任务调度平台

程序员文章站 2022-05-20 14:11:02
...

序言

       其实用了很多年Spring quartz,突然发现XXL-JOB 这个东西.大概写下搭建的步骤和使用的步骤, 毕竟挨个通看一遍也要花不少时间,所以弄个使用的博客,后面针对性的去找吧.

      官方的说明网址:https://www.xuxueli.com/xxl-job/#/

搭建环境

下载源代码

         在https://github.com/xuxueli/xxl-job 下载的包结构如下

XXL分布式任务调度平台

其中的各个目录包的说明如下:

    xxl-job-admin:调度中心
    xxl-job-core:公共依赖
    xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
        :xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
        :xxl-job-executor-sample-spring:Spring版本,通过Spring容器管理执行器,比较通用;
        :xxl-job-executor-sample-frameless:无框架版本;
        :xxl-job-executor-sample-jfinal:JFinal版本,通过JFinal管理执行器;
        :xxl-job-executor-sample-nutz:Nutz版本,通过Nutz管理执行器;
        :xxl-job-executor-sample-jboot:jboot版本,通过jboot管理执行器;
  • xxl-job-admin是xxl-job的后台服务,需要打成Jar后再服务器上运行.(也可以使用Docker的方式搭建,但是修改数据库连接,端口等配置就比较复杂了,可以自己根据源代码生成满足自己条件的镜像,而不是使用公共镜像.)
  • 执行器则是具体任务实现类,需要连接上xxl-job-admin来实现高可用,和任务管理.

配置部署“调度中心”(xxl-job-admin)

初始化数据库

xxl-job-admin 和 spring quartz的高可用都依赖于数据库,且有自己的连接数据库.所以需要先建库和建表

该ddl文件在如下路径里(执行下就好):

\xxl-job-master\doc\db\tables_xxl_job.sql

 

导入xxl-job-admin 到IDE中

在Eclipse中打入Maven工程即我们刚下载下来的xxl-job-admn工程.

 

修改配置文件

配置文件在如下的位置:

XXL分布式任务调度平台

配置文件中一般修改下数据库连接和服务端口就行了,具体配置含义如下:

    ### 调度中心JDBC链接:链接地址请保持和 2.1章节 所创建的调度数据库的地址一致
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
    spring.datasource.username=root
    spring.datasource.password=root_pwd
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    ### 报警邮箱
    spring.mail.host=smtp.qq.com
    spring.mail.port=25
    aaa@qq.com
    spring.mail.password=xxx
    spring.mail.properties.mail.smtp.auth=true
    spring.mail.properties.mail.smtp.starttls.enable=true
    spring.mail.properties.mail.smtp.starttls.required=true
    spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
    ### 调度中心通讯TOKEN [选填]:非空时启用;
    xxl.job.accessToken=
    ### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;
    xxl.job.i18n=zh_CN
    ## 调度线程池最大线程配置【必填】
    xxl.job.triggerpool.fast.max=200
    xxl.job.triggerpool.slow.max=100
    ### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;
    xxl.job.logretentiondays=30

 

修改Pom中的版本号

我下载下来的是snapshot版本,所以我要改成稳定版本(注意 xxl-job-admin 要和xxl-job-core版本最好一致)

<parent>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job</artifactId>
        <version>2.2.0</version>
        <!-- 
            <version>2.2.1-SNAPSHOT</version>
        -->
</parent>

 

生成Jar并打成Docker镜像

使用maven命令package打成jar,然后生成Docker镜像方便将来使用.具体生成镜像的步骤参考:https://blog.csdn.net/cuiyaonan2000/article/details/108471970

启动成功后的访问地址:http://localhost:8080/xxl-job-admin

账号密码:admin/123456

支持集群,但是都是访问同一个数据库,具体参考官网:https://www.xuxueli.com/xxl-job/#/

 

创建执行器--就是具体的任务(spring quartz 中的service)

在xxl-job-admin创建一个任务

在任务管理界面中新增一个任务,如下图所示

XXL分布式任务调度平台

其中运行模式很重要.我们目前主要使用Bean的方式,

  • Bean模式就是我们的工程里面用一个类注册到xxl-job-admin中作为任务执行体(使用@XxlJob注解来声明哪个方法为执行体)
  • Bean要跟JobHandler结合使用,因为在使用@XxlJob需要设置value,该值跟JobHandle相结合.
  • 其它的所谓GLUE都是在xxl-job-admin中创建一个任务执行体,不需要我们的工程去注册.

其它的含义参考:

    - 执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置;
    - 任务描述:任务的描述信息,便于任务管理;
    - 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
        FIRST(第一个):固定选择第一个机器;
        LAST(最后一个):固定选择最后一个机器;
        ROUND(轮询):;
        RANDOM(随机):随机选择在线的机器;
        CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
        LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
        LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
        FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
        BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
        SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
    - Cron:触发任务执行的Cron表达式;
    - 运行模式:
        BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;
        GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;
        GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本;
        GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本;
        GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本;
        GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本;
        GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本;
    - JobHandler:运行模式为 "BEAN模式" 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值;
    - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
        单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
        丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
        覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
    - 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
    - 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
    - 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
    - 报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔;
    - 负责人:任务的负责人;
    - 执行参数:任务执行所需的参数;

创建执行体(在XXL-job-admin 这个相当于是创建一个分组,具体的执行器要归属到这个执行体里)

在执行器管理中点击新增,截图如下:

XXL分布式任务调度平台

含义如下:

  1. AppName: 是每个执行器集群的唯一标示AppName, 这个appName会在我们的工程量面设置,相当于是appName都是一个分组内的.用于高可用
  2. 名称: 执行器的名称,
  3. 排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;
  4. 注册方式:调度中心获取执行器地址的方式;
  5. 自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;
  6. 手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;
  7. 机器地址:"注册方式"为"手动录入"时有效,支持人工维护执行器的地址信息;

 

创建具体执行任务(Bean模式)

首先创建个Maven 工程引入跟xxl-job-admin中相同版本的xxl-job-core 包

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>

 

创建配置文件(如上的appname用来分组用的比较重要,要跟xxl-job-admin中的对上.)

    ### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    ### 执行器通讯TOKEN [选填]:非空时启用;
    xxl.job.accessToken=
    ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
    xxl.job.executor.appname=xxl-job-executor-sample
    ### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
    xxl.job.executor.address=
    ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
    xxl.job.executor.ip=
    ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
    xxl.job.executor.port=9999
    ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
    xxl.job.executor.logretentiondays=30

 

创建配置xxl的启动文件

package nan.yao.cui.xxl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;

@Configuration
public class XXLJobConfig {
	
    private Logger logger = LoggerFactory.getLogger(XXLJobConfig.class);
	

    @Qualifier("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Qualifier("${xxl.job.accessToken}")
    private String accessToken;

    @Qualifier("${xxl.job.executor.appname}")
    private String appname;

    @Qualifier("${xxl.job.executor.address}")
    private String address;

    @Qualifier("${xxl.job.executor.ip}")
    private String ip;

    @Qualifier("${xxl.job.executor.port}")
    private int port;

    @Qualifier("${xxl.job.executor.logpath}")
    private String logPath;

    @Qualifier("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        //手动通过如下方式注入到执行器容器。
        //XxlJobExecutor.registJobHandler("demoJobHandler", new DemoJobHandler());
        return xxlJobSpringExecutor;
    }

}

创建执行体

package nan.yao.cui.xxl;

import org.springframework.stereotype.Component;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;

@Component
public class XxlJobSinge {
	
	/**
	 * 
	    1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)"
	    2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法",
	     destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
	    3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
	 */
	
    @XxlJob("test")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobLogger.log("beat at:" + i);
        }
        return ReturnT.SUCCESS;
    }

}

 

关于分片

这里的分片任务是指 ,任务的路由策略为 分片广播的模式下,

各个任务会通过如下的方法获取(执行器数量,和当前任务在执行器队列中的排名位数, 然后业务根据这2个参数来选择处理大批量任务中的指定分配的任务.):

 

这里发现一个坑在2.2.0版本中如下的获取分片数和当前分片并无此类,但是官网上是这么写的!!!!

  // 分片参数
int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();


//XxlJobLogger 打印的日志会在xxl-job-admin中看到,本地看不到
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

实际2.2.0使用的是如下的方法:

// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
//执行器数量
int number = shardingVO.getTotal();

//当前分片
int index = shardingVO.getIndex();
    	 
    	 
//XxlJobLogger 打印的日志会在xxl-job-admin中看到,本地看不到
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO, index);

 

 

相关标签: 定时任务