~midokura/nova/midostack-oneiric

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/persisted/journal/rowjournal.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
5
 
 
6
"""Journal using twisted.enterprise.row RDBMS support.
 
7
 
 
8
You're going to need the following table in your database::
 
9
 
 
10
    | CREATE TABLE journalinfo
 
11
    | (
 
12
    |   commandIndex int
 
13
    | );
 
14
    | INSERT INTO journalinfo VALUES (0);
 
15
 
 
16
"""
 
17
 
 
18
from __future__ import nested_scopes
 
19
 
 
20
# twisted imports
 
21
from twisted.internet import defer
 
22
 
 
23
# sibling imports
 
24
import base
 
25
 
 
26
 
 
27
# constants for command list
 
28
INSERT, DELETE, UPDATE = range(3)
 
29
 
 
30
 
 
31
class RowJournal(base.Journal):
 
32
    """Journal that stores data 'snapshot' in using twisted.enterprise.row.
 
33
 
 
34
    Use this as the reflector instead of the original reflector.
 
35
 
 
36
    It may block on creation, if it has to run recovery.
 
37
    """
 
38
 
 
39
    def __init__(self, log, journaledService, reflector):
 
40
        self.reflector = reflector
 
41
        self.commands = []
 
42
        self.syncing = 0
 
43
        base.Journal.__init__(self, log, journaledService)
 
44
    
 
45
    def updateRow(self, obj):
 
46
        """Mark on object for updating when sync()ing."""
 
47
        self.commands.append((UPDATE, obj))
 
48
 
 
49
    def insertRow(self, obj):
 
50
        """Mark on object for inserting when sync()ing."""
 
51
        self.commands.append((INSERT, obj))
 
52
 
 
53
    def deleteRow(self, obj):
 
54
        """Mark on object for deleting when sync()ing."""
 
55
        self.commands.append((DELETE, obj))
 
56
 
 
57
    def loadObjectsFrom(self, tableName, parentRow=None, data=None, whereClause=None, forceChildren=0):
 
58
        """Flush all objects to the database and then load objects."""
 
59
        d = self.sync()
 
60
        d.addCallback(lambda result: self.reflector.loadObjectsFrom(
 
61
            tableName, parentRow=parentRow, data=data, whereClause=whereClause,
 
62
            forceChildren=forceChildren))
 
63
        return d
 
64
 
 
65
    def sync(self):
 
66
        """Commit changes to database."""
 
67
        if self.syncing:
 
68
            raise ValueError, "sync already in progress"
 
69
        comandMap = {INSERT : self.reflector.insertRowSQL,
 
70
                     UPDATE : self.reflector.updateRowSQL,
 
71
                     DELETE : self.reflector.deleteRowSQL}
 
72
        sqlCommands = []
 
73
        for kind, obj in self.commands:
 
74
            sqlCommands.append(comandMap[kind](obj))
 
75
        self.commands = []
 
76
        if sqlCommands:
 
77
            self.syncing = 1
 
78
            d = self.reflector.dbpool.runInteraction(self._sync, self.latestIndex, sqlCommands)
 
79
            d.addCallback(self._syncDone)
 
80
            return d
 
81
        else:
 
82
            return defer.succeed(1)
 
83
 
 
84
    def _sync(self, txn, index, commands):
 
85
        """Do the actual database synchronization."""
 
86
        for c in commands:
 
87
            txn.execute(c)
 
88
        txn.update("UPDATE journalinfo SET commandIndex = %d" % index)
 
89
    
 
90
    def _syncDone(self, result):
 
91
        self.syncing = 0
 
92
        return result
 
93
    
 
94
    def getLastSnapshot(self):
 
95
        """Return command index of last snapshot."""
 
96
        conn = self.reflector.dbpool.connect()
 
97
        cursor = conn.cursor()
 
98
        cursor.execute("SELECT commandIndex FROM journalinfo")
 
99
        return cursor.fetchall()[0][0]