1
# -*- test-case-name: twisted.test.test_tcp -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
"""Various asynchronous TCP/IP classes.
9
End users shouldn't use this module directly - use the reactor APIs instead.
11
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
30
from zope.interface import implements, classImplements
33
from OpenSSL import SSL
37
from twisted.python.runtime import platform, platformType
40
if platformType == 'win32':
41
# no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
43
from errno import WSAEINVAL as EINVAL
44
from errno import WSAEWOULDBLOCK as EWOULDBLOCK
45
from errno import WSAEINPROGRESS as EINPROGRESS
46
from errno import WSAEALREADY as EALREADY
47
from errno import WSAECONNRESET as ECONNRESET
48
from errno import WSAEISCONN as EISCONN
49
from errno import WSAENOTCONN as ENOTCONN
50
from errno import WSAEINTR as EINTR
51
from errno import WSAENOBUFS as ENOBUFS
52
from errno import WSAEMFILE as EMFILE
53
# No such thing as WSAENFILE, either.
58
from errno import WSAECONNRESET as ECONNABORTED
60
from errno import EPERM
61
from errno import EINVAL
62
from errno import EWOULDBLOCK
63
from errno import EINPROGRESS
64
from errno import EALREADY
65
from errno import ECONNRESET
66
from errno import EISCONN
67
from errno import ENOTCONN
68
from errno import EINTR
69
from errno import ENOBUFS
70
from errno import EMFILE
71
from errno import ENFILE
72
from errno import ENOMEM
73
from errno import EAGAIN
74
from errno import ECONNABORTED
76
from errno import errorcode
79
from twisted.internet import protocol, defer, base, address
80
from twisted.persisted import styles
81
from twisted.python import log, failure, reflect
82
from twisted.python.util import unsignedID
83
from twisted.internet.error import CannotListenError
92
_socketShutdownMethod = 'shutdown'
94
def _closeSocket(self):
95
# socket.close() doesn't *really* close if there's another reference
96
# to it in the TCP/IP stack, e.g. if it was inherited by a
97
# subprocess. And we really do want to close the connection. So we
98
# use shutdown() instead, and then close() in order to release the
102
getattr(skt, self._socketShutdownMethod)(2)
111
_socketShutdownMethod = 'sock_shutdown'
113
writeBlockedOnRead = 0
114
readBlockedOnWrite = 0
115
_userWantRead = _userWantWrite = True
117
def getPeerCertificate(self):
    """Return whatever C{get_peer_certificate()} reports on this
    transport's socket."""
    tlsSocket = self.socket
    return tlsSocket.get_peer_certificate()
121
if self.writeBlockedOnRead:
122
self.writeBlockedOnRead = 0
123
self._resetReadWrite()
125
return Connection.doRead(self)
126
except SSL.ZeroReturnError:
127
return main.CONNECTION_DONE
128
except SSL.WantReadError:
130
except SSL.WantWriteError:
131
self.readBlockedOnWrite = 1
132
Connection.startWriting(self)
133
Connection.stopReading(self)
135
except SSL.SysCallError, (retval, desc):
136
if ((retval == -1 and desc == 'Unexpected EOF')
138
return main.CONNECTION_LOST
140
return main.CONNECTION_LOST
145
# Retry disconnecting
146
if self.disconnected:
147
return self._postLoseConnection()
148
if self._writeDisconnected:
149
return self._closeWriteConnection()
151
if self.readBlockedOnWrite:
152
self.readBlockedOnWrite = 0
153
self._resetReadWrite()
154
return Connection.doWrite(self)
156
def writeSomeData(self, data):
158
return Connection.writeSomeData(self, data)
159
except SSL.WantWriteError:
161
except SSL.WantReadError:
162
self.writeBlockedOnRead = 1
163
Connection.stopWriting(self)
164
Connection.startReading(self)
166
except SSL.ZeroReturnError:
167
return main.CONNECTION_LOST
168
except SSL.SysCallError, e:
169
if e[0] == -1 and data == "":
170
# errors when writing empty strings are expected
174
return main.CONNECTION_LOST
178
def _postLoseConnection(self):
179
"""Gets called after loseConnection(), after buffered data is sent.
181
We try to send an SSL shutdown alert, but if it doesn't work, retry
182
when the socket is writable.
185
if hasattr(self.socket, 'set_shutdown'):
186
self.socket.set_shutdown(SSL.RECEIVED_SHUTDOWN)
187
return self._sendCloseAlert()
190
def _sendCloseAlert(self):
191
# Okay, *THIS* is a bit complicated.
193
# Basically, the issue is, OpenSSL seems to not actually return
194
# errors from SSL_shutdown. Therefore, the only way to
195
# determine if the close notification has been sent is by
196
# SSL_shutdown returning "done". However, it will not claim it's
197
# done until it's both sent *and* received a shutdown notification.
199
# I don't actually want to wait for a received shutdown
200
# notification, though, so, I have to set RECEIVED_SHUTDOWN
201
# before calling shutdown. Then, it'll return True once it's
202
# *SENT* the shutdown.
204
# However, RECEIVED_SHUTDOWN can't be left set, because then
205
# reads will fail, breaking half close.
207
# Also, since shutdown doesn't report errors, an empty write call is
208
# done first, to try to detect if the connection has gone away.
209
# (*NOT* an SSL_write call, because that fails once you've called
212
os.write(self.socket.fileno(), '')
214
if se.args[0] in (EINTR, EWOULDBLOCK, ENOBUFS):
216
# Write error, socket gone
217
return main.CONNECTION_LOST
220
if hasattr(self.socket, 'set_shutdown'):
221
laststate = self.socket.get_shutdown()
222
self.socket.set_shutdown(laststate | SSL.RECEIVED_SHUTDOWN)
223
done = self.socket.shutdown()
224
if not (laststate & SSL.RECEIVED_SHUTDOWN):
225
self.socket.set_shutdown(SSL.SENT_SHUTDOWN)
227
#warnings.warn("SSL connection shutdown possibly unreliable, "
228
# "please upgrade to ver 0.XX", category=UserWarning)
229
self.socket.shutdown()
236
# Note that this is tested for by identity below.
237
return main.CONNECTION_DONE
242
def _closeWriteConnection(self):
243
result = self._sendCloseAlert()
245
if result is main.CONNECTION_DONE:
246
return Connection._closeWriteConnection(self)
250
def startReading(self):
    """Record that the user wants to read, and start reading unless
    C{readBlockedOnWrite} is currently set."""
    self._userWantRead = True
    if self.readBlockedOnWrite:
        # Only the user's wish is recorded while a read is blocked on a write.
        return None
    return Connection.startReading(self)
255
def stopReading(self):
    """Record that the user no longer wants to read, and stop reading
    unless C{writeBlockedOnRead} is currently set."""
    self._userWantRead = False
    if self.writeBlockedOnRead:
        # A pending write needs the socket to stay readable; just remember
        # the user's wish.
        return None
    return Connection.stopReading(self)
260
def startWriting(self):
    """Record that the user wants to write, and start writing unless
    C{writeBlockedOnRead} is currently set."""
    self._userWantWrite = True
    if self.writeBlockedOnRead:
        # Only the user's wish is recorded while a write is blocked on a read.
        return None
    return Connection.startWriting(self)
265
def stopWriting(self):
    """Record that the user no longer wants to write, and stop writing
    unless C{readBlockedOnWrite} is currently set."""
    self._userWantWrite = False
    if self.readBlockedOnWrite:
        # A pending read needs the socket to stay writable; just remember
        # the user's wish.
        return None
    return Connection.stopWriting(self)
270
def _resetReadWrite(self):
271
# After changing readBlockedOnWrite or writeBlockedOnRead,
272
# call this to reset the state to what the user requested.
273
if self._userWantWrite:
278
if self._userWantRead:
283
def _getTLSClass(klass, _existing={}):
284
if klass not in _existing:
285
class TLSConnection(_TLSMixin, klass):
286
implements(interfaces.ISSLTransport)
287
_existing[klass] = TLSConnection
288
return _existing[klass]
290
class Connection(abstract.FileDescriptor, _SocketCloser):
291
"""I am the superclass of all socket-based FileDescriptors.
293
This is an abstract superclass of all objects which represent a TCP/IP
294
connection based socket.
297
implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
301
def __init__(self, skt, protocol, reactor=None):
302
abstract.FileDescriptor.__init__(self, reactor=reactor)
304
self.socket.setblocking(0)
305
self.fileno = skt.fileno
306
self.protocol = protocol
310
def startTLS(self, ctx):
313
if self.dataBuffer or self._tempDataBuffer:
314
self.dataBuffer += "".join(self._tempDataBuffer)
315
self._tempDataBuffer = []
316
self._tempDataLen = 0
317
written = self.writeSomeData(buffer(self.dataBuffer, self.offset))
319
dataLen = len(self.dataBuffer)
322
if isinstance(written, Exception) or (offset + written != dataLen):
329
self.socket = SSL.Connection(ctx.getContext(), self.socket)
330
self.fileno = self.socket.fileno
333
warnings.warn("startTLS with unwritten buffered data currently doesn't work right. See issue #686. Closing connection.", category=RuntimeWarning, stacklevel=2)
334
self.loseConnection()
339
self.__class__ = _getTLSClass(self.__class__)
342
"""Return the socket for this connection."""
346
"""Calls self.protocol.dataReceived with all available data.
348
This reads up to self.bufferSize bytes of data from its socket, then
349
calls self.dataReceived(data) to process it. If the connection is not
350
lost through an error in the physical recv(), this function will return
351
the result of the dataReceived call.
354
data = self.socket.recv(self.bufferSize)
355
except socket.error, se:
356
if se.args[0] == EWOULDBLOCK:
359
return main.CONNECTION_LOST
361
return main.CONNECTION_DONE
362
return self.protocol.dataReceived(data)
364
def writeSomeData(self, data):
365
"""Connection.writeSomeData(data) -> #of bytes written | CONNECTION_LOST
366
This writes as much data as possible to the socket and returns either
367
the number of bytes written (which is positive) or a connection error code
371
# Limit length of buffer to try to send, because some OSes are too
372
# stupid to do so themselves (ahem windows)
373
return self.socket.send(buffer(data, 0, self.SEND_LIMIT))
374
except socket.error, se:
375
if se.args[0] == EINTR:
376
return self.writeSomeData(data)
377
elif se.args[0] in (EWOULDBLOCK, ENOBUFS):
380
return main.CONNECTION_LOST
382
def _closeWriteConnection(self):
384
getattr(self.socket, self._socketShutdownMethod)(1)
387
p = interfaces.IHalfCloseableProtocol(self.protocol, None)
390
p.writeConnectionLost()
392
f = failure.Failure()
394
self.connectionLost(f)
396
def readConnectionLost(self, reason):
397
p = interfaces.IHalfCloseableProtocol(self.protocol, None)
400
p.readConnectionLost()
403
self.connectionLost(failure.Failure())
405
self.connectionLost(reason)
407
def connectionLost(self, reason):
408
"""See abstract.FileDescriptor.connectionLost().
410
abstract.FileDescriptor.connectionLost(self, reason)
412
protocol = self.protocol
416
protocol.connectionLost(reason)
418
logstr = "Uninitialized"
421
"""Return the prefix to log with when I own the logging thread.
425
def getTcpNoDelay(self):
    """Return the truth value of the socket's C{TCP_NODELAY} option."""
    optionValue = self.socket.getsockopt(socket.IPPROTO_TCP,
                                         socket.TCP_NODELAY)
    return operator.truth(optionValue)
428
def setTcpNoDelay(self, enabled):
    """Set the socket's C{TCP_NODELAY} option to C{enabled}."""
    skt = self.socket
    skt.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
431
def getTcpKeepAlive(self):
    """Return the truth value of the socket's C{SO_KEEPALIVE} option."""
    optionValue = self.socket.getsockopt(socket.SOL_SOCKET,
                                         socket.SO_KEEPALIVE)
    return operator.truth(optionValue)
435
def setTcpKeepAlive(self, enabled):
    """Set the socket's C{SO_KEEPALIVE} option to C{enabled}."""
    skt = self.socket
    skt.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
439
classImplements(Connection, interfaces.ITLSTransport)
441
class BaseClient(Connection):
442
"""A base class for client TCP (and similar) sockets.
444
addressFamily = socket.AF_INET
445
socketType = socket.SOCK_STREAM
447
def _finishInit(self, whenDone, skt, error, reactor):
448
"""Called by base classes to continue to next stage of initialization."""
450
Connection.__init__(self, skt, None, reactor)
451
self.doWrite = self.doConnect
452
self.doRead = self.doConnect
453
reactor.callLater(0, whenDone)
455
reactor.callLater(0, self.failIfNotConnected, error)
457
def startTLS(self, ctx, client=1):
458
holder = Connection.startTLS(self, ctx)
460
self.socket.set_connect_state()
462
self.socket.set_accept_state()
465
def stopConnecting(self):
    """Stop the attempt to connect.

    Passes a fresh C{error.UserError} to C{failIfNotConnected}.
    """
    abortReason = error.UserError()
    self.failIfNotConnected(abortReason)
469
def failIfNotConnected(self, err):
471
Generic method called when the attempts to connect failed. It basically
472
cleans everything it can: call connectionFailed, stop read and write,
473
delete socket related members.
475
if (self.connected or self.disconnected or
476
not hasattr(self, "connector")):
479
self.connector.connectionFailed(failure.Failure(err))
480
if hasattr(self, "reactor"):
481
# this doesn't happen if we failed in __init__
488
except AttributeError:
491
del self.socket, self.fileno
493
def createInternetSocket(self):
494
"""(internal) Create a non-blocking socket using
495
self.addressFamily, self.socketType.
497
s = socket.socket(self.addressFamily, self.socketType)
499
if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
500
old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD)
501
fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
504
def resolveAddress(self):
505
if abstract.isIPAddress(self.addr[0]):
506
self._setRealAddress(self.addr[0])
508
d = self.reactor.resolve(self.addr[0])
509
d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
511
def _setRealAddress(self, address):
512
self.realAddress = (address, self.addr[1])
516
"""I connect the socket.
518
Then, call the protocol's makeConnection, and start waiting for data.
520
if not hasattr(self, "connector"):
521
# this happens when connection failed but doConnect
522
# was scheduled via a callLater in self._finishInit
525
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
527
self.failIfNotConnected(error.getConnectError((err, os.strerror(err))))
531
# doConnect gets called twice. The first time we actually need to
532
# start the connection attempt. The second time we don't really
533
# want to (SO_ERROR above will have taken care of any errors, and if
534
# it reported none, the mere fact that doConnect was called again is
535
# sufficient to indicate that the connection has succeeded), but it
536
# is not /particularly/ detrimental to do so. This should get
537
# cleaned up some day, though.
539
connectResult = self.socket.connect_ex(self.realAddress)
540
except socket.error, se:
541
connectResult = se.args[0]
543
if connectResult == EISCONN:
545
# on Windows EINVAL means sometimes that we should keep trying:
546
# http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
547
elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
548
(connectResult == EINVAL and platformType == "win32")):
553
self.failIfNotConnected(error.getConnectError((connectResult, os.strerror(connectResult))))
556
# If I have reached this point without raising or returning, that means
557
# that the socket is connected.
560
# we first stop and then start, to reset any references to the old doRead
565
def _connectDone(self):
566
self.protocol = self.connector.buildProtocol(self.getPeer())
568
self.protocol.makeConnection(self)
569
self.logstr = self.protocol.__class__.__name__+",client"
572
def connectionLost(self, reason):
573
if not self.connected:
574
self.failIfNotConnected(error.ConnectError(string=reason))
576
Connection.connectionLost(self, reason)
577
self.connector.connectionLost(reason)
580
class Client(BaseClient):
583
def __init__(self, host, port, bindAddress, connector, reactor=None):
584
# BaseClient.__init__ is invoked later
585
self.connector = connector
586
self.addr = (host, port)
588
whenDone = self.resolveAddress
593
skt = self.createInternetSocket()
594
except socket.error, se:
595
err = error.ConnectBindError(se[0], se[1])
597
if whenDone and bindAddress is not None:
599
skt.bind(bindAddress)
600
except socket.error, se:
601
err = error.ConnectBindError(se[0], se[1])
603
self._finishInit(whenDone, skt, err, reactor)
606
"""Returns an IPv4Address.
608
This indicates the address from which I am connecting.
610
return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
613
"""Returns an IPv4Address.
615
This indicates the address that I am connected to.
617
return address.IPv4Address('TCP', *(self.addr + ('INET',)))
620
s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
624
class Server(Connection):
625
"""Serverside socket-stream connection class.
627
I am a serverside network connection transport; a socket which came from an
628
accept() on a server.
631
def __init__(self, sock, protocol, client, server, sessionno):
632
"""Server(sock, protocol, client, server, sessionno)
634
Initialize me with a socket, a protocol, a descriptor for my peer (a
635
tuple of host, port describing the other end of the connection), an
636
instance of Port, and a session number.
638
Connection.__init__(self, sock, protocol)
641
self.sessionno = sessionno
642
self.hostname = client[0]
643
self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__, sessionno, self.hostname)
644
self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__, self.sessionno, self.server.port)
649
"""A string representation of this connection.
653
def startTLS(self, ctx, server=1):
654
holder = Connection.startTLS(self, ctx)
656
self.socket.set_accept_state()
658
self.socket.set_connect_state()
662
"""Returns an IPv4Address.
664
This indicates the server's address.
666
return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
669
"""Returns an IPv4Address.
671
This indicates the client's address.
673
return address.IPv4Address('TCP', *(self.client + ('INET',)))
675
class Port(base.BasePort, _SocketCloser):
676
"""I am a TCP server port, listening for connections.
678
When a connection is accepted, I will call my factory's buildProtocol with
679
the incoming connection as an argument, according to the specification
680
described in twisted.internet.interfaces.IProtocolFactory.
682
If you wish to change the sort of transport that will be used, my
683
`transport' attribute will be called with the signature expected for
684
Server.__init__, so it can be replaced.
687
implements(interfaces.IListeningPort)
689
addressFamily = socket.AF_INET
690
socketType = socket.SOCK_STREAM
697
# Actual port number being listened on, only set to a non-None
698
# value when we are actually listening.
699
_realPortNumber = None
701
def __init__(self, port, factory, backlog=50, interface='', reactor=None):
702
"""Initialize with a numeric port to listen on.
704
base.BasePort.__init__(self, reactor=reactor)
706
self.factory = factory
707
self.backlog = backlog
708
self.interface = interface
711
if self._realPortNumber is not None:
712
return "<%s of %s on %s>" % (self.__class__, self.factory.__class__,
713
self._realPortNumber)
715
return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
717
def createInternetSocket(self):
718
s = base.BasePort.createInternetSocket(self)
719
if platformType == "posix" and sys.platform != "cygwin":
720
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
723
def startListening(self):
724
"""Create and bind my socket, and begin listening on it.
726
This is called on unserialization, and must be called after creating a
727
server to begin listening on the specified port.
730
skt = self.createInternetSocket()
731
skt.bind((self.interface, self.port))
732
except socket.error, le:
733
raise CannotListenError, (self.interface, self.port, le)
735
# Make sure that if we listened on port 0, we update that to
736
# reflect what the OS actually assigned us.
737
self._realPortNumber = skt.getsockname()[1]
739
log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
741
# The order of the next 6 lines is kind of bizarre. If no one
742
# can explain it, perhaps we should re-arrange them.
743
self.factory.doStart()
744
skt.listen(self.backlog)
747
self.fileno = self.socket.fileno
748
self.numberAccepts = 100
752
def _buildAddr(self, addr):
    """Build the address object for an accepted connection.

    @param addr: a C{(host, port)} pair.

    @return: the result of C{address._ServerFactoryIPv4Address('TCP', host,
        port)}.
    """
    # Unpack explicitly rather than in the signature: tuple parameters hide
    # the argument from introspection and are not valid syntax in newer
    # Python versions.  Positional callers are unaffected.
    host, port = addr
    return address._ServerFactoryIPv4Address('TCP', host, port)
756
"""Called when my socket is ready for reading.
758
This accepts a connection and calls self.protocol() to handle the
762
if platformType == "posix":
763
numAccepts = self.numberAccepts
765
# win32 event loop breaks if we do more than one accept()
766
# in an iteration of the event loop.
768
for i in range(numAccepts):
769
# we need this so we can deal with a factory's buildProtocol
770
# calling our loseConnection
771
if self.disconnecting:
774
skt, addr = self.socket.accept()
775
except socket.error, e:
776
if e.args[0] in (EWOULDBLOCK, EAGAIN):
777
self.numberAccepts = i
779
elif e.args[0] == EPERM:
780
# Netfilter on Linux may have rejected the
781
# connection, but we get told to try to accept()
784
elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
786
# Linux gives EMFILE when a process is not allowed
787
# to allocate any more file descriptors. *BSD and
788
# Win32 give (WSA)ENOBUFS. Linux can also give
789
# ENFILE if the system is out of inodes, or ENOMEM
790
# if there is insufficient memory to allocate a new
791
# dentry. ECONNABORTED is documented as possible on
792
# both Linux and Windows, but it is not clear
793
# whether there are actually any circumstances under
794
# which it can happen (one might expect it to be
795
# possible if a client sends a FIN or RST after the
796
# server sends a SYN|ACK but before application code
797
# calls accept(2), however at least on Linux this
798
# _seems_ to be short-circuited by syncookies.
800
log.msg("Could not accept new connection (%s)" % (
801
errorcode[e.args[0]],))
805
protocol = self.factory.buildProtocol(self._buildAddr(addr))
811
transport = self.transport(skt, protocol, addr, self, s)
812
transport = self._preMakeConnection(transport)
813
protocol.makeConnection(transport)
815
self.numberAccepts = self.numberAccepts+20
817
# Note that in TLS mode, this will possibly catch SSL.Errors
818
# raised by self.socket.accept()
820
# There is no "except SSL.Error:" above because SSL may be
821
# None if there is no SSL support. In any case, all the
822
# "except SSL.Error:" suite would probably do is log.deferr()
823
# and return, so handling it here works just as well.
826
def _preMakeConnection(self, transport):
829
def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
830
"""Stop accepting connections on this port.
832
This will shut down my socket and call self.connectionLost().
833
It returns a deferred which will fire successfully when the
834
port is actually closed.
836
self.disconnecting = 1
839
self.deferred = defer.Deferred()
840
self.reactor.callLater(0, self.connectionLost, connDone)
843
stopListening = loseConnection
845
def connectionLost(self, reason):
846
"""Cleans up my socket.
848
log.msg('(Port %s Closed)' % self._realPortNumber)
849
self._realPortNumber = None
850
base.BasePort.connectionLost(self, reason)
855
self.factory.doStop()
856
if hasattr(self, "deferred"):
857
self.deferred.callback(None)
861
"""Returns the name of my class, to prefix log entries with.
863
return reflect.qual(self.factory.__class__)
866
"""Returns an IPv4Address.
868
This indicates the server's address.
870
return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
872
class Connector(base.BaseConnector):
873
def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
875
if isinstance(port, types.StringTypes):
877
port = socket.getservbyname(port, 'tcp')
878
except socket.error, e:
879
raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
881
self.bindAddress = bindAddress
882
base.BaseConnector.__init__(self, factory, timeout, reactor)
884
def _makeTransport(self):
    """Create the C{Client} transport for this connector, handing it the
    connector's host, port, bind address, the connector itself and its
    reactor."""
    transport = Client(self.host, self.port, self.bindAddress,
                       self, self.reactor)
    return transport
887
def getDestination(self):
    """Return an C{address.IPv4Address} ('TCP', 'INET') built from this
    connector's host and port."""
    host, port = self.host, self.port
    return address.IPv4Address('TCP', host, port, 'INET')