~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/tcp.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_tcp -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Various asynchronous TCP/IP classes.
 
7
 
 
8
End users shouldn't use this module directly - use the reactor APIs instead.
 
9
 
 
10
Maintainer: Itamar Shtull-Trauring
 
11
"""
 
12
 
 
13
 
 
14
# System Imports
 
15
import os
 
16
import types
 
17
import socket
 
18
import sys
 
19
import operator
 
20
 
 
21
from zope.interface import implements, classImplements
 
22
 
 
23
try:
 
24
    from OpenSSL import SSL
 
25
except ImportError:
 
26
    SSL = None
 
27
 
 
28
from twisted.python.runtime import platformType
 
29
 
 
30
 
 
31
if platformType == 'win32':
 
32
    # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
 
33
    EPERM = object()
 
34
    from errno import WSAEINVAL as EINVAL
 
35
    from errno import WSAEWOULDBLOCK as EWOULDBLOCK
 
36
    from errno import WSAEINPROGRESS as EINPROGRESS
 
37
    from errno import WSAEALREADY as EALREADY
 
38
    from errno import WSAECONNRESET as ECONNRESET
 
39
    from errno import WSAEISCONN as EISCONN
 
40
    from errno import WSAENOTCONN as ENOTCONN
 
41
    from errno import WSAEINTR as EINTR
 
42
    from errno import WSAENOBUFS as ENOBUFS
 
43
    from errno import WSAEMFILE as EMFILE
 
44
    # No such thing as WSAENFILE, either.
 
45
    ENFILE = object()
 
46
    # Nor ENOMEM
 
47
    ENOMEM = object()
 
48
    EAGAIN = EWOULDBLOCK
 
49
    from errno import WSAECONNRESET as ECONNABORTED
 
50
 
 
51
    from twisted.python.win32 import formatError as strerror
 
52
else:
 
53
    from errno import EPERM
 
54
    from errno import EINVAL
 
55
    from errno import EWOULDBLOCK
 
56
    from errno import EINPROGRESS
 
57
    from errno import EALREADY
 
58
    from errno import ECONNRESET
 
59
    from errno import EISCONN
 
60
    from errno import ENOTCONN
 
61
    from errno import EINTR
 
62
    from errno import ENOBUFS
 
63
    from errno import EMFILE
 
64
    from errno import ENFILE
 
65
    from errno import ENOMEM
 
66
    from errno import EAGAIN
 
67
    from errno import ECONNABORTED
 
68
 
 
69
    from os import strerror
 
70
 
 
71
from errno import errorcode
 
72
 
 
73
# Twisted Imports
 
74
from twisted.internet import defer, base, address, fdesc
 
75
from twisted.internet.task import deferLater
 
76
from twisted.python import log, failure, reflect
 
77
from twisted.python.util import unsignedID
 
78
from twisted.internet.error import CannotListenError
 
79
from twisted.internet import abstract, main, interfaces, error
 
80
 
 
81
 
 
82
 
 
83
class _SocketCloser:
 
84
    _socketShutdownMethod = 'shutdown'
 
85
 
 
86
    def _closeSocket(self):
 
87
        # socket.close() doesn't *really* close if there's another reference
 
88
        # to it in the TCP/IP stack, e.g. if it was was inherited by a
 
89
        # subprocess. And we really do want to close the connection. So we
 
90
        # use shutdown() instead, and then close() in order to release the
 
91
        # filedescriptor.
 
92
        skt = self.socket
 
93
        try:
 
94
            getattr(skt, self._socketShutdownMethod)(2)
 
95
        except socket.error:
 
96
            pass
 
97
        try:
 
98
            skt.close()
 
99
        except socket.error:
 
100
            pass
 
101
 
 
102
 
 
103
 
 
104
class _TLSMixin:
 
105
    _socketShutdownMethod = 'sock_shutdown'
 
106
 
 
107
    writeBlockedOnRead = 0
 
108
    readBlockedOnWrite = 0
 
109
    _userWantRead = _userWantWrite = True
 
110
 
 
111
    def getPeerCertificate(self):
 
112
        return self.socket.get_peer_certificate()
 
113
 
 
114
    def doRead(self):
 
115
        if self.disconnected:
 
116
            # See the comment in the similar check in doWrite below.
 
117
            # Additionally, in order for anything other than returning
 
118
            # CONNECTION_DONE here to make sense, it will probably be necessary
 
119
            # to implement a way to switch back to TCP from TLS (actually, if
 
120
            # we did something other than return CONNECTION_DONE, that would be
 
121
            # a big part of implementing that feature).  In other words, the
 
122
            # expectation is that doRead will be called when self.disconnected
 
123
            # is True only when the connection has been lost.  It's possible
 
124
            # that the other end could stop speaking TLS and then send us some
 
125
            # non-TLS data.  We'll end up ignoring that data and dropping the
 
126
            # connection.  There's no unit tests for this check in the cases
 
127
            # where it makes a difference.  The test suite only hits this
 
128
            # codepath when it would have otherwise hit the SSL.ZeroReturnError
 
129
            # exception handler below, which has exactly the same behavior as
 
130
            # this conditional.  Maybe that's the only case that can ever be
 
131
            # triggered, I'm not sure.  -exarkun
 
132
            return main.CONNECTION_DONE
 
133
        if self.writeBlockedOnRead:
 
134
            self.writeBlockedOnRead = 0
 
135
            self._resetReadWrite()
 
136
        try:
 
137
            return Connection.doRead(self)
 
138
        except SSL.ZeroReturnError:
 
139
            return main.CONNECTION_DONE
 
140
        except SSL.WantReadError:
 
141
            return
 
142
        except SSL.WantWriteError:
 
143
            self.readBlockedOnWrite = 1
 
144
            Connection.startWriting(self)
 
145
            Connection.stopReading(self)
 
146
            return
 
147
        except SSL.SysCallError, (retval, desc):
 
148
            if ((retval == -1 and desc == 'Unexpected EOF')
 
149
                or retval > 0):
 
150
                return main.CONNECTION_LOST
 
151
            log.err()
 
152
            return main.CONNECTION_LOST
 
153
        except SSL.Error, e:
 
154
            return e
 
155
 
 
156
    def doWrite(self):
 
157
        # Retry disconnecting
 
158
        if self.disconnected:
 
159
            # This case is triggered when "disconnected" is set to True by a
 
160
            # call to _postLoseConnection from FileDescriptor.doWrite (to which
 
161
            # we upcall at the end of this overridden version of that API).  It
 
162
            # means that while, as far as any protocol connected to this
 
163
            # transport is concerned, the connection no longer exists, the
 
164
            # connection *does* actually still exist.  Instead of closing the
 
165
            # connection in the overridden _postLoseConnection, we probably
 
166
            # tried (and failed) to send a TLS close alert.  The TCP connection
 
167
            # is still up and we're waiting for the socket to become writeable
 
168
            # enough for the TLS close alert to actually be sendable.  Only
 
169
            # then will the connection actually be torn down. -exarkun
 
170
            return self._postLoseConnection()
 
171
        if self._writeDisconnected:
 
172
            return self._closeWriteConnection()
 
173
 
 
174
        if self.readBlockedOnWrite:
 
175
            self.readBlockedOnWrite = 0
 
176
            self._resetReadWrite()
 
177
        return Connection.doWrite(self)
 
178
 
 
179
    def writeSomeData(self, data):
 
180
        try:
 
181
            return Connection.writeSomeData(self, data)
 
182
        except SSL.WantWriteError:
 
183
            return 0
 
184
        except SSL.WantReadError:
 
185
            self.writeBlockedOnRead = 1
 
186
            Connection.stopWriting(self)
 
187
            Connection.startReading(self)
 
188
            return 0
 
189
        except SSL.ZeroReturnError:
 
190
            return main.CONNECTION_LOST
 
191
        except SSL.SysCallError, e:
 
192
            if e[0] == -1 and data == "":
 
193
                # errors when writing empty strings are expected
 
194
                # and can be ignored
 
195
                return 0
 
196
            else:
 
197
                return main.CONNECTION_LOST
 
198
        except SSL.Error, e:
 
199
            return e
 
200
 
 
201
 
 
202
    def _postLoseConnection(self):
 
203
        """
 
204
        Gets called after loseConnection(), after buffered data is sent.
 
205
 
 
206
        We try to send an SSL shutdown alert, but if it doesn't work, retry
 
207
        when the socket is writable.
 
208
        """
 
209
        # Here, set "disconnected" to True to trick higher levels into thinking
 
210
        # the connection is really gone.  It's not, and we're not going to
 
211
        # close it yet.  Instead, we'll try to send a TLS close alert to shut
 
212
        # down the TLS connection cleanly.  Only after we actually get the
 
213
        # close alert into the socket will we disconnect the underlying TCP
 
214
        # connection.
 
215
        self.disconnected = True
 
216
        if hasattr(self.socket, 'set_shutdown'):
 
217
            # If possible, mark the state of the TLS connection as having
 
218
            # already received a TLS close alert from the peer.  Why do
 
219
            # this???
 
220
            self.socket.set_shutdown(SSL.RECEIVED_SHUTDOWN)
 
221
        return self._sendCloseAlert()
 
222
 
 
223
 
 
224
    def _sendCloseAlert(self):
 
225
        # Okay, *THIS* is a bit complicated.
 
226
 
 
227
        # Basically, the issue is, OpenSSL seems to not actually return
 
228
        # errors from SSL_shutdown. Therefore, the only way to
 
229
        # determine if the close notification has been sent is by
 
230
        # SSL_shutdown returning "done". However, it will not claim it's
 
231
        # done until it's both sent *and* received a shutdown notification.
 
232
 
 
233
        # I don't actually want to wait for a received shutdown
 
234
        # notification, though, so, I have to set RECEIVED_SHUTDOWN
 
235
        # before calling shutdown. Then, it'll return True once it's
 
236
        # *SENT* the shutdown.
 
237
 
 
238
        # However, RECEIVED_SHUTDOWN can't be left set, because then
 
239
        # reads will fail, breaking half close.
 
240
 
 
241
        # Also, since shutdown doesn't report errors, an empty write call is
 
242
        # done first, to try to detect if the connection has gone away.
 
243
        # (*NOT* an SSL_write call, because that fails once you've called
 
244
        # shutdown)
 
245
        try:
 
246
            os.write(self.socket.fileno(), '')
 
247
        except OSError, se:
 
248
            if se.args[0] in (EINTR, EWOULDBLOCK, ENOBUFS):
 
249
                return 0
 
250
            # Write error, socket gone
 
251
            return main.CONNECTION_LOST
 
252
 
 
253
        try:
 
254
            if hasattr(self.socket, 'set_shutdown'):
 
255
                laststate = self.socket.get_shutdown()
 
256
                self.socket.set_shutdown(laststate | SSL.RECEIVED_SHUTDOWN)
 
257
                done = self.socket.shutdown()
 
258
                if not (laststate & SSL.RECEIVED_SHUTDOWN):
 
259
                    self.socket.set_shutdown(SSL.SENT_SHUTDOWN)
 
260
            else:
 
261
                #warnings.warn("SSL connection shutdown possibly unreliable, "
 
262
                #              "please upgrade to ver 0.XX", category=UserWarning)
 
263
                self.socket.shutdown()
 
264
                done = True
 
265
        except SSL.Error, e:
 
266
            return e
 
267
 
 
268
        if done:
 
269
            self.stopWriting()
 
270
            # Note that this is tested for by identity below.
 
271
            return main.CONNECTION_DONE
 
272
        else:
 
273
            # For some reason, the close alert wasn't sent.  Start writing
 
274
            # again so that we'll get another chance to send it.
 
275
            self.startWriting()
 
276
            # On Linux, select will sometimes not report a closed file
 
277
            # descriptor in the write set (in particular, it seems that if a
 
278
            # send() fails with EPIPE, the socket will not appear in the write
 
279
            # set).  The shutdown call above (which calls down to SSL_shutdown)
 
280
            # may have swallowed a write error.  Therefore, also start reading
 
281
            # so that if the socket is closed we will notice.  This doesn't
 
282
            # seem to be a problem for poll (because poll reports errors
 
283
            # separately) or with select on BSD (presumably because, unlike
 
284
            # Linux, it doesn't implement select in terms of poll and then map
 
285
            # POLLHUP to select's in fd_set).
 
286
            self.startReading()
 
287
            return None
 
288
 
 
289
    def _closeWriteConnection(self):
 
290
        result = self._sendCloseAlert()
 
291
 
 
292
        if result is main.CONNECTION_DONE:
 
293
            return Connection._closeWriteConnection(self)
 
294
 
 
295
        return result
 
296
 
 
297
    def startReading(self):
 
298
        self._userWantRead = True
 
299
        if not self.readBlockedOnWrite:
 
300
            return Connection.startReading(self)
 
301
 
 
302
    def stopReading(self):
 
303
        self._userWantRead = False
 
304
        if not self.writeBlockedOnRead:
 
305
            return Connection.stopReading(self)
 
306
 
 
307
    def startWriting(self):
 
308
        self._userWantWrite = True
 
309
        if not self.writeBlockedOnRead:
 
310
            return Connection.startWriting(self)
 
311
 
 
312
    def stopWriting(self):
 
313
        self._userWantWrite = False
 
314
        if not self.readBlockedOnWrite:
 
315
            return Connection.stopWriting(self)
 
316
 
 
317
    def _resetReadWrite(self):
 
318
        # After changing readBlockedOnWrite or writeBlockedOnRead,
 
319
        # call this to reset the state to what the user requested.
 
320
        if self._userWantWrite:
 
321
            self.startWriting()
 
322
        else:
 
323
            self.stopWriting()
 
324
 
 
325
        if self._userWantRead:
 
326
            self.startReading()
 
327
        else:
 
328
            self.stopReading()
 
329
 
 
330
 
 
331
 
 
332
class _TLSDelayed(object):
 
333
    """
 
334
    State tracking record for TLS startup parameters.  Used to remember how
 
335
    TLS should be started when starting it is delayed to wait for the output
 
336
    buffer to be flushed.
 
337
 
 
338
    @ivar bufferedData: A C{list} which contains all the data which was
 
339
        written to the transport after an attempt to start TLS was made but
 
340
        before the buffers outstanding at that time could be flushed and TLS
 
341
        could really be started.  This is appended to by the transport's
 
342
        write and writeSequence methods until it is possible to actually
 
343
        start TLS, then it is written to the TLS-enabled transport.
 
344
 
 
345
    @ivar context: An SSL context factory object to use to start TLS.
 
346
 
 
347
    @ivar extra: An extra argument to pass to the transport's C{startTLS}
 
348
        method.
 
349
    """
 
350
    def __init__(self, bufferedData, context, extra):
 
351
        self.bufferedData = bufferedData
 
352
        self.context = context
 
353
        self.extra = extra
 
354
 
 
355
 
 
356
 
 
357
def _getTLSClass(klass, _existing={}):
 
358
    if klass not in _existing:
 
359
        class TLSConnection(_TLSMixin, klass):
 
360
            implements(interfaces.ISSLTransport)
 
361
        _existing[klass] = TLSConnection
 
362
    return _existing[klass]
 
363
 
 
364
 
 
365
 
 
366
class Connection(abstract.FileDescriptor, _SocketCloser):
 
367
    """
 
368
    Superclass of all socket-based FileDescriptors.
 
369
 
 
370
    This is an abstract superclass of all objects which represent a TCP/IP
 
371
    connection based socket.
 
372
 
 
373
    @ivar logstr: prefix used when logging events related to this connection.
 
374
    @type logstr: C{str}
 
375
    """
 
376
 
 
377
    implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
 
378
 
 
379
    TLS = 0
 
380
 
 
381
    def __init__(self, skt, protocol, reactor=None):
 
382
        abstract.FileDescriptor.__init__(self, reactor=reactor)
 
383
        self.socket = skt
 
384
        self.socket.setblocking(0)
 
385
        self.fileno = skt.fileno
 
386
        self.protocol = protocol
 
387
 
 
388
    if SSL:
 
389
        _tlsWaiting = None
 
390
        def startTLS(self, ctx, extra):
 
391
            assert not self.TLS
 
392
            if self.dataBuffer or self._tempDataBuffer:
 
393
                # pre-TLS bytes are still being written.  Starting TLS now
 
394
                # will do the wrong thing.  Instead, mark that we're trying
 
395
                # to go into the TLS state.
 
396
                self._tlsWaiting = _TLSDelayed([], ctx, extra)
 
397
                return False
 
398
 
 
399
            self.stopReading()
 
400
            self.stopWriting()
 
401
            self._startTLS()
 
402
            self.socket = SSL.Connection(ctx.getContext(), self.socket)
 
403
            self.fileno = self.socket.fileno
 
404
            self.startReading()
 
405
            return True
 
406
 
 
407
 
 
408
        def _startTLS(self):
 
409
            self.TLS = 1
 
410
            self.__class__ = _getTLSClass(self.__class__)
 
411
 
 
412
 
 
413
        def write(self, bytes):
 
414
            if self._tlsWaiting is not None:
 
415
                self._tlsWaiting.bufferedData.append(bytes)
 
416
            else:
 
417
                abstract.FileDescriptor.write(self, bytes)
 
418
 
 
419
 
 
420
        def writeSequence(self, iovec):
 
421
            if self._tlsWaiting is not None:
 
422
                self._tlsWaiting.bufferedData.extend(iovec)
 
423
            else:
 
424
                abstract.FileDescriptor.writeSequence(self, iovec)
 
425
 
 
426
 
 
427
        def doWrite(self):
 
428
            result = abstract.FileDescriptor.doWrite(self)
 
429
            if self._tlsWaiting is not None:
 
430
                if not self.dataBuffer and not self._tempDataBuffer:
 
431
                    waiting = self._tlsWaiting
 
432
                    self._tlsWaiting = None
 
433
                    self.startTLS(waiting.context, waiting.extra)
 
434
                    self.writeSequence(waiting.bufferedData)
 
435
            return result
 
436
 
 
437
 
 
438
    def getHandle(self):
 
439
        """Return the socket for this connection."""
 
440
        return self.socket
 
441
 
 
442
 
 
443
    def doRead(self):
 
444
        """Calls self.protocol.dataReceived with all available data.
 
445
 
 
446
        This reads up to self.bufferSize bytes of data from its socket, then
 
447
        calls self.dataReceived(data) to process it.  If the connection is not
 
448
        lost through an error in the physical recv(), this function will return
 
449
        the result of the dataReceived call.
 
450
        """
 
451
        try:
 
452
            data = self.socket.recv(self.bufferSize)
 
453
        except socket.error, se:
 
454
            if se.args[0] == EWOULDBLOCK:
 
455
                return
 
456
            else:
 
457
                return main.CONNECTION_LOST
 
458
        if not data:
 
459
            return main.CONNECTION_DONE
 
460
        return self.protocol.dataReceived(data)
 
461
 
 
462
 
 
463
    def writeSomeData(self, data):
 
464
        """
 
465
        Write as much as possible of the given data to this TCP connection.
 
466
 
 
467
        This sends up to C{self.SEND_LIMIT} bytes from C{data}.  If the
 
468
        connection is lost, an exception is returned.  Otherwise, the number
 
469
        of bytes successfully written is returned.
 
470
        """
 
471
        try:
 
472
            # Limit length of buffer to try to send, because some OSes are too
 
473
            # stupid to do so themselves (ahem windows)
 
474
            return self.socket.send(buffer(data, 0, self.SEND_LIMIT))
 
475
        except socket.error, se:
 
476
            if se.args[0] == EINTR:
 
477
                return self.writeSomeData(data)
 
478
            elif se.args[0] in (EWOULDBLOCK, ENOBUFS):
 
479
                return 0
 
480
            else:
 
481
                return main.CONNECTION_LOST
 
482
 
 
483
 
 
484
    def _closeWriteConnection(self):
 
485
        try:
 
486
            getattr(self.socket, self._socketShutdownMethod)(1)
 
487
        except socket.error:
 
488
            pass
 
489
        p = interfaces.IHalfCloseableProtocol(self.protocol, None)
 
490
        if p:
 
491
            try:
 
492
                p.writeConnectionLost()
 
493
            except:
 
494
                f = failure.Failure()
 
495
                log.err()
 
496
                self.connectionLost(f)
 
497
 
 
498
 
 
499
    def readConnectionLost(self, reason):
 
500
        p = interfaces.IHalfCloseableProtocol(self.protocol, None)
 
501
        if p:
 
502
            try:
 
503
                p.readConnectionLost()
 
504
            except:
 
505
                log.err()
 
506
                self.connectionLost(failure.Failure())
 
507
        else:
 
508
            self.connectionLost(reason)
 
509
 
 
510
    def connectionLost(self, reason):
 
511
        """See abstract.FileDescriptor.connectionLost().
 
512
        """
 
513
        abstract.FileDescriptor.connectionLost(self, reason)
 
514
        self._closeSocket()
 
515
        protocol = self.protocol
 
516
        del self.protocol
 
517
        del self.socket
 
518
        del self.fileno
 
519
        protocol.connectionLost(reason)
 
520
 
 
521
    logstr = "Uninitialized"
 
522
 
 
523
    def logPrefix(self):
 
524
        """Return the prefix to log with when I own the logging thread.
 
525
        """
 
526
        return self.logstr
 
527
 
 
528
    def getTcpNoDelay(self):
 
529
        return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
 
530
 
 
531
    def setTcpNoDelay(self, enabled):
 
532
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
 
533
 
 
534
    def getTcpKeepAlive(self):
 
535
        return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
 
536
                                                     socket.SO_KEEPALIVE))
 
537
 
 
538
    def setTcpKeepAlive(self, enabled):
 
539
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
 
540
 
 
541
if SSL:
 
542
    classImplements(Connection, interfaces.ITLSTransport)
 
543
 
 
544
class BaseClient(Connection):
 
545
    """A base class for client TCP (and similiar) sockets.
 
546
    """
 
547
    addressFamily = socket.AF_INET
 
548
    socketType = socket.SOCK_STREAM
 
549
 
 
550
    def _finishInit(self, whenDone, skt, error, reactor):
 
551
        """Called by base classes to continue to next stage of initialization."""
 
552
        if whenDone:
 
553
            Connection.__init__(self, skt, None, reactor)
 
554
            self.doWrite = self.doConnect
 
555
            self.doRead = self.doConnect
 
556
            reactor.callLater(0, whenDone)
 
557
        else:
 
558
            reactor.callLater(0, self.failIfNotConnected, error)
 
559
 
 
560
    def startTLS(self, ctx, client=1):
 
561
        if Connection.startTLS(self, ctx, client):
 
562
            if client:
 
563
                self.socket.set_connect_state()
 
564
            else:
 
565
                self.socket.set_accept_state()
 
566
 
 
567
 
 
568
    def stopConnecting(self):
 
569
        """Stop attempt to connect."""
 
570
        self.failIfNotConnected(error.UserError())
 
571
 
 
572
    def failIfNotConnected(self, err):
 
573
        """
 
574
        Generic method called when the attemps to connect failed. It basically
 
575
        cleans everything it can: call connectionFailed, stop read and write,
 
576
        delete socket related members.
 
577
        """
 
578
        if (self.connected or self.disconnected or
 
579
            not hasattr(self, "connector")):
 
580
            return
 
581
 
 
582
        self.connector.connectionFailed(failure.Failure(err))
 
583
        if hasattr(self, "reactor"):
 
584
            # this doesn't happen if we failed in __init__
 
585
            self.stopReading()
 
586
            self.stopWriting()
 
587
            del self.connector
 
588
 
 
589
        try:
 
590
            self._closeSocket()
 
591
        except AttributeError:
 
592
            pass
 
593
        else:
 
594
            del self.socket, self.fileno
 
595
 
 
596
    def createInternetSocket(self):
 
597
        """(internal) Create a non-blocking socket using
 
598
        self.addressFamily, self.socketType.
 
599
        """
 
600
        s = socket.socket(self.addressFamily, self.socketType)
 
601
        s.setblocking(0)
 
602
        fdesc._setCloseOnExec(s.fileno())
 
603
        return s
 
604
 
 
605
    def resolveAddress(self):
 
606
        if abstract.isIPAddress(self.addr[0]):
 
607
            self._setRealAddress(self.addr[0])
 
608
        else:
 
609
            d = self.reactor.resolve(self.addr[0])
 
610
            d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
 
611
 
 
612
    def _setRealAddress(self, address):
 
613
        self.realAddress = (address, self.addr[1])
 
614
        self.doConnect()
 
615
 
 
616
    def doConnect(self):
 
617
        """I connect the socket.
 
618
 
 
619
        Then, call the protocol's makeConnection, and start waiting for data.
 
620
        """
 
621
        if not hasattr(self, "connector"):
 
622
            # this happens when connection failed but doConnect
 
623
            # was scheduled via a callLater in self._finishInit
 
624
            return
 
625
 
 
626
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
 
627
        if err:
 
628
            self.failIfNotConnected(error.getConnectError((err, strerror(err))))
 
629
            return
 
630
 
 
631
 
 
632
        # doConnect gets called twice.  The first time we actually need to
 
633
        # start the connection attempt.  The second time we don't really
 
634
        # want to (SO_ERROR above will have taken care of any errors, and if
 
635
        # it reported none, the mere fact that doConnect was called again is
 
636
        # sufficient to indicate that the connection has succeeded), but it
 
637
        # is not /particularly/ detrimental to do so.  This should get
 
638
        # cleaned up some day, though.
 
639
        try:
 
640
            connectResult = self.socket.connect_ex(self.realAddress)
 
641
        except socket.error, se:
 
642
            connectResult = se.args[0]
 
643
        if connectResult:
 
644
            if connectResult == EISCONN:
 
645
                pass
 
646
            # on Windows EINVAL means sometimes that we should keep trying:
 
647
            # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
 
648
            elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
 
649
                  (connectResult == EINVAL and platformType == "win32")):
 
650
                self.startReading()
 
651
                self.startWriting()
 
652
                return
 
653
            else:
 
654
                self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
 
655
                return
 
656
 
 
657
        # If I have reached this point without raising or returning, that means
 
658
        # that the socket is connected.
 
659
        del self.doWrite
 
660
        del self.doRead
 
661
        # we first stop and then start, to reset any references to the old doRead
 
662
        self.stopReading()
 
663
        self.stopWriting()
 
664
        self._connectDone()
 
665
 
 
666
    def _connectDone(self):
 
667
        self.protocol = self.connector.buildProtocol(self.getPeer())
 
668
        self.connected = 1
 
669
        self.logstr = self.protocol.__class__.__name__ + ",client"
 
670
        self.startReading()
 
671
        self.protocol.makeConnection(self)
 
672
 
 
673
    def connectionLost(self, reason):
 
674
        if not self.connected:
 
675
            self.failIfNotConnected(error.ConnectError(string=reason))
 
676
        else:
 
677
            Connection.connectionLost(self, reason)
 
678
            self.connector.connectionLost(reason)
 
679
 
 
680
 
 
681
class Client(BaseClient):
 
682
    """A TCP client."""
 
683
 
 
684
    def __init__(self, host, port, bindAddress, connector, reactor=None):
 
685
        # BaseClient.__init__ is invoked later
 
686
        self.connector = connector
 
687
        self.addr = (host, port)
 
688
 
 
689
        whenDone = self.resolveAddress
 
690
        err = None
 
691
        skt = None
 
692
 
 
693
        try:
 
694
            skt = self.createInternetSocket()
 
695
        except socket.error, se:
 
696
            err = error.ConnectBindError(se[0], se[1])
 
697
            whenDone = None
 
698
        if whenDone and bindAddress is not None:
 
699
            try:
 
700
                skt.bind(bindAddress)
 
701
            except socket.error, se:
 
702
                err = error.ConnectBindError(se[0], se[1])
 
703
                whenDone = None
 
704
        self._finishInit(whenDone, skt, err, reactor)
 
705
 
 
706
    def getHost(self):
 
707
        """Returns an IPv4Address.
 
708
 
 
709
        This indicates the address from which I am connecting.
 
710
        """
 
711
        return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
 
712
 
 
713
    def getPeer(self):
 
714
        """Returns an IPv4Address.
 
715
 
 
716
        This indicates the address that I am connected to.
 
717
        """
 
718
        return address.IPv4Address('TCP', *(self.realAddress + ('INET',)))
 
719
 
 
720
    def __repr__(self):
 
721
        s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
 
722
        return s
 
723
 
 
724
 
 
725
class Server(Connection):
 
726
    """
 
727
    Serverside socket-stream connection class.
 
728
 
 
729
    This is a serverside network connection transport; a socket which came from
 
730
    an accept() on a server.
 
731
    """
 
732
 
 
733
    def __init__(self, sock, protocol, client, server, sessionno, reactor):
 
734
        """
 
735
        Server(sock, protocol, client, server, sessionno)
 
736
 
 
737
        Initialize it with a socket, a protocol, a descriptor for my peer (a
 
738
        tuple of host, port describing the other end of the connection), an
 
739
        instance of Port, and a session number.
 
740
        """
 
741
        Connection.__init__(self, sock, protocol, reactor)
 
742
        self.server = server
 
743
        self.client = client
 
744
        self.sessionno = sessionno
 
745
        self.hostname = client[0]
 
746
        self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,
 
747
                                    sessionno,
 
748
                                    self.hostname)
 
749
        self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
 
750
                                          self.sessionno,
 
751
                                          self.server._realPortNumber)
 
752
        self.startReading()
 
753
        self.connected = 1
 
754
 
 
755
    def __repr__(self):
 
756
        """A string representation of this connection.
 
757
        """
 
758
        return self.repstr
 
759
 
 
760
    def startTLS(self, ctx, server=1):
 
761
        if Connection.startTLS(self, ctx, server):
 
762
            if server:
 
763
                self.socket.set_accept_state()
 
764
            else:
 
765
                self.socket.set_connect_state()
 
766
 
 
767
 
 
768
    def getHost(self):
 
769
        """Returns an IPv4Address.
 
770
 
 
771
        This indicates the server's address.
 
772
        """
 
773
        return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
 
774
 
 
775
    def getPeer(self):
 
776
        """Returns an IPv4Address.
 
777
 
 
778
        This indicates the client's address.
 
779
        """
 
780
        return address.IPv4Address('TCP', *(self.client + ('INET',)))
 
781
 
 
782
class Port(base.BasePort, _SocketCloser):
 
783
    """
 
784
    A TCP server port, listening for connections.
 
785
 
 
786
    When a connection is accepted, this will call a factory's buildProtocol
 
787
    with the incoming address as an argument, according to the specification
 
788
    described in L{twisted.internet.interfaces.IProtocolFactory}.
 
789
 
 
790
    If you wish to change the sort of transport that will be used, the
 
791
    C{transport} attribute will be called with the signature expected for
 
792
    C{Server.__init__}, so it can be replaced.
 
793
 
 
794
    @ivar deferred: a deferred created when L{stopListening} is called, and
 
795
        that will fire when connection is lost. This is not to be used it
 
796
        directly: prefer the deferred returned by L{stopListening} instead.
 
797
    @type deferred: L{defer.Deferred}
 
798
 
 
799
    @ivar disconnecting: flag indicating that the L{stopListening} method has
 
800
        been called and that no connections should be accepted anymore.
 
801
    @type disconnecting: C{bool}
 
802
 
 
803
    @ivar connected: flag set once the listen has successfully been called on
 
804
        the socket.
 
805
    @type connected: C{bool}
 
806
    """
 
807
 
 
808
    implements(interfaces.IListeningPort)
 
809
 
 
810
    addressFamily = socket.AF_INET
 
811
    socketType = socket.SOCK_STREAM
 
812
 
 
813
    transport = Server
 
814
    sessionno = 0
 
815
    interface = ''
 
816
    backlog = 50
 
817
 
 
818
    # Actual port number being listened on, only set to a non-None
 
819
    # value when we are actually listening.
 
820
    _realPortNumber = None
 
821
 
 
822
    def __init__(self, port, factory, backlog=50, interface='', reactor=None):
 
823
        """Initialize with a numeric port to listen on.
 
824
        """
 
825
        base.BasePort.__init__(self, reactor=reactor)
 
826
        self.port = port
 
827
        self.factory = factory
 
828
        self.backlog = backlog
 
829
        self.interface = interface
 
830
 
 
831
    def __repr__(self):
 
832
        if self._realPortNumber is not None:
 
833
            return "<%s of %s on %s>" % (self.__class__, self.factory.__class__,
 
834
                                         self._realPortNumber)
 
835
        else:
 
836
            return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
 
837
 
 
838
    def createInternetSocket(self):
 
839
        s = base.BasePort.createInternetSocket(self)
 
840
        if platformType == "posix" and sys.platform != "cygwin":
 
841
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
842
        return s
 
843
 
 
844
 
 
845
    def startListening(self):
 
846
        """Create and bind my socket, and begin listening on it.
 
847
 
 
848
        This is called on unserialization, and must be called after creating a
 
849
        server to begin listening on the specified port.
 
850
        """
 
851
        try:
 
852
            skt = self.createInternetSocket()
 
853
            skt.bind((self.interface, self.port))
 
854
        except socket.error, le:
 
855
            raise CannotListenError, (self.interface, self.port, le)
 
856
 
 
857
        # Make sure that if we listened on port 0, we update that to
 
858
        # reflect what the OS actually assigned us.
 
859
        self._realPortNumber = skt.getsockname()[1]
 
860
 
 
861
        log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
 
862
 
 
863
        # The order of the next 6 lines is kind of bizarre.  If no one
 
864
        # can explain it, perhaps we should re-arrange them.
 
865
        self.factory.doStart()
 
866
        skt.listen(self.backlog)
 
867
        self.connected = True
 
868
        self.socket = skt
 
869
        self.fileno = self.socket.fileno
 
870
        self.numberAccepts = 100
 
871
 
 
872
        self.startReading()
 
873
 
 
874
 
 
875
    def _buildAddr(self, (host, port)):
 
876
        return address._ServerFactoryIPv4Address('TCP', host, port)
 
877
 
 
878
 
 
879
    def doRead(self):
 
880
        """Called when my socket is ready for reading.
 
881
 
 
882
        This accepts a connection and calls self.protocol() to handle the
 
883
        wire-level protocol.
 
884
        """
 
885
        try:
 
886
            if platformType == "posix":
 
887
                numAccepts = self.numberAccepts
 
888
            else:
 
889
                # win32 event loop breaks if we do more than one accept()
 
890
                # in an iteration of the event loop.
 
891
                numAccepts = 1
 
892
            for i in range(numAccepts):
 
893
                # we need this so we can deal with a factory's buildProtocol
 
894
                # calling our loseConnection
 
895
                if self.disconnecting:
 
896
                    return
 
897
                try:
 
898
                    skt, addr = self.socket.accept()
 
899
                except socket.error, e:
 
900
                    if e.args[0] in (EWOULDBLOCK, EAGAIN):
 
901
                        self.numberAccepts = i
 
902
                        break
 
903
                    elif e.args[0] == EPERM:
 
904
                        # Netfilter on Linux may have rejected the
 
905
                        # connection, but we get told to try to accept()
 
906
                        # anyway.
 
907
                        continue
 
908
                    elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
 
909
 
 
910
                        # Linux gives EMFILE when a process is not allowed
 
911
                        # to allocate any more file descriptors.  *BSD and
 
912
                        # Win32 give (WSA)ENOBUFS.  Linux can also give
 
913
                        # ENFILE if the system is out of inodes, or ENOMEM
 
914
                        # if there is insufficient memory to allocate a new
 
915
                        # dentry.  ECONNABORTED is documented as possible on
 
916
                        # both Linux and Windows, but it is not clear
 
917
                        # whether there are actually any circumstances under
 
918
                        # which it can happen (one might expect it to be
 
919
                        # possible if a client sends a FIN or RST after the
 
920
                        # server sends a SYN|ACK but before application code
 
921
                        # calls accept(2), however at least on Linux this
 
922
                        # _seems_ to be short-circuited by syncookies.
 
923
 
 
924
                        log.msg("Could not accept new connection (%s)" % (
 
925
                            errorcode[e.args[0]],))
 
926
                        break
 
927
                    raise
 
928
 
 
929
                fdesc._setCloseOnExec(skt.fileno())
 
930
                protocol = self.factory.buildProtocol(self._buildAddr(addr))
 
931
                if protocol is None:
 
932
                    skt.close()
 
933
                    continue
 
934
                s = self.sessionno
 
935
                self.sessionno = s+1
 
936
                transport = self.transport(skt, protocol, addr, self, s, self.reactor)
 
937
                transport = self._preMakeConnection(transport)
 
938
                protocol.makeConnection(transport)
 
939
            else:
 
940
                self.numberAccepts = self.numberAccepts+20
 
941
        except:
 
942
            # Note that in TLS mode, this will possibly catch SSL.Errors
 
943
            # raised by self.socket.accept()
 
944
            #
 
945
            # There is no "except SSL.Error:" above because SSL may be
 
946
            # None if there is no SSL support.  In any case, all the
 
947
            # "except SSL.Error:" suite would probably do is log.deferr()
 
948
            # and return, so handling it here works just as well.
 
949
            log.deferr()
 
950
 
 
951
    def _preMakeConnection(self, transport):
 
952
        return transport
 
953
 
 
954
    def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
 
955
        """
 
956
        Stop accepting connections on this port.
 
957
 
 
958
        This will shut down the socket and call self.connectionLost().  It
 
959
        returns a deferred which will fire successfully when the port is
 
960
        actually closed, or with a failure if an error occurs shutting down.
 
961
        """
 
962
        self.disconnecting = True
 
963
        self.stopReading()
 
964
        if self.connected:
 
965
            self.deferred = deferLater(
 
966
                self.reactor, 0, self.connectionLost, connDone)
 
967
            return self.deferred
 
968
 
 
969
    stopListening = loseConnection
 
970
 
 
971
 
 
972
    def connectionLost(self, reason):
 
973
        """
 
974
        Cleans up the socket.
 
975
        """
 
976
        log.msg('(Port %s Closed)' % self._realPortNumber)
 
977
        self._realPortNumber = None
 
978
 
 
979
        base.BasePort.connectionLost(self, reason)
 
980
        self.connected = False
 
981
        self._closeSocket()
 
982
        del self.socket
 
983
        del self.fileno
 
984
 
 
985
        try:
 
986
            self.factory.doStop()
 
987
        finally:
 
988
            self.disconnecting = False
 
989
 
 
990
 
 
991
    def logPrefix(self):
 
992
        """Returns the name of my class, to prefix log entries with.
 
993
        """
 
994
        return reflect.qual(self.factory.__class__)
 
995
 
 
996
    def getHost(self):
 
997
        """Returns an IPv4Address.
 
998
 
 
999
        This indicates the server's address.
 
1000
        """
 
1001
        return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
 
1002
 
 
1003
class Connector(base.BaseConnector):
 
1004
    def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
 
1005
        self.host = host
 
1006
        if isinstance(port, types.StringTypes):
 
1007
            try:
 
1008
                port = socket.getservbyname(port, 'tcp')
 
1009
            except socket.error, e:
 
1010
                raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
 
1011
        self.port = port
 
1012
        self.bindAddress = bindAddress
 
1013
        base.BaseConnector.__init__(self, factory, timeout, reactor)
 
1014
 
 
1015
    def _makeTransport(self):
 
1016
        return Client(self.host, self.port, self.bindAddress, self, self.reactor)
 
1017
 
 
1018
    def getDestination(self):
 
1019
        return address.IPv4Address('TCP', self.host, self.port, 'INET')