spring-batch - 从csv读取数据放入数据库
程序员文章站
2022-05-29 17:11:01
...
spring-batch 应用:从csv读取数据写入到数据库
一、spring配置文件
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-3.1.xsd">

    <!-- Root context: loads spring.properties, scans for annotated components,
         then pulls in the data-access, batch-infrastructure and job definitions. -->

    <!-- Resolve ${...} placeholders from the properties bean declared below. -->
    <context:property-placeholder properties-ref="springProperties" />

    <bean id="springProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="location" value="classpath:spring.properties" />
    </bean>

    <!-- context:annotation-config (processing of @Required, @Autowired, @PreDestroy,
         @Resource and similar, but not @Transactional) is deliberately not declared:
         component-scan below already provides that functionality. -->

    <!-- Register classes annotated with @Component, @Repository, @Service or
         @Controller under this package as Spring beans. -->
    <context:component-scan base-package="com.rollingstone.physician.ranker" />

    <import resource="spring-data.xml" />
    <import resource="spring-batch.xml" />
    <import resource="physician-spring-batch-job-.xml" />

</beans>
spring-data.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:jpa="http://www.springframework.org/schema/data/jpa"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/jdbc
           http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util-3.1.xsd">

    <!-- Data-access layer: pooled DataSource, JdbcTemplate, transaction manager,
         the DAO, and the SQL scripts executed at start-up. -->

    <!-- Resolve ${...} placeholders from the shared properties bean. -->
    <context:property-placeholder properties-ref="springProperties" />

    <!-- Enable @Transactional processing. -->
    <tx:annotation-driven transaction-manager="transactionManager" />

    <!-- c3p0 connection pool; connection settings come from the app.jdbc.* properties. -->
    <bean id="dataSource"
          class="com.mchange.v2.c3p0.ComboPooledDataSource"
          destroy-method="close"
          p:driverClass="${app.jdbc.driverClassName}"
          p:jdbcUrl="${app.jdbc.url}"
          p:user="${app.jdbc.username}"
          p:password="${app.jdbc.password}"
          p:acquireIncrement="5"
          p:idleConnectionTestPeriod="60"
          p:maxPoolSize="100"
          p:maxStatements="50"
          p:minPoolSize="10" />

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="physicianDao"
          class="com.rollingstone.physician.ranker.spring.dao.PhysicianDao">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Executed when the context starts: drop and recreate the Spring Batch
         metadata tables (HSQLDB flavour), then run the application's own script. -->
    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script location="classpath*:/org/springframework/batch/core/schema-drop-hsqldb.sql" />
        <jdbc:script location="classpath*:/org/springframework/batch/core/schema-hsqldb.sql" />
        <jdbc:script location="classpath:PhysicianData.sql" />
    </jdbc:initialize-database>

</beans>
spring-batch.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/jdbc
           http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util-3.1.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Spring Batch infrastructure shared by all jobs. -->

    <!-- Task executor handed to the job launcher. Note that SimpleAsyncTaskExecutor
         starts a new thread for every task; it does not pool or reuse threads. -->
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

    <!-- Job launcher used to run jobs; because it is given the task executor above,
         run(...) hands the job to another thread instead of executing it inline. -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="taskExecutor" />
    </bean>

    <!-- Job repository that persists job metadata through the shared DataSource. -->
    <batch:job-repository id="jobRepository"
                          data-source="dataSource"
                          transaction-manager="transactionManager"
                          isolation-level-for-create="DEFAULT" />

</beans>
physician-spring-batch-job-.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/jdbc
           http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util-3.1.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Job that loads the black list from a CSV file into the database.
         The default namespace of this file is "beans", so every Spring Batch
         element, including the children of batch:step, must carry the batch: prefix. -->
    <batch:job id="blackListJob" job-repository="jobRepository">
        <batch:step id="blackListLoad">
            <batch:tasklet>
                <!-- commit-interval: number of items processed per transaction commit. -->
                <batch:chunk reader="blackListFileItemReader"
                             processor="blackListItemProcessor"
                             writer="blackListItemWriter"
                             commit-interval="${job.commit.interval}"
                             skip-limit="${job.skip_limit}"
                             retry-limit="3">
                    <!-- Exceptions on which the offending item is skipped. -->
                    <batch:skippable-exception-classes>
                        <batch:include class="org.springframework.batch.item.file.FlatFileParseException" />
                        <batch:exclude class="java.io.FileNotFoundException" />
                    </batch:skippable-exception-classes>
                    <!-- Exceptions on which the item is retried (up to retry-limit). -->
                    <batch:retryable-exception-classes>
                        <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException" />
                        <batch:exclude class="java.io.FileNotFoundException" />
                    </batch:retryable-exception-classes>
                </batch:chunk>
                <!-- Exceptions that must not roll back the transaction. This element
                     is a child of tasklet, not of chunk. -->
                <batch:no-rollback-exception-classes>
                    <batch:include class="org.springframework.batch.item.validator.ValidationException" />
                </batch:no-rollback-exception-classes>
            </batch:tasklet>
        </batch:step>
        <batch:listeners>
            <batch:listener ref="jobExecutionListenerDemo" />
            <batch:listener ref="stepExecutionListenerDemo" />
        </batch:listeners>
        <batch:validator ref="jobParametersValidatorDemo" />
    </batch:job>

    <!-- Reads the CSV file line by line and maps each line to a domain object. -->
    <bean id="blackListFileItemReader"
          class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="classpath:csv/BlackListFie.csv" />
        <!-- Skip the first line of the file. -->
        <property name="linesToSkip" value="1" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="NAME,CARD_TYPE,CARD_NUM" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="com.joandora.spring.batch.spring.batch.BlackListFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

</beans>
二、主程序入口
/**
 * Application entry point: boots the Spring context from
 * {@code applicationContext.xml} and triggers the black-list load, which reads
 * records from a file and stores them in the database.
 */
public class Startup {

    /**
     * Starts the Spring context and launches the black-list loader.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the context cannot be created or the launch fails
     */
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        // Release the context (and the pooled DataSource's destroy-method) when the
        // JVM exits. It is not closed right after loadBlackList() because the job
        // launcher is configured with an asynchronous task executor, so the job may
        // still be running when loadBlackList() returns.
        context.registerShutdownHook();

        BlackListLoaderJob blackListLoaderTask =
                context.getBean("blackListLoaderJob", BlackListLoaderJob.class);
        blackListLoaderTask.loadBlackList();
    }
}
blackListLoaderJob是通过@Component注册的BlackListLoaderJob bean(见第六节),它负责启动physician-spring-batch-job-.xml中定义的批处理任务blackListJob。
使用如下标签定义:
<!-- The id must be "blackListJob": that is the name used in
     physician-spring-batch-job-.xml and in @Qualifier("blackListJob"). -->
<batch:job id="blackListJob" job-repository="jobRepository"> </batch:job>
一个job可以定义多个step,可以理解成一个任务分为多少个步骤:
<batch:step id="blackListLoad"> </batch:step>
在step里面定义一个tasklet:
<!-- Inside a beans-namespace file these elements need the batch: prefix. -->
<batch:tasklet>
    <!-- commit-interval: number of items processed per transaction commit. -->
    <batch:chunk reader="blackListFileItemReader"
                 processor="blackListItemProcessor"
                 writer="blackListItemWriter"
                 commit-interval="${job.commit.interval}"
                 skip-limit="${job.skip_limit}"
                 retry-limit="3">
        <batch:skippable-exception-classes>
            <batch:include class="org.springframework.batch.item.file.FlatFileParseException" />
            <batch:exclude class="java.io.FileNotFoundException" />
        </batch:skippable-exception-classes>
        <batch:retryable-exception-classes>
            <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException" />
            <batch:exclude class="java.io.FileNotFoundException" />
        </batch:retryable-exception-classes>
    </batch:chunk>
    <!-- no-rollback-exception-classes is a child of tasklet, not of chunk. -->
    <batch:no-rollback-exception-classes>
        <batch:include class="org.springframework.batch.item.validator.ValidationException" />
    </batch:no-rollback-exception-classes>
</batch:tasklet>
chunk的reader属性指定从文件读取数据的组件,processor对读取到的数据进行自定义处理,writer将处理过的数据写入数据库。
commit-interval属性是一次事务提交时处理的记录数。skip-limit属性是允许跳过的最大记录数。retry-limit属性是单条记录允许重试的最大次数。
skippable-exception-classes子元素可以定义报某种错时就跳过该条记录的处理。
retryable-exception-classes子元素可以定义报某种错时就重试该条记录。
no-rollback-exception-classes子元素可以定义报某种错时事务不会回滚。
三、读取csv文件
<!-- Flat-file reader that turns each CSV line into a domain object. -->
<bean id="blackListFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <!-- Location of the CSV file. -->
    <property name="resource" value="classpath:csv/BlackListFie.csv" />
    <!-- Number of lines to skip at the start of the file. -->
    <property name="linesToSkip" value="1" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <!-- Splits a line into the named fields. -->
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="NAME,CARD_TYPE,CARD_NUM" />
                </bean>
            </property>
            <!-- Maps the tokenized fields onto the domain object. -->
            <property name="fieldSetMapper">
                <bean class="com.joandora.spring.batch.spring.batch.BlackListFieldSetMapper" />
            </property>
        </bean>
    </property>
</bean>
属性解释:
resource:csv文件所在路径
linesToSkip:csv文件读取数据时,跳过的起始行数
四、处理从csv文件读取的每条数据
/**
 * Processor step of the black-list job. It currently performs no
 * transformation and hands every item on unchanged.
 */
@Component
public class BlackListItemProcessor
        implements ItemProcessor<BlackListDomain, BlackListDomain> {

    /**
     * Returns the given item as-is.
     *
     * @param item the record read from the CSV file
     * @return the same instance that was passed in
     * @throws Exception declared by the {@code ItemProcessor} contract; not thrown here
     */
    @Override
    public BlackListDomain process(BlackListDomain item) throws Exception {
        return item;
    }
}
五、将处理过的数据写入数据库
/**
 * Writer step of the black-list job: persists each item of a chunk through
 * the DAO.
 */
@Component
public class BlackListItemWriter implements ItemWriter<BlackListDomain> {

    private static final Logger LOG = LoggerFactory.getLogger(BlackListItemWriter.class);

    @Resource
    private IBlackListDao physicianDao;

    /**
     * Stores every item of the chunk, then queries the black list once.
     *
     * @param blackListDomains the items of the current chunk
     * @throws Exception any failure raised by the DAO; it is logged and rethrown
     *         so that the step's skip, retry and rollback configuration can act
     *         on it instead of the chunk being committed as if it had succeeded
     */
    @Override
    public void write(List<? extends BlackListDomain> blackListDomains) throws Exception {
        try {
            for (BlackListDomain blackListDomain : blackListDomains) {
                physicianDao.loadBlackList(blackListDomain);
            }
            physicianDao.queryBlackList();
        } catch (Exception ple) {
            // Do not swallow: a silently ignored failure would hide data loss and
            // disable the retryable/skippable exception handling of the step.
            LOG.error("Failed to write black list chunk", ple);
            throw ple;
        }
    }
}
六、BlackListLoaderJob
/**
 * Launches the {@code blackListJob} batch job through the injected
 * {@link JobLauncher}.
 */
@Component
public class BlackListLoaderJob {

    private static final Logger LOG = LoggerFactory.getLogger(BlackListLoaderJob.class);

    private JobLauncher jobLauncher;

    private Job blackListJob;

    public Job getPhysicianJob() {
        return blackListJob;
    }

    /** Injects the job bean named {@code blackListJob}. */
    @Autowired
    @Qualifier("blackListJob")
    public void setPhysicianJob(Job physicianJob) {
        this.blackListJob = physicianJob;
    }

    public JobLauncher getJobLauncher() {
        return jobLauncher;
    }

    @Autowired
    @Required
    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    /**
     * Runs the black-list job with a single {@code date} parameter set to the
     * current time, which makes every invocation a new job instance.
     *
     * @throws RuntimeException wrapping any launch failure other than
     *         {@code JobInstanceAlreadyCompleteException}, which is only logged
     */
    public void loadBlackList() {
        // Log the actual method name (the original message named "sendEvents").
        LOG.info("Entered loadBlackList");
        try {
            Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();
            parameters.put("date", new JobParameter(new Date()));
            jobLauncher.run(blackListJob, new JobParameters(parameters));
        } catch (JobInstanceAlreadyCompleteException ex) {
            LOG.debug("This job has been completed already!");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        System.out.println("over----------------");
    }
}
七、pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.joandora</groupId>
        <artifactId>spring-batch-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>spring-batch-writers</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <!-- Library versions -->
        <spring.core.version>4.2.0.RELEASE</spring.core.version>
        <spring.data.jpa.version>1.7.1.RELEASE</spring.data.jpa.version>
        <spring.batch.version>3.0.7.RELEASE</spring.batch.version>
        <cglib.version>2.2</cglib.version>
        <aspectj.version>1.8.2</aspectj.version>
        <c3p0.version>0.9.1.2</c3p0.version>
        <querydsl.version>2.2.5</querydsl.version>
        <slf4j.version>1.7.13</slf4j.version>
        <log4j.version>1.2.17</log4j.version>

        <!-- Testing -->
        <junit.version>4.12</junit.version>

        <!-- Plugins -->
        <maven.copy.plugin.version>0.2.3</maven.copy.plugin.version>
        <maven.compiler.plugin.version>2.3.2</maven.compiler.plugin.version>
        <maven.apt.plugin.version>1.0</maven.apt.plugin.version>
    </properties>

    <dependencies>
        <!-- Spring Framework -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Spring Batch -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>

        <!-- A seamless aspect-oriented extension to the Java programming language -->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>${aspectj.version}</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>${aspectj.version}</version>
        </dependency>

        <!-- Cglib is a code generation library used to extend Java classes and
             implement interfaces at runtime. -->
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib-nodep</artifactId>
            <version>${cglib.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Logger -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- SLF4J is a facade over logging frameworks (java.util.logging, log4j,
             logback) that lets the logging backend be chosen at deployment time. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Spring Data JPA -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-jpa</artifactId>
            <version>${spring.data.jpa.version}</version>
        </dependency>

        <!-- Database pooling -->
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>${c3p0.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Testing dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>

        <!-- HSQLDB -->
        <dependency>
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
            <version>2.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Resolves to 2.3.2 via the property declared above. -->
                <version>${maven.compiler.plugin.version}</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Write the dependency classpath into MANIFEST.MF. -->
                            <addClasspath>true</addClasspath>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>