欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Batch 注册监听器

程序员文章站 2022-05-01 13:03:41
...

– Start
点击此处观看本系列配套视频。


Spring Batch 支持如下监听器。

监听器 说明
JobExecutionListener 在 Job 开始之前(beforeJob)和之后(afterJob)触发
StepExecutionListener 在 Step 开始之前(beforeStep)和之后(afterStep)触发
ChunkListener 在 Chunk 开始之前(beforeChunk),之后(afterChunk)和错误后(afterChunkError)触发
ItemReadListener 在 Read 开始之前(beforeRead),之后(afterRead)和错误后(onReadError)触发
ItemProcessListener 在 Process 开始之前(beforeProcess),之后(afterProcess)和错误后(onProcessError)触发
ItemWriteListener 在 Write 开始之前(beforeWrite),之后(afterWrite)和错误后(onWriteError)触发
SkipListener 在 Read(onSkipInRead),Process(onSkipInProcess)或 Write(onSkipInWrite)过程中跳过某条记录时触发

来看个简单的例子。

package shangbo.springbatch.example6;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job-level listener that prints one marker line to standard output for each
 * callback it receives. The {@link JobExecution} argument is never inspected.
 */
public class SimpleJobExecutionListener implements JobExecutionListener {

	/** Prefix shared by every line this listener prints. */
	private static final String TAG = "SimpleJobExecutionListener.";

	/** Prints the {@code beforeJob} marker. */
	@Override
	public void beforeJob(JobExecution jobExecution) {
		announce("beforeJob");
	}

	/** Prints the {@code afterJob} marker. */
	@Override
	public void afterJob(JobExecution jobExecution) {
		announce("afterJob");
	}

	/** Writes the prefix followed by the callback name to standard output. */
	private static void announce(String callback) {
		System.out.println(TAG + callback);
	}

}

package shangbo.springbatch.example6;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

/**
 * Step-level listener that prints one marker line to standard output for each
 * callback and leaves the step's exit status untouched.
 */
public class SimpleStepExecutionListener implements StepExecutionListener {

	/** Prefix shared by every line this listener prints. */
	private static final String TAG = "SimpleStepExecutionListener.";

	/** Prints the {@code beforeStep} marker. */
	@Override
	public void beforeStep(StepExecution stepExecution) {
		announce("beforeStep");
	}

	/**
	 * Prints the {@code afterStep} marker.
	 *
	 * @return the exit status already recorded on {@code stepExecution}, unchanged
	 */
	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		announce("afterStep");
		ExitStatus recordedStatus = stepExecution.getExitStatus();
		return recordedStatus;
	}

	/** Writes the prefix followed by the callback name to standard output. */
	private static void announce(String callback) {
		System.out.println(TAG + callback);
	}

}

package shangbo.springbatch.example6;

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

/**
 * Chunk-level listener that prints one marker line to standard output for each
 * callback. The {@link ChunkContext} argument is never inspected.
 */
public class SimpleChunkListener implements ChunkListener {

	/** Prefix shared by every line this listener prints. */
	private static final String TAG = "SimpleChunkListener.";

	/** Prints the {@code beforeChunk} marker. */
	@Override
	public void beforeChunk(ChunkContext context) {
		announce("beforeChunk");
	}

	/** Prints the {@code afterChunk} marker. */
	@Override
	public void afterChunk(ChunkContext context) {
		announce("afterChunk");
	}

	/** Prints the {@code afterChunkError} marker. */
	@Override
	public void afterChunkError(ChunkContext context) {
		announce("afterChunkError");
	}

	/** Writes the prefix followed by the callback name to standard output. */
	private static void announce(String callback) {
		System.out.println(TAG + callback);
	}

}

package shangbo.springbatch.example6;

import org.springframework.batch.core.ItemReadListener;

/**
 * Read listener for {@code People} items that prints one marker line to
 * standard output for each callback; after a read it also prints the item's name.
 */
public class SimpleItemReadListener implements ItemReadListener<People> {

	/** Prefix shared by every line this listener prints. */
	private static final String TAG = "SimpleItemReadListener.";

	/** Prints the {@code beforeRead} marker. */
	@Override
	public void beforeRead() {
		announce("beforeRead");
	}

	/**
	 * Prints the {@code afterRead} marker followed by the name of the item read.
	 *
	 * @param item the item that was read; dereferenced without a null check
	 */
	@Override
	public void afterRead(People item) {
		String readName = item.getName();
		announce("afterRead -- " + readName);
	}

	/** Prints the {@code onReadError} marker; the exception itself is not printed. */
	@Override
	public void onReadError(Exception ex) {
		announce("onReadError");
	}

	/** Writes the prefix followed by the given suffix to standard output. */
	private static void announce(String suffix) {
		System.out.println(TAG + suffix);
	}

}

package shangbo.springbatch.example6;

import org.springframework.batch.core.ItemProcessListener;

/**
 * Process listener for {@code People} items that prints one marker line to
 * standard output for each callback; after processing it also prints the
 * name of the processed result.
 */
public class SimpleItemProcessListener implements ItemProcessListener<People, People> {

	/** Prints the {@code beforeProcess} marker; the item is not inspected. */
	@Override
	public void beforeProcess(People item) {
		System.out.println("SimpleItemProcessListener.beforeProcess");
	}

	/**
	 * Prints the {@code afterProcess} marker followed by the result's name.
	 *
	 * @param item   the item handed to the processor (not inspected)
	 * @param result the processor's output; {@code null} when the processor filtered the item
	 */
	@Override
	public void afterProcess(People item, People result) {
		// The framework still invokes this callback with a null result when the
		// processor filters an item, so the result must not be dereferenced blindly.
		String name = (result == null) ? "(filtered)" : result.getName();
		System.out.println("SimpleItemProcessListener.afterProcess -- " + name);
	}

	/** Prints the {@code onProcessError} marker; neither the item nor the exception is printed. */
	@Override
	public void onProcessError(People item, Exception e) {
		System.out.println("SimpleItemProcessListener.onProcessError");
	}

}

package shangbo.springbatch.example6;

import java.util.List;

import org.springframework.batch.core.ItemWriteListener;

/**
 * Write listener for {@code People} items that prints one marker line to
 * standard output for each callback. The item lists are never inspected.
 */
public class SimpleItemWriteListener implements ItemWriteListener<People> {

	/** Prefix shared by every line this listener prints. */
	private static final String TAG = "SimpleItemWriteListener.";

	/** Prints the {@code beforeWrite} marker. */
	@Override
	public void beforeWrite(List<? extends People> items) {
		announce("beforeWrite");
	}

	/** Prints the {@code afterWrite} marker. */
	@Override
	public void afterWrite(List<? extends People> items) {
		announce("afterWrite");
	}

	/** Prints the {@code onWriteError} marker; the exception itself is not printed. */
	@Override
	public void onWriteError(Exception exception, List<? extends People> items) {
		announce("onWriteError");
	}

	/** Writes the prefix followed by the callback name to standard output. */
	private static void announce(String callback) {
		System.out.println(TAG + callback);
	}

}

package shangbo.springbatch.example6;

import org.springframework.batch.core.SkipListener;

/**
 * Skip listener that prints one marker line to standard output whenever an
 * item is skipped while reading, processing or writing.
 *
 * <p>Both type parameters are {@code People}: the first is the type handed to
 * the processor and the second is the type handed to the writer. Declaring the
 * first as {@code String} would make {@code onSkipInProcess} fail with a
 * {@code ClassCastException} when it receives a {@code People} item.
 */
public class SimpleSkipListener implements SkipListener<People, People> {

	/** Prints the {@code onSkipInRead} marker; the cause is not printed. */
	@Override
	public void onSkipInRead(Throwable t) {
		System.out.println("SimpleSkipListener.onSkipInRead");
	}

	/** Prints the {@code onSkipInWrite} marker; neither the item nor the cause is printed. */
	@Override
	public void onSkipInWrite(People item, Throwable t) {
		System.out.println("SimpleSkipListener.onSkipInWrite");
	}

	/** Prints the {@code onSkipInProcess} marker; neither the item nor the cause is printed. */
	@Override
	public void onSkipInProcess(People item, Throwable t) {
		System.out.println("SimpleSkipListener.onSkipInProcess");
	}

}

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xsi:schemaLocation="http://www.springframework.org/schema/batch 
		http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans.xsd">
	
	<!-- Define the dataSource (Commons DBCP pool; closed through its destroy-method) -->
	<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
	    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
	    <property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
	    <property name="username" value="hr" />
	    <property name="password" value="123456" />
	</bean>
	
	<!-- Define the JDBC transaction manager -->
	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
	    <property name="dataSource" ref="dataSource"/>
	</bean>
	
	<!-- Define the jobRepository, used to persist the job -->
	<batch:job-repository id="jobRepository" data-source="dataSource" transaction-manager="transactionManager"/>

	<!-- Define the jobLauncher, used to run the job -->
	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
	    <property name="jobRepository" ref="jobRepository" />
	</bean>

	<!-- Define the job: a single chunk-oriented step with listeners at chunk, step and job level -->
	<batch:job id="loadFileJob" job-repository="jobRepository">
		<batch:step id="loadFileStep">
			<batch:tasklet>
				<batch:chunk reader="loadFileReader" processor="loadFileProcessor" writer="loadFileWriter" commit-interval="10">
					<!-- Listeners registered on the chunk: chunk, read, process, write and skip -->
					<batch:listeners>
						<batch:listener ref="simpleChunkListener"/>
						<batch:listener ref="simpleItemReadListener"/>
						<batch:listener ref="simpleItemProcessListener"/>
						<batch:listener ref="simpleItemWriteListener"/>
						<batch:listener ref="simpleSkipListener"/>
					</batch:listeners>
				</batch:chunk>
			</batch:tasklet>
			<!-- Listener registered on the step -->
			<batch:listeners>
				<batch:listener ref="simpleStepExecutionListener"/>
			</batch:listeners>
		</batch:step>
		<!-- Listener registered on the job -->
		<batch:listeners>
			<batch:listener ref="simpleJobExecutionListener"/>
		</batch:listeners>
	</batch:job>
	
	<!-- Listener beans referenced by the job definition above -->
	<bean id="simpleJobExecutionListener" class="shangbo.springbatch.example6.SimpleJobExecutionListener"/>
	<bean id="simpleStepExecutionListener" class="shangbo.springbatch.example6.SimpleStepExecutionListener"/>
	<bean id="simpleChunkListener" class="shangbo.springbatch.example6.SimpleChunkListener"/>
	<bean id="simpleItemReadListener" class="shangbo.springbatch.example6.SimpleItemReadListener"/>
	<bean id="simpleItemProcessListener" class="shangbo.springbatch.example6.SimpleItemProcessListener"/>
	<bean id="simpleItemWriteListener" class="shangbo.springbatch.example6.SimpleItemWriteListener"/>
	<bean id="simpleSkipListener" class="shangbo.springbatch.example6.SimpleSkipListener"/>

	<!-- Define the reader -->
	<bean id="loadFileReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
		<!-- File to read; the name includes the business_date job parameter -->
	    <property name="resource" value="file:///D:/0_Shangbo/Dev/Workspace/people_#{jobParameters['business_date']}.txt" />
		<!-- Ignore lines that start with "header" or "footer" -->
		<property name="comments">
			<list>
				<value>header</value>
				<value>footer</value>
			</list>
		</property>
		<!-- File encoding -->
		<property name="encoding" value="UTF-8" />
	    <!-- Fail if the file does not exist -->
		<property name="strict" value="true" />
		<!-- How each line is converted into an object -->
	    <property name="lineMapper" ref="lineMapper" />
	</bean>
	
	<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper" scope="step">
		<property name="fieldSetMapper" ref="fieldSetMapper" />
		<property name="lineTokenizer" ref="lineTokenizer" />
	</bean>
	
	<bean id="fieldSetMapper" class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper" scope="step">
		<property name="targetType" value="shangbo.springbatch.example6.People" />
	</bean>
	
	<bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" scope="step">
		<property name="delimiter" value="|" />
		<property name="includedFields" value="0,1" />
		<property name="names" value="id,name" />
	</bean>

	<!-- Define the processor -->
	<bean id="loadFileProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" scope="step"/>

	<!-- Define the writer -->
	<bean id="loadFileWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">
		<property name="sql" value="insert into people values (:id,:name)"/>
		<property name="jdbcTemplate" ref="jdbcTemplate"/>
		<property name="itemSqlParameterSourceProvider" ref="itemSqlParameterSourceProvider"/>
	</bean>
	
	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate" scope="step">
		<constructor-arg ref="dataSource"></constructor-arg>
	</bean>
	
	<bean id="itemSqlParameterSourceProvider" class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" scope="step"/>

</beans>
package shangbo.springbatch.example6;

/**
 * Serializable bean with two mutable properties, {@code id} and {@code name}.
 * Both are {@code null} until a setter is called.
 */
public class People implements java.io.Serializable {
	private static final long serialVersionUID = 8904705906008476310L;

	/** Identifier of this person. */
	private Integer id;

	/** Name of this person. */
	private String name;

	/** Creates an instance with both properties unset. */
	public People() {
		super();
	}

	/** Stores the given identifier, replacing any previous value. */
	public void setId(final Integer id) {
		this.id = id;
	}

	/** @return the identifier last stored, or {@code null} if none was set */
	public Integer getId() {
		return this.id;
	}

	/** Stores the given name, replacing any previous value. */
	public void setName(final String name) {
		this.name = name;
	}

	/** @return the name last stored, or {@code null} if none was set */
	public String getName() {
		return this.name;
	}

}

package shangbo.springbatch.example6;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point: loads the Spring context from
 * {@code shangbo/springbatch/example6/LoadFileJob.xml}, launches the single
 * {@link Job} bean with a fixed {@code business_date} parameter, and closes
 * the context afterwards.
 */
public class App {
	/**
	 * Runs the job once.
	 *
	 * @param args ignored
	 * @throws JobExecutionAlreadyRunningException propagated from {@link JobLauncher#run}
	 * @throws JobRestartException                 propagated from {@link JobLauncher#run}
	 * @throws JobInstanceAlreadyCompleteException propagated from {@link JobLauncher#run}
	 * @throws JobParametersInvalidException       propagated from {@link JobLauncher#run}
	 */
	public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
		// try-with-resources closes the context even when the launch throws, so
		// bean destroy callbacks get a chance to run instead of leaking resources.
		// NOTE(review): assumes the launcher runs the job synchronously, so the
		// job has finished before the context is closed; confirm if a task executor is added.
		try (ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("shangbo/springbatch/example6/LoadFileJob.xml")) {

			// Job and job parameters
			Map<String,JobParameter> parameters = new HashMap<>();
			parameters.put("business_date", new JobParameter("20170704"));
			JobParameters jobParameters = new JobParameters(parameters);
			Job job = context.getBean(Job.class);

			// Run the job
			JobLauncher jobLauncher = context.getBean(JobLauncher.class);
			jobLauncher.run(job, jobParameters);
		}
	}
}

信息: Job: [FlowJob: [name=loadFileJob]] launched with the following parameters: [{business_date=20170704}]
SimpleJobExecutionListener.beforeJob
七月 22, 2017 8:35:22 下午 org.springframework.batch.core.job.SimpleStepHandler handleStep
信息: Executing step: [loadFileStep]
SimpleStepExecutionListener.beforeStep
SimpleChunkListener.beforeChunk
SimpleItemReadListener.beforeRead
SimpleItemReadListener.afterRead -- zhangsan
SimpleItemReadListener.beforeRead
SimpleItemReadListener.afterRead -- lisi
SimpleItemReadListener.beforeRead
SimpleItemReadListener.afterRead -- wangwu
SimpleItemReadListener.beforeRead
SimpleItemProcessListener.beforeProcess
SimpleItemProcessListener.afterProcess -- zhangsan
SimpleItemProcessListener.beforeProcess
SimpleItemProcessListener.afterProcess -- lisi
SimpleItemProcessListener.beforeProcess
SimpleItemProcessListener.afterProcess -- wangwu
SimpleItemWriteListener.beforeWrite
SimpleItemWriteListener.afterWrite
SimpleChunkListener.afterChunk
SimpleStepExecutionListener.afterStep
SimpleJobExecutionListener.afterJob

更多参见:Spring Batch 精萃
– 声 明:转载请注明出处
– Last Updated on 2017-07-22
– Written by ShangBo on 2017-07-22
– End

相关标签: spring batch