1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include <HugoAsynchTransactions.hpp>
19
// Constructor. Forwards the table definition _t to HugoTransactions
// (presumably the base class -- confirm in the header) and starts the
// completed-transaction counter at zero.
// NOTE(review): the rest of the initializer list and the constructor body are
// not visible in this listing.
HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _t)
20
: HugoTransactions(_t),
21
transactionsCompleted(0),
27
// Destructor. Calls deallocTransactions() so that a transaction-pointer array
// still held by this object is freed.
HugoAsynchTransactions::~HugoAsynchTransactions(){
28
deallocTransactions();
31
// Callback that the methods in this file pass to executeAsynchPrepare().
// It casts anObject back to the HugoAsynchTransactions it was registered with,
// counts the transaction as completed, and logs information taken from the
// transaction's NdbError.
//
// result - status code supplied by the caller of the callback; the trailing
//          comment on the else-branch indicates -1 selects the error branch.
// pTrans - transaction whose NdbError is inspected.
//
// NOTE(review): this listing omits several original lines (the remainder of
// the parameter list that declares anObject, the outer condition, the header
// of the outer switch, and any break statements). The comments below describe
// only what is visible.
void asynchCallback(int result, NdbConnection* pTrans,
33
// anObject is an untyped pointer; it is assumed to be the `this` of the
// HugoAsynchTransactions that scheduled the transaction -- TODO confirm at the
// executeAsynchPrepare() call sites, whose final argument is not visible.
HugoAsynchTransactions* pHugo = (HugoAsynchTransactions*) anObject;
35
// Count the transaction regardless of how it ended.
pHugo->transactionCompleted();
38
const NdbError err = pTrans->getNdbError();
40
// The case labels below belong to a switch whose header is not visible here
// (presumably on a status field of err -- confirm).
case NdbError::Success:
42
g_info << "ERROR: NdbError reports success when transcaction failed"
46
case NdbError::TemporaryError:
51
case 626: // Tuple did not exist
52
// Log "<completed count>: <error code> <error message>".
g_info << (unsigned int)pHugo->getTransactionsCompleted() << ": "
53
<< err.code << " " << err.message << endl;
57
case NdbError::UnknownResult:
61
case NdbError::PermanentError:
62
// Permanent errors are further distinguished by their classification.
switch (err.classification) {
63
case NdbError::ConstraintViolation:
64
// Tuple already existed, OK in this application,
65
// but should be reported
66
g_info << (unsigned int)pHugo->getTransactionsCompleted()
67
<< ": " << err.code << " " << err.message << endl;
75
} else {// if (result == -1)
77
// NOTE(review): the lines surrounding the statement below are not visible, so
// it cannot be confirmed from this listing whether it is live code or part of
// a commented-out region.
ndbout << (unsigned int)pHugo->getTransactionsCompleted() << " completed"
84
// Delegates to executeAsynchOperation() and then logs
// transactionsCompleted * operations followed by the text "|- inserted...".
// NOTE(review): the rest of the parameter list, the operation/commit arguments
// passed to executeAsynchOperation(), and the return statement are not visible
// in this listing.
HugoAsynchTransactions::loadTableAsynch(Ndb* pNdb,
90
int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
92
// The cast binds tighter than '*': the counter is converted to unsigned int
// first and then multiplied by operations.
g_info << (unsigned int)transactionsCompleted * operations
93
<< "|- inserted..." << endl;
99
// Increments the completed-transaction counter. asynchCallback() calls this
// once each time it is invoked.
HugoAsynchTransactions::transactionCompleted() {
100
transactionsCompleted++;
104
// Returns the current value of the completed-transaction counter.
// (The return type is declared on a line not visible in this listing.)
HugoAsynchTransactions::getTransactionsCompleted() {
105
return transactionsCompleted;
109
// Logs a start message, delegates to executeAsynchOperation(), and then logs
// transactionsCompleted * operations as the number of deleted records.
// NOTE(review): the rest of the parameter list, the remaining arguments passed
// to executeAsynchOperation(), and the return statement are not visible in
// this listing.
HugoAsynchTransactions::pkDelRecordsAsynch(Ndb* pNdb,
115
g_info << "|- Deleting records asynchronous..." << endl;
117
int result = executeAsynchOperation(pNdb, records, batch, trans,
120
// The cast applies to the counter only; the product is taken afterwards.
g_info << "|- " << (unsigned int)transactionsCompleted * operations
121
<< " deleted..." << endl;
127
// Logs a start message, allocates trans*operations row buffers, delegates to
// executeAsynchOperation(), and then logs a count derived from
// transactionsCompleted * operations.
// NOTE(review): the rest of the parameter list, the remaining arguments passed
// to executeAsynchOperation(), the tail of the final log statement, and the
// return statement are not visible in this listing.
HugoAsynchTransactions::pkReadRecordsAsynch(Ndb* pNdb,
133
g_info << "|- Reading records asynchronous..." << endl;
135
// One row buffer per operation of one round of transactions;
// executeAsynchOperation() stores read values into rows[].
allocRows(trans*operations);
136
int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
139
g_info << "|- " << (unsigned int)transactionsCompleted * operations
149
// Updates records asynchronously in rounds of `trans` transactions with
// `operations` operations each. As far as the visible lines show, each round
// has two phases that share the same transaction objects:
//   1. Read phase: start the transactions, define an exclusive read per
//      operation, prepare with NoCommit, poll, then verify the rows read.
//   2. Update phase: define an update per operation on the same transactions,
//      prepare with Commit, poll, then close the transactions.
//
// NOTE(review): this listing omits many original lines (the rest of the
// parameter list, most local declarations, the `if` heads guarding the error
// paths, the statements advancing cRecords/cReadRecords/cIndex/cReadIndex/
// cTrans, and all return/break statements). The comments below describe only
// what is visible.
HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb,
155
g_info << "|- Updating records asynchronous..." << endl;
159
int cReadRecords = 0;
164
// Reset the counter that asynchCallback() advances through
// transactionCompleted().
transactionsCompleted = 0;
166
// Row buffers for one round, and the array holding that round's transactions.
allocRows(trans*operations);
167
allocTransactions(trans);
170
for (int i = 0; i < batch; i++) { // For each batch
171
// NOTE(review): the while condition already spans records*batch; how cRecords
// advances is not visible here, so the interplay with the enclosing for loop
// should be checked against the full source.
while (cRecords < records*batch) {
174
// ---- Read phase ----
for (t = 0; t < trans; t++) { // For each transaction
175
transactions[t] = pNdb->startTransaction();
176
if (transactions[t] == NULL) {
177
ERR(pNdb->getNdbError());
180
for (int k = 0; k < operations; k++) { // For each operation
181
NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
183
// Error path (its guarding condition is not visible): report and close the
// current transaction.
// NOTE(review): only transactions[t] is closed on the visible error paths;
// confirm whether transactions started earlier in this round are released.
ERR(transactions[t]->getNdbError());
184
pNdb->closeTransaction(transactions[t]);
189
// Define primary keys
190
// NOTE(review): no test of `check` is visible in this listing.
check = pOp->readTupleExclusive();
191
if (equalForRow(pOp, cReadRecords) != 0)
193
ERR(transactions[t]->getNdbError());
194
pNdb->closeTransaction(transactions[t]);
197
// Define attributes to read
198
for (a = 0; a < tab.getNoOfColumns(); a++) {
199
// Request every column and keep the handle returned by getValue() in the row
// buffer; a zero result is treated as an error.
if ((rows[cReadIndex]->attributeStore(a) =
200
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
201
ERR(transactions[t]->getNdbError());
202
pNdb->closeTransaction(transactions[t]);
209
} // For each operation
212
// NoCommit: the update phase below defines further operations on this same
// transaction object.
transactions[t]->executeAsynchPrepare(NoCommit, &asynchCallback,
216
if (cReadRecords >= records) {
217
// No more transactions needed
220
} // For each transaction
222
// Wait for all outstanding transactions
223
// (first argument presumably a timeout in milliseconds -- confirm against the
// NDB API documentation)
pNdb->sendPollNdb(3000, 0, 0);
226
// Verify every row buffer of this round before updating.
for (r = 0; r < trans*operations; r++) {
227
if (calc.verifyRowValues(rows[r]) != 0) {
228
g_info << "|- Verify failed..." << endl;
229
// Close all transactions
230
// (this loop declares its own `t`, distinct from the outer loop variable)
for (int t = 0; t < cTrans; t++) {
231
pNdb->closeTransaction(transactions[t]);
240
// ---- Update phase ----
// No startTransaction() call appears here: the operations are added to the
// transactions opened in the read phase.
for (t = 0; t < trans; t++) { // For each transaction
241
for (int k = 0; k < operations; k++) { // For each operation
242
NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
244
ERR(transactions[t]->getNdbError());
245
pNdb->closeTransaction(transactions[t]);
249
// New update counter: one more than the value found in the row just read.
int updates = calc.getUpdatesValue(rows[cIndex]) + 1;
251
check = pOp->updateTuple();
253
ERR(transactions[t]->getNdbError());
254
pNdb->closeTransaction(transactions[t]);
258
// Set search condition for the record
259
// NOTE(review): the key here is built from cReadRecords, while the values set
// below use cRecords. The statements that advance these counters are not
// visible; verify that both refer to the same record at this point.
if (equalForRow(pOp, cReadRecords) != 0)
261
ERR(transactions[t]->getNdbError());
262
pNdb->closeTransaction(transactions[t]);
267
// Write new values for every non-primary-key column.
for (a = 0; a < tab.getNoOfColumns(); a++) {
268
if (tab.getColumn(a)->getPrimaryKey() == false) {
269
if (setValueForAttr(pOp, a, cRecords, updates) != 0) {
270
ERR(transactions[t]->getNdbError());
271
pNdb->closeTransaction(transactions[t]);
279
} // For each operation
282
transactions[t]->executeAsynchPrepare(Commit, &asynchCallback,
286
if (cRecords >= records) {
287
// No more transactions needed
290
} // For each transaction
292
// Wait for all outstanding transactions
293
pNdb->sendPollNdb(3000, 0, 0);
295
// Close all transactions
296
for (t = 0; t < cTrans; t++) {
297
pNdb->closeTransaction(transactions[t]);
300
} // while (cRecords < records*batch)
304
deallocTransactions();
307
// The product is halved, presumably because each transaction triggers the
// callback twice (once for the read prepare, once for the update prepare) --
// TODO confirm.
g_info << "|- " << ((unsigned int)transactionsCompleted * operations)/2
308
<< " updated..." << endl;
313
// Allocates the array that holds `trans` transaction pointers, releasing any
// previously allocated array first, and records the size in numTransactions.
// The new[] expression does not initialise the elements; callers in this file
// assign transactions[t] from startTransaction() before using it.
// NOTE(review): closing braces and any further statements are not visible in
// this listing.
HugoAsynchTransactions::allocTransactions(int trans) {
314
if (transactions != NULL) {
315
deallocTransactions();
317
numTransactions = trans;
318
transactions = new NdbConnection*[numTransactions];
322
// Frees the transaction-pointer array if one is allocated. Only the array is
// deleted here; the transactions themselves are closed elsewhere in this file
// through Ndb::closeTransaction().
// NOTE(review): whether `transactions` is reset to NULL afterwards is not
// visible in this listing -- confirm, since allocTransactions() and the
// destructor both rely on the NULL check.
HugoAsynchTransactions::deallocTransactions() {
323
if (transactions != NULL){
324
delete[] transactions;
330
// Shared driver for the asynchronous operations in this file. In rounds of
// `trans` transactions with `operations` operations each, it defines the
// operation selected by theOperation (the visible branches call insertTuple(),
// readTuple() and deleteTuple()), prepares each transaction asynchronously
// with asynchCallback, polls for completion, verifies row buffers in one of
// the post-poll branches, and closes the round's transactions.
//
// NOTE(review): this listing omits many original lines (most of the parameter
// list, the local declarations, the case labels of both switches, the `if`
// heads guarding the error paths, the statements advancing cRecords/cIndex/
// cTrans, and all return/break statements). The comments below describe only
// what is visible.
HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,
335
NDB_OPERATION theOperation,
339
// int retryAttempt = 0; // Not used at the moment
340
// int retryMax = 5; // Not used at the moment
346
// Reset the counter that asynchCallback() advances through
// transactionCompleted().
transactionsCompleted = 0;
347
allocTransactions(trans);
349
for (int i = 0; i < batch; i++) { // For each batch
350
// NOTE(review): the while condition already spans records*batch; how cRecords
// advances is not visible here, so the interplay with the enclosing for loop
// should be checked against the full source.
while (cRecords < records*batch) {
353
for (t = 0; t < trans; t++) { // For each transaction
354
transactions[t] = pNdb->startTransaction();
355
if (transactions[t] == NULL) {
356
ERR(pNdb->getNdbError());
359
for (int k = 0; k < operations; k++) { // For each operation
360
NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
362
// Error path (its guarding condition is not visible): report and close the
// current transaction.
// NOTE(review): only transactions[t] is closed on the visible error paths;
// confirm whether transactions started earlier in this round are released.
ERR(transactions[t]->getNdbError());
363
pNdb->closeTransaction(transactions[t]);
367
// Dispatch on the requested operation; the case labels are not visible in
// this listing.
switch (theOperation) {
370
// -- insert branch --
check = pOp->insertTuple();
372
ERR(transactions[t]->getNdbError());
373
pNdb->closeTransaction(transactions[t]);
377
// Set a calculated value for each attribute in this table
378
for (a = 0; a < tab.getNoOfColumns(); a++) {
379
// The final argument 0 is passed where pkUpdateRecordsAsynch() passes its
// `updates` value.
if (setValueForAttr(pOp, a, cRecords, 0 ) != 0) {
380
ERR(transactions[t]->getNdbError());
381
pNdb->closeTransaction(transactions[t]);
384
} // For each attribute
387
// This is a special case and is handled in the calling client...
391
// -- read branch --
// Define primary keys
392
// NOTE(review): no test of `check` is visible in this listing.
check = pOp->readTuple();
393
if (equalForRow(pOp, cRecords) != 0)
395
ERR(transactions[t]->getNdbError());
396
pNdb->closeTransaction(transactions[t]);
399
// Define attributes to read
400
for (a = 0; a < tab.getNoOfColumns(); a++) {
401
// Request every column and keep the handle returned by getValue() in the row
// buffer; a zero result is treated as an error. rows[] is expected to have
// been sized by the caller (pkReadRecordsAsynch() calls allocRows()).
if ((rows[cIndex]->attributeStore(a) =
402
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
403
ERR(transactions[t]->getNdbError());
404
pNdb->closeTransaction(transactions[t]);
411
// -- delete branch --
check = pOp->deleteTuple();
413
ERR(transactions[t]->getNdbError());
414
pNdb->closeTransaction(transactions[t]);
418
// Define primary keys
419
if (equalForRow(pOp, cRecords) != 0)
421
ERR(transactions[t]->getNdbError());
422
pNdb->closeTransaction(transactions[t]);
427
// Should not happen...
428
pNdb->closeTransaction(transactions[t]);
435
} // For each operation
438
// Queue the transaction for asynchronous execution; asynchCallback is invoked
// with its outcome.
transactions[t]->executeAsynchPrepare(theType, &asynchCallback,
442
if (cRecords >= records) {
443
// No more transactions needed
446
} // For each transaction
448
// Wait for all outstanding transactions
449
// (first argument presumably a timeout in milliseconds -- confirm against the
// NDB API documentation)
pNdb->sendPollNdb(3000, 0, 0);
451
// Ugly... it starts to resemble flexXXX ...:(
452
// Post-poll handling per operation type; the case labels are not visible.
switch (theOperation) {
455
// Verify every row buffer of this round.
for (r = 0; r < trans*operations; r++) {
456
if (calc.verifyRowValues(rows[r]) != 0) {
457
g_info << "|- Verify failed..." << endl;
458
// Close all transactions
459
// (this loop declares its own `t`, distinct from the outer loop variable)
for (int t = 0; t < cTrans; t++) {
460
pNdb->closeTransaction(transactions[t]);
472
// Close all transactions
473
for (t = 0; t < cTrans; t++) {
474
pNdb->closeTransaction(transactions[t]);
477
} // while (cRecords < records*batch)
481
deallocTransactions();