~noskcaj/ubuntu/saucy/sflphone/merge-1.2.3-2

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject/pjlib/src/pj/ioqueue_common_abs.c

  • Committer: Jackson Doak
  • Date: 2013-07-10 21:04:46 UTC
  • mfrom: (20.1.3 sid)
  • Revision ID: noskcaj@ubuntu.com-20130710210446-y8f587vza807icr9
Properly merged from upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* $Id: ioqueue_common_abs.c 3553 2011-05-05 06:14:19Z nanang $ */
2
 
/* 
3
 
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
 
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5
 
 *
6
 
 * This program is free software; you can redistribute it and/or modify
7
 
 * it under the terms of the GNU General Public License as published by
8
 
 * the Free Software Foundation; either version 2 of the License, or
9
 
 * (at your option) any later version.
10
 
 *
11
 
 * This program is distributed in the hope that it will be useful,
12
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 * GNU General Public License for more details.
15
 
 *
16
 
 * You should have received a copy of the GNU General Public License
17
 
 * along with this program; if not, write to the Free Software
18
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
19
 
 */
20
 
 
21
 
/*
22
 
 * ioqueue_common_abs.c
23
 
 *
24
 
 * This contains common functionalities to emulate proactor pattern with
25
 
 * various event dispatching mechanisms (e.g. select, epoll).
26
 
 *
27
 
 * This file will be included by the appropriate ioqueue implementation.
28
 
 * This file is NOT supposed to be compiled as stand-alone source.
29
 
 */
30
 
 
31
 
#define PENDING_RETRY   2
32
 
 
33
 
static void ioqueue_init( pj_ioqueue_t *ioqueue )
34
 
{
35
 
    ioqueue->lock = NULL;
36
 
    ioqueue->auto_delete_lock = 0;
37
 
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
38
 
}
39
 
 
40
 
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
41
 
{
42
 
    if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43
 
        pj_lock_release(ioqueue->lock);
44
 
        return pj_lock_destroy(ioqueue->lock);
45
 
    }
46
 
    
47
 
    return PJ_SUCCESS;
48
 
}
49
 
 
50
 
/*
51
 
 * pj_ioqueue_set_lock()
52
 
 */
53
 
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 
54
 
                                         pj_lock_t *lock,
55
 
                                         pj_bool_t auto_delete )
56
 
{
57
 
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
58
 
 
59
 
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
60
 
        pj_lock_destroy(ioqueue->lock);
61
 
    }
62
 
 
63
 
    ioqueue->lock = lock;
64
 
    ioqueue->auto_delete_lock = auto_delete;
65
 
 
66
 
    return PJ_SUCCESS;
67
 
}
68
 
 
69
 
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70
 
                                     pj_ioqueue_t *ioqueue,
71
 
                                     pj_ioqueue_key_t *key,
72
 
                                     pj_sock_t sock,
73
 
                                     void *user_data,
74
 
                                     const pj_ioqueue_callback *cb)
75
 
{
76
 
    pj_status_t rc;
77
 
    int optlen;
78
 
 
79
 
    PJ_UNUSED_ARG(pool);
80
 
 
81
 
    key->ioqueue = ioqueue;
82
 
    key->fd = sock;
83
 
    key->user_data = user_data;
84
 
    pj_list_init(&key->read_list);
85
 
    pj_list_init(&key->write_list);
86
 
#if PJ_HAS_TCP
87
 
    pj_list_init(&key->accept_list);
88
 
    key->connecting = 0;
89
 
#endif
90
 
 
91
 
    /* Save callback. */
92
 
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
93
 
 
94
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
95
 
    /* Set initial reference count to 1 */
96
 
    pj_assert(key->ref_count == 0);
97
 
    ++key->ref_count;
98
 
 
99
 
    key->closing = 0;
100
 
#endif
101
 
 
102
 
    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
103
 
    if (rc != PJ_SUCCESS)
104
 
        return rc;
105
 
 
106
 
    /* Get socket type. When socket type is datagram, some optimization
107
 
     * will be performed during send to allow parallel send operations.
108
 
     */
109
 
    optlen = sizeof(key->fd_type);
110
 
    rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
111
 
                            &key->fd_type, &optlen);
112
 
    if (rc != PJ_SUCCESS)
113
 
        key->fd_type = pj_SOCK_STREAM();
114
 
 
115
 
    /* Create mutex for the key. */
116
 
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
117
 
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
118
 
#endif
119
 
    
120
 
    return rc;
121
 
}
122
 
 
123
 
/*
124
 
 * pj_ioqueue_get_user_data()
125
 
 *
126
 
 * Obtain value associated with a key.
127
 
 */
128
 
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
129
 
{
130
 
    PJ_ASSERT_RETURN(key != NULL, NULL);
131
 
    return key->user_data;
132
 
}
133
 
 
134
 
/*
135
 
 * pj_ioqueue_set_user_data()
136
 
 */
137
 
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
138
 
                                              void *user_data,
139
 
                                              void **old_data)
140
 
{
141
 
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
142
 
 
143
 
    if (old_data)
144
 
        *old_data = key->user_data;
145
 
    key->user_data = user_data;
146
 
 
147
 
    return PJ_SUCCESS;
148
 
}
149
 
 
150
 
PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
151
 
{
152
 
    return !pj_list_empty(&key->write_list);
153
 
}
154
 
 
155
 
PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
156
 
{
157
 
    return !pj_list_empty(&key->read_list);
158
 
}
159
 
 
160
 
PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
161
 
{
162
 
#if PJ_HAS_TCP
163
 
    return !pj_list_empty(&key->accept_list);
164
 
#else
165
 
    PJ_UNUSED_ARG(key);
166
 
    return 0;
167
 
#endif
168
 
}
169
 
 
170
 
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
171
 
{
172
 
    return key->connecting;
173
 
}
174
 
 
175
 
 
176
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
177
 
#   define IS_CLOSING(key)  (key->closing)
178
 
#else
179
 
#   define IS_CLOSING(key)  (0)
180
 
#endif
181
 
 
182
 
 
183
 
/*
184
 
 * ioqueue_dispatch_event()
185
 
 *
186
 
 * Report occurence of an event in the key to be processed by the
187
 
 * framework.
188
 
 */
189
 
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
190
 
{
191
 
    /* Lock the key. */
192
 
    pj_mutex_lock(h->mutex);
193
 
 
194
 
    if (IS_CLOSING(h)) {
195
 
        pj_mutex_unlock(h->mutex);
196
 
        return;
197
 
    }
198
 
 
199
 
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
200
 
    if (h->connecting) {
201
 
        /* Completion of connect() operation */
202
 
        pj_status_t status;
203
 
        pj_bool_t has_lock;
204
 
 
205
 
        /* Clear operation. */
206
 
        h->connecting = 0;
207
 
 
208
 
        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
209
 
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
210
 
 
211
 
 
212
 
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
213
 
        /* from connect(2): 
214
 
         * On Linux, use getsockopt to read the SO_ERROR option at
215
 
         * level SOL_SOCKET to determine whether connect() completed
216
 
         * successfully (if SO_ERROR is zero).
217
 
         */
218
 
        {
219
 
          int value;
220
 
          int vallen = sizeof(value);
221
 
          int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
222
 
                                         &value, &vallen);
223
 
          if (gs_rc != 0) {
224
 
            /* Argh!! What to do now??? 
225
 
             * Just indicate that the socket is connected. The
226
 
             * application will get error as soon as it tries to use
227
 
             * the socket to send/receive.
228
 
             */
229
 
              status = PJ_SUCCESS;
230
 
          } else {
231
 
              status = PJ_STATUS_FROM_OS(value);
232
 
          }
233
 
        }
234
 
#elif defined(PJ_WIN32) && PJ_WIN32!=0
235
 
        status = PJ_SUCCESS; /* success */
236
 
#else
237
 
        /* Excellent information in D.J. Bernstein page:
238
 
         * http://cr.yp.to/docs/connect.html
239
 
         *
240
 
         * Seems like the most portable way of detecting connect()
241
 
         * failure is to call getpeername(). If socket is connected,
242
 
         * getpeername() will return 0. If the socket is not connected,
243
 
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
244
 
         * the right errno through error slippage. This is a combination
245
 
         * of suggestions from Douglas C. Schmidt and Ken Keys.
246
 
         */
247
 
        {
248
 
            struct sockaddr_in addr;
249
 
            int addrlen = sizeof(addr);
250
 
 
251
 
            status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
252
 
                                         &addrlen);
253
 
        }
254
 
#endif
255
 
 
256
 
        /* Unlock; from this point we don't need to hold key's mutex
257
 
         * (unless concurrency is disabled, which in this case we should
258
 
         * hold the mutex while calling the callback) */
259
 
        if (h->allow_concurrent) {
260
 
            /* concurrency may be changed while we're in the callback, so
261
 
             * save it to a flag.
262
 
             */
263
 
            has_lock = PJ_FALSE;
264
 
            pj_mutex_unlock(h->mutex);
265
 
        } else {
266
 
            has_lock = PJ_TRUE;
267
 
        }
268
 
 
269
 
        /* Call callback. */
270
 
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
271
 
            (*h->cb.on_connect_complete)(h, status);
272
 
 
273
 
        /* Unlock if we still hold the lock */
274
 
        if (has_lock) {
275
 
            pj_mutex_unlock(h->mutex);
276
 
        }
277
 
 
278
 
        /* Done. */
279
 
 
280
 
    } else 
281
 
#endif /* PJ_HAS_TCP */
282
 
    if (key_has_pending_write(h)) {
283
 
        /* Socket is writable. */
284
 
        struct write_operation *write_op;
285
 
        pj_ssize_t sent;
286
 
        pj_status_t send_rc;
287
 
 
288
 
        /* Get the first in the queue. */
289
 
        write_op = h->write_list.next;
290
 
 
291
 
        /* For datagrams, we can remove the write_op from the list
292
 
         * so that send() can work in parallel.
293
 
         */
294
 
        if (h->fd_type == pj_SOCK_DGRAM()) {
295
 
            pj_list_erase(write_op);
296
 
 
297
 
            if (pj_list_empty(&h->write_list))
298
 
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
299
 
 
300
 
        }
301
 
 
302
 
        /* Send the data. 
303
 
         * Unfortunately we must do this while holding key's mutex, thus
304
 
         * preventing parallel write on a single key.. :-((
305
 
         */
306
 
        sent = write_op->size - write_op->written;
307
 
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
308
 
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
309
 
                                   &sent, write_op->flags);
310
 
            /* Can't do this. We only clear "op" after we're finished sending
311
 
             * the whole buffer.
312
 
             */
313
 
            //write_op->op = 0;
314
 
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
315
 
            int retry;
316
 
            for (retry=0; retry<2; ++retry) {
317
 
                send_rc = pj_sock_sendto(h->fd, 
318
 
                                         write_op->buf+write_op->written,
319
 
                                         &sent, write_op->flags,
320
 
                                         &write_op->rmt_addr, 
321
 
                                         write_op->rmt_addrlen);
322
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
323
 
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
324
 
                /* Special treatment for dead UDP sockets here, see ticket #1107 */
325
 
                if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) &&
326
 
                    h->fd_type==pj_SOCK_DGRAM())
327
 
                {
328
 
                    PJ_PERROR(4,(THIS_FILE, send_rc,
329
 
                                 "Send error for socket %d, retrying",
330
 
                                 h->fd));
331
 
                    replace_udp_sock(h);
332
 
                    continue;
333
 
                }
334
 
#endif
335
 
                break;
336
 
            }
337
 
 
338
 
            /* Can't do this. We only clear "op" after we're finished sending
339
 
             * the whole buffer.
340
 
             */
341
 
            //write_op->op = 0;
342
 
        } else {
343
 
            pj_assert(!"Invalid operation type!");
344
 
            write_op->op = PJ_IOQUEUE_OP_NONE;
345
 
            send_rc = PJ_EBUG;
346
 
        }
347
 
 
348
 
        if (send_rc == PJ_SUCCESS) {
349
 
            write_op->written += sent;
350
 
        } else {
351
 
            pj_assert(send_rc > 0);
352
 
            write_op->written = -send_rc;
353
 
        }
354
 
 
355
 
        /* Are we finished with this buffer? */
356
 
        if (send_rc!=PJ_SUCCESS || 
357
 
            write_op->written == (pj_ssize_t)write_op->size ||
358
 
            h->fd_type == pj_SOCK_DGRAM()) 
359
 
        {
360
 
            pj_bool_t has_lock;
361
 
 
362
 
            write_op->op = PJ_IOQUEUE_OP_NONE;
363
 
 
364
 
            if (h->fd_type != pj_SOCK_DGRAM()) {
365
 
                /* Write completion of the whole stream. */
366
 
                pj_list_erase(write_op);
367
 
 
368
 
                /* Clear operation if there's no more data to send. */
369
 
                if (pj_list_empty(&h->write_list))
370
 
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
371
 
 
372
 
            }
373
 
 
374
 
            /* Unlock; from this point we don't need to hold key's mutex
375
 
             * (unless concurrency is disabled, which in this case we should
376
 
             * hold the mutex while calling the callback) */
377
 
            if (h->allow_concurrent) {
378
 
                /* concurrency may be changed while we're in the callback, so
379
 
                 * save it to a flag.
380
 
                 */
381
 
                has_lock = PJ_FALSE;
382
 
                pj_mutex_unlock(h->mutex);
383
 
            } else {
384
 
                has_lock = PJ_TRUE;
385
 
            }
386
 
 
387
 
            /* Call callback. */
388
 
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
389
 
                (*h->cb.on_write_complete)(h, 
390
 
                                           (pj_ioqueue_op_key_t*)write_op,
391
 
                                           write_op->written);
392
 
            }
393
 
 
394
 
            if (has_lock) {
395
 
                pj_mutex_unlock(h->mutex);
396
 
            }
397
 
 
398
 
        } else {
399
 
            pj_mutex_unlock(h->mutex);
400
 
        }
401
 
 
402
 
        /* Done. */
403
 
    } else {
404
 
        /*
405
 
         * This is normal; execution may fall here when multiple threads
406
 
         * are signalled for the same event, but only one thread eventually
407
 
         * able to process the event.
408
 
         */
409
 
        pj_mutex_unlock(h->mutex);
410
 
    }
411
 
}
412
 
 
413
 
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
414
 
{
415
 
    pj_status_t rc;
416
 
 
417
 
    /* Lock the key. */
418
 
    pj_mutex_lock(h->mutex);
419
 
 
420
 
    if (IS_CLOSING(h)) {
421
 
        pj_mutex_unlock(h->mutex);
422
 
        return;
423
 
    }
424
 
 
425
 
#   if PJ_HAS_TCP
426
 
    if (!pj_list_empty(&h->accept_list)) {
427
 
 
428
 
        struct accept_operation *accept_op;
429
 
        pj_bool_t has_lock;
430
 
        
431
 
        /* Get one accept operation from the list. */
432
 
        accept_op = h->accept_list.next;
433
 
        pj_list_erase(accept_op);
434
 
        accept_op->op = PJ_IOQUEUE_OP_NONE;
435
 
 
436
 
        /* Clear bit in fdset if there is no more pending accept */
437
 
        if (pj_list_empty(&h->accept_list))
438
 
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
439
 
 
440
 
        rc=pj_sock_accept(h->fd, accept_op->accept_fd, 
441
 
                          accept_op->rmt_addr, accept_op->addrlen);
442
 
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
443
 
            rc = pj_sock_getsockname(*accept_op->accept_fd, 
444
 
                                     accept_op->local_addr,
445
 
                                     accept_op->addrlen);
446
 
        }
447
 
 
448
 
        /* Unlock; from this point we don't need to hold key's mutex
449
 
         * (unless concurrency is disabled, which in this case we should
450
 
         * hold the mutex while calling the callback) */
451
 
        if (h->allow_concurrent) {
452
 
            /* concurrency may be changed while we're in the callback, so
453
 
             * save it to a flag.
454
 
             */
455
 
            has_lock = PJ_FALSE;
456
 
            pj_mutex_unlock(h->mutex);
457
 
        } else {
458
 
            has_lock = PJ_TRUE;
459
 
        }
460
 
 
461
 
        /* Call callback. */
462
 
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
463
 
            (*h->cb.on_accept_complete)(h, 
464
 
                                        (pj_ioqueue_op_key_t*)accept_op,
465
 
                                        *accept_op->accept_fd, rc);
466
 
        }
467
 
 
468
 
        if (has_lock) {
469
 
            pj_mutex_unlock(h->mutex);
470
 
        }
471
 
    }
472
 
    else
473
 
#   endif
474
 
    if (key_has_pending_read(h)) {
475
 
        struct read_operation *read_op;
476
 
        pj_ssize_t bytes_read;
477
 
        pj_bool_t has_lock;
478
 
 
479
 
        /* Get one pending read operation from the list. */
480
 
        read_op = h->read_list.next;
481
 
        pj_list_erase(read_op);
482
 
 
483
 
        /* Clear fdset if there is no pending read. */
484
 
        if (pj_list_empty(&h->read_list))
485
 
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
486
 
 
487
 
        bytes_read = read_op->size;
488
 
 
489
 
        if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
490
 
            read_op->op = PJ_IOQUEUE_OP_NONE;
491
 
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 
492
 
                                  read_op->flags,
493
 
                                  read_op->rmt_addr, 
494
 
                                  read_op->rmt_addrlen);
495
 
        } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
496
 
            read_op->op = PJ_IOQUEUE_OP_NONE;
497
 
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 
498
 
                              read_op->flags);
499
 
        } else {
500
 
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
501
 
            read_op->op = PJ_IOQUEUE_OP_NONE;
502
 
            /*
503
 
             * User has specified pj_ioqueue_read().
504
 
             * On Win32, we should do ReadFile(). But because we got
505
 
             * here because of select() anyway, user must have put a
506
 
             * socket descriptor on h->fd, which in this case we can
507
 
             * just call pj_sock_recv() instead of ReadFile().
508
 
             * On Unix, user may put a file in h->fd, so we'll have
509
 
             * to call read() here.
510
 
             * This may not compile on systems which doesn't have 
511
 
             * read(). That's why we only specify PJ_LINUX here so
512
 
             * that error is easier to catch.
513
 
             */
514
 
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
515
 
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
516
 
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 
517
 
                                  read_op->flags);
518
 
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
519
 
                //              &bytes_read, NULL);
520
 
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
521
 
                bytes_read = read(h->fd, read_op->buf, bytes_read);
522
 
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
523
 
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
524
 
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
525
 
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
526
 
#           else
527
 
#               error "Implement read() for this platform!"
528
 
#           endif
529
 
        }
530
 
        
531
 
        if (rc != PJ_SUCCESS) {
532
 
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
533
 
            /* On Win32, for UDP, WSAECONNRESET on the receive side 
534
 
             * indicates that previous sending has triggered ICMP Port 
535
 
             * Unreachable message.
536
 
             * But we wouldn't know at this point which one of previous 
537
 
             * key that has triggered the error, since UDP socket can
538
 
             * be shared!
539
 
             * So we'll just ignore it!
540
 
             */
541
 
 
542
 
            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
543
 
                //PJ_LOG(4,(THIS_FILE, 
544
 
                //          "Ignored ICMP port unreach. on key=%p", h));
545
 
            }
546
 
#           endif
547
 
 
548
 
            /* In any case we would report this to caller. */
549
 
            bytes_read = -rc;
550
 
 
551
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
552
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
553
 
            /* Special treatment for dead UDP sockets here, see ticket #1107 */
554
 
            if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
555
 
                h->fd_type==pj_SOCK_DGRAM())
556
 
            {
557
 
                replace_udp_sock(h);
558
 
            }
559
 
#endif
560
 
        }
561
 
 
562
 
        /* Unlock; from this point we don't need to hold key's mutex
563
 
         * (unless concurrency is disabled, which in this case we should
564
 
         * hold the mutex while calling the callback) */
565
 
        if (h->allow_concurrent) {
566
 
            /* concurrency may be changed while we're in the callback, so
567
 
             * save it to a flag.
568
 
             */
569
 
            has_lock = PJ_FALSE;
570
 
            pj_mutex_unlock(h->mutex);
571
 
        } else {
572
 
            has_lock = PJ_TRUE;
573
 
        }
574
 
 
575
 
        /* Call callback. */
576
 
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
577
 
            (*h->cb.on_read_complete)(h, 
578
 
                                      (pj_ioqueue_op_key_t*)read_op,
579
 
                                      bytes_read);
580
 
        }
581
 
 
582
 
        if (has_lock) {
583
 
            pj_mutex_unlock(h->mutex);
584
 
        }
585
 
 
586
 
    } else {
587
 
        /*
588
 
         * This is normal; execution may fall here when multiple threads
589
 
         * are signalled for the same event, but only one thread eventually
590
 
         * able to process the event.
591
 
         */
592
 
        pj_mutex_unlock(h->mutex);
593
 
    }
594
 
}
595
 
 
596
 
 
597
 
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 
598
 
                                       pj_ioqueue_key_t *h )
599
 
{
600
 
    pj_bool_t has_lock;
601
 
 
602
 
    pj_mutex_lock(h->mutex);
603
 
 
604
 
    if (!h->connecting) {
605
 
        /* It is possible that more than one thread was woken up, thus
606
 
         * the remaining thread will see h->connecting as zero because
607
 
         * it has been processed by other thread.
608
 
         */
609
 
        pj_mutex_unlock(h->mutex);
610
 
        return;
611
 
    }
612
 
 
613
 
    if (IS_CLOSING(h)) {
614
 
        pj_mutex_unlock(h->mutex);
615
 
        return;
616
 
    }
617
 
 
618
 
    /* Clear operation. */
619
 
    h->connecting = 0;
620
 
 
621
 
    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
622
 
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
623
 
 
624
 
    /* Unlock; from this point we don't need to hold key's mutex
625
 
     * (unless concurrency is disabled, which in this case we should
626
 
     * hold the mutex while calling the callback) */
627
 
    if (h->allow_concurrent) {
628
 
        /* concurrency may be changed while we're in the callback, so
629
 
         * save it to a flag.
630
 
         */
631
 
        has_lock = PJ_FALSE;
632
 
        pj_mutex_unlock(h->mutex);
633
 
    } else {
634
 
        has_lock = PJ_TRUE;
635
 
    }
636
 
 
637
 
    /* Call callback. */
638
 
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
639
 
        pj_status_t status = -1;
640
 
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
641
 
        int value;
642
 
        int vallen = sizeof(value);
643
 
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
644
 
                                       &value, &vallen);
645
 
        if (gs_rc == 0) {
646
 
            status = PJ_RETURN_OS_ERROR(value);
647
 
        }
648
 
#endif
649
 
 
650
 
        (*h->cb.on_connect_complete)(h, status);
651
 
    }
652
 
 
653
 
    if (has_lock) {
654
 
        pj_mutex_unlock(h->mutex);
655
 
    }
656
 
}
657
 
 
658
 
/*
659
 
 * pj_ioqueue_recv()
660
 
 *
661
 
 * Start asynchronous recv() from the socket.
662
 
 */
663
 
PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
664
 
                                      pj_ioqueue_op_key_t *op_key,
665
 
                                      void *buffer,
666
 
                                      pj_ssize_t *length,
667
 
                                      unsigned flags )
668
 
{
669
 
    struct read_operation *read_op;
670
 
 
671
 
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
672
 
    PJ_CHECK_STACK();
673
 
 
674
 
    /* Check if key is closing (need to do this first before accessing
675
 
     * other variables, since they might have been destroyed. See ticket
676
 
     * #469).
677
 
     */
678
 
    if (IS_CLOSING(key))
679
 
        return PJ_ECANCELLED;
680
 
 
681
 
    read_op = (struct read_operation*)op_key;
682
 
    read_op->op = PJ_IOQUEUE_OP_NONE;
683
 
 
684
 
    /* Try to see if there's data immediately available. 
685
 
     */
686
 
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
687
 
        pj_status_t status;
688
 
        pj_ssize_t size;
689
 
 
690
 
        size = *length;
691
 
        status = pj_sock_recv(key->fd, buffer, &size, flags);
692
 
        if (status == PJ_SUCCESS) {
693
 
            /* Yes! Data is available! */
694
 
            *length = size;
695
 
            return PJ_SUCCESS;
696
 
        } else {
697
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
698
 
             * the error to caller.
699
 
             */
700
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
701
 
                return status;
702
 
        }
703
 
    }
704
 
 
705
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
706
 
 
707
 
    /*
708
 
     * No data is immediately available.
709
 
     * Must schedule asynchronous operation to the ioqueue.
710
 
     */
711
 
    read_op->op = PJ_IOQUEUE_OP_RECV;
712
 
    read_op->buf = buffer;
713
 
    read_op->size = *length;
714
 
    read_op->flags = flags;
715
 
 
716
 
    pj_mutex_lock(key->mutex);
717
 
    /* Check again. Handle may have been closed after the previous check
718
 
     * in multithreaded app. If we add bad handle to the set it will
719
 
     * corrupt the ioqueue set. See #913
720
 
     */
721
 
    if (IS_CLOSING(key)) {
722
 
        pj_mutex_unlock(key->mutex);
723
 
        return PJ_ECANCELLED;
724
 
    }
725
 
    pj_list_insert_before(&key->read_list, read_op);
726
 
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
727
 
    pj_mutex_unlock(key->mutex);
728
 
 
729
 
    return PJ_EPENDING;
730
 
}
731
 
 
732
 
/*
733
 
 * pj_ioqueue_recvfrom()
734
 
 *
735
 
 * Start asynchronous recvfrom() from the socket.
736
 
 */
737
 
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
738
 
                                         pj_ioqueue_op_key_t *op_key,
739
 
                                         void *buffer,
740
 
                                         pj_ssize_t *length,
741
 
                                         unsigned flags,
742
 
                                         pj_sockaddr_t *addr,
743
 
                                         int *addrlen)
744
 
{
745
 
    struct read_operation *read_op;
746
 
 
747
 
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
748
 
    PJ_CHECK_STACK();
749
 
 
750
 
    /* Check if key is closing. */
751
 
    if (IS_CLOSING(key))
752
 
        return PJ_ECANCELLED;
753
 
 
754
 
    read_op = (struct read_operation*)op_key;
755
 
    read_op->op = PJ_IOQUEUE_OP_NONE;
756
 
 
757
 
    /* Try to see if there's data immediately available. 
758
 
     */
759
 
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
760
 
        pj_status_t status;
761
 
        pj_ssize_t size;
762
 
 
763
 
        size = *length;
764
 
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
765
 
                                  addr, addrlen);
766
 
        if (status == PJ_SUCCESS) {
767
 
            /* Yes! Data is available! */
768
 
            *length = size;
769
 
            return PJ_SUCCESS;
770
 
        } else {
771
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
772
 
             * the error to caller.
773
 
             */
774
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
775
 
                return status;
776
 
        }
777
 
    }
778
 
 
779
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
780
 
 
781
 
    /*
782
 
     * No data is immediately available.
783
 
     * Must schedule asynchronous operation to the ioqueue.
784
 
     */
785
 
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
786
 
    read_op->buf = buffer;
787
 
    read_op->size = *length;
788
 
    read_op->flags = flags;
789
 
    read_op->rmt_addr = addr;
790
 
    read_op->rmt_addrlen = addrlen;
791
 
 
792
 
    pj_mutex_lock(key->mutex);
793
 
    /* Check again. Handle may have been closed after the previous check
794
 
     * in multithreaded app. If we add bad handle to the set it will
795
 
     * corrupt the ioqueue set. See #913
796
 
     */
797
 
    if (IS_CLOSING(key)) {
798
 
        pj_mutex_unlock(key->mutex);
799
 
        return PJ_ECANCELLED;
800
 
    }
801
 
    pj_list_insert_before(&key->read_list, read_op);
802
 
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
803
 
    pj_mutex_unlock(key->mutex);
804
 
 
805
 
    return PJ_EPENDING;
806
 
}
807
 
 
808
 
/*
809
 
 * pj_ioqueue_send()
810
 
 *
811
 
 * Start asynchronous send() to the descriptor.
812
 
 */
813
 
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
814
 
                                     pj_ioqueue_op_key_t *op_key,
815
 
                                     const void *data,
816
 
                                     pj_ssize_t *length,
817
 
                                     unsigned flags)
818
 
{
819
 
    struct write_operation *write_op;
820
 
    pj_status_t status;
821
 
    unsigned retry;
822
 
    pj_ssize_t sent;
823
 
 
824
 
    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
825
 
    PJ_CHECK_STACK();
826
 
 
827
 
    /* Check if key is closing. */
828
 
    if (IS_CLOSING(key))
829
 
        return PJ_ECANCELLED;
830
 
 
831
 
    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
832
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
833
 
 
834
 
    /* Fast track:
835
 
     *   Try to send data immediately, only if there's no pending write!
836
 
     * Note:
837
 
     *  We are speculating that the list is empty here without properly
838
 
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
839
 
     *  performance via parallelism.
840
 
     *
841
 
     *  This should be safe, because:
842
 
     *      - by convention, we require caller to make sure that the
843
 
     *        key is not unregistered while other threads are invoking
844
 
     *        an operation on the same key.
845
 
     *      - pj_list_empty() is safe to be invoked by multiple threads,
846
 
     *        even when other threads are modifying the list.
847
 
     */
848
 
    if (pj_list_empty(&key->write_list)) {
849
 
        /*
850
 
         * See if data can be sent immediately.
851
 
         */
852
 
        sent = *length;
853
 
        status = pj_sock_send(key->fd, data, &sent, flags);
854
 
        if (status == PJ_SUCCESS) {
855
 
            /* Success! */
856
 
            *length = sent;
857
 
            return PJ_SUCCESS;
858
 
        } else {
859
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
860
 
             * the error to caller.
861
 
             */
862
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
863
 
                return status;
864
 
            }
865
 
        }
866
 
    }
867
 
 
868
 
    /*
869
 
     * Schedule asynchronous send.
870
 
     */
871
 
    write_op = (struct write_operation*)op_key;
872
 
 
873
 
    /* Spin if write_op has pending operation */
874
 
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
875
 
        pj_thread_sleep(0);
876
 
 
877
 
    /* Last chance */
878
 
    if (write_op->op) {
879
 
        /* Unable to send packet because there is already pending write in the
880
 
         * write_op. We could not put the operation into the write_op
881
 
         * because write_op already contains a pending operation! And
882
 
         * we could not send the packet directly with send() either,
883
 
         * because that will break the order of the packet. So we can
884
 
         * only return error here.
885
 
         *
886
 
         * This could happen for example in multithreads program,
887
 
         * where polling is done by one thread, while other threads are doing
888
 
         * the sending only. If the polling thread runs on lower priority
889
 
         * than the sending thread, then it's possible that the pending
890
 
         * write flag is not cleared in-time because clearing is only done
891
 
         * during polling. 
892
 
         *
893
 
         * Aplication should specify multiple write operation keys on
894
 
         * situation like this.
895
 
         */
896
 
        //pj_assert(!"ioqueue: there is pending operation on this key!");
897
 
        return PJ_EBUSY;
898
 
    }
899
 
 
900
 
    write_op->op = PJ_IOQUEUE_OP_SEND;
901
 
    write_op->buf = (char*)data;
902
 
    write_op->size = *length;
903
 
    write_op->written = 0;
904
 
    write_op->flags = flags;
905
 
    
906
 
    pj_mutex_lock(key->mutex);
907
 
    /* Check again. Handle may have been closed after the previous check
908
 
     * in multithreaded app. If we add bad handle to the set it will
909
 
     * corrupt the ioqueue set. See #913
910
 
     */
911
 
    if (IS_CLOSING(key)) {
912
 
        pj_mutex_unlock(key->mutex);
913
 
        return PJ_ECANCELLED;
914
 
    }
915
 
    pj_list_insert_before(&key->write_list, write_op);
916
 
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
917
 
    pj_mutex_unlock(key->mutex);
918
 
 
919
 
    return PJ_EPENDING;
920
 
}
921
 
 
922
 
 
923
 
/*
924
 
 * pj_ioqueue_sendto()
925
 
 *
926
 
 * Start asynchronous write() to the descriptor.
927
 
 */
928
 
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
929
 
                                       pj_ioqueue_op_key_t *op_key,
930
 
                                       const void *data,
931
 
                                       pj_ssize_t *length,
932
 
                                       pj_uint32_t flags,
933
 
                                       const pj_sockaddr_t *addr,
934
 
                                       int addrlen)
935
 
{
936
 
    struct write_operation *write_op;
937
 
    unsigned retry;
938
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
939
 
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
940
 
    pj_bool_t restart_retry = PJ_FALSE;
941
 
#endif
942
 
    pj_status_t status;
943
 
    pj_ssize_t sent;
944
 
 
945
 
    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
946
 
    PJ_CHECK_STACK();
947
 
 
948
 
retry_on_restart:
949
 
    /* Check if key is closing. */
950
 
    if (IS_CLOSING(key))
951
 
        return PJ_ECANCELLED;
952
 
 
953
 
    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
954
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
955
 
 
956
 
    /* Fast track:
957
 
     *   Try to send data immediately, only if there's no pending write!
958
 
     * Note:
959
 
     *  We are speculating that the list is empty here without properly
960
 
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
961
 
     *  performance via parallelism.
962
 
     *
963
 
     *  This should be safe, because:
964
 
     *      - by convention, we require caller to make sure that the
965
 
     *        key is not unregistered while other threads are invoking
966
 
     *        an operation on the same key.
967
 
     *      - pj_list_empty() is safe to be invoked by multiple threads,
968
 
     *        even when other threads are modifying the list.
969
 
     */
970
 
    if (pj_list_empty(&key->write_list)) {
971
 
        /*
972
 
         * See if data can be sent immediately.
973
 
         */
974
 
        sent = *length;
975
 
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
976
 
        if (status == PJ_SUCCESS) {
977
 
            /* Success! */
978
 
            *length = sent;
979
 
            return PJ_SUCCESS;
980
 
        } else {
981
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
982
 
             * the error to caller.
983
 
             */
984
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
985
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
986
 
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
987
 
                /* Special treatment for dead UDP sockets here, see ticket #1107 */
988
 
                if (status==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) &&
989
 
                    key->fd_type==pj_SOCK_DGRAM() && !restart_retry)
990
 
                {
991
 
                    PJ_PERROR(4,(THIS_FILE, status,
992
 
                                 "Send error for socket %d, retrying",
993
 
                                 key->fd));
994
 
                    replace_udp_sock(key);
995
 
                    restart_retry = PJ_TRUE;
996
 
                    goto retry_on_restart;
997
 
                }
998
 
#endif
999
 
 
1000
 
                return status;
1001
 
            }
1002
 
            status = status;
1003
 
        }
1004
 
    }
1005
 
 
1006
 
    /*
1007
 
     * Check that address storage can hold the address parameter.
1008
 
     */
1009
 
    PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
1010
 
 
1011
 
    /*
1012
 
     * Schedule asynchronous send.
1013
 
     */
1014
 
    write_op = (struct write_operation*)op_key;
1015
 
    
1016
 
    /* Spin if write_op has pending operation */
1017
 
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
1018
 
        pj_thread_sleep(0);
1019
 
 
1020
 
    /* Last chance */
1021
 
    if (write_op->op) {
1022
 
        /* Unable to send packet because there is already pending write on the
1023
 
         * write_op. We could not put the operation into the write_op
1024
 
         * because write_op already contains a pending operation! And
1025
 
         * we could not send the packet directly with sendto() either,
1026
 
         * because that will break the order of the packet. So we can
1027
 
         * only return error here.
1028
 
         *
1029
 
         * This could happen for example in multithreads program,
1030
 
         * where polling is done by one thread, while other threads are doing
1031
 
         * the sending only. If the polling thread runs on lower priority
1032
 
         * than the sending thread, then it's possible that the pending
1033
 
         * write flag is not cleared in-time because clearing is only done
1034
 
         * during polling. 
1035
 
         *
1036
 
         * Aplication should specify multiple write operation keys on
1037
 
         * situation like this.
1038
 
         */
1039
 
        //pj_assert(!"ioqueue: there is pending operation on this key!");
1040
 
        return PJ_EBUSY;
1041
 
    }
1042
 
 
1043
 
    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
1044
 
    write_op->buf = (char*)data;
1045
 
    write_op->size = *length;
1046
 
    write_op->written = 0;
1047
 
    write_op->flags = flags;
1048
 
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1049
 
    write_op->rmt_addrlen = addrlen;
1050
 
    
1051
 
    pj_mutex_lock(key->mutex);
1052
 
    /* Check again. Handle may have been closed after the previous check
1053
 
     * in multithreaded app. If we add bad handle to the set it will
1054
 
     * corrupt the ioqueue set. See #913
1055
 
     */
1056
 
    if (IS_CLOSING(key)) {
1057
 
        pj_mutex_unlock(key->mutex);
1058
 
        return PJ_ECANCELLED;
1059
 
    }
1060
 
    pj_list_insert_before(&key->write_list, write_op);
1061
 
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1062
 
    pj_mutex_unlock(key->mutex);
1063
 
 
1064
 
    return PJ_EPENDING;
1065
 
}
1066
 
 
1067
 
#if PJ_HAS_TCP
1068
 
/*
1069
 
 * Initiate overlapped accept() operation.
1070
 
 */
1071
 
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1072
 
                                       pj_ioqueue_op_key_t *op_key,
1073
 
                                       pj_sock_t *new_sock,
1074
 
                                       pj_sockaddr_t *local,
1075
 
                                       pj_sockaddr_t *remote,
1076
 
                                       int *addrlen)
1077
 
{
1078
 
    struct accept_operation *accept_op;
1079
 
    pj_status_t status;
1080
 
 
1081
 
    /* check parameters. All must be specified! */
1082
 
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1083
 
 
1084
 
    /* Check if key is closing. */
1085
 
    if (IS_CLOSING(key))
1086
 
        return PJ_ECANCELLED;
1087
 
 
1088
 
    accept_op = (struct accept_operation*)op_key;
1089
 
    accept_op->op = PJ_IOQUEUE_OP_NONE;
1090
 
 
1091
 
    /* Fast track:
1092
 
     *  See if there's new connection available immediately.
1093
 
     */
1094
 
    if (pj_list_empty(&key->accept_list)) {
1095
 
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
1096
 
        if (status == PJ_SUCCESS) {
1097
 
            /* Yes! New connection is available! */
1098
 
            if (local && addrlen) {
1099
 
                status = pj_sock_getsockname(*new_sock, local, addrlen);
1100
 
                if (status != PJ_SUCCESS) {
1101
 
                    pj_sock_close(*new_sock);
1102
 
                    *new_sock = PJ_INVALID_SOCKET;
1103
 
                    return status;
1104
 
                }
1105
 
            }
1106
 
            return PJ_SUCCESS;
1107
 
        } else {
1108
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
1109
 
             * the error to caller.
1110
 
             */
1111
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1112
 
                return status;
1113
 
            }
1114
 
        }
1115
 
    }
1116
 
 
1117
 
    /*
1118
 
     * No connection is available immediately.
1119
 
     * Schedule accept() operation to be completed when there is incoming
1120
 
     * connection available.
1121
 
     */
1122
 
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
1123
 
    accept_op->accept_fd = new_sock;
1124
 
    accept_op->rmt_addr = remote;
1125
 
    accept_op->addrlen= addrlen;
1126
 
    accept_op->local_addr = local;
1127
 
 
1128
 
    pj_mutex_lock(key->mutex);
1129
 
    /* Check again. Handle may have been closed after the previous check
1130
 
     * in multithreaded app. If we add bad handle to the set it will
1131
 
     * corrupt the ioqueue set. See #913
1132
 
     */
1133
 
    if (IS_CLOSING(key)) {
1134
 
        pj_mutex_unlock(key->mutex);
1135
 
        return PJ_ECANCELLED;
1136
 
    }
1137
 
    pj_list_insert_before(&key->accept_list, accept_op);
1138
 
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
1139
 
    pj_mutex_unlock(key->mutex);
1140
 
 
1141
 
    return PJ_EPENDING;
1142
 
}
1143
 
 
1144
 
/*
1145
 
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1146
 
 * since there's no overlapped version of connect()).
1147
 
 */
1148
 
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1149
 
                                        const pj_sockaddr_t *addr,
1150
 
                                        int addrlen )
1151
 
{
1152
 
    pj_status_t status;
1153
 
    
1154
 
    /* check parameters. All must be specified! */
1155
 
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1156
 
 
1157
 
    /* Check if key is closing. */
1158
 
    if (IS_CLOSING(key))
1159
 
        return PJ_ECANCELLED;
1160
 
 
1161
 
    /* Check if socket has not been marked for connecting */
1162
 
    if (key->connecting != 0)
1163
 
        return PJ_EPENDING;
1164
 
    
1165
 
    status = pj_sock_connect(key->fd, addr, addrlen);
1166
 
    if (status == PJ_SUCCESS) {
1167
 
        /* Connected! */
1168
 
        return PJ_SUCCESS;
1169
 
    } else {
1170
 
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1171
 
            /* Pending! */
1172
 
            pj_mutex_lock(key->mutex);
1173
 
            /* Check again. Handle may have been closed after the previous 
1174
 
             * check in multithreaded app. See #913
1175
 
             */
1176
 
            if (IS_CLOSING(key)) {
1177
 
                pj_mutex_unlock(key->mutex);
1178
 
                return PJ_ECANCELLED;
1179
 
            }
1180
 
            key->connecting = PJ_TRUE;
1181
 
            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1182
 
            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
1183
 
            pj_mutex_unlock(key->mutex);
1184
 
            return PJ_EPENDING;
1185
 
        } else {
1186
 
            /* Error! */
1187
 
            return status;
1188
 
        }
1189
 
    }
1190
 
}
1191
 
#endif  /* PJ_HAS_TCP */
1192
 
 
1193
 
 
1194
 
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1195
 
                                     pj_size_t size )
1196
 
{
1197
 
    pj_bzero(op_key, size);
1198
 
}
1199
 
 
1200
 
 
1201
 
/*
1202
 
 * pj_ioqueue_is_pending()
1203
 
 */
1204
 
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1205
 
                                         pj_ioqueue_op_key_t *op_key )
1206
 
{
1207
 
    struct generic_operation *op_rec;
1208
 
 
1209
 
    PJ_UNUSED_ARG(key);
1210
 
 
1211
 
    op_rec = (struct generic_operation*)op_key;
1212
 
    return op_rec->op != 0;
1213
 
}
1214
 
 
1215
 
 
1216
 
/*
1217
 
 * pj_ioqueue_post_completion()
1218
 
 */
1219
 
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1220
 
                                                pj_ioqueue_op_key_t *op_key,
1221
 
                                                pj_ssize_t bytes_status )
1222
 
{
1223
 
    struct generic_operation *op_rec;
1224
 
 
1225
 
    /*
1226
 
     * Find the operation key in all pending operation list to
1227
 
     * really make sure that it's still there; then call the callback.
1228
 
     */
1229
 
    pj_mutex_lock(key->mutex);
1230
 
 
1231
 
    /* Find the operation in the pending read list. */
1232
 
    op_rec = (struct generic_operation*)key->read_list.next;
1233
 
    while (op_rec != (void*)&key->read_list) {
1234
 
        if (op_rec == (void*)op_key) {
1235
 
            pj_list_erase(op_rec);
1236
 
            op_rec->op = PJ_IOQUEUE_OP_NONE;
1237
 
            pj_mutex_unlock(key->mutex);
1238
 
 
1239
 
            (*key->cb.on_read_complete)(key, op_key, bytes_status);
1240
 
            return PJ_SUCCESS;
1241
 
        }
1242
 
        op_rec = op_rec->next;
1243
 
    }
1244
 
 
1245
 
    /* Find the operation in the pending write list. */
1246
 
    op_rec = (struct generic_operation*)key->write_list.next;
1247
 
    while (op_rec != (void*)&key->write_list) {
1248
 
        if (op_rec == (void*)op_key) {
1249
 
            pj_list_erase(op_rec);
1250
 
            op_rec->op = PJ_IOQUEUE_OP_NONE;
1251
 
            pj_mutex_unlock(key->mutex);
1252
 
 
1253
 
            (*key->cb.on_write_complete)(key, op_key, bytes_status);
1254
 
            return PJ_SUCCESS;
1255
 
        }
1256
 
        op_rec = op_rec->next;
1257
 
    }
1258
 
 
1259
 
    /* Find the operation in the pending accept list. */
1260
 
    op_rec = (struct generic_operation*)key->accept_list.next;
1261
 
    while (op_rec != (void*)&key->accept_list) {
1262
 
        if (op_rec == (void*)op_key) {
1263
 
            pj_list_erase(op_rec);
1264
 
            op_rec->op = PJ_IOQUEUE_OP_NONE;
1265
 
            pj_mutex_unlock(key->mutex);
1266
 
 
1267
 
            (*key->cb.on_accept_complete)(key, op_key, 
1268
 
                                          PJ_INVALID_SOCKET,
1269
 
                                          bytes_status);
1270
 
            return PJ_SUCCESS;
1271
 
        }
1272
 
        op_rec = op_rec->next;
1273
 
    }
1274
 
 
1275
 
    pj_mutex_unlock(key->mutex);
1276
 
    
1277
 
    return PJ_EINVALIDOP;
1278
 
}
1279
 
 
1280
 
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
1281
 
                                                        pj_bool_t allow)
1282
 
{
1283
 
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
1284
 
    ioqueue->default_concurrency = allow;
1285
 
    return PJ_SUCCESS;
1286
 
}
1287
 
 
1288
 
 
1289
 
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1290
 
                                               pj_bool_t allow)
1291
 
{
1292
 
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
1293
 
 
1294
 
    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1295
 
     * disabled.
1296
 
     */
1297
 
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1298
 
 
1299
 
    key->allow_concurrent = allow;
1300
 
    return PJ_SUCCESS;
1301
 
}
1302
 
 
1303
 
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1304
 
{
1305
 
    return pj_mutex_lock(key->mutex);
1306
 
}
1307
 
 
1308
 
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1309
 
{
1310
 
    return pj_mutex_unlock(key->mutex);
1311
 
}
1312