spring batch批处理 入门
程序员文章站
2022-05-29 17:04:16
...
spring batch的处理流程:
读取数据->处理数据->写数据
reader->processor->writer
maven 依赖:
<properties>
    <spring.version>3.2.2.RELEASE</spring.version>
    <spring.batch.version>2.2.0.RELEASE</spring.batch.version>
    <mysql.driver.version>5.1.25</mysql.driver.version>
    <junit.version>4.11</junit.version>
    <!-- Quartz 1.x line: the scheduled example uses CronTriggerBean / JobDetailBean -->
    <quartz.version>1.8.6</quartz.version>
</properties>

<dependencies>
    <!-- Spring Core -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <!-- Spring JDBC, for database access -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <!-- Spring object/XML mapping -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <!-- MySQL database driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.driver.version}</version>
    </dependency>

    <!-- Spring Batch dependencies -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>${spring.batch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-infrastructure</artifactId>
        <version>${spring.batch.version}</version>
    </dependency>

    <!-- Spring Batch unit test support -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <version>${spring.batch.version}</version>
    </dependency>

    <!-- Only needed for the Quartz-scheduled variant of the job:
         spring-context-support provides org.springframework.scheduling.quartz.*,
         quartz provides org.quartz.* -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>${quartz.version}</version>
    </dependency>

    <!-- JUnit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>
POJO类:
package com.tch.test.spring.batch.entity;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Plain data holder for one record of the report file: a numeric id and a date.
 */
public class Report {

    /** Pattern used to render the date in {@link #toString()}. */
    private static final String DATE_PATTERN = "dd/MM/yyyy";

    private int id;
    private Date date;

    /** @return the record id */
    public int getId() {
        return id;
    }

    /** @param id the record id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the record date, or {@code null} if none has been set */
    public Date getDate() {
        return date;
    }

    /** @param date the record date; may be {@code null} */
    public void setDate(Date date) {
        this.date = date;
    }

    /**
     * Renders the record as {@code Report [id=<id>, date=<dd/MM/yyyy>]}.
     * An unset date is rendered as {@code null} instead of failing.
     */
    @Override
    public String toString() {
        // SimpleDateFormat is not thread-safe, so a formatter is created per call
        // rather than being kept as a field on every Report instance.
        String formattedDate = (date == null)
                ? "null"
                : new SimpleDateFormat(DATE_PATTERN).format(date);
        return "Report [id=" + id + ", date=" + formattedDate + "]";
    }
}
mapper类:
package com.tch.test.spring.batch; import java.text.ParseException; import java.text.SimpleDateFormat; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.validation.BindException; import com.tch.test.spring.batch.entity.Report; public class ReportFieldSetMapper implements FieldSetMapper<Report> { private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); @Override public Report mapFieldSet(FieldSet fieldSet) throws BindException { Report report = new Report(); report.setId(fieldSet.readInt(0)); String date = fieldSet.readString(1); try { report.setDate(dateFormat.parse(date)); } catch (ParseException e) { e.printStackTrace(); } return report; } }
processor类:
package com.tch.test.spring.batch;

import org.springframework.batch.item.ItemProcessor;

import com.tch.test.spring.batch.entity.Report;

/**
 * Pass-through processor: prints each report to standard output and
 * hands the same instance on unchanged.
 */
public class CustomItemProcessor implements ItemProcessor<Report, Report> {

    /**
     * Prints the incoming report and returns it as is.
     *
     * @param report the item read by the reader
     * @return the same {@code report} instance
     */
    @Override
    public Report process(Report report) throws Exception {
        String message = "Processing..." + report;
        System.out.println(message);
        return report;
    }
}
writer类:
package com.tch.test.spring.batch;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.tch.test.spring.batch.entity.Report;

/**
 * Writer that prints every report of a chunk to standard output.
 */
public class ReportItemWriter implements ItemWriter<Report> {

    /**
     * Prints one line per report in the given chunk.
     *
     * @param reports the items of the current chunk
     */
    @Override
    public void write(List<? extends Report> reports) throws Exception {
        for (Report current : reports) {
            String line = "write results : " + current;
            System.out.println(line);
        }
    }
}
beans.xml:(commit-interval表示批处理每次事务处理记录数,这里每次处理十条)
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd ">

    <!-- Item processor applied between reading and writing -->
    <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

    <!-- The job: a single chunk-oriented step, committing every 10 items -->
    <batch:job id="helloWorldJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="cvsFileItemReader" writer="reportWriter"
                    processor="itemProcessor" commit-interval="10">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Reader for the delimited file report.txt on the classpath -->
    <bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="classpath:report.txt" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <!-- Splits each line into fields -->
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    </bean>
                </property>
                <!-- Converts the fields of a line into a Report -->
                <property name="fieldSetMapper">
                    <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

    <!-- Custom writer invoked with each processed chunk -->
    <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

    <!-- Launches jobs -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Job repository (map-based factory bean) -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>

    <!-- Transaction manager used by the job repository -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans>
report.txt:
1001, 29/7/2013
1002, 30/7/2013
1003, 31/7/2013
1004, 29/7/2013
1005,30/7/2013
1006, 31/7/2013
1007, 29/7/2013
1008,30/7/2013
1009, 31/7/2013
1010, 29/7/2013
1011,30/7/2013
1012, 31/7/2013
1013, 29/7/2013
1014,30/7/2013
1015, 31/7/2013
1016, 29/7/2013
1017,30/7/2013
1018, 31/7/2013
Test:
package com.tch.test.spring.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point: loads beans.xml, runs the "helloWorldJob" job once
 * and prints the resulting status.
 */
public class Test {

    /**
     * Runs the job once.
     *
     * @param args unused
     * @throws Exception if the context cannot be created or the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        String[] springConfig = { "beans.xml" };
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
        try {
            JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
            Job job = context.getBean("helloWorldJob", Job.class);
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Exit Status : " + execution.getStatus());
            System.out.println("Done");
        } finally {
            // Close the context even when the bean lookup or the job launch throws.
            context.close();
        }
    }
}
如果要读取多个文件,则只需要使用MultiResourceItemReader即可:
beans.xml:
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

    <!-- Item processor applied between reading and writing -->
    <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

    <!-- The job: a single chunk-oriented step, committing every 10 items -->
    <batch:job id="helloWorldJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="multiResourceReader" writer="reportWriter"
                    processor="itemProcessor" commit-interval="10">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Reader that walks every classpath file matching report*.txt -->
    <bean id="multiResourceReader" class="org.springframework.batch.item.file.MultiResourceItemReader">
        <property name="resources" value="classpath:report*.txt" />
        <!-- Each matched file is read through this single-file reader -->
        <property name="delegate" ref="flatFileItemReader" />
    </bean>

    <!-- Single-file reader used as the delegate. No "resource" is configured here:
         the multi-resource reader supplies each matched file to the delegate. -->
    <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <!-- Splits each line into fields -->
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    </bean>
                </property>
                <!-- Converts the fields of a line into a Report -->
                <property name="fieldSetMapper">
                    <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

    <!-- Custom writer invoked with each processed chunk -->
    <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

    <!-- Launches jobs -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Job repository (map-based factory bean) -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>

    <!-- Transaction manager used by the job repository -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans>
如果要定时执行批处理任务,可以使用Quartz进行调度,对应的beans.xml:
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

    <!-- Item processor -->
    <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

    <!-- The job that processes the data -->
    <batch:job id="helloWorldJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="multiResourceReader" writer="reportWriter"
                    processor="itemProcessor" commit-interval="10">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Reader that reads multiple resources -->
    <bean id="multiResourceReader" class="org.springframework.batch.item.file.MultiResourceItemReader">
        <!-- Resource locations -->
        <property name="resources" value="classpath:report*.txt" />
        <!-- Delegate: the reader that handles a single resource -->
        <property name="delegate" ref="flatFileItemReader" />
    </bean>

    <!-- Reader that reads a single resource -->
    <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <!-- Line tokenizer -->
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    </bean>
                </property>
                <!-- Maps the tokenized fields to the entity -->
                <property name="fieldSetMapper">
                    <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

    <!-- Custom write step, run after the data has been processed -->
    <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

    <!-- Job launcher -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Job repository -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>

    <!-- Transaction manager -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <!-- Job registration: makes the job available by name through jobRegistry -->
    <bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
        <property name="jobRegistry" ref="jobRegistry" />
    </bean>
    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

    <!-- Scheduling with Quartz (runs every 5 seconds).
         NOTE(review): CronTriggerBean and JobDetailBean target the Quartz 1.x API;
         confirm the Quartz version on the classpath. -->
    <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
                <!-- What to execute -->
                <property name="jobDetail" ref="jobDetail" />
                <!-- Cron schedule -->
                <property name="cronExpression" value="*/5 * * * * ?" />
            </bean>
        </property>
    </bean>

    <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
        <!-- Class that performs the work on each trigger -->
        <property name="jobClass" value="com.tch.test.spring.batch.JobLauncherDetails" />
        <property name="group" value="quartz-batch" />
        <property name="jobDataAsMap">
            <map>
                <entry key="jobName" value="helloWorldJob" />
                <entry key="jobLocator" value-ref="jobRegistry" />
                <entry key="jobLauncher" value-ref="jobLauncher" />
                <entry key="param1" value="value1" />
                <entry key="param2" value="value2" />
            </map>
        </property>
    </bean>
</beans>
JobLauncherDetails:
package com.tch.test.spring.batch;

import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;

import org.quartz.JobExecutionContext;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;

/**
 * Quartz job that launches a Spring Batch job.
 *
 * <p>The batch job name is read from the merged Quartz job data map under the
 * key {@code "jobName"}; the remaining simple-typed entries of that map are
 * converted into batch job parameters.
 */
public class JobLauncherDetails extends QuartzJobBean {

    /** Key in the job data map that holds the name of the batch job to run. */
    static final String JOB_NAME = "jobName";

    private JobLocator jobLocator;
    private JobLauncher jobLauncher;

    /** @param jobLocator used to look the batch job up by name */
    public void setJobLocator(JobLocator jobLocator) {
        this.jobLocator = jobLocator;
    }

    /** @param jobLauncher used to run the batch job */
    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    /**
     * Looks up the batch job named in the job data map and runs it with
     * parameters derived from the same map.
     *
     * @param context the Quartz execution context carrying the merged job data map
     * @throws org.quartz.JobExecutionException if the batch job cannot be located or launched
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void executeInternal(JobExecutionContext context) throws org.quartz.JobExecutionException {
        Map<String, Object> jobDataMap = context.getMergedJobDataMap();
        // Value configured for "jobName" in the jobDetail bean of beans.xml
        String jobName = (String) jobDataMap.get(JOB_NAME);
        JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
        try {
            // Run the matching batch job
            jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
        } catch (JobExecutionException e) {
            // Report the failure to the scheduler with its cause, rather than
            // printing a stack trace and pretending the run succeeded.
            throw new org.quartz.JobExecutionException("Could not execute batch job '" + jobName + "'", e);
        }
    }

    /**
     * Converts the entries of the job data map into batch job parameters.
     *
     * <p>String (except the {@code "jobName"} entry), Float/Double,
     * Integer/Long and Date values are copied; anything else is skipped.
     * A {@code "run date"} parameter holding the current time is always added.
     *
     * @param jobDataMap the merged Quartz job data map
     * @return the job parameters for this run
     */
    private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
        JobParametersBuilder builder = new JobParametersBuilder();
        for (Entry<String, Object> entry : jobDataMap.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            // The "jobName" entry selects the job; it is not a job parameter
            if (value instanceof String && !key.equals(JOB_NAME)) {
                builder.addString(key, (String) value);
            } else if (value instanceof Float || value instanceof Double) {
                builder.addDouble(key, ((Number) value).doubleValue());
            } else if (value instanceof Integer || value instanceof Long) {
                builder.addLong(key, ((Number) value).longValue());
            } else if (value instanceof Date) {
                builder.addDate(key, (Date) value);
            } else {
                // The job data map also contains values that are not job parameters,
                // such as the "jobLocator" and "jobLauncher" entries configured in
                // jobDataAsMap in beans.xml; they are ignored.
            }
        }
        // A unique job parameter is needed to rerun a job that already completed
        builder.addDate("run date", new Date());
        return builder.toJobParameters();
    }
}
运行:
package com.tch.test.spring.batch;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point for the scheduled variant: it only loads beans.xml.
 * The jobs themselves are expected to be triggered by the Quartz scheduler
 * configured in that file.
 */
public class Test {

    /**
     * Loads the application context and leaves it running.
     *
     * @param args unused
     * @throws Exception if the context cannot be created
     */
    public static void main(String[] args) throws Exception {
        String[] springConfig = { "beans.xml" };
        // The variable must be declared; a bare "context = ..." does not compile.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
        // The context is deliberately not closed here so the scheduled runs can
        // continue; the hook closes it when the JVM shuts down.
        context.registerShutdownHook();
    }
}