package org.apache.lucene.search;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;
21
import java.util.ArrayList;
22
import java.util.List;
24
import org.apache.lucene.index.IndexReader;
25
import org.apache.lucene.util.RamUsageEstimator;
/**
 * Caches all docs, and optionally also scores, coming from
 * a search, and is then able to replay them to another
 * collector. You specify the max RAM this class may use.
 * Once the collection is done, call {@link #isCached}. If
 * this returns true, you can use {@link #replay} against a
 * new collector. If it returns false, this means too much
 * RAM was required and you must instead re-run the original
 * search.
 *
 * <p><b>NOTE</b>: this class consumes 4 (or 8 bytes, if
 * scoring is cached) per collected document. If the result
 * set is large this can easily be a very substantial amount
 * of RAM!
 *
 * <p><b>NOTE</b>: this class caches at least 128 documents
 * before checking RAM limits.
 *
 * <p>See the Lucene <tt>contrib/grouping</tt> module for more
 * details including a full code example.</p>
 *
 * @lucene.experimental
 */
public abstract class CachingCollector extends Collector {
52
// Max out at 512K arrays
53
private static final int MAX_ARRAY_SIZE = 512 * 1024;
54
private static final int INITIAL_ARRAY_SIZE = 128;
55
private final static int[] EMPTY_INT_ARRAY = new int[0];
57
private static class SegStart {
58
public final IndexReader reader;
59
public final int base;
62
public SegStart(IndexReader reader, int base, int end) {
69
private static final class CachedScorer extends Scorer {
71
// NOTE: these members are package-private b/c that way accessing them from
72
// the outer class does not incur access check by the JVM. The same
73
// situation would be if they were defined in the outer class as private
78
private CachedScorer() { super((Weight) null); }
81
public final float score() { return score; }
84
public final int advance(int target) { throw new UnsupportedOperationException(); }
87
public final int docID() { return doc; }
90
public final float freq() { throw new UnsupportedOperationException(); }
93
public final int nextDoc() { throw new UnsupportedOperationException(); }
96
// A CachingCollector which caches scores
97
private static final class ScoreCachingCollector extends CachingCollector {
99
private final CachedScorer cachedScorer;
100
private final List<float[]> cachedScores;
102
private Scorer scorer;
103
private float[] curScores;
105
ScoreCachingCollector(Collector other, double maxRAMMB) {
106
super(other, maxRAMMB, true);
108
cachedScorer = new CachedScorer();
109
cachedScores = new ArrayList<float[]>();
110
curScores = new float[128];
111
cachedScores.add(curScores);
114
ScoreCachingCollector(Collector other, int maxDocsToCache) {
115
super(other, maxDocsToCache);
117
cachedScorer = new CachedScorer();
118
cachedScores = new ArrayList<float[]>();
119
curScores = new float[INITIAL_ARRAY_SIZE];
120
cachedScores.add(curScores);
124
public void collect(int doc) throws IOException {
126
if (curDocs == null) {
127
// Cache was too large
128
cachedScorer.score = scorer.score();
129
cachedScorer.doc = doc;
134
// Allocate a bigger array or abort caching
135
if (upto == curDocs.length) {
138
// Compute next array length - don't allocate too big arrays
139
int nextLength = 8*curDocs.length;
140
if (nextLength > MAX_ARRAY_SIZE) {
141
nextLength = MAX_ARRAY_SIZE;
144
if (base + nextLength > maxDocsToCache) {
145
// try to allocate a smaller array
146
nextLength = maxDocsToCache - base;
147
if (nextLength <= 0) {
148
// Too many docs to collect -- clear cache
153
cachedScores.clear();
154
cachedScorer.score = scorer.score();
155
cachedScorer.doc = doc;
161
curDocs = new int[nextLength];
162
cachedDocs.add(curDocs);
163
curScores = new float[nextLength];
164
cachedScores.add(curScores);
169
cachedScorer.score = curScores[upto] = scorer.score();
171
cachedScorer.doc = doc;
176
public void replay(Collector other) throws IOException {
182
curDocs = EMPTY_INT_ARRAY;
183
for (SegStart seg : cachedSegs) {
184
other.setNextReader(seg.reader, seg.base);
185
other.setScorer(cachedScorer);
186
while (curBase + curUpto < seg.end) {
187
if (curUpto == curDocs.length) {
188
curBase += curDocs.length;
189
curDocs = cachedDocs.get(chunkUpto);
190
curScores = cachedScores.get(chunkUpto);
194
cachedScorer.score = curScores[curUpto];
195
cachedScorer.doc = curDocs[curUpto];
196
other.collect(curDocs[curUpto++]);
202
public void setScorer(Scorer scorer) throws IOException {
203
this.scorer = scorer;
204
other.setScorer(cachedScorer);
208
public String toString() {
210
return "CachingCollector (" + (base+upto) + " docs & scores cached)";
212
return "CachingCollector (cache was cleared)";
218
// A CachingCollector which does not cache scores
219
private static final class NoScoreCachingCollector extends CachingCollector {
221
NoScoreCachingCollector(Collector other, double maxRAMMB) {
222
super(other, maxRAMMB, false);
225
NoScoreCachingCollector(Collector other, int maxDocsToCache) {
226
super(other, maxDocsToCache);
230
public void collect(int doc) throws IOException {
232
if (curDocs == null) {
233
// Cache was too large
238
// Allocate a bigger array or abort caching
239
if (upto == curDocs.length) {
242
// Compute next array length - don't allocate too big arrays
243
int nextLength = 8*curDocs.length;
244
if (nextLength > MAX_ARRAY_SIZE) {
245
nextLength = MAX_ARRAY_SIZE;
248
if (base + nextLength > maxDocsToCache) {
249
// try to allocate a smaller array
250
nextLength = maxDocsToCache - base;
251
if (nextLength <= 0) {
252
// Too many docs to collect -- clear cache
261
curDocs = new int[nextLength];
262
cachedDocs.add(curDocs);
272
public void replay(Collector other) throws IOException {
278
curDocs = EMPTY_INT_ARRAY;
279
for (SegStart seg : cachedSegs) {
280
other.setNextReader(seg.reader, seg.base);
281
while (curbase + curUpto < seg.end) {
282
if (curUpto == curDocs.length) {
283
curbase += curDocs.length;
284
curDocs = cachedDocs.get(chunkUpto);
288
other.collect(curDocs[curUpto++]);
294
public void setScorer(Scorer scorer) throws IOException {
295
other.setScorer(scorer);
299
public String toString() {
301
return "CachingCollector (" + (base+upto) + " docs cached)";
303
return "CachingCollector (cache was cleared)";
309
// TODO: would be nice if a collector defined a
310
// needsScores() method so we can specialize / do checks
311
// up front. This is only relevant for the ScoreCaching
312
// version -- if the wrapped Collector does not need
313
// scores, it can avoid cachedScorer entirely.
314
protected final Collector other;
316
protected final int maxDocsToCache;
317
protected final List<SegStart> cachedSegs = new ArrayList<SegStart>();
318
protected final List<int[]> cachedDocs;
320
private IndexReader lastReader;
322
protected int[] curDocs;
325
protected int lastDocBase;
328
* Creates a {@link CachingCollector} which does not wrap another collector.
329
* The cached documents and scores can later be {@link #replay(Collector)
332
* @param acceptDocsOutOfOrder
333
* whether documents are allowed to be collected out-of-order
335
public static CachingCollector create(final boolean acceptDocsOutOfOrder, boolean cacheScores, double maxRAMMB) {
336
Collector other = new Collector() {
338
public boolean acceptsDocsOutOfOrder() {
339
return acceptDocsOutOfOrder;
343
public void setScorer(Scorer scorer) throws IOException {}
346
public void collect(int doc) throws IOException {}
349
public void setNextReader(IndexReader reader, int docBase) throws IOException {}
352
return create(other, cacheScores, maxRAMMB);
356
* Create a new {@link CachingCollector} that wraps the given collector and
357
* caches documents and scores up to the specified RAM threshold.
360
* the Collector to wrap and delegate calls to.
362
* whether to cache scores in addition to document IDs. Note that
363
* this increases the RAM consumed per doc
365
* the maximum RAM in MB to consume for caching the documents and
366
* scores. If the collector exceeds the threshold, no documents and
369
public static CachingCollector create(Collector other, boolean cacheScores, double maxRAMMB) {
370
return cacheScores ? new ScoreCachingCollector(other, maxRAMMB) : new NoScoreCachingCollector(other, maxRAMMB);
374
* Create a new {@link CachingCollector} that wraps the given collector and
375
* caches documents and scores up to the specified max docs threshold.
378
* the Collector to wrap and delegate calls to.
380
* whether to cache scores in addition to document IDs. Note that
381
* this increases the RAM consumed per doc
382
* @param maxDocsToCache
383
* the maximum number of documents for caching the documents and
384
* possible the scores. If the collector exceeds the threshold,
385
* no documents and scores are cached.
387
public static CachingCollector create(Collector other, boolean cacheScores, int maxDocsToCache) {
388
return cacheScores ? new ScoreCachingCollector(other, maxDocsToCache) : new NoScoreCachingCollector(other, maxDocsToCache);
391
// Prevent extension from non-internal classes
392
private CachingCollector(Collector other, double maxRAMMB, boolean cacheScores) {
395
cachedDocs = new ArrayList<int[]>();
396
curDocs = new int[INITIAL_ARRAY_SIZE];
397
cachedDocs.add(curDocs);
399
int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
401
bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
403
maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
406
// Prevent extension from non-internal classes
private CachingCollector(Collector other, int maxDocsToCache) {
  this.other = other;

  cachedDocs = new ArrayList<int[]>();
  curDocs = new int[INITIAL_ARRAY_SIZE];
  cachedDocs.add(curDocs);
  this.maxDocsToCache = maxDocsToCache;
}
416
public boolean acceptsDocsOutOfOrder() {
417
return other.acceptsDocsOutOfOrder();
420
public boolean isCached() {
421
return curDocs != null;
425
public void setNextReader(IndexReader reader, int docBase) throws IOException {
426
other.setNextReader(reader, docBase);
427
if (lastReader != null) {
428
cachedSegs.add(new SegStart(lastReader, lastDocBase, base + upto));
430
lastDocBase = docBase;
434
/** Reused by the specialized inner classes. */
435
void replayInit(Collector other) {
437
throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
440
if (!other.acceptsDocsOutOfOrder() && this.other.acceptsDocsOutOfOrder()) {
441
throw new IllegalArgumentException(
442
"cannot replay: given collector does not support "
443
+ "out-of-order collection, while the wrapped collector does. "
444
+ "Therefore cached documents may be out-of-order.");
447
//System.out.println("CC: replay totHits=" + (upto + base));
448
if (lastReader != null) {
449
cachedSegs.add(new SegStart(lastReader, lastDocBase, base+upto));
455
* Replays the cached doc IDs (and scores) to the given Collector. If this
456
* instance does not cache scores, then Scorer is not set on
457
* {@code other.setScorer} as well as scores are not replayed.
459
* @throws IllegalStateException
460
* if this collector is not cached (i.e., if the RAM limits were too
461
* low for the number of documents + scores to cache).
462
* @throws IllegalArgumentException
463
* if the given Collect's does not support out-of-order collection,
464
* while the collector passed to the ctor does.
466
public abstract void replay(Collector other) throws IOException;