~ubuntu-branches/ubuntu/wily/sflphone/wily

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.2.1/pjlib/src/pj/ioqueue_winnt.c

  • Committer: Package Import Robot
  • Author(s): Francois Marier, Francois Marier, Mark Purcell
  • Date: 2014-10-18 15:08:50 UTC
  • mfrom: (1.1.12)
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20141018150850-2exfk34ckb15pcwi
Tags: 1.4.1-0.1
[ Francois Marier ]
* Non-maintainer upload
* New upstream release (closes: #759576, #741130)
  - debian/rules +PJPROJECT_VERSION := 2.2.1
  - add upstream patch to fix broken TLS support
  - add patch to fix pjproject regression

[ Mark Purcell ]
* Build-Depends:
  - sflphone-daemon + libavformat-dev, libavcodec-dev, libswscale-dev,
  libavdevice-dev, libavutil-dev
  - sflphone-gnome + libclutter-gtk-1.0-dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* $Id: ioqueue_winnt.c 4724 2014-01-31 08:52:09Z nanang $ */
 
2
/* 
 
3
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 
4
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify
 
7
 * it under the terms of the GNU General Public License as published by
 
8
 * the Free Software Foundation; either version 2 of the License, or
 
9
 * (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 
19
 */
 
20
#include <pj/ioqueue.h>
 
21
#include <pj/os.h>
 
22
#include <pj/lock.h>
 
23
#include <pj/pool.h>
 
24
#include <pj/string.h>
 
25
#include <pj/sock.h>
 
26
#include <pj/array.h>
 
27
#include <pj/log.h>
 
28
#include <pj/assert.h>
 
29
#include <pj/errno.h>
 
30
#include <pj/compat/socket.h>
 
31
 
 
32
 
 
33
#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
 
34
#  include <winsock2.h>
 
35
#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
 
36
#  include <winsock.h>
 
37
#endif
 
38
 
 
39
#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
 
40
#  include <mswsock.h>
 
41
#endif
 
42
 
 
43
 
 
44
/* The address specified in AcceptEx() must be 16 more than the size of
 
45
 * SOCKADDR (source: MSDN).
 
46
 */
 
47
#define ACCEPT_ADDR_LEN     (sizeof(pj_sockaddr_in)+16)
 
48
 
 
49
typedef struct generic_overlapped
 
50
{
 
51
    WSAOVERLAPPED          overlapped;
 
52
    pj_ioqueue_operation_e operation;
 
53
} generic_overlapped;
 
54
 
 
55
/*
 
56
 * OVERLAPPPED structure for send and receive.
 
57
 */
 
58
typedef struct ioqueue_overlapped
 
59
{
 
60
    WSAOVERLAPPED          overlapped;
 
61
    pj_ioqueue_operation_e operation;
 
62
    WSABUF                 wsabuf;
 
63
    pj_sockaddr_in         dummy_addr;
 
64
    int                    dummy_addrlen;
 
65
} ioqueue_overlapped;
 
66
 
 
67
#if PJ_HAS_TCP
 
68
/*
 
69
 * OVERLAP structure for accept.
 
70
 */
 
71
typedef struct ioqueue_accept_rec
 
72
{
 
73
    WSAOVERLAPPED           overlapped;
 
74
    pj_ioqueue_operation_e  operation;
 
75
    pj_sock_t               newsock;
 
76
    pj_sock_t              *newsock_ptr;
 
77
    int                    *addrlen;
 
78
    void                   *remote;
 
79
    void                   *local;
 
80
    char                    accept_buf[2 * ACCEPT_ADDR_LEN];
 
81
} ioqueue_accept_rec;
 
82
#endif
 
83
 
 
84
/*
 
85
 * Structure to hold pending operation key.
 
86
 */
 
87
union operation_key
 
88
{
 
89
    generic_overlapped      generic;
 
90
    ioqueue_overlapped      overlapped;
 
91
#if PJ_HAS_TCP
 
92
    ioqueue_accept_rec      accept;
 
93
#endif
 
94
};
 
95
 
 
96
/* Type of handle in the key. */
 
97
enum handle_type
 
98
{
 
99
    HND_IS_UNKNOWN,
 
100
    HND_IS_FILE,
 
101
    HND_IS_SOCKET,
 
102
};
 
103
 
 
104
enum { POST_QUIT_LEN = 0xFFFFDEADUL };
 
105
 
 
106
/*
 
107
 * Structure for individual socket.
 
108
 */
 
109
struct pj_ioqueue_key_t
 
110
{
 
111
    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
 
112
 
 
113
    pj_ioqueue_t       *ioqueue;
 
114
    HANDLE              hnd;
 
115
    void               *user_data;
 
116
    enum handle_type    hnd_type;
 
117
    pj_ioqueue_callback cb;
 
118
    pj_bool_t           allow_concurrent;
 
119
 
 
120
#if PJ_HAS_TCP
 
121
    int                 connecting;
 
122
#endif
 
123
 
 
124
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
125
    pj_atomic_t        *ref_count;
 
126
    pj_bool_t           closing;
 
127
    pj_time_val         free_time;
 
128
    pj_mutex_t         *mutex;
 
129
#endif
 
130
 
 
131
};
 
132
 
 
133
/*
 
134
 * IO Queue structure.
 
135
 */
 
136
struct pj_ioqueue_t
 
137
{
 
138
    HANDLE            iocp;
 
139
    pj_lock_t        *lock;
 
140
    pj_bool_t         auto_delete_lock;
 
141
    pj_bool_t         default_concurrency;
 
142
 
 
143
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
144
    pj_ioqueue_key_t  active_list;
 
145
    pj_ioqueue_key_t  free_list;
 
146
    pj_ioqueue_key_t  closing_list;
 
147
#endif
 
148
 
 
149
    /* These are to keep track of connecting sockets */
 
150
#if PJ_HAS_TCP
 
151
    unsigned          event_count;
 
152
    HANDLE            event_pool[MAXIMUM_WAIT_OBJECTS+1];
 
153
    unsigned          connecting_count;
 
154
    HANDLE            connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
 
155
    pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
 
156
#endif
 
157
};
 
158
 
 
159
 
 
160
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
161
/* Prototype */
 
162
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
 
163
#endif
 
164
 
 
165
 
 
166
#if PJ_HAS_TCP
 
167
/*
 
168
 * Process the socket when the overlapped accept() completed.
 
169
 */
 
170
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
 
171
                                       ioqueue_accept_rec *accept_overlapped)
 
172
{
 
173
    struct sockaddr *local;
 
174
    struct sockaddr *remote;
 
175
    int locallen, remotelen;
 
176
    pj_status_t status;
 
177
 
 
178
    PJ_CHECK_STACK();
 
179
 
 
180
    /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket 
 
181
     * addresses can be obtained with getsockname() and getpeername().
 
182
     */
 
183
    status = setsockopt(accept_overlapped->newsock, SOL_SOCKET,
 
184
                        SO_UPDATE_ACCEPT_CONTEXT, 
 
185
                        (char*)&key->hnd, 
 
186
                        sizeof(SOCKET));
 
187
    /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
 
188
     * So ignore the error status.
 
189
     */
 
190
 
 
191
    /* Operation complete immediately. */
 
192
    if (accept_overlapped->addrlen) {
 
193
        GetAcceptExSockaddrs( accept_overlapped->accept_buf,
 
194
                              0, 
 
195
                              ACCEPT_ADDR_LEN,
 
196
                              ACCEPT_ADDR_LEN,
 
197
                              &local,
 
198
                              &locallen,
 
199
                              &remote,
 
200
                              &remotelen);
 
201
        if (*accept_overlapped->addrlen >= locallen) {
 
202
            if (accept_overlapped->local)
 
203
                pj_memcpy(accept_overlapped->local, local, locallen);
 
204
            if (accept_overlapped->remote)
 
205
                pj_memcpy(accept_overlapped->remote, remote, locallen);
 
206
        } else {
 
207
            if (accept_overlapped->local)
 
208
                pj_bzero(accept_overlapped->local, 
 
209
                         *accept_overlapped->addrlen);
 
210
            if (accept_overlapped->remote)
 
211
                pj_bzero(accept_overlapped->remote, 
 
212
                         *accept_overlapped->addrlen);
 
213
        }
 
214
 
 
215
        *accept_overlapped->addrlen = locallen;
 
216
    }
 
217
    if (accept_overlapped->newsock_ptr)
 
218
        *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
 
219
    accept_overlapped->operation = 0;
 
220
}
 
221
 
 
222
static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
 
223
{
 
224
    pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
 
225
    HANDLE hEvent = ioqueue->connecting_handles[pos];
 
226
 
 
227
    /* Remove key from array of connecting handles. */
 
228
    pj_array_erase(ioqueue->connecting_keys, sizeof(key),
 
229
                   ioqueue->connecting_count, pos);
 
230
    pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
 
231
                   ioqueue->connecting_count, pos);
 
232
    --ioqueue->connecting_count;
 
233
 
 
234
    /* Disassociate the socket from the event. */
 
235
    WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
 
236
 
 
237
    /* Put event object to pool. */
 
238
    if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
 
239
        ioqueue->event_pool[ioqueue->event_count++] = hEvent;
 
240
    } else {
 
241
        /* Shouldn't happen. There should be no more pending connections
 
242
         * than max. 
 
243
         */
 
244
        pj_assert(0);
 
245
        CloseHandle(hEvent);
 
246
    }
 
247
 
 
248
}
 
249
 
 
250
/*
 
251
 * Poll for the completion of non-blocking connect().
 
252
 * If there's a completion, the function return the key of the completed
 
253
 * socket, and 'result' argument contains the connect() result. If connect()
 
254
 * succeeded, 'result' will have value zero, otherwise will have the error
 
255
 * code.
 
256
 */
 
257
static int check_connecting( pj_ioqueue_t *ioqueue )
 
258
{
 
259
    if (ioqueue->connecting_count) {
 
260
        int i, count;
 
261
        struct 
 
262
        {
 
263
            pj_ioqueue_key_t *key;
 
264
            pj_status_t       status;
 
265
        } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
 
266
 
 
267
        pj_lock_acquire(ioqueue->lock);
 
268
        for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
 
269
            DWORD result;
 
270
 
 
271
            result = WaitForMultipleObjects(ioqueue->connecting_count,
 
272
                                            ioqueue->connecting_handles,
 
273
                                            FALSE, 0);
 
274
            if (result >= WAIT_OBJECT_0 && 
 
275
                result < WAIT_OBJECT_0+ioqueue->connecting_count) 
 
276
            {
 
277
                WSANETWORKEVENTS net_events;
 
278
 
 
279
                /* Got completed connect(). */
 
280
                unsigned pos = result - WAIT_OBJECT_0;
 
281
                events[count].key = ioqueue->connecting_keys[pos];
 
282
 
 
283
                /* See whether connect has succeeded. */
 
284
                WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd, 
 
285
                                     ioqueue->connecting_handles[pos], 
 
286
                                     &net_events);
 
287
                events[count].status = 
 
288
                    PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
 
289
 
 
290
                /* Erase socket from pending connect. */
 
291
                erase_connecting_socket(ioqueue, pos);
 
292
            } else {
 
293
                /* No more events */
 
294
                break;
 
295
            }
 
296
        }
 
297
        pj_lock_release(ioqueue->lock);
 
298
 
 
299
        /* Call callbacks. */
 
300
        for (i=0; i<count; ++i) {
 
301
            if (events[i].key->cb.on_connect_complete) {
 
302
                events[i].key->cb.on_connect_complete(events[i].key, 
 
303
                                                      events[i].status);
 
304
            }
 
305
        }
 
306
 
 
307
        return count;
 
308
    }
 
309
 
 
310
    return 0;
 
311
    
 
312
}
 
313
#endif
 
314
 
 
315
/*
 
316
 * pj_ioqueue_name()
 
317
 */
 
318
PJ_DEF(const char*) pj_ioqueue_name(void)
 
319
{
 
320
    return "iocp";
 
321
}
 
322
 
 
323
/*
 
324
 * pj_ioqueue_create()
 
325
 */
 
326
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
 
327
                                       pj_size_t max_fd,
 
328
                                       pj_ioqueue_t **p_ioqueue)
 
329
{
 
330
    pj_ioqueue_t *ioqueue;
 
331
    unsigned i;
 
332
    pj_status_t rc;
 
333
 
 
334
    PJ_UNUSED_ARG(max_fd);
 
335
    PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
 
336
 
 
337
    rc = sizeof(union operation_key);
 
338
 
 
339
    /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
 
340
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 
 
341
                     sizeof(union operation_key), PJ_EBUG);
 
342
 
 
343
    /* Create IOCP */
 
344
    ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
 
345
    ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
 
346
    if (ioqueue->iocp == NULL)
 
347
        return PJ_RETURN_OS_ERROR(GetLastError());
 
348
 
 
349
    /* Create IOCP mutex */
 
350
    rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
 
351
    if (rc != PJ_SUCCESS) {
 
352
        CloseHandle(ioqueue->iocp);
 
353
        return rc;
 
354
    }
 
355
 
 
356
    ioqueue->auto_delete_lock = PJ_TRUE;
 
357
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
 
358
 
 
359
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
360
    /*
 
361
     * Create and initialize key pools.
 
362
     */
 
363
    pj_list_init(&ioqueue->active_list);
 
364
    pj_list_init(&ioqueue->free_list);
 
365
    pj_list_init(&ioqueue->closing_list);
 
366
 
 
367
    /* Preallocate keys according to max_fd setting, and put them
 
368
     * in free_list.
 
369
     */
 
370
    for (i=0; i<max_fd; ++i) {
 
371
        pj_ioqueue_key_t *key;
 
372
 
 
373
        key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
 
374
 
 
375
        rc = pj_atomic_create(pool, 0, &key->ref_count);
 
376
        if (rc != PJ_SUCCESS) {
 
377
            key = ioqueue->free_list.next;
 
378
            while (key != &ioqueue->free_list) {
 
379
                pj_atomic_destroy(key->ref_count);
 
380
                pj_mutex_destroy(key->mutex);
 
381
                key = key->next;
 
382
            }
 
383
            CloseHandle(ioqueue->iocp);
 
384
            return rc;
 
385
        }
 
386
 
 
387
        rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
 
388
        if (rc != PJ_SUCCESS) {
 
389
            pj_atomic_destroy(key->ref_count);
 
390
            key = ioqueue->free_list.next;
 
391
            while (key != &ioqueue->free_list) {
 
392
                pj_atomic_destroy(key->ref_count);
 
393
                pj_mutex_destroy(key->mutex);
 
394
                key = key->next;
 
395
            }
 
396
            CloseHandle(ioqueue->iocp);
 
397
            return rc;
 
398
        }
 
399
 
 
400
        pj_list_push_back(&ioqueue->free_list, key);
 
401
    }
 
402
#endif
 
403
 
 
404
    *p_ioqueue = ioqueue;
 
405
 
 
406
    PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
 
407
    return PJ_SUCCESS;
 
408
}
 
409
 
 
410
/*
 
411
 * pj_ioqueue_destroy()
 
412
 */
 
413
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
 
414
{
 
415
#if PJ_HAS_TCP
 
416
    unsigned i;
 
417
#endif
 
418
    pj_ioqueue_key_t *key;
 
419
 
 
420
    PJ_CHECK_STACK();
 
421
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
 
422
 
 
423
    pj_lock_acquire(ioqueue->lock);
 
424
 
 
425
#if PJ_HAS_TCP
 
426
    /* Destroy events in the pool */
 
427
    for (i=0; i<ioqueue->event_count; ++i) {
 
428
        CloseHandle(ioqueue->event_pool[i]);
 
429
    }
 
430
    ioqueue->event_count = 0;
 
431
#endif
 
432
 
 
433
    if (CloseHandle(ioqueue->iocp) != TRUE)
 
434
        return PJ_RETURN_OS_ERROR(GetLastError());
 
435
 
 
436
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
437
    /* Destroy reference counters */
 
438
    key = ioqueue->active_list.next;
 
439
    while (key != &ioqueue->active_list) {
 
440
        pj_atomic_destroy(key->ref_count);
 
441
        pj_mutex_destroy(key->mutex);
 
442
        key = key->next;
 
443
    }
 
444
 
 
445
    key = ioqueue->closing_list.next;
 
446
    while (key != &ioqueue->closing_list) {
 
447
        pj_atomic_destroy(key->ref_count);
 
448
        pj_mutex_destroy(key->mutex);
 
449
        key = key->next;
 
450
    }
 
451
 
 
452
    key = ioqueue->free_list.next;
 
453
    while (key != &ioqueue->free_list) {
 
454
        pj_atomic_destroy(key->ref_count);
 
455
        pj_mutex_destroy(key->mutex);
 
456
        key = key->next;
 
457
    }
 
458
#endif
 
459
 
 
460
    if (ioqueue->auto_delete_lock)
 
461
        pj_lock_destroy(ioqueue->lock);
 
462
 
 
463
    return PJ_SUCCESS;
 
464
}
 
465
 
 
466
 
 
467
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
 
468
                                                       pj_bool_t allow)
 
469
{
 
470
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
 
471
    ioqueue->default_concurrency = allow;
 
472
    return PJ_SUCCESS;
 
473
}
 
474
 
 
475
/*
 
476
 * pj_ioqueue_set_lock()
 
477
 */
 
478
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 
 
479
                                         pj_lock_t *lock,
 
480
                                         pj_bool_t auto_delete )
 
481
{
 
482
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
 
483
 
 
484
    if (ioqueue->auto_delete_lock) {
 
485
        pj_lock_destroy(ioqueue->lock);
 
486
    }
 
487
 
 
488
    ioqueue->lock = lock;
 
489
    ioqueue->auto_delete_lock = auto_delete;
 
490
 
 
491
    return PJ_SUCCESS;
 
492
}
 
493
 
 
494
/*
 
495
 * pj_ioqueue_register_sock()
 
496
 */
 
497
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
 
498
                                              pj_ioqueue_t *ioqueue,
 
499
                                              pj_sock_t sock,
 
500
                                              void *user_data,
 
501
                                              const pj_ioqueue_callback *cb,
 
502
                                              pj_ioqueue_key_t **key )
 
503
{
 
504
    HANDLE hioq;
 
505
    pj_ioqueue_key_t *rec;
 
506
    u_long value;
 
507
    int rc;
 
508
 
 
509
    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
 
510
 
 
511
    pj_lock_acquire(ioqueue->lock);
 
512
 
 
513
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
514
    /* Scan closing list first to release unused keys.
 
515
     * Must do this with lock acquired.
 
516
     */
 
517
    scan_closing_keys(ioqueue);
 
518
 
 
519
    /* If safe unregistration is used, then get the key record from
 
520
     * the free list.
 
521
     */
 
522
    if (pj_list_empty(&ioqueue->free_list)) {
 
523
        pj_lock_release(ioqueue->lock);
 
524
        return PJ_ETOOMANY;
 
525
    }
 
526
 
 
527
    rec = ioqueue->free_list.next;
 
528
    pj_list_erase(rec);
 
529
 
 
530
    /* Set initial reference count to 1 */
 
531
    pj_assert(pj_atomic_get(rec->ref_count) == 0);
 
532
    pj_atomic_inc(rec->ref_count);
 
533
 
 
534
    rec->closing = 0;
 
535
 
 
536
#else
 
537
    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
 
538
#endif
 
539
 
 
540
    /* Build the key for this socket. */
 
541
    rec->ioqueue = ioqueue;
 
542
    rec->hnd = (HANDLE)sock;
 
543
    rec->hnd_type = HND_IS_SOCKET;
 
544
    rec->user_data = user_data;
 
545
    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
 
546
 
 
547
    /* Set concurrency for this handle */
 
548
    rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
 
549
    if (rc != PJ_SUCCESS) {
 
550
        pj_lock_release(ioqueue->lock);
 
551
        return rc;
 
552
    }
 
553
 
 
554
#if PJ_HAS_TCP
 
555
    rec->connecting = 0;
 
556
#endif
 
557
 
 
558
    /* Set socket to nonblocking. */
 
559
    value = 1;
 
560
    rc = ioctlsocket(sock, FIONBIO, &value);
 
561
    if (rc != 0) {
 
562
        pj_lock_release(ioqueue->lock);
 
563
        return PJ_RETURN_OS_ERROR(WSAGetLastError());
 
564
    }
 
565
 
 
566
    /* Associate with IOCP */
 
567
    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
 
568
    if (!hioq) {
 
569
        pj_lock_release(ioqueue->lock);
 
570
        return PJ_RETURN_OS_ERROR(GetLastError());
 
571
    }
 
572
 
 
573
    *key = rec;
 
574
 
 
575
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
576
    pj_list_push_back(&ioqueue->active_list, rec);
 
577
#endif
 
578
 
 
579
    pj_lock_release(ioqueue->lock);
 
580
 
 
581
    return PJ_SUCCESS;
 
582
}
 
583
 
 
584
 
 
585
/*
 
586
 * pj_ioqueue_get_user_data()
 
587
 */
 
588
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
 
589
{
 
590
    PJ_ASSERT_RETURN(key, NULL);
 
591
    return key->user_data;
 
592
}
 
593
 
 
594
/*
 
595
 * pj_ioqueue_set_user_data()
 
596
 */
 
597
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
 
598
                                              void *user_data,
 
599
                                              void **old_data )
 
600
{
 
601
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
 
602
    
 
603
    if (old_data)
 
604
        *old_data = key->user_data;
 
605
 
 
606
    key->user_data = user_data;
 
607
    return PJ_SUCCESS;
 
608
}
 
609
 
 
610
 
 
611
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
612
/* Decrement the key's reference counter, and when the counter reach zero,
 
613
 * destroy the key.
 
614
 */
 
615
static void decrement_counter(pj_ioqueue_key_t *key)
 
616
{
 
617
    if (pj_atomic_dec_and_get(key->ref_count) == 0) {
 
618
 
 
619
        pj_lock_acquire(key->ioqueue->lock);
 
620
 
 
621
        pj_assert(key->closing == 1);
 
622
        pj_gettickcount(&key->free_time);
 
623
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
 
624
        pj_time_val_normalize(&key->free_time);
 
625
 
 
626
        pj_list_erase(key);
 
627
        pj_list_push_back(&key->ioqueue->closing_list, key);
 
628
 
 
629
        pj_lock_release(key->ioqueue->lock);
 
630
    }
 
631
}
 
632
#endif
 
633
 
 
634
/*
 
635
 * Poll the I/O Completion Port, execute callback, 
 
636
 * and return the key and bytes transferred of the last operation.
 
637
 */
 
638
static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, 
 
639
                            pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
 
640
{
 
641
    DWORD dwBytesTransferred, dwKey;
 
642
    generic_overlapped *pOv;
 
643
    pj_ioqueue_key_t *key;
 
644
    pj_ssize_t size_status = -1;
 
645
    BOOL rcGetQueued;
 
646
 
 
647
    /* Poll for completion status. */
 
648
    rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransferred,
 
649
                                            &dwKey, (OVERLAPPED**)&pOv, 
 
650
                                            dwTimeout);
 
651
 
 
652
    /* The return value is:
 
653
     * - nonzero if event was dequeued.
 
654
     * - zero and pOv==NULL if no event was dequeued.
 
655
     * - zero and pOv!=NULL if event for failed I/O was dequeued.
 
656
     */
 
657
    if (pOv) {
 
658
        pj_bool_t has_lock;
 
659
 
 
660
        /* Event was dequeued for either successfull or failed I/O */
 
661
        key = (pj_ioqueue_key_t*)dwKey;
 
662
        size_status = dwBytesTransferred;
 
663
 
 
664
        /* Report to caller regardless */
 
665
        if (p_bytes)
 
666
            *p_bytes = size_status;
 
667
        if (p_key)
 
668
            *p_key = key;
 
669
 
 
670
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
671
        /* We shouldn't call callbacks if key is quitting. */
 
672
        if (key->closing)
 
673
            return PJ_TRUE;
 
674
 
 
675
        /* If concurrency is disabled, lock the key 
 
676
         * (and save the lock status to local var since app may change
 
677
         * concurrency setting while in the callback) */
 
678
        if (key->allow_concurrent == PJ_FALSE) {
 
679
            pj_mutex_lock(key->mutex);
 
680
            has_lock = PJ_TRUE;
 
681
        } else {
 
682
            has_lock = PJ_FALSE;
 
683
        }
 
684
 
 
685
        /* Now that we get the lock, check again that key is not closing */
 
686
        if (key->closing) {
 
687
            if (has_lock) {
 
688
                pj_mutex_unlock(key->mutex);
 
689
            }
 
690
            return PJ_TRUE;
 
691
        }
 
692
 
 
693
        /* Increment reference counter to prevent this key from being
 
694
         * deleted
 
695
         */
 
696
        pj_atomic_inc(key->ref_count);
 
697
#else
 
698
        PJ_UNUSED_ARG(has_lock);
 
699
#endif
 
700
 
 
701
        /* Carry out the callback */
 
702
        switch (pOv->operation) {
 
703
        case PJ_IOQUEUE_OP_READ:
 
704
        case PJ_IOQUEUE_OP_RECV:
 
705
        case PJ_IOQUEUE_OP_RECV_FROM:
 
706
            pOv->operation = 0;
 
707
            if (key->cb.on_read_complete)
 
708
                key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, 
 
709
                                         size_status);
 
710
            break;
 
711
        case PJ_IOQUEUE_OP_WRITE:
 
712
        case PJ_IOQUEUE_OP_SEND:
 
713
        case PJ_IOQUEUE_OP_SEND_TO:
 
714
            pOv->operation = 0;
 
715
            if (key->cb.on_write_complete)
 
716
                key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, 
 
717
                                                size_status);
 
718
            break;
 
719
#if PJ_HAS_TCP
 
720
        case PJ_IOQUEUE_OP_ACCEPT:
 
721
            /* special case for accept. */
 
722
            ioqueue_on_accept_complete(key, (ioqueue_accept_rec*)pOv);
 
723
            if (key->cb.on_accept_complete) {
 
724
                ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
 
725
                pj_status_t status = PJ_SUCCESS;
 
726
                pj_sock_t newsock;
 
727
 
 
728
                newsock = accept_rec->newsock;
 
729
                accept_rec->newsock = PJ_INVALID_SOCKET;
 
730
 
 
731
                if (newsock == PJ_INVALID_SOCKET) {
 
732
                    int dwError = WSAGetLastError();
 
733
                    if (dwError == 0) dwError = OSERR_ENOTCONN;
 
734
                    status = PJ_RETURN_OS_ERROR(dwError);
 
735
                }
 
736
 
 
737
                key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
 
738
                                           newsock, status);
 
739
                
 
740
            }
 
741
            break;
 
742
        case PJ_IOQUEUE_OP_CONNECT:
 
743
#endif
 
744
        case PJ_IOQUEUE_OP_NONE:
 
745
            pj_assert(0);
 
746
            break;
 
747
        }
 
748
 
 
749
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
750
        decrement_counter(key);
 
751
        if (has_lock)
 
752
            pj_mutex_unlock(key->mutex);
 
753
#endif
 
754
 
 
755
        return PJ_TRUE;
 
756
    }
 
757
 
 
758
    /* No event was queued. */
 
759
    return PJ_FALSE;
 
760
}
 
761
 
 
762
/*
 
763
 * pj_ioqueue_unregister()
 
764
 */
 
765
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
 
766
{
 
767
    unsigned i;
 
768
    pj_bool_t has_lock;
 
769
    enum { RETRY = 10 };
 
770
 
 
771
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
 
772
 
 
773
#if PJ_HAS_TCP
 
774
    if (key->connecting) {
 
775
        unsigned pos;
 
776
        pj_ioqueue_t *ioqueue;
 
777
 
 
778
        ioqueue = key->ioqueue;
 
779
 
 
780
        /* Erase from connecting_handles */
 
781
        pj_lock_acquire(ioqueue->lock);
 
782
        for (pos=0; pos < ioqueue->connecting_count; ++pos) {
 
783
            if (ioqueue->connecting_keys[pos] == key) {
 
784
                erase_connecting_socket(ioqueue, pos);
 
785
                break;
 
786
            }
 
787
        }
 
788
        key->connecting = 0;
 
789
        pj_lock_release(ioqueue->lock);
 
790
    }
 
791
#endif
 
792
 
 
793
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
794
    /* Mark key as closing before closing handle. */
 
795
    key->closing = 1;
 
796
 
 
797
    /* If concurrency is disabled, wait until the key has finished
 
798
     * processing the callback
 
799
     */
 
800
    if (key->allow_concurrent == PJ_FALSE) {
 
801
        pj_mutex_lock(key->mutex);
 
802
        has_lock = PJ_TRUE;
 
803
    } else {
 
804
        has_lock = PJ_FALSE;
 
805
    }
 
806
#else
 
807
    PJ_UNUSED_ARG(has_lock);
 
808
#endif
 
809
    
 
810
    /* Close handle (the only way to disassociate handle from IOCP). 
 
811
     * We also need to close handle to make sure that no further events
 
812
     * will come to the handle.
 
813
     */
 
814
    /* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
 
815
     *  - It seems that CloseHandle() in itself does not actually close
 
816
     *    the socket (i.e. it will still appear in "netstat" output). Also
 
817
     *    if we only use CloseHandle(), an "Invalid Handle" exception will
 
818
     *    be raised in WSACleanup().
 
819
     *  - MSDN documentation says that CloseHandle() must be called after 
 
820
     *    closesocket() call (see
 
821
     *    http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
 
822
     *    But turns out that this will raise "Invalid Handle" exception
 
823
     *    in debug mode.
 
824
     *  So because of this, we replaced CloseHandle() with closesocket()
 
825
     *  instead. These was tested on WinXP SP2.
 
826
     */
 
827
    //CloseHandle(key->hnd);
 
828
    pj_sock_close((pj_sock_t)key->hnd);
 
829
 
 
830
    /* Reset callbacks */
 
831
    key->cb.on_accept_complete = NULL;
 
832
    key->cb.on_connect_complete = NULL;
 
833
    key->cb.on_read_complete = NULL;
 
834
    key->cb.on_write_complete = NULL;
 
835
 
 
836
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
837
    /* Even after handle is closed, I suspect that IOCP may still try to
 
838
     * do something with the handle, causing memory corruption when pool
 
839
     * debugging is enabled.
 
840
     *
 
841
     * Forcing context switch seems to have fixed that, but this is quite
 
842
     * an ugly solution..
 
843
     *
 
844
     * Update 2008/02/13:
 
845
     *  This should not happen if concurrency is disallowed for the key.
 
846
     *  So at least application has a solution for this (i.e. by disallowing
 
847
     *  concurrency in the key).
 
848
     */
 
849
    //This will loop forever if unregistration is done on the callback.
 
850
    //Doing this with RETRY I think should solve the IOCP setting the 
 
851
    //socket signalled, without causing the deadlock.
 
852
    //while (pj_atomic_get(key->ref_count) != 1)
 
853
    //  pj_thread_sleep(0);
 
854
    for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
 
855
        pj_thread_sleep(0);
 
856
 
 
857
    /* Decrement reference counter to destroy the key. */
 
858
    decrement_counter(key);
 
859
 
 
860
    if (has_lock)
 
861
        pj_mutex_unlock(key->mutex);
 
862
#endif
 
863
 
 
864
    return PJ_SUCCESS;
 
865
}
 
866
 
 
867
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
868
/* Scan the closing list, and put pending closing keys to free list.
 
869
 * Must do this with ioqueue mutex held.
 
870
 */
 
871
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
 
872
{
 
873
    if (!pj_list_empty(&ioqueue->closing_list)) {
 
874
        pj_time_val now;
 
875
        pj_ioqueue_key_t *key;
 
876
 
 
877
        pj_gettickcount(&now);
 
878
        
 
879
        /* Move closing keys to free list when they've finished the closing
 
880
         * idle time.
 
881
         */
 
882
        key = ioqueue->closing_list.next;
 
883
        while (key != &ioqueue->closing_list) {
 
884
            pj_ioqueue_key_t *next = key->next;
 
885
 
 
886
            pj_assert(key->closing != 0);
 
887
 
 
888
            if (PJ_TIME_VAL_GTE(now, key->free_time)) {
 
889
                pj_list_erase(key);
 
890
                pj_list_push_back(&ioqueue->free_list, key);
 
891
            }
 
892
            key = next;
 
893
        }
 
894
    }
 
895
}
 
896
#endif
 
897
 
 
898
/*
 
899
 * pj_ioqueue_poll()
 
900
 *
 
901
 * Poll for events.
 
902
 */
 
903
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 
904
{
 
905
    DWORD dwMsec;
 
906
#if PJ_HAS_TCP
 
907
    int connect_count = 0;
 
908
#endif
 
909
    int event_count = 0;
 
910
 
 
911
    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
 
912
 
 
913
    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
 
914
    dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
 
915
 
 
916
    /* Poll for completion status. */
 
917
    event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
 
918
 
 
919
#if PJ_HAS_TCP
 
920
    /* Check the connecting array, only when there's no activity. */
 
921
    if (event_count == 0) {
 
922
        connect_count = check_connecting(ioqueue);
 
923
        if (connect_count > 0)
 
924
            event_count += connect_count;
 
925
    }
 
926
#endif
 
927
 
 
928
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
929
    /* Check the closing keys only when there's no activity and when there are
 
930
     * pending closing keys.
 
931
     */
 
932
    if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
 
933
        pj_lock_acquire(ioqueue->lock);
 
934
        scan_closing_keys(ioqueue);
 
935
        pj_lock_release(ioqueue->lock);
 
936
    }
 
937
#endif
 
938
 
 
939
    /* Return number of events. */
 
940
    return event_count;
 
941
}
 
942
 
 
943
/*
 
944
 * pj_ioqueue_recv()
 
945
 *
 
946
 * Initiate overlapped WSARecv() operation.
 
947
 */
 
948
PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
 
949
                                      pj_ioqueue_op_key_t *op_key,
 
950
                                      void *buffer,
 
951
                                      pj_ssize_t *length,
 
952
                                      pj_uint32_t flags )
 
953
{
 
954
    /*
 
955
     * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
 
956
     * addrlen here. But unfortunately it generates EINVAL... :-(
 
957
     *  -bennylp
 
958
     */
 
959
    int rc;
 
960
    DWORD bytesRead;
 
961
    DWORD dwFlags = 0;
 
962
    union operation_key *op_key_rec;
 
963
 
 
964
    PJ_CHECK_STACK();
 
965
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
 
966
 
 
967
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
968
    /* Check key is not closing */
 
969
    if (key->closing)
 
970
        return PJ_ECANCELLED;
 
971
#endif
 
972
 
 
973
    op_key_rec = (union operation_key*)op_key->internal__;
 
974
    op_key_rec->overlapped.wsabuf.buf = buffer;
 
975
    op_key_rec->overlapped.wsabuf.len = *length;
 
976
 
 
977
    dwFlags = flags;
 
978
    
 
979
    /* Try non-overlapped received first to see if data is
 
980
     * immediately available.
 
981
     */
 
982
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
 
983
        rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
 
984
                     &bytesRead, &dwFlags, NULL, NULL);
 
985
        if (rc == 0) {
 
986
            *length = bytesRead;
 
987
            return PJ_SUCCESS;
 
988
        } else {
 
989
            DWORD dwError = WSAGetLastError();
 
990
            if (dwError != WSAEWOULDBLOCK) {
 
991
                *length = -1;
 
992
                return PJ_RETURN_OS_ERROR(dwError);
 
993
            }
 
994
        }
 
995
    }
 
996
 
 
997
    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
 
998
 
 
999
    /*
 
1000
     * No immediate data available.
 
1001
     * Register overlapped Recv() operation.
 
1002
     */
 
1003
    pj_bzero( &op_key_rec->overlapped.overlapped, 
 
1004
              sizeof(op_key_rec->overlapped.overlapped));
 
1005
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
 
1006
 
 
1007
    rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
 
1008
                  &bytesRead, &dwFlags, 
 
1009
                  &op_key_rec->overlapped.overlapped, NULL);
 
1010
    if (rc == SOCKET_ERROR) {
 
1011
        DWORD dwStatus = WSAGetLastError();
 
1012
        if (dwStatus!=WSA_IO_PENDING) {
 
1013
            *length = -1;
 
1014
            return PJ_STATUS_FROM_OS(dwStatus);
 
1015
        }
 
1016
    }
 
1017
 
 
1018
    /* Pending operation has been scheduled. */
 
1019
    return PJ_EPENDING;
 
1020
}
 
1021
 
 
1022
/*
 
1023
 * pj_ioqueue_recvfrom()
 
1024
 *
 
1025
 * Initiate overlapped RecvFrom() operation.
 
1026
 */
 
1027
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
 
1028
                                         pj_ioqueue_op_key_t *op_key,
 
1029
                                         void *buffer,
 
1030
                                         pj_ssize_t *length,
 
1031
                                         pj_uint32_t flags,
 
1032
                                         pj_sockaddr_t *addr,
 
1033
                                         int *addrlen)
 
1034
{
 
1035
    int rc;
 
1036
    DWORD bytesRead;
 
1037
    DWORD dwFlags = 0;
 
1038
    union operation_key *op_key_rec;
 
1039
 
 
1040
    PJ_CHECK_STACK();
 
1041
    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
 
1042
 
 
1043
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
1044
    /* Check key is not closing */
 
1045
    if (key->closing)
 
1046
        return PJ_ECANCELLED;
 
1047
#endif
 
1048
 
 
1049
    op_key_rec = (union operation_key*)op_key->internal__;
 
1050
    op_key_rec->overlapped.wsabuf.buf = buffer;
 
1051
    op_key_rec->overlapped.wsabuf.len = *length;
 
1052
 
 
1053
    dwFlags = flags;
 
1054
    
 
1055
    /* Try non-overlapped received first to see if data is
 
1056
     * immediately available.
 
1057
     */
 
1058
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
 
1059
        rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
 
1060
                         &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
 
1061
        if (rc == 0) {
 
1062
            *length = bytesRead;
 
1063
            return PJ_SUCCESS;
 
1064
        } else {
 
1065
            DWORD dwError = WSAGetLastError();
 
1066
            if (dwError != WSAEWOULDBLOCK) {
 
1067
                *length = -1;
 
1068
                return PJ_RETURN_OS_ERROR(dwError);
 
1069
            }
 
1070
        }
 
1071
    }
 
1072
 
 
1073
    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
 
1074
 
 
1075
    /*
 
1076
     * No immediate data available.
 
1077
     * Register overlapped Recv() operation.
 
1078
     */
 
1079
    pj_bzero( &op_key_rec->overlapped.overlapped, 
 
1080
              sizeof(op_key_rec->overlapped.overlapped));
 
1081
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
 
1082
 
 
1083
    rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
 
1084
                     &bytesRead, &dwFlags, addr, addrlen,
 
1085
                     &op_key_rec->overlapped.overlapped, NULL);
 
1086
    if (rc == SOCKET_ERROR) {
 
1087
        DWORD dwStatus = WSAGetLastError();
 
1088
        if (dwStatus!=WSA_IO_PENDING) {
 
1089
            *length = -1;
 
1090
            return PJ_STATUS_FROM_OS(dwStatus);
 
1091
        }
 
1092
    } 
 
1093
    
 
1094
    /* Pending operation has been scheduled. */
 
1095
    return PJ_EPENDING;
 
1096
}
 
1097
 
 
1098
/*
 
1099
 * pj_ioqueue_send()
 
1100
 *
 
1101
 * Initiate overlapped Send operation.
 
1102
 */
 
1103
PJ_DEF(pj_status_t) pj_ioqueue_send(  pj_ioqueue_key_t *key,
 
1104
                                      pj_ioqueue_op_key_t *op_key,
 
1105
                                      const void *data,
 
1106
                                      pj_ssize_t *length,
 
1107
                                      pj_uint32_t flags )
 
1108
{
 
1109
    return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
 
1110
}
 
1111
 
 
1112
 
 
1113
/*
 
1114
 * pj_ioqueue_sendto()
 
1115
 *
 
1116
 * Initiate overlapped SendTo operation.
 
1117
 */
 
1118
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
 
1119
                                       pj_ioqueue_op_key_t *op_key,
 
1120
                                       const void *data,
 
1121
                                       pj_ssize_t *length,
 
1122
                                       pj_uint32_t flags,
 
1123
                                       const pj_sockaddr_t *addr,
 
1124
                                       int addrlen)
 
1125
{
 
1126
    int rc;
 
1127
    DWORD bytesWritten;
 
1128
    DWORD dwFlags;
 
1129
    union operation_key *op_key_rec;
 
1130
 
 
1131
    PJ_CHECK_STACK();
 
1132
    PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
 
1133
 
 
1134
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
1135
    /* Check key is not closing */
 
1136
    if (key->closing)
 
1137
        return PJ_ECANCELLED;
 
1138
#endif
 
1139
 
 
1140
    op_key_rec = (union operation_key*)op_key->internal__;
 
1141
 
 
1142
    /*
 
1143
     * First try blocking write.
 
1144
     */
 
1145
    op_key_rec->overlapped.wsabuf.buf = (void*)data;
 
1146
    op_key_rec->overlapped.wsabuf.len = *length;
 
1147
 
 
1148
    dwFlags = flags;
 
1149
 
 
1150
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
 
1151
        rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
 
1152
                       &bytesWritten, dwFlags, addr, addrlen,
 
1153
                       NULL, NULL);
 
1154
        if (rc == 0) {
 
1155
            *length = bytesWritten;
 
1156
            return PJ_SUCCESS;
 
1157
        } else {
 
1158
            DWORD dwStatus = WSAGetLastError();
 
1159
            if (dwStatus != WSAEWOULDBLOCK) {
 
1160
                *length = -1;
 
1161
                return PJ_RETURN_OS_ERROR(dwStatus);
 
1162
            }
 
1163
        }
 
1164
    }
 
1165
 
 
1166
    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
 
1167
 
 
1168
    /*
 
1169
     * Data can't be sent immediately.
 
1170
     * Schedule asynchronous WSASend().
 
1171
     */
 
1172
    pj_bzero( &op_key_rec->overlapped.overlapped, 
 
1173
              sizeof(op_key_rec->overlapped.overlapped));
 
1174
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
 
1175
 
 
1176
    rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
 
1177
                   &bytesWritten,  dwFlags, addr, addrlen,
 
1178
                   &op_key_rec->overlapped.overlapped, NULL);
 
1179
    if (rc == SOCKET_ERROR) {
 
1180
        DWORD dwStatus = WSAGetLastError();
 
1181
        if (dwStatus!=WSA_IO_PENDING)
 
1182
            return PJ_STATUS_FROM_OS(dwStatus);
 
1183
    }
 
1184
 
 
1185
    /* Asynchronous operation successfully submitted. */
 
1186
    return PJ_EPENDING;
 
1187
}
 
1188
 
 
1189
#if PJ_HAS_TCP
 
1190
 
 
1191
/*
 
1192
 * pj_ioqueue_accept()
 
1193
 *
 
1194
 * Initiate overlapped accept() operation.
 
1195
 */
 
1196
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
 
1197
                                       pj_ioqueue_op_key_t *op_key,
 
1198
                                       pj_sock_t *new_sock,
 
1199
                                       pj_sockaddr_t *local,
 
1200
                                       pj_sockaddr_t *remote,
 
1201
                                       int *addrlen)
 
1202
{
 
1203
    BOOL rc;
 
1204
    DWORD bytesReceived;
 
1205
    pj_status_t status;
 
1206
    union operation_key *op_key_rec;
 
1207
    SOCKET sock;
 
1208
 
 
1209
    PJ_CHECK_STACK();
 
1210
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
 
1211
 
 
1212
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
1213
    /* Check key is not closing */
 
1214
    if (key->closing)
 
1215
        return PJ_ECANCELLED;
 
1216
#endif
 
1217
 
 
1218
    /*
 
1219
     * See if there is a new connection immediately available.
 
1220
     */
 
1221
    sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
 
1222
    if (sock != INVALID_SOCKET) {
 
1223
        /* Yes! New socket is available! */
 
1224
        if (local && addrlen) {
 
1225
            int status;
 
1226
 
 
1227
            /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket 
 
1228
             * addresses can be obtained with getsockname() and getpeername().
 
1229
             */
 
1230
            status = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
 
1231
                                (char*)&key->hnd, sizeof(SOCKET));
 
1232
            /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
 
1233
             * So ignore the error status.
 
1234
             */
 
1235
 
 
1236
            status = getsockname(sock, local, addrlen);
 
1237
            if (status != 0) {
 
1238
                DWORD dwError = WSAGetLastError();
 
1239
                closesocket(sock);
 
1240
                return PJ_RETURN_OS_ERROR(dwError);
 
1241
            }
 
1242
        }
 
1243
 
 
1244
        *new_sock = sock;
 
1245
        return PJ_SUCCESS;
 
1246
 
 
1247
    } else {
 
1248
        DWORD dwError = WSAGetLastError();
 
1249
        if (dwError != WSAEWOULDBLOCK) {
 
1250
            return PJ_RETURN_OS_ERROR(dwError);
 
1251
        }
 
1252
    }
 
1253
 
 
1254
    /*
 
1255
     * No connection is immediately available.
 
1256
     * Must schedule an asynchronous operation.
 
1257
     */
 
1258
    op_key_rec = (union operation_key*)op_key->internal__;
 
1259
    
 
1260
    status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, 
 
1261
                            &op_key_rec->accept.newsock);
 
1262
    if (status != PJ_SUCCESS)
 
1263
        return status;
 
1264
 
 
1265
    op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
 
1266
    op_key_rec->accept.addrlen = addrlen;
 
1267
    op_key_rec->accept.local = local;
 
1268
    op_key_rec->accept.remote = remote;
 
1269
    op_key_rec->accept.newsock_ptr = new_sock;
 
1270
    pj_bzero( &op_key_rec->accept.overlapped, 
 
1271
              sizeof(op_key_rec->accept.overlapped));
 
1272
 
 
1273
    rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
 
1274
                   op_key_rec->accept.accept_buf,
 
1275
                   0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
 
1276
                   &bytesReceived,
 
1277
                   &op_key_rec->accept.overlapped );
 
1278
 
 
1279
    if (rc == TRUE) {
 
1280
        ioqueue_on_accept_complete(key, &op_key_rec->accept);
 
1281
        return PJ_SUCCESS;
 
1282
    } else {
 
1283
        DWORD dwStatus = WSAGetLastError();
 
1284
        if (dwStatus!=WSA_IO_PENDING)
 
1285
            return PJ_STATUS_FROM_OS(dwStatus);
 
1286
    }
 
1287
 
 
1288
    /* Asynchronous Accept() has been submitted. */
 
1289
    return PJ_EPENDING;
 
1290
}
 
1291
 
 
1292
 
 
1293
/*
 
1294
 * pj_ioqueue_connect()
 
1295
 *
 
1296
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 
1297
 * since there's no overlapped version of connect()).
 
1298
 */
 
1299
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
 
1300
                                        const pj_sockaddr_t *addr,
 
1301
                                        int addrlen )
 
1302
{
 
1303
    HANDLE hEvent;
 
1304
    pj_ioqueue_t *ioqueue;
 
1305
 
 
1306
    PJ_CHECK_STACK();
 
1307
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
 
1308
 
 
1309
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
1310
    /* Check key is not closing */
 
1311
    if (key->closing)
 
1312
        return PJ_ECANCELLED;
 
1313
#endif
 
1314
 
 
1315
    /* Initiate connect() */
 
1316
    if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
 
1317
        DWORD dwStatus;
 
1318
        dwStatus = WSAGetLastError();
 
1319
        if (dwStatus != WSAEWOULDBLOCK) {
 
1320
            return PJ_RETURN_OS_ERROR(dwStatus);
 
1321
        }
 
1322
    } else {
 
1323
        /* Connect has completed immediately! */
 
1324
        return PJ_SUCCESS;
 
1325
    }
 
1326
 
 
1327
    ioqueue = key->ioqueue;
 
1328
 
 
1329
    /* Add to the array of connecting socket to be polled */
 
1330
    pj_lock_acquire(ioqueue->lock);
 
1331
 
 
1332
    if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
 
1333
        pj_lock_release(ioqueue->lock);
 
1334
        return PJ_ETOOMANYCONN;
 
1335
    }
 
1336
 
 
1337
    /* Get or create event object. */
 
1338
    if (ioqueue->event_count) {
 
1339
        hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
 
1340
        --ioqueue->event_count;
 
1341
    } else {
 
1342
        hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
 
1343
        if (hEvent == NULL) {
 
1344
            DWORD dwStatus = GetLastError();
 
1345
            pj_lock_release(ioqueue->lock);
 
1346
            return PJ_STATUS_FROM_OS(dwStatus);
 
1347
        }
 
1348
    }
 
1349
 
 
1350
    /* Mark key as connecting.
 
1351
     * We can't use array index since key can be removed dynamically. 
 
1352
     */
 
1353
    key->connecting = 1;
 
1354
 
 
1355
    /* Associate socket events to the event object. */
 
1356
    if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
 
1357
        CloseHandle(hEvent);
 
1358
        pj_lock_release(ioqueue->lock);
 
1359
        return PJ_RETURN_OS_ERROR(WSAGetLastError());
 
1360
    }
 
1361
 
 
1362
    /* Add to array. */
 
1363
    ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
 
1364
    ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
 
1365
    ioqueue->connecting_count++;
 
1366
 
 
1367
    pj_lock_release(ioqueue->lock);
 
1368
 
 
1369
    return PJ_EPENDING;
 
1370
}
 
1371
#endif  /* #if PJ_HAS_TCP */
 
1372
 
 
1373
 
 
1374
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
 
1375
                                     pj_size_t size )
 
1376
{
 
1377
    pj_bzero(op_key, size);
 
1378
}
 
1379
 
 
1380
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
 
1381
                                         pj_ioqueue_op_key_t *op_key )
 
1382
{
 
1383
    BOOL rc;
 
1384
    DWORD bytesTransferred;
 
1385
 
 
1386
    rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
 
1387
                              &bytesTransferred, FALSE );
 
1388
 
 
1389
    if (rc == FALSE) {
 
1390
        return GetLastError()==ERROR_IO_INCOMPLETE;
 
1391
    }
 
1392
 
 
1393
    return FALSE;
 
1394
}
 
1395
 
 
1396
 
 
1397
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
 
1398
                                                pj_ioqueue_op_key_t *op_key,
 
1399
                                                pj_ssize_t bytes_status )
 
1400
{
 
1401
    BOOL rc;
 
1402
 
 
1403
    rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
 
1404
                                    (long)key, (OVERLAPPED*)op_key );
 
1405
    if (rc == FALSE) {
 
1406
        return PJ_RETURN_OS_ERROR(GetLastError());
 
1407
    }
 
1408
 
 
1409
    return PJ_SUCCESS;
 
1410
}
 
1411
 
 
1412
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
 
1413
                                               pj_bool_t allow)
 
1414
{
 
1415
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
 
1416
 
 
1417
    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
 
1418
     * disabled.
 
1419
     */
 
1420
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
 
1421
 
 
1422
    key->allow_concurrent = allow;
 
1423
    return PJ_SUCCESS;
 
1424
}
 
1425
 
 
1426
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
 
1427
{
 
1428
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
1429
    return pj_mutex_lock(key->mutex);
 
1430
#else
 
1431
    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
 
1432
#endif
 
1433
}
 
1434
 
 
1435
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
 
1436
{
 
1437
#if PJ_IOQUEUE_HAS_SAFE_UNREG
 
1438
    return pj_mutex_unlock(key->mutex);
 
1439
#else
 
1440
    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
 
1441
#endif
 
1442
}
 
1443