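How to Write a Custom ItemReader in Spring Batch
Spring Batch: custom ItemReader
Spring Batch supports a variety of input sources out of the box, such as files and databases. Occasionally, though, you need to read from a source it does not support by default, and in that case you have to implement your own reader: a custom ItemReader. This article walks through an example of how to do that.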
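Creating a custom ItemReader
Creating a custom ItemReader takes two steps:
- Create a class that implements the ItemReader interface, supplying the type of the returned object, T, as the type parameter.
- Implement the interface's T read() method according to the rule below.
The read() method returns the next object if there is one; otherwise it returns null.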
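The read() contract can also be looked at from the caller's side. The following is a minimal, framework-free sketch: the local ItemReader interface is only a stand-in with the same shape as Spring Batch's interface, and ListReader, ReadContractDemo and drain are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in with the same shape as Spring Batch's ItemReader<T> (assumption: declared
// locally so the sketch runs without the framework on the classpath).
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical reader over a fixed list: returns each element once, then null.
class ListReader<T> implements ItemReader<T> {
    private final List<T> items;
    private int next = 0;

    ListReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() {
        return next < items.size() ? items.get(next++) : null;
    }
}

class ReadContractDemo {
    // Calls read() until it returns null, which is the signal that input is exhausted.
    static <T> List<T> drain(ItemReader<T> reader) throws Exception {
        List<T> out = new ArrayList<>();
        for (T item = reader.read(); item != null; item = reader.read()) {
            out.add(item);
        }
        return out;
    }
}
```

Because null is the end-of-input signal, a reader written this way cannot return null as a regular item.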
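Below we write a custom ItemReader that returns the students enrolled in an online test course as StuDto objects. To keep things simple, the data is held in memory. StuDto is a simple data transfer object: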
import lombok.Data;

// Lombok's @Data generates the getters and setters used by the reader
@Data
public class StuDto {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}
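Follow these steps to create the ItemReader:
- Create the InMemoryStudentReader class.
- Implement the ItemReader interface with StuDto as the returned object type.
- Add a List<StuDto> studentData field holding the students enrolled in the course.
- Add a nextStudentIndex field holding the index of the next StuDto object to return.
- Add a private initialize() method that builds the student data and sets the index to 0.
- Add a constructor that calls initialize().
- Implement read() with this rule: if there is a next student, return that StuDto object and increment the index by one; otherwise return null.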
The InMemoryStudentReader code:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StuDto> {

    private int nextStudentIndex;
    private List<StuDto> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    // Builds the in-memory student list and resets the read position
    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StuDto nick = new StuDto();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StuDto ian = new StuDto();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;
        // Return the next student if one is left; null signals the end of the input
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        return nextStudent;
    }
}
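Once the custom ItemReader is written, it has to be configured as a bean so that a Spring Batch job can use it. The configuration is shown next.
Configuring the ItemReader bean
The configuration class: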
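import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InMemoryStudentJobConfig {

    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}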
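Annotate the class with @Configuration to mark it as a configuration class, add a method whose return type is ItemReader<StuDto>, annotate that method with @Bean, and have it return a new InMemoryStudentReader object.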
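Summary
This article showed by example how to write a custom ItemReader. The three key points are:
- A custom ItemReader must implement the ItemReader interface.
- When implementing the interface, specify the type of the returned object as the type parameter (T).
- Implement the read() method so that it returns the next object if there is one and null otherwise.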
Spring Batch: ItemReader
This part focuses on ItemReader: how to read data from different sources, and how exception handling and restart work.
JdbcPagingItemReader
Reading data from a database
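import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Customer is not shown in this article; the builder calls below assume a Lombok @Builder class
@Configuration
public class DbJdbcDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job dbJdbcDemoJob() {
        return jobBuilderFactory.get("dbJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); // fetch rows in batches
        // Map each row of the result set to a Customer
        reader.setRowMapper((rs, rowNum) -> Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // Paging needs a sort key so that every page is read in a stable order
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }
}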
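The Job and the ItemWriter are not the focus of this article; they are shown here only to complete the example, and the same applies to the examples below.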
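import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {

    @Override
    public void write(List<? extends Customer> items) throws Exception {
        // Print every item in the chunk
        for (Customer customer : items) {
            System.out.println(customer);
        }
    }
}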
FlatFileItemReader
Reading data from a CSV file
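import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class FlatFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob() {
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();
    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1); // skip the header line

        // Split each line on commas and name the resulting fields
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        // Build a Customer from the named fields
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        return reader;
    }
}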
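To make the line-to-field mapping concrete, here is a simplified, framework-free sketch of what a tokenizer configured with column names prepares for the mapper. It is not Spring Batch's implementation: DelimitedLineSketch and tokenize are hypothetical names, the sample line in the usage note is made up, and unlike DelimitedLineTokenizer this sketch does not handle quoted fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DelimitedLineSketch {
    // Splits one comma-delimited line and pairs each value with its column name.
    static Map<String, String> tokenize(String line, String[] names) {
        String[] values = line.split(",", -1); // -1 keeps trailing empty fields
        if (values.length != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " fields but found " + values.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], values[i].trim());
        }
        return fields;
    }
}
```

Given a made-up line such as 1,Tony,Tester,1990-01-01 and the four column names used in the reader, the resulting map lets the mapper look values up by name (for example "lastName") instead of by position.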
StaxEventItemReader
Reading data from an XML file
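import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XmlFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob() {
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();
    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        // Each <customer> element is read as one item
        reader.setFragmentRootElementName("customer");

        // Map the <customer> element name to the Customer class
        XStreamMarshaller unmarshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);
        unmarshaller.setAliases(aliases);

        reader.setUnmarshaller(unmarshaller);
        return reader;
    }
}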
MultiResourceItemReader
Reading data from multiple files
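import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

@Configuration
public class MultipleFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    // All classpath files matching file*.csv
    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob() {
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();
    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer, Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    private MultiResourceItemReader<Customer> multipleResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        // The delegate does the actual reading, one input file at a time
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles);
        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        // Replaced by MultiResourceItemReader with each input file in turn
        reader.setResource(new ClassPathResource("customer.csv"));
        // reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        return reader;
    }
}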
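The idea of reading several inputs as one stream can be sketched without the framework. The example below is a simplified illustration, not how MultiResourceItemReader is implemented: the real class re-points a single delegate at each resource in turn, whereas this sketch chains independent readers. ItemReader is a local stand-in, and ChainedReader is a hypothetical name.

```java
import java.util.Iterator;
import java.util.List;

// Stand-in with the same shape as Spring Batch's ItemReader<T> (assumption: declared
// locally so the sketch runs without the framework).
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical composite: exhausts each source in turn and returns null only
// after the last source has been exhausted.
class ChainedReader<T> implements ItemReader<T> {
    private final Iterator<ItemReader<T>> sources;
    private ItemReader<T> current;

    ChainedReader(List<ItemReader<T>> sources) {
        this.sources = sources.iterator();
        this.current = this.sources.hasNext() ? this.sources.next() : null;
    }

    @Override
    public T read() throws Exception {
        while (current != null) {
            T item = current.read();
            if (item != null) {
                return item;
            }
            // The current source is exhausted: move on to the next one, if any.
            current = sources.hasNext() ? sources.next() : null;
        }
        return null;
    }
}
```

From the step's point of view nothing changes: it still calls read() until null comes back, and the switch from one input to the next stays hidden inside the reader.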
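Exception handling and restart
For chunk-oriented steps, Spring Batch provides tools for managing state. Within a step, state is managed through the ItemStream interface, which gives developers access to the component that holds the state. That component is the ExecutionContext, which is essentially a map of key-value pairs storing the state of a particular step. The ExecutionContext makes restarting a step possible because its state is persisted in the JobRepository.
When an error occurs during execution, the last saved state is stored in the JobRepository. The next time the job runs, that state is used to populate the ExecutionContext, and the step can then continue from where it left off.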
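The ItemStream interface has three methods:
- open() is called at the start of the step and receives the ExecutionContext, populated with the values stored in the database.
- update() is called at the end of each chunk (that is, each transaction) to update the ExecutionContext.
- close() is called once all chunks have been processed.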
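The open/update/close lifecycle can be illustrated with a small framework-free sketch. Everything in it is an assumption made for illustration: CheckpointedReader is a hypothetical class, a plain Map stands in for ExecutionContext, and persisting the map between runs (the job of the JobRepository) is left to the caller.

```java
import java.util.List;
import java.util.Map;

// Hypothetical reader that follows the open/update/close lifecycle.
class CheckpointedReader {
    private static final String KEY = "position";
    private final List<String> items;
    private int position;

    CheckpointedReader(List<String> items) {
        this.items = items;
    }

    // Called once before reading: restore the saved position if there is one.
    void open(Map<String, Integer> context) {
        position = context.getOrDefault(KEY, 0);
    }

    // Returns the next item, or null when the input is exhausted.
    String read() {
        return position < items.size() ? items.get(position++) : null;
    }

    // Called at each commit point: record how far reading has progressed.
    void update(Map<String, Integer> context) {
        context.put(KEY, position);
    }

    // Called once at the end: nothing to release in this sketch.
    void close() {
    }
}
```

If a first reader reads two items, calls update(), reads a third item and then fails, a second reader opened with the same map resumes at the third item, because only the position recorded by update() survives.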
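Let's build an example.
Prepare a CSV file in which record 33 contains an invalid first name. When the reader reaches that record, it throws an exception and the job terminates.
ItemReader test code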
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {

    // Number of lines read so far
    private long curLine = 0L;
    private final FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();

    public RestartDemoReader() {
        reader.setResource(new ClassPathResource("restartDemo.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
    }

    @Override
    public Customer read() throws Exception {
        Customer customer = reader.read();
        if (customer != null) {
            curLine++;
            // On the WrongName record, throw explicitly to terminate the job
            if ("WrongName".equals(customer.getFirstName())) {
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
            }
        }
        return customer;
    }

    /**
     * Detects whether the job is being restarted and positions the file reader.
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey("curLine")) {
            // Restart: continue after the lines committed by the previous run
            curLine = executionContext.getLong("curLine");
            System.out.println("Start reading from line: " + (curLine + 1));
        } else {
            curLine = 0L;
            executionContext.putLong("curLine", curLine);
        }
        reader.setLinesToSkip((int) curLine);
        // Open the file once per step execution rather than on every read()
        reader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + curLine);
        executionContext.putLong("curLine", curLine);
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();
    }
}
Job configuration
Records are read in chunks of 10.
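import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RestartDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob() {
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer, Customer>chunk(10) // commit after every 10 records
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}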
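On the first run, the program throws an exception at record 33, and the persisted curLine value is 30.
If we now query the BATCH_STEP_EXECUTION_CONTEXT table, we can see that curLine has been persisted as a key-value pair. (Records are committed in chunks of 10, so when record 33 fails, the last committed value of curLine is 30.)
Next, we correct the WrongName record and run the program again.
The program calls open(), which checks whether the step's persisted map contains curLine. If it does, this is a restart: the reader loads curLine and resumes from the start of the failed chunk.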
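The numbers in this walkthrough (chunks of 10, a failure at record 33, a checkpoint of 30) can be reproduced with a small framework-free simulation. This is only a sketch under stated assumptions: ChunkRestartSketch, run, and the "BAD" marker are hypothetical, and a plain int stands in for the persisted curLine value.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkRestartSketch {
    // Processes records in chunks, starting after the first `committed` records.
    // Returns the number of records committed when the run stops. A record equal
    // to "BAD" aborts the run and discards the chunk in progress.
    static int run(List<String> records, int chunkSize, int committed, List<String> written) {
        int position = committed;
        while (position < records.size()) {
            List<String> chunk = new ArrayList<>();
            int end = Math.min(position + chunkSize, records.size());
            for (int i = position; i < end; i++) {
                if (records.get(i).equals("BAD")) {
                    return position; // chunk rolled back; checkpoint stays at the last commit
                }
                chunk.add(records.get(i));
            }
            written.addAll(chunk); // "write" the chunk
            position = end;        // the checkpoint moves only after a successful write
        }
        return position;
    }
}
```

With a made-up input of 40 records whose 33rd entry is "BAD", a first call with chunk size 10 and a starting checkpoint of 0 stops with a checkpoint of 30. After the bad record is corrected, a second call that starts from that checkpoint processes records 31 to 40 and nothing else.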