欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot-ElasticJob封装快速上手使用(分布式定时器)

程序员文章站 2022-06-22 10:41:35
elastic job spring boot qq交流群: 1 简介 是一个分布式调度解决方案,由两个相互独立的子项目 和`Elastic Job Cloud Elastic Job Lite`定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。 基于 定时任务框架为基础的, ......

elastic-job-spring-boot

qq交流群:812321371

1 简介

elastic-job是一个分布式调度解决方案,由两个相互独立的子项目elastic-job-liteelastic-job-cloud组成。elastic-job-lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。 基于quartz定时任务框架为基础的,因此具备quartz的大部分功能 使用zookeeper做协调,调度中心,更加轻量级 支持任务的分片 支持弹性扩容,可以水平扩展, 当任务再次运行时,会检查当前的服务器数量,重新分片,分片结束之后才会继续执行任务 失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper断开连接之后,会立即停止作业,然后再去寻找其他空闲的调度服务器,来运行剩余的任务 提供运维界面,可以管理作业和注册中心。

1.1 使用场景

由于项目为微服务,单模块可能在两个实例以上的数量,定时器就会出现多实例同时执行的情况。 一般定时器缺少管理界面,无法监控定时器是否执行成功。 市面上常见的解决方案为定时器加锁的操作,或者采用第3方分布式定时器。 分布式定时器有多种方案,比如阿里内部的scheduledx,当当网的elastic job,个人开源的xxl-job等。

1.2 功能列表

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • spring整合以及命名空间提供
  • 运维平台

1.3 概念

分片:任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。 例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器a遍历id以奇数结尾的数据;服务器b遍历id以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为id%10,而服务器a被分配到分片项0,1,2,3,4;服务器b被分配到分片项5,6,7,8,9,直接的结果就是服务器a遍历id0-4结尾的数据;服务器b遍历id5-9结尾的数据。

历史轨迹:elastic-job提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。

1.4 封装elasticjob

由于当当网elastic job处于1年间未更新阶段,相关jar处于可以使用阶段功能不全。考虑到使用场景为多项目使用,将elastic-job-lite-spring简单封装便于使用。

2.使用说明:

2.1 添加依赖

ps:实际version版本请使用最新版

<dependency>
  <groupid>com.purgeteam</groupid>
  <artifactid>elasticjob-spring-boot-starter</artifactid>
  <version>0.1.1.release</version>
</dependency>

2.2 配置

ps: 需要mysql,zookeeper支持,请提前搭建好。

配置bootstrap.yml或者application.yml

加入以下配置:

spring:
  elasticjob:
    datasource: # job需要的记录数据源
      url: jdbc:mysql://127.0.0.1:3306/batch_log?useunicode=true&characterencoding=utf-8&verifyservercertificate=false&usessl=false&requiressl=false
      driver-class-name: com.mysql.cj.jdbc.driver
      username: root
      password: rtqw123opnmer
    regcenter: # 注册中心
      serverlist: 127.0.0.1:2181
      namespace: elasticjobdemo

2.3 定时器实现方法编写

创建定时器类(唯一不同的地方在于将@scheduled改为实现simplejob接口即可) 定时器实现方法编写在execute方法里。

@slf4j
@component
public class mysimplejob implements simplejob {

    //  @scheduled(cron = "0 0/1 * * * ?")
    @override
    public void execute(shardingcontext shardingcontext) {
        log.info(string.format("thread id: %s, 作业分片总数: %s, " +
                        "当前分片项: %s.当前参数: %s," +
                        "作业名称: %s.作业自定义参数: %s",
                thread.currentthread().getid(),
                shardingcontext.getshardingtotalcount(),
                shardingcontext.getshardingitem(),
                shardingcontext.getshardingparameter(),
                shardingcontext.getjobname(),
                shardingcontext.getjobparameter()
        ));
        // 分片大致如下:根据配置的分片参数执行相应的逻辑
        switch (context.getshardingitem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}
log:thread id: 66, 作业分片总数: 1, 当前分片项: 0.当前参数: beijing,作业名称: propertiessimplejob.作业自定义参数: test

2.4 配置定时器

2.4.1 创建configuration类

zookeeperregistrycenterjobeventconfiguration注入。 创建jobscheduler @bean(initmethod = "init")。 在mysimplejobscheduler方法里先通过elasticjobutils#getlitejobconfiguration获取litejobconfiguration对象。 创建springjobscheduler对象返回即可。

@configuration
public class myjobconfig {

    // job 名称
    private static final string job_name = "mysimplejob";

    // 定时器cron参数
    private static final string cron = "0 0/1 * * * ?";

    // 定时器分片
    private static final int sharding_total_count = 1;

    // 分片参数
    private static final string sharding_item_parameters = "0=beijing,1=shanghai,2=guangzhou";

    // 自定义参数
    private static final string job_parameters = "parameter";

    @resource
    private zookeeperregistrycenter regcenter;

    @resource
    private jobeventconfiguration jobeventconfiguration;


    @bean(initmethod = "init")
    public jobscheduler mysimplejobscheduler(final mysimplejob mysimplejob) {

        litejobconfiguration litejobconfiguration = elasticjobutils
                .getlitejobconfiguration(mysimplejob.getclass(), job_name, cron,
                        sharding_total_count, sharding_item_parameters, job_parameters);
        // 参数:1.定时器实例,2.注册中心类,3.litejobconfiguration,
        //     3.历史轨迹(不需要可以省略)
        return new springjobscheduler(mysimplejob, regcenter, litejobconfiguration, jobeventconfiguration);
    }

}

elasticjobutils#getlitejobconfiguration参数简介:

/**
     * 获取 {@link litejobconfiguration} 对象
     *
     * @param jobclass               定时器实现类
     * @param jobname                定时器名称
     * @param cron                   定时参数
     * @param shardingtotalcount     作业分片总数
     * @param shardingitemparameters 当前参数 可以为null
     * @param jobparameters          作业自定义参数 可以为null
     * @return {@link litejobconfiguration}
     */
  public static litejobconfiguration getlitejobconfiguration(
      final class<? extends simplejob> jobclass,
      final string jobname,
      final string cron,
      final int shardingtotalcount,
      final string shardingitemparameters,
      final string jobparameters) {
  ...
    return ...;
  }

2.4.2 简化configuration类

当然也可以用下面的@configuration实现简化,配置bootstrap.yml或者application.yml

spring:
  elasticjob:
    scheduled:
      jobconfigmap: // 为map集合
        propertiessimplejob: // 定时器key名称
          jobname: propertiessimplejob // job名称
          cron: 0 0/1 * * * ? // cron表达式
          shardingtotalcount: 2 // 分片数量
          shardingitemparameters: 0=123,1=332 // 分片参数
          jobparameters: test // 自定义参数

注入springjobschedulerfactory,在propertiessimplejobscheduler方法里调用gerspringjobscheduler方法即可。

@configuration
public class propertiessimplejobconfig {

    @resource
    private springjobschedulerfactory springjobschedulerfactory;

    @bean(initmethod = "init")
    public jobscheduler propertiessimplejobscheduler(final propertiessimplejob job) {
        // 参数:1.定时器实例,2.配置名称,3.是否开启历史轨迹
        return springjobschedulerfactory.getspringjobscheduler(job,"propertiessimplejob", true);
    }

}

2.4.3 注解方式配置(推荐方式)

ps:这个注解包含了上述方式,简化定时器注入。

继承simplejob实现方法execute

annotationsimplejob类上加入注解@elasticjobscheduler即可。 下面为完整注解。

@slf4j
@elasticjobscheduler(
        name = "annotationsimplejob", // 定时器名称
        cron = "0/8 * * * * ?", // 定时器表达式
    	shardingtotalcount = 1, // 作业分片总数 默认为1
        shardingitemparameters = "0=beijing,1=shanghai,2=guangzhou",  // 分片序列号和参数用等号分隔 不需要参数可以不加
        jobparameters = "123", // 作业自定义参数 不需要参数可以不加
    	isevent = true // 是否开启数据记录 默认为true
)
public class annotationsimplejob implements simplejob {

    @override
    public void execute(shardingcontext shardingcontext) {
        log.info(string.format("thread id: %s, 作业分片总数: %s, " +
                        "当前分片项: %s.当前参数: %s," +
                        "作业名称: %s.作业自定义参数: %s",
                thread.currentthread().getid(),
                shardingcontext.getshardingtotalcount(),
                shardingcontext.getshardingitem(),
                shardingcontext.getshardingparameter(),
                shardingcontext.getjobname(),
                shardingcontext.getjobparameter()
        ));
    }
}

总结

分布式job可以解决多个项目同一个定时器都执行的问题,配合elastic-job控制台可以直观监控定时器执行情况等。

SpringBoot-ElasticJob封装快速上手使用(分布式定时器)

示例代码地址:

作者github: purgeyao 欢迎关注 本文由博客一文多发平台 openwrite 发布!