2
Copyright (C) 2005, 2006, 2008 MySQL AB, 2009 Sun Microsystems, Inc.
3
All rights reserved. Use is subject to license terms.
5
This program is free software; you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation; version 2 of the License.
9
This program is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU General Public License for more details.
14
You should have received a copy of the GNU General Public License
15
along with this program; if not, write to the Free Software
16
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
#include "userInterface.h"
25
#include "ndb_schema.hpp"
26
#include "ndb_error.hpp"
31
void T1_Callback(int result, NdbConnection * pCon, void * threadData);
32
void T2_Callback(int result, NdbConnection * pCon, void * threadData);
33
void T3_Callback_1(int result, NdbConnection * pCon, void * threadData);
34
void T3_Callback_2(int result, NdbConnection * pCon, void * threadData);
35
void T3_Callback_3(int result, NdbConnection * pCon, void * threadData);
36
void T4_Callback_1(int result, NdbConnection * pCon, void * threadData);
37
void T4_Callback_2(int result, NdbConnection * pCon, void * threadData);
38
void T4_Callback_3(int result, NdbConnection * pCon, void * threadData);
39
void T5_Callback_1(int result, NdbConnection * pCon, void * threadData);
40
void T5_Callback_2(int result, NdbConnection * pCon, void * threadData);
41
void T5_Callback_3(int result, NdbConnection * pCon, void * threadData);
43
static int stat_async = 0;
48
* Update location and changed by/time on a subscriber
59
#define SFX_START (SUBSCRIBER_NUMBER_LENGTH - SUBSCRIBER_NUMBER_SUFFIX_LENGTH)
63
startTransaction(Ndb * pNDB, ThreadData * td){
64
return pNDB->startTransaction();
66
return pNDB->startTransactionDGroup (0,
67
&td->transactionData.number[SFX_START],
72
// NdbRecord helper macros
73
#define SET_MASK(mask, attrId) \
74
mask[attrId >> 3] |= (1 << (attrId & 7))
77
start_T1(Ndb * pNDB, ThreadData * td, int async){
79
DEBUG2("T1(%.*s): - Starting", SUBSCRIBER_NUMBER_LENGTH,
80
td->transactionData.number);
82
NdbConnection * pCON = 0;
83
while((pCON = startTransaction(pNDB, td)) == 0){
84
CHECK_ALLOWED_ERROR("T1: startTransaction", td, pNDB->getNdbError());
85
NdbSleep_MilliSleep(10);
88
const NdbOperation* op= NULL;
90
if (td->ndbRecordSharedData)
92
char* rowPtr= (char*) &td->transactionData;
93
const NdbRecord* record= td->ndbRecordSharedData->
94
subscriberTableNdbRecord;
96
unsigned char* mask= (unsigned char*) &m;
98
//SET_MASK(mask, IND_SUBSCRIBER_NUMBER);
99
SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
100
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
101
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
103
op= pCON->updateTuple(record,
111
NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
115
MyOp->equal(IND_SUBSCRIBER_NUMBER,
116
td->transactionData.number);
117
MyOp->setValue(IND_SUBSCRIBER_LOCATION,
118
(char *)&td->transactionData.location);
119
MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY,
120
td->transactionData.changed_by);
121
MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME,
122
td->transactionData.changed_time);
129
pCON->executeAsynchPrepare( Commit , T1_Callback, td);
131
int result = pCON->execute(Commit);
132
T1_Callback(result, pCON, (void*)td);
136
CHECK_NULL(NULL, "T1: getNdbOperation", td, pCON->getNdbError());
141
T1_Callback(int result, NdbConnection * pCON, void * threadData) {
142
ThreadData * td = (ThreadData *)threadData;
144
DEBUG2("T1(%.*s): - Completing", SUBSCRIBER_NUMBER_LENGTH,
145
td->transactionData.number);
148
CHECK_ALLOWED_ERROR("T1: Commit", td, pCON->getNdbError());
149
td->pNDB->closeTransaction(pCON);
150
start_T1(td->pNDB, td, stat_async);
153
td->pNDB->closeTransaction(pCON);
160
* Read from Subscriber:
172
start_T2(Ndb * pNDB, ThreadData * td, int async){
174
DEBUG3("T2(%.*s, %d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
175
td->transactionData.number,
176
td->transactionData.location);
178
NdbConnection * pCON = 0;
180
while((pCON = startTransaction(pNDB, td)) == 0){
181
CHECK_ALLOWED_ERROR("T2-1: startTransaction", td, pNDB->getNdbError());
182
NdbSleep_MilliSleep(10);
185
if (td->ndbRecordSharedData)
187
char* rowPtr= (char*) &td->transactionData;
188
const NdbRecord* record= td->ndbRecordSharedData->
189
subscriberTableNdbRecord;
191
unsigned char* mask= (unsigned char*) &m;
193
SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
194
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
195
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
196
SET_MASK(mask, IND_SUBSCRIBER_NAME);
198
const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
199
NdbOperation::LM_Read, mask);
200
CHECK_NULL((void*) MyOp, "T2: readTuple", td,
201
pCON->getNdbError());
205
NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
206
CHECK_NULL(MyOp, "T2: getNdbOperation", td,
207
pCON->getNdbError());
210
MyOp->equal(IND_SUBSCRIBER_NUMBER,
211
td->transactionData.number);
212
MyOp->getValue(IND_SUBSCRIBER_LOCATION,
213
(char *)&td->transactionData.location);
214
MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
215
td->transactionData.changed_by);
216
MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
217
td->transactionData.changed_time);
218
MyOp->getValue(IND_SUBSCRIBER_NAME,
219
td->transactionData.name);
223
pCON->executeAsynchPrepare( Commit , T2_Callback, td);
225
int result = pCON->execute(Commit);
226
T2_Callback(result, pCON, (void*)td);
232
T2_Callback(int result, NdbConnection * pCON, void * threadData){
233
ThreadData * td = (ThreadData *)threadData;
234
DEBUG3("T2(%.*s, %d): - Completing", SUBSCRIBER_NUMBER_LENGTH,
235
td->transactionData.number,
236
td->transactionData.location);
239
CHECK_ALLOWED_ERROR("T2: Commit", td, pCON->getNdbError());
240
td->pNDB->closeTransaction(pCON);
241
start_T2(td->pNDB, td, stat_async);
245
td->pNDB->closeTransaction(pCON);
252
* Read session details
267
start_T3(Ndb * pNDB, ThreadData * td, int async){
269
DEBUG3("T3(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
270
td->transactionData.number,
271
td->transactionData.server_id);
273
NdbConnection * pCON = 0;
275
while((pCON = startTransaction(pNDB, td)) == 0){
276
CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
277
NdbSleep_MilliSleep(10);
280
const NdbOperation* op;
282
if (td->ndbRecordSharedData)
284
char* rowPtr= (char*) &td->transactionData;
285
const NdbRecord* record= td->ndbRecordSharedData->
286
subscriberTableNdbRecord;
288
unsigned char* mask= (unsigned char*) &m;
290
SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
291
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
292
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
293
SET_MASK(mask, IND_SUBSCRIBER_GROUP);
294
SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
296
op= pCON->readTuple(record, rowPtr, record, rowPtr,
297
NdbOperation::LM_Read, mask);
301
NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
303
CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
304
pCON->getNdbError());
307
MyOp->equal(IND_SUBSCRIBER_NUMBER,
308
td->transactionData.number);
309
MyOp->getValue(IND_SUBSCRIBER_LOCATION,
310
(char *)&td->transactionData.location);
311
MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
312
td->transactionData.changed_by);
313
MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
314
td->transactionData.changed_time);
315
MyOp->getValue(IND_SUBSCRIBER_GROUP,
316
(char *)&td->transactionData.group_id);
317
MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
318
(char *)&td->transactionData.sessions);
323
pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
325
int result = pCON->execute( NoCommit );
326
T3_Callback_1(result, pCON, (void*)td);
332
T3_Callback_1(int result, NdbConnection * pCON, void * threadData){
333
ThreadData * td = (ThreadData *)threadData;
334
DEBUG3("T3(%.*s, %.2d): - Callback 1", SUBSCRIBER_NUMBER_LENGTH,
335
td->transactionData.number,
336
td->transactionData.server_id);
339
CHECK_ALLOWED_ERROR("T3-1: execute", td, pCON->getNdbError());
340
td->pNDB->closeTransaction(pCON);
341
start_T3(td->pNDB, td, stat_async);
345
const NdbOperation* op= NULL;
347
if (td->ndbRecordSharedData)
349
char* rowPtr= (char*) &td->transactionData;
350
const NdbRecord* record= td->ndbRecordSharedData->
351
groupTableAllowReadNdbRecord;
353
unsigned char* mask= (unsigned char*) &m;
355
SET_MASK(mask, IND_GROUP_ALLOW_READ);
357
op= pCON->readTuple(record, rowPtr, record, rowPtr,
358
NdbOperation::LM_Read, mask);
362
NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
364
CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
365
pCON->getNdbError());
368
MyOp->equal(IND_GROUP_ID,
369
(char*)&td->transactionData.group_id);
370
MyOp->getValue(IND_GROUP_ALLOW_READ,
371
(char *)&td->transactionData.permission);
374
if (stat_async == 1) {
375
pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
377
int result = pCON->execute( NoCommit );
378
T3_Callback_2(result, pCON, (void*)td);
384
T3_Callback_2(int result, NdbConnection * pCON, void * threadData){
385
ThreadData * td = (ThreadData *)threadData;
388
CHECK_ALLOWED_ERROR("T3-2: execute", td, pCON->getNdbError());
389
td->pNDB->closeTransaction(pCON);
390
start_T3(td->pNDB, td, stat_async);
394
Uint32 permission = td->transactionData.permission;
395
Uint32 sessions = td->transactionData.sessions;
396
Uint32 server_bit = td->transactionData.server_bit;
398
if(((permission & server_bit) == server_bit) &&
399
((sessions & server_bit) == server_bit)){
401
memcpy(td->transactionData.suffix,
402
&td->transactionData.number[SFX_START],
403
SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
404
DEBUG5("T3(%.*s, %.2d): - Callback 2 - reading(%.*s)",
405
SUBSCRIBER_NUMBER_LENGTH,
406
td->transactionData.number,
407
td->transactionData.server_id,
408
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
409
td->transactionData.suffix);
411
/* Operations 3 + 4 */
412
if (td->ndbRecordSharedData)
415
char* rowPtr= (char*) &td->transactionData;
416
const NdbRecord* record= td->ndbRecordSharedData->
417
sessionTableNdbRecord;
419
unsigned char* mask= (unsigned char*) &m;
421
SET_MASK(mask, IND_SESSION_DATA);
423
const NdbOperation* MyOp = pCON->readTuple(record, rowPtr, record, rowPtr,
424
NdbOperation::LM_SimpleRead,
426
CHECK_NULL((void*) MyOp, "T3-3: readTuple", td,
427
pCON->getNdbError());
430
record= td->ndbRecordSharedData->
431
serverTableNdbRecord;
434
/* Attach interpreted program */
435
NdbOperation::OperationOptions opts;
436
opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
437
opts.interpretedCode= td->ndbRecordSharedData->incrServerReadsProg;
439
MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
442
CHECK_NULL((void*) MyOp, "T3-3: updateTuple", td,
443
pCON->getNdbError());
447
NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
448
CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
449
pCON->getNdbError());
452
MyOp->equal(IND_SESSION_SUBSCRIBER,
453
(char*)td->transactionData.number);
454
MyOp->equal(IND_SESSION_SERVER,
455
(char*)&td->transactionData.server_id);
456
MyOp->getValue(IND_SESSION_DATA,
457
(char *)td->transactionData.session_details);
459
MyOp = pCON->getNdbOperation(SERVER_TABLE);
460
CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
461
pCON->getNdbError());
463
MyOp->interpretedUpdateTuple();
464
MyOp->equal(IND_SERVER_ID,
465
(char*)&td->transactionData.server_id);
466
MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
467
(char*)td->transactionData.suffix);
468
MyOp->incValue(IND_SERVER_READS, (uint32)1);
471
td->transactionData.branchExecuted = 1;
473
DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
474
SUBSCRIBER_NUMBER_LENGTH,
475
td->transactionData.number,
476
td->transactionData.server_id);
477
td->transactionData.branchExecuted = 0;
479
if (stat_async == 1) {
480
pCON->executeAsynchPrepare( Commit , T3_Callback_3, td);
482
int result = pCON->execute( Commit );
483
T3_Callback_3(result, pCON, (void*)td);
489
T3_Callback_3(int result, NdbConnection * pCON, void * threadData){
490
ThreadData * td = (ThreadData *)threadData;
491
DEBUG3("T3(%.*s, %.2d): - Completing", SUBSCRIBER_NUMBER_LENGTH,
492
td->transactionData.number,
493
td->transactionData.server_id);
496
CHECK_ALLOWED_ERROR("T3-3: Commit", td, pCON->getNdbError());
497
td->pNDB->closeTransaction(pCON);
498
start_T3(td->pNDB, td, stat_async);
502
td->pNDB->closeTransaction(pCON);
524
start_T4(Ndb * pNDB, ThreadData * td, int async){
526
DEBUG3("T4(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
527
td->transactionData.number,
528
td->transactionData.server_id);
530
NdbConnection * pCON = 0;
531
while((pCON = startTransaction(pNDB, td)) == 0){
532
CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
533
NdbSleep_MilliSleep(10);
536
if (td->ndbRecordSharedData)
538
char* rowPtr= (char*) &td->transactionData;
539
const NdbRecord* record= td->ndbRecordSharedData->
540
subscriberTableNdbRecord;
542
unsigned char* mask= (unsigned char*) &m;
544
SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
545
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
546
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
547
SET_MASK(mask, IND_SUBSCRIBER_GROUP);
548
SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
550
const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
551
NdbOperation::LM_Read,
553
CHECK_NULL((void*)MyOp, "T4-1: readTuple", td,
554
pCON->getNdbError());
558
/* Create program to add something to the subscriber
563
for (Uint32 p=0; p<20; p++)
566
NdbInterpretedCode program(pNDB->getDictionary()->
567
getTable(SUBSCRIBER_TABLE),
571
if (program.add_val(IND_SUBSCRIBER_SESSIONS,
572
(uint32)td->transactionData.server_bit) ||
573
program.interpret_exit_ok() ||
576
CHECK_NULL(NULL , "T4-1: Program create failed", td,
577
program.getNdbError());
580
NdbOperation::OperationOptions opts;
581
opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
582
opts.interpretedCode= &program;
584
MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
588
CHECK_NULL((void*)MyOp, "T4-1: updateTuple", td,
589
pCON->getNdbError());
595
if (td->useCombinedUpdate)
597
NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
598
CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
599
pCON->getNdbError());
601
MyOp->interpretedUpdateTuple();
602
MyOp->equal(IND_SUBSCRIBER_NUMBER,
603
td->transactionData.number);
604
MyOp->getValue(IND_SUBSCRIBER_LOCATION,
605
(char *)&td->transactionData.location);
606
MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
607
td->transactionData.changed_by);
608
MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
609
td->transactionData.changed_time);
610
MyOp->getValue(IND_SUBSCRIBER_GROUP,
611
(char *)&td->transactionData.group_id);
612
MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
613
(char *)&td->transactionData.sessions);
614
MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
615
(uint32)td->transactionData.server_bit);
619
/* Separate read op + update op
620
* Relies on relative ordering of operation execution on a single
623
NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
624
CHECK_NULL(MyOp, "T4-1: getNdbOperation (read)", td,
625
pCON->getNdbError());
627
MyOp->equal(IND_SUBSCRIBER_NUMBER,
628
td->transactionData.number);
629
MyOp->getValue(IND_SUBSCRIBER_LOCATION,
630
(char *)&td->transactionData.location);
631
MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
632
td->transactionData.changed_by);
633
MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
634
td->transactionData.changed_time);
635
MyOp->getValue(IND_SUBSCRIBER_GROUP,
636
(char *)&td->transactionData.group_id);
637
MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
638
(char *)&td->transactionData.sessions);
639
MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
640
CHECK_NULL(MyOp, "T4-1: getNdbOperation (update)", td,
641
pCON->getNdbError());
642
MyOp->interpretedUpdateTuple();
643
MyOp->equal(IND_SUBSCRIBER_NUMBER,
644
td->transactionData.number);
645
MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
646
(uint32)td->transactionData.server_bit);
651
pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
653
int result = pCON->execute( NoCommit );
654
T4_Callback_1(result, pCON, (void*)td);
660
T4_Callback_1(int result, NdbConnection * pCON, void * threadData){
661
ThreadData * td = (ThreadData *)threadData;
663
CHECK_ALLOWED_ERROR("T4-1: execute", td, pCON->getNdbError());
664
td->pNDB->closeTransaction(pCON);
665
start_T4(td->pNDB, td, stat_async);
669
DEBUG3("T4(%.*s, %.2d): - Callback 1",
670
SUBSCRIBER_NUMBER_LENGTH,
671
td->transactionData.number,
672
td->transactionData.server_id);
675
if (td->ndbRecordSharedData)
677
char* rowPtr= (char*) &td->transactionData;
678
const NdbRecord* record= td->ndbRecordSharedData->
679
groupTableAllowInsertNdbRecord;
681
unsigned char* mask= (unsigned char*) &m;
683
SET_MASK(mask, IND_GROUP_ALLOW_INSERT);
685
const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
686
NdbOperation::LM_Read,
689
CHECK_NULL((void*)MyOp, "T4-2: readTuple", td,
690
pCON->getNdbError());
694
NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
695
CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
696
pCON->getNdbError());
699
MyOp->equal(IND_GROUP_ID,
700
(char*)&td->transactionData.group_id);
701
MyOp->getValue(IND_GROUP_ALLOW_INSERT,
702
(char *)&td->transactionData.permission);
704
if (stat_async == 1) {
705
pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
707
int result = pCON->execute( NoCommit );
708
T4_Callback_2(result, pCON, (void*)td);
714
T4_Callback_2(int result, NdbConnection * pCON, void * threadData){
715
ThreadData * td = (ThreadData *)threadData;
717
CHECK_ALLOWED_ERROR("T4-2: execute", td, pCON->getNdbError());
718
td->pNDB->closeTransaction(pCON);
719
start_T4(td->pNDB, td, stat_async);
723
Uint32 permission = td->transactionData.permission;
724
Uint32 sessions = td->transactionData.sessions;
725
Uint32 server_bit = td->transactionData.server_bit;
727
if(((permission & server_bit) == server_bit) &&
728
((sessions & server_bit) == 0)){
730
memcpy(td->transactionData.suffix,
731
&td->transactionData.number[SFX_START],
732
SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
734
DEBUG5("T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)",
735
SUBSCRIBER_NUMBER_LENGTH,
736
td->transactionData.number,
737
td->transactionData.server_id,
738
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
739
td->transactionData.suffix);
741
/* Operations 3 + 4 */
743
if (td->ndbRecordSharedData)
745
char* rowPtr= (char*) &td->transactionData;
746
const NdbRecord* record= td->ndbRecordSharedData->
747
sessionTableNdbRecord;
749
unsigned char* mask= (unsigned char*) &m;
751
SET_MASK(mask, IND_SESSION_SUBSCRIBER);
752
SET_MASK(mask, IND_SESSION_SERVER);
753
SET_MASK(mask, IND_SESSION_DATA);
755
const NdbOperation* MyOp= pCON->insertTuple(record, rowPtr, mask);
757
CHECK_NULL((void*)MyOp, "T4-3: insertTuple", td,
758
pCON->getNdbError());
760
record= td->ndbRecordSharedData->
761
serverTableNdbRecord;
764
NdbOperation::OperationOptions opts;
765
opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
766
opts.interpretedCode= td->ndbRecordSharedData->incrServerInsertsProg;
768
MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
769
&opts, sizeof(opts));
771
CHECK_NULL((void*)MyOp, "T4-3: updateTuple", td,
772
pCON->getNdbError());
776
NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
777
CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
778
pCON->getNdbError());
781
MyOp->equal(IND_SESSION_SUBSCRIBER,
782
(char*)td->transactionData.number);
783
MyOp->equal(IND_SESSION_SERVER,
784
(char*)&td->transactionData.server_id);
785
MyOp->setValue(IND_SESSION_DATA,
786
(char *)td->transactionData.session_details);
790
MyOp = pCON->getNdbOperation(SERVER_TABLE);
791
CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
792
pCON->getNdbError());
794
MyOp->interpretedUpdateTuple();
795
MyOp->equal(IND_SERVER_ID,
796
(char*)&td->transactionData.server_id);
797
MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
798
(char*)td->transactionData.suffix);
799
MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
801
td->transactionData.branchExecuted = 1;
803
td->transactionData.branchExecuted = 0;
804
DEBUG5("T4(%.*s, %.2d): - Callback 2 - %s %s",
805
SUBSCRIBER_NUMBER_LENGTH,
806
td->transactionData.number,
807
td->transactionData.server_id,
808
((permission & server_bit) ?
809
"permission - " : "no permission - "),
810
((sessions & server_bit) ?
811
"in session - " : "no in session - "));
814
if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
815
if (stat_async == 1) {
816
pCON->executeAsynchPrepare( Commit , T4_Callback_3, td);
818
int result = pCON->execute( Commit );
819
T4_Callback_3(result, pCON, (void*)td);
823
if (stat_async == 1) {
824
pCON->executeAsynchPrepare( Rollback , T4_Callback_3, td);
826
int result = pCON->execute( Rollback );
827
T4_Callback_3(result, pCON, (void*)td);
834
T4_Callback_3(int result, NdbConnection * pCON, void * threadData){
835
ThreadData * td = (ThreadData *)threadData;
837
CHECK_ALLOWED_ERROR("T4-3: Commit", td, pCON->getNdbError());
838
td->pNDB->closeTransaction(pCON);
839
start_T4(td->pNDB, td, stat_async);
843
DEBUG3("T4(%.*s, %.2d): - Completing",
844
SUBSCRIBER_NUMBER_LENGTH,
845
td->transactionData.number,
846
td->transactionData.server_id);
848
td->pNDB->closeTransaction(pCON);
869
start_T5(Ndb * pNDB, ThreadData * td, int async){
871
DEBUG3("T5(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
872
td->transactionData.number,
873
td->transactionData.server_id);
875
NdbConnection * pCON = 0;
876
while((pCON = startTransaction(pNDB, td)) == 0){
877
CHECK_ALLOWED_ERROR("T5-1: startTransaction", td, pNDB->getNdbError());
878
NdbSleep_MilliSleep(10);
881
if (td->ndbRecordSharedData)
883
char* rowPtr= (char*) &td->transactionData;
884
const NdbRecord* record= td->ndbRecordSharedData->
885
subscriberTableNdbRecord;
887
unsigned char* mask= (unsigned char*) &m;
889
SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
890
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
891
SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
892
SET_MASK(mask, IND_SUBSCRIBER_GROUP);
893
SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
895
const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
896
NdbOperation::LM_Read,
898
CHECK_NULL((void*)MyOp, "T5-1: readTuple", td,
899
pCON->getNdbError());
903
/* Create program to subtract something from the
904
* subscriber sessions column
907
NdbInterpretedCode program(pNDB->getDictionary()->
908
getTable(SUBSCRIBER_TABLE),
911
if (program.sub_val(IND_SUBSCRIBER_SESSIONS,
912
(uint32)td->transactionData.server_bit) ||
913
program.interpret_exit_ok() ||
916
CHECK_NULL(NULL , "T5: Program create failed", td,
917
program.getNdbError());
919
NdbOperation::OperationOptions opts;
920
opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
921
opts.interpretedCode= &program;
923
MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
927
CHECK_NULL((void*)MyOp, "T5-1: updateTuple", td,
928
pCON->getNdbError());
933
if (td->useCombinedUpdate)
935
NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
936
CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
937
pCON->getNdbError());
939
MyOp->interpretedUpdateTuple();
940
MyOp->equal(IND_SUBSCRIBER_NUMBER,
941
td->transactionData.number);
942
MyOp->getValue(IND_SUBSCRIBER_LOCATION,
943
(char *)&td->transactionData.location);
944
MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
945
td->transactionData.changed_by);
946
MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
947
td->transactionData.changed_time);
948
MyOp->getValue(IND_SUBSCRIBER_GROUP,
949
(char *)&td->transactionData.group_id);
950
MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
951
(char *)&td->transactionData.sessions);
952
MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
953
(uint32)td->transactionData.server_bit);
957
/* Use separate read and update operations
958
* This relies on execution ordering between operations on
961
NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
962
CHECK_NULL(MyOp, "T5-1: getNdbOperation (readTuple)", td,
963
pCON->getNdbError());
965
MyOp->equal(IND_SUBSCRIBER_NUMBER,
966
td->transactionData.number);
967
MyOp->getValue(IND_SUBSCRIBER_LOCATION,
968
(char *)&td->transactionData.location);
969
MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
970
td->transactionData.changed_by);
971
MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
972
td->transactionData.changed_time);
973
MyOp->getValue(IND_SUBSCRIBER_GROUP,
974
(char *)&td->transactionData.group_id);
975
MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
976
(char *)&td->transactionData.sessions);
978
MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
979
CHECK_NULL(MyOp, "T5-1: getNdbOperation (updateTuple)", td,
980
pCON->getNdbError());
981
MyOp->interpretedUpdateTuple();
982
MyOp->equal(IND_SUBSCRIBER_NUMBER,
983
td->transactionData.number);
984
MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
985
(uint32)td->transactionData.server_bit);
990
pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
992
int result = pCON->execute( NoCommit );
993
T5_Callback_1(result, pCON, (void*)td);
999
T5_Callback_1(int result, NdbConnection * pCON, void * threadData){
1000
ThreadData * td = (ThreadData *)threadData;
1002
CHECK_ALLOWED_ERROR("T5-1: execute", td, pCON->getNdbError());
1003
td->pNDB->closeTransaction(pCON);
1004
start_T5(td->pNDB, td, stat_async);
1008
DEBUG3("T5(%.*s, %.2d): - Callback 1",
1009
SUBSCRIBER_NUMBER_LENGTH,
1010
td->transactionData.number,
1011
td->transactionData.server_id);
1013
if (td->ndbRecordSharedData)
1015
char* rowPtr= (char*) &td->transactionData;
1016
const NdbRecord* record= td->ndbRecordSharedData->
1017
groupTableAllowDeleteNdbRecord;
1019
unsigned char* mask= (unsigned char*) &m;
1021
SET_MASK(mask, IND_GROUP_ALLOW_DELETE);
1023
const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
1024
NdbOperation::LM_Read,
1027
CHECK_NULL((void*)MyOp, "T5-2: readTuple", td,
1028
pCON->getNdbError());
1032
NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
1033
CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
1034
pCON->getNdbError());
1037
MyOp->equal(IND_GROUP_ID,
1038
(char*)&td->transactionData.group_id);
1039
MyOp->getValue(IND_GROUP_ALLOW_DELETE,
1040
(char *)&td->transactionData.permission);
1043
if (stat_async == 1) {
1044
pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
1046
int result = pCON->execute( NoCommit );
1047
T5_Callback_2(result, pCON, (void*)td);
1053
T5_Callback_2(int result, NdbConnection * pCON, void * threadData){
1054
ThreadData * td = (ThreadData *)threadData;
1056
CHECK_ALLOWED_ERROR("T5-2: execute", td, pCON->getNdbError());
1057
td->pNDB->closeTransaction(pCON);
1058
start_T5(td->pNDB, td, stat_async);
1062
Uint32 permission = td->transactionData.permission;
1063
Uint32 sessions = td->transactionData.sessions;
1064
Uint32 server_bit = td->transactionData.server_bit;
1066
if(((permission & server_bit) == server_bit) &&
1067
((sessions & server_bit) == server_bit)){
1069
memcpy(td->transactionData.suffix,
1070
&td->transactionData.number[SFX_START],
1071
SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
1073
DEBUG5("T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)",
1074
SUBSCRIBER_NUMBER_LENGTH,
1075
td->transactionData.number,
1076
td->transactionData.server_id,
1077
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
1078
td->transactionData.suffix);
1080
if (td->ndbRecordSharedData)
1082
char* rowPtr= (char*) &td->transactionData;
1083
const NdbRecord* record= td->ndbRecordSharedData->
1084
sessionTableNdbRecord;
1086
unsigned char* mask= (unsigned char*) &m;
1088
const NdbOperation* MyOp= pCON->deleteTuple(record, rowPtr, record);
1089
CHECK_NULL((void*) MyOp, "T5-3: deleteTuple", td,
1090
pCON->getNdbError());
1092
record= td->ndbRecordSharedData->
1093
serverTableNdbRecord;
1096
NdbOperation::OperationOptions opts;
1097
opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
1098
opts.interpretedCode= td->ndbRecordSharedData->incrServerDeletesProg;
1100
MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
1101
&opts, sizeof(opts));
1103
CHECK_NULL((void*)MyOp, "T5-2: updateTuple", td,
1104
pCON->getNdbError());
1109
NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
1110
CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
1111
pCON->getNdbError());
1113
MyOp->deleteTuple();
1114
MyOp->equal(IND_SESSION_SUBSCRIBER,
1115
(char*)td->transactionData.number);
1116
MyOp->equal(IND_SESSION_SERVER,
1117
(char*)&td->transactionData.server_id);
1121
MyOp = pCON->getNdbOperation(SERVER_TABLE);
1122
CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
1123
pCON->getNdbError());
1125
MyOp->interpretedUpdateTuple();
1126
MyOp->equal(IND_SERVER_ID,
1127
(char*)&td->transactionData.server_id);
1128
MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
1129
(char*)td->transactionData.suffix);
1130
MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
1132
td->transactionData.branchExecuted = 1;
1134
td->transactionData.branchExecuted = 0;
1136
DEBUG5("T5(%.*s, %.2d): - Callback 2 - no delete - %s %s",
1137
SUBSCRIBER_NUMBER_LENGTH,
1138
td->transactionData.number,
1139
td->transactionData.server_id,
1140
((permission & server_bit) ?
1141
"permission - " : "no permission - "),
1142
((sessions & server_bit) ?
1143
"in session - " : "no in session - "));
1146
if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
1147
if (stat_async == 1) {
1148
pCON->executeAsynchPrepare( Commit , T5_Callback_3, td);
1150
int result = pCON->execute( Commit );
1151
T5_Callback_3(result, pCON, (void*)td);
1155
if (stat_async == 1) {
1156
pCON->executeAsynchPrepare( Rollback , T5_Callback_3, td);
1158
int result = pCON->execute( Rollback );
1159
T5_Callback_3(result, pCON, (void*)td);
1166
T5_Callback_3(int result, NdbConnection * pCON, void * threadData){
1167
ThreadData * td = (ThreadData *)threadData;
1169
CHECK_ALLOWED_ERROR("T5-3: Commit", td, pCON->getNdbError());
1170
td->pNDB->closeTransaction(pCON);
1171
start_T5(td->pNDB, td, stat_async);
1175
DEBUG3("T5(%.*s, %.2d): - Completing",
1176
SUBSCRIBER_NUMBER_LENGTH,
1177
td->transactionData.number,
1178
td->transactionData.server_id);
1180
td->pNDB->closeTransaction(pCON);