详解Spring batch 入门学习教程(附源码)
spring batch 是一个开源的批处理框架.执行一系列的任务. 在 spring batch 中 一个job 是由许多 step 组成的。而每一个 step 又是由 read-process-write task或者 单个 task 组成。
1. "read-process-write" 处理,根据字面意思理解就可以:
- read 就是从资源文件里面读取数据,比如从xml文件,csv文件,数据库中读取数据.
- process 就是处理读取的数据
- write 就是将处理过的数据写入到其他资源文件中去,可以是xml,csv,或者数据库.
比如:从csv文件中 读取数据,经过处理之后,保存到数据库. spring batch 提供了很多类去处理这方面的东西。
2.单个task, 也就是处理单个任务。比如在一个step 开始之前或者完成之后清除资源文件等.
3.许多个step 组成在一起,就组成了一个job. 所以他们之间的关系,就如同下面的描述:
一个 job = 很多steps
一个step = 一个read-process-write 或者 一个task.
同样一个job = step1 -->step2--step3 这样链表形式的组成.
spring batch 例子
考虑如下一个批处理的例子,看起来有点啰嗦,只是为了说明用途:
1. step1 : 从 a 文件夹中读取csv 文件,处理之后,写入到b文件夹中(read-process-write)
2. step2 : 从 b 文件夹中读取csv文件 ,处理之后, 存储到数据库中(read-process-write).
3. step3 : 删除b文件夹下的csv文件。(用到单个task)
4. step4 : 从数据库读取数据,处理之后,生成xml报表文件(read-process-write).
5. step5 : 读取xml报表,并发送email给管理员(用到单个task)
用spring batch 我们可以如下定义这个job:
<job id="abcjob" xmlns="http://www.springframework.org/schema/batch"> <step id="step1" next="step2"> <tasklet> <chunk reader="cvsitemreader" writer="cvsitemwriter" processor="itemprocesser" commit-interval="1" /> </tasklet> </step> <step id="step2" next="step3"> <tasklet> <chunk reader="cvsitemreader" writer="databaseitemwriter" processor="itemprocesser" commit-interval="1" /> </tasklet> </step> <step id="step3" next="step4"> <tasklet ref="filedeletingtasklet" /> </step> <step id="step4" next="step5"> <tasklet> <chunk reader="databaseitemreader" writer="xmlitemwriter" processor="itemprocesser" commit-interval="1" /> </tasklet> </step> <step id="step5"> <tasklet ref="sendingemailtasklet" /> </step> </job>
整个 job 的执行是存储在数据库中的,所以即使是某一个step出错失败,也不需要全部从头开始执行这个job.下面是一个真正的入门教程例子.
采用 jar包如下:
spring-batch-2.2.3 以上版本,但是我在2.2.3版本中发现 org/springframework/batch/core/schema-mysql.sql 里面的的mysql 创建表的语句是有问题的,也就是少了“," 号导致的问题( not null, 后面几个创建表的语句not null 后面少了逗号),当然你可以自己修改后再执行,执行完毕后有如下几个表:
xstream-1.3.jar 必须的。
jettison-1.3.3.jar也是必须的, 否则会出现
java.lang.noclassdeffounderror: org/codehaus/jettison/mapped/mappedxmloutputfactory错误。
另外我用的spring 是 3.1 版本的,可以下载相关jar包,还有apache common 相关jar包就可以了。
mysql-connect-java-5.1.jar 连接mysql 数据库用的。
假设要将如下 csv 文件读取出来处理之后,写入到一个xml文件之中.
,"213,100",980,"mkyong", 29/7/2013 ,"320,200",1080,"staff 1", 30/7/2013 ,"342,197",1200,"staff 2", 31/7/2013
用 flatfileitemreader 去读取csv 文件, 用 itemprocessor 去处理数据,用 staxeventitemwriter 去写数据
job 的定义如下(job-hello-world.xml):
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd "> <import resource="../config/context.xml" /> <import resource="../config/database.xml" /> <bean id="report" class="yihaomen.model.report" scope="prototype" /> <bean id="itemprocessor" class="yihaomen.customitemprocessor" /> <batch:job id="helloworldjob"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="cvsfileitemreader" writer="xmlitemwriter" processor="itemprocessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="cvsfileitemreader" class="org.springframework.batch.item.file.flatfileitemreader"> <property name="resource" value="classpath:cvs/input/report.csv" /> <property name="linemapper"> <bean class="org.springframework.batch.item.file.mapping.defaultlinemapper"> <property name="linetokenizer"> <bean class="org.springframework.batch.item.file.transform.delimitedlinetokenizer"> <property name="names" value="id,sales,qty,staffname,date" /> </bean> </property> <property name="fieldsetmapper"> <bean class="yihaomen.reportfieldsetmapper" /> <!-- if no data type conversion, use beanwrapperfieldsetmapper to map by name <bean class="org.springframework.batch.item.file.mapping.beanwrapperfieldsetmapper"> <property name="prototypebeanname" value="report" /> </bean> --> </property> </bean> </property> </bean> <bean id="xmlitemwriter" class="org.springframework.batch.item.xml.staxeventitemwriter"> <property name="resource" value="file:xml/outputs/report.xml" /> <property name="marshaller" ref="reportmarshaller" /> <property name="roottagname" value="report" /> </bean> <bean id="reportmarshaller" class="org.springframework.oxm.jaxb.jaxb2marshaller"> <property name="classestobebound"> <list> <value>yihaomen.model.report</value> </list> </property> </bean> </beans>
映射csv文件到 report 对象并写xml文件 (通过 jaxb annotations).
package yihaomen.model; import java.math.bigdecimal; import java.util.date; import javax.xml.bind.annotation.xmlattribute; import javax.xml.bind.annotation.xmlelement; import javax.xml.bind.annotation.xmlrootelement; @xmlrootelement(name = "record") public class report { private int id; private bigdecimal sales; private int qty; private string staffname; private date date; @xmlattribute(name = "id") public int getid() { return id; } public void setid(int id) { this.id = id; } @xmlelement(name = "sales") public bigdecimal getsales() { return sales; } public void setsales(bigdecimal sales) { this.sales = sales; } @xmlelement(name = "qty") public int getqty() { return qty; } public void setqty(int qty) { this.qty = qty; } @xmlelement(name = "staffname") public string getstaffname() { return staffname; } public void setstaffname(string staffname) { this.staffname = staffname; } public date getdate() { return date; } public void setdate(date date) { this.date = date; } @override public string tostring() { return "report [id=" + id + ", sales=" + sales + ", qty=" + qty + ", staffname=" + staffname + "]"; } }
为了转换日期,用了自定义的 fieldsetmapper. 如果没有数据需要转换, beanwrapperfieldsetmapper 通过名称name 去自动映射值。
package yihaomen; import java.text.parseexception; import java.text.simpledateformat; import org.springframework.batch.item.file.mapping.fieldsetmapper; import org.springframework.batch.item.file.transform.fieldset; import org.springframework.validation.bindexception; import yihaomen.model.report; public class reportfieldsetmapper implements fieldsetmapper<report> { private simpledateformat dateformat = new simpledateformat("yyyy-mm-dd"); @override public report mapfieldset(fieldset fieldset) throws bindexception { report report = new report(); report.setid(fieldset.readint(0)); report.setsales(fieldset.readbigdecimal(1)); report.setqty(fieldset.readint(2)); report.setstaffname(fieldset.readstring(3)); //default format yyyy-mm-dd //fieldset.readdate(4); string date = fieldset.readstring(4); try { report.setdate(dateformat.parse(date)); } catch (parseexception e) { e.printstacktrace(); } return report; } }
在写入数据之前调用itemprocessor 处理数据
package yihaomen; import org.springframework.batch.item.itemprocessor; import yihaomen.model.report; public class customitemprocessor implements itemprocessor<report, report> { @override public report process(report item) throws exception { system.out.println("processing..." + item); return item; } }
spring 配置文件和数据库配置文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- stored job-meta in memory --> <!-- <bean id="jobrepository" class="org.springframework.batch.core.repository.support.mapjobrepositoryfactorybean"> <property name="transactionmanager" ref="transactionmanager" /> </bean> --> <!-- stored job-meta in database --> <bean id="jobrepository" class="org.springframework.batch.core.repository.support.jobrepositoryfactorybean"> <property name="datasource" ref="datasource" /> <property name="transactionmanager" ref="transactionmanager" /> <property name="databasetype" value="mysql" /> </bean> <bean id="transactionmanager" class="org.springframework.batch.support.transaction.resourcelesstransactionmanager" /> <bean id="joblauncher" class="org.springframework.batch.core.launch.support.simplejoblauncher"> <property name="jobrepository" ref="jobrepository" /> </bean> </beans>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd"> <!-- connect to mysql database --> <bean id="datasource" class="org.springframework.jdbc.datasource.drivermanagerdatasource"> <property name="driverclassname" value="com.mysql.jdbc.driver" /> <property name="url" value="jdbc:mysql://localhost:3306/test" /> <property name="username" value="root" /> <property name="password" value="" /> </bean> <bean id="transactionmanager" class="org.springframework.batch.support.transaction.resourcelesstransactionmanager" /> <!-- create job-meta tables automatically --> <jdbc:initialize-database data-source="datasource"> <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" /> <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" /> </jdbc:initialize-database> </beans>
运行程序
package yihaomen; import org.springframework.batch.core.job; import org.springframework.batch.core.jobexecution; import org.springframework.batch.core.jobparameters; import org.springframework.batch.core.launch.joblauncher; import org.springframework.context.applicationcontext; import org.springframework.context.support.classpathxmlapplicationcontext; public class app { public static void main(string[] args) { string[] springconfig = { "spring/batch/jobs/job-hello-world.xml" }; applicationcontext context = new classpathxmlapplicationcontext(springconfig); joblauncher joblauncher = (joblauncher) context.getbean("joblauncher"); job job = (job) context.getbean("helloworldjob"); try { jobexecution execution = joblauncher.run(job, new jobparameters()); system.out.println("exit status : " + execution.getstatus()); } catch (exception e) { e.printstacktrace(); } system.out.println("done"); } }
运行结果 :
十二月 03, 2013 8:56:24 下午 org.springframework.batch.core.launch.support.simplejoblauncher$1 run info: job: [flowjob: [name=helloworldjob]] launched with the following parameters: [{}] 十二月 03, 2013 8:56:24 下午 org.springframework.batch.core.job.simplestephandler handlestep info: executing step: [step1] processing...report [id=1001, sales=213100, qty=980, staffname=yihaomen] processing...report [id=1002, sales=320200, qty=1080, staffname=staff 1] processing...report [id=1003, sales=342197, qty=1200, staffname=staff 2] 十二月 03, 2013 8:56:25 下午 org.springframework.batch.core.launch.support.simplejoblauncher$1 run info: job: [flowjob: [name=helloworldjob]] completed with the following parameters: [{}] and the following status: [completed] exit status : completed done
结果生成了output.xml 在你工程目录的 xml 目录下。
整个源代码,除去jar包之后下载:spring batch 入门教程下载
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。