~ubuntu-branches/ubuntu/utopic/libevent/utopic

« back to all changes in this revision

Viewing changes to test/regress_thread.c

  • Committer: Package Import Robot
  • Author(s): Anibal Monsalve Salazar
  • Date: 2011-11-28 15:39:09 UTC
  • mfrom: (1.3.5) (5.1.7 experimental)
  • Revision ID: package-import@ubuntu.com-20111128153909-y8bo0l4y4kzdqluz
Tags: 2.0.16-stable-1
* New upstream version 2.0.16-stable
* Uploading to unstable, see http://bugs.debian.org/631018

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2007-2011 Niels Provos and Nick Mathewson
 
3
 *
 
4
 * Redistribution and use in source and binary forms, with or without
 
5
 * modification, are permitted provided that the following conditions
 
6
 * are met:
 
7
 * 1. Redistributions of source code must retain the above copyright
 
8
 *    notice, this list of conditions and the following disclaimer.
 
9
 * 2. Redistributions in binary form must reproduce the above copyright
 
10
 *    notice, this list of conditions and the following disclaimer in the
 
11
 *    documentation and/or other materials provided with the distribution.
 
12
 * 3. The name of the author may not be used to endorse or promote products
 
13
 *    derived from this software without specific prior written permission.
 
14
 *
 
15
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 
16
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
17
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
18
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 
19
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
20
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
21
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
22
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
23
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
24
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
25
 */
 
26
 
 
27
/* The old tests here need assertions to work. */
 
28
#undef NDEBUG
 
29
 
 
30
#include "event2/event-config.h"
 
31
 
 
32
#include <sys/types.h>
 
33
#include <stdio.h>
 
34
#include <stdlib.h>
 
35
#include <string.h>
 
36
#ifdef _EVENT_HAVE_UNISTD_H
 
37
#include <unistd.h>
 
38
#endif
 
39
#ifdef _EVENT_HAVE_SYS_WAIT_H
 
40
#include <sys/wait.h>
 
41
#endif
 
42
 
 
43
#ifdef _EVENT_HAVE_PTHREADS
 
44
#include <pthread.h>
 
45
#elif defined(WIN32)
 
46
#include <process.h>
 
47
#endif
 
48
#include <assert.h>
 
49
#ifdef _EVENT_HAVE_UNISTD_H
 
50
#include <unistd.h>
 
51
#endif
 
52
#include <time.h>
 
53
 
 
54
#include "sys/queue.h"
 
55
 
 
56
#include "event2/util.h"
 
57
#include "event2/event.h"
 
58
#include "event2/event_struct.h"
 
59
#include "event2/thread.h"
 
60
#include "evthread-internal.h"
 
61
#include "event-internal.h"
 
62
#include "defer-internal.h"
 
63
#include "regress.h"
 
64
#include "tinytest_macros.h"
 
65
 
 
66
/*
 * Minimal thread shim used by the tests in this file.
 *
 *   THREAD_T         type of a thread handle
 *   THREAD_FN        return type of a thread entry point
 *   THREAD_RETURN()  return statement for a thread entry point
 *   THREAD_START()   start `fn(arg)` and store its handle in `threadvar`
 *   THREAD_JOIN()    wait for thread `th` to finish
 *
 * THREAD_START() is an expression in the pthreads build but a statement
 * in the other build, so its result cannot be tested portably.
 */
#ifdef _EVENT_HAVE_PTHREADS
#define THREAD_T pthread_t
#define THREAD_FN void *
#define THREAD_RETURN() return (NULL)
#define THREAD_START(threadvar, fn, arg) \
	pthread_create(&(threadvar), NULL, (fn), (arg))
#define THREAD_JOIN(th) pthread_join((th), NULL)
#else
#define THREAD_T HANDLE
#define THREAD_FN unsigned __stdcall
#define THREAD_RETURN() return (0)
#define THREAD_START(threadvar, fn, arg) do {				\
	uintptr_t threadhandle = _beginthreadex(NULL, 0, (fn), (arg), 0, NULL); \
	(threadvar) = (HANDLE) threadhandle;				\
	} while (0)
#define THREAD_JOIN(th) WaitForSingleObject((th), INFINITE)
#endif
 
83
 
 
84
/* A lock together with the condition variable that is waited on while
 * that lock is held.  Both members are opaque handles filled in by
 * EVTHREAD_ALLOC_LOCK() and EVTHREAD_ALLOC_COND(). */
struct cond_wait {
	void *lock;	/* passed to EVLOCK_LOCK() / EVLOCK_UNLOCK() */
	void *cond;	/* passed to the EVTHREAD_COND_*() macros */
};
 
88
 
 
89
static void
 
90
wake_all_timeout(evutil_socket_t fd, short what, void *arg)
 
91
{
 
92
        struct cond_wait *cw = arg;
 
93
        EVLOCK_LOCK(cw->lock, 0);
 
94
        EVTHREAD_COND_BROADCAST(cw->cond);
 
95
        EVLOCK_UNLOCK(cw->lock, 0);
 
96
 
 
97
}
 
98
 
 
99
static void
 
100
wake_one_timeout(evutil_socket_t fd, short what, void *arg)
 
101
{
 
102
        struct cond_wait *cw = arg;
 
103
        EVLOCK_LOCK(cw->lock, 0);
 
104
        EVTHREAD_COND_SIGNAL(cw->cond);
 
105
        EVLOCK_UNLOCK(cw->lock, 0);
 
106
}
 
107
 
 
108
/* Sizing of the thread_basic test: number of worker threads, and how many
 * timer/condition round trips each worker performs. */
#define NUM_THREADS     100
#define NUM_ITERATIONS  100
/* Lock guarding `count`; allocated and released by thread_basic(). */
void *count_lock;
/* Total number of round trips completed by all basic_thread() workers. */
static int count;
 
112
 
 
113
static THREAD_FN
 
114
basic_thread(void *arg)
 
115
{
 
116
        struct cond_wait cw;
 
117
        struct event_base *base = arg;
 
118
        struct event ev;
 
119
        int i = 0;
 
120
 
 
121
        EVTHREAD_ALLOC_LOCK(cw.lock, 0);
 
122
        EVTHREAD_ALLOC_COND(cw.cond);
 
123
        assert(cw.lock);
 
124
        assert(cw.cond);
 
125
 
 
126
        evtimer_assign(&ev, base, wake_all_timeout, &cw);
 
127
        for (i = 0; i < NUM_ITERATIONS; i++) {
 
128
                struct timeval tv;
 
129
                evutil_timerclear(&tv);
 
130
                tv.tv_sec = 0;
 
131
                tv.tv_usec = 3000;
 
132
 
 
133
                EVLOCK_LOCK(cw.lock, 0);
 
134
                /* we need to make sure that event does not happen before
 
135
                 * we get to wait on the conditional variable */
 
136
                assert(evtimer_add(&ev, &tv) == 0);
 
137
 
 
138
                assert(EVTHREAD_COND_WAIT(cw.cond, cw.lock) == 0);
 
139
                EVLOCK_UNLOCK(cw.lock, 0);
 
140
 
 
141
                EVLOCK_LOCK(count_lock, 0);
 
142
                ++count;
 
143
                EVLOCK_UNLOCK(count_lock, 0);
 
144
        }
 
145
 
 
146
        /* exit the loop only if all threads fired all timeouts */
 
147
        EVLOCK_LOCK(count_lock, 0);
 
148
        if (count >= NUM_THREADS * NUM_ITERATIONS)
 
149
                event_base_loopexit(base, NULL);
 
150
        EVLOCK_UNLOCK(count_lock, 0);
 
151
 
 
152
        EVTHREAD_FREE_LOCK(cw.lock, 0);
 
153
        EVTHREAD_FREE_COND(cw.cond);
 
154
 
 
155
        THREAD_RETURN();
 
156
}
 
157
 
 
158
static int notification_fd_used = 0;
 
159
#ifndef WIN32
 
160
static int got_sigchld = 0;
 
161
static void
 
162
sigchld_cb(evutil_socket_t fd, short event, void *arg)
 
163
{
 
164
        struct timeval tv;
 
165
        struct event_base *base = arg;
 
166
 
 
167
        got_sigchld++;
 
168
        tv.tv_usec = 100000;
 
169
        tv.tv_sec = 0;
 
170
        event_base_loopexit(base, &tv);
 
171
}
 
172
 
 
173
 
 
174
static void
 
175
notify_fd_cb(evutil_socket_t fd, short event, void *arg)
 
176
{
 
177
        ++notification_fd_used;
 
178
}
 
179
#endif
 
180
 
 
181
static void
 
182
thread_basic(void *arg)
 
183
{
 
184
        THREAD_T threads[NUM_THREADS];
 
185
        struct event ev;
 
186
        struct timeval tv;
 
187
        int i;
 
188
        struct basic_test_data *data = arg;
 
189
        struct event_base *base = data->base;
 
190
 
 
191
        struct event *notification_event = NULL;
 
192
        struct event *sigchld_event = NULL;
 
193
 
 
194
        EVTHREAD_ALLOC_LOCK(count_lock, 0);
 
195
        tt_assert(count_lock);
 
196
 
 
197
        tt_assert(base);
 
198
        if (evthread_make_base_notifiable(base)<0) {
 
199
                tt_abort_msg("Couldn't make base notifiable!");
 
200
        }
 
201
 
 
202
#ifndef WIN32
 
203
        if (data->setup_data && !strcmp(data->setup_data, "forking")) {
 
204
                pid_t pid;
 
205
                int status;
 
206
                sigchld_event = evsignal_new(base, SIGCHLD, sigchld_cb, base);
 
207
                /* This piggybacks on the th_notify_fd weirdly, and looks
 
208
                 * inside libevent internals.  Not a good idea in non-testing
 
209
                 * code! */
 
210
                notification_event = event_new(base,
 
211
                    base->th_notify_fd[0], EV_READ|EV_PERSIST, notify_fd_cb,
 
212
                    NULL);
 
213
                event_add(sigchld_event, NULL);
 
214
                event_add(notification_event, NULL);
 
215
 
 
216
                if ((pid = fork()) == 0) {
 
217
                        event_del(notification_event);
 
218
                        if (event_reinit(base) < 0) {
 
219
                                TT_FAIL(("reinit"));
 
220
                                exit(1);
 
221
                        }
 
222
                        event_assign(notification_event, base,
 
223
                            base->th_notify_fd[0], EV_READ|EV_PERSIST,
 
224
                            notify_fd_cb, NULL);
 
225
                        event_add(notification_event, NULL);
 
226
                        goto child;
 
227
                }
 
228
 
 
229
                event_base_dispatch(base);
 
230
 
 
231
                if (waitpid(pid, &status, 0) == -1)
 
232
                        tt_abort_perror("waitpid");
 
233
                TT_BLATHER(("Waitpid okay\n"));
 
234
 
 
235
                tt_assert(got_sigchld);
 
236
                tt_int_op(notification_fd_used, ==, 0);
 
237
 
 
238
                goto end;
 
239
        }
 
240
 
 
241
child:
 
242
#endif
 
243
        for (i = 0; i < NUM_THREADS; ++i)
 
244
                THREAD_START(threads[i], basic_thread, base);
 
245
 
 
246
        evtimer_assign(&ev, base, NULL, NULL);
 
247
        evutil_timerclear(&tv);
 
248
        tv.tv_sec = 1000;
 
249
        event_add(&ev, &tv);
 
250
 
 
251
        event_base_dispatch(base);
 
252
 
 
253
        for (i = 0; i < NUM_THREADS; ++i)
 
254
                THREAD_JOIN(threads[i]);
 
255
 
 
256
        event_del(&ev);
 
257
 
 
258
        tt_int_op(count, ==, NUM_THREADS * NUM_ITERATIONS);
 
259
 
 
260
        EVTHREAD_FREE_LOCK(count_lock, 0);
 
261
 
 
262
        TT_BLATHER(("notifiations==%d", notification_fd_used));
 
263
 
 
264
end:
 
265
 
 
266
        if (notification_event)
 
267
                event_free(notification_event);
 
268
        if (sigchld_event)
 
269
                event_free(sigchld_event);
 
270
}
 
271
 
 
272
/* The condition-variable test below uses far fewer threads than
 * thread_basic(). */
#undef NUM_THREADS
#define NUM_THREADS 10
 
274
 
 
275
/* Per-thread bookkeeping for thread_conditions_simple(): what a
 * wait_for_condition() thread waits on, and what it observed. */
struct alerted_record {
	struct cond_wait *cond;		/* lock/condition pair to wait on */
	struct timeval delay;		/* wait timeout; all-zero means wait
					 * without a timeout */
	struct timeval alerted_at;	/* wall-clock time the wait returned */
	int timed_out;			/* set to 1 when the wait returned 1 */
};
 
281
 
 
282
static THREAD_FN
 
283
wait_for_condition(void *arg)
 
284
{
 
285
        struct alerted_record *rec = arg;
 
286
        int r;
 
287
 
 
288
        EVLOCK_LOCK(rec->cond->lock, 0);
 
289
        if (rec->delay.tv_sec || rec->delay.tv_usec) {
 
290
                r = EVTHREAD_COND_WAIT_TIMED(rec->cond->cond, rec->cond->lock,
 
291
                    &rec->delay);
 
292
        } else {
 
293
                r = EVTHREAD_COND_WAIT(rec->cond->cond, rec->cond->lock);
 
294
        }
 
295
        EVLOCK_UNLOCK(rec->cond->lock, 0);
 
296
 
 
297
        evutil_gettimeofday(&rec->alerted_at, NULL);
 
298
        if (r == 1)
 
299
                rec->timed_out = 1;
 
300
 
 
301
        THREAD_RETURN();
 
302
}
 
303
 
 
304
static void
 
305
thread_conditions_simple(void *arg)
 
306
{
 
307
        struct timeval tv_signal, tv_timeout, tv_broadcast;
 
308
        struct alerted_record alerted[NUM_THREADS];
 
309
        THREAD_T threads[NUM_THREADS];
 
310
        struct cond_wait cond;
 
311
        int i;
 
312
        struct timeval launched_at;
 
313
        struct event wake_one;
 
314
        struct event wake_all;
 
315
        struct basic_test_data *data = arg;
 
316
        struct event_base *base = data->base;
 
317
        int n_timed_out=0, n_signal=0, n_broadcast=0;
 
318
 
 
319
        tv_signal.tv_sec = tv_timeout.tv_sec = tv_broadcast.tv_sec = 0;
 
320
        tv_signal.tv_usec = 30*1000;
 
321
        tv_timeout.tv_usec = 150*1000;
 
322
        tv_broadcast.tv_usec = 500*1000;
 
323
 
 
324
        EVTHREAD_ALLOC_LOCK(cond.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 
325
        EVTHREAD_ALLOC_COND(cond.cond);
 
326
        tt_assert(cond.lock);
 
327
        tt_assert(cond.cond);
 
328
        for (i = 0; i < NUM_THREADS; ++i) {
 
329
                memset(&alerted[i], 0, sizeof(struct alerted_record));
 
330
                alerted[i].cond = &cond;
 
331
        }
 
332
 
 
333
        /* Threads 5 and 6 will be allowed to time out */
 
334
        memcpy(&alerted[5].delay, &tv_timeout, sizeof(tv_timeout));
 
335
        memcpy(&alerted[6].delay, &tv_timeout, sizeof(tv_timeout));
 
336
 
 
337
        evtimer_assign(&wake_one, base, wake_one_timeout, &cond);
 
338
        evtimer_assign(&wake_all, base, wake_all_timeout, &cond);
 
339
 
 
340
        evutil_gettimeofday(&launched_at, NULL);
 
341
 
 
342
        /* Launch the threads... */
 
343
        for (i = 0; i < NUM_THREADS; ++i) {
 
344
                THREAD_START(threads[i], wait_for_condition, &alerted[i]);
 
345
        }
 
346
 
 
347
        /* Start the timers... */
 
348
        tt_int_op(event_add(&wake_one, &tv_signal), ==, 0);
 
349
        tt_int_op(event_add(&wake_all, &tv_broadcast), ==, 0);
 
350
 
 
351
        /* And run for a bit... */
 
352
        event_base_dispatch(base);
 
353
 
 
354
        /* And wait till the threads are done. */
 
355
        for (i = 0; i < NUM_THREADS; ++i)
 
356
                THREAD_JOIN(threads[i]);
 
357
 
 
358
        /* Now, let's see what happened. At least one of 5 or 6 should
 
359
         * have timed out. */
 
360
        n_timed_out = alerted[5].timed_out + alerted[6].timed_out;
 
361
        tt_int_op(n_timed_out, >=, 1);
 
362
        tt_int_op(n_timed_out, <=, 2);
 
363
 
 
364
        for (i = 0; i < NUM_THREADS; ++i) {
 
365
                const struct timeval *target_delay;
 
366
                struct timeval target_time, actual_delay;
 
367
                if (alerted[i].timed_out) {
 
368
                        TT_BLATHER(("%d looks like a timeout\n", i));
 
369
                        target_delay = &tv_timeout;
 
370
                        tt_assert(i == 5 || i == 6);
 
371
                } else if (evutil_timerisset(&alerted[i].alerted_at)) {
 
372
                        long diff1,diff2;
 
373
                        evutil_timersub(&alerted[i].alerted_at,
 
374
                            &launched_at, &actual_delay);
 
375
                        diff1 = timeval_msec_diff(&actual_delay,
 
376
                            &tv_signal);
 
377
                        diff2 = timeval_msec_diff(&actual_delay,
 
378
                            &tv_broadcast);
 
379
                        if (abs(diff1) < abs(diff2)) {
 
380
                                TT_BLATHER(("%d looks like a signal\n", i));
 
381
                                target_delay = &tv_signal;
 
382
                                ++n_signal;
 
383
                        } else {
 
384
                                TT_BLATHER(("%d looks like a broadcast\n", i));
 
385
                                target_delay = &tv_broadcast;
 
386
                                ++n_broadcast;
 
387
                        }
 
388
                } else {
 
389
                        TT_FAIL(("Thread %d never got woken", i));
 
390
                        continue;
 
391
                }
 
392
                evutil_timeradd(target_delay, &launched_at, &target_time);
 
393
                test_timeval_diff_leq(&target_time, &alerted[i].alerted_at,
 
394
                    0, 50);
 
395
        }
 
396
        tt_int_op(n_broadcast + n_signal + n_timed_out, ==, NUM_THREADS);
 
397
        tt_int_op(n_signal, ==, 1);
 
398
 
 
399
end:
 
400
        ;
 
401
}
 
402
 
 
403
/* Sizing of the deferred-callback test: callbacks scheduled per loader
 * thread, and number of loader threads. */
#define CB_COUNT 128
#define QUEUE_THREAD_COUNT 8

/* Sleep for `ms` milliseconds. */
#ifdef WIN32
#define SLEEP_MS(ms) Sleep(ms)
#else
#define SLEEP_MS(ms) usleep((ms) * 1000)
#endif
 
411
 
 
412
struct deferred_test_data {
 
413
        struct deferred_cb cbs[CB_COUNT];
 
414
        struct deferred_cb_queue *queue;
 
415
};
 
416
 
 
417
static time_t timer_start = 0;
 
418
static time_t timer_end = 0;
 
419
static unsigned callback_count = 0;
 
420
static THREAD_T load_threads[QUEUE_THREAD_COUNT];
 
421
static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];
 
422
 
 
423
static void
 
424
deferred_callback(struct deferred_cb *cb, void *arg)
 
425
{
 
426
        SLEEP_MS(1);
 
427
        callback_count += 1;
 
428
}
 
429
 
 
430
static THREAD_FN
 
431
load_deferred_queue(void *arg)
 
432
{
 
433
        struct deferred_test_data *data = arg;
 
434
        size_t i;
 
435
 
 
436
        for (i = 0; i < CB_COUNT; ++i) {
 
437
                event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL);
 
438
                event_deferred_cb_schedule(data->queue, &data->cbs[i]);
 
439
                SLEEP_MS(1);
 
440
        }
 
441
 
 
442
        THREAD_RETURN();
 
443
}
 
444
 
 
445
static void
 
446
timer_callback(evutil_socket_t fd, short what, void *arg)
 
447
{
 
448
        timer_end = time(NULL);
 
449
}
 
450
 
 
451
static void
 
452
start_threads_callback(evutil_socket_t fd, short what, void *arg)
 
453
{
 
454
        int i;
 
455
 
 
456
        for (i = 0; i < QUEUE_THREAD_COUNT; ++i) {
 
457
                THREAD_START(load_threads[i], load_deferred_queue,
 
458
                                &deferred_data[i]);
 
459
        }
 
460
}
 
461
 
 
462
static void
 
463
thread_deferred_cb_skew(void *arg)
 
464
{
 
465
        struct basic_test_data *data = arg;
 
466
        struct timeval tv_timer = {4, 0};
 
467
        struct deferred_cb_queue *queue;
 
468
        time_t elapsed;
 
469
        int i;
 
470
 
 
471
        queue = event_base_get_deferred_cb_queue(data->base);
 
472
        tt_assert(queue);
 
473
 
 
474
        for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
 
475
                deferred_data[i].queue = queue;
 
476
 
 
477
        timer_start = time(NULL);
 
478
        event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
 
479
                        &tv_timer);
 
480
        event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
 
481
                        NULL, NULL);
 
482
        event_base_dispatch(data->base);
 
483
 
 
484
        elapsed = timer_end - timer_start;
 
485
        TT_BLATHER(("callback count, %u", callback_count));
 
486
        TT_BLATHER(("elapsed time, %u", (unsigned)elapsed));
 
487
        /* XXX be more intelligent here.  just make sure skew is
 
488
         * within 2 seconds for now. */
 
489
        tt_assert(elapsed >= 4 && elapsed <= 6);
 
490
 
 
491
end:
 
492
        for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
 
493
                THREAD_JOIN(load_threads[i]);
 
494
}
 
495
 
 
496
#define TEST(name)                                                      \
 
497
        { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE,   \
 
498
          &basic_setup, NULL }
 
499
 
 
500
struct testcase_t thread_testcases[] = {
 
501
        { "basic", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE,
 
502
          &basic_setup, NULL },
 
503
#ifndef WIN32
 
504
        { "forking", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE,
 
505
          &basic_setup, (char*)"forking" },
 
506
#endif
 
507
        TEST(conditions_simple),
 
508
        TEST(deferred_cb_skew),
 
509
        END_OF_TESTCASES
 
510
};
 
511