# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.
5
import struct, socket, os, errno
8
from twisted.internet import error
9
from twisted.python import failure, log
11
from _iocp import have_connectex
13
# Socket-level option numbers that AcceptExOp and ConnectExOp pass to
# setsockopt() after their overlapped call has completed.
SO_UPDATE_ACCEPT_CONTEXT = 0x700B
SO_UPDATE_CONNECT_CONTEXT = 0x7010

# Windows error codes that the completion handlers in this module compare
# their results against.
ERROR_CONNECTION_REFUSED = 1225
ERROR_INVALID_HANDLE = 6
ERROR_PIPE_ENDED = 109
ERROR_SEM_TIMEOUT = 121
ERROR_NETNAME_DELETED = 64

# Windows error code -> errno value; ConnectExOp.ovDone feeds the result into
# error.errnoMapping.  The errno module only defines WSA* names on Windows,
# so fall back to the Winsock value of WSAECONNREFUSED (10061) elsewhere.
winerrcodeMapping = {
    ERROR_CONNECTION_REFUSED: getattr(errno, "WSAECONNREFUSED", 10061),
}

class OverlappedOp:
    """Base for a single overlapped I/O operation run on behalf of a transport.

    Subclasses provide ``initiateOp`` (start the operation through the
    reactor) and ``ovDone`` (the completion callback they register for it).

    NOTE(review): the ``class`` line and the ``def`` line of ``initiateOp``
    were missing from the text this was restored from; the class name comes
    from the subclasses' base list and the method name from their overrides.
    Confirm against the upstream file.
    """

    def __init__(self, transport):
        """Remember ``transport`` and the global reactor."""
        # Function-scope import: the reactor is looked up when an operation
        # is created, not when this module is imported.
        from twisted.internet import reactor
        self.reactor = reactor
        self.transport = transport

    def ovDone(self, ret, bytes, arg):
        """Completion callback; subclasses must override."""
        raise NotImplementedError

    def initiateOp(self, *args):
        """Start the operation; subclasses must override.

        Accepts any positional arguments because every subclass uses a
        different argument list.
        """
        raise NotImplementedError

class ReadFileOp(OverlappedOp):
    """Overlapped read on a handle, reported back to the transport.

    ``initiateOp`` queues the read with ``reactor.issueReadFile`` and
    registers ``ovDone`` as its completion callback.
    """

    def ovDone(self, ret, bytes, arg):
        """Hand a finished read to ``transport.readErr`` or ``readDone``.

        ``arg`` is the ``(handle, buffer)`` tuple registered by
        ``initiateOp``; it is not needed here.
        """
        # NOTE(review): the if/else lines were missing from the text this was
        # restored from.  The condition (non-zero status, or nothing
        # transferred) is a reconstruction -- confirm against upstream.
        if ret or not bytes:
            self.transport.readErr(ret, bytes)
        else:
            self.transport.readDone(bytes)

    def initiateOp(self, handle, buffer):
        """Issue the overlapped read of ``handle`` into ``buffer``."""
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))

class WriteFileOp(OverlappedOp):
    """Overlapped write on a handle, reported back to the transport.

    ``initiateOp`` queues the write with ``reactor.issueWriteFile`` and
    registers ``ovDone`` as its completion callback.
    """

    def ovDone(self, ret, bytes, arg):
        """Hand a finished write to ``transport.writeErr`` or ``writeDone``.

        ``arg`` is the ``(handle, buffer)`` tuple registered by
        ``initiateOp``; it is not needed here.
        """
        # NOTE(review): the if/else lines were missing from the text this was
        # restored from.  The condition (non-zero status, or nothing
        # transferred) is a reconstruction -- confirm against upstream.
        if ret or not bytes:
            self.transport.writeErr(ret, bytes)
        else:
            self.transport.writeDone(bytes)

    def initiateOp(self, handle, buffer):
        """Issue the overlapped write of ``buffer`` to ``handle``."""
        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))

class WSASendToOp(OverlappedOp):
    """Overlapped send of a buffer to an explicit address.

    ``initiateOp`` queues the send with ``reactor.issueWSASendTo`` and
    registers ``ovDone`` as its completion callback.
    """

    def ovDone(self, ret, bytes, arg):
        """Hand a finished send to ``transport.writeErr`` or ``writeDone``.

        ``arg`` is the ``(handle, buffer)`` tuple registered by
        ``initiateOp``; it is not needed here.
        """
        # NOTE(review): the if/else lines were missing from the text this was
        # restored from.  The condition (non-zero status, or nothing
        # transferred) is a reconstruction -- confirm against upstream.
        if ret or not bytes:
            self.transport.writeErr(ret, bytes)
        else:
            self.transport.writeDone(bytes)

    def initiateOp(self, handle, buffer, addr):
        """Issue the overlapped send of ``buffer`` to ``addr`` on ``handle``."""
        # Only the address family of the socket info is needed.
        _max_addr, family, _sock_type, _protocol = self.reactor.getsockinfo(handle)
        self.reactor.issueWSASendTo(handle, buffer, family, addr, self.ovDone, (handle, buffer))

class WSARecvFromOp(OverlappedOp):
    """Overlapped receive that also captures the sender's address.

    ``initiateOp`` allocates a separate 1024-byte buffer for the address,
    queues the receive with ``reactor.issueWSARecvFrom`` and registers
    ``ovDone`` as its completion callback.
    """

    def ovDone(self, ret, bytes, arg):
        """Hand a finished receive to ``transport.readErr`` or ``readDone``.

        ``arg`` is the ``(handle, buffer, ab)`` tuple registered by
        ``initiateOp``; on success ``ab`` is decoded with
        ``reactor.interpretAB`` and passed along with the byte count.
        """
        _handle, _buffer, ab = arg
        # NOTE(review): the if/else lines were missing from the text this was
        # restored from.  The condition (non-zero status, or nothing
        # transferred) is a reconstruction -- confirm against upstream.
        if ret or not bytes:
            self.transport.readErr(ret, bytes)
        else:
            self.transport.readDone(bytes, self.reactor.interpretAB(ab))

    def initiateOp(self, handle, buffer):
        """Issue the overlapped receive on ``handle`` into ``buffer``."""
        ab = self.reactor.AllocateReadBuffer(1024)
        self.reactor.issueWSARecvFrom(handle, buffer, ab, self.ovDone, (handle, buffer, ab))

class AcceptExOp(OverlappedOp):
    """Overlapped accept on a handle, creating the accepted socket up front.

    ``initiateOp`` creates a new socket of the same family/type/protocol as
    ``handle``, queues the accept with ``reactor.issueAcceptEx`` and
    registers ``ovDone`` as its completion callback.
    """

    def ovDone(self, ret, bytes, arg):
        """Finish an accept: retry, report an error, or deliver the socket.

        ``arg`` is the ``(handle, buffer, acc_sock)`` tuple registered by
        ``initiateOp``.
        """
        handle, _buffer, acc_sock = arg
        if ret in (ERROR_NETNAME_DELETED, ERROR_SEM_TIMEOUT):
            # These two codes are not reported to the transport; a fresh
            # accept is issued on the same handle instead.
            self.initiateOp(handle)
        # NOTE(review): the elif/else/try lines below were missing from the
        # text this was restored from (an ``except`` had no ``try``); the
        # structure is a reconstruction -- confirm against upstream.
        elif ret:
            self.transport.acceptErr(ret, bytes)
        else:
            try:
                # Apply SO_UPDATE_ACCEPT_CONTEXT to the accepted socket,
                # passing the accepting handle packed as an unsigned int.
                acc_sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, struct.pack("I", handle))
            except socket.error:
                self.transport.acceptErr(ret, bytes)
            else:
                self.transport.acceptDone(acc_sock, acc_sock.getpeername())

    def initiateOp(self, handle):
        """Create the socket to accept into and issue the overlapped accept."""
        max_addr, family, sock_type, protocol = self.reactor.getsockinfo(handle)
        acc_sock = socket.socket(family, sock_type, protocol)
        # Buffer sized as two addresses of max_addr bytes plus 32 bytes.
        buffer = self.reactor.AllocateReadBuffer(max_addr*2 + 32)
        self.reactor.issueAcceptEx(handle, acc_sock.fileno(), self.ovDone, (handle, buffer, acc_sock), buffer)

class ConnectExOp(OverlappedOp):
    """Outgoing connect, overlapped when available and threaded otherwise.

    ``initiateOp`` either queues the connect with ``reactor.issueConnectEx``
    (completion arrives in ``ovDone``) or runs a blocking ``connect_ex`` in
    a thread (completion arrives in ``threadedDone`` / ``threadedErr``).

    NOTE(review): the if/else/try lines in ``ovDone``, ``initiateOp`` and
    ``threadedThing`` were missing from the text this was restored from.
    The ``have_connectex`` switch and the ``if res:`` guard are
    reconstructions -- confirm against upstream.
    """

    def ovDone(self, ret, bytes, arg):
        """Report the result of an overlapped connect to the transport.

        ``arg`` is the ``(handle, sock)`` tuple registered by ``initiateOp``.
        """
        _handle, sock = arg
        if ret:
            # Windows code -> errno -> Twisted exception class, defaulting to
            # error.ConnectError.  TODO: finish the mapping in error.py
            exc_class = error.errnoMapping.get(winerrcodeMapping.get(ret), error.ConnectError)
            self.transport.connectErr(failure.Failure(exc_class()))
            return
        try:
            sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, "")
        except socket.error:
            self.transport.connectErr(failure.Failure(error.ConnectError()))
        else:
            # Success is reported only when the context update worked, so the
            # transport never receives connectErr followed by connectDone.
            self.transport.connectDone()

    def threadedDone(self, _):
        """Callback for the threaded connect: report success."""
        self.transport.connectDone()

    def threadedErr(self, err):
        """Errback for the threaded connect: forward ``err`` to the transport."""
        self.transport.connectErr(err)

    def initiateOp(self, sock, addr):
        """Start connecting ``sock`` to ``addr``."""
        handle = sock.fileno()
        if have_connectex:
            _max_addr, family, _sock_type, _protocol = self.reactor.getsockinfo(handle)
            self.reactor.issueConnectEx(handle, family, addr, self.ovDone, (handle, sock))
        else:
            from twisted.internet.threads import deferToThread
            d = deferToThread(self.threadedThing, sock, addr)
            d.addCallback(self.threadedDone)
            d.addErrback(self.threadedErr)

    def threadedThing(self, sock, addr):
        """Blocking connect run in a worker thread; raises on failure."""
        res = sock.connect_ex(addr)
        # connect_ex returns 0 on success and an error code otherwise.
        if res:
            raise error.getConnectError((res, os.strerror(res)))

## Define custom xxxOp classes to handle IO operations related
## to stdout/err/in for the process transport.
class ReadOutOp(OverlappedOp):
    """Overlapped read that delivers data to ``protocol.outReceived``.

    After each successful read the data is handed to the protocol and a new
    read is issued on the same handle and buffer.

    NOTE(review): the if/else/try/except lines were missing from the text
    this was restored from.  The branch condition and the ``except`` clauses
    that guard the "close all handles" sections are reconstructions --
    confirm against upstream.
    """

    def ovDone(self, ret, bytes, arg):
        """Handle a finished read; ``arg`` is ``(handle, buffer)``."""
        handle, buffer = arg
        if ret or not bytes:
            try:
                self.transport.outConnectionLost()
            except Exception as e:
                log.err(e)
                # Close all handles and proceed as normal,
                # waiting for process to exit.
                self.transport.closeStdout()
                self.transport.closeStderr()
                self.transport.closeStdin()
            return

        try:
            self.transport.protocol.outReceived(buffer[:bytes])
        except Exception as e:
            log.err(e)
            # Close all handles and proceed as normal,
            # waiting for process to exit.
            self.transport.closeStdout()
            self.transport.closeStderr()
            self.transport.closeStdin()

        # Keep reading.
        try:
            self.initiateOp(handle, buffer)
        except WindowsError as e:
            # Since Python 2.5 the Windows error code is in ``winerror`` and
            # ``errno`` holds a translated POSIX value; prefer ``winerror``.
            code = getattr(e, "winerror", None)
            if code is None:
                code = e.errno
            if code in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
                self.transport.outConnectionLost()
            else:
                raise

    def initiateOp(self, handle, buffer):
        """Issue the overlapped read of ``handle`` into ``buffer``."""
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))

class ReadErrOp(OverlappedOp):
    """Overlapped read that delivers data to ``protocol.errReceived``.

    After each successful read the data is handed to the protocol and a new
    read is issued on the same handle and buffer.

    NOTE(review): the if/else/try/except lines were missing from the text
    this was restored from.  The branch condition and the ``except`` clauses
    that guard the "close all handles" sections are reconstructions --
    confirm against upstream.
    """

    def ovDone(self, ret, bytes, arg):
        """Handle a finished read; ``arg`` is ``(handle, buffer)``."""
        handle, buffer = arg
        if ret or not bytes:
            try:
                self.transport.errConnectionLost()
            except Exception as e:
                log.err(e)
                # Close all handles and proceed as normal,
                # waiting for process to exit.
                self.transport.closeStdout()
                self.transport.closeStderr()
                self.transport.closeStdin()
            return

        try:
            self.transport.protocol.errReceived(buffer[:bytes])
        except Exception as e:
            log.err(e)
            # Close all handles and proceed as normal,
            # waiting for process to exit.
            self.transport.closeStdout()
            self.transport.closeStderr()
            self.transport.closeStdin()

        # Keep reading.
        try:
            self.initiateOp(handle, buffer)
        except WindowsError as e:
            # Since Python 2.5 the Windows error code is in ``winerror`` and
            # ``errno`` holds a translated POSIX value; prefer ``winerror``.
            code = getattr(e, "winerror", None)
            if code is None:
                code = e.errno
            if code in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
                self.transport.errConnectionLost()
            else:
                raise

    def initiateOp(self, handle, buffer):
        """Issue the overlapped read of ``handle`` into ``buffer``."""
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))

class WriteInOp(OverlappedOp):
    """Overlapped write whose result goes to ``inConnectionLost``/``writeDone``.

    NOTE(review): the if/else/try/except lines were missing from the text
    this was restored from.  The branch condition and the ``except`` clauses
    that guard the "close all handles" sections are reconstructions --
    confirm against upstream.
    """

    def ovDone(self, ret, bytes, arg):
        """Handle a finished write; ``arg`` is ``(handle, buffer)`` (unused)."""
        if ret or not bytes:
            try:
                self.transport.inConnectionLost()
            except Exception as e:
                log.err(e)
                # Close all handles and proceed as normal,
                # waiting for process to exit.
                self.transport.closeStdout()
                self.transport.closeStderr()
                self.transport.closeStdin()
        else:
            try:
                self.transport.writeDone(bytes)
            except Exception as e:
                log.err(e)
                # Close all handles and proceed as normal,
                # waiting for process to exit.
                self.transport.closeStdout()
                self.transport.closeStderr()
                self.transport.closeStdin()

    def initiateOp(self, handle, buffer):
        """Issue the overlapped write of ``buffer`` to ``handle``."""
        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))

class ReadInOp(OverlappedOp):
    """Stdin pipe will be opened in duplex mode.  The parent will read
    stdin to detect when the child closes it so we can close our end.

    NOTE(review): the docstring terminator and the if/else/try lines of
    ``ovDone`` were missing from the text this was restored from.  The
    branch condition is a reconstruction -- confirm against upstream.
    """

    def ovDone(self, ret, bytes, arg):
        """Handle a finished read; ``arg`` is ``(handle, buffer)``.

        A failed read is reported through ``transport.inConnectionLost``;
        otherwise another read is issued on the same handle and buffer.
        """
        handle, buffer = arg
        if ret:
            self.transport.inConnectionLost()
            return

        # Keep reading.
        try:
            self.initiateOp(handle, buffer)
        except WindowsError as e:
            # Since Python 2.5 the Windows error code is in ``winerror`` and
            # ``errno`` holds a translated POSIX value; prefer ``winerror``.
            code = getattr(e, "winerror", None)
            if code is None:
                code = e.errno
            if code in (ERROR_INVALID_HANDLE, ERROR_PIPE_ENDED):
                self.transport.inConnectionLost()
            else:
                raise

    def initiateOp(self, handle, buffer):
        """Issue the overlapped read of ``handle`` into ``buffer``."""
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))