package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.util.MapBackedSet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
31
/** An IndexReader which reads multiple, parallel indexes.  Each index added
 * must have the same number of documents, but typically each contains
 * different fields.  Each document contains the union of the fields of all
 * documents with the same document number.  When searching, matches for a
 * query term are from the first index added that has the field.
 *
 * <p>This is useful, e.g., with collections that have large fields which
 * change rarely and small fields that change more frequently.  The smaller
 * fields may be re-indexed in a new index and both indexes may be searched
 * together.
 *
 * <p><strong>Warning:</strong> It is up to you to make sure all indexes
 * are created and modified the same way. For example, if you add
 * documents to one index, you need to add the same documents in the
 * same order to the other indexes. <em>Failure to do so will result in
 * undefined behavior</em>.
 */
48
public class ParallelReader extends IndexReader {
49
private List<IndexReader> readers = new ArrayList<IndexReader>();
50
private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
51
boolean incRefReaders = false;
52
private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
53
private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
54
private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();
58
private boolean hasDeletions;
60
/** Construct a ParallelReader.
61
* <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
63
public ParallelReader() throws IOException { this(true); }
65
/** Construct a ParallelReader.
66
* @param closeSubReaders indicates whether the subreaders should be closed
67
* when this ParallelReader is closed
69
public ParallelReader(boolean closeSubReaders) throws IOException {
71
this.incRefReaders = !closeSubReaders;
72
readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
77
public String toString() {
78
final StringBuilder buffer = new StringBuilder("ParallelReader(");
79
final Iterator<IndexReader> iter = readers.iterator();
81
buffer.append(iter.next());
83
while (iter.hasNext()) {
84
buffer.append(", ").append(iter.next());
87
return buffer.toString();
90
/** Add an IndexReader.
91
* @throws IOException if there is a low-level IO error
93
public void add(IndexReader reader) throws IOException {
98
/** Add an IndexReader whose stored fields will not be returned. This can
99
* accelerate search when stored fields are only needed from a subset of
102
* @throws IllegalArgumentException if not all indexes contain the same number
104
* @throws IllegalArgumentException if not all indexes have the same value
105
* of {@link IndexReader#maxDoc()}
106
* @throws IOException if there is a low-level IO error
108
public void add(IndexReader reader, boolean ignoreStoredFields)
112
if (readers.size() == 0) {
113
this.maxDoc = reader.maxDoc();
114
this.numDocs = reader.numDocs();
115
this.hasDeletions = reader.hasDeletions();
118
if (reader.maxDoc() != maxDoc) // check compatibility
119
throw new IllegalArgumentException
120
("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
121
if (reader.numDocs() != numDocs)
122
throw new IllegalArgumentException
123
("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
125
Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
126
readerToFields.put(reader, fields);
127
for (final String field : fields) { // update fieldToReader map
128
if (fieldToReader.get(field) == null)
129
fieldToReader.put(field, reader);
132
if (!ignoreStoredFields)
133
storedFieldReaders.add(reader); // add to storedFieldReaders
139
decrefOnClose.add(Boolean.valueOf(incRefReaders));
143
public synchronized Object clone() {
144
// doReopen calls ensureOpen
146
return doReopen(true);
147
} catch (Exception ex) {
148
throw new RuntimeException(ex);
153
* Tries to reopen the subreaders.
155
* If one or more subreaders could be re-opened (i. e. subReader.reopen()
156
* returned a new instance != subReader), then a new ParallelReader instance
157
* is returned, otherwise null is returned.
159
* A re-opened instance might share one or more subreaders with the old
160
* instance. Index modification operations result in undefined behavior
161
* when performed before the old instance is closed.
162
* (see {@link IndexReader#openIfChanged}).
164
* If subreaders are shared, then the reference count of those
165
* readers is increased to ensure that the subreaders remain open
166
* until the last referring reader is closed.
168
* @throws CorruptIndexException if the index is corrupt
169
* @throws IOException if there is a low-level IO error
172
protected synchronized IndexReader doOpenIfChanged() throws CorruptIndexException, IOException {
173
// doReopen calls ensureOpen
174
return doReopen(false);
177
protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
180
boolean reopened = false;
181
List<IndexReader> newReaders = new ArrayList<IndexReader>();
183
boolean success = false;
186
for (final IndexReader oldReader : readers) {
187
IndexReader newReader = null;
189
newReader = (IndexReader) oldReader.clone();
192
newReader = IndexReader.openIfChanged(oldReader);
193
if (newReader != null) {
196
newReader = oldReader;
199
newReaders.add(newReader);
203
if (!success && reopened) {
204
for (int i = 0; i < newReaders.size(); i++) {
205
IndexReader r = newReaders.get(i);
206
if (r != readers.get(i)) {
209
} catch (IOException ignore) {
210
// keep going - we want to clean up as much as possible
218
List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
219
ParallelReader pr = new ParallelReader();
220
for (int i = 0; i < readers.size(); i++) {
221
IndexReader oldReader = readers.get(i);
222
IndexReader newReader = newReaders.get(i);
223
if (newReader == oldReader) {
224
newDecrefOnClose.add(Boolean.TRUE);
227
// this is a new subreader instance, so on close() we don't
228
// decRef but close it
229
newDecrefOnClose.add(Boolean.FALSE);
231
pr.add(newReader, !storedFieldReaders.contains(oldReader));
233
pr.decrefOnClose = newDecrefOnClose;
234
pr.incRefReaders = incRefReaders;
237
// No subreader was refreshed
244
public int numDocs() {
245
// Don't call ensureOpen() here (it could affect performance)
250
public int maxDoc() {
251
// Don't call ensureOpen() here (it could affect performance)
256
public boolean hasDeletions() {
261
// check first reader
263
public boolean isDeleted(int n) {
264
// Don't call ensureOpen() here (it could affect performance)
265
if (readers.size() > 0)
266
return readers.get(0).isDeleted(n);
270
// delete in all readers
272
protected void doDelete(int n) throws CorruptIndexException, IOException {
273
for (final IndexReader reader : readers) {
274
reader.deleteDocument(n);
279
// undeleteAll in all readers
281
protected void doUndeleteAll() throws CorruptIndexException, IOException {
282
for (final IndexReader reader : readers) {
283
reader.undeleteAll();
285
hasDeletions = false;
288
// append fields from storedFieldReaders
290
public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
292
Document result = new Document();
293
for (final IndexReader reader: storedFieldReaders) {
295
boolean include = (fieldSelector==null);
297
Collection<String> fields = readerToFields.get(reader);
298
for (final String field : fields)
299
if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
305
List<Fieldable> fields = reader.document(n, fieldSelector).getFields();
306
for (Fieldable field : fields) {
316
public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
318
ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
319
for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {
321
String field = e.getKey();
322
IndexReader reader = e.getValue();
323
TermFreqVector vector = reader.getTermFreqVector(n, field);
327
return results.toArray(new TermFreqVector[results.size()]);
331
public TermFreqVector getTermFreqVector(int n, String field)
334
IndexReader reader = fieldToReader.get(field);
335
return reader==null ? null : reader.getTermFreqVector(n, field);
340
public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
342
IndexReader reader = fieldToReader.get(field);
343
if (reader != null) {
344
reader.getTermFreqVector(docNumber, field, mapper);
349
public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
352
for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {
354
String field = e.getKey();
355
IndexReader reader = e.getValue();
356
reader.getTermFreqVector(docNumber, field, mapper);
362
public boolean hasNorms(String field) throws IOException {
364
IndexReader reader = fieldToReader.get(field);
365
return reader==null ? false : reader.hasNorms(field);
369
public byte[] norms(String field) throws IOException {
371
IndexReader reader = fieldToReader.get(field);
372
return reader==null ? null : reader.norms(field);
376
public void norms(String field, byte[] result, int offset)
379
IndexReader reader = fieldToReader.get(field);
381
reader.norms(field, result, offset);
385
protected void doSetNorm(int n, String field, byte value)
386
throws CorruptIndexException, IOException {
387
IndexReader reader = fieldToReader.get(field);
389
reader.doSetNorm(n, field, value);
393
public TermEnum terms() throws IOException {
395
return new ParallelTermEnum();
399
public TermEnum terms(Term term) throws IOException {
401
return new ParallelTermEnum(term);
405
public int docFreq(Term term) throws IOException {
407
IndexReader reader = fieldToReader.get(term.field());
408
return reader==null ? 0 : reader.docFreq(term);
412
public TermDocs termDocs(Term term) throws IOException {
414
return new ParallelTermDocs(term);
418
public TermDocs termDocs() throws IOException {
420
return new ParallelTermDocs();
424
public TermPositions termPositions(Term term) throws IOException {
426
return new ParallelTermPositions(term);
430
public TermPositions termPositions() throws IOException {
432
return new ParallelTermPositions();
436
* Checks recursively if all subreaders are up to date.
439
public boolean isCurrent() throws CorruptIndexException, IOException {
441
for (final IndexReader reader : readers) {
442
if (!reader.isCurrent()) {
447
// all subreaders are up to date
453
public boolean isOptimized() {
455
for (final IndexReader reader : readers) {
456
if (!reader.isOptimized()) {
461
// all subindexes are optimized
466
* @throws UnsupportedOperationException
469
public long getVersion() {
470
throw new UnsupportedOperationException("ParallelReader does not support this method.");
474
IndexReader[] getSubReaders() {
475
return readers.toArray(new IndexReader[readers.size()]);
479
protected void doCommit(Map<String,String> commitUserData) throws IOException {
480
for (final IndexReader reader : readers)
481
reader.commit(commitUserData);
485
protected synchronized void doClose() throws IOException {
486
for (int i = 0; i < readers.size(); i++) {
487
if (decrefOnClose.get(i).booleanValue()) {
488
readers.get(i).decRef();
490
readers.get(i).close();
496
public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
498
Set<String> fieldSet = new HashSet<String>();
499
for (final IndexReader reader : readers) {
500
Collection<String> names = reader.getFieldNames(fieldNames);
501
fieldSet.addAll(names);
506
private class ParallelTermEnum extends TermEnum {
507
private String field;
508
private Iterator<String> fieldIterator;
509
private TermEnum termEnum;
511
public ParallelTermEnum() throws IOException {
513
field = fieldToReader.firstKey();
514
} catch(NoSuchElementException e) {
515
// No fields, so keep field == null, termEnum == null
519
termEnum = fieldToReader.get(field).terms();
522
public ParallelTermEnum(Term term) throws IOException {
523
field = term.field();
524
IndexReader reader = fieldToReader.get(field);
526
termEnum = reader.terms(term);
530
public boolean next() throws IOException {
534
// another term in this field?
535
if (termEnum.next() && termEnum.term().field()==field)
536
return true; // yes, keep going
538
termEnum.close(); // close old termEnum
540
// find the next field with terms, if any
541
if (fieldIterator==null) {
542
fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
543
fieldIterator.next(); // Skip field to get next one
545
while (fieldIterator.hasNext()) {
546
field = fieldIterator.next();
547
termEnum = fieldToReader.get(field).terms(new Term(field));
548
Term term = termEnum.term();
549
if (term!=null && term.field()==field)
555
return false; // no more fields
563
return termEnum.term();
567
public int docFreq() {
571
return termEnum.docFreq();
575
public void close() throws IOException {
581
// wrap a TermDocs in order to support seek(Term)
582
private class ParallelTermDocs implements TermDocs {
583
protected TermDocs termDocs;
585
public ParallelTermDocs() {}
586
public ParallelTermDocs(Term term) throws IOException {
588
termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
593
public int doc() { return termDocs.doc(); }
594
public int freq() { return termDocs.freq(); }
596
public void seek(Term term) throws IOException {
597
IndexReader reader = fieldToReader.get(term.field());
598
termDocs = reader!=null ? reader.termDocs(term) : null;
601
public void seek(TermEnum termEnum) throws IOException {
602
seek(termEnum.term());
605
public boolean next() throws IOException {
609
return termDocs.next();
612
public int read(final int[] docs, final int[] freqs) throws IOException {
616
return termDocs.read(docs, freqs);
619
public boolean skipTo(int target) throws IOException {
623
return termDocs.skipTo(target);
626
public void close() throws IOException {
633
private class ParallelTermPositions
634
extends ParallelTermDocs implements TermPositions {
636
public ParallelTermPositions() {}
637
public ParallelTermPositions(Term term) throws IOException { seek(term); }
640
public void seek(Term term) throws IOException {
641
IndexReader reader = fieldToReader.get(term.field());
642
termDocs = reader!=null ? reader.termPositions(term) : null;
645
public int nextPosition() throws IOException {
646
// It is an error to call this if there is no next position, e.g. if termDocs==null
647
return ((TermPositions)termDocs).nextPosition();
650
public int getPayloadLength() {
651
return ((TermPositions)termDocs).getPayloadLength();
654
public byte[] getPayload(byte[] data, int offset) throws IOException {
655
return ((TermPositions)termDocs).getPayload(data, offset);
659
// TODO: Remove warning after API has been finalized
660
public boolean isPayloadAvailable() {
661
return ((TermPositions) termDocs).isPayloadAvailable();
666
public void addReaderFinishedListener(ReaderFinishedListener listener) {
667
super.addReaderFinishedListener(listener);
668
for (IndexReader reader : readers) {
669
reader.addReaderFinishedListener(listener);
674
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
675
super.removeReaderFinishedListener(listener);
676
for (IndexReader reader : readers) {
677
reader.removeReaderFinishedListener(listener);