2
* This file is part of the Nice GLib ICE library.
4
* (C) 2014 Collabora Ltd.
5
* Contact: Philip Withnall
7
* The contents of this file are subject to the Mozilla Public License Version
8
* 1.1 (the "License"); you may not use this file except in compliance with
9
* the License. You may obtain a copy of the License at
10
* http://www.mozilla.org/MPL/
12
* Software distributed under the License is distributed on an "AS IS" basis,
13
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14
* for the specific language governing rights and limitations under the
17
* The Original Code is the Nice GLib ICE library.
19
* The Initial Developers of the Original Code are Collabora Ltd and Nokia
20
* Corporation. All Rights Reserved.
23
* Philip Withnall, Collabora Ltd.
25
* Alternatively, the contents of this file may be used under the terms of the
26
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
27
* case the provisions of LGPL are applicable instead of those above. If you
28
* wish to allow use of your version of this file only under the terms of the
29
* LGPL and not to allow others to use your version of this file under the
30
* MPL, indicate your decision by deleting the provisions above and replace
31
* them with the notice and other provisions required by the LGPL. If you do
32
* not delete the provisions above, a recipient may use your version of this
33
* file under either the MPL or the LGPL.
40
#include "test-io-stream-common.h"
52
static gboolean timer_cb (gpointer pointer)
54
g_debug ("test-thread:%s: %p", G_STRFUNC, pointer);
56
/* note: should not be reached, abort */
57
g_debug ("ERROR: test has got stuck, aborting...");
62
wait_for_start (TestIOStreamThreadData *data)
64
g_mutex_lock (data->start_mutex);
65
(*data->start_count)--;
66
g_cond_broadcast (data->start_cond);
67
while (*data->start_count > 0)
68
g_cond_wait (data->start_cond, data->start_mutex);
69
g_mutex_unlock (data->start_mutex);
73
write_thread_cb (gpointer user_data)
75
TestIOStreamThreadData *data = user_data;
76
GMainContext *main_context;
77
GOutputStream *output_stream = NULL;
79
main_context = g_main_context_new ();
80
g_main_context_push_thread_default (main_context);
82
/* Synchronise thread starting. */
83
wait_for_start (data);
85
/* Wait for the stream to be writeable. */
86
g_mutex_lock (&data->write_mutex);
87
while (!(data->stream_open && data->stream_ready))
88
g_cond_wait (&data->write_cond, &data->write_mutex);
89
g_mutex_unlock (&data->write_mutex);
92
output_stream = g_io_stream_get_output_stream (data->io_stream);
93
data->callbacks->write_thread (output_stream, data);
95
g_main_context_pop_thread_default (main_context);
96
g_main_context_unref (main_context);
102
read_thread_cb (gpointer user_data)
104
TestIOStreamThreadData *data = user_data;
105
GMainContext *main_context;
106
GInputStream *input_stream = NULL;
108
main_context = g_main_context_new ();
109
g_main_context_push_thread_default (main_context);
111
/* Synchronise thread starting. */
112
wait_for_start (data);
115
input_stream = g_io_stream_get_input_stream (data->io_stream);
116
data->callbacks->read_thread (input_stream, data);
118
g_main_context_pop_thread_default (main_context);
119
g_main_context_unref (main_context);
125
main_thread_cb (gpointer user_data)
127
TestIOStreamThreadData *data = user_data;
129
g_main_context_push_thread_default (data->main_context);
131
/* Synchronise thread starting. */
132
wait_for_start (data);
134
/* Run the main context. */
135
g_main_loop_run (data->main_loop);
137
g_main_context_pop_thread_default (data->main_context);
143
candidate_gathering_done_cb (NiceAgent *agent, guint stream_id,
146
NiceAgent *other = g_object_get_data (G_OBJECT (agent), "other-agent");
147
gchar *ufrag = NULL, *password = NULL;
152
tmp = g_object_get_data (G_OBJECT (agent), "stream-id");
153
id = GPOINTER_TO_UINT (tmp);
154
tmp = g_object_get_data (G_OBJECT (other), "stream-id");
155
other_id = GPOINTER_TO_UINT (tmp);
157
nice_agent_get_local_credentials (agent, id, &ufrag, &password);
158
nice_agent_set_remote_credentials (other,
159
other_id, ufrag, password);
163
cands = nice_agent_get_local_candidates (agent, id, 1);
164
g_assert (cands != NULL);
166
nice_agent_set_remote_candidates (other, other_id, 1, cands);
168
for (i = cands; i; i = i->next)
169
nice_candidate_free ((NiceCandidate *) i->data);
170
g_slist_free (cands);
174
reliable_transport_writable_cb (NiceAgent *agent, guint stream_id,
175
guint component_id, gpointer user_data)
177
TestIOStreamThreadData *data = user_data;
179
g_assert (data->reliable);
181
/* Signal writeability. */
182
g_mutex_lock (&data->write_mutex);
183
data->stream_open = TRUE;
184
g_cond_broadcast (&data->write_cond);
185
g_mutex_unlock (&data->write_mutex);
187
if (data->callbacks->reliable_transport_writable != NULL) {
188
GIOStream *io_stream;
189
GOutputStream *output_stream;
191
io_stream = g_object_get_data (G_OBJECT (agent), "io-stream");
192
g_assert (io_stream != NULL);
193
output_stream = g_io_stream_get_output_stream (io_stream);
195
data->callbacks->reliable_transport_writable (output_stream, agent,
196
stream_id, component_id, data);
201
component_state_changed_cb (NiceAgent *agent, guint stream_id,
202
guint component_id, guint state, gpointer user_data)
204
TestIOStreamThreadData *data = user_data;
206
if (state != NICE_COMPONENT_STATE_READY)
209
/* Signal stream state. */
210
g_mutex_lock (&data->write_mutex);
211
data->stream_ready = TRUE;
212
g_cond_broadcast (&data->write_cond);
213
g_mutex_unlock (&data->write_mutex);
217
new_selected_pair_cb (NiceAgent *agent, guint stream_id, guint component_id,
218
gchar *lfoundation, gchar *rfoundation, gpointer user_data)
220
TestIOStreamThreadData *data = user_data;
222
if (data->callbacks->new_selected_pair != NULL) {
223
data->callbacks->new_selected_pair (agent, stream_id, component_id,
224
lfoundation, rfoundation, data);
229
create_agent (gboolean controlling_mode, TestIOStreamThreadData *data,
230
GMainContext **main_context, GMainLoop **main_loop)
233
NiceAddress base_addr;
234
const gchar *stun_server, *stun_server_port;
236
/* Create main contexts. */
237
*main_context = g_main_context_new ();
238
*main_loop = g_main_loop_new (*main_context, FALSE);
240
/* Use Google compatibility to ignore credentials. */
242
agent = nice_agent_new_reliable (*main_context, NICE_COMPATIBILITY_GOOGLE);
244
agent = nice_agent_new (*main_context, NICE_COMPATIBILITY_GOOGLE);
246
g_object_set (G_OBJECT (agent),
247
"controlling-mode", controlling_mode,
251
/* Specify which local interface to use. */
252
g_assert (nice_address_set_from_string (&base_addr, "127.0.0.1"));
253
nice_agent_add_local_address (agent, &base_addr);
255
/* Hook up signals. */
256
g_signal_connect (G_OBJECT (agent), "candidate-gathering-done",
257
(GCallback) candidate_gathering_done_cb,
258
GUINT_TO_POINTER (controlling_mode));
259
g_signal_connect (G_OBJECT (agent), "new-selected-pair",
260
(GCallback) new_selected_pair_cb, data);
261
g_signal_connect (G_OBJECT (agent), "component-state-changed",
262
(GCallback) component_state_changed_cb, data);
264
if (data->reliable) {
265
g_signal_connect (G_OBJECT (agent), "reliable-transport-writable",
266
(GCallback) reliable_transport_writable_cb, data);
268
data->stream_open = TRUE;
271
/* Configure the STUN server. */
272
stun_server = g_getenv ("NICE_STUN_SERVER");
273
stun_server_port = g_getenv ("NICE_STUN_SERVER_PORT");
275
if (stun_server != NULL) {
276
g_object_set (G_OBJECT (agent),
277
"stun-server", stun_server,
278
"stun-server-port", atoi (stun_server_port),
286
add_stream (NiceAgent *agent)
290
stream_id = nice_agent_add_stream (agent, 2);
291
g_assert (stream_id > 0);
293
g_object_set_data (G_OBJECT (agent), "stream-id",
294
GUINT_TO_POINTER (stream_id));
298
run_agent (TestIOStreamThreadData *data, NiceAgent *agent)
303
tmp = g_object_get_data (G_OBJECT (agent), "stream-id");
304
stream_id = GPOINTER_TO_UINT (tmp);
306
nice_agent_gather_candidates (agent, stream_id);
308
if (data->reliable) {
310
G_IO_STREAM (nice_agent_get_io_stream (agent, stream_id, 1));
311
g_object_set_data (G_OBJECT (agent), "io-stream", data->io_stream);
313
data->io_stream = NULL;
318
spawn_thread (const gchar *thread_name, GThreadFunc thread_func,
323
#if !GLIB_CHECK_VERSION(2, 31, 8)
324
thread = g_thread_create (thread_func, user_data, TRUE, NULL);
326
thread = g_thread_new (thread_name, thread_func, user_data);
335
run_io_stream_test (guint deadlock_timeout, gboolean reliable,
336
const TestIOStreamCallbacks *callbacks,
337
gpointer l_user_data, GDestroyNotify l_user_data_free,
338
gpointer r_user_data, GDestroyNotify r_user_data_free)
340
GMainLoop *error_loop;
341
GThread *l_main_thread, *r_main_thread;
342
GThread *l_write_thread, *l_read_thread, *r_write_thread, *r_read_thread;
343
TestIOStreamThreadData l_data, r_data;
346
guint start_count = 6;
348
g_mutex_init (&mutex);
351
error_loop = g_main_loop_new (NULL, FALSE);
353
/* Set up data structures. */
354
l_data.reliable = reliable;
355
l_data.error_loop = error_loop;
356
l_data.callbacks = callbacks;
357
l_data.user_data = l_user_data;
358
l_data.user_data_free = l_user_data_free;
360
g_cond_init (&l_data.write_cond);
361
g_mutex_init (&l_data.write_mutex);
362
l_data.stream_open = FALSE;
363
l_data.stream_ready = FALSE;
364
l_data.start_mutex = &mutex;
365
l_data.start_cond = &cond;
366
l_data.start_count = &start_count;
368
r_data.reliable = reliable;
369
r_data.error_loop = error_loop;
370
r_data.callbacks = callbacks;
371
r_data.user_data = r_user_data;
372
r_data.user_data_free = r_user_data_free;
374
g_cond_init (&r_data.write_cond);
375
g_mutex_init (&r_data.write_mutex);
376
r_data.stream_open = FALSE;
377
r_data.stream_ready = FALSE;
378
r_data.start_mutex = &mutex;
379
r_data.start_cond = &cond;
380
r_data.start_count = &start_count;
382
l_data.other = &r_data;
383
r_data.other = &l_data;
385
/* Create the L and R agents. */
386
l_data.agent = create_agent (TRUE, &l_data,
387
&l_data.main_context, &l_data.main_loop);
388
r_data.agent = create_agent (FALSE, &r_data,
389
&r_data.main_context, &r_data.main_loop);
391
g_object_set_data (G_OBJECT (l_data.agent), "other-agent", r_data.agent);
392
g_object_set_data (G_OBJECT (r_data.agent), "other-agent", l_data.agent);
394
/* Add a timer to catch deadlocks. */
395
g_timeout_add_seconds (deadlock_timeout, timer_cb, NULL);
397
l_main_thread = spawn_thread ("libnice L main", main_thread_cb, &l_data);
398
r_main_thread = spawn_thread ("libnice R main", main_thread_cb, &r_data);
400
add_stream (l_data.agent);
401
add_stream (r_data.agent);
402
run_agent (&l_data, l_data.agent);
403
run_agent (&r_data, r_data.agent);
405
l_read_thread = spawn_thread ("libnice L read", read_thread_cb, &l_data);
406
r_read_thread = spawn_thread ("libnice R read", read_thread_cb, &r_data);
408
if (callbacks->write_thread != NULL) {
409
l_write_thread = spawn_thread ("libnice L write", write_thread_cb, &l_data);
410
r_write_thread = spawn_thread ("libnice R write", write_thread_cb, &r_data);
412
g_mutex_lock (&mutex);
414
g_cond_broadcast (&cond);
415
g_mutex_unlock (&mutex);
417
l_write_thread = NULL;
418
r_write_thread = NULL;
421
/* Run loop for error timer */
422
g_main_loop_run (error_loop);
424
/* Clean up the main loops and threads. */
425
stop_main_loop (l_data.main_loop);
426
stop_main_loop (r_data.main_loop);
428
g_thread_join (l_read_thread);
429
g_thread_join (r_read_thread);
430
if (l_write_thread != NULL)
431
g_thread_join (l_write_thread);
432
if (r_write_thread != NULL)
433
g_thread_join (r_write_thread);
434
g_thread_join (l_main_thread);
435
g_thread_join (r_main_thread);
438
if (r_data.user_data_free != NULL)
439
r_data.user_data_free (r_data.user_data);
441
if (l_data.user_data_free != NULL)
442
l_data.user_data_free (l_data.user_data);
444
g_cond_clear (&r_data.write_cond);
445
g_mutex_clear (&r_data.write_mutex);
446
g_cond_clear (&l_data.write_cond);
447
g_mutex_clear (&l_data.write_mutex);
449
if (r_data.io_stream != NULL)
450
g_object_unref (r_data.io_stream);
451
if (l_data.io_stream != NULL)
452
g_object_unref (l_data.io_stream);
454
g_object_unref (r_data.agent);
455
g_object_unref (l_data.agent);
457
g_main_loop_unref (r_data.main_loop);
458
g_main_loop_unref (l_data.main_loop);
460
g_main_context_unref (r_data.main_context);
461
g_main_context_unref (l_data.main_context);
463
g_main_loop_unref (error_loop);
465
g_mutex_clear (&mutex);
466
g_cond_clear (&cond);
469
/* Once we’ve received all the expected bytes, wait to finish sending all bytes,
470
* then send and wait for the close message. Finally, remove the stream.
472
* This must only be called from the read thread implementation. */
474
check_for_termination (TestIOStreamThreadData *data, gsize *recv_count,
475
gsize *other_recv_count, gsize *send_count, gsize expected_recv_count)
480
/* Wait for transmission to complete. */
481
while (*send_count < expected_recv_count);
483
/* Send a close message. */
484
tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
485
stream_id = GPOINTER_TO_UINT (tmp);
487
/* Can't be certain enough to test for termination on non-reliable streams.
488
* There may be packet losses, etc
490
if (data->reliable) {
494
GError *error = NULL;
496
g_assert_cmpuint (*recv_count, >=, expected_recv_count);
498
buf_len = strlen ("Done");
499
memcpy (buf, "Done", buf_len);
500
len = nice_agent_send (data->agent, stream_id, 1, buf_len, (gchar *) buf);
501
g_assert_cmpint (len, ==, buf_len);
503
/* Wait for a done packet. */
504
len = nice_agent_recv (data->agent, stream_id, 1, buf, buf_len, NULL,
506
g_assert_no_error (error);
508
g_assert_cmpint (len, ==, strlen ("Done"));
509
g_assert_cmpint (memcmp (buf, "Done", strlen ("Done")), ==, 0);
511
*recv_count = *recv_count + 1;
514
/* Remove the stream and run away. */
515
nice_agent_remove_stream (data->agent, stream_id);
517
/* If both sides have finished, quit the test main loop. */
518
if (*recv_count > expected_recv_count &&
519
*other_recv_count > expected_recv_count) {
520
g_main_loop_quit (data->error_loop);
525
stop_main_loop (GMainLoop *loop)
527
GSource *src = g_idle_source_new ();
528
g_source_set_callback (src, (GSourceFunc) g_main_loop_quit,
529
g_main_loop_ref (loop), (GDestroyNotify) g_main_loop_unref);
530
g_source_attach (src, g_main_loop_get_context (loop));
531
g_source_unref (src);