1
/* $Id: ioqueue_winnt.c 4724 2014-01-31 08:52:09Z nanang $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
#include <pj/ioqueue.h>
24
#include <pj/string.h>
28
#include <pj/assert.h>
30
#include <pj/compat/socket.h>
33
#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
34
# include <winsock2.h>
35
#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
39
#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
44
/* The address specified in AcceptEx() must be 16 more than the size of
45
* SOCKADDR (source: MSDN).
47
#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
49
typedef struct generic_overlapped
51
WSAOVERLAPPED overlapped;
52
pj_ioqueue_operation_e operation;
56
* OVERLAPPPED structure for send and receive.
58
typedef struct ioqueue_overlapped
60
WSAOVERLAPPED overlapped;
61
pj_ioqueue_operation_e operation;
63
pj_sockaddr_in dummy_addr;
69
* OVERLAP structure for accept.
71
typedef struct ioqueue_accept_rec
73
WSAOVERLAPPED overlapped;
74
pj_ioqueue_operation_e operation;
76
pj_sock_t *newsock_ptr;
80
char accept_buf[2 * ACCEPT_ADDR_LEN];
85
* Structure to hold pending operation key.
89
generic_overlapped generic;
90
ioqueue_overlapped overlapped;
92
ioqueue_accept_rec accept;
96
/* Type of handle in the key. */
104
enum { POST_QUIT_LEN = 0xFFFFDEADUL };
107
* Structure for individual socket.
109
struct pj_ioqueue_key_t
111
PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
113
pj_ioqueue_t *ioqueue;
116
enum handle_type hnd_type;
117
pj_ioqueue_callback cb;
118
pj_bool_t allow_concurrent;
124
#if PJ_IOQUEUE_HAS_SAFE_UNREG
125
pj_atomic_t *ref_count;
127
pj_time_val free_time;
134
* IO Queue structure.
140
pj_bool_t auto_delete_lock;
141
pj_bool_t default_concurrency;
143
#if PJ_IOQUEUE_HAS_SAFE_UNREG
144
pj_ioqueue_key_t active_list;
145
pj_ioqueue_key_t free_list;
146
pj_ioqueue_key_t closing_list;
149
/* These are to keep track of connecting sockets */
151
unsigned event_count;
152
HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
153
unsigned connecting_count;
154
HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
155
pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
160
#if PJ_IOQUEUE_HAS_SAFE_UNREG
162
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
168
* Process the socket when the overlapped accept() completed.
170
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
171
ioqueue_accept_rec *accept_overlapped)
173
struct sockaddr *local;
174
struct sockaddr *remote;
175
int locallen, remotelen;
180
/* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
181
* addresses can be obtained with getsockname() and getpeername().
183
status = setsockopt(accept_overlapped->newsock, SOL_SOCKET,
184
SO_UPDATE_ACCEPT_CONTEXT,
187
/* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
188
* So ignore the error status.
191
/* Operation complete immediately. */
192
if (accept_overlapped->addrlen) {
193
GetAcceptExSockaddrs( accept_overlapped->accept_buf,
201
if (*accept_overlapped->addrlen >= locallen) {
202
if (accept_overlapped->local)
203
pj_memcpy(accept_overlapped->local, local, locallen);
204
if (accept_overlapped->remote)
205
pj_memcpy(accept_overlapped->remote, remote, locallen);
207
if (accept_overlapped->local)
208
pj_bzero(accept_overlapped->local,
209
*accept_overlapped->addrlen);
210
if (accept_overlapped->remote)
211
pj_bzero(accept_overlapped->remote,
212
*accept_overlapped->addrlen);
215
*accept_overlapped->addrlen = locallen;
217
if (accept_overlapped->newsock_ptr)
218
*accept_overlapped->newsock_ptr = accept_overlapped->newsock;
219
accept_overlapped->operation = 0;
222
static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
224
pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
225
HANDLE hEvent = ioqueue->connecting_handles[pos];
227
/* Remove key from array of connecting handles. */
228
pj_array_erase(ioqueue->connecting_keys, sizeof(key),
229
ioqueue->connecting_count, pos);
230
pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
231
ioqueue->connecting_count, pos);
232
--ioqueue->connecting_count;
234
/* Disassociate the socket from the event. */
235
WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
237
/* Put event object to pool. */
238
if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
239
ioqueue->event_pool[ioqueue->event_count++] = hEvent;
241
/* Shouldn't happen. There should be no more pending connections
251
* Poll for the completion of non-blocking connect().
252
* If there's a completion, the function return the key of the completed
253
* socket, and 'result' argument contains the connect() result. If connect()
254
* succeeded, 'result' will have value zero, otherwise will have the error
257
static int check_connecting( pj_ioqueue_t *ioqueue )
259
if (ioqueue->connecting_count) {
263
pj_ioqueue_key_t *key;
265
} events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
267
pj_lock_acquire(ioqueue->lock);
268
for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
271
result = WaitForMultipleObjects(ioqueue->connecting_count,
272
ioqueue->connecting_handles,
274
if (result >= WAIT_OBJECT_0 &&
275
result < WAIT_OBJECT_0+ioqueue->connecting_count)
277
WSANETWORKEVENTS net_events;
279
/* Got completed connect(). */
280
unsigned pos = result - WAIT_OBJECT_0;
281
events[count].key = ioqueue->connecting_keys[pos];
283
/* See whether connect has succeeded. */
284
WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
285
ioqueue->connecting_handles[pos],
287
events[count].status =
288
PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
290
/* Erase socket from pending connect. */
291
erase_connecting_socket(ioqueue, pos);
297
pj_lock_release(ioqueue->lock);
299
/* Call callbacks. */
300
for (i=0; i<count; ++i) {
301
if (events[i].key->cb.on_connect_complete) {
302
events[i].key->cb.on_connect_complete(events[i].key,
318
PJ_DEF(const char*) pj_ioqueue_name(void)
324
* pj_ioqueue_create()
326
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
328
pj_ioqueue_t **p_ioqueue)
330
pj_ioqueue_t *ioqueue;
334
PJ_UNUSED_ARG(max_fd);
335
PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
337
rc = sizeof(union operation_key);
339
/* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
340
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
341
sizeof(union operation_key), PJ_EBUG);
344
ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
345
ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
346
if (ioqueue->iocp == NULL)
347
return PJ_RETURN_OS_ERROR(GetLastError());
349
/* Create IOCP mutex */
350
rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
351
if (rc != PJ_SUCCESS) {
352
CloseHandle(ioqueue->iocp);
356
ioqueue->auto_delete_lock = PJ_TRUE;
357
ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
359
#if PJ_IOQUEUE_HAS_SAFE_UNREG
361
* Create and initialize key pools.
363
pj_list_init(&ioqueue->active_list);
364
pj_list_init(&ioqueue->free_list);
365
pj_list_init(&ioqueue->closing_list);
367
/* Preallocate keys according to max_fd setting, and put them
370
for (i=0; i<max_fd; ++i) {
371
pj_ioqueue_key_t *key;
373
key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
375
rc = pj_atomic_create(pool, 0, &key->ref_count);
376
if (rc != PJ_SUCCESS) {
377
key = ioqueue->free_list.next;
378
while (key != &ioqueue->free_list) {
379
pj_atomic_destroy(key->ref_count);
380
pj_mutex_destroy(key->mutex);
383
CloseHandle(ioqueue->iocp);
387
rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
388
if (rc != PJ_SUCCESS) {
389
pj_atomic_destroy(key->ref_count);
390
key = ioqueue->free_list.next;
391
while (key != &ioqueue->free_list) {
392
pj_atomic_destroy(key->ref_count);
393
pj_mutex_destroy(key->mutex);
396
CloseHandle(ioqueue->iocp);
400
pj_list_push_back(&ioqueue->free_list, key);
404
*p_ioqueue = ioqueue;
406
PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
411
* pj_ioqueue_destroy()
413
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
418
pj_ioqueue_key_t *key;
421
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
423
pj_lock_acquire(ioqueue->lock);
426
/* Destroy events in the pool */
427
for (i=0; i<ioqueue->event_count; ++i) {
428
CloseHandle(ioqueue->event_pool[i]);
430
ioqueue->event_count = 0;
433
if (CloseHandle(ioqueue->iocp) != TRUE)
434
return PJ_RETURN_OS_ERROR(GetLastError());
436
#if PJ_IOQUEUE_HAS_SAFE_UNREG
437
/* Destroy reference counters */
438
key = ioqueue->active_list.next;
439
while (key != &ioqueue->active_list) {
440
pj_atomic_destroy(key->ref_count);
441
pj_mutex_destroy(key->mutex);
445
key = ioqueue->closing_list.next;
446
while (key != &ioqueue->closing_list) {
447
pj_atomic_destroy(key->ref_count);
448
pj_mutex_destroy(key->mutex);
452
key = ioqueue->free_list.next;
453
while (key != &ioqueue->free_list) {
454
pj_atomic_destroy(key->ref_count);
455
pj_mutex_destroy(key->mutex);
460
if (ioqueue->auto_delete_lock)
461
pj_lock_destroy(ioqueue->lock);
467
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
470
PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
471
ioqueue->default_concurrency = allow;
476
* pj_ioqueue_set_lock()
478
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
480
pj_bool_t auto_delete )
482
PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
484
if (ioqueue->auto_delete_lock) {
485
pj_lock_destroy(ioqueue->lock);
488
ioqueue->lock = lock;
489
ioqueue->auto_delete_lock = auto_delete;
495
* pj_ioqueue_register_sock()
497
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
498
pj_ioqueue_t *ioqueue,
501
const pj_ioqueue_callback *cb,
502
pj_ioqueue_key_t **key )
505
pj_ioqueue_key_t *rec;
509
PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
511
pj_lock_acquire(ioqueue->lock);
513
#if PJ_IOQUEUE_HAS_SAFE_UNREG
514
/* Scan closing list first to release unused keys.
515
* Must do this with lock acquired.
517
scan_closing_keys(ioqueue);
519
/* If safe unregistration is used, then get the key record from
522
if (pj_list_empty(&ioqueue->free_list)) {
523
pj_lock_release(ioqueue->lock);
527
rec = ioqueue->free_list.next;
530
/* Set initial reference count to 1 */
531
pj_assert(pj_atomic_get(rec->ref_count) == 0);
532
pj_atomic_inc(rec->ref_count);
537
rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
540
/* Build the key for this socket. */
541
rec->ioqueue = ioqueue;
542
rec->hnd = (HANDLE)sock;
543
rec->hnd_type = HND_IS_SOCKET;
544
rec->user_data = user_data;
545
pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
547
/* Set concurrency for this handle */
548
rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
549
if (rc != PJ_SUCCESS) {
550
pj_lock_release(ioqueue->lock);
558
/* Set socket to nonblocking. */
560
rc = ioctlsocket(sock, FIONBIO, &value);
562
pj_lock_release(ioqueue->lock);
563
return PJ_RETURN_OS_ERROR(WSAGetLastError());
566
/* Associate with IOCP */
567
hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
569
pj_lock_release(ioqueue->lock);
570
return PJ_RETURN_OS_ERROR(GetLastError());
575
#if PJ_IOQUEUE_HAS_SAFE_UNREG
576
pj_list_push_back(&ioqueue->active_list, rec);
579
pj_lock_release(ioqueue->lock);
586
* pj_ioqueue_get_user_data()
588
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
590
PJ_ASSERT_RETURN(key, NULL);
591
return key->user_data;
595
* pj_ioqueue_set_user_data()
597
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
601
PJ_ASSERT_RETURN(key, PJ_EINVAL);
604
*old_data = key->user_data;
606
key->user_data = user_data;
611
#if PJ_IOQUEUE_HAS_SAFE_UNREG
612
/* Decrement the key's reference counter, and when the counter reach zero,
615
static void decrement_counter(pj_ioqueue_key_t *key)
617
if (pj_atomic_dec_and_get(key->ref_count) == 0) {
619
pj_lock_acquire(key->ioqueue->lock);
621
pj_assert(key->closing == 1);
622
pj_gettickcount(&key->free_time);
623
key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
624
pj_time_val_normalize(&key->free_time);
627
pj_list_push_back(&key->ioqueue->closing_list, key);
629
pj_lock_release(key->ioqueue->lock);
635
* Poll the I/O Completion Port, execute callback,
636
* and return the key and bytes transferred of the last operation.
638
static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
639
pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
641
DWORD dwBytesTransferred, dwKey;
642
generic_overlapped *pOv;
643
pj_ioqueue_key_t *key;
644
pj_ssize_t size_status = -1;
647
/* Poll for completion status. */
648
rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransferred,
649
&dwKey, (OVERLAPPED**)&pOv,
652
/* The return value is:
653
* - nonzero if event was dequeued.
654
* - zero and pOv==NULL if no event was dequeued.
655
* - zero and pOv!=NULL if event for failed I/O was dequeued.
660
/* Event was dequeued for either successfull or failed I/O */
661
key = (pj_ioqueue_key_t*)dwKey;
662
size_status = dwBytesTransferred;
664
/* Report to caller regardless */
666
*p_bytes = size_status;
670
#if PJ_IOQUEUE_HAS_SAFE_UNREG
671
/* We shouldn't call callbacks if key is quitting. */
675
/* If concurrency is disabled, lock the key
676
* (and save the lock status to local var since app may change
677
* concurrency setting while in the callback) */
678
if (key->allow_concurrent == PJ_FALSE) {
679
pj_mutex_lock(key->mutex);
685
/* Now that we get the lock, check again that key is not closing */
688
pj_mutex_unlock(key->mutex);
693
/* Increment reference counter to prevent this key from being
696
pj_atomic_inc(key->ref_count);
698
PJ_UNUSED_ARG(has_lock);
701
/* Carry out the callback */
702
switch (pOv->operation) {
703
case PJ_IOQUEUE_OP_READ:
704
case PJ_IOQUEUE_OP_RECV:
705
case PJ_IOQUEUE_OP_RECV_FROM:
707
if (key->cb.on_read_complete)
708
key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
711
case PJ_IOQUEUE_OP_WRITE:
712
case PJ_IOQUEUE_OP_SEND:
713
case PJ_IOQUEUE_OP_SEND_TO:
715
if (key->cb.on_write_complete)
716
key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
720
case PJ_IOQUEUE_OP_ACCEPT:
721
/* special case for accept. */
722
ioqueue_on_accept_complete(key, (ioqueue_accept_rec*)pOv);
723
if (key->cb.on_accept_complete) {
724
ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
725
pj_status_t status = PJ_SUCCESS;
728
newsock = accept_rec->newsock;
729
accept_rec->newsock = PJ_INVALID_SOCKET;
731
if (newsock == PJ_INVALID_SOCKET) {
732
int dwError = WSAGetLastError();
733
if (dwError == 0) dwError = OSERR_ENOTCONN;
734
status = PJ_RETURN_OS_ERROR(dwError);
737
key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
742
case PJ_IOQUEUE_OP_CONNECT:
744
case PJ_IOQUEUE_OP_NONE:
749
#if PJ_IOQUEUE_HAS_SAFE_UNREG
750
decrement_counter(key);
752
pj_mutex_unlock(key->mutex);
758
/* No event was queued. */
763
* pj_ioqueue_unregister()
765
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
771
PJ_ASSERT_RETURN(key, PJ_EINVAL);
774
if (key->connecting) {
776
pj_ioqueue_t *ioqueue;
778
ioqueue = key->ioqueue;
780
/* Erase from connecting_handles */
781
pj_lock_acquire(ioqueue->lock);
782
for (pos=0; pos < ioqueue->connecting_count; ++pos) {
783
if (ioqueue->connecting_keys[pos] == key) {
784
erase_connecting_socket(ioqueue, pos);
789
pj_lock_release(ioqueue->lock);
793
#if PJ_IOQUEUE_HAS_SAFE_UNREG
794
/* Mark key as closing before closing handle. */
797
/* If concurrency is disabled, wait until the key has finished
798
* processing the callback
800
if (key->allow_concurrent == PJ_FALSE) {
801
pj_mutex_lock(key->mutex);
807
PJ_UNUSED_ARG(has_lock);
810
/* Close handle (the only way to disassociate handle from IOCP).
811
* We also need to close handle to make sure that no further events
812
* will come to the handle.
814
/* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
815
* - It seems that CloseHandle() in itself does not actually close
816
* the socket (i.e. it will still appear in "netstat" output). Also
817
* if we only use CloseHandle(), an "Invalid Handle" exception will
818
* be raised in WSACleanup().
819
* - MSDN documentation says that CloseHandle() must be called after
820
* closesocket() call (see
821
* http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
822
* But turns out that this will raise "Invalid Handle" exception
824
* So because of this, we replaced CloseHandle() with closesocket()
825
* instead. These was tested on WinXP SP2.
827
//CloseHandle(key->hnd);
828
pj_sock_close((pj_sock_t)key->hnd);
830
/* Reset callbacks */
831
key->cb.on_accept_complete = NULL;
832
key->cb.on_connect_complete = NULL;
833
key->cb.on_read_complete = NULL;
834
key->cb.on_write_complete = NULL;
836
#if PJ_IOQUEUE_HAS_SAFE_UNREG
837
/* Even after handle is closed, I suspect that IOCP may still try to
838
* do something with the handle, causing memory corruption when pool
839
* debugging is enabled.
841
* Forcing context switch seems to have fixed that, but this is quite
845
* This should not happen if concurrency is disallowed for the key.
846
* So at least application has a solution for this (i.e. by disallowing
847
* concurrency in the key).
849
//This will loop forever if unregistration is done on the callback.
850
//Doing this with RETRY I think should solve the IOCP setting the
851
//socket signalled, without causing the deadlock.
852
//while (pj_atomic_get(key->ref_count) != 1)
853
// pj_thread_sleep(0);
854
for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
857
/* Decrement reference counter to destroy the key. */
858
decrement_counter(key);
861
pj_mutex_unlock(key->mutex);
867
#if PJ_IOQUEUE_HAS_SAFE_UNREG
868
/* Scan the closing list, and put pending closing keys to free list.
869
* Must do this with ioqueue mutex held.
871
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
873
if (!pj_list_empty(&ioqueue->closing_list)) {
875
pj_ioqueue_key_t *key;
877
pj_gettickcount(&now);
879
/* Move closing keys to free list when they've finished the closing
882
key = ioqueue->closing_list.next;
883
while (key != &ioqueue->closing_list) {
884
pj_ioqueue_key_t *next = key->next;
886
pj_assert(key->closing != 0);
888
if (PJ_TIME_VAL_GTE(now, key->free_time)) {
890
pj_list_push_back(&ioqueue->free_list, key);
903
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
907
int connect_count = 0;
911
PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
913
/* Calculate miliseconds timeout for GetQueuedCompletionStatus */
914
dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
916
/* Poll for completion status. */
917
event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
920
/* Check the connecting array, only when there's no activity. */
921
if (event_count == 0) {
922
connect_count = check_connecting(ioqueue);
923
if (connect_count > 0)
924
event_count += connect_count;
928
#if PJ_IOQUEUE_HAS_SAFE_UNREG
929
/* Check the closing keys only when there's no activity and when there are
930
* pending closing keys.
932
if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
933
pj_lock_acquire(ioqueue->lock);
934
scan_closing_keys(ioqueue);
935
pj_lock_release(ioqueue->lock);
939
/* Return number of events. */
946
* Initiate overlapped WSARecv() operation.
948
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
949
pj_ioqueue_op_key_t *op_key,
955
* Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
956
* addrlen here. But unfortunately it generates EINVAL... :-(
962
union operation_key *op_key_rec;
965
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
967
#if PJ_IOQUEUE_HAS_SAFE_UNREG
968
/* Check key is not closing */
970
return PJ_ECANCELLED;
973
op_key_rec = (union operation_key*)op_key->internal__;
974
op_key_rec->overlapped.wsabuf.buf = buffer;
975
op_key_rec->overlapped.wsabuf.len = *length;
979
/* Try non-overlapped received first to see if data is
980
* immediately available.
982
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
983
rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
984
&bytesRead, &dwFlags, NULL, NULL);
989
DWORD dwError = WSAGetLastError();
990
if (dwError != WSAEWOULDBLOCK) {
992
return PJ_RETURN_OS_ERROR(dwError);
997
dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1000
* No immediate data available.
1001
* Register overlapped Recv() operation.
1003
pj_bzero( &op_key_rec->overlapped.overlapped,
1004
sizeof(op_key_rec->overlapped.overlapped));
1005
op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
1007
rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1008
&bytesRead, &dwFlags,
1009
&op_key_rec->overlapped.overlapped, NULL);
1010
if (rc == SOCKET_ERROR) {
1011
DWORD dwStatus = WSAGetLastError();
1012
if (dwStatus!=WSA_IO_PENDING) {
1014
return PJ_STATUS_FROM_OS(dwStatus);
1018
/* Pending operation has been scheduled. */
1023
* pj_ioqueue_recvfrom()
1025
* Initiate overlapped RecvFrom() operation.
1027
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
1028
pj_ioqueue_op_key_t *op_key,
1032
pj_sockaddr_t *addr,
1038
union operation_key *op_key_rec;
1041
PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
1043
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1044
/* Check key is not closing */
1046
return PJ_ECANCELLED;
1049
op_key_rec = (union operation_key*)op_key->internal__;
1050
op_key_rec->overlapped.wsabuf.buf = buffer;
1051
op_key_rec->overlapped.wsabuf.len = *length;
1055
/* Try non-overlapped received first to see if data is
1056
* immediately available.
1058
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1059
rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1060
&bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
1062
*length = bytesRead;
1065
DWORD dwError = WSAGetLastError();
1066
if (dwError != WSAEWOULDBLOCK) {
1068
return PJ_RETURN_OS_ERROR(dwError);
1073
dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1076
* No immediate data available.
1077
* Register overlapped Recv() operation.
1079
pj_bzero( &op_key_rec->overlapped.overlapped,
1080
sizeof(op_key_rec->overlapped.overlapped));
1081
op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
1083
rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1084
&bytesRead, &dwFlags, addr, addrlen,
1085
&op_key_rec->overlapped.overlapped, NULL);
1086
if (rc == SOCKET_ERROR) {
1087
DWORD dwStatus = WSAGetLastError();
1088
if (dwStatus!=WSA_IO_PENDING) {
1090
return PJ_STATUS_FROM_OS(dwStatus);
1094
/* Pending operation has been scheduled. */
1101
* Initiate overlapped Send operation.
1103
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
1104
pj_ioqueue_op_key_t *op_key,
1109
return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
1114
* pj_ioqueue_sendto()
1116
* Initiate overlapped SendTo operation.
1118
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
1119
pj_ioqueue_op_key_t *op_key,
1123
const pj_sockaddr_t *addr,
1129
union operation_key *op_key_rec;
1132
PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
1134
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1135
/* Check key is not closing */
1137
return PJ_ECANCELLED;
1140
op_key_rec = (union operation_key*)op_key->internal__;
1143
* First try blocking write.
1145
op_key_rec->overlapped.wsabuf.buf = (void*)data;
1146
op_key_rec->overlapped.wsabuf.len = *length;
1150
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1151
rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1152
&bytesWritten, dwFlags, addr, addrlen,
1155
*length = bytesWritten;
1158
DWORD dwStatus = WSAGetLastError();
1159
if (dwStatus != WSAEWOULDBLOCK) {
1161
return PJ_RETURN_OS_ERROR(dwStatus);
1166
dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1169
* Data can't be sent immediately.
1170
* Schedule asynchronous WSASend().
1172
pj_bzero( &op_key_rec->overlapped.overlapped,
1173
sizeof(op_key_rec->overlapped.overlapped));
1174
op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1176
rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1177
&bytesWritten, dwFlags, addr, addrlen,
1178
&op_key_rec->overlapped.overlapped, NULL);
1179
if (rc == SOCKET_ERROR) {
1180
DWORD dwStatus = WSAGetLastError();
1181
if (dwStatus!=WSA_IO_PENDING)
1182
return PJ_STATUS_FROM_OS(dwStatus);
1185
/* Asynchronous operation successfully submitted. */
1192
* pj_ioqueue_accept()
1194
* Initiate overlapped accept() operation.
1196
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1197
pj_ioqueue_op_key_t *op_key,
1198
pj_sock_t *new_sock,
1199
pj_sockaddr_t *local,
1200
pj_sockaddr_t *remote,
1204
DWORD bytesReceived;
1206
union operation_key *op_key_rec;
1210
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1212
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1213
/* Check key is not closing */
1215
return PJ_ECANCELLED;
1219
* See if there is a new connection immediately available.
1221
sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1222
if (sock != INVALID_SOCKET) {
1223
/* Yes! New socket is available! */
1224
if (local && addrlen) {
1227
/* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1228
* addresses can be obtained with getsockname() and getpeername().
1230
status = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
1231
(char*)&key->hnd, sizeof(SOCKET));
1232
/* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1233
* So ignore the error status.
1236
status = getsockname(sock, local, addrlen);
1238
DWORD dwError = WSAGetLastError();
1240
return PJ_RETURN_OS_ERROR(dwError);
1248
DWORD dwError = WSAGetLastError();
1249
if (dwError != WSAEWOULDBLOCK) {
1250
return PJ_RETURN_OS_ERROR(dwError);
1255
* No connection is immediately available.
1256
* Must schedule an asynchronous operation.
1258
op_key_rec = (union operation_key*)op_key->internal__;
1260
status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
1261
&op_key_rec->accept.newsock);
1262
if (status != PJ_SUCCESS)
1265
op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1266
op_key_rec->accept.addrlen = addrlen;
1267
op_key_rec->accept.local = local;
1268
op_key_rec->accept.remote = remote;
1269
op_key_rec->accept.newsock_ptr = new_sock;
1270
pj_bzero( &op_key_rec->accept.overlapped,
1271
sizeof(op_key_rec->accept.overlapped));
1273
rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1274
op_key_rec->accept.accept_buf,
1275
0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1277
&op_key_rec->accept.overlapped );
1280
ioqueue_on_accept_complete(key, &op_key_rec->accept);
1283
DWORD dwStatus = WSAGetLastError();
1284
if (dwStatus!=WSA_IO_PENDING)
1285
return PJ_STATUS_FROM_OS(dwStatus);
1288
/* Asynchronous Accept() has been submitted. */
1294
* pj_ioqueue_connect()
1296
* Initiate overlapped connect() operation (well, it's non-blocking actually,
1297
* since there's no overlapped version of connect()).
1299
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1300
const pj_sockaddr_t *addr,
1304
pj_ioqueue_t *ioqueue;
1307
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1309
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1310
/* Check key is not closing */
1312
return PJ_ECANCELLED;
1315
/* Initiate connect() */
1316
if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1318
dwStatus = WSAGetLastError();
1319
if (dwStatus != WSAEWOULDBLOCK) {
1320
return PJ_RETURN_OS_ERROR(dwStatus);
1323
/* Connect has completed immediately! */
1327
ioqueue = key->ioqueue;
1329
/* Add to the array of connecting socket to be polled */
1330
pj_lock_acquire(ioqueue->lock);
1332
if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1333
pj_lock_release(ioqueue->lock);
1334
return PJ_ETOOMANYCONN;
1337
/* Get or create event object. */
1338
if (ioqueue->event_count) {
1339
hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1340
--ioqueue->event_count;
1342
hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1343
if (hEvent == NULL) {
1344
DWORD dwStatus = GetLastError();
1345
pj_lock_release(ioqueue->lock);
1346
return PJ_STATUS_FROM_OS(dwStatus);
1350
/* Mark key as connecting.
1351
* We can't use array index since key can be removed dynamically.
1353
key->connecting = 1;
1355
/* Associate socket events to the event object. */
1356
if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1357
CloseHandle(hEvent);
1358
pj_lock_release(ioqueue->lock);
1359
return PJ_RETURN_OS_ERROR(WSAGetLastError());
1363
ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1364
ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1365
ioqueue->connecting_count++;
1367
pj_lock_release(ioqueue->lock);
1371
#endif /* #if PJ_HAS_TCP */
1374
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1377
pj_bzero(op_key, size);
1380
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1381
pj_ioqueue_op_key_t *op_key )
1384
DWORD bytesTransferred;
1386
rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1387
&bytesTransferred, FALSE );
1390
return GetLastError()==ERROR_IO_INCOMPLETE;
1397
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1398
pj_ioqueue_op_key_t *op_key,
1399
pj_ssize_t bytes_status )
1403
rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1404
(long)key, (OVERLAPPED*)op_key );
1406
return PJ_RETURN_OS_ERROR(GetLastError());
1412
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1415
PJ_ASSERT_RETURN(key, PJ_EINVAL);
1417
/* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1420
PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1422
key->allow_concurrent = allow;
1426
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1428
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1429
return pj_mutex_lock(key->mutex);
1431
PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1435
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1437
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1438
return pj_mutex_unlock(key->mutex);
1440
PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);