~ubuntu-branches/ubuntu/wily/sflphone/wily

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.1.0/pjsip/src/pjsip/sip_transport_tcp.c

  • Committer: Package Import Robot
  • Author(s): Mark Purcell
  • Date: 2014-01-28 18:23:36 UTC
  • mfrom: (1.1.11)
  • mto: This revision was merged to the branch mainline in revision 24.
  • Revision ID: package-import@ubuntu.com-20140128182336-3xenud1kbnwmf3mz
* New upstream release 
  - Fixes "New Upstream Release" (Closes: #735846)
  - Fixes "Ringtone does not stop" (Closes: #727164)
  - Fixes "[sflphone-kde] crash on startup" (Closes: #718178)
  - Fixes "sflphone GUI crashes when call is hung up" (Closes: #736583)
* Build-Depends: ensure GnuTLS 2.6
  - libucommon-dev (>= 6.0.7-1.1), libccrtp-dev (>= 2.0.6-3)
  - Fixes "FTBFS Build-Depends libgnutls{26,28}-dev" (Closes: #722040)
* Fix "boost 1.49 is going away" unversioned Build-Depends: (Closes: #736746)
* Add Build-Depends: libsndfile-dev, nepomuk-core-dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* $Id: sip_transport_tcp.c 4294 2012-11-06 05:02:10Z nanang $ */
 
2
/* 
 
3
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 
4
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify
 
7
 * it under the terms of the GNU General Public License as published by
 
8
 * the Free Software Foundation; either version 2 of the License, or
 
9
 * (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 
19
 */
 
20
#include <pjsip/sip_transport_tcp.h>
 
21
#include <pjsip/sip_endpoint.h>
 
22
#include <pjsip/sip_errno.h>
 
23
#include <pj/compat/socket.h>
 
24
#include <pj/addr_resolv.h>
 
25
#include <pj/activesock.h>
 
26
#include <pj/assert.h>
 
27
#include <pj/lock.h>
 
28
#include <pj/log.h>
 
29
#include <pj/os.h>
 
30
#include <pj/pool.h>
 
31
#include <pj/string.h>
 
32
 
 
33
/* Only declare the API if PJ_HAS_TCP is true */
 
34
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
 
35
 
 
36
 
 
37
#define THIS_FILE       "sip_transport_tcp.c"
 
38
 
 
39
#define MAX_ASYNC_CNT   16
 
40
#define POOL_LIS_INIT   512
 
41
#define POOL_LIS_INC    512
 
42
#define POOL_TP_INIT    512
 
43
#define POOL_TP_INC     512
 
44
 
 
45
struct tcp_listener;
 
46
struct tcp_transport;
 
47
 
 
48
 
 
49
/*
 
50
 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
 
51
 * SIP transport factory).
 
52
 */
 
53
struct tcp_listener
 
54
{
 
55
    pjsip_tpfactory          factory;
 
56
    pj_bool_t                is_registered;
 
57
    pjsip_endpoint          *endpt;
 
58
    pjsip_tpmgr             *tpmgr;
 
59
    pj_activesock_t         *asock;
 
60
    pj_sockaddr              bound_addr;
 
61
    pj_qos_type              qos_type;
 
62
    pj_qos_params            qos_params;
 
63
};
 
64
 
 
65
 
 
66
/*
 
67
 * This structure is used to keep delayed transmit operation in a list.
 
68
 * A delayed transmission occurs when application sends tx_data when
 
69
 * the TCP connect/establishment is still in progress. These delayed
 
70
 * transmission will be "flushed" once the socket is connected (either
 
71
 * successfully or with errors).
 
72
 */
 
73
struct delayed_tdata
 
74
{
 
75
    PJ_DECL_LIST_MEMBER(struct delayed_tdata);
 
76
    pjsip_tx_data_op_key    *tdata_op_key;
 
77
    pj_time_val              timeout;
 
78
};
 
79
 
 
80
 
 
81
/*
 
82
 * This structure describes the TCP transport, and it's descendant of
 
83
 * pjsip_transport.
 
84
 */
 
85
struct tcp_transport
 
86
{
 
87
    pjsip_transport          base;
 
88
    pj_bool_t                is_server;
 
89
 
 
90
    /* Do not save listener instance in the transport, because
 
91
     * listener might be destroyed during transport's lifetime.
 
92
     * See http://trac.pjsip.org/repos/ticket/491
 
93
    struct tcp_listener     *listener;
 
94
     */
 
95
 
 
96
    pj_bool_t                is_registered;
 
97
    pj_bool_t                is_closing;
 
98
    pj_status_t              close_reason;
 
99
    pj_sock_t                sock;
 
100
    pj_activesock_t         *asock;
 
101
    pj_bool_t                has_pending_connect;
 
102
 
 
103
    /* Keep-alive timer. */
 
104
    pj_timer_entry           ka_timer;
 
105
    pj_time_val              last_activity;
 
106
    pjsip_tx_data_op_key     ka_op_key;
 
107
    pj_str_t                 ka_pkt;
 
108
 
 
109
    /* TCP transport can only have  one rdata!
 
110
     * Otherwise chunks of incoming PDU may be received on different
 
111
     * buffer.
 
112
     */
 
113
    pjsip_rx_data            rdata;
 
114
 
 
115
    /* Pending transmission list. */
 
116
    struct delayed_tdata     delayed_list;
 
117
};
 
118
 
 
119
 
 
120
/****************************************************************************
 
121
 * PROTOTYPES
 
122
 */
 
123
 
 
124
/* This callback is called when pending accept() operation completes. */
 
125
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
 
126
                                    pj_sock_t newsock,
 
127
                                    const pj_sockaddr_t *src_addr,
 
128
                                    int src_addr_len);
 
129
 
 
130
/* This callback is called by transport manager to destroy listener */
 
131
static pj_status_t lis_destroy(pjsip_tpfactory *factory);
 
132
 
 
133
/* This callback is called by transport manager to create transport */
 
134
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
 
135
                                        pjsip_tpmgr *mgr,
 
136
                                        pjsip_endpoint *endpt,
 
137
                                        const pj_sockaddr *rem_addr,
 
138
                                        int addr_len,
 
139
                                        pjsip_transport **transport);
 
140
 
 
141
/* Common function to create and initialize transport */
 
142
static pj_status_t tcp_create(struct tcp_listener *listener,
 
143
                              pj_pool_t *pool,
 
144
                              pj_sock_t sock, pj_bool_t is_server,
 
145
                              const pj_sockaddr *local,
 
146
                              const pj_sockaddr *remote,
 
147
                              struct tcp_transport **p_tcp);
 
148
 
 
149
 
 
150
static void tcp_perror(const char *sender, const char *title,
 
151
                       pj_status_t status)
 
152
{
 
153
    char errmsg[PJ_ERR_MSG_SIZE];
 
154
 
 
155
    pj_strerror(status, errmsg, sizeof(errmsg));
 
156
 
 
157
    PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status));
 
158
}
 
159
 
 
160
 
 
161
static void sockaddr_to_host_port( pj_pool_t *pool,
 
162
                                   pjsip_host_port *host_port,
 
163
                                   const pj_sockaddr *addr )
 
164
{
 
165
    host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
 
166
    pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0);
 
167
    host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
 
168
    host_port->port = pj_sockaddr_get_port(addr);
 
169
}
 
170
 
 
171
 
 
172
static void tcp_init_shutdown(struct tcp_transport *tcp, pj_status_t status)
 
173
{
 
174
    pjsip_tp_state_callback state_cb;
 
175
 
 
176
    if (tcp->close_reason == PJ_SUCCESS)
 
177
        tcp->close_reason = status;
 
178
 
 
179
    if (tcp->base.is_shutdown)
 
180
        return;
 
181
 
 
182
    /* Prevent immediate transport destroy by application, as transport
 
183
     * state notification callback may be stacked and transport instance
 
184
     * must remain valid at any point in the callback.
 
185
     */
 
186
    pjsip_transport_add_ref(&tcp->base);
 
187
 
 
188
    /* Notify application of transport disconnected state */
 
189
    state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
 
190
    if (state_cb) {
 
191
        pjsip_transport_state_info state_info;
 
192
 
 
193
        pj_bzero(&state_info, sizeof(state_info));
 
194
        state_info.status = tcp->close_reason;
 
195
        (*state_cb)(&tcp->base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
 
196
    }
 
197
 
 
198
    /* We can not destroy the transport since high level objects may
 
199
     * still keep reference to this transport. So we can only 
 
200
     * instruct transport manager to gracefully start the shutdown
 
201
     * procedure for this transport.
 
202
     */
 
203
    pjsip_transport_shutdown(&tcp->base);
 
204
 
 
205
    /* Now, it is ok to destroy the transport. */
 
206
    pjsip_transport_dec_ref(&tcp->base);
 
207
}
 
208
 
 
209
 
 
210
/*
 
211
 * Initialize pjsip_tcp_transport_cfg structure with default values.
 
212
 */
 
213
PJ_DEF(void) pjsip_tcp_transport_cfg_default(pjsip_tcp_transport_cfg *cfg,
 
214
                                             int af)
 
215
{
 
216
    pj_bzero(cfg, sizeof(*cfg));
 
217
    cfg->af = af;
 
218
    pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0);
 
219
    cfg->async_cnt = 1;
 
220
}
 
221
 
 
222
 
 
223
/****************************************************************************
 
224
 * The TCP listener/transport factory.
 
225
 */
 
226
 
 
227
/*
 
228
 * This is the public API to create, initialize, register, and start the
 
229
 * TCP listener.
 
230
 */
 
231
PJ_DEF(pj_status_t) pjsip_tcp_transport_start3(
 
232
                                        pjsip_endpoint *endpt,
 
233
                                        const pjsip_tcp_transport_cfg *cfg,
 
234
                                        pjsip_tpfactory **p_factory
 
235
                                        )
 
236
{
 
237
    pj_pool_t *pool;
 
238
    pj_sock_t sock = PJ_INVALID_SOCKET;
 
239
    struct tcp_listener *listener;
 
240
    pj_activesock_cfg asock_cfg;
 
241
    pj_activesock_cb listener_cb;
 
242
    pj_sockaddr *listener_addr;
 
243
    int addr_len;
 
244
    pj_status_t status;
 
245
 
 
246
    /* Sanity check */
 
247
    PJ_ASSERT_RETURN(endpt && cfg->async_cnt, PJ_EINVAL);
 
248
 
 
249
    /* Verify that address given in a_name (if any) is valid */
 
250
    if (cfg->addr_name.host.slen) {
 
251
        pj_sockaddr tmp;
 
252
 
 
253
        status = pj_sockaddr_init(cfg->af, &tmp, &cfg->addr_name.host, 
 
254
                                  (pj_uint16_t)cfg->addr_name.port);
 
255
        if (status != PJ_SUCCESS || !pj_sockaddr_has_addr(&tmp) ||
 
256
            (cfg->af==pj_AF_INET() && 
 
257
             tmp.ipv4.sin_addr.s_addr==PJ_INADDR_NONE)) 
 
258
        {
 
259
            /* Invalid address */
 
260
            return PJ_EINVAL;
 
261
        }
 
262
    }
 
263
 
 
264
    pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT, 
 
265
                                   POOL_LIS_INC);
 
266
    PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
 
267
 
 
268
 
 
269
    listener = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
 
270
    listener->factory.pool = pool;
 
271
    listener->factory.type = cfg->af==pj_AF_INET() ? PJSIP_TRANSPORT_TCP :
 
272
                                                     PJSIP_TRANSPORT_TCP6;
 
273
    listener->factory.type_name = (char*)
 
274
                pjsip_transport_get_type_name(listener->factory.type);
 
275
    listener->factory.flag = 
 
276
        pjsip_transport_get_flag_from_type(listener->factory.type);
 
277
    listener->qos_type = cfg->qos_type;
 
278
    pj_memcpy(&listener->qos_params, &cfg->qos_params,
 
279
              sizeof(cfg->qos_params));
 
280
 
 
281
    pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
 
282
    if (listener->factory.type==PJSIP_TRANSPORT_TCP6)
 
283
        pj_ansi_strcat(listener->factory.obj_name, "6");
 
284
 
 
285
    status = pj_lock_create_recursive_mutex(pool, listener->factory.obj_name,
 
286
                                            &listener->factory.lock);
 
287
    if (status != PJ_SUCCESS)
 
288
        goto on_error;
 
289
 
 
290
 
 
291
    /* Create socket */
 
292
    status = pj_sock_socket(cfg->af, pj_SOCK_STREAM(), 0, &sock);
 
293
    if (status != PJ_SUCCESS)
 
294
        goto on_error;
 
295
 
 
296
    /* Apply QoS, if specified */
 
297
    status = pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params, 
 
298
                                2, listener->factory.obj_name, 
 
299
                                "SIP TCP listener socket");
 
300
 
 
301
    /* Bind address may be different than factory.local_addr because
 
302
     * factory.local_addr will be resolved below.
 
303
     */
 
304
    pj_sockaddr_cp(&listener->bound_addr, &cfg->bind_addr);
 
305
 
 
306
    /* Bind socket */
 
307
    listener_addr = &listener->factory.local_addr;
 
308
    pj_sockaddr_cp(listener_addr, &cfg->bind_addr);
 
309
 
 
310
    status = pj_sock_bind(sock, listener_addr, 
 
311
                          pj_sockaddr_get_len(listener_addr));
 
312
    if (status != PJ_SUCCESS)
 
313
        goto on_error;
 
314
 
 
315
    /* Retrieve the bound address */
 
316
    addr_len = pj_sockaddr_get_len(listener_addr);
 
317
    status = pj_sock_getsockname(sock, listener_addr, &addr_len);
 
318
    if (status != PJ_SUCCESS)
 
319
        goto on_error;
 
320
 
 
321
    /* If published host/IP is specified, then use that address as the
 
322
     * listener advertised address.
 
323
     */
 
324
    if (cfg->addr_name.host.slen) {
 
325
        /* Copy the address */
 
326
        listener->factory.addr_name = cfg->addr_name;
 
327
        pj_strdup(listener->factory.pool, &listener->factory.addr_name.host, 
 
328
                  &cfg->addr_name.host);
 
329
        listener->factory.addr_name.port = cfg->addr_name.port;
 
330
 
 
331
    } else {
 
332
        /* No published address is given, use the bound address */
 
333
 
 
334
        /* If the address returns 0.0.0.0, use the default
 
335
         * interface address as the transport's address.
 
336
         */
 
337
        if (!pj_sockaddr_has_addr(listener_addr)) {
 
338
            pj_sockaddr hostip;
 
339
 
 
340
            status = pj_gethostip(listener->bound_addr.addr.sa_family,
 
341
                                  &hostip);
 
342
            if (status != PJ_SUCCESS)
 
343
                goto on_error;
 
344
 
 
345
            pj_sockaddr_copy_addr(listener_addr, &hostip);
 
346
        }
 
347
 
 
348
        /* Save the address name */
 
349
        sockaddr_to_host_port(listener->factory.pool, 
 
350
                              &listener->factory.addr_name, 
 
351
                              listener_addr);
 
352
    }
 
353
 
 
354
    /* If port is zero, get the bound port */
 
355
    if (listener->factory.addr_name.port == 0) {
 
356
        listener->factory.addr_name.port = pj_sockaddr_get_port(listener_addr);
 
357
    }
 
358
 
 
359
    pj_ansi_snprintf(listener->factory.obj_name, 
 
360
                     sizeof(listener->factory.obj_name),
 
361
                     "tcplis:%d",  listener->factory.addr_name.port);
 
362
 
 
363
 
 
364
    /* Start listening to the address */
 
365
    status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG);
 
366
    if (status != PJ_SUCCESS)
 
367
        goto on_error;
 
368
 
 
369
 
 
370
    /* Create active socket */
 
371
    pj_activesock_cfg_default(&asock_cfg);
 
372
    if (cfg->async_cnt > MAX_ASYNC_CNT) 
 
373
        asock_cfg.async_cnt = MAX_ASYNC_CNT;
 
374
    else
 
375
        asock_cfg.async_cnt = cfg->async_cnt;
 
376
 
 
377
    pj_bzero(&listener_cb, sizeof(listener_cb));
 
378
    listener_cb.on_accept_complete = &on_accept_complete;
 
379
    status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
 
380
                                  pjsip_endpt_get_ioqueue(endpt), 
 
381
                                  &listener_cb, listener,
 
382
                                  &listener->asock);
 
383
 
 
384
    /* Register to transport manager */
 
385
    listener->endpt = endpt;
 
386
    listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
 
387
    listener->factory.create_transport = lis_create_transport;
 
388
    listener->factory.destroy = lis_destroy;
 
389
    listener->is_registered = PJ_TRUE;
 
390
    status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
 
391
                                            &listener->factory);
 
392
    if (status != PJ_SUCCESS) {
 
393
        listener->is_registered = PJ_FALSE;
 
394
        goto on_error;
 
395
    }
 
396
 
 
397
    /* Start pending accept() operations */
 
398
    status = pj_activesock_start_accept(listener->asock, pool);
 
399
    if (status != PJ_SUCCESS)
 
400
        goto on_error;
 
401
 
 
402
    PJ_LOG(4,(listener->factory.obj_name, 
 
403
             "SIP TCP listener ready for incoming connections at %.*s:%d",
 
404
             (int)listener->factory.addr_name.host.slen,
 
405
             listener->factory.addr_name.host.ptr,
 
406
             listener->factory.addr_name.port));
 
407
 
 
408
    /* Return the pointer to user */
 
409
    if (p_factory) *p_factory = &listener->factory;
 
410
 
 
411
    return PJ_SUCCESS;
 
412
 
 
413
on_error:
 
414
    if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET)
 
415
        pj_sock_close(sock);
 
416
    lis_destroy(&listener->factory);
 
417
    return status;
 
418
}
 
419
 
 
420
 
 
421
/*
 
422
 * This is the public API to create, initialize, register, and start the
 
423
 * TCP listener.
 
424
 */
 
425
PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
 
426
                                               const pj_sockaddr_in *local,
 
427
                                               const pjsip_host_port *a_name,
 
428
                                               unsigned async_cnt,
 
429
                                               pjsip_tpfactory **p_factory)
 
430
{
 
431
    pjsip_tcp_transport_cfg cfg;
 
432
 
 
433
    pjsip_tcp_transport_cfg_default(&cfg, pj_AF_INET());
 
434
 
 
435
    if (local)
 
436
        pj_sockaddr_cp(&cfg.bind_addr, local);
 
437
    else
 
438
        pj_sockaddr_init(cfg.af, &cfg.bind_addr, NULL, 0);
 
439
 
 
440
    if (a_name)
 
441
        pj_memcpy(&cfg.addr_name, a_name, sizeof(*a_name));
 
442
 
 
443
    if (async_cnt)
 
444
        cfg.async_cnt = async_cnt;
 
445
 
 
446
    return pjsip_tcp_transport_start3(endpt, &cfg, p_factory);
 
447
}
 
448
 
 
449
 
 
450
/*
 
451
 * This is the public API to create, initialize, register, and start the
 
452
 * TCP listener.
 
453
 */
 
454
PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
 
455
                                               const pj_sockaddr_in *local,
 
456
                                               unsigned async_cnt,
 
457
                                               pjsip_tpfactory **p_factory)
 
458
{
 
459
    return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory);
 
460
}
 
461
 
 
462
 
 
463
/* This callback is called by transport manager to destroy listener */
 
464
static pj_status_t lis_destroy(pjsip_tpfactory *factory)
 
465
{
 
466
    struct tcp_listener *listener = (struct tcp_listener *)factory;
 
467
 
 
468
    if (listener->is_registered) {
 
469
        pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
 
470
        listener->is_registered = PJ_FALSE;
 
471
    }
 
472
 
 
473
    if (listener->asock) {
 
474
        pj_activesock_close(listener->asock);
 
475
        listener->asock = NULL;
 
476
    }
 
477
 
 
478
    if (listener->factory.lock) {
 
479
        pj_lock_destroy(listener->factory.lock);
 
480
        listener->factory.lock = NULL;
 
481
    }
 
482
 
 
483
    if (listener->factory.pool) {
 
484
        pj_pool_t *pool = listener->factory.pool;
 
485
 
 
486
        PJ_LOG(4,(listener->factory.obj_name,  "SIP TCP listener destroyed"));
 
487
 
 
488
        listener->factory.pool = NULL;
 
489
        pj_pool_release(pool);
 
490
    }
 
491
 
 
492
    return PJ_SUCCESS;
 
493
}
 
494
 
 
495
 
 
496
/***************************************************************************/
 
497
/*
 
498
 * TCP Transport
 
499
 */
 
500
 
 
501
/*
 
502
 * Prototypes.
 
503
 */
 
504
/* Called by transport manager to send message */
 
505
static pj_status_t tcp_send_msg(pjsip_transport *transport, 
 
506
                                pjsip_tx_data *tdata,
 
507
                                const pj_sockaddr_t *rem_addr,
 
508
                                int addr_len,
 
509
                                void *token,
 
510
                                pjsip_transport_callback callback);
 
511
 
 
512
/* Called by transport manager to shutdown */
 
513
static pj_status_t tcp_shutdown(pjsip_transport *transport);
 
514
 
 
515
/* Called by transport manager to destroy transport */
 
516
static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
 
517
 
 
518
/* Utility to destroy transport */
 
519
static pj_status_t tcp_destroy(pjsip_transport *transport,
 
520
                               pj_status_t reason);
 
521
 
 
522
/* Callback on incoming data */
 
523
static pj_bool_t on_data_read(pj_activesock_t *asock,
 
524
                              void *data,
 
525
                              pj_size_t size,
 
526
                              pj_status_t status,
 
527
                              pj_size_t *remainder);
 
528
 
 
529
/* Callback when packet is sent */
 
530
static pj_bool_t on_data_sent(pj_activesock_t *asock,
 
531
                              pj_ioqueue_op_key_t *send_key,
 
532
                              pj_ssize_t sent);
 
533
 
 
534
/* Callback when connect completes */
 
535
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
 
536
                                     pj_status_t status);
 
537
 
 
538
/* TCP keep-alive timer callback */
 
539
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);
 
540
 
 
541
/*
 
542
 * Common function to create TCP transport, called when pending accept() and
 
543
 * pending connect() complete.
 
544
 */
 
545
static pj_status_t tcp_create( struct tcp_listener *listener,
 
546
                               pj_pool_t *pool,
 
547
                               pj_sock_t sock, pj_bool_t is_server,
 
548
                               const pj_sockaddr *local,
 
549
                               const pj_sockaddr *remote,
 
550
                               struct tcp_transport **p_tcp)
 
551
{
 
552
    struct tcp_transport *tcp;
 
553
    pj_ioqueue_t *ioqueue;
 
554
    pj_activesock_cfg asock_cfg;
 
555
    pj_activesock_cb tcp_callback;
 
556
    const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
 
557
    char print_addr[PJ_INET6_ADDRSTRLEN+10];
 
558
    pj_status_t status;
 
559
    
 
560
 
 
561
    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
 
562
 
 
563
 
 
564
    if (pool == NULL) {
 
565
        pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
 
566
                                       POOL_TP_INIT, POOL_TP_INC);
 
567
        PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
 
568
    }    
 
569
 
 
570
    /*
 
571
     * Create and initialize basic transport structure.
 
572
     */
 
573
    tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
 
574
    tcp->is_server = is_server;
 
575
    tcp->sock = sock;
 
576
    /*tcp->listener = listener;*/
 
577
    pj_list_init(&tcp->delayed_list);
 
578
    tcp->base.pool = pool;
 
579
 
 
580
    pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, 
 
581
                     (is_server ? "tcps%p" :"tcpc%p"), tcp);
 
582
 
 
583
    status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
 
584
    if (status != PJ_SUCCESS) {
 
585
        goto on_error;
 
586
    }
 
587
 
 
588
    status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
 
589
    if (status != PJ_SUCCESS) {
 
590
        goto on_error;
 
591
    }
 
592
 
 
593
    tcp->base.key.type = listener->factory.type;
 
594
    pj_sockaddr_cp(&tcp->base.key.rem_addr, remote);
 
595
    tcp->base.type_name = (char*)pjsip_transport_get_type_name(
 
596
                                (pjsip_transport_type_e)tcp->base.key.type);
 
597
    tcp->base.flag = pjsip_transport_get_flag_from_type(
 
598
                                (pjsip_transport_type_e)tcp->base.key.type);
 
599
 
 
600
    tcp->base.info = (char*) pj_pool_alloc(pool, 64);
 
601
    pj_ansi_snprintf(tcp->base.info, 64, "%s to %s",
 
602
                     tcp->base.type_name,
 
603
                     pj_sockaddr_print(remote, print_addr,
 
604
                                       sizeof(print_addr), 3));
 
605
 
 
606
    tcp->base.addr_len = pj_sockaddr_get_len(remote);
 
607
    pj_sockaddr_cp(&tcp->base.local_addr, local);
 
608
    sockaddr_to_host_port(pool, &tcp->base.local_name, local);
 
609
    sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);
 
610
    tcp->base.dir = is_server? PJSIP_TP_DIR_INCOMING : PJSIP_TP_DIR_OUTGOING;
 
611
 
 
612
    tcp->base.endpt = listener->endpt;
 
613
    tcp->base.tpmgr = listener->tpmgr;
 
614
    tcp->base.send_msg = &tcp_send_msg;
 
615
    tcp->base.do_shutdown = &tcp_shutdown;
 
616
    tcp->base.destroy = &tcp_destroy_transport;
 
617
 
 
618
    /* Create active socket */
 
619
    pj_activesock_cfg_default(&asock_cfg);
 
620
    asock_cfg.async_cnt = 1;
 
621
 
 
622
    pj_bzero(&tcp_callback, sizeof(tcp_callback));
 
623
    tcp_callback.on_data_read = &on_data_read;
 
624
    tcp_callback.on_data_sent = &on_data_sent;
 
625
    tcp_callback.on_connect_complete = &on_connect_complete;
 
626
 
 
627
    ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
 
628
    status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
 
629
                                  ioqueue, &tcp_callback, tcp, &tcp->asock);
 
630
    if (status != PJ_SUCCESS) {
 
631
        goto on_error;
 
632
    }
 
633
 
 
634
    /* Register transport to transport manager */
 
635
    status = pjsip_transport_register(listener->tpmgr, &tcp->base);
 
636
    if (status != PJ_SUCCESS) {
 
637
        goto on_error;
 
638
    }
 
639
 
 
640
    tcp->is_registered = PJ_TRUE;
 
641
 
 
642
    /* Initialize keep-alive timer */
 
643
    tcp->ka_timer.user_data = (void*)tcp;
 
644
    tcp->ka_timer.cb = &tcp_keep_alive_timer;
 
645
    pj_ioqueue_op_key_init(&tcp->ka_op_key.key, sizeof(pj_ioqueue_op_key_t));
 
646
    pj_strdup(tcp->base.pool, &tcp->ka_pkt, &ka_pkt);
 
647
 
 
648
    /* Done setting up basic transport. */
 
649
    *p_tcp = tcp;
 
650
 
 
651
    PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
 
652
              (tcp->is_server ? "server" : "client")));
 
653
 
 
654
    return PJ_SUCCESS;
 
655
 
 
656
on_error:
 
657
    tcp_destroy(&tcp->base, status);
 
658
    return status;
 
659
}
 
660
 
 
661
 
 
662
/* Flush all delayed transmision once the socket is connected. */
 
663
static void tcp_flush_pending_tx(struct tcp_transport *tcp)
 
664
{
 
665
    pj_time_val now;
 
666
 
 
667
    pj_gettickcount(&now);
 
668
    pj_lock_acquire(tcp->base.lock);
 
669
    while (!pj_list_empty(&tcp->delayed_list)) {
 
670
        struct delayed_tdata *pending_tx;
 
671
        pjsip_tx_data *tdata;
 
672
        pj_ioqueue_op_key_t *op_key;
 
673
        pj_ssize_t size;
 
674
        pj_status_t status;
 
675
 
 
676
        pending_tx = tcp->delayed_list.next;
 
677
        pj_list_erase(pending_tx);
 
678
 
 
679
        tdata = pending_tx->tdata_op_key->tdata;
 
680
        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
 
681
 
 
682
        if (pending_tx->timeout.sec > 0 &&
 
683
            PJ_TIME_VAL_GT(now, pending_tx->timeout))
 
684
        {
 
685
            continue;
 
686
        }
 
687
 
 
688
        /* send! */
 
689
        size = tdata->buf.cur - tdata->buf.start;
 
690
        status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start, 
 
691
                                    &size, 0);
 
692
        if (status != PJ_EPENDING) {
 
693
            pj_lock_release(tcp->base.lock);
 
694
            on_data_sent(tcp->asock, op_key, size);
 
695
            pj_lock_acquire(tcp->base.lock);
 
696
        }
 
697
 
 
698
    }
 
699
    pj_lock_release(tcp->base.lock);
 
700
}
 
701
 
 
702
 
 
703
/* Called by transport manager to destroy transport */
 
704
static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
 
705
{
 
706
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
707
 
 
708
    /* Transport would have been unregistered by now since this callback
 
709
     * is called by transport manager.
 
710
     */
 
711
    tcp->is_registered = PJ_FALSE;
 
712
 
 
713
    return tcp_destroy(transport, tcp->close_reason);
 
714
}
 
715
 
 
716
 
 
717
/* Destroy TCP transport */
 
718
static pj_status_t tcp_destroy(pjsip_transport *transport, 
 
719
                               pj_status_t reason)
 
720
{
 
721
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
722
 
 
723
    if (tcp->close_reason == 0)
 
724
        tcp->close_reason = reason;
 
725
 
 
726
    if (tcp->is_registered) {
 
727
        tcp->is_registered = PJ_FALSE;
 
728
        pjsip_transport_destroy(transport);
 
729
 
 
730
        /* pjsip_transport_destroy will recursively call this function
 
731
         * again.
 
732
         */
 
733
        return PJ_SUCCESS;
 
734
    }
 
735
 
 
736
    /* Mark transport as closing */
 
737
    tcp->is_closing = PJ_TRUE;
 
738
 
 
739
    /* Stop keep-alive timer. */
 
740
    if (tcp->ka_timer.id) {
 
741
        pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
 
742
        tcp->ka_timer.id = PJ_FALSE;
 
743
    }
 
744
 
 
745
    /* Cancel all delayed transmits */
 
746
    while (!pj_list_empty(&tcp->delayed_list)) {
 
747
        struct delayed_tdata *pending_tx;
 
748
        pj_ioqueue_op_key_t *op_key;
 
749
 
 
750
        pending_tx = tcp->delayed_list.next;
 
751
        pj_list_erase(pending_tx);
 
752
 
 
753
        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
 
754
 
 
755
        on_data_sent(tcp->asock, op_key, -reason);
 
756
    }
 
757
 
 
758
    if (tcp->rdata.tp_info.pool) {
 
759
        pj_pool_release(tcp->rdata.tp_info.pool);
 
760
        tcp->rdata.tp_info.pool = NULL;
 
761
    }
 
762
 
 
763
    if (tcp->asock) {
 
764
        pj_activesock_close(tcp->asock);
 
765
        tcp->asock = NULL;
 
766
        tcp->sock = PJ_INVALID_SOCKET;
 
767
    } else if (tcp->sock != PJ_INVALID_SOCKET) {
 
768
        pj_sock_close(tcp->sock);
 
769
        tcp->sock = PJ_INVALID_SOCKET;
 
770
    }
 
771
 
 
772
    if (tcp->base.lock) {
 
773
        pj_lock_destroy(tcp->base.lock);
 
774
        tcp->base.lock = NULL;
 
775
    }
 
776
 
 
777
    if (tcp->base.ref_cnt) {
 
778
        pj_atomic_destroy(tcp->base.ref_cnt);
 
779
        tcp->base.ref_cnt = NULL;
 
780
    }
 
781
 
 
782
    if (tcp->base.pool) {
 
783
        pj_pool_t *pool;
 
784
 
 
785
        if (reason != PJ_SUCCESS) {
 
786
            char errmsg[PJ_ERR_MSG_SIZE];
 
787
 
 
788
            pj_strerror(reason, errmsg, sizeof(errmsg));
 
789
            PJ_LOG(4,(tcp->base.obj_name, 
 
790
                      "TCP transport destroyed with reason %d: %s", 
 
791
                      reason, errmsg));
 
792
 
 
793
        } else {
 
794
 
 
795
            PJ_LOG(4,(tcp->base.obj_name, 
 
796
                      "TCP transport destroyed normally"));
 
797
 
 
798
        }
 
799
 
 
800
        pool = tcp->base.pool;
 
801
        tcp->base.pool = NULL;
 
802
        pj_pool_release(pool);
 
803
    }
 
804
 
 
805
    return PJ_SUCCESS;
 
806
}
 
807
 
 
808
 
 
809
/*
 
810
 * This utility function creates receive data buffers and start
 
811
 * asynchronous recv() operations from the socket. It is called after
 
812
 * accept() or connect() operation complete.
 
813
 */
 
814
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
 
815
{
 
816
    pj_pool_t *pool;
 
817
    pj_ssize_t size;
 
818
    pj_sockaddr *rem_addr;
 
819
    void *readbuf[1];
 
820
    pj_status_t status;
 
821
 
 
822
    /* Init rdata */
 
823
    pool = pjsip_endpt_create_pool(tcp->base.endpt,
 
824
                                   "rtd%p",
 
825
                                   PJSIP_POOL_RDATA_LEN,
 
826
                                   PJSIP_POOL_RDATA_INC);
 
827
    if (!pool) {
 
828
        tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
 
829
        return PJ_ENOMEM;
 
830
    }
 
831
 
 
832
    tcp->rdata.tp_info.pool = pool;
 
833
 
 
834
    tcp->rdata.tp_info.transport = &tcp->base;
 
835
    tcp->rdata.tp_info.tp_data = tcp;
 
836
    tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
 
837
    pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key, 
 
838
                           sizeof(pj_ioqueue_op_key_t));
 
839
 
 
840
    tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
 
841
    tcp->rdata.pkt_info.src_addr_len = sizeof(tcp->rdata.pkt_info.src_addr);
 
842
    rem_addr = &tcp->base.key.rem_addr;
 
843
    pj_sockaddr_print(rem_addr, tcp->rdata.pkt_info.src_name,
 
844
                      sizeof(tcp->rdata.pkt_info.src_name), 0);
 
845
    tcp->rdata.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
 
846
 
 
847
    size = sizeof(tcp->rdata.pkt_info.packet);
 
848
    readbuf[0] = tcp->rdata.pkt_info.packet;
 
849
    status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size,
 
850
                                       readbuf, 0);
 
851
    if (status != PJ_SUCCESS && status != PJ_EPENDING) {
 
852
        PJ_LOG(4, (tcp->base.obj_name, 
 
853
                   "pj_activesock_start_read() error, status=%d", 
 
854
                   status));
 
855
        return status;
 
856
    }
 
857
 
 
858
    return PJ_SUCCESS;
 
859
}
 
860
 
 
861
 
 
862
/* This callback is called by transport manager for the TCP factory
 
863
 * to create outgoing transport to the specified destination.
 
864
 */
 
865
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
 
866
                                        pjsip_tpmgr *mgr,
 
867
                                        pjsip_endpoint *endpt,
 
868
                                        const pj_sockaddr *rem_addr,
 
869
                                        int addr_len,
 
870
                                        pjsip_transport **p_transport)
 
871
{
 
872
    struct tcp_listener *listener;
 
873
    struct tcp_transport *tcp;
 
874
    pj_sock_t sock;
 
875
    pj_sockaddr local_addr;
 
876
    pj_status_t status;
 
877
 
 
878
    /* Sanity checks */
 
879
    PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
 
880
                     addr_len && p_transport, PJ_EINVAL);
 
881
 
 
882
    /* Check that address is a sockaddr_in or sockaddr_in6*/
 
883
    PJ_ASSERT_RETURN((rem_addr->addr.sa_family == pj_AF_INET() &&
 
884
                      addr_len == sizeof(pj_sockaddr_in)) ||
 
885
                     (rem_addr->addr.sa_family == pj_AF_INET6() &&
 
886
                      addr_len == sizeof(pj_sockaddr_in6)), PJ_EINVAL);
 
887
 
 
888
 
 
889
    listener = (struct tcp_listener*)factory;
 
890
 
 
891
    /* Create socket */
 
892
    status = pj_sock_socket(rem_addr->addr.sa_family, pj_SOCK_STREAM(),
 
893
                            0, &sock);
 
894
    if (status != PJ_SUCCESS)
 
895
        return status;
 
896
 
 
897
    /* Apply QoS, if specified */
 
898
    status = pj_sock_apply_qos2(sock, listener->qos_type, 
 
899
                                &listener->qos_params, 
 
900
                                2, listener->factory.obj_name, 
 
901
                                "outgoing SIP TCP socket");
 
902
 
 
903
    /* Bind to listener's address and any port */
 
904
    pj_bzero(&local_addr, sizeof(local_addr));
 
905
    pj_sockaddr_cp(&local_addr, &listener->bound_addr);
 
906
    pj_sockaddr_set_port(&local_addr, 0);
 
907
 
 
908
    status = pj_sock_bind(sock, &local_addr,
 
909
                          pj_sockaddr_get_len(&local_addr));
 
910
    if (status != PJ_SUCCESS) {
 
911
        pj_sock_close(sock);
 
912
        return status;
 
913
    }
 
914
 
 
915
    /* Get the local port */
 
916
    addr_len = sizeof(local_addr);
 
917
    status = pj_sock_getsockname(sock, &local_addr, &addr_len);
 
918
    if (status != PJ_SUCCESS) {
 
919
        pj_sock_close(sock);
 
920
        return status;
 
921
    }
 
922
 
 
923
    /* Initially set the address from the listener's address */
 
924
    if (!pj_sockaddr_has_addr(&local_addr)) {
 
925
        pj_sockaddr_copy_addr(&local_addr, &listener->factory.local_addr);
 
926
    }
 
927
 
 
928
    /* Create the transport descriptor */
 
929
    status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr, 
 
930
                        rem_addr, &tcp);
 
931
    if (status != PJ_SUCCESS)
 
932
        return status;
 
933
 
 
934
 
 
935
    /* Start asynchronous connect() operation */
 
936
    tcp->has_pending_connect = PJ_TRUE;
 
937
    status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr,
 
938
                                         addr_len);
 
939
    if (status == PJ_SUCCESS) {
 
940
        on_connect_complete(tcp->asock, PJ_SUCCESS);
 
941
    } else if (status != PJ_EPENDING) {
 
942
        tcp_destroy(&tcp->base, status);
 
943
        return status;
 
944
    }
 
945
 
 
946
    if (tcp->has_pending_connect) {
 
947
        /* Update (again) local address, just in case local address currently
 
948
         * set is different now that asynchronous connect() is started.
 
949
         */
 
950
        addr_len = sizeof(local_addr);
 
951
        if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) {
 
952
            pj_sockaddr *tp_addr = &tcp->base.local_addr;
 
953
 
 
954
            /* Some systems (like old Win32 perhaps) may not set local address
 
955
             * properly before socket is fully connected.
 
956
             */
 
957
            if (pj_sockaddr_cmp(tp_addr, &local_addr) &&
 
958
                pj_sockaddr_get_port(&local_addr) != 0)
 
959
            {
 
960
                pj_sockaddr_cp(tp_addr, &local_addr);
 
961
                sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
 
962
                                      &local_addr);
 
963
            }
 
964
        }
 
965
        
 
966
        PJ_LOG(4,(tcp->base.obj_name, 
 
967
                  "TCP transport %.*s:%d is connecting to %.*s:%d...",
 
968
                  (int)tcp->base.local_name.host.slen,
 
969
                  tcp->base.local_name.host.ptr,
 
970
                  tcp->base.local_name.port,
 
971
                  (int)tcp->base.remote_name.host.slen,
 
972
                  tcp->base.remote_name.host.ptr,
 
973
                  tcp->base.remote_name.port));
 
974
    }
 
975
 
 
976
    /* Done */
 
977
    *p_transport = &tcp->base;
 
978
 
 
979
    return PJ_SUCCESS;
 
980
}
 
981
 
 
982
 
 
983
/*
 
984
 * This callback is called by active socket when pending accept() operation
 
985
 * has completed.
 
986
 */
 
987
static pj_bool_t on_accept_complete(pj_activesock_t *asock,
 
988
                                    pj_sock_t sock,
 
989
                                    const pj_sockaddr_t *src_addr,
 
990
                                    int src_addr_len)
 
991
{
 
992
    struct tcp_listener *listener;
 
993
    struct tcp_transport *tcp;
 
994
    char addr[PJ_INET6_ADDRSTRLEN+10];
 
995
    pjsip_tp_state_callback state_cb;
 
996
    pj_sockaddr tmp_src_addr;
 
997
    pj_status_t status;
 
998
 
 
999
    PJ_UNUSED_ARG(src_addr_len);
 
1000
 
 
1001
    listener = (struct tcp_listener*) pj_activesock_get_user_data(asock);
 
1002
 
 
1003
    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE);
 
1004
 
 
1005
    PJ_LOG(4,(listener->factory.obj_name, 
 
1006
              "TCP listener %.*s:%d: got incoming TCP connection "
 
1007
              "from %s, sock=%d",
 
1008
              (int)listener->factory.addr_name.host.slen,
 
1009
              listener->factory.addr_name.host.ptr,
 
1010
              listener->factory.addr_name.port,
 
1011
              pj_sockaddr_print(src_addr, addr, sizeof(addr), 3),
 
1012
              sock));
 
1013
 
 
1014
    /* Apply QoS, if specified */
 
1015
    status = pj_sock_apply_qos2(sock, listener->qos_type, 
 
1016
                                &listener->qos_params, 
 
1017
                                2, listener->factory.obj_name, 
 
1018
                                "incoming SIP TCP socket");
 
1019
 
 
1020
    /* tcp_create() expect pj_sockaddr, so copy src_addr to temporary var,
 
1021
     * just in case.
 
1022
     */
 
1023
    pj_bzero(&tmp_src_addr, sizeof(tmp_src_addr));
 
1024
    pj_sockaddr_cp(&tmp_src_addr, src_addr);
 
1025
 
 
1026
    /* 
 
1027
     * Incoming connection!
 
1028
     * Create TCP transport for the new socket.
 
1029
     */
 
1030
    status = tcp_create( listener, NULL, sock, PJ_TRUE,
 
1031
                         &listener->factory.local_addr,
 
1032
                         &tmp_src_addr, &tcp);
 
1033
    if (status == PJ_SUCCESS) {
 
1034
        status = tcp_start_read(tcp);
 
1035
        if (status != PJ_SUCCESS) {
 
1036
            PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
 
1037
            tcp_destroy(&tcp->base, status);
 
1038
        } else {
 
1039
            /* Start keep-alive timer */
 
1040
            if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
 
1041
                pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
 
1042
                pjsip_endpt_schedule_timer(listener->endpt, 
 
1043
                                           &tcp->ka_timer, 
 
1044
                                           &delay);
 
1045
                tcp->ka_timer.id = PJ_TRUE;
 
1046
                pj_gettimeofday(&tcp->last_activity);
 
1047
            }
 
1048
 
 
1049
            /* Notify application of transport state accepted */
 
1050
            state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
 
1051
            if (state_cb) {
 
1052
                pjsip_transport_state_info state_info;
 
1053
            
 
1054
                pj_bzero(&state_info, sizeof(state_info));
 
1055
                (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
 
1056
            }
 
1057
        }
 
1058
    }
 
1059
 
 
1060
    return PJ_TRUE;
 
1061
}
 
1062
 
 
1063
 
 
1064
/* 
 
1065
 * Callback from ioqueue when packet is sent.
 
1066
 */
 
1067
static pj_bool_t on_data_sent(pj_activesock_t *asock,
 
1068
                              pj_ioqueue_op_key_t *op_key,
 
1069
                              pj_ssize_t bytes_sent)
 
1070
{
 
1071
    struct tcp_transport *tcp = (struct tcp_transport*) 
 
1072
                                pj_activesock_get_user_data(asock);
 
1073
    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
 
1074
 
 
1075
    /* Note that op_key may be the op_key from keep-alive, thus
 
1076
     * it will not have tdata etc.
 
1077
     */
 
1078
 
 
1079
    tdata_op_key->tdata = NULL;
 
1080
 
 
1081
    if (tdata_op_key->callback) {
 
1082
        /*
 
1083
         * Notify sip_transport.c that packet has been sent.
 
1084
         */
 
1085
        if (bytes_sent == 0)
 
1086
            bytes_sent = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
 
1087
 
 
1088
        tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
 
1089
 
 
1090
        /* Mark last activity time */
 
1091
        pj_gettimeofday(&tcp->last_activity);
 
1092
 
 
1093
    }
 
1094
 
 
1095
    /* Check for error/closure */
 
1096
    if (bytes_sent <= 0) {
 
1097
        pj_status_t status;
 
1098
 
 
1099
        PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 
 
1100
                  bytes_sent));
 
1101
 
 
1102
        status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
 
1103
                                     -bytes_sent;
 
1104
 
 
1105
        tcp_init_shutdown(tcp, status);
 
1106
 
 
1107
        return PJ_FALSE;
 
1108
    }
 
1109
 
 
1110
    return PJ_TRUE;
 
1111
}
 
1112
 
 
1113
 
 
1114
/* 
 
1115
 * This callback is called by transport manager to send SIP message 
 
1116
 */
 
1117
static pj_status_t tcp_send_msg(pjsip_transport *transport, 
 
1118
                                pjsip_tx_data *tdata,
 
1119
                                const pj_sockaddr_t *rem_addr,
 
1120
                                int addr_len,
 
1121
                                void *token,
 
1122
                                pjsip_transport_callback callback)
 
1123
{
 
1124
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
1125
    pj_ssize_t size;
 
1126
    pj_bool_t delayed = PJ_FALSE;
 
1127
    pj_status_t status = PJ_SUCCESS;
 
1128
 
 
1129
    /* Sanity check */
 
1130
    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
 
1131
 
 
1132
    /* Check that there's no pending operation associated with the tdata */
 
1133
    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
 
1134
    
 
1135
    /* Check the address is supported */
 
1136
    PJ_ASSERT_RETURN(rem_addr && (addr_len==sizeof(pj_sockaddr_in) ||
 
1137
                                  addr_len==sizeof(pj_sockaddr_in6)),
 
1138
                     PJ_EINVAL);
 
1139
 
 
1140
    /* Init op key. */
 
1141
    tdata->op_key.tdata = tdata;
 
1142
    tdata->op_key.token = token;
 
1143
    tdata->op_key.callback = callback;
 
1144
 
 
1145
    /* If asynchronous connect() has not completed yet, just put the
 
1146
     * transmit data in the pending transmission list since we can not
 
1147
     * use the socket yet.
 
1148
     */
 
1149
    if (tcp->has_pending_connect) {
 
1150
 
 
1151
        /*
 
1152
         * Looks like connect() is still in progress. Check again (this time
 
1153
         * with holding the lock) to be sure.
 
1154
         */
 
1155
        pj_lock_acquire(tcp->base.lock);
 
1156
 
 
1157
        if (tcp->has_pending_connect) {
 
1158
            struct delayed_tdata *delayed_tdata;
 
1159
 
 
1160
            /*
 
1161
             * connect() is still in progress. Put the transmit data to
 
1162
             * the delayed list.
 
1163
             * Starting from #1583 (https://trac.pjsip.org/repos/ticket/1583),
 
1164
             * we also add timeout value for the transmit data. When the
 
1165
             * connect() is completed, the timeout value will be checked to
 
1166
             * determine whether the transmit data needs to be sent.
 
1167
             */
 
1168
            delayed_tdata = PJ_POOL_ZALLOC_T(tdata->pool, 
 
1169
                                             struct delayed_tdata);
 
1170
            delayed_tdata->tdata_op_key = &tdata->op_key;
 
1171
            if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) {
 
1172
                pj_gettickcount(&delayed_tdata->timeout);
 
1173
                delayed_tdata->timeout.msec += pjsip_cfg()->tsx.td;
 
1174
                pj_time_val_normalize(&delayed_tdata->timeout);
 
1175
            }
 
1176
 
 
1177
            pj_list_push_back(&tcp->delayed_list, delayed_tdata);
 
1178
            status = PJ_EPENDING;
 
1179
 
 
1180
            /* Prevent pj_ioqueue_send() to be called below */
 
1181
            delayed = PJ_TRUE;
 
1182
        }
 
1183
 
 
1184
        pj_lock_release(tcp->base.lock);
 
1185
    } 
 
1186
    
 
1187
    if (!delayed) {
 
1188
        /*
 
1189
         * Transport is ready to go. Send the packet to ioqueue to be
 
1190
         * sent asynchronously.
 
1191
         */
 
1192
        size = tdata->buf.cur - tdata->buf.start;
 
1193
        status = pj_activesock_send(tcp->asock, 
 
1194
                                    (pj_ioqueue_op_key_t*)&tdata->op_key,
 
1195
                                    tdata->buf.start, &size, 0);
 
1196
 
 
1197
        if (status != PJ_EPENDING) {
 
1198
            /* Not pending (could be immediate success or error) */
 
1199
            tdata->op_key.tdata = NULL;
 
1200
 
 
1201
            /* Shutdown transport on closure/errors */
 
1202
            if (size <= 0) {
 
1203
 
 
1204
                PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 
 
1205
                          size));
 
1206
 
 
1207
                if (status == PJ_SUCCESS) 
 
1208
                    status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
 
1209
 
 
1210
                tcp_init_shutdown(tcp, status);
 
1211
            }
 
1212
        }
 
1213
    }
 
1214
 
 
1215
    return status;
 
1216
}
 
1217
 
 
1218
 
 
1219
/* 
 
1220
 * This callback is called by transport manager to shutdown transport.
 
1221
 */
 
1222
static pj_status_t tcp_shutdown(pjsip_transport *transport)
 
1223
{
 
1224
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
 
1225
    
 
1226
    /* Stop keep-alive timer. */
 
1227
    if (tcp->ka_timer.id) {
 
1228
        pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
 
1229
        tcp->ka_timer.id = PJ_FALSE;
 
1230
    }
 
1231
 
 
1232
    return PJ_SUCCESS;
 
1233
}
 
1234
 
 
1235
 
 
1236
/* 
 
1237
 * Callback from ioqueue that an incoming data is received from the socket.
 
1238
 */
 
1239
static pj_bool_t on_data_read(pj_activesock_t *asock,
 
1240
                              void *data,
 
1241
                              pj_size_t size,
 
1242
                              pj_status_t status,
 
1243
                              pj_size_t *remainder)
 
1244
{
 
1245
    enum { MAX_IMMEDIATE_PACKET = 10 };
 
1246
    struct tcp_transport *tcp;
 
1247
    pjsip_rx_data *rdata;
 
1248
 
 
1249
    PJ_UNUSED_ARG(data);
 
1250
 
 
1251
    tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
 
1252
    rdata = &tcp->rdata;
 
1253
 
 
1254
    /* Don't do anything if transport is closing. */
 
1255
    if (tcp->is_closing) {
 
1256
        tcp->is_closing++;
 
1257
        return PJ_FALSE;
 
1258
    }
 
1259
 
 
1260
    /* Houston, we have packet! Report the packet to transport manager
 
1261
     * to be parsed.
 
1262
     */
 
1263
    if (status == PJ_SUCCESS) {
 
1264
        pj_size_t size_eaten;
 
1265
 
 
1266
        /* Mark this as an activity */
 
1267
        pj_gettimeofday(&tcp->last_activity);
 
1268
 
 
1269
        pj_assert((void*)rdata->pkt_info.packet == data);
 
1270
 
 
1271
        /* Init pkt_info part. */
 
1272
        rdata->pkt_info.len = size;
 
1273
        rdata->pkt_info.zero = 0;
 
1274
        pj_gettimeofday(&rdata->pkt_info.timestamp);
 
1275
 
 
1276
        /* Report to transport manager.
 
1277
         * The transport manager will tell us how many bytes of the packet
 
1278
         * have been processed (as valid SIP message).
 
1279
         */
 
1280
        size_eaten = 
 
1281
            pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 
 
1282
                                       rdata);
 
1283
 
 
1284
        pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
 
1285
 
 
1286
        /* Move unprocessed data to the front of the buffer */
 
1287
        *remainder = size - size_eaten;
 
1288
        if (*remainder > 0 && *remainder != size) {
 
1289
            pj_memmove(rdata->pkt_info.packet,
 
1290
                       rdata->pkt_info.packet + size_eaten,
 
1291
                       *remainder);
 
1292
        }
 
1293
 
 
1294
    } else {
 
1295
 
 
1296
        /* Transport is closed */
 
1297
        PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
 
1298
        
 
1299
        tcp_init_shutdown(tcp, status);
 
1300
 
 
1301
        return PJ_FALSE;
 
1302
 
 
1303
    }
 
1304
 
 
1305
    /* Reset pool. */
 
1306
    pj_pool_reset(rdata->tp_info.pool);
 
1307
 
 
1308
    return PJ_TRUE;
 
1309
}
 
1310
 
 
1311
 
 
1312
/* 
 
1313
 * Callback from ioqueue when asynchronous connect() operation completes.
 
1314
 */
 
1315
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
 
1316
                                     pj_status_t status)
 
1317
{
 
1318
    struct tcp_transport *tcp;
 
1319
    pj_sockaddr addr;
 
1320
    int addrlen;
 
1321
    pjsip_tp_state_callback state_cb;
 
1322
 
 
1323
    tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
 
1324
 
 
1325
    /* Mark that pending connect() operation has completed. */
 
1326
    tcp->has_pending_connect = PJ_FALSE;
 
1327
 
 
1328
    /* Check connect() status */
 
1329
    if (status != PJ_SUCCESS) {
 
1330
 
 
1331
        tcp_perror(tcp->base.obj_name, "TCP connect() error", status);
 
1332
 
 
1333
        /* Cancel all delayed transmits */
 
1334
        while (!pj_list_empty(&tcp->delayed_list)) {
 
1335
            struct delayed_tdata *pending_tx;
 
1336
            pj_ioqueue_op_key_t *op_key;
 
1337
 
 
1338
            pending_tx = tcp->delayed_list.next;
 
1339
            pj_list_erase(pending_tx);
 
1340
 
 
1341
            op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
 
1342
 
 
1343
            on_data_sent(tcp->asock, op_key, -status);
 
1344
        }
 
1345
 
 
1346
        tcp_init_shutdown(tcp, status);
 
1347
        return PJ_FALSE;
 
1348
    }
 
1349
 
 
1350
    PJ_LOG(4,(tcp->base.obj_name, 
 
1351
              "TCP transport %.*s:%d is connected to %.*s:%d",
 
1352
              (int)tcp->base.local_name.host.slen,
 
1353
              tcp->base.local_name.host.ptr,
 
1354
              tcp->base.local_name.port,
 
1355
              (int)tcp->base.remote_name.host.slen,
 
1356
              tcp->base.remote_name.host.ptr,
 
1357
              tcp->base.remote_name.port));
 
1358
 
 
1359
 
 
1360
    /* Update (again) local address, just in case local address currently
 
1361
     * set is different now that the socket is connected (could happen
 
1362
     * on some systems, like old Win32 probably?).
 
1363
     */
 
1364
    addrlen = sizeof(addr);
 
1365
    if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {
 
1366
        pj_sockaddr *tp_addr = &tcp->base.local_addr;
 
1367
 
 
1368
        if (pj_sockaddr_has_addr(&addr) &&
 
1369
            pj_sockaddr_cmp(&addr, tp_addr) != 0)
 
1370
        {
 
1371
            pj_sockaddr_cp(tp_addr, &addr);
 
1372
            sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
 
1373
                                  tp_addr);
 
1374
        }
 
1375
    }
 
1376
 
 
1377
    /* Start pending read */
 
1378
    status = tcp_start_read(tcp);
 
1379
    if (status != PJ_SUCCESS) {
 
1380
        tcp_init_shutdown(tcp, status);
 
1381
        return PJ_FALSE;
 
1382
    }
 
1383
 
 
1384
    /* Notify application of transport state connected */
 
1385
    state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
 
1386
    if (state_cb) {
 
1387
        pjsip_transport_state_info state_info;
 
1388
    
 
1389
        pj_bzero(&state_info, sizeof(state_info));
 
1390
        (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
 
1391
    }
 
1392
 
 
1393
    /* Flush all pending send operations */
 
1394
    tcp_flush_pending_tx(tcp);
 
1395
 
 
1396
    /* Start keep-alive timer */
 
1397
    if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
 
1398
        pj_time_val delay = { PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0 };
 
1399
        pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, 
 
1400
                                   &delay);
 
1401
        tcp->ka_timer.id = PJ_TRUE;
 
1402
        pj_gettimeofday(&tcp->last_activity);
 
1403
    }
 
1404
 
 
1405
    return PJ_TRUE;
 
1406
}
 
1407
 
 
1408
/* Transport keep-alive timer callback */
 
1409
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e)
 
1410
{
 
1411
    struct tcp_transport *tcp = (struct tcp_transport*) e->user_data;
 
1412
    pj_time_val delay;
 
1413
    pj_time_val now;
 
1414
    pj_ssize_t size;
 
1415
    pj_status_t status;
 
1416
 
 
1417
    PJ_UNUSED_ARG(th);
 
1418
 
 
1419
    tcp->ka_timer.id = PJ_TRUE;
 
1420
 
 
1421
    pj_gettimeofday(&now);
 
1422
    PJ_TIME_VAL_SUB(now, tcp->last_activity);
 
1423
 
 
1424
    if (now.sec > 0 && now.sec < PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
 
1425
        /* There has been activity, so don't send keep-alive */
 
1426
        delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL - now.sec;
 
1427
        delay.msec = 0;
 
1428
 
 
1429
        pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, 
 
1430
                                   &delay);
 
1431
        tcp->ka_timer.id = PJ_TRUE;
 
1432
        return;
 
1433
    }
 
1434
 
 
1435
    PJ_LOG(5,(tcp->base.obj_name, "Sending %d byte(s) keep-alive to %.*s:%d", 
 
1436
              (int)tcp->ka_pkt.slen, (int)tcp->base.remote_name.host.slen,
 
1437
              tcp->base.remote_name.host.ptr,
 
1438
              tcp->base.remote_name.port));
 
1439
 
 
1440
    /* Send the data */
 
1441
    size = tcp->ka_pkt.slen;
 
1442
    status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key,
 
1443
                                tcp->ka_pkt.ptr, &size, 0);
 
1444
 
 
1445
    if (status != PJ_SUCCESS && status != PJ_EPENDING) {
 
1446
        tcp_perror(tcp->base.obj_name, 
 
1447
                   "Error sending keep-alive packet", status);
 
1448
        tcp_init_shutdown(tcp, status);
 
1449
        return;
 
1450
    }
 
1451
 
 
1452
    /* Register next keep-alive */
 
1453
    delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL;
 
1454
    delay.msec = 0;
 
1455
 
 
1456
    pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, 
 
1457
                               &delay);
 
1458
    tcp->ka_timer.id = PJ_TRUE;
 
1459
}
 
1460
 
 
1461
 
 
1462
#endif  /* PJ_HAS_TCP */
 
1463