~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/persisted/journal/base.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_journal -*-
 
2
#
 
3
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
4
# See LICENSE for details.
 
5
 
 
6
 
7
 
 
8
 
 
9
"""Basic classes and interfaces for journal."""
 
10
 
 
11
from __future__ import nested_scopes
 
12
 
 
13
# system imports
 
14
import os, time
 
15
 
 
16
try:
 
17
    import cPickle as pickle
 
18
except ImportError:
 
19
    import pickle
 
20
 
 
21
# twisted imports
 
22
from zope.interface import implements, Interface
 
23
 
 
24
 
 
25
class Journal:
 
26
    """All commands to the system get routed through here.
 
27
 
 
28
    Subclasses should implement the actual snapshotting capability.
 
29
    """
 
30
 
 
31
    def __init__(self, log, journaledService):
 
32
        self.log = log
 
33
        self.journaledService = journaledService
 
34
        self.latestIndex = self.log.getCurrentIndex()
 
35
 
 
36
    def updateFromLog(self):
 
37
        """Run all commands from log that haven't been run yet.
 
38
 
 
39
        This method should be run on startup to ensure the snapshot
 
40
        is up-to-date.
 
41
        """
 
42
        snapshotIndex = self.getLastSnapshot()
 
43
        if snapshotIndex < self.latestIndex:
 
44
            for cmdtime, command in self.log.getCommandsSince(snapshotIndex + 1):
 
45
                command.execute(self.journaledService, cmdtime)
 
46
 
 
47
    def executeCommand(self, command):
 
48
        """Log and execute a command."""
 
49
        runTime = time.time()
 
50
        d = self.log.logCommand(command, runTime)
 
51
        d.addCallback(self._reallyExecute, command, runTime)
 
52
        return d
 
53
 
 
54
    def _reallyExecute(self, index, command, runTime):
 
55
        """Callback called when logging command is done."""
 
56
        result = command.execute(self.journaledService, runTime)
 
57
        self.latestIndex = index
 
58
        return result
 
59
    
 
60
    def getLastSnapshot(self):
 
61
        """Return command index of the last snapshot taken."""
 
62
        raise NotImplementedError
 
63
 
 
64
    def sync(self, *args, **kwargs):
 
65
        """Save journal to disk, returns Deferred of finish status.
 
66
 
 
67
        Subclasses may choose whatever signature is appropriate, or may
 
68
        not implement this at all.
 
69
        """
 
70
        raise NotImplementedError
 
71
 
 
72
 
 
73
 
 
74
class MemoryJournal(Journal):
 
75
    """Prevayler-like journal that dumps from memory to disk."""
 
76
 
 
77
    def __init__(self, log, journaledService, path, loadedCallback):
 
78
        self.path = path
 
79
        if os.path.exists(path):
 
80
            try:
 
81
                self.lastSync, obj = pickle.load(open(path, "rb"))
 
82
            except (IOError, OSError, pickle.UnpicklingError):
 
83
                self.lastSync, obj = 0, None
 
84
            loadedCallback(obj)
 
85
        else:
 
86
            self.lastSync = 0
 
87
            loadedCallback(None)
 
88
        Journal.__init__(self, log, journaledService)
 
89
 
 
90
    def getLastSnapshot(self):
 
91
        return self.lastSync
 
92
 
 
93
    def sync(self, obj):
 
94
        # make this more reliable at some point
 
95
        f = open(self.path, "wb")
 
96
        pickle.dump((self.latestIndex, obj), f, 1)
 
97
        f.close()
 
98
        self.lastSync = self.latestIndex
 
99
 
 
100
 
 
101
class ICommand(Interface):
 
102
    """A serializable command which interacts with a journaled service."""
 
103
 
 
104
    def execute(journaledService, runTime):
 
105
        """Run the command and return result."""
 
106
 
 
107
 
 
108
class ICommandLog(Interface):
 
109
    """Interface for command log."""
 
110
 
 
111
    def logCommand(command, runTime):
 
112
        """Add a command and its run time to the log.
 
113
 
 
114
        @return: Deferred of command index.
 
115
        """
 
116
 
 
117
    def getCurrentIndex():
 
118
        """Return index of last command that was logged."""
 
119
 
 
120
    def getCommandsSince(index):
 
121
        """Return commands who's index >= the given one.
 
122
 
 
123
        @return: list of (time, command) tuples, sorted with ascending times.
 
124
        """
 
125
 
 
126
 
 
127
class LoadingService:
 
128
    """Base class for journalled service used with Wrappables."""
 
129
 
 
130
    def loadObject(self, objType, objId):
 
131
        """Return object of specified type and id."""
 
132
        raise NotImplementedError
 
133
 
 
134
 
 
135
class Wrappable:
 
136
    """Base class for objects used with LoadingService."""
 
137
 
 
138
    objectType = None # override in base class
 
139
 
 
140
    def getUid(self):
 
141
        """Return uid for loading with LoadingService.loadObject"""
 
142
        raise NotImplementedError
 
143
 
 
144
 
 
145
class WrapperCommand:
 
146
    
 
147
    implements(ICommand)
 
148
 
 
149
    def __init__(self, methodName, obj, args=(), kwargs={}):
 
150
        self.obj = obj
 
151
        self.objId = obj.getUid()
 
152
        self.objType = obj.objectType
 
153
        self.methodName = methodName
 
154
        self.args = args
 
155
        self.kwargs = kwargs
 
156
 
 
157
    def execute(self, svc, commandTime):
 
158
        if not hasattr(self, "obj"):
 
159
            obj = svc.loadObject(self.objType, self.objId)
 
160
        else:
 
161
            obj = self.obj
 
162
        return getattr(obj, self.methodName)(*self.args, **self.kwargs)
 
163
 
 
164
    def __getstate__(self):
 
165
        d = self.__dict__.copy()
 
166
        del d["obj"]
 
167
        return d
 
168
 
 
169
 
 
170
def command(methodName, cmdClass=WrapperCommand):
 
171
    """Wrap a method so it gets turned into command automatically.
 
172
 
 
173
    For use with Wrappables.
 
174
 
 
175
    Usage::
 
176
 
 
177
        | class Foo(Wrappable):
 
178
        |     objectType = "foo"
 
179
        |     def getUid(self):
 
180
        |         return self.id
 
181
        |     def _bar(self, x):
 
182
        |         return x + 1
 
183
        |
 
184
        |     bar = command('_bar')
 
185
 
 
186
    The resulting callable will have signature identical to wrapped
 
187
    function, except that it expects journal as first argument, and
 
188
    returns a Deferred.
 
189
    """
 
190
    def wrapper(obj, journal, *args, **kwargs):
 
191
        return journal.executeCommand(cmdClass(methodName, obj, args, kwargs))
 
192
    return wrapper
 
193
 
 
194
 
 
195
class ServiceWrapperCommand:
 
196
 
 
197
    implements(ICommand)
 
198
 
 
199
    def __init__(self, methodName, args=(), kwargs={}):
 
200
        self.methodName = methodName
 
201
        self.args = args
 
202
        self.kwargs = kwargs
 
203
 
 
204
    def execute(self, svc, commandTime):
 
205
        return getattr(svc, self.methodName)(*self.args, **self.kwargs)
 
206
 
 
207
    def __repr__(self):
 
208
        return "<ServiceWrapperCommand: %s, %s, %s>" % (self.methodName, self.args, self.kwargs)
 
209
    
 
210
    def __cmp__(self, other):
 
211
        if hasattr(other, "__dict__"):
 
212
            return cmp(self.__dict__, other.__dict__)
 
213
        else:
 
214
            return 0
 
215
 
 
216
 
 
217
def serviceCommand(methodName, cmdClass=ServiceWrapperCommand):
 
218
    """Wrap methods into commands for a journalled service.
 
219
 
 
220
    The resulting callable will have signature identical to wrapped
 
221
    function, except that it expects journal as first argument, and
 
222
    returns a Deferred.
 
223
    """
 
224
    def wrapper(obj, journal, *args, **kwargs):
 
225
        return journal.executeCommand(cmdClass(methodName, args, kwargs))
 
226
    return wrapper