1
#include "CLucene/_ApiHeader.h"
2
#include "_IndexFileDeleter.h"
3
#include "_IndexFileNameFilter.h"
4
#include "_DocumentsWriter.h"
5
#include "_SegmentHeader.h"
6
#include "CLucene/store/Directory.h"
7
#include "CLucene/LuceneThreads.h"
16
// Class-wide switch, off by default. When true and an infoStream is set,
// incRef()/decRef() log each file's reference count before changing it.
bool IndexFileDeleter::VERBOSE_REF_COUNTS = false;
18
// Captures the commit described by segmentInfos: stores its segments file name
// and generation, and gathers into `files` the segments file itself plus the
// files of every segment whose directory is the deleter's own directory.
IndexFileDeleter::CommitPoint::CommitPoint(IndexFileDeleter* _this, SegmentInfos* segmentInfos){
20
this->deleted = false;
22
segmentsFileName = segmentInfos->getCurrentSegmentFileName();
23
int32_t size = segmentInfos->size();
24
// The segments file is always the first entry of the commit's file list.
files.push_back(segmentsFileName);
25
gen = segmentInfos->getGeneration();
26
for(int32_t i=0;i<size;i++) {
27
SegmentInfo* segmentInfo = segmentInfos->info(i);
28
// Only segments stored in _this->directory contribute their files.
if (segmentInfo->dir == _this->directory) {
29
const vector<string>& ff = segmentInfo->files();
30
files.insert(files.end(),ff.begin(), ff.end());
35
IndexFileDeleter::CommitPoint::~CommitPoint(){
39
* Get the segments_N file for this commit point.
41
std::string IndexFileDeleter::CommitPoint::getSegmentsFileName() {
42
// Returned by value, so the caller receives its own copy of the name.
return segmentsFileName;
44
// Ordering predicate over commit points, keyed on their generation; the
// IndexFileDeleter constructor passes it to std::sort to order `commits`.
// Both arguments are cast to CommitPoint* without any type check.
bool IndexFileDeleter::CommitPoint::sort(IndexCommitPoint* elem1, IndexCommitPoint* elem2){
45
// Tests whether elem1's generation is strictly lower than elem2's.
if (((CommitPoint*)elem1)->gen < ((CommitPoint*)elem2)->gen)
50
const std::vector<std::string>& IndexFileDeleter::CommitPoint::getFileNames() {
55
* Called only by the deletion policy, to remove this
56
* commit point from the index.
58
void IndexFileDeleter::CommitPoint::deleteCommitPoint() {
61
// Queue this commit on the owning deleter; IndexFileDeleter::deleteCommits()
// later decrefs the files of every commit found in commitsToDelete.
_this->commitsToDelete.push_back(this);
65
// Returns the fixed class-name string for CommitPoint (a string literal, so
// the caller must not free or modify it).
const char* IndexFileDeleter::CommitPoint::getClassName(){
66
return "IndexFileDeleter::CommitPoint";
68
// Per-instance name query; simply forwards to the static getClassName(), so
// it yields the same pointer that getClassName() returns.
const char* IndexFileDeleter::CommitPoint::getObjectName() const{
69
return getClassName();
71
// Compares this commit point with obj by generation (lower / higher / other).
int32_t IndexFileDeleter::CommitPoint::compareTo(NamedObject* obj) {
72
// Type guard: obj must report CommitPoint's class name.
// NOTE(review): getClassName() returns const char*, so this looks like a
// pointer comparison rather than a string comparison; it only works if both
// sides yield the same literal address — confirm.
if ( obj->getObjectName() != CommitPoint::getClassName() )
75
CommitPoint* commit = (CommitPoint*) obj;
76
if (gen < commit->gen) {
78
} else if (gen > commit->gen) {
85
// Replaces the stream used for diagnostic output (NULL disables logging, since
// every logging site in this file tests infoStream != NULL first).
void IndexFileDeleter::setInfoStream(std::ostream* infoStream) {
86
this->infoStream = infoStream;
87
if (infoStream != NULL){
88
// Announce the active deletion policy by its object name.
string msg = string("setInfoStream deletionPolicy=") + policy->getObjectName();
93
// Writes one diagnostic line of the form "IFD [<thread id>]: <message>\n" to
// *infoStream. infoStream is dereferenced without a NULL check here; the
// callers in this file test infoStream != NULL before calling.
void IndexFileDeleter::message(string message) {
94
(*infoStream) << string("IFD [") << Misc::toString( _LUCENE_CURRTHREADID ) << string("]: ") << message << string("\n");
98
// Destructor: empties the queue of commits that were pending deletion.
IndexFileDeleter::~IndexFileDeleter(){
100
commitsToDelete.clear();
104
// Builds the deleter for `directory`:
//   1. lists the directory and registers every index file in refCounts;
//   2. loads each readable segments* file whose generation is <= the current
//      one and records it as a CommitPoint in `commits`;
//   3. if the commit matching segmentInfos was not seen, opens it explicitly
//      (raises CL_ERR_CorruptIndex when that read fails with CL_ERR_IO);
//   4. sorts `commits`, removes files whose ref count is still 0, and lets the
//      deletion policy react via onInit().
// Raises CL_ERR_IO when the directory cannot be listed.
IndexFileDeleter::IndexFileDeleter(Directory* directory, IndexDeletionPolicy* policy,
105
SegmentInfos* segmentInfos, std::ostream* infoStream, DocumentsWriter* docWriter):
106
refCounts( RefCountsType(true,true) ), commits(CommitsType(true))
108
this->docWriter = docWriter;
109
this->infoStream = infoStream;
111
if (infoStream != NULL)
112
// `policy` here is the constructor parameter; the member is assigned below.
message( string("init: current segments file is \"") + segmentInfos->getCurrentSegmentFileName() + "\"; deletionPolicy=" + policy->getObjectName());
114
this->policy = policy;
115
this->directory = directory;
116
// Set once the commit whose generation equals segmentInfos' is found.
CommitPoint* currentCommitPoint = NULL;
118
// First pass: walk the files and initialize our ref
120
int64_t currentGen = segmentInfos->getGeneration();
121
const IndexFileNameFilter* filter = IndexFileNameFilter::getFilter();
123
vector<string> files;
124
if ( !directory->list(&files) )
125
_CLTHROWA(CL_ERR_IO, (string("cannot read directory ") + directory->toString() + ": list() returned NULL").c_str());
128
for(size_t i=0;i<files.size();i++) {
130
string& fileName = files.at(i);
132
// Precedence note: `!x == 0` parses as `(!x) == 0`, so the second operand is
// true exactly when compare() is non-zero, i.e. the name is NOT SEGMENTS_GEN.
if (filter->accept(NULL, fileName.c_str()) && !fileName.compare(IndexFileNames::SEGMENTS_GEN) == 0) {
134
// Add this file to refCounts with initial count 0:
135
getRefCount(fileName.c_str());
137
// Prefix match on IndexFileNames::SEGMENTS identifies commit files.
if ( strncmp(fileName.c_str(), IndexFileNames::SEGMENTS, strlen(IndexFileNames::SEGMENTS)) == 0 ) {
139
// This is a commit (segments or segments_N), and
140
// it's valid (<= the max gen). Load it, then
141
// incref all files it refers to:
142
if (SegmentInfos::generationFromSegmentsFileName(fileName.c_str()) <= currentGen) {
143
if (infoStream != NULL) {
144
message("init: load commit \"" + fileName + "\"");
149
sis.read(directory, fileName.c_str());
150
} catch (CLuceneError& e) {
151
// Errors other than CL_ERR_IO take this branch.
if ( e.number() != CL_ERR_IO ){
154
// LUCENE-948: on NFS (and maybe others), if
155
// you have writers switching back and forth
156
// between machines, it's very likely that the
157
// dir listing will be stale and will claim a
158
// file segments_X exists when in fact it
159
// doesn't. So, we catch this and handle it
160
// as if the file does not exist
161
if (infoStream != NULL) {
162
// NOTE(review): "point32_t" in the runtime message below looks like a
// search-and-replace artifact for "point"; left untouched here because it
// is program output.
message("init: hit FileNotFoundException when loading commit \"" + fileName + "\"; skipping this commit point32_t");
167
CommitPoint* commitPoint = _CLNEW CommitPoint(this,&sis);
168
if (sis.getGeneration() == segmentInfos->getGeneration()) {
169
currentCommitPoint = commitPoint;
171
commits.push_back(commitPoint);
179
if (currentCommitPoint == NULL) {
180
// We did not in fact see the segments_N file
181
// corresponding to the segmentInfos that was passed
182
// in. Yet, it must exist, because our caller holds
183
// the write lock. This can happen when the directory
184
// listing was stale (eg when index accessed via NFS
185
// client with stale directory listing cache). So we
186
// try now to explicitly open this commit point:
189
sis.read(directory, segmentInfos->getCurrentSegmentFileName().c_str());
190
} catch (CLuceneError& e) {
191
// A CL_ERR_IO failure on the forced read is reported as a corrupt index.
if ( e.number() == CL_ERR_IO ){
192
_CLTHROWA(CL_ERR_CorruptIndex, "failed to locate current segments_N file");
195
if (infoStream != NULL)
196
message("forced open of current segments file " + segmentInfos->getCurrentSegmentFileName());
197
currentCommitPoint = _CLNEW CommitPoint(this,&sis);
198
commits.push_back(currentCommitPoint);
202
// We keep commits list in sorted order (oldest to newest):
203
std::sort(commits.begin(), commits.end(), CommitPoint::sort);
205
// Now delete anything with ref count at 0. These are
206
// presumably abandoned files eg due to crash of
208
RefCountsType::iterator it = refCounts.begin();
209
while(it != refCounts.end()) {
210
char* fileName = it->first;
211
RefCount* rc = it->second;
212
if (0 == rc->count) {
213
if (infoStream != NULL) {
214
message( string("init: removing unreferenced file \"") + fileName + "\"");
216
deleteFile(fileName);
221
// Finally, give policy a chance to remove things on
223
policy->onInit(commits);
225
// It's OK for the onInit to remove the current commit
226
// point; we just have to checkpoint our in-memory
227
// SegmentInfos to protect those files that it uses:
228
if (currentCommitPoint->deleted) {
229
checkpoint(segmentInfos, false);
236
* Remove the CommitPoints in the commitsToDelete List by
237
* DecRef'ing all files from each SegmentInfos.
239
// Processes the commits queued in commitsToDelete: decrefs every file each of
// them referenced, empties the queue, then compacts `commits` so that only
// commits whose `deleted` flag is false remain, in their existing order.
void IndexFileDeleter::deleteCommits() {
241
int32_t size = commitsToDelete.size();
245
// First decref all files that had been referred to by
246
// the now-deleted commits:
247
for(int32_t i=0;i<size;i++) {
248
CommitPoint* commit = commitsToDelete[i];
249
if (infoStream != NULL) {
250
message("deleteCommits: now remove commit \"" + commit->getSegmentsFileName() + "\"");
252
decRef(commit->files);
254
commitsToDelete.clear();
256
// Now compact commits to remove deleted ones (preserving the sort):
257
size = commits.size();
258
int32_t readFrom = 0;
260
// readFrom scans every slot; writeTo is the slot the next surviving commit
// should occupy.
while(readFrom < size) {
261
CommitPoint* commit = (CommitPoint*)commits[readFrom];
262
if (!commit->deleted) {
263
if (writeTo != readFrom) {
264
// NOTE(review): the second argument of remove() presumably selects whether
// the removed element is kept alive (true) or destroyed (false) — confirm
// against the container's remove() signature.
commits.remove(readFrom,true);
265
commits.remove(writeTo,false);//delete this one...
266
// Re-insert the survivor at writeTo: append when writeTo is now one past the
// end, otherwise assign into the existing slot.
if ( commits.size() == writeTo )
267
commits.push_back(commit);
269
commits[writeTo] = commit;
276
// Trim the tail entries that lie beyond the last surviving commit.
while(size > writeTo) {
277
commits.remove(size-1);
284
* Writer calls this when it has hit an error and had to
285
* roll back, to tell us that there may now be
286
* unreferenced files in the filesystem. So we re-list
287
* the filesystem and delete such files. If segmentName
288
* is non-NULL, we will only delete files corresponding to
291
void IndexFileDeleter::refresh(const char* segmentName) {
292
vector<string> files;
293
if ( !directory->list(files) )
294
_CLTHROWA(CL_ERR_IO, (string("cannot read directory ") + directory->toString() + ": list() returned NULL").c_str() );
295
const IndexFileNameFilter* filter = IndexFileNameFilter::getFilter();
296
string segmentPrefix1;
297
string segmentPrefix2;
298
if (segmentName != NULL) {
299
segmentPrefix1 = string(segmentName) + ".";
300
segmentPrefix2 = string(segmentName) + "_";
303
for(size_t i=0;i<files.size();i++) {
304
string& fileName = files[i];
305
if ( filter->accept(NULL, fileName.c_str()) &&
306
( (segmentName==NULL || fileName.compare(0,segmentPrefix1.length(),segmentPrefix1) == 0 || fileName.compare(0,segmentPrefix2.length(),segmentPrefix2)==0)
307
&& refCounts.find((char*)fileName.c_str())== refCounts.end() && fileName.compare(IndexFileNames::SEGMENTS_GEN)!=0) ){
309
// Unreferenced file, so remove it
310
if (infoStream != NULL) {
311
message( string("refresh [prefix=") + segmentName + "]: removing newly created unreferenced file \"" + fileName + "\"");
313
deleteFile(fileName.c_str());
318
void IndexFileDeleter::refresh() {
322
// Shutdown hook: makes one more attempt at removing the files that were queued
// in `deletable` after an earlier failed delete.
void IndexFileDeleter::close() {
323
deletePendingFiles();
326
// Retries deletion of every file name currently queued in `deletable`.
void IndexFileDeleter::deletePendingFiles() {
327
if (!deletable.empty()) {
328
// Work from a snapshot: deleteFile() may push names back onto `deletable`
// when a delete fails again.
vector<string> oldDeletable;
329
oldDeletable.insert(oldDeletable.end(),deletable.begin(),deletable.end());
332
int32_t size = oldDeletable.size();
333
for(int32_t i=0;i<size;i++) {
334
if (infoStream != NULL)
335
message("delete pending file " + oldDeletable[i]);
336
deleteFile(oldDeletable[i].c_str());
342
* For definition of "check point" see IndexWriter comments:
343
* "Clarification: Check Points (and commits)".
345
* Writer calls this when it has made a "consistent
346
* change" to the index, meaning new files are written to
347
* the index and the in-memory SegmentInfos have been
348
* modified to point to those files.
350
* This may or may not be a commit (segments_N may or may
351
* not have been written).
353
* We simply incref the files referenced by the new
354
* SegmentInfos and decref the files we had previously
357
* If this is a commit, we also call the policy to give it
358
* a chance to remove other commits. If any commits are
359
* removed, we decref their files as well.
361
// Records a new consistent state of the index: takes references on the files
// of segmentInfos (and on the DocumentsWriter's files, if any), registers a
// CommitPoint and notifies the policy, releases the references held for the
// previous checkpoint, and remembers the new file set in lastFiles.
void IndexFileDeleter::checkpoint(SegmentInfos* segmentInfos, bool isCommit) {
363
if (infoStream != NULL) {
364
message(string("now checkpoint \"") + segmentInfos->getCurrentSegmentFileName() + "\" [" +
365
Misc::toString(segmentInfos->size()) + " segments ; isCommit = " + Misc::toString(isCommit) + "]");
368
// Try again now to delete any previously un-deletable
369
// files (because they were in use, on Windows):
370
deletePendingFiles();
373
// New references are taken before the old ones are dropped further down, so
// a file present in both the old and the new state never reaches count 0
// (decRef deletes a file as soon as its count hits 0).
incRef(segmentInfos, isCommit);
374
const vector<string>* docWriterFiles = NULL;
375
if (docWriter != NULL) {
376
docWriterFiles = &docWriter->files();
377
if (!docWriterFiles->empty())
378
incRef(*docWriterFiles);
380
docWriterFiles = NULL;
384
// Append to our commits list:
385
commits.push_back(_CLNEW CommitPoint(this, segmentInfos));
387
// Tell policy so it can remove commits:
388
policy->onCommit(commits);
390
// Decref files for commits that were deleted by the policy:
394
// DecRef old files from the last checkpoint, if any:
395
int32_t size = lastFiles.size();
397
for(int32_t i=0;i<size;i++)
398
decRef(lastFiles[i]);
403
// Save files so we can decr on next checkpoint/commit:
404
size = segmentInfos->size();
405
for(int32_t i=0;i<size;i++) {
406
SegmentInfo* segmentInfo = segmentInfos->info(i);
407
// Only segments stored in this deleter's directory are remembered.
if (segmentInfo->dir == directory) {
408
const vector<string>& files = segmentInfo->files();
409
lastFiles.insert(lastFiles.end(), files.begin(), files.end());
413
// docWriterFiles is non-NULL only if it was set from docWriter above and not
// reset to NULL afterwards.
if (docWriterFiles != NULL)
414
lastFiles.insert(lastFiles.end(), docWriterFiles->begin(),docWriterFiles->end());
417
// Takes a reference on the files of every segment in segmentInfos that lives
// in this deleter's directory, and on the current segments file as well.
void IndexFileDeleter::incRef(SegmentInfos* segmentInfos, bool isCommit) {
418
int32_t size = segmentInfos->size();
419
for(int32_t i=0;i<size;i++) {
420
SegmentInfo* segmentInfo = segmentInfos->info(i);
421
// Segments stored in another directory are skipped.
if (segmentInfo->dir == directory) {
422
incRef(segmentInfo->files());
427
// Since this is a commit point, also incref its
429
getRefCount(segmentInfos->getCurrentSegmentFileName().c_str())->IncRef();
433
// Per-file variant: walks `files` and looks up each name's RefCount, which
// getRefCount() creates when the name is not tracked yet.
void IndexFileDeleter::incRef(const vector<string>& files) {
434
int32_t size = files.size();
435
for(int32_t i=0;i<size;i++) {
436
const string& fileName = files[i];
437
RefCount* rc = getRefCount(fileName.c_str());
438
// Verbose tracing logs the count as it stands before the increment.
if (infoStream != NULL && VERBOSE_REF_COUNTS) {
439
message(string("  IncRef \"") + fileName + "\": pre-incr count is " + Misc::toString((int32_t)rc->count));
445
// List variant of decRef: iterates over every name in `files`.
// NOTE(review): presumably forwards each entry to decRef(const string&) —
// confirm against the loop body.
void IndexFileDeleter::decRef(const vector<string>& files) {
446
int32_t size = files.size();
447
for(int32_t i=0;i<size;i++) {
452
// Drops one reference on fileName. When RefCount::DecRef() reports 0, the file
// is deleted through deleteFile() and its entry is removed from refCounts.
void IndexFileDeleter::decRef(const string& fileName) {
453
// getRefCount() creates an entry if the name is not tracked yet.
RefCount* rc = getRefCount(fileName.c_str());
454
if (infoStream != NULL && VERBOSE_REF_COUNTS) {
455
message(string("  DecRef \"") + fileName + "\": pre-decr count is " + Misc::toString((int32_t)rc->count));
457
if (0 == rc->DecRef()) {
458
// This file is no longer referenced by any past
459
// commit points nor by the in-memory SegmentInfos:
460
deleteFile(fileName.c_str());
461
refCounts.remove((char*)fileName.c_str());
465
// Drops one reference on the files of every segment in segmentInfos that is
// stored in this deleter's directory; segments elsewhere are skipped.
void IndexFileDeleter::decRef(SegmentInfos* segmentInfos) {
466
int32_t size = segmentInfos->size();
467
for(int32_t i=0;i<size;i++) {
468
SegmentInfo* segmentInfo = segmentInfos->info(i);
469
if (segmentInfo->dir == directory) {
470
decRef(segmentInfo->files());
475
// Looks up the RefCount registered for fileName, creating and registering a
// new one when the name is not in refCounts yet.
IndexFileDeleter::RefCount* IndexFileDeleter::getRefCount(const char* fileName) {
477
// The cast only drops const for the lookup call.
RefCountsType::iterator itr = refCounts.find((char*)fileName);
478
if (itr == refCounts.end()) {
479
rc = _CLNEW RefCount();
480
// NOTE(review): STRDUP_AtoA presumably duplicates the name so the map owns
// its key independently of the caller's buffer — confirm.
refCounts.put( STRDUP_AtoA(fileName), rc);
487
// Calls deleteFile() on every name in `files`, in order, without consulting
// refCounts.
void IndexFileDeleter::deleteFiles(vector<string>& files) {
488
int32_t size = files.size();
489
for(int32_t i=0;i<size;i++)
490
deleteFile(files[i].c_str());
493
/** Deletes the specified files, but only if they are new
494
* (have not yet been incref'd). */
495
void IndexFileDeleter::deleteNewFiles(const std::vector<std::string>& files) {
496
int32_t size = files.size();
497
for(int32_t i=0;i<size;i++)
498
// "New" means the name has no entry in refCounts; tracked files are skipped.
if (refCounts.find((char*)files[i].c_str()) == refCounts.end())
499
deleteFile(files[i].c_str());
502
// Asks the directory to delete fileName. If that raises a CLuceneError and the
// file still exists afterwards, the name is appended to `deletable` so that
// deletePendingFiles() can retry later.
void IndexFileDeleter::deleteFile(const char* fileName)
505
if (infoStream != NULL) {
506
message(string("delete \"") + fileName + "\"");
508
directory->deleteFile(fileName);
509
} catch (CLuceneError& e) {  // if delete fails
510
// Errors other than CL_ERR_IO take this branch.
if ( e.number() != CL_ERR_IO ){
513
if (directory->fileExists(fileName)) {
515
// Some operating systems (e.g. Windows) don't
516
// permit a file to be deleted while it is opened
517
// for read (e.g. by another process or thread). So
518
// we assume that when a delete fails it is because
519
// the file is open in another process, and queue
520
// the file for subsequent deletion.
522
if (infoStream != NULL) {
523
message(string("IndexFileDeleter: unable to remove file \"") + fileName + "\": " + e.what() + "; Will re-try later.");
525
deletable.push_back(fileName);  // add to deletable