欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

用elasticsearch聚合函数实现distinct查询

程序员文章站 2022-05-07 11:39:07
...

1.等价的sql

SELECT DISTINCT field1,field2 FROM test_index.test_type
 等价于
SELECT field1,field2 FROM test_index.test_type GROUP BY field1,field2

 

2.而group by的查询,在es中我们可以用Aggregation(聚合)去实现,等价的DSL查询语句如下:

POST /test_index/test_type/_search
{
	"from": 0,
	"size": 0,
	"aggregations": {
		"field1": {
			"terms": {
				"field": "field1",
				"size": 2147483647
			},
			"aggregations": {
				"field2": {
					"terms": {
						"field": "field2",
						"size": 2147483647
					}
				}
			}
		}
	}
}
 
3.java的实现:
import com.google.common.collect.ArrayListMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Stack;
import java.util.stream.Collectors;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;

/**
 *
 * @author zhongchenghui
 */
public class EsSearch {

    private static final String CLUSTER_NAME = "test_cluster";
    private static final String ES_ADDRESSES = "192.168.12.1,192.168.12.2,192.168.12.3";
    private static final String INDEX_NAME = "test_index";
    private static final Client ES_CLIENT = ESClientFactory.newInstance(CLUSTER_NAME, ES_ADDRESSES);

    /**
     * 根据多个字段group by的查询
     *
     * @param type
     * @param groupColumnsNames
     * @return
     * @throws Exception
     */
    public List<Map<String, String>> getRowsByGroup(String type, String... groupColumnsNames) throws Exception {
        TermsBuilder allTermsBuilder = createTermsAggregationBuilder(groupColumnsNames);
        SearchResponse response = createSearchRequestBuilder(type, groupColumnsNames).setSize(0).addAggregation(allTermsBuilder).execute().actionGet();
        List<Map<String, String>> rows = new ArrayList<>();
        Terms agg = response.getAggregations().get(groupColumnsNames[0]);
        int i = 0;
        agg.getBuckets().stream().forEach((bucket) -> {
            rows.addAll(getFlatBucket(i, bucket, groupColumnsNames));
        });
        return rows;
    }

    /**
     * 逐层创建AggregationBuilder
     * (此处Integer.MAX_VALUE是逐层分组的最大组数)
     * @param groupColumnsNames
     * @return
     * @throws Exception
     */
    private TermsBuilder createTermsAggregationBuilder(String... groupColumnsNames) {
        TermsBuilder allTermsBuilder = AggregationBuilders.terms(groupColumnsNames[0]).field(groupColumnsNames[0]).size(Integer.MAX_VALUE);
        TermsBuilder tmpTermsBuilder = allTermsBuilder;
        for (int i = 1; i < groupColumnsNames.length; i++) {
            TermsBuilder termsBuilder = AggregationBuilders.terms(groupColumnsNames[i]).field(groupColumnsNames[i]).size(Integer.MAX_VALUE);
            tmpTermsBuilder.subAggregation(termsBuilder);
            tmpTermsBuilder = termsBuilder;
        }
        return allTermsBuilder;
    }

    /**
     * 创建查询请求的Builder
     *
     * @param type
     * @param groupColumnsNames
     * @return
     * @throws Exception
     */
    private SearchRequestBuilder createSearchRequestBuilder(String type, String... columnNames) {
        SearchRequestBuilder searchRequestBuilder = ES_CLIENT.prepareSearch(INDEX_NAME).setTypes(type).setSize(50000);
        if (Objects.nonNull(columnNames) && columnNames.length > 0) {
            searchRequestBuilder.addFields(columnNames);
        }
        return searchRequestBuilder;
    }

    /**
     * 用堆栈将respone中的聚合结果拉平返回(广度优先遍历)
     *
     * @param layer
     * @param bucket
     * @param groupColumnsNames
     * @return
     */
    private List<Map<String, String>> getFlatBucket(int layer, Terms.Bucket bucket, String... groupColumnsNames) {
        ArrayListMultimap<BucketNode, BucketNode> bucketRowMultimap = ArrayListMultimap.create();
        Stack<BucketNode> nodeStack = new Stack<>();
        BucketNode bucketNode = new BucketNode(layer, groupColumnsNames[layer], bucket);
        nodeStack.add(bucketNode);
        bucketRowMultimap.put(bucketNode, bucketNode);
        while (!nodeStack.isEmpty()) {
            bucketNode = nodeStack.pop();
            List<BucketNode> childerNodes = getChildrenBucketNodes(bucketNode, groupColumnsNames);
            if (childerNodes != null && !childerNodes.isEmpty()) {
                List<BucketNode> parentRoute = bucketRowMultimap.removeAll(bucketNode);
                for (BucketNode child : childerNodes) {
                    nodeStack.push(child);
                    bucketRowMultimap.putAll(child, parentRoute);
                    bucketRowMultimap.put(child, child);
                }
            }
        }
        return convertToRows(bucketRowMultimap.asMap().values());
    }

    /**
     * 获得下一层Bucket的节点列表
     *
     * @param parentNode
     * @param groupColumnsNames
     * @return
     */
    private List<BucketNode> getChildrenBucketNodes(BucketNode parentNode, String... groupColumnsNames) {
        int currentLayer = parentNode.layer + 1;
        if (currentLayer < groupColumnsNames.length) {
            String currentAggName = groupColumnsNames[currentLayer];
            Terms currentAgg = parentNode.bucket.getAggregations().get(currentAggName);
            if (Objects.nonNull(currentAgg)) {
                return currentAgg.getBuckets().stream().map(bucket -> new BucketNode(currentLayer, currentAggName, bucket)).collect(Collectors.toList());
            }
        }
        return null;

    }

    private List<Map<String, String>> convertToRows(Collection<Collection<BucketNode>> bucketRoutes) {
        return bucketRoutes.stream().map(bucketRoute -> convertToRow(bucketRoute)).collect(Collectors.toList());
    }

    private Map<String, String> convertToRow(Collection<BucketNode> bucketRoute) {
        Map<String, String> row = new HashMap<>();
        bucketRoute.stream().forEach(bucketNode -> row.put(bucketNode.aggName, bucketNode.bucket.getKeyAsString()));
        return row;
    }

    class BucketNode {

        int layer;
        String aggName;
        Terms.Bucket bucket;

        public BucketNode(int layer, String aggName, Terms.Bucket bucket) {
            BucketNode.this.layer = layer;
            BucketNode.this.aggName = aggName;
            BucketNode.this.bucket = bucket;
        }

        @Override
        public String toString() {
            return "BucketNode{" + "layer=" + layer + ", aggName=" + aggName + ", bucket_key=" + bucket.getKeyAsString() + '}';
        }

    }
}

 

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author zhongchenghui
 */
public class ESClientFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(ESClientFactory.class);
    private static final ConcurrentHashMap<String, Client> CLIENT_CACHE = new ConcurrentHashMap<>();

    public static Client newInstance(String clusterName, String hostStr) {
        Client client = CLIENT_CACHE.get(clusterName);
        if (client == null) {
            Map<String, Integer> addressMap = ESClientFactory.getESAddressMap(hostStr);
            Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();
            TransportClient newClient = TransportClient.builder().settings(settings).build();
            addressMap.keySet().forEach((host) -> {
                try {
                    newClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), addressMap.get(host)));
                } catch (UnknownHostException ex) {
                    LOGGER.error("Init ES client failure,cluster name is:{},Error:{}", clusterName, ex);
                }
            });
            client = newClient;
            CLIENT_CACHE.put(clusterName, newClient);
        }
        return client;
    }

    private static Map<String, Integer> getESAddressMap(String hostStr) {
        Map<String, Integer> hostMap = new HashMap<>();
        String[] hosts = hostStr.split(",");
        for (String host : hosts) {
            String[] hostPort = host.trim().split(":");
            Integer port = hostPort.length < 2 ? 9300 : Integer.valueOf(hostPort[1]);
            hostMap.put(hostPort[0], port);
        }
        return hostMap;
    }
}
 4.存在的问题:

   a.因为实现的方式是一层层往下聚合,当es中的document出现field1的字段为null的时候,该条件就不会往下聚合,即使该document的field2字段有值。(可用指定字符代替null来解决这个问题)

   b.不适合于数据量太大表,3中的代码要求最大每个字段的分组数不能大于Integer.MAX_VALUE

   c.返回的字段只能是参与group by的字段