~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/internet/abstract.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
"""Support for generic select()able objects.
6
 
 
7
 
API Stability: stable
8
 
 
9
 
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
10
 
"""
11
 
 
12
 
# System Imports
13
 
import types, string
14
 
from zope.interface import implements
15
 
 
16
 
# Twisted Imports
17
 
from twisted.python import log, reflect, failure
18
 
from twisted.persisted import styles
19
 
 
20
 
# Sibling Imports
21
 
import interfaces, main
22
 
 
23
 
class FileDescriptor(log.Logger, styles.Ephemeral, object):
24
 
    """An object which can be operated on by select().
25
 
 
26
 
    This is an abstract superclass of all objects which may be notified when
27
 
    they are readable or writable; e.g. they have a file-descriptor that is
28
 
    valid to be passed to select(2).
29
 
    """
30
 
    connected = 0
31
 
    producerPaused = 0
32
 
    streamingProducer = 0
33
 
    producer = None
34
 
    disconnected = 0
35
 
    disconnecting = 0
36
 
    _writeDisconnecting = False
37
 
    _writeDisconnected = False
38
 
    dataBuffer = ""
39
 
    offset = 0
40
 
 
41
 
    SEND_LIMIT = 128*1024
42
 
 
43
 
    implements(interfaces.IProducer, interfaces.IReadWriteDescriptor,
44
 
               interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
45
 
 
46
 
    def __init__(self, reactor=None):
47
 
        if not reactor:
48
 
            from twisted.internet import reactor
49
 
        self.reactor = reactor
50
 
        self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
51
 
        self._tempDataLen = 0
52
 
 
53
 
    def connectionLost(self, reason):
54
 
        """The connection was lost.
55
 
 
56
 
        This is called when the connection on a selectable object has been
57
 
        lost.  It will be called whether the connection was closed explicitly,
58
 
        an exception occurred in an event handler, or the other end of the
59
 
        connection closed it first.
60
 
 
61
 
        Clean up state here, but make sure to call back up to FileDescriptor.
62
 
        """
63
 
 
64
 
        self.disconnected = 1
65
 
        self.connected = 0
66
 
        if self.producer is not None:
67
 
            self.producer.stopProducing()
68
 
            self.producer = None
69
 
        self.stopReading()
70
 
        self.stopWriting()
71
 
 
72
 
    def writeSomeData(self, data):
73
 
        """Write as much as possible of the given data, immediately.
74
 
 
75
 
        This is called to invoke the lower-level writing functionality, such as
76
 
        a socket's send() method, or a file's write(); this method returns an
77
 
        integer.  If positive, it is the number of bytes written; if negative,
78
 
        it indicates the connection was lost.
79
 
        """
80
 
 
81
 
        raise NotImplementedError("%s does not implement writeSomeData" %
82
 
                                  reflect.qual(self.__class__))
83
 
 
84
 
    def doRead(self):
85
 
        """Called when data is avaliable for reading.
86
 
 
87
 
        Subclasses must override this method. The result will be interpreted
88
 
        in the same way as a result of doWrite().
89
 
        """
90
 
        raise NotImplementedError("%s does not implement doRead" %
91
 
                                  reflect.qual(self.__class__))
92
 
 
93
 
    def doWrite(self):
94
 
        """Called when data can be written.
95
 
 
96
 
        A result that is true (which will be a negative number) implies the
97
 
        connection was lost. A false result implies the connection is still
98
 
        there; a result of 0 implies no write was done, and a result of None
99
 
        indicates that a write was done.
100
 
        """
101
 
        if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
102
 
            # If there is currently less than SEND_LIMIT bytes left to send
103
 
            # in the string, extend it with the array data.
104
 
            self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(self._tempDataBuffer)
105
 
            self.offset = 0
106
 
            self._tempDataBuffer = []
107
 
            self._tempDataLen = 0
108
 
 
109
 
        # Send as much data as you can.
110
 
        if self.offset:
111
 
            l = self.writeSomeData(buffer(self.dataBuffer, self.offset))
112
 
        else:
113
 
            l = self.writeSomeData(self.dataBuffer)
114
 
        if l < 0 or isinstance(l, Exception):
115
 
            return l
116
 
        if l == 0 and self.dataBuffer:
117
 
            result = 0
118
 
        else:
119
 
            result = None
120
 
        self.offset += l
121
 
        # If there is nothing left to send,
122
 
        if self.offset == len(self.dataBuffer) and not self._tempDataLen:
123
 
            self.dataBuffer = ""
124
 
            self.offset = 0
125
 
            # stop writing.
126
 
            self.stopWriting()
127
 
            # If I've got a producer who is supposed to supply me with data,
128
 
            if self.producer is not None and ((not self.streamingProducer)
129
 
                                              or self.producerPaused):
130
 
                # tell them to supply some more.
131
 
                self.producerPaused = 0
132
 
                self.producer.resumeProducing()
133
 
            elif self.disconnecting:
134
 
                # But if I was previously asked to let the connection die, do
135
 
                # so.
136
 
                return self._postLoseConnection()
137
 
            elif self._writeDisconnecting:
138
 
                # I was previously asked to to half-close the connection.
139
 
                result = self._closeWriteConnection()
140
 
                self._writeDisconnected = True
141
 
                return result
142
 
        return result
143
 
 
144
 
    def _postLoseConnection(self):
145
 
        """Called after a loseConnection(), when all data has been written.
146
 
 
147
 
        Whatever this returns is then returned by doWrite.
148
 
        """
149
 
        # default implementation, telling reactor we're finished
150
 
        return main.CONNECTION_DONE
151
 
 
152
 
    def _closeWriteConnection(self):
153
 
        # override in subclasses
154
 
        pass
155
 
 
156
 
    def writeConnectionLost(self, reason):
157
 
        # in current code should never be called
158
 
        self.connectionLost(reason)
159
 
 
160
 
    def readConnectionLost(self, reason):
161
 
        # override in subclasses
162
 
        self.connectionLost(reason)
163
 
 
164
 
    def write(self, data):
165
 
        """Reliably write some data.
166
 
 
167
 
        The data is buffered until the underlying file descriptor is ready
168
 
        for writing. If there is more than C{self.bufferSize} data in the
169
 
        buffer and this descriptor has a registered streaming producer, its
170
 
        C{pauseProducing()} method will be called.
171
 
        """
172
 
        if isinstance(data, unicode): # no, really, I mean it
173
 
            raise TypeError("Data must not be unicode")
174
 
        if not self.connected or self._writeDisconnected:
175
 
            return
176
 
        if data:
177
 
            self._tempDataBuffer.append(data)
178
 
            self._tempDataLen += len(data)
179
 
            # If we are responsible for pausing our producer,
180
 
            if self.producer is not None and self.streamingProducer:
181
 
                # and our buffer is full,
182
 
                if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
183
 
                    # pause it.
184
 
                    self.producerPaused = 1
185
 
                    self.producer.pauseProducing()
186
 
            self.startWriting()
187
 
 
188
 
    def writeSequence(self, iovec):
189
 
        """Reliably write a sequence of data.
190
 
 
191
 
        Currently, this is a convenience method roughly equivalent to::
192
 
 
193
 
            for chunk in iovec:
194
 
                fd.write(chunk)
195
 
 
196
 
        It may have a more efficient implementation at a later time or in a
197
 
        different reactor.
198
 
 
199
 
        As with the C{write()} method, if a buffer size limit is reached and a
200
 
        streaming producer is registered, it will be paused until the buffered
201
 
        data is written to the underlying file descriptor.
202
 
        """
203
 
        if not self.connected or not iovec or self._writeDisconnected:
204
 
            return
205
 
        self._tempDataBuffer.extend(iovec)
206
 
        for i in iovec:
207
 
            self._tempDataLen += len(i)
208
 
        # If we are responsible for pausing our producer,
209
 
        if self.producer is not None and self.streamingProducer:
210
 
            # and our buffer is full,
211
 
            if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
212
 
                # pause it.
213
 
                self.producerPaused = 1
214
 
                self.producer.pauseProducing()
215
 
        self.startWriting()
216
 
 
217
 
    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
218
 
        """Close the connection at the next available opportunity.
219
 
 
220
 
        Call this to cause this FileDescriptor to lose its connection.  It will
221
 
        first write any data that it has buffered.
222
 
 
223
 
        If there is data buffered yet to be written, this method will cause the
224
 
        transport to lose its connection as soon as it's done flushing its
225
 
        write buffer.  If you have a producer registered, the connection won't
226
 
        be closed until the producer is finished. Therefore, make sure you
227
 
        unregister your producer when it's finished, or the connection will
228
 
        never close.
229
 
        """
230
 
 
231
 
        if self.connected and not self.disconnecting:
232
 
            if self._writeDisconnected:
233
 
                # doWrite won't trigger the connection close anymore
234
 
                self.stopReading()
235
 
                self.stopWriting()
236
 
                self.connectionLost(_connDone)
237
 
            else:
238
 
                self.stopReading()
239
 
                self.startWriting()
240
 
                self.disconnecting = 1
241
 
 
242
 
    def loseWriteConnection(self):
243
 
        self._writeDisconnecting = True
244
 
        self.startWriting()
245
 
 
246
 
    def stopReading(self):
247
 
        """Stop waiting for read availability.
248
 
 
249
 
        Call this to remove this selectable from being notified when it is
250
 
        ready for reading.
251
 
        """
252
 
        self.reactor.removeReader(self)
253
 
 
254
 
    def stopWriting(self):
255
 
        """Stop waiting for write availability.
256
 
 
257
 
        Call this to remove this selectable from being notified when it is ready
258
 
        for writing.
259
 
        """
260
 
        self.reactor.removeWriter(self)
261
 
 
262
 
    def startReading(self):
263
 
        """Start waiting for read availability.
264
 
        """
265
 
        self.reactor.addReader(self)
266
 
 
267
 
    def startWriting(self):
268
 
        """Start waiting for write availability.
269
 
 
270
 
        Call this to have this FileDescriptor be notified whenever it is ready for
271
 
        writing.
272
 
        """
273
 
        self.reactor.addWriter(self)
274
 
 
275
 
    # Producer/consumer implementation
276
 
 
277
 
    # first, the consumer stuff.  This requires no additional work, as
278
 
    # any object you can write to can be a consumer, really.
279
 
 
280
 
    producer = None
281
 
    bufferSize = 2**2**2**2
282
 
 
283
 
    def registerProducer(self, producer, streaming):
284
 
        """Register to receive data from a producer.
285
 
 
286
 
        This sets this selectable to be a consumer for a producer.  When this
287
 
        selectable runs out of data on a write() call, it will ask the producer
288
 
        to resumeProducing(). When the FileDescriptor's internal data buffer is
289
 
        filled, it will ask the producer to pauseProducing(). If the connection
290
 
        is lost, FileDescriptor calls producer's stopProducing() method.
291
 
 
292
 
        If streaming is true, the producer should provide the IPushProducer
293
 
        interface. Otherwise, it is assumed that producer provides the
294
 
        IPullProducer interface. In this case, the producer won't be asked
295
 
        to pauseProducing(), but it has to be careful to write() data only
296
 
        when its resumeProducing() method is called.
297
 
        """
298
 
        if self.producer is not None:
299
 
            raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
300
 
        if self.disconnected:
301
 
            producer.stopProducing()
302
 
        else:
303
 
            self.producer = producer
304
 
            self.streamingProducer = streaming
305
 
            if not streaming:
306
 
                producer.resumeProducing()
307
 
 
308
 
    def unregisterProducer(self):
309
 
        """Stop consuming data from a producer, without disconnecting.
310
 
        """
311
 
        self.producer = None
312
 
 
313
 
    def stopConsuming(self):
314
 
        """Stop consuming data.
315
 
 
316
 
        This is called when a producer has lost its connection, to tell the
317
 
        consumer to go lose its connection (and break potential circular
318
 
        references).
319
 
        """
320
 
        self.unregisterProducer()
321
 
        self.loseConnection()
322
 
 
323
 
    # producer interface implementation
324
 
 
325
 
    def resumeProducing(self):
326
 
        assert self.connected and not self.disconnecting
327
 
        self.startReading()
328
 
 
329
 
    def pauseProducing(self):
330
 
        self.stopReading()
331
 
 
332
 
    def stopProducing(self):
333
 
        self.loseConnection()
334
 
 
335
 
 
336
 
    def fileno(self):
337
 
        """File Descriptor number for select().
338
 
 
339
 
        This method must be overridden or assigned in subclasses to
340
 
        indicate a valid file descriptor for the operating system.
341
 
        """
342
 
        return -1
343
 
 
344
 
 
345
 
def isIPAddress(addr):
346
 
    parts = string.split(addr, '.')
347
 
    if len(parts) == 4:
348
 
        try:
349
 
            for part in map(int, parts):
350
 
                if not (0<=part<256):
351
 
                    break
352
 
            else:
353
 
                return 1
354
 
        except ValueError:
355
 
                pass
356
 
    return 0
357
 
 
358
 
__all__ = ["FileDescriptor"]