1
# -*- test-case-name: twisted.test.test_abstract -*-
2
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
Support for generic select()able objects.
9
Maintainer: Itamar Shtull-Trauring
12
from zope.interface import implements
15
from twisted.python import log, reflect, failure
16
from twisted.persisted import styles
17
from twisted.internet import interfaces, main
20
class FileDescriptor(log.Logger, styles.Ephemeral, object):
21
"""An object which can be operated on by select().
23
This is an abstract superclass of all objects which may be notified when
24
they are readable or writable; e.g. they have a file-descriptor that is
25
valid to be passed to select(2).
33
_writeDisconnecting = False
34
_writeDisconnected = False
40
implements(interfaces.IProducer, interfaces.IReadWriteDescriptor,
41
interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
43
def __init__(self, reactor=None):
45
from twisted.internet import reactor
46
self.reactor = reactor
47
self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
50
def connectionLost(self, reason):
51
"""The connection was lost.
53
This is called when the connection on a selectable object has been
54
lost. It will be called whether the connection was closed explicitly,
55
an exception occurred in an event handler, or the other end of the
56
connection closed it first.
58
Clean up state here, but make sure to call back up to FileDescriptor.
63
if self.producer is not None:
64
self.producer.stopProducing()
70
def writeSomeData(self, data):
72
Write as much as possible of the given data, immediately.
74
This is called to invoke the lower-level writing functionality, such
75
as a socket's send() method, or a file's write(); this method
76
returns an integer or an exception. If an integer, it is the number
77
of bytes written (possibly zero); if an exception, it indicates the
80
raise NotImplementedError("%s does not implement writeSomeData" %
81
reflect.qual(self.__class__))
85
"""Called when data is avaliable for reading.
87
Subclasses must override this method. The result will be interpreted
88
in the same way as a result of doWrite().
90
raise NotImplementedError("%s does not implement doRead" %
91
reflect.qual(self.__class__))
95
Called when data can be written.
97
A result that is true (which will be a negative number or an
98
exception instance) indicates that the connection was lost. A false
99
result implies the connection is still there; a result of 0
100
indicates no write was done, and a result of None indicates that a
103
if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
104
# If there is currently less than SEND_LIMIT bytes left to send
105
# in the string, extend it with the array data.
106
self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(self._tempDataBuffer)
108
self._tempDataBuffer = []
109
self._tempDataLen = 0
111
# Send as much data as you can.
113
l = self.writeSomeData(buffer(self.dataBuffer, self.offset))
115
l = self.writeSomeData(self.dataBuffer)
117
# There is no writeSomeData implementation in Twisted which returns
118
# 0, but the documentation for writeSomeData used to claim negative
119
# integers meant connection lost. Keep supporting this here,
120
# although it may be worth deprecating and removing at some point.
121
if l < 0 or isinstance(l, Exception):
123
if l == 0 and self.dataBuffer:
128
# If there is nothing left to send,
129
if self.offset == len(self.dataBuffer) and not self._tempDataLen:
134
# If I've got a producer who is supposed to supply me with data,
135
if self.producer is not None and ((not self.streamingProducer)
136
or self.producerPaused):
137
# tell them to supply some more.
138
self.producerPaused = 0
139
self.producer.resumeProducing()
140
elif self.disconnecting:
141
# But if I was previously asked to let the connection die, do
143
return self._postLoseConnection()
144
elif self._writeDisconnecting:
145
# I was previously asked to to half-close the connection.
146
result = self._closeWriteConnection()
147
self._writeDisconnected = True
151
def _postLoseConnection(self):
152
"""Called after a loseConnection(), when all data has been written.
154
Whatever this returns is then returned by doWrite.
156
# default implementation, telling reactor we're finished
157
return main.CONNECTION_DONE
159
def _closeWriteConnection(self):
160
# override in subclasses
163
def writeConnectionLost(self, reason):
164
# in current code should never be called
165
self.connectionLost(reason)
167
def readConnectionLost(self, reason):
168
# override in subclasses
169
self.connectionLost(reason)
171
def write(self, data):
172
"""Reliably write some data.
174
The data is buffered until the underlying file descriptor is ready
175
for writing. If there is more than C{self.bufferSize} data in the
176
buffer and this descriptor has a registered streaming producer, its
177
C{pauseProducing()} method will be called.
179
if isinstance(data, unicode): # no, really, I mean it
180
raise TypeError("Data must not be unicode")
181
if not self.connected or self._writeDisconnected:
184
self._tempDataBuffer.append(data)
185
self._tempDataLen += len(data)
186
# If we are responsible for pausing our producer,
187
if self.producer is not None and self.streamingProducer:
188
# and our buffer is full,
189
if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
191
self.producerPaused = 1
192
self.producer.pauseProducing()
195
def writeSequence(self, iovec):
196
"""Reliably write a sequence of data.
198
Currently, this is a convenience method roughly equivalent to::
203
It may have a more efficient implementation at a later time or in a
206
As with the C{write()} method, if a buffer size limit is reached and a
207
streaming producer is registered, it will be paused until the buffered
208
data is written to the underlying file descriptor.
210
if not self.connected or not iovec or self._writeDisconnected:
212
self._tempDataBuffer.extend(iovec)
214
self._tempDataLen += len(i)
215
# If we are responsible for pausing our producer,
216
if self.producer is not None and self.streamingProducer:
217
# and our buffer is full,
218
if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
220
self.producerPaused = 1
221
self.producer.pauseProducing()
224
def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
225
"""Close the connection at the next available opportunity.
227
Call this to cause this FileDescriptor to lose its connection. It will
228
first write any data that it has buffered.
230
If there is data buffered yet to be written, this method will cause the
231
transport to lose its connection as soon as it's done flushing its
232
write buffer. If you have a producer registered, the connection won't
233
be closed until the producer is finished. Therefore, make sure you
234
unregister your producer when it's finished, or the connection will
238
if self.connected and not self.disconnecting:
239
if self._writeDisconnected:
240
# doWrite won't trigger the connection close anymore
243
self.connectionLost(_connDone)
247
self.disconnecting = 1
249
def loseWriteConnection(self):
250
self._writeDisconnecting = True
253
def stopReading(self):
254
"""Stop waiting for read availability.
256
Call this to remove this selectable from being notified when it is
259
self.reactor.removeReader(self)
261
def stopWriting(self):
262
"""Stop waiting for write availability.
264
Call this to remove this selectable from being notified when it is ready
267
self.reactor.removeWriter(self)
269
def startReading(self):
270
"""Start waiting for read availability.
272
self.reactor.addReader(self)
274
def startWriting(self):
275
"""Start waiting for write availability.
277
Call this to have this FileDescriptor be notified whenever it is ready for
280
self.reactor.addWriter(self)
282
# Producer/consumer implementation
284
# first, the consumer stuff. This requires no additional work, as
285
# any object you can write to can be a consumer, really.
288
bufferSize = 2**2**2**2
290
def registerProducer(self, producer, streaming):
291
"""Register to receive data from a producer.
293
This sets this selectable to be a consumer for a producer. When this
294
selectable runs out of data on a write() call, it will ask the producer
295
to resumeProducing(). When the FileDescriptor's internal data buffer is
296
filled, it will ask the producer to pauseProducing(). If the connection
297
is lost, FileDescriptor calls producer's stopProducing() method.
299
If streaming is true, the producer should provide the IPushProducer
300
interface. Otherwise, it is assumed that producer provides the
301
IPullProducer interface. In this case, the producer won't be asked
302
to pauseProducing(), but it has to be careful to write() data only
303
when its resumeProducing() method is called.
305
if self.producer is not None:
306
raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
307
if self.disconnected:
308
producer.stopProducing()
310
self.producer = producer
311
self.streamingProducer = streaming
313
producer.resumeProducing()
315
def unregisterProducer(self):
316
"""Stop consuming data from a producer, without disconnecting.
320
def stopConsuming(self):
321
"""Stop consuming data.
323
This is called when a producer has lost its connection, to tell the
324
consumer to go lose its connection (and break potential circular
327
self.unregisterProducer()
328
self.loseConnection()
330
# producer interface implementation
332
def resumeProducing(self):
333
assert self.connected and not self.disconnecting
336
def pauseProducing(self):
339
def stopProducing(self):
340
self.loseConnection()
344
"""File Descriptor number for select().
346
This method must be overridden or assigned in subclasses to
347
indicate a valid file descriptor for the operating system.
352
def isIPAddress(addr):
354
Determine whether the given string represents an IPv4 address.
357
@param addr: A string which may or may not be the decimal dotted
358
representation of an IPv4 address.
361
@return: C{True} if C{addr} represents an IPv4 address, C{False}
364
dottedParts = addr.split('.')
365
if len(dottedParts) == 4:
366
for octet in dottedParts:
372
if value < 0 or value > 255:
378
__all__ = ["FileDescriptor"]