1
# -*- test-case-name: twisted.test.test_pb -*-
3
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4
# See LICENSE for details.
8
Utility classes for spread.
11
from twisted.internet import defer
12
from twisted.python.failure import Failure
13
from twisted.spread import pb
14
from twisted.protocols import basic
15
from twisted.internet import interfaces
17
from zope.interface import implements
21
def __init__(self, local, name):
25
def __call__(self, *args, **kw):
26
return self.local.callRemote(self.name, *args, **kw)
31
A class useful for emulating the effects of remote behavior locally.
33
reportAllTracebacks = 1
35
def callRemote(self, name, *args, **kw):
37
Call a specially-designated local method.
39
self.callRemote('x') will first try to invoke a method named
40
sync_x and return its result (which should probably be a
41
Deferred). Second, it will look for a method called async_x,
42
which will be called and then have its result (or Failure)
43
automatically wrapped in a Deferred.
45
if hasattr(self, 'sync_'+name):
46
return getattr(self, 'sync_'+name)(*args, **kw)
48
method = getattr(self, "async_" + name)
49
return defer.succeed(method(*args, **kw))
52
if self.reportAllTracebacks:
56
def remoteMethod(self, name):
57
return LocalMethod(self, name)
60
class LocalAsyncForwarder:
62
A class useful for forwarding a locally-defined interface.
65
def __init__(self, forwarded, interfaceClass, failWhenNotImplemented=0):
66
assert interfaceClass.providedBy(forwarded)
67
self.forwarded = forwarded
68
self.interfaceClass = interfaceClass
69
self.failWhenNotImplemented = failWhenNotImplemented
71
def _callMethod(self, method, *args, **kw):
72
return getattr(self.forwarded, method)(*args, **kw)
74
def callRemote(self, method, *args, **kw):
75
if self.interfaceClass.queryDescriptionFor(method):
76
result = defer.maybeDeferred(self._callMethod, method, *args, **kw)
78
elif self.failWhenNotImplemented:
80
Failure(NotImplementedError,
81
"No Such Method in Interface: %s" % method))
83
return defer.succeed(None)
88
I am an object which pages out information.
90
def __init__(self, collector, callback=None, *args, **kw):
92
Create a pager with a Reference to a remote collector and
93
an optional callable to invoke upon completion.
95
if callable(callback):
96
self.callback = callback
97
self.callbackArgs = args
98
self.callbackKeyword = kw
101
self._stillPaging = 1
102
self.collector = collector
103
collector.broker.registerPageProducer(self)
105
def stillPaging(self):
107
(internal) Method called by Broker.
109
if not self._stillPaging:
110
self.collector.callRemote("endedPaging")
111
if self.callback is not None:
112
self.callback(*self.callbackArgs, **self.callbackKeyword)
113
return self._stillPaging
115
def sendNextPage(self):
117
(internal) Method called by Broker.
119
self.collector.callRemote("gotPage", self.nextPage())
123
Override this to return an object to be sent to my collector.
125
raise NotImplementedError()
127
def stopPaging(self):
129
Call this when you're done paging.
131
self._stillPaging = 0
134
class StringPager(Pager):
136
A simple pager that splits a string into chunks.
138
def __init__(self, collector, st, chunkSize=8192, callback=None, *args, **kw):
141
self.chunkSize = chunkSize
142
Pager.__init__(self, collector, callback, *args, **kw)
145
val = self.string[self.pointer:self.pointer+self.chunkSize]
146
self.pointer += self.chunkSize
147
if self.pointer >= len(self.string):
152
class FilePager(Pager):
154
Reads a file in chunks and sends the chunks as they come.
156
implements(interfaces.IConsumer)
158
def __init__(self, collector, fd, callback=None, *args, **kw):
160
Pager.__init__(self, collector, callback, *args, **kw)
161
self.startProducing(fd)
163
def startProducing(self, fd):
164
self.deferred = basic.FileSender().beginFileTransfer(fd, self)
165
self.deferred.addBoth(lambda x : self.stopPaging())
167
def registerProducer(self, producer, streaming):
168
self.producer = producer
170
self.producer.resumeProducing()
172
def unregisterProducer(self):
175
def write(self, chunk):
176
self.chunks.append(chunk)
178
def sendNextPage(self):
180
Get the first chunk read and send it to collector.
184
val = self.chunks.pop(0)
185
self.producer.resumeProducing()
186
self.collector.callRemote("gotPage", val)
189
# Utility paging stuff.
190
class CallbackPageCollector(pb.Referenceable):
192
I receive pages from the peer. You may instantiate a Pager with a
193
remote reference to me. I will call the callback with a list of pages
194
once they are all received.
196
def __init__(self, callback):
198
self.callback = callback
200
def remote_gotPage(self, page):
201
self.pages.append(page)
203
def remote_endedPaging(self):
204
self.callback(self.pages)
207
def getAllPages(referenceable, methodName, *args, **kw):
209
A utility method that will call a remote method which expects a
210
PageCollector as the first argument.
213
referenceable.callRemote(methodName, CallbackPageCollector(d.callback), *args, **kw)