1
/* $Id: sip_transport_loop.c 3553 2011-05-05 06:14:19Z nanang $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
#include <pjsip/sip_transport_loop.h>
21
#include <pjsip/sip_endpoint.h>
22
#include <pjsip/sip_errno.h>
25
#include <pj/string.h>
27
#include <pj/assert.h>
28
#include <pj/compat/socket.h>
31
#define ADDR_LOOP "128.0.0.1"
32
#define ADDR_LOOP_DGRAM "129.0.0.1"
35
/** This structure describes incoming packet. */
38
PJ_DECL_LIST_MEMBER(struct recv_list);
42
/** This structure is used to keep delayed send failure. */
45
PJ_DECL_LIST_MEMBER(struct send_list);
46
pj_time_val sent_time;
50
void (*callback)(pjsip_transport*, void*, pj_ssize_t);
53
/** This structure describes the loop transport. */
59
pj_bool_t thread_quit_flag;
64
struct recv_list recv_list;
65
struct send_list send_list;
69
/* Helper function to create "incoming" packet */
70
struct recv_list *create_incoming_packet( struct loop_transport *loop,
71
pjsip_tx_data *tdata )
74
struct recv_list *pkt;
76
pool = pjsip_endpt_create_pool(loop->base.endpt, "rdata",
78
PJSIP_POOL_RDATA_INC+5);
82
pkt = PJ_POOL_ZALLOC_T(pool, struct recv_list);
84
/* Initialize rdata. */
85
pkt->rdata.tp_info.pool = pool;
86
pkt->rdata.tp_info.transport = &loop->base;
88
/* Copy the packet. */
89
pj_memcpy(pkt->rdata.pkt_info.packet, tdata->buf.start,
90
tdata->buf.cur - tdata->buf.start);
91
pkt->rdata.pkt_info.len = tdata->buf.cur - tdata->buf.start;
93
/* the source address */
94
pkt->rdata.pkt_info.src_addr.addr.sa_family = pj_AF_INET();
96
/* "Source address" info. */
97
pkt->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
98
if (loop->base.key.type == PJSIP_TRANSPORT_LOOP) {
99
pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP);
101
pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP_DGRAM);
103
pkt->rdata.pkt_info.src_port = loop->base.local_name.port;
105
/* When do we need to "deliver" this packet. */
106
pj_gettimeofday(&pkt->rdata.pkt_info.timestamp);
107
pkt->rdata.pkt_info.timestamp.msec += loop->recv_delay;
108
pj_time_val_normalize(&pkt->rdata.pkt_info.timestamp);
116
/* Helper function to add pending notification callback. */
117
static pj_status_t add_notification( struct loop_transport *loop,
118
pjsip_tx_data *tdata,
121
void (*callback)(pjsip_transport*,
124
struct send_list *sent_status;
126
pjsip_tx_data_add_ref(tdata);
127
pj_lock_acquire(tdata->lock);
128
sent_status = PJ_POOL_ALLOC_T(tdata->pool, struct send_list);
129
pj_lock_release(tdata->lock);
131
sent_status->sent = sent;
132
sent_status->tdata = tdata;
133
sent_status->token = token;
134
sent_status->callback = callback;
136
pj_gettimeofday(&sent_status->sent_time);
137
sent_status->sent_time.msec += loop->send_delay;
138
pj_time_val_normalize(&sent_status->sent_time);
140
pj_lock_acquire(loop->base.lock);
141
pj_list_push_back(&loop->send_list, sent_status);
142
pj_lock_release(loop->base.lock);
147
/* Handler for sending outgoing message; called by transport manager. */
148
static pj_status_t loop_send_msg( pjsip_transport *tp,
149
pjsip_tx_data *tdata,
150
const pj_sockaddr_t *rem_addr,
153
pjsip_transport_callback cb)
155
struct loop_transport *loop = (struct loop_transport*)tp;
156
struct recv_list *recv_pkt;
158
PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
159
tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
161
PJ_UNUSED_ARG(rem_addr);
162
PJ_UNUSED_ARG(addr_len);
165
/* Need to send failure? */
166
if (loop->fail_mode) {
167
if (loop->send_delay == 0) {
168
return PJ_STATUS_FROM_OS(OSERR_ECONNRESET);
170
add_notification(loop, tdata, -PJ_STATUS_FROM_OS(OSERR_ECONNRESET),
177
/* Discard any packets? */
181
/* Create rdata for the "incoming" packet. */
182
recv_pkt = create_incoming_packet(loop, tdata);
186
/* If delay is not configured, deliver this packet now! */
187
if (loop->recv_delay == 0) {
188
pj_ssize_t size_eaten;
190
size_eaten = pjsip_tpmgr_receive_packet( loop->base.tpmgr,
192
pj_assert(size_eaten == recv_pkt->rdata.pkt_info.len);
194
pjsip_endpt_release_pool(loop->base.endpt,
195
recv_pkt->rdata.tp_info.pool);
198
/* Otherwise if delay is configured, add the "packet" to the
199
* receive list to be processed by worker thread.
201
pj_lock_acquire(loop->base.lock);
202
pj_list_push_back(&loop->recv_list, recv_pkt);
203
pj_lock_release(loop->base.lock);
206
if (loop->send_delay != 0) {
207
add_notification(loop, tdata, tdata->buf.cur - tdata->buf.start,
215
/* Handler to destroy the transport; called by transport manager */
216
static pj_status_t loop_destroy(pjsip_transport *tp)
218
struct loop_transport *loop = (struct loop_transport*)tp;
220
PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
221
tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
223
loop->thread_quit_flag = 1;
224
/* Unlock transport mutex before joining thread. */
225
pj_lock_release(tp->lock);
226
pj_thread_join(loop->thread);
227
pj_thread_destroy(loop->thread);
229
/* Clear pending send notifications. */
230
while (!pj_list_empty(&loop->send_list)) {
231
struct send_list *node = loop->send_list.next;
232
/* Notify callback. */
233
if (node->callback) {
234
(*node->callback)(&loop->base, node->token, -PJSIP_ESHUTDOWN);
237
pjsip_tx_data_dec_ref(node->tdata);
240
/* Clear "incoming" packets in the queue. */
241
while (!pj_list_empty(&loop->recv_list)) {
242
struct recv_list *node = loop->recv_list.next;
244
pjsip_endpt_release_pool(loop->base.endpt,
245
node->rdata.tp_info.pool);
248
/* Self destruct.. heheh.. */
249
pj_lock_destroy(loop->base.lock);
250
pj_atomic_destroy(loop->base.ref_cnt);
251
pjsip_endpt_release_pool(loop->base.endpt, loop->base.pool);
256
/* Worker thread for loop transport. */
257
static int loop_transport_worker_thread(void *arg)
259
struct loop_transport *loop = (struct loop_transport*) arg;
266
while (!loop->thread_quit_flag) {
270
pj_gettimeofday(&now);
272
pj_lock_acquire(loop->base.lock);
274
/* Move expired send notification to local list. */
275
while (!pj_list_empty(&loop->send_list)) {
276
struct send_list *node = loop->send_list.next;
278
/* Break when next node time is greater than now. */
279
if (PJ_TIME_VAL_GTE(node->sent_time, now))
282
/* Delete this from the list. */
285
/* Add to local list. */
286
pj_list_push_back(&s, node);
289
/* Move expired "incoming" packet to local list. */
290
while (!pj_list_empty(&loop->recv_list)) {
291
struct recv_list *node = loop->recv_list.next;
293
/* Break when next node time is greater than now. */
294
if (PJ_TIME_VAL_GTE(node->rdata.pkt_info.timestamp, now))
297
/* Delete this from the list. */
300
/* Add to local list. */
301
pj_list_push_back(&r, node);
305
pj_lock_release(loop->base.lock);
307
/* Process send notification and incoming packet notification
308
* without holding down the loop's mutex.
310
while (!pj_list_empty(&s)) {
311
struct send_list *node = s.next;
315
/* Notify callback. */
316
if (node->callback) {
317
(*node->callback)(&loop->base, node->token, node->sent);
320
/* Decrement tdata reference counter. */
321
pjsip_tx_data_dec_ref(node->tdata);
324
/* Process "incoming" packet. */
325
while (!pj_list_empty(&r)) {
326
struct recv_list *node = r.next;
327
pj_ssize_t size_eaten;
331
/* Notify transport manager about the "incoming packet" */
332
size_eaten = pjsip_tpmgr_receive_packet(loop->base.tpmgr,
335
/* Must "eat" all the packets. */
336
pj_assert(size_eaten == node->rdata.pkt_info.len);
339
pjsip_endpt_release_pool(loop->base.endpt,
340
node->rdata.tp_info.pool);
348
/* Start loop transport. */
349
PJ_DEF(pj_status_t) pjsip_loop_start( pjsip_endpoint *endpt,
350
pjsip_transport **transport)
353
struct loop_transport *loop;
357
pool = pjsip_endpt_create_pool(endpt, "loop", 4000, 4000);
361
/* Create the loop structure. */
362
loop = PJ_POOL_ZALLOC_T(pool, struct loop_transport);
364
/* Initialize transport properties. */
365
pj_ansi_snprintf(loop->base.obj_name, sizeof(loop->base.obj_name),
367
loop->base.pool = pool;
368
status = pj_atomic_create(pool, 0, &loop->base.ref_cnt);
369
if (status != PJ_SUCCESS)
371
status = pj_lock_create_recursive_mutex(pool, "loop", &loop->base.lock);
372
if (status != PJ_SUCCESS)
374
loop->base.key.type = PJSIP_TRANSPORT_LOOP_DGRAM;
375
//loop->base.key.rem_addr.sa_family = pj_AF_INET();
376
loop->base.type_name = "LOOP-DGRAM";
377
loop->base.info = "LOOP-DGRAM";
378
loop->base.flag = PJSIP_TRANSPORT_DATAGRAM;
379
loop->base.local_name.host = pj_str(ADDR_LOOP_DGRAM);
380
loop->base.local_name.port =
381
pjsip_transport_get_default_port_for_type((pjsip_transport_type_e)
382
loop->base.key.type);
383
loop->base.addr_len = sizeof(pj_sockaddr_in);
384
loop->base.dir = PJSIP_TP_DIR_NONE;
385
loop->base.endpt = endpt;
386
loop->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
387
loop->base.send_msg = &loop_send_msg;
388
loop->base.destroy = &loop_destroy;
390
pj_list_init(&loop->recv_list);
391
pj_list_init(&loop->send_list);
393
/* Create worker thread. */
394
status = pj_thread_create(pool, "loop",
395
&loop_transport_worker_thread, loop, 0,
396
PJ_THREAD_SUSPENDED, &loop->thread);
397
if (status != PJ_SUCCESS)
400
/* Register to transport manager. */
401
status = pjsip_transport_register( loop->base.tpmgr, &loop->base);
402
if (status != PJ_SUCCESS)
405
/* Start the thread. */
406
status = pj_thread_resume(loop->thread);
407
if (status != PJ_SUCCESS)
415
*transport = &loop->base;
421
pj_lock_destroy(loop->base.lock);
423
pj_thread_destroy(loop->thread);
424
if (loop->base.ref_cnt)
425
pj_atomic_destroy(loop->base.ref_cnt);
426
pjsip_endpt_release_pool(endpt, loop->pool);
431
PJ_DEF(pj_status_t) pjsip_loop_set_discard( pjsip_transport *tp,
433
pj_bool_t *prev_value )
435
struct loop_transport *loop = (struct loop_transport*)tp;
437
PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
438
tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
441
*prev_value = loop->discard;
442
loop->discard = discard;
448
PJ_DEF(pj_status_t) pjsip_loop_set_failure( pjsip_transport *tp,
452
struct loop_transport *loop = (struct loop_transport*)tp;
454
PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
455
tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
458
*prev_value = loop->fail_mode;
459
loop->fail_mode = fail_flag;
465
PJ_DEF(pj_status_t) pjsip_loop_set_recv_delay( pjsip_transport *tp,
467
unsigned *prev_value)
469
struct loop_transport *loop = (struct loop_transport*)tp;
471
PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
472
tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
475
*prev_value = loop->recv_delay;
476
loop->recv_delay = delay;
481
PJ_DEF(pj_status_t) pjsip_loop_set_send_callback_delay( pjsip_transport *tp,
483
unsigned *prev_value)
485
struct loop_transport *loop = (struct loop_transport*)tp;
487
PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
488
tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
491
*prev_value = loop->send_delay;
492
loop->send_delay = delay;
497
PJ_DEF(pj_status_t) pjsip_loop_set_delay( pjsip_transport *tp, unsigned delay )
499
struct loop_transport *loop = (struct loop_transport*)tp;
501
PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
502
tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
504
loop->recv_delay = delay;
505
loop->send_delay = delay;