~ubuntu-branches/ubuntu/trusty/sflphone/trusty

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.1.0/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c

  • Committer: Package Import Robot
  • Author(s): Mark Purcell
  • Date: 2014-01-28 18:23:36 UTC
  • mfrom: (4.3.4 sid)
  • Revision ID: package-import@ubuntu.com-20140128182336-jrsv0k9u6cawc068
Tags: 1.3.0-1
* New upstream release 
  - Fixes "New Upstream Release" (Closes: #735846)
  - Fixes "Ringtone does not stop" (Closes: #727164)
  - Fixes "[sflphone-kde] crash on startup" (Closes: #718178)
  - Fixes "sflphone GUI crashes when call is hung up" (Closes: #736583)
* Build-Depends: ensure GnuTLS 2.6
  - libucommon-dev (>= 6.0.7-1.1), libccrtp-dev (>= 2.0.6-3)
  - Fixes "FTBFS Build-Depends libgnutls{26,28}-dev" (Closes: #722040)
* Fix "boost 1.49 is going away" unversioned Build-Depends: (Closes: #736746)
* Add Build-Depends: libsndfile-dev, nepomuk-core-dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* $Id$ */
 
2
/* 
 
3
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 
4
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify
 
7
 * it under the terms of the GNU General Public License as published by
 
8
 * the Free Software Foundation; either version 2 of the License, or
 
9
 * (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 
19
 */
 
20
#include <pjlib.h>
 
21
#include "test.h"
 
22
 
 
23
static pj_ioqueue_key_t *key;
 
24
static pj_atomic_t *total_bytes;
 
25
static pj_bool_t thread_quit_flag;
 
26
 
 
27
struct op_key
 
28
{
 
29
    pj_ioqueue_op_key_t  op_key_;
 
30
    struct op_key       *peer;
 
31
    char                *buffer;
 
32
    pj_size_t            size;
 
33
    int                  is_pending;
 
34
    pj_status_t          last_err;
 
35
    pj_sockaddr_in       addr;
 
36
    int                  addrlen;
 
37
};
 
38
 
 
39
static void on_read_complete(pj_ioqueue_key_t *key, 
 
40
                             pj_ioqueue_op_key_t *op_key, 
 
41
                             pj_ssize_t bytes_received)
 
42
{
 
43
    pj_status_t rc;
 
44
    struct op_key *recv_rec = (struct op_key *)op_key;
 
45
 
 
46
    for (;;) {
 
47
        struct op_key *send_rec = recv_rec->peer;
 
48
        recv_rec->is_pending = 0;
 
49
 
 
50
        if (bytes_received < 0) {
 
51
            if (-bytes_received != recv_rec->last_err) {
 
52
                recv_rec->last_err = -bytes_received;
 
53
                app_perror("...error receiving data", -bytes_received);
 
54
            }
 
55
        } else if (bytes_received == 0) {
 
56
            /* note: previous error, or write callback */
 
57
        } else {
 
58
            pj_atomic_add(total_bytes, bytes_received);
 
59
 
 
60
            if (!send_rec->is_pending) {
 
61
                pj_ssize_t sent = bytes_received;
 
62
                pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
 
63
                pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
 
64
                send_rec->addrlen = recv_rec->addrlen;
 
65
                rc = pj_ioqueue_sendto(key, &send_rec->op_key_, 
 
66
                                       send_rec->buffer, &sent, 0,
 
67
                                       &send_rec->addr, send_rec->addrlen);
 
68
                send_rec->is_pending = (rc==PJ_EPENDING);
 
69
 
 
70
                if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
 
71
                    app_perror("...send error(1)", rc);
 
72
                }
 
73
            }
 
74
        }
 
75
 
 
76
        if (!send_rec->is_pending) {
 
77
            bytes_received = recv_rec->size;
 
78
            rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_, 
 
79
                                     recv_rec->buffer, &bytes_received, 0,
 
80
                                     &recv_rec->addr, &recv_rec->addrlen);
 
81
            recv_rec->is_pending = (rc==PJ_EPENDING);
 
82
            if (rc == PJ_SUCCESS) {
 
83
                /* fall through next loop. */
 
84
            } else if (rc == PJ_EPENDING) {
 
85
                /* quit callback. */
 
86
                break;
 
87
            } else {
 
88
                /* error */
 
89
                app_perror("...recv error", rc);
 
90
                recv_rec->last_err = rc;
 
91
 
 
92
                bytes_received = 0;
 
93
                /* fall through next loop. */
 
94
            }
 
95
        } else {
 
96
            /* recv will be done when write completion callback is called. */
 
97
            break;
 
98
        }
 
99
    }
 
100
}
 
101
 
 
102
static void on_write_complete(pj_ioqueue_key_t *key, 
 
103
                              pj_ioqueue_op_key_t *op_key, 
 
104
                              pj_ssize_t bytes_sent)
 
105
{
 
106
    struct op_key *send_rec = (struct op_key*)op_key;
 
107
 
 
108
    if (bytes_sent <= 0) {
 
109
        pj_status_t rc = -bytes_sent;
 
110
        if (rc != send_rec->last_err) {
 
111
            send_rec->last_err = rc;
 
112
            app_perror("...send error(2)", rc);
 
113
        }
 
114
    }
 
115
 
 
116
    send_rec->is_pending = 0;
 
117
    on_read_complete(key, &send_rec->peer->op_key_, 0);
 
118
}
 
119
 
 
120
static int worker_thread(void *arg)
 
121
{
 
122
    pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg;
 
123
    struct op_key read_op, write_op;
 
124
    char recv_buf[512], send_buf[512];
 
125
    pj_ssize_t length;
 
126
    pj_status_t rc;
 
127
 
 
128
    read_op.peer = &write_op;
 
129
    read_op.is_pending = 0;
 
130
    read_op.last_err = 0;
 
131
    read_op.buffer = recv_buf;
 
132
    read_op.size = sizeof(recv_buf);
 
133
    read_op.addrlen = sizeof(read_op.addr);
 
134
 
 
135
    write_op.peer = &read_op;
 
136
    write_op.is_pending = 0;
 
137
    write_op.last_err = 0;
 
138
    write_op.buffer = send_buf;
 
139
    write_op.size = sizeof(send_buf);
 
140
 
 
141
    length = sizeof(recv_buf);
 
142
    rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
 
143
                             &read_op.addr, &read_op.addrlen);
 
144
    if (rc == PJ_SUCCESS) {
 
145
        read_op.is_pending = 1;
 
146
        on_read_complete(key, &read_op.op_key_, length);
 
147
    }
 
148
    
 
149
    while (!thread_quit_flag) {
 
150
        pj_time_val timeout;
 
151
        timeout.sec = 0; timeout.msec = 10;
 
152
        rc = pj_ioqueue_poll(ioqueue, &timeout);
 
153
    }
 
154
    return 0;
 
155
}
 
156
 
 
157
int udp_echo_srv_ioqueue(void)
 
158
{
 
159
    pj_pool_t *pool;
 
160
    pj_sock_t sock;
 
161
    pj_ioqueue_t *ioqueue;
 
162
    pj_ioqueue_callback callback;
 
163
    int i;
 
164
    pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
 
165
    pj_status_t rc;
 
166
 
 
167
    pj_bzero(&callback, sizeof(callback));
 
168
    callback.on_read_complete = &on_read_complete;
 
169
    callback.on_write_complete = &on_write_complete;
 
170
 
 
171
    pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
 
172
    if (!pool)
 
173
        return -10;
 
174
 
 
175
    rc = pj_ioqueue_create(pool, 2, &ioqueue);
 
176
    if (rc != PJ_SUCCESS) {
 
177
        app_perror("...pj_ioqueue_create error", rc);
 
178
        return -20;
 
179
    }
 
180
 
 
181
    rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, 
 
182
                    ECHO_SERVER_START_PORT, &sock);
 
183
    if (rc != PJ_SUCCESS) {
 
184
        app_perror("...app_socket error", rc);
 
185
        return -30;
 
186
    }
 
187
 
 
188
    rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
 
189
                                  &callback, &key);
 
190
    if (rc != PJ_SUCCESS) {
 
191
        app_perror("...error registering socket", rc);
 
192
        return -40;
 
193
    }
 
194
 
 
195
    rc = pj_atomic_create(pool, 0, &total_bytes);
 
196
    if (rc != PJ_SUCCESS) {
 
197
        app_perror("...error creating atomic variable", rc);
 
198
        return -45;
 
199
    }
 
200
 
 
201
    for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
 
202
        rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
 
203
                              PJ_THREAD_DEFAULT_STACK_SIZE, 0,
 
204
                              &thread[i]);
 
205
        if (rc != PJ_SUCCESS) {
 
206
            app_perror("...create thread error", rc);
 
207
            return -50;
 
208
        }
 
209
    }
 
210
 
 
211
    echo_srv_common_loop(total_bytes);
 
212
 
 
213
    return 0;
 
214
}