
Example: Bulk-Importing Data into Elasticsearch with Spring Batch

Programmer Article Site (程序员文章站), 2022-07-28 15:07:13

1. Introduction

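When a system has to import a large amount of data from a database into Elasticsearch, Spring Batch can make the import more efficient. An ItemReader reads the data page by page, and an ItemWriter writes it in batches. Spring Batch provides neither an ItemWriter nor an ItemReader for Elasticsearch, so this example defines a custom ElasticsearchItemWriter (plus an ElasticsearchItemReader) for the bulk import.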
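The split between reading and writing can be modelled with a minimal JDK-only sketch of a chunk-oriented step. This is not Spring Batch's implementation, and the `ChunkLoopSketch.run` name is hypothetical. It takes a reader that returns `null` once the input is exhausted, as `ItemReader.read()` does. It buffers items until `chunkSize` is reached and hands each full list to the writer. Transactions, retries and restart state are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified model of a chunk-oriented step (no transactions or restart state).
class ChunkLoopSketch {

  // Reads items until the reader returns null and writes them in lists of at most
  // chunkSize items. Returns how many times the writer was called.
  static <T> int run(Supplier<T> reader, int chunkSize, Consumer<List<T>> writer) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<T> chunk = new ArrayList<>(chunkSize);
    int writes = 0;
    T item;
    while ((item = reader.get()) != null) {    // null signals end of input, as with ItemReader.read()
      chunk.add(item);
      if (chunk.size() == chunkSize) {
        writer.accept(new ArrayList<>(chunk)); // hand over a copy, then start a new chunk
        chunk.clear();
        writes++;
      }
    }
    if (!chunk.isEmpty()) {                    // final, partially filled chunk
      writer.accept(new ArrayList<>(chunk));
      writes++;
    }
    return writes;
  }
}
```

With 25 input rows and a chunk size of 10, the writer is called three times, with 10, 10 and 5 items.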

2. Example

2.1 pom.xml

This article connects to ES with Spring Data Jest (Spring Data Elasticsearch would also work). The ES version is 5.5.3.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hfcsbc.estl</groupId>
  <artifactId>es-etl</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>es-etl</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.M7</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>

    <dependency>
      <groupId>com.github.vanroy</groupId>
      <artifactId>spring-boot-starter-data-jest</artifactId>
      <version>3.0.0.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>io.searchbox</groupId>
      <artifactId>jest</artifactId>
      <version>5.3.2</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </pluginRepository>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>

2.2 Entity classes and repositories

package com.hfcsbc.esetl.domain;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Created by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
  @Id
  private Long id;
  private String name;
  @OneToOne
  @Field(type = FieldType.Nested)
  private Address address;
}

package com.hfcsbc.esetl.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
 * Created by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
  @Id
  private Long id;
  private String name;
}

package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;
/**
 * Created by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}

package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
 * Created by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
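Setting `refreshInterval = "-1"` on `@Document` disables periodic refresh for the `person` index, which helps bulk indexing. The trade-off is that newly indexed documents are not visible to searches until the index is refreshed. One option is to restore a refresh interval after the import by sending a body like the ones below to the index settings endpoint (`PUT /person/_settings`). This is a sketch under assumptions, not part of the article's code. The class and method names are hypothetical, and `"1s"` is Elasticsearch's default refresh interval. Replicas can be raised through the same API, but the shard count is fixed when the index is created.

```java
// Hypothetical helper: builds request bodies for the index settings API
// (PUT /{index}/_settings) as plain strings, without a JSON library.
class IndexSettingsSketch {

  // Settings matching the @Document annotation: no replicas, refresh disabled.
  static String bulkLoadSettings() {
    return settingsBody(0, "-1");
  }

  // Settings to apply once the import is done, e.g. settingsBody(1, "1s").
  static String settingsBody(int replicas, String refreshInterval) {
    return String.format("{\"index\":{\"number_of_replicas\":%d,\"refresh_interval\":\"%s\"}}",
        replicas, refreshInterval);
  }
}
```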

2.3 Configuring the ElasticsearchItemWriter

package com.hfcsbc.esetl.itemwriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
 * Created by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

  private final EsPersonRepository personRepository;

  public ElasticsearchItemWriter(EsPersonRepository personRepository) {
    this.personRepository = personRepository;
  }

  @Override
  public void beforeWrite(List<? extends Person> items) {
    // listener hook, called before each chunk is written (unused here)
  }

  @Override
  public void afterWrite(List<? extends Person> items) {
    // listener hook, called after each chunk is written (unused here)
  }

  @Override
  public void onWriteError(Exception exception, List<? extends Person> items) {
    // listener hook, called when writing a chunk fails (unused here)
  }

  @Override
  public void beforeStep(StepExecution stepExecution) {
    // listener hook, called before the step starts (unused here)
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    // returning null leaves the step's exit status unchanged
    return null;
  }

  @Override
  public void write(List<? extends Person> items) throws Exception {
    // saveAll in the implementation class AbstractElasticsearchRepository calls
    // ElasticsearchOperations.bulkIndex(queries), so each chunk is indexed in bulk
    personRepository.saveAll(items);
  }
}
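Because `saveAll` ends in a bulk index call, each chunk reaches Elasticsearch as a `_bulk` request. The hypothetical `BulkBodySketch.bulkIndexBody` below shows roughly what that request body looks like. The Jest client is assumed to build the equivalent for you, and nothing here is part of the article's code. It produces the newline-delimited JSON format of the ES 5.x bulk API: per document, one action line with `_index`, `_type` and `_id`, then the document source, each ending in a newline. Documents are assumed to be serialized to JSON already.

```java
import java.util.Map;

// Hypothetical sketch of an Elasticsearch 5.x _bulk request body for one chunk.
// index and type are assumed not to contain characters that need JSON escaping.
class BulkBodySketch {

  static String bulkIndexBody(String index, String type, Map<Long, String> sourcesById) {
    StringBuilder body = new StringBuilder();
    for (Map.Entry<Long, String> e : sourcesById.entrySet()) {
      // action/metadata line
      body.append("{\"index\":{\"_index\":\"").append(index)
          .append("\",\"_type\":\"").append(type)
          .append("\",\"_id\":\"").append(e.getKey()).append("\"}}\n");
      // document source line; the bulk API requires every line, including the last, to end with \n
      body.append(e.getValue()).append('\n');
    }
    return body.toString();
  }
}
```

One request per chunk, rather than one per document, is what keeps the import fast. That is why the chunk size in 2.5 matters.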

2.4 Configuring an ElasticsearchItemReader (not used in this example; for reference only)

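package com.hfcsbc.esetl.itemreader;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import java.util.Iterator;
/**
 * Created by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

  private final ElasticsearchOperations elasticsearchOperations;

  private final SearchQuery query;

  private final Class<? extends T> targetType;

  public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
    // the name prefixes the keys under which reader state is saved in the ExecutionContext
    setName(ClassUtils.getShortName(ElasticsearchItemReader.class));
    this.elasticsearchOperations = elasticsearchOperations;
    this.query = query;
    this.targetType = targetType;
  }

  @Override
  @SuppressWarnings("unchecked")
  protected Iterator<T> doPageRead() {
    // page and pageSize are maintained by AbstractPaginatedDataItemReader;
    // without them every call would return the same first page
    query.setPageable(PageRequest.of(page, pageSize));
    return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    Assert.notNull(elasticsearchOperations, "An ElasticsearchOperations instance is required");
    Assert.notNull(query, "A query is required");
    Assert.notNull(targetType, "A target type is required");
  }
}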
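`AbstractPaginatedDataItemReader` follows a simple loop. It calls `doPageRead()` whenever the current page is used up, then advances `page`, and returns `null` once a page comes back empty. That is why the query has to be paged with `page` and `pageSize`; otherwise the same first page would be read again and again. The JDK-only model below mirrors that loop, using an in-memory list in place of the index for illustration. `PagedReaderSketch` is a hypothetical name. Note also that `from`/`size` paging in Elasticsearch is capped by `index.max_result_window` (10,000 by default), so a scroll-based reader fits large indices better.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of a paginated reader; an in-memory list stands in for Elasticsearch.
class PagedReaderSketch<T> {
  private final List<T> source;
  private final int pageSize;
  private int page = 0;
  private Iterator<T> current = Collections.emptyIterator();

  PagedReaderSketch(List<T> source, int pageSize) {
    this.source = source;
    this.pageSize = pageSize;
  }

  // Equivalent of a search request with from = page * pageSize and size = pageSize.
  private Iterator<T> doPageRead() {
    int from = Math.min(page * pageSize, source.size());
    int to = Math.min(from + pageSize, source.size());
    return source.subList(from, to).iterator();
  }

  // Returns the next item, or null once a page comes back empty (end of input).
  T read() {
    if (!current.hasNext()) {
      current = doPageRead();
      page++;
      if (!current.hasNext()) {
        return null;
      }
    }
    return current.next();
  }
}
```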

2.5 Spring Batch configuration

package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemwriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
/**
 * Created by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
  @Autowired
  private EsPersonRepository personRepository;

  @Bean
  public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) throws Exception {
    // a stable sort order keeps page boundaries consistent between queries
    JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
    queryProvider.setSqlQuery("select * from person order by id");
    queryProvider.setEntityClass(Person.class);
    queryProvider.afterPropertiesSet();

    JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
    reader.setEntityManagerFactory(entityManagerFactory);
    reader.setPageSize(10000); // rows per query, equal to the chunk size below
    reader.setQueryProvider(queryProvider);
    reader.setSaveState(true);
    reader.afterPropertiesSet();
    return reader;
  }

  @Bean
  public ElasticsearchItemWriter itemWriter() {
    return new ElasticsearchItemWriter(personRepository);
  }

  @Bean
  public Step step(StepBuilderFactory stepBuilderFactory,
           ItemReader<Person> itemReader,
           ItemWriter<Person> itemWriter) {
    return stepBuilderFactory
        .get("step1")
        // commit interval: the writer receives (and the transaction commits) 10000 items at a time
        .<Person, Person>chunk(10000)
        .reader(itemReader)
        .writer(itemWriter)
        .build();
  }

  @Bean
  public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
    return jobBuilderFactory
        .get("importJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  }

  /**
   * Spring Batch needs its own metadata tables (job and step execution records).
   * This bean specifies the DataSource in which those tables are kept.
   * @param dataSource database holding the Spring Batch metadata tables
   * @param manager transaction manager used by the job repository
   * @return the configured JobRepository
   */
  @Bean
  public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setTransactionManager(manager);
    jobRepositoryFactoryBean.setDatabaseType("postgres");
    jobRepositoryFactoryBean.afterPropertiesSet();
    // fail fast on misconfiguration instead of registering a null bean
    return jobRepositoryFactoryBean.getObject();
  }
}
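The job is registered with `RunIdIncrementer`. When Spring Boot launches the job at startup, the incrementer supplies the next set of job parameters. Each run therefore gets a distinct `run.id` and becomes a new job instance, rather than being treated as a rerun of an already completed one. The JDK-only model of that behaviour below uses plain maps in place of `JobParameters`. The `RunIdSketch` name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of RunIdIncrementer: read the long "run.id" parameter
// (0 if absent) and return a new parameter map with run.id + 1.
class RunIdSketch {
  static final String KEY = "run.id";

  static Map<String, Long> next(Map<String, Long> params) {
    Map<String, Long> next = new HashMap<>();
    if (params != null) {
      next.putAll(params); // keep any other parameters unchanged
    }
    long id = next.getOrDefault(KEY, 0L);
    next.put(KEY, id + 1);
    return next;
  }
}
```

The method returns a new map instead of mutating its input, so earlier parameter sets stay as they were.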

2.6 Configuring the database and ES connection addresses

spring:
 redis:
  host: 192.168.1.222
 data:
  jest:
   uri: http://192.168.1.222:9200
   username: elastic
   password: changeme

 jpa:
  database: postgresql
  show-sql: true
  hibernate:
   ddl-auto: update

 datasource:
  platform: postgres
  url: jdbc:postgresql://192.168.1.222:5433/person
  username: hfcb
  password: hfcb
  driver-class-name: org.postgresql.Driver
  max-active: 2

spring.batch.initialize-schema: always

2.7 Application entry point

package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

// Boot's own Elasticsearch auto-configuration is excluded because the ES connection comes from Spring Data Jest
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

  public static void main(String[] args) {
    SpringApplication.run(EsEtlApplication.class, args);
  }
}
