SpringBoot2 整合ElasticJob框架,定制化管理流程
本文源码:github·点这里 || gitee·点这里
一、elasticjob简介
1、定时任务
在前面的文章中,说过quartjob这个定时任务,被广泛应用的定时任务标准。但quartz核心点在于执行定时任务并不是在于关注的业务模式和场景,缺少高度自定义的功能。quartz能够基于数据库实现任务的高可用,但是不具备分布式并行调度的功能。
2、elasticjob说明
- 基础简介
elastic-job 是一个开源的分布式调度中间件,由两个相互独立的子项目 elastic-job-lite 和 elastic-job-cloud 组成。elastic-job-lite 为轻量级无中心化解决方案,使用 jar 包提供分布式任务的调度和治理。 elastic-job-cloud 是一个 mesos framework,依托于mesos额外提供资源治理、应用分发以及进程隔离等服务。
- 功能特点
分布式调度协调 弹性扩容缩容 失效转移 错过执行作业重触发 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
补刀
:人家官网这样描述的,这里赘述一下,充实一下文章。
- 基础框架结构
该图片来自elasticjob官网。
由图可知如下内容:
需要zookeeper组件支持,作为分布式的调度任务,有良好的监听机制,和控制台,下面的案例也就冲这个图解来。
3、分片管理
这个概念在elasticjob中是最具有特点的,实用性极好。
- 分片概念
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
场景描述:假设有服务3台,分3片管理,要处理数据表100条,那就可以100%3,按照余数0,1,2分散到三台服务上执行,看到这里分库分表的基本逻辑涌上心头,这就是为何很多大牛讲说,编程思维很重要。
- 个性化参数
个性化参数即shardingitemparameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
场景描述:这里猛一读好像很飘逸,其实就是这个意思,如果分3片,取名[0,1,2]不好看,或者不好标识,可以分别给个别名标识一下,[0=a,1=b,2=c]。
二、定时任务加载
1、核心依赖包
这里使用2.0+的版本。
<dependency> <groupid>com.dangdang</groupid> <artifactid>elastic-job-lite-core</artifactid> <version>2.1.5</version> </dependency> <dependency> <groupid>com.dangdang</groupid> <artifactid>elastic-job-lite-spring</artifactid> <version>2.1.5</version> </dependency>
2、核心配置文件
这里主要配置一下zookeeper中间件,分片和分片参数。
zookeeper: server: 127.0.0.1:2181 namespace: es-job job-config: cron: 0/10 * * * * ? shardcount: 1 sharditem: 0=a,1=b,2=c,3=d
3、自定义注解
看了官方的案例,没看到好用的注解,这里只能自己编写一个,基于案例的加载过程和核心api作为参考。
核心配置类:
com.dangdang.ddframe.job.lite.config.litejobconfiguration
根据自己想如何使用注解的思路,比如我只想注解定时任务名称和cron表达式这两个功能,其他参数直接统一配置(这里可能是受quartjob影响太深,可能根本就是想省事...)
@inherited @target({elementtype.type}) @retention(retentionpolicy.runtime) public @interface taskjobsign { @aliasfor("cron") string value() default ""; @aliasfor("value") string cron() default ""; string jobname() default ""; }
4、作业案例
这里打印一些基本参数,对照配置和注解,一目了然。
@component @taskjobsign(cron = "0/5 * * * * ?",jobname = "hello-job") public class hellojob implements simplejob { private static final logger log = loggerfactory.getlogger(hellojob.class.getname()) ; @override public void execute(shardingcontext shardingcontext) { log.info("当前线程: "+thread.currentthread().getid()); log.info("任务分片:"+shardingcontext.getshardingtotalcount()); log.info("当前分片:"+shardingcontext.getshardingitem()); log.info("分片参数:"+shardingcontext.getshardingparameter()); log.info("任务参数:"+shardingcontext.getjobparameter()); } }
5、加载定时任务
既然自定义注解,那加载过程自然也要自定义一下,读取自定义的注解,配置化,加入容器,然后初始化,等着任务执行就好。
@configuration public class elasticjobconfig { @resource private applicationcontext applicationcontext ; @resource private zookeeperregistrycenter zookeeperregistrycenter; @value("${job-config.cron}") private string cron ; @value("${job-config.shardcount}") private int shardcount ; @value("${job-config.sharditem}") private string sharditem ; /** * 配置任务监听器 */ @bean public elasticjoblistener elasticjoblistener() { return new taskjoblistener(); } /** * 初始化配置任务 */ @postconstruct public void inittaskjob() { map<string, simplejob> jobmap = this.applicationcontext.getbeansoftype(simplejob.class); iterator iterator = jobmap.entryset().iterator(); while (iterator.hasnext()) { // 自定义注解管理 map.entry<string, simplejob> entry = (map.entry)iterator.next(); simplejob simplejob = entry.getvalue(); taskjobsign taskjobsign = simplejob.getclass().getannotation(taskjobsign.class); if (taskjobsign != null){ string cron = taskjobsign.cron() ; string jobname = taskjobsign.jobname() ; // 生成配置 simplejobconfiguration simplejobconfiguration = new simplejobconfiguration( jobcoreconfiguration.newbuilder(jobname, cron, shardcount) .shardingitemparameters(sharditem).jobparameter(jobname).build(), simplejob.getclass().getcanonicalname()); litejobconfiguration litejobconfiguration = litejobconfiguration.newbuilder( simplejobconfiguration).overwrite(true).build(); taskjoblistener taskjoblistener = new taskjoblistener(); // 初始化任务 springjobscheduler jobscheduler = new springjobscheduler( simplejob, zookeeperregistrycenter, litejobconfiguration, taskjoblistener); jobscheduler.init(); } } } }
絮叨一句
:不要疑问这些api是怎么知道,看下官方文档的案例,他们怎么使用这些核心api,这里就是照着写过来,就是多一步自定义注解类的加载过程。当然官方文档大致读一遍还是很有必要的。
补刀一句
:如何快速学习一些组件的用法,首先找到官方文档,或者开源库wiki,再不济readme文档(如果都没有,酌情放弃,另寻其他),熟悉基本功能是否符合自己的需求,如果符合,就看下基本用法案例,熟悉api,最后就是研究自己需要的功能模块,个人经验来看,该过程是弯路最少,坑最少的。
6、任务监听
用法非常简单,实现elasticjoblistener接口。
@component public class taskjoblistener implements elasticjoblistener { private static final logger log = loggerfactory.getlogger(taskjoblistener.class); private long begintime = 0; @override public void beforejobexecuted(shardingcontexts shardingcontexts) { begintime = system.currenttimemillis(); log.info(shardingcontexts.getjobname()+"===>开始..."); } @override public void afterjobexecuted(shardingcontexts shardingcontexts) { long endtime = system.currenttimemillis(); log.info(shardingcontexts.getjobname()+ "===>结束...[耗时:"+(endtime - begintime)+"]"); } }
絮叨一句
:before和after执行前后,中间执行目标方法,标准的aop切面思想,所以底层水平决定了对上层框架的理解速度,那本《java编程思想》上的灰尘是不是该擦擦?
三、动态添加
1、作业任务
有部分场景需要动态添加和管理定时任务,基于上面的加载流程,在自定义一些步骤就可以。
@component public class gettimejob implements simplejob { private static final logger log = loggerfactory.getlogger(gettimejob.class.getname()) ; private static final simpledateformat format = new simpledateformat("yyyy-mm-dd hh:mm:ss") ; @override public void execute(shardingcontext shardingcontext) { log.info("job name:"+shardingcontext.getjobname()); log.info("local time:"+format.format(new date())); } }
2、添加任务服务
这里就动态添加上面的任务。
@service public class taskjobservice { @resource private zookeeperregistrycenter zookeeperregistrycenter; public void addtaskjob(final string jobname,final simplejob simplejob, final string cron,final int shardcount,final string sharditem) { // 配置过程 jobcoreconfiguration jobcoreconfiguration = jobcoreconfiguration.newbuilder( jobname, cron, shardcount) .shardingitemparameters(sharditem).build(); jobtypeconfiguration jobtypeconfiguration = new simplejobconfiguration(jobcoreconfiguration, simplejob.getclass().getcanonicalname()); litejobconfiguration litejobconfiguration = litejobconfiguration.newbuilder( jobtypeconfiguration).overwrite(true).build(); taskjoblistener taskjoblistener = new taskjoblistener(); // 加载执行 springjobscheduler jobscheduler = new springjobscheduler( simplejob, zookeeperregistrycenter, litejobconfiguration, taskjoblistener); jobscheduler.init(); } }
补刀一句
:这里添加之后,任务就会定时执行,如何停止任务又是一个问题,可以在任务名上做一些配置,比如在数据库生成一条记录[1,job1,state],如果调度到state为停止状态的任务,直接截胡即可。
3、测试接口
@restcontroller public class taskjobcontroller { @resource private taskjobservice taskjobservice ; @requestmapping("/addjob") public string addjob(@requestparam("cron") string cron,@requestparam("jobname") string jobname, @requestparam("shardcount") integer shardcount, @requestparam("sharditem") string sharditem) { taskjobservice.addtaskjob(jobname, new gettimejob(), cron, shardcount, sharditem); return "success"; } }
四、源代码地址
github·地址 https://github.com/cicadasmile/middle-ware-parent gitee·地址 https://gitee.com/cicadasmile/middle-ware-parent
下一篇: 用了腾讯云服务器遇到两个问题