欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot 整合——Spring batch通过不同方式输出数据(ItemWriter)

程序员文章站 2022-05-01 17:07:49
...

关于版本

依赖 版本
springboot 2.4.0
spring batch 2.4.0

代码地址

因为每个例子涉及代码较多,且包含测试用例,如果都贴到文章中内容过多,所以只贴出了部分代码。全部的代码在这里: https://gitee.com/daifyutils/springboot-samples

此篇文章所属模块为:base-batch-2.4.0

目录地址

目录 测试目录 内容
dai.samples.batch.allowstart dai.samples.allow 测试任务可以重复执行
dai.samples.batch.base dai.samples.base 基础任务配置
dai.samples.batch.skip dai.samples.skip 跳过操作
dai.samples.batch.listener dai.samples.listener 任务监听器
dai.samples.batch.process dai.samples.process 流程控制的代码
dai.samples.batch.add dai.samples.add 任务流程切割
dai.samples.batch.retry dai.samples.retry 任务重试
dai.samples.batch.rollback dai.samples.rollback 任务回滚
dai.samples.batch.rw dai.samples.rw 数据的读取和输出

Spring Batch其他内容

Spring Boot 整合——Spring batch基本使用

Spring Boot 整合——Spring batch的监听器

Spring Boot 整合——Spring batch任务流程控制以及流程分割

Spring Boot 整合——Spring batch通过不同方式读取数据(ItemReader)

Spring Boot 整合——Spring batch通过不同方式输出数据(ItemWriter)

Spring Boot 整合——Spring batch重试和回滚

Spring batch

spring batch 是spring提供的一个批数据处理的框架。提供了大量信息的自动化和定时处理的操作。其是一个相对轻量级的批处理操作

数据输入和输出定义

Spring batch 中数据的输入和输出主要依托于两个接口:itemReaderitemWriter

itemReader

尽管是一个简单的概念,但anItemReader是从许多不同类型的输入中提供数据的方法。最一般的示例包括

itemWriter

ItemWriter在功能上类似于ItemReader反操作。资源仍然需要定位,打开和关闭,但是它们的区别在于 ItemWriter写出而不是读入。对于数据库或队列,这些操作可能是插入,更新或发送。输出序列化的格式特定于每个批处理作业。

ItemWriter 数据输出

此接口只提供了一个方法,writer写入数据则是以chunk为单位,一块一块的进行写入。


public interface ItemWriter<T> {

	void write(List<? extends T> items) throws Exception;

}

以行格式写数据

    /**
     * 数据写到文件中
     * @return
     */
    @Bean(name = "itemLineWriter")
    public FlatFileItemWriter<BatchEntity> itemLineWriter(){
        String patch = "target/test-outputs/writer/itemLineWriter.txt";
        return new FlatFileItemWriterBuilder<BatchEntity>()
                .lineAggregator(new BatchEntityDoLineAggregator())
                .resource(new FileSystemResource(patch))
                .name("itemLineWriter")
                .build();
    }
    
    /**
     * 写入文件中行形式
     */
    class BatchEntityDoLineAggregator implements LineAggregator<BatchEntity> {
        ObjectMapper mapper = new ObjectMapper();

        @Override
        public String aggregate(BatchEntity batchEntity) {
            try {
                return mapper.writeValueAsString(batchEntity);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("unable to writer...",e);
            }
        }

    }

我们需要设置的内容:

  1. 资源地址
  2. 数据格式处理lineAggregator
  3. 名称name

最终的输出结果

{"entityId":null,"firstName":"firstName0","lastName":"lastName0","age":0,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName1","lastName":"lastName1","age":2,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName2","lastName":"lastName2","age":4,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName3","lastName":"lastName3","age":6,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName4","lastName":"lastName4","age":8,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName5","lastName":"lastName5","age":10,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName6","lastName":"lastName6","age":12,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName7","lastName":"lastName7","age":14,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName8","lastName":"lastName8","age":16,"fullName":null,"helloMessage":null,"adult":false}
{"entityId":null,"firstName":"firstName9","lastName":"lastName9","age":18,"fullName":null,"helloMessage":null,"adult":false}

以JSON格式写数据

    /**
     * JSON写数据
     * @return
     */
    @Bean(name = "itemJsonWriter")
    public ItemWriter<BatchEntity> itemJsonWriter() {
        String patch = "target/test-outputs/writer/itemJsonWriter.json";
        return new JsonFileItemWriterBuilder<BatchEntity>()
                .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                .resource(new FileSystemResource(patch))
                .name("itemJsonWriter")
                .build();
    }

我们需要设置的内容:

  1. 资源地址
  2. 数据格式处理jsonObjectMarshaller
  3. 名称name

最终的输出结果

[
 {"entityId":null,"firstName":"firstName0","lastName":"lastName0","age":0,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName1","lastName":"lastName1","age":2,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName2","lastName":"lastName2","age":4,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName3","lastName":"lastName3","age":6,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName4","lastName":"lastName4","age":8,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName5","lastName":"lastName5","age":10,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName6","lastName":"lastName6","age":12,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName7","lastName":"lastName7","age":14,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName8","lastName":"lastName8","age":16,"fullName":null,"helloMessage":null,"adult":false},
 {"entityId":null,"firstName":"firstName9","lastName":"lastName9","age":18,"fullName":null,"helloMessage":null,"adult":false}
]

以XML格式写数据

    /**
     * XML写数据
     * @return
     */
    @Bean(name = "itemXmlWriter")
    public StaxEventItemWriter<BatchEntity> itemXmlWriter() {
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("batchEntity", BatchEntity.class);
        marshaller.setAliases(aliases);

        String patch = "target/test-outputs/writer/itemXmlWriter.xml";
        return new StaxEventItemWriterBuilder<BatchEntity>()
                .rootTagName("batchEntity")
                .resource(new FileSystemResource(patch))
                .name("itemXmlWriter")
                .marshaller(marshaller)
                .build();
    }

我们需要设置的内容:

  1. 资源地址
  2. 数据格式处理rootTagName
  3. 名称name

最终的结果

<?xml version="1.0" encoding="UTF-8"?>
<batchEntity>
    <batchEntity>
        <firstName>firstName0</firstName>
        <lastName>lastName0</lastName>
        <age>0</age>
        <isAdult>false</isAdult>
    </batchEntity>
    <batchEntity>
        <firstName>firstName1</firstName>
        <lastName>lastName1</lastName>
        <age>2</age>
        <isAdult>false</isAdult>
    </batchEntity>
    <batchEntity>
        <firstName>firstName2</firstName>
        <lastName>lastName2</lastName>
        <age>4</age>
        <isAdult>false</isAdult>
    </batchEntity>
    <batchEntity>
        <firstName>firstName3</firstName>
        <lastName>lastName3</lastName>
        <age>6</age>
        <isAdult>false</isAdult>
    </batchEntity>
    <batchEntity>
        <firstName>firstName4</firstName>
        <lastName>lastName4</lastName>
        <age>8</age>
        <isAdult>false</isAdult>
    </batchEntity>
    <batchEntity>
        <firstName>firstName5</firstName>
        <lastName>lastName5</lastName>
        <age>10</age>
        <isAdult>false</isAdult>
    </batchEntity>
......
</batchEntity>

使用JDBC写数据

    /**
     * 数据写到数据库中
     * @return
     */
    @Bean(name = "itemJDBCWriter")
    public JdbcBatchItemWriter<BatchEntity> itemJDBCWriter() {
        return new JdbcBatchItemWriterBuilder<BatchEntity>()
                .dataSource(dataSource)
                .sql("insert into batch_entity " +
                        "(entity_id,first_name,last_name,age,full_name,is_adult,hello_message)" +
                        "values (:entityId,:firstName,:lastName,:age,:fullName,false,:helloMessage)")
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BatchEntity>())
                .build();
    }

我们需要设置的内容:

  1. 数据源
  2. sql
  3. 参数映射方式

个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,因为每一个例子都提供了测试代码,一般来说不会有问题,但是因为这几篇内容断断续续用了一个半月可能会出现之后的代码影响了之前的例子,假如开发同学发现了,请及时告知,我会第一时间修改相关内容,也希望大家看在这个新春佳节只能宅到家中埋头苦逼的码代码的情况下,能给我点一个赞。你的点赞就是我前进的动力。