1
/* $Id: server.c 3553 2011-05-05 06:14:19Z nanang $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23
// Tuning constants for the TURN server.
// Value passed as the second argument to pj_hash_create() for both server hash tables.
#define MAX_CLIENTS 32
24
// Within the visible code this is only referenced by the disabled MAX_HANDLES formula below.
#define MAX_PEERS_PER_CLIENT 8
25
//#define MAX_HANDLES (MAX_CLIENTS*MAX_PEERS_PER_CLIENT+MAX_LISTENERS)
26
// Passed to pj_ioqueue_create() when the server is created.
#define MAX_HANDLES PJ_IOQUEUE_MAX_HANDLES
27
// Passed to pj_timer_heap_create() when the server is created.
#define MAX_TIMER (MAX_HANDLES * 2)
28
// Initial min and next port (MIN_PORT) and max port (MAX_PORT) for both the UDP and TCP ranges.
#define MIN_PORT 49152
29
#define MAX_PORT 65535
30
// Length of the listener array and the limit checked by pj_turn_srv_add_listener().
#define MAX_LISTENERS 16
32
// srv_handle_events() stops re-polling the ioqueue once its accumulated event count reaches this value.
#define MAX_NET_EVENTS 1000
35
// Forward declarations of static functions defined later in this file.
// server_thread_proc is the entry point handed to pj_thread_create().
// on_tx_stun_msg and on_rx_stun_request are installed as the on_send_msg and
// on_rx_request callbacks of the server STUN session in pj_turn_srv_create().
static int server_thread_proc(void *arg);
36
static pj_status_t on_tx_stun_msg( pj_stun_session *sess,
40
const pj_sockaddr_t *dst_addr,
42
static pj_status_t on_rx_stun_request(pj_stun_session *sess,
43
const pj_uint8_t *pkt,
45
const pj_stun_rx_data *rdata,
47
const pj_sockaddr_t *src_addr,
48
unsigned src_addr_len);
61
* Get transport type name, normally for logging purposes only.
63
// Maps a transport type constant to a name string.
// tp_type: transport type; PJ_TURN_TP_UDP and PJ_TURN_TP_TCP are the values tested.
// NOTE(review): the returned strings are not shown next to the branches; per the
// comment below each name is expected to be exactly three characters long.
PJ_DEF(const char*) pj_turn_tp_type_name(int tp_type)
65
/* Must be 3 characters long! */
66
if (tp_type == PJ_TURN_TP_UDP) {
68
} else if (tp_type == PJ_TURN_TP_TCP) {
71
// A negated string literal is always zero, so this assertion fails whenever it is reached.
// NOTE(review): presumably reached only for a type other than UDP or TCP - confirm.
pj_assert(!"Unsupported transport");
79
// Creates a TURN server instance and starts its worker threads.
//
// pf:    pool factory used to create the server pool; also passed to the STUN config.
// p_srv: must be non-NULL (presumably receives the new server - TODO confirm).
//
// Returns PJ_EINVAL (via PJ_ASSERT_RETURN) when pf or p_srv is NULL. The result
// of each later setup call is stored in status and compared against PJ_SUCCESS.
//
// Setup order: pool and server struct, ioqueue, recursive mutex, two thread
// local slots, timer heap, listener array, hash tables, port ranges, STUN
// config and credential, STUN session, worker threads.
PJ_DEF(pj_status_t) pj_turn_srv_create(pj_pool_factory *pf,
83
pj_stun_session_cb sess_cb;
88
PJ_ASSERT_RETURN(pf && p_srv, PJ_EINVAL);
90
/* Create server and init core settings */
91
pool = pj_pool_create(pf, "srv%p", 1000, 1000, NULL);
92
// The server struct itself is allocated from the pool created above.
srv = PJ_POOL_ZALLOC_T(pool, pj_turn_srv);
93
// The pool name doubles as the server object name; it is reused below as the
// lock name, STUN session name, thread name and log sender.
srv->obj_name = pool->obj_name;
95
srv->core.pool = pool;
96
// -1 marks both thread local slots as not allocated; pj_turn_srv_destroy()
// only frees a slot whose value is not -1.
srv->core.tls_key = srv->core.tls_data = -1;
99
status = pj_ioqueue_create(pool, MAX_HANDLES, &srv->core.ioqueue);
100
if (status != PJ_SUCCESS)
104
// Create a recursive mutex named after the server.
status = pj_lock_create_recursive_mutex(pool, srv->obj_name,
106
if (status != PJ_SUCCESS)
110
// Allocate the two thread local slots (tls_key, then tls_data).
status = pj_thread_local_alloc(&srv->core.tls_key);
111
if (status != PJ_SUCCESS)
114
status = pj_thread_local_alloc(&srv->core.tls_data);
115
if (status != PJ_SUCCESS)
118
/* Create timer heap */
119
status = pj_timer_heap_create(pool, MAX_TIMER, &srv->core.timer_heap);
120
if (status != PJ_SUCCESS)
123
/* Configure lock for the timer heap */
124
// NOTE(review): the PJ_FALSE argument presumably means the heap does not take
// ownership of the lock; pj_turn_srv_destroy() destroys the lock itself - confirm.
pj_timer_heap_set_lock(srv->core.timer_heap, srv->core.lock, PJ_FALSE);
126
/* Array of listeners */
127
// Room for MAX_LISTENERS listener pointers, allocated from the server pool.
srv->core.listener = (pj_turn_listener**)
128
pj_pool_calloc(pool, MAX_LISTENERS,
129
sizeof(srv->core.listener[0]));
131
/* Create hash tables */
132
// tables.alloc is keyed by alloc->hkey and tables.res by alloc->relay.hkey
// (see pj_turn_srv_register_allocation).
srv->tables.alloc = pj_hash_create(pool, MAX_CLIENTS);
133
srv->tables.res = pj_hash_create(pool, MAX_CLIENTS);
135
/* Init ports settings */
136
srv->ports.min_udp = srv->ports.next_udp = MIN_PORT;
137
srv->ports.max_udp = MAX_PORT;
138
srv->ports.min_tcp = srv->ports.next_tcp = MIN_PORT;
139
srv->ports.max_tcp = MAX_PORT;
141
/* Init STUN config */
142
pj_stun_config_init(&srv->core.stun_cfg, pf, 0, srv->core.ioqueue,
143
srv->core.timer_heap);
145
/* Init STUN credential */
146
// Dynamic credential: its callbacks are pj_turn_get_auth, pj_turn_get_password
// and pj_turn_verify_nonce, and its user data is the server.
srv->core.cred.type = PJ_STUN_AUTH_CRED_DYNAMIC;
147
srv->core.cred.data.dyn_cred.user_data = srv;
148
srv->core.cred.data.dyn_cred.get_auth = &pj_turn_get_auth;
149
srv->core.cred.data.dyn_cred.get_password = &pj_turn_get_password;
150
srv->core.cred.data.dyn_cred.verify_nonce = &pj_turn_verify_nonce;
152
/* Create STUN session to handle new allocation */
153
// Only the request and send callbacks are set; every other callback stays zeroed.
pj_bzero(&sess_cb, sizeof(sess_cb));
154
sess_cb.on_rx_request = &on_rx_stun_request;
155
sess_cb.on_send_msg = &on_tx_stun_msg;
157
status = pj_stun_session_create(&srv->core.stun_cfg, srv->obj_name,
158
&sess_cb, PJ_FALSE, &srv->core.stun_sess);
159
if (status != PJ_SUCCESS) {
163
// Attach the server as the session user data and select long term authentication.
pj_stun_session_set_user_data(srv->core.stun_sess, srv);
164
pj_stun_session_set_credential(srv->core.stun_sess, PJ_STUN_AUTH_LONG_TERM,
168
/* Array of worker threads */
169
// NOTE(review): MAX_THREADS is not among the macros visible at the top of this
// file - confirm where it is defined.
srv->core.thread_cnt = MAX_THREADS;
170
srv->core.thread = (pj_thread_t**)
171
pj_pool_calloc(pool, srv->core.thread_cnt,
172
sizeof(pj_thread_t*));
174
/* Start the worker threads */
175
// Every worker runs server_thread_proc() with the server as its argument.
for (i=0; i<srv->core.thread_cnt; ++i) {
176
status = pj_thread_create(pool, srv->obj_name, &server_thread_proc,
177
srv, 0, 0, &srv->core.thread[i]);
178
if (status != PJ_SUCCESS)
182
/* We're done. Application should add listeners now */
183
PJ_LOG(4,(srv->obj_name, "TURN server v%s is running",
190
// NOTE(review): presumably the failure path - confirm. pj_turn_srv_destroy()
// checks each resource before destroying it, which suits a partly built server.
pj_turn_srv_destroy(srv);
196
* Handle timer and network events
198
// Runs one round of event processing: polls the timer heap once, then polls
// the ioqueue in a loop while polls keep returning a positive count.
//
// srv:         server whose timer heap and ioqueue are polled.
// max_timeout: optional cap on the wait time; may be NULL, in which case the
//              timeout produced by the timer heap poll is used unchanged.
static void srv_handle_events(pj_turn_srv *srv, const pj_time_val *max_timeout)
200
/* timeout is 'out' var. This just to make compiler happy. */
201
pj_time_val timeout = { 0, 0};
202
// Running total of the counts returned by pj_ioqueue_poll() in the loop below.
unsigned net_event_count = 0;
205
/* Poll the timer. The timer heap has its own mutex for better
206
* granularity, so we don't need to lock the server.
208
// NOTE(review): pj_turn_srv_create() hands the server lock to the timer heap
// through pj_timer_heap_set_lock(), which does not match the remark above
// about the heap having its own mutex - confirm which is current.
timeout.sec = timeout.msec = 0;
209
// The count stored in c here is overwritten by the ioqueue poll further down.
c = pj_timer_heap_poll( srv->core.timer_heap, &timeout );
211
/* timer_heap_poll should never ever returns negative value, or otherwise
212
* ioqueue_poll() will block forever!
214
pj_assert(timeout.sec >= 0 && timeout.msec >= 0);
215
// Clamp the millisecond part so it stays below one second.
if (timeout.msec >= 1000) timeout.msec = 999;
217
/* If caller specifies maximum time to wait, then compare the value with
218
* the timeout to wait from timer, and use the minimum value.
220
if (max_timeout && PJ_TIME_VAL_GT(timeout, *max_timeout)) {
221
timeout = *max_timeout;
225
* Repeat polling the ioqueue while we have immediate events, because
226
* timer heap may process more than one events, so if we only process
227
* one network events at a time (such as when IOCP backend is used),
228
* the ioqueue may have trouble keeping up with the request rate.
230
* For example, for each send() request, one network event will be
231
* reported by ioqueue for the send() completion. If we don't poll
232
* the ioqueue often enough, the send() completion will not be
233
* reported in timely manner.
236
c = pj_ioqueue_poll( srv->core.ioqueue, &timeout);
238
// NOTE(review): the condition guarding this sleep is not shown next to it;
// presumably it runs when the poll reports a failure - confirm.
pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
243
// Accumulate the count, then zero the timeout so any further poll in this
// loop is given a zero wait time.
net_event_count += c;
244
timeout.sec = timeout.msec = 0;
246
// Stop when a poll returns zero or less, or once MAX_NET_EVENTS has been reached.
} while (c > 0 && net_event_count < MAX_NET_EVENTS);
251
* Server worker thread proc.
253
// Worker thread entry point: keeps handling events until the server quit flag is set.
// arg: the pj_turn_srv passed to pj_thread_create() in pj_turn_srv_create().
static int server_thread_proc(void *arg)
255
pj_turn_srv *srv = (pj_turn_srv*)arg;
257
// pj_turn_srv_destroy() sets core.quit before joining the worker threads.
while (!srv->core.quit) {
258
// Upper bound on the wait for one srv_handle_events() round.
// NOTE(review): {0, 100} presumably means 0 s and 100 ms - confirm the field order of pj_time_val.
pj_time_val timeout_max = {0, 100};
259
srv_handle_events(srv, &timeout_max);
266
* Destroy the server.
268
// Shuts the server down and releases its resources.
//
// srv: server to destroy.
//
// Teardown order: stop and join the worker threads, destroy every allocation,
// destroy the listeners, destroy the STUN session, clear the hash table
// pointers, destroy the timer heap and the ioqueue, free the thread local
// slots, destroy the server lock, and finally release the server pool.
// Every resource is checked (non-NULL, or not -1 for the thread local slots)
// before it is destroyed.
PJ_DEF(pj_status_t) pj_turn_srv_destroy(pj_turn_srv *srv)
270
pj_hash_iterator_t itbuf, *it;
273
/* Stop all worker threads */
274
// server_thread_proc() loops while core.quit is false, so this ends the worker loops.
srv->core.quit = PJ_TRUE;
275
for (i=0; i<srv->core.thread_cnt; ++i) {
276
if (srv->core.thread[i]) {
277
pj_thread_join(srv->core.thread[i]);
278
pj_thread_destroy(srv->core.thread[i]);
279
srv->core.thread[i] = NULL;
283
/* Destroy all allocations FIRST */
284
if (srv->tables.alloc) {
285
it = pj_hash_first(srv->tables.alloc, &itbuf);
287
pj_turn_allocation *alloc = (pj_turn_allocation*)
288
pj_hash_this(srv->tables.alloc, it);
289
// The next iterator is fetched before the current allocation is destroyed.
// NOTE(review): presumably because destroying an allocation removes it from this table - confirm.
pj_hash_iterator_t *next = pj_hash_next(srv->tables.alloc, it);
290
pj_turn_allocation_destroy(alloc);
295
/* Destroy all listeners. */
296
// pj_turn_listener_destroy() also clears the matching slot itself, under the server lock.
for (i=0; i<srv->core.lis_cnt; ++i) {
297
if (srv->core.listener[i]) {
298
pj_turn_listener_destroy(srv->core.listener[i]);
299
srv->core.listener[i] = NULL;
303
/* Destroy STUN session */
304
if (srv->core.stun_sess) {
305
pj_stun_session_destroy(srv->core.stun_sess);
306
srv->core.stun_sess = NULL;
309
/* Destroy hash tables (well, sort of) */
310
// No destroy call is made for the tables; both were created from the server
// pool, which is released at the end of this function.
if (srv->tables.alloc) {
311
srv->tables.alloc = NULL;
312
srv->tables.res = NULL;
315
/* Destroy timer heap */
316
if (srv->core.timer_heap) {
317
pj_timer_heap_destroy(srv->core.timer_heap);
318
srv->core.timer_heap = NULL;
321
/* Destroy ioqueue */
322
if (srv->core.ioqueue) {
323
pj_ioqueue_destroy(srv->core.ioqueue);
324
srv->core.ioqueue = NULL;
327
/* Destroy thread local IDs */
328
// -1 is the not-allocated marker set in pj_turn_srv_create().
if (srv->core.tls_key != -1) {
329
pj_thread_local_free(srv->core.tls_key);
330
srv->core.tls_key = -1;
332
if (srv->core.tls_data != -1) {
333
pj_thread_local_free(srv->core.tls_data);
334
srv->core.tls_data = -1;
337
/* Destroy server lock */
338
if (srv->core.lock) {
339
pj_lock_destroy(srv->core.lock);
340
srv->core.lock = NULL;
344
// srv itself was allocated from this pool (see pj_turn_srv_create), so the
// pool pointer is copied to a local and the member cleared before the release.
if (srv->core.pool) {
345
pj_pool_t *pool = srv->core.pool;
346
srv->core.pool = NULL;
347
pj_pool_release(pool);
358
// Adds a listener to the server listener array.
//
// srv, lis: both must be non-NULL, otherwise PJ_EINVAL is returned (via PJ_ASSERT_RETURN).
// Returns PJ_ETOOMANY (via PJ_ASSERT_RETURN) when srv->core.lis_cnt has reached MAX_LISTENERS.
//
// NOTE(review): no pj_lock_acquire() is visible around the array update here,
// unlike pj_turn_listener_destroy() - confirm whether callers must serialize this.
PJ_DEF(pj_status_t) pj_turn_srv_add_listener(pj_turn_srv *srv,
359
pj_turn_listener *lis)
363
PJ_ASSERT_RETURN(srv && lis, PJ_EINVAL);
364
PJ_ASSERT_RETURN(srv->core.lis_cnt < MAX_LISTENERS, PJ_ETOOMANY);
367
// The listener is stored at the slot whose index equals the current listener count.
index = srv->core.lis_cnt;
368
srv->core.listener[index] = lis;
373
// Logs the listener object name, info string and id.
PJ_LOG(4,(srv->obj_name, "Listener %s/%s added at index %d",
374
lis->obj_name, lis->info, lis->id));
383
// Detaches a listener from its server and then destroys it.
//
// listener: listener to destroy; listener->server is read in the first
//           declaration, so listener must be non-NULL.
// Returns the value returned by the listener destroy callback.
PJ_DEF(pj_status_t) pj_turn_listener_destroy(pj_turn_listener *listener)
385
pj_turn_srv *srv = listener->server;
388
/* Remove from our listener list */
389
pj_lock_acquire(srv->core.lock);
390
for (i=0; i<srv->core.lis_cnt; ++i) {
391
if (srv->core.listener[i] == listener) {
392
// The slot is cleared in place; pj_turn_srv_destroy() skips NULL slots.
srv->core.listener[i] = NULL;
394
listener->id = PJ_TURN_INVALID_LIS_ID;
398
pj_lock_release(srv->core.lock);
401
// The destroy callback is invoked after the server lock has been released.
return listener->destroy(listener);
406
* Add a reference to a transport.
408
// Forwards to the add_ref callback stored in the transport.
// transport: transport whose add_ref member is called; it is dereferenced without a check here.
// alloc:     passed through unchanged as the second argument of the callback.
PJ_DEF(void) pj_turn_transport_add_ref( pj_turn_transport *transport,
409
pj_turn_allocation *alloc)
411
transport->add_ref(transport, alloc);
416
* Decrement transport reference counter.
418
// Forwards to the dec_ref callback stored in the transport.
// transport: transport whose dec_ref member is called; it is dereferenced without a check here.
// alloc:     passed through unchanged as the second argument of the callback.
PJ_DEF(void) pj_turn_transport_dec_ref( pj_turn_transport *transport,
419
pj_turn_allocation *alloc)
421
transport->dec_ref(transport, alloc);
426
* Register an allocation to the hash tables.
428
// Registers an allocation in both server hash tables while holding the server lock.
//
// srv:   server owning the tables and the lock.
// alloc: allocation to add; alloc->pool is the pool handed to pj_hash_set().
//
// tables.alloc is keyed by alloc->hkey and tables.res by alloc->relay.hkey.
PJ_DEF(pj_status_t) pj_turn_srv_register_allocation(pj_turn_srv *srv,
429
pj_turn_allocation *alloc)
431
/* Add to hash tables */
432
// pj_turn_srv_on_rx_pkt() looks allocations up in tables.alloc under this same lock.
pj_lock_acquire(srv->core.lock);
433
pj_hash_set(alloc->pool, srv->tables.alloc,
434
&alloc->hkey, sizeof(alloc->hkey), 0, alloc);
435
pj_hash_set(alloc->pool, srv->tables.res,
436
&alloc->relay.hkey, sizeof(alloc->relay.hkey), 0,
438
pj_lock_release(srv->core.lock);
445
* Unregister an allocation from the hash tables.
447
// Unregisters an allocation from both server hash tables while holding the server lock.
//
// srv:   server owning the tables and the lock.
// alloc: allocation to remove; the same keys as in registration are used
//        (alloc->hkey for tables.alloc, alloc->relay.hkey for tables.res).
PJ_DEF(pj_status_t) pj_turn_srv_unregister_allocation(pj_turn_srv *srv,
448
pj_turn_allocation *alloc)
450
/* Unregister from hash tables */
451
pj_lock_acquire(srv->core.lock);
// Both calls pass NULL as the value, which is how this function unregisters a key.
452
pj_hash_set(alloc->pool, srv->tables.alloc,
453
&alloc->hkey, sizeof(alloc->hkey), 0, NULL);
454
pj_hash_set(alloc->pool, srv->tables.res,
455
&alloc->relay.hkey, sizeof(alloc->relay.hkey), 0, NULL);
456
pj_lock_release(srv->core.lock);
462
/* Callback from our own STUN session whenever it needs to send
463
* outgoing STUN packet.
465
// Send callback of the server STUN session (installed as sess_cb.on_send_msg
// in pj_turn_srv_create). Forwards the outgoing packet to the transport
// carried in token.
//
// Returns PJ_EINVALIDOP (via PJ_ASSERT_RETURN) when token is NULL, otherwise
// the result of the transport sendto callback.
static pj_status_t on_tx_stun_msg( pj_stun_session *sess,
469
const pj_sockaddr_t *dst_addr,
472
// stun_respond() hands a pj_turn_transport to pj_stun_session_send_msg(); token is cast back to that type here.
pj_turn_transport *transport = (pj_turn_transport*) token;
474
PJ_ASSERT_RETURN(transport!=NULL, PJ_EINVALIDOP);
478
return transport->sendto(transport, pdu, pdu_size, 0,
483
/* Respond to STUN request */
484
// Creates a STUN response for a received request and sends it through the session.
//
// sess:      STUN session used to create and send the response.
// transport: handed to pj_stun_session_send_msg() as its second argument.
// rdata:     the received request being answered.
// dst_addr:  destination address of the response.
// The body also uses code, errmsg, cache and addr_len: code and errmsg go into
// the response, cache and addr_len are forwarded to the send call.
//
// Returns the result of pj_stun_session_send_msg().
// NOTE(review): presumably returns status early when creating the response fails - confirm.
static pj_status_t stun_respond(pj_stun_session *sess,
485
pj_turn_transport *transport,
486
const pj_stun_rx_data *rdata,
490
const pj_sockaddr_t *dst_addr,
495
pj_stun_tx_data *tdata;
497
/* Create response */
498
// A NULL errmsg results in a NULL reason argument; otherwise errmsg is wrapped with pj_cstr().
status = pj_stun_session_create_res(sess, rdata, code,
499
(errmsg?pj_cstr(&reason,errmsg):NULL),
501
if (status != PJ_SUCCESS)
504
/* Send the response */
505
return pj_stun_session_send_msg(sess, transport, cache, PJ_FALSE,
506
dst_addr, addr_len, tdata);
510
/* Callback from our own STUN session when incoming request arrives.
511
* This function is triggered by pj_stun_session_on_rx_pkt() call in
512
* pj_turn_srv_on_rx_pkt() function below.
514
// Request callback of the server STUN session (installed as
// sess_cb.on_rx_request in pj_turn_srv_create).
//
// Any request whose type is not PJ_STUN_ALLOCATE_REQUEST is answered with
// PJ_STUN_SC_ALLOCATION_MISMATCH; an ALLOCATE request leads to
// pj_turn_allocation_create().
//
// sess:                   the STUN session delivering the request.
// pdu:                    raw packet; pdu_len is marked unused below.
// rdata:                  parsed request; rdata->msg supplies the message type.
// src_addr, src_addr_len: client address, used for the reply and the new allocation.
// token is cast to the pj_turn_transport; pj_turn_srv_on_rx_pkt() passes
// pkt->transport to the session when it hands the packet over.
static pj_status_t on_rx_stun_request(pj_stun_session *sess,
515
const pj_uint8_t *pdu,
517
const pj_stun_rx_data *rdata,
519
const pj_sockaddr_t *src_addr,
520
unsigned src_addr_len)
522
pj_turn_transport *transport;
523
const pj_stun_msg *msg = rdata->msg;
525
pj_turn_allocation *alloc;
529
PJ_UNUSED_ARG(pdu_len);
531
transport = (pj_turn_transport*) token;
532
// The server is reached through the listener that owns the transport.
srv = transport->listener->server;
534
/* Respond any requests other than ALLOCATE with 437 response */
535
if (msg->hdr.type != PJ_STUN_ALLOCATE_REQUEST) {
536
// No error text and no caching (errmsg NULL, cache PJ_FALSE); the result of stun_respond() is not checked.
stun_respond(sess, transport, rdata, PJ_STUN_SC_ALLOCATION_MISMATCH,
537
NULL, PJ_FALSE, src_addr, src_addr_len);
541
/* Create new allocation. The relay resource will be allocated
544
status = pj_turn_allocation_create(transport, src_addr, src_addr_len,
545
rdata, sess, &alloc);
546
if (status != PJ_SUCCESS) {
547
/* STUN response has been sent, no need to reply here */
555
/* Handle STUN Binding request */
556
// Answers a STUN Binding request directly: decode the request, create a
// response, add an XOR-MAPPED-ADDRESS attribute, encode it and send it back
// through the transport the packet arrived on. No STUN session is used here.
//
// pkt: received packet; its pool, data, length, transport and source address are used.
// The body also uses options, which is passed to pj_stun_msg_decode().
//
// NOTE(review): each status check presumably returns early on failure - confirm.
static void handle_binding_request(pj_turn_pkt *pkt,
559
pj_stun_msg *request, *response;
565
status = pj_stun_msg_decode(pkt->pool, pkt->pkt, pkt->len, options,
566
&request, NULL, NULL);
567
if (status != PJ_SUCCESS)
570
/* Create response */
571
status = pj_stun_msg_create_response(pkt->pool, request, 0, NULL,
573
if (status != PJ_SUCCESS)
576
/* Add XOR-MAPPED-ADDRESS */
577
// The result of this call is not stored or checked.
pj_stun_msg_add_sockaddr_attr(pkt->pool, response,
578
PJ_STUN_ATTR_XOR_MAPPED_ADDR,
584
// Encode the response into pdu; len is then used as the send size.
status = pj_stun_msg_encode(response, pdu, sizeof(pdu), 0, NULL, &len);
585
if (status != PJ_SUCCESS)
589
// The reply goes to the packet source address; the result of sendto is not checked.
pkt->transport->sendto(pkt->transport, pdu, len, 0,
590
&pkt->src.clt_addr, pkt->src_addr_len);
594
* This callback is called by UDP listener on incoming packet. This is
595
* the first entry for incoming packet (from client) to the server. From
596
* here, the packet may be handed over to an allocation if an allocation
597
* is found for the client address, or handed over to owned STUN session
598
* if an allocation is not found.
600
PJ_DEF(void) pj_turn_srv_on_rx_pkt(pj_turn_srv *srv,
603
pj_turn_allocation *alloc;
605
/* Get TURN allocation from the source address */
606
pj_lock_acquire(srv->core.lock);
607
alloc = (pj_turn_allocation*)
608
pj_hash_get(srv->tables.alloc, &pkt->src, sizeof(pkt->src), NULL);
609
pj_lock_release(srv->core.lock);
611
/* If allocation is found, just hand over the packet to the
615
pj_turn_allocation_on_rx_client_pkt(alloc, pkt);
617
/* Otherwise this is a new client */
619
pj_size_t parsed_len;
622
/* Check that this is a STUN message */
623
options = PJ_STUN_CHECK_PACKET | PJ_STUN_NO_FINGERPRINT_CHECK;
624
if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP)
625
options |= PJ_STUN_IS_DATAGRAM;
627
status = pj_stun_msg_check(pkt->pkt, pkt->len, options);
628
if (status != PJ_SUCCESS) {
629
/* If the first byte is not STUN, drop the packet. First byte
630
* of STUN message is always 0x00 or 0x01. Otherwise wait for
631
* more data as the data might have come from TCP.
633
* Also drop packet if it's unreasonably too big, as this might
634
* indicate invalid data that's building up in the buffer.
636
* Or if packet is a datagram.
638
if ((*pkt->pkt != 0x00 && *pkt->pkt != 0x01) ||
640
(options & PJ_STUN_IS_DATAGRAM))
642
char errmsg[PJ_ERR_MSG_SIZE];
643
char ip[PJ_INET6_ADDRSTRLEN+10];
647
pj_strerror(status, errmsg, sizeof(errmsg));
648
PJ_LOG(5,(srv->obj_name,
649
"Non-STUN packet from %s is dropped: %s",
650
pj_sockaddr_print(&pkt->src.clt_addr, ip, sizeof(ip), 3),
656
/* Special handling for Binding Request. We won't give it to the
657
* STUN session since this request is not authenticated.
659
if (pkt->pkt[1] == 1) {
660
handle_binding_request(pkt, options);
664
/* Hand over processing to STUN session. This will trigger
665
* on_rx_stun_request() callback to be called if the STUN
666
* message is a request.
668
options &= ~PJ_STUN_CHECK_PACKET;
670
status = pj_stun_session_on_rx_pkt(srv->core.stun_sess, pkt->pkt,
671
pkt->len, options, pkt->transport,
672
&parsed_len, &pkt->src.clt_addr,
674
if (status != PJ_SUCCESS) {
675
char errmsg[PJ_ERR_MSG_SIZE];
676
char ip[PJ_INET6_ADDRSTRLEN+10];
678
pj_strerror(status, errmsg, sizeof(errmsg));
679
PJ_LOG(5,(srv->obj_name,
680
"Error processing STUN packet from %s: %s",
681
pj_sockaddr_print(&pkt->src.clt_addr, ip, sizeof(ip), 3),
685
if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP) {
687
} else if (parsed_len > 0) {
688
if (parsed_len == pkt->len) {
691
pj_memmove(pkt->pkt, pkt->pkt+parsed_len,
692
pkt->len - parsed_len);
693
pkt->len -= parsed_len;