~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/_pollingfile.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.internet.test.test_pollingfile -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Implements a simple polling interface for file descriptors that don't work with
 
7
select() - this is pretty much only useful on Windows.
 
8
"""
 
9
 
 
10
from zope.interface import implements
 
11
 
 
12
from twisted.internet.interfaces import IConsumer, IPushProducer
 
13
 
 
14
 
 
15
# Lower bound for the adaptive polling interval: _PollingTimer._pollEvent never
# lets its timeout shrink below this value.
MIN_TIMEOUT = 0.000000001
 
16
# Upper bound for the adaptive polling interval, and the interval a new
# _PollingTimer starts with.
MAX_TIMEOUT = 0.1
 
17
 
 
18
 
 
19
 
 
20
class _PollableResource:
    """
    Base class for objects that are polled by a _PollingTimer.

    The only state kept here is the C{active} flag, which _PollingTimer reads
    to decide whether the resource's C{checkWork} method should be called.
    """
    # Class-level default: a resource is active until deactivate() is called.
    active = True

    def activate(self):
        """
        Mark this resource as active.
        """
        self.active = True

    def deactivate(self):
        """
        Mark this resource as inactive.
        """
        self.active = False
 
29
 
 
30
 
 
31
 
 
32
class _PollingTimer:
    """
    Repeatedly call C{checkWork} on a collection of pollable resources.

    Polling is driven by the reactor's C{callLater}.  The delay between polls
    adapts to the amount of work reported: it shrinks (down to MIN_TIMEOUT)
    when resources report work and doubles (up to MAX_TIMEOUT) when they
    report none.
    """
    # Everything is private here because it is really an implementation detail.

    def __init__(self, reactor):
        """
        @param reactor: object providing C{callLater(delay, callable)}; the
            value it returns must provide C{cancel()}.
        """
        self.reactor = reactor
        self._resources = []
        # The pending delayed call, or None when no poll is scheduled.
        self._pollTimer = None
        self._currentTimeout = MAX_TIMEOUT
        self._paused = False

    def _addPollableResource(self, res):
        """
        Start tracking C{res} and begin polling if it is active.
        """
        self._resources.append(res)
        self._checkPollingState()

    def _checkPollingState(self):
        """
        Start polling if any tracked resource is active, otherwise stop.
        """
        for resource in self._resources:
            if resource.active:
                self._startPolling()
                break
        else:
            # No resource was active (or there are none at all).
            self._stopPolling()

    def _startPolling(self):
        """
        Schedule a poll unless one is already pending.
        """
        if self._pollTimer is None:
            self._pollTimer = self._reschedule()

    def _stopPolling(self):
        """
        Cancel the pending poll, if there is one.
        """
        if self._pollTimer is not None:
            self._pollTimer.cancel()
            self._pollTimer = None

    def _pause(self):
        """
        Prevent any further polls from being scheduled.
        """
        self._paused = True

    def _unpause(self):
        """
        Allow polls to be scheduled again and restart polling if needed.
        """
        self._paused = False
        self._checkPollingState()

    def _reschedule(self):
        """
        Schedule the next poll after the current timeout.

        @return: whatever C{reactor.callLater} returns, or None when paused.
        """
        if not self._paused:
            return self.reactor.callLater(self._currentTimeout, self._pollEvent)

    def _pollEvent(self):
        """
        Poll every active resource once, adapt the timeout, and reschedule.
        """
        # The delayed call that brought us here has already fired.  Forget it
        # so that a stale reference can neither block _startPolling() nor be
        # cancel()ed by _stopPolling() once it is no longer cancellable.
        self._pollTimer = None

        workUnits = 0.
        anyActive = []
        for resource in self._resources:
            if resource.active:
                workUnits += resource.checkWork()
                # Check AFTER work has been done
                if resource.active:
                    anyActive.append(resource)

        newTimeout = self._currentTimeout
        if workUnits:
            # Work was done: poll sooner, proportionally to the amount.
            newTimeout = self._currentTimeout / (workUnits + 1.)
            if newTimeout < MIN_TIMEOUT:
                newTimeout = MIN_TIMEOUT
        else:
            # Idle: back off exponentially.
            newTimeout = self._currentTimeout * 2.
            if newTimeout > MAX_TIMEOUT:
                newTimeout = MAX_TIMEOUT
        self._currentTimeout = newTimeout

        # A checkWork() callback may already have scheduled the next poll via
        # _startPolling(); do not schedule a second one on top of it.
        if anyActive and self._pollTimer is None:
            self._pollTimer = self._reschedule()
 
96
 
 
97
 
 
98
# If we ever (let's hope not) need the above functionality on UNIX, this could
 
99
# be factored into a different module.
 
100
 
 
101
import win32pipe
 
102
import win32file
 
103
import win32api
 
104
import pywintypes
 
105
 
 
106
class _PollableReadPipe(_PollableResource):
    """
    Push producer that drains a pipe handle each time it is polled.

    Data read from the pipe is handed to C{receivedCallback}; when reading
    fails the resource deactivates itself and calls C{lostCallback}.
    """

    implements(IPushProducer)

    def __init__(self, pipe, receivedCallback, lostCallback):
        """
        @param pipe: handle passed to win32pipe.PeekNamedPipe and
            win32file.ReadFile.
        @param receivedCallback: called with the joined data read during one
            checkWork() call, when that data is non-empty.
        @param lostCallback: called with no arguments from cleanup().
        """
        self.pipe = pipe
        self.receivedCallback = receivedCallback
        self.lostCallback = lostCallback

    def checkWork(self):
        """
        Read everything currently available from the pipe.

        @return: the number of bytes delivered to C{receivedCallback}.
        """
        finished = False
        fullDataRead = []

        while True:
            try:
                # Only the second element of the peek result is used; it is
                # treated as the number of bytes that can be read right now.
                _peeked, bytesToRead, _unused = win32pipe.PeekNamedPipe(
                    self.pipe, 1)
                if not bytesToRead:
                    break
                _hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
                fullDataRead.append(data)
            except win32api.error:
                # Any error while peeking or reading ends this resource.
                finished = True
                break

        dataBuf = ''.join(fullDataRead)
        if dataBuf:
            self.receivedCallback(dataBuf)
        if finished:
            # Deliver the data gathered so far before reporting the loss.
            self.cleanup()
        return len(dataBuf)

    def cleanup(self):
        """
        Deactivate this resource and notify C{lostCallback}.
        """
        self.deactivate()
        self.lostCallback()

    def close(self):
        """
        Close the pipe handle, ignoring a failure to do so.
        """
        try:
            win32api.CloseHandle(self.pipe)
        except pywintypes.error:
            # You can't close std handles...?
            pass

    def stopProducing(self):
        """
        Stop producing by closing the pipe handle.
        """
        self.close()

    def pauseProducing(self):
        """
        Pause producing by deactivating this resource.
        """
        self.deactivate()

    def resumeProducing(self):
        """
        Resume producing by activating this resource.
        """
        self.activate()
 
158
 
 
159
 
 
160
# Total length of queued data above which _PollableWritePipe.write() calls
# bufferFull().
FULL_BUFFER_SIZE = 64 * 1024
 
161
 
 
162
class _PollableWritePipe(_PollableResource):
    """
    Consumer that queues outgoing data and writes it to a pipe handle each
    time it is polled.
    """

    implements(IConsumer)

    def __init__(self, writePipe, lostCallback):
        """
        @param writePipe: handle passed to win32file.WriteFile.
        @param lostCallback: called with no arguments from
            writeConnectionLost().
        """
        self.disconnecting = False
        self.producer = None
        self.producerPaused = 0
        self.streamingProducer = 0
        self.outQueue = []
        self.writePipe = writePipe
        self.lostCallback = lostCallback
        try:
            win32pipe.SetNamedPipeHandleState(writePipe,
                                              win32pipe.PIPE_NOWAIT,
                                              None,
                                              None)
        except pywintypes.error:
            # Maybe it's an invalid handle.  Who knows.
            pass

    def close(self):
        """
        Request disconnection; the handle is closed from checkWork() once the
        output queue has been drained.
        """
        self.disconnecting = True

    def bufferFull(self):
        """
        Pause the registered producer, if any, and remember that we did so.
        """
        # NOTE(review): this pauses the producer whether or not it was
        # registered as streaming -- confirm that non-streaming producers used
        # here provide pauseProducing().
        if self.producer is not None:
            self.producerPaused = 1
            self.producer.pauseProducing()

    def bufferEmpty(self):
        """
        Resume the registered producer if it is non-streaming or was paused by
        bufferFull().

        @return: True if a producer was resumed, False otherwise.
        """
        if self.producer is not None and ((not self.streamingProducer) or
                                          self.producerPaused):
            # Clear our own paused flag (not an attribute on the producer) so
            # a streaming producer is only resumed after it was really paused.
            self.producerPaused = 0
            self.producer.resumeProducing()
            return True
        return False

    # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh

    def registerProducer(self, producer, streaming):
        """Register to receive data from a producer.

        This sets this selectable to be a consumer for a producer.  When this
        selectable runs out of data on a write() call, it will ask the producer
        to resumeProducing(). A producer should implement the IProducer
        interface.

        FileDescriptor provides some infrastructure for producer methods.

        @raise RuntimeError: if another producer is still registered.
        """
        if self.producer is not None:
            raise RuntimeError(
                "Cannot register producer %s, because producer %s was never "
                "unregistered." % (producer, self.producer))
        if not self.active:
            # The pipe is already gone; tell the producer straight away.
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                producer.resumeProducing()

    def unregisterProducer(self):
        """Stop consuming data from a producer, without disconnecting.
        """
        self.producer = None

    def writeConnectionLost(self):
        """
        Deactivate this resource, close the handle (ignoring a failure to do
        so) and notify C{lostCallback}.
        """
        self.deactivate()
        try:
            win32api.CloseHandle(self.writePipe)
        except pywintypes.error:
            # OMG what
            pass
        self.lostCallback()

    def writeSequence(self, seq):
        """
        Append every item of C{seq} to the output queue.
        """
        self.outQueue.extend(seq)

    def write(self, data):
        """
        Queue C{data} for writing, unless close() has already been called.

        Calls bufferFull() when the queued data exceeds FULL_BUFFER_SIZE.
        """
        if self.disconnecting:
            return
        self.outQueue.append(data)
        if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
            self.bufferFull()

    def checkWork(self):
        """
        Write as much queued data to the pipe as it will accept.

        @return: the number of bytes written during this call.
        @raise TypeError: if a queued item is a unicode object.
        """
        numBytesWritten = 0
        if not self.outQueue:
            if self.disconnecting:
                self.writeConnectionLost()
                return 0
            try:
                # Nothing to send: write an empty string; an error here is
                # treated as the connection having been lost.
                win32file.WriteFile(self.writePipe, '', None)
            except pywintypes.error:
                self.writeConnectionLost()
                return numBytesWritten
        while self.outQueue:
            data = self.outQueue.pop(0)
            if isinstance(data, unicode):
                raise TypeError("unicode not allowed")
            try:
                _errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
                                                              data, None)
            except win32api.error:
                self.writeConnectionLost()
                break
            else:
                numBytesWritten += nBytesWritten
                if len(data) > nBytesWritten:
                    # Partial write: put the remainder back at the front and
                    # try again on the next poll.
                    self.outQueue.insert(0, data[nBytesWritten:])
                    break
        else:
            # The queue is empty (the loop ended without a break).
            resumed = self.bufferEmpty()
            if not resumed and self.disconnecting:
                self.writeConnectionLost()
        return numBytesWritten