/* $Id: ioqueue_epoll.c 3553 2011-05-05 06:14:19Z nanang $ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
/*
 * This is the implementation of the IOQueue framework using the /dev/epoll
 * API in _both_ Linux user-mode and kernel-mode.
 */
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
    /*
     * Linux user mode.
     */
#   include <sys/epoll.h>
#   include <errno.h>
#   include <unistd.h>

#   define epoll_data           data.ptr
#   define epoll_data_type      void*
#   define ioctl_val_type       unsigned long
#   define getsockopt_val_ptr   int*
#   define os_getsockopt        getsockopt
#   define os_ioctl             ioctl
#   define os_read              read
#   define os_close             close
#   define os_epoll_create      epoll_create
#   define os_epoll_ctl         epoll_ctl
#   define os_epoll_wait        epoll_wait
#else
    /*
     * Linux kernel mode.
     */
#   include <linux/config.h>
#   include <linux/version.h>
#   if defined(MODVERSIONS)
#       include <linux/modversions.h>
#   endif
#   include <linux/kernel.h>
#   include <linux/poll.h>
#   include <linux/eventpoll.h>
#   include <linux/syscalls.h>
#   include <linux/errno.h>
#   include <linux/unistd.h>
#   include <asm/ioctls.h>

    /* The syscall wrappers below follow the usual kernel-mode pattern:
     * lift the user-space address limit with set_fs(KERNEL_DS) so the
     * sys_* calls accept kernel buffers, then restore the old limit.
     */
#   define os_epoll_create      sys_epoll_create

    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_epoll_ctl(epfd, op, fd, event);
        set_fs(oldfs);
        if (rc) {
            errno = -rc;
            return -1;
        }
        return 0;
    }

    static int os_epoll_wait(int epfd, struct epoll_event *events,
                             int maxevents, int timeout)
    {
        int count;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        count = sys_epoll_wait(epfd, events, maxevents, timeout);
        set_fs(oldfs);
        if (count < 0) {
            errno = -count;
            return -1;
        }
        return count;
    }

#   define os_close             sys_close
#   define os_getsockopt        pj_sock_getsockopt

    static int os_read(int fd, void *buf, size_t len)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_read(fd, buf, len);
        set_fs(oldfs);
        if (rc < 0) {
            errno = -rc;
            return -1;
        }
        return rc;
    }

#   define socklen_t            unsigned
#   define ioctl_val_type       unsigned long

    int ioctl(int fd, int opt, ioctl_val_type value);

    static int os_ioctl(int fd, int opt, ioctl_val_type value)
    {
        int rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = ioctl(fd, opt, value);
        set_fs(oldfs);
        if (rc < 0)
            errno = -rc;
        return rc;
    }

#   define getsockopt_val_ptr   char*

#   define epoll_data           data
#   define epoll_data_type      __u32
#endif
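/*
 * How the epoll_data macros above are used: the statement
 *
 *     ev.epoll_data = (epoll_data_type)key;
 *
 * expands to "ev.data.ptr = (void*)key;" in user mode (the key pointer is
 * stored in the epoll_event data union) and to "ev.data = (__u32)key;" in
 * kernel mode. pj_ioqueue_poll() reverses the trick by casting
 * events[i].epoll_data back to a pj_ioqueue_key_t pointer.
 */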
#define THIS_FILE   "ioq_epoll"

//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)

/*
 * Include common ioqueue abstraction.
 */
#include "ioqueue_common_abs.h"
/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

struct queue
{
    pj_ioqueue_key_t        *key;
    enum ioqueue_event_type  event_type;
};

/*
 * This describes the I/O queue.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;
    //pj_ioqueue_key_t  hlist;
    pj_ioqueue_key_t    active_list;
    int                 epfd;
    //struct epoll_event *events;
    //struct queue       *queue;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t         *ref_cnt_mutex;
    pj_ioqueue_key_t    closing_list;
    pj_ioqueue_key_t    free_list;
#endif
};
/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
#if defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0
    return "epoll-kernel";
#else
    return "epoll";
#endif
}
/*
 * pj_ioqueue_create()
 *
 * Create epoll ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;
    pj_lock_t *lock;
    pj_size_t i;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0, PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));

    ioqueue_init(ioqueue);

    ioqueue->max = max_fd;
    ioqueue->count = 0;
    pj_list_init(&ioqueue->active_list);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* When safe unregistration is used (the default), we pre-create
     * all keys and put them in the free list.
     */

    /* Mutex to protect key's reference counter.
     * We don't want to use key's mutex or ioqueue's mutex because
     * that would create deadlock situation in some cases.
     */
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
    if (rc != PJ_SUCCESS)
        return rc;

    /* Init key lists */
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);

    /* Pre-create all keys according to max_fd */
    for (i=0; i<max_fd; ++i) {
        pj_ioqueue_key_t *key;

        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
        key->ref_count = 0;
        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
        if (rc != PJ_SUCCESS) {
            key = ioqueue->free_list.next;
            while (key != &ioqueue->free_list) {
                pj_mutex_destroy(key->mutex);
                key = key->next;
            }
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
            return rc;
        }

        pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    ioqueue->epfd = os_epoll_create(max_fd);
    if (ioqueue->epfd < 0) {
        ioqueue_destroy(ioqueue);
        return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
    }

    /*ioqueue->events = pj_pool_calloc(pool, max_fd, sizeof(struct epoll_event));
    PJ_ASSERT_RETURN(ioqueue->events != NULL, PJ_ENOMEM);

    ioqueue->queue = pj_pool_calloc(pool, max_fd, sizeof(struct queue));
    PJ_ASSERT_RETURN(ioqueue->queue != NULL, PJ_ENOMEM);
    */
    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}
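/*
 * Usage sketch (hypothetical application code, not part of this file;
 * the caching-pool setup follows the usual pjlib pattern and error
 * handling is abbreviated):
 *
 *     pj_caching_pool cp;
 *     pj_pool_t *pool;
 *     pj_ioqueue_t *ioq;
 *     pj_status_t status;
 *
 *     pj_init();
 *     pj_caching_pool_init(&cp, NULL, 0);
 *     pool = pj_pool_create(&cp.factory, "app", 4000, 4000, NULL);
 *     status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
 *     if (status != PJ_SUCCESS) { ... }
 *     ...
 *     pj_ioqueue_destroy(ioq);
 */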
/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);

    pj_lock_acquire(ioqueue->lock);
    os_close(ioqueue->epfd);
    ioqueue->epfd = 0;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}
/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    struct epoll_event ev;
    int status;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d",
                rc));
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one.
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG

    /* Scan closing_keys first to let them come back to free_list */
    scan_closing_keys(ioqueue);

    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    /* Create key. */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Create key's mutex */
 /*   rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }
*/
    /* Register to epoll. */
    ev.events = EPOLLIN | EPOLLERR;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
    if (status < 0) {
        rc = pj_get_os_error();
        pj_mutex_destroy(key->mutex);
        key = NULL;
        TRACE_((THIS_FILE,
                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
                status));
        goto on_return;
    }

    /* Register */
    pj_list_insert_before(&ioqueue->active_list, key);
    ++ioqueue->count;

    //TRACE_((THIS_FILE, "socket registered, count=%d", ioqueue->count));

on_return:
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}
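/*
 * Usage sketch (hypothetical application code, not part of this file):
 * register a UDP socket and receive completion notifications through a
 * callback.
 *
 *     static void on_read_complete(pj_ioqueue_key_t *key,
 *                                  pj_ioqueue_op_key_t *op_key,
 *                                  pj_ssize_t bytes_read)
 *     {
 *         // Consume the data, then typically post another pj_ioqueue_recv().
 *     }
 *
 *     pj_ioqueue_callback cb;
 *     pj_ioqueue_key_t *key;
 *
 *     pj_bzero(&cb, sizeof(cb));
 *     cb.on_read_complete = &on_read_complete;
 *     status = pj_ioqueue_register_sock(pool, ioq, sock, NULL, &cb, &key);
 */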
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reaches zero,
 * move the key to the closing list to be freed later.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_lock_acquire(key->ioqueue->lock);
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

        pj_assert(key->closing == 1);
        pj_gettickcount(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
    pj_lock_release(key->ioqueue->lock);
}
#endif
/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;
    struct epoll_event ev;
    int status;

    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before ioqueue here to prevent
     * deadlock.
     */
    pj_mutex_lock(key->mutex);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_list_erase(key);
#endif

    ev.events = 0;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
    if (status != 0) {
        pj_status_t rc = pj_get_os_error();
        pj_lock_release(ioqueue->lock);
        pj_mutex_unlock(key->mutex);
        return rc;
    }

    /* Destroy the key. */
    pj_sock_close(key->fd);

    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}
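/*
 * Usage sketch (hypothetical application code, not part of this file):
 * unregistration typically happens from within a callback once a transfer
 * completes or fails.
 *
 *     static void on_read_complete(pj_ioqueue_key_t *key,
 *                                  pj_ioqueue_op_key_t *op_key,
 *                                  pj_ssize_t bytes_read)
 *     {
 *         if (bytes_read <= 0) {
 *             // Closes the socket; with PJ_IOQUEUE_HAS_SAFE_UNREG the key
 *             // is parked on closing_list until it is safe to reuse.
 *             pj_ioqueue_unregister(key);
 *         }
 *     }
 */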
/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
 * set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
    if (event_type == WRITEABLE_EVENT) {
        struct epoll_event ev;

        ev.events = EPOLLIN | EPOLLERR;
        ev.epoll_data = (epoll_data_type)key;
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
    }
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
 * to instruct the ioqueue to add the specified handle to ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    if (event_type == WRITEABLE_EVENT) {
        struct epoll_event ev;

        ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
        ev.epoll_data = (epoll_data_type)key;
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
    }
}
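/*
 * Note on the EPOLL_CTL_MOD dance above: this ioqueue uses level-triggered
 * epoll, so a permanently-writable socket would make epoll_wait() return
 * immediately on every call. EPOLLOUT is therefore armed only while a send
 * is pending (ioqueue_add_to_set) and disarmed once the write completes
 * (ioqueue_remove_from_set). A minimal standalone equivalent with the raw
 * user-mode epoll API would be:
 *
 *     struct epoll_event ev;
 *     ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;   // start write-watch
 *     ev.data.ptr = key;
 *     epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
 *     ...
 *     ev.events = EPOLLIN | EPOLLERR;              // stop write-watch
 *     ev.data.ptr = key;
 *     epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
 */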
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettickcount(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
        pj_ioqueue_key_t *next = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            pj_list_push_back(&ioqueue->free_list, h);
        }
        h = next;
    }
}
#endif
/*
 * pj_ioqueue_poll()
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    int i, count, processed;
    int msec;
    //struct epoll_event *events = ioqueue->events;
    //struct queue *queue = ioqueue->queue;
    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
    struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
    pj_timestamp t1, t2;

    PJ_CHECK_STACK();

    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;

    TRACE_((THIS_FILE, "start os_epoll_wait, msec=%d", msec));
    pj_get_timestamp(&t1);

    //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
    count = os_epoll_wait( ioqueue->epfd, events,
                           PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec);
    if (count == 0) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* Check the closing keys only when there's no activity and when
         * there are pending closing keys.
         */
        if (!pj_list_empty(&ioqueue->closing_list)) {
            pj_lock_acquire(ioqueue->lock);
            scan_closing_keys(ioqueue);
            pj_lock_release(ioqueue->lock);
        }
#endif
        TRACE_((THIS_FILE, "os_epoll_wait timed out"));
        return count;
    }
    else if (count < 0) {
        TRACE_((THIS_FILE, "os_epoll_wait error"));
        return -pj_get_netos_error();
    }

    pj_get_timestamp(&t2);
    TRACE_((THIS_FILE, "os_epoll_wait returns %d, time=%d usec",
            count, pj_elapsed_usec(&t1, &t2)));

    /* Lock ioqueue. */
    pj_lock_acquire(ioqueue->lock);

    for (processed=0, i=0; i<count; ++i) {
        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
                              events[i].epoll_data;

        TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].events));

        /*
         * Check readability.
         */
        if ((events[i].events & EPOLLIN) &&
            (key_has_pending_read(h) || key_has_pending_accept(h)) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = READABLE_EVENT;
            ++processed;
        }

        /*
         * Check for writeability.
         */
        if ((events[i].events & EPOLLOUT) && key_has_pending_write(h) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
        }

#if PJ_HAS_TCP
        /*
         * Check for completion of connect() operation.
         */
        if ((events[i].events & EPOLLOUT) && (h->connecting) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
        }
#endif /* PJ_HAS_TCP */

        /*
         * Check for error condition.
         */
        if ((events[i].events & EPOLLERR) && (h->connecting) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = EXCEPTION_EVENT;
            ++processed;
        }
    }
    pj_lock_release(ioqueue->lock);

    /* Now process the events. */
    for (i=0; i<processed; ++i) {
        switch (queue[i].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
            break;
        default:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(queue[i].key);
#endif
    }

    /* Special case:
     * When epoll returns > 0 but no descriptors are actually set!
     */
    if (count > 0 && !processed && msec > 0) {
        pj_thread_sleep(msec);
    }

    pj_get_timestamp(&t1);
    TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
            processed, pj_elapsed_usec(&t2, &t1)));

    return processed;
}
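/*
 * Usage sketch (hypothetical event loop, not part of this file):
 *
 *     for (;;) {
 *         pj_time_val timeout = { 0, 500 };   // 500 msec
 *         int n = pj_ioqueue_poll(ioq, &timeout);
 *         if (n < 0) {
 *             // Error: the negated pj_status_t was returned, so -n is
 *             // the status code to report.
 *             break;
 *         }
 *         // n >= 0: that many events were dispatched to their callbacks.
 *     }
 */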