1
# -*- test-case-name: twisted.test.test_tcp -*-
2
# Twisted, the Framework of Your Internet
3
# Copyright (C) 2001 Matthew W. Lefkowitz
5
# This library is free software; you can redistribute it and/or
6
# modify it under the terms of version 2.1 of the GNU Lesser General Public
7
# License as published by the Free Software Foundation.
9
# This library is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
# Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public
15
# License along with this library; if not, write to the Free Software
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
"""Various asynchronous TCP/IP classes.
21
End users shouldn't use this module directly - use the reactor APIs instead.
23
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
43
from OpenSSL import SSL
48
# we hardcode these since windows actually wants e.g.
49
# WSAEALREADY rather than EALREADY. Possibly we should
50
# just be doing "from errno import WSAEALREADY as EALREADY".
60
elif os.name != 'java':
61
from errno import EPERM
62
from errno import EINVAL
63
from errno import EWOULDBLOCK
64
from errno import EINPROGRESS
65
from errno import EALREADY
66
from errno import ECONNRESET
67
from errno import EISCONN
68
from errno import ENOTCONN
69
from errno import EINTR
70
from errno import EAGAIN
73
from twisted.internet import protocol, defer, base
74
from twisted.persisted import styles
75
from twisted.python import log, failure, reflect
76
from twisted.python.runtime import platform, platformType
77
from twisted.internet.error import CannotListenError
86
writeBlockedOnRead = 0
87
readBlockedOnWrite = 0
90
def getPeerCertificate(self):
91
return self.socket.get_peer_certificate()
94
if self.writeBlockedOnRead:
95
self.writeBlockedOnRead = 0
98
return Connection.doRead(self)
99
except SSL.ZeroReturnError:
100
# close SSL layer, since other side has done so, if we haven't
101
if not self.sslShutdown:
103
self.socket.shutdown()
107
print 'losting conn1'
108
return main.CONNECTION_DONE
109
except SSL.WantReadError:
111
except SSL.WantWriteError:
112
self.readBlockedOnWrite = 1
116
print 'losting conn2'
118
return main.CONNECTION_LOST
120
def loseConnection(self):
121
Connection.loseConnection(self)
126
if self.writeBlockedOnRead:
129
if self.readBlockedOnWrite:
130
self.readBlockedOnWrite = 0
131
# XXX - This is touching internal guts bad bad bad
132
if not self.dataBuffer:
135
return Connection.doWrite(self)
137
def writeSomeData(self, data):
139
return _BufferFlushBase.writeSomeData(self, data)
140
except SSL.WantWriteError:
142
except SSL.WantReadError:
143
self.writeBlockedOnRead = 1
145
except SSL.SysCallError, e:
146
if e[0] == -1 and data == "":
147
# errors when writing empty strings are expected
151
print 'losting conn3'
152
return main.CONNECTION_LOST
154
print 'losting conn4'
156
return main.CONNECTION_LOST
159
def write(self, data):
160
"""Reliably write some data.
162
If there is no buffered data this tries to write this data immediately,
163
otherwise this adds data to be written the next time this file descriptor is
166
assert isinstance(data, str), "Data must be a string."
167
if not self.connected or not data:
169
if (not self.dataBuffer) and (self.producer is None):
170
l = self.writeSomeData(data)
172
# all data was sent, our work here is done
174
elif not isinstance(l, Exception) and l > 0:
176
self.dataBuffer = data
179
# either no data was sent, or we were disconnected.
180
# if we were disconnected we still continue, so that
181
# the event loop can figure it out later on.
182
self.dataBuffer = data
184
self.dataBuffer = self.dataBuffer + data
185
if self.producer is not None:
186
if len(self.dataBuffer) > self.bufferSize:
187
self.producerPaused = 1
188
self.producer.pauseProducing()
191
def writeSequence(self, iovec):
192
self.write("".join(iovec))
194
def _closeSocket(self):
196
self.socket.sock_shutdown(2)
204
def _postLoseConnection(self):
205
"""Gets called after loseConnection(), after buffered data is sent.
207
We close the SSL transport layer, and if the other side hasn't
208
closed it yet we start reading, waiting for a ZeroReturnError
209
which will indicate the SSL shutdown has completed.
212
done = self.socket.shutdown()
216
return main.CONNECTION_LOST
218
return main.CONNECTION_DONE
220
# we wait for other side to close SSL connection -
221
# this will be signaled by SSL.ZeroReturnError when reading
226
# don't close socket just yet
229
class _BufferFlushBase(abstract.FileDescriptor):
230
def writeSomeData(self, data):
231
"""Connection.writeSomeData(data) -> #of bytes written | CONNECTION_LOST
232
This writes as much data as possible to the socket and returns either
233
the number of bytes read (which is positive) or a connection error code
237
return self.socket.send(data)
238
except socket.error, se:
239
if se.args[0] == EINTR:
240
return self.writeSomeData(data)
241
elif se.args[0] == EWOULDBLOCK:
244
return main.CONNECTION_LOST
246
def _flattenForSSL(self):
249
class _IOVecFlushBase(abstract.FileDescriptor):
250
def _flattenForSSL(self):
251
self.dataBuffer = ''.join(self.vector)
255
def writeVector(self, vector):
256
written, errno = iovec.writev(self.fileno(), vector)
259
return self.writeVector(vector)
260
elif errno == EWOULDBLOCK:
263
log.msg("writev() failed with errno = %d" % (errno,))
264
return main.CONNECTION_LOST
269
while i < L and w >= len(vector[i]):
274
vector[0] = vector[0][w:]
278
from twisted.python import iovec
280
_FlushBase = _BufferFlushBase
282
_FlushBase = _IOVecFlushBase
284
class Connection(_FlushBase):
285
"""I am the superclass of all socket-based FileDescriptors.
287
This is an abstract superclass of all objects which represent a TCP/IP
288
connection based socket.
291
__implements__ = abstract.FileDescriptor.__implements__, interfaces.ITCPTransport
295
def __init__(self, skt, protocol, reactor=None):
296
abstract.FileDescriptor.__init__(self, reactor=reactor)
298
self.socket.setblocking(0)
299
self.fileno = skt.fileno
300
self.protocol = protocol
303
__implements__ = __implements__ + (interfaces.ITLSTransport,)
305
def startTLS(self, ctx):
310
self.socket = SSL.Connection(ctx.getContext(), self.socket)
311
self.fileno = self.socket.fileno
316
class TLSConnection(_TLSMixin, _BufferFlushBase, self.__class__):
318
self._flattenForSSL()
319
self.__class__ = TLSConnection
322
"""Calls self.protocol.dataReceived with all available data.
324
This reads up to self.bufferSize bytes of data from its socket, then
325
calls self.dataReceived(data) to process it. If the connection is not
326
lost through an error in the physical recv(), this function will return
327
the result of the dataReceived call.
330
data = self.socket.recv(self.bufferSize)
331
except socket.error, se:
332
if se.args[0] == EWOULDBLOCK:
335
return main.CONNECTION_LOST
336
except SSL.SysCallError, (retval, desc):
337
# Yes, SSL might be None, but self.socket.recv() can *only*
338
# raise socket.error, if anything else is raised, it must be an
339
# SSL socket, and so SSL can't be None. (That's my story, I'm
341
if retval == -1 and desc == 'Unexpected EOF':
342
return main.CONNECTION_DONE
345
return main.CONNECTION_DONE
346
return self.protocol.dataReceived(data)
349
def _closeSocket(self):
350
"""Called to close our socket."""
351
# This used to close() the socket, but that doesn't *really* close if
352
# there's another reference to it in the TCP/IP stack, e.g. if it was
353
# was inherited by a subprocess. And we really do want to close the
354
# connection. So we use shutdown() instead.
356
self.socket.shutdown(2)
360
def connectionLost(self, reason):
361
"""See abstract.FileDescriptor.connectionLost().
363
abstract.FileDescriptor.connectionLost(self, reason)
365
protocol = self.protocol
370
protocol.connectionLost(reason)
372
# while this may break, it will only break on deprecated code
373
# as opposed to other approaches that might've broken on
374
# code that uses the new API (e.g. inspect).
375
if e.args and e.args[0] == "connectionLost() takes exactly 1 argument (2 given)":
376
warnings.warn("Protocol %s's connectionLost should accept a reason argument" % protocol,
377
category=DeprecationWarning, stacklevel=2)
378
protocol.connectionLost()
382
logstr = "Uninitialized"
385
"""Return the prefix to log with when I own the logging thread.
389
def getTcpNoDelay(self):
390
return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
392
def setTcpNoDelay(self, enabled):
393
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
395
def getTcpKeepAlive(self):
396
return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
397
socket.SO_KEEPALIVE))
399
def setTcpKeepAlive(self, enabled):
400
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
403
class BaseClient(Connection):
404
"""A base class for client TCP (and similiar) sockets.
406
addressFamily = socket.AF_INET
407
socketType = socket.SOCK_STREAM
409
def _finishInit(self, whenDone, skt, error, reactor):
410
"""Called by base classes to continue to next stage of initialization."""
412
Connection.__init__(self, skt, None, reactor)
413
self.doWrite = self.doConnect
414
self.doRead = self.doConnect
415
reactor.callLater(0, whenDone)
417
reactor.callLater(0, self.failIfNotConnected, error)
419
def startTLS(self, ctx, client=1):
420
holder = Connection.startTLS(self, ctx)
422
self.socket.set_connect_state()
424
self.socket.set_accept_state()
427
def stopConnecting(self):
428
"""Stop attempt to connect."""
429
self.failIfNotConnected(error.UserError())
431
def failIfNotConnected(self, err):
432
if (self.connected or
434
not (hasattr(self, "connector"))):
436
self.connector.connectionFailed(failure.Failure(err))
437
if hasattr(self, "reactor"):
438
# this doesn't happens if we failed in __init__
443
def createInternetSocket(self):
444
"""(internal) Create a non-blocking socket using
445
self.addressFamily, self.socketType.
447
s = socket.socket(self.addressFamily, self.socketType)
449
if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
450
old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD)
451
fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
455
def resolveAddress(self):
456
if abstract.isIPAddress(self.addr[0]):
457
self._setRealAddress(self.addr[0])
459
d = self.reactor.resolve(self.addr[0])
460
d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
462
def _setRealAddress(self, address):
463
self.realAddress = (address, self.addr[1])
467
"""I connect the socket.
469
Then, call the protocol's makeConnection, and start waiting for data.
471
if not hasattr(self, "connector"):
472
# this happens when connection failed but doConnect
473
# was scheduled via a callLater in self._finishInit
476
# on windows failed connects are reported on exception
477
# list, not write or read list.
478
if platformType == "win32":
479
r, w, e = select.select([], [], [self.fileno()], 0.0)
481
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
482
self.failIfNotConnected(error.getConnectError((err, os.strerror(err))))
486
connectResult = self.socket.connect_ex(self.realAddress)
487
except socket.error, se:
488
connectResult = se.args[0]
490
if connectResult == EISCONN:
492
# on Windows EINVAL means sometimes that we should keep trying:
493
# http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
494
elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
495
(connectResult == EINVAL and platformType == "win32")):
500
self.failIfNotConnected(error.getConnectError((connectResult, os.strerror(connectResult))))
502
# If I have reached this point without raising or returning, that means
503
# that the socket is connected.
506
# we first stop and then start, to reset any references to the old doRead
511
def _connectDone(self):
512
self.protocol = self.connector.buildProtocol(self.getPeer())
514
self.protocol.makeConnection(self)
515
self.logstr = self.protocol.__class__.__name__+",client"
518
def connectionLost(self, reason):
519
if not self.connected:
520
self.failIfNotConnected(error.ConnectError(string=reason))
522
Connection.connectionLost(self, reason)
523
self.connector.connectionLost(reason)
526
class Client(BaseClient):
529
def __init__(self, host, port, bindAddress, connector, reactor=None):
530
# BaseClient.__init__ is invoked later
531
self.connector = connector
532
self.addr = (host, port)
534
whenDone = self.resolveAddress
539
skt = self.createInternetSocket()
540
except socket.error, se:
541
err = error.ConnectBindError(se[0], se[1])
543
if whenDone and bindAddress is not None:
545
skt.bind(bindAddress)
546
except socket.error, se:
547
err = error.ConnectBindError(se[0], se[1])
549
self._finishInit(whenDone, skt, err, reactor)
552
"""Returns a tuple of ('INET', hostname, port).
554
This indicates the address from which I am connecting.
556
return ('INET',)+self.socket.getsockname()
559
"""Returns a tuple of ('INET', hostname, port).
561
This indicates the address that I am connected to.
563
return ('INET',)+self.addr
566
s = '<%s to %s at %x>' % (self.__class__, self.addr, id(self))
570
class Server(Connection):
571
"""Serverside socket-stream connection class.
573
I am a serverside network connection transport; a socket which came from an
574
accept() on a server.
577
def __init__(self, sock, protocol, client, server, sessionno):
578
"""Server(sock, protocol, client, server, sessionno)
580
Initialize me with a socket, a protocol, a descriptor for my peer (a
581
tuple of host, port describing the other end of the connection), an
582
instance of Port, and a session number.
584
Connection.__init__(self, sock, protocol)
587
self.sessionno = sessionno
588
self.hostname = client[0]
589
self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__, sessionno, self.hostname)
590
self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__, self.sessionno, self.server.port)
595
"""A string representation of this connection.
599
def startTLS(self, ctx, server=1):
600
holder = Connection.startTLS(self, ctx)
602
self.socket.set_accept_state()
604
self.socket.set_connect_state()
608
"""Returns a tuple of ('INET', hostname, port).
610
This indicates the servers address.
612
return ('INET',)+self.socket.getsockname()
616
Returns a tuple of ('INET', hostname, port), indicating the connected
619
return ('INET',)+self.client
622
class Port(base.BasePort):
623
"""I am a TCP server port, listening for connections.
625
When a connection is accepted, I will call my factory's buildProtocol with
626
the incoming connection as an argument, according to the specification
627
described in twisted.internet.interfaces.IProtocolFactory.
629
If you wish to change the sort of transport that will be used, my
630
`transport' attribute will be called with the signature expected for
631
Server.__init__, so it can be replaced.
633
addressFamily = socket.AF_INET
634
socketType = socket.SOCK_STREAM
641
def __init__(self, port, factory, backlog=5, interface='', reactor=None):
642
"""Initialize with a numeric port to listen on.
644
base.BasePort.__init__(self, reactor=reactor)
646
self.factory = factory
647
self.backlog = backlog
648
self.interface = interface
651
return "<%s on %s>" % (self.factory.__class__, self.port)
653
def createInternetSocket(self):
654
s = base.BasePort.createInternetSocket(self)
655
if platformType == "posix":
656
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
659
def startListening(self):
660
"""Create and bind my socket, and begin listening on it.
662
This is called on unserialization, and must be called after creating a
663
server to begin listening on the specified port.
665
log.msg("%s starting on %s"%(self.factory.__class__, self.port))
667
skt = self.createInternetSocket()
668
skt.bind((self.interface, self.port))
669
except socket.error, le:
670
raise CannotListenError, (self.interface, self.port, le)
671
self.factory.doStart()
672
skt.listen(self.backlog)
675
self.fileno = self.socket.fileno
676
self.numberAccepts = 100
680
"""Called when my socket is ready for reading.
682
This accepts a connection and calls self.protocol() to handle the
686
if platformType == "posix":
687
numAccepts = self.numberAccepts
689
# win32 event loop breaks if we do more than one accept()
690
# in an iteration of the event loop.
692
for i in range(numAccepts):
693
# we need this so we can deal with a factory's buildProtocol
694
# calling our loseConnection
695
if self.disconnecting:
698
skt, addr = self.socket.accept()
699
except socket.error, e:
700
if e.args[0] in (EWOULDBLOCK, EAGAIN):
701
self.numberAccepts = i
703
elif e.args[0] == EPERM:
707
protocol = self.factory.buildProtocol(addr)
713
transport = self.transport(skt, protocol, addr, self, s)
714
transport = self._preMakeConnection(transport)
715
protocol.makeConnection(transport)
717
self.numberAccepts = self.numberAccepts+20
719
# Note that in TLS mode, this will possibly catch SSL.Errors
720
# raised by self.socket.accept()
722
# There is no "except SSL.Error:" above because SSL may be
723
# None if there is no SSL support. In any case, all the
724
# "except SSL.Error:" suite would probably do is log.deferr()
725
# and return, so handling it here works just as well.
728
def _preMakeConnection(self, transport):
731
def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
732
"""Stop accepting connections on this port.
734
This will shut down my socket and call self.connectionLost().
735
It returns a deferred which will fire successfully when the
736
port is actually closed.
738
self.disconnecting = 1
741
self.deferred = defer.Deferred()
742
self.reactor.callLater(0, self.connectionLost, connDone)
745
stopListening = loseConnection
747
def connectionLost(self, reason):
748
"""Cleans up my socket.
750
log.msg('(Port %r Closed)' % self.port)
751
base.BasePort.connectionLost(self, reason)
756
self.factory.doStop()
757
if hasattr(self, "deferred"):
758
self.deferred.callback(None)
762
"""Returns the name of my class, to prefix log entries with.
764
return reflect.qual(self.factory.__class__)
767
"""Returns a tuple of ('INET', hostname, port).
769
This indicates the server's address.
771
return ('INET',)+self.socket.getsockname()
774
class Connector(base.BaseConnector):
775
def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
777
if isinstance(port, types.StringTypes):
779
port = socket.getservbyname(port, 'tcp')
780
except socket.error, e:
781
raise error.ServiceNameUnknownError(string=str(e))
783
self.bindAddress = bindAddress
784
base.BaseConnector.__init__(self, factory, timeout, reactor)
786
def _makeTransport(self):
787
return Client(self.host, self.port, self.bindAddress, self, self.reactor)
789
def getDestination(self):
790
return ('INET', self.host, self.port)