~ubuntu-branches/ubuntu/utopic/sflphone/utopic-proposed

« back to all changes in this revision

Viewing changes to daemon/libs/pjproject-2.0.1/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c

  • Committer: Package Import Robot
  • Author(s): Mark Purcell
  • Date: 2014-01-28 18:23:36 UTC
  • mfrom: (4.3.4 sid)
  • Revision ID: package-import@ubuntu.com-20140128182336-jrsv0k9u6cawc068
Tags: 1.3.0-1
* New upstream release 
  - Fixes "New Upstream Release" (Closes: #735846)
  - Fixes "Ringtone does not stop" (Closes: #727164)
  - Fixes "[sflphone-kde] crash on startup" (Closes: #718178)
  - Fixes "sflphone GUI crashes when call is hung up" (Closes: #736583)
* Build-Depends: ensure GnuTLS 2.6
  - libucommon-dev (>= 6.0.7-1.1), libccrtp-dev (>= 2.0.6-3)
  - Fixes "FTBFS Build-Depends libgnutls{26,28}-dev" (Closes: #722040)
* Fix "boost 1.49 is going away" unversioned Build-Depends: (Closes: #736746)
* Add Build-Depends: libsndfile-dev, nepomuk-core-dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* $Id$ */
2
 
/*
3
 
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
 
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5
 
 *
6
 
 * This program is free software; you can redistribute it and/or modify
7
 
 * it under the terms of the GNU General Public License as published by
8
 
 * the Free Software Foundation; either version 2 of the License, or
9
 
 * (at your option) any later version.
10
 
 *
11
 
 * This program is distributed in the hope that it will be useful,
12
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 * GNU General Public License for more details.
15
 
 *
16
 
 * You should have received a copy of the GNU General Public License
17
 
 * along with this program; if not, write to the Free Software
18
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
 
 */
20
 
#include <pjlib.h>
21
 
#include "test.h"
22
 
 
23
 
static pj_ioqueue_key_t *key;
24
 
static pj_atomic_t *total_bytes;
25
 
static pj_bool_t thread_quit_flag;
26
 
 
27
 
struct op_key
28
 
{
29
 
    pj_ioqueue_op_key_t  op_key_;
30
 
    struct op_key       *peer;
31
 
    char                *buffer;
32
 
    pj_size_t            size;
33
 
    int                  is_pending;
34
 
    pj_status_t          last_err;
35
 
    pj_sockaddr_in       addr;
36
 
    int                  addrlen;
37
 
};
38
 
 
39
 
static void on_read_complete(pj_ioqueue_key_t *key,
40
 
                             pj_ioqueue_op_key_t *op_key,
41
 
                             pj_ssize_t bytes_received)
42
 
{
43
 
    pj_status_t rc;
44
 
    struct op_key *recv_rec = (struct op_key *)op_key;
45
 
 
46
 
    for (;;) {
47
 
        struct op_key *send_rec = recv_rec->peer;
48
 
        recv_rec->is_pending = 0;
49
 
 
50
 
        if (bytes_received < 0) {
51
 
            if (-bytes_received != recv_rec->last_err) {
52
 
                recv_rec->last_err = -bytes_received;
53
 
                app_perror("...error receiving data", -bytes_received);
54
 
            }
55
 
        } else if (bytes_received == 0) {
56
 
            /* note: previous error, or write callback */
57
 
        } else {
58
 
            pj_atomic_add(total_bytes, bytes_received);
59
 
 
60
 
            if (!send_rec->is_pending) {
61
 
                pj_ssize_t sent = bytes_received;
62
 
                pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
63
 
                pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
64
 
                send_rec->addrlen = recv_rec->addrlen;
65
 
                rc = pj_ioqueue_sendto(key, &send_rec->op_key_,
66
 
                                       send_rec->buffer, &sent, 0,
67
 
                                       &send_rec->addr, send_rec->addrlen);
68
 
                send_rec->is_pending = (rc==PJ_EPENDING);
69
 
 
70
 
                if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
71
 
                    app_perror("...send error(1)", rc);
72
 
                }
73
 
            }
74
 
        }
75
 
 
76
 
        if (!send_rec->is_pending) {
77
 
            bytes_received = recv_rec->size;
78
 
            rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_,
79
 
                                     recv_rec->buffer, &bytes_received, 0,
80
 
                                     &recv_rec->addr, &recv_rec->addrlen);
81
 
            recv_rec->is_pending = (rc==PJ_EPENDING);
82
 
            if (rc == PJ_SUCCESS) {
83
 
                /* fall through next loop. */
84
 
            } else if (rc == PJ_EPENDING) {
85
 
                /* quit callback. */
86
 
                break;
87
 
            } else {
88
 
                /* error */
89
 
                app_perror("...recv error", rc);
90
 
                recv_rec->last_err = rc;
91
 
 
92
 
                bytes_received = 0;
93
 
                /* fall through next loop. */
94
 
            }
95
 
        } else {
96
 
            /* recv will be done when write completion callback is called. */
97
 
            break;
98
 
        }
99
 
    }
100
 
}
101
 
 
102
 
static void on_write_complete(pj_ioqueue_key_t *key,
103
 
                              pj_ioqueue_op_key_t *op_key,
104
 
                              pj_ssize_t bytes_sent)
105
 
{
106
 
    struct op_key *send_rec = (struct op_key*)op_key;
107
 
 
108
 
    if (bytes_sent <= 0) {
109
 
        pj_status_t rc = -bytes_sent;
110
 
        if (rc != send_rec->last_err) {
111
 
            send_rec->last_err = rc;
112
 
            app_perror("...send error(2)", rc);
113
 
        }
114
 
    }
115
 
 
116
 
    send_rec->is_pending = 0;
117
 
    on_read_complete(key, &send_rec->peer->op_key_, 0);
118
 
}
119
 
 
120
 
static int worker_thread(void *arg)
121
 
{
122
 
    pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg;
123
 
    struct op_key read_op, write_op;
124
 
    char recv_buf[512], send_buf[512];
125
 
    pj_ssize_t length;
126
 
    pj_status_t rc;
127
 
 
128
 
    read_op.peer = &write_op;
129
 
    read_op.is_pending = 0;
130
 
    read_op.last_err = 0;
131
 
    read_op.buffer = recv_buf;
132
 
    read_op.size = sizeof(recv_buf);
133
 
    read_op.addrlen = sizeof(read_op.addr);
134
 
 
135
 
    write_op.peer = &read_op;
136
 
    write_op.is_pending = 0;
137
 
    write_op.last_err = 0;
138
 
    write_op.buffer = send_buf;
139
 
    write_op.size = sizeof(send_buf);
140
 
 
141
 
    length = sizeof(recv_buf);
142
 
    rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
143
 
                             &read_op.addr, &read_op.addrlen);
144
 
    if (rc == PJ_SUCCESS) {
145
 
        read_op.is_pending = 1;
146
 
        on_read_complete(key, &read_op.op_key_, length);
147
 
    }
148
 
 
149
 
    while (!thread_quit_flag) {
150
 
        pj_time_val timeout;
151
 
        timeout.sec = 0; timeout.msec = 10;
152
 
        rc = pj_ioqueue_poll(ioqueue, &timeout);
153
 
    }
154
 
    return 0;
155
 
}
156
 
 
157
 
int udp_echo_srv_ioqueue(void)
158
 
{
159
 
    pj_pool_t *pool;
160
 
    pj_sock_t sock;
161
 
    pj_ioqueue_t *ioqueue;
162
 
    pj_ioqueue_callback callback;
163
 
    int i;
164
 
    pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
165
 
    pj_status_t rc;
166
 
 
167
 
    pj_bzero(&callback, sizeof(callback));
168
 
    callback.on_read_complete = &on_read_complete;
169
 
    callback.on_write_complete = &on_write_complete;
170
 
 
171
 
    pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
172
 
    if (!pool)
173
 
        return -10;
174
 
 
175
 
    rc = pj_ioqueue_create(pool, 2, &ioqueue);
176
 
    if (rc != PJ_SUCCESS) {
177
 
        app_perror("...pj_ioqueue_create error", rc);
178
 
        return -20;
179
 
    }
180
 
 
181
 
    rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0,
182
 
                    ECHO_SERVER_START_PORT, &sock);
183
 
    if (rc != PJ_SUCCESS) {
184
 
        app_perror("...app_socket error", rc);
185
 
        return -30;
186
 
    }
187
 
 
188
 
    rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
189
 
                                  &callback, &key);
190
 
    if (rc != PJ_SUCCESS) {
191
 
        app_perror("...error registering socket", rc);
192
 
        return -40;
193
 
    }
194
 
 
195
 
    rc = pj_atomic_create(pool, 0, &total_bytes);
196
 
    if (rc != PJ_SUCCESS) {
197
 
        app_perror("...error creating atomic variable", rc);
198
 
        return -45;
199
 
    }
200
 
 
201
 
    for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
202
 
        rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
203
 
                              PJ_THREAD_DEFAULT_STACK_SIZE, 0,
204
 
                              &thread[i]);
205
 
        if (rc != PJ_SUCCESS) {
206
 
            app_perror("...create thread error", rc);
207
 
            return -50;
208
 
        }
209
 
    }
210
 
 
211
 
    echo_srv_common_loop(total_bytes);
212
 
 
213
 
    return 0;
214
 
}