SpringBoot-ElasticJob封装快速上手使用(分布式定时器)
elastic-job-spring-boot
qq交流群:812321371
1 简介
elastic-job
是一个分布式调度解决方案,由两个相互独立的子项目elastic-job-lite
和elastic-job-cloud
组成。elastic-job-lite
定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
基于quartz
定时任务框架为基础的,因此具备quartz
的大部分功能
使用zookeeper
做协调,调度中心,更加轻量级
支持任务的分片
支持弹性扩容,可以水平扩展, 当任务再次运行时,会检查当前的服务器数量,重新分片,分片结束之后才会继续执行任务
失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper
断开连接之后,会立即停止作业,然后再去寻找其他空闲的调度服务器,来运行剩余的任务
提供运维界面,可以管理作业和注册中心。
1.1 使用场景
由于项目为微服务,单模块可能在两个实例以上的数量,定时器就会出现多实例同时执行的情况。
一般定时器缺少管理界面,无法监控定时器是否执行成功。
市面上常见的解决方案为定时器加锁的操作,或者采用第3方分布式定时器。
分布式定时器有多种方案,比如阿里内部的scheduledx
,当当网的elastic job
,个人开源的xxl-job
等。
1.2 功能列表
- 分布式调度协调
- 弹性扩容缩容
- 失效转移
- 错过执行作业重触发
- 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
- 自诊断并修复分布式不稳定造成的问题
- 支持并行调度
- 支持作业生命周期操作
- 丰富的作业类型
- spring整合以及命名空间提供
- 运维平台
1.3 概念
分片:任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%
。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器a遍历id以奇数结尾的数据;服务器b遍历id以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为id%10,而服务器a被分配到分片项0,1,2,3,4
;服务器b被分配到分片项5,6,7,8,9
,直接的结果就是服务器a遍历id
以0-4
结尾的数据;服务器b遍历id
以5-9
结尾的数据。
历史轨迹:elastic-job
提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。
1.4 封装elasticjob
由于当当网elastic job
处于1年间未更新阶段,相关jar处于可以使用阶段功能不全。考虑到使用场景为多项目使用,将elastic-job-lite-spring
简单封装便于使用。
2.使用说明:
2.1 添加依赖
ps:实际version版本请使用最新版
<dependency> <groupid>com.purgeteam</groupid> <artifactid>elasticjob-spring-boot-starter</artifactid> <version>0.1.1.release</version> </dependency>
2.2 配置
ps: 需要mysql
,zookeeper
支持,请提前搭建好。
配置bootstrap.yml
或者application.yml
。
加入以下配置:
spring: elasticjob: datasource: # job需要的记录数据源 url: jdbc:mysql://127.0.0.1:3306/batch_log?useunicode=true&characterencoding=utf-8&verifyservercertificate=false&usessl=false&requiressl=false driver-class-name: com.mysql.cj.jdbc.driver username: root password: rtqw123opnmer regcenter: # 注册中心 serverlist: 127.0.0.1:2181 namespace: elasticjobdemo
2.3 定时器实现方法编写
创建定时器类(唯一不同的地方在于将@scheduled
改为实现simplejob
接口即可)
定时器实现方法编写在execute
方法里。
@slf4j @component public class mysimplejob implements simplejob { // @scheduled(cron = "0 0/1 * * * ?") @override public void execute(shardingcontext shardingcontext) { log.info(string.format("thread id: %s, 作业分片总数: %s, " + "当前分片项: %s.当前参数: %s," + "作业名称: %s.作业自定义参数: %s", thread.currentthread().getid(), shardingcontext.getshardingtotalcount(), shardingcontext.getshardingitem(), shardingcontext.getshardingparameter(), shardingcontext.getjobname(), shardingcontext.getjobparameter() )); // 分片大致如下:根据配置的分片参数执行相应的逻辑 switch (context.getshardingitem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
log:thread id: 66, 作业分片总数: 1, 当前分片项: 0.当前参数: beijing,作业名称: propertiessimplejob.作业自定义参数: test
2.4 配置定时器
2.4.1 创建configuration类
将zookeeperregistrycenter
和jobeventconfiguration
注入。
创建jobscheduler
@bean(initmethod = "init")
。
在mysimplejobscheduler
方法里先通过elasticjobutils#getlitejobconfiguration
获取litejobconfiguration
对象。
创建springjobscheduler
对象返回即可。
@configuration public class myjobconfig { // job 名称 private static final string job_name = "mysimplejob"; // 定时器cron参数 private static final string cron = "0 0/1 * * * ?"; // 定时器分片 private static final int sharding_total_count = 1; // 分片参数 private static final string sharding_item_parameters = "0=beijing,1=shanghai,2=guangzhou"; // 自定义参数 private static final string job_parameters = "parameter"; @resource private zookeeperregistrycenter regcenter; @resource private jobeventconfiguration jobeventconfiguration; @bean(initmethod = "init") public jobscheduler mysimplejobscheduler(final mysimplejob mysimplejob) { litejobconfiguration litejobconfiguration = elasticjobutils .getlitejobconfiguration(mysimplejob.getclass(), job_name, cron, sharding_total_count, sharding_item_parameters, job_parameters); // 参数:1.定时器实例,2.注册中心类,3.litejobconfiguration, // 3.历史轨迹(不需要可以省略) return new springjobscheduler(mysimplejob, regcenter, litejobconfiguration, jobeventconfiguration); } }
elasticjobutils#getlitejobconfiguration
参数简介:
/** * 获取 {@link litejobconfiguration} 对象 * * @param jobclass 定时器实现类 * @param jobname 定时器名称 * @param cron 定时参数 * @param shardingtotalcount 作业分片总数 * @param shardingitemparameters 当前参数 可以为null * @param jobparameters 作业自定义参数 可以为null * @return {@link litejobconfiguration} */ public static litejobconfiguration getlitejobconfiguration( final class<? extends simplejob> jobclass, final string jobname, final string cron, final int shardingtotalcount, final string shardingitemparameters, final string jobparameters) { ... return ...; }
2.4.2 简化configuration类
当然也可以用下面的@configuration
实现简化,配置bootstrap.yml
或者application.yml
。
spring: elasticjob: scheduled: jobconfigmap: // 为map集合 propertiessimplejob: // 定时器key名称 jobname: propertiessimplejob // job名称 cron: 0 0/1 * * * ? // cron表达式 shardingtotalcount: 2 // 分片数量 shardingitemparameters: 0=123,1=332 // 分片参数 jobparameters: test // 自定义参数
注入springjobschedulerfactory
,在propertiessimplejobscheduler
方法里调用gerspringjobscheduler
方法即可。
@configuration public class propertiessimplejobconfig { @resource private springjobschedulerfactory springjobschedulerfactory; @bean(initmethod = "init") public jobscheduler propertiessimplejobscheduler(final propertiessimplejob job) { // 参数:1.定时器实例,2.配置名称,3.是否开启历史轨迹 return springjobschedulerfactory.getspringjobscheduler(job,"propertiessimplejob", true); } }
2.4.3 注解方式配置(推荐方式)
ps:这个注解包含了上述方式,简化定时器注入。
继承simplejob
实现方法execute
。
在annotationsimplejob
类上加入注解@elasticjobscheduler
即可。
下面为完整注解。
@slf4j @elasticjobscheduler( name = "annotationsimplejob", // 定时器名称 cron = "0/8 * * * * ?", // 定时器表达式 shardingtotalcount = 1, // 作业分片总数 默认为1 shardingitemparameters = "0=beijing,1=shanghai,2=guangzhou", // 分片序列号和参数用等号分隔 不需要参数可以不加 jobparameters = "123", // 作业自定义参数 不需要参数可以不加 isevent = true // 是否开启数据记录 默认为true ) public class annotationsimplejob implements simplejob { @override public void execute(shardingcontext shardingcontext) { log.info(string.format("thread id: %s, 作业分片总数: %s, " + "当前分片项: %s.当前参数: %s," + "作业名称: %s.作业自定义参数: %s", thread.currentthread().getid(), shardingcontext.getshardingtotalcount(), shardingcontext.getshardingitem(), shardingcontext.getshardingparameter(), shardingcontext.getjobname(), shardingcontext.getjobparameter() )); } }
总结
分布式job可以解决多个项目同一个定时器都执行的问题,配合elastic-job控制台可以直观监控定时器执行情况等。
示例代码地址: