# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.


"""A win32event based implementation of the Twisted main loop.

This requires win32all or ActivePython to be installed.

API Stability: semi-stable

Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}


LIMITATIONS:
 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
 2. Process running has some problems (see Process docstring).


TODO:
 1. Event loop handling of writes is *very* problematic (this is causing failed tests).
    Switch to doing it the correct way, whatever that means (see below).
 2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
 3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.


ALTERNATIVE SOLUTIONS:
 - IIRC, sockets can only be registered once. So we switch to a structure
   like the poll() reactor, thus allowing us to deal with write events in
   a decent fashion. This should allow us to pass tests, but we're still
   limited to 64 events.

Or:

 - Instead of doing a reactor, we make this an addon to the select reactor.
   The WFMO event loop runs in a separate thread. This means no need to maintain
   separate code for networking, 64 event limit doesn't apply to sockets,
   we can run processes and other win32 stuff in default event loop. The
   only problem is that we're stuck with the icky socket based waker.
   Another benefit is that this could be extended to support >64 events
   in a simpler manner than the previous solution.

The 2nd solution is probably what will get implemented.
"""

# System imports
import sys
import time

# Win32 imports
import win32gui
from win32event import CreateEvent, MsgWaitForMultipleObjects, \
                       WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE, QS_ALLINPUT, QS_ALLEVENTS
from win32file import WSAEventSelect, FD_READ, FD_WRITE, FD_CLOSE, \
                      FD_ACCEPT, FD_CONNECT
from zope.interface import implements

# Twisted imports
from twisted.internet import abstract, posixbase, main, error
from twisted.internet._dumbwin32proc import Process
from twisted.internet.interfaces import IReactorFDSet, IReactorProcess, IProcessTransport
from twisted.python import log, threadable, failure

# Module-level registries. The methods below bind them as default arguments,
# so they have to exist before the class body is executed.
reads = {}    # reader -> win32 event handle created for it
writes = {}   # writer -> placeholder value; only the keys are consulted
events = {}   # win32 event handle -> (fd, name of the method to call on fd)


class Win32Reactor(posixbase.PosixReactorBase):
    """Reactor that uses Win32 event APIs."""

    implements(IReactorFDSet, IReactorProcess)

    # Waited on when no real event is registered, so the handle list passed
    # to MsgWaitForMultipleObjects is never empty.
    dummyEvent = CreateEvent(None, 0, 0, None)

    def _makeSocketEvent(self, fd, action, why, events=events):
        """Make a win32 event object for a socket.

        @param fd: the socket-like object to watch.
        @param action: name of the method to call on C{fd} when the event
            is signalled.
        @param why: bitmask of FD_* network events to select on.
        @return: the newly created event, already registered in C{events}.
        """
        event = CreateEvent(None, 0, 0, None)
        WSAEventSelect(fd, event, why)
        events[event] = (fd, action)
        # addReader() stores this value and removeReader() uses it as the
        # key into C{events}, so it must be handed back to the caller.
        return event

    def addEvent(self, event, fd, action, events=events):
        """Add a new win32 event to the event loop.

        @param event: an existing win32 event handle.
        @param fd: object whose C{action} method is called when C{event}
            is signalled.
        @param action: name of the method to call on C{fd}.
        """
        events[event] = (fd, action)

    def removeEvent(self, event):
        """Remove an event.

        @raise KeyError: if C{event} was never registered.
        """
        del events[event]

    def addReader(self, reader, reads=reads):
        """Add a socket FileDescriptor for notification of data available to read.

        Adding a reader that is already registered is a no-op.
        """
        if reader not in reads:
            reads[reader] = self._makeSocketEvent(
                reader, 'doRead', FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE)

    def addWriter(self, writer, writes=writes):
        """Add a socket FileDescriptor for notification of data available to write.

        Writers are not backed by a win32 event; they are polled at the start
        of every iteration (see L{doWaitForMultipleEvents}).
        """
        if writer not in writes:
            # Only membership matters; the stored value is never read.
            writes[writer] = 1

    def removeReader(self, reader):
        """Remove a Selectable for notification of data available to read.

        Removing a reader that is not registered is a no-op.
        """
        if reader in reads:
            del events[reads[reader]]
            # Forget the reader as well, otherwise a later addReader() would
            # be skipped while its event is already gone from C{events}.
            del reads[reader]

    def removeWriter(self, writer, writes=writes):
        """Remove a Selectable for notification of data available to write.

        Removing a writer that is not registered is a no-op.
        """
        if writer in writes:
            del writes[writer]

    def removeAll(self):
        """Remove all selectables, and return a list of them."""
        return self._removeAll(reads, writes)

    def doWaitForMultipleEvents(self, timeout,
                                reads=reads,
                                writes=writes):
        """Run a single iteration of the event loop.

        Every registered writer is given a chance to write, then the loop
        waits for one registered win32 event (or a window message) and
        dispatches it.

        @param timeout: maximum time to wait, in seconds, or C{None}.
        """
        log.msg(channel='system', event='iteration', reactor=self)
        if timeout is None:
            # NOTE(review): finite poll interval in milliseconds instead of
            # INFINITE, so pending writers get retried -- confirm the value.
            timeout = 100
        else:
            # MsgWaitForMultipleObjects takes milliseconds.
            timeout = int(timeout * 1000)

        if not (events or writes):
            # sleep so we don't suck up CPU time
            time.sleep(timeout / 1000.0)
            return

        # Iterate over a snapshot: _runWrite() may unregister the writer.
        canDoMoreWrites = 0
        for fd in list(writes.keys()):
            if log.callWithLogger(fd, self._runWrite, fd):
                canDoMoreWrites = 1

        if canDoMoreWrites:
            # A writer may still have work to do, so only poll the events.
            timeout = 0

        handles = list(events.keys()) or [self.dummyEvent]
        val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT | QS_ALLEVENTS)
        if val == WAIT_TIMEOUT:
            return
        elif val == WAIT_OBJECT_0 + len(handles):
            # One past the last handle index means window messages are queued.
            quitRequested = win32gui.PumpWaitingMessages()
            if quitRequested:
                self.callLater(0, self.stop)
                return
        elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
            fd, action = events[handles[val - WAIT_OBJECT_0]]
            log.callWithLogger(fd, self._runAction, action, fd)

    def _runWrite(self, fd):
        """Let C{fd} write, disconnecting it if the write fails.

        @return: 1 if C{fd.doWrite()} returned C{None}, which the caller
            treats as "may be able to write more"; otherwise C{None}.
            NOTE(review): the C{None} convention is reconstructed -- confirm.
        """
        closed = 0
        try:
            closed = fd.doWrite()
        except:
            # Deliberately broad: any failure while writing closes the fd.
            closed = sys.exc_info()[1]
            log.deferr()

        if closed:
            self.removeReader(fd)
            self.removeWriter(fd)
            try:
                fd.connectionLost(failure.Failure(closed))
            except:
                # A faulty connectionLost() must not take the loop down.
                log.deferr()
        elif closed is None:
            return 1

    def _runAction(self, action, fd):
        """Call the method named C{action} on C{fd}, disconnecting on failure.

        @param action: name of the method to invoke on C{fd}.
        @param fd: the object registered for the signalled event.
        """
        try:
            closed = getattr(fd, action)()
        except:
            # Deliberately broad: any failure in the handler closes the fd.
            closed = sys.exc_info()[1]
            log.deferr()

        if closed:
            self._disconnectSelectable(fd, closed, action == 'doRead')

    doIteration = doWaitForMultipleEvents

    def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, uid=None, gid=None, usePTY=0, childFDs=None):
        """Spawn a process.

        @raise ValueError: if C{uid}, C{gid}, C{usePTY} or C{childFDs} is
            given; none of them is supported on this platform.
        @return: the L{Process} created for the child.
        """
        if uid is not None:
            raise ValueError("Setting UID is unsupported on this platform.")
        if gid is not None:
            raise ValueError("Setting GID is unsupported on this platform.")
        if usePTY:
            raise ValueError("PTYs are unsupported on this platform.")
        if childFDs is not None:
            raise ValueError(
                "Custom child file descriptor mappings are unsupported on "
                "this platform.")
        args, env = self._checkProcessArgs(args, env)
        return Process(self, processProtocol, executable, args, env, path)

def install():
    """Create a L{Win32Reactor} and install it as the reactor.

    NOTE(review): only the final C{installReactor} call survived in this
    file; the preceding statements are reconstructed -- confirm.
    """
    # presumably thread support must be switched on before the reactor is
    # created -- TODO confirm
    threadable.init(1)
    r = Win32Reactor()
    main.installReactor(r)


__all__ = ["Win32Reactor", "install"]