欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

springbatch的封装与使用实例详解

程序员文章站 2024-02-11 09:42:34
spring batch官网介绍: a lightweight, comprehensive batch framework designed to enable the...

spring batch官网介绍:

a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。)

springbatch

主要实现批量数据的处理,我对batch进行的封装,提出了jobbase类型,具体job需要实现它即可。spring batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。

几个组件

•job
•step
•read
•write
•listener
•process
•validator

jobbase定义了几个公用的方法

 /**
 * springbatch的job基础类.
 */
 public abstract class jobbase<t> {
  /**
  * 批次.
  */
  protected int chunkcount = 5000;
  /**
  * 监听器.
  */
  private jobexecutionlistener jobexecutionlistener;
  /**
  * 处理器.
  */
  private validatingitemprocessor<t> validatingitemprocessor;
  /**
  * job名称.
  */
  private string jobname;
  /**
  * 检验器.
  */
  private validator<t> validator;
  @autowired
  private jobbuilderfactory job;
  @autowired
  private stepbuilderfactory step;
  /**
  * 初始化.
  *
  * @param jobname         job名称
  * @param jobexecutionlistener  监听器
  * @param validatingitemprocessor 处理器
  * @param validator        检验
  */
  public jobbase(string jobname,
         jobexecutionlistener jobexecutionlistener,
         validatingitemprocessor<t> validatingitemprocessor,
         validator<t> validator) {
   this.jobname = jobname;
   this.jobexecutionlistener = jobexecutionlistener;
   this.validatingitemprocessor = validatingitemprocessor;
   this.validator = validator;
  }
  /**
  * job初始化与启动.
  */
  public job getjob() throws exception {
   return job.get(jobname).incrementer(new runidincrementer())
     .start(syncstep())
     .listener(jobexecutionlistener)
     .build();
  }
  /**
  * 执行步骤.
  *
  * @return
  */
  public step syncstep() throws exception {
   return step.get("step1")
     .<t, t>chunk(chunkcount)
     .reader(reader())
     .processor(processor())
     .writer(writer())
     .build();
  }
  /**
  * 单条处理数据.
  *
  * @return
  */
  public itemprocessor<t, t> processor() {
   validatingitemprocessor.setvalidator(processorvalidator());
   return validatingitemprocessor;
  }
  /**
  * 校验数据.
  *
  * @return
  */
  @bean
  public validator<t> processorvalidator() {
   return validator;
  }
  /**
  * 批量读数据.
  *
  * @return
  * @throws exception
  */
  public abstract itemreader<t> reader() throws exception;
  /**
  * 批量写数据.
  *
  * @return
  */
  @bean
  public abstract itemwriter<t> writer();
 }

主要规定了公用方法的执行策略,而具体的job名称,读,写还是需要具体job去实现的。

具体job实现

 @configuration
 @enablebatchprocessing
 public class syncpersonjob extends jobbase<person> {
  @autowired
  private datasource datasource;
  @autowired
  @qualifier("primaryjdbctemplate")
  private jdbctemplate jdbctemplate;
  /**
  * 初始化,规则了job名称和监视器.
  */
  public syncpersonjob() {
   super("personjob", new personjoblistener(), new personitemprocessor(), new beanvalidator<>());
  }
  @override
  public itemreader<person> reader() throws exception {
   stringbuffer sb = new stringbuffer();
   sb.append("select * from person");
   string sql = sb.tostring();
   jdbccursoritemreader<person> jdbccursoritemreader =
     new jdbccursoritemreader<>();
   jdbccursoritemreader.setsql(sql);
   jdbccursoritemreader.setrowmapper(new beanpropertyrowmapper<>(person.class));
   jdbccursoritemreader.setdatasource(datasource);
   return jdbccursoritemreader;
  }
  @override
  @bean("personjobwriter")
  public itemwriter<person> writer() {
   jdbcbatchitemwriter<person> writer = new jdbcbatchitemwriter<person>();
   writer.setitemsqlparametersourceprovider(new beanpropertyitemsqlparametersourceprovider<person>());
   string sql = "insert into person_export " + "(id,name,age,nation,address) "
     + "values(:id, :name, :age, :nation,:address)";
   writer.setsql(sql);
   writer.setdatasource(datasource);
   return writer;
  }
 }

写操作需要定义自己的bean的声明

注意,需要为每个job的write启个名称,否则在多job时,write将会被打乱

 /**
  * 批量写数据.
  *
  * @return
  */
 @override
 @bean("personverson2jobwriter")
 public itemwriter<person> writer() {
  
 }

添加一个api,手动触发

 @autowired
 syncpersonjob syncpersonjob;
 @autowired
 joblauncher joblauncher;
 void exec(job job) throws exception {
  jobparameters jobparameters = new jobparametersbuilder()
    .addlong("time", system.currenttimemillis())
    .tojobparameters();
  joblauncher.run(job, jobparameters);
 }
 @requestmapping("/run1")
 public string run1() throws exception {
  exec(syncpersonjob.getjob());
  return "personjob success";
 }

总结

以上所述是小编给大家介绍的springbatch的封装与使用实例详解,希望对大家有所帮助