package org.apache.lucene.facet.search;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.lucene.index.IndexReader;

import org.apache.lucene.facet.search.aggregator.Aggregator;
import org.apache.lucene.facet.search.params.FacetSearchParams;
import org.apache.lucene.facet.search.params.FacetRequest;
import org.apache.lucene.facet.search.results.FacetResult;
import org.apache.lucene.facet.search.results.IntermediateFacetResult;
import org.apache.lucene.facet.taxonomy.TaxonomyReader;
import org.apache.lucene.facet.util.PartitionsUtils;
import org.apache.lucene.facet.util.ScoredDocIdsUtils;
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Standard implementation for {@link FacetsAccumulator}, utilizing partitions to save on memory.
 * <p>
 * Why partitions? Because if there are, say, 100M categories out of which only
 * the top K are required, we must first compute values for all 100M categories
 * (going over all documents) and only then can we select the top K. Working in
 * partitions of distinct categories eases the memory cost: once the values for
 * a partition are found, we take the top K for that partition, work on the
 * next partition, then merge the top K of both, and so forth, thereby computing
 * the top K with RAM needs proportional to the size of a single partition
 * rather than to the size of all 100M categories.
 * <p>
 * The partition size is decided at indexing time, and the facet information
 * for each partition is maintained separately.
 * <p>
 * <u>Implementation detail:</u> Since the facet information of each partition is
 * maintained in a separate "category list", we can be more efficient at search
 * time, because only the facet info for a single partition needs to be read
 * while processing that partition.
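 * <p>
 * A minimal usage sketch (assuming an open {@code IndexReader} and
 * {@code TaxonomyReader}, and a {@code ScoredDocIDs} over the matching
 * documents, e.g. obtained from a {@code ScoredDocIdCollector}; the exact
 * setup may vary between versions):
 * <pre>
 *   FacetSearchParams params = new FacetSearchParams();
 *   params.addFacetRequest(new CountFacetRequest(new CategoryPath("author"), 10));
 *   FacetsAccumulator accumulator =
 *       new StandardFacetsAccumulator(params, indexReader, taxonomyReader);
 *   List&lt;FacetResult&gt; results = accumulator.accumulate(docids);
 * </pre>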
 *
 * @lucene.experimental
 */
public class StandardFacetsAccumulator extends FacetsAccumulator {

  private static final Logger logger = Logger.getLogger(StandardFacetsAccumulator.class.getName());

  protected final IntArrayAllocator intArrayAllocator;
  protected final FloatArrayAllocator floatArrayAllocator;

  protected int partitionSize;
  protected int maxPartitions;
  protected boolean isUsingComplements;

  private TotalFacetCounts totalFacetCounts;

  private Object accumulateGuard;

  public StandardFacetsAccumulator(FacetSearchParams searchParams, IndexReader indexReader,
      TaxonomyReader taxonomyReader, IntArrayAllocator intArrayAllocator,
      FloatArrayAllocator floatArrayAllocator) {

    super(searchParams, indexReader, taxonomyReader);
    int realPartitionSize = intArrayAllocator == null || floatArrayAllocator == null
        ? PartitionsUtils.partitionSize(searchParams, taxonomyReader) : -1; // -1 if not needed.
    this.intArrayAllocator = intArrayAllocator != null
        ? intArrayAllocator
        // create a default one if null was provided
        : new IntArrayAllocator(realPartitionSize, 1);
    this.floatArrayAllocator = floatArrayAllocator != null
        ? floatArrayAllocator
        // create a default one if null was provided
        : new FloatArrayAllocator(realPartitionSize, 1);
    // can only be computed later, when the docids size is known
    isUsingComplements = false;
    partitionSize = PartitionsUtils.partitionSize(searchParams, taxonomyReader);
    maxPartitions = (int) Math.ceil(this.taxonomyReader.getSize() / (double) partitionSize);
    accumulateGuard = new Object();
  }

  public StandardFacetsAccumulator(FacetSearchParams searchParams, IndexReader indexReader,
      TaxonomyReader taxonomyReader) {
    this(searchParams, indexReader, taxonomyReader, null, null);
  }

  @Override
  public List<FacetResult> accumulate(ScoredDocIDs docids) throws IOException {

    // Synchronize to prevent two concurrent calls to accumulate().
    // We deliberately do not synchronize the method itself, because that
    // might mislead users into feeling encouraged to call it concurrently.
    synchronized (accumulateGuard) {

      // only now can we compute this
      isUsingComplements = shouldComplement(docids);

      if (isUsingComplements) {
        try {
          totalFacetCounts = TotalFacetCountsCache.getSingleton()
              .getTotalCounts(indexReader, taxonomyReader,
                  searchParams.getFacetIndexingParams(), searchParams.getClCache());
          if (totalFacetCounts != null) {
            docids = ScoredDocIdsUtils.getComplementSet(docids, indexReader);
          } else {
            isUsingComplements = false;
          }
        } catch (UnsupportedOperationException e) {
          // TODO (Facet): this exception is thrown from TotalCountsKey if the
          // IndexReader used does not support getVersion(). We should re-think
          // this: is this tiny detail worth disabling total counts completely
          // for such readers? Currently, it's not supported by Parallel and
          // MultiReader, which might be problematic for several applications.
          // We could, for example, base our "isCurrent" logic on something other
          // than the reader's version. Need to think more deeply about it.
          if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "IndexReader used does not support complements: ", e);
          }
          isUsingComplements = false;
        } catch (IOException e) {
          if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "Failed to load/calculate total counts (complement counting disabled): ", e);
          }
          // silently fail if for some reason it failed to load/save from/to dir
          isUsingComplements = false;
        } catch (Exception e) {
          // give up: this should not happen!
          IOException ioEx = new IOException(
              "PANIC: Got unexpected exception while trying to get/calculate total counts: "
                  + e.getMessage());
          ioEx.initCause(e);
          throw ioEx;
        }
      }

      docids = actualDocsToAccumulate(docids);

      FacetArrays facetArrays = new FacetArrays(intArrayAllocator, floatArrayAllocator);

      HashMap<FacetRequest, IntermediateFacetResult> fr2tmpRes = new HashMap<FacetRequest, IntermediateFacetResult>();

      for (int part = 0; part < maxPartitions; part++) {

        // fill arrays from category lists
        fillArraysForPartition(docids, facetArrays, part);

        int offset = part * partitionSize;

        // for each partition we go over all requests and handle each,
        // where the request maintains the merged result.
        // In this implementation merges happen after each partition,
        // but another impl could merge only at the end.
        for (FacetRequest fr : searchParams.getFacetRequests()) {
          FacetResultsHandler frHndlr = fr.createFacetResultsHandler(taxonomyReader);
          IntermediateFacetResult res4fr = frHndlr.fetchPartitionResult(facetArrays, offset);
          IntermediateFacetResult oldRes = fr2tmpRes.get(fr);
          if (oldRes != null) {
            res4fr = frHndlr.mergeResults(oldRes, res4fr);
          }
          fr2tmpRes.put(fr, res4fr);
        }
      }

      // gather results from all requests into a list for returning them
      List<FacetResult> res = new ArrayList<FacetResult>();
      for (FacetRequest fr : searchParams.getFacetRequests()) {
        FacetResultsHandler frHndlr = fr.createFacetResultsHandler(taxonomyReader);
        IntermediateFacetResult tmpResult = fr2tmpRes.get(fr);
        if (tmpResult == null) {
          continue; // do not add a null to the list.
        }
        FacetResult facetRes = frHndlr.renderFacetResult(tmpResult);
        // final labeling if allowed (because labeling is a costly operation)
        if (isAllowLabeling()) {
          frHndlr.labelResult(facetRes);
        }
        res.add(facetRes);
      }

      return res;
    }
  }

  /**
   * Set the actual set of documents over which accumulation should take place.
   * <p>
   * Allows overriding the set of documents to accumulate for. Invoked just
   * before actual accumulating starts, and from that point on the set of
   * documents remains unmodified. The default implementation just returns the
   * input unchanged.
   *
   * @param docids
   *          candidate documents to accumulate for
   * @return actual documents to accumulate for
   */
  protected ScoredDocIDs actualDocsToAccumulate(ScoredDocIDs docids) throws IOException {
    return docids;
  }

  /** Check whether it is worth using complements. */
  protected boolean shouldComplement(ScoredDocIDs docids) {
    return mayComplement()
        && (docids.size() > indexReader.numDocs() * getComplementThreshold());
  }
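
  // Illustrative arithmetic for the check above (numbers are hypothetical, not
  // the library defaults): with numDocs = 1,000,000 and a complement threshold
  // of 0.6, complements kick in only when more than 600,000 documents matched,
  // i.e. exactly when counting the fewer non-matching documents (and
  // subtracting their counts from the precomputed totals) is the cheaper path.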

  /**
   * Iterate over the documents for this partition and fill the facet arrays
   * with the correct count / complement count / value.
   *
   * @param docids documents to aggregate over
   * @param facetArrays the arrays to fill
   * @param partition the index of the partition being processed
   * @throws IOException on low-level I/O error
   */
  private final void fillArraysForPartition(ScoredDocIDs docids,
      FacetArrays facetArrays, int partition) throws IOException {

    if (isUsingComplements) {
      initArraysByTotalCounts(facetArrays, partition, docids.size());
    } else {
      facetArrays.free(); // to get a cleared array for this partition
    }

    HashMap<CategoryListIterator, Aggregator> categoryLists = getCategoryListMap(
        facetArrays, partition);

    for (Entry<CategoryListIterator, Aggregator> entry : categoryLists.entrySet()) {
      CategoryListIterator categoryList = entry.getKey();
      if (!categoryList.init()) {
        continue;
      }

      Aggregator categorator = entry.getValue();
      ScoredDocIDsIterator iterator = docids.iterator();
      while (iterator.next()) {
        int docID = iterator.getDocID();
        if (!categoryList.skipTo(docID)) {
          continue;
        }
        categorator.setNextDoc(docID, iterator.getScore());
        long ordinal;
        // nextCategory() returns a long; any value above Integer.MAX_VALUE
        // signals that the categories of this document are exhausted.
        while ((ordinal = categoryList.nextCategory()) <= Integer.MAX_VALUE) {
          categorator.aggregate((int) ordinal);
        }
      }
    }
  }

  /**
   * Init arrays for partition by total counts, optionally applying a factor.
   */
  private final void initArraysByTotalCounts(FacetArrays facetArrays, int partition, int nAccumulatedDocs) {
    int[] intArray = facetArrays.getIntArray();
    totalFacetCounts.fillTotalCountsForPartition(intArray, partition);
    double totalCountsFactor = getTotalCountsFactor();
    // fix total counts, but only if the effect of this would be meaningful.
    if (totalCountsFactor < 0.99999) {
      int delta = nAccumulatedDocs + 1;
      for (int i = 0; i < intArray.length; i++) {
        intArray[i] *= totalCountsFactor;
        // also translate to prevent loss of non-positive values
        // due to complement sampling (i.e. if sampled docs all decremented a certain category).
        intArray[i] += delta;
      }
    }
  }
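
  // Worked example (hypothetical numbers): a sampling subclass that visited
  // only ~10% of the documents might return 0.1 from getTotalCountsFactor().
  // A total count of 500 then becomes 50, comparable with the sampled
  // complement counts, and adding delta = nAccumulatedDocs + 1 keeps every
  // entry positive even if all sampled docs decrement the same category.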

  /**
   * Expert: the factor by which counts should be multiplied when initializing
   * the count arrays from total counts.
   * The default implementation returns 1, which is a no-op.
   * @return a factor by which total counts should be multiplied
   */
  protected double getTotalCountsFactor() {
    return 1;
  }

  /**
   * Create an {@link Aggregator} and a {@link CategoryListIterator} for each
   * and every {@link FacetRequest}, generating a map that matches each
   * category list iterator to its aggregator.
   * <p>
   * If two category list iterators are served by the same aggregator, a single
   * aggregator is returned for both.
   * <p>
   * <b>NOTE:</b> If a given category list iterator is needed with two different
   * aggregators (e.g. counting and association), an exception is thrown, as this
   * functionality is not supported at this time.
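   * <p>
   * For example (an illustrative scenario, not from the original docs): two
   * counting requests whose categories reside in the same category list can
   * share a single counting aggregator, whereas a counting request plus an
   * association request over that same list would hit the exception below.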
   */
  protected HashMap<CategoryListIterator, Aggregator> getCategoryListMap(FacetArrays facetArrays,
      int partition) throws IOException {

    HashMap<CategoryListIterator, Aggregator> categoryLists = new HashMap<CategoryListIterator, Aggregator>();

    for (FacetRequest facetRequest : searchParams.getFacetRequests()) {
      Aggregator categoryAggregator = facetRequest.createAggregator(
          isUsingComplements, facetArrays, indexReader, taxonomyReader);

      CategoryListIterator cli =
          facetRequest.createCategoryListIterator(indexReader, taxonomyReader, searchParams, partition);

      // register the aggregator for this category list; if one was already
      // registered, it is returned so we can detect conflicting aggregators
      Aggregator old = categoryLists.put(cli, categoryAggregator);

      if (old != null && !old.equals(categoryAggregator)) {
        // TODO (Facet): create a more meaningful RE class, and throw it.
        throw new RuntimeException(
            "Overriding existing category list with different aggregator. THAT'S A NO NO!");
      }
      // if the aggregator is the same, we're covered
    }

    return categoryLists;
  }
}