/* $Id: ioqueue_common_abs.c 4359 2013-02-21 11:18:36Z bennylp $ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains common functionalities to emulate proactor pattern with
 * various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as stand-alone source.
 */
/* Max number of times to spin-wait for a busy op_key to become free
 * before giving up with PJ_EBUSY (see pj_ioqueue_send()/sendto()).
 */
#define PENDING_RETRY	2
/* Initialize the common part of the ioqueue structure.
 * The lock is cleared here; the concrete backend installs it later
 * (see pj_ioqueue_set_lock()).
 */
static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
}
40
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
42
if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43
pj_lock_release(ioqueue->lock);
44
return pj_lock_destroy(ioqueue->lock);
51
* pj_ioqueue_set_lock()
53
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
55
pj_bool_t auto_delete )
57
PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
59
if (ioqueue->auto_delete_lock && ioqueue->lock) {
60
pj_lock_destroy(ioqueue->lock);
64
ioqueue->auto_delete_lock = auto_delete;
69
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70
pj_ioqueue_t *ioqueue,
71
pj_ioqueue_key_t *key,
73
pj_grp_lock_t *grp_lock,
75
const pj_ioqueue_callback *cb)
82
key->ioqueue = ioqueue;
84
key->user_data = user_data;
85
pj_list_init(&key->read_list);
86
pj_list_init(&key->write_list);
88
pj_list_init(&key->accept_list);
93
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
95
#if PJ_IOQUEUE_HAS_SAFE_UNREG
96
/* Set initial reference count to 1 */
97
pj_assert(key->ref_count == 0);
103
rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
104
if (rc != PJ_SUCCESS)
107
/* Get socket type. When socket type is datagram, some optimization
108
* will be performed during send to allow parallel send operations.
110
optlen = sizeof(key->fd_type);
111
rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
112
&key->fd_type, &optlen);
113
if (rc != PJ_SUCCESS)
114
key->fd_type = pj_SOCK_STREAM();
116
/* Create mutex for the key. */
117
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
118
rc = pj_lock_create_simple_mutex(poll, NULL, &key->lock);
120
if (rc != PJ_SUCCESS)
124
key->grp_lock = grp_lock;
126
pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0);
133
* pj_ioqueue_get_user_data()
135
* Obtain value associated with a key.
137
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
139
PJ_ASSERT_RETURN(key != NULL, NULL);
140
return key->user_data;
144
* pj_ioqueue_set_user_data()
146
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
150
PJ_ASSERT_RETURN(key, PJ_EINVAL);
153
*old_data = key->user_data;
154
key->user_data = user_data;
159
PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
161
return !pj_list_empty(&key->write_list);
164
PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
166
return !pj_list_empty(&key->read_list);
169
PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
172
return !pj_list_empty(&key->accept_list);
179
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
181
return key->connecting;
/* IS_CLOSING(key): whether the key is being unregistered. Only
 * meaningful when safe unregistration is enabled; otherwise keys are
 * never observed in a closing state, so it is constant 0.
 */
#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif
193
* ioqueue_dispatch_event()
195
* Report occurence of an event in the key to be processed by the
198
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
201
pj_ioqueue_lock_key(h);
204
pj_ioqueue_unlock_key(h);
208
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
210
/* Completion of connect() operation */
214
/* Clear operation. */
217
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
218
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
221
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
223
* On Linux, use getsockopt to read the SO_ERROR option at
224
* level SOL_SOCKET to determine whether connect() completed
225
* successfully (if SO_ERROR is zero).
229
int vallen = sizeof(value);
230
int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
233
/* Argh!! What to do now???
234
* Just indicate that the socket is connected. The
235
* application will get error as soon as it tries to use
236
* the socket to send/receive.
240
status = PJ_STATUS_FROM_OS(value);
243
#elif defined(PJ_WIN32) && PJ_WIN32!=0
244
status = PJ_SUCCESS; /* success */
246
/* Excellent information in D.J. Bernstein page:
247
* http://cr.yp.to/docs/connect.html
249
* Seems like the most portable way of detecting connect()
250
* failure is to call getpeername(). If socket is connected,
251
* getpeername() will return 0. If the socket is not connected,
252
* it will return ENOTCONN, and read(fd, &ch, 1) will produce
253
* the right errno through error slippage. This is a combination
254
* of suggestions from Douglas C. Schmidt and Ken Keys.
257
struct sockaddr_in addr;
258
int addrlen = sizeof(addr);
260
status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
265
/* Unlock; from this point we don't need to hold key's mutex
266
* (unless concurrency is disabled, which in this case we should
267
* hold the mutex while calling the callback) */
268
if (h->allow_concurrent) {
269
/* concurrency may be changed while we're in the callback, so
273
pj_ioqueue_unlock_key(h);
279
if (h->cb.on_connect_complete && !IS_CLOSING(h))
280
(*h->cb.on_connect_complete)(h, status);
282
/* Unlock if we still hold the lock */
284
pj_ioqueue_unlock_key(h);
290
#endif /* PJ_HAS_TCP */
291
if (key_has_pending_write(h)) {
292
/* Socket is writable. */
293
struct write_operation *write_op;
295
pj_status_t send_rc = PJ_SUCCESS;
297
/* Get the first in the queue. */
298
write_op = h->write_list.next;
300
/* For datagrams, we can remove the write_op from the list
301
* so that send() can work in parallel.
303
if (h->fd_type == pj_SOCK_DGRAM()) {
304
pj_list_erase(write_op);
306
if (pj_list_empty(&h->write_list))
307
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
312
* Unfortunately we must do this while holding key's mutex, thus
313
* preventing parallel write on a single key.. :-((
315
sent = write_op->size - write_op->written;
316
if (write_op->op == PJ_IOQUEUE_OP_SEND) {
317
send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
318
&sent, write_op->flags);
319
/* Can't do this. We only clear "op" after we're finished sending
323
} else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
325
while (--retry >= 0) {
326
send_rc = pj_sock_sendto(h->fd,
327
write_op->buf+write_op->written,
328
&sent, write_op->flags,
330
write_op->rmt_addrlen);
331
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
332
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
333
/* Special treatment for dead UDP sockets here, see ticket #1107 */
334
if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) &&
335
h->fd_type==pj_SOCK_DGRAM())
337
PJ_PERROR(4,(THIS_FILE, send_rc,
338
"Send error for socket %d, retrying",
347
/* Can't do this. We only clear "op" after we're finished sending
352
pj_assert(!"Invalid operation type!");
353
write_op->op = PJ_IOQUEUE_OP_NONE;
357
if (send_rc == PJ_SUCCESS) {
358
write_op->written += sent;
360
pj_assert(send_rc > 0);
361
write_op->written = -send_rc;
364
/* Are we finished with this buffer? */
365
if (send_rc!=PJ_SUCCESS ||
366
write_op->written == (pj_ssize_t)write_op->size ||
367
h->fd_type == pj_SOCK_DGRAM())
371
write_op->op = PJ_IOQUEUE_OP_NONE;
373
if (h->fd_type != pj_SOCK_DGRAM()) {
374
/* Write completion of the whole stream. */
375
pj_list_erase(write_op);
377
/* Clear operation if there's no more data to send. */
378
if (pj_list_empty(&h->write_list))
379
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
383
/* Unlock; from this point we don't need to hold key's mutex
384
* (unless concurrency is disabled, which in this case we should
385
* hold the mutex while calling the callback) */
386
if (h->allow_concurrent) {
387
/* concurrency may be changed while we're in the callback, so
391
pj_ioqueue_unlock_key(h);
398
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
399
(*h->cb.on_write_complete)(h,
400
(pj_ioqueue_op_key_t*)write_op,
405
pj_ioqueue_unlock_key(h);
409
pj_ioqueue_unlock_key(h);
415
* This is normal; execution may fall here when multiple threads
416
* are signalled for the same event, but only one thread eventually
417
* able to process the event.
419
pj_ioqueue_unlock_key(h);
423
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
428
pj_ioqueue_lock_key(h);
431
pj_ioqueue_unlock_key(h);
436
if (!pj_list_empty(&h->accept_list)) {
438
struct accept_operation *accept_op;
441
/* Get one accept operation from the list. */
442
accept_op = h->accept_list.next;
443
pj_list_erase(accept_op);
444
accept_op->op = PJ_IOQUEUE_OP_NONE;
446
/* Clear bit in fdset if there is no more pending accept */
447
if (pj_list_empty(&h->accept_list))
448
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
450
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
451
accept_op->rmt_addr, accept_op->addrlen);
452
if (rc==PJ_SUCCESS && accept_op->local_addr) {
453
rc = pj_sock_getsockname(*accept_op->accept_fd,
454
accept_op->local_addr,
458
/* Unlock; from this point we don't need to hold key's mutex
459
* (unless concurrency is disabled, which in this case we should
460
* hold the mutex while calling the callback) */
461
if (h->allow_concurrent) {
462
/* concurrency may be changed while we're in the callback, so
466
pj_ioqueue_unlock_key(h);
473
if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
474
(*h->cb.on_accept_complete)(h,
475
(pj_ioqueue_op_key_t*)accept_op,
476
*accept_op->accept_fd, rc);
480
pj_ioqueue_unlock_key(h);
485
if (key_has_pending_read(h)) {
486
struct read_operation *read_op;
487
pj_ssize_t bytes_read;
490
/* Get one pending read operation from the list. */
491
read_op = h->read_list.next;
492
pj_list_erase(read_op);
494
/* Clear fdset if there is no pending read. */
495
if (pj_list_empty(&h->read_list))
496
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
498
bytes_read = read_op->size;
500
if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
501
read_op->op = PJ_IOQUEUE_OP_NONE;
502
rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
505
read_op->rmt_addrlen);
506
} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
507
read_op->op = PJ_IOQUEUE_OP_NONE;
508
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
511
pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
512
read_op->op = PJ_IOQUEUE_OP_NONE;
514
* User has specified pj_ioqueue_read().
515
* On Win32, we should do ReadFile(). But because we got
516
* here because of select() anyway, user must have put a
517
* socket descriptor on h->fd, which in this case we can
518
* just call pj_sock_recv() instead of ReadFile().
519
* On Unix, user may put a file in h->fd, so we'll have
520
* to call read() here.
521
* This may not compile on systems which doesn't have
522
* read(). That's why we only specify PJ_LINUX here so
523
* that error is easier to catch.
525
# if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
526
defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
527
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
529
//rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
530
// &bytes_read, NULL);
531
# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
532
bytes_read = read(h->fd, read_op->buf, bytes_read);
533
rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
534
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
535
bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
536
rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
538
# error "Implement read() for this platform!"
542
if (rc != PJ_SUCCESS) {
543
# if defined(PJ_WIN32) && PJ_WIN32 != 0
544
/* On Win32, for UDP, WSAECONNRESET on the receive side
545
* indicates that previous sending has triggered ICMP Port
546
* Unreachable message.
547
* But we wouldn't know at this point which one of previous
548
* key that has triggered the error, since UDP socket can
550
* So we'll just ignore it!
553
if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
554
//PJ_LOG(4,(THIS_FILE,
555
// "Ignored ICMP port unreach. on key=%p", h));
559
/* In any case we would report this to caller. */
562
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
563
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
564
/* Special treatment for dead UDP sockets here, see ticket #1107 */
565
if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
566
h->fd_type==pj_SOCK_DGRAM())
573
/* Unlock; from this point we don't need to hold key's mutex
574
* (unless concurrency is disabled, which in this case we should
575
* hold the mutex while calling the callback) */
576
if (h->allow_concurrent) {
577
/* concurrency may be changed while we're in the callback, so
581
pj_ioqueue_unlock_key(h);
588
if (h->cb.on_read_complete && !IS_CLOSING(h)) {
589
(*h->cb.on_read_complete)(h,
590
(pj_ioqueue_op_key_t*)read_op,
595
pj_ioqueue_unlock_key(h);
600
* This is normal; execution may fall here when multiple threads
601
* are signalled for the same event, but only one thread eventually
602
* able to process the event.
604
pj_ioqueue_unlock_key(h);
609
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
610
pj_ioqueue_key_t *h )
614
pj_ioqueue_lock_key(h);
616
if (!h->connecting) {
617
/* It is possible that more than one thread was woken up, thus
618
* the remaining thread will see h->connecting as zero because
619
* it has been processed by other thread.
621
pj_ioqueue_unlock_key(h);
626
pj_ioqueue_unlock_key(h);
630
/* Clear operation. */
633
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
634
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
636
/* Unlock; from this point we don't need to hold key's mutex
637
* (unless concurrency is disabled, which in this case we should
638
* hold the mutex while calling the callback) */
639
if (h->allow_concurrent) {
640
/* concurrency may be changed while we're in the callback, so
644
pj_ioqueue_unlock_key(h);
651
if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
652
pj_status_t status = -1;
653
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
655
int vallen = sizeof(value);
656
int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
659
status = PJ_RETURN_OS_ERROR(value);
663
(*h->cb.on_connect_complete)(h, status);
667
pj_ioqueue_unlock_key(h);
674
* Start asynchronous recv() from the socket.
676
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
677
pj_ioqueue_op_key_t *op_key,
682
struct read_operation *read_op;
684
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
687
/* Check if key is closing (need to do this first before accessing
688
* other variables, since they might have been destroyed. See ticket
692
return PJ_ECANCELLED;
694
read_op = (struct read_operation*)op_key;
695
read_op->op = PJ_IOQUEUE_OP_NONE;
697
/* Try to see if there's data immediately available.
699
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
704
status = pj_sock_recv(key->fd, buffer, &size, flags);
705
if (status == PJ_SUCCESS) {
706
/* Yes! Data is available! */
710
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
711
* the error to caller.
713
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
718
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
721
* No data is immediately available.
722
* Must schedule asynchronous operation to the ioqueue.
724
read_op->op = PJ_IOQUEUE_OP_RECV;
725
read_op->buf = buffer;
726
read_op->size = *length;
727
read_op->flags = flags;
729
pj_ioqueue_lock_key(key);
730
/* Check again. Handle may have been closed after the previous check
731
* in multithreaded app. If we add bad handle to the set it will
732
* corrupt the ioqueue set. See #913
734
if (IS_CLOSING(key)) {
735
pj_ioqueue_unlock_key(key);
736
return PJ_ECANCELLED;
738
pj_list_insert_before(&key->read_list, read_op);
739
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
740
pj_ioqueue_unlock_key(key);
746
* pj_ioqueue_recvfrom()
748
* Start asynchronous recvfrom() from the socket.
750
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
751
pj_ioqueue_op_key_t *op_key,
758
struct read_operation *read_op;
760
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
763
/* Check if key is closing. */
765
return PJ_ECANCELLED;
767
read_op = (struct read_operation*)op_key;
768
read_op->op = PJ_IOQUEUE_OP_NONE;
770
/* Try to see if there's data immediately available.
772
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
777
status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
779
if (status == PJ_SUCCESS) {
780
/* Yes! Data is available! */
784
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
785
* the error to caller.
787
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
792
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
795
* No data is immediately available.
796
* Must schedule asynchronous operation to the ioqueue.
798
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
799
read_op->buf = buffer;
800
read_op->size = *length;
801
read_op->flags = flags;
802
read_op->rmt_addr = addr;
803
read_op->rmt_addrlen = addrlen;
805
pj_ioqueue_lock_key(key);
806
/* Check again. Handle may have been closed after the previous check
807
* in multithreaded app. If we add bad handle to the set it will
808
* corrupt the ioqueue set. See #913
810
if (IS_CLOSING(key)) {
811
pj_ioqueue_unlock_key(key);
812
return PJ_ECANCELLED;
814
pj_list_insert_before(&key->read_list, read_op);
815
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
816
pj_ioqueue_unlock_key(key);
824
* Start asynchronous send() to the descriptor.
826
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
827
pj_ioqueue_op_key_t *op_key,
832
struct write_operation *write_op;
837
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
840
/* Check if key is closing. */
842
return PJ_ECANCELLED;
844
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
845
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
848
* Try to send data immediately, only if there's no pending write!
850
* We are speculating that the list is empty here without properly
851
* acquiring ioqueue's mutex first. This is intentional, to maximize
852
* performance via parallelism.
854
* This should be safe, because:
855
* - by convention, we require caller to make sure that the
856
* key is not unregistered while other threads are invoking
857
* an operation on the same key.
858
* - pj_list_empty() is safe to be invoked by multiple threads,
859
* even when other threads are modifying the list.
861
if (pj_list_empty(&key->write_list)) {
863
* See if data can be sent immediately.
866
status = pj_sock_send(key->fd, data, &sent, flags);
867
if (status == PJ_SUCCESS) {
872
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
873
* the error to caller.
875
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
882
* Schedule asynchronous send.
884
write_op = (struct write_operation*)op_key;
886
/* Spin if write_op has pending operation */
887
for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
892
/* Unable to send packet because there is already pending write in the
893
* write_op. We could not put the operation into the write_op
894
* because write_op already contains a pending operation! And
895
* we could not send the packet directly with send() either,
896
* because that will break the order of the packet. So we can
897
* only return error here.
899
* This could happen for example in multithreads program,
900
* where polling is done by one thread, while other threads are doing
901
* the sending only. If the polling thread runs on lower priority
902
* than the sending thread, then it's possible that the pending
903
* write flag is not cleared in-time because clearing is only done
906
* Aplication should specify multiple write operation keys on
907
* situation like this.
909
//pj_assert(!"ioqueue: there is pending operation on this key!");
913
write_op->op = PJ_IOQUEUE_OP_SEND;
914
write_op->buf = (char*)data;
915
write_op->size = *length;
916
write_op->written = 0;
917
write_op->flags = flags;
919
pj_ioqueue_lock_key(key);
920
/* Check again. Handle may have been closed after the previous check
921
* in multithreaded app. If we add bad handle to the set it will
922
* corrupt the ioqueue set. See #913
924
if (IS_CLOSING(key)) {
925
pj_ioqueue_unlock_key(key);
926
return PJ_ECANCELLED;
928
pj_list_insert_before(&key->write_list, write_op);
929
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
930
pj_ioqueue_unlock_key(key);
937
* pj_ioqueue_sendto()
939
* Start asynchronous write() to the descriptor.
941
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
942
pj_ioqueue_op_key_t *op_key,
946
const pj_sockaddr_t *addr,
949
struct write_operation *write_op;
951
pj_bool_t restart_retry = PJ_FALSE;
955
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
958
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
959
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
962
PJ_UNUSED_ARG(restart_retry);
964
/* Check if key is closing. */
966
return PJ_ECANCELLED;
968
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
969
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
972
* Try to send data immediately, only if there's no pending write!
974
* We are speculating that the list is empty here without properly
975
* acquiring ioqueue's mutex first. This is intentional, to maximize
976
* performance via parallelism.
978
* This should be safe, because:
979
* - by convention, we require caller to make sure that the
980
* key is not unregistered while other threads are invoking
981
* an operation on the same key.
982
* - pj_list_empty() is safe to be invoked by multiple threads,
983
* even when other threads are modifying the list.
985
if (pj_list_empty(&key->write_list)) {
987
* See if data can be sent immediately.
990
status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
991
if (status == PJ_SUCCESS) {
996
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
997
* the error to caller.
999
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1000
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
1001
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
1002
/* Special treatment for dead UDP sockets here, see ticket #1107 */
1003
if (status==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) &&
1004
key->fd_type==pj_SOCK_DGRAM() && !restart_retry)
1006
PJ_PERROR(4,(THIS_FILE, status,
1007
"Send error for socket %d, retrying",
1009
replace_udp_sock(key);
1010
restart_retry = PJ_TRUE;
1011
goto retry_on_restart;
1022
* Check that address storage can hold the address parameter.
1024
PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
1027
* Schedule asynchronous send.
1029
write_op = (struct write_operation*)op_key;
1031
/* Spin if write_op has pending operation */
1032
for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
1037
/* Unable to send packet because there is already pending write on the
1038
* write_op. We could not put the operation into the write_op
1039
* because write_op already contains a pending operation! And
1040
* we could not send the packet directly with sendto() either,
1041
* because that will break the order of the packet. So we can
1042
* only return error here.
1044
* This could happen for example in multithreads program,
1045
* where polling is done by one thread, while other threads are doing
1046
* the sending only. If the polling thread runs on lower priority
1047
* than the sending thread, then it's possible that the pending
1048
* write flag is not cleared in-time because clearing is only done
1051
* Aplication should specify multiple write operation keys on
1052
* situation like this.
1054
//pj_assert(!"ioqueue: there is pending operation on this key!");
1058
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
1059
write_op->buf = (char*)data;
1060
write_op->size = *length;
1061
write_op->written = 0;
1062
write_op->flags = flags;
1063
pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1064
write_op->rmt_addrlen = addrlen;
1066
pj_ioqueue_lock_key(key);
1067
/* Check again. Handle may have been closed after the previous check
1068
* in multithreaded app. If we add bad handle to the set it will
1069
* corrupt the ioqueue set. See #913
1071
if (IS_CLOSING(key)) {
1072
pj_ioqueue_unlock_key(key);
1073
return PJ_ECANCELLED;
1075
pj_list_insert_before(&key->write_list, write_op);
1076
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1077
pj_ioqueue_unlock_key(key);
1084
* Initiate overlapped accept() operation.
1086
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1087
pj_ioqueue_op_key_t *op_key,
1088
pj_sock_t *new_sock,
1089
pj_sockaddr_t *local,
1090
pj_sockaddr_t *remote,
1093
struct accept_operation *accept_op;
1096
/* check parameters. All must be specified! */
1097
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1099
/* Check if key is closing. */
1100
if (IS_CLOSING(key))
1101
return PJ_ECANCELLED;
1103
accept_op = (struct accept_operation*)op_key;
1104
accept_op->op = PJ_IOQUEUE_OP_NONE;
1107
* See if there's new connection available immediately.
1109
if (pj_list_empty(&key->accept_list)) {
1110
status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
1111
if (status == PJ_SUCCESS) {
1112
/* Yes! New connection is available! */
1113
if (local && addrlen) {
1114
status = pj_sock_getsockname(*new_sock, local, addrlen);
1115
if (status != PJ_SUCCESS) {
1116
pj_sock_close(*new_sock);
1117
*new_sock = PJ_INVALID_SOCKET;
1123
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
1124
* the error to caller.
1126
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1133
* No connection is available immediately.
1134
* Schedule accept() operation to be completed when there is incoming
1135
* connection available.
1137
accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
1138
accept_op->accept_fd = new_sock;
1139
accept_op->rmt_addr = remote;
1140
accept_op->addrlen= addrlen;
1141
accept_op->local_addr = local;
1143
pj_ioqueue_lock_key(key);
1144
/* Check again. Handle may have been closed after the previous check
1145
* in multithreaded app. If we add bad handle to the set it will
1146
* corrupt the ioqueue set. See #913
1148
if (IS_CLOSING(key)) {
1149
pj_ioqueue_unlock_key(key);
1150
return PJ_ECANCELLED;
1152
pj_list_insert_before(&key->accept_list, accept_op);
1153
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
1154
pj_ioqueue_unlock_key(key);
1160
* Initiate overlapped connect() operation (well, it's non-blocking actually,
1161
* since there's no overlapped version of connect()).
1163
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1164
const pj_sockaddr_t *addr,
1169
/* check parameters. All must be specified! */
1170
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1172
/* Check if key is closing. */
1173
if (IS_CLOSING(key))
1174
return PJ_ECANCELLED;
1176
/* Check if socket has not been marked for connecting */
1177
if (key->connecting != 0)
1180
status = pj_sock_connect(key->fd, addr, addrlen);
1181
if (status == PJ_SUCCESS) {
1185
if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1187
pj_ioqueue_lock_key(key);
1188
/* Check again. Handle may have been closed after the previous
1189
* check in multithreaded app. See #913
1191
if (IS_CLOSING(key)) {
1192
pj_ioqueue_unlock_key(key);
1193
return PJ_ECANCELLED;
1195
key->connecting = PJ_TRUE;
1196
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1197
ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
1198
pj_ioqueue_unlock_key(key);
1206
#endif /* PJ_HAS_TCP */
1209
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1212
pj_bzero(op_key, size);
1217
* pj_ioqueue_is_pending()
1219
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1220
pj_ioqueue_op_key_t *op_key )
1222
struct generic_operation *op_rec;
1226
op_rec = (struct generic_operation*)op_key;
1227
return op_rec->op != 0;
1232
* pj_ioqueue_post_completion()
1234
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1235
pj_ioqueue_op_key_t *op_key,
1236
pj_ssize_t bytes_status )
1238
struct generic_operation *op_rec;
1241
* Find the operation key in all pending operation list to
1242
* really make sure that it's still there; then call the callback.
1244
pj_ioqueue_lock_key(key);
1246
/* Find the operation in the pending read list. */
1247
op_rec = (struct generic_operation*)key->read_list.next;
1248
while (op_rec != (void*)&key->read_list) {
1249
if (op_rec == (void*)op_key) {
1250
pj_list_erase(op_rec);
1251
op_rec->op = PJ_IOQUEUE_OP_NONE;
1252
pj_ioqueue_unlock_key(key);
1254
(*key->cb.on_read_complete)(key, op_key, bytes_status);
1257
op_rec = op_rec->next;
1260
/* Find the operation in the pending write list. */
1261
op_rec = (struct generic_operation*)key->write_list.next;
1262
while (op_rec != (void*)&key->write_list) {
1263
if (op_rec == (void*)op_key) {
1264
pj_list_erase(op_rec);
1265
op_rec->op = PJ_IOQUEUE_OP_NONE;
1266
pj_ioqueue_unlock_key(key);
1268
(*key->cb.on_write_complete)(key, op_key, bytes_status);
1271
op_rec = op_rec->next;
1274
/* Find the operation in the pending accept list. */
1275
op_rec = (struct generic_operation*)key->accept_list.next;
1276
while (op_rec != (void*)&key->accept_list) {
1277
if (op_rec == (void*)op_key) {
1278
pj_list_erase(op_rec);
1279
op_rec->op = PJ_IOQUEUE_OP_NONE;
1280
pj_ioqueue_unlock_key(key);
1282
(*key->cb.on_accept_complete)(key, op_key,
1287
op_rec = op_rec->next;
1290
pj_ioqueue_unlock_key(key);
1292
return PJ_EINVALIDOP;
1295
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
1298
PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
1299
ioqueue->default_concurrency = allow;
1304
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1307
PJ_ASSERT_RETURN(key, PJ_EINVAL);
1309
/* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1312
PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1314
key->allow_concurrent = allow;
1318
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1321
return pj_grp_lock_acquire(key->grp_lock);
1323
return pj_lock_acquire(key->lock);
1326
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1329
return pj_grp_lock_release(key->grp_lock);
1331
return pj_lock_release(key->lock);