~ubuntu-branches/ubuntu/raring/libpthread-workqueue/raring

« back to all changes in this revision

Viewing changes to testing/latency/latency.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Heily
  • Date: 2011-03-13 16:22:30 UTC
  • Revision ID: james.westby@ubuntu.com-20110313162230-yaiyoa7g3dk8xmww
Tags: upstream-0.4.1
ImportĀ upstreamĀ versionĀ 0.4.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2011 Joakim Johansson <jocke@tbricks.com>.
 
3
 *
 
4
 * @APPLE_APACHE_LICENSE_HEADER_START@
 
5
 * 
 
6
 * Licensed under the Apache License, Version 2.0 (the "License");
 
7
 * you may not use this file except in compliance with the License.
 
8
 * You may obtain a copy of the License at
 
9
 * 
 
10
 *     http://www.apache.org/licenses/LICENSE-2.0
 
11
 * 
 
12
 * Unless required by applicable law or agreed to in writing, software
 
13
 * distributed under the License is distributed on an "AS IS" BASIS,
 
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
15
 * See the License for the specific language governing permissions and
 
16
 * limitations under the License.
 
17
 * 
 
18
 * @APPLE_APACHE_LICENSE_HEADER_END@
 
19
 */
 
20
 
 
21
#include <stdio.h>
 
22
#include <stdlib.h>
 
23
#include <unistd.h>
 
24
#include <pthread.h>
 
25
#include <ctype.h>
 
26
#include <sys/time.h>
 
27
#include <string.h>
 
28
#include <errno.h>
 
29
#include <time.h>
 
30
#include "latency.h"
 
31
 
 
32
pthread_workqueue_t workqueues[WORKQUEUE_COUNT]; 
 
33
struct wq_statistics workqueue_statistics[WORKQUEUE_COUNT]; 
 
34
struct wq_event_generator workqueue_generator[GENERATOR_WORKQUEUE_COUNT];
 
35
 
 
36
struct wq_statistics global_statistics;
 
37
unsigned int global_stats_used = 0;
 
38
 
 
39
pthread_mutex_t generator_mutex;
 
40
pthread_cond_t generator_condition;
 
41
static unsigned int events_processed;
 
42
 
 
43
#define PERCENTILE_COUNT 8 
 
44
double percentiles[PERCENTILE_COUNT] = {50.0, 80.0, 98.0, 99.0, 99.5, 99.8, 99.9, 99.99};
 
45
mytime_t real_start, real_end;
 
46
 
 
47
#ifdef __APPLE__
 
48
 
 
49
#include <assert.h>
 
50
#include <CoreServices/CoreServices.h>
 
51
#include <mach/mach.h>
 
52
#include <mach/mach_time.h>
 
53
 
 
54
static mach_timebase_info_data_t    sTimebaseInfo;
 
55
 
 
56
// From http://developer.apple.com/library/mac/#qa/qa2004/qa1398.html
 
57
unsigned long gettime(void)
 
58
{
 
59
    return (mach_absolute_time() * sTimebaseInfo.numer / sTimebaseInfo.denom);
 
60
}
 
61
 
 
62
#else
 
63
 
 
64
static unsigned long gettime(void)
 
65
{
 
66
    struct timespec ts;
 
67
#ifdef __linux__
 
68
    if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
 
69
#else
 
70
    if (clock_gettime(CLOCK_HIGHRES, &ts) != 0)
 
71
#endif
 
72
        fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno);   
 
73
    return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec);
 
74
}
 
75
 
 
76
#endif
 
77
 
 
78
// real resolution on solaris is at best system clock tick, i.e. 100Hz unless having the
 
79
// high res system clock (1000Hz in that case)
 
80
 
 
81
static void my_sleep(unsigned long nanoseconds)
 
82
{
 
83
        struct timespec timeout0;
 
84
        struct timespec timeout1;
 
85
        struct timespec* tmp;
 
86
        struct timespec* t0 = &timeout0;
 
87
        struct timespec* t1 = &timeout1;
 
88
        
 
89
        t0->tv_sec = nanoseconds / NANOSECONDS_PER_SECOND;
 
90
        t0->tv_nsec = nanoseconds % NANOSECONDS_PER_SECOND;
 
91
 
 
92
        while ((nanosleep(t0, t1) == (-1)) && (errno == EINTR))
 
93
        {
 
94
                tmp = t0;
 
95
                t0 = t1;
 
96
                t1 = tmp;
 
97
        }
 
98
    
 
99
    return;
 
100
}
 
101
 
 
102
static void _process_data(void* context)
 
103
{
 
104
        struct wq_event *event = (struct wq_event *) context;
 
105
        mytime_t elapsed_time;
 
106
    
 
107
        elapsed_time = gettime() - event->start_time;
 
108
    
 
109
        workqueue_statistics[event->queue_index].avg = ((workqueue_statistics[event->queue_index].count * workqueue_statistics[event->queue_index].avg) + elapsed_time) / (workqueue_statistics[event->queue_index].count + 1);
 
110
        workqueue_statistics[event->queue_index].total += elapsed_time;
 
111
        workqueue_statistics[event->queue_index].count += 1;
 
112
    
 
113
    if (elapsed_time < workqueue_statistics[event->queue_index].min || 
 
114
        workqueue_statistics[event->queue_index].min == 0) 
 
115
        workqueue_statistics[event->queue_index].min = elapsed_time;
 
116
        
 
117
    if (elapsed_time > workqueue_statistics[event->queue_index].max)
 
118
        workqueue_statistics[event->queue_index].max = elapsed_time;
 
119
    
 
120
    if ((elapsed_time / 1000) < DISTRIBUTION_BUCKETS)
 
121
        workqueue_statistics[event->queue_index].distribution[(int)(elapsed_time / 1000)] += 1;
 
122
    else  
 
123
        workqueue_statistics[event->queue_index].distribution[DISTRIBUTION_BUCKETS-1] += 1;
 
124
 
 
125
    // allow generator thread to continue when all events have been processed
 
126
    if (atomic_dec_nv(&events_processed) == 0)
 
127
    {
 
128
        pthread_mutex_lock(&generator_mutex);
 
129
        pthread_cond_signal(&generator_condition);
 
130
        pthread_mutex_unlock(&generator_mutex);
 
131
    }
 
132
    return;
 
133
}
 
134
 
 
135
// Perform a small microburst for this tick
 
136
static void _event_tick(void* context)
 
137
{
 
138
        struct wq_event *current_event;
 
139
        long i, generator_workqueue = (long) context;
 
140
    
 
141
    for (i = 0; i < EVENTS_GENERATED_PER_TICK; i++)
 
142
    {
 
143
        current_event = &workqueue_generator[generator_workqueue].wq_events[i];
 
144
        current_event->start_time = gettime();
 
145
        current_event->queue_index = (current_event->start_time % WORKQUEUE_COUNT);
 
146
        
 
147
        (void) pthread_workqueue_additem_np(workqueues[current_event->queue_index], _process_data, current_event, NULL, NULL);
 
148
    }
 
149
    
 
150
    return;
 
151
}
 
152
 
 
153
static void _generate_simulated_events()
 
154
{
 
155
        long i, tick, ticks_generated = 0, overhead;
 
156
    mytime_t start, current, overhead_start = 0, overhead_end = 0;
 
157
 
 
158
    start = current = gettime();
 
159
 
 
160
        for (tick = 0; tick < TOTAL_TICKS_TO_RUN; tick++)
 
161
        {
 
162
        start = current = overhead_end;
 
163
        overhead = overhead_end - overhead_start;
 
164
        
 
165
        // wait until we have waited proper amount of time for current rate
 
166
        // we should remove overhead of previous lap to not lag behind in data rate
 
167
        // one call to gethrtime() alone is around 211ns on Nehalem 2.93
 
168
        // use busy waiting in case the frequency is higher than the supported resolution of nanosleep()
 
169
 
 
170
        if (overhead > EVENT_TIME_SLICE)
 
171
        {
 
172
            printf("Warning: Event processing overhead > event time slice, readjust test parameters.\n");
 
173
        }
 
174
        else
 
175
        if ((EVENT_GENERATION_FREQUENCY > SYSTEM_CLOCK_RESOLUTION) || FORCE_BUSY_LOOP)
 
176
        {
 
177
            while ((current - start) < (EVENT_TIME_SLICE - overhead))
 
178
                current = gettime();            
 
179
        }
 
180
        else
 
181
        {
 
182
            my_sleep(EVENT_TIME_SLICE - overhead);
 
183
        }
 
184
        
 
185
        overhead_start = gettime();
 
186
        
 
187
        events_processed = GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK; // number of items that will be processed
 
188
        
 
189
        for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
 
190
            (void) pthread_workqueue_additem_np(workqueue_generator[i].wq, _event_tick, (void *) i, NULL, NULL);
 
191
        
 
192
        // wait for all events to be processed
 
193
        pthread_mutex_lock(&generator_mutex);
 
194
        while (events_processed > 0)
 
195
            pthread_cond_wait(&generator_condition, &generator_mutex);
 
196
        pthread_mutex_unlock(&generator_mutex);
 
197
 
 
198
        overhead_end = gettime();
 
199
        }       
 
200
 
 
201
        return;
 
202
}
 
203
 
 
204
static void _gather_statistics(unsigned long queue_index)
 
205
{
 
206
    unsigned long i;
 
207
    
 
208
    if (workqueue_statistics[queue_index].count > 0)
 
209
    {
 
210
        global_stats_used ++;
 
211
        
 
212
        global_statistics.avg = ((global_statistics.count * global_statistics.avg) + (workqueue_statistics[queue_index].avg * workqueue_statistics[queue_index].count)) / (global_statistics.count + workqueue_statistics[queue_index].count);
 
213
        global_statistics.total += workqueue_statistics[queue_index].total;
 
214
        global_statistics.count += workqueue_statistics[queue_index].count;
 
215
        
 
216
        if (workqueue_statistics[queue_index].min < global_statistics.min || global_statistics.min == 0) 
 
217
            global_statistics.min = workqueue_statistics[queue_index].min;
 
218
        
 
219
        if (workqueue_statistics[queue_index].max > global_statistics.max)
 
220
            global_statistics.max = workqueue_statistics[queue_index].max;
 
221
        
 
222
        for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
 
223
            global_statistics.distribution[i] += workqueue_statistics[queue_index].distribution[i];
 
224
    }
 
225
    
 
226
        return;
 
227
}
 
228
 
 
229
void _print_statistics()
 
230
{
 
231
        unsigned long i, j, total_events = 0, last_percentile = 0, accumulated_percentile = 0;
 
232
        void *events_done;
 
233
        
 
234
        printf("Collecting statistics...\n");
 
235
        
 
236
        for (i = 0; i < WORKQUEUE_COUNT; i++)
 
237
        _gather_statistics(i);
 
238
    
 
239
        printf("Test is done, run time was %.3f seconds, %.1fM events generated and processed.\n", (double)((double)(real_end - real_start) / (double) NANOSECONDS_PER_SECOND), total_events/1000000.0); 
 
240
        
 
241
        printf("Global dispatch queue aggregate statistics for %d queues: %dM events, min = %d ns, avg = %.1f ns, max = %d ns\n",
 
242
           global_stats_used, global_statistics.count/1000000, global_statistics.min, global_statistics.avg, global_statistics.max);
 
243
    
 
244
    printf("\nDistribution:\n");
 
245
    for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
 
246
    {                   
 
247
        printf("%3ld us: %d ", i, global_statistics.distribution[i]);
 
248
        for (j=0; j<(((double) global_statistics.distribution[i] / (double) global_statistics.count) * 400.0); j++)
 
249
            printf("*");
 
250
        printf("\n");
 
251
    }
 
252
    
 
253
    printf("\nPercentiles:\n");
 
254
    
 
255
    for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
 
256
    {        
 
257
        while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
 
258
        {
 
259
            printf("%.2f < %ld us\n", percentiles[last_percentile], i-1);
 
260
            last_percentile++;
 
261
        }
 
262
        accumulated_percentile += global_statistics.distribution[i];        
 
263
    }
 
264
    
 
265
    while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
 
266
    {
 
267
        printf("%.2f > %d us\n", percentiles[last_percentile], DISTRIBUTION_BUCKETS-1);
 
268
        last_percentile++;
 
269
    }
 
270
 
 
271
        return;
 
272
}       
 
273
 
 
274
 
 
275
int main(int argc, const char * argv[])
 
276
{
 
277
        int i;
 
278
    pthread_workqueue_attr_t attr;
 
279
    
 
280
#ifdef __APPLE__
 
281
    (void) mach_timebase_info(&sTimebaseInfo);
 
282
#endif
 
283
    
 
284
        memset(&workqueues, 0, sizeof(workqueues));
 
285
        memset(&workqueue_statistics, 0, sizeof(workqueue_statistics));
 
286
        memset(&global_statistics, 0, sizeof(global_statistics));
 
287
        memset(&workqueue_generator, 0, sizeof(workqueue_generator));
 
288
 
 
289
    pthread_mutex_init(&generator_mutex, NULL);
 
290
    pthread_cond_init(&generator_condition, NULL);
 
291
    
 
292
    if (pthread_workqueue_attr_init_np(&attr) != 0)
 
293
        fprintf(stderr, "Failed to set workqueue attributes\n");
 
294
    
 
295
    if (pthread_workqueue_attr_setqueuepriority_np(&attr, WORKQ_HIGH_PRIOQUEUE) != 0) // high prio for generators
 
296
        fprintf(stderr, "Failed to set workqueue priority\n");
 
297
 
 
298
    for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
 
299
    {
 
300
        workqueue_generator[i].wq_events = malloc(sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK);
 
301
        memset(workqueue_generator[i].wq_events, 0, (sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK));
 
302
        
 
303
        if (pthread_workqueue_create_np(&workqueue_generator[i].wq, &attr) != 0)
 
304
            fprintf(stderr, "Failed to create workqueue\n");
 
305
    }
 
306
    
 
307
        for (i = 0; i < WORKQUEUE_COUNT; i++)
 
308
        {
 
309
        if (pthread_workqueue_attr_init_np(&attr) != 0)
 
310
            fprintf(stderr, "Failed to set workqueue attributes\n");
 
311
        
 
312
        if (pthread_workqueue_attr_setqueuepriority_np(&attr, (i % (WORKQ_LOW_PRIOQUEUE + 1))) != 0) // spread it round-robin in terms of prio
 
313
            fprintf(stderr, "Failed to set workqueue priority\n");
 
314
        
 
315
        if (pthread_workqueue_create_np(&workqueues[i], &attr) != 0)
 
316
            fprintf(stderr, "Failed to create workqueue\n");
 
317
        }
 
318
    
 
319
        if (SLEEP_BEFORE_START > 0)
 
320
        {
 
321
                printf("Sleeping for %d seconds to allow for processor set configuration...\n",SLEEP_BEFORE_START);
 
322
                sleep(SLEEP_BEFORE_START);              
 
323
        }
 
324
        
 
325
    printf("%d workqueues, running for %d seconds at %d Hz, %d events per tick.\n",WORKQUEUE_COUNT, SECONDS_TO_RUN, EVENT_GENERATION_FREQUENCY, EVENTS_GENERATED_PER_TICK);
 
326
    
 
327
        printf("Running %d generator threads at %dK events/s, the aggregated data rate is %dK events/s. %.2f MB is used for %.2fK events.\n",
 
328
           GENERATOR_WORKQUEUE_COUNT,AGGREGATE_DATA_RATE_PER_SECOND/1000, TOTAL_DATA_PER_SECOND/1000,
 
329
           (double) GENERATOR_WORKQUEUE_COUNT * ((sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK + sizeof(workqueues))/(1024.0*1024.0)), 
 
330
           GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK/1000.0);
 
331
 
 
332
    real_start = gettime();
 
333
    
 
334
    _generate_simulated_events();
 
335
 
 
336
    real_end = gettime();
 
337
 
 
338
    _print_statistics();
 
339
        
 
340
        return 0;
 
341
}