基于Spring Batch向Elasticsearch批量导入数据示例
程序员文章站
2022-07-28 15:07:13
1.介绍
当系统有大量数据需要从数据库导入elasticsearch时,使用spring batch可以提高导入的效率。spring batch使用itemreader分...
1.介绍
当系统中有大量数据需要从数据库导入 Elasticsearch 时，使用 Spring Batch 可以提高导入效率。Spring Batch 使用 ItemReader 分页读取数据，使用 ItemWriter 批量写入数据。由于 Spring Batch 没有提供 Elasticsearch 的 ItemWriter 和 ItemReader，本示例自定义了一个 ElasticsearchItemWriter（以及一个仅供参考的 ElasticsearchItemReader），用于批量导入。
2.示例
2.1 pom.xml
本文使用 Spring Data Jest 连接 ES（也可以使用 Spring Data Elasticsearch 连接 ES），ES 版本为 5.5.3。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the es-etl example.
  Element names, namespace URIs and version qualifiers (M7, RELEASE, SNAPSHOT) are
  case-sensitive: Maven rejects e.g. <groupid>, and repositories resolve artifact paths
  such as 2.0.0.M7 exactly.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- Milestone release: resolved from the spring-milestones repository declared below. -->
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Source side: PostgreSQL via JPA. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Target side: Elasticsearch through Spring Data Jest. -->
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
2.2 实体类及 Repository
package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Person record, mapped both as a JPA entity and as a document in the Elasticsearch
 * index {@code person}.
 *
 * <p>{@code refreshInterval = "-1"} disables periodic index refresh, which makes bulk
 * indexing cheaper. Newly indexed documents only become searchable after an explicit
 * refresh, or once the refresh interval is set back after the import.
 *
 * <p>Getters, setters, equals/hashCode and toString are generated by Lombok {@code @Data}.
 *
 * create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {

    @Id
    private Long id;

    private String name;

    /** One-to-one association in JPA; stored as a nested object in Elasticsearch. */
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}
package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Address entity referenced one-to-one from {@code Person}.
 *
 * <p>Getters, setters, equals/hashCode and toString are generated by Lombok {@code @Data}.
 *
 * create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {

    @Id
    private Long id;

    private String name;
}
package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Spring Data JPA repository for {@link Person} rows in the relational database.
 *
 * create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Spring Data Elasticsearch repository for {@link Person} documents.
 *
 * <p>The batch writer uses its {@code saveAll} to index each chunk.
 *
 * create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
2.3 配置 ElasticsearchItemWriter
package com.hfcsbc.esetl.itemwriter; import com.hfcsbc.esetl.repository.es.espersonrepository; import com.hfcsbc.esetl.domain.person; import org.springframework.batch.core.exitstatus; import org.springframework.batch.core.itemwritelistener; import org.springframework.batch.core.stepexecution; import org.springframework.batch.core.stepexecutionlistener; import org.springframework.batch.item.itemwriter; import java.util.list; /** * create by pengchao on 2018/2/23 */ public class elasticsearchitemwriter implements itemwriter<person>, itemwritelistener<person>, stepexecutionlistener { private espersonrepository personrepository; public elasticsearchitemwriter(espersonrepository personrepository) { this.personrepository = personrepository; } @override public void beforewrite(list<? extends person> items) { } @override public void afterwrite(list<? extends person> items) { } @override public void onwriteerror(exception exception, list<? extends person> items) { } @override public void beforestep(stepexecution stepexecution) { } @override public exitstatus afterstep(stepexecution stepexecution) { return null; } @override public void write(list<? extends person> items) throws exception { //实现类abstractelasticsearchrepository的saveall方法调用的是elasticsearchoperations.bulkindex(queries),为批量索引 personrepository.saveall(items); } }
2.4 配置 ElasticsearchItemReader（本示例未使用，仅供参考）
package com.hfcsbc.esetl.itemreader; import org.springframework.batch.item.data.abstractpaginateddataitemreader; import org.springframework.beans.factory.initializingbean; import org.springframework.data.elasticsearch.core.elasticsearchoperations; import org.springframework.data.elasticsearch.core.query.searchquery; import java.util.iterator; /** * create by pengchao on 2018/2/24 */ public class elasticsearchitemreader<person> extends abstractpaginateddataitemreader<person> implements initializingbean { private final elasticsearchoperations elasticsearchoperations; private final searchquery query; private final class<? extends person> targettype; public elasticsearchitemreader(elasticsearchoperations elasticsearchoperations, searchquery query, class<? extends person> targettype) { this.elasticsearchoperations = elasticsearchoperations; this.query = query; this.targettype = targettype; } @override protected iterator<person> dopageread() { return (iterator<person>)elasticsearchoperations.queryforlist(query, targettype).iterator(); } @override public void afterpropertiesset() throws exception { } }
2.5 Spring Batch 相关配置
package com.hfcsbc.esetl.config; import com.hfcsbc.esetl.itemwriter.elasticsearchitemwriter; import com.hfcsbc.esetl.repository.es.espersonrepository; import com.hfcsbc.esetl.domain.person; import org.springframework.batch.core.job; import org.springframework.batch.core.step; import org.springframework.batch.core.configuration.annotation.enablebatchprocessing; import org.springframework.batch.core.configuration.annotation.jobbuilderfactory; import org.springframework.batch.core.configuration.annotation.stepbuilderfactory; import org.springframework.batch.core.launch.support.runidincrementer; import org.springframework.batch.core.repository.jobrepository; import org.springframework.batch.core.repository.support.jobrepositoryfactorybean; import org.springframework.batch.item.itemreader; import org.springframework.batch.item.itemwriter; import org.springframework.batch.item.database.jpapagingitemreader; import org.springframework.batch.item.database.orm.jpanativequeryprovider; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.transaction.platformtransactionmanager; import javax.persistence.entitymanagerfactory; import javax.sql.datasource; /** * create by pengchao on 2018/2/23 */ @configuration @enablebatchprocessing public class batchconfig { @autowired private espersonrepository personrepository; @bean public itemreader<person> orderitemreader(entitymanagerfactory entitymanagerfactory){ jpapagingitemreader<person> reader = new jpapagingitemreader<person>(); string sqlquery = "select * from person"; try { jpanativequeryprovider<person> queryprovider = new jpanativequeryprovider<person>(); queryprovider.setsqlquery(sqlquery); queryprovider.setentityclass(person.class); queryprovider.afterpropertiesset(); reader.setentitymanagerfactory(entitymanagerfactory); reader.setpagesize(10000); 
reader.setqueryprovider(queryprovider); reader.afterpropertiesset(); reader.setsavestate(true); } catch (exception e) { e.printstacktrace(); } return reader; } @bean public elasticsearchitemwriter itemwriter(){ return new elasticsearchitemwriter(personrepository); } @bean public step step(stepbuilderfactory stepbuilderfactory, itemreader itemreader, itemwriter itemwriter){ return stepbuilderfactory .get("step1") .chunk(10000) .reader(itemreader) .writer(itemwriter) .build(); } @bean public job job(jobbuilderfactory jobbuilderfactory, step step){ return jobbuilderfactory .get("importjob") .incrementer(new runidincrementer()) .flow(step) .end() .build(); } /** * spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:datasource * @param datasource * @param manager * @return */ @bean public jobrepository jobrepository(datasource datasource, platformtransactionmanager manager){ jobrepositoryfactorybean jobrepositoryfactorybean = new jobrepositoryfactorybean(); jobrepositoryfactorybean.setdatasource(datasource); jobrepositoryfactorybean.settransactionmanager(manager); jobrepositoryfactorybean.setdatabasetype("postgres"); try { return jobrepositoryfactorybean.getobject(); } catch (exception e) { e.printstacktrace(); } return null; } }
2.6 配置数据库及 ES 的连接地址
# Connection settings for the source database (PostgreSQL) and the target Elasticsearch.
# Credentials are inline for the example only; externalise them in real deployments.
spring:
  redis:
    # NOTE(review): no Redis starter appears in the pom.xml above; this entry looks unused.
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: postgresql
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    # NOTE(review): Spring Boot 2 defaults to HikariCP, which presumably does not read
    # spring.datasource.max-active (Hikari's equivalent is spring.datasource.hikari.maximum-pool-size) — confirm.
    max-active: 2
  batch:
    # Always create the Spring Batch metadata tables at start-up (same as the key spring.batch.initialize-schema).
    initialize-schema: always
2.7 配置入口类
package com.hfcsbc.esetl; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchautoconfiguration; import org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchdataautoconfiguration; import org.springframework.data.elasticsearch.repository.config.enableelasticsearchrepositories; import org.springframework.data.jpa.repository.config.enablejparepositories; @springbootapplication(exclude = {elasticsearchautoconfiguration.class, elasticsearchdataautoconfiguration.class}) @enableelasticsearchrepositories(basepackages = "com.hfcsbc.esetl.repository") @enablejparepositories(basepackages = "com.hfcsbc.esetl.repository.jpa") public class esetlapplication { public static void main(string[] args) { springapplication.run(esetlapplication.class, args); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: 蔚来汽车的危险青春期 没有温室的花朵
下一篇: Vue学习之路第七篇:跑马灯项目实现