~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/abstract.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_abstract -*-
 
2
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
"""
 
7
Support for generic select()able objects.
 
8
 
 
9
Maintainer: Itamar Shtull-Trauring
 
10
"""
 
11
 
 
12
from zope.interface import implements
 
13
 
 
14
# Twisted Imports
 
15
from twisted.python import log, reflect, failure
 
16
from twisted.persisted import styles
 
17
from twisted.internet import interfaces, main
 
18
 
 
19
 
 
20
class FileDescriptor(log.Logger, styles.Ephemeral, object):
 
21
    """An object which can be operated on by select().
 
22
 
 
23
    This is an abstract superclass of all objects which may be notified when
 
24
    they are readable or writable; e.g. they have a file-descriptor that is
 
25
    valid to be passed to select(2).
 
26
    """
 
27
    connected = 0
 
28
    producerPaused = 0
 
29
    streamingProducer = 0
 
30
    producer = None
 
31
    disconnected = 0
 
32
    disconnecting = 0
 
33
    _writeDisconnecting = False
 
34
    _writeDisconnected = False
 
35
    dataBuffer = ""
 
36
    offset = 0
 
37
 
 
38
    SEND_LIMIT = 128*1024
 
39
 
 
40
    implements(interfaces.IProducer, interfaces.IReadWriteDescriptor,
 
41
               interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
 
42
 
 
43
    def __init__(self, reactor=None):
 
44
        if not reactor:
 
45
            from twisted.internet import reactor
 
46
        self.reactor = reactor
 
47
        self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
 
48
        self._tempDataLen = 0
 
49
 
 
50
    def connectionLost(self, reason):
 
51
        """The connection was lost.
 
52
 
 
53
        This is called when the connection on a selectable object has been
 
54
        lost.  It will be called whether the connection was closed explicitly,
 
55
        an exception occurred in an event handler, or the other end of the
 
56
        connection closed it first.
 
57
 
 
58
        Clean up state here, but make sure to call back up to FileDescriptor.
 
59
        """
 
60
 
 
61
        self.disconnected = 1
 
62
        self.connected = 0
 
63
        if self.producer is not None:
 
64
            self.producer.stopProducing()
 
65
            self.producer = None
 
66
        self.stopReading()
 
67
        self.stopWriting()
 
68
 
 
69
 
 
70
    def writeSomeData(self, data):
 
71
        """
 
72
        Write as much as possible of the given data, immediately.
 
73
 
 
74
        This is called to invoke the lower-level writing functionality, such
 
75
        as a socket's send() method, or a file's write(); this method
 
76
        returns an integer or an exception.  If an integer, it is the number
 
77
        of bytes written (possibly zero); if an exception, it indicates the
 
78
        connection was lost.
 
79
        """
 
80
        raise NotImplementedError("%s does not implement writeSomeData" %
 
81
                                  reflect.qual(self.__class__))
 
82
 
 
83
 
 
84
    def doRead(self):
 
85
        """Called when data is avaliable for reading.
 
86
 
 
87
        Subclasses must override this method. The result will be interpreted
 
88
        in the same way as a result of doWrite().
 
89
        """
 
90
        raise NotImplementedError("%s does not implement doRead" %
 
91
                                  reflect.qual(self.__class__))
 
92
 
 
93
    def doWrite(self):
 
94
        """
 
95
        Called when data can be written.
 
96
 
 
97
        A result that is true (which will be a negative number or an
 
98
        exception instance) indicates that the connection was lost. A false
 
99
        result implies the connection is still there; a result of 0
 
100
        indicates no write was done, and a result of None indicates that a
 
101
        write was done.
 
102
        """
 
103
        if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
 
104
            # If there is currently less than SEND_LIMIT bytes left to send
 
105
            # in the string, extend it with the array data.
 
106
            self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(self._tempDataBuffer)
 
107
            self.offset = 0
 
108
            self._tempDataBuffer = []
 
109
            self._tempDataLen = 0
 
110
 
 
111
        # Send as much data as you can.
 
112
        if self.offset:
 
113
            l = self.writeSomeData(buffer(self.dataBuffer, self.offset))
 
114
        else:
 
115
            l = self.writeSomeData(self.dataBuffer)
 
116
 
 
117
        # There is no writeSomeData implementation in Twisted which returns
 
118
        # 0, but the documentation for writeSomeData used to claim negative
 
119
        # integers meant connection lost.  Keep supporting this here,
 
120
        # although it may be worth deprecating and removing at some point.
 
121
        if l < 0 or isinstance(l, Exception):
 
122
            return l
 
123
        if l == 0 and self.dataBuffer:
 
124
            result = 0
 
125
        else:
 
126
            result = None
 
127
        self.offset += l
 
128
        # If there is nothing left to send,
 
129
        if self.offset == len(self.dataBuffer) and not self._tempDataLen:
 
130
            self.dataBuffer = ""
 
131
            self.offset = 0
 
132
            # stop writing.
 
133
            self.stopWriting()
 
134
            # If I've got a producer who is supposed to supply me with data,
 
135
            if self.producer is not None and ((not self.streamingProducer)
 
136
                                              or self.producerPaused):
 
137
                # tell them to supply some more.
 
138
                self.producerPaused = 0
 
139
                self.producer.resumeProducing()
 
140
            elif self.disconnecting:
 
141
                # But if I was previously asked to let the connection die, do
 
142
                # so.
 
143
                return self._postLoseConnection()
 
144
            elif self._writeDisconnecting:
 
145
                # I was previously asked to to half-close the connection.
 
146
                result = self._closeWriteConnection()
 
147
                self._writeDisconnected = True
 
148
                return result
 
149
        return result
 
150
 
 
151
    def _postLoseConnection(self):
 
152
        """Called after a loseConnection(), when all data has been written.
 
153
 
 
154
        Whatever this returns is then returned by doWrite.
 
155
        """
 
156
        # default implementation, telling reactor we're finished
 
157
        return main.CONNECTION_DONE
 
158
 
 
159
    def _closeWriteConnection(self):
 
160
        # override in subclasses
 
161
        pass
 
162
 
 
163
    def writeConnectionLost(self, reason):
 
164
        # in current code should never be called
 
165
        self.connectionLost(reason)
 
166
 
 
167
    def readConnectionLost(self, reason):
 
168
        # override in subclasses
 
169
        self.connectionLost(reason)
 
170
 
 
171
    def write(self, data):
 
172
        """Reliably write some data.
 
173
 
 
174
        The data is buffered until the underlying file descriptor is ready
 
175
        for writing. If there is more than C{self.bufferSize} data in the
 
176
        buffer and this descriptor has a registered streaming producer, its
 
177
        C{pauseProducing()} method will be called.
 
178
        """
 
179
        if isinstance(data, unicode): # no, really, I mean it
 
180
            raise TypeError("Data must not be unicode")
 
181
        if not self.connected or self._writeDisconnected:
 
182
            return
 
183
        if data:
 
184
            self._tempDataBuffer.append(data)
 
185
            self._tempDataLen += len(data)
 
186
            # If we are responsible for pausing our producer,
 
187
            if self.producer is not None and self.streamingProducer:
 
188
                # and our buffer is full,
 
189
                if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
 
190
                    # pause it.
 
191
                    self.producerPaused = 1
 
192
                    self.producer.pauseProducing()
 
193
            self.startWriting()
 
194
 
 
195
    def writeSequence(self, iovec):
 
196
        """Reliably write a sequence of data.
 
197
 
 
198
        Currently, this is a convenience method roughly equivalent to::
 
199
 
 
200
            for chunk in iovec:
 
201
                fd.write(chunk)
 
202
 
 
203
        It may have a more efficient implementation at a later time or in a
 
204
        different reactor.
 
205
 
 
206
        As with the C{write()} method, if a buffer size limit is reached and a
 
207
        streaming producer is registered, it will be paused until the buffered
 
208
        data is written to the underlying file descriptor.
 
209
        """
 
210
        if not self.connected or not iovec or self._writeDisconnected:
 
211
            return
 
212
        self._tempDataBuffer.extend(iovec)
 
213
        for i in iovec:
 
214
            self._tempDataLen += len(i)
 
215
        # If we are responsible for pausing our producer,
 
216
        if self.producer is not None and self.streamingProducer:
 
217
            # and our buffer is full,
 
218
            if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
 
219
                # pause it.
 
220
                self.producerPaused = 1
 
221
                self.producer.pauseProducing()
 
222
        self.startWriting()
 
223
 
 
224
    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
 
225
        """Close the connection at the next available opportunity.
 
226
 
 
227
        Call this to cause this FileDescriptor to lose its connection.  It will
 
228
        first write any data that it has buffered.
 
229
 
 
230
        If there is data buffered yet to be written, this method will cause the
 
231
        transport to lose its connection as soon as it's done flushing its
 
232
        write buffer.  If you have a producer registered, the connection won't
 
233
        be closed until the producer is finished. Therefore, make sure you
 
234
        unregister your producer when it's finished, or the connection will
 
235
        never close.
 
236
        """
 
237
 
 
238
        if self.connected and not self.disconnecting:
 
239
            if self._writeDisconnected:
 
240
                # doWrite won't trigger the connection close anymore
 
241
                self.stopReading()
 
242
                self.stopWriting()
 
243
                self.connectionLost(_connDone)
 
244
            else:
 
245
                self.stopReading()
 
246
                self.startWriting()
 
247
                self.disconnecting = 1
 
248
 
 
249
    def loseWriteConnection(self):
 
250
        self._writeDisconnecting = True
 
251
        self.startWriting()
 
252
 
 
253
    def stopReading(self):
 
254
        """Stop waiting for read availability.
 
255
 
 
256
        Call this to remove this selectable from being notified when it is
 
257
        ready for reading.
 
258
        """
 
259
        self.reactor.removeReader(self)
 
260
 
 
261
    def stopWriting(self):
 
262
        """Stop waiting for write availability.
 
263
 
 
264
        Call this to remove this selectable from being notified when it is ready
 
265
        for writing.
 
266
        """
 
267
        self.reactor.removeWriter(self)
 
268
 
 
269
    def startReading(self):
 
270
        """Start waiting for read availability.
 
271
        """
 
272
        self.reactor.addReader(self)
 
273
 
 
274
    def startWriting(self):
 
275
        """Start waiting for write availability.
 
276
 
 
277
        Call this to have this FileDescriptor be notified whenever it is ready for
 
278
        writing.
 
279
        """
 
280
        self.reactor.addWriter(self)
 
281
 
 
282
    # Producer/consumer implementation
 
283
 
 
284
    # first, the consumer stuff.  This requires no additional work, as
 
285
    # any object you can write to can be a consumer, really.
 
286
 
 
287
    producer = None
 
288
    bufferSize = 2**2**2**2
 
289
 
 
290
    def registerProducer(self, producer, streaming):
 
291
        """Register to receive data from a producer.
 
292
 
 
293
        This sets this selectable to be a consumer for a producer.  When this
 
294
        selectable runs out of data on a write() call, it will ask the producer
 
295
        to resumeProducing(). When the FileDescriptor's internal data buffer is
 
296
        filled, it will ask the producer to pauseProducing(). If the connection
 
297
        is lost, FileDescriptor calls producer's stopProducing() method.
 
298
 
 
299
        If streaming is true, the producer should provide the IPushProducer
 
300
        interface. Otherwise, it is assumed that producer provides the
 
301
        IPullProducer interface. In this case, the producer won't be asked
 
302
        to pauseProducing(), but it has to be careful to write() data only
 
303
        when its resumeProducing() method is called.
 
304
        """
 
305
        if self.producer is not None:
 
306
            raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
 
307
        if self.disconnected:
 
308
            producer.stopProducing()
 
309
        else:
 
310
            self.producer = producer
 
311
            self.streamingProducer = streaming
 
312
            if not streaming:
 
313
                producer.resumeProducing()
 
314
 
 
315
    def unregisterProducer(self):
 
316
        """Stop consuming data from a producer, without disconnecting.
 
317
        """
 
318
        self.producer = None
 
319
 
 
320
    def stopConsuming(self):
 
321
        """Stop consuming data.
 
322
 
 
323
        This is called when a producer has lost its connection, to tell the
 
324
        consumer to go lose its connection (and break potential circular
 
325
        references).
 
326
        """
 
327
        self.unregisterProducer()
 
328
        self.loseConnection()
 
329
 
 
330
    # producer interface implementation
 
331
 
 
332
    def resumeProducing(self):
 
333
        assert self.connected and not self.disconnecting
 
334
        self.startReading()
 
335
 
 
336
    def pauseProducing(self):
 
337
        self.stopReading()
 
338
 
 
339
    def stopProducing(self):
 
340
        self.loseConnection()
 
341
 
 
342
 
 
343
    def fileno(self):
 
344
        """File Descriptor number for select().
 
345
 
 
346
        This method must be overridden or assigned in subclasses to
 
347
        indicate a valid file descriptor for the operating system.
 
348
        """
 
349
        return -1
 
350
 
 
351
 
 
352
def isIPAddress(addr):
 
353
    """
 
354
    Determine whether the given string represents an IPv4 address.
 
355
 
 
356
    @type addr: C{str}
 
357
    @param addr: A string which may or may not be the decimal dotted
 
358
    representation of an IPv4 address.
 
359
 
 
360
    @rtype: C{bool}
 
361
    @return: C{True} if C{addr} represents an IPv4 address, C{False}
 
362
    otherwise.
 
363
    """
 
364
    dottedParts = addr.split('.')
 
365
    if len(dottedParts) == 4:
 
366
        for octet in dottedParts:
 
367
            try:
 
368
                value = int(octet)
 
369
            except ValueError:
 
370
                return False
 
371
            else:
 
372
                if value < 0 or value > 255:
 
373
                    return False
 
374
        return True
 
375
    return False
 
376
 
 
377
 
 
378
__all__ = ["FileDescriptor"]