2
Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
4
This program is free software; you can redistribute it and/or modify
5
it under the terms of the GNU General Public License as published by
6
the Free Software Foundation; version 2 of the License.
8
This program is distributed in the hope that it will be useful,
9
but WITHOUT ANY WARRANTY; without even the implied warranty of
10
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
GNU General Public License for more details.
13
You should have received a copy of the GNU General Public License
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21
#include <kernel_types.h>
24
#include <signaldata/ReadNodesConf.hpp>
25
#include <signaldata/NodeFailRep.hpp>
26
#include <signaldata/DumpStateOrd.hpp>
27
#include <signaldata/GetTabInfo.hpp>
28
#include <signaldata/DictTabInfo.hpp>
29
#include <signaldata/CopyData.hpp>
30
#include <signaldata/BuildIndxImpl.hpp>
31
#include <signaldata/SumaImpl.hpp>
32
#include <signaldata/UtilPrepare.hpp>
33
#include <signaldata/UtilExecute.hpp>
34
#include <signaldata/UtilRelease.hpp>
35
#include <SectionReader.hpp>
36
#include <AttributeHeader.hpp>
37
#include <signaldata/TcKeyReq.hpp>
39
#include <signaldata/DbinfoScan.hpp>
40
#include <signaldata/TransIdAI.hpp>
41
#include <signaldata/WaitGCP.hpp>
43
// Error code compared against UtilExecuteRef::TCErrorCode in
// execUTIL_EXECUTE_REF; a match is reported as BuildIndxRef::IndexNotUnique.
#define CONSTRAINT_VIOLATION 893
47
// Predicate over an error code. execUTIL_EXECUTE_REF passes it the TC error
// code and reports BuildIndxRef::DeadlockError when it evaluates true.
// NOTE(review): return type and body are not visible in this view.
check_timeout(Uint32 errCode)
56
// Debug helper: streams x to ndbout with a "TRIX::" prefix, followed by endl.
#define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
61
// Constructs the TRIX block: initialises the SimulatedBlock base with the
// TRIX block number, constructs the node and subscription lists on top of
// their record pools, and registers a handler for every signal this block
// receives.
Trix::Trix(Block_context& ctx) :
62
SimulatedBlock(TRIX, ctx),
63
c_theNodes(c_theNodeRecPool),
68
c_theSubscriptions(c_theSubscriptionRecPool)
70
BLOCK_CONSTRUCTOR(Trix);
72
// Add received signals
73
addRecSignal(GSN_READ_CONFIG_REQ, &Trix::execREAD_CONFIG_REQ);
74
addRecSignal(GSN_STTOR, &Trix::execSTTOR);
75
addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR); // Forwarded from DICT
76
addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
77
addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
78
addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
79
addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
80
addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
81
addRecSignal(GSN_DBINFO_SCANREQ, &Trix::execDBINFO_SCANREQ);
84
// Index build requests and their replies
addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Trix::execBUILD_INDX_IMPL_REQ);
86
addRecSignal(GSN_BUILD_INDX_IMPL_CONF, &Trix::execBUILD_INDX_IMPL_CONF);
87
addRecSignal(GSN_BUILD_INDX_IMPL_REF, &Trix::execBUILD_INDX_IMPL_REF);
89
addRecSignal(GSN_COPY_DATA_IMPL_REQ, &Trix::execCOPY_DATA_IMPL_REQ);
91
// Replies to the UTIL_* requests this block sends to DBUTIL
addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
92
addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
93
addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
94
addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
95
addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
96
addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);
100
// Subscription signals exchanged with SUMA
addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
101
addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
102
addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
103
addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
104
addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
105
addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
106
addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
107
addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
109
// Replies to WAIT_GCP_REQ sent to DBDIH
addRecSignal(GSN_WAIT_GCP_REF, &Trix::execWAIT_GCP_REF);
110
addRecSignal(GSN_WAIT_GCP_CONF, &Trix::execWAIT_GCP_CONF);
113
addRecSignal(GSN_INDEX_STAT_IMPL_REQ, &Trix::execINDEX_STAT_IMPL_REQ);
114
addRecSignal(GSN_GET_TABINFO_CONF, &Trix::execGET_TABINFO_CONF);
115
addRecSignal(GSN_GET_TABINFOREF, &Trix::execGET_TABINFO_REF);
117
// index stats sys tables
118
c_statGetMetaDone = false;
129
// Handles READ_CONFIG_REQ: sizes the block's pools, constructs every
// SubscriptionRecord in place, and answers the sender with READ_CONFIG_CONF.
Trix::execREAD_CONFIG_REQ(Signal* signal)
133
const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
135
// Save the reply address now; the signal buffer is reused for the CONF below.
Uint32 ref = req->senderRef;
136
Uint32 senderData = req->senderData;
138
// NOTE(review): the use of p is not visible in this view.
const ndb_mgm_configuration_iterator * p =
139
m_ctx.m_config.getOwnConfigIterator();
142
// Allocate pool sizes
143
c_theAttrOrderBufferPool.setSize(100);
144
c_theSubscriptionRecPool.setSize(100);
145
c_statOpPool.setSize(5);
147
// Seize records until the pool is exhausted so that each one gets
// placement-constructed with the attribute order buffer pool, then release
// them all again.
DLList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
148
SubscriptionRecPtr subptr;
149
while(subscriptions.seize(subptr) == true) {
150
new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
152
subscriptions.release();
154
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
155
conf->senderRef = reference();
156
conf->senderData = senderData;
157
sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
158
ReadConfigConf::SignalLength, JBB);
164
// Handles STTOR: replies to NDBCNTR with STTORRY, echoing the signal key
// taken from word 6 of the request.
void Trix::execSTTOR(Signal* signal)
168
//const Uint32 startphase = signal->theData[1];
169
const Uint32 theSignalKey = signal->theData[6];
171
signal->theData[0] = theSignalKey;
172
signal->theData[3] = 1;
173
signal->theData[4] = 255; // No more start phases from missra
174
sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
181
// Handles NDB_STTOR: reads the sender reference, own node id and start phase
// from the signal and dispatches on the start phase. The visible branch sends
// READ_NODESREQ back to the sender, carrying this node's TRIX block reference.
// NOTE(review): the case labels of the switch are not visible in this view.
void Trix::execNDB_STTOR(Signal* signal)
184
BlockReference ndbcntrRef = signal->theData[0];
185
Uint16 startphase = signal->theData[2]; /* RESTART PHASE */
186
Uint16 mynode = signal->theData[1];
187
//Uint16 restarttype = signal->theData[3];
188
//UintR configInfo1 = signal->theData[6]; /* CONFIGURATION INFO PART 1 */
189
//UintR configInfo2 = signal->theData[7]; /* CONFIGURATION INFO PART 2 */
190
switch (startphase) {
193
/* SYMBOLIC START PHASE 4 */
194
/* ABSOLUTE PHASE 5 */
195
/* REQUEST NODE IDENTITIES FROM DBDIH */
196
signal->theData[0] = calcTrixBlockRef(mynode);
197
sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
210
// Handles READ_NODESCONF: records the master node, seizes one node record per
// node set in allNodes, stores each node's TRIX block reference and marks the
// node alive or not according to inactiveNodes.
void Trix::execREAD_NODESCONF(Signal* signal)
214
ReadNodesConf * const readNodes = (ReadNodesConf *)signal->getDataPtr();
215
//Uint32 noOfNodes = readNodes->noOfNodes;
216
NodeRecPtr nodeRecPtr;
218
c_masterNodeId = readNodes->masterNodeId;
219
// Reset here; assigned in the loop below when the master node is encountered.
c_masterTrixRef = RNIL;
222
for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
224
if(NdbNodeBitmask::get(readNodes->allNodes, i)) {
227
// The node record is seized with the node id as its index.
ndbrequire(c_theNodes.seizeId(nodeRecPtr, i));
228
nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
229
if (i == c_masterNodeId) {
230
c_masterTrixRef = nodeRecPtr.p->trixRef;
232
if(NdbNodeBitmask::get(readNodes->inactiveNodes, i)){
233
// Node is not active
235
/**-----------------------------------------------------------------
236
* THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
237
* WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
238
* BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE
239
* NOT ALL NODES ARE ALIVE.
240
*------------------------------------------------------------------*/
241
arrGuard(c_noNodesFailed, MAX_NDB_NODES);
242
nodeRecPtr.p->alive = false;
244
c_blockState = Trix::NODE_FAILURE;
250
nodeRecPtr.p->alive = true;
254
// With no failed nodes recorded, the block state becomes STARTED.
if (c_noNodesFailed == 0) {
255
c_blockState = Trix::STARTED;
262
// Handler registered for READ_NODESREF.
// NOTE(review): the body is not visible in this view.
void Trix::execREAD_NODESREF(Signal* signal)
270
// Handles NODE_FAILREP: marks every known node listed in the report as not
// alive and, if the report names a different master, adopts the new master
// node id and its TRIX block reference.
void Trix::execNODE_FAILREP(Signal* signal)
273
NodeFailRep * const nodeFail = (NodeFailRep *) signal->getDataPtr();
275
//Uint32 failureNr = nodeFail->failNo;
276
//Uint32 numberNodes = nodeFail->noOfNodes;
277
Uint32 masterNodeId = nodeFail->masterNodeId;
279
NodeRecPtr nodeRecPtr;
281
// nodeRecPtr.i is used as the node id when testing the failed-node bitmask.
for(c_theNodes.first(nodeRecPtr);
282
nodeRecPtr.i != RNIL;
283
c_theNodes.next(nodeRecPtr)) {
284
if(NdbNodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
285
nodeRecPtr.p->alive = false;
290
if (c_masterNodeId != masterNodeId) {
291
c_masterNodeId = masterNodeId;
292
NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
293
c_masterTrixRef = nodeRec->trixRef;
300
// Handles INCL_NODEREQ: marks the node given in word 1 of the signal as alive,
// refreshes its TRIX block reference, and sets the block state to STARTED when
// no failed nodes are recorded.
void Trix::execINCL_NODEREQ(Signal* signal)
303
UintR node_id = signal->theData[1];
304
NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
305
nodeRec->alive = true;
308
nodeRec->trixRef = calcTrixBlockRef(node_id);
309
if (c_noNodesFailed == 0) {
310
c_blockState = Trix::STARTED;
316
// Handles DUMP_STATE_ORD. Switches on args[0]; each visible branch builds a
// BUILD_INDX_IMPL_REQ addressed to this block itself, with hard-coded index
// and key column lists, as a manual test hook. After the switch it handles the
// schema resource snapshot / leak-check dump codes and dump code 8004, which
// reports the subscription record pool size and free count.
// NOTE(review): the case labels are not visible in this view; the embedded
// comments suggest dump codes 300-305 - confirm against the full file.
Trix::execDUMP_STATE_ORD(Signal* signal)
320
DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();
322
switch(dumpStateOrd->args[0]) {
324
// index2 -T; index2 -I -n10000; index2 -c
325
// all dump 300 0 0 0 0 0 4 2
326
// select_count INDEX0000
327
BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
329
MEMCOPY_NO_WORDS(buildIndxReq,
331
BuildIndxImplReq::SignalLength);
332
buildIndxReq->senderRef = reference(); // return to me
333
buildIndxReq->parallelism = 10;
334
Uint32 indexColumns[1] = {1};
335
Uint32 keyColumns[1] = {0};
336
struct LinearSectionPtr ls_ptr[3];
337
ls_ptr[0].p = indexColumns;
339
ls_ptr[1].p = keyColumns;
341
sendSignal(reference(),
342
GSN_BUILD_INDX_IMPL_REQ,
344
BuildIndxImplReq::SignalLength,
349
// index2 -T; index2 -I -n10000; index2 -c -p
350
// all dump 301 0 0 0 0 0 4 2
351
// select_count INDEX0000
352
BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
354
MEMCOPY_NO_WORDS(buildIndxReq,
356
BuildIndxImplReq::SignalLength);
357
buildIndxReq->senderRef = reference(); // return to me
358
buildIndxReq->parallelism = 10;
359
Uint32 indexColumns[2] = {0, 1};
360
Uint32 keyColumns[1] = {0};
361
struct LinearSectionPtr ls_ptr[3];
362
ls_ptr[0].p = indexColumns;
364
ls_ptr[1].p = keyColumns;
366
sendSignal(reference(),
367
GSN_BUILD_INDX_IMPL_REQ,
369
BuildIndxImplReq::SignalLength,
374
// index -T; index -I -n1000; index -c -p
375
// all dump 302 0 0 0 0 0 4 2
376
// select_count PNUMINDEX0000
377
BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
379
MEMCOPY_NO_WORDS(buildIndxReq,
381
BuildIndxImplReq::SignalLength);
382
buildIndxReq->senderRef = reference(); // return to me
383
buildIndxReq->parallelism = 10;
384
Uint32 indexColumns[3] = {0, 3, 5};
385
Uint32 keyColumns[1] = {0};
386
struct LinearSectionPtr ls_ptr[3];
387
ls_ptr[0].p = indexColumns;
389
ls_ptr[1].p = keyColumns;
391
sendSignal(reference(),
392
GSN_BUILD_INDX_IMPL_REQ,
394
BuildIndxImplReq::SignalLength,
399
// index -T -2; index -I -2 -n1000; index -c -p
400
// all dump 303 0 0 0 0 0 4 2
401
// select_count PNUMINDEX0000
402
BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
404
MEMCOPY_NO_WORDS(buildIndxReq,
406
BuildIndxImplReq::SignalLength);
407
buildIndxReq->senderRef = reference(); // return to me
408
buildIndxReq->parallelism = 10;
409
Uint32 indexColumns[3] = {0, 3, 5};
410
Uint32 keyColumns[2] = {0, 1};
411
struct LinearSectionPtr ls_ptr[3];
412
ls_ptr[0].p = indexColumns;
414
ls_ptr[1].p = keyColumns;
416
sendSignal(reference(),
417
GSN_BUILD_INDX_IMPL_REQ,
419
BuildIndxImplReq::SignalLength,
424
// index -T -L; index -I -L -n1000; index -c -p
425
// all dump 304 0 0 0 0 0 4 2
426
// select_count PNUMINDEX0000
427
BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
429
MEMCOPY_NO_WORDS(buildIndxReq,
431
BuildIndxImplReq::SignalLength);
432
buildIndxReq->senderRef = reference(); // return to me
433
buildIndxReq->parallelism = 10;
434
Uint32 indexColumns[3] = {0, 3, 5};
435
Uint32 keyColumns[1] = {0};
436
struct LinearSectionPtr ls_ptr[3];
437
ls_ptr[0].p = indexColumns;
439
ls_ptr[1].p = keyColumns;
441
sendSignal(reference(),
442
GSN_BUILD_INDX_IMPL_REQ,
444
BuildIndxImplReq::SignalLength,
449
// index -T -2 -L; index -I -2 -L -n1000; index -c -p
450
// all dump 305 0 0 0 0 0 4 2
451
// select_count PNUMINDEX0000
452
BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
454
MEMCOPY_NO_WORDS(buildIndxReq,
456
BuildIndxImplReq::SignalLength);
457
buildIndxReq->senderRef = reference(); // return to me
458
buildIndxReq->parallelism = 10;
459
Uint32 indexColumns[3] = {0, 3, 5};
460
Uint32 keyColumns[2] = {0, 1};
461
struct LinearSectionPtr ls_ptr[3];
462
ls_ptr[0].p = indexColumns;
464
ls_ptr[1].p = keyColumns;
466
sendSignal(reference(),
467
GSN_BUILD_INDX_IMPL_REQ,
469
BuildIndxImplReq::SignalLength,
478
// Save a snapshot of pool usage for a later leak check.
if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
480
RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
481
RSS_AP_SNAPSHOT_SAVE(c_statOpPool);
485
// Check current pool usage against the saved snapshot.
if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
487
RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
488
RSS_AP_SNAPSHOT_CHECK(c_statOpPool);
492
// Dump code 8004: report subscription record pool size and free count.
if (signal->theData[0] == 8004)
494
infoEvent("TRIX: c_theSubscriptionRecPool size: %u free: %u",
495
c_theSubscriptionRecPool.getSize(),
496
c_theSubscriptionRecPool.getNoOfFree());
502
// Handles DBINFO_SCANREQ. For the POOLS table it emits one row per entry of a
// local, NULL-terminated pool table (attribute order buffer pool and
// subscription record pool), resuming at the pool index stored in the scan
// cursor, and finishes with a scan conf.
void Trix::execDBINFO_SCANREQ(Signal *signal)
504
// Copy the request out of the signal: the signal is reused to send rows.
DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
505
const Ndbinfo::ScanCursor* cursor =
506
CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
507
Ndbinfo::Ratelimit rl;
512
case Ndbinfo::POOLS_TABLEID:
514
Ndbinfo::pool_entry pools[] =
516
{ "Attribute Order Buffer",
517
c_theAttrOrderBufferPool.getUsed(),
518
c_theAttrOrderBufferPool.getSize(),
519
c_theAttrOrderBufferPool.getEntrySize(),
520
c_theAttrOrderBufferPool.getUsedHi(),
522
{ "Subscription Record",
523
c_theSubscriptionRecPool.getUsed(),
524
c_theSubscriptionRecPool.getSize(),
525
c_theSubscriptionRecPool.getEntrySize(),
526
c_theSubscriptionRecPool.getUsedHi(),
528
// Sentinel entry: the NULL poolname terminates the loop below.
{ NULL, 0,0,0,0,{0,0,0,0}}
531
const size_t num_config_params =
532
sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]);
533
// Index of the first pool to report, taken from the scan cursor.
Uint32 pool = cursor->data[0];
534
BlockNumber bn = blockToMain(number());
535
while(pools[pool].poolname)
538
Ndbinfo::Row row(signal, req);
539
row.write_uint32(getOwnNodeId());
540
row.write_uint32(bn); // block number
541
row.write_uint32(instance()); // block instance
542
row.write_string(pools[pool].poolname);
543
row.write_uint64(pools[pool].used);
544
row.write_uint64(pools[pool].total);
545
row.write_uint64(pools[pool].used_hi);
546
row.write_uint64(pools[pool].entry_size);
547
for (size_t i = 0; i < num_config_params; i++)
548
row.write_uint32(pools[pool].config_params[i]);
549
ndbinfo_send_row(signal, req, row, rl);
551
// When the rate limiter asks for a break, send a scan break carrying the
// current pool index.
if (rl.need_break(req))
554
ndbinfo_send_scan_break(signal, req, rl, pool);
564
ndbinfo_send_scan_conf(signal, req, rl);
568
// Handles BUILD_INDX_IMPL_REQ: seizes a subscription record keyed by the
// request's buildId, fills it from the request, appends the index and key
// column sections to the record's attribute order, and starts preparing the
// insert transactions. If no record can be seized, it replies with
// BUILD_INDX_IMPL_REF (AllocationFailure).
void Trix:: execBUILD_INDX_IMPL_REQ(Signal* signal)
571
DBUG_ENTER("Trix:: execBUILD_INDX_IMPL_REQ");
573
// Work on a copy of the request; the signal buffer is reused for replies.
const BuildIndxImplReq
574
buildIndxReqData = *(const BuildIndxImplReq*)signal->getDataPtr(),
575
*buildIndxReq = &buildIndxReqData;
577
// Seize a subscription record
578
SubscriptionRecPtr subRecPtr;
579
SubscriptionRecord* subRec;
580
SectionHandle handle(this, signal);
582
if (!c_theSubscriptions.seizeId(subRecPtr, buildIndxReq->buildId)) {
584
// Failed to allocate subscription record
585
BuildIndxRef* buildIndxRef = (BuildIndxRef*)signal->getDataPtrSend();
587
buildIndxRef->errorCode = BuildIndxRef::AllocationFailure;
588
releaseSections(handle);
589
sendSignal(buildIndxReq->senderRef, GSN_BUILD_INDX_IMPL_REF, signal,
590
BuildIndxRef::SignalLength, JBB);
594
subRec = subRecPtr.p;
595
subRec->errorCode = BuildIndxRef::NoError;
596
subRec->userReference = buildIndxReq->senderRef;
597
subRec->connectionPtr = buildIndxReq->senderData;
598
subRec->schemaTransId = buildIndxReq->transId;
599
subRec->subscriptionId = buildIndxReq->buildId;
600
subRec->subscriptionKey = buildIndxReq->buildKey;
601
subRec->indexType = buildIndxReq->indexType;
602
subRec->sourceTableId = buildIndxReq->tableId;
603
subRec->targetTableId = buildIndxReq->indexId;
604
subRec->parallelism = buildIndxReq->parallelism;
605
subRec->expectedConf = 0;
606
subRec->subscriptionCreated = false;
607
subRec->pendingSubSyncContinueConf = false;
608
subRec->prepareId = RNIL;
609
subRec->requestType = INDEX_BUILD;
610
subRec->fragCount = 0;
611
subRec->fragId = ZNIL;
612
subRec->m_rows_processed = 0;
613
subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
615
if (buildIndxReq->requestType & BuildIndxImplReq::RF_NO_DISK)
617
subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
620
// Get column order segments
621
Uint32 noOfSections = handle.m_cnt;
622
if (noOfSections > 0) {
624
SegmentedSectionPtr ptr;
625
handle.getSection(ptr, BuildIndxImplReq::INDEX_COLUMNS);
626
append(subRec->attributeOrder, ptr, getSectionSegmentPool());
627
subRec->noOfIndexColumns = ptr.sz;
629
if (noOfSections > 1) {
631
SegmentedSectionPtr ptr;
632
handle.getSection(ptr, BuildIndxImplReq::KEY_COLUMNS);
633
append(subRec->attributeOrder, ptr, getSectionSegmentPool());
634
subRec->noOfKeyColumns = ptr.sz;
639
printf("Trix:: execBUILD_INDX_IMPL_REQ: Attribute order:\n");
640
subRec->attributeOrder.print(stdout);
643
releaseSections(handle);
644
prepareInsertTransactions(signal, subRecPtr);
648
// Handles BUILD_INDX_IMPL_CONF; the visible code only prints a trace line.
void Trix:: execBUILD_INDX_IMPL_CONF(Signal* signal)
650
printf("Trix:: execBUILD_INDX_IMPL_CONF\n");
653
// Handles BUILD_INDX_IMPL_REF; the visible code only prints a trace line.
void Trix:: execBUILD_INDX_IMPL_REF(Signal* signal)
655
printf("Trix:: execBUILD_INDX_IMPL_REF\n");
658
// Handles UTIL_PREPARE_CONF: looks up the subscription record named by
// senderData. STAT_UTIL requests are handed to statUtilPrepareConf; otherwise
// the returned prepareId is stored and the subscription is set up.
void Trix::execUTIL_PREPARE_CONF(Signal* signal)
661
UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
662
SubscriptionRecPtr subRecPtr;
663
SubscriptionRecord* subRec;
665
subRecPtr.i = utilPrepareConf->senderData;
666
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
667
printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
670
if (subRec->requestType == STAT_UTIL)
672
statUtilPrepareConf(signal, subRec->m_statPtrI);
675
subRecPtr.p = subRec;
676
subRec->prepareId = utilPrepareConf->prepareId;
677
setupSubscription(signal, subRecPtr);
680
// Handles UTIL_PREPARE_REF: looks up the subscription record named by
// senderData. STAT_UTIL requests are handed to statUtilPrepareRef; otherwise
// the error code is stored on the record and the release-confirm handler is
// invoked directly.
void Trix::execUTIL_PREPARE_REF(Signal* signal)
683
UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
684
SubscriptionRecPtr subRecPtr;
685
SubscriptionRecord* subRec;
687
subRecPtr.i = utilPrepareRef->senderData;
688
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
689
printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
692
if (subRec->requestType == STAT_UTIL)
694
statUtilPrepareRef(signal, subRec->m_statPtrI);
697
subRecPtr.p = subRec;
698
subRec->errorCode = (BuildIndxRef::ErrorCode)utilPrepareRef->errorCode;
700
// Build a UTIL_RELEASE_CONF in the signal buffer and call its handler
// locally instead of sending a signal.
UtilReleaseConf* conf = (UtilReleaseConf*)signal->getDataPtrSend();
701
conf->senderData = subRecPtr.i;
702
execUTIL_RELEASE_CONF(signal);
705
// Handles UTIL_EXECUTE_CONF: looks up the subscription record named by
// senderData. STAT_UTIL requests are handed to statUtilExecuteConf. Otherwise
// one expected confirmation is consumed, the highest GCI seen so far is
// tracked on the record, and checkParallelism is called. Once no
// confirmations remain outstanding, the build either waits for a GCP (when
// RF_WAIT_GCP is set) or completes.
void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
708
UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
709
SubscriptionRecPtr subRecPtr;
710
SubscriptionRecord* subRec;
712
// Combine the two 32-bit halves into one 64-bit GCI.
const Uint32 gci_hi = utilExecuteConf->gci_hi;
713
const Uint32 gci_lo = utilExecuteConf->gci_lo;
714
const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
716
subRecPtr.i = utilExecuteConf->senderData;
717
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
718
printf("Trix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
721
if (subRec->requestType == STAT_UTIL)
723
statUtilExecuteConf(signal, subRec->m_statPtrI);
726
subRecPtr.p = subRec;
727
subRec->expectedConf--;
729
// Remember the highest GCI reported by any executed transaction.
if (gci > subRecPtr.p->m_gci)
732
subRecPtr.p->m_gci = gci;
735
checkParallelism(signal, subRec);
736
if (subRec->expectedConf == 0)
738
if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
741
wait_gcp(signal, subRecPtr);
744
buildComplete(signal, subRecPtr);
748
// Handles UTIL_EXECUTE_REF: looks up the subscription record named by
// senderData. STAT_UTIL requests are handed to statUtilExecuteRef. Otherwise
// the error must be a TC error, and the build is failed with an error code
// derived from the TC error code.
void Trix::execUTIL_EXECUTE_REF(Signal* signal)
751
UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
752
SubscriptionRecPtr subRecPtr;
753
SubscriptionRecord* subRec;
755
subRecPtr.i = utilExecuteRef->senderData;
756
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
757
printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
760
if (subRec->requestType == STAT_UTIL)
762
statUtilExecuteRef(signal, subRec->m_statPtrI);
765
subRecPtr.p = subRec;
766
ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
767
// Constraint violation -> IndexNotUnique.
if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
770
buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
772
// Error codes accepted by check_timeout -> DeadlockError.
else if (check_timeout(utilExecuteRef->TCErrorCode))
775
buildFailed(signal, subRecPtr, BuildIndxRef::DeadlockError);
780
// Otherwise the TC error code is passed through as the build error code.
buildFailed(signal, subRecPtr,
781
(BuildIndxRef::ErrorCode)utilExecuteRef->TCErrorCode);
785
// Handles SUB_CREATE_CONF: marks the subscription named by senderData as
// created and starts the table scan for it.
void Trix::execSUB_CREATE_CONF(Signal* signal)
788
DBUG_ENTER("Trix::execSUB_CREATE_CONF");
789
SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
790
SubscriptionRecPtr subRecPtr;
791
SubscriptionRecord* subRec;
793
subRecPtr.i = subCreateConf->senderData;
794
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
795
printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
798
subRec->subscriptionCreated = true;
799
subRecPtr.p = subRec;
801
DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
802
subRecPtr.i, subRecPtr.p->subscriptionId,
803
subRecPtr.p->subscriptionKey));
805
startTableScan(signal, subRecPtr);
809
// Handles SUB_CREATE_REF: stores the error code on the subscription record
// named by senderData and asks DBUTIL to release the prepared operation.
void Trix::execSUB_CREATE_REF(Signal* signal)
812
DBUG_ENTER("Trix::execSUB_CREATE_REF");
814
SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
815
SubscriptionRecPtr subRecPtr;
816
SubscriptionRecord* subRec;
818
subRecPtr.i = subCreateRef->senderData;
819
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL)
821
printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i);
824
subRecPtr.p = subRec;
825
subRecPtr.p->errorCode = (BuildIndxRef::ErrorCode)subCreateRef->errorCode;
827
UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
828
req->prepareId = subRecPtr.p->prepareId;
829
req->senderData = subRecPtr.i;
831
sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
832
UtilReleaseReq::SignalLength, JBB);
837
// Handles SUB_SYNC_CONF: consumes one expected confirmation on the
// subscription named by senderData and calls checkParallelism. Once no
// confirmations remain outstanding, the build either waits for a GCP (when
// RF_WAIT_GCP is set) or completes.
void Trix::execSUB_SYNC_CONF(Signal* signal)
840
DBUG_ENTER("Trix::execSUB_SYNC_CONF");
841
SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
842
SubscriptionRecPtr subRecPtr;
843
SubscriptionRecord* subRec;
845
subRecPtr.i = subSyncConf->senderData;
846
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
847
printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n",
852
subRecPtr.p = subRec;
853
subRec->expectedConf--;
854
checkParallelism(signal, subRec);
855
if (subRec->expectedConf == 0)
857
if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
860
wait_gcp(signal, subRecPtr);
863
buildComplete(signal, subRecPtr);
868
// Handles SUB_SYNC_REF: fails the build of the subscription named by
// senderData with BuildIndxRef::InternalError.
void Trix::execSUB_SYNC_REF(Signal* signal)
871
DBUG_ENTER("Trix::execSUB_SYNC_REF");
872
SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
873
SubscriptionRecPtr subRecPtr;
874
SubscriptionRecord* subRec;
876
subRecPtr.i = subSyncRef->senderData;
877
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
878
printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
881
subRecPtr.p = subRec;
882
buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
886
// Handles SUB_SYNC_CONTINUE_REQ: records on the subscription named by
// subscriberData that a continue-confirmation is pending, stores the sender's
// data word as syncPtr, and calls checkParallelism.
void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
888
SubSyncContinueReq * subSyncContinueReq =
889
(SubSyncContinueReq *) signal->getDataPtr();
891
SubscriptionRecPtr subRecPtr;
892
SubscriptionRecord* subRec;
893
subRecPtr.i = subSyncContinueReq->subscriberData;
894
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
895
printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);
898
subRecPtr.p = subRec;
899
subRec->pendingSubSyncContinueConf = true;
900
subRec->syncPtr = subSyncContinueReq->senderData;
901
checkParallelism(signal, subRec);
904
// Handles SUB_TABLE_DATA: dispatches one unit of scanned table data to the
// handler matching the subscription's request type (build insert, reorg,
// stat clean, stat scan) and counts the processed row.
// NOTE(review): the case labels of the switch are not visible in this view.
void Trix::execSUB_TABLE_DATA(Signal* signal)
907
DBUG_ENTER("Trix::execSUB_TABLE_DATA");
908
SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
909
SubscriptionRecPtr subRecPtr;
910
SubscriptionRecord* subRec;
911
subRecPtr.i = subTableData->senderData;
912
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
913
printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
916
subRecPtr.p = subRec;
917
switch(subRecPtr.p->requestType){
919
executeBuildInsertTransaction(signal, subRecPtr);
923
executeReorgTransaction(signal, subRecPtr, subTableData->takeOver);
930
StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
931
statCleanExecute(signal, stat);
936
StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
937
statScanExecute(signal, stat);
942
subRecPtr.p->m_rows_processed++;
947
// Sends SUB_CREATE_REQ to SUMA for a single-table-scan subscription on the
// record's source table, using the record index as senderData so the reply
// can be matched back to the record.
void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
950
DBUG_ENTER("Trix::setupSubscription");
951
SubscriptionRecord* subRec = subRecPtr.p;
952
SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
953
// Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
954
subCreateReq->senderRef = reference();
955
subCreateReq->senderData = subRecPtr.i;
956
subCreateReq->subscriptionId = subRec->subscriptionId;
957
subCreateReq->subscriptionKey = subRec->subscriptionKey;
958
subCreateReq->tableId = subRec->sourceTableId;
959
subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
960
subCreateReq->schemaTransId = subRec->schemaTransId;
962
DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
963
subRecPtr.i, subCreateReq->subscriptionId,
964
subCreateReq->subscriptionKey));
966
sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
967
signal, SubCreateReq::SignalLength, JBB);
972
// Starts the table scan for a subscription: flattens the record's attribute
// order buffer into a linear array, builds a SUB_SYNC_REQ whose requestInfo
// flags depend on the record's flags and request type, and sends it to SUMA
// with the attribute list (and, for STAT_CLEAN, the clean bound) as sections.
// NOTE(review): the declarations of i and noOfSections are not visible in
// this view.
void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
976
Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
977
SubscriptionRecord* subRec = subRecPtr.p;
978
AttrOrderBuffer::DataBufferIterator iter;
981
// Copy the segmented attribute order buffer into the linear array.
bool moreAttributes = subRec->attributeOrder.first(iter);
982
while (moreAttributes) {
983
attributeList[i++] = *iter.data;
984
moreAttributes = subRec->attributeOrder.next(iter);
987
// Merge index and key column segments
988
struct LinearSectionPtr orderPtr[3];
990
orderPtr[0].p = attributeList;
991
orderPtr[0].sz = subRec->attributeOrder.getSize();
994
SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
995
subSyncReq->senderRef = reference();
996
subSyncReq->senderData = subRecPtr.i;
997
subSyncReq->subscriptionId = subRec->subscriptionId;
998
subSyncReq->subscriptionKey = subRec->subscriptionKey;
999
subSyncReq->part = SubscriptionData::TableData;
1000
subSyncReq->requestInfo = 0;
1001
subSyncReq->fragCount = subRec->fragCount;
1002
subSyncReq->fragId = subRec->fragId;
1004
if (subRec->m_flags & SubscriptionRecord::RF_NO_DISK)
1007
subSyncReq->requestInfo |= SubSyncReq::NoDisk;
1010
if (subRec->m_flags & SubscriptionRecord::RF_TUP_ORDER)
1013
subSyncReq->requestInfo |= SubSyncReq::TupOrder;
1016
// Request-type specific scan flags.
if (subRec->requestType == REORG_COPY)
1019
subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1021
else if (subRec->requestType == REORG_DELETE)
1024
subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1025
subSyncReq->requestInfo |= SubSyncReq::Reorg;
1027
else if (subRec->requestType == STAT_CLEAN)
1030
StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1031
StatOp::Clean clean = stat.m_clean;
1032
// The clean operation's bound goes out as the second section.
orderPtr[1].p = clean.m_bound;
1033
orderPtr[1].sz = clean.m_boundSize;
1035
subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1036
subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1038
else if (subRec->requestType == STAT_SCAN)
1044
subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1045
subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1046
subSyncReq->requestInfo |= SubSyncReq::StatScan;
1048
// Exactly one confirmation (for the SUB_SYNC_REQ itself) is expected now.
subRecPtr.p->expectedConf = 1;
1050
DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
1051
subRecPtr.i, subSyncReq->subscriptionId,
1052
subSyncReq->subscriptionKey));
1054
sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
1055
signal, SubSyncReq::SignalLength, JBB, orderPtr, noOfSections);
1058
// Sends UTIL_PREPARE_REQ to DBUTIL describing a single Write operation on the
// record's target table. The operation is described as a SimpleProperties
// page listing attribute ids 0..noOfIndexColumns (the index attributes plus
// one primary-key attribute), sent as the properties section.
void Trix::prepareInsertTransactions(Signal* signal,
1059
SubscriptionRecPtr subRecPtr)
1061
SubscriptionRecord* subRec = subRecPtr.p;
1062
UtilPrepareReq * utilPrepareReq =
1063
(UtilPrepareReq *)signal->getDataPtrSend();
1066
utilPrepareReq->senderRef = reference();
1067
utilPrepareReq->senderData = subRecPtr.i;
1068
utilPrepareReq->schemaTransId = subRec->schemaTransId;
1070
const Uint32 pageSizeInWords = 128;
1071
Uint32 propPage[pageSizeInWords];
1072
// Writer capacity must match the buffer size, so use the same constant.
LinearWriter w(&propPage[0], pageSizeInWords);
1074
w.add(UtilPrepareReq::NoOfOperations, 1);
1075
w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1076
w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1077
// Add index attributes in increasing order and one PK attribute
1078
for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
1079
w.add(UtilPrepareReq::AttributeId, i);
1083
SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
1084
printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n");
1085
reader.printAll(ndbout);
1088
struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1089
sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1090
sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1091
sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1092
UtilPrepareReq::SignalLength, JBB,
1093
sectionsPtr, UtilPrepareReq::NoOfSections);
1096
// Turns one scanned row into a UTIL_EXECUTE_REQ for DBUTIL: copies the
// attribute header and data sections of the incoming signal into linear
// buffers inside the signal, renumbers the index attribute headers
// consecutively, sums the size of the remaining (key) attributes, and sends
// the header and data as sections of the execute request.
// NOTE(review): several lines (including the body of the NULL-attribute
// check and the final AttributeHeader::init argument) are not visible in this
// view; the printf loops below reference names declared further down, so they
// are presumably inside a disabled debug region - confirm.
void Trix::executeBuildInsertTransaction(Signal* signal,
1097
SubscriptionRecPtr subRecPtr)
1100
SubscriptionRecord* subRec = subRecPtr.p;
1101
UtilExecuteReq * utilExecuteReq =
1102
(UtilExecuteReq *)signal->getDataPtrSend();
1104
utilExecuteReq->senderRef = reference();
1105
utilExecuteReq->senderData = subRecPtr.i;
1106
utilExecuteReq->prepareId = subRec->prepareId;
1108
printf("Header size %u\n", headerPtr.sz);
1109
for(int i = 0; i < headerPtr.sz; i++)
1110
printf("H'%.8x ", headerBuffer[i]);
1113
printf("Data size %u\n", dataPtr.sz);
1114
for(int i = 0; i < dataPtr.sz; i++)
1115
printf("H'%.8x ", dataBuffer[i]);
1118
// Save scan result in linear buffers
1119
SectionHandle handle(this, signal);
1120
SegmentedSectionPtr headerPtr, dataPtr;
1122
handle.getSection(headerPtr, 0);
1123
handle.getSection(dataPtr, 1);
1125
// The linear copies are placed in the signal itself, starting at word 25;
// the data buffer follows directly after the header buffer.
Uint32* headerBuffer = signal->theData + 25;
1126
Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1128
copy(headerBuffer, headerPtr);
1129
copy(dataBuffer, dataPtr);
1130
releaseSections(handle);
1132
// Calculate packed key size
1133
Uint32 noOfKeyData = 0;
1134
for(Uint32 i = 0; i < headerPtr.sz; i++) {
1135
AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
1137
// Filter out NULL attributes
1138
if (keyAttrHead->isNULL())
1141
if (i < subRec->noOfIndexColumns)
1142
// Renumber index attributes in consecutive order
1143
keyAttrHead->setAttributeId(i);
1145
// Calculate total size of PK attribute
1146
noOfKeyData += keyAttrHead->getDataSize();
1148
// Increase expected CONF count
1149
subRec->expectedConf++;
1151
// Pack key attributes
1152
AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
1153
subRec->noOfIndexColumns,
1156
struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
1157
sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1158
// Header section: one header per index column plus one for the packed key.
sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =
1159
subRec->noOfIndexColumns + 1;
1160
sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1161
sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1162
sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1163
UtilExecuteReq::SignalLength, JBB,
1164
sectionsPtr, UtilExecuteReq::NoOfSections);
1167
// Forwards one scanned row to DBUTIL as a UTIL_EXECUTE_REQ for a reorg
// request. The takeOver word is split into scan info (low 18 bits) and
// take-over fragment (bits 20 and up), re-encoded with the TcKeyReq helpers
// and stored in the request's scanTakeOver field.
// NOTE(review): the third parameter, the declaration of scanInfo and the tail
// of the final sendSignal call are not visible in this view; the printf loops
// reference names not declared in the visible code, so they are presumably
// inside a disabled debug region - confirm.
void Trix::executeReorgTransaction(Signal* signal,
1168
SubscriptionRecPtr subRecPtr,
1172
SubscriptionRecord* subRec = subRecPtr.p;
1173
UtilExecuteReq * utilExecuteReq =
1174
(UtilExecuteReq *)signal->getDataPtrSend();
1176
const Uint32 tScanInfo = takeOver & 0x3FFFF;
1177
const Uint32 tTakeOverFragment = takeOver >> 20;
1180
TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
1181
TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
1182
TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
1183
utilExecuteReq->scanTakeOver = scanInfo;
1186
utilExecuteReq->senderRef = reference();
1187
utilExecuteReq->senderData = subRecPtr.i;
1188
utilExecuteReq->prepareId = subRec->prepareId;
1190
printf("Header size %u\n", headerPtr.sz);
1191
for(int i = 0; i < headerPtr.sz; i++)
1192
printf("H'%.8x ", headerBuffer[i]);
1195
printf("Data size %u\n", dataPtr.sz);
1196
for(int i = 0; i < dataPtr.sz; i++)
1197
printf("H'%.8x ", dataBuffer[i]);
1200
// Increase expected CONF count
1201
subRec->expectedConf++;
1203
SectionHandle handle(this, signal);
1204
sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1205
UtilExecuteReq::SignalLength, JBB,
1210
// Asks DBDIH for the current global checkpoint (WAIT_GCP_REQ with request
// type CurrentGCI) on behalf of the given subscription.
//
// signal    - signal object used to build and send the request
// subRecPtr - subscription record; its index is sent as senderData
// delay     - passed to sendSignalWithDelay for the delayed variant
//
// NOTE(review): both an immediate and a delayed send are visible; the
// condition choosing between them is not part of this extract (presumably
// it depends on delay being zero - confirm).
Trix::wait_gcp(Signal* signal, SubscriptionRecPtr subRecPtr, Uint32 delay)
1212
WaitGCPReq * req = (WaitGCPReq*)signal->getDataPtrSend();
1213
req->senderRef = reference();
1214
req->senderData = subRecPtr.i;
1215
req->requestType = WaitGCPReq::CurrentGCI;
1220
// Immediate request
sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1221
WaitGCPReq::SignalLength, JBB);
1226
// Delayed request
sendSignalWithDelay(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1227
delay, WaitGCPReq::SignalLength);
1232
// Handles WAIT_GCP_REF: looks up the subscription named by senderData and
// issues the request again via wait_gcp() with a delay argument of 100.
Trix::execWAIT_GCP_REF(Signal* signal)
1234
// Copy the REF by value; wait_gcp() writes a new request into the signal
WaitGCPRef ref = *(WaitGCPRef*)signal->getDataPtr();
1236
SubscriptionRecPtr subRecPtr;
1237
c_theSubscriptions.getPtr(subRecPtr, ref.senderData);
1238
wait_gcp(signal, subRecPtr, 100);
1242
// Handles WAIT_GCP_CONF: combines gci_hi/gci_lo into a 64-bit GCI and
// compares it with the GCI stored in the subscription record (m_gci).
// When the reported GCI is greater, buildComplete() is called; a further
// wait_gcp() with delay argument 100 is also visible.
//
// NOTE(review): the lines separating the two outcomes are missing from this
// extract; presumably the wait_gcp() retry is the else branch - confirm.
Trix::execWAIT_GCP_CONF(Signal* signal)
1244
WaitGCPConf * conf = (WaitGCPConf*)signal->getDataPtr();
1246
SubscriptionRecPtr subRecPtr;
1247
c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1249
const Uint32 gci_hi = conf->gci_hi;
1250
const Uint32 gci_lo = conf->gci_lo;
1251
// 64-bit GCI: high word in the upper 32 bits, low word in the lower 32 bits
const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
1253
if (gci > subRecPtr.p->m_gci)
1256
buildComplete(signal, subRecPtr);
1261
wait_gcp(signal, subRecPtr, 100);
1265
// Asks SUMA to remove the subscription belonging to this record by sending
// SUB_REMOVE_REQ with the record's subscription id and key. The record
// index is sent as senderData.
void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
1267
SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
1268
req->senderRef = reference();
1269
req->senderData = subRecPtr.i;
1270
req->subscriptionId = subRecPtr.p->subscriptionId;
1271
req->subscriptionKey = subRecPtr.p->subscriptionKey;
1272
sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
1273
SubRemoveReq::SignalLength, JBB);
1276
// Records a failure on the subscription record and accounts for one
// outstanding confirmation.
//
// signal    - signal object passed on to the helpers
// subRecPtr - subscription record that failed
// errorCode - stored in the record's errorCode field
//
// After decrementing expectedConf, checkParallelism() is called, and
// buildComplete() is called once expectedConf has reached zero.
void Trix::buildFailed(Signal* signal,
1277
SubscriptionRecPtr subRecPtr,
1278
BuildIndxRef::ErrorCode errorCode)
1280
SubscriptionRecord* subRec = subRecPtr.p;
1282
subRec->errorCode = errorCode;
1283
// Continue accumulating since we currently cannot stop SUMA
1284
subRec->expectedConf--;
1285
checkParallelism(signal, subRec);
1286
if (subRec->expectedConf == 0)
1287
buildComplete(signal, subRecPtr);
1291
// Signal handler for SUB_REMOVE_REF.
// NOTE(review): the body of this handler is not part of this extract.
Trix::execSUB_REMOVE_REF(Signal* signal){
1298
// Handles SUB_REMOVE_CONF for the subscription named by senderData.
//
// If the record still holds a prepared operation (prepareId != RNIL), a
// UTIL_RELEASE_REQ is sent to DBUTIL. A second path builds a
// UtilReleaseConf in the signal buffer and calls execUTIL_RELEASE_CONF()
// directly.
//
// NOTE(review): the line separating the two paths is missing from this
// extract; presumably the direct call is taken when there is no prepared
// operation to release - confirm.
Trix::execSUB_REMOVE_CONF(Signal* signal){
1301
SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
1303
SubscriptionRecPtr subRecPtr;
1304
c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1306
if(subRecPtr.p->prepareId != RNIL){
1309
UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
1310
req->prepareId = subRecPtr.p->prepareId;
1311
req->senderData = subRecPtr.i;
1313
sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
1314
UtilReleaseReq::SignalLength , JBB);
1319
// Fabricate the CONF locally and run the handler directly
UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1320
conf->senderData = subRecPtr.i;
1321
execUTIL_RELEASE_CONF(signal);
1326
// Signal handler for UTIL_RELEASE_REF.
// NOTE(review): the body of this handler is not part of this extract.
Trix::execUTIL_RELEASE_REF(Signal* signal){
1332
// Handles UTIL_RELEASE_CONF (also called directly from
// execSUB_REMOVE_CONF): finishes the request owning the subscription
// record named by senderData, dispatching on the record's requestType.
//
// Visible outcomes:
//  - reorg copy/delete: COPY_DATA_IMPL_CONF (plus an info event with the
//    processed row count) or COPY_DATA_IMPL_REF to the original sender
//  - index build: BUILD_INDX_IMPL_CONF (plus an info event) or
//    BUILD_INDX_IMPL_REF to the original sender
//  - index statistics requests: control is handed to statUtilReleaseConf(),
//    statCleanRelease() or statScanRelease()
// Finally the attribute-order buffer and the subscription record itself
// are released.
//
// NOTE(review): the case labels, else branches and any break/return
// statements are missing from this extract, so which paths reach the final
// release cannot be confirmed from here.
Trix::execUTIL_RELEASE_CONF(Signal* signal){
1334
UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1336
SubscriptionRecPtr subRecPtr;
1337
c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1339
switch(subRecPtr.p->requestType){
1342
if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1345
// Build is complete, reply to original sender
1346
CopyDataImplConf* conf = (CopyDataImplConf*)signal->getDataPtrSend();
1347
conf->senderRef = reference(); //wl3600_todo ok?
1348
conf->senderData = subRecPtr.p->connectionPtr;
1350
sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_CONF, signal,
1351
CopyDataImplConf::SignalLength , JBB);
1353
infoEvent("%s table %u processed %llu rows",
1354
subRecPtr.p->requestType == REORG_COPY ?
1355
"reorg-copy" : "reorg-delete",
1356
subRecPtr.p->sourceTableId,
1357
subRecPtr.p->m_rows_processed);
1360
// Build failed, reply to original sender
1361
CopyDataImplRef* ref = (CopyDataImplRef*)signal->getDataPtrSend();
1362
ref->senderRef = reference();
1363
ref->senderData = subRecPtr.p->connectionPtr;
1364
ref->errorCode = subRecPtr.p->errorCode;
1366
sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_REF, signal,
1367
CopyDataImplRef::SignalLength , JBB);
1371
if (subRecPtr.p->errorCode == BuildIndxRef::NoError) {
1373
// Build is complete, reply to original sender
1374
BuildIndxImplConf* buildIndxConf =
1375
(BuildIndxImplConf*)signal->getDataPtrSend();
1376
buildIndxConf->senderRef = reference(); //wl3600_todo ok?
1377
buildIndxConf->senderData = subRecPtr.p->connectionPtr;
1379
// NOTE(review): the payload is a BuildIndxImplConf but the length used is
// BuildIndxConf::SignalLength - confirm the two lengths are meant to match.
sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_CONF, signal,
1380
BuildIndxConf::SignalLength , JBB);
1382
infoEvent("index-build table %u index: %u processed %llu rows",
1383
subRecPtr.p->sourceTableId,
1384
subRecPtr.p->targetTableId,
1385
subRecPtr.p->m_rows_processed);
1388
// Build failed, reply to original sender
1389
BuildIndxImplRef* buildIndxRef =
1390
(BuildIndxImplRef*)signal->getDataPtrSend();
1391
buildIndxRef->senderRef = reference();
1392
buildIndxRef->senderData = subRecPtr.p->connectionPtr;
1393
buildIndxRef->errorCode = subRecPtr.p->errorCode;
1395
// NOTE(review): the payload is a BuildIndxImplRef but the length used is
// BuildIndxRef::SignalLength - confirm the two lengths are meant to match.
sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_REF, signal,
1396
BuildIndxRef::SignalLength , JBB);
1400
// Index statistics util operation: no error is expected at this point
ndbrequire(subRecPtr.p->errorCode == BuildIndxRef::NoError);
1401
statUtilReleaseConf(signal, subRecPtr.p->m_statPtrI);
1405
// Index statistics clean: prepared operation is gone, continue the clean
subRecPtr.p->prepareId = RNIL;
1406
StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1407
statCleanRelease(signal, stat);
1412
// Index statistics scan: prepared operation is gone, continue the scan
subRecPtr.p->prepareId = RNIL;
1413
StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1414
statScanRelease(signal, stat);
1419
// Release subscription record
1420
subRecPtr.p->attributeOrder.release();
1421
c_theSubscriptions.release(subRecPtr.i);
1424
// Releases a withheld SUB_SYNC_CONTINUE_CONF to SUMA.
//
// When the record has a pending SUB_SYNC_CONTINUE_CONF and expectedConf
// equals 1, the confirmation is sent (carrying the subscription id/key and
// the record's syncPtr as senderData) and the pending flag is cleared.
void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
1426
if ((subRec->pendingSubSyncContinueConf) &&
1427
(subRec->expectedConf == 1)) {
1429
SubSyncContinueConf * subSyncContinueConf =
1430
(SubSyncContinueConf *) signal->getDataPtrSend();
1431
subSyncContinueConf->subscriptionId = subRec->subscriptionId;
1432
subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
1433
subSyncContinueConf->senderData = subRec->syncPtr;
1434
sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
1435
SubSyncContinueConf::SignalLength , JBB);
1436
subRec->pendingSubSyncContinueConf = false;
1443
// Handles COPY_DATA_IMPL_REQ (table reorganisation copy/delete).
//
// Steps visible in this function:
//  1. Copy the request out of the signal (the signal buffer is reused for
//     the outgoing signals below).
//  2. Seize a subscription record; if that fails, release the incoming
//     sections and answer with COPY_DATA_IMPL_REF.
//  3. Initialise the subscription record from the request and map the
//     request type to REORG_COPY / REORG_DELETE.
//  4. Append the attribute lists from section 0 (stored count:
//     noOfIndexColumns) and section 1 (noOfKeyColumns) to attributeOrder.
//  5. Build a properties section describing one Write (copy) or Delete
//     operation on the target table and send UTIL_PREPARE_REQ to DBUTIL.
//
// NOTE(review): braces, else/break/return lines and some statements are
// missing from this extract; the comments describe only what is visible.
Trix::execCOPY_DATA_IMPL_REQ(Signal* signal)
1447
const CopyDataImplReq reqData = *(const CopyDataImplReq*)signal->getDataPtr();
1448
const CopyDataImplReq *req = &reqData;
1450
// Seize a subscription record
1451
SubscriptionRecPtr subRecPtr;
1452
SectionHandle handle(this, signal);
1454
if (!c_theSubscriptions.seize(subRecPtr))
1457
// Failed to allocate subscription record
1458
releaseSections(handle);
1460
// Cast spelled with the Impl name, matching the declared pointer type and
// the REF built in execUTIL_RELEASE_CONF
CopyDataImplRef* ref = (CopyDataImplRef*)signal->getDataPtrSend();
1462
ref->errorCode = -1; // XXX CopyDataImplRef::AllocationFailure;
1463
ref->senderData = req->senderData;
1464
ref->transId = req->transId;
1465
sendSignal(req->senderRef, GSN_COPY_DATA_IMPL_REF, signal,
1466
CopyDataImplRef::SignalLength, JBB);
1470
SubscriptionRecord* subRec = subRecPtr.p;
1471
subRec->errorCode = BuildIndxRef::NoError;
1472
subRec->userReference = req->senderRef;
1473
subRec->connectionPtr = req->senderData;
1474
subRec->schemaTransId = req->transId;
1475
subRec->subscriptionId = rand();
1476
subRec->subscriptionKey = rand();
1477
subRec->indexType = RNIL;
1478
subRec->sourceTableId = req->srcTableId;
1479
subRec->targetTableId = req->dstTableId;
1480
subRec->parallelism = 16;
1481
subRec->expectedConf = 0;
1482
subRec->subscriptionCreated = false;
1483
subRec->pendingSubSyncContinueConf = false;
1484
// NOTE(review): prepareId is seeded with the transaction id rather than
// RNIL; presumably it is overwritten once the prepare is confirmed - confirm.
subRec->prepareId = req->transId;
1485
subRec->fragCount = req->srcFragments;
1486
subRec->fragId = ZNIL;
1487
subRec->m_rows_processed = 0;
1488
subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
1490
switch(req->requestType){
1491
case CopyDataImplReq::ReorgCopy:
1493
subRec->requestType = REORG_COPY;
1495
case CopyDataImplReq::ReorgDelete:
1496
subRec->requestType = REORG_DELETE;
1499
jamLine(req->requestType);
1503
if (req->requestInfo & CopyDataReq::TupOrder)
1506
subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
1509
// Get column order segments
1510
Uint32 noOfSections = handle.m_cnt;
1511
if (noOfSections > 0) {
1513
SegmentedSectionPtr ptr;
1514
handle.getSection(ptr, 0);
1515
append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1516
subRec->noOfIndexColumns = ptr.sz;
1519
if (noOfSections > 1) {
1521
SegmentedSectionPtr ptr;
1522
handle.getSection(ptr, 1);
1523
append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1524
subRec->noOfKeyColumns = ptr.sz;
1527
releaseSections(handle);
1529
UtilPrepareReq * utilPrepareReq =
1530
(UtilPrepareReq *)signal->getDataPtrSend();
1532
utilPrepareReq->senderRef = reference();
1533
utilPrepareReq->senderData = subRecPtr.i;
1534
utilPrepareReq->schemaTransId = subRec->schemaTransId;
1536
const Uint32 pageSizeInWords = 128;
1537
Uint32 propPage[pageSizeInWords];
1538
// Writer capacity is tied to the buffer size instead of repeating the
// literal, so the two cannot drift apart
LinearWriter w(&propPage[0],pageSizeInWords);
1540
w.add(UtilPrepareReq::NoOfOperations, 1);
1541
if (subRec->requestType == REORG_COPY)
1543
w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1547
w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Delete);
1549
w.add(UtilPrepareReq::ScanTakeOverInd, 1);
1550
w.add(UtilPrepareReq::ReorgInd, 1);
1551
w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1553
// One AttributeId property per entry of the first attribute list
AttrOrderBuffer::DataBufferIterator iter;
1554
ndbrequire(subRec->attributeOrder.first(iter));
1556
for(Uint32 i = 0; i < subRec->noOfIndexColumns; i++)
1558
w.add(UtilPrepareReq::AttributeId, * iter.data);
1559
subRec->attributeOrder.next(iter);
1562
struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1563
sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1564
sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1565
sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1566
UtilPrepareReq::SignalLength, JBB,
1567
sectionsPtr, UtilPrepareReq::NoOfSections);
1574
// Returns a reference to the StatOp stored at index statPtrI in
// c_statOpPool. The index must not be RNIL (enforced by ndbrequire).
Trix::statOpGetPtr(Uint32 statPtrI)
1576
ndbrequire(statPtrI != RNIL);
1577
return *c_statOpPool.getPtr(statPtrI);
1581
// Allocates a StatOp together with a subscription record and links them.
//
// statPtrI - out: pool index of the seized StatOp
//
// Steps: seize a StatOp from c_statOpPool, fill its memory with 0xf3 and
// construct it in place, store its own index, then seize a subscription
// record. If the second seize fails the StatOp is released again. On
// success each object stores the other's pool index.
// Error inserts 18001 and 18002 force the respective seize to fail.
//
// NOTE(review): the return statements are missing from this extract; the
// caller tests the result with '!', so presumably false means failure.
Trix::statOpSeize(Uint32& statPtrI)
1584
if (ERROR_INSERTED(18001) ||
1585
!c_statOpPool.seize(statPtr))
1588
D("statOpSeize: seize statOp failed");
1592
// Poison the raw memory before constructing the object in place
memset(statPtr.p, 0xf3, sizeof(*statPtr.p));
1594
new (statPtr.p) StatOp;
1595
statPtrI = statPtr.i;
1596
StatOp& stat = statOpGetPtr(statPtrI);
1597
stat.m_ownPtrI = statPtrI;
1599
SubscriptionRecPtr subRecPtr;
1600
if (ERROR_INSERTED(18002) ||
1601
!c_theSubscriptions.seize(subRecPtr))
1604
// Undo the first seize
c_statOpPool.release(statPtr);
1605
D("statOpSeize: seize subRec failed");
1608
// Cross-link the StatOp and its subscription record
SubscriptionRecord* subRec = subRecPtr.p;
1609
subRec->m_statPtrI = stat.m_ownPtrI;
1610
stat.m_subRecPtrI = subRecPtr.i;
1612
D("statOpSeize" << V(statPtrI) << V(subRecPtr.i));
1617
// Releases a StatOp and, if one is linked, its subscription record.
//
// Both the subscription record's prepareId and the StatOp's
// util.m_prepareId must already be RNIL (enforced by ndbrequire), i.e. no
// prepared operation may still be held when this is called.
Trix::statOpRelease(StatOp& stat)
1619
StatOp::Util& util = stat.m_util;
1620
D("statOpRelease" << V(stat));
1622
if (stat.m_subRecPtrI != RNIL)
1625
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1626
ndbrequire(subRec->prepareId == RNIL);
1627
subRec->attributeOrder.release();
1628
c_theSubscriptions.release(stat.m_subRecPtrI);
1629
stat.m_subRecPtrI = RNIL;
1631
ndbrequire(util.m_prepareId == RNIL);
1632
c_statOpPool.release(stat.m_ownPtrI);
1636
// Handles INDEX_STAT_IMPL_REQ, the entry point for index statistics
// requests.
//
// Seizes a StatOp (replying through statOpRef() with NoFreeStatOp when
// that fails), records the request type and a readable request name for
// the cluster log, resets the linked subscription record, and then either
// starts the system-table metadata lookup (statMetaGetHead) or continues
// directly with statGetMetaDone.
//
// NOTE(review): break/else/return lines are missing from this extract;
// presumably statGetMetaDone() is the else branch of the
// c_statGetMetaDone test - confirm.
Trix::execINDEX_STAT_IMPL_REQ(Signal* signal)
1639
const IndexStatImplReq* req = (const IndexStatImplReq*)signal->getDataPtr();
1641
Uint32 statPtrI = RNIL;
1642
if (!statOpSeize(statPtrI))
1645
statOpRef(signal, req, IndexStatRef::NoFreeStatOp, __LINE__);
1648
StatOp& stat = statOpGetPtr(statPtrI);
1650
stat.m_requestType = req->requestType;
1652
// set request name for cluster log message
1653
switch (stat.m_requestType) {
1654
case IndexStatReq::RT_CLEAN_NEW:
1656
stat.m_requestName = "clean new";
1658
case IndexStatReq::RT_CLEAN_OLD:
1660
stat.m_requestName = "clean old";
1662
case IndexStatReq::RT_CLEAN_ALL:
1664
stat.m_requestName = "clean all";
1666
case IndexStatReq::RT_SCAN_FRAG:
1668
stat.m_requestName = "scan frag";
1670
case IndexStatReq::RT_DROP_HEAD:
1672
stat.m_requestName = "drop head";
1679
// Start with no prepared operation and no error on the linked record
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1680
subRec->prepareId = RNIL;
1681
subRec->errorCode = BuildIndxRef::NoError;
1683
// sys tables are not recreated so do this only once
1684
if (!c_statGetMetaDone)
1687
statMetaGetHead(signal, stat);
1690
statGetMetaDone(signal, stat);
1693
// sys tables metadata
1695
// Column descriptions for the index statistics head system table.
// NOTE(review): only some entries (and only part of each entry) are
// visible in this extract; the leading number looks like the column
// position - confirm against the SysColumn declaration.
const Trix::SysColumn
1696
Trix::g_statMetaHead_column[] = {
1700
{ 1, "index_version",
1709
{ 4, "value_format",
1712
{ 5, "sample_version",
1718
{ 7, "sample_count",
1726
// Column descriptions for the index statistics sample system table.
// NOTE(review): only some entries (and only part of each entry) are
// visible in this extract.
const Trix::SysColumn
1727
Trix::g_statMetaSample_column[] = {
1731
{ 1, "index_version",
1734
{ 2, "sample_version",
1745
// Description of the index statistics head system table: its full name
// (database/schema/table), the number of columns (computed from the column
// array) and a pointer to the column array.
const Trix::SysTable
1746
Trix::g_statMetaHead = {
1747
NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_HEAD_TABLE,
1749
sizeof(g_statMetaHead_column)/sizeof(g_statMetaHead_column[0]),
1750
g_statMetaHead_column
1753
// Description of the index statistics sample system table: its full name
// (database/schema/table), the number of columns (computed from the column
// array) and a pointer to the column array.
const Trix::SysTable
1754
Trix::g_statMetaSample = {
1755
NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_SAMPLE_TABLE,
1757
sizeof(g_statMetaSample_column)/sizeof(g_statMetaSample_column[0]),
1758
g_statMetaSample_column
1761
// Description of the ordered index on the sample system table. The name is
// a format string: the %u is replaced with the sample table id when the
// index is looked up (see statMetaGetSampleX1).
const Trix::SysIndex
1762
Trix::g_statMetaSampleX1 = {
1763
// indexes are always in "sys"
1764
"sys" "/" NDB_INDEX_STAT_SCHEMA "/%u/" NDB_INDEX_STAT_SAMPLE_INDEX1,
1770
// First metadata step: requests the table info of the head system table by
// name. statMetaGetHeadCB is registered as the callback, with the StatOp's
// own pool index as callback data.
Trix::statMetaGetHead(Signal* signal, StatOp& stat)
1772
D("statMetaGetHead" << V(stat));
1773
StatOp::Meta& meta = stat.m_meta;
1774
meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetHeadCB);
1775
meta.m_cb.m_callbackData = stat.m_ownPtrI;
1776
const char* name = g_statMetaHead.name;
1777
sendGetTabInfoReq(signal, stat, name);
1781
// Callback for the head table lookup.
//
// statPtrI - pool index of the StatOp
// ret      - result passed by the GET_TABINFO handlers (0 from the CONF
//            handler, the error code from the REF handler)
//
// The error path reports through statOpError() with a suppress list
// containing TableNotDefined. The success path stores the returned table id
// in g_statMetaHead and continues with the sample table lookup.
//
// NOTE(review): the test on ret and the return separating the two paths
// are missing from this extract.
Trix::statMetaGetHeadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
1783
StatOp& stat = statOpGetPtr(statPtrI);
1784
D("statMetaGetHeadCB" << V(stat) << V(ret));
1785
StatOp::Meta& meta = stat.m_meta;
1789
// Zero-terminated list of error codes handed to statOpError()
Uint32 supress[] = { GetTabInfoRef::TableNotDefined, 0 };
1790
statOpError(signal, stat, ret, __LINE__, supress);
1793
g_statMetaHead.tableId = meta.m_conf.tableId;
1794
statMetaGetSample(signal, stat);
1798
// Second metadata step: requests the table info of the sample system table
// by name, with statMetaGetSampleCB registered as the callback.
Trix::statMetaGetSample(Signal* signal, StatOp& stat)
1800
D("statMetaGetSample" << V(stat));
1801
StatOp::Meta& meta = stat.m_meta;
1802
meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleCB);
1803
meta.m_cb.m_callbackData = stat.m_ownPtrI;
1804
const char* name = g_statMetaSample.name;
1805
sendGetTabInfoReq(signal, stat, name);
1809
// Callback for the sample table lookup. The error path reports through
// statOpError(); the success path stores the returned table id in
// g_statMetaSample and continues with the lookup of the sample table index.
//
// NOTE(review): the test on ret and the return separating the two paths
// are missing from this extract.
Trix::statMetaGetSampleCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
1811
StatOp& stat = statOpGetPtr(statPtrI);
1812
D("statMetaGetSampleCB" << V(stat) << V(ret));
1813
StatOp::Meta& meta = stat.m_meta;
1817
statOpError(signal, stat, ret, __LINE__);
1820
g_statMetaSample.tableId = meta.m_conf.tableId;
1821
statMetaGetSampleX1(signal, stat);
1825
// Third metadata step: requests the table info of the index on the sample
// table. The index name is produced by formatting the name template of
// g_statMetaSampleX1 with the sample table id obtained in the previous
// step. statMetaGetSampleX1CB is registered as the callback.
Trix::statMetaGetSampleX1(Signal* signal, StatOp& stat)
1827
D("statMetaGetSampleX1" << V(stat));
1828
StatOp::Meta& meta = stat.m_meta;
1829
meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleX1CB);
1830
meta.m_cb.m_callbackData = stat.m_ownPtrI;
1831
const char* name_fmt = g_statMetaSampleX1.name;
1832
char name[MAX_TAB_NAME_SIZE];
1833
BaseString::snprintf(name, sizeof(name), name_fmt, g_statMetaSample.tableId);
1834
sendGetTabInfoReq(signal, stat, name);
1838
// Callback for the sample index lookup. The error path reports through
// statOpError(); the success path records the sample table id and the
// returned index id in g_statMetaSampleX1 and continues with
// statGetMetaDone().
//
// NOTE(review): the test on ret and the return separating the two paths
// are missing from this extract.
Trix::statMetaGetSampleX1CB(Signal* signal, Uint32 statPtrI, Uint32 ret)
1840
StatOp& stat = statOpGetPtr(statPtrI);
1841
D("statMetaGetSampleX1CB" << V(stat) << V(ret));
1842
StatOp::Meta& meta = stat.m_meta;
1846
statOpError(signal, stat, ret, __LINE__);
1849
g_statMetaSampleX1.tableId = g_statMetaSample.tableId;
1850
g_statMetaSampleX1.indexId = meta.m_conf.tableId;
1851
statGetMetaDone(signal, stat);
1855
// Sends GET_TABINFOREQ to DBDICT, looking a table up by name.
//
// signal - signal object used to build and send the request
// stat   - StatOp on whose behalf the lookup is made; its pool index is
//          sent as senderData
// name   - zero-terminated table name; including the terminator it must fit
//          in 32 words (128 bytes), enforced by ndbrequire
//
// The name is sent as a single section, zero-padded to whole words. The
// request asks for a long-signal CONF (LongSignalConf).
Trix::sendGetTabInfoReq(Signal* signal, StatOp& stat, const char* name)
1857
D("sendGetTabInfoReq" << V(stat) << V(name));
1858
GetTabInfoReq* req = (GetTabInfoReq*)signal->getDataPtrSend();
1860
// Length in bytes includes the terminating zero; round up to whole words
Uint32 name_len = (Uint32)strlen(name) + 1;
1861
Uint32 name_len_words = (name_len + 3 ) / 4;
1862
Uint32 name_buf[32];
1863
ndbrequire(name_len_words <= 32);
1864
memset(name_buf, 0, sizeof(name_buf));
1865
memcpy(name_buf, name, name_len);
1867
req->senderData = stat.m_ownPtrI;
1868
req->senderRef = reference();
1869
req->requestType = GetTabInfoReq::RequestByName |
1870
GetTabInfoReq::LongSignalConf;
1871
req->tableNameLen = name_len;
1872
req->schemaTransId = 0;
1873
// Only the first section pointer is used
LinearSectionPtr ptr[3];
1874
ptr[0].p = name_buf;
1875
ptr[0].sz = name_len_words;
1876
sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ,
1877
signal, GetTabInfoReq::SignalLength, JBB, ptr, 1);
1881
// Handles GET_TABINFO_CONF: once the signal is fully assembled, the CONF
// is copied into the StatOp's meta state, the attached sections are
// released unread, and the registered metadata callback is executed with
// result 0.
//
// NOTE(review): the body of the assembleFragments() test is missing from
// this extract; presumably the handler returns until all fragments have
// arrived.
Trix::execGET_TABINFO_CONF(Signal* signal)
1884
if (!assembleFragments(signal)) {
1888
const GetTabInfoConf* conf = (const GetTabInfoConf*)signal->getDataPtr();
1889
StatOp& stat = statOpGetPtr(conf->senderData);
1890
D("execGET_TABINFO_CONF" << V(stat));
1891
StatOp::Meta& meta = stat.m_meta;
1892
meta.m_conf = *conf;
1894
// do not need DICTTABINFO
1895
SectionHandle handle(this, signal);
1896
releaseSections(handle);
1898
execute(signal, meta.m_cb, 0);
1902
// Handles GET_TABINFO_REF: executes the registered metadata callback with
// the error code from the REF. The error code must be non-zero (enforced by
// ndbrequire), since 0 is what the CONF handler passes to the callback.
Trix::execGET_TABINFO_REF(Signal* signal)
1905
const GetTabInfoRef* ref = (const GetTabInfoRef*)signal->getDataPtr();
1906
StatOp& stat = statOpGetPtr(ref->senderData);
1907
D("execGET_TABINFO_REF" << V(stat));
1908
StatOp::Meta& meta = stat.m_meta;
1910
ndbrequire(ref->errorCode != 0);
1911
execute(signal, meta.m_cb, ref->errorCode);
1914
// continue after metadata retrieval
1917
// Continues a statistics request once system-table metadata is available.
// Marks the subscription record as a STAT_UTIL request, copies the constant
// identifying fields (index id, index version, fragment count) from the
// stored request into the StatOp data, and starts reading the head row.
// The request's fragCount must be non-zero (enforced by ndbrequire).
Trix::statGetMetaDone(Signal* signal, StatOp& stat)
1919
const IndexStatImplReq* req = &stat.m_req;
1920
StatOp::Data& data = stat.m_data;
1921
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1922
D("statGetMetaDone" << V(stat));
1924
// c_statGetMetaDone = true;
1926
subRec->requestType = STAT_UTIL;
1927
// fill in constant part
1928
ndbrequire(req->fragCount != 0);
1929
data.m_indexId = req->indexId;
1930
data.m_indexVersion = req->indexVersion;
1931
data.m_fragCount = req->fragCount;
1932
statHeadRead(signal, stat);
1938
// Starts a Read of the head table row via the util prepare/execute/release
// sequence. Clears the not-found flag first and registers statHeadReadCB
// as the completion callback.
Trix::statHeadRead(Signal* signal, StatOp& stat)
1940
StatOp::Util& util = stat.m_util;
1941
StatOp::Send& send = stat.m_send;
1942
D("statHeadRead" << V(stat));
1944
util.m_not_found = false;
1945
util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadReadCB);
1946
util.m_cb.m_callbackData = stat.m_ownPtrI;
1947
send.m_sysTable = &g_statMetaHead;
1948
send.m_operationType = UtilPrepareReq::Read;
1949
statUtilPrepare(signal, stat);
1953
// Completion callback for the head row read. ret must be 0. Records
// whether the head row exists (the inverse of the util not-found flag) and
// continues with statReadHeadDone().
Trix::statHeadReadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
1955
StatOp& stat = statOpGetPtr(statPtrI);
1956
StatOp::Data& data = stat.m_data;
1957
StatOp::Util& util = stat.m_util;
1958
D("statHeadReadCB" << V(stat) << V(ret));
1960
ndbrequire(ret == 0);
1961
data.m_head_found = !util.m_not_found;
1962
statReadHeadDone(signal, stat);
1966
// Starts an Insert of the head table row via the util sequence, with
// statHeadInsertCB registered as the completion callback.
Trix::statHeadInsert(Signal* signal, StatOp& stat)
1968
StatOp::Util& util = stat.m_util;
1969
StatOp::Send& send = stat.m_send;
1970
D("statHeadInsert" << V(stat));
1972
util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadInsertCB);
1973
util.m_cb.m_callbackData = stat.m_ownPtrI;
1974
send.m_sysTable = &g_statMetaHead;
1975
send.m_operationType = UtilPrepareReq::Insert;
1976
statUtilPrepare(signal, stat);
1980
// Completion callback for the head row insert. ret must be 0; continues
// with statInsertHeadDone().
Trix::statHeadInsertCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
1982
StatOp& stat = statOpGetPtr(statPtrI);
1983
D("statHeadInsertCB" << V(stat) << V(ret));
1985
ndbrequire(ret == 0);
1986
statInsertHeadDone(signal, stat);
1990
// Starts an Update of the head table row via the util sequence, with
// statHeadUpdateCB registered as the completion callback.
Trix::statHeadUpdate(Signal* signal, StatOp& stat)
1992
StatOp::Util& util = stat.m_util;
1993
StatOp::Send& send = stat.m_send;
1994
D("statHeadUpdate" << V(stat));
1996
util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadUpdateCB);
1997
util.m_cb.m_callbackData = stat.m_ownPtrI;
1998
send.m_sysTable = &g_statMetaHead;
1999
send.m_operationType = UtilPrepareReq::Update;
2000
statUtilPrepare(signal, stat);
2004
// Completion callback for the head row update. ret must be 0; continues
// with statUpdateHeadDone().
Trix::statHeadUpdateCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2006
StatOp& stat = statOpGetPtr(statPtrI);
2007
D("statHeadUpdateCB" << V(stat) << V(ret));
2009
ndbrequire(ret == 0);
2010
statUpdateHeadDone(signal, stat);
2014
// Starts a Delete of the head table row via the util sequence, with
// statHeadDeleteCB registered as the completion callback.
Trix::statHeadDelete(Signal* signal, StatOp& stat)
2016
StatOp::Util& util = stat.m_util;
2017
StatOp::Send& send = stat.m_send;
2018
D("statHeadDelete" << V(stat));
2020
util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadDeleteCB);
2021
util.m_cb.m_callbackData = stat.m_ownPtrI;
2022
send.m_sysTable = &g_statMetaHead;
2023
send.m_operationType = UtilPrepareReq::Delete;
2024
statUtilPrepare(signal, stat);
2028
// Completion callback for the head row delete. ret must be 0; continues
// with statDeleteHeadDone().
Trix::statHeadDeleteCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2030
StatOp& stat = statOpGetPtr(statPtrI);
2031
D("statHeadDeleteCB" << V(stat) << V(ret));
2033
ndbrequire(ret == 0);
2034
statDeleteHeadDone(signal, stat);
2037
// util (PK ops, only HEAD for now)
2040
// First step of the util sequence: resets the stored prepare id to RNIL
// and sends the prepare request for the operation described in stat.m_send.
Trix::statUtilPrepare(Signal* signal, StatOp& stat)
2042
StatOp::Util& util = stat.m_util;
2043
D("statUtilPrepare" << V(stat));
2045
util.m_prepareId = RNIL;
2046
statSendPrepare(signal, stat);
2050
// Handles a successful util prepare for a StatOp: stores the prepare id
// from the CONF and proceeds to execute the prepared operation.
//
// Error inserts 18011 (for Read operations) and 18012 (for all other
// operations) simulate a failing execute instead: the error insert is
// cleared and a UTIL_EXECUTE_REF with AllocationError is sent to this
// block itself.
//
// NOTE(review): the return/else separating the simulated failure from the
// normal statUtilExecute() call is missing from this extract.
Trix::statUtilPrepareConf(Signal* signal, Uint32 statPtrI)
2052
StatOp& stat = statOpGetPtr(statPtrI);
2053
StatOp::Util& util = stat.m_util;
2054
StatOp::Send& send = stat.m_send;
2055
D("statUtilPrepareConf" << V(stat));
2057
const UtilPrepareConf* utilConf =
2058
(const UtilPrepareConf*)signal->getDataPtr();
2059
util.m_prepareId = utilConf->prepareId;
2061
const Uint32 ot = send.m_operationType;
2062
if ((ERROR_INSERTED(18011) && ot == UtilPrepareReq::Read) ||
2063
(ERROR_INSERTED(18012) && ot != UtilPrepareReq::Read))
2066
CLEAR_ERROR_INSERT_VALUE;
2067
UtilExecuteRef* utilRef =
2068
(UtilExecuteRef*)signal->getDataPtrSend();
2069
utilRef->senderData = stat.m_ownPtrI;
2070
utilRef->errorCode = UtilExecuteRef::AllocationError;
2071
utilRef->TCErrorCode = 0;
2072
sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2073
signal, UtilExecuteRef::SignalLength, JBB);
2077
statUtilExecute(signal, stat);
2081
// Handles a failed util prepare for a StatOp. The DBUTIL error code (which
// must be non-zero) is translated into an IndexStatRef code where a mapping
// is visible - the seize errors become BusyUtilPrepare and
// DICT_TAB_INFO_ERROR becomes InvalidSysTable - and the result is reported
// through statOpError().
//
// NOTE(review): break statements and the handling of
// MISSING_PROPERTIES_SECTION and any default case are missing from this
// extract.
Trix::statUtilPrepareRef(Signal* signal, Uint32 statPtrI)
2083
StatOp& stat = statOpGetPtr(statPtrI);
2084
D("statUtilPrepareRef" << V(stat));
2086
const UtilPrepareRef* utilRef =
2087
(const UtilPrepareRef*)signal->getDataPtr();
2088
Uint32 errorCode = utilRef->errorCode;
2089
ndbrequire(errorCode != 0);
2091
switch (errorCode) {
2092
case UtilPrepareRef::PREPARE_SEIZE_ERROR:
2093
case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
2094
case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
2095
errorCode = IndexStatRef::BusyUtilPrepare;
2097
case UtilPrepareRef::DICT_TAB_INFO_ERROR:
2098
errorCode = IndexStatRef::InvalidSysTable;
2100
case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
2105
statOpError(signal, stat, errorCode, __LINE__);
2109
// Second step of the util sequence: copies the stored prepare id into the
// send state and sends the execute request.
Trix::statUtilExecute(Signal* signal, StatOp& stat)
2111
StatOp::Util& util = stat.m_util;
2112
StatOp::Send& send = stat.m_send;
2113
D("statUtilExecute" << V(stat));
2115
send.m_prepareId = util.m_prepareId;
2116
statSendExecute(signal, stat);
2120
// Handles a successful util execute for a StatOp.
//
// For Read operations the two result sections (section 0: attribute
// headers, section 1: attribute data) are copied into the local buffers
// rattr and rdata, the StatOp's attr state is pointed at them (capacities
// 20 and 2048), and statDataIn() is called once per column of the system
// table to take the values in. In all cases the sequence then continues
// with statUtilRelease().
//
// NOTE(review): the declarations of rattr/rdata and the braces delimiting
// the Read branch are missing from this extract.
Trix::statUtilExecuteConf(Signal* signal, Uint32 statPtrI)
2122
StatOp& stat = statOpGetPtr(statPtrI);
2123
StatOp::Attr& attr = stat.m_attr;
2124
StatOp::Send& send = stat.m_send;
2125
D("statUtilExecuteConf" << V(stat));
2127
if (send.m_operationType == UtilPrepareReq::Read)
2130
SectionHandle handle(this, signal);
2133
attr.m_attr = rattr;
2134
attr.m_attrMax = 20;
2135
attr.m_attrSize = 0;
2136
attr.m_data = rdata;
2137
attr.m_dataMax = 2048;
2138
attr.m_dataSize = 0;
2140
SegmentedSectionPtr ssPtr;
2141
handle.getSection(ssPtr, 0);
2142
::copy(rattr, ssPtr);
2145
SegmentedSectionPtr ssPtr;
2146
handle.getSection(ssPtr, 1);
2147
::copy(rdata, ssPtr);
2149
releaseSections(handle);
2151
const SysTable& sysTable = *send.m_sysTable;
2152
for (Uint32 i = 0; i < sysTable.columnCount; i++)
2155
statDataIn(stat, i);
2159
statUtilRelease(signal, stat);
2163
// Handles a failed util execute for a StatOp.
//
// The DBUTIL error code (non-zero) is examined:
//  - TCError: the TC error code (non-zero) is used instead; for a Read that
//    failed with ZNOT_FOUND the util not-found flag is set
//  - AllocationError: mapped to IndexStatRef::BusyUtilExecute
// Both a statOpError() report and a statUtilRelease() continuation are
// visible.
//
// NOTE(review): the break/return lines and the test deciding between the
// error report and the release are missing from this extract; presumably
// the not-found case is not treated as an error - confirm.
Trix::statUtilExecuteRef(Signal* signal, Uint32 statPtrI)
2165
StatOp& stat = statOpGetPtr(statPtrI);
2166
StatOp::Util& util = stat.m_util;
2167
StatOp::Send& send = stat.m_send;
2168
D("statUtilExecuteRef" << V(stat));
2170
const UtilExecuteRef* utilRef =
2171
(const UtilExecuteRef*)signal->getDataPtr();
2172
Uint32 errorCode = utilRef->errorCode;
2173
ndbrequire(errorCode != 0);
2175
switch (errorCode) {
2176
case UtilExecuteRef::TCError:
2177
errorCode = utilRef->TCErrorCode;
2178
ndbrequire(errorCode != 0);
2179
if (send.m_operationType == UtilPrepareReq::Read &&
2180
errorCode == ZNOT_FOUND)
2183
util.m_not_found = true;
2187
case UtilExecuteRef::AllocationError:
2188
errorCode = IndexStatRef::BusyUtilExecute;
2198
statOpError(signal, stat, errorCode, __LINE__);
2201
statUtilRelease(signal, stat);
2205
// Third step of the util sequence: copies the stored prepare id into the
// send state and sends the release request for the prepared operation.
Trix::statUtilRelease(Signal* signal, StatOp& stat)
2207
StatOp::Util& util = stat.m_util;
2208
StatOp::Send& send = stat.m_send;
2209
D("statUtilRelease" << V(stat));
2211
send.m_prepareId = util.m_prepareId;
2212
statSendRelease(signal, stat);
2216
// End of the util sequence: marks the prepared operation as released
// (prepare id reset to RNIL) and executes the callback registered by the
// head-table operation with result 0.
Trix::statUtilReleaseConf(Signal* signal, Uint32 statPtrI)
2218
StatOp& stat = statOpGetPtr(statPtrI);
2219
StatOp::Util& util = stat.m_util;
2220
D("statUtilReleaseConf" << V(stat));
2222
util.m_prepareId = RNIL;
2223
execute(signal, util.m_cb, 0);
2226
// continue after head table ops
2229
// Continues a request after the head row has been read, dispatching on
// the request type: the three clean requests go to statCleanBegin(), a
// fragment scan goes to statScanBegin(), and a head drop goes to
// statDropBegin().
//
// NOTE(review): break statements and any default case are missing from
// this extract.
Trix::statReadHeadDone(Signal* signal, StatOp& stat)
2231
//UNUSED StatOp::Data& data = stat.m_data;
2232
D("statReadHeadDone" << V(stat));
2234
switch (stat.m_requestType) {
2235
case IndexStatReq::RT_CLEAN_NEW:
2237
case IndexStatReq::RT_CLEAN_OLD:
2239
case IndexStatReq::RT_CLEAN_ALL:
2241
statCleanBegin(signal, stat);
2244
case IndexStatReq::RT_SCAN_FRAG:
2246
statScanBegin(signal, stat);
2249
case IndexStatReq::RT_DROP_HEAD:
2251
statDropBegin(signal, stat);
2261
// Continues a request after the head row has been inserted. The only
// request type visible here is a fragment scan, which ends via
// statScanEnd().
Trix::statInsertHeadDone(Signal* signal, StatOp& stat)
2263
D("statInsertHeadDone" << V(stat));
2265
switch (stat.m_requestType) {
2266
case IndexStatReq::RT_SCAN_FRAG:
2268
statScanEnd(signal, stat);
2277
// Continues a request after the head row has been updated. The only
// request type visible here is a fragment scan, which ends via
// statScanEnd().
Trix::statUpdateHeadDone(Signal* signal, StatOp& stat)
2279
D("statUpdateHeadDone" << V(stat));
2281
switch (stat.m_requestType) {
2282
case IndexStatReq::RT_SCAN_FRAG:
2284
statScanEnd(signal, stat);
2293
// Continues a request after the head row has been deleted. The only
// request type visible here is a head drop, which ends via statDropEnd().
Trix::statDeleteHeadDone(Signal* signal, StatOp& stat)
2295
D("statDeleteHeadDone" << V(stat));
2297
switch (stat.m_requestType) {
2298
case IndexStatReq::RT_DROP_HEAD:
2300
statDropEnd(signal, stat);
2311
// Starts a clean request after the head row has been read.
//
// If a head row was found and its table id differs from the requested one,
// anything other than a clean-all request is rejected with
// InvalidSysTableData. A request that is not already clean-all can also be
// converted into clean-all (this "happens normally on first stats scan").
// Finally the clean is prepared via statCleanPrepare().
//
// NOTE(review): braces, else and return lines are missing from this
// extract; presumably the conversion to clean-all is the branch taken when
// no head row was found - confirm.
Trix::statCleanBegin(Signal* signal, StatOp& stat)
2313
const IndexStatImplReq* req = &stat.m_req;
2314
StatOp::Data& data = stat.m_data;
2315
D("statCleanBegin" << V(stat));
2317
if (data.m_head_found == true)
2320
if (data.m_tableId != req->tableId &&
2321
stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2324
// must run ndb_index_stat --drop
2325
statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2331
if (stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2334
// happens normally on first stats scan
2335
stat.m_requestType = IndexStatReq::RT_CLEAN_ALL;
2338
statCleanPrepare(signal, stat);
2342
// Prepares the deletion of sample rows for a clean request.
//
// Steps visible in this function:
//  1. Reset the deleted-row counter.
//  2. Re-initialise the linked subscription record as a STAT_CLEAN request
//     whose source is the index on the sample table, and store the list of
//     attributes to read (ao_list) in its attributeOrder buffer.
//  3. Build the index bounds: equality on attribute 0 (index id) and
//     attribute 1 (index version), plus - for clean-new / clean-old - a
//     third bound on attribute 2 (sample version). Each bound takes three
//     words (bound type, attribute header, value), hence
//     m_boundSize = 3 * m_boundCount.
//  4. Send the prepare for a Delete on the sample table.
//
// The request's fragId must be ZNIL (enforced by ndbrequire).
//
// NOTE(review): parts of ao_list, the break statements and any default
// case of the switch are missing from this extract.
Trix::statCleanPrepare(Signal* signal, StatOp& stat)
2344
const IndexStatImplReq* req = &stat.m_req;
2345
StatOp::Data& data = stat.m_data;
2346
StatOp::Clean& clean = stat.m_clean;
2347
StatOp::Send& send = stat.m_send;
2348
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2349
D("statCleanPrepare" << V(stat));
2351
// count of deleted samples is just for info
2352
clean.m_cleanCount = 0;
2354
const Uint32 ao_list[] = {
2357
2, // SAMPLE_VERSION
2360
const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2362
ndbrequire(req->fragId == ZNIL);
2363
subRec->m_flags = 0;
2364
subRec->requestType = STAT_CLEAN;
2365
subRec->schemaTransId = req->transId;
2366
subRec->userReference = 0; // not used
2367
subRec->connectionPtr = RNIL;
2368
subRec->subscriptionId = rand();
2369
subRec->subscriptionKey = rand();
2370
subRec->prepareId = RNIL;
2371
subRec->indexType = 0; // not used
2372
// The rows are found through the index on the sample table
subRec->sourceTableId = g_statMetaSampleX1.indexId;
2373
subRec->targetTableId = RNIL;
2374
subRec->noOfIndexColumns = ao_size;
2375
subRec->noOfKeyColumns = 0;
2376
subRec->parallelism = 16;
2377
subRec->fragCount = 0;
2378
subRec->fragId = ZNIL;
2379
subRec->syncPtr = RNIL;
2380
subRec->errorCode = BuildIndxRef::NoError;
2381
subRec->subscriptionCreated = false;
2382
subRec->pendingSubSyncContinueConf = false;
2383
subRec->expectedConf = 0;
2384
subRec->m_rows_processed = 0;
2387
// The buffer must be empty before the attribute list is stored
AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2388
ndbrequire(ao_buf.isEmpty());
2389
ao_buf.append(ao_list, ao_size);
2391
// create TUX bounds
2392
clean.m_bound[0] = TuxBoundInfo::BoundEQ;
2393
clean.m_bound[1] = AttributeHeader(0, 4).m_value;
2394
clean.m_bound[2] = data.m_indexId;
2395
clean.m_bound[3] = TuxBoundInfo::BoundEQ;
2396
clean.m_bound[4] = AttributeHeader(1, 4).m_value;
2397
clean.m_bound[5] = data.m_indexVersion;
2398
switch (stat.m_requestType) {
2399
case IndexStatReq::RT_CLEAN_NEW:
2400
D("statCleanPrepare delete sample versions > " << data.m_sampleVersion);
2401
clean.m_bound[6] = TuxBoundInfo::BoundLT;
2402
clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2403
clean.m_bound[8] = data.m_sampleVersion;
2404
clean.m_boundCount = 3;
2406
case IndexStatReq::RT_CLEAN_OLD:
2407
D("statCleanPrepare delete sample versions < " << data.m_sampleVersion);
2408
clean.m_bound[6] = TuxBoundInfo::BoundGT;
2409
clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2410
clean.m_bound[8] = data.m_sampleVersion;
2411
clean.m_boundCount = 3;
2413
case IndexStatReq::RT_CLEAN_ALL:
2414
D("statCleanPrepare delete all sample versions");
2415
clean.m_boundCount = 2;
2421
clean.m_boundSize = 3 * clean.m_boundCount;
2423
// TRIX traps the CONF
2424
send.m_sysTable = &g_statMetaSample;
2425
send.m_operationType = UtilPrepareReq::Delete;
2426
statSendPrepare(signal, stat);
2430
// Processes one sample row delivered during a clean and issues the delete
// for it.
//
// The incoming signal must carry exactly two sections:
//  - ATTR_INFO: four attribute headers, for attributes 0..3; the first
//    three must each have data size 1 and the fourth (the stat key) must
//    be non-empty
//  - AFTER_VALUES: the values - index id, index version, sample version and
//    the stat key - at most 3 + MAX_INDEX_STAT_KEY_SIZE words
// Index id and version must match the StatOp; sample version and a pointer
// to the key are stored in the StatOp data. The key starts with a 2-byte
// little-endian length, and the key's word count must agree with it.
//
// Error inserts 18021/18022/18023 (for clean-new/old/all) simulate a failed
// delete by sending a UTIL_EXECUTE_REF with TC error 626 to this block.
// Otherwise a Delete on the sample table is executed with the subscription
// record's prepare id. Either way expectedConf is incremented.
//
// NOTE(review): braces, return lines and the declaration/fill of the av
// buffer are missing from this extract; the comments describe only what is
// visible.
Trix::statCleanExecute(Signal* signal, StatOp& stat)
2432
StatOp::Data& data = stat.m_data;
2433
StatOp::Send& send = stat.m_send;
2434
StatOp::Clean& clean = stat.m_clean;
2435
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2436
D("statCleanExecute" << V(stat));
2438
SectionHandle handle(this, signal);
2439
ndbrequire(handle.m_cnt == 2);
2442
AttributeHeader ah[4];
2443
SegmentedSectionPtr ptr0;
2444
handle.getSection(ptr0, SubTableData::ATTR_INFO);
2445
ndbrequire(ptr0.sz == 4);
2446
::copy((Uint32*)ah, ptr0);
2447
ndbrequire(ah[0].getAttributeId() == 0 && ah[0].getDataSize() == 1);
2448
ndbrequire(ah[1].getAttributeId() == 1 && ah[1].getDataSize() == 1);
2449
ndbrequire(ah[2].getAttributeId() == 2 && ah[2].getDataSize() == 1);
2450
// read via TUP rounds bytes to words
2451
const Uint32 kz = ah[3].getDataSize();
2452
ndbrequire(ah[3].getAttributeId() == 3 && kz != 0);
2455
const Uint32 avmax = 3 + MAX_INDEX_STAT_KEY_SIZE;
2457
SegmentedSectionPtr ptr1;
2458
handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2459
ndbrequire(ptr1.sz <= avmax);
2461
ndbrequire(data.m_indexId == av[0]);
2462
ndbrequire(data.m_indexVersion == av[1]);
2463
data.m_sampleVersion = av[2];
2464
data.m_statKey = &av[3];
2465
// Decode the length prefix through unsigned bytes: with a signed plain
// char, a length byte >= 0x80 would be sign-extended and corrupt kb
const unsigned char* kp = (const unsigned char*)data.m_statKey;
2466
const Uint32 kb = kp[0] + (kp[1] << 8);
2468
ndbrequire(kb != 0);
2469
// 2 length bytes + kb key bytes, rounded up to words, must equal the
// attribute's data size
ndbrequire(kz == ((2 + kb) + 3) / 4);
2471
clean.m_cleanCount++;
2472
releaseSections(handle);
2474
const Uint32 rt = stat.m_requestType;
2475
if ((ERROR_INSERTED(18021) && rt == IndexStatReq::RT_CLEAN_NEW) ||
2476
(ERROR_INSERTED(18022) && rt == IndexStatReq::RT_CLEAN_OLD) ||
2477
(ERROR_INSERTED(18023) && rt == IndexStatReq::RT_CLEAN_ALL))
2480
CLEAR_ERROR_INSERT_VALUE;
2481
UtilExecuteRef* utilRef =
2482
(UtilExecuteRef*)signal->getDataPtrSend();
2483
utilRef->senderData = stat.m_ownPtrI;
2484
utilRef->errorCode = UtilExecuteRef::TCError;
2485
utilRef->TCErrorCode = 626;
2486
sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2487
signal, UtilExecuteRef::SignalLength, JBB);
2488
subRec->expectedConf++;
2492
// TRIX traps the CONF
2493
send.m_sysTable = &g_statMetaSample;
2494
send.m_operationType = UtilPrepareReq::Delete;
2495
send.m_prepareId = subRec->prepareId;
2496
subRec->expectedConf++;
2497
statSendExecute(signal, stat);
2501
Trix::statCleanRelease(Signal* signal, StatOp& stat)
2503
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2504
D("statCleanRelease" << V(stat) << V(subRec->errorCode));
2506
if (subRec->errorCode != 0)
2509
statOpError(signal, stat, subRec->errorCode, __LINE__);
2512
statCleanEnd(signal, stat);
2516
Trix::statCleanEnd(Signal* signal, StatOp& stat)
2518
D("statCleanEnd" << V(stat));
2519
statOpSuccess(signal, stat);
2525
Trix::statScanBegin(Signal* signal, StatOp& stat)
2527
const IndexStatImplReq* req = &stat.m_req;
2528
StatOp::Data& data = stat.m_data;
2529
D("statScanBegin" << V(stat));
2531
if (data.m_head_found == true &&
2532
data.m_tableId != req->tableId)
2535
statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2538
data.m_tableId = req->tableId;
2539
statScanPrepare(signal, stat);
2543
Trix::statScanPrepare(Signal* signal, StatOp& stat)
2545
const IndexStatImplReq* req = &stat.m_req;
2546
StatOp::Data& data = stat.m_data;
2547
StatOp::Scan& scan = stat.m_scan;
2548
StatOp::Send& send = stat.m_send;
2549
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2550
D("statScanPrepare" << V(stat));
2552
// update sample version prior to scan
2553
if (data.m_head_found == false)
2554
data.m_sampleVersion = 0;
2555
data.m_sampleVersion += 1;
2558
scan.m_sampleCount = 0;
2559
scan.m_keyBytes = 0;
2561
const Uint32 ao_list[] = {
2562
AttributeHeader::INDEX_STAT_KEY,
2563
AttributeHeader::INDEX_STAT_VALUE
2565
const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2567
ndbrequire(req->fragId != ZNIL);
2568
subRec->m_flags = 0;
2569
subRec->requestType = STAT_SCAN;
2570
subRec->schemaTransId = req->transId;
2571
subRec->userReference = 0; // not used
2572
subRec->connectionPtr = RNIL;
2573
subRec->subscriptionId = rand();
2574
subRec->subscriptionKey = rand();
2575
subRec->prepareId = RNIL;
2576
subRec->indexType = 0; // not used
2577
subRec->sourceTableId = data.m_indexId;
2578
subRec->targetTableId = RNIL;
2579
subRec->noOfIndexColumns = ao_size;
2580
subRec->noOfKeyColumns = 0;
2581
subRec->parallelism = 16;
2582
subRec->fragCount = 0; // XXX Suma currently checks all frags
2583
subRec->fragId = req->fragId;
2584
subRec->syncPtr = RNIL;
2585
subRec->errorCode = BuildIndxRef::NoError;
2586
subRec->subscriptionCreated = false;
2587
subRec->pendingSubSyncContinueConf = false;
2588
subRec->expectedConf = 0;
2589
subRec->m_rows_processed = 0;
2592
AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2593
ndbrequire(ao_buf.isEmpty());
2594
ao_buf.append(ao_list, ao_size);
2596
// TRIX traps the CONF
2597
send.m_sysTable = &g_statMetaSample;
2598
send.m_operationType = UtilPrepareReq::Insert;
2599
statSendPrepare(signal, stat);
2603
Trix::statScanExecute(Signal* signal, StatOp& stat)
2605
StatOp::Data& data = stat.m_data;
2606
StatOp::Scan& scan = stat.m_scan;
2607
StatOp::Send& send = stat.m_send;
2608
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2609
D("statScanExecute" << V(stat));
2611
SectionHandle handle(this, signal);
2612
ndbrequire(handle.m_cnt == 2);
2615
AttributeHeader ah[2];
2616
SegmentedSectionPtr ptr0;
2617
handle.getSection(ptr0, SubTableData::ATTR_INFO);
2618
ndbrequire(ptr0.sz == 2);
2619
::copy((Uint32*)ah, ptr0);
2620
ndbrequire(ah[0].getAttributeId() == AttributeHeader::INDEX_STAT_KEY);
2621
ndbrequire(ah[1].getAttributeId() == AttributeHeader::INDEX_STAT_VALUE);
2622
// read via TUP rounds bytes to words
2623
const Uint32 kz = ah[0].getDataSize();
2624
const Uint32 vz = ah[1].getDataSize();
2625
ndbrequire(kz != 0 && vz != 0);
2628
const Uint32 avmax = MAX_INDEX_STAT_KEY_SIZE + MAX_INDEX_STAT_VALUE_SIZE;
2630
SegmentedSectionPtr ptr1;
2631
handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2632
ndbrequire(ptr1.sz <= avmax);
2634
data.m_statKey = &av[0];
2635
data.m_statValue = &av[kz];
2636
const char* kp = (const char*)data.m_statKey;
2637
const char* vp = (const char*)data.m_statValue;
2638
const Uint32 kb = kp[0] + (kp[1] << 8);
2639
const Uint32 vb = vp[0] + (vp[1] << 8);
2640
// key and value are not empty
2641
ndbrequire(kb != 0 && vb != 0);
2642
ndbrequire(kz == ((2 + kb) + 3) / 4);
2643
ndbrequire(vz == ((2 + vb) + 3) / 4);
2645
scan.m_sampleCount++;
2646
scan.m_keyBytes += kb;
2647
releaseSections(handle);
2649
if (ERROR_INSERTED(18024))
2652
CLEAR_ERROR_INSERT_VALUE;
2653
UtilExecuteRef* utilRef =
2654
(UtilExecuteRef*)signal->getDataPtrSend();
2655
utilRef->senderData = stat.m_ownPtrI;
2656
utilRef->errorCode = UtilExecuteRef::TCError;
2657
utilRef->TCErrorCode = 630;
2658
sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2659
signal, UtilExecuteRef::SignalLength, JBB);
2660
subRec->expectedConf++;
2664
// TRIX traps the CONF
2665
send.m_sysTable = &g_statMetaSample;
2666
send.m_operationType = UtilPrepareReq::Insert;
2667
send.m_prepareId = subRec->prepareId;
2668
subRec->expectedConf++;
2669
statSendExecute(signal, stat);
2673
Trix::statScanRelease(Signal* signal, StatOp& stat)
2675
StatOp::Data& data = stat.m_data;
2676
StatOp::Scan& scan = stat.m_scan;
2677
SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2678
D("statScanRelease" << V(stat) << V(subRec->errorCode));
2680
if (subRec->errorCode != 0)
2683
statOpError(signal, stat, subRec->errorCode, __LINE__);
2686
subRec->requestType = STAT_UTIL;
2688
const Uint32 now = (Uint32)time(0);
2689
data.m_loadTime = now;
2690
data.m_sampleCount = scan.m_sampleCount;
2691
data.m_keyBytes = scan.m_keyBytes;
2692
data.m_valueFormat = MAX_INDEX_STAT_VALUE_FORMAT;
2694
if (data.m_head_found == false)
2697
statHeadInsert(signal, stat);
2702
statHeadUpdate(signal, stat);
2707
Trix::statScanEnd(Signal* signal, StatOp& stat)
2709
StatOp::Data& data = stat.m_data;
2710
const IndexStatImplReq* req = &stat.m_req;
2711
D("statScanEnd" << V(stat));
2714
* TRIX reports stats load time to TUX for proper stats monitoring.
2715
* Passing this via DBDICT RT_START_MON is not feasible. For MT-LQH
2716
* we prefer DbtuxProxy to avoid introducing MT-LQH into TRIX.
2719
#if trix_index_stat_rep_to_tux_instance
2720
Uint32 instanceKey = getInstanceKey(req->indexId, req->fragId);
2721
BlockReference tuxRef = numberToRef(DBTUX, instanceKey, getOwnNodeId());
2723
BlockReference tuxRef = DBTUX_REF;
2726
IndexStatRep* rep = (IndexStatRep*)signal->getDataPtrSend();
2727
rep->senderRef = reference();
2728
rep->senderData = 0;
2729
rep->requestType = IndexStatRep::RT_UPDATE_CONF;
2730
rep->requestFlag = 0;
2731
rep->indexId = req->indexId;
2732
rep->indexVersion = req->indexVersion;
2733
rep->tableId = req->tableId;
2734
rep->fragId = req->fragId;
2735
rep->loadTime = data.m_loadTime;
2736
sendSignal(tuxRef, GSN_INDEX_STAT_REP,
2737
signal, IndexStatRep::SignalLength, JBB);
2739
statOpSuccess(signal, stat);
2745
Trix::statDropBegin(Signal* signal, StatOp& stat)
2747
StatOp::Data& data = stat.m_data;
2748
D("statDropBegin" << V(stat));
2750
if (data.m_head_found == true)
2753
statHeadDelete(signal, stat);
2756
statDropEnd(signal, stat);
2760
Trix::statDropEnd(Signal* signal, StatOp& stat)
2763
statOpSuccess(signal, stat);
2769
// Build and send a UTIL_PREPARE_REQ to DBUTIL for the system table and
// operation type selected in stat.m_send.
// NOTE(review): this listing has lost its short lines (return type, braces,
// 'break' statements, the declarations of 'wbuf' and 'i', and whatever
// guards the Delete case) - confirm the control flow against the original.
Trix::statSendPrepare(Signal* signal, StatOp& stat)
2771
StatOp::Send& send = stat.m_send;
2772
const IndexStatImplReq* req = &stat.m_req;
2773
const SysTable& sysTable = *send.m_sysTable;
2774
D("statSendPrepare" << V(stat));
2776
// fixed part of the request: identifies this operation and the schema trans
UtilPrepareReq* utilReq =
2777
(UtilPrepareReq*)signal->getDataPtrSend();
2778
utilReq->senderData = stat.m_ownPtrI;
2779
utilReq->senderRef = reference();
2780
utilReq->schemaTransId = req->transId;
2783
// variable part is written into 'wbuf'; sizeof(wbuf) >> 2 is its word count
LinearWriter w(&wbuf[0], sizeof(wbuf) >> 2);
2786
w.add(UtilPrepareReq::NoOfOperations, 1);
2787
w.add(UtilPrepareReq::OperationType, send.m_operationType);
2788
w.add(UtilPrepareReq::TableId, sysTable.tableId);
2791
// add attribute ids of the system table columns, depending on operation type
for (i = 0; i < sysTable.columnCount; i++) {
2792
const SysColumn& c = sysTable.columnList[i];
2793
switch (send.m_operationType) {
2794
case UtilPrepareReq::Read:
2795
case UtilPrepareReq::Insert:
2796
case UtilPrepareReq::Update:
2798
w.add(UtilPrepareReq::AttributeId, i);
2800
case UtilPrepareReq::Delete:
2803
// NOTE(review): 'c' is otherwise unused in the visible lines; presumably a
// lost line restricts Delete to a subset of columns - verify.
w.add(UtilPrepareReq::AttributeId, i);
2811
// the written words are sent as the single section of the signal
LinearSectionPtr ptr[3];
2812
ptr[0].p = &wbuf[0];
2813
ptr[0].sz = w.getWordsUsed();
2814
sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ,
2815
signal, UtilPrepareReq::SignalLength, JBB, ptr, 1);
2819
// Build and send a UTIL_EXECUTE_REQ to DBUTIL for the prepared operation in
// stat.m_send, with attribute headers in section 0 and attribute data in
// section 1.
// NOTE(review): this listing has lost its short lines (return type, braces,
// 'break' statements, the declarations of 'wattr' and 'wdata', and whatever
// guards the Delete case) - confirm the control flow against the original.
Trix::statSendExecute(Signal* signal, StatOp& stat)
2821
D("statSendExecute" << V(stat));
2822
StatOp::Send& send = stat.m_send;
2823
StatOp::Attr& attr = stat.m_attr;
2824
const SysTable& sysTable = *send.m_sysTable;
2826
UtilExecuteReq* utilReq =
2827
(UtilExecuteReq*)signal->getDataPtrSend();
2828
utilReq->senderData = stat.m_ownPtrI;
2829
utilReq->senderRef = reference();
2830
utilReq->prepareId = send.m_prepareId;
2831
utilReq->scanTakeOver = 0;
2835
// point the attr work area at the 'wattr'/'wdata' buffers and mark both empty;
// the limits below are the capacities checked by statDataOut()
attr.m_attr = wattr;
2836
attr.m_attrMax = 20;
2837
attr.m_attrSize = 0;
2838
attr.m_data = wdata;
2839
attr.m_dataMax = 2048;
2840
attr.m_dataSize = 0;
2842
// write out column values of the system table, depending on operation type
for (Uint32 i = 0; i < sysTable.columnCount; i++) {
2843
const SysColumn& c = sysTable.columnList[i];
2844
switch (send.m_operationType) {
2845
case UtilPrepareReq::Read:
2846
case UtilPrepareReq::Insert:
2847
case UtilPrepareReq::Update:
2849
statDataOut(stat, i);
2851
case UtilPrepareReq::Delete:
2854
// NOTE(review): 'c' is otherwise unused in the visible lines; presumably a
// lost line restricts Delete to a subset of columns - verify.
statDataOut(stat, i);
2862
// section 0: attribute headers, section 1: attribute data
LinearSectionPtr ptr[3];
2863
ptr[0].p = attr.m_attr;
2864
ptr[0].sz = attr.m_attrSize;
2865
ptr[1].p = attr.m_data;
2866
ptr[1].sz = attr.m_dataSize;
2867
sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ,
2868
signal, UtilExecuteReq::SignalLength, JBB, ptr, 2);
2872
Trix::statSendRelease(Signal* signal, StatOp& stat)
2874
D("statSendRelease" << V(stat));
2875
StatOp::Send& send = stat.m_send;
2876
ndbrequire(send.m_prepareId != RNIL);
2878
UtilReleaseReq* utilReq =
2879
(UtilReleaseReq*)signal->getDataPtrSend();
2880
utilReq->senderData = stat.m_ownPtrI;
2881
utilReq->prepareId = send.m_prepareId;
2882
sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ,
2883
signal, UtilReleaseReq::SignalLength, JBB);
2889
// Map column index 'i' of the system table selected in stat.m_send to the
// location of its value in stat.m_data; results are returned through the
// reference parameters 'dptr' (value address) and 'bytes' (value length).
// NOTE(review): this listing has lost its short lines (return type, braces,
// the 'switch'/'case' labels, 'break' statements and the assignments of
// 'bytes' for the fixed-size columns) - the mapping of 'i' to each
// assignment below cannot be confirmed from here.
Trix::statDataPtr(StatOp& stat, Uint32 i, Uint32*& dptr, Uint32& bytes)
2891
StatOp::Data& data = stat.m_data;
2892
StatOp::Send& send = stat.m_send;
2894
const SysTable& sysTable = *send.m_sysTable;
2895
ndbrequire(i < sysTable.columnCount);
2896
//UNUSED const SysColumn& c = sysTable.columnList[i];
2898
// columns of the head system table
if (&sysTable == &g_statMetaHead)
2902
dptr = &data.m_indexId;
2906
dptr = &data.m_indexVersion;
2910
dptr = &data.m_tableId;
2914
dptr = &data.m_fragCount;
2918
dptr = &data.m_valueFormat;
2922
dptr = &data.m_sampleVersion;
2926
dptr = &data.m_loadTime;
2930
dptr = &data.m_sampleCount;
2934
dptr = &data.m_keyBytes;
2944
// columns of the sample system table
if (&sysTable == &g_statMetaSample)
2948
dptr = &data.m_indexId;
2952
dptr = &data.m_indexVersion;
2956
dptr = &data.m_sampleVersion;
2961
// stat key: length is the 2-byte little-endian prefix plus the prefix itself
dptr = data.m_statKey;
2962
const uchar* p = (uchar*)dptr;
2964
bytes = 2 + p[0] + (p[1] << 8);
2969
// stat value: same length encoding as the key
dptr = data.m_statValue;
2970
const uchar* p = (uchar*)dptr;
2972
bytes = 2 + p[0] + (p[1] << 8);
2986
// Append column 'i' to the outgoing attribute work area in stat.m_attr: an
// attribute header is written to m_attr and the value is copied to m_data.
// NOTE(review): this listing has lost its short lines (return type, braces,
// the declarations of 'dptr' and 'bytes', the body of the 'while' loop and
// presumably the increment of attr.m_attrSize) - confirm against the
// original before relying on the statement order shown here.
Trix::statDataOut(StatOp& stat, Uint32 i)
2988
StatOp::Attr& attr = stat.m_attr;
2991
// locate the value of column i and its length in bytes
statDataPtr(stat, i, dptr, bytes);
2993
// header: attribute id i with byte size 'bytes', at the next free header slot
ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
2994
AttributeHeader::init(&attr.m_attr[attr.m_attrSize], i, bytes);
2997
// data occupies whole words: round the byte length up
Uint32 words = (bytes + 3) / 4;
2998
ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
2999
Uint8* dst = (Uint8*)&attr.m_data[attr.m_dataSize];
3000
memcpy(dst, dptr, bytes);
3001
// NOTE(review): the loop body is not visible; presumably it pads the last
// word up to the word boundary - verify.  As listed, the statement after the
// loop header would be taken as its body.
while (bytes < words * 4)
3003
attr.m_dataSize += words;
3004
D("statDataOut" << V(i) << V(bytes) << hex << V(dptr[0]));
3008
// Read column 'i' from the attribute work area in stat.m_attr into the
// location given by statDataPtr(); the counterpart of statDataOut().
// NOTE(review): this listing has lost its short lines (return type, braces,
// the declarations of 'dptr' and 'bytes' and presumably the increment of
// attr.m_attrSize) - confirm against the original.
Trix::statDataIn(StatOp& stat, Uint32 i)
3010
StatOp::Attr& attr = stat.m_attr;
3013
// destination address and expected length of column i
statDataPtr(stat, i, dptr, bytes);
3015
ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3016
const AttributeHeader& ah = attr.m_attr[attr.m_attrSize];
3019
// the received header must announce exactly the expected byte size
ndbrequire(ah.getByteSize() == bytes);
3020
// data occupies whole words: round the byte length up
Uint32 words = (bytes + 3) / 4;
3021
ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3022
const char* src = (const char*)&attr.m_data[attr.m_dataSize];
3023
memcpy(dptr, src, bytes);
3024
attr.m_dataSize += words;
3025
D("statDataIn" << V(i) << V(bytes) << hex << V(dptr[0]));
3031
Trix::statAbortUtil(Signal* signal, StatOp& stat)
3033
StatOp::Util& util = stat.m_util;
3034
D("statAbortUtil" << V(stat));
3036
ndbrequire(util.m_prepareId != RNIL);
3037
util.m_cb.m_callbackFunction = safe_cast(&Trix::statAbortUtilCB);
3038
util.m_cb.m_callbackData = stat.m_ownPtrI;
3039
statUtilRelease(signal, stat);
3043
Trix::statAbortUtilCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
3045
StatOp& stat = statOpGetPtr(statPtrI);
3046
D("statAbortUtilCB" << V(stat) << V(ret));
3048
ndbrequire(ret == 0);
3049
statOpAbort(signal, stat);
3055
Trix::statOpSuccess(Signal* signal, StatOp& stat)
3057
StatOp::Data& data = stat.m_data;
3058
D("statOpSuccess" << V(stat));
3060
if (stat.m_requestType == IndexStatReq::RT_SCAN_FRAG)
3061
statOpEvent(stat, "I", "created %u samples", data.m_sampleCount);
3063
statOpConf(signal, stat);
3064
statOpRelease(stat);
3068
Trix::statOpConf(Signal* signal, StatOp& stat)
3070
const IndexStatImplReq* req = &stat.m_req;
3071
D("statOpConf" << V(stat));
3073
IndexStatImplConf* conf = (IndexStatImplConf*)signal->getDataPtrSend();
3074
conf->senderRef = reference();
3075
conf->senderData = req->senderData;
3076
sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_CONF,
3077
signal, IndexStatImplConf::SignalLength, JBB);
3081
// Fail an index-stat operation: record errorCode/errorLine in the operation
// (which must not already carry an error) and start the abort.
// 'supress' is scanned as a zero-terminated list of error codes.
// NOTE(review): this listing has lost its short lines (return type, braces,
// and the statements executed when errorCode matches an entry of 'supress');
// presumably a match skips the warning event - verify.  Whether 'supress'
// is checked for null before the loop is also not visible here.
Trix::statOpError(Signal* signal, StatOp& stat,
3082
Uint32 errorCode, Uint32 errorLine,
3083
const Uint32 * supress)
3085
D("statOpError" << V(stat) << V(errorCode) << V(errorLine));
3089
for (Uint32 i = 0; supress[i] != 0; i++)
3091
if (errorCode == supress[i])
3097
statOpEvent(stat, "W", "error %u line %u", errorCode, errorLine);
3100
// only the first error of an operation may be recorded
ndbrequire(stat.m_errorCode == 0);
3101
stat.m_errorCode = errorCode;
3102
stat.m_errorLine = errorLine;
3103
statOpAbort(signal, stat);
3107
Trix::statOpAbort(Signal* signal, StatOp& stat)
3109
StatOp::Util& util = stat.m_util;
3110
D("statOpAbort" << V(stat));
3112
if (util.m_prepareId != RNIL)
3115
// returns here when done
3116
statAbortUtil(signal, stat);
3119
statOpRef(signal, stat);
3120
statOpRelease(stat);
3124
Trix::statOpRef(Signal* signal, StatOp& stat)
3126
const IndexStatImplReq* req = &stat.m_req;
3127
D("statOpRef" << V(stat));
3129
statOpRef(signal, req, stat.m_errorCode, stat.m_errorLine);
3133
Trix::statOpRef(Signal* signal, const IndexStatImplReq* req,
3134
Uint32 errorCode, Uint32 errorLine)
3136
D("statOpRef" << V(errorCode) << V(errorLine));
3138
IndexStatImplRef* ref = (IndexStatImplRef*)signal->getDataPtrSend();
3139
ref->senderRef = reference();
3140
ref->senderData = req->senderData;
3141
ref->errorCode = errorCode;
3142
ref->errorLine = errorLine;
3143
sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_REF,
3144
signal, IndexStatImplRef::SignalLength, JBB);
3148
// Format a printf-style message for an index-stat operation and emit it as
// an info event (level starting with 'I') or a warning event (level
// starting with 'W').  The message is prefixed with the index id, the
// sample version and the request name.
// NOTE(review): this listing has lost its short lines (return type, braces,
// the declarations of 'tmp1', 'tmp2' and 'ap', and the va_start/va_end
// calls) - confirm against the original.
Trix::statOpEvent(StatOp& stat, const char* level, const char* msg, ...)
3150
//UNUSED const IndexStatImplReq* req = &stat.m_req;
3151
StatOp::Data& data = stat.m_data;
3156
// tmp1: the caller's message with its variable arguments expanded
BaseString::vsnprintf(tmp1, sizeof(tmp1), msg, ap);
3160
// tmp2: the full event text
BaseString::snprintf(tmp2, sizeof(tmp2),
3161
"index %u stats version %u: %s: %s",
3162
data.m_indexId, data.m_sampleVersion,
3163
stat.m_requestName, tmp1);
3165
D("statOpEvent" << V(level) << V(tmp2));
3167
// passed through "%s" so the text is never interpreted as a format string
if (level[0] == 'I')
3168
infoEvent("%s", tmp2);
3169
if (level[0] == 'W')
3170
warningEvent("%s", tmp2);
3176
// Debug output of a StatOp: prints its own pointer index and whether a head
// record was found.
// NOTE(review): this listing has lost its short lines (return type, braces,
// any surrounding delimiters printed, and the return statement).
operator<<(NdbOut& out, const Trix::StatOp& stat)
3179
out << " i:" << stat.m_ownPtrI;
3180
out << " head_found:" << stat.m_data.m_head_found;
3186
// Block boilerplate macro for Trix (expansion is defined elsewhere).
BLOCK_FUNCTIONS(Trix)
3188
// Explicit instantiation of append() for DataBuffer<15>.
template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);