1
/* $Id: sip_transport_tcp.c 4294 2012-11-06 05:02:10Z nanang $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
#include <pjsip/sip_transport_tcp.h>
21
#include <pjsip/sip_endpoint.h>
22
#include <pjsip/sip_errno.h>
23
#include <pj/compat/socket.h>
24
#include <pj/addr_resolv.h>
25
#include <pj/activesock.h>
26
#include <pj/assert.h>
31
#include <pj/string.h>
33
/* Only declare the API if PJ_HAS_TCP is true */
34
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
37
#define THIS_FILE "sip_transport_tcp.c"
39
#define MAX_ASYNC_CNT 16
40
#define POOL_LIS_INIT 512
41
#define POOL_LIS_INC 512
42
#define POOL_TP_INIT 512
43
#define POOL_TP_INC 512
50
* This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
51
* SIP transport factory).
55
pjsip_tpfactory factory;
56
pj_bool_t is_registered;
57
pjsip_endpoint *endpt;
59
pj_activesock_t *asock;
60
pj_sockaddr bound_addr;
62
pj_qos_params qos_params;
67
* This structure is used to keep delayed transmit operation in a list.
68
* A delayed transmission occurs when application sends tx_data when
69
* the TCP connect/establishment is still in progress. These delayed
70
* transmissions will be "flushed" once the socket is connected (either
71
* successfully or with errors).
75
PJ_DECL_LIST_MEMBER(struct delayed_tdata);
76
pjsip_tx_data_op_key *tdata_op_key;
82
* This structure describes the TCP transport, and it is a descendant of
90
/* Do not save listener instance in the transport, because
91
* listener might be destroyed during transport's lifetime.
92
* See http://trac.pjsip.org/repos/ticket/491
93
struct tcp_listener *listener;
96
pj_bool_t is_registered;
98
pj_status_t close_reason;
100
pj_activesock_t *asock;
101
pj_bool_t has_pending_connect;
103
/* Keep-alive timer. */
104
pj_timer_entry ka_timer;
105
pj_time_val last_activity;
106
pjsip_tx_data_op_key ka_op_key;
109
/* TCP transport can only have one rdata!
110
* Otherwise chunks of incoming PDU may be received on different
115
/* Pending transmission list. */
116
struct delayed_tdata delayed_list;
120
/****************************************************************************
124
/* This callback is called when pending accept() operation completes. */
125
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
127
const pj_sockaddr_t *src_addr,
130
/* This callback is called by transport manager to destroy listener */
131
static pj_status_t lis_destroy(pjsip_tpfactory *factory);
133
/* This callback is called by transport manager to create transport */
134
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
136
pjsip_endpoint *endpt,
137
const pj_sockaddr *rem_addr,
139
pjsip_transport **transport);
141
/* Common function to create and initialize transport */
142
static pj_status_t tcp_create(struct tcp_listener *listener,
144
pj_sock_t sock, pj_bool_t is_server,
145
const pj_sockaddr *local,
146
const pj_sockaddr *remote,
147
struct tcp_transport **p_tcp);
150
static void tcp_perror(const char *sender, const char *title,
153
char errmsg[PJ_ERR_MSG_SIZE];
155
pj_strerror(status, errmsg, sizeof(errmsg));
157
PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status));
161
static void sockaddr_to_host_port( pj_pool_t *pool,
162
pjsip_host_port *host_port,
163
const pj_sockaddr *addr )
165
host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
166
pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0);
167
host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
168
host_port->port = pj_sockaddr_get_port(addr);
172
static void tcp_init_shutdown(struct tcp_transport *tcp, pj_status_t status)
174
pjsip_tp_state_callback state_cb;
176
if (tcp->close_reason == PJ_SUCCESS)
177
tcp->close_reason = status;
179
if (tcp->base.is_shutdown)
182
/* Prevent immediate transport destroy by application, as transport
183
* state notification callback may be stacked and transport instance
184
* must remain valid at any point in the callback.
186
pjsip_transport_add_ref(&tcp->base);
188
/* Notify application of transport disconnected state */
189
state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
191
pjsip_transport_state_info state_info;
193
pj_bzero(&state_info, sizeof(state_info));
194
state_info.status = tcp->close_reason;
195
(*state_cb)(&tcp->base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
198
/* We can not destroy the transport since high level objects may
199
* still keep reference to this transport. So we can only
200
* instruct transport manager to gracefully start the shutdown
201
* procedure for this transport.
203
pjsip_transport_shutdown(&tcp->base);
205
/* Now, it is ok to destroy the transport. */
206
pjsip_transport_dec_ref(&tcp->base);
211
* Initialize pjsip_tcp_transport_cfg structure with default values.
213
PJ_DEF(void) pjsip_tcp_transport_cfg_default(pjsip_tcp_transport_cfg *cfg,
216
pj_bzero(cfg, sizeof(*cfg));
218
pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0);
223
/****************************************************************************
224
* The TCP listener/transport factory.
228
* This is the public API to create, initialize, register, and start the
231
PJ_DEF(pj_status_t) pjsip_tcp_transport_start3(
232
pjsip_endpoint *endpt,
233
const pjsip_tcp_transport_cfg *cfg,
234
pjsip_tpfactory **p_factory
238
pj_sock_t sock = PJ_INVALID_SOCKET;
239
struct tcp_listener *listener;
240
pj_activesock_cfg asock_cfg;
241
pj_activesock_cb listener_cb;
242
pj_sockaddr *listener_addr;
247
PJ_ASSERT_RETURN(endpt && cfg->async_cnt, PJ_EINVAL);
249
/* Verify that address given in a_name (if any) is valid */
250
if (cfg->addr_name.host.slen) {
253
status = pj_sockaddr_init(cfg->af, &tmp, &cfg->addr_name.host,
254
(pj_uint16_t)cfg->addr_name.port);
255
if (status != PJ_SUCCESS || !pj_sockaddr_has_addr(&tmp) ||
256
(cfg->af==pj_AF_INET() &&
257
tmp.ipv4.sin_addr.s_addr==PJ_INADDR_NONE))
259
/* Invalid address */
264
pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT,
266
PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
269
listener = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
270
listener->factory.pool = pool;
271
listener->factory.type = cfg->af==pj_AF_INET() ? PJSIP_TRANSPORT_TCP :
272
PJSIP_TRANSPORT_TCP6;
273
listener->factory.type_name = (char*)
274
pjsip_transport_get_type_name(listener->factory.type);
275
listener->factory.flag =
276
pjsip_transport_get_flag_from_type(listener->factory.type);
277
listener->qos_type = cfg->qos_type;
278
pj_memcpy(&listener->qos_params, &cfg->qos_params,
279
sizeof(cfg->qos_params));
281
pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
282
if (listener->factory.type==PJSIP_TRANSPORT_TCP6)
283
pj_ansi_strcat(listener->factory.obj_name, "6");
285
status = pj_lock_create_recursive_mutex(pool, listener->factory.obj_name,
286
&listener->factory.lock);
287
if (status != PJ_SUCCESS)
292
status = pj_sock_socket(cfg->af, pj_SOCK_STREAM(), 0, &sock);
293
if (status != PJ_SUCCESS)
296
/* Apply QoS, if specified */
297
status = pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params,
298
2, listener->factory.obj_name,
299
"SIP TCP listener socket");
301
/* Bind address may be different than factory.local_addr because
302
* factory.local_addr will be resolved below.
304
pj_sockaddr_cp(&listener->bound_addr, &cfg->bind_addr);
307
listener_addr = &listener->factory.local_addr;
308
pj_sockaddr_cp(listener_addr, &cfg->bind_addr);
310
status = pj_sock_bind(sock, listener_addr,
311
pj_sockaddr_get_len(listener_addr));
312
if (status != PJ_SUCCESS)
315
/* Retrieve the bound address */
316
addr_len = pj_sockaddr_get_len(listener_addr);
317
status = pj_sock_getsockname(sock, listener_addr, &addr_len);
318
if (status != PJ_SUCCESS)
321
/* If published host/IP is specified, then use that address as the
322
* listener advertised address.
324
if (cfg->addr_name.host.slen) {
325
/* Copy the address */
326
listener->factory.addr_name = cfg->addr_name;
327
pj_strdup(listener->factory.pool, &listener->factory.addr_name.host,
328
&cfg->addr_name.host);
329
listener->factory.addr_name.port = cfg->addr_name.port;
332
/* No published address is given, use the bound address */
334
/* If the address returns 0.0.0.0, use the default
335
* interface address as the transport's address.
337
if (!pj_sockaddr_has_addr(listener_addr)) {
340
status = pj_gethostip(listener->bound_addr.addr.sa_family,
342
if (status != PJ_SUCCESS)
345
pj_sockaddr_copy_addr(listener_addr, &hostip);
348
/* Save the address name */
349
sockaddr_to_host_port(listener->factory.pool,
350
&listener->factory.addr_name,
354
/* If port is zero, get the bound port */
355
if (listener->factory.addr_name.port == 0) {
356
listener->factory.addr_name.port = pj_sockaddr_get_port(listener_addr);
359
pj_ansi_snprintf(listener->factory.obj_name,
360
sizeof(listener->factory.obj_name),
361
"tcplis:%d", listener->factory.addr_name.port);
364
/* Start listening to the address */
365
status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG);
366
if (status != PJ_SUCCESS)
370
/* Create active socket */
371
pj_activesock_cfg_default(&asock_cfg);
372
if (cfg->async_cnt > MAX_ASYNC_CNT)
373
asock_cfg.async_cnt = MAX_ASYNC_CNT;
375
asock_cfg.async_cnt = cfg->async_cnt;
377
pj_bzero(&listener_cb, sizeof(listener_cb));
378
listener_cb.on_accept_complete = &on_accept_complete;
379
status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
380
pjsip_endpt_get_ioqueue(endpt),
381
&listener_cb, listener,
384
/* Register to transport manager */
385
listener->endpt = endpt;
386
listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
387
listener->factory.create_transport = lis_create_transport;
388
listener->factory.destroy = lis_destroy;
389
listener->is_registered = PJ_TRUE;
390
status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
392
if (status != PJ_SUCCESS) {
393
listener->is_registered = PJ_FALSE;
397
/* Start pending accept() operations */
398
status = pj_activesock_start_accept(listener->asock, pool);
399
if (status != PJ_SUCCESS)
402
PJ_LOG(4,(listener->factory.obj_name,
403
"SIP TCP listener ready for incoming connections at %.*s:%d",
404
(int)listener->factory.addr_name.host.slen,
405
listener->factory.addr_name.host.ptr,
406
listener->factory.addr_name.port));
408
/* Return the pointer to user */
409
if (p_factory) *p_factory = &listener->factory;
414
if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET)
416
lis_destroy(&listener->factory);
422
* This is the public API to create, initialize, register, and start the
425
PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
426
const pj_sockaddr_in *local,
427
const pjsip_host_port *a_name,
429
pjsip_tpfactory **p_factory)
431
pjsip_tcp_transport_cfg cfg;
433
pjsip_tcp_transport_cfg_default(&cfg, pj_AF_INET());
436
pj_sockaddr_cp(&cfg.bind_addr, local);
438
pj_sockaddr_init(cfg.af, &cfg.bind_addr, NULL, 0);
441
pj_memcpy(&cfg.addr_name, a_name, sizeof(*a_name));
444
cfg.async_cnt = async_cnt;
446
return pjsip_tcp_transport_start3(endpt, &cfg, p_factory);
451
* This is the public API to create, initialize, register, and start the
454
PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
455
const pj_sockaddr_in *local,
457
pjsip_tpfactory **p_factory)
459
return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory);
463
/* This callback is called by transport manager to destroy listener */
464
static pj_status_t lis_destroy(pjsip_tpfactory *factory)
466
struct tcp_listener *listener = (struct tcp_listener *)factory;
468
if (listener->is_registered) {
469
pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
470
listener->is_registered = PJ_FALSE;
473
if (listener->asock) {
474
pj_activesock_close(listener->asock);
475
listener->asock = NULL;
478
if (listener->factory.lock) {
479
pj_lock_destroy(listener->factory.lock);
480
listener->factory.lock = NULL;
483
if (listener->factory.pool) {
484
pj_pool_t *pool = listener->factory.pool;
486
PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener destroyed"));
488
listener->factory.pool = NULL;
489
pj_pool_release(pool);
496
/***************************************************************************/
504
/* Called by transport manager to send message */
505
static pj_status_t tcp_send_msg(pjsip_transport *transport,
506
pjsip_tx_data *tdata,
507
const pj_sockaddr_t *rem_addr,
510
pjsip_transport_callback callback);
512
/* Called by transport manager to shutdown */
513
static pj_status_t tcp_shutdown(pjsip_transport *transport);
515
/* Called by transport manager to destroy transport */
516
static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
518
/* Utility to destroy transport */
519
static pj_status_t tcp_destroy(pjsip_transport *transport,
522
/* Callback on incoming data */
523
static pj_bool_t on_data_read(pj_activesock_t *asock,
527
pj_size_t *remainder);
529
/* Callback when packet is sent */
530
static pj_bool_t on_data_sent(pj_activesock_t *asock,
531
pj_ioqueue_op_key_t *send_key,
534
/* Callback when connect completes */
535
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
538
/* TCP keep-alive timer callback */
539
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);
542
* Common function to create TCP transport, called when pending accept() and
543
* pending connect() complete.
545
static pj_status_t tcp_create( struct tcp_listener *listener,
547
pj_sock_t sock, pj_bool_t is_server,
548
const pj_sockaddr *local,
549
const pj_sockaddr *remote,
550
struct tcp_transport **p_tcp)
552
struct tcp_transport *tcp;
553
pj_ioqueue_t *ioqueue;
554
pj_activesock_cfg asock_cfg;
555
pj_activesock_cb tcp_callback;
556
const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
557
char print_addr[PJ_INET6_ADDRSTRLEN+10];
561
PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
565
pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
566
POOL_TP_INIT, POOL_TP_INC);
567
PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
571
* Create and initialize basic transport structure.
573
tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
574
tcp->is_server = is_server;
576
/*tcp->listener = listener;*/
577
pj_list_init(&tcp->delayed_list);
578
tcp->base.pool = pool;
580
pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,
581
(is_server ? "tcps%p" :"tcpc%p"), tcp);
583
status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
584
if (status != PJ_SUCCESS) {
588
status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
589
if (status != PJ_SUCCESS) {
593
tcp->base.key.type = listener->factory.type;
594
pj_sockaddr_cp(&tcp->base.key.rem_addr, remote);
595
tcp->base.type_name = (char*)pjsip_transport_get_type_name(
596
(pjsip_transport_type_e)tcp->base.key.type);
597
tcp->base.flag = pjsip_transport_get_flag_from_type(
598
(pjsip_transport_type_e)tcp->base.key.type);
600
tcp->base.info = (char*) pj_pool_alloc(pool, 64);
601
pj_ansi_snprintf(tcp->base.info, 64, "%s to %s",
603
pj_sockaddr_print(remote, print_addr,
604
sizeof(print_addr), 3));
606
tcp->base.addr_len = pj_sockaddr_get_len(remote);
607
pj_sockaddr_cp(&tcp->base.local_addr, local);
608
sockaddr_to_host_port(pool, &tcp->base.local_name, local);
609
sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);
610
tcp->base.dir = is_server? PJSIP_TP_DIR_INCOMING : PJSIP_TP_DIR_OUTGOING;
612
tcp->base.endpt = listener->endpt;
613
tcp->base.tpmgr = listener->tpmgr;
614
tcp->base.send_msg = &tcp_send_msg;
615
tcp->base.do_shutdown = &tcp_shutdown;
616
tcp->base.destroy = &tcp_destroy_transport;
618
/* Create active socket */
619
pj_activesock_cfg_default(&asock_cfg);
620
asock_cfg.async_cnt = 1;
622
pj_bzero(&tcp_callback, sizeof(tcp_callback));
623
tcp_callback.on_data_read = &on_data_read;
624
tcp_callback.on_data_sent = &on_data_sent;
625
tcp_callback.on_connect_complete = &on_connect_complete;
627
ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
628
status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
629
ioqueue, &tcp_callback, tcp, &tcp->asock);
630
if (status != PJ_SUCCESS) {
634
/* Register transport to transport manager */
635
status = pjsip_transport_register(listener->tpmgr, &tcp->base);
636
if (status != PJ_SUCCESS) {
640
tcp->is_registered = PJ_TRUE;
642
/* Initialize keep-alive timer */
643
tcp->ka_timer.user_data = (void*)tcp;
644
tcp->ka_timer.cb = &tcp_keep_alive_timer;
645
pj_ioqueue_op_key_init(&tcp->ka_op_key.key, sizeof(pj_ioqueue_op_key_t));
646
pj_strdup(tcp->base.pool, &tcp->ka_pkt, &ka_pkt);
648
/* Done setting up basic transport. */
651
PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
652
(tcp->is_server ? "server" : "client")));
657
tcp_destroy(&tcp->base, status);
662
/* Flush all delayed transmissions once the socket is connected. */
663
static void tcp_flush_pending_tx(struct tcp_transport *tcp)
667
pj_gettickcount(&now);
668
pj_lock_acquire(tcp->base.lock);
669
while (!pj_list_empty(&tcp->delayed_list)) {
670
struct delayed_tdata *pending_tx;
671
pjsip_tx_data *tdata;
672
pj_ioqueue_op_key_t *op_key;
676
pending_tx = tcp->delayed_list.next;
677
pj_list_erase(pending_tx);
679
tdata = pending_tx->tdata_op_key->tdata;
680
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
682
if (pending_tx->timeout.sec > 0 &&
683
PJ_TIME_VAL_GT(now, pending_tx->timeout))
689
size = tdata->buf.cur - tdata->buf.start;
690
status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start,
692
if (status != PJ_EPENDING) {
693
pj_lock_release(tcp->base.lock);
694
on_data_sent(tcp->asock, op_key, size);
695
pj_lock_acquire(tcp->base.lock);
699
pj_lock_release(tcp->base.lock);
703
/* Called by transport manager to destroy transport */
704
static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
706
struct tcp_transport *tcp = (struct tcp_transport*)transport;
708
/* Transport would have been unregistered by now since this callback
709
* is called by transport manager.
711
tcp->is_registered = PJ_FALSE;
713
return tcp_destroy(transport, tcp->close_reason);
717
/* Destroy TCP transport */
718
static pj_status_t tcp_destroy(pjsip_transport *transport,
721
struct tcp_transport *tcp = (struct tcp_transport*)transport;
723
if (tcp->close_reason == 0)
724
tcp->close_reason = reason;
726
if (tcp->is_registered) {
727
tcp->is_registered = PJ_FALSE;
728
pjsip_transport_destroy(transport);
730
/* pjsip_transport_destroy will recursively call this function
736
/* Mark transport as closing */
737
tcp->is_closing = PJ_TRUE;
739
/* Stop keep-alive timer. */
740
if (tcp->ka_timer.id) {
741
pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
742
tcp->ka_timer.id = PJ_FALSE;
745
/* Cancel all delayed transmits */
746
while (!pj_list_empty(&tcp->delayed_list)) {
747
struct delayed_tdata *pending_tx;
748
pj_ioqueue_op_key_t *op_key;
750
pending_tx = tcp->delayed_list.next;
751
pj_list_erase(pending_tx);
753
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
755
on_data_sent(tcp->asock, op_key, -reason);
758
if (tcp->rdata.tp_info.pool) {
759
pj_pool_release(tcp->rdata.tp_info.pool);
760
tcp->rdata.tp_info.pool = NULL;
764
pj_activesock_close(tcp->asock);
766
tcp->sock = PJ_INVALID_SOCKET;
767
} else if (tcp->sock != PJ_INVALID_SOCKET) {
768
pj_sock_close(tcp->sock);
769
tcp->sock = PJ_INVALID_SOCKET;
772
if (tcp->base.lock) {
773
pj_lock_destroy(tcp->base.lock);
774
tcp->base.lock = NULL;
777
if (tcp->base.ref_cnt) {
778
pj_atomic_destroy(tcp->base.ref_cnt);
779
tcp->base.ref_cnt = NULL;
782
if (tcp->base.pool) {
785
if (reason != PJ_SUCCESS) {
786
char errmsg[PJ_ERR_MSG_SIZE];
788
pj_strerror(reason, errmsg, sizeof(errmsg));
789
PJ_LOG(4,(tcp->base.obj_name,
790
"TCP transport destroyed with reason %d: %s",
795
PJ_LOG(4,(tcp->base.obj_name,
796
"TCP transport destroyed normally"));
800
pool = tcp->base.pool;
801
tcp->base.pool = NULL;
802
pj_pool_release(pool);
810
* This utility function creates receive data buffers and starts
811
* asynchronous recv() operations from the socket. It is called after
812
* accept() or connect() operation completes.
814
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
818
pj_sockaddr *rem_addr;
823
pool = pjsip_endpt_create_pool(tcp->base.endpt,
825
PJSIP_POOL_RDATA_LEN,
826
PJSIP_POOL_RDATA_INC);
828
tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
832
tcp->rdata.tp_info.pool = pool;
834
tcp->rdata.tp_info.transport = &tcp->base;
835
tcp->rdata.tp_info.tp_data = tcp;
836
tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
837
pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key,
838
sizeof(pj_ioqueue_op_key_t));
840
tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
841
tcp->rdata.pkt_info.src_addr_len = sizeof(tcp->rdata.pkt_info.src_addr);
842
rem_addr = &tcp->base.key.rem_addr;
843
pj_sockaddr_print(rem_addr, tcp->rdata.pkt_info.src_name,
844
sizeof(tcp->rdata.pkt_info.src_name), 0);
845
tcp->rdata.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
847
size = sizeof(tcp->rdata.pkt_info.packet);
848
readbuf[0] = tcp->rdata.pkt_info.packet;
849
status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size,
851
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
852
PJ_LOG(4, (tcp->base.obj_name,
853
"pj_activesock_start_read() error, status=%d",
862
/* This callback is called by transport manager for the TCP factory
863
* to create outgoing transport to the specified destination.
865
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
867
pjsip_endpoint *endpt,
868
const pj_sockaddr *rem_addr,
870
pjsip_transport **p_transport)
872
struct tcp_listener *listener;
873
struct tcp_transport *tcp;
875
pj_sockaddr local_addr;
879
PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
880
addr_len && p_transport, PJ_EINVAL);
882
/* Check that address is a sockaddr_in or sockaddr_in6*/
883
PJ_ASSERT_RETURN((rem_addr->addr.sa_family == pj_AF_INET() &&
884
addr_len == sizeof(pj_sockaddr_in)) ||
885
(rem_addr->addr.sa_family == pj_AF_INET6() &&
886
addr_len == sizeof(pj_sockaddr_in6)), PJ_EINVAL);
889
listener = (struct tcp_listener*)factory;
892
status = pj_sock_socket(rem_addr->addr.sa_family, pj_SOCK_STREAM(),
894
if (status != PJ_SUCCESS)
897
/* Apply QoS, if specified */
898
status = pj_sock_apply_qos2(sock, listener->qos_type,
899
&listener->qos_params,
900
2, listener->factory.obj_name,
901
"outgoing SIP TCP socket");
903
/* Bind to listener's address and any port */
904
pj_bzero(&local_addr, sizeof(local_addr));
905
pj_sockaddr_cp(&local_addr, &listener->bound_addr);
906
pj_sockaddr_set_port(&local_addr, 0);
908
status = pj_sock_bind(sock, &local_addr,
909
pj_sockaddr_get_len(&local_addr));
910
if (status != PJ_SUCCESS) {
915
/* Get the local port */
916
addr_len = sizeof(local_addr);
917
status = pj_sock_getsockname(sock, &local_addr, &addr_len);
918
if (status != PJ_SUCCESS) {
923
/* Initially set the address from the listener's address */
924
if (!pj_sockaddr_has_addr(&local_addr)) {
925
pj_sockaddr_copy_addr(&local_addr, &listener->factory.local_addr);
928
/* Create the transport descriptor */
929
status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr,
931
if (status != PJ_SUCCESS)
935
/* Start asynchronous connect() operation */
936
tcp->has_pending_connect = PJ_TRUE;
937
status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr,
939
if (status == PJ_SUCCESS) {
940
on_connect_complete(tcp->asock, PJ_SUCCESS);
941
} else if (status != PJ_EPENDING) {
942
tcp_destroy(&tcp->base, status);
946
if (tcp->has_pending_connect) {
947
/* Update (again) local address, just in case local address currently
948
* set is different now that asynchronous connect() is started.
950
addr_len = sizeof(local_addr);
951
if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) {
952
pj_sockaddr *tp_addr = &tcp->base.local_addr;
954
/* Some systems (like old Win32 perhaps) may not set local address
955
* properly before socket is fully connected.
957
if (pj_sockaddr_cmp(tp_addr, &local_addr) &&
958
pj_sockaddr_get_port(&local_addr) != 0)
960
pj_sockaddr_cp(tp_addr, &local_addr);
961
sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
966
PJ_LOG(4,(tcp->base.obj_name,
967
"TCP transport %.*s:%d is connecting to %.*s:%d...",
968
(int)tcp->base.local_name.host.slen,
969
tcp->base.local_name.host.ptr,
970
tcp->base.local_name.port,
971
(int)tcp->base.remote_name.host.slen,
972
tcp->base.remote_name.host.ptr,
973
tcp->base.remote_name.port));
977
*p_transport = &tcp->base;
984
* This callback is called by active socket when pending accept() operation
987
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
989
const pj_sockaddr_t *src_addr,
992
struct tcp_listener *listener;
993
struct tcp_transport *tcp;
994
char addr[PJ_INET6_ADDRSTRLEN+10];
995
pjsip_tp_state_callback state_cb;
996
pj_sockaddr tmp_src_addr;
999
PJ_UNUSED_ARG(src_addr_len);
1001
listener = (struct tcp_listener*) pj_activesock_get_user_data(asock);
1003
PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE);
1005
PJ_LOG(4,(listener->factory.obj_name,
1006
"TCP listener %.*s:%d: got incoming TCP connection "
1008
(int)listener->factory.addr_name.host.slen,
1009
listener->factory.addr_name.host.ptr,
1010
listener->factory.addr_name.port,
1011
pj_sockaddr_print(src_addr, addr, sizeof(addr), 3),
1014
/* Apply QoS, if specified */
1015
status = pj_sock_apply_qos2(sock, listener->qos_type,
1016
&listener->qos_params,
1017
2, listener->factory.obj_name,
1018
"incoming SIP TCP socket");
1020
/* tcp_create() expect pj_sockaddr, so copy src_addr to temporary var,
1023
pj_bzero(&tmp_src_addr, sizeof(tmp_src_addr));
1024
pj_sockaddr_cp(&tmp_src_addr, src_addr);
1027
* Incoming connection!
1028
* Create TCP transport for the new socket.
1030
status = tcp_create( listener, NULL, sock, PJ_TRUE,
1031
&listener->factory.local_addr,
1032
&tmp_src_addr, &tcp);
1033
if (status == PJ_SUCCESS) {
1034
status = tcp_start_read(tcp);
1035
if (status != PJ_SUCCESS) {
1036
PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
1037
tcp_destroy(&tcp->base, status);
1039
/* Start keep-alive timer */
1040
if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1041
pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
1042
pjsip_endpt_schedule_timer(listener->endpt,
1045
tcp->ka_timer.id = PJ_TRUE;
1046
pj_gettimeofday(&tcp->last_activity);
1049
/* Notify application of transport state accepted */
1050
state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1052
pjsip_transport_state_info state_info;
1054
pj_bzero(&state_info, sizeof(state_info));
1055
(*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1065
* Callback from ioqueue when packet is sent.
1067
static pj_bool_t on_data_sent(pj_activesock_t *asock,
1068
pj_ioqueue_op_key_t *op_key,
1069
pj_ssize_t bytes_sent)
1071
struct tcp_transport *tcp = (struct tcp_transport*)
1072
pj_activesock_get_user_data(asock);
1073
pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
1075
/* Note that op_key may be the op_key from keep-alive, thus
1076
* it will not have tdata etc.
1079
tdata_op_key->tdata = NULL;
1081
if (tdata_op_key->callback) {
1083
* Notify sip_transport.c that packet has been sent.
1085
if (bytes_sent == 0)
1086
bytes_sent = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1088
tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
1090
/* Mark last activity time */
1091
pj_gettimeofday(&tcp->last_activity);
1095
/* Check for error/closure */
1096
if (bytes_sent <= 0) {
1099
PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1102
status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
1105
tcp_init_shutdown(tcp, status);
1115
* This callback is called by transport manager to send SIP message
1117
static pj_status_t tcp_send_msg(pjsip_transport *transport,
1118
pjsip_tx_data *tdata,
1119
const pj_sockaddr_t *rem_addr,
1122
pjsip_transport_callback callback)
1124
struct tcp_transport *tcp = (struct tcp_transport*)transport;
1126
pj_bool_t delayed = PJ_FALSE;
1127
pj_status_t status = PJ_SUCCESS;
1130
PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
1132
/* Check that there's no pending operation associated with the tdata */
1133
PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
1135
/* Check the address is supported */
1136
PJ_ASSERT_RETURN(rem_addr && (addr_len==sizeof(pj_sockaddr_in) ||
1137
addr_len==sizeof(pj_sockaddr_in6)),
1141
tdata->op_key.tdata = tdata;
1142
tdata->op_key.token = token;
1143
tdata->op_key.callback = callback;
1145
/* If asynchronous connect() has not completed yet, just put the
1146
* transmit data in the pending transmission list since we can not
1147
* use the socket yet.
1149
if (tcp->has_pending_connect) {
1152
* Looks like connect() is still in progress. Check again (this time
1153
* with holding the lock) to be sure.
1155
pj_lock_acquire(tcp->base.lock);
1157
if (tcp->has_pending_connect) {
1158
struct delayed_tdata *delayed_tdata;
1161
* connect() is still in progress. Put the transmit data to
1163
* Starting from #1583 (https://trac.pjsip.org/repos/ticket/1583),
1164
* we also add timeout value for the transmit data. When the
1165
* connect() is completed, the timeout value will be checked to
1166
* determine whether the transmit data needs to be sent.
1168
delayed_tdata = PJ_POOL_ZALLOC_T(tdata->pool,
1169
struct delayed_tdata);
1170
delayed_tdata->tdata_op_key = &tdata->op_key;
1171
if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) {
1172
pj_gettickcount(&delayed_tdata->timeout);
1173
delayed_tdata->timeout.msec += pjsip_cfg()->tsx.td;
1174
pj_time_val_normalize(&delayed_tdata->timeout);
1177
pj_list_push_back(&tcp->delayed_list, delayed_tdata);
1178
status = PJ_EPENDING;
1180
/* Prevent pj_ioqueue_send() to be called below */
1184
pj_lock_release(tcp->base.lock);
1189
* Transport is ready to go. Send the packet to ioqueue to be
1190
* sent asynchronously.
1192
size = tdata->buf.cur - tdata->buf.start;
1193
status = pj_activesock_send(tcp->asock,
1194
(pj_ioqueue_op_key_t*)&tdata->op_key,
1195
tdata->buf.start, &size, 0);
1197
if (status != PJ_EPENDING) {
1198
/* Not pending (could be immediate success or error) */
1199
tdata->op_key.tdata = NULL;
1201
/* Shutdown transport on closure/errors */
1204
PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1207
if (status == PJ_SUCCESS)
1208
status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1210
tcp_init_shutdown(tcp, status);
1220
* This callback is called by transport manager to shutdown transport.
1222
static pj_status_t tcp_shutdown(pjsip_transport *transport)
1224
struct tcp_transport *tcp = (struct tcp_transport*)transport;
1226
/* Stop keep-alive timer. */
1227
if (tcp->ka_timer.id) {
1228
pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
1229
tcp->ka_timer.id = PJ_FALSE;
1237
* Callback from ioqueue when incoming data is received from the socket.
1239
/*
 * on_data_read(): active-socket read callback for a TCP transport.
 *
 * On a successful read the received bytes are handed to the transport
 * manager, and whatever it did not consume is reported through *remainder
 * (after being moved to the start of the packet buffer when needed). On a
 * failed read the transport shutdown is started with the failing status.
 *
 * asock     - active socket; its user data is the struct tcp_transport.
 * remainder - output: number of bytes left unprocessed in the buffer.
 *
 * NOTE(review): the body also uses data, size and status, whose parameter
 * declarations are not visible in this listing; the return statements are
 * not visible either. MAX_IMMEDIATE_PACKET is not referenced in the
 * visible lines.
 */
static pj_bool_t on_data_read(pj_activesock_t *asock,
1243
pj_size_t *remainder)
1245
enum { MAX_IMMEDIATE_PACKET = 10 };
1246
struct tcp_transport *tcp;
1247
pjsip_rx_data *rdata;
1249
PJ_UNUSED_ARG(data);
1251
/* Recover the transport from the socket; rdata is the receive
 * descriptor stored inside the transport itself. */
tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1252
rdata = &tcp->rdata;
1254
/* Don't do anything if transport is closing. */
1255
if (tcp->is_closing) {
1260
/* Houston, we have packet! Report the packet to transport manager
1263
if (status == PJ_SUCCESS) {
1264
pj_size_t size_eaten;
1266
/* Mark this as an activity */
1267
pj_gettimeofday(&tcp->last_activity);
1269
/* Sanity check: the delivered buffer must be the packet buffer of rdata. */
pj_assert((void*)rdata->pkt_info.packet == data);
1271
/* Init pkt_info part. */
1272
rdata->pkt_info.len = size;
1273
rdata->pkt_info.zero = 0;
1274
pj_gettimeofday(&rdata->pkt_info.timestamp);
1276
/* Report to transport manager.
1277
* The transport manager will tell us how many bytes of the packet
1278
* have been processed (as valid SIP message).
1281
pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
1284
pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
1286
/* Move unprocessed data to the front of the buffer */
1287
*remainder = size - size_eaten;
1288
/* Only a non-empty tail that does not already start at the front needs
 * moving: when nothing was consumed (*remainder == size) the data is
 * already in place. */
if (*remainder > 0 && *remainder != size) {
1289
pj_memmove(rdata->pkt_info.packet,
1290
rdata->pkt_info.packet + size_eaten,
1296
/* Transport is closed */
1297
PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
1299
/* NOTE(review): presumably the branch for status != PJ_SUCCESS; the
 * line introducing it is not visible in this listing. */
tcp_init_shutdown(tcp, status);
1306
/* Reset the pool attached to the receive descriptor. */
pj_pool_reset(rdata->tp_info.pool);
1313
* Callback from ioqueue when an asynchronous connect() operation completes.
1315
/*
 * on_connect_complete(): active-socket callback run when connect() ends.
 *
 * Failure path: logs the error, hands every queued (delayed) transmit to
 * on_data_sent() with the negated status, and starts transport shutdown.
 * Success path: logs the connection, refreshes the local address from the
 * socket, starts reading, notifies the state callback with
 * PJSIP_TP_STATE_CONNECTED, flushes the pending transmits and arms the
 * keep-alive timer when PJSIP_TCP_KEEP_ALIVE_INTERVAL is non-zero.
 *
 * asock - active socket; its user data is the struct tcp_transport.
 *
 * NOTE(review): the status parameter, the addr/addrlen declarations and
 * the return statements are not visible in this listing.
 */
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
1318
struct tcp_transport *tcp;
1321
pjsip_tp_state_callback state_cb;
1323
tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1325
/* Mark that pending connect() operation has completed. */
1326
tcp->has_pending_connect = PJ_FALSE;
1328
/* Check connect() status */
1329
if (status != PJ_SUCCESS) {
1331
tcp_perror(tcp->base.obj_name, "TCP connect() error", status);
1333
/* Cancel all delayed transmits */
1334
while (!pj_list_empty(&tcp->delayed_list)) {
1335
struct delayed_tdata *pending_tx;
1336
pj_ioqueue_op_key_t *op_key;
1338
/* Detach the first queued entry from the list. */
pending_tx = tcp->delayed_list.next;
1339
pj_list_erase(pending_tx);
1341
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
1343
/* The negated connect() status is passed as the third argument.
 * NOTE(review): presumably on_data_sent() reads a negative value as
 * an error code rather than a byte count; confirm. */
on_data_sent(tcp->asock, op_key, -status);
1346
/* Start shutting the transport down with the connect() error. */
tcp_init_shutdown(tcp, status);
1350
PJ_LOG(4,(tcp->base.obj_name,
1351
"TCP transport %.*s:%d is connected to %.*s:%d",
1352
(int)tcp->base.local_name.host.slen,
1353
tcp->base.local_name.host.ptr,
1354
tcp->base.local_name.port,
1355
(int)tcp->base.remote_name.host.slen,
1356
tcp->base.remote_name.host.ptr,
1357
tcp->base.remote_name.port));
1360
/* Update (again) local address, just in case local address currently
1361
* set is different now that the socket is connected (could happen
1362
* on some systems, like old Win32 probably?).
1364
addrlen = sizeof(addr);
1365
if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {
1366
pj_sockaddr *tp_addr = &tcp->base.local_addr;
1368
if (pj_sockaddr_has_addr(&addr) &&
1369
pj_sockaddr_cmp(&addr, tp_addr) != 0)
1371
pj_sockaddr_cp(tp_addr, &addr);
1372
sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
1377
/* Start pending read */
1378
status = tcp_start_read(tcp);
1379
/* A failure to start reading also leads to transport shutdown. */
if (status != PJ_SUCCESS) {
1380
tcp_init_shutdown(tcp, status);
1384
/* Notify application of transport state connected */
1385
state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1387
pjsip_transport_state_info state_info;
1389
/* The callback receives a zero-filled state_info. */
pj_bzero(&state_info, sizeof(state_info));
1390
(*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1393
/* Flush all pending send operations */
1394
tcp_flush_pending_tx(tcp);
1396
/* Start keep-alive timer */
1397
/* An interval of zero leaves the keep-alive timer unscheduled. */
if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1398
pj_time_val delay = { PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0 };
1399
pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1401
/* Mark the timer as scheduled and stamp the current time as the
 * latest activity. */
tcp->ka_timer.id = PJ_TRUE;
1402
pj_gettimeofday(&tcp->last_activity);
1408
/* Transport keep-alive timer callback */
1409
/*
 * tcp_keep_alive_timer(): timer callback that drives TCP keep-alive.
 *
 * The time elapsed since tcp->last_activity is computed. When the elapsed
 * seconds are greater than zero and below PJSIP_TCP_KEEP_ALIVE_INTERVAL,
 * the timer is rescheduled for the remaining seconds. Otherwise the
 * keep-alive packet (tcp->ka_pkt) is sent on the active socket; a send
 * status other than PJ_SUCCESS or PJ_EPENDING is logged and starts
 * transport shutdown. The visible lines then schedule the next keep-alive
 * one full interval ahead.
 *
 * th - timer heap (not used in the visible lines).
 * e  - timer entry; e->user_data is the struct tcp_transport.
 *
 * NOTE(review): declarations of delay, now, size and status, and the
 * return statements that separate the paths, are not visible in this
 * listing.
 */
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e)
1411
struct tcp_transport *tcp = (struct tcp_transport*) e->user_data;
1419
/* NOTE(review): the id is set to PJ_TRUE on entry even though the timer
 * has just fired; confirm whether PJ_FALSE was intended here. */
tcp->ka_timer.id = PJ_TRUE;
1421
pj_gettimeofday(&now);
1422
/* After the subtraction, now holds the time elapsed since the last
 * recorded activity. */
PJ_TIME_VAL_SUB(now, tcp->last_activity);
1424
/* NOTE(review): an elapsed time below one second (now.sec == 0) does not
 * take this branch; confirm that this is intended. */
if (now.sec > 0 && now.sec < PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1425
/* There has been activity, so don't send keep-alive */
1426
/* Wait only for the part of the interval that has not elapsed yet. */
delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL - now.sec;
1429
pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1431
tcp->ka_timer.id = PJ_TRUE;
1435
PJ_LOG(5,(tcp->base.obj_name, "Sending %d byte(s) keep-alive to %.*s:%d",
1436
(int)tcp->ka_pkt.slen, (int)tcp->base.remote_name.host.slen,
1437
tcp->base.remote_name.host.ptr,
1438
tcp->base.remote_name.port));
1441
/* Send the keep-alive payload using the dedicated keep-alive op key. */
size = tcp->ka_pkt.slen;
1442
status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key,
1443
tcp->ka_pkt.ptr, &size, 0);
1445
/* PJ_EPENDING is not treated as a failure here. */
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1446
tcp_perror(tcp->base.obj_name,
1447
"Error sending keep-alive packet", status);
1448
tcp_init_shutdown(tcp, status);
1452
/* Register next keep-alive */
1453
delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL;
1456
pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1458
tcp->ka_timer.id = PJ_TRUE;
1462
#endif /* PJ_HAS_TCP */