~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/internet/iocpreactor/ops.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
import struct, socket, os, errno
6
 
#import time
7
 
 
8
 
from twisted.internet import error
9
 
from twisted.python import failure, log
10
 
 
11
 
from _iocp import have_connectex
12
 
 
13
 
SO_UPDATE_ACCEPT_CONTEXT = 0x700B
14
 
SO_UPDATE_CONNECT_CONTEXT = 0x7010
15
 
 
16
 
ERROR_CONNECTION_REFUSED = 1225
17
 
ERROR_INVALID_HANDLE = 6
18
 
ERROR_PIPE_ENDED = 109
19
 
ERROR_SEM_TIMEOUT = 121
20
 
ERROR_NETNAME_DELETED = 64
21
 
 
22
 
winerrcodeMapping = {ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED}
23
 
 
24
 
class OverlappedOp:
25
 
    def __init__(self, transport):
26
 
        from twisted.internet import reactor
27
 
        self.reactor = reactor
28
 
        self.transport = transport
29
 
 
30
 
    def ovDone(self, ret, bytes, arg):
31
 
        raise NotImplementedError
32
 
 
33
 
    def initiateOp(self):
34
 
        raise NotImplementedError
35
 
 
36
 
class ReadFileOp(OverlappedOp):
37
 
    def ovDone(self, ret, bytes, (handle, buffer)):
38
 
        if ret or not bytes:
39
 
            self.transport.readErr(ret, bytes)
40
 
        else:
41
 
            self.transport.readDone(bytes)
42
 
 
43
 
    def initiateOp(self, handle, buffer):
44
 
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
45
 
 
46
 
class WriteFileOp(OverlappedOp):
47
 
    def ovDone(self, ret, bytes, (handle, buffer)):
48
 
#        log.msg("WriteFileOp.ovDone", time.time())
49
 
        if ret or not bytes:
50
 
            self.transport.writeErr(ret, bytes)
51
 
        else:
52
 
            self.transport.writeDone(bytes)
53
 
 
54
 
    def initiateOp(self, handle, buffer):
55
 
#        log.msg("WriteFileOp.initiateOp", time.time())
56
 
        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))
57
 
 
58
 
class WSASendToOp(OverlappedOp):
59
 
    def ovDone(self, ret, bytes, (handle, buffer)):
60
 
        if ret or not bytes:
61
 
            self.transport.writeErr(ret, bytes)
62
 
        else:
63
 
            self.transport.writeDone(bytes)
64
 
 
65
 
    def initiateOp(self, handle, buffer, addr):
66
 
        max_addr, family, type, protocol = self.reactor.getsockinfo(handle)
67
 
        self.reactor.issueWSASendTo(handle, buffer, family, addr, self.ovDone, (handle, buffer))
68
 
 
69
 
class WSARecvFromOp(OverlappedOp):
70
 
    def ovDone(self, ret, bytes, (handle, buffer, ab)):
71
 
        if ret or not bytes:
72
 
            self.transport.readErr(ret, bytes)
73
 
        else:
74
 
            self.transport.readDone(bytes, self.reactor.interpretAB(ab))
75
 
 
76
 
    def initiateOp(self, handle, buffer):
77
 
        ab = self.reactor.AllocateReadBuffer(1024)
78
 
        self.reactor.issueWSARecvFrom(handle, buffer, ab, self.ovDone, (handle, buffer, ab))
79
 
 
80
 
class AcceptExOp(OverlappedOp):
81
 
    def ovDone(self, ret, bytes, (handle, buffer, acc_sock)):
82
 
        if ret in (ERROR_NETNAME_DELETED, ERROR_SEM_TIMEOUT):
83
 
            # yay, recursion
84
 
            self.initiateOp(handle)
85
 
        elif ret:
86
 
            self.transport.acceptErr(ret, bytes)
87
 
        else:
88
 
            try:
89
 
                acc_sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, struct.pack("I", handle))
90
 
            except socket.error, se:
91
 
                self.transport.acceptErr(ret, bytes)
92
 
            else:
93
 
                self.transport.acceptDone(acc_sock, acc_sock.getpeername())
94
 
 
95
 
    def initiateOp(self, handle):
96
 
        max_addr, family, type, protocol = self.reactor.getsockinfo(handle)
97
 
        acc_sock = socket.socket(family, type, protocol)
98
 
        buffer = self.reactor.AllocateReadBuffer(max_addr*2 + 32)
99
 
        self.reactor.issueAcceptEx(handle, acc_sock.fileno(), self.ovDone, (handle, buffer, acc_sock), buffer)
100
 
 
101
 
class ConnectExOp(OverlappedOp):
102
 
    def ovDone(self, ret, bytes, (handle, sock)):
103
 
        if ret:
104
 
#            print "ConnectExOp err", ret
105
 
            self.transport.connectErr(failure.Failure(error.errnoMapping.get(winerrcodeMapping.get(ret), error.ConnectError)())) # finish the mapping in error.py
106
 
        else:
107
 
            if have_connectex:
108
 
                try:
109
 
                    sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, "")
110
 
                except socket.error, se:
111
 
                    self.transport.connectErr(failure.Failure(error.ConnectError()))
112
 
            self.transport.connectDone()
113
 
 
114
 
    def threadedDone(self, _):
115
 
        self.transport.connectDone()
116
 
 
117
 
    def threadedErr(self, err):
118
 
        self.transport.connectErr(err)
119
 
 
120
 
    def initiateOp(self, sock, addr):
121
 
        handle = sock.fileno()
122
 
        if have_connectex:
123
 
            max_addr, family, type, protocol = self.reactor.getsockinfo(handle)
124
 
            self.reactor.issueConnectEx(handle, family, addr, self.ovDone, (handle, sock))
125
 
        else:
126
 
            from twisted.internet.threads import deferToThread
127
 
            d = deferToThread(self.threadedThing, sock, addr)
128
 
            d.addCallback(self.threadedDone)
129
 
            d.addErrback(self.threadedErr)
130
 
 
131
 
    def threadedThing(self, sock, addr):
132
 
        res = sock.connect_ex(addr)
133
 
        if res:
134
 
            raise error.getConnectError((res, os.strerror(res)))
135
 
 
136
 
## Define custom xxxOp classes to handle IO operations related
137
 
## to stdout/err/in for the process transport.
138
 
class ReadOutOp(OverlappedOp):
139
 
    def ovDone(self, ret, bytes, (handle, buffer)):
140
 
        if ret or not bytes:
141
 
            try:
142
 
                self.transport.outConnectionLost()
143
 
            except Exception, e:
144
 
                log.err(e)
145
 
                # Close all handles and proceed as normal,
146
 
                # waiting for process to exit.
147
 
                self.transport.closeStdout()
148
 
                self.transport.closeStderr()
149
 
                self.transport.closeStdin()
150
 
        else:
151
 
            try:
152
 
                self.transport.protocol.outReceived(buffer[:bytes])
153
 
            except Exception, e:
154
 
                log.err(e)
155
 
                # Close all handles and proceed as normal,
156
 
                # waiting for process to exit.
157
 
                self.transport.closeStdout()
158
 
                self.transport.closeStderr()
159
 
                self.transport.closeStdin()
160
 
            # Keep reading
161
 
            try:
162
 
                self.initiateOp(handle, buffer)
163
 
            except WindowsError, e:
164
 
                if e.errno in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
165
 
                    self.transport.outConnectionLost()
166
 
                else:
167
 
                    raise e
168
 
 
169
 
    def initiateOp(self, handle, buffer):
170
 
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
171
 
 
172
 
class ReadErrOp(OverlappedOp):
173
 
    def ovDone(self, ret, bytes, (handle, buffer)):
174
 
        if ret or not bytes:
175
 
            try:
176
 
                self.transport.errConnectionLost()
177
 
            except Exception, e:
178
 
                log.err(e)
179
 
                # Close all handles and proceed as normal,
180
 
                # waiting for process to exit.
181
 
                self.transport.closeStdout()
182
 
                self.transport.closeStderr()
183
 
                self.transport.closeStdin()
184
 
        else:
185
 
            try:
186
 
                self.transport.protocol.errReceived(buffer[:bytes])
187
 
            except Exception, e:
188
 
                log.err(e)
189
 
                # Close all handles and proceed as normal,
190
 
                # waiting for process to exit.
191
 
                self.transport.closeStdout()
192
 
                self.transport.closeStderr()
193
 
                self.transport.closeStdin()
194
 
            # Keep reading
195
 
            try:
196
 
                self.initiateOp(handle, buffer)
197
 
            except WindowsError, e:
198
 
                if e.errno in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
199
 
                    self.transport.errConnectionLost()
200
 
                else:
201
 
                    raise e
202
 
 
203
 
    def initiateOp(self, handle, buffer):
204
 
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
205
 
 
206
 
class WriteInOp(OverlappedOp):
207
 
    def ovDone(self, ret, bytes, (handle, buffer)):
208
 
        if ret or not bytes:
209
 
            try:
210
 
                self.transport.inConnectionLost()
211
 
            except Exception, e:
212
 
                log.err(e)
213
 
                # Close all handles and proceed as normal,
214
 
                # waiting for process to exit.
215
 
                self.transport.closeStdout()
216
 
                self.transport.closeStderr()
217
 
                self.transport.closeStdin()
218
 
        else:
219
 
            try:
220
 
                self.transport.writeDone(bytes)
221
 
            except Exception, e:
222
 
                log.err(e)
223
 
                # Close all handles and proceed as normal,
224
 
                # waiting for process to exit.
225
 
                self.transport.closeStdout()
226
 
                self.transport.closeStderr()
227
 
                self.transport.closeStdin()
228
 
 
229
 
    def initiateOp(self, handle, buffer):
230
 
        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))
231
 
 
232
 
class ReadInOp(OverlappedOp):
233
 
    """Stdin pipe will be opened in duplex mode.  The parent will read
234
 
    stdin to detect when the child closes it so we can close our end.
235
 
    """
236
 
    def ovDone(self, ret, bytes, (handle, buffer)):
237
 
        if ret or not bytes:
238
 
            self.transport.inConnectionLost()
239
 
        else:
240
 
            # Keep reading
241
 
            try:
242
 
                self.initiateOp(handle, buffer)
243
 
            except WindowsError, e:
244
 
                if e.errno in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
245
 
                    self.transport.inConnectionLost()
246
 
                else:
247
 
                    raise e
248
 
                    
249
 
    def initiateOp(self, handle, buffer):
250
 
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))