~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/test/ndbapi/bench/ndb_async2.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
//#define DEBUG_ON
 
17
 
 
18
#include <string.h>
 
19
#include "userInterface.h"
 
20
 
 
21
#include "macros.h"
 
22
#include "ndb_schema.hpp"
 
23
#include "ndb_error.hpp"
 
24
#include <NdbSleep.h>
 
25
 
 
26
#include <NdbApi.hpp>
 
27
 
 
28
void T1_Callback(int result, NdbConnection * pCon, void * threadData);
 
29
void T2_Callback(int result, NdbConnection * pCon, void * threadData);
 
30
void T3_Callback_1(int result, NdbConnection * pCon, void * threadData);
 
31
void T3_Callback_2(int result, NdbConnection * pCon, void * threadData);
 
32
void T3_Callback_3(int result, NdbConnection * pCon, void * threadData);
 
33
void T4_Callback_1(int result, NdbConnection * pCon, void * threadData);
 
34
void T4_Callback_2(int result, NdbConnection * pCon, void * threadData);
 
35
void T4_Callback_3(int result, NdbConnection * pCon, void * threadData);
 
36
void T5_Callback_1(int result, NdbConnection * pCon, void * threadData);
 
37
void T5_Callback_2(int result, NdbConnection * pCon, void * threadData);
 
38
void T5_Callback_3(int result, NdbConnection * pCon, void * threadData);
 
39
 
 
40
static int stat_async = 0;
 
41
 
 
42
/**
 
43
 * Transaction 1 - T1 
 
44
 *
 
45
 * Update location and changed by/time on a subscriber
 
46
 *
 
47
 * Input: 
 
48
 *   SubscriberNumber,
 
49
 *   Location,
 
50
 *   ChangedBy,
 
51
 *   ChangedTime
 
52
 *
 
53
 * Output:
 
54
 */
 
55
 
 
56
#define SFX_START (SUBSCRIBER_NUMBER_LENGTH - SUBSCRIBER_NUMBER_SUFFIX_LENGTH)
 
57
 
 
58
inline
 
59
NdbConnection *
 
60
startTransaction(Ndb * pNDB, ThreadData * td){
 
61
  return pNDB->startTransaction();
 
62
#ifdef OLD_CODE
 
63
  return pNDB->startTransactionDGroup (0, 
 
64
                                       &td->transactionData.number[SFX_START],
 
65
                                       1);
 
66
#endif
 
67
}
 
68
 
 
69
void
 
70
start_T1(Ndb * pNDB, ThreadData * td, int async){
 
71
 
 
72
  DEBUG2("T1(%.*s): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
73
         td->transactionData.number); 
 
74
 
 
75
  NdbConnection * pCON = 0;
 
76
  while((pCON = startTransaction(pNDB, td)) == 0){
 
77
    CHECK_ALLOWED_ERROR("T1: startTransaction", td, pNDB->getNdbError());
 
78
    NdbSleep_MilliSleep(10);
 
79
  }
 
80
 
 
81
  NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
82
  if (MyOp != NULL) {  
 
83
    MyOp->updateTuple();  
 
84
    MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
85
                td->transactionData.number);
 
86
    MyOp->setValue(IND_SUBSCRIBER_LOCATION, 
 
87
                   (char *)&td->transactionData.location);
 
88
    MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY, 
 
89
                   td->transactionData.changed_by);
 
90
    MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
91
                   td->transactionData.changed_time);
 
92
    if (async == 1) {
 
93
      pCON->executeAsynchPrepare( Commit , T1_Callback, td);
 
94
    } else {
 
95
      int result = pCON->execute(Commit);
 
96
      T1_Callback(result, pCON, (void*)td);
 
97
      return;
 
98
    }//if
 
99
  } else {
 
100
    CHECK_NULL(MyOp, "T1: getNdbOperation", td, pCON->getNdbError());
 
101
  }//if
 
102
}
 
103
 
 
104
void
 
105
T1_Callback(int result, NdbConnection * pCON, void * threadData) {
 
106
  ThreadData * td = (ThreadData *)threadData;
 
107
  
 
108
  DEBUG2("T1(%.*s): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
 
109
         td->transactionData.number); 
 
110
 
 
111
  if (result == -1) {
 
112
    CHECK_ALLOWED_ERROR("T1: Commit", td, pCON->getNdbError());
 
113
    td->pNDB->closeTransaction(pCON);
 
114
    start_T1(td->pNDB, td, stat_async);
 
115
    return;
 
116
  }//if
 
117
  td->pNDB->closeTransaction(pCON);
 
118
  complete_T1(td);
 
119
}
 
120
 
 
121
/**
 
122
 * Transaction 2 - T2
 
123
 *
 
124
 * Read from Subscriber:
 
125
 *
 
126
 * Input: 
 
127
 *   SubscriberNumber
 
128
 *
 
129
 * Output:
 
130
 *   Location
 
131
 *   Changed by
 
132
 *   Changed Timestamp
 
133
 *   Name
 
134
 */
 
135
void
 
136
start_T2(Ndb * pNDB, ThreadData * td, int async){
 
137
 
 
138
  DEBUG3("T2(%.*s, %d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
139
         td->transactionData.number, 
 
140
         td->transactionData.location);
 
141
  
 
142
  NdbConnection * pCON = 0;
 
143
  
 
144
  while((pCON = startTransaction(pNDB, td)) == 0){
 
145
    CHECK_ALLOWED_ERROR("T2-1: startTransaction", td, pNDB->getNdbError());
 
146
    NdbSleep_MilliSleep(10);
 
147
  }
 
148
 
 
149
  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
150
  CHECK_NULL(MyOp, "T2: getNdbOperation", td,
 
151
             pCON->getNdbError());
 
152
  
 
153
  MyOp->readTuple();
 
154
  MyOp->equal(IND_SUBSCRIBER_NUMBER,
 
155
              td->transactionData.number);
 
156
  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
157
                 (char *)&td->transactionData.location);
 
158
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
159
                 td->transactionData.changed_by);
 
160
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
161
                 td->transactionData.changed_time);
 
162
  MyOp->getValue(IND_SUBSCRIBER_NAME, 
 
163
                 td->transactionData.name);
 
164
  if (async == 1) {
 
165
    pCON->executeAsynchPrepare( Commit , T2_Callback, td);
 
166
  } else {
 
167
    int result = pCON->execute(Commit);
 
168
    T2_Callback(result, pCON, (void*)td);
 
169
    return;
 
170
  }//if
 
171
}
 
172
 
 
173
void
 
174
T2_Callback(int result, NdbConnection * pCON, void * threadData){
 
175
  ThreadData * td = (ThreadData *)threadData;
 
176
  DEBUG3("T2(%.*s, %d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
 
177
         td->transactionData.number, 
 
178
         td->transactionData.location);
 
179
  
 
180
  if (result == -1) {
 
181
    CHECK_ALLOWED_ERROR("T2: Commit", td, pCON->getNdbError());
 
182
    td->pNDB->closeTransaction(pCON);
 
183
    start_T2(td->pNDB, td, stat_async);
 
184
    return;
 
185
  }//if
 
186
  td->pNDB->closeTransaction(pCON);
 
187
  complete_T2(td);
 
188
}
 
189
 
 
190
/**
 
191
 * Transaction 3 - T3
 
192
 *
 
193
 * Read session details
 
194
 *
 
195
 * Input:
 
196
 *   SubscriberNumber
 
197
 *   ServerId
 
198
 *   ServerBit
 
199
 *
 
200
 * Output:
 
201
 *   BranchExecuted
 
202
 *   SessionDetails
 
203
 *   ChangedBy
 
204
 *   ChangedTime
 
205
 *   Location
 
206
 */
 
207
void
 
208
start_T3(Ndb * pNDB, ThreadData * td, int async){
 
209
 
 
210
  DEBUG3("T3(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
211
         td->transactionData.number, 
 
212
         td->transactionData.server_id);
 
213
  
 
214
  NdbConnection * pCON = 0;
 
215
 
 
216
  while((pCON = startTransaction(pNDB, td)) == 0){
 
217
    CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
 
218
    NdbSleep_MilliSleep(10);
 
219
  }
 
220
  
 
221
  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
222
  CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
 
223
             pCON->getNdbError());
 
224
  
 
225
  MyOp->readTuple();
 
226
  MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
227
              td->transactionData.number);
 
228
  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
229
                 (char *)&td->transactionData.location);
 
230
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
231
                 td->transactionData.changed_by);
 
232
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
233
                 td->transactionData.changed_time);
 
234
  MyOp->getValue(IND_SUBSCRIBER_GROUP, 
 
235
                 (char *)&td->transactionData.group_id);
 
236
  MyOp->getValue(IND_SUBSCRIBER_SESSIONS, 
 
237
                 (char *)&td->transactionData.sessions);
 
238
  stat_async = async;
 
239
  if (async == 1) {
 
240
    pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
 
241
  } else {
 
242
    int result = pCON->execute( NoCommit );
 
243
    T3_Callback_1(result, pCON, (void*)td);
 
244
    return;
 
245
  }//if
 
246
}
 
247
 
 
248
void
 
249
T3_Callback_1(int result, NdbConnection * pCON, void * threadData){
 
250
  ThreadData * td = (ThreadData *)threadData;
 
251
  DEBUG3("T3(%.*s, %.2d): - Callback 1", SUBSCRIBER_NUMBER_LENGTH, 
 
252
         td->transactionData.number, 
 
253
         td->transactionData.server_id);
 
254
 
 
255
  if (result == -1) {
 
256
    CHECK_ALLOWED_ERROR("T3-1: execute", td, pCON->getNdbError());
 
257
    td->pNDB->closeTransaction(pCON);
 
258
    start_T3(td->pNDB, td, stat_async);
 
259
    return;
 
260
  }//if
 
261
 
 
262
  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
 
263
  CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
 
264
             pCON->getNdbError());
 
265
    
 
266
  MyOp->readTuple();
 
267
  MyOp->equal(IND_GROUP_ID,
 
268
              (char*)&td->transactionData.group_id);
 
269
  MyOp->getValue(IND_GROUP_ALLOW_READ, 
 
270
                 (char *)&td->transactionData.permission);
 
271
  if (stat_async == 1) {
 
272
    pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
 
273
  } else {
 
274
    int result = pCON->execute( NoCommit );
 
275
    T3_Callback_2(result, pCON, (void*)td);
 
276
    return;
 
277
  }//if
 
278
}
 
279
 
 
280
void
 
281
T3_Callback_2(int result, NdbConnection * pCON, void * threadData){
 
282
  ThreadData * td = (ThreadData *)threadData;
 
283
  
 
284
  if (result == -1) {
 
285
    CHECK_ALLOWED_ERROR("T3-2: execute", td, pCON->getNdbError());
 
286
    td->pNDB->closeTransaction(pCON);
 
287
    start_T3(td->pNDB, td, stat_async);
 
288
    return;
 
289
  }//if
 
290
  
 
291
  Uint32 permission = td->transactionData.permission;
 
292
  Uint32 sessions   = td->transactionData.sessions;
 
293
  Uint32 server_bit = td->transactionData.server_bit;
 
294
 
 
295
  if(((permission & server_bit) == server_bit) &&
 
296
     ((sessions   & server_bit) == server_bit)){
 
297
    
 
298
    memcpy(td->transactionData.suffix,
 
299
           &td->transactionData.number[SFX_START],
 
300
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
 
301
    DEBUG5("T3(%.*s, %.2d): - Callback 2 - reading(%.*s)", 
 
302
           SUBSCRIBER_NUMBER_LENGTH, 
 
303
           td->transactionData.number, 
 
304
           td->transactionData.server_id,
 
305
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 
306
           td->transactionData.suffix);
 
307
    
 
308
    /* Operation 3 */
 
309
    NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
 
310
    CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
 
311
               pCON->getNdbError());
 
312
    
 
313
    MyOp->simpleRead();
 
314
    MyOp->equal(IND_SESSION_SUBSCRIBER,
 
315
                (char*)td->transactionData.number);
 
316
    MyOp->equal(IND_SESSION_SERVER,
 
317
                (char*)&td->transactionData.server_id);
 
318
    MyOp->getValue(IND_SESSION_DATA, 
 
319
                   (char *)td->transactionData.session_details);
 
320
    
 
321
    /* Operation 4 */
 
322
    MyOp = pCON->getNdbOperation(SERVER_TABLE);
 
323
    CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
 
324
               pCON->getNdbError());
 
325
    
 
326
    MyOp->interpretedUpdateTuple();
 
327
    MyOp->equal(IND_SERVER_ID,
 
328
                (char*)&td->transactionData.server_id);
 
329
    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
 
330
                (char*)td->transactionData.suffix);
 
331
    MyOp->incValue(IND_SERVER_READS, (uint32)1);
 
332
    td->transactionData.branchExecuted = 1;
 
333
  } else {
 
334
    DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
 
335
           SUBSCRIBER_NUMBER_LENGTH, 
 
336
           td->transactionData.number, 
 
337
           td->transactionData.server_id);
 
338
    td->transactionData.branchExecuted = 0;
 
339
  }
 
340
  if (stat_async == 1) {
 
341
    pCON->executeAsynchPrepare( Commit , T3_Callback_3, td);
 
342
  } else {
 
343
    int result = pCON->execute( Commit );
 
344
    T3_Callback_3(result, pCON, (void*)td);
 
345
    return;
 
346
  }//if
 
347
}
 
348
 
 
349
void
 
350
T3_Callback_3(int result, NdbConnection * pCON, void * threadData){
 
351
  ThreadData * td = (ThreadData *)threadData;  
 
352
  DEBUG3("T3(%.*s, %.2d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
 
353
         td->transactionData.number, 
 
354
         td->transactionData.server_id);
 
355
  
 
356
  if (result == -1) {
 
357
    CHECK_ALLOWED_ERROR("T3-3: Commit", td, pCON->getNdbError());
 
358
    td->pNDB->closeTransaction(pCON);
 
359
    start_T3(td->pNDB, td, stat_async);
 
360
    return;
 
361
  }//if
 
362
  td->pNDB->closeTransaction(pCON);
 
363
  complete_T3(td);
 
364
}
 
365
 
 
366
/**
 
367
 * Transaction 4 - T4
 
368
 * 
 
369
 * Create session
 
370
 *
 
371
 * Input:
 
372
 *   SubscriberNumber
 
373
 *   ServerId
 
374
 *   ServerBit
 
375
 *   SessionDetails,
 
376
 *   DoRollback
 
377
 * Output:
 
378
 *   ChangedBy
 
379
 *   ChangedTime
 
380
 *   Location
 
381
 *   BranchExecuted
 
382
 */
 
383
void
 
384
start_T4(Ndb * pNDB, ThreadData * td, int async){
 
385
 
 
386
  DEBUG3("T4(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
387
         td->transactionData.number, 
 
388
         td->transactionData.server_id);
 
389
  
 
390
  NdbConnection * pCON = 0;
 
391
  while((pCON = startTransaction(pNDB, td)) == 0){
 
392
    CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
 
393
    NdbSleep_MilliSleep(10);
 
394
  }
 
395
  
 
396
  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
397
  CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
 
398
             pCON->getNdbError());
 
399
  
 
400
  MyOp->interpretedUpdateTuple();
 
401
  MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
402
              td->transactionData.number);
 
403
  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
404
                 (char *)&td->transactionData.location);
 
405
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
406
                 td->transactionData.changed_by);
 
407
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
408
                 td->transactionData.changed_time);
 
409
  MyOp->getValue(IND_SUBSCRIBER_GROUP,
 
410
                 (char *)&td->transactionData.group_id);
 
411
  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
 
412
                 (char *)&td->transactionData.sessions); 
 
413
  MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
 
414
                 (uint32)td->transactionData.server_bit);
 
415
  stat_async = async;
 
416
  if (async == 1) {
 
417
    pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
 
418
  } else {
 
419
    int result = pCON->execute( NoCommit );
 
420
    T4_Callback_1(result, pCON, (void*)td);
 
421
    return;
 
422
  }//if
 
423
}
 
424
 
 
425
void
 
426
T4_Callback_1(int result, NdbConnection * pCON, void * threadData){
 
427
  ThreadData * td = (ThreadData *)threadData;  
 
428
  if (result == -1) {
 
429
    CHECK_ALLOWED_ERROR("T4-1: execute", td, pCON->getNdbError());
 
430
    td->pNDB->closeTransaction(pCON);
 
431
    start_T4(td->pNDB, td, stat_async);
 
432
    return;
 
433
  }//if
 
434
  
 
435
  DEBUG3("T4(%.*s, %.2d): - Callback 1", 
 
436
         SUBSCRIBER_NUMBER_LENGTH, 
 
437
         td->transactionData.number, 
 
438
         td->transactionData.server_id);
 
439
 
 
440
 
 
441
  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
 
442
  CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
 
443
             pCON->getNdbError());
 
444
  
 
445
  MyOp->readTuple();
 
446
  MyOp->equal(IND_GROUP_ID,
 
447
              (char*)&td->transactionData.group_id);
 
448
  MyOp->getValue(IND_GROUP_ALLOW_INSERT, 
 
449
                 (char *)&td->transactionData.permission);
 
450
  if (stat_async == 1) {
 
451
    pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
 
452
  } else {
 
453
    int result = pCON->execute( NoCommit );
 
454
    T4_Callback_2(result, pCON, (void*)td);
 
455
    return;
 
456
  }//if
 
457
}
 
458
 
 
459
void
 
460
T4_Callback_2(int result, NdbConnection * pCON, void * threadData){
 
461
  ThreadData * td = (ThreadData *)threadData;  
 
462
  if (result == -1) {
 
463
    CHECK_ALLOWED_ERROR("T4-2: execute", td, pCON->getNdbError());
 
464
    td->pNDB->closeTransaction(pCON);
 
465
    start_T4(td->pNDB, td, stat_async);
 
466
    return;
 
467
  }//if
 
468
 
 
469
  Uint32 permission = td->transactionData.permission;
 
470
  Uint32 sessions   = td->transactionData.sessions;
 
471
  Uint32 server_bit = td->transactionData.server_bit;
 
472
  
 
473
  if(((permission & server_bit) == server_bit) &&
 
474
     ((sessions   & server_bit) == 0)){
 
475
    
 
476
    memcpy(td->transactionData.suffix,
 
477
           &td->transactionData.number[SFX_START],
 
478
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
 
479
    
 
480
    DEBUG5("T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)", 
 
481
           SUBSCRIBER_NUMBER_LENGTH, 
 
482
           td->transactionData.number, 
 
483
           td->transactionData.server_id,
 
484
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 
485
           td->transactionData.suffix);
 
486
    
 
487
    /* Operation 3 */
 
488
    
 
489
    NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
 
490
    CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
 
491
               pCON->getNdbError());
 
492
    
 
493
    MyOp->insertTuple();
 
494
    MyOp->equal(IND_SESSION_SUBSCRIBER,
 
495
                (char*)td->transactionData.number);
 
496
    MyOp->equal(IND_SESSION_SERVER,
 
497
                (char*)&td->transactionData.server_id);
 
498
    MyOp->setValue(SESSION_DATA, 
 
499
                   (char *)td->transactionData.session_details);
 
500
    /* Operation 4 */
 
501
    
 
502
    /* Operation 5 */
 
503
    MyOp = pCON->getNdbOperation(SERVER_TABLE);
 
504
    CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
 
505
               pCON->getNdbError());
 
506
    
 
507
    MyOp->interpretedUpdateTuple();
 
508
    MyOp->equal(IND_SERVER_ID,
 
509
                (char*)&td->transactionData.server_id);
 
510
    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
 
511
                (char*)td->transactionData.suffix);
 
512
    MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
 
513
    td->transactionData.branchExecuted = 1;
 
514
  } else {
 
515
    td->transactionData.branchExecuted = 0;
 
516
    DEBUG5("T4(%.*s, %.2d): - Callback 2 - %s %s",
 
517
           SUBSCRIBER_NUMBER_LENGTH, 
 
518
           td->transactionData.number, 
 
519
           td->transactionData.server_id,
 
520
           ((permission & server_bit) ? 
 
521
            "permission - " : "no permission - "),
 
522
           ((sessions   & server_bit) ? 
 
523
            "in session - " : "no in session - "));
 
524
  }
 
525
  
 
526
  if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
 
527
    if (stat_async == 1) {
 
528
      pCON->executeAsynchPrepare( Commit , T4_Callback_3, td);
 
529
    } else {
 
530
      int result = pCON->execute( Commit );
 
531
      T4_Callback_3(result, pCON, (void*)td);
 
532
      return;
 
533
    }//if
 
534
  } else {
 
535
    if (stat_async == 1) {
 
536
      pCON->executeAsynchPrepare( Rollback , T4_Callback_3, td);
 
537
    } else {
 
538
      int result = pCON->execute( Rollback );
 
539
      T4_Callback_3(result, pCON, (void*)td);
 
540
      return;
 
541
    }//if
 
542
  }
 
543
}
 
544
 
 
545
void
 
546
T4_Callback_3(int result, NdbConnection * pCON, void * threadData){
 
547
  ThreadData * td = (ThreadData *)threadData;  
 
548
  if (result == -1) {
 
549
    CHECK_ALLOWED_ERROR("T4-3: Commit", td, pCON->getNdbError());
 
550
    td->pNDB->closeTransaction(pCON);
 
551
    start_T4(td->pNDB, td, stat_async);
 
552
    return;
 
553
  }//if
 
554
  
 
555
  DEBUG3("T4(%.*s, %.2d): - Completing", 
 
556
         SUBSCRIBER_NUMBER_LENGTH, 
 
557
         td->transactionData.number, 
 
558
         td->transactionData.server_id);
 
559
 
 
560
  td->pNDB->closeTransaction(pCON);
 
561
  complete_T4(td);
 
562
}
 
563
 
 
564
/**
 
565
 * Transaction 5 - T5
 
566
 * 
 
567
 * Delete session
 
568
 *
 
569
 * Input:
 
570
 *   SubscriberNumber
 
571
 *   ServerId
 
572
 *   ServerBit
 
573
 *   DoRollback
 
574
 * Output:
 
575
 *   ChangedBy
 
576
 *   ChangedTime
 
577
 *   Location
 
578
 *   BranchExecuted
 
579
 */
 
580
void
 
581
start_T5(Ndb * pNDB, ThreadData * td, int async){
 
582
 
 
583
  DEBUG3("T5(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
584
         td->transactionData.number, 
 
585
         td->transactionData.server_id);
 
586
 
 
587
  NdbConnection * pCON = 0;
 
588
  while((pCON = startTransaction(pNDB, td)) == 0){
 
589
    CHECK_ALLOWED_ERROR("T5-1: startTransaction", td, pNDB->getNdbError());
 
590
    NdbSleep_MilliSleep(10);
 
591
  }
 
592
  
 
593
  NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
594
  CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
 
595
             pCON->getNdbError());
 
596
  
 
597
  MyOp->interpretedUpdateTuple();
 
598
  MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
599
              td->transactionData.number);
 
600
  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
601
                 (char *)&td->transactionData.location);
 
602
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
603
                 td->transactionData.changed_by);
 
604
  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
605
                 td->transactionData.changed_time);
 
606
  MyOp->getValue(IND_SUBSCRIBER_GROUP,
 
607
                 (char *)&td->transactionData.group_id);
 
608
  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
 
609
                 (char *)&td->transactionData.sessions);
 
610
  MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
 
611
                 (uint32)td->transactionData.server_bit);
 
612
  stat_async = async;
 
613
  if (async == 1) {
 
614
    pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
 
615
  } else {
 
616
    int result = pCON->execute( NoCommit );
 
617
    T5_Callback_1(result, pCON, (void*)td);
 
618
    return;
 
619
  }//if
 
620
}
 
621
 
 
622
void
 
623
T5_Callback_1(int result, NdbConnection * pCON, void * threadData){
 
624
  ThreadData * td = (ThreadData *)threadData;  
 
625
  if (result == -1) {
 
626
    CHECK_ALLOWED_ERROR("T5-1: execute", td, pCON->getNdbError());
 
627
    td->pNDB->closeTransaction(pCON);
 
628
    start_T5(td->pNDB, td, stat_async);
 
629
    return;
 
630
  }//if
 
631
 
 
632
  DEBUG3("T5(%.*s, %.2d): - Callback 1", 
 
633
         SUBSCRIBER_NUMBER_LENGTH, 
 
634
         td->transactionData.number, 
 
635
         td->transactionData.server_id);
 
636
  
 
637
  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
 
638
  CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
 
639
             pCON->getNdbError());
 
640
  
 
641
  MyOp->readTuple();
 
642
  MyOp->equal(IND_GROUP_ID,
 
643
              (char*)&td->transactionData.group_id);
 
644
  MyOp->getValue(IND_GROUP_ALLOW_DELETE, 
 
645
                 (char *)&td->transactionData.permission);
 
646
  if (stat_async == 1) {
 
647
    pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
 
648
  } else {
 
649
    int result = pCON->execute( NoCommit );
 
650
    T5_Callback_2(result, pCON, (void*)td);
 
651
    return;
 
652
  }//if
 
653
}
 
654
 
 
655
void
 
656
T5_Callback_2(int result, NdbConnection * pCON, void * threadData){
 
657
  ThreadData * td = (ThreadData *)threadData;  
 
658
  if (result == -1) {
 
659
    CHECK_ALLOWED_ERROR("T5-2: execute", td, pCON->getNdbError());
 
660
    td->pNDB->closeTransaction(pCON);
 
661
    start_T5(td->pNDB, td, stat_async);
 
662
    return;
 
663
  }//if
 
664
 
 
665
  Uint32 permission = td->transactionData.permission;
 
666
  Uint32 sessions   = td->transactionData.sessions;
 
667
  Uint32 server_bit = td->transactionData.server_bit;
 
668
 
 
669
  if(((permission & server_bit) == server_bit) &&
 
670
     ((sessions   & server_bit) == server_bit)){
 
671
    
 
672
    memcpy(td->transactionData.suffix,
 
673
           &td->transactionData.number[SFX_START],
 
674
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
 
675
    
 
676
    DEBUG5("T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)", 
 
677
           SUBSCRIBER_NUMBER_LENGTH, 
 
678
           td->transactionData.number, 
 
679
           td->transactionData.server_id,
 
680
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 
681
           td->transactionData.suffix);
 
682
    
 
683
    /* Operation 3 */
 
684
    NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
 
685
    CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
 
686
               pCON->getNdbError());
 
687
    
 
688
    MyOp->deleteTuple();
 
689
    MyOp->equal(IND_SESSION_SUBSCRIBER,
 
690
                (char*)td->transactionData.number);
 
691
    MyOp->equal(IND_SESSION_SERVER,
 
692
                (char*)&td->transactionData.server_id);
 
693
    /* Operation 4 */
 
694
    
 
695
    /* Operation 5 */
 
696
    MyOp = pCON->getNdbOperation(SERVER_TABLE);
 
697
    CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
 
698
               pCON->getNdbError());
 
699
    
 
700
    MyOp->interpretedUpdateTuple();
 
701
    MyOp->equal(IND_SERVER_ID,
 
702
                (char*)&td->transactionData.server_id);
 
703
    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
 
704
                (char*)td->transactionData.suffix);
 
705
    MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
 
706
    td->transactionData.branchExecuted = 1;
 
707
  } else {
 
708
    td->transactionData.branchExecuted = 0;
 
709
 
 
710
    DEBUG5("T5(%.*s, %.2d): - Callback 2 - no delete - %s %s", 
 
711
           SUBSCRIBER_NUMBER_LENGTH, 
 
712
           td->transactionData.number, 
 
713
           td->transactionData.server_id,
 
714
           ((permission & server_bit) ? 
 
715
            "permission - " : "no permission - "),
 
716
           ((sessions   & server_bit) ? 
 
717
            "in session - " : "no in session - "));
 
718
  }
 
719
  
 
720
  if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
 
721
    if (stat_async == 1) {
 
722
      pCON->executeAsynchPrepare( Commit , T5_Callback_3, td);
 
723
    } else {
 
724
      int result = pCON->execute( Commit );
 
725
      T5_Callback_3(result, pCON, (void*)td);
 
726
      return;
 
727
    }//if
 
728
  } else {
 
729
    if (stat_async == 1) {
 
730
      pCON->executeAsynchPrepare( Rollback , T5_Callback_3, td);
 
731
    } else {
 
732
      int result = pCON->execute( Rollback );
 
733
      T5_Callback_3(result, pCON, (void*)td);
 
734
      return;
 
735
    }//if
 
736
  }
 
737
}
 
738
 
 
739
void
 
740
T5_Callback_3(int result, NdbConnection * pCON, void * threadData){
 
741
  ThreadData * td = (ThreadData *)threadData;  
 
742
  if (result == -1) {
 
743
    CHECK_ALLOWED_ERROR("T5-3: Commit", td, pCON->getNdbError());
 
744
    td->pNDB->closeTransaction(pCON);
 
745
    start_T5(td->pNDB, td, stat_async);
 
746
    return;
 
747
  }//if
 
748
  
 
749
  DEBUG3("T5(%.*s, %.2d): - Completing", 
 
750
         SUBSCRIBER_NUMBER_LENGTH, 
 
751
         td->transactionData.number, 
 
752
         td->transactionData.server_id);
 
753
  
 
754
  td->pNDB->closeTransaction(pCON);
 
755
  complete_T5(td);
 
756
}