欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于Spring Batch向Elasticsearch批量导入数据

程序员文章站 2022-03-01 13:47:10
...

1.介绍

当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据。由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入。

 

源码地址:http://www.wisely.top/2018/02/24/spring-batch-elasticsearch/

2.示例

2.1 pom.xml

本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>

    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>


</project>

2.2 实体类及repository

package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
    @Id
    private Long id;
    private String name;
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}
package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
    @Id
    private Long id;
    private String name;
}
package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}

2.3 配置elasticsearchItemWriter

package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private EsPersonRepository personRepository;

    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @Override
    public void beforeWrite(List<? extends Person> items) {

    }

    @Override
    public void afterWrite(List<? extends Person> items) {

    }

    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {

    }

    @Override
    public void beforeStep(StepExecution stepExecution) {

    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    @Override
    public void write(List<? extends Person> items) throws Exception {
        //实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引
        personRepository.saveAll(items);
    }
}

2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)

package com.hfcsbc.esetl.itemReader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

import java.util.Iterator;

/**
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<Person> extends AbstractPaginatedDataItemReader<Person> implements InitializingBean {

    private final ElasticsearchOperations elasticsearchOperations;

    private final SearchQuery query;

    private final Class<? extends Person> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends Person> targetType) {
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    protected Iterator<Person> doPageRead() {
        return (Iterator<Person>)elasticsearchOperations.queryForList(query, targetType).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}

2.5 配置spring batch需要的配置

package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Autowired
    private EsPersonRepository personRepository;

    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
        String sqlQuery = "select * from person";
        try {
            JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Person.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(10000);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return reader;
    }

    @Bean
    public ElasticsearchItemWriter itemWriter(){
        return new ElasticsearchItemWriter(personRepository);
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader itemReader,
                     ItemWriter itemWriter){
        return stepBuilderFactory
                .get("step1")
                .chunk(10000)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step){
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource
     * @param dataSource
     * @param manager
     * @return
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}

2.6配置数据库及es的连接地址

spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme

  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update

  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2

spring.batch.initialize-schema: always

2.7 配置入口类

package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}