package org.apache.lucene.index;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;
22
import java.util.ArrayList;
23
import java.util.Collections;
24
import java.util.HashSet;
25
import java.util.List;
27
import java.util.concurrent.ExecutorService;
28
import java.util.concurrent.Executors;
29
import java.util.concurrent.TimeUnit;
30
import java.util.concurrent.atomic.AtomicBoolean;
31
import java.util.concurrent.atomic.AtomicInteger;
33
import org.apache.lucene.analysis.MockAnalyzer;
34
import org.apache.lucene.document.Document;
35
import org.apache.lucene.document.Field;
36
import org.apache.lucene.search.IndexSearcher;
37
import org.apache.lucene.search.PhraseQuery;
38
import org.apache.lucene.search.Query;
39
import org.apache.lucene.search.ScoreDoc;
40
import org.apache.lucene.search.Sort;
41
import org.apache.lucene.search.SortField;
42
import org.apache.lucene.search.TermQuery;
43
import org.apache.lucene.search.TopDocs;
44
import org.apache.lucene.store.MockDirectoryWrapper;
45
import org.apache.lucene.util.LineFileDocs;
46
import org.apache.lucene.util.LuceneTestCase;
47
import org.apache.lucene.util._TestUtil;
48
import org.junit.Test;
51
// - mix in optimize, addIndexes
52
// - randomoly mix in non-congruent docs
54
public class TestNRTThreads extends LuceneTestCase {
56
private static class SubDocs {
57
public final String packID;
58
public final List<String> subIDs;
59
public boolean deleted;
61
public SubDocs(String packID, List<String> subIDs) {
68
public void testNRTThreads() throws Exception {
70
final long t0 = System.currentTimeMillis();
72
final LineFileDocs docs = new LineFileDocs(random);
73
final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
74
final MockDirectoryWrapper dir = newFSDirectory(tempDir);
75
dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
76
final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
77
conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
79
public void warm(IndexReader reader) throws IOException {
81
System.out.println("TEST: now warm merged reader=" + reader);
83
final int maxDoc = reader.maxDoc();
85
final int inc = Math.max(1, maxDoc/50);
86
for(int docID=0;docID<maxDoc;docID += inc) {
87
if (reader.isDeleted(docID)) {
88
final Document doc = reader.document(docID);
89
sum += doc.getFields().size();
93
IndexSearcher searcher = newSearcher(reader);
94
sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
98
System.out.println("TEST: warm visited " + sum + " fields");
103
final IndexWriter writer = new IndexWriter(dir, conf);
105
writer.setInfoStream(System.out);
107
_TestUtil.reduceOpenFiles(writer);
109
final int NUM_INDEX_THREADS = 2;
110
final int NUM_SEARCH_THREADS = 3;
112
final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
114
final AtomicBoolean failed = new AtomicBoolean();
115
final AtomicInteger addCount = new AtomicInteger();
116
final AtomicInteger delCount = new AtomicInteger();
117
final AtomicInteger packCount = new AtomicInteger();
119
final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
120
final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
122
final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
123
Thread[] threads = new Thread[NUM_INDEX_THREADS];
124
for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
125
threads[thread] = new Thread() {
128
// TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
129
final List<String> toDeleteIDs = new ArrayList<String>();
130
final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
131
while(System.currentTimeMillis() < stopTime && !failed.get()) {
133
Document doc = docs.nextDoc();
137
final String addedField;
138
if (random.nextBoolean()) {
139
addedField = "extra" + random.nextInt(10);
140
doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
144
if (random.nextBoolean()) {
146
System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
149
if (random.nextBoolean()) {
150
// Add a pack of adjacent sub-docs
152
final SubDocs delSubDocs;
153
if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
154
delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
155
assert !delSubDocs.deleted;
156
toDeleteSubDocs.remove(delSubDocs);
157
// reuse prior packID
158
packID = delSubDocs.packID;
162
packID = packCount.getAndIncrement() + "";
165
final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
166
final List<String> docIDs = new ArrayList<String>();
167
final SubDocs subDocs = new SubDocs(packID, docIDs);
168
final List<Document> docsList = new ArrayList<Document>();
170
allSubDocs.add(subDocs);
171
doc.add(packIDField);
172
docsList.add(_TestUtil.cloneDocument(doc));
173
docIDs.add(doc.get("docid"));
175
final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
176
while(docsList.size() < maxDocCount) {
177
doc = docs.nextDoc();
181
docsList.add(_TestUtil.cloneDocument(doc));
182
docIDs.add(doc.get("docid"));
184
addCount.addAndGet(docsList.size());
186
if (delSubDocs != null) {
187
delSubDocs.deleted = true;
188
delIDs.addAll(delSubDocs.subIDs);
189
delCount.addAndGet(delSubDocs.subIDs.size());
191
System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
193
writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
196
writer.deleteDocuments(new Term("packID", delSubDocs.packID));
197
for(Document subDoc : docsList) {
198
writer.addDocument(subDoc);
203
System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
205
writer.addDocuments(docsList);
209
for(Document subDoc : docsList) {
210
writer.addDocument(subDoc);
214
doc.removeField("packID");
216
if (random.nextInt(5) == 2) {
218
//System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
220
toDeleteSubDocs.add(subDocs);
224
writer.addDocument(doc);
225
addCount.getAndIncrement();
227
if (random.nextInt(5) == 3) {
229
//System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
231
toDeleteIDs.add(doc.get("docid"));
235
// we use update but it never replaces a
238
System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
240
writer.updateDocument(new Term("docid", doc.get("docid")), doc);
241
addCount.getAndIncrement();
243
if (random.nextInt(5) == 3) {
245
//System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
247
toDeleteIDs.add(doc.get("docid"));
251
if (random.nextInt(30) == 17) {
253
System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
255
for(String id : toDeleteIDs) {
257
System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
259
writer.deleteDocuments(new Term("docid", id));
261
final int count = delCount.addAndGet(toDeleteIDs.size());
263
System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
265
delIDs.addAll(toDeleteIDs);
268
for(SubDocs subDocs : toDeleteSubDocs) {
269
assert !subDocs.deleted;
270
writer.deleteDocuments(new Term("packID", subDocs.packID));
271
subDocs.deleted = true;
273
System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
275
delIDs.addAll(subDocs.subIDs);
276
delCount.addAndGet(subDocs.subIDs.size());
278
toDeleteSubDocs.clear();
280
if (addedField != null) {
281
doc.removeField(addedField);
283
} catch (Throwable t) {
284
System.out.println(Thread.currentThread().getName() + ": hit exc");
287
throw new RuntimeException(t);
291
System.out.println(Thread.currentThread().getName() + ": indexing done");
295
threads[thread].setDaemon(true);
296
threads[thread].start();
300
System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
303
// let index build up a bit
306
IndexReader r = IndexReader.open(writer, true);
309
// silly starting guess:
310
final AtomicInteger totTermCount = new AtomicInteger(100);
312
final ExecutorService es = Executors.newCachedThreadPool();
314
while(System.currentTimeMillis() < stopTime && !failed.get()) {
315
if (random.nextBoolean()) {
317
System.out.println("TEST: now reopen r=" + r);
319
final IndexReader r2 = r.reopen();
326
System.out.println("TEST: now close reader=" + r);
330
final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
331
if (openDeletedFiles.size() > 0) {
332
System.out.println("OBD files: " + openDeletedFiles);
334
any |= openDeletedFiles.size() > 0;
335
//assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
337
System.out.println("TEST: now open");
339
r = IndexReader.open(writer, true);
342
System.out.println("TEST: got new reader=" + r);
344
//System.out.println("numDocs=" + r.numDocs() + "
345
//openDelFileCount=" + dir.openDeleteFileCount());
349
if (r.numDocs() > 0) {
351
final IndexSearcher s = new IndexSearcher(r, es);
353
// run search threads
354
final long searchStopTime = System.currentTimeMillis() + 500;
355
final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
356
final AtomicInteger totHits = new AtomicInteger();
357
for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
358
searchThreads[thread] = new Thread() {
362
TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
363
int seenTermCount = 0;
366
if (totTermCount.get() < 10) {
370
trigger = totTermCount.get()/10;
371
shift = random.nextInt(trigger);
373
while(System.currentTimeMillis() < searchStopTime) {
374
Term term = termEnum.term();
376
if (seenTermCount < 10) {
379
totTermCount.set(seenTermCount);
381
trigger = totTermCount.get()/10;
382
//System.out.println("trigger " + trigger);
383
shift = random.nextInt(trigger);
384
termEnum = s.getIndexReader().terms(new Term("body", ""));
392
if ((seenTermCount + shift) % trigger == 0) {
394
//System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
396
totHits.addAndGet(runQuery(s, new TermQuery(term)));
401
System.out.println(Thread.currentThread().getName() + ": search done");
403
} catch (Throwable t) {
404
System.out.println(Thread.currentThread().getName() + ": hit exc");
406
t.printStackTrace(System.out);
407
throw new RuntimeException(t);
411
searchThreads[thread].setDaemon(true);
412
searchThreads[thread].start();
415
for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
416
searchThreads[thread].join();
420
System.out.println("TEST: DONE search: totHits=" + totHits);
428
es.awaitTermination(1, TimeUnit.SECONDS);
431
System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
434
//System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
436
final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
437
if (openDeletedFiles.size() > 0) {
438
System.out.println("OBD files: " + openDeletedFiles);
440
any |= openDeletedFiles.size() > 0;
442
assertFalse("saw non-zero open-but-deleted count", any);
444
System.out.println("TEST: now join");
446
for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
447
threads[thread].join();
450
System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
453
final IndexReader r2 = writer.getReader();
454
final IndexSearcher s = newSearcher(r2);
455
boolean doFail = false;
456
for(String id : delIDs) {
457
final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
458
if (hits.totalHits != 0) {
459
System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
464
// Make sure each group of sub-docs are still in docID order:
465
for(SubDocs subDocs : allSubDocs) {
466
if (!subDocs.deleted) {
467
// We sort by relevance but the scores should be identical so sort falls back to by docID:
468
TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
469
assertEquals(subDocs.subIDs.size(), hits.totalHits);
472
for(ScoreDoc scoreDoc : hits.scoreDocs) {
473
final int docID = scoreDoc.doc;
474
if (lastDocID != -1) {
475
assertEquals(1+lastDocID, docID);
480
final Document doc = s.doc(docID);
481
assertEquals(subDocs.packID, doc.get("packID"));
484
lastDocID = startDocID - 1;
485
for(String subID : subDocs.subIDs) {
486
hits = s.search(new TermQuery(new Term("docid", subID)), 1);
487
assertEquals(1, hits.totalHits);
488
final int docID = hits.scoreDocs[0].doc;
489
if (lastDocID != -1) {
490
assertEquals(1+lastDocID, docID);
495
for(String subID : subDocs.subIDs) {
496
assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
501
final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
502
for(int id=0;id<endID;id++) {
503
String stringID = ""+id;
504
if (!delIDs.contains(stringID)) {
505
final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
506
if (hits.totalHits != 1) {
507
System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
514
assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), r2.numDocs());
518
assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
520
assertFalse(writer.anyNonBulkMerges);
522
_TestUtil.checkIndex(dir);
525
_TestUtil.rmDir(tempDir);
528
System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
532
// Runs the given query with a top-10, sorted (by the "title" string field)
// search against the supplied searcher and returns the total hit count.
// NOTE(review): the closing brace of this method is missing from this
// garbled residue (extraction gap); code left byte-identical.
private int runQuery(IndexSearcher s, Query q) throws Exception {
534
return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
537
private void smokeTestReader(IndexReader r) throws Exception {
538
IndexSearcher s = newSearcher(r);
539
runQuery(s, new TermQuery(new Term("body", "united")));
540
runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
541
PhraseQuery pq = new PhraseQuery();
542
pq.add(new Term("body", "united"));
543
pq.add(new Term("body", "states"));