1
# -*- test-case-name: twisted.test.test_journal -*-
3
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4
# See LICENSE for details.
9
"""Basic classes and interfaces for journal."""
11
from __future__ import nested_scopes
17
import cPickle as pickle
22
from zope.interface import implements, Interface
26
"""All commands to the system get routed through here.
28
Subclasses should implement the actual snapshotting capability.
31
def __init__(self, log, journaledService):
33
self.journaledService = journaledService
34
self.latestIndex = self.log.getCurrentIndex()
36
def updateFromLog(self):
37
"""Run all commands from log that haven't been run yet.
39
This method should be run on startup to ensure the snapshot
42
snapshotIndex = self.getLastSnapshot()
43
if snapshotIndex < self.latestIndex:
44
for cmdtime, command in self.log.getCommandsSince(snapshotIndex + 1):
45
command.execute(self.journaledService, cmdtime)
47
def executeCommand(self, command):
48
"""Log and execute a command."""
50
d = self.log.logCommand(command, runTime)
51
d.addCallback(self._reallyExecute, command, runTime)
54
def _reallyExecute(self, index, command, runTime):
55
"""Callback called when logging command is done."""
56
result = command.execute(self.journaledService, runTime)
57
self.latestIndex = index
60
def getLastSnapshot(self):
61
"""Return command index of the last snapshot taken."""
62
raise NotImplementedError
64
def sync(self, *args, **kwargs):
65
"""Save journal to disk, returns Deferred of finish status.
67
Subclasses may choose whatever signature is appropriate, or may
68
not implement this at all.
70
raise NotImplementedError
74
class MemoryJournal(Journal):
75
"""Prevayler-like journal that dumps from memory to disk."""
77
def __init__(self, log, journaledService, path, loadedCallback):
79
if os.path.exists(path):
81
self.lastSync, obj = pickle.load(open(path, "rb"))
82
except (IOError, OSError, pickle.UnpicklingError):
83
self.lastSync, obj = 0, None
88
Journal.__init__(self, log, journaledService)
90
def getLastSnapshot(self):
94
# make this more reliable at some point
95
f = open(self.path, "wb")
96
pickle.dump((self.latestIndex, obj), f, 1)
98
self.lastSync = self.latestIndex
101
class ICommand(Interface):
102
"""A serializable command which interacts with a journaled service."""
104
def execute(journaledService, runTime):
105
"""Run the command and return result."""
108
class ICommandLog(Interface):
109
"""Interface for command log."""
111
def logCommand(command, runTime):
112
"""Add a command and its run time to the log.
114
@return: Deferred of command index.
117
def getCurrentIndex():
118
"""Return index of last command that was logged."""
120
def getCommandsSince(index):
121
"""Return commands who's index >= the given one.
123
@return: list of (time, command) tuples, sorted with ascending times.
127
class LoadingService:
128
"""Base class for journalled service used with Wrappables."""
130
def loadObject(self, objType, objId):
131
"""Return object of specified type and id."""
132
raise NotImplementedError
136
"""Base class for objects used with LoadingService."""
138
objectType = None # override in base class
141
"""Return uid for loading with LoadingService.loadObject"""
142
raise NotImplementedError
145
class WrapperCommand:
149
def __init__(self, methodName, obj, args=(), kwargs={}):
151
self.objId = obj.getUid()
152
self.objType = obj.objectType
153
self.methodName = methodName
157
def execute(self, svc, commandTime):
158
if not hasattr(self, "obj"):
159
obj = svc.loadObject(self.objType, self.objId)
162
return getattr(obj, self.methodName)(*self.args, **self.kwargs)
164
def __getstate__(self):
165
d = self.__dict__.copy()
170
def command(methodName, cmdClass=WrapperCommand):
171
"""Wrap a method so it gets turned into command automatically.
173
For use with Wrappables.
177
| class Foo(Wrappable):
184
| bar = command('_bar')
186
The resulting callable will have signature identical to wrapped
187
function, except that it expects journal as first argument, and
190
def wrapper(obj, journal, *args, **kwargs):
191
return journal.executeCommand(cmdClass(methodName, obj, args, kwargs))
195
class ServiceWrapperCommand:
199
def __init__(self, methodName, args=(), kwargs={}):
200
self.methodName = methodName
204
def execute(self, svc, commandTime):
205
return getattr(svc, self.methodName)(*self.args, **self.kwargs)
208
return "<ServiceWrapperCommand: %s, %s, %s>" % (self.methodName, self.args, self.kwargs)
210
def __cmp__(self, other):
211
if hasattr(other, "__dict__"):
212
return cmp(self.__dict__, other.__dict__)
217
def serviceCommand(methodName, cmdClass=ServiceWrapperCommand):
218
"""Wrap methods into commands for a journalled service.
220
The resulting callable will have signature identical to wrapped
221
function, except that it expects journal as first argument, and
224
def wrapper(obj, journal, *args, **kwargs):
225
return journal.executeCommand(cmdClass(methodName, args, kwargs))