1
/* $Id: activesock.c 3553 2011-05-05 06:14:19Z nanang $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
#include <pj/activesock.h>
21
#include <pj/compat/socket.h>
22
#include <pj/assert.h>
27
#include <pj/string.h>
29
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31
# include <CFNetwork/CFNetwork.h>
33
static pj_bool_t ios_bg_support = PJ_TRUE;
36
#define PJ_ACTIVESOCK_MAX_LOOP 50
48
pj_ioqueue_op_key_t op_key;
58
pj_ioqueue_op_key_t op_key;
72
struct pj_activesock_t
74
pj_ioqueue_key_t *key;
75
pj_bool_t stream_oriented;
77
pj_ioqueue_t *ioqueue;
82
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
83
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
86
CFReadStreamRef readStream;
92
struct send_data send_data;
94
struct read_op *read_op;
95
pj_uint32_t read_flags;
96
enum read_type read_type;
98
struct accept_op *accept_op;
102
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
103
pj_ioqueue_op_key_t *op_key,
104
pj_ssize_t bytes_read);
105
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
106
pj_ioqueue_op_key_t *op_key,
107
pj_ssize_t bytes_sent);
109
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
110
pj_ioqueue_op_key_t *op_key,
113
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
117
PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
119
pj_bzero(cfg, sizeof(*cfg));
121
cfg->concurrency = -1;
122
cfg->whole_data = PJ_TRUE;
125
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
126
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
127
static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
129
if (asock->readStream) {
130
CFReadStreamClose(asock->readStream);
131
CFRelease(asock->readStream);
132
asock->readStream = NULL;
136
static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
138
if (ios_bg_support && asock->bg_setting && asock->stream_oriented) {
139
activesock_destroy_iphone_os_stream(asock);
141
CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
142
&asock->readStream, NULL);
144
if (!asock->readStream ||
145
CFReadStreamSetProperty(asock->readStream,
146
kCFStreamNetworkServiceType,
147
kCFStreamNetworkServiceTypeVoIP)
149
CFReadStreamOpen(asock->readStream) != TRUE)
151
PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
152
"usage. Background mode will not be supported."));
154
activesock_destroy_iphone_os_stream(asock);
160
PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
163
asock->bg_setting = val;
164
if (asock->bg_setting)
165
activesock_create_iphone_os_stream(asock);
167
activesock_destroy_iphone_os_stream(asock);
170
PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
172
ios_bg_support = val;
176
PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
179
const pj_activesock_cfg *opt,
180
pj_ioqueue_t *ioqueue,
181
const pj_activesock_cb *cb,
183
pj_activesock_t **p_asock)
185
pj_activesock_t *asock;
186
pj_ioqueue_callback ioq_cb;
189
PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
190
PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
191
PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
192
sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
193
PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
195
asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
196
asock->ioqueue = ioqueue;
197
asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
198
asock->async_count = (opt? opt->async_cnt : 1);
199
asock->whole_data = (opt? opt->whole_data : 1);
200
asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
201
asock->user_data = user_data;
202
pj_memcpy(&asock->cb, cb, sizeof(*cb));
204
pj_bzero(&ioq_cb, sizeof(ioq_cb));
205
ioq_cb.on_read_complete = &ioqueue_on_read_complete;
206
ioq_cb.on_write_complete = &ioqueue_on_write_complete;
208
ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
209
ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
212
status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
213
&ioq_cb, &asock->key);
214
if (status != PJ_SUCCESS) {
215
pj_activesock_close(asock);
219
if (asock->whole_data) {
220
/* Must disable concurrency otherwise there is a race condition */
221
pj_ioqueue_set_concurrency(asock->key, 0);
222
} else if (opt && opt->concurrency >= 0) {
223
pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
226
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
227
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
229
asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
237
PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
238
const pj_sockaddr *addr,
239
const pj_activesock_cfg *opt,
240
pj_ioqueue_t *ioqueue,
241
const pj_activesock_cb *cb,
243
pj_activesock_t **p_asock,
244
pj_sockaddr *bound_addr)
247
pj_sockaddr default_addr;
251
pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
252
addr = &default_addr;
255
status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
257
if (status != PJ_SUCCESS) {
261
status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
262
if (status != PJ_SUCCESS) {
263
pj_sock_close(sock_fd);
267
status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
268
ioqueue, cb, user_data, p_asock);
269
if (status != PJ_SUCCESS) {
270
pj_sock_close(sock_fd);
275
int addr_len = sizeof(*bound_addr);
276
status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
277
if (status != PJ_SUCCESS) {
278
pj_activesock_close(*p_asock);
287
PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
289
PJ_ASSERT_RETURN(asock, PJ_EINVAL);
291
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
292
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
293
activesock_destroy_iphone_os_stream(asock);
296
pj_ioqueue_unregister(asock->key);
303
PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
306
PJ_ASSERT_RETURN(asock, PJ_EINVAL);
307
asock->user_data = user_data;
312
PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
314
PJ_ASSERT_RETURN(asock, NULL);
315
return asock->user_data;
319
PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
327
PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
329
readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
332
for (i=0; i<asock->async_count; ++i) {
333
readbuf[i] = pj_pool_alloc(pool, buff_size);
336
return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
340
PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
349
PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
350
PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
351
PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
353
asock->read_op = (struct read_op*)
354
pj_pool_calloc(pool, asock->async_count,
355
sizeof(struct read_op));
356
asock->read_type = TYPE_RECV;
357
asock->read_flags = flags;
359
for (i=0; i<asock->async_count; ++i) {
360
struct read_op *r = &asock->read_op[i];
361
pj_ssize_t size_to_read;
363
r->pkt = (pj_uint8_t*)readbuf[i];
364
r->max_size = size_to_read = buff_size;
366
status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
367
PJ_IOQUEUE_ALWAYS_ASYNC | flags);
368
PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
370
if (status != PJ_EPENDING)
378
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
386
PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
388
readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
391
for (i=0; i<asock->async_count; ++i) {
392
readbuf[i] = pj_pool_alloc(pool, buff_size);
395
return pj_activesock_start_recvfrom2(asock, pool, buff_size,
400
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
409
PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
410
PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
412
asock->read_op = (struct read_op*)
413
pj_pool_calloc(pool, asock->async_count,
414
sizeof(struct read_op));
415
asock->read_type = TYPE_RECV_FROM;
416
asock->read_flags = flags;
418
for (i=0; i<asock->async_count; ++i) {
419
struct read_op *r = &asock->read_op[i];
420
pj_ssize_t size_to_read;
422
r->pkt = (pj_uint8_t*) readbuf[i];
423
r->max_size = size_to_read = buff_size;
424
r->src_addr_len = sizeof(r->src_addr);
426
status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
428
PJ_IOQUEUE_ALWAYS_ASYNC | flags,
429
&r->src_addr, &r->src_addr_len);
430
PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
432
if (status != PJ_EPENDING)
440
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
441
pj_ioqueue_op_key_t *op_key,
442
pj_ssize_t bytes_read)
444
pj_activesock_t *asock;
445
struct read_op *r = (struct read_op*)op_key;
449
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
454
if (bytes_read > 0) {
456
* We've got new data.
461
/* Append this new data to existing data. If socket is stream
462
* oriented, user might have left some data in the buffer.
463
* Otherwise if socket is datagram there will be nothing in
464
* existing packet hence the packet will contain only the new
467
r->size += bytes_read;
469
/* Set default remainder to zero */
472
/* And return value to TRUE */
475
/* Notify callback */
476
if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
477
ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
478
PJ_SUCCESS, &remainder);
479
} else if (asock->read_type == TYPE_RECV_FROM &&
480
asock->cb.on_data_recvfrom)
482
ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
488
/* If callback returns false, we have been destroyed! */
492
/* Only stream oriented socket may leave data in the packet */
493
if (asock->stream_oriented) {
499
} else if (bytes_read <= 0 &&
500
-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
501
-bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
502
(asock->stream_oriented ||
503
-bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
508
if (bytes_read == 0) {
509
/* For stream/connection oriented socket, this means the
510
* connection has been closed. For datagram sockets, it means
511
* we've received datagram with zero length.
513
if (asock->stream_oriented)
518
/* This means we've got an error. If this is stream/connection
519
* oriented, it means connection has been closed. For datagram
520
* sockets, it means we've got some error (e.g. EWOULDBLOCK).
522
status = -bytes_read;
525
/* Set default remainder to zero */
528
/* And return value to TRUE */
531
/* Notify callback */
532
if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
533
/* For connection oriented socket, we still need to report
534
* the remainder data (if any) to the user to let user do
535
* processing with the remainder data before it closes the
537
* If there is no remainder data, set the packet to NULL.
540
/* Shouldn't set the packet to NULL, as there may be active
541
* socket user, such as SSL socket, that needs to have access
542
* to the read buffer packet.
544
//ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
545
// r->size, status, &remainder);
546
ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
549
} else if (asock->read_type == TYPE_RECV_FROM &&
550
asock->cb.on_data_recvfrom)
552
/* This would always be datagram oriented hence there's
553
* nothing in the packet. We can't be sure if there will be
554
* anything useful in the source_addr, so just put NULL
557
/* In some scenarios, status may be PJ_SUCCESS. The upper
558
* layer application may not expect the callback to be called
559
* with successful status and NULL data, so lets not call the
560
* callback if the status is PJ_SUCCESS.
562
if (status != PJ_SUCCESS ) {
563
ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
568
/* If callback returns false, we have been destroyed! */
572
/* Only stream oriented socket may leave data in the packet */
573
if (asock->stream_oriented) {
580
/* Read next data. We limit ourselves to processing max_loop immediate
581
* data, so when the loop counter has exceeded this value, force the
582
* read()/recvfrom() to return pending operation to allow the program
585
bytes_read = r->max_size - r->size;
586
flags = asock->read_flags;
587
if (++loop >= asock->max_loop)
588
flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
590
if (asock->read_type == TYPE_RECV) {
591
status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
594
r->src_addr_len = sizeof(r->src_addr);
595
status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
597
&r->src_addr, &r->src_addr_len);
600
if (status == PJ_SUCCESS) {
603
} else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
605
bytes_read = -status;
614
static pj_status_t send_remaining(pj_activesock_t *asock,
615
pj_ioqueue_op_key_t *send_key)
617
struct send_data *sd = (struct send_data*)send_key->activesock_data;
623
size = sd->len - sd->sent;
624
status = pj_ioqueue_send(asock->key, send_key,
625
sd->data+sd->sent, &size, sd->flags);
626
if (status != PJ_SUCCESS) {
627
/* Pending or error */
632
if (sd->sent == sd->len) {
633
/* The whole data has been sent. */
637
} while (sd->sent < sd->len);
643
PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
644
pj_ioqueue_op_key_t *send_key,
649
PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
651
send_key->activesock_data = NULL;
653
if (asock->whole_data) {
659
status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
660
if (status != PJ_SUCCESS) {
661
/* Pending or error */
665
if (*size == whole) {
666
/* The whole data has been sent. */
670
/* Data was partially sent */
671
asock->send_data.data = (pj_uint8_t*)data;
672
asock->send_data.len = whole;
673
asock->send_data.sent = *size;
674
asock->send_data.flags = flags;
675
send_key->activesock_data = &asock->send_data;
678
status = send_remaining(asock, send_key);
679
if (status == PJ_SUCCESS) {
685
return pj_ioqueue_send(asock->key, send_key, data, size, flags);
690
PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
691
pj_ioqueue_op_key_t *send_key,
695
const pj_sockaddr_t *addr,
698
PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
701
return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
706
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
707
pj_ioqueue_op_key_t *op_key,
708
pj_ssize_t bytes_sent)
710
pj_activesock_t *asock;
712
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
714
if (bytes_sent > 0 && op_key->activesock_data) {
715
/* whole_data is requested. Make sure we send all the data */
716
struct send_data *sd = (struct send_data*)op_key->activesock_data;
718
sd->sent += bytes_sent;
719
if (sd->sent == sd->len) {
720
/* all has been sent */
721
bytes_sent = sd->sent;
722
op_key->activesock_data = NULL;
724
/* send remaining data */
727
status = send_remaining(asock, op_key);
728
if (status == PJ_EPENDING)
730
else if (status == PJ_SUCCESS)
731
bytes_sent = sd->sent;
733
bytes_sent = -status;
735
op_key->activesock_data = NULL;
739
if (asock->cb.on_data_sent) {
742
ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
744
/* If callback returns false, we have been destroyed! */
751
PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
756
PJ_ASSERT_RETURN(asock, PJ_EINVAL);
757
PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
759
asock->accept_op = (struct accept_op*)
760
pj_pool_calloc(pool, asock->async_count,
761
sizeof(struct accept_op));
762
for (i=0; i<asock->async_count; ++i) {
763
struct accept_op *a = &asock->accept_op[i];
767
a->new_sock = PJ_INVALID_SOCKET;
768
a->rem_addr_len = sizeof(a->rem_addr);
770
status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
771
NULL, &a->rem_addr, &a->rem_addr_len);
772
if (status == PJ_SUCCESS) {
773
/* We've got immediate connection. Not sure if it's a good
774
* idea to call the callback now (probably application will
775
* not be prepared to process it), so lets just silently
778
pj_sock_close(a->new_sock);
780
} while (status == PJ_SUCCESS);
782
if (status != PJ_EPENDING) {
791
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
792
pj_ioqueue_op_key_t *op_key,
796
pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
797
struct accept_op *accept_op = (struct accept_op*) op_key;
799
PJ_UNUSED_ARG(new_sock);
802
if (status == asock->last_err && status != PJ_SUCCESS) {
803
asock->err_counter++;
804
if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
805
PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
806
" operation, stopping further ioqueue accepts.",
807
asock->err_counter, asock->last_err));
811
asock->err_counter = 0;
812
asock->last_err = status;
815
if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
818
/* Notify callback */
819
ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
820
&accept_op->rem_addr,
821
accept_op->rem_addr_len);
823
/* If callback returns false, we have been destroyed! */
827
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
828
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
829
activesock_create_iphone_os_stream(asock);
831
} else if (status==PJ_SUCCESS) {
832
/* Application doesn't handle the new socket, we need to
833
* close it to avoid resource leak.
835
pj_sock_close(accept_op->new_sock);
838
/* Prepare next accept() */
839
accept_op->new_sock = PJ_INVALID_SOCKET;
840
accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
842
status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
843
NULL, &accept_op->rem_addr,
844
&accept_op->rem_addr_len);
846
} while (status != PJ_EPENDING && status != PJ_ECANCELLED);
850
PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
852
const pj_sockaddr_t *remaddr,
856
return pj_ioqueue_connect(asock->key, remaddr, addr_len);
859
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
862
pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
864
if (asock->cb.on_connect_complete) {
867
ret = (*asock->cb.on_connect_complete)(asock, status);
870
/* We've been destroyed */
874
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
875
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
876
activesock_create_iphone_os_stream(asock);
881
#endif /* PJ_HAS_TCP */