1
/* $Id: listener_tcp.c 3553 2011-05-05 06:14:19Z nanang $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
#include <pj/compat/socket.h>
27
pj_ioqueue_op_key_t op_key;
35
pj_turn_listener base;
36
pj_ioqueue_key_t *key;
38
struct accept_op *accept_op; /* Array of accept_op's */
42
static void lis_on_accept_complete(pj_ioqueue_key_t *key,
43
pj_ioqueue_op_key_t *op_key,
46
static pj_status_t lis_destroy(pj_turn_listener *listener);
47
static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
48
pj_sockaddr_t *src_addr, int src_addr_len);
50
static void show_err(const char *sender, const char *title,
53
char errmsg[PJ_ERR_MSG_SIZE];
55
pj_strerror(status, errmsg, sizeof(errmsg));
56
PJ_LOG(4,(sender, "%s: %s", title, errmsg));
61
* Create a new listener on the specified port.
63
PJ_DEF(pj_status_t) pj_turn_listener_create_tcp(pj_turn_srv *srv,
65
const pj_str_t *bound_addr,
67
unsigned concurrency_cnt,
69
pj_turn_listener **p_listener)
72
struct tcp_listener *tcp_lis;
73
pj_ioqueue_callback ioqueue_cb;
77
/* Create structure */
78
pool = pj_pool_create(srv->core.pf, "tcpl%p", 1000, 1000, NULL);
79
tcp_lis = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
80
tcp_lis->base.pool = pool;
81
tcp_lis->base.obj_name = pool->obj_name;
82
tcp_lis->base.server = srv;
83
tcp_lis->base.tp_type = PJ_TURN_TP_TCP;
84
tcp_lis->base.sock = PJ_INVALID_SOCKET;
85
//tcp_lis->base.sendto = &tcp_sendto;
86
tcp_lis->base.destroy = &lis_destroy;
87
tcp_lis->accept_cnt = concurrency_cnt;
88
tcp_lis->base.flags = flags;
91
status = pj_sock_socket(af, pj_SOCK_STREAM(), 0, &tcp_lis->base.sock);
92
if (status != PJ_SUCCESS)
95
/* Init bind address */
96
status = pj_sockaddr_init(af, &tcp_lis->base.addr, bound_addr,
98
if (status != PJ_SUCCESS)
102
pj_ansi_strcpy(tcp_lis->base.info, "TCP:");
103
pj_sockaddr_print(&tcp_lis->base.addr, tcp_lis->base.info+4,
104
sizeof(tcp_lis->base.info)-4, 3);
107
status = pj_sock_bind(tcp_lis->base.sock, &tcp_lis->base.addr,
108
pj_sockaddr_get_len(&tcp_lis->base.addr));
109
if (status != PJ_SUCCESS)
113
status = pj_sock_listen(tcp_lis->base.sock, 5);
114
if (status != PJ_SUCCESS)
117
/* Register to ioqueue */
118
pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb));
119
ioqueue_cb.on_accept_complete = &lis_on_accept_complete;
120
status = pj_ioqueue_register_sock(pool, srv->core.ioqueue, tcp_lis->base.sock,
121
tcp_lis, &ioqueue_cb, &tcp_lis->key);
124
tcp_lis->accept_op = (struct accept_op*)pj_pool_calloc(pool, concurrency_cnt,
125
sizeof(struct accept_op));
127
/* Create each accept_op and kick off read operation */
128
for (i=0; i<concurrency_cnt; ++i) {
129
lis_on_accept_complete(tcp_lis->key, &tcp_lis->accept_op[i].op_key,
130
PJ_INVALID_SOCKET, PJ_EPENDING);
134
PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s created",
135
tcp_lis->base.info));
137
*p_listener = &tcp_lis->base;
142
lis_destroy(&tcp_lis->base);
150
static pj_status_t lis_destroy(pj_turn_listener *listener)
152
struct tcp_listener *tcp_lis = (struct tcp_listener *)listener;
156
pj_ioqueue_unregister(tcp_lis->key);
158
tcp_lis->base.sock = PJ_INVALID_SOCKET;
159
} else if (tcp_lis->base.sock != PJ_INVALID_SOCKET) {
160
pj_sock_close(tcp_lis->base.sock);
161
tcp_lis->base.sock = PJ_INVALID_SOCKET;
164
for (i=0; i<tcp_lis->accept_cnt; ++i) {
168
if (tcp_lis->base.pool) {
169
pj_pool_t *pool = tcp_lis->base.pool;
171
PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s destroyed",
172
tcp_lis->base.info));
174
tcp_lis->base.pool = NULL;
175
pj_pool_release(pool);
182
* Callback on new TCP connection.
184
static void lis_on_accept_complete(pj_ioqueue_key_t *key,
185
pj_ioqueue_op_key_t *op_key,
189
struct tcp_listener *tcp_lis;
190
struct accept_op *accept_op = (struct accept_op*) op_key;
192
tcp_lis = (struct tcp_listener*) pj_ioqueue_get_user_data(key);
197
/* Report new connection. */
198
if (status == PJ_SUCCESS) {
199
char addr[PJ_INET6_ADDRSTRLEN+8];
200
PJ_LOG(5,(tcp_lis->base.obj_name, "Incoming TCP from %s",
201
pj_sockaddr_print(&accept_op->src_addr, addr,
203
transport_create(accept_op->sock, &tcp_lis->base,
204
&accept_op->src_addr, accept_op->src_addr_len);
205
} else if (status != PJ_EPENDING) {
206
show_err(tcp_lis->base.obj_name, "accept()", status);
209
/* Prepare next accept() */
210
accept_op->src_addr_len = sizeof(accept_op->src_addr);
211
status = pj_ioqueue_accept(key, op_key, &accept_op->sock,
213
&accept_op->src_addr,
214
&accept_op->src_addr_len);
216
} while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
217
status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
221
/****************************************************************************/
231
/* The delay in seconds to be applied before TCP transport is destroyed when
232
* no allocation is referencing it. This also means the initial time to wait
233
* after the initial TCP connection establishment to receive a valid STUN
234
* message in the transport.
236
#define SHUTDOWN_DELAY 10
240
pj_ioqueue_op_key_t op_key;
246
pj_turn_transport base;
248
pj_timer_entry timer;
250
pj_turn_allocation *alloc;
254
pj_ioqueue_key_t *key;
255
struct recv_op recv_op;
256
pj_ioqueue_op_key_t send_op;
260
static void tcp_on_read_complete(pj_ioqueue_key_t *key,
261
pj_ioqueue_op_key_t *op_key,
262
pj_ssize_t bytes_read);
264
static pj_status_t tcp_sendto(pj_turn_transport *tp,
268
const pj_sockaddr_t *addr,
270
static void tcp_destroy(struct tcp_transport *tcp);
271
static void tcp_add_ref(pj_turn_transport *tp,
272
pj_turn_allocation *alloc);
273
static void tcp_dec_ref(pj_turn_transport *tp,
274
pj_turn_allocation *alloc);
275
static void timer_callback(pj_timer_heap_t *timer_heap,
276
pj_timer_entry *entry);
278
static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
279
pj_sockaddr_t *src_addr, int src_addr_len)
282
struct tcp_transport *tcp;
283
pj_ioqueue_callback cb;
286
pool = pj_pool_create(lis->server->core.pf, "tcp%p", 1000, 1000, NULL);
288
tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
289
tcp->base.obj_name = pool->obj_name;
290
tcp->base.listener = lis;
291
tcp->base.info = lis->info;
292
tcp->base.sendto = &tcp_sendto;
293
tcp->base.add_ref = &tcp_add_ref;
294
tcp->base.dec_ref = &tcp_dec_ref;
298
pj_timer_entry_init(&tcp->timer, TIMER_NONE, tcp, &timer_callback);
300
/* Register to ioqueue */
301
pj_bzero(&cb, sizeof(cb));
302
cb.on_read_complete = &tcp_on_read_complete;
303
status = pj_ioqueue_register_sock(pool, lis->server->core.ioqueue, sock,
304
tcp, &cb, &tcp->key);
305
if (status != PJ_SUCCESS) {
311
tcp->recv_op.pkt.pool = pj_pool_create(lis->server->core.pf, "tcpkt%p",
313
tcp->recv_op.pkt.transport = &tcp->base;
314
tcp->recv_op.pkt.src.tp_type = PJ_TURN_TP_TCP;
315
tcp->recv_op.pkt.src_addr_len = src_addr_len;
316
pj_memcpy(&tcp->recv_op.pkt.src.clt_addr, src_addr, src_addr_len);
318
tcp_on_read_complete(tcp->key, &tcp->recv_op.op_key, -PJ_EPENDING);
319
/* Should not access transport from now, it may have been destroyed */
323
static void tcp_destroy(struct tcp_transport *tcp)
326
pj_ioqueue_unregister(tcp->key);
329
} else if (tcp->sock) {
330
pj_sock_close(tcp->sock);
335
pj_pool_release(tcp->pool);
340
static void timer_callback(pj_timer_heap_t *timer_heap,
341
pj_timer_entry *entry)
343
struct tcp_transport *tcp = (struct tcp_transport*) entry->user_data;
345
PJ_UNUSED_ARG(timer_heap);
351
static void tcp_on_read_complete(pj_ioqueue_key_t *key,
352
pj_ioqueue_op_key_t *op_key,
353
pj_ssize_t bytes_read)
355
struct tcp_transport *tcp;
356
struct recv_op *recv_op = (struct recv_op*) op_key;
359
tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key);
362
/* Report to server or allocation, if we have allocation */
363
if (bytes_read > 0) {
365
recv_op->pkt.len = bytes_read;
366
pj_gettimeofday(&recv_op->pkt.rx_time);
368
tcp_add_ref(&tcp->base, NULL);
371
pj_turn_allocation_on_rx_client_pkt(tcp->alloc, &recv_op->pkt);
373
pj_turn_srv_on_rx_pkt(tcp->base.listener->server, &recv_op->pkt);
376
pj_assert(tcp->ref_cnt > 0);
377
tcp_dec_ref(&tcp->base, NULL);
379
} else if (bytes_read != -PJ_EPENDING) {
380
/* TCP connection closed/error. Notify client and then destroy
382
* Note: the -PJ_EPENDING is the value passed during init.
387
if (bytes_read != 0) {
388
show_err(tcp->base.obj_name, "TCP socket error",
391
PJ_LOG(5,(tcp->base.obj_name, "TCP socket closed"));
393
pj_turn_allocation_on_transport_closed(tcp->alloc, &tcp->base);
397
pj_assert(tcp->ref_cnt > 0);
398
if (--tcp->ref_cnt == 0) {
405
pj_pool_reset(recv_op->pkt.pool);
407
/* If packet is full discard it */
408
if (recv_op->pkt.len == sizeof(recv_op->pkt.pkt)) {
409
PJ_LOG(4,(tcp->base.obj_name, "Buffer discarded"));
410
recv_op->pkt.len = 0;
413
/* Read next packet */
414
bytes_read = sizeof(recv_op->pkt.pkt) - recv_op->pkt.len;
415
status = pj_ioqueue_recv(tcp->key, op_key,
416
recv_op->pkt.pkt + recv_op->pkt.len,
419
if (status != PJ_EPENDING && status != PJ_SUCCESS)
420
bytes_read = -status;
422
} while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
423
status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
428
static pj_status_t tcp_sendto(pj_turn_transport *tp,
432
const pj_sockaddr_t *addr,
435
struct tcp_transport *tcp = (struct tcp_transport*) tp;
436
pj_ssize_t length = size;
439
PJ_UNUSED_ARG(addr_len);
441
return pj_ioqueue_send(tcp->key, &tcp->send_op, packet, &length, flag);
445
static void tcp_add_ref(pj_turn_transport *tp,
446
pj_turn_allocation *alloc)
448
struct tcp_transport *tcp = (struct tcp_transport*) tp;
452
if (tcp->alloc == NULL && alloc) {
456
/* Cancel shutdown timer if it's running */
457
if (tcp->timer.id != TIMER_NONE) {
458
pj_timer_heap_cancel(tcp->base.listener->server->core.timer_heap,
460
tcp->timer.id = TIMER_NONE;
465
static void tcp_dec_ref(pj_turn_transport *tp,
466
pj_turn_allocation *alloc)
468
struct tcp_transport *tcp = (struct tcp_transport*) tp;
472
if (alloc && alloc == tcp->alloc) {
476
if (tcp->ref_cnt == 0 && tcp->timer.id == TIMER_NONE) {
477
pj_time_val delay = { SHUTDOWN_DELAY, 0 };
478
tcp->timer.id = TIMER_DESTROY;
479
pj_timer_heap_schedule(tcp->base.listener->server->core.timer_heap,
480
&tcp->timer, &delay);
484
#else /* PJ_HAS_TCP */
486
/* To avoid empty translation unit warning */
487
int listener_tcp_dummy = 0;
489
#endif /* PJ_HAS_TCP */