欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot 整合——Spring batch通过不同方式读取数据(ItemReader)

程序员文章站 2022-05-01 17:01:44
...

关于版本

依赖 版本
springboot 2.4.0
spring batch 2.4.0

代码地址

因为每个例子涉及代码较多,且包含测试用例,如果都贴到文章中内容过多,所以只贴出了部分代码。全部的代码在这里: https://gitee.com/daifyutils/springboot-samples

此篇文章所属模块为:base-batch-2.4.0

目录地址

目录 测试目录 内容
dai.samples.batch.allowstart dai.samples.allow 测试任务可以重复执行
dai.samples.batch.base dai.samples.base 基础任务配置
dai.samples.batch.skip dai.samples.skip 跳过操作
dai.samples.batch.listener dai.samples.listener 任务监听器
dai.samples.batch.process dai.samples.process 流程控制的代码
dai.samples.batch.add dai.samples.add 任务流程切割
dai.samples.batch.retry dai.samples.retry 任务重试
dai.samples.batch.rollback dai.samples.rollback 任务回滚
dai.samples.batch.rw dai.samples.rw 数据的读取和输出

Spring Batch其他内容

Spring Boot 整合——Spring batch基本使用

Spring Boot 整合——Spring batch的监听器

Spring Boot 整合——Spring batch任务流程控制以及流程分割

Spring Boot 整合——Spring batch通过不同方式读取数据(ItemReader)

Spring Boot 整合——Spring batch通过不同方式输出数据(ItemWriter)

Spring Boot 整合——Spring batch重试和回滚

Spring batch

spring batch 是spring提供的一个批数据处理的框架。提供了大量信息的自动化和定时处理的操作。其是一个相对轻量级的批处理操作

数据输入和输出定义

Spring batch 中数据的输入和输出主要依托于两个接口:itemReaderitemWriter

itemReader

尽管是一个简单的概念,但anItemReader是从许多不同类型的输入中提供数据的方法。最一般的示例包括

itemWriter

ItemWriter在功能上类似于ItemReader反操作。资源仍然需要定位,打开和关闭,但是它们的区别在于 ItemWriter写出而不是读入。对于数据库或队列,这些操作可能是插入,更新或发送。输出序列化的格式特定于每个批处理作业。

itemReader 数据输入

此接口只提供了一个方法,在读取结束后放回一个null表示读取完毕。

public interface ItemReader<T> {

	@Nullable
	T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

读取文件行数据

    /**
     * 读取行数据
     * @return
     */
    @Bean(name = "itemLineReader")
    public FlatFileItemReader<BatchEntity> itemLineReader() throws Exception {
        // 设置文件地址
        FlatFileItemReader<BatchEntity> reader = new FlatFileItemReader<BatchEntity>();
        reader.setResource(new ClassPathResource("rw/batchJobLine.txt"));
        reader.setName("itemLineReader");
        // 设置是否跳过行
        // reader.setLinesToSkip(5);
        /*定义每一条记录 转换java实体类*/
        DefaultLineMapper<BatchEntity> lineMapper = new DefaultLineMapper<BatchEntity>();

        // delimiter 参数定义分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("entityId,age,firstName,isAdult,lastName".split(","));
        // 设置分隔符
        // tokenizer.setDelimiter(",");
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper());
        reader.setLineMapper(lineMapper);
        reader.afterPropertiesSet();
        return reader;
    }

我们需要设置的内容:

  1. 资源地址
  2. 是否需要跳过部分内容setLinesToSkip(5)
  3. 设置结果映射

读取JSON数据

    /**
     * 读取JSON数据
     * @return
     */
    @Bean(name = "itemJsonReader")
    public ItemReader<BatchEntity> itemJsonReader() {
        return new JsonItemReaderBuilder<BatchEntity>()
                .jsonObjectReader(new JacksonJsonObjectReader<>(BatchEntity.class))
                .resource(new ClassPathResource("rw/batchJobJson.json"))
                .name("batchJobReader")
                .build();
    }

我们需要设置的内容:

  1. 资源地址
  2. 设置结果映射

读取XML


    /**
     * XML读取数据
     * @return
     */
    @Bean(name = "itemXmlReader")
    public StaxEventItemReader<BatchEntity> itemXmlReader() {
        XStreamMarshaller unmarshaller = new XStreamMarshaller();
        Map<String,Class> map = new HashMap<>();
        map.put("data", BatchEntity.class);
        unmarshaller.setAliases(map);
        return new StaxEventItemReaderBuilder<BatchEntity>()
                .resource(new ClassPathResource("rw/batchJobXml.xml"))
                .addFragmentRootElements("data")
                .unmarshaller(unmarshaller)
                .name("itemXmlReader")
                .build();
    }

我们需要设置的内容:

  1. 资源地址
  2. 设置数据根节点.addFragmentRootElements("data")
  3. 设置结果映射

读取多个文件的数据

    /**
     * 读取多个文件
     * @return
     */
    @Bean(name = "itemFileReader")
    public MultiResourceItemReader itemFileReader() throws Exception {
        MultiResourceItemReaderBuilder<BatchEntity> rwfile =
                new MultiResourceItemReaderBuilder<BatchEntity>()
                .resources(new ClassPathResource("rwfile/batchJobLine.txt"),
                           new ClassPathResource("rwfile/batchJobLine2.txt"));
        rwfile.delegate(delegate());
        rwfile.name("itemFileReader");
        return rwfile.build();
    }

    private ResourceAwareItemReaderItemStream<? extends BatchEntity> delegate() throws Exception {
        // 设置文件地址
        FlatFileItemReader<BatchEntity> reader = new FlatFileItemReader<BatchEntity>();
        reader.setResource(new ClassPathResource("rw/batchJobLine.txt"));
        reader.setName("delegateReader");
        // 设置是否跳过行
        // reader.setLinesToSkip(5);
        /*定义每一条记录 转换java实体类*/
        DefaultLineMapper<BatchEntity> lineMapper = new DefaultLineMapper<BatchEntity>();

        // delimiter 参数定义分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("entityId,age,firstName,isAdult,lastName".split(","));
        // 设置分隔符
        // tokenizer.setDelimiter(",");
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper());
        reader.setLineMapper(lineMapper);
        reader.afterPropertiesSet();
        return reader;
    }

我们需要设置的内容:

  1. 资源地址
  2. 设置解析规则点
  3. 设置结果映射

读取JDBC数据

    /***
     * 使用JDBC读取数据
     * @return
     */
    @Bean(name = "itemJdbcReader")
    public JdbcPagingItemReader<BatchEntity> itemJdbcReader() {
        // 查询SQL
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("entity_id,age,first_name,full_name,hello_message,is_adult,last_name");
        queryProvider.setFromClause("from batch_entity");
        // 设置排序
        Map<String, Order> sortKeys = new HashMap<String, Order>();
        sortKeys.put("age", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        return new JdbcPagingItemReaderBuilder<BatchEntity>()
                .dataSource(dataSource)
                .fetchSize(5)
                .rowMapper(new BeanPropertyRowMapper<>(BatchEntity.class))
                .queryProvider(queryProvider)
                .name("itemJdbcReader")
                .build();
    }

我们需要设置的内容:

  1. 设置数据源
  2. 设置单次查询数量
  3. 设置结果映射
  4. 设置SQL

使用游标读取JDBC数据

    /***
     * 使用JDBC游标读取数据
     * @return
     */
    @Bean(name = "itemJdbcCursorReader")
    public JdbcCursorItemReader<BatchEntity> itemJdbcCursorReader() {
        return new JdbcCursorItemReaderBuilder<BatchEntity>()
                .dataSource(dataSource)
                .fetchSize(5)
                .rowMapper(new BeanPropertyRowMapper<>(BatchEntity.class))
                .sql("select entity_id,age,first_name,full_name,hello_message,is_adult,last_name from batch_entity")
                .name("itemJdbcCursorReader")
                .build();
    }

我们需要设置的内容:

  1. 设置数据源
  2. 设置单次查询数量
  3. 设置结果映射
  4. 设置SQL

设置字段映射

    private FieldSetMapper fieldSetMapper() {
        return fieldSet -> new BatchEntity(
                fieldSet.readString("entityId"),
                fieldSet.readInt("age"),
                fieldSet.readString("firstName"),
                fieldSet.readBoolean("isAdult"),
                fieldSet.readString("lastName")
        );
    }

个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,因为每一个例子都提供了测试代码,一般来说不会有问题,但是因为这几篇内容断断续续用了一个半月可能会出现之后的代码影响了之前的例子,假如开发同学发现了,请及时告知,我会第一时间修改相关内容,也希望大家看在这个新春佳节只能宅到家中埋头苦逼的码代码的情况下,能给我点一个赞。你的点赞就是我前进的动力。