XXL分布式任务调度平台
序言
其实用了很多年Spring quartz,突然发现XXL-JOB 这个东西.大概写下搭建的步骤和使用的步骤, 毕竟挨个通看一遍也要花不少时间,所以弄个使用的博客,后面针对性的去找吧.
官方的说明网址:https://www.xuxueli.com/xxl-job/#/
搭建环境
下载源代码
在https://github.com/xuxueli/xxl-job 下载的包结构如下
其中的各个目录包的说明如下:
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-spring:Spring版本,通过Spring容器管理执行器,比较通用;
:xxl-job-executor-sample-frameless:无框架版本;
:xxl-job-executor-sample-jfinal:JFinal版本,通过JFinal管理执行器;
:xxl-job-executor-sample-nutz:Nutz版本,通过Nutz管理执行器;
:xxl-job-executor-sample-jboot:jboot版本,通过jboot管理执行器;
- xxl-job-admin是xxl-job的后台服务,需要打成Jar后再服务器上运行.(也可以使用Docker的方式搭建,但是修改数据库连接,端口等配置就比较复杂了,可以自己根据源代码生成满足自己条件的镜像,而不是使用公共镜像.)
- 执行器则是具体任务实现类,需要连接上xxl-job-admin来实现高可用,和任务管理.
配置部署“调度中心”(xxl-job-admin)
初始化数据库
xxl-job-admin 和 spring quartz的高可用都依赖于数据库,且有自己的连接数据库.所以需要先建库和建表
该ddl文件在如下路径里(执行下就好):
\xxl-job-master\doc\db\tables_xxl_job.sql
导入xxl-job-admin 到IDE中
在Eclipse中打入Maven工程即我们刚下载下来的xxl-job-admn工程.
修改配置文件
配置文件在如下的位置:
配置文件中一般修改下数据库连接和服务端口就行了,具体配置含义如下:
### 调度中心JDBC链接:链接地址请保持和 2.1章节 所创建的调度数据库的地址一致
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 报警邮箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
aaa@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 调度中心通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;
xxl.job.i18n=zh_CN
## 调度线程池最大线程配置【必填】
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;
xxl.job.logretentiondays=30
修改Pom中的版本号
我下载下来的是snapshot版本,所以我要改成稳定版本(注意 xxl-job-admin 要和xxl-job-core版本最好一致)
<parent>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId>
<version>2.2.0</version>
<!--
<version>2.2.1-SNAPSHOT</version>
-->
</parent>
生成Jar并打成Docker镜像
使用maven命令package打成jar,然后生成Docker镜像方便将来使用.具体生成镜像的步骤参考:https://blog.csdn.net/cuiyaonan2000/article/details/108471970
启动成功后的访问地址:http://localhost:8080/xxl-job-admin
账号密码:admin/123456
支持集群,但是都是访问同一个数据库,具体参考官网:https://www.xuxueli.com/xxl-job/#/
创建执行器--就是具体的任务(spring quartz 中的service)
在xxl-job-admin创建一个任务
在任务管理界面中新增一个任务,如下图所示
其中运行模式很重要.我们目前主要使用Bean的方式,
Bean模式就是我们的工程里面用一个类注册到xxl-job-admin中作为任务执行体(使用
@XxlJob注解来声明哪个方法
为执行体)Bean要跟JobHandler结合使用,因为在使用@XxlJob需要设置value,该值跟JobHandle相结合.
其它的所谓GLUE都是在xxl-job-admin中创建一个任务执行体,不需要我们的工程去注册.
其它的含义参考:
- 执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置;
- 任务描述:任务的描述信息,便于任务管理;
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
- Cron:触发任务执行的Cron表达式;
- 运行模式:
BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;
GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;
GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本;
GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本;
GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本;
GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本;
GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本;
- JobHandler:运行模式为 "BEAN模式" 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
- 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
- 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
- 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
- 报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔;
- 负责人:任务的负责人;
- 执行参数:任务执行所需的参数;
创建执行体(在XXL-job-admin 这个相当于是创建一个分组,具体的执行器要归属到这个执行体里)
在执行器管理中点击新增,截图如下:
含义如下:
AppName: 是每个执行器集群的唯一标示AppName, 这个appName会在我们的工程量面设置,相当于是appName都是一个分组内的.用于高可用
名称: 执行器的名称,
排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;
注册方式:调度中心获取执行器地址的方式;
自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;
手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;
机器地址:"注册方式"为"手动录入"时有效,支持人工维护执行器的地址信息;
创建具体执行任务(Bean模式)
首先创建个Maven 工程引入跟xxl-job-admin中相同版本的xxl-job-core 包
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
创建配置文件(如上的appname用来分组用的比较重要,要跟xxl-job-admin中的对上.)
### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
创建配置xxl的启动文件
package nan.yao.cui.xxl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
@Configuration
public class XXLJobConfig {
private Logger logger = LoggerFactory.getLogger(XXLJobConfig.class);
@Qualifier("${xxl.job.admin.addresses}")
private String adminAddresses;
@Qualifier("${xxl.job.accessToken}")
private String accessToken;
@Qualifier("${xxl.job.executor.appname}")
private String appname;
@Qualifier("${xxl.job.executor.address}")
private String address;
@Qualifier("${xxl.job.executor.ip}")
private String ip;
@Qualifier("${xxl.job.executor.port}")
private int port;
@Qualifier("${xxl.job.executor.logpath}")
private String logPath;
@Qualifier("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
//手动通过如下方式注入到执行器容器。
//XxlJobExecutor.registJobHandler("demoJobHandler", new DemoJobHandler());
return xxlJobSpringExecutor;
}
}
创建执行体
package nan.yao.cui.xxl;
import org.springframework.stereotype.Component;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
@Component
public class XxlJobSinge {
/**
*
1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)"
2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法",
destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
*/
@XxlJob("test")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
}
return ReturnT.SUCCESS;
}
}
关于分片
这里的分片任务是指 ,任务的路由策略为 分片广播的模式下,
各个任务会通过如下的方法获取(执行器数量,和当前任务在执行器队列中的排名位数, 然后业务根据这2个参数来选择处理大批量任务中的指定分配的任务.):
这里发现一个坑在2.2.0版本中如下的获取分片数和当前分片并无此类,但是官网上是这么写的!!!!
// 分片参数
int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
//XxlJobLogger 打印的日志会在xxl-job-admin中看到,本地看不到
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
实际2.2.0使用的是如下的方法:
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
//执行器数量
int number = shardingVO.getTotal();
//当前分片
int index = shardingVO.getIndex();
//XxlJobLogger 打印的日志会在xxl-job-admin中看到,本地看不到
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO, index);