~noskcaj/ubuntu/saucy/sflphone/merge-1.2.3-2

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject/pjnath/src/pjturn-srv/listener_tcp.c

  • Committer: Jackson Doak
  • Date: 2013-07-10 21:04:46 UTC
  • mfrom: (20.1.3 sid)
  • Revision ID: noskcaj@ubuntu.com-20130710210446-y8f587vza807icr9
Properly merged from upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* $Id: listener_tcp.c 3553 2011-05-05 06:14:19Z nanang $ */
2
 
/* 
3
 
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
 
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5
 
 *
6
 
 * This program is free software; you can redistribute it and/or modify
7
 
 * it under the terms of the GNU General Public License as published by
8
 
 * the Free Software Foundation; either version 2 of the License, or
9
 
 * (at your option) any later version.
10
 
 *
11
 
 * This program is distributed in the hope that it will be useful,
12
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 * GNU General Public License for more details.
15
 
 *
16
 
 * You should have received a copy of the GNU General Public License
17
 
 * along with this program; if not, write to the Free Software
18
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
19
 
 */
20
 
#include "turn.h"
21
 
#include <pj/compat/socket.h>
22
 
 
23
 
#if PJ_HAS_TCP
24
 
 
25
 
struct accept_op
26
 
{
27
 
    pj_ioqueue_op_key_t op_key;
28
 
    pj_sock_t           sock;
29
 
    pj_sockaddr         src_addr;
30
 
    int                 src_addr_len;
31
 
};
32
 
 
33
 
struct tcp_listener
34
 
{
35
 
    pj_turn_listener         base;
36
 
    pj_ioqueue_key_t        *key;
37
 
    unsigned                 accept_cnt;
38
 
    struct accept_op        *accept_op; /* Array of accept_op's */
39
 
};
40
 
 
41
 
 
42
 
static void lis_on_accept_complete(pj_ioqueue_key_t *key, 
43
 
                                   pj_ioqueue_op_key_t *op_key, 
44
 
                                   pj_sock_t sock, 
45
 
                                   pj_status_t status);
46
 
static pj_status_t lis_destroy(pj_turn_listener *listener);
47
 
static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
48
 
                             pj_sockaddr_t *src_addr, int src_addr_len);
49
 
 
50
 
static void show_err(const char *sender, const char *title, 
51
 
                     pj_status_t status)
52
 
{
53
 
    char errmsg[PJ_ERR_MSG_SIZE];
54
 
 
55
 
    pj_strerror(status, errmsg, sizeof(errmsg));
56
 
    PJ_LOG(4,(sender, "%s: %s", title, errmsg));
57
 
}
58
 
 
59
 
 
60
 
/*
61
 
 * Create a new listener on the specified port.
62
 
 */
63
 
PJ_DEF(pj_status_t) pj_turn_listener_create_tcp(pj_turn_srv *srv,
64
 
                                                int af,
65
 
                                                const pj_str_t *bound_addr,
66
 
                                                unsigned port,
67
 
                                                unsigned concurrency_cnt,
68
 
                                                unsigned flags,
69
 
                                                pj_turn_listener **p_listener)
70
 
{
71
 
    pj_pool_t *pool;
72
 
    struct tcp_listener *tcp_lis;
73
 
    pj_ioqueue_callback ioqueue_cb;
74
 
    unsigned i;
75
 
    pj_status_t status;
76
 
 
77
 
    /* Create structure */
78
 
    pool = pj_pool_create(srv->core.pf, "tcpl%p", 1000, 1000, NULL);
79
 
    tcp_lis = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
80
 
    tcp_lis->base.pool = pool;
81
 
    tcp_lis->base.obj_name = pool->obj_name;
82
 
    tcp_lis->base.server = srv;
83
 
    tcp_lis->base.tp_type = PJ_TURN_TP_TCP;
84
 
    tcp_lis->base.sock = PJ_INVALID_SOCKET;
85
 
    //tcp_lis->base.sendto = &tcp_sendto;
86
 
    tcp_lis->base.destroy = &lis_destroy;
87
 
    tcp_lis->accept_cnt = concurrency_cnt;
88
 
    tcp_lis->base.flags = flags;
89
 
 
90
 
    /* Create socket */
91
 
    status = pj_sock_socket(af, pj_SOCK_STREAM(), 0, &tcp_lis->base.sock);
92
 
    if (status != PJ_SUCCESS)
93
 
        goto on_error;
94
 
 
95
 
    /* Init bind address */
96
 
    status = pj_sockaddr_init(af, &tcp_lis->base.addr, bound_addr, 
97
 
                              (pj_uint16_t)port);
98
 
    if (status != PJ_SUCCESS) 
99
 
        goto on_error;
100
 
    
101
 
    /* Create info */
102
 
    pj_ansi_strcpy(tcp_lis->base.info, "TCP:");
103
 
    pj_sockaddr_print(&tcp_lis->base.addr, tcp_lis->base.info+4, 
104
 
                      sizeof(tcp_lis->base.info)-4, 3);
105
 
 
106
 
    /* Bind socket */
107
 
    status = pj_sock_bind(tcp_lis->base.sock, &tcp_lis->base.addr, 
108
 
                          pj_sockaddr_get_len(&tcp_lis->base.addr));
109
 
    if (status != PJ_SUCCESS)
110
 
        goto on_error;
111
 
 
112
 
    /* Listen() */
113
 
    status = pj_sock_listen(tcp_lis->base.sock, 5);
114
 
    if (status != PJ_SUCCESS)
115
 
        goto on_error;
116
 
 
117
 
    /* Register to ioqueue */
118
 
    pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb));
119
 
    ioqueue_cb.on_accept_complete = &lis_on_accept_complete;
120
 
    status = pj_ioqueue_register_sock(pool, srv->core.ioqueue, tcp_lis->base.sock,
121
 
                                      tcp_lis, &ioqueue_cb, &tcp_lis->key);
122
 
 
123
 
    /* Create op keys */
124
 
    tcp_lis->accept_op = (struct accept_op*)pj_pool_calloc(pool, concurrency_cnt,
125
 
                                                    sizeof(struct accept_op));
126
 
 
127
 
    /* Create each accept_op and kick off read operation */
128
 
    for (i=0; i<concurrency_cnt; ++i) {
129
 
        lis_on_accept_complete(tcp_lis->key, &tcp_lis->accept_op[i].op_key, 
130
 
                               PJ_INVALID_SOCKET, PJ_EPENDING);
131
 
    }
132
 
 
133
 
    /* Done */
134
 
    PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s created", 
135
 
           tcp_lis->base.info));
136
 
 
137
 
    *p_listener = &tcp_lis->base;
138
 
    return PJ_SUCCESS;
139
 
 
140
 
 
141
 
on_error:
142
 
    lis_destroy(&tcp_lis->base);
143
 
    return status;
144
 
}
145
 
 
146
 
 
147
 
/*
148
 
 * Destroy listener.
149
 
 */
150
 
static pj_status_t lis_destroy(pj_turn_listener *listener)
151
 
{
152
 
    struct tcp_listener *tcp_lis = (struct tcp_listener *)listener;
153
 
    unsigned i;
154
 
 
155
 
    if (tcp_lis->key) {
156
 
        pj_ioqueue_unregister(tcp_lis->key);
157
 
        tcp_lis->key = NULL;
158
 
        tcp_lis->base.sock = PJ_INVALID_SOCKET;
159
 
    } else if (tcp_lis->base.sock != PJ_INVALID_SOCKET) {
160
 
        pj_sock_close(tcp_lis->base.sock);
161
 
        tcp_lis->base.sock = PJ_INVALID_SOCKET;
162
 
    }
163
 
 
164
 
    for (i=0; i<tcp_lis->accept_cnt; ++i) {
165
 
        /* Nothing to do */
166
 
    }
167
 
 
168
 
    if (tcp_lis->base.pool) {
169
 
        pj_pool_t *pool = tcp_lis->base.pool;
170
 
 
171
 
        PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s destroyed", 
172
 
                  tcp_lis->base.info));
173
 
 
174
 
        tcp_lis->base.pool = NULL;
175
 
        pj_pool_release(pool);
176
 
    }
177
 
    return PJ_SUCCESS;
178
 
}
179
 
 
180
 
 
181
 
/*
182
 
 * Callback on new TCP connection.
183
 
 */
184
 
static void lis_on_accept_complete(pj_ioqueue_key_t *key, 
185
 
                                   pj_ioqueue_op_key_t *op_key, 
186
 
                                   pj_sock_t sock, 
187
 
                                   pj_status_t status)
188
 
{
189
 
    struct tcp_listener *tcp_lis;
190
 
    struct accept_op *accept_op = (struct accept_op*) op_key;
191
 
 
192
 
    tcp_lis = (struct tcp_listener*) pj_ioqueue_get_user_data(key);
193
 
 
194
 
    PJ_UNUSED_ARG(sock);
195
 
 
196
 
    do {
197
 
        /* Report new connection. */
198
 
        if (status == PJ_SUCCESS) {
199
 
            char addr[PJ_INET6_ADDRSTRLEN+8];
200
 
            PJ_LOG(5,(tcp_lis->base.obj_name, "Incoming TCP from %s",
201
 
                      pj_sockaddr_print(&accept_op->src_addr, addr,
202
 
                                        sizeof(addr), 3)));
203
 
            transport_create(accept_op->sock, &tcp_lis->base,
204
 
                             &accept_op->src_addr, accept_op->src_addr_len);
205
 
        } else if (status != PJ_EPENDING) {
206
 
            show_err(tcp_lis->base.obj_name, "accept()", status);
207
 
        }
208
 
 
209
 
        /* Prepare next accept() */
210
 
        accept_op->src_addr_len = sizeof(accept_op->src_addr);
211
 
        status = pj_ioqueue_accept(key, op_key, &accept_op->sock,
212
 
                                   NULL,
213
 
                                   &accept_op->src_addr,
214
 
                                   &accept_op->src_addr_len);
215
 
 
216
 
    } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
217
 
             status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
218
 
}
219
 
 
220
 
 
221
 
/****************************************************************************/
222
 
/*
223
 
 * Transport
224
 
 */
225
 
enum
226
 
{
227
 
    TIMER_NONE,
228
 
    TIMER_DESTROY
229
 
};
230
 
 
231
 
/* The delay in seconds to be applied before TCP transport is destroyed when 
232
 
 * no allocation is referencing it. This also means the initial time to wait
233
 
 * after the initial TCP connection establishment to receive a valid STUN
234
 
 * message in the transport.
235
 
 */
236
 
#define SHUTDOWN_DELAY  10
237
 
 
238
 
struct recv_op
239
 
{
240
 
    pj_ioqueue_op_key_t op_key;
241
 
    pj_turn_pkt         pkt;
242
 
};
243
 
 
244
 
struct tcp_transport
245
 
{
246
 
    pj_turn_transport    base;
247
 
    pj_pool_t           *pool;
248
 
    pj_timer_entry       timer;
249
 
 
250
 
    pj_turn_allocation  *alloc;
251
 
    int                  ref_cnt;
252
 
 
253
 
    pj_sock_t            sock;
254
 
    pj_ioqueue_key_t    *key;
255
 
    struct recv_op       recv_op;
256
 
    pj_ioqueue_op_key_t  send_op;
257
 
};
258
 
 
259
 
 
260
 
static void tcp_on_read_complete(pj_ioqueue_key_t *key, 
261
 
                                 pj_ioqueue_op_key_t *op_key, 
262
 
                                 pj_ssize_t bytes_read);
263
 
 
264
 
static pj_status_t tcp_sendto(pj_turn_transport *tp,
265
 
                              const void *packet,
266
 
                              pj_size_t size,
267
 
                              unsigned flag,
268
 
                              const pj_sockaddr_t *addr,
269
 
                              int addr_len);
270
 
static void tcp_destroy(struct tcp_transport *tcp);
271
 
static void tcp_add_ref(pj_turn_transport *tp,
272
 
                        pj_turn_allocation *alloc);
273
 
static void tcp_dec_ref(pj_turn_transport *tp,
274
 
                        pj_turn_allocation *alloc);
275
 
static void timer_callback(pj_timer_heap_t *timer_heap,
276
 
                           pj_timer_entry *entry);
277
 
 
278
 
static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
279
 
                             pj_sockaddr_t *src_addr, int src_addr_len)
280
 
{
281
 
    pj_pool_t *pool;
282
 
    struct tcp_transport *tcp;
283
 
    pj_ioqueue_callback cb;
284
 
    pj_status_t status;
285
 
 
286
 
    pool = pj_pool_create(lis->server->core.pf, "tcp%p", 1000, 1000, NULL);
287
 
 
288
 
    tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
289
 
    tcp->base.obj_name = pool->obj_name;
290
 
    tcp->base.listener = lis;
291
 
    tcp->base.info = lis->info;
292
 
    tcp->base.sendto = &tcp_sendto;
293
 
    tcp->base.add_ref = &tcp_add_ref;
294
 
    tcp->base.dec_ref = &tcp_dec_ref;
295
 
    tcp->pool = pool;
296
 
    tcp->sock = sock;
297
 
 
298
 
    pj_timer_entry_init(&tcp->timer, TIMER_NONE, tcp, &timer_callback);
299
 
 
300
 
    /* Register to ioqueue */
301
 
    pj_bzero(&cb, sizeof(cb));
302
 
    cb.on_read_complete = &tcp_on_read_complete;
303
 
    status = pj_ioqueue_register_sock(pool, lis->server->core.ioqueue, sock,
304
 
                                      tcp, &cb, &tcp->key);
305
 
    if (status != PJ_SUCCESS) {
306
 
        tcp_destroy(tcp);
307
 
        return;
308
 
    }
309
 
 
310
 
    /* Init pkt */
311
 
    tcp->recv_op.pkt.pool = pj_pool_create(lis->server->core.pf, "tcpkt%p", 
312
 
                                           1000, 1000, NULL);
313
 
    tcp->recv_op.pkt.transport = &tcp->base;
314
 
    tcp->recv_op.pkt.src.tp_type = PJ_TURN_TP_TCP;
315
 
    tcp->recv_op.pkt.src_addr_len = src_addr_len;
316
 
    pj_memcpy(&tcp->recv_op.pkt.src.clt_addr, src_addr, src_addr_len);
317
 
 
318
 
    tcp_on_read_complete(tcp->key, &tcp->recv_op.op_key, -PJ_EPENDING);
319
 
    /* Should not access transport from now, it may have been destroyed */
320
 
}
321
 
 
322
 
 
323
 
static void tcp_destroy(struct tcp_transport *tcp)
324
 
{
325
 
    if (tcp->key) {
326
 
        pj_ioqueue_unregister(tcp->key);
327
 
        tcp->key = NULL;
328
 
        tcp->sock = 0;
329
 
    } else if (tcp->sock) {
330
 
        pj_sock_close(tcp->sock);
331
 
        tcp->sock = 0;
332
 
    }
333
 
 
334
 
    if (tcp->pool) {
335
 
        pj_pool_release(tcp->pool);
336
 
    }
337
 
}
338
 
 
339
 
 
340
 
static void timer_callback(pj_timer_heap_t *timer_heap,
341
 
                           pj_timer_entry *entry)
342
 
{
343
 
    struct tcp_transport *tcp = (struct tcp_transport*) entry->user_data;
344
 
 
345
 
    PJ_UNUSED_ARG(timer_heap);
346
 
 
347
 
    tcp_destroy(tcp);
348
 
}
349
 
 
350
 
 
351
 
static void tcp_on_read_complete(pj_ioqueue_key_t *key, 
352
 
                                 pj_ioqueue_op_key_t *op_key, 
353
 
                                 pj_ssize_t bytes_read)
354
 
{
355
 
    struct tcp_transport *tcp;
356
 
    struct recv_op *recv_op = (struct recv_op*) op_key;
357
 
    pj_status_t status;
358
 
 
359
 
    tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key);
360
 
 
361
 
    do {
362
 
        /* Report to server or allocation, if we have allocation */
363
 
        if (bytes_read > 0) {
364
 
 
365
 
            recv_op->pkt.len = bytes_read;
366
 
            pj_gettimeofday(&recv_op->pkt.rx_time);
367
 
 
368
 
            tcp_add_ref(&tcp->base, NULL);
369
 
 
370
 
            if (tcp->alloc) {
371
 
                pj_turn_allocation_on_rx_client_pkt(tcp->alloc, &recv_op->pkt);
372
 
            } else {
373
 
                pj_turn_srv_on_rx_pkt(tcp->base.listener->server, &recv_op->pkt);
374
 
            }
375
 
 
376
 
            pj_assert(tcp->ref_cnt > 0);
377
 
            tcp_dec_ref(&tcp->base, NULL);
378
 
 
379
 
        } else if (bytes_read != -PJ_EPENDING) {
380
 
            /* TCP connection closed/error. Notify client and then destroy 
381
 
             * ourselves.
382
 
             * Note: the -PJ_EPENDING is the value passed during init.
383
 
             */
384
 
            ++tcp->ref_cnt;
385
 
 
386
 
            if (tcp->alloc) {
387
 
                if (bytes_read != 0) {
388
 
                    show_err(tcp->base.obj_name, "TCP socket error", 
389
 
                             -bytes_read);
390
 
                } else {
391
 
                    PJ_LOG(5,(tcp->base.obj_name, "TCP socket closed"));
392
 
                }
393
 
                pj_turn_allocation_on_transport_closed(tcp->alloc, &tcp->base);
394
 
                tcp->alloc = NULL;
395
 
            }
396
 
 
397
 
            pj_assert(tcp->ref_cnt > 0);
398
 
            if (--tcp->ref_cnt == 0) {
399
 
                tcp_destroy(tcp);
400
 
                return;
401
 
            }
402
 
        }
403
 
 
404
 
        /* Reset pool */
405
 
        pj_pool_reset(recv_op->pkt.pool);
406
 
 
407
 
        /* If packet is full discard it */
408
 
        if (recv_op->pkt.len == sizeof(recv_op->pkt.pkt)) {
409
 
            PJ_LOG(4,(tcp->base.obj_name, "Buffer discarded"));
410
 
            recv_op->pkt.len = 0;
411
 
        }
412
 
 
413
 
        /* Read next packet */
414
 
        bytes_read = sizeof(recv_op->pkt.pkt) - recv_op->pkt.len;
415
 
        status = pj_ioqueue_recv(tcp->key, op_key,
416
 
                                 recv_op->pkt.pkt + recv_op->pkt.len, 
417
 
                                 &bytes_read, 0);
418
 
 
419
 
        if (status != PJ_EPENDING && status != PJ_SUCCESS)
420
 
            bytes_read = -status;
421
 
 
422
 
    } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
423
 
             status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
424
 
 
425
 
}
426
 
 
427
 
 
428
 
static pj_status_t tcp_sendto(pj_turn_transport *tp,
429
 
                              const void *packet,
430
 
                              pj_size_t size,
431
 
                              unsigned flag,
432
 
                              const pj_sockaddr_t *addr,
433
 
                              int addr_len)
434
 
{
435
 
    struct tcp_transport *tcp = (struct tcp_transport*) tp;
436
 
    pj_ssize_t length = size;
437
 
 
438
 
    PJ_UNUSED_ARG(addr);
439
 
    PJ_UNUSED_ARG(addr_len);
440
 
 
441
 
    return pj_ioqueue_send(tcp->key, &tcp->send_op, packet, &length, flag);
442
 
}
443
 
 
444
 
 
445
 
static void tcp_add_ref(pj_turn_transport *tp,
446
 
                        pj_turn_allocation *alloc)
447
 
{
448
 
    struct tcp_transport *tcp = (struct tcp_transport*) tp;
449
 
 
450
 
    ++tcp->ref_cnt;
451
 
 
452
 
    if (tcp->alloc == NULL && alloc) {
453
 
        tcp->alloc = alloc;
454
 
    }
455
 
 
456
 
    /* Cancel shutdown timer if it's running */
457
 
    if (tcp->timer.id != TIMER_NONE) {
458
 
        pj_timer_heap_cancel(tcp->base.listener->server->core.timer_heap,
459
 
                             &tcp->timer);
460
 
        tcp->timer.id = TIMER_NONE;
461
 
    }
462
 
}
463
 
 
464
 
 
465
 
static void tcp_dec_ref(pj_turn_transport *tp,
466
 
                        pj_turn_allocation *alloc)
467
 
{
468
 
    struct tcp_transport *tcp = (struct tcp_transport*) tp;
469
 
 
470
 
    --tcp->ref_cnt;
471
 
 
472
 
    if (alloc && alloc == tcp->alloc) {
473
 
        tcp->alloc = NULL;
474
 
    }
475
 
 
476
 
    if (tcp->ref_cnt == 0 && tcp->timer.id == TIMER_NONE) {
477
 
        pj_time_val delay = { SHUTDOWN_DELAY, 0 };
478
 
        tcp->timer.id = TIMER_DESTROY;
479
 
        pj_timer_heap_schedule(tcp->base.listener->server->core.timer_heap,
480
 
                               &tcp->timer, &delay);
481
 
    }
482
 
}
483
 
 
484
 
#else   /* PJ_HAS_TCP */
485
 
 
486
 
/* To avoid empty translation unit warning */
487
 
int listener_tcp_dummy = 0;
488
 
 
489
 
#endif  /* PJ_HAS_TCP */
490