欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

详解SpringBoot和SpringBatch 使用

程序员文章站 2024-02-23 11:42:52
什么是spring batch spring batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。spring batch是spring...

什么是spring batch

spring batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。spring batch是spring的一个子项目,使用java语言并基于spring框架为基础开发,使的已经使用 spring 框架的开发者或者企业更容易访问和利用企业服务。

spring batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,spring batch 同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过 spring batch 能够支持简单的、复杂的和大数据量的批处理作业。

spring batch 使用

我们首先配置spring batch 在spring boot 中的使用,数据库用的是mysql,pom文件如下,因为spring boot 中的spring batch 包含 hsqsldb 所以我们将其去除

<dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-batch</artifactid>
      <exclusions> <!-- 注意这里-->
        <exclusion>
          <groupid>org.hsqldb</groupid>
          <artifactid>hsqldb</artifactid>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-jdbc</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-web</artifactid>
    </dependency>
<dependency>
    <groupid>org.hibernate</groupid>
    <artifactid>hibernate-validator</artifactid>
  </dependency>
  <dependency>
    <groupid>mysql</groupid>
    <artifactid>mysql-connector-java</artifactid>
    <version>5.1.21</version>
  </dependency>
  <dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-test</artifactid>
    <scope>test</scope>
  </dependency>

配置好我们需要的实体类。页面就不展示了。

如果有数据校验添加的话那么我们需要配置自定义的检验器。若果没有课略过该步骤

public class csvbeanvalidator<t> implements validator<t>,initializingbean {
  private javax.validation.validator  validator;
  @override
  public void validate(t value) throws validationexception {
    set<constraintviolation<t >> constraintviolations=validator.validate(value);
    if(constraintviolations.size()>0){
      stringbuilder message=new stringbuilder();
      for(constraintviolation<t> constraintviolation:constraintviolations){
        message.append(constraintviolation.getmessage() +"\n");
      }
      throw new validationexception(message.tostring());
    }
  }
  //在这里我们使用的是jsr-303校验数据,在此进行初始化
  @override
  public void afterpropertiesset() throws exception { 
    validatorfactory validatorfactory= validation.builddefaultvalidatorfactory();
    validator=validatorfactory.usingcontext().getvalidator();
  }
}
public class csvitemprocessor extends validatingitemprocessor<person> {
  @override
  public person process(person item) throws validationexception {
     super.process(item); // 在这里启动 然后才会调用我们自定义的校验器,否则不能通过 。
     if (item.getnation().equals("汉族")){
       item.setname("01");
     }else{
       item.setnation("02");
     }
     return item;
  }
}

进行job任务监听 自定义类实现jobexecutionlistener 即可

long starttime;
 long endtime;
 @override
 public void beforejob(jobexecution jobexecution) {
   starttime = system.currenttimemillis();
   system.out.println("任务处理开始");
 }
 @override
 public void afterjob(jobexecution jobexecution) {
   endtime = system.currenttimemillis();
   system.out.println("耗时多长时间:" + (endtime - starttime) + "ms");
   system.out.println("任务处理结束");
 }

进行spring batch 的注入 方法有xml文件注入bean ,在这里选择java注入

@configuration
@enablebatchprocessing //开启批处理
public class csvbatchconfig {
  /**1 首先我们通过 flatfileitemreader 读取我们需要的文件 通过setresource来实现
   * 2 设置map 在这里通过先设置解析器 setlinetokenizer 来解析我们csv文件中的数   据
   * 3 setfieldsetmapper 将我们需要的数据转化为我们的实体对象 存储
   * 4 如果想 跳过前面的几行 需要使用setlinestoskip就可以实现 
   */ 
 @bean
 public itemreader<person> reader() throws exception {
   flatfileitemreader<person> reader = new flatfileitemreader<person>(); //1
   reader.setresource(new classpathresource("people.csv")); //2
     reader.setlinemapper(new defaultlinemapper<person>() {{ //3
       setlinetokenizer(new delimitedlinetokenizer() {{
         setnames(new string[] { "name","age", "nation" ,"address"});
       }});
       setfieldsetmapper(new beanwrapperfieldsetmapper<person>() {{
         settargettype(person.class);
       }});
     }});
       reader.setlinestoskip(3); 
     return reader;
 }
 @bean
 public itemprocessor<person, person> processor() {
   csvitemprocessor processor = new csvitemprocessor(); //1
   processor.setvalidator(csvbeanvalidator()); //2
   return processor;
 }
   /** 
    *写入数据到数据库中
    * 1执行的sql 语句 2 设置数据源 
     */
 @bean
 public itemwriter<person> writer(datasource datasource) {//1
   jdbcbatchitemwriter<person> writer = new jdbcbatchitemwriter<person>(); //2
   writer.setitemsqlparametersourceprovider(new beanpropertyitemsqlparametersourceprovider<person>());
   string sql = "insert into person " + "(id,name,age,nation,address) "
       + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
   writer.setsql(sql); //3
   writer.setdatasource(datasource);
   return writer;
 }
  // 作业的仓库 就是设置数据源
 @bean
 public jobrepository jobrepository(datasource datasource, platformtransactionmanager transactionmanager)
     throws exception {
   jobrepositoryfactorybean jobrepositoryfactorybean = new jobrepositoryfactorybean();
   jobrepositoryfactorybean.setdatasource(datasource);
   jobrepositoryfactorybean.settransactionmanager(transactionmanager);
   jobrepositoryfactorybean.setdatabasetype("mysql");
   return jobrepositoryfactorybean.getobject();
 }
   //调度器 使用它来执行 我们的批处理
 @bean
 public simplejoblauncher joblauncher(datasource datasource, platformtransactionmanager transactionmanager)
     throws exception {
   simplejoblauncher joblauncher = new simplejoblauncher();
   joblauncher.setjobrepository(jobrepository(datasource, transactionmanager));
   return joblauncher;
 }
   //将监听器加入到job中
 @bean
 public job importjob(jobbuilderfactory jobs, step s1) {
   return jobs.get("importjob")
       .incrementer(new runidincrementer())
       .flow(s1) //1
       .end()
       .listener(csvjoblistener()) //2
       .build();
 }
   //步骤绑定 reader 与writer 一次性处理65000条记录
 @bean
 public step step1(stepbuilderfactory stepbuilderfactory, itemreader<person> reader, itemwriter<person> writer,
     itemprocessor<person,person> processor) {
   return stepbuilderfactory
       .get("step1")
       .<person, person>chunk(65000) //1
       .reader(reader) //2
       .processor(processor) //3
       .writer(writer) //4
       .build();
 }
 @bean
 public csvjoblistener csvjoblistener() {
   return new csvjoblistener();
 }
 @bean
 public validator<person> csvbeanvalidator() {
   return new csvbeanvalidator<person>();
 }
}

在配置文件中 启动自动执行批处理

spring.batch.job.names = job1,job2 #启动时要执行的job,默认执行全部job

spring.batch.job.enabled=true #是否自动执行定义的job,默认是

spring.batch.initializer.enabled=true #是否初始化spring batch的数据库,默认为是

spring.batch.schema=

spring.batch.table-prefix= #设置springbatch的数据库表的前缀

项目汇总

从 项目中我们可以看到 总的步骤就是 首先读取我们需要实现的文件进行解析,然后转换成需要的实体类并且绑定到reader中,二 实现我们需要的writer 并且帮到到数据库上,三实现job监听器将其绑定到步骤中 。最后开启批处理 自动执行入库即可 。这个简单步骤主要是配置中用到的 理解流程 自己也可以方便实现 批处理的流程。

总结

以上所述是小编给大家介绍的springboot和springbatch 使用,希望对大家有所帮助