欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

详解spring batch的使用和定时器Quart的使用

程序员文章站 2024-02-19 09:56:52
spring batch是一个基于spring的企业级批处理框架,它通过配合定时器quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理。 在使用...

spring batch是一个基于spring的企业级批处理框架,它通过配合定时器quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理。

在使用spring batch之前,得对spring batch的流程有一个基本了解

每个batch它都包含了一个job,而一个job中却有可能包含多个step,整个batch中干活的是step,batch主要是用来对数据的操作,所以step就有三个操作数据的东西,一个是itemreader用来读取数据的,一个是itemprocessor用来处理数据的,一个是itemwriter用来写数据(可以是文件也可以是插入sql语句),joblauncher用来启动job,jobrepository是上述处理提供的一种持久化机制,它为joblauncher,job,和step实例提供crud操作。

pom.xml  三个batch的jar包

<dependency> 
     <groupid>org.springframework</groupid> 
     <artifactid>spring-batch-core</artifactid> 
     <version>2.1.8.release</version> 
    </dependency> 
     
    <dependency> 
     <groupid>org.springframework</groupid> 
     <artifactid>spring-batch-infrastructure</artifactid> 
     <version>2.1.8.release</version> 
<dependency> 
     
     <dependency> 
     <groupid>org.springframework</groupid> 
     <artifactid>spring-batch-test</artifactid> 
     <version>2.1.8.release</version> 
    </dependency>  

batch.xml

<beans xmlns="http://www.springframework.org/schema/beans" 
  xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" 
  xsi:schemalocation="http://www.springframework.org/schema/batch 
    http://www.springframework.org/schema/batch/spring-batch-2.1.xsd 
    http://www.springframework.org/schema/beans  
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd 
  "> 
 
 
  <bean id="joblauncher" 
    class="org.springframework.batch.core.launch.support.simplejoblauncher"> 
    <property name="jobrepository" ref="jobrepository" /> 
  </bean> 
 
  <bean id="jobrepository" 
    class="org.springframework.batch.core.repository.support.mapjobrepositoryfactorybean"> 
    <property name="validatetransactionstate" value="false" /> 
 
  </bean> 
<!--一个job--> 
    <batch:job id="writerteacherinterview"> 
        <batch:step id="teacherinterview"> 
          <batch:tasklet> 
            <batch:chunk reader="jdbcitemreaderteacherinterview" writer="teacherinterviewitemwriter" 
              processor="teacherinterviewprocessor" commit-interval="10"> 
            </batch:chunk> 
          </batch:tasklet> 
        </batch:step> 
      </batch:job> 
 
 
 
 
 
  <!--job的读取数据操作--> 
  <bean id="jdbcitemreaderteacherinterview" 
    class="org.springframework.batch.item.database.jdbccursoritemreader" 
    scope="step"> 
    <property name="datasource" ref="datasource" /> 
    <property name="sql" 
      value="select distinct teachername ,count(teachername) as num from examininterviewrecord  where pdate >'${detail_startime}' and pdate < '${detail_endtime}' group by teachername " /> 
    <property name="rowmapper" ref="teacherinterviewmapper"> 
    </property> 
  </bean> 
 
 
</beans> 

读取数据    teacherinterviewmapper

package com.yc.batch; 
 
import java.sql.resultset; 
import java.sql.sqlexception; 
import org.springframework.jdbc.core.rowmapper; 
import org.springframework.stereotype.component; 
import com.yc.vo.teacherinterviewdetail; 
import com.yc.vo.teacherworkdetail; 
import com.yc.vo.workdetail; 
@component("teacherinterviewmapper")  
public class teacherinterviewmapper implements rowmapper {  
  @override 
  public object maprow(resultset rs, int rownum) throws sqlexception {  
     
    teacherinterviewdetail tid=new teacherinterviewdetail(); 
    tid.setteachername(rs.getstring("teachername")); 
     tid.setnum(rs.getint("num")); 
    return tid;  
  } 
}  

处理数据  teacherinterviewprocessor ,这个处理数据方法,一般都是在这里在这里进行一些数据的加工,比如有些数据没有读到,你也可以在这个方法和后面那个写入数据的类里面写,所以就导致了这个类里面你可以什么都不敢,直接把数据抛到后面去,让后面的写数据类来处理;我这里就是处理数据的这个类什么都没写,但是最好还是按它的规则来!

package com.yc.batch; 
 
import org.hibernate.engine.transaction.jta.platform.internal.synchronizationregistrybasedsynchronizationstrategy; 
import org.springframework.batch.item.itemprocessor; 
import org.springframework.stereotype.component; 
import org.springframework.stereotype.service; 
 
import com.yc.vo.teacherinterviewdetail; 
import com.yc.vo.teacherworkdetail; 
import com.yc.vo.workdetail; 
 
 
//业务层 
@component("teacherinterviewprocessor") 
public class teacherinterviewprocessor implements itemprocessor<teacherinterviewdetail, teacherinterviewdetail> { 
 
  @override 
  public teacherinterviewdetail process(teacherinterviewdetail teacherinterviewdetail) throws exception { 
      
    return teacherinterviewdetail; 
  } 
} 

写数据 teacherinterviewitemwriter 这个类里面主要是把数据写进一个文件里,同时我这个类里面还有一些数据处理

package com.yc.batch; 
 
import java.io.inputstream; 
 
import java.text.numberformat; 
import java.util.arraylist; 
import java.util.list; 
import java.util.properties; 
import javax.annotation.resource; 
import org.springframework.batch.item.itemwriter; 
import org.springframework.stereotype.component; 
import org.springframework.stereotype.service; 
import com.yc.biz.examineeclassbiz; 
import com.yc.biz.workbiz; 
import com.yc.utils.csvutils; 
import com.yc.vo.teacherinterviewdetail; 
import com.yc.vo.teacherworkdetail; 
import com.yc.vo.workdetail; 
import net.sf.ehcache.util.propertyutil; 
//写 
@component("teacherinterviewitemwriter") 
public class teacherinterviewitemwriter implements itemwriter<teacherinterviewdetail>{ 
 
  @override 
  public void write(list<? extends teacherinterviewdetail> teacherinterviewdetails) throws exception { 
    properties props = new properties(); 
    inputstream in= propertyutil.class.getclassloader().getresourceasstream("connectionconfig.properties"); 
    props.load(in); 
    string time=props.getproperty("detail_time"); 
    csvutils cu=new csvutils(); 
     list<object> works=new arraylist<object>(); 
     for(teacherinterviewdetail t:teacherinterviewdetails){ 
       works.add(t); 
     } 
      
    string path=this.getclass().getresource("/").getpath(); 
    path=path.substring(0,path.lastindexof("/")); 
    path=path.substring(0,path.lastindexof("/")); 
    path=path.substring(0,path.lastindexof("/")); 
    path=path.substring(0,path.lastindexof("/")); 
    cu.writecsv(path+"/csv/teacherinterview_"+time+".csv",works );  
  }  
} 

我这里有用到一个吧数据写进csv文件的jar包

<dependency> 
     <groupid>net.sourceforge.javacsv</groupid> 
     <artifactid>javacsv</artifactid> 
     <version>2.0</version> 
    </dependency> 

csvutils帮助类的写入csv文件方法

/** 
   * 写入csv文件 
   * @throws ioexception 
   */  
  public void writecsv(string path,list<object> t) throws ioexception{  
    string csvfilepath = path; 
    string filepath=path.substring(0,path.lastindexof("/")); 
    file f=new file(filepath); 
    if(!f.exists()){ 
      f.mkdirs(); 
    } 
    file file=new file(path); 
    if(!file.exists()){ 
      file.createnewfile(); 
    } 
    csvwriter wr =new csvwriter(csvfilepath,',',charset.forname("gbk")); 
    try {  
      for(object obj:t){ 
        string[] contents=obj.tostring().split(","); 
        wr.writerecord(contents);  
      } 
      wr.close();  
    } catch (ioexception e) {  
      e.printstacktrace();  
    }  
  }  

就这样一个基本的batch流程就跑起来了,它通过从数据里读取一些数据,然后经过处理后,被存进服务器下的一个文件里面,之后像这种数据的读取就不需要去数据库里面查询了,而是可以直接通过读取csv文件来处理这个业务。一般使用这个的都会配一个定时器,让它们每隔一段时间跑一次,从而获得较新的数据

下面是定时器的配置

定时器的配置非常简单,我是使用注解方式来配置的

定时器任务类

package com.yc.task.impl; 
import javax.transaction.transactional; 
import org.springframework.batch.core.jobparametersinvalidexception; 
import org.springframework.batch.core.repository.jobexecutionalreadyrunningexception; 
import org.springframework.batch.core.repository.jobinstancealreadycompleteexception; 
import org.springframework.batch.core.repository.jobrestartexception; 
import org.springframework.batch.item.itemprocessor; 
import org.springframework.beans.factory.annotation.autowired; 
import org.springframework.scheduling.annotation.scheduled; 
import org.springframework.stereotype.component; 
import org.springframework.stereotype.service; 
 
import com.yc.batch.classbatch; 
import com.yc.batch.messageitembatch; 
import com.yc.batch.teacherinterviewbatch; 
import com.yc.batch.tearcherbatch; 
import com.yc.po.work; 
import com.yc.task.worktask; 
import com.yc.vo.workdetail; 
@service 
public class worktaskimpl implements worktask{ 
 
  @autowired 
  private teacherinterviewbatch teacherinterviewbatch;//教师访谈记录 
  public void setteacherinterviewbatch(teacherinterviewbatch teacherinterviewbatch) { 
    this.teacherinterviewbatch = teacherinterviewbatch; 
  } 
   
  @scheduled(cron= "0 30 22 * * ?")  //每天晚上十点30执行一次 这个注解会让框架会自动把这个方法看成任务启动方法  
  @override 
  public void task() { 
    try { 
      teacherinterviewbatch.test();//教师访谈 
    } catch (exception e) { 
      e.printstacktrace(); 
    } 
     
  } 
 
} 

定时器所真正要执行的方法

package com.yc.batch; 
 
import javax.annotation.resource; 
import org.apache.commons.jexl2.main; 
import org.springframework.batch.core.job; 
import org.springframework.batch.core.jobexecution; 
import org.springframework.batch.core.jobparameters; 
import org.springframework.batch.core.jobparametersbuilder; 
import org.springframework.batch.core.jobparametersinvalidexception; 
import org.springframework.batch.core.launch.joblauncher; 
import org.springframework.batch.core.repository.jobexecutionalreadyrunningexception; 
import org.springframework.batch.core.repository.jobinstancealreadycompleteexception; 
import org.springframework.batch.core.repository.jobrestartexception; 
import org.springframework.beans.factory.annotation.autowired; 
import org.springframework.stereotype.component; 
 
@component 
public class teacherinterviewbatch { 
 
  private job job; 
  private joblauncher launcher; 
 
  @resource(name="writerteacherinterview") 
  public void setjob(job job) { 
    this.job = job; 
  } 
 
  @autowired  
  public void setlauncher(joblauncher launcher) { 
    this.launcher = launcher; 
  } 

  以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。