~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/internet/win32eventreactor.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
"""A win32event based implementation of the Twisted main loop.
6
 
 
7
 
This requires win32all or ActivePython to be installed.
8
 
 
9
 
API Stability: semi-stable
10
 
 
11
 
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
12
 
 
13
 
 
14
 
LIMITATIONS:
15
 
 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
16
 
 2. Process running has some problems (see Process docstring).
17
 
 
18
 
 
19
 
TODO:
20
 
 1. Event loop handling of writes is *very* problematic (this is causing failed tests).
21
 
    Switch to doing it the correct way, whatever that means (see below).
22
 
 2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
23
 
 3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
24
 
 
25
 
 
26
 
ALTERNATIVE SOLUTIONS:
27
 
 - IIRC, sockets can only be registered once. So we switch to a structure
28
 
   like the poll() reactor, thus allowing us to deal with write events in
29
 
   a decent fashion. This should allow us to pass tests, but we're still
30
 
   limited to 64 events.
31
 
 
32
 
Or:
33
 
 
34
 
 - Instead of doing a reactor, we make this an addon to the select reactor.
35
 
   The WFMO event loop runs in a separate thread. This means no need to maintain
36
 
   separate code for networking, 64 event limit doesn't apply to sockets,
37
 
   we can run processes and other win32 stuff in default event loop. The
38
 
   only problem is that we're stuck with the icky socket based waker.
39
 
   Another benefit is that this could be extended to support >64 events
40
 
   in a simpler manner than the previous solution.
41
 
 
42
 
The 2nd solution is probably what will get implemented.
43
 
"""
44
 
 
45
 
# Win32 imports
46
 
from win32file import WSAEventSelect, FD_READ, FD_WRITE, FD_CLOSE, \
47
 
                      FD_ACCEPT, FD_CONNECT
48
 
from win32event import CreateEvent, MsgWaitForMultipleObjects, \
49
 
                       WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE, QS_ALLINPUT, QS_ALLEVENTS
50
 
import win32api
51
 
import win32con
52
 
import win32event
53
 
import win32file
54
 
import win32pipe
55
 
import win32process
56
 
import win32security
57
 
import pywintypes
58
 
import msvcrt
59
 
import win32gui
60
 
 
61
 
# Twisted imports
62
 
from twisted.internet import abstract, posixbase, main, error
63
 
from twisted.python import log, threadable, failure
64
 
from twisted.internet.interfaces import IReactorFDSet, IReactorProcess, IProcessTransport
65
 
 
66
 
from twisted.internet._dumbwin32proc import Process
67
 
 
68
 
# System imports
69
 
import os
70
 
import threading
71
 
import Queue
72
 
import string
73
 
import time
74
 
import sys
75
 
from zope.interface import implements
76
 
 
77
 
 
78
 
# globals
79
 
reads = {}
80
 
writes = {}
81
 
events = {}
82
 
 
83
 
 
84
 
class Win32Reactor(posixbase.PosixReactorBase):
85
 
    """Reactor that uses Win32 event APIs."""
86
 
 
87
 
    implements(IReactorFDSet, IReactorProcess)
88
 
 
89
 
    dummyEvent = CreateEvent(None, 0, 0, None)
90
 
 
91
 
    def _makeSocketEvent(self, fd, action, why, events=events):
92
 
        """Make a win32 event object for a socket."""
93
 
        event = CreateEvent(None, 0, 0, None)
94
 
        WSAEventSelect(fd, event, why)
95
 
        events[event] = (fd, action)
96
 
        return event
97
 
 
98
 
    def addEvent(self, event, fd, action, events=events):
99
 
        """Add a new win32 event to the event loop."""
100
 
        events[event] = (fd, action)
101
 
 
102
 
    def removeEvent(self, event):
103
 
        """Remove an event."""
104
 
        del events[event]
105
 
 
106
 
    def addReader(self, reader, reads=reads):
107
 
        """Add a socket FileDescriptor for notification of data available to read.
108
 
        """
109
 
        if not reads.has_key(reader):
110
 
            reads[reader] = self._makeSocketEvent(reader, 'doRead', FD_READ|FD_ACCEPT|FD_CONNECT|FD_CLOSE)
111
 
 
112
 
    def addWriter(self, writer, writes=writes):
113
 
        """Add a socket FileDescriptor for notification of data available to write.
114
 
        """
115
 
        if not writes.has_key(writer):
116
 
            writes[writer] = 1
117
 
 
118
 
    def removeReader(self, reader):
119
 
        """Remove a Selectable for notification of data available to read.
120
 
        """
121
 
        if reads.has_key(reader):
122
 
            del events[reads[reader]]
123
 
            del reads[reader]
124
 
 
125
 
    def removeWriter(self, writer, writes=writes):
126
 
        """Remove a Selectable for notification of data available to write.
127
 
        """
128
 
        if writes.has_key(writer):
129
 
            del writes[writer]
130
 
 
131
 
    def removeAll(self):
132
 
        """Remove all selectables, and return a list of them."""
133
 
        return self._removeAll(reads, writes)
134
 
 
135
 
    def doWaitForMultipleEvents(self, timeout,
136
 
                                reads=reads,
137
 
                                writes=writes):
138
 
        log.msg(channel='system', event='iteration', reactor=self)
139
 
        if timeout is None:
140
 
            #timeout = INFINITE
141
 
            timeout = 100
142
 
        else:
143
 
            timeout = int(timeout * 1000)
144
 
 
145
 
        if not (events or writes):
146
 
            # sleep so we don't suck up CPU time
147
 
            time.sleep(timeout / 1000.0)
148
 
            return
149
 
 
150
 
        canDoMoreWrites = 0
151
 
        for fd in writes.keys():
152
 
            if log.callWithLogger(fd, self._runWrite, fd):
153
 
                canDoMoreWrites = 1
154
 
 
155
 
        if canDoMoreWrites:
156
 
            timeout = 0
157
 
 
158
 
        handles = events.keys() or [self.dummyEvent]
159
 
        val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT | QS_ALLEVENTS)
160
 
        if val == WAIT_TIMEOUT:
161
 
            return
162
 
        elif val == WAIT_OBJECT_0 + len(handles):
163
 
            exit = win32gui.PumpWaitingMessages()
164
 
            if exit:
165
 
                self.callLater(0, self.stop)
166
 
                return
167
 
        elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
168
 
            fd, action = events[handles[val - WAIT_OBJECT_0]]
169
 
            log.callWithLogger(fd, self._runAction, action, fd)
170
 
 
171
 
    def _runWrite(self, fd):
172
 
        closed = 0
173
 
        try:
174
 
            closed = fd.doWrite()
175
 
        except:
176
 
            closed = sys.exc_info()[1]
177
 
            log.deferr()
178
 
 
179
 
        if closed:
180
 
            self.removeReader(fd)
181
 
            self.removeWriter(fd)
182
 
            try:
183
 
                fd.connectionLost(failure.Failure(closed))
184
 
            except:
185
 
                log.deferr()
186
 
        elif closed is None:
187
 
            return 1
188
 
 
189
 
    def _runAction(self, action, fd):
190
 
        try:
191
 
            closed = getattr(fd, action)()
192
 
        except:
193
 
            closed = sys.exc_info()[1]
194
 
            log.deferr()
195
 
 
196
 
        if closed:
197
 
            self._disconnectSelectable(fd, closed, action == 'doRead')
198
 
 
199
 
    doIteration = doWaitForMultipleEvents
200
 
 
201
 
    def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, uid=None, gid=None, usePTY=0, childFDs=None):
202
 
        """Spawn a process."""
203
 
        if uid is not None:
204
 
            raise ValueError("Setting UID is unsupported on this platform.")
205
 
        if gid is not None:
206
 
            raise ValueError("Setting GID is unsupported on this platform.")
207
 
        if usePTY:
208
 
            raise ValueError("PTYs are unsupported on this platform.")
209
 
        if childFDs is not None:
210
 
            raise ValueError(
211
 
                "Custom child file descriptor mappings are unsupported on "
212
 
                "this platform.")
213
 
        args, env = self._checkProcessArgs(args, env)
214
 
        return Process(self, processProtocol, executable, args, env, path)
215
 
 
216
 
 
217
 
def install():
218
 
    threadable.init(1)
219
 
    r = Win32Reactor()
220
 
    import main
221
 
    main.installReactor(r)
222
 
 
223
 
 
224
 
__all__ = ["Win32Reactor", "install"]