1
# -*- test-case-name: vertex.test.test_ptcp -*-
5
from binascii import crc32 # used to use zlib.crc32 - but that gives different
6
# results on 64-bit platforms!!
10
from twisted.python.failure import Failure
11
from twisted.internet.defer import Deferred
12
from twisted.internet import protocol, error, reactor, defer
13
from twisted.internet.main import CONNECTION_DONE
14
from twisted.python import log, util
16
from vertex import tcpdfa
17
from vertex.statemachine import StateError
20
genConnID = itertools.count(8).next
22
MAX_PSEUDO_PORT = (2 ** 16)
24
_packetFormat = ('!' # WTF did you think
25
'H' # sourcePseudoPort
28
'L' # acknowledgementNumber
32
# (signed because of binascii.crc32)
35
_fixedSize = struct.calcsize(_packetFormat)
37
_SYN, _ACK, _FIN, _RST, _STB = [1 << n for n in range(5)]
40
def setter(self, value):
45
return property(lambda self: bool(self.flags & flag), setter)
47
def relativeSequence(wireSequence, initialSequence, lapNumber):
    """
    Compute a relative sequence number from a wire sequence number so that we
    can use natural Python comparisons on it, such as <, >, ==.

    @param wireSequence: the sequence number received on the wire.

    @param initialSequence: the ISN for this sequence, negotiated at SYN time.

    @param lapNumber: the number of times that this value has wrapped around
        2**32.

    @return: the wire sequence number, un-wrapped by C{lapNumber} laps of the
        32-bit sequence space and expressed as an offset from
        C{initialSequence}.
    """
    return (wireSequence + (lapNumber * (2**32))) - initialSequence
60
class PTCPPacket(util.FancyStrMixin, object):
62
('sourcePseudoPort', 'sourcePseudoPort', '%d'),
63
('destPseudoPort', 'destPseudoPort', '%d'),
64
('shortdata', 'data', '%r'),
65
('niceflags', 'flags', '%s'),
66
('dlen', 'dlen', '%d'),
67
('seqNum', 'seq', '%d'),
68
('ackNum', 'ack', '%d'),
69
('checksum', 'checksum', '%x'),
70
('peerAddressTuple', 'peerAddress', '%r'),
71
('retransmitCount', 'retransmitCount', '%d'),
80
# Number of retransmit attempts left for this segment. When it reaches
81
# zero, this segment is dead.
86
if len(self.data) > 13:
87
return self.data[:5] + '...' + self.data[-5:]
91
shortdata = property(*shortdata())
97
(self.syn, 'S'), (self.ack, 'A'), (self.fin, 'F'),
98
(self.rst, 'R'), (self.stb, 'T')]:
99
res.append(f and v or '.')
102
niceflags = property(*niceflags())
105
sourcePseudoPort, destPseudoPort,
106
seqNum, ackNum, data,
108
syn=False, ack=False, fin=False,
109
rst=False, stb=False,
111
i = cls(sourcePseudoPort, destPseudoPort,
112
seqNum, ackNum, window,
113
0, 0, len(data), data)
119
i.checksum = i.computeChecksum()
120
i.destination = destination
122
create = classmethod(create)
128
seqNum, ackNum, window, flags,
129
checksum, dlen, data, peerAddressTuple=None,
130
seqOffset=0, ackOffset=0, seqLaps=0, ackLaps=0):
131
self.sourcePseudoPort = sourcePseudoPort
132
self.destPseudoPort = destPseudoPort
137
self.checksum = checksum
140
self.peerAddressTuple = peerAddressTuple # None if local
142
self.seqOffset = seqOffset
143
self.ackOffset = ackOffset
144
self.seqLaps = seqLaps
145
self.ackLaps = ackLaps
147
def segmentLength(self):
    """
    RFC page 26: 'The segment length (SEG.LEN) includes both data and sequence
    space occupying controls'

    @return: C{dlen} plus one for each of the SYN and FIN flags that is set
        on this packet.
    """
    return self.dlen + self.syn + self.fin
153
def relativeSeq(self):
    """
    Return this packet's sequence number in relative form, as computed by
    C{relativeSequence} from C{seqNum}, C{seqOffset} and C{seqLaps}.
    """
    return relativeSequence(self.seqNum, self.seqOffset, self.seqLaps)
156
def relativeAck(self):
    """
    Return this packet's acknowledgement number in relative form, as computed
    by C{relativeSequence} from C{ackNum}, C{ackOffset} and C{ackLaps}.
    """
    return relativeSequence(self.ackNum, self.ackOffset, self.ackLaps)
160
def verifyChecksum(self):
    """
    Check this packet's payload against its own header fields.

    The payload length is compared with C{dlen} first, so that a length
    problem is reported as such rather than as a checksum failure.

    @raise GarbageDataError: if the payload is longer than C{dlen}.
    @raise TruncatedDataError: if the payload is shorter than C{dlen}.
    @raise ChecksumMismatchError: if the checksum computed over the payload
        differs from the C{checksum} field.
    """
    payloadLength = len(self.data)
    if payloadLength > self.dlen:
        raise GarbageDataError(self)
    if payloadLength < self.dlen:
        raise TruncatedDataError(self)
    expected = self.computeChecksum()
    received = self.checksum
    if expected != received:
        raise ChecksumMismatchError(expected, received)
171
def computeChecksum(self):
    """
    Return the CRC-32 (C{binascii.crc32}) of this packet's payload.
    """
    return crc32(self.data)
174
def decode(cls, bytes, hostPortPair):
    """
    Build a packet from the raw bytes of a received datagram.

    The first C{_fixedSize} bytes are unpacked with C{_packetFormat} into the
    header fields; everything after them is taken as the payload.  No
    validation is done here.

    @param bytes: the raw datagram contents.
    @param hostPortPair: the address the datagram came from; passed to the
        constructor as the peer address.

    @return: a new instance of C{cls}.
    """
    header = struct.unpack(_packetFormat, bytes[:_fixedSize])
    (sourcePseudoPort, destPseudoPort, seq, ack,
     window, flags, checksum, dlen) = header
    data = bytes[_fixedSize:]
    # The packet must be returned: callers use the result of decode().
    return cls(sourcePseudoPort, destPseudoPort, seq, ack, window, flags,
               checksum, dlen, data, hostPortPair)
decode = classmethod(decode)
183
def mustRetransmit(self):
    """
    Check whether this packet must be retransmitted until it is acknowledged.

    @return: C{True} if the packet has SYN or FIN set or carries data
        (non-zero C{dlen}); C{False} otherwise.
    """
    return bool(self.syn or self.fin or self.dlen)
191
dlen = len(self.data)
192
checksum = self.computeChecksum()
195
self.sourcePseudoPort, self.destPseudoPort,
196
self.seqNum, self.ackNum, self.window,
197
self.flags, checksum, dlen) + self.data
199
def fragment(self, mtu):
202
assert not self.syn, "should not be originating syn packets w/ data"
205
# XXX TODO: need to take seqLaps into account, etc.
206
for chunk in iterchunks(self.data, mtu):
207
last = self.create(self.sourcePseudoPort,
209
self.seqNum + seqOfft,
213
destination=self.destination,
216
seqOfft += len(chunk)
219
last.checksum = last.computeChecksum()
223
def iterchunks(data, chunksize):
    """
    Iterate over C{data} in consecutive slices of at most C{chunksize} items.

    @param data: a sliceable sequence (for example a string).
    @param chunksize: the maximum length of each slice; must be positive.

    @return: a generator yielding the slices in order.  Every slice except
        possibly the last has exactly C{chunksize} items; nothing is yielded
        for empty C{data}.

    @raise ValueError: on first iteration, if C{chunksize} is not positive.
    """
    if chunksize <= 0:
        # A non-positive step could never reach the end of the data.
        raise ValueError("chunksize must be positive, not %r" % (chunksize,))
    for offt in range(0, len(data), chunksize):
        yield data[offt:offt + chunksize]
234
Initial Sequence Number generator.
236
# return int((time.time() * 1000000) / 4) % 2**32
240
def segmentAcceptable(RCV_NXT, RCV_WND, SEG_SEQ, SEG_LEN):
    """
    Apply the RFC 793 acceptability test to an incoming segment.

    @param RCV_NXT: the next sequence number expected from the peer.
    @param RCV_WND: the size of the receive window.
    @param SEG_SEQ: the sequence number of the first octet of the segment.
    @param SEG_LEN: the segment length, including SYN and FIN.

    @return: C{True} if the segment occupies part of the receive window (or,
        for an empty segment with a zero window, sits exactly at
        C{RCV_NXT}); C{False} otherwise.

    @raise AssertionError: if C{SEG_LEN} or C{RCV_WND} is negative, which
        matches none of the four cases.
    """
    windowEnd = RCV_NXT + RCV_WND
    if SEG_LEN == 0 and RCV_WND == 0:
        return SEG_SEQ == RCV_NXT
    if SEG_LEN == 0 and RCV_WND > 0:
        return RCV_NXT <= SEG_SEQ < windowEnd
    if SEG_LEN > 0 and RCV_WND == 0:
        # A segment carrying anything is never acceptable with a zero window.
        return False
    if SEG_LEN > 0 and RCV_WND > 0:
        lastOctet = SEG_SEQ + SEG_LEN - 1
        # Acceptable if either the first or the last octet is in the window.
        return ((RCV_NXT <= SEG_SEQ < windowEnd)
                or (RCV_NXT <= lastOctet < windowEnd))
    # Raised explicitly so the check is not stripped under -O.
    raise AssertionError('Should be impossible to get here.')
255
class BadPacketError(Exception):
    """
    A packet was bad for some reason.
    """
260
class ChecksumMismatchError(Exception):
    """
    The checksum and data received did not match.
    """
265
class TruncatedDataError(Exception):
    """
    The packet was truncated in transit, and all of the data did not arrive.
    """
270
class GarbageDataError(Exception):
    """
    Too much data was received: the payload was longer than the length
    recorded in the packet.
    """
275
class PTCPConnection(tcpdfa.TCP):
277
Implementation of RFC 793 state machine.
279
@ivar oldestUnackedSendSeqNum: (TCP RFC: SND.UNA) The oldest (relative)
280
sequence number referring to an octet which we have sent or may send which
281
is unacknowledged. This begins at 0, which is special because it is not
282
for an octet, but rather for the initial SYN packet. Unless it is 0, this
283
represents the sequence number of self._outgoingBytes[0].
285
@ivar nextSendSeqNum: (TCP RFC: SND.NXT) The next (relative) sequence
286
number that we will send to our peer after the current buffered segments
287
have all been acknowledged. This is the sequence number of the
288
not-yet-extant octet in the stream at
289
self._outgoingBytes[len(self._outgoingBytes)].
291
@ivar nextRecvSeqNum: (TCP RFC: RCV.NXT) The next (relative) sequence
292
number that the peer should send to us if they want to send more data;
293
their first unacknowledged sequence number as far as we are concerned; the
294
left or lower edge of the receive window; the sequence number of the first
295
octet that has not been delivered to the application. changed whenever we
296
receive an appropriate ACK.
298
@ivar peerSendISN: the initial sequence number that the peer sent us during
299
the negotiation phase. All peer-relative sequence numbers are computed
300
using this. (see C{relativeSequence}).
302
@ivar hostSendISN: the initial sequence number that we sent during the
303
negotiation phase. All host-relative sequence numbers are computed using
304
this. (see C{relativeSequence})
306
@ivar retransmissionQueue: a list of packets to be re-sent until their
307
acknowledgements come through.
309
@ivar recvWindow: (TCP RFC: RCV.WND) - the size [in octets] of the current
310
window allowed by this host, to be in transit from the other host.
312
@ivar sendWindow: (TCP RFC: SND.WND) - the size [in octets] of the current
313
window allowed by our peer, to be in transit from us.
317
mtu = 512 - _fixedSize
321
sendWindowRemaining = mtu * 2
326
hostPseudoPort, peerPseudoPort,
327
ptcp, factory, peerAddressTuple):
328
tcpdfa.TCP.__init__(self)
329
self.hostPseudoPort = hostPseudoPort
330
self.peerPseudoPort = peerPseudoPort
332
self.factory = factory
333
self._receiveBuffer = []
334
self.retransmissionQueue = []
335
self.peerAddressTuple = peerAddressTuple
337
self.oldestUnackedSendSeqNum = 0
338
self.nextSendSeqNum = 0
340
self.nextRecvSeqNum = 0
342
self.setPeerISN = False
346
def packetReceived(self, packet):
347
# XXX TODO: probably have to do something to the packet here to
348
# identify its relative sequence number.
350
# print 'received', self, packet
354
[self.mtu] = struct.unpack('!H', packet.data)
356
for pkt in self.retransmissionQueue:
357
rq.extend(pkt.fragment(self.mtu))
358
self.retransmissionQueue = rq
364
generatedStateMachineInput = False
367
# Whoops, what? SYNs probably can contain data, I think, but I
368
# certainly don't see anything in the spec about how to deal
369
# with this or in ethereal for how linux deals with it -glyph
370
raise BadPacketError(
371
"currently no data allowed in SYN packets: %r"
374
assert packet.segmentLength() == 1
375
if self.peerAddressTuple is None:
377
assert self.wasEverListen, "Clients must specify a connect address."
378
self.peerAddressTuple = packet.peerAddressTuple
381
assert self.peerAddressTuple == packet.peerAddressTuple
383
if self.peerSendISN != packet.seqNum:
384
raise BadPacketError(
385
"Peer ISN was already set to %s but incoming packet "
386
"tried to set it to %s" % (
387
self.peerSendISN, packet.seqNum))
388
if not self.retransmissionQueue:
389
# If our retransmissionQueue is hot, we are going to send
390
# them an ACK to this with the next packet we send them
391
# anyway; as a bonus, this will properly determine whether
392
# we're sending a SYN+ACK or merely an ACK; the only time
393
# we send an ACK is when we have nothing to say to them and
394
# they're blocked on getting a response to their SYN+ACK
396
self.originate(ack=True)
398
self.setPeerISN = True
399
self.peerSendISN = packet.seqNum
400
# syn, fin, and data are mutually exclusive, so this relative
401
# sequence-number increment is done both here, and below in the
402
# data/fin processing block.
403
self.nextRecvSeqNum += packet.segmentLength()
405
generatedStateMachineInput = True
406
self.input(tcpdfa.SYN)
408
SEG_ACK = packet.relativeAck() # aliasing this for easier reading w/
411
if (self.oldestUnackedSendSeqNum < SEG_ACK and
412
SEG_ACK <= self.nextSendSeqNum):
413
# According to the spec, this is an 'acceptable ack'.
414
rq = self.retransmissionQueue
416
segmentOnQueue = rq[0]
417
qSegSeq = segmentOnQueue.relativeSeq()
418
if qSegSeq + segmentOnQueue.segmentLength() <= SEG_ACK:
419
# fully acknowledged, as per RFC!
422
self.sendWindowRemaining += segmentOnQueue.segmentLength()
423
# print 'inc send window', self, self.sendWindowRemaining
424
if segmentOnQueue.syn:
426
sminput = tcpdfa.SYN_ACK
429
elif segmentOnQueue.fin:
431
if sminput is not None:
432
# print 'ack input:', segmentOnQueue, packet, sminput
433
generatedStateMachineInput = True
438
# write buffer is empty; alert the application layer.
439
self._writeBufferEmpty()
440
self.oldestUnackedSendSeqNum = SEG_ACK
443
assert generatedStateMachineInput
446
# XXX TODO: examine 'window' field and adjust sendWindowRemaining
447
# is it 'occupying a portion of valid receive sequence space'? I think
448
# this means 'packet which might acceptably contain useful data'
449
if not packet.segmentLength():
450
assert packet.ack, "What the _HELL_ is wrong with this packet:" +str(packet)
453
if not segmentAcceptable(self.nextRecvSeqNum,
455
packet.relativeSeq(),
456
packet.segmentLength()):
457
# We have to transmit an ack here since it's old data. We probably
458
# need to ack in more states than just ESTABLISHED... but which
460
if not self.retransmissionQueue:
461
self.originate(ack=True)
464
# OK! It's acceptable! Let's process the various bits of data.
465
# Where is the useful data in the packet?
466
if packet.relativeSeq() > self.nextRecvSeqNum:
467
# XXX: Here's what's going on. Data can be 'in the window', but
468
# still in the future. For example, if I have a window of length 3
469
# and I send segments DATA1(len 1) DATA2(len 1) FIN and you receive
470
# them in the order FIN DATA1 DATA2, you don't actually want to
471
# process the FIN until you've processed the data.
473
# For the moment we are just dropping anything that isn't exactly
474
# the next thing we want to process. This is perfectly valid;
475
# these packets might have been dropped, so the other end will have
476
# to retransmit them anyway.
480
assert not packet.syn, 'no seriously I _do not_ know how to handle this'
481
usefulData = packet.data[self.nextRecvSeqNum - packet.relativeSeq():]
482
# DONT check/slice the window size here, the acceptability code
483
# checked it, we can over-ack if the other side is buggy (???)
484
if self.protocol is not None:
486
self.protocol.dataReceived(usefulData)
489
self.loseConnection()
491
self.nextRecvSeqNum += packet.segmentLength()
492
if self.state == tcpdfa.ESTABLISHED:
493
# In all other states, the state machine takes care of sending ACKs
494
# in its output process.
495
self.originate(ack=True)
498
self.input(tcpdfa.FIN)
502
tupl = self.ptcp.transport.getHost()
503
return PTCPAddress((tupl.host, tupl.port),
507
return PTCPAddress(self.peerAddressTuple,
513
def write(self, bytes):
514
assert not self.disconnected, 'Writing to a transport that was already disconnected.'
515
self._outgoingBytes += bytes
519
def writeSequence(self, seq):
    """
    Write a sequence of strings as one call to C{write}.

    @param seq: an iterable of strings; they are concatenated in order.
    """
    self.write(''.join(seq))
523
def _writeLater(self):
524
if self._nagle is None:
525
self._nagle = reactor.callLater(0.001, self._reallyWrite)
527
def _originateOneData(self):
528
amount = min(self.sendWindowRemaining, self.mtu)
529
sendOut = self._outgoingBytes[:amount]
530
# print 'originating data packet', len(sendOut)
531
self._outgoingBytes = self._outgoingBytes[amount:]
532
self.sendWindowRemaining -= len(sendOut)
533
self.originate(ack=True, data=sendOut)
535
def _reallyWrite(self):
536
# print self, 'really writing', self._paused
538
if self._outgoingBytes:
539
# print 'window and bytes', self.sendWindowRemaining, len(self._outgoingBytes)
540
while self.sendWindowRemaining and self._outgoingBytes:
541
self._originateOneData()
543
_retransmitter = None
544
_retransmitTimeout = 0.5
546
def _retransmitLater(self):
    """
    Schedule C{_reallyRetransmit} to run after C{_retransmitTimeout} seconds,
    unless such a call is already pending in C{_retransmitter}.

    Must not be called once the connection is in the CLOSED state.
    """
    assert self.state != tcpdfa.CLOSED
    if self._retransmitter is None:
        self._retransmitter = reactor.callLater(
            self._retransmitTimeout, self._reallyRetransmit)
551
def _stopRetransmitting(self):
552
# used both as a quick-and-dirty test shutdown hack and a way to shut
553
# down when we die...
554
if self._retransmitter is not None:
555
self._retransmitter.cancel()
556
self._retransmitter = None
557
if self._nagle is not None:
560
if self._closeWaitLoseConnection is not None:
561
self._closeWaitLoseConnection.cancel()
562
self._closeWaitLoseConnection = None
564
def _reallyRetransmit(self):
565
# XXX TODO: packet fragmentation & coalescing.
566
# print 'Wee a retransmit! What I got?', self.retransmissionQueue
567
self._retransmitter = None
568
if self.retransmissionQueue:
569
for packet in self.retransmissionQueue:
570
packet.retransmitCount -= 1
571
if packet.retransmitCount:
572
packet.ackNum = self.currentAckNum()
573
self.ptcp.sendPacket(packet)
575
self.input(tcpdfa.TIMEOUT)
577
self._retransmitLater()
579
disconnecting = False # This is *TWISTED* level state-machine stuff,
582
def loseConnection(self):
583
if not self.disconnecting:
584
self.disconnecting = True
585
if not self._outgoingBytes:
586
self._writeBufferEmpty()
589
def _writeBufferEmpty(self):
590
if self._outgoingBytes:
592
elif self.producer is not None:
593
if (not self.streamingProducer) or self.producerPaused:
594
self.producerPaused = False
595
self.producer.resumeProducing()
596
elif self.disconnecting and (not self.disconnected
597
or self.state == tcpdfa.CLOSE_WAIT):
598
self.input(tcpdfa.APP_CLOSE)
601
def _writeBufferFull(self):
602
# print 'my write buffer is full'
603
if (self.producer is not None
604
and not self.producerPaused):
605
self.producerPaused = True
606
# print 'producer pausing'
607
self.producer.pauseProducing()
608
# print 'producer paused'
610
# print 'but I am not telling my producer to pause!'
611
# print ' ', self.producer, self.streamingProducer, self.producerPaused
617
producerPaused = False
618
streamingProducer = False
620
def registerProducer(self, producer, streaming):
621
if self.producer is not None:
623
"Cannot register producer %s, "
624
"because producer %s was never unregistered."
625
% (producer, self.producer))
626
if self.disconnected:
627
producer.stopProducing()
629
self.producer = producer
630
self.streamingProducer = streaming
631
if not streaming and not self._outgoingBytes:
632
producer.resumeProducing()
634
def unregisterProducer(self):
636
if not self._outgoingBytes:
637
self._writeBufferEmpty()
640
def pauseProducing(self):
643
def resumeProducing(self):
646
def currentAckNum(self):
    """
    Return the acknowledgement number to put on the wire right now.

    @return: C{nextRecvSeqNum} offset by C{peerSendISN}, wrapped into the
        32-bit sequence space.
    """
    return (self.nextRecvSeqNum + self.peerSendISN) % (2**32)
649
def originate(self, data='', syn=False, ack=False, fin=False):
651
# We really should be randomizing the ISN but until we finish the
652
# implementations of the various bits of wraparound logic that were
653
# started with relativeSequence
654
assert self.nextSendSeqNum == 0
655
assert self.hostSendISN == 0
656
p = PTCPPacket.create(self.hostPseudoPort,
658
seqNum=(self.nextSendSeqNum + self.hostSendISN) % (2**32),
659
ackNum=self.currentAckNum(),
661
window=self.recvWindow,
662
syn=syn, ack=ack, fin=fin,
663
destination=self.peerAddressTuple)
664
# do we want to enqueue this packet for retransmission?
665
sl = p.segmentLength()
666
self.nextSendSeqNum += sl
668
if p.mustRetransmit():
669
# print self, 'originating retransmittable packet', len(self.retransmissionQueue)
670
if self.retransmissionQueue:
671
if self.retransmissionQueue[-1].fin:
672
raise AssertionError("Sending %r after FIN??!" % (p,))
673
# print 'putting it on the queue'
674
self.retransmissionQueue.append(p)
675
# print 'and sending it later'
676
self._retransmitLater()
677
if not self.sendWindowRemaining: # len(self.retransmissionQueue) > 5:
678
# print 'oh no my queue is too big'
679
# This is a random number (5) because I ought to be summing the
680
# packet lengths or something.
681
self._writeBufferFull()
683
# print 'my queue is still small enough', len(self.retransmissionQueue), self, self.sendWindowRemaining
685
self.ptcp.sendPacket(p)
687
# State machine transition definitions, hooray.
688
def transition_SYN_SENT_to_CLOSED(self):
690
The connection never got anywhere. Goodbye.
692
# XXX CONNECTOR API OMFG
693
self.factory.clientConnectionFailed(None, error.TimeoutError())
696
wasEverListen = False
698
def enter_LISTEN(self):
    """
    Record that this connection has been in the LISTEN state.
    """
    # Spec says this is necessary for RST handling; we need it for making
    # sure it's OK to bind port numbers.
    self.wasEverListen = True
703
def enter_CLOSED(self):
704
self.ptcp.connectionClosed(self)
705
self._stopRetransmitting()
706
if self._timeWaitCall is not None:
707
self._timeWaitCall.cancel()
708
self._timeWaitCall = None
711
_timeWaitTimeout = 0.01 # REALLY fast timeout, right now this is for
714
def enter_TIME_WAIT(self):
    """
    Stop retransmitting and schedule C{_do2mslTimeout} to run after
    C{_timeWaitTimeout} seconds, keeping the pending call in
    C{_timeWaitCall}.
    """
    self._stopRetransmitting()
    self._timeWaitCall = reactor.callLater(
        self._timeWaitTimeout, self._do2mslTimeout)
718
def _do2mslTimeout(self):
    """
    Timer callback scheduled by C{enter_TIME_WAIT}: forget the finished timer
    and feed a TIMEOUT input to the state machine.
    """
    self._timeWaitCall = None
    self.input(tcpdfa.TIMEOUT)
722
peerAddressTuple = None
724
def transition_LISTEN_to_SYN_SENT(self):
726
Uh, what? We were listening and we tried to send some bytes.
727
This is an error for PTCP.
729
raise StateError("You can't write anything until someone connects to you.")
731
# def invalidInput(self, datum):
732
# print self, self.protocol, 'invalid input', datum
734
def pseudoPortPair():
736
return (self.hostPseudoPort,
739
pseudoPortPair = property(*pseudoPortPair())
741
def enter_ESTABLISHED(self):
743
We sent out SYN, they acknowledged it. Congratulations, you
744
have a new baby connection.
746
assert not self.disconnecting
747
assert not self.disconnected
749
p = self.factory.buildProtocol(PTCPAddress(
750
self.peerAddressTuple, self.pseudoPortPair))
751
p.makeConnection(self)
753
log.msg("Exception during PTCP connection setup.")
755
self.loseConnection()
759
def exit_ESTABLISHED(self):
760
assert not self.disconnected
761
self.disconnected = True
763
self.protocol.connectionLost(Failure(CONNECTION_DONE))
768
if self.producer is not None:
770
self.producer.stopProducing()
776
_closeWaitLoseConnection = None
778
def enter_CLOSE_WAIT(self):
779
# Twisted automatically reacts to network half-close by issuing a full
781
self._closeWaitLoseConnection = reactor.callLater(0.01, self._loseConnectionBecauseOfCloseWait)
783
def _loseConnectionBecauseOfCloseWait(self):
784
self._closeWaitLoseConnection = None
785
self.loseConnection()
787
def immediateShutdown(self):
788
"""_IMMEDIATELY_ shut down this connection, sending one (non-retransmitted)
789
app-close packet, emptying our buffers, clearing our producer and
790
getting ready to die right after this call.
792
self._outgoingBytes = ''
793
if self.state == tcpdfa.ESTABLISHED:
794
self.input(tcpdfa.APP_CLOSE)
795
self._stopRetransmitting()
796
self._reallyRetransmit()
798
# All states that we can reasonably be in handle a timeout; force our
799
# connection to think that it's become desynchronized with the other
800
# end so that it will totally shut itself down.
802
self.input(tcpdfa.TIMEOUT)
803
assert self._retransmitter is None
804
assert self._nagle is None
806
def output_ACK(self):
    """
    Originate a packet with only the ACK flag set.
    """
    self.originate(ack=True)
809
def output_FIN(self):
    """
    Originate a packet with only the FIN flag set.
    """
    self.originate(fin=True)
812
def output_SYN_ACK(self):
    """
    Originate a packet with both the SYN and ACK flags set.
    """
    self.originate(syn=True, ack=True)
815
def output_SYN(self):
    """
    Originate a packet with only the SYN flag set.
    """
    self.originate(syn=True)
818
class PTCPAddress(object):
821
def __init__(self, (host, port), (pseudoHostPort, pseudoPeerPort)):
824
self.pseudoHostPort = pseudoHostPort
825
self.pseudoPeerPort = pseudoPeerPort
828
return 'PTCPAddress((%r, %r), (%r, %r))' % (
829
self.host, self.port,
835
class _PendingEvent(object):
842
self.listeners.append(d)
846
def callback(self, result):
853
def errback(self, result=None):
863
class PTCP(protocol.DatagramProtocol):
865
L{PTCP} implements a strongly TCP-like protocol on top of UDP. It
866
provides a transport which is connection-oriented, streaming,
867
ordered, and reliable.
869
@ivar factory: A L{ServerFactory} which is used to create
870
L{IProtocol} providers whenever a new PTCP connection is made
873
@ivar _connections: A mapping of endpoint addresses to connection
874
objects. These are the active connections being multiplexed
875
over this UDP port. Many PTCP connections may run over the
876
same L{PTCP} instance, communicating with many different
877
remote hosts as well as multiplexing different PTCP
878
connections to the same remote host. The mapping keys,
879
endpoint addresses, are three-tuples of:
881
- The destination pseudo-port which is always C{1}
882
- The source pseudo-port
883
- A (host, port) tuple giving the UDP address of a PTCP
884
peer holding the other side of the connection
886
The mapping values, connection objects, are L{PTCPConnection}
888
@type _connections: C{dict}
893
def __init__(self, factory):
894
self.factory = factory
895
self._allConnectionsClosed = _PendingEvent()
898
def connect(self, factory, host, port, pseudoPort=1):
900
Attempt to establish a new connection via PTCP to the given
903
@param factory: A L{ClientFactory} which will be used to
904
create an L{IProtocol} provider if the connection is
905
successfully set up, or which will have failure callbacks
906
invoked on it otherwise.
908
@param host: The IP address of another listening PTCP port to
912
@param port: The port number of that other listening PTCP port
916
@param pseudoPort: Not really implemented. Do not pass a
917
value for this parameter or things will break.
919
@return: A L{PTCPConnection} instance representing the new
920
connection, but you really shouldn't use this for
921
anything. Write a protocol!
923
sourcePseudoPort = genConnID() % MAX_PSEUDO_PORT
924
conn = self._connections[(pseudoPort, sourcePseudoPort, (host, port))
926
sourcePseudoPort, pseudoPort, self, factory, (host, port))
927
conn.input(tcpdfa.APP_ACTIVE_OPEN)
930
def sendPacket(self, packet):
    """
    Encode C{packet} and write it to the transport, addressed to
    C{packet.destination}.

    Nothing is sent once C{transportGoneAway} is set.

    @param packet: the packet to send; must provide C{encode()} and
        C{destination}.
    """
    if not self.transportGoneAway:
        self.transport.write(packet.encode(), packet.destination)
937
def startProtocol(self):
    """
    Reset per-port state: mark the transport as available, reset the
    connection ID counter, and start with an empty connection table.
    """
    self.transportGoneAway = False
    self._lastConnID = 10 # random.randrange(2 ** 32)
    self._connections = {}
942
def _finalCleanup(self):
944
Clean up all of our connections by issuing application-level close and
945
stop notifications, sending hail-mary final FIN packets (which may not
946
reach the other end, but nevertheless can be useful) when possible.
948
for conn in self._connections.values():
949
conn.immediateShutdown()
950
assert not self._connections
952
def stopProtocol(self):
954
Notification from twisted that our underlying port has gone away;
955
make sure we're not going to try to send any packets through our
956
transport and blow up, then shut down all of our protocols, issuing
958
appropriate application-level messages.
960
self.transportGoneAway = True
963
def cleanupAndClose(self):
965
Clean up all remaining connections, then close our transport.
967
Although in a pinch we will do cleanup after our socket has gone away
968
(if it does so unexpectedly, above in stopProtocol), we would really
969
prefer to do cleanup while we still have access to a transport, since
970
that way we can force out a few final packets and save the remote
971
application an awkward timeout (if it happens to get through, which
972
is generally likely).
977
def datagramReceived(self, bytes, addr):
978
if len(bytes) < _fixedSize:
979
# It can't be any good.
982
pkt = PTCPPacket.decode(bytes, addr)
985
except TruncatedDataError:
986
# print '(ptcp packet truncated: %r)' % (pkt,)
990
pkt.sourcePseudoPort,
993
struct.pack('!H', len(pkt.data)),
996
except GarbageDataError:
997
print "garbage data!", pkt
998
except ChecksumMismatchError, cme:
999
print "bad checksum", pkt, cme
1000
print repr(pkt.data)
1001
print hex(pkt.checksum), hex(pkt.computeChecksum())
1003
self.packetReceived(pkt)
1006
def _stop(self, result=None):
1007
if not self.stopped:
1009
return self.transport.stopListening()
1011
return defer.succeed(None)
1013
def waitForAllConnectionsToClose(self):
1015
Wait for all currently-open connections to enter the 'CLOSED' state.
1016
Currently this is only usable from test fixtures.
1018
if not self._connections:
1020
return self._allConnectionsClosed.deferred().addBoth(self._stop)
1022
def connectionClosed(self, ptcpConn):
1023
packey = (ptcpConn.peerPseudoPort, ptcpConn.hostPseudoPort,
1024
ptcpConn.peerAddressTuple)
1025
del self._connections[packey]
1026
if ((not self.transportGoneAway) and
1027
(not self._connections) and
1028
self.factory is None):
1030
if not self._connections:
1031
self._allConnectionsClosed.callback(None)
1033
def packetReceived(self, packet):
1034
packey = (packet.sourcePseudoPort, packet.destPseudoPort, packet.peerAddressTuple)
1035
if packey not in self._connections:
1036
if packet.flags == _SYN and packet.destPseudoPort == 1: # SYN and _ONLY_ SYN set.
1037
conn = PTCPConnection(packet.destPseudoPort,
1038
packet.sourcePseudoPort, self,
1039
self.factory, packet.peerAddressTuple)
1040
conn.input(tcpdfa.APP_PASSIVE_OPEN)
1041
self._connections[packey] = conn
1043
log.msg("corrupted packet? %r %r %r" % (packet,packey, self._connections))
1046
self._connections[packey].packetReceived(packet)
1048
log.msg("PTCPConnection error on %r:" % (packet,))
1050
del self._connections[packey]