欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Elastic-Job:动态添加任务,支持动态分片

程序员文章站 2022-03-25 16:10:00
多情只有春庭月,犹为离人照落花。 " " 概述 因项目中使用到定时任务,且服务部署多实例,因此需要解决定时任务重复执行的问题。即在同一时间点,每一个定时任务只在一个节点上执行。常见的开源方案,如 、 、 、 、 、 等。最终决定使用 。`elastic job`的亮点主要如下: 1. 基于quart ......

多情只有春庭月,犹为离人照落花。

概述

因项目中使用到定时任务,且服务部署多实例,因此需要解决定时任务重复执行的问题。即在同一时间点,每一个定时任务只在一个节点上执行。常见的开源方案,如 elastic-jobxxl-jobquartzsaturnopencronantares 等。最终决定使用elastic-jobelastic-job的亮点主要如下:

  1. 基于quartz 定时任务框架为基础的,因此具备quartz的大部分功能
  2. 使用zookeeper做协调,调度中心,更加轻量级
  3. 支持任务的分片
  4. 支持弹性扩容 , 可以水平扩展 , 当任务再次运行时,会检查当前的服务器数量,重新分片,分片结束之后才会继续执行任务
  5. 失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper断开连接之后,会立即停止作业,然后再去寻找其他空闲的调度服务器,来运行剩余的任务
  6. 提供运维界面,可以管理作业和注册中心

但在实际开发中发现elastic-job对于动态添加的定时任务不支持分片。即在多实例情况下,在某个实例上动态添加任务,则该任务会一直在这一台节点上运行。如果需要在其它实例上运行,则需要以相同的参数调用其它实例接口。参考:。在多次百度+google下发现elastic-job动态添加任务这里与楼主遇到了相同的问题。但经楼主测试动态添加任务的分片时好时坏,且只要在zookeeper中注册了任务,重启时任务还是会自动初始化。(关于对动态呢任务的描述,可以参考上面链接的描述,此处不在做过多的解释)。

解决

顺着的思路,将任务的节点都集中管理起来,无论动态任务在哪个节点上进行注册,都需要将这个请求转发到其他的节点上进行初始化操作,这样就可以保证多节点分片的任务正常执行。

代码如下:

/**
     * 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务
     */
    public void monitorjobregister() {
        curatorframework client = zookeeperregistrycenter.getclient();
        @suppresswarnings("resource")
        pathchildrencache childrencache = new pathchildrencache(client, "/", true);
        pathchildrencachelistener childrencachelistener = new pathchildrencachelistener() {
            public void childevent(curatorframework client, pathchildrencacheevent event) throws exception {
                childdata data = event.getdata();
                switch (event.gettype()) {
                    case child_added:
                        string config = new string(client.getdata().forpath(data.getpath() + "/config"));
                        job job = jsonutils.tobean(job.class, config);
                        object bean = null;
                        // 获取bean失败则添加任务
                        try {
                            bean = ctx.getbean("springjobscheduler" + job.getjobname());
                        } catch (beansexception e) {
                            logger.error("error no bean,create bean springjobscheduler" + job.getjobname());
                        }
                        if (objects.isnull(bean)) {
                            addjob(job);
                        }
                        break;
                    default:
                        break;
                }
            }
        };
        childrencache.getlistenable().addlistener(childrencachelistener);
        try {
            // https://blog.csdn.net/u010402202/article/details/79581575
            childrencache.start(pathchildrencache.startmode.build_initial_cache);
        } catch (exception e) {
            e.printstacktrace();
        }
    }

测试

测试动态添加定时任务,支持分片失效转移。

  1. 下载 使用maven 命令install到本地
  2. 创建 demo-elastic-job项目
    目录结构如下:
demo-elastic-job
├── mvnw
├── mvnw.cmd
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── demo
│   │   │               ├── job
│   │   │               │   ├── dynamicjob.java
│   │   │               │   └── testjob.java
│   │   │               └── demoapplication.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── application-dev.yml
│   └── test
│       └── java
│           └── com
│               └── example
│                   └── demo
│                       └── demoapplicationtests.java
├── pom.xml
└── demo.iml

pom.xml

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>2.1.1.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>
    <groupid>com.example</groupid>
    <artifactid>demo</artifactid>
    <version>0.0.1-snapshot</version>
    <name>demo</name>
    <description>demo project for spring boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>org.apache.curator</groupid>
            <artifactid>curator-recipes</artifactid>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupid>org.apache.curator</groupid>
            <artifactid>curator-framework</artifactid>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

        <dependency>
            <groupid>com.cxytiandi</groupid>
            <artifactid>elastic-job-spring-boot-starter</artifactid>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

</project>

demoapplication.java

package com.example.demo;

import com.cxytiandi.elasticjob.annotation.enableelasticjob;
import com.cxytiandi.elasticjob.dynamic.service.jobservice;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.annotation.componentscan;

@springbootapplication
@enableelasticjob
@componentscan(basepackages = {"com.cxytiandi", "com.example.demo"})
public class demoapplication implements commandlinerunner {

    public static void main(string[] args) {
        springapplication.run(demoapplication.class, args);
    }

    @autowired
    private jobservice jobservice;

    @override
    public void run(string... args) throws exception {
        // 模拟初始化读取数据库 添加任务
//        job job1 = new job();
//        job1.setjobname("job1");
//        job1.setcron("0/10 * * * * ? ");
//        job1.setjobtype("simple");
//        job1.setjobclass("com.example.demo.job.dynamicjob");
//        job1.setshardingitemparameters("");
//        job1.setshardingtotalcount(2);
//        jobservice.addjob(job1);
//        job job2 = new job();
//        job2.setjobname("job2");
//        job2.setcron("0/10 * * * * ? ");
//        job2.setjobtype("simple");
//        job2.setjobclass("com.example.demo.job.dynamicjob");
//        job2.setshardingitemparameters("0=a,1=b");
//        job2.setshardingtotalcount(2);
//        jobservice.addjob(job2);
    }
}

testjob.java

package com.example.demo.job;

import com.cxytiandi.elasticjob.annotation.elasticjobconf;
import com.dangdang.ddframe.job.api.shardingcontext;
import com.dangdang.ddframe.job.api.simple.simplejob;
import lombok.extern.slf4j.slf4j;
import org.apache.commons.lang3.stringutils;
import org.springframework.stereotype.component;

/**
 * created by zhenglongfei on 2019/7/22
 *
 * @version 1.0
 */
@component
@slf4j
@elasticjobconf(name = "dayjob", cron = "0/10 * * * * ?", shardingtotalcount = 2,
        shardingitemparameters = "0=aaaa,1=bbbb", description = "简单任务", failover = true)
public class testjob implements simplejob {
    @override
    public void execute(shardingcontext shardingcontext) {
        log.info("testjob任务名:【{}】, 片数:【{}】, param=【{}】", shardingcontext.getjobname(), shardingcontext.getshardingtotalcount(),
                shardingcontext.getshardingparameter());
    }
}

dynamicjob.java

package com.example.demo.job;

import com.dangdang.ddframe.job.api.shardingcontext;
import com.dangdang.ddframe.job.api.simple.simplejob;
import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.component;

/**
 * created by zhenglongfei on 2019/7/24
 *
 * @version 1.0
 */
@component
@slf4j
public class dynamicjob implements simplejob {
    @override
    public void execute(shardingcontext shardingcontext) {


        switch (shardingcontext.getshardingitem()) {
            case 0:
                log.info("【0】 is running");
                break;
            case 1:
                log.info("【1】 is running");
                break;
        }
    }
}

application.yml

elastic:
  job:
    zk:
      serverlists: 172.25.66.137:2181
      namespace: demo_test
server:
  port: 8082
spring:
  redis:
    host: 127.0.0.1
    port: 6379

测试结果

启动两个项目分别为80818082端口,使用rest api来动态的注册任务。

  • job

http://localhost:8081/job post
参数如下:

{
  "jobname": "dynamicjob01",
  "cron": "0/3 * * * * ?",
  "jobtype": "simple",
  "jobclass": "com.example.demo.job.dynamicjob",
  "jobparameter": "test",
  "shardingtotalcount": 2,
  "shardingitemparameters": "0=aaaa,1=bbbb"
}

代码下载

  • github:
  • github: