~slub.team/goobi-indexserver/3.x

« back to all changes in this revision

Viewing changes to lucene/src/java/org/apache/lucene/index/DocumentsWriter.java

  • Committer: Sebastian Meyer
  • Date: 2012-08-03 09:12:40 UTC
  • Revision ID: sebastian.meyer@slub-dresden.de-20120803091240-x6861b0vabq1xror
Remove Lucene and Solr source code and add patches instead
Fix Bug #985487: Auto-suggestion for the search interface

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
package org.apache.lucene.index;
2
 
 
3
 
/**
4
 
 * Licensed to the Apache Software Foundation (ASF) under one or more
5
 
 * contributor license agreements.  See the NOTICE file distributed with
6
 
 * this work for additional information regarding copyright ownership.
7
 
 * The ASF licenses this file to You under the Apache License, Version 2.0
8
 
 * (the "License"); you may not use this file except in compliance with
9
 
 * the License.  You may obtain a copy of the License at
10
 
 *
11
 
 *     http://www.apache.org/licenses/LICENSE-2.0
12
 
 *
13
 
 * Unless required by applicable law or agreed to in writing, software
14
 
 * distributed under the License is distributed on an "AS IS" BASIS,
15
 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 
 * See the License for the specific language governing permissions and
17
 
 * limitations under the License.
18
 
 */
19
 
 
20
 
import java.io.IOException;
21
 
import java.io.PrintStream;
22
 
import java.text.NumberFormat;
23
 
import java.util.ArrayList;
24
 
import java.util.Collection;
25
 
import java.util.HashMap;
26
 
import java.util.HashSet;
27
 
import java.util.List;
28
 
import java.util.concurrent.atomic.AtomicLong;
29
 
 
30
 
import org.apache.lucene.analysis.Analyzer;
31
 
import org.apache.lucene.document.Document;
32
 
import org.apache.lucene.search.Query;
33
 
import org.apache.lucene.search.Similarity;
34
 
import org.apache.lucene.store.AlreadyClosedException;
35
 
import org.apache.lucene.store.Directory;
36
 
import org.apache.lucene.store.RAMFile;
37
 
import org.apache.lucene.util.ArrayUtil;
38
 
import org.apache.lucene.util.BitVector;
39
 
import org.apache.lucene.util.RamUsageEstimator;
40
 
import org.apache.lucene.util.ThreadInterruptedException;
41
 
 
42
 
 
43
 
/**
44
 
 * This class accepts multiple added documents and directly
45
 
 * writes a single segment file.  It does this more
46
 
 * efficiently than creating a single segment per document
47
 
 * (with DocumentWriter) and doing standard merges on those
48
 
 * segments.
49
 
 *
50
 
 * Each added document is passed to the {@link DocConsumer},
51
 
 * which in turn processes the document and interacts with
52
 
 * other consumers in the indexing chain.  Certain
53
 
 * consumers, like {@link StoredFieldsWriter} and {@link
54
 
 * TermVectorsTermsWriter}, digest a document and
55
 
 * immediately write bytes to the "doc store" files (ie,
56
 
 * they do not consume RAM per document, except while they
57
 
 * are processing the document).
58
 
 *
59
 
 * Other consumers, eg {@link FreqProxTermsWriter} and
60
 
 * {@link NormsWriter}, buffer bytes in RAM and flush only
61
 
 * when a new segment is produced.
62
 
 
63
 
 * Once we have used our allowed RAM buffer, or the number
64
 
 * of added docs is large enough (in the case we are
65
 
 * flushing by doc count instead of RAM usage), we create a
66
 
 * real segment and flush it to the Directory.
67
 
 *
68
 
 * Threads:
69
 
 *
70
 
 * Multiple threads are allowed into addDocument at once.
71
 
 * There is an initial synchronized call to getThreadState
72
 
 * which allocates a ThreadState for this thread.  The same
73
 
 * thread will get the same ThreadState over time (thread
74
 
 * affinity) so that if there are consistent patterns (for
75
 
 * example each thread is indexing a different content
76
 
 * source) then we make better use of RAM.  Then
77
 
 * processDocument is called on that ThreadState without
78
 
 * synchronization (most of the "heavy lifting" is in this
79
 
 * call).  Finally the synchronized "finishDocument" is
80
 
 * called to flush changes to the directory.
81
 
 *
82
 
 * When flush is called by IndexWriter we forcefully idle
83
 
 * all threads and flush only once they are all idle.  This
84
 
 * means you can call flush with a given thread even while
85
 
 * other threads are actively adding/deleting documents.
86
 
 *
87
 
 *
88
 
 * Exceptions:
89
 
 *
90
 
 * Because this class directly updates in-memory posting
91
 
 * lists, and flushes stored fields and term vectors
92
 
 * directly to files in the directory, there are certain
93
 
 * limited times when an exception can corrupt this state.
94
 
 * For example, a disk full while flushing stored fields
95
 
 * leaves this file in a corrupt state.  Or, an OOM
96
 
 * exception while appending to the in-memory posting lists
97
 
 * can corrupt that posting list.  We call such exceptions
98
 
 * "aborting exceptions".  In these cases we must call
99
 
 * abort() to discard all docs added since the last flush.
100
 
 *
101
 
 * All other exceptions ("non-aborting exceptions") can
102
 
 * still partially update the index structures.  These
103
 
 * updates are consistent, but, they represent only a part
104
 
 * of the document seen up until the exception was hit.
105
 
 * When this happens, we immediately mark the document as
106
 
 * deleted so that the document is always atomically ("all
107
 
 * or none") added to the index.
108
 
 */
109
 
 
110
 
final class DocumentsWriter {
111
 
  final AtomicLong bytesUsed = new AtomicLong(0);
112
 
  IndexWriter writer;
113
 
  Directory directory;
114
 
 
115
 
  String segment;                         // Current segment we are working on
116
 
 
117
 
  private int nextDocID;                  // Next docID to be added
118
 
  private int numDocs;                    // # of docs added, but not yet flushed
119
 
 
120
 
  // Max # ThreadState instances; if there are more threads
121
 
  // than this they share ThreadStates
122
 
  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
123
 
  private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
124
 
 
125
 
  boolean bufferIsFull;                   // True when it's time to write segment
126
 
  private boolean aborting;               // True if an abort is pending
127
 
 
128
 
  PrintStream infoStream;
129
 
  int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
130
 
  Similarity similarity;
131
 
 
132
 
  // max # simultaneous threads; if there are more than
133
 
  // this, they wait for others to finish first
134
 
  private final int maxThreadStates;
135
 
 
136
 
  // Deletes for our still-in-RAM (to be flushed next) segment
137
 
  private BufferedDeletes pendingDeletes = new BufferedDeletes();
138
 
  
139
 
  static class DocState {
140
 
    DocumentsWriter docWriter;
141
 
    Analyzer analyzer;
142
 
    int maxFieldLength;
143
 
    PrintStream infoStream;
144
 
    Similarity similarity;
145
 
    int docID;
146
 
    Document doc;
147
 
    String maxTermPrefix;
148
 
 
149
 
    // Only called by asserts
150
 
    public boolean testPoint(String name) {
151
 
      return docWriter.writer.testPoint(name);
152
 
    }
153
 
 
154
 
    public void clear() {
155
 
      // don't hold onto doc nor analyzer, in case it is
156
 
      // largish:
157
 
      doc = null;
158
 
      analyzer = null;
159
 
    }
160
 
  }
161
 
 
162
 
  /** Consumer returns this on each doc.  This holds any
163
 
   *  state that must be flushed synchronized "in docID
164
 
   *  order".  We gather these and flush them in order. */
165
 
  abstract static class DocWriter {
166
 
    DocWriter next;
167
 
    int docID;
168
 
    abstract void finish() throws IOException;
169
 
    abstract void abort();
170
 
    abstract long sizeInBytes();
171
 
 
172
 
    void setNext(DocWriter next) {
173
 
      this.next = next;
174
 
    }
175
 
  }
176
 
 
177
 
  /**
178
 
   * Create and return a new DocWriterBuffer.
179
 
   */
180
 
  PerDocBuffer newPerDocBuffer() {
181
 
    return new PerDocBuffer();
182
 
  }
183
 
 
184
 
  /**
185
 
   * RAMFile buffer for DocWriters.
186
 
   */
187
 
  class PerDocBuffer extends RAMFile {
188
 
    
189
 
    /**
190
 
     * Allocate bytes used from shared pool.
191
 
     */
192
 
    @Override
193
 
    protected byte[] newBuffer(int size) {
194
 
      assert size == PER_DOC_BLOCK_SIZE;
195
 
      return perDocAllocator.getByteBlock();
196
 
    }
197
 
    
198
 
    /**
199
 
     * Recycle the bytes used.
200
 
     */
201
 
    synchronized void recycle() {
202
 
      if (buffers.size() > 0) {
203
 
        setLength(0);
204
 
        
205
 
        // Recycle the blocks
206
 
        perDocAllocator.recycleByteBlocks(buffers);
207
 
        buffers.clear();
208
 
        sizeInBytes = 0;
209
 
        
210
 
        assert numBuffers() == 0;
211
 
      }
212
 
    }
213
 
  }
214
 
  
215
 
  /**
216
 
   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
217
 
   * which returns the DocConsumer that the DocumentsWriter calls to process the
218
 
   * documents. 
219
 
   */
220
 
  abstract static class IndexingChain {
221
 
    abstract DocConsumer getChain(DocumentsWriter documentsWriter);
222
 
  }
223
 
  
224
 
  static final IndexingChain defaultIndexingChain = new IndexingChain() {
225
 
 
226
 
    @Override
227
 
    DocConsumer getChain(DocumentsWriter documentsWriter) {
228
 
      /*
229
 
      This is the current indexing chain:
230
 
 
231
 
      DocConsumer / DocConsumerPerThread
232
 
        --> code: DocFieldProcessor / DocFieldProcessorPerThread
233
 
          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
234
 
            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
235
 
              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
236
 
                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
237
 
                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
238
 
                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
239
 
                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
240
 
                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
241
 
                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
242
 
                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
243
 
              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
244
 
    */
245
 
 
246
 
    // Build up indexing chain:
247
 
 
248
 
      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
249
 
      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
250
 
 
251
 
      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
252
 
                                                           new TermsHash(documentsWriter, false, termVectorsWriter, null));
253
 
      final NormsWriter normsWriter = new NormsWriter();
254
 
      final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
255
 
      return new DocFieldProcessor(documentsWriter, docInverter);
256
 
    }
257
 
  };
258
 
 
259
 
  final DocConsumer consumer;
260
 
 
261
 
  // How much RAM we can use before flushing.  This is 0 if
262
 
  // we are flushing by doc count instead.
263
 
 
264
 
  private final IndexWriterConfig config;
265
 
 
266
 
  private boolean closed;
267
 
  private final FieldInfos fieldInfos;
268
 
 
269
 
  private final BufferedDeletesStream bufferedDeletesStream;
270
 
  private final IndexWriter.FlushControl flushControl;
271
 
 
272
 
  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
273
 
    this.directory = directory;
274
 
    this.writer = writer;
275
 
    this.similarity = config.getSimilarity();
276
 
    this.maxThreadStates = config.getMaxThreadStates();
277
 
    this.fieldInfos = fieldInfos;
278
 
    this.bufferedDeletesStream = bufferedDeletesStream;
279
 
    flushControl = writer.flushControl;
280
 
 
281
 
    consumer = config.getIndexingChain().getChain(this);
282
 
    this.config = config;
283
 
  }
284
 
 
285
 
  // Buffer a specific docID for deletion.  Currently only
286
 
  // used when we hit a exception when adding a document
287
 
  synchronized void deleteDocID(int docIDUpto) {
288
 
    pendingDeletes.addDocID(docIDUpto);
289
 
    // NOTE: we do not trigger flush here.  This is
290
 
    // potentially a RAM leak, if you have an app that tries
291
 
    // to add docs but every single doc always hits a
292
 
    // non-aborting exception.  Allowing a flush here gets
293
 
    // very messy because we are only invoked when handling
294
 
    // exceptions so to do this properly, while handling an
295
 
    // exception we'd have to go off and flush new deletes
296
 
    // which is risky (likely would hit some other
297
 
    // confounding exception).
298
 
  }
299
 
  
300
 
  boolean deleteQueries(Query... queries) {
301
 
    final boolean doFlush = flushControl.waitUpdate(0, queries.length);
302
 
    synchronized(this) {
303
 
      for (Query query : queries) {
304
 
        pendingDeletes.addQuery(query, numDocs);
305
 
      }
306
 
    }
307
 
    return doFlush;
308
 
  }
309
 
  
310
 
  boolean deleteQuery(Query query) { 
311
 
    final boolean doFlush = flushControl.waitUpdate(0, 1);
312
 
    synchronized(this) {
313
 
      pendingDeletes.addQuery(query, numDocs);
314
 
    }
315
 
    return doFlush;
316
 
  }
317
 
  
318
 
  boolean deleteTerms(Term... terms) {
319
 
    final boolean doFlush = flushControl.waitUpdate(0, terms.length);
320
 
    synchronized(this) {
321
 
      for (Term term : terms) {
322
 
        pendingDeletes.addTerm(term, numDocs);
323
 
      }
324
 
    }
325
 
    return doFlush;
326
 
  }
327
 
 
328
 
  // TODO: we could check w/ FreqProxTermsWriter: if the
329
 
  // term doesn't exist, don't bother buffering into the
330
 
  // per-DWPT map (but still must go into the global map)
331
 
  boolean deleteTerm(Term term, boolean skipWait) {
332
 
    final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
333
 
    synchronized(this) {
334
 
      pendingDeletes.addTerm(term, numDocs);
335
 
    }
336
 
    return doFlush;
337
 
  }
338
 
 
339
 
  public FieldInfos getFieldInfos() {
340
 
    return fieldInfos;
341
 
  }
342
 
 
343
 
  /** If non-null, various details of indexing are printed
344
 
   *  here. */
345
 
  synchronized void setInfoStream(PrintStream infoStream) {
346
 
    this.infoStream = infoStream;
347
 
    for(int i=0;i<threadStates.length;i++) {
348
 
      threadStates[i].docState.infoStream = infoStream;
349
 
    }
350
 
  }
351
 
 
352
 
  synchronized void setMaxFieldLength(int maxFieldLength) {
353
 
    this.maxFieldLength = maxFieldLength;
354
 
    for(int i=0;i<threadStates.length;i++) {
355
 
      threadStates[i].docState.maxFieldLength = maxFieldLength;
356
 
    }
357
 
  }
358
 
 
359
 
  synchronized void setSimilarity(Similarity similarity) {
360
 
    this.similarity = similarity;
361
 
    for(int i=0;i<threadStates.length;i++) {
362
 
      threadStates[i].docState.similarity = similarity;
363
 
    }
364
 
  }
365
 
 
366
 
  /** Get current segment name we are writing. */
367
 
  synchronized String getSegment() {
368
 
    return segment;
369
 
  }
370
 
 
371
 
  /** Returns how many docs are currently buffered in RAM. */
372
 
  synchronized int getNumDocs() {
373
 
    return numDocs;
374
 
  }
375
 
 
376
 
  void message(String message) {
377
 
    if (infoStream != null) {
378
 
      writer.message("DW: " + message);
379
 
    }
380
 
  }
381
 
 
382
 
  synchronized void setAborting() {
383
 
    if (infoStream != null) {
384
 
      message("setAborting");
385
 
    }
386
 
    aborting = true;
387
 
  }
388
 
 
389
 
  /** Called if we hit an exception at a bad time (when
390
 
   *  updating the index files) and must discard all
391
 
   *  currently buffered docs.  This resets our state,
392
 
   *  discarding any docs added since last flush. */
393
 
  synchronized void abort() throws IOException {
394
 
    if (infoStream != null) {
395
 
      message("docWriter: abort");
396
 
    }
397
 
 
398
 
    boolean success = false;
399
 
 
400
 
    try {
401
 
 
402
 
      // Forcefully remove waiting ThreadStates from line
403
 
      try {
404
 
        waitQueue.abort();
405
 
      } catch (Throwable t) {
406
 
      }
407
 
 
408
 
      // Wait for all other threads to finish with
409
 
      // DocumentsWriter:
410
 
      try {
411
 
        waitIdle();
412
 
      } finally {
413
 
        if (infoStream != null) {
414
 
          message("docWriter: abort waitIdle done");
415
 
        }
416
 
        
417
 
        assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
418
 
        waitQueue.waitingBytes = 0;
419
 
        
420
 
        pendingDeletes.clear();
421
 
        
422
 
        for (DocumentsWriterThreadState threadState : threadStates) {
423
 
          try {
424
 
            threadState.consumer.abort();
425
 
          } catch (Throwable t) {
426
 
          }
427
 
        }
428
 
          
429
 
        try {
430
 
          consumer.abort();
431
 
        } catch (Throwable t) {
432
 
        }
433
 
        
434
 
        // Reset all postings data
435
 
        doAfterFlush();
436
 
      }
437
 
 
438
 
      success = true;
439
 
    } finally {
440
 
      aborting = false;
441
 
      notifyAll();
442
 
      if (infoStream != null) {
443
 
        message("docWriter: done abort; success=" + success);
444
 
      }
445
 
    }
446
 
  }
447
 
 
448
 
  /** Reset after a flush */
449
 
  private void doAfterFlush() throws IOException {
450
 
    // All ThreadStates should be idle when we are called
451
 
    assert allThreadsIdle();
452
 
    threadBindings.clear();
453
 
    waitQueue.reset();
454
 
    segment = null;
455
 
    numDocs = 0;
456
 
    nextDocID = 0;
457
 
    bufferIsFull = false;
458
 
    for(int i=0;i<threadStates.length;i++) {
459
 
      threadStates[i].doAfterFlush();
460
 
    }
461
 
  }
462
 
 
463
 
  private synchronized boolean allThreadsIdle() {
464
 
    for(int i=0;i<threadStates.length;i++) {
465
 
      if (!threadStates[i].isIdle) {
466
 
        return false;
467
 
      }
468
 
    }
469
 
    return true;
470
 
  }
471
 
 
472
 
  synchronized boolean anyChanges() {
473
 
    return numDocs != 0 || pendingDeletes.any();
474
 
  }
475
 
 
476
 
  // for testing
477
 
  public BufferedDeletes getPendingDeletes() {
478
 
    return pendingDeletes;
479
 
  }
480
 
 
481
 
  private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
482
 
    // Lock order: DW -> BD
483
 
    final long delGen = bufferedDeletesStream.getNextGen();
484
 
    if (pendingDeletes.any()) {
485
 
      if (segmentInfos.size() > 0 || newSegment != null) {
486
 
        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
487
 
        if (infoStream != null) {
488
 
          message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
489
 
        }
490
 
        bufferedDeletesStream.push(packet);
491
 
        if (infoStream != null) {
492
 
          message("flush: delGen=" + packet.gen);
493
 
        }
494
 
        if (newSegment != null) {
495
 
          newSegment.setBufferedDeletesGen(packet.gen);
496
 
        }
497
 
      } else {
498
 
        if (infoStream != null) {
499
 
          message("flush: drop buffered deletes: no segments");
500
 
        }
501
 
        // We can safely discard these deletes: since
502
 
        // there are no segments, the deletions cannot
503
 
        // affect anything.
504
 
      }
505
 
      pendingDeletes.clear();
506
 
    } else if (newSegment != null) {
507
 
      newSegment.setBufferedDeletesGen(delGen);
508
 
    }
509
 
  }
510
 
 
511
 
  public boolean anyDeletions() {
512
 
    return pendingDeletes.any();
513
 
  }
514
 
 
515
 
  /** Flush all pending docs to a new segment */
516
 
  // Lock order: IW -> DW
517
 
  synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
518
 
 
519
 
    final long startTime = System.currentTimeMillis();
520
 
 
521
 
    // We change writer's segmentInfos:
522
 
    assert Thread.holdsLock(writer);
523
 
 
524
 
    waitIdle();
525
 
 
526
 
    if (numDocs == 0) {
527
 
      // nothing to do!
528
 
      if (infoStream != null) {
529
 
        message("flush: no docs; skipping");
530
 
      }
531
 
      // Lock order: IW -> DW -> BD
532
 
      pushDeletes(null, segmentInfos);
533
 
      return null;
534
 
    }
535
 
 
536
 
    if (aborting) {
537
 
      if (infoStream != null) {
538
 
        message("flush: skip because aborting is set");
539
 
      }
540
 
      return null;
541
 
    }
542
 
 
543
 
    boolean success = false;
544
 
 
545
 
    SegmentInfo newSegment;
546
 
 
547
 
    try {
548
 
      //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
549
 
      assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
550
 
      assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
551
 
      assert waitQueue.waitingBytes == 0;
552
 
 
553
 
      if (infoStream != null) {
554
 
        message("flush postings as segment " + segment + " numDocs=" + numDocs);
555
 
      }
556
 
 
557
 
      final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
558
 
                                                                 numDocs, writer.getConfig().getTermIndexInterval(),
559
 
                                                                 pendingDeletes);
560
 
      // Apply delete-by-docID now (delete-byDocID only
561
 
      // happens when an exception is hit processing that
562
 
      // doc, eg if analyzer has some problem w/ the text):
563
 
      if (pendingDeletes.docIDs.size() > 0) {
564
 
        flushState.deletedDocs = new BitVector(numDocs);
565
 
        for(int delDocID : pendingDeletes.docIDs) {
566
 
          flushState.deletedDocs.set(delDocID);
567
 
        }
568
 
        pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
569
 
        pendingDeletes.docIDs.clear();
570
 
      }
571
 
 
572
 
      newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);
573
 
 
574
 
      Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
575
 
      for (DocumentsWriterThreadState threadState : threadStates) {
576
 
        threads.add(threadState.consumer);
577
 
      }
578
 
 
579
 
      double startMBUsed = bytesUsed()/1024./1024.;
580
 
 
581
 
      consumer.flush(threads, flushState);
582
 
 
583
 
      newSegment.setHasVectors(flushState.hasVectors);
584
 
 
585
 
      if (infoStream != null) {
586
 
        message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
587
 
        if (flushState.deletedDocs != null) {
588
 
          message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
589
 
        }
590
 
        message("flushedFiles=" + newSegment.files());
591
 
      }
592
 
 
593
 
      if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
594
 
        final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);
595
 
 
596
 
        if (infoStream != null) {
597
 
          message("flush: create compound file \"" + cfsFileName + "\"");
598
 
        }
599
 
 
600
 
        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
601
 
        for(String fileName : newSegment.files()) {
602
 
          cfsWriter.addFile(fileName);
603
 
        }
604
 
        cfsWriter.close();
605
 
        deleter.deleteNewFiles(newSegment.files());
606
 
        newSegment.setUseCompoundFile(true);
607
 
      }
608
 
 
609
 
      // Must write deleted docs after the CFS so we don't
610
 
      // slurp the del file into CFS:
611
 
      if (flushState.deletedDocs != null) {
612
 
        final int delCount = flushState.deletedDocs.count();
613
 
        assert delCount > 0;
614
 
        newSegment.setDelCount(delCount);
615
 
        newSegment.advanceDelGen();
616
 
        final String delFileName = newSegment.getDelFileName();
617
 
        if (infoStream != null) {
618
 
          message("flush: write " + delCount + " deletes to " + delFileName);
619
 
        }
620
 
        boolean success2 = false;
621
 
        try {
622
 
          // TODO: in the NRT case it'd be better to hand
623
 
          // this del vector over to the
624
 
          // shortly-to-be-opened SegmentReader and let it
625
 
          // carry the changes; there's no reason to use
626
 
          // filesystem as intermediary here.
627
 
          flushState.deletedDocs.write(directory, delFileName);
628
 
          success2 = true;
629
 
        } finally {
630
 
          if (!success2) {
631
 
            try {
632
 
              directory.deleteFile(delFileName);
633
 
            } catch (Throwable t) {
634
 
              // suppress this so we keep throwing the
635
 
              // original exception
636
 
            }
637
 
          }
638
 
        }
639
 
      }
640
 
 
641
 
      if (infoStream != null) {
642
 
        message("flush: segment=" + newSegment);
643
 
        final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
644
 
        final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
645
 
        message("  ramUsed=" + nf.format(startMBUsed) + " MB" +
646
 
                " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
647
 
                " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
648
 
                " docs/MB=" + nf.format(numDocs / newSegmentSize) +
649
 
                " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
650
 
      }
651
 
 
652
 
      success = true;
653
 
    } finally {
654
 
      notifyAll();
655
 
      if (!success) {
656
 
        if (segment != null) {
657
 
          deleter.refresh(segment);
658
 
        }
659
 
        abort();
660
 
      }
661
 
    }
662
 
 
663
 
    doAfterFlush();
664
 
 
665
 
    // Lock order: IW -> DW -> BD
666
 
    pushDeletes(newSegment, segmentInfos);
667
 
    if (infoStream != null) {
668
 
      message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
669
 
    }
670
 
 
671
 
    return newSegment;
672
 
  }
673
 
 
674
 
  synchronized void close() {
675
 
    closed = true;
676
 
    notifyAll();
677
 
  }
678
 
 
679
 
  /** Returns a free (idle) ThreadState that may be used for
680
 
   * indexing this one document.  This call also pauses if a
681
 
   * flush is pending.  If delTerm is non-null then we
682
 
   * buffer this deleted term after the thread state has
683
 
   * been acquired. */
684
 
  synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {
685
 
 
686
 
    final Thread currentThread = Thread.currentThread();
687
 
    assert !Thread.holdsLock(writer);
688
 
 
689
 
    // First, find a thread state.  If this thread already
690
 
    // has affinity to a specific ThreadState, use that one
691
 
    // again.
692
 
    DocumentsWriterThreadState state = threadBindings.get(currentThread);
693
 
    if (state == null) {
694
 
 
695
 
      // First time this thread has called us since last
696
 
      // flush.  Find the least loaded thread state:
697
 
      DocumentsWriterThreadState minThreadState = null;
698
 
      for(int i=0;i<threadStates.length;i++) {
699
 
        DocumentsWriterThreadState ts = threadStates[i];
700
 
        if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
701
 
          minThreadState = ts;
702
 
        }
703
 
      }
704
 
      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
705
 
        state = minThreadState;
706
 
        state.numThreads++;
707
 
      } else {
708
 
        // Just create a new "private" thread state
709
 
        DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
710
 
        if (threadStates.length > 0) {
711
 
          System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
712
 
        }
713
 
        state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
714
 
        threadStates = newArray;
715
 
      }
716
 
      threadBindings.put(currentThread, state);
717
 
    }
718
 
 
719
 
    // Next, wait until my thread state is idle (in case
720
 
    // it's shared with other threads), and no flush/abort
721
 
    // pending 
722
 
    waitReady(state);
723
 
 
724
 
    // Allocate segment name if this is the first doc since
725
 
    // last flush:
726
 
    if (segment == null) {
727
 
      segment = writer.newSegmentName();
728
 
      assert numDocs == 0;
729
 
    }
730
 
 
731
 
    state.docState.docID = nextDocID;
732
 
    nextDocID += docCount;
733
 
 
734
 
    if (delTerm != null) {
735
 
      pendingDeletes.addTerm(delTerm, state.docState.docID);
736
 
    }
737
 
 
738
 
    numDocs += docCount;
739
 
    state.isIdle = false;
740
 
    return state;
741
 
  }
742
 
  
743
 
  boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
744
 
    return updateDocument(doc, analyzer, null);
745
 
  }
746
 
  
747
 
  boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
748
 
    throws CorruptIndexException, IOException {
749
 
 
750
 
    // Possibly trigger a flush, or wait until any running flush completes:
751
 
    boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
752
 
 
753
 
    // This call is synchronized but fast
754
 
    final DocumentsWriterThreadState state = getThreadState(delTerm, 1);
755
 
 
756
 
    final DocState docState = state.docState;
757
 
    docState.doc = doc;
758
 
    docState.analyzer = analyzer;
759
 
 
760
 
    boolean success = false;
761
 
    try {
762
 
      // This call is not synchronized and does all the
763
 
      // work
764
 
      final DocWriter perDoc;
765
 
      try {
766
 
        perDoc = state.consumer.processDocument();
767
 
      } finally {
768
 
        docState.clear();
769
 
      }
770
 
 
771
 
      // This call is synchronized but fast
772
 
      finishDocument(state, perDoc);
773
 
 
774
 
      success = true;
775
 
    } finally {
776
 
      if (!success) {
777
 
 
778
 
        // If this thread state had decided to flush, we
779
 
        // must clear it so another thread can flush
780
 
        if (doFlush) {
781
 
          flushControl.clearFlushPending();
782
 
        }
783
 
 
784
 
        if (infoStream != null) {
785
 
          message("exception in updateDocument aborting=" + aborting);
786
 
        }
787
 
 
788
 
        synchronized(this) {
789
 
 
790
 
          state.isIdle = true;
791
 
          notifyAll();
792
 
            
793
 
          if (aborting) {
794
 
            abort();
795
 
          } else {
796
 
            skipDocWriter.docID = docState.docID;
797
 
            boolean success2 = false;
798
 
            try {
799
 
              waitQueue.add(skipDocWriter);
800
 
              success2 = true;
801
 
            } finally {
802
 
              if (!success2) {
803
 
                abort();
804
 
                return false;
805
 
              }
806
 
            }
807
 
 
808
 
            // Immediately mark this document as deleted
809
 
            // since likely it was partially added.  This
810
 
            // keeps indexing as "all or none" (atomic) when
811
 
            // adding a document:
812
 
            deleteDocID(state.docState.docID);
813
 
          }
814
 
        }
815
 
      }
816
 
    }
817
 
 
818
 
    doFlush |= flushControl.flushByRAMUsage("new document");
819
 
 
820
 
    return doFlush;
821
 
  }
822
 
 
823
 
  boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
824
 
    throws CorruptIndexException, IOException {
825
 
 
826
 
    // Possibly trigger a flush, or wait until any running flush completes:
827
 
    boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);
828
 
 
829
 
    final int docCount = docs.size();
830
 
 
831
 
    // This call is synchronized but fast -- we allocate the
832
 
    // N docIDs up front:
833
 
    final DocumentsWriterThreadState state = getThreadState(null, docCount);
834
 
    final DocState docState = state.docState;
835
 
 
836
 
    final int startDocID = docState.docID;
837
 
    int docID = startDocID;
838
 
 
839
 
    //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
840
 
    for(Document doc : docs) {
841
 
      docState.doc = doc;
842
 
      docState.analyzer = analyzer;
843
 
      // Assign next docID from our block:
844
 
      docState.docID = docID++;
845
 
      
846
 
      boolean success = false;
847
 
      try {
848
 
        // This call is not synchronized and does all the
849
 
        // work
850
 
        final DocWriter perDoc;
851
 
        try {
852
 
          perDoc = state.consumer.processDocument();
853
 
        } finally {
854
 
          docState.clear();
855
 
        }
856
 
 
857
 
        // Must call this w/o holding synchronized(this) else
858
 
        // we'll hit deadlock:
859
 
        balanceRAM();
860
 
 
861
 
        // Synchronized but fast
862
 
        synchronized(this) {
863
 
          if (aborting) {
864
 
            break;
865
 
          }
866
 
          assert perDoc == null || perDoc.docID == docState.docID;
867
 
          final boolean doPause;
868
 
          if (perDoc != null) {
869
 
            waitQueue.add(perDoc);
870
 
          } else {
871
 
            skipDocWriter.docID = docState.docID;
872
 
            waitQueue.add(skipDocWriter);
873
 
          }
874
 
        }
875
 
 
876
 
        success = true;
877
 
      } finally {
878
 
        if (!success) {
879
 
          //System.out.println(Thread.currentThread().getName() + ": E");
880
 
 
881
 
          // If this thread state had decided to flush, we
882
 
          // must clear it so another thread can flush
883
 
          if (doFlush) {
884
 
            message("clearFlushPending!");
885
 
            flushControl.clearFlushPending();
886
 
          }
887
 
 
888
 
          if (infoStream != null) {
889
 
            message("exception in updateDocuments aborting=" + aborting);
890
 
          }
891
 
 
892
 
          synchronized(this) {
893
 
 
894
 
            state.isIdle = true;
895
 
            notifyAll();
896
 
 
897
 
            if (aborting) {
898
 
              abort();
899
 
            } else {
900
 
 
901
 
              // Fill hole in the doc stores for all
902
 
              // docIDs we pre-allocated
903
 
              //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
904
 
              final int endDocID = startDocID + docCount;
905
 
              docID = docState.docID;
906
 
              while(docID < endDocID) {
907
 
                skipDocWriter.docID = docID++;
908
 
                boolean success2 = false;
909
 
                try {
910
 
                  waitQueue.add(skipDocWriter);
911
 
                  success2 = true;
912
 
                } finally {
913
 
                  if (!success2) {
914
 
                    abort();
915
 
                    return false;
916
 
                  }
917
 
                }
918
 
              }
919
 
              //System.out.println(Thread.currentThread().getName() + ":   F " + docCount + " done");
920
 
 
921
 
              // Mark all pre-allocated docIDs as deleted:
922
 
              docID = startDocID;
923
 
              while(docID < startDocID + docs.size()) {
924
 
                deleteDocID(docID++);
925
 
              }
926
 
            }
927
 
          }
928
 
        }
929
 
      }
930
 
    }
931
 
 
932
 
    synchronized(this) {
933
 
      // We must delay pausing until the full doc block is
934
 
      // added, else we can hit deadlock if more than one
935
 
      // thread is adding a block and we need to pause when
936
 
      // both are only part way done:
937
 
      if (waitQueue.doPause()) {
938
 
        waitForWaitQueue();
939
 
      }
940
 
 
941
 
      //System.out.println(Thread.currentThread().getName() + ":   A " + docCount);
942
 
 
943
 
      if (aborting) {
944
 
 
945
 
        // We are currently aborting, and another thread is
946
 
        // waiting for me to become idle.  We just forcefully
947
 
        // idle this threadState; it will be fully reset by
948
 
        // abort()
949
 
        state.isIdle = true;
950
 
 
951
 
        // wakes up any threads waiting on the wait queue
952
 
        notifyAll();
953
 
 
954
 
        abort();
955
 
 
956
 
        if (doFlush) {
957
 
          message("clearFlushPending!");
958
 
          flushControl.clearFlushPending();
959
 
        }
960
 
 
961
 
        return false;
962
 
      }
963
 
 
964
 
      // Apply delTerm only after all indexing has
965
 
      // succeeded, but apply it only to docs prior to when
966
 
      // this batch started:
967
 
      if (delTerm != null) {
968
 
        pendingDeletes.addTerm(delTerm, startDocID);
969
 
      }
970
 
 
971
 
      state.isIdle = true;
972
 
 
973
 
      // wakes up any threads waiting on the wait queue
974
 
      notifyAll();
975
 
    }
976
 
 
977
 
    doFlush |= flushControl.flushByRAMUsage("new document");
978
 
 
979
 
    //System.out.println(Thread.currentThread().getName() + ":   B " + docCount);
980
 
    return doFlush;
981
 
  }
982
 
 
983
 
  public synchronized void waitIdle() {
984
 
    while (!allThreadsIdle()) {
985
 
      try {
986
 
        wait();
987
 
      } catch (InterruptedException ie) {
988
 
        throw new ThreadInterruptedException(ie);
989
 
      }
990
 
    }
991
 
  }
992
 
 
993
 
  synchronized void waitReady(DocumentsWriterThreadState state) {
994
 
    while (!closed && (!state.isIdle || aborting)) {
995
 
      try {
996
 
        wait();
997
 
      } catch (InterruptedException ie) {
998
 
        throw new ThreadInterruptedException(ie);
999
 
      }
1000
 
    }
1001
 
 
1002
 
    if (closed) {
1003
 
      throw new AlreadyClosedException("this IndexWriter is closed");
1004
 
    }
1005
 
  }
1006
 
 
1007
 
  /** Does the synchronized work to finish/flush the
1008
 
   *  inverted document. */
1009
 
  private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
1010
 
 
1011
 
    // Must call this w/o holding synchronized(this) else
1012
 
    // we'll hit deadlock:
1013
 
    balanceRAM();
1014
 
 
1015
 
    synchronized(this) {
1016
 
 
1017
 
      assert docWriter == null || docWriter.docID == perThread.docState.docID;
1018
 
 
1019
 
      if (aborting) {
1020
 
 
1021
 
        // We are currently aborting, and another thread is
1022
 
        // waiting for me to become idle.  We just forcefully
1023
 
        // idle this threadState; it will be fully reset by
1024
 
        // abort()
1025
 
        if (docWriter != null) {
1026
 
          try {
1027
 
            docWriter.abort();
1028
 
          } catch (Throwable t) {
1029
 
          }
1030
 
        }
1031
 
 
1032
 
        perThread.isIdle = true;
1033
 
 
1034
 
        // wakes up any threads waiting on the wait queue
1035
 
        notifyAll();
1036
 
 
1037
 
        return;
1038
 
      }
1039
 
 
1040
 
      final boolean doPause;
1041
 
 
1042
 
      if (docWriter != null) {
1043
 
        doPause = waitQueue.add(docWriter);
1044
 
      } else {
1045
 
        skipDocWriter.docID = perThread.docState.docID;
1046
 
        doPause = waitQueue.add(skipDocWriter);
1047
 
      }
1048
 
 
1049
 
      if (doPause) {
1050
 
        waitForWaitQueue();
1051
 
      }
1052
 
 
1053
 
      perThread.isIdle = true;
1054
 
 
1055
 
      // wakes up any threads waiting on the wait queue
1056
 
      notifyAll();
1057
 
    }
1058
 
  }
1059
 
 
1060
 
  synchronized void waitForWaitQueue() {
1061
 
    do {
1062
 
      try {
1063
 
        wait();
1064
 
      } catch (InterruptedException ie) {
1065
 
        throw new ThreadInterruptedException(ie);
1066
 
      }
1067
 
    } while (!waitQueue.doResume());
1068
 
  }
1069
 
 
1070
 
  private static class SkipDocWriter extends DocWriter {
1071
 
    @Override
1072
 
    void finish() {
1073
 
    }
1074
 
    @Override
1075
 
    void abort() {
1076
 
    }
1077
 
    @Override
1078
 
    long sizeInBytes() {
1079
 
      return 0;
1080
 
    }
1081
 
  }
1082
 
  final SkipDocWriter skipDocWriter = new SkipDocWriter();
1083
 
 
1084
 
  NumberFormat nf = NumberFormat.getInstance();
1085
 
 
1086
 
  /* Initial chunks size of the shared byte[] blocks used to
1087
 
     store postings data */
1088
 
  final static int BYTE_BLOCK_SHIFT = 15;
1089
 
  final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
1090
 
  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
1091
 
  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
1092
 
 
1093
 
  private class ByteBlockAllocator extends ByteBlockPool.Allocator {
1094
 
    final int blockSize;
1095
 
 
1096
 
    ByteBlockAllocator(int blockSize) {
1097
 
      this.blockSize = blockSize;
1098
 
    }
1099
 
 
1100
 
    ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
1101
 
    
1102
 
    /* Allocate another byte[] from the shared pool */
1103
 
    @Override
1104
 
    byte[] getByteBlock() {
1105
 
      synchronized(DocumentsWriter.this) {
1106
 
        final int size = freeByteBlocks.size();
1107
 
        final byte[] b;
1108
 
        if (0 == size) {
1109
 
          b = new byte[blockSize];
1110
 
          bytesUsed.addAndGet(blockSize);
1111
 
        } else
1112
 
          b = freeByteBlocks.remove(size-1);
1113
 
        return b;
1114
 
      }
1115
 
    }
1116
 
 
1117
 
    /* Return byte[]'s to the pool */
1118
 
 
1119
 
    @Override
1120
 
    void recycleByteBlocks(byte[][] blocks, int start, int end) {
1121
 
      synchronized(DocumentsWriter.this) {
1122
 
        for(int i=start;i<end;i++) {
1123
 
          freeByteBlocks.add(blocks[i]);
1124
 
          blocks[i] = null;
1125
 
        }
1126
 
      }
1127
 
    }
1128
 
 
1129
 
    @Override
1130
 
    void recycleByteBlocks(List<byte[]> blocks) {
1131
 
      synchronized(DocumentsWriter.this) {
1132
 
        final int size = blocks.size();
1133
 
        for(int i=0;i<size;i++) {
1134
 
          freeByteBlocks.add(blocks.get(i));
1135
 
          blocks.set(i, null);
1136
 
        }
1137
 
      }
1138
 
    }
1139
 
  }
1140
 
 
1141
 
  /* Initial chunks size of the shared int[] blocks used to
1142
 
     store postings data */
1143
 
  final static int INT_BLOCK_SHIFT = 13;
1144
 
  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
1145
 
  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
1146
 
 
1147
 
  private List<int[]> freeIntBlocks = new ArrayList<int[]>();
1148
 
 
1149
 
  /* Allocate another int[] from the shared pool */
1150
 
  synchronized int[] getIntBlock() {
1151
 
    final int size = freeIntBlocks.size();
1152
 
    final int[] b;
1153
 
    if (0 == size) {
1154
 
      b = new int[INT_BLOCK_SIZE];
1155
 
      bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
1156
 
    } else {
1157
 
      b = freeIntBlocks.remove(size-1);
1158
 
    }
1159
 
    return b;
1160
 
  }
1161
 
 
1162
 
  synchronized void bytesUsed(long numBytes) {
1163
 
    bytesUsed.addAndGet(numBytes);
1164
 
  }
1165
 
 
1166
 
  long bytesUsed() {
1167
 
    return bytesUsed.get() + pendingDeletes.bytesUsed.get();
1168
 
  }
1169
 
 
1170
 
  /* Return int[]s to the pool */
1171
 
  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
1172
 
    for(int i=start;i<end;i++) {
1173
 
      freeIntBlocks.add(blocks[i]);
1174
 
      blocks[i] = null;
1175
 
    }
1176
 
  }
1177
 
 
1178
 
  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
1179
 
 
1180
 
  final static int PER_DOC_BLOCK_SIZE = 1024;
1181
 
 
1182
 
  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
1183
 
 
1184
 
 
1185
 
  /* Initial chunk size of the shared char[] blocks used to
1186
 
     store term text */
1187
 
  final static int CHAR_BLOCK_SHIFT = 14;
1188
 
  final static int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
1189
 
  final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
1190
 
 
1191
 
  final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
1192
 
 
1193
 
  private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();
1194
 
 
1195
 
  /* Allocate another char[] from the shared pool */
1196
 
  synchronized char[] getCharBlock() {
1197
 
    final int size = freeCharBlocks.size();
1198
 
    final char[] c;
1199
 
    if (0 == size) {
1200
 
      bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
1201
 
      c = new char[CHAR_BLOCK_SIZE];
1202
 
    } else
1203
 
      c = freeCharBlocks.remove(size-1);
1204
 
    // We always track allocations of char blocks, for now,
1205
 
    // because nothing that skips allocation tracking
1206
 
    // (currently only term vectors) uses its own char
1207
 
    // blocks.
1208
 
    return c;
1209
 
  }
1210
 
 
1211
 
  /* Return char[]s to the pool */
1212
 
  synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
1213
 
    for(int i=0;i<numBlocks;i++) {
1214
 
      freeCharBlocks.add(blocks[i]);
1215
 
      blocks[i] = null;
1216
 
    }
1217
 
  }
1218
 
 
1219
 
  String toMB(long v) {
1220
 
    return nf.format(v/1024./1024.);
1221
 
  }
1222
 
 
1223
 
  /* We have four pools of RAM: Postings, byte blocks
1224
 
   * (holds freq/prox posting data), char blocks (holds
1225
 
   * characters in the term) and per-doc buffers (stored fields/term vectors).  
1226
 
   * Different docs require varying amount of storage from 
1227
 
   * these four classes.
1228
 
   * 
1229
 
   * For example, docs with many unique single-occurrence
1230
 
   * short terms will use up the Postings RAM and hardly any
1231
 
   * of the other two.  Whereas docs with very large terms
1232
 
   * will use alot of char blocks RAM and relatively less of
1233
 
   * the other two.  This method just frees allocations from
1234
 
   * the pools once we are over-budget, which balances the
1235
 
   * pools to match the current docs. */
1236
 
  void balanceRAM() {
1237
 
 
1238
 
    final boolean doBalance;
1239
 
    final long deletesRAMUsed;
1240
 
 
1241
 
    deletesRAMUsed = bufferedDeletesStream.bytesUsed();
1242
 
 
1243
 
    final long ramBufferSize;
1244
 
    final double mb = config.getRAMBufferSizeMB();
1245
 
    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1246
 
      ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
1247
 
    } else {
1248
 
      ramBufferSize = (long) (mb*1024*1024);
1249
 
    }
1250
 
 
1251
 
    synchronized(this) {
1252
 
      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
1253
 
        return;
1254
 
      }
1255
 
    
1256
 
      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
1257
 
    }
1258
 
 
1259
 
    if (doBalance) {
1260
 
 
1261
 
      if (infoStream != null) {
1262
 
        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
1263
 
                " vs trigger=" + toMB(ramBufferSize) +
1264
 
                " deletesMB=" + toMB(deletesRAMUsed) +
1265
 
                " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
1266
 
                " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
1267
 
                " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_CHAR));
1268
 
      }
1269
 
 
1270
 
      final long startBytesUsed = bytesUsed() + deletesRAMUsed;
1271
 
 
1272
 
      int iter = 0;
1273
 
 
1274
 
      // We free equally from each pool in 32 KB
1275
 
      // chunks until we are below our threshold
1276
 
      // (freeLevel)
1277
 
 
1278
 
      boolean any = true;
1279
 
 
1280
 
      final long freeLevel = (long) (0.95 * ramBufferSize);
1281
 
 
1282
 
      while(bytesUsed()+deletesRAMUsed > freeLevel) {
1283
 
      
1284
 
        synchronized(this) {
1285
 
          if (0 == perDocAllocator.freeByteBlocks.size() 
1286
 
              && 0 == byteBlockAllocator.freeByteBlocks.size() 
1287
 
              && 0 == freeCharBlocks.size() 
1288
 
              && 0 == freeIntBlocks.size() 
1289
 
              && !any) {
1290
 
            // Nothing else to free -- must flush now.
1291
 
            bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
1292
 
            if (infoStream != null) {
1293
 
              if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
1294
 
                message("    nothing to free; set bufferIsFull");
1295
 
              } else {
1296
 
                message("    nothing to free");
1297
 
              }
1298
 
            }
1299
 
            break;
1300
 
          }
1301
 
 
1302
 
          if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
1303
 
            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
1304
 
            bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
1305
 
          }
1306
 
 
1307
 
          if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
1308
 
            freeCharBlocks.remove(freeCharBlocks.size()-1);
1309
 
            bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
1310
 
          }
1311
 
 
1312
 
          if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
1313
 
            freeIntBlocks.remove(freeIntBlocks.size()-1);
1314
 
            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
1315
 
          }
1316
 
 
1317
 
          if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
1318
 
            // Remove upwards of 32 blocks (each block is 1K)
1319
 
            for (int i = 0; i < 32; ++i) {
1320
 
              perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
1321
 
              bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
1322
 
              if (perDocAllocator.freeByteBlocks.size() == 0) {
1323
 
                break;
1324
 
              }
1325
 
            }
1326
 
          }
1327
 
        }
1328
 
 
1329
 
        if ((4 == iter % 5) && any) {
1330
 
          // Ask consumer to free any recycled state
1331
 
          any = consumer.freeRAM();
1332
 
        }
1333
 
 
1334
 
        iter++;
1335
 
      }
1336
 
 
1337
 
      if (infoStream != null) {
1338
 
        message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
1339
 
      }
1340
 
    }
1341
 
  }
1342
 
 
1343
 
  final WaitQueue waitQueue = new WaitQueue();
1344
 
 
1345
 
  private class WaitQueue {
1346
 
    DocWriter[] waiting;
1347
 
    int nextWriteDocID;
1348
 
    int nextWriteLoc;
1349
 
    int numWaiting;
1350
 
    long waitingBytes;
1351
 
 
1352
 
    public WaitQueue() {
1353
 
      waiting = new DocWriter[10];
1354
 
    }
1355
 
 
1356
 
    synchronized void reset() {
1357
 
      // NOTE: nextWriteLoc doesn't need to be reset
1358
 
      assert numWaiting == 0;
1359
 
      assert waitingBytes == 0;
1360
 
      nextWriteDocID = 0;
1361
 
    }
1362
 
 
1363
 
    synchronized boolean doResume() {
1364
 
      final double mb = config.getRAMBufferSizeMB();
1365
 
      final long waitQueueResumeBytes;
1366
 
      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1367
 
        waitQueueResumeBytes = 2*1024*1024;
1368
 
      } else {
1369
 
        waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
1370
 
      }
1371
 
      return waitingBytes <= waitQueueResumeBytes;
1372
 
    }
1373
 
 
1374
 
    synchronized boolean doPause() {
1375
 
      final double mb = config.getRAMBufferSizeMB();
1376
 
      final long waitQueuePauseBytes;
1377
 
      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1378
 
        waitQueuePauseBytes = 4*1024*1024;
1379
 
      } else {
1380
 
        waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
1381
 
      }
1382
 
      return waitingBytes > waitQueuePauseBytes;
1383
 
    }
1384
 
 
1385
 
    synchronized void abort() {
1386
 
      int count = 0;
1387
 
      for(int i=0;i<waiting.length;i++) {
1388
 
        final DocWriter doc = waiting[i];
1389
 
        if (doc != null) {
1390
 
          doc.abort();
1391
 
          waiting[i] = null;
1392
 
          count++;
1393
 
        }
1394
 
      }
1395
 
      waitingBytes = 0;
1396
 
      assert count == numWaiting;
1397
 
      numWaiting = 0;
1398
 
    }
1399
 
 
1400
 
    private void writeDocument(DocWriter doc) throws IOException {
1401
 
      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
1402
 
      boolean success = false;
1403
 
      try {
1404
 
        doc.finish();
1405
 
        nextWriteDocID++;
1406
 
        nextWriteLoc++;
1407
 
        assert nextWriteLoc <= waiting.length;
1408
 
        if (nextWriteLoc == waiting.length) {
1409
 
          nextWriteLoc = 0;
1410
 
        }
1411
 
        success = true;
1412
 
      } finally {
1413
 
        if (!success) {
1414
 
          setAborting();
1415
 
        }
1416
 
      }
1417
 
    }
1418
 
 
1419
 
    synchronized public boolean add(DocWriter doc) throws IOException {
1420
 
 
1421
 
      assert doc.docID >= nextWriteDocID;
1422
 
 
1423
 
      if (doc.docID == nextWriteDocID) {
1424
 
        writeDocument(doc);
1425
 
        while(true) {
1426
 
          doc = waiting[nextWriteLoc];
1427
 
          if (doc != null) {
1428
 
            numWaiting--;
1429
 
            waiting[nextWriteLoc] = null;
1430
 
            waitingBytes -= doc.sizeInBytes();
1431
 
            writeDocument(doc);
1432
 
          } else {
1433
 
            break;
1434
 
          }
1435
 
        }
1436
 
      } else {
1437
 
 
1438
 
        // I finished before documents that were added
1439
 
        // before me.  This can easily happen when I am a
1440
 
        // small doc and the docs before me were large, or,
1441
 
        // just due to luck in the thread scheduling.  Just
1442
 
        // add myself to the queue and when that large doc
1443
 
        // finishes, it will flush me:
1444
 
        int gap = doc.docID - nextWriteDocID;
1445
 
        if (gap >= waiting.length) {
1446
 
          // Grow queue
1447
 
          DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
1448
 
          assert nextWriteLoc >= 0;
1449
 
          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
1450
 
          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
1451
 
          nextWriteLoc = 0;
1452
 
          waiting = newArray;
1453
 
          gap = doc.docID - nextWriteDocID;
1454
 
        }
1455
 
 
1456
 
        int loc = nextWriteLoc + gap;
1457
 
        if (loc >= waiting.length) {
1458
 
          loc -= waiting.length;
1459
 
        }
1460
 
 
1461
 
        // We should only wrap one time
1462
 
        assert loc < waiting.length;
1463
 
 
1464
 
        // Nobody should be in my spot!
1465
 
        assert waiting[loc] == null;
1466
 
        waiting[loc] = doc;
1467
 
        numWaiting++;
1468
 
        waitingBytes += doc.sizeInBytes();
1469
 
      }
1470
 
      
1471
 
      return doPause();
1472
 
    }
1473
 
  }
1474
 
}