~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/aio/test_AIO.cc

  • Committer: Package Import Robot
  • Author(s): Aron Xu
  • Date: 2013-05-09 01:00:04 UTC
  • mto: (1.1.11) (5.3.3 experimental)
  • mto: This revision was merged to the branch mainline in revision 15.
  • Revision ID: package-import@ubuntu.com-20130509010004-9fqq9n0adseg3f8w
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
#include "P_AIO.h"
 
25
#include "InkAPIInternal.h"
 
26
#include "I_Layout.h"
 
27
#include <iostream>
 
28
#include <fstream>
 
29
 
 
30
using std::cout;
 
31
using std::endl;
 
32
 
 
33
Diags *diags;
 
34
int diags_init = 0;
 
35
#define DIAGS_LOG_FILE "diags.log"
 
36
 
 
37
void syslog_thr_init(void)
 
38
{
 
39
}
 
40
 
 
41
//////////////////////////////////////////////////////////////////////////////
 
42
//
 
43
//      void reconfigure_diags()
 
44
//
 
45
//      This function extracts the current diags configuration settings from
 
46
//      records.config, and rebuilds the Diags data structures.
 
47
//
 
48
//////////////////////////////////////////////////////////////////////////////
 
49
 
 
50
static void
 
51
reconfigure_diags()
 
52
{
 
53
  int i;
 
54
  DiagsConfigState c;
 
55
 
 
56
  // initial value set to 0 or 1 based on command line tags
 
57
  c.enabled[DiagsTagType_Debug] = (diags->base_debug_tags != NULL);
 
58
  c.enabled[DiagsTagType_Action] = (diags->base_action_tags != NULL);
 
59
 
 
60
  c.enabled[DiagsTagType_Debug] = 1;
 
61
  c.enabled[DiagsTagType_Action] = 1;
 
62
  diags->show_location = 1;
 
63
 
 
64
  // read output routing values
 
65
  for (i = 0; i < DiagsLevel_Count; i++) {
 
66
 
 
67
    c.outputs[i].to_stdout = 0;
 
68
    c.outputs[i].to_stderr = 1;
 
69
    c.outputs[i].to_syslog = 1;
 
70
    c.outputs[i].to_diagslog = 1;
 
71
  }
 
72
 
 
73
  //////////////////////////////
 
74
  // clear out old tag tables //
 
75
  //////////////////////////////
 
76
 
 
77
  diags->deactivate_all(DiagsTagType_Debug);
 
78
  diags->deactivate_all(DiagsTagType_Action);
 
79
 
 
80
  //////////////////////////////////////////////////////////////////////
 
81
  //                     add new tag tables
 
82
  //////////////////////////////////////////////////////////////////////
 
83
 
 
84
  if (diags->base_debug_tags)
 
85
    diags->activate_taglist(diags->base_debug_tags, DiagsTagType_Debug);
 
86
  if (diags->base_action_tags)
 
87
    diags->activate_taglist(diags->base_action_tags, DiagsTagType_Action);
 
88
 
 
89
  ////////////////////////////////////
 
90
  // change the diags config values //
 
91
  ////////////////////////////////////
 
92
#if !defined(__GNUC__) && !defined(hpux)
 
93
  diags->config = c;
 
94
#else
 
95
  memcpy(((void *) &diags->config), ((void *) &c), sizeof(DiagsConfigState));
 
96
#endif
 
97
 
 
98
}
 
99
 
 
100
static void
 
101
init_diags(const char *bdt, const char *bat)
 
102
{
 
103
  FILE *diags_log_fp;
 
104
  char diags_logpath[500];
 
105
  strcpy(diags_logpath, DIAGS_LOG_FILE);
 
106
 
 
107
  diags_log_fp = fopen(diags_logpath, "w");
 
108
  if (diags_log_fp) {
 
109
    int status;
 
110
    status = setvbuf(diags_log_fp, NULL, _IOLBF, 512);
 
111
    if (status != 0) {
 
112
      fclose(diags_log_fp);
 
113
      diags_log_fp = NULL;
 
114
    }
 
115
  }
 
116
 
 
117
  diags = NEW(new Diags(bdt, bat, diags_log_fp));
 
118
 
 
119
  if (diags_log_fp == NULL) {
 
120
    Warning("couldn't open diags log file '%s', " "will not log to this file", diags_logpath);
 
121
  }
 
122
 
 
123
  Status("opened %s", diags_logpath);
 
124
  reconfigure_diags();
 
125
 
 
126
}
 
127
 
 
128
#define MAX_DISK_THREADS 200
 
129
#ifdef DISK_ALIGN
 
130
#define MIN_OFFSET       (32*1024)
 
131
#else
 
132
#define MIN_OFFSET       (8*1024)
 
133
#endif
 
134
enum
 
135
{ READ_MODE, WRITE_MODE, RANDOM_READ_MODE };
 
136
 
 
137
struct AIO_Device;
 
138
volatile int n_accessors = 0;
 
139
int orig_n_accessors;
 
140
AIO_Device *dev[MAX_DISK_THREADS];
 
141
 
 
142
extern RecInt cache_config_threads_per_disk;
 
143
 
 
144
int write_after = 0;
 
145
int write_skip = 0;
 
146
int hotset_size = 20;
 
147
double hotset_frequency = 0.9;
 
148
int touch_data = 0;
 
149
int disk_size = 4000;
 
150
int read_size = 1024;
 
151
char *disk_path[MAX_DISK_THREADS];
 
152
int n_disk_path = 0;
 
153
int run_time = 0;
 
154
int threads_per_disk = 1;
 
155
int delete_disks = 0;
 
156
int max_size = 0;
 
157
int use_lseek = 0;
 
158
 
 
159
int chains = 1;
 
160
double seq_read_percent = 0.0;
 
161
double seq_write_percent = 0.0;
 
162
double rand_read_percent = 0.0;
 
163
double real_seq_read_percent = 0.0;
 
164
double real_seq_write_percent = 0.0;
 
165
double real_rand_read_percent = 0.0;
 
166
int seq_read_size = 0;
 
167
int seq_write_size = 0;
 
168
int rand_read_size = 0;
 
169
 
 
170
struct AIO_Device:public Continuation
 
171
{
 
172
  char *path;
 
173
  int fd;
 
174
  int id;
 
175
  char *buf;
 
176
  ink_hrtime time_start, time_end;
 
177
  int seq_reads;
 
178
  int seq_writes;
 
179
  int rand_reads;
 
180
  int hotset_idx;
 
181
  int mode;
 
182
  AIOCallback *io;
 
183
    AIO_Device(ProxyMutex * m):Continuation(m)
 
184
  {
 
185
    hotset_idx = 0;
 
186
    io = new_AIOCallback();
 
187
    time_start = 0;
 
188
    SET_HANDLER(&AIO_Device::do_hotset);
 
189
  }
 
190
  int select_mode(double p)
 
191
  {
 
192
    if (p < real_seq_read_percent)
 
193
      return READ_MODE;
 
194
    else if (p < real_seq_read_percent + real_seq_write_percent)
 
195
      return WRITE_MODE;
 
196
    else
 
197
      return RANDOM_READ_MODE;
 
198
  };
 
199
  void do_touch_data(off_t orig_len, off_t orig_offset)
 
200
  {
 
201
    if (!touch_data)
 
202
      return;
 
203
    unsigned int len = (unsigned int) orig_len;
 
204
    unsigned int offset = (unsigned int) orig_offset;
 
205
    offset = offset % 1024;
 
206
    char *b = buf;
 
207
    unsigned *x = (unsigned *) b;
 
208
    for (unsigned j = 0; j < (len / sizeof(int)); j++) {
 
209
      x[j] = offset;
 
210
      offset = (offset + 1) % 1024;
 
211
    }
 
212
  };
 
213
  int do_check_data(off_t orig_len, off_t orig_offset)
 
214
  {
 
215
    if (!touch_data)
 
216
      return 0;
 
217
    unsigned int len = (unsigned int) orig_len;
 
218
    unsigned int offset = (unsigned int) orig_offset;
 
219
    offset = offset % 1024;
 
220
    unsigned *x = (unsigned *) buf;
 
221
    for (unsigned j = 0; j < (len / sizeof(int)); j++) {
 
222
      if (x[j] != offset)
 
223
        return 1;
 
224
      offset = (offset + 1) % 1024;
 
225
    }
 
226
    return 0;
 
227
  }
 
228
  int do_hotset(int event, Event * e);
 
229
  int do_fd(int event, Event * e);
 
230
 
 
231
};
 
232
 
 
233
void
 
234
dump_summary(void)
 
235
{
 
236
  /* dump timing info */
 
237
  printf("Writing summary info\n");
 
238
 
 
239
  printf("----------\n");
 
240
  printf("parameters\n");
 
241
  printf("----------\n");
 
242
  printf("%d disks\n", n_disk_path);
 
243
  printf("%d chains\n", chains);
 
244
  printf("%d threads_per_disk\n", threads_per_disk);
 
245
 
 
246
  printf("%0.1f percent %d byte seq_reads by volume\n", seq_read_percent * 100.0, seq_read_size);
 
247
  printf("%0.1f percent %d byte seq_writes by volume\n", seq_write_percent * 100.0, seq_write_size);
 
248
  printf("%0.1f percent %d byte rand_reads by volume\n", rand_read_percent * 100.0, rand_read_size);
 
249
  printf("-------\n");
 
250
  printf("factors\n");
 
251
  printf("-------\n");
 
252
  printf("%0.1f percent %d byte seq_reads by count\n", real_seq_read_percent * 100.0, seq_read_size);
 
253
  printf("%0.1f percent %d byte seq_writes by count\n", real_seq_write_percent * 100.0, seq_write_size);
 
254
  printf("%0.1f percent %d byte rand_reads by count\n", real_rand_read_percent * 100.0, rand_read_size);
 
255
 
 
256
  printf("-------------------------\n");
 
257
  printf("individual thread results\n");
 
258
  printf("-------------------------\n");
 
259
  double total_seq_reads = 0;
 
260
  double total_seq_writes = 0;
 
261
  double total_rand_reads = 0;
 
262
  double total_secs = 0.0;
 
263
  for (int i = 0; i < orig_n_accessors; i++) {
 
264
    double secs = (dev[i]->time_end - dev[i]->time_start) / 1000000000.0;
 
265
    double ops_sec = (dev[i]->seq_reads + dev[i]->seq_writes + dev[i]->rand_reads) / secs;
 
266
    printf("%s: #sr:%d #sw:%d #rr:%d %0.1f secs %0.1f ops/sec\n",
 
267
           dev[i]->path, dev[i]->seq_reads, dev[i]->seq_writes, dev[i]->rand_reads, secs, ops_sec);
 
268
    total_secs += secs;
 
269
    total_seq_reads += dev[i]->seq_reads;
 
270
    total_seq_writes += dev[i]->seq_writes;
 
271
    total_rand_reads += dev[i]->rand_reads;
 
272
  }
 
273
  printf("-----------------\n");
 
274
  printf("aggregate results\n");
 
275
  printf("-----------------\n");
 
276
  total_secs /= orig_n_accessors;
 
277
  float sr = (total_seq_reads * seq_read_size) / total_secs;
 
278
  sr /= 1024.0 * 1024.0;
 
279
  float sw = (total_seq_writes * seq_write_size) / total_secs;
 
280
  sw /= 1024.0 * 1024.0;
 
281
  float rr = (total_rand_reads * rand_read_size) / total_secs;
 
282
  rr /= 1024.0 * 1024.0;
 
283
  printf("%f ops %0.2f mbytes/sec %0.1f ops/sec %0.1f ops/sec/disk seq_read\n",
 
284
         total_seq_reads, sr, total_seq_reads / total_secs, total_seq_reads / total_secs / n_disk_path);
 
285
  printf("%f ops %0.2f mbytes/sec %0.1f ops/sec %0.1f ops/sec/disk seq_write\n",
 
286
         total_seq_writes, sw, total_seq_writes / total_secs, total_seq_writes / total_secs / n_disk_path);
 
287
  printf("%f ops %0.2f mbytes/sec %0.1f ops/sec %0.1f ops/sec/disk rand_read\n",
 
288
         total_rand_reads, rr, total_rand_reads / total_secs, total_rand_reads / total_secs / n_disk_path);
 
289
  printf("%0.2f total mbytes/sec\n", sr + sw + rr);
 
290
  printf("----------------------------------------------------------\n");
 
291
 
 
292
  if (delete_disks)
 
293
    for (int i = 0; i < n_disk_path; i++)
 
294
      unlink(disk_path[i]);
 
295
  exit(0);
 
296
}
 
297
 
 
298
int
 
299
AIO_Device::do_hotset(int event, Event * e)
 
300
{
 
301
  off_t max_offset = ((off_t) disk_size) * 1024 * 1024;
 
302
  io->aiocb.aio_lio_opcode = LIO_WRITE;
 
303
  io->aiocb.aio_fildes = fd;
 
304
  io->aiocb.aio_offset = MIN_OFFSET + hotset_idx * max_size;
 
305
  do_touch_data(seq_read_size, io->aiocb.aio_offset);
 
306
  ink_assert(!do_check_data(seq_read_size, io->aiocb.aio_offset));
 
307
  if (!hotset_idx)
 
308
    fprintf(stderr, "Starting hotset document writing \n");
 
309
  if (io->aiocb.aio_offset > max_offset) {
 
310
    fprintf(stderr,
 
311
            "Finished hotset documents  [%d] offset [%6.0f] size [%6.0f]\n",
 
312
            hotset_idx, (float) MIN_OFFSET, (float) max_size);
 
313
    SET_HANDLER(&AIO_Device::do_fd);
 
314
    eventProcessor.schedule_imm(this);
 
315
    return (0);
 
316
  }
 
317
  io->aiocb.aio_nbytes = seq_read_size;
 
318
  io->aiocb.aio_buf = buf;
 
319
  io->action = this;
 
320
  io->thread = mutex->thread_holding;
 
321
  ink_assert(ink_aio_write(io) >= 0);
 
322
  hotset_idx++;
 
323
  return 0;
 
324
}
 
325
 
 
326
int
 
327
AIO_Device::do_fd(int event, Event * e)
 
328
{
 
329
  if (!time_start) {
 
330
    time_start = ink_get_hrtime();
 
331
    fprintf(stderr, "Starting the aio_testing \n");
 
332
  }
 
333
  if ((ink_get_hrtime() - time_start) > (run_time * HRTIME_SECOND)) {
 
334
    time_end = ink_get_hrtime();
 
335
    ink_atomic_increment(&n_accessors, -1);
 
336
    if (n_accessors <= 0)
 
337
      dump_summary();
 
338
    return 0;
 
339
  }
 
340
 
 
341
  off_t max_offset = ((off_t) disk_size) * 1024 * 1024; // MB-GB
 
342
  off_t max_hotset_offset = ((off_t) hotset_size) * 1024 * 1024;        // MB-GB
 
343
  off_t seq_read_point = ((off_t) MIN_OFFSET);
 
344
  off_t seq_write_point = ((off_t) MIN_OFFSET) + max_offset / 2 + write_after * 1024 * 1024;
 
345
  seq_write_point += (id % n_disk_path) * (max_offset / (threads_per_disk * 4));
 
346
  if (seq_write_point > max_offset)
 
347
    seq_write_point = MIN_OFFSET;
 
348
 
 
349
  if (io->aiocb.aio_lio_opcode == LIO_READ) {
 
350
    ink_assert(!do_check_data(io->aiocb.aio_nbytes, io->aiocb.aio_offset));
 
351
  }
 
352
  memset((void *) buf, 0, max_size);
 
353
  io->aiocb.aio_fildes = fd;
 
354
  io->aiocb.aio_buf = buf;
 
355
  io->action = this;
 
356
  io->thread = mutex->thread_holding;
 
357
 
 
358
  switch (select_mode(drand48())) {
 
359
  case READ_MODE:
 
360
    io->aiocb.aio_offset = seq_read_point;
 
361
    io->aiocb.aio_nbytes = seq_read_size;
 
362
    io->aiocb.aio_lio_opcode = LIO_READ;
 
363
    ink_assert(ink_aio_read(io) >= 0);
 
364
    seq_read_point += seq_read_size;
 
365
    if (seq_read_point > max_offset)
 
366
      seq_read_point = MIN_OFFSET;
 
367
    seq_reads++;
 
368
    break;
 
369
  case WRITE_MODE:
 
370
    io->aiocb.aio_offset = seq_write_point;
 
371
    io->aiocb.aio_nbytes = seq_write_size;
 
372
    io->aiocb.aio_lio_opcode = LIO_WRITE;
 
373
    do_touch_data(seq_write_size, ((int) seq_write_point) % 1024);
 
374
    ink_assert(ink_aio_write(io) >= 0);
 
375
    seq_write_point += seq_write_size;
 
376
    seq_write_point += write_skip;
 
377
    if (seq_write_point > max_offset)
 
378
      seq_write_point = MIN_OFFSET;
 
379
 
 
380
    seq_writes++;
 
381
    break;
 
382
  case RANDOM_READ_MODE:{
 
383
      // fprintf(stderr, "random read started \n");
 
384
      double p, f;
 
385
      p = drand48();
 
386
      f = drand48();
 
387
      off_t o = 0;
 
388
      if (f < hotset_frequency)
 
389
        o = (off_t) p *max_hotset_offset;
 
390
      else
 
391
        o = (off_t) p *(max_offset - rand_read_size);
 
392
      if (o < MIN_OFFSET)
 
393
        o = MIN_OFFSET;
 
394
      o = (o + (seq_read_size - 1)) & (~(seq_read_size - 1));
 
395
      io->aiocb.aio_offset = o;
 
396
      io->aiocb.aio_nbytes = rand_read_size;
 
397
      io->aiocb.aio_lio_opcode = LIO_READ;
 
398
      ink_assert(ink_aio_read(io) >= 0);
 
399
      rand_reads++;
 
400
      break;
 
401
    }
 
402
  }
 
403
  return 0;
 
404
}
 
405
 
 
406
#define PARAM(_s) \
 
407
        else if (strcmp(field_name, #_s) == 0) { \
 
408
            fin >> _s; \
 
409
            cout << "reading " #_s " = "  \
 
410
                 << _s << endl; \
 
411
                                  }
 
412
 
 
413
int
 
414
read_config(const char *config_filename)
 
415
{
 
416
  std::ifstream fin(config_filename);
 
417
  char field_name[256];
 
418
  char field_value[256];
 
419
 
 
420
  if (!fin.rdbuf()->is_open()) {
 
421
    fin.open("sample.cfg");
 
422
    if (!fin.rdbuf()->is_open()) {
 
423
      cout << "cannot open config files " << config_filename << endl;
 
424
      return (0);
 
425
    }
 
426
  }
 
427
  while (!fin.eof()) {
 
428
    field_name[0] = '\0';
 
429
    fin >> field_name;
 
430
    if (0) {
 
431
    }
 
432
    PARAM(hotset_size)
 
433
      PARAM(hotset_frequency)
 
434
      PARAM(touch_data)
 
435
      PARAM(use_lseek)
 
436
      PARAM(write_after)
 
437
      PARAM(write_skip)
 
438
      PARAM(disk_size)
 
439
      PARAM(seq_read_percent)
 
440
      PARAM(seq_write_percent)
 
441
      PARAM(rand_read_percent)
 
442
      PARAM(seq_read_size)
 
443
      PARAM(seq_write_size)
 
444
      PARAM(rand_read_size)
 
445
      PARAM(run_time)
 
446
      PARAM(chains)
 
447
      PARAM(threads_per_disk)
 
448
      PARAM(delete_disks)
 
449
      else if (strcmp(field_name, "disk_path") == 0) {
 
450
      assert(n_disk_path < MAX_DISK_THREADS);
 
451
      fin >> field_value;
 
452
      disk_path[n_disk_path] = strdup(field_value);
 
453
      cout << "reading disk_path = " << disk_path[n_disk_path] << endl;
 
454
      n_disk_path++;
 
455
    }
 
456
  }
 
457
  assert(read_size > 0);
 
458
  int t = seq_read_size + seq_write_size + rand_read_size;
 
459
  real_seq_read_percent = seq_read_percent;
 
460
  real_seq_write_percent = seq_write_percent;
 
461
  real_rand_read_percent = rand_read_percent;
 
462
  if (seq_read_size)
 
463
    real_seq_read_percent *= t / seq_read_size;
 
464
  if (seq_write_size)
 
465
    real_seq_write_percent *= t / seq_write_size;
 
466
  if (rand_read_size)
 
467
    real_rand_read_percent *= t / rand_read_size;
 
468
  float tt = real_seq_read_percent + real_seq_write_percent + real_rand_read_percent;
 
469
  real_seq_read_percent = real_seq_read_percent / tt;
 
470
  real_seq_write_percent = real_seq_write_percent / tt;
 
471
  real_rand_read_percent = real_rand_read_percent / tt;
 
472
  return (1);
 
473
}
 
474
 
 
475
int
 
476
main(int argc, char *argv[])
 
477
{
 
478
  int i;
 
479
 
 
480
  Layout::create();
 
481
  init_diags("", NULL);
 
482
  RecProcessInit(RECM_STAND_ALONE);
 
483
  ink_event_system_init(EVENT_SYSTEM_MODULE_VERSION);
 
484
  eventProcessor.start(ink_number_of_processors());
 
485
  RecProcessStart();
 
486
  ink_aio_init(AIO_MODULE_VERSION);
 
487
  srand48(time(NULL));
 
488
 
 
489
  if (!read_config(argv[1]))
 
490
    exit(1);
 
491
 
 
492
  max_size = seq_read_size;
 
493
  if (seq_write_size > max_size)
 
494
    max_size = seq_write_size;
 
495
  if (rand_read_size > max_size)
 
496
    max_size = rand_read_size;
 
497
 
 
498
  cache_config_threads_per_disk = threads_per_disk;
 
499
  orig_n_accessors = n_disk_path * threads_per_disk;
 
500
 
 
501
  for (i = 0; i < n_disk_path; i++) {
 
502
    for (int j = 0; j < threads_per_disk; j++) {
 
503
      dev[n_accessors] = new AIO_Device(new_ProxyMutex());
 
504
      dev[n_accessors]->id = i * threads_per_disk + j;
 
505
      dev[n_accessors]->path = disk_path[i];
 
506
      dev[n_accessors]->seq_reads = 0;
 
507
      dev[n_accessors]->seq_writes = 0;
 
508
      dev[n_accessors]->rand_reads = 0;
 
509
      dev[n_accessors]->fd = open(dev[n_accessors]->path, O_RDWR | O_CREAT, 0600);
 
510
      fchmod(dev[n_accessors]->fd, S_IRWXU | S_IRWXG);
 
511
      if (dev[n_accessors]->fd < 0) {
 
512
        perror(disk_path[i]);
 
513
        exit(1);
 
514
      }
 
515
      dev[n_accessors]->buf = (char *) valloc(max_size);
 
516
      eventProcessor.schedule_imm(dev[n_accessors]);
 
517
      n_accessors++;
 
518
    }
 
519
  }
 
520
 
 
521
  this_thread()->execute();
 
522
}