1
# -*- test-case-name: axiom.test -*-
7
from zope.interface import implements
9
from twisted.python.filepath import FilePath
10
from twisted.internet import defer
11
from twisted.python.reflect import qual
12
from twisted.application.service import IService, MultiService
14
from axiom import _schema, attributes, upgrade, _fincache, iaxiom
16
from pysqlite2 import dbapi2 as sqlite
18
from axiom.item import Item, \
19
_typeNameToMostRecentClass, dummyItemSubclass,\
20
_legacyTypes, TABLE_NAME
22
IN_MEMORY_DATABASE = ':memory:'
24
tempCounter = itertools.count()
26
class XFilePath(FilePath):
28
return os.path.dirname(self.path)
31
if os.path.isdir(dirname):
36
class AtomicFile(file):
37
"""I am a file which is moved from temporary to permanent storage when it
40
After I'm closed, I will have a 'finalpath' property saying where I went.
43
implements(iaxiom.IAtomicFile)
45
def __init__(self, tempname, destpath):
    """
    Open a temporary file for writing.

    @param tempname: the path of the temporary file to create and write to.
    @param destpath: the final path this file is destined for once it is
        closed successfully.
    """
    file.__init__(self, tempname, 'w+b')
    self._destpath = destpath
53
_md(self._destpath.dirname())
54
self.finalpath = self._destpath
55
os.rename(self.name, self.finalpath.path)
56
os.utime(self.finalpath.path, (now, now))
59
return defer.succeed(self.finalpath)
64
class PowerCable(Item):
65
typeName = 'powerup_connector'
68
powerup = attributes.reference()
69
interface = attributes.text()
70
priority = attributes.integer()
72
POWERUP_BEFORE = 1 # Priority for 'high' priority powerups.
73
POWERUP_AFTER = -1 # Priority for 'low' priority powerups.
77
transaction = None # current transaction object
79
def __init__(self, dbdir=None, debug=False, parent=None, idInParent=None):
80
if parent is not None or idInParent is not None:
81
assert parent is not None
82
assert idInParent is not None
84
self.idInParent = idInParent
86
self.autocommit = True
89
self.statementCache = {} # non-normalized => normalized qmark SQL
92
self.objectCache = _fincache.FinalizingCache()
96
dbfpath = IN_MEMORY_DATABASE
98
dbdir = os.path.abspath(dbdir)
99
dbfpath = os.path.join(dbdir, 'db.sqlite')
100
self.filesdir = os.path.join(dbdir, 'files')
101
if os.path.isdir(dbdir):
102
if not os.path.exists(dbfpath):
104
"The path %r is already a directory, but not an XAtop Store")
108
_md(os.path.join(dbdir, 'temp'))
110
self.connection = sqlite.connect(dbfpath)
111
self.cursor = self.connection.cursor()
112
self.activeTables = {} # tables which have had items added/removed
115
# install powerups if we've never powered on before;
116
create = not self.querySQL(_schema.HAS_SCHEMA_FEATURE,
117
['table', 'atop_types'])[0][0]
119
for stmt in _schema.BASE_SCHEMA:
120
self.executeSQL(stmt)
122
# activate services if we have(?)
123
# scheduler needs startup hook
125
self.tableQueries = {} # map typename: query string w/ storeID
126
# parameter. a typename is a persistent
127
# database handle for what we'll call a 'FQPN',
128
# i.e. arg to namedAny.
130
self.typenameToID = {} # map database-persistent typename to an oid in
133
self.typenameAndVersionToID = {} # obvious I hope
135
self.idToTypename = {} # map database-persistent typeID (oid in types
140
for oid, typename, version in self.querySQL(_schema.ALL_TYPES):
141
self.typenameAndVersionToID[typename, version] = oid
142
if version == _typeNameToMostRecentClass[typename].schemaVersion:
143
self.typenameToID[typename] = oid
144
self.idToTypename[oid] = typename
146
self._prepareOldVersionOf(oid, typename, version)
148
for typename in self.typenameToID:
149
self.checkTypeSchemaConsistency(typename)
151
def powerUp(self, powerup, interface, priority=0):
153
Installs a powerup (e.g. plugin) on the store.
155
Powerups will be returned in an iterator when queried for using the
156
'powerupsFor' method. Normally they will be returned in order of
157
installation [this may change in future versions, so please don't
158
depend on it]. Higher priorities are returned first. If you have
159
something that should run before "normal" powerups, pass
160
POWERUP_BEFORE; if you have something that should run after, pass
161
POWERUP_AFTER. We suggest not depending too heavily on order of
162
execution of your powerups, but if finer-grained control is necessary
163
you may pass any integer. Normal (unspecified) priority is zero.
165
@param powerup: an Item that implements C{interface}
166
@param interface: a zope interface
168
@param priority: An int; preferably either POWERUP_BEFORE,
169
POWERUP_AFTER, or unspecified.
171
PowerCable(store=self,
172
interface=unicode(qual(interface)),
176
def __conform__(self, interface):
178
if interface == IService:
179
if self.service is not None:
182
for subsvc in self.powerupsFor(interface):
183
subsvc.setServiceParent(svc)
187
for i in self.powerupsFor(interface):
190
def powerupsFor(self, interface):
192
Returns powerups installed using C{powerUp}.
194
for cable in self.query(PowerCable,
195
PowerCable.interface == unicode(qual(interface)),
196
sort=PowerCable.priority.descending):
199
def newFile(self, *path):
200
assert self.dbdir is not None, "Cannot create files in in-memory Stores (yet)"
201
tmpname = os.path.join(self.dbdir, 'temp', str(tempCounter.next())+".tmp")
204
XFilePath(os.path.join(self.dbdir, 'files', *path)))
206
def newDirectory(self, *path):
    """
    Return a FilePath pointing at a directory beneath this store's
    'files' area.

    @param path: path segments relative to the store's files directory.
    """
    assert self.dbdir is not None, "Cannot create directories in in-memory Stores (yet)"
    target = os.path.join(self.dbdir, 'files', *path)
    return FilePath(target)
210
def checkTypeSchemaConsistency(self, typename):
212
Called for all known types at database startup: make sure that what we know
213
(in memory) about this type is
216
# make sure that both the runtime and the database both know about this
217
# type; if they don't both know, we can't check that their views are
219
assert typename in self.typenameToID
220
if typename not in _typeNameToMostRecentClass:
221
print 'EARLY OUT CONSISTENCY CHECK: WHAT?'
223
typeID = self.typenameToID[typename]
224
actualType = _typeNameToMostRecentClass[typename]
226
inMemorySchema = [(storedAttribute.indexed, storedAttribute.sqltype,
227
storedAttribute.columnName)
228
for (name, storedAttribute) in actualType.getSchema()]
230
onDiskSchema = self.querySQL(_schema.IDENTIFYING_SCHEMA, [typeID])
232
if inMemorySchema != onDiskSchema:
234
"Schema mismatch on already-loaded %r object version %d: %r != %r" %
235
(actualType.typeName, actualType.schemaVersion, onDiskSchema, inMemorySchema))
237
if self.querySQL(_schema.GET_TYPE_OF_VERSION,
238
[typename, actualType.schemaVersion]):
240
"Greater versions of database %r objects in the DB than in memory" %
243
# finally find old versions of the data and prepare to upgrade it.
245
def _prepareOldVersionOf(self, typeID, typename, version):
    """
    Register a dummy item class for an outdated schema version of a type,
    so rows stored under the old schema can still be loaded (and later
    upgraded) even though the in-memory class has moved on.
    """
    legacyArgs = self._dssargs(typeID, typename, version)
    dummyItemSubclass(*legacyArgs)
248
def getOldVersionOf(self, typename, version):
    """
    Look up the dummy (legacy) item class registered for an old schema
    version of the named type.

    @raise KeyError: if no legacy class was prepared for this
        (typename, version) pair.
    """
    key = (typename, version)
    return _legacyTypes[key]
251
def _dssargs(self, typeID, typename, version):
253
Returns a 4-tuple suitable as args for dummyItemSubclass
256
appropriateSchema = self.querySQL(_schema.SCHEMA_FOR_TYPE, [typeID])
257
# create actual attribute objects
259
for indexed, pythontype, attribute, docstring in appropriateSchema:
260
atr = getattr(attributes, pythontype)(indexed=indexed,
262
dummyAttributes[attribute] = atr
264
retval = (typename, version, dummyAttributes, dummyBases)
268
# grab the schema for that version
269
# look up upgraders which push it forward
270
# insert "AutoUpgrader" class into idToTypename somehow(?)
273
def query(self, tableClass,
275
limit=None, offset=None,
277
if (tableClass.typeName,
278
tableClass.schemaVersion) not in self.typenameAndVersionToID:
280
if comparison is not None:
281
tables = set(comparison.getTableNames())
282
where = ['WHERE', comparison.getQuery()]
283
args = comparison.getArgsFor(self)
288
tables.add(tableClass.getTableName())
290
tableClass.getTableName(), '.oid,',
291
tableClass.getTableName(), '.*', 'FROM',
296
if limit is not None:
297
# XXX LIMIT and OFFSET used to be using ?, but they started
298
# generating syntax errors in places where generating the whole SQL
299
# statement does not. this smells like a bug in sqlite's parser to
300
# me, but I don't know my SQL syntax standards well enough to be
302
query.append('LIMIT ')
303
query.append(str(limit))
304
if offset is not None:
305
query.append('OFFSET ')
306
query.append(str(offset))
308
for row in self.querySQL(S, args):
309
yield self._loadedItem(
314
def _loadedItem(self, itemClass, storeID, attrs):
315
if self.objectCache.has(storeID):
316
result = self.objectCache.get(storeID)
317
# XXX do checks on consistency between attrs and DB object, maybe?
319
result = itemClass.existingInStore(self, storeID, attrs)
320
if not result.__legacy__:
321
self.objectCache.cache(storeID, result)
324
def checkpoint(self):
325
for item in self.transaction:
326
# XXX: it should be possible here, using various clever hacks, to
327
# automatically optimize functionally identical statements into
332
self.connection.rollback()
333
for item in self.transaction:
336
def transact(self, f, *a, **k):
337
self.transaction = set()
338
self.autocommit = False
348
for committed in self.transaction:
349
committed.committed()
352
self.autocommit = True
353
self.transaction = None
357
print '*'*10, 'COMMIT', '*'*10
358
self.connection.commit()
361
self.connection.close()
363
if not self.queryTimes:
366
print 'query:', self.avgms(self.queryTimes)
367
if not self.execTimes:
370
print 'exec:', self.avgms(self.execTimes)
373
return 'count: %d avg: %dus' % (len(l), int( (sum(l)/len(l)) * 1000000.),)
376
def getTypeID(self, tableClass):
379
key = (tableClass.typeName,
380
tableClass.schemaVersion)
381
if key in self.typenameAndVersionToID:
382
return self.typenameAndVersionToID[key]
388
# needs to be calculated including version
389
tableName = tableClass.getTableName()
392
sqlstr.append("CREATE TABLE %s (" % tableName)
394
for nam, atr in tableClass.getSchema():
395
# it's a stored attribute
396
sqlarg.append("\n%s %s" %
397
(atr.attrname, atr.sqltype))
401
sqlstr.append(', '.join(sqlarg))
404
typeID = self.executeSQL(_schema.CREATE_TYPE, [tableClass.typeName,
405
tableClass.schemaVersion])
407
for n, (name, storedAttribute) in enumerate(tableClass.getSchema()):
409
_schema.ADD_SCHEMA_ATTRIBUTE,
410
[typeID, n, storedAttribute.indexed, storedAttribute.sqltype,
411
storedAttribute.columnName, storedAttribute.doc,
412
storedAttribute.__class__.__name__])
413
# XXX probably need something better for pythontype eventually,
414
# when we figure out a good way to do user-defined attributes or we
415
# start parameterizing references.
417
self.typenameToID[tableClass.typeName] = typeID
418
self.typenameAndVersionToID[key] = typeID
419
self.idToTypename[typeID] = tableClass.typeName
421
# The table and index creation code is executed last, because SQLite
422
# has the extremely unfortunate habit of committing transactions as
423
# soon as you issue a (CREATE|DROP) (TABLE|INDEX)
425
# There is an obscure condition where the database could be left in an
426
# inconsistent state. If you do something that needs to be
427
# transactional, create the first instance of a particular item type,
428
# then do something which potentially raises an exception, then
429
# checkpoint, THEN do something that depends on your first
430
# transactional action, the database will commit as soon as the new
431
# item type is created.
433
# Note, however, that it is perfectly safe to do this _without_ a call
434
# to checkpoint() because none of the Python-level SQL will run until
435
# the database has checkpointed, either from user code or at the end of
438
self.executeSQL(''.join(sqlstr))
439
for index in indexes:
440
self.executeSQL('CREATE INDEX atopidx_%s_%s ON %s(%s)'
446
def getTableQuery(self, typename, version):
    """
    Return (and memoize) the SELECT statement that fetches a single row
    from the table backing the given type and schema version by its oid.

    @param typename: the database-persistent type name.
    @param version: the schema version of that type.
    """
    key = (typename, version)
    # Bug fix: entries are cached under (typename, version) tuple keys,
    # but the membership test previously checked the bare typename, so
    # the cache never hit and the SQL string was rebuilt on every call.
    if key not in self.tableQueries:
        query = 'SELECT * FROM %s WHERE oid = ?' % (
            TABLE_NAME(typename, version), )
        self.tableQueries[key] = query
    return self.tableQueries[key]
453
def getItemByID(self, storeID, autoUpgrade=True):
456
assert storeID is not None
457
if self.objectCache.has(storeID):
458
return self.objectCache.get(storeID)
459
results = self.querySQL(_schema.TYPEOF_QUERY, [storeID])
460
assert (len(results) in [1, 0]),\
461
"Database panic: more than one result for TYPEOF!"
463
typename, version = results[0]
464
# for the moment we're going to assume no inheritance
465
attrs = self.querySQL(self.getTableQuery(typename, version),
467
assert len(attrs) == 1, "No results for known-to-be-good object"
469
useMostRecent = False
470
moreRecentAvailable = False
471
if typename in _typeNameToMostRecentClass:
472
moreRecentAvailable = True
473
mostRecent = _typeNameToMostRecentClass[typename]
475
if mostRecent.schemaVersion < version:
476
raise RuntimeError("%s:%d - was found in the database and most recent %s is %d" %
477
(typename, version, typename, mostRecent.schemaVersion))
478
if mostRecent.schemaVersion == version:
483
T = self.getOldVersionOf(typename, version)
484
x = T.existingInStore(self, storeID, attrs)
485
if moreRecentAvailable and (not useMostRecent) and autoUpgrade:
486
x = upgrade.upgradeAllTheWay(x, typename, x.schemaVersion)
488
# X is upgraded all the way
489
self.objectCache.cache(storeID, x)
492
raise KeyError(storeID)
494
def _normalizeSQL(self, sql):
495
assert "'" not in sql, "Strings are _NOT ALLOWED_"
496
if sql not in self.statementCache:
498
lines = sql.split('\n')
500
line = line.split('--')[0] # remove comments
501
words = line.strip().split()
503
normsql = ' '.join(accum) # your SQL should never have any
504
# significant whitespace in it, right?
505
self.statementCache[sql] = normsql
506
return self.statementCache[sql]
509
def querySQL(self, sql, args=()):
510
"""For use with SELECT (or SELECT-like PRAGMA) statements.
512
sql = self._normalizeSQL(sql)
514
result = timeinto(self.queryTimes, self._queryandfetch, sql, args)
516
result = self._queryandfetch(sql, args)
519
def _queryandfetch(self, sql, args):
521
print '**', sql, '--', ', '.join(map(str, args))
523
self.cursor.execute(sql, args)
524
except (sqlite.ProgrammingError, sqlite.OperationalError), oe:
525
raise RuntimeError("SQL: %r(%r) caused exception: %s" %(
527
result = self.cursor.fetchall()
531
print ' lastrow:', self.cursor.lastrowid
532
print ' result:', result
535
def executeSQL(self, sql, args=()):
537
For use with UPDATE, INSERT or CREATE statements.
539
sql = self._normalizeSQL(sql)
541
rows = timeinto(self.execTimes, self._queryandfetch, sql, args)
543
rows = self._queryandfetch(sql, args)
545
result = self.cursor.lastrowid
549
def timeinto(l, f, *a, **k):
562
class StorageService(MultiService):
564
def __init__(self, *a, **k):
565
MultiService.__init__(self)
570
def privilegedStartService(self):
571
print 'privstartserv'
572
self.store = Store(*self.a, **self.k)
574
IService(self.store).setServiceParent(self)
575
print 'service parent set'
576
MultiService.privilegedStartService(self)
577
print 'service parent started', list(IService(self.store))
579
def close(self, ignored=None):
580
# final close method, called after the service has been stopped
586
def stopService(self):
587
print 'stopping service'
588
return defer.maybeDeferred(
589
MultiService.stopService, self).addBoth(