~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to storage/ndb/test/ndbapi/bench/ndb_async2.cpp

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   Copyright (C) 2005, 2006, 2008 MySQL AB, 2009 Sun Microsystems, Inc.
 
3
    All rights reserved. Use is subject to license terms.
 
4
 
 
5
   This program is free software; you can redistribute it and/or modify
 
6
   it under the terms of the GNU General Public License as published by
 
7
   the Free Software Foundation; version 2 of the License.
 
8
 
 
9
   This program is distributed in the hope that it will be useful,
 
10
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
   GNU General Public License for more details.
 
13
 
 
14
   You should have received a copy of the GNU General Public License
 
15
   along with this program; if not, write to the Free Software
 
16
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 
17
*/
 
18
 
 
19
//#define DEBUG_ON
 
20
 
 
21
#include <string.h>
 
22
#include "userInterface.h"
 
23
 
 
24
#include "macros.h"
 
25
#include "ndb_schema.hpp"
 
26
#include "ndb_error.hpp"
 
27
#include <NdbSleep.h>
 
28
 
 
29
#include <NdbApi.hpp>
 
30
 
 
31
void T1_Callback(int result, NdbConnection * pCon, void * threadData);
 
32
void T2_Callback(int result, NdbConnection * pCon, void * threadData);
 
33
void T3_Callback_1(int result, NdbConnection * pCon, void * threadData);
 
34
void T3_Callback_2(int result, NdbConnection * pCon, void * threadData);
 
35
void T3_Callback_3(int result, NdbConnection * pCon, void * threadData);
 
36
void T4_Callback_1(int result, NdbConnection * pCon, void * threadData);
 
37
void T4_Callback_2(int result, NdbConnection * pCon, void * threadData);
 
38
void T4_Callback_3(int result, NdbConnection * pCon, void * threadData);
 
39
void T5_Callback_1(int result, NdbConnection * pCon, void * threadData);
 
40
void T5_Callback_2(int result, NdbConnection * pCon, void * threadData);
 
41
void T5_Callback_3(int result, NdbConnection * pCon, void * threadData);
 
42
 
 
43
static int stat_async = 0;
 
44
 
 
45
/**
 
46
 * Transaction 1 - T1 
 
47
 *
 
48
 * Update location and changed by/time on a subscriber
 
49
 *
 
50
 * Input: 
 
51
 *   SubscriberNumber,
 
52
 *   Location,
 
53
 *   ChangedBy,
 
54
 *   ChangedTime
 
55
 *
 
56
 * Output:
 
57
 */
 
58
 
 
59
#define SFX_START (SUBSCRIBER_NUMBER_LENGTH - SUBSCRIBER_NUMBER_SUFFIX_LENGTH)
 
60
 
 
61
inline
 
62
NdbConnection *
 
63
startTransaction(Ndb * pNDB, ThreadData * td){
 
64
  return pNDB->startTransaction();
 
65
#ifdef OLD_CODE
 
66
  return pNDB->startTransactionDGroup (0, 
 
67
                                       &td->transactionData.number[SFX_START],
 
68
                                       1);
 
69
#endif
 
70
}
 
71
 
 
72
// NdbRecord helper macros
 
73
#define SET_MASK(mask, attrId)                         \
 
74
  mask[attrId >> 3] |= (1 << (attrId & 7))
 
75
 
 
76
void
 
77
start_T1(Ndb * pNDB, ThreadData * td, int async){
 
78
 
 
79
  DEBUG2("T1(%.*s): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
80
         td->transactionData.number); 
 
81
 
 
82
  NdbConnection * pCON = 0;
 
83
  while((pCON = startTransaction(pNDB, td)) == 0){
 
84
    CHECK_ALLOWED_ERROR("T1: startTransaction", td, pNDB->getNdbError());
 
85
    NdbSleep_MilliSleep(10);
 
86
  }
 
87
 
 
88
  const NdbOperation* op= NULL;
 
89
 
 
90
  if (td->ndbRecordSharedData)
 
91
  {
 
92
    char* rowPtr= (char*) &td->transactionData;
 
93
    const NdbRecord* record= td->ndbRecordSharedData->
 
94
      subscriberTableNdbRecord;
 
95
    Uint32 m=0;
 
96
    unsigned char* mask= (unsigned char*) &m;
 
97
 
 
98
    //SET_MASK(mask, IND_SUBSCRIBER_NUMBER);
 
99
    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
 
100
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
 
101
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
 
102
    
 
103
    op= pCON->updateTuple(record,
 
104
                          rowPtr,
 
105
                          record,
 
106
                          rowPtr,
 
107
                          mask);
 
108
  }
 
109
  else
 
110
  {
 
111
    NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
112
    op= MyOp;
 
113
    if (MyOp != NULL) {  
 
114
      MyOp->updateTuple();  
 
115
      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
116
                  td->transactionData.number);
 
117
      MyOp->setValue(IND_SUBSCRIBER_LOCATION, 
 
118
                     (char *)&td->transactionData.location);
 
119
      MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY, 
 
120
                     td->transactionData.changed_by);
 
121
      MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
122
                     td->transactionData.changed_time);
 
123
    }
 
124
  }
 
125
 
 
126
  if (op != NULL)
 
127
  {
 
128
    if (async == 1) {
 
129
      pCON->executeAsynchPrepare( Commit , T1_Callback, td);
 
130
    } else {
 
131
      int result = pCON->execute(Commit);
 
132
      T1_Callback(result, pCON, (void*)td);
 
133
      return;
 
134
    }//if
 
135
  } else {
 
136
    CHECK_NULL(NULL, "T1: getNdbOperation", td, pCON->getNdbError());
 
137
  }//if
 
138
}
 
139
 
 
140
void
 
141
T1_Callback(int result, NdbConnection * pCON, void * threadData) {
 
142
  ThreadData * td = (ThreadData *)threadData;
 
143
  
 
144
  DEBUG2("T1(%.*s): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
 
145
         td->transactionData.number); 
 
146
 
 
147
  if (result == -1) {
 
148
    CHECK_ALLOWED_ERROR("T1: Commit", td, pCON->getNdbError());
 
149
    td->pNDB->closeTransaction(pCON);
 
150
    start_T1(td->pNDB, td, stat_async);
 
151
    return;
 
152
  }//if
 
153
  td->pNDB->closeTransaction(pCON);
 
154
  complete_T1(td);
 
155
}
 
156
 
 
157
/**
 
158
 * Transaction 2 - T2
 
159
 *
 
160
 * Read from Subscriber:
 
161
 *
 
162
 * Input: 
 
163
 *   SubscriberNumber
 
164
 *
 
165
 * Output:
 
166
 *   Location
 
167
 *   Changed by
 
168
 *   Changed Timestamp
 
169
 *   Name
 
170
 */
 
171
void
 
172
start_T2(Ndb * pNDB, ThreadData * td, int async){
 
173
 
 
174
  DEBUG3("T2(%.*s, %d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
175
         td->transactionData.number, 
 
176
         td->transactionData.location);
 
177
  
 
178
  NdbConnection * pCON = 0;
 
179
  
 
180
  while((pCON = startTransaction(pNDB, td)) == 0){
 
181
    CHECK_ALLOWED_ERROR("T2-1: startTransaction", td, pNDB->getNdbError());
 
182
    NdbSleep_MilliSleep(10);
 
183
  }
 
184
 
 
185
  if (td->ndbRecordSharedData)
 
186
  {
 
187
    char* rowPtr= (char*) &td->transactionData;
 
188
    const NdbRecord* record= td->ndbRecordSharedData->
 
189
      subscriberTableNdbRecord;
 
190
    Uint32 m=0;
 
191
    unsigned char* mask= (unsigned char*) &m;
 
192
 
 
193
    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
 
194
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
 
195
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
 
196
    SET_MASK(mask, IND_SUBSCRIBER_NAME);
 
197
 
 
198
    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr, 
 
199
                                              NdbOperation::LM_Read, mask);
 
200
    CHECK_NULL((void*) MyOp, "T2: readTuple", td,
 
201
               pCON->getNdbError());
 
202
  }
 
203
  else
 
204
  {
 
205
    NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
206
    CHECK_NULL(MyOp, "T2: getNdbOperation", td,
 
207
               pCON->getNdbError());
 
208
    
 
209
    MyOp->readTuple();
 
210
    MyOp->equal(IND_SUBSCRIBER_NUMBER,
 
211
                td->transactionData.number);
 
212
    MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
213
                   (char *)&td->transactionData.location);
 
214
    MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
215
                   td->transactionData.changed_by);
 
216
    MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
217
                   td->transactionData.changed_time);
 
218
    MyOp->getValue(IND_SUBSCRIBER_NAME, 
 
219
                   td->transactionData.name);
 
220
  }
 
221
 
 
222
  if (async == 1) {
 
223
    pCON->executeAsynchPrepare( Commit , T2_Callback, td);
 
224
  } else {
 
225
    int result = pCON->execute(Commit);
 
226
    T2_Callback(result, pCON, (void*)td);
 
227
    return;
 
228
  }//if
 
229
}
 
230
 
 
231
void
 
232
T2_Callback(int result, NdbConnection * pCON, void * threadData){
 
233
  ThreadData * td = (ThreadData *)threadData;
 
234
  DEBUG3("T2(%.*s, %d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
 
235
         td->transactionData.number, 
 
236
         td->transactionData.location);
 
237
  
 
238
  if (result == -1) {
 
239
    CHECK_ALLOWED_ERROR("T2: Commit", td, pCON->getNdbError());
 
240
    td->pNDB->closeTransaction(pCON);
 
241
    start_T2(td->pNDB, td, stat_async);
 
242
    return;
 
243
  }//if
 
244
 
 
245
  td->pNDB->closeTransaction(pCON);
 
246
  complete_T2(td);
 
247
}
 
248
 
 
249
/**
 
250
 * Transaction 3 - T3
 
251
 *
 
252
 * Read session details
 
253
 *
 
254
 * Input:
 
255
 *   SubscriberNumber
 
256
 *   ServerId
 
257
 *   ServerBit
 
258
 *
 
259
 * Output:
 
260
 *   BranchExecuted
 
261
 *   SessionDetails
 
262
 *   ChangedBy
 
263
 *   ChangedTime
 
264
 *   Location
 
265
 */
 
266
void
 
267
start_T3(Ndb * pNDB, ThreadData * td, int async){
 
268
 
 
269
  DEBUG3("T3(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
270
         td->transactionData.number, 
 
271
         td->transactionData.server_id);
 
272
  
 
273
  NdbConnection * pCON = 0;
 
274
 
 
275
  while((pCON = startTransaction(pNDB, td)) == 0){
 
276
    CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
 
277
    NdbSleep_MilliSleep(10);
 
278
  }
 
279
 
 
280
  const NdbOperation* op;
 
281
 
 
282
  if (td->ndbRecordSharedData)
 
283
  {
 
284
    char* rowPtr= (char*) &td->transactionData;
 
285
    const NdbRecord* record= td->ndbRecordSharedData->
 
286
      subscriberTableNdbRecord;
 
287
    Uint32 m=0;
 
288
    unsigned char* mask= (unsigned char*) &m;
 
289
 
 
290
    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
 
291
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
 
292
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
 
293
    SET_MASK(mask, IND_SUBSCRIBER_GROUP);
 
294
    SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
 
295
 
 
296
    op= pCON->readTuple(record, rowPtr, record, rowPtr,
 
297
                        NdbOperation::LM_Read, mask);
 
298
  }
 
299
  else
 
300
  {
 
301
    NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
302
    op= MyOp;
 
303
    CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
 
304
               pCON->getNdbError());
 
305
    
 
306
    MyOp->readTuple();
 
307
    MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
308
                td->transactionData.number);
 
309
    MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
310
                   (char *)&td->transactionData.location);
 
311
    MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
312
                   td->transactionData.changed_by);
 
313
    MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
314
                   td->transactionData.changed_time);
 
315
    MyOp->getValue(IND_SUBSCRIBER_GROUP, 
 
316
                   (char *)&td->transactionData.group_id);
 
317
    MyOp->getValue(IND_SUBSCRIBER_SESSIONS, 
 
318
                   (char *)&td->transactionData.sessions);
 
319
  }
 
320
 
 
321
  stat_async = async;
 
322
  if (async == 1) {
 
323
    pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
 
324
  } else {
 
325
    int result = pCON->execute( NoCommit );
 
326
    T3_Callback_1(result, pCON, (void*)td);
 
327
    return;
 
328
  }//if
 
329
}
 
330
 
 
331
void
 
332
T3_Callback_1(int result, NdbConnection * pCON, void * threadData){
 
333
  ThreadData * td = (ThreadData *)threadData;
 
334
  DEBUG3("T3(%.*s, %.2d): - Callback 1", SUBSCRIBER_NUMBER_LENGTH, 
 
335
         td->transactionData.number, 
 
336
         td->transactionData.server_id);
 
337
 
 
338
  if (result == -1) {
 
339
    CHECK_ALLOWED_ERROR("T3-1: execute", td, pCON->getNdbError());
 
340
    td->pNDB->closeTransaction(pCON);
 
341
    start_T3(td->pNDB, td, stat_async);
 
342
    return;
 
343
  }//if
 
344
 
 
345
  const NdbOperation* op= NULL;
 
346
 
 
347
  if (td->ndbRecordSharedData)
 
348
  {
 
349
    char* rowPtr= (char*) &td->transactionData;
 
350
    const NdbRecord* record= td->ndbRecordSharedData->
 
351
      groupTableAllowReadNdbRecord;
 
352
    Uint32 m=0;
 
353
    unsigned char* mask= (unsigned char*) &m;
 
354
    
 
355
    SET_MASK(mask, IND_GROUP_ALLOW_READ);
 
356
 
 
357
    op= pCON->readTuple(record, rowPtr, record, rowPtr,
 
358
                        NdbOperation::LM_Read, mask);
 
359
  }
 
360
  else
 
361
  {
 
362
    NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
 
363
    op= MyOp;
 
364
    CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
 
365
               pCON->getNdbError());
 
366
    
 
367
    MyOp->readTuple();
 
368
    MyOp->equal(IND_GROUP_ID,
 
369
                (char*)&td->transactionData.group_id);
 
370
    MyOp->getValue(IND_GROUP_ALLOW_READ, 
 
371
                   (char *)&td->transactionData.permission);
 
372
  }
 
373
 
 
374
  if (stat_async == 1) {
 
375
    pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
 
376
  } else {
 
377
    int result = pCON->execute( NoCommit );
 
378
    T3_Callback_2(result, pCON, (void*)td);
 
379
    return;
 
380
  }//if
 
381
}
 
382
 
 
383
void
 
384
T3_Callback_2(int result, NdbConnection * pCON, void * threadData){
 
385
  ThreadData * td = (ThreadData *)threadData;
 
386
  
 
387
  if (result == -1) {
 
388
    CHECK_ALLOWED_ERROR("T3-2: execute", td, pCON->getNdbError());
 
389
    td->pNDB->closeTransaction(pCON);
 
390
    start_T3(td->pNDB, td, stat_async);
 
391
    return;
 
392
  }//if
 
393
  
 
394
  Uint32 permission = td->transactionData.permission;
 
395
  Uint32 sessions   = td->transactionData.sessions;
 
396
  Uint32 server_bit = td->transactionData.server_bit;
 
397
 
 
398
  if(((permission & server_bit) == server_bit) &&
 
399
     ((sessions   & server_bit) == server_bit)){
 
400
    
 
401
    memcpy(td->transactionData.suffix,
 
402
           &td->transactionData.number[SFX_START],
 
403
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
 
404
    DEBUG5("T3(%.*s, %.2d): - Callback 2 - reading(%.*s)", 
 
405
           SUBSCRIBER_NUMBER_LENGTH, 
 
406
           td->transactionData.number, 
 
407
           td->transactionData.server_id,
 
408
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 
409
           td->transactionData.suffix);
 
410
    
 
411
    /* Operations 3 + 4 */
 
412
    if (td->ndbRecordSharedData)
 
413
    {
 
414
      /* Op 3 */
 
415
      char* rowPtr= (char*) &td->transactionData;
 
416
      const NdbRecord* record= td->ndbRecordSharedData->
 
417
        sessionTableNdbRecord;
 
418
      Uint32 m=0;
 
419
      unsigned char* mask= (unsigned char*) &m;
 
420
 
 
421
      SET_MASK(mask, IND_SESSION_DATA);
 
422
 
 
423
      const NdbOperation* MyOp = pCON->readTuple(record, rowPtr, record, rowPtr,
 
424
                                                 NdbOperation::LM_SimpleRead,
 
425
                                                 mask);
 
426
      CHECK_NULL((void*) MyOp, "T3-3: readTuple", td,
 
427
                 pCON->getNdbError());
 
428
 
 
429
      /* Op 4 */
 
430
      record= td->ndbRecordSharedData->
 
431
        serverTableNdbRecord;
 
432
      m= 0;
 
433
 
 
434
      /* Attach interpreted program */
 
435
      NdbOperation::OperationOptions opts;
 
436
      opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
 
437
      opts.interpretedCode= td->ndbRecordSharedData->incrServerReadsProg;
 
438
 
 
439
      MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
 
440
                              &opts,
 
441
                              sizeof(opts));
 
442
      CHECK_NULL((void*) MyOp, "T3-3: updateTuple", td,
 
443
                 pCON->getNdbError());
 
444
    }
 
445
    else
 
446
    {
 
447
      NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
 
448
      CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
 
449
                 pCON->getNdbError());
 
450
      
 
451
      MyOp->simpleRead();
 
452
      MyOp->equal(IND_SESSION_SUBSCRIBER,
 
453
                  (char*)td->transactionData.number);
 
454
      MyOp->equal(IND_SESSION_SERVER,
 
455
                  (char*)&td->transactionData.server_id);
 
456
      MyOp->getValue(IND_SESSION_DATA, 
 
457
                     (char *)td->transactionData.session_details);
 
458
      
 
459
      MyOp = pCON->getNdbOperation(SERVER_TABLE);
 
460
      CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
 
461
                 pCON->getNdbError());
 
462
      
 
463
      MyOp->interpretedUpdateTuple();
 
464
      MyOp->equal(IND_SERVER_ID,
 
465
                  (char*)&td->transactionData.server_id);
 
466
      MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
 
467
                  (char*)td->transactionData.suffix);
 
468
      MyOp->incValue(IND_SERVER_READS, (uint32)1);
 
469
    }
 
470
 
 
471
    td->transactionData.branchExecuted = 1;
 
472
  } else {
 
473
    DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
 
474
           SUBSCRIBER_NUMBER_LENGTH, 
 
475
           td->transactionData.number, 
 
476
           td->transactionData.server_id);
 
477
    td->transactionData.branchExecuted = 0;
 
478
  }
 
479
  if (stat_async == 1) {
 
480
    pCON->executeAsynchPrepare( Commit , T3_Callback_3, td);
 
481
  } else {
 
482
    int result = pCON->execute( Commit );
 
483
    T3_Callback_3(result, pCON, (void*)td);
 
484
    return;
 
485
  }//if
 
486
}
 
487
 
 
488
void
 
489
T3_Callback_3(int result, NdbConnection * pCON, void * threadData){
 
490
  ThreadData * td = (ThreadData *)threadData;  
 
491
  DEBUG3("T3(%.*s, %.2d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
 
492
         td->transactionData.number, 
 
493
         td->transactionData.server_id);
 
494
  
 
495
  if (result == -1) {
 
496
    CHECK_ALLOWED_ERROR("T3-3: Commit", td, pCON->getNdbError());
 
497
    td->pNDB->closeTransaction(pCON);
 
498
    start_T3(td->pNDB, td, stat_async);
 
499
    return;
 
500
  }//if
 
501
 
 
502
  td->pNDB->closeTransaction(pCON);
 
503
  complete_T3(td);
 
504
}
 
505
 
 
506
/**
 
507
 * Transaction 4 - T4
 
508
 * 
 
509
 * Create session
 
510
 *
 
511
 * Input:
 
512
 *   SubscriberNumber
 
513
 *   ServerId
 
514
 *   ServerBit
 
515
 *   SessionDetails,
 
516
 *   DoRollback
 
517
 * Output:
 
518
 *   ChangedBy
 
519
 *   ChangedTime
 
520
 *   Location
 
521
 *   BranchExecuted
 
522
 */
 
523
void
 
524
start_T4(Ndb * pNDB, ThreadData * td, int async){
 
525
 
 
526
  DEBUG3("T4(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
527
         td->transactionData.number, 
 
528
         td->transactionData.server_id);
 
529
  
 
530
  NdbConnection * pCON = 0;
 
531
  while((pCON = startTransaction(pNDB, td)) == 0){
 
532
    CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
 
533
    NdbSleep_MilliSleep(10);
 
534
  }
 
535
 
 
536
  if (td->ndbRecordSharedData)
 
537
  {
 
538
    char* rowPtr= (char*) &td->transactionData;
 
539
    const NdbRecord* record= td->ndbRecordSharedData->
 
540
      subscriberTableNdbRecord;
 
541
    Uint32 m=0;
 
542
    unsigned char* mask= (unsigned char*) &m;
 
543
 
 
544
    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
 
545
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
 
546
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
 
547
    SET_MASK(mask, IND_SUBSCRIBER_GROUP);
 
548
    SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
 
549
    
 
550
    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
 
551
                                              NdbOperation::LM_Read,
 
552
                                              mask);
 
553
    CHECK_NULL((void*)MyOp, "T4-1: readTuple", td,
 
554
               pCON->getNdbError());
 
555
 
 
556
    m= 0;
 
557
 
 
558
    /* Create program to add something to the subscriber 
 
559
     * sessions column 
 
560
     */
 
561
    Uint32 codeBuf[20];
 
562
 
 
563
    for (Uint32 p=0; p<20; p++)
 
564
      codeBuf[p]= 0;
 
565
 
 
566
    NdbInterpretedCode program(pNDB->getDictionary()->
 
567
                               getTable(SUBSCRIBER_TABLE),
 
568
                               codeBuf,
 
569
                               20);
 
570
 
 
571
    if (program.add_val(IND_SUBSCRIBER_SESSIONS, 
 
572
                        (uint32)td->transactionData.server_bit) || 
 
573
        program.interpret_exit_ok() ||
 
574
        program.finalise())
 
575
    {
 
576
      CHECK_NULL(NULL , "T4-1: Program create failed", td,
 
577
                 program.getNdbError());
 
578
    }
 
579
 
 
580
    NdbOperation::OperationOptions opts;
 
581
    opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
 
582
    opts.interpretedCode= &program;
 
583
 
 
584
    MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
 
585
                            mask,
 
586
                            &opts,
 
587
                            sizeof(opts));
 
588
    CHECK_NULL((void*)MyOp, "T4-1: updateTuple", td,
 
589
               pCON->getNdbError());
 
590
 
 
591
  }
 
592
  else
 
593
  {
 
594
    /* Use old Api */
 
595
    if (td->useCombinedUpdate)
 
596
    {
 
597
      NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
598
      CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
 
599
                 pCON->getNdbError());
 
600
      
 
601
      MyOp->interpretedUpdateTuple();
 
602
      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
603
                  td->transactionData.number);
 
604
      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
605
                     (char *)&td->transactionData.location);
 
606
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
607
                     td->transactionData.changed_by);
 
608
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
609
                     td->transactionData.changed_time);
 
610
      MyOp->getValue(IND_SUBSCRIBER_GROUP,
 
611
                     (char *)&td->transactionData.group_id);
 
612
      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
 
613
                     (char *)&td->transactionData.sessions); 
 
614
      MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
 
615
                     (uint32)td->transactionData.server_bit);
 
616
    }
 
617
    else
 
618
    {
 
619
      /* Separate read op + update op 
 
620
       * Relies on relative ordering of operation execution on a single
 
621
       * row
 
622
       */
 
623
      NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
624
      CHECK_NULL(MyOp, "T4-1: getNdbOperation (read)", td,
 
625
                 pCON->getNdbError());
 
626
      MyOp->readTuple();
 
627
      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
628
                  td->transactionData.number);
 
629
      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
630
                     (char *)&td->transactionData.location);
 
631
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
632
                     td->transactionData.changed_by);
 
633
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
634
                     td->transactionData.changed_time);
 
635
      MyOp->getValue(IND_SUBSCRIBER_GROUP,
 
636
                     (char *)&td->transactionData.group_id);
 
637
      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
 
638
                     (char *)&td->transactionData.sessions);
 
639
      MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
640
      CHECK_NULL(MyOp, "T4-1: getNdbOperation (update)", td,
 
641
                 pCON->getNdbError());
 
642
      MyOp->interpretedUpdateTuple();
 
643
      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
644
                  td->transactionData.number);
 
645
      MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
 
646
                     (uint32)td->transactionData.server_bit);
 
647
    }
 
648
  }
 
649
  stat_async = async;
 
650
  if (async == 1) {
 
651
    pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
 
652
  } else {
 
653
    int result = pCON->execute( NoCommit );
 
654
    T4_Callback_1(result, pCON, (void*)td);
 
655
    return;
 
656
  }//if
 
657
}
 
658
 
 
659
void
 
660
T4_Callback_1(int result, NdbConnection * pCON, void * threadData){
 
661
  ThreadData * td = (ThreadData *)threadData;  
 
662
  if (result == -1) {
 
663
    CHECK_ALLOWED_ERROR("T4-1: execute", td, pCON->getNdbError());
 
664
    td->pNDB->closeTransaction(pCON);
 
665
    start_T4(td->pNDB, td, stat_async);
 
666
    return;
 
667
  }//if
 
668
  
 
669
  DEBUG3("T4(%.*s, %.2d): - Callback 1", 
 
670
         SUBSCRIBER_NUMBER_LENGTH, 
 
671
         td->transactionData.number, 
 
672
         td->transactionData.server_id);
 
673
 
 
674
 
 
675
  if (td->ndbRecordSharedData)
 
676
  {
 
677
    char* rowPtr= (char*) &td->transactionData;
 
678
    const NdbRecord* record= td->ndbRecordSharedData->
 
679
      groupTableAllowInsertNdbRecord;
 
680
    Uint32 m=0;
 
681
    unsigned char* mask= (unsigned char*) &m;
 
682
 
 
683
    SET_MASK(mask, IND_GROUP_ALLOW_INSERT);
 
684
 
 
685
    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
 
686
                                              NdbOperation::LM_Read,
 
687
                                              mask);
 
688
    
 
689
    CHECK_NULL((void*)MyOp, "T4-2: readTuple", td,
 
690
               pCON->getNdbError());
 
691
  }
 
692
  else
 
693
  {
 
694
    NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
 
695
    CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
 
696
               pCON->getNdbError());
 
697
    
 
698
    MyOp->readTuple();
 
699
    MyOp->equal(IND_GROUP_ID,
 
700
                (char*)&td->transactionData.group_id);
 
701
    MyOp->getValue(IND_GROUP_ALLOW_INSERT, 
 
702
                   (char *)&td->transactionData.permission);
 
703
  }
 
704
  if (stat_async == 1) {
 
705
    pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
 
706
  } else {
 
707
    int result = pCON->execute( NoCommit );
 
708
    T4_Callback_2(result, pCON, (void*)td);
 
709
    return;
 
710
  }//if
 
711
}
 
712
 
 
713
void
 
714
T4_Callback_2(int result, NdbConnection * pCON, void * threadData){
 
715
  ThreadData * td = (ThreadData *)threadData;  
 
716
  if (result == -1) {
 
717
    CHECK_ALLOWED_ERROR("T4-2: execute", td, pCON->getNdbError());
 
718
    td->pNDB->closeTransaction(pCON);
 
719
    start_T4(td->pNDB, td, stat_async);
 
720
    return;
 
721
  }//if
 
722
 
 
723
  Uint32 permission = td->transactionData.permission;
 
724
  Uint32 sessions   = td->transactionData.sessions;
 
725
  Uint32 server_bit = td->transactionData.server_bit;
 
726
  
 
727
  if(((permission & server_bit) == server_bit) &&
 
728
     ((sessions   & server_bit) == 0)){
 
729
    
 
730
    memcpy(td->transactionData.suffix,
 
731
           &td->transactionData.number[SFX_START],
 
732
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
 
733
    
 
734
    DEBUG5("T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)", 
 
735
           SUBSCRIBER_NUMBER_LENGTH, 
 
736
           td->transactionData.number, 
 
737
           td->transactionData.server_id,
 
738
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 
739
           td->transactionData.suffix);
 
740
    
 
741
    /* Operations 3 + 4 */
 
742
   
 
743
    if (td->ndbRecordSharedData)
 
744
    {
 
745
      char* rowPtr= (char*) &td->transactionData;
 
746
      const NdbRecord* record= td->ndbRecordSharedData->
 
747
        sessionTableNdbRecord;
 
748
      Uint32 m=0;
 
749
      unsigned char* mask= (unsigned char*) &m;
 
750
 
 
751
      SET_MASK(mask, IND_SESSION_SUBSCRIBER);
 
752
      SET_MASK(mask, IND_SESSION_SERVER);
 
753
      SET_MASK(mask, IND_SESSION_DATA);
 
754
 
 
755
      const NdbOperation* MyOp= pCON->insertTuple(record, rowPtr, mask);
 
756
 
 
757
      CHECK_NULL((void*)MyOp, "T4-3: insertTuple", td,
 
758
                 pCON->getNdbError());
 
759
 
 
760
      record= td->ndbRecordSharedData->
 
761
        serverTableNdbRecord;
 
762
      m= 0;
 
763
      
 
764
      NdbOperation::OperationOptions opts;
 
765
      opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
 
766
      opts.interpretedCode= td->ndbRecordSharedData->incrServerInsertsProg;
 
767
      
 
768
      MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
 
769
                              &opts, sizeof(opts));
 
770
 
 
771
      CHECK_NULL((void*)MyOp, "T4-3: updateTuple", td,
 
772
                 pCON->getNdbError());
 
773
    }
 
774
    else
 
775
    {
 
776
      NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
 
777
      CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
 
778
                 pCON->getNdbError());
 
779
      
 
780
      MyOp->insertTuple();
 
781
      MyOp->equal(IND_SESSION_SUBSCRIBER,
 
782
                  (char*)td->transactionData.number);
 
783
      MyOp->equal(IND_SESSION_SERVER,
 
784
                  (char*)&td->transactionData.server_id);
 
785
      MyOp->setValue(IND_SESSION_DATA, 
 
786
                     (char *)td->transactionData.session_details);
 
787
      /* Operation 4 */
 
788
      
 
789
      /* Operation 5 */
 
790
      MyOp = pCON->getNdbOperation(SERVER_TABLE);
 
791
      CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
 
792
                 pCON->getNdbError());
 
793
      
 
794
      MyOp->interpretedUpdateTuple();
 
795
      MyOp->equal(IND_SERVER_ID,
 
796
                  (char*)&td->transactionData.server_id);
 
797
      MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
 
798
                  (char*)td->transactionData.suffix);
 
799
      MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
 
800
    }
 
801
    td->transactionData.branchExecuted = 1;
 
802
  } else {
 
803
    td->transactionData.branchExecuted = 0;
 
804
    DEBUG5("T4(%.*s, %.2d): - Callback 2 - %s %s",
 
805
           SUBSCRIBER_NUMBER_LENGTH, 
 
806
           td->transactionData.number, 
 
807
           td->transactionData.server_id,
 
808
           ((permission & server_bit) ? 
 
809
            "permission - " : "no permission - "),
 
810
           ((sessions   & server_bit) ? 
 
811
            "in session - " : "no in session - "));
 
812
  }
 
813
  
 
814
  if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
 
815
    if (stat_async == 1) {
 
816
      pCON->executeAsynchPrepare( Commit , T4_Callback_3, td);
 
817
    } else {
 
818
      int result = pCON->execute( Commit );
 
819
      T4_Callback_3(result, pCON, (void*)td);
 
820
      return;
 
821
    }//if
 
822
  } else {
 
823
    if (stat_async == 1) {
 
824
      pCON->executeAsynchPrepare( Rollback , T4_Callback_3, td);
 
825
    } else {
 
826
      int result = pCON->execute( Rollback );
 
827
      T4_Callback_3(result, pCON, (void*)td);
 
828
      return;
 
829
    }//if
 
830
  }
 
831
}
 
832
 
 
833
void
 
834
T4_Callback_3(int result, NdbConnection * pCON, void * threadData){
 
835
  ThreadData * td = (ThreadData *)threadData;  
 
836
  if (result == -1) {
 
837
    CHECK_ALLOWED_ERROR("T4-3: Commit", td, pCON->getNdbError());
 
838
    td->pNDB->closeTransaction(pCON);
 
839
    start_T4(td->pNDB, td, stat_async);
 
840
    return;
 
841
  }//if
 
842
 
 
843
  DEBUG3("T4(%.*s, %.2d): - Completing", 
 
844
         SUBSCRIBER_NUMBER_LENGTH, 
 
845
         td->transactionData.number, 
 
846
         td->transactionData.server_id);
 
847
 
 
848
  td->pNDB->closeTransaction(pCON);
 
849
  complete_T4(td);
 
850
}
 
851
 
 
852
/**
 
853
 * Transaction 5 - T5
 
854
 * 
 
855
 * Delete session
 
856
 *
 
857
 * Input:
 
858
 *   SubscriberNumber
 
859
 *   ServerId
 
860
 *   ServerBit
 
861
 *   DoRollback
 
862
 * Output:
 
863
 *   ChangedBy
 
864
 *   ChangedTime
 
865
 *   Location
 
866
 *   BranchExecuted
 
867
 */
 
868
void
 
869
start_T5(Ndb * pNDB, ThreadData * td, int async){
 
870
 
 
871
  DEBUG3("T5(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
 
872
         td->transactionData.number, 
 
873
         td->transactionData.server_id);
 
874
 
 
875
  NdbConnection * pCON = 0;
 
876
  while((pCON = startTransaction(pNDB, td)) == 0){
 
877
    CHECK_ALLOWED_ERROR("T5-1: startTransaction", td, pNDB->getNdbError());
 
878
    NdbSleep_MilliSleep(10);
 
879
  }
 
880
  
 
881
  if (td->ndbRecordSharedData)
 
882
  {    
 
883
    char* rowPtr= (char*) &td->transactionData;
 
884
    const NdbRecord* record= td->ndbRecordSharedData->
 
885
      subscriberTableNdbRecord;
 
886
    Uint32 m=0;
 
887
    unsigned char* mask= (unsigned char*) &m;
 
888
 
 
889
    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
 
890
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
 
891
    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
 
892
    SET_MASK(mask, IND_SUBSCRIBER_GROUP);
 
893
    SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
 
894
 
 
895
    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
 
896
                                              NdbOperation::LM_Read,
 
897
                                              mask);
 
898
    CHECK_NULL((void*)MyOp, "T5-1: readTuple", td,
 
899
               pCON->getNdbError());
 
900
 
 
901
    m= 0;
 
902
 
 
903
    /* Create program to subtract something from the 
 
904
     * subscriber sessions column
 
905
     */
 
906
    Uint32 codeBuf[20];
 
907
    NdbInterpretedCode program(pNDB->getDictionary()->
 
908
                               getTable(SUBSCRIBER_TABLE),
 
909
                               codeBuf,
 
910
                               20);
 
911
    if (program.sub_val(IND_SUBSCRIBER_SESSIONS, 
 
912
                        (uint32)td->transactionData.server_bit) ||
 
913
        program.interpret_exit_ok() ||
 
914
        program.finalise())
 
915
    {
 
916
      CHECK_NULL(NULL , "T5: Program create failed", td,
 
917
                 program.getNdbError());
 
918
    }
 
919
    NdbOperation::OperationOptions opts;
 
920
    opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
 
921
    opts.interpretedCode= &program;  
 
922
 
 
923
    MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
 
924
                            mask,
 
925
                            &opts,
 
926
                            sizeof(opts));
 
927
    CHECK_NULL((void*)MyOp, "T5-1: updateTuple", td,
 
928
               pCON->getNdbError());
 
929
  }
 
930
  else
 
931
  {
 
932
    /* Use old Api */
 
933
    if (td->useCombinedUpdate)
 
934
    {
 
935
      NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
936
      CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
 
937
                 pCON->getNdbError());
 
938
      
 
939
      MyOp->interpretedUpdateTuple();
 
940
      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
941
                  td->transactionData.number);
 
942
      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
943
                     (char *)&td->transactionData.location);
 
944
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
945
                     td->transactionData.changed_by);
 
946
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
947
                     td->transactionData.changed_time);
 
948
      MyOp->getValue(IND_SUBSCRIBER_GROUP,
 
949
                     (char *)&td->transactionData.group_id);
 
950
      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
 
951
                     (char *)&td->transactionData.sessions);
 
952
      MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
 
953
                     (uint32)td->transactionData.server_bit);
 
954
    }
 
955
    else
 
956
    {
 
957
      /* Use separate read and update operations
 
958
       * This relies on execution ordering between operations on
 
959
       * the same row
 
960
       */
 
961
      NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
962
      CHECK_NULL(MyOp, "T5-1: getNdbOperation (readTuple)", td,
 
963
                 pCON->getNdbError());
 
964
      MyOp->readTuple();
 
965
      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
966
                  td->transactionData.number);
 
967
      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
 
968
                     (char *)&td->transactionData.location);
 
969
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
 
970
                     td->transactionData.changed_by);
 
971
      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
 
972
                     td->transactionData.changed_time);
 
973
      MyOp->getValue(IND_SUBSCRIBER_GROUP,
 
974
                     (char *)&td->transactionData.group_id);
 
975
      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
 
976
                     (char *)&td->transactionData.sessions);
 
977
 
 
978
      MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
 
979
      CHECK_NULL(MyOp, "T5-1: getNdbOperation (updateTuple)", td,
 
980
                 pCON->getNdbError());
 
981
      MyOp->interpretedUpdateTuple();
 
982
      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
 
983
                  td->transactionData.number);
 
984
      MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
 
985
                     (uint32)td->transactionData.server_bit);
 
986
    }
 
987
  }
 
988
  stat_async = async;
 
989
  if (async == 1) {
 
990
    pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
 
991
  } else {
 
992
    int result = pCON->execute( NoCommit );
 
993
    T5_Callback_1(result, pCON, (void*)td);
 
994
    return;
 
995
  }//if
 
996
}
 
997
 
 
998
void
 
999
T5_Callback_1(int result, NdbConnection * pCON, void * threadData){
 
1000
  ThreadData * td = (ThreadData *)threadData;  
 
1001
  if (result == -1) {
 
1002
    CHECK_ALLOWED_ERROR("T5-1: execute", td, pCON->getNdbError());
 
1003
    td->pNDB->closeTransaction(pCON);
 
1004
    start_T5(td->pNDB, td, stat_async);
 
1005
    return;
 
1006
  }//if
 
1007
 
 
1008
  DEBUG3("T5(%.*s, %.2d): - Callback 1", 
 
1009
         SUBSCRIBER_NUMBER_LENGTH, 
 
1010
         td->transactionData.number, 
 
1011
         td->transactionData.server_id);
 
1012
  
 
1013
  if (td->ndbRecordSharedData)
 
1014
  {
 
1015
    char* rowPtr= (char*) &td->transactionData;
 
1016
    const NdbRecord* record= td->ndbRecordSharedData->
 
1017
      groupTableAllowDeleteNdbRecord;
 
1018
    Uint32 m=0;
 
1019
    unsigned char* mask= (unsigned char*) &m;
 
1020
 
 
1021
    SET_MASK(mask, IND_GROUP_ALLOW_DELETE);
 
1022
 
 
1023
    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
 
1024
                                              NdbOperation::LM_Read,
 
1025
                                              mask);
 
1026
 
 
1027
    CHECK_NULL((void*)MyOp, "T5-2: readTuple", td,
 
1028
               pCON->getNdbError());
 
1029
  }
 
1030
  else
 
1031
  {
 
1032
    NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
 
1033
    CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
 
1034
               pCON->getNdbError());
 
1035
    
 
1036
    MyOp->readTuple();
 
1037
    MyOp->equal(IND_GROUP_ID,
 
1038
                (char*)&td->transactionData.group_id);
 
1039
    MyOp->getValue(IND_GROUP_ALLOW_DELETE, 
 
1040
                   (char *)&td->transactionData.permission);
 
1041
  }
 
1042
 
 
1043
  if (stat_async == 1) {
 
1044
    pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
 
1045
  } else {
 
1046
    int result = pCON->execute( NoCommit );
 
1047
    T5_Callback_2(result, pCON, (void*)td);
 
1048
    return;
 
1049
  }//if
 
1050
}
 
1051
 
 
1052
void
 
1053
T5_Callback_2(int result, NdbConnection * pCON, void * threadData){
 
1054
  ThreadData * td = (ThreadData *)threadData;  
 
1055
  if (result == -1) {
 
1056
    CHECK_ALLOWED_ERROR("T5-2: execute", td, pCON->getNdbError());
 
1057
    td->pNDB->closeTransaction(pCON);
 
1058
    start_T5(td->pNDB, td, stat_async);
 
1059
    return;
 
1060
  }//if
 
1061
 
 
1062
  Uint32 permission = td->transactionData.permission;
 
1063
  Uint32 sessions   = td->transactionData.sessions;
 
1064
  Uint32 server_bit = td->transactionData.server_bit;
 
1065
 
 
1066
  if(((permission & server_bit) == server_bit) &&
 
1067
     ((sessions   & server_bit) == server_bit)){
 
1068
    
 
1069
    memcpy(td->transactionData.suffix,
 
1070
           &td->transactionData.number[SFX_START],
 
1071
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
 
1072
    
 
1073
    DEBUG5("T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)", 
 
1074
           SUBSCRIBER_NUMBER_LENGTH, 
 
1075
           td->transactionData.number, 
 
1076
           td->transactionData.server_id,
 
1077
           SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 
1078
           td->transactionData.suffix);
 
1079
    
 
1080
    if (td->ndbRecordSharedData)
 
1081
    {
 
1082
      char* rowPtr= (char*) &td->transactionData;
 
1083
      const NdbRecord* record= td->ndbRecordSharedData->
 
1084
        sessionTableNdbRecord;
 
1085
      Uint32 m=0;
 
1086
      unsigned char* mask= (unsigned char*) &m;
 
1087
      
 
1088
      const NdbOperation* MyOp= pCON->deleteTuple(record, rowPtr, record);
 
1089
      CHECK_NULL((void*) MyOp, "T5-3: deleteTuple", td,
 
1090
                 pCON->getNdbError());
 
1091
 
 
1092
      record= td->ndbRecordSharedData->
 
1093
        serverTableNdbRecord;
 
1094
      m= 0;
 
1095
 
 
1096
      NdbOperation::OperationOptions opts;
 
1097
      opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
 
1098
      opts.interpretedCode= td->ndbRecordSharedData->incrServerDeletesProg;
 
1099
      
 
1100
      MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
 
1101
                              &opts, sizeof(opts));
 
1102
 
 
1103
      CHECK_NULL((void*)MyOp, "T5-2: updateTuple", td,
 
1104
                 pCON->getNdbError());
 
1105
    }
 
1106
    else
 
1107
    {
 
1108
      /* Operation 3 */
 
1109
      NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
 
1110
      CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
 
1111
                 pCON->getNdbError());
 
1112
      
 
1113
      MyOp->deleteTuple();
 
1114
      MyOp->equal(IND_SESSION_SUBSCRIBER,
 
1115
                  (char*)td->transactionData.number);
 
1116
      MyOp->equal(IND_SESSION_SERVER,
 
1117
                  (char*)&td->transactionData.server_id);
 
1118
      /* Operation 4 */
 
1119
      
 
1120
      /* Operation 5 */
 
1121
      MyOp = pCON->getNdbOperation(SERVER_TABLE);
 
1122
      CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
 
1123
                 pCON->getNdbError());
 
1124
      
 
1125
      MyOp->interpretedUpdateTuple();
 
1126
      MyOp->equal(IND_SERVER_ID,
 
1127
                  (char*)&td->transactionData.server_id);
 
1128
      MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
 
1129
                  (char*)td->transactionData.suffix);
 
1130
      MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
 
1131
    }
 
1132
    td->transactionData.branchExecuted = 1;
 
1133
  } else {
 
1134
    td->transactionData.branchExecuted = 0;
 
1135
 
 
1136
    DEBUG5("T5(%.*s, %.2d): - Callback 2 - no delete - %s %s", 
 
1137
           SUBSCRIBER_NUMBER_LENGTH, 
 
1138
           td->transactionData.number, 
 
1139
           td->transactionData.server_id,
 
1140
           ((permission & server_bit) ? 
 
1141
            "permission - " : "no permission - "),
 
1142
           ((sessions   & server_bit) ? 
 
1143
            "in session - " : "no in session - "));
 
1144
  }
 
1145
  
 
1146
  if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
 
1147
    if (stat_async == 1) {
 
1148
      pCON->executeAsynchPrepare( Commit , T5_Callback_3, td);
 
1149
    } else {
 
1150
      int result = pCON->execute( Commit );
 
1151
      T5_Callback_3(result, pCON, (void*)td);
 
1152
      return;
 
1153
    }//if
 
1154
  } else {
 
1155
    if (stat_async == 1) {
 
1156
      pCON->executeAsynchPrepare( Rollback , T5_Callback_3, td);
 
1157
    } else {
 
1158
      int result = pCON->execute( Rollback );
 
1159
      T5_Callback_3(result, pCON, (void*)td);
 
1160
      return;
 
1161
    }//if
 
1162
  }
 
1163
}
 
1164
 
 
1165
void
 
1166
T5_Callback_3(int result, NdbConnection * pCON, void * threadData){
 
1167
  ThreadData * td = (ThreadData *)threadData;  
 
1168
  if (result == -1) {
 
1169
    CHECK_ALLOWED_ERROR("T5-3: Commit", td, pCON->getNdbError());
 
1170
    td->pNDB->closeTransaction(pCON);
 
1171
    start_T5(td->pNDB, td, stat_async);
 
1172
    return;
 
1173
  }//if
 
1174
  
 
1175
  DEBUG3("T5(%.*s, %.2d): - Completing", 
 
1176
         SUBSCRIBER_NUMBER_LENGTH, 
 
1177
         td->transactionData.number, 
 
1178
         td->transactionData.server_id);
 
1179
  
 
1180
  td->pNDB->closeTransaction(pCON);
 
1181
  complete_T5(td);
 
1182
}