Elastic-Job:动态添加任务,支持动态分片
多情只有春庭月,犹为离人照落花。
概述
因项目中使用到定时任务,且服务部署多实例,因此需要解决定时任务重复执行的问题。即在同一时间点,每一个定时任务只在一个节点上执行。常见的开源方案,如 elastic-job
、 xxl-job
、quartz
、 saturn
、 opencron
、 antares
等。最终决定使用elastic-job
。elastic-job
的亮点主要如下:
- 基于quartz 定时任务框架为基础的,因此具备quartz的大部分功能
- 使用zookeeper做协调,调度中心,更加轻量级
- 支持任务的分片
- 支持弹性扩容 , 可以水平扩展 , 当任务再次运行时,会检查当前的服务器数量,重新分片,分片结束之后才会继续执行任务
- 失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper断开连接之后,会立即停止作业,然后再去寻找其他空闲的调度服务器,来运行剩余的任务
- 提供运维界面,可以管理作业和注册中心
但在实际开发中发现elastic-job
对于动态添加的定时任务不支持分片。即在多实例情况下,在某个实例上动态添加任务,则该任务会一直在这一台节点上运行。如果需要在其它实例上运行,则需要以相同的参数调用其它实例接口。参考:。在多次百度+google
下发现elastic-job动态添加任务这里与楼主遇到了相同的问题。但经楼主测试动态添加任务的分片时好时坏,且只要在zookeeper
中注册了任务,重启时任务还是会自动初始化。(关于对动态呢任务的描述,可以参考上面链接的描述,此处不在做过多的解释)。
解决
顺着的思路,将任务的节点都集中管理起来,无论动态任务在哪个节点上进行注册,都需要将这个请求转发到其他的节点上进行初始化操作,这样就可以保证多节点分片的任务正常执行。
代码如下:
/** * 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务 */ public void monitorjobregister() { curatorframework client = zookeeperregistrycenter.getclient(); @suppresswarnings("resource") pathchildrencache childrencache = new pathchildrencache(client, "/", true); pathchildrencachelistener childrencachelistener = new pathchildrencachelistener() { public void childevent(curatorframework client, pathchildrencacheevent event) throws exception { childdata data = event.getdata(); switch (event.gettype()) { case child_added: string config = new string(client.getdata().forpath(data.getpath() + "/config")); job job = jsonutils.tobean(job.class, config); object bean = null; // 获取bean失败则添加任务 try { bean = ctx.getbean("springjobscheduler" + job.getjobname()); } catch (beansexception e) { logger.error("error no bean,create bean springjobscheduler" + job.getjobname()); } if (objects.isnull(bean)) { addjob(job); } break; default: break; } } }; childrencache.getlistenable().addlistener(childrencachelistener); try { // https://blog.csdn.net/u010402202/article/details/79581575 childrencache.start(pathchildrencache.startmode.build_initial_cache); } catch (exception e) { e.printstacktrace(); } }
测试
测试动态添加定时任务,支持分片失效转移。
- 下载 使用
maven
命令install
到本地 - 创建
demo-elastic-job
项目
目录结构如下:
demo-elastic-job ├── mvnw ├── mvnw.cmd ├── src │ ├── main │ │ ├── java │ │ │ └── com │ │ │ └── example │ │ │ └── demo │ │ │ ├── job │ │ │ │ ├── dynamicjob.java │ │ │ │ └── testjob.java │ │ │ └── demoapplication.java │ │ └── resources │ │ ├── application.yml │ │ └── application-dev.yml │ └── test │ └── java │ └── com │ └── example │ └── demo │ └── demoapplicationtests.java ├── pom.xml └── demo.iml
pom.xml
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>2.1.1.release</version> <relativepath/> <!-- lookup parent from repository --> </parent> <groupid>com.example</groupid> <artifactid>demo</artifactid> <version>0.0.1-snapshot</version> <name>demo</name> <description>demo project for spring boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.apache.curator</groupid> <artifactid>curator-recipes</artifactid> <version>2.10.0</version> </dependency> <dependency> <groupid>org.apache.curator</groupid> <artifactid>curator-framework</artifactid> <version>2.10.0</version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>com.cxytiandi</groupid> <artifactid>elastic-job-spring-boot-starter</artifactid> <version>1.0.4</version> </dependency> <dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> <optional>true</optional> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-maven-plugin</artifactid> </plugin> </plugins> </build> </project>
demoapplication.java
package com.example.demo; import com.cxytiandi.elasticjob.annotation.enableelasticjob; import com.cxytiandi.elasticjob.dynamic.service.jobservice; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.commandlinerunner; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.annotation.componentscan; @springbootapplication @enableelasticjob @componentscan(basepackages = {"com.cxytiandi", "com.example.demo"}) public class demoapplication implements commandlinerunner { public static void main(string[] args) { springapplication.run(demoapplication.class, args); } @autowired private jobservice jobservice; @override public void run(string... args) throws exception { // 模拟初始化读取数据库 添加任务 // job job1 = new job(); // job1.setjobname("job1"); // job1.setcron("0/10 * * * * ? "); // job1.setjobtype("simple"); // job1.setjobclass("com.example.demo.job.dynamicjob"); // job1.setshardingitemparameters(""); // job1.setshardingtotalcount(2); // jobservice.addjob(job1); // job job2 = new job(); // job2.setjobname("job2"); // job2.setcron("0/10 * * * * ? "); // job2.setjobtype("simple"); // job2.setjobclass("com.example.demo.job.dynamicjob"); // job2.setshardingitemparameters("0=a,1=b"); // job2.setshardingtotalcount(2); // jobservice.addjob(job2); } }
testjob.java
package com.example.demo.job; import com.cxytiandi.elasticjob.annotation.elasticjobconf; import com.dangdang.ddframe.job.api.shardingcontext; import com.dangdang.ddframe.job.api.simple.simplejob; import lombok.extern.slf4j.slf4j; import org.apache.commons.lang3.stringutils; import org.springframework.stereotype.component; /** * created by zhenglongfei on 2019/7/22 * * @version 1.0 */ @component @slf4j @elasticjobconf(name = "dayjob", cron = "0/10 * * * * ?", shardingtotalcount = 2, shardingitemparameters = "0=aaaa,1=bbbb", description = "简单任务", failover = true) public class testjob implements simplejob { @override public void execute(shardingcontext shardingcontext) { log.info("testjob任务名:【{}】, 片数:【{}】, param=【{}】", shardingcontext.getjobname(), shardingcontext.getshardingtotalcount(), shardingcontext.getshardingparameter()); } }
dynamicjob.java
package com.example.demo.job; import com.dangdang.ddframe.job.api.shardingcontext; import com.dangdang.ddframe.job.api.simple.simplejob; import lombok.extern.slf4j.slf4j; import org.springframework.stereotype.component; /** * created by zhenglongfei on 2019/7/24 * * @version 1.0 */ @component @slf4j public class dynamicjob implements simplejob { @override public void execute(shardingcontext shardingcontext) { switch (shardingcontext.getshardingitem()) { case 0: log.info("【0】 is running"); break; case 1: log.info("【1】 is running"); break; } } }
application.yml
elastic: job: zk: serverlists: 172.25.66.137:2181 namespace: demo_test server: port: 8082 spring: redis: host: 127.0.0.1 port: 6379
测试结果
启动两个项目分别为8081
和8082
端口,使用rest api
来动态的注册任务。
- job
http://localhost:8081/job post
参数如下:
{ "jobname": "dynamicjob01", "cron": "0/3 * * * * ?", "jobtype": "simple", "jobclass": "com.example.demo.job.dynamicjob", "jobparameter": "test", "shardingtotalcount": 2, "shardingitemparameters": "0=aaaa,1=bbbb" }
代码下载
- github:
- github:
上一篇: c++ sizeof运算符的使用及理解
下一篇: Python基础---控制执行流程
推荐阅读