/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/***************************************************************
* I N C L U D E D   F I L E S                                  *
***************************************************************/
20
#include <ndb_global.h>
22
#include "dbGenerator.h"

/***************************************************************
* L O C A L   C O N S T A N T S                                *
***************************************************************/

/***************************************************************
* L O C A L   D A T A   S T R U C T U R E S                    *
***************************************************************/

/***************************************************************
* L O C A L   F U N C T I O N S                                *
***************************************************************/
39
/* Forward declarations of the file-local helpers defined further down:
   random-argument generators, statistics reset helpers and the per-type
   transaction starters. */
static void getRandomSubscriberNumber(SubscriberNumber number);
40
static void getRandomServerId(ServerId *serverId);
41
static void getRandomChangedBy(ChangedBy changedBy);
42
static void getRandomChangedTime(ChangedTime changedTime);
44
static void clearTransaction(TransactionDefinition *trans);
45
static void initGeneratorStatistics(GeneratorStatistics *gen);
47
/* NOTE(review): the remainder of this parameter list is not visible here;
   the definition below takes (td, p, millis, minEvents, force). */
static void doOneTransaction(ThreadData * td,
52
static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
53
static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
54
static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
55
static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
56
static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
58
/***************************************************************
60
***************************************************************/
62
/* Table passed to initSequence() to set up each generator's
   transactionSequence (see initGeneratorStatistics).
   NOTE(review): the initializer entries are not visible here. */
static SequenceValues transactionDefinition[] = {
71
/* Table passed to initSequence() for both rollbackSequenceT4 and
   rollbackSequenceT5 (see initGeneratorStatistics).
   NOTE(review): the initializer entries are not visible here. */
static SequenceValues rollbackDefinition[] = {
77
/* Largest numberInList value seen so far by insertSession(). */
static int maxsize = 0;

/***************************************************************
* P U B L I C   D A T A                                        *
***************************************************************/

/***************************************************************
****************************************************************
* L O C A L   F U N C T I O N S   C O D E   S E C T I O N      *
****************************************************************
***************************************************************/
89
/**
 * Fill `number` with a random subscriber number.
 *
 * A value is drawn with myRandom48(NO_OF_SUBSCRIBERS) and rendered as a
 * decimal string, zero-padded to SUBSCRIBER_NUMBER_LENGTH digits.  Exactly
 * SUBSCRIBER_NUMBER_LENGTH bytes are then copied into `number`; this
 * function does not write a terminating NUL into `number`.
 *
 * @param number  destination buffer, at least SUBSCRIBER_NUMBER_LENGTH bytes.
 */
static void getRandomSubscriberNumber(SubscriberNumber number)
{
  char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
  const int tmp = (int)myRandom48(NO_OF_SUBSCRIBERS);

  /* Bounded formatting: a value with more digits than the field width is
     truncated instead of overrunning sbuf. */
  snprintf(sbuf, sizeof(sbuf), "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
  memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
}
98
/**
 * Store a random server id in *serverId.
 *
 * The value is whatever myRandom48(NO_OF_SERVERS) returns.
 *
 * @param serverId  output location; must not be null.
 */
static void getRandomServerId(ServerId *serverId)
{
  *serverId = myRandom48(NO_OF_SERVERS);
}
103
/**
 * Fill `changedBy` with a random "changed by" string.
 *
 * The first CHANGED_BY_LENGTH bytes are all set to the same character,
 * 'A' + myRandom48(26), and a NUL is written at index CHANGED_BY_LENGTH.
 * The buffer therefore has to hold CHANGED_BY_LENGTH + 1 bytes.
 *
 * @param changedBy  destination buffer.
 */
static void getRandomChangedBy(ChangedBy changedBy)
{
  memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
  changedBy[CHANGED_BY_LENGTH] = 0;
}
109
/**
 * Fill `changedTime` with a random "changed time" string.
 *
 * The first CHANGED_TIME_LENGTH bytes are all set to the same character,
 * 'A' + myRandom48(26), and a NUL is written at index CHANGED_TIME_LENGTH.
 * The buffer therefore has to hold CHANGED_TIME_LENGTH + 1 bytes.
 *
 * @param changedTime  destination buffer.
 */
static void getRandomChangedTime(ChangedTime changedTime)
{
  memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
  changedTime[CHANGED_TIME_LENGTH] = 0;
}
115
/*
 * Reset the statistics kept for one transaction type: zero the branch and
 * rollback counters, set latencyCounter to myRandom48(127) and call
 * reset() on the latency member.
 *
 * NOTE(review): the `count` member, which the complete_T* functions
 * increment, is not reset in the lines visible here -- confirm against the
 * complete source.
 */
static void clearTransaction(TransactionDefinition *trans)
118
trans->branchExecuted = 0;
119
trans->rollbackExecuted = 0;
120
trans->latencyCounter = myRandom48(127);
121
trans->latency.reset();
124
/**
 * Tell whether the session list is at capacity.
 *
 * @return non-zero when numberInList equals SESSION_LIST_LENGTH, else 0.
 */
static int listFull(SessionList *list)
{
  return(list->numberInList == SESSION_LIST_LENGTH);
}
129
/**
 * Tell whether the session list holds no entries.
 *
 * @return non-zero when numberInList is 0, else 0.
 */
static int listEmpty(SessionList *list)
{
  return(list->numberInList == 0);
}
134
static void insertSession(SessionList *list,
135
SubscriberNumber number,
139
if( listFull(list) ) return;
141
e = &list->list[list->writeIndex];
143
strcpy(e->subscriberNumber, number);
144
e->serverId = serverId;
146
list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
147
list->numberInList++;
149
if( list->numberInList > maxsize )
150
maxsize = list->numberInList;
153
/**
 * Peek at the entry at readIndex without removing it.
 *
 * @return pointer to the element at readIndex, or 0 when the list is empty.
 */
static SessionElement *getNextSession(SessionList *list)
{
  if( listEmpty(list) ) return(0);

  return(&list->list[list->readIndex]);
}
160
/**
 * Drop the entry at readIndex from the circular session list.
 *
 * Does nothing when the list is empty.  Otherwise readIndex advances modulo
 * SESSION_LIST_LENGTH and numberInList shrinks by one; the element's
 * contents are left as they are.
 */
static void deleteSession(SessionList *list)
{
  if( listEmpty(list) ) return;

  list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
  list->numberInList--;
}
168
/*
 * Bring a GeneratorStatistics object into its initial state: set up the
 * transaction-type sequence and the two rollback sequences with
 * initSequence(), clear every per-type transaction record, and zero the
 * total counter and the active-session list bookkeeping.
 *
 * NOTE(review): `i` is not declared in the lines visible here, and what
 * happens after each failure message (beyond printing it) cannot be seen --
 * confirm against the complete source.
 */
static void initGeneratorStatistics(GeneratorStatistics *gen)
172
/* Sequence that decides which transaction type runs next. */
if( initSequence(&gen->transactionSequence,
173
transactionDefinition) != 0 ) {
174
ndbout_c("could not set the transaction types");
178
/* T4 and T5 each get their own rollback sequence, both built from
   rollbackDefinition. */
if( initSequence(&gen->rollbackSequenceT4,
179
rollbackDefinition) != 0 ) {
180
ndbout_c("could not set the rollback sequence");
184
if( initSequence(&gen->rollbackSequenceT5,
185
rollbackDefinition) != 0 ) {
186
ndbout_c("could not set the rollback sequence");
190
/* Reset the statistics of every transaction type. */
for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
191
clearTransaction(&gen->transactions[i]);
193
gen->totalTransactions = 0;
195
/* Start with an empty circular session list. */
gen->activeSessions.numberInList = 0;
196
gen->activeSessions.readIndex = 0;
197
gen->activeSessions.writeIndex = 0;
203
/*
 * Start one transaction on every entry of td[0..p-1] that is currently
 * Runnable, then poll through td[0].pNDB.
 *
 * For each Runnable entry the next transaction type is drawn from that
 * entry's transactionSequence and handed to the matching doTransaction_T*
 * function; an unrecognised type is reported with ndbout_c.  Afterwards
 * sendPollNdb(millis, minEvents, force) is called on td[0].pNDB.
 *
 * NOTE(review): the declarations of `i` and `async`, the case labels of the
 * switch and its break statements are not visible in these lines -- confirm
 * the type-to-function mapping against the complete source.
 */
doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
206
unsigned int transactionType;
211
for(i = 0; i<p; i++){
212
/* Entries with a transaction still in flight are skipped. */
if(td[i].runState == Runnable){
213
transactionType = getNextRandom(&td[i].generator.transactionSequence);
215
switch(transactionType) {
217
doTransaction_T1(td[i].pNDB, &td[i], async);
220
doTransaction_T2(td[i].pNDB, &td[i], async);
223
doTransaction_T3(td[i].pNDB, &td[i], async);
226
doTransaction_T4(td[i].pNDB, &td[i], async);
229
doTransaction_T5(td[i].pNDB, &td[i], async);
232
ndbout_c("Unknown transaction type: %d", transactionType);
237
/* Polling always goes through the first entry's Ndb object. */
td[0].pNDB->sendPollNdb(millis, minEvents, force);
243
doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
248
getRandomSubscriberNumber(td->transactionData.number);
249
getRandomChangedBy(td->transactionData.changed_by);
250
BaseString::snprintf(td->transactionData.changed_time,
251
sizeof(td->transactionData.changed_time),
252
"%ld - %d", td->changedTime++, myRandom48(65536*1024));
253
//getRandomChangedTime(td->transactionData.changed_time);
254
td->transactionData.location = td->transactionData.changed_by[0];
256
/*-----------------*/
257
/* Run transaction */
258
/*-----------------*/
259
td->runState = Running;
260
td->generator.transactions[0].startLatency();
262
start_T1(pNDB, td, async);
267
doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
272
getRandomSubscriberNumber(td->transactionData.number);
274
/*-----------------*/
275
/* Run transaction */
276
/*-----------------*/
277
td->runState = Running;
278
td->generator.transactions[1].startLatency();
280
start_T2(pNDB, td, async);
285
doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
292
se = getNextSession(&td->generator.activeSessions);
294
strcpy(td->transactionData.number, se->subscriberNumber);
295
td->transactionData.server_id = se->serverId;
296
td->transactionData.sessionElement = 1;
298
getRandomSubscriberNumber(td->transactionData.number);
299
getRandomServerId(&td->transactionData.server_id);
300
td->transactionData.sessionElement = 0;
303
td->transactionData.server_bit = (1 << td->transactionData.server_id);
305
/*-----------------*/
306
/* Run transaction */
307
/*-----------------*/
308
td->runState = Running;
309
td->generator.transactions[2].startLatency();
310
start_T3(pNDB, td, async);
315
doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
320
getRandomSubscriberNumber(td->transactionData.number);
321
getRandomServerId(&td->transactionData.server_id);
323
td->transactionData.server_bit = (1 << td->transactionData.server_id);
324
td->transactionData.do_rollback =
325
getNextRandom(&td->generator.rollbackSequenceT4);
327
memset(td->transactionData.session_details+2,
328
myRandom48(26)+'A', SESSION_DETAILS_LENGTH-3);
329
td->transactionData.session_details[SESSION_DETAILS_LENGTH-1] = 0;
330
int2store(td->transactionData.session_details,SESSION_DETAILS_LENGTH-2);
332
/*-----------------*/
333
/* Run transaction */
334
/*-----------------*/
335
td->runState = Running;
336
td->generator.transactions[3].startLatency();
337
start_T4(pNDB, td, async);
342
doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
345
se = getNextSession(&td->generator.activeSessions);
347
strcpy(td->transactionData.number, se->subscriberNumber);
348
td->transactionData.server_id = se->serverId;
349
td->transactionData.sessionElement = 1;
352
getRandomSubscriberNumber(td->transactionData.number);
353
getRandomServerId(&td->transactionData.server_id);
354
td->transactionData.sessionElement = 0;
357
td->transactionData.server_bit = (1 << td->transactionData.server_id);
358
td->transactionData.do_rollback
359
= getNextRandom(&td->generator.rollbackSequenceT5);
361
/*-----------------*/
362
/* Run transaction */
363
/*-----------------*/
364
td->runState = Running;
365
td->generator.transactions[4].startLatency();
366
start_T5(pNDB, td, async);
370
complete_T1(ThreadData * data){
371
data->generator.transactions[0].stopLatency();
372
data->generator.transactions[0].count++;
374
data->runState = Runnable;
375
data->generator.totalTransactions++;
379
complete_T2(ThreadData * data){
380
data->generator.transactions[1].stopLatency();
381
data->generator.transactions[1].count++;
383
data->runState = Runnable;
384
data->generator.totalTransactions++;
388
complete_T3(ThreadData * data){
390
data->generator.transactions[2].stopLatency();
391
data->generator.transactions[2].count++;
393
if(data->transactionData.branchExecuted)
394
data->generator.transactions[2].branchExecuted++;
396
data->runState = Runnable;
397
data->generator.totalTransactions++;
401
complete_T4(ThreadData * data){
403
data->generator.transactions[3].stopLatency();
404
data->generator.transactions[3].count++;
406
if(data->transactionData.branchExecuted)
407
data->generator.transactions[3].branchExecuted++;
408
if(data->transactionData.do_rollback)
409
data->generator.transactions[3].rollbackExecuted++;
411
if(data->transactionData.branchExecuted &&
412
!data->transactionData.do_rollback){
413
insertSession(&data->generator.activeSessions,
414
data->transactionData.number,
415
data->transactionData.server_id);
418
data->runState = Runnable;
419
data->generator.totalTransactions++;
423
complete_T5(ThreadData * data){
425
data->generator.transactions[4].stopLatency();
426
data->generator.transactions[4].count++;
428
if(data->transactionData.branchExecuted)
429
data->generator.transactions[4].branchExecuted++;
430
if(data->transactionData.do_rollback)
431
data->generator.transactions[4].rollbackExecuted++;
433
if(data->transactionData.sessionElement &&
434
!data->transactionData.do_rollback){
435
deleteSession(&data->generator.activeSessions);
438
data->runState = Runnable;
439
data->generator.totalTransactions++;

/***************************************************************
****************************************************************
* P U B L I C   F U N C T I O N S   C O D E   S E C T I O N    *
****************************************************************
***************************************************************/
448
asyncGenerator(ThreadData *data,
451
int minEventSendPoll,
454
ThreadData * startUp;
456
GeneratorStatistics *st;
458
double benchTimeStart;
462
myRandom48Init(data->randomSeed);
464
for(i = 0; i<parallellism; i++){
465
initGeneratorStatistics(&data[i].generator);
468
startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
469
memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
474
periodStop = userGetTime() + (double)data[0].warmUpSeconds;
476
while(userGetTime() < periodStop){
477
doOneTransaction(startUp, parallellism,
478
millisSendPoll, minEventSendPoll, forceSendPoll);
481
ndbout_c("Waiting for startup to finish");
484
* Wait for all transactions
489
for(i = 0; i<parallellism; i++){
490
if(startUp[i].runState != Runnable){
496
startUp[0].pNDB->sendPollNdb();
499
ndbout_c("Benchmark period starts");
501
/*-------------------------*/
502
/* normal benchmark period */
503
/*-------------------------*/
504
benchTimeStart = userGetTime();
506
periodStop = benchTimeStart + (double)data[0].testSeconds;
507
while(userGetTime() < periodStop)
508
doOneTransaction(data, parallellism,
509
millisSendPoll, minEventSendPoll, forceSendPoll);
511
benchTimeEnd = userGetTime();
513
ndbout_c("Benchmark period done");
516
* Wait for all transactions
521
for(i = 0; i<parallellism; i++){
522
if(data[i].runState != Runnable){
528
data[0].pNDB->sendPollNdb();
532
/*------------------*/
533
/* cool down period */
534
/*------------------*/
535
periodStop = userGetTime() + (double)data[0].coolDownSeconds;
536
while(userGetTime() < periodStop){
537
doOneTransaction(startUp, parallellism,
538
millisSendPoll, minEventSendPoll, forceSendPoll);
544
for(i = 0; i<parallellism; i++){
545
if(startUp[i].runState != Runnable){
551
startUp[0].pNDB->sendPollNdb();
556
/*---------------------------------------------------------*/
557
/* add the times for all transaction for inner loop timing */
558
/*---------------------------------------------------------*/
559
for(j = 0; j<parallellism; j++){
560
st = &data[j].generator;
562
st->outerLoopTime = benchTimeEnd - benchTimeStart;
563
st->outerTps = getTps(st->totalTransactions, st->outerLoopTime);
565
/* ndbout_c("maxsize = %d\n",maxsize); */