
Bulk-importing a large dataset into Elasticsearch

程序员文章站 2022-03-04 14:07:33

The task: import 250 million records into Elasticsearch.
I started with BulkRequestBuilder, but every run crashed an ES node at around 600,000 documents, and I never worked out why.

Switching to BulkProcessor made the import work. Problems encountered along the way:
1) At first every field was indexed by default and I left refresh_interval and number_of_replicas unset. refresh_interval defaults to 1s (the index refreshes every second), and number_of_replicas, the number of replicas per primary shard, defaults to 1. As a result the import was very slow: 19 hours.

2) I then rebuilt the index without indexing those fields and set refresh_interval: -1 (refreshes disabled) and number_of_replicas: 0. The import finished in under 4 hours, but with refreshes disabled the data does not become searchable in real time; you have to trigger a refresh manually.

3) Finally I rebuilt the index with those fields indexed, again with refresh_interval: -1 and number_of_replicas: 0. The import took 4.5 hours.

Comparing the two factors, the refresh interval affects import speed far more than whether the fields are indexed: disabling refreshes and replicas cut the time from 19 hours to about 4.5, while skipping field indexing on top of that saved only another half hour.
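A quick back-of-the-envelope comparison of the three runs makes the gap concrete (a sketch using the timings reported above and the 250-million document count from the start of the post):

```java
public class ImportThroughput {
    public static void main(String[] args) {
        long docs = 250_000_000L;                           // documents imported
        double defaults = 19.0, tuned = 4.5, noIndex = 4.0; // hours per run

        System.out.printf("default settings:        %.0f docs/s%n", docs / (defaults * 3600));
        System.out.printf("tuned, fields indexed:   %.0f docs/s%n", docs / (tuned * 3600));
        System.out.printf("tuned, fields unindexed: %.0f docs/s%n", docs / (noIndex * 3600));
        // Disabling refresh/replicas alone: 19 h -> 4.5 h, roughly a 4.2x speedup.
        // Not indexing the fields on top of that saves only ~11% more time.
        System.out.printf("refresh/replica speedup: %.1fx%n", defaults / tuned);
    }
}
```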

#!/usr/bin/env bash

# Create the index and its mapping type

curl -H 'content-type: application/json' -XPUT \
  -d @- 'http://192.168.88.126:9200/zzz' <<CURL_DATA
{
    "settings": {
        "number_of_replicas": 0,
        "number_of_shards": 6,
        "refresh_interval": "-1",
        "index.mapping.total_fields.limit": 100000
    },
    "mappings": {
        "log": {
            "properties": {
                "account":      {"type": "keyword"},
                "path":         {"type": "keyword"},
                "card":         {"type": "keyword"},
                "mid":          {"type": "keyword"},
                "pid":          {"type": "keyword"},
                "request_time": {"type": "keyword"},
                "status":       {"type": "keyword"}
            }
        }
    }
}
CURL_DATA
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class Esimport {
    private static TransportClient client= null;

    public static void importcsv() throws IOException {
        getClient(); // initialise the shared client before getProcessor() uses it
        File f = new File("/home/upsmart/1clean/anu/alles.csv");
        BufferedReader br = new BufferedReader(new FileReader(f));
        String line;
        int count = 0;
        BulkProcessor bulkProcessor = getProcessor();
        while ((line = br.readLine()) != null) {
            String[] fieldValues = line.split("\t");
            bulkProcessor.add(new IndexRequest("zzz", "log", Integer.toString(count++)).source(jsonBuilder()
                .startObject()
                .field("account", fieldValues[0])
                .field("path", fieldValues[1])
                .field("card", fieldValues[2])
                .field("mid", fieldValues[3])
                .field("pid", fieldValues[4])
                .field("request_time", fieldValues[5])
                .field("status", fieldValues[6])
                .endObject()));
            if (count % 20000 == 0) {
                System.out.println("========== " + count + " documents queued");
            }
        }
        }
        bulkProcessor.flush();
        try {
            System.out.println("closing bulk processor");
            bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
            System.out.println("done");
        } catch (InterruptedException e) {
            System.out.println("interrupted while closing the bulk processor");
        }
        br.close();
    }

    static TransportClient getClient() {
        if (null == client) {
            synchronized (Esimport.class) {
                if (null == client) {
                    Settings settings = Settings.builder()
                        .put("cluster.name", "xiaoyao")
                        .put("client.transport.sniff", true)
                        .build();
                    try {
                        client = new PreBuiltTransportClient(settings)
                            .addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.88.126"), 9300));
                    } catch (UnknownHostException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return client;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("start " + System.currentTimeMillis());
        importcsv();
        System.out.println("end " + System.currentTimeMillis());

    }

    static BulkProcessor getProcessor() {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {

            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {

            }
        }) // flush a batch once it holds this many requests...
            .setBulkActions(20000)
            // ...or once the queued requests reach this total size...
            .setBulkSize(new ByteSizeValue(500, ByteSizeUnit.MB))
            // ...or after this much time has passed since the last flush
            .setFlushInterval(TimeValue.timeValueSeconds(30))
            // number of concurrent bulk requests allowed in flight
            .setConcurrentRequests(30)
            // retry policy: exponential backoff starting at 1000 ms, at most 1 retry
            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
                TimeValue.timeValueMillis(1000), 1))
            .build();
            .build();

    }
}
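One pitfall in the reader loop above: Java's `String.split("\t")` drops trailing empty fields, so a line whose last column (status) is empty yields only six elements and reading element [6] throws ArrayIndexOutOfBoundsException. Passing a negative limit keeps the trailing empties. A minimal sketch (the sample line is made up):

```java
public class SplitPitfall {
    public static void main(String[] args) {
        // Hypothetical TSV line whose last column (status) is empty.
        String line = "acct\t/api/pay\tcard01\tmid01\tpid01\t0.12\t";

        String[] dropped = line.split("\t");    // trailing empty fields removed
        String[] kept = line.split("\t", -1);   // limit -1 keeps trailing empties

        System.out.println(dropped.length);     // 6 -> element [6] is out of bounds
        System.out.println(kept.length);        // 7 -> safe to read all seven columns
    }
}
```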

Once the import finishes, refresh the index so the data becomes searchable:

curl -XPOST http://192.168.88.126:9200/zzz/_refresh

Then restore a normal refresh interval and add a replica:

curl -H "Content-Type: application/json" -XPUT 'http://192.168.88.126:9200/zzz/_settings?pretty=true' -d '{"index": {"refresh_interval": "5s", "number_of_replicas": 1}}'