1
/* $Id: sip_transport_tcp.c 4725 2014-02-04 04:45:37Z ming $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
#include <pjsip/sip_transport_tcp.h>
21
#include <pjsip/sip_endpoint.h>
22
#include <pjsip/sip_errno.h>
23
#include <pj/compat/socket.h>
24
#include <pj/addr_resolv.h>
25
#include <pj/activesock.h>
26
#include <pj/assert.h>
31
#include <pj/string.h>
33
/* Only declare the API if PJ_HAS_TCP is true */
34
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
37
#define THIS_FILE "sip_transport_tcp.c"
39
#define MAX_ASYNC_CNT 16
40
#define POOL_LIS_INIT 512
41
#define POOL_LIS_INC 512
42
#define POOL_TP_INIT 512
43
#define POOL_TP_INC 512
50
* This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
51
* SIP transport factory).
55
pjsip_tpfactory factory;
56
pj_bool_t is_registered;
57
pjsip_endpoint *endpt;
59
pj_activesock_t *asock;
60
pj_sockaddr bound_addr;
62
pj_qos_params qos_params;
67
* This structure is used to keep delayed transmit operation in a list.
68
* A delayed transmission occurs when application sends tx_data when
69
* the TCP connect/establishment is still in progress. These delayed
70
* transmissions will be "flushed" once the socket is connected (either
71
* successfully or with errors).
75
PJ_DECL_LIST_MEMBER(struct delayed_tdata);
76
pjsip_tx_data_op_key *tdata_op_key;
82
* This structure describes the TCP transport, and it's descendant of
90
/* Do not save listener instance in the transport, because
91
* listener might be destroyed during transport's lifetime.
92
* See http://trac.pjsip.org/repos/ticket/491
93
struct tcp_listener *listener;
96
pj_bool_t is_registered;
98
pj_status_t close_reason;
100
pj_activesock_t *asock;
101
pj_bool_t has_pending_connect;
103
/* Keep-alive timer. */
104
pj_timer_entry ka_timer;
105
pj_time_val last_activity;
106
pjsip_tx_data_op_key ka_op_key;
109
/* TCP transport can only have one rdata!
110
* Otherwise chunks of incoming PDU may be received on different
115
/* Pending transmission list. */
116
struct delayed_tdata delayed_list;
120
/****************************************************************************
124
/* This callback is called when pending accept() operation completes. */
125
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
127
const pj_sockaddr_t *src_addr,
130
/* This callback is called by transport manager to destroy listener */
131
static pj_status_t lis_destroy(pjsip_tpfactory *factory);
133
/* This callback is called by transport manager to create transport */
134
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
136
pjsip_endpoint *endpt,
137
const pj_sockaddr *rem_addr,
139
pjsip_transport **transport);
141
/* Common function to create and initialize transport */
142
static pj_status_t tcp_create(struct tcp_listener *listener,
144
pj_sock_t sock, pj_bool_t is_server,
145
const pj_sockaddr *local,
146
const pj_sockaddr *remote,
147
struct tcp_transport **p_tcp);
150
static void tcp_perror(const char *sender, const char *title,
153
char errmsg[PJ_ERR_MSG_SIZE];
155
pj_strerror(status, errmsg, sizeof(errmsg));
157
PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status));
161
static void sockaddr_to_host_port( pj_pool_t *pool,
162
pjsip_host_port *host_port,
163
const pj_sockaddr *addr )
165
host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
166
pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0);
167
host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
168
host_port->port = pj_sockaddr_get_port(addr);
172
static void tcp_init_shutdown(struct tcp_transport *tcp, pj_status_t status)
174
pjsip_tp_state_callback state_cb;
176
if (tcp->close_reason == PJ_SUCCESS)
177
tcp->close_reason = status;
179
if (tcp->base.is_shutdown || tcp->base.is_destroying)
182
/* Prevent immediate transport destroy by application, as transport
183
* state notification callback may be stacked and transport instance
184
* must remain valid at any point in the callback.
186
pjsip_transport_add_ref(&tcp->base);
188
/* Notify application of transport disconnected state */
189
state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
191
pjsip_transport_state_info state_info;
193
pj_bzero(&state_info, sizeof(state_info));
194
state_info.status = tcp->close_reason;
195
(*state_cb)(&tcp->base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
199
if (tcp->base.is_shutdown || tcp->base.is_destroying) {
200
pjsip_transport_dec_ref(&tcp->base);
204
/* We can not destroy the transport since high level objects may
205
* still keep reference to this transport. So we can only
206
* instruct transport manager to gracefully start the shutdown
207
* procedure for this transport.
209
pjsip_transport_shutdown(&tcp->base);
211
/* Now, it is ok to destroy the transport. */
212
pjsip_transport_dec_ref(&tcp->base);
217
* Initialize pjsip_tcp_transport_cfg structure with default values.
219
PJ_DEF(void) pjsip_tcp_transport_cfg_default(pjsip_tcp_transport_cfg *cfg,
222
pj_bzero(cfg, sizeof(*cfg));
224
pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0);
226
cfg->reuse_addr = PJSIP_TCP_TRANSPORT_REUSEADDR;
230
/****************************************************************************
231
* The TCP listener/transport factory.
235
* This is the public API to create, initialize, register, and start the
238
PJ_DEF(pj_status_t) pjsip_tcp_transport_start3(
239
pjsip_endpoint *endpt,
240
const pjsip_tcp_transport_cfg *cfg,
241
pjsip_tpfactory **p_factory
245
pj_sock_t sock = PJ_INVALID_SOCKET;
246
struct tcp_listener *listener;
247
pj_activesock_cfg asock_cfg;
248
pj_activesock_cb listener_cb;
249
pj_sockaddr *listener_addr;
254
PJ_ASSERT_RETURN(endpt && cfg->async_cnt, PJ_EINVAL);
256
/* Verify that address given in a_name (if any) is valid */
257
if (cfg->addr_name.host.slen) {
260
status = pj_sockaddr_init(cfg->af, &tmp, &cfg->addr_name.host,
261
(pj_uint16_t)cfg->addr_name.port);
262
if (status != PJ_SUCCESS || !pj_sockaddr_has_addr(&tmp) ||
263
(cfg->af==pj_AF_INET() &&
264
tmp.ipv4.sin_addr.s_addr==PJ_INADDR_NONE))
266
/* Invalid address */
271
pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT,
273
PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
276
listener = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
277
listener->factory.pool = pool;
278
listener->factory.type = cfg->af==pj_AF_INET() ? PJSIP_TRANSPORT_TCP :
279
PJSIP_TRANSPORT_TCP6;
280
listener->factory.type_name = (char*)
281
pjsip_transport_get_type_name(listener->factory.type);
282
listener->factory.flag =
283
pjsip_transport_get_flag_from_type(listener->factory.type);
284
listener->qos_type = cfg->qos_type;
285
pj_memcpy(&listener->qos_params, &cfg->qos_params,
286
sizeof(cfg->qos_params));
288
pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
289
if (listener->factory.type==PJSIP_TRANSPORT_TCP6)
290
pj_ansi_strcat(listener->factory.obj_name, "6");
292
status = pj_lock_create_recursive_mutex(pool, listener->factory.obj_name,
293
&listener->factory.lock);
294
if (status != PJ_SUCCESS)
299
status = pj_sock_socket(cfg->af, pj_SOCK_STREAM(), 0, &sock);
300
if (status != PJ_SUCCESS)
303
/* Apply QoS, if specified */
304
status = pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params,
305
2, listener->factory.obj_name,
306
"SIP TCP listener socket");
308
/* Apply SO_REUSEADDR */
309
if (cfg->reuse_addr) {
311
status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_REUSEADDR(),
312
&enabled, sizeof(enabled));
313
if (status != PJ_SUCCESS) {
314
PJ_PERROR(4,(listener->factory.obj_name, status,
315
"Warning: error applying SO_REUSEADDR"));
319
/* Bind address may be different than factory.local_addr because
320
* factory.local_addr will be resolved below.
322
pj_sockaddr_cp(&listener->bound_addr, &cfg->bind_addr);
325
listener_addr = &listener->factory.local_addr;
326
pj_sockaddr_cp(listener_addr, &cfg->bind_addr);
328
status = pj_sock_bind(sock, listener_addr,
329
pj_sockaddr_get_len(listener_addr));
330
if (status != PJ_SUCCESS)
333
/* Retrieve the bound address */
334
addr_len = pj_sockaddr_get_len(listener_addr);
335
status = pj_sock_getsockname(sock, listener_addr, &addr_len);
336
if (status != PJ_SUCCESS)
339
/* If published host/IP is specified, then use that address as the
340
* listener advertised address.
342
if (cfg->addr_name.host.slen) {
343
/* Copy the address */
344
listener->factory.addr_name = cfg->addr_name;
345
pj_strdup(listener->factory.pool, &listener->factory.addr_name.host,
346
&cfg->addr_name.host);
347
listener->factory.addr_name.port = cfg->addr_name.port;
350
/* No published address is given, use the bound address */
352
/* If the address returns 0.0.0.0, use the default
353
* interface address as the transport's address.
355
if (!pj_sockaddr_has_addr(listener_addr)) {
358
status = pj_gethostip(listener->bound_addr.addr.sa_family,
360
if (status != PJ_SUCCESS)
363
pj_sockaddr_copy_addr(listener_addr, &hostip);
366
/* Save the address name */
367
sockaddr_to_host_port(listener->factory.pool,
368
&listener->factory.addr_name,
372
/* If port is zero, get the bound port */
373
if (listener->factory.addr_name.port == 0) {
374
listener->factory.addr_name.port = pj_sockaddr_get_port(listener_addr);
377
pj_ansi_snprintf(listener->factory.obj_name,
378
sizeof(listener->factory.obj_name),
379
"tcplis:%d", listener->factory.addr_name.port);
382
/* Start listening to the address */
383
status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG);
384
if (status != PJ_SUCCESS)
388
/* Create active socket */
389
pj_activesock_cfg_default(&asock_cfg);
390
if (cfg->async_cnt > MAX_ASYNC_CNT)
391
asock_cfg.async_cnt = MAX_ASYNC_CNT;
393
asock_cfg.async_cnt = cfg->async_cnt;
395
pj_bzero(&listener_cb, sizeof(listener_cb));
396
listener_cb.on_accept_complete = &on_accept_complete;
397
status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
398
pjsip_endpt_get_ioqueue(endpt),
399
&listener_cb, listener,
402
/* Register to transport manager */
403
listener->endpt = endpt;
404
listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
405
listener->factory.create_transport = lis_create_transport;
406
listener->factory.destroy = lis_destroy;
407
listener->is_registered = PJ_TRUE;
408
status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
410
if (status != PJ_SUCCESS) {
411
listener->is_registered = PJ_FALSE;
415
/* Start pending accept() operations */
416
status = pj_activesock_start_accept(listener->asock, pool);
417
if (status != PJ_SUCCESS)
420
PJ_LOG(4,(listener->factory.obj_name,
421
"SIP TCP listener ready for incoming connections at %.*s:%d",
422
(int)listener->factory.addr_name.host.slen,
423
listener->factory.addr_name.host.ptr,
424
listener->factory.addr_name.port));
426
/* Return the pointer to user */
427
if (p_factory) *p_factory = &listener->factory;
432
if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET)
434
lis_destroy(&listener->factory);
440
* This is the public API to create, initialize, register, and start the
443
PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
444
const pj_sockaddr_in *local,
445
const pjsip_host_port *a_name,
447
pjsip_tpfactory **p_factory)
449
pjsip_tcp_transport_cfg cfg;
451
pjsip_tcp_transport_cfg_default(&cfg, pj_AF_INET());
454
pj_sockaddr_cp(&cfg.bind_addr, local);
456
pj_sockaddr_init(cfg.af, &cfg.bind_addr, NULL, 0);
459
pj_memcpy(&cfg.addr_name, a_name, sizeof(*a_name));
462
cfg.async_cnt = async_cnt;
464
return pjsip_tcp_transport_start3(endpt, &cfg, p_factory);
469
* This is the public API to create, initialize, register, and start the
472
PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
473
const pj_sockaddr_in *local,
475
pjsip_tpfactory **p_factory)
477
return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory);
481
/* This callback is called by transport manager to destroy listener */
482
static pj_status_t lis_destroy(pjsip_tpfactory *factory)
484
struct tcp_listener *listener = (struct tcp_listener *)factory;
486
if (listener->is_registered) {
487
pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
488
listener->is_registered = PJ_FALSE;
491
if (listener->asock) {
492
pj_activesock_close(listener->asock);
493
listener->asock = NULL;
496
if (listener->factory.lock) {
497
pj_lock_destroy(listener->factory.lock);
498
listener->factory.lock = NULL;
501
if (listener->factory.pool) {
502
pj_pool_t *pool = listener->factory.pool;
504
PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener destroyed"));
506
listener->factory.pool = NULL;
507
pj_pool_release(pool);
514
/***************************************************************************/
522
/* Called by transport manager to send message */
523
static pj_status_t tcp_send_msg(pjsip_transport *transport,
524
pjsip_tx_data *tdata,
525
const pj_sockaddr_t *rem_addr,
528
pjsip_transport_callback callback);
530
/* Called by transport manager to shutdown */
531
static pj_status_t tcp_shutdown(pjsip_transport *transport);
533
/* Called by transport manager to destroy transport */
534
static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
536
/* Utility to destroy transport */
537
static pj_status_t tcp_destroy(pjsip_transport *transport,
540
/* Callback on incoming data */
541
static pj_bool_t on_data_read(pj_activesock_t *asock,
545
pj_size_t *remainder);
547
/* Callback when packet is sent */
548
static pj_bool_t on_data_sent(pj_activesock_t *asock,
549
pj_ioqueue_op_key_t *send_key,
552
/* Callback when connect completes */
553
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
556
/* TCP keep-alive timer callback */
557
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);
560
* Common function to create TCP transport, called when pending accept() and
561
* pending connect() complete.
563
static pj_status_t tcp_create( struct tcp_listener *listener,
565
pj_sock_t sock, pj_bool_t is_server,
566
const pj_sockaddr *local,
567
const pj_sockaddr *remote,
568
struct tcp_transport **p_tcp)
570
struct tcp_transport *tcp;
571
pj_ioqueue_t *ioqueue;
572
pj_activesock_cfg asock_cfg;
573
pj_activesock_cb tcp_callback;
574
const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
575
char print_addr[PJ_INET6_ADDRSTRLEN+10];
579
PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
583
pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
584
POOL_TP_INIT, POOL_TP_INC);
585
PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
589
* Create and initialize basic transport structure.
591
tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
592
tcp->is_server = is_server;
594
/*tcp->listener = listener;*/
595
pj_list_init(&tcp->delayed_list);
596
tcp->base.pool = pool;
598
pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,
599
(is_server ? "tcps%p" :"tcpc%p"), tcp);
601
status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
602
if (status != PJ_SUCCESS) {
606
status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
607
if (status != PJ_SUCCESS) {
611
tcp->base.key.type = listener->factory.type;
612
pj_sockaddr_cp(&tcp->base.key.rem_addr, remote);
613
tcp->base.type_name = (char*)pjsip_transport_get_type_name(
614
(pjsip_transport_type_e)tcp->base.key.type);
615
tcp->base.flag = pjsip_transport_get_flag_from_type(
616
(pjsip_transport_type_e)tcp->base.key.type);
618
tcp->base.info = (char*) pj_pool_alloc(pool, 64);
619
pj_ansi_snprintf(tcp->base.info, 64, "%s to %s",
621
pj_sockaddr_print(remote, print_addr,
622
sizeof(print_addr), 3));
624
tcp->base.addr_len = pj_sockaddr_get_len(remote);
625
pj_sockaddr_cp(&tcp->base.local_addr, local);
626
sockaddr_to_host_port(pool, &tcp->base.local_name, local);
627
sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);
628
tcp->base.dir = is_server? PJSIP_TP_DIR_INCOMING : PJSIP_TP_DIR_OUTGOING;
630
tcp->base.endpt = listener->endpt;
631
tcp->base.tpmgr = listener->tpmgr;
632
tcp->base.send_msg = &tcp_send_msg;
633
tcp->base.do_shutdown = &tcp_shutdown;
634
tcp->base.destroy = &tcp_destroy_transport;
636
/* Create active socket */
637
pj_activesock_cfg_default(&asock_cfg);
638
asock_cfg.async_cnt = 1;
640
pj_bzero(&tcp_callback, sizeof(tcp_callback));
641
tcp_callback.on_data_read = &on_data_read;
642
tcp_callback.on_data_sent = &on_data_sent;
643
tcp_callback.on_connect_complete = &on_connect_complete;
645
ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
646
status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
647
ioqueue, &tcp_callback, tcp, &tcp->asock);
648
if (status != PJ_SUCCESS) {
652
/* Register transport to transport manager */
653
status = pjsip_transport_register(listener->tpmgr, &tcp->base);
654
if (status != PJ_SUCCESS) {
658
tcp->is_registered = PJ_TRUE;
660
/* Initialize keep-alive timer */
661
tcp->ka_timer.user_data = (void*)tcp;
662
tcp->ka_timer.cb = &tcp_keep_alive_timer;
663
pj_ioqueue_op_key_init(&tcp->ka_op_key.key, sizeof(pj_ioqueue_op_key_t));
664
pj_strdup(tcp->base.pool, &tcp->ka_pkt, &ka_pkt);
666
/* Done setting up basic transport. */
669
PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
670
(tcp->is_server ? "server" : "client")));
675
tcp_destroy(&tcp->base, status);
680
/* Flush all delayed transmision once the socket is connected. */
681
static void tcp_flush_pending_tx(struct tcp_transport *tcp)
685
pj_gettickcount(&now);
686
pj_lock_acquire(tcp->base.lock);
687
while (!pj_list_empty(&tcp->delayed_list)) {
688
struct delayed_tdata *pending_tx;
689
pjsip_tx_data *tdata;
690
pj_ioqueue_op_key_t *op_key;
694
pending_tx = tcp->delayed_list.next;
695
pj_list_erase(pending_tx);
697
tdata = pending_tx->tdata_op_key->tdata;
698
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
700
if (pending_tx->timeout.sec > 0 &&
701
PJ_TIME_VAL_GT(now, pending_tx->timeout))
707
size = tdata->buf.cur - tdata->buf.start;
708
status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start,
710
if (status != PJ_EPENDING) {
711
pj_lock_release(tcp->base.lock);
712
on_data_sent(tcp->asock, op_key, size);
713
pj_lock_acquire(tcp->base.lock);
717
pj_lock_release(tcp->base.lock);
721
/* Called by transport manager to destroy transport */
722
static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
724
struct tcp_transport *tcp = (struct tcp_transport*)transport;
726
/* Transport would have been unregistered by now since this callback
727
* is called by transport manager.
729
tcp->is_registered = PJ_FALSE;
731
return tcp_destroy(transport, tcp->close_reason);
735
/* Destroy TCP transport */
736
static pj_status_t tcp_destroy(pjsip_transport *transport,
739
struct tcp_transport *tcp = (struct tcp_transport*)transport;
741
if (tcp->close_reason == 0)
742
tcp->close_reason = reason;
744
if (tcp->is_registered) {
745
tcp->is_registered = PJ_FALSE;
746
pjsip_transport_destroy(transport);
748
/* pjsip_transport_destroy will recursively call this function
754
/* Mark transport as closing */
755
tcp->is_closing = PJ_TRUE;
757
/* Stop keep-alive timer. */
758
if (tcp->ka_timer.id) {
759
pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
760
tcp->ka_timer.id = PJ_FALSE;
763
/* Cancel all delayed transmits */
764
while (!pj_list_empty(&tcp->delayed_list)) {
765
struct delayed_tdata *pending_tx;
766
pj_ioqueue_op_key_t *op_key;
768
pending_tx = tcp->delayed_list.next;
769
pj_list_erase(pending_tx);
771
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
773
on_data_sent(tcp->asock, op_key, -reason);
776
if (tcp->rdata.tp_info.pool) {
777
pj_pool_release(tcp->rdata.tp_info.pool);
778
tcp->rdata.tp_info.pool = NULL;
782
pj_activesock_close(tcp->asock);
784
tcp->sock = PJ_INVALID_SOCKET;
785
} else if (tcp->sock != PJ_INVALID_SOCKET) {
786
pj_sock_close(tcp->sock);
787
tcp->sock = PJ_INVALID_SOCKET;
790
if (tcp->base.lock) {
791
pj_lock_destroy(tcp->base.lock);
792
tcp->base.lock = NULL;
795
if (tcp->base.ref_cnt) {
796
pj_atomic_destroy(tcp->base.ref_cnt);
797
tcp->base.ref_cnt = NULL;
800
if (tcp->base.pool) {
803
if (reason != PJ_SUCCESS) {
804
char errmsg[PJ_ERR_MSG_SIZE];
806
pj_strerror(reason, errmsg, sizeof(errmsg));
807
PJ_LOG(4,(tcp->base.obj_name,
808
"TCP transport destroyed with reason %d: %s",
813
PJ_LOG(4,(tcp->base.obj_name,
814
"TCP transport destroyed normally"));
818
pool = tcp->base.pool;
819
tcp->base.pool = NULL;
820
pj_pool_release(pool);
828
* This utility function creates receive data buffers and start
829
* asynchronous recv() operations from the socket. It is called after
830
* accept() or connect() operation complete.
832
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
836
pj_sockaddr *rem_addr;
841
pool = pjsip_endpt_create_pool(tcp->base.endpt,
843
PJSIP_POOL_RDATA_LEN,
844
PJSIP_POOL_RDATA_INC);
846
tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
850
tcp->rdata.tp_info.pool = pool;
852
tcp->rdata.tp_info.transport = &tcp->base;
853
tcp->rdata.tp_info.tp_data = tcp;
854
tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
855
pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key,
856
sizeof(pj_ioqueue_op_key_t));
858
tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
859
tcp->rdata.pkt_info.src_addr_len = sizeof(tcp->rdata.pkt_info.src_addr);
860
rem_addr = &tcp->base.key.rem_addr;
861
pj_sockaddr_print(rem_addr, tcp->rdata.pkt_info.src_name,
862
sizeof(tcp->rdata.pkt_info.src_name), 0);
863
tcp->rdata.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
865
size = sizeof(tcp->rdata.pkt_info.packet);
866
readbuf[0] = tcp->rdata.pkt_info.packet;
867
status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size,
869
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
870
PJ_LOG(4, (tcp->base.obj_name,
871
"pj_activesock_start_read() error, status=%d",
880
/* This callback is called by transport manager for the TCP factory
881
* to create outgoing transport to the specified destination.
883
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
885
pjsip_endpoint *endpt,
886
const pj_sockaddr *rem_addr,
888
pjsip_transport **p_transport)
890
struct tcp_listener *listener;
891
struct tcp_transport *tcp;
893
pj_sockaddr local_addr;
897
PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
898
addr_len && p_transport, PJ_EINVAL);
900
/* Check that address is a sockaddr_in or sockaddr_in6*/
901
PJ_ASSERT_RETURN((rem_addr->addr.sa_family == pj_AF_INET() &&
902
addr_len == sizeof(pj_sockaddr_in)) ||
903
(rem_addr->addr.sa_family == pj_AF_INET6() &&
904
addr_len == sizeof(pj_sockaddr_in6)), PJ_EINVAL);
907
listener = (struct tcp_listener*)factory;
910
status = pj_sock_socket(rem_addr->addr.sa_family, pj_SOCK_STREAM(),
912
if (status != PJ_SUCCESS)
915
/* Apply QoS, if specified */
916
status = pj_sock_apply_qos2(sock, listener->qos_type,
917
&listener->qos_params,
918
2, listener->factory.obj_name,
919
"outgoing SIP TCP socket");
921
/* Bind to listener's address and any port */
922
pj_bzero(&local_addr, sizeof(local_addr));
923
pj_sockaddr_cp(&local_addr, &listener->bound_addr);
924
pj_sockaddr_set_port(&local_addr, 0);
926
status = pj_sock_bind(sock, &local_addr,
927
pj_sockaddr_get_len(&local_addr));
928
if (status != PJ_SUCCESS) {
933
/* Get the local port */
934
addr_len = sizeof(local_addr);
935
status = pj_sock_getsockname(sock, &local_addr, &addr_len);
936
if (status != PJ_SUCCESS) {
941
/* Initially set the address from the listener's address */
942
if (!pj_sockaddr_has_addr(&local_addr)) {
943
pj_sockaddr_copy_addr(&local_addr, &listener->factory.local_addr);
946
/* Create the transport descriptor */
947
status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr,
949
if (status != PJ_SUCCESS)
953
/* Start asynchronous connect() operation */
954
tcp->has_pending_connect = PJ_TRUE;
955
status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr,
957
if (status == PJ_SUCCESS) {
958
on_connect_complete(tcp->asock, PJ_SUCCESS);
959
} else if (status != PJ_EPENDING) {
960
tcp_destroy(&tcp->base, status);
964
if (tcp->has_pending_connect) {
965
/* Update (again) local address, just in case local address currently
966
* set is different now that asynchronous connect() is started.
968
addr_len = sizeof(local_addr);
969
if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) {
970
pj_sockaddr *tp_addr = &tcp->base.local_addr;
972
/* Some systems (like old Win32 perhaps) may not set local address
973
* properly before socket is fully connected.
975
if (pj_sockaddr_cmp(tp_addr, &local_addr) &&
976
pj_sockaddr_has_addr(&local_addr) &&
977
pj_sockaddr_get_port(&local_addr) != 0)
979
pj_sockaddr_cp(tp_addr, &local_addr);
980
sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
985
PJ_LOG(4,(tcp->base.obj_name,
986
"TCP transport %.*s:%d is connecting to %.*s:%d...",
987
(int)tcp->base.local_name.host.slen,
988
tcp->base.local_name.host.ptr,
989
tcp->base.local_name.port,
990
(int)tcp->base.remote_name.host.slen,
991
tcp->base.remote_name.host.ptr,
992
tcp->base.remote_name.port));
996
*p_transport = &tcp->base;
1003
* This callback is called by active socket when pending accept() operation
1006
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
1008
const pj_sockaddr_t *src_addr,
1011
struct tcp_listener *listener;
1012
struct tcp_transport *tcp;
1013
char addr[PJ_INET6_ADDRSTRLEN+10];
1014
pjsip_tp_state_callback state_cb;
1015
pj_sockaddr tmp_src_addr;
1018
PJ_UNUSED_ARG(src_addr_len);
1020
listener = (struct tcp_listener*) pj_activesock_get_user_data(asock);
1022
PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE);
1024
PJ_LOG(4,(listener->factory.obj_name,
1025
"TCP listener %.*s:%d: got incoming TCP connection "
1027
(int)listener->factory.addr_name.host.slen,
1028
listener->factory.addr_name.host.ptr,
1029
listener->factory.addr_name.port,
1030
pj_sockaddr_print(src_addr, addr, sizeof(addr), 3),
1033
/* Apply QoS, if specified */
1034
status = pj_sock_apply_qos2(sock, listener->qos_type,
1035
&listener->qos_params,
1036
2, listener->factory.obj_name,
1037
"incoming SIP TCP socket");
1039
/* tcp_create() expects pj_sockaddr, so copy src_addr to temporary var,
1042
pj_bzero(&tmp_src_addr, sizeof(tmp_src_addr));
1043
pj_sockaddr_cp(&tmp_src_addr, src_addr);
1046
* Incoming connection!
1047
* Create TCP transport for the new socket.
1049
status = tcp_create( listener, NULL, sock, PJ_TRUE,
1050
&listener->factory.local_addr,
1051
&tmp_src_addr, &tcp);
1052
if (status == PJ_SUCCESS) {
1053
status = tcp_start_read(tcp);
1054
if (status != PJ_SUCCESS) {
1055
PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
1056
tcp_destroy(&tcp->base, status);
1058
/* Start keep-alive timer */
1059
if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1060
pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
1061
pjsip_endpt_schedule_timer(listener->endpt,
1064
tcp->ka_timer.id = PJ_TRUE;
1065
pj_gettimeofday(&tcp->last_activity);
1068
/* Notify application of transport state accepted */
1069
state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1071
pjsip_transport_state_info state_info;
1073
pj_bzero(&state_info, sizeof(state_info));
1074
(*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1084
* Callback from ioqueue when packet is sent.
1086
static pj_bool_t on_data_sent(pj_activesock_t *asock,
1087
pj_ioqueue_op_key_t *op_key,
1088
pj_ssize_t bytes_sent)
1090
struct tcp_transport *tcp = (struct tcp_transport*)
1091
pj_activesock_get_user_data(asock);
1092
pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
1094
/* Note that op_key may be the op_key from keep-alive, thus
1095
* it will not have tdata etc.
1098
tdata_op_key->tdata = NULL;
1100
if (tdata_op_key->callback) {
1102
* Notify sip_transport.c that packet has been sent.
1104
if (bytes_sent == 0)
1105
bytes_sent = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1107
tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
1109
/* Mark last activity time */
1110
pj_gettimeofday(&tcp->last_activity);
1114
/* Check for error/closure */
1115
if (bytes_sent <= 0) {
1118
PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1121
status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
1122
(pj_status_t)-bytes_sent;
1124
tcp_init_shutdown(tcp, status);
1134
* This callback is called by transport manager to send SIP message
1136
static pj_status_t tcp_send_msg(pjsip_transport *transport,
1137
pjsip_tx_data *tdata,
1138
const pj_sockaddr_t *rem_addr,
1141
pjsip_transport_callback callback)
1143
struct tcp_transport *tcp = (struct tcp_transport*)transport;
1145
pj_bool_t delayed = PJ_FALSE;
1146
pj_status_t status = PJ_SUCCESS;
1149
PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
1151
/* Check that there's no pending operation associated with the tdata */
1152
PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
1154
/* Check the address is supported */
1155
PJ_ASSERT_RETURN(rem_addr && (addr_len==sizeof(pj_sockaddr_in) ||
1156
addr_len==sizeof(pj_sockaddr_in6)),
1160
tdata->op_key.tdata = tdata;
1161
tdata->op_key.token = token;
1162
tdata->op_key.callback = callback;
1164
/* If asynchronous connect() has not completed yet, just put the
1165
* transmit data in the pending transmission list since we can not
1166
* use the socket yet.
1168
if (tcp->has_pending_connect) {
1171
* Looks like connect() is still in progress. Check again (this time
1172
* with holding the lock) to be sure.
1174
pj_lock_acquire(tcp->base.lock);
1176
if (tcp->has_pending_connect) {
1177
struct delayed_tdata *delayed_tdata;
1180
* connect() is still in progress. Put the transmit data to
1182
* Starting from #1583 (https://trac.pjsip.org/repos/ticket/1583),
1183
* we also add timeout value for the transmit data. When the
1184
* connect() is completed, the timeout value will be checked to
1185
* determine whether the transmit data needs to be sent.
1187
delayed_tdata = PJ_POOL_ZALLOC_T(tdata->pool,
1188
struct delayed_tdata);
1189
delayed_tdata->tdata_op_key = &tdata->op_key;
1190
if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) {
1191
pj_gettickcount(&delayed_tdata->timeout);
1192
delayed_tdata->timeout.msec += pjsip_cfg()->tsx.td;
1193
pj_time_val_normalize(&delayed_tdata->timeout);
1196
pj_list_push_back(&tcp->delayed_list, delayed_tdata);
1197
status = PJ_EPENDING;
1199
/* Prevent pj_activesock_send() from being called below */
1203
pj_lock_release(tcp->base.lock);
1208
* Transport is ready to go. Send the packet to ioqueue to be
1209
* sent asynchronously.
1211
size = tdata->buf.cur - tdata->buf.start;
1212
status = pj_activesock_send(tcp->asock,
1213
(pj_ioqueue_op_key_t*)&tdata->op_key,
1214
tdata->buf.start, &size, 0);
1216
if (status != PJ_EPENDING) {
1217
/* Not pending (could be immediate success or error) */
1218
tdata->op_key.tdata = NULL;
1220
/* Shutdown transport on closure/errors */
1223
PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1226
if (status == PJ_SUCCESS)
1227
status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1229
tcp_init_shutdown(tcp, status);
1239
* This callback is called by transport manager to shutdown transport.
1241
static pj_status_t tcp_shutdown(pjsip_transport *transport)
1243
struct tcp_transport *tcp = (struct tcp_transport*)transport;
1245
/* Stop keep-alive timer. */
1246
if (tcp->ka_timer.id) {
1247
pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
1248
tcp->ka_timer.id = PJ_FALSE;
1256
* Callback from ioqueue that an incoming data is received from the socket.
1258
/*
 * Active socket read callback: hands received bytes to the transport
 * manager and keeps any unparsed tail at the front of the receive buffer.
 *
 * asock       Active socket whose user data is the struct tcp_transport.
 * remainder   Output. Set to the number of received bytes that the
 *             transport manager did not consume (size - size_eaten).
 *
 * The body also uses `data`, `size` and `status`:
 *  - data is only compared against rdata->pkt_info.packet in an assertion;
 *  - size is stored as the packet length;
 *  - status selects between the "packet received" path (PJ_SUCCESS) and
 *    the "connection closed" path, which calls tcp_init_shutdown().
 * NOTE(review): the declarations of those three parameters and the return
 * statements are not visible here; confirm them against the full function.
 *
 * Nothing is processed when tcp->is_closing is set.
 */
static pj_bool_t on_data_read(pj_activesock_t *asock,
1262
pj_size_t *remainder)
1264
/* NOTE(review): MAX_IMMEDIATE_PACKET is not referenced by any visible line
 * of this function.
 */
enum { MAX_IMMEDIATE_PACKET = 10 };
1265
struct tcp_transport *tcp;
1266
pjsip_rx_data *rdata;
1268
PJ_UNUSED_ARG(data);
1270
tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1271
/* All reads for this transport go through the rdata embedded in it. */
rdata = &tcp->rdata;
1273
/* Don't do anything if transport is closing. */
1274
if (tcp->is_closing) {
1279
/* Houston, we have packet! Report the packet to transport manager
1282
if (status == PJ_SUCCESS) {
1283
pj_size_t size_eaten;
1285
/* Mark this as an activity */
1286
pj_gettimeofday(&tcp->last_activity);
1288
/* The callback's data pointer is expected to be the transport's own
 * packet buffer; the code below accesses it through rdata only.
 */
pj_assert((void*)rdata->pkt_info.packet == data);
1290
/* Init pkt_info part. */
1291
rdata->pkt_info.len = size;
1292
rdata->pkt_info.zero = 0;
1293
pj_gettimeofday(&rdata->pkt_info.timestamp);
1295
/* Report to transport manager.
1296
* The transport manager will tell us how many bytes of the packet
1297
* have been processed (as valid SIP message).
1300
pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
1303
pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
1305
/* Move unprocessed data to the front of the buffer */
1306
*remainder = size - size_eaten;
1307
/* Shift only when some bytes were consumed (*remainder != size) and some
 * are left over (*remainder > 0). If nothing was consumed the leftover
 * bytes are already at the start of the buffer.
 */
if (*remainder > 0 && *remainder != size) {
1308
pj_memmove(rdata->pkt_info.packet,
1309
rdata->pkt_info.packet + size_eaten,
1315
/* Transport is closed */
1316
PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
1318
tcp_init_shutdown(tcp, status);
1325
/* Reset the pool attached to the receive data (rdata->tp_info.pool). */
pj_pool_reset(rdata->tp_info.pool);
1332
* Callback from ioqueue when asynchronous connect() operation completes.
1334
/*
 * Active socket callback invoked when the asynchronous connect() finishes.
 *
 * asock   Active socket whose user data is the struct tcp_transport.
 *
 * The body also uses `status` as the connect() result.
 * NOTE(review): its declaration, the declarations of `addr`/`addrlen` and
 * the return statements are not visible here; confirm them against the
 * full function.
 *
 * Visible behaviour:
 *  - has_pending_connect is cleared first, whatever the outcome.
 *  - On failure: the error is logged, every entry of tcp->delayed_list is
 *    removed and reported through on_data_sent() with -status, then the
 *    transport is shut down with tcp_init_shutdown().
 *  - On success: the connection is logged, the local address/name is
 *    refreshed from pj_sock_getsockname() when it reports a different
 *    address, reading is started with tcp_start_read() (a failure shuts
 *    the transport down), PJSIP_TP_STATE_CONNECTED is passed to the state
 *    callback obtained from the transport manager, queued transmissions
 *    are flushed, and the keep-alive timer is scheduled when
 *    PJSIP_TCP_KEEP_ALIVE_INTERVAL is non-zero.
 */
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
1337
struct tcp_transport *tcp;
1340
pjsip_tp_state_callback state_cb;
1342
tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1344
/* Mark that pending connect() operation has completed. */
1345
tcp->has_pending_connect = PJ_FALSE;
1347
/* Check connect() status */
1348
if (status != PJ_SUCCESS) {
1350
tcp_perror(tcp->base.obj_name, "TCP connect() error", status);
1352
/* Cancel all delayed transmits */
1353
while (!pj_list_empty(&tcp->delayed_list)) {
1354
struct delayed_tdata *pending_tx;
1355
pj_ioqueue_op_key_t *op_key;
1357
/* Take the entry at the head of the list and unlink it. */
pending_tx = tcp->delayed_list.next;
1358
pj_list_erase(pending_tx);
1360
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
1362
/* The negated status is passed where on_data_sent() takes its last
 * argument. NOTE(review): presumably a negative value there means
 * "send failed with this error" -- confirm against on_data_sent().
 */
on_data_sent(tcp->asock, op_key, -status);
1365
tcp_init_shutdown(tcp, status);
1369
PJ_LOG(4,(tcp->base.obj_name,
1370
"TCP transport %.*s:%d is connected to %.*s:%d",
1371
(int)tcp->base.local_name.host.slen,
1372
tcp->base.local_name.host.ptr,
1373
tcp->base.local_name.port,
1374
(int)tcp->base.remote_name.host.slen,
1375
tcp->base.remote_name.host.ptr,
1376
tcp->base.remote_name.port));
1379
/* Update (again) local address, just in case local address currently
1380
* set is different now that the socket is connected (could happen
1381
* on some systems, like old Win32 probably?).
1383
addrlen = sizeof(addr);
1384
if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {
1385
pj_sockaddr *tp_addr = &tcp->base.local_addr;
1387
if (pj_sockaddr_has_addr(&addr) &&
1388
pj_sockaddr_cmp(&addr, tp_addr) != 0)
1390
pj_sockaddr_cp(tp_addr, &addr);
1391
sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
1396
/* Start pending read */
1397
status = tcp_start_read(tcp);
1398
if (status != PJ_SUCCESS) {
1399
tcp_init_shutdown(tcp, status);
1403
/* Notify application of transport state connected */
1404
state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1406
pjsip_transport_state_info state_info;
1408
/* The state info is passed zero-filled; no field is set for this event. */
pj_bzero(&state_info, sizeof(state_info));
1409
(*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1412
/* Flush all pending send operations */
1413
tcp_flush_pending_tx(tcp);
1415
/* Start keep-alive timer */
1416
if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1417
pj_time_val delay = { PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0 };
1418
pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1420
/* Mark the timer as scheduled; tcp_shutdown() tests this flag before
 * cancelling the timer.
 */
tcp->ka_timer.id = PJ_TRUE;
1421
/* Record the connection time as the latest activity; the keep-alive
 * callback subtracts last_activity from the current time.
 */
pj_gettimeofday(&tcp->last_activity);
1427
/*
 * Transport keep-alive timer callback.
 *
 * th   Timer heap that fired the entry; not referenced by any visible line
 *      of the body.
 * e    Timer entry whose user_data is the struct tcp_transport.
 *
 * Visible behaviour:
 *  1. The time elapsed since tcp->last_activity is computed.
 *  2. If the elapsed whole seconds are greater than zero and less than
 *     PJSIP_TCP_KEEP_ALIVE_INTERVAL, the timer is rescheduled for the
 *     remaining seconds instead of sending.
 *  3. Otherwise tcp->ka_pkt is sent with pj_activesock_send(). A status
 *     other than PJ_SUCCESS or PJ_EPENDING is logged and shuts the
 *     transport down with tcp_init_shutdown().
 *  4. The timer is rescheduled for a full interval.
 *
 * NOTE(review): the declarations of `delay`, `now`, `size` and `status`,
 * the delay argument of the scheduling calls, and the early exits after
 * steps 2 and 3 are not visible here; confirm them against the full
 * function.
 */
1428
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e)
1430
struct tcp_transport *tcp = (struct tcp_transport*) e->user_data;
1438
tcp->ka_timer.id = PJ_TRUE;
1440
/* After the subtraction, `now` holds the time elapsed since the last
 * recorded activity rather than a wall-clock time.
 */
pj_gettimeofday(&now);
1441
PJ_TIME_VAL_SUB(now, tcp->last_activity);
1443
/* NOTE(review): when the last activity was less than one second ago
 * (now.sec == 0) this condition is false, so execution continues to the
 * send path below -- confirm that this is intended.
 */
if (now.sec > 0 && now.sec < PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1444
/* There has been activity, so don't send keep-alive */
1445
delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL - now.sec;
1448
pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1450
tcp->ka_timer.id = PJ_TRUE;
1454
PJ_LOG(5,(tcp->base.obj_name, "Sending %d byte(s) keep-alive to %.*s:%d",
1455
(int)tcp->ka_pkt.slen, (int)tcp->base.remote_name.host.slen,
1456
tcp->base.remote_name.host.ptr,
1457
tcp->base.remote_name.port));
1460
/* Send the keep-alive packet using the transport's dedicated op key. */
size = tcp->ka_pkt.slen;
1461
status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key,
1462
tcp->ka_pkt.ptr, &size, 0);
1464
/* PJ_EPENDING is not treated as an error here; only other non-success
 * statuses lead to shutdown.
 */
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1465
tcp_perror(tcp->base.obj_name,
1466
"Error sending keep-alive packet", status);
1467
tcp_init_shutdown(tcp, status);
1471
/* Register next keep-alive */
1472
delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL;
1475
pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1477
tcp->ka_timer.id = PJ_TRUE;
1481
#endif /* PJ_HAS_TCP */