~divmod-dev/divmod.org/dangling-1091

« back to all changes in this revision

Viewing changes to Xapwrap/xapwrap/index.py

  • Committer: washort
  • Date: 2005-10-20 15:16:48 UTC
  • Revision ID: svn-v4:866e43f7-fbfc-0310-8f2a-ec88d1da2979:trunk:2508
Add Xapwrap. Closes #182.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2005 Divmod Inc. See LICENSE file for details.
 
2
"""
 
3
Xapwrap provides an improved interface to the Xapian text indexing
 
4
library (see http://www.xapian.org/ for more information on
 
5
Xapian). Xapwrap provides a layered approach offering ample
 
6
opportunities for customization.
 
7
 
 
8
Example
 
9
-------
 
10
::
 
11
 
 
12
    from xapwrap import SmartIndex, Document, TextField, SortKey
 
13
    from datetime import date
 
14
 
 
15
    idx = SmartIndex('/tmp/index', True)
 
16
    d1 = Document(TextField('hi there bob'),
 
17
                  sortFields = [SortKey('date', date(2004, 1, 1)),
 
18
                                SortKey('author', 'Bob'),
 
19
                                SortKey('size', 450)])
 
20
    idx.index(d1)
 
21
    idx.close()
 
22
 
 
23
    idx = SmartIndex('/tmp/index')
 
24
    print idx.search('there', 'date', sortAscending = True)
 
25
 
 
26
 
 
27
 
 
28
Indices
 
29
-------
 
30
 
 
31
Important methods for C{ReadOnlyIndex}:
 
32
 __init__(self, *pathnames)
 
33
 close(self)
 
34
 configure(self, prefixMap = None, indexValueMap = None)
 
35
 flush(self)
 
36
 search(self, query, sortKeyt = None,
 
37
        startingIndex = 0, batchSize = MAX_DOCS_TO_RETURN,
 
38
        sortIndex = None, sortAscending = True,
 
39
        sortByRelevence = False)
 
40
 count(self, query)
 
41
 checkIndex(self, maxID)
 
42
 get_doccount(self, uid)
 
43
 
 
44
Important methods for C{Index}:
 
45
 (all methods in ReadOnlyIndex)
 
46
 __init__(self, pathname, create)
 
47
 index(self, doc)
 
48
 add_document(self, doc)
 
49
 replace_document(self, uid, doc)
 
50
 delete_document(self, uid)
 
51
 
 
52
C{SmartIndex} and C{SmartReadOnlyIndex} define the same methods as their
 
53
dumb counterparts.
 
54
 
 
55
The primary way to interact with a Xapian index is to use either the
 
56
C{Index} or C{ReadOnlyIndex} class. In addition to offering read only
 
57
access without the inconveniance of lock files, C{ReadOnlyIndex} offers
 
58
the ability to merge several xapian indices into one super index with
 
59
only a small performance impediement.
 
60
 
 
61
In addition to C{Index} and C{ReadOnlyIndex}, Xapwrap also offers
 
62
C{SmartIndex} and C{SmartReadOnlyIndex} classes. These classes
 
63
automatically store and manage the index value map and the prefix map in
 
64
the index. There are two caveats to using them however. First, one
 
65
cannot index documents that have a xapian ID of 1. Secondly, when using
 
66
C{SmartReadOnlyIndex} to combine multiple indices together, the indices
 
67
must have consistent value index maps. Indices where all documents have
 
68
the same index value map are always consistent. The problem only emerges
 
69
when indices can have different types of documents with different sets
 
70
of sort keys. More specifically, the problem can only emerge if one
 
71
indices documents in such a way that sort keys are added to different
 
72
indices in different orders.
 
73
 
 
74
 
 
75
Documents
 
76
---------
 
77
 
 
78
In order to add new data to an index, one asks a C{Index} or
 
79
C{SmartIndex} instance to index a C{Document} instance. Documents take a
 
80
sequence of text fields, a sequence of sort keys and a sequence of
 
81
keywords as constructor arguments. They also take optional universal
 
82
identifiers and an arbitrary serializable object. The first three
 
83
sequences can be created using the C{TextField}, C{SortKey}, and
 
84
C{Keyword} classes defined below. C{TextField} instances contain a chunk
 
85
of text and an optional name as well as a boolean indicating whether the
 
86
field is to be prefixed. Prefixed fields are effectively indexed twice:
 
87
after being indexed normally, each token is indexed again with the field
 
88
name. This allows the user to perform fielded searches and is primarily
 
89
useful for small text fields, such as the subject of an email or a list
 
90
of author names. C{Keyword} instances denote individiual prefixed tokens
 
91
that are indexed with no positional information. C{SortKey} instances
 
92
denote arbitrary fields that are used for sorting documents. They
 
93
include a sort field name and the sort key value. Since Xapian only
 
94
accepts strings as sort keys, sort key values must be flattened into
 
95
strings before entering the index.
 
96
 
 
97
Xapwrap defines flattener functions that automatically flatten integer,
 
98
date, time, and datetime instances into strings that sort properly. You
 
99
can define your own flatteners for custom data types by using the
 
100
C{registerFlattener} class method of the C{Document} class.
 
101
 
 
102
 
 
103
Error Handling
 
104
--------------
 
105
Internal Xapian error conditions should generate normal python
 
106
exceptions defined in this file that inherit from xapwrap.XapianError.
 
107
 
 
108
 
 
109
Logging
 
110
-------
 
111
Xapwrap will use twisted's logging facilities if available. In any
 
112
event, a custom logging function can be supplied by setting xapwrap.log.
 
113
 
 
114
 
 
115
Future Work
 
116
-----------
 
117
Xapwrap currently does not support stemming or stop words, although a
 
118
future version will.
 
119
 
 
120
"""
 
121
import xapian, cPickle, sets, glob, os
 
122
from xapwrap.document import makePairForWrite, StandardAnalyzer, Document, SortKey, Keyword
 
123
 
 
124
try:
 
125
    from atop.tpython import FilesystemLock
 
126
except ImportError:
 
127
    from os import symlink, readlink, remove as rmlink
 
128
    import errno
 
129
 
 
130
    class FilesystemLock:
 
131
        """A mutex.
 
132
 
 
133
        This relies on the filesystem property that creating
 
134
        a symlink is an atomic operation and that it will
 
135
        fail if the symlink already exists.  Deleting the
 
136
        symlink will release the lock.
 
137
 
 
138
        @ivar name: The name of the file associated with this lock.
 
139
        @ivar clean: Indicates whether this lock was released cleanly by its
 
140
        last owner.  Only meaningful after C{lock} has been called and returns
 
141
        True.
 
142
        """
 
143
 
 
144
        clean = None
 
145
        locked = False
 
146
 
 
147
        def __init__(self, name):
 
148
            self.name = name
 
149
 
 
150
        def lock(self):
 
151
            """Acquire this lock.
 
152
 
 
153
            @rtype: C{bool}
 
154
            @return: True if the lock is acquired, false otherwise.
 
155
 
 
156
            @raise: Any exception os.symlink() may raise, other than
 
157
            EEXIST.
 
158
            """
 
159
            try:
 
160
                pid = readlink(self.name)
 
161
            except (OSError, IOError), e:
 
162
                if e.errno != errno.ENOENT:
 
163
                    raise
 
164
                self.clean = True
 
165
            else:
 
166
                if not hasattr(os, 'kill'):
 
167
                    return False
 
168
                try:
 
169
                    os.kill(int(pid), 0)
 
170
                except (OSError, IOError), e:
 
171
                    if e.errno != errno.ESRCH:
 
172
                        raise
 
173
                    rmlink(self.name)
 
174
                    self.clean = False
 
175
                else:
 
176
                    return False
 
177
 
 
178
            symlink(str(os.getpid()), self.name)
 
179
            self.locked = True
 
180
            return True
 
181
 
 
182
        def unlock(self):
 
183
            """Release this lock.
 
184
 
 
185
            This deletes the directory with the given name.
 
186
 
 
187
            @raise: Any exception os.readlink() may raise, or
 
188
            ValueError if the lock is not owned by this process.
 
189
            """
 
190
            pid = readlink(self.name)
 
191
            if int(pid) != os.getpid():
 
192
                raise ValueError("Lock %r not owned by this process" % (self.name,))
 
193
            rmlink(self.name)
 
194
            self.locked = False
 
195
 
 
196
try:
 
197
    from twisted.python.log import msg as log
 
198
except ImportError:
 
199
    def log(*args):
 
200
        pass
 
201
 
 
202
 
 
203
# max number of bytes that can be indexed without forcing an index
 
204
# flush. this limits memory consumption
 
205
MAX_DATA_INDEXED_BETWEEN_FLUSHES = 200 * 1000
 
206
 
 
207
MAX_DOCS_TO_RETURN = 1000 * 1000
 
208
 
 
209
XAPIAN_LOCK_FILENAME = "db_lock"
 
210
XAPWRAP_LOCK_FILENAME = "xapian_lock"
 
211
 
 
212
# Xapian error handling is somewhat weak: all errors trigger either an
 
213
# IOError, a RuntimeError, or a ValueError. The exception's args
 
214
# attribute is a singleton tuple containing an explanation
 
215
# string. Possible errors include 'DatabaseCorruptError: Quartz metafile
 
216
# /tmp/foo/meta is invalid: magic string not found.' and
 
217
# 'DatabaseLockError: Unable to acquire database write lock
 
218
# /tmp/foo/db_lock'. Instead of looking inside exception error strings
 
219
# everywhere, I made a wrapper for xapian database operations that
 
220
# catches exceptions and translates them into the more meaningful
 
221
# exceptions shown below.
 
222
 
 
223
class XapianError(StandardError):
 
224
    pass
 
225
class XapianRuntimeError(XapianError):
 
226
    pass
 
227
class XapianLogicError(XapianError):
 
228
    pass
 
229
class XapianDatabaseError(XapianError):
 
230
    pass
 
231
 
 
232
class XapianAssertionError(XapianLogicError):
 
233
    pass
 
234
class InvalidOperationError(XapianLogicError):
 
235
    pass
 
236
class InvalidArgumentError(XapianLogicError):
 
237
    pass
 
238
class UnimplementedError(XapianLogicError):
 
239
    pass
 
240
 
 
241
class DocNotFoundError(XapianRuntimeError):
 
242
    pass
 
243
class RangeError(XapianRuntimeError):
 
244
    pass
 
245
class InternalError(XapianRuntimeError):
 
246
    pass
 
247
class FeatureUnavalableError(XapianRuntimeError):
 
248
    pass
 
249
class XapianNetworkError(XapianRuntimeError):
 
250
    pass
 
251
 
 
252
class NetworkTimeoutError(XapianNetworkError):
 
253
    pass
 
254
 
 
255
class DatabaseCorruptionError(XapianDatabaseError):
 
256
    pass
 
257
class DatabaseCreationError(XapianDatabaseError):
 
258
    pass
 
259
class DatabaseOpeningError(XapianDatabaseError):
 
260
    pass
 
261
class DatabaseLockError(XapianDatabaseError):
 
262
    pass
 
263
class DatabaseModifiedError(XapianDatabaseError):
 
264
    pass
 
265
 
 
266
# these exceptions are not Xapian errors
 
267
class UnknownDatabaseError(XapianError):
 
268
    pass
 
269
 
 
270
class NoIndexValueFound(XapianError):
 
271
    pass
 
272
 
 
273
class InconsistantIndex(XapianError):
 
274
    pass
 
275
 
 
276
class InconsistantIndexCombination(XapianError):
 
277
    pass
 
278
 
 
279
 
 
280
def makeTranslatedMethod(methodName):
 
281
    def translatedMethod(self, *args, **kwargs):
 
282
        try:
 
283
            return getattr(self.db, methodName)(*args, **kwargs)
 
284
        except (IOError, RuntimeError, ValueError), e:
 
285
            errorMsg = e.args[0]
 
286
            for subString, exceptionClass in self.exceptionStrMap.iteritems():
 
287
                if subString in errorMsg:
 
288
                    raise exceptionClass(e)
 
289
            else:
 
290
                raise UnknownDatabaseError(e)
 
291
        except:
 
292
            raise
 
293
    return translatedMethod
 
294
 
 
295
class ExceptionTranslater:
 
296
    def __init__(self, db):
 
297
        self.db = db
 
298
 
 
299
    def openIndex(klass, readOnly, *args, **kwargs):
 
300
        try:
 
301
            if readOnly:
 
302
                assert len(kwargs) == 0
 
303
                # assume all args are db paths
 
304
                db = (xapian.Database(args[0]))
 
305
                for path in args[1:]:
 
306
                    db.add_database(xapian.Database(path))
 
307
                return klass(db)
 
308
            else:
 
309
                return klass(xapian.open(*args, **kwargs))
 
310
        except (IOError, RuntimeError, ValueError ), e:
 
311
            errorMsg = e.args[0]
 
312
            for subString, exceptionClass in klass.exceptionStrMap.iteritems():
 
313
                if subString in errorMsg:
 
314
                    raise exceptionClass(e)
 
315
            else:
 
316
                raise UnknownDatabaseError(e)
 
317
        except Exception, e:
 
318
            raise UnknownDatabaseError(e)
 
319
 
 
320
    openIndex = classmethod(openIndex)
 
321
 
 
322
    # possible exceptions are taken from the list at
 
323
    # http://www.xapian.org/docs/apidoc/html/errortypes_8h.html
 
324
    exceptionStrMap = {
 
325
        # exceptions whose names differ between xapwrap and Xapian
 
326
        'DatabaseCorruptError': DatabaseCorruptionError,
 
327
        'AssertionError': XapianAssertionError,
 
328
        'DatabaseCreateError': DatabaseCreationError,
 
329
 
 
330
        # exceptions translated with the same name
 
331
        'DatabaseLockError': DatabaseLockError,
 
332
        'DatabaseOpeningError': DatabaseOpeningError,
 
333
        'DatabaseModifiedError': DatabaseModifiedError,
 
334
        'FeatureUnavalableError': FeatureUnavalableError,
 
335
        'DocNotFoundError': DocNotFoundError,
 
336
        'InvalidOperationError': InvalidOperationError,
 
337
        'InvalidArgumentError': InvalidArgumentError,
 
338
        'UnimplementedError': UnimplementedError,
 
339
        'NetworkError': XapianNetworkError,
 
340
        'NetworkTimeoutError': NetworkTimeoutError,
 
341
        'DatabaseError': XapianDatabaseError,
 
342
        'InternalError': InternalError,
 
343
        'RangeError': RangeError,
 
344
        'RuntimeError': XapianRuntimeError,
 
345
        'LogicError': XapianLogicError
 
346
        }
 
347
 
 
348
    get_doccount = makeTranslatedMethod('get_doccount')
 
349
    add_document = makeTranslatedMethod('add_document')
 
350
    replace_document = makeTranslatedMethod('replace_document')
 
351
    delete_document = makeTranslatedMethod('delete_document')
 
352
    flush = makeTranslatedMethod('flush')
 
353
    term_exists = makeTranslatedMethod('term_exists')
 
354
    reopen = makeTranslatedMethod('reopen')
 
355
    begin_transaction = makeTranslatedMethod('begin_transaction')
 
356
    commit_transaction = makeTranslatedMethod('commit_transaction')
 
357
    cancel_transaction = makeTranslatedMethod('cancel_transaction')
 
358
    get_lastdocid = makeTranslatedMethod('get_lastdocid')
 
359
    get_avlength = makeTranslatedMethod('get_avlength')
 
360
    get_termfreq = makeTranslatedMethod('get_termfreq')
 
361
    get_collection_freq = makeTranslatedMethod('get_collection_freq')
 
362
    get_doclength = makeTranslatedMethod('get_doclength')
 
363
    get_document = makeTranslatedMethod('get_document')
 
364
 
 
365
    postlist_begin = makeTranslatedMethod('postlist_begin')
 
366
    postlist_end = makeTranslatedMethod('postlist_end')
 
367
    termlist_begin = makeTranslatedMethod('termlist_begin')
 
368
    termlist_end = makeTranslatedMethod('termlist_end')
 
369
    positionlist_begin = makeTranslatedMethod('positionlist_begin')
 
370
    positionlist_end = makeTranslatedMethod('positionlist_end')
 
371
    allterms_begin = makeTranslatedMethod('allterms_begin')
 
372
    allterms_end = makeTranslatedMethod('allterms_end')
 
373
 
 
374
 
 
375
 
 
376
def makeProtectedDBMethod(method, setupDB = True):
 
377
    def protectedMethod(self, *args, **kwargs):
 
378
        if setupDB:
 
379
            self.setupDB()
 
380
        try:
 
381
            return method(self, *args, **kwargs)
 
382
##         # test that this works and doesn't recurse indefinitely
 
383
##         except DatabaseModifiedError:
 
384
##             self.reopen()
 
385
##             return protectedMethod(self, *args, **kwargs)
 
386
        except XapianError, e:
 
387
            #log("error encountered while performing xapian index operation %s: %s"
 
388
            #    % (method.__name__, e))
 
389
            self.close()
 
390
            raise
 
391
    return protectedMethod
 
392
 
 
393
 
 
394
# there are lots of places below where we write code like:
 
395
# enq = mset = None
 
396
# try:
 
397
#     enq = self.enquire(foo)
 
398
#     mset = enq.get_mset(0, 10)
 
399
#     return mset[0][flimflam]
 
400
# except:
 
401
#     del enq, mset
 
402
#     raise
 
403
 
 
404
# the purpose of this code is to ensure that no references to enquire
 
405
# objects or msets will outlive the function call. msets and enquire
 
406
# objsects hold a reference to the xapian db, and thus prevent it from
 
407
# being properly gc'd. if we fail to delete enq and mset on exception,
 
408
# then they can be kept around for arbitrarily long periods of time as
 
409
# part of the exception state
 
410
 
 
411
 
 
412
# be extremely careful about keeping a db object in local scope;
 
413
# once its there, an unhandled exception could create a traceback
 
414
# containing a frame object that holds a copy of the locals dict,
 
415
# including the db object. if that frame/traceback object is kept
 
416
# around forever (which parts of twisted/quotient seem to do,
 
417
# especially deferreds), then the db object will never be deleted
 
418
# and the indexer lock will never go away.
 
419
 
 
420
# in order to prevent that from happening, we maintain two invariants:
 
421
 
 
422
# 1. the db is only accessed as an instance attribute and is never
 
423
# copied into a local variable. i.e., we always say self.db and
 
424
# never ever say db = self.db. this keeps the db object from ever
 
425
# getting captured by a frame/traceback.
 
426
 
 
427
# 2. the db is only accessed from within an exception handler that
 
428
# calls self.close() in the event of *any* failure. this ensures
 
429
# that the instance loses all references to the db on failure, so,
 
430
# even if the instance object is captured by a frame object (or
 
431
# something else), the db will already have been freed.
 
432
 
 
433
 
 
434
class ReadOnlyIndex:
 
435
    """
 
436
    I represent a Xapian index that is read only by wrapping the
 
437
    xapian.Database class. Because I provide read only access, I can be
 
438
    used to combine several Xapian indices into one index with
 
439
    performance only slightly lower than when using only one index.
 
440
 
 
441
    @cvar DEFAULT_QUERY_COMBINER_OP: the operation used by the query parser to combine query terms
 
442
 
 
443
    @cvar STEMMING_LANGUAGE: the language used by the query parser for
 
444
    stemming. this is of little use since Xapwrap does not yet support
 
445
    stemming when indexing.
 
446
 
 
447
    @ivar names: a sequence of file names representing paths to Xapian
 
448
    indices
 
449
 
 
450
    Please use the configure method to modify C{prefixMap} and C{indexValueMap}
 
451
 
 
452
    @ivar prefixMap: a map of prefixes used by named fields in the index
 
453
    and the name they should be referred to by the query parser
 
454
 
 
455
    @ivar indexValueMap: a map from sort field names to value integer
 
456
 
 
457
    @ivar amountIndexedSinceLastFlush: the number of bytes indexed since
 
458
    the last flush
 
459
 
 
460
    The following instance attributes should never be modified or
 
461
    accessed directly:
 
462
 
 
463
    @ivar db: the xapian index object
 
464
    @ivar qp: the xapian query parser object
 
465
    @ivar _searchSessions: a map from query description string to
 
466
    (enquire, lastIndexSortedBy)
 
467
    """
 
468
 
 
469
    DEFAULT_QUERY_COMBINER_OP = xapian.Query.OP_AND
 
470
    STEMMING_LANGUAGE = 'none'
 
471
 
 
472
    def __init__(self, *names):
 
473
        if len(names) < 1:
 
474
            raise ValueError("No index directory supplied to Index constructor")
 
475
        self.names = names
 
476
        self.db = None
 
477
        self.qp = None
 
478
        self._searchSessions = {}
 
479
        self.prefixMap = {}
 
480
        self.indexValueMap = {}
 
481
        self.amountIndexedSinceLastFlush = 0
 
482
 
 
483
    def setupDB(self):
 
484
        # we hide the db so that methods always access it only through
 
485
        # this method since db objects can be silently reaped when not
 
486
        # in use. db objects consume 5 file descriptors.
 
487
        
 
488
        if self.db is None:
 
489
            self._setupDB()
 
490
 
 
491
            self.qp = xapian.QueryParser()
 
492
            # this is vital: these options specify no language for
 
493
            # stemming (""), disable stemming (False), and specify an
 
494
            # empty stop word object (None). we need this because by
 
495
            # default, xapian's query parser does english stemming
 
496
            s = xapian.Stem(self.STEMMING_LANGUAGE)
 
497
            self.qp.set_stemmer(s)
 
498
 
 
499
            # we want query terms to be ANDed together by default
 
500
            self.qp.set_default_op(self.DEFAULT_QUERY_COMBINER_OP)
 
501
            self._configure()
 
502
        
 
503
            log("Index %s contains %s documents" %
 
504
                (self.names, self.get_doccount()))
 
505
 
 
506
    def _setupDB(self):
 
507
        self.db = ExceptionTranslater.openIndex(True, *self.names)
 
508
 
 
509
    def close(self):
 
510
        log("closing xapian index %s" % self.names)
 
511
        for query in self._searchSessions.keys():
 
512
            del self._searchSessions[query]
 
513
        self.qp = None
 
514
        self.db = None
 
515
 
 
516
    def _configure(self):
 
517
        if 'uid' not in self.indexValueMap:
 
518
            # this a gross hack...
 
519
            self.indexValueMap['uid'] = 0
 
520
            self.indexValueMap['uidREV'] = 1
 
521
        if self.qp is not None:
 
522
            for k, v in self.prefixMap.iteritems():
 
523
                # check for unicode encoding?
 
524
                if v:
 
525
                    V = v.upper()
 
526
                else:
 
527
                    V = k.upper()
 
528
                self.qp.set_prefix(k, V)
 
529
 
 
530
    def configure(self, prefixMap = None, indexValueMap = None):
 
531
        if prefixMap is not None:
 
532
            self.prefixMap = prefixMap
 
533
        if indexValueMap is not None:
 
534
            self.indexValueMap = indexValueMap
 
535
        self._configure()
 
536
 
 
537
    def get_doccount(self):
 
538
        return self.db.get_doccount()
 
539
    get_doccount = makeProtectedDBMethod(get_doccount)
 
540
 
 
541
    def enquire(self, query):
 
542
        searchSession = None
 
543
        try:
 
544
            searchSession = xapian.Enquire(self.db.db)
 
545
            searchSession.set_query(query)
 
546
            return searchSession
 
547
        except:
 
548
            del query, searchSession
 
549
            raise
 
550
    enquire = makeProtectedDBMethod(enquire)
 
551
 
 
552
    def flush(self):
 
553
        if self.db is not None:
 
554
            self.db.flush()
 
555
            self.amountIndexedSinceLastFlush = 0
 
556
    flush = makeProtectedDBMethod(flush)
 
557
 
 
558
    def search(self, query,
 
559
               sortKey = None,
 
560
               startingIndex = 0,
 
561
               batchSize = MAX_DOCS_TO_RETURN,
 
562
               sortIndex = None, sortAscending = True,
 
563
               sortByRelevence = False,
 
564
               valuesWanted=None):
 
565
 
 
566
        self.setupDB()
 
567
        if isinstance(query, str):
 
568
            query = ParsedQuery(query)
 
569
        elif not(isinstance(query, Query)):
 
570
            raise ValueError("query %s must be either a string or a "
 
571
                             "subclass of xapwrap.Query" % query)
 
572
 
 
573
        q = query.prepare(self.qp)
 
574
        # uggg. this mess is due to the fact that xapain Query objects
 
575
        # don't hash in a sane way.
 
576
        qString = q.get_description()
 
577
 
 
578
        # the only thing we use sortKey for is to set sort index
 
579
        if sortKey is not None:
 
580
            sortIndex = self.indexValueMap[sortKey]
 
581
 
 
582
        # once you call set_sorting on an Enquire instance, there is no
 
583
        # way to resort it by relevence, so we have to open a new
 
584
        # session instead.
 
585
 
 
586
        # ignore sortAscending since there's no easy way to implement
 
587
        # ascending relevency sorts and its tough to imagine a case
 
588
        # where you'd want to see the worst results. in any event, the
 
589
        # user can always sort by relevency and go to the last page of
 
590
        # results.
 
591
 
 
592
        enq = mset = None
 
593
        if qString not in self._searchSessions:
 
594
            self._searchSessions[qString] = (self.enquire(q), None)
 
595
        try:
 
596
            enq, lastIndexSortedBy = self._searchSessions[qString]
 
597
 
 
598
            # if we don't set sortIndex, the results will be returned
 
599
            # sorted by relevance, assuming that we have never called
 
600
            # set_sorting on this session
 
601
            if sortByRelevence and (lastIndexSortedBy is not None):
 
602
                sortIndex = sortKey = None
 
603
                if lastIndexSortedBy is not None:
 
604
                    del self._searchSessions[qString]
 
605
                    self._searchSessions[qString] = (self.enquire(q), None)
 
606
                    enq, lastIndexSortedBy = self._searchSessions[qString]
 
607
            if sortIndex is not None:
 
608
                # It seems that we have the opposite definition of sort ascending
 
609
                # than Xapian so we invert the ascending flag!
 
610
                enq.set_sort_by_value(sortIndex, not sortAscending)
 
611
                
 
612
 
 
613
            self._searchSessions[qString] = (enq, sortIndex)
 
614
 
 
615
            mset = enq.get_mset(startingIndex, batchSize)
 
616
            results = []
 
617
            for m in mset:
 
618
                thisResult = {}
 
619
                thisResult['uid']=m[xapian.MSET_DID]
 
620
                thisResult['score']=m[xapian.MSET_PERCENT]
 
621
                if valuesWanted:
 
622
                    xapDoc = m[4]
 
623
                    valRes = {}
 
624
                    for valName in valuesWanted:
 
625
                        valueIndex = self.indexValueMap.get(valName, None)
 
626
                        if valueIndex is None:
 
627
                            raise NoIndexValueFound(valName, self.indexValueMap)
 
628
                        valRes[valName]=xapDoc.get_value(valueIndex)
 
629
                    thisResult['values']=valRes
 
630
                results.append(thisResult)
 
631
            return results
 
632
        except:
 
633
            del enq, mset
 
634
            raise
 
635
    search = makeProtectedDBMethod(search)
 
636
 
 
637
    def count(self, query):
 
638
        enq = mset = None
 
639
        try:
 
640
            enq = self.enquire(query)
 
641
            # get_matches_estimated does not return accurate results if
 
642
            # given a small ending number like 0 or 1
 
643
            mset = enq.get_mset(0, MAX_DOCS_TO_RETURN)
 
644
            sizeEstimate = mset.get_matches_estimated()
 
645
            return sizeEstimate, self.get_doccount()
 
646
        except:
 
647
            del enq, mset
 
648
            raise
 
649
    count = makeProtectedDBMethod(count)
 
650
 
 
651
    def checkIndex(self, maxID):
 
652
        """Compute a list of all UIDs less than or equal to maxID that
 
653
        are not in the db.
 
654
        """
 
655
        # I had originally suspected that the performance hit of
 
656
        # returning a huge list in the case of empty indexes would be
 
657
        # substantial, but testing with a 120,000 msg index indicates
 
658
        # that performance is fine and that the space overhead is quite
 
659
        # reasonable. If that were not the case, this could be optimized
 
660
        # by calculating the maximum document ID in the index and only
 
661
        # scanning up to the minimum of maxID and the max ID in the
 
662
        # index, assuming that were using the same document IDs in the
 
663
        # index as in atop.
 
664
 
 
665
        missingUIDs = []
 
666
        for uid in xrange(maxID + 1):
 
667
            term = makePairForWrite('UID', str(uid))
 
668
            if not self.db.term_exists(term):
 
669
                missingUIDs.append(uid)
 
670
        return missingUIDs
 
671
    checkIndex = makeProtectedDBMethod(checkIndex)
 
672
 
 
673
    def get_documents(self, uid):
 
674
        # return a list of remapped UIDs corresponding to the actual UID given
 
675
        docTerm = makePairForWrite('UID', str(uid))
 
676
        candidates = self.search(RawQuery(docTerm))
 
677
        return [int(c['uid']) for c in candidates]
 
678
 
 
679
    def get_document(self, uid):
 
680
        # we cannot simply use db.get_document since doc ids get
 
681
        # remapped when combining multiple databases
 
682
        candidates = self.get_documents(uid)
 
683
        if len(candidates) == 0:
 
684
            raise DocNotFoundError(uid)
 
685
        elif len(candidates) == 1:
 
686
            return self._get_document(candidates[0])
 
687
        else:
 
688
            raise InconsistantIndex(
 
689
                "Something has gone horribly wrong. I tried "
 
690
                "retrieving document id %s but found %i documents "
 
691
                "with that document ID term" % (uid, len(candidates)))
 
692
 
 
693
    def _get_document(self, uid):
 
694
        assert isinstance(uid, int)
 
695
        return self.db.get_document(uid)
 
696
    _get_document = makeProtectedDBMethod(_get_document)
 
697
 
 
698
 
 
699
    def term_exists(self, term):
 
700
        assert isinstance(term, str)
 
701
        return self.db.term_exists(term)
 
702
    term_exists = makeProtectedDBMethod(term_exists)
 
703
 
 
704
    def get_lastdocid(self):
 
705
        return self.db.get_lastdocid()
 
706
    get_lastdocid = makeProtectedDBMethod(get_lastdocid)
 
707
 
 
708
# XXX FIXME: we should consider deleting all searchSessions whenever we
 
709
# add a document, or we should reopen the db
 
710
 
 
711
 
 
712
class Index(ReadOnlyIndex):
 
713
 
 
714
    def __init__(self, name, create = False, analyzer = None):
 
715
        # XXX FIXME: we should really try opening the db here, so that
 
716
        # any errors are caught immediately rather than waiting for the
 
717
        # first time we try to do something...
 
718
        ReadOnlyIndex.__init__(self, name)
 
719
        self.name = name
 
720
        if create:
 
721
            self.flags = xapian.DB_CREATE_OR_OPEN
 
722
        else:
 
723
            self.flags = xapian.DB_OPEN
 
724
        self.analyzer = analyzer or StandardAnalyzer()
 
725
        self.lockFile = FilesystemLock(
 
726
            os.path.join(self.name, XAPWRAP_LOCK_FILENAME))
 
727
 
 
728
    def _setupDB(self):
 
729
        "really get a xapian database object"
 
730
 
 
731
        # xapian expects directories! self.name should refer to a
 
732
        # directory. if it doesn't exist, we'll make one.
 
733
        if not(os.path.exists(self.name)):
 
734
            os.mkdir(self.name)
 
735
 
 
736
        # try to acquire a lock file
 
737
        if not self.lockFile.lock():
 
738
            owningPid = os.readlink(self.lockFile.name)
 
739
            errorMsg = ("cannot acquire lock file for xapian index %s"
 
740
                        "because it is owned by process %s" %
 
741
                        (self.name, owningPid))
 
742
            log(errorMsg)
 
743
            raise DatabaseLockError(errorMsg)
 
744
        xapLockFilePath = os.path.join(self.name, XAPIAN_LOCK_FILENAME)
 
745
        if os.path.exists(xapLockFilePath):
 
746
            log("Stale database lock found in ",
 
747
                xapLockFilePath, ". Deleting it now.")
 
748
            os.remove(xapLockFilePath)
 
749
 
 
750
        # actually try to open a xapian DB
 
751
        try:
 
752
            try:
 
753
                self.db = ExceptionTranslater.openIndex(False, self.name, self.flags)
 
754
            except DatabaseCorruptionError, e:
 
755
                # the index is trashed, so there's no harm in blowing it
 
756
                # away and starting from scratch
 
757
                log("Xapian index at %s is corrupted and will be destroyed"
 
758
                    % self.name)
 
759
                if self.lockFile.locked:
 
760
                    self.lockFile.unlock()
 
761
                for idxFname in glob.glob(os.path.join(self.name, '*')):
 
762
                    os.remove(idxFname)
 
763
                self.db = ExceptionTranslater.openIndex(False, self.name, self.flags)
 
764
        finally:
 
765
            if (self.db is None) and self.lockFile.locked:
 
766
                self.lockFile.unlock()
 
767
 
 
768
    def __del__(self):
 
769
        self.close()
 
770
 
 
771
    def close(self):
 
772
        # this is important! the only way to get xapian to release the
 
773
        # db lock is to call the db object's destructor. that won't
 
774
        # happen until nobody is holding a reference to the db
 
775
        # object. unfortunately, the query parser holds a reference to
 
776
        # it, so the query parser must also go away. do not hold
 
777
        # references to these objects anywhere but here.
 
778
 
 
779
        # enquire objects and mset objects hold a reference to the db,
 
780
        # so if any of them are left alive, the db will not be reclaimed
 
781
 
 
782
        if self.db is not None:
 
783
            ReadOnlyIndex.close(self)
 
784
            # the islink test is needed in case the index directory has
 
785
            # been deleted before we close was called.
 
786
            if self.lockFile.locked and os.path.islink(self.lockFile.name):
 
787
                self.lockFile.unlock()
 
788
            # there is no point in checking if the lock file is still
 
789
            # around right here: it will only be deleted when xapian's
 
790
            # destructor runs, but python defers running destructors
 
791
            # until after exception handling is complete. since this
 
792
            # code will often get called from an exception handler, we
 
793
            # have to assume that the lock file's removal will be
 
794
            # delayed at least until after this method exits
 
795
 
 
796
    def get_document(self, uid):
 
797
        return self._get_document(uid)
 
798
 
 
799
    # methods that modify db state
 
800
 
 
801
    def index(self, doc):
 
802
        self.setupDB()
 
803
        if hasattr(doc, 'uid') and doc.uid:
 
804
            uid = int(doc.uid)
 
805
            doc.sortFields.append(SortKey('uid', uid))
 
806
            doc.keywords.append(Keyword('uid', str(uid)))
 
807
            xapDoc = doc.toXapianDocument(self.indexValueMap)
 
808
            self.replace_document(uid, xapDoc)
 
809
        else:
 
810
            # We need to know the uid of the doc we're going to add
 
811
            # before we add it so we can setup appropriate uid sorting
 
812
            # values. But, another thread could potentially insert a
 
813
            # document at that uid after we determine the last uid, but
 
814
            # before we manage the insertion. Yay race conditions! So we
 
815
            # try to add the document and then check that it ended up at
 
816
            # the right uid. If it did not, we update it with the
 
817
            # correct uid sort values.
 
818
            uid = self.get_lastdocid() + 1
 
819
            doc.sortFields.append(SortKey('uid', uid))
 
820
            doc.keywords.append(Keyword('uid', str(uid)))
 
821
            xapDoc = doc.toXapianDocument(self.indexValueMap)
 
822
            newUID = self.add_document(xapDoc)
 
823
            if newUID != uid:
 
824
                doc.sortFields.append(SortKey('uid', newUID))
 
825
                doc.keywords.append(Keyword('uid', str(newUID)))
 
826
                xapDoc = doc.toXapianDocument(self.indexValueMap)
 
827
                self.replace_document(newUID, xapDoc)
 
828
 
 
829
            # a simpler alternative would be to add an empty document
 
830
            # and then replace it. the problem with that strategy is
 
831
            # that it kills performance since xapian performs an
 
832
            # implicit flush when you replace a document that was added
 
833
            # but not yet committed to disk.
 
834
 
 
835
        self.amountIndexedSinceLastFlush += len(doc)
 
836
        if self.amountIndexedSinceLastFlush > MAX_DATA_INDEXED_BETWEEN_FLUSHES:
 
837
            self.flush()
 
838
        return uid
 
839
 
 
840
    def add_document(self, doc):
 
841
        return self.db.add_document(doc)
 
842
    add_document = makeProtectedDBMethod(add_document)
 
843
 
 
844
    def replace_document(self, uid, doc):
 
845
        return self.db.replace_document(uid, doc)
 
846
    replace_document = makeProtectedDBMethod(replace_document)
 
847
 
 
848
    def delete_document(self, docID):
 
849
        return self.db.delete_document(docID)
 
850
    delete_document = makeProtectedDBMethod(delete_document)
 
851
 
 
852
class Query:
 
853
    pass
 
854
 
 
855
class ParsedQuery(Query):
 
856
    def __init__(self, queryString):
 
857
        self.queryString = queryString
 
858
 
 
859
    def prepare(self, queryParser):
 
860
        return queryParser.parse_query(self.queryString)
 
861
 
 
862
class RawQuery(Query):
 
863
    def __init__(self, queryString):
 
864
        assert isinstance(queryString, str)
 
865
        self.queryString = queryString
 
866
 
 
867
    def prepare(self, queryParser):
 
868
        return xapian.Query(self.queryString)
 
869
 
 
870
 
 
871
 
 
872
class SmartIndex(Index):
 
873
    documentFactory = Document
 
874
 
 
875
    def __init__(self, *args, **kwargs):
 
876
        Index.__init__(self, *args, **kwargs)
 
877
        self.fetchState()
 
878
 
 
879
    def saveState(self):
 
880
        self.setupDB()
 
881
        state = {'indexValueMap': self.indexValueMap,
 
882
                 'prefixMap': self.prefixMap}
 
883
        d = self.documentFactory(uid = 1, data = state)
 
884
        self.index(d, checkID = False)
 
885
        self.flush()
 
886
 
 
887
    def fetchState(self):
 
888
        self.setupDB()
 
889
        if self.get_doccount() == 0:
 
890
            # Don't rely on the try:except: for this case
 
891
            self.saveState()
 
892
        try:
 
893
            doc = self.get_document(1)
 
894
        except DocNotFoundError:
 
895
            newState = {'indexValueMap': {}, 'prefixMap': {}}
 
896
            self.saveState()
 
897
        else:
 
898
            dataStr = doc.get_data()
 
899
            newState = cPickle.loads(dataStr)
 
900
        self.indexValueMap.update(newState['indexValueMap'])
 
901
        self.prefixMap.update(newState['prefixMap'])
 
902
 
 
903
    def index(self, doc, checkID = True):
 
904
        if hasattr(doc, 'uid') and (doc.uid == 1) and checkID:
 
905
            raise InvalidArgumentError(
 
906
                "document UIDs must be greater than one when using SmartIndex")
 
907
 
 
908
        docSortKeys = sets.Set([sk.name for sk in doc.sortFields if sk.name is not None])
 
909
        indexSortKeys = sets.Set(self.indexValueMap.keys())
 
910
        if not docSortKeys.issubset(indexSortKeys):
 
911
            nextValueIndex = 1 + max(self.indexValueMap.itervalues())
 
912
            # we sort the sortKeys in order to improve the odds that two
 
913
            # indices that are indexed with the same documents in the
 
914
            # same order will always end up with the same
 
915
            # indexValueMaps, even if different versions of python are
 
916
            # used with different hash functions
 
917
            sortKeys = list(docSortKeys)
 
918
            sortKeys.sort()
 
919
            for sortKey in sortKeys:
 
920
                if sortKey not in self.indexValueMap:
 
921
                    assert nextValueIndex % 2 == 0
 
922
                    self.indexValueMap[sortKey] = nextValueIndex
 
923
                    self.indexValueMap[sortKey + 'REV'] = nextValueIndex + 1
 
924
                    nextValueIndex += 2
 
925
            self.saveState()
 
926
 
 
927
        docKeywords = sets.Set([tf.name for tf in doc.textFields if tf.prefix] +
 
928
                               [kw.name for kw in doc.keywords])
 
929
        indexKeyWords = sets.Set(self.prefixMap.keys())
 
930
        if not docKeywords.issubset(indexKeyWords):
 
931
            for k in docKeywords - indexKeyWords:
 
932
                self.prefixMap[k] = k.upper()
 
933
            self.saveState()
 
934
 
 
935
        return Index.index(self, doc)
 
936
 
 
937
 
 
938
class SmartReadOnlyIndex(ReadOnlyIndex):
 
939
 
 
940
    def __init__(self, *args, **kwargs):
 
941
        ReadOnlyIndex.__init__(self, *args, **kwargs)
 
942
        self.fetchState()
 
943
 
 
944
    def fetchState(self):
 
945
        stateDocIDs = self.get_documents(1)
 
946
        stateDocs = map(self._get_document, stateDocIDs)
 
947
        states = [cPickle.loads(s.get_data()) for s in stateDocs]
 
948
 
 
949
        # should we issue a warning when the number of states that we
 
950
        # retrieve is less than the number of indices we opened? the
 
951
        # only problem is that some indices may be empty, but there's no
 
952
        # easy way to check how many documents are in a subindex without
 
953
        # opening it explicitly using xapian.Database and that seems
 
954
        # rather expensive for this code path.
 
955
 
 
956
        # merge all the states into a master state
 
957
        master = {'prefixMap': self.prefixMap,
 
958
                  'indexValueMap': self.indexValueMap}
 
959
        # note that if there are conflicts, there is no garuntee on who
 
960
        # will win, but it doesn't matter since we'll die on conflicts
 
961
        # later anyway
 
962
        for s in states:
 
963
            for substate in ('prefixMap', 'indexValueMap'):
 
964
                sub = s.get(substate, {})
 
965
                mSub = master[substate]
 
966
                for k, v in sub.iteritems():
 
967
                    mSub[k] = v
 
968
 
 
969
        # ensure that states are compatible (check for conflicts)
 
970
        conflicts = []
 
971
        for s in states:
 
972
            for substate in ('prefixMap', 'indexValueMap'):
 
973
                sub = s.get(substate, {})
 
974
                mSub = master[substate]
 
975
                for k, v in sub.iteritems():
 
976
                    if k in mSub and (mSub[k] != v):
 
977
                        # we defer error reporting so that the user sees
 
978
                        # as much info on the error as possible
 
979
                        conflicts.append((substate, k, v, mSub[k]))
 
980
 
 
981
        # the only way states can be incompatible is if two states have
 
982
        # different values for the same keys in the same substate
 
983
 
 
984
        if conflicts:
 
985
            raise InconsistantIndexCombination(
 
986
                "The SmartReadOnlyIndex opened on %s cannot recconcile "
 
987
                "the following conflicts in the subindices' states:\n%s"
 
988
                % (self.names,
 
989
                   '\n'.join(["%s[%r] is %r in one index but %r in another"
 
990
                              % c for c in conflicts])))
 
991
 
 
992
        self.prefixMap = master['prefixMap']
 
993
        self.indexValueMap = master['indexValueMap']
 
994
 
 
995
    def search(self, query, sortKey = None,
 
996
               startingIndex = 0,
 
997
               batchSize = MAX_DOCS_TO_RETURN,
 
998
               sortIndex = None, sortAscending = True,
 
999
               sortByRelevence = False):
 
1000
        # if the appropriate index value string is not in
 
1001
        # self.indexValueMap, fetchState() before calling
 
1002
        # ReadOnlyIndex.search. if it still isn't there, let
 
1003
        # ReadOnlyIndex.search take care of throwing an error
 
1004
        if (sortKey is not None) and (sortKey not in self.indexValueMap):
 
1005
            self.fetchState()
 
1006
        return ReadOnlyIndex.search(self, query, sortKey,
 
1007
                                    startingIndex, batchSize,
 
1008
                                    sortIndex, sortAscending,
 
1009
                                    sortByRelevence)