1
# -*- test-case-name: twisted.internet.test.test_iocp -*-
2
# Copyright (c) 2008-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Reactor that uses IO completion ports
9
import warnings, socket, sys
11
from zope.interface import implements
13
from twisted.internet import base, interfaces, main, error
14
from twisted.python import log, failure
15
from twisted.internet._dumbwin32proc import Process
17
from twisted.internet.iocpreactor import iocpsupport as _iocp
18
from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
19
from twisted.internet.iocpreactor import tcp, udp
22
from twisted.protocols.tls import TLSMemoryBIOFactory
24
# Either pyOpenSSL isn't installed, or it is too old for this code to work.
25
# The reactor won't provide IReactorSSL.
26
TLSMemoryBIOFactory = None
29
"pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. "
30
"It is missing, so the reactor will not support SSL APIs.")
32
_extraInterfaces = (interfaces.IReactorSSL,)
34
from twisted.python.compat import set
36
MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation
38
EVENTS_PER_LOOP = 1000 # XXX: what's a good value here?
40
# keys to associate with normal and waker events
41
KEY_NORMAL, KEY_WAKEUP = range(2)
43
_NO_GETHANDLE = error.ConnectionFdescWentAway(
44
'Handler has no getFileHandle method')
45
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
49
class IOCPReactor(base._SignalReactorMixin, base.ReactorBase):
50
implements(interfaces.IReactorTCP, interfaces.IReactorUDP,
51
interfaces.IReactorMulticast, interfaces.IReactorProcess,
57
base.ReactorBase.__init__(self)
58
self.port = _iocp.CompletionPort()
62
def addActiveHandle(self, handle):
63
self.handles.add(handle)
66
def removeActiveHandle(self, handle):
67
self.handles.discard(handle)
70
def doIteration(self, timeout):
71
# This function sits and waits for an IO completion event.
73
# There are two requirements: process IO events as soon as they arrive
74
# and process ctrl-break from the user in a reasonable amount of time.
76
# There are three kinds of waiting.
77
# 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
79
# 2) Msg* family of wait functions that can stop waiting when
80
# ctrl-break is detected (then, I think, Python converts it into a
82
# 3) *Ex family of wait functions that put the thread into an
83
# "alertable" wait state which is supposedly triggered by IO completion
85
# 2) and 3) can be combined. Trouble is, my IO completion is not
86
# causing 3) to trigger, possibly because I do not use an IO completion
87
# callback. Windows is weird.
88
# There are two ways to handle this. I could use MsgWaitForSingleObject
89
# here and GetQueuedCompletionStatus in a thread. Or I could poll with
90
# a reasonable interval. Guess what! Threads are hard.
96
timeout = min(MAX_TIMEOUT, int(1000*timeout))
97
rc, bytes, key, evt = self.port.getEvent(timeout)
98
while processed_events < EVENTS_PER_LOOP:
99
if rc == WAIT_TIMEOUT:
101
if key != KEY_WAKEUP:
102
assert key == KEY_NORMAL
104
log.callWithLogger(evt.owner, self._callEventCallback,
106
processed_events += 1
107
rc, bytes, key, evt = self.port.getEvent(0)
110
def _callEventCallback(self, rc, bytes, evt):
114
evt.callback(rc, bytes, evt)
115
handfn = getattr(owner, 'getFileHandle', None)
121
return # ignore handles that were closed
123
why = sys.exc_info()[1]
126
owner.loseConnection(failure.Failure(why))
129
def installWaker(self):
134
self.port.postEvent(0, KEY_WAKEUP, None)
137
def registerHandle(self, handle):
138
self.port.addHandle(handle, KEY_NORMAL)
141
def createSocket(self, af, stype):
142
skt = socket.socket(af, stype)
143
self.registerHandle(skt.fileno())
147
def listenTCP(self, port, factory, backlog=50, interface=''):
149
@see: twisted.internet.interfaces.IReactorTCP.listenTCP
151
p = tcp.Port(port, factory, backlog, interface, self)
156
def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
158
@see: twisted.internet.interfaces.IReactorTCP.connectTCP
160
c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
165
if TLSMemoryBIOFactory is not None:
166
def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
168
@see: twisted.internet.interfaces.IReactorSSL.listenSSL
170
return self.listenTCP(
172
TLSMemoryBIOFactory(contextFactory, False, factory),
176
def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
178
@see: twisted.internet.interfaces.IReactorSSL.connectSSL
180
return self.connectTCP(
182
TLSMemoryBIOFactory(contextFactory, True, factory),
183
timeout, bindAddress)
185
def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
187
Non-implementation of L{IReactorSSL.listenSSL}. Some dependency
188
is not satisfied. This implementation always raises
189
L{NotImplementedError}.
191
raise NotImplementedError(
192
"pyOpenSSL 0.10 or newer is required for SSL support in "
193
"iocpreactor. It is missing, so the reactor does not support "
197
def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
199
Non-implementation of L{IReactorSSL.connectSSL}. Some dependency
200
is not satisfied. This implementation always raises
201
L{NotImplementedError}.
203
raise NotImplementedError(
204
"pyOpenSSL 0.10 or newer is required for SSL support in "
205
"iocpreactor. It is missing, so the reactor does not support "
209
def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
211
Connects a given L{DatagramProtocol} to the given numeric UDP port.
213
@returns: object conforming to L{IListeningPort}.
215
p = udp.Port(port, protocol, interface, maxPacketSize, self)
220
def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
221
listenMultiple=False):
223
Connects a given DatagramProtocol to the given numeric UDP port.
227
@returns: object conforming to IListeningPort.
229
p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self,
235
def spawnProcess(self, processProtocol, executable, args=(), env={},
236
path=None, uid=None, gid=None, usePTY=0, childFDs=None):
241
raise ValueError("Setting UID is unsupported on this platform.")
243
raise ValueError("Setting GID is unsupported on this platform.")
245
raise ValueError("PTYs are unsupported on this platform.")
246
if childFDs is not None:
248
"Custom child file descriptor mappings are unsupported on "
250
args, env = self._checkProcessArgs(args, env)
251
return Process(self, processProtocol, executable, args, env, path)
255
res = list(self.handles)
263
main.installReactor(r)
266
__all__ = ['IOCPReactor', 'install']