~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/spread/util.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_pb -*-
 
2
 
 
3
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
 
4
# See LICENSE for details.
 
5
 
 
6
 
 
7
"""
 
8
Utility classes for spread.
 
9
"""
 
10
 
 
11
from twisted.internet import defer
 
12
from twisted.python.failure import Failure
 
13
from twisted.spread import pb
 
14
from twisted.protocols import basic
 
15
from twisted.internet import interfaces
 
16
 
 
17
from zope.interface import implements
 
18
 
 
19
 
 
20
class LocalMethod:
 
21
    def __init__(self, local, name):
 
22
        self.local = local
 
23
        self.name = name
 
24
 
 
25
    def __call__(self, *args, **kw):
 
26
        return self.local.callRemote(self.name, *args, **kw)
 
27
 
 
28
 
 
29
class LocalAsRemote:
 
30
    """
 
31
    A class useful for emulating the effects of remote behavior locally.
 
32
    """
 
33
    reportAllTracebacks = 1
 
34
 
 
35
    def callRemote(self, name, *args, **kw):
 
36
        """
 
37
        Call a specially-designated local method.
 
38
 
 
39
        self.callRemote('x') will first try to invoke a method named
 
40
        sync_x and return its result (which should probably be a
 
41
        Deferred).  Second, it will look for a method called async_x,
 
42
        which will be called and then have its result (or Failure)
 
43
        automatically wrapped in a Deferred.
 
44
        """
 
45
        if hasattr(self, 'sync_'+name):
 
46
            return getattr(self, 'sync_'+name)(*args, **kw)
 
47
        try:
 
48
            method = getattr(self, "async_" + name)
 
49
            return defer.succeed(method(*args, **kw))
 
50
        except:
 
51
            f = Failure()
 
52
            if self.reportAllTracebacks:
 
53
                f.printTraceback()
 
54
            return defer.fail(f)
 
55
 
 
56
    def remoteMethod(self, name):
 
57
        return LocalMethod(self, name)
 
58
 
 
59
 
 
60
class LocalAsyncForwarder:
 
61
    """
 
62
    A class useful for forwarding a locally-defined interface.
 
63
    """
 
64
 
 
65
    def __init__(self, forwarded, interfaceClass, failWhenNotImplemented=0):
 
66
        assert interfaceClass.providedBy(forwarded)
 
67
        self.forwarded = forwarded
 
68
        self.interfaceClass = interfaceClass
 
69
        self.failWhenNotImplemented = failWhenNotImplemented
 
70
 
 
71
    def _callMethod(self, method, *args, **kw):
 
72
        return getattr(self.forwarded, method)(*args, **kw)
 
73
 
 
74
    def callRemote(self, method, *args, **kw):
 
75
        if self.interfaceClass.queryDescriptionFor(method):
 
76
            result = defer.maybeDeferred(self._callMethod, method, *args, **kw)
 
77
            return result
 
78
        elif self.failWhenNotImplemented:
 
79
            return defer.fail(
 
80
                Failure(NotImplementedError,
 
81
                        "No Such Method in Interface: %s" % method))
 
82
        else:
 
83
            return defer.succeed(None)
 
84
 
 
85
 
 
86
class Pager:
 
87
    """
 
88
    I am an object which pages out information.
 
89
    """
 
90
    def __init__(self, collector, callback=None, *args, **kw):
 
91
        """
 
92
        Create a pager with a Reference to a remote collector and
 
93
        an optional callable to invoke upon completion.
 
94
        """
 
95
        if callable(callback):
 
96
            self.callback = callback
 
97
            self.callbackArgs = args
 
98
            self.callbackKeyword = kw
 
99
        else:
 
100
            self.callback = None
 
101
        self._stillPaging = 1
 
102
        self.collector = collector
 
103
        collector.broker.registerPageProducer(self)
 
104
 
 
105
    def stillPaging(self):
 
106
        """
 
107
        (internal) Method called by Broker.
 
108
        """
 
109
        if not self._stillPaging:
 
110
            self.collector.callRemote("endedPaging")
 
111
            if self.callback is not None:
 
112
                self.callback(*self.callbackArgs, **self.callbackKeyword)
 
113
        return self._stillPaging
 
114
 
 
115
    def sendNextPage(self):
 
116
        """
 
117
        (internal) Method called by Broker.
 
118
        """
 
119
        self.collector.callRemote("gotPage", self.nextPage())
 
120
 
 
121
    def nextPage(self):
 
122
        """
 
123
        Override this to return an object to be sent to my collector.
 
124
        """
 
125
        raise NotImplementedError()
 
126
 
 
127
    def stopPaging(self):
 
128
        """
 
129
        Call this when you're done paging.
 
130
        """
 
131
        self._stillPaging = 0
 
132
 
 
133
 
 
134
class StringPager(Pager):
 
135
    """
 
136
    A simple pager that splits a string into chunks.
 
137
    """
 
138
    def __init__(self, collector, st, chunkSize=8192, callback=None, *args, **kw):
 
139
        self.string = st
 
140
        self.pointer = 0
 
141
        self.chunkSize = chunkSize
 
142
        Pager.__init__(self, collector, callback, *args, **kw)
 
143
 
 
144
    def nextPage(self):
 
145
        val = self.string[self.pointer:self.pointer+self.chunkSize]
 
146
        self.pointer += self.chunkSize
 
147
        if self.pointer >= len(self.string):
 
148
            self.stopPaging()
 
149
        return val
 
150
 
 
151
 
 
152
class FilePager(Pager):
 
153
    """
 
154
    Reads a file in chunks and sends the chunks as they come.
 
155
    """
 
156
    implements(interfaces.IConsumer)
 
157
 
 
158
    def __init__(self, collector, fd, callback=None, *args, **kw):
 
159
        self.chunks = []
 
160
        Pager.__init__(self, collector, callback, *args, **kw)
 
161
        self.startProducing(fd)
 
162
 
 
163
    def startProducing(self, fd):
 
164
        self.deferred = basic.FileSender().beginFileTransfer(fd, self)
 
165
        self.deferred.addBoth(lambda x : self.stopPaging())
 
166
 
 
167
    def registerProducer(self, producer, streaming):
 
168
        self.producer = producer
 
169
        if not streaming:
 
170
            self.producer.resumeProducing()
 
171
 
 
172
    def unregisterProducer(self):
 
173
        self.producer = None
 
174
 
 
175
    def write(self, chunk):
 
176
        self.chunks.append(chunk)
 
177
 
 
178
    def sendNextPage(self):
 
179
        """
 
180
        Get the first chunk read and send it to collector.
 
181
        """
 
182
        if not self.chunks:
 
183
            return
 
184
        val = self.chunks.pop(0)
 
185
        self.producer.resumeProducing()
 
186
        self.collector.callRemote("gotPage", val)
 
187
 
 
188
 
 
189
# Utility paging stuff.
 
190
class CallbackPageCollector(pb.Referenceable):
 
191
    """
 
192
    I receive pages from the peer. You may instantiate a Pager with a
 
193
    remote reference to me. I will call the callback with a list of pages
 
194
    once they are all received.
 
195
    """
 
196
    def __init__(self, callback):
 
197
        self.pages = []
 
198
        self.callback = callback
 
199
 
 
200
    def remote_gotPage(self, page):
 
201
        self.pages.append(page)
 
202
 
 
203
    def remote_endedPaging(self):
 
204
        self.callback(self.pages)
 
205
 
 
206
 
 
207
def getAllPages(referenceable, methodName, *args, **kw):
 
208
    """
 
209
    A utility method that will call a remote method which expects a
 
210
    PageCollector as the first argument.
 
211
    """
 
212
    d = defer.Deferred()
 
213
    referenceable.callRemote(methodName, CallbackPageCollector(d.callback), *args, **kw)
 
214
    return d
 
215