/* $Id: ioq_perf.c 3553 2011-05-05 06:14:19Z nanang $ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
22
#include <pj/compat/high_precision.h>
25
* \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
27
* Test the performance of the I/O queue, using typical producer
28
* consumer test. The test should examine the effect of using multiple
29
* threads on the performance.
31
* This file is <b>pjlib-test/ioq_perf.c</b>
33
* \include pjlib-test/ioq_perf.c
36
#if INCLUDE_IOQUEUE_PERF_TEST

#ifdef _MSC_VER
#   pragma warning ( disable: 4204)     // non-constant aggregate initializer
#endif

#define THIS_FILE       "ioq_perf"
//#define TRACE_(expr)  PJ_LOG(3,expr)
#define TRACE_(expr)
47
static pj_bool_t thread_quit_flag;
48
static pj_status_t last_error;
49
static unsigned last_error_counter;
51
/* Descriptor for each producer/consumer pair. */
52
typedef struct test_item
56
pj_ioqueue_t *ioqueue;
57
pj_ioqueue_key_t *server_key,
59
pj_ioqueue_op_key_t recv_op,
62
pj_size_t buffer_size;
63
char *outgoing_buffer;
64
char *incoming_buffer;
69
/* Callback when data has been read.
70
* Increment item->bytes_recv and ready to read the next data.
72
static void on_read_complete(pj_ioqueue_key_t *key,
73
pj_ioqueue_op_key_t *op_key,
74
pj_ssize_t bytes_read)
76
test_item *item = (test_item*)pj_ioqueue_get_user_data(key);
78
int data_is_available = 1;
80
//TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
87
pj_status_t rc = -bytes_read;
88
char errmsg[PJ_ERR_MSG_SIZE];
90
if (rc != last_error) {
92
pj_strerror(rc, errmsg, sizeof(errmsg));
93
PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)",
96
".....additional info: total read=%u, total sent=%u",
97
item->bytes_recv, item->bytes_sent));
103
} else if (bytes_read == 0) {
104
PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
107
item->bytes_recv += bytes_read;
109
/* To assure that the test quits, even if main thread
110
* doesn't have time to run.
112
if (item->bytes_recv > item->buffer_size * 10000)
113
thread_quit_flag = 1;
115
bytes_read = item->buffer_size;
116
rc = pj_ioqueue_recv( key, op_key,
117
item->incoming_buffer, &bytes_read, 0 );
119
if (rc == PJ_SUCCESS) {
120
data_is_available = 1;
121
} else if (rc == PJ_EPENDING) {
122
data_is_available = 0;
124
data_is_available = 0;
125
if (rc != last_error) {
127
app_perror("...error: read error(1)", rc);
129
last_error_counter++;
133
if (!item->has_pending_send) {
134
pj_ssize_t sent = item->buffer_size;
135
rc = pj_ioqueue_send(item->client_key, &item->send_op,
136
item->outgoing_buffer, &sent, 0);
137
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
138
app_perror("...error: write error", rc);
141
item->has_pending_send = (rc==PJ_EPENDING);
144
} while (data_is_available);
147
/* Callback when data has been written.
148
* Increment item->bytes_sent and write the next data.
150
static void on_write_complete(pj_ioqueue_key_t *key,
151
pj_ioqueue_op_key_t *op_key,
152
pj_ssize_t bytes_sent)
154
test_item *item = (test_item*) pj_ioqueue_get_user_data(key);
156
//TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent));
158
if (thread_quit_flag)
161
item->has_pending_send = 0;
162
item->bytes_sent += bytes_sent;
164
if (bytes_sent <= 0) {
165
PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",
171
bytes_sent = item->buffer_size;
172
rc = pj_ioqueue_send( item->client_key, op_key,
173
item->outgoing_buffer, &bytes_sent, 0);
174
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
175
app_perror("...error: write error", rc);
178
item->has_pending_send = (rc==PJ_EPENDING);
185
pj_ioqueue_t *ioqueue;
189
/* The worker thread. */
190
static int worker_thread(void *p)
192
struct thread_arg *arg = (struct thread_arg*) p;
193
const pj_time_val timeout = {0, 100};
196
while (!thread_quit_flag) {
199
rc = pj_ioqueue_poll(arg->ioqueue, &timeout);
200
//TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));
202
char errmsg[PJ_ERR_MSG_SIZE];
203
pj_strerror(-rc, errmsg, sizeof(errmsg));
204
PJ_LOG(3, (THIS_FILE,
205
"...error in pj_ioqueue_poll() in thread %d "
206
"after %d loop: %s [pj_status_t=%d]",
207
arg->id, arg->counter, errmsg, -rc));
214
/* Calculate the bandwidth for the specific test configuration.
215
* The test is simple:
216
* - create sockpair_cnt number of producer-consumer socket pair.
217
* - create thread_cnt number of worker threads.
218
* - each producer will send buffer_size bytes data as fast and
220
* - each consumer will read buffer_size bytes of data as fast
222
* - measure the total bytes received by all consumers during a
225
static int perform_test(pj_bool_t allow_concur,
226
int sock_type, const char *type_name,
227
unsigned thread_cnt, unsigned sockpair_cnt,
228
pj_size_t buffer_size,
229
pj_size_t *p_bandwidth)
231
enum { MSEC_DURATION = 5000 };
234
pj_thread_t **thread;
235
pj_ioqueue_t *ioqueue;
237
pj_ioqueue_callback ioqueue_callback;
238
pj_uint32_t total_elapsed_usec, total_received;
239
pj_highprec_t bandwidth;
240
pj_timestamp start, stop;
243
TRACE_((THIS_FILE, " starting test.."));
245
ioqueue_callback.on_read_complete = &on_read_complete;
246
ioqueue_callback.on_write_complete = &on_write_complete;
248
thread_quit_flag = 0;
250
pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
254
items = (test_item*) pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
255
thread = (pj_thread_t**)
256
pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
258
TRACE_((THIS_FILE, " creating ioqueue.."));
259
rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
260
if (rc != PJ_SUCCESS) {
261
app_perror("...error: unable to create ioqueue", rc);
265
rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
266
if (rc != PJ_SUCCESS) {
267
app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
271
/* Initialize each producer-consumer pair. */
272
for (i=0; i<sockpair_cnt; ++i) {
275
items[i].ioqueue = ioqueue;
276
items[i].buffer_size = buffer_size;
277
items[i].outgoing_buffer = (char*) pj_pool_alloc(pool, buffer_size);
278
items[i].incoming_buffer = (char*) pj_pool_alloc(pool, buffer_size);
279
items[i].bytes_recv = items[i].bytes_sent = 0;
281
/* randomize outgoing buffer. */
282
pj_create_random_string(items[i].outgoing_buffer, buffer_size);
284
/* Create socket pair. */
285
TRACE_((THIS_FILE, " calling socketpair.."));
286
rc = app_socketpair(pj_AF_INET(), sock_type, 0,
287
&items[i].server_fd, &items[i].client_fd);
288
if (rc != PJ_SUCCESS) {
289
app_perror("...error: unable to create socket pair", rc);
293
/* Register server socket to ioqueue. */
294
TRACE_((THIS_FILE, " register(1).."));
295
rc = pj_ioqueue_register_sock(pool, ioqueue,
297
&items[i], &ioqueue_callback,
298
&items[i].server_key);
299
if (rc != PJ_SUCCESS) {
300
app_perror("...error: registering server socket to ioqueue", rc);
304
/* Register client socket to ioqueue. */
305
TRACE_((THIS_FILE, " register(2).."));
306
rc = pj_ioqueue_register_sock(pool, ioqueue,
308
&items[i], &ioqueue_callback,
309
&items[i].client_key);
310
if (rc != PJ_SUCCESS) {
311
app_perror("...error: registering server socket to ioqueue", rc);
316
TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
317
bytes = items[i].buffer_size;
318
rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
319
items[i].incoming_buffer, &bytes,
321
if (rc != PJ_EPENDING) {
322
app_perror("...error: pj_ioqueue_recv", rc);
327
TRACE_((THIS_FILE, " pj_ioqueue_write.."));
328
bytes = items[i].buffer_size;
329
rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
330
items[i].outgoing_buffer, &bytes, 0);
331
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
332
app_perror("...error: pj_ioqueue_write", rc);
336
items[i].has_pending_send = (rc==PJ_EPENDING);
339
/* Create the threads. */
340
for (i=0; i<thread_cnt; ++i) {
341
struct thread_arg *arg;
343
arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
345
arg->ioqueue = ioqueue;
348
rc = pj_thread_create( pool, NULL,
351
PJ_THREAD_DEFAULT_STACK_SIZE,
352
PJ_THREAD_SUSPENDED, &thread[i] );
353
if (rc != PJ_SUCCESS) {
354
app_perror("...error: unable to create thread", rc);
359
/* Mark start time. */
360
rc = pj_get_timestamp(&start);
361
if (rc != PJ_SUCCESS)
364
/* Start the thread. */
365
TRACE_((THIS_FILE, " resuming all threads.."));
366
for (i=0; i<thread_cnt; ++i) {
367
rc = pj_thread_resume(thread[i]);
372
/* Wait for MSEC_DURATION seconds.
373
* This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
374
* but unfortunately it doesn't work when system doesn't employ
375
* timeslicing for threads.
377
TRACE_((THIS_FILE, " wait for few seconds.."));
382
rc = pj_get_timestamp(&stop);
384
if (thread_quit_flag) {
385
TRACE_((THIS_FILE, " transfer limit reached.."));
389
if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
390
TRACE_((THIS_FILE, " time limit reached.."));
396
/* Terminate all threads. */
397
TRACE_((THIS_FILE, " terminating all threads.."));
398
thread_quit_flag = 1;
400
for (i=0; i<thread_cnt; ++i) {
401
TRACE_((THIS_FILE, " join thread %d..", i));
402
pj_thread_join(thread[i]);
405
/* Close all sockets. */
406
TRACE_((THIS_FILE, " closing all sockets.."));
407
for (i=0; i<sockpair_cnt; ++i) {
408
pj_ioqueue_unregister(items[i].server_key);
409
pj_ioqueue_unregister(items[i].client_key);
412
/* Destroy threads */
413
for (i=0; i<thread_cnt; ++i) {
414
pj_thread_destroy(thread[i]);
417
/* Destroy ioqueue. */
418
TRACE_((THIS_FILE, " destroying ioqueue.."));
419
pj_ioqueue_destroy(ioqueue);
421
/* Calculate actual time in usec. */
422
total_elapsed_usec = pj_elapsed_usec(&start, &stop);
424
/* Calculate total bytes received. */
426
for (i=0; i<sockpair_cnt; ++i) {
427
total_received = items[i].bytes_recv;
430
/* bandwidth = total_received*1000/total_elapsed_usec */
431
bandwidth = total_received;
432
pj_highprec_mul(bandwidth, 1000);
433
pj_highprec_div(bandwidth, total_elapsed_usec);
435
*p_bandwidth = (pj_uint32_t)bandwidth;
437
PJ_LOG(3,(THIS_FILE, " %.4s %2d %2d %8d KB/s",
438
type_name, thread_cnt, sockpair_cnt,
442
pj_pool_release(pool);
444
TRACE_((THIS_FILE, " done.."));
448
static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
450
enum { BUF_SIZE = 512 };
454
const char *type_name;
459
{ pj_SOCK_DGRAM(), "udp", 1, 1},
460
{ pj_SOCK_DGRAM(), "udp", 1, 2},
461
{ pj_SOCK_DGRAM(), "udp", 1, 4},
462
{ pj_SOCK_DGRAM(), "udp", 1, 8},
463
{ pj_SOCK_DGRAM(), "udp", 2, 1},
464
{ pj_SOCK_DGRAM(), "udp", 2, 2},
465
{ pj_SOCK_DGRAM(), "udp", 2, 4},
466
{ pj_SOCK_DGRAM(), "udp", 2, 8},
467
{ pj_SOCK_DGRAM(), "udp", 4, 1},
468
{ pj_SOCK_DGRAM(), "udp", 4, 2},
469
{ pj_SOCK_DGRAM(), "udp", 4, 4},
470
{ pj_SOCK_DGRAM(), "udp", 4, 8},
471
{ pj_SOCK_DGRAM(), "udp", 4, 16},
472
{ pj_SOCK_STREAM(), "tcp", 1, 1},
473
{ pj_SOCK_STREAM(), "tcp", 1, 2},
474
{ pj_SOCK_STREAM(), "tcp", 1, 4},
475
{ pj_SOCK_STREAM(), "tcp", 1, 8},
476
{ pj_SOCK_STREAM(), "tcp", 2, 1},
477
{ pj_SOCK_STREAM(), "tcp", 2, 2},
478
{ pj_SOCK_STREAM(), "tcp", 2, 4},
479
{ pj_SOCK_STREAM(), "tcp", 2, 8},
480
{ pj_SOCK_STREAM(), "tcp", 4, 1},
481
{ pj_SOCK_STREAM(), "tcp", 4, 2},
482
{ pj_SOCK_STREAM(), "tcp", 4, 4},
483
{ pj_SOCK_STREAM(), "tcp", 4, 8},
484
{ pj_SOCK_STREAM(), "tcp", 4, 16},
486
{ pj_SOCK_DGRAM(), "udp", 32, 1},
487
{ pj_SOCK_DGRAM(), "udp", 32, 1},
488
{ pj_SOCK_DGRAM(), "udp", 32, 1},
489
{ pj_SOCK_DGRAM(), "udp", 32, 1},
490
{ pj_SOCK_DGRAM(), "udp", 1, 32},
491
{ pj_SOCK_DGRAM(), "udp", 1, 32},
492
{ pj_SOCK_DGRAM(), "udp", 1, 32},
493
{ pj_SOCK_DGRAM(), "udp", 1, 32},
494
{ pj_SOCK_STREAM(), "tcp", 32, 1},
495
{ pj_SOCK_STREAM(), "tcp", 32, 1},
496
{ pj_SOCK_STREAM(), "tcp", 32, 1},
497
{ pj_SOCK_STREAM(), "tcp", 32, 1},
498
{ pj_SOCK_STREAM(), "tcp", 1, 32},
499
{ pj_SOCK_STREAM(), "tcp", 1, 32},
500
{ pj_SOCK_STREAM(), "tcp", 1, 32},
501
{ pj_SOCK_STREAM(), "tcp", 1, 32},
504
pj_size_t best_bandwidth;
507
PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
508
PJ_LOG(3,(THIS_FILE, " Testing with concurency=%d", allow_concur));
509
PJ_LOG(3,(THIS_FILE, " ======================================="));
510
PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth"));
511
PJ_LOG(3,(THIS_FILE, " ======================================="));
514
for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
517
rc = perform_test(allow_concur,
519
test_param[i].type_name,
520
test_param[i].thread_cnt,
521
test_param[i].sockpair_cnt,
527
if (bandwidth > best_bandwidth)
528
best_bandwidth = bandwidth, best_index = i;
530
/* Give it a rest before next test, to allow system to close the
533
pj_thread_sleep(500);
537
" Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
538
test_param[best_index].type_name,
539
test_param[best_index].thread_cnt,
540
test_param[best_index].sockpair_cnt,
542
PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)",
543
BUF_SIZE, last_error_counter));
550
int ioqueue_perf_test(void)
554
rc = ioqueue_perf_test_imp(PJ_TRUE);
558
rc = ioqueue_perf_test_imp(PJ_FALSE);
566
/* To prevent warning about "translation unit is empty"
567
* when this test is disabled.
569
int dummy_uiq_perf_test;
570
#endif /* INCLUDE_IOQUEUE_PERF_TEST */