1
# Copyright (c) 2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
6
Abstract file handle class
9
from twisted.internet import main, error, interfaces
10
from twisted.python import log, failure
11
from twisted.persisted import styles
13
from zope.interface import implements
16
from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
17
from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
18
from twisted.internet.iocpreactor import iocpsupport as _iocp
22
class FileHandle(log.Logger, styles.Ephemeral, object):
24
File handle that can read and write asynchronously
26
implements(interfaces.IProducer, interfaces.IConsumer,
27
interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
32
dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
34
_readSize = 0 # how much data we have in the read buffer
36
_readScheduledInOS = False
39
def startReading(self):
40
self.reactor.addActiveHandle(self)
41
if not self._readScheduled and not self.reading:
43
self._readScheduled = self.reactor.callLater(0,
47
def stopReading(self):
48
if self._readScheduled:
49
self._readScheduled.cancel()
50
self._readScheduled = None
54
def _resumeReading(self):
55
self._readScheduled = None
56
if self._dispatchData() and not self._readScheduledInOS:
60
def _dispatchData(self):
62
Dispatch previously read data. Return True if self.reading and we don't
65
if not self._readSize:
68
full_buffers = size // self.readBufferSize
69
while self._readNextBuffer < full_buffers:
70
self.dataReceived(self._readBuffers[self._readNextBuffer])
71
self._readNextBuffer += 1
74
remainder = size % self.readBufferSize
76
self.dataReceived(buffer(self._readBuffers[full_buffers],
78
if self.dynamicReadBuffers:
79
total_buffer_size = self.readBufferSize * len(self._readBuffers)
80
# we have one buffer too many
81
if size < total_buffer_size - self.readBufferSize:
82
del self._readBuffers[-1]
83
# we filled all buffers, so allocate one more
84
elif (size == total_buffer_size and
85
len(self._readBuffers) < self.maxReadBuffers):
86
self._readBuffers.append(_iocp.AllocateReadBuffer(
88
self._readNextBuffer = 0
93
def _cbRead(self, rc, bytes, evt):
94
self._readScheduledInOS = False
95
if self._handleRead(rc, bytes, evt):
99
def _handleRead(self, rc, bytes, evt):
101
Returns False if we should stop reading for now
103
if self.disconnected:
105
# graceful disconnection
106
if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
107
self.reactor.removeActiveHandle(self)
108
self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
110
# XXX: not handling WSAEWOULDBLOCK
111
# ("too many outstanding overlapped I/O requests")
113
self.connectionLost(failure.Failure(
114
error.ConnectionLost("read error -- %s (%s)" %
115
(errno.errorcode.get(rc, 'unknown'), rc))))
118
assert self._readSize == 0
119
assert self._readNextBuffer == 0
120
self._readSize = bytes
121
return self._dispatchData()
127
evt = _iocp.Event(self._cbRead, self)
129
evt.buff = buff = self._readBuffers
130
rc, bytes = self.readFromHandle(buff, evt)
132
if (rc == ERROR_IO_PENDING
133
or (not rc and numReads >= self.maxReads)):
134
self._readScheduledInOS = True
138
if not self._handleRead(rc, bytes, evt):
143
def readFromHandle(self, bufflist, evt):
144
raise NotImplementedError() # TODO: this should default to ReadFile
147
def dataReceived(self, data):
148
raise NotImplementedError
151
def readConnectionLost(self, reason):
152
self.connectionLost(reason)
159
_writeScheduled = None
160
_writeDisconnecting = False
161
_writeDisconnected = False
162
writeBufferSize = 2**2**2**2
166
def loseWriteConnection(self):
167
self._writeDisconnecting = True
171
def _closeWriteConnection(self):
172
# override in subclasses
176
def writeConnectionLost(self, reason):
177
# in current code should never be called
178
self.connectionLost(reason)
181
def startWriting(self):
182
self.reactor.addActiveHandle(self)
184
if not self._writeScheduled:
185
self._writeScheduled = self.reactor.callLater(0,
189
def stopWriting(self):
190
if self._writeScheduled:
191
self._writeScheduled.cancel()
192
self._writeScheduled = None
196
def _resumeWriting(self):
197
self._writeScheduled = None
201
def _cbWrite(self, rc, bytes, evt):
202
if self._handleWrite(rc, bytes, evt):
206
def _handleWrite(self, rc, bytes, evt):
208
Returns false if we should stop writing for now
210
if self.disconnected or self._writeDisconnected:
212
# XXX: not handling WSAEWOULDBLOCK
213
# ("too many outstanding overlapped I/O requests")
215
self.connectionLost(failure.Failure(
216
error.ConnectionLost("write error -- %s (%s)" %
217
(errno.errorcode.get(rc, 'unknown'), rc))))
221
# If there is nothing left to send,
222
if self.offset == len(self.dataBuffer) and not self._tempDataLen:
227
# If I've got a producer who is supposed to supply me with data
228
if self.producer is not None and ((not self.streamingProducer)
229
or self.producerPaused):
230
# tell them to supply some more.
231
self.producerPaused = True
232
self.producer.resumeProducing()
233
elif self.disconnecting:
234
# But if I was previously asked to let the connection die,
236
self.connectionLost(failure.Failure(main.CONNECTION_DONE))
237
elif self._writeDisconnecting:
238
# I was previously asked to to half-close the connection.
239
self._closeWriteConnection()
240
self._writeDisconnected = True
249
if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
250
# If there is currently less than SEND_LIMIT bytes left to send
251
# in the string, extend it with the array data.
252
self.dataBuffer = (buffer(self.dataBuffer, self.offset) +
253
"".join(self._tempDataBuffer))
255
self._tempDataBuffer = []
256
self._tempDataLen = 0
258
evt = _iocp.Event(self._cbWrite, self)
260
# Send as much data as you can.
262
evt.buff = buff = buffer(self.dataBuffer, self.offset)
264
evt.buff = buff = self.dataBuffer
265
rc, bytes = self.writeToHandle(buff, evt)
266
if (rc == ERROR_IO_PENDING
267
or (not rc and numWrites >= self.maxWrites)):
271
if not self._handleWrite(rc, bytes, evt):
276
def writeToHandle(self, buff, evt):
277
raise NotImplementedError() # TODO: this should default to WriteFile
280
def write(self, data):
281
"""Reliably write some data.
283
The data is buffered until his file descriptor is ready for writing.
285
if isinstance(data, unicode): # no, really, I mean it
286
raise TypeError("Data must not be unicode")
287
if not self.connected or self._writeDisconnected:
290
self._tempDataBuffer.append(data)
291
self._tempDataLen += len(data)
292
if self.producer is not None:
293
if (len(self.dataBuffer) + self._tempDataLen
294
> self.writeBufferSize):
295
self.producerPaused = True
296
self.producer.pauseProducing()
300
def writeSequence(self, iovec):
301
if not self.connected or not iovec or self._writeDisconnected:
303
self._tempDataBuffer.extend(iovec)
305
self._tempDataLen += len(i)
306
if self.producer is not None:
307
if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
308
self.producerPaused = True
309
self.producer.pauseProducing()
316
disconnecting = False
317
logstr = "Uninitialized"
319
SEND_LIMIT = 128*1024
324
def __init__(self, reactor = None):
326
from twisted.internet import reactor
327
self.reactor = reactor
328
self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
329
self._tempDataLen = 0
330
self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)]
333
def connectionLost(self, reason):
335
The connection was lost.
337
This is called when the connection on a selectable object has been
338
lost. It will be called whether the connection was closed explicitly,
339
an exception occurred in an event handler, or the other end of the
340
connection closed it first.
342
Clean up state here, but make sure to call back up to FileDescriptor.
345
self.disconnected = True
346
self.connected = False
347
if self.producer is not None:
348
self.producer.stopProducing()
352
self.reactor.removeActiveHandle(self)
355
def getFileHandle(self):
359
def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
361
Close the connection at the next available opportunity.
363
Call this to cause this FileDescriptor to lose its connection. It will
364
first write any data that it has buffered.
366
If there is data buffered yet to be written, this method will cause the
367
transport to lose its connection as soon as it's done flushing its
368
write buffer. If you have a producer registered, the connection won't
369
be closed until the producer is finished. Therefore, make sure you
370
unregister your producer when it's finished, or the connection will
374
if self.connected and not self.disconnecting:
375
if self._writeDisconnected:
376
# doWrite won't trigger the connection close anymore
379
self.connectionLost(_connDone)
383
self.disconnecting = 1
386
# Producer/consumer implementation
388
producerPaused = False
389
streamingProducer = False
391
# first, the consumer stuff. This requires no additional work, as
392
# any object you can write to can be a consumer, really.
397
def registerProducer(self, producer, streaming):
399
Register to receive data from a producer.
401
This sets this selectable to be a consumer for a producer. When this
402
selectable runs out of data on a write() call, it will ask the producer
403
to resumeProducing(). A producer should implement the IProducer
406
FileDescriptor provides some infrastructure for producer methods.
408
if self.producer is not None:
410
"Cannot register producer %s, because producer "
411
"%s was never unregistered." % (producer, self.producer))
412
if self.disconnected:
413
producer.stopProducing()
415
self.producer = producer
416
self.streamingProducer = streaming
418
producer.resumeProducing()
421
def unregisterProducer(self):
423
Stop consuming data from a producer, without disconnecting.
428
def stopConsuming(self):
432
This is called when a producer has lost its connection, to tell the
433
consumer to go lose its connection (and break potential circular
436
self.unregisterProducer()
437
self.loseConnection()
440
# producer interface implementation
442
def resumeProducing(self):
443
assert self.connected and not self.disconnecting
447
def pauseProducing(self):
451
def stopProducing(self):
452
self.loseConnection()
455
__all__ = ['FileHandle']