32
31
ResultType, get_timestamp_for_now
33
32
from _zeitgeist.engine.extension import ExtensionsCollection, load_class
34
33
from _zeitgeist.engine import constants
34
from _zeitgeist.engine.sql import get_default_cursor, unset_cursor, \
35
TableLookup, WhereClause
36
37
logging.basicConfig(level=logging.DEBUG)
37
38
log = logging.getLogger("zeitgeist.engine")
39
class UnicodeCursor(sqlite3.Cursor):
43
if isinstance(obj, str):
44
obj = obj.decode("UTF-8")
47
def execute(self, statement, parameters=None):
48
if parameters is not None:
49
parameters = [self.fix_unicode(p) for p in parameters]
50
return super(UnicodeCursor, self).execute(statement, parameters)
52
return super(UnicodeCursor, self).execute(statement)
54
def create_db(file_path):
55
"""Create the database and return a default cursor for it"""
56
log.info("Using database: %s" % file_path)
57
conn = sqlite3.connect(file_path)
58
conn.row_factory = sqlite3.Row
59
cursor = conn.cursor(UnicodeCursor)
63
CREATE TABLE IF NOT EXISTS uri
64
(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
67
CREATE UNIQUE INDEX IF NOT EXISTS uri_value ON uri(value)
72
CREATE TABLE IF NOT EXISTS interpretation
73
(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
76
CREATE UNIQUE INDEX IF NOT EXISTS interpretation_value
77
ON interpretation(value)
82
CREATE TABLE IF NOT EXISTS manifestation
83
(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
86
CREATE UNIQUE INDEX IF NOT EXISTS manifestation_value
87
ON manifestation(value)""")
91
CREATE TABLE IF NOT EXISTS mimetype
92
(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
95
CREATE UNIQUE INDEX IF NOT EXISTS mimetype_value
96
ON mimetype(value)""")
100
CREATE TABLE IF NOT EXISTS actor
101
(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
104
CREATE UNIQUE INDEX IF NOT EXISTS actor_value
109
CREATE TABLE IF NOT EXISTS text
110
(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
113
CREATE UNIQUE INDEX IF NOT EXISTS text_value
116
# payload, there's no value index for payload,
117
# they can only be fetched by id
119
CREATE TABLE IF NOT EXISTS payload
120
(id INTEGER PRIMARY KEY, value BLOB)
123
# storage, represented by a StatefulEntityTable
125
CREATE TABLE IF NOT EXISTS storage
126
(id INTEGER PRIMARY KEY,
127
value VARCHAR UNIQUE,
131
CREATE UNIQUE INDEX IF NOT EXISTS storage_value
132
ON storage(value)""")
134
# event - the primary table for log statements
135
# note that event.id is NOT unique, we can have multiple subjects per id
136
# timestamps are integers (for now), if you would like to change it
137
# please file a bug report for it. In case we agree on this change
138
# remember to also fix our unittests to reflect this change
140
CREATE TABLE IF NOT EXISTS event
143
interpretation INTEGER,
144
manifestation INTEGER,
148
subj_interpretation INTEGER,
149
subj_manifestation INTEGER,
151
subj_mimetype INTEGER,
153
subj_storage INTEGER,
154
CONSTRAINT unique_event UNIQUE (timestamp, interpretation, manifestation, actor, subj_id)
158
CREATE INDEX IF NOT EXISTS event_id
161
CREATE INDEX IF NOT EXISTS event_timestamp
162
ON event(timestamp)""")
164
CREATE INDEX IF NOT EXISTS event_interpretation
165
ON event(interpretation)""")
167
CREATE INDEX IF NOT EXISTS event_manifestation
168
ON event(manifestation)""")
170
CREATE INDEX IF NOT EXISTS event_actor
173
CREATE INDEX IF NOT EXISTS event_subj_id
174
ON event(subj_id)""")
176
CREATE INDEX IF NOT EXISTS event_subj_interpretation
177
ON event(subj_interpretation)""")
179
CREATE INDEX IF NOT EXISTS event_subj_manifestation
180
ON event(subj_manifestation)""")
182
CREATE INDEX IF NOT EXISTS event_subj_origin
183
ON event(subj_origin)""")
185
CREATE INDEX IF NOT EXISTS event_subj_mimetype
186
ON event(subj_mimetype)""")
188
CREATE INDEX IF NOT EXISTS event_subj_text
189
ON event(subj_text)""")
191
CREATE INDEX IF NOT EXISTS event_subj_storage
192
ON event(subj_storage)""")
194
#cursor.execute("DROP VIEW event_view")
196
CREATE VIEW IF NOT EXISTS event_view AS
199
event.interpretation,
203
(SELECT value FROM uri WHERE uri.id=event.subj_id)
205
event.subj_interpretation,
206
event.subj_manifestation,
207
(SELECT value FROM uri WHERE uri.id=event.subj_origin)
210
(SELECT value FROM text WHERE text.id = event.subj_text)
212
(SELECT value FROM storage
213
WHERE storage.id=event.subj_storage) AS subj_storage,
214
(SELECT state FROM storage
215
WHERE storage.id=event.subj_storage) AS subj_storage_state
222
def get_default_cursor():
225
dbfile = constants.DATABASE_FILE
226
_cursor = create_db(dbfile)
229
class TableLookup(dict):
231
# We are not using an LRUCache as presumably there won't be thousands
232
# of manifestations/interpretations/mimetypes/actors on most
233
# installations, so we can spare ourselves the overhead of tracking their usage.
235
def __init__(self, cursor, table):
237
self._cursor = cursor
240
for row in cursor.execute("SELECT id, value FROM %s" % table):
241
self[row["value"]] = row["id"]
243
self._inv_dict = dict((value, key) for key, value in self.iteritems())
245
def __getitem__(self, name):
246
# Use this for inserting new properties into the database
248
super(TableLookup, self).__getitem__(name)
250
self._cursor.execute(
251
"INSERT INTO %s (value) VALUES (?)" % self._table, (name,))
252
id = self._cursor.lastrowid
253
except sqlite3.IntegrityError:
254
# This shouldn't happen, but just in case
255
# FIXME: Maybe we should remove it?
256
id = self._cursor.execute("SELECT id FROM %s WHERE value=?"
257
% self._table, (name,)).fetchone()[0]
258
# If we are here it's a newly inserted value, insert it into cache
260
self._inv_dict[id] = name
264
# When we fetch an event, it either was already in the database
265
# at the time Zeitgeist started or it was inserted later -using
266
# Zeitgeist-, so here we always have the data in memory already.
267
return self._inv_dict[id]
270
# Use this when fetching values which are supposed to be in the
271
# database already. Eg., in find_eventids.
272
return super(TableLookup, self).__getitem__(name)
274
40
class ZeitgeistEngine:
276
42
def __init__ (self):
370
143
return sorted_events
372
def insert_events(self, events):
374
m = map(self._insert_event_without_error, events)
375
_cursor.connection.commit()
376
log.debug("Inserted %d events in %fs" % (len(m), time.time()-t))
379
def _insert_event_without_error(self, event):
381
return self._insert_event(event)
383
log.exception("error while inserting '%r'" %event)
386
def _insert_event(self, event):
387
if not isinstance(event, Event):
388
raise ValueError("cannot insert object of type %r" %type(event))
390
raise ValueError("Illegal event: Predefined event id")
391
if not event.subjects:
392
raise ValueError("Illegal event format: No subject")
393
if not event.timestamp:
394
event.timestamp = get_timestamp_for_now()
396
event = self.extensions.apply_insert_hooks(event)
398
raise AssertionError("Inserting of event was blocked by an extension")
399
elif not isinstance(event, Event):
400
raise ValueError("cannot insert object of type %r" %type(event))
402
id = self.next_event_id()
405
# TODO: Right now payloads are not unique and every event has its
406
# own one. We could optimize this to store those which are repeated
407
# for different events only once, especially considering that
408
# events cannot be modified once they've been inserted.
409
payload_id = self._cursor.execute(
410
"INSERT INTO payload (value) VALUES (?)", event.payload)
411
payload_id = self._cursor.lastrowid
413
# Don't use None here, as that'd be inserted literally into the DB
416
# Make sure all URIs are inserted
417
_origin = [subject.origin for subject in event.subjects if subject.origin]
418
self._cursor.execute("INSERT OR IGNORE INTO uri (value) %s"
419
% " UNION ".join(["SELECT ?"] * (len(event.subjects) + len(_origin))),
420
[subject.uri for subject in event.subjects] + _origin)
422
# Make sure all mimetypes are inserted
423
_mimetype = [subject.mimetype for subject in event.subjects \
424
if subject.mimetype and not subject.mimetype in self._mimetype]
425
if len(_mimetype) > 1:
426
self._cursor.execute("INSERT OR IGNORE INTO mimetype (value) %s"
427
% " UNION ".join(["SELECT ?"] * len(_mimetype)), _mimetype)
429
# Make sure all texts are inserted
430
_text = [subject.text for subject in event.subjects if subject.text]
432
self._cursor.execute("INSERT OR IGNORE INTO text (value) %s"
433
% " UNION ".join(["SELECT ?"] * len(_text)), _text)
435
# Make sure all storages are inserted
436
_storage = [subject.storage for subject in event.subjects if subject.storage]
438
self._cursor.execute("INSERT OR IGNORE INTO storage (value) %s"
439
% " UNION ".join(["SELECT ?"] * len(_storage)), _storage)
442
for subject in event.subjects:
443
self._cursor.execute("""
444
INSERT INTO event VALUES (
446
(SELECT id FROM uri WHERE value=?),
448
(SELECT id FROM uri WHERE value=?),
450
(SELECT id FROM text WHERE value=?),
451
(SELECT id from storage WHERE value=?)
455
self._interpretation[event.interpretation],
456
self._manifestation[event.manifestation],
457
self._actor[event.actor],
460
self._interpretation[subject.interpretation],
461
self._manifestation[subject.manifestation],
463
self._mimetype[subject.mimetype],
466
except sqlite3.IntegrityError:
467
# The event was already registered.
468
# Rollback _last_event_id and return the ID of the original event
469
self._last_event_id -= 1
470
self._cursor.execute("""
472
WHERE timestamp=? AND interpretation=? AND manifestation=?
474
""", (event.timestamp,
475
self._interpretation[event.interpretation],
476
self._manifestation[event.manifestation],
477
self._actor[event.actor]))
478
return self._cursor.fetchone()[0]
480
_cursor.connection.commit()
484
def delete_events (self, ids):
485
# Extract min and max timestamps for deleted events
486
self._cursor.execute("""
487
SELECT MIN(timestamp), MAX(timestamp)
490
""" % ",".join(["?"] * len(ids)), ids)
491
min_stamp, max_stamp = self._cursor.fetchone()
493
# FIXME: Delete unused interpretation/manifestation/text/etc.
494
self._cursor.execute("DELETE FROM event WHERE id IN (%s)"
495
% ",".join(["?"] * len(ids)), ids)
497
return min_stamp, max_stamp
500
146
def _build_templates(templates):
501
147
for event_template in templates:
658
321
if support >= min_support]
659
322
return [key for support, key in sorted(results, reverse=True)]
666
def __init__(self, relation):
667
self._conditions = []
669
self._relation = relation
670
self._no_result_member = False
673
return len(self._conditions)
675
def add(self, condition, arguments):
678
self._conditions.append(condition)
679
if not hasattr(arguments, "__iter__"):
680
self.arguments.append(arguments)
324
def insert_events(self, events):
326
m = map(self._insert_event_without_error, events)
327
self._cursor.connection.commit()
328
log.debug("Inserted %d events in %fs" % (len(m), time.time()-t))
331
def _insert_event_without_error(self, event):
333
return self._insert_event(event)
335
log.exception("error while inserting '%r'" %event)
338
def _insert_event(self, event):
339
if not isinstance(event, Event):
340
raise ValueError("cannot insert object of type %r" %type(event))
342
raise ValueError("Illegal event: Predefined event id")
343
if not event.subjects:
344
raise ValueError("Illegal event format: No subject")
345
if not event.timestamp:
346
event.timestamp = get_timestamp_for_now()
348
event = self.extensions.apply_insert_hooks(event)
350
raise AssertionError("Inserting of event was blocked by an extension")
351
elif not isinstance(event, Event):
352
raise ValueError("cannot insert object of type %r" %type(event))
354
id = self.next_event_id()
357
# TODO: Right now payloads are not unique and every event has its
358
# own one. We could optimize this to store those which are repeated
359
# for different events only once, especially considering that
360
# events cannot be modified once they've been inserted.
361
payload_id = self._cursor.execute(
362
"INSERT INTO payload (value) VALUES (?)", event.payload)
363
payload_id = self._cursor.lastrowid
682
self.arguments.extend(arguments)
684
def extend(self, where):
685
self.add(where.sql, where.arguments)
686
if not where.may_have_results():
687
if self._relation == self.AND:
689
self.register_no_result()
693
if self: # Do not return "()" if there are no conditions
694
return "(" + self._relation.join(self._conditions) + ")"
696
def register_no_result(self):
697
self._no_result_member = True
699
def may_have_results(self):
701
Return False if we know from our cached data that the query
702
will give no results.
704
return len(self._conditions) > 0 or not self._no_result_member
708
Reset this WhereClause to the state of a newly created one.
710
self._conditions = []
712
self._no_result_member = False
365
# Don't use None here, as that'd be inserted literally into the DB
368
# Make sure all URIs are inserted
369
_origin = [subject.origin for subject in event.subjects if subject.origin]
370
self._cursor.execute("INSERT OR IGNORE INTO uri (value) %s"
371
% " UNION ".join(["SELECT ?"] * (len(event.subjects) + len(_origin))),
372
[subject.uri for subject in event.subjects] + _origin)
374
# Make sure all mimetypes are inserted
375
_mimetype = [subject.mimetype for subject in event.subjects \
376
if subject.mimetype and not subject.mimetype in self._mimetype]
377
if len(_mimetype) > 1:
378
self._cursor.execute("INSERT OR IGNORE INTO mimetype (value) %s"
379
% " UNION ".join(["SELECT ?"] * len(_mimetype)), _mimetype)
381
# Make sure all texts are inserted
382
_text = [subject.text for subject in event.subjects if subject.text]
384
self._cursor.execute("INSERT OR IGNORE INTO text (value) %s"
385
% " UNION ".join(["SELECT ?"] * len(_text)), _text)
387
# Make sure all storages are inserted
388
_storage = [subject.storage for subject in event.subjects if subject.storage]
390
self._cursor.execute("INSERT OR IGNORE INTO storage (value) %s"
391
% " UNION ".join(["SELECT ?"] * len(_storage)), _storage)
394
for subject in event.subjects:
395
self._cursor.execute("""
396
INSERT INTO event VALUES (
398
(SELECT id FROM uri WHERE value=?),
400
(SELECT id FROM uri WHERE value=?),
402
(SELECT id FROM text WHERE value=?),
403
(SELECT id from storage WHERE value=?)
407
self._interpretation[event.interpretation],
408
self._manifestation[event.manifestation],
409
self._actor[event.actor],
412
self._interpretation[subject.interpretation],
413
self._manifestation[subject.manifestation],
415
self._mimetype[subject.mimetype],
418
except sqlite3.IntegrityError:
419
# The event was already registered.
420
# Rollback _last_event_id and return the ID of the original event
421
self._last_event_id -= 1
422
self._cursor.execute("""
424
WHERE timestamp=? AND interpretation=? AND manifestation=?
426
""", (event.timestamp,
427
self._interpretation[event.interpretation],
428
self._manifestation[event.manifestation],
429
self._actor[event.actor]))
430
return self._cursor.fetchone()[0]
432
self._cursor.connection.commit()
436
def delete_events (self, ids):
437
# Extract min and max timestamps for deleted events
438
self._cursor.execute("""
439
SELECT MIN(timestamp), MAX(timestamp)
442
""" % ",".join(["?"] * len(ids)), ids)
443
timestamps = self._cursor.fetchone()
446
# FIXME: Delete unused interpretation/manifestation/text/etc.
447
self._cursor.execute("DELETE FROM event WHERE id IN (%s)"
448
% ",".join(["?"] * len(ids)), ids)