# -.- coding: utf-8 -.-

# Copyright © 2009-2010 Siegfried-Angel Gevatter Pujals <rainct@ubuntu.com>
# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
# Copyright © 2009-2011 Markus Korn <thekorn@gmx.net>
# Copyright © 2009 Seif Lotfy <seif@lotfy.com>
# Copyright © 2011 J.P. Lacerda <jpaflacerda@gmail.com>
# Copyright © 2011 Collabora Ltd.
#             By Siegfried-Angel Gevatter Pujals <rainct@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
32
import logging
import os
import shutil
import sqlite3
import time

from _zeitgeist.engine import constants
34
log = logging.getLogger("zeitgeist.sql")

# Maps event-view column names used in prefix searches to the lookup table
# that actually stores their string values (see add_text_condition).
# NOTE(review): reconstructed from the surviving "subj_mimetype"/"subj_current_uri"
# entries — verify the full mapping against upstream.
TABLE_MAP = {
	"origin": "uri",
	"subj_mimetype": "mimetype",
	"subj_origin": "uri",
	"subj_uri": "uri",
	"subj_current_uri": "uri",
}
44
def explain_query(cursor, statement, arguments=()):
	"""
	Run EXPLAIN QUERY PLAN for `statement` with `arguments` and log the
	resulting plan (one row per line) at debug level.
	"""
	# Fix: `plan` must be initialized before being accumulated in the loop,
	# otherwise the += below raises NameError.
	plan = ""
	for r in cursor.execute("EXPLAIN QUERY PLAN "+statement, arguments).fetchall():
		plan += str(list(r)) + "\n"
	log.debug("Got query:\nQUERY:\n%s (%s)\nPLAN:\n%s" % (statement, arguments, plan))
50
class UnicodeCursor(sqlite3.Cursor):
52
debug_explain = os.getenv("ZEITGEIST_DEBUG_QUERY_PLANS")
56
if isinstance(obj, (int, long)):
57
# thekorn: as long as we are using the unary operator for timestamp
58
# related queries we have to make sure that integers are not
59
# converted to strings, same applies for long numbers.
61
if isinstance(obj, str):
62
obj = obj.decode("UTF-8")
63
# seif: Python’s default encoding is ASCII, so whenever a character with
64
# an ASCII value > 127 is in the input data, you’ll get a UnicodeDecodeError
65
# because that character can’t be handled by the ASCII encoding.
68
except UnicodeDecodeError, ex:
72
def execute(self, statement, parameters=()):
73
parameters = [self.fix_unicode(p) for p in parameters]
74
if UnicodeCursor.debug_explain:
75
explain_query(super(UnicodeCursor, self), statement, parameters)
76
return super(UnicodeCursor, self).execute(statement, parameters)
78
def fetch(self, index=None):
86
def _get_schema_version (cursor, schema_name):
88
Returns the schema version for schema_name or returns 0 in case
89
the schema doesn't exist.
92
schema_version_result = cursor.execute("""
93
SELECT version FROM schema_version WHERE schema=?
95
result = schema_version_result.fetchone()
96
return result[0] if result else 0
97
except sqlite3.OperationalError, e:
98
# The schema isn't there...
99
log.debug ("Schema '%s' not found: %s" % (schema_name, e))
102
def _set_schema_version (cursor, schema_name, version):
104
Sets the version of `schema_name` to `version`
107
CREATE TABLE IF NOT EXISTS schema_version
108
(schema VARCHAR PRIMARY KEY ON CONFLICT REPLACE, version INT)
111
# The 'ON CONFLICT REPLACE' on the PK converts INSERT to UPDATE
114
INSERT INTO schema_version VALUES (?, ?)
115
""", (schema_name, version))
116
cursor.connection.commit()
118
def _do_schema_upgrade (cursor, schema_name, old_version, new_version):
	"""
	Try and upgrade schema `schema_name` from version `old_version` to
	`new_version`. This is done by executing a series of upgrade modules
	named '_zeitgeist.engine.upgrades.$schema_name_$(i)_$(i+1)' and executing
	the run(cursor) method of those modules until new_version is reached
	"""
	# NOTE(review): back up the DB and mark the schema version as -1 so a
	# cancelled upgrade can be detected and restored on next start — see
	# _check_core_schema_upgrade, which treats version <= -1 as "restore".
	_do_schema_backup()
	_set_schema_version(cursor, schema_name, -1)
	for i in xrange(old_version, new_version):
		# Fire off the right upgrade module
		log.info("Upgrading database '%s' from version %s to %s. "
			"This may take a while" % (schema_name, i, i+1))
		upgrader_name = "%s_%s_%s" % (schema_name, i, i+1)
		module = __import__ ("_zeitgeist.engine.upgrades.%s" % upgrader_name)
		# HACK: eval on an internally-built module path (not user input);
		# needed because __import__ returns the top-level package.
		eval("module.engine.upgrades.%s.run(cursor)" % upgrader_name)
	
	# Update the schema version
	_set_schema_version(cursor, schema_name, new_version)
	
	log.info("Upgrade succesful")
140
def _check_core_schema_upgrade (cursor):
142
Checks whether the schema is good or, if it is outdated, triggers any
143
necessary upgrade scripts. This method will also attempt to restore a
144
database backup in case a previous upgrade was cancelled midway.
146
It returns a boolean indicating whether the schema was good and the
147
database cursor (which will have changed if the database was restored).
149
# See if we have the right schema version, and try an upgrade if needed
150
core_schema_version = _get_schema_version(cursor, constants.CORE_SCHEMA)
151
if core_schema_version >= constants.CORE_SCHEMA_VERSION:
155
if core_schema_version <= -1:
156
cursor.connection.commit()
157
cursor.connection.close()
159
cursor = _connect_to_db(constants.DATABASE_FILE)
160
core_schema_version = _get_schema_version(cursor,
161
constants.CORE_SCHEMA)
162
log.exception("Database corrupted at upgrade -- "
163
"upgrading from version %s" % core_schema_version)
165
_do_schema_upgrade (cursor,
166
constants.CORE_SCHEMA,
168
constants.CORE_SCHEMA_VERSION)
170
# Don't return here. The upgrade process might depend on the
171
# tables, indexes, and views being set up (to avoid code dup)
172
log.info("Running post upgrade setup")
174
except sqlite3.OperationalError:
175
# Something went wrong while applying the upgrade -- this is
176
# probably due to a non existing table (this occurs when
177
# applying core_3_4, for example). We just need to fall through
178
# the rest of create_db to fix this...
179
log.exception("Database corrupted -- proceeding")
183
"Failed to upgrade database '%s' from version %s to %s: %s" % \
184
(constants.CORE_SCHEMA, core_schema_version,
185
constants.CORE_SCHEMA_VERSION, e))
188
def _do_schema_backup ():
	"""Copy the live database file to the backup location (overwriting it)."""
	shutil.copyfile(constants.DATABASE_FILE, constants.DATABASE_FILE_BACKUP)
191
def _do_schema_restore ():
	"""Replace the live database file with the backup (the backup is consumed)."""
	shutil.move(constants.DATABASE_FILE_BACKUP, constants.DATABASE_FILE)
194
def _connect_to_db(file_path):
	"""
	Open `file_path` with sqlite3, enable Row access by column name, and
	return a UnicodeCursor for it.
	"""
	conn = sqlite3.connect(file_path)
	conn.row_factory = sqlite3.Row
	cursor = conn.cursor(UnicodeCursor)
	# Fix: the cursor must be returned — callers (create_db,
	# _check_core_schema_upgrade) use the return value.
	return cursor
200
def create_db(file_path):
	"""Create the database and return a default cursor for it"""
	start = time.time()
	log.info("Using database: %s" % file_path)
	new_database = not os.path.exists(file_path)
	cursor = _connect_to_db(file_path)
	
	# Seif: as result of the optimization story (LP: #639737) we are setting
	# journal_mode to WAL if possible, this change is irreversible but
	# gains us a big speedup, for more information see http://www.sqlite.org/wal.html
	# FIXME: Set journal_mode to WAL when teamdecision has been take.
	# cursor.execute("PRAGMA journal_mode = WAL")
	cursor.execute("PRAGMA journal_mode = DELETE")
	# Seif: another result of the performance tweaks discussed in (LP: #639737)
	# we decided to set locking_mode to EXCLUSIVE, from now on only
	# one connection to the database is allowed to revert this setting set locking_mode to NORMAL.
	cursor.execute("PRAGMA locking_mode = EXCLUSIVE")
	
	# thekorn: as part of the workaround for (LP: #598666) we need to
	# create the '_fix_cache' TEMP table on every start,
	# this table gets purged once the engine gets closed.
	# When a cached value gets deleted we automatically store the name
	# of the cache and the value's id to this table. It's then up to
	# the python code to delete items from the cache based on the content
	# of this table.
	cursor.execute("CREATE TEMP TABLE _fix_cache (table_name VARCHAR, id INTEGER)")
	
	# Always assume that temporary memory backed DBs have good schemas
	if constants.DATABASE_FILE != ":memory:" and not new_database:
		do_upgrade, cursor = _check_core_schema_upgrade(cursor)
		if do_upgrade:
			_time = (time.time() - start)*1000
			log.debug("Core schema is good. DB loaded in %sms" % _time)
			return cursor
	
	# the following sql statements are only executed if a new database
	# is created or an update of the core schema was done
	log.debug("Updating sql schema")
	# uri
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS uri
			(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
		""")
	cursor.execute("""
		CREATE UNIQUE INDEX IF NOT EXISTS uri_value ON uri(value)
		""")
	
	# interpretation
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS interpretation
			(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
		""")
	cursor.execute("""
		CREATE UNIQUE INDEX IF NOT EXISTS interpretation_value
			ON interpretation(value)
		""")
	
	# manifestation
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS manifestation
			(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
		""")
	cursor.execute("""
		CREATE UNIQUE INDEX IF NOT EXISTS manifestation_value
			ON manifestation(value)""")
	
	# mimetype
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS mimetype
			(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
		""")
	cursor.execute("""
		CREATE UNIQUE INDEX IF NOT EXISTS mimetype_value
			ON mimetype(value)""")
	
	# actor
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS actor
			(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
		""")
	cursor.execute("""
		CREATE UNIQUE INDEX IF NOT EXISTS actor_value
			ON actor(value)""")
	
	# text
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS text
			(id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
		""")
	cursor.execute("""
		CREATE UNIQUE INDEX IF NOT EXISTS text_value
			ON text(value)""")
	
	# payload, there's no value index for payload,
	# they can only be fetched by id
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS payload
			(id INTEGER PRIMARY KEY, value BLOB)
		""")
	
	# storage, represented by a StatefulEntityTable
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS storage
			(id INTEGER PRIMARY KEY,
			 value VARCHAR UNIQUE,
			 state INTEGER,
			 icon VARCHAR,
			 display_name VARCHAR)
		""")
	cursor.execute("""
		CREATE UNIQUE INDEX IF NOT EXISTS storage_value
			ON storage(value)""")
	
	# event - the primary table for log statements
	#  - Note that event.id is NOT unique, we can have multiple subjects per ID
	#  - Timestamps are integers.
	#  - (event-)origin and subj_id_current are added to the end of the table
	cursor.execute("""
		CREATE TABLE IF NOT EXISTS event (
			id INTEGER,
			timestamp INTEGER,
			interpretation INTEGER,
			manifestation INTEGER,
			actor INTEGER,
			payload INTEGER,
			subj_id INTEGER,
			subj_interpretation INTEGER,
			subj_manifestation INTEGER,
			subj_origin INTEGER,
			subj_mimetype INTEGER,
			subj_text INTEGER,
			subj_storage INTEGER,
			origin INTEGER,
			subj_id_current INTEGER,
			CONSTRAINT interpretation_fk FOREIGN KEY(interpretation)
				REFERENCES interpretation(id) ON DELETE CASCADE,
			CONSTRAINT manifestation_fk FOREIGN KEY(manifestation)
				REFERENCES manifestation(id) ON DELETE CASCADE,
			CONSTRAINT actor_fk FOREIGN KEY(actor)
				REFERENCES actor(id) ON DELETE CASCADE,
			CONSTRAINT origin_fk FOREIGN KEY(origin)
				REFERENCES uri(id) ON DELETE CASCADE,
			CONSTRAINT payload_fk FOREIGN KEY(payload)
				REFERENCES payload(id) ON DELETE CASCADE,
			CONSTRAINT subj_id_fk FOREIGN KEY(subj_id)
				REFERENCES uri(id) ON DELETE CASCADE,
			CONSTRAINT subj_id_current_fk FOREIGN KEY(subj_id_current)
				REFERENCES uri(id) ON DELETE CASCADE,
			CONSTRAINT subj_interpretation_fk FOREIGN KEY(subj_interpretation)
				REFERENCES interpretation(id) ON DELETE CASCADE,
			CONSTRAINT subj_manifestation_fk FOREIGN KEY(subj_manifestation)
				REFERENCES manifestation(id) ON DELETE CASCADE,
			CONSTRAINT subj_origin_fk FOREIGN KEY(subj_origin)
				REFERENCES uri(id) ON DELETE CASCADE,
			CONSTRAINT subj_mimetype_fk FOREIGN KEY(subj_mimetype)
				REFERENCES mimetype(id) ON DELETE CASCADE,
			CONSTRAINT subj_text_fk FOREIGN KEY(subj_text)
				REFERENCES text(id) ON DELETE CASCADE,
			CONSTRAINT subj_storage_fk FOREIGN KEY(subj_storage)
				REFERENCES storage(id) ON DELETE CASCADE,
			CONSTRAINT unique_event UNIQUE (timestamp, interpretation, manifestation, actor, subj_id)
		)
		""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_id
			ON event(id)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_timestamp
			ON event(timestamp)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_interpretation
			ON event(interpretation)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_manifestation
			ON event(manifestation)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_actor
			ON event(actor)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_origin
			ON event(origin)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_id
			ON event(subj_id)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_id_current
			ON event(subj_id_current)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_interpretation
			ON event(subj_interpretation)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_manifestation
			ON event(subj_manifestation)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_origin
			ON event(subj_origin)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_mimetype
			ON event(subj_mimetype)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_text
			ON event(subj_text)""")
	cursor.execute("""
		CREATE INDEX IF NOT EXISTS event_subj_storage
			ON event(subj_storage)""")
	
	# Foreign key constraints don't work in SQLite. Yay!
	# Emulate ON DELETE cleanup of orphaned lookup values with triggers:
	# when the last event referencing a value is deleted, drop the value.
	for table, columns in (
		('interpretation', ('interpretation', 'subj_interpretation')),
		('manifestation', ('manifestation', 'subj_manifestation')),
		('actor', ('actor',)),
		('payload', ('payload',)),
		('mimetype', ('subj_mimetype',)),
		('text', ('subj_text',)),
		('storage', ('subj_storage',)),
	):
		for column in columns:
			cursor.execute("""
				CREATE TRIGGER IF NOT EXISTS fkdc_event_%(column)s
				BEFORE DELETE ON event
				WHEN ((SELECT COUNT(*) FROM event WHERE %(column)s=OLD.%(column)s) < 2)
				BEGIN
					DELETE FROM %(table)s WHERE id=OLD.%(column)s;
				END;
				""" % {'column': column, 'table': table})
	
	# The uri table is referenced from four event columns, so an uri value
	# is only orphaned when no event references it through any of them.
	for num, column in enumerate(('subj_id', 'subj_origin',
	'subj_id_current', 'origin')):
		cursor.execute("""
			CREATE TRIGGER IF NOT EXISTS fkdc_event_uri_%(num)d
			BEFORE DELETE ON event
			WHEN ((
				SELECT COUNT(*)
				FROM event
				WHERE
					origin=OLD.%(column)s
					OR subj_id=OLD.%(column)s
					OR subj_id_current=OLD.%(column)s
					OR subj_origin=OLD.%(column)s
				) < 2)
			BEGIN
				DELETE FROM uri WHERE id=OLD.%(column)s;
			END;
			""" % {'num': num+1, 'column': column})
	
	cursor.execute("DROP VIEW IF EXISTS event_view")
	cursor.execute("""
		CREATE VIEW IF NOT EXISTS event_view AS
			SELECT event.id,
				event.timestamp,
				event.interpretation,
				event.manifestation,
				event.actor,
				(SELECT value FROM payload WHERE payload.id=event.payload)
					AS payload,
				(SELECT value FROM uri WHERE uri.id=event.subj_id)
					AS subj_uri,
				event.subj_id, -- #this directly points to an id in the uri table
				event.subj_interpretation,
				event.subj_manifestation,
				event.subj_origin, -- #this directly points to an id in the uri table
				(SELECT value FROM uri WHERE uri.id=event.subj_origin)
					AS subj_origin_uri,
				event.subj_mimetype,
				(SELECT value FROM text WHERE text.id = event.subj_text)
					AS subj_text,
				(SELECT value FROM storage
					WHERE storage.id=event.subj_storage) AS subj_storage,
				(SELECT state FROM storage
					WHERE storage.id=event.subj_storage) AS subj_storage_state,
				event.origin,
				(SELECT value FROM uri WHERE uri.id=event.origin)
					AS event_origin_uri,
				(SELECT value FROM uri WHERE uri.id=event.subj_id_current)
					AS subj_current_uri,
				event.subj_id_current
			FROM event
		""")
	
	# All good. Set the schema version, so we don't have to do all this
	# sql the next time around
	_set_schema_version (cursor, constants.CORE_SCHEMA, constants.CORE_SCHEMA_VERSION)
	_time = (time.time() - start)*1000
	log.info("DB set up in %sms" % _time)
	cursor.connection.commit()
	
	return cursor
490
# Module-level cache for the default cursor (one shared DB connection).
_cursor = None

def get_default_cursor():
	"""Return the shared cursor for the engine database, creating it on
	first use from constants.DATABASE_FILE."""
	global _cursor
	if not _cursor:
		dbfile = constants.DATABASE_FILE
		_cursor = create_db(dbfile)
	return _cursor
500
class TableLookup(dict):
	"""
	Bidirectional in-memory cache over a simple (id, value) lookup table,
	kept in sync with the database: dict maps value -> id, _inv_dict maps
	id -> value.
	"""
	
	# We are not using an LRUCache as pressumably there won't be thousands
	# of manifestations/interpretations/mimetypes/actors on most
	# installations, so we can save us the overhead of tracking their usage.
	
	def __init__(self, cursor, table):
		
		self._cursor = cursor
		self._table = table
		
		# Preload the whole table into the cache.
		for row in cursor.execute("SELECT id, value FROM %s" % table):
			self[row["value"]] = row["id"]
		
		self._inv_dict = dict((value, key) for key, value in self.iteritems())
		
		# Record deletions from the backing table into _fix_cache so the
		# engine can invalidate stale cache entries (see create_db).
		cursor.execute("""
			CREATE TEMP TRIGGER update_cache_%(table)s
			BEFORE DELETE ON %(table)s
			BEGIN
				INSERT INTO _fix_cache VALUES ("%(table)s", OLD.id);
			END;
			""" % {"table": table})
	
	def __getitem__(self, name):
		# Use this for inserting new properties into the database
		if name in self:
			return super(TableLookup, self).__getitem__(name)
		try:
			self._cursor.execute(
				"INSERT INTO %s (value) VALUES (?)" % self._table, (name,))
			id = self._cursor.lastrowid
		except sqlite3.IntegrityError:
			# This shouldn't happen, but just in case
			# FIXME: Maybe we should remove it?
			id = self._cursor.execute("SELECT id FROM %s WHERE value=?"
				% self._table, (name,)).fetchone()[0]
		# If we are here it's a newly inserted value, insert it into cache
		self[name] = id
		self._inv_dict[id] = name
		return id
	
	def value(self, id):
		# When we fetch an event, it either was already in the database
		# at the time Zeitgeist started or it was inserted later -using
		# Zeitgeist-, so here we always have the data in memory already.
		return self._inv_dict[id]
	
	def id(self, name):
		# Use this when fetching values which are supposed to be in the
		# database already. Eg., in find_eventids.
		return super(TableLookup, self).__getitem__(name)
	
	def remove_id(self, id):
		# Drop both directions of the cached mapping for `id`.
		value = self.value(id)
		del self._inv_dict[id]
		del self[value]
558
def get_right_boundary(text):
	""" returns the smallest string which is greater than `text` """
	if not text:
		# if the search prefix is empty we query for the whole range
		# of 'utf-8 'unicode chars
		return unichr(0x10ffff)
	if isinstance(text, str):
		# we need to make sure the text is decoded as 'utf-8' unicode
		text = unicode(text, "UTF-8")
	charpoint = ord(text[-1])
	if charpoint == 0x10ffff:
		# if the last character is the biggest possible char we need to
		# look at the second last
		return get_right_boundary(text[:-1])
	return text[:-1] + unichr(charpoint+1)
576
class WhereClause:
	"""
	This class provides a convenient representation a SQL `WHERE' clause,
	composed of a set of conditions joined together.
	
	The relation between conditions can be either of type *AND* or *OR*, but
	not both. To create more complex clauses, use several :class:`WhereClause`
	instances and joining them together using :meth:`extend`.
	
	Instances of this class can then be used to obtain a line of SQL code and
	a list of arguments, for use with the SQLite3 module, accessing the
	appropriate properties:
		>>> where.sql, where.arguments
	"""
	
	AND = " AND "
	OR = " OR "
	NOT = "NOT "
	
	@staticmethod
	def optimize_glob(column, table, prefix):
		"""returns an optimized version of the GLOB statement as described
		in http://www.sqlite.org/optoverview.html `4.0 The LIKE optimization`
		"""
		if isinstance(prefix, str):
			# we need to make sure the text is decoded as 'utf-8' unicode
			prefix = unicode(prefix, "UTF-8")
		if not prefix:
			# empty prefix means 'select all', no way to optimize this
			sql = "SELECT %s FROM %s" %(column, table)
			return sql, ()
		elif all([i == unichr(0x10ffff) for i in prefix]):
			# prefix made entirely of the max codepoint: open-ended range
			sql = "SELECT %s FROM %s WHERE value >= ?" %(column, table)
			return sql, (prefix,)
		else:
			# half-open range [prefix, right_boundary) matches all values
			# starting with the prefix
			sql = "SELECT %s FROM %s WHERE (value >= ? AND value < ?)" %(column, table)
			return sql, (prefix, get_right_boundary(prefix))
	
	def __init__(self, relation, negation=False):
		self._conditions = []
		self.arguments = []
		self._relation = relation
		self._no_result_member = False
		self._negation = negation
	
	def __len__(self):
		return len(self._conditions)
	
	def add(self, condition, arguments=None):
		"""Append `condition` (and its bound `arguments`) to the clause."""
		if not condition:
			return
		self._conditions.append(condition)
		if arguments is not None:
			if not hasattr(arguments, "__iter__"):
				self.arguments.append(arguments)
			else:
				self.arguments.extend(arguments)
	
	def add_text_condition(self, column, value, like=False, negation=False, cache=None):
		"""Add a condition on a text column, optionally as a prefix (GLOB-style)
		search and/or negated; `cache` is a TableLookup used to map the value
		to its id for exact matches."""
		if like:
			assert column in ("origin", "subj_uri", "subj_current_uri",
				"subj_origin", "actor", "subj_mimetype"), \
				"prefix search on the %r column is not supported by zeitgeist" % column
			if column == "subj_uri":
				# subj_id directly points to the id of an uri entry
				view_column = "subj_id"
			elif column == "subj_current_uri":
				view_column = "subj_id_current"
			else:
				view_column = column
			optimized_glob, value = self.optimize_glob("id", TABLE_MAP.get(column, column), value)
			sql = "%s %sIN (%s)" %(view_column, self.NOT if negation else "", optimized_glob)
			if negation:
				sql += " OR %s IS NULL" % view_column
		else:
			if column == "origin":
				column ="event_origin_uri"
			elif column == "subj_origin":
				column = "subj_origin_uri"
			sql = "%s %s= ?" %(column, "!" if negation else "")
			if cache is not None:
				value = cache[value]
		self.add(sql, (value,))
	
	def extend(self, where):
		"""Merge another WhereClause into this one as a single condition."""
		self.add(where.sql, where.arguments)
		if not where.may_have_results():
			if self._relation == self.AND:
				self.clear()
			self.register_no_result()
	
	@property
	def sql(self):
		if self: # Do not return "()" if there are no conditions
			negation = self.NOT if self._negation else ""
			return "%s(%s)" %(negation, self._relation.join(self._conditions))
	
	def register_no_result(self):
		self._no_result_member = True
	
	def may_have_results(self):
		"""
		Return False if we know from our cached data that the query
		will give no results.
		"""
		return len(self._conditions) > 0 or not self._no_result_member
	
	def clear(self):
		"""
		Reset this WhereClause to the state of a newly created one.
		"""
		self._conditions = []
		self.arguments = []
		self._no_result_member = False