1
# -*- test-case-name: twisted.internet.test.test_pollingfile -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Implements a simple polling interface for file descriptors that don't work with
7
select() - this is pretty much only useful on Windows.
10
from zope.interface import implements
12
from twisted.internet.interfaces import IConsumer, IPushProducer
15
MIN_TIMEOUT = 0.000000001
20
class _PollableResource:
33
# Everything is private here because it is really an implementation detail.
35
def __init__(self, reactor):
36
self.reactor = reactor
38
self._pollTimer = None
39
self._currentTimeout = MAX_TIMEOUT
42
def _addPollableResource(self, res):
43
self._resources.append(res)
44
self._checkPollingState()
46
def _checkPollingState(self):
47
for resource in self._resources:
54
def _startPolling(self):
55
if self._pollTimer is None:
56
self._pollTimer = self._reschedule()
58
def _stopPolling(self):
59
if self._pollTimer is not None:
60
self._pollTimer.cancel()
61
self._pollTimer = None
68
self._checkPollingState()
70
def _reschedule(self):
72
return self.reactor.callLater(self._currentTimeout, self._pollEvent)
77
for resource in self._resources:
79
workUnits += resource.checkWork()
80
# Check AFTER work has been done
82
anyActive.append(resource)
84
newTimeout = self._currentTimeout
86
newTimeout = self._currentTimeout / (workUnits + 1.)
87
if newTimeout < MIN_TIMEOUT:
88
newTimeout = MIN_TIMEOUT
90
newTimeout = self._currentTimeout * 2.
91
if newTimeout > MAX_TIMEOUT:
92
newTimeout = MAX_TIMEOUT
93
self._currentTimeout = newTimeout
95
self._pollTimer = self._reschedule()
98
# If we ever (let's hope not) need the above functionality on UNIX, this could
99
# be factored into a different module.
106
class _PollableReadPipe(_PollableResource):
108
implements(IPushProducer)
110
def __init__(self, pipe, receivedCallback, lostCallback):
111
# security attributes for pipes
113
self.receivedCallback = receivedCallback
114
self.lostCallback = lostCallback
122
buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
123
# finished = (result == -1)
126
hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
127
fullDataRead.append(data)
128
except win32api.error:
132
dataBuf = ''.join(fullDataRead)
134
self.receivedCallback(dataBuf)
145
win32api.CloseHandle(self.pipe)
146
except pywintypes.error:
147
# You can't close std handles...?
150
def stopProducing(self):
153
def pauseProducing(self):
156
def resumeProducing(self):
160
FULL_BUFFER_SIZE = 64 * 1024
162
class _PollableWritePipe(_PollableResource):
164
implements(IConsumer)
166
def __init__(self, writePipe, lostCallback):
167
self.disconnecting = False
169
self.producerPaused = 0
170
self.streamingProducer = 0
172
self.writePipe = writePipe
173
self.lostCallback = lostCallback
175
win32pipe.SetNamedPipeHandleState(writePipe,
176
win32pipe.PIPE_NOWAIT,
179
except pywintypes.error:
180
# Maybe it's an invalid handle. Who knows.
184
self.disconnecting = True
186
def bufferFull(self):
187
if self.producer is not None:
188
self.producerPaused = 1
189
self.producer.pauseProducing()
191
def bufferEmpty(self):
192
if self.producer is not None and ((not self.streamingProducer) or
193
self.producerPaused):
194
self.producer.producerPaused = 0
195
self.producer.resumeProducing()
199
# almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
201
def registerProducer(self, producer, streaming):
202
"""Register to receive data from a producer.
204
This sets this selectable to be a consumer for a producer. When this
205
selectable runs out of data on a write() call, it will ask the producer
206
to resumeProducing(). A producer should implement the IProducer
209
FileDescriptor provides some infrastructure for producer methods.
211
if self.producer is not None:
213
"Cannot register producer %s, because producer %s was never "
214
"unregistered." % (producer, self.producer))
216
producer.stopProducing()
218
self.producer = producer
219
self.streamingProducer = streaming
221
producer.resumeProducing()
223
def unregisterProducer(self):
224
"""Stop consuming data from a producer, without disconnecting.
228
def writeConnectionLost(self):
231
win32api.CloseHandle(self.writePipe)
232
except pywintypes.error:
237
def writeSequence(self, seq):
238
self.outQueue.extend(seq)
240
def write(self, data):
241
if self.disconnecting:
243
self.outQueue.append(data)
244
if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
249
if not self.outQueue:
250
if self.disconnecting:
251
self.writeConnectionLost()
254
win32file.WriteFile(self.writePipe, '', None)
255
except pywintypes.error:
256
self.writeConnectionLost()
257
return numBytesWritten
259
data = self.outQueue.pop(0)
261
if isinstance(data, unicode):
262
raise TypeError("unicode not allowed")
264
errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
266
except win32api.error:
267
self.writeConnectionLost()
270
# assert not errCode, "wtf an error code???"
271
numBytesWritten += nBytesWritten
272
if len(data) > nBytesWritten:
273
self.outQueue.insert(0, data[nBytesWritten:])
276
resumed = self.bufferEmpty()
277
if not resumed and self.disconnecting:
278
self.writeConnectionLost()
279
return numBytesWritten