欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot 整合——Spring batch的监听器

程序员文章站 2022-03-02 18:33:49
...

关于版本

依赖 版本
springboot 2.4.0
spring batch 2.4.0

代码地址

因为每个例子涉及代码较多,且包含测试用例,如果都贴到文章中内容过多,所以只贴出了部分代码。全部的代码在这里: https://gitee.com/daifyutils/springboot-samples

此篇文章所属模块为:base-batch-2.4.0

目录地址

目录 测试目录 内容
dai.samples.batch.allowstart dai.samples.allow 测试任务可以重复执行
dai.samples.batch.base dai.samples.base 基础任务配置
dai.samples.batch.skip dai.samples.skip 跳过操作
dai.samples.batch.listener dai.samples.listener 任务监听器
dai.samples.batch.process dai.samples.process 流程控制的代码
dai.samples.batch.add dai.samples.add 任务流程切割
dai.samples.batch.retry dai.samples.retry 任务重试
dai.samples.batch.rollback dai.samples.rollback 任务回滚
dai.samples.batch.rw dai.samples.rw 数据的读取和输出

Spring Batch其他内容

Spring Boot 整合——Spring batch基本使用

Spring Boot 整合——Spring batch的监听器

Spring Boot 整合——Spring batch任务流程控制以及流程分割

Spring Boot 整合——Spring batch通过不同方式读取数据(ItemReader)

Spring Boot 整合——Spring batch通过不同方式输出数据(ItemWriter)

Spring Boot 整合——Spring batch重试和回滚

Spring batch

spring batch 是spring提供的一个批数据处理的框架。提供了大量信息的自动化和定时处理的操作。其是一个相对轻量级的批处理操作

Spring batch监听器

批任务处理中,需要定义的对象主要为job(任务)、step(任务步骤)。而对应的Spring batch为两者提供了对应的监听器接口。

job监听器接口

public interface JobExecutionListener {

	void beforeJob(JobExecution jobExecution);

	void afterJob(JobExecution jobExecution);

}

step监听器接口

public interface StepListener {

}

看到这里你会发现步监听器接口里面是个空白的内容,这是因为其真正提供监听方法的是其子类

ChunkListenerStepExecutionListenerItemReadListenerItemWriteListenerSkipListener

这里需要注意,javax.batch.api.chunk.listener 包下也有类似接口,不要搞混了

想确定Srping Batch支持哪些接口可以看这里的代码

// org/springframework/batch/core/step/builder/SimpleStepBuilder.java#listener(Object listener)
....
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeRead.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterRead.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeProcess.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterProcess.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeWrite.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterWrite.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnReadError.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnProcessError.class));
		itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnWriteError.class));
....

和这里

// org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java#listener(Object listener)
......
		skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInRead.class));
		skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInProcess.class));
		skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInWrite.class));
......

Spring Batch支持的监听器

监听器 作用
JobExecutionListener 批处理任务的监听器
StepExecutionListener 步骤执行的监听器
ChunkListener 块执行监听器
ItemReadListener 数据输入监听器
ItemWriteListener 数据输出监听器
SkipListener 跳过异常监听器

监听器的注入方式

batch支持将监听器的实例注入到任务执行之中。但是不同的监听器注册的地方和时机都有所不同

任务监听器

任务监听器需要在JobBuilderHelper中注入

    @Bean("listenerJob")
    public Job retrySkipJob(JobRepository jobRepository) {
        return this.jobBuilderFactory.get("listenerJob")
                .repository(jobRepository)
                .start(listenerStep())
                .listener(myJobExecutionListener())
                .build();
    }

执行步骤监听器

此监听器需要在StepBuilderHelper中注入

    @Bean("listenerStep")
    public Step listenerStep() {
        return this.stepBuilderFactory.get("listenerStep")
                .<BatchEntity, BatchEntity>chunk(5)
                .reader(itemReader())
                .processor(getProcessor())
                .writer(itemWriter())
                .listener(myChunkListener())// 块监听器
                .listener(myStepExecutionListener())// step监听器
                .listener(myItemReadListener())// 读取监听器
                .listener(myItemProcessListener())// 处理监听器
                .listener(myItemWriterListener())
                .faultTolerant()
                .skip(Exception.class) // 重试
                .skipLimit(100)
                .listener(mySkipListener())// 跳过监听器,注意此监听器需要在faultTolerant后面
                .build();
    }

需要注意的是Skip监听器需要在FaultTolerantStepBuilder主注入,所以需要先将其在faultTolerant()之后注入。
源码中可以看到他只有在FaultTolerantStepBuilder中的注入方法,才会检测指定的注解。

	@Override
	@SuppressWarnings("unchecked")
	public SimpleStepBuilder<I, O> listener(Object listener) {
		super.listener(listener);

		Set<Method> skipListenerMethods = new HashSet<>();
		skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInRead.class));
		skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInProcess.class));
		skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInWrite.class));

		if(skipListenerMethods.size() > 0) {
			StepListenerFactoryBean factory = new StepListenerFactoryBean();
			factory.setDelegate(listener);
			skipListeners.add((SkipListener) factory.getObject());
		}

		@SuppressWarnings("unchecked")
		SimpleStepBuilder<I, O> result = this;
		return result;
	}

监听器的创建方式

根据源码可以看到,所有被注入的监听器,Spring batch检测的是其注解类。我们想要创建一个被识别的监听器,有两种选择

接口实现

我们可以直接实现上面表格中列出的接口

package dai.samples.batch.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;


/**
 * 在事务范围内处理的项目。在每个提交间隔提交事务都会提交一个“块”。ChunkListener可以在块开始处理之前或块成功完成之后使用
 * @author daify
 * @date 2020-11-09
 */
@Slf4j
public class MyChunkListener implements ChunkListener {


    @Override
    public void beforeChunk(ChunkContext context) {
        log.info("-------------beforeChunk--------------");

    }

    @Override
    public void afterChunk(ChunkContext context) {
        log.info("-------------afterChunk--------------");

    }

    @Override
    public void afterChunkError(ChunkContext context) {
        log.info("-------------afterChunkError--------------");
    }
}

方法注解

或者我们并不准备实现官方提供的接口,而是使用自己的代码,这个之后可以在对应方法中设置对应的注解,也可以实现其效果

package dai.samples.batch.listener;

import dai.samples.batch.entity.BatchEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.annotation.OnSkipInProcess;
import org.springframework.batch.core.annotation.OnSkipInRead;
import org.springframework.batch.core.annotation.OnSkipInWrite;

/**
 * 跳过监听器,每跳过一个项目就会调用一次
 * @author daify
 * @date 2020-11-09
 */
@Slf4j
public class MySkipListener {

    @OnSkipInRead
    public void onSkipInRead(Throwable throwable) {
        log.info("-------------onSkipInRead--------------");
    }

    @OnSkipInWrite
    public void onSkipInWrite(BatchEntity entity, Throwable throwable) {
        log.info("-------------onSkipInWrite--------------");
    }

    @OnSkipInProcess
    public void onSkipInProcess(BatchEntity entity, Throwable throwable) {
        log.info("-------------onSkipInProcess--------------");
    }
}

监听器触发的时机

通过向任务中添加各种监听器来测试其触发的时机

package dai.samples.batch.listener;

import dai.samples.batch.entity.BatchEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.builder.JsonFileItemWriterBuilder;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * @author daify
 * @date 2020-11-15
 */
@Slf4j
@Configuration
public class ListenerJobConfig {
......

    @Bean("listenerJob")
    public Job retrySkipJob(JobRepository jobRepository) {
        return this.jobBuilderFactory.get("listenerJob")
                .repository(jobRepository)
                .start(listenerStep())
                .listener(myJobExecutionListener())
                .build();
    }

    /**
     * 遇见RuntimeException异常进行重试,重试3次
     * @return
     */
    @Bean("listenerStep")
    public Step listenerStep() {
        return this.stepBuilderFactory.get("listenerStep")
                .<BatchEntity, BatchEntity>chunk(5)
                .reader(itemReader())
                .processor(getProcessor())
                .writer(itemWriter())
                .listener(myChunkListener())// 块监听器
                .listener(myStepExecutionListener())// step监听器
                .listener(myItemReadListener())// 读取监听器
                .listener(myItemProcessListener())// 处理监听器
                .listener(myItemWriterListener())
                .faultTolerant()
                .skip(Exception.class) // 重试
                .skipLimit(100)
                .listener(mySkipListener())// 跳过监听器,注意此监听器需要在faultTolerant后面
                .build();
    }
......
}

在测试用例中执行其逻辑可以测试其输出结果

/**
 * 任务监听器处理测试类
 * @author daify
 * @date 2020-11-15
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {BatchApplication.class})
public class ListenerJobTests {


    @Autowired
    @Qualifier(value = "listenerJob")
    private Job listenerJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 测试不进行回滚的操作,正常每一个chunk中有数据异常整个chunk会被回滚,
     * 而此操作不会进行回滚,可以尝试修改chunk的参数看不同的效果
     * @throws Exception
     */
    @Test
    public void testListenerJob() throws Exception {

        JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
        jobLauncherTestUtils.setJobLauncher(jobLauncher);
        jobLauncherTestUtils.setJob(listenerJob);

        File file = new File(BatchConfig.PATH);
        Assert.isTrue(file.exists(),"地址错误");
        Assert.isTrue(file.isDirectory(),"目录错误");
        File[] files = file.listFiles();
        Arrays.stream(files).forEach(item -> item.delete());
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        Assert.isTrue(BatchStatus.COMPLETED.equals(jobExecution.getStatus()),"返回状态失败");
        File[] newFiles = file.listFiles();
        Arrays.stream(newFiles).forEach(item -> System.out.println(item.getName()));
    }
}

控制台输出内容

-------------beforeJob--------------
-------------beforeStep--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeWrite--------------
-------------afterWrite--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------onProcessError--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------onProcessError--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------onProcessError--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeProcess--------------
-------------onProcessError--------------
-------------beforeProcess--------------
-------------afterProcess--------------
-------------beforeWrite--------------
-------------afterWrite--------------
-------------onSkipInProcess--------------
-------------onSkipInProcess--------------
-------------onSkipInProcess--------------
-------------onSkipInProcess--------------
-------------afterStep--------------
-------------afterJob--------------

监听器触发时机

JobExecutionListener

在任务执行的开始和结束时会触发此监听器。一次任务只会触发一次。

StepExecutionListener

在此执行步骤的开始和结束时会触发此监听器。每执行一次步骤会触发一次。

ChunkListener

一次处理块的开始和技术时会触发此监听器,一个执行步骤如果分为多个块进行执行会触发多次。

ItemReadListener

在数据读取的开始和结束后以及出现异常时候进入此监听器。

ItemWriteListener

在数据写入的开始和结束后以及出现异常时候进入此监听器。

SkipListener

每跳过一次(读、写、处理)时候会触发一次此监听器。


个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,因为每一个例子都提供了测试代码,一般来说不会有问题,但是因为这几篇内容断断续续用了一个半月可能会出现之后的代码影响了之前的例子,假如开发同学发现了,请及时告知,我会第一时间修改相关内容,也希望大家看在这个新春佳节只能宅到家中埋头苦逼的码代码的情况下,能给我点一个赞。你的点赞就是我前进的动力。