~ubuntu-branches/ubuntu/trusty/sflphone/trusty

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.0.1/pjlib/src/pj/activesock.c

  • Committer: Package Import Robot
  • Author(s): Mark Purcell
  • Date: 2014-01-28 18:23:36 UTC
  • mfrom: (4.3.4 sid)
  • Revision ID: package-import@ubuntu.com-20140128182336-jrsv0k9u6cawc068
Tags: 1.3.0-1
* New upstream release 
  - Fixes "New Upstream Release" (Closes: #735846)
  - Fixes "Ringtone does not stop" (Closes: #727164)
  - Fixes "[sflphone-kde] crash on startup" (Closes: #718178)
  - Fixes "sflphone GUI crashes when call is hung up" (Closes: #736583)
* Build-Depends: ensure GnuTLS 2.6
  - libucommon-dev (>= 6.0.7-1.1), libccrtp-dev (>= 2.0.6-3)
  - Fixes "FTBFS Build-Depends libgnutls{26,28}-dev" (Closes: #722040)
* Fix "boost 1.49 is going away" unversioned Build-Depends: (Closes: #736746)
* Add Build-Depends: libsndfile-dev, nepomuk-core-dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* $Id: activesock.c 3553 2011-05-05 06:14:19Z nanang $ */
2
 
/*
3
 
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
 
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5
 
 *
6
 
 * This program is free software; you can redistribute it and/or modify
7
 
 * it under the terms of the GNU General Public License as published by
8
 
 * the Free Software Foundation; either version 2 of the License, or
9
 
 * (at your option) any later version.
10
 
 *
11
 
 * This program is distributed in the hope that it will be useful,
12
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 * GNU General Public License for more details.
15
 
 *
16
 
 * You should have received a copy of the GNU General Public License
17
 
 * along with this program; if not, write to the Free Software
18
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
 
 */
20
 
#include <pj/activesock.h>
21
 
#include <pj/compat/socket.h>
22
 
#include <pj/assert.h>
23
 
#include <pj/errno.h>
24
 
#include <pj/log.h>
25
 
#include <pj/pool.h>
26
 
#include <pj/sock.h>
27
 
#include <pj/string.h>
28
 
 
29
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31
 
#   include <CFNetwork/CFNetwork.h>
32
 
 
33
 
    static pj_bool_t ios_bg_support = PJ_TRUE;
34
 
#endif
35
 
 
36
 
#define PJ_ACTIVESOCK_MAX_LOOP      50
37
 
 
38
 
 
39
 
enum read_type
40
 
{
41
 
    TYPE_NONE,
42
 
    TYPE_RECV,
43
 
    TYPE_RECV_FROM
44
 
};
45
 
 
46
 
struct read_op
47
 
{
48
 
    pj_ioqueue_op_key_t  op_key;
49
 
    pj_uint8_t          *pkt;
50
 
    unsigned             max_size;
51
 
    pj_size_t            size;
52
 
    pj_sockaddr          src_addr;
53
 
    int                  src_addr_len;
54
 
};
55
 
 
56
 
struct accept_op
57
 
{
58
 
    pj_ioqueue_op_key_t  op_key;
59
 
    pj_sock_t            new_sock;
60
 
    pj_sockaddr          rem_addr;
61
 
    int                  rem_addr_len;
62
 
};
63
 
 
64
 
struct send_data
65
 
{
66
 
    pj_uint8_t          *data;
67
 
    pj_ssize_t           len;
68
 
    pj_ssize_t           sent;
69
 
    unsigned             flags;
70
 
};
71
 
 
72
 
struct pj_activesock_t
73
 
{
74
 
    pj_ioqueue_key_t    *key;
75
 
    pj_bool_t            stream_oriented;
76
 
    pj_bool_t            whole_data;
77
 
    pj_ioqueue_t        *ioqueue;
78
 
    void                *user_data;
79
 
    unsigned             async_count;
80
 
    unsigned             max_loop;
81
 
    pj_activesock_cb     cb;
82
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
83
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
84
 
    int                  bg_setting;
85
 
    pj_sock_t            sock;
86
 
    CFReadStreamRef      readStream;
87
 
#endif
88
 
 
89
 
    unsigned             err_counter;
90
 
    pj_status_t          last_err;
91
 
 
92
 
    struct send_data     send_data;
93
 
 
94
 
    struct read_op      *read_op;
95
 
    pj_uint32_t          read_flags;
96
 
    enum read_type       read_type;
97
 
 
98
 
    struct accept_op    *accept_op;
99
 
};
100
 
 
101
 
 
102
 
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
103
 
                                     pj_ioqueue_op_key_t *op_key,
104
 
                                     pj_ssize_t bytes_read);
105
 
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
106
 
                                      pj_ioqueue_op_key_t *op_key,
107
 
                                      pj_ssize_t bytes_sent);
108
 
#if PJ_HAS_TCP
109
 
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
110
 
                                       pj_ioqueue_op_key_t *op_key,
111
 
                                       pj_sock_t sock,
112
 
                                       pj_status_t status);
113
 
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
114
 
                                        pj_status_t status);
115
 
#endif
116
 
 
117
 
PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
118
 
{
119
 
    pj_bzero(cfg, sizeof(*cfg));
120
 
    cfg->async_cnt = 1;
121
 
    cfg->concurrency = -1;
122
 
    cfg->whole_data = PJ_TRUE;
123
 
}
124
 
 
125
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
126
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
127
 
static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
128
 
{
129
 
    if (asock->readStream) {
130
 
        CFReadStreamClose(asock->readStream);
131
 
        CFRelease(asock->readStream);
132
 
        asock->readStream = NULL;
133
 
    }
134
 
}
135
 
 
136
 
static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
137
 
{
138
 
    if (ios_bg_support && asock->bg_setting && asock->stream_oriented) {
139
 
        activesock_destroy_iphone_os_stream(asock);
140
 
 
141
 
        CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
142
 
                                     &asock->readStream, NULL);
143
 
 
144
 
        if (!asock->readStream ||
145
 
            CFReadStreamSetProperty(asock->readStream,
146
 
                                    kCFStreamNetworkServiceType,
147
 
                                    kCFStreamNetworkServiceTypeVoIP)
148
 
            != TRUE ||
149
 
            CFReadStreamOpen(asock->readStream) != TRUE)
150
 
        {
151
 
            PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
152
 
                      "usage. Background mode will not be supported."));
153
 
 
154
 
            activesock_destroy_iphone_os_stream(asock);
155
 
        }
156
 
    }
157
 
}
158
 
 
159
 
 
160
 
PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
161
 
                                            int val)
162
 
{
163
 
    asock->bg_setting = val;
164
 
    if (asock->bg_setting)
165
 
        activesock_create_iphone_os_stream(asock);
166
 
    else
167
 
        activesock_destroy_iphone_os_stream(asock);
168
 
}
169
 
 
170
 
PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
171
 
{
172
 
    ios_bg_support = val;
173
 
}
174
 
#endif
175
 
 
176
 
PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
177
 
                                          pj_sock_t sock,
178
 
                                          int sock_type,
179
 
                                          const pj_activesock_cfg *opt,
180
 
                                          pj_ioqueue_t *ioqueue,
181
 
                                          const pj_activesock_cb *cb,
182
 
                                          void *user_data,
183
 
                                          pj_activesock_t **p_asock)
184
 
{
185
 
    pj_activesock_t *asock;
186
 
    pj_ioqueue_callback ioq_cb;
187
 
    pj_status_t status;
188
 
 
189
 
    PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
190
 
    PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
191
 
    PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
192
 
                     sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
193
 
    PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
194
 
 
195
 
    asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
196
 
    asock->ioqueue = ioqueue;
197
 
    asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
198
 
    asock->async_count = (opt? opt->async_cnt : 1);
199
 
    asock->whole_data = (opt? opt->whole_data : 1);
200
 
    asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
201
 
    asock->user_data = user_data;
202
 
    pj_memcpy(&asock->cb, cb, sizeof(*cb));
203
 
 
204
 
    pj_bzero(&ioq_cb, sizeof(ioq_cb));
205
 
    ioq_cb.on_read_complete = &ioqueue_on_read_complete;
206
 
    ioq_cb.on_write_complete = &ioqueue_on_write_complete;
207
 
#if PJ_HAS_TCP
208
 
    ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
209
 
    ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
210
 
#endif
211
 
 
212
 
    status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
213
 
                                      &ioq_cb, &asock->key);
214
 
    if (status != PJ_SUCCESS) {
215
 
        pj_activesock_close(asock);
216
 
        return status;
217
 
    }
218
 
 
219
 
    if (asock->whole_data) {
220
 
        /* Must disable concurrency otherwise there is a race condition */
221
 
        pj_ioqueue_set_concurrency(asock->key, 0);
222
 
    } else if (opt && opt->concurrency >= 0) {
223
 
        pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
224
 
    }
225
 
 
226
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
227
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
228
 
    asock->sock = sock;
229
 
    asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
230
 
#endif
231
 
 
232
 
    *p_asock = asock;
233
 
    return PJ_SUCCESS;
234
 
}
235
 
 
236
 
 
237
 
PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
238
 
                                              const pj_sockaddr *addr,
239
 
                                              const pj_activesock_cfg *opt,
240
 
                                              pj_ioqueue_t *ioqueue,
241
 
                                              const pj_activesock_cb *cb,
242
 
                                              void *user_data,
243
 
                                              pj_activesock_t **p_asock,
244
 
                                              pj_sockaddr *bound_addr)
245
 
{
246
 
    pj_sock_t sock_fd;
247
 
    pj_sockaddr default_addr;
248
 
    pj_status_t status;
249
 
 
250
 
    if (addr == NULL) {
251
 
        pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
252
 
        addr = &default_addr;
253
 
    }
254
 
 
255
 
    status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
256
 
                            &sock_fd);
257
 
    if (status != PJ_SUCCESS) {
258
 
        return status;
259
 
    }
260
 
 
261
 
    status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
262
 
    if (status != PJ_SUCCESS) {
263
 
        pj_sock_close(sock_fd);
264
 
        return status;
265
 
    }
266
 
 
267
 
    status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
268
 
                                  ioqueue, cb, user_data, p_asock);
269
 
    if (status != PJ_SUCCESS) {
270
 
        pj_sock_close(sock_fd);
271
 
        return status;
272
 
    }
273
 
 
274
 
    if (bound_addr) {
275
 
        int addr_len = sizeof(*bound_addr);
276
 
        status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
277
 
        if (status != PJ_SUCCESS) {
278
 
            pj_activesock_close(*p_asock);
279
 
            return status;
280
 
        }
281
 
    }
282
 
 
283
 
    return PJ_SUCCESS;
284
 
}
285
 
 
286
 
 
287
 
PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
288
 
{
289
 
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
290
 
    if (asock->key) {
291
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
292
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
293
 
        activesock_destroy_iphone_os_stream(asock);
294
 
#endif
295
 
 
296
 
        pj_ioqueue_unregister(asock->key);
297
 
        asock->key = NULL;
298
 
    }
299
 
    return PJ_SUCCESS;
300
 
}
301
 
 
302
 
 
303
 
PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
304
 
                                                 void *user_data)
305
 
{
306
 
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
307
 
    asock->user_data = user_data;
308
 
    return PJ_SUCCESS;
309
 
}
310
 
 
311
 
 
312
 
PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
313
 
{
314
 
    PJ_ASSERT_RETURN(asock, NULL);
315
 
    return asock->user_data;
316
 
}
317
 
 
318
 
 
319
 
PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
320
 
                                             pj_pool_t *pool,
321
 
                                             unsigned buff_size,
322
 
                                             pj_uint32_t flags)
323
 
{
324
 
    void **readbuf;
325
 
    unsigned i;
326
 
 
327
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
328
 
 
329
 
    readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
330
 
                                      sizeof(void*));
331
 
 
332
 
    for (i=0; i<asock->async_count; ++i) {
333
 
        readbuf[i] = pj_pool_alloc(pool, buff_size);
334
 
    }
335
 
 
336
 
    return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
337
 
}
338
 
 
339
 
 
340
 
PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
341
 
                                               pj_pool_t *pool,
342
 
                                               unsigned buff_size,
343
 
                                               void *readbuf[],
344
 
                                               pj_uint32_t flags)
345
 
{
346
 
    unsigned i;
347
 
    pj_status_t status;
348
 
 
349
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
350
 
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
351
 
    PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
352
 
 
353
 
    asock->read_op = (struct read_op*)
354
 
                     pj_pool_calloc(pool, asock->async_count,
355
 
                                    sizeof(struct read_op));
356
 
    asock->read_type = TYPE_RECV;
357
 
    asock->read_flags = flags;
358
 
 
359
 
    for (i=0; i<asock->async_count; ++i) {
360
 
        struct read_op *r = &asock->read_op[i];
361
 
        pj_ssize_t size_to_read;
362
 
 
363
 
        r->pkt = (pj_uint8_t*)readbuf[i];
364
 
        r->max_size = size_to_read = buff_size;
365
 
 
366
 
        status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
367
 
                                 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
368
 
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
369
 
 
370
 
        if (status != PJ_EPENDING)
371
 
            return status;
372
 
    }
373
 
 
374
 
    return PJ_SUCCESS;
375
 
}
376
 
 
377
 
 
378
 
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
379
 
                                                 pj_pool_t *pool,
380
 
                                                 unsigned buff_size,
381
 
                                                 pj_uint32_t flags)
382
 
{
383
 
    void **readbuf;
384
 
    unsigned i;
385
 
 
386
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
387
 
 
388
 
    readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
389
 
                                      sizeof(void*));
390
 
 
391
 
    for (i=0; i<asock->async_count; ++i) {
392
 
        readbuf[i] = pj_pool_alloc(pool, buff_size);
393
 
    }
394
 
 
395
 
    return pj_activesock_start_recvfrom2(asock, pool, buff_size,
396
 
                                         readbuf, flags);
397
 
}
398
 
 
399
 
 
400
 
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
401
 
                                                   pj_pool_t *pool,
402
 
                                                   unsigned buff_size,
403
 
                                                   void *readbuf[],
404
 
                                                   pj_uint32_t flags)
405
 
{
406
 
    unsigned i;
407
 
    pj_status_t status;
408
 
 
409
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
410
 
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
411
 
 
412
 
    asock->read_op = (struct read_op*)
413
 
                     pj_pool_calloc(pool, asock->async_count,
414
 
                                    sizeof(struct read_op));
415
 
    asock->read_type = TYPE_RECV_FROM;
416
 
    asock->read_flags = flags;
417
 
 
418
 
    for (i=0; i<asock->async_count; ++i) {
419
 
        struct read_op *r = &asock->read_op[i];
420
 
        pj_ssize_t size_to_read;
421
 
 
422
 
        r->pkt = (pj_uint8_t*) readbuf[i];
423
 
        r->max_size = size_to_read = buff_size;
424
 
        r->src_addr_len = sizeof(r->src_addr);
425
 
 
426
 
        status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
427
 
                                     &size_to_read,
428
 
                                     PJ_IOQUEUE_ALWAYS_ASYNC | flags,
429
 
                                     &r->src_addr, &r->src_addr_len);
430
 
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
431
 
 
432
 
        if (status != PJ_EPENDING)
433
 
            return status;
434
 
    }
435
 
 
436
 
    return PJ_SUCCESS;
437
 
}
438
 
 
439
 
 
440
 
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
441
 
                                     pj_ioqueue_op_key_t *op_key,
442
 
                                     pj_ssize_t bytes_read)
443
 
{
444
 
    pj_activesock_t *asock;
445
 
    struct read_op *r = (struct read_op*)op_key;
446
 
    unsigned loop = 0;
447
 
    pj_status_t status;
448
 
 
449
 
    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
450
 
 
451
 
    do {
452
 
        unsigned flags;
453
 
 
454
 
        if (bytes_read > 0) {
455
 
            /*
456
 
             * We've got new data.
457
 
             */
458
 
            pj_size_t remainder;
459
 
            pj_bool_t ret;
460
 
 
461
 
            /* Append this new data to existing data. If socket is stream
462
 
             * oriented, user might have left some data in the buffer.
463
 
             * Otherwise if socket is datagram there will be nothing in
464
 
             * existing packet hence the packet will contain only the new
465
 
             * packet.
466
 
             */
467
 
            r->size += bytes_read;
468
 
 
469
 
            /* Set default remainder to zero */
470
 
            remainder = 0;
471
 
 
472
 
            /* And return value to TRUE */
473
 
            ret = PJ_TRUE;
474
 
 
475
 
            /* Notify callback */
476
 
            if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
477
 
                ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
478
 
                                                PJ_SUCCESS, &remainder);
479
 
            } else if (asock->read_type == TYPE_RECV_FROM &&
480
 
                       asock->cb.on_data_recvfrom)
481
 
            {
482
 
                ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
483
 
                                                    &r->src_addr,
484
 
                                                    r->src_addr_len,
485
 
                                                    PJ_SUCCESS);
486
 
            }
487
 
 
488
 
            /* If callback returns false, we have been destroyed! */
489
 
            if (!ret)
490
 
                return;
491
 
 
492
 
            /* Only stream oriented socket may leave data in the packet */
493
 
            if (asock->stream_oriented) {
494
 
                r->size = remainder;
495
 
            } else {
496
 
                r->size = 0;
497
 
            }
498
 
 
499
 
        } else if (bytes_read <= 0 &&
500
 
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
501
 
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
502
 
                   (asock->stream_oriented ||
503
 
                    -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
504
 
        {
505
 
            pj_size_t remainder;
506
 
            pj_bool_t ret;
507
 
 
508
 
            if (bytes_read == 0) {
509
 
                /* For stream/connection oriented socket, this means the
510
 
                 * connection has been closed. For datagram sockets, it means
511
 
                 * we've received datagram with zero length.
512
 
                 */
513
 
                if (asock->stream_oriented)
514
 
                    status = PJ_EEOF;
515
 
                else
516
 
                    status = PJ_SUCCESS;
517
 
            } else {
518
 
                /* This means we've got an error. If this is stream/connection
519
 
                 * oriented, it means connection has been closed. For datagram
520
 
                 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
521
 
                 */
522
 
                status = -bytes_read;
523
 
            }
524
 
 
525
 
            /* Set default remainder to zero */
526
 
            remainder = 0;
527
 
 
528
 
            /* And return value to TRUE */
529
 
            ret = PJ_TRUE;
530
 
 
531
 
            /* Notify callback */
532
 
            if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
533
 
                /* For connection oriented socket, we still need to report
534
 
                 * the remainder data (if any) to the user to let user do
535
 
                 * processing with the remainder data before it closes the
536
 
                 * connection.
537
 
                 * If there is no remainder data, set the packet to NULL.
538
 
                 */
539
 
 
540
 
                /* Shouldn't set the packet to NULL, as there may be active
541
 
                 * socket user, such as SSL socket, that needs to have access
542
 
                 * to the read buffer packet.
543
 
                 */
544
 
                //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
545
 
                //                              r->size, status, &remainder);
546
 
                ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
547
 
                                                status, &remainder);
548
 
 
549
 
            } else if (asock->read_type == TYPE_RECV_FROM &&
550
 
                       asock->cb.on_data_recvfrom)
551
 
            {
552
 
                /* This would always be datagram oriented hence there's
553
 
                 * nothing in the packet. We can't be sure if there will be
554
 
                 * anything useful in the source_addr, so just put NULL
555
 
                 * there too.
556
 
                 */
557
 
                /* In some scenarios, status may be PJ_SUCCESS. The upper
558
 
                 * layer application may not expect the callback to be called
559
 
                 * with successful status and NULL data, so lets not call the
560
 
                 * callback if the status is PJ_SUCCESS.
561
 
                 */
562
 
                if (status != PJ_SUCCESS ) {
563
 
                    ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
564
 
                                                        NULL, 0, status);
565
 
                }
566
 
            }
567
 
 
568
 
            /* If callback returns false, we have been destroyed! */
569
 
            if (!ret)
570
 
                return;
571
 
 
572
 
            /* Only stream oriented socket may leave data in the packet */
573
 
            if (asock->stream_oriented) {
574
 
                r->size = remainder;
575
 
            } else {
576
 
                r->size = 0;
577
 
            }
578
 
        }
579
 
 
580
 
        /* Read next data. We limit ourselves to processing max_loop immediate
581
 
         * data, so when the loop counter has exceeded this value, force the
582
 
         * read()/recvfrom() to return pending operation to allow the program
583
 
         * to do other jobs.
584
 
         */
585
 
        bytes_read = r->max_size - r->size;
586
 
        flags = asock->read_flags;
587
 
        if (++loop >= asock->max_loop)
588
 
            flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
589
 
 
590
 
        if (asock->read_type == TYPE_RECV) {
591
 
            status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
592
 
                                     &bytes_read, flags);
593
 
        } else {
594
 
            r->src_addr_len = sizeof(r->src_addr);
595
 
            status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
596
 
                                         &bytes_read, flags,
597
 
                                         &r->src_addr, &r->src_addr_len);
598
 
        }
599
 
 
600
 
        if (status == PJ_SUCCESS) {
601
 
            /* Immediate data */
602
 
            ;
603
 
        } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
604
 
            /* Error */
605
 
            bytes_read = -status;
606
 
        } else {
607
 
            break;
608
 
        }
609
 
    } while (1);
610
 
 
611
 
}
612
 
 
613
 
 
614
 
static pj_status_t send_remaining(pj_activesock_t *asock,
615
 
                                  pj_ioqueue_op_key_t *send_key)
616
 
{
617
 
    struct send_data *sd = (struct send_data*)send_key->activesock_data;
618
 
    pj_status_t status;
619
 
 
620
 
    do {
621
 
        pj_ssize_t size;
622
 
 
623
 
        size = sd->len - sd->sent;
624
 
        status = pj_ioqueue_send(asock->key, send_key,
625
 
                                 sd->data+sd->sent, &size, sd->flags);
626
 
        if (status != PJ_SUCCESS) {
627
 
            /* Pending or error */
628
 
            break;
629
 
        }
630
 
 
631
 
        sd->sent += size;
632
 
        if (sd->sent == sd->len) {
633
 
            /* The whole data has been sent. */
634
 
            return PJ_SUCCESS;
635
 
        }
636
 
 
637
 
    } while (sd->sent < sd->len);
638
 
 
639
 
    return status;
640
 
}
641
 
 
642
 
 
643
 
PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
644
 
                                        pj_ioqueue_op_key_t *send_key,
645
 
                                        const void *data,
646
 
                                        pj_ssize_t *size,
647
 
                                        unsigned flags)
648
 
{
649
 
    PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
650
 
 
651
 
    send_key->activesock_data = NULL;
652
 
 
653
 
    if (asock->whole_data) {
654
 
        pj_ssize_t whole;
655
 
        pj_status_t status;
656
 
 
657
 
        whole = *size;
658
 
 
659
 
        status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
660
 
        if (status != PJ_SUCCESS) {
661
 
            /* Pending or error */
662
 
            return status;
663
 
        }
664
 
 
665
 
        if (*size == whole) {
666
 
            /* The whole data has been sent. */
667
 
            return PJ_SUCCESS;
668
 
        }
669
 
 
670
 
        /* Data was partially sent */
671
 
        asock->send_data.data = (pj_uint8_t*)data;
672
 
        asock->send_data.len = whole;
673
 
        asock->send_data.sent = *size;
674
 
        asock->send_data.flags = flags;
675
 
        send_key->activesock_data = &asock->send_data;
676
 
 
677
 
        /* Try again */
678
 
        status = send_remaining(asock, send_key);
679
 
        if (status == PJ_SUCCESS) {
680
 
            *size = whole;
681
 
        }
682
 
        return status;
683
 
 
684
 
    } else {
685
 
        return pj_ioqueue_send(asock->key, send_key, data, size, flags);
686
 
    }
687
 
}
688
 
 
689
 
 
690
 
PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
691
 
                                          pj_ioqueue_op_key_t *send_key,
692
 
                                          const void *data,
693
 
                                          pj_ssize_t *size,
694
 
                                          unsigned flags,
695
 
                                          const pj_sockaddr_t *addr,
696
 
                                          int addr_len)
697
 
{
698
 
    PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
699
 
                     PJ_EINVAL);
700
 
 
701
 
    return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
702
 
                             addr, addr_len);
703
 
}
704
 
 
705
 
 
706
 
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
707
 
                                      pj_ioqueue_op_key_t *op_key,
708
 
                                      pj_ssize_t bytes_sent)
709
 
{
710
 
    pj_activesock_t *asock;
711
 
 
712
 
    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
713
 
 
714
 
    if (bytes_sent > 0 && op_key->activesock_data) {
715
 
        /* whole_data is requested. Make sure we send all the data */
716
 
        struct send_data *sd = (struct send_data*)op_key->activesock_data;
717
 
 
718
 
        sd->sent += bytes_sent;
719
 
        if (sd->sent == sd->len) {
720
 
            /* all has been sent */
721
 
            bytes_sent = sd->sent;
722
 
            op_key->activesock_data = NULL;
723
 
        } else {
724
 
            /* send remaining data */
725
 
            pj_status_t status;
726
 
 
727
 
            status = send_remaining(asock, op_key);
728
 
            if (status == PJ_EPENDING)
729
 
                return;
730
 
            else if (status == PJ_SUCCESS)
731
 
                bytes_sent = sd->sent;
732
 
            else
733
 
                bytes_sent = -status;
734
 
 
735
 
            op_key->activesock_data = NULL;
736
 
        }
737
 
    }
738
 
 
739
 
    if (asock->cb.on_data_sent) {
740
 
        pj_bool_t ret;
741
 
 
742
 
        ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
743
 
 
744
 
        /* If callback returns false, we have been destroyed! */
745
 
        if (!ret)
746
 
            return;
747
 
    }
748
 
}
749
 
 
750
 
#if PJ_HAS_TCP
751
 
PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
752
 
                                               pj_pool_t *pool)
753
 
{
754
 
    unsigned i;
755
 
 
756
 
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
757
 
    PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
758
 
 
759
 
    asock->accept_op = (struct accept_op*)
760
 
                       pj_pool_calloc(pool, asock->async_count,
761
 
                                      sizeof(struct accept_op));
762
 
    for (i=0; i<asock->async_count; ++i) {
763
 
        struct accept_op *a = &asock->accept_op[i];
764
 
        pj_status_t status;
765
 
 
766
 
        do {
767
 
            a->new_sock = PJ_INVALID_SOCKET;
768
 
            a->rem_addr_len = sizeof(a->rem_addr);
769
 
 
770
 
            status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
771
 
                                       NULL, &a->rem_addr, &a->rem_addr_len);
772
 
            if (status == PJ_SUCCESS) {
773
 
                /* We've got immediate connection. Not sure if it's a good
774
 
                 * idea to call the callback now (probably application will
775
 
                 * not be prepared to process it), so lets just silently
776
 
                 * close the socket.
777
 
                 */
778
 
                pj_sock_close(a->new_sock);
779
 
            }
780
 
        } while (status == PJ_SUCCESS);
781
 
 
782
 
        if (status != PJ_EPENDING) {
783
 
            return status;
784
 
        }
785
 
    }
786
 
 
787
 
    return PJ_SUCCESS;
788
 
}
789
 
 
790
 
 
791
 
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
792
 
                                       pj_ioqueue_op_key_t *op_key,
793
 
                                       pj_sock_t new_sock,
794
 
                                       pj_status_t status)
795
 
{
796
 
    pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
797
 
    struct accept_op *accept_op = (struct accept_op*) op_key;
798
 
 
799
 
    PJ_UNUSED_ARG(new_sock);
800
 
 
801
 
    do {
802
 
        if (status == asock->last_err && status != PJ_SUCCESS) {
803
 
            asock->err_counter++;
804
 
            if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
805
 
                PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
806
 
                               " operation, stopping further ioqueue accepts.",
807
 
                               asock->err_counter, asock->last_err));
808
 
                return;
809
 
            }
810
 
        } else {
811
 
            asock->err_counter = 0;
812
 
            asock->last_err = status;
813
 
        }
814
 
 
815
 
        if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
816
 
            pj_bool_t ret;
817
 
 
818
 
            /* Notify callback */
819
 
            ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
820
 
                                                  &accept_op->rem_addr,
821
 
                                                  accept_op->rem_addr_len);
822
 
 
823
 
            /* If callback returns false, we have been destroyed! */
824
 
            if (!ret)
825
 
                return;
826
 
 
827
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
828
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
829
 
            activesock_create_iphone_os_stream(asock);
830
 
#endif
831
 
        } else if (status==PJ_SUCCESS) {
832
 
            /* Application doesn't handle the new socket, we need to
833
 
             * close it to avoid resource leak.
834
 
             */
835
 
            pj_sock_close(accept_op->new_sock);
836
 
        }
837
 
 
838
 
        /* Prepare next accept() */
839
 
        accept_op->new_sock = PJ_INVALID_SOCKET;
840
 
        accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
841
 
 
842
 
        status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
843
 
                                   NULL, &accept_op->rem_addr,
844
 
                                   &accept_op->rem_addr_len);
845
 
 
846
 
    } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
847
 
}
848
 
 
849
 
 
850
 
PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
851
 
                                                 pj_pool_t *pool,
852
 
                                                 const pj_sockaddr_t *remaddr,
853
 
                                                 int addr_len)
854
 
{
855
 
    PJ_UNUSED_ARG(pool);
856
 
    return pj_ioqueue_connect(asock->key, remaddr, addr_len);
857
 
}
858
 
 
859
 
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
860
 
                                        pj_status_t status)
861
 
{
862
 
    pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
863
 
 
864
 
    if (asock->cb.on_connect_complete) {
865
 
        pj_bool_t ret;
866
 
 
867
 
        ret = (*asock->cb.on_connect_complete)(asock, status);
868
 
 
869
 
        if (!ret) {
870
 
            /* We've been destroyed */
871
 
            return;
872
 
        }
873
 
 
874
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
875
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
876
 
        activesock_create_iphone_os_stream(asock);
877
 
#endif
878
 
 
879
 
    }
880
 
}
881
 
#endif  /* PJ_HAS_TCP */