欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring Batch 读 10 万条记录,写到 MongoDB

程序员文章站 2022-05-17 09:06:22
实践内容 从 MariaDB 一张表内读 10 万条记录,经处理后写到 MongoDB 。 具体实现 1、新建 Spring Boot 应用,依赖如下: 2、创建一张表,并生成 10 万条数据 3、创建 Person 类 4、创建一个中间处理器 5、创建 ,用户数据库映射 6、创建任务完成的监听 7 ......

实践内容

从 mariadb 一张表内读 10 万条记录,经处理后写到 mongodb 。

具体实现

1、新建 spring boot 应用,依赖如下:

        <!-- web 应用 -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
            <exclusions>
                <exclusion>
                    <groupid>org.springframework.boot</groupid>
                    <artifactid>spring-boot-starter-logging</artifactid>
                </exclusion>
                <exclusion>
                    <groupid>org.springframework.boot</groupid>
                    <artifactid>spring-boot-starter-tomcat</artifactid>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- web 容器 undertow -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-undertow</artifactid>
        </dependency>

        <!-- 日志 log4j2 -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-log4j2</artifactid>
        </dependency>

        <!-- mongodb -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-data-mongodb</artifactid>
        </dependency>

        <!-- brantch -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-batch</artifactid>
        </dependency>

        <!-- mariadb 驱动 -->
        <dependency>
            <groupid>org.mariadb.jdbc</groupid>
            <artifactid>mariadb-java-client</artifactid>
            <version>2.0.2</version>
        </dependency>

        <!-- lombok 代码简化 -->
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <version>1.16.14</version>
        </dependency>

2、创建一张表,并生成 10 万条数据

drop table people if exists;

create table people  (
    id bigint identity not null primary key,
    first_name varchar(20),
    last_name varchar(20)
);

3、创建 person 类

@data
public class person {
    private long id;
    private string lastname;
    private string firstname;
}

4、创建一个中间处理器 personitemprocessor

import org.springframework.batch.item.itemprocessor;

@log4j2
public class personitemprocessor implements itemprocessor<person, person> {

    @override
    public person process(final person person) throws exception {
        final string firstname = person.getfirstname().touppercase();
        final string lastname = person.getlastname().touppercase();

        final person transformedperson = new person(firstname, lastname);

        log.info("converting (" + person + ") into (" + transformedperson + ")");

        return transformedperson;
    }

}

5、创建 personmapper,用户数据库映射

public class personmapper implements rowmapper {

    private static final string id_column = "id";
    private static final string nickname_column = "first_name";
    private static final string email_column = "last_name";


    @override
    public object maprow(resultset resultset, int i) throws sqlexception {
        person user = new person();
        person.setid(resultset.getlong(id_column));
        person.setnickname(resultset.getstring(nickname_column));
        person.setemail(resultset.getstring(email_column));
        return person;
    }
}

6、创建任务完成的监听 jobcompletionnotificationlistener

@log4j2
@component
public class jobcompletionnotificationlistener extends jobexecutionlistenersupport {

    @override
    public void afterjob(jobexecution jobexecution) {
        if(jobexecution.getstatus() == batchstatus.completed) {
            log.info("!!! job finished! time to verify the results");
        }
    }
}

7、构建批处理任务 batchconfiguration

@configuration
@enablebatchprocessing
public class batchconfiguration {

    @autowired
    public jobbuilderfactory jobbuilderfactory;

    @autowired
    public stepbuilderfactory stepbuilderfactory;

    @autowired
    public datasource datasource;
    
    @autowired
    public mongotemplate mongotemplate;

    @bean
    public jdbccursoritemreader<person> reader(){
        jdbccursoritemreader<person> itemreader = new jdbccursoritemreader<person>();
        itemreader.setdatasource(datasource);
        itemreader.setsql("select id, nickname, email from people");
        itemreader.setrowmapper(new personmapper());
        return itemreader;
    }

    @bean
    public personitemprocessor processor() {
        return new personitemprocessor();
    }
    
    @bean
    mongoitemwriter<person> writer(){
        mongoitemwriter<person> itemwriter = new mongoitemwriter<person>();
        itemwriter.settemplate(mongotemplate);
        itemwriter.setcollection("branch");
        return itemwriter;
    }

    @bean
    public step step() {
        return stepbuilderfactory.get("step")
                .<person, person> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @bean
    public job importuserjob(jobcompletionnotificationlistener listener) {
        return jobbuilderfactory.get("importuserjob")
                .incrementer(new runidincrementer())
                .listener(listener)
                .flow(step())
                .end()
                .build();
    }

}

任务处理结果

0出错,耗时 2 分钟左右,测试机 mac