1
package org.apache.lucene.index;
4
* Licensed to the Apache Software Foundation (ASF) under one or more
5
* contributor license agreements. See the NOTICE file distributed with
6
* this work for additional information regarding copyright ownership.
7
* The ASF licenses this file to You under the Apache License, Version 2.0
8
* (the "License"); you may not use this file except in compliance with
9
* the License. You may obtain a copy of the License at
11
* http://www.apache.org/licenses/LICENSE-2.0
13
* Unless required by applicable law or agreed to in writing, software
14
* distributed under the License is distributed on an "AS IS" BASIS,
15
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
* See the License for the specific language governing permissions and
17
* limitations under the License.
20
import java.io.IOException;
21
import java.util.concurrent.CountDownLatch;
23
import org.apache.lucene.analysis.MockAnalyzer;
24
import org.apache.lucene.document.Document;
25
import org.apache.lucene.document.Field;
26
import org.apache.lucene.store.AlreadyClosedException;
27
import org.apache.lucene.store.Directory;
28
import org.apache.lucene.store.MockDirectoryWrapper;
29
import org.apache.lucene.util.LuceneTestCase;
30
import org.apache.lucene.util.ThreadInterruptedException;
33
* MultiThreaded IndexWriter tests
35
public class TestIndexWriterWithThreads extends LuceneTestCase {
37
// Used by test cases below
38
// Worker thread used by the multi-threaded tests in this file. Until a 200 ms
// deadline passes it keeps calling writer.updateDocument(...) with an "id" term
// whose value increments on every call.
// NOTE(review): several lines of this class are not visible in this view; only
// the statements that are visible are described here.
private class IndexerThread extends Thread {
42
// Holds an AlreadyClosedException; where it is assigned is not visible here.
AlreadyClosedException ace;
45
// Read from the test thread in testCloseWithThreads (addCount > 0), hence volatile.
volatile int addCount;
47
// noErrors is stored on the instance; the writer argument is presumably stored
// too (that assignment is not visible here -- TODO confirm).
public IndexerThread(IndexWriter writer, boolean noErrors) {
49
this.noErrors = noErrors;
55
// A single document, stored + analyzed with term vectors (positions and offsets),
// is built once and reused for every updateDocument call.
final Document doc = new Document();
56
doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
60
// Deadline for the do/while loop below: 200 ms from now.
final long stopTime = System.currentTimeMillis() + 200;
64
// Each call uses a fresh id ("0", "1", ...).
writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
66
} catch (IOException ioe) {
68
System.out.println("TEST: expected exc:");
69
ioe.printStackTrace(System.out);
71
//System.out.println(Thread.currentThread().getName() + ": hit exc");
72
//ioe.printStackTrace(System.out);
73
// These two messages are singled out for special handling: "now failing on
// purpose" is the message thrown by the Failure classes in this file;
// "fake disk full at" presumably comes from MockDirectoryWrapper -- TODO confirm.
if (ioe.getMessage().startsWith("fake disk full at") ||
74
ioe.getMessage().equals("now failing on purpose")) {
78
} catch (InterruptedException ie) {
79
// Rethrown as Lucene's unchecked ThreadInterruptedException.
throw new ThreadInterruptedException(ie);
85
System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
86
ioe.printStackTrace(System.out);
91
} catch (Throwable t) {
92
//t.printStackTrace(System.out);
94
System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
95
t.printStackTrace(System.out);
100
// Keep indexing until the 200 ms deadline has passed.
} while(System.currentTimeMillis() < stopTime);
104
// LUCENE-1130: make sure immediate disk full on creating
105
// an IndexWriter (hit during DW.ThreadState.init()), with
106
// multiple threads, is OK:
107
// Runs 10 iterations; each one creates a writer over a MockDirectoryWrapper whose
// size cap is 4 KB plus 20 bytes per iteration, runs NUM_THREADS IndexerThreads
// against it, and asserts that no thread recorded an unexpected Throwable.
// NOTE(review): several lines of this method are not visible in this view.
public void testImmediateDiskFullWithThreads() throws Exception {
111
for(int iter=0;iter<10;iter++) {
113
System.out.println("\nTEST: iter=" + iter);
115
MockDirectoryWrapper dir = newDirectory();
116
// Flush after every 2 buffered docs, merge with ConcurrentMergeScheduler,
// log merge policy created via newLogMergePolicy(4).
IndexWriter writer = new IndexWriter(
118
newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
119
setMaxBufferedDocs(2).
120
setMergeScheduler(new ConcurrentMergeScheduler()).
121
setMergePolicy(newLogMergePolicy(4))
123
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
124
// Directory size cap grows by 20 bytes per iteration, starting at 4096 bytes.
dir.setMaxSizeInBytes(4*1024+20*iter);
125
writer.setInfoStream(VERBOSE ? System.out : null);
127
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
129
// Threads are created with noErrors=true.
for(int i=0;i<NUM_THREADS;i++)
130
threads[i] = new IndexerThread(writer, true);
132
for(int i=0;i<NUM_THREADS;i++)
135
for(int i=0;i<NUM_THREADS;i++) {
136
// Without fix for LUCENE-1130: one of the
139
assertTrue("hit unexpected Throwable", threads[i].error == null);
142
// Make sure once disk space is avail again, we can
144
// presumably 0 removes the size cap -- TODO confirm against MockDirectoryWrapper
dir.setMaxSizeInBytes(0);
151
// LUCENE-1130: make sure we can close() even while
152
// threads are trying to add documents. Strictly
153
// speaking, this isn't valid use of Lucene's APIs, but we
154
// still want to be robust to this case:
155
// Runs 7 iterations; each one starts NUM_THREADS IndexerThreads (noErrors=false)
// against a fresh writer, waits until at least one thread reports addCount > 0,
// then checks that no thread is still alive and that term "aaa" in "field" is
// found in the resulting index.
// NOTE(review): several lines of this method are not visible in this view
// (including the call that closes the writer).
public void testCloseWithThreads() throws Exception {
158
for(int iter=0;iter<7;iter++) {
159
Directory dir = newDirectory();
160
// Flush after every 10 buffered docs, ConcurrentMergeScheduler, newLogMergePolicy(4).
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
161
.setMaxBufferedDocs(10).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy(4));
162
// We expect AlreadyClosedException
163
((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
164
IndexWriter writer = new IndexWriter(dir, conf);
166
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
168
for(int i=0;i<NUM_THREADS;i++)
169
threads[i] = new IndexerThread(writer, false);
171
for(int i=0;i<NUM_THREADS;i++)
174
boolean done = false;
177
for(int i=0;i<NUM_THREADS;i++)
178
// only stop when at least one thread has added a doc
179
if (threads[i].addCount > 0) {
182
// A thread that already exited with addCount still 0 never indexed anything.
} else if (!threads[i].isAlive()) {
183
fail("thread failed before indexing a single document");
189
// Make sure threads that are adding docs are not hung:
190
for(int i=0;i<NUM_THREADS;i++) {
191
// Without fix for LUCENE-1130: one of the
194
if (threads[i].isAlive())
195
fail("thread seems to be hung");
198
// Quick test to make sure index is not corrupt:
199
IndexReader reader = IndexReader.open(dir, true);
200
TermDocs tdocs = reader.termDocs(new Term("field", "aaa"));
202
while(tdocs.next()) {
205
// count is presumably incremented once per TermDocs hit in the loop above
// (that line is not visible here -- TODO confirm).
assertTrue(count > 0);
212
// Runs test, with multiple threads, using the specific
213
// failure to trigger an IOException
214
// Shared driver for the *WithThreads tests: for 2 iterations, runs NUM_THREADS
// IndexerThreads (noErrors=true) against a fresh writer, asserts no thread
// recorded an unexpected Throwable, then opens a reader and loads the term
// vectors of every non-deleted document.
// NOTE(review): several lines of this method are not visible in this view
// (including where `failure` is installed on the directory).
public void _testMultipleThreadsFailure(MockDirectoryWrapper.Failure failure) throws Exception {
218
for(int iter=0;iter<2;iter++) {
220
System.out.println("TEST: iter=" + iter);
222
MockDirectoryWrapper dir = newDirectory();
223
// Flush after every 2 buffered docs, ConcurrentMergeScheduler, newLogMergePolicy(4).
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT,
224
new MockAnalyzer(random)).setMaxBufferedDocs(2)
225
.setMergeScheduler(new ConcurrentMergeScheduler())
226
.setMergePolicy(newLogMergePolicy(4));
227
// We expect disk full exceptions in the merge threads
228
((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
229
IndexWriter writer = new IndexWriter(dir, conf);
230
writer.setInfoStream(VERBOSE ? System.out : null);
232
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
234
for(int i=0;i<NUM_THREADS;i++)
235
threads[i] = new IndexerThread(writer, true);
237
for(int i=0;i<NUM_THREADS;i++)
245
for(int i=0;i<NUM_THREADS;i++) {
247
assertTrue("hit unexpected Throwable", threads[i].error == null);
250
boolean success = false;
254
} catch (IOException ioe) {
255
// After an IOException the injected failure is switched off.
failure.clearDoFail();
260
IndexReader reader = IndexReader.open(dir, true);
261
// Touch the term vectors of every live document.
for(int j=0;j<reader.maxDoc();j++) {
262
if (!reader.isDeleted(j)) {
264
reader.getTermFreqVectors(j);
274
// Runs test, with one thread, using the specific failure
275
// to trigger an IOException
276
// Shared driver for the single-threaded failure tests: adds documents to a
// writer, expects an IOException (fails with "did not hit exception" otherwise),
// then clears the failure and adds one more document.
// NOTE(review): several lines of this method are not visible in this view
// (including where `failure` is installed and armed).
public void _testSingleThreadFailure(MockDirectoryWrapper.Failure failure) throws IOException {
277
MockDirectoryWrapper dir = newDirectory();
279
// Flush after every 2 buffered docs; merges run on a ConcurrentMergeScheduler.
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random))
280
.setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()));
281
final Document doc = new Document();
282
doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
285
writer.addDocument(doc);
290
writer.addDocument(doc);
291
writer.addDocument(doc);
293
// Reaching this line means no IOException was thrown by the preceding calls.
fail("did not hit exception");
294
} catch (IOException ioe) {
296
// Switch the injected failure off, then add another document.
failure.clearDoFail();
297
writer.addDocument(doc);
302
// Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
303
// MockDirectoryWrapper.Failure whose eval() inspects the current call stack and
// throws IOException("now failing on purpose") when a method named "abort" or
// "flushDocument" is on the stack and sawClose is false.
// NOTE(review): several lines of this class are not visible in this view,
// including the lines that read onlyOnce and the one that sets sawClose.
private static class FailOnlyOnAbortOrFlush extends MockDirectoryWrapper.Failure {
304
// presumably limits the failure to a single firing -- the lines that read this
// flag are not visible here; TODO confirm.
private boolean onlyOnce;
305
public FailOnlyOnAbortOrFlush(boolean onlyOnce) {
306
this.onlyOnce = onlyOnce;
309
public void eval(MockDirectoryWrapper dir) throws IOException {
311
// A throwaway Exception is created only to capture the current stack trace.
StackTraceElement[] trace = new Exception().getStackTrace();
312
boolean sawAbortOrFlushDoc = false;
313
boolean sawClose = false;
314
for (int i = 0; i < trace.length; i++) {
315
// Matches on method name only; the declaring class is not checked.
if ("abort".equals(trace[i].getMethodName()) ||
316
"flushDocument".equals(trace[i].getMethodName())) {
317
sawAbortOrFlushDoc = true;
319
if ("close".equals(trace[i].getMethodName())) {
323
if (sawAbortOrFlushDoc && !sawClose) {
326
//System.out.println(Thread.currentThread().getName() + ": now fail");
327
//new Throwable().printStackTrace(System.out);
328
throw new IOException("now failing on purpose");
336
// LUCENE-1130: make sure initial IOException, and then 2nd
337
// IOException during rollback(), is OK:
338
// Single-threaded run of _testSingleThreadFailure with FailOnlyOnAbortOrFlush(onlyOnce=false).
public void testIOExceptionDuringAbort() throws IOException {
339
_testSingleThreadFailure(new FailOnlyOnAbortOrFlush(false));
342
// LUCENE-1130: make sure initial IOException, and then 2nd
343
// IOException during rollback(), is OK:
344
// Single-threaded run of _testSingleThreadFailure with FailOnlyOnAbortOrFlush(onlyOnce=true).
public void testIOExceptionDuringAbortOnlyOnce() throws IOException {
345
_testSingleThreadFailure(new FailOnlyOnAbortOrFlush(true));
348
// LUCENE-1130: make sure initial IOException, and then 2nd
349
// IOException during rollback(), with multiple threads, is OK:
350
// Multi-threaded run of _testMultipleThreadsFailure with FailOnlyOnAbortOrFlush(onlyOnce=false).
public void testIOExceptionDuringAbortWithThreads() throws Exception {
351
_testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(false));
354
// LUCENE-1130: make sure initial IOException, and then 2nd
355
// IOException during rollback(), with multiple threads, is OK:
356
// Multi-threaded run of _testMultipleThreadsFailure with FailOnlyOnAbortOrFlush(onlyOnce=true).
public void testIOExceptionDuringAbortWithThreadsOnlyOnce() throws Exception {
357
_testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(true));
360
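// Throws IOException during segment writing: eval() below fires when
// org.apache.lucene.index.DocFieldProcessor.flush is on the call stack
// (this comment previously named DocumentsWriter.writeSegment)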
361
// MockDirectoryWrapper.Failure whose eval() throws
// IOException("now failing on purpose") when the current call stack contains
// org.apache.lucene.index.DocFieldProcessor.flush.
// NOTE(review): several lines of this class are not visible in this view,
// including the lines that read onlyOnce.
private static class FailOnlyInWriteSegment extends MockDirectoryWrapper.Failure {
362
// presumably limits the failure to a single firing -- the lines that read this
// flag are not visible here; TODO confirm.
private boolean onlyOnce;
363
public FailOnlyInWriteSegment(boolean onlyOnce) {
364
this.onlyOnce = onlyOnce;
367
public void eval(MockDirectoryWrapper dir) throws IOException {
369
// A throwaway Exception is created only to capture the current stack trace.
StackTraceElement[] trace = new Exception().getStackTrace();
370
for (int i = 0; i < trace.length; i++) {
371
// Both the method name and the fully qualified class name must match.
if ("flush".equals(trace[i].getMethodName()) && "org.apache.lucene.index.DocFieldProcessor".equals(trace[i].getClassName())) {
374
throw new IOException("now failing on purpose");
381
// LUCENE-1130: test IOException in writeSegment
382
// Single-threaded run of _testSingleThreadFailure with FailOnlyInWriteSegment(onlyOnce=false).
public void testIOExceptionDuringWriteSegment() throws IOException {
383
_testSingleThreadFailure(new FailOnlyInWriteSegment(false));
386
// LUCENE-1130: test IOException in writeSegment
387
// Single-threaded run of _testSingleThreadFailure with FailOnlyInWriteSegment(onlyOnce=true).
public void testIOExceptionDuringWriteSegmentOnlyOnce() throws IOException {
388
_testSingleThreadFailure(new FailOnlyInWriteSegment(true));
391
// LUCENE-1130: test IOException in writeSegment, with threads
392
// Multi-threaded run of _testMultipleThreadsFailure with FailOnlyInWriteSegment(onlyOnce=false).
public void testIOExceptionDuringWriteSegmentWithThreads() throws Exception {
393
_testMultipleThreadsFailure(new FailOnlyInWriteSegment(false));
396
// LUCENE-1130: test IOException in writeSegment, with threads
397
// Multi-threaded run of _testMultipleThreadsFailure with FailOnlyInWriteSegment(onlyOnce=true).
public void testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce() throws Exception {
398
_testMultipleThreadsFailure(new FailOnlyInWriteSegment(true));
401
// LUCENE-3365: Test adding two documents with the same field from two different IndexWriters
402
// that we attempt to open at the same time. As long as the first IndexWriter completes
403
// and closes before the second IndexWriter times out trying to get the Lock,
404
// we should see both documents
405
// Creates two DelayedIndexAndCloseRunnable threads over the same directory,
// waits until one IndexWriter has been constructed, releases both threads to
// index, then asserts that neither failed and checks the reader against an
// expected value of 2.
// NOTE(review): several lines of this method are not visible in this view
// (including the calls that start and join the threads).
public void testOpenTwoIndexWritersOnDifferentThreads() throws IOException, InterruptedException {
406
final MockDirectoryWrapper dir = newDirectory();
407
// Shared latch with count 1: it opens as soon as either thread counts it down.
CountDownLatch oneIWConstructed = new CountDownLatch(1);
408
DelayedIndexAndCloseRunnable thread1 = new DelayedIndexAndCloseRunnable(
409
dir, oneIWConstructed);
410
DelayedIndexAndCloseRunnable thread2 = new DelayedIndexAndCloseRunnable(
411
dir, oneIWConstructed);
415
// Block until at least one thread has constructed its IndexWriter.
oneIWConstructed.await();
417
// Open each thread's private startIndexing latch.
thread1.startIndexing();
418
thread2.startIndexing();
423
assertFalse("Failed due to: " + thread1.failure, thread1.failed);
424
assertFalse("Failed due to: " + thread2.failure, thread2.failed);
425
// now verify that we have two documents in the index
426
IndexReader reader = IndexReader.open(dir, true);
427
assertEquals("IndexReader should have one document per thread running", 2,
434
// Thread that constructs an IndexWriter on the given directory, signals that
// through the shared iwConstructed latch, waits on its own startIndexing latch,
// and then adds one document. A Throwable is caught and the stored `failure` is
// printed.
// NOTE(review): several lines of this class are not visible in this view
// (including the run() header, the writer close, and where failed/failure are set).
static class DelayedIndexAndCloseRunnable extends Thread {
435
private final Directory dir;
436
// Read by testOpenTwoIndexWritersOnDifferentThreads to report a failure.
boolean failed = false;
437
Throwable failure = null;
438
// Per-thread gate, opened by startIndexing().
private final CountDownLatch startIndexing = new CountDownLatch(1);
439
// Latch supplied by the caller; counted down after the IndexWriter is constructed.
private CountDownLatch iwConstructed;
441
public DelayedIndexAndCloseRunnable(Directory dir,
442
CountDownLatch iwConstructed) {
444
this.iwConstructed = iwConstructed;
447
// Opens the gate that the indexing code waits on before calling addDocument.
public void startIndexing() {
448
this.startIndexing.countDown();
454
Document doc = new Document();
455
Field field = newField("field", "testData", Field.Store.YES,
456
Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
458
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
459
TEST_VERSION_CURRENT, new MockAnalyzer(random)));
460
// Signal that this thread's writer has been constructed ...
iwConstructed.countDown();
461
// ... then wait until startIndexing() is called before adding the document.
startIndexing.await();
462
writer.addDocument(doc);
464
} catch (Throwable e) {
467
failure.printStackTrace(System.out);