欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

springbatch-4.2.x-Chapter 4. Configuring and Running a Job

程序员文章站 2024-02-29 19:32:52
...

springbatch-4.2.x-官方文档-机翻摘要,简单修改,仅供参考

Chapter 4. Configuring and Running a Job

 

Job接口有多种实现,但是构建器可以消除配置上的差异

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

除了步骤之外,作业配置还可以包含其他有助于并行化的元素(<split>),声明性流控制(<decision>)和流定义的外部化(<flow />)

 

执行批处理作业时的一个关键问题与Job重新启动作业时的行为有关。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .preventRestart()
                     ...
                     .build();
}

将restartable设置为false意味着“这 Job不支持再次启动”。重新启动Job无法重新启动的会导致JobRestartException抛出

 

JobListeners(JobExecutionListener 可以SimpleJob通过作业上的listeners元素添加到中

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .listener(sampleListener())
                     ...
                     .build();
}

应该注意的是,afterJob无论作业成功与否,都会调用。如果需要确定成功或失败,可以从以下位置获得JobExecution

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

对应注解

  • @BeforeJob

  • @AfterJob

 

有一个 DefaultJobParametersValidator可用于约束简单的强制性和可选参数的组合,对于更复杂的约束,您可以自己实现接口。

@Bean
public Job job1() {
    return this.jobBuilderFactory.get("job1")
                     .validator(parametersValidator())
                     ...
                     .build();
}

 

@EnableBatchProcessing提供用于构建批处理作业的基本配置。(具体请参考源码)
• JobRepository - bean name "jobRepository"
• JobLauncher - bean name "jobLauncher"
• JobRegistry - bean name "jobRegistry"
• PlatformTransactionManager - bean name "transactionManager"
• JobBuilderFactory - bean name "jobBuilders"
• StepBuilderFactory - bean name "stepBuilders"
此配置的核心接口是BatchConfigurer。默认实现提供了上述的Bean,并且需要DataSource在上下文中将其 作为Bean。JobRepository将使用此数据源。

您可以通过创建BatchConfigurer接口的自定义实现来自定义这些bean中的任何一个。通常,扩展DefaultBatchConfigurer(如果BatchConfigurer未找到时提供 )并覆盖所需的方法。以下示例显示如何提供自定义事务管理器:

@Bean
public BatchConfigurer batchConfigurer() {
	return new DefaultBatchConfigurer() {
		@Override
		public PlatformTransactionManager getTransactionManager() {
			return new MyTransactionManager();
		}
	};
}

 

使用基本配置后,用户可以使用提供的构建器工厂来配置作业。以下是通过JobBuilderFactory和配置的两步作业的示例 StepBuilderFactory

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
            .tasklet(tasklet)
            .build();
    }
}

 

您可以自定义通过BatchConfigurer接口的实现配置JobRepository

...
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}
...

 

如果使用名称空间或提供的FactoryBean,则事务通知将自动在repository周围创建。

// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

如果不使用名称空间或工厂Bean,那么使用AOP配置存储库的事务行为也很重要:

@Bean
public TransactionProxyFactoryBean baseProxy() {
	TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
	Properties transactionAttributes = new Properties();
	transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
	transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
	transactionProxyFactoryBean.setTarget(jobRepository());
	transactionProxyFactoryBean.setTransactionManager(transactionManager());
	return transactionProxyFactoryBean;
}

 

更改表前缀,仅表前缀是可配置的。表名和列名不是。

 

Spring批处理提供了作业存储库的内存Map版本

// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

请注意,内存中的存储库是易失性的,因此不允许在JVM实例之间重新启动。它还不能保证同时启动两个具有相同参数的作业实例,并且不适合在多线程Job或本地分区中使用Step

 


 

 

 

 

 

 

 

 

 

 

 

相关标签: 学习