1
/* $Id: echo_clt.c 2394 2008-12-23 17:27:53Z bennylp $ */
3
* Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
* Additional permission under GNU GPL version 3 section 7:
22
* If you modify this program, or any covered work, by linking or
23
* combining it with the OpenSSL project's OpenSSL library (or a
24
* modified version of that library), containing parts covered by the
25
* terms of the OpenSSL or SSLeay licenses, Teluu Inc. (http://www.teluu.com)
26
* grants you additional permission to convey the resulting work.
27
* Corresponding Source for a non-source form of such a combination
28
* shall include the source code for the parts of OpenSSL used as well
29
* as that of the covered work.
34
#if INCLUDE_ECHO_CLIENT
36
/* Size, in bytes, of the send_buf and recv_buf arrays used by each client thread. */
enum { BUF_SIZE = 512 };
45
/* Byte counter: client threads add to it with pj_atomic_add(); echo_client() reads it with pj_atomic_get(). */
static pj_atomic_t *totalBytes;
46
/* Incremented by a client thread next to its "...timeout" log message. */
static pj_atomic_t *timeout_counter;
47
/* Incremented by a client thread when the received buffer differs from the sent one. */
static pj_atomic_t *invalid_counter;
49
/*
 * NOTE(review): not referenced in the visible lines; the reporting loop in
 * echo_client() sleeps for a literal 1000 ms instead -- confirm whether this
 * macro is still needed.
 */
#define MSEC_PRINT_DURATION 1000
51
/*
 * Wait on a single socket with pj_sock_select().
 *
 * sock          socket placed into the descriptor set that is passed as the
 *               second argument of pj_sock_select() (presumably the read
 *               set -- confirm against the PJLIB API).
 * msec_timeout  maximum wait in milliseconds.
 *
 * Returns the value returned by pj_sock_select() unchanged.
 *
 * NOTE(review): the declarations of `timeout` and `fdset`, and any other
 * initialization of them, are not visible in this excerpt.
 */
static int wait_socket(pj_sock_t sock, unsigned msec_timeout)
57
/* Store the millisecond timeout, then let PJLIB normalize the time value. */
timeout.msec = msec_timeout;
58
pj_time_val_normalize(&timeout);
61
PJ_FD_SET(sock, &fdset);
63
/* NOTE(review): FD_SETSIZE is passed as the first argument rather than a value derived from sock -- confirm this is intended. */
return pj_sock_select(FD_SETSIZE, &fdset, NULL, NULL, &timeout);
66
/*
 * Thread entry point for one echo-client connection.
 *
 * arg is used as a `struct client *`; its sock_type, server and port fields
 * are read to create a socket and connect it to the server.  The thread then
 * sends BUF_SIZE-byte buffers, waits for the socket with wait_socket(),
 * receives data back into recv_buf, compares it with what was sent, and
 * updates the file-scope counters totalBytes, timeout_counter and
 * invalid_counter.
 *
 * NOTE(review): several declarations (e.g. sock, addr, s, rc, bytes,
 * counter), the loop/branch headers and the return statements are not
 * visible in this excerpt, so the exact control flow and return values
 * cannot be stated here.
 */
static int echo_client_thread(void *arg)
69
char send_buf[BUF_SIZE];
70
char recv_buf[BUF_SIZE];
74
/* Value written into the first four bytes of every outgoing buffer. */
pj_uint32_t buffer_id;
75
/* Pre-incremented and written at byte offset 4 of each outgoing buffer. */
pj_uint32_t buffer_counter;
76
struct client *client = arg;
77
/* Compared against rc before an error is logged, so a repeated status is not reported again. */
pj_status_t last_recv_err = PJ_SUCCESS, last_send_err = PJ_SUCCESS;
80
/* Create the socket of the type requested by the caller. */
rc = app_socket(pj_AF_INET(), client->sock_type, 0, -1, &sock);
81
if (rc != PJ_SUCCESS) {
82
app_perror("...unable to create socket", rc);
86
/* Build the server address from the host string and port in *client. */
rc = pj_sockaddr_in_init( &addr, pj_cstr(&s, client->server),
87
(pj_uint16_t)client->port);
88
if (rc != PJ_SUCCESS) {
89
app_perror("...unable to resolve server", rc);
93
rc = pj_sock_connect(sock, &addr, sizeof(addr));
94
if (rc != PJ_SUCCESS) {
95
app_perror("...connect() error", rc);
100
PJ_LOG(3,("", "...socket connected to %s:%d",
101
pj_inet_ntoa(addr.sin_addr),
102
pj_ntohs(addr.sin_port)));
104
/* Fill the payload with 'A' and NUL-terminate it so it can be logged with %s. */
pj_memset(send_buf, 'A', BUF_SIZE);
105
send_buf[BUF_SIZE-1]='\0';
107
/* Give the other threads a chance to initialize themselves. */
108
pj_thread_sleep(200);
110
//PJ_LOG(3,("", "...thread %p running", pj_thread_this()));
112
/*
 * NOTE(review): pj_thread_this() is cast to a 32-bit integer; if it returns
 * a pointer (the commented-out log above prints it with %p), this truncates
 * on platforms with 64-bit pointers.
 */
buffer_id = (pj_uint32_t) pj_thread_this();
115
/* NOTE(review): stores a pj_uint32_t through a cast char pointer; consider pj_memcpy to avoid aliasing/alignment concerns. */
*(pj_uint32_t*)send_buf = buffer_id;
123
//while (wait_socket(sock,0) > 0)
128
/* Stamp the buffer with the next counter value at byte offset 4, then send it. */
*(pj_uint32_t*)(send_buf+4) = ++buffer_counter;
129
rc = pj_sock_send(sock, send_buf, &bytes, 0);
130
/* Treat both a failed send and a send of other than BUF_SIZE bytes as an error. */
if (rc != PJ_SUCCESS || bytes != BUF_SIZE) {
131
if (rc != last_send_err) {
132
app_perror("...send() error", rc);
133
PJ_LOG(3,("", "...ignoring subsequent error.."));
135
pj_thread_sleep(100);
140
/* Wait up to 500 ms on the socket; the branch headers that test rc are not visible in this excerpt. */
rc = wait_socket(sock, 500);
142
PJ_LOG(3,("", "...timeout"));
144
pj_atomic_inc(timeout_counter);
146
rc = pj_get_netos_error();
147
app_perror("...select() error", rc);
150
/* Receive back the original packet. */
153
/* Ask for the bytes still missing and append them after what is already in recv_buf. */
pj_ssize_t received = BUF_SIZE - bytes;
154
rc = pj_sock_recv(sock, recv_buf+bytes, &received, 0);
155
if (rc != PJ_SUCCESS || received == 0) {
156
if (rc != last_recv_err) {
157
app_perror("...recv() error", rc);
158
PJ_LOG(3,("", "...ignoring subsequent error.."));
160
pj_thread_sleep(100);
167
/* Repeat while bytes is neither BUF_SIZE nor 0. */
} while (bytes != BUF_SIZE && bytes != 0);
173
/* The received data must match the sent buffer byte for byte. */
if (pj_memcmp(send_buf, recv_buf, BUF_SIZE) != 0) {
174
recv_buf[BUF_SIZE-1] = '\0';
175
/* NOTE(review): `counter` is not declared in the visible lines (buffer_counter is) -- confirm which one is meant. */
PJ_LOG(3,("", "...error: buffer %u has changed!\n"
178
counter, send_buf, recv_buf));
179
pj_atomic_inc(invalid_counter);
182
/* Accumulate total received. */
183
pj_atomic_add(totalBytes, bytes);
190
/*
 * Run the echo client test.
 *
 * sock_type  copied into client.sock_type for the worker threads.
 * server     copied into client.server for the worker threads.
 * port       NOTE(review): its use is not visible in this excerpt; the
 *            worker thread reads client->port, so it is presumably stored
 *            there -- confirm.
 *
 * Creates a pool and the three atomic counters, starts
 * ECHO_CLIENT_MAX_THREADS threads running echo_client_thread(), then reports
 * statistics after each 1000 ms sleep, and finally joins the threads and
 * releases the pool.
 *
 * NOTE(review): the loop header of the reporting loop, several declarations
 * (e.g. pool, rc, i, now, elapsed, msec, bw, bw32) and the return statements
 * are not visible in this excerpt.
 */
int echo_client(int sock_type, const char *server, int port)
193
pj_thread_t *thread[ECHO_CLIENT_MAX_THREADS];
195
/* Parameter block shared by all worker threads (each gets &client). */
struct client client;
197
/* totalBytes value seen at the previous report; used to compute the per-interval delta. */
pj_atomic_value_t last_received;
198
pj_timestamp last_report;
200
client.sock_type = sock_type;
201
client.server = server;
204
pool = pj_pool_create( mem, NULL, 4000, 4000, NULL );
206
rc = pj_atomic_create(pool, 0, &totalBytes);
207
if (rc != PJ_SUCCESS) {
208
/* NOTE(review): rc is passed as an argument but the format string has no conversion specifier for it. */
PJ_LOG(3,("", "...error: unable to create atomic variable", rc));
211
/* NOTE(review): no check of rc is visible after either of these two calls, unlike the first pj_atomic_create() above. */
rc = pj_atomic_create(pool, 0, &invalid_counter);
212
rc = pj_atomic_create(pool, 0, &timeout_counter);
214
PJ_LOG(3,("", "Echo client started"));
215
/* NOTE(review): logs the ECHO_SERVER_ADDRESS / ECHO_SERVER_START_PORT macros, not the server and port arguments -- confirm which destination should be shown. */
PJ_LOG(3,("", " Destination: %s:%d",
216
ECHO_SERVER_ADDRESS, ECHO_SERVER_START_PORT));
217
PJ_LOG(3,("", " Press Ctrl-C to exit"));
219
/* Start the worker threads; every thread receives the same &client. */
for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
220
rc = pj_thread_create( pool, NULL, &echo_client_thread, &client,
221
PJ_THREAD_DEFAULT_STACK_SIZE, 0,
223
if (rc != PJ_SUCCESS) {
224
app_perror("...error: unable to create thread", rc);
230
pj_get_timestamp(&last_report);
234
unsigned long received, cur_received;
239
pj_uint32_t timeout, invalid;
241
pj_thread_sleep(1000);
243
/* Milliseconds elapsed since last_report. */
pj_get_timestamp(&now);
244
elapsed = pj_elapsed_time(&last_report, &now);
245
msec = PJ_TIME_VAL_MSEC(elapsed);
247
/* Bytes received since the previous report. */
received = pj_atomic_get(totalBytes);
248
cur_received = received - last_received;
251
/* Scale bw by 1000 and divide by the elapsed milliseconds (the initial value of bw is not visible here). */
pj_highprec_mul(bw, 1000);
252
pj_highprec_div(bw, msec);
257
last_received = received;
259
timeout = pj_atomic_get(timeout_counter);
260
invalid = pj_atomic_get(invalid_counter);
263
"...%d threads, total bandwidth: %d KB/s, "
264
"timeout=%d, invalid=%d",
265
ECHO_CLIENT_MAX_THREADS, bw32/1000,
269
/* Wait for every worker thread before releasing the pool. */
for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
270
pj_thread_join( thread[i] );
273
pj_pool_release(pool);
279
/*
 * Placeholder object.
 * NOTE(review): presumably this sits in an #else branch of
 * INCLUDE_ECHO_CLIENT so the translation unit is not empty when the client
 * is disabled; the #else line is not visible in this excerpt -- confirm.
 */
int dummy_echo_client;
280
#endif /* INCLUDE_ECHO_CLIENT */