1
/* $Id: echo_clt.c 3553 2011-05-05 06:14:19Z nanang $ */
3
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23
#if INCLUDE_ECHO_CLIENT
25
enum { BUF_SIZE = 512 };
34
static pj_atomic_t *totalBytes;
35
static pj_atomic_t *timeout_counter;
36
static pj_atomic_t *invalid_counter;
38
#define MSEC_PRINT_DURATION 1000
40
static int wait_socket(pj_sock_t sock, unsigned msec_timeout)
46
timeout.msec = msec_timeout;
47
pj_time_val_normalize(&timeout);
50
PJ_FD_SET(sock, &fdset);
52
return pj_sock_select(FD_SETSIZE, &fdset, NULL, NULL, &timeout);
55
static int echo_client_thread(void *arg)
58
char send_buf[BUF_SIZE];
59
char recv_buf[BUF_SIZE];
63
pj_uint32_t buffer_id;
64
pj_uint32_t buffer_counter;
65
struct client *client = arg;
66
pj_status_t last_recv_err = PJ_SUCCESS, last_send_err = PJ_SUCCESS;
69
rc = app_socket(pj_AF_INET(), client->sock_type, 0, -1, &sock);
70
if (rc != PJ_SUCCESS) {
71
app_perror("...unable to create socket", rc);
75
rc = pj_sockaddr_in_init( &addr, pj_cstr(&s, client->server),
76
(pj_uint16_t)client->port);
77
if (rc != PJ_SUCCESS) {
78
app_perror("...unable to resolve server", rc);
82
rc = pj_sock_connect(sock, &addr, sizeof(addr));
83
if (rc != PJ_SUCCESS) {
84
app_perror("...connect() error", rc);
89
PJ_LOG(3,("", "...socket connected to %s:%d",
90
pj_inet_ntoa(addr.sin_addr),
91
pj_ntohs(addr.sin_port)));
93
pj_memset(send_buf, 'A', BUF_SIZE);
94
send_buf[BUF_SIZE-1]='\0';
96
/* Give other thread chance to initialize themselves! */
99
//PJ_LOG(3,("", "...thread %p running", pj_thread_this()));
101
buffer_id = (pj_uint32_t) pj_thread_this();
104
*(pj_uint32_t*)send_buf = buffer_id;
112
//while (wait_socket(sock,0) > 0)
117
*(pj_uint32_t*)(send_buf+4) = ++buffer_counter;
118
rc = pj_sock_send(sock, send_buf, &bytes, 0);
119
if (rc != PJ_SUCCESS || bytes != BUF_SIZE) {
120
if (rc != last_send_err) {
121
app_perror("...send() error", rc);
122
PJ_LOG(3,("", "...ignoring subsequent error.."));
124
pj_thread_sleep(100);
129
rc = wait_socket(sock, 500);
131
PJ_LOG(3,("", "...timeout"));
133
pj_atomic_inc(timeout_counter);
135
rc = pj_get_netos_error();
136
app_perror("...select() error", rc);
139
/* Receive back the original packet. */
142
pj_ssize_t received = BUF_SIZE - bytes;
143
rc = pj_sock_recv(sock, recv_buf+bytes, &received, 0);
144
if (rc != PJ_SUCCESS || received == 0) {
145
if (rc != last_recv_err) {
146
app_perror("...recv() error", rc);
147
PJ_LOG(3,("", "...ignoring subsequent error.."));
149
pj_thread_sleep(100);
156
} while (bytes != BUF_SIZE && bytes != 0);
162
if (pj_memcmp(send_buf, recv_buf, BUF_SIZE) != 0) {
163
recv_buf[BUF_SIZE-1] = '\0';
164
PJ_LOG(3,("", "...error: buffer %u has changed!\n"
167
counter, send_buf, recv_buf));
168
pj_atomic_inc(invalid_counter);
171
/* Accumulate total received. */
172
pj_atomic_add(totalBytes, bytes);
179
int echo_client(int sock_type, const char *server, int port)
182
pj_thread_t *thread[ECHO_CLIENT_MAX_THREADS];
184
struct client client;
186
pj_atomic_value_t last_received;
187
pj_timestamp last_report;
189
client.sock_type = sock_type;
190
client.server = server;
193
pool = pj_pool_create( mem, NULL, 4000, 4000, NULL );
195
rc = pj_atomic_create(pool, 0, &totalBytes);
196
if (rc != PJ_SUCCESS) {
197
PJ_LOG(3,("", "...error: unable to create atomic variable", rc));
200
rc = pj_atomic_create(pool, 0, &invalid_counter);
201
rc = pj_atomic_create(pool, 0, &timeout_counter);
203
PJ_LOG(3,("", "Echo client started"));
204
PJ_LOG(3,("", " Destination: %s:%d",
205
ECHO_SERVER_ADDRESS, ECHO_SERVER_START_PORT));
206
PJ_LOG(3,("", " Press Ctrl-C to exit"));
208
for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
209
rc = pj_thread_create( pool, NULL, &echo_client_thread, &client,
210
PJ_THREAD_DEFAULT_STACK_SIZE, 0,
212
if (rc != PJ_SUCCESS) {
213
app_perror("...error: unable to create thread", rc);
219
pj_get_timestamp(&last_report);
223
unsigned long received, cur_received;
228
pj_uint32_t timeout, invalid;
230
pj_thread_sleep(1000);
232
pj_get_timestamp(&now);
233
elapsed = pj_elapsed_time(&last_report, &now);
234
msec = PJ_TIME_VAL_MSEC(elapsed);
236
received = pj_atomic_get(totalBytes);
237
cur_received = received - last_received;
240
pj_highprec_mul(bw, 1000);
241
pj_highprec_div(bw, msec);
246
last_received = received;
248
timeout = pj_atomic_get(timeout_counter);
249
invalid = pj_atomic_get(invalid_counter);
252
"...%d threads, total bandwidth: %d KB/s, "
253
"timeout=%d, invalid=%d",
254
ECHO_CLIENT_MAX_THREADS, bw32/1000,
258
for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
259
pj_thread_join( thread[i] );
262
pj_pool_release(pool);
268
int dummy_echo_client;
269
#endif /* INCLUDE_ECHO_CLIENT */