~0x44/nova/extdoc

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/iocpreactor/abstract.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2008 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
"""
 
6
Abstract file handle class
 
7
"""
 
8
 
 
9
from twisted.internet import main, error, interfaces
 
10
from twisted.python import log, failure
 
11
from twisted.persisted import styles
 
12
 
 
13
from zope.interface import implements
 
14
import errno
 
15
 
 
16
from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
 
17
from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
 
18
from twisted.internet.iocpreactor import iocpsupport as _iocp
 
19
 
 
20
 
 
21
 
 
22
class FileHandle(log.Logger, styles.Ephemeral, object):
    """
    File handle that can read and write asynchronously

    Incoming data is read into the preallocated buffers held in
    C{self._readBuffers} and handed to L{dataReceived}.  Outgoing data is
    collected in C{self._tempDataBuffer} by L{write} / L{writeSequence} and
    moved into C{self.dataBuffer} by L{doWrite}.

    Subclasses must implement L{readFromHandle}, L{writeToHandle} and
    L{dataReceived}; the versions here raise C{NotImplementedError}.
    """
    implements(interfaces.IProducer, interfaces.IConsumer,
               interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
    # read stuff
    maxReadBuffers = 16 # upper bound on len(_readBuffers) when growing
    readBufferSize = 4096 # size of each buffer in _readBuffers
    reading = False
    dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
    _readNextBuffer = 0 # index of the next full buffer to dispatch
    _readSize = 0 # how much data we have in the read buffer
    _readScheduled = None # delayed call for _resumeReading, if any
    _readScheduledInOS = False # set by doRead, cleared by _cbRead


    def startReading(self):
        """
        Register this handle as active with the reactor and, unless a resume
        is already scheduled or we are already reading, mark the handle as
        reading and schedule L{_resumeReading} via C{callLater(0, ...)}.
        """
        self.reactor.addActiveHandle(self)
        if not self._readScheduled and not self.reading:
            self.reading = True
            self._readScheduled = self.reactor.callLater(0,
                                                         self._resumeReading)


    def stopReading(self):
        """
        Cancel any scheduled L{_resumeReading} call and clear C{reading}.
        """
        if self._readScheduled:
            self._readScheduled.cancel()
            self._readScheduled = None
        self.reading = False


    def _resumeReading(self):
        """
        Scheduled by L{startReading}: dispatch any data left over from an
        earlier read, then start a new read unless one is already marked as
        scheduled in the OS.
        """
        self._readScheduled = None
        if self._dispatchData() and not self._readScheduledInOS:
            self.doRead()


    def _dispatchData(self):
        """
        Dispatch previously read data. Return True if self.reading and we don't
        have any more data

        Returns False as soon as a L{dataReceived} call for a full buffer
        leaves C{self.reading} false; C{_readNextBuffer} is kept so that a
        later call continues from the next undelivered buffer.
        """
        if not self._readSize:
            return self.reading
        size = self._readSize
        full_buffers = size // self.readBufferSize
        while self._readNextBuffer < full_buffers:
            self.dataReceived(self._readBuffers[self._readNextBuffer])
            self._readNextBuffer += 1
            if not self.reading:
                return False
        remainder = size % self.readBufferSize
        if remainder:
            # only the first `remainder` bytes of the last buffer are valid
            self.dataReceived(buffer(self._readBuffers[full_buffers],
                                     0, remainder))
        if self.dynamicReadBuffers:
            total_buffer_size = self.readBufferSize * len(self._readBuffers)
            # we have one buffer too many
            if size < total_buffer_size - self.readBufferSize:
                del self._readBuffers[-1]
            # we filled all buffers, so allocate one more
            elif (size == total_buffer_size and
                  len(self._readBuffers) < self.maxReadBuffers):
                self._readBuffers.append(_iocp.AllocateReadBuffer(
                                            self.readBufferSize))
        # everything has been delivered; reset for the next read
        self._readNextBuffer = 0
        self._readSize = 0
        return self.reading


    def _cbRead(self, rc, bytes, evt):
        """
        Callback attached to the read event created in L{doRead}.  Clears
        C{_readScheduledInOS}, processes the result and keeps reading if
        L{_handleRead} says so.
        """
        self._readScheduledInOS = False
        if self._handleRead(rc, bytes, evt):
            self.doRead()


    def _handleRead(self, rc, bytes, evt):
        """
        Returns False if we should stop reading for now

        @param rc: status code of the read; a false value means no error.
        @param bytes: number of bytes read.
        @param evt: the read event (not used by this method).
        """
        if self.disconnected:
            return False
        # graceful disconnection
        if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
            self.reactor.removeActiveHandle(self)
            self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
            return False
        # XXX: not handling WSAEWOULDBLOCK
        # ("too many outstanding overlapped I/O requests")
        elif rc:
            self.connectionLost(failure.Failure(
                                error.ConnectionLost("read error -- %s (%s)" %
                                    (errno.errorcode.get(rc, 'unknown'), rc))))
            return False
        else:
            # all previously read data must have been dispatched already
            assert self._readSize == 0
            assert self._readNextBuffer == 0
            self._readSize = bytes
            return self._dispatchData()


    def doRead(self):
        """
        Issue reads through L{readFromHandle} in a loop.

        The loop ends when a read reports C{ERROR_IO_PENDING}, when a read
        succeeds after C{maxReads} results have already been handled inline
        (in both cases C{_readScheduledInOS} is set), or when L{_handleRead}
        returns False.
        """
        numReads = 0
        while 1:
            evt = _iocp.Event(self._cbRead, self)

            evt.buff = buff = self._readBuffers
            rc, bytes = self.readFromHandle(buff, evt)

            if (rc == ERROR_IO_PENDING
                or (not rc and numReads >= self.maxReads)):
                # NOTE(review): presumably the result is delivered later
                # through _cbRead -- confirm against iocpsupport
                self._readScheduledInOS = True
                break
            else:
                # result is handled inline here
                # NOTE(review): evt.ignore presumably suppresses the later
                # _cbRead callback for this event -- confirm
                evt.ignore = True
                if not self._handleRead(rc, bytes, evt):
                    break
            numReads += 1


    def readFromHandle(self, bufflist, evt):
        """
        Start a read into C{bufflist}; must be implemented by subclasses.

        L{doRead} expects an C{(rc, bytes)} tuple as the result.
        """
        raise NotImplementedError() # TODO: this should default to ReadFile


    def dataReceived(self, data):
        """
        Called by L{_dispatchData} with each piece of read data; must be
        implemented by subclasses.
        """
        raise NotImplementedError


    def readConnectionLost(self, reason):
        """
        Called by L{_handleRead} on graceful disconnection; by default this
        just calls L{connectionLost}.
        """
        self.connectionLost(reason)


    # write stuff
    dataBuffer = '' # data currently being written
    offset = 0 # how much of dataBuffer has already been written
    writing = False
    _writeScheduled = None # delayed call for _resumeWriting, if any
    _writeDisconnecting = False # set by loseWriteConnection
    _writeDisconnected = False # set once _closeWriteConnection was called
    writeBufferSize = 2**2**2**2 # == 65536; producer is paused above this
    maxWrites = 5


    def loseWriteConnection(self):
        """
        Request a half-close of the write side: flag it and start writing so
        that L{_handleWrite} can perform the close once the buffer is empty.
        """
        self._writeDisconnecting = True
        self.startWriting()


    def _closeWriteConnection(self):
        """
        Hook called by L{_handleWrite} to half-close the connection.
        """
        # override in subclasses
        pass


    def writeConnectionLost(self, reason):
        """
        Delegate to L{connectionLost}.
        """
        # in current code should never be called
        self.connectionLost(reason)


    def startWriting(self):
        """
        Register this handle as active with the reactor, mark it as writing
        and schedule L{_resumeWriting} unless that is already scheduled.
        """
        self.reactor.addActiveHandle(self)
        self.writing = True
        if not self._writeScheduled:
            self._writeScheduled = self.reactor.callLater(0,
                                                          self._resumeWriting)


    def stopWriting(self):
        """
        Cancel any scheduled L{_resumeWriting} call and clear C{writing}.
        """
        if self._writeScheduled:
            self._writeScheduled.cancel()
            self._writeScheduled = None
        self.writing = False


    def _resumeWriting(self):
        """
        Scheduled by L{startWriting}: clear the schedule marker and write.
        """
        self._writeScheduled = None
        self.doWrite()


    def _cbWrite(self, rc, bytes, evt):
        """
        Callback attached to the write event created in L{doWrite}; keeps
        writing if L{_handleWrite} says so.
        """
        if self._handleWrite(rc, bytes, evt):
            self.doWrite()


    def _handleWrite(self, rc, bytes, evt):
        """
        Returns false if we should stop writing for now

        @param rc: status code of the write; a false value means no error.
        @param bytes: number of bytes written, added to C{self.offset}.
        @param evt: the write event (not used by this method).
        """
        if self.disconnected or self._writeDisconnected:
            return False
        # XXX: not handling WSAEWOULDBLOCK
        # ("too many outstanding overlapped I/O requests")
        if rc:
            self.connectionLost(failure.Failure(
                                error.ConnectionLost("write error -- %s (%s)" %
                                    (errno.errorcode.get(rc, 'unknown'), rc))))
            return False
        else:
            self.offset += bytes
            # If there is nothing left to send,
            if self.offset == len(self.dataBuffer) and not self._tempDataLen:
                self.dataBuffer = ""
                self.offset = 0
                # stop writing
                self.stopWriting()
                # If I've got a producer who is supposed to supply me with data
                if self.producer is not None and ((not self.streamingProducer)
                                                  or self.producerPaused):
                    # tell them to supply some more.
                    # NOTE(review): the flag is set to True although the
                    # producer is being resumed -- looks inverted; confirm
                    # the intent before changing it.
                    self.producerPaused = True
                    self.producer.resumeProducing()
                elif self.disconnecting:
                    # But if I was previously asked to let the connection die,
                    # do so.
                    self.connectionLost(failure.Failure(main.CONNECTION_DONE))
                elif self._writeDisconnecting:
                    # I was previously asked to half-close the connection.
                    self._closeWriteConnection()
                    self._writeDisconnected = True
                return False
            else:
                return True


    def doWrite(self):
        """
        Issue writes through L{writeToHandle} in a loop.

        Before each write, pending data from C{_tempDataBuffer} is merged
        into C{dataBuffer} if fewer than C{SEND_LIMIT} bytes remain.  The
        loop ends when a write reports C{ERROR_IO_PENDING}, when a write
        succeeds after C{maxWrites} results have already been handled
        inline, or when L{_handleWrite} returns False.
        """
        numWrites = 0
        while 1:
            if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
                # If there is currently less than SEND_LIMIT bytes left to send
                # in the string, extend it with the array data.
                self.dataBuffer = (buffer(self.dataBuffer, self.offset) +
                                   "".join(self._tempDataBuffer))
                self.offset = 0
                self._tempDataBuffer = []
                self._tempDataLen = 0

            evt = _iocp.Event(self._cbWrite, self)

            # Send as much data as you can.
            if self.offset:
                evt.buff = buff = buffer(self.dataBuffer, self.offset)
            else:
                evt.buff = buff = self.dataBuffer
            rc, bytes = self.writeToHandle(buff, evt)
            if (rc == ERROR_IO_PENDING
                or (not rc and numWrites >= self.maxWrites)):
                break
            else:
                # result is handled inline here
                # NOTE(review): evt.ignore presumably suppresses the later
                # _cbWrite callback for this event -- confirm
                evt.ignore = True
                if not self._handleWrite(rc, bytes, evt):
                    break
            numWrites += 1


    def writeToHandle(self, buff, evt):
        """
        Start a write of C{buff}; must be implemented by subclasses.

        L{doWrite} expects an C{(rc, bytes)} tuple as the result.
        """
        raise NotImplementedError() # TODO: this should default to WriteFile


    def write(self, data):
        """Reliably write some data.

        The data is buffered until this file descriptor is ready for writing.

        Data is silently dropped if we are not connected or the write side
        has already been closed.  A registered producer is paused when the
        buffered amount exceeds C{writeBufferSize}.

        @raise TypeError: if C{data} is a C{unicode} object.
        """
        if isinstance(data, unicode): # no, really, I mean it
            raise TypeError("Data must not be unicode")
        if not self.connected or self._writeDisconnected:
            return
        if data:
            self._tempDataBuffer.append(data)
            self._tempDataLen += len(data)
            if self.producer is not None:
                if (len(self.dataBuffer) + self._tempDataLen
                    > self.writeBufferSize):
                    self.producerPaused = True
                    self.producer.pauseProducing()
            self.startWriting()


    def writeSequence(self, iovec):
        """
        Reliably write a sequence of data.

        Behaves like L{write} for every element of C{iovec}: the elements
        are queued in C{_tempDataBuffer}, a registered producer is paused
        when the buffered amount exceeds C{writeBufferSize}, and writing is
        started.  Nothing happens if we are not connected, C{iovec} is empty
        or the write side has already been closed.

        @raise TypeError: if any element of C{iovec} is a C{unicode} object
            (checked before anything is queued, consistent with L{write}).
        """
        for i in iovec:
            if isinstance(i, unicode): # no, really, I mean it
                raise TypeError("Data must not be unicode")
        if not self.connected or not iovec or self._writeDisconnected:
            return
        self._tempDataBuffer.extend(iovec)
        for i in iovec:
            self._tempDataLen += len(i)
        if self.producer is not None:
            if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
                self.producerPaused = True
                self.producer.pauseProducing()
        self.startWriting()


    # general stuff
    connected = False
    disconnected = False
    disconnecting = False
    logstr = "Uninitialized"

    # doWrite merges pending data once fewer bytes than this remain unsent
    SEND_LIMIT = 128*1024

    # number of read results doRead handles inline before deferring
    maxReads = 5


    def __init__(self, reactor = None):
        """
        @param reactor: the reactor to use; if not given, the global
            C{twisted.internet.reactor} is imported and used.
        """
        if not reactor:
            from twisted.internet import reactor
        self.reactor = reactor
        self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
        self._tempDataLen = 0
        self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)]


    def connectionLost(self, reason):
        """
        The connection was lost.

        This is called when the connection on a selectable object has been
        lost.  It will be called whether the connection was closed explicitly,
        an exception occurred in an event handler, or the other end of the
        connection closed it first.

        Clean up state here, but make sure to call back up to FileDescriptor.
        """

        self.disconnected = True
        self.connected = False
        if self.producer is not None:
            self.producer.stopProducing()
            self.producer = None
        self.stopReading()
        self.stopWriting()
        self.reactor.removeActiveHandle(self)


    def getFileHandle(self):
        """
        Return -1.

        NOTE(review): presumably a placeholder that subclasses override with
        the real handle -- confirm against the subclasses.
        """
        return -1


    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
        """
        Close the connection at the next available opportunity.

        Call this to cause this FileDescriptor to lose its connection.  It will
        first write any data that it has buffered.

        If there is data buffered yet to be written, this method will cause the
        transport to lose its connection as soon as it's done flushing its
        write buffer.  If you have a producer registered, the connection won't
        be closed until the producer is finished. Therefore, make sure you
        unregister your producer when it's finished, or the connection will
        never close.
        """

        if self.connected and not self.disconnecting:
            if self._writeDisconnected:
                # doWrite won't trigger the connection close anymore
                self.stopReading()
                # this used to be a bare attribute reference (missing call
                # parentheses) and therefore did nothing
                self.stopWriting()
                self.connectionLost(_connDone)
            else:
                self.stopReading()
                self.startWriting()
                # bool, consistent with the class-level default
                self.disconnecting = True


    # Producer/consumer implementation

    producerPaused = False
    streamingProducer = False

    # first, the consumer stuff.  This requires no additional work, as
    # any object you can write to can be a consumer, really.

    producer = None


    def registerProducer(self, producer, streaming):
        """
        Register to receive data from a producer.

        This sets this selectable to be a consumer for a producer.  When this
        selectable runs out of data on a write() call, it will ask the producer
        to resumeProducing(). A producer should implement the IProducer
        interface.

        FileDescriptor provides some infrastructure for producer methods.

        If we are already disconnected the producer is told to stop
        producing instead of being registered.  A non-streaming producer is
        asked to resume producing right away.

        @raise RuntimeError: if another producer is still registered.
        """
        if self.producer is not None:
            raise RuntimeError(
                "Cannot register producer %s, because producer "
                "%s was never unregistered." % (producer, self.producer))
        if self.disconnected:
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                producer.resumeProducing()


    def unregisterProducer(self):
        """
        Stop consuming data from a producer, without disconnecting.
        """
        self.producer = None


    def stopConsuming(self):
        """
        Stop consuming data.

        This is called when a producer has lost its connection, to tell the
        consumer to go lose its connection (and break potential circular
        references).
        """
        self.unregisterProducer()
        self.loseConnection()


    # producer interface implementation

    def resumeProducing(self):
        """
        Start reading again; asserts that we are connected and not in the
        process of disconnecting.
        """
        assert self.connected and not self.disconnecting
        self.startReading()


    def pauseProducing(self):
        """
        Stop reading.
        """
        self.stopReading()


    def stopProducing(self):
        """
        Lose the connection.
        """
        self.loseConnection()
 
453
 
 
454
 
 
455
__all__ = ['FileHandle']
 
456