Spring Batch 读 10 万条记录,写到 MongoDB
程序员文章站
2024-01-22 14:34:16
实践内容 从 MariaDB 一张表内读 10 万条记录,经处理后写到 MongoDB 。 具体实现 1、新建 Spring Boot 应用,依赖如下: 2、创建一张表,并生成 10 万条数据 3、创建 Person 类 4、创建一个中间处理器 5、创建 PersonMapper,用于数据库映射 6、创建任务完成的监听 7 ......
实践内容
从 MariaDB 一张表内读 10 万条记录,经处理后写到 MongoDB。
具体实现
1、新建 Spring Boot 应用,依赖如下:
<!-- Web application; the default logging starter and embedded Tomcat are excluded
     because Log4j2 and Undertow are declared below instead -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Web container: Undertow -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-undertow</artifactId>
</dependency>

<!-- Logging: Log4j2 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

<!-- MongoDB -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

<!-- Spring Batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- MariaDB JDBC driver -->
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.0.2</version>
</dependency>

<!-- Lombok: boilerplate reduction -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.14</version>
</dependency>
2、创建一张表,并生成 10 万条数据
-- Recreate the source table. MariaDB syntax: IF EXISTS follows DROP TABLE,
-- and the auto-generated key uses AUTO_INCREMENT (IDENTITY is HSQLDB-only).
DROP TABLE IF EXISTS people;

CREATE TABLE people (
    id         BIGINT      NOT NULL AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(20),
    last_name  VARCHAR(20)
);

-- Generate 100,000 test rows. seq_1_to_100000 is a virtual table provided by
-- MariaDB's SEQUENCE storage engine; it must be enabled on the server.
INSERT INTO people (first_name, last_name)
SELECT CONCAT('first', seq), CONCAT('last', seq)
FROM seq_1_to_100000;
3、创建 Person 类
@data public class person { private long id; private string lastname; private string firstname; }
4、创建一个中间处理器 PersonItemProcessor
import org.springframework.batch.item.itemprocessor; @log4j2 public class personitemprocessor implements itemprocessor<person, person> { @override public person process(final person person) throws exception { final string firstname = person.getfirstname().touppercase(); final string lastname = person.getlastname().touppercase(); final person transformedperson = new person(firstname, lastname); log.info("converting (" + person + ") into (" + transformedperson + ")"); return transformedperson; } }
5、创建 PersonMapper,用于数据库映射
public class personmapper implements rowmapper { private static final string id_column = "id"; private static final string nickname_column = "first_name"; private static final string email_column = "last_name"; @override public object maprow(resultset resultset, int i) throws sqlexception { person user = new person(); person.setid(resultset.getlong(id_column)); person.setnickname(resultset.getstring(nickname_column)); person.setemail(resultset.getstring(email_column)); return person; } }
6、创建任务完成的监听 JobCompletionNotificationListener
@log4j2 @component public class jobcompletionnotificationlistener extends jobexecutionlistenersupport { @override public void afterjob(jobexecution jobexecution) { if(jobexecution.getstatus() == batchstatus.completed) { log.info("!!! job finished! time to verify the results"); } } }
7、构建批处理任务 BatchConfiguration
/**
 * Wires the batch job: read rows from the {@code people} table over JDBC,
 * pass them through {@link PersonItemProcessor}, and write them to MongoDB.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Autowired
    public MongoTemplate mongoTemplate;

    /**
     * Cursor-based JDBC reader over the {@code people} table.
     *
     * @return a reader that maps each row with {@link PersonMapper}
     */
    @Bean
    public JdbcCursorItemReader<Person> reader() {
        JdbcCursorItemReader<Person> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        // Column names must match the people table and the names PersonMapper reads.
        itemReader.setSql("select id, first_name, last_name from people");
        itemReader.setRowMapper(new PersonMapper());
        return itemReader;
    }

    /**
     * @return the processor applied to every item between read and write
     */
    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    /**
     * MongoDB writer targeting the {@code branch} collection.
     *
     * @return a writer backed by the injected {@link MongoTemplate}
     */
    @Bean
    MongoItemWriter<Person> writer() {
        MongoItemWriter<Person> itemWriter = new MongoItemWriter<>();
        itemWriter.setTemplate(mongoTemplate);
        itemWriter.setCollection("branch");
        return itemWriter;
    }

    /**
     * The single read-process-write step, using chunks of 10 items.
     *
     * @return the configured step
     */
    @Bean
    public Step step() {
        return stepBuilderFactory.get("step")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * The import job: runs {@link #step()} and notifies the given listener.
     *
     * @param listener listener registered on the job
     * @return the configured job
     */
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("importuserjob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step())
                .end()
                .build();
    }
}
任务处理结果
0 出错,耗时 2 分钟左右,测试机 Mac
上一篇: Docker简介、安装以及应用部署
下一篇: 人工智能的春天来了吗?