~noskcaj/ubuntu/saucy/sflphone/merge-1.2.3-2

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject/pjlib/src/pj/ioqueue_epoll.c

  • Committer: Jackson Doak
  • Date: 2013-07-10 21:04:46 UTC
  • mfrom: (20.1.3 sid)
  • Revision ID: noskcaj@ubuntu.com-20130710210446-y8f587vza807icr9
Properly merged from upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* $Id: ioqueue_epoll.c 3553 2011-05-05 06:14:19Z nanang $ */
2
 
/* 
3
 
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
 
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5
 
 *
6
 
 * This program is free software; you can redistribute it and/or modify
7
 
 * it under the terms of the GNU General Public License as published by
8
 
 * the Free Software Foundation; either version 2 of the License, or
9
 
 * (at your option) any later version.
10
 
 *
11
 
 * This program is distributed in the hope that it will be useful,
12
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 * GNU General Public License for more details.
15
 
 *
16
 
 * You should have received a copy of the GNU General Public License
17
 
 * along with this program; if not, write to the Free Software
18
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
19
 
 */
20
 
/*
21
 
 * ioqueue_epoll.c
22
 
 *
23
 
 * This is the implementation of IOQueue framework using /dev/epoll
24
 
 * API in _both_ Linux user-mode and kernel-mode.
25
 
 */
26
 
 
27
 
#include <pj/ioqueue.h>
28
 
#include <pj/os.h>
29
 
#include <pj/lock.h>
30
 
#include <pj/log.h>
31
 
#include <pj/list.h>
32
 
#include <pj/pool.h>
33
 
#include <pj/string.h>
34
 
#include <pj/assert.h>
35
 
#include <pj/errno.h>
36
 
#include <pj/sock.h>
37
 
#include <pj/compat/socket.h>
38
 
 
39
 
#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
40
 
    /*
41
 
     * Linux user mode
42
 
     */
43
 
#   include <sys/epoll.h>
44
 
#   include <errno.h>
45
 
#   include <unistd.h>
46
 
 
47
 
#   define epoll_data           data.ptr
48
 
#   define epoll_data_type      void*
49
 
#   define ioctl_val_type       unsigned long
50
 
#   define getsockopt_val_ptr   int*
51
 
#   define os_getsockopt        getsockopt
52
 
#   define os_ioctl             ioctl
53
 
#   define os_read              read
54
 
#   define os_close             close
55
 
#   define os_epoll_create      epoll_create
56
 
#   define os_epoll_ctl         epoll_ctl
57
 
#   define os_epoll_wait        epoll_wait
58
 
#else
59
 
    /*
60
 
     * Linux kernel mode.
61
 
     */
62
 
#   include <linux/config.h>
63
 
#   include <linux/version.h>
64
 
#   if defined(MODVERSIONS)
65
 
#       include <linux/modversions.h>
66
 
#   endif
67
 
#   include <linux/kernel.h>
68
 
#   include <linux/poll.h>
69
 
#   include <linux/eventpoll.h>
70
 
#   include <linux/syscalls.h>
71
 
#   include <linux/errno.h>
72
 
#   include <linux/unistd.h>
73
 
#   include <asm/ioctls.h>
74
 
    enum EPOLL_EVENTS
75
 
    {
76
 
        EPOLLIN = 0x001,
77
 
        EPOLLOUT = 0x004,
78
 
        EPOLLERR = 0x008,
79
 
    };
80
 
#   define os_epoll_create              sys_epoll_create
81
 
    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
82
 
    {
83
 
        long rc;
84
 
        mm_segment_t oldfs = get_fs();
85
 
        set_fs(KERNEL_DS);
86
 
        rc = sys_epoll_ctl(epfd, op, fd, event);
87
 
        set_fs(oldfs);
88
 
        if (rc) {
89
 
            errno = -rc;
90
 
            return -1;
91
 
        } else {
92
 
            return 0;
93
 
        }
94
 
    }
95
 
    static int os_epoll_wait(int epfd, struct epoll_event *events,
96
 
                          int maxevents, int timeout)
97
 
    {
98
 
        int count;
99
 
        mm_segment_t oldfs = get_fs();
100
 
        set_fs(KERNEL_DS);
101
 
        count = sys_epoll_wait(epfd, events, maxevents, timeout);
102
 
        set_fs(oldfs);
103
 
        return count;
104
 
    }
105
 
#   define os_close             sys_close
106
 
#   define os_getsockopt        pj_sock_getsockopt
107
 
    static int os_read(int fd, void *buf, size_t len)
108
 
    {
109
 
        long rc;
110
 
        mm_segment_t oldfs = get_fs();
111
 
        set_fs(KERNEL_DS);
112
 
        rc = sys_read(fd, buf, len);
113
 
        set_fs(oldfs);
114
 
        if (rc) {
115
 
            errno = -rc;
116
 
            return -1;
117
 
        } else {
118
 
            return 0;
119
 
        }
120
 
    }
121
 
#   define socklen_t            unsigned
122
 
#   define ioctl_val_type       unsigned long
123
 
    int ioctl(int fd, int opt, ioctl_val_type value);
124
 
    static int os_ioctl(int fd, int opt, ioctl_val_type value)
125
 
    {
126
 
        int rc;
127
 
        mm_segment_t oldfs = get_fs();
128
 
        set_fs(KERNEL_DS);
129
 
        rc = ioctl(fd, opt, value);
130
 
        set_fs(oldfs);
131
 
        if (rc < 0) {
132
 
            errno = -rc;
133
 
            return rc;
134
 
        } else
135
 
            return rc;
136
 
    }
137
 
#   define getsockopt_val_ptr   char*
138
 
 
139
 
#   define epoll_data           data
140
 
#   define epoll_data_type      __u32
141
 
#endif
142
 
 
143
 
#define THIS_FILE   "ioq_epoll"
144
 
 
145
 
//#define TRACE_(expr) PJ_LOG(3,expr)
146
 
#define TRACE_(expr)
147
 
 
148
 
/*
149
 
 * Include common ioqueue abstraction.
150
 
 */
151
 
#include "ioqueue_common_abs.h"
152
 
 
153
 
/*
154
 
 * This describes each key.
155
 
 */
156
 
struct pj_ioqueue_key_t
157
 
{
158
 
    DECLARE_COMMON_KEY
159
 
};
160
 
 
161
 
struct queue
162
 
{
163
 
    pj_ioqueue_key_t        *key;
164
 
    enum ioqueue_event_type  event_type;
165
 
};
166
 
 
167
 
/*
168
 
 * This describes the I/O queue.
169
 
 */
170
 
struct pj_ioqueue_t
171
 
{
172
 
    DECLARE_COMMON_IOQUEUE
173
 
 
174
 
    unsigned            max, count;
175
 
    //pj_ioqueue_key_t  hlist;
176
 
    pj_ioqueue_key_t    active_list;    
177
 
    int                 epfd;
178
 
    //struct epoll_event *events;
179
 
    //struct queue       *queue;
180
 
 
181
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
182
 
    pj_mutex_t         *ref_cnt_mutex;
183
 
    pj_ioqueue_key_t    closing_list;
184
 
    pj_ioqueue_key_t    free_list;
185
 
#endif
186
 
};
187
 
 
188
 
/* Include implementation for common abstraction after we declare
189
 
 * pj_ioqueue_key_t and pj_ioqueue_t.
190
 
 */
191
 
#include "ioqueue_common_abs.c"
192
 
 
193
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
194
 
/* Scan closing keys to be put to free list again */
195
 
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
196
 
#endif
197
 
 
198
 
/*
199
 
 * pj_ioqueue_name()
200
 
 */
201
 
PJ_DEF(const char*) pj_ioqueue_name(void)
202
 
{
203
 
#if defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0
204
 
        return "epoll-kernel";
205
 
#else
206
 
        return "epoll";
207
 
#endif
208
 
}
209
 
 
210
 
/*
211
 
 * pj_ioqueue_create()
212
 
 *
213
 
 * Create select ioqueue.
214
 
 */
215
 
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
216
 
                                       pj_size_t max_fd,
217
 
                                       pj_ioqueue_t **p_ioqueue)
218
 
{
219
 
    pj_ioqueue_t *ioqueue;
220
 
    pj_status_t rc;
221
 
    pj_lock_t *lock;
222
 
    int i;
223
 
 
224
 
    /* Check that arguments are valid. */
225
 
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
226
 
                     max_fd > 0, PJ_EINVAL);
227
 
 
228
 
    /* Check that size of pj_ioqueue_op_key_t is sufficient */
229
 
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
230
 
                     sizeof(union operation_key), PJ_EBUG);
231
 
 
232
 
    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
233
 
 
234
 
    ioqueue_init(ioqueue);
235
 
 
236
 
    ioqueue->max = max_fd;
237
 
    ioqueue->count = 0;
238
 
    pj_list_init(&ioqueue->active_list);
239
 
 
240
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
241
 
    /* When safe unregistration is used (the default), we pre-create
242
 
     * all keys and put them in the free list.
243
 
     */
244
 
 
245
 
    /* Mutex to protect key's reference counter 
246
 
     * We don't want to use key's mutex or ioqueue's mutex because
247
 
     * that would create deadlock situation in some cases.
248
 
     */
249
 
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
250
 
    if (rc != PJ_SUCCESS)
251
 
        return rc;
252
 
 
253
 
 
254
 
    /* Init key list */
255
 
    pj_list_init(&ioqueue->free_list);
256
 
    pj_list_init(&ioqueue->closing_list);
257
 
 
258
 
 
259
 
    /* Pre-create all keys according to max_fd */
260
 
    for ( i=0; i<max_fd; ++i) {
261
 
        pj_ioqueue_key_t *key;
262
 
 
263
 
        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
264
 
        key->ref_count = 0;
265
 
        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
266
 
        if (rc != PJ_SUCCESS) {
267
 
            key = ioqueue->free_list.next;
268
 
            while (key != &ioqueue->free_list) {
269
 
                pj_mutex_destroy(key->mutex);
270
 
                key = key->next;
271
 
            }
272
 
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
273
 
            return rc;
274
 
        }
275
 
 
276
 
        pj_list_push_back(&ioqueue->free_list, key);
277
 
    }
278
 
#endif
279
 
 
280
 
    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
281
 
    if (rc != PJ_SUCCESS)
282
 
        return rc;
283
 
 
284
 
    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
285
 
    if (rc != PJ_SUCCESS)
286
 
        return rc;
287
 
 
288
 
    ioqueue->epfd = os_epoll_create(max_fd);
289
 
    if (ioqueue->epfd < 0) {
290
 
        ioqueue_destroy(ioqueue);
291
 
        return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
292
 
    }
293
 
    
294
 
    /*ioqueue->events = pj_pool_calloc(pool, max_fd, sizeof(struct epoll_event));
295
 
    PJ_ASSERT_RETURN(ioqueue->events != NULL, PJ_ENOMEM);
296
 
 
297
 
    ioqueue->queue = pj_pool_calloc(pool, max_fd, sizeof(struct queue));
298
 
    PJ_ASSERT_RETURN(ioqueue->queue != NULL, PJ_ENOMEM);
299
 
   */
300
 
    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));
301
 
 
302
 
    *p_ioqueue = ioqueue;
303
 
    return PJ_SUCCESS;
304
 
}
305
 
 
306
 
/*
307
 
 * pj_ioqueue_destroy()
308
 
 *
309
 
 * Destroy ioqueue.
310
 
 */
311
 
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
312
 
{
313
 
    pj_ioqueue_key_t *key;
314
 
 
315
 
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
316
 
    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);
317
 
 
318
 
    pj_lock_acquire(ioqueue->lock);
319
 
    os_close(ioqueue->epfd);
320
 
    ioqueue->epfd = 0;
321
 
 
322
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
323
 
    /* Destroy reference counters */
324
 
    key = ioqueue->active_list.next;
325
 
    while (key != &ioqueue->active_list) {
326
 
        pj_mutex_destroy(key->mutex);
327
 
        key = key->next;
328
 
    }
329
 
 
330
 
    key = ioqueue->closing_list.next;
331
 
    while (key != &ioqueue->closing_list) {
332
 
        pj_mutex_destroy(key->mutex);
333
 
        key = key->next;
334
 
    }
335
 
 
336
 
    key = ioqueue->free_list.next;
337
 
    while (key != &ioqueue->free_list) {
338
 
        pj_mutex_destroy(key->mutex);
339
 
        key = key->next;
340
 
    }
341
 
 
342
 
    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
343
 
#endif
344
 
    return ioqueue_destroy(ioqueue);
345
 
}
346
 
 
347
 
/*
348
 
 * pj_ioqueue_register_sock()
349
 
 *
350
 
 * Register a socket to ioqueue.
351
 
 */
352
 
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
353
 
                                              pj_ioqueue_t *ioqueue,
354
 
                                              pj_sock_t sock,
355
 
                                              void *user_data,
356
 
                                              const pj_ioqueue_callback *cb,
357
 
                                              pj_ioqueue_key_t **p_key)
358
 
{
359
 
    pj_ioqueue_key_t *key = NULL;
360
 
    pj_uint32_t value;
361
 
    struct epoll_event ev;
362
 
    int status;
363
 
    pj_status_t rc = PJ_SUCCESS;
364
 
    
365
 
    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
366
 
                     cb && p_key, PJ_EINVAL);
367
 
 
368
 
    pj_lock_acquire(ioqueue->lock);
369
 
 
370
 
    if (ioqueue->count >= ioqueue->max) {
371
 
        rc = PJ_ETOOMANY;
372
 
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
373
 
        goto on_return;
374
 
    }
375
 
 
376
 
    /* Set socket to nonblocking. */
377
 
    value = 1;
378
 
    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
379
 
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d", 
380
 
                rc));
381
 
        rc = pj_get_netos_error();
382
 
        goto on_return;
383
 
    }
384
 
 
385
 
    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
386
 
     * the key from the free list. Otherwise allocate a new one. 
387
 
     */
388
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
389
 
 
390
 
    /* Scan closing_keys first to let them come back to free_list */
391
 
    scan_closing_keys(ioqueue);
392
 
 
393
 
    pj_assert(!pj_list_empty(&ioqueue->free_list));
394
 
    if (pj_list_empty(&ioqueue->free_list)) {
395
 
        rc = PJ_ETOOMANY;
396
 
        goto on_return;
397
 
    }
398
 
 
399
 
    key = ioqueue->free_list.next;
400
 
    pj_list_erase(key);
401
 
#else
402
 
    /* Create key. */
403
 
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
404
 
#endif
405
 
 
406
 
    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
407
 
    if (rc != PJ_SUCCESS) {
408
 
        key = NULL;
409
 
        goto on_return;
410
 
    }
411
 
 
412
 
    /* Create key's mutex */
413
 
 /*   rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
414
 
    if (rc != PJ_SUCCESS) {
415
 
        key = NULL;
416
 
        goto on_return;
417
 
    }
418
 
*/
419
 
    /* os_epoll_ctl. */
420
 
    ev.events = EPOLLIN | EPOLLERR;
421
 
    ev.epoll_data = (epoll_data_type)key;
422
 
    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
423
 
    if (status < 0) {
424
 
        rc = pj_get_os_error();
425
 
        pj_mutex_destroy(key->mutex);
426
 
        key = NULL;
427
 
        TRACE_((THIS_FILE, 
428
 
                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", 
429
 
                status));
430
 
        goto on_return;
431
 
    }
432
 
    
433
 
    /* Register */
434
 
    pj_list_insert_before(&ioqueue->active_list, key);
435
 
    ++ioqueue->count;
436
 
 
437
 
    //TRACE_((THIS_FILE, "socket registered, count=%d", ioqueue->count));
438
 
 
439
 
on_return:
440
 
    *p_key = key;
441
 
    pj_lock_release(ioqueue->lock);
442
 
    
443
 
    return rc;
444
 
}
445
 
 
446
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
447
 
/* Increment key's reference counter */
448
 
static void increment_counter(pj_ioqueue_key_t *key)
449
 
{
450
 
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
451
 
    ++key->ref_count;
452
 
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
453
 
}
454
 
 
455
 
/* Decrement the key's reference counter, and when the counter reach zero,
456
 
 * destroy the key.
457
 
 *
458
 
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
459
 
 */
460
 
static void decrement_counter(pj_ioqueue_key_t *key)
461
 
{
462
 
    pj_lock_acquire(key->ioqueue->lock);
463
 
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
464
 
    --key->ref_count;
465
 
    if (key->ref_count == 0) {
466
 
 
467
 
        pj_assert(key->closing == 1);
468
 
        pj_gettickcount(&key->free_time);
469
 
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
470
 
        pj_time_val_normalize(&key->free_time);
471
 
 
472
 
        pj_list_erase(key);
473
 
        pj_list_push_back(&key->ioqueue->closing_list, key);
474
 
 
475
 
    }
476
 
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
477
 
    pj_lock_release(key->ioqueue->lock);
478
 
}
479
 
#endif
480
 
 
481
 
/*
482
 
 * pj_ioqueue_unregister()
483
 
 *
484
 
 * Unregister handle from ioqueue.
485
 
 */
486
 
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
487
 
{
488
 
    pj_ioqueue_t *ioqueue;
489
 
    struct epoll_event ev;
490
 
    int status;
491
 
    
492
 
    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);
493
 
 
494
 
    ioqueue = key->ioqueue;
495
 
 
496
 
    /* Lock the key to make sure no callback is simultaneously modifying
497
 
     * the key. We need to lock the key before ioqueue here to prevent
498
 
     * deadlock.
499
 
     */
500
 
    pj_mutex_lock(key->mutex);
501
 
 
502
 
    /* Also lock ioqueue */
503
 
    pj_lock_acquire(ioqueue->lock);
504
 
 
505
 
    pj_assert(ioqueue->count > 0);
506
 
    --ioqueue->count;
507
 
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
508
 
    pj_list_erase(key);
509
 
#endif
510
 
 
511
 
    ev.events = 0;
512
 
    ev.epoll_data = (epoll_data_type)key;
513
 
    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
514
 
    if (status != 0) {
515
 
        pj_status_t rc = pj_get_os_error();
516
 
        pj_lock_release(ioqueue->lock);
517
 
        return rc;
518
 
    }
519
 
 
520
 
    /* Destroy the key. */
521
 
    pj_sock_close(key->fd);
522
 
 
523
 
    pj_lock_release(ioqueue->lock);
524
 
 
525
 
 
526
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
527
 
    /* Mark key is closing. */
528
 
    key->closing = 1;
529
 
 
530
 
    /* Decrement counter. */
531
 
    decrement_counter(key);
532
 
 
533
 
    /* Done. */
534
 
    pj_mutex_unlock(key->mutex);
535
 
#else
536
 
    pj_mutex_destroy(key->mutex);
537
 
#endif
538
 
 
539
 
    return PJ_SUCCESS;
540
 
}
541
 
 
542
 
/* ioqueue_remove_from_set()
543
 
 * This function is called from ioqueue_dispatch_event() to instruct
544
 
 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
545
 
 * set for the specified event.
546
 
 */
547
 
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
548
 
                                     pj_ioqueue_key_t *key, 
549
 
                                     enum ioqueue_event_type event_type)
550
 
{
551
 
    if (event_type == WRITEABLE_EVENT) {
552
 
        struct epoll_event ev;
553
 
 
554
 
        ev.events = EPOLLIN | EPOLLERR;
555
 
        ev.epoll_data = (epoll_data_type)key;
556
 
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
557
 
    }   
558
 
}
559
 
 
560
 
/*
561
 
 * ioqueue_add_to_set()
562
 
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
563
 
 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
564
 
 * set for the specified event.
565
 
 */
566
 
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
567
 
                                pj_ioqueue_key_t *key,
568
 
                                enum ioqueue_event_type event_type )
569
 
{
570
 
    if (event_type == WRITEABLE_EVENT) {
571
 
        struct epoll_event ev;
572
 
 
573
 
        ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
574
 
        ev.epoll_data = (epoll_data_type)key;
575
 
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
576
 
    }   
577
 
}
578
 
 
579
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
580
 
/* Scan closing keys to be put to free list again */
581
 
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
582
 
{
583
 
    pj_time_val now;
584
 
    pj_ioqueue_key_t *h;
585
 
 
586
 
    pj_gettickcount(&now);
587
 
    h = ioqueue->closing_list.next;
588
 
    while (h != &ioqueue->closing_list) {
589
 
        pj_ioqueue_key_t *next = h->next;
590
 
 
591
 
        pj_assert(h->closing != 0);
592
 
 
593
 
        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
594
 
            pj_list_erase(h);
595
 
            pj_list_push_back(&ioqueue->free_list, h);
596
 
        }
597
 
        h = next;
598
 
    }
599
 
}
600
 
#endif
601
 
 
602
 
/*
603
 
 * pj_ioqueue_poll()
604
 
 *
605
 
 */
606
 
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
607
 
{
608
 
    int i, count, processed;
609
 
    int msec;
610
 
    //struct epoll_event *events = ioqueue->events;
611
 
    //struct queue *queue = ioqueue->queue;
612
 
    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
613
 
    struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
614
 
    pj_timestamp t1, t2;
615
 
    
616
 
    PJ_CHECK_STACK();
617
 
 
618
 
    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
619
 
 
620
 
    TRACE_((THIS_FILE, "start os_epoll_wait, msec=%d", msec));
621
 
    pj_get_timestamp(&t1);
622
 
 
623
 
    //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
624
 
    count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec);
625
 
    if (count == 0) {
626
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
627
 
    /* Check the closing keys only when there's no activity and when there are
628
 
     * pending closing keys.
629
 
     */
630
 
    if (count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
631
 
        pj_lock_acquire(ioqueue->lock);
632
 
        scan_closing_keys(ioqueue);
633
 
        pj_lock_release(ioqueue->lock);
634
 
    }
635
 
#endif
636
 
        TRACE_((THIS_FILE, "os_epoll_wait timed out"));
637
 
        return count;
638
 
    }
639
 
    else if (count < 0) {
640
 
        TRACE_((THIS_FILE, "os_epoll_wait error"));
641
 
        return -pj_get_netos_error();
642
 
    }
643
 
 
644
 
    pj_get_timestamp(&t2);
645
 
    TRACE_((THIS_FILE, "os_epoll_wait returns %d, time=%d usec",
646
 
                       count, pj_elapsed_usec(&t1, &t2)));
647
 
 
648
 
    /* Lock ioqueue. */
649
 
    pj_lock_acquire(ioqueue->lock);
650
 
 
651
 
    for (processed=0, i=0; i<count; ++i) {
652
 
        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
653
 
                                events[i].epoll_data;
654
 
 
655
 
        TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].events));
656
 
 
657
 
        /*
658
 
         * Check readability.
659
 
         */
660
 
        if ((events[i].events & EPOLLIN) && 
661
 
            (key_has_pending_read(h) || key_has_pending_accept(h)) && !IS_CLOSING(h) ) {
662
 
 
663
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
664
 
            increment_counter(h);
665
 
#endif
666
 
            queue[processed].key = h;
667
 
            queue[processed].event_type = READABLE_EVENT;
668
 
            ++processed;
669
 
        }
670
 
 
671
 
        /*
672
 
         * Check for writeability.
673
 
         */
674
 
        if ((events[i].events & EPOLLOUT) && key_has_pending_write(h) && !IS_CLOSING(h)) {
675
 
 
676
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
677
 
            increment_counter(h);
678
 
#endif
679
 
            queue[processed].key = h;
680
 
            queue[processed].event_type = WRITEABLE_EVENT;
681
 
            ++processed;
682
 
        }
683
 
 
684
 
#if PJ_HAS_TCP
685
 
        /*
686
 
         * Check for completion of connect() operation.
687
 
         */
688
 
        if ((events[i].events & EPOLLOUT) && (h->connecting) && !IS_CLOSING(h)) {
689
 
 
690
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
691
 
            increment_counter(h);
692
 
#endif
693
 
            queue[processed].key = h;
694
 
            queue[processed].event_type = WRITEABLE_EVENT;
695
 
            ++processed;
696
 
        }
697
 
#endif /* PJ_HAS_TCP */
698
 
        
699
 
        /*
700
 
         * Check for error condition.
701
 
         */
702
 
        if (events[i].events & EPOLLERR && (h->connecting) && !IS_CLOSING(h)) {
703
 
                
704
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
705
 
            increment_counter(h);
706
 
#endif          
707
 
            queue[processed].key = h;
708
 
            queue[processed].event_type = EXCEPTION_EVENT;
709
 
            ++processed;
710
 
        }
711
 
    }
712
 
    pj_lock_release(ioqueue->lock);
713
 
 
714
 
    /* Now process the events. */
715
 
    for (i=0; i<processed; ++i) {
716
 
        switch (queue[i].event_type) {
717
 
        case READABLE_EVENT:
718
 
            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
719
 
            break;
720
 
        case WRITEABLE_EVENT:
721
 
            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
722
 
            break;
723
 
        case EXCEPTION_EVENT:
724
 
            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
725
 
            break;
726
 
        case NO_EVENT:
727
 
            pj_assert(!"Invalid event!");
728
 
            break;
729
 
        }
730
 
 
731
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
732
 
        decrement_counter(queue[i].key);
733
 
#endif
734
 
    }
735
 
 
736
 
    /* Special case:
737
 
     * When epoll returns > 0 but no descriptors are actually set!
738
 
     */
739
 
    if (count > 0 && !processed && msec > 0) {
740
 
        pj_thread_sleep(msec);
741
 
    }
742
 
 
743
 
    pj_get_timestamp(&t1);
744
 
    TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
745
 
                       processed, pj_elapsed_usec(&t2, &t1)));
746
 
 
747
 
    return processed;
748
 
}
749