Java Elastic Job动态添加任务实现过程解析
背景
在使用elastic-job的过程中,有很多人遇到了这么一个问题,就是如何动态的去添加任务?
在官方的文档中也有对此作出回答,如下:
动态添加作业这个概念每个人理解不尽相同。
elastic-job-lite为jar包,由开发或运维人员负责启动。启动时自动向注册中心注册作业信息并进行分布式协调,因此并不需要手工在注册中心填写作业信息。 但注册中心与作业部署机无从属关系,注册中心并不能控制将单点的作业分发至其他作业机,也无法将远程服务器未启动的作业启动。elastic-job-lite并不会包含ssh免密管理等功能。
elastic-job-cloud为mesos框架,由mesos负责作业启动和分发。 但需要将作业打包上传,并调用elastic-job-cloud提供的rest api写入注册中心。 打包上传属于部署系统的范畴elastic-job-cloud并未涉及。
综上所述,elastic-job已做了基本动态添加功能,但无法做到真正意义的完全自动化添加。
接下来谈谈我对动态任务的理解,我眼中的动态任务分为2种:
一种是全新的任务,包括实现的逻辑也是全新的,也就是当我们的程序打成一个jar包后,线上已经在运行了,这个时候我加了一个新的任务,如何能做到不停服务,将这个任务集成到已有的任务中去,这个实现起来难度比较大,涉及到java类的热加载等,不过最近阿里又有一开源大作jarslink,github地址:,可以支持在运行时动态加载到系统中,实现不需要重启和发布系统新增功能。还有一种实现思路我们可以利用groovy脚本来做这样的事情,一般情况下重启来发布新的任务会比较常见,如果各位一定要实现动态的任务可以自己尝试着去研究下我提供的思路。
另一种就是执行的业务逻辑不变,只是运行的时间发生变化。比如文章的定时发布,可以设置文章在某天的某分钟进行自动发布,实现这个功能有多种方式,你可以不停的扫描任务,一到时间点就自动发布,比较优雅的方式就是为每篇文章的自动发布都设置一个任务,通过cron表达式来指定执行时间,不同的是每个任务都有自己的参数,业务逻辑都是固定的定时发布。
接下来我给大家介绍下elastic-job实现上面讲的第二种动态任务的方式,也就是任务的实现逻辑已经是存在的,只是需要发布成多个不同时间去触发的任务。
实战
实现任务的动态添加比较简单,只需要接收任务的信息,然后初始化一下就可以了,在实现的过程中笔者遇到了一个麻烦的问题?
在多节点分片任务却只有一个节点能执行,问题原因在于当有任务a和任务b,2个节点的时候,我们调用a节点的接口进行任务的动态添加,在a节点中初始化了任务调度器,数据也存储到了注册中心,但是b节点是不知道有新的任务添加,默认的使用方法是每个节点在启动时去初始化任务调度器,而我们的b节点已经启动过了,任务是新添加的。
解决这个问题最简单的方式就是将任务的节点都集中管理起来,无论动态任务在哪个节点上进行注册,都需要将这个请求转发到其他的节点上进行初始化操作,这样就可以保证多节点分片的任务正常执行。
还有一种对使用者更友好的办法是对zookeeper中的节点进行监听,当有新的节点创建时,就自动获取这个节点的配置信息,在本地进行任务初始化,通过这样的方式就可以不用去转发请求到其他节点了,只要在任何节点有添加操作,都能被监听到,并自己去初始化。
监控代码如下:
/** * 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务 */ public void monitorjobregister() { curatorframework client = zookeeperregistrycenter.getclient(); @suppresswarnings("resource") pathchildrencache childrencache = new pathchildrencache(client, "/", true); pathchildrencachelistener childrencachelistener = new pathchildrencachelistener() { public void childevent(curatorframework client, pathchildrencacheevent event) throws exception { childdata data = event.getdata(); switch (event.gettype()) { case child_added: string config = new string(client.getdata().forpath(data.getpath() + "/config")); job job = jsonutils.tobean(job.class, config); addjob(job); break; default: break; } } }; childrencache.getlistenable().addlistener(childrencachelistener); try { childrencache.start(startmode.post_initialized_event); } catch (exception e) { e.printstacktrace(); } }
为了方便大家使用,我将动态添加任务的功能集成到了我之前的elastic-job-spring-boot-starter()中集成了动态添加的逻辑,大家引入依赖即可使用。
使用方式比较简单,只需要在启动类上加一个componentscan注解,让spring能够扫描到elastic-job-spring-boot-starter提供的代码即可:
@springbootapplication @enableelasticjob //开启动态任务添加api @componentscan(basepackages = {"com.cxytiandi"}) public class jobapplication { public static void main(string[] args) { new springapplicationbuilder().sources(jobapplication.class).web(true).run(args); try { new countdownlatch(1).await(); } catch (interruptedexception e) { } } }
配置好之后,启动项目就可以通过rest api来动态的注册任务,api列表如下:
/job
添加任务是post请求,数据格式为json体提交,格式如下:
{
"jobname":"dynamicjob13",
"cron":"0 33 16 ?",
"jobtype":"simple",
"jobclass":"com.cxytiandi.job.demo.dynamicjob",
"jobparameter":"2222222",
"shardingtotalcount":1
}
完整字段请参考:
注意:jobclass必须事先存在于服务中
* /job/remove
删除任务是get请求,参数只要任务名称即可,比如:/job/remove?jobname=任务名。可以用于任务完成之后清空注册中心的任务信息。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。