~ubuntu-branches/ubuntu/trusty/zeitgeist-extensions/trusty

« back to all changes in this revision

Viewing changes to fts/fts.py

  • Committer: Bazaar Package Importer
  • Author(s): Didier Roche
  • Date: 2010-08-05 21:27:04 UTC
  • Revision ID: james.westby@ubuntu.com-20100805212704-o4scyil4j8ewkbn2
Tags: upstream-0.0.3
Import upstream version 0.0.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -.- coding: utf-8 -.-
 
2
 
 
3
# Zeitgeist
 
4
#
 
5
# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
 
6
# Copyright © 2010 Canonical Ltd
 
7
#
 
8
# This program is free software: you can redistribute it and/or modify
 
9
# it under the terms of the GNU Lesser General Public License as published by
 
10
# the Free Software Foundation, either version 3 of the License, or
 
11
# (at your option) any later version.
 
12
#
 
13
# This program is distributed in the hope that it will be useful,
 
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
# GNU Lesser General Public License for more details.
 
17
#
 
18
# You should have received a copy of the GNU Lesser General Public License
 
19
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
#
 
21
 
 
22
#
 
23
# TODO
 
24
#
 
25
# - Delete events hook
 
26
# - ? Filter on StorageState
 
27
# - Throttle IO and CPU where possible
 
28
 
 
29
import os, sys
 
30
import time
 
31
import pickle
 
32
import dbus
 
33
import dbus.service
 
34
from xdg import BaseDirectory
 
35
import logging
 
36
import subprocess
 
37
from xml.dom import minidom
 
38
import xapian
 
39
import os
 
40
from Queue import Queue, Empty
 
41
import threading
 
42
import gobject, gio
 
43
 
 
44
from zeitgeist.datamodel import Symbol, StorageState, ResultType, TimeRange, NULL_EVENT
 
45
from _zeitgeist.engine.datamodel import Event, Subject
 
46
from _zeitgeist.engine.extension import Extension
 
47
from _zeitgeist.engine import constants
 
48
from zeitgeist.datamodel import Interpretation, Manifestation
 
49
 
 
50
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("zeitgeist.fts")

# Location of the Xapian index and the DBus identity of the search service
INDEX_FILE = os.path.join(constants.DATA_PATH, "fts.index")
FTS_DBUS_OBJECT_PATH = "/org/gnome/zeitgeist/index/activity"
FTS_DBUS_INTERFACE = "org.gnome.zeitgeist.Index"

# Xapian term prefixes used for boolean (non-ranking) filter terms
FILTER_PREFIX_EVENT_INTERPRETATION = "ZGEI"
FILTER_PREFIX_EVENT_MANIFESTATION = "ZGEM"
FILTER_PREFIX_ACTOR = "ZGA"
FILTER_PREFIX_SUBJECT_INTERPRETATION = "ZGSI"
FILTER_PREFIX_SUBJECT_MANIFESTATION = "ZGSM"

# Xapian document value slots
VALUE_EVENT_ID = 0
VALUE_TIMESTAMP = 1

# When sorting by one of the COALESCING_RESULT_TYPES result types,
# we need to fetch some extra events from the Xapian index because
# the final result set will be coalesced on some property of the event
COALESCING_RESULT_TYPES = [
        ResultType.MostRecentSubjects,
        ResultType.LeastRecentSubjects,
        ResultType.MostPopularSubjects,
        ResultType.LeastPopularSubjects,
        ResultType.MostRecentActor,
        ResultType.LeastRecentActor,
        ResultType.MostPopularActor,
        ResultType.LeastPopularActor,
]
 
79
 
 
80
class SearchEngineExtension (Extension, dbus.service.Object):
        """
        Full text indexing and searching extension for Zeitgeist
        """
        PUBLIC_METHODS = []

        def __init__ (self, engine):
                Extension.__init__(self, engine)
                dbus.service.Object.__init__(self, dbus.SessionBus(),
                                             FTS_DBUS_OBJECT_PATH)
                self._indexer = Indexer(self.engine)

        def pre_insert_event(self, event, sender):
                # Fix when Zeitgeist 0.5.1 hits the street use post_insert_event() instead
                self._indexer.index_event (event)
                return event

        @dbus.service.method(FTS_DBUS_INTERFACE,
                             in_signature="s(xx)a("+constants.SIG_EVENT+")uuu",
                             out_signature="a("+constants.SIG_EVENT+")u")
        def Search(self, query_string, time_range, filter_templates, offset, count, result_type):
                """
                DBus method to perform a full text search against the contents of the
                Zeitgeist log. Returns an array of events.
                """
                time_range = TimeRange(time_range[0], time_range[1])
                filter_templates = [Event(payload) for payload in filter_templates]
                events, hit_count = self._indexer.search(query_string, time_range,
                                                         filter_templates,
                                                         offset, count, result_type)
                return self._make_events_sendable (events), hit_count

        def _make_events_sendable(self, events):
                # Missing events (None) are replaced by NULL_EVENT so the
                # DBus array keeps its shape; real events are converted in place.
                sendable = []
                for ev in events:
                        if ev is None:
                                sendable.append(NULL_EVENT)
                        else:
                                ev._make_dbus_sendable()
                                sendable.append(ev)
                return sendable
 
117
 
 
118
class Indexer:
        """
        Abstraction of the full text indexer and search engine
        """

        # Flags controlling how the Xapian QueryParser interprets query strings
        QUERY_PARSER_FLAGS = (xapian.QueryParser.FLAG_PHRASE |
                              xapian.QueryParser.FLAG_BOOLEAN |
                              xapian.QueryParser.FLAG_LOVEHATE |
                              xapian.QueryParser.FLAG_WILDCARD)
 
127
        
 
128
        def __init__ (self, engine):
 
129
                self._engine = engine
 
130
        
 
131
                log.debug("Opening full text index: %s" % INDEX_FILE)
 
132
                self._index = xapian.WritableDatabase(INDEX_FILE, xapian.DB_CREATE_OR_OPEN)
 
133
                self._tokenizer = indexer = xapian.TermGenerator()
 
134
                self._query_parser = xapian.QueryParser()
 
135
                self._query_parser.set_database (self._index)
 
136
                self._query_parser.add_prefix("name", "N")
 
137
                self._query_parser.add_prefix("title", "N")
 
138
                self._query_parser.add_prefix("site", "S")
 
139
                self._query_parser.add_prefix("app", "A")
 
140
                self._query_parser.add_boolean_prefix("zgei", FILTER_PREFIX_EVENT_INTERPRETATION)
 
141
                self._query_parser.add_boolean_prefix("zgem", FILTER_PREFIX_EVENT_MANIFESTATION)
 
142
                self._query_parser.add_boolean_prefix("zga", FILTER_PREFIX_ACTOR)
 
143
                self._query_parser.add_boolean_prefix("zgsi", FILTER_PREFIX_SUBJECT_INTERPRETATION)
 
144
                self._query_parser.add_boolean_prefix("zgsm", FILTER_PREFIX_SUBJECT_MANIFESTATION)
 
145
                self._query_parser.add_valuerangeprocessor(
 
146
                      xapian.NumberValueRangeProcessor(VALUE_EVENT_ID, "id", True))
 
147
                self._query_parser.add_valuerangeprocessor(
 
148
                      xapian.NumberValueRangeProcessor(VALUE_TIMESTAMP, "ms", False))
 
149
                self._query_parser.set_default_op(xapian.Query.OP_AND)
 
150
                self._enquire = xapian.Enquire(self._index)
 
151
                
 
152
                gobject.threads_init()
 
153
                self._may_run = True
 
154
                self._queue = Queue(0)
 
155
                self._worker = threading.Thread(target=self._worker_thread,
 
156
                                                name="IndexWorker")
 
157
                self._worker.daemon = True
 
158
                self._worker.start()
 
159
                
 
160
                if self._index.get_doccount() == 0:
 
161
                        # We must delay reindexing until after the engine is done setting up
 
162
                        log.info("Empty index detected. Doing full rebuild")
 
163
                        gobject.idle_add (self._reindex)
 
164
                
 
165
                # List term freqs
 
166
                #for term in self._index.allterms():
 
167
                #       print term.term, term.termfreq
 
168
        
 
169
        def _reindex (self):
 
170
                """
 
171
                Index everything in the ZG log
 
172
                """
 
173
                all_events = self._engine.find_events(TimeRange.always(),
 
174
                                                      [], StorageState.Any,
 
175
                                                      sys.maxint,
 
176
                                                      ResultType.MostRecentEvents)
 
177
                log.info("Preparing to index %s events" % len(all_events))
 
178
                for e in all_events : self._queue.put(e)
 
179
        
 
180
        def index_event (self, event):
 
181
                """
 
182
                This method schedules and event for indexing. It returns immediate and
 
183
                defers the actual work to a bottom half thread. This means that it
 
184
                will not block the main loop of the Zeitgeist daemon while indexing
 
185
                (which may be a heavy operation)
 
186
                """
 
187
                self._queue.put (event)
 
188
                return event
 
189
        
 
190
        def search (self, query_string, time_range=None, filters=None, offset=0, maxhits=10, result_type=100):
 
191
                """
 
192
                Do a full text search over the indexed corpus. The `result_type`
 
193
                parameter may be a zeitgeist.datamodel.ResultType or 100. In case it is
 
194
                100 the textual relevancy of the search engine will be used to sort the
 
195
                results. Result type 100 is the fastest (and default) mode.
 
196
                
 
197
                The filters argument should be a list of event templates.
 
198
                """
 
199
                # Expand event template filters if necessary
 
200
                if filters:
 
201
                        query_string = "(%s) AND (%s)" % (query_string, self._compile_event_filter_query (filters))
 
202
                
 
203
                # Expand time range value query
 
204
                if time_range and not time_range.is_always():
 
205
                        query_string = "(%s) AND (%s)" % (query_string, self._compile_time_range_filter_query (time_range))
 
206
                
 
207
                # If the result type coalesces the events we need to fetch some extra
 
208
                # events from the index to have a chance of actually holding 'maxhits'
 
209
                # unique events
 
210
                if result_type in COALESCING_RESULT_TYPES:
 
211
                        raw_maxhits = maxhits * 3
 
212
                else:
 
213
                        raw_maxhits = maxhits
 
214
                
 
215
                # Allow wildcards
 
216
                query_start = time.time()
 
217
                query = self._query_parser.parse_query (query_string,
 
218
                                                        self.QUERY_PARSER_FLAGS)
 
219
                self._enquire.set_query (query)
 
220
                hits = self._enquire.get_mset (offset, raw_maxhits)
 
221
                hit_count = hits.get_matches_estimated()
 
222
                log.debug("Search '%s' gave %s hits in %sms" %
 
223
                          (query_string, hits.get_matches_estimated(), (time.time() - query_start)*1000))
 
224
                
 
225
                if result_type == 100:
 
226
                        event_ids = []
 
227
                        for m in hits:
 
228
                                event_id = int(xapian.sortable_unserialise(
 
229
                                                          m.document.get_value(VALUE_EVENT_ID)))
 
230
                                log.debug("%i: %i%% docid=%i eventid=%s" %
 
231
                                          (m.rank + 1, m.percent, m.docid, event_id))
 
232
                                event_ids.append (event_id)
 
233
                        if event_ids:
 
234
                                return self._engine.get_events(ids=event_ids), hit_count
 
235
                        else:
 
236
                                return [], 0
 
237
                else:
 
238
                        templates = []
 
239
                        for m in hits:
 
240
                                event_id = int(xapian.sortable_unserialise(
 
241
                                                          m.document.get_value(VALUE_EVENT_ID)))
 
242
                                log.debug("%i: %i%% docid=%i eventid=%s" %
 
243
                                          (m.rank + 1, m.percent, m.docid, event_id))
 
244
                                ev = Event()
 
245
                                ev[0][Event.Id] = str(event_id)
 
246
                                templates.append(ev)
 
247
                
 
248
                        if templates:
 
249
                                return self._engine._find_events(1, TimeRange.always(),
 
250
                                                                 templates,
 
251
                                                                 StorageState.Any,
 
252
                                                                 maxhits,
 
253
                                                                 result_type), hit_count
 
254
                        else:
 
255
                                return [], 0
 
256
        
 
257
        def _worker_thread (self):
 
258
                is_dirty = False
 
259
                while self._may_run:
 
260
                        # FIXME: Throttle IO and CPU
 
261
                        try:
 
262
                                # If we are dirty wait a while before we flush,
 
263
                                # or if we are clean wait indefinitely to avoid
 
264
                                # needless wakeups
 
265
                                if is_dirty:
 
266
                                        event = self._queue.get(True, 0.5)
 
267
                                else:
 
268
                                        event = self._queue.get(True)
 
269
                                
 
270
                                self._index_event_real (event)
 
271
                                is_dirty = True
 
272
                        except Empty:
 
273
                                if is_dirty:
 
274
                                        # Write changes to disk
 
275
                                        log.debug("Committing FTS index")
 
276
                                        self._index.flush()
 
277
                                        is_dirty = False
 
278
                                else:
 
279
                                        log.debug("No changes to index. Sleeping")
 
280
        
 
281
        def _split_uri (self, uri):
 
282
                """
 
283
                Returns a triple of (scheme, host, and path) extracted from `uri`
 
284
                """
 
285
                i = uri.find(":")
 
286
                if i == -1 :
 
287
                        scheme =  ""
 
288
                        host = ""
 
289
                        path = uri
 
290
                else:
 
291
                        scheme = uri[:i]
 
292
                        host = ""
 
293
                        path = ""
 
294
                
 
295
                if uri[i+1] == "/" and uri[i+2] == "/":
 
296
                        j = uri.find("/", i+3)
 
297
                        if j == -1 :
 
298
                                host = uri[i+3:]
 
299
                        else:
 
300
                                host = uri[i+3:j]
 
301
                                path = uri[j:]
 
302
                else:
 
303
                        host = uri[i+1:]
 
304
                
 
305
                # Strip out URI query part
 
306
                i = path.find("?")
 
307
                if i != -1:
 
308
                        path = path[:i]
 
309
                
 
310
                return scheme, host, path
 
311
        
 
312
        def _get_appinfo (self, app_id):
 
313
                """
 
314
                Return a gio.AppInfo for `app_id`
 
315
                """
 
316
                # FIXME: Use an LRUCache for appinfos
 
317
                return gio.unix.DesktopAppInfo(app_id)
 
318
        
 
319
        def _index_actor (self, actor):
 
320
                """
 
321
                Takes an actor as a path to a .desktop file or app:// uri
 
322
                and index the contents of the corresponding .desktop file
 
323
                into the document currently set for self._tokenizer.
 
324
                """
 
325
                # Get the path of the .desktop file and convert it to
 
326
                # an app id (eg. 'gedit.desktop')
 
327
                scheme, host, path = self._split_uri(actor)
 
328
                if not path:
 
329
                        path = host
 
330
                
 
331
                if not path :
 
332
                        log.debug("Unable to determine application id for %s" % actor)
 
333
                        return
 
334
                
 
335
                if path.startswith("/") :
 
336
                        path = os.path.basename(path)
 
337
                
 
338
                appinfo = self._get_appinfo(path)
 
339
                if appinfo:
 
340
                        self._tokenizer.index_text(appinfo.get_name(), 5)
 
341
                        self._tokenizer.index_text(appinfo.get_name(), 5, "A")
 
342
                        self._tokenizer.index_text(appinfo.get_description(), 2)
 
343
                        self._tokenizer.index_text(appinfo.get_description(), 2, "A")
 
344
                else:
 
345
                        log.debug("Unable to look up app info for %s" % actor)
 
346
                
 
347
        
 
348
        def _index_uri (self, uri):
 
349
                """
 
350
                Index `uri` into the document currectly set on self._tokenizer
 
351
                """
 
352
                # File URIs and paths are indexed in one way, and all other,
 
353
                # usually web URIs, are indexed in another way because there may
 
354
                # be domain name etc. in there we want to rank differently
 
355
                scheme, host, path = self._split_uri (uri)
 
356
                if scheme == "file://" or not scheme:
 
357
                        path, name = os.path.split(path)
 
358
                        self._tokenizer.index_text(name, 5)
 
359
                        self._tokenizer.index_text(name, 5, "N")
 
360
                        
 
361
                        # Index parent names with descending weight
 
362
                        weight = 5
 
363
                        while path and name:
 
364
                                weight = weight / 1.5
 
365
                                path, name = os.path.split(path)
 
366
                                self._tokenizer.index_text(name, weight)
 
367
                        
 
368
                elif scheme == "mailto:":
 
369
                        tokens = host.split("@")
 
370
                        name = tokens[0]
 
371
                        self._tokenizer.index_text(name, 6)
 
372
                        if len(tokens) > 1:
 
373
                                self._tokenizer.index_text(" ".join[1:], 1)
 
374
                else:
 
375
                        path, name = os.path.split(path)
 
376
                        if name:
 
377
                                self._tokenizer.index_text(name, 5)
 
378
                                self._tokenizer.index_text(name, 5, "N")
 
379
                        if path:
 
380
                                self._tokenizer.index_text(path, 1)
 
381
                                self._tokenizer.index_text(path, 1, "N")
 
382
                        if host:
 
383
                                self._tokenizer.index_text(host, 2)
 
384
                                self._tokenizer.index_text(host, 2, "N")
 
385
                                self._tokenizer.index_text(host, 2, "S")
 
386
        
 
387
        def _index_text (self, text):
 
388
                """
 
389
                Index `text` as raw text data for the document currently
 
390
                set on self._tokenizer. The text is assumed to be a primary
 
391
                description of the subject, such as the basename of a file.
 
392
                
 
393
                Primary use is for subject.text
 
394
                """
 
395
                self._tokenizer.index_text(text, 5)
 
396
        
 
397
        def _index_contents (self, uri):
 
398
                # xmlindexer doesn't extract words for URIs only for file paths
 
399
                
 
400
                # FIXME: IONICE and NICE on xmlindexer
 
401
                
 
402
                path = uri.replace("file://", "")
 
403
                xmlindexer = subprocess.Popen(['xmlindexer', path],
 
404
                                              stdout=subprocess.PIPE)
 
405
                xml = xmlindexer.communicate()[0].strip()
 
406
                xmlindexer.wait()               
 
407
                
 
408
                dom = minidom.parseString(xml)
 
409
                text_nodes = dom.getElementsByTagName("text")
 
410
                lines = []
 
411
                if text_nodes:
 
412
                        for line in text_nodes[0].childNodes:
 
413
                                lines.append(line.data)
 
414
                
 
415
                if lines:
 
416
                                self._tokenizer.index_text (" ".join(lines))
 
417
                
 
418
        
 
419
        def _add_doc_filters (self, event, doc):
 
420
                """Adds the filtering rules to the doc. Filtering rules will
 
421
                   not affect the relevancy ranking of the event/doc"""
 
422
                doc.add_term (FILTER_PREFIX_EVENT_INTERPRETATION+event.interpretation)
 
423
                doc.add_term (FILTER_PREFIX_EVENT_MANIFESTATION+event.manifestation)
 
424
                doc.add_term (FILTER_PREFIX_ACTOR+event.actor)
 
425
                
 
426
                for su in event.subjects:
 
427
                        doc.add_term (FILTER_PREFIX_SUBJECT_INTERPRETATION+su.interpretation)
 
428
                        doc.add_term (FILTER_PREFIX_SUBJECT_MANIFESTATION+su.manifestation)
 
429
        
 
430
        def _index_event_real (self, event):
 
431
                if not isinstance (event, Event):
 
432
                        log.error("Not an Event, found: %s" % type(event))
 
433
                if not event.id:
 
434
                        log.debug("Not indexing event. Event has no id")
 
435
                        return
 
436
                
 
437
                try:
 
438
                        doc = xapian.Document()
 
439
                        doc.add_value (VALUE_EVENT_ID,
 
440
                                       xapian.sortable_serialise(float(event.id)))
 
441
                        doc.add_value (VALUE_TIMESTAMP,
 
442
                                       xapian.sortable_serialise(float(event.timestamp)))
 
443
                        self._tokenizer.set_document (doc)
 
444
                
 
445
                        self._index_actor (event.actor)
 
446
                
 
447
                        for subject in event.subjects:
 
448
                                if not subject.uri : continue
 
449
                                log.debug("Indexing '%s'" % subject.uri)
 
450
                                self._index_uri (subject.uri)
 
451
                                self._index_text (subject.text)
 
452
                                # FIXME: index origin into an origin: prefix
 
453
                                # FIXME: index uri into a uri: prefix
 
454
                                #self._index_contents (subject.uri)
 
455
                                # FIXME: Possibly index payloads when we have apriori knowledge
 
456
                        
 
457
                        self._add_doc_filters (event, doc)      
 
458
                                
 
459
                        self._index.add_document (doc)
 
460
                except Exception, e:
 
461
                        log.error("Error indexing event: %s" % e)
 
462
 
 
463
        def _compile_event_filter_query (self, events):
 
464
                """Takes a list of event templates and compiles a filter query
 
465
                   based on their, interpretations, manifestations, and actor,
 
466
                   for event and subjects.
 
467
                   
 
468
                   All fields within the same event will be ANDed and each template
 
469
                   will be ORed with the others. Like elsewhere in Zeitgeist the
 
470
                   type tree of the interpretations and manifestations will be expanded
 
471
                   to match all child symbols as well
 
472
                """
 
473
                query = []
 
474
                for event in events:
 
475
                        if not isinstance(event, Event):
 
476
                                raise TypeError("Expected Event. Found %s" % type(event))
 
477
                        
 
478
                        tmpl = []
 
479
                        if event.interpretation :
 
480
                                children = Symbol.find_child_uris_extended(event.interpretation)
 
481
                                children = [ "zgei:%s" % child for child in children ]
 
482
                                tmpl.append(" OR ".join(children))
 
483
                        if event.manifestation :
 
484
                                children = Symbol.find_child_uris_extended(event.manifestation)
 
485
                                children = [ "zgem:%s" % child for child in children ]
 
486
                                tmpl.append(" OR ".join(children))
 
487
                        if event.actor : tmpl.append("zga:"+event.actor)
 
488
                        for su in event.subjects:
 
489
                                if su.interpretation :
 
490
                                        children = Symbol.find_child_uris_extended(su.interpretation)
 
491
                                        children = [ "zgsi:%s" % child for child in children ]
 
492
                                        tmpl.append(" OR ".join(children))
 
493
                                if su.manifestation :
 
494
                                        children = Symbol.find_child_uris_extended(su.manifestation)
 
495
                                        children = [ "zgsm:%s" % child for child in children ]
 
496
                                        tmpl.append(" OR ".join(children))
 
497
                        
 
498
                        tmpl = "(" + ") AND (".join(tmpl) + ")"
 
499
                        query.append(tmpl)
 
500
                
 
501
                return " OR ".join(query)
 
502
        
 
503
        def _compile_time_range_filter_query (self, time_range):
 
504
                """Takes a TimeRange and compiles a range query for it"""
 
505
                
 
506
                if not isinstance(time_range, TimeRange):
 
507
                        raise TypeError("Expected TimeRange, but found %s" % type(time_range))
 
508
                
 
509
                return "%s..%sms" % (time_range.begin, time_range.end)
 
510
 
 
511
if __name__ == "__main__":
 
512
        indexer = Indexer(None)
 
513
        print indexer._compile_filter_query([Event.new_for_values(subject_interpretation="http://www.semanticdesktop.org/ontologies/2007/03/22/nfo#Document")])