240
////////////////////////////////////////////////////////
241
// PseudoTcpFifo works exactly like FifoBuffer in libjingle
242
////////////////////////////////////////////////////////
254
pseudo_tcp_fifo_init (PseudoTcpFifo *b, gsize size)
256
b->buffer = g_slice_alloc (size);
257
b->buffer_length = size;
261
pseudo_tcp_fifo_clear (PseudoTcpFifo *b)
264
g_slice_free1 (b->buffer_length, b->buffer);
266
b->buffer_length = 0;
270
pseudo_tcp_fifo_get_buffered (PseudoTcpFifo *b)
272
return b->data_length;
276
pseudo_tcp_fifo_set_capacity (PseudoTcpFifo *b, gsize size)
278
if (b->data_length > size)
281
if (size != b->data_length) {
282
guint8 *buffer = g_slice_alloc (size);
283
gsize copy = b->data_length;
284
gsize tail_copy = min (copy, b->buffer_length - b->read_position);
286
memcpy (buffer, &b->buffer[b->read_position], tail_copy);
287
memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy);
288
g_slice_free1 (b->buffer_length, b->buffer);
290
b->buffer_length = size;
291
b->read_position = 0;
298
pseudo_tcp_fifo_consume_read_data (PseudoTcpFifo *b, gsize size)
300
g_assert (size <= b->data_length);
302
b->read_position = (b->read_position + size) % b->buffer_length;
303
b->data_length -= size;
307
pseudo_tcp_fifo_consume_write_buffer (PseudoTcpFifo *b, gsize size)
309
g_assert (size <= b->buffer_length - b->data_length);
311
b->data_length += size;
315
pseudo_tcp_fifo_get_write_remaining (PseudoTcpFifo *b)
317
return b->buffer_length - b->data_length;
321
pseudo_tcp_fifo_read_offset (PseudoTcpFifo *b, guint8 *buffer, gsize bytes,
324
gsize available = b->data_length - offset;
325
gsize read_position = (b->read_position + offset) % b->buffer_length;
326
gsize copy = min (bytes, available);
327
gsize tail_copy = min(copy, b->buffer_length - read_position);
330
if (offset >= b->data_length)
333
memcpy(buffer, &b->buffer[read_position], tail_copy);
334
memcpy(buffer + tail_copy, &b->buffer[0], copy - tail_copy);
340
pseudo_tcp_fifo_write_offset (PseudoTcpFifo *b, const guint8 *buffer,
341
gsize bytes, gsize offset)
343
gsize available = b->buffer_length - b->data_length - offset;
344
gsize write_position = (b->read_position + b->data_length + offset)
346
gsize copy = min (bytes, available);
347
gsize tail_copy = min(copy, b->buffer_length - write_position);
349
if (b->data_length + offset >= b->buffer_length) {
353
memcpy(&b->buffer[write_position], buffer, tail_copy);
354
memcpy(&b->buffer[0], buffer + tail_copy, copy - tail_copy);
360
pseudo_tcp_fifo_read (PseudoTcpFifo *b, guint8 *buffer, gsize bytes)
364
copy = pseudo_tcp_fifo_read_offset (b, buffer, bytes, 0);
366
b->read_position = (b->read_position + copy) % b->buffer_length;
367
b->data_length -= copy;
373
pseudo_tcp_fifo_write (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes)
377
copy = pseudo_tcp_fifo_write_offset (b, buffer, bytes, 0);
378
b->data_length += copy;
231
384
//////////////////////////////////////////////////////////////////////
233
386
//////////////////////////////////////////////////////////////////////
289
gchar rbuf[kRcvBufSize];
290
guint32 rcv_nxt, rcv_wnd, rlen, lastrecv;
434
guint32 rbuf_len, rcv_nxt, rcv_wnd, lastrecv;
435
guint8 rwnd_scale; // Window scale factor
294
gchar sbuf[kSndBufSize];
295
guint32 snd_nxt, snd_wnd, slen, lastsend, snd_una;
441
guint32 sbuf_len, snd_nxt, snd_wnd, lastsend;
442
guint32 snd_una; /* oldest unacknowledged sequence number */
443
guint8 swnd_scale; // Window scale factor
296
446
// Maximum segment size, estimated protocol level, largest segment sent
297
447
guint32 mss, msslevel, largest, mtu_advise;
298
448
// Retransmit timer
330
494
static void pseudo_tcp_socket_finalize (GObject *object);
497
static void queue_connect_message (PseudoTcpSocket *self);
333
498
static guint32 queue(PseudoTcpSocket *self, const gchar * data,
334
499
guint32 len, gboolean bCtrl);
335
500
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
336
guint8 flags, const gchar * data, guint32 len);
337
static gboolean parse(PseudoTcpSocket *self,
338
const guint8 * buffer, guint32 size);
501
guint8 flags, guint32 offset, guint32 len, guint32 now);
502
static gboolean parse (PseudoTcpSocket *self,
503
const guint8 *_header_buf, gsize header_buf_len,
504
const guint8 *data_buf, gsize data_buf_len);
339
505
static gboolean process(PseudoTcpSocket *self, Segment *seg);
340
static gboolean transmit(PseudoTcpSocket *self, const GList *seg, guint32 now);
506
static gboolean transmit(PseudoTcpSocket *self, SSegment *sseg, guint32 now);
341
507
static void attempt_send(PseudoTcpSocket *self, SendFlags sflags);
342
508
static void closedown(PseudoTcpSocket *self, guint32 err);
343
509
static void adjustMTU(PseudoTcpSocket *self);
510
static void parse_options (PseudoTcpSocket *self, const guint8 *data,
512
static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size);
513
static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size);
346
516
// The following logging is for detailed (packet-level) pseudotcp analysis only.
393
563
TCP_LISTEN, TCP_CLOSED, TCP_LISTEN,
394
564
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
566
g_object_class_install_property (object_class, PROP_ACK_DELAY,
567
g_param_spec_uint ("ack-delay", "ACK Delay",
568
"Delayed ACK timeout (in milliseconds)",
569
0, G_MAXUINT, DEFAULT_ACK_DELAY,
570
G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
572
g_object_class_install_property (object_class, PROP_NO_DELAY,
573
g_param_spec_boolean ("no-delay", "No Delay",
574
"Disable the Nagle algorithm (like the TCP_NODELAY option)",
576
G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
578
g_object_class_install_property (object_class, PROP_RCV_BUF,
579
g_param_spec_uint ("rcv-buf", "Receive Buffer",
580
"Receive Buffer size",
581
1, G_MAXUINT, DEFAULT_RCV_BUF_SIZE,
582
G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
584
g_object_class_install_property (object_class, PROP_SND_BUF,
585
g_param_spec_uint ("snd-buf", "Send Buffer",
587
1, G_MAXUINT, DEFAULT_SND_BUF_SIZE,
588
G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
489
709
priv->shutdown = SD_NONE;
712
priv->rbuf_len = DEFAULT_RCV_BUF_SIZE;
713
pseudo_tcp_fifo_init (&priv->rbuf, priv->rbuf_len);
714
priv->sbuf_len = DEFAULT_SND_BUF_SIZE;
715
pseudo_tcp_fifo_init (&priv->sbuf, priv->sbuf_len);
492
717
priv->state = TCP_LISTEN;
494
priv->rcv_wnd = sizeof(priv->rbuf);
495
priv->snd_nxt = priv->slen = 0;
719
g_queue_init (&priv->slist);
720
g_queue_init (&priv->unsent_slist);
721
priv->rcv_wnd = priv->rbuf_len;
722
priv->rwnd_scale = priv->swnd_scale = 0;
496
724
priv->snd_wnd = 1;
497
priv->snd_una = priv->rcv_nxt = priv->rlen = 0;
725
priv->snd_una = priv->rcv_nxt = 0;
498
726
priv->bReadEnable = TRUE;
499
727
priv->bWriteEnable = FALSE;
629
880
pseudo_tcp_socket_notify_packet(PseudoTcpSocket *self,
630
881
const gchar * buffer, guint32 len)
632
885
if (len > MAX_PACKET) {
633
886
//LOG_F(WARNING) << "packet too large";
636
return parse(self, (guint8 *) buffer, len);
640
pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout)
888
} else if (len < HEADER_SIZE) {
889
//LOG_F(WARNING) << "packet too small";
893
/* Hold a reference to the PseudoTcpSocket during parsing, since it may be
894
* closed from within a callback. */
896
retval = parse (self, (guint8 *) buffer, HEADER_SIZE,
897
(guint8 *) buffer + HEADER_SIZE, len - HEADER_SIZE);
898
g_object_unref (self);
903
/* Assume there are two buffers in the given #NiceInputMessage: a 24-byte one
904
* containing the header, and a bigger one for the data. */
906
pseudo_tcp_socket_notify_message (PseudoTcpSocket *self,
907
NiceInputMessage *message)
911
g_assert_cmpuint (message->n_buffers, >, 0);
913
if (message->n_buffers == 1)
914
return pseudo_tcp_socket_notify_packet (self, message->buffers[0].buffer,
915
message->buffers[0].size);
917
g_assert_cmpuint (message->n_buffers, ==, 2);
918
g_assert_cmpuint (message->buffers[0].size, ==, HEADER_SIZE);
920
if (message->length > MAX_PACKET) {
921
//LOG_F(WARNING) << "packet too large";
923
} else if (message->length < HEADER_SIZE) {
924
//LOG_F(WARNING) << "packet too small";
928
/* Hold a reference to the PseudoTcpSocket during parsing, since it may be
929
* closed from within a callback. */
931
retval = parse (self, message->buffers[0].buffer, message->buffers[0].size,
932
message->buffers[1].buffer, message->length - message->buffers[0].size);
933
g_object_unref (self);
939
pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, guint64 *timeout)
642
941
PseudoTcpSocketPrivate *priv = self->priv;
643
942
guint32 now = get_current_time ();
645
945
if (priv->shutdown == SD_FORCEFUL)
948
snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
648
949
if ((priv->shutdown == SD_GRACEFUL)
649
950
&& ((priv->state != TCP_ESTABLISHED)
650
|| ((priv->slen == 0) && (priv->t_ack == 0)))) {
951
|| ((snd_buffered == 0) && (priv->t_ack == 0)))) {
955
if (*timeout == 0 || *timeout < now)
956
*timeout = now + CLOSED_TIMEOUT;
654
958
if (priv->state == TCP_CLOSED) {
655
*timeout = CLOSED_TIMEOUT;
959
*timeout = min (*timeout, now + CLOSED_TIMEOUT);
659
*timeout = DEFAULT_TIMEOUT;
963
*timeout = min (*timeout, now + DEFAULT_TIMEOUT);
661
965
if (priv->t_ack) {
662
*timeout = min(*timeout, time_diff(priv->t_ack + ACK_DELAY, now));
966
*timeout = min(*timeout, priv->t_ack + priv->ack_delay);
664
968
if (priv->rto_base) {
665
*timeout = min(*timeout, time_diff(priv->rto_base + priv->rx_rto, now));
969
*timeout = min(*timeout, priv->rto_base + priv->rx_rto);
667
971
if (priv->snd_wnd == 0) {
668
*timeout = min(*timeout, time_diff(priv->lastsend + priv->rx_rto, now));
972
*timeout = min(*timeout, priv->lastsend + priv->rx_rto);
676
980
pseudo_tcp_socket_recv(PseudoTcpSocket *self, char * buffer, size_t len)
678
982
PseudoTcpSocketPrivate *priv = self->priv;
984
gsize available_space;
681
986
if (priv->state != TCP_ESTABLISHED) {
682
987
priv->error = ENOTCONN;
686
if (priv->rlen == 0) {
994
bytesread = pseudo_tcp_fifo_read (&priv->rbuf, (guint8 *) buffer, len);
996
// If there's no data in |m_rbuf|.
997
if (bytesread == 0) {
687
998
priv->bReadEnable = TRUE;
688
999
priv->error = EWOULDBLOCK;
692
read = min((guint32) len, priv->rlen);
693
memcpy(buffer, priv->rbuf, read);
696
/* !?! until we create a circular buffer, we need to move all of the rest
698
memmove(priv->rbuf, priv->rbuf + read, sizeof(priv->rbuf) - read);
700
if ((sizeof(priv->rbuf) - priv->rlen - priv->rcv_wnd)
701
>= min(sizeof(priv->rbuf) / 2, priv->mss)) {
1003
available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
1005
if (available_space - priv->rcv_wnd >=
1006
min (priv->rbuf_len / 2, priv->mss)) {
702
1007
// !?! Not sure about this was closed business
703
1008
gboolean bWasClosed = (priv->rcv_wnd == 0);
705
priv->rcv_wnd = sizeof(priv->rbuf) - priv->rlen;
1010
priv->rcv_wnd = available_space;
707
1012
if (bWasClosed) {
708
1013
attempt_send(self, sfImmediateAck);
762
1071
queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl)
764
1073
PseudoTcpSocketPrivate *priv = self->priv;
1074
gsize available_space;
766
if (len > sizeof(priv->sbuf) - priv->slen) {
1076
available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
1077
if (len > available_space) {
767
1078
g_assert(!bCtrl);
768
len = sizeof(priv->sbuf) - priv->slen;
1079
len = available_space;
771
1082
// We can concatenate data if the last segment is the same type
772
1083
// (control v. regular data), and has not been transmitted yet
773
if (g_list_length (priv->slist) > 0 &&
774
(((SSegment *)g_list_last (priv->slist)->data)->bCtrl == bCtrl) &&
775
(((SSegment *)g_list_last (priv->slist)->data)->xmit == 0)) {
776
((SSegment *)g_list_last (priv->slist)->data)->len += len;
1084
if (g_queue_get_length (&priv->slist) &&
1085
(((SSegment *)g_queue_peek_tail (&priv->slist))->bCtrl == bCtrl) &&
1086
(((SSegment *)g_queue_peek_tail (&priv->slist))->xmit == 0)) {
1087
((SSegment *)g_queue_peek_tail (&priv->slist))->len += len;
778
1089
SSegment *sseg = g_slice_new0 (SSegment);
779
sseg->seq = priv->snd_una + priv->slen;
1090
gsize snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
1092
sseg->seq = priv->snd_una + snd_buffered;
780
1093
sseg->len = len;
781
1094
sseg->bCtrl = bCtrl;
782
priv->slist = g_list_append (priv->slist, sseg);
1095
g_queue_push_tail (&priv->slist, sseg);
1096
g_queue_push_tail (&priv->unsent_slist, sseg);
785
memcpy(priv->sbuf + priv->slen, data, len);
787
1099
//LOG(LS_INFO) << "PseudoTcp::queue - priv->slen = " << priv->slen;
1100
return pseudo_tcp_fifo_write (&priv->sbuf, (guint8*) data, len);;
1103
// Creates a packet and submits it to the network. This method can either
1104
// send payload or just an ACK packet.
1106
// |seq| is the sequence number of this packet.
1107
// |flags| is the flags for sending this packet.
1108
// |offset| is the offset to read from |m_sbuf|.
1109
// |len| is the number of bytes to read from |m_sbuf| as payload. If this
1110
// value is 0 then this is an ACK packet, otherwise this packet has payload.
791
1112
static PseudoTcpWriteResult
792
1113
packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
793
const gchar * data, guint32 len)
1114
guint32 offset, guint32 len, guint32 now)
795
1116
PseudoTcpSocketPrivate *priv = self->priv;
796
guint32 now = get_current_time();
797
guint8 buffer[MAX_PACKET];
1118
guint8 u8[MAX_PACKET];
1119
guint16 u16[MAX_PACKET / 2];
1120
guint32 u32[MAX_PACKET / 4];
798
1122
PseudoTcpWriteResult wres = WR_SUCCESS;
800
1124
g_assert(HEADER_SIZE + len <= MAX_PACKET);
802
*((guint32 *) buffer) = htonl(priv->conv);
803
*((guint32 *) (buffer + 4)) = htonl(seq);
804
*((guint32 *) (buffer + 8)) = htonl(priv->rcv_nxt);
807
*((guint16 *) (buffer + 14)) = htons((guint16)priv->rcv_wnd);
1126
*buffer.u32 = htonl(priv->conv);
1127
*(buffer.u32 + 1) = htonl(seq);
1128
*(buffer.u32 + 2) = htonl(priv->rcv_nxt);
1130
buffer.u8[13] = flags;
1131
*(buffer.u16 + 7) = htons((guint16)(priv->rcv_wnd >> priv->rwnd_scale));
809
1133
// Timestamp computations
810
*((guint32 *) (buffer + 16)) = htonl(now);
811
*((guint32 *) (buffer + 20)) = htonl(priv->ts_recent);
1134
*(buffer.u32 + 4) = htonl(now);
1135
*(buffer.u32 + 5) = htonl(priv->ts_recent);
812
1136
priv->ts_lastack = priv->rcv_nxt;
815
memcpy(buffer + HEADER_SIZE, data, len);
1141
bytes_read = pseudo_tcp_fifo_read_offset (&priv->sbuf, buffer.u8 + HEADER_SIZE,
1143
g_assert (bytes_read == len);
817
1146
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "<-- <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
818
1147
"<WND=%d><TS=%d><TSR=%d><LEN=%d>",
819
1148
priv->conv, (unsigned)flags, seq, seq + len, priv->rcv_nxt, priv->rcv_wnd,
820
1149
now % 10000, priv->ts_recent % 10000, len);
822
wres = priv->callbacks.WritePacket(self, (gchar *) buffer, len + HEADER_SIZE,
1151
wres = priv->callbacks.WritePacket(self, (gchar *) buffer.u8, len + HEADER_SIZE,
823
1152
priv->callbacks.user_data);
824
/* Note: When data is NULL, this is an ACK packet. We don't read the
1153
/* Note: When len is 0, this is an ACK packet. We don't read the
825
1154
return value for those, and thus we won't retry. So go ahead and treat
826
1155
the packet as a success (basically simulate as if it were dropped),
827
1156
which will prevent our timers from being messed up. */
828
if ((wres != WR_SUCCESS) && (NULL != data))
1157
if ((wres != WR_SUCCESS) && (0 != len))
831
1160
priv->t_ack = 0;
842
parse(PseudoTcpSocket *self, const guint8 * buffer, guint32 size)
1171
parse (PseudoTcpSocket *self, const guint8 *_header_buf, gsize header_buf_len,
1172
const guint8 *data_buf, gsize data_buf_len)
1182
header_buf.u8 = _header_buf;
1184
if (header_buf_len != 24)
849
seg.conv = ntohl(*(guint32 *)buffer);
850
seg.seq = ntohl(*(guint32 *)(buffer + 4));
851
seg.ack = ntohl(*(guint32 *)(buffer + 8));
852
seg.flags = buffer[13];
853
seg.wnd = ntohs(*(guint16 *)(buffer + 14));
855
seg.tsval = ntohl(*(guint32 *)(buffer + 16));
856
seg.tsecr = ntohl(*(guint32 *)(buffer + 20));
858
seg.data = ((gchar *)buffer) + HEADER_SIZE;
859
seg.len = size - HEADER_SIZE;
1187
seg.conv = ntohl(*header_buf.u32);
1188
seg.seq = ntohl(*(header_buf.u32 + 1));
1189
seg.ack = ntohl(*(header_buf.u32 + 2));
1190
seg.flags = header_buf.u8[13];
1191
seg.wnd = ntohs(*(header_buf.u16 + 7));
1193
seg.tsval = ntohl(*(header_buf.u32 + 4));
1194
seg.tsecr = ntohl(*(header_buf.u32 + 5));
1196
seg.data = (const gchar *) data_buf;
1197
seg.len = data_buf_len;
861
1199
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "--> <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
862
1200
"<WND=%d><TS=%d><TSR=%d><LEN=%d>",
960
1302
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "rtt: %ld srtt: %d rto: %d",
961
1303
rtt, priv->rx_srtt, priv->rx_rto);
963
g_assert_not_reached ();
1305
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid RTT: %ld", rtt);
967
priv->snd_wnd = seg->wnd;
1310
priv->snd_wnd = seg->wnd << priv->swnd_scale;
969
1312
nAcked = seg->ack - priv->snd_una;
970
1313
priv->snd_una = seg->ack;
972
1315
priv->rto_base = (priv->snd_una == priv->snd_nxt) ? 0 : now;
974
priv->slen -= nAcked;
975
memmove(priv->sbuf, priv->sbuf + nAcked, priv->slen);
976
//LOG(LS_INFO) << "PseudoTcp::process - priv->slen = " << priv->slen;
1317
pseudo_tcp_fifo_consume_read_data (&priv->sbuf, nAcked);
978
1319
for (nFree = nAcked; nFree > 0; ) {
981
g_assert(priv->slist != NULL);
982
data = (SSegment *) (priv->slist->data);
1322
g_assert(g_queue_get_length (&priv->slist) != 0);
1323
data = (SSegment *) g_queue_peek_head (&priv->slist);
984
1325
if (nFree < data->len) {
985
1326
data->len -= nFree;
988
1330
if (data->len > priv->largest) {
989
1331
priv->largest = data->len;
991
1333
nFree -= data->len;
992
g_slice_free (SSegment, priv->slist->data);
993
priv->slist = g_list_delete_link (priv->slist, priv->slist);
1334
g_slice_free (SSegment, data);
1335
g_queue_pop_head (&priv->slist);
997
1339
if (priv->dup_acks >= 3) {
998
if (priv->snd_una >= priv->recover) { // NewReno
1340
if (LARGER_OR_EQUAL (priv->snd_una, priv->recover)) { // NewReno
999
1341
guint32 nInFlight = priv->snd_nxt - priv->snd_una;
1000
1342
// (Fast Retransmit)
1001
1343
priv->cwnd = min(priv->ssthresh, nInFlight + priv->mss);
1018
1360
priv->cwnd += max(1LU, priv->mss * priv->mss / priv->cwnd);
1023
if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
1024
priv->state = TCP_ESTABLISHED;
1025
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
1027
if (priv->callbacks.PseudoTcpOpened)
1028
priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
1031
// If we make room in the send queue, notify the user
1032
// The goal it to make sure we always have at least enough data to fill the
1033
// window. We'd like to notify the app when we are halfway to that point.
1034
kIdealRefillSize = (sizeof(priv->sbuf) + sizeof(priv->rbuf)) / 2;
1035
if (priv->bWriteEnable && (priv->slen < kIdealRefillSize)) {
1036
priv->bWriteEnable = FALSE;
1037
if (priv->callbacks.PseudoTcpWritable)
1038
priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
1040
1363
} else if (seg->ack == priv->snd_una) {
1041
1364
/* !?! Note, tcp says don't do this... but otherwise how does a
1042
1365
closed window become open? */
1043
priv->snd_wnd = seg->wnd;
1366
priv->snd_wnd = seg->wnd << priv->swnd_scale;
1045
1368
// Check duplicate acks
1046
1369
if (seg->len > 0) {
1396
if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
1397
priv->state = TCP_ESTABLISHED;
1398
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
1400
if (priv->callbacks.PseudoTcpOpened)
1401
priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
1404
// If we make room in the send queue, notify the user
1405
// The goal it to make sure we always have at least enough data to fill the
1406
// window. We'd like to notify the app when we are halfway to that point.
1407
kIdealRefillSize = (priv->sbuf_len + priv->rbuf_len) / 2;
1409
snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
1410
if (priv->bWriteEnable && snd_buffered < kIdealRefillSize) {
1411
priv->bWriteEnable = FALSE;
1412
if (priv->callbacks.PseudoTcpWritable)
1413
priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
1072
1416
/* Conditions where acks must be sent:
1073
1417
* 1) Segment is too old (they missed an ACK) (immediately)
1074
1418
* 2) Segment is too new (we missed a segment) (immediately)
1080
1424
if (seg->seq != priv->rcv_nxt) {
1081
1425
sflags = sfImmediateAck; // (Fast Recovery)
1082
1426
} else if (seg->len != 0) {
1083
sflags = sfDelayedAck;
1427
if (priv->ack_delay == 0) {
1428
sflags = sfImmediateAck;
1430
sflags = sfDelayedAck;
1085
1433
if (sflags == sfImmediateAck) {
1086
1434
if (seg->seq > priv->rcv_nxt) {
1087
1435
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "too new");
1088
} else if (seg->seq + seg->len <= priv->rcv_nxt) {
1436
} else if (SMALLER_OR_EQUAL(seg->seq + seg->len, priv->rcv_nxt)) {
1089
1437
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "too old");
1093
1441
// Adjust the incoming segment to fit our receive buffer
1094
if (seg->seq < priv->rcv_nxt) {
1442
if (SMALLER(seg->seq, priv->rcv_nxt)) {
1095
1443
guint32 nAdjust = priv->rcv_nxt - seg->seq;
1096
1444
if (nAdjust < seg->len) {
1097
1445
seg->seq += nAdjust;
1124
1473
guint32 nOffset = seg->seq - priv->rcv_nxt;
1125
memcpy(priv->rbuf + priv->rlen + nOffset, seg->data, seg->len);
1476
res = pseudo_tcp_fifo_write_offset (&priv->rbuf, (guint8 *) seg->data,
1478
g_assert (res == seg->len);
1126
1480
if (seg->seq == priv->rcv_nxt) {
1127
1481
GList *iter = NULL;
1129
priv->rlen += seg->len;
1483
pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, seg->len);
1130
1484
priv->rcv_nxt += seg->len;
1131
1485
priv->rcv_wnd -= seg->len;
1132
1486
bNewData = TRUE;
1134
1488
iter = priv->rlist;
1135
while (iter && (((RSegment *)iter->data)->seq <= priv->rcv_nxt)) {
1490
SMALLER_OR_EQUAL(((RSegment *)iter->data)->seq, priv->rcv_nxt)) {
1136
1491
RSegment *data = (RSegment *)(iter->data);
1137
if (data->seq + data->len > priv->rcv_nxt) {
1492
if (LARGER (data->seq + data->len, priv->rcv_nxt)) {
1138
1493
guint32 nAdjust = (data->seq + data->len) - priv->rcv_nxt;
1139
1494
sflags = sfImmediateAck; // (Fast Recovery)
1140
1495
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Recovered %d bytes (%d -> %d)",
1141
1496
nAdjust, priv->rcv_nxt, priv->rcv_nxt + nAdjust);
1142
priv->rlen += nAdjust;
1497
pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, nAdjust);
1143
1498
priv->rcv_nxt += nAdjust;
1144
1499
priv->rcv_wnd -= nAdjust;
1303
1676
// If this is an immediate ack, or the second delayed ack
1304
1677
if ((sflags == sfImmediateAck) || priv->t_ack) {
1305
packet(self, priv->snd_nxt, 0, 0, 0);
1678
packet(self, priv->snd_nxt, 0, 0, 0, now);
1307
priv->t_ack = get_current_time();
1312
1685
// Nagle algorithm
1313
if ((priv->snd_nxt > priv->snd_una) && (nAvailable < priv->mss)) {
1686
// If there is data already in-flight, and we haven't a full segment of
1687
// data ready to send then hold off until we get more to send, or the
1688
// in-flight data is acknowledged.
1689
if (priv->use_nagling && (priv->snd_nxt > priv->snd_una) &&
1690
(nAvailable < priv->mss)) {
1317
1694
// Find the next segment to transmit
1319
while (((SSegment*)iter->data)->xmit > 0) {
1320
iter = g_list_next (iter);
1695
iter = g_queue_peek_head_link (&priv->unsent_slist);
1324
1698
// If the segment is too large, break it into two
1325
if (((SSegment*)iter->data)->len > nAvailable) {
1699
if (sseg->len > nAvailable) {
1326
1700
SSegment *subseg = g_slice_new0 (SSegment);
1327
subseg->seq = ((SSegment*)iter->data)->seq + nAvailable;
1328
subseg->len = ((SSegment*)iter->data)->len - nAvailable;
1329
subseg->bCtrl = ((SSegment*)iter->data)->bCtrl;
1701
subseg->seq = sseg->seq + nAvailable;
1702
subseg->len = sseg->len - nAvailable;
1703
subseg->bCtrl = sseg->bCtrl;
1331
((SSegment*)iter->data)->len = nAvailable;
1332
priv->slist = g_list_insert_before(priv->slist, iter->next, subseg);
1705
sseg->len = nAvailable;
1706
g_queue_insert_after (&priv->unsent_slist, iter, subseg);
1707
g_queue_insert_after (&priv->slist, g_queue_find (&priv->slist, sseg),
1335
if (!transmit(self, iter, now)) {
1711
if (!transmit(self, sseg, now)) {
1336
1712
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "transmit failed");
1337
1713
// TODO: consider closing socket
1374
1749
priv->ssthresh = max(priv->ssthresh, 2 * priv->mss);
1375
1750
priv->cwnd = max(priv->cwnd, priv->mss);
1754
apply_window_scale_option (PseudoTcpSocket *self, guint8 scale_factor)
1756
PseudoTcpSocketPrivate *priv = self->priv;
1758
priv->swnd_scale = scale_factor;
1762
apply_option(PseudoTcpSocket *self, char kind, const guint8* data, guint32 len)
1764
if (kind == TCP_OPT_MSS) {
1765
DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
1766
"Peer specified MSS option which is not supported.");
1768
} else if (kind == TCP_OPT_WND_SCALE) {
1769
// Window scale factor.
1770
// http://www.ietf.org/rfc/rfc1323.txt
1772
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid window scale option received.");
1775
apply_window_scale_option(self, data[0]);
1781
parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len)
1783
PseudoTcpSocketPrivate *priv = self->priv;
1784
gboolean has_window_scaling_option = FALSE;
1787
// See http://www.freesoft.org/CIE/Course/Section4/8.htm for
1788
// parsing the options list.
1790
guint8 kind = TCP_OPT_EOL;
1799
if (kind == TCP_OPT_EOL) {
1800
// End of option list.
1802
} else if (kind == TCP_OPT_NOOP) {
1810
// Length of this option.
1811
opt_len = data[pos];
1814
if (len < pos + opt_len)
1817
// Content of this option.
1818
if (opt_len <= len - pos) {
1819
apply_option (self, kind, data + pos, opt_len);
1822
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid option length received.");
1826
if (kind == TCP_OPT_WND_SCALE)
1827
has_window_scaling_option = TRUE;
1830
if (!has_window_scaling_option) {
1831
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer doesn't support window scaling");
1832
if (priv->rwnd_scale > 0) {
1833
// Peer doesn't support TCP options and window scaling.
1834
// Revert receive buffer size to default value.
1835
resize_receive_buffer (self, DEFAULT_RCV_BUF_SIZE);
1836
priv->swnd_scale = 0;
1842
resize_send_buffer (PseudoTcpSocket *self, guint32 new_size)
1844
PseudoTcpSocketPrivate *priv = self->priv;
1846
priv->sbuf_len = new_size;
1847
pseudo_tcp_fifo_set_capacity (&priv->sbuf, new_size);
1852
resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size)
1854
PseudoTcpSocketPrivate *priv = self->priv;
1855
guint8 scale_factor = 0;
1857
gsize available_space;
1859
if (priv->rbuf_len == new_size)
1862
// Determine the scale factor such that the scaled window size can fit
1863
// in a 16-bit unsigned integer.
1864
while (new_size > 0xFFFF) {
1869
// Determine the proper size of the buffer.
1870
new_size <<= scale_factor;
1871
result = pseudo_tcp_fifo_set_capacity (&priv->rbuf, new_size);
1873
// Make sure the new buffer is large enough to contain data in the old
1874
// buffer. This should always be true because this method is called either
1875
// before connection is established or when peers are exchanging connect
1878
priv->rbuf_len = new_size;
1879
priv->rwnd_scale = scale_factor;
1880
priv->ssthresh = new_size;
1882
available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
1883
priv->rcv_wnd = available_space;
1887
pseudo_tcp_socket_get_available_bytes (PseudoTcpSocket *self)
1889
PseudoTcpSocketPrivate *priv = self->priv;
1891
if (priv->state != TCP_ESTABLISHED) {
1895
return pseudo_tcp_fifo_get_buffered (&priv->rbuf);
1899
pseudo_tcp_socket_can_send (PseudoTcpSocket *self)
1901
return (pseudo_tcp_socket_get_available_send_space (self) > 0);
1905
pseudo_tcp_socket_get_available_send_space (PseudoTcpSocket *self)
1907
PseudoTcpSocketPrivate *priv = self->priv;
1911
if (priv->state == TCP_ESTABLISHED)
1912
ret = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
1917
priv->bWriteEnable = TRUE;