/* $Id: ioqueue_common_abs.c 3666 2011-07-19 08:40:20Z nanang $ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains common functionalities to emulate proactor pattern with
 * various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as stand-alone source.
 */
31
#define PENDING_RETRY 2
33
static void ioqueue_init( pj_ioqueue_t *ioqueue )
36
ioqueue->auto_delete_lock = 0;
37
ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
40
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
42
if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43
pj_lock_release(ioqueue->lock);
44
return pj_lock_destroy(ioqueue->lock);
51
* pj_ioqueue_set_lock()
53
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
55
pj_bool_t auto_delete )
57
PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
59
if (ioqueue->auto_delete_lock && ioqueue->lock) {
60
pj_lock_destroy(ioqueue->lock);
64
ioqueue->auto_delete_lock = auto_delete;
69
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70
pj_ioqueue_t *ioqueue,
71
pj_ioqueue_key_t *key,
74
const pj_ioqueue_callback *cb)
81
key->ioqueue = ioqueue;
83
key->user_data = user_data;
84
pj_list_init(&key->read_list);
85
pj_list_init(&key->write_list);
87
pj_list_init(&key->accept_list);
92
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
94
#if PJ_IOQUEUE_HAS_SAFE_UNREG
95
/* Set initial reference count to 1 */
96
pj_assert(key->ref_count == 0);
102
rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
103
if (rc != PJ_SUCCESS)
106
/* Get socket type. When socket type is datagram, some optimization
107
* will be performed during send to allow parallel send operations.
109
optlen = sizeof(key->fd_type);
110
rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
111
&key->fd_type, &optlen);
112
if (rc != PJ_SUCCESS)
113
key->fd_type = pj_SOCK_STREAM();
115
/* Create mutex for the key. */
116
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
117
rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
124
* pj_ioqueue_get_user_data()
126
* Obtain value associated with a key.
128
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
130
PJ_ASSERT_RETURN(key != NULL, NULL);
131
return key->user_data;
135
* pj_ioqueue_set_user_data()
137
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
141
PJ_ASSERT_RETURN(key, PJ_EINVAL);
144
*old_data = key->user_data;
145
key->user_data = user_data;
150
PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
152
return !pj_list_empty(&key->write_list);
155
PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
157
return !pj_list_empty(&key->read_list);
160
PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
163
return !pj_list_empty(&key->accept_list);
170
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
172
return key->connecting;
/* IS_CLOSING(key): true when the key is being unregistered and no new
 * callback should be invoked on it. Only meaningful with safe-unreg
 * support; otherwise keys are never observed in a closing state.
 */
#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif
184
* ioqueue_dispatch_event()
186
* Report occurence of an event in the key to be processed by the
189
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
192
pj_mutex_lock(h->mutex);
195
pj_mutex_unlock(h->mutex);
199
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
201
/* Completion of connect() operation */
205
/* Clear operation. */
208
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
209
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
212
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
214
* On Linux, use getsockopt to read the SO_ERROR option at
215
* level SOL_SOCKET to determine whether connect() completed
216
* successfully (if SO_ERROR is zero).
220
int vallen = sizeof(value);
221
int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
224
/* Argh!! What to do now???
225
* Just indicate that the socket is connected. The
226
* application will get error as soon as it tries to use
227
* the socket to send/receive.
231
status = PJ_STATUS_FROM_OS(value);
234
#elif defined(PJ_WIN32) && PJ_WIN32!=0
235
status = PJ_SUCCESS; /* success */
237
/* Excellent information in D.J. Bernstein page:
238
* http://cr.yp.to/docs/connect.html
240
* Seems like the most portable way of detecting connect()
241
* failure is to call getpeername(). If socket is connected,
242
* getpeername() will return 0. If the socket is not connected,
243
* it will return ENOTCONN, and read(fd, &ch, 1) will produce
244
* the right errno through error slippage. This is a combination
245
* of suggestions from Douglas C. Schmidt and Ken Keys.
248
struct sockaddr_in addr;
249
int addrlen = sizeof(addr);
251
status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
256
/* Unlock; from this point we don't need to hold key's mutex
257
* (unless concurrency is disabled, which in this case we should
258
* hold the mutex while calling the callback) */
259
if (h->allow_concurrent) {
260
/* concurrency may be changed while we're in the callback, so
264
pj_mutex_unlock(h->mutex);
270
if (h->cb.on_connect_complete && !IS_CLOSING(h))
271
(*h->cb.on_connect_complete)(h, status);
273
/* Unlock if we still hold the lock */
275
pj_mutex_unlock(h->mutex);
281
#endif /* PJ_HAS_TCP */
282
if (key_has_pending_write(h)) {
283
/* Socket is writable. */
284
struct write_operation *write_op;
286
pj_status_t send_rc = PJ_SUCCESS;
288
/* Get the first in the queue. */
289
write_op = h->write_list.next;
291
/* For datagrams, we can remove the write_op from the list
292
* so that send() can work in parallel.
294
if (h->fd_type == pj_SOCK_DGRAM()) {
295
pj_list_erase(write_op);
297
if (pj_list_empty(&h->write_list))
298
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
303
* Unfortunately we must do this while holding key's mutex, thus
304
* preventing parallel write on a single key.. :-((
306
sent = write_op->size - write_op->written;
307
if (write_op->op == PJ_IOQUEUE_OP_SEND) {
308
send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
309
&sent, write_op->flags);
310
/* Can't do this. We only clear "op" after we're finished sending
314
} else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
316
while (--retry >= 0) {
317
send_rc = pj_sock_sendto(h->fd,
318
write_op->buf+write_op->written,
319
&sent, write_op->flags,
321
write_op->rmt_addrlen);
322
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
323
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
324
/* Special treatment for dead UDP sockets here, see ticket #1107 */
325
if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) &&
326
h->fd_type==pj_SOCK_DGRAM())
328
PJ_PERROR(4,(THIS_FILE, send_rc,
329
"Send error for socket %d, retrying",
338
/* Can't do this. We only clear "op" after we're finished sending
343
pj_assert(!"Invalid operation type!");
344
write_op->op = PJ_IOQUEUE_OP_NONE;
348
if (send_rc == PJ_SUCCESS) {
349
write_op->written += sent;
351
pj_assert(send_rc > 0);
352
write_op->written = -send_rc;
355
/* Are we finished with this buffer? */
356
if (send_rc!=PJ_SUCCESS ||
357
write_op->written == (pj_ssize_t)write_op->size ||
358
h->fd_type == pj_SOCK_DGRAM())
362
write_op->op = PJ_IOQUEUE_OP_NONE;
364
if (h->fd_type != pj_SOCK_DGRAM()) {
365
/* Write completion of the whole stream. */
366
pj_list_erase(write_op);
368
/* Clear operation if there's no more data to send. */
369
if (pj_list_empty(&h->write_list))
370
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
374
/* Unlock; from this point we don't need to hold key's mutex
375
* (unless concurrency is disabled, which in this case we should
376
* hold the mutex while calling the callback) */
377
if (h->allow_concurrent) {
378
/* concurrency may be changed while we're in the callback, so
382
pj_mutex_unlock(h->mutex);
388
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
389
(*h->cb.on_write_complete)(h,
390
(pj_ioqueue_op_key_t*)write_op,
395
pj_mutex_unlock(h->mutex);
399
pj_mutex_unlock(h->mutex);
405
* This is normal; execution may fall here when multiple threads
406
* are signalled for the same event, but only one thread eventually
407
* able to process the event.
409
pj_mutex_unlock(h->mutex);
413
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
418
pj_mutex_lock(h->mutex);
421
pj_mutex_unlock(h->mutex);
426
if (!pj_list_empty(&h->accept_list)) {
428
struct accept_operation *accept_op;
431
/* Get one accept operation from the list. */
432
accept_op = h->accept_list.next;
433
pj_list_erase(accept_op);
434
accept_op->op = PJ_IOQUEUE_OP_NONE;
436
/* Clear bit in fdset if there is no more pending accept */
437
if (pj_list_empty(&h->accept_list))
438
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
440
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
441
accept_op->rmt_addr, accept_op->addrlen);
442
if (rc==PJ_SUCCESS && accept_op->local_addr) {
443
rc = pj_sock_getsockname(*accept_op->accept_fd,
444
accept_op->local_addr,
448
/* Unlock; from this point we don't need to hold key's mutex
449
* (unless concurrency is disabled, which in this case we should
450
* hold the mutex while calling the callback) */
451
if (h->allow_concurrent) {
452
/* concurrency may be changed while we're in the callback, so
456
pj_mutex_unlock(h->mutex);
462
if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
463
(*h->cb.on_accept_complete)(h,
464
(pj_ioqueue_op_key_t*)accept_op,
465
*accept_op->accept_fd, rc);
469
pj_mutex_unlock(h->mutex);
474
if (key_has_pending_read(h)) {
475
struct read_operation *read_op;
476
pj_ssize_t bytes_read;
479
/* Get one pending read operation from the list. */
480
read_op = h->read_list.next;
481
pj_list_erase(read_op);
483
/* Clear fdset if there is no pending read. */
484
if (pj_list_empty(&h->read_list))
485
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
487
bytes_read = read_op->size;
489
if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
490
read_op->op = PJ_IOQUEUE_OP_NONE;
491
rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
494
read_op->rmt_addrlen);
495
} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
496
read_op->op = PJ_IOQUEUE_OP_NONE;
497
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
500
pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
501
read_op->op = PJ_IOQUEUE_OP_NONE;
503
* User has specified pj_ioqueue_read().
504
* On Win32, we should do ReadFile(). But because we got
505
* here because of select() anyway, user must have put a
506
* socket descriptor on h->fd, which in this case we can
507
* just call pj_sock_recv() instead of ReadFile().
508
* On Unix, user may put a file in h->fd, so we'll have
509
* to call read() here.
510
* This may not compile on systems which doesn't have
511
* read(). That's why we only specify PJ_LINUX here so
512
* that error is easier to catch.
514
# if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
515
defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
516
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
518
//rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
519
// &bytes_read, NULL);
520
# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
521
bytes_read = read(h->fd, read_op->buf, bytes_read);
522
rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
523
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
524
bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
525
rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
527
# error "Implement read() for this platform!"
531
if (rc != PJ_SUCCESS) {
532
# if defined(PJ_WIN32) && PJ_WIN32 != 0
533
/* On Win32, for UDP, WSAECONNRESET on the receive side
534
* indicates that previous sending has triggered ICMP Port
535
* Unreachable message.
536
* But we wouldn't know at this point which one of previous
537
* key that has triggered the error, since UDP socket can
539
* So we'll just ignore it!
542
if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
543
//PJ_LOG(4,(THIS_FILE,
544
// "Ignored ICMP port unreach. on key=%p", h));
548
/* In any case we would report this to caller. */
551
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
552
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
553
/* Special treatment for dead UDP sockets here, see ticket #1107 */
554
if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
555
h->fd_type==pj_SOCK_DGRAM())
562
/* Unlock; from this point we don't need to hold key's mutex
563
* (unless concurrency is disabled, which in this case we should
564
* hold the mutex while calling the callback) */
565
if (h->allow_concurrent) {
566
/* concurrency may be changed while we're in the callback, so
570
pj_mutex_unlock(h->mutex);
576
if (h->cb.on_read_complete && !IS_CLOSING(h)) {
577
(*h->cb.on_read_complete)(h,
578
(pj_ioqueue_op_key_t*)read_op,
583
pj_mutex_unlock(h->mutex);
588
* This is normal; execution may fall here when multiple threads
589
* are signalled for the same event, but only one thread eventually
590
* able to process the event.
592
pj_mutex_unlock(h->mutex);
597
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
598
pj_ioqueue_key_t *h )
602
pj_mutex_lock(h->mutex);
604
if (!h->connecting) {
605
/* It is possible that more than one thread was woken up, thus
606
* the remaining thread will see h->connecting as zero because
607
* it has been processed by other thread.
609
pj_mutex_unlock(h->mutex);
614
pj_mutex_unlock(h->mutex);
618
/* Clear operation. */
621
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
622
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
624
/* Unlock; from this point we don't need to hold key's mutex
625
* (unless concurrency is disabled, which in this case we should
626
* hold the mutex while calling the callback) */
627
if (h->allow_concurrent) {
628
/* concurrency may be changed while we're in the callback, so
632
pj_mutex_unlock(h->mutex);
638
if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
639
pj_status_t status = -1;
640
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
642
int vallen = sizeof(value);
643
int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
646
status = PJ_RETURN_OS_ERROR(value);
650
(*h->cb.on_connect_complete)(h, status);
654
pj_mutex_unlock(h->mutex);
661
* Start asynchronous recv() from the socket.
663
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
664
pj_ioqueue_op_key_t *op_key,
669
struct read_operation *read_op;
671
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
674
/* Check if key is closing (need to do this first before accessing
675
* other variables, since they might have been destroyed. See ticket
679
return PJ_ECANCELLED;
681
read_op = (struct read_operation*)op_key;
682
read_op->op = PJ_IOQUEUE_OP_NONE;
684
/* Try to see if there's data immediately available.
686
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
691
status = pj_sock_recv(key->fd, buffer, &size, flags);
692
if (status == PJ_SUCCESS) {
693
/* Yes! Data is available! */
697
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
698
* the error to caller.
700
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
705
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
708
* No data is immediately available.
709
* Must schedule asynchronous operation to the ioqueue.
711
read_op->op = PJ_IOQUEUE_OP_RECV;
712
read_op->buf = buffer;
713
read_op->size = *length;
714
read_op->flags = flags;
716
pj_mutex_lock(key->mutex);
717
/* Check again. Handle may have been closed after the previous check
718
* in multithreaded app. If we add bad handle to the set it will
719
* corrupt the ioqueue set. See #913
721
if (IS_CLOSING(key)) {
722
pj_mutex_unlock(key->mutex);
723
return PJ_ECANCELLED;
725
pj_list_insert_before(&key->read_list, read_op);
726
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
727
pj_mutex_unlock(key->mutex);
733
* pj_ioqueue_recvfrom()
735
* Start asynchronous recvfrom() from the socket.
737
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
738
pj_ioqueue_op_key_t *op_key,
745
struct read_operation *read_op;
747
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
750
/* Check if key is closing. */
752
return PJ_ECANCELLED;
754
read_op = (struct read_operation*)op_key;
755
read_op->op = PJ_IOQUEUE_OP_NONE;
757
/* Try to see if there's data immediately available.
759
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
764
status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
766
if (status == PJ_SUCCESS) {
767
/* Yes! Data is available! */
771
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
772
* the error to caller.
774
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
779
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
782
* No data is immediately available.
783
* Must schedule asynchronous operation to the ioqueue.
785
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
786
read_op->buf = buffer;
787
read_op->size = *length;
788
read_op->flags = flags;
789
read_op->rmt_addr = addr;
790
read_op->rmt_addrlen = addrlen;
792
pj_mutex_lock(key->mutex);
793
/* Check again. Handle may have been closed after the previous check
794
* in multithreaded app. If we add bad handle to the set it will
795
* corrupt the ioqueue set. See #913
797
if (IS_CLOSING(key)) {
798
pj_mutex_unlock(key->mutex);
799
return PJ_ECANCELLED;
801
pj_list_insert_before(&key->read_list, read_op);
802
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
803
pj_mutex_unlock(key->mutex);
811
* Start asynchronous send() to the descriptor.
813
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
814
pj_ioqueue_op_key_t *op_key,
819
struct write_operation *write_op;
824
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
827
/* Check if key is closing. */
829
return PJ_ECANCELLED;
831
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
832
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
835
* Try to send data immediately, only if there's no pending write!
837
* We are speculating that the list is empty here without properly
838
* acquiring ioqueue's mutex first. This is intentional, to maximize
839
* performance via parallelism.
841
* This should be safe, because:
842
* - by convention, we require caller to make sure that the
843
* key is not unregistered while other threads are invoking
844
* an operation on the same key.
845
* - pj_list_empty() is safe to be invoked by multiple threads,
846
* even when other threads are modifying the list.
848
if (pj_list_empty(&key->write_list)) {
850
* See if data can be sent immediately.
853
status = pj_sock_send(key->fd, data, &sent, flags);
854
if (status == PJ_SUCCESS) {
859
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
860
* the error to caller.
862
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
869
* Schedule asynchronous send.
871
write_op = (struct write_operation*)op_key;
873
/* Spin if write_op has pending operation */
874
for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
879
/* Unable to send packet because there is already pending write in the
880
* write_op. We could not put the operation into the write_op
881
* because write_op already contains a pending operation! And
882
* we could not send the packet directly with send() either,
883
* because that will break the order of the packet. So we can
884
* only return error here.
886
* This could happen for example in multithreads program,
887
* where polling is done by one thread, while other threads are doing
888
* the sending only. If the polling thread runs on lower priority
889
* than the sending thread, then it's possible that the pending
890
* write flag is not cleared in-time because clearing is only done
893
* Aplication should specify multiple write operation keys on
894
* situation like this.
896
//pj_assert(!"ioqueue: there is pending operation on this key!");
900
write_op->op = PJ_IOQUEUE_OP_SEND;
901
write_op->buf = (char*)data;
902
write_op->size = *length;
903
write_op->written = 0;
904
write_op->flags = flags;
906
pj_mutex_lock(key->mutex);
907
/* Check again. Handle may have been closed after the previous check
908
* in multithreaded app. If we add bad handle to the set it will
909
* corrupt the ioqueue set. See #913
911
if (IS_CLOSING(key)) {
912
pj_mutex_unlock(key->mutex);
913
return PJ_ECANCELLED;
915
pj_list_insert_before(&key->write_list, write_op);
916
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
917
pj_mutex_unlock(key->mutex);
924
* pj_ioqueue_sendto()
926
* Start asynchronous write() to the descriptor.
928
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
929
pj_ioqueue_op_key_t *op_key,
933
const pj_sockaddr_t *addr,
936
struct write_operation *write_op;
938
pj_bool_t restart_retry = PJ_FALSE;
942
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
945
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
946
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
949
PJ_UNUSED_ARG(restart_retry);
951
/* Check if key is closing. */
953
return PJ_ECANCELLED;
955
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
956
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
959
* Try to send data immediately, only if there's no pending write!
961
* We are speculating that the list is empty here without properly
962
* acquiring ioqueue's mutex first. This is intentional, to maximize
963
* performance via parallelism.
965
* This should be safe, because:
966
* - by convention, we require caller to make sure that the
967
* key is not unregistered while other threads are invoking
968
* an operation on the same key.
969
* - pj_list_empty() is safe to be invoked by multiple threads,
970
* even when other threads are modifying the list.
972
if (pj_list_empty(&key->write_list)) {
974
* See if data can be sent immediately.
977
status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
978
if (status == PJ_SUCCESS) {
983
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
984
* the error to caller.
986
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
987
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
988
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
989
/* Special treatment for dead UDP sockets here, see ticket #1107 */
990
if (status==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) &&
991
key->fd_type==pj_SOCK_DGRAM() && !restart_retry)
993
PJ_PERROR(4,(THIS_FILE, status,
994
"Send error for socket %d, retrying",
996
replace_udp_sock(key);
997
restart_retry = PJ_TRUE;
998
goto retry_on_restart;
1009
* Check that address storage can hold the address parameter.
1011
PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
1014
* Schedule asynchronous send.
1016
write_op = (struct write_operation*)op_key;
1018
/* Spin if write_op has pending operation */
1019
for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
1024
/* Unable to send packet because there is already pending write on the
1025
* write_op. We could not put the operation into the write_op
1026
* because write_op already contains a pending operation! And
1027
* we could not send the packet directly with sendto() either,
1028
* because that will break the order of the packet. So we can
1029
* only return error here.
1031
* This could happen for example in multithreads program,
1032
* where polling is done by one thread, while other threads are doing
1033
* the sending only. If the polling thread runs on lower priority
1034
* than the sending thread, then it's possible that the pending
1035
* write flag is not cleared in-time because clearing is only done
1038
* Aplication should specify multiple write operation keys on
1039
* situation like this.
1041
//pj_assert(!"ioqueue: there is pending operation on this key!");
1045
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
1046
write_op->buf = (char*)data;
1047
write_op->size = *length;
1048
write_op->written = 0;
1049
write_op->flags = flags;
1050
pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1051
write_op->rmt_addrlen = addrlen;
1053
pj_mutex_lock(key->mutex);
1054
/* Check again. Handle may have been closed after the previous check
1055
* in multithreaded app. If we add bad handle to the set it will
1056
* corrupt the ioqueue set. See #913
1058
if (IS_CLOSING(key)) {
1059
pj_mutex_unlock(key->mutex);
1060
return PJ_ECANCELLED;
1062
pj_list_insert_before(&key->write_list, write_op);
1063
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1064
pj_mutex_unlock(key->mutex);
1071
* Initiate overlapped accept() operation.
1073
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1074
pj_ioqueue_op_key_t *op_key,
1075
pj_sock_t *new_sock,
1076
pj_sockaddr_t *local,
1077
pj_sockaddr_t *remote,
1080
struct accept_operation *accept_op;
1083
/* check parameters. All must be specified! */
1084
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1086
/* Check if key is closing. */
1087
if (IS_CLOSING(key))
1088
return PJ_ECANCELLED;
1090
accept_op = (struct accept_operation*)op_key;
1091
accept_op->op = PJ_IOQUEUE_OP_NONE;
1094
* See if there's new connection available immediately.
1096
if (pj_list_empty(&key->accept_list)) {
1097
status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
1098
if (status == PJ_SUCCESS) {
1099
/* Yes! New connection is available! */
1100
if (local && addrlen) {
1101
status = pj_sock_getsockname(*new_sock, local, addrlen);
1102
if (status != PJ_SUCCESS) {
1103
pj_sock_close(*new_sock);
1104
*new_sock = PJ_INVALID_SOCKET;
1110
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
1111
* the error to caller.
1113
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1120
* No connection is available immediately.
1121
* Schedule accept() operation to be completed when there is incoming
1122
* connection available.
1124
accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
1125
accept_op->accept_fd = new_sock;
1126
accept_op->rmt_addr = remote;
1127
accept_op->addrlen= addrlen;
1128
accept_op->local_addr = local;
1130
pj_mutex_lock(key->mutex);
1131
/* Check again. Handle may have been closed after the previous check
1132
* in multithreaded app. If we add bad handle to the set it will
1133
* corrupt the ioqueue set. See #913
1135
if (IS_CLOSING(key)) {
1136
pj_mutex_unlock(key->mutex);
1137
return PJ_ECANCELLED;
1139
pj_list_insert_before(&key->accept_list, accept_op);
1140
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
1141
pj_mutex_unlock(key->mutex);
1147
* Initiate overlapped connect() operation (well, it's non-blocking actually,
1148
* since there's no overlapped version of connect()).
1150
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1151
const pj_sockaddr_t *addr,
1156
/* check parameters. All must be specified! */
1157
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1159
/* Check if key is closing. */
1160
if (IS_CLOSING(key))
1161
return PJ_ECANCELLED;
1163
/* Check if socket has not been marked for connecting */
1164
if (key->connecting != 0)
1167
status = pj_sock_connect(key->fd, addr, addrlen);
1168
if (status == PJ_SUCCESS) {
1172
if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1174
pj_mutex_lock(key->mutex);
1175
/* Check again. Handle may have been closed after the previous
1176
* check in multithreaded app. See #913
1178
if (IS_CLOSING(key)) {
1179
pj_mutex_unlock(key->mutex);
1180
return PJ_ECANCELLED;
1182
key->connecting = PJ_TRUE;
1183
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1184
ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
1185
pj_mutex_unlock(key->mutex);
1193
#endif /* PJ_HAS_TCP */
1196
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1199
pj_bzero(op_key, size);
1204
* pj_ioqueue_is_pending()
1206
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1207
pj_ioqueue_op_key_t *op_key )
1209
struct generic_operation *op_rec;
1213
op_rec = (struct generic_operation*)op_key;
1214
return op_rec->op != 0;
1219
* pj_ioqueue_post_completion()
1221
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1222
pj_ioqueue_op_key_t *op_key,
1223
pj_ssize_t bytes_status )
1225
struct generic_operation *op_rec;
1228
* Find the operation key in all pending operation list to
1229
* really make sure that it's still there; then call the callback.
1231
pj_mutex_lock(key->mutex);
1233
/* Find the operation in the pending read list. */
1234
op_rec = (struct generic_operation*)key->read_list.next;
1235
while (op_rec != (void*)&key->read_list) {
1236
if (op_rec == (void*)op_key) {
1237
pj_list_erase(op_rec);
1238
op_rec->op = PJ_IOQUEUE_OP_NONE;
1239
pj_mutex_unlock(key->mutex);
1241
(*key->cb.on_read_complete)(key, op_key, bytes_status);
1244
op_rec = op_rec->next;
1247
/* Find the operation in the pending write list. */
1248
op_rec = (struct generic_operation*)key->write_list.next;
1249
while (op_rec != (void*)&key->write_list) {
1250
if (op_rec == (void*)op_key) {
1251
pj_list_erase(op_rec);
1252
op_rec->op = PJ_IOQUEUE_OP_NONE;
1253
pj_mutex_unlock(key->mutex);
1255
(*key->cb.on_write_complete)(key, op_key, bytes_status);
1258
op_rec = op_rec->next;
1261
/* Find the operation in the pending accept list. */
1262
op_rec = (struct generic_operation*)key->accept_list.next;
1263
while (op_rec != (void*)&key->accept_list) {
1264
if (op_rec == (void*)op_key) {
1265
pj_list_erase(op_rec);
1266
op_rec->op = PJ_IOQUEUE_OP_NONE;
1267
pj_mutex_unlock(key->mutex);
1269
(*key->cb.on_accept_complete)(key, op_key,
1274
op_rec = op_rec->next;
1277
pj_mutex_unlock(key->mutex);
1279
return PJ_EINVALIDOP;
1282
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
1285
PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
1286
ioqueue->default_concurrency = allow;
1291
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1294
PJ_ASSERT_RETURN(key, PJ_EINVAL);
1296
/* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1299
PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1301
key->allow_concurrent = allow;
1305
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1307
return pj_mutex_lock(key->mutex);
1310
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1312
return pj_mutex_unlock(key->mutex);