Importing Large Volumes of Data into Elasticsearch
The task: import 250 million records into ES.
At first I used BulkRequestBuilder, but every time the import reached about 600,000 records the ES node crashed, and I never did figure out why (my guess is that the unthrottled bulk requests simply overwhelmed the node).
Switching to BulkProcessor made the import go through.
Problems encountered:
1) At first all the fields were indexed by default, and I did not set the refresh_interval and number_of_replicas parameters. refresh_interval defaults to 1s (the index is refreshed every second), and number_of_replicas, the number of replicas per primary shard, defaults to 1. The import was therefore very slow: 19 hours to finish.
2) I then created a new index with these fields not indexed, refresh_interval = -1 (refresh disabled), and "number_of_replicas": 0. The import finished in under 4 hours, but the data in ES is then not visible in near-real time; a manual refresh is required.
3) I then created another new index with these fields indexed, refresh_interval = -1 (refresh disabled), and "number_of_replicas": 0. The import finished in 4.5 hours.
Comparing the two factors, whether the fields are indexed versus the index refresh interval, the refresh interval clearly has the larger impact on import speed.
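The index was created with the script below: it disables refresh (refresh_interval: -1), sets number_of_replicas to 0, and maps every field as keyword.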
#!/usr/bin/env bash
# Create the index and its mapping type
curl -H 'Content-Type: application/json' -XPUT \
    -d @- 'http://192.168.88.126:9200/zzz' <<CURL_DATA
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 6,
    "refresh_interval": "-1",
    "index.mapping.total_fields.limit": 100000
  },
  "mappings": {
    "log": {
      "properties": {
        "account":      {"type": "keyword"},
        "path":         {"type": "keyword"},
        "card":         {"type": "keyword"},
        "mid":          {"type": "keyword"},
        "pid":          {"type": "keyword"},
        "request_time": {"type": "keyword"},
        "status":       {"type": "keyword"}
      }
    }
  }
}
CURL_DATA
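The import program reads a tab-separated CSV and feeds it to a BulkProcessor over the (since-deprecated) TransportClient: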
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class Esimport {

    private static TransportClient client = null;

    public static void importcsv() throws IOException {
        getClient();  // initialize the shared client; the static field is also used by getProcessor()
        File f = new File("/home/upsmart/1clean/anu/alles.csv");
        BufferedReader br = new BufferedReader(new FileReader(f));
        String line;
        int count = 0;
        BulkProcessor bulkProcessor = getProcessor();
        while ((line = br.readLine()) != null) {
            String[] fieldValues = line.split("\t");
            bulkProcessor.add(new IndexRequest("zzz", "log", Integer.toString(count++))
                    .source(jsonBuilder()
                            .startObject()
                            .field("account", fieldValues[0])
                            .field("path", fieldValues[1])
                            .field("card", fieldValues[2])
                            .field("mid", fieldValues[3])
                            .field("pid", fieldValues[4])
                            .field("request_time", fieldValues[5])
                            .field("status", fieldValues[6])
                            .endObject()));
            if (count % 20000 == 0) {
                System.out.println("========== " + count + " records");
            }
        }
        bulkProcessor.flush();
        try {
            System.out.println("waiting for outstanding bulk requests...");
            bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
            System.out.println("done");
        } catch (InterruptedException e) {
            System.out.println("interrupted while closing the bulk processor");
        }
        br.close();
    }

    // Lazily build one shared TransportClient, with double-checked locking.
    static TransportClient getClient() {
        if (null == client) {
            synchronized (Esimport.class) {
                if (null == client) {
                    Settings settings = Settings.builder()
                            .put("cluster.name", "xiaoyao")
                            .put("client.transport.sniff", true)
                            .build();
                    try {
                        client = new PreBuiltTransportClient(settings)
                                .addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.88.126"), 9300));
                    } catch (UnknownHostException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return client;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("start " + System.currentTimeMillis());
        importcsv();
        System.out.println("end " + System.currentTimeMillis());
    }

    static BulkProcessor getProcessor() {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            }
        })
        // flush a bulk request every 20000 actions...
        .setBulkActions(20000)
        // ...or whenever the buffered requests reach 500 MB...
        .setBulkSize(new ByteSizeValue(500, ByteSizeUnit.MB))
        // ...or every 30 seconds, whichever comes first
        .setFlushInterval(TimeValue.timeValueSeconds(30))
        // number of concurrent bulk requests allowed in flight
        .setConcurrentRequests(30)
        // backoff (retry) policy: initial wait 1000 ms, at most 1 retry
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
                TimeValue.timeValueMillis(1000), 1))
        .build();
    }
}
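One thing worth flagging: the listener callbacks above are left empty, so partial bulk failures go unnoticed. A minimal sketch of failure-logging afterBulk overrides (the log wording is mine, not from the original):

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    // hasFailures() is true if any single item in the bulk failed, e.g. was rejected
    if (response.hasFailures()) {
        System.err.println("bulk " + executionId + " had failures: "
                + response.buildFailureMessage());
    }
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    // the whole bulk request failed, e.g. the connection was lost
    System.err.println("bulk " + executionId + " failed entirely: " + failure);
}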
Once the import is done, refresh the index so the newly indexed documents become searchable (with refresh_interval = -1, nothing is visible to search until a refresh happens):
curl -XPOST http://192.168.88.126:9200/zzz/_refresh
After the import completes, restore the index's refresh interval and add one replica per primary shard:
curl -H "Content-Type: application/json" -XPUT 'http://192.168.88.126:9200/zzz/_settings?pretty=true' -d '{"refresh_interval": "5s", "number_of_replicas": 1}'
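The same two steps can also be done from the Java client; a minimal sketch reusing the TransportClient and index name from above:

// Make everything indexed so far visible to search.
client.admin().indices().prepareRefresh("zzz").get();

// Re-enable periodic refresh and add one replica per primary shard.
client.admin().indices().prepareUpdateSettings("zzz")
        .setSettings(Settings.builder()
                .put("refresh_interval", "5s")
                .put("number_of_replicas", 1)
                .build())
        .get();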