1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <ndb_global.h>
18
#include <NdbScanOperation.hpp>
19
#include <NdbIndexScanOperation.hpp>
20
#include <NdbTransaction.hpp>
21
#include "NdbApiSignal.hpp"
23
#include "NdbDictionaryImpl.hpp"
24
#include <NdbBlob.hpp>
26
#include <NdbRecAttr.hpp>
27
#include <NdbReceiver.hpp>
30
#include <NdbSqlUtil.hpp>
32
#include <signaldata/ScanTab.hpp>
33
#include <signaldata/KeyInfo.hpp>
34
#include <signaldata/AttrInfo.hpp>
35
#include <signaldata/TcKeyReq.hpp>
37
#define DEBUG_NEXT_RESULT 0
39
NdbScanOperation::NdbScanOperation(Ndb* aNdb, NdbOperation::Type aType) :
40
NdbOperation(aNdb, aType),
41
m_transConnection(NULL)
44
m_allocated_receivers = 0;
45
m_prepared_receivers = 0;
50
m_array = new Uint32[1]; // skip if on delete in fix_receivers
55
NdbScanOperation::~NdbScanOperation()
57
for(Uint32 i = 0; i<m_allocated_receivers; i++){
58
m_receivers[i]->release();
59
theNdb->releaseNdbScanRec(m_receivers[i]);
65
NdbScanOperation::setErrorCode(int aErrorCode){
66
NdbTransaction* tmp = theNdbCon;
67
theNdbCon = m_transConnection;
68
NdbOperation::setErrorCode(aErrorCode);
73
NdbScanOperation::setErrorCodeAbort(int aErrorCode){
74
NdbTransaction* tmp = theNdbCon;
75
theNdbCon = m_transConnection;
76
NdbOperation::setErrorCodeAbort(aErrorCode);
81
/*****************************************************************************
84
* Return Value: Return 0 : init was successful.
85
* Return -1: In all other cases.
86
* Remark: Initiates operation record after allocation.
87
*****************************************************************************/
89
NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
91
m_transConnection = myConnection;
92
//NdbConnection* aScanConnection = theNdb->startTransaction(myConnection);
93
theNdb->theRemainingStartTransactions++; // will be checked in hupp...
94
NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
95
if (!aScanConnection){
96
theNdb->theRemainingStartTransactions--;
97
setErrorCodeAbort(theNdb->getNdbError().code);
101
// NOTE! The hupped trans becomes the owner of the operation
102
if(NdbOperation::init(tab, aScanConnection) != 0){
103
theNdb->theRemainingStartTransactions--;
109
theStatus = GetValue;
110
theOperationType = OpenScanRequest;
111
theNdbCon->theMagicNumber = 0xFE11DF;
112
theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
119
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
124
m_ordered = m_descending = false;
125
Uint32 fragCount = m_currentTable->m_fragmentCount;
127
if (parallel > fragCount || parallel == 0) {
128
parallel = fragCount;
131
// It is only possible to call openScan if
132
// 1. this transaction doesn't already contain another scan operation
133
// 2. this transaction doesn't already contain other operations
134
// 3. theScanOp contains a NdbScanOperation
135
if (theNdbCon->theScanningOp != NULL){
140
theNdbCon->theScanningOp = this;
141
bool tupScan = (scan_flags & SF_TupScan);
143
#if 0 // XXX temp for testing
144
{ char* p = getenv("NDB_USE_TUPSCAN");
146
unsigned n = atoi(p); // 0-10
147
if ((unsigned int) (::time(0) % 10) < n) tupScan = true;
151
if (scan_flags & SF_DiskScan)
154
m_no_disk_flag = false;
157
bool rangeScan = false;
158
if ( (int) m_accessTable->m_indexType ==
159
(int) NdbDictionary::Index::OrderedIndex)
161
if (m_currentTable == m_accessTable){
162
// Old way of scanning indexes, should not be allowed
163
m_currentTable = theNdb->theDictionary->
164
getTable(m_currentTable->m_primaryTable.c_str());
165
assert(m_currentTable != NULL);
167
assert (m_currentTable != m_accessTable);
168
// Modify operation state
169
theStatus = GetValue;
170
theOperationType = OpenRangeScanRequest;
175
if (rangeScan && (scan_flags & SF_OrderBy))
176
parallel = fragCount;
178
theParallelism = parallel;
180
if(fix_receivers(parallel) == -1){
181
setErrorCodeAbort(4000);
185
theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
186
if (theSCAN_TABREQ == NULL) {
187
setErrorCodeAbort(4000);
191
theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
192
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
193
req->apiConnectPtr = theNdbCon->theTCConPtr;
194
req->tableId = m_accessTable->m_id;
195
req->tableSchemaVersion = m_accessTable->m_version;
196
req->storedProcId = 0xFFFF;
197
req->buddyConPtr = theNdbCon->theBuddyConPtr;
198
req->first_batch_size = batch; // Save user specified batch size
201
ScanTabReq::setParallelism(reqInfo, parallel);
202
ScanTabReq::setScanBatch(reqInfo, 0);
203
ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
204
ScanTabReq::setTupScanFlag(reqInfo, tupScan);
205
req->requestInfo = reqInfo;
207
m_keyInfo = (scan_flags & SF_KeyInfo) ? 1 : 0;
210
Uint64 transId = theNdbCon->getTransactionId();
211
req->transId1 = (Uint32) transId;
212
req->transId2 = (Uint32) (transId >> 32);
214
NdbApiSignal* tSignal = theSCAN_TABREQ->next();
217
theSCAN_TABREQ->next(tSignal = theNdb->getSignal());
219
theLastKEYINFO = tSignal;
221
tSignal->setSignal(GSN_KEYINFO);
222
theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
223
theTotalNrOfKeyWordInSignal= 0;
225
getFirstATTRINFOScan();
230
NdbScanOperation::setReadLockMode(LockMode lockMode)
232
bool lockExcl, lockHoldMode, readCommitted;
235
case LM_CommittedRead:
244
readCommitted= false;
249
readCommitted= false;
253
/* Not supported / invalid. */
256
theLockMode= lockMode;
257
ScanTabReq *req= CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
258
Uint32 reqInfo= req->requestInfo;
259
ScanTabReq::setLockMode(reqInfo, lockExcl);
260
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
261
ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
262
req->requestInfo= reqInfo;
266
NdbScanOperation::fix_receivers(Uint32 parallel){
267
assert(parallel > 0);
268
if(parallel > m_allocated_receivers){
269
const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));
271
Uint64 * tmp = new Uint64[(sz+7)/8];
272
// Save old receivers
273
memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
275
m_array = (Uint32*)tmp;
277
m_receivers = (NdbReceiver**)tmp;
278
m_api_receivers = m_receivers + parallel;
279
m_conf_receivers = m_api_receivers + parallel;
280
m_sent_receivers = m_conf_receivers + parallel;
281
m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
283
// Only get/init "new" receivers
284
NdbReceiver* tScanRec;
285
for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
286
tScanRec = theNdb->getNdbScanRec();
287
if (tScanRec == NULL) {
288
setErrorCodeAbort(4000);
291
m_receivers[i] = tScanRec;
292
tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
294
m_allocated_receivers = parallel;
297
reset_receivers(parallel, 0);
302
* Move receiver from send array to conf:ed array
305
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
306
if(theError.code == 0){
307
if(DEBUG_NEXT_RESULT)
308
ndbout_c("receiver_delivered");
310
Uint32 idx = tRec->m_list_index;
311
Uint32 last = m_sent_receivers_count - 1;
313
NdbReceiver * move = m_sent_receivers[last];
314
m_sent_receivers[idx] = move;
315
move->m_list_index = idx;
317
m_sent_receivers_count = last;
319
last = m_conf_receivers_count;
320
m_conf_receivers[last] = tRec;
321
m_conf_receivers_count = last + 1;
322
tRec->m_list_index = last;
323
tRec->m_current_row = 0;
328
* Remove receiver as it's completed
331
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
332
if(theError.code == 0){
333
if(DEBUG_NEXT_RESULT)
334
ndbout_c("receiver_completed");
336
Uint32 idx = tRec->m_list_index;
337
Uint32 last = m_sent_receivers_count - 1;
339
NdbReceiver * move = m_sent_receivers[last];
340
m_sent_receivers[idx] = move;
341
move->m_list_index = idx;
343
m_sent_receivers_count = last;
347
/*****************************************************************************
348
* int getFirstATTRINFOScan()
350
* Return Value: Return 0: Successful
351
* Return -1: All other cases
352
* Parameters: None: Only allocate the first signal.
353
* Remark: When a scan is defined we need to use this method instead
354
* of insertATTRINFO for the first signal.
355
* This is because we need not to mess up the code in
356
* insertATTRINFO with if statements since we are not
357
* interested in the TCKEYREQ signal.
358
*****************************************************************************/
360
NdbScanOperation::getFirstATTRINFOScan()
362
NdbApiSignal* tSignal;
364
tSignal = theNdb->getSignal();
365
if (tSignal == NULL){
366
setErrorCodeAbort(4000);
369
tSignal->setSignal(m_attrInfoGSN);
370
theAI_LenInCurrAI = 8;
371
theATTRINFOptr = &tSignal->getDataPtrSend()[8];
372
theFirstATTRINFO = tSignal;
373
theCurrentATTRINFO = tSignal;
374
theCurrentATTRINFO->next(NULL);
380
* Constants for theTupleKeyDefined[][0]
382
#define SETBOUND_EQ 1
386
#define WAITFOR_SCAN_TIMEOUT 120000
389
NdbScanOperation::executeCursor(int nodeId){
390
NdbTransaction * tCon = theNdbCon;
391
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
392
Guard guard(tp->theMutexPtr);
394
Uint32 magic = tCon->theMagicNumber;
395
Uint32 seq = tCon->theNodeSequence;
397
if (tp->get_node_alive(nodeId) &&
398
(tp->getNodeSequence(nodeId) == seq)) {
401
* Only call prepareSendScan the first time (in case of restarts)
402
* - check with theMagicNumber
404
tCon->theMagicNumber = 0x37412619;
405
if(magic != 0x37412619 &&
406
prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
410
if (doSendScan(nodeId) == -1)
413
m_executed= true; // Mark operation as executed
416
if (!(tp->get_node_stopping(nodeId) &&
417
(tp->getNodeSequence(nodeId) == seq))){
418
TRACE_DEBUG("The node is hard dead when attempting to start a scan");
420
tCon->theReleaseOnClose = true;
422
TRACE_DEBUG("The node is stopping when attempting to start a scan");
425
tCon->theCommitStatus = NdbTransaction::Aborted;
431
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
434
if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) {
436
NdbBlob* tBlob = theBlobList;
438
if (tBlob->atNextResult() == -1)
440
tBlob = tBlob->theNext;
443
* Flush blob part ops on behalf of user because
444
* - nextResult is analogous to execute(NoCommit)
445
* - user is likely to want blob value before next execute
447
if (m_transConnection->executePendingBlobOps() == -1)
454
int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
457
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
461
* Check current receiver
464
Uint32 idx = m_current_api_receiver;
465
Uint32 last = m_api_receivers_count;
468
if(DEBUG_NEXT_RESULT)
469
ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
474
for(; idx < last; idx++){
475
NdbReceiver* tRec = m_api_receivers[idx];
476
if(tRec->nextResult()){
477
m_curr_row = tRec->copyout(theReceiver);
484
* We have advanced at least one bucket
486
if(!fetchAllowed || !retVal){
487
m_current_api_receiver = idx;
488
if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
492
Uint32 nodeId = theNdbCon->theDBnode;
493
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
495
The PollGuard has an implicit call of unlock_and_signal through the
496
~PollGuard method. This method is called implicitly by the compiler
497
in all places where the object goes out of scope due to a return,
498
break, continue or simply end of statement block
500
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
501
theNdb->theNdbBlockNumber);
503
const Uint32 seq = theNdbCon->theNodeSequence;
510
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
513
idx = m_current_api_receiver;
514
last = m_api_receivers_count;
515
Uint32 timeout = tp->m_waitfor_timeout;
519
setErrorCode(theError.code);
520
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
524
Uint32 cnt = m_conf_receivers_count;
525
Uint32 sent = m_sent_receivers_count;
527
if(DEBUG_NEXT_RESULT)
528
ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);
532
* Just move completed receivers
534
memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
536
m_conf_receivers_count = 0;
537
} else if(retVal == 2 && sent > 0){
541
int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
542
if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
544
} else if(ret_code == -1){
548
retVal = -2; //return_code;
550
} else if(retVal == 2){
552
* No completed & no sent -> EndOfData
554
theError.code = -1; // make sure user gets error if he tries again
555
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
562
for(; idx < last; idx++){
563
NdbReceiver* tRec = m_api_receivers[idx];
564
if(tRec->nextResult()){
565
m_curr_row = tRec->copyout(theReceiver);
570
} while(retVal == 2);
575
m_api_receivers_count = last;
576
m_current_api_receiver = idx;
582
if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
585
setErrorCode(4008); // Timeout
588
setErrorCode(4028); // Node fail
590
case -3: // send_next_scan -> return fail (set error-code self)
591
if(theError.code == 0)
592
setErrorCode(4028); // seq changed = Node fail
596
setErrorCode(theError.code);
600
theNdbCon->theTransactionIsStarted = false;
601
theNdbCon->theReleaseOnClose = true;
602
if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
607
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
610
NdbApiSignal tSignal(theNdb->theMyRef);
611
tSignal.setSignal(GSN_SCAN_NEXTREQ);
613
Uint32* theData = tSignal.getDataPtrSend();
614
theData[0] = theNdbCon->theTCConPtr;
615
theData[1] = stopScanFlag == true ? 1 : 0;
616
Uint64 transId = theNdbCon->theTransactionId;
617
theData[2] = transId;
618
theData[3] = (Uint32) (transId >> 32);
623
Uint32 last = m_sent_receivers_count;
624
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
626
for(Uint32 i = 0; i<cnt; i++){
627
NdbReceiver * tRec = m_api_receivers[i];
628
if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
630
m_sent_receivers[last+sent] = tRec;
631
tRec->m_list_index = last+sent;
636
memmove(m_api_receivers, m_api_receivers+cnt,
637
(theParallelism-cnt) * sizeof(char*));
642
Uint32 nodeId = theNdbCon->theDBnode;
643
TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
645
tSignal.setLength(4);
646
LinearSectionPtr ptr[3];
647
ptr[0].p = prep_array;
649
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
651
tSignal.setLength(4+sent);
652
ret = tp->sendSignal(&tSignal, nodeId);
655
m_sent_receivers_count = last + sent;
656
m_api_receivers_count -= cnt;
657
m_current_api_receiver = 0;
665
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
667
printf("NdbScanOperation::prepareSend\n");
673
NdbScanOperation::doSend(int ProcessorId)
675
printf("NdbScanOperation::doSend\n");
679
void NdbScanOperation::close(bool forceSend, bool releaseOp)
681
DBUG_ENTER("NdbScanOperation::close");
682
DBUG_PRINT("enter", ("this: 0x%lx tcon: 0x%lx con: 0x%lx force: %d release: %d",
684
(long) m_transConnection, (long) theNdbCon,
685
forceSend, releaseOp));
687
if(m_transConnection){
688
if(DEBUG_NEXT_RESULT)
689
ndbout_c("close() theError.code = %d "
690
"m_api_receivers_count = %d "
691
"m_conf_receivers_count = %d "
692
"m_sent_receivers_count = %d",
694
m_api_receivers_count,
695
m_conf_receivers_count,
696
m_sent_receivers_count);
698
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
700
The PollGuard has an implicit call of unlock_and_signal through the
701
~PollGuard method. This method is called implicitly by the compiler
702
in all places where the object goes out of scope due to a return,
703
break, continue or simply end of statement block
705
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
706
theNdb->theNdbBlockNumber);
707
close_impl(tp, forceSend, &poll_guard);
710
NdbConnection* tCon = theNdbCon;
711
NdbConnection* tTransCon = m_transConnection;
713
m_transConnection = NULL;
715
if (tTransCon && releaseOp)
717
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
720
if (theStatus != WaitResponse)
726
tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
727
&tTransCon->m_theLastScanOperation,
732
ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
738
tCon->theScanningOp = 0;
739
theNdb->closeTransaction(tCon);
740
theNdb->theRemainingStartTransactions--;
745
NdbScanOperation::execCLOSE_SCAN_REP(){
746
m_conf_receivers_count = 0;
747
m_sent_receivers_count = 0;
750
void NdbScanOperation::release()
752
if(theNdbCon != 0 || m_transConnection != 0){
755
for(Uint32 i = 0; i<m_allocated_receivers; i++){
756
m_receivers[i]->release();
759
NdbOperation::release();
763
theNdb->releaseSignal(theSCAN_TABREQ);
768
/***************************************************************************
769
int prepareSendScan(Uint32 aTC_ConnectPtr,
770
Uint64 aTransactionId)
772
Return Value: Return 0 : preparation of send was successful.
773
Return -1: In all other cases.
774
Parameters: aTC_ConnectPtr: the Connect pointer to TC.
775
aTransactionId: the Transaction identity of the transaction.
776
Remark: Puts the final data into ATTRINFO signal(s); after this
777
we know how many signals to send and their sizes
778
***************************************************************************/
779
int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
780
Uint64 aTransactionId){
782
if (theInterpretIndicator != 1 ||
783
(theOperationType != OpenScanRequest &&
784
theOperationType != OpenRangeScanRequest)) {
785
setErrorCodeAbort(4005);
791
// In prepareSendInterpreted we set the sizes (word 4-8) in the
792
// first ATTRINFO signal.
793
if (prepareSendInterpreted() == -1)
797
((NdbIndexScanOperation*)this)->fix_get_values();
800
theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
803
* Prepare all receivers
805
theReceiver.prepareSend();
806
bool keyInfo = m_keyInfo;
807
Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
809
* The number of records sent by each LQH is calculated and the kernel
810
* is informed of this number by updating the SCAN_TABREQ signal
812
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
813
Uint32 batch_size = req->first_batch_size; // User specified
814
Uint32 batch_byte_size, first_batch_size;
815
theReceiver.calculate_batch_size(key_size,
820
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
821
req->batch_byte_size= batch_byte_size;
822
req->first_batch_size= first_batch_size;
826
* (Always keyinfo when using blobs)
828
Uint32 reqInfo = req->requestInfo;
829
ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
830
ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
831
req->requestInfo = reqInfo;
833
for(Uint32 i = 0; i<theParallelism; i++){
834
if (m_receivers[i]->do_get_value(&theReceiver, batch_size,
844
/*****************************************************************************
847
Return Value: Return >0 : send was successful, returns number of signals sent
848
Return -1: In all other cases.
849
Parameters: aProcessorId: Receiving processor node
850
Remark: Sends the ATTRINFO signal(s)
851
*****************************************************************************/
853
NdbScanOperation::doSendScan(int aProcessorId)
855
Uint32 tSignalCount = 0;
856
NdbApiSignal* tSignal;
858
if (theInterpretIndicator != 1 ||
859
(theOperationType != OpenScanRequest &&
860
theOperationType != OpenRangeScanRequest)) {
861
setErrorCodeAbort(4005);
865
assert(theSCAN_TABREQ != NULL);
866
tSignal = theSCAN_TABREQ;
868
Uint32 tupKeyLen = theTupKeyLen;
869
Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
870
Uint64 transId = theNdbCon->theTransactionId;
872
// Update the "attribute info length in words" in SCAN_TABREQ before
873
// sending it. This could not be done in openScan because
874
// we created the ATTRINFO signals after the SCAN_TABREQ signal.
875
ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
876
if (unlikely(theTotalCurrAI_Len > ScanTabReq::MaxTotalAttrInfo)) {
880
req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
881
Uint32 tmp = req->requestInfo;
882
ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
883
req->distributionKey = theDistributionKey;
884
req->requestInfo = tmp;
885
tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
887
TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
888
LinearSectionPtr ptr[3];
889
ptr[0].p = m_prepared_receivers;
890
ptr[0].sz = theParallelism;
891
if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {
897
// must have at least one signal since it contains attrLen for bounds
898
assert(theLastKEYINFO != NULL);
899
tSignal = theLastKEYINFO;
900
tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
902
assert(theSCAN_TABREQ->next() != NULL);
903
tSignal = theSCAN_TABREQ->next();
907
KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
908
keyInfo->connectPtr = aTC_ConnectPtr;
909
keyInfo->transId[0] = Uint32(transId);
910
keyInfo->transId[1] = Uint32(transId >> 32);
912
if (tp->sendSignal(tSignal,aProcessorId) == -1){
919
tSignal = tSignal->next();
920
} while(last != theLastKEYINFO);
923
tSignal = theFirstATTRINFO;
924
while (tSignal != NULL) {
925
AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
926
attrInfo->connectPtr = aTC_ConnectPtr;
927
attrInfo->transId[0] = Uint32(transId);
928
attrInfo->transId[1] = Uint32(transId >> 32);
930
if (tp->sendSignal(tSignal,aProcessorId) == -1){
935
tSignal = tSignal->next();
937
theStatus = WaitResponse;
940
m_sent_receivers_count = theParallelism;
943
m_current_api_receiver = theParallelism;
944
m_api_receivers_count = theParallelism;
948
}//NdbOperation::doSendScan()
950
/*****************************************************************************
951
* NdbOperation* takeOverScanOp(NdbTransaction* updateTrans);
953
* Parameters: The update transaction's NdbTransaction pointer.
954
* Return Value: A reference to the transferred operation object
955
* or NULL if no success.
956
* Remark: Take over the scanning transaction's NdbOperation
957
* object for a tuple to an update transaction,
958
* which is the last operation read in nextScanResult()
959
* (theNdbCon->thePreviousScanRec)
961
* FUTURE IMPLEMENTATION: (This note was moved from header file.)
962
* In the future, it will even be possible to transfer
963
* to a NdbTransaction on another Ndb-object.
964
* In this case the receiving NdbTransaction-object must call
965
* a method receiveOpFromScan to actually receive the information.
966
* This means that the updating transactions can be placed
967
* in separate threads and thus increasing the parallelism during
969
****************************************************************************/
971
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
973
NdbRecAttr * tRecAttr = m_curr_row;
976
const Uint32 * src = (Uint32*)tRecAttr->aRef();
978
assert(tRecAttr->get_size_in_bytes() > 0);
979
assert(tRecAttr->get_size_in_bytes() < 65536);
980
const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
983
memcpy(data, src, 4*len);
991
NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
994
NdbRecAttr * tRecAttr = m_curr_row;
997
NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
1003
// Cannot take over lock if no keyinfo was requested
1004
setErrorCodeAbort(4604);
1007
pTrans->theSimpleState = 0;
1009
assert(tRecAttr->get_size_in_bytes() > 0);
1010
assert(tRecAttr->get_size_in_bytes() < 65536);
1011
const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
1013
newOp->theTupKeyLen = len;
1014
newOp->theOperationType = opType;
1015
newOp->m_abortOption = AbortOnError;
1018
newOp->theLockMode = theLockMode;
1020
case (DeleteRequest):
1021
newOp->theStatus = GetValue;
1024
newOp->theStatus = SetValue;
1026
const Uint32 * src = (Uint32*)tRecAttr->aRef();
1027
const Uint32 tScanInfo = src[len] & 0x3FFFF;
1028
const Uint32 tTakeOverFragment = src[len] >> 20;
1031
TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
1032
TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
1033
TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
1034
newOp->theScanInfo = scanInfo;
1035
newOp->theDistrKeyIndicator_ = 1;
1036
newOp->theDistributionKey = tTakeOverFragment;
1039
// Copy the first 8 words of key info from KEYINFO20 into TCKEYREQ
1040
TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
1042
for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
1043
tcKeyReq->keyInfo[i] = * src++;
1047
NdbApiSignal* tSignal = theNdb->getSignal();
1048
newOp->theTCREQ->next(tSignal);
1050
Uint32 left = len - i;
1051
while(tSignal && left > KeyInfo::DataLength){
1052
tSignal->setSignal(GSN_KEYINFO);
1053
KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
1054
memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
1055
src += KeyInfo::DataLength;
1056
left -= KeyInfo::DataLength;
1058
tSignal->next(theNdb->getSignal());
1059
tSignal = tSignal->next();
1062
if(tSignal && left > 0){
1063
tSignal->setSignal(GSN_KEYINFO);
1064
KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
1065
memcpy(keyInfo->keyData, src, 4 * left);
1068
// create blob handles automatically
1069
if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
1070
for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
1071
NdbColumnImpl* c = m_currentTable->m_columns[i];
1073
if (c->getBlobType()) {
1074
if (newOp->getBlobHandle(pTrans, c) == NULL)
1086
NdbScanOperation::getBlobHandle(const char* anAttrName)
1089
return NdbOperation::getBlobHandle(m_transConnection,
1090
m_currentTable->getColumn(anAttrName));
1094
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
1097
return NdbOperation::getBlobHandle(m_transConnection,
1098
m_currentTable->getColumn(anAttrId));
1101
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
1102
: NdbScanOperation(aNdb, NdbOperation::OrderedIndexScan)
1106
NdbIndexScanOperation::~NdbIndexScanOperation(){
1110
NdbIndexScanOperation::setBound(const char* anAttrName, int type,
1113
return setBound(m_accessTable->getColumn(anAttrName), type, aValue);
1117
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type,
1120
return setBound(m_accessTable->getColumn(anAttrId), type, aValue);
1124
NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
1127
return setBound(anAttrObject, BoundEQ, aValue);
1131
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
1134
return NdbScanOperation::getValue_impl(attrInfo, aValue);
1137
int id = attrInfo->getColumnNo(); // In "real" table
1138
assert(m_accessTable->m_index);
1139
int sz = (int)m_accessTable->m_index->m_key_ids.size();
1140
if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
1141
return NdbScanOperation::getValue_impl(attrInfo, aValue);
1144
assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
1145
Uint32 marker = theTupleKeyDefined[id][0];
1147
if(marker == SETBOUND_EQ){
1148
return NdbScanOperation::getValue_impl(attrInfo, aValue);
1149
} else if(marker == API_PTR){
1150
return NdbScanOperation::getValue_impl(attrInfo, aValue);
1153
assert(marker == FAKE_PTR);
1156
oldVal = theTupleKeyDefined[id][1];
1157
#if (SIZEOF_CHARP == 8)
1158
oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
1160
theTupleKeyDefined[id][0] = API_PTR;
1162
NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
1163
tmp->setup(attrInfo, aValue);
1168
#include <AttributeHeader.hpp>
1170
* Define bound on index column in range scan.
1173
NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
1174
int type, const void* aValue)
1178
setErrorCodeAbort(4318); // Invalid attribute
1181
if (theOperationType == OpenRangeScanRequest &&
1182
(0 <= type && type <= 4)) {
1183
// insert bound type
1184
Uint32 currLen = theTotalNrOfKeyWordInSignal;
1185
Uint32 remaining = KeyInfo::DataLength - currLen;
1186
bool tDistrKey = tAttrInfo->m_distributionKey;
1190
if (! tAttrInfo->get_var_length(aValue, len)) {
1191
setErrorCodeAbort(4209);
1195
// insert attribute header
1196
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
1197
Uint32 sizeInWords = (len + 3) / 4;
1198
AttributeHeader ah(tIndexAttrId, sizeInWords << 2);
1199
const Uint32 ahValue = ah.m_value;
1201
const Uint32 align = (UintPtr(aValue) & 7);
1202
const bool aligned = (tDistrKey && type == BoundEQ) ?
1203
(align == 0) : (align & 3) == 0;
1205
const bool nobytes = (len & 0x3) == 0;
1206
const Uint32 totalLen = 2 + sizeInWords;
1207
Uint32 tupKeyLen = theTupKeyLen;
1209
Uint32 tempData[2000];
1213
if(remaining > totalLen && aligned && nobytes){
1214
Uint32 * dst = theKEYINFOptr + currLen;
1217
memcpy(dst, aValue, 4 * sizeInWords);
1218
theTotalNrOfKeyWordInSignal = currLen + totalLen;
1219
valPtr = (Uint64*)aValue;
1221
if(!aligned || !nobytes){
1223
tempData[1] = ahValue;
1224
tempData[2 + (len >> 2)] = 0;
1225
memcpy(tempData+2, aValue, len);
1226
insertBOUNDS(tempData, 2+sizeInWords);
1227
valPtr = (Uint64*)(tempData+2);
1229
Uint32 buf[2] = { type, ahValue };
1230
insertBOUNDS(buf, 2);
1231
insertBOUNDS((Uint32*)aValue, sizeInWords);
1232
valPtr = (Uint64*)aValue;
1235
theTupKeyLen = tupKeyLen + totalLen;
1242
* The primary keys for an ordered index are defined in the beginning
1243
* so it's safe to use [tIndexAttrId]
1244
* (instead of looping as in NdbOperation::equal_impl)
1246
if(type == BoundEQ && tDistrKey && !m_multi_range)
1248
theNoOfTupKeyLeft--;
1249
return handle_distribution_key(valPtr, sizeInWords);
1253
setErrorCodeAbort(4228); // XXX wrong code
1259
NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
1261
Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
1262
Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
1264
len = (sz < remaining ? sz : remaining);
1265
memcpy(dst, data, 4 * len);
1267
if(sz >= remaining){
1268
NdbApiSignal* tCurr = theLastKEYINFO;
1269
tCurr->setLength(KeyInfo::MaxSignalLength);
1270
NdbApiSignal* tSignal = tCurr->next();
1273
else if((tSignal = theNdb->getSignal()) != 0)
1275
tCurr->next(tSignal);
1276
tSignal->setSignal(GSN_KEYINFO);
1280
theLastKEYINFO = tSignal;
1281
theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
1282
remaining = KeyInfo::DataLength;
1286
len = (KeyInfo::DataLength - remaining) + len;
1290
theTotalNrOfKeyWordInSignal = len;
1294
setErrorCodeAbort(4228); // XXX wrong code
1299
NdbIndexScanOperation::getKeyFromSCANTABREQ(Uint32* data, Uint32 size)
1301
DBUG_ENTER("NdbIndexScanOperation::getKeyFromSCANTABREQ");
1302
assert(size >= theTotalNrOfKeyWordInSignal);
1303
size = theTotalNrOfKeyWordInSignal;
1304
NdbApiSignal* tSignal = theSCAN_TABREQ->next();
1306
while (pos < size) {
1307
assert(tSignal != NULL);
1308
Uint32* tData = tSignal->getDataPtrSend();
1309
Uint32 rem = size - pos;
1310
if (rem > KeyInfo::DataLength)
1311
rem = KeyInfo::DataLength;
1314
data[pos + i] = tData[KeyInfo::HeaderLength + i];
1319
DBUG_DUMP("key", (uchar*) data, size << 2);
1324
// Index-scan variant of readTuples: decodes the index specific scan flags,
// delegates to NdbScanOperation::readTuples and then prepares range-number
// reading, descending order and ordered (sorted) result handling.
// NOTE(review): the remaining parameter lines and several statements are not
// visible in this listing (embedded line numbers skip values).
NdbIndexScanOperation::readTuples(LockMode lm,
// Decode the flag bits used below.
1329
const bool order_by = scan_flags & SF_OrderBy;
1330
const bool order_desc = scan_flags & SF_Descending;
1331
const bool read_range_no = scan_flags & SF_ReadRangeNo;
1332
m_multi_range = scan_flags & SF_MultiRange;
1334
int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
// On success with SF_ReadRangeNo: remember it and add an attribute header for
// AttributeHeader::RANGE_NO to ATTRINFO.
1335
if(!res && read_range_no)
1337
m_read_range_no = 1;
1339
AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0);
1340
if(insertATTRINFO(word) == -1)
1346
* Note that it is valid to have order_desc true and order_by false.
1348
* This means that there will be no merge sort among partitions, but
1349
* each partition will still be returned in descending sort order.
1351
* This is useful eg. if it is known that the scan spans only one
// Record descending order locally and in the request info of SCAN_TABREQ.
1355
m_descending = true;
1356
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
1357
ScanTabReq::setDescendingFlag(req->requestInfo, true);
// Number of sort columns is the column count of m_accessTable minus one.
1361
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
1362
m_sort_columns = cnt; // -1 for NDB$NODE
1363
m_current_api_receiver = m_sent_receivers_count;
1364
m_api_receivers_count = m_sent_receivers_count;
1366
m_sort_columns = cnt;
// For every index column, request its value from the base table column and
// stash the returned NdbRecAttr pointer in theTupleKeyDefined[i]: slot 0 holds
// the FAKE_PTR tag, slot 1 the low 32 bits and, when pointers are 8 bytes,
// slot 2 the high 32 bits.
1367
for(Uint32 i = 0; i<cnt; i++){
1368
const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
1369
const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
1370
NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
1371
UintPtr newVal = UintPtr(tmp);
1372
theTupleKeyDefined[i][0] = FAKE_PTR;
1373
theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
1374
#if (SIZEOF_CHARP == 8)
1375
theTupleKeyDefined[i][2] = (newVal >> 32);
// Start bound bookkeeping at the beginning of the KEYINFO data.
1380
m_this_bound_start = 0;
1381
m_first_bound_word = theKEYINFOptr;
1387
// Walks the first cnt NdbRecAttr entries of theReceiver, one per sort column,
// and inspects the tag word theTupleKeyDefined[i][0] for each of them.
// NOTE(review): the branching on val is not visible in this listing; the only
// visible action is re-running setup on the entry with a null buffer argument.
NdbIndexScanOperation::fix_get_values(){
1389
* Loop through all getValues and set buffer pointer to "API" pointer
1391
NdbRecAttr * curr = theReceiver.theFirstRecAttr;
// Same column count as used when the sort columns were registered.
1392
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
1393
assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
1395
for(Uint32 i = 0; i<cnt; i++){
1396
Uint32 val = theTupleKeyDefined[i][0];
1399
curr->setup(curr->m_column, 0);
1401
curr = curr->next();
1414
// Compares the current rows of receivers t1 and t2 attribute by attribute.
// skip: when nonzero the first attribute of each row is stepped over.
// cols: number of columns to compare (the loop using it is not visible in
//       this listing).
// The result sign is flipped for descending scans via jdir.
NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
1415
const NdbReceiver* t1,
1416
const NdbReceiver* t2){
1418
NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
1419
NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
1421
r1 = (skip ? r1->next() : r1);
1422
r2 = (skip ? r2->next() : r2);
// jdir is +1 when m_descending is false and -1 when it is true.
1423
const int jdir = 1 - 2 * (int)m_descending;
1424
assert(jdir == 1 || jdir == -1);
1426
Uint32 * d1 = (Uint32*)r1->aRef();
1427
Uint32 * d2 = (Uint32*)r2->aRef();
// Exactly one side NULL: the NULL side compares as smaller before jdir is
// applied.
1428
unsigned r1_null = r1->isNULL();
1429
if((r1_null ^ (unsigned)r2->isNULL())){
1430
return (r1_null ? -1 : 1) * jdir;
// Otherwise use the comparator registered for the column type, passing the
// column charset and both byte lengths.
1432
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
1433
Uint32 len1 = r1->get_size_in_bytes();
1434
Uint32 len2 = r2->get_size_in_bytes();
1436
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
1437
int r = (*sqlType.m_cmp)(col.m_cs, d1, len1, d2, len2, true);
1439
assert(r != NdbSqlUtil::CmpUnknown);
1451
// Delivers the next row of an ordered index scan. The receivers in
// m_api_receivers[s_idx .. s_last) are kept sorted; when the front receiver
// has no buffered row a fetch is performed and the newly confirmed receivers
// are inserted back into sorted position.
// The debug traces label the exits as 0, 1, 2 and -1; the return statements
// themselves are not visible in this listing.
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
1455
Uint32 u_idx = 0, u_last = 0;
1456
Uint32 s_idx = m_current_api_receiver; // first sorted
1457
Uint32 s_last = theParallelism; // last sorted
1459
NdbReceiver** arr = m_api_receivers;
1460
NdbReceiver* tRec = arr[s_idx];
1462
if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
1464
(s_idx < s_last ? tRec->nextResult() : 0));
1466
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
// A fetch is needed when the sorted region is empty or its first receiver
// reports no further buffered row.
1470
bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
1474
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
1475
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
1477
The PollGuard has an implicit call of unlock_and_signal through the
1478
~PollGuard method. This method is called implicitly by the compiler
1479
in all places where the object is out of context due to a return,
1480
break, continue or simply end of statement block
1482
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
1483
theNdb->theNdbBlockNumber);
// Proceed only while the node sequence recorded on the transaction still
// matches the transporter and the next-scan request was sent (returned 0).
1486
Uint32 seq = theNdbCon->theNodeSequence;
1487
Uint32 nodeId = theNdbCon->theDBnode;
1488
Uint32 timeout = tp->m_waitfor_timeout;
1489
if(seq == tp->getNodeSequence(nodeId) &&
1490
!send_next_scan_ordered(s_idx)){
1491
Uint32 tmp = m_sent_receivers_count;
1492
s_idx = m_current_api_receiver;
// Wait, with three times the configured timeout per call, until no receiver
// is outstanding or an error code has been set.
1493
while(m_sent_receivers_count > 0 && !theError.code){
1494
int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
1495
if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
1498
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
1508
setErrorCode(theError.code);
1509
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
// Move the confirmed receivers to the front of arr as the unsorted region
// [u_idx, u_last) and clear the confirmed count.
1514
u_last = m_conf_receivers_count;
1515
m_conf_receivers_count = 0;
1516
memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
1518
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
1524
if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
1533
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
// Number of columns compared: sort columns plus the range number when read.
1538
Uint32 cols = m_sort_columns + m_read_range_no;
1539
Uint32 skip = m_keyInfo;
// Insert each unsorted receiver into the sorted region: linear search for the
// first sorted entry that does not compare below it, shift the entries before
// that point one slot to the left and store the receiver at place - 1.
1540
while(u_idx < u_last){
1544
// Do binary search instead to find place
1545
Uint32 place = s_idx;
1546
for(; place < s_last; place++){
1547
if(compare(skip, cols, tRec, arr[place]) <= 0){
1553
if(DEBUG_NEXT_RESULT)
1554
ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
1555
memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
1558
if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
1559
m_api_receivers[place-1] = tRec;
1563
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
1567
m_current_api_receiver = s_idx;
1569
if(DEBUG_NEXT_RESULT)
1570
for(Uint32 i = s_idx; i<s_last; i++)
1571
ndbout_c("%p", arr[i]);
// If the sorted region is non-empty and its first receiver has a row, copy
// that row out into theReceiver and remember it in m_curr_row.
1573
tRec = m_api_receivers[s_idx];
1574
if(s_idx < s_last && tRec->nextResult()){
1575
m_curr_row = tRec->copyout(theReceiver);
1576
if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
1581
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
1586
// Builds and sends a GSN_SCAN_NEXTREQ signal asking for more rows from the
// single receiver at m_api_receivers[idx].
// NOTE(review): the early exits and the final return are not visible in this
// listing; next_result_ordered treats a zero result as success.
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
// idx equal to theParallelism means there is no receiver at that position.
1588
if(idx == theParallelism)
1591
NdbReceiver* tRec = m_api_receivers[idx];
1592
NdbApiSignal tSignal(theNdb->theMyRef);
1593
tSignal.setSignal(GSN_SCAN_NEXTREQ);
1595
Uint32 last = m_sent_receivers_count;
1596
Uint32* theData = tSignal.getDataPtrSend();
// The receiver reference goes into word 4, after the four header words.
1597
Uint32* prep_array = theData + 4;
1599
m_current_api_receiver = idx + 1;
// RNIL as m_tcPtrI marks a receiver that has completed; nothing is sent.
1600
if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
1602
if(DEBUG_NEXT_RESULT)
1603
ndbout_c("receiver completed, don't send");
// Header: TC connect pointer in word 0, transaction id split into words 2
// (low 32 bits) and 3 (high 32 bits).
1607
theData[0] = theNdbCon->theTCConPtr;
1609
Uint64 transId = theNdbCon->theTransactionId;
1610
theData[2] = transId;
1611
theData[3] = (Uint32) (transId >> 32);
// Append the receiver to the sent list and prepare it for new data.
1616
m_sent_receivers[last] = tRec;
1617
tRec->m_list_index = last;
1618
tRec->prepareSend();
1619
m_sent_receivers_count = last + 1;
// Signal length: four header words plus one receiver word.
1621
Uint32 nodeId = theNdbCon->theDBnode;
1622
TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
1623
tSignal.setLength(4+1);
1624
int ret= tp->sendSignal(&tSignal, nodeId);
1629
// Closes the scan. Visible steps: check the node sequence, wait for the
// outstanding receivers, gather the api and confirmed receivers into
// m_api_receivers (for an ordered scan the api receivers are first moved to
// the front of the array), send the close through send_next_scan(..., true)
// and wait until no receiver is left in any of the three lists.
// theNdbCon->theReleaseOnClose is set on the visible failure paths.
// NOTE(review): case labels, returns and braces are not visible in this
// listing (embedded line numbers skip values).
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
1630
PollGuard *poll_guard)
1632
Uint32 seq = theNdbCon->theNodeSequence;
1633
Uint32 nodeId = theNdbCon->theDBnode;
// A changed node sequence means the connection has to be released on close.
1635
if(seq != tp->getNodeSequence(nodeId))
1637
theNdbCon->theReleaseOnClose = true;
1641
Uint32 timeout = tp->m_waitfor_timeout;
1643
* Wait for outstanding
// Loop while no error is set and receivers are still marked as sent; each
// wait uses three times the configured timeout.
1645
while(theError.code == 0 && m_sent_receivers_count)
1647
int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
1648
switch(return_code){
// Visible failure handling: forget all receivers and release on close.
1654
m_api_receivers_count = 0;
1655
m_conf_receivers_count = 0;
1656
m_sent_receivers_count = 0;
1657
theNdbCon->theReleaseOnClose = true;
1664
m_api_receivers_count = 0;
1665
m_current_api_receiver = m_ordered ? theParallelism : 0;
1670
* move all conf'ed into api
1671
* so that send_next_scan can check if they needs to be closed
1673
Uint32 api = m_api_receivers_count;
1674
Uint32 conf = m_conf_receivers_count;
1679
* Ordered scan, keep the m_api_receivers "to the right"
1681
memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
1682
(theParallelism - m_current_api_receiver) * sizeof(char*));
1683
api = (theParallelism - m_current_api_receiver);
1684
m_api_receivers_count = api;
1687
if(DEBUG_NEXT_RESULT)
1688
ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
1689
m_ordered, api, conf,
1690
m_sent_receivers_count, m_current_api_receiver, theParallelism);
1695
* There's something to close
1696
* setup m_api_receivers (for send_next_scan)
// Append the confirmed receivers behind the api receivers so that one
// send_next_scan call covers api + conf entries.
1698
memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
1699
m_api_receivers_count = api + conf;
1700
m_conf_receivers_count = 0;
// Second argument true requests the close variant of the next-scan request.
1704
if(send_next_scan(api+conf, true) == -1)
1706
theNdbCon->theReleaseOnClose = true;
1711
* wait for close scan conf
// Loop until the sent, api and confirmed counts are all zero.
1713
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
1715
int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
1716
switch(return_code){
// Same visible failure handling as in the first wait loop.
1722
m_api_receivers_count = 0;
1723
m_conf_receivers_count = 0;
1724
m_sent_receivers_count = 0;
1725
theNdbCon->theReleaseOnClose = true;
1734
// Puts the first parallell receivers back into their initial state and
// clears all receiver list counters.
// NOTE(review): the ordered parameter is not used by any line visible in
// this listing.
NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
// Per receiver: restore its list index, publish its id in
// m_prepared_receivers, place it in m_sent_receivers, clear the matching
// confirmed and api slots and prepare it for sending.
1735
for(Uint32 i = 0; i<parallell; i++){
1736
m_receivers[i]->m_list_index = i;
1737
m_prepared_receivers[i] = m_receivers[i]->getId();
1738
m_sent_receivers[i] = m_receivers[i];
1739
m_conf_receivers[i] = 0;
1740
m_api_receivers[i] = 0;
1741
m_receivers[i]->prepareSend();
// All counters start from zero, including the sent count.
1744
m_api_receivers_count = 0;
1745
m_current_api_receiver = 0;
1746
m_sent_receivers_count = 0;
1747
m_conf_receivers_count = 0;
1751
// Restarts the scan: while holding a PollGuard it closes the running scan
// with close_impl, resets the receivers and sends the scan again with
// doSendScan.
// NOTE(review): the handling of the close_impl and doSendScan failure
// results is not visible in this listing.
NdbScanOperation::restart(bool forceSend)
1754
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
1756
The PollGuard has an implicit call of unlock_and_signal through the
1757
~PollGuard method. This method is called implicitly by the compiler
1758
in all places where the object is out of context due to a return,
1759
break, continue or simply end of statement block
1761
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
1762
theNdb->theNdbBlockNumber);
1763
Uint32 nodeId = theNdbCon->theDBnode;
// A nonzero result from close_impl is treated as a failure of the close.
1767
if((res= close_impl(tp, forceSend, &poll_guard)))
// Receivers go back to their initial state before the scan is resent.
1776
reset_receivers(theParallelism, m_ordered);
1779
if (doSendScan(nodeId) == -1)
1785
// Closes the running scan and rewinds all bound state so that new bounds can
// be defined: receivers are reset, the KEYINFO write position goes back to
// the first signal after SCAN_TABREQ and the operation is handed to the
// transaction again through define_scan_op.
// NOTE(review): the check of res and the return statements are not visible
// in this listing.
NdbIndexScanOperation::reset_bounds(bool forceSend){
1789
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
1791
The PollGuard has an implicit call of unlock_and_signal through the
1792
~PollGuard method. This method is called implicitly by the compiler
1793
in all places where the object is out of context due to a return,
1794
break, continue or simply end of statement block
1796
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
1797
theNdb->theNdbBlockNumber);
1798
res= close_impl(tp, forceSend, &poll_guard);
1804
reset_receivers(theParallelism, m_ordered);
// Rewind KEYINFO writing to the payload of the first chained signal.
1806
theLastKEYINFO = theSCAN_TABREQ->next();
1807
theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;
// Clear key word count, distribution key tracking and bound bookkeeping.
1809
theTotalNrOfKeyWordInSignal = 0;
1810
theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
1811
theDistrKeyIndicator_ = 0;
1812
m_this_bound_start = 0;
1813
m_first_bound_word = theKEYINFOptr;
// Take the operation off the executed scan list of the transaction, then
// define it as a scan operation again.
1815
->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
1817
m_transConnection->define_scan_op(this);
1824
// Finishes the bound currently being defined and tags it with range number
// no: the first word of the bound receives the bound length in words
// (shifted left by 16) and the range number (shifted left by 4); bookkeeping
// then moves on to where the next bound will start.
// NOTE(review): the return statements are not visible in this listing.
NdbIndexScanOperation::end_of_bound(Uint32 no)
1826
DBUG_ENTER("end_of_bound");
1827
DBUG_PRINT("info", ("Range number %u", no));
1828
/* Check that SF_MultiRange has been specified if more
1829
than one range is specified */
1830
if (no > 0 && !m_multi_range)
// NOTE(review): this guard admits values up to (1 << 13) - 1, but the range
// number is placed at bit 4 and the length at bit 16, which leaves 12 bits;
// values of 4096 and above would overlap the length field. Confirm the
// intended limit.
1832
if(no < (1 << 13)) // Only 12-bits no of ranges
1834
Uint32 bound_head = * m_first_bound_word;
1835
bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);
1836
* m_first_bound_word = bound_head;
// The next bound starts at the current KEYINFO write position.
1838
m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;
1839
m_this_bound_start = theTupKeyLen;
1846
NdbIndexScanOperation::get_range_no()
1848
NdbRecAttr* tRecAttr = m_curr_row;
1849
if(m_read_range_no && tRecAttr)
1852
tRecAttr = tRecAttr->next();
1853
Uint32 ret = *(Uint32*)tRecAttr->aRef();