~vishvananda/nova/network-refactor

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/win32eventreactor.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
"""
 
6
A win32event based implementation of the Twisted main loop.
 
7
 
 
8
This requires win32all or ActivePython to be installed.
 
9
 
 
10
Maintainer: Itamar Shtull-Trauring
 
11
 
 
12
 
 
13
LIMITATIONS:
 
14
 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
 
15
 2. Process running has some problems (see Process docstring).
 
16
 
 
17
 
 
18
TODO:
 
19
 1. Event loop handling of writes is *very* problematic (this is causing failed tests).
 
20
    Switch to doing it the correct way, whatever that means (see below).
 
21
 2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
 
22
 3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
 
23
 
 
24
 
 
25
ALTERNATIVE SOLUTIONS:
 
26
 - IIRC, sockets can only be registered once. So we switch to a structure
 
27
   like the poll() reactor, thus allowing us to deal with write events in
 
28
   a decent fashion. This should allow us to pass tests, but we're still
 
29
   limited to 64 events.
 
30
 
 
31
Or:
 
32
 
 
33
 - Instead of doing a reactor, we make this an addon to the select reactor.
 
34
   The WFMO event loop runs in a separate thread. This means no need to maintain
 
35
   separate code for networking, 64 event limit doesn't apply to sockets,
 
36
   we can run processes and other win32 stuff in default event loop. The
 
37
   only problem is that we're stuck with the icky socket based waker.
 
38
   Another benefit is that this could be extended to support >64 events
 
39
   in a simpler manner than the previous solution.
 
40
 
 
41
The 2nd solution is probably what will get implemented.
 
42
"""
 
43
 
 
44
# System imports
 
45
import time
 
46
import sys
 
47
 
 
48
from zope.interface import implements
 
49
 
 
50
# Win32 imports
 
51
from win32file import WSAEventSelect, FD_READ, FD_CLOSE, FD_ACCEPT, FD_CONNECT
 
52
from win32event import CreateEvent, MsgWaitForMultipleObjects
 
53
from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT, QS_ALLINPUT, QS_ALLEVENTS
 
54
 
 
55
import win32gui
 
56
 
 
57
# Twisted imports
 
58
from twisted.internet import posixbase
 
59
from twisted.python import log, threadable, failure
 
60
from twisted.internet.interfaces import IReactorFDSet, IReactorProcess
 
61
 
 
62
from twisted.internet._dumbwin32proc import Process
 
63
 
 
64
 
 
65
class Win32Reactor(posixbase.PosixReactorBase):
 
66
    """
 
67
    Reactor that uses Win32 event APIs.
 
68
 
 
69
    @ivar _reads: A dictionary mapping L{FileDescriptor} instances to a
 
70
        win32 event object used to check for read events for that descriptor.
 
71
 
 
72
    @ivar _writes: A dictionary mapping L{FileDescriptor} instances to a
 
73
        arbitrary value.  Keys in this dictionary will be given a chance to
 
74
        write out their data.
 
75
 
 
76
    @ivar _events: A dictionary mapping win32 event object to tuples of
 
77
        L{FileDescriptor} instances and event masks.
 
78
    """
 
79
    implements(IReactorFDSet, IReactorProcess)
 
80
 
 
81
    dummyEvent = CreateEvent(None, 0, 0, None)
 
82
 
 
83
    def __init__(self):
 
84
        self._reads = {}
 
85
        self._writes = {}
 
86
        self._events = {}
 
87
        posixbase.PosixReactorBase.__init__(self)
 
88
 
 
89
 
 
90
    def _makeSocketEvent(self, fd, action, why):
 
91
        """
 
92
        Make a win32 event object for a socket.
 
93
        """
 
94
        event = CreateEvent(None, 0, 0, None)
 
95
        WSAEventSelect(fd, event, why)
 
96
        self._events[event] = (fd, action)
 
97
        return event
 
98
 
 
99
 
 
100
    def addEvent(self, event, fd, action):
 
101
        """
 
102
        Add a new win32 event to the event loop.
 
103
        """
 
104
        self._events[event] = (fd, action)
 
105
 
 
106
 
 
107
    def removeEvent(self, event):
 
108
        """
 
109
        Remove an event.
 
110
        """
 
111
        del self._events[event]
 
112
 
 
113
 
 
114
    def addReader(self, reader):
 
115
        """
 
116
        Add a socket FileDescriptor for notification of data available to read.
 
117
        """
 
118
        if reader not in self._reads:
 
119
            self._reads[reader] = self._makeSocketEvent(
 
120
                reader, 'doRead', FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE)
 
121
 
 
122
    def addWriter(self, writer):
 
123
        """
 
124
        Add a socket FileDescriptor for notification of data available to write.
 
125
        """
 
126
        if writer not in self._writes:
 
127
            self._writes[writer] = 1
 
128
 
 
129
    def removeReader(self, reader):
 
130
        """Remove a Selectable for notification of data available to read.
 
131
        """
 
132
        if reader in self._reads:
 
133
            del self._events[self._reads[reader]]
 
134
            del self._reads[reader]
 
135
 
 
136
    def removeWriter(self, writer):
 
137
        """Remove a Selectable for notification of data available to write.
 
138
        """
 
139
        if writer in self._writes:
 
140
            del self._writes[writer]
 
141
 
 
142
    def removeAll(self):
 
143
        """
 
144
        Remove all selectables, and return a list of them.
 
145
        """
 
146
        return self._removeAll(self._reads, self._writes)
 
147
 
 
148
 
 
149
    def getReaders(self):
 
150
        return self._reads.keys()
 
151
 
 
152
 
 
153
    def getWriters(self):
 
154
        return self._writes.keys()
 
155
 
 
156
 
 
157
    def doWaitForMultipleEvents(self, timeout):
 
158
        log.msg(channel='system', event='iteration', reactor=self)
 
159
        if timeout is None:
 
160
            #timeout = INFINITE
 
161
            timeout = 100
 
162
        else:
 
163
            timeout = int(timeout * 1000)
 
164
 
 
165
        if not (self._events or self._writes):
 
166
            # sleep so we don't suck up CPU time
 
167
            time.sleep(timeout / 1000.0)
 
168
            return
 
169
 
 
170
        canDoMoreWrites = 0
 
171
        for fd in self._writes.keys():
 
172
            if log.callWithLogger(fd, self._runWrite, fd):
 
173
                canDoMoreWrites = 1
 
174
 
 
175
        if canDoMoreWrites:
 
176
            timeout = 0
 
177
 
 
178
        handles = self._events.keys() or [self.dummyEvent]
 
179
        val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT | QS_ALLEVENTS)
 
180
        if val == WAIT_TIMEOUT:
 
181
            return
 
182
        elif val == WAIT_OBJECT_0 + len(handles):
 
183
            exit = win32gui.PumpWaitingMessages()
 
184
            if exit:
 
185
                self.callLater(0, self.stop)
 
186
                return
 
187
        elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
 
188
            fd, action = self._events[handles[val - WAIT_OBJECT_0]]
 
189
            log.callWithLogger(fd, self._runAction, action, fd)
 
190
 
 
191
    def _runWrite(self, fd):
 
192
        closed = 0
 
193
        try:
 
194
            closed = fd.doWrite()
 
195
        except:
 
196
            closed = sys.exc_info()[1]
 
197
            log.deferr()
 
198
 
 
199
        if closed:
 
200
            self.removeReader(fd)
 
201
            self.removeWriter(fd)
 
202
            try:
 
203
                fd.connectionLost(failure.Failure(closed))
 
204
            except:
 
205
                log.deferr()
 
206
        elif closed is None:
 
207
            return 1
 
208
 
 
209
    def _runAction(self, action, fd):
 
210
        try:
 
211
            closed = getattr(fd, action)()
 
212
        except:
 
213
            closed = sys.exc_info()[1]
 
214
            log.deferr()
 
215
 
 
216
        if closed:
 
217
            self._disconnectSelectable(fd, closed, action == 'doRead')
 
218
 
 
219
    doIteration = doWaitForMultipleEvents
 
220
 
 
221
    def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, uid=None, gid=None, usePTY=0, childFDs=None):
 
222
        """Spawn a process."""
 
223
        if uid is not None:
 
224
            raise ValueError("Setting UID is unsupported on this platform.")
 
225
        if gid is not None:
 
226
            raise ValueError("Setting GID is unsupported on this platform.")
 
227
        if usePTY:
 
228
            raise ValueError("PTYs are unsupported on this platform.")
 
229
        if childFDs is not None:
 
230
            raise ValueError(
 
231
                "Custom child file descriptor mappings are unsupported on "
 
232
                "this platform.")
 
233
        args, env = self._checkProcessArgs(args, env)
 
234
        return Process(self, processProtocol, executable, args, env, path)
 
235
 
 
236
 
 
237
def install():
 
238
    threadable.init(1)
 
239
    r = Win32Reactor()
 
240
    import main
 
241
    main.installReactor(r)
 
242
 
 
243
 
 
244
__all__ = ["Win32Reactor", "install"]