2
* Copyright (c) 2011 Joakim Johansson <jocke@tbricks.com>.
4
* @APPLE_APACHE_LICENSE_HEADER_START@
6
* Licensed under the Apache License, Version 2.0 (the "License");
7
* you may not use this file except in compliance with the License.
8
* You may obtain a copy of the License at
10
* http://www.apache.org/licenses/LICENSE-2.0
12
* Unless required by applicable law or agreed to in writing, software
13
* distributed under the License is distributed on an "AS IS" BASIS,
14
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
* See the License for the specific language governing permissions and
16
* limitations under the License.
18
* @APPLE_APACHE_LICENSE_HEADER_END@
32
pthread_workqueue_t workqueues[WORKQUEUE_COUNT];
33
struct wq_statistics workqueue_statistics[WORKQUEUE_COUNT];
34
struct wq_event_generator workqueue_generator[GENERATOR_WORKQUEUE_COUNT];
36
struct wq_statistics global_statistics;
37
unsigned int global_stats_used = 0;
39
pthread_mutex_t generator_mutex;
40
pthread_cond_t generator_condition;
41
static unsigned int events_processed;
43
#define PERCENTILE_COUNT 8
44
double percentiles[PERCENTILE_COUNT] = {50.0, 80.0, 98.0, 99.0, 99.5, 99.8, 99.9, 99.99};
45
mytime_t real_start, real_end;
50
#include <CoreServices/CoreServices.h>
51
#include <mach/mach.h>
52
#include <mach/mach_time.h>
54
static mach_timebase_info_data_t sTimebaseInfo;
56
// From http://developer.apple.com/library/mac/#qa/qa2004/qa1398.html
57
unsigned long gettime(void)
59
return (mach_absolute_time() * sTimebaseInfo.numer / sTimebaseInfo.denom);
64
static unsigned long gettime(void)
68
if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
70
if (clock_gettime(CLOCK_HIGHRES, &ts) != 0)
72
fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno);
73
return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec);
78
// real resolution on solaris is at best system clock tick, i.e. 100Hz unless having the
79
// high res system clock (1000Hz in that case)
81
static void my_sleep(unsigned long nanoseconds)
83
struct timespec timeout0;
84
struct timespec timeout1;
86
struct timespec* t0 = &timeout0;
87
struct timespec* t1 = &timeout1;
89
t0->tv_sec = nanoseconds / NANOSECONDS_PER_SECOND;
90
t0->tv_nsec = nanoseconds % NANOSECONDS_PER_SECOND;
92
while ((nanosleep(t0, t1) == (-1)) && (errno == EINTR))
102
static void _process_data(void* context)
104
struct wq_event *event = (struct wq_event *) context;
105
mytime_t elapsed_time;
107
elapsed_time = gettime() - event->start_time;
109
workqueue_statistics[event->queue_index].avg = ((workqueue_statistics[event->queue_index].count * workqueue_statistics[event->queue_index].avg) + elapsed_time) / (workqueue_statistics[event->queue_index].count + 1);
110
workqueue_statistics[event->queue_index].total += elapsed_time;
111
workqueue_statistics[event->queue_index].count += 1;
113
if (elapsed_time < workqueue_statistics[event->queue_index].min ||
114
workqueue_statistics[event->queue_index].min == 0)
115
workqueue_statistics[event->queue_index].min = elapsed_time;
117
if (elapsed_time > workqueue_statistics[event->queue_index].max)
118
workqueue_statistics[event->queue_index].max = elapsed_time;
120
if ((elapsed_time / 1000) < DISTRIBUTION_BUCKETS)
121
workqueue_statistics[event->queue_index].distribution[(int)(elapsed_time / 1000)] += 1;
123
workqueue_statistics[event->queue_index].distribution[DISTRIBUTION_BUCKETS-1] += 1;
125
// allow generator thread to continue when all events have been processed
126
if (atomic_dec_nv(&events_processed) == 0)
128
pthread_mutex_lock(&generator_mutex);
129
pthread_cond_signal(&generator_condition);
130
pthread_mutex_unlock(&generator_mutex);
135
// Perform a small microburst for this tick
136
static void _event_tick(void* context)
138
struct wq_event *current_event;
139
long i, generator_workqueue = (long) context;
141
for (i = 0; i < EVENTS_GENERATED_PER_TICK; i++)
143
current_event = &workqueue_generator[generator_workqueue].wq_events[i];
144
current_event->start_time = gettime();
145
current_event->queue_index = (current_event->start_time % WORKQUEUE_COUNT);
147
(void) pthread_workqueue_additem_np(workqueues[current_event->queue_index], _process_data, current_event, NULL, NULL);
153
static void _generate_simulated_events()
155
long i, tick, ticks_generated = 0, overhead;
156
mytime_t start, current, overhead_start = 0, overhead_end = 0;
158
start = current = gettime();
160
for (tick = 0; tick < TOTAL_TICKS_TO_RUN; tick++)
162
start = current = overhead_end;
163
overhead = overhead_end - overhead_start;
165
// wait until we have waited proper amount of time for current rate
166
// we should remove overhead of previous lap to not lag behind in data rate
167
// one call to gethrtime() alone is around 211ns on Nehalem 2.93
168
// use busy waiting in case the frequency is higher than the supported resolution of nanosleep()
170
if (overhead > EVENT_TIME_SLICE)
172
printf("Warning: Event processing overhead > event time slice, readjust test parameters.\n");
175
if ((EVENT_GENERATION_FREQUENCY > SYSTEM_CLOCK_RESOLUTION) || FORCE_BUSY_LOOP)
177
while ((current - start) < (EVENT_TIME_SLICE - overhead))
182
my_sleep(EVENT_TIME_SLICE - overhead);
185
overhead_start = gettime();
187
events_processed = GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK; // number of items that will be processed
189
for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
190
(void) pthread_workqueue_additem_np(workqueue_generator[i].wq, _event_tick, (void *) i, NULL, NULL);
192
// wait for all events to be processed
193
pthread_mutex_lock(&generator_mutex);
194
while (events_processed > 0)
195
pthread_cond_wait(&generator_condition, &generator_mutex);
196
pthread_mutex_unlock(&generator_mutex);
198
overhead_end = gettime();
204
static void _gather_statistics(unsigned long queue_index)
208
if (workqueue_statistics[queue_index].count > 0)
210
global_stats_used ++;
212
global_statistics.avg = ((global_statistics.count * global_statistics.avg) + (workqueue_statistics[queue_index].avg * workqueue_statistics[queue_index].count)) / (global_statistics.count + workqueue_statistics[queue_index].count);
213
global_statistics.total += workqueue_statistics[queue_index].total;
214
global_statistics.count += workqueue_statistics[queue_index].count;
216
if (workqueue_statistics[queue_index].min < global_statistics.min || global_statistics.min == 0)
217
global_statistics.min = workqueue_statistics[queue_index].min;
219
if (workqueue_statistics[queue_index].max > global_statistics.max)
220
global_statistics.max = workqueue_statistics[queue_index].max;
222
for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
223
global_statistics.distribution[i] += workqueue_statistics[queue_index].distribution[i];
229
void _print_statistics()
231
unsigned long i, j, total_events = 0, last_percentile = 0, accumulated_percentile = 0;
234
printf("Collecting statistics...\n");
236
for (i = 0; i < WORKQUEUE_COUNT; i++)
237
_gather_statistics(i);
239
printf("Test is done, run time was %.3f seconds, %.1fM events generated and processed.\n", (double)((double)(real_end - real_start) / (double) NANOSECONDS_PER_SECOND), total_events/1000000.0);
241
printf("Global dispatch queue aggregate statistics for %d queues: %dM events, min = %d ns, avg = %.1f ns, max = %d ns\n",
242
global_stats_used, global_statistics.count/1000000, global_statistics.min, global_statistics.avg, global_statistics.max);
244
printf("\nDistribution:\n");
245
for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
247
printf("%3ld us: %d ", i, global_statistics.distribution[i]);
248
for (j=0; j<(((double) global_statistics.distribution[i] / (double) global_statistics.count) * 400.0); j++)
253
printf("\nPercentiles:\n");
255
for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
257
while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
259
printf("%.2f < %ld us\n", percentiles[last_percentile], i-1);
262
accumulated_percentile += global_statistics.distribution[i];
265
while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
267
printf("%.2f > %d us\n", percentiles[last_percentile], DISTRIBUTION_BUCKETS-1);
275
int main(int argc, const char * argv[])
278
pthread_workqueue_attr_t attr;
281
(void) mach_timebase_info(&sTimebaseInfo);
284
memset(&workqueues, 0, sizeof(workqueues));
285
memset(&workqueue_statistics, 0, sizeof(workqueue_statistics));
286
memset(&global_statistics, 0, sizeof(global_statistics));
287
memset(&workqueue_generator, 0, sizeof(workqueue_generator));
289
pthread_mutex_init(&generator_mutex, NULL);
290
pthread_cond_init(&generator_condition, NULL);
292
if (pthread_workqueue_attr_init_np(&attr) != 0)
293
fprintf(stderr, "Failed to set workqueue attributes\n");
295
if (pthread_workqueue_attr_setqueuepriority_np(&attr, WORKQ_HIGH_PRIOQUEUE) != 0) // high prio for generators
296
fprintf(stderr, "Failed to set workqueue priority\n");
298
for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
300
workqueue_generator[i].wq_events = malloc(sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK);
301
memset(workqueue_generator[i].wq_events, 0, (sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK));
303
if (pthread_workqueue_create_np(&workqueue_generator[i].wq, &attr) != 0)
304
fprintf(stderr, "Failed to create workqueue\n");
307
for (i = 0; i < WORKQUEUE_COUNT; i++)
309
if (pthread_workqueue_attr_init_np(&attr) != 0)
310
fprintf(stderr, "Failed to set workqueue attributes\n");
312
if (pthread_workqueue_attr_setqueuepriority_np(&attr, (i % (WORKQ_LOW_PRIOQUEUE + 1))) != 0) // spread it round-robin in terms of prio
313
fprintf(stderr, "Failed to set workqueue priority\n");
315
if (pthread_workqueue_create_np(&workqueues[i], &attr) != 0)
316
fprintf(stderr, "Failed to create workqueue\n");
319
if (SLEEP_BEFORE_START > 0)
321
printf("Sleeping for %d seconds to allow for processor set configuration...\n",SLEEP_BEFORE_START);
322
sleep(SLEEP_BEFORE_START);
325
printf("%d workqueues, running for %d seconds at %d Hz, %d events per tick.\n",WORKQUEUE_COUNT, SECONDS_TO_RUN, EVENT_GENERATION_FREQUENCY, EVENTS_GENERATED_PER_TICK);
327
printf("Running %d generator threads at %dK events/s, the aggregated data rate is %dK events/s. %.2f MB is used for %.2fK events.\n",
328
GENERATOR_WORKQUEUE_COUNT,AGGREGATE_DATA_RATE_PER_SECOND/1000, TOTAL_DATA_PER_SECOND/1000,
329
(double) GENERATOR_WORKQUEUE_COUNT * ((sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK + sizeof(workqueues))/(1024.0*1024.0)),
330
GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK/1000.0);
332
real_start = gettime();
334
_generate_simulated_events();
336
real_end = gettime();