用elasticsearch聚合函数实现distinct查询
程序员文章站
2022-05-07 11:39:07
...
1.等价的sql
SELECT DISTINCT field1,field2 FROM test_index.test_type等价于
SELECT field1,field2 FROM test_index.test_type GROUP BY field1,field2
2.而group by的查询,在es中我们可以用Aggregation(聚合)去实现,等价的DSL查询语句如下:
POST /test_index/test_type/_search { "from": 0, "size": 0, "aggregations": { "field1": { "terms": { "field": "field1", "size": 2147483647 }, "aggregations": { "field2": { "terms": { "field": "field2", "size": 2147483647 } } } } } }
3.java的实现:
import com.google.common.collect.ArrayListMultimap; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Stack; import java.util.stream.Collectors; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; /** * * @author zhongchenghui */ public class EsSearch { private static final String CLUSTER_NAME = "test_cluster"; private static final String ES_ADDRESSES = "192.168.12.1,192.168.12.2,192.168.12.3"; private static final String INDEX_NAME = "test_index"; private static final Client ES_CLIENT = ESClientFactory.newInstance(CLUSTER_NAME, ES_ADDRESSES); /** * 根据多个字段group by的查询 * * @param type * @param groupColumnsNames * @return * @throws Exception */ public List<Map<String, String>> getRowsByGroup(String type, String... groupColumnsNames) throws Exception { TermsBuilder allTermsBuilder = createTermsAggregationBuilder(groupColumnsNames); SearchResponse response = createSearchRequestBuilder(type, groupColumnsNames).setSize(0).addAggregation(allTermsBuilder).execute().actionGet(); List<Map<String, String>> rows = new ArrayList<>(); Terms agg = response.getAggregations().get(groupColumnsNames[0]); int i = 0; agg.getBuckets().stream().forEach((bucket) -> { rows.addAll(getFlatBucket(i, bucket, groupColumnsNames)); }); return rows; } /** * 逐层创建AggregationBuilder * (此处Integer.MAX_VALUE是逐层分组的最大组数) * @param groupColumnsNames * @return * @throws Exception */ private TermsBuilder createTermsAggregationBuilder(String... 
groupColumnsNames) { TermsBuilder allTermsBuilder = AggregationBuilders.terms(groupColumnsNames[0]).field(groupColumnsNames[0]).size(Integer.MAX_VALUE); TermsBuilder tmpTermsBuilder = allTermsBuilder; for (int i = 1; i < groupColumnsNames.length; i++) { TermsBuilder termsBuilder = AggregationBuilders.terms(groupColumnsNames[i]).field(groupColumnsNames[i]).size(Integer.MAX_VALUE); tmpTermsBuilder.subAggregation(termsBuilder); tmpTermsBuilder = termsBuilder; } return allTermsBuilder; } /** * 创建查询请求的Builder * * @param type * @param groupColumnsNames * @return * @throws Exception */ private SearchRequestBuilder createSearchRequestBuilder(String type, String... columnNames) { SearchRequestBuilder searchRequestBuilder = ES_CLIENT.prepareSearch(INDEX_NAME).setTypes(type).setSize(50000); if (Objects.nonNull(columnNames) && columnNames.length > 0) { searchRequestBuilder.addFields(columnNames); } return searchRequestBuilder; } /** * 用堆栈将respone中的聚合结果拉平返回(广度优先遍历) * * @param layer * @param bucket * @param groupColumnsNames * @return */ private List<Map<String, String>> getFlatBucket(int layer, Terms.Bucket bucket, String... 
groupColumnsNames) { ArrayListMultimap<BucketNode, BucketNode> bucketRowMultimap = ArrayListMultimap.create(); Stack<BucketNode> nodeStack = new Stack<>(); BucketNode bucketNode = new BucketNode(layer, groupColumnsNames[layer], bucket); nodeStack.add(bucketNode); bucketRowMultimap.put(bucketNode, bucketNode); while (!nodeStack.isEmpty()) { bucketNode = nodeStack.pop(); List<BucketNode> childerNodes = getChildrenBucketNodes(bucketNode, groupColumnsNames); if (childerNodes != null && !childerNodes.isEmpty()) { List<BucketNode> parentRoute = bucketRowMultimap.removeAll(bucketNode); for (BucketNode child : childerNodes) { nodeStack.push(child); bucketRowMultimap.putAll(child, parentRoute); bucketRowMultimap.put(child, child); } } } return convertToRows(bucketRowMultimap.asMap().values()); } /** * 获得下一层Bucket的节点列表 * * @param parentNode * @param groupColumnsNames * @return */ private List<BucketNode> getChildrenBucketNodes(BucketNode parentNode, String... groupColumnsNames) { int currentLayer = parentNode.layer + 1; if (currentLayer < groupColumnsNames.length) { String currentAggName = groupColumnsNames[currentLayer]; Terms currentAgg = parentNode.bucket.getAggregations().get(currentAggName); if (Objects.nonNull(currentAgg)) { return currentAgg.getBuckets().stream().map(bucket -> new BucketNode(currentLayer, currentAggName, bucket)).collect(Collectors.toList()); } } return null; } private List<Map<String, String>> convertToRows(Collection<Collection<BucketNode>> bucketRoutes) { return bucketRoutes.stream().map(bucketRoute -> convertToRow(bucketRoute)).collect(Collectors.toList()); } private Map<String, String> convertToRow(Collection<BucketNode> bucketRoute) { Map<String, String> row = new HashMap<>(); bucketRoute.stream().forEach(bucketNode -> row.put(bucketNode.aggName, bucketNode.bucket.getKeyAsString())); return row; } class BucketNode { int layer; String aggName; Terms.Bucket bucket; public BucketNode(int layer, String aggName, Terms.Bucket bucket) { 
BucketNode.this.layer = layer; BucketNode.this.aggName = aggName; BucketNode.this.bucket = bucket; } @Override public String toString() { return "BucketNode{" + "layer=" + layer + ", aggName=" + aggName + ", bucket_key=" + bucket.getKeyAsString() + '}'; } } }
import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author zhongchenghui */ public class ESClientFactory { private static final Logger LOGGER = LoggerFactory.getLogger(ESClientFactory.class); private static final ConcurrentHashMap<String, Client> CLIENT_CACHE = new ConcurrentHashMap<>(); public static Client newInstance(String clusterName, String hostStr) { Client client = CLIENT_CACHE.get(clusterName); if (client == null) { Map<String, Integer> addressMap = ESClientFactory.getESAddressMap(hostStr); Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build(); TransportClient newClient = TransportClient.builder().settings(settings).build(); addressMap.keySet().forEach((host) -> { try { newClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), addressMap.get(host))); } catch (UnknownHostException ex) { LOGGER.error("Init ES client failure,cluster name is:{},Error:{}", clusterName, ex); } }); client = newClient; CLIENT_CACHE.put(clusterName, newClient); } return client; } private static Map<String, Integer> getESAddressMap(String hostStr) { Map<String, Integer> hostMap = new HashMap<>(); String[] hosts = hostStr.split(","); for (String host : hosts) { String[] hostPort = host.trim().split(":"); Integer port = hostPort.length < 2 ? 9300 : Integer.valueOf(hostPort[1]); hostMap.put(hostPort[0], port); } return hostMap; } }
4.存在的问题:
a.因为实现的方式是一层层往下聚合,当es中的document的field1字段为null的时候,该document就不会参与更下层的聚合,即使该document的field2字段有值。(可用指定字符代替null来解决这个问题)
b.不适合于数据量太大的表,3中的代码要求每个字段的最大分组数不能大于Integer.MAX_VALUE
c.返回的字段只能是参与group by的字段
下一篇: sql查询去除重复值语句