springbatch每小时读取实时追加数据的每日报表入库
程序员文章站
2022-07-13 15:37:24
...
需求:每小时定时读取文件,文件名称是按日期递增,文件信息为同一个文件追加数据,需要每次批处理操作读取的时候设置起始行;
目前我们项目中所采用的方法,不知道还有没有更好的方法,希望大家分享
1、创建项目,配置数据源就不讲了,我的是springboot项目,其中(1)为springbatch项目的文件目录(2)为需要处理的文件,(3)为yml中的springbatch配置,其中个属性配置web应用启动的时候不执行批处理,默认为true执行,项目中是通过定时任务来执行赔处理所以设置为启动不执行,第二个属性为设置springbatch是否每次执行都创建其数据库表,第一次执行需要设置为true,初始化完数据库表之后设为false否则会报错 注意:引入的springbatch依赖中不要有内存数据库的依赖
2、job配置文件
package com.st.batch.job;
import com.st.batch.listener.JobCompletionNotificationListener;
import com.st.batch.step.MemberInfoStepConf;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class JobConf {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public Step memberInfoStep;
@Bean
public Job MFJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("MFJob")
.incrementer(new RunIdIncrementer())
//.listener(listener)
.flow(memberInfoStep)
.end()
.build();
}
}
3、step配置文件
package com.st.batch.step;
import com.st.batch.entity.MemberInfo;
import com.st.batch.listener.JobCompletionNotificationListener;
import com.st.batch.listener.StepCompletionNotificationListener;
import com.st.batch.mapper.MemberInfoLineMapper;
import com.st.service.StartCountService;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.JsonLineMapper;
import org.springframework.batch.item.file.separator.JsonRecordSeparatorPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
@Configuration
@EnableBatchProcessing
public class MemberInfoStepConf {
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Autowired
StepCompletionNotificationListener steplistener;
@Autowired
StartCountService startCountService;
@Bean
public Step memberInfoStep() {
return stepBuilderFactory.get("memberInfoStep")
.allowStartIfComplete(true)
.listener(steplistener)
.<MemberInfo, MemberInfo> chunk(10)
.reader(memberInfoReader())
.writer(writer())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<MemberInfo> memberInfoReader() {
//创建Reader
FlatFileItemReader<MemberInfo> reader = new FlatFileItemReader<MemberInfo>();
//加载Resource
reader.setResource(new ClassPathResource("classpath:").createRelative("IF_MemberInfo-"+new SimpleDateFormat("yyyy-MM-dd").format(new Date().getTime() - 40*60*1000)+".txt"));
//查询起始执行行数
HashMap parm = new HashMap();
parm.put("step", "memberInfoStep");
parm.put("date",new SimpleDateFormat("yyyy-MM-dd").format(new Date(new Date().getTime() - 40*60*1000)));
reader.setLinesToSkip(startCountService.getCount(parm) == null ? 0 :startCountService.getCount(parm));
reader.setRecordSeparatorPolicy(new JsonRecordSeparatorPolicy());
reader.setLineMapper(new MemberInfoLineMapper(new JsonLineMapper()));
return reader;
}
@Bean
public JdbcBatchItemWriter<MemberInfo> writer() {
JdbcBatchItemWriter<MemberInfo> writer = new JdbcBatchItemWriter<MemberInfo>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<MemberInfo>());
writer.setSql("INSERT INTO memberInfo (" +
"BrandCode,IFMemberId,MemberCode,MemName,Gender,MobilePhone,Email,IdentityCard,BirthDay,CounterCodeBelong,JoinDate,JoinTime,TotalPoint,MemberLevelCode,DataSource) " +
"VALUES (:brandCode,:iFMemberId,:memberCode,:memName,:gender,:mobilePhone,:email,:identityCard,:birthDay,:counterCodeBelong,:joinDate,:joinTime,:totalPoint,:memberLevelCode,:dataSource)");
writer.setDataSource(dataSource);
return writer;
}
}
其中reader.setLinesToSkip()方法为设置起始行数,是通过查询下图中表所获得,图中batch前缀的表为yml中第二项配置所配置,都是batch执行中的状态信息,
中的READ_COUNT来实现的,注意reader上必须使用@StepScope注解
4、LineMapper
package com.st.batch.mapper;
import com.st.batch.entity.MemberInfo;
import com.st.util.MapUtils;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.JsonLineMapper;
/**
* Created by admin on 2016/12/29.
*/
public class MemberInfoLineMapper implements LineMapper<MemberInfo> {
private JsonLineMapper delegate;
@Override
public MemberInfo mapLine(String line, int lineNumber) throws Exception {
return MapUtils.toObject(MemberInfo.class,delegate.mapLine(line, lineNumber),true);
//将每条对应信息转化为领域对象的工具类
}
public MemberInfoLineMapper(JsonLineMapper delegate) {
this.delegate = delegate;
}
public JsonLineMapper getDelegate() {
return delegate;
}
public void setDelegate(JsonLineMapper delegate) {
this.delegate = delegate;
}
}
5、定时调用job
package com.st.scheduled;
import com.st.batch.listener.JobCompletionNotificationListener;
import com.st.service.StartCountService;
import com.st.util.SpringContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* Created by sxm on 2016/10/14.
*/
@Component
@Configurable
@EnableScheduling
public class ScheduledTask {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
@Autowired
StartCountService startCountService;
@Scheduled(cron = "0 0/1 * * * ?")
public void reportCurrentTime() {
JobLauncher launcher = SpringContextUtil.getBean(JobLauncher.class);
Job importUserJob = SpringContextUtil.getBean("MFJob");
JobParameters jobParameters = new JobParametersBuilder()
.addDate("date", new Date()).toJobParameters();
try {
launcher.run(importUserJob, jobParameters);
log.info("批处理任务执行完成,date:"+new Date());
} catch (Exception e) {
e.printStackTrace();
}
}
}
IF_MemberInfo-2016-12-30.txt json格式
{"BrandCode":"MF","IFMemberId":"1267266","MemberCode":"13489568093","MemName":"\u5927\u5927\u5927\u6a58\u5b50458","Gender":"0","MobilePhone":"13489568093","Email":" ","IdentityCard":"","BirthYear":"","BirthDay":" ","CounterCodeBelong":"","BaCodeBelong":" ","JoinDate":"2016-12-28","JoinTime":" ","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"TB"}
{"BrandCode":"MF","IFMemberId":"1267265","MemberCode":"13840017311","MemName":"\u6768\u96e8\u6615","Gender":"2","MobilePhone":"13840017311","Email":" ","IdentityCard":"","BirthYear":"1999","BirthDay":"0806","CounterCodeBelong":"mf0sy003","BaCodeBelong":"14129994","JoinDate":"2016-12-28","JoinTime":"12:36:30 ","TotalPoint":"802","MemberLevelCode":"WMLC002","DataSource":"POS3"}
{"BrandCode":"MF","IFMemberId":"1267264","MemberCode":"18210648271","MemName":"","Gender":"","MobilePhone":"18210648271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267263","MemberCode":"18753740991","MemName":"","Gender":"","MobilePhone":"18753740991","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267262","MemberCode":"13918726271","MemName":"","Gender":"","MobilePhone":"13918726271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267261","MemberCode":"15533079902","MemName":"","Gender":"","MobilePhone":"15533079902","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267260","MemberCode":"18213506880","MemName":"\u9a6c\u5c0f\u59d0","Gender":"2","MobilePhone":"18213506880","Email":" ","IdentityCard":"","BirthYear":"1990","BirthDay":"0625","CounterCodeBelong":"MF0KM003","BaCodeBelong":"16108991","JoinDate":"2016-12-28","JoinTime":"12:14:23 ","TotalPoint":"804","MemberLevelCode":"WMLC002","DataSource":"MPOS"}
{"BrandCode":"MF","IFMemberId":"1267259","MemberCode":"15295502603","MemName":"","Gender":"","MobilePhone":"15295502603","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1265714","MemberCode":"18539039009","MemName":"\u6881\u4fca\u971e","Gender":"2","MobilePhone":"18539039009","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-26","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
这只是项目起始的一个Demo,之后准备定义统一的Reader,并将writer中的模板方式插入数据库改为常用的mybatis方式,