Spring Batch 注册监听器
程序员文章站
2022-05-01 13:03:41
...
– Start
点击此处观看本系列配套视频。
Spring Batch 支持如下监听器。
监听器 | 说明 |
---|---|
JobExecutionListener | 在 Job 开始之前(beforeJob)和之后(afterJob)触发 |
StepExecutionListener | 在 Step 开始之前(beforeStep)和之后(afterStep)触发 |
ChunkListener | 在 Chunk 开始之前(beforeChunk),之后(afterChunk)和错误后(afterChunkError)触发 |
ItemReadListener | 在 Read 开始之前(beforeRead),之后(afterRead)和错误后(onReadError)触发 |
ItemProcessListener | 在 Process 开始之前(beforeProcess),之后(afterProcess)和错误后(onProcessError)触发 |
ItemWriteListener | 在 Write 开始之前(beforeWrite),之后(afterWrite)和错误后(onWriteError)触发 |
SkipListener | 在 Read(onSkipInRead),Process(onSkipInProcess)和 Write(onSkipInWrite)过程中跳过记录时触发 |
来看个简单的例子。
package shangbo.springbatch.example6;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
 * Job-level listener that prints a marker line to standard output when its
 * {@code beforeJob} and {@code afterJob} callbacks are invoked.
 */
public class SimpleJobExecutionListener implements JobExecutionListener {

    /** Prefix shared by every line this listener prints. */
    private static final String TAG = "SimpleJobExecutionListener.";

    /** Prints the marker line for the "before job" callback. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        announce("beforeJob");
    }

    /** Prints the marker line for the "after job" callback. */
    @Override
    public void afterJob(JobExecution jobExecution) {
        announce("afterJob");
    }

    /** Writes {@code TAG + callback} as a single line on standard output. */
    private static void announce(String callback) {
        System.out.println(TAG + callback);
    }
}
package shangbo.springbatch.example6;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
/**
 * Step-level listener that prints a marker line to standard output when its
 * {@code beforeStep} and {@code afterStep} callbacks are invoked.
 */
public class SimpleStepExecutionListener implements StepExecutionListener {

    /** Prefix shared by every line this listener prints. */
    private static final String TAG = "SimpleStepExecutionListener.";

    /** Prints the marker line for the "before step" callback. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        announce("beforeStep");
    }

    /**
     * Prints the marker line for the "after step" callback.
     *
     * @return the exit status already held by {@code stepExecution}, unchanged
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        announce("afterStep");
        // Hand back the step's own status so this listener does not alter the outcome.
        return stepExecution.getExitStatus();
    }

    /** Writes {@code TAG + callback} as a single line on standard output. */
    private static void announce(String callback) {
        System.out.println(TAG + callback);
    }
}
package shangbo.springbatch.example6;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
/**
 * Chunk-level listener that prints a marker line to standard output for each
 * of its three callbacks: before a chunk, after a chunk, and after a chunk error.
 */
public class SimpleChunkListener implements ChunkListener {

    /** Prefix shared by every line this listener prints. */
    private static final String TAG = "SimpleChunkListener.";

    /** Prints the marker line for the "before chunk" callback. */
    @Override
    public void beforeChunk(ChunkContext context) {
        announce("beforeChunk");
    }

    /** Prints the marker line for the "after chunk" callback. */
    @Override
    public void afterChunk(ChunkContext context) {
        announce("afterChunk");
    }

    /** Prints the marker line for the "after chunk error" callback. */
    @Override
    public void afterChunkError(ChunkContext context) {
        announce("afterChunkError");
    }

    /** Writes {@code TAG + callback} as a single line on standard output. */
    private static void announce(String callback) {
        System.out.println(TAG + callback);
    }
}
package shangbo.springbatch.example6;
import org.springframework.batch.core.ItemReadListener;
/**
 * Read listener for {@code People} items that prints a marker line to
 * standard output for each callback; the after-read line also includes the
 * name of the item that was read.
 */
public class SimpleItemReadListener implements ItemReadListener<People> {

    /** Prefix shared by every line this listener prints. */
    private static final String TAG = "SimpleItemReadListener.";

    /** Prints the marker line for the "before read" callback. */
    @Override
    public void beforeRead() {
        announce("beforeRead");
    }

    /**
     * Prints the marker line for the "after read" callback followed by the item's name.
     *
     * @param item the item that was just read
     */
    @Override
    public void afterRead(People item) {
        String name = item.getName();
        announce("afterRead -- " + name);
    }

    /** Prints the marker line for the "read error" callback; the exception itself is not printed. */
    @Override
    public void onReadError(Exception ex) {
        announce("onReadError");
    }

    /** Writes {@code TAG + message} as a single line on standard output. */
    private static void announce(String message) {
        System.out.println(TAG + message);
    }
}
package shangbo.springbatch.example6;
import org.springframework.batch.core.ItemProcessListener;
/**
 * Process listener for {@code People} items that prints a marker line to
 * standard output for each callback; the after-process line also includes the
 * name of the processed result.
 */
public class SimpleItemProcessListener implements ItemProcessListener<People, People> {

    /** Prints the marker line for the "before process" callback. */
    @Override
    public void beforeProcess(People item) {
        System.out.println("SimpleItemProcessListener.beforeProcess");
    }

    /**
     * Prints the marker line for the "after process" callback followed by the
     * result's name, or {@code (filtered)} when there is no result.
     *
     * @param item   the item that was passed to the processor
     * @param result the processor's output; {@code null} when the processor filtered the item
     */
    @Override
    public void afterProcess(People item, People result) {
        // Spring Batch still invokes this callback when the processor returns null
        // (a filtered item), so dereferencing result unconditionally would throw
        // a NullPointerException.
        String outcome = (result == null) ? "(filtered)" : result.getName();
        System.out.println("SimpleItemProcessListener.afterProcess -- " + outcome);
    }

    /** Prints the marker line for the "process error" callback; the exception itself is not printed. */
    @Override
    public void onProcessError(People item, Exception e) {
        System.out.println("SimpleItemProcessListener.onProcessError");
    }
}
package shangbo.springbatch.example6;
import java.util.List;
import org.springframework.batch.core.ItemWriteListener;
/**
 * Write listener for {@code People} items that prints a marker line to
 * standard output for each of its three callbacks. The item lists passed to
 * the callbacks are not inspected.
 */
public class SimpleItemWriteListener implements ItemWriteListener<People> {

    /** Prefix shared by every line this listener prints. */
    private static final String TAG = "SimpleItemWriteListener.";

    /** Prints the marker line for the "before write" callback. */
    @Override
    public void beforeWrite(List<? extends People> items) {
        announce("beforeWrite");
    }

    /** Prints the marker line for the "after write" callback. */
    @Override
    public void afterWrite(List<? extends People> items) {
        announce("afterWrite");
    }

    /** Prints the marker line for the "write error" callback; the exception itself is not printed. */
    @Override
    public void onWriteError(Exception exception, List<? extends People> items) {
        announce("onWriteError");
    }

    /** Writes {@code TAG + callback} as a single line on standard output. */
    private static void announce(String callback) {
        System.out.println(TAG + callback);
    }
}
package shangbo.springbatch.example6;
import org.springframework.batch.core.SkipListener;
/**
 * Skip listener that prints a marker line to standard output whenever an item
 * is skipped while reading, processing or writing.
 *
 * <p>The first type argument is the item type handed to the processor and the
 * second is the type handed to the writer. The sibling read, process and write
 * listeners in this example are all typed on {@code People}, so both arguments
 * are {@code People}. Declaring the first one as {@code String} would make the
 * {@code onSkipInProcess} callback fail with a {@code ClassCastException} when
 * it is invoked with a {@code People} item.
 */
public class SimpleSkipListener implements SkipListener<People, People> {

    /** Prints the marker line for an item skipped during reading. */
    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println("SimpleSkipListener.onSkipInRead");
    }

    /** Prints the marker line for an item skipped during writing. */
    @Override
    public void onSkipInWrite(People item, Throwable t) {
        System.out.println("SimpleSkipListener.onSkipInWrite");
    }

    /** Prints the marker line for an item skipped during processing. */
    @Override
    public void onSkipInProcess(People item, Throwable t) {
        System.out.println("SimpleSkipListener.onSkipInProcess");
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Spring context for the loadFileJob example: infrastructure beans, the job with its listeners, and the reader / processor / writer. -->
<!-- Define the dataSource (pooled Oracle connection; the pool's close method is declared as the destroy method) -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
<property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
<property name="username" value="hr" />
<property name="password" value="123456" />
</bean>
<!-- Define the JDBC transaction manager -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- Define the jobRepository, used to persist the job -->
<batch:job-repository id="jobRepository" data-source="dataSource" transaction-manager="transactionManager"/>
<!-- Define the jobLauncher, used to run the job -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Define the job: a single chunk-oriented step that commits every 10 items -->
<batch:job id="loadFileJob" job-repository="jobRepository">
<batch:step id="loadFileStep">
<batch:tasklet>
<batch:chunk reader="loadFileReader" processor="loadFileProcessor" writer="loadFileWriter" commit-interval="10">
<!-- Listeners registered on the chunk: chunk, item read, item process, item write and skip callbacks -->
<!-- NOTE(review): no skip-limit or skippable exception classes are set on this chunk, so simpleSkipListener is presumably never invoked; confirm -->
<batch:listeners>
<batch:listener ref="simpleChunkListener"/>
<batch:listener ref="simpleItemReadListener"/>
<batch:listener ref="simpleItemProcessListener"/>
<batch:listener ref="simpleItemWriteListener"/>
<batch:listener ref="simpleSkipListener"/>
</batch:listeners>
</batch:chunk>
</batch:tasklet>
<!-- Listener registered on the step -->
<batch:listeners>
<batch:listener ref="simpleStepExecutionListener"/>
</batch:listeners>
</batch:step>
<!-- Listener registered on the job -->
<batch:listeners>
<batch:listener ref="simpleJobExecutionListener"/>
</batch:listeners>
</batch:job>
<!-- Listener beans referenced by the job, step and chunk above -->
<bean id="simpleJobExecutionListener" class="shangbo.springbatch.example6.SimpleJobExecutionListener"/>
<bean id="simpleStepExecutionListener" class="shangbo.springbatch.example6.SimpleStepExecutionListener"/>
<bean id="simpleChunkListener" class="shangbo.springbatch.example6.SimpleChunkListener"/>
<bean id="simpleItemReadListener" class="shangbo.springbatch.example6.SimpleItemReadListener"/>
<bean id="simpleItemProcessListener" class="shangbo.springbatch.example6.SimpleItemProcessListener"/>
<bean id="simpleItemWriteListener" class="shangbo.springbatch.example6.SimpleItemWriteListener"/>
<bean id="simpleSkipListener" class="shangbo.springbatch.example6.SimpleSkipListener"/>
<!-- Define the reader -->
<bean id="loadFileReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<!-- The file to read; its name embeds the business_date job parameter -->
<property name="resource" value="file:///D:/0_Shangbo/Dev/Workspace/people_#{jobParameters['business_date']}.txt" />
<!-- Ignore lines that start with "header" or "footer" -->
<property name="comments">
<list>
<value>header</value>
<value>footer</value>
</list>
</property>
<!-- File encoding -->
<property name="encoding" value="UTF-8" />
<!-- Fail if the file does not exist -->
<property name="strict" value="true" />
<!-- How to convert a line into an object -->
<property name="lineMapper" ref="lineMapper" />
</bean>
<!-- Line mapper: tokenizes each line, then maps the resulting fields onto a bean -->
<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper" scope="step">
<property name="fieldSetMapper" ref="fieldSetMapper" />
<property name="lineTokenizer" ref="lineTokenizer" />
</bean>
<!-- Maps the tokenized fields onto People bean properties -->
<bean id="fieldSetMapper" class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper" scope="step">
<property name="targetType" value="shangbo.springbatch.example6.People" />
</bean>
<!-- Splits each line on "|" and names the first two fields id and name -->
<bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" scope="step">
<property name="delimiter" value="|" />
<property name="includedFields" value="0,1" />
<property name="names" value="id,name" />
</bean>
<!-- Define the processor (passes each item through unchanged) -->
<bean id="loadFileProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" scope="step"/>
<!-- Define the writer: inserts each item into the people table using named parameters -->
<bean id="loadFileWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">
<property name="sql" value="insert into people values (:id,:name)"/>
<property name="jdbcTemplate" ref="jdbcTemplate"/>
<property name="itemSqlParameterSourceProvider" ref="itemSqlParameterSourceProvider"/>
</bean>
<!-- Named-parameter JDBC template backed by the dataSource above -->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate" scope="step">
<constructor-arg ref="dataSource"></constructor-arg>
</bean>
<!-- Supplies the :id and :name parameter values from the item's bean properties -->
<bean id="itemSqlParameterSourceProvider" class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" scope="step"/>
</beans>
package shangbo.springbatch.example6;
/**
 * Serializable bean holding one person record: a numeric id and a name.
 * Both properties start out {@code null} and are populated through setters.
 */
public class People implements java.io.Serializable {

    private static final long serialVersionUID = 8904705906008476310L;

    /** Numeric identifier of the person; {@code null} until set. */
    private Integer id;

    /** Name of the person; {@code null} until set. */
    private String name;

    /** @return the name, or {@code null} if none has been set */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the identifier, or {@code null} if none has been set */
    public Integer getId() {
        return this.id;
    }

    /** @param id the identifier to store */
    public void setId(Integer id) {
        this.id = id;
    }
}
package shangbo.springbatch.example6;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Command-line entry point that loads the Spring context from
 * {@code shangbo/springbatch/example6/LoadFileJob.xml} and launches the single
 * {@code Job} bean it defines with a {@code business_date} job parameter.
 */
public class App {

    /**
     * Builds the job parameters, looks up the {@code Job} and {@code JobLauncher}
     * beans, and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws JobExecutionAlreadyRunningException propagated from {@code JobLauncher.run}
     * @throws JobRestartException propagated from {@code JobLauncher.run}
     * @throws JobInstanceAlreadyCompleteException propagated from {@code JobLauncher.run}
     * @throws JobParametersInvalidException propagated from {@code JobLauncher.run}
     */
    public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        // try-with-resources closes the context when the run ends, normally or with
        // an exception, so bean destroy callbacks are invoked instead of being skipped.
        // NOTE(review): assumes the launcher runs the job synchronously, so the job
        // has finished before the context closes; confirm against the launcher config.
        try (ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("shangbo/springbatch/example6/LoadFileJob.xml")) {
            // Job and job parameters
            Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("business_date", new JobParameter("20170704"));
            JobParameters jobParameters = new JobParameters(parameters);
            Job job = context.getBean(Job.class);

            // Run the job
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            jobLauncher.run(job, jobParameters);
        }
    }
}
信息: Job: [FlowJob: [name=loadFileJob]] launched with the following parameters: [{business_date=20170704}]
SimpleJobExecutionListener.beforeJob
七月 22, 2017 8:35:22 下午 org.springframework.batch.core.job.SimpleStepHandler handleStep
信息: Executing step: [loadFileStep]
SimpleStepExecutionListener.beforeStep
SimpleChunkListener.beforeChunk
SimpleItemReadListener.beforeRead
SimpleItemReadListener.afterRead -- zhangsan
SimpleItemReadListener.beforeRead
SimpleItemReadListener.afterRead -- lisi
SimpleItemReadListener.beforeRead
SimpleItemReadListener.afterRead -- wangwu
SimpleItemReadListener.beforeRead
SimpleItemProcessListener.beforeProcess
SimpleItemProcessListener.afterProcess -- zhangsan
SimpleItemProcessListener.beforeProcess
SimpleItemProcessListener.afterProcess -- lisi
SimpleItemProcessListener.beforeProcess
SimpleItemProcessListener.afterProcess -- wangwu
SimpleItemWriteListener.beforeWrite
SimpleItemWriteListener.afterWrite
SimpleChunkListener.afterChunk
SimpleStepExecutionListener.afterStep
SimpleJobExecutionListener.afterJob
– 更多参见:Spring Batch 精萃
– 声 明:转载请注明出处
– Last Updated on 2017-07-22
– Written by ShangBo on 2017-07-22
– End
推荐阅读
-
Spring Cloud EureKa Ribbon 服务注册发现与调用
-
Spring Cloud 学习Consul服务注册与发现
-
Spring Cloud Eureka 注册与发现操作步骤详解
-
spring动态bean注册示例分享
-
一起来学Spring Cloud | 第二章:服务注册和发现组件 (Eureka)
-
【死磕 Spring】----- IOC 之 注册 BeanDefinition
-
Spring Boot邮箱链接注册验证及注册流程
-
将Spring Boot应用程序注册成为系统服务
-
Spring Cloud Alibaba系列-第四节-创建生产者与消费者服务,注册到Nacos监控服务
-
详解Spring batch 入门学习教程(附源码)