~ubuntu-branches/debian/stretch/libnice/stretch

« back to all changes in this revision

Viewing changes to agent/pseudotcp.c

  • Committer: Package Import Robot
  • Author(s): Simon McVittie
  • Date: 2014-05-14 12:00:13 UTC
  • mfrom: (1.2.9) (5.1.13 sid)
  • Revision ID: package-import@ubuntu.com-20140514120013-fi5mh9bexrjnwnd8
Tags: 0.1.7-1
* New upstream release 0.1.6, 0.1.7
  - fixes various compiler warnings that were mistakenly fatal in 0.1.5
    (Closes: #743232, #743233)
  - update symbols file for new API
* Explicitly disable -Werror, even if we package a non-release in future
* Don't run tests during the build. We were ignoring failures already,
  and they sometimes hang until the buildd terminates them.
  Upstream (Olivier Crête) says they are stable enough to be useful
  for developers, but not for integration testing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
74
74
#endif
75
75
 
76
76
#include "pseudotcp.h"
 
77
#include "agent-priv.h"
77
78
 
78
79
G_DEFINE_TYPE (PseudoTcpSocket, pseudo_tcp_socket, G_TYPE_OBJECT);
79
80
 
153
154
#define MIN_RTO      250
154
155
#define DEF_RTO     3000 /* 3 seconds (RFC1122, Sec 4.2.3.1) */
155
156
#define MAX_RTO    60000 /* 60 seconds */
156
 
#define ACK_DELAY    100 /* 100 milliseconds */
 
157
#define DEFAULT_ACK_DELAY    100 /* 100 milliseconds */
 
158
#define DEFAULT_NO_DELAY     FALSE
 
159
 
 
160
#define DEFAULT_RCV_BUF_SIZE (60 * 1024)
 
161
#define DEFAULT_SND_BUF_SIZE (90 * 1024)
 
162
 
 
163
#define TCP_OPT_EOL       0  // End of list.
 
164
#define TCP_OPT_NOOP      1  // No-op.
 
165
#define TCP_OPT_MSS       2  // Maximum segment size.
 
166
#define TCP_OPT_WND_SCALE 3  // Window scale factor.
 
167
 
157
168
 
158
169
/*
159
170
#define FLAG_FIN 0x01
193
204
static guint32
194
205
get_current_time(void)
195
206
{
196
 
  GTimeVal tv;
197
 
  g_get_current_time (&tv);
198
 
  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
 
207
  return g_get_monotonic_time () / 1000;
199
208
}
200
209
 
201
210
static gboolean
228
237
  }
229
238
}
230
239
 
 
240
////////////////////////////////////////////////////////
 
241
// PseudoTcpFifo works exactly like FifoBuffer in libjingle
 
242
////////////////////////////////////////////////////////
 
243
 
 
244
 
 
245
typedef struct {
 
246
  guint8 *buffer;
 
247
  gsize buffer_length;
 
248
  gsize data_length;
 
249
  gsize read_position;
 
250
} PseudoTcpFifo;
 
251
 
 
252
 
 
253
static void
 
254
pseudo_tcp_fifo_init (PseudoTcpFifo *b, gsize size)
 
255
{
 
256
  b->buffer = g_slice_alloc (size);
 
257
  b->buffer_length = size;
 
258
}
 
259
 
 
260
static void
 
261
pseudo_tcp_fifo_clear (PseudoTcpFifo *b)
 
262
{
 
263
  if (b->buffer)
 
264
    g_slice_free1 (b->buffer_length, b->buffer);
 
265
  b->buffer = NULL;
 
266
  b->buffer_length = 0;
 
267
}
 
268
 
 
269
static gsize
 
270
pseudo_tcp_fifo_get_buffered (PseudoTcpFifo *b)
 
271
{
 
272
  return b->data_length;
 
273
}
 
274
 
 
275
static gboolean
 
276
pseudo_tcp_fifo_set_capacity (PseudoTcpFifo *b, gsize size)
 
277
{
 
278
  if (b->data_length > size)
 
279
    return FALSE;
 
280
 
 
281
  if (size != b->data_length) {
 
282
    guint8 *buffer = g_slice_alloc (size);
 
283
    gsize copy = b->data_length;
 
284
    gsize tail_copy = min (copy, b->buffer_length - b->read_position);
 
285
 
 
286
    memcpy (buffer, &b->buffer[b->read_position], tail_copy);
 
287
    memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy);
 
288
    g_slice_free1 (b->buffer_length, b->buffer);
 
289
    b->buffer = buffer;
 
290
    b->buffer_length = size;
 
291
    b->read_position = 0;
 
292
  }
 
293
 
 
294
  return TRUE;
 
295
}
 
296
 
 
297
static void
 
298
pseudo_tcp_fifo_consume_read_data (PseudoTcpFifo *b, gsize size)
 
299
{
 
300
  g_assert (size <= b->data_length);
 
301
 
 
302
  b->read_position = (b->read_position + size) % b->buffer_length;
 
303
  b->data_length -= size;
 
304
}
 
305
 
 
306
static void
 
307
pseudo_tcp_fifo_consume_write_buffer (PseudoTcpFifo *b, gsize size)
 
308
{
 
309
  g_assert (size <= b->buffer_length - b->data_length);
 
310
 
 
311
  b->data_length += size;
 
312
}
 
313
 
 
314
static gsize
 
315
pseudo_tcp_fifo_get_write_remaining (PseudoTcpFifo *b)
 
316
{
 
317
  return b->buffer_length - b->data_length;
 
318
}
 
319
 
 
320
static gsize
 
321
pseudo_tcp_fifo_read_offset (PseudoTcpFifo *b, guint8 *buffer, gsize bytes,
 
322
    gsize offset)
 
323
{
 
324
  gsize available = b->data_length - offset;
 
325
  gsize read_position = (b->read_position + offset) % b->buffer_length;
 
326
  gsize copy = min (bytes, available);
 
327
  gsize tail_copy = min(copy, b->buffer_length - read_position);
 
328
 
 
329
  /* EOS */
 
330
  if (offset >= b->data_length)
 
331
    return 0;
 
332
 
 
333
  memcpy(buffer, &b->buffer[read_position], tail_copy);
 
334
  memcpy(buffer + tail_copy, &b->buffer[0], copy - tail_copy);
 
335
 
 
336
  return copy;
 
337
}
 
338
 
 
339
static gsize
 
340
pseudo_tcp_fifo_write_offset (PseudoTcpFifo *b, const guint8 *buffer,
 
341
    gsize bytes, gsize offset)
 
342
{
 
343
  gsize available = b->buffer_length - b->data_length - offset;
 
344
  gsize write_position = (b->read_position + b->data_length + offset)
 
345
      % b->buffer_length;
 
346
  gsize copy = min (bytes, available);
 
347
  gsize tail_copy = min(copy, b->buffer_length - write_position);
 
348
 
 
349
  if (b->data_length + offset >= b->buffer_length) {
 
350
    return 0;
 
351
  }
 
352
 
 
353
  memcpy(&b->buffer[write_position], buffer, tail_copy);
 
354
  memcpy(&b->buffer[0], buffer + tail_copy, copy - tail_copy);
 
355
 
 
356
  return copy;
 
357
}
 
358
 
 
359
static gsize
 
360
pseudo_tcp_fifo_read (PseudoTcpFifo *b, guint8 *buffer, gsize bytes)
 
361
{
 
362
  gsize copy;
 
363
 
 
364
  copy = pseudo_tcp_fifo_read_offset (b, buffer, bytes, 0);
 
365
 
 
366
  b->read_position = (b->read_position + copy) % b->buffer_length;
 
367
  b->data_length -= copy;
 
368
 
 
369
  return copy;
 
370
}
 
371
 
 
372
static gsize
 
373
pseudo_tcp_fifo_write (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes)
 
374
{
 
375
  gsize copy;
 
376
 
 
377
  copy = pseudo_tcp_fifo_write_offset (b, buffer, bytes, 0);
 
378
  b->data_length += copy;
 
379
 
 
380
  return copy;
 
381
}
 
382
 
 
383
 
231
384
//////////////////////////////////////////////////////////////////////
232
385
// PseudoTcp
233
386
//////////////////////////////////////////////////////////////////////
244
397
  sfImmediateAck
245
398
} SendFlags;
246
399
 
247
 
enum {
248
 
  // Note: can't go as high as 1024 * 64, because of uint16 precision
249
 
  kRcvBufSize = 1024 * 60,
250
 
  // Note: send buffer should be larger to make sure we can always fill the
251
 
  // receiver window
252
 
  kSndBufSize = 1024 * 90
253
 
};
254
 
 
255
400
typedef struct {
256
401
  guint32 conv, seq, ack;
257
402
  guint8 flags;
286
431
 
287
432
  // Incoming data
288
433
  GList *rlist;
289
 
  gchar rbuf[kRcvBufSize];
290
 
  guint32 rcv_nxt, rcv_wnd, rlen, lastrecv;
 
434
  guint32 rbuf_len, rcv_nxt, rcv_wnd, lastrecv;
 
435
  guint8 rwnd_scale; // Window scale factor
 
436
  PseudoTcpFifo rbuf;
291
437
 
292
438
  // Outgoing data
293
 
  GList *slist;
294
 
  gchar sbuf[kSndBufSize];
295
 
  guint32 snd_nxt, snd_wnd, slen, lastsend, snd_una;
 
439
  GQueue slist;
 
440
  GQueue unsent_slist;
 
441
  guint32 sbuf_len, snd_nxt, snd_wnd, lastsend;
 
442
  guint32 snd_una;  /* oldest unacknowledged sequence number */
 
443
  guint8 swnd_scale; // Window scale factor
 
444
  PseudoTcpFifo sbuf;
 
445
 
296
446
  // Maximum segment size, estimated protocol level, largest segment sent
297
447
  guint32 mss, msslevel, largest, mtu_advise;
298
448
  // Retransmit timer
308
458
  guint32 ssthresh, cwnd;
309
459
  guint8 dup_acks;
310
460
  guint32 recover;
311
 
  guint32 t_ack;
312
 
 
 
461
  guint32 t_ack;  /* time a delayed ack was scheduled; 0 if no acks scheduled */
 
462
 
 
463
  gboolean use_nagling;
 
464
  guint32 ack_delay;
 
465
 
 
466
  // This is used by unit tests to test backward compatibility of
 
467
  // PseudoTcp implementations that don't support window scaling.
 
468
  gboolean support_wnd_scale;
313
469
};
314
470
 
 
471
#define LARGER(a,b) (((a) - (b) - 1) < (G_MAXUINT32 >> 1))
 
472
#define LARGER_OR_EQUAL(a,b) (((a) - (b)) < (G_MAXUINT32 >> 1))
 
473
#define SMALLER(a,b) LARGER ((b),(a))
 
474
#define SMALLER_OR_EQUAL(a,b) LARGER_OR_EQUAL ((b),(a))
315
475
 
316
476
/* properties */
317
477
enum
319
479
  PROP_CONVERSATION = 1,
320
480
  PROP_CALLBACKS,
321
481
  PROP_STATE,
 
482
  PROP_ACK_DELAY,
 
483
  PROP_NO_DELAY,
 
484
  PROP_RCV_BUF,
 
485
  PROP_SND_BUF,
322
486
  LAST_PROPERTY
323
487
};
324
488
 
330
494
static void pseudo_tcp_socket_finalize (GObject *object);
331
495
 
332
496
 
 
497
static void queue_connect_message (PseudoTcpSocket *self);
333
498
static guint32 queue(PseudoTcpSocket *self, const gchar * data,
334
499
    guint32 len, gboolean bCtrl);
335
500
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
336
 
    guint8 flags, const gchar * data, guint32 len);
337
 
static gboolean parse(PseudoTcpSocket *self,
338
 
    const guint8 * buffer, guint32 size);
 
501
    guint8 flags, guint32 offset, guint32 len, guint32 now);
 
502
static gboolean parse (PseudoTcpSocket *self,
 
503
    const guint8 *_header_buf, gsize header_buf_len,
 
504
    const guint8 *data_buf, gsize data_buf_len);
339
505
static gboolean process(PseudoTcpSocket *self, Segment *seg);
340
 
static gboolean transmit(PseudoTcpSocket *self, const GList *seg, guint32 now);
 
506
static gboolean transmit(PseudoTcpSocket *self, SSegment *sseg, guint32 now);
341
507
static void attempt_send(PseudoTcpSocket *self, SendFlags sflags);
342
508
static void closedown(PseudoTcpSocket *self, guint32 err);
343
509
static void adjustMTU(PseudoTcpSocket *self);
 
510
static void parse_options (PseudoTcpSocket *self, const guint8 *data,
 
511
    guint32 len);
 
512
static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size);
 
513
static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size);
344
514
 
345
515
 
346
516
// The following logging is for detailed (packet-level) pseudotcp analysis only.
393
563
          TCP_LISTEN, TCP_CLOSED, TCP_LISTEN,
394
564
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
395
565
 
 
566
  g_object_class_install_property (object_class, PROP_ACK_DELAY,
 
567
      g_param_spec_uint ("ack-delay", "ACK Delay",
 
568
          "Delayed ACK timeout (in milliseconds)",
 
569
          0, G_MAXUINT, DEFAULT_ACK_DELAY,
 
570
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
 
571
 
 
572
  g_object_class_install_property (object_class, PROP_NO_DELAY,
 
573
      g_param_spec_boolean ("no-delay", "No Delay",
 
574
          "Disable the Nagle algorithm (like the TCP_NODELAY option)",
 
575
          DEFAULT_NO_DELAY,
 
576
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
 
577
 
 
578
  g_object_class_install_property (object_class, PROP_RCV_BUF,
 
579
      g_param_spec_uint ("rcv-buf", "Receive Buffer",
 
580
          "Receive Buffer size",
 
581
          1, G_MAXUINT, DEFAULT_RCV_BUF_SIZE,
 
582
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
 
583
 
 
584
  g_object_class_install_property (object_class, PROP_SND_BUF,
 
585
      g_param_spec_uint ("snd-buf", "Send Buffer",
 
586
          "Send Buffer size",
 
587
          1, G_MAXUINT, DEFAULT_SND_BUF_SIZE,
 
588
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
396
589
}
397
590
 
398
591
 
414
607
    case PROP_STATE:
415
608
      g_value_set_uint (value, self->priv->state);
416
609
      break;
 
610
    case PROP_ACK_DELAY:
 
611
      g_value_set_uint (value, self->priv->ack_delay);
 
612
      break;
 
613
    case PROP_NO_DELAY:
 
614
      g_value_set_boolean (value, !self->priv->use_nagling);
 
615
      break;
 
616
    case PROP_RCV_BUF:
 
617
      g_value_set_uint (value, self->priv->rbuf_len);
 
618
      break;
 
619
    case PROP_SND_BUF:
 
620
      g_value_set_uint (value, self->priv->sbuf_len);
 
621
      break;
417
622
    default:
418
623
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
419
624
      break;
438
643
        self->priv->callbacks = *c;
439
644
      }
440
645
      break;
 
646
    case PROP_ACK_DELAY:
 
647
      self->priv->ack_delay = g_value_get_uint (value);
 
648
      break;
 
649
    case PROP_NO_DELAY:
 
650
      self->priv->use_nagling = !g_value_get_boolean (value);
 
651
      break;
 
652
    case PROP_RCV_BUF:
 
653
      g_return_if_fail (self->priv->state == TCP_LISTEN);
 
654
      resize_receive_buffer (self, g_value_get_uint (value));
 
655
      break;
 
656
    case PROP_SND_BUF:
 
657
      g_return_if_fail (self->priv->state == TCP_LISTEN);
 
658
      resize_send_buffer (self, g_value_get_uint (value));
 
659
      break;
441
660
    default:
442
661
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
443
662
      break;
450
669
  PseudoTcpSocket *self = PSEUDO_TCP_SOCKET (object);
451
670
  PseudoTcpSocketPrivate *priv = self->priv;
452
671
  GList *i;
 
672
  SSegment *sseg;
453
673
 
454
674
  if (priv == NULL)
455
675
    return;
456
676
 
457
 
  for (i = priv->slist; i; i = i->next) {
458
 
    SSegment *sseg = i->data;
 
677
  while ((sseg = g_queue_pop_head (&priv->slist)))
459
678
    g_slice_free (SSegment, sseg);
460
 
  }
 
679
  g_queue_clear (&priv->unsent_slist);
461
680
  for (i = priv->rlist; i; i = i->next) {
462
681
    RSegment *rseg = i->data;
463
682
    g_slice_free (RSegment, rseg);
464
683
  }
465
 
  g_list_free (priv->slist);
466
 
  priv->slist = NULL;
467
684
  g_list_free (priv->rlist);
468
685
  priv->rlist = NULL;
469
686
 
 
687
  pseudo_tcp_fifo_clear (&priv->rbuf);
 
688
  pseudo_tcp_fifo_clear (&priv->sbuf);
 
689
 
470
690
  g_free (priv);
471
691
  self->priv = NULL;
472
692
 
489
709
  priv->shutdown = SD_NONE;
490
710
  priv->error = 0;
491
711
 
 
712
  priv->rbuf_len = DEFAULT_RCV_BUF_SIZE;
 
713
  pseudo_tcp_fifo_init (&priv->rbuf, priv->rbuf_len);
 
714
  priv->sbuf_len = DEFAULT_SND_BUF_SIZE;
 
715
  pseudo_tcp_fifo_init (&priv->sbuf, priv->sbuf_len);
 
716
 
492
717
  priv->state = TCP_LISTEN;
493
718
  priv->conv = 0;
494
 
  priv->rcv_wnd = sizeof(priv->rbuf);
495
 
  priv->snd_nxt = priv->slen = 0;
 
719
  g_queue_init (&priv->slist);
 
720
  g_queue_init (&priv->unsent_slist);
 
721
  priv->rcv_wnd = priv->rbuf_len;
 
722
  priv->rwnd_scale = priv->swnd_scale = 0;
 
723
  priv->snd_nxt = 0;
496
724
  priv->snd_wnd = 1;
497
 
  priv->snd_una = priv->rcv_nxt = priv->rlen = 0;
 
725
  priv->snd_una = priv->rcv_nxt = 0;
498
726
  priv->bReadEnable = TRUE;
499
727
  priv->bWriteEnable = FALSE;
500
728
  priv->t_ack = 0;
507
735
  priv->rto_base = 0;
508
736
 
509
737
  priv->cwnd = 2 * priv->mss;
510
 
  priv->ssthresh = sizeof(priv->rbuf);
 
738
  priv->ssthresh = priv->rbuf_len;
511
739
  priv->lastrecv = priv->lastsend = priv->last_traffic = now;
512
740
  priv->bOutgoing = FALSE;
513
741
 
518
746
 
519
747
  priv->rx_rto = DEF_RTO;
520
748
  priv->rx_srtt = priv->rx_rttvar = 0;
 
749
 
 
750
  priv->ack_delay = DEFAULT_ACK_DELAY;
 
751
  priv->use_nagling = !DEFAULT_NO_DELAY;
 
752
 
 
753
  priv->support_wnd_scale = TRUE;
521
754
}
522
755
 
523
756
PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation,
530
763
      NULL);
531
764
}
532
765
 
 
766
static void
 
767
queue_connect_message (PseudoTcpSocket *self)
 
768
{
 
769
  PseudoTcpSocketPrivate *priv = self->priv;
 
770
  guint8 buf[4];
 
771
  gsize size = 1;
 
772
 
 
773
  buf[0] = CTL_CONNECT;
 
774
  if (priv->support_wnd_scale) {
 
775
    buf[1] = TCP_OPT_WND_SCALE;
 
776
    buf[2] = 1;
 
777
    buf[3] = priv->rwnd_scale;
 
778
    size = 4;
 
779
  }
 
780
 
 
781
  priv->snd_wnd = size;
 
782
 
 
783
  queue(self, (char*) buf, size, TRUE);
 
784
}
 
785
 
533
786
gboolean
534
787
pseudo_tcp_socket_connect(PseudoTcpSocket *self)
535
788
{
536
789
  PseudoTcpSocketPrivate *priv = self->priv;
537
 
  gchar buffer[1];
538
790
 
539
791
  if (priv->state != TCP_LISTEN) {
540
792
    priv->error = EINVAL;
544
796
  priv->state = TCP_SYN_SENT;
545
797
  DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_SYN_SENT");
546
798
 
547
 
  buffer[0] = CTL_CONNECT;
548
 
  queue(self, buffer, 1, TRUE);
 
799
  queue_connect_message (self);
549
800
  attempt_send(self, sfNone);
550
801
 
551
802
  return TRUE;
573
824
  // Check if it's time to retransmit a segment
574
825
  if (priv->rto_base &&
575
826
      (time_diff(priv->rto_base + priv->rx_rto, now) <= 0)) {
576
 
    if (g_list_length (priv->slist) == 0) {
 
827
    if (g_queue_get_length (&priv->slist) == 0) {
577
828
      g_assert_not_reached ();
578
829
    } else {
579
830
      // Note: (priv->slist.front().xmit == 0)) {
585
836
          "(rto_base: %d) (now: %d) (dup_acks: %d)",
586
837
          priv->rx_rto, priv->rto_base, now, (guint) priv->dup_acks);
587
838
 
588
 
      if (!transmit(self, priv->slist, now)) {
 
839
      if (!transmit(self, g_queue_peek_head (&priv->slist), now)) {
589
840
        closedown(self, ECONNABORTED);
590
841
        return;
591
842
      }
611
862
    }
612
863
 
613
864
    // probe the window
614
 
    packet(self, priv->snd_nxt - 1, 0, 0, 0);
 
865
    packet(self, priv->snd_nxt - 1, 0, 0, 0, now);
615
866
    priv->lastsend = now;
616
867
 
617
868
    // back off retransmit timer
619
870
  }
620
871
 
621
872
  // Check if it's time to send delayed acks
622
 
  if (priv->t_ack && (time_diff(priv->t_ack + ACK_DELAY, now) <= 0)) {
623
 
    packet(self, priv->snd_nxt, 0, 0, 0);
 
873
  if (priv->t_ack && (time_diff(priv->t_ack + priv->ack_delay, now) <= 0)) {
 
874
    packet(self, priv->snd_nxt, 0, 0, 0, now);
624
875
  }
625
876
 
626
877
}
629
880
pseudo_tcp_socket_notify_packet(PseudoTcpSocket *self,
630
881
    const gchar * buffer, guint32 len)
631
882
{
 
883
  gboolean retval;
 
884
 
632
885
  if (len > MAX_PACKET) {
633
886
    //LOG_F(WARNING) << "packet too large";
634
887
    return FALSE;
635
 
  }
636
 
  return parse(self, (guint8 *) buffer, len);
637
 
}
638
 
 
639
 
gboolean
640
 
pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout)
 
888
  } else if (len < HEADER_SIZE) {
 
889
    //LOG_F(WARNING) << "packet too small";
 
890
    return FALSE;
 
891
  }
 
892
 
 
893
  /* Hold a reference to the PseudoTcpSocket during parsing, since it may be
 
894
   * closed from within a callback. */
 
895
  g_object_ref (self);
 
896
  retval = parse (self, (guint8 *) buffer, HEADER_SIZE,
 
897
      (guint8 *) buffer + HEADER_SIZE, len - HEADER_SIZE);
 
898
  g_object_unref (self);
 
899
 
 
900
  return retval;
 
901
}
 
902
 
 
903
/* Assume there are two buffers in the given #NiceInputMessage: a 24-byte one
 
904
 * containing the header, and a bigger one for the data. */
 
905
gboolean
 
906
pseudo_tcp_socket_notify_message (PseudoTcpSocket *self,
 
907
    NiceInputMessage *message)
 
908
{
 
909
  gboolean retval;
 
910
 
 
911
  g_assert_cmpuint (message->n_buffers, >, 0);
 
912
 
 
913
  if (message->n_buffers == 1)
 
914
    return pseudo_tcp_socket_notify_packet (self, message->buffers[0].buffer,
 
915
        message->buffers[0].size);
 
916
 
 
917
  g_assert_cmpuint (message->n_buffers, ==, 2);
 
918
  g_assert_cmpuint (message->buffers[0].size, ==, HEADER_SIZE);
 
919
 
 
920
  if (message->length > MAX_PACKET) {
 
921
    //LOG_F(WARNING) << "packet too large";
 
922
    return FALSE;
 
923
  } else if (message->length < HEADER_SIZE) {
 
924
    //LOG_F(WARNING) << "packet too small";
 
925
    return FALSE;
 
926
  }
 
927
 
 
928
  /* Hold a reference to the PseudoTcpSocket during parsing, since it may be
 
929
   * closed from within a callback. */
 
930
  g_object_ref (self);
 
931
  retval = parse (self, message->buffers[0].buffer, message->buffers[0].size,
 
932
      message->buffers[1].buffer, message->length - message->buffers[0].size);
 
933
  g_object_unref (self);
 
934
 
 
935
  return retval;
 
936
}
 
937
 
 
938
gboolean
 
939
pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, guint64 *timeout)
641
940
{
642
941
  PseudoTcpSocketPrivate *priv = self->priv;
643
942
  guint32 now = get_current_time ();
 
943
  gsize snd_buffered;
644
944
 
645
945
  if (priv->shutdown == SD_FORCEFUL)
646
946
    return FALSE;
647
947
 
 
948
  snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
648
949
  if ((priv->shutdown == SD_GRACEFUL)
649
950
      && ((priv->state != TCP_ESTABLISHED)
650
 
          || ((priv->slen == 0) && (priv->t_ack == 0)))) {
 
951
          || ((snd_buffered == 0) && (priv->t_ack == 0)))) {
651
952
    return FALSE;
652
953
  }
653
954
 
 
955
  if (*timeout == 0 || *timeout < now)
 
956
    *timeout = now + CLOSED_TIMEOUT;
 
957
 
654
958
  if (priv->state == TCP_CLOSED) {
655
 
    *timeout = CLOSED_TIMEOUT;
 
959
    *timeout = min (*timeout, now + CLOSED_TIMEOUT);
656
960
    return TRUE;
657
961
  }
658
962
 
659
 
  *timeout = DEFAULT_TIMEOUT;
 
963
  *timeout = min (*timeout, now + DEFAULT_TIMEOUT);
660
964
 
661
965
  if (priv->t_ack) {
662
 
    *timeout = min(*timeout, time_diff(priv->t_ack + ACK_DELAY, now));
 
966
    *timeout = min(*timeout, priv->t_ack + priv->ack_delay);
663
967
  }
664
968
  if (priv->rto_base) {
665
 
    *timeout = min(*timeout, time_diff(priv->rto_base + priv->rx_rto, now));
 
969
    *timeout = min(*timeout, priv->rto_base + priv->rx_rto);
666
970
  }
667
971
  if (priv->snd_wnd == 0) {
668
 
    *timeout = min(*timeout, time_diff(priv->lastsend + priv->rx_rto, now));
 
972
    *timeout = min(*timeout, priv->lastsend + priv->rx_rto);
669
973
  }
670
974
 
671
975
  return TRUE;
676
980
pseudo_tcp_socket_recv(PseudoTcpSocket *self, char * buffer, size_t len)
677
981
{
678
982
  PseudoTcpSocketPrivate *priv = self->priv;
679
 
  guint32 read;
 
983
  gsize bytesread;
 
984
  gsize available_space;
680
985
 
681
986
  if (priv->state != TCP_ESTABLISHED) {
682
987
    priv->error = ENOTCONN;
683
988
    return -1;
684
989
  }
685
990
 
686
 
  if (priv->rlen == 0) {
 
991
  if (len == 0)
 
992
    return 0;
 
993
 
 
994
  bytesread = pseudo_tcp_fifo_read (&priv->rbuf, (guint8 *) buffer, len);
 
995
 
 
996
 // If there's no data in |m_rbuf|.
 
997
  if (bytesread == 0) {
687
998
    priv->bReadEnable = TRUE;
688
999
    priv->error = EWOULDBLOCK;
689
1000
    return -1;
690
1001
  }
691
1002
 
692
 
  read = min((guint32) len, priv->rlen);
693
 
  memcpy(buffer, priv->rbuf, read);
694
 
  priv->rlen -= read;
695
 
 
696
 
  /* !?! until we create a circular buffer, we need to move all of the rest
697
 
     of the buffer up! */
698
 
  memmove(priv->rbuf, priv->rbuf + read, sizeof(priv->rbuf) - read);
699
 
 
700
 
  if ((sizeof(priv->rbuf) - priv->rlen - priv->rcv_wnd)
701
 
      >= min(sizeof(priv->rbuf) / 2, priv->mss)) {
 
1003
  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
 
1004
 
 
1005
  if (available_space - priv->rcv_wnd >=
 
1006
      min (priv->rbuf_len / 2, priv->mss)) {
702
1007
    // !?! Not sure about this was closed business
703
1008
    gboolean bWasClosed = (priv->rcv_wnd == 0);
704
1009
 
705
 
    priv->rcv_wnd = sizeof(priv->rbuf) - priv->rlen;
 
1010
    priv->rcv_wnd = available_space;
706
1011
 
707
1012
    if (bWasClosed) {
708
1013
      attempt_send(self, sfImmediateAck);
709
1014
    }
710
1015
  }
711
1016
 
712
 
  return read;
 
1017
  return bytesread;
713
1018
}
714
1019
 
715
1020
gint
717
1022
{
718
1023
  PseudoTcpSocketPrivate *priv = self->priv;
719
1024
  gint written;
 
1025
  gsize available_space;
720
1026
 
721
1027
  if (priv->state != TCP_ESTABLISHED) {
722
1028
    priv->error = ENOTCONN;
723
1029
    return -1;
724
1030
  }
725
1031
 
726
 
  if (priv->slen == sizeof(priv->sbuf)) {
 
1032
  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
 
1033
 
 
1034
  if (!available_space) {
727
1035
    priv->bWriteEnable = TRUE;
728
1036
    priv->error = EWOULDBLOCK;
729
1037
    return -1;
743
1051
pseudo_tcp_socket_close(PseudoTcpSocket *self, gboolean force)
744
1052
{
745
1053
  PseudoTcpSocketPrivate *priv = self->priv;
746
 
  //nice_agent ("Closing socket %p : %d", sock, force?"true":"false");
 
1054
  DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Closing socket %p : %s", self,
 
1055
      force ? "forcefully" : "gracefully");
747
1056
  priv->shutdown = force ? SD_FORCEFUL : SD_GRACEFUL;
748
1057
}
749
1058
 
762
1071
queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl)
763
1072
{
764
1073
  PseudoTcpSocketPrivate *priv = self->priv;
 
1074
  gsize available_space;
765
1075
 
766
 
  if (len > sizeof(priv->sbuf) - priv->slen) {
 
1076
  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
 
1077
  if (len > available_space) {
767
1078
    g_assert(!bCtrl);
768
 
    len = sizeof(priv->sbuf) - priv->slen;
 
1079
    len = available_space;
769
1080
  }
770
1081
 
771
1082
  // We can concatenate data if the last segment is the same type
772
1083
  // (control v. regular data), and has not been transmitted yet
773
 
  if (g_list_length (priv->slist) > 0 &&
774
 
      (((SSegment *)g_list_last (priv->slist)->data)->bCtrl == bCtrl) &&
775
 
      (((SSegment *)g_list_last (priv->slist)->data)->xmit == 0)) {
776
 
    ((SSegment *)g_list_last (priv->slist)->data)->len += len;
 
1084
  if (g_queue_get_length (&priv->slist) &&
 
1085
      (((SSegment *)g_queue_peek_tail (&priv->slist))->bCtrl == bCtrl) &&
 
1086
      (((SSegment *)g_queue_peek_tail (&priv->slist))->xmit == 0)) {
 
1087
    ((SSegment *)g_queue_peek_tail (&priv->slist))->len += len;
777
1088
  } else {
778
1089
    SSegment *sseg = g_slice_new0 (SSegment);
779
 
    sseg->seq = priv->snd_una + priv->slen;
 
1090
    gsize snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
 
1091
 
 
1092
    sseg->seq = priv->snd_una + snd_buffered;
780
1093
    sseg->len = len;
781
1094
    sseg->bCtrl = bCtrl;
782
 
    priv->slist = g_list_append (priv->slist, sseg);
 
1095
    g_queue_push_tail (&priv->slist, sseg);
 
1096
    g_queue_push_tail (&priv->unsent_slist, sseg);
783
1097
  }
784
1098
 
785
 
  memcpy(priv->sbuf + priv->slen, data, len);
786
 
  priv->slen += len;
787
1099
  //LOG(LS_INFO) << "PseudoTcp::queue - priv->slen = " << priv->slen;
788
 
  return len;
 
1100
  return pseudo_tcp_fifo_write (&priv->sbuf, (guint8*) data, len);;
789
1101
}
790
1102
 
 
1103
// Creates a packet and submits it to the network. This method can either
 
1104
// send payload or just an ACK packet.
 
1105
//
 
1106
// |seq| is the sequence number of this packet.
 
1107
// |flags| is the flags for sending this packet.
 
1108
// |offset| is the offset to read from |m_sbuf|.
 
1109
// |len| is the number of bytes to read from |m_sbuf| as payload. If this
 
1110
// value is 0 then this is an ACK packet, otherwise this packet has payload.
 
1111
 
791
1112
static PseudoTcpWriteResult
792
1113
packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
793
 
    const gchar * data, guint32 len)
 
1114
    guint32 offset, guint32 len, guint32 now)
794
1115
{
795
1116
  PseudoTcpSocketPrivate *priv = self->priv;
796
 
  guint32 now = get_current_time();
797
 
  guint8 buffer[MAX_PACKET];
 
1117
  union {
 
1118
    guint8 u8[MAX_PACKET];
 
1119
    guint16 u16[MAX_PACKET / 2];
 
1120
    guint32 u32[MAX_PACKET / 4];
 
1121
  } buffer;
798
1122
  PseudoTcpWriteResult wres = WR_SUCCESS;
799
1123
 
800
1124
  g_assert(HEADER_SIZE + len <= MAX_PACKET);
801
1125
 
802
 
  *((guint32 *) buffer) = htonl(priv->conv);
803
 
  *((guint32 *) (buffer + 4)) = htonl(seq);
804
 
  *((guint32 *) (buffer + 8)) = htonl(priv->rcv_nxt);
805
 
  buffer[12] = 0;
806
 
  buffer[13] = flags;
807
 
  *((guint16 *) (buffer + 14)) = htons((guint16)priv->rcv_wnd);
 
1126
  *buffer.u32 = htonl(priv->conv);
 
1127
  *(buffer.u32 + 1) = htonl(seq);
 
1128
  *(buffer.u32 + 2) = htonl(priv->rcv_nxt);
 
1129
  buffer.u8[12] = 0;
 
1130
  buffer.u8[13] = flags;
 
1131
  *(buffer.u16 + 7) = htons((guint16)(priv->rcv_wnd >> priv->rwnd_scale));
808
1132
 
809
1133
  // Timestamp computations
810
 
  *((guint32 *) (buffer + 16)) = htonl(now);
811
 
  *((guint32 *) (buffer + 20)) = htonl(priv->ts_recent);
 
1134
  *(buffer.u32 + 4) = htonl(now);
 
1135
  *(buffer.u32 + 5) = htonl(priv->ts_recent);
812
1136
  priv->ts_lastack = priv->rcv_nxt;
813
1137
 
814
 
  if (data != NULL)
815
 
    memcpy(buffer + HEADER_SIZE, data, len);
 
1138
  if (len) {
 
1139
    gsize bytes_read;
 
1140
 
 
1141
    bytes_read = pseudo_tcp_fifo_read_offset (&priv->sbuf, buffer.u8 + HEADER_SIZE,
 
1142
        len, offset);
 
1143
    g_assert (bytes_read == len);
 
1144
  }
816
1145
 
817
1146
  DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "<-- <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
818
1147
      "<WND=%d><TS=%d><TSR=%d><LEN=%d>",
819
1148
      priv->conv, (unsigned)flags, seq, seq + len, priv->rcv_nxt, priv->rcv_wnd,
820
1149
      now % 10000, priv->ts_recent % 10000, len);
821
1150
 
822
 
  wres = priv->callbacks.WritePacket(self, (gchar *) buffer, len + HEADER_SIZE,
 
1151
  wres = priv->callbacks.WritePacket(self, (gchar *) buffer.u8, len + HEADER_SIZE,
823
1152
                                     priv->callbacks.user_data);
824
 
  /* Note: When data is NULL, this is an ACK packet.  We don't read the
 
1153
  /* Note: When len is 0, this is an ACK packet.  We don't read the
825
1154
     return value for those, and thus we won't retry.  So go ahead and treat
826
1155
     the packet as a success (basically simulate as if it were dropped),
827
1156
     which will prevent our timers from being messed up. */
828
 
  if ((wres != WR_SUCCESS) && (NULL != data))
 
1157
  if ((wres != WR_SUCCESS) && (0 != len))
829
1158
    return wres;
830
1159
 
831
1160
  priv->t_ack = 0;
839
1168
}
840
1169
 
841
1170
static gboolean
842
 
parse(PseudoTcpSocket *self, const guint8 * buffer, guint32 size)
 
1171
parse (PseudoTcpSocket *self, const guint8 *_header_buf, gsize header_buf_len,
 
1172
    const guint8 *data_buf, gsize data_buf_len)
843
1173
{
844
1174
  Segment seg;
845
1175
 
846
 
  if (size < 12)
 
1176
  union {
 
1177
    const guint8 *u8;
 
1178
    const guint16 *u16;
 
1179
    const guint32 *u32;
 
1180
  } header_buf;
 
1181
 
 
1182
  header_buf.u8 = _header_buf;
 
1183
 
 
1184
  if (header_buf_len != 24)
847
1185
    return FALSE;
848
1186
 
849
 
  seg.conv = ntohl(*(guint32 *)buffer);
850
 
  seg.seq = ntohl(*(guint32 *)(buffer + 4));
851
 
  seg.ack = ntohl(*(guint32 *)(buffer + 8));
852
 
  seg.flags = buffer[13];
853
 
  seg.wnd = ntohs(*(guint16 *)(buffer + 14));
854
 
 
855
 
  seg.tsval = ntohl(*(guint32 *)(buffer + 16));
856
 
  seg.tsecr = ntohl(*(guint32 *)(buffer + 20));
857
 
 
858
 
  seg.data = ((gchar *)buffer) + HEADER_SIZE;
859
 
  seg.len = size - HEADER_SIZE;
 
1187
  seg.conv = ntohl(*header_buf.u32);
 
1188
  seg.seq = ntohl(*(header_buf.u32 + 1));
 
1189
  seg.ack = ntohl(*(header_buf.u32 + 2));
 
1190
  seg.flags = header_buf.u8[13];
 
1191
  seg.wnd = ntohs(*(header_buf.u16 + 7));
 
1192
 
 
1193
  seg.tsval = ntohl(*(header_buf.u32 + 4));
 
1194
  seg.tsecr = ntohl(*(header_buf.u32 + 5));
 
1195
 
 
1196
  seg.data = (const gchar *) data_buf;
 
1197
  seg.len = data_buf_len;
860
1198
 
861
1199
  DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "--> <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
862
1200
      "<WND=%d><TS=%d><TSR=%d><LEN=%d>",
876
1214
  gboolean bIgnoreData;
877
1215
  gboolean bNewData;
878
1216
  gboolean bConnect = FALSE;
 
1217
  gsize snd_buffered;
 
1218
  gsize available_space;
 
1219
  guint32 kIdealRefillSize;
879
1220
 
880
1221
  /* If this is the wrong conversation, send a reset!?!
881
1222
     (with the correct conversation?) */
911
1252
      return FALSE;
912
1253
    } else if (seg->data[0] == CTL_CONNECT) {
913
1254
      bConnect = TRUE;
 
1255
 
 
1256
      parse_options (self, (guint8 *) &seg->data[1], seg->len - 1);
 
1257
 
914
1258
      if (priv->state == TCP_LISTEN) {
915
 
        char buffer[1];
916
1259
        priv->state = TCP_SYN_RECEIVED;
917
 
        buffer[0] = CTL_CONNECT;
918
 
        queue(self, buffer, 1, TRUE);
 
1260
        queue_connect_message (self);
919
1261
      } else if (priv->state == TCP_SYN_SENT) {
920
1262
        priv->state = TCP_ESTABLISHED;
921
1263
        DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
931
1273
  }
932
1274
 
933
1275
  // Update timestamp
934
 
  if ((seg->seq <= priv->ts_lastack) &&
935
 
      (priv->ts_lastack < seg->seq + seg->len)) {
 
1276
  if (SMALLER_OR_EQUAL (seg->seq, priv->ts_lastack) &&
 
1277
      SMALLER (priv->ts_lastack, seg->seq + seg->len)) {
936
1278
    priv->ts_recent = seg->tsval;
937
1279
  }
938
1280
 
939
1281
  // Check if this is a valuable ack
940
 
  if ((seg->ack > priv->snd_una) && (seg->ack <= priv->snd_nxt)) {
 
1282
  if (LARGER(seg->ack, priv->snd_una) &&
 
1283
      SMALLER_OR_EQUAL(seg->ack, priv->snd_nxt)) {
941
1284
    guint32 nAcked;
942
1285
    guint32 nFree;
943
 
    guint32 kIdealRefillSize;
944
1286
 
945
1287
    // Calculate round-trip time
946
1288
    if (seg->tsecr) {
960
1302
        DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "rtt: %ld   srtt: %d  rto: %d",
961
1303
                rtt, priv->rx_srtt, priv->rx_rto);
962
1304
      } else {
963
 
        g_assert_not_reached ();
 
1305
        DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid RTT: %ld", rtt);
 
1306
        return FALSE;
964
1307
      }
965
1308
    }
966
1309
 
967
 
    priv->snd_wnd = seg->wnd;
 
1310
    priv->snd_wnd = seg->wnd << priv->swnd_scale;
968
1311
 
969
1312
    nAcked = seg->ack - priv->snd_una;
970
1313
    priv->snd_una = seg->ack;
971
1314
 
972
1315
    priv->rto_base = (priv->snd_una == priv->snd_nxt) ? 0 : now;
973
1316
 
974
 
    priv->slen -= nAcked;
975
 
    memmove(priv->sbuf, priv->sbuf + nAcked, priv->slen);
976
 
    //LOG(LS_INFO) << "PseudoTcp::process - priv->slen = " << priv->slen;
 
1317
    pseudo_tcp_fifo_consume_read_data (&priv->sbuf, nAcked);
977
1318
 
978
1319
    for (nFree = nAcked; nFree > 0; ) {
979
1320
      SSegment *data;
980
1321
 
981
 
      g_assert(priv->slist != NULL);
982
 
      data = (SSegment *) (priv->slist->data);
 
1322
      g_assert(g_queue_get_length (&priv->slist) != 0);
 
1323
      data = (SSegment *) g_queue_peek_head (&priv->slist);
983
1324
 
984
1325
      if (nFree < data->len) {
985
1326
        data->len -= nFree;
 
1327
        data->seq += nFree;
986
1328
        nFree = 0;
987
1329
      } else {
988
1330
        if (data->len > priv->largest) {
989
1331
          priv->largest = data->len;
990
1332
        }
991
1333
        nFree -= data->len;
992
 
        g_slice_free (SSegment, priv->slist->data);
993
 
        priv->slist = g_list_delete_link (priv->slist, priv->slist);
 
1334
        g_slice_free (SSegment, data);
 
1335
        g_queue_pop_head (&priv->slist);
994
1336
      }
995
1337
    }
996
1338
 
997
1339
    if (priv->dup_acks >= 3) {
998
 
      if (priv->snd_una >= priv->recover) { // NewReno
 
1340
      if (LARGER_OR_EQUAL (priv->snd_una, priv->recover)) { // NewReno
999
1341
        guint32 nInFlight = priv->snd_nxt - priv->snd_una;
1000
1342
        // (Fast Retransmit)
1001
1343
        priv->cwnd = min(priv->ssthresh, nInFlight + priv->mss);
1003
1345
        priv->dup_acks = 0;
1004
1346
      } else {
1005
1347
        DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "recovery retransmit");
1006
 
        if (!transmit(self, priv->slist, now)) {
 
1348
        if (!transmit(self, g_queue_peek_head (&priv->slist), now)) {
1007
1349
          closedown(self, ECONNABORTED);
1008
1350
          return FALSE;
1009
1351
        }
1018
1360
        priv->cwnd += max(1LU, priv->mss * priv->mss / priv->cwnd);
1019
1361
      }
1020
1362
    }
1021
 
 
1022
 
    // !?! A bit hacky
1023
 
    if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
1024
 
      priv->state = TCP_ESTABLISHED;
1025
 
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
1026
 
      adjustMTU(self);
1027
 
      if (priv->callbacks.PseudoTcpOpened)
1028
 
        priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
1029
 
    }
1030
 
 
1031
 
    // If we make room in the send queue, notify the user
1032
 
    // The goal it to make sure we always have at least enough data to fill the
1033
 
    // window.  We'd like to notify the app when we are halfway to that point.
1034
 
    kIdealRefillSize = (sizeof(priv->sbuf) + sizeof(priv->rbuf)) / 2;
1035
 
    if (priv->bWriteEnable && (priv->slen < kIdealRefillSize)) {
1036
 
      priv->bWriteEnable = FALSE;
1037
 
      if (priv->callbacks.PseudoTcpWritable)
1038
 
        priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
1039
 
    }
1040
1363
  } else if (seg->ack == priv->snd_una) {
1041
1364
    /* !?! Note, tcp says don't do this... but otherwise how does a
1042
1365
       closed window become open? */
1043
 
    priv->snd_wnd = seg->wnd;
 
1366
    priv->snd_wnd = seg->wnd << priv->swnd_scale;
1044
1367
 
1045
1368
    // Check duplicate acks
1046
1369
    if (seg->len > 0) {
1052
1375
      if (priv->dup_acks == 3) { // (Fast Retransmit)
1053
1376
        DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "enter recovery");
1054
1377
        DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "recovery retransmit");
1055
 
        if (!transmit(self, priv->slist, now)) {
 
1378
        if (!transmit(self, g_queue_peek_head (&priv->slist), now)) {
1056
1379
          closedown(self, ECONNABORTED);
1057
1380
          return FALSE;
1058
1381
        }
1069
1392
    }
1070
1393
  }
1071
1394
 
 
1395
  // !?! A bit hacky
 
1396
  if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
 
1397
    priv->state = TCP_ESTABLISHED;
 
1398
    DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
 
1399
    adjustMTU(self);
 
1400
    if (priv->callbacks.PseudoTcpOpened)
 
1401
      priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
 
1402
  }
 
1403
 
 
1404
  // If we make room in the send queue, notify the user
 
1405
  // The goal it to make sure we always have at least enough data to fill the
 
1406
  // window.  We'd like to notify the app when we are halfway to that point.
 
1407
  kIdealRefillSize = (priv->sbuf_len + priv->rbuf_len) / 2;
 
1408
 
 
1409
  snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
 
1410
  if (priv->bWriteEnable && snd_buffered < kIdealRefillSize) {
 
1411
    priv->bWriteEnable = FALSE;
 
1412
    if (priv->callbacks.PseudoTcpWritable)
 
1413
      priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
 
1414
  }
 
1415
 
1072
1416
  /* Conditions where acks must be sent:
1073
1417
   * 1) Segment is too old (they missed an ACK) (immediately)
1074
1418
   * 2) Segment is too new (we missed a segment) (immediately)
1080
1424
  if (seg->seq != priv->rcv_nxt) {
1081
1425
    sflags = sfImmediateAck; // (Fast Recovery)
1082
1426
  } else if (seg->len != 0) {
1083
 
    sflags = sfDelayedAck;
 
1427
    if (priv->ack_delay == 0) {
 
1428
      sflags = sfImmediateAck;
 
1429
    } else {
 
1430
      sflags = sfDelayedAck;
 
1431
    }
1084
1432
  }
1085
1433
  if (sflags == sfImmediateAck) {
1086
1434
    if (seg->seq > priv->rcv_nxt) {
1087
1435
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "too new");
1088
 
    } else if (seg->seq + seg->len <= priv->rcv_nxt) {
 
1436
    } else if (SMALLER_OR_EQUAL(seg->seq + seg->len, priv->rcv_nxt)) {
1089
1437
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "too old");
1090
1438
    }
1091
1439
  }
1092
1440
 
1093
1441
  // Adjust the incoming segment to fit our receive buffer
1094
 
  if (seg->seq < priv->rcv_nxt) {
 
1442
  if (SMALLER(seg->seq, priv->rcv_nxt)) {
1095
1443
    guint32 nAdjust = priv->rcv_nxt - seg->seq;
1096
1444
    if (nAdjust < seg->len) {
1097
1445
      seg->seq += nAdjust;
1101
1449
      seg->len = 0;
1102
1450
    }
1103
1451
  }
1104
 
  if ((seg->seq + seg->len - priv->rcv_nxt) >
1105
 
      (sizeof(priv->rbuf) - priv->rlen)) {
1106
 
    guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt -
1107
 
        (sizeof(priv->rbuf) - priv->rlen);
 
1452
 
 
1453
  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
 
1454
 
 
1455
  if ((seg->seq + seg->len - priv->rcv_nxt) > available_space) {
 
1456
    guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt - available_space;
1108
1457
    if (nAdjust < seg->len) {
1109
1458
      seg->len -= nAdjust;
1110
1459
    } else {
1122
1471
      }
1123
1472
    } else {
1124
1473
      guint32 nOffset = seg->seq - priv->rcv_nxt;
1125
 
      memcpy(priv->rbuf + priv->rlen + nOffset, seg->data, seg->len);
 
1474
      gsize res;
 
1475
 
 
1476
      res = pseudo_tcp_fifo_write_offset (&priv->rbuf, (guint8 *) seg->data,
 
1477
          seg->len, nOffset);
 
1478
      g_assert (res == seg->len);
 
1479
 
1126
1480
      if (seg->seq == priv->rcv_nxt) {
1127
1481
        GList *iter = NULL;
1128
1482
 
1129
 
        priv->rlen += seg->len;
 
1483
        pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, seg->len);
1130
1484
        priv->rcv_nxt += seg->len;
1131
1485
        priv->rcv_wnd -= seg->len;
1132
1486
        bNewData = TRUE;
1133
1487
 
1134
1488
        iter = priv->rlist;
1135
 
        while (iter && (((RSegment *)iter->data)->seq <= priv->rcv_nxt)) {
 
1489
        while (iter &&
 
1490
            SMALLER_OR_EQUAL(((RSegment *)iter->data)->seq, priv->rcv_nxt)) {
1136
1491
          RSegment *data = (RSegment *)(iter->data);
1137
 
          if (data->seq + data->len > priv->rcv_nxt) {
 
1492
          if (LARGER (data->seq + data->len, priv->rcv_nxt)) {
1138
1493
            guint32 nAdjust = (data->seq + data->len) - priv->rcv_nxt;
1139
1494
            sflags = sfImmediateAck; // (Fast Recovery)
1140
1495
            DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Recovered %d bytes (%d -> %d)",
1141
1496
                nAdjust, priv->rcv_nxt, priv->rcv_nxt + nAdjust);
1142
 
            priv->rlen += nAdjust;
 
1497
            pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, nAdjust);
1143
1498
            priv->rcv_nxt += nAdjust;
1144
1499
            priv->rcv_wnd -= nAdjust;
1145
1500
          }
1156
1511
        rseg->seq = seg->seq;
1157
1512
        rseg->len = seg->len;
1158
1513
        iter = priv->rlist;
1159
 
        while (iter && (((RSegment*)iter->data)->seq < rseg->seq)) {
 
1514
        while (iter && SMALLER (((RSegment*)iter->data)->seq, rseg->seq)) {
1160
1515
          iter = g_list_next (iter);
1161
1516
        }
1162
1517
        priv->rlist = g_list_insert_before(priv->rlist, iter, rseg);
1168
1523
 
1169
1524
  // If we have new data, notify the user
1170
1525
  if (bNewData && priv->bReadEnable) {
1171
 
    priv->bReadEnable = FALSE;
 
1526
    /* priv->bReadEnable = FALSE; — removed so that we’re always notified of
 
1527
     * incoming pseudo-TCP data, rather than having to read the entire buffer
 
1528
     * on each readable() callback before the next callback is enabled.
 
1529
     * (When client-provided buffers are small, this is not possible.) */
1172
1530
    if (priv->callbacks.PseudoTcpReadable)
1173
1531
      priv->callbacks.PseudoTcpReadable(self, priv->callbacks.user_data);
1174
1532
  }
1177
1535
}
1178
1536
 
1179
1537
static gboolean
1180
 
transmit(PseudoTcpSocket *self, const GList *seg, guint32 now)
 
1538
transmit(PseudoTcpSocket *self, SSegment *segment, guint32 now)
1181
1539
{
1182
1540
  PseudoTcpSocketPrivate *priv = self->priv;
1183
 
  SSegment *segment = (SSegment*)(seg->data);
1184
1541
  guint32 nTransmit = min(segment->len, priv->mss);
1185
1542
 
1186
1543
  if (segment->xmit >= ((priv->state == TCP_ESTABLISHED) ? 15 : 30)) {
1191
1548
  while (TRUE) {
1192
1549
    guint32 seq = segment->seq;
1193
1550
    guint8 flags = (segment->bCtrl ? FLAG_CTL : 0);
1194
 
    const gchar * buffer = priv->sbuf + (segment->seq - priv->snd_una);
1195
 
    PseudoTcpWriteResult wres = packet(self, seq, flags, buffer, nTransmit);
 
1551
    PseudoTcpWriteResult wres;
 
1552
 
 
1553
    /* The packet must not have already been acknowledged. */
 
1554
    g_assert_cmpuint (segment->seq, >=, priv->snd_una);
 
1555
 
 
1556
    /* Write out the packet. */
 
1557
    wres = packet(self, seq, flags,
 
1558
        segment->seq - priv->snd_una, nTransmit, now);
1196
1559
 
1197
1560
    if (wres == WR_SUCCESS)
1198
1561
      break;
1234
1597
    DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "mss reduced to %d", priv->mss);
1235
1598
 
1236
1599
    segment->len = nTransmit;
1237
 
    priv->slist = g_list_insert_before(priv->slist, seg->next, subseg);
 
1600
    g_queue_insert_after (&priv->slist,
 
1601
        g_queue_find (&priv->slist, segment), subseg);
 
1602
    if (subseg->xmit == 0)
 
1603
      g_queue_insert_after (&priv->unsent_slist,
 
1604
          g_queue_find (&priv->unsent_slist, segment), subseg);
1238
1605
  }
1239
1606
 
1240
1607
  if (segment->xmit == 0) {
 
1608
    g_assert (g_queue_peek_head (&priv->unsent_slist) == segment);
 
1609
    g_queue_pop_head (&priv->unsent_slist);
1241
1610
    priv->snd_nxt += segment->len;
1242
1611
  }
1243
1612
  segment->xmit += 1;
1267
1636
    guint32 nInFlight;
1268
1637
    guint32 nUseable;
1269
1638
    guint32 nAvailable;
 
1639
    gsize snd_buffered;
1270
1640
    GList *iter;
 
1641
    SSegment *sseg;
1271
1642
 
1272
1643
    cwnd = priv->cwnd;
1273
1644
    if ((priv->dup_acks == 1) || (priv->dup_acks == 2)) { // Limited Transmit
1276
1647
    nWindow = min(priv->snd_wnd, cwnd);
1277
1648
    nInFlight = priv->snd_nxt - priv->snd_una;
1278
1649
    nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
1279
 
    nAvailable = min(priv->slen - nInFlight, priv->mss);
 
1650
    snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
 
1651
    nAvailable = min(snd_buffered - nInFlight, priv->mss);
1280
1652
 
1281
1653
    if (nAvailable > nUseable) {
1282
1654
      if (nUseable * 4 < nWindow) {
1288
1660
    }
1289
1661
 
1290
1662
    if (bFirst) {
 
1663
      gsize available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
1291
1664
      bFirst = FALSE;
1292
1665
      DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "[cwnd: %d  nWindow: %d  nInFlight: %d "
1293
 
          "nAvailable: %d nQueued: %d  nEmpty: %" G_GSIZE_FORMAT
 
1666
          "nAvailable: %d nQueued: %" G_GSIZE_FORMAT " nEmpty: %" G_GSIZE_FORMAT
1294
1667
          "  ssthresh: %d]",
1295
 
          priv->cwnd, nWindow, nInFlight, nAvailable, priv->slen - nInFlight,
1296
 
          sizeof(priv->sbuf) - priv->slen, priv->ssthresh);
 
1668
          priv->cwnd, nWindow, nInFlight, nAvailable, snd_buffered,
 
1669
          available_space, priv->ssthresh);
1297
1670
    }
1298
1671
 
1299
1672
    if (nAvailable == 0) {
1302
1675
 
1303
1676
      // If this is an immediate ack, or the second delayed ack
1304
1677
      if ((sflags == sfImmediateAck) || priv->t_ack) {
1305
 
        packet(self, priv->snd_nxt, 0, 0, 0);
 
1678
        packet(self, priv->snd_nxt, 0, 0, 0, now);
1306
1679
      } else {
1307
 
        priv->t_ack = get_current_time();
 
1680
        priv->t_ack = now;
1308
1681
      }
1309
1682
      return;
1310
1683
    }
1311
1684
 
1312
1685
    // Nagle algorithm
1313
 
    if ((priv->snd_nxt > priv->snd_una) && (nAvailable < priv->mss))  {
 
1686
    // If there is data already in-flight, and we haven't a full segment of
 
1687
    // data ready to send then hold off until we get more to send, or the
 
1688
    // in-flight data is acknowledged.
 
1689
    if (priv->use_nagling && (priv->snd_nxt > priv->snd_una) &&
 
1690
        (nAvailable < priv->mss))  {
1314
1691
      return;
1315
1692
    }
1316
1693
 
1317
1694
    // Find the next segment to transmit
1318
 
    iter = priv->slist;
1319
 
    while (((SSegment*)iter->data)->xmit > 0) {
1320
 
      iter = g_list_next (iter);
1321
 
      g_assert(iter);
1322
 
    }
 
1695
    iter = g_queue_peek_head_link (&priv->unsent_slist);
 
1696
    sseg = iter->data;
1323
1697
 
1324
1698
    // If the segment is too large, break it into two
1325
 
    if (((SSegment*)iter->data)->len > nAvailable) {
 
1699
    if (sseg->len > nAvailable) {
1326
1700
      SSegment *subseg = g_slice_new0 (SSegment);
1327
 
      subseg->seq = ((SSegment*)iter->data)->seq + nAvailable;
1328
 
      subseg->len = ((SSegment*)iter->data)->len - nAvailable;
1329
 
      subseg->bCtrl = ((SSegment*)iter->data)->bCtrl;
 
1701
      subseg->seq = sseg->seq + nAvailable;
 
1702
      subseg->len = sseg->len - nAvailable;
 
1703
      subseg->bCtrl = sseg->bCtrl;
1330
1704
 
1331
 
      ((SSegment*)iter->data)->len = nAvailable;
1332
 
      priv->slist = g_list_insert_before(priv->slist, iter->next, subseg);
 
1705
      sseg->len = nAvailable;
 
1706
      g_queue_insert_after (&priv->unsent_slist, iter, subseg);
 
1707
      g_queue_insert_after (&priv->slist, g_queue_find (&priv->slist, sseg),
 
1708
          subseg);
1333
1709
    }
1334
1710
 
1335
 
    if (!transmit(self, iter, now)) {
 
1711
    if (!transmit(self, sseg, now)) {
1336
1712
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "transmit failed");
1337
1713
      // TODO: consider closing socket
1338
1714
      return;
1346
1722
closedown(PseudoTcpSocket *self, guint32 err)
1347
1723
{
1348
1724
  PseudoTcpSocketPrivate *priv = self->priv;
1349
 
  priv->slen = 0;
1350
1725
 
1351
1726
  DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_CLOSED");
1352
1727
  priv->state = TCP_CLOSED;
1374
1749
  priv->ssthresh = max(priv->ssthresh, 2 * priv->mss);
1375
1750
  priv->cwnd = max(priv->cwnd, priv->mss);
1376
1751
}
 
1752
 
 
1753
static void
 
1754
apply_window_scale_option (PseudoTcpSocket *self, guint8 scale_factor)
 
1755
{
 
1756
   PseudoTcpSocketPrivate *priv = self->priv;
 
1757
 
 
1758
   priv->swnd_scale = scale_factor;
 
1759
}
 
1760
 
 
1761
static void
 
1762
apply_option(PseudoTcpSocket *self, char kind, const guint8* data, guint32 len)
 
1763
{
 
1764
  if (kind == TCP_OPT_MSS) {
 
1765
    DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
 
1766
        "Peer specified MSS option which is not supported.");
 
1767
    // TODO: Implement.
 
1768
  } else if (kind == TCP_OPT_WND_SCALE) {
 
1769
    // Window scale factor.
 
1770
    // http://www.ietf.org/rfc/rfc1323.txt
 
1771
    if (len != 1) {
 
1772
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid window scale option received.");
 
1773
      return;
 
1774
    }
 
1775
    apply_window_scale_option(self, data[0]);
 
1776
  }
 
1777
}
 
1778
 
 
1779
 
 
1780
static void
 
1781
parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len)
 
1782
{
 
1783
  PseudoTcpSocketPrivate *priv = self->priv;
 
1784
  gboolean has_window_scaling_option = FALSE;
 
1785
  guint32 pos = 0;
 
1786
 
 
1787
  // See http://www.freesoft.org/CIE/Course/Section4/8.htm for
 
1788
  // parsing the options list.
 
1789
  while (pos < len) {
 
1790
    guint8 kind = TCP_OPT_EOL;
 
1791
    guint8 opt_len;
 
1792
 
 
1793
    if (len < pos + 1)
 
1794
      return;
 
1795
 
 
1796
    kind = data[pos];
 
1797
    pos++;
 
1798
 
 
1799
    if (kind == TCP_OPT_EOL) {
 
1800
      // End of option list.
 
1801
      break;
 
1802
    } else if (kind == TCP_OPT_NOOP) {
 
1803
      // No op.
 
1804
      continue;
 
1805
    }
 
1806
 
 
1807
    if (len < pos + 1)
 
1808
      return;
 
1809
 
 
1810
    // Length of this option.
 
1811
    opt_len = data[pos];
 
1812
    pos++;
 
1813
 
 
1814
    if (len < pos + opt_len)
 
1815
      return;
 
1816
 
 
1817
    // Content of this option.
 
1818
    if (opt_len <= len - pos) {
 
1819
      apply_option (self, kind, data + pos, opt_len);
 
1820
      pos += opt_len;
 
1821
    } else {
 
1822
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid option length received.");
 
1823
      return;
 
1824
    }
 
1825
 
 
1826
    if (kind == TCP_OPT_WND_SCALE)
 
1827
      has_window_scaling_option = TRUE;
 
1828
  }
 
1829
 
 
1830
  if (!has_window_scaling_option) {
 
1831
    DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer doesn't support window scaling");
 
1832
    if (priv->rwnd_scale > 0) {
 
1833
      // Peer doesn't support TCP options and window scaling.
 
1834
      // Revert receive buffer size to default value.
 
1835
      resize_receive_buffer (self, DEFAULT_RCV_BUF_SIZE);
 
1836
      priv->swnd_scale = 0;
 
1837
    }
 
1838
  }
 
1839
}
 
1840
 
 
1841
static void
 
1842
resize_send_buffer (PseudoTcpSocket *self, guint32 new_size)
 
1843
{
 
1844
  PseudoTcpSocketPrivate *priv = self->priv;
 
1845
 
 
1846
  priv->sbuf_len = new_size;
 
1847
  pseudo_tcp_fifo_set_capacity (&priv->sbuf, new_size);
 
1848
}
 
1849
 
 
1850
 
 
1851
static void
 
1852
resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size)
 
1853
{
 
1854
  PseudoTcpSocketPrivate *priv = self->priv;
 
1855
  guint8 scale_factor = 0;
 
1856
  gboolean result;
 
1857
  gsize available_space;
 
1858
 
 
1859
  if (priv->rbuf_len == new_size)
 
1860
    return;
 
1861
 
 
1862
  // Determine the scale factor such that the scaled window size can fit
 
1863
  // in a 16-bit unsigned integer.
 
1864
  while (new_size > 0xFFFF) {
 
1865
    ++scale_factor;
 
1866
    new_size >>= 1;
 
1867
  }
 
1868
 
 
1869
  // Determine the proper size of the buffer.
 
1870
  new_size <<= scale_factor;
 
1871
  result = pseudo_tcp_fifo_set_capacity (&priv->rbuf, new_size);
 
1872
 
 
1873
  // Make sure the new buffer is large enough to contain data in the old
 
1874
  // buffer. This should always be true because this method is called either
 
1875
  // before connection is established or when peers are exchanging connect
 
1876
  // messages.
 
1877
  g_assert(result);
 
1878
  priv->rbuf_len = new_size;
 
1879
  priv->rwnd_scale = scale_factor;
 
1880
  priv->ssthresh = new_size;
 
1881
 
 
1882
  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
 
1883
  priv->rcv_wnd = available_space;
 
1884
}
 
1885
 
 
1886
gint
 
1887
pseudo_tcp_socket_get_available_bytes (PseudoTcpSocket *self)
 
1888
{
 
1889
  PseudoTcpSocketPrivate *priv = self->priv;
 
1890
 
 
1891
  if (priv->state != TCP_ESTABLISHED) {
 
1892
    return -1;
 
1893
  }
 
1894
 
 
1895
  return pseudo_tcp_fifo_get_buffered (&priv->rbuf);
 
1896
}
 
1897
 
 
1898
gboolean
 
1899
pseudo_tcp_socket_can_send (PseudoTcpSocket *self)
 
1900
{
 
1901
  return (pseudo_tcp_socket_get_available_send_space (self) > 0);
 
1902
}
 
1903
 
 
1904
gsize
 
1905
pseudo_tcp_socket_get_available_send_space (PseudoTcpSocket *self)
 
1906
{
 
1907
  PseudoTcpSocketPrivate *priv = self->priv;
 
1908
  gsize ret;
 
1909
 
 
1910
 
 
1911
  if (priv->state == TCP_ESTABLISHED)
 
1912
    ret = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
 
1913
  else
 
1914
    ret = 0;
 
1915
 
 
1916
  if (ret == 0)
 
1917
    priv->bWriteEnable = TRUE;
 
1918
 
 
1919
  return ret;
 
1920
}