~divmod-dev/divmod.org/dangling-1091

« back to all changes in this revision

Viewing changes to Axiom/axiom/store.py

  • Committer: glyph
  • Date: 2005-07-28 22:09:16 UTC
  • Revision ID: svn-v4:866e43f7-fbfc-0310-8f2a-ec88d1da2979:trunk:2
move this repository to a more official-looking URL

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: axiom.test -*-
 
2
 
 
3
import time
 
4
import os
 
5
import itertools
 
6
 
 
7
from zope.interface import implements
 
8
 
 
9
from twisted.python.filepath import FilePath
 
10
from twisted.internet import defer
 
11
from twisted.python.reflect import qual
 
12
from twisted.application.service import IService, MultiService
 
13
 
 
14
from axiom import _schema, attributes, upgrade, _fincache, iaxiom
 
15
 
 
16
from pysqlite2 import dbapi2 as sqlite
 
17
 
 
18
from axiom.item import Item, \
 
19
    _typeNameToMostRecentClass, dummyItemSubclass,\
 
20
    _legacyTypes, TABLE_NAME
 
21
 
 
22
IN_MEMORY_DATABASE = ':memory:'
 
23
 
 
24
tempCounter = itertools.count()
 
25
 
 
26
class XFilePath(FilePath):
 
27
    def dirname(self):
 
28
        return os.path.dirname(self.path)
 
29
 
 
30
def _md(dirname):
 
31
    if os.path.isdir(dirname):
 
32
        return False
 
33
    os.makedirs(dirname)
 
34
    return True
 
35
 
 
36
class AtomicFile(file):
 
37
    """I am a file which is moved from temporary to permanent storage when it
 
38
    is closed.
 
39
 
 
40
    After I'm closed, I will have a 'finalpath' property saying where I went.
 
41
    """
 
42
 
 
43
    implements(iaxiom.IAtomicFile)
 
44
 
 
45
    def __init__(self, tempname, destpath):
 
46
        self._destpath = destpath
 
47
        file.__init__(self, tempname, 'w+b')
 
48
 
 
49
    def close(self):
 
50
        now = time.time()
 
51
        try:
 
52
            file.close(self)
 
53
            _md(self._destpath.dirname())
 
54
            self.finalpath = self._destpath
 
55
            os.rename(self.name, self.finalpath.path)
 
56
            os.utime(self.finalpath.path, (now, now))
 
57
        except:
 
58
            return defer.fail()
 
59
        return defer.succeed(self.finalpath)
 
60
 
 
61
    def abort(self):
 
62
        os.unlink(self.name)
 
63
 
 
64
class PowerCable(Item):
 
65
    typeName = 'powerup_connector'
 
66
    schemaVersion = 1
 
67
 
 
68
    powerup = attributes.reference()
 
69
    interface = attributes.text()
 
70
    priority = attributes.integer()
 
71
 
 
72
POWERUP_BEFORE = 1              # Priority for 'high' priority powerups.
 
73
POWERUP_AFTER = -1              # Priority for 'low' priority powerups.
 
74
 
 
75
class Store:
 
76
 
 
77
    transaction = None          # current transaction object
 
78
 
 
79
    def __init__(self, dbdir=None, debug=False, parent=None, idInParent=None):
 
80
        if parent is not None or idInParent is not None:
 
81
            assert parent is not None
 
82
            assert idInParent is not None
 
83
        self.parent = parent
 
84
        self.idInParent = idInParent
 
85
        self.debug = debug
 
86
        self.autocommit = True
 
87
        self.queryTimes = []
 
88
        self.execTimes = []
 
89
        self.statementCache = {} # non-normalized => normalized qmark SQL
 
90
                                 # statements
 
91
 
 
92
        self.objectCache = _fincache.FinalizingCache()
 
93
 
 
94
 
 
95
        if dbdir is None:
 
96
            dbfpath = IN_MEMORY_DATABASE
 
97
        else:
 
98
            dbdir = os.path.abspath(dbdir)
 
99
            dbfpath = os.path.join(dbdir, 'db.sqlite')
 
100
            self.filesdir = os.path.join(dbdir, 'files')
 
101
            if os.path.isdir(dbdir):
 
102
                if not os.path.exists(dbfpath):
 
103
                    raise OSError(
 
104
                        "The path %r is already a directory, but not an XAtop Store")
 
105
            else:
 
106
                _md(dbdir)
 
107
                _md(self.filesdir)
 
108
                _md(os.path.join(dbdir, 'temp'))
 
109
        self.dbdir = dbdir
 
110
        self.connection = sqlite.connect(dbfpath)
 
111
        self.cursor = self.connection.cursor()
 
112
        self.activeTables = {}  # tables which have had items added/removed
 
113
                                # this run
 
114
 
 
115
        # install powerups if we've never powered on before;
 
116
        create = not self.querySQL(_schema.HAS_SCHEMA_FEATURE,
 
117
                                   ['table', 'atop_types'])[0][0]
 
118
        if create:
 
119
            for stmt in _schema.BASE_SCHEMA:
 
120
                self.executeSQL(stmt)
 
121
 
 
122
        # activate services if we have(?)
 
123
        # scheduler needs startup hook
 
124
 
 
125
        self.tableQueries = {}  # map typename: query string w/ storeID
 
126
                                # parameter.  a typename is a persistent
 
127
                                # database handle for what we'll call a 'FQPN',
 
128
                                # i.e. arg to namedAny.
 
129
 
 
130
        self.typenameToID = {} # map database-persistent typename to an oid in
 
131
                               # the types table
 
132
 
 
133
        self.typenameAndVersionToID = {} # obvious I hope
 
134
 
 
135
        self.idToTypename = {} # map database-persistent typeID (oid in types
 
136
                               # table) to typename
 
137
 
 
138
        self.service = None
 
139
 
 
140
        for oid, typename, version in self.querySQL(_schema.ALL_TYPES):
 
141
            self.typenameAndVersionToID[typename, version] = oid
 
142
            if version == _typeNameToMostRecentClass[typename].schemaVersion:
 
143
                self.typenameToID[typename] = oid
 
144
                self.idToTypename[oid] = typename
 
145
            else:
 
146
                self._prepareOldVersionOf(oid, typename, version)
 
147
 
 
148
        for typename in self.typenameToID:
 
149
            self.checkTypeSchemaConsistency(typename)
 
150
 
 
151
    def powerUp(self, powerup, interface, priority=0):
 
152
        """
 
153
        Installs a powerup (e.g. plugin) on the store.
 
154
 
 
155
        Powerups will be returned in an iterator when queried for using the
 
156
        'powerupsFor' method.  Normally they will be returned in order of
 
157
        installation [this may change in future versions, so please don't
 
158
        depend on it].  Higher priorities are returned first.  If you have
 
159
        something that should run before "normal" powerups, pass
 
160
        POWERUP_BEFORE; if you have something that should run after, pass
 
161
        POWERUP_AFTER.  We suggest not depending too heavily on order of
 
162
        execution of your powerups, but if finer-grained control is necessary
 
163
        you may pass any integer.  Normal (unspecified) priority is zero.
 
164
 
 
165
        @param powerup: an Item that implements C{interface}
 
166
        @param interface: a zope interface
 
167
 
 
168
        @param priority: An int; preferably either POWERUP_BEFORE,
 
169
        POWERUP_AFTER, or unspecified.
 
170
        """
 
171
        PowerCable(store=self,
 
172
                   interface=unicode(qual(interface)),
 
173
                   powerup=powerup,
 
174
                   priority=priority)
 
175
 
 
176
    def __conform__(self, interface):
 
177
 
 
178
        if interface == IService:
 
179
            if self.service is not None:
 
180
                return self.service
 
181
            svc = MultiService()
 
182
            for subsvc in self.powerupsFor(interface):
 
183
                subsvc.setServiceParent(svc)
 
184
            self.service = svc
 
185
            return svc
 
186
 
 
187
        for i in self.powerupsFor(interface):
 
188
            return i
 
189
 
 
190
    def powerupsFor(self, interface):
 
191
        """
 
192
        Returns powerups installed using C{powerUp}.
 
193
        """
 
194
        for cable in self.query(PowerCable,
 
195
                                PowerCable.interface == unicode(qual(interface)),
 
196
                                sort=PowerCable.priority.descending):
 
197
            yield cable.powerup
 
198
 
 
199
    def newFile(self, *path):
 
200
        assert self.dbdir is not None, "Cannot create files in in-memory Stores (yet)"
 
201
        tmpname = os.path.join(self.dbdir, 'temp', str(tempCounter.next())+".tmp")
 
202
        return AtomicFile(
 
203
            tmpname,
 
204
            XFilePath(os.path.join(self.dbdir, 'files', *path)))
 
205
 
 
206
    def newDirectory(self, *path):
 
207
        assert self.dbdir is not None, "Cannot create directories in in-memory Stores (yet)"
 
208
        return FilePath(os.path.join(self.dbdir, 'files', *path))
 
209
 
 
210
    def checkTypeSchemaConsistency(self, typename):
 
211
        """
 
212
        Called for all known types at database startup: make sure that what we know
 
213
        (in memory) about this type is
 
214
 
 
215
        """
 
216
        # make sure that both the runtime and the database both know about this
 
217
        # type; if they don't both know, we can't check that their views are
 
218
        # consistent
 
219
        assert typename in self.typenameToID
 
220
        if typename not in _typeNameToMostRecentClass:
 
221
            print 'EARLY OUT CONSISTENCY CHECK: WHAT?'
 
222
            return
 
223
        typeID = self.typenameToID[typename]
 
224
        actualType = _typeNameToMostRecentClass[typename]
 
225
        #
 
226
        inMemorySchema = [(storedAttribute.indexed, storedAttribute.sqltype,
 
227
                           storedAttribute.columnName)
 
228
                          for (name, storedAttribute) in actualType.getSchema()]
 
229
 
 
230
        onDiskSchema = self.querySQL(_schema.IDENTIFYING_SCHEMA, [typeID])
 
231
 
 
232
        if inMemorySchema != onDiskSchema:
 
233
            raise RuntimeError(
 
234
                "Schema mismatch on already-loaded %r object version %d: %r != %r" %
 
235
                (actualType.typeName, actualType.schemaVersion, onDiskSchema, inMemorySchema))
 
236
 
 
237
        if self.querySQL(_schema.GET_TYPE_OF_VERSION,
 
238
                      [typename, actualType.schemaVersion]):
 
239
            raise RuntimeError(
 
240
                "Greater versions of database %r objects in the DB than in memory" %
 
241
                (typename,))
 
242
 
 
243
        # finally find old versions of the data and prepare to upgrade it.
 
244
 
 
245
    def _prepareOldVersionOf(self, typeID, typename, version):
 
246
        dummyItemSubclass(*self._dssargs(typeID, typename, version))
 
247
 
 
248
    def getOldVersionOf(self, typename, version):
 
249
        return _legacyTypes[typename, version]
 
250
 
 
251
    def _dssargs(self, typeID, typename, version):
 
252
        """
 
253
        Returns a 4-tuple suitable as args for dummyItemSubclass
 
254
        """
 
255
 
 
256
        appropriateSchema = self.querySQL(_schema.SCHEMA_FOR_TYPE, [typeID])
 
257
        # create actual attribute objects
 
258
        dummyAttributes = {}
 
259
        for indexed, pythontype, attribute, docstring in appropriateSchema:
 
260
            atr = getattr(attributes, pythontype)(indexed=indexed,
 
261
                                                  doc=docstring)
 
262
            dummyAttributes[attribute] = atr
 
263
        dummyBases = []
 
264
        retval = (typename, version, dummyAttributes, dummyBases)
 
265
        return retval
 
266
 
 
267
 
 
268
        # grab the schema for that version
 
269
        # look up upgraders which push it forward
 
270
        # insert "AutoUpgrader" class into idToTypename somehow(?)
 
271
 
 
272
 
 
273
    def query(self, tableClass,
 
274
              comparison=None,
 
275
              limit=None, offset=None,
 
276
              sort=None):
 
277
        if (tableClass.typeName,
 
278
            tableClass.schemaVersion) not in self.typenameAndVersionToID:
 
279
            return
 
280
        if comparison is not None:
 
281
            tables = set(comparison.getTableNames())
 
282
            where = ['WHERE', comparison.getQuery()]
 
283
            args = comparison.getArgsFor(self)
 
284
        else:
 
285
            tables = set()
 
286
            where = []
 
287
            args = []
 
288
        tables.add(tableClass.getTableName())
 
289
        query = ['SELECT',
 
290
                 tableClass.getTableName(), '.oid,',
 
291
                 tableClass.getTableName(), '.*', 'FROM',
 
292
                 ', '.join(tables)]
 
293
        query.extend(where)
 
294
        if sort is not None:
 
295
            query.append(sort)
 
296
        if limit is not None:
 
297
            # XXX LIMIT and OFFSET used to be using ?, but they started
 
298
            # generating syntax errors in places where generating the whole SQL
 
299
            # statement does not.  this smells like a bug in sqlite's parser to
 
300
            # me, but I don't know my SQL syntax standards well enough to be
 
301
            # sure -glyph
 
302
            query.append('LIMIT ')
 
303
            query.append(str(limit))
 
304
            if offset is not None:
 
305
                query.append('OFFSET ')
 
306
                query.append(str(offset))
 
307
        S = ' '.join(query)
 
308
        for row in self.querySQL(S, args):
 
309
            yield self._loadedItem(
 
310
                tableClass,
 
311
                row[0],
 
312
                row[1:])
 
313
 
 
314
    def _loadedItem(self, itemClass, storeID, attrs):
 
315
        if self.objectCache.has(storeID):
 
316
            result = self.objectCache.get(storeID)
 
317
            # XXX do checks on consistency between attrs and DB object, maybe?
 
318
        else:
 
319
            result = itemClass.existingInStore(self, storeID, attrs)
 
320
            if not result.__legacy__:
 
321
                self.objectCache.cache(storeID, result)
 
322
        return result
 
323
 
 
324
    def checkpoint(self):
 
325
        for item in self.transaction:
 
326
            # XXX: it should be possible here, using various clever hacks, to
 
327
            # automatically optimize functionally identical statements into
 
328
            # executemany.
 
329
            item.checkpoint()
 
330
 
 
331
    def revert(self):
 
332
        self.connection.rollback()
 
333
        for item in self.transaction:
 
334
            item.revert()
 
335
 
 
336
    def transact(self, f, *a, **k):
 
337
        self.transaction = set()
 
338
        self.autocommit = False
 
339
        try:
 
340
            try:
 
341
                result = f(*a, **k)
 
342
                self.checkpoint()
 
343
            except:
 
344
                self.revert()
 
345
                raise
 
346
            else:
 
347
                self.commit()
 
348
                for committed in self.transaction:
 
349
                    committed.committed()
 
350
            return result
 
351
        finally:
 
352
            self.autocommit = True
 
353
            self.transaction = None
 
354
 
 
355
    def commit(self):
 
356
        if self.debug:
 
357
            print '*'*10, 'COMMIT', '*'*10
 
358
        self.connection.commit()
 
359
 
 
360
    def close(self):
 
361
        self.connection.close()
 
362
        if self.debug:
 
363
            if not self.queryTimes:
 
364
                print 'no queries'
 
365
            else:
 
366
                print 'query:', self.avgms(self.queryTimes)
 
367
            if not self.execTimes:
 
368
                print 'no execs'
 
369
            else:
 
370
                print 'exec:', self.avgms(self.execTimes)
 
371
 
 
372
    def avgms(self, l):
 
373
        return 'count: %d avg: %dus' % (len(l), int( (sum(l)/len(l)) * 1000000.),)
 
374
 
 
375
 
 
376
    def getTypeID(self, tableClass):
 
377
        """
 
378
        """
 
379
        key = (tableClass.typeName,
 
380
               tableClass.schemaVersion)
 
381
        if key in self.typenameAndVersionToID:
 
382
            return self.typenameAndVersionToID[key]
 
383
 
 
384
        sqlstr = []
 
385
        sqlarg = []
 
386
        indexes = []
 
387
 
 
388
        # needs to be calculated including version
 
389
        tableName = tableClass.getTableName()
 
390
 
 
391
 
 
392
        sqlstr.append("CREATE TABLE %s (" % tableName)
 
393
 
 
394
        for nam, atr in tableClass.getSchema():
 
395
            # it's a stored attribute
 
396
            sqlarg.append("\n%s %s" %
 
397
                          (atr.attrname, atr.sqltype))
 
398
            if atr.indexed:
 
399
                indexes.append(nam)
 
400
 
 
401
        sqlstr.append(', '.join(sqlarg))
 
402
        sqlstr.append(')')
 
403
 
 
404
        typeID = self.executeSQL(_schema.CREATE_TYPE, [tableClass.typeName,
 
405
                                                       tableClass.schemaVersion])
 
406
 
 
407
        for n, (name, storedAttribute) in enumerate(tableClass.getSchema()):
 
408
            self.executeSQL(
 
409
                _schema.ADD_SCHEMA_ATTRIBUTE,
 
410
                [typeID, n, storedAttribute.indexed, storedAttribute.sqltype,
 
411
                 storedAttribute.columnName, storedAttribute.doc,
 
412
                 storedAttribute.__class__.__name__])
 
413
            # XXX probably need something better for pythontype eventually,
 
414
            # when we figure out a good way to do user-defined attributes or we
 
415
            # start parameterizing references.
 
416
 
 
417
        self.typenameToID[tableClass.typeName] = typeID
 
418
        self.typenameAndVersionToID[key] = typeID
 
419
        self.idToTypename[typeID] = tableClass.typeName
 
420
 
 
421
        # The table and index creation code is executed last, because SQLite
 
422
        # has the extremely unfortunate habit of committing transactions as
 
423
        # soon as you issue a (CREATE|DROP) (TABLE|INDEX)
 
424
 
 
425
        # There is an obscure condition where the database could be left in an
 
426
        # inconsistent state.  If you do something that needs to be
 
427
        # transactional, create the first instance of a particular item type,
 
428
        # then do something which potentially raises an exception, then
 
429
        # checkpoint, THEN do something that depends on your first
 
430
        # transactional action, the database will commit as soon as the new
 
431
        # item type is created.
 
432
 
 
433
        # Note, however, that it is perfectly safe to do this _without_ a call
 
434
        # to checkpoint() because none of the Python-level SQL will run until
 
435
        # the database has checkpointed, either from user code or at the end of
 
436
        # the transaction.
 
437
 
 
438
        self.executeSQL(''.join(sqlstr))
 
439
        for index in indexes:
 
440
            self.executeSQL('CREATE INDEX atopidx_%s_%s ON %s(%s)'
 
441
                            % (tableName, index,
 
442
                               tableName, index))
 
443
 
 
444
        return typeID
 
445
 
 
446
    def getTableQuery(self, typename, version):
 
447
        if typename not in self.tableQueries:
 
448
            query = 'SELECT * FROM %s WHERE oid = ?' % (
 
449
                TABLE_NAME(typename, version), )
 
450
            self.tableQueries[typename, version] = query
 
451
        return self.tableQueries[typename, version]
 
452
 
 
453
    def getItemByID(self, storeID, autoUpgrade=True):
 
454
        """
 
455
        """
 
456
        assert storeID is not None
 
457
        if self.objectCache.has(storeID):
 
458
            return self.objectCache.get(storeID)
 
459
        results = self.querySQL(_schema.TYPEOF_QUERY, [storeID])
 
460
        assert (len(results) in [1, 0]),\
 
461
            "Database panic: more than one result for TYPEOF!"
 
462
        if results:
 
463
            typename, version = results[0]
 
464
            # for the moment we're going to assume no inheritance
 
465
            attrs = self.querySQL(self.getTableQuery(typename, version),
 
466
                                  [storeID])
 
467
            assert len(attrs) == 1, "No results for known-to-be-good object"
 
468
            attrs = attrs[0]
 
469
            useMostRecent = False
 
470
            moreRecentAvailable = False
 
471
            if typename in _typeNameToMostRecentClass:
 
472
                moreRecentAvailable = True
 
473
                mostRecent = _typeNameToMostRecentClass[typename]
 
474
 
 
475
                if mostRecent.schemaVersion < version:
 
476
                    raise RuntimeError("%s:%d - was found in the database and most recent %s is %d" %
 
477
                                       (typename, version, typename, mostRecent.schemaVersion))
 
478
                if mostRecent.schemaVersion == version:
 
479
                    useMostRecent = True
 
480
            if useMostRecent:
 
481
                T = mostRecent
 
482
            else:
 
483
                T = self.getOldVersionOf(typename, version)
 
484
            x = T.existingInStore(self, storeID, attrs)
 
485
            if moreRecentAvailable and (not useMostRecent) and autoUpgrade:
 
486
                x = upgrade.upgradeAllTheWay(x, typename, x.schemaVersion)
 
487
            if not x.__legacy__:
 
488
                # X is upgraded all the way
 
489
                self.objectCache.cache(storeID, x)
 
490
            return x
 
491
        else:
 
492
            raise KeyError(storeID)
 
493
 
 
494
    def _normalizeSQL(self, sql):
 
495
        assert "'" not in sql, "Strings are _NOT ALLOWED_"
 
496
        if sql not in self.statementCache:
 
497
            accum = []
 
498
            lines = sql.split('\n')
 
499
            for line in lines:
 
500
                line = line.split('--')[0]         # remove comments
 
501
                words = line.strip().split()
 
502
                accum.extend(words)
 
503
            normsql = ' '.join(accum)   # your SQL should never have any
 
504
                                        # significant whitespace in it, right?
 
505
            self.statementCache[sql] = normsql
 
506
        return self.statementCache[sql]
 
507
 
 
508
 
 
509
    def querySQL(self, sql, args=()):
 
510
        """For use with SELECT (or SELECT-like PRAGMA) statements.
 
511
        """
 
512
        sql = self._normalizeSQL(sql)
 
513
        if self.debug:
 
514
            result = timeinto(self.queryTimes, self._queryandfetch, sql, args)
 
515
        else:
 
516
            result = self._queryandfetch(sql, args)
 
517
        return result
 
518
 
 
519
    def _queryandfetch(self, sql, args):
 
520
        if self.debug:
 
521
            print '**', sql, '--', ', '.join(map(str, args))
 
522
        try:
 
523
            self.cursor.execute(sql, args)
 
524
        except (sqlite.ProgrammingError, sqlite.OperationalError), oe:
 
525
            raise RuntimeError("SQL: %r(%r) caused exception: %s" %(
 
526
                    sql, args, oe))
 
527
        result = self.cursor.fetchall()
 
528
        if self.autocommit:
 
529
            self.commit()
 
530
        if self.debug:
 
531
            print '  lastrow:', self.cursor.lastrowid
 
532
            print '  result:', result
 
533
        return result
 
534
 
 
535
    def executeSQL(self, sql, args=()):
 
536
        """
 
537
        For use with UPDATE, INSERT or CREATE statements.
 
538
        """
 
539
        sql = self._normalizeSQL(sql)
 
540
        if self.debug:
 
541
            rows = timeinto(self.execTimes, self._queryandfetch, sql, args)
 
542
        else:
 
543
            rows = self._queryandfetch(sql, args)
 
544
        assert not rows
 
545
        result = self.cursor.lastrowid
 
546
        return result
 
547
 
 
548
 
 
549
def timeinto(l, f, *a, **k):
 
550
    then = time.time()
 
551
    try:
 
552
        return f(*a, **k)
 
553
    finally:
 
554
        now = time.time()
 
555
        elapsed = now - then
 
556
        l.append(elapsed)
 
557
 
 
558
queryTimes = []
 
559
execTimes = []
 
560
 
 
561
 
 
562
class StorageService(MultiService):
 
563
 
 
564
    def __init__(self, *a, **k):
 
565
        MultiService.__init__(self)
 
566
        self.a = a
 
567
        self.k = k
 
568
        self.store = None
 
569
 
 
570
    def privilegedStartService(self):
 
571
        print 'privstartserv'
 
572
        self.store = Store(*self.a, **self.k)
 
573
        print 'store open'
 
574
        IService(self.store).setServiceParent(self)
 
575
        print 'service parent set'
 
576
        MultiService.privilegedStartService(self)
 
577
        print 'service parent started', list(IService(self.store))
 
578
 
 
579
    def close(self, ignored=None):
 
580
        # final close method, called after the service has been stopped
 
581
        print 'close'
 
582
        self.store.close()
 
583
        self.store = None
 
584
        return ignored
 
585
 
 
586
    def stopService(self):
 
587
        print 'stopping service'
 
588
        return defer.maybeDeferred(
 
589
            MultiService.stopService, self).addBoth(
 
590
            self.close)