~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/iocpreactor/reactor.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.internet.test.test_iocp -*-
 
2
# Copyright (c) 2008-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Reactor that uses IO completion ports
 
7
"""
 
8
 
 
9
import warnings, socket, sys
 
10
 
 
11
from zope.interface import implements
 
12
 
 
13
from twisted.internet import base, interfaces, main, error
 
14
from twisted.python import log, failure
 
15
from twisted.internet._dumbwin32proc import Process
 
16
 
 
17
from twisted.internet.iocpreactor import iocpsupport as _iocp
 
18
from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
 
19
from twisted.internet.iocpreactor import tcp, udp
 
20
 
 
21
try:
 
22
    from twisted.protocols.tls import TLSMemoryBIOFactory
 
23
except ImportError:
 
24
    # Either pyOpenSSL isn't installed, or it is too old for this code to work.
 
25
    # The reactor won't provide IReactorSSL.
 
26
    TLSMemoryBIOFactory = None
 
27
    _extraInterfaces = ()
 
28
    warnings.warn(
 
29
        "pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. "
 
30
        "It is missing, so the reactor will not support SSL APIs.")
 
31
else:
 
32
    _extraInterfaces = (interfaces.IReactorSSL,)
 
33
 
 
34
from twisted.python.compat import set
 
35
 
 
36
MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation
 
37
 
 
38
EVENTS_PER_LOOP = 1000 # XXX: what's a good value here?
 
39
 
 
40
# keys to associate with normal and waker events
 
41
KEY_NORMAL, KEY_WAKEUP = range(2)
 
42
 
 
43
_NO_GETHANDLE = error.ConnectionFdescWentAway(
 
44
                    'Handler has no getFileHandle method')
 
45
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
 
46
 
 
47
 
 
48
 
 
49
class IOCPReactor(base._SignalReactorMixin, base.ReactorBase):
 
50
    implements(interfaces.IReactorTCP, interfaces.IReactorUDP,
 
51
               interfaces.IReactorMulticast, interfaces.IReactorProcess,
 
52
               *_extraInterfaces)
 
53
 
 
54
    port = None
 
55
 
 
56
    def __init__(self):
 
57
        base.ReactorBase.__init__(self)
 
58
        self.port = _iocp.CompletionPort()
 
59
        self.handles = set()
 
60
 
 
61
 
 
62
    def addActiveHandle(self, handle):
 
63
        self.handles.add(handle)
 
64
 
 
65
 
 
66
    def removeActiveHandle(self, handle):
 
67
        self.handles.discard(handle)
 
68
 
 
69
 
 
70
    def doIteration(self, timeout):
 
71
        # This function sits and waits for an IO completion event.
 
72
        #
 
73
        # There are two requirements: process IO events as soon as they arrive
 
74
        # and process ctrl-break from the user in a reasonable amount of time.
 
75
        #
 
76
        # There are three kinds of waiting.
 
77
        # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
 
78
        # events only.
 
79
        # 2) Msg* family of wait functions that can stop waiting when
 
80
        # ctrl-break is detected (then, I think, Python converts it into a
 
81
        # KeyboardInterrupt)
 
82
        # 3) *Ex family of wait functions that put the thread into an
 
83
        # "alertable" wait state which is supposedly triggered by IO completion
 
84
        #
 
85
        # 2) and 3) can be combined. Trouble is, my IO completion is not
 
86
        # causing 3) to trigger, possibly because I do not use an IO completion
 
87
        # callback. Windows is weird.
 
88
        # There are two ways to handle this. I could use MsgWaitForSingleObject
 
89
        # here and GetQueuedCompletionStatus in a thread. Or I could poll with
 
90
        # a reasonable interval. Guess what! Threads are hard.
 
91
 
 
92
        processed_events = 0
 
93
        if timeout is None:
 
94
            timeout = MAX_TIMEOUT
 
95
        else:
 
96
            timeout = min(MAX_TIMEOUT, int(1000*timeout))
 
97
        rc, bytes, key, evt = self.port.getEvent(timeout)
 
98
        while processed_events < EVENTS_PER_LOOP:
 
99
            if rc == WAIT_TIMEOUT:
 
100
                break
 
101
            if key != KEY_WAKEUP:
 
102
                assert key == KEY_NORMAL
 
103
                if not evt.ignore:
 
104
                    log.callWithLogger(evt.owner, self._callEventCallback,
 
105
                                       rc, bytes, evt)
 
106
                    processed_events += 1
 
107
            rc, bytes, key, evt = self.port.getEvent(0)
 
108
 
 
109
 
 
110
    def _callEventCallback(self, rc, bytes, evt):
 
111
        owner = evt.owner
 
112
        why = None
 
113
        try:
 
114
            evt.callback(rc, bytes, evt)
 
115
            handfn = getattr(owner, 'getFileHandle', None)
 
116
            if not handfn:
 
117
                why = _NO_GETHANDLE
 
118
            elif handfn() == -1:
 
119
                why = _NO_FILEDESC
 
120
            if why:
 
121
                return # ignore handles that were closed
 
122
        except:
 
123
            why = sys.exc_info()[1]
 
124
            log.err()
 
125
        if why:
 
126
            owner.loseConnection(failure.Failure(why))
 
127
 
 
128
 
 
129
    def installWaker(self):
 
130
        pass
 
131
 
 
132
 
 
133
    def wakeUp(self):
 
134
        self.port.postEvent(0, KEY_WAKEUP, None)
 
135
 
 
136
 
 
137
    def registerHandle(self, handle):
 
138
        self.port.addHandle(handle, KEY_NORMAL)
 
139
 
 
140
 
 
141
    def createSocket(self, af, stype):
 
142
        skt = socket.socket(af, stype)
 
143
        self.registerHandle(skt.fileno())
 
144
        return skt
 
145
 
 
146
 
 
147
    def listenTCP(self, port, factory, backlog=50, interface=''):
 
148
        """
 
149
        @see: twisted.internet.interfaces.IReactorTCP.listenTCP
 
150
        """
 
151
        p = tcp.Port(port, factory, backlog, interface, self)
 
152
        p.startListening()
 
153
        return p
 
154
 
 
155
 
 
156
    def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
 
157
        """
 
158
        @see: twisted.internet.interfaces.IReactorTCP.connectTCP
 
159
        """
 
160
        c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
 
161
        c.connect()
 
162
        return c
 
163
 
 
164
 
 
165
    if TLSMemoryBIOFactory is not None:
 
166
        def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
 
167
            """
 
168
            @see: twisted.internet.interfaces.IReactorSSL.listenSSL
 
169
            """
 
170
            return self.listenTCP(
 
171
                port,
 
172
                TLSMemoryBIOFactory(contextFactory, False, factory),
 
173
                backlog, interface)
 
174
 
 
175
 
 
176
        def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
 
177
            """
 
178
            @see: twisted.internet.interfaces.IReactorSSL.connectSSL
 
179
            """
 
180
            return self.connectTCP(
 
181
                host, port,
 
182
                TLSMemoryBIOFactory(contextFactory, True, factory),
 
183
                timeout, bindAddress)
 
184
    else:
 
185
        def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
 
186
            """
 
187
            Non-implementation of L{IReactorSSL.listenSSL}.  Some dependency
 
188
            is not satisfied.  This implementation always raises
 
189
            L{NotImplementedError}.
 
190
            """
 
191
            raise NotImplementedError(
 
192
                "pyOpenSSL 0.10 or newer is required for SSL support in "
 
193
                "iocpreactor. It is missing, so the reactor does not support "
 
194
                "SSL APIs.")
 
195
 
 
196
 
 
197
        def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
 
198
            """
 
199
            Non-implementation of L{IReactorSSL.connectSSL}.  Some dependency
 
200
            is not satisfied.  This implementation always raises
 
201
            L{NotImplementedError}.
 
202
            """
 
203
            raise NotImplementedError(
 
204
                "pyOpenSSL 0.10 or newer is required for SSL support in "
 
205
                "iocpreactor. It is missing, so the reactor does not support "
 
206
                "SSL APIs.")
 
207
 
 
208
 
 
209
    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
 
210
        """
 
211
        Connects a given L{DatagramProtocol} to the given numeric UDP port.
 
212
 
 
213
        @returns: object conforming to L{IListeningPort}.
 
214
        """
 
215
        p = udp.Port(port, protocol, interface, maxPacketSize, self)
 
216
        p.startListening()
 
217
        return p
 
218
 
 
219
 
 
220
    def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
 
221
                        listenMultiple=False):
 
222
        """
 
223
        Connects a given DatagramProtocol to the given numeric UDP port.
 
224
 
 
225
        EXPERIMENTAL.
 
226
 
 
227
        @returns: object conforming to IListeningPort.
 
228
        """
 
229
        p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self,
 
230
                              listenMultiple)
 
231
        p.startListening()
 
232
        return p
 
233
 
 
234
 
 
235
    def spawnProcess(self, processProtocol, executable, args=(), env={},
 
236
                     path=None, uid=None, gid=None, usePTY=0, childFDs=None):
 
237
        """
 
238
        Spawn a process.
 
239
        """
 
240
        if uid is not None:
 
241
            raise ValueError("Setting UID is unsupported on this platform.")
 
242
        if gid is not None:
 
243
            raise ValueError("Setting GID is unsupported on this platform.")
 
244
        if usePTY:
 
245
            raise ValueError("PTYs are unsupported on this platform.")
 
246
        if childFDs is not None:
 
247
            raise ValueError(
 
248
                "Custom child file descriptor mappings are unsupported on "
 
249
                "this platform.")
 
250
        args, env = self._checkProcessArgs(args, env)
 
251
        return Process(self, processProtocol, executable, args, env, path)
 
252
 
 
253
 
 
254
    def removeAll(self):
 
255
        res = list(self.handles)
 
256
        self.handles.clear()
 
257
        return res
 
258
 
 
259
 
 
260
 
 
261
def install():
 
262
    r = IOCPReactor()
 
263
    main.installReactor(r)
 
264
 
 
265
 
 
266
__all__ = ['IOCPReactor', 'install']
 
267