欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

ELK:ElasticSearch 5.x/6.x版本增删改查工具类

程序员文章站 2022-05-18 20:49:41
...

elasticsearch 相关使用方法记录

注意:在 6.x 版本中,每个索引只允许有一个 type,官方推荐统一使用 _doc;type 在 7.x 中被标记为废弃,并在 8.x 中被移除。

一、工具类

package com.test.elastic.crud;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Static helpers for basic create/read/update/delete and search operations against
 * Elasticsearch 5.x/6.x through the transport client.
 *
 * @author 小白
 * @version 1.0
 * @date 2020/5/25 19:36
 */
public class ElasticSearchUtils {

    /**
     * Transport port used for every node added to the client. The port found in the
     * configured host list is not used for the transport connection.
     */
    private static final int TRANSPORT_PORT = 9300;

    /** Number of hits requested per search / per scroll page. */
    private static final int PAGE_SIZE = 10000;

    /** How long (in minutes) a scroll context is kept alive between scroll requests. */
    private static final long SCROLL_KEEP_ALIVE_MINUTES = 8;

    /**
     * Parses a comma-separated Elasticsearch host list, for example
     * {@code 127.0.0.1:9200,127.0.0.2:9200,127.0.0.3:9200}.
     *
     * <p>Each entry is either {@code host:port} or a full {@code http://} / {@code https://}
     * URL. Whitespace around an entry is ignored.
     *
     * @param hosts comma-separated host list
     * @return one {@link HttpHost} per entry, in input order
     * @throws MalformedURLException if {@code hosts} is null, or an entry has no host,
     *                               no port separator, or a non-numeric port
     */
    public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {
        if (hosts == null) {
            throw new MalformedURLException("invalid elasticsearch hosts format");
        }
        List<HttpHost> addresses = new ArrayList<>();
        for (String rawEntry : hosts.split(",")) {
            // Tolerate "a:9200, b:9200" style lists.
            String entry = rawEntry.trim();
            // Require a real scheme prefix so a host such as "httpd-es:9200" is not parsed as a URL.
            if (entry.startsWith("http://") || entry.startsWith("https://")) {
                URL url = new URL(entry);
                // Keep the scheme so https endpoints are not silently downgraded to http.
                addresses.add(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()));
                continue;
            }
            String[] parts = entry.split(":", -1);
            if (parts.length < 2 || parts[0].isEmpty()) {
                throw new MalformedURLException("invalid elasticsearch hosts format");
            }
            try {
                addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1].trim())));
            } catch (NumberFormatException e) {
                // Report a bad port through the documented checked exception.
                MalformedURLException badPort =
                        new MalformedURLException("invalid elasticsearch port in host entry: " + entry);
                badPort.initCause(e);
                throw badPort;
            }
        }
        return addresses;
    }

    /**
     * Creates a transport client for the cluster described by the project settings
     * ({@code getEsClusterName()} and {@code getEsCluster()}).
     *
     * <p>Only the host names of the configured addresses are used; every node is
     * contacted on {@link #TRANSPORT_PORT}.
     *
     * @return the connected client, or {@code null} if the client could not be created
     *         (the failure is printed to standard error)
     * @throws UnknownHostException declared for compatibility with existing callers;
     *                              failures are reported by returning {@code null}
     */
    public static TransportClient initClient() throws UnknownHostException {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        TransportClient client = null;
        try {
            Settings settings = Settings.builder()
                    .put("cluster.name", com.suning.waf.hids.conf.Settings.getEsClusterName())
                    .build();
            client = new PreBuiltTransportClient(settings);
            List<HttpHost> httpHosts =
                    ElasticSearchUtils.getEsAddresses(com.suning.waf.hids.conf.Settings.getEsCluster());
            // Register every configured node with the client.
            for (HttpHost httpHost : httpHosts) {
                InetAddress address = InetAddress.getByName(httpHost.getHostName());
                client.addTransportAddress(new TransportAddress(address, TRANSPORT_PORT));
            }
            return client;
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leak the half-initialized client (and its threads) on failure.
            if (client != null) {
                try {
                    client.close();
                } catch (RuntimeException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
            return null;
        }
    }

    /**
     * Closes the client. A {@code null} client (as returned by a failed
     * {@link #initClient()}) is ignored.
     *
     * @param client the client to close, may be {@code null}
     */
    public static void closeClient(TransportClient client) {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Indexes a JSON document with an auto-generated id and refreshes immediately.
     *
     * @param client Elasticsearch client
     * @param index  target index
     * @param type   mapping type
     * @param json   document source as a JSON string
     * @return the index response
     */
    public static IndexResponse insertIntoElastic(TransportClient client, String index, String type, String json) {
        return client
                .prepareIndex(index, type)
                .setSource(json, XContentType.JSON)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .execute()
                .actionGet();
    }

    /**
     * Deletes a document by id and refreshes immediately.
     *
     * @param client Elasticsearch client
     * @param index  index name
     * @param type   mapping type
     * @param id     document id
     * @throws MalformedURLException declared for compatibility with existing callers
     * @throws UnknownHostException  declared for compatibility with existing callers
     */
    public static void deleteElasticDoc(TransportClient client, String index, String type, String id) throws MalformedURLException, UnknownHostException {
        client.prepareDelete(index, type, id)
                .setTimeout(TimeValue.MINUS_ONE)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
    }


    /**
     * Updates the given fields of a document, creating the document from those fields
     * if it does not exist yet (doc-as-upsert), and refreshes immediately.
     *
     * @param client Elasticsearch client
     * @param index  index name
     * @param type   mapping type
     * @param id     document id
     * @param map    field names mapped to their new values
     * @throws MalformedURLException declared for compatibility with existing callers
     * @throws UnknownHostException  declared for compatibility with existing callers
     */
    public static void updateElasticDoc(TransportClient client, String index, String type, String id, Map<String, String> map) throws MalformedURLException, UnknownHostException {
        client.prepareUpdate(index, type, id)
                .setDocAsUpsert(true)
                .setDoc(map)
                .setRetryOnConflict(100)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .setTimeout(TimeValue.MINUS_ONE)
                .get();
    }

    /**
     * Runs a single search and returns at most {@link #PAGE_SIZE} hits.
     *
     * @param client       Elasticsearch client
     * @param queryBuilder query to execute
     * @param index        index name
     * @param type         mapping type
     * @param field        currently unused: sorting by this field is not applied
     * @return the hits of the first result page; empty if nothing matched
     */
    public static List<SearchHit> searcher(TransportClient client, QueryBuilder queryBuilder, String index, String type, String field){
        SearchResponse searchResponse = client.prepareSearch(index).setTypes(type)
                .setQuery(queryBuilder)
                .setSize(PAGE_SIZE)
                .execute()
                .actionGet();
        SearchHit[] pageHits = searchResponse.getHits().getHits();
        List<SearchHit> list = new ArrayList<>(pageHits.length);
        for (SearchHit hit : pageHits) {
            list.add(hit);
        }
        return list;
    }


    /**
     * Returns every hit matching the query by paging through a scroll.
     *
     * <p>The scroll context is released when paging finishes or fails, so it does not
     * stay open on the cluster until its keep-alive expires.
     *
     * @param client       Elasticsearch client
     * @param queryBuilder query to execute
     * @param index        index name
     * @param type         mapping type
     * @return all matching hits, in the order the scroll returned them
     * @throws Exception if a search or scroll request fails
     */
    public static List<SearchHit> searchAll(TransportClient client, QueryBuilder queryBuilder, String index, String type)throws Exception {
        TimeValue keepAlive = TimeValue.timeValueMinutes(SCROLL_KEEP_ALIVE_MINUTES);
        List<SearchHit> list = new ArrayList<>();
        SearchResponse searchResponse = client.prepareSearch(index)
                .setQuery(queryBuilder)
                .setTypes(type)
                .setSize(PAGE_SIZE)
                // How long the scroll cursor is kept alive between requests.
                .setScroll(keepAlive)
                .execute().actionGet();
        String scrollId = searchResponse.getScrollId();
        try {
            // An empty page marks the end of the scroll.
            while (searchResponse.getHits().getHits().length > 0) {
                for (SearchHit hit : searchResponse.getHits().getHits()) {
                    list.add(hit);
                }
                searchResponse = client.prepareSearchScroll(scrollId)
                        .setScroll(keepAlive)
                        .execute().actionGet();
                scrollId = searchResponse.getScrollId();
            }
        } finally {
            if (scrollId != null) {
                try {
                    client.prepareClearScroll().addScrollId(scrollId).get();
                } catch (RuntimeException ignored) {
                    // Best-effort cleanup: the context expires after the keep-alive anyway,
                    // and a cleanup failure must not mask the result or the original error.
                }
            }
        }
        return list;
    }

}

二、使用方法

客户端变量

// Client field used by the usage snippets below; assigned from ElasticSearchUtils.initClient().
private static TransportClient client = null;

使用方法(注意:在 6.x 版本中每个索引只允许有一个 type,推荐统一使用 _doc):

// Initialize the connection (initClient() returns null on failure)
client = ElasticSearchUtils.initClient();
// Build the query you actually need; an empty bool query is only a placeholder
QueryBuilder queryBuilder = QueryBuilders.boolQuery();

// Search
List<SearchHit> list = ElasticSearchUtils.searcher(client, queryBuilder, "index", "_doc", time);

// Insert: the source must be a JSON object, so serialize a map or POJO rather than a bare string
Map<String, String> doc = new HashMap<>();
doc.put("time", time);
ElasticSearchUtils.insertIntoElastic(client, "index", "_doc", new Gson().toJson(doc));

// Take the id of an existing document from a search hit (assumes the search returned at least one hit)
String id = list.get(0).getId();

// Update
Map<String, String> map = new HashMap<>();
map.put("time", time);
ElasticSearchUtils.updateElasticDoc(client, "index", "_doc", id, map);

// Delete
ElasticSearchUtils.deleteElasticDoc(client, "index", "_doc", id);

客户端关闭

// Close the connection
ElasticSearchUtils.closeClient(client);