~ubuntu-branches/ubuntu/wily/sflphone/wily

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.2.1/pjsip/src/pjsip/sip_transport_tcp.c

  • Committer: Package Import Robot
  • Author(s): Jonathan Riddell
  • Date: 2015-01-07 14:51:16 UTC
  • mfrom: (4.3.5 sid)
  • Revision ID: package-import@ubuntu.com-20150107145116-yxnafinf4lrdvrmx
Tags: 1.4.1-0.1ubuntu1
* Merge with Debian, remaining changes:
 - Drop soprano, nepomuk build-dep
* Drop ubuntu patches, now upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* $Id: sip_transport_tcp.c 4725 2014-02-04 04:45:37Z ming $ */
 
2
/* 
 
3
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 
4
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify
 
7
 * it under the terms of the GNU General Public License as published by
 
8
 * the Free Software Foundation; either version 2 of the License, or
 
9
 * (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 
19
 */
 
20
#include <pjsip/sip_transport_tcp.h>
 
21
#include <pjsip/sip_endpoint.h>
 
22
#include <pjsip/sip_errno.h>
 
23
#include <pj/compat/socket.h>
 
24
#include <pj/addr_resolv.h>
 
25
#include <pj/activesock.h>
 
26
#include <pj/assert.h>
 
27
#include <pj/lock.h>
 
28
#include <pj/log.h>
 
29
#include <pj/os.h>
 
30
#include <pj/pool.h>
 
31
#include <pj/string.h>
 
32
 
 
33
/* Only declare the API if PJ_HAS_TCP is true */
 
34
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
 
35
 
 
36
 
 
37
#define THIS_FILE       "sip_transport_tcp.c"
 
38
 
 
39
#define MAX_ASYNC_CNT   16
 
40
#define POOL_LIS_INIT   512
 
41
#define POOL_LIS_INC    512
 
42
#define POOL_TP_INIT    512
 
43
#define POOL_TP_INC     512
 
44
 
 
45
struct tcp_listener;
 
46
struct tcp_transport;
 
47
 
 
48
 
 
49
/*
 
50
 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
 
51
 * SIP transport factory).
 
52
 */
 
53
struct tcp_listener
 
54
{
 
55
    pjsip_tpfactory          factory;
 
56
    pj_bool_t                is_registered;
 
57
    pjsip_endpoint          *endpt;
 
58
    pjsip_tpmgr             *tpmgr;
 
59
    pj_activesock_t         *asock;
 
60
    pj_sockaddr              bound_addr;
 
61
    pj_qos_type              qos_type;
 
62
    pj_qos_params            qos_params;
 
63
};
 
64
 
 
65
 
 
66
/*
 
67
 * This structure is used to keep delayed transmit operation in a list.
 
68
 * A delayed transmission occurs when application sends tx_data when
 
69
 * the TCP connect/establishment is still in progress. These delayed
 
70
 * transmission will be "flushed" once the socket is connected (either
 
71
 * successfully or with errors).
 
72
 */
 
73
struct delayed_tdata
 
74
{
 
75
    PJ_DECL_LIST_MEMBER(struct delayed_tdata);
 
76
    pjsip_tx_data_op_key    *tdata_op_key;
 
77
    pj_time_val              timeout;
 
78
};
 
79
 
 
80
 
 
81
/*
 
82
 * This structure describes the TCP transport, and it's descendant of
 
83
 * pjsip_transport.
 
84
 */
 
85
struct tcp_transport
 
86
{
 
87
    pjsip_transport          base;
 
88
    pj_bool_t                is_server;
 
89
 
 
90
    /* Do not save listener instance in the transport, because
 
91
     * listener might be destroyed during transport's lifetime.
 
92
     * See http://trac.pjsip.org/repos/ticket/491
 
93
    struct tcp_listener     *listener;
 
94
     */
 
95
 
 
96
    pj_bool_t                is_registered;
 
97
    pj_bool_t                is_closing;
 
98
    pj_status_t              close_reason;
 
99
    pj_sock_t                sock;
 
100
    pj_activesock_t         *asock;
 
101
    pj_bool_t                has_pending_connect;
 
102
 
 
103
    /* Keep-alive timer. */
 
104
    pj_timer_entry           ka_timer;
 
105
    pj_time_val              last_activity;
 
106
    pjsip_tx_data_op_key     ka_op_key;
 
107
    pj_str_t                 ka_pkt;
 
108
 
 
109
    /* TCP transport can only have  one rdata!
 
110
     * Otherwise chunks of incoming PDU may be received on different
 
111
     * buffer.
 
112
     */
 
113
    pjsip_rx_data            rdata;
 
114
 
 
115
    /* Pending transmission list. */
 
116
    struct delayed_tdata     delayed_list;
 
117
};
 
118
 
 
119
 
 
120
/****************************************************************************
 
121
 * PROTOTYPES
 
122
 */
 
123
 
 
124
/* This callback is called when pending accept() operation completes. */
 
125
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
 
126
                                    pj_sock_t newsock,
 
127
                                    const pj_sockaddr_t *src_addr,
 
128
                                    int src_addr_len);
 
129
 
 
130
/* This callback is called by transport manager to destroy listener */
 
131
static pj_status_t lis_destroy(pjsip_tpfactory *factory);
 
132
 
 
133
/* This callback is called by transport manager to create transport */
 
134
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
 
135
                                        pjsip_tpmgr *mgr,
 
136
                                        pjsip_endpoint *endpt,
 
137
                                        const pj_sockaddr *rem_addr,
 
138
                                        int addr_len,
 
139
                                        pjsip_transport **transport);
 
140
 
 
141
/* Common function to create and initialize transport */
 
142
static pj_status_t tcp_create(struct tcp_listener *listener,
 
143
                              pj_pool_t *pool,
 
144
                              pj_sock_t sock, pj_bool_t is_server,
 
145
                              const pj_sockaddr *local,
 
146
                              const pj_sockaddr *remote,
 
147
                              struct tcp_transport **p_tcp);
 
148
 
 
149
 
 
150
static void tcp_perror(const char *sender, const char *title,
 
151
                       pj_status_t status)
 
152
{
 
153
    char errmsg[PJ_ERR_MSG_SIZE];
 
154
 
 
155
    pj_strerror(status, errmsg, sizeof(errmsg));
 
156
 
 
157
    PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status));
 
158
}
 
159
 
 
160
 
 
161
static void sockaddr_to_host_port( pj_pool_t *pool,
 
162
                                   pjsip_host_port *host_port,
 
163
                                   const pj_sockaddr *addr )
 
164
{
 
165
    host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
 
166
    pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0);
 
167
    host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
 
168
    host_port->port = pj_sockaddr_get_port(addr);
 
169
}
 
170
 
 
171
 
 
172
static void tcp_init_shutdown(struct tcp_transport *tcp, pj_status_t status)
 
173
{
 
174
    pjsip_tp_state_callback state_cb;
 
175
 
 
176
    if (tcp->close_reason == PJ_SUCCESS)
 
177
        tcp->close_reason = status;
 
178
 
 
179
    if (tcp->base.is_shutdown || tcp->base.is_destroying)
 
180
        return;
 
181
 
 
182
    /* Prevent immediate transport destroy by application, as transport
 
183
     * state notification callback may be stacked and transport instance
 
184
     * must remain valid at any point in the callback.
 
185
     */
 
186
    pjsip_transport_add_ref(&tcp->base);
 
187
 
 
188
    /* Notify application of transport disconnected state */
 
189
    state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
 
190
    if (state_cb) {
 
191
        pjsip_transport_state_info state_info;
 
192
 
 
193
        pj_bzero(&state_info, sizeof(state_info));
 
194
        state_info.status = tcp->close_reason;
 
195
        (*state_cb)(&tcp->base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
 
196
    }
 
197
 
 
198
    /* check again */
 
199
    if (tcp->base.is_shutdown || tcp->base.is_destroying) {
 
200
        pjsip_transport_dec_ref(&tcp->base);
 
201
        return;
 
202
    }
 
203
 
 
204
    /* We can not destroy the transport since high level objects may
 
205
     * still keep reference to this transport. So we can only 
 
206
     * instruct transport manager to gracefully start the shutdown
 
207
     * procedure for this transport.
 
208
     */
 
209
    pjsip_transport_shutdown(&tcp->base);
 
210
 
 
211
    /* Now, it is ok to destroy the transport. */
 
212
    pjsip_transport_dec_ref(&tcp->base);
 
213
}
 
214
 
 
215
 
 
216
/*
 
217
 * Initialize pjsip_tcp_transport_cfg structure with default values.
 
218
 */
 
219
PJ_DEF(void) pjsip_tcp_transport_cfg_default(pjsip_tcp_transport_cfg *cfg,
 
220
                                             int af)
 
221
{
 
222
    pj_bzero(cfg, sizeof(*cfg));
 
223
    cfg->af = af;
 
224
    pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0);
 
225
    cfg->async_cnt = 1;
 
226
    cfg->reuse_addr = PJSIP_TCP_TRANSPORT_REUSEADDR;
 
227
}
 
228
 
 
229
 
 
230
/****************************************************************************
 
231
 * The TCP listener/transport factory.
 
232
 */
 
233
 
 
234
/*
 
235
 * This is the public API to create, initialize, register, and start the
 
236
 * TCP listener.
 
237
 */
 
238
PJ_DEF(pj_status_t) pjsip_tcp_transport_start3(
 
239
                                        pjsip_endpoint *endpt,
 
240
                                        const pjsip_tcp_transport_cfg *cfg,
 
241
                                        pjsip_tpfactory **p_factory
 
242
                                        )
 
243
{
 
244
    pj_pool_t *pool;
 
245
    pj_sock_t sock = PJ_INVALID_SOCKET;
 
246
    struct tcp_listener *listener;
 
247
    pj_activesock_cfg asock_cfg;
 
248
    pj_activesock_cb listener_cb;
 
249
    pj_sockaddr *listener_addr;
 
250
    int addr_len;
 
251
    pj_status_t status;
 
252
 
 
253
    /* Sanity check */
 
254
    PJ_ASSERT_RETURN(endpt && cfg->async_cnt, PJ_EINVAL);
 
255
 
 
256
    /* Verify that address given in a_name (if any) is valid */
 
257
    if (cfg->addr_name.host.slen) {
 
258
        pj_sockaddr tmp;
 
259
 
 
260
        status = pj_sockaddr_init(cfg->af, &tmp, &cfg->addr_name.host, 
 
261
                                  (pj_uint16_t)cfg->addr_name.port);
 
262
        if (status != PJ_SUCCESS || !pj_sockaddr_has_addr(&tmp) ||
 
263
            (cfg->af==pj_AF_INET() && 
 
264
             tmp.ipv4.sin_addr.s_addr==PJ_INADDR_NONE)) 
 
265
        {
 
266
            /* Invalid address */
 
267
            return PJ_EINVAL;
 
268
        }
 
269
    }
 
270
 
 
271
    pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT, 
 
272
                                   POOL_LIS_INC);
 
273
    PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
 
274
 
 
275
 
 
276
    listener = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
 
277
    listener->factory.pool = pool;
 
278
    listener->factory.type = cfg->af==pj_AF_INET() ? PJSIP_TRANSPORT_TCP :
 
279
                                                     PJSIP_TRANSPORT_TCP6;
 
280
    listener->factory.type_name = (char*)
 
281
                pjsip_transport_get_type_name(listener->factory.type);
 
282
    listener->factory.flag = 
 
283
        pjsip_transport_get_flag_from_type(listener->factory.type);
 
284
    listener->qos_type = cfg->qos_type;
 
285
    pj_memcpy(&listener->qos_params, &cfg->qos_params,
 
286
              sizeof(cfg->qos_params));
 
287
 
 
288
    pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
 
289
    if (listener->factory.type==PJSIP_TRANSPORT_TCP6)
 
290
        pj_ansi_strcat(listener->factory.obj_name, "6");
 
291
 
 
292
    status = pj_lock_create_recursive_mutex(pool, listener->factory.obj_name,
 
293
                                            &listener->factory.lock);
 
294
    if (status != PJ_SUCCESS)
 
295
        goto on_error;
 
296
 
 
297
 
 
298
    /* Create socket */
 
299
    status = pj_sock_socket(cfg->af, pj_SOCK_STREAM(), 0, &sock);
 
300
    if (status != PJ_SUCCESS)
 
301
        goto on_error;
 
302
 
 
303
    /* Apply QoS, if specified */
 
304
    status = pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params, 
 
305
                                2, listener->factory.obj_name, 
 
306
                                "SIP TCP listener socket");
 
307
 
 
308
    /* Apply SO_REUSEADDR */
 
309
    if (cfg->reuse_addr) {
 
310
        int enabled = 1;
 
311
        status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_REUSEADDR(),
 
312
                                    &enabled, sizeof(enabled));
 
313
        if (status != PJ_SUCCESS) {
 
314
            PJ_PERROR(4,(listener->factory.obj_name, status,
 
315
                         "Warning: error applying SO_REUSEADDR"));
 
316
        }
 
317
    }
 
318
 
 
319
    /* Bind address may be different than factory.local_addr because
 
320
     * factory.local_addr will be resolved below.
 
321
     */
 
322
    pj_sockaddr_cp(&listener->bound_addr, &cfg->bind_addr);
 
323
 
 
324
    /* Bind socket */
 
325
    listener_addr = &listener->factory.local_addr;
 
326
    pj_sockaddr_cp(listener_addr, &cfg->bind_addr);
 
327
 
 
328
    status = pj_sock_bind(sock, listener_addr, 
 
329
                          pj_sockaddr_get_len(listener_addr));
 
330
    if (status != PJ_SUCCESS)
 
331
        goto on_error;
 
332
 
 
333
    /* Retrieve the bound address */
 
334
    addr_len = pj_sockaddr_get_len(listener_addr);
 
335
    status = pj_sock_getsockname(sock, listener_addr, &addr_len);
 
336
    if (status != PJ_SUCCESS)
 
337
        goto on_error;
 
338
 
 
339
    /* If published host/IP is specified, then use that address as the
 
340
     * listener advertised address.
 
341
     */
 
342
    if (cfg->addr_name.host.slen) {
 
343
        /* Copy the address */
 
344
        listener->factory.addr_name = cfg->addr_name;
 
345
        pj_strdup(listener->factory.pool, &listener->factory.addr_name.host, 
 
346
                  &cfg->addr_name.host);
 
347
        listener->factory.addr_name.port = cfg->addr_name.port;
 
348
 
 
349
    } else {
 
350
        /* No published address is given, use the bound address */
 
351
 
 
352
        /* If the address returns 0.0.0.0, use the default
 
353
         * interface address as the transport's address.
 
354
         */
 
355
        if (!pj_sockaddr_has_addr(listener_addr)) {
 
356
            pj_sockaddr hostip;
 
357
 
 
358
            status = pj_gethostip(listener->bound_addr.addr.sa_family,
 
359
                                  &hostip);
 
360
            if (status != PJ_SUCCESS)
 
361
                goto on_error;
 
362
 
 
363
            pj_sockaddr_copy_addr(listener_addr, &hostip);
 
364
        }
 
365
 
 
366
        /* Save the address name */
 
367
        sockaddr_to_host_port(listener->factory.pool, 
 
368
                              &listener->factory.addr_name, 
 
369
                              listener_addr);
 
370
    }
 
371
 
 
372
    /* If port is zero, get the bound port */
 
373
    if (listener->factory.addr_name.port == 0) {
 
374
        listener->factory.addr_name.port = pj_sockaddr_get_port(listener_addr);
 
375
    }
 
376
 
 
377
    pj_ansi_snprintf(listener->factory.obj_name, 
 
378
                     sizeof(listener->factory.obj_name),
 
379
                     "tcplis:%d",  listener->factory.addr_name.port);
 
380
 
 
381
 
 
382
    /* Start listening to the address */
 
383
    status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG);
 
384
    if (status != PJ_SUCCESS)
 
385
        goto on_error;
 
386
 
 
387
 
 
388
    /* Create active socket */
 
389
    pj_activesock_cfg_default(&asock_cfg);
 
390
    if (cfg->async_cnt > MAX_ASYNC_CNT) 
 
391
        asock_cfg.async_cnt = MAX_ASYNC_CNT;
 
392
    else
 
393
        asock_cfg.async_cnt = cfg->async_cnt;
 
394
 
 
395
    pj_bzero(&listener_cb, sizeof(listener_cb));
 
396
    listener_cb.on_accept_complete = &on_accept_complete;
 
397
    status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
 
398
                                  pjsip_endpt_get_ioqueue(endpt), 
 
399
                                  &listener_cb, listener,
 
400
                                  &listener->asock);
 
401
 
 
402
    /* Register to transport manager */
 
403
    listener->endpt = endpt;
 
404
    listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
 
405
    listener->factory.create_transport = lis_create_transport;
 
406
    listener->factory.destroy = lis_destroy;
 
407
    listener->is_registered = PJ_TRUE;
 
408
    status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
 
409
                                            &listener->factory);
 
410
    if (status != PJ_SUCCESS) {
 
411
        listener->is_registered = PJ_FALSE;
 
412
        goto on_error;
 
413
    }
 
414
 
 
415
    /* Start pending accept() operations */
 
416
    status = pj_activesock_start_accept(listener->asock, pool);
 
417
    if (status != PJ_SUCCESS)
 
418
        goto on_error;
 
419
 
 
420
    PJ_LOG(4,(listener->factory.obj_name, 
 
421
             "SIP TCP listener ready for incoming connections at %.*s:%d",
 
422
             (int)listener->factory.addr_name.host.slen,
 
423
             listener->factory.addr_name.host.ptr,
 
424
             listener->factory.addr_name.port));
 
425
 
 
426
    /* Return the pointer to user */
 
427
    if (p_factory) *p_factory = &listener->factory;
 
428
 
 
429
    return PJ_SUCCESS;
 
430
 
 
431
on_error:
 
432
    if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET)
 
433
        pj_sock_close(sock);
 
434
    lis_destroy(&listener->factory);
 
435
    return status;
 
436
}
 
437
 
 
438
 
 
439
/*
 
440
 * This is the public API to create, initialize, register, and start the
 
441
 * TCP listener.
 
442
 */
 
443
PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
 
444
                                               const pj_sockaddr_in *local,
 
445
                                               const pjsip_host_port *a_name,
 
446
                                               unsigned async_cnt,
 
447
                                               pjsip_tpfactory **p_factory)
 
448
{
 
449
    pjsip_tcp_transport_cfg cfg;
 
450
 
 
451
    pjsip_tcp_transport_cfg_default(&cfg, pj_AF_INET());
 
452
 
 
453
    if (local)
 
454
        pj_sockaddr_cp(&cfg.bind_addr, local);
 
455
    else
 
456
        pj_sockaddr_init(cfg.af, &cfg.bind_addr, NULL, 0);
 
457
 
 
458
    if (a_name)
 
459
        pj_memcpy(&cfg.addr_name, a_name, sizeof(*a_name));
 
460
 
 
461
    if (async_cnt)
 
462
        cfg.async_cnt = async_cnt;
 
463
 
 
464
    return pjsip_tcp_transport_start3(endpt, &cfg, p_factory);
 
465
}
 
466
 
 
467
 
 
468
/*
 
469
 * This is the public API to create, initialize, register, and start the
 
470
 * TCP listener.
 
471
 */
 
472
PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
 
473
                                               const pj_sockaddr_in *local,
 
474
                                               unsigned async_cnt,
 
475
                                               pjsip_tpfactory **p_factory)
 
476
{
 
477
    return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory);
 
478
}
 
479
 
 
480
 
 
481
/* This callback is called by transport manager to destroy listener */
 
482
static pj_status_t lis_destroy(pjsip_tpfactory *factory)
 
483
{
 
484
    struct tcp_listener *listener = (struct tcp_listener *)factory;
 
485
 
 
486
    if (listener->is_registered) {
 
487
        pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
 
488
        listener->is_registered = PJ_FALSE;
 
489
    }
 
490
 
 
491
    if (listener->asock) {
 
492
        pj_activesock_close(listener->asock);
 
493
        listener->asock = NULL;
 
494
    }
 
495
 
 
496
    if (listener->factory.lock) {
 
497
        pj_lock_destroy(listener->factory.lock);
 
498
        listener->factory.lock = NULL;
 
499
    }
 
500
 
 
501
    if (listener->factory.pool) {
 
502
        pj_pool_t *pool = listener->factory.pool;
 
503
 
 
504
        PJ_LOG(4,(listener->factory.obj_name,  "SIP TCP listener destroyed"));
 
505
 
 
506
        listener->factory.pool = NULL;
 
507
        pj_pool_release(pool);
 
508
    }
 
509
 
 
510
    return PJ_SUCCESS;
 
511
}
 
512
 
 
513
 
 
514
/***************************************************************************/
 
515
/*
 
516
 * TCP Transport
 
517
 */
 
518
 
 
519
/*
 
520
 * Prototypes.
 
521
 */
 
522
/* Called by transport manager to send message */
 
523
static pj_status_t tcp_send_msg(pjsip_transport *transport, 
 
524
                                pjsip_tx_data *tdata,
 
525
                                const pj_sockaddr_t *rem_addr,
 
526
                                int addr_len,
 
527
                                void *token,
 
528
                                pjsip_transport_callback callback);
 
529
 
 
530
/* Called by transport manager to shutdown */
 
531
static pj_status_t tcp_shutdown(pjsip_transport *transport);
 
532
 
 
533
/* Called by transport manager to destroy transport */
 
534
static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
 
535
 
 
536
/* Utility to destroy transport */
 
537
static pj_status_t tcp_destroy(pjsip_transport *transport,
 
538
                               pj_status_t reason);
 
539
 
 
540
/* Callback on incoming data */
 
541
static pj_bool_t on_data_read(pj_activesock_t *asock,
 
542
                              void *data,
 
543
                              pj_size_t size,
 
544
                              pj_status_t status,
 
545
                              pj_size_t *remainder);
 
546
 
 
547
/* Callback when packet is sent */
 
548
static pj_bool_t on_data_sent(pj_activesock_t *asock,
 
549
                              pj_ioqueue_op_key_t *send_key,
 
550
                              pj_ssize_t sent);
 
551
 
 
552
/* Callback when connect completes */
 
553
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
 
554
                                     pj_status_t status);
 
555
 
 
556
/* TCP keep-alive timer callback */
 
557
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);
 
558
 
 
559
/*
 
560
 * Common function to create TCP transport, called when pending accept() and
 
561
 * pending connect() complete.
 
562
 */
 
563
static pj_status_t tcp_create( struct tcp_listener *listener,
 
564
                               pj_pool_t *pool,
 
565
                               pj_sock_t sock, pj_bool_t is_server,
 
566
                               const pj_sockaddr *local,
 
567
                               const pj_sockaddr *remote,
 
568
                               struct tcp_transport **p_tcp)
 
569
{
 
570
    struct tcp_transport *tcp;
 
571
    pj_ioqueue_t *ioqueue;
 
572
    pj_activesock_cfg asock_cfg;
 
573
    pj_activesock_cb tcp_callback;
 
574
    const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
 
575
    char print_addr[PJ_INET6_ADDRSTRLEN+10];
 
576
    pj_status_t status;
 
577
    
 
578
 
 
579
    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
 
580
 
 
581
 
 
582
    if (pool == NULL) {
 
583
        pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
 
584
                                       POOL_TP_INIT, POOL_TP_INC);
 
585
        PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
 
586
    }    
 
587
 
 
588
    /*
 
589
     * Create and initialize basic transport structure.
 
590
     */
 
591
    tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
 
592
    tcp->is_server = is_server;
 
593
    tcp->sock = sock;
 
594
    /*tcp->listener = listener;*/
 
595
    pj_list_init(&tcp->delayed_list);
 
596
    tcp->base.pool = pool;
 
597
 
 
598
    pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, 
 
599
                     (is_server ? "tcps%p" :"tcpc%p"), tcp);
 
600
 
 
601
    status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
 
602
    if (status != PJ_SUCCESS) {
 
603
        goto on_error;
 
604
    }
 
605
 
 
606
    status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
 
607
    if (status != PJ_SUCCESS) {
 
608
        goto on_error;
 
609
    }
 
610
 
 
611
    tcp->base.key.type = listener->factory.type;
 
612
    pj_sockaddr_cp(&tcp->base.key.rem_addr, remote);
 
613
    tcp->base.type_name = (char*)pjsip_transport_get_type_name(
 
614
                                (pjsip_transport_type_e)tcp->base.key.type);
 
615
    tcp->base.flag = pjsip_transport_get_flag_from_type(
 
616
                                (pjsip_transport_type_e)tcp->base.key.type);
 
617
 
 
618
    tcp->base.info = (char*) pj_pool_alloc(pool, 64);
 
619
    pj_ansi_snprintf(tcp->base.info, 64, "%s to %s",
 
620
                     tcp->base.type_name,
 
621
                     pj_sockaddr_print(remote, print_addr,
 
622
                                       sizeof(print_addr), 3));
 
623
 
 
624
    tcp->base.addr_len = pj_sockaddr_get_len(remote);
 
625
    pj_sockaddr_cp(&tcp->base.local_addr, local);
 
626
    sockaddr_to_host_port(pool, &tcp->base.local_name, local);
 
627
    sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);
 
628
    tcp->base.dir = is_server? PJSIP_TP_DIR_INCOMING : PJSIP_TP_DIR_OUTGOING;
 
629
 
 
630
    tcp->base.endpt = listener->endpt;
 
631
    tcp->base.tpmgr = listener->tpmgr;
 
632
    tcp->base.send_msg = &tcp_send_msg;
 
633
    tcp->base.do_shutdown = &tcp_shutdown;
 
634
    tcp->base.destroy = &tcp_destroy_transport;
 
635
 
 
636
    /* Create active socket */
 
637
    pj_activesock_cfg_default(&asock_cfg);
 
638
    asock_cfg.async_cnt = 1;
 
639
 
 
640
    pj_bzero(&tcp_callback, sizeof(tcp_callback));
 
641
    tcp_callback.on_data_read = &on_data_read;
 
642
    tcp_callback.on_data_sent = &on_data_sent;
 
643
    tcp_callback.on_connect_complete = &on_connect_complete;
 
644
 
 
645
    ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
 
646
    status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
 
647
                                  ioqueue, &tcp_callback, tcp, &tcp->asock);
 
648
    if (status != PJ_SUCCESS) {
 
649
        goto on_error;
 
650
    }
 
651
 
 
652
    /* Register transport to transport manager */
 
653
    status = pjsip_transport_register(listener->tpmgr, &tcp->base);
 
654
    if (status != PJ_SUCCESS) {
 
655
        goto on_error;
 
656
    }
 
657
 
 
658
    tcp->is_registered = PJ_TRUE;
 
659
 
 
660
    /* Initialize keep-alive timer */
 
661
    tcp->ka_timer.user_data = (void*)tcp;
 
662
    tcp->ka_timer.cb = &tcp_keep_alive_timer;
 
663
    pj_ioqueue_op_key_init(&tcp->ka_op_key.key, sizeof(pj_ioqueue_op_key_t));
 
664
    pj_strdup(tcp->base.pool, &tcp->ka_pkt, &ka_pkt);
 
665
 
 
666
    /* Done setting up basic transport. */
 
667
    *p_tcp = tcp;
 
668
 
 
669
    PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
 
670
              (tcp->is_server ? "server" : "client")));
 
671
 
 
672
    return PJ_SUCCESS;
 
673
 
 
674
on_error:
 
675
    tcp_destroy(&tcp->base, status);
 
676
    return status;
 
677
}
 
678
 
 
679
 
 
680
/* Flush all delayed transmision once the socket is connected. */
 
681
static void tcp_flush_pending_tx(struct tcp_transport *tcp)
 
682
{
 
683
    pj_time_val now;
 
684
 
 
685
    pj_gettickcount(&now);
 
686
    pj_lock_acquire(tcp->base.lock);
 
687
    while (!pj_list_empty(&tcp->delayed_list)) {
 
688
        struct delayed_tdata *pending_tx;
 
689
        pjsip_tx_data *tdata;
 
690
        pj_ioqueue_op_key_t *op_key;
 
691
        pj_ssize_t size;
 
692
        pj_status_t status;
 
693
 
 
694
        pending_tx = tcp->delayed_list.next;
 
695
        pj_list_erase(pending_tx);
 
696
 
 
697
        tdata = pending_tx->tdata_op_key->tdata;
 
698
        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
 
699
 
 
700
        if (pending_tx->timeout.sec > 0 &&
 
701
            PJ_TIME_VAL_GT(now, pending_tx->timeout))
 
702
        {
 
703
            continue;
 
704
        }
 
705
 
 
706
        /* send! */
 
707
        size = tdata->buf.cur - tdata->buf.start;
 
708
        status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start, 
 
709
                                    &size, 0);
 
710
        if (status != PJ_EPENDING) {
 
711
            pj_lock_release(tcp->base.lock);
 
712
            on_data_sent(tcp->asock, op_key, size);
 
713
            pj_lock_acquire(tcp->base.lock);
 
714
        }
 
715
 
 
716
    }
 
717
    pj_lock_release(tcp->base.lock);
 
718
}
 
719
 
 
720
 
 
721
/* Called by transport manager to destroy transport */
 
722
static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
 
723
{
 
724
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
725
 
 
726
    /* Transport would have been unregistered by now since this callback
 
727
     * is called by transport manager.
 
728
     */
 
729
    tcp->is_registered = PJ_FALSE;
 
730
 
 
731
    return tcp_destroy(transport, tcp->close_reason);
 
732
}
 
733
 
 
734
 
 
735
/* Destroy TCP transport */
 
736
static pj_status_t tcp_destroy(pjsip_transport *transport, 
 
737
                               pj_status_t reason)
 
738
{
 
739
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
740
 
 
741
    if (tcp->close_reason == 0)
 
742
        tcp->close_reason = reason;
 
743
 
 
744
    if (tcp->is_registered) {
 
745
        tcp->is_registered = PJ_FALSE;
 
746
        pjsip_transport_destroy(transport);
 
747
 
 
748
        /* pjsip_transport_destroy will recursively call this function
 
749
         * again.
 
750
         */
 
751
        return PJ_SUCCESS;
 
752
    }
 
753
 
 
754
    /* Mark transport as closing */
 
755
    tcp->is_closing = PJ_TRUE;
 
756
 
 
757
    /* Stop keep-alive timer. */
 
758
    if (tcp->ka_timer.id) {
 
759
        pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
 
760
        tcp->ka_timer.id = PJ_FALSE;
 
761
    }
 
762
 
 
763
    /* Cancel all delayed transmits */
 
764
    while (!pj_list_empty(&tcp->delayed_list)) {
 
765
        struct delayed_tdata *pending_tx;
 
766
        pj_ioqueue_op_key_t *op_key;
 
767
 
 
768
        pending_tx = tcp->delayed_list.next;
 
769
        pj_list_erase(pending_tx);
 
770
 
 
771
        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
 
772
 
 
773
        on_data_sent(tcp->asock, op_key, -reason);
 
774
    }
 
775
 
 
776
    if (tcp->rdata.tp_info.pool) {
 
777
        pj_pool_release(tcp->rdata.tp_info.pool);
 
778
        tcp->rdata.tp_info.pool = NULL;
 
779
    }
 
780
 
 
781
    if (tcp->asock) {
 
782
        pj_activesock_close(tcp->asock);
 
783
        tcp->asock = NULL;
 
784
        tcp->sock = PJ_INVALID_SOCKET;
 
785
    } else if (tcp->sock != PJ_INVALID_SOCKET) {
 
786
        pj_sock_close(tcp->sock);
 
787
        tcp->sock = PJ_INVALID_SOCKET;
 
788
    }
 
789
 
 
790
    if (tcp->base.lock) {
 
791
        pj_lock_destroy(tcp->base.lock);
 
792
        tcp->base.lock = NULL;
 
793
    }
 
794
 
 
795
    if (tcp->base.ref_cnt) {
 
796
        pj_atomic_destroy(tcp->base.ref_cnt);
 
797
        tcp->base.ref_cnt = NULL;
 
798
    }
 
799
 
 
800
    if (tcp->base.pool) {
 
801
        pj_pool_t *pool;
 
802
 
 
803
        if (reason != PJ_SUCCESS) {
 
804
            char errmsg[PJ_ERR_MSG_SIZE];
 
805
 
 
806
            pj_strerror(reason, errmsg, sizeof(errmsg));
 
807
            PJ_LOG(4,(tcp->base.obj_name, 
 
808
                      "TCP transport destroyed with reason %d: %s", 
 
809
                      reason, errmsg));
 
810
 
 
811
        } else {
 
812
 
 
813
            PJ_LOG(4,(tcp->base.obj_name, 
 
814
                      "TCP transport destroyed normally"));
 
815
 
 
816
        }
 
817
 
 
818
        pool = tcp->base.pool;
 
819
        tcp->base.pool = NULL;
 
820
        pj_pool_release(pool);
 
821
    }
 
822
 
 
823
    return PJ_SUCCESS;
 
824
}
 
825
 
 
826
 
 
827
/*
 
828
 * This utility function creates receive data buffers and start
 
829
 * asynchronous recv() operations from the socket. It is called after
 
830
 * accept() or connect() operation complete.
 
831
 */
 
832
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
 
833
{
 
834
    pj_pool_t *pool;
 
835
    pj_uint32_t size;
 
836
    pj_sockaddr *rem_addr;
 
837
    void *readbuf[1];
 
838
    pj_status_t status;
 
839
 
 
840
    /* Init rdata */
 
841
    pool = pjsip_endpt_create_pool(tcp->base.endpt,
 
842
                                   "rtd%p",
 
843
                                   PJSIP_POOL_RDATA_LEN,
 
844
                                   PJSIP_POOL_RDATA_INC);
 
845
    if (!pool) {
 
846
        tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
 
847
        return PJ_ENOMEM;
 
848
    }
 
849
 
 
850
    tcp->rdata.tp_info.pool = pool;
 
851
 
 
852
    tcp->rdata.tp_info.transport = &tcp->base;
 
853
    tcp->rdata.tp_info.tp_data = tcp;
 
854
    tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
 
855
    pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key, 
 
856
                           sizeof(pj_ioqueue_op_key_t));
 
857
 
 
858
    tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
 
859
    tcp->rdata.pkt_info.src_addr_len = sizeof(tcp->rdata.pkt_info.src_addr);
 
860
    rem_addr = &tcp->base.key.rem_addr;
 
861
    pj_sockaddr_print(rem_addr, tcp->rdata.pkt_info.src_name,
 
862
                      sizeof(tcp->rdata.pkt_info.src_name), 0);
 
863
    tcp->rdata.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
 
864
 
 
865
    size = sizeof(tcp->rdata.pkt_info.packet);
 
866
    readbuf[0] = tcp->rdata.pkt_info.packet;
 
867
    status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size,
 
868
                                       readbuf, 0);
 
869
    if (status != PJ_SUCCESS && status != PJ_EPENDING) {
 
870
        PJ_LOG(4, (tcp->base.obj_name, 
 
871
                   "pj_activesock_start_read() error, status=%d", 
 
872
                   status));
 
873
        return status;
 
874
    }
 
875
 
 
876
    return PJ_SUCCESS;
 
877
}
 
878
 
 
879
 
 
880
/* This callback is called by transport manager for the TCP factory
 
881
 * to create outgoing transport to the specified destination.
 
882
 */
 
883
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
 
884
                                        pjsip_tpmgr *mgr,
 
885
                                        pjsip_endpoint *endpt,
 
886
                                        const pj_sockaddr *rem_addr,
 
887
                                        int addr_len,
 
888
                                        pjsip_transport **p_transport)
 
889
{
 
890
    struct tcp_listener *listener;
 
891
    struct tcp_transport *tcp;
 
892
    pj_sock_t sock;
 
893
    pj_sockaddr local_addr;
 
894
    pj_status_t status;
 
895
 
 
896
    /* Sanity checks */
 
897
    PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
 
898
                     addr_len && p_transport, PJ_EINVAL);
 
899
 
 
900
    /* Check that address is a sockaddr_in or sockaddr_in6*/
 
901
    PJ_ASSERT_RETURN((rem_addr->addr.sa_family == pj_AF_INET() &&
 
902
                      addr_len == sizeof(pj_sockaddr_in)) ||
 
903
                     (rem_addr->addr.sa_family == pj_AF_INET6() &&
 
904
                      addr_len == sizeof(pj_sockaddr_in6)), PJ_EINVAL);
 
905
 
 
906
 
 
907
    listener = (struct tcp_listener*)factory;
 
908
 
 
909
    /* Create socket */
 
910
    status = pj_sock_socket(rem_addr->addr.sa_family, pj_SOCK_STREAM(),
 
911
                            0, &sock);
 
912
    if (status != PJ_SUCCESS)
 
913
        return status;
 
914
 
 
915
    /* Apply QoS, if specified */
 
916
    status = pj_sock_apply_qos2(sock, listener->qos_type, 
 
917
                                &listener->qos_params, 
 
918
                                2, listener->factory.obj_name, 
 
919
                                "outgoing SIP TCP socket");
 
920
 
 
921
    /* Bind to listener's address and any port */
 
922
    pj_bzero(&local_addr, sizeof(local_addr));
 
923
    pj_sockaddr_cp(&local_addr, &listener->bound_addr);
 
924
    pj_sockaddr_set_port(&local_addr, 0);
 
925
 
 
926
    status = pj_sock_bind(sock, &local_addr,
 
927
                          pj_sockaddr_get_len(&local_addr));
 
928
    if (status != PJ_SUCCESS) {
 
929
        pj_sock_close(sock);
 
930
        return status;
 
931
    }
 
932
 
 
933
    /* Get the local port */
 
934
    addr_len = sizeof(local_addr);
 
935
    status = pj_sock_getsockname(sock, &local_addr, &addr_len);
 
936
    if (status != PJ_SUCCESS) {
 
937
        pj_sock_close(sock);
 
938
        return status;
 
939
    }
 
940
 
 
941
    /* Initially set the address from the listener's address */
 
942
    if (!pj_sockaddr_has_addr(&local_addr)) {
 
943
        pj_sockaddr_copy_addr(&local_addr, &listener->factory.local_addr);
 
944
    }
 
945
 
 
946
    /* Create the transport descriptor */
 
947
    status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr, 
 
948
                        rem_addr, &tcp);
 
949
    if (status != PJ_SUCCESS)
 
950
        return status;
 
951
 
 
952
 
 
953
    /* Start asynchronous connect() operation */
 
954
    tcp->has_pending_connect = PJ_TRUE;
 
955
    status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr,
 
956
                                         addr_len);
 
957
    if (status == PJ_SUCCESS) {
 
958
        on_connect_complete(tcp->asock, PJ_SUCCESS);
 
959
    } else if (status != PJ_EPENDING) {
 
960
        tcp_destroy(&tcp->base, status);
 
961
        return status;
 
962
    }
 
963
 
 
964
    if (tcp->has_pending_connect) {
 
965
        /* Update (again) local address, just in case local address currently
 
966
         * set is different now that asynchronous connect() is started.
 
967
         */
 
968
        addr_len = sizeof(local_addr);
 
969
        if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) {
 
970
            pj_sockaddr *tp_addr = &tcp->base.local_addr;
 
971
 
 
972
            /* Some systems (like old Win32 perhaps) may not set local address
 
973
             * properly before socket is fully connected.
 
974
             */
 
975
            if (pj_sockaddr_cmp(tp_addr, &local_addr) &&
 
976
                pj_sockaddr_has_addr(&local_addr) &&
 
977
                pj_sockaddr_get_port(&local_addr) != 0)
 
978
            {
 
979
                pj_sockaddr_cp(tp_addr, &local_addr);
 
980
                sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
 
981
                                      &local_addr);
 
982
            }
 
983
        }
 
984
        
 
985
        PJ_LOG(4,(tcp->base.obj_name, 
 
986
                  "TCP transport %.*s:%d is connecting to %.*s:%d...",
 
987
                  (int)tcp->base.local_name.host.slen,
 
988
                  tcp->base.local_name.host.ptr,
 
989
                  tcp->base.local_name.port,
 
990
                  (int)tcp->base.remote_name.host.slen,
 
991
                  tcp->base.remote_name.host.ptr,
 
992
                  tcp->base.remote_name.port));
 
993
    }
 
994
 
 
995
    /* Done */
 
996
    *p_transport = &tcp->base;
 
997
 
 
998
    return PJ_SUCCESS;
 
999
}
 
1000
 
 
1001
 
 
1002
/*
 
1003
 * This callback is called by active socket when pending accept() operation
 
1004
 * has completed.
 
1005
 */
 
1006
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
 
1007
                                    pj_sock_t sock,
 
1008
                                    const pj_sockaddr_t *src_addr,
 
1009
                                    int src_addr_len)
 
1010
{
 
1011
    struct tcp_listener *listener;
 
1012
    struct tcp_transport *tcp;
 
1013
    char addr[PJ_INET6_ADDRSTRLEN+10];
 
1014
    pjsip_tp_state_callback state_cb;
 
1015
    pj_sockaddr tmp_src_addr;
 
1016
    pj_status_t status;
 
1017
 
 
1018
    PJ_UNUSED_ARG(src_addr_len);
 
1019
 
 
1020
    listener = (struct tcp_listener*) pj_activesock_get_user_data(asock);
 
1021
 
 
1022
    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE);
 
1023
 
 
1024
    PJ_LOG(4,(listener->factory.obj_name, 
 
1025
              "TCP listener %.*s:%d: got incoming TCP connection "
 
1026
              "from %s, sock=%d",
 
1027
              (int)listener->factory.addr_name.host.slen,
 
1028
              listener->factory.addr_name.host.ptr,
 
1029
              listener->factory.addr_name.port,
 
1030
              pj_sockaddr_print(src_addr, addr, sizeof(addr), 3),
 
1031
              sock));
 
1032
 
 
1033
    /* Apply QoS, if specified */
 
1034
    status = pj_sock_apply_qos2(sock, listener->qos_type, 
 
1035
                                &listener->qos_params, 
 
1036
                                2, listener->factory.obj_name, 
 
1037
                                "incoming SIP TCP socket");
 
1038
 
 
1039
    /* tcp_create() expect pj_sockaddr, so copy src_addr to temporary var,
 
1040
     * just in case.
 
1041
     */
 
1042
    pj_bzero(&tmp_src_addr, sizeof(tmp_src_addr));
 
1043
    pj_sockaddr_cp(&tmp_src_addr, src_addr);
 
1044
 
 
1045
    /* 
 
1046
     * Incoming connection!
 
1047
     * Create TCP transport for the new socket.
 
1048
     */
 
1049
    status = tcp_create( listener, NULL, sock, PJ_TRUE,
 
1050
                         &listener->factory.local_addr,
 
1051
                         &tmp_src_addr, &tcp);
 
1052
    if (status == PJ_SUCCESS) {
 
1053
        status = tcp_start_read(tcp);
 
1054
        if (status != PJ_SUCCESS) {
 
1055
            PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
 
1056
            tcp_destroy(&tcp->base, status);
 
1057
        } else {
 
1058
            /* Start keep-alive timer */
 
1059
            if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
 
1060
                pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
 
1061
                pjsip_endpt_schedule_timer(listener->endpt, 
 
1062
                                           &tcp->ka_timer, 
 
1063
                                           &delay);
 
1064
                tcp->ka_timer.id = PJ_TRUE;
 
1065
                pj_gettimeofday(&tcp->last_activity);
 
1066
            }
 
1067
 
 
1068
            /* Notify application of transport state accepted */
 
1069
            state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
 
1070
            if (state_cb) {
 
1071
                pjsip_transport_state_info state_info;
 
1072
            
 
1073
                pj_bzero(&state_info, sizeof(state_info));
 
1074
                (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
 
1075
            }
 
1076
        }
 
1077
    }
 
1078
 
 
1079
    return PJ_TRUE;
 
1080
}
 
1081
 
 
1082
 
 
1083
/* 
 
1084
 * Callback from ioqueue when packet is sent.
 
1085
 */
 
1086
static pj_bool_t on_data_sent(pj_activesock_t *asock,
 
1087
                              pj_ioqueue_op_key_t *op_key,
 
1088
                              pj_ssize_t bytes_sent)
 
1089
{
 
1090
    struct tcp_transport *tcp = (struct tcp_transport*) 
 
1091
                                pj_activesock_get_user_data(asock);
 
1092
    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
 
1093
 
 
1094
    /* Note that op_key may be the op_key from keep-alive, thus
 
1095
     * it will not have tdata etc.
 
1096
     */
 
1097
 
 
1098
    tdata_op_key->tdata = NULL;
 
1099
 
 
1100
    if (tdata_op_key->callback) {
 
1101
        /*
 
1102
         * Notify sip_transport.c that packet has been sent.
 
1103
         */
 
1104
        if (bytes_sent == 0)
 
1105
            bytes_sent = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
 
1106
 
 
1107
        tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
 
1108
 
 
1109
        /* Mark last activity time */
 
1110
        pj_gettimeofday(&tcp->last_activity);
 
1111
 
 
1112
    }
 
1113
 
 
1114
    /* Check for error/closure */
 
1115
    if (bytes_sent <= 0) {
 
1116
        pj_status_t status;
 
1117
 
 
1118
        PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 
 
1119
                  bytes_sent));
 
1120
 
 
1121
        status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
 
1122
                                     (pj_status_t)-bytes_sent;
 
1123
 
 
1124
        tcp_init_shutdown(tcp, status);
 
1125
 
 
1126
        return PJ_FALSE;
 
1127
    }
 
1128
 
 
1129
    return PJ_TRUE;
 
1130
}
 
1131
 
 
1132
 
 
1133
/* 
 
1134
 * This callback is called by transport manager to send SIP message 
 
1135
 */
 
1136
static pj_status_t tcp_send_msg(pjsip_transport *transport, 
 
1137
                                pjsip_tx_data *tdata,
 
1138
                                const pj_sockaddr_t *rem_addr,
 
1139
                                int addr_len,
 
1140
                                void *token,
 
1141
                                pjsip_transport_callback callback)
 
1142
{
 
1143
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
1144
    pj_ssize_t size;
 
1145
    pj_bool_t delayed = PJ_FALSE;
 
1146
    pj_status_t status = PJ_SUCCESS;
 
1147
 
 
1148
    /* Sanity check */
 
1149
    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
 
1150
 
 
1151
    /* Check that there's no pending operation associated with the tdata */
 
1152
    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
 
1153
    
 
1154
    /* Check the address is supported */
 
1155
    PJ_ASSERT_RETURN(rem_addr && (addr_len==sizeof(pj_sockaddr_in) ||
 
1156
                                  addr_len==sizeof(pj_sockaddr_in6)),
 
1157
                     PJ_EINVAL);
 
1158
 
 
1159
    /* Init op key. */
 
1160
    tdata->op_key.tdata = tdata;
 
1161
    tdata->op_key.token = token;
 
1162
    tdata->op_key.callback = callback;
 
1163
 
 
1164
    /* If asynchronous connect() has not completed yet, just put the
 
1165
     * transmit data in the pending transmission list since we can not
 
1166
     * use the socket yet.
 
1167
     */
 
1168
    if (tcp->has_pending_connect) {
 
1169
 
 
1170
        /*
 
1171
         * Looks like connect() is still in progress. Check again (this time
 
1172
         * with holding the lock) to be sure.
 
1173
         */
 
1174
        pj_lock_acquire(tcp->base.lock);
 
1175
 
 
1176
        if (tcp->has_pending_connect) {
 
1177
            struct delayed_tdata *delayed_tdata;
 
1178
 
 
1179
            /*
 
1180
             * connect() is still in progress. Put the transmit data to
 
1181
             * the delayed list.
 
1182
             * Starting from #1583 (https://trac.pjsip.org/repos/ticket/1583),
 
1183
             * we also add timeout value for the transmit data. When the
 
1184
             * connect() is completed, the timeout value will be checked to
 
1185
             * determine whether the transmit data needs to be sent.
 
1186
             */
 
1187
            delayed_tdata = PJ_POOL_ZALLOC_T(tdata->pool, 
 
1188
                                             struct delayed_tdata);
 
1189
            delayed_tdata->tdata_op_key = &tdata->op_key;
 
1190
            if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) {
 
1191
                pj_gettickcount(&delayed_tdata->timeout);
 
1192
                delayed_tdata->timeout.msec += pjsip_cfg()->tsx.td;
 
1193
                pj_time_val_normalize(&delayed_tdata->timeout);
 
1194
            }
 
1195
 
 
1196
            pj_list_push_back(&tcp->delayed_list, delayed_tdata);
 
1197
            status = PJ_EPENDING;
 
1198
 
 
1199
            /* Prevent pj_ioqueue_send() to be called below */
 
1200
            delayed = PJ_TRUE;
 
1201
        }
 
1202
 
 
1203
        pj_lock_release(tcp->base.lock);
 
1204
    } 
 
1205
    
 
1206
    if (!delayed) {
 
1207
        /*
 
1208
         * Transport is ready to go. Send the packet to ioqueue to be
 
1209
         * sent asynchronously.
 
1210
         */
 
1211
        size = tdata->buf.cur - tdata->buf.start;
 
1212
        status = pj_activesock_send(tcp->asock, 
 
1213
                                    (pj_ioqueue_op_key_t*)&tdata->op_key,
 
1214
                                    tdata->buf.start, &size, 0);
 
1215
 
 
1216
        if (status != PJ_EPENDING) {
 
1217
            /* Not pending (could be immediate success or error) */
 
1218
            tdata->op_key.tdata = NULL;
 
1219
 
 
1220
            /* Shutdown transport on closure/errors */
 
1221
            if (size <= 0) {
 
1222
 
 
1223
                PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 
 
1224
                          size));
 
1225
 
 
1226
                if (status == PJ_SUCCESS) 
 
1227
                    status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
 
1228
 
 
1229
                tcp_init_shutdown(tcp, status);
 
1230
            }
 
1231
        }
 
1232
    }
 
1233
 
 
1234
    return status;
 
1235
}
 
1236
 
 
1237
 
 
1238
/* 
 
1239
 * This callback is called by transport manager to shutdown transport.
 
1240
 */
 
1241
static pj_status_t tcp_shutdown(pjsip_transport *transport)
 
1242
{
 
1243
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
1244
    
 
1245
    /* Stop keep-alive timer. */
 
1246
    if (tcp->ka_timer.id) {
 
1247
        pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
 
1248
        tcp->ka_timer.id = PJ_FALSE;
 
1249
    }
 
1250
 
 
1251
    return PJ_SUCCESS;
 
1252
}
 
1253
 
 
1254
 
 
1255
/* 
 
1256
 * Callback from ioqueue that an incoming data is received from the socket.
 
1257
 */
 
1258
static pj_bool_t on_data_read(pj_activesock_t *asock,
 
1259
                              void *data,
 
1260
                              pj_size_t size,
 
1261
                              pj_status_t status,
 
1262
                              pj_size_t *remainder)
 
1263
{
 
1264
    enum { MAX_IMMEDIATE_PACKET = 10 };
 
1265
    struct tcp_transport *tcp;
 
1266
    pjsip_rx_data *rdata;
 
1267
 
 
1268
    PJ_UNUSED_ARG(data);
 
1269
 
 
1270
    tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
 
1271
    rdata = &tcp->rdata;
 
1272
 
 
1273
    /* Don't do anything if transport is closing. */
 
1274
    if (tcp->is_closing) {
 
1275
        tcp->is_closing++;
 
1276
        return PJ_FALSE;
 
1277
    }
 
1278
 
 
1279
    /* Houston, we have packet! Report the packet to transport manager
 
1280
     * to be parsed.
 
1281
     */
 
1282
    if (status == PJ_SUCCESS) {
 
1283
        pj_size_t size_eaten;
 
1284
 
 
1285
        /* Mark this as an activity */
 
1286
        pj_gettimeofday(&tcp->last_activity);
 
1287
 
 
1288
        pj_assert((void*)rdata->pkt_info.packet == data);
 
1289
 
 
1290
        /* Init pkt_info part. */
 
1291
        rdata->pkt_info.len = size;
 
1292
        rdata->pkt_info.zero = 0;
 
1293
        pj_gettimeofday(&rdata->pkt_info.timestamp);
 
1294
 
 
1295
        /* Report to transport manager.
 
1296
         * The transport manager will tell us how many bytes of the packet
 
1297
         * have been processed (as valid SIP message).
 
1298
         */
 
1299
        size_eaten = 
 
1300
            pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 
 
1301
                                       rdata);
 
1302
 
 
1303
        pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
 
1304
 
 
1305
        /* Move unprocessed data to the front of the buffer */
 
1306
        *remainder = size - size_eaten;
 
1307
        if (*remainder > 0 && *remainder != size) {
 
1308
            pj_memmove(rdata->pkt_info.packet,
 
1309
                       rdata->pkt_info.packet + size_eaten,
 
1310
                       *remainder);
 
1311
        }
 
1312
 
 
1313
    } else {
 
1314
 
 
1315
        /* Transport is closed */
 
1316
        PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
 
1317
        
 
1318
        tcp_init_shutdown(tcp, status);
 
1319
 
 
1320
        return PJ_FALSE;
 
1321
 
 
1322
    }
 
1323
 
 
1324
    /* Reset pool. */
 
1325
    pj_pool_reset(rdata->tp_info.pool);
 
1326
 
 
1327
    return PJ_TRUE;
 
1328
}
 
1329
 
 
1330
 
 
1331
/* 
 
1332
 * Callback from ioqueue when asynchronous connect() operation completes.
 
1333
 */
 
1334
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
 
1335
                                     pj_status_t status)
 
1336
{
 
1337
    struct tcp_transport *tcp;
 
1338
    pj_sockaddr addr;
 
1339
    int addrlen;
 
1340
    pjsip_tp_state_callback state_cb;
 
1341
 
 
1342
    tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
 
1343
 
 
1344
    /* Mark that pending connect() operation has completed. */
 
1345
    tcp->has_pending_connect = PJ_FALSE;
 
1346
 
 
1347
    /* Check connect() status */
 
1348
    if (status != PJ_SUCCESS) {
 
1349
 
 
1350
        tcp_perror(tcp->base.obj_name, "TCP connect() error", status);
 
1351
 
 
1352
        /* Cancel all delayed transmits */
 
1353
        while (!pj_list_empty(&tcp->delayed_list)) {
 
1354
            struct delayed_tdata *pending_tx;
 
1355
            pj_ioqueue_op_key_t *op_key;
 
1356
 
 
1357
            pending_tx = tcp->delayed_list.next;
 
1358
            pj_list_erase(pending_tx);
 
1359
 
 
1360
            op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
 
1361
 
 
1362
            on_data_sent(tcp->asock, op_key, -status);
 
1363
        }
 
1364
 
 
1365
        tcp_init_shutdown(tcp, status);
 
1366
        return PJ_FALSE;
 
1367
    }
 
1368
 
 
1369
    PJ_LOG(4,(tcp->base.obj_name, 
 
1370
              "TCP transport %.*s:%d is connected to %.*s:%d",
 
1371
              (int)tcp->base.local_name.host.slen,
 
1372
              tcp->base.local_name.host.ptr,
 
1373
              tcp->base.local_name.port,
 
1374
              (int)tcp->base.remote_name.host.slen,
 
1375
              tcp->base.remote_name.host.ptr,
 
1376
              tcp->base.remote_name.port));
 
1377
 
 
1378
 
 
1379
    /* Update (again) local address, just in case local address currently
 
1380
     * set is different now that the socket is connected (could happen
 
1381
     * on some systems, like old Win32 probably?).
 
1382
     */
 
1383
    addrlen = sizeof(addr);
 
1384
    if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {
 
1385
        pj_sockaddr *tp_addr = &tcp->base.local_addr;
 
1386
 
 
1387
        if (pj_sockaddr_has_addr(&addr) &&
 
1388
            pj_sockaddr_cmp(&addr, tp_addr) != 0)
 
1389
        {
 
1390
            pj_sockaddr_cp(tp_addr, &addr);
 
1391
            sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
 
1392
                                  tp_addr);
 
1393
        }
 
1394
    }
 
1395
 
 
1396
    /* Start pending read */
 
1397
    status = tcp_start_read(tcp);
 
1398
    if (status != PJ_SUCCESS) {
 
1399
        tcp_init_shutdown(tcp, status);
 
1400
        return PJ_FALSE;
 
1401
    }
 
1402
 
 
1403
    /* Notify application of transport state connected */
 
1404
    state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
 
1405
    if (state_cb) {
 
1406
        pjsip_transport_state_info state_info;
 
1407
    
 
1408
        pj_bzero(&state_info, sizeof(state_info));
 
1409
        (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
 
1410
    }
 
1411
 
 
1412
    /* Flush all pending send operations */
 
1413
    tcp_flush_pending_tx(tcp);
 
1414
 
 
1415
    /* Start keep-alive timer */
 
1416
    if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
 
1417
        pj_time_val delay = { PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0 };
 
1418
        pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, 
 
1419
                                   &delay);
 
1420
        tcp->ka_timer.id = PJ_TRUE;
 
1421
        pj_gettimeofday(&tcp->last_activity);
 
1422
    }
 
1423
 
 
1424
    return PJ_TRUE;
 
1425
}
 
1426
 
 
1427
/* Transport keep-alive timer callback */
 
1428
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e)
 
1429
{
 
1430
    struct tcp_transport *tcp = (struct tcp_transport*) e->user_data;
 
1431
    pj_time_val delay;
 
1432
    pj_time_val now;
 
1433
    pj_ssize_t size;
 
1434
    pj_status_t status;
 
1435
 
 
1436
    PJ_UNUSED_ARG(th);
 
1437
 
 
1438
    tcp->ka_timer.id = PJ_TRUE;
 
1439
 
 
1440
    pj_gettimeofday(&now);
 
1441
    PJ_TIME_VAL_SUB(now, tcp->last_activity);
 
1442
 
 
1443
    if (now.sec > 0 && now.sec < PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
 
1444
        /* There has been activity, so don't send keep-alive */
 
1445
        delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL - now.sec;
 
1446
        delay.msec = 0;
 
1447
 
 
1448
        pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, 
 
1449
                                   &delay);
 
1450
        tcp->ka_timer.id = PJ_TRUE;
 
1451
        return;
 
1452
    }
 
1453
 
 
1454
    PJ_LOG(5,(tcp->base.obj_name, "Sending %d byte(s) keep-alive to %.*s:%d", 
 
1455
              (int)tcp->ka_pkt.slen, (int)tcp->base.remote_name.host.slen,
 
1456
              tcp->base.remote_name.host.ptr,
 
1457
              tcp->base.remote_name.port));
 
1458
 
 
1459
    /* Send the data */
 
1460
    size = tcp->ka_pkt.slen;
 
1461
    status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key,
 
1462
                                tcp->ka_pkt.ptr, &size, 0);
 
1463
 
 
1464
    if (status != PJ_SUCCESS && status != PJ_EPENDING) {
 
1465
        tcp_perror(tcp->base.obj_name, 
 
1466
                   "Error sending keep-alive packet", status);
 
1467
        tcp_init_shutdown(tcp, status);
 
1468
        return;
 
1469
    }
 
1470
 
 
1471
    /* Register next keep-alive */
 
1472
    delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL;
 
1473
    delay.msec = 0;
 
1474
 
 
1475
    pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, 
 
1476
                               &delay);
 
1477
    tcp->ka_timer.id = PJ_TRUE;
 
1478
}
 
1479
 
 
1480
 
 
1481
#endif  /* PJ_HAS_TCP */
 
1482