1
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2
# See LICENSE for details.
6
A win32event based implementation of the Twisted main loop.
8
This requires win32all or ActivePython to be installed.
10
Maintainer: Itamar Shtull-Trauring
14
1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
15
2. Process running has some problems (see Process docstring).
19
1. Event loop handling of writes is *very* problematic (this is causing failed tests).
20
Switch to doing it the correct way, whatever that means (see below).
21
2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
22
3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
25
ALTERNATIVE SOLUTIONS:
26
- IIRC, sockets can only be registered once. So we switch to a structure
27
like the poll() reactor, thus allowing us to deal with write events in
28
a decent fashion. This should allow us to pass tests, but we're still
33
- Instead of doing a reactor, we make this an addon to the select reactor.
34
The WFMO event loop runs in a separate thread. This means no need to maintain
35
separate code for networking, 64 event limit doesn't apply to sockets,
36
we can run processes and other win32 stuff in default event loop. The
37
only problem is that we're stuck with the icky socket based waker.
38
Another benefit is that this could be extended to support >64 events
39
in a simpler manner than the previous solution.
41
The 2nd solution is probably what will get implemented.
48
from zope.interface import implements
51
from win32file import WSAEventSelect, FD_READ, FD_CLOSE, FD_ACCEPT, FD_CONNECT
52
from win32event import CreateEvent, MsgWaitForMultipleObjects
53
from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT, QS_ALLINPUT, QS_ALLEVENTS
58
from twisted.internet import posixbase
59
from twisted.python import log, threadable, failure
60
from twisted.internet.interfaces import IReactorFDSet, IReactorProcess
62
from twisted.internet._dumbwin32proc import Process
65
class Win32Reactor(posixbase.PosixReactorBase):
67
Reactor that uses Win32 event APIs.
69
@ivar _reads: A dictionary mapping L{FileDescriptor} instances to a
70
win32 event object used to check for read events for that descriptor.
72
@ivar _writes: A dictionary mapping L{FileDescriptor} instances to a
73
arbitrary value. Keys in this dictionary will be given a chance to
76
@ivar _events: A dictionary mapping win32 event object to tuples of
77
L{FileDescriptor} instances and event masks.
79
implements(IReactorFDSet, IReactorProcess)
81
dummyEvent = CreateEvent(None, 0, 0, None)
87
posixbase.PosixReactorBase.__init__(self)
90
def _makeSocketEvent(self, fd, action, why):
92
Make a win32 event object for a socket.
94
event = CreateEvent(None, 0, 0, None)
95
WSAEventSelect(fd, event, why)
96
self._events[event] = (fd, action)
100
def addEvent(self, event, fd, action):
102
Add a new win32 event to the event loop.
104
self._events[event] = (fd, action)
107
def removeEvent(self, event):
111
del self._events[event]
114
def addReader(self, reader):
116
Add a socket FileDescriptor for notification of data available to read.
118
if reader not in self._reads:
119
self._reads[reader] = self._makeSocketEvent(
120
reader, 'doRead', FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE)
122
def addWriter(self, writer):
124
Add a socket FileDescriptor for notification of data available to write.
126
if writer not in self._writes:
127
self._writes[writer] = 1
129
def removeReader(self, reader):
130
"""Remove a Selectable for notification of data available to read.
132
if reader in self._reads:
133
del self._events[self._reads[reader]]
134
del self._reads[reader]
136
def removeWriter(self, writer):
137
"""Remove a Selectable for notification of data available to write.
139
if writer in self._writes:
140
del self._writes[writer]
144
Remove all selectables, and return a list of them.
146
return self._removeAll(self._reads, self._writes)
149
def getReaders(self):
150
return self._reads.keys()
153
def getWriters(self):
154
return self._writes.keys()
157
def doWaitForMultipleEvents(self, timeout):
158
log.msg(channel='system', event='iteration', reactor=self)
163
timeout = int(timeout * 1000)
165
if not (self._events or self._writes):
166
# sleep so we don't suck up CPU time
167
time.sleep(timeout / 1000.0)
171
for fd in self._writes.keys():
172
if log.callWithLogger(fd, self._runWrite, fd):
178
handles = self._events.keys() or [self.dummyEvent]
179
val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT | QS_ALLEVENTS)
180
if val == WAIT_TIMEOUT:
182
elif val == WAIT_OBJECT_0 + len(handles):
183
exit = win32gui.PumpWaitingMessages()
185
self.callLater(0, self.stop)
187
elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
188
fd, action = self._events[handles[val - WAIT_OBJECT_0]]
189
log.callWithLogger(fd, self._runAction, action, fd)
191
def _runWrite(self, fd):
194
closed = fd.doWrite()
196
closed = sys.exc_info()[1]
200
self.removeReader(fd)
201
self.removeWriter(fd)
203
fd.connectionLost(failure.Failure(closed))
209
def _runAction(self, action, fd):
211
closed = getattr(fd, action)()
213
closed = sys.exc_info()[1]
217
self._disconnectSelectable(fd, closed, action == 'doRead')
219
doIteration = doWaitForMultipleEvents
221
def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, uid=None, gid=None, usePTY=0, childFDs=None):
222
"""Spawn a process."""
224
raise ValueError("Setting UID is unsupported on this platform.")
226
raise ValueError("Setting GID is unsupported on this platform.")
228
raise ValueError("PTYs are unsupported on this platform.")
229
if childFDs is not None:
231
"Custom child file descriptor mappings are unsupported on "
233
args, env = self._checkProcessArgs(args, env)
234
return Process(self, processProtocol, executable, args, env, path)
241
main.installReactor(r)
244
__all__ = ["Win32Reactor", "install"]