~ubuntu-branches/ubuntu/wily/sflphone/wily

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.1.0/pjlib/src/pj/activesock.c

  • Committer: Package Import Robot
  • Author(s): Francois Marier, Francois Marier, Mark Purcell
  • Date: 2014-10-18 15:08:50 UTC
  • mfrom: (1.1.12)
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20141018150850-2exfk34ckb15pcwi
Tags: 1.4.1-0.1
[ Francois Marier ]
* Non-maintainer upload
* New upstream release (closes: #759576, #741130)
  - debian/rules +PJPROJECT_VERSION := 2.2.1
  - add upstream patch to fix broken TLS support
  - add patch to fix pjproject regression

[ Mark Purcell ]
* Build-Depends:
  - sflphone-daemon + libavformat-dev, libavcodec-dev, libswscale-dev,
  libavdevice-dev, libavutil-dev
  - sflphone-gnome + libclutter-gtk-1.0-dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* $Id: activesock.c 4359 2013-02-21 11:18:36Z bennylp $ */
2
 
/* 
3
 
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
 
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5
 
 *
6
 
 * This program is free software; you can redistribute it and/or modify
7
 
 * it under the terms of the GNU General Public License as published by
8
 
 * the Free Software Foundation; either version 2 of the License, or
9
 
 * (at your option) any later version.
10
 
 *
11
 
 * This program is distributed in the hope that it will be useful,
12
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 * GNU General Public License for more details.
15
 
 *
16
 
 * You should have received a copy of the GNU General Public License
17
 
 * along with this program; if not, write to the Free Software
18
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
19
 
 */
20
 
#include <pj/activesock.h>
21
 
#include <pj/compat/socket.h>
22
 
#include <pj/assert.h>
23
 
#include <pj/errno.h>
24
 
#include <pj/log.h>
25
 
#include <pj/pool.h>
26
 
#include <pj/sock.h>
27
 
#include <pj/string.h>
28
 
 
29
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31
 
#   include <CFNetwork/CFNetwork.h>
32
 
 
33
 
    static pj_bool_t ios_bg_support = PJ_TRUE;
34
 
#endif
35
 
 
36
 
#define PJ_ACTIVESOCK_MAX_LOOP      50
37
 
 
38
 
 
39
 
enum read_type
40
 
{
41
 
    TYPE_NONE,
42
 
    TYPE_RECV,
43
 
    TYPE_RECV_FROM
44
 
};
45
 
 
46
 
enum shutdown_dir
47
 
{
48
 
    SHUT_NONE = 0,
49
 
    SHUT_RX = 1,
50
 
    SHUT_TX = 2
51
 
};
52
 
 
53
 
struct read_op
54
 
{
55
 
    pj_ioqueue_op_key_t  op_key;
56
 
    pj_uint8_t          *pkt;
57
 
    unsigned             max_size;
58
 
    pj_size_t            size;
59
 
    pj_sockaddr          src_addr;
60
 
    int                  src_addr_len;
61
 
};
62
 
 
63
 
struct accept_op
64
 
{
65
 
    pj_ioqueue_op_key_t  op_key;
66
 
    pj_sock_t            new_sock;
67
 
    pj_sockaddr          rem_addr;
68
 
    int                  rem_addr_len;
69
 
};
70
 
 
71
 
struct send_data
72
 
{
73
 
    pj_uint8_t          *data;
74
 
    pj_ssize_t           len;
75
 
    pj_ssize_t           sent;
76
 
    unsigned             flags;
77
 
};
78
 
 
79
 
struct pj_activesock_t
80
 
{
81
 
    pj_ioqueue_key_t    *key;
82
 
    pj_bool_t            stream_oriented;
83
 
    pj_bool_t            whole_data;
84
 
    pj_ioqueue_t        *ioqueue;
85
 
    void                *user_data;
86
 
    unsigned             async_count;
87
 
    unsigned             shutdown;
88
 
    unsigned             max_loop;
89
 
    pj_activesock_cb     cb;
90
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
91
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
92
 
    int                  bg_setting;
93
 
    pj_sock_t            sock;
94
 
    CFReadStreamRef      readStream;
95
 
#endif
96
 
    
97
 
    unsigned             err_counter;
98
 
    pj_status_t          last_err;
99
 
 
100
 
    struct send_data     send_data;
101
 
 
102
 
    struct read_op      *read_op;
103
 
    pj_uint32_t          read_flags;
104
 
    enum read_type       read_type;
105
 
 
106
 
    struct accept_op    *accept_op;
107
 
};
108
 
 
109
 
 
110
 
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, 
111
 
                                     pj_ioqueue_op_key_t *op_key, 
112
 
                                     pj_ssize_t bytes_read);
113
 
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, 
114
 
                                      pj_ioqueue_op_key_t *op_key,
115
 
                                      pj_ssize_t bytes_sent);
116
 
#if PJ_HAS_TCP
117
 
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, 
118
 
                                       pj_ioqueue_op_key_t *op_key,
119
 
                                       pj_sock_t sock, 
120
 
                                       pj_status_t status);
121
 
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, 
122
 
                                        pj_status_t status);
123
 
#endif
124
 
 
125
 
PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
126
 
{
127
 
    pj_bzero(cfg, sizeof(*cfg));
128
 
    cfg->async_cnt = 1;
129
 
    cfg->concurrency = -1;
130
 
    cfg->whole_data = PJ_TRUE;
131
 
}
132
 
 
133
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
134
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
135
 
static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
136
 
{
137
 
    if (asock->readStream) {
138
 
        CFReadStreamClose(asock->readStream);
139
 
        CFRelease(asock->readStream);
140
 
        asock->readStream = NULL;
141
 
    }
142
 
}
143
 
 
144
 
static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
145
 
{
146
 
    if (ios_bg_support && asock->bg_setting && asock->stream_oriented) {
147
 
        activesock_destroy_iphone_os_stream(asock);
148
 
 
149
 
        CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
150
 
                                     &asock->readStream, NULL);
151
 
 
152
 
        if (!asock->readStream ||
153
 
            CFReadStreamSetProperty(asock->readStream,
154
 
                                    kCFStreamNetworkServiceType,
155
 
                                    kCFStreamNetworkServiceTypeVoIP)
156
 
            != TRUE ||
157
 
            CFReadStreamOpen(asock->readStream) != TRUE)
158
 
        {
159
 
            PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
160
 
                      "usage. Background mode will not be supported."));
161
 
            
162
 
            activesock_destroy_iphone_os_stream(asock);
163
 
        }
164
 
    }
165
 
}
166
 
 
167
 
 
168
 
PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
169
 
                                            int val)
170
 
{
171
 
    asock->bg_setting = val;
172
 
    if (asock->bg_setting)
173
 
        activesock_create_iphone_os_stream(asock);
174
 
    else
175
 
        activesock_destroy_iphone_os_stream(asock);
176
 
}
177
 
 
178
 
PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
179
 
{
180
 
    ios_bg_support = val;
181
 
}
182
 
#endif
183
 
 
184
 
PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
185
 
                                          pj_sock_t sock,
186
 
                                          int sock_type,
187
 
                                          const pj_activesock_cfg *opt,
188
 
                                          pj_ioqueue_t *ioqueue,
189
 
                                          const pj_activesock_cb *cb,
190
 
                                          void *user_data,
191
 
                                          pj_activesock_t **p_asock)
192
 
{
193
 
    pj_activesock_t *asock;
194
 
    pj_ioqueue_callback ioq_cb;
195
 
    pj_status_t status;
196
 
 
197
 
    PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
198
 
    PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
199
 
    PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
200
 
                     sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
201
 
    PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
202
 
 
203
 
    asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
204
 
    asock->ioqueue = ioqueue;
205
 
    asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
206
 
    asock->async_count = (opt? opt->async_cnt : 1);
207
 
    asock->whole_data = (opt? opt->whole_data : 1);
208
 
    asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
209
 
    asock->user_data = user_data;
210
 
    pj_memcpy(&asock->cb, cb, sizeof(*cb));
211
 
 
212
 
    pj_bzero(&ioq_cb, sizeof(ioq_cb));
213
 
    ioq_cb.on_read_complete = &ioqueue_on_read_complete;
214
 
    ioq_cb.on_write_complete = &ioqueue_on_write_complete;
215
 
#if PJ_HAS_TCP
216
 
    ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
217
 
    ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
218
 
#endif
219
 
 
220
 
    status = pj_ioqueue_register_sock2(pool, ioqueue, sock,
221
 
                                       (opt? opt->grp_lock : NULL),
222
 
                                       asock, &ioq_cb, &asock->key);
223
 
    if (status != PJ_SUCCESS) {
224
 
        pj_activesock_close(asock);
225
 
        return status;
226
 
    }
227
 
    
228
 
    if (asock->whole_data) {
229
 
        /* Must disable concurrency otherwise there is a race condition */
230
 
        pj_ioqueue_set_concurrency(asock->key, 0);
231
 
    } else if (opt && opt->concurrency >= 0) {
232
 
        pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
233
 
    }
234
 
 
235
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
236
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
237
 
    asock->sock = sock;
238
 
    asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
239
 
#endif
240
 
 
241
 
    *p_asock = asock;
242
 
    return PJ_SUCCESS;
243
 
}
244
 
 
245
 
 
246
 
PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
247
 
                                              const pj_sockaddr *addr,
248
 
                                              const pj_activesock_cfg *opt,
249
 
                                              pj_ioqueue_t *ioqueue,
250
 
                                              const pj_activesock_cb *cb,
251
 
                                              void *user_data,
252
 
                                              pj_activesock_t **p_asock,
253
 
                                              pj_sockaddr *bound_addr)
254
 
{
255
 
    pj_sock_t sock_fd;
256
 
    pj_sockaddr default_addr;
257
 
    pj_status_t status;
258
 
 
259
 
    if (addr == NULL) {
260
 
        pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
261
 
        addr = &default_addr;
262
 
    }
263
 
 
264
 
    status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0, 
265
 
                            &sock_fd);
266
 
    if (status != PJ_SUCCESS) {
267
 
        return status;
268
 
    }
269
 
 
270
 
    status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
271
 
    if (status != PJ_SUCCESS) {
272
 
        pj_sock_close(sock_fd);
273
 
        return status;
274
 
    }
275
 
 
276
 
    status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
277
 
                                  ioqueue, cb, user_data, p_asock);
278
 
    if (status != PJ_SUCCESS) {
279
 
        pj_sock_close(sock_fd);
280
 
        return status;
281
 
    }
282
 
 
283
 
    if (bound_addr) {
284
 
        int addr_len = sizeof(*bound_addr);
285
 
        status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
286
 
        if (status != PJ_SUCCESS) {
287
 
            pj_activesock_close(*p_asock);
288
 
            return status;
289
 
        }
290
 
    }
291
 
 
292
 
    return PJ_SUCCESS;
293
 
}
294
 
 
295
 
PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
296
 
{
297
 
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
298
 
    asock->shutdown = SHUT_RX | SHUT_TX;
299
 
    if (asock->key) {
300
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
301
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
302
 
        activesock_destroy_iphone_os_stream(asock);
303
 
#endif  
304
 
        
305
 
        pj_ioqueue_unregister(asock->key);
306
 
        asock->key = NULL;
307
 
    }
308
 
    return PJ_SUCCESS;
309
 
}
310
 
 
311
 
 
312
 
PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
313
 
                                                 void *user_data)
314
 
{
315
 
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
316
 
    asock->user_data = user_data;
317
 
    return PJ_SUCCESS;
318
 
}
319
 
 
320
 
 
321
 
PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
322
 
{
323
 
    PJ_ASSERT_RETURN(asock, NULL);
324
 
    return asock->user_data;
325
 
}
326
 
 
327
 
 
328
 
PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
329
 
                                             pj_pool_t *pool,
330
 
                                             unsigned buff_size,
331
 
                                             pj_uint32_t flags)
332
 
{
333
 
    void **readbuf;
334
 
    unsigned i;
335
 
 
336
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
337
 
 
338
 
    readbuf = (void**) pj_pool_calloc(pool, asock->async_count, 
339
 
                                      sizeof(void*));
340
 
 
341
 
    for (i=0; i<asock->async_count; ++i) {
342
 
        readbuf[i] = pj_pool_alloc(pool, buff_size);
343
 
    }
344
 
 
345
 
    return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
346
 
}
347
 
 
348
 
 
349
 
PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
350
 
                                               pj_pool_t *pool,
351
 
                                               unsigned buff_size,
352
 
                                               void *readbuf[],
353
 
                                               pj_uint32_t flags)
354
 
{
355
 
    unsigned i;
356
 
    pj_status_t status;
357
 
 
358
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
359
 
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
360
 
    PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
361
 
 
362
 
    asock->read_op = (struct read_op*)
363
 
                     pj_pool_calloc(pool, asock->async_count, 
364
 
                                    sizeof(struct read_op));
365
 
    asock->read_type = TYPE_RECV;
366
 
    asock->read_flags = flags;
367
 
 
368
 
    for (i=0; i<asock->async_count; ++i) {
369
 
        struct read_op *r = &asock->read_op[i];
370
 
        pj_ssize_t size_to_read;
371
 
 
372
 
        r->pkt = (pj_uint8_t*)readbuf[i];
373
 
        r->max_size = size_to_read = buff_size;
374
 
 
375
 
        status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
376
 
                                 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
377
 
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
378
 
 
379
 
        if (status != PJ_EPENDING)
380
 
            return status;
381
 
    }
382
 
 
383
 
    return PJ_SUCCESS;
384
 
}
385
 
 
386
 
 
387
 
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
388
 
                                                 pj_pool_t *pool,
389
 
                                                 unsigned buff_size,
390
 
                                                 pj_uint32_t flags)
391
 
{
392
 
    void **readbuf;
393
 
    unsigned i;
394
 
 
395
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
396
 
 
397
 
    readbuf = (void**) pj_pool_calloc(pool, asock->async_count, 
398
 
                                      sizeof(void*));
399
 
 
400
 
    for (i=0; i<asock->async_count; ++i) {
401
 
        readbuf[i] = pj_pool_alloc(pool, buff_size);
402
 
    }
403
 
 
404
 
    return pj_activesock_start_recvfrom2(asock, pool, buff_size, 
405
 
                                         readbuf, flags);
406
 
}
407
 
 
408
 
 
409
 
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
410
 
                                                   pj_pool_t *pool,
411
 
                                                   unsigned buff_size,
412
 
                                                   void *readbuf[],
413
 
                                                   pj_uint32_t flags)
414
 
{
415
 
    unsigned i;
416
 
    pj_status_t status;
417
 
 
418
 
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
419
 
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
420
 
 
421
 
    asock->read_op = (struct read_op*)
422
 
                     pj_pool_calloc(pool, asock->async_count, 
423
 
                                    sizeof(struct read_op));
424
 
    asock->read_type = TYPE_RECV_FROM;
425
 
    asock->read_flags = flags;
426
 
 
427
 
    for (i=0; i<asock->async_count; ++i) {
428
 
        struct read_op *r = &asock->read_op[i];
429
 
        pj_ssize_t size_to_read;
430
 
 
431
 
        r->pkt = (pj_uint8_t*) readbuf[i];
432
 
        r->max_size = size_to_read = buff_size;
433
 
        r->src_addr_len = sizeof(r->src_addr);
434
 
 
435
 
        status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
436
 
                                     &size_to_read, 
437
 
                                     PJ_IOQUEUE_ALWAYS_ASYNC | flags,
438
 
                                     &r->src_addr, &r->src_addr_len);
439
 
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
440
 
 
441
 
        if (status != PJ_EPENDING)
442
 
            return status;
443
 
    }
444
 
 
445
 
    return PJ_SUCCESS;
446
 
}
447
 
 
448
 
 
449
 
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, 
450
 
                                     pj_ioqueue_op_key_t *op_key, 
451
 
                                     pj_ssize_t bytes_read)
452
 
{
453
 
    pj_activesock_t *asock;
454
 
    struct read_op *r = (struct read_op*)op_key;
455
 
    unsigned loop = 0;
456
 
    pj_status_t status;
457
 
 
458
 
    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
459
 
 
460
 
    /* Ignore if we've been shutdown */
461
 
    if (asock->shutdown & SHUT_RX)
462
 
        return;
463
 
 
464
 
    do {
465
 
        unsigned flags;
466
 
 
467
 
        if (bytes_read > 0) {
468
 
            /*
469
 
             * We've got new data.
470
 
             */
471
 
            pj_size_t remainder;
472
 
            pj_bool_t ret;
473
 
 
474
 
            /* Append this new data to existing data. If socket is stream 
475
 
             * oriented, user might have left some data in the buffer. 
476
 
             * Otherwise if socket is datagram there will be nothing in 
477
 
             * existing packet hence the packet will contain only the new
478
 
             * packet.
479
 
             */
480
 
            r->size += bytes_read;
481
 
 
482
 
            /* Set default remainder to zero */
483
 
            remainder = 0;
484
 
 
485
 
            /* And return value to TRUE */
486
 
            ret = PJ_TRUE;
487
 
 
488
 
            /* Notify callback */
489
 
            if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
490
 
                ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
491
 
                                                PJ_SUCCESS, &remainder);
492
 
            } else if (asock->read_type == TYPE_RECV_FROM && 
493
 
                       asock->cb.on_data_recvfrom) 
494
 
            {
495
 
                ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
496
 
                                                    &r->src_addr, 
497
 
                                                    r->src_addr_len,
498
 
                                                    PJ_SUCCESS);
499
 
            }
500
 
 
501
 
            /* If callback returns false, we have been destroyed! */
502
 
            if (!ret)
503
 
                return;
504
 
 
505
 
            /* Only stream oriented socket may leave data in the packet */
506
 
            if (asock->stream_oriented) {
507
 
                r->size = remainder;
508
 
            } else {
509
 
                r->size = 0;
510
 
            }
511
 
 
512
 
        } else if (bytes_read <= 0 &&
513
 
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
514
 
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 
515
 
                   (asock->stream_oriented ||
516
 
                    -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))) 
517
 
        {
518
 
            pj_size_t remainder;
519
 
            pj_bool_t ret;
520
 
 
521
 
            if (bytes_read == 0) {
522
 
                /* For stream/connection oriented socket, this means the 
523
 
                 * connection has been closed. For datagram sockets, it means
524
 
                 * we've received datagram with zero length.
525
 
                 */
526
 
                if (asock->stream_oriented)
527
 
                    status = PJ_EEOF;
528
 
                else
529
 
                    status = PJ_SUCCESS;
530
 
            } else {
531
 
                /* This means we've got an error. If this is stream/connection
532
 
                 * oriented, it means connection has been closed. For datagram
533
 
                 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
534
 
                 */
535
 
                status = -bytes_read;
536
 
            }
537
 
 
538
 
            /* Set default remainder to zero */
539
 
            remainder = 0;
540
 
 
541
 
            /* And return value to TRUE */
542
 
            ret = PJ_TRUE;
543
 
 
544
 
            /* Notify callback */
545
 
            if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
546
 
                /* For connection oriented socket, we still need to report 
547
 
                 * the remainder data (if any) to the user to let user do 
548
 
                 * processing with the remainder data before it closes the
549
 
                 * connection.
550
 
                 * If there is no remainder data, set the packet to NULL.
551
 
                 */
552
 
 
553
 
                /* Shouldn't set the packet to NULL, as there may be active 
554
 
                 * socket user, such as SSL socket, that needs to have access
555
 
                 * to the read buffer packet.
556
 
                 */
557
 
                //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
558
 
                //                              r->size, status, &remainder);
559
 
                ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
560
 
                                                status, &remainder);
561
 
 
562
 
            } else if (asock->read_type == TYPE_RECV_FROM && 
563
 
                       asock->cb.on_data_recvfrom) 
564
 
            {
565
 
                /* This would always be datagram oriented hence there's 
566
 
                 * nothing in the packet. We can't be sure if there will be
567
 
                 * anything useful in the source_addr, so just put NULL
568
 
                 * there too.
569
 
                 */
570
 
                /* In some scenarios, status may be PJ_SUCCESS. The upper 
571
 
                 * layer application may not expect the callback to be called
572
 
                 * with successful status and NULL data, so lets not call the
573
 
                 * callback if the status is PJ_SUCCESS.
574
 
                 */
575
 
                if (status != PJ_SUCCESS ) {
576
 
                    ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
577
 
                                                        NULL, 0, status);
578
 
                }
579
 
            }
580
 
 
581
 
            /* If callback returns false, we have been destroyed! */
582
 
            if (!ret)
583
 
                return;
584
 
 
585
 
            /* Also stop further read if we've been shutdown */
586
 
            if (asock->shutdown & SHUT_RX)
587
 
                return;
588
 
 
589
 
            /* Only stream oriented socket may leave data in the packet */
590
 
            if (asock->stream_oriented) {
591
 
                r->size = remainder;
592
 
            } else {
593
 
                r->size = 0;
594
 
            }
595
 
        }
596
 
 
597
 
        /* Read next data. We limit ourselves to processing max_loop immediate
598
 
         * data, so when the loop counter has exceeded this value, force the
599
 
         * read()/recvfrom() to return pending operation to allow the program
600
 
         * to do other jobs.
601
 
         */
602
 
        bytes_read = r->max_size - r->size;
603
 
        flags = asock->read_flags;
604
 
        if (++loop >= asock->max_loop)
605
 
            flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
606
 
 
607
 
        if (asock->read_type == TYPE_RECV) {
608
 
            status = pj_ioqueue_recv(key, op_key, r->pkt + r->size, 
609
 
                                     &bytes_read, flags);
610
 
        } else {
611
 
            r->src_addr_len = sizeof(r->src_addr);
612
 
            status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
613
 
                                         &bytes_read, flags,
614
 
                                         &r->src_addr, &r->src_addr_len);
615
 
        }
616
 
 
617
 
        if (status == PJ_SUCCESS) {
618
 
            /* Immediate data */
619
 
            ;
620
 
        } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
621
 
            /* Error */
622
 
            bytes_read = -status;
623
 
        } else {
624
 
            break;
625
 
        }
626
 
    } while (1);
627
 
 
628
 
}
629
 
 
630
 
 
631
 
static pj_status_t send_remaining(pj_activesock_t *asock, 
632
 
                                  pj_ioqueue_op_key_t *send_key)
633
 
{
634
 
    struct send_data *sd = (struct send_data*)send_key->activesock_data;
635
 
    pj_status_t status;
636
 
 
637
 
    do {
638
 
        pj_ssize_t size;
639
 
 
640
 
        size = sd->len - sd->sent;
641
 
        status = pj_ioqueue_send(asock->key, send_key, 
642
 
                                 sd->data+sd->sent, &size, sd->flags);
643
 
        if (status != PJ_SUCCESS) {
644
 
            /* Pending or error */
645
 
            break;
646
 
        }
647
 
 
648
 
        sd->sent += size;
649
 
        if (sd->sent == sd->len) {
650
 
            /* The whole data has been sent. */
651
 
            return PJ_SUCCESS;
652
 
        }
653
 
 
654
 
    } while (sd->sent < sd->len);
655
 
 
656
 
    return status;
657
 
}
658
 
 
659
 
 
660
 
PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
661
 
                                        pj_ioqueue_op_key_t *send_key,
662
 
                                        const void *data,
663
 
                                        pj_ssize_t *size,
664
 
                                        unsigned flags)
665
 
{
666
 
    PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
667
 
 
668
 
    if (asock->shutdown & SHUT_TX)
669
 
        return PJ_EINVALIDOP;
670
 
 
671
 
    send_key->activesock_data = NULL;
672
 
 
673
 
    if (asock->whole_data) {
674
 
        pj_ssize_t whole;
675
 
        pj_status_t status;
676
 
 
677
 
        whole = *size;
678
 
 
679
 
        status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
680
 
        if (status != PJ_SUCCESS) {
681
 
            /* Pending or error */
682
 
            return status;
683
 
        }
684
 
 
685
 
        if (*size == whole) {
686
 
            /* The whole data has been sent. */
687
 
            return PJ_SUCCESS;
688
 
        }
689
 
 
690
 
        /* Data was partially sent */
691
 
        asock->send_data.data = (pj_uint8_t*)data;
692
 
        asock->send_data.len = whole;
693
 
        asock->send_data.sent = *size;
694
 
        asock->send_data.flags = flags;
695
 
        send_key->activesock_data = &asock->send_data;
696
 
 
697
 
        /* Try again */
698
 
        status = send_remaining(asock, send_key);
699
 
        if (status == PJ_SUCCESS) {
700
 
            *size = whole;
701
 
        }
702
 
        return status;
703
 
 
704
 
    } else {
705
 
        return pj_ioqueue_send(asock->key, send_key, data, size, flags);
706
 
    }
707
 
}
708
 
 
709
 
 
710
 
PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
711
 
                                          pj_ioqueue_op_key_t *send_key,
712
 
                                          const void *data,
713
 
                                          pj_ssize_t *size,
714
 
                                          unsigned flags,
715
 
                                          const pj_sockaddr_t *addr,
716
 
                                          int addr_len)
717
 
{
718
 
    PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, 
719
 
                     PJ_EINVAL);
720
 
 
721
 
    if (asock->shutdown & SHUT_TX)
722
 
        return PJ_EINVALIDOP;
723
 
 
724
 
    return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
725
 
                             addr, addr_len);
726
 
}
727
 
 
728
 
 
729
 
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, 
730
 
                                      pj_ioqueue_op_key_t *op_key,
731
 
                                      pj_ssize_t bytes_sent)
732
 
{
733
 
    pj_activesock_t *asock;
734
 
 
735
 
    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
736
 
 
737
 
    /* Ignore if we've been shutdown. This may cause data to be partially
738
 
     * sent even when 'wholedata' was requested if the OS only sent partial
739
 
     * buffer.
740
 
     */
741
 
    if (asock->shutdown & SHUT_TX)
742
 
        return;
743
 
 
744
 
    if (bytes_sent > 0 && op_key->activesock_data) {
745
 
        /* whole_data is requested. Make sure we send all the data */
746
 
        struct send_data *sd = (struct send_data*)op_key->activesock_data;
747
 
 
748
 
        sd->sent += bytes_sent;
749
 
        if (sd->sent == sd->len) {
750
 
            /* all has been sent */
751
 
            bytes_sent = sd->sent;
752
 
            op_key->activesock_data = NULL;
753
 
        } else {
754
 
            /* send remaining data */
755
 
            pj_status_t status;
756
 
 
757
 
            status = send_remaining(asock, op_key);
758
 
            if (status == PJ_EPENDING)
759
 
                return;
760
 
            else if (status == PJ_SUCCESS)
761
 
                bytes_sent = sd->sent;
762
 
            else
763
 
                bytes_sent = -status;
764
 
 
765
 
            op_key->activesock_data = NULL;
766
 
        }
767
 
    } 
768
 
 
769
 
    if (asock->cb.on_data_sent) {
770
 
        pj_bool_t ret;
771
 
 
772
 
        ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
773
 
 
774
 
        /* If callback returns false, we have been destroyed! */
775
 
        if (!ret)
776
 
            return;
777
 
    }
778
 
}
779
 
 
780
 
#if PJ_HAS_TCP
781
 
PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
782
 
                                               pj_pool_t *pool)
783
 
{
784
 
    unsigned i;
785
 
 
786
 
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
787
 
    PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
788
 
 
789
 
    /* Ignore if we've been shutdown */
790
 
    if (asock->shutdown)
791
 
        return PJ_EINVALIDOP;
792
 
 
793
 
    asock->accept_op = (struct accept_op*)
794
 
                       pj_pool_calloc(pool, asock->async_count,
795
 
                                      sizeof(struct accept_op));
796
 
    for (i=0; i<asock->async_count; ++i) {
797
 
        struct accept_op *a = &asock->accept_op[i];
798
 
        pj_status_t status;
799
 
 
800
 
        do {
801
 
            a->new_sock = PJ_INVALID_SOCKET;
802
 
            a->rem_addr_len = sizeof(a->rem_addr);
803
 
 
804
 
            status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
805
 
                                       NULL, &a->rem_addr, &a->rem_addr_len);
806
 
            if (status == PJ_SUCCESS) {
807
 
                /* We've got immediate connection. Not sure if it's a good
808
 
                 * idea to call the callback now (probably application will
809
 
                 * not be prepared to process it), so lets just silently
810
 
                 * close the socket.
811
 
                 */
812
 
                pj_sock_close(a->new_sock);
813
 
            }
814
 
        } while (status == PJ_SUCCESS);
815
 
 
816
 
        if (status != PJ_EPENDING) {
817
 
            return status;
818
 
        }
819
 
    }
820
 
 
821
 
    return PJ_SUCCESS;
822
 
}
823
 
 
824
 
 
825
 
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, 
826
 
                                       pj_ioqueue_op_key_t *op_key,
827
 
                                       pj_sock_t new_sock, 
828
 
                                       pj_status_t status)
829
 
{
830
 
    pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
831
 
    struct accept_op *accept_op = (struct accept_op*) op_key;
832
 
 
833
 
    PJ_UNUSED_ARG(new_sock);
834
 
 
835
 
    /* Ignore if we've been shutdown */
836
 
    if (asock->shutdown)
837
 
        return;
838
 
 
839
 
    do {
840
 
        if (status == asock->last_err && status != PJ_SUCCESS) {
841
 
            asock->err_counter++;
842
 
            if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
843
 
                PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
844
 
                               " operation, stopping further ioqueue accepts.",
845
 
                               asock->err_counter, asock->last_err));
846
 
                return;
847
 
            }
848
 
        } else {
849
 
            asock->err_counter = 0;
850
 
            asock->last_err = status;
851
 
        }
852
 
 
853
 
        if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
854
 
            pj_bool_t ret;
855
 
 
856
 
            /* Notify callback */
857
 
            ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
858
 
                                                  &accept_op->rem_addr,
859
 
                                                  accept_op->rem_addr_len);
860
 
 
861
 
            /* If callback returns false, we have been destroyed! */
862
 
            if (!ret)
863
 
                return;
864
 
 
865
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
866
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
867
 
            activesock_create_iphone_os_stream(asock);
868
 
#endif
869
 
        } else if (status==PJ_SUCCESS) {
870
 
            /* Application doesn't handle the new socket, we need to 
871
 
             * close it to avoid resource leak.
872
 
             */
873
 
            pj_sock_close(accept_op->new_sock);
874
 
        }
875
 
 
876
 
        /* Don't start another accept() if we've been shutdown */
877
 
        if (asock->shutdown)
878
 
            return;
879
 
 
880
 
        /* Prepare next accept() */
881
 
        accept_op->new_sock = PJ_INVALID_SOCKET;
882
 
        accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
883
 
 
884
 
        status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
885
 
                                   NULL, &accept_op->rem_addr, 
886
 
                                   &accept_op->rem_addr_len);
887
 
 
888
 
    } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
889
 
}
890
 
 
891
 
 
892
 
PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
893
 
                                                 pj_pool_t *pool,
894
 
                                                 const pj_sockaddr_t *remaddr,
895
 
                                                 int addr_len)
896
 
{
897
 
    PJ_UNUSED_ARG(pool);
898
 
 
899
 
    if (asock->shutdown)
900
 
        return PJ_EINVALIDOP;
901
 
 
902
 
    return pj_ioqueue_connect(asock->key, remaddr, addr_len);
903
 
}
904
 
 
905
 
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, 
906
 
                                        pj_status_t status)
907
 
{
908
 
    pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
909
 
 
910
 
    /* Ignore if we've been shutdown */
911
 
    if (asock->shutdown)
912
 
        return;
913
 
 
914
 
    if (asock->cb.on_connect_complete) {
915
 
        pj_bool_t ret;
916
 
 
917
 
        ret = (*asock->cb.on_connect_complete)(asock, status);
918
 
 
919
 
        if (!ret) {
920
 
            /* We've been destroyed */
921
 
            return;
922
 
        }
923
 
        
924
 
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
925
 
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
926
 
        activesock_create_iphone_os_stream(asock);
927
 
#endif
928
 
        
929
 
    }
930
 
}
931
 
#endif  /* PJ_HAS_TCP */
932