欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Batch 批处理入门

程序员文章站 2022-05-29 17:04:16
...

 

参考:Spring Batch 官方参考文档

 

Spring Batch 的处理流程:

 

读取数据->处理数据->写数据

 

reader->process->writer

 

 

maven 依赖:

<properties>
	<spring.version>3.2.2.RELEASE</spring.version>
	<spring.batch.version>2.2.0.RELEASE</spring.batch.version>
	<!-- Quartz version used by the scheduling example (Spring 3.2 supports Quartz 1.8.x) -->
	<quartz.version>1.8.6</quartz.version>
	<mysql.driver.version>5.1.25</mysql.driver.version>
	<junit.version>4.11</junit.version>
</properties>

<dependencies>

	<!-- Spring Core -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-core</artifactId>
		<version>${spring.version}</version>
	</dependency>

	<!-- Spring jdbc, for database -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jdbc</artifactId>
		<version>${spring.version}</version>
	</dependency>

	<!-- Spring XML to/back object -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-oxm</artifactId>
		<version>${spring.version}</version>
	</dependency>

	<!-- Spring Quartz integration (SchedulerFactoryBean, CronTriggerFactoryBean, ...),
	     needed only by the Quartz scheduling example -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context-support</artifactId>
		<version>${spring.version}</version>
	</dependency>

	<!-- Quartz scheduler, needed only by the Quartz scheduling example -->
	<dependency>
		<groupId>org.quartz-scheduler</groupId>
		<artifactId>quartz</artifactId>
		<version>${quartz.version}</version>
	</dependency>

	<!-- MySQL database driver -->
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>${mysql.driver.version}</version>
	</dependency>

	<!-- Spring Batch dependencies -->
	<dependency>
		<groupId>org.springframework.batch</groupId>
		<artifactId>spring-batch-core</artifactId>
		<version>${spring.batch.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.batch</groupId>
		<artifactId>spring-batch-infrastructure</artifactId>
		<version>${spring.batch.version}</version>
	</dependency>

	<!-- Spring Batch unit test (test classpath only) -->
	<dependency>
		<groupId>org.springframework.batch</groupId>
		<artifactId>spring-batch-test</artifactId>
		<version>${spring.batch.version}</version>
		<scope>test</scope>
	</dependency>

	<!-- Junit -->
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>${junit.version}</version>
		<scope>test</scope>
	</dependency>

</dependencies>

 

POJO类:

 

package com.tch.test.spring.batch.entity;
 
import java.text.SimpleDateFormat;
import java.util.Date;
 
public class Report {

	/** Pattern used by {@link #toString()} to render the date, e.g. {@code 29/07/2013}. */
	private static final String DATE_PATTERN = "dd/MM/yyyy";

	private int id;
	private Date date;

	/** @return the report id */
	public int getId() {
		return id;
	}

	/** @param id the report id */
	public void setId(int id) {
		this.id = id;
	}

	/** @return the report date, may be {@code null} if it was never set */
	public Date getDate() {
		return date;
	}

	/** @param date the report date */
	public void setDate(Date date) {
		this.date = date;
	}

	/**
	 * Returns a human-readable form such as {@code Report [id=1001, date=29/07/2013]}.
	 * A {@code null} date is rendered as {@code null} instead of throwing.
	 */
	@Override
	public String toString() {
		// SimpleDateFormat is not thread-safe, so a short-lived instance is created per call
		// rather than keeping one formatter per Report object.
		String formattedDate = (date == null) ? "null" : new SimpleDateFormat(DATE_PATTERN).format(date);
		return "Report [id=" + id + ", date=" + formattedDate + "]";
	}

}

 

mapper类:

 

package com.tch.test.spring.batch;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.tch.test.spring.batch.entity.Report;
 
public class ReportFieldSetMapper implements FieldSetMapper<Report> {

	/** Expected format of the date column, e.g. {@code 29/7/2013}. */
	private static final String DATE_PATTERN = "dd/MM/yyyy";

	// SimpleDateFormat is not thread-safe; this assumes the mapper is used from a
	// single-threaded step (as configured in this example) — revisit for multi-threaded steps.
	private final SimpleDateFormat dateFormat = createDateFormat();

	/** Creates a strict formatter that rejects impossible dates instead of rolling them over. */
	private static SimpleDateFormat createDateFormat() {
		SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
		format.setLenient(false);
		return format;
	}

	/**
	 * Maps one tokenized input line to a {@link Report}.
	 *
	 * @param fieldSet tokens of one line: column 0 is the integer id, column 1 the date
	 * @return the populated report; its date is never {@code null}
	 * @throws BindException declared by the {@link FieldSetMapper} contract; not thrown here
	 * @throws IllegalArgumentException if the date column is not a valid {@value #DATE_PATTERN}
	 *         date. The failure is propagated to the reader instead of silently producing
	 *         a report without a date.
	 */
	@Override
	public Report mapFieldSet(FieldSet fieldSet) throws BindException {
		Report report = new Report();
		report.setId(fieldSet.readInt(0));
		String date = fieldSet.readString(1);
		try {
			report.setDate(dateFormat.parse(date));
		} catch (ParseException e) {
			throw new IllegalArgumentException(
					"Unparseable date [" + date + "], expected format " + DATE_PATTERN, e);
		}
		return report;
	}

}

 

 

 

processor类:

 

package com.tch.test.spring.batch;
import org.springframework.batch.item.ItemProcessor;

import com.tch.test.spring.batch.entity.Report;
 
public class CustomItemProcessor implements ItemProcessor<Report, Report> {

	/**
	 * Pass-through step: prints the incoming report to standard output and
	 * returns the very same instance, unmodified.
	 *
	 * @param report the item produced by the reader
	 * @return {@code report} itself
	 */
	@Override
	public Report process(final Report report) throws Exception {
		final String line = "Processing..." + report;
		System.out.println(line);
		return report;
	}

}

 

 

writer类:

 

package com.tch.test.spring.batch;
import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.tch.test.spring.batch.entity.Report;

public class ReportItemWriter implements ItemWriter<Report>{
	public void write(List<? extends Report> reports) throws Exception {
		for (Report m : reports) {
			System.out.println("write results : "+m);
		}
	}
}

 

 

beans.xml:(commit-interval 表示每个事务(chunk)提交的记录数,这里每读取并处理 10 条记录提交一次)

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

	<!-- Batch infrastructure: transaction manager, map-based job repository, job launcher -->
	<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!-- Reader: parses classpath:report.txt line by line into Report objects -->
	<bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" value="classpath:report.txt" />
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />
				</property>
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>
	</bean>

	<!-- Processor and writer -->
	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter" />

	<!-- Single-step job: read -> process -> write, committing every 10 items -->
	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="cvsFileItemReader" processor="itemProcessor" writer="reportWriter" commit-interval="10" />
			</batch:tasklet>
		</batch:step>
	</batch:job>

</beans>

 

 

report.txt:

 

1001, 29/7/2013
1002, 30/7/2013
1003, 31/7/2013
1004, 29/7/2013
1005,30/7/2013
1006, 31/7/2013
1007, 29/7/2013
1008,30/7/2013
1009, 31/7/2013
1010, 29/7/2013
1011,30/7/2013
1012, 31/7/2013
1013, 29/7/2013
1014,30/7/2013
1015, 31/7/2013
1016, 29/7/2013
1017,30/7/2013
1018, 31/7/2013

 

Test:

 

package com.tch.test.spring.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Test {

	/**
	 * Loads {@code beans.xml}, runs {@code helloWorldJob} once and prints its final status.
	 * The application context is closed even if the launch throws.
	 */
	public static void main(String[] args) throws Exception {

		String[] springConfig = { "beans.xml" };

		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
		try {
			// Typed lookups avoid the unchecked casts of getBean(String).
			JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
			Job job = context.getBean("helloWorldJob", Job.class);

			JobExecution execution = jobLauncher.run(job, new JobParameters());
			System.out.println("Exit Status : " + execution.getStatus());
			System.out.println("Done");
		} finally {
			context.close();
		}
	}
}

 

 

 

 

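如果要读取多个文件,只需用 MultiResourceItemReader 包装一个 FlatFileItemReader 作为 delegate 即可。MultiResourceItemReader 会依次把每个匹配到的文件设置为 delegate 的 resource,因此 delegate 本身无需再配置 resource: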

 

beans.xml:

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="multiResourceReader" writer="reportWriter"
					processor="itemProcessor" commit-interval="10">
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<!-- Reads every resource matching classpath:report*.txt by handing each one,
	     in turn, to the delegate reader below -->
	<bean id="multiResourceReader"
		class="org.springframework.batch.item.file.MultiResourceItemReader">
		<property name="resources" value="classpath:report*.txt" />
		<property name="delegate" ref="flatFileItemReader" />
	</bean>

	<!-- Delegate reader for a single file. It has no 'resource' property on purpose:
	     MultiResourceItemReader sets the current resource before opening it, so a
	     hard-coded resource here would be overridden and only mislead readers. -->
	<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean
						class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>

	</bean>

	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="transactionManager"
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

</beans>

 

 

 

Spring Batch 结合 Quartz 实现定时批处理(注意:此示例还需要额外引入 spring-context-support 和 quartz 依赖)

 

beans.xml:

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

	<!-- Processes each item -->
	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<!-- The batch job that reads, processes and writes the data -->
	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="multiResourceReader" writer="reportWriter"
					processor="itemProcessor" commit-interval="10">
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<!-- Reader over multiple resources -->
	<bean id="multiResourceReader"
		class="org.springframework.batch.item.file.MultiResourceItemReader">
		<!-- Resource location pattern -->
		<property name="resources" value="classpath:report*.txt" />
		<!-- Single-resource reader that each matched resource is handed to -->
		<property name="delegate" ref="flatFileItemReader" />
	</bean>

	<!-- Reader for a single resource (its resource is set by multiResourceReader) -->
	<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<!-- Splits each line into tokens -->
				<property name="lineTokenizer">
					<bean
						class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
					</bean>
				</property>
				<!-- Maps the tokens to the entity -->
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>

	</bean>

	<!-- Custom writer invoked after the items are processed -->
	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

	<!-- Launches jobs -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!-- Job repository -->
	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<!-- Transaction manager -->
	<bean id="transactionManager"
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

	<!-- Registers every batch:job of this context in jobRegistry, so it can be looked up by name -->
	<bean
		class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
		<property name="jobRegistry" ref="jobRegistry" />
	</bean>

	<bean id="jobRegistry"
		class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

	<!-- Quartz scheduling (fires every 5 seconds). CronTriggerFactoryBean and
	     JobDetailFactoryBean work with Quartz 1.8 as well as 2.x, unlike the older
	     CronTriggerBean / JobDetailBean, which only support Quartz 1.x. -->
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="triggers">
			<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
				<!-- What to execute -->
				<property name="jobDetail" ref="jobDetail" />
				<!-- Cron expression -->
				<property name="cronExpression" value="*/5 * * * * ?" />
			</bean>
		</property>
	</bean>

	<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
		<!-- Quartz job class that launches the batch job -->
		<property name="jobClass" value="com.tch.test.spring.batch.JobLauncherDetails" />
		<property name="group" value="quartz-batch" />
		<property name="jobDataAsMap">
			<map>
				<entry key="jobName" value="helloWorldJob" />
				<entry key="jobLocator" value-ref="jobRegistry" />
				<entry key="jobLauncher" value-ref="jobLauncher" />
				<entry key="param1" value="value1" />
				<entry key="param2" value="value2" />
			</map>
		</property>
	</bean>
</beans>

 

 

JobLauncherDetails:

 

package com.tch.test.spring.batch;

import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;

import org.quartz.JobExecutionContext;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class JobLauncherDetails extends QuartzJobBean {

	static final String JOB_NAME = "jobName";

	private JobLocator jobLocator;

	private JobLauncher jobLauncher;

	public void setJobLocator(JobLocator jobLocator) {
		this.jobLocator = jobLocator;
	}

	public void setJobLauncher(JobLauncher jobLauncher) {
		this.jobLauncher = jobLauncher;
	}

	@SuppressWarnings("unchecked")
	protected void executeInternal(JobExecutionContext context) {

		Map<String, Object> jobDataMap = context.getMergedJobDataMap();

		//拿到beans.xml中jobDetail里面配置的"jobName"对应的value
		String jobName = (String) jobDataMap.get(JOB_NAME);
		JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
		try {
			//运行对应的job
			jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
		} catch (JobExecutionException e) {
			e.printStackTrace();
		}
	}

	// 读取配置的参数
	private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
		JobParametersBuilder builder = new JobParametersBuilder();
		for (Entry<String, Object> entry : jobDataMap.entrySet()) {
			String key = entry.getKey();
			Object value = entry.getValue();
			//过滤掉"jobName"
			if (value instanceof String && !key.equals(JOB_NAME)) {
				builder.addString(key, (String) value);
			} else if (value instanceof Float || value instanceof Double) {
				builder.addDouble(key, ((Number) value).doubleValue());
			} else if (value instanceof Integer || value instanceof Long) {
				builder.addLong(key, ((Number) value).longValue());
			} else if (value instanceof Date) {
				builder.addDate(key, (Date) value);
			} else {
				//过滤掉beans.xml中jobDetail的jobDataAsMap配置的"jobLocator"、"jobLauncher"属性
				// JobDataMap contains values which are not job parameters
				// (ignoring)
			}
		}
		// need unique job parameter to rerun the completed job
		builder.addDate("run date", new Date());
		return builder.toJobParameters();
	}

}

 

 

运行(注意:这里不能像前面的例子那样关闭 context,否则 Quartz 调度器会随之停止,定时任务也就不会再执行):

 

package com.tch.test.spring.batch;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Test {

	/**
	 * Starts the Spring context from {@code beans.xml}. The scheduler configured there
	 * then launches the batch job on its cron schedule. The context is deliberately left
	 * open so the scheduler keeps running; it is closed by a JVM shutdown hook.
	 */
	public static void main(String[] args) throws Exception {

		String[] springConfig = { "beans.xml" };

		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
		// Close the context, and with it the scheduler bean, cleanly when the JVM exits.
		context.registerShutdownHook();
	}
}

 

 

 

 

 

 

 

相关标签: spring batch