1
/* $Id: ioq_perf.c 2394 2008-12-23 17:27:53Z bennylp $ */
3
* Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
* Additional permission under GNU GPL version 3 section 7:
22
* If you modify this program, or any covered work, by linking or
23
* combining it with the OpenSSL project's OpenSSL library (or a
24
* modified version of that library), containing parts covered by the
25
* terms of the OpenSSL or SSLeay licenses, Teluu Inc. (http://www.teluu.com)
26
* grants you additional permission to convey the resulting work.
27
* Corresponding Source for a non-source form of such a combination
28
* shall include the source code for the parts of OpenSSL used as well
29
* as that of the covered work.
33
#include <pj/compat/high_precision.h>
36
* \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
38
* Test the performance of the I/O queue, using typical producer
39
* consumer test. The test should examine the effect of using multiple
40
* threads on the performance.
42
* This file is <b>pjlib-test/ioq_perf.c</b>
44
* \include pjlib-test/ioq_perf.c
47
#if INCLUDE_IOQUEUE_PERF_TEST
50
# pragma warning ( disable: 4204) // non-constant aggregate initializer
53
#define THIS_FILE "ioq_perf"
54
//#define TRACE_(expr) PJ_LOG(3,expr)
58
static pj_bool_t thread_quit_flag;
59
static pj_status_t last_error;
60
static unsigned last_error_counter;
62
/* Descriptor for each producer/consumer pair. */
63
typedef struct test_item
67
pj_ioqueue_t *ioqueue;
68
pj_ioqueue_key_t *server_key,
70
pj_ioqueue_op_key_t recv_op,
73
pj_size_t buffer_size;
74
char *outgoing_buffer;
75
char *incoming_buffer;
80
/* Callback when data has been read.
81
* Increment item->bytes_recv and ready to read the next data.
83
static void on_read_complete(pj_ioqueue_key_t *key,
84
pj_ioqueue_op_key_t *op_key,
85
pj_ssize_t bytes_read)
87
test_item *item = (test_item*)pj_ioqueue_get_user_data(key);
89
int data_is_available = 1;
91
//TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
98
pj_status_t rc = -bytes_read;
99
char errmsg[PJ_ERR_MSG_SIZE];
101
if (rc != last_error) {
103
pj_strerror(rc, errmsg, sizeof(errmsg));
104
PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)",
105
bytes_read, errmsg));
107
".....additional info: total read=%u, total sent=%u",
108
item->bytes_recv, item->bytes_sent));
110
last_error_counter++;
114
} else if (bytes_read == 0) {
115
PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
118
item->bytes_recv += bytes_read;
120
/* To assure that the test quits, even if main thread
121
* doesn't have time to run.
123
if (item->bytes_recv > item->buffer_size * 10000)
124
thread_quit_flag = 1;
126
bytes_read = item->buffer_size;
127
rc = pj_ioqueue_recv( key, op_key,
128
item->incoming_buffer, &bytes_read, 0 );
130
if (rc == PJ_SUCCESS) {
131
data_is_available = 1;
132
} else if (rc == PJ_EPENDING) {
133
data_is_available = 0;
135
data_is_available = 0;
136
if (rc != last_error) {
138
app_perror("...error: read error(1)", rc);
140
last_error_counter++;
144
if (!item->has_pending_send) {
145
pj_ssize_t sent = item->buffer_size;
146
rc = pj_ioqueue_send(item->client_key, &item->send_op,
147
item->outgoing_buffer, &sent, 0);
148
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
149
app_perror("...error: write error", rc);
152
item->has_pending_send = (rc==PJ_EPENDING);
155
} while (data_is_available);
158
/* Callback when data has been written.
159
* Increment item->bytes_sent and write the next data.
161
static void on_write_complete(pj_ioqueue_key_t *key,
162
pj_ioqueue_op_key_t *op_key,
163
pj_ssize_t bytes_sent)
165
test_item *item = (test_item*) pj_ioqueue_get_user_data(key);
167
//TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent));
169
if (thread_quit_flag)
172
item->has_pending_send = 0;
173
item->bytes_sent += bytes_sent;
175
if (bytes_sent <= 0) {
176
PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",
182
bytes_sent = item->buffer_size;
183
rc = pj_ioqueue_send( item->client_key, op_key,
184
item->outgoing_buffer, &bytes_sent, 0);
185
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
186
app_perror("...error: write error", rc);
189
item->has_pending_send = (rc==PJ_EPENDING);
196
pj_ioqueue_t *ioqueue;
200
/* The worker thread. */
201
static int worker_thread(void *p)
203
struct thread_arg *arg = (struct thread_arg*) p;
204
const pj_time_val timeout = {0, 100};
207
while (!thread_quit_flag) {
210
rc = pj_ioqueue_poll(arg->ioqueue, &timeout);
211
//TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));
213
char errmsg[PJ_ERR_MSG_SIZE];
214
pj_strerror(-rc, errmsg, sizeof(errmsg));
215
PJ_LOG(3, (THIS_FILE,
216
"...error in pj_ioqueue_poll() in thread %d "
217
"after %d loop: %s [pj_status_t=%d]",
218
arg->id, arg->counter, errmsg, -rc));
225
/* Calculate the bandwidth for the specific test configuration.
226
* The test is simple:
227
* - create sockpair_cnt number of producer-consumer socket pair.
228
* - create thread_cnt number of worker threads.
229
* - each producer will send buffer_size bytes data as fast and
231
* - each consumer will read buffer_size bytes of data as fast
233
* - measure the total bytes received by all consumers during a
236
static int perform_test(pj_bool_t allow_concur,
237
int sock_type, const char *type_name,
238
unsigned thread_cnt, unsigned sockpair_cnt,
239
pj_size_t buffer_size,
240
pj_size_t *p_bandwidth)
242
enum { MSEC_DURATION = 5000 };
245
pj_thread_t **thread;
246
pj_ioqueue_t *ioqueue;
248
pj_ioqueue_callback ioqueue_callback;
249
pj_uint32_t total_elapsed_usec, total_received;
250
pj_highprec_t bandwidth;
251
pj_timestamp start, stop;
254
TRACE_((THIS_FILE, " starting test.."));
256
ioqueue_callback.on_read_complete = &on_read_complete;
257
ioqueue_callback.on_write_complete = &on_write_complete;
259
thread_quit_flag = 0;
261
pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
265
items = (test_item*) pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
266
thread = (pj_thread_t**)
267
pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
269
TRACE_((THIS_FILE, " creating ioqueue.."));
270
rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
271
if (rc != PJ_SUCCESS) {
272
app_perror("...error: unable to create ioqueue", rc);
276
rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
277
if (rc != PJ_SUCCESS) {
278
app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
282
/* Initialize each producer-consumer pair. */
283
for (i=0; i<sockpair_cnt; ++i) {
286
items[i].ioqueue = ioqueue;
287
items[i].buffer_size = buffer_size;
288
items[i].outgoing_buffer = (char*) pj_pool_alloc(pool, buffer_size);
289
items[i].incoming_buffer = (char*) pj_pool_alloc(pool, buffer_size);
290
items[i].bytes_recv = items[i].bytes_sent = 0;
292
/* randomize outgoing buffer. */
293
pj_create_random_string(items[i].outgoing_buffer, buffer_size);
295
/* Create socket pair. */
296
TRACE_((THIS_FILE, " calling socketpair.."));
297
rc = app_socketpair(pj_AF_INET(), sock_type, 0,
298
&items[i].server_fd, &items[i].client_fd);
299
if (rc != PJ_SUCCESS) {
300
app_perror("...error: unable to create socket pair", rc);
304
/* Register server socket to ioqueue. */
305
TRACE_((THIS_FILE, " register(1).."));
306
rc = pj_ioqueue_register_sock(pool, ioqueue,
308
&items[i], &ioqueue_callback,
309
&items[i].server_key);
310
if (rc != PJ_SUCCESS) {
311
app_perror("...error: registering server socket to ioqueue", rc);
315
/* Register client socket to ioqueue. */
316
TRACE_((THIS_FILE, " register(2).."));
317
rc = pj_ioqueue_register_sock(pool, ioqueue,
319
&items[i], &ioqueue_callback,
320
&items[i].client_key);
321
if (rc != PJ_SUCCESS) {
322
app_perror("...error: registering server socket to ioqueue", rc);
327
TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
328
bytes = items[i].buffer_size;
329
rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
330
items[i].incoming_buffer, &bytes,
332
if (rc != PJ_EPENDING) {
333
app_perror("...error: pj_ioqueue_recv", rc);
338
TRACE_((THIS_FILE, " pj_ioqueue_write.."));
339
bytes = items[i].buffer_size;
340
rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
341
items[i].outgoing_buffer, &bytes, 0);
342
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
343
app_perror("...error: pj_ioqueue_write", rc);
347
items[i].has_pending_send = (rc==PJ_EPENDING);
350
/* Create the threads. */
351
for (i=0; i<thread_cnt; ++i) {
352
struct thread_arg *arg;
354
arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
356
arg->ioqueue = ioqueue;
359
rc = pj_thread_create( pool, NULL,
362
PJ_THREAD_DEFAULT_STACK_SIZE,
363
PJ_THREAD_SUSPENDED, &thread[i] );
364
if (rc != PJ_SUCCESS) {
365
app_perror("...error: unable to create thread", rc);
370
/* Mark start time. */
371
rc = pj_get_timestamp(&start);
372
if (rc != PJ_SUCCESS)
375
/* Start the thread. */
376
TRACE_((THIS_FILE, " resuming all threads.."));
377
for (i=0; i<thread_cnt; ++i) {
378
rc = pj_thread_resume(thread[i]);
383
/* Wait for MSEC_DURATION seconds.
384
* This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
385
* but unfortunately it doesn't work when system doesn't employ
386
* timeslicing for threads.
388
TRACE_((THIS_FILE, " wait for few seconds.."));
393
rc = pj_get_timestamp(&stop);
395
if (thread_quit_flag) {
396
TRACE_((THIS_FILE, " transfer limit reached.."));
400
if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
401
TRACE_((THIS_FILE, " time limit reached.."));
407
/* Terminate all threads. */
408
TRACE_((THIS_FILE, " terminating all threads.."));
409
thread_quit_flag = 1;
411
for (i=0; i<thread_cnt; ++i) {
412
TRACE_((THIS_FILE, " join thread %d..", i));
413
pj_thread_join(thread[i]);
416
/* Close all sockets. */
417
TRACE_((THIS_FILE, " closing all sockets.."));
418
for (i=0; i<sockpair_cnt; ++i) {
419
pj_ioqueue_unregister(items[i].server_key);
420
pj_ioqueue_unregister(items[i].client_key);
423
/* Destroy threads */
424
for (i=0; i<thread_cnt; ++i) {
425
pj_thread_destroy(thread[i]);
428
/* Destroy ioqueue. */
429
TRACE_((THIS_FILE, " destroying ioqueue.."));
430
pj_ioqueue_destroy(ioqueue);
432
/* Calculate actual time in usec. */
433
total_elapsed_usec = pj_elapsed_usec(&start, &stop);
435
/* Calculate total bytes received. */
437
for (i=0; i<sockpair_cnt; ++i) {
438
total_received = items[i].bytes_recv;
441
/* bandwidth = total_received*1000/total_elapsed_usec */
442
bandwidth = total_received;
443
pj_highprec_mul(bandwidth, 1000);
444
pj_highprec_div(bandwidth, total_elapsed_usec);
446
*p_bandwidth = (pj_uint32_t)bandwidth;
448
PJ_LOG(3,(THIS_FILE, " %.4s %2d %2d %8d KB/s",
449
type_name, thread_cnt, sockpair_cnt,
453
pj_pool_release(pool);
455
TRACE_((THIS_FILE, " done.."));
459
static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
461
enum { BUF_SIZE = 512 };
465
const char *type_name;
470
{ pj_SOCK_DGRAM(), "udp", 1, 1},
471
{ pj_SOCK_DGRAM(), "udp", 1, 2},
472
{ pj_SOCK_DGRAM(), "udp", 1, 4},
473
{ pj_SOCK_DGRAM(), "udp", 1, 8},
474
{ pj_SOCK_DGRAM(), "udp", 2, 1},
475
{ pj_SOCK_DGRAM(), "udp", 2, 2},
476
{ pj_SOCK_DGRAM(), "udp", 2, 4},
477
{ pj_SOCK_DGRAM(), "udp", 2, 8},
478
{ pj_SOCK_DGRAM(), "udp", 4, 1},
479
{ pj_SOCK_DGRAM(), "udp", 4, 2},
480
{ pj_SOCK_DGRAM(), "udp", 4, 4},
481
{ pj_SOCK_DGRAM(), "udp", 4, 8},
482
{ pj_SOCK_DGRAM(), "udp", 4, 16},
483
{ pj_SOCK_STREAM(), "tcp", 1, 1},
484
{ pj_SOCK_STREAM(), "tcp", 1, 2},
485
{ pj_SOCK_STREAM(), "tcp", 1, 4},
486
{ pj_SOCK_STREAM(), "tcp", 1, 8},
487
{ pj_SOCK_STREAM(), "tcp", 2, 1},
488
{ pj_SOCK_STREAM(), "tcp", 2, 2},
489
{ pj_SOCK_STREAM(), "tcp", 2, 4},
490
{ pj_SOCK_STREAM(), "tcp", 2, 8},
491
{ pj_SOCK_STREAM(), "tcp", 4, 1},
492
{ pj_SOCK_STREAM(), "tcp", 4, 2},
493
{ pj_SOCK_STREAM(), "tcp", 4, 4},
494
{ pj_SOCK_STREAM(), "tcp", 4, 8},
495
{ pj_SOCK_STREAM(), "tcp", 4, 16},
497
{ pj_SOCK_DGRAM(), "udp", 32, 1},
498
{ pj_SOCK_DGRAM(), "udp", 32, 1},
499
{ pj_SOCK_DGRAM(), "udp", 32, 1},
500
{ pj_SOCK_DGRAM(), "udp", 32, 1},
501
{ pj_SOCK_DGRAM(), "udp", 1, 32},
502
{ pj_SOCK_DGRAM(), "udp", 1, 32},
503
{ pj_SOCK_DGRAM(), "udp", 1, 32},
504
{ pj_SOCK_DGRAM(), "udp", 1, 32},
505
{ pj_SOCK_STREAM(), "tcp", 32, 1},
506
{ pj_SOCK_STREAM(), "tcp", 32, 1},
507
{ pj_SOCK_STREAM(), "tcp", 32, 1},
508
{ pj_SOCK_STREAM(), "tcp", 32, 1},
509
{ pj_SOCK_STREAM(), "tcp", 1, 32},
510
{ pj_SOCK_STREAM(), "tcp", 1, 32},
511
{ pj_SOCK_STREAM(), "tcp", 1, 32},
512
{ pj_SOCK_STREAM(), "tcp", 1, 32},
515
pj_size_t best_bandwidth;
518
PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
519
PJ_LOG(3,(THIS_FILE, " Testing with concurency=%d", allow_concur));
520
PJ_LOG(3,(THIS_FILE, " ======================================="));
521
PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth"));
522
PJ_LOG(3,(THIS_FILE, " ======================================="));
525
for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
528
rc = perform_test(allow_concur,
530
test_param[i].type_name,
531
test_param[i].thread_cnt,
532
test_param[i].sockpair_cnt,
538
if (bandwidth > best_bandwidth)
539
best_bandwidth = bandwidth, best_index = i;
541
/* Give it a rest before next test, to allow system to close the
544
pj_thread_sleep(500);
548
" Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
549
test_param[best_index].type_name,
550
test_param[best_index].thread_cnt,
551
test_param[best_index].sockpair_cnt,
553
PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)",
554
BUF_SIZE, last_error_counter));
561
int ioqueue_perf_test(void)
565
rc = ioqueue_perf_test_imp(PJ_TRUE);
569
rc = ioqueue_perf_test_imp(PJ_FALSE);
577
/* To prevent warning about "translation unit is empty"
578
* when this test is disabled.
580
int dummy_uiq_perf_test;
581
#endif /* INCLUDE_IOQUEUE_PERF_TEST */