~ubuntu-branches/debian/stretch/libnice/stretch

« back to all changes in this revision

Viewing changes to tests/test-send-recv.c

  • Committer: Package Import Robot
  • Author(s): Simon McVittie
  • Date: 2014-05-14 12:00:13 UTC
  • mfrom: (1.2.9) (5.1.13 sid)
  • Revision ID: package-import@ubuntu.com-20140514120013-fi5mh9bexrjnwnd8
Tags: 0.1.7-1
* New upstream release 0.1.6, 0.1.7
  - fixes various compiler warnings that were mistakenly fatal in 0.1.5
    (Closes: #743232, #743233)
  - update symbols file for new API
* Explicitly disable -Werror, even if we package a non-release in future
* Don't run tests during the build. We were ignoring failures already,
  and they sometimes hang until the buildd terminates them.
  Upstream (Olivier Crête) says they are stable enough to be useful
  for developers, but not for integration testing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * This file is part of the Nice GLib ICE library.
 
3
 *
 
4
 * (C) 2014 Collabora Ltd.
 
5
 *  Contact: Philip Withnall
 
6
 *
 
7
 * The contents of this file are subject to the Mozilla Public License Version
 
8
 * 1.1 (the "License"); you may not use this file except in compliance with
 
9
 * the License. You may obtain a copy of the License at
 
10
 * http://www.mozilla.org/MPL/
 
11
 *
 
12
 * Software distributed under the License is distributed on an "AS IS" basis,
 
13
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 
14
 * for the specific language governing rights and limitations under the
 
15
 * License.
 
16
 *
 
17
 * The Original Code is the Nice GLib ICE library.
 
18
 *
 
19
 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
 
20
 * Corporation. All Rights Reserved.
 
21
 *
 
22
 * Contributors:
 
23
 *   Philip Withnall, Collabora Ltd.
 
24
 *
 
25
 * Alternatively, the contents of this file may be used under the terms of the
 
26
 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
 
27
 * case the provisions of LGPL are applicable instead of those above. If you
 
28
 * wish to allow use of your version of this file only under the terms of the
 
29
 * LGPL and not to allow others to use your version of this file under the
 
30
 * MPL, indicate your decision by deleting the provisions above and replace
 
31
 * them with the notice and other provisions required by the LGPL. If you do
 
32
 * not delete the provisions above, a recipient may use your version of this
 
33
 * file under either the MPL or the LGPL.
 
34
 */
 
35
 
 
36
/**
 
37
 * This is a comprehensive unit test for send() and recv() behaviour in libnice,
 
38
 * covering all APIs except the old nice_agent_attach_recv() one. It aims to
 
39
 * test the correctness of reliable and non-reliable I/O through libnice, using
 
40
 * a variety of data and a variety of buffer sizes.
 
41
 *
 
42
 * Abnormal features like error handling, zero-length buffer handling, stream
 
43
 * closure and cancellation are not tested.
 
44
 *
 
45
 * This is *not* a performance test, and would require significant work to be
 
46
 * useful as one. It allocates all of its buffers dynamically, and walks over
 
47
 * them frequently to set and check data.
 
48
 *
 
49
 * Several of the strategies in the test make use of random numbers. The seed
 
50
 * values for these are deterministically set (in main()), but may be specified
 
51
 * on the command line to allow fuzzing.
 
52
 */
 
53
 
 
54
#ifdef HAVE_CONFIG_H
 
55
# include <config.h>
 
56
#endif
 
57
 
 
58
#include "agent.h"
 
59
#include "test-io-stream-common.h"
 
60
 
 
61
#include <stdlib.h>
 
62
#include <string.h>
 
63
#ifndef G_OS_WIN32
 
64
#include <unistd.h>
 
65
#endif
 
66
 
 
67
/* Maximum IP payload ((1 << 16) - 1), minus IP header, minus UDP header. */
 
68
#define MAX_MESSAGE_SIZE (65535 - 20 - 8) /* bytes */
 
69
 
 
70
typedef enum {
 
71
  STREAM_AGENT,  /* nice_agent_[send|recv]() */
 
72
  STREAM_AGENT_NONBLOCKING,  /* nice_agent_[send|recv]_nonblocking() */
 
73
  STREAM_GIO,  /* Nice[Input|Output]Stream */
 
74
  STREAM_GSOURCE,  /* GPollable[Input|Output]Stream */
 
75
} StreamApi;
 
76
#define STREAM_API_N_ELEMENTS (STREAM_GSOURCE + 1)
 
77
 
 
78
typedef enum {
 
79
  BUFFER_SIZE_CONSTANT_LARGE,  /* always 65535 bytes */
 
80
  BUFFER_SIZE_CONSTANT_SMALL,  /* always 4096 bytes */
 
81
  BUFFER_SIZE_CONSTANT_TINY,  /* always 1 byte */
 
82
  BUFFER_SIZE_ASCENDING,  /* ascending powers of 2 */
 
83
  BUFFER_SIZE_RANDOM,  /* random every time */
 
84
} BufferSizeStrategy;
 
85
#define BUFFER_SIZE_STRATEGY_N_ELEMENTS (BUFFER_SIZE_RANDOM + 1)
 
86
 
 
87
typedef enum {
 
88
  BUFFER_COUNT_CONSTANT_ONE,  /* always a single buffer */
 
89
  BUFFER_COUNT_CONSTANT_TWO,  /* always two buffers */
 
90
  BUFFER_COUNT_RANDOM,  /* random every time */
 
91
} BufferCountStrategy;
 
92
#define BUFFER_COUNT_STRATEGY_N_ELEMENTS (BUFFER_COUNT_RANDOM + 1)
 
93
 
 
94
typedef enum {
 
95
  MESSAGE_COUNT_CONSTANT_ONE,  /* always a single message */
 
96
  MESSAGE_COUNT_CONSTANT_TWO,  /* always two messages */
 
97
  MESSAGE_COUNT_RANDOM,  /* random every time */
 
98
} MessageCountStrategy;
 
99
#define MESSAGE_COUNT_STRATEGY_N_ELEMENTS (MESSAGE_COUNT_RANDOM + 1)
 
100
 
 
101
typedef enum {
 
102
  BUFFER_DATA_CONSTANT,  /* fill with 0xfe */
 
103
  BUFFER_DATA_ASCENDING,  /* ascending values for each byte */
 
104
  BUFFER_DATA_PSEUDO_RANDOM,  /* every byte is pseudo-random */
 
105
} BufferDataStrategy;
 
106
#define BUFFER_DATA_STRATEGY_N_ELEMENTS (BUFFER_DATA_PSEUDO_RANDOM + 1)
 
107
 
 
108
typedef struct {
 
109
  /* Test configuration (immutable per test run). */
 
110
  gboolean reliable;
 
111
  StreamApi stream_api;
 
112
  struct {
 
113
    BufferSizeStrategy buffer_size_strategy;
 
114
    BufferCountStrategy buffer_count_strategy;
 
115
    MessageCountStrategy message_count_strategy;
 
116
  } transmit;
 
117
  struct {
 
118
    BufferSizeStrategy buffer_size_strategy;
 
119
    BufferCountStrategy buffer_count_strategy;
 
120
    MessageCountStrategy message_count_strategy;
 
121
  } receive;
 
122
  BufferDataStrategy buffer_data_strategy;
 
123
  gsize n_bytes;
 
124
  guint n_messages;
 
125
 
 
126
  /* Test state. */
 
127
  GRand *transmit_size_rand;
 
128
  GRand *receive_size_rand;
 
129
  gsize transmitted_bytes;
 
130
  gsize received_bytes;
 
131
  gsize *other_received_bytes;
 
132
  guint transmitted_messages;
 
133
  guint received_messages;
 
134
  guint *other_received_messages;
 
135
} TestData;
 
136
 
 
137
/* Whether @stream_api is blocking (vs. non-blocking). */
 
138
static gboolean
 
139
stream_api_is_blocking (StreamApi stream_api)
 
140
{
 
141
  switch (stream_api) {
 
142
  case STREAM_AGENT:
 
143
  case STREAM_GIO:
 
144
    return TRUE;
 
145
  case STREAM_AGENT_NONBLOCKING:
 
146
  case STREAM_GSOURCE:
 
147
    return FALSE;
 
148
  default:
 
149
    g_assert_not_reached ();
 
150
  }
 
151
}
 
152
 
 
153
/* Whether @stream_api only works for reliable NiceAgents. */
 
154
static gboolean
 
155
stream_api_is_reliable_only (StreamApi stream_api)
 
156
{
 
157
  switch (stream_api) {
 
158
  case STREAM_GSOURCE:
 
159
  case STREAM_GIO:
 
160
    return TRUE;
 
161
  case STREAM_AGENT:
 
162
  case STREAM_AGENT_NONBLOCKING:
 
163
    return FALSE;
 
164
  default:
 
165
    g_assert_not_reached ();
 
166
  }
 
167
}
 
168
 
 
169
/* Whether @stream_api supports vectored I/O (multiple buffers or messages). */
 
170
static gboolean
 
171
stream_api_supports_vectored_io (StreamApi stream_api)
 
172
{
 
173
  switch (stream_api) {
 
174
  case STREAM_AGENT:
 
175
  case STREAM_AGENT_NONBLOCKING:
 
176
    return TRUE;
 
177
  case STREAM_GSOURCE:
 
178
  case STREAM_GIO:
 
179
    return FALSE;
 
180
  default:
 
181
    g_assert_not_reached ();
 
182
  }
 
183
}
 
184
 
 
185
/* Generate a size for the buffer containing the @buffer_offset-th byte.
 
186
 * Guaranteed to be in the interval [1, 1 << 16). ((1 << 16) is the maximum
 
187
 * message size.) */
 
188
static gsize
 
189
generate_buffer_size (BufferSizeStrategy strategy, GRand *grand,
 
190
    gsize buffer_offset)
 
191
{
 
192
  switch (strategy) {
 
193
  case BUFFER_SIZE_CONSTANT_LARGE:
 
194
    return (1 << 16) - 1;
 
195
 
 
196
  case BUFFER_SIZE_CONSTANT_SMALL:
 
197
    return 4096;
 
198
 
 
199
  case BUFFER_SIZE_CONSTANT_TINY:
 
200
    return 1;
 
201
 
 
202
  case BUFFER_SIZE_ASCENDING:
 
203
    return CLAMP (1L << buffer_offset, 1, (1 << 16) - 1);
 
204
 
 
205
  case BUFFER_SIZE_RANDOM:
 
206
    return g_rand_int_range (grand, 1, 1 << 16);
 
207
 
 
208
  default:
 
209
    g_assert_not_reached ();
 
210
  }
 
211
}
 
212
 
 
213
/* Generate a number of buffers to allocate when receiving the @buffer_offset-th
 
214
 * byte. Guaranteed to be in the interval [1, 100], where 100 was chosen
 
215
 * arbitrarily.*/
 
216
static guint
 
217
generate_buffer_count (BufferCountStrategy strategy, GRand *grand,
 
218
    gsize buffer_offset)
 
219
{
 
220
  switch (strategy) {
 
221
  case BUFFER_COUNT_CONSTANT_ONE:
 
222
    return 1;
 
223
 
 
224
  case BUFFER_COUNT_CONSTANT_TWO:
 
225
    return 2;
 
226
 
 
227
  case BUFFER_COUNT_RANDOM:
 
228
    return g_rand_int_range (grand, 1, 100 + 1);
 
229
 
 
230
  default:
 
231
    g_assert_not_reached ();
 
232
  }
 
233
}
 
234
 
 
235
/* Generate a number of messages to allocate and receive into when receiving the
 
236
 * @buffer_offset-th byte. Guaranteed to be in the interval [1, 100], where 100
 
237
 * was chosen arbitrarily.*/
 
238
static guint
 
239
generate_message_count (MessageCountStrategy strategy, GRand *grand,
 
240
    guint buffer_index)
 
241
{
 
242
  switch (strategy) {
 
243
  case MESSAGE_COUNT_CONSTANT_ONE:
 
244
    return 1;
 
245
 
 
246
  case MESSAGE_COUNT_CONSTANT_TWO:
 
247
    return 2;
 
248
 
 
249
  case MESSAGE_COUNT_RANDOM:
 
250
    return g_rand_int_range (grand, 1, 100 + 1);
 
251
 
 
252
  default:
 
253
    g_assert_not_reached ();
 
254
  }
 
255
}
 
256
 
 
257
/* Fill the given @buf with @buf_len bytes of generated data. The data is
 
258
 * deterministically generated, so that:
 
259
 *     generate_buffer_data(_, I, buf, 2)
 
260
 * and
 
261
 *     generate_buffer_data(_, I+1, buf+1, 1)
 
262
 * generate the same buf[I+1] byte, for all I.
 
263
 *
 
264
 * The generation strategies are generally chosen to produce data which makes
 
265
 * send/receive errors (insertions, swaps, elisions) obvious. */
 
266
static void
 
267
generate_buffer_data (BufferDataStrategy strategy, gsize buffer_offset,
 
268
    guint8 *buf, gsize buf_len)
 
269
{
 
270
  switch (strategy) {
 
271
  case BUFFER_DATA_CONSTANT:
 
272
    memset (buf, 0xfe, buf_len);
 
273
    break;
 
274
 
 
275
  case BUFFER_DATA_ASCENDING: {
 
276
    gsize i;
 
277
 
 
278
    for (i = 0; i < buf_len; i++) {
 
279
      buf[i] = (i + buffer_offset) & 0xff;
 
280
    }
 
281
 
 
282
    break;
 
283
  }
 
284
 
 
285
  case BUFFER_DATA_PSEUDO_RANDOM: {
 
286
    gsize i;
 
287
 
 
288
    /* This can’t use GRand, because then the number of calls to g_rand_*()
 
289
     * methods would affect its output, and the bytes generated here have to be
 
290
     * entirely deterministic on @buffer_offset.
 
291
     *
 
292
     * Instead, use something akin to a LCG, except without any feedback
 
293
     * (because that would make it non-deterministic). The objective is to
 
294
     * generate numbers which are sufficiently pseudo-random that it’s likely
 
295
     * transpositions, elisions and insertions will be detected.
 
296
     *
 
297
     * The constants come from ‘ANSI C’ in:
 
298
     * http://en.wikipedia.org/wiki/Linear_congruential_generator
 
299
     */
 
300
    for (i = 0; i < buf_len; i++) {
 
301
      buf[i] = (1103515245 * (buffer_offset + i) + 12345) & 0xff;
 
302
    }
 
303
 
 
304
    break;
 
305
  }
 
306
 
 
307
  default:
 
308
    g_assert_not_reached ();
 
309
  }
 
310
}
 
311
 
 
312
/* Choose a size and allocate a receive buffer in @buf, ready to receive bytes
 
313
 * starting at @buffer_offset into the stream. Fill the buffer with poison
 
314
 * values to hopefully make incorrect writes/reads more obvious.
 
315
 *
 
316
 * @buf must be freed with g_free(). */
 
317
static void
 
318
generate_buffer_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
 
319
    guint8 **buf, gsize *buf_len)
 
320
{
 
321
  TestData *test_data = data->user_data;
 
322
 
 
323
  /* Allocate the buffer. */
 
324
  *buf_len = generate_buffer_size (test_data->receive.buffer_size_strategy,
 
325
      test_data->receive_size_rand, buffer_offset);
 
326
  *buf = g_malloc (*buf_len);
 
327
 
 
328
  /* Fill it with poison to try and detect incorrect writes. */
 
329
  memset (*buf, 0xaa, *buf_len);
 
330
}
 
331
 
 
332
/* Similar to generate_buffer_to_receive(), but generate an entire message array
 
333
 * with multiple buffers instead.
 
334
 *
 
335
 * @max_buffer_size may be used to limit the total size of all the buffers in
 
336
 * all the messages, for example to avoid blocking on receiving data which will
 
337
 * never be sent. This only applies for blocking, reliable stream APIs.
 
338
 *
 
339
 * @max_n_messages may be used to limit the number of messages generated, to
 
340
 * avoid blocking on receiving messages which will never be sent. This only
 
341
 * applies for blocking, non-reliable stream APIs.
 
342
 *
 
343
 * @messages must be freed with g_free(), as must all of the buffer arrays and
 
344
 * the buffers themselves. */
 
345
static void
 
346
generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
 
347
    NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size,
 
348
    guint max_n_messages)
 
349
{
 
350
  TestData *test_data = data->user_data;
 
351
  guint i;
 
352
 
 
353
  /* Allocate the messages. */
 
354
  *n_messages =
 
355
      generate_message_count (test_data->receive.message_count_strategy,
 
356
          test_data->receive_size_rand, buffer_offset);
 
357
 
 
358
  if (!data->reliable)
 
359
    *n_messages = MIN (*n_messages, max_n_messages);
 
360
 
 
361
  *messages = g_malloc_n (*n_messages, sizeof (NiceInputMessage));
 
362
 
 
363
  for (i = 0; i < *n_messages; i++) {
 
364
    NiceInputMessage *message = &((*messages)[i]);
 
365
    guint j;
 
366
 
 
367
    message->n_buffers =
 
368
        generate_buffer_count (test_data->receive.buffer_count_strategy,
 
369
            test_data->receive_size_rand, buffer_offset);
 
370
    message->buffers = g_malloc_n (message->n_buffers, sizeof (GInputVector));
 
371
    message->from = NULL;
 
372
    message->length = 0;
 
373
 
 
374
    for (j = 0; j < (guint) message->n_buffers; j++) {
 
375
      GInputVector *buffer = &message->buffers[j];
 
376
      gsize buf_len;
 
377
 
 
378
      buf_len =
 
379
          generate_buffer_size (test_data->receive.buffer_size_strategy,
 
380
              test_data->receive_size_rand, buffer_offset);
 
381
 
 
382
      /* Trim the buffer length if it would otherwise cause the API to block. */
 
383
      if (data->reliable) {
 
384
        buf_len = MIN (buf_len, max_buffer_size);
 
385
        max_buffer_size -= buf_len;
 
386
      }
 
387
 
 
388
      buffer->size = buf_len;
 
389
      buffer->buffer = g_malloc (buffer->size);
 
390
 
 
391
      /* Fill it with poison to try and detect incorrect writes. */
 
392
      memset (buffer->buffer, 0xaa, buffer->size);
 
393
 
 
394
      /* If we’ve hit the max_buffer_size, adjust the buffer and message counts
 
395
       * and run away. */
 
396
      if (data->reliable && max_buffer_size == 0) {
 
397
        message->n_buffers = j + 1;
 
398
        *n_messages = i + 1;
 
399
        return;
 
400
      }
 
401
    }
 
402
  }
 
403
}
 
404
 
 
405
/* Validate the length and data of a received buffer of length @buf_len, filled
 
406
 * with @len valid bytes. Updates the internal state machine to mark the bytes
 
407
 * as received. This consumes @buf. */
 
408
static void
 
409
validate_received_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
 
410
    guint8 **buf, gsize buf_len, gssize len)
 
411
{
 
412
  TestData *test_data = data->user_data;
 
413
  guint8 *expected_buf;
 
414
 
 
415
  g_assert_cmpint (len, <=, buf_len);
 
416
  g_assert_cmpint (len, >=, 0);
 
417
 
 
418
  if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
 
419
    g_assert_cmpint (len, ==, buf_len);
 
420
 
 
421
  /* Validate the buffer contents.
 
422
   *
 
423
   * Note: Buffers can only be validated up to valid_len. The buffer may
 
424
   * have been re-used internally (e.g. by receiving a STUN message, then
 
425
   * overwriting it with a data packet), so we can’t guarantee that the
 
426
   * bytes beyond valid_len have been untouched. */
 
427
  expected_buf = g_malloc (buf_len);
 
428
  memset (expected_buf, 0xaa, buf_len);
 
429
  generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
 
430
      expected_buf, len);
 
431
  g_assert (memcmp (*buf, expected_buf, len) == 0);
 
432
  g_free (expected_buf);
 
433
 
 
434
  test_data->received_bytes += len;
 
435
 
 
436
  g_free (*buf);
 
437
}
 
438
 
 
439
/* Similar to validate_received_buffer(), except it validates a message array
 
440
 * instead of a single buffer. This consumes @messages. */
 
441
static void
 
442
validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
 
443
    NiceInputMessage *messages, guint n_messages, gint n_valid_messages)
 
444
{
 
445
  TestData *test_data = data->user_data;
 
446
  guint i;
 
447
  gsize prev_message_len = G_MAXSIZE;
 
448
 
 
449
  g_assert_cmpint (n_valid_messages, <=, n_messages);
 
450
  g_assert_cmpint (n_valid_messages, >=, 0);
 
451
 
 
452
  if (stream_api_is_blocking (test_data->stream_api))
 
453
    g_assert_cmpint (n_valid_messages, ==, n_messages);
 
454
 
 
455
  test_data->received_messages += n_valid_messages;
 
456
 
 
457
  /* Validate the message contents. */
 
458
  for (i = 0; i < (guint) n_valid_messages; i++) {
 
459
    NiceInputMessage *message = &messages[i];
 
460
    guint j;
 
461
    gsize total_buf_len = 0;
 
462
    gsize message_len_remaining = message->length;
 
463
 
 
464
    g_assert_cmpint (message->n_buffers, >, 0);
 
465
 
 
466
    for (j = 0; j < (guint) message->n_buffers; j++) {
 
467
      GInputVector *buffer = &message->buffers[j];
 
468
      gsize valid_len;
 
469
 
 
470
      /* See note above about valid_len. */
 
471
      total_buf_len += buffer->size;
 
472
      valid_len = MIN (message_len_remaining, buffer->size);
 
473
 
 
474
      /* Only validate buffer content for reliable mode, anything could
 
475
       * be received in UDP mode
 
476
       */
 
477
      if (test_data->reliable) {
 
478
        guint8 *expected_buf;
 
479
 
 
480
        expected_buf = g_malloc (buffer->size);
 
481
        memset (expected_buf, 0xaa, buffer->size);
 
482
        generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
 
483
            expected_buf, valid_len);
 
484
        g_assert_cmpint (memcmp (buffer->buffer, expected_buf, valid_len), ==,
 
485
            0);
 
486
        g_free (expected_buf);
 
487
        buffer_offset += valid_len;
 
488
        message_len_remaining -= valid_len;
 
489
      }
 
490
      test_data->received_bytes += valid_len;
 
491
 
 
492
      g_free (buffer->buffer);
 
493
    }
 
494
 
 
495
    g_assert_cmpuint (message->length, <=, total_buf_len);
 
496
    g_assert_cmpuint (message->length, >=, 0);
 
497
 
 
498
    /* No non-empty messages can follow an empty message. */
 
499
    if (prev_message_len == 0)
 
500
      g_assert_cmpuint (message->length, ==, 0);
 
501
    prev_message_len = message->length;
 
502
 
 
503
    /* If the API was blocking, it should have completely filled the message. */
 
504
    if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
 
505
      g_assert_cmpuint (message->length, ==, total_buf_len);
 
506
 
 
507
    g_assert (message->from == NULL);
 
508
 
 
509
    g_free (message->buffers);
 
510
  }
 
511
 
 
512
  g_free (messages);
 
513
}
 
514
 
 
515
/* Determine a size for the next transmit buffer, allocate it, and fill it with
 
516
 * data to be transmitted. */
 
517
static void
 
518
generate_buffer_to_transmit (TestIOStreamThreadData *data, gsize buffer_offset,
 
519
    guint8 **buf, gsize *buf_len)
 
520
{
 
521
  TestData *test_data = data->user_data;
 
522
 
 
523
  /* Allocate the buffer. */
 
524
  *buf_len = generate_buffer_size (test_data->transmit.buffer_size_strategy,
 
525
      test_data->transmit_size_rand, buffer_offset);
 
526
  *buf_len = MIN (*buf_len, test_data->n_bytes - test_data->transmitted_bytes);
 
527
  *buf = g_malloc (*buf_len);
 
528
 
 
529
  /* Fill it with data. */
 
530
  generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
 
531
      *buf, *buf_len);
 
532
}
 
533
 
 
534
/* Similar to generate_buffer_to_transmit(), except that it generates an array
 
535
 * of NiceOutputMessages rather than a single buffer. */
 
536
static void
 
537
generate_messages_to_transmit (TestIOStreamThreadData *data,
 
538
    gsize buffer_offset, NiceOutputMessage **messages, guint *n_messages)
 
539
{
 
540
  TestData *test_data = data->user_data;
 
541
  guint i;
 
542
  gsize total_buf_len = 0;
 
543
 
 
544
  /* Determine the number of messages to send. */
 
545
  *n_messages =
 
546
      generate_message_count (test_data->transmit.message_count_strategy,
 
547
          test_data->transmit_size_rand, buffer_offset);
 
548
  *n_messages =
 
549
      MIN (*n_messages,
 
550
          test_data->n_messages - test_data->transmitted_messages);
 
551
 
 
552
  *messages = g_malloc_n (*n_messages, sizeof (NiceOutputMessage));
 
553
 
 
554
  for (i = 0; i < *n_messages; i++) {
 
555
    NiceOutputMessage *message = &((*messages)[i]);
 
556
    guint j;
 
557
    gsize max_message_size;
 
558
    gsize message_len = 0;
 
559
 
 
560
    message->n_buffers =
 
561
        generate_buffer_count (test_data->transmit.buffer_count_strategy,
 
562
            test_data->transmit_size_rand, buffer_offset);
 
563
    message->buffers = g_malloc_n (message->n_buffers, sizeof (GOutputVector));
 
564
 
 
565
    /* Limit the overall message size to the smaller of (n_bytes / n_messages)
 
566
     * and MAX_MESSAGE_SIZE, to ensure each message is non-empty. */
 
567
    max_message_size =
 
568
        MIN ((test_data->n_bytes / test_data->n_messages), MAX_MESSAGE_SIZE);
 
569
 
 
570
    for (j = 0; j < (guint) message->n_buffers; j++) {
 
571
      GOutputVector *buffer = &message->buffers[j];
 
572
      gsize buf_len;
 
573
      guint8 *buf;
 
574
 
 
575
      buf_len =
 
576
          generate_buffer_size (test_data->transmit.buffer_size_strategy,
 
577
              test_data->transmit_size_rand, buffer_offset);
 
578
      buf_len =
 
579
          MIN (buf_len,
 
580
              test_data->n_bytes - test_data->transmitted_bytes - total_buf_len);
 
581
      buf_len = MIN (buf_len, max_message_size - message_len);
 
582
 
 
583
      buffer->size = buf_len;
 
584
      buf = g_malloc (buffer->size);
 
585
      buffer->buffer = buf;
 
586
      message_len += buf_len;
 
587
      total_buf_len += buf_len;
 
588
 
 
589
      /* Fill it with data. */
 
590
      generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
 
591
          buf, buf_len);
 
592
 
 
593
      buffer_offset += buf_len;
 
594
 
 
595
      /* Reached the maximum UDP payload size? */
 
596
      if (message_len >= max_message_size) {
 
597
        message->n_buffers = j + 1;
 
598
        break;
 
599
      }
 
600
    }
 
601
 
 
602
    g_assert_cmpuint (message_len, <=, max_message_size);
 
603
  }
 
604
}
 
605
 
 
606
/* Validate the number of bytes transmitted, and update the test’s internal
 
607
 * state machine. Consumes @buf. */
 
608
static void
 
609
notify_transmitted_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
 
610
    guint8 **buf, gsize buf_len, gssize len)
 
611
{
 
612
  TestData *test_data = data->user_data;
 
613
 
 
614
  g_assert_cmpint (len, <=, buf_len);
 
615
  g_assert_cmpint (len, >=, 0);
 
616
 
 
617
  test_data->transmitted_bytes += len;
 
618
 
 
619
  g_free (*buf);
 
620
}
 
621
 
 
622
static gsize
 
623
output_message_get_size (const NiceOutputMessage *message)
 
624
{
 
625
  guint i;
 
626
  gsize message_len = 0;
 
627
 
 
628
  /* Find the total size of the message */
 
629
  for (i = 0;
 
630
       (message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
 
631
           (message->n_buffers < 0 && message->buffers[i].buffer != NULL);
 
632
       i++)
 
633
    message_len += message->buffers[i].size;
 
634
 
 
635
  return message_len;
 
636
}
 
637
 
 
638
/* Similar to notify_transmitted_buffer(), except it operates on an array of
 
639
 * messages from generate_messages_to_transmit(). */
 
640
static void
 
641
notify_transmitted_messages (TestIOStreamThreadData *data, gsize buffer_offset,
 
642
    NiceOutputMessage **messages, guint n_messages, gint n_sent_messages)
 
643
{
 
644
  TestData *test_data = data->user_data;
 
645
  guint i;
 
646
 
 
647
  g_assert_cmpint (n_sent_messages, <=, n_messages);
 
648
  g_assert_cmpint (n_sent_messages, >=, 0);
 
649
 
 
650
  test_data->transmitted_messages += n_sent_messages;
 
651
 
 
652
  for (i = 0; i < n_messages; i++) {
 
653
    NiceOutputMessage *message = &((*messages)[i]);
 
654
    guint j;
 
655
 
 
656
    if (i < (guint) n_sent_messages)
 
657
      test_data->transmitted_bytes += output_message_get_size (message);
 
658
 
 
659
    for (j = 0; j < (guint) message->n_buffers; j++) {
 
660
      GOutputVector *buffer = &message->buffers[j];
 
661
 
 
662
      g_free ((guint8 *) buffer->buffer);
 
663
    }
 
664
 
 
665
    g_free (message->buffers);
 
666
  }
 
667
 
 
668
  g_free (*messages);
 
669
}
 
670
 
 
671
/*
 
672
 * Implementation using nice_agent_recv_messages() and nice_agent_send().
 
673
 */
 
674
static void
 
675
read_thread_agent_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
 
676
{
 
677
  TestData *test_data = data->user_data;
 
678
  guint stream_id, component_id;
 
679
  gpointer tmp;
 
680
 
 
681
  tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
 
682
  stream_id = GPOINTER_TO_UINT (tmp);
 
683
  component_id = 1;
 
684
 
 
685
  while (test_data->received_bytes < test_data->n_bytes) {
 
686
    GError *error = NULL;
 
687
    NiceInputMessage *messages;
 
688
    guint n_messages;
 
689
    gint n_valid_messages;
 
690
 
 
691
    /* Initialise an array of messages to receive into. */
 
692
    generate_messages_to_receive (data, test_data->received_bytes, &messages,
 
693
        &n_messages, test_data->n_bytes - test_data->received_bytes,
 
694
        test_data->n_messages - test_data->received_messages);
 
695
 
 
696
    /* Block on receiving some data. */
 
697
    n_valid_messages = nice_agent_recv_messages (data->agent, stream_id,
 
698
        component_id, messages, n_messages, NULL, &error);
 
699
    g_assert_no_error (error);
 
700
 
 
701
    /* Check the messages and update the test’s state machine. */
 
702
    validate_received_messages (data, test_data->received_bytes, messages,
 
703
        n_messages, n_valid_messages);
 
704
  }
 
705
 
 
706
  check_for_termination (data, &test_data->received_bytes,
 
707
      test_data->other_received_bytes, &test_data->transmitted_bytes,
 
708
      test_data->n_bytes);
 
709
}
 
710
 
 
711
static void
 
712
write_thread_agent_cb (GOutputStream *output_stream,
 
713
    TestIOStreamThreadData *data)
 
714
{
 
715
  TestData *test_data = data->user_data;
 
716
  guint stream_id, component_id;
 
717
  gpointer tmp;
 
718
 
 
719
  tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
 
720
  stream_id = GPOINTER_TO_UINT (tmp);
 
721
  component_id = 1;
 
722
 
 
723
  while (test_data->transmitted_bytes < test_data->n_bytes) {
 
724
    GError *error = NULL;
 
725
    NiceOutputMessage *messages;
 
726
    guint n_messages;
 
727
    gint n_sent_messages;
 
728
 
 
729
    /* Generate a buffer to transmit. */
 
730
    generate_messages_to_transmit (data, test_data->transmitted_bytes,
 
731
        &messages, &n_messages);
 
732
 
 
733
    /* Busy loop on receiving some data. */
 
734
    do {
 
735
      g_clear_error (&error);
 
736
      n_sent_messages = nice_agent_send_messages_nonblocking (data->agent,
 
737
          stream_id, component_id, messages, n_messages, NULL, &error);
 
738
    } while (n_sent_messages == -1 &&
 
739
        g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
 
740
    g_assert_no_error (error);
 
741
 
 
742
    /* Update the test’s buffer generation state machine. */
 
743
    notify_transmitted_messages (data, test_data->transmitted_bytes, &messages,
 
744
        n_messages, n_sent_messages);
 
745
  }
 
746
}
 
747
 
 
748
/*
 
749
 * Implementation using nice_agent_recv_nonblocking() and
 
750
 * nice_agent_send_nonblocking().
 
751
 */
 
752
static void
 
753
read_thread_agent_nonblocking_cb (GInputStream *input_stream,
 
754
    TestIOStreamThreadData *data)
 
755
{
 
756
  TestData *test_data = data->user_data;
 
757
  guint stream_id, component_id;
 
758
  gpointer tmp;
 
759
 
 
760
  tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
 
761
  stream_id = GPOINTER_TO_UINT (tmp);
 
762
  component_id = 1;
 
763
 
 
764
  while (test_data->received_bytes < test_data->n_bytes) {
 
765
    GError *error = NULL;
 
766
    NiceInputMessage *messages;
 
767
    guint n_messages;
 
768
    gint n_valid_messages;
 
769
 
 
770
    /* Initialise an array of messages to receive into. */
 
771
    generate_messages_to_receive (data, test_data->received_bytes, &messages,
 
772
        &n_messages, test_data->n_bytes - test_data->received_bytes,
 
773
        test_data->n_messages - test_data->received_messages);
 
774
 
 
775
    /* Trim n_messages to avoid consuming the ‘done’ message. */
 
776
    n_messages =
 
777
        MIN (n_messages, test_data->n_messages - test_data->received_messages);
 
778
 
 
779
    /* Busy loop on receiving some data. */
 
780
    do {
 
781
      g_clear_error (&error);
 
782
      n_valid_messages = nice_agent_recv_messages_nonblocking (data->agent,
 
783
          stream_id, component_id, messages, n_messages, NULL, &error);
 
784
    } while (n_valid_messages == -1 &&
 
785
        g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
 
786
    g_assert_no_error (error);
 
787
 
 
788
    /* Check the messages and update the test’s state machine. */
 
789
    validate_received_messages (data, test_data->received_bytes, messages,
 
790
        n_messages, n_valid_messages);
 
791
  }
 
792
 
 
793
  check_for_termination (data, &test_data->received_bytes,
 
794
      test_data->other_received_bytes, &test_data->transmitted_bytes,
 
795
      test_data->n_bytes);
 
796
}
 
797
 
 
798
static void
 
799
write_thread_agent_nonblocking_cb (GOutputStream *output_stream,
 
800
    TestIOStreamThreadData *data)
 
801
{
 
802
  /* FIXME: There is no nice_agent_send_nonblocking(); nice_agent_send() is
 
803
   * non-blocking by default. */
 
804
  write_thread_agent_cb (output_stream, data);
 
805
}
 
806
 
 
807
/*
 
808
 * Implementation using NiceInputStream and NiceOutputStream.
 
809
 */
 
810
static void
 
811
read_thread_gio_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
 
812
{
 
813
  TestData *test_data = data->user_data;
 
814
 
 
815
  while (test_data->received_bytes < test_data->n_bytes) {
 
816
    GError *error = NULL;
 
817
    guint8 *buf = NULL;
 
818
    gsize buf_len = 0;
 
819
    gssize len;
 
820
 
 
821
    /* Initialise a receive buffer. */
 
822
    generate_buffer_to_receive (data, test_data->received_bytes, &buf,
 
823
        &buf_len);
 
824
 
 
825
    /* Trim the receive buffer to avoid blocking on bytes which will never
 
826
     * appear. */
 
827
    buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
 
828
 
 
829
    /* Block on receiving some data. */
 
830
    len = g_input_stream_read (input_stream, buf, buf_len, NULL, &error);
 
831
    g_assert_no_error (error);
 
832
 
 
833
    /* Check the buffer and update the test’s state machine. */
 
834
    validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
 
835
        len);
 
836
  }
 
837
 
 
838
  check_for_termination (data, &test_data->received_bytes,
 
839
      test_data->other_received_bytes, &test_data->transmitted_bytes,
 
840
      test_data->n_bytes);
 
841
}
 
842
 
 
843
static void
 
844
write_thread_gio_cb (GOutputStream *output_stream, TestIOStreamThreadData *data)
 
845
{
 
846
  TestData *test_data = data->user_data;
 
847
 
 
848
  while (test_data->transmitted_bytes < test_data->n_bytes) {
 
849
    GError *error = NULL;
 
850
    guint8 *buf = NULL;
 
851
    gsize buf_len = 0;
 
852
    gssize len;
 
853
    gsize total_len = 0;
 
854
 
 
855
    /* Generate a buffer to transmit. */
 
856
    generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf,
 
857
        &buf_len);
 
858
 
 
859
    /* Transmit it. */
 
860
    do {
 
861
      len = g_output_stream_write (output_stream, buf + total_len,
 
862
          buf_len - total_len, NULL, &error);
 
863
      g_assert_no_error (error);
 
864
      total_len += len;
 
865
    } while (total_len < buf_len);
 
866
 
 
867
    /* Update the test’s buffer generation state machine. */
 
868
    notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf,
 
869
        buf_len, total_len);
 
870
  }
 
871
}
 
872
 
 
873
/*
 
874
 * Implementation using GPollableInputStream and GPollableOutputStream.
 
875
 *
 
876
 * GSourceData is effectively the closure for the ‘for’ loop in other stream API
 
877
 * implementations.
 
878
 */
 
879
typedef struct {
 
880
  TestIOStreamThreadData *data;
 
881
  GMainLoop *main_loop;
 
882
} GSourceData;
 
883
 
 
884
static gboolean
 
885
read_stream_cb (GObject *pollable_stream, gpointer _user_data)
 
886
{
 
887
  GSourceData *gsource_data = _user_data;
 
888
  TestIOStreamThreadData *data = gsource_data->data;
 
889
  TestData *test_data = data->user_data;
 
890
  GError *error = NULL;
 
891
  guint8 *buf = NULL;
 
892
  gsize buf_len = 0;
 
893
  gssize len;
 
894
 
 
895
  /* Initialise a receive buffer. */
 
896
  generate_buffer_to_receive (data, test_data->received_bytes, &buf, &buf_len);
 
897
 
 
898
  /* Trim the receive buffer to avoid consuming the ‘done’ message. */
 
899
  buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
 
900
 
 
901
  /* Try to receive some data. */
 
902
  len = g_pollable_input_stream_read_nonblocking (
 
903
      G_POLLABLE_INPUT_STREAM (pollable_stream), buf, buf_len, NULL, &error);
 
904
 
 
905
  if (len == -1) {
 
906
    g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
 
907
    g_free (buf);
 
908
    return TRUE;
 
909
  }
 
910
 
 
911
  g_assert_no_error (error);
 
912
 
 
913
  /* Check the buffer and update the test’s state machine. */
 
914
  validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
 
915
      len);
 
916
 
 
917
  /* Termination time? */
 
918
  if (test_data->received_bytes == test_data->n_bytes) {
 
919
    g_main_loop_quit (gsource_data->main_loop);
 
920
    return FALSE;
 
921
  }
 
922
 
 
923
  return TRUE;
 
924
}
 
925
 
 
926
static void
 
927
read_thread_gsource_cb (GInputStream *input_stream,
 
928
    TestIOStreamThreadData *data)
 
929
{
 
930
  TestData *test_data = data->user_data;
 
931
  GSourceData gsource_data;
 
932
  GMainContext *main_context;
 
933
  GMainLoop *main_loop;
 
934
  GSource *stream_source;
 
935
 
 
936
  main_context = g_main_context_ref_thread_default ();
 
937
  main_loop = g_main_loop_new (main_context, FALSE);
 
938
 
 
939
  gsource_data.data = data;
 
940
  gsource_data.main_loop = main_loop;
 
941
 
 
942
  stream_source =
 
943
      g_pollable_input_stream_create_source (
 
944
          G_POLLABLE_INPUT_STREAM (input_stream), NULL);
 
945
 
 
946
  g_source_set_callback (stream_source, (GSourceFunc) read_stream_cb,
 
947
      &gsource_data, NULL);
 
948
  g_source_attach (stream_source, main_context);
 
949
 
 
950
  /* Run the main loop. */
 
951
  g_main_loop_run (main_loop);
 
952
 
 
953
  g_source_destroy (stream_source);
 
954
  g_source_unref (stream_source);
 
955
  g_main_loop_unref (main_loop);
 
956
  g_main_context_unref (main_context);
 
957
 
 
958
  /* Termination? */
 
959
  check_for_termination (data, &test_data->received_bytes,
 
960
      test_data->other_received_bytes, &test_data->transmitted_bytes,
 
961
      test_data->n_bytes);
 
962
}
 
963
 
 
964
static gboolean
 
965
write_stream_cb (GObject *pollable_stream, gpointer _user_data)
 
966
{
 
967
  GSourceData *gsource_data = _user_data;
 
968
  TestIOStreamThreadData *data = gsource_data->data;
 
969
  TestData *test_data = data->user_data;
 
970
  GError *error = NULL;
 
971
  guint8 *buf = NULL;
 
972
  gsize buf_len = 0;
 
973
  gssize len;
 
974
 
 
975
  /* Initialise a receive buffer. */
 
976
  generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf,
 
977
      &buf_len);
 
978
 
 
979
  /* Try to transmit some data. */
 
980
  len = g_pollable_output_stream_write_nonblocking (
 
981
      G_POLLABLE_OUTPUT_STREAM (pollable_stream), buf, buf_len, NULL, &error);
 
982
 
 
983
  if (len == -1) {
 
984
    g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
 
985
    g_free (buf);
 
986
    return TRUE;
 
987
  }
 
988
 
 
989
  g_assert_no_error (error);
 
990
 
 
991
  /* Update the test’s buffer generation state machine. */
 
992
  notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf, buf_len,
 
993
      len);
 
994
 
 
995
  /* Termination time? */
 
996
  if (test_data->transmitted_bytes == test_data->n_bytes) {
 
997
    g_main_loop_quit (gsource_data->main_loop);
 
998
    return FALSE;
 
999
  }
 
1000
 
 
1001
  return TRUE;
 
1002
}
 
1003
 
 
1004
static void
 
1005
write_thread_gsource_cb (GOutputStream *output_stream,
 
1006
    TestIOStreamThreadData *data)
 
1007
{
 
1008
  GSourceData gsource_data;
 
1009
  GMainContext *main_context;
 
1010
  GMainLoop *main_loop;
 
1011
  GSource *stream_source;
 
1012
 
 
1013
  main_context = g_main_context_ref_thread_default ();
 
1014
  main_loop = g_main_loop_new (main_context, FALSE);
 
1015
 
 
1016
  gsource_data.data = data;
 
1017
  gsource_data.main_loop = main_loop;
 
1018
 
 
1019
  stream_source =
 
1020
      g_pollable_output_stream_create_source (
 
1021
          G_POLLABLE_OUTPUT_STREAM (output_stream), NULL);
 
1022
 
 
1023
  g_source_set_callback (stream_source, (GSourceFunc) write_stream_cb,
 
1024
      &gsource_data, NULL);
 
1025
  g_source_attach (stream_source, main_context);
 
1026
 
 
1027
  /* Run the main loop. */
 
1028
  g_main_loop_run (main_loop);
 
1029
 
 
1030
  g_source_destroy (stream_source);
 
1031
  g_source_unref (stream_source);
 
1032
  g_main_loop_unref (main_loop);
 
1033
  g_main_context_unref (main_context);
 
1034
}
 
1035
 
 
1036
static void
 
1037
test_data_init (TestData *data, gboolean reliable, StreamApi stream_api,
 
1038
    gsize n_bytes, guint n_messages,
 
1039
    BufferSizeStrategy transmit_buffer_size_strategy,
 
1040
    BufferCountStrategy transmit_buffer_count_strategy,
 
1041
    MessageCountStrategy transmit_message_count_strategy,
 
1042
    BufferSizeStrategy receive_buffer_size_strategy,
 
1043
    BufferCountStrategy receive_buffer_count_strategy,
 
1044
    MessageCountStrategy receive_message_count_strategy,
 
1045
    BufferDataStrategy buffer_data_strategy, guint32 transmit_seed,
 
1046
    guint32 receive_seed, gsize *other_received_bytes,
 
1047
    guint *other_received_messages)
 
1048
{
 
1049
  data->reliable = reliable;
 
1050
  data->stream_api = stream_api;
 
1051
  data->n_bytes = n_bytes;
 
1052
  data->n_messages = n_messages;
 
1053
  data->transmit.buffer_size_strategy = transmit_buffer_size_strategy;
 
1054
  data->transmit.buffer_count_strategy = transmit_buffer_count_strategy;
 
1055
  data->transmit.message_count_strategy = transmit_message_count_strategy;
 
1056
  data->receive.buffer_size_strategy = receive_buffer_size_strategy;
 
1057
  data->receive.buffer_count_strategy = receive_buffer_count_strategy;
 
1058
  data->receive.message_count_strategy = receive_message_count_strategy;
 
1059
  data->buffer_data_strategy = buffer_data_strategy;
 
1060
  data->transmit_size_rand = g_rand_new_with_seed (transmit_seed);
 
1061
  data->receive_size_rand = g_rand_new_with_seed (receive_seed);
 
1062
  data->transmitted_bytes = 0;
 
1063
  data->received_bytes = 0;
 
1064
  data->other_received_bytes = other_received_bytes;
 
1065
  data->transmitted_messages = 0;
 
1066
  data->received_messages = 0;
 
1067
  data->other_received_messages = other_received_messages;
 
1068
}
 
1069
 
 
1070
/*
 
1071
 * Test closures.
 
1072
 */
 
1073
static void
 
1074
test_data_clear (TestData *data)
 
1075
{
 
1076
  g_rand_free (data->receive_size_rand);
 
1077
  g_rand_free (data->transmit_size_rand);
 
1078
}
 
1079
 
 
1080
static void
 
1081
test (gboolean reliable, StreamApi stream_api, gsize n_bytes, guint n_messages,
 
1082
    BufferSizeStrategy transmit_buffer_size_strategy,
 
1083
    BufferCountStrategy transmit_buffer_count_strategy,
 
1084
    MessageCountStrategy transmit_message_count_strategy,
 
1085
    BufferSizeStrategy receive_buffer_size_strategy,
 
1086
    BufferCountStrategy receive_buffer_count_strategy,
 
1087
    MessageCountStrategy receive_message_count_strategy,
 
1088
    BufferDataStrategy buffer_data_strategy,
 
1089
    guint32 transmit_seed, guint32 receive_seed,
 
1090
    guint deadlock_timeout)
 
1091
{
 
1092
  TestData l_data, r_data;
 
1093
 
 
1094
  /* Indexed by StreamApi. */
 
1095
  const TestIOStreamCallbacks callbacks[] = {
 
1096
    { read_thread_agent_cb,
 
1097
      write_thread_agent_cb, NULL, NULL, },  /* STREAM_AGENT */
 
1098
    { read_thread_agent_nonblocking_cb, write_thread_agent_nonblocking_cb,
 
1099
      NULL, NULL, },  /* STREAM_AGENT_NONBLOCKING */
 
1100
    { read_thread_gio_cb, write_thread_gio_cb, NULL, NULL, },  /* STREAM_GIO */
 
1101
    { read_thread_gsource_cb, write_thread_gsource_cb,
 
1102
      NULL, NULL },  /* STREAM_GSOURCE */
 
1103
  };
 
1104
 
 
1105
  test_data_init (&l_data, reliable, stream_api, n_bytes, n_messages,
 
1106
      transmit_buffer_size_strategy, transmit_buffer_count_strategy,
 
1107
      transmit_message_count_strategy, receive_buffer_size_strategy,
 
1108
      receive_buffer_count_strategy, receive_message_count_strategy,
 
1109
      buffer_data_strategy, transmit_seed, receive_seed,
 
1110
      &r_data.received_bytes, &r_data.received_messages);
 
1111
  test_data_init (&r_data, reliable, stream_api, n_bytes, n_messages,
 
1112
      transmit_buffer_size_strategy, transmit_buffer_count_strategy,
 
1113
      transmit_message_count_strategy, receive_buffer_size_strategy,
 
1114
      receive_buffer_count_strategy, receive_message_count_strategy,
 
1115
      buffer_data_strategy, transmit_seed, receive_seed,
 
1116
      &l_data.received_bytes, &l_data.received_messages);
 
1117
 
 
1118
  run_io_stream_test (deadlock_timeout, reliable, &callbacks[stream_api],
 
1119
      &l_data, NULL, &r_data, NULL);
 
1120
 
 
1121
  test_data_clear (&r_data);
 
1122
  test_data_clear (&l_data);
 
1123
}
 
1124
 
 
1125
/* Options with default values. */
 
1126
guint32 option_transmit_seed = 0;
 
1127
guint32 option_receive_seed = 0;
 
1128
gsize option_n_bytes = 10000;
 
1129
guint option_n_messages = 50;
 
1130
guint option_timeout = 1200;  /* seconds */
 
1131
gboolean option_long_mode = FALSE;
 
1132
 
 
1133
static GOptionEntry entries[] = {
 
1134
  { "transmit-seed", 0, 0, G_OPTION_ARG_INT, &option_transmit_seed,
 
1135
    "Seed for transmission RNG", "S" },
 
1136
  { "receive-seed", 0, 0, G_OPTION_ARG_INT, &option_receive_seed,
 
1137
    "Seed for reception RNG", "S" },
 
1138
  { "n-bytes", 'n', 0, G_OPTION_ARG_INT64, &option_n_bytes,
 
1139
    "Number of bytes to send in each test (default 10000)", "N" },
 
1140
  { "n-messages", 'm', 0, G_OPTION_ARG_INT64, &option_n_messages,
 
1141
    "Number of messages to send in each test (default 50)", "M" },
 
1142
  { "timeout", 't', 0, G_OPTION_ARG_INT, &option_timeout,
 
1143
    "Deadlock detection timeout length, in seconds (default: 1200)", "S" },
 
1144
  { "long-mode", 'l', 0, G_OPTION_ARG_NONE, &option_long_mode,
 
1145
    "Enable all tests, rather than a fast subset", NULL },
 
1146
  { NULL },
 
1147
};
 
1148
 
 
1149
int
 
1150
main (int argc, char *argv[])
 
1151
{
 
1152
  gboolean reliable;
 
1153
  StreamApi stream_api;
 
1154
  BufferSizeStrategy transmit_buffer_size_strategy;
 
1155
  BufferCountStrategy transmit_buffer_count_strategy;
 
1156
  MessageCountStrategy transmit_message_count_strategy;
 
1157
  BufferSizeStrategy receive_buffer_size_strategy;
 
1158
  BufferCountStrategy receive_buffer_count_strategy;
 
1159
  MessageCountStrategy receive_message_count_strategy;
 
1160
  BufferDataStrategy buffer_data_strategy;
 
1161
  guint32 transmit_seed;
 
1162
  guint32 receive_seed;
 
1163
  gsize n_bytes;
 
1164
  guint n_messages;
 
1165
  guint deadlock_timeout;
 
1166
  gboolean long_mode;
 
1167
  GOptionContext *context;
 
1168
  GError *error = NULL;
 
1169
 
 
1170
  /* Argument parsing. Allow some of the test parameters to be specified on the
 
1171
   * command line. */
 
1172
  context = g_option_context_new ("— test send()/recv() correctness");
 
1173
  g_option_context_add_main_entries (context, entries, NULL);
 
1174
 
 
1175
  if (!g_option_context_parse (context, &argc, &argv, &error)) {
 
1176
    g_printerr ("Option parsing failed: %s\n", error->message);
 
1177
    g_error_free (error);
 
1178
    exit (1);
 
1179
  }
 
1180
 
 
1181
  /* Set up the defaults. */
 
1182
  transmit_seed = option_transmit_seed;
 
1183
  receive_seed = option_receive_seed;
 
1184
  n_bytes = option_n_bytes;
 
1185
  n_messages = option_n_messages;
 
1186
  deadlock_timeout = option_timeout;
 
1187
  long_mode = option_long_mode;
 
1188
 
 
1189
#ifdef G_OS_WIN32
 
1190
  WSADATA w;
 
1191
  WSAStartup (0x0202, &w);
 
1192
#endif
 
1193
  g_type_init ();
 
1194
  g_thread_init (NULL);
 
1195
 
 
1196
  if (!long_mode) {
 
1197
    /* Quick mode. Just test each of the stream APIs in reliable and
 
1198
     * non-reliable mode, with a single pair of buffer strategies, and a single
 
1199
     * data strategy. */
 
1200
 
 
1201
    /* Reliability. */
 
1202
    for (reliable = 0; reliable < 2; reliable++) {
 
1203
      /* Stream API. */
 
1204
      for (stream_api = 0;
 
1205
           (guint) stream_api < STREAM_API_N_ELEMENTS;
 
1206
           stream_api++) {
 
1207
        /* GIO streams must always be reliable. */
 
1208
        if (!reliable && stream_api_is_reliable_only (stream_api))
 
1209
          continue;
 
1210
 
 
1211
        /* Non-reliable socket receives require large buffers. */
 
1212
        if (reliable) {
 
1213
          receive_buffer_size_strategy = BUFFER_SIZE_RANDOM;
 
1214
        } else {
 
1215
          receive_buffer_size_strategy = BUFFER_SIZE_CONSTANT_LARGE;
 
1216
        }
 
1217
 
 
1218
        transmit_buffer_size_strategy = BUFFER_SIZE_RANDOM;
 
1219
        buffer_data_strategy = BUFFER_DATA_PSEUDO_RANDOM;
 
1220
 
 
1221
        if (stream_api_supports_vectored_io (stream_api)) {
 
1222
          transmit_buffer_count_strategy = BUFFER_COUNT_RANDOM;
 
1223
          transmit_message_count_strategy = MESSAGE_COUNT_RANDOM;
 
1224
          receive_buffer_count_strategy = BUFFER_COUNT_RANDOM;
 
1225
          receive_message_count_strategy = MESSAGE_COUNT_RANDOM;
 
1226
        } else {
 
1227
          transmit_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
 
1228
          transmit_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
 
1229
          receive_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
 
1230
          receive_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
 
1231
        }
 
1232
 
 
1233
        g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
 
1234
            "%u, %u, %u, %u)…",
 
1235
            reliable, stream_api, n_bytes, n_messages,
 
1236
            transmit_buffer_size_strategy,
 
1237
            receive_buffer_size_strategy, buffer_data_strategy,
 
1238
            transmit_seed, receive_seed);
 
1239
        test (reliable, stream_api, n_bytes, n_messages,
 
1240
            transmit_buffer_size_strategy,
 
1241
            transmit_buffer_count_strategy, transmit_message_count_strategy,
 
1242
            receive_buffer_size_strategy, receive_buffer_count_strategy,
 
1243
            receive_message_count_strategy, buffer_data_strategy,
 
1244
            transmit_seed, receive_seed,
 
1245
            deadlock_timeout);
 
1246
      }
 
1247
    }
 
1248
 
 
1249
    goto done;
 
1250
  }
 
1251
 
 
1252
#define STRATEGY_LOOP(V, L) for (V = 0; (guint) V < L##_N_ELEMENTS; V++)
 
1253
  STRATEGY_LOOP(transmit_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
 
1254
  STRATEGY_LOOP(transmit_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
 
1255
  STRATEGY_LOOP(transmit_message_count_strategy, MESSAGE_COUNT_STRATEGY)
 
1256
  STRATEGY_LOOP(receive_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
 
1257
  STRATEGY_LOOP(receive_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
 
1258
  STRATEGY_LOOP(receive_message_count_strategy, MESSAGE_COUNT_STRATEGY)
 
1259
  STRATEGY_LOOP(buffer_data_strategy, BUFFER_DATA_STRATEGY)
 
1260
  /* Reliability. */
 
1261
  for (reliable = 0; reliable < 2; reliable++) {
 
1262
    /* Stream API. */
 
1263
    for (stream_api = 0;
 
1264
         (guint) stream_api < STREAM_API_N_ELEMENTS;
 
1265
         stream_api++) {
 
1266
      /* GIO streams must always be reliable. */
 
1267
      if (!reliable && stream_api_is_reliable_only (stream_api))
 
1268
        continue;
 
1269
 
 
1270
      /* Non-reliable socket receives require large buffers. We don’t claim to
 
1271
       * support using them with small (< 65536B) buffers, so don’t test
 
1272
       * them. */
 
1273
      if (!reliable &&
 
1274
          receive_buffer_size_strategy != BUFFER_SIZE_CONSTANT_LARGE)
 
1275
        continue;
 
1276
 
 
1277
      /* Non-reliable socket transmits will always block with huge buffers. */
 
1278
      if (!reliable &&
 
1279
          transmit_buffer_size_strategy == BUFFER_SIZE_CONSTANT_LARGE)
 
1280
        continue;
 
1281
 
 
1282
      /* Stream APIs which don’t support vectored I/O must not be passed
 
1283
       * I/O vectors. */
 
1284
      if (!stream_api_supports_vectored_io (stream_api) &&
 
1285
          (transmit_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
 
1286
           transmit_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE ||
 
1287
           receive_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
 
1288
           receive_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE))
 
1289
        continue;
 
1290
 
 
1291
      g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
 
1292
          "%u, %u, %u, %u, %u, %u, %u, %u)…",
 
1293
          reliable, stream_api, n_bytes, n_messages,
 
1294
          transmit_buffer_size_strategy,
 
1295
          transmit_buffer_count_strategy, transmit_message_count_strategy,
 
1296
          receive_buffer_size_strategy, receive_buffer_count_strategy,
 
1297
          receive_message_count_strategy, buffer_data_strategy,
 
1298
          transmit_seed, receive_seed);
 
1299
      test (reliable, stream_api, n_bytes, n_messages,
 
1300
          transmit_buffer_size_strategy,
 
1301
          transmit_buffer_count_strategy, transmit_message_count_strategy,
 
1302
          receive_buffer_size_strategy, receive_buffer_count_strategy,
 
1303
          receive_message_count_strategy, buffer_data_strategy,
 
1304
          transmit_seed, receive_seed,
 
1305
          deadlock_timeout);
 
1306
    }
 
1307
  }
 
1308
 
 
1309
done:
 
1310
#ifdef G_OS_WIN32
 
1311
  WSACleanup ();
 
1312
#endif
 
1313
 
 
1314
  return 0;
 
1315
}