~noskcaj/ubuntu/saucy/sflphone/merge-1.2.3-2

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.0.1/pjsip/src/pjsip/sip_transport_loop.c

  • Committer: Jackson Doak
  • Date: 2013-07-10 21:04:46 UTC
  • mfrom: (20.1.3 sid)
  • Revision ID: noskcaj@ubuntu.com-20130710210446-y8f587vza807icr9
Properly merged from upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* $Id: sip_transport_loop.c 3553 2011-05-05 06:14:19Z nanang $ */
 
2
/*
 
3
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 
4
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify
 
7
 * it under the terms of the GNU General Public License as published by
 
8
 * the Free Software Foundation; either version 2 of the License, or
 
9
 * (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 */
 
20
#include <pjsip/sip_transport_loop.h>
 
21
#include <pjsip/sip_endpoint.h>
 
22
#include <pjsip/sip_errno.h>
 
23
#include <pj/pool.h>
 
24
#include <pj/os.h>
 
25
#include <pj/string.h>
 
26
#include <pj/lock.h>
 
27
#include <pj/assert.h>
 
28
#include <pj/compat/socket.h>
 
29
 
 
30
 
 
31
#define ADDR_LOOP       "128.0.0.1"
 
32
#define ADDR_LOOP_DGRAM "129.0.0.1"
 
33
 
 
34
 
 
35
/** This structure describes incoming packet. */
 
36
struct recv_list
 
37
{
 
38
    PJ_DECL_LIST_MEMBER(struct recv_list);
 
39
    pjsip_rx_data  rdata;
 
40
};
 
41
 
 
42
/** This structure is used to keep delayed send failure. */
 
43
struct send_list
 
44
{
 
45
    PJ_DECL_LIST_MEMBER(struct send_list);
 
46
    pj_time_val    sent_time;
 
47
    pj_ssize_t     sent;
 
48
    pjsip_tx_data *tdata;
 
49
    void          *token;
 
50
    void         (*callback)(pjsip_transport*, void*, pj_ssize_t);
 
51
};
 
52
 
 
53
/** This structure describes the loop transport. */
 
54
struct loop_transport
 
55
{
 
56
    pjsip_transport          base;
 
57
    pj_pool_t               *pool;
 
58
    pj_thread_t             *thread;
 
59
    pj_bool_t                thread_quit_flag;
 
60
    pj_bool_t                discard;
 
61
    int                      fail_mode;
 
62
    unsigned                 recv_delay;
 
63
    unsigned                 send_delay;
 
64
    struct recv_list         recv_list;
 
65
    struct send_list         send_list;
 
66
};
 
67
 
 
68
 
 
69
/* Helper function to create "incoming" packet */
 
70
struct recv_list *create_incoming_packet( struct loop_transport *loop,
 
71
                                          pjsip_tx_data *tdata )
 
72
{
 
73
    pj_pool_t *pool;
 
74
    struct recv_list *pkt;
 
75
 
 
76
    pool = pjsip_endpt_create_pool(loop->base.endpt, "rdata",
 
77
                                   PJSIP_POOL_RDATA_LEN,
 
78
                                   PJSIP_POOL_RDATA_INC+5);
 
79
    if (!pool)
 
80
        return NULL;
 
81
 
 
82
    pkt = PJ_POOL_ZALLOC_T(pool, struct recv_list);
 
83
 
 
84
    /* Initialize rdata. */
 
85
    pkt->rdata.tp_info.pool = pool;
 
86
    pkt->rdata.tp_info.transport = &loop->base;
 
87
 
 
88
    /* Copy the packet. */
 
89
    pj_memcpy(pkt->rdata.pkt_info.packet, tdata->buf.start,
 
90
              tdata->buf.cur - tdata->buf.start);
 
91
    pkt->rdata.pkt_info.len = tdata->buf.cur - tdata->buf.start;
 
92
 
 
93
    /* the source address */
 
94
    pkt->rdata.pkt_info.src_addr.addr.sa_family = pj_AF_INET();
 
95
 
 
96
    /* "Source address" info. */
 
97
    pkt->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
 
98
    if (loop->base.key.type == PJSIP_TRANSPORT_LOOP) {
 
99
        pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP);
 
100
    } else {
 
101
        pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP_DGRAM);
 
102
    }
 
103
    pkt->rdata.pkt_info.src_port = loop->base.local_name.port;
 
104
 
 
105
    /* When do we need to "deliver" this packet. */
 
106
    pj_gettimeofday(&pkt->rdata.pkt_info.timestamp);
 
107
    pkt->rdata.pkt_info.timestamp.msec += loop->recv_delay;
 
108
    pj_time_val_normalize(&pkt->rdata.pkt_info.timestamp);
 
109
 
 
110
    /* Done. */
 
111
 
 
112
    return pkt;
 
113
}
 
114
 
 
115
 
 
116
/* Helper function to add pending notification callback. */
 
117
static pj_status_t add_notification( struct loop_transport *loop,
 
118
                                     pjsip_tx_data *tdata,
 
119
                                     pj_ssize_t sent,
 
120
                                     void *token,
 
121
                                     void (*callback)(pjsip_transport*,
 
122
                                                      void*, pj_ssize_t))
 
123
{
 
124
    struct send_list *sent_status;
 
125
 
 
126
    pjsip_tx_data_add_ref(tdata);
 
127
    pj_lock_acquire(tdata->lock);
 
128
    sent_status = PJ_POOL_ALLOC_T(tdata->pool, struct send_list);
 
129
    pj_lock_release(tdata->lock);
 
130
 
 
131
    sent_status->sent = sent;
 
132
    sent_status->tdata = tdata;
 
133
    sent_status->token = token;
 
134
    sent_status->callback = callback;
 
135
 
 
136
    pj_gettimeofday(&sent_status->sent_time);
 
137
    sent_status->sent_time.msec += loop->send_delay;
 
138
    pj_time_val_normalize(&sent_status->sent_time);
 
139
 
 
140
    pj_lock_acquire(loop->base.lock);
 
141
    pj_list_push_back(&loop->send_list, sent_status);
 
142
    pj_lock_release(loop->base.lock);
 
143
 
 
144
    return PJ_SUCCESS;
 
145
}
 
146
 
 
147
/* Handler for sending outgoing message; called by transport manager. */
 
148
static pj_status_t loop_send_msg( pjsip_transport *tp,
 
149
                                  pjsip_tx_data *tdata,
 
150
                                  const pj_sockaddr_t *rem_addr,
 
151
                                  int addr_len,
 
152
                                  void *token,
 
153
                                  pjsip_transport_callback cb)
 
154
{
 
155
    struct loop_transport *loop = (struct loop_transport*)tp;
 
156
    struct recv_list *recv_pkt;
 
157
 
 
158
    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
 
159
                     tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
 
160
 
 
161
    PJ_UNUSED_ARG(rem_addr);
 
162
    PJ_UNUSED_ARG(addr_len);
 
163
 
 
164
 
 
165
    /* Need to send failure? */
 
166
    if (loop->fail_mode) {
 
167
        if (loop->send_delay == 0) {
 
168
            return PJ_STATUS_FROM_OS(OSERR_ECONNRESET);
 
169
        } else {
 
170
            add_notification(loop, tdata, -PJ_STATUS_FROM_OS(OSERR_ECONNRESET),
 
171
                             token, cb);
 
172
 
 
173
            return PJ_EPENDING;
 
174
        }
 
175
    }
 
176
 
 
177
    /* Discard any packets? */
 
178
    if (loop->discard)
 
179
        return PJ_SUCCESS;
 
180
 
 
181
    /* Create rdata for the "incoming" packet. */
 
182
    recv_pkt = create_incoming_packet(loop, tdata);
 
183
    if (!recv_pkt)
 
184
        return PJ_ENOMEM;
 
185
 
 
186
    /* If delay is not configured, deliver this packet now! */
 
187
    if (loop->recv_delay == 0) {
 
188
        pj_ssize_t size_eaten;
 
189
 
 
190
        size_eaten = pjsip_tpmgr_receive_packet( loop->base.tpmgr,
 
191
                                                 &recv_pkt->rdata);
 
192
        pj_assert(size_eaten == recv_pkt->rdata.pkt_info.len);
 
193
 
 
194
        pjsip_endpt_release_pool(loop->base.endpt,
 
195
                                 recv_pkt->rdata.tp_info.pool);
 
196
 
 
197
    } else {
 
198
        /* Otherwise if delay is configured, add the "packet" to the
 
199
         * receive list to be processed by worker thread.
 
200
         */
 
201
        pj_lock_acquire(loop->base.lock);
 
202
        pj_list_push_back(&loop->recv_list, recv_pkt);
 
203
        pj_lock_release(loop->base.lock);
 
204
    }
 
205
 
 
206
    if (loop->send_delay != 0) {
 
207
        add_notification(loop, tdata, tdata->buf.cur - tdata->buf.start,
 
208
                         token, cb);
 
209
        return PJ_EPENDING;
 
210
    } else {
 
211
        return PJ_SUCCESS;
 
212
    }
 
213
}
 
214
 
 
215
/* Handler to destroy the transport; called by transport manager */
 
216
static pj_status_t loop_destroy(pjsip_transport *tp)
 
217
{
 
218
    struct loop_transport *loop = (struct loop_transport*)tp;
 
219
 
 
220
    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
 
221
                     tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
 
222
 
 
223
    loop->thread_quit_flag = 1;
 
224
    /* Unlock transport mutex before joining thread. */
 
225
    pj_lock_release(tp->lock);
 
226
    pj_thread_join(loop->thread);
 
227
    pj_thread_destroy(loop->thread);
 
228
 
 
229
    /* Clear pending send notifications. */
 
230
    while (!pj_list_empty(&loop->send_list)) {
 
231
        struct send_list *node = loop->send_list.next;
 
232
        /* Notify callback. */
 
233
        if (node->callback) {
 
234
            (*node->callback)(&loop->base, node->token, -PJSIP_ESHUTDOWN);
 
235
        }
 
236
        pj_list_erase(node);
 
237
        pjsip_tx_data_dec_ref(node->tdata);
 
238
    }
 
239
 
 
240
    /* Clear "incoming" packets in the queue. */
 
241
    while (!pj_list_empty(&loop->recv_list)) {
 
242
        struct recv_list *node = loop->recv_list.next;
 
243
        pj_list_erase(node);
 
244
        pjsip_endpt_release_pool(loop->base.endpt,
 
245
                                 node->rdata.tp_info.pool);
 
246
    }
 
247
 
 
248
    /* Self destruct.. heheh.. */
 
249
    pj_lock_destroy(loop->base.lock);
 
250
    pj_atomic_destroy(loop->base.ref_cnt);
 
251
    pjsip_endpt_release_pool(loop->base.endpt, loop->base.pool);
 
252
 
 
253
    return PJ_SUCCESS;
 
254
}
 
255
 
 
256
/* Worker thread for loop transport. */
 
257
static int loop_transport_worker_thread(void *arg)
 
258
{
 
259
    struct loop_transport *loop = (struct loop_transport*) arg;
 
260
    struct recv_list r;
 
261
    struct send_list s;
 
262
 
 
263
    pj_list_init(&r);
 
264
    pj_list_init(&s);
 
265
 
 
266
    while (!loop->thread_quit_flag) {
 
267
        pj_time_val now;
 
268
 
 
269
        pj_thread_sleep(1);
 
270
        pj_gettimeofday(&now);
 
271
 
 
272
        pj_lock_acquire(loop->base.lock);
 
273
 
 
274
        /* Move expired send notification to local list. */
 
275
        while (!pj_list_empty(&loop->send_list)) {
 
276
            struct send_list *node = loop->send_list.next;
 
277
 
 
278
            /* Break when next node time is greater than now. */
 
279
            if (PJ_TIME_VAL_GTE(node->sent_time, now))
 
280
                break;
 
281
 
 
282
            /* Delete this from the list. */
 
283
            pj_list_erase(node);
 
284
 
 
285
            /* Add to local list. */
 
286
            pj_list_push_back(&s, node);
 
287
        }
 
288
 
 
289
        /* Move expired "incoming" packet to local list. */
 
290
        while (!pj_list_empty(&loop->recv_list)) {
 
291
            struct recv_list *node = loop->recv_list.next;
 
292
 
 
293
            /* Break when next node time is greater than now. */
 
294
            if (PJ_TIME_VAL_GTE(node->rdata.pkt_info.timestamp, now))
 
295
                break;
 
296
 
 
297
            /* Delete this from the list. */
 
298
            pj_list_erase(node);
 
299
 
 
300
            /* Add to local list. */
 
301
            pj_list_push_back(&r, node);
 
302
 
 
303
        }
 
304
 
 
305
        pj_lock_release(loop->base.lock);
 
306
 
 
307
        /* Process send notification and incoming packet notification
 
308
         * without holding down the loop's mutex.
 
309
         */
 
310
        while (!pj_list_empty(&s)) {
 
311
            struct send_list *node = s.next;
 
312
 
 
313
            pj_list_erase(node);
 
314
 
 
315
            /* Notify callback. */
 
316
            if (node->callback) {
 
317
                (*node->callback)(&loop->base, node->token, node->sent);
 
318
            }
 
319
 
 
320
            /* Decrement tdata reference counter. */
 
321
            pjsip_tx_data_dec_ref(node->tdata);
 
322
        }
 
323
 
 
324
        /* Process "incoming" packet. */
 
325
        while (!pj_list_empty(&r)) {
 
326
            struct recv_list *node = r.next;
 
327
            pj_ssize_t size_eaten;
 
328
 
 
329
            pj_list_erase(node);
 
330
 
 
331
            /* Notify transport manager about the "incoming packet" */
 
332
            size_eaten = pjsip_tpmgr_receive_packet(loop->base.tpmgr,
 
333
                                                    &node->rdata);
 
334
 
 
335
            /* Must "eat" all the packets. */
 
336
            pj_assert(size_eaten == node->rdata.pkt_info.len);
 
337
 
 
338
            /* Done. */
 
339
            pjsip_endpt_release_pool(loop->base.endpt,
 
340
                                     node->rdata.tp_info.pool);
 
341
        }
 
342
    }
 
343
 
 
344
    return 0;
 
345
}
 
346
 
 
347
 
 
348
/* Start loop transport. */
 
349
PJ_DEF(pj_status_t) pjsip_loop_start( pjsip_endpoint *endpt,
 
350
                                      pjsip_transport **transport)
 
351
{
 
352
    pj_pool_t *pool;
 
353
    struct loop_transport *loop;
 
354
    pj_status_t status;
 
355
 
 
356
    /* Create pool. */
 
357
    pool = pjsip_endpt_create_pool(endpt, "loop", 4000, 4000);
 
358
    if (!pool)
 
359
        return PJ_ENOMEM;
 
360
 
 
361
    /* Create the loop structure. */
 
362
    loop = PJ_POOL_ZALLOC_T(pool, struct loop_transport);
 
363
 
 
364
    /* Initialize transport properties. */
 
365
    pj_ansi_snprintf(loop->base.obj_name, sizeof(loop->base.obj_name),
 
366
                     "loop%p", loop);
 
367
    loop->base.pool = pool;
 
368
    status = pj_atomic_create(pool, 0, &loop->base.ref_cnt);
 
369
    if (status != PJ_SUCCESS)
 
370
        goto on_error;
 
371
    status = pj_lock_create_recursive_mutex(pool, "loop", &loop->base.lock);
 
372
    if (status != PJ_SUCCESS)
 
373
        goto on_error;
 
374
    loop->base.key.type = PJSIP_TRANSPORT_LOOP_DGRAM;
 
375
    //loop->base.key.rem_addr.sa_family = pj_AF_INET();
 
376
    loop->base.type_name = "LOOP-DGRAM";
 
377
    loop->base.info = "LOOP-DGRAM";
 
378
    loop->base.flag = PJSIP_TRANSPORT_DATAGRAM;
 
379
    loop->base.local_name.host = pj_str(ADDR_LOOP_DGRAM);
 
380
    loop->base.local_name.port =
 
381
        pjsip_transport_get_default_port_for_type((pjsip_transport_type_e)
 
382
                                                  loop->base.key.type);
 
383
    loop->base.addr_len = sizeof(pj_sockaddr_in);
 
384
    loop->base.dir = PJSIP_TP_DIR_NONE;
 
385
    loop->base.endpt = endpt;
 
386
    loop->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
 
387
    loop->base.send_msg = &loop_send_msg;
 
388
    loop->base.destroy = &loop_destroy;
 
389
 
 
390
    pj_list_init(&loop->recv_list);
 
391
    pj_list_init(&loop->send_list);
 
392
 
 
393
    /* Create worker thread. */
 
394
    status = pj_thread_create(pool, "loop",
 
395
                              &loop_transport_worker_thread, loop, 0,
 
396
                              PJ_THREAD_SUSPENDED, &loop->thread);
 
397
    if (status != PJ_SUCCESS)
 
398
        goto on_error;
 
399
 
 
400
    /* Register to transport manager. */
 
401
    status = pjsip_transport_register( loop->base.tpmgr, &loop->base);
 
402
    if (status != PJ_SUCCESS)
 
403
        goto on_error;
 
404
 
 
405
    /* Start the thread. */
 
406
    status = pj_thread_resume(loop->thread);
 
407
    if (status != PJ_SUCCESS)
 
408
        goto on_error;
 
409
 
 
410
    /*
 
411
     * Done.
 
412
     */
 
413
 
 
414
    if (transport)
 
415
        *transport = &loop->base;
 
416
 
 
417
    return PJ_SUCCESS;
 
418
 
 
419
on_error:
 
420
    if (loop->base.lock)
 
421
        pj_lock_destroy(loop->base.lock);
 
422
    if (loop->thread)
 
423
        pj_thread_destroy(loop->thread);
 
424
    if (loop->base.ref_cnt)
 
425
        pj_atomic_destroy(loop->base.ref_cnt);
 
426
    pjsip_endpt_release_pool(endpt, loop->pool);
 
427
    return status;
 
428
}
 
429
 
 
430
 
 
431
PJ_DEF(pj_status_t) pjsip_loop_set_discard( pjsip_transport *tp,
 
432
                                            pj_bool_t discard,
 
433
                                            pj_bool_t *prev_value )
 
434
{
 
435
    struct loop_transport *loop = (struct loop_transport*)tp;
 
436
 
 
437
    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
 
438
                     tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
 
439
 
 
440
    if (prev_value)
 
441
        *prev_value = loop->discard;
 
442
    loop->discard = discard;
 
443
 
 
444
    return PJ_SUCCESS;
 
445
}
 
446
 
 
447
 
 
448
PJ_DEF(pj_status_t) pjsip_loop_set_failure( pjsip_transport *tp,
 
449
                                            int fail_flag,
 
450
                                            int *prev_value )
 
451
{
 
452
    struct loop_transport *loop = (struct loop_transport*)tp;
 
453
 
 
454
    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
 
455
                     tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
 
456
 
 
457
    if (prev_value)
 
458
        *prev_value = loop->fail_mode;
 
459
    loop->fail_mode = fail_flag;
 
460
 
 
461
    return PJ_SUCCESS;
 
462
}
 
463
 
 
464
 
 
465
PJ_DEF(pj_status_t) pjsip_loop_set_recv_delay( pjsip_transport *tp,
 
466
                                               unsigned delay,
 
467
                                               unsigned *prev_value)
 
468
{
 
469
    struct loop_transport *loop = (struct loop_transport*)tp;
 
470
 
 
471
    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
 
472
                     tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
 
473
 
 
474
    if (prev_value)
 
475
        *prev_value = loop->recv_delay;
 
476
    loop->recv_delay = delay;
 
477
 
 
478
    return PJ_SUCCESS;
 
479
}
 
480
 
 
481
PJ_DEF(pj_status_t) pjsip_loop_set_send_callback_delay( pjsip_transport *tp,
 
482
                                                        unsigned delay,
 
483
                                                        unsigned *prev_value)
 
484
{
 
485
    struct loop_transport *loop = (struct loop_transport*)tp;
 
486
 
 
487
    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
 
488
                     tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
 
489
 
 
490
    if (prev_value)
 
491
        *prev_value = loop->send_delay;
 
492
    loop->send_delay = delay;
 
493
 
 
494
    return PJ_SUCCESS;
 
495
}
 
496
 
 
497
PJ_DEF(pj_status_t) pjsip_loop_set_delay( pjsip_transport *tp, unsigned delay )
 
498
{
 
499
    struct loop_transport *loop = (struct loop_transport*)tp;
 
500
 
 
501
    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
 
502
                     tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
 
503
 
 
504
    loop->recv_delay = delay;
 
505
    loop->send_delay = delay;
 
506
 
 
507
    return PJ_SUCCESS;
 
508
}