1
package org.apache.lucene.index;
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.lucene.index.FieldInfo.IndexOptions;
import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.UnicodeUtil;
final class FreqProxTermsWriter extends TermsHashConsumer {
34
public TermsHashConsumerPerThread addThread(TermsHashPerThread perThread) {
35
return new FreqProxTermsWriterPerThread(perThread);
38
private static int compareText(final char[] text1, int pos1, final char[] text2, int pos2) {
40
final char c1 = text1[pos1++];
41
final char c2 = text2[pos2++];
45
else if (0xffff == c1)
49
} else if (0xffff == c1)
  // TODO: would be nice to factor out more of this, eg the
  // FreqProxFieldMergeState, and code to visit all Fields
  // under the same FieldInfo together, up into TermsHash*.
  // Other writers would presumably share a lot of this...
public void flush(Map<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> threadsAndFields, final SegmentWriteState state) throws IOException {
65
// Gather all FieldData's that have postings, across all
67
List<FreqProxTermsWriterPerField> allFields = new ArrayList<FreqProxTermsWriterPerField>();
69
for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
71
Collection<TermsHashConsumerPerField> fields = entry.getValue();
73
for (final TermsHashConsumerPerField i : fields) {
74
final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) i;
75
if (perField.termsHashPerField.numPostings > 0)
76
allFields.add(perField);
81
CollectionUtil.quickSort(allFields);
82
final int numAllFields = allFields.size();
84
// TODO: allow Lucene user to customize this consumer:
85
final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
88
FormatPostingsFieldsConsumer
89
-> IMPL: FormatPostingsFieldsWriter
90
-> FormatPostingsTermsConsumer
91
-> IMPL: FormatPostingsTermsWriter
92
-> FormatPostingsDocConsumer
93
-> IMPL: FormatPostingsDocWriter
94
-> FormatPostingsPositionsConsumer
95
-> IMPL: FormatPostingsPositionsWriter
99
while(start < numAllFields) {
100
final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
101
final String fieldName = fieldInfo.name;
104
while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
107
FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
108
for(int i=start;i<end;i++) {
109
fields[i-start] = allFields.get(i);
111
// Aggregate the storePayload as seen by the same
112
// field across multiple threads
113
if (fieldInfo.indexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
114
fieldInfo.storePayloads |= fields[i-start].hasPayloads;
118
// If this field has postings then add them to the
120
appendPostings(fieldName, state, fields, consumer);
122
for(int i=0;i<fields.length;i++) {
123
TermsHashPerField perField = fields[i].termsHashPerField;
124
int numPostings = perField.numPostings;
126
perField.shrinkHash(numPostings);
133
for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
134
FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
135
perThread.termsHashPerThread.reset(true);
142
private byte[] payloadBuffer;
144
/* Walk through all unique text tokens (Posting
145
* instances) found in this field and serialize them
146
* into a single RAM segment. */
147
void appendPostings(String fieldName, SegmentWriteState state,
148
FreqProxTermsWriterPerField[] fields,
149
FormatPostingsFieldsConsumer consumer)
150
throws CorruptIndexException, IOException {
152
int numFields = fields.length;
154
final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
156
for(int i=0;i<numFields;i++) {
157
FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]);
159
assert fms.field.fieldInfo == fields[0].fieldInfo;
161
// Should always be true
162
boolean result = fms.nextTerm();
166
final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);
167
final Term protoTerm = new Term(fieldName);
169
FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
171
final IndexOptions currentFieldIndexOptions = fields[0].fieldInfo.indexOptions;
173
final Map<Term,Integer> segDeletes;
174
if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
175
segDeletes = state.segDeletes.terms;
181
// TODO: really TermsHashPerField should take over most
182
// of this loop, including merge sort of terms from
183
// multiple threads and interacting with the
184
// TermsConsumer, only calling out to us (passing us the
185
// DocsConsumer) to handle delivery of docs/positions
186
while(numFields > 0) {
188
// Get the next term to merge
189
termStates[0] = mergeStates[0];
193
for(int i=1;i<numFields;i++) {
194
final char[] text = mergeStates[i].text;
195
final int textOffset = mergeStates[i].textOffset;
196
final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
199
termStates[0] = mergeStates[i];
202
termStates[numToMerge++] = mergeStates[i];
205
final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
207
final int delDocLimit;
208
if (segDeletes != null) {
209
final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(termStates[0].termText()));
210
if (docIDUpto != null) {
211
delDocLimit = docIDUpto;
220
// Now termStates has numToMerge FieldMergeStates
221
// which all share the same term. Now we must
222
// interleave the docID streams.
223
while(numToMerge > 0) {
225
FreqProxFieldMergeState minState = termStates[0];
226
for(int i=1;i<numToMerge;i++)
227
if (termStates[i].docID < minState.docID)
228
minState = termStates[i];
230
final int termDocFreq = minState.termFreq;
232
final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
234
// NOTE: we could check here if the docID was
235
// deleted, and skip it. However, this is somewhat
236
// dangerous because it can yield non-deterministic
237
// behavior since we may see the docID before we see
238
// the term that caused it to be deleted. This
239
// would mean some (but not all) of its postings may
240
// make it into the index, which'd alter the docFreq
241
// for those terms. We could fix this by doing two
242
// passes, ie first sweep marks all del docs, and
243
// 2nd sweep does the real flush, but I suspect
244
// that'd add too much time to flush.
246
if (minState.docID < delDocLimit) {
247
// Mark it deleted. TODO: we could also skip
248
// writing its postings; this would be
249
// deterministic (just for this Term's docs).
250
if (state.deletedDocs == null) {
251
state.deletedDocs = new BitVector(state.numDocs);
253
state.deletedDocs.set(minState.docID);
256
final ByteSliceReader prox = minState.prox;
258
// Carefully copy over the prox + payload info,
259
// changing the format to match Lucene's segment
261
if (currentFieldIndexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
262
// omitTermFreqAndPositions == false so we do write positions &
266
for(int j=0;j<termDocFreq;j++) {
267
final int code = prox.readVInt();
268
position += code >> 1;
270
final int payloadLength;
271
if ((code & 1) != 0) {
272
// This position has a payload
273
payloadLength = prox.readVInt();
275
if (payloadBuffer == null || payloadBuffer.length < payloadLength)
276
payloadBuffer = new byte[payloadLength];
278
prox.readBytes(payloadBuffer, 0, payloadLength);
283
posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
286
posConsumer.finish();
290
if (!minState.nextDoc()) {
292
// Remove from termStates
294
for(int i=0;i<numToMerge;i++)
295
if (termStates[i] != minState)
296
termStates[upto++] = termStates[i];
298
assert upto == numToMerge;
300
// Advance this state to the next term
302
if (!minState.nextTerm()) {
303
// OK, no more terms, so remove from mergeStates
306
for(int i=0;i<numFields;i++)
307
if (mergeStates[i] != minState)
308
mergeStates[upto++] = mergeStates[i];
310
assert upto == numFields;
315
docConsumer.finish();
319
termsConsumer.finish();
323
final UnicodeUtil.UTF8Result termsUTF8 = new UnicodeUtil.UTF8Result();