springbatch的封装与使用实例详解
程序员文章站
2024-02-11 09:42:34
spring batch官网介绍:
a lightweight, comprehensive batch framework designed to enable the...
spring batch官网介绍:
a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。)
springbatch
主要实现批量数据的处理,我对batch进行的封装,提出了jobbase类型,具体job需要实现它即可。spring batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。
几个组件
•job
•step
•read
•write
•listener
•process
•validator
jobbase定义了几个公用的方法
/** * springbatch的job基础类. */ public abstract class jobbase<t> { /** * 批次. */ protected int chunkcount = 5000; /** * 监听器. */ private jobexecutionlistener jobexecutionlistener; /** * 处理器. */ private validatingitemprocessor<t> validatingitemprocessor; /** * job名称. */ private string jobname; /** * 检验器. */ private validator<t> validator; @autowired private jobbuilderfactory job; @autowired private stepbuilderfactory step; /** * 初始化. * * @param jobname job名称 * @param jobexecutionlistener 监听器 * @param validatingitemprocessor 处理器 * @param validator 检验 */ public jobbase(string jobname, jobexecutionlistener jobexecutionlistener, validatingitemprocessor<t> validatingitemprocessor, validator<t> validator) { this.jobname = jobname; this.jobexecutionlistener = jobexecutionlistener; this.validatingitemprocessor = validatingitemprocessor; this.validator = validator; } /** * job初始化与启动. */ public job getjob() throws exception { return job.get(jobname).incrementer(new runidincrementer()) .start(syncstep()) .listener(jobexecutionlistener) .build(); } /** * 执行步骤. * * @return */ public step syncstep() throws exception { return step.get("step1") .<t, t>chunk(chunkcount) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } /** * 单条处理数据. * * @return */ public itemprocessor<t, t> processor() { validatingitemprocessor.setvalidator(processorvalidator()); return validatingitemprocessor; } /** * 校验数据. * * @return */ @bean public validator<t> processorvalidator() { return validator; } /** * 批量读数据. * * @return * @throws exception */ public abstract itemreader<t> reader() throws exception; /** * 批量写数据. * * @return */ @bean public abstract itemwriter<t> writer(); }
主要规定了公用方法的执行策略,而具体的job名称,读,写还是需要具体job去实现的。
具体job实现
@configuration @enablebatchprocessing public class syncpersonjob extends jobbase<person> { @autowired private datasource datasource; @autowired @qualifier("primaryjdbctemplate") private jdbctemplate jdbctemplate; /** * 初始化,规则了job名称和监视器. */ public syncpersonjob() { super("personjob", new personjoblistener(), new personitemprocessor(), new beanvalidator<>()); } @override public itemreader<person> reader() throws exception { stringbuffer sb = new stringbuffer(); sb.append("select * from person"); string sql = sb.tostring(); jdbccursoritemreader<person> jdbccursoritemreader = new jdbccursoritemreader<>(); jdbccursoritemreader.setsql(sql); jdbccursoritemreader.setrowmapper(new beanpropertyrowmapper<>(person.class)); jdbccursoritemreader.setdatasource(datasource); return jdbccursoritemreader; } @override @bean("personjobwriter") public itemwriter<person> writer() { jdbcbatchitemwriter<person> writer = new jdbcbatchitemwriter<person>(); writer.setitemsqlparametersourceprovider(new beanpropertyitemsqlparametersourceprovider<person>()); string sql = "insert into person_export " + "(id,name,age,nation,address) " + "values(:id, :name, :age, :nation,:address)"; writer.setsql(sql); writer.setdatasource(datasource); return writer; } }
写操作需要定义自己的bean的声明
注意,需要为每个job的write启个名称,否则在多job时,write将会被打乱
/** * 批量写数据. * * @return */ @override @bean("personverson2jobwriter") public itemwriter<person> writer() { }
添加一个api,手动触发
@autowired syncpersonjob syncpersonjob; @autowired joblauncher joblauncher; void exec(job job) throws exception { jobparameters jobparameters = new jobparametersbuilder() .addlong("time", system.currenttimemillis()) .tojobparameters(); joblauncher.run(job, jobparameters); } @requestmapping("/run1") public string run1() throws exception { exec(syncpersonjob.getjob()); return "personjob success"; }
总结
以上所述是小编给大家介绍的springbatch的封装与使用实例详解,希望对大家有所帮助
上一篇: C# 获取 PC 序列号的方法示例