欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot 整合——Spring batch任务流程控制以及流程分割

程序员文章站 2022-05-01 17:02:02
...

关于版本

依赖 版本
springboot 2.4.0
spring batch 2.4.0

代码地址

因为每个例子涉及代码较多,且包含测试用例,如果都贴到文章中内容过多,所以只贴出了部分代码。全部的代码在这里: https://gitee.com/daifyutils/springboot-samples

此篇文章所属模块为:base-batch-2.4.0

目录地址

目录 测试目录 内容
dai.samples.batch.allowstart dai.samples.allow 测试任务可以重复执行
dai.samples.batch.base dai.samples.base 基础任务配置
dai.samples.batch.skip dai.samples.skip 跳过操作
dai.samples.batch.listener dai.samples.listener 任务监听器
dai.samples.batch.process dai.samples.process 流程控制的代码
dai.samples.batch.add dai.samples.add 任务流程切割
dai.samples.batch.retry dai.samples.retry 任务重试
dai.samples.batch.rollback dai.samples.rollback 任务回滚
dai.samples.batch.rw dai.samples.rw 数据的读取和输出

Spring Batch其他内容

Spring Boot 整合——Spring batch基本使用

Spring Boot 整合——Spring batch的监听器

Spring Boot 整合——Spring batch任务流程控制以及流程分割

Spring Boot 整合——Spring batch通过不同方式读取数据(ItemReader)

Spring Boot 整合——Spring batch通过不同方式输出数据(ItemWriter)

Spring Boot 整合——Spring batch重试和回滚

Spring batch

spring batch 是spring提供的一个批数据处理的框架。提供了大量信息的自动化和定时处理的操作。其是一个相对轻量级的批处理操作

任务流程控制

下面所有的功能点描述都提供了测试方法,具体在项目测试目录下的dai.samples.batch.process

顺序流程

简单的任务中一般不进行流程控制,只是进行顺序执行。

通常在开始任务(start)之后执行下一步(next)直到结束(end)

public Job processJob(JobRepository jobRepository) {
    return this.jobBuilderFactory.get("processJob")
            .repository(jobRepository)
            .start(processStep())
            .next(jobExecutionDecider()).on("failed").to(failed())
            .end()
            .build();
}

条件流程

某些时候我们需要根据不同的结果执行不同的条件这个时候需要对任务流程进行控制。

    public Job processJob(JobRepository jobRepository) {
        return this.jobBuilderFactory.get("processJob")
                .repository(jobRepository)
                .start(processStep())
                .next(jobExecutionDecider())
                .on("*").to(success())
                .from(jobExecutionDecider())
                .on("FAILED").to(failed())
                .end()
                .build();
    }

上面的例子中在执行完processStep步骤后执行jobExecutionDecider步骤。其默认执行success步骤,但是当jobExecutionDecider返回FAILED时其执行failed步骤。

提供决策的步骤是JobExecutionDecider的实现类

    /**
     * 决策流、条件结果
     * @return
     */
    @Bean
    public JobExecutionDecider jobExecutionDecider() {
        return new JobExecutionDecider() {
            @Override
            public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
                // 根据传递参数的类型返回对应类型
                String type = jobExecution.getJobParameters().getString("type");
                return new FlowExecutionStatus(type);
            }
        };
    }

而返回的内容都是基于FlowExecutionStatus中设置的状态值

	private enum Status {
		COMPLETED, STOPPED, FAILED, UNKNOWN;
		static Status match(String value) {
			for (int i = 0; i < values().length; i++) {
				Status status = values()[i];
				if (value.startsWith(status.toString())) {
					return status;
				}
			}
			// Default match should be the lowest priority
			return COMPLETED;
		}

	}

控制结束

Spring Batch 提供了三种退出的方式:

  1. end:Job任务在顺序执行退出,且没有出现其他异常的时候会通过此种方式结束。
  2. Fail:这个时候表示执行失败。

下面的代码就实现了当jobExecutionDecider返回FAILED时直接返回错误的结果。

    public Job processJobEnd(JobRepository jobRepository) {
        return this.jobBuilderFactory.get("processJobEnd")
                .repository(jobRepository)
                .start(processStep())
                .next(jobExecutionDecider())
                .on("*").to(success())
                .from(jobExecutionDecider())
                .on("FAILED").fail()
                .end()
                .build();
    }
  1. 此外batch还提供了了一种stopAndRestart方法。此方法是当任务执行到了指定结果时进行中断,下次重启后直接从中断点继续执行。

任务流程分割

在任务处理过程中,流程都是一个一个执行的,但是在某些情况我们需要多个流程同时执行,并且同时执行完毕后再进行下一步操作,这个时候可以使用下面的方式:

    /**
     * 流程1
     * @return
     */
    @Bean("flow1")
    public Flow addFlow1() {
        return new FlowBuilder<SimpleFlow>("flow1")
                .start(addStep1())
                .build();
    }


    /**
     * 流程2
     * @return
     */
    @Bean("flow2")
    public Flow addFlow2() {
        return new FlowBuilder<SimpleFlow>("flow2")
                .start(addStep2())
                .build();
    }

    /**
     * 任务的流程控制
     * @param jobRepository
     * @return
     */
    @Bean("addJob")
    public Job addJob(JobRepository jobRepository) {
        // 在addFlow1 和 addFlow2中对数据进行处理并保存后,在success中进行再次处理
        return this.jobBuilderFactory.get("addJob")
                .repository(jobRepository)
                .start(addFlow1())
                .split(new SimpleAsyncTaskExecutor())
                .add(addFlow2())
                .next(success())
                .end()
                .build();
    }

在任务的开始设置了flow1和flow2的并行执行,当两个任务执行完毕后执行success任务。


个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,因为每一个例子都提供了测试代码,一般来说不会有问题,但是因为这几篇内容断断续续用了一个半月可能会出现之后的代码影响了之前的例子,假如开发同学发现了,请及时告知,我会第一时间修改相关内容,也希望大家看在这个新春佳节只能宅到家中埋头苦逼的码代码的情况下,能给我点一个赞。你的点赞就是我前进的动力。