solr的facet源码解读(四)——facet.field之非数字单值域类型
上一篇博客中写了单值域数字类型的域是如何做facet的,这一篇写单值域的非数字类型的facet。他的思路是分开多个段进行收集,在收集后最后再进行聚合操作,每个段的收集都会在一个线程中进行,也就是多个线程处理多个段的facet,并且默认的参数就是多线程处理的。看下代码:
case FCS://只能处理单值域且不分词的。
assert !multiToken;
if (ft.getNumericType() != null/* && !sf.multiValued()*/) {//这个是我自己注释的。
// force numeric faceting
if (prefix != null && !prefix.isEmpty()) {
throw new SolrException(ErrorCode.BAD_REQUEST, FacetParams.FACET_PREFIX + " is not supported on numeric types");
}
// 这个会尽可能的不使用读取词典表,除非是要返回的结果不够了且使用了minCount=0的参数
counts = NumericFacets.getCounts(searcher, base, field, offset, limit, mincount, missing, sort);
} else {
//非数字类型的singleValue的facet。
PerSegmentSingleValuedFaceting ps = new PerSegmentSingleValuedFaceting(searcher, base, field, offset, limit, mincount, missing, sort, prefix);
Executor executor = threads == 0 ? directExecutor : facetExecutor;
ps.setNumThreads(threads);
counts = ps.getFacetCounts(executor);
}
break;
上面的代码是在SimpleFacets.getTermCounts(String, Integer, DocSet)中,当是单值域的时候就会优先使用FCS方法,在不是数字类型的时候,就会进入else。在PerSegmentSingleValedFaceting中,会每个段使用一个线程进行收集,在每个段收集的时候会使用SortedDocValue(回想一下,对于单值域的Binary类型的docValue使用的格式就是使用的SortedDocValue,而不是使用的BinaryDocValue)。PerSegmentSingleValuedFaceting也会使用多线程,上面的threads参数默认就是-1,也就是说默认就会使用多线程进行收集。看看getFacetCounts方法:
/** 收集单值的非数字类型的term的facet。 */ NamedList<Integer> getFacetCounts(Executor executor) throws IOException { CompletionService<SegFacet> completionService = new ExecutorCompletionService<>(executor); // reuse the translation logic to go from top level set to per-segment set baseSet = docs.getTopFilter();// final List<AtomicReaderContext> leaves = searcher.getTopReaderContext().leaves(); LinkedList<Callable<SegFacet>> pending = new LinkedList<>(); int threads = nThreads <= 0 ? Integer.MAX_VALUE : nThreads;//这个值默认就是-1,也就是默认就会使用多线程 for (final AtomicReaderContext leave : leaves) { final SegFacet segFacet = new SegFacet(leave); Callable<SegFacet> task = new Callable<SegFacet>() {//每个段形成一个任务, @Override public SegFacet call() throws Exception { segFacet.countTerms();//任务,在下面有代码 return segFacet; } }; if (--threads >= 0) { completionService.submit(task); } else { pending.add(task); } } //根据每个段的termEnm的当前的term进行排序的堆。 PriorityQueue<SegFacet> queue = new PriorityQueue<SegFacet>(leaves.size()) { @Override protected boolean lessThan(SegFacet a, SegFacet b) { return a.tempBR.compareTo(b.tempBR) < 0; } }; //这个域中没有值的doc boolean hasMissingCount = false; int missingCount = 0;//这个域中没有值的doc的数量 for (int i = 0, c = leaves.size(); i < c; i++) { SegFacet seg = null; try { Future<SegFacet> future = completionService.take(); seg = future.get(); if (!pending.isEmpty()) {//执行一个新的任务。 completionService.submit(pending.removeFirst()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Error in per-segment faceting on field: " + fieldName, cause); } } if (seg.startTermIndex < seg.endTermIndex) {//这个段中收集到了term if (seg.startTermIndex == -1) {//符合条件的第一个term的排序是-1,说明第一个term是null,即有的doc在这个域中没有值。 hasMissingCount = true; missingCount += seg.counts[0];//没有值的doc的数量,counts的第一个值表示没有值的doc的数量。 seg.pos = 0;//termEnum的开始位置是0 } else { seg.pos = seg.startTermIndex;//TermEnum的开始位置 } if (seg.pos < seg.endTermIndex) {// seg.tenum = seg.si.termsEnum();//从docValue中获得的termEnum seg.tenum.seekExact(seg.pos);//定位到开始位置 seg.tempBR = seg.tenum.term();//开始位置的term queue.add(seg);//添加到优先队列里面 } } } FacetCollector collector; if (sort.equals(FacetParams.FACET_SORT_COUNT) || sort.equals(FacetParams.FACET_SORT_COUNT_LEGACY)) {//基于命中的doc的数量排序的 collector = new CountSortedFacetCollector(offset, limit, mincount); } else { collector = new IndexSortedFacetCollector(offset, limit, mincount);//基于命中的term的字符串排序的。没有看 } BytesRefBuilder val = new BytesRefBuilder(); while (queue.size() > 0) { SegFacet seg = queue.top(); // we will normally end up advancing the term enum for this segment while still using "val", so we need to make a copy since the BytesRef may be shared across calls. val.copyBytes(seg.tempBR); int count = 0; do {//这个循环是轮训一个term。如果某个段也是这个term,则继续下一个段,将所有的段中当前的term是这个term的doc的数量相加。 //当前的term匹配的doc的数量 count += seg.counts[seg.pos - seg.startTermIndex]; // if mincount>0 then seg.pos++ can skip ahead to the next non-zero entry. seg.pos++; if (seg.pos >= seg.endTermIndex) {//到达最后一个 queue.pop();//将这个segment删除 seg = queue.top();//使用下一个segment } else { seg.tempBR = seg.tenum.next();//继续轮训这个segment seg = queue.updateTop();//更新堆顶。 } } while (seg != null && val.get().compareTo(seg.tempBR) == 0);//如果更新堆顶之后的term没有变化了,则继续轮训这个term。 boolean stop = collector.collect(val.get(), count);//收集这个term的数量。如果收集够了,则返回true。collector使用了treeSet,有排序功能,并且在收集的时候会检查是否命中的doc的数量即count大于指定的值,如果不是,则不收集。 if (stop) break; } NamedList<Integer> res = collector.getFacetCounts();//在这个方法里面就会考虑offset参数和limit参数,先把offset个忽略,然后再查找limit个。 // convert labels to readable form FieldType ft = searcher.getSchema().getFieldType(fieldName); int sz = res.size(); for (int i = 0; i < sz; i++) {//变为可读的结果 res.setName(i, ft.indexedToReadable(res.getName(i))); } if (missing) {//如果需要返回null的term,也就是没有值的结果 if (!hasMissingCount) { missingCount = SimpleFacets.getFieldMissingCount(searcher, docs, fieldName); } res.add(null, missingCount); } return res; }
在上面的代码中可以发现,他是将每个段交给一个线程去收集term,然后再对最后每个段的结果做操作,将相同的term的doc的数量相加,然后获得offset后面的limit个term。
下面看看每个段的收集的代码:
/** 每个段的facet结果 */ class SegFacet { AtomicReaderContext context; SegFacet(AtomicReaderContext context) { this.context = context; } /**排好序的docValue*/ SortedDocValues si; int startTermIndex;//符合条件的第一个term的排序 int endTermIndex;//符合条件的第后一个term的排序 /**每个term对应的doc的数量的值,下标是term的次序*/ int[] counts; int pos; // only used when merging,在合并的时候,在counts中的指针。 /**docValue的termEnum*/ TermsEnum tenum; // only used when merging /**facet到的所有的term中开始位置的term*/ BytesRef tempBR = new BytesRef(); void countTerms() throws IOException { //这个还是会优先读取docValue,如果没有docValue就会读取词典表。这个是获得由顺序的term,在之前的SortedDocValue的时候,就是有排序的,所以可以满足这里的排序。 si = FieldCache.DEFAULT.getTermsIndex(context.reader(), fieldName); if (prefix != null) { BytesRefBuilder prefixRef = new BytesRefBuilder(); prefixRef.copyChars(prefix); startTermIndex = si.lookupTerm(prefixRef.get());//直到不小于指定的字符串的下一个。 if (startTermIndex < 0)//如果没有查到,返回的是负数。 startTermIndex = -startTermIndex - 1;// prefixRef.append(UnicodeUtil.BIG_TERM);//添加一个最大的值,这样就能确定一个区间了 endTermIndex = si.lookupTerm(prefixRef.get()); assert endTermIndex < 0; endTermIndex = -endTermIndex - 1;//结束的位置 } else { startTermIndex = -1; endTermIndex = si.getValueCount(); } final int nTerms = endTermIndex - startTermIndex;//能收集的term的个数 ,不过比个数大一,因为在下面使用的时候,第一个是用来保存值位null的doc的数量 if (nTerms > 0) { // count collection array only needs to be as big as the number of terms we are going to collect counts for. final int[] counts = this.counts = new int[nTerms]; DocIdSet idSet = baseSet.getDocIdSet(context, null); //和当前的段中id取交集, DocIdSetIterator iter = idSet.iterator(); int doc; if (prefix == null) { // specialized version when collecting counts for all terms while ((doc = iter.nextDoc()) < DocIdSetIterator.NO_MORE_DOCS) { counts[1 + si.getOrd(doc)]++;//第一个用来保存没有值的doc的数量。 } } else { // version that adjusts term numbers because we aren't collecting the full range while ((doc = iter.nextDoc()) < DocIdSetIterator.NO_MORE_DOCS) { int term = si.getOrd(doc); int arrIdx = term - startTermIndex; if (arrIdx >= 0 && arrIdx < nTerms)//第一个条件是必须在指定的prefix后面,第二个条件没有用 counts[arrIdx]++; } } } } }
可以发现,他也是使用的FieldCache,不过获得的term是有顺序的,即可以使用SortedDocValue。在收集term的时候也是轮训的所有的doc,然后找到每个term的顺序,如果顺序在指定的范围内,则收集,单单看收集的term的时间复杂度,是O(n),不过他是分多个线程收集的。
对比一下数字类型的和非数字类型的,数字类型的facet可能会返回没有doc命中的term,条件是在命中的doc不满足条件的前提下;但是字符串类型的facet不会查看没有doc命中的doc,他仅仅返回有doc命中的term。除了这个区别外,他的时间复杂度是一样的,不过字符串类型的分为多个线程收集的。
上一篇: FST源代码解读5——FST的压缩
下一篇: FST源代码解读3——编译节点