~ubuntu-branches/ubuntu/wily/sflphone/wily

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.1.0/pjlib/src/pj/ioqueue_common_abs.c

  • Committer: Package Import Robot
  • Author(s): Jonathan Riddell
  • Date: 2015-01-07 14:51:16 UTC
  • mfrom: (4.3.5 sid)
  • Revision ID: package-import@ubuntu.com-20150107145116-yxnafinf4lrdvrmx
Tags: 1.4.1-0.1ubuntu1
* Merge with Debian, remaining changes:
 - Drop soprano, nepomuk build-dep
* Drop ubuntu patches, now upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* $Id: ioqueue_common_abs.c 4359 2013-02-21 11:18:36Z bennylp $ */
2
 
/* 
3
 
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
 
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5
 
 *
6
 
 * This program is free software; you can redistribute it and/or modify
7
 
 * it under the terms of the GNU General Public License as published by
8
 
 * the Free Software Foundation; either version 2 of the License, or
9
 
 * (at your option) any later version.
10
 
 *
11
 
 * This program is distributed in the hope that it will be useful,
12
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 * GNU General Public License for more details.
15
 
 *
16
 
 * You should have received a copy of the GNU General Public License
17
 
 * along with this program; if not, write to the Free Software
18
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
19
 
 */
20
 
 
21
 
/*
22
 
 * ioqueue_common_abs.c
23
 
 *
24
 
 * This contains common functionalities to emulate proactor pattern with
25
 
 * various event dispatching mechanisms (e.g. select, epoll).
26
 
 *
27
 
 * This file will be included by the appropriate ioqueue implementation.
28
 
 * This file is NOT supposed to be compiled as stand-alone source.
29
 
 */
30
 
 
31
 
#define PENDING_RETRY   2
32
 
 
33
 
static void ioqueue_init( pj_ioqueue_t *ioqueue )
34
 
{
35
 
    ioqueue->lock = NULL;
36
 
    ioqueue->auto_delete_lock = 0;
37
 
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
38
 
}
39
 
 
40
 
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
41
 
{
42
 
    if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43
 
        pj_lock_release(ioqueue->lock);
44
 
        return pj_lock_destroy(ioqueue->lock);
45
 
    }
46
 
    
47
 
    return PJ_SUCCESS;
48
 
}
49
 
 
50
 
/*
51
 
 * pj_ioqueue_set_lock()
52
 
 */
53
 
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 
54
 
                                         pj_lock_t *lock,
55
 
                                         pj_bool_t auto_delete )
56
 
{
57
 
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
58
 
 
59
 
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
60
 
        pj_lock_destroy(ioqueue->lock);
61
 
    }
62
 
 
63
 
    ioqueue->lock = lock;
64
 
    ioqueue->auto_delete_lock = auto_delete;
65
 
 
66
 
    return PJ_SUCCESS;
67
 
}
68
 
 
69
 
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70
 
                                     pj_ioqueue_t *ioqueue,
71
 
                                     pj_ioqueue_key_t *key,
72
 
                                     pj_sock_t sock,
73
 
                                     pj_grp_lock_t *grp_lock,
74
 
                                     void *user_data,
75
 
                                     const pj_ioqueue_callback *cb)
76
 
{
77
 
    pj_status_t rc;
78
 
    int optlen;
79
 
 
80
 
    PJ_UNUSED_ARG(pool);
81
 
 
82
 
    key->ioqueue = ioqueue;
83
 
    key->fd = sock;
84
 
    key->user_data = user_data;
85
 
    pj_list_init(&key->read_list);
86
 
    pj_list_init(&key->write_list);
87
 
#if PJ_HAS_TCP
88
 
    pj_list_init(&key->accept_list);
89
 
    key->connecting = 0;
90
 
#endif
91
 
 
92
 
    /* Save callback. */
93
 
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
94
 
 
95
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
96
 
    /* Set initial reference count to 1 */
97
 
    pj_assert(key->ref_count == 0);
98
 
    ++key->ref_count;
99
 
 
100
 
    key->closing = 0;
101
 
#endif
102
 
 
103
 
    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
104
 
    if (rc != PJ_SUCCESS)
105
 
        return rc;
106
 
 
107
 
    /* Get socket type. When socket type is datagram, some optimization
108
 
     * will be performed during send to allow parallel send operations.
109
 
     */
110
 
    optlen = sizeof(key->fd_type);
111
 
    rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
112
 
                            &key->fd_type, &optlen);
113
 
    if (rc != PJ_SUCCESS)
114
 
        key->fd_type = pj_SOCK_STREAM();
115
 
 
116
 
    /* Create mutex for the key. */
117
 
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
118
 
    rc = pj_lock_create_simple_mutex(poll, NULL, &key->lock);
119
 
#endif
120
 
    if (rc != PJ_SUCCESS)
121
 
        return rc;
122
 
 
123
 
    /* Group lock */
124
 
    key->grp_lock = grp_lock;
125
 
    if (key->grp_lock) {
126
 
        pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0);
127
 
    }
128
 
    
129
 
    return PJ_SUCCESS;
130
 
}
131
 
 
132
 
/*
133
 
 * pj_ioqueue_get_user_data()
134
 
 *
135
 
 * Obtain value associated with a key.
136
 
 */
137
 
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
138
 
{
139
 
    PJ_ASSERT_RETURN(key != NULL, NULL);
140
 
    return key->user_data;
141
 
}
142
 
 
143
 
/*
144
 
 * pj_ioqueue_set_user_data()
145
 
 */
146
 
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
147
 
                                              void *user_data,
148
 
                                              void **old_data)
149
 
{
150
 
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
151
 
 
152
 
    if (old_data)
153
 
        *old_data = key->user_data;
154
 
    key->user_data = user_data;
155
 
 
156
 
    return PJ_SUCCESS;
157
 
}
158
 
 
159
 
PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
160
 
{
161
 
    return !pj_list_empty(&key->write_list);
162
 
}
163
 
 
164
 
PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
165
 
{
166
 
    return !pj_list_empty(&key->read_list);
167
 
}
168
 
 
169
 
PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
170
 
{
171
 
#if PJ_HAS_TCP
172
 
    return !pj_list_empty(&key->accept_list);
173
 
#else
174
 
    PJ_UNUSED_ARG(key);
175
 
    return 0;
176
 
#endif
177
 
}
178
 
 
179
 
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
180
 
{
181
 
    return key->connecting;
182
 
}
183
 
 
184
 
 
185
 
#if PJ_IOQUEUE_HAS_SAFE_UNREG
186
 
#   define IS_CLOSING(key)  (key->closing)
187
 
#else
188
 
#   define IS_CLOSING(key)  (0)
189
 
#endif
190
 
 
191
 
 
192
 
/*
193
 
 * ioqueue_dispatch_event()
194
 
 *
195
 
 * Report occurence of an event in the key to be processed by the
196
 
 * framework.
197
 
 */
198
 
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
199
 
{
200
 
    /* Lock the key. */
201
 
    pj_ioqueue_lock_key(h);
202
 
 
203
 
    if (IS_CLOSING(h)) {
204
 
        pj_ioqueue_unlock_key(h);
205
 
        return;
206
 
    }
207
 
 
208
 
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
209
 
    if (h->connecting) {
210
 
        /* Completion of connect() operation */
211
 
        pj_status_t status;
212
 
        pj_bool_t has_lock;
213
 
 
214
 
        /* Clear operation. */
215
 
        h->connecting = 0;
216
 
 
217
 
        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
218
 
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
219
 
 
220
 
 
221
 
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
222
 
        /* from connect(2): 
223
 
         * On Linux, use getsockopt to read the SO_ERROR option at
224
 
         * level SOL_SOCKET to determine whether connect() completed
225
 
         * successfully (if SO_ERROR is zero).
226
 
         */
227
 
        {
228
 
          int value;
229
 
          int vallen = sizeof(value);
230
 
          int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
231
 
                                         &value, &vallen);
232
 
          if (gs_rc != 0) {
233
 
            /* Argh!! What to do now??? 
234
 
             * Just indicate that the socket is connected. The
235
 
             * application will get error as soon as it tries to use
236
 
             * the socket to send/receive.
237
 
             */
238
 
              status = PJ_SUCCESS;
239
 
          } else {
240
 
              status = PJ_STATUS_FROM_OS(value);
241
 
          }
242
 
        }
243
 
#elif defined(PJ_WIN32) && PJ_WIN32!=0
244
 
        status = PJ_SUCCESS; /* success */
245
 
#else
246
 
        /* Excellent information in D.J. Bernstein page:
247
 
         * http://cr.yp.to/docs/connect.html
248
 
         *
249
 
         * Seems like the most portable way of detecting connect()
250
 
         * failure is to call getpeername(). If socket is connected,
251
 
         * getpeername() will return 0. If the socket is not connected,
252
 
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
253
 
         * the right errno through error slippage. This is a combination
254
 
         * of suggestions from Douglas C. Schmidt and Ken Keys.
255
 
         */
256
 
        {
257
 
            struct sockaddr_in addr;
258
 
            int addrlen = sizeof(addr);
259
 
 
260
 
            status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
261
 
                                         &addrlen);
262
 
        }
263
 
#endif
264
 
 
265
 
        /* Unlock; from this point we don't need to hold key's mutex
266
 
         * (unless concurrency is disabled, which in this case we should
267
 
         * hold the mutex while calling the callback) */
268
 
        if (h->allow_concurrent) {
269
 
            /* concurrency may be changed while we're in the callback, so
270
 
             * save it to a flag.
271
 
             */
272
 
            has_lock = PJ_FALSE;
273
 
            pj_ioqueue_unlock_key(h);
274
 
        } else {
275
 
            has_lock = PJ_TRUE;
276
 
        }
277
 
 
278
 
        /* Call callback. */
279
 
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
280
 
            (*h->cb.on_connect_complete)(h, status);
281
 
 
282
 
        /* Unlock if we still hold the lock */
283
 
        if (has_lock) {
284
 
            pj_ioqueue_unlock_key(h);
285
 
        }
286
 
 
287
 
        /* Done. */
288
 
 
289
 
    } else 
290
 
#endif /* PJ_HAS_TCP */
291
 
    if (key_has_pending_write(h)) {
292
 
        /* Socket is writable. */
293
 
        struct write_operation *write_op;
294
 
        pj_ssize_t sent;
295
 
        pj_status_t send_rc = PJ_SUCCESS;
296
 
 
297
 
        /* Get the first in the queue. */
298
 
        write_op = h->write_list.next;
299
 
 
300
 
        /* For datagrams, we can remove the write_op from the list
301
 
         * so that send() can work in parallel.
302
 
         */
303
 
        if (h->fd_type == pj_SOCK_DGRAM()) {
304
 
            pj_list_erase(write_op);
305
 
 
306
 
            if (pj_list_empty(&h->write_list))
307
 
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
308
 
 
309
 
        }
310
 
 
311
 
        /* Send the data. 
312
 
         * Unfortunately we must do this while holding key's mutex, thus
313
 
         * preventing parallel write on a single key.. :-((
314
 
         */
315
 
        sent = write_op->size - write_op->written;
316
 
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
317
 
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
318
 
                                   &sent, write_op->flags);
319
 
            /* Can't do this. We only clear "op" after we're finished sending
320
 
             * the whole buffer.
321
 
             */
322
 
            //write_op->op = 0;
323
 
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
324
 
            int retry = 2;
325
 
            while (--retry >= 0) {
326
 
                send_rc = pj_sock_sendto(h->fd, 
327
 
                                         write_op->buf+write_op->written,
328
 
                                         &sent, write_op->flags,
329
 
                                         &write_op->rmt_addr, 
330
 
                                         write_op->rmt_addrlen);
331
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
332
 
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
333
 
                /* Special treatment for dead UDP sockets here, see ticket #1107 */
334
 
                if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) &&
335
 
                    h->fd_type==pj_SOCK_DGRAM())
336
 
                {
337
 
                    PJ_PERROR(4,(THIS_FILE, send_rc,
338
 
                                 "Send error for socket %d, retrying",
339
 
                                 h->fd));
340
 
                    replace_udp_sock(h);
341
 
                    continue;
342
 
                }
343
 
#endif
344
 
                break;
345
 
            }
346
 
 
347
 
            /* Can't do this. We only clear "op" after we're finished sending
348
 
             * the whole buffer.
349
 
             */
350
 
            //write_op->op = 0;
351
 
        } else {
352
 
            pj_assert(!"Invalid operation type!");
353
 
            write_op->op = PJ_IOQUEUE_OP_NONE;
354
 
            send_rc = PJ_EBUG;
355
 
        }
356
 
 
357
 
        if (send_rc == PJ_SUCCESS) {
358
 
            write_op->written += sent;
359
 
        } else {
360
 
            pj_assert(send_rc > 0);
361
 
            write_op->written = -send_rc;
362
 
        }
363
 
 
364
 
        /* Are we finished with this buffer? */
365
 
        if (send_rc!=PJ_SUCCESS || 
366
 
            write_op->written == (pj_ssize_t)write_op->size ||
367
 
            h->fd_type == pj_SOCK_DGRAM()) 
368
 
        {
369
 
            pj_bool_t has_lock;
370
 
 
371
 
            write_op->op = PJ_IOQUEUE_OP_NONE;
372
 
 
373
 
            if (h->fd_type != pj_SOCK_DGRAM()) {
374
 
                /* Write completion of the whole stream. */
375
 
                pj_list_erase(write_op);
376
 
 
377
 
                /* Clear operation if there's no more data to send. */
378
 
                if (pj_list_empty(&h->write_list))
379
 
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
380
 
 
381
 
            }
382
 
 
383
 
            /* Unlock; from this point we don't need to hold key's mutex
384
 
             * (unless concurrency is disabled, which in this case we should
385
 
             * hold the mutex while calling the callback) */
386
 
            if (h->allow_concurrent) {
387
 
                /* concurrency may be changed while we're in the callback, so
388
 
                 * save it to a flag.
389
 
                 */
390
 
                has_lock = PJ_FALSE;
391
 
                pj_ioqueue_unlock_key(h);
392
 
                PJ_RACE_ME(5);
393
 
            } else {
394
 
                has_lock = PJ_TRUE;
395
 
            }
396
 
 
397
 
            /* Call callback. */
398
 
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
399
 
                (*h->cb.on_write_complete)(h, 
400
 
                                           (pj_ioqueue_op_key_t*)write_op,
401
 
                                           write_op->written);
402
 
            }
403
 
 
404
 
            if (has_lock) {
405
 
                pj_ioqueue_unlock_key(h);
406
 
            }
407
 
 
408
 
        } else {
409
 
            pj_ioqueue_unlock_key(h);
410
 
        }
411
 
 
412
 
        /* Done. */
413
 
    } else {
414
 
        /*
415
 
         * This is normal; execution may fall here when multiple threads
416
 
         * are signalled for the same event, but only one thread eventually
417
 
         * able to process the event.
418
 
         */
419
 
        pj_ioqueue_unlock_key(h);
420
 
    }
421
 
}
422
 
 
423
 
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
424
 
{
425
 
    pj_status_t rc;
426
 
 
427
 
    /* Lock the key. */
428
 
    pj_ioqueue_lock_key(h);
429
 
 
430
 
    if (IS_CLOSING(h)) {
431
 
        pj_ioqueue_unlock_key(h);
432
 
        return;
433
 
    }
434
 
 
435
 
#   if PJ_HAS_TCP
436
 
    if (!pj_list_empty(&h->accept_list)) {
437
 
 
438
 
        struct accept_operation *accept_op;
439
 
        pj_bool_t has_lock;
440
 
        
441
 
        /* Get one accept operation from the list. */
442
 
        accept_op = h->accept_list.next;
443
 
        pj_list_erase(accept_op);
444
 
        accept_op->op = PJ_IOQUEUE_OP_NONE;
445
 
 
446
 
        /* Clear bit in fdset if there is no more pending accept */
447
 
        if (pj_list_empty(&h->accept_list))
448
 
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
449
 
 
450
 
        rc=pj_sock_accept(h->fd, accept_op->accept_fd, 
451
 
                          accept_op->rmt_addr, accept_op->addrlen);
452
 
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
453
 
            rc = pj_sock_getsockname(*accept_op->accept_fd, 
454
 
                                     accept_op->local_addr,
455
 
                                     accept_op->addrlen);
456
 
        }
457
 
 
458
 
        /* Unlock; from this point we don't need to hold key's mutex
459
 
         * (unless concurrency is disabled, which in this case we should
460
 
         * hold the mutex while calling the callback) */
461
 
        if (h->allow_concurrent) {
462
 
            /* concurrency may be changed while we're in the callback, so
463
 
             * save it to a flag.
464
 
             */
465
 
            has_lock = PJ_FALSE;
466
 
            pj_ioqueue_unlock_key(h);
467
 
            PJ_RACE_ME(5);
468
 
        } else {
469
 
            has_lock = PJ_TRUE;
470
 
        }
471
 
 
472
 
        /* Call callback. */
473
 
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
474
 
            (*h->cb.on_accept_complete)(h, 
475
 
                                        (pj_ioqueue_op_key_t*)accept_op,
476
 
                                        *accept_op->accept_fd, rc);
477
 
        }
478
 
 
479
 
        if (has_lock) {
480
 
            pj_ioqueue_unlock_key(h);
481
 
        }
482
 
    }
483
 
    else
484
 
#   endif
485
 
    if (key_has_pending_read(h)) {
486
 
        struct read_operation *read_op;
487
 
        pj_ssize_t bytes_read;
488
 
        pj_bool_t has_lock;
489
 
 
490
 
        /* Get one pending read operation from the list. */
491
 
        read_op = h->read_list.next;
492
 
        pj_list_erase(read_op);
493
 
 
494
 
        /* Clear fdset if there is no pending read. */
495
 
        if (pj_list_empty(&h->read_list))
496
 
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
497
 
 
498
 
        bytes_read = read_op->size;
499
 
 
500
 
        if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
501
 
            read_op->op = PJ_IOQUEUE_OP_NONE;
502
 
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 
503
 
                                  read_op->flags,
504
 
                                  read_op->rmt_addr, 
505
 
                                  read_op->rmt_addrlen);
506
 
        } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
507
 
            read_op->op = PJ_IOQUEUE_OP_NONE;
508
 
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 
509
 
                              read_op->flags);
510
 
        } else {
511
 
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
512
 
            read_op->op = PJ_IOQUEUE_OP_NONE;
513
 
            /*
514
 
             * User has specified pj_ioqueue_read().
515
 
             * On Win32, we should do ReadFile(). But because we got
516
 
             * here because of select() anyway, user must have put a
517
 
             * socket descriptor on h->fd, which in this case we can
518
 
             * just call pj_sock_recv() instead of ReadFile().
519
 
             * On Unix, user may put a file in h->fd, so we'll have
520
 
             * to call read() here.
521
 
             * This may not compile on systems which doesn't have 
522
 
             * read(). That's why we only specify PJ_LINUX here so
523
 
             * that error is easier to catch.
524
 
             */
525
 
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
526
 
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
527
 
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 
528
 
                                  read_op->flags);
529
 
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
530
 
                //              &bytes_read, NULL);
531
 
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
532
 
                bytes_read = read(h->fd, read_op->buf, bytes_read);
533
 
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
534
 
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
535
 
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
536
 
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
537
 
#           else
538
 
#               error "Implement read() for this platform!"
539
 
#           endif
540
 
        }
541
 
        
542
 
        if (rc != PJ_SUCCESS) {
543
 
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
544
 
            /* On Win32, for UDP, WSAECONNRESET on the receive side 
545
 
             * indicates that previous sending has triggered ICMP Port 
546
 
             * Unreachable message.
547
 
             * But we wouldn't know at this point which one of previous 
548
 
             * key that has triggered the error, since UDP socket can
549
 
             * be shared!
550
 
             * So we'll just ignore it!
551
 
             */
552
 
 
553
 
            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
554
 
                //PJ_LOG(4,(THIS_FILE, 
555
 
                //          "Ignored ICMP port unreach. on key=%p", h));
556
 
            }
557
 
#           endif
558
 
 
559
 
            /* In any case we would report this to caller. */
560
 
            bytes_read = -rc;
561
 
 
562
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
563
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
564
 
            /* Special treatment for dead UDP sockets here, see ticket #1107 */
565
 
            if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
566
 
                h->fd_type==pj_SOCK_DGRAM())
567
 
            {
568
 
                replace_udp_sock(h);
569
 
            }
570
 
#endif
571
 
        }
572
 
 
573
 
        /* Unlock; from this point we don't need to hold key's mutex
574
 
         * (unless concurrency is disabled, which in this case we should
575
 
         * hold the mutex while calling the callback) */
576
 
        if (h->allow_concurrent) {
577
 
            /* concurrency may be changed while we're in the callback, so
578
 
             * save it to a flag.
579
 
             */
580
 
            has_lock = PJ_FALSE;
581
 
            pj_ioqueue_unlock_key(h);
582
 
            PJ_RACE_ME(5);
583
 
        } else {
584
 
            has_lock = PJ_TRUE;
585
 
        }
586
 
 
587
 
        /* Call callback. */
588
 
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
589
 
            (*h->cb.on_read_complete)(h, 
590
 
                                      (pj_ioqueue_op_key_t*)read_op,
591
 
                                      bytes_read);
592
 
        }
593
 
 
594
 
        if (has_lock) {
595
 
            pj_ioqueue_unlock_key(h);
596
 
        }
597
 
 
598
 
    } else {
599
 
        /*
600
 
         * This is normal; execution may fall here when multiple threads
601
 
         * are signalled for the same event, but only one thread eventually
602
 
         * able to process the event.
603
 
         */
604
 
        pj_ioqueue_unlock_key(h);
605
 
    }
606
 
}
607
 
 
608
 
 
609
 
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 
610
 
                                       pj_ioqueue_key_t *h )
611
 
{
612
 
    pj_bool_t has_lock;
613
 
 
614
 
    pj_ioqueue_lock_key(h);
615
 
 
616
 
    if (!h->connecting) {
617
 
        /* It is possible that more than one thread was woken up, thus
618
 
         * the remaining thread will see h->connecting as zero because
619
 
         * it has been processed by other thread.
620
 
         */
621
 
        pj_ioqueue_unlock_key(h);
622
 
        return;
623
 
    }
624
 
 
625
 
    if (IS_CLOSING(h)) {
626
 
        pj_ioqueue_unlock_key(h);
627
 
        return;
628
 
    }
629
 
 
630
 
    /* Clear operation. */
631
 
    h->connecting = 0;
632
 
 
633
 
    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
634
 
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
635
 
 
636
 
    /* Unlock; from this point we don't need to hold key's mutex
637
 
     * (unless concurrency is disabled, which in this case we should
638
 
     * hold the mutex while calling the callback) */
639
 
    if (h->allow_concurrent) {
640
 
        /* concurrency may be changed while we're in the callback, so
641
 
         * save it to a flag.
642
 
         */
643
 
        has_lock = PJ_FALSE;
644
 
        pj_ioqueue_unlock_key(h);
645
 
        PJ_RACE_ME(5);
646
 
    } else {
647
 
        has_lock = PJ_TRUE;
648
 
    }
649
 
 
650
 
    /* Call callback. */
651
 
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
652
 
        pj_status_t status = -1;
653
 
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
654
 
        int value;
655
 
        int vallen = sizeof(value);
656
 
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
657
 
                                       &value, &vallen);
658
 
        if (gs_rc == 0) {
659
 
            status = PJ_RETURN_OS_ERROR(value);
660
 
        }
661
 
#endif
662
 
 
663
 
        (*h->cb.on_connect_complete)(h, status);
664
 
    }
665
 
 
666
 
    if (has_lock) {
667
 
        pj_ioqueue_unlock_key(h);
668
 
    }
669
 
}
670
 
 
671
 
/*
672
 
 * pj_ioqueue_recv()
673
 
 *
674
 
 * Start asynchronous recv() from the socket.
675
 
 */
676
 
PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
677
 
                                      pj_ioqueue_op_key_t *op_key,
678
 
                                      void *buffer,
679
 
                                      pj_ssize_t *length,
680
 
                                      unsigned flags )
681
 
{
682
 
    struct read_operation *read_op;
683
 
 
684
 
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
685
 
    PJ_CHECK_STACK();
686
 
 
687
 
    /* Check if key is closing (need to do this first before accessing
688
 
     * other variables, since they might have been destroyed. See ticket
689
 
     * #469).
690
 
     */
691
 
    if (IS_CLOSING(key))
692
 
        return PJ_ECANCELLED;
693
 
 
694
 
    read_op = (struct read_operation*)op_key;
695
 
    read_op->op = PJ_IOQUEUE_OP_NONE;
696
 
 
697
 
    /* Try to see if there's data immediately available. 
698
 
     */
699
 
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
700
 
        pj_status_t status;
701
 
        pj_ssize_t size;
702
 
 
703
 
        size = *length;
704
 
        status = pj_sock_recv(key->fd, buffer, &size, flags);
705
 
        if (status == PJ_SUCCESS) {
706
 
            /* Yes! Data is available! */
707
 
            *length = size;
708
 
            return PJ_SUCCESS;
709
 
        } else {
710
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
711
 
             * the error to caller.
712
 
             */
713
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
714
 
                return status;
715
 
        }
716
 
    }
717
 
 
718
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
719
 
 
720
 
    /*
721
 
     * No data is immediately available.
722
 
     * Must schedule asynchronous operation to the ioqueue.
723
 
     */
724
 
    read_op->op = PJ_IOQUEUE_OP_RECV;
725
 
    read_op->buf = buffer;
726
 
    read_op->size = *length;
727
 
    read_op->flags = flags;
728
 
 
729
 
    pj_ioqueue_lock_key(key);
730
 
    /* Check again. Handle may have been closed after the previous check
731
 
     * in multithreaded app. If we add bad handle to the set it will
732
 
     * corrupt the ioqueue set. See #913
733
 
     */
734
 
    if (IS_CLOSING(key)) {
735
 
        pj_ioqueue_unlock_key(key);
736
 
        return PJ_ECANCELLED;
737
 
    }
738
 
    pj_list_insert_before(&key->read_list, read_op);
739
 
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
740
 
    pj_ioqueue_unlock_key(key);
741
 
 
742
 
    return PJ_EPENDING;
743
 
}
744
 
 
745
 
/*
746
 
 * pj_ioqueue_recvfrom()
747
 
 *
748
 
 * Start asynchronous recvfrom() from the socket.
749
 
 */
750
 
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
751
 
                                         pj_ioqueue_op_key_t *op_key,
752
 
                                         void *buffer,
753
 
                                         pj_ssize_t *length,
754
 
                                         unsigned flags,
755
 
                                         pj_sockaddr_t *addr,
756
 
                                         int *addrlen)
757
 
{
758
 
    struct read_operation *read_op;
759
 
 
760
 
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
761
 
    PJ_CHECK_STACK();
762
 
 
763
 
    /* Check if key is closing. */
764
 
    if (IS_CLOSING(key))
765
 
        return PJ_ECANCELLED;
766
 
 
767
 
    read_op = (struct read_operation*)op_key;
768
 
    read_op->op = PJ_IOQUEUE_OP_NONE;
769
 
 
770
 
    /* Try to see if there's data immediately available. 
771
 
     */
772
 
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
773
 
        pj_status_t status;
774
 
        pj_ssize_t size;
775
 
 
776
 
        size = *length;
777
 
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
778
 
                                  addr, addrlen);
779
 
        if (status == PJ_SUCCESS) {
780
 
            /* Yes! Data is available! */
781
 
            *length = size;
782
 
            return PJ_SUCCESS;
783
 
        } else {
784
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
785
 
             * the error to caller.
786
 
             */
787
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
788
 
                return status;
789
 
        }
790
 
    }
791
 
 
792
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
793
 
 
794
 
    /*
795
 
     * No data is immediately available.
796
 
     * Must schedule asynchronous operation to the ioqueue.
797
 
     */
798
 
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
799
 
    read_op->buf = buffer;
800
 
    read_op->size = *length;
801
 
    read_op->flags = flags;
802
 
    read_op->rmt_addr = addr;
803
 
    read_op->rmt_addrlen = addrlen;
804
 
 
805
 
    pj_ioqueue_lock_key(key);
806
 
    /* Check again. Handle may have been closed after the previous check
807
 
     * in multithreaded app. If we add bad handle to the set it will
808
 
     * corrupt the ioqueue set. See #913
809
 
     */
810
 
    if (IS_CLOSING(key)) {
811
 
        pj_ioqueue_unlock_key(key);
812
 
        return PJ_ECANCELLED;
813
 
    }
814
 
    pj_list_insert_before(&key->read_list, read_op);
815
 
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
816
 
    pj_ioqueue_unlock_key(key);
817
 
 
818
 
    return PJ_EPENDING;
819
 
}
820
 
 
821
 
/*
822
 
 * pj_ioqueue_send()
823
 
 *
824
 
 * Start asynchronous send() to the descriptor.
825
 
 */
826
 
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
827
 
                                     pj_ioqueue_op_key_t *op_key,
828
 
                                     const void *data,
829
 
                                     pj_ssize_t *length,
830
 
                                     unsigned flags)
831
 
{
832
 
    struct write_operation *write_op;
833
 
    pj_status_t status;
834
 
    unsigned retry;
835
 
    pj_ssize_t sent;
836
 
 
837
 
    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
838
 
    PJ_CHECK_STACK();
839
 
 
840
 
    /* Check if key is closing. */
841
 
    if (IS_CLOSING(key))
842
 
        return PJ_ECANCELLED;
843
 
 
844
 
    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
845
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
846
 
 
847
 
    /* Fast track:
848
 
     *   Try to send data immediately, only if there's no pending write!
849
 
     * Note:
850
 
     *  We are speculating that the list is empty here without properly
851
 
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
852
 
     *  performance via parallelism.
853
 
     *
854
 
     *  This should be safe, because:
855
 
     *      - by convention, we require caller to make sure that the
856
 
     *        key is not unregistered while other threads are invoking
857
 
     *        an operation on the same key.
858
 
     *      - pj_list_empty() is safe to be invoked by multiple threads,
859
 
     *        even when other threads are modifying the list.
860
 
     */
861
 
    if (pj_list_empty(&key->write_list)) {
862
 
        /*
863
 
         * See if data can be sent immediately.
864
 
         */
865
 
        sent = *length;
866
 
        status = pj_sock_send(key->fd, data, &sent, flags);
867
 
        if (status == PJ_SUCCESS) {
868
 
            /* Success! */
869
 
            *length = sent;
870
 
            return PJ_SUCCESS;
871
 
        } else {
872
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
873
 
             * the error to caller.
874
 
             */
875
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
876
 
                return status;
877
 
            }
878
 
        }
879
 
    }
880
 
 
881
 
    /*
882
 
     * Schedule asynchronous send.
883
 
     */
884
 
    write_op = (struct write_operation*)op_key;
885
 
 
886
 
    /* Spin if write_op has pending operation */
887
 
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
888
 
        pj_thread_sleep(0);
889
 
 
890
 
    /* Last chance */
891
 
    if (write_op->op) {
892
 
        /* Unable to send packet because there is already pending write in the
893
 
         * write_op. We could not put the operation into the write_op
894
 
         * because write_op already contains a pending operation! And
895
 
         * we could not send the packet directly with send() either,
896
 
         * because that will break the order of the packet. So we can
897
 
         * only return error here.
898
 
         *
899
 
         * This could happen for example in multithreads program,
900
 
         * where polling is done by one thread, while other threads are doing
901
 
         * the sending only. If the polling thread runs on lower priority
902
 
         * than the sending thread, then it's possible that the pending
903
 
         * write flag is not cleared in-time because clearing is only done
904
 
         * during polling. 
905
 
         *
906
 
         * Aplication should specify multiple write operation keys on
907
 
         * situation like this.
908
 
         */
909
 
        //pj_assert(!"ioqueue: there is pending operation on this key!");
910
 
        return PJ_EBUSY;
911
 
    }
912
 
 
913
 
    write_op->op = PJ_IOQUEUE_OP_SEND;
914
 
    write_op->buf = (char*)data;
915
 
    write_op->size = *length;
916
 
    write_op->written = 0;
917
 
    write_op->flags = flags;
918
 
    
919
 
    pj_ioqueue_lock_key(key);
920
 
    /* Check again. Handle may have been closed after the previous check
921
 
     * in multithreaded app. If we add bad handle to the set it will
922
 
     * corrupt the ioqueue set. See #913
923
 
     */
924
 
    if (IS_CLOSING(key)) {
925
 
        pj_ioqueue_unlock_key(key);
926
 
        return PJ_ECANCELLED;
927
 
    }
928
 
    pj_list_insert_before(&key->write_list, write_op);
929
 
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
930
 
    pj_ioqueue_unlock_key(key);
931
 
 
932
 
    return PJ_EPENDING;
933
 
}
934
 
 
935
 
 
936
 
/*
937
 
 * pj_ioqueue_sendto()
938
 
 *
939
 
 * Start asynchronous write() to the descriptor.
940
 
 */
941
 
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
942
 
                                       pj_ioqueue_op_key_t *op_key,
943
 
                                       const void *data,
944
 
                                       pj_ssize_t *length,
945
 
                                       pj_uint32_t flags,
946
 
                                       const pj_sockaddr_t *addr,
947
 
                                       int addrlen)
948
 
{
949
 
    struct write_operation *write_op;
950
 
    unsigned retry;
951
 
    pj_bool_t restart_retry = PJ_FALSE;
952
 
    pj_status_t status;
953
 
    pj_ssize_t sent;
954
 
 
955
 
    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
956
 
    PJ_CHECK_STACK();
957
 
 
958
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
959
 
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
960
 
retry_on_restart:
961
 
#else
962
 
    PJ_UNUSED_ARG(restart_retry);
963
 
#endif
964
 
    /* Check if key is closing. */
965
 
    if (IS_CLOSING(key))
966
 
        return PJ_ECANCELLED;
967
 
 
968
 
    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
969
 
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
970
 
 
971
 
    /* Fast track:
972
 
     *   Try to send data immediately, only if there's no pending write!
973
 
     * Note:
974
 
     *  We are speculating that the list is empty here without properly
975
 
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
976
 
     *  performance via parallelism.
977
 
     *
978
 
     *  This should be safe, because:
979
 
     *      - by convention, we require caller to make sure that the
980
 
     *        key is not unregistered while other threads are invoking
981
 
     *        an operation on the same key.
982
 
     *      - pj_list_empty() is safe to be invoked by multiple threads,
983
 
     *        even when other threads are modifying the list.
984
 
     */
985
 
    if (pj_list_empty(&key->write_list)) {
986
 
        /*
987
 
         * See if data can be sent immediately.
988
 
         */
989
 
        sent = *length;
990
 
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
991
 
        if (status == PJ_SUCCESS) {
992
 
            /* Success! */
993
 
            *length = sent;
994
 
            return PJ_SUCCESS;
995
 
        } else {
996
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
997
 
             * the error to caller.
998
 
             */
999
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1000
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
1001
 
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
1002
 
                /* Special treatment for dead UDP sockets here, see ticket #1107 */
1003
 
                if (status==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) &&
1004
 
                    key->fd_type==pj_SOCK_DGRAM() && !restart_retry)
1005
 
                {
1006
 
                    PJ_PERROR(4,(THIS_FILE, status,
1007
 
                                 "Send error for socket %d, retrying",
1008
 
                                 key->fd));
1009
 
                    replace_udp_sock(key);
1010
 
                    restart_retry = PJ_TRUE;
1011
 
                    goto retry_on_restart;
1012
 
                }
1013
 
#endif
1014
 
 
1015
 
                return status;
1016
 
            }
1017
 
            status = status;
1018
 
        }
1019
 
    }
1020
 
 
1021
 
    /*
1022
 
     * Check that address storage can hold the address parameter.
1023
 
     */
1024
 
    PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
1025
 
 
1026
 
    /*
1027
 
     * Schedule asynchronous send.
1028
 
     */
1029
 
    write_op = (struct write_operation*)op_key;
1030
 
    
1031
 
    /* Spin if write_op has pending operation */
1032
 
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
1033
 
        pj_thread_sleep(0);
1034
 
 
1035
 
    /* Last chance */
1036
 
    if (write_op->op) {
1037
 
        /* Unable to send packet because there is already pending write on the
1038
 
         * write_op. We could not put the operation into the write_op
1039
 
         * because write_op already contains a pending operation! And
1040
 
         * we could not send the packet directly with sendto() either,
1041
 
         * because that will break the order of the packet. So we can
1042
 
         * only return error here.
1043
 
         *
1044
 
         * This could happen for example in multithreads program,
1045
 
         * where polling is done by one thread, while other threads are doing
1046
 
         * the sending only. If the polling thread runs on lower priority
1047
 
         * than the sending thread, then it's possible that the pending
1048
 
         * write flag is not cleared in-time because clearing is only done
1049
 
         * during polling. 
1050
 
         *
1051
 
         * Aplication should specify multiple write operation keys on
1052
 
         * situation like this.
1053
 
         */
1054
 
        //pj_assert(!"ioqueue: there is pending operation on this key!");
1055
 
        return PJ_EBUSY;
1056
 
    }
1057
 
 
1058
 
    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
1059
 
    write_op->buf = (char*)data;
1060
 
    write_op->size = *length;
1061
 
    write_op->written = 0;
1062
 
    write_op->flags = flags;
1063
 
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1064
 
    write_op->rmt_addrlen = addrlen;
1065
 
    
1066
 
    pj_ioqueue_lock_key(key);
1067
 
    /* Check again. Handle may have been closed after the previous check
1068
 
     * in multithreaded app. If we add bad handle to the set it will
1069
 
     * corrupt the ioqueue set. See #913
1070
 
     */
1071
 
    if (IS_CLOSING(key)) {
1072
 
        pj_ioqueue_unlock_key(key);
1073
 
        return PJ_ECANCELLED;
1074
 
    }
1075
 
    pj_list_insert_before(&key->write_list, write_op);
1076
 
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1077
 
    pj_ioqueue_unlock_key(key);
1078
 
 
1079
 
    return PJ_EPENDING;
1080
 
}
1081
 
 
1082
 
#if PJ_HAS_TCP
1083
 
/*
1084
 
 * Initiate overlapped accept() operation.
1085
 
 */
1086
 
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1087
 
                                       pj_ioqueue_op_key_t *op_key,
1088
 
                                       pj_sock_t *new_sock,
1089
 
                                       pj_sockaddr_t *local,
1090
 
                                       pj_sockaddr_t *remote,
1091
 
                                       int *addrlen)
1092
 
{
1093
 
    struct accept_operation *accept_op;
1094
 
    pj_status_t status;
1095
 
 
1096
 
    /* check parameters. All must be specified! */
1097
 
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1098
 
 
1099
 
    /* Check if key is closing. */
1100
 
    if (IS_CLOSING(key))
1101
 
        return PJ_ECANCELLED;
1102
 
 
1103
 
    accept_op = (struct accept_operation*)op_key;
1104
 
    accept_op->op = PJ_IOQUEUE_OP_NONE;
1105
 
 
1106
 
    /* Fast track:
1107
 
     *  See if there's new connection available immediately.
1108
 
     */
1109
 
    if (pj_list_empty(&key->accept_list)) {
1110
 
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
1111
 
        if (status == PJ_SUCCESS) {
1112
 
            /* Yes! New connection is available! */
1113
 
            if (local && addrlen) {
1114
 
                status = pj_sock_getsockname(*new_sock, local, addrlen);
1115
 
                if (status != PJ_SUCCESS) {
1116
 
                    pj_sock_close(*new_sock);
1117
 
                    *new_sock = PJ_INVALID_SOCKET;
1118
 
                    return status;
1119
 
                }
1120
 
            }
1121
 
            return PJ_SUCCESS;
1122
 
        } else {
1123
 
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
1124
 
             * the error to caller.
1125
 
             */
1126
 
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1127
 
                return status;
1128
 
            }
1129
 
        }
1130
 
    }
1131
 
 
1132
 
    /*
1133
 
     * No connection is available immediately.
1134
 
     * Schedule accept() operation to be completed when there is incoming
1135
 
     * connection available.
1136
 
     */
1137
 
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
1138
 
    accept_op->accept_fd = new_sock;
1139
 
    accept_op->rmt_addr = remote;
1140
 
    accept_op->addrlen= addrlen;
1141
 
    accept_op->local_addr = local;
1142
 
 
1143
 
    pj_ioqueue_lock_key(key);
1144
 
    /* Check again. Handle may have been closed after the previous check
1145
 
     * in multithreaded app. If we add bad handle to the set it will
1146
 
     * corrupt the ioqueue set. See #913
1147
 
     */
1148
 
    if (IS_CLOSING(key)) {
1149
 
        pj_ioqueue_unlock_key(key);
1150
 
        return PJ_ECANCELLED;
1151
 
    }
1152
 
    pj_list_insert_before(&key->accept_list, accept_op);
1153
 
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
1154
 
    pj_ioqueue_unlock_key(key);
1155
 
 
1156
 
    return PJ_EPENDING;
1157
 
}
1158
 
 
1159
 
/*
1160
 
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1161
 
 * since there's no overlapped version of connect()).
1162
 
 */
1163
 
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1164
 
                                        const pj_sockaddr_t *addr,
1165
 
                                        int addrlen )
1166
 
{
1167
 
    pj_status_t status;
1168
 
    
1169
 
    /* check parameters. All must be specified! */
1170
 
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1171
 
 
1172
 
    /* Check if key is closing. */
1173
 
    if (IS_CLOSING(key))
1174
 
        return PJ_ECANCELLED;
1175
 
 
1176
 
    /* Check if socket has not been marked for connecting */
1177
 
    if (key->connecting != 0)
1178
 
        return PJ_EPENDING;
1179
 
    
1180
 
    status = pj_sock_connect(key->fd, addr, addrlen);
1181
 
    if (status == PJ_SUCCESS) {
1182
 
        /* Connected! */
1183
 
        return PJ_SUCCESS;
1184
 
    } else {
1185
 
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1186
 
            /* Pending! */
1187
 
            pj_ioqueue_lock_key(key);
1188
 
            /* Check again. Handle may have been closed after the previous 
1189
 
             * check in multithreaded app. See #913
1190
 
             */
1191
 
            if (IS_CLOSING(key)) {
1192
 
                pj_ioqueue_unlock_key(key);
1193
 
                return PJ_ECANCELLED;
1194
 
            }
1195
 
            key->connecting = PJ_TRUE;
1196
 
            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1197
 
            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
1198
 
            pj_ioqueue_unlock_key(key);
1199
 
            return PJ_EPENDING;
1200
 
        } else {
1201
 
            /* Error! */
1202
 
            return status;
1203
 
        }
1204
 
    }
1205
 
}
1206
 
#endif  /* PJ_HAS_TCP */
1207
 
 
1208
 
 
1209
 
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1210
 
                                     pj_size_t size )
1211
 
{
1212
 
    pj_bzero(op_key, size);
1213
 
}
1214
 
 
1215
 
 
1216
 
/*
1217
 
 * pj_ioqueue_is_pending()
1218
 
 */
1219
 
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1220
 
                                         pj_ioqueue_op_key_t *op_key )
1221
 
{
1222
 
    struct generic_operation *op_rec;
1223
 
 
1224
 
    PJ_UNUSED_ARG(key);
1225
 
 
1226
 
    op_rec = (struct generic_operation*)op_key;
1227
 
    return op_rec->op != 0;
1228
 
}
1229
 
 
1230
 
 
1231
 
/*
1232
 
 * pj_ioqueue_post_completion()
1233
 
 */
1234
 
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1235
 
                                                pj_ioqueue_op_key_t *op_key,
1236
 
                                                pj_ssize_t bytes_status )
1237
 
{
1238
 
    struct generic_operation *op_rec;
1239
 
 
1240
 
    /*
1241
 
     * Find the operation key in all pending operation list to
1242
 
     * really make sure that it's still there; then call the callback.
1243
 
     */
1244
 
    pj_ioqueue_lock_key(key);
1245
 
 
1246
 
    /* Find the operation in the pending read list. */
1247
 
    op_rec = (struct generic_operation*)key->read_list.next;
1248
 
    while (op_rec != (void*)&key->read_list) {
1249
 
        if (op_rec == (void*)op_key) {
1250
 
            pj_list_erase(op_rec);
1251
 
            op_rec->op = PJ_IOQUEUE_OP_NONE;
1252
 
            pj_ioqueue_unlock_key(key);
1253
 
 
1254
 
            (*key->cb.on_read_complete)(key, op_key, bytes_status);
1255
 
            return PJ_SUCCESS;
1256
 
        }
1257
 
        op_rec = op_rec->next;
1258
 
    }
1259
 
 
1260
 
    /* Find the operation in the pending write list. */
1261
 
    op_rec = (struct generic_operation*)key->write_list.next;
1262
 
    while (op_rec != (void*)&key->write_list) {
1263
 
        if (op_rec == (void*)op_key) {
1264
 
            pj_list_erase(op_rec);
1265
 
            op_rec->op = PJ_IOQUEUE_OP_NONE;
1266
 
            pj_ioqueue_unlock_key(key);
1267
 
 
1268
 
            (*key->cb.on_write_complete)(key, op_key, bytes_status);
1269
 
            return PJ_SUCCESS;
1270
 
        }
1271
 
        op_rec = op_rec->next;
1272
 
    }
1273
 
 
1274
 
    /* Find the operation in the pending accept list. */
1275
 
    op_rec = (struct generic_operation*)key->accept_list.next;
1276
 
    while (op_rec != (void*)&key->accept_list) {
1277
 
        if (op_rec == (void*)op_key) {
1278
 
            pj_list_erase(op_rec);
1279
 
            op_rec->op = PJ_IOQUEUE_OP_NONE;
1280
 
            pj_ioqueue_unlock_key(key);
1281
 
 
1282
 
            (*key->cb.on_accept_complete)(key, op_key, 
1283
 
                                          PJ_INVALID_SOCKET,
1284
 
                                          bytes_status);
1285
 
            return PJ_SUCCESS;
1286
 
        }
1287
 
        op_rec = op_rec->next;
1288
 
    }
1289
 
 
1290
 
    pj_ioqueue_unlock_key(key);
1291
 
    
1292
 
    return PJ_EINVALIDOP;
1293
 
}
1294
 
 
1295
 
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
1296
 
                                                        pj_bool_t allow)
1297
 
{
1298
 
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
1299
 
    ioqueue->default_concurrency = allow;
1300
 
    return PJ_SUCCESS;
1301
 
}
1302
 
 
1303
 
 
1304
 
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1305
 
                                               pj_bool_t allow)
1306
 
{
1307
 
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
1308
 
 
1309
 
    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1310
 
     * disabled.
1311
 
     */
1312
 
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1313
 
 
1314
 
    key->allow_concurrent = allow;
1315
 
    return PJ_SUCCESS;
1316
 
}
1317
 
 
1318
 
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1319
 
{
1320
 
    if (key->grp_lock)
1321
 
        return pj_grp_lock_acquire(key->grp_lock);
1322
 
    else
1323
 
        return pj_lock_acquire(key->lock);
1324
 
}
1325
 
 
1326
 
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1327
 
{
1328
 
    if (key->grp_lock)
1329
 
        return pj_grp_lock_release(key->grp_lock);
1330
 
    else
1331
 
        return pj_lock_release(key->lock);
1332
 
}
1333
 
 
1334