~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/internet/iocpreactor/ops.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
import struct, socket, os, errno
 
6
#import time
 
7
 
 
8
from twisted.internet import error
 
9
from twisted.python import failure, log
 
10
 
 
11
from _iocp import have_connectex
 
12
 
 
13
# Socket option names passed to setsockopt() at the SOL_SOCKET level once an
# overlapped accept (AcceptExOp) or connect (ConnectExOp) has completed.
SO_UPDATE_ACCEPT_CONTEXT = 0x700B
 
14
SO_UPDATE_CONNECT_CONTEXT = 0x7010
 
15
 
 
16
# Windows error codes that the operations below compare against completion
# statuses and WindowsError values.
# Translated to an errno value through winerrcodeMapping.
ERROR_CONNECTION_REFUSED = 1225
 
17
# With ERROR_PIPE_ENDED: treated as "pipe gone" when re-issuing a pipe read.
ERROR_INVALID_HANDLE = 6
 
18
ERROR_PIPE_ENDED = 109
 
19
# With ERROR_NETNAME_DELETED: makes AcceptExOp issue a fresh accept instead
# of reporting an error.
ERROR_SEM_TIMEOUT = 121
 
20
ERROR_NETNAME_DELETED = 64
 
21
 
 
22
# Windows error code -> errno value.  ConnectExOp.ovDone feeds the result to
# error.errnoMapping to choose the exception class for a failed connect.
winerrcodeMapping = {ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED}
 
23
 
 
24
class OverlappedOp:
    """Base class for a single kind of overlapped I/O operation.

    An instance remembers the transport it works for and the global reactor.
    Subclasses provide C{initiateOp} to start the operation and C{ovDone} to
    handle its completion.
    """

    def __init__(self, transport):
        """Store C{transport} and the currently installed reactor."""
        # The reactor is imported at call time, not at module import time.
        from twisted.internet import reactor
        self.transport = transport
        self.reactor = reactor

    def ovDone(self, ret, bytes, arg):
        """Completion callback; concrete operations must override this.

        @param ret: completion status of the operation.
        @param bytes: byte count reported for the operation.
        @param arg: the extra object handed over when the operation was
            issued.
        @raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def initiateOp(self):
        """Start the operation; concrete operations must override this.

        @raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
 
35
 
 
36
class ReadFileOp(OverlappedOp):
    """Overlapped read issued through C{reactor.issueReadFile}.

    The outcome is reported to the transport via C{readDone} / C{readErr}.
    """

    def ovDone(self, ret, bytes, arg):
        """Report the outcome of a read to the transport.

        The signature matches L{OverlappedOp.ovDone} and avoids the
        Python-2-only tuple parameter used previously.

        @param ret: completion status; any true value is treated as failure.
        @param bytes: byte count reported for the read; zero is treated as
            failure as well.
        @param arg: the C{(handle, buffer)} tuple passed by C{initiateOp};
            not needed here.
        """
        if ret or not bytes:
            self.transport.readErr(ret, bytes)
        else:
            self.transport.readDone(bytes)

    def initiateOp(self, handle, buffer):
        """Issue a read on C{handle} into C{buffer}.

        C{ovDone} is registered as the completion callback and receives
        C{(handle, buffer)} as its extra argument.
        """
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
 
45
 
 
46
class WriteFileOp(OverlappedOp):
    """Overlapped write issued through C{reactor.issueWriteFile}.

    The outcome is reported to the transport via C{writeDone} / C{writeErr}.
    """

    def ovDone(self, ret, bytes, arg):
        """Report the outcome of a write to the transport.

        @param ret: completion status; any true value is treated as failure.
        @param bytes: byte count reported for the write; zero is treated as
            failure as well.
        @param arg: the C{(handle, buffer)} tuple passed by C{initiateOp};
            not needed here.
        """
        if ret or not bytes:
            self.transport.writeErr(ret, bytes)
        else:
            self.transport.writeDone(bytes)

    def initiateOp(self, handle, buffer):
        """Issue a write of C{buffer} to C{handle}.

        C{ovDone} is registered as the completion callback and receives
        C{(handle, buffer)} as its extra argument.
        """
        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))
 
57
 
 
58
class WSASendToOp(OverlappedOp):
    """Overlapped send to an explicit address via C{reactor.issueWSASendTo}.

    The outcome is reported to the transport via C{writeDone} / C{writeErr}.
    """

    def ovDone(self, ret, bytes, arg):
        """Report the outcome of a send to the transport.

        @param ret: completion status; any true value is treated as failure.
        @param bytes: byte count reported for the send; zero is treated as
            failure as well.
        @param arg: the C{(handle, buffer)} tuple passed by C{initiateOp};
            not needed here.
        """
        if ret or not bytes:
            self.transport.writeErr(ret, bytes)
        else:
            self.transport.writeDone(bytes)

    def initiateOp(self, handle, buffer, addr):
        """Issue a send of C{buffer} to C{addr} on C{handle}.

        The address family is looked up from the socket through
        C{reactor.getsockinfo}; the other fields it returns are not used.
        """
        _maxAddr, family, _sockType, _protocol = self.reactor.getsockinfo(handle)
        self.reactor.issueWSASendTo(handle, buffer, family, addr, self.ovDone, (handle, buffer))
 
68
 
 
69
class WSARecvFromOp(OverlappedOp):
    """Overlapped receive that also captures the sender's address.

    Issued through C{reactor.issueWSARecvFrom}; the outcome is reported to
    the transport via C{readDone} / C{readErr}.
    """

    def ovDone(self, ret, bytes, arg):
        """Report the outcome of a receive to the transport.

        @param ret: completion status; any true value is treated as failure.
        @param bytes: byte count reported for the receive; zero is treated
            as failure as well.
        @param arg: the C{(handle, buffer, ab)} tuple passed by
            C{initiateOp}; only the address buffer C{ab} is used.
        """
        _handle, _buffer, ab = arg
        if ret or not bytes:
            self.transport.readErr(ret, bytes)
        else:
            # The address buffer is decoded by the reactor before being
            # handed to the transport together with the byte count.
            self.transport.readDone(bytes, self.reactor.interpretAB(ab))

    def initiateOp(self, handle, buffer):
        """Issue a receive on C{handle} into C{buffer}.

        A separate 1024-byte buffer is allocated for the sender's address
        and travels to C{ovDone} as part of the extra argument.
        """
        ab = self.reactor.AllocateReadBuffer(1024)
        self.reactor.issueWSARecvFrom(handle, buffer, ab, self.ovDone, (handle, buffer, ab))
 
79
 
 
80
class AcceptExOp(OverlappedOp):
    """Overlapped accept issued through C{reactor.issueAcceptEx}.

    A socket for the incoming connection is created up front.  On success
    it is handed to the transport via C{acceptDone}; failures go to
    C{acceptErr}.
    """

    def ovDone(self, ret, bytes, arg):
        """Handle completion of an accept.

        @param ret: completion status.  C{ERROR_NETNAME_DELETED} and
            C{ERROR_SEM_TIMEOUT} cause a new accept to be issued; any other
            true value is reported as an error.
        @param bytes: byte count reported for the operation; only forwarded
            to C{acceptErr}.
        @param arg: the C{(handle, buffer, acc_sock)} tuple passed by
            C{initiateOp}.
        """
        handle, _buffer, acc_sock = arg
        if ret in (ERROR_NETNAME_DELETED, ERROR_SEM_TIMEOUT):
            # This pending connection is unusable; release the socket that
            # was prepared for it and wait for the next one.
            acc_sock.close()
            self.initiateOp(handle)
            return
        if ret:
            # Nobody else holds a reference to the prepared socket, so
            # close it explicitly rather than leaving that to the garbage
            # collector.
            acc_sock.close()
            self.transport.acceptErr(ret, bytes)
            return
        try:
            acc_sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, struct.pack("I", handle))
        except socket.error:
            acc_sock.close()
            self.transport.acceptErr(ret, bytes)
        else:
            self.transport.acceptDone(acc_sock, acc_sock.getpeername())

    def initiateOp(self, handle):
        """Issue an accept on the listening socket C{handle}.

        A new socket with the same family, type and protocol as the
        listening socket is created to receive the connection.
        """
        max_addr, family, sockType, protocol = self.reactor.getsockinfo(handle)
        acc_sock = socket.socket(family, sockType, protocol)
        # NOTE(review): presumably sized for two addresses of max_addr + 16
        # bytes each, as AcceptEx requires -- confirm against _iocp.
        buffer = self.reactor.AllocateReadBuffer(max_addr*2 + 32)
        self.reactor.issueAcceptEx(handle, acc_sock.fileno(), self.ovDone, (handle, buffer, acc_sock), buffer)
 
100
 
 
101
class ConnectExOp(OverlappedOp):
    """Outgoing connection attempt.

    When C{have_connectex} is true the connect is issued as an overlapped
    operation through C{reactor.issueConnectEx}.  Otherwise a blocking
    C{connect_ex} is run in a thread via C{deferToThread}.  In both cases
    the transport is told the result through exactly one of C{connectDone}
    or C{connectErr}.
    """

    def ovDone(self, ret, bytes, arg):
        """Handle completion of an overlapped connect.

        @param ret: completion status; a true value means the connect
            failed.
        @param bytes: unused.
        @param arg: the C{(handle, sock)} tuple passed by C{initiateOp}.
        """
        _handle, sock = arg
        if ret:
            # Windows error code -> errno -> exception class, falling back
            # to ConnectError for anything unmapped.
            # TODO: finish the mapping in error.py
            excClass = error.errnoMapping.get(winerrcodeMapping.get(ret), error.ConnectError)
            self.transport.connectErr(failure.Failure(excClass()))
            return
        if have_connectex:
            try:
                sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, "")
            except socket.error:
                # Stop here.  Previously execution fell through to
                # connectDone() below, so the transport was told about both
                # a failure and a success for the same attempt.
                self.transport.connectErr(failure.Failure(error.ConnectError()))
                return
        self.transport.connectDone()

    def threadedDone(self, _):
        """Callback for the threaded connect: report success."""
        self.transport.connectDone()

    def threadedErr(self, err):
        """Errback for the threaded connect: forward C{err} to the transport."""
        self.transport.connectErr(err)

    def initiateOp(self, sock, addr):
        """Start connecting C{sock} to C{addr}."""
        handle = sock.fileno()
        if have_connectex:
            # Only the address family is needed from the socket info.
            family = self.reactor.getsockinfo(handle)[1]
            self.reactor.issueConnectEx(handle, family, addr, self.ovDone, (handle, sock))
        else:
            from twisted.internet.threads import deferToThread
            d = deferToThread(self.threadedThing, sock, addr)
            d.addCallback(self.threadedDone)
            d.addErrback(self.threadedErr)

    def threadedThing(self, sock, addr):
        """Run a blocking C{connect_ex}; meant to be called in a thread.

        @raise Exception: whatever C{error.getConnectError} builds from the
            non-zero result code and its C{os.strerror} text.
        """
        res = sock.connect_ex(addr)
        if res:
            raise error.getConnectError((res, os.strerror(res)))
 
135
 
 
136
## Define custom xxxOp classes to handle IO operations related
 
137
## to stdout/err/in for the process transport.
 
138
class ReadOutOp(OverlappedOp):
    """Overlapped read loop for a child process's stdout pipe.

    Data is delivered to C{transport.protocol.outReceived}; the end of the
    pipe is reported through C{transport.outConnectionLost}.
    """

    def _closeAllHandles(self):
        """Close the transport's stdout, stderr and stdin, in that order."""
        self.transport.closeStdout()
        self.transport.closeStderr()
        self.transport.closeStdin()

    def ovDone(self, ret, bytes, arg):
        """Handle completion of a stdout read and issue the next one.

        @param ret: completion status; any true value ends the read loop.
        @param bytes: number of bytes of the buffer to deliver; zero ends
            the read loop.
        @param arg: the C{(handle, buffer)} tuple passed by C{initiateOp}.
        @raise WindowsError: if re-issuing the read fails for a reason other
            than the pipe being gone.
        """
        handle, buffer = arg
        if ret or not bytes:
            try:
                self.transport.outConnectionLost()
            except Exception as e:
                log.err(e)
                # Close all handles and proceed as normal,
                # waiting for process to exit.
                self._closeAllHandles()
            return

        try:
            self.transport.protocol.outReceived(buffer[:bytes])
        except Exception as e:
            log.err(e)
            # Close all handles and proceed as normal,
            # waiting for process to exit.
            self._closeAllHandles()

        # Keep reading
        try:
            self.initiateOp(handle, buffer)
        except WindowsError as e:
            # On Python 2.5+ WindowsError.errno is the translated POSIX
            # errno and the Windows code lives in .winerror; fall back to
            # .errno where .winerror is unavailable.
            code = getattr(e, 'winerror', None)
            if code is None:
                code = e.errno
            if code in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
                self.transport.outConnectionLost()
            else:
                # Bare raise keeps the original traceback.
                raise

    def initiateOp(self, handle, buffer):
        """Issue a read on C{handle} into C{buffer}, completing in C{ovDone}."""
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
 
171
 
 
172
class ReadErrOp(OverlappedOp):
    """Overlapped read loop for a child process's stderr pipe.

    Data is delivered to C{transport.protocol.errReceived}; the end of the
    pipe is reported through C{transport.errConnectionLost}.
    """

    def _closeAllHandles(self):
        """Close the transport's stdout, stderr and stdin, in that order."""
        self.transport.closeStdout()
        self.transport.closeStderr()
        self.transport.closeStdin()

    def ovDone(self, ret, bytes, arg):
        """Handle completion of a stderr read and issue the next one.

        @param ret: completion status; any true value ends the read loop.
        @param bytes: number of bytes of the buffer to deliver; zero ends
            the read loop.
        @param arg: the C{(handle, buffer)} tuple passed by C{initiateOp}.
        @raise WindowsError: if re-issuing the read fails for a reason other
            than the pipe being gone.
        """
        handle, buffer = arg
        if ret or not bytes:
            try:
                self.transport.errConnectionLost()
            except Exception as e:
                log.err(e)
                # Close all handles and proceed as normal,
                # waiting for process to exit.
                self._closeAllHandles()
            return

        try:
            self.transport.protocol.errReceived(buffer[:bytes])
        except Exception as e:
            log.err(e)
            # Close all handles and proceed as normal,
            # waiting for process to exit.
            self._closeAllHandles()

        # Keep reading
        try:
            self.initiateOp(handle, buffer)
        except WindowsError as e:
            # On Python 2.5+ WindowsError.errno is the translated POSIX
            # errno and the Windows code lives in .winerror; fall back to
            # .errno where .winerror is unavailable.
            code = getattr(e, 'winerror', None)
            if code is None:
                code = e.errno
            if code in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
                self.transport.errConnectionLost()
            else:
                # Bare raise keeps the original traceback.
                raise

    def initiateOp(self, handle, buffer):
        """Issue a read on C{handle} into C{buffer}, completing in C{ovDone}."""
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
 
205
 
 
206
class WriteInOp(OverlappedOp):
    """Overlapped write to a child process's stdin pipe.

    Success is reported through C{transport.writeDone}; failure through
    C{transport.inConnectionLost}.
    """

    def _closeAllHandles(self):
        """Close the transport's stdout, stderr and stdin, in that order."""
        self.transport.closeStdout()
        self.transport.closeStderr()
        self.transport.closeStdin()

    def ovDone(self, ret, bytes, arg):
        """Report the outcome of a stdin write to the transport.

        Exceptions raised by the transport callbacks are logged and answered
        by closing all three of the process's standard handles.

        @param ret: completion status; any true value is treated as failure.
        @param bytes: byte count reported for the write; zero is treated as
            failure as well.
        @param arg: the C{(handle, buffer)} tuple passed by C{initiateOp};
            not needed here.
        """
        if ret or not bytes:
            notify = self.transport.inConnectionLost
            notifyArgs = ()
        else:
            notify = self.transport.writeDone
            notifyArgs = (bytes,)
        try:
            notify(*notifyArgs)
        except Exception as e:
            log.err(e)
            # Close all handles and proceed as normal,
            # waiting for process to exit.
            self._closeAllHandles()

    def initiateOp(self, handle, buffer):
        """Issue a write of C{buffer} to C{handle}, completing in C{ovDone}."""
        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))
 
231
 
 
232
class ReadInOp(OverlappedOp):
    """Stdin pipe will be opened in duplex mode.  The parent will read
    stdin to detect when the child closes it so we can close our end.
    """

    def ovDone(self, ret, bytes, arg):
        """Handle completion of a read on the stdin pipe.

        Any data read is ignored.  The read only exists to notice the pipe
        going away, which is reported through C{transport.inConnectionLost}.

        @param ret: completion status; any true value ends the read loop.
        @param bytes: byte count reported for the read; zero ends the read
            loop.
        @param arg: the C{(handle, buffer)} tuple passed by C{initiateOp}.
        @raise WindowsError: if re-issuing the read fails for a reason other
            than the pipe being gone.
        """
        handle, buffer = arg
        if ret or not bytes:
            self.transport.inConnectionLost()
            return

        # Keep reading
        try:
            self.initiateOp(handle, buffer)
        except WindowsError as e:
            # On Python 2.5+ WindowsError.errno is the translated POSIX
            # errno and the Windows code lives in .winerror; fall back to
            # .errno where .winerror is unavailable.
            code = getattr(e, 'winerror', None)
            if code is None:
                code = e.errno
            if code in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
                self.transport.inConnectionLost()
            else:
                # Bare raise keeps the original traceback.
                raise

    def initiateOp(self, handle, buffer):
        """Issue a read on C{handle} into C{buffer}, completing in C{ovDone}."""
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))