~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/internet/tcp.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.test.test_tcp -*-
2
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
 
# See LICENSE for details.
4
 
 
5
 
 
6
 
 
7
 
"""Various asynchronous TCP/IP classes.
8
 
 
9
 
End users shouldn't use this module directly - use the reactor APIs instead.
10
 
 
11
 
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
12
 
"""
13
 
 
14
 
 
15
 
# System Imports
16
 
import os
17
 
import stat
18
 
import types
19
 
import exceptions
20
 
import socket
21
 
import sys
22
 
import select
23
 
import operator
24
 
import warnings
25
 
 
26
 
try:
27
 
    import fcntl
28
 
except ImportError:
29
 
    fcntl = None
30
 
from zope.interface import implements, classImplements
31
 
 
32
 
try:
33
 
    from OpenSSL import SSL
34
 
except ImportError:
35
 
    SSL = None
36
 
 
37
 
from twisted.python.runtime import platform, platformType
38
 
 
39
 
 
40
 
if platformType == 'win32':
41
 
    # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
42
 
    EPERM = object()
43
 
    from errno import WSAEINVAL as EINVAL
44
 
    from errno import WSAEWOULDBLOCK as EWOULDBLOCK
45
 
    from errno import WSAEINPROGRESS as EINPROGRESS
46
 
    from errno import WSAEALREADY as EALREADY
47
 
    from errno import WSAECONNRESET as ECONNRESET
48
 
    from errno import WSAEISCONN as EISCONN
49
 
    from errno import WSAENOTCONN as ENOTCONN
50
 
    from errno import WSAEINTR as EINTR
51
 
    from errno import WSAENOBUFS as ENOBUFS
52
 
    from errno import WSAEMFILE as EMFILE
53
 
    # No such thing as WSAENFILE, either.
54
 
    ENFILE = object()
55
 
    # Nor ENOMEM
56
 
    ENOMEM = object()
57
 
    EAGAIN = EWOULDBLOCK
58
 
    from errno import WSAECONNRESET as ECONNABORTED
59
 
else:
60
 
    from errno import EPERM
61
 
    from errno import EINVAL
62
 
    from errno import EWOULDBLOCK
63
 
    from errno import EINPROGRESS
64
 
    from errno import EALREADY
65
 
    from errno import ECONNRESET
66
 
    from errno import EISCONN
67
 
    from errno import ENOTCONN
68
 
    from errno import EINTR
69
 
    from errno import ENOBUFS
70
 
    from errno import EMFILE
71
 
    from errno import ENFILE
72
 
    from errno import ENOMEM
73
 
    from errno import EAGAIN
74
 
    from errno import ECONNABORTED
75
 
 
76
 
from errno import errorcode
77
 
 
78
 
# Twisted Imports
79
 
from twisted.internet import protocol, defer, base, address
80
 
from twisted.persisted import styles
81
 
from twisted.python import log, failure, reflect
82
 
from twisted.python.util import unsignedID
83
 
from twisted.internet.error import CannotListenError
84
 
 
85
 
# Sibling Imports
86
 
import abstract
87
 
import main
88
 
import interfaces
89
 
import error
90
 
 
91
 
class _SocketCloser:
92
 
    _socketShutdownMethod = 'shutdown'
93
 
 
94
 
    def _closeSocket(self):
95
 
        # socket.close() doesn't *really* close if there's another reference
96
 
        # to it in the TCP/IP stack, e.g. if it was was inherited by a
97
 
        # subprocess. And we really do want to close the connection. So we
98
 
        # use shutdown() instead, and then close() in order to release the
99
 
        # filedescriptor.
100
 
        skt = self.socket
101
 
        try:
102
 
            getattr(skt, self._socketShutdownMethod)(2)
103
 
        except socket.error:
104
 
            pass
105
 
        try:
106
 
            skt.close()
107
 
        except socket.error:
108
 
            pass
109
 
 
110
 
class _TLSMixin:
111
 
    _socketShutdownMethod = 'sock_shutdown'
112
 
 
113
 
    writeBlockedOnRead = 0
114
 
    readBlockedOnWrite = 0
115
 
    _userWantRead = _userWantWrite = True
116
 
    
117
 
    def getPeerCertificate(self):
118
 
        return self.socket.get_peer_certificate()
119
 
 
120
 
    def doRead(self):
121
 
        if self.writeBlockedOnRead:
122
 
            self.writeBlockedOnRead = 0
123
 
            self._resetReadWrite()
124
 
        try:
125
 
            return Connection.doRead(self)
126
 
        except SSL.ZeroReturnError:
127
 
            return main.CONNECTION_DONE
128
 
        except SSL.WantReadError:
129
 
            return
130
 
        except SSL.WantWriteError:
131
 
            self.readBlockedOnWrite = 1
132
 
            Connection.startWriting(self)
133
 
            Connection.stopReading(self)
134
 
            return
135
 
        except SSL.SysCallError, (retval, desc):
136
 
            if ((retval == -1 and desc == 'Unexpected EOF')
137
 
                or retval > 0):
138
 
                return main.CONNECTION_LOST
139
 
            log.err()
140
 
            return main.CONNECTION_LOST
141
 
        except SSL.Error, e:
142
 
            return e
143
 
 
144
 
    def doWrite(self):
145
 
        # Retry disconnecting
146
 
        if self.disconnected:
147
 
            return self._postLoseConnection()
148
 
        if self._writeDisconnected:
149
 
            return self._closeWriteConnection()
150
 
        
151
 
        if self.readBlockedOnWrite:
152
 
            self.readBlockedOnWrite = 0
153
 
            self._resetReadWrite()
154
 
        return Connection.doWrite(self)
155
 
 
156
 
    def writeSomeData(self, data):
157
 
        try:
158
 
            return Connection.writeSomeData(self, data)
159
 
        except SSL.WantWriteError:
160
 
            return 0
161
 
        except SSL.WantReadError:
162
 
            self.writeBlockedOnRead = 1
163
 
            Connection.stopWriting(self)
164
 
            Connection.startReading(self)
165
 
            return 0
166
 
        except SSL.ZeroReturnError:
167
 
            return main.CONNECTION_LOST
168
 
        except SSL.SysCallError, e:
169
 
            if e[0] == -1 and data == "":
170
 
                # errors when writing empty strings are expected
171
 
                # and can be ignored
172
 
                return 0
173
 
            else:
174
 
                return main.CONNECTION_LOST
175
 
        except SSL.Error, e:
176
 
            return e
177
 
 
178
 
    def _postLoseConnection(self):
179
 
        """Gets called after loseConnection(), after buffered data is sent.
180
 
 
181
 
        We try to send an SSL shutdown alert, but if it doesn't work, retry
182
 
        when the socket is writable.
183
 
        """
184
 
        self.disconnected=1
185
 
        if hasattr(self.socket, 'set_shutdown'):
186
 
            self.socket.set_shutdown(SSL.RECEIVED_SHUTDOWN)
187
 
        return self._sendCloseAlert()
188
 
 
189
 
    _first=False
190
 
    def _sendCloseAlert(self):
191
 
        # Okay, *THIS* is a bit complicated.
192
 
        
193
 
        # Basically, the issue is, OpenSSL seems to not actually return
194
 
        # errors from SSL_shutdown. Therefore, the only way to
195
 
        # determine if the close notification has been sent is by 
196
 
        # SSL_shutdown returning "done". However, it will not claim it's
197
 
        # done until it's both sent *and* received a shutdown notification.
198
 
 
199
 
        # I don't actually want to wait for a received shutdown
200
 
        # notification, though, so, I have to set RECEIVED_SHUTDOWN
201
 
        # before calling shutdown. Then, it'll return True once it's
202
 
        # *SENT* the shutdown.
203
 
 
204
 
        # However, RECEIVED_SHUTDOWN can't be left set, because then
205
 
        # reads will fail, breaking half close.
206
 
 
207
 
        # Also, since shutdown doesn't report errors, an empty write call is
208
 
        # done first, to try to detect if the connection has gone away.
209
 
        # (*NOT* an SSL_write call, because that fails once you've called
210
 
        # shutdown)
211
 
        try:
212
 
            os.write(self.socket.fileno(), '')
213
 
        except OSError, se:
214
 
            if se.args[0] in (EINTR, EWOULDBLOCK, ENOBUFS):
215
 
                return 0
216
 
            # Write error, socket gone
217
 
            return main.CONNECTION_LOST
218
 
 
219
 
        try:
220
 
            if hasattr(self.socket, 'set_shutdown'):
221
 
                laststate = self.socket.get_shutdown()
222
 
                self.socket.set_shutdown(laststate | SSL.RECEIVED_SHUTDOWN)
223
 
                done = self.socket.shutdown()
224
 
                if not (laststate & SSL.RECEIVED_SHUTDOWN):
225
 
                    self.socket.set_shutdown(SSL.SENT_SHUTDOWN)
226
 
            else:
227
 
                #warnings.warn("SSL connection shutdown possibly unreliable, "
228
 
                #              "please upgrade to ver 0.XX", category=UserWarning)
229
 
                self.socket.shutdown()
230
 
                done = True
231
 
        except SSL.Error, e:
232
 
            return e
233
 
 
234
 
        if done:
235
 
            self.stopWriting()
236
 
            # Note that this is tested for by identity below.
237
 
            return main.CONNECTION_DONE
238
 
        else:
239
 
            self.startWriting()
240
 
            return None
241
 
 
242
 
    def _closeWriteConnection(self):
243
 
        result = self._sendCloseAlert()
244
 
        
245
 
        if result is main.CONNECTION_DONE:
246
 
            return Connection._closeWriteConnection(self)
247
 
        
248
 
        return result
249
 
 
250
 
    def startReading(self):
251
 
        self._userWantRead = True
252
 
        if not self.readBlockedOnWrite:
253
 
            return Connection.startReading(self)
254
 
 
255
 
    def stopReading(self):
256
 
        self._userWantRead = False
257
 
        if not self.writeBlockedOnRead:
258
 
            return Connection.stopReading(self)
259
 
 
260
 
    def startWriting(self):
261
 
        self._userWantWrite = True
262
 
        if not self.writeBlockedOnRead:
263
 
            return Connection.startWriting(self)
264
 
 
265
 
    def stopWriting(self):
266
 
        self._userWantWrite = False
267
 
        if not self.readBlockedOnWrite:
268
 
            return Connection.stopWriting(self)
269
 
 
270
 
    def _resetReadWrite(self):
271
 
        # After changing readBlockedOnWrite or writeBlockedOnRead,
272
 
        # call this to reset the state to what the user requested.
273
 
        if self._userWantWrite:
274
 
            self.startWriting()
275
 
        else:
276
 
            self.stopWriting()
277
 
        
278
 
        if self._userWantRead:
279
 
            self.startReading()
280
 
        else:
281
 
            self.stopReading()
282
 
 
283
 
def _getTLSClass(klass, _existing={}):
284
 
    if klass not in _existing:
285
 
        class TLSConnection(_TLSMixin, klass):
286
 
            implements(interfaces.ISSLTransport)
287
 
        _existing[klass] = TLSConnection
288
 
    return _existing[klass]
289
 
 
290
 
class Connection(abstract.FileDescriptor, _SocketCloser):
291
 
    """I am the superclass of all socket-based FileDescriptors.
292
 
 
293
 
    This is an abstract superclass of all objects which represent a TCP/IP
294
 
    connection based socket.
295
 
    """
296
 
 
297
 
    implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
298
 
 
299
 
    TLS = 0
300
 
 
301
 
    def __init__(self, skt, protocol, reactor=None):
302
 
        abstract.FileDescriptor.__init__(self, reactor=reactor)
303
 
        self.socket = skt
304
 
        self.socket.setblocking(0)
305
 
        self.fileno = skt.fileno
306
 
        self.protocol = protocol
307
 
 
308
 
    if SSL:
309
 
 
310
 
        def startTLS(self, ctx):
311
 
            assert not self.TLS
312
 
            error=False
313
 
            if self.dataBuffer or self._tempDataBuffer:
314
 
                self.dataBuffer += "".join(self._tempDataBuffer)
315
 
                self._tempDataBuffer = []
316
 
                self._tempDataLen = 0
317
 
                written = self.writeSomeData(buffer(self.dataBuffer, self.offset))
318
 
                offset = self.offset
319
 
                dataLen = len(self.dataBuffer)
320
 
                self.offset = 0
321
 
                self.dataBuffer = ""
322
 
                if isinstance(written, Exception) or (offset + written != dataLen):
323
 
                    error=True
324
 
 
325
 
 
326
 
            self.stopReading()
327
 
            self.stopWriting()
328
 
            self._startTLS()
329
 
            self.socket = SSL.Connection(ctx.getContext(), self.socket)
330
 
            self.fileno = self.socket.fileno
331
 
            self.startReading()
332
 
            if error:
333
 
                warnings.warn("startTLS with unwritten buffered data currently doesn't work right. See issue #686. Closing connection.", category=RuntimeWarning, stacklevel=2)
334
 
                self.loseConnection()
335
 
                return
336
 
 
337
 
        def _startTLS(self):
338
 
            self.TLS = 1
339
 
            self.__class__ = _getTLSClass(self.__class__)
340
 
 
341
 
    def getHandle(self):
342
 
        """Return the socket for this connection."""
343
 
        return self.socket
344
 
    
345
 
    def doRead(self):
346
 
        """Calls self.protocol.dataReceived with all available data.
347
 
 
348
 
        This reads up to self.bufferSize bytes of data from its socket, then
349
 
        calls self.dataReceived(data) to process it.  If the connection is not
350
 
        lost through an error in the physical recv(), this function will return
351
 
        the result of the dataReceived call.
352
 
        """
353
 
        try:
354
 
            data = self.socket.recv(self.bufferSize)
355
 
        except socket.error, se:
356
 
            if se.args[0] == EWOULDBLOCK:
357
 
                return
358
 
            else:
359
 
                return main.CONNECTION_LOST
360
 
        if not data:
361
 
            return main.CONNECTION_DONE
362
 
        return self.protocol.dataReceived(data)
363
 
 
364
 
    def writeSomeData(self, data):
365
 
        """Connection.writeSomeData(data) -> #of bytes written | CONNECTION_LOST
366
 
        This writes as much data as possible to the socket and returns either
367
 
        the number of bytes read (which is positive) or a connection error code
368
 
        (which is negative)
369
 
        """
370
 
        try:
371
 
            # Limit length of buffer to try to send, because some OSes are too
372
 
            # stupid to do so themselves (ahem windows)
373
 
            return self.socket.send(buffer(data, 0, self.SEND_LIMIT))
374
 
        except socket.error, se:
375
 
            if se.args[0] == EINTR:
376
 
                return self.writeSomeData(data)
377
 
            elif se.args[0] in (EWOULDBLOCK, ENOBUFS):
378
 
                return 0
379
 
            else:
380
 
                return main.CONNECTION_LOST
381
 
 
382
 
    def _closeWriteConnection(self):
383
 
        try:
384
 
            getattr(self.socket, self._socketShutdownMethod)(1)
385
 
        except socket.error:
386
 
            pass
387
 
        p = interfaces.IHalfCloseableProtocol(self.protocol, None)
388
 
        if p:
389
 
            try:
390
 
                p.writeConnectionLost()
391
 
            except:
392
 
                f = failure.Failure()
393
 
                log.err()
394
 
                self.connectionLost(f)                
395
 
 
396
 
    def readConnectionLost(self, reason):
397
 
        p = interfaces.IHalfCloseableProtocol(self.protocol, None)
398
 
        if p:
399
 
            try:
400
 
                p.readConnectionLost()
401
 
            except:
402
 
                log.err()
403
 
                self.connectionLost(failure.Failure())
404
 
        else:
405
 
            self.connectionLost(reason)
406
 
    
407
 
    def connectionLost(self, reason):
408
 
        """See abstract.FileDescriptor.connectionLost().
409
 
        """
410
 
        abstract.FileDescriptor.connectionLost(self, reason)
411
 
        self._closeSocket()
412
 
        protocol = self.protocol
413
 
        del self.protocol
414
 
        del self.socket
415
 
        del self.fileno
416
 
        protocol.connectionLost(reason)
417
 
 
418
 
    logstr = "Uninitialized"
419
 
 
420
 
    def logPrefix(self):
421
 
        """Return the prefix to log with when I own the logging thread.
422
 
        """
423
 
        return self.logstr
424
 
 
425
 
    def getTcpNoDelay(self):
426
 
        return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
427
 
 
428
 
    def setTcpNoDelay(self, enabled):
429
 
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
430
 
 
431
 
    def getTcpKeepAlive(self):
432
 
        return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
433
 
                                                     socket.SO_KEEPALIVE))
434
 
 
435
 
    def setTcpKeepAlive(self, enabled):
436
 
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
437
 
 
438
 
if SSL:
439
 
    classImplements(Connection, interfaces.ITLSTransport)
440
 
 
441
 
class BaseClient(Connection):
442
 
    """A base class for client TCP (and similiar) sockets.
443
 
    """
444
 
    addressFamily = socket.AF_INET
445
 
    socketType = socket.SOCK_STREAM
446
 
 
447
 
    def _finishInit(self, whenDone, skt, error, reactor):
448
 
        """Called by base classes to continue to next stage of initialization."""
449
 
        if whenDone:
450
 
            Connection.__init__(self, skt, None, reactor)
451
 
            self.doWrite = self.doConnect
452
 
            self.doRead = self.doConnect
453
 
            reactor.callLater(0, whenDone)
454
 
        else:
455
 
            reactor.callLater(0, self.failIfNotConnected, error)
456
 
 
457
 
    def startTLS(self, ctx, client=1):
458
 
        holder = Connection.startTLS(self, ctx)
459
 
        if client:
460
 
            self.socket.set_connect_state()
461
 
        else:
462
 
            self.socket.set_accept_state()
463
 
        return holder
464
 
 
465
 
    def stopConnecting(self):
466
 
        """Stop attempt to connect."""
467
 
        self.failIfNotConnected(error.UserError())
468
 
 
469
 
    def failIfNotConnected(self, err):
470
 
        """
471
 
        Generic method called when the attemps to connect failed. It basically
472
 
        cleans everything it can: call connectionFailed, stop read and write,
473
 
        delete socket related members.
474
 
        """
475
 
        if (self.connected or self.disconnected or 
476
 
            not hasattr(self, "connector")):
477
 
            return
478
 
        
479
 
        self.connector.connectionFailed(failure.Failure(err))
480
 
        if hasattr(self, "reactor"):
481
 
            # this doesn't happen if we failed in __init__
482
 
            self.stopReading()
483
 
            self.stopWriting()
484
 
            del self.connector
485
 
 
486
 
        try:
487
 
            self._closeSocket()
488
 
        except AttributeError:
489
 
            pass
490
 
        else:
491
 
            del self.socket, self.fileno
492
 
 
493
 
    def createInternetSocket(self):
494
 
        """(internal) Create a non-blocking socket using
495
 
        self.addressFamily, self.socketType.
496
 
        """
497
 
        s = socket.socket(self.addressFamily, self.socketType)
498
 
        s.setblocking(0)
499
 
        if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
500
 
            old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD)
501
 
            fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
502
 
        return s
503
 
 
504
 
    def resolveAddress(self):
505
 
        if abstract.isIPAddress(self.addr[0]):
506
 
            self._setRealAddress(self.addr[0])
507
 
        else:
508
 
            d = self.reactor.resolve(self.addr[0])
509
 
            d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
510
 
 
511
 
    def _setRealAddress(self, address):
512
 
        self.realAddress = (address, self.addr[1])
513
 
        self.doConnect()
514
 
 
515
 
    def doConnect(self):
516
 
        """I connect the socket.
517
 
 
518
 
        Then, call the protocol's makeConnection, and start waiting for data.
519
 
        """
520
 
        if not hasattr(self, "connector"):
521
 
            # this happens when connection failed but doConnect
522
 
            # was scheduled via a callLater in self._finishInit
523
 
            return
524
 
 
525
 
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
526
 
        if err:
527
 
            self.failIfNotConnected(error.getConnectError((err, os.strerror(err))))
528
 
            return
529
 
 
530
 
 
531
 
        # doConnect gets called twice.  The first time we actually need to
532
 
        # start the connection attempt.  The second time we don't really
533
 
        # want to (SO_ERROR above will have taken care of any errors, and if
534
 
        # it reported none, the mere fact that doConnect was called again is
535
 
        # sufficient to indicate that the connection has succeeded), but it
536
 
        # is not /particularly/ detrimental to do so.  This should get
537
 
        # cleaned up some day, though.
538
 
        try:
539
 
            connectResult = self.socket.connect_ex(self.realAddress)
540
 
        except socket.error, se:
541
 
            connectResult = se.args[0]
542
 
        if connectResult:
543
 
            if connectResult == EISCONN:
544
 
                pass
545
 
            # on Windows EINVAL means sometimes that we should keep trying:
546
 
            # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
547
 
            elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
548
 
                  (connectResult == EINVAL and platformType == "win32")):
549
 
                self.startReading()
550
 
                self.startWriting()
551
 
                return
552
 
            else:
553
 
                self.failIfNotConnected(error.getConnectError((connectResult, os.strerror(connectResult))))
554
 
                return
555
 
 
556
 
        # If I have reached this point without raising or returning, that means
557
 
        # that the socket is connected.
558
 
        del self.doWrite
559
 
        del self.doRead
560
 
        # we first stop and then start, to reset any references to the old doRead
561
 
        self.stopReading()
562
 
        self.stopWriting()
563
 
        self._connectDone()
564
 
 
565
 
    def _connectDone(self):
566
 
        self.protocol = self.connector.buildProtocol(self.getPeer())
567
 
        self.connected = 1
568
 
        self.protocol.makeConnection(self)
569
 
        self.logstr = self.protocol.__class__.__name__+",client"
570
 
        self.startReading()
571
 
 
572
 
    def connectionLost(self, reason):
573
 
        if not self.connected:
574
 
            self.failIfNotConnected(error.ConnectError(string=reason))
575
 
        else:
576
 
            Connection.connectionLost(self, reason)
577
 
            self.connector.connectionLost(reason)
578
 
 
579
 
 
580
 
class Client(BaseClient):
581
 
    """A TCP client."""
582
 
 
583
 
    def __init__(self, host, port, bindAddress, connector, reactor=None):
584
 
        # BaseClient.__init__ is invoked later
585
 
        self.connector = connector
586
 
        self.addr = (host, port)
587
 
 
588
 
        whenDone = self.resolveAddress
589
 
        err = None
590
 
        skt = None
591
 
 
592
 
        try:
593
 
            skt = self.createInternetSocket()
594
 
        except socket.error, se:
595
 
            err = error.ConnectBindError(se[0], se[1])
596
 
            whenDone = None
597
 
        if whenDone and bindAddress is not None:
598
 
            try:
599
 
                skt.bind(bindAddress)
600
 
            except socket.error, se:
601
 
                err = error.ConnectBindError(se[0], se[1])
602
 
                whenDone = None
603
 
        self._finishInit(whenDone, skt, err, reactor)
604
 
 
605
 
    def getHost(self):
606
 
        """Returns an IPv4Address.
607
 
 
608
 
        This indicates the address from which I am connecting.
609
 
        """
610
 
        return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
611
 
 
612
 
    def getPeer(self):
613
 
        """Returns an IPv4Address.
614
 
 
615
 
        This indicates the address that I am connected to.
616
 
        """
617
 
        return address.IPv4Address('TCP', *(self.addr + ('INET',)))
618
 
 
619
 
    def __repr__(self):
620
 
        s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
621
 
        return s
622
 
 
623
 
 
624
 
class Server(Connection):
625
 
    """Serverside socket-stream connection class.
626
 
 
627
 
    I am a serverside network connection transport; a socket which came from an
628
 
    accept() on a server.
629
 
    """
630
 
 
631
 
    def __init__(self, sock, protocol, client, server, sessionno):
632
 
        """Server(sock, protocol, client, server, sessionno)
633
 
 
634
 
        Initialize me with a socket, a protocol, a descriptor for my peer (a
635
 
        tuple of host, port describing the other end of the connection), an
636
 
        instance of Port, and a session number.
637
 
        """
638
 
        Connection.__init__(self, sock, protocol)
639
 
        self.server = server
640
 
        self.client = client
641
 
        self.sessionno = sessionno
642
 
        self.hostname = client[0]
643
 
        self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__, sessionno, self.hostname)
644
 
        self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__, self.sessionno, self.server.port)
645
 
        self.startReading()
646
 
        self.connected = 1
647
 
 
648
 
    def __repr__(self):
649
 
        """A string representation of this connection.
650
 
        """
651
 
        return self.repstr
652
 
 
653
 
    def startTLS(self, ctx, server=1):
654
 
        holder = Connection.startTLS(self, ctx)
655
 
        if server:
656
 
            self.socket.set_accept_state()
657
 
        else:
658
 
            self.socket.set_connect_state()
659
 
        return holder
660
 
 
661
 
    def getHost(self):
662
 
        """Returns an IPv4Address.
663
 
 
664
 
        This indicates the server's address.
665
 
        """
666
 
        return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
667
 
 
668
 
    def getPeer(self):
669
 
        """Returns an IPv4Address.
670
 
 
671
 
        This indicates the client's address.
672
 
        """
673
 
        return address.IPv4Address('TCP', *(self.client + ('INET',)))
674
 
 
675
 
class Port(base.BasePort, _SocketCloser):
676
 
    """I am a TCP server port, listening for connections.
677
 
 
678
 
    When a connection is accepted, I will call my factory's buildProtocol with
679
 
    the incoming connection as an argument, according to the specification
680
 
    described in twisted.internet.interfaces.IProtocolFactory.
681
 
 
682
 
    If you wish to change the sort of transport that will be used, my
683
 
    `transport' attribute will be called with the signature expected for
684
 
    Server.__init__, so it can be replaced.
685
 
    """
686
 
 
687
 
    implements(interfaces.IListeningPort)
688
 
 
689
 
    addressFamily = socket.AF_INET
690
 
    socketType = socket.SOCK_STREAM
691
 
 
692
 
    transport = Server
693
 
    sessionno = 0
694
 
    interface = ''
695
 
    backlog = 50
696
 
 
697
 
    # Actual port number being listened on, only set to a non-None
698
 
    # value when we are actually listening.
699
 
    _realPortNumber = None
700
 
 
701
 
    def __init__(self, port, factory, backlog=50, interface='', reactor=None):
702
 
        """Initialize with a numeric port to listen on.
703
 
        """
704
 
        base.BasePort.__init__(self, reactor=reactor)
705
 
        self.port = port
706
 
        self.factory = factory
707
 
        self.backlog = backlog
708
 
        self.interface = interface
709
 
 
710
 
    def __repr__(self):
711
 
        if self._realPortNumber is not None:
712
 
            return "<%s of %s on %s>" % (self.__class__, self.factory.__class__,
713
 
                                         self._realPortNumber)
714
 
        else:
715
 
            return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
716
 
 
717
 
    def createInternetSocket(self):
718
 
        s = base.BasePort.createInternetSocket(self)
719
 
        if platformType == "posix" and sys.platform != "cygwin":
720
 
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
721
 
        return s
722
 
 
723
 
    def startListening(self):
724
 
        """Create and bind my socket, and begin listening on it.
725
 
 
726
 
        This is called on unserialization, and must be called after creating a
727
 
        server to begin listening on the specified port.
728
 
        """
729
 
        try:
730
 
            skt = self.createInternetSocket()
731
 
            skt.bind((self.interface, self.port))
732
 
        except socket.error, le:
733
 
            raise CannotListenError, (self.interface, self.port, le)
734
 
 
735
 
        # Make sure that if we listened on port 0, we update that to
736
 
        # reflect what the OS actually assigned us.
737
 
        self._realPortNumber = skt.getsockname()[1]
738
 
 
739
 
        log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
740
 
 
741
 
        # The order of the next 6 lines is kind of bizarre.  If no one
742
 
        # can explain it, perhaps we should re-arrange them.
743
 
        self.factory.doStart()
744
 
        skt.listen(self.backlog)
745
 
        self.connected = 1
746
 
        self.socket = skt
747
 
        self.fileno = self.socket.fileno
748
 
        self.numberAccepts = 100
749
 
 
750
 
        self.startReading()
751
 
 
752
 
    def _buildAddr(self, (host, port)):
753
 
        return address._ServerFactoryIPv4Address('TCP', host, port)
754
 
 
755
 
    def doRead(self):
756
 
        """Called when my socket is ready for reading.
757
 
 
758
 
        This accepts a connection and calls self.protocol() to handle the
759
 
        wire-level protocol.
760
 
        """
761
 
        try:
762
 
            if platformType == "posix":
763
 
                numAccepts = self.numberAccepts
764
 
            else:
765
 
                # win32 event loop breaks if we do more than one accept()
766
 
                # in an iteration of the event loop.
767
 
                numAccepts = 1
768
 
            for i in range(numAccepts):
769
 
                # we need this so we can deal with a factory's buildProtocol
770
 
                # calling our loseConnection
771
 
                if self.disconnecting:
772
 
                    return
773
 
                try:
774
 
                    skt, addr = self.socket.accept()
775
 
                except socket.error, e:
776
 
                    if e.args[0] in (EWOULDBLOCK, EAGAIN):
777
 
                        self.numberAccepts = i
778
 
                        break
779
 
                    elif e.args[0] == EPERM:
780
 
                        # Netfilter on Linux may have rejected the
781
 
                        # connection, but we get told to try to accept()
782
 
                        # anyway.
783
 
                        continue
784
 
                    elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
785
 
 
786
 
                        # Linux gives EMFILE when a process is not allowed
787
 
                        # to allocate any more file descriptors.  *BSD and
788
 
                        # Win32 give (WSA)ENOBUFS.  Linux can also give
789
 
                        # ENFILE if the system is out of inodes, or ENOMEM
790
 
                        # if there is insufficient memory to allocate a new
791
 
                        # dentry.  ECONNABORTED is documented as possible on
792
 
                        # both Linux and Windows, but it is not clear
793
 
                        # whether there are actually any circumstances under
794
 
                        # which it can happen (one might expect it to be
795
 
                        # possible if a client sends a FIN or RST after the
796
 
                        # server sends a SYN|ACK but before application code
797
 
                        # calls accept(2), however at least on Linux this
798
 
                        # _seems_ to be short-circuited by syncookies.
799
 
 
800
 
                        log.msg("Could not accept new connection (%s)" % (
801
 
                            errorcode[e.args[0]],))
802
 
                        break
803
 
                    raise
804
 
 
805
 
                protocol = self.factory.buildProtocol(self._buildAddr(addr))
806
 
                if protocol is None:
807
 
                    skt.close()
808
 
                    continue
809
 
                s = self.sessionno
810
 
                self.sessionno = s+1
811
 
                transport = self.transport(skt, protocol, addr, self, s)
812
 
                transport = self._preMakeConnection(transport)
813
 
                protocol.makeConnection(transport)
814
 
            else:
815
 
                self.numberAccepts = self.numberAccepts+20
816
 
        except:
817
 
            # Note that in TLS mode, this will possibly catch SSL.Errors
818
 
            # raised by self.socket.accept()
819
 
            #
820
 
            # There is no "except SSL.Error:" above because SSL may be
821
 
            # None if there is no SSL support.  In any case, all the
822
 
            # "except SSL.Error:" suite would probably do is log.deferr()
823
 
            # and return, so handling it here works just as well.
824
 
            log.deferr()
825
 
 
826
 
    def _preMakeConnection(self, transport):
827
 
        return transport
828
 
 
829
 
    def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
830
 
        """Stop accepting connections on this port.
831
 
 
832
 
        This will shut down my socket and call self.connectionLost().
833
 
        It returns a deferred which will fire successfully when the
834
 
        port is actually closed.
835
 
        """
836
 
        self.disconnecting = 1
837
 
        self.stopReading()
838
 
        if self.connected:
839
 
            self.deferred = defer.Deferred()
840
 
            self.reactor.callLater(0, self.connectionLost, connDone)
841
 
            return self.deferred
842
 
 
843
 
    stopListening = loseConnection
844
 
 
845
 
    def connectionLost(self, reason):
846
 
        """Cleans up my socket.
847
 
        """
848
 
        log.msg('(Port %s Closed)' % self._realPortNumber)
849
 
        self._realPortNumber = None
850
 
        base.BasePort.connectionLost(self, reason)
851
 
        self.connected = 0
852
 
        self._closeSocket()
853
 
        del self.socket
854
 
        del self.fileno
855
 
        self.factory.doStop()
856
 
        if hasattr(self, "deferred"):
857
 
            self.deferred.callback(None)
858
 
            del self.deferred
859
 
 
860
 
    def logPrefix(self):
861
 
        """Returns the name of my class, to prefix log entries with.
862
 
        """
863
 
        return reflect.qual(self.factory.__class__)
864
 
 
865
 
    def getHost(self):
866
 
        """Returns an IPv4Address.
867
 
 
868
 
        This indicates the server's address.
869
 
        """
870
 
        return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
871
 
 
872
 
class Connector(base.BaseConnector):
873
 
    def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
874
 
        self.host = host
875
 
        if isinstance(port, types.StringTypes):
876
 
            try:
877
 
                port = socket.getservbyname(port, 'tcp')
878
 
            except socket.error, e:
879
 
                raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
880
 
        self.port = port
881
 
        self.bindAddress = bindAddress
882
 
        base.BaseConnector.__init__(self, factory, timeout, reactor)
883
 
 
884
 
    def _makeTransport(self):
885
 
        return Client(self.host, self.port, self.bindAddress, self, self.reactor)
886
 
 
887
 
    def getDestination(self):
888
 
        return address.IPv4Address('TCP', self.host, self.port, 'INET')