# -.- coding: utf-8 -.-

# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
# Copyright © 2010 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# TODO:
# - Delete events hook
# - ? Filter on StorageState
# - Throttle IO and CPU where possible
34
from xdg import BaseDirectory
37
from xml.dom import minidom
40
from Queue import Queue, Empty
44
from zeitgeist.datamodel import Symbol, StorageState, ResultType, TimeRange, NULL_EVENT
45
from _zeitgeist.engine.datamodel import Event, Subject
46
from _zeitgeist.engine.extension import Extension
47
from _zeitgeist.engine import constants
48
from zeitgeist.datamodel import Interpretation, Manifestation
50
logging.basicConfig(level=logging.DEBUG)
51
log = logging.getLogger("zeitgeist.fts")
53
INDEX_FILE = os.path.join(constants.DATA_PATH, "fts.index")
54
FTS_DBUS_OBJECT_PATH = "/org/gnome/zeitgeist/index/activity"
55
FTS_DBUS_INTERFACE = "org.gnome.zeitgeist.Index"
57
FILTER_PREFIX_EVENT_INTERPRETATION = "ZGEI"
58
FILTER_PREFIX_EVENT_MANIFESTATION = "ZGEM"
59
FILTER_PREFIX_ACTOR = "ZGA"
60
FILTER_PREFIX_SUBJECT_INTERPRETATION = "ZGSI"
61
FILTER_PREFIX_SUBJECT_MANIFESTATION = "ZGSM"
66
# When sorting by of the COALESCING_RESULT_TYPES result types,
67
# we need to fetch some extra events from the Xapian index because
68
# the final result set will be coalesced on some property of the event
69
COALESCING_RESULT_TYPES = [ \
70
ResultType.MostRecentSubjects,
71
ResultType.LeastRecentSubjects,
72
ResultType.MostPopularSubjects,
73
ResultType.LeastPopularSubjects,
74
ResultType.MostRecentActor,
75
ResultType.LeastRecentActor,
76
ResultType.MostPopularActor,
77
ResultType.LeastPopularActor,
80
class SearchEngineExtension (Extension, dbus.service.Object):
82
Full text indexing and searching extension for Zeitgeist
86
def __init__ (self, engine):
87
Extension.__init__(self, engine)
88
dbus.service.Object.__init__(self, dbus.SessionBus(),
90
self._indexer = Indexer(self.engine)
92
def pre_insert_event(self, event, sender):
93
# Fix when Zeitgeist 0.5.1 hits the street use post_insert_event() instead
94
self._indexer.index_event (event)
97
@dbus.service.method(FTS_DBUS_INTERFACE,
98
in_signature="s(xx)a("+constants.SIG_EVENT+")uuu",
99
out_signature="a("+constants.SIG_EVENT+")u")
100
def Search(self, query_string, time_range, filter_templates, offset, count, result_type):
102
DBus method to perform a full text search against the contents of the
103
Zeitgeist log. Returns an array of events.
105
time_range = TimeRange(time_range[0], time_range[1])
106
filter_templates = map(Event, filter_templates)
107
events, hit_count = self._indexer.search(query_string, time_range,
109
offset, count, result_type)
110
return self._make_events_sendable (events), hit_count
112
def _make_events_sendable(self, events):
    """
    Prepare `events` for transmission over DBus.

    Marks every non-None event as DBus-sendable in place, then returns
    the list with each None entry replaced by NULL_EVENT so the
    serialized array contains no holes.
    """
    # NOTE(review): the iteration header was missing in the broken
    # original ("if event is not None:" had no enclosing loop); the
    # comprehension below fixes the intended traversal over `events`.
    for event in events:
        if event is not None:
            event._make_dbus_sendable()
    return [NULL_EVENT if event is None else event for event in events]
120
Abstraction of the FT indexer and search engine
123
QUERY_PARSER_FLAGS = xapian.QueryParser.FLAG_PHRASE | \
124
xapian.QueryParser.FLAG_BOOLEAN | \
125
xapian.QueryParser.FLAG_LOVEHATE | \
126
xapian.QueryParser.FLAG_WILDCARD
128
def __init__ (self, engine):
129
self._engine = engine
131
log.debug("Opening full text index: %s" % INDEX_FILE)
132
self._index = xapian.WritableDatabase(INDEX_FILE, xapian.DB_CREATE_OR_OPEN)
133
self._tokenizer = indexer = xapian.TermGenerator()
134
self._query_parser = xapian.QueryParser()
135
self._query_parser.set_database (self._index)
136
self._query_parser.add_prefix("name", "N")
137
self._query_parser.add_prefix("title", "N")
138
self._query_parser.add_prefix("site", "S")
139
self._query_parser.add_prefix("app", "A")
140
self._query_parser.add_boolean_prefix("zgei", FILTER_PREFIX_EVENT_INTERPRETATION)
141
self._query_parser.add_boolean_prefix("zgem", FILTER_PREFIX_EVENT_MANIFESTATION)
142
self._query_parser.add_boolean_prefix("zga", FILTER_PREFIX_ACTOR)
143
self._query_parser.add_boolean_prefix("zgsi", FILTER_PREFIX_SUBJECT_INTERPRETATION)
144
self._query_parser.add_boolean_prefix("zgsm", FILTER_PREFIX_SUBJECT_MANIFESTATION)
145
self._query_parser.add_valuerangeprocessor(
146
xapian.NumberValueRangeProcessor(VALUE_EVENT_ID, "id", True))
147
self._query_parser.add_valuerangeprocessor(
148
xapian.NumberValueRangeProcessor(VALUE_TIMESTAMP, "ms", False))
149
self._query_parser.set_default_op(xapian.Query.OP_AND)
150
self._enquire = xapian.Enquire(self._index)
152
gobject.threads_init()
154
self._queue = Queue(0)
155
self._worker = threading.Thread(target=self._worker_thread,
157
self._worker.daemon = True
160
if self._index.get_doccount() == 0:
161
# We must delay reindexing until after the engine is done setting up
162
log.info("Empty index detected. Doing full rebuild")
163
gobject.idle_add (self._reindex)
166
#for term in self._index.allterms():
167
# print term.term, term.termfreq
171
Index everything in the ZG log
173
all_events = self._engine.find_events(TimeRange.always(),
174
[], StorageState.Any,
176
ResultType.MostRecentEvents)
177
log.info("Preparing to index %s events" % len(all_events))
178
for e in all_events : self._queue.put(e)
180
def index_event (self, event):
    """
    Schedule `event` for indexing and return immediately.

    The actual work is deferred to the bottom-half worker thread via
    the internal queue, so calling this never blocks the main loop of
    the Zeitgeist daemon (indexing may be a heavy operation).
    """
    self._queue.put(event)
190
def search (self, query_string, time_range=None, filters=None, offset=0, maxhits=10, result_type=100):
192
Do a full text search over the indexed corpus. The `result_type`
193
parameter may be a zeitgeist.datamodel.ResultType or 100. In case it is
194
100 the textual relevancy of the search engine will be used to sort the
195
results. Result type 100 is the fastest (and default) mode.
197
The filters argument should be a list of event templates.
199
# Expand event template filters if necessary
201
query_string = "(%s) AND (%s)" % (query_string, self._compile_event_filter_query (filters))
203
# Expand time range value query
204
if time_range and not time_range.is_always():
205
query_string = "(%s) AND (%s)" % (query_string, self._compile_time_range_filter_query (time_range))
207
# If the result type coalesces the events we need to fetch some extra
208
# events from the index to have a chance of actually holding 'maxhits'
210
if result_type in COALESCING_RESULT_TYPES:
211
raw_maxhits = maxhits * 3
213
raw_maxhits = maxhits
216
query_start = time.time()
217
query = self._query_parser.parse_query (query_string,
218
self.QUERY_PARSER_FLAGS)
219
self._enquire.set_query (query)
220
hits = self._enquire.get_mset (offset, raw_maxhits)
221
hit_count = hits.get_matches_estimated()
222
log.debug("Search '%s' gave %s hits in %sms" %
223
(query_string, hits.get_matches_estimated(), (time.time() - query_start)*1000))
225
if result_type == 100:
228
event_id = int(xapian.sortable_unserialise(
229
m.document.get_value(VALUE_EVENT_ID)))
230
log.debug("%i: %i%% docid=%i eventid=%s" %
231
(m.rank + 1, m.percent, m.docid, event_id))
232
event_ids.append (event_id)
234
return self._engine.get_events(ids=event_ids), hit_count
240
event_id = int(xapian.sortable_unserialise(
241
m.document.get_value(VALUE_EVENT_ID)))
242
log.debug("%i: %i%% docid=%i eventid=%s" %
243
(m.rank + 1, m.percent, m.docid, event_id))
245
ev[0][Event.Id] = str(event_id)
249
return self._engine._find_events(1, TimeRange.always(),
253
result_type), hit_count
257
def _worker_thread (self):
260
# FIXME: Throttle IO and CPU
262
# If we are dirty wait a while before we flush,
263
# or if we are clean wait indefinitely to avoid
266
event = self._queue.get(True, 0.5)
268
event = self._queue.get(True)
270
self._index_event_real (event)
274
# Write changes to disk
275
log.debug("Committing FTS index")
279
log.debug("No changes to index. Sleeping")
281
def _split_uri (self, uri):
283
Returns a triple of (scheme, host, and path) extracted from `uri`
295
if uri[i+1] == "/" and uri[i+2] == "/":
296
j = uri.find("/", i+3)
305
# Strip out URI query part
310
return scheme, host, path
312
def _get_appinfo (self, app_id):
    """
    Look up and return a gio.AppInfo for the application identified
    by `app_id` (a desktop-file id, eg. 'gedit.desktop').
    """
    # FIXME: Use an LRUCache for appinfos
    return gio.unix.DesktopAppInfo(app_id)
319
def _index_actor (self, actor):
321
Takes an actor as a path to a .desktop file or app:// uri
322
and index the contents of the corresponding .desktop file
323
into the document currently set for self._tokenizer.
325
# Get the path of the .desktop file and convert it to
326
# an app id (eg. 'gedit.desktop')
327
scheme, host, path = self._split_uri(actor)
332
log.debug("Unable to determine application id for %s" % actor)
335
if path.startswith("/") :
336
path = os.path.basename(path)
338
appinfo = self._get_appinfo(path)
340
self._tokenizer.index_text(appinfo.get_name(), 5)
341
self._tokenizer.index_text(appinfo.get_name(), 5, "A")
342
self._tokenizer.index_text(appinfo.get_description(), 2)
343
self._tokenizer.index_text(appinfo.get_description(), 2, "A")
345
log.debug("Unable to look up app info for %s" % actor)
348
def _index_uri (self, uri):
350
Index `uri` into the document currectly set on self._tokenizer
352
# File URIs and paths are indexed in one way, and all other,
353
# usually web URIs, are indexed in another way because there may
354
# be domain name etc. in there we want to rank differently
355
scheme, host, path = self._split_uri (uri)
356
if scheme == "file://" or not scheme:
357
path, name = os.path.split(path)
358
self._tokenizer.index_text(name, 5)
359
self._tokenizer.index_text(name, 5, "N")
361
# Index parent names with descending weight
364
weight = weight / 1.5
365
path, name = os.path.split(path)
366
self._tokenizer.index_text(name, weight)
368
elif scheme == "mailto:":
369
tokens = host.split("@")
371
self._tokenizer.index_text(name, 6)
373
self._tokenizer.index_text(" ".join[1:], 1)
375
path, name = os.path.split(path)
377
self._tokenizer.index_text(name, 5)
378
self._tokenizer.index_text(name, 5, "N")
380
self._tokenizer.index_text(path, 1)
381
self._tokenizer.index_text(path, 1, "N")
383
self._tokenizer.index_text(host, 2)
384
self._tokenizer.index_text(host, 2, "N")
385
self._tokenizer.index_text(host, 2, "S")
387
def _index_text (self, text):
    """
    Feed `text` into self._tokenizer as raw text data for the document
    currently set on it. The text is treated as a primary description
    of the subject, such as the basename of a file.

    Primary use is for subject.text
    """
    # Weight 5 matches the weight given to file basenames in _index_uri()
    self._tokenizer.index_text(text, 5)
397
def _index_contents (self, uri):
398
# xmlindexer doesn't extract words for URIs only for file paths
400
# FIXME: IONICE and NICE on xmlindexer
402
path = uri.replace("file://", "")
403
xmlindexer = subprocess.Popen(['xmlindexer', path],
404
stdout=subprocess.PIPE)
405
xml = xmlindexer.communicate()[0].strip()
408
dom = minidom.parseString(xml)
409
text_nodes = dom.getElementsByTagName("text")
412
for line in text_nodes[0].childNodes:
413
lines.append(line.data)
416
self._tokenizer.index_text (" ".join(lines))
419
def _add_doc_filters (self, event, doc):
    """Attach boolean filtering terms for `event` to `doc`.

    These prefixed terms only constrain which documents match; they
    carry no weight and so do not affect the relevancy ranking of the
    event/doc.
    """
    event_terms = (
        (FILTER_PREFIX_EVENT_INTERPRETATION, event.interpretation),
        (FILTER_PREFIX_EVENT_MANIFESTATION, event.manifestation),
        (FILTER_PREFIX_ACTOR, event.actor),
    )
    for prefix, value in event_terms:
        doc.add_term(prefix + value)

    for subject in event.subjects:
        doc.add_term(FILTER_PREFIX_SUBJECT_INTERPRETATION + subject.interpretation)
        doc.add_term(FILTER_PREFIX_SUBJECT_MANIFESTATION + subject.manifestation)
430
def _index_event_real (self, event):
431
if not isinstance (event, Event):
432
log.error("Not an Event, found: %s" % type(event))
434
log.debug("Not indexing event. Event has no id")
438
doc = xapian.Document()
439
doc.add_value (VALUE_EVENT_ID,
440
xapian.sortable_serialise(float(event.id)))
441
doc.add_value (VALUE_TIMESTAMP,
442
xapian.sortable_serialise(float(event.timestamp)))
443
self._tokenizer.set_document (doc)
445
self._index_actor (event.actor)
447
for subject in event.subjects:
448
if not subject.uri : continue
449
log.debug("Indexing '%s'" % subject.uri)
450
self._index_uri (subject.uri)
451
self._index_text (subject.text)
452
# FIXME: index origin into an origin: prefix
453
# FIXME: index uri into a uri: prefix
454
#self._index_contents (subject.uri)
455
# FIXME: Possibly index payloads when we have apriori knowledge
457
self._add_doc_filters (event, doc)
459
self._index.add_document (doc)
461
log.error("Error indexing event: %s" % e)
463
def _compile_event_filter_query (self, events):
464
"""Takes a list of event templates and compiles a filter query
465
based on their, interpretations, manifestations, and actor,
466
for event and subjects.
468
All fields within the same event will be ANDed and each template
469
will be ORed with the others. Like elsewhere in Zeitgeist the
470
type tree of the interpretations and manifestations will be expanded
471
to match all child symbols as well
475
if not isinstance(event, Event):
476
raise TypeError("Expected Event. Found %s" % type(event))
479
if event.interpretation :
480
children = Symbol.find_child_uris_extended(event.interpretation)
481
children = [ "zgei:%s" % child for child in children ]
482
tmpl.append(" OR ".join(children))
483
if event.manifestation :
484
children = Symbol.find_child_uris_extended(event.manifestation)
485
children = [ "zgem:%s" % child for child in children ]
486
tmpl.append(" OR ".join(children))
487
if event.actor : tmpl.append("zga:"+event.actor)
488
for su in event.subjects:
489
if su.interpretation :
490
children = Symbol.find_child_uris_extended(su.interpretation)
491
children = [ "zgsi:%s" % child for child in children ]
492
tmpl.append(" OR ".join(children))
493
if su.manifestation :
494
children = Symbol.find_child_uris_extended(su.manifestation)
495
children = [ "zgsm:%s" % child for child in children ]
496
tmpl.append(" OR ".join(children))
498
tmpl = "(" + ") AND (".join(tmpl) + ")"
501
return " OR ".join(query)
503
def _compile_time_range_filter_query (self, time_range):
    """Compile `time_range` into a Xapian value-range query string.

    The returned "begin..endms" form is handled by the "ms"
    NumberValueRangeProcessor registered on the query parser.
    Raises TypeError when `time_range` is not a TimeRange.
    """
    if isinstance(time_range, TimeRange):
        return "%s..%sms" % (time_range.begin, time_range.end)
    raise TypeError("Expected TimeRange, but found %s" % type(time_range))
511
if __name__ == "__main__":
512
indexer = Indexer(None)
513
print indexer._compile_filter_query([Event.new_for_values(subject_interpretation="http://www.semanticdesktop.org/ontologies/2007/03/22/nfo#Document")])