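Handling special text in Spring Batch with a custom LineMapper
spring-batch is the batch-processing framework of the Java Spring ecosystem, and it also provides simple text-processing facilities.
The example below implements a flow that downloads a text file and then processes it.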
<!-- Tasks are split into multiple Task items; a Task launches a job; a job is divided into steps -->
<batch:job id="wxJob" job-repository="jobRepository" restartable="true">
    <batch:step id="downloadWxFile" next="analyseInfo">
        <batch:tasklet>
            <bean class="com.secondgame.demo_service.demo.batch.task.WxDownloadStep" scope="step">
                <property name="filenamePrefix" value="#{jobParameters['filenamePrefix']}"></property>
                <property name="date" value="#{jobParameters['date']}"></property>
                <property name="datetime" value="#{jobParameters['datetime']}"></property>
                <property name="workpath" value="#{jobParameters['workpath']}"></property>
                <property name="filename" value="#{jobParameters['filename']}"></property>
            </bean>
        </batch:tasklet>
    </batch:step>
    <batch:step id="analyseInfo">
        <!-- <bean class="com.secondgame.demo_service.demo.batch.task.depositor.LoadStep" scope="step"> </bean> -->
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="wxReader" writer="wxWriter" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- wx Detail -->
<bean id="wxReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="file:#{jobParameters['filename']}"></property>
    <property name="encoding" value="UTF-8"></property>
    <property name="comments" value="#{'总'}"></property>
    <property name="lineMapper" ref="wxMapper"></property>
    <property name="linesToSkip" value="1"></property>
</bean>

<bean id="wxWriter" class="com.secondgame.demo_service.demo.batch.task.WxWriter" scope="step">
</bean>

<bean id="wxMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <!-- <bean id="wxMapper" class="com.secondgame.demo_service.demo.batch.task.MyLineMapper"> -->
    <property name="lineTokenizer" ref="wxMultiTokenizer"></property>
    <property name="fieldSetMapper">
        <bean class="com.secondgame.demo_service.demo.batch.task.WxFileSetMapper">
        </bean>
    </property>
</bean>

<bean id="wxMultiTokenizer" class="org.springframework.batch.item.file.transform.PatternMatchingCompositeLineTokenizer" scope="step">
    <!-- <property name="delimiter" value=","></property> -->
    <property name="tokenizers">
        <map>
            <entry key="*,*,*,*,*" value-ref="wxTokenizerTail"></entry>
            <entry key="*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*" value-ref="wxTokenizer"></entry>
        </map>
    </property>
</bean>

<bean id="wxTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" scope="step">
    <property name="delimiter" value=","></property>
    <property name="names">
        <list>
            <value>tradeTime</value>
            <value>pubAccountId</value>
            <value>merchantId</value>
            <value>subMerchantId</value>
            <value>deviceId</value>
            <value>wxOrderId</value>
            <value>merchantOrderId</value>
            <value>userTag</value>
            <value>tradeType</value>
            <value>tradeStatus</value>
            <value>payerBank</value>
            <value>capitalType</value>
            <value>totalAmount</value>
            <value>enterpriseRedAmount</value>
            <value>wxRefundId</value>
            <value>merchantRefundId</value>
            <value>refundAmount</value>
            <value>enterpriseRedRefundAmount</value>
            <value>refundType</value>
            <value>refundStatus</value>
            <value>goodsName</value>
            <value>merchantData</value>
            <value>fee</value>
            <value>feeRate</value>
        </list>
    </property>
</bean>

<bean id="wxTokenizerTail" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" scope="step">
    <property name="delimiter" value=","></property>
    <property name="names">
        <list>
            <value>totalCount</value>
            <value>totalAmount</value>
            <value>refundAmount</value>
            <value>enterpriseRedRefundAmount</value>
            <value>fee</value>
        </list>
    </property>
</bean>
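As you can see, DefaultLineMapper provides simple text handling: it delegates the splitting to wxMultiTokenizer and the mapping of the resulting fields to WxFileSetMapper. Both are injected as beans. The source of Spring's DefaultLineMapper is as follows: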
package org.springframework.batch.item.file.mapping;

import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

/**
 * Two-phase {@link LineMapper} implementation consisting of tokenization of the line into {@link FieldSet} followed by
 * mapping to item. If finer grained control of exceptions is needed, the {@link LineMapper} interface should be
 * implemented directly.
 *
 * @author Robert Kasanicky
 * @author Lucas Ward
 *
 * @param <T> type of the item
 */
public class DefaultLineMapper<T> implements LineMapper<T>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    @Override
    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }

    @Override
    public void afterPropertiesSet() {
        Assert.notNull(tokenizer, "The LineTokenizer must be set");
        Assert.notNull(fieldSetMapper, "The FieldSetMapper must be set");
    }
}
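The core method, mapLine, is a single statement: return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
It hands the tokenizer's output straight to fieldSetMapper.mapFieldSet, so the user only needs to implement mapFieldSet to decide how the fields are handled. How to tokenize and how to map are not the concern of this article; I will cover them separately later.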
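To make the two phases concrete, here is a minimal, framework-free sketch of the same tokenize-then-map shape. SketchTokenizer, SketchFieldMapper, TwoPhaseMapper, TailSummary and TwoPhaseDemo are stand-ins invented for this illustration, not Spring Batch types, and the sample line is made up. Only the field names are borrowed from the wxTokenizerTail configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a LineTokenizer: splits a line into named fields. */
interface SketchTokenizer {
    Map<String, String> tokenize(String line);
}

/** Hypothetical stand-in for a FieldSetMapper: turns named fields into an item. */
interface SketchFieldMapper<T> {
    T mapFields(Map<String, String> fields);
}

/** Illustrative item type; which fields it keeps is an assumption. */
final class TailSummary {
    final int totalCount;
    final String totalAmount;

    TailSummary(int totalCount, String totalAmount) {
        this.totalCount = totalCount;
        this.totalAmount = totalAmount;
    }
}

/** Same shape as DefaultLineMapper.mapLine: tokenize first, then map. */
final class TwoPhaseMapper<T> {
    private final SketchTokenizer tokenizer;
    private final SketchFieldMapper<T> fieldMapper;

    TwoPhaseMapper(SketchTokenizer tokenizer, SketchFieldMapper<T> fieldMapper) {
        this.tokenizer = tokenizer;
        this.fieldMapper = fieldMapper;
    }

    T mapLine(String line) {
        return fieldMapper.mapFields(tokenizer.tokenize(line));
    }
}

final class TwoPhaseDemo {
    /** Builds a comma tokenizer that labels each value with the given name. */
    static SketchTokenizer commaTokenizer(String... names) {
        return line -> {
            String[] values = line.split(",", -1);
            Map<String, String> fields = new LinkedHashMap<>();
            for (int i = 0; i < names.length && i < values.length; i++) {
                fields.put(names[i], values[i]);
            }
            return fields;
        };
    }

    /** Maps one summary line using the names from the wxTokenizerTail bean. */
    static TailSummary mapTail(String line) {
        TwoPhaseMapper<TailSummary> mapper = new TwoPhaseMapper<TailSummary>(
                commaTokenizer("totalCount", "totalAmount", "refundAmount",
                        "enterpriseRedRefundAmount", "fee"),
                fields -> new TailSummary(
                        Integer.parseInt(fields.get("totalCount")),
                        fields.get("totalAmount")));
        return mapper.mapLine(line);
    }
}
```

The point of the split is that the tokenizer knows nothing about the item type and the mapper knows nothing about delimiters, so either side can be swapped independently.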
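There is a problem here: if the file contains an empty line, that line is not skipped. It is still passed to tokenizer.tokenize(line) as "" rather than null, and the mapping then fails with an exception. The options FlatFileItemReader exposes did not give me a way to skip empty lines (I have not found one so far), so I made a simple change and implemented my own LineMapper: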
package com.secondgame.demo_service.demo.batch.task;

import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

import com.alibaba.dubbo.common.utils.StringUtils;

/**
 * Copy of DefaultLineMapper whose mapLine returns null for an empty line
 * instead of passing it to the tokenizer.
 */
public class MyLineMapper<T> implements LineMapper<T>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    @Override
    public T mapLine(String line, int lineNumber) throws Exception {
        if (StringUtils.isNotEmpty(line)) {
            // Non-empty line: same two-phase mapping as DefaultLineMapper
            return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
        } else {
            // Empty line: nothing to tokenize, so no item is produced
            return null;
        }
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }

    @Override
    public void afterPropertiesSet() {
        Assert.notNull(tokenizer, "The LineTokenizer must be set");
        Assert.notNull(fieldSetMapper, "The FieldSetMapper must be set");
    }
}
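The code is otherwise identical; only mapLine treats the empty string specially. Note that a null item from a reader is how Spring Batch recognizes the end of input, so this is safe for empty lines at the end of the file, while an empty line in the middle would end the read early.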
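The effect of a null return can be modelled with plain JDK code. As far as I can tell, FlatFileItemReader passes the mapper's result straight back to the step, and a reader that returns null is telling the step there is no more input. The sketch below imitates that contract: NullEndsReadSketch, mapLine and readAll are hypothetical stand-ins, not framework methods, and the upper-casing is just a placeholder for real mapping.

```java
import java.util.ArrayList;
import java.util.List;

final class NullEndsReadSketch {

    /** Blank-line guard in the style of MyLineMapper.mapLine: null for an empty line. */
    static String mapLine(String line) {
        if (line == null || line.isEmpty()) {
            return null;
        }
        return line.toUpperCase(); // placeholder for tokenize + mapFieldSet
    }

    /** Models a step's read loop: it stops at the first null item. */
    static List<String> readAll(List<String> lines) {
        List<String> items = new ArrayList<>();
        for (String line : lines) {
            String item = mapLine(line);
            if (item == null) {
                break; // a null item is read as "no more input"
            }
            items.add(item);
        }
        return items;
    }
}
```

With the lines "a", "b", "" every record is read, whereas with "a", "", "b" the loop stops after "a". If the files can contain empty lines in the middle, the guard needs to sit somewhere that can skip a line without producing a null item.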
Of course, one line in the XML still has to change so that it references the custom mapper:
<!--<bean id="wxMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">-->
<bean id="wxMapper" class="com.secondgame.demo_service.demo.batch.task.MyLineMapper">
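After all that writing, it comes down to a few points:
1. Copy DefaultLineMapper and rewrite its mapLine method so that it copes with empty lines.
2. Use the custom MyLineMapper class in the configuration file.
3. Other, more complex requirements can be met the same way, for example filtering out lines that contain certain special characters (not only at the start of the line) or replacing particular text (A with B). All of these can be handled here at the line level.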
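As a rough illustration of the replacement case, the sketch below wraps a mapper in a decorator that rewrites each line before delegating. SketchLineMapper is a hypothetical stand-in with the same method shape as Spring Batch's LineMapper. PreprocessingLineMapper and PreprocessingDemo are names made up for this example, and the "A" to "B" rule is the placeholder replacement from the list above.

```java
import java.util.function.UnaryOperator;

/** Hypothetical stand-in with the same method shape as Spring Batch's LineMapper. */
interface SketchLineMapper<T> {
    T mapLine(String line, int lineNumber) throws Exception;
}

/** Rewrites each line, then hands it to the wrapped mapper. */
final class PreprocessingLineMapper<T> implements SketchLineMapper<T> {
    private final UnaryOperator<String> preprocessor;
    private final SketchLineMapper<T> delegate;

    PreprocessingLineMapper(UnaryOperator<String> preprocessor, SketchLineMapper<T> delegate) {
        this.preprocessor = preprocessor;
        this.delegate = delegate;
    }

    @Override
    public T mapLine(String line, int lineNumber) throws Exception {
        return delegate.mapLine(preprocessor.apply(line), lineNumber);
    }
}

final class PreprocessingDemo {
    /** Replaces "A" with "B" in the line, then splits it on commas. */
    static String[] splitCleaned(String line) {
        SketchLineMapper<String[]> splitter = (l, n) -> l.split(",", -1);
        SketchLineMapper<String[]> mapper = new PreprocessingLineMapper<String[]>(
                l -> l.replace("A", "B"), splitter);
        try {
            return mapper.mapLine(line, 1);
        } catch (Exception e) {
            // The sketch's mappers do not throw; wrap to keep the demo unchecked
            throw new IllegalStateException(e);
        }
    }
}
```

In a real job the same idea would presumably implement org.springframework.batch.item.file.LineMapper and wrap the existing mapper bean, so the tokenizer and FieldSetMapper configuration stay untouched.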