ELK:ElasticSearch 5.x/6.x版本增删改查工具类
程序员文章站
2022-05-18 20:49:41
...
elasticsearch 相关使用方法记录
注意:在 6.x 版本中,每个索引只允许存在一个 type,type 概念已被标记为废弃(后续版本逐步移除),建议统一使用 _doc 作为 type 名称
一、工具类
package com.test.elastic.crud;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
 * Static helpers for basic create/read/update/delete and search operations against
 * Elasticsearch 5.x/6.x through the transport client.
 *
 * @author 小白
 * @version 1.0
 * @date 2020/5/25 19:36
 */
public class ElasticSearchUtils {

    /** Port used for every transport address; the port in the configured host list is not used by {@link #initClient()}. */
    private static final int TRANSPORT_PORT = 9300;

    /** Number of hits requested per search / per scroll page. */
    private static final int PAGE_SIZE = 10000;

    /** Keep-alive, in minutes, requested for each scroll round trip. */
    private static final long SCROLL_KEEP_ALIVE_MINUTES = 8;

    /**
     * Parses a comma-separated list of Elasticsearch hosts, e.g.
     * {@code 127.0.0.1:9200,127.0.0.2:9200,127.0.0.3:9200}.
     *
     * <p>Each entry is either a full {@code http://} / {@code https://} URL or a plain
     * {@code host:port} pair. Surrounding whitespace is ignored and blank entries are skipped.
     *
     * @param hosts comma-separated host list
     * @return the parsed addresses, in input order; never empty
     * @throws MalformedURLException if {@code hosts} is null, contains no usable entry, or an
     *                               entry is not a valid URL / {@code host:port} pair
     */
    public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {
        if (hosts == null) {
            throw new MalformedURLException("invalid elasticsearch hosts format");
        }
        List<HttpHost> addresses = new ArrayList<>();
        for (String rawEntry : hosts.split(",")) {
            String entry = rawEntry.trim();
            if (entry.isEmpty()) {
                continue;
            }
            // Match the full scheme prefix so that plain host names which merely start with
            // "http" (e.g. "httpd-node:9200") are still parsed as host:port pairs.
            if (entry.startsWith("http://") || entry.startsWith("https://")) {
                URL url = new URL(entry);
                // Keep the scheme; the two-argument HttpHost constructor would silently turn https into http.
                addresses.add(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()));
            } else {
                String[] parts = entry.split(":", -1);
                if (parts.length <= 1) {
                    throw new MalformedURLException("invalid elasticsearch hosts format");
                }
                addresses.add(new HttpHost(parts[0], parsePort(parts[1], entry)));
            }
        }
        if (addresses.isEmpty()) {
            throw new MalformedURLException("invalid elasticsearch hosts format");
        }
        return addresses;
    }

    /** Converts the port part of a host entry, reporting a bad value as the checked exception callers already handle. */
    private static int parsePort(String port, String entry) throws MalformedURLException {
        try {
            return Integer.parseInt(port.trim());
        } catch (NumberFormatException e) {
            MalformedURLException failure =
                    new MalformedURLException("invalid elasticsearch port in host entry: " + entry);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Creates a transport client for the cluster named in the project settings and registers
     * every configured host on the transport port.
     *
     * @return the connected client, or {@code null} if the client could not be set up
     *         (the failure is printed to standard error)
     * @throws UnknownHostException declared for compatibility; failures are reported by returning {@code null}
     */
    public static TransportClient initClient() throws UnknownHostException {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        TransportClient client = null;
        try {
            // Parse the host list first so that a bad configuration never creates a client at all.
            List<HttpHost> httpHosts =
                    ElasticSearchUtils.getEsAddresses(com.suning.waf.hids.conf.Settings.getEsCluster());
            Settings settings = Settings.builder()
                    .put("cluster.name", com.suning.waf.hids.conf.Settings.getEsClusterName())
                    .build();
            client = new PreBuiltTransportClient(settings);
            // Register every node
            for (HttpHost httpHost : httpHosts) {
                client.addTransportAddress(
                        new TransportAddress(InetAddress.getByName(httpHost.getHostName()), TRANSPORT_PORT));
            }
            return client;
        } catch (Exception e) {
            e.printStackTrace();
            // Release the half-initialised client instead of leaking it.
            if (client != null) {
                try {
                    client.close();
                } catch (RuntimeException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
            return null;
        }
    }

    /**
     * Closes the client.
     *
     * @param client client to close; {@code null} is ignored, since {@link #initClient()} returns
     *               {@code null} on failure
     */
    public static void closeClient(TransportClient client) {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Indexes a JSON document and refreshes immediately.
     *
     * @param client es client
     * @param index  es index
     * @param type   es type
     * @param json   document source as a JSON string
     * @return the index response
     */
    public static IndexResponse insertIntoElastic(TransportClient client, String index, String type, String json) {
        return client
                .prepareIndex(index, type)
                .setSource(json, XContentType.JSON)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .execute()
                .actionGet();
    }

    /**
     * Deletes a document and refreshes immediately.
     *
     * @param client es client
     * @param index  es index
     * @param type   es type
     * @param id     es document id
     * @throws MalformedURLException declared for compatibility with existing callers
     * @throws UnknownHostException  declared for compatibility with existing callers
     */
    public static void deleteElasticDoc(TransportClient client, String index, String type, String id) throws MalformedURLException, UnknownHostException {
        // NOTE(review): TimeValue.MINUS_ONE presumably disables the request timeout — confirm.
        client.prepareDelete(index, type, id)
                .setTimeout(TimeValue.MINUS_ONE)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
    }

    /**
     * Updates the given fields of a document, creating the document from those fields if it
     * does not exist yet (doc-as-upsert), and refreshes immediately.
     *
     * @param client es client
     * @param index  es index
     * @param type   es type
     * @param id     es document id
     * @param map    field names mapped to their new values
     * @throws MalformedURLException declared for compatibility with existing callers
     * @throws UnknownHostException  declared for compatibility with existing callers
     */
    public static void updateElasticDoc(TransportClient client, String index, String type, String id, Map<String, String> map) throws MalformedURLException, UnknownHostException {
        client.prepareUpdate(index, type, id)
                .setDocAsUpsert(true)
                .setDoc(map)
                .setRetryOnConflict(100)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .setTimeout(TimeValue.MINUS_ONE)
                .get();
    }

    /**
     * Runs a query and returns the hits of the first page (at most 10000).
     *
     * @param client       es client
     * @param queryBuilder query to run
     * @param index        index to search
     * @param type         type to search
     * @param field        intended sort field; currently unused because sorting is not applied
     * @return the hits, possibly empty
     */
    public static List<SearchHit> searcher(TransportClient client, QueryBuilder queryBuilder, String index, String type, String field) {
        SearchResponse searchResponse = client.prepareSearch(index).setTypes(type)
                .setQuery(queryBuilder)
                .setSize(PAGE_SIZE)
                .execute()
                .actionGet();
        List<SearchHit> list = new ArrayList<>();
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            list.add(hit);
        }
        return list;
    }

    /**
     * Returns every hit for a query by paging through the results with the scroll API.
     *
     * @param client       es client
     * @param queryBuilder query to run
     * @param index        index to search
     * @param type         type to search
     * @return all hits, possibly empty
     * @throws Exception if a search or scroll request fails
     */
    public static List<SearchHit> searchAll(TransportClient client, QueryBuilder queryBuilder, String index, String type) throws Exception {
        List<SearchHit> list = new ArrayList<>();
        String scrollId = null;
        try {
            SearchResponse searchResponse = client.prepareSearch(index)
                    .setQuery(queryBuilder)
                    .setTypes(type)
                    .setSize(PAGE_SIZE)
                    // how long the scroll cursor is kept alive
                    .setScroll(TimeValue.timeValueMinutes(SCROLL_KEEP_ALIVE_MINUTES))
                    .execute().actionGet();
            scrollId = searchResponse.getScrollId();
            // An empty page marks the end of the result set.
            while (searchResponse.getHits().getHits().length > 0) {
                for (SearchHit hit : searchResponse.getHits()) {
                    list.add(hit);
                }
                searchResponse = client.prepareSearchScroll(scrollId)
                        .setScroll(TimeValue.timeValueMinutes(SCROLL_KEEP_ALIVE_MINUTES))
                        .execute().actionGet();
                scrollId = searchResponse.getScrollId();
            }
            return list;
        } finally {
            // Release the scroll context right away instead of holding it until the keep-alive expires.
            if (scrollId != null) {
                try {
                    client.prepareClearScroll().addScrollId(scrollId).execute().actionGet();
                } catch (RuntimeException ignored) {
                    // Best effort: the context expires on its own after the keep-alive,
                    // and a cleanup failure must not hide the search result or the original error.
                }
            }
        }
    }
}
二、使用方法
客户端变量
private static TransportClient client = null;
使用方法(注意:在 6.x 版本中,每个索引只允许存在一个 type,type 概念已被标记为废弃,建议统一使用 _doc 作为 type 名称):
// Initialise the connection (initClient returns null when the client could not be set up)
client = ElasticSearchUtils.initClient();
QueryBuilder queryBuilder = QueryBuilders.boolQuery();// build the actual query for your use case
// Search; the last argument is the sort field name, which searcher currently does not apply
List<SearchHit> searchHitList = ElasticSearchUtils.searcher(client, queryBuilder, "index", "_doc", "time");
// Insert: `document` is the object (POJO or Map) to index; it must serialise to a JSON object
ElasticSearchUtils.insertIntoElastic(client, "index", "_doc", new Gson().toJson(document));
// Update: take the document id from a search hit, then overwrite the listed fields
String id = searchHitList.get(i).getId();
Map<String, String> map = new HashMap<>();
map.put("time", time);
ElasticSearchUtils.updateElasticDoc(client, "index", "_doc", id, map);
// Delete
ElasticSearchUtils.deleteElasticDoc(client, "index", "_doc", id);
客户端关闭
//关闭连接
ElasticSearchUtils.closeClient(client);