
How to Customize an ItemReader in Spring Batch

Programmer Article Station (程序员文章站), 2022-03-28 22:26:11

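Customizing an ItemReader in Spring Batch

Spring Batch supports many kinds of input sources, such as files and databases. Sometimes, however, you need to read from a source that is not supported out of the box, and then you have to implement the reading yourself: a custom ItemReader. This article walks through an example of writing one.

Creating a custom ItemReader

Creating a custom ItemReader takes two steps:

  • Create a class that implements the ItemReader interface, passing the type T of the objects it returns as the type parameter.
  • Implement the interface's T read() method according to the rule below.

read() returns the next object if there is one; otherwise it returns null. Spring Batch treats null as the end of the input.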
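To make the "next object, or null" rule concrete, here is a minimal, framework-free sketch. `SimpleItemReader` is a hypothetical stand-in for Spring Batch's `ItemReader` (without its `throws Exception` clause), and `IterableReader` and `readAll` are illustrative names, not Spring Batch API. For this simple case Spring Batch itself ships `ListItemReader` and `IteratorItemReader`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReadContractSketch {

    // Hypothetical stand-in for ItemReader<T>: the next item, or null at the end of the input.
    interface SimpleItemReader<T> {
        T read();
    }

    // Reader over any Iterable; the source must not contain nulls, since null means "no more items".
    static final class IterableReader<T> implements SimpleItemReader<T> {
        private final Iterator<T> iterator;

        IterableReader(Iterable<T> source) {
            this.iterator = source.iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    // Drains a reader the way a caller has to: keep calling read() until it returns null.
    static <T> List<T> readAll(SimpleItemReader<T> reader) {
        List<T> items = new ArrayList<>();
        for (T item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        IterableReader<String> reader = new IterableReader<>(List.of("Tony", "Nick", "Ian"));
        System.out.println(readAll(reader)); // [Tony, Nick, Ian]
        System.out.println(reader.read());   // null: the reader stays exhausted
    }
}
```

Because null doubles as the end-of-input signal, a reader can never return null as a regular item.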

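Below we write a custom ItemReader that returns information about the students of an online testing course as StuDto objects. To keep things simple, the data is stored in memory. StuDto is a simple data transfer object: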

import lombok.Data;

// Lombok generates the getters and setters
@Data
public class StuDto {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}

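Create the ItemReader as follows:

  • Create the InMemoryStudentReader class.
  • Implement the ItemReader interface, with StuDto as the returned object type.
  • Add a List<StuDto> studentData field that holds the students enrolled in the course.
  • Add a nextStudentIndex field that holds the index of the next StuDto object.
  • Add a private initialize() method that creates the student data and sets the index to 0.
  • Add a constructor that calls initialize().
  • Implement read() with this rule: if there is a next student, return its StuDto object and increment the index; otherwise return null.

The InMemoryStudentReader code: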

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StuDto> {
    // index of the next StuDto to return
    private int nextStudentIndex;
    // students enrolled in the course, held in memory
    private List<StuDto> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StuDto nick = new StuDto();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StuDto ian = new StuDto();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;

        // return the next student if there is one, then advance the index
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        // null tells Spring Batch that there is no more input
        return nextStudent;
    }
}

Once the custom ItemReader exists, it must be registered as a bean so that a Spring Batch job can use it, as shown next.

Configuring the ItemReader bean

The configuration class:

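import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InMemoryStudentJobConfig {
    // expose the custom reader as a bean so that a step can use it
    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}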

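The @Configuration annotation marks the class as a configuration class. The method annotated with @Bean declares ItemReader<StuDto> as its return type, and its body simply returns a new InMemoryStudentReader.

Summary

This example showed how to write a custom ItemReader. The key points are:

  • A custom ItemReader implements the ItemReader interface.
  • The type of the returned objects is given as the interface's type parameter (T).
  • The read() method returns the next object if one exists, and null otherwise.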
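Returning null matters because of how a chunk-oriented step consumes the reader. The sketch below is a deliberately simplified, hypothetical model (no processor, transactions, skip or retry), not Spring Batch's actual implementation: it reads up to `chunkSize` items, hands them to the writer as one list, and stops only when the reader returns null. A reader that never returned null would keep such a step running forever.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ChunkLoopSketch {

    // Simplified, hypothetical model of a chunk-oriented step: read up to chunkSize items,
    // pass them to the writer as one list, and repeat until the reader returns null.
    // Returns the size of every chunk that was written.
    static <T> List<Integer> runChunks(Supplier<T> reader, Consumer<List<T>> writer, int chunkSize) {
        List<Integer> chunkSizes = new ArrayList<>();
        while (true) {
            List<T> chunk = new ArrayList<>();
            T item = null;
            while (chunk.size() < chunkSize && (item = reader.get()) != null) {
                chunk.add(item);
            }
            if (!chunk.isEmpty()) {
                writer.accept(chunk); // one write call per chunk (one transaction in Spring Batch)
                chunkSizes.add(chunk.size());
            }
            if (item == null) {
                return chunkSizes; // the reader returned null: the input is exhausted
            }
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7).iterator();
        Supplier<Integer> reader = () -> source.hasNext() ? source.next() : null;
        List<Integer> sizes = runChunks(reader, chunk -> System.out.println("write " + chunk), 3);
        System.out.println(sizes); // [3, 3, 1]
    }
}
```

The last chunk may be smaller than `chunkSize`; it is written as soon as the reader signals the end of the input.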

Spring Batch: ItemReader

This part focuses on ItemReader: how to read data from different sources, and how exception handling and restart work.

JdbcPagingItemReader

Reading data from a database

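import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DbJdbcDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job dbJdbcDemoJob() {
        return jobBuilderFactory.get("dbJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setPageSize(100);  // rows fetched per page query (the default is 10)
        reader.setFetchSize(100); // JDBC fetch-size hint for the driver
        // map each result row to a Customer
        reader.setRowMapper((rs, rowNum) -> Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build());

        // MySQL paging query; the sort key (a unique column) drives the paging
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }
}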

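The job and the ItemWriter are not the focus of this article; they are included only to make the example complete. The same applies to the examples that follow.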

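import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        // simply print every item of the chunk
        for (Customer customer : items) {
            System.out.println(customer);
        }
    }
}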
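The sort key is not optional. Conceptually, JdbcPagingItemReader fetches the first page ordered by the sort key and each following page with a condition such as `id > ?`, bound to the last key value of the previous page, so the sort key should be unique. The stdlib-only sketch below is a hypothetical illustration: `pageQuery` only approximates the shape of the per-page SQL (it is not the exact text MySqlPagingQueryProvider generates), and `readAllPages` simulates keyset paging over an in-memory, ascending list of ids.

```java
import java.util.ArrayList;
import java.util.List;

public class KeysetPagingSketch {

    // Approximate shape of the per-page queries (illustrative, not the provider's exact SQL).
    static String pageQuery(boolean firstPage, int pageSize) {
        String where = firstPage ? "" : " WHERE id > ?"; // ? = id of the previous page's last row
        return "SELECT id, firstName, lastName, birthdate FROM customer"
                + where + " ORDER BY id ASC LIMIT " + pageSize;
    }

    // Simulates keyset paging over ids that are already sorted ascending.
    static List<List<Long>> readAllPages(List<Long> sortedIds, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        Long lastId = null; // null before the first page
        while (true) {
            List<Long> page = new ArrayList<>();
            for (Long id : sortedIds) {
                if (page.size() == pageSize) {
                    break;
                }
                if (lastId == null || id > lastId) {
                    page.add(id);
                }
            }
            if (page.isEmpty()) {
                return pages; // no rows after the last key: reading is finished
            }
            pages.add(page);
            lastId = page.get(page.size() - 1);
        }
    }

    public static void main(String[] args) {
        System.out.println(pageQuery(true, 3));
        System.out.println(pageQuery(false, 3));
        System.out.println(readAllPages(List.of(1L, 2L, 3L, 5L, 8L, 13L, 21L), 3));
    }
}
```

With a non-unique sort key, rows that share the last key value of a page would be skipped by the `>` comparison, which is why the unique `id` column is used above.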

FlatFileItemReader

Reading data from a CSV file

 
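import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class FlatFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob() {
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();
    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1); // skip the header row

        // split each line on commas and name the columns
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        // map the named fields of each line to a Customer
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}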
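The reader above combines three pieces: skip the header (setLinesToSkip(1)), split each line into named tokens (DelimitedLineTokenizer), and map the tokens to an object (the FieldSetMapper lambda). The stdlib-only sketch below mimics that pipeline with hypothetical helpers (`tokenize`, `map`, `readAll`) and a stand-in `Customer` record (Java 16+). Unlike the real DelimitedLineTokenizer it does not handle quoted fields, and the sample rows are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DelimitedFileSketch {

    // Hypothetical stand-in for the article's Customer class (fields assumed from the column names).
    record Customer(long id, String firstName, String lastName, String birthdate) {}

    // Column names in order, like DelimitedLineTokenizer.setNames(...) in the configuration above.
    static final String[] NAMES = {"id", "firstName", "lastName", "birthdate"};

    // Tokenizer step: split a line on commas and key each token by its column name.
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException("expected " + NAMES.length + " tokens in: " + line);
        }
        Map<String, String> fieldSet = new HashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fieldSet.put(NAMES[i], tokens[i].trim());
        }
        return fieldSet;
    }

    // Field-set-mapper step: build the domain object from the named tokens.
    static Customer map(Map<String, String> fieldSet) {
        return new Customer(Long.parseLong(fieldSet.get("id")), fieldSet.get("firstName"),
                fieldSet.get("lastName"), fieldSet.get("birthdate"));
    }

    // Reader loop: skip the first linesToSkip lines (e.g. a header), then map every remaining line.
    static List<Customer> readAll(List<String> lines, int linesToSkip) {
        List<Customer> customers = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            customers.add(map(tokenize(lines.get(i))));
        }
        return customers;
    }

    public static void main(String[] args) {
        List<String> csv = List.of(
                "id,firstName,lastName,birthdate",
                "1,Ann,Lee,1990-01-01",
                "2,Bob,Ray,1985-06-30");
        readAll(csv, 1).forEach(System.out::println);
    }
}
```

Calling `readAll(csv, 0)` instead would try to parse the header's "id" as a number and fail, which is the situation setLinesToSkip(1) avoids.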

StaxEventItemReader

Reading data from an XML file

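import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XmlFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob() {
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();
    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        // every <customer> element is one item
        reader.setFragmentRootElementName("customer");

        // map the <customer> element to the Customer class
        XStreamMarshaller unmarshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);
        unmarshaller.setAliases(aliases);
        reader.setUnmarshaller(unmarshaller);
        return reader;
    }
}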
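setFragmentRootElementName("customer") tells StaxEventItemReader to treat each `<customer>` element as one item, which the unmarshaller then turns into an object. The sketch below illustrates the fragment idea with the JDK's own StAX API (`javax.xml.stream`) instead of Spring Batch and XStream. `readFragments` is a hypothetical helper that returns one map of child-element text per fragment, assuming flat fragments whose children contain only text.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

public class XmlFragmentSketch {

    // Hypothetical helper: one map (child element name -> text) per fragment root element.
    static List<Map<String, String>> readFragments(String xml, String fragmentRoot) {
        List<Map<String, String>> fragments = new ArrayList<>();
        XMLInputFactory factory = XMLInputFactory.newInstance();
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, false); // no DTDs or external entities
        try {
            XMLStreamReader in = factory.createXMLStreamReader(new StringReader(xml));
            Map<String, String> current = null; // non-null only while inside a fragment
            while (in.hasNext()) {
                int event = in.next();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    String name = in.getLocalName();
                    if (name.equals(fragmentRoot)) {
                        current = new LinkedHashMap<>(); // a new item starts here
                    } else if (current != null) {
                        current.put(name, in.getElementText()); // reads up to the child's end tag
                    }
                } else if (event == XMLStreamConstants.END_ELEMENT
                        && in.getLocalName().equals(fragmentRoot)) {
                    fragments.add(current); // the item is complete
                    current = null;
                }
            }
            in.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed XML", e);
        }
        return fragments;
    }

    public static void main(String[] args) {
        String xml = "<customers>"
                + "<customer><id>1</id><firstName>Ann</firstName></customer>"
                + "<customer><id>2</id><firstName>Bob</firstName></customer>"
                + "</customers>";
        readFragments(xml, "customer").forEach(System.out::println);
    }
}
```

Elements outside the fragment root, such as the enclosing `<customers>`, are simply skipped, just as the Spring Batch reader ignores everything that is not part of a fragment.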

MultiResourceItemReader

Reading data from multiple files

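import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

@Configuration
public class MultipleFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    // every file*.csv on the classpath
    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob() {
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();
    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer, Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    private MultiResourceItemReader<Customer> multipleResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        reader.setDelegate(flatFileReader()); // parses each file
        reader.setResources(inputFiles);      // the files to read, one after another
        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        // No resource is set here: MultiResourceItemReader assigns each input file to the delegate in turn.
        // reader.setLinesToSkip(1); // enable if every input file starts with a header row

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}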
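MultiResourceItemReader does not parse anything itself: it hands each resource in turn to its delegate (here the FlatFileItemReader) and moves on to the next resource once the delegate returns null. A framework-free sketch of that delegation, with hypothetical names (`SimpleItemReader`, `MultiReader`, `openLines`) and in-memory lists of lines standing in for files:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class MultiResourceSketch {

    // Hypothetical stand-in for ItemReader: the next item, or null when this reader is exhausted.
    interface SimpleItemReader<T> {
        T read();
    }

    // Reads resources one after another through a delegate opened per resource.
    static final class MultiReader<R, T> implements SimpleItemReader<T> {
        private final Iterator<R> resources;
        private final Function<R, SimpleItemReader<T>> openDelegate;
        private SimpleItemReader<T> current; // delegate for the resource being read, or null

        MultiReader(List<R> resources, Function<R, SimpleItemReader<T>> openDelegate) {
            this.resources = resources.iterator();
            this.openDelegate = openDelegate;
        }

        @Override
        public T read() {
            while (true) {
                if (current == null) {
                    if (!resources.hasNext()) {
                        return null; // every resource has been read
                    }
                    current = openDelegate.apply(resources.next());
                }
                T item = current.read();
                if (item != null) {
                    return item;
                }
                current = null; // this resource is exhausted; move on to the next one
            }
        }
    }

    // Opens an in-memory "file" (a list of lines) as a delegate reader.
    static SimpleItemReader<String> openLines(List<String> file) {
        Iterator<String> it = file.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    static <T> List<T> readAll(SimpleItemReader<T> reader) {
        List<T> items = new ArrayList<>();
        for (T item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        List<List<String>> files = List.of(List.of("a1", "a2"), List.of(), List.of("c1"));
        MultiReader<List<String>, String> reader = new MultiReader<>(files, MultiResourceSketch::openLines);
        System.out.println(readAll(reader)); // [a1, a2, c1]
    }
}
```

An empty file simply yields null right away and is skipped, so the caller sees one continuous stream of items.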

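Exception handling and restart

For chunk-oriented steps, Spring Batch provides tools for managing state. The ItemStream interface gives developers access to the component that holds a step's state: the ExecutionContext, which is essentially a map of key-value pairs storing the state of a particular step. Because the ExecutionContext is persisted in the JobRepository, it makes restarting a step possible.

When an error occurs during execution, the JobRepository still holds the state saved at the last successful commit. The next time the job runs, that state is used to populate the ExecutionContext, so the step can continue from where it left off.

The ItemStream interface has three methods:

  • open() is called at the start of the step and receives the ExecutionContext, populated with the values stored in the database;
  • update() is called each time a chunk is committed (that is, once per transaction) and updates the ExecutionContext;
  • close() is called after all chunks have been processed.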
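The interplay of open(), update() and a failure can be simulated without Spring. In the hypothetical sketch below, a plain `Map` stands in for the ExecutionContext persisted in the JobRepository, and `runStep` is a simplified model of one step execution: it resumes from the saved `curLine`, saves `curLine` each time a chunk commits, and returns false without saving when it reaches a record named `WrongName`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestartSketch {

    /**
     * Simplified model of one execution of a chunk-oriented step over the given records.
     * The context map plays the role of the persisted step ExecutionContext.
     * Returns true if the step completed, false if it failed on a "WrongName" record.
     */
    static boolean runStep(Map<String, Long> context, List<String> records, int chunkSize,
                           List<String> written) {
        // open(): on a restart, resume right after the last committed line
        long curLine = context.getOrDefault("curLine", 0L);
        List<String> chunk = new ArrayList<>();
        while (true) {
            // read(): null means the input is exhausted
            String item = curLine < records.size() ? records.get((int) curLine) : null;
            if (item != null) {
                curLine++;
                if (item.equals("WrongName")) {
                    // failure: the open chunk is rolled back and update() is not called,
                    // so the context keeps the value saved at the last commit
                    return false;
                }
                chunk.add(item);
            }
            if (chunk.size() == chunkSize || (item == null && !chunk.isEmpty())) {
                written.addAll(chunk);           // write the chunk
                context.put("curLine", curLine); // update() runs as the chunk commits
                chunk.clear();
            }
            if (item == null) {
                return true;                     // end of input: the step completes
            }
        }
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 1; i <= 40; i++) {
            records.add(i == 33 ? "WrongName" : "customer" + i);
        }
        Map<String, Long> context = new HashMap<>();
        List<String> written = new ArrayList<>();

        boolean first = runStep(context, records, 10, written);
        System.out.println("first run completed: " + first + ", saved curLine: " + context.get("curLine"));

        records.set(32, "customer33"); // fix the bad record, then restart
        boolean second = runStep(context, records, 10, written);
        System.out.println("second run completed: " + second + ", saved curLine: " + context.get("curLine"));
    }
}
```

With 40 records, a chunk size of 10 and a bad record 33, the first run saves curLine = 30; after the record is fixed, the second run reads only records 31 to 40.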


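Let's build an example.

Prepare a CSV file in which record 33 contains an invalid first name (WrongName). When the reader reaches that record, it throws an exception and the job stops.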


The ItemReader used for the test:

 
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {
    // number of lines read so far; saved to the step ExecutionContext at every chunk commit
    private Long curLine = 0L;
    // delegate that does the actual CSV parsing
    private final FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();

    public RestartDemoReader() {
        reader.setResource(new ClassPathResource("restartDemo.csv"));
        // this class tracks the position itself, so the delegate does not save its own state
        reader.setSaveState(false);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
    }

    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException {
        Customer customer = reader.read();
        if (customer == null) {
            return null; // end of file
        }
        this.curLine++;
        // explicitly throw an exception when "WrongName" is found, to stop the job
        if ("WrongName".equals(customer.getFirstName())) {
            throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        }
        return customer;
    }

    /**
     * Determines whether the job is being restarted, then opens the delegate
     * reader at the right line.
     * @param executionContext the step's execution context
     * @throws ItemStreamException if the delegate cannot be opened
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey("curLine")) {
            // restart: skip the lines that were committed in the previous run
            this.curLine = executionContext.getLong("curLine");
            reader.setLinesToSkip(this.curLine.intValue());
            System.out.println("Start reading from line: " + (this.curLine + 1));
        } else {
            this.curLine = 0L;
            reader.setLinesToSkip(0);
            executionContext.putLong("curLine", this.curLine);
        }
        // open the delegate once per step execution, not on every read()
        reader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + this.curLine);
        executionContext.putLong("curLine", this.curLine);
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();
    }
}

Job configuration

Items are read and written in chunks of 10 records:

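import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob() {
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer, Customer>chunk(10) // commit, and call update(), every 10 items
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}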

On the first run, the job throws an exception at record 33, and curLine is 30.


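At this point, querying the BATCH_STEP_EXECUTION_CONTEXT table shows that curLine has been persisted to the database as a key-value pair. Because the chunk size is 10 records, the last commit before the failure at record 33 happened after record 30, so the saved value of curLine is 30.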


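Next, correct the WrongName record and run the job again, with the same job parameters so that Spring Batch restarts the failed job instance instead of creating a new one.

The step calls the reader's open() method, which checks whether the step's execution context stored in the database contains curLine. If it does, this is a rerun: the reader restores curLine and continues with the first record after the last committed chunk.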


The above is based on my personal experience; I hope it serves as a useful reference.