from MoinMoin import log
logging = log.getLogger(__name__)

from MoinMoin import wikiutil, config, caching
from MoinMoin.Page import Page
from MoinMoin.util import lock, filesys
from MoinMoin.search.results import getSearchResults
from MoinMoin.search.queryparser import Match, TextMatch, TitleMatch

##############################################################################
# Search Engine Abstraction
##############################################################################

class IndexerQueue(object):
    """
    Represents a locked on-disk queue with jobs for the xapian indexer

    Each job is a tuple like: (PAGENAME, ATTACHMENTNAME, REVNO)
    PAGENAME: page name (unicode)
    ATTACHMENTNAME: attachment name (unicode) or None (for pages)
    REVNO: revision number (int) - meaning "look at that revision",
           or None - meaning "look at all revisions"
    """

    def __init__(self, request, xapian_dir, queuename, timeout=10.0):
        """
        @param request: request object
        @param xapian_dir: the xapian main directory
        @param queuename: name of the queue (used for caching key)
        @param timeout: lock acquire timeout
        """
        self.request = request
        self.xapian_dir = xapian_dir
        self.queuename = queuename
        self.timeout = timeout

    def get_cache(self, locking):
        return caching.CacheEntry(self.request, self.xapian_dir, self.queuename,
                                  scope='dir', use_pickle=True, do_locking=locking)

    def _queue(self, cache):
        try:
            queue = cache.content()
        except caching.CacheError:
            # likely nothing there yet
            queue = []
        return queue

    def put(self, pagename, attachmentname=None, revno=None):
        """ Put an entry into the queue (append at end)

        @param pagename: page name [unicode]
        @param attachmentname: attachment name [unicode]
        @param revno: revision number (int) or None (all revs)
        """
        cache = self.get_cache(locking=False) # we lock manually
        cache.lock('w', 60.0)
        try:
            queue = self._queue(cache)
            entry = (pagename, attachmentname, revno)
            queue.append(entry)
            cache.update(queue)
        finally:
            cache.unlock()

    def get(self):
        """ Get (and remove) first entry from the queue

        Raises IndexError if queue was empty when calling get().
        """
        cache = self.get_cache(locking=False) # we lock manually
        cache.lock('w', 60.0)
        try:
            queue = self._queue(cache)
            entry = queue.pop(0)
            cache.update(queue)
        finally:
            cache.unlock()
        return entry
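# A minimal usage sketch for IndexerQueue (not part of the original module).
# It assumes a configured `request` object and an existing xapian directory
# `xapian_dir`; both names are placeholders here.
#
#   queue = IndexerQueue(request, xapian_dir, 'indexer-queue')
#   queue.put(u'FrontPage')                    # reindex all revisions of a page
#   queue.put(u'FrontPage', u'diagram.png')    # reindex a single attachment
#   queue.put(u'FrontPage', revno=42)          # reindex one specific revision
#   try:
#       job = queue.get()    # -> (u'FrontPage', None, None), FIFO order
#   except IndexError:
#       pass                 # queue was empty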

class UpdateQueue:
    """ Represents a locked page queue on the disk

        XXX: check whether we just can use the caching module
    """
    def __init__(self, f, lock_dir):
        """
        @param f: file to write to
        @param lock_dir: directory to save the lock files
        """
        self.file = f
        self.writeLock = lock.WriteLock(lock_dir, timeout=10.0)
        self.readLock = lock.ReadLock(lock_dir, timeout=10.0)

    def exists(self):
        """ Checks if the queue exists on the filesystem """
        return os.path.exists(self.file)

    def append(self, pagename):
        """ Append a page to queue

        @param pagename: string to save
        """
        if not self.writeLock.acquire(60.0):
            logging.warning("can't add %r to xapian update queue: can't lock queue" % pagename)
            return
        try:
            f = codecs.open(self.file, 'a', config.charset)
            try:
                f.write(pagename + "\n")
            finally:
                f.close()
        finally:
            self.writeLock.release()

    def pages(self):
        """ Return list of pages in the queue """
        if self.readLock.acquire(1.0):
            try:
                return self._decode(self._read())
            finally:
                self.readLock.release()
        return []

    def remove(self, pages):
        """ Remove pages from the queue

        When the queue is empty, the queue file is removed, so exists()
        can tell if there is something waiting in the queue.

        @param pages: list of pagenames to remove
        """
        if self.writeLock.acquire(30.0):
            try:
                queue = self._decode(self._read())
                for page in pages:
                    try:
                        queue.remove(page)
                    except ValueError:
                        pass
                if queue:
                    self._write(queue)
                else:
                    self._removeFile()
                return True
            finally:
                self.writeLock.release()
        return False

    # Private -------------------------------------------------------

    def _decode(self, data):
        """ Decode queue data

        @param data: the data to decode
        """
        pages = data.splitlines()
        return self._filterDuplicates(pages)

    def _filterDuplicates(self, pages):
        """ Filter duplicates in page list, keeping the order

        @param pages: list of pages to filter
        """
        unique = []
        seen = {}
        for name in pages:
            if name not in seen:
                unique.append(name)
                seen[name] = 1
        return unique

    def _read(self):
        """ Read and return queue data

        This does not do anything with the data so we can release the
        lock as soon as possible, enabling others to update the queue.
        """
        try:
            f = codecs.open(self.file, 'r', config.charset)
            try:
                return f.read()
            finally:
                f.close()
        except (OSError, IOError), err:
            if err.errno != errno.ENOENT:
                raise
            return ''

    def _write(self, pages):
        """ Write pages to queue file

        Requires queue write locking.

        @param pages: list of pages to write
        """
        # XXX use tmpfile/move for atomic replace on real operating systems
        data = '\n'.join(pages) + '\n'
        f = codecs.open(self.file, 'w', config.charset)
        try:
            f.write(data)
        finally:
            f.close()

    def _removeFile(self):
        """ Remove queue file

        Requires queue write locking.
        """
        try:
            os.remove(self.file)
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise
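
# Illustrative sketch of how UpdateQueue is typically driven (not part of the
# original module); `data_dir` is a placeholder path. The queue is a plain text
# file with one page name per line, deduplicated when it is read back.
#
#   queue = UpdateQueue(os.path.join(data_dir, 'update-queue'),
#                       os.path.join(data_dir, 'update-queue-lock'))
#   queue.append(u'FrontPage')        # remember that FrontPage needs reindexing
#   for name in queue.pages():        # later, e.g. from the indexer
#       pass                          # ... reindex `name` ...
#   queue.remove([u'FrontPage'])      # drop processed entries; file vanishes when empty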

class BaseIndex(object):
    """ Represents a search engine index """

    class LockedException(Exception):
        pass

    def __init__(self, request):
        """
        @param request: current request
        """
        self.request = request
        self.main_dir = self._main_dir()
        if not os.path.exists(self.main_dir):
            os.makedirs(self.main_dir)
        self.update_queue = IndexerQueue(request, self.main_dir, 'indexer-queue')

        main_dir = self._main_dir()
        self.dir = os.path.join(main_dir, 'index')
        if not os.path.exists(self.dir):
            os.makedirs(self.dir)
        self.sig_file = os.path.join(main_dir, 'complete')
        lock_dir = os.path.join(main_dir, 'index-lock')
        self.lock = lock.WriteLock(lock_dir, timeout=3600.0, readlocktimeout=60.0)
        #self.read_lock = lock.ReadLock(lock_dir, timeout=3600.0)
        self.update_queue = UpdateQueue(os.path.join(main_dir, 'update-queue'),
                                        os.path.join(main_dir, 'update-queue-lock'))
        self.remove_queue = UpdateQueue(os.path.join(main_dir, 'remove-queue'),
                                        os.path.join(main_dir, 'remove-queue-lock'))

        # Disabled until we have a sane way to build the index with a
        # queue in small steps.
        ## if not self.exists():
        ##     self.indexPagesInNewThread(request)

    def _main_dir(self):
        raise NotImplementedError

    def exists(self):
        """ Check if index exists """
        return os.path.exists(self.sig_file)

    def mtime(self):
        """ Modification time of the index """
        return os.path.getmtime(self.dir)

    def touch(self):
        """ Touch the index """
        filesys.touch(self.dir)

    def _search(self, query):
        """ Actually perform the search (read-lock acquired)

        @param query: the search query objects tree
        """
        raise NotImplementedError

    def search(self, query, **kw):
        """ Search for items in the index

        @param query: the search query objects to pass to the index
        """
        #if not self.read_lock.acquire(1.0):
        #    raise self.LockedException
        #try:
        hits = self._search(query, **kw)
        #finally:
        #    self.read_lock.release()
        return hits

    def update_item(self, pagename, attachmentname=None, revno=None, now=True):
        """ Update a single item (page or attachment) in the index

        @param pagename: the name of the page to update
        @param attachmentname: the name of the attachment to update
        @param revno: a specific revision number (int) or None (all revs)
        @param now: do all updates now (default: True)
        """
        self.update_queue.put(pagename, attachmentname, revno)
        if now:
            self.do_queued_updates()

    def update_page(self, pagename, now=1):
        """ Update a single page in the index

        @param pagename: the name of the page to update
        @keyword now: do all updates now (default: 1)
        """
        self.update_queue.append(pagename)
        if now:
            self._do_queued_updates_InNewThread()
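
    # Usage sketch (not part of the original module): how callers typically
    # schedule index maintenance, e.g. after a page save. `index` stands for a
    # concrete BaseIndex subclass instance.
    #
    #   index.update_item(u'FrontPage')                  # queue page, flush queue now
    #   index.update_item(u'FrontPage', u'logo.png',
    #                     revno=7, now=False)            # just queue, flush later
    #   index.update_page(u'FrontPage', now=0)           # older queue-file based API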

    def remove_item(self, pagename, attachment=None, now=1):
        """ Removes a page and all its revisions or a single attachment

        @param pagename: name of the page to be removed
        @keyword attachment: optional, only remove this attachment of the page
        @keyword now: do all updates now (default: 1)
        """
        self.remove_queue.append('%s//%s' % (pagename, attachment or ''))
        if now:
            self._do_queued_updates_InNewThread()

    def indexPages(self, files=None, mode='update', pages=None):
        """ Index pages (and files, if given)

        Can be called only from a script. To index pages during a user
        request, use indexPagesInNewThread.

        @param files: iterator or list of files to index additionally
        @param mode: set the mode of indexing the pages, either 'update', 'add' or 'rebuild'
        @param pages: list of pages to index, if not given, all pages are indexed
        """
        if not self.lock.acquire(1.0):
            logging.warning("can't index: can't acquire lock")
            return
        try:
            start = time.time()
            request = self._indexingRequest(self.request)
            self._index_pages(request, files, mode, pages=pages)
            logging.info("indexing completed successfully in %0.2f seconds." %
                         (time.time() - start))
        finally:
            self.lock.release()

    def indexPagesInNewThread(self, files=None, mode='update'):
        """ Index all pages in a new thread

        Should be called from a user request. From a script, use indexPages.
        """
        # Prevent rebuilding the index just after it was finished
        if self.exists():
            return

        from threading import Thread
        indexThread = Thread(target=self._index_pages, args=(files, mode))
        indexThread.setDaemon(True)

        # Join the index thread after current request finish, prevent
        # Apache CGI from killing the process.
        def joinDecorator(finish):
            def func():
                finish()
                indexThread.join()
            return func

        self.request.finish = joinDecorator(self.request.finish)
        indexThread.start()

    def _index_pages(self, request, files=None, mode='update', pages=None):
        """ Index all pages (and all given files)

        This should be called from indexPages or indexPagesInNewThread only!

        This may take some time, depending on the size of the wiki and speed
        of the machine.

        When called in a new thread, lock is acquired before the call,
        and this method must release it when it finishes or fails.

        @param request: current request
        @param files: iterator or list of files to index additionally
        @param mode: set the mode of indexing the pages, either 'update' or 'add'
        @param pages: list of pages to index, if not given, all pages are indexed
        """
        raise NotImplementedError

    def do_queued_updates(self, amount=-1):
        """ Perform updates in the queues

        @keyword amount: how many updates to perform at once (default: -1 == all)
        """
        raise NotImplementedError

    def _remove_item(self, writer, page, attachment=None):
        """ Remove a page and all its revisions from the index or just
            an attachment of that page

        @param page: the page to remove
        @keyword attachment: optionally, just remove this attachment
        """
        raise NotImplementedError

    def _do_queued_updates_InNewThread(self):
        """ do queued index updates in a new thread

        Should be called from a user request. From a script, use indexPages.
        """
        if not self.lock.acquire(1.0):
            logging.warning("can't index: can't acquire lock")
            return
        try:
            def lockedDecorator(f):
                def func(*args, **kwargs):
                    try:
                        return f(*args, **kwargs)
                    finally:
                        self.lock.release()
                return func

            from threading import Thread
            indexThread = Thread(
                target=lockedDecorator(self._do_queued_updates),
                args=(self._indexingRequest(self.request), ))
            indexThread.setDaemon(True)

            # Join the index thread after current request finish, prevent
            # Apache CGI from killing the process.
            def joinDecorator(finish):
                def func():
                    finish()
                    indexThread.join()
                return func

            self.request.finish = joinDecorator(self.request.finish)
            indexThread.start()
        except:
            self.lock.release()
            raise

    def _do_queued_updates(self, request, amount=5):
        """ Perform updates in the queues (read-lock acquired)

        @param request: the current request
        @keyword amount: how many updates to perform at once (default: 5)
        """
        raise NotImplementedError

class BaseSearch(object):

    def run(self):
        """ Perform search and return results object """
        start = time.time()

        if self.request.cfg.xapian_search:
            hits = self._xapianSearch()
            logging.debug("_xapianSearch found %d hits" % len(hits))
        else:
            hits = self._moinSearch()
            logging.debug("_moinSearch found %d hits" % len(hits))

        # important - filter deleted pages or pages the user may not read!
        if not self.filtered:
            hits = self._filter(hits)
            logging.debug("after filtering: %d hits" % len(hits))

        # when xapian was used, we can estimate the number of matches
        # Note: hits can't be estimated by xapian with historysearch enabled
        if not self.request.cfg.xapian_index_history and hasattr(self, '_xapianMset'):
            _ = self.request.getText
            mset = self._xapianMset
            m_lower = mset.get_matches_lower_bound()
            m_estimated = mset.get_matches_estimated()
            m_upper = mset.get_matches_upper_bound()
            estimated_hits = (m_estimated == m_upper and m_estimated == m_lower
                              and '' or _('about'), m_estimated)
        else:
            estimated_hits = None

        return self._get_search_results(hits, start, estimated_hits)

    def _search(self):
        """ Search pages

        Return list of tuples (wikiname, page object, attachment,
        matches, revision) and estimated number of search results (if
        there is no estimate, None should be returned).

        The list may contain deleted pages or pages the user may not read.
        """
        raise NotImplementedError()

    def _filter(self, hits):
        """ Filter out deleted or acl protected pages

        @param hits: list of hits
        """
        userMayRead = self.request.user.may.read
        fs_rootpage = self.fs_rootpage + "/"
        thiswiki = (self.request.cfg.interwikiname, 'Self')
        filtered = [(wikiname, page, attachment, match, rev)
                    for wikiname, page, attachment, match, rev in hits
                    if (not wikiname in thiswiki or
                        page.exists() and userMayRead(page.page_name) or
                        page.page_name.startswith(fs_rootpage)) and
                       (not self.mtime or self.mtime <= page.mtime_usecs()/1000000)]
        return filtered

    def _get_search_results(self, hits, start, estimated_hits):
        return getSearchResults(self.request, self.query, hits, start, self.sort, estimated_hits)

    def _get_match(self, page=None, uid=None):
        raise NotImplementedError()
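
    # Data-shape sketch (not part of the original module): hits are 5-tuples
    # and run() may pair them with an estimated hit count, e.g.
    #
    #   hit = (u'Self', page, None, [TextMatch(start=10, end=14)], 0)
    #   estimated_hits = ('about', 124)   # or ('', 124) when the count is exact,
    #                                     # or None when xapian gave no estimate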

    # ----------------------------------------------------------------

    def _xapianIndex(request):
        """ Get the xapian index if possible

        @param request: current request
        """
        try:
            from MoinMoin.search.Xapian import Index
            index = Index(request)
        except ImportError:
            return None

        if index.exists():
            return index
    _xapianIndex = staticmethod(_xapianIndex)

    def _xapianSearch(self):
        """ Search using Xapian

        Get a list of pages using fast xapian search and
        return moin search in those pages if needed.
        """
        clock = self.request.clock
        pages = None
        index = self._xapianIndex(self.request)

        if index and self.query.xapian_wanted():
            clock.start('_xapianSearch')
            try:
                from MoinMoin.support import xapwrap

                clock.start('_xapianQuery')
                query = self.query.xapian_term(self.request, index.allterms)
                description = str(query)
                logging.debug("_xapianSearch: query = %r" % description)
                query = xapwrap.index.QObjQuery(query)
                enq, mset, hits = index.search(query, sort=self.sort,
                                               historysearch=self.historysearch)
                clock.stop('_xapianQuery')

                logging.debug("_xapianSearch: finds: %r" % hits)

                def dict_decode(d):
                    """ decode dict values to unicode """
                    for key in d:
                        d[key] = d[key].decode(config.charset)
                    return d
                pages = [dict_decode(hit['values']) for hit in hits]
                logging.debug("_xapianSearch: finds pages: %r" % pages)

                self._xapianEnquire = enq
                self._xapianMset = mset
                self._xapianIndex = index
            except BaseIndex.LockedException:
                pass
            #except AttributeError:
            #    pass

            try:
                # xapian handled the full query
                if not self.query.xapian_need_postproc():
                    clock.start('_xapianProcess')
                    try:
                        return self._getHits(hits, self._xapianMatch)
                    finally:
                        clock.stop('_xapianProcess')
            finally:
                clock.stop('_xapianSearch')
        else:
            if not index:
                # we didn't use xapian in this request because we have no index,
                # so we can just disable it until admin builds an index and
                # restarts moin processes
                self.request.cfg.xapian_search = 0

        # some postprocessing by _moinSearch is required
        return self._moinSearch(pages)

    def _xapianMatchDecider(self, term, pos):
        """ Returns correct Match object for a Xapian match

        @param term: the term as string
        @param pos: starting position of the match
        """
        if term[0] == 'S': # TitleMatch
            return TitleMatch(start=pos, end=pos+len(term)-1)
        else: # TextMatch (incl. headers)
            return TextMatch(start=pos, end=pos+len(term))
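
    # Sketch (not part of the original module) of how the decider above maps
    # matching Xapian terms to match objects; 'S'-prefixed terms are title terms.
    #
    #   self._xapianMatchDecider('SFrontPage', 0)   # -> TitleMatch(start=0, end=9)
    #   self._xapianMatchDecider('wiki', 42)        # -> TextMatch(start=42, end=46)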

    def _xapianMatch(self, uid, page=None):
        """ Get all relevant Xapian matches per document id

        @param uid: the id of the document in the xapian index
        """
        positions = {}
        term = self._xapianEnquire.get_matching_terms_begin(uid)
        while term != self._xapianEnquire.get_matching_terms_end(uid):
            term_name = term.get_term()
            for pos in self._xapianIndex.termpositions(uid, term.get_term()):
                if pos not in positions or \
                        len(positions[pos]) < len(term_name):
                    positions[pos] = term_name
            term.next()
        matches = [self._xapianMatchDecider(term, pos) for pos, term
                   in positions.iteritems()]

        if not matches:
            return [Match()] # dummy for metadata, we got a match!

        return matches

    def _moinSearch(self, pages=None):
        """ Search pages using moin's built-in full text search

        Return list of tuples (page, match). The list may contain
        deleted pages or pages the user may not read.

        @keyword pages: optional list of pages to search in
        """
        self.request.clock.start('_moinSearch')
        if pages is None:
            # if we are not called from _xapianSearch, we make a full pagelist,
            # but don't search attachments (thus attachment name = '')
            pages = [{'pagename': p, 'attachment': '', 'wikiname': 'Self', } for p in self._getPageList()]
        hits = self._getHits(pages, self._moinMatch)
        self.request.clock.stop('_moinSearch')
        return hits

    def _moinMatch(self, page, uid=None):
        """ Get all matches from regular moinSearch

        @param page: the current page instance
        """
        return self.query.search(page)

    def _getHits(self, pages):
        """ Get the hit tuples in pages through _get_match """
        logging.debug("_getHits searching in %d pages ..." % len(pages))
        hits = []
        revisionCache = {}
        fs_rootpage = self.fs_rootpage
        for hit in pages:
            uid = hit.get('uid')
            wikiname = hit['wikiname']
            pagename = hit['pagename']
            attachment = hit['attachment']
            revision = int(hit.get('revision', 0))

            logging.debug("_getHits processing %r %r %d %r" % (wikiname, pagename, revision, attachment))

            if wikiname in (self.request.cfg.interwikiname, 'Self'): # THIS wiki
                page = Page(self.request, pagename, rev=revision)

                if not self.historysearch and revision:
                    revlist = page.getRevList()
                    # revlist can be empty if page was nuked/renamed since it was included in xapian index
                    if not revlist or revlist[0] != revision:
                        # nothing there at all or not the current revision
                        logging.debug("no history search, skipping non-current revision...")
                        continue

                if attachment:
                    # revision currently is 0 ever
                    if pagename == fs_rootpage: # not really an attachment
                        page = Page(self.request, "%s/%s" % (fs_rootpage, attachment))
                        hits.append((wikiname, page, None, None, revision))
                    else:
                        matches = self._get_match(page=None, uid=uid)
                        hits.append((wikiname, page, attachment, matches, revision))
                else:
                    matches = self._get_match(page=page, uid=uid)
                    logging.debug("self._get_match %r" % matches)
                    if matches:
                        if not self.historysearch and \
                                pagename in revisionCache and \
                                revisionCache[pagename][0] < revision:
                            hits.remove(revisionCache[pagename][1])
                            del revisionCache[pagename]
                        hits.append((wikiname, page, attachment, matches, revision))
                        revisionCache[pagename] = (revision, hits[-1])
            else: # other wiki
                hits.append((wikiname, pagename, attachment, None, revision))
        logging.debug("_getHits returning %r." % hits)
        return hits

class MoinSearch(BaseSearch):

    def __init__(self, request, query, sort='weight', mtime=None, historysearch=0, pages=None):
        super(MoinSearch, self).__init__(request, query, sort, mtime, historysearch)

        self.pages = pages

    def _search(self):
        """ Search pages using moin's built-in full text search

        The list may contain deleted pages or pages the user may not
        read.

        If self.pages is not None, searches in those pages.
        """
        self.request.clock.start('_moinSearch')

        # if self.pages is none, we make a full pagelist, but don't
        # search attachments (thus attachment name = '')
        pages = self.pages or [{'pagename': p, 'attachment': '', 'wikiname': 'Self', } for p in self._getPageList()]

        hits = self._getHits(pages)

        self.request.clock.stop('_moinSearch')

        return hits, None

    def _getPageList(self):
        """ Get list of pages to search in