1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
"""Support for generic select()able objects.
9
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
14
from zope.interface import implements
17
from twisted.python import log, reflect, failure
18
from twisted.persisted import styles
21
import interfaces, main
23
class FileDescriptor(log.Logger, styles.Ephemeral, object):
24
"""An object which can be operated on by select().
26
This is an abstract superclass of all objects which may be notified when
27
they are readable or writable; e.g. they have a file-descriptor that is
28
valid to be passed to select(2).
36
_writeDisconnecting = False
37
_writeDisconnected = False
43
implements(interfaces.IProducer, interfaces.IReadWriteDescriptor,
44
interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
46
def __init__(self, reactor=None):
48
from twisted.internet import reactor
49
self.reactor = reactor
50
self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
53
def connectionLost(self, reason):
54
"""The connection was lost.
56
This is called when the connection on a selectable object has been
57
lost. It will be called whether the connection was closed explicitly,
58
an exception occurred in an event handler, or the other end of the
59
connection closed it first.
61
Clean up state here, but make sure to call back up to FileDescriptor.
66
if self.producer is not None:
67
self.producer.stopProducing()
72
def writeSomeData(self, data):
73
"""Write as much as possible of the given data, immediately.
75
This is called to invoke the lower-level writing functionality, such as
76
a socket's send() method, or a file's write(); this method returns an
77
integer. If positive, it is the number of bytes written; if negative,
78
it indicates the connection was lost.
81
raise NotImplementedError("%s does not implement writeSomeData" %
82
reflect.qual(self.__class__))
85
"""Called when data is avaliable for reading.
87
Subclasses must override this method. The result will be interpreted
88
in the same way as a result of doWrite().
90
raise NotImplementedError("%s does not implement doRead" %
91
reflect.qual(self.__class__))
94
"""Called when data can be written.
96
A result that is true (which will be a negative number) implies the
97
connection was lost. A false result implies the connection is still
98
there; a result of 0 implies no write was done, and a result of None
99
indicates that a write was done.
101
if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
102
# If there is currently less than SEND_LIMIT bytes left to send
103
# in the string, extend it with the array data.
104
self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(self._tempDataBuffer)
106
self._tempDataBuffer = []
107
self._tempDataLen = 0
109
# Send as much data as you can.
111
l = self.writeSomeData(buffer(self.dataBuffer, self.offset))
113
l = self.writeSomeData(self.dataBuffer)
114
if l < 0 or isinstance(l, Exception):
116
if l == 0 and self.dataBuffer:
121
# If there is nothing left to send,
122
if self.offset == len(self.dataBuffer) and not self._tempDataLen:
127
# If I've got a producer who is supposed to supply me with data,
128
if self.producer is not None and ((not self.streamingProducer)
129
or self.producerPaused):
130
# tell them to supply some more.
131
self.producerPaused = 0
132
self.producer.resumeProducing()
133
elif self.disconnecting:
134
# But if I was previously asked to let the connection die, do
136
return self._postLoseConnection()
137
elif self._writeDisconnecting:
138
# I was previously asked to to half-close the connection.
139
result = self._closeWriteConnection()
140
self._writeDisconnected = True
144
def _postLoseConnection(self):
145
"""Called after a loseConnection(), when all data has been written.
147
Whatever this returns is then returned by doWrite.
149
# default implementation, telling reactor we're finished
150
return main.CONNECTION_DONE
152
def _closeWriteConnection(self):
153
# override in subclasses
156
def writeConnectionLost(self, reason):
157
# in current code should never be called
158
self.connectionLost(reason)
160
def readConnectionLost(self, reason):
161
# override in subclasses
162
self.connectionLost(reason)
164
def write(self, data):
165
"""Reliably write some data.
167
The data is buffered until the underlying file descriptor is ready
168
for writing. If there is more than C{self.bufferSize} data in the
169
buffer and this descriptor has a registered streaming producer, its
170
C{pauseProducing()} method will be called.
172
if isinstance(data, unicode): # no, really, I mean it
173
raise TypeError("Data must not be unicode")
174
if not self.connected or self._writeDisconnected:
177
self._tempDataBuffer.append(data)
178
self._tempDataLen += len(data)
179
# If we are responsible for pausing our producer,
180
if self.producer is not None and self.streamingProducer:
181
# and our buffer is full,
182
if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
184
self.producerPaused = 1
185
self.producer.pauseProducing()
188
def writeSequence(self, iovec):
189
"""Reliably write a sequence of data.
191
Currently, this is a convenience method roughly equivalent to::
196
It may have a more efficient implementation at a later time or in a
199
As with the C{write()} method, if a buffer size limit is reached and a
200
streaming producer is registered, it will be paused until the buffered
201
data is written to the underlying file descriptor.
203
if not self.connected or not iovec or self._writeDisconnected:
205
self._tempDataBuffer.extend(iovec)
207
self._tempDataLen += len(i)
208
# If we are responsible for pausing our producer,
209
if self.producer is not None and self.streamingProducer:
210
# and our buffer is full,
211
if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
213
self.producerPaused = 1
214
self.producer.pauseProducing()
217
def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
218
"""Close the connection at the next available opportunity.
220
Call this to cause this FileDescriptor to lose its connection. It will
221
first write any data that it has buffered.
223
If there is data buffered yet to be written, this method will cause the
224
transport to lose its connection as soon as it's done flushing its
225
write buffer. If you have a producer registered, the connection won't
226
be closed until the producer is finished. Therefore, make sure you
227
unregister your producer when it's finished, or the connection will
231
if self.connected and not self.disconnecting:
232
if self._writeDisconnected:
233
# doWrite won't trigger the connection close anymore
236
self.connectionLost(_connDone)
240
self.disconnecting = 1
242
def loseWriteConnection(self):
243
self._writeDisconnecting = True
246
def stopReading(self):
247
"""Stop waiting for read availability.
249
Call this to remove this selectable from being notified when it is
252
self.reactor.removeReader(self)
254
def stopWriting(self):
255
"""Stop waiting for write availability.
257
Call this to remove this selectable from being notified when it is ready
260
self.reactor.removeWriter(self)
262
def startReading(self):
263
"""Start waiting for read availability.
265
self.reactor.addReader(self)
267
def startWriting(self):
268
"""Start waiting for write availability.
270
Call this to have this FileDescriptor be notified whenever it is ready for
273
self.reactor.addWriter(self)
275
# Producer/consumer implementation
277
# first, the consumer stuff. This requires no additional work, as
278
# any object you can write to can be a consumer, really.
281
bufferSize = 2**2**2**2
283
def registerProducer(self, producer, streaming):
284
"""Register to receive data from a producer.
286
This sets this selectable to be a consumer for a producer. When this
287
selectable runs out of data on a write() call, it will ask the producer
288
to resumeProducing(). When the FileDescriptor's internal data buffer is
289
filled, it will ask the producer to pauseProducing(). If the connection
290
is lost, FileDescriptor calls producer's stopProducing() method.
292
If streaming is true, the producer should provide the IPushProducer
293
interface. Otherwise, it is assumed that producer provides the
294
IPullProducer interface. In this case, the producer won't be asked
295
to pauseProducing(), but it has to be careful to write() data only
296
when its resumeProducing() method is called.
298
if self.producer is not None:
299
raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
300
if self.disconnected:
301
producer.stopProducing()
303
self.producer = producer
304
self.streamingProducer = streaming
306
producer.resumeProducing()
308
def unregisterProducer(self):
309
"""Stop consuming data from a producer, without disconnecting.
313
def stopConsuming(self):
314
"""Stop consuming data.
316
This is called when a producer has lost its connection, to tell the
317
consumer to go lose its connection (and break potential circular
320
self.unregisterProducer()
321
self.loseConnection()
323
# producer interface implementation
325
def resumeProducing(self):
326
assert self.connected and not self.disconnecting
329
def pauseProducing(self):
332
def stopProducing(self):
333
self.loseConnection()
337
"""File Descriptor number for select().
339
This method must be overridden or assigned in subclasses to
340
indicate a valid file descriptor for the operating system.
345
def isIPAddress(addr):
346
parts = string.split(addr, '.')
349
for part in map(int, parts):
350
if not (0<=part<256):
358
__all__ = ["FileDescriptor"]