~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/test/ndbapi/bench/asyncGenerator.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/***************************************************************
 
17
* I N C L U D E D   F I L E S                                  *
 
18
***************************************************************/
 
19
 
 
20
#include <ndb_global.h>
 
21
 
 
22
#include "dbGenerator.h"
 
23
#include <NdbApi.hpp>
 
24
#include <NdbOut.hpp>
 
25
#include <NdbSleep.h>
 
26
 
 
27
/***************************************************************
 
28
* L O C A L   C O N S T A N T S                                *
 
29
***************************************************************/
 
30
 
 
31
/***************************************************************
 
32
* L O C A L   D A T A   S T R U C T U R E S                    *
 
33
***************************************************************/
 
34
 
 
35
/***************************************************************
 
36
* L O C A L   F U N C T I O N S                                *
 
37
***************************************************************/
 
38
 
 
39
static void getRandomSubscriberNumber(SubscriberNumber number);
 
40
static void getRandomServerId(ServerId *serverId);
 
41
static void getRandomChangedBy(ChangedBy changedBy);
 
42
static void getRandomChangedTime(ChangedTime changedTime);
 
43
 
 
44
static void clearTransaction(TransactionDefinition *trans);
 
45
static void initGeneratorStatistics(GeneratorStatistics *gen);
 
46
 
 
47
static void doOneTransaction(ThreadData * td, 
 
48
                             int parallellism,
 
49
                             int millisSendPoll,
 
50
                             int minEventSendPoll,
 
51
                             int forceSendPoll);
 
52
static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
 
53
static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
 
54
static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
 
55
static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
 
56
static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
 
57
 
 
58
/***************************************************************
 
59
* L O C A L   D A T A                                          *
 
60
***************************************************************/
 
61
 
 
62
static SequenceValues transactionDefinition[] = {
 
63
   {25, 1},
 
64
   {25, 2},
 
65
   {20, 3},
 
66
   {15, 4},
 
67
   {15, 5},
 
68
   {0,  0}
 
69
};
 
70
 
 
71
static SequenceValues rollbackDefinition[] = {
 
72
   {98, 0},
 
73
   {2 , 1},
 
74
   {0,  0}
 
75
};
 
76
 
 
77
static int maxsize = 0;
 
78
 
 
79
/***************************************************************
 
80
* P U B L I C   D A T A                                        *
 
81
***************************************************************/
 
82
 
 
83
/***************************************************************
 
84
****************************************************************
 
85
* L O C A L   F U N C T I O N S   C O D E   S E C T I O N      *
 
86
****************************************************************
 
87
***************************************************************/
 
88
 
 
89
static void getRandomSubscriberNumber(SubscriberNumber number)
 
90
{
 
91
   uint32 tmp;
 
92
   char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
 
93
   tmp = myRandom48(NO_OF_SUBSCRIBERS);
 
94
   sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
 
95
   memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
 
96
}
 
97
 
 
98
static void getRandomServerId(ServerId *serverId)
 
99
{
 
100
   *serverId = myRandom48(NO_OF_SERVERS);
 
101
}
 
102
 
 
103
static void getRandomChangedBy(ChangedBy changedBy)
 
104
{
 
105
   memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
 
106
   changedBy[CHANGED_BY_LENGTH] = 0;
 
107
}
 
108
 
 
109
static void getRandomChangedTime(ChangedTime changedTime)
 
110
{
 
111
   memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
 
112
   changedTime[CHANGED_TIME_LENGTH] = 0;
 
113
}
 
114
 
 
115
static void clearTransaction(TransactionDefinition *trans)
 
116
{
 
117
  trans->count            = 0;
 
118
  trans->branchExecuted   = 0;
 
119
  trans->rollbackExecuted = 0;
 
120
  trans->latencyCounter   = myRandom48(127);
 
121
  trans->latency.reset();
 
122
}
 
123
 
 
124
static int listFull(SessionList *list)
 
125
{
 
126
   return(list->numberInList == SESSION_LIST_LENGTH);
 
127
}
 
128
 
 
129
static int listEmpty(SessionList *list)
 
130
{
 
131
   return(list->numberInList == 0);
 
132
}
 
133
 
 
134
static void insertSession(SessionList     *list, 
 
135
                          SubscriberNumber number,
 
136
                          ServerId         serverId)
 
137
{
 
138
   SessionElement *e;
 
139
   if( listFull(list) ) return;
 
140
 
 
141
   e = &list->list[list->writeIndex];
 
142
 
 
143
   strcpy(e->subscriberNumber, number);
 
144
   e->serverId = serverId;
 
145
 
 
146
   list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
 
147
   list->numberInList++;
 
148
 
 
149
   if( list->numberInList > maxsize )
 
150
     maxsize = list->numberInList;
 
151
}
 
152
 
 
153
static SessionElement *getNextSession(SessionList *list)
 
154
{
 
155
   if( listEmpty(list) ) return(0);
 
156
 
 
157
   return(&list->list[list->readIndex]);
 
158
}
 
159
 
 
160
static void deleteSession(SessionList *list)
 
161
{
 
162
   if( listEmpty(list) ) return;
 
163
 
 
164
   list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
 
165
   list->numberInList--;
 
166
}
 
167
 
 
168
static void initGeneratorStatistics(GeneratorStatistics *gen)
 
169
{
 
170
   int i;
 
171
 
 
172
   if( initSequence(&gen->transactionSequence,
 
173
                    transactionDefinition) != 0 ) {
 
174
      ndbout_c("could not set the transaction types");
 
175
      exit(0);
 
176
   }
 
177
 
 
178
   if( initSequence(&gen->rollbackSequenceT4,
 
179
                    rollbackDefinition) != 0 ) {
 
180
      ndbout_c("could not set the rollback sequence");
 
181
      exit(0);
 
182
   }
 
183
 
 
184
   if( initSequence(&gen->rollbackSequenceT5,
 
185
                    rollbackDefinition) != 0 ) {
 
186
      ndbout_c("could not set the rollback sequence");
 
187
      exit(0);
 
188
   }
 
189
 
 
190
   for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
 
191
      clearTransaction(&gen->transactions[i]);
 
192
 
 
193
   gen->totalTransactions = 0;
 
194
 
 
195
   gen->activeSessions.numberInList = 0;
 
196
   gen->activeSessions.readIndex    = 0;
 
197
   gen->activeSessions.writeIndex   = 0;
 
198
}
 
199
 
 
200
 
 
201
static 
 
202
void 
 
203
doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
 
204
{
 
205
  int i;
 
206
  unsigned int transactionType;
 
207
  int async = 1;
 
208
  if (p == 1) {
 
209
    async = 0;
 
210
  }//if
 
211
  for(i = 0; i<p; i++){
 
212
    if(td[i].runState == Runnable){
 
213
      transactionType = getNextRandom(&td[i].generator.transactionSequence);
 
214
 
 
215
      switch(transactionType) {
 
216
      case 1:
 
217
        doTransaction_T1(td[i].pNDB, &td[i], async);
 
218
        break;
 
219
      case 2:
 
220
        doTransaction_T2(td[i].pNDB, &td[i], async);
 
221
        break;
 
222
      case 3:
 
223
        doTransaction_T3(td[i].pNDB, &td[i], async);
 
224
        break;
 
225
      case 4:
 
226
        doTransaction_T4(td[i].pNDB, &td[i], async);
 
227
        break;
 
228
      case 5:
 
229
        doTransaction_T5(td[i].pNDB, &td[i], async);
 
230
        break;
 
231
      default:
 
232
        ndbout_c("Unknown transaction type: %d", transactionType);
 
233
      }
 
234
    }
 
235
  }
 
236
  if (async == 1) {
 
237
    td[0].pNDB->sendPollNdb(millis, minEvents, force);
 
238
  }//if
 
239
}  
 
240
 
 
241
static 
 
242
void 
 
243
doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
 
244
{
 
245
  /*----------------*/
 
246
  /* Init arguments */
 
247
  /*----------------*/
 
248
  getRandomSubscriberNumber(td->transactionData.number);
 
249
  getRandomChangedBy(td->transactionData.changed_by);
 
250
  BaseString::snprintf(td->transactionData.changed_time,
 
251
           sizeof(td->transactionData.changed_time),
 
252
           "%ld - %d", td->changedTime++, myRandom48(65536*1024));
 
253
  //getRandomChangedTime(td->transactionData.changed_time);
 
254
  td->transactionData.location = td->transactionData.changed_by[0];
 
255
  
 
256
  /*-----------------*/
 
257
  /* Run transaction */
 
258
  /*-----------------*/
 
259
  td->runState = Running;
 
260
  td->generator.transactions[0].startLatency();
 
261
 
 
262
  start_T1(pNDB, td, async);
 
263
}
 
264
 
 
265
static
 
266
void 
 
267
doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
 
268
{
 
269
  /*----------------*/
 
270
  /* Init arguments */
 
271
  /*----------------*/
 
272
  getRandomSubscriberNumber(td->transactionData.number);
 
273
 
 
274
  /*-----------------*/
 
275
  /* Run transaction */
 
276
  /*-----------------*/
 
277
  td->runState = Running;
 
278
  td->generator.transactions[1].startLatency();
 
279
 
 
280
  start_T2(pNDB, td, async);
 
281
}
 
282
 
 
283
static
 
284
void 
 
285
doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
 
286
{
 
287
  SessionElement  *se;
 
288
  
 
289
  /*----------------*/
 
290
  /* Init arguments */
 
291
  /*----------------*/
 
292
  se = getNextSession(&td->generator.activeSessions);
 
293
  if( se ) {
 
294
    strcpy(td->transactionData.number, se->subscriberNumber);
 
295
    td->transactionData.server_id = se->serverId;
 
296
    td->transactionData.sessionElement = 1;
 
297
  } else {
 
298
    getRandomSubscriberNumber(td->transactionData.number);
 
299
    getRandomServerId(&td->transactionData.server_id);
 
300
    td->transactionData.sessionElement = 0;
 
301
  }
 
302
  
 
303
  td->transactionData.server_bit = (1 << td->transactionData.server_id);
 
304
 
 
305
  /*-----------------*/
 
306
  /* Run transaction */
 
307
  /*-----------------*/
 
308
  td->runState = Running;
 
309
  td->generator.transactions[2].startLatency();
 
310
  start_T3(pNDB, td, async);
 
311
}
 
312
 
 
313
static 
 
314
void 
 
315
doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
 
316
{
 
317
   /*----------------*/
 
318
   /* Init arguments */
 
319
   /*----------------*/
 
320
  getRandomSubscriberNumber(td->transactionData.number);
 
321
  getRandomServerId(&td->transactionData.server_id);
 
322
  
 
323
  td->transactionData.server_bit = (1 << td->transactionData.server_id);
 
324
  td->transactionData.do_rollback = 
 
325
    getNextRandom(&td->generator.rollbackSequenceT4);
 
326
 
 
327
  memset(td->transactionData.session_details+2, 
 
328
         myRandom48(26)+'A', SESSION_DETAILS_LENGTH-3);
 
329
  td->transactionData.session_details[SESSION_DETAILS_LENGTH-1] = 0;
 
330
  int2store(td->transactionData.session_details,SESSION_DETAILS_LENGTH-2);
 
331
  
 
332
  /*-----------------*/
 
333
  /* Run transaction */
 
334
  /*-----------------*/
 
335
  td->runState = Running;
 
336
  td->generator.transactions[3].startLatency();
 
337
  start_T4(pNDB, td, async);
 
338
}
 
339
 
 
340
static 
 
341
void 
 
342
doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
 
343
{
 
344
  SessionElement * se;
 
345
  se = getNextSession(&td->generator.activeSessions);
 
346
  if( se ) {
 
347
    strcpy(td->transactionData.number, se->subscriberNumber);
 
348
    td->transactionData.server_id = se->serverId;
 
349
    td->transactionData.sessionElement = 1;
 
350
  }
 
351
  else {
 
352
    getRandomSubscriberNumber(td->transactionData.number);
 
353
    getRandomServerId(&td->transactionData.server_id);
 
354
    td->transactionData.sessionElement = 0;
 
355
  }
 
356
  
 
357
  td->transactionData.server_bit = (1 << td->transactionData.server_id);
 
358
  td->transactionData.do_rollback  
 
359
    = getNextRandom(&td->generator.rollbackSequenceT5);
 
360
  
 
361
  /*-----------------*/
 
362
  /* Run transaction */
 
363
  /*-----------------*/
 
364
  td->runState = Running;
 
365
  td->generator.transactions[4].startLatency();
 
366
  start_T5(pNDB, td, async);
 
367
}
 
368
 
 
369
void
 
370
complete_T1(ThreadData * data){
 
371
  data->generator.transactions[0].stopLatency();
 
372
  data->generator.transactions[0].count++;
 
373
 
 
374
  data->runState = Runnable;
 
375
  data->generator.totalTransactions++;
 
376
}
 
377
 
 
378
void 
 
379
complete_T2(ThreadData * data){
 
380
  data->generator.transactions[1].stopLatency();
 
381
  data->generator.transactions[1].count++;
 
382
 
 
383
  data->runState = Runnable;
 
384
  data->generator.totalTransactions++;
 
385
}
 
386
 
 
387
void 
 
388
complete_T3(ThreadData * data){
 
389
 
 
390
  data->generator.transactions[2].stopLatency();
 
391
  data->generator.transactions[2].count++;
 
392
 
 
393
  if(data->transactionData.branchExecuted)
 
394
    data->generator.transactions[2].branchExecuted++;
 
395
 
 
396
  data->runState = Runnable;
 
397
  data->generator.totalTransactions++;
 
398
}
 
399
 
 
400
void 
 
401
complete_T4(ThreadData * data){
 
402
 
 
403
  data->generator.transactions[3].stopLatency();
 
404
  data->generator.transactions[3].count++;
 
405
 
 
406
  if(data->transactionData.branchExecuted)
 
407
    data->generator.transactions[3].branchExecuted++;
 
408
  if(data->transactionData.do_rollback)
 
409
    data->generator.transactions[3].rollbackExecuted++;
 
410
  
 
411
  if(data->transactionData.branchExecuted &&
 
412
     !data->transactionData.do_rollback){
 
413
    insertSession(&data->generator.activeSessions, 
 
414
                  data->transactionData.number, 
 
415
                  data->transactionData.server_id);
 
416
  }
 
417
 
 
418
  data->runState = Runnable;
 
419
  data->generator.totalTransactions++;
 
420
 
 
421
}
 
422
void 
 
423
complete_T5(ThreadData * data){
 
424
 
 
425
  data->generator.transactions[4].stopLatency();
 
426
  data->generator.transactions[4].count++;
 
427
 
 
428
  if(data->transactionData.branchExecuted)
 
429
    data->generator.transactions[4].branchExecuted++;
 
430
  if(data->transactionData.do_rollback)
 
431
    data->generator.transactions[4].rollbackExecuted++;
 
432
  
 
433
  if(data->transactionData.sessionElement && 
 
434
     !data->transactionData.do_rollback){
 
435
    deleteSession(&data->generator.activeSessions);
 
436
  }
 
437
  
 
438
  data->runState = Runnable;
 
439
  data->generator.totalTransactions++;
 
440
}
 
441
 
 
442
/***************************************************************
 
443
****************************************************************
 
444
* P U B L I C   F U N C T I O N S   C O D E   S E C T I O N    *
 
445
****************************************************************
 
446
***************************************************************/
 
447
void 
 
448
asyncGenerator(ThreadData *data, 
 
449
               int parallellism, 
 
450
               int millisSendPoll,
 
451
               int minEventSendPoll,
 
452
               int forceSendPoll)
 
453
{
 
454
  ThreadData * startUp;
 
455
  
 
456
  GeneratorStatistics *st;
 
457
  double periodStop;
 
458
  double benchTimeStart;
 
459
  double benchTimeEnd;
 
460
  int i, j, done;
 
461
 
 
462
  myRandom48Init(data->randomSeed);
 
463
  
 
464
  for(i = 0; i<parallellism; i++){
 
465
    initGeneratorStatistics(&data[i].generator);
 
466
   }
 
467
 
 
468
  startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
 
469
  memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
 
470
  
 
471
  /*----------------*/
 
472
  /* warm up period */
 
473
  /*----------------*/
 
474
  periodStop = userGetTime() + (double)data[0].warmUpSeconds;
 
475
  
 
476
  while(userGetTime() < periodStop){
 
477
    doOneTransaction(startUp, parallellism, 
 
478
                     millisSendPoll, minEventSendPoll, forceSendPoll);
 
479
  }
 
480
  
 
481
  ndbout_c("Waiting for startup to finish");
 
482
 
 
483
  /**
 
484
   * Wait for all transactions
 
485
   */
 
486
  done = 0;
 
487
  while(!done){
 
488
    done = 1;
 
489
    for(i = 0; i<parallellism; i++){
 
490
      if(startUp[i].runState != Runnable){
 
491
        done = 0;
 
492
        break;
 
493
      }
 
494
    }
 
495
    if(!done){
 
496
      startUp[0].pNDB->sendPollNdb();
 
497
    }
 
498
  }
 
499
  ndbout_c("Benchmark period starts");
 
500
 
 
501
  /*-------------------------*/
 
502
  /* normal benchmark period */
 
503
  /*-------------------------*/
 
504
  benchTimeStart = userGetTime();
 
505
  
 
506
  periodStop = benchTimeStart + (double)data[0].testSeconds;
 
507
  while(userGetTime() < periodStop)
 
508
    doOneTransaction(data, parallellism,
 
509
                     millisSendPoll, minEventSendPoll, forceSendPoll);  
 
510
 
 
511
  benchTimeEnd = userGetTime();
 
512
  
 
513
  ndbout_c("Benchmark period done");
 
514
 
 
515
  /**
 
516
   * Wait for all transactions
 
517
   */
 
518
  done = 0;
 
519
  while(!done){
 
520
    done = 1;
 
521
    for(i = 0; i<parallellism; i++){
 
522
      if(data[i].runState != Runnable){
 
523
        done = 0;
 
524
        break;
 
525
      }
 
526
    }
 
527
    if(!done){
 
528
      data[0].pNDB->sendPollNdb();
 
529
    }
 
530
  }
 
531
 
 
532
  /*------------------*/
 
533
  /* cool down period */
 
534
   /*------------------*/
 
535
  periodStop = userGetTime() + (double)data[0].coolDownSeconds;
 
536
  while(userGetTime() < periodStop){
 
537
    doOneTransaction(startUp, parallellism,
 
538
                     millisSendPoll, minEventSendPoll, forceSendPoll);
 
539
  }
 
540
 
 
541
  done = 0;
 
542
  while(!done){
 
543
    done = 1;
 
544
    for(i = 0; i<parallellism; i++){
 
545
      if(startUp[i].runState != Runnable){
 
546
        done = 0;
 
547
        break;
 
548
      }
 
549
    }
 
550
    if(!done){
 
551
      startUp[0].pNDB->sendPollNdb();
 
552
    }
 
553
  }
 
554
 
 
555
 
 
556
  /*---------------------------------------------------------*/
 
557
  /* add the times for all transaction for inner loop timing */
 
558
  /*---------------------------------------------------------*/
 
559
  for(j = 0; j<parallellism; j++){
 
560
    st = &data[j].generator;
 
561
    
 
562
    st->outerLoopTime = benchTimeEnd - benchTimeStart;
 
563
    st->outerTps      = getTps(st->totalTransactions, st->outerLoopTime);
 
564
  }
 
565
  /* ndbout_c("maxsize = %d\n",maxsize); */
 
566
 
 
567
  free(startUp);
 
568
}
 
569