3
* Copyright 2006, Google Inc.
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* 1. Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
#include <netinet/in.h>
34
#include "talk/base/logging.h"
35
#include "talk/base/gunit.h"
36
#include "talk/base/testclient.h"
37
#include "talk/base/testutils.h"
38
#include "talk/base/thread.h"
39
#include "talk/base/timeutils.h"
40
#include "talk/base/virtualsocketserver.h"
42
using namespace talk_base;
44
// Sends at a constant rate but with random packet sizes.
// Test helper: wraps the given AsyncSocket in an AsyncUDPSocket and paces its
// sends with delayed messages (id 1) posted to `thread`.
// NOTE(review): several lines of this struct (closing braces, the signature
// of the delay helper, some member declarations) are not present in this
// listing -- confirm against the original file.
45
struct Sender : public MessageHandler {
46
// th: thread that delivers the delayed messages; s: socket to send on;
// rt: target send rate in bytes per second.
Sender(Thread* th, AsyncSocket* s, uint32 rt)
47
: thread(th), socket(new AsyncUDPSocket(s)),
48
done(false), rate(rt), count(0) {
50
// Schedule the first send.
thread->PostDelayed(NextDelay(), this, 1);
54
// Presumably the body of NextDelay(): pick a random size in [1, 4096] and
// return the time that many bytes take at `rate` bytes per second. The
// factor of 1000 suggests milliseconds -- TODO confirm.
uint32 size = (rand() % 4096) + 1;
55
return 1000 * size / rate;
58
// Handles the delayed message: sends one packet sized for the time elapsed
// since `last_send`, then schedules the next send.
void OnMessage(Message* pmsg) {
59
ASSERT_EQ(1u, pmsg->message_id);
64
uint32 cur_time = Time();
65
uint32 delay = cur_time - last_send;
66
// Bytes owed for the elapsed interval, clamped to [sizeof(uint32), 4096] so
// the packet always has room for the timestamp copied in below.
uint32 size = rate * delay / 1000;
67
size = std::min<uint32>(size, 4096);
68
size = std::max<uint32>(size, sizeof(uint32));
71
// Stamp the current time at the start of the payload.
// NOTE(review): `dummy` is not declared in the visible lines.
memcpy(dummy, &cur_time, sizeof(cur_time));
72
socket->Send(dummy, size);
75
thread->PostDelayed(NextDelay(), this, 1);
79
scoped_ptr<AsyncUDPSocket> socket;
81
uint32 rate; // bytes per second
87
// Test helper that receives packets from a wrapped AsyncUDPSocket and
// accumulates counters and delay statistics (count, sec_count, sum, sum_sq,
// samples) read by the bandwidth and delay tests.
// NOTE(review): some lines of this struct (closing braces, several statements
// and member declarations) are not present in this listing -- confirm against
// the original file.
struct Receiver : public MessageHandler, public sigslot::has_slots<> {
88
// th: thread that delivers the periodic message; s: socket to receive on;
// bw: bandwidth figure that OnMessage compares sec_count against.
Receiver(Thread* th, AsyncSocket* s, uint32 bw)
89
: thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
90
count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
91
// Get a callback for every packet read, and post message id 1 with a delay
// of 1000 (presumably milliseconds -- TODO confirm).
socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
92
thread->PostDelayed(1000, this, 1);
99
// Packet callback: recovers the timestamp stored at the start of the payload
// and derives the delivery delay from it.
void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
100
const SocketAddress& remote_addr) {
101
ASSERT_EQ(socket.get(), s);
107
// The first sizeof(uint32) bytes of the payload are read as the send time.
uint32 send_time = *reinterpret_cast<const uint32*>(data);
108
uint32 recv_time = Time();
109
uint32 delay = recv_time - send_time;
111
// Running sum of squared delays; DelayTest reads it to compute a deviation.
sum_sq += delay * delay;
115
// Periodic check; posts itself again with a delay of 1000.
void OnMessage(Message* pmsg) {
116
ASSERT_EQ(1u, pmsg->message_id);
121
// It is always possible for us to receive more than expected because
122
// packets can be further delayed in delivery.
124
// Hence the 25% allowance over `bandwidth` in the check below.
ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
126
thread->PostDelayed(1000, this, 1);
130
scoped_ptr<AsyncUDPSocket> socket;
140
// Test fixture for VirtualSocketServer. The constructor creates the server
// under test and the IPv4 / IPv6 wildcard ("any") addresses, both with port 0,
// that most tests bind to.
class VirtualSocketServerTest : public testing::Test {
142
VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
143
kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
144
kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
147
// Expects `post` to be the address immediately following `pre`: same address
// family, port one higher, and IP address one higher.
void CheckAddressIncrementalization(const SocketAddress& post,
148
const SocketAddress& pre) {
149
EXPECT_EQ(post.port(), pre.port() + 1);
150
IPAddress post_ip = post.ipaddr();
151
IPAddress pre_ip = pre.ipaddr();
152
EXPECT_EQ(pre_ip.family(), post_ip.family());
153
if (post_ip.family() == AF_INET) {
154
in_addr pre_ipv4 = pre_ip.ipv4_address();
155
in_addr post_ipv4 = post_ip.ipv4_address();
156
// Compare in host byte order so that "+1" means the next numeric address.
int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
157
EXPECT_EQ(1, difference);
158
} else if (post_ip.family() == AF_INET6) {
159
in6_addr post_ip6 = post_ip.ipv6_address();
160
in6_addr pre_ip6 = pre_ip.ipv6_address();
161
// Only the last 32-bit word of the 128-bit address is compared.
// NOTE(review): unlike the IPv4 branch, no ntohl is applied here.
uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
162
uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
163
EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
167
// Exercises datagram round trips through the virtual socket server: binds a
// server socket to `initial_addr`, exchanges packets with one client, then
// repeats with ten freshly created clients and checks that each new client's
// address directly follows the previous one.
void BasicTest(const SocketAddress& initial_addr) {
168
AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
169
socket->Bind(initial_addr);
170
SocketAddress server_addr = socket->GetLocalAddress();
171
// Make sure VSS didn't switch families on us.
172
EXPECT_EQ(server_addr.ipaddr().family(),
173
initial_addr.ipaddr().family());
175
// client1 plays the server role; client2 is never explicitly bound.
TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
176
AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
177
TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
179
// client2 -> server; the server learns client2's address from the packet.
SocketAddress client2_addr;
180
EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
181
EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
183
// server -> client2; the reply must appear to come from the server address.
SocketAddress client1_addr;
184
EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
185
EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
186
EXPECT_EQ(client1_addr, server_addr);
188
for (int i = 0; i < 10; i++) {
189
client2 = new TestClient(AsyncUDPSocket::Create(ss_, SocketAddress()));
191
SocketAddress next_client2_addr;
192
EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
193
EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
194
CheckAddressIncrementalization(next_client2_addr, client2_addr);
195
// EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
197
SocketAddress server_addr2;
198
EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
199
EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
200
EXPECT_EQ(server_addr2, server_addr);
202
// The next iteration compares against this client's address.
client2_addr = next_client2_addr;
206
// initial_addr should be made from either INADDR_ANY or in6addr_any.
// Walks a stream connection through its states: Listen() fails until the
// server is bound; the client stays in CS_CONNECTING after Connect() until the
// server has accepted and pending messages are processed; finally both ends
// are CS_CONNECTED and report each other's addresses.
207
void ConnectTest(const SocketAddress& initial_addr) {
208
testing::StreamSink sink;
209
SocketAddress accept_addr;
210
const SocketAddress kEmptyAddr;
213
// A fresh client is closed and has no local address.
AsyncSocket* client = ss_->CreateAsyncSocket(SOCK_STREAM);
214
sink.Monitor(client);
215
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
216
EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
219
AsyncSocket* server = ss_->CreateAsyncSocket(SOCK_STREAM);
220
sink.Monitor(server);
221
EXPECT_NE(0, server->Listen(5)); // Bind required
222
EXPECT_EQ(0, server->Bind(initial_addr));
223
EXPECT_EQ(server->GetLocalAddress().ipaddr().family(),
224
initial_addr.ipaddr().family());
225
EXPECT_EQ(0, server->Listen(5));
226
// A listening socket reports CS_CONNECTING.
EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
228
// No pending server connections
229
EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
230
EXPECT_TRUE(NULL == server->Accept(&accept_addr));
231
EXPECT_EQ(accept_addr, kEmptyAddr);
233
// Attempt connect to listening socket
234
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
235
EXPECT_NE(client->GetLocalAddress(), kEmptyAddr); // Implicit Bind
236
EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
238
// Client is connecting
239
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
240
EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
241
EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
243
ss_->ProcessMessagesUntilIdle();
245
// Client still connecting
246
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
247
EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
248
EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
250
// Server has pending connection
251
EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
252
Socket* accepted = server->Accept(&accept_addr);
253
EXPECT_TRUE(NULL != accepted);
254
EXPECT_NE(accept_addr, kEmptyAddr);
255
EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
257
// The accepted socket is connected immediately and shares the server's
// local address.
EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
258
EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
259
EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
261
ss_->ProcessMessagesUntilIdle();
263
// Client has connected
264
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
265
EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
266
EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
267
EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
268
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
271
// Connecting to a socket that is bound but never listens must fail: the
// server sees no pending connection, and the client ends up CS_CLOSED with an
// SSE_ERROR event and no remote address.
void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
272
testing::StreamSink sink;
273
SocketAddress accept_addr;
274
const SocketAddress kEmptyAddr;
277
AsyncSocket* client = ss_->CreateAsyncSocket(SOCK_STREAM);
278
sink.Monitor(client);
281
// The server is bound but Listen() is deliberately not called.
AsyncSocket* server = ss_->CreateAsyncSocket(SOCK_STREAM);
282
sink.Monitor(server);
283
EXPECT_EQ(0, server->Bind(initial_addr));
284
EXPECT_EQ(server->GetLocalAddress().ipaddr().family(),
285
initial_addr.ipaddr().family());
286
// Attempt connect to non-listening socket
287
// Connect() itself returns 0; the failure only shows up after processing.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
289
ss_->ProcessMessagesUntilIdle();
291
// No pending server connections
292
EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
293
EXPECT_TRUE(NULL == server->Accept(&accept_addr));
294
EXPECT_EQ(accept_addr, kEmptyAddr);
297
// The client's connection attempt failed.
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
298
EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
299
EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
300
EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
303
// Covers three ways a connection attempt can be interrupted, as named by the
// inline comments: the server closing before the connection reaches its accept
// queue, the server closing while the connection is queued, and the client
// closing after the server accepted but before the client processed the
// connected message.
// NOTE(review): the statements that perform each close are not visible in this
// listing -- confirm against the original file.
void CloseDuringConnectTest(const SocketAddress& initial_addr) {
304
testing::StreamSink sink;
305
SocketAddress accept_addr;
306
const SocketAddress kEmptyAddr;
308
// Create client and server
309
AsyncSocket* client = ss_->CreateAsyncSocket(SOCK_STREAM);
310
sink.Monitor(client);
311
AsyncSocket* server = ss_->CreateAsyncSocket(SOCK_STREAM);
312
sink.Monitor(server);
315
// Case 1: bind, listen and start connecting.
EXPECT_EQ(0, server->Bind(initial_addr));
316
EXPECT_EQ(server->GetLocalAddress().ipaddr().family(),
317
initial_addr.ipaddr().family());
319
EXPECT_EQ(0, server->Listen(5));
320
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
322
// Server close before socket enters accept queue
323
EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
326
ss_->ProcessMessagesUntilIdle();
328
// Result: connection failed
329
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
330
EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
334
// Case 2: a new server socket for the next scenario.
server = ss_->CreateAsyncSocket(SOCK_STREAM);
335
sink.Monitor(server);
338
EXPECT_EQ(0, server->Bind(initial_addr));
339
EXPECT_EQ(server->GetLocalAddress().ipaddr().family(),
340
initial_addr.ipaddr().family());
342
EXPECT_EQ(0, server->Listen(5));
343
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
345
ss_->ProcessMessagesUntilIdle();
347
// Server close while socket is in accept queue
348
EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
351
ss_->ProcessMessagesUntilIdle();
353
// Result: connection failed
354
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
355
EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
359
// Case 3: a new server socket that will accept the connection.
server = ss_->CreateAsyncSocket(SOCK_STREAM);
360
sink.Monitor(server);
363
EXPECT_EQ(0, server->Bind(initial_addr));
364
EXPECT_EQ(server->GetLocalAddress().ipaddr().family(),
365
initial_addr.ipaddr().family());
367
EXPECT_EQ(0, server->Listen(5));
368
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
370
ss_->ProcessMessagesUntilIdle();
372
// Server accepts connection
373
EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
374
AsyncSocket* accepted = server->Accept(&accept_addr);
375
// ASSERT (not EXPECT): `accepted` is dereferenced below.
ASSERT_TRUE(NULL != accepted);
376
sink.Monitor(accepted);
378
// Client closes before connection completes
379
EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
381
// Connected message has not been processed yet.
382
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
385
ss_->ProcessMessagesUntilIdle();
387
// Result: accepted socket closes
388
EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
389
EXPECT_TRUE(sink.Check(accepted, testing::SSE_CLOSE));
390
EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
393
// Connects two bound stream sockets to each other, sends from `a`, and checks
// the state once the connection is closed: `b` gets no read event and Recv
// returns -1, `a` receives SSE_CLOSE, and both end up CS_CLOSED with an empty
// remote address.
// NOTE(review): the closing statement and the declaration of `buffer` are not
// visible in this listing; the "No signal for Closer" comment suggests `b` is
// the side that closes -- confirm against the original file.
void CloseTest(const SocketAddress& initial_addr) {
394
testing::StreamSink sink;
395
const SocketAddress kEmptyAddr;
398
AsyncSocket* a = ss_->CreateAsyncSocket(SOCK_STREAM);
400
a->Bind(initial_addr);
401
EXPECT_EQ(a->GetLocalAddress().ipaddr().family(),
402
initial_addr.ipaddr().family());
405
AsyncSocket* b = ss_->CreateAsyncSocket(SOCK_STREAM);
407
b->Bind(initial_addr);
408
EXPECT_EQ(b->GetLocalAddress().ipaddr().family(),
409
initial_addr.ipaddr().family());
411
// Each side connects to the other; neither one listens.
EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
412
EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
414
ss_->ProcessMessagesUntilIdle();
416
EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
417
EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
418
EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
420
EXPECT_TRUE(sink.Check(b, testing::SSE_OPEN));
421
EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
422
EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
424
EXPECT_EQ(1, a->Send("a", 1));
426
EXPECT_EQ(1, a->Send("b", 1));
428
ss_->ProcessMessagesUntilIdle();
431
// The data sent by `a` is not delivered to `b`.
EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
432
EXPECT_EQ(-1, b->Recv(buffer, 10));
434
EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
435
EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
436
EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
438
EXPECT_FALSE(sink.Check(b, testing::SSE_CLOSE)); // No signal for Closer
439
EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
440
EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
443
// Checks stream flow control with 2000-byte send and receive buffers: pushes
// 5000 bytes from `a` to `b` in stages, verifying how much each Send/Recv
// accepts, when a socket reports blocking, and which SSE_WRITE / SSE_READ
// events fire; finally the received bytes must equal the sent bytes.
// NOTE(review): the statements that advance send_pos / recv_pos, and some
// other lines, are not visible in this listing -- confirm against the original
// file.
void TcpSendTest(const SocketAddress& initial_addr) {
444
testing::StreamSink sink;
445
const SocketAddress kEmptyAddr;
447
// Connect two sockets
448
AsyncSocket* a = ss_->CreateAsyncSocket(SOCK_STREAM);
450
a->Bind(initial_addr);
451
EXPECT_EQ(a->GetLocalAddress().ipaddr().family(),
452
initial_addr.ipaddr().family());
454
AsyncSocket* b = ss_->CreateAsyncSocket(SOCK_STREAM);
456
b->Bind(initial_addr);
457
EXPECT_EQ(b->GetLocalAddress().ipaddr().family(),
458
initial_addr.ipaddr().family());
460
EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
461
EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
463
ss_->ProcessMessagesUntilIdle();
465
// Buffers are smaller than the payload so the transfer needs several steps.
const size_t kBufferSize = 2000;
466
ss_->set_send_buffer_capacity(kBufferSize);
467
ss_->set_recv_buffer_capacity(kBufferSize);
469
const size_t kDataSize = 5000;
470
char send_buffer[kDataSize], recv_buffer[kDataSize];
471
for (size_t i = 0; i < kDataSize; ++i)
472
send_buffer[i] = static_cast<char>(i % 256);
473
memset(recv_buffer, 0, sizeof(recv_buffer));
474
size_t send_pos = 0, recv_pos = 0;
476
// Can't send more than send buffer in one write
477
int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
478
EXPECT_EQ(static_cast<int>(kBufferSize), result);
481
ss_->ProcessMessagesUntilIdle();
482
EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
483
EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
485
// Receive buffer is already filled, fill send buffer again
486
result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
487
EXPECT_EQ(static_cast<int>(kBufferSize), result);
490
ss_->ProcessMessagesUntilIdle();
491
EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
492
EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
494
// No more room in send or receive buffer
495
result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
496
EXPECT_EQ(-1, result);
497
EXPECT_TRUE(a->IsBlocking());
499
// Read a subset of the data
500
result = b->Recv(recv_buffer + recv_pos, 500);
501
EXPECT_EQ(500, result);
504
ss_->ProcessMessagesUntilIdle();
505
EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
506
EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
508
// Room for more on the sending side
509
result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
510
EXPECT_EQ(500, result);
513
// Empty the recv buffer
515
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
517
EXPECT_EQ(-1, result);
518
EXPECT_TRUE(b->IsBlocking());
524
ss_->ProcessMessagesUntilIdle();
525
EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
527
// Continue to empty the recv buffer
529
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
531
EXPECT_EQ(-1, result);
532
EXPECT_TRUE(b->IsBlocking());
538
// Send last of the data
539
result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
540
EXPECT_EQ(500, result);
543
ss_->ProcessMessagesUntilIdle();
544
EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
546
// Receive the last of the data
548
result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
550
EXPECT_EQ(-1, result);
551
EXPECT_TRUE(b->IsBlocking());
557
ss_->ProcessMessagesUntilIdle();
558
EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
560
// The received data matches the sent data
561
EXPECT_EQ(kDataSize, send_pos);
562
EXPECT_EQ(kDataSize, recv_pos);
563
EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
566
// Verifies that a stream connection delivers one-byte writes in the order they
// were sent, first with no configured delay and then with a randomized
// delivery delay (mean 50, stddev 50). The receiver expects '0'+i in the
// first round and 'A'+i in the second.
// NOTE(review): the statements that fill `buffer` before each Send, and the
// loop closing braces, are not visible in this listing.
void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
567
const SocketAddress kEmptyAddr;
569
// Connect two sockets
570
AsyncSocket* a = ss_->CreateAsyncSocket(SOCK_STREAM);
571
AsyncSocket* b = ss_->CreateAsyncSocket(SOCK_STREAM);
572
a->Bind(initial_addr);
573
EXPECT_EQ(a->GetLocalAddress().ipaddr().family(),
574
initial_addr.ipaddr().family());
576
b->Bind(initial_addr);
577
EXPECT_EQ(b->GetLocalAddress().ipaddr().family(),
578
initial_addr.ipaddr().family());
580
EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
581
EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
582
ss_->ProcessMessagesUntilIdle();
584
// First, deliver all packets in 0 ms.
585
char buffer[2] = { 0, 0 };
586
const char cNumPackets = 10;
587
for (char i = 0; i < cNumPackets; ++i) {
589
EXPECT_EQ(1, a->Send(buffer, 1));
592
ss_->ProcessMessagesUntilIdle();
594
// Each Recv offers room for two bytes but must return exactly one.
for (char i = 0; i < cNumPackets; ++i) {
595
EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
596
EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
599
// Next, deliver packets at random intervals
600
const uint32 mean = 50;
601
const uint32 stddev = 50;
603
ss_->set_delay_mean(mean);
604
ss_->set_delay_stddev(stddev);
605
ss_->UpdateDelayDistribution();
607
for (char i = 0; i < cNumPackets; ++i) {
609
EXPECT_EQ(1, a->Send(buffer, 1));
612
ss_->ProcessMessagesUntilIdle();
614
for (char i = 0; i < cNumPackets; ++i) {
615
EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
616
EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
620
// Limits the virtual network to 64 KB/s, drives it with a Sender running at
// 80 KB/s, and checks that the Receiver's total `count` lies between 75% of
// five seconds' worth of bandwidth and six seconds' worth. The bandwidth
// limit is reset to 0 at the end.
// NOTE(review): the statements between the two ProcessMessages calls are not
// visible in this listing.
void BandwidthTest(const SocketAddress& initial_addr) {
621
AsyncSocket* send_socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
622
AsyncSocket* recv_socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
623
ASSERT_EQ(0, send_socket->Bind(initial_addr));
624
ASSERT_EQ(0, recv_socket->Bind(initial_addr));
625
EXPECT_EQ(send_socket->GetLocalAddress().ipaddr().family(),
626
initial_addr.ipaddr().family());
627
EXPECT_EQ(recv_socket->GetLocalAddress().ipaddr().family(),
628
initial_addr.ipaddr().family());
629
ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
631
uint32 bandwidth = 64 * 1024;
632
ss_->set_bandwidth(bandwidth);
634
// The sender is configured faster than the link so the limit is what gets
// measured.
Thread* pthMain = Thread::Current();
635
Sender sender(pthMain, send_socket, 80 * 1024);
636
Receiver receiver(pthMain, recv_socket, bandwidth);
638
pthMain->ProcessMessages(5000);
640
pthMain->ProcessMessages(5000);
642
ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
643
ASSERT_TRUE(receiver.count <= 6 * bandwidth); // queue could drain for 1s
645
ss_->set_bandwidth(0);
648
// Configures a delivery delay with mean 2000 and stddev 500, runs a Sender and
// Receiver for 10000 (presumably milliseconds -- TODO confirm), and checks
// that the mean and standard deviation of the observed delays are within 15%
// of the configured values. The delay settings are reset to 0 at the end.
void DelayTest(const SocketAddress& initial_addr) {
649
// Seed rand() from the clock and log the seed.
time_t seed = ::time(NULL);
650
LOG(LS_VERBOSE) << "seed = " << seed;
651
srand(static_cast<unsigned int>(seed));
653
const uint32 mean = 2000;
654
const uint32 stddev = 500;
656
ss_->set_delay_mean(mean);
657
ss_->set_delay_stddev(stddev);
658
ss_->UpdateDelayDistribution();
660
AsyncSocket* send_socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
661
AsyncSocket* recv_socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
662
ASSERT_EQ(0, send_socket->Bind(initial_addr));
663
ASSERT_EQ(0, recv_socket->Bind(initial_addr));
664
EXPECT_EQ(send_socket->GetLocalAddress().ipaddr().family(),
665
initial_addr.ipaddr().family());
666
EXPECT_EQ(recv_socket->GetLocalAddress().ipaddr().family(),
667
initial_addr.ipaddr().family());
668
ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
670
Thread* pthMain = Thread::Current();
671
// Avg packet size is 2K, so at 200KB/s for 10s, we should see about
672
// 1000 packets, which is necessary to get a good distribution.
673
Sender sender(pthMain, send_socket, 100 * 2 * 1024);
674
Receiver receiver(pthMain, recv_socket, 0);
676
pthMain->ProcessMessages(10000);
677
// Flag both helpers as done, then let queued messages finish.
sender.done = receiver.done = true;
678
ss_->ProcessMessagesUntilIdle();
680
const double sample_mean = receiver.sum / receiver.samples;
682
// NOTE(review): the line that begins this statement is not visible; it
// presumably assigns the expression below to `num`, giving
// sqrt((n * sum_sq - sum^2) / (n * (n - 1))) for the sample deviation.
receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
683
double den = receiver.samples * (receiver.samples - 1);
684
const double sample_stddev = std::sqrt(num / den);
685
LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
687
// Require at least 500 samples.
EXPECT_LE(500u, receiver.samples);
688
// We initially used a 0.1 fudge factor, but on the build machine, we
689
// have seen the value differ by as much as 0.13.
690
EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
691
EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
693
ss_->set_delay_mean(0);
694
ss_->set_delay_stddev(0);
695
ss_->UpdateDelayDistribution();
698
// Test cross-family communication between a client bound to client_addr and a
698
// server bound to server_addr. shouldSucceed indicates if communication is
699
// expected to work or not.
// NOTE(review): the lines that branch on shouldSucceed are not visible in this
// listing. The first group of checks (Connect returns 0, server accepts)
// presumably runs on success and the second group (Connect returns -1) on
// failure -- confirm against the original file.
700
void CrossFamilyConnectionTest(const SocketAddress& client_addr,
701
const SocketAddress& server_addr,
702
bool shouldSucceed) {
703
testing::StreamSink sink;
704
SocketAddress accept_address;
705
const SocketAddress kEmptyAddr;
707
// Client gets an IPv4 address
708
AsyncSocket* client = ss_->CreateAsyncSocket(SOCK_STREAM);
709
sink.Monitor(client);
710
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
711
EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
712
client->Bind(client_addr);
714
// Server gets a non-mapped non-any IPv6 address.
715
// IPv4 sockets should not be able to connect to this.
716
AsyncSocket* server = ss_->CreateAsyncSocket(SOCK_STREAM);
717
sink.Monitor(server);
718
server->Bind(server_addr);
722
// Connection accepted: the server sees a pending connection and the client
// opens with the server as its remote address.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
723
ss_->ProcessMessagesUntilIdle();
724
EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
725
Socket* accepted = server->Accept(&accept_address);
726
EXPECT_TRUE(NULL != accepted);
727
EXPECT_NE(kEmptyAddr, accept_address);
728
ss_->ProcessMessagesUntilIdle();
729
EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
730
EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
732
// Check that the connection failed.
733
EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
734
ss_->ProcessMessagesUntilIdle();
736
EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
737
EXPECT_TRUE(NULL == server->Accept(&accept_address));
738
EXPECT_EQ(accept_address, kEmptyAddr);
739
EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
740
EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
741
EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
746
// Test cross-family datagram sending between a client bound to client_addr
747
// and a server bound to server_addr. shouldSucceed indicates if sending is
748
// expected to succeed or not.
// NOTE(review): the lines that branch on shouldSucceed are not visible in this
// listing. The round-trip checks presumably run on success and the final two
// checks (SendTo returns -1, nothing received) on failure -- confirm against
// the original file.
749
void CrossFamilyDatagramTest(const SocketAddress& client_addr,
750
const SocketAddress& server_addr,
751
bool shouldSucceed) {
752
AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
753
socket->Bind(server_addr);
754
SocketAddress bound_server_addr = socket->GetLocalAddress();
755
TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
757
AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
758
socket2->Bind(client_addr);
759
TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
760
SocketAddress client2_addr;
763
// Round trip: client2 -> server, then server -> client2; the reply must
// appear to come from the server's bound address.
EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
764
EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
765
SocketAddress client1_addr;
766
EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
767
EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
768
EXPECT_EQ(client1_addr, bound_server_addr);
770
// Sending is rejected and nothing reaches the server.
EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
771
EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
776
// Installs the virtual socket server on the current thread before each test.
virtual void SetUp() {
777
Thread::Current()->set_socketserver(ss_);
779
// Detaches the socket server from the current thread after each test.
virtual void TearDown() {
780
Thread::Current()->set_socketserver(NULL);
783
// Server under test; allocated with `new` in the fixture constructor.
VirtualSocketServer* ss_;
784
// Wildcard addresses with port 0: INADDR_ANY for IPv4.
const SocketAddress kIPv4AnyAddress;
785
// ... and in6addr_any for IPv6.
const SocketAddress kIPv6AnyAddress;
788
// Runs BasicTest with the IPv4 wildcard address on port 5000.
TEST_F(VirtualSocketServerTest, basic_v4) {
789
SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
790
BasicTest(ipv4_test_addr);
793
// Runs BasicTest with the IPv6 wildcard address on port 5000.
TEST_F(VirtualSocketServerTest, basic_v6) {
794
SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
795
BasicTest(ipv6_test_addr);
798
// Runs ConnectTest with the fixture's IPv4 wildcard address.
TEST_F(VirtualSocketServerTest, connect_v4) {
799
ConnectTest(kIPv4AnyAddress);
802
// Runs ConnectTest with the fixture's IPv6 wildcard address.
TEST_F(VirtualSocketServerTest, connect_v6) {
803
ConnectTest(kIPv6AnyAddress);
806
// Runs ConnectToNonListenerTest with the fixture's IPv4 wildcard address.
TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
807
ConnectToNonListenerTest(kIPv4AnyAddress);
810
// Runs ConnectToNonListenerTest with the fixture's IPv6 wildcard address.
TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
811
ConnectToNonListenerTest(kIPv6AnyAddress);
814
// Runs CloseDuringConnectTest with the fixture's IPv4 wildcard address.
TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
815
CloseDuringConnectTest(kIPv4AnyAddress);
818
// Runs CloseDuringConnectTest with the fixture's IPv6 wildcard address.
TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
819
CloseDuringConnectTest(kIPv6AnyAddress);
822
// Runs CloseTest with the fixture's IPv4 wildcard address.
TEST_F(VirtualSocketServerTest, close_v4) {
823
CloseTest(kIPv4AnyAddress);
826
// Runs CloseTest with the fixture's IPv6 wildcard address.
TEST_F(VirtualSocketServerTest, close_v6) {
827
CloseTest(kIPv6AnyAddress);
830
// Runs TcpSendTest with the fixture's IPv4 wildcard address.
TEST_F(VirtualSocketServerTest, tcp_send_v4) {
831
TcpSendTest(kIPv4AnyAddress);
834
// Runs TcpSendTest with the fixture's IPv6 wildcard address.
TEST_F(VirtualSocketServerTest, tcp_send_v6) {
835
TcpSendTest(kIPv6AnyAddress);
838
// Runs TcpSendsPacketsInOrderTest with the fixture's IPv4 wildcard address.
TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
839
TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
842
// Runs TcpSendsPacketsInOrderTest with the fixture's IPv6 wildcard address.
TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
843
TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
846
// Runs BandwidthTest with the IPv4 wildcard address on port 1000.
TEST_F(VirtualSocketServerTest, bandwidth_v4) {
847
SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
848
BandwidthTest(ipv4_test_addr);
851
// Runs BandwidthTest with the IPv6 wildcard address on port 1000.
TEST_F(VirtualSocketServerTest, bandwidth_v6) {
852
SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
853
BandwidthTest(ipv6_test_addr);
856
// Runs DelayTest with the IPv4 wildcard address on port 1000.
TEST_F(VirtualSocketServerTest, delay_v4) {
857
SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
858
DelayTest(ipv4_test_addr);
861
// Runs DelayTest with the IPv6 wildcard address on port 1000.
TEST_F(VirtualSocketServerTest, delay_v6) {
862
SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
863
DelayTest(ipv6_test_addr);
866
// Works, receiving socket sees 127.0.0.2.
// Stream connection from a v4-mapped IPv6 client to an IPv4-any server.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
867
TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
868
CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
869
SocketAddress("0.0.0.0", 5000),
874
// Stream connection from an unmapped IPv6 client (::2) to an IPv4-any server;
// the test name indicates failure is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
875
CrossFamilyConnectionTest(SocketAddress("::2", 0),
876
SocketAddress("0.0.0.0", 5000),
881
// Stream connection from an unmapped IPv6 client (::2) to a v4-mapped IPv6
// server; the test name indicates failure is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
882
CrossFamilyConnectionTest(SocketAddress("::2", 0),
883
SocketAddress("::ffff:127.0.0.1", 5000),
887
// Works. Receiving socket sees ::ffff:127.0.0.2.
// Stream connection from an IPv4 client to an IPv6-any server.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
888
TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
889
CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
890
SocketAddress("::", 5000),
895
// Stream connection from an IPv4 client to an unmapped IPv6 server (::1); the
// test name indicates failure is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
896
CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
897
SocketAddress("::1", 5000),
901
// Works. Receiving socket sees ::ffff:127.0.0.1.
// Stream connection from an IPv4 client to a v4-mapped IPv6 server.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
902
TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
903
CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
904
SocketAddress("::ffff:127.0.0.2", 5000),
908
// Works, receiving socket sees a result from GetNextIP.
// Stream connection from a client bound to IPv6-any to an IPv4-any server.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
909
TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
910
CrossFamilyConnectionTest(SocketAddress("::", 0),
911
SocketAddress("0.0.0.0", 5000),
915
// Works, receiving socket sees whatever GetNextIP gave the client.
// Stream connection from a default-constructed client address to an IPv6-any
// server.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
916
TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
917
CrossFamilyConnectionTest(SocketAddress(),
918
SocketAddress("::", 5000),
922
// Datagram exchange from a default-constructed client address to an IPv6-any
// server; the test name indicates success is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
923
CrossFamilyDatagramTest(SocketAddress(),
924
SocketAddress("::", 5000),
928
// Datagram exchange from a v4-mapped IPv6 client to an IPv4-any server; the
// test name indicates success is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
929
CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
930
SocketAddress("0.0.0.0", 5000),
934
// Datagram exchange from an unmapped IPv6 client (::2) to an IPv4-any server;
// the test name indicates failure is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
935
CrossFamilyDatagramTest(SocketAddress("::2", 0),
936
SocketAddress("0.0.0.0", 5000),
940
// Datagram exchange from an unmapped IPv6 client (::2) to a v4-mapped IPv6
// server; the test name indicates failure is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
941
CrossFamilyDatagramTest(SocketAddress("::2", 0),
942
SocketAddress("::ffff:127.0.0.1", 5000),
946
// Datagram exchange from an IPv4 client to an IPv6-any server; the test name
// indicates success is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
947
CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
948
SocketAddress("::", 5000),
952
// Datagram exchange from an IPv4 client to an unmapped IPv6 server (::1); the
// test name indicates failure is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
953
CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
954
SocketAddress("::1", 5000),
958
// Datagram exchange from an IPv4 client to a v4-mapped IPv6 server; the test
// name indicates success is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
959
CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
960
SocketAddress("::ffff:127.0.0.2", 5000),
964
// Datagram exchange from a client bound to IPv6-any to an IPv4-any server; the
// test name indicates success is expected.
// NOTE(review): the final shouldSucceed argument is not visible in this
// listing.
TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
965
CrossFamilyDatagramTest(SocketAddress("::", 0),
966
SocketAddress("0.0.0.0", 5000),
970
// For every combination of mean, relative deviation and sample count, builds
// a distribution with VirtualSocketServer::CreateDistribution and checks that
// the mean and standard deviation of the `.second` values of its entries are
// within 10% of the requested mean and deviation.
// NOTE(review): some lines (the remaining CreateDistribution arguments, the
// declaration of `sum`, closing braces and anything after the last check) are
// not visible in this listing -- confirm against the original file.
TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
971
const uint32 kTestMean[] = { 10, 100, 333, 1000 };
972
// Deviations are expressed as a fraction of the mean (see kStdDev below).
const double kTestDev[] = { 0.25, 0.1, 0.01 };
973
// TODO: The current code only works for 1000 data points or more.
974
const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
975
for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
976
for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
977
for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
978
ASSERT_LT(0u, kTestSamples[sidx]);
979
// Absolute deviation for this combination, truncated to an integer.
const uint32 kStdDev =
980
static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
981
VirtualSocketServer::Function* f =
982
VirtualSocketServer::CreateDistribution(kTestMean[midx],
985
ASSERT_TRUE(NULL != f);
986
ASSERT_EQ(kTestSamples[sidx], f->size());
988
// First pass: accumulate the values to obtain the mean.
for (uint32 i = 0; i < f->size(); ++i) {
989
sum += (*f)[i].second;
991
const double mean = sum / f->size();
992
// Second pass: sum of squared deviations from that mean.
double sum_sq_dev = 0;
993
for (uint32 i = 0; i < f->size(); ++i) {
994
double dev = (*f)[i].second - mean;
995
sum_sq_dev += dev * dev;
997
// Divides by N (not N - 1), i.e. the population standard deviation.
const double stddev = std::sqrt(sum_sq_dev / f->size());
998
EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
999
<< "M=" << kTestMean[midx]
1000
<< " SD=" << kStdDev
1001
<< " N=" << kTestSamples[sidx];
1002
EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
1003
<< "M=" << kTestMean[midx]
1004
<< " SD=" << kStdDev
1005
<< " N=" << kTestSamples[sidx];