2
* This file is part of the Nice GLib ICE library.
4
* (C) 2014 Collabora Ltd.
5
* Contact: Philip Withnall
7
* The contents of this file are subject to the Mozilla Public License Version
8
* 1.1 (the "License"); you may not use this file except in compliance with
9
* the License. You may obtain a copy of the License at
10
* http://www.mozilla.org/MPL/
12
* Software distributed under the License is distributed on an "AS IS" basis,
13
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14
* for the specific language governing rights and limitations under the
17
* The Original Code is the Nice GLib ICE library.
19
* The Initial Developers of the Original Code are Collabora Ltd and Nokia
20
* Corporation. All Rights Reserved.
23
* Philip Withnall, Collabora Ltd.
25
* Alternatively, the contents of this file may be used under the terms of the
26
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
27
* case the provisions of LGPL are applicable instead of those above. If you
28
* wish to allow use of your version of this file only under the terms of the
29
* LGPL and not to allow others to use your version of this file under the
30
* MPL, indicate your decision by deleting the provisions above and replace
31
* them with the notice and other provisions required by the LGPL. If you do
32
* not delete the provisions above, a recipient may use your version of this
33
* file under either the MPL or the LGPL.
37
* This is a comprehensive unit test for send() and recv() behaviour in libnice,
38
* covering all APIs except the old nice_agent_attach_recv() one. It aims to
39
* test the correctness of reliable and non-reliable I/O through libnice, using
40
* a variety of data and a variety of buffer sizes.
42
* Abnormal features like error handling, zero-length buffer handling, stream
43
* closure and cancellation are not tested.
45
* This is *not* a performance test, and would require significant work to be
46
* useful as one. It allocates all of its buffers dynamically, and walks over
47
* them frequently to set and check data.
49
* Several of the strategies in the test make use of random numbers. The seed
50
* values for these are deterministically set (in main()), but may be specified
51
* on the command line to allow fuzzing.
59
#include "test-io-stream-common.h"
67
/* Maximum IP payload ((1 << 16) - 1), minus IP header, minus UDP header. */
68
#define MAX_MESSAGE_SIZE (65535 - 20 - 8) /* bytes */
71
STREAM_AGENT, /* nice_agent_[send|recv]() */
72
STREAM_AGENT_NONBLOCKING, /* nice_agent_[send|recv]_nonblocking() */
73
STREAM_GIO, /* Nice[Input|Output]Stream */
74
STREAM_GSOURCE, /* GPollable[Input|Output]Stream */
76
#define STREAM_API_N_ELEMENTS (STREAM_GSOURCE + 1)
79
BUFFER_SIZE_CONSTANT_LARGE, /* always 65535 bytes */
80
BUFFER_SIZE_CONSTANT_SMALL, /* always 4096 bytes */
81
BUFFER_SIZE_CONSTANT_TINY, /* always 1 byte */
82
BUFFER_SIZE_ASCENDING, /* ascending powers of 2 */
83
BUFFER_SIZE_RANDOM, /* random every time */
85
#define BUFFER_SIZE_STRATEGY_N_ELEMENTS (BUFFER_SIZE_RANDOM + 1)
88
BUFFER_COUNT_CONSTANT_ONE, /* always a single buffer */
89
BUFFER_COUNT_CONSTANT_TWO, /* always two buffers */
90
BUFFER_COUNT_RANDOM, /* random every time */
91
} BufferCountStrategy;
92
#define BUFFER_COUNT_STRATEGY_N_ELEMENTS (BUFFER_COUNT_RANDOM + 1)
95
MESSAGE_COUNT_CONSTANT_ONE, /* always a single message */
96
MESSAGE_COUNT_CONSTANT_TWO, /* always two messages */
97
MESSAGE_COUNT_RANDOM, /* random every time */
98
} MessageCountStrategy;
99
#define MESSAGE_COUNT_STRATEGY_N_ELEMENTS (MESSAGE_COUNT_RANDOM + 1)
102
BUFFER_DATA_CONSTANT, /* fill with 0xfe */
103
BUFFER_DATA_ASCENDING, /* ascending values for each byte */
104
BUFFER_DATA_PSEUDO_RANDOM, /* every byte is pseudo-random */
105
} BufferDataStrategy;
106
#define BUFFER_DATA_STRATEGY_N_ELEMENTS (BUFFER_DATA_PSEUDO_RANDOM + 1)
109
/* Test configuration (immutable per test run). */
111
StreamApi stream_api;
113
BufferSizeStrategy buffer_size_strategy;
114
BufferCountStrategy buffer_count_strategy;
115
MessageCountStrategy message_count_strategy;
118
BufferSizeStrategy buffer_size_strategy;
119
BufferCountStrategy buffer_count_strategy;
120
MessageCountStrategy message_count_strategy;
122
BufferDataStrategy buffer_data_strategy;
127
GRand *transmit_size_rand;
128
GRand *receive_size_rand;
129
gsize transmitted_bytes;
130
gsize received_bytes;
131
gsize *other_received_bytes;
132
guint transmitted_messages;
133
guint received_messages;
134
guint *other_received_messages;
137
/* Whether @stream_api is blocking (vs. non-blocking). */
139
stream_api_is_blocking (StreamApi stream_api)
141
switch (stream_api) {
145
case STREAM_AGENT_NONBLOCKING:
149
g_assert_not_reached ();
153
/* Whether @stream_api only works for reliable NiceAgents. */
155
stream_api_is_reliable_only (StreamApi stream_api)
157
switch (stream_api) {
162
case STREAM_AGENT_NONBLOCKING:
165
g_assert_not_reached ();
169
/* Whether @stream_api supports vectored I/O (multiple buffers or messages). */
171
stream_api_supports_vectored_io (StreamApi stream_api)
173
switch (stream_api) {
175
case STREAM_AGENT_NONBLOCKING:
181
g_assert_not_reached ();
185
/* Generate a size for the buffer containing the @buffer_offset-th byte.
186
* Guaranteed to be in the interval [1, 1 << 16). ((1 << 16) is the maximum
189
generate_buffer_size (BufferSizeStrategy strategy, GRand *grand,
193
case BUFFER_SIZE_CONSTANT_LARGE:
194
return (1 << 16) - 1;
196
case BUFFER_SIZE_CONSTANT_SMALL:
199
case BUFFER_SIZE_CONSTANT_TINY:
202
case BUFFER_SIZE_ASCENDING:
203
return CLAMP (1L << buffer_offset, 1, (1 << 16) - 1);
205
case BUFFER_SIZE_RANDOM:
206
return g_rand_int_range (grand, 1, 1 << 16);
209
g_assert_not_reached ();
213
/* Generate a number of buffers to allocate when receiving the @buffer_offset-th
214
* byte. Guaranteed to be in the interval [1, 100], where 100 was chosen
217
generate_buffer_count (BufferCountStrategy strategy, GRand *grand,
221
case BUFFER_COUNT_CONSTANT_ONE:
224
case BUFFER_COUNT_CONSTANT_TWO:
227
case BUFFER_COUNT_RANDOM:
228
return g_rand_int_range (grand, 1, 100 + 1);
231
g_assert_not_reached ();
235
/* Generate a number of messages to allocate and receive into when receiving the
236
* @buffer_offset-th byte. Guaranteed to be in the interval [1, 100], where 100
237
* was chosen arbitrarily.*/
239
generate_message_count (MessageCountStrategy strategy, GRand *grand,
243
case MESSAGE_COUNT_CONSTANT_ONE:
246
case MESSAGE_COUNT_CONSTANT_TWO:
249
case MESSAGE_COUNT_RANDOM:
250
return g_rand_int_range (grand, 1, 100 + 1);
253
g_assert_not_reached ();
257
/* Fill the given @buf with @buf_len bytes of generated data. The data is
258
* deterministically generated, so that:
259
* generate_buffer_data(_, I, buf, 2)
261
* generate_buffer_data(_, I+1, buf+1, 1)
262
* generate the same buf[I+1] byte, for all I.
264
* The generation strategies are generally chosen to produce data which makes
265
* send/receive errors (insertions, swaps, elisions) obvious. */
267
generate_buffer_data (BufferDataStrategy strategy, gsize buffer_offset,
268
guint8 *buf, gsize buf_len)
271
case BUFFER_DATA_CONSTANT:
272
memset (buf, 0xfe, buf_len);
275
case BUFFER_DATA_ASCENDING: {
278
for (i = 0; i < buf_len; i++) {
279
buf[i] = (i + buffer_offset) & 0xff;
285
case BUFFER_DATA_PSEUDO_RANDOM: {
288
/* This can’t use GRand, because then the number of calls to g_rand_*()
289
* methods would affect its output, and the bytes generated here have to be
290
* entirely deterministic on @buffer_offset.
292
* Instead, use something akin to a LCG, except without any feedback
293
* (because that would make it non-deterministic). The objective is to
294
* generate numbers which are sufficiently pseudo-random that it’s likely
295
* transpositions, elisions and insertions will be detected.
297
* The constants come from ‘ANSI C’ in:
298
* http://en.wikipedia.org/wiki/Linear_congruential_generator
300
for (i = 0; i < buf_len; i++) {
301
buf[i] = (1103515245 * (buffer_offset + i) + 12345) & 0xff;
308
g_assert_not_reached ();
312
/* Choose a size and allocate a receive buffer in @buf, ready to receive bytes
313
* starting at @buffer_offset into the stream. Fill the buffer with poison
314
* values to hopefully make incorrect writes/reads more obvious.
316
* @buf must be freed with g_free(). */
318
generate_buffer_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
319
guint8 **buf, gsize *buf_len)
321
TestData *test_data = data->user_data;
323
/* Allocate the buffer. */
324
*buf_len = generate_buffer_size (test_data->receive.buffer_size_strategy,
325
test_data->receive_size_rand, buffer_offset);
326
*buf = g_malloc (*buf_len);
328
/* Fill it with poison to try and detect incorrect writes. */
329
memset (*buf, 0xaa, *buf_len);
332
/* Similar to generate_buffer_to_receive(), but generate an entire message array
333
* with multiple buffers instead.
335
* @max_buffer_size may be used to limit the total size of all the buffers in
336
* all the messages, for example to avoid blocking on receiving data which will
337
* never be sent. This only applies for blocking, reliable stream APIs.
339
* @max_n_messages may be used to limit the number of messages generated, to
340
* avoid blocking on receiving messages which will never be sent. This only
341
* applies for blocking, non-reliable stream APIs.
343
* @messages must be freed with g_free(), as must all of the buffer arrays and
344
* the buffers themselves. */
346
generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
347
NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size,
348
guint max_n_messages)
350
TestData *test_data = data->user_data;
353
/* Allocate the messages. */
355
generate_message_count (test_data->receive.message_count_strategy,
356
test_data->receive_size_rand, buffer_offset);
359
*n_messages = MIN (*n_messages, max_n_messages);
361
*messages = g_malloc_n (*n_messages, sizeof (NiceInputMessage));
363
for (i = 0; i < *n_messages; i++) {
364
NiceInputMessage *message = &((*messages)[i]);
368
generate_buffer_count (test_data->receive.buffer_count_strategy,
369
test_data->receive_size_rand, buffer_offset);
370
message->buffers = g_malloc_n (message->n_buffers, sizeof (GInputVector));
371
message->from = NULL;
374
for (j = 0; j < (guint) message->n_buffers; j++) {
375
GInputVector *buffer = &message->buffers[j];
379
generate_buffer_size (test_data->receive.buffer_size_strategy,
380
test_data->receive_size_rand, buffer_offset);
382
/* Trim the buffer length if it would otherwise cause the API to block. */
383
if (data->reliable) {
384
buf_len = MIN (buf_len, max_buffer_size);
385
max_buffer_size -= buf_len;
388
buffer->size = buf_len;
389
buffer->buffer = g_malloc (buffer->size);
391
/* Fill it with poison to try and detect incorrect writes. */
392
memset (buffer->buffer, 0xaa, buffer->size);
394
/* If we’ve hit the max_buffer_size, adjust the buffer and message counts
396
if (data->reliable && max_buffer_size == 0) {
397
message->n_buffers = j + 1;
405
/* Validate the length and data of a received buffer of length @buf_len, filled
406
* with @len valid bytes. Updates the internal state machine to mark the bytes
407
* as received. This consumes @buf. */
409
validate_received_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
410
guint8 **buf, gsize buf_len, gssize len)
412
TestData *test_data = data->user_data;
413
guint8 *expected_buf;
415
g_assert_cmpint (len, <=, buf_len);
416
g_assert_cmpint (len, >=, 0);
418
if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
419
g_assert_cmpint (len, ==, buf_len);
421
/* Validate the buffer contents.
423
* Note: Buffers can only be validated up to valid_len. The buffer may
424
* have been re-used internally (e.g. by receiving a STUN message, then
425
* overwriting it with a data packet), so we can’t guarantee that the
426
* bytes beyond valid_len have been untouched. */
427
expected_buf = g_malloc (buf_len);
428
memset (expected_buf, 0xaa, buf_len);
429
generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
431
g_assert (memcmp (*buf, expected_buf, len) == 0);
432
g_free (expected_buf);
434
test_data->received_bytes += len;
439
/* Similar to validate_received_buffer(), except it validates a message array
440
* instead of a single buffer. This consumes @messages. */
442
validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
443
NiceInputMessage *messages, guint n_messages, gint n_valid_messages)
445
TestData *test_data = data->user_data;
447
gsize prev_message_len = G_MAXSIZE;
449
g_assert_cmpint (n_valid_messages, <=, n_messages);
450
g_assert_cmpint (n_valid_messages, >=, 0);
452
if (stream_api_is_blocking (test_data->stream_api))
453
g_assert_cmpint (n_valid_messages, ==, n_messages);
455
test_data->received_messages += n_valid_messages;
457
/* Validate the message contents. */
458
for (i = 0; i < (guint) n_valid_messages; i++) {
459
NiceInputMessage *message = &messages[i];
461
gsize total_buf_len = 0;
462
gsize message_len_remaining = message->length;
464
g_assert_cmpint (message->n_buffers, >, 0);
466
for (j = 0; j < (guint) message->n_buffers; j++) {
467
GInputVector *buffer = &message->buffers[j];
470
/* See note above about valid_len. */
471
total_buf_len += buffer->size;
472
valid_len = MIN (message_len_remaining, buffer->size);
474
/* Only validate buffer content for reliable mode, anything could
475
* be received in UDP mode
477
if (test_data->reliable) {
478
guint8 *expected_buf;
480
expected_buf = g_malloc (buffer->size);
481
memset (expected_buf, 0xaa, buffer->size);
482
generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
483
expected_buf, valid_len);
484
g_assert_cmpint (memcmp (buffer->buffer, expected_buf, valid_len), ==,
486
g_free (expected_buf);
487
buffer_offset += valid_len;
488
message_len_remaining -= valid_len;
490
test_data->received_bytes += valid_len;
492
g_free (buffer->buffer);
495
g_assert_cmpuint (message->length, <=, total_buf_len);
496
g_assert_cmpuint (message->length, >=, 0);
498
/* No non-empty messages can follow an empty message. */
499
if (prev_message_len == 0)
500
g_assert_cmpuint (message->length, ==, 0);
501
prev_message_len = message->length;
503
/* If the API was blocking, it should have completely filled the message. */
504
if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
505
g_assert_cmpuint (message->length, ==, total_buf_len);
507
g_assert (message->from == NULL);
509
g_free (message->buffers);
515
/* Determine a size for the next transmit buffer, allocate it, and fill it with
516
* data to be transmitted. */
518
generate_buffer_to_transmit (TestIOStreamThreadData *data, gsize buffer_offset,
519
guint8 **buf, gsize *buf_len)
521
TestData *test_data = data->user_data;
523
/* Allocate the buffer. */
524
*buf_len = generate_buffer_size (test_data->transmit.buffer_size_strategy,
525
test_data->transmit_size_rand, buffer_offset);
526
*buf_len = MIN (*buf_len, test_data->n_bytes - test_data->transmitted_bytes);
527
*buf = g_malloc (*buf_len);
529
/* Fill it with data. */
530
generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
534
/* Similar to generate_buffer_to_transmit(), except that it generates an array
535
* of NiceOutputMessages rather than a single buffer. */
537
generate_messages_to_transmit (TestIOStreamThreadData *data,
538
gsize buffer_offset, NiceOutputMessage **messages, guint *n_messages)
540
TestData *test_data = data->user_data;
542
gsize total_buf_len = 0;
544
/* Determine the number of messages to send. */
546
generate_message_count (test_data->transmit.message_count_strategy,
547
test_data->transmit_size_rand, buffer_offset);
550
test_data->n_messages - test_data->transmitted_messages);
552
*messages = g_malloc_n (*n_messages, sizeof (NiceOutputMessage));
554
for (i = 0; i < *n_messages; i++) {
555
NiceOutputMessage *message = &((*messages)[i]);
557
gsize max_message_size;
558
gsize message_len = 0;
561
generate_buffer_count (test_data->transmit.buffer_count_strategy,
562
test_data->transmit_size_rand, buffer_offset);
563
message->buffers = g_malloc_n (message->n_buffers, sizeof (GOutputVector));
565
/* Limit the overall message size to the smaller of (n_bytes / n_messages)
566
* and MAX_MESSAGE_SIZE, to ensure each message is non-empty. */
568
MIN ((test_data->n_bytes / test_data->n_messages), MAX_MESSAGE_SIZE);
570
for (j = 0; j < (guint) message->n_buffers; j++) {
571
GOutputVector *buffer = &message->buffers[j];
576
generate_buffer_size (test_data->transmit.buffer_size_strategy,
577
test_data->transmit_size_rand, buffer_offset);
580
test_data->n_bytes - test_data->transmitted_bytes - total_buf_len);
581
buf_len = MIN (buf_len, max_message_size - message_len);
583
buffer->size = buf_len;
584
buf = g_malloc (buffer->size);
585
buffer->buffer = buf;
586
message_len += buf_len;
587
total_buf_len += buf_len;
589
/* Fill it with data. */
590
generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
593
buffer_offset += buf_len;
595
/* Reached the maximum UDP payload size? */
596
if (message_len >= max_message_size) {
597
message->n_buffers = j + 1;
602
g_assert_cmpuint (message_len, <=, max_message_size);
606
/* Validate the number of bytes transmitted, and update the test’s internal
607
* state machine. Consumes @buf. */
609
notify_transmitted_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
610
guint8 **buf, gsize buf_len, gssize len)
612
TestData *test_data = data->user_data;
614
g_assert_cmpint (len, <=, buf_len);
615
g_assert_cmpint (len, >=, 0);
617
test_data->transmitted_bytes += len;
623
output_message_get_size (const NiceOutputMessage *message)
626
gsize message_len = 0;
628
/* Find the total size of the message */
630
(message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
631
(message->n_buffers < 0 && message->buffers[i].buffer != NULL);
633
message_len += message->buffers[i].size;
638
/* Similar to notify_transmitted_buffer(), except it operates on an array of
639
* messages from generate_messages_to_transmit(). */
641
notify_transmitted_messages (TestIOStreamThreadData *data, gsize buffer_offset,
642
NiceOutputMessage **messages, guint n_messages, gint n_sent_messages)
644
TestData *test_data = data->user_data;
647
g_assert_cmpint (n_sent_messages, <=, n_messages);
648
g_assert_cmpint (n_sent_messages, >=, 0);
650
test_data->transmitted_messages += n_sent_messages;
652
for (i = 0; i < n_messages; i++) {
653
NiceOutputMessage *message = &((*messages)[i]);
656
if (i < (guint) n_sent_messages)
657
test_data->transmitted_bytes += output_message_get_size (message);
659
for (j = 0; j < (guint) message->n_buffers; j++) {
660
GOutputVector *buffer = &message->buffers[j];
662
g_free ((guint8 *) buffer->buffer);
665
g_free (message->buffers);
672
* Implementation using nice_agent_recv_messages() and nice_agent_send().
675
read_thread_agent_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
677
TestData *test_data = data->user_data;
678
guint stream_id, component_id;
681
tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
682
stream_id = GPOINTER_TO_UINT (tmp);
685
while (test_data->received_bytes < test_data->n_bytes) {
686
GError *error = NULL;
687
NiceInputMessage *messages;
689
gint n_valid_messages;
691
/* Initialise an array of messages to receive into. */
692
generate_messages_to_receive (data, test_data->received_bytes, &messages,
693
&n_messages, test_data->n_bytes - test_data->received_bytes,
694
test_data->n_messages - test_data->received_messages);
696
/* Block on receiving some data. */
697
n_valid_messages = nice_agent_recv_messages (data->agent, stream_id,
698
component_id, messages, n_messages, NULL, &error);
699
g_assert_no_error (error);
701
/* Check the messages and update the test’s state machine. */
702
validate_received_messages (data, test_data->received_bytes, messages,
703
n_messages, n_valid_messages);
706
check_for_termination (data, &test_data->received_bytes,
707
test_data->other_received_bytes, &test_data->transmitted_bytes,
712
write_thread_agent_cb (GOutputStream *output_stream,
713
TestIOStreamThreadData *data)
715
TestData *test_data = data->user_data;
716
guint stream_id, component_id;
719
tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
720
stream_id = GPOINTER_TO_UINT (tmp);
723
while (test_data->transmitted_bytes < test_data->n_bytes) {
724
GError *error = NULL;
725
NiceOutputMessage *messages;
727
gint n_sent_messages;
729
/* Generate a buffer to transmit. */
730
generate_messages_to_transmit (data, test_data->transmitted_bytes,
731
&messages, &n_messages);
733
/* Busy loop on receiving some data. */
735
g_clear_error (&error);
736
n_sent_messages = nice_agent_send_messages_nonblocking (data->agent,
737
stream_id, component_id, messages, n_messages, NULL, &error);
738
} while (n_sent_messages == -1 &&
739
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
740
g_assert_no_error (error);
742
/* Update the test’s buffer generation state machine. */
743
notify_transmitted_messages (data, test_data->transmitted_bytes, &messages,
744
n_messages, n_sent_messages);
749
* Implementation using nice_agent_recv_nonblocking() and
750
* nice_agent_send_nonblocking().
753
read_thread_agent_nonblocking_cb (GInputStream *input_stream,
754
TestIOStreamThreadData *data)
756
TestData *test_data = data->user_data;
757
guint stream_id, component_id;
760
tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
761
stream_id = GPOINTER_TO_UINT (tmp);
764
while (test_data->received_bytes < test_data->n_bytes) {
765
GError *error = NULL;
766
NiceInputMessage *messages;
768
gint n_valid_messages;
770
/* Initialise an array of messages to receive into. */
771
generate_messages_to_receive (data, test_data->received_bytes, &messages,
772
&n_messages, test_data->n_bytes - test_data->received_bytes,
773
test_data->n_messages - test_data->received_messages);
775
/* Trim n_messages to avoid consuming the ‘done’ message. */
777
MIN (n_messages, test_data->n_messages - test_data->received_messages);
779
/* Busy loop on receiving some data. */
781
g_clear_error (&error);
782
n_valid_messages = nice_agent_recv_messages_nonblocking (data->agent,
783
stream_id, component_id, messages, n_messages, NULL, &error);
784
} while (n_valid_messages == -1 &&
785
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
786
g_assert_no_error (error);
788
/* Check the messages and update the test’s state machine. */
789
validate_received_messages (data, test_data->received_bytes, messages,
790
n_messages, n_valid_messages);
793
check_for_termination (data, &test_data->received_bytes,
794
test_data->other_received_bytes, &test_data->transmitted_bytes,
799
write_thread_agent_nonblocking_cb (GOutputStream *output_stream,
800
TestIOStreamThreadData *data)
802
/* FIXME: There is no nice_agent_send_nonblocking(); nice_agent_send() is
803
* non-blocking by default. */
804
write_thread_agent_cb (output_stream, data);
808
* Implementation using NiceInputStream and NiceOutputStream.
811
read_thread_gio_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
813
TestData *test_data = data->user_data;
815
while (test_data->received_bytes < test_data->n_bytes) {
816
GError *error = NULL;
821
/* Initialise a receive buffer. */
822
generate_buffer_to_receive (data, test_data->received_bytes, &buf,
825
/* Trim the receive buffer to avoid blocking on bytes which will never
827
buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
829
/* Block on receiving some data. */
830
len = g_input_stream_read (input_stream, buf, buf_len, NULL, &error);
831
g_assert_no_error (error);
833
/* Check the buffer and update the test’s state machine. */
834
validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
838
check_for_termination (data, &test_data->received_bytes,
839
test_data->other_received_bytes, &test_data->transmitted_bytes,
844
write_thread_gio_cb (GOutputStream *output_stream, TestIOStreamThreadData *data)
846
TestData *test_data = data->user_data;
848
while (test_data->transmitted_bytes < test_data->n_bytes) {
849
GError *error = NULL;
855
/* Generate a buffer to transmit. */
856
generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf,
861
len = g_output_stream_write (output_stream, buf + total_len,
862
buf_len - total_len, NULL, &error);
863
g_assert_no_error (error);
865
} while (total_len < buf_len);
867
/* Update the test’s buffer generation state machine. */
868
notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf,
874
* Implementation using GPollableInputStream and GPollableOutputStream.
876
* GSourceData is effectively the closure for the ‘for’ loop in other stream API
880
TestIOStreamThreadData *data;
881
GMainLoop *main_loop;
885
read_stream_cb (GObject *pollable_stream, gpointer _user_data)
887
GSourceData *gsource_data = _user_data;
888
TestIOStreamThreadData *data = gsource_data->data;
889
TestData *test_data = data->user_data;
890
GError *error = NULL;
895
/* Initialise a receive buffer. */
896
generate_buffer_to_receive (data, test_data->received_bytes, &buf, &buf_len);
898
/* Trim the receive buffer to avoid consuming the ‘done’ message. */
899
buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
901
/* Try to receive some data. */
902
len = g_pollable_input_stream_read_nonblocking (
903
G_POLLABLE_INPUT_STREAM (pollable_stream), buf, buf_len, NULL, &error);
906
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
911
g_assert_no_error (error);
913
/* Check the buffer and update the test’s state machine. */
914
validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
917
/* Termination time? */
918
if (test_data->received_bytes == test_data->n_bytes) {
919
g_main_loop_quit (gsource_data->main_loop);
927
read_thread_gsource_cb (GInputStream *input_stream,
928
TestIOStreamThreadData *data)
930
TestData *test_data = data->user_data;
931
GSourceData gsource_data;
932
GMainContext *main_context;
933
GMainLoop *main_loop;
934
GSource *stream_source;
936
main_context = g_main_context_ref_thread_default ();
937
main_loop = g_main_loop_new (main_context, FALSE);
939
gsource_data.data = data;
940
gsource_data.main_loop = main_loop;
943
g_pollable_input_stream_create_source (
944
G_POLLABLE_INPUT_STREAM (input_stream), NULL);
946
g_source_set_callback (stream_source, (GSourceFunc) read_stream_cb,
947
&gsource_data, NULL);
948
g_source_attach (stream_source, main_context);
950
/* Run the main loop. */
951
g_main_loop_run (main_loop);
953
g_source_destroy (stream_source);
954
g_source_unref (stream_source);
955
g_main_loop_unref (main_loop);
956
g_main_context_unref (main_context);
959
check_for_termination (data, &test_data->received_bytes,
960
test_data->other_received_bytes, &test_data->transmitted_bytes,
965
write_stream_cb (GObject *pollable_stream, gpointer _user_data)
967
GSourceData *gsource_data = _user_data;
968
TestIOStreamThreadData *data = gsource_data->data;
969
TestData *test_data = data->user_data;
970
GError *error = NULL;
975
/* Initialise a receive buffer. */
976
generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf,
979
/* Try to transmit some data. */
980
len = g_pollable_output_stream_write_nonblocking (
981
G_POLLABLE_OUTPUT_STREAM (pollable_stream), buf, buf_len, NULL, &error);
984
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
989
g_assert_no_error (error);
991
/* Update the test’s buffer generation state machine. */
992
notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf, buf_len,
995
/* Termination time? */
996
if (test_data->transmitted_bytes == test_data->n_bytes) {
997
g_main_loop_quit (gsource_data->main_loop);
1005
write_thread_gsource_cb (GOutputStream *output_stream,
1006
TestIOStreamThreadData *data)
1008
GSourceData gsource_data;
1009
GMainContext *main_context;
1010
GMainLoop *main_loop;
1011
GSource *stream_source;
1013
main_context = g_main_context_ref_thread_default ();
1014
main_loop = g_main_loop_new (main_context, FALSE);
1016
gsource_data.data = data;
1017
gsource_data.main_loop = main_loop;
1020
g_pollable_output_stream_create_source (
1021
G_POLLABLE_OUTPUT_STREAM (output_stream), NULL);
1023
g_source_set_callback (stream_source, (GSourceFunc) write_stream_cb,
1024
&gsource_data, NULL);
1025
g_source_attach (stream_source, main_context);
1027
/* Run the main loop. */
1028
g_main_loop_run (main_loop);
1030
g_source_destroy (stream_source);
1031
g_source_unref (stream_source);
1032
g_main_loop_unref (main_loop);
1033
g_main_context_unref (main_context);
1037
test_data_init (TestData *data, gboolean reliable, StreamApi stream_api,
1038
gsize n_bytes, guint n_messages,
1039
BufferSizeStrategy transmit_buffer_size_strategy,
1040
BufferCountStrategy transmit_buffer_count_strategy,
1041
MessageCountStrategy transmit_message_count_strategy,
1042
BufferSizeStrategy receive_buffer_size_strategy,
1043
BufferCountStrategy receive_buffer_count_strategy,
1044
MessageCountStrategy receive_message_count_strategy,
1045
BufferDataStrategy buffer_data_strategy, guint32 transmit_seed,
1046
guint32 receive_seed, gsize *other_received_bytes,
1047
guint *other_received_messages)
1049
data->reliable = reliable;
1050
data->stream_api = stream_api;
1051
data->n_bytes = n_bytes;
1052
data->n_messages = n_messages;
1053
data->transmit.buffer_size_strategy = transmit_buffer_size_strategy;
1054
data->transmit.buffer_count_strategy = transmit_buffer_count_strategy;
1055
data->transmit.message_count_strategy = transmit_message_count_strategy;
1056
data->receive.buffer_size_strategy = receive_buffer_size_strategy;
1057
data->receive.buffer_count_strategy = receive_buffer_count_strategy;
1058
data->receive.message_count_strategy = receive_message_count_strategy;
1059
data->buffer_data_strategy = buffer_data_strategy;
1060
data->transmit_size_rand = g_rand_new_with_seed (transmit_seed);
1061
data->receive_size_rand = g_rand_new_with_seed (receive_seed);
1062
data->transmitted_bytes = 0;
1063
data->received_bytes = 0;
1064
data->other_received_bytes = other_received_bytes;
1065
data->transmitted_messages = 0;
1066
data->received_messages = 0;
1067
data->other_received_messages = other_received_messages;
1074
test_data_clear (TestData *data)
1076
g_rand_free (data->receive_size_rand);
1077
g_rand_free (data->transmit_size_rand);
1081
test (gboolean reliable, StreamApi stream_api, gsize n_bytes, guint n_messages,
1082
BufferSizeStrategy transmit_buffer_size_strategy,
1083
BufferCountStrategy transmit_buffer_count_strategy,
1084
MessageCountStrategy transmit_message_count_strategy,
1085
BufferSizeStrategy receive_buffer_size_strategy,
1086
BufferCountStrategy receive_buffer_count_strategy,
1087
MessageCountStrategy receive_message_count_strategy,
1088
BufferDataStrategy buffer_data_strategy,
1089
guint32 transmit_seed, guint32 receive_seed,
1090
guint deadlock_timeout)
1092
TestData l_data, r_data;
1094
/* Indexed by StreamApi. */
1095
const TestIOStreamCallbacks callbacks[] = {
1096
{ read_thread_agent_cb,
1097
write_thread_agent_cb, NULL, NULL, }, /* STREAM_AGENT */
1098
{ read_thread_agent_nonblocking_cb, write_thread_agent_nonblocking_cb,
1099
NULL, NULL, }, /* STREAM_AGENT_NONBLOCKING */
1100
{ read_thread_gio_cb, write_thread_gio_cb, NULL, NULL, }, /* STREAM_GIO */
1101
{ read_thread_gsource_cb, write_thread_gsource_cb,
1102
NULL, NULL }, /* STREAM_GSOURCE */
1105
test_data_init (&l_data, reliable, stream_api, n_bytes, n_messages,
1106
transmit_buffer_size_strategy, transmit_buffer_count_strategy,
1107
transmit_message_count_strategy, receive_buffer_size_strategy,
1108
receive_buffer_count_strategy, receive_message_count_strategy,
1109
buffer_data_strategy, transmit_seed, receive_seed,
1110
&r_data.received_bytes, &r_data.received_messages);
1111
test_data_init (&r_data, reliable, stream_api, n_bytes, n_messages,
1112
transmit_buffer_size_strategy, transmit_buffer_count_strategy,
1113
transmit_message_count_strategy, receive_buffer_size_strategy,
1114
receive_buffer_count_strategy, receive_message_count_strategy,
1115
buffer_data_strategy, transmit_seed, receive_seed,
1116
&l_data.received_bytes, &l_data.received_messages);
1118
run_io_stream_test (deadlock_timeout, reliable, &callbacks[stream_api],
1119
&l_data, NULL, &r_data, NULL);
1121
test_data_clear (&r_data);
1122
test_data_clear (&l_data);
1125
/* Options with default values. */
1126
guint32 option_transmit_seed = 0;
1127
guint32 option_receive_seed = 0;
1128
gsize option_n_bytes = 10000;
1129
guint option_n_messages = 50;
1130
guint option_timeout = 1200; /* seconds */
1131
gboolean option_long_mode = FALSE;
1133
static GOptionEntry entries[] = {
1134
{ "transmit-seed", 0, 0, G_OPTION_ARG_INT, &option_transmit_seed,
1135
"Seed for transmission RNG", "S" },
1136
{ "receive-seed", 0, 0, G_OPTION_ARG_INT, &option_receive_seed,
1137
"Seed for reception RNG", "S" },
1138
{ "n-bytes", 'n', 0, G_OPTION_ARG_INT64, &option_n_bytes,
1139
"Number of bytes to send in each test (default 10000)", "N" },
1140
{ "n-messages", 'm', 0, G_OPTION_ARG_INT64, &option_n_messages,
1141
"Number of messages to send in each test (default 50)", "M" },
1142
{ "timeout", 't', 0, G_OPTION_ARG_INT, &option_timeout,
1143
"Deadlock detection timeout length, in seconds (default: 1200)", "S" },
1144
{ "long-mode", 'l', 0, G_OPTION_ARG_NONE, &option_long_mode,
1145
"Enable all tests, rather than a fast subset", NULL },
1150
main (int argc, char *argv[])
1153
StreamApi stream_api;
1154
BufferSizeStrategy transmit_buffer_size_strategy;
1155
BufferCountStrategy transmit_buffer_count_strategy;
1156
MessageCountStrategy transmit_message_count_strategy;
1157
BufferSizeStrategy receive_buffer_size_strategy;
1158
BufferCountStrategy receive_buffer_count_strategy;
1159
MessageCountStrategy receive_message_count_strategy;
1160
BufferDataStrategy buffer_data_strategy;
1161
guint32 transmit_seed;
1162
guint32 receive_seed;
1165
guint deadlock_timeout;
1167
GOptionContext *context;
1168
GError *error = NULL;
1170
/* Argument parsing. Allow some of the test parameters to be specified on the
1172
context = g_option_context_new ("— test send()/recv() correctness");
1173
g_option_context_add_main_entries (context, entries, NULL);
1175
if (!g_option_context_parse (context, &argc, &argv, &error)) {
1176
g_printerr ("Option parsing failed: %s\n", error->message);
1177
g_error_free (error);
1181
/* Set up the defaults. */
1182
transmit_seed = option_transmit_seed;
1183
receive_seed = option_receive_seed;
1184
n_bytes = option_n_bytes;
1185
n_messages = option_n_messages;
1186
deadlock_timeout = option_timeout;
1187
long_mode = option_long_mode;
1191
WSAStartup (0x0202, &w);
1194
g_thread_init (NULL);
1197
/* Quick mode. Just test each of the stream APIs in reliable and
1198
* non-reliable mode, with a single pair of buffer strategies, and a single
1202
for (reliable = 0; reliable < 2; reliable++) {
1204
for (stream_api = 0;
1205
(guint) stream_api < STREAM_API_N_ELEMENTS;
1207
/* GIO streams must always be reliable. */
1208
if (!reliable && stream_api_is_reliable_only (stream_api))
1211
/* Non-reliable socket receives require large buffers. */
1213
receive_buffer_size_strategy = BUFFER_SIZE_RANDOM;
1215
receive_buffer_size_strategy = BUFFER_SIZE_CONSTANT_LARGE;
1218
transmit_buffer_size_strategy = BUFFER_SIZE_RANDOM;
1219
buffer_data_strategy = BUFFER_DATA_PSEUDO_RANDOM;
1221
if (stream_api_supports_vectored_io (stream_api)) {
1222
transmit_buffer_count_strategy = BUFFER_COUNT_RANDOM;
1223
transmit_message_count_strategy = MESSAGE_COUNT_RANDOM;
1224
receive_buffer_count_strategy = BUFFER_COUNT_RANDOM;
1225
receive_message_count_strategy = MESSAGE_COUNT_RANDOM;
1227
transmit_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
1228
transmit_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
1229
receive_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
1230
receive_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
1233
g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
1235
reliable, stream_api, n_bytes, n_messages,
1236
transmit_buffer_size_strategy,
1237
receive_buffer_size_strategy, buffer_data_strategy,
1238
transmit_seed, receive_seed);
1239
test (reliable, stream_api, n_bytes, n_messages,
1240
transmit_buffer_size_strategy,
1241
transmit_buffer_count_strategy, transmit_message_count_strategy,
1242
receive_buffer_size_strategy, receive_buffer_count_strategy,
1243
receive_message_count_strategy, buffer_data_strategy,
1244
transmit_seed, receive_seed,
1252
#define STRATEGY_LOOP(V, L) for (V = 0; (guint) V < L##_N_ELEMENTS; V++)
1253
STRATEGY_LOOP(transmit_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
1254
STRATEGY_LOOP(transmit_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
1255
STRATEGY_LOOP(transmit_message_count_strategy, MESSAGE_COUNT_STRATEGY)
1256
STRATEGY_LOOP(receive_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
1257
STRATEGY_LOOP(receive_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
1258
STRATEGY_LOOP(receive_message_count_strategy, MESSAGE_COUNT_STRATEGY)
1259
STRATEGY_LOOP(buffer_data_strategy, BUFFER_DATA_STRATEGY)
1261
for (reliable = 0; reliable < 2; reliable++) {
1263
for (stream_api = 0;
1264
(guint) stream_api < STREAM_API_N_ELEMENTS;
1266
/* GIO streams must always be reliable. */
1267
if (!reliable && stream_api_is_reliable_only (stream_api))
1270
/* Non-reliable socket receives require large buffers. We don’t claim to
1271
* support using them with small (< 65536B) buffers, so don’t test
1274
receive_buffer_size_strategy != BUFFER_SIZE_CONSTANT_LARGE)
1277
/* Non-reliable socket transmits will always block with huge buffers. */
1279
transmit_buffer_size_strategy == BUFFER_SIZE_CONSTANT_LARGE)
1282
/* Stream APIs which don’t support vectored I/O must not be passed
1284
if (!stream_api_supports_vectored_io (stream_api) &&
1285
(transmit_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
1286
transmit_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE ||
1287
receive_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
1288
receive_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE))
1291
g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
1292
"%u, %u, %u, %u, %u, %u, %u, %u)…",
1293
reliable, stream_api, n_bytes, n_messages,
1294
transmit_buffer_size_strategy,
1295
transmit_buffer_count_strategy, transmit_message_count_strategy,
1296
receive_buffer_size_strategy, receive_buffer_count_strategy,
1297
receive_message_count_strategy, buffer_data_strategy,
1298
transmit_seed, receive_seed);
1299
test (reliable, stream_api, n_bytes, n_messages,
1300
transmit_buffer_size_strategy,
1301
transmit_buffer_count_strategy, transmit_message_count_strategy,
1302
receive_buffer_size_strategy, receive_buffer_count_strategy,
1303
receive_message_count_strategy, buffer_data_strategy,
1304
transmit_seed, receive_seed,