~divmod-dev/divmod.org/1304710-storeless-adapter

« back to all changes in this revision

Viewing changes to Vertex/vertex/ptcp.py

  • Committer: cyli
  • Date: 2013-06-27 06:02:46 UTC
  • mto: This revision was merged to the branch mainline in revision 2702.
  • Revision ID: cyli-20130627060246-ciict8hwvjuy9d81
Move Vertex out of the Divmod.org repository

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: vertex.test.test_ptcp -*-
2
 
 
3
 
import struct
4
 
 
5
 
from binascii import crc32  # used to use zlib.crc32 - but that gives different
6
 
                            # results on 64-bit platforms!!
7
 
 
8
 
import itertools
9
 
 
10
 
from twisted.python.failure import Failure
11
 
from twisted.internet.defer import Deferred
12
 
from twisted.internet import protocol, error, reactor, defer
13
 
from twisted.internet.main import CONNECTION_DONE
14
 
from twisted.python import log, util
15
 
 
16
 
from vertex import tcpdfa
17
 
from vertex.statemachine import StateError
18
 
 
19
 
 
20
 
genConnID = itertools.count(8).next
21
 
 
22
 
MAX_PSEUDO_PORT = (2 ** 16)
23
 
 
24
 
_packetFormat = ('!' # WTF did you think
25
 
                 'H' # sourcePseudoPort
26
 
                 'H' # destPseudoPort
27
 
                 'L' # sequenceNumber
28
 
                 'L' # acknowledgementNumber
29
 
                 'L' # window
30
 
                 'B' # flags
31
 
                 'l' # checksum
32
 
                     # (signed because of binascii.crc32)
33
 
                 'H' # dlen
34
 
                 )
35
 
_fixedSize = struct.calcsize(_packetFormat)
36
 
 
37
 
_SYN, _ACK, _FIN, _RST, _STB = [1 << n for n in range(5)]
38
 
 
39
 
def _flagprop(flag):
40
 
    def setter(self, value):
41
 
        if value:
42
 
            self.flags |= flag
43
 
        else:
44
 
            self.flags &= ~flag
45
 
    return property(lambda self: bool(self.flags & flag), setter)
46
 
 
47
 
def relativeSequence(wireSequence, initialSequence, lapNumber):
48
 
    """ Compute a relative sequence number from a wire sequence number so that we
49
 
    can use natural Python comparisons on it, such as <, >, ==.
50
 
 
51
 
    @param wireSequence: the sequence number received on the wire.
52
 
 
53
 
    @param initialSequence: the ISN for this sequence, negotiated at SYN time.
54
 
 
55
 
    @param lapNumber: the number of times that this value has wrapped around
56
 
    2**32.
57
 
    """
58
 
    return (wireSequence + (lapNumber * (2**32))) - initialSequence
59
 
 
60
 
class PTCPPacket(util.FancyStrMixin, object):
61
 
    showAttributes = (
62
 
        ('sourcePseudoPort', 'sourcePseudoPort', '%d'),
63
 
        ('destPseudoPort', 'destPseudoPort', '%d'),
64
 
        ('shortdata', 'data', '%r'),
65
 
        ('niceflags', 'flags', '%s'),
66
 
        ('dlen', 'dlen', '%d'),
67
 
        ('seqNum', 'seq', '%d'),
68
 
        ('ackNum', 'ack', '%d'),
69
 
        ('checksum', 'checksum', '%x'),
70
 
        ('peerAddressTuple', 'peerAddress', '%r'),
71
 
        ('retransmitCount', 'retransmitCount', '%d'),
72
 
        )
73
 
 
74
 
    syn = _flagprop(_SYN)
75
 
    ack = _flagprop(_ACK)
76
 
    fin = _flagprop(_FIN)
77
 
    rst = _flagprop(_RST)
78
 
    stb = _flagprop(_STB)
79
 
 
80
 
    # Number of retransmit attempts left for this segment.  When it reaches
81
 
    # zero, this segment is dead.
82
 
    retransmitCount = 50
83
 
 
84
 
    def shortdata():
85
 
        def get(self):
86
 
            if len(self.data) > 13:
87
 
                return self.data[:5] + '...' + self.data[-5:]
88
 
            else:
89
 
                return self.data
90
 
        return get,
91
 
    shortdata = property(*shortdata())
92
 
 
93
 
    def niceflags():
94
 
        def get(self):
95
 
            res = []
96
 
            for (f, v) in [
97
 
                (self.syn, 'S'), (self.ack, 'A'), (self.fin, 'F'),
98
 
                (self.rst, 'R'), (self.stb, 'T')]:
99
 
                res.append(f and v or '.')
100
 
            return ''.join(res)
101
 
        return get,
102
 
    niceflags = property(*niceflags())
103
 
 
104
 
    def create(cls,
105
 
               sourcePseudoPort, destPseudoPort,
106
 
               seqNum, ackNum, data,
107
 
               window=(1 << 15),
108
 
               syn=False, ack=False, fin=False,
109
 
               rst=False, stb=False,
110
 
               destination=None):
111
 
        i = cls(sourcePseudoPort, destPseudoPort,
112
 
                seqNum, ackNum, window,
113
 
                0, 0, len(data), data)
114
 
        i.syn = syn
115
 
        i.ack = ack
116
 
        i.fin = fin
117
 
        i.rst = rst
118
 
        i.stb = stb
119
 
        i.checksum = i.computeChecksum()
120
 
        i.destination = destination
121
 
        return i
122
 
    create = classmethod(create)
123
 
 
124
 
 
125
 
    def __init__(self,
126
 
                 sourcePseudoPort,
127
 
                 destPseudoPort,
128
 
                 seqNum, ackNum, window, flags,
129
 
                 checksum, dlen, data, peerAddressTuple=None,
130
 
                 seqOffset=0, ackOffset=0, seqLaps=0, ackLaps=0):
131
 
        self.sourcePseudoPort = sourcePseudoPort
132
 
        self.destPseudoPort = destPseudoPort
133
 
        self.seqNum = seqNum
134
 
        self.ackNum = ackNum
135
 
        self.window = window
136
 
        self.flags = flags
137
 
        self.checksum = checksum
138
 
        self.dlen = dlen
139
 
        self.data = data
140
 
        self.peerAddressTuple = peerAddressTuple # None if local
141
 
 
142
 
        self.seqOffset = seqOffset
143
 
        self.ackOffset = ackOffset
144
 
        self.seqLaps = seqLaps
145
 
        self.ackLaps = ackLaps
146
 
 
147
 
    def segmentLength(self):
148
 
        """RFC page 26: 'The segment length (SEG.LEN) includes both data and sequence
149
 
        space occupying controls'
150
 
        """
151
 
        return self.dlen + self.syn + self.fin
152
 
 
153
 
    def relativeSeq(self):
154
 
        return relativeSequence(self.seqNum, self.seqOffset, self.seqLaps)
155
 
 
156
 
    def relativeAck(self):
157
 
        return relativeSequence(self.ackNum, self.ackOffset, self.ackLaps)
158
 
 
159
 
 
160
 
    def verifyChecksum(self):
161
 
        if len(self.data) != self.dlen:
162
 
            if len(self.data) > self.dlen:
163
 
                raise GarbageDataError(self)
164
 
            else:
165
 
                raise TruncatedDataError(self)
166
 
        expected = self.computeChecksum()
167
 
        received = self.checksum
168
 
        if expected != received:
169
 
            raise ChecksumMismatchError(expected, received)
170
 
 
171
 
    def computeChecksum(self):
172
 
        return crc32(self.data)
173
 
 
174
 
    def decode(cls, bytes, hostPortPair):
175
 
        fields = struct.unpack(_packetFormat, bytes[:_fixedSize])
176
 
        sourcePseudoPort, destPseudoPort, seq, ack, window, flags, checksum, dlen = fields
177
 
        data = bytes[_fixedSize:]
178
 
        pkt = cls(sourcePseudoPort, destPseudoPort, seq, ack, window, flags,
179
 
                  checksum, dlen, data, hostPortPair)
180
 
        return pkt
181
 
    decode = classmethod(decode)
182
 
 
183
 
    def mustRetransmit(self):
184
 
        """Check to see if this packet must be retransmitted until it was received.
185
 
        """
186
 
        if self.syn or self.fin or self.dlen:
187
 
            return True
188
 
        return False
189
 
 
190
 
    def encode(self):
191
 
        dlen = len(self.data)
192
 
        checksum = self.computeChecksum()
193
 
        return struct.pack(
194
 
            _packetFormat,
195
 
            self.sourcePseudoPort, self.destPseudoPort,
196
 
            self.seqNum, self.ackNum, self.window,
197
 
            self.flags, checksum, dlen) + self.data
198
 
 
199
 
    def fragment(self, mtu):
200
 
        if self.dlen < mtu:
201
 
            return [self]
202
 
        assert not self.syn, "should not be originating syn packets w/ data"
203
 
        seqOfft = 0
204
 
        L = []
205
 
        # XXX TODO: need to take seqLaps into account, etc.
206
 
        for chunk in iterchunks(self.data, mtu):
207
 
            last = self.create(self.sourcePseudoPort,
208
 
                               self.destPseudoPort,
209
 
                               self.seqNum + seqOfft,
210
 
                               self.ackNum,
211
 
                               chunk,
212
 
                               self.window,
213
 
                               destination=self.destination,
214
 
                               ack=self.ack)
215
 
            L.append(last)
216
 
            seqOfft += len(chunk)
217
 
        if self.fin:
218
 
            last.fin = self.fin
219
 
            last.checksum = last.computeChecksum()
220
 
        return L
221
 
 
222
 
 
223
 
def iterchunks(data, chunksize):
224
 
    """iterate chunks of data
225
 
    """
226
 
    offt = 0
227
 
    while offt < len(data):
228
 
        yield data[offt:offt+chunksize]
229
 
        offt += chunksize
230
 
 
231
 
 
232
 
def ISN():
233
 
    """
234
 
    Initial Sequence Number generator.
235
 
    """
236
 
    # return int((time.time() * 1000000) / 4) % 2**32
237
 
    return 0
238
 
 
239
 
 
240
 
def segmentAcceptable(RCV_NXT, RCV_WND, SEG_SEQ, SEG_LEN):
241
 
    # RFC page 26.
242
 
    if SEG_LEN == 0 and RCV_WND == 0:
243
 
        return SEG_SEQ == RCV_NXT
244
 
    if SEG_LEN == 0 and RCV_WND > 0:
245
 
        return ((RCV_NXT <= SEG_SEQ) and (SEG_SEQ < RCV_NXT + RCV_WND))
246
 
    if SEG_LEN > 0 and RCV_WND == 0:
247
 
        return False
248
 
    if SEG_LEN > 0 and RCV_WND > 0:
249
 
        return ((  (RCV_NXT <= SEG_SEQ) and (SEG_SEQ < RCV_NXT + RCV_WND))
250
 
                or ((RCV_NXT <= SEG_SEQ+SEG_LEN-1) and
251
 
                    (SEG_SEQ+SEG_LEN-1 < RCV_NXT + RCV_WND)))
252
 
    assert 0, 'Should be impossible to get here.'
253
 
    return False
254
 
 
255
 
class BadPacketError(Exception):
256
 
    """
257
 
    A packet was bad for some reason.
258
 
    """
259
 
 
260
 
class ChecksumMismatchError(Exception):
261
 
    """
262
 
    The checksum and data received did not match.
263
 
    """
264
 
 
265
 
class TruncatedDataError(Exception):
266
 
    """
267
 
    The packet was truncated in transit, and all of the data did not arrive.
268
 
    """
269
 
 
270
 
class GarbageDataError(Exception):
271
 
    """
272
 
    Too much data was received (???)
273
 
    """
274
 
 
275
 
class PTCPConnection(tcpdfa.TCP):
276
 
    """
277
 
    Implementation of RFC 793 state machine.
278
 
 
279
 
    @ivar oldestUnackedSendSeqNum: (TCP RFC: SND.UNA) The oldest (relative)
280
 
    sequence number referring to an octet which we have sent or may send which
281
 
    is unacknowledged.  This begins at 0, which is special because it is not
282
 
    for an octet, but rather for the initial SYN packet.  Unless it is 0, this
283
 
    represents the sequence number of self._outgoingBytes[0].
284
 
 
285
 
    @ivar nextSendSeqNum: (TCP RFC: SND.NXT) The next (relative) sequence
286
 
    number that we will send to our peer after the current buffered segments
287
 
    have all been acknowledged.  This is the sequence number of the
288
 
    not-yet-extant octet in the stream at
289
 
    self._outgoingBytes[len(self._outgoingBytes)].
290
 
 
291
 
    @ivar nextRecvSeqNum: (TCP RFC: RCV.NXT) The next (relative) sequence
292
 
    number that the peer should send to us if they want to send more data;
293
 
    their first unacknowledged sequence number as far as we are concerned; the
294
 
    left or lower edge of the receive window; the sequence number of the first
295
 
    octet that has not been delivered to the application.  changed whenever we
296
 
    receive an appropriate ACK.
297
 
 
298
 
    @ivar peerSendISN: the initial sequence number that the peer sent us during
299
 
    the negotiation phase.  All peer-relative sequence numbers are computed
300
 
    using this.  (see C{relativeSequence}).
301
 
 
302
 
    @ivar hostSendISN: the initial sequence number that the we sent during the
303
 
    negotiation phase.  All host-relative sequence numbers are computed using
304
 
    this.  (see C{relativeSequence})
305
 
 
306
 
    @ivar retransmissionQueue: a list of packets to be re-sent until their
307
 
    acknowledgements come through.
308
 
 
309
 
    @ivar recvWindow: (TCP RFC: RCV.WND) - the size [in octets] of the current
310
 
    window allowed by this host, to be in transit from the other host.
311
 
 
312
 
    @ivar sendWindow: (TCP RFC: SND.WND) - the size [in octets] of the current
313
 
    window allowed by our peer, to be in transit from us.
314
 
 
315
 
    """
316
 
 
317
 
    mtu = 512 - _fixedSize
318
 
 
319
 
    recvWindow = mtu
320
 
    sendWindow = mtu
321
 
    sendWindowRemaining = mtu * 2
322
 
 
323
 
    protocol = None
324
 
 
325
 
    def __init__(self,
326
 
                 hostPseudoPort, peerPseudoPort,
327
 
                 ptcp, factory, peerAddressTuple):
328
 
        tcpdfa.TCP.__init__(self)
329
 
        self.hostPseudoPort = hostPseudoPort
330
 
        self.peerPseudoPort = peerPseudoPort
331
 
        self.ptcp = ptcp
332
 
        self.factory = factory
333
 
        self._receiveBuffer = []
334
 
        self.retransmissionQueue = []
335
 
        self.peerAddressTuple = peerAddressTuple
336
 
 
337
 
        self.oldestUnackedSendSeqNum = 0
338
 
        self.nextSendSeqNum = 0
339
 
        self.hostSendISN = 0
340
 
        self.nextRecvSeqNum = 0
341
 
        self.peerSendISN = 0
342
 
        self.setPeerISN = False
343
 
 
344
 
    peerSendISN = None
345
 
 
346
 
    def packetReceived(self, packet):
347
 
        # XXX TODO: probably have to do something to the packet here to
348
 
        # identify its relative sequence number.
349
 
 
350
 
        # print 'received', self, packet
351
 
 
352
 
        if packet.stb:
353
 
            # Shrink the MTU
354
 
            [self.mtu] = struct.unpack('!H', packet.data)
355
 
            rq = []
356
 
            for pkt in self.retransmissionQueue:
357
 
                rq.extend(pkt.fragment(self.mtu))
358
 
            self.retransmissionQueue = rq
359
 
            return
360
 
 
361
 
        if self._paused:
362
 
            return
363
 
 
364
 
        generatedStateMachineInput = False
365
 
        if packet.syn:
366
 
            if packet.dlen:
367
 
                # Whoops, what?  SYNs probably can contain data, I think, but I
368
 
                # certainly don't see anything in the spec about how to deal
369
 
                # with this or in ethereal for how linux deals with it -glyph
370
 
                raise BadPacketError(
371
 
                    "currently no data allowed in SYN packets: %r"
372
 
                    % (packet,))
373
 
            else:
374
 
                assert packet.segmentLength() == 1
375
 
            if self.peerAddressTuple is None:
376
 
                # we're a server
377
 
                assert self.wasEverListen, "Clients must specify a connect address."
378
 
                self.peerAddressTuple = packet.peerAddressTuple
379
 
            else:
380
 
                # we're a client
381
 
                assert self.peerAddressTuple == packet.peerAddressTuple
382
 
            if self.setPeerISN:
383
 
                if self.peerSendISN != packet.seqNum:
384
 
                    raise BadPacketError(
385
 
                        "Peer ISN was already set to %s but incoming packet "
386
 
                        "tried to set it to %s" % (
387
 
                            self.peerSendISN, packet.seqNum))
388
 
                if not self.retransmissionQueue:
389
 
                    # If our retransmissionQueue is hot, we are going to send
390
 
                    # them an ACK to this with the next packet we send them
391
 
                    # anyway; as a bonus, this will properly determine whether
392
 
                    # we're sending a SYN+ACK or merely an ACK; the only time
393
 
                    # we send an ACK is when we have nothing to say to them and
394
 
                    # they're blocked on getting a response to their SYN+ACK
395
 
                    # from us. -glyph
396
 
                    self.originate(ack=True)
397
 
                return
398
 
            self.setPeerISN = True
399
 
            self.peerSendISN = packet.seqNum
400
 
            # syn, fin, and data are mutually exclusive, so this relative
401
 
            # sequence-number increment is done both here, and below in the
402
 
            # data/fin processing block.
403
 
            self.nextRecvSeqNum += packet.segmentLength()
404
 
            if not packet.ack:
405
 
                generatedStateMachineInput = True
406
 
                self.input(tcpdfa.SYN)
407
 
 
408
 
        SEG_ACK = packet.relativeAck() # aliasing this for easier reading w/
409
 
                                       # the RFC
410
 
        if packet.ack:
411
 
            if (self.oldestUnackedSendSeqNum < SEG_ACK and
412
 
                SEG_ACK <= self.nextSendSeqNum):
413
 
                # According to the spec, an 'acceptable ack
414
 
                rq = self.retransmissionQueue
415
 
                while rq:
416
 
                    segmentOnQueue = rq[0]
417
 
                    qSegSeq = segmentOnQueue.relativeSeq()
418
 
                    if qSegSeq + segmentOnQueue.segmentLength() <= SEG_ACK:
419
 
                        # fully acknowledged, as per RFC!
420
 
                        rq.pop(0)
421
 
                        sminput = None
422
 
                        self.sendWindowRemaining += segmentOnQueue.segmentLength()
423
 
                        # print 'inc send window', self, self.sendWindowRemaining
424
 
                        if segmentOnQueue.syn:
425
 
                            if packet.syn:
426
 
                                sminput = tcpdfa.SYN_ACK
427
 
                            else:
428
 
                                sminput = tcpdfa.ACK
429
 
                        elif segmentOnQueue.fin:
430
 
                            sminput = tcpdfa.ACK
431
 
                        if sminput is not None:
432
 
                            # print 'ack input:', segmentOnQueue, packet, sminput
433
 
                            generatedStateMachineInput = True
434
 
                            self.input(sminput)
435
 
                    else:
436
 
                        break
437
 
                else:
438
 
                    # write buffer is empty; alert the application layer.
439
 
                    self._writeBufferEmpty()
440
 
                self.oldestUnackedSendSeqNum = SEG_ACK
441
 
 
442
 
        if packet.syn:
443
 
            assert generatedStateMachineInput
444
 
            return
445
 
 
446
 
        # XXX TODO: examine 'window' field and adjust sendWindowRemaining
447
 
        # is it 'occupying a portion of valid receive sequence space'?  I think
448
 
        # this means 'packet which might acceptably contain useful data'
449
 
        if not packet.segmentLength():
450
 
            assert packet.ack, "What the _HELL_ is wrong with this packet:" +str(packet)
451
 
            return
452
 
 
453
 
        if not segmentAcceptable(self.nextRecvSeqNum,
454
 
                                 self.recvWindow,
455
 
                                 packet.relativeSeq(),
456
 
                                 packet.segmentLength()):
457
 
            # We have to transmit an ack here since it's old data.  We probably
458
 
            # need to ack in more states than just ESTABLISHED... but which
459
 
            # ones?
460
 
            if not self.retransmissionQueue:
461
 
                self.originate(ack=True)
462
 
            return
463
 
 
464
 
        # OK!  It's acceptable!  Let's process the various bits of data.
465
 
        # Where is the useful data in the packet?
466
 
        if packet.relativeSeq() > self.nextRecvSeqNum:
467
 
            # XXX: Here's what's going on.  Data can be 'in the window', but
468
 
            # still in the future.  For example, if I have a window of length 3
469
 
            # and I send segments DATA1(len 1) DATA2(len 1) FIN and you receive
470
 
            # them in the order FIN DATA1 DATA2, you don't actually want to
471
 
            # process the FIN until you've processed the data.
472
 
 
473
 
            # For the moment we are just dropping anything that isn't exactly
474
 
            # the next thing we want to process.  This is perfectly valid;
475
 
            # these packets might have been dropped, so the other end will have
476
 
            # to retransmit them anyway.
477
 
            return
478
 
 
479
 
        if packet.dlen:
480
 
            assert not packet.syn, 'no seriously I _do not_ know how to handle this'
481
 
            usefulData = packet.data[self.nextRecvSeqNum - packet.relativeSeq():]
482
 
            # DONT check/slice the window size here, the acceptability code
483
 
            # checked it, we can over-ack if the other side is buggy (???)
484
 
            if self.protocol is not None:
485
 
                try:
486
 
                    self.protocol.dataReceived(usefulData)
487
 
                except:
488
 
                    log.err()
489
 
                    self.loseConnection()
490
 
 
491
 
        self.nextRecvSeqNum += packet.segmentLength()
492
 
        if self.state == tcpdfa.ESTABLISHED:
493
 
            # In all other states, the state machine takes care of sending ACKs
494
 
            # in its output process.
495
 
            self.originate(ack=True)
496
 
 
497
 
        if packet.fin:
498
 
            self.input(tcpdfa.FIN)
499
 
 
500
 
 
501
 
    def getHost(self):
502
 
        tupl = self.ptcp.transport.getHost()
503
 
        return PTCPAddress((tupl.host, tupl.port),
504
 
                           self.pseudoPortPair)
505
 
 
506
 
    def getPeer(self):
507
 
        return PTCPAddress(self.peerAddressTuple,
508
 
                           self.pseudoPortPair)
509
 
 
510
 
    _outgoingBytes = ''
511
 
    _nagle = None
512
 
 
513
 
    def write(self, bytes):
514
 
        assert not self.disconnected, 'Writing to a transport that was already disconnected.'
515
 
        self._outgoingBytes += bytes
516
 
        self._writeLater()
517
 
 
518
 
 
519
 
    def writeSequence(self, seq):
520
 
        self.write(''.join(seq))
521
 
 
522
 
 
523
 
    def _writeLater(self):
524
 
        if self._nagle is None:
525
 
            self._nagle = reactor.callLater(0.001, self._reallyWrite)
526
 
 
527
 
    def _originateOneData(self):
528
 
        amount = min(self.sendWindowRemaining, self.mtu)
529
 
        sendOut = self._outgoingBytes[:amount]
530
 
        # print 'originating data packet', len(sendOut)
531
 
        self._outgoingBytes = self._outgoingBytes[amount:]
532
 
        self.sendWindowRemaining -= len(sendOut)
533
 
        self.originate(ack=True, data=sendOut)
534
 
 
535
 
    def _reallyWrite(self):
536
 
        # print self, 'really writing', self._paused
537
 
        self._nagle = None
538
 
        if self._outgoingBytes:
539
 
            # print 'window and bytes', self.sendWindowRemaining, len(self._outgoingBytes)
540
 
            while self.sendWindowRemaining and self._outgoingBytes:
541
 
                self._originateOneData()
542
 
 
543
 
    _retransmitter = None
544
 
    _retransmitTimeout = 0.5
545
 
 
546
 
    def _retransmitLater(self):
547
 
        assert self.state != tcpdfa.CLOSED
548
 
        if self._retransmitter is None:
549
 
            self._retransmitter = reactor.callLater(self._retransmitTimeout, self._reallyRetransmit)
550
 
 
551
 
    def _stopRetransmitting(self):
552
 
        # used both as a quick-and-dirty test shutdown hack and a way to shut
553
 
        # down when we die...
554
 
        if self._retransmitter is not None:
555
 
            self._retransmitter.cancel()
556
 
            self._retransmitter = None
557
 
        if self._nagle is not None:
558
 
            self._nagle.cancel()
559
 
            self._nagle = None
560
 
        if self._closeWaitLoseConnection is not None:
561
 
            self._closeWaitLoseConnection.cancel()
562
 
            self._closeWaitLoseConnection = None
563
 
 
564
 
    def _reallyRetransmit(self):
565
 
        # XXX TODO: packet fragmentation & coalescing.
566
 
        # print 'Wee a retransmit!  What I got?', self.retransmissionQueue
567
 
        self._retransmitter = None
568
 
        if self.retransmissionQueue:
569
 
            for packet in self.retransmissionQueue:
570
 
                packet.retransmitCount -= 1
571
 
                if packet.retransmitCount:
572
 
                    packet.ackNum = self.currentAckNum()
573
 
                    self.ptcp.sendPacket(packet)
574
 
                else:
575
 
                    self.input(tcpdfa.TIMEOUT)
576
 
                    return
577
 
            self._retransmitLater()
578
 
 
579
 
    disconnecting = False       # This is *TWISTED* level state-machine stuff,
580
 
                                # not TCP-level.
581
 
 
582
 
    def loseConnection(self):
583
 
        if not self.disconnecting:
584
 
            self.disconnecting = True
585
 
            if not self._outgoingBytes:
586
 
                self._writeBufferEmpty()
587
 
 
588
 
 
589
 
    def _writeBufferEmpty(self):
590
 
        if self._outgoingBytes:
591
 
            self._reallyWrite()
592
 
        elif self.producer is not None:
593
 
            if (not self.streamingProducer) or self.producerPaused:
594
 
                self.producerPaused = False
595
 
                self.producer.resumeProducing()
596
 
        elif self.disconnecting and (not self.disconnected
597
 
                                     or self.state == tcpdfa.CLOSE_WAIT):
598
 
            self.input(tcpdfa.APP_CLOSE)
599
 
 
600
 
 
601
 
    def _writeBufferFull(self):
602
 
        # print 'my write buffer is full'
603
 
        if (self.producer is not None
604
 
            and not self.producerPaused):
605
 
            self.producerPaused = True
606
 
            # print 'producer pausing'
607
 
            self.producer.pauseProducing()
608
 
            # print 'producer paused'
609
 
        else:
610
 
            # print 'but I am not telling my producer to pause!'
611
 
            # print '  ', self.producer, self.streamingProducer, self.producerPaused
612
 
            pass
613
 
 
614
 
 
615
 
    disconnected = False
616
 
    producer = None
617
 
    producerPaused = False
618
 
    streamingProducer = False
619
 
 
620
 
    def registerProducer(self, producer, streaming):
621
 
        if self.producer is not None:
622
 
            raise RuntimeError(
623
 
                "Cannot register producer %s, "
624
 
                "because producer %s was never unregistered."
625
 
                % (producer, self.producer))
626
 
        if self.disconnected:
627
 
            producer.stopProducing()
628
 
        else:
629
 
            self.producer = producer
630
 
            self.streamingProducer = streaming
631
 
            if not streaming and not self._outgoingBytes:
632
 
                producer.resumeProducing()
633
 
 
634
 
    def unregisterProducer(self):
635
 
        self.producer = None
636
 
        if not self._outgoingBytes:
637
 
            self._writeBufferEmpty()
638
 
 
639
 
    _paused = False
640
 
    def pauseProducing(self):
641
 
        self._paused = True
642
 
 
643
 
    def resumeProducing(self):
644
 
        self._paused = False
645
 
 
646
 
    def currentAckNum(self):
647
 
        return (self.nextRecvSeqNum + self.peerSendISN) % (2**32)
648
 
 
649
 
    def originate(self, data='', syn=False, ack=False, fin=False):
650
 
        if syn:
651
 
            # We really should be randomizing the ISN but until we finish the
652
 
            # implementations of the various bits of wraparound logic that were
653
 
            # started with relativeSequence
654
 
            assert self.nextSendSeqNum == 0
655
 
            assert self.hostSendISN == 0
656
 
        p = PTCPPacket.create(self.hostPseudoPort,
657
 
                              self.peerPseudoPort,
658
 
                              seqNum=(self.nextSendSeqNum + self.hostSendISN) % (2**32),
659
 
                              ackNum=self.currentAckNum(),
660
 
                              data=data,
661
 
                              window=self.recvWindow,
662
 
                              syn=syn, ack=ack, fin=fin,
663
 
                              destination=self.peerAddressTuple)
664
 
        # do we want to enqueue this packet for retransmission?
665
 
        sl = p.segmentLength()
666
 
        self.nextSendSeqNum += sl
667
 
 
668
 
        if p.mustRetransmit():
669
 
            # print self, 'originating retransmittable packet', len(self.retransmissionQueue)
670
 
            if self.retransmissionQueue:
671
 
                if self.retransmissionQueue[-1].fin:
672
 
                    raise AssertionError("Sending %r after FIN??!" % (p,))
673
 
            # print 'putting it on the queue'
674
 
            self.retransmissionQueue.append(p)
675
 
            # print 'and sending it later'
676
 
            self._retransmitLater()
677
 
            if not self.sendWindowRemaining: # len(self.retransmissionQueue) > 5:
678
 
                # print 'oh no my queue is too big'
679
 
                # This is a random number (5) because I ought to be summing the
680
 
                # packet lengths or something.
681
 
                self._writeBufferFull()
682
 
            else:
683
 
                # print 'my queue is still small enough', len(self.retransmissionQueue), self, self.sendWindowRemaining
684
 
                pass
685
 
        self.ptcp.sendPacket(p)
686
 
 
687
 
    # State machine transition definitions, hooray.
688
 
    def transition_SYN_SENT_to_CLOSED(self):
689
 
        """
690
 
        The connection never got anywhere.  Goodbye.
691
 
        """
692
 
        # XXX CONNECTOR API OMFG
693
 
        self.factory.clientConnectionFailed(None, error.TimeoutError())
694
 
 
695
 
 
696
 
    wasEverListen = False
697
 
 
698
 
    def enter_LISTEN(self):
699
 
        # Spec says this is necessary for RST handling; we need it for making
700
 
        # sure it's OK to bind port numbers.
701
 
        self.wasEverListen = True
702
 
 
703
 
    def enter_CLOSED(self):
704
 
        self.ptcp.connectionClosed(self)
705
 
        self._stopRetransmitting()
706
 
        if self._timeWaitCall is not None:
707
 
            self._timeWaitCall.cancel()
708
 
            self._timeWaitCall = None
709
 
 
710
 
    _timeWaitCall = None
711
 
    _timeWaitTimeout = 0.01     # REALLY fast timeout, right now this is for
712
 
                                # the tests...
713
 
 
714
 
    def enter_TIME_WAIT(self):
715
 
        self._stopRetransmitting()
716
 
        self._timeWaitCall = reactor.callLater(self._timeWaitTimeout, self._do2mslTimeout)
717
 
 
718
 
    def _do2mslTimeout(self):
719
 
        self._timeWaitCall = None
720
 
        self.input(tcpdfa.TIMEOUT)
721
 
 
722
 
    peerAddressTuple = None
723
 
 
724
 
    def transition_LISTEN_to_SYN_SENT(self):
725
 
        """
726
 
        Uh, what?  We were listening and we tried to send some bytes.
727
 
        This is an error for PTCP.
728
 
        """
729
 
        raise StateError("You can't write anything until someone connects to you.")
730
 
 
731
 
#     def invalidInput(self, datum):
732
 
#         print self, self.protocol, 'invalid input', datum
733
 
 
734
 
    def pseudoPortPair():
735
 
        def get(self):
736
 
            return (self.hostPseudoPort,
737
 
                    self.peerPseudoPort)
738
 
        return get,
739
 
    pseudoPortPair = property(*pseudoPortPair())
740
 
 
741
 
    def enter_ESTABLISHED(self):
742
 
        """
743
 
        We sent out SYN, they acknowledged it.  Congratulations, you
744
 
        have a new baby connection.
745
 
        """
746
 
        assert not self.disconnecting
747
 
        assert not self.disconnected
748
 
        try:
749
 
            p = self.factory.buildProtocol(PTCPAddress(
750
 
                    self.peerAddressTuple, self.pseudoPortPair))
751
 
            p.makeConnection(self)
752
 
        except:
753
 
            log.msg("Exception during PTCP connection setup.")
754
 
            log.err()
755
 
            self.loseConnection()
756
 
        else:
757
 
            self.protocol = p
758
 
 
759
 
    def exit_ESTABLISHED(self):
760
 
        assert not self.disconnected
761
 
        self.disconnected = True
762
 
        try:
763
 
            self.protocol.connectionLost(Failure(CONNECTION_DONE))
764
 
        except:
765
 
            log.err()
766
 
        self.protocol = None
767
 
 
768
 
        if self.producer is not None:
769
 
            try:
770
 
                self.producer.stopProducing()
771
 
            except:
772
 
                log.err()
773
 
            self.producer = None
774
 
 
775
 
 
776
 
    _closeWaitLoseConnection = None
777
 
 
778
 
    def enter_CLOSE_WAIT(self):
779
 
        # Twisted automatically reacts to network half-close by issuing a full
780
 
        # close.
781
 
        self._closeWaitLoseConnection = reactor.callLater(0.01, self._loseConnectionBecauseOfCloseWait)
782
 
 
783
 
    def _loseConnectionBecauseOfCloseWait(self):
784
 
        self._closeWaitLoseConnection = None
785
 
        self.loseConnection()
786
 
 
787
 
    def immediateShutdown(self):
788
 
        """_IMMEDIATELY_ shut down this connection, sending one (non-retransmitted)
789
 
        app-close packet, emptying our buffers, clearing our producer and
790
 
        getting ready to die right after this call.
791
 
        """
792
 
        self._outgoingBytes = ''
793
 
        if self.state == tcpdfa.ESTABLISHED:
794
 
            self.input(tcpdfa.APP_CLOSE)
795
 
            self._stopRetransmitting()
796
 
            self._reallyRetransmit()
797
 
 
798
 
        # All states that we can reasonably be in handle a timeout; force our
799
 
        # connection to think that it's become desynchronized with the other
800
 
        # end so that it will totally shut itself down.
801
 
 
802
 
        self.input(tcpdfa.TIMEOUT)
803
 
        assert self._retransmitter is None
804
 
        assert self._nagle is None
805
 
 
806
 
    def output_ACK(self):
807
 
        self.originate(ack=True)
808
 
 
809
 
    def output_FIN(self):
810
 
        self.originate(fin=True)
811
 
 
812
 
    def output_SYN_ACK(self):
813
 
        self.originate(syn=True, ack=True)
814
 
 
815
 
    def output_SYN(self):
816
 
        self.originate(syn=True)
817
 
 
818
 
class PTCPAddress(object):
819
 
    # garbage
820
 
 
821
 
    def __init__(self, (host, port), (pseudoHostPort, pseudoPeerPort)):
822
 
        self.host = host
823
 
        self.port = port
824
 
        self.pseudoHostPort = pseudoHostPort
825
 
        self.pseudoPeerPort = pseudoPeerPort
826
 
 
827
 
    def __repr__(self):
828
 
        return 'PTCPAddress((%r, %r), (%r, %r))' % (
829
 
            self.host, self.port,
830
 
            self.pseudoHostPort,
831
 
            self.pseudoPeerPort)
832
 
 
833
 
 
834
 
 
835
 
class _PendingEvent(object):
836
 
    def __init__(self):
837
 
        self.listeners = []
838
 
 
839
 
 
840
 
    def deferred(self):
841
 
        d = Deferred()
842
 
        self.listeners.append(d)
843
 
        return d
844
 
 
845
 
 
846
 
    def callback(self, result):
847
 
        l = self.listeners
848
 
        self.listeners = []
849
 
        for d in l:
850
 
            d.callback(result)
851
 
 
852
 
 
853
 
    def errback(self, result=None):
854
 
        if result is None:
855
 
            result = Failure()
856
 
        l = self.listeners
857
 
        self.listeners = []
858
 
        for d in l:
859
 
            d.errback(result)
860
 
 
861
 
 
862
 
 
863
 
class PTCP(protocol.DatagramProtocol):
864
 
    """
865
 
    L{PTCP} implements a strongly TCP-like protocol on top of UDP.  It
866
 
    provides a transport which is connection-oriented, streaming,
867
 
    ordered, and reliable.
868
 
 
869
 
    @ivar factory: A L{ServerFactory} which is used to create
870
 
        L{IProtocol} providers whenever a new PTCP connection is made
871
 
        to this port.
872
 
 
873
 
    @ivar _connections: A mapping of endpoint addresses to connection
874
 
        objects.  These are the active connections being multiplexed
875
 
        over this UDP port.  Many PTCP connections may run over the
876
 
        same L{PTCP} instance, communicating with many different
877
 
        remote hosts as well as multiplexing different PTCP
878
 
        connections to the same remote host.  The mapping keys,
879
 
        endpoint addresses, are three-tuples of:
880
 
 
881
 
            - The destination pseudo-port which is always C{1}
882
 
            - The source pseudo-port
883
 
            - A (host, port) tuple giving the UDP address of a PTCP
884
 
              peer holding the other side of the connection
885
 
 
886
 
        The mapping values, connection objects, are L{PTCPConnection}
887
 
        instances.
888
 
    @type _connections: C{dict}
889
 
 
890
 
    """
891
 
    # External API
892
 
 
893
 
    def __init__(self, factory):
894
 
        self.factory = factory
895
 
        self._allConnectionsClosed = _PendingEvent()
896
 
 
897
 
 
898
 
    def connect(self, factory, host, port, pseudoPort=1):
899
 
        """
900
 
        Attempt to establish a new connection via PTCP to the given
901
 
        remote address.
902
 
 
903
 
        @param factory: A L{ClientFactory} which will be used to
904
 
            create an L{IProtocol} provider if the connection is
905
 
            successfully set up, or which will have failure callbacks
906
 
            invoked on it otherwise.
907
 
 
908
 
        @param host: The IP address of another listening PTCP port to
909
 
            connect to.
910
 
        @type host: C{str}
911
 
 
912
 
        @param port: The port number of that other listening PTCP port
913
 
            to connect to.
914
 
        @type port: C{int}
915
 
 
916
 
        @param pseudoPort: Not really implemented.  Do not pass a
917
 
            value for this parameter or things will break.
918
 
 
919
 
        @return: A L{PTCPConnection} instance representing the new
920
 
            connection, but you really shouldn't use this for
921
 
            anything.  Write a protocol!
922
 
        """
923
 
        sourcePseudoPort = genConnID() % MAX_PSEUDO_PORT
924
 
        conn = self._connections[(pseudoPort, sourcePseudoPort, (host, port))
925
 
                                 ] = PTCPConnection(
926
 
            sourcePseudoPort, pseudoPort, self, factory, (host, port))
927
 
        conn.input(tcpdfa.APP_ACTIVE_OPEN)
928
 
        return conn
929
 
 
930
 
    def sendPacket(self, packet):
931
 
        if self.transportGoneAway:
932
 
            return
933
 
        self.transport.write(packet.encode(), packet.destination)
934
 
 
935
 
 
936
 
    # Internal stuff
937
 
    def startProtocol(self):
938
 
        self.transportGoneAway = False
939
 
        self._lastConnID = 10 # random.randrange(2 ** 32)
940
 
        self._connections = {}
941
 
 
942
 
    def _finalCleanup(self):
943
 
        """
944
 
        Clean up all of our connections by issuing application-level close and
945
 
        stop notifications, sending hail-mary final FIN packets (which may not
946
 
        reach the other end, but nevertheless can be useful) when possible.
947
 
        """
948
 
        for conn in self._connections.values():
949
 
            conn.immediateShutdown()
950
 
        assert not self._connections
951
 
 
952
 
    def stopProtocol(self):
953
 
        """
954
 
        Notification from twisted that our underlying port has gone away;
955
 
        make sure we're not going to try to send any packets through our
956
 
        transport and blow up, then shut down all of our protocols, issuing
957
 
        appr
958
 
        opriate application-level messages.
959
 
        """
960
 
        self.transportGoneAway = True
961
 
        self._finalCleanup()
962
 
 
963
 
    def cleanupAndClose(self):
964
 
        """
965
 
        Clean up all remaining connections, then close our transport.
966
 
 
967
 
        Although in a pinch we will do cleanup after our socket has gone away
968
 
        (if it does so unexpectedly, above in stopProtocol), we would really
969
 
        prefer to do cleanup while we still have access to a transport, since
970
 
        that way we can force out a few final packets and save the remote
971
 
        application an awkward timeout (if it happens to get through, which
972
 
        is generally likely).
973
 
        """
974
 
        self._finalCleanup()
975
 
        return self._stop()
976
 
 
977
 
    def datagramReceived(self, bytes, addr):
978
 
        if len(bytes) < _fixedSize:
979
 
            # It can't be any good.
980
 
            return
981
 
 
982
 
        pkt = PTCPPacket.decode(bytes, addr)
983
 
        try:
984
 
            pkt.verifyChecksum()
985
 
        except TruncatedDataError:
986
 
#             print '(ptcp packet truncated: %r)' % (pkt,)
987
 
            self.sendPacket(
988
 
                PTCPPacket.create(
989
 
                    pkt.destPseudoPort,
990
 
                    pkt.sourcePseudoPort,
991
 
                    0,
992
 
                    0,
993
 
                    struct.pack('!H', len(pkt.data)),
994
 
                    stb=True,
995
 
                    destination=addr))
996
 
        except GarbageDataError:
997
 
            print "garbage data!", pkt
998
 
        except ChecksumMismatchError, cme:
999
 
            print "bad checksum", pkt, cme
1000
 
            print repr(pkt.data)
1001
 
            print hex(pkt.checksum), hex(pkt.computeChecksum())
1002
 
        else:
1003
 
            self.packetReceived(pkt)
1004
 
 
1005
 
    stopped = False
1006
 
    def _stop(self, result=None):
1007
 
        if not self.stopped:
1008
 
            self.stopped = True
1009
 
            return self.transport.stopListening()
1010
 
        else:
1011
 
            return defer.succeed(None)
1012
 
 
1013
 
    def waitForAllConnectionsToClose(self):
1014
 
        """
1015
 
        Wait for all currently-open connections to enter the 'CLOSED' state.
1016
 
        Currently this is only usable from test fixtures.
1017
 
        """
1018
 
        if not self._connections:
1019
 
            return self._stop()
1020
 
        return self._allConnectionsClosed.deferred().addBoth(self._stop)
1021
 
 
1022
 
    def connectionClosed(self, ptcpConn):
1023
 
        packey = (ptcpConn.peerPseudoPort, ptcpConn.hostPseudoPort,
1024
 
                  ptcpConn.peerAddressTuple)
1025
 
        del self._connections[packey]
1026
 
        if ((not self.transportGoneAway) and
1027
 
            (not self._connections) and
1028
 
            self.factory is None):
1029
 
            self._stop()
1030
 
        if not self._connections:
1031
 
            self._allConnectionsClosed.callback(None)
1032
 
 
1033
 
    def packetReceived(self, packet):
1034
 
        packey = (packet.sourcePseudoPort, packet.destPseudoPort, packet.peerAddressTuple)
1035
 
        if packey not in self._connections:
1036
 
            if packet.flags == _SYN and packet.destPseudoPort == 1: # SYN and _ONLY_ SYN set.
1037
 
                conn = PTCPConnection(packet.destPseudoPort,
1038
 
                                      packet.sourcePseudoPort, self,
1039
 
                                      self.factory, packet.peerAddressTuple)
1040
 
                conn.input(tcpdfa.APP_PASSIVE_OPEN)
1041
 
                self._connections[packey] = conn
1042
 
            else:
1043
 
                log.msg("corrupted packet? %r %r %r" % (packet,packey, self._connections))
1044
 
                return
1045
 
        try:
1046
 
            self._connections[packey].packetReceived(packet)
1047
 
        except:
1048
 
            log.msg("PTCPConnection error on %r:" % (packet,))
1049
 
            log.err()
1050
 
            del self._connections[packey]