1
# Copyright (c) 2008-2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
TCP support for IOCP reactor
8
import socket, operator, errno, struct
10
from zope.interface import implements, directlyProvides
12
from twisted.internet import interfaces, error, address, main, defer
13
from twisted.internet.abstract import isIPAddress
14
from twisted.internet.tcp import _SocketCloser, Connector as TCPConnector
15
from twisted.persisted import styles
16
from twisted.python import log, failure, reflect, util
18
from twisted.internet.iocpreactor import iocpsupport as _iocp, abstract
19
from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
20
from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
21
from twisted.internet.iocpreactor.const import SO_UPDATE_CONNECT_CONTEXT
22
from twisted.internet.iocpreactor.const import SO_UPDATE_ACCEPT_CONTEXT
23
from twisted.internet.iocpreactor.const import ERROR_CONNECTION_REFUSED
24
from twisted.internet.iocpreactor.const import ERROR_NETWORK_UNREACHABLE
27
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
29
TLSMemoryBIOProtocol = TLSMemoryBIOFactory = None
32
_extraInterfaces = (interfaces.ITLSTransport,)
34
# ConnectEx returns these. XXX: find out what it does for timeout
36
ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED,
37
ERROR_NETWORK_UNREACHABLE: errno.WSAENETUNREACH,
41
class _BypassTLS(object):
43
L{_BypassTLS} is used as the transport object for the TLS protocol object
44
used to implement C{startTLS}. Its methods skip any TLS logic which
47
@ivar _connection: A L{Connection} which TLS has been started on which will
48
be proxied to by this object. Any method which has its behavior
49
altered after C{startTLS} will be skipped in favor of the base class's
50
implementation. This allows the TLS protocol object to have direct
51
access to the transport, necessary to actually implement TLS.
53
def __init__(self, connection):
54
self._connection = connection
57
def __getattr__(self, name):
58
return getattr(self._connection, name)
61
def write(self, data):
62
return abstract.FileHandle.write(self._connection, data)
65
def writeSequence(self, iovec):
66
return abstract.FileHandle.writeSequence(self._connection, iovec)
69
def loseConnection(self, reason=None):
70
return abstract.FileHandle.loseConnection(self._connection, reason)
74
class Connection(abstract.FileHandle, _SocketCloser):
76
@ivar _tls: C{False} to indicate the connection is in normal TCP mode,
77
C{True} to indicate that TLS has been started and that operations must
78
be routed through the L{TLSMemoryBIOProtocol} instance.
80
@ivar _tlsClientDefault: A flag which must be set by a subclass. If set to
81
C{True}, L{startTLS} will default to initiating SSL as a client. If
82
set to C{False}, L{startTLS} will default to initiating SSL as a
85
implements(IReadWriteHandle, interfaces.ITCPTransport,
86
interfaces.ISystemHandle, *_extraInterfaces)
90
def __init__(self, sock, proto, reactor=None):
91
abstract.FileHandle.__init__(self, reactor)
93
self.getFileHandle = sock.fileno
101
def dataReceived(self, rbuffer):
102
# XXX: some day, we'll have protocols that can handle raw buffers
103
self.protocol.dataReceived(str(rbuffer))
106
def readFromHandle(self, bufflist, evt):
107
return _iocp.recv(self.getFileHandle(), bufflist, evt)
110
def writeToHandle(self, buff, evt):
111
return _iocp.send(self.getFileHandle(), buff, evt)
114
def _closeWriteConnection(self):
116
getattr(self.socket, self._socketShutdownMethod)(1)
119
p = interfaces.IHalfCloseableProtocol(self.protocol, None)
122
p.writeConnectionLost()
124
f = failure.Failure()
126
self.connectionLost(f)
129
def readConnectionLost(self, reason):
130
p = interfaces.IHalfCloseableProtocol(self.protocol, None)
133
p.readConnectionLost()
136
self.connectionLost(failure.Failure())
138
self.connectionLost(reason)
141
def connectionLost(self, reason):
142
abstract.FileHandle.connectionLost(self, reason)
144
protocol = self.protocol
147
del self.getFileHandle
148
protocol.connectionLost(reason)
153
Return the prefix to log with when I own the logging thread.
158
def getTcpNoDelay(self):
159
return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP,
163
def setTcpNoDelay(self, enabled):
164
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
167
def getTcpKeepAlive(self):
168
return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
169
socket.SO_KEEPALIVE))
172
def setTcpKeepAlive(self, enabled):
173
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
176
if TLSMemoryBIOFactory is not None:
177
def startTLS(self, contextFactory, normal=True):
179
@see: L{ITLSTransport.startTLS}
181
# Figure out which direction the SSL goes in. If normal is True,
182
# we'll go in the direction indicated by the subclass. Otherwise,
183
# we'll go the other way (client = not normal ^ _tlsClientDefault,
186
client = self._tlsClientDefault
188
client = not self._tlsClientDefault
190
tlsFactory = TLSMemoryBIOFactory(contextFactory, client, None)
191
tlsProtocol = TLSMemoryBIOProtocol(tlsFactory, self.protocol, False)
192
self.protocol = tlsProtocol
194
self.getHandle = tlsProtocol.getHandle
195
self.getPeerCertificate = tlsProtocol.getPeerCertificate
197
# Mark the transport as secure.
198
directlyProvides(self, interfaces.ISSLTransport)
200
# Remember we did this so that write and writeSequence can send the
201
# data to the right place.
205
self.protocol.makeConnection(_BypassTLS(self))
208
def write(self, data):
210
Write some data, either directly to the underlying handle or, if TLS
211
has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
214
@see: L{ITCPTransport.write}
217
self.protocol.write(data)
219
abstract.FileHandle.write(self, data)
222
def writeSequence(self, iovec):
224
Write some data, either directly to the underlying handle or, if TLS
225
has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
228
@see: L{ITCPTransport.writeSequence}
231
self.protocol.writeSequence(iovec)
233
abstract.FileHandle.writeSequence(self, iovec)
236
def loseConnection(self, reason=None):
238
Close the underlying handle or, if TLS has been started, first shut it
241
@see: L{ITCPTransport.loseConnection}
244
if self.connected and not self.disconnecting:
245
self.protocol.loseConnection()
247
abstract.FileHandle.loseConnection(self, reason)
251
class Client(Connection):
252
addressFamily = socket.AF_INET
253
socketType = socket.SOCK_STREAM
255
_tlsClientDefault = True
257
def __init__(self, host, port, bindAddress, connector, reactor):
258
self.connector = connector
259
self.addr = (host, port)
260
self.reactor = reactor
261
# ConnectEx documentation says socket _has_ to be bound
262
if bindAddress is None:
263
bindAddress = ('', 0)
267
skt = reactor.createSocket(self.addressFamily, self.socketType)
268
except socket.error, se:
269
raise error.ConnectBindError(se[0], se[1])
272
skt.bind(bindAddress)
273
except socket.error, se:
274
raise error.ConnectBindError(se[0], se[1])
276
Connection.__init__(self, skt, None, reactor)
277
reactor.callLater(0, self.resolveAddress)
278
except error.ConnectBindError, err:
279
reactor.callLater(0, self.failIfNotConnected, err)
282
def resolveAddress(self):
283
if isIPAddress(self.addr[0]):
284
self._setRealAddress(self.addr[0])
286
d = self.reactor.resolve(self.addr[0])
287
d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
290
def _setRealAddress(self, address):
291
self.realAddress = (address, self.addr[1])
295
def failIfNotConnected(self, err):
296
if (self.connected or self.disconnected or
297
not hasattr(self, "connector")):
302
except AttributeError:
305
del self.socket, self.getFileHandle
306
self.reactor.removeActiveHandle(self)
308
self.connector.connectionFailed(failure.Failure(err))
312
def stopConnecting(self):
314
Stop attempt to connect.
316
self.failIfNotConnected(error.UserError())
319
def cbConnect(self, rc, bytes, evt):
321
rc = connectExErrors.get(rc, rc)
322
self.failIfNotConnected(error.getConnectError((rc,
323
errno.errorcode.get(rc, 'Unknown error'))))
325
self.socket.setsockopt(socket.SOL_SOCKET,
326
SO_UPDATE_CONNECT_CONTEXT,
327
struct.pack('I', self.socket.fileno()))
328
self.protocol = self.connector.buildProtocol(self.getPeer())
329
self.connected = True
330
self.logstr = self.protocol.__class__.__name__+",client"
331
self.protocol.makeConnection(self)
336
if not hasattr(self, "connector"):
337
# this happens if we connector.stopConnecting in
338
# factory.startedConnecting
340
assert _iocp.have_connectex
341
self.reactor.addActiveHandle(self)
342
evt = _iocp.Event(self.cbConnect, self)
344
rc = _iocp.connect(self.socket.fileno(), self.realAddress, evt)
345
if rc == ERROR_IO_PENDING:
349
self.cbConnect(rc, 0, 0, evt)
354
Returns an IPv4Address.
356
This indicates the address from which I am connecting.
358
return address.IPv4Address('TCP', *(self.socket.getsockname() +
364
Returns an IPv4Address.
366
This indicates the address that I am connected to.
368
return address.IPv4Address('TCP', *(self.realAddress + ('INET',)))
372
s = ('<%s to %s at %x>' %
373
(self.__class__, self.addr, util.unsignedID(self)))
377
def connectionLost(self, reason):
378
if not self.connected:
379
self.failIfNotConnected(error.ConnectError(string=reason))
381
Connection.connectionLost(self, reason)
382
self.connector.connectionLost(reason)
386
class Server(Connection):
388
Serverside socket-stream connection class.
390
I am a serverside network connection transport; a socket which came from an
391
accept() on a server.
394
_tlsClientDefault = False
397
def __init__(self, sock, protocol, clientAddr, serverAddr, sessionno, reactor):
399
Server(sock, protocol, client, server, sessionno)
401
Initialize me with a socket, a protocol, a descriptor for my peer (a
402
tuple of host, port describing the other end of the connection), an
403
instance of Port, and a session number.
405
Connection.__init__(self, sock, protocol, reactor)
406
self.serverAddr = serverAddr
407
self.clientAddr = clientAddr
408
self.sessionno = sessionno
409
self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,
410
sessionno, self.clientAddr.host)
411
self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
412
self.sessionno, self.serverAddr.port)
413
self.connected = True
419
A string representation of this connection.
426
Returns an IPv4Address.
428
This indicates the server's address.
430
return self.serverAddr
435
Returns an IPv4Address.
437
This indicates the client's address.
439
return self.clientAddr
443
class Connector(TCPConnector):
444
def _makeTransport(self):
445
return Client(self.host, self.port, self.bindAddress, self,
450
class Port(styles.Ephemeral, _SocketCloser):
451
implements(interfaces.IListeningPort)
455
disconnecting = False
456
addressFamily = socket.AF_INET
457
socketType = socket.SOCK_STREAM
463
# Actual port number being listened on, only set to a non-None
464
# value when we are actually listening.
465
_realPortNumber = None
468
def __init__(self, port, factory, backlog=50, interface='', reactor=None):
470
self.factory = factory
471
self.backlog = backlog
472
self.interface = interface
473
self.reactor = reactor
477
if self._realPortNumber is not None:
478
return "<%s of %s on %s>" % (self.__class__,
479
self.factory.__class__,
480
self._realPortNumber)
482
return "<%s of %s (not listening)>" % (self.__class__,
483
self.factory.__class__)
486
def startListening(self):
488
skt = self.reactor.createSocket(self.addressFamily,
490
# TODO: resolve self.interface if necessary
491
skt.bind((self.interface, self.port))
492
except socket.error, le:
493
raise error.CannotListenError, (self.interface, self.port, le)
495
self.addrLen = _iocp.maxAddrLen(skt.fileno())
497
# Make sure that if we listened on port 0, we update that to
498
# reflect what the OS actually assigned us.
499
self._realPortNumber = skt.getsockname()[1]
501
log.msg("%s starting on %s" % (self.factory.__class__,
502
self._realPortNumber))
504
self.factory.doStart()
505
skt.listen(self.backlog)
506
self.connected = True
507
self.disconnected = False
508
self.reactor.addActiveHandle(self)
510
self.getFileHandle = self.socket.fileno
514
def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
516
Stop accepting connections on this port.
518
This will shut down my socket and call self.connectionLost().
519
It returns a deferred which will fire successfully when the
520
port is actually closed.
522
self.disconnecting = True
524
self.deferred = defer.Deferred()
525
self.reactor.callLater(0, self.connectionLost, connDone)
528
stopListening = loseConnection
531
def connectionLost(self, reason):
533
Cleans up the socket.
535
log.msg('(Port %s Closed)' % self._realPortNumber)
536
self._realPortNumber = None
538
if hasattr(self, "deferred"):
542
self.disconnected = True
543
self.reactor.removeActiveHandle(self)
544
self.connected = False
547
del self.getFileHandle
550
self.factory.doStop()
552
self.disconnecting = False
554
d.errback(failure.Failure())
558
self.disconnecting = False
565
Returns the name of my class, to prefix log entries with.
567
return reflect.qual(self.factory.__class__)
572
Returns an IPv4Address.
574
This indicates the server's address.
576
return address.IPv4Address('TCP', *(self.socket.getsockname() +
580
def cbAccept(self, rc, bytes, evt):
581
self.handleAccept(rc, evt)
582
if not (self.disconnecting or self.disconnected):
586
def handleAccept(self, rc, evt):
587
if self.disconnecting or self.disconnected:
591
# (WSAEMFILE, WSAENOBUFS, WSAENFILE, WSAENOMEM, WSAECONNABORTED)
593
log.msg("Could not accept new connection -- %s (%s)" %
594
(errno.errorcode.get(rc, 'unknown error'), rc))
597
evt.newskt.setsockopt(socket.SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
598
struct.pack('I', self.socket.fileno()))
599
family, lAddr, rAddr = _iocp.get_accept_addrs(evt.newskt.fileno(),
601
assert family == self.addressFamily
603
protocol = self.factory.buildProtocol(
604
address._ServerFactoryIPv4Address('TCP', rAddr[0], rAddr[1]))
610
transport = Server(evt.newskt, protocol,
611
address.IPv4Address('TCP', rAddr[0], rAddr[1], 'INET'),
612
address.IPv4Address('TCP', lAddr[0], lAddr[1], 'INET'),
614
protocol.makeConnection(transport)
621
evt = _iocp.Event(self.cbAccept, self)
623
# see AcceptEx documentation
624
evt.buff = buff = _iocp.AllocateReadBuffer(2 * (self.addrLen + 16))
626
evt.newskt = newskt = self.reactor.createSocket(self.addressFamily,
628
rc = _iocp.accept(self.socket.fileno(), newskt.fileno(), buff, evt)
630
if (rc == ERROR_IO_PENDING
631
or (not rc and numAccepts >= self.maxAccepts)):
635
if not self.handleAccept(rc, evt):