~ubuntu-branches/debian/experimental/kopete/experimental

« back to all changes in this revision

Viewing changes to protocols/jabber/libjingle/talk/p2p/base/pseudotcp_unittest.cc

  • Committer: Package Import Robot
  • Author(s): Maximiliano Curia
  • Date: 2015-02-24 11:32:57 UTC
  • mfrom: (1.1.41 vivid)
  • Revision ID: package-import@ubuntu.com-20150224113257-gnupg4v7lzz18ij0
Tags: 4:14.12.2-1
* New upstream release (14.12.2).
* Bump Standards-Version to 3.9.6, no changes needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * libjingle
 
3
 * Copyright 2011 Google Inc.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions are met:
 
7
 *
 
8
 *  1. Redistributions of source code must retain the above copyright notice,
 
9
 *     this list of conditions and the following disclaimer.
 
10
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 
11
 *     this list of conditions and the following disclaimer in the documentation
 
12
 *     and/or other materials provided with the distribution.
 
13
 *  3. The name of the author may not be used to endorse or promote products
 
14
 *     derived from this software without specific prior written permission.
 
15
 *
 
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 
17
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 
18
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 
19
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
20
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
21
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 
22
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 
23
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 
24
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 
25
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 */
 
27
 
 
28
#include <vector>
 
29
 
 
30
#include "talk/base/gunit.h"
 
31
#include "talk/base/helpers.h"
 
32
#include "talk/base/messagehandler.h"
 
33
#include "talk/base/stream.h"
 
34
#include "talk/base/thread.h"
 
35
#include "talk/base/timeutils.h"
 
36
#include "talk/p2p/base/pseudotcp.h"
 
37
 
 
38
using cricket::PseudoTcp;
 
39
 
 
40
static const int kConnectTimeoutMs = 10000;  // ~3 * default RTO of 3000ms
 
41
static const int kTransferTimeoutMs = 15000;
 
42
static const int kBlockSize = 4096;
 
43
 
 
44
class PseudoTcpForTest : public cricket::PseudoTcp {
 
45
 public:
 
46
  PseudoTcpForTest(cricket::IPseudoTcpNotify* notify, uint32 conv)
 
47
      : PseudoTcp(notify, conv) {
 
48
  }
 
49
 
 
50
  bool isReceiveBufferFull() const {
 
51
    return PseudoTcp::isReceiveBufferFull();
 
52
  }
 
53
 
 
54
  void disableWindowScale() {
 
55
    PseudoTcp::disableWindowScale();
 
56
  }
 
57
};
 
58
 
 
59
class PseudoTcpTestBase : public testing::Test,
 
60
                      public talk_base::MessageHandler,
 
61
                      public cricket::IPseudoTcpNotify {
 
62
 public:
 
63
  PseudoTcpTestBase()
 
64
      : local_(this, 1),
 
65
        remote_(this, 1),
 
66
        have_connected_(false),
 
67
        have_disconnected_(false),
 
68
        local_mtu_(65535),
 
69
        remote_mtu_(65535),
 
70
        delay_(0),
 
71
        loss_(0) {
 
72
    // Set use of the test RNG to get predictable loss patterns.
 
73
    talk_base::SetRandomTestMode(true);
 
74
  }
 
75
  ~PseudoTcpTestBase() {
 
76
    // Put it back for the next test.
 
77
    talk_base::SetRandomTestMode(false);
 
78
  }
 
79
  void SetLocalMtu(int mtu) {
 
80
    local_.NotifyMTU(mtu);
 
81
    local_mtu_ = mtu;
 
82
  }
 
83
  void SetRemoteMtu(int mtu) {
 
84
    remote_.NotifyMTU(mtu);
 
85
    remote_mtu_ = mtu;
 
86
  }
 
87
  void SetDelay(int delay) {
 
88
    delay_ = delay;
 
89
  }
 
90
  void SetLoss(int percent) {
 
91
    loss_ = percent;
 
92
  }
 
93
  void SetOptNagling(bool enable_nagles) {
 
94
    local_.SetOption(PseudoTcp::OPT_NODELAY, !enable_nagles);
 
95
    remote_.SetOption(PseudoTcp::OPT_NODELAY, !enable_nagles);
 
96
  }
 
97
  void SetOptAckDelay(int ack_delay) {
 
98
    local_.SetOption(PseudoTcp::OPT_ACKDELAY, ack_delay);
 
99
    remote_.SetOption(PseudoTcp::OPT_ACKDELAY, ack_delay);
 
100
  }
 
101
  void SetOptSndBuf(int size) {
 
102
    local_.SetOption(PseudoTcp::OPT_SNDBUF, size);
 
103
    remote_.SetOption(PseudoTcp::OPT_SNDBUF, size);
 
104
  }
 
105
  void SetRemoteOptRcvBuf(int size) {
 
106
    remote_.SetOption(PseudoTcp::OPT_RCVBUF, size);
 
107
  }
 
108
  void SetLocalOptRcvBuf(int size) {
 
109
    local_.SetOption(PseudoTcp::OPT_RCVBUF, size);
 
110
  }
 
111
  void DisableRemoteWindowScale() {
 
112
    remote_.disableWindowScale();
 
113
  }
 
114
  void DisableLocalWindowScale() {
 
115
    local_.disableWindowScale();
 
116
  }
 
117
 
 
118
 protected:
 
119
  int Connect() {
 
120
    int ret = local_.Connect();
 
121
    if (ret == 0) {
 
122
      UpdateLocalClock();
 
123
    }
 
124
    return ret;
 
125
  }
 
126
  void Close() {
 
127
    local_.Close(false);
 
128
    UpdateLocalClock();
 
129
  }
 
130
 
 
131
  enum { MSG_LPACKET, MSG_RPACKET, MSG_LCLOCK, MSG_RCLOCK, MSG_IOCOMPLETE,
 
132
         MSG_WRITE};
 
133
  virtual void OnTcpOpen(PseudoTcp* tcp) {
 
134
    // Consider ourselves connected when the local side gets OnTcpOpen.
 
135
    // OnTcpWriteable isn't fired at open, so we trigger it now.
 
136
    LOG(LS_VERBOSE) << "Opened";
 
137
    if (tcp == &local_) {
 
138
      have_connected_ = true;
 
139
      OnTcpWriteable(tcp);
 
140
    }
 
141
  }
 
142
  // Test derived from the base should override
 
143
  //   virtual void OnTcpReadable(PseudoTcp* tcp)
 
144
  // and
 
145
  //   virtual void OnTcpWritable(PseudoTcp* tcp)
 
146
  virtual void OnTcpClosed(PseudoTcp* tcp, uint32 error) {
 
147
    // Consider ourselves closed when the remote side gets OnTcpClosed.
 
148
    // TODO: OnTcpClosed is only ever notified in case of error in
 
149
    // the current implementation.  Solicited close is not (yet) supported.
 
150
    LOG(LS_VERBOSE) << "Closed";
 
151
    EXPECT_EQ(0U, error);
 
152
    if (tcp == &remote_) {
 
153
      have_disconnected_ = true;
 
154
    }
 
155
  }
 
156
  virtual WriteResult TcpWritePacket(PseudoTcp* tcp,
 
157
                                     const char* buffer, size_t len) {
 
158
    // Randomly drop the desired percentage of packets.
 
159
    // Also drop packets that are larger than the configured MTU.
 
160
    if (talk_base::CreateRandomId() % 100 < static_cast<uint32>(loss_)) {
 
161
      LOG(LS_VERBOSE) << "Randomly dropping packet, size=" << len;
 
162
    } else if (len > static_cast<size_t>(
 
163
        talk_base::_min(local_mtu_, remote_mtu_))) {
 
164
      LOG(LS_VERBOSE) << "Dropping packet that exceeds path MTU, size=" << len;
 
165
    } else {
 
166
      int id = (tcp == &local_) ? MSG_RPACKET : MSG_LPACKET;
 
167
      std::string packet(buffer, len);
 
168
      talk_base::Thread::Current()->PostDelayed(delay_, this, id,
 
169
          talk_base::WrapMessageData(packet));
 
170
    }
 
171
    return WR_SUCCESS;
 
172
  }
 
173
 
 
174
  void UpdateLocalClock() { UpdateClock(&local_, MSG_LCLOCK); }
 
175
  void UpdateRemoteClock() { UpdateClock(&remote_, MSG_RCLOCK); }
 
176
  void UpdateClock(PseudoTcp* tcp, uint32 message) {
 
177
    long interval;  // NOLINT
 
178
    tcp->GetNextClock(PseudoTcp::Now(), interval);
 
179
    interval = talk_base::_max<int>(interval, 0L);  // sometimes interval is < 0
 
180
    talk_base::Thread::Current()->Clear(this, message);
 
181
    talk_base::Thread::Current()->PostDelayed(interval, this, message);
 
182
  }
 
183
 
 
184
  virtual void OnMessage(talk_base::Message* message) {
 
185
    switch (message->message_id) {
 
186
      case MSG_LPACKET: {
 
187
        const std::string& s(
 
188
            talk_base::UseMessageData<std::string>(message->pdata));
 
189
        local_.NotifyPacket(s.c_str(), s.size());
 
190
        UpdateLocalClock();
 
191
        break;
 
192
      }
 
193
      case MSG_RPACKET: {
 
194
        const std::string& s(
 
195
            talk_base::UseMessageData<std::string>(message->pdata));
 
196
        remote_.NotifyPacket(s.c_str(), s.size());
 
197
        UpdateRemoteClock();
 
198
        break;
 
199
      }
 
200
      case MSG_LCLOCK:
 
201
        local_.NotifyClock(PseudoTcp::Now());
 
202
        UpdateLocalClock();
 
203
        break;
 
204
      case MSG_RCLOCK:
 
205
        remote_.NotifyClock(PseudoTcp::Now());
 
206
        UpdateRemoteClock();
 
207
        break;
 
208
      default:
 
209
        break;
 
210
    }
 
211
    delete message->pdata;
 
212
  }
 
213
 
 
214
  PseudoTcpForTest local_;
 
215
  PseudoTcpForTest remote_;
 
216
  talk_base::MemoryStream send_stream_;
 
217
  talk_base::MemoryStream recv_stream_;
 
218
  bool have_connected_;
 
219
  bool have_disconnected_;
 
220
  int local_mtu_;
 
221
  int remote_mtu_;
 
222
  int delay_;
 
223
  int loss_;
 
224
};
 
225
 
 
226
class PseudoTcpTest : public PseudoTcpTestBase {
 
227
 public:
 
228
  void TestTransfer(int size) {
 
229
    uint32 start, elapsed;
 
230
    size_t received;
 
231
    // Create some dummy data to send.
 
232
    send_stream_.ReserveSize(size);
 
233
    for (int i = 0; i < size; ++i) {
 
234
      char ch = static_cast<char>(i);
 
235
      send_stream_.Write(&ch, 1, NULL, NULL);
 
236
    }
 
237
    send_stream_.Rewind();
 
238
    // Prepare the receive stream.
 
239
    recv_stream_.ReserveSize(size);
 
240
    // Connect and wait until connected.
 
241
    start = talk_base::Time();
 
242
    EXPECT_EQ(0, Connect());
 
243
    EXPECT_TRUE_WAIT(have_connected_, kConnectTimeoutMs);
 
244
    // Sending will start from OnTcpWriteable and complete when all data has
 
245
    // been received.
 
246
    EXPECT_TRUE_WAIT(have_disconnected_, kTransferTimeoutMs);
 
247
    elapsed = talk_base::TimeSince(start);
 
248
    recv_stream_.GetSize(&received);
 
249
    // Ensure we closed down OK and we got the right data.
 
250
    // TODO: Ensure the errors are cleared properly.
 
251
    //EXPECT_EQ(0, local_.GetError());
 
252
    //EXPECT_EQ(0, remote_.GetError());
 
253
    EXPECT_EQ(static_cast<size_t>(size), received);
 
254
    EXPECT_EQ(0, memcmp(send_stream_.GetBuffer(),
 
255
                        recv_stream_.GetBuffer(), size));
 
256
    LOG(LS_INFO) << "Transferred " << received << " bytes in " << elapsed
 
257
                 << " ms (" << size * 8 / elapsed << " Kbps)";
 
258
  }
 
259
 
 
260
 private:
 
261
  // IPseudoTcpNotify interface
 
262
 
 
263
  virtual void OnTcpReadable(PseudoTcp* tcp) {
 
264
    // Stream bytes to the recv stream as they arrive.
 
265
    if (tcp == &remote_) {
 
266
      ReadData();
 
267
 
 
268
      // TODO: OnTcpClosed() is currently only notified on error -
 
269
      // there is no on-the-wire equivalent of TCP FIN.
 
270
      // So we fake the notification when all the data has been read.
 
271
      size_t received, required;
 
272
      recv_stream_.GetPosition(&received);
 
273
      send_stream_.GetSize(&required);
 
274
      if (received == required)
 
275
        OnTcpClosed(&remote_, 0);
 
276
    }
 
277
  }
 
278
  virtual void OnTcpWriteable(PseudoTcp* tcp) {
 
279
    // Write bytes from the send stream when we can.
 
280
    // Shut down when we've sent everything.
 
281
    if (tcp == &local_) {
 
282
      LOG(LS_VERBOSE) << "Flow Control Lifted";
 
283
      bool done;
 
284
      WriteData(&done);
 
285
      if (done) {
 
286
        Close();
 
287
      }
 
288
    }
 
289
  }
 
290
 
 
291
  void ReadData() {
 
292
    char block[kBlockSize];
 
293
    size_t position;
 
294
    int rcvd;
 
295
    do {
 
296
      rcvd = remote_.Recv(block, sizeof(block));
 
297
      if (rcvd != -1) {
 
298
        recv_stream_.Write(block, rcvd, NULL, NULL);
 
299
        recv_stream_.GetPosition(&position);
 
300
        LOG(LS_VERBOSE) << "Received: " << position;
 
301
      }
 
302
    } while (rcvd > 0);
 
303
  }
 
304
  void WriteData(bool* done) {
 
305
    size_t position, tosend;
 
306
    int sent;
 
307
    char block[kBlockSize];
 
308
    do {
 
309
      send_stream_.GetPosition(&position);
 
310
      if (send_stream_.Read(block, sizeof(block), &tosend, NULL) !=
 
311
          talk_base::SR_EOS) {
 
312
        sent = local_.Send(block, tosend);
 
313
        UpdateLocalClock();
 
314
        if (sent != -1) {
 
315
          send_stream_.SetPosition(position + sent);
 
316
          LOG(LS_VERBOSE) << "Sent: " << position + sent;
 
317
        } else {
 
318
          send_stream_.SetPosition(position);
 
319
          LOG(LS_VERBOSE) << "Flow Controlled";
 
320
        }
 
321
      } else {
 
322
        sent = tosend = 0;
 
323
      }
 
324
    } while (sent > 0);
 
325
    *done = (tosend == 0);
 
326
  }
 
327
 
 
328
 private:
 
329
  talk_base::MemoryStream send_stream_;
 
330
  talk_base::MemoryStream recv_stream_;
 
331
};
 
332
 
 
333
 
 
334
class PseudoTcpTestPingPong : public PseudoTcpTestBase {
 
335
 public:
 
336
  PseudoTcpTestPingPong()
 
337
      : iterations_remaining_(0),
 
338
        sender_(NULL),
 
339
        receiver_(NULL),
 
340
        bytes_per_send_(0) {
 
341
  }
 
342
  void SetBytesPerSend(int bytes) {
 
343
    bytes_per_send_ = bytes;
 
344
  }
 
345
  void TestPingPong(int size, int iterations) {
 
346
    uint32 start, elapsed;
 
347
    iterations_remaining_ = iterations;
 
348
    receiver_ = &remote_;
 
349
    sender_ = &local_;
 
350
    // Create some dummy data to send.
 
351
    send_stream_.ReserveSize(size);
 
352
    for (int i = 0; i < size; ++i) {
 
353
      char ch = static_cast<char>(i);
 
354
      send_stream_.Write(&ch, 1, NULL, NULL);
 
355
    }
 
356
    send_stream_.Rewind();
 
357
    // Prepare the receive stream.
 
358
    recv_stream_.ReserveSize(size);
 
359
    // Connect and wait until connected.
 
360
    start = talk_base::Time();
 
361
    EXPECT_EQ(0, Connect());
 
362
    EXPECT_TRUE_WAIT(have_connected_, kConnectTimeoutMs);
 
363
    // Sending will start from OnTcpWriteable and stop when the required
 
364
    // number of iterations have completed.
 
365
    EXPECT_TRUE_WAIT(have_disconnected_, kTransferTimeoutMs);
 
366
    elapsed = talk_base::TimeSince(start);
 
367
    LOG(LS_INFO) << "Performed " << iterations << " pings in "
 
368
                 << elapsed << " ms";
 
369
  }
 
370
 
 
371
 private:
 
372
  // IPseudoTcpNotify interface
 
373
 
 
374
  virtual void OnTcpReadable(PseudoTcp* tcp) {
 
375
    if (tcp != receiver_) {
 
376
      LOG_F(LS_ERROR) << "unexpected OnTcpReadable";
 
377
      return;
 
378
    }
 
379
    // Stream bytes to the recv stream as they arrive.
 
380
    ReadData();
 
381
    // If we've received the desired amount of data, rewind things
 
382
    // and send it back the other way!
 
383
    size_t position, desired;
 
384
    recv_stream_.GetPosition(&position);
 
385
    send_stream_.GetSize(&desired);
 
386
    if (position == desired) {
 
387
      if (receiver_ == &local_ && --iterations_remaining_ == 0) {
 
388
        Close();
 
389
        // TODO: Fake OnTcpClosed() on the receiver for now.
 
390
        OnTcpClosed(&remote_, 0);
 
391
        return;
 
392
      }
 
393
      PseudoTcp* tmp = receiver_;
 
394
      receiver_ = sender_;
 
395
      sender_ = tmp;
 
396
      recv_stream_.Rewind();
 
397
      send_stream_.Rewind();
 
398
      OnTcpWriteable(sender_);
 
399
    }
 
400
  }
 
401
  virtual void OnTcpWriteable(PseudoTcp* tcp) {
 
402
    if (tcp != sender_)
 
403
      return;
 
404
    // Write bytes from the send stream when we can.
 
405
    // Shut down when we've sent everything.
 
406
    LOG(LS_VERBOSE) << "Flow Control Lifted";
 
407
    WriteData();
 
408
  }
 
409
 
 
410
  void ReadData() {
 
411
    char block[kBlockSize];
 
412
    size_t position;
 
413
    int rcvd;
 
414
    do {
 
415
      rcvd = receiver_->Recv(block, sizeof(block));
 
416
      if (rcvd != -1) {
 
417
        recv_stream_.Write(block, rcvd, NULL, NULL);
 
418
        recv_stream_.GetPosition(&position);
 
419
        LOG(LS_VERBOSE) << "Received: " << position;
 
420
      }
 
421
    } while (rcvd > 0);
 
422
  }
 
423
  void WriteData() {
 
424
    size_t position, tosend;
 
425
    int sent;
 
426
    char block[kBlockSize];
 
427
    do {
 
428
      send_stream_.GetPosition(&position);
 
429
      tosend = bytes_per_send_ ? bytes_per_send_ : sizeof(block);
 
430
      if (send_stream_.Read(block, tosend, &tosend, NULL) !=
 
431
          talk_base::SR_EOS) {
 
432
        sent = sender_->Send(block, tosend);
 
433
        UpdateLocalClock();
 
434
        if (sent != -1) {
 
435
          send_stream_.SetPosition(position + sent);
 
436
          LOG(LS_VERBOSE) << "Sent: " << position + sent;
 
437
        } else {
 
438
          send_stream_.SetPosition(position);
 
439
          LOG(LS_VERBOSE) << "Flow Controlled";
 
440
        }
 
441
      } else {
 
442
        sent = tosend = 0;
 
443
      }
 
444
    } while (sent > 0);
 
445
  }
 
446
 
 
447
 private:
 
448
  int iterations_remaining_;
 
449
  PseudoTcp* sender_;
 
450
  PseudoTcp* receiver_;
 
451
  int bytes_per_send_;
 
452
};
 
453
 
 
454
// Fill the receiver window until it is full, drain it and then
 
455
// fill it with the same amount. This is to test that receiver window
 
456
// contracts and enlarges correctly.
 
457
class PseudoTcpTestReceiveWindow : public PseudoTcpTestBase {
 
458
 public:
 
459
  // Not all the data are transfered, |size| just need to be big enough
 
460
  // to fill up the receiver window twice.
 
461
  void TestTransfer(int size) {
 
462
    // Create some dummy data to send.
 
463
    send_stream_.ReserveSize(size);
 
464
    for (int i = 0; i < size; ++i) {
 
465
      char ch = static_cast<char>(i);
 
466
      send_stream_.Write(&ch, 1, NULL, NULL);
 
467
    }
 
468
    send_stream_.Rewind();
 
469
 
 
470
    // Prepare the receive stream.
 
471
    recv_stream_.ReserveSize(size);
 
472
 
 
473
    // Connect and wait until connected.
 
474
    EXPECT_EQ(0, Connect());
 
475
    EXPECT_TRUE_WAIT(have_connected_, kConnectTimeoutMs);
 
476
 
 
477
    talk_base::Thread::Current()->Post(this, MSG_WRITE);
 
478
    EXPECT_TRUE_WAIT(have_disconnected_, kTransferTimeoutMs);
 
479
 
 
480
    ASSERT_EQ(2u, send_position_.size());
 
481
    ASSERT_EQ(2u, recv_position_.size());
 
482
 
 
483
    const size_t estimated_recv_window = EstimateReceiveWindowSize();
 
484
 
 
485
    // The difference in consecutive send positions should equal the
 
486
    // receive window size or match very closely. This verifies that receive
 
487
    // window is open after receiver drained all the data.
 
488
    const size_t send_position_diff = send_position_[1] - send_position_[0];
 
489
    EXPECT_GE(1024u, estimated_recv_window - send_position_diff);
 
490
 
 
491
    // Receiver drained the receive window twice.
 
492
    EXPECT_EQ(2 * estimated_recv_window, recv_position_[1]);
 
493
  }
 
494
 
 
495
  virtual void OnMessage(talk_base::Message* message) {
 
496
    int message_id = message->message_id;
 
497
    PseudoTcpTestBase::OnMessage(message);
 
498
 
 
499
    switch (message_id) {
 
500
      case MSG_WRITE: {
 
501
        WriteData();
 
502
        break;
 
503
      }
 
504
      default:
 
505
        break;
 
506
    }
 
507
  }
 
508
 
 
509
  uint32 EstimateReceiveWindowSize() const {
 
510
    return recv_position_[0];
 
511
  }
 
512
 
 
513
  uint32 EstimateSendWindowSize() const {
 
514
    return send_position_[0] - recv_position_[0];
 
515
  }
 
516
 
 
517
 private:
 
518
  // IPseudoTcpNotify interface
 
519
  virtual void OnTcpReadable(PseudoTcp* tcp) {
 
520
  }
 
521
 
 
522
  virtual void OnTcpWriteable(PseudoTcp* tcp) {
 
523
  }
 
524
 
 
525
  void ReadUntilIOPending() {
 
526
    char block[kBlockSize];
 
527
    size_t position;
 
528
    int rcvd;
 
529
 
 
530
    do {
 
531
      rcvd = remote_.Recv(block, sizeof(block));
 
532
      if (rcvd != -1) {
 
533
        recv_stream_.Write(block, rcvd, NULL, NULL);
 
534
        recv_stream_.GetPosition(&position);
 
535
        LOG(LS_VERBOSE) << "Received: " << position;
 
536
      }
 
537
    } while (rcvd > 0);
 
538
 
 
539
    recv_stream_.GetPosition(&position);
 
540
    recv_position_.push_back(position);
 
541
 
 
542
    // Disconnect if we have done two transfers.
 
543
    if (recv_position_.size() == 2u) {
 
544
      Close();
 
545
      OnTcpClosed(&remote_, 0);
 
546
    } else {
 
547
      WriteData();
 
548
    }
 
549
  }
 
550
 
 
551
  void WriteData() {
 
552
    size_t position, tosend;
 
553
    int sent;
 
554
    char block[kBlockSize];
 
555
    do {
 
556
      send_stream_.GetPosition(&position);
 
557
      if (send_stream_.Read(block, sizeof(block), &tosend, NULL) !=
 
558
          talk_base::SR_EOS) {
 
559
        sent = local_.Send(block, tosend);
 
560
        UpdateLocalClock();
 
561
        if (sent != -1) {
 
562
          send_stream_.SetPosition(position + sent);
 
563
          LOG(LS_VERBOSE) << "Sent: " << position + sent;
 
564
        } else {
 
565
          send_stream_.SetPosition(position);
 
566
          LOG(LS_VERBOSE) << "Flow Controlled";
 
567
        }
 
568
      } else {
 
569
        sent = tosend = 0;
 
570
      }
 
571
    } while (sent > 0);
 
572
    // At this point, we've filled up the available space in the send queue.
 
573
 
 
574
    int message_queue_size = talk_base::Thread::Current()->size();
 
575
    // The message queue will always have at least 2 messages, an RCLOCK and
 
576
    // an LCLOCK, since they are added back on the delay queue at the same time
 
577
    // they are pulled off and therefore are never really removed.
 
578
    if (message_queue_size > 2) {
 
579
      // If there are non-clock messages remaining, attempt to continue sending
 
580
      // after giving those messages time to process, which should free up the
 
581
      // send buffer.
 
582
      talk_base::Thread::Current()->PostDelayed(10, this, MSG_WRITE);
 
583
    } else {
 
584
      if (!remote_.isReceiveBufferFull()) {
 
585
        LOG(LS_ERROR) << "This shouldn't happen - the send buffer is full, "
 
586
                      << "the receive buffer is not, and there are no "
 
587
                      << "remaining messages to process.";
 
588
      }
 
589
      send_stream_.GetPosition(&position);
 
590
      send_position_.push_back(position);
 
591
 
 
592
      // Drain the receiver buffer.
 
593
      ReadUntilIOPending();
 
594
    }
 
595
  }
 
596
 
 
597
 private:
 
598
  talk_base::MemoryStream send_stream_;
 
599
  talk_base::MemoryStream recv_stream_;
 
600
 
 
601
  std::vector<size_t> send_position_;
 
602
  std::vector<size_t> recv_position_;
 
603
};
 
604
 
 
605
// Basic end-to-end data transfer tests
 
606
 
 
607
// Test the normal case of sending data from one side to the other.
 
608
TEST_F(PseudoTcpTest, TestSend) {
 
609
  SetLocalMtu(1500);
 
610
  SetRemoteMtu(1500);
 
611
  TestTransfer(1000000);
 
612
}
 
613
 
 
614
// Test sending data with a 50 ms RTT. Transmission should take longer due
 
615
// to a slower ramp-up in send rate.
 
616
TEST_F(PseudoTcpTest, TestSendWithDelay) {
 
617
  SetLocalMtu(1500);
 
618
  SetRemoteMtu(1500);
 
619
  SetDelay(50);
 
620
  TestTransfer(1000000);
 
621
}
 
622
 
 
623
// Test sending data with packet loss. Transmission should take much longer due
 
624
// to send back-off when loss occurs.
 
625
TEST_F(PseudoTcpTest, TestSendWithLoss) {
 
626
  SetLocalMtu(1500);
 
627
  SetRemoteMtu(1500);
 
628
  SetLoss(10);
 
629
  TestTransfer(100000);  // less data so test runs faster
 
630
}
 
631
 
 
632
// Test sending data with a 50 ms RTT and 10% packet loss. Transmission should
 
633
// take much longer due to send back-off and slower detection of loss.
 
634
TEST_F(PseudoTcpTest, TestSendWithDelayAndLoss) {
 
635
  SetLocalMtu(1500);
 
636
  SetRemoteMtu(1500);
 
637
  SetDelay(50);
 
638
  SetLoss(10);
 
639
  TestTransfer(100000);  // less data so test runs faster
 
640
}
 
641
 
 
642
// Test sending data with 10% packet loss and Nagling disabled.  Transmission
 
643
// should take about the same time as with Nagling enabled.
 
644
TEST_F(PseudoTcpTest, TestSendWithLossAndOptNaglingOff) {
 
645
  SetLocalMtu(1500);
 
646
  SetRemoteMtu(1500);
 
647
  SetLoss(10);
 
648
  SetOptNagling(false);
 
649
  TestTransfer(100000);  // less data so test runs faster
 
650
}
 
651
 
 
652
// Test sending data with 10% packet loss and Delayed ACK disabled.
 
653
// Transmission should be slightly faster than with it enabled.
 
654
TEST_F(PseudoTcpTest, TestSendWithLossAndOptAckDelayOff) {
 
655
  SetLocalMtu(1500);
 
656
  SetRemoteMtu(1500);
 
657
  SetLoss(10);
 
658
  SetOptAckDelay(0);
 
659
  TestTransfer(100000);
 
660
}
 
661
 
 
662
// Test sending data with 50ms delay and Nagling disabled.
 
663
TEST_F(PseudoTcpTest, TestSendWithDelayAndOptNaglingOff) {
 
664
  SetLocalMtu(1500);
 
665
  SetRemoteMtu(1500);
 
666
  SetDelay(50);
 
667
  SetOptNagling(false);
 
668
  TestTransfer(100000);  // less data so test runs faster
 
669
}
 
670
 
 
671
// Test sending data with 50ms delay and Delayed ACK disabled.
 
672
TEST_F(PseudoTcpTest, TestSendWithDelayAndOptAckDelayOff) {
 
673
  SetLocalMtu(1500);
 
674
  SetRemoteMtu(1500);
 
675
  SetDelay(50);
 
676
  SetOptAckDelay(0);
 
677
  TestTransfer(100000);  // less data so test runs faster
 
678
}
 
679
 
 
680
// Test a large receive buffer with a sender that doesn't support scaling.
 
681
TEST_F(PseudoTcpTest, TestSendRemoteNoWindowScale) {
 
682
  SetLocalMtu(1500);
 
683
  SetRemoteMtu(1500);
 
684
  SetLocalOptRcvBuf(100000);
 
685
  DisableRemoteWindowScale();
 
686
  TestTransfer(1000000);
 
687
}
 
688
 
 
689
// Test a large sender-side receive buffer with a receiver that doesn't support
 
690
// scaling.
 
691
TEST_F(PseudoTcpTest, TestSendLocalNoWindowScale) {
 
692
  SetLocalMtu(1500);
 
693
  SetRemoteMtu(1500);
 
694
  SetRemoteOptRcvBuf(100000);
 
695
  DisableLocalWindowScale();
 
696
  TestTransfer(1000000);
 
697
}
 
698
 
 
699
// Test when both sides use window scaling.
 
700
TEST_F(PseudoTcpTest, TestSendBothUseWindowScale) {
 
701
  SetLocalMtu(1500);
 
702
  SetRemoteMtu(1500);
 
703
  SetRemoteOptRcvBuf(100000);
 
704
  SetLocalOptRcvBuf(100000);
 
705
  TestTransfer(1000000);
 
706
}
 
707
 
 
708
// Test using a large window scale value.
 
709
TEST_F(PseudoTcpTest, TestSendLargeInFlight) {
 
710
  SetLocalMtu(1500);
 
711
  SetRemoteMtu(1500);
 
712
  SetRemoteOptRcvBuf(100000);
 
713
  SetLocalOptRcvBuf(100000);
 
714
  SetOptSndBuf(150000);
 
715
  TestTransfer(1000000);
 
716
}
 
717
 
 
718
TEST_F(PseudoTcpTest, TestSendBothUseLargeWindowScale) {
 
719
  SetLocalMtu(1500);
 
720
  SetRemoteMtu(1500);
 
721
  SetRemoteOptRcvBuf(1000000);
 
722
  SetLocalOptRcvBuf(1000000);
 
723
  TestTransfer(10000000);
 
724
}
 
725
 
 
726
// Test using a small receive buffer.
 
727
TEST_F(PseudoTcpTest, TestSendSmallReceiveBuffer) {
 
728
  SetLocalMtu(1500);
 
729
  SetRemoteMtu(1500);
 
730
  SetRemoteOptRcvBuf(10000);
 
731
  SetLocalOptRcvBuf(10000);
 
732
  TestTransfer(1000000);
 
733
}
 
734
 
 
735
// Test using a very small receive buffer.
 
736
TEST_F(PseudoTcpTest, TestSendVerySmallReceiveBuffer) {
 
737
  SetLocalMtu(1500);
 
738
  SetRemoteMtu(1500);
 
739
  SetRemoteOptRcvBuf(100);
 
740
  SetLocalOptRcvBuf(100);
 
741
  TestTransfer(100000);
 
742
}
 
743
 
 
744
// Ping-pong (request/response) tests
 
745
 
 
746
// Test sending <= 1x MTU of data in each ping/pong.  Should take <10ms.
 
747
TEST_F(PseudoTcpTestPingPong, TestPingPong1xMtu) {
 
748
  SetLocalMtu(1500);
 
749
  SetRemoteMtu(1500);
 
750
  TestPingPong(100, 100);
 
751
}
 
752
 
 
753
// Test sending 2x-3x MTU of data in each ping/pong.  Should take <10ms.
 
754
TEST_F(PseudoTcpTestPingPong, TestPingPong3xMtu) {
 
755
  SetLocalMtu(1500);
 
756
  SetRemoteMtu(1500);
 
757
  TestPingPong(400, 100);
 
758
}
 
759
 
 
760
// Test sending 1x-2x MTU of data in each ping/pong.
 
761
// Should take ~1s, due to interaction between Nagling and Delayed ACK.
 
762
TEST_F(PseudoTcpTestPingPong, TestPingPong2xMtu) {
 
763
  SetLocalMtu(1500);
 
764
  SetRemoteMtu(1500);
 
765
  TestPingPong(2000, 5);
 
766
}
 
767
 
 
768
// Test sending 1x-2x MTU of data in each ping/pong with Delayed ACK off.
 
769
// Should take <10ms.
 
770
TEST_F(PseudoTcpTestPingPong, TestPingPong2xMtuWithAckDelayOff) {
 
771
  SetLocalMtu(1500);
 
772
  SetRemoteMtu(1500);
 
773
  SetOptAckDelay(0);
 
774
  TestPingPong(2000, 100);
 
775
}
 
776
 
 
777
// Test sending 1x-2x MTU of data in each ping/pong with Nagling off.
 
778
// Should take <10ms.
 
779
TEST_F(PseudoTcpTestPingPong, TestPingPong2xMtuWithNaglingOff) {
 
780
  SetLocalMtu(1500);
 
781
  SetRemoteMtu(1500);
 
782
  SetOptNagling(false);
 
783
  TestPingPong(2000, 5);
 
784
}
 
785
 
 
786
// Test sending a ping as pair of short (non-full) segments.
 
787
// Should take ~1s, due to Delayed ACK interaction with Nagling.
 
788
TEST_F(PseudoTcpTestPingPong, TestPingPongShortSegments) {
 
789
  SetLocalMtu(1500);
 
790
  SetRemoteMtu(1500);
 
791
  SetOptAckDelay(5000);
 
792
  SetBytesPerSend(50); // i.e. two Send calls per payload
 
793
  TestPingPong(100, 5);
 
794
}
 
795
 
 
796
// Test sending ping as a pair of short (non-full) segments, with Nagling off.
 
797
// Should take <10ms.
 
798
TEST_F(PseudoTcpTestPingPong, TestPingPongShortSegmentsWithNaglingOff) {
 
799
  SetLocalMtu(1500);
 
800
  SetRemoteMtu(1500);
 
801
  SetOptNagling(false);
 
802
  SetBytesPerSend(50); // i.e. two Send calls per payload
 
803
  TestPingPong(100, 5);
 
804
}
 
805
 
 
806
// Test sending <= 1x MTU of data ping/pong, in two segments, no Delayed ACK.
 
807
// Should take ~1s.
 
808
TEST_F(PseudoTcpTestPingPong, TestPingPongShortSegmentsWithAckDelayOff) {
 
809
  SetLocalMtu(1500);
 
810
  SetRemoteMtu(1500);
 
811
  SetBytesPerSend(50); // i.e. two Send calls per payload
 
812
  SetOptAckDelay(0);
 
813
  TestPingPong(100, 5);
 
814
}
 
815
 
 
816
// Test that receive window expands and contract correctly.
 
817
TEST_F(PseudoTcpTestReceiveWindow, TestReceiveWindow) {
 
818
  SetLocalMtu(1500);
 
819
  SetRemoteMtu(1500);
 
820
  SetOptNagling(false);
 
821
  SetOptAckDelay(0);
 
822
  TestTransfer(1024 * 1000);
 
823
}
 
824
 
 
825
// Test setting send window size to a very small value.
 
826
TEST_F(PseudoTcpTestReceiveWindow, TestSetVerySmallSendWindowSize) {
 
827
  SetLocalMtu(1500);
 
828
  SetRemoteMtu(1500);
 
829
  SetOptNagling(false);
 
830
  SetOptAckDelay(0);
 
831
  SetOptSndBuf(900);
 
832
  TestTransfer(1024 * 1000);
 
833
  EXPECT_EQ(900u, EstimateSendWindowSize());
 
834
}
 
835
 
 
836
// Test setting receive window size to a value other than default.
 
837
TEST_F(PseudoTcpTestReceiveWindow, TestSetReceiveWindowSize) {
 
838
  SetLocalMtu(1500);
 
839
  SetRemoteMtu(1500);
 
840
  SetOptNagling(false);
 
841
  SetOptAckDelay(0);
 
842
  SetRemoteOptRcvBuf(100000);
 
843
  SetLocalOptRcvBuf(100000);
 
844
  TestTransfer(1024 * 1000);
 
845
  EXPECT_EQ(100000u, EstimateReceiveWindowSize());
 
846
}
 
847
 
 
848
/* Test sending data with mismatched MTUs. We should detect this and reduce
 
849
// our packet size accordingly.
 
850
// TODO: This doesn't actually work right now. The current code
 
851
// doesn't detect if the MTU is set too high on either side.
 
852
TEST_F(PseudoTcpTest, TestSendWithMismatchedMtus) {
 
853
  SetLocalMtu(1500);
 
854
  SetRemoteMtu(1280);
 
855
  TestTransfer(1000000);
 
856
}
 
857
*/