~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/NdbScanOperation.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
#include <Ndb.hpp>
 
18
#include <NdbScanOperation.hpp>
 
19
#include <NdbIndexScanOperation.hpp>
 
20
#include <NdbTransaction.hpp>
 
21
#include "NdbApiSignal.hpp"
 
22
#include <NdbOut.hpp>
 
23
#include "NdbDictionaryImpl.hpp"
 
24
#include <NdbBlob.hpp>
 
25
 
 
26
#include <NdbRecAttr.hpp>
 
27
#include <NdbReceiver.hpp>
 
28
 
 
29
#include <stdlib.h>
 
30
#include <NdbSqlUtil.hpp>
 
31
 
 
32
#include <signaldata/ScanTab.hpp>
 
33
#include <signaldata/KeyInfo.hpp>
 
34
#include <signaldata/AttrInfo.hpp>
 
35
#include <signaldata/TcKeyReq.hpp>
 
36
 
 
37
#define DEBUG_NEXT_RESULT 0
 
38
 
 
39
NdbScanOperation::NdbScanOperation(Ndb* aNdb, NdbOperation::Type aType) :
 
40
  NdbOperation(aNdb, aType),
 
41
  m_transConnection(NULL)
 
42
{
 
43
  theParallelism = 0;
 
44
  m_allocated_receivers = 0;
 
45
  m_prepared_receivers = 0;
 
46
  m_api_receivers = 0;
 
47
  m_conf_receivers = 0;
 
48
  m_sent_receivers = 0;
 
49
  m_receivers = 0;
 
50
  m_array = new Uint32[1]; // skip if on delete in fix_receivers
 
51
  theSCAN_TABREQ = 0;
 
52
  m_executed = false;
 
53
}
 
54
 
 
55
NdbScanOperation::~NdbScanOperation()
 
56
{
 
57
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
 
58
    m_receivers[i]->release();
 
59
    theNdb->releaseNdbScanRec(m_receivers[i]);
 
60
  }
 
61
  delete[] m_array;
 
62
}
 
63
 
 
64
void
 
65
NdbScanOperation::setErrorCode(int aErrorCode){
 
66
  NdbTransaction* tmp = theNdbCon;
 
67
  theNdbCon = m_transConnection;
 
68
  NdbOperation::setErrorCode(aErrorCode);
 
69
  theNdbCon = tmp;
 
70
}
 
71
 
 
72
void
 
73
NdbScanOperation::setErrorCodeAbort(int aErrorCode){
 
74
  NdbTransaction* tmp = theNdbCon;
 
75
  theNdbCon = m_transConnection;
 
76
  NdbOperation::setErrorCodeAbort(aErrorCode);
 
77
  theNdbCon = tmp;
 
78
}
 
79
 
 
80
  
 
81
/*****************************************************************************
 
82
 * int init();
 
83
 *
 
84
 * Return Value:  Return 0 : init was successful.
 
85
 *                Return -1: In all other case.  
 
86
 * Remark:        Initiates operation record after allocation.
 
87
 *****************************************************************************/
 
88
int
 
89
NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
 
90
{
 
91
  m_transConnection = myConnection;
 
92
  //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection);
 
93
  theNdb->theRemainingStartTransactions++; // will be checked in hupp...
 
94
  NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
 
95
  if (!aScanConnection){
 
96
    theNdb->theRemainingStartTransactions--;
 
97
    setErrorCodeAbort(theNdb->getNdbError().code);
 
98
    return -1;
 
99
  }
 
100
 
 
101
  // NOTE! The hupped trans becomes the owner of the operation
 
102
  if(NdbOperation::init(tab, aScanConnection) != 0){
 
103
    theNdb->theRemainingStartTransactions--;
 
104
    return -1;
 
105
  }
 
106
  
 
107
  initInterpreter();
 
108
  
 
109
  theStatus = GetValue;
 
110
  theOperationType = OpenScanRequest;
 
111
  theNdbCon->theMagicNumber = 0xFE11DF;
 
112
  theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
 
113
  m_read_range_no = 0;
 
114
  m_executed = false;
 
115
  return 0;
 
116
}
 
117
 
 
118
int 
 
119
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
 
120
                             Uint32 scan_flags, 
 
121
                             Uint32 parallel,
 
122
                             Uint32 batch)
 
123
{
 
124
  m_ordered = m_descending = false;
 
125
  Uint32 fragCount = m_currentTable->m_fragmentCount;
 
126
 
 
127
  if (parallel > fragCount || parallel == 0) {
 
128
     parallel = fragCount;
 
129
  }
 
130
 
 
131
  // It is only possible to call openScan if 
 
132
  //  1. this transcation don't already  contain another scan operation
 
133
  //  2. this transaction don't already contain other operations
 
134
  //  3. theScanOp contains a NdbScanOperation
 
135
  if (theNdbCon->theScanningOp != NULL){
 
136
    setErrorCode(4605);
 
137
    return -1;
 
138
  }
 
139
 
 
140
  theNdbCon->theScanningOp = this;
 
141
  bool tupScan = (scan_flags & SF_TupScan);
 
142
 
 
143
#if 0 // XXX temp for testing
 
144
  { char* p = getenv("NDB_USE_TUPSCAN");
 
145
    if (p != 0) {
 
146
      unsigned n = atoi(p); // 0-10
 
147
      if ((unsigned int) (::time(0) % 10) < n) tupScan = true;
 
148
    }
 
149
  }
 
150
#endif
 
151
  if (scan_flags & SF_DiskScan)
 
152
  {
 
153
    tupScan = true;
 
154
    m_no_disk_flag = false;
 
155
  }
 
156
  
 
157
  bool rangeScan = false;
 
158
  if ( (int) m_accessTable->m_indexType ==
 
159
       (int) NdbDictionary::Index::OrderedIndex)
 
160
  {
 
161
    if (m_currentTable == m_accessTable){
 
162
      // Old way of scanning indexes, should not be allowed
 
163
      m_currentTable = theNdb->theDictionary->
 
164
        getTable(m_currentTable->m_primaryTable.c_str());
 
165
      assert(m_currentTable != NULL);
 
166
    }
 
167
    assert (m_currentTable != m_accessTable);
 
168
    // Modify operation state
 
169
    theStatus = GetValue;
 
170
    theOperationType  = OpenRangeScanRequest;
 
171
    rangeScan = true;
 
172
    tupScan = false;
 
173
  }
 
174
  
 
175
  if (rangeScan && (scan_flags & SF_OrderBy))
 
176
    parallel = fragCount;
 
177
  
 
178
  theParallelism = parallel;    
 
179
  
 
180
  if(fix_receivers(parallel) == -1){
 
181
    setErrorCodeAbort(4000);
 
182
    return -1;
 
183
  }
 
184
  
 
185
  theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
 
186
  if (theSCAN_TABREQ == NULL) {
 
187
    setErrorCodeAbort(4000);
 
188
    return -1;
 
189
  }//if
 
190
  
 
191
  theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
 
192
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
 
193
  req->apiConnectPtr = theNdbCon->theTCConPtr;
 
194
  req->tableId = m_accessTable->m_id;
 
195
  req->tableSchemaVersion = m_accessTable->m_version;
 
196
  req->storedProcId = 0xFFFF;
 
197
  req->buddyConPtr = theNdbCon->theBuddyConPtr;
 
198
  req->first_batch_size = batch; // Save user specified batch size
 
199
  
 
200
  Uint32 reqInfo = 0;
 
201
  ScanTabReq::setParallelism(reqInfo, parallel);
 
202
  ScanTabReq::setScanBatch(reqInfo, 0);
 
203
  ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
 
204
  ScanTabReq::setTupScanFlag(reqInfo, tupScan);
 
205
  req->requestInfo = reqInfo;
 
206
 
 
207
  m_keyInfo = (scan_flags & SF_KeyInfo) ? 1 : 0;
 
208
  setReadLockMode(lm);
 
209
 
 
210
  Uint64 transId = theNdbCon->getTransactionId();
 
211
  req->transId1 = (Uint32) transId;
 
212
  req->transId2 = (Uint32) (transId >> 32);
 
213
 
 
214
  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
 
215
  if(!tSignal)
 
216
  {
 
217
    theSCAN_TABREQ->next(tSignal = theNdb->getSignal());
 
218
  }
 
219
  theLastKEYINFO = tSignal;
 
220
  
 
221
  tSignal->setSignal(GSN_KEYINFO);
 
222
  theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
 
223
  theTotalNrOfKeyWordInSignal= 0;
 
224
 
 
225
  getFirstATTRINFOScan();
 
226
  return 0;
 
227
}
 
228
 
 
229
void
 
230
NdbScanOperation::setReadLockMode(LockMode lockMode)
 
231
{
 
232
  bool lockExcl, lockHoldMode, readCommitted;
 
233
  switch (lockMode)
 
234
  {
 
235
    case LM_CommittedRead:
 
236
      lockExcl= false;
 
237
      lockHoldMode= false;
 
238
      readCommitted= true;
 
239
      break;
 
240
    case LM_SimpleRead:
 
241
    case LM_Read:
 
242
      lockExcl= false;
 
243
      lockHoldMode= true;
 
244
      readCommitted= false;
 
245
      break;
 
246
    case LM_Exclusive:
 
247
      lockExcl= true;
 
248
      lockHoldMode= true;
 
249
      readCommitted= false;
 
250
      m_keyInfo= 1;
 
251
      break;
 
252
    default:
 
253
      /* Not supported / invalid. */
 
254
      assert(false);
 
255
  }
 
256
  theLockMode= lockMode;
 
257
  ScanTabReq *req= CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
 
258
  Uint32 reqInfo= req->requestInfo;
 
259
  ScanTabReq::setLockMode(reqInfo, lockExcl);
 
260
  ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
 
261
  ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
 
262
  req->requestInfo= reqInfo;
 
263
}
 
264
 
 
265
int
 
266
NdbScanOperation::fix_receivers(Uint32 parallel){
 
267
  assert(parallel > 0);
 
268
  if(parallel > m_allocated_receivers){
 
269
    const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));
 
270
 
 
271
    Uint64 * tmp = new Uint64[(sz+7)/8];
 
272
    // Save old receivers
 
273
    memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
 
274
    delete[] m_array;
 
275
    m_array = (Uint32*)tmp;
 
276
    
 
277
    m_receivers = (NdbReceiver**)tmp;
 
278
    m_api_receivers = m_receivers + parallel;
 
279
    m_conf_receivers = m_api_receivers + parallel;
 
280
    m_sent_receivers = m_conf_receivers + parallel;
 
281
    m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
 
282
 
 
283
    // Only get/init "new" receivers
 
284
    NdbReceiver* tScanRec;
 
285
    for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
 
286
      tScanRec = theNdb->getNdbScanRec();
 
287
      if (tScanRec == NULL) {
 
288
        setErrorCodeAbort(4000);
 
289
        return -1;
 
290
      }//if
 
291
      m_receivers[i] = tScanRec;
 
292
      tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
 
293
    }
 
294
    m_allocated_receivers = parallel;
 
295
  }
 
296
  
 
297
  reset_receivers(parallel, 0);
 
298
  return 0;
 
299
}
 
300
 
 
301
/**
 
302
 * Move receiver from send array to conf:ed array
 
303
 */
 
304
void
 
305
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
 
306
  if(theError.code == 0){
 
307
    if(DEBUG_NEXT_RESULT)
 
308
      ndbout_c("receiver_delivered");
 
309
    
 
310
    Uint32 idx = tRec->m_list_index;
 
311
    Uint32 last = m_sent_receivers_count - 1;
 
312
    if(idx != last){
 
313
      NdbReceiver * move = m_sent_receivers[last];
 
314
      m_sent_receivers[idx] = move;
 
315
      move->m_list_index = idx;
 
316
    }
 
317
    m_sent_receivers_count = last;
 
318
    
 
319
    last = m_conf_receivers_count;
 
320
    m_conf_receivers[last] = tRec;
 
321
    m_conf_receivers_count = last + 1;
 
322
    tRec->m_list_index = last;
 
323
    tRec->m_current_row = 0;
 
324
  }
 
325
}
 
326
 
 
327
/**
 
328
 * Remove receiver as it's completed
 
329
 */
 
330
void
 
331
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
 
332
  if(theError.code == 0){
 
333
    if(DEBUG_NEXT_RESULT)
 
334
      ndbout_c("receiver_completed");
 
335
    
 
336
    Uint32 idx = tRec->m_list_index;
 
337
    Uint32 last = m_sent_receivers_count - 1;
 
338
    if(idx != last){
 
339
      NdbReceiver * move = m_sent_receivers[last];
 
340
      m_sent_receivers[idx] = move;
 
341
      move->m_list_index = idx;
 
342
    }
 
343
    m_sent_receivers_count = last;
 
344
  }
 
345
}
 
346
 
 
347
/*****************************************************************************
 
348
 * int getFirstATTRINFOScan( U_int32 aData )
 
349
 *
 
350
 * Return Value:  Return 0:   Successful
 
351
 *                Return -1:  All other cases
 
352
 * Parameters:    None:            Only allocate the first signal.
 
353
 * Remark:        When a scan is defined we need to use this method instead 
 
354
 *                of insertATTRINFO for the first signal. 
 
355
 *                This is because we need not to mess up the code in 
 
356
 *                insertATTRINFO with if statements since we are not 
 
357
 *                interested in the TCKEYREQ signal.
 
358
 *****************************************************************************/
 
359
int
 
360
NdbScanOperation::getFirstATTRINFOScan()
 
361
{
 
362
  NdbApiSignal* tSignal;
 
363
 
 
364
  tSignal = theNdb->getSignal();
 
365
  if (tSignal == NULL){
 
366
    setErrorCodeAbort(4000);      
 
367
    return -1;    
 
368
  }
 
369
  tSignal->setSignal(m_attrInfoGSN);
 
370
  theAI_LenInCurrAI = 8;
 
371
  theATTRINFOptr = &tSignal->getDataPtrSend()[8];
 
372
  theFirstATTRINFO = tSignal;
 
373
  theCurrentATTRINFO = tSignal;
 
374
  theCurrentATTRINFO->next(NULL);
 
375
 
 
376
  return 0;
 
377
}
 
378
 
 
379
/**
 
380
 * Constats for theTupleKeyDefined[][0]
 
381
 */
 
382
#define SETBOUND_EQ 1
 
383
#define FAKE_PTR 2
 
384
#define API_PTR 3
 
385
 
 
386
#define WAITFOR_SCAN_TIMEOUT 120000
 
387
 
 
388
int
 
389
NdbScanOperation::executeCursor(int nodeId){
 
390
  NdbTransaction * tCon = theNdbCon;
 
391
  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
 
392
  Guard guard(tp->theMutexPtr);
 
393
 
 
394
  Uint32 magic = tCon->theMagicNumber;
 
395
  Uint32 seq = tCon->theNodeSequence;
 
396
 
 
397
  if (tp->get_node_alive(nodeId) &&
 
398
      (tp->getNodeSequence(nodeId) == seq)) {
 
399
 
 
400
    /**
 
401
     * Only call prepareSendScan first time (incase of restarts)
 
402
     *   - check with theMagicNumber
 
403
     */
 
404
    tCon->theMagicNumber = 0x37412619;
 
405
    if(magic != 0x37412619 && 
 
406
       prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
 
407
      return -1;
 
408
    
 
409
    
 
410
    if (doSendScan(nodeId) == -1)
 
411
      return -1;
 
412
 
 
413
    m_executed= true; // Mark operation as executed
 
414
    return 0;
 
415
  } else {
 
416
    if (!(tp->get_node_stopping(nodeId) &&
 
417
          (tp->getNodeSequence(nodeId) == seq))){
 
418
      TRACE_DEBUG("The node is hard dead when attempting to start a scan");
 
419
      setErrorCode(4029);
 
420
      tCon->theReleaseOnClose = true;
 
421
    } else {
 
422
      TRACE_DEBUG("The node is stopping when attempting to start a scan");
 
423
      setErrorCode(4030);
 
424
    }//if
 
425
    tCon->theCommitStatus = NdbTransaction::Aborted;
 
426
  }//if
 
427
  return -1;
 
428
}
 
429
 
 
430
 
 
431
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
 
432
{
 
433
  int res;
 
434
  if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) {
 
435
    // handle blobs
 
436
    NdbBlob* tBlob = theBlobList;
 
437
    while (tBlob != 0) {
 
438
      if (tBlob->atNextResult() == -1)
 
439
        return -1;
 
440
      tBlob = tBlob->theNext;
 
441
    }
 
442
    /*
 
443
     * Flush blob part ops on behalf of user because
 
444
     * - nextResult is analogous to execute(NoCommit)
 
445
     * - user is likely to want blob value before next execute
 
446
     */
 
447
    if (m_transConnection->executePendingBlobOps() == -1)
 
448
      return -1;
 
449
    return 0;
 
450
  }
 
451
  return res;
 
452
}
 
453
 
 
454
int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
 
455
{
 
456
  if(m_ordered)
 
457
    return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
 
458
                                                               forceSend);
 
459
  
 
460
  /**
 
461
   * Check current receiver
 
462
   */
 
463
  int retVal = 2;
 
464
  Uint32 idx = m_current_api_receiver;
 
465
  Uint32 last = m_api_receivers_count;
 
466
  m_curr_row = 0;
 
467
 
 
468
  if(DEBUG_NEXT_RESULT)
 
469
    ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
 
470
  
 
471
  /**
 
472
   * Check next buckets
 
473
   */
 
474
  for(; idx < last; idx++){
 
475
    NdbReceiver* tRec = m_api_receivers[idx];
 
476
    if(tRec->nextResult()){
 
477
      m_curr_row = tRec->copyout(theReceiver);
 
478
      retVal = 0;
 
479
      break;
 
480
    }
 
481
  }
 
482
    
 
483
  /**
 
484
   * We have advanced atleast one bucket
 
485
   */
 
486
  if(!fetchAllowed || !retVal){
 
487
    m_current_api_receiver = idx;
 
488
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
 
489
    return retVal;
 
490
  }
 
491
  
 
492
  Uint32 nodeId = theNdbCon->theDBnode;
 
493
  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
 
494
  /*
 
495
    The PollGuard has an implicit call of unlock_and_signal through the
 
496
    ~PollGuard method. This method is called implicitly by the compiler
 
497
    in all places where the object is out of context due to a return,
 
498
    break, continue or simply end of statement block
 
499
  */
 
500
  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
 
501
                       theNdb->theNdbBlockNumber);
 
502
 
 
503
  const Uint32 seq = theNdbCon->theNodeSequence;
 
504
 
 
505
  if(theError.code)
 
506
  {
 
507
    goto err4;
 
508
  }
 
509
  
 
510
  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
 
511
  {
 
512
      
 
513
    idx = m_current_api_receiver;
 
514
    last = m_api_receivers_count;
 
515
    Uint32 timeout = tp->m_waitfor_timeout;
 
516
      
 
517
    do {
 
518
      if(theError.code){
 
519
        setErrorCode(theError.code);
 
520
        if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
 
521
        return -1;
 
522
      }
 
523
      
 
524
      Uint32 cnt = m_conf_receivers_count;
 
525
      Uint32 sent = m_sent_receivers_count;
 
526
 
 
527
      if(DEBUG_NEXT_RESULT)
 
528
        ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);
 
529
        
 
530
      if(cnt > 0){
 
531
        /**
 
532
         * Just move completed receivers
 
533
         */
 
534
        memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
 
535
        last += cnt;
 
536
        m_conf_receivers_count = 0;
 
537
      } else if(retVal == 2 && sent > 0){
 
538
        /**
 
539
         * No completed...
 
540
         */
 
541
        int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
 
542
        if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
 
543
          continue;
 
544
        } else if(ret_code == -1){
 
545
          retVal = -1;
 
546
        } else {
 
547
          idx = last;
 
548
          retVal = -2; //return_code;
 
549
        }
 
550
      } else if(retVal == 2){
 
551
        /**
 
552
         * No completed & no sent -> EndOfData
 
553
         */
 
554
        theError.code = -1; // make sure user gets error if he tries again
 
555
        if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
 
556
        return 1;
 
557
      }
 
558
        
 
559
      if(retVal == 0)
 
560
        break;
 
561
        
 
562
      for(; idx < last; idx++){
 
563
        NdbReceiver* tRec = m_api_receivers[idx];
 
564
        if(tRec->nextResult()){
 
565
          m_curr_row = tRec->copyout(theReceiver);      
 
566
          retVal = 0;
 
567
          break;
 
568
        }
 
569
      }
 
570
    } while(retVal == 2);
 
571
  } else {
 
572
    retVal = -3;
 
573
  }
 
574
    
 
575
  m_api_receivers_count = last;
 
576
  m_current_api_receiver = idx;
 
577
    
 
578
  switch(retVal){
 
579
  case 0:
 
580
  case 1:
 
581
  case 2:
 
582
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
 
583
    return retVal;
 
584
  case -1:
 
585
    setErrorCode(4008); // Timeout
 
586
    break;
 
587
  case -2:
 
588
    setErrorCode(4028); // Node fail
 
589
    break;
 
590
  case -3: // send_next_scan -> return fail (set error-code self)
 
591
    if(theError.code == 0)
 
592
      setErrorCode(4028); // seq changed = Node fail
 
593
    break;
 
594
  case -4:
 
595
err4:
 
596
    setErrorCode(theError.code);
 
597
    break;
 
598
  }
 
599
    
 
600
  theNdbCon->theTransactionIsStarted = false;
 
601
  theNdbCon->theReleaseOnClose = true;
 
602
  if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
 
603
  return -1;
 
604
}
 
605
 
 
606
int
 
607
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
 
608
{
 
609
  if(cnt > 0){
 
610
    NdbApiSignal tSignal(theNdb->theMyRef);
 
611
    tSignal.setSignal(GSN_SCAN_NEXTREQ);
 
612
    
 
613
    Uint32* theData = tSignal.getDataPtrSend();
 
614
    theData[0] = theNdbCon->theTCConPtr;
 
615
    theData[1] = stopScanFlag == true ? 1 : 0;
 
616
    Uint64 transId = theNdbCon->theTransactionId;
 
617
    theData[2] = transId;
 
618
    theData[3] = (Uint32) (transId >> 32);
 
619
    
 
620
    /**
 
621
     * Prepare ops
 
622
     */
 
623
    Uint32 last = m_sent_receivers_count;
 
624
    Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
 
625
    Uint32 sent = 0;
 
626
    for(Uint32 i = 0; i<cnt; i++){
 
627
      NdbReceiver * tRec = m_api_receivers[i];
 
628
      if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
 
629
      {
 
630
        m_sent_receivers[last+sent] = tRec;
 
631
        tRec->m_list_index = last+sent;
 
632
        tRec->prepareSend();
 
633
        sent++;
 
634
      }
 
635
    }
 
636
    memmove(m_api_receivers, m_api_receivers+cnt, 
 
637
            (theParallelism-cnt) * sizeof(char*));
 
638
    
 
639
    int ret = 0;
 
640
    if(sent)
 
641
    {
 
642
      Uint32 nodeId = theNdbCon->theDBnode;
 
643
      TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
 
644
      if(cnt > 21){
 
645
        tSignal.setLength(4);
 
646
        LinearSectionPtr ptr[3];
 
647
        ptr[0].p = prep_array;
 
648
        ptr[0].sz = sent;
 
649
        ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
 
650
      } else {
 
651
        tSignal.setLength(4+sent);
 
652
        ret = tp->sendSignal(&tSignal, nodeId);
 
653
      }
 
654
    }
 
655
    m_sent_receivers_count = last + sent;
 
656
    m_api_receivers_count -= cnt;
 
657
    m_current_api_receiver = 0;
 
658
    
 
659
    return ret;
 
660
  }
 
661
  return 0;
 
662
}
 
663
 
 
664
int 
 
665
NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
 
666
{
 
667
  printf("NdbScanOperation::prepareSend\n");
 
668
  abort();
 
669
  return 0;
 
670
}
 
671
 
 
672
int 
 
673
NdbScanOperation::doSend(int ProcessorId)
 
674
{
 
675
  printf("NdbScanOperation::doSend\n");
 
676
  return 0;
 
677
}
 
678
 
 
679
void NdbScanOperation::close(bool forceSend, bool releaseOp)
 
680
{
 
681
  DBUG_ENTER("NdbScanOperation::close");
 
682
  DBUG_PRINT("enter", ("this: 0x%lx  tcon: 0x%lx  con: 0x%lx  force: %d  release: %d",
 
683
                       (long) this,
 
684
                       (long) m_transConnection, (long) theNdbCon,
 
685
                       forceSend, releaseOp));
 
686
 
 
687
  if(m_transConnection){
 
688
    if(DEBUG_NEXT_RESULT)
 
689
      ndbout_c("close() theError.code = %d "
 
690
               "m_api_receivers_count = %d "
 
691
               "m_conf_receivers_count = %d "
 
692
               "m_sent_receivers_count = %d",
 
693
               theError.code, 
 
694
               m_api_receivers_count,
 
695
               m_conf_receivers_count,
 
696
               m_sent_receivers_count);
 
697
    
 
698
    TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
 
699
    /*
 
700
      The PollGuard has an implicit call of unlock_and_signal through the
 
701
      ~PollGuard method. This method is called implicitly by the compiler
 
702
      in all places where the object is out of context due to a return,
 
703
      break, continue or simply end of statement block
 
704
    */
 
705
    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
 
706
                         theNdb->theNdbBlockNumber);
 
707
    close_impl(tp, forceSend, &poll_guard);
 
708
  }
 
709
 
 
710
  NdbConnection* tCon = theNdbCon;
 
711
  NdbConnection* tTransCon = m_transConnection;
 
712
  theNdbCon = NULL;
 
713
  m_transConnection = NULL;
 
714
 
 
715
  if (tTransCon && releaseOp) 
 
716
  {
 
717
    NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
 
718
 
 
719
    bool ret = true;
 
720
    if (theStatus != WaitResponse)
 
721
    {
 
722
      /**
 
723
       * Not executed yet
 
724
       */
 
725
      ret = 
 
726
        tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
 
727
                                        &tTransCon->m_theLastScanOperation,
 
728
                                        tOp);
 
729
    }
 
730
    else
 
731
    {
 
732
      ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
 
733
                                            0, tOp);
 
734
    }
 
735
    assert(ret);
 
736
  }
 
737
  
 
738
  tCon->theScanningOp = 0;
 
739
  theNdb->closeTransaction(tCon);
 
740
  theNdb->theRemainingStartTransactions--;
 
741
  DBUG_VOID_RETURN;
 
742
}
 
743
 
 
744
void
 
745
NdbScanOperation::execCLOSE_SCAN_REP(){
 
746
  m_conf_receivers_count = 0;
 
747
  m_sent_receivers_count = 0;
 
748
}
 
749
 
 
750
void NdbScanOperation::release()
 
751
{
 
752
  if(theNdbCon != 0 || m_transConnection != 0){
 
753
    close();
 
754
  }
 
755
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
 
756
    m_receivers[i]->release();
 
757
  }
 
758
 
 
759
  NdbOperation::release();
 
760
  
 
761
  if(theSCAN_TABREQ)
 
762
  {
 
763
    theNdb->releaseSignal(theSCAN_TABREQ);
 
764
    theSCAN_TABREQ = 0;
 
765
  }
 
766
}
 
767
 
 
768
/***************************************************************************
 
769
int prepareSendScan(Uint32 aTC_ConnectPtr,
 
770
                    Uint64 aTransactionId)
 
771
 
 
772
Return Value:   Return 0 : preparation of send was succesful.
 
773
                Return -1: In all other case.   
 
774
Parameters:     aTC_ConnectPtr: the Connect pointer to TC.
 
775
                aTransactionId: the Transaction identity of the transaction.
 
776
Remark:         Puts the the final data into ATTRINFO signal(s)  after this 
 
777
                we know the how many signal to send and their sizes
 
778
***************************************************************************/
 
779
int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
 
780
                                      Uint64 aTransactionId){
 
781
 
 
782
  if (theInterpretIndicator != 1 ||
 
783
      (theOperationType != OpenScanRequest &&
 
784
       theOperationType != OpenRangeScanRequest)) {
 
785
    setErrorCodeAbort(4005);
 
786
    return -1;
 
787
  }
 
788
 
 
789
  theErrorLine = 0;
 
790
 
 
791
  // In preapareSendInterpreted we set the sizes (word 4-8) in the
 
792
  // first ATTRINFO signal.
 
793
  if (prepareSendInterpreted() == -1)
 
794
    return -1;
 
795
  
 
796
  if(m_ordered){
 
797
    ((NdbIndexScanOperation*)this)->fix_get_values();
 
798
  }
 
799
  
 
800
  theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
 
801
 
 
802
  /**
 
803
   * Prepare all receivers
 
804
   */
 
805
  theReceiver.prepareSend();
 
806
  bool keyInfo = m_keyInfo;
 
807
  Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
 
808
  /**
 
809
   * The number of records sent by each LQH is calculated and the kernel
 
810
   * is informed of this number by updating the SCAN_TABREQ signal
 
811
   */
 
812
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
 
813
  Uint32 batch_size = req->first_batch_size; // User specified
 
814
  Uint32 batch_byte_size, first_batch_size;
 
815
  theReceiver.calculate_batch_size(key_size,
 
816
                                   theParallelism,
 
817
                                   batch_size,
 
818
                                   batch_byte_size,
 
819
                                   first_batch_size);
 
820
  ScanTabReq::setScanBatch(req->requestInfo, batch_size);
 
821
  req->batch_byte_size= batch_byte_size;
 
822
  req->first_batch_size= first_batch_size;
 
823
 
 
824
  /**
 
825
   * Set keyinfo flag
 
826
   *  (Always keyinfo when using blobs)
 
827
   */
 
828
  Uint32 reqInfo = req->requestInfo;
 
829
  ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
 
830
  ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
 
831
  req->requestInfo = reqInfo;
 
832
  
 
833
  for(Uint32 i = 0; i<theParallelism; i++){
 
834
    if (m_receivers[i]->do_get_value(&theReceiver, batch_size, 
 
835
                                     key_size, 
 
836
                                     m_read_range_no))
 
837
    {
 
838
      return -1;
 
839
    }
 
840
  }
 
841
  return 0;
 
842
}
 
843
 
 
844
/*****************************************************************************
 
845
int doSend()
 
846
 
 
847
Return Value:   Return >0 : send was succesful, returns number of signals sent
 
848
                Return -1: In all other case.   
 
849
Parameters:     aProcessorId: Receiving processor node
 
850
Remark:         Sends the ATTRINFO signal(s)
 
851
*****************************************************************************/
 
852
int
 
853
NdbScanOperation::doSendScan(int aProcessorId)
 
854
{
 
855
  Uint32 tSignalCount = 0;
 
856
  NdbApiSignal* tSignal;
 
857
 
 
858
  if (theInterpretIndicator != 1 ||
 
859
      (theOperationType != OpenScanRequest &&
 
860
       theOperationType != OpenRangeScanRequest)) {
 
861
      setErrorCodeAbort(4005);
 
862
      return -1;
 
863
  }
 
864
  
 
865
  assert(theSCAN_TABREQ != NULL);
 
866
  tSignal = theSCAN_TABREQ;
 
867
  
 
868
  Uint32 tupKeyLen = theTupKeyLen;
 
869
  Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
 
870
  Uint64 transId = theNdbCon->theTransactionId;
 
871
  
 
872
  // Update the "attribute info length in words" in SCAN_TABREQ before 
 
873
  // sending it. This could not be done in openScan because 
 
874
  // we created the ATTRINFO signals after the SCAN_TABREQ signal.
 
875
  ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
 
876
  if (unlikely(theTotalCurrAI_Len > ScanTabReq::MaxTotalAttrInfo)) {
 
877
    setErrorCode(4257);
 
878
    return -1;
 
879
  }
 
880
  req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
 
881
  Uint32 tmp = req->requestInfo;
 
882
  ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
 
883
  req->distributionKey = theDistributionKey;
 
884
  req->requestInfo = tmp;
 
885
  tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
 
886
 
 
887
  TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
 
888
  LinearSectionPtr ptr[3];
 
889
  ptr[0].p = m_prepared_receivers;
 
890
  ptr[0].sz = theParallelism;
 
891
  if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {
 
892
    setErrorCode(4002);
 
893
    return -1;
 
894
  } 
 
895
 
 
896
  if (tupKeyLen > 0){
 
897
    // must have at least one signal since it contains attrLen for bounds
 
898
    assert(theLastKEYINFO != NULL);
 
899
    tSignal = theLastKEYINFO;
 
900
    tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
 
901
    
 
902
    assert(theSCAN_TABREQ->next() != NULL);
 
903
    tSignal = theSCAN_TABREQ->next();
 
904
    
 
905
    NdbApiSignal* last;
 
906
    do {
 
907
      KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
 
908
      keyInfo->connectPtr = aTC_ConnectPtr;
 
909
      keyInfo->transId[0] = Uint32(transId);
 
910
      keyInfo->transId[1] = Uint32(transId >> 32);
 
911
      
 
912
      if (tp->sendSignal(tSignal,aProcessorId) == -1){
 
913
        setErrorCode(4002);
 
914
        return -1;
 
915
      }
 
916
      
 
917
      tSignalCount++;
 
918
      last = tSignal;
 
919
      tSignal = tSignal->next();
 
920
    } while(last != theLastKEYINFO);
 
921
  }
 
922
  
 
923
  tSignal = theFirstATTRINFO;
 
924
  while (tSignal != NULL) {
 
925
    AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
 
926
    attrInfo->connectPtr = aTC_ConnectPtr;
 
927
    attrInfo->transId[0] = Uint32(transId);
 
928
    attrInfo->transId[1] = Uint32(transId >> 32);
 
929
    
 
930
    if (tp->sendSignal(tSignal,aProcessorId) == -1){
 
931
      setErrorCode(4002);
 
932
      return -1;
 
933
    }
 
934
    tSignalCount++;
 
935
    tSignal = tSignal->next();
 
936
  }    
 
937
  theStatus = WaitResponse;  
 
938
 
 
939
  m_curr_row = 0;
 
940
  m_sent_receivers_count = theParallelism;
 
941
  if(m_ordered)
 
942
  {
 
943
    m_current_api_receiver = theParallelism;
 
944
    m_api_receivers_count = theParallelism;
 
945
  }
 
946
  
 
947
  return tSignalCount;
 
948
}//NdbOperation::doSendScan()
 
949
 
 
950
/*****************************************************************************
 
951
 * NdbOperation* takeOverScanOp(NdbTransaction* updateTrans);
 
952
 *
 
953
 * Parameters:     The update transactions NdbTransaction pointer.
 
954
 * Return Value:   A reference to the transferred operation object 
 
955
 *                   or NULL if no success.
 
956
 * Remark:         Take over the scanning transactions NdbOperation 
 
957
 *                 object for a tuple to an update transaction, 
 
958
 *                 which is the last operation read in nextScanResult()
 
959
 *                 (theNdbCon->thePreviousScanRec)
 
960
 *
 
961
 *     FUTURE IMPLEMENTATION:   (This note was moved from header file.)
 
962
 *     In the future, it will even be possible to transfer 
 
963
 *     to a NdbTransaction on another Ndb-object.  
 
964
 *     In this case the receiving NdbTransaction-object must call 
 
965
 *     a method receiveOpFromScan to actually receive the information.  
 
966
 *     This means that the updating transactions can be placed
 
967
 *     in separate threads and thus increasing the parallelism during
 
968
 *     the scan process. 
 
969
 ****************************************************************************/
 
970
int
 
971
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
 
972
{
 
973
  NdbRecAttr * tRecAttr = m_curr_row;
 
974
  if(tRecAttr)
 
975
  {
 
976
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
 
977
 
 
978
    assert(tRecAttr->get_size_in_bytes() > 0);
 
979
    assert(tRecAttr->get_size_in_bytes() < 65536);
 
980
    const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
 
981
 
 
982
    assert(size >= len);
 
983
    memcpy(data, src, 4*len);
 
984
    size = len;
 
985
    return 0;
 
986
  }
 
987
  return -1;
 
988
}
 
989
 
 
990
NdbOperation*
 
991
NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
 
992
{
 
993
  
 
994
  NdbRecAttr * tRecAttr = m_curr_row;
 
995
  if(tRecAttr)
 
996
  {
 
997
    NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
 
998
    if (newOp == NULL){
 
999
      return NULL;
 
1000
    }
 
1001
    if (!m_keyInfo)
 
1002
    {
 
1003
      // Cannot take over lock if no keyinfo was requested
 
1004
      setErrorCodeAbort(4604);
 
1005
      return NULL;
 
1006
    }
 
1007
    pTrans->theSimpleState = 0;
 
1008
    
 
1009
    assert(tRecAttr->get_size_in_bytes() > 0);
 
1010
    assert(tRecAttr->get_size_in_bytes() < 65536);
 
1011
    const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
 
1012
    
 
1013
    newOp->theTupKeyLen = len;
 
1014
    newOp->theOperationType = opType;
 
1015
    newOp->m_abortOption = AbortOnError;
 
1016
    switch (opType) {
 
1017
    case (ReadRequest):
 
1018
      newOp->theLockMode = theLockMode;
 
1019
      // Fall through
 
1020
    case (DeleteRequest):
 
1021
      newOp->theStatus = GetValue;
 
1022
      break;
 
1023
    default:
 
1024
      newOp->theStatus = SetValue;
 
1025
    }
 
1026
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
 
1027
    const Uint32 tScanInfo = src[len] & 0x3FFFF;
 
1028
    const Uint32 tTakeOverFragment = src[len] >> 20;
 
1029
    {
 
1030
      UintR scanInfo = 0;
 
1031
      TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
 
1032
      TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
 
1033
      TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
 
1034
      newOp->theScanInfo = scanInfo;
 
1035
      newOp->theDistrKeyIndicator_ = 1;
 
1036
      newOp->theDistributionKey = tTakeOverFragment;
 
1037
    }
 
1038
 
 
1039
    // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
 
1040
    TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
 
1041
    Uint32 i = 0;
 
1042
    for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
 
1043
      tcKeyReq->keyInfo[i] = * src++;
 
1044
    }
 
1045
    
 
1046
    if(i < len){
 
1047
      NdbApiSignal* tSignal = theNdb->getSignal();
 
1048
      newOp->theTCREQ->next(tSignal); 
 
1049
      
 
1050
      Uint32 left = len - i;
 
1051
      while(tSignal && left > KeyInfo::DataLength){
 
1052
        tSignal->setSignal(GSN_KEYINFO);
 
1053
        KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
 
1054
        memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
 
1055
        src += KeyInfo::DataLength;
 
1056
        left -= KeyInfo::DataLength;
 
1057
 
 
1058
        tSignal->next(theNdb->getSignal());
 
1059
        tSignal = tSignal->next();
 
1060
      }
 
1061
 
 
1062
      if(tSignal && left > 0){
 
1063
        tSignal->setSignal(GSN_KEYINFO);
 
1064
        KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
 
1065
        memcpy(keyInfo->keyData, src, 4 * left);
 
1066
      }      
 
1067
    }
 
1068
    // create blob handles automatically
 
1069
    if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
 
1070
      for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
 
1071
        NdbColumnImpl* c = m_currentTable->m_columns[i];
 
1072
        assert(c != 0);
 
1073
        if (c->getBlobType()) {
 
1074
          if (newOp->getBlobHandle(pTrans, c) == NULL)
 
1075
            return NULL;
 
1076
        }
 
1077
      }
 
1078
    }
 
1079
    
 
1080
    return newOp;
 
1081
  }
 
1082
  return 0;
 
1083
}
 
1084
 
 
1085
NdbBlob*
 
1086
NdbScanOperation::getBlobHandle(const char* anAttrName)
 
1087
{
 
1088
  m_keyInfo = 1;
 
1089
  return NdbOperation::getBlobHandle(m_transConnection, 
 
1090
                                     m_currentTable->getColumn(anAttrName));
 
1091
}
 
1092
 
 
1093
NdbBlob*
 
1094
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
 
1095
{
 
1096
  m_keyInfo = 1;
 
1097
  return NdbOperation::getBlobHandle(m_transConnection, 
 
1098
                                     m_currentTable->getColumn(anAttrId));
 
1099
}
 
1100
 
 
1101
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
 
1102
  : NdbScanOperation(aNdb, NdbOperation::OrderedIndexScan)
 
1103
{
 
1104
}
 
1105
 
 
1106
NdbIndexScanOperation::~NdbIndexScanOperation(){
 
1107
}
 
1108
 
 
1109
int
 
1110
NdbIndexScanOperation::setBound(const char* anAttrName, int type, 
 
1111
                                const void* aValue)
 
1112
{
 
1113
  return setBound(m_accessTable->getColumn(anAttrName), type, aValue);
 
1114
}
 
1115
 
 
1116
int
 
1117
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, 
 
1118
                                const void* aValue)
 
1119
{
 
1120
  return setBound(m_accessTable->getColumn(anAttrId), type, aValue);
 
1121
}
 
1122
 
 
1123
int
 
1124
NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, 
 
1125
                                  const char* aValue)
 
1126
{
 
1127
  return setBound(anAttrObject, BoundEQ, aValue);
 
1128
}
 
1129
 
 
1130
NdbRecAttr*
 
1131
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, 
 
1132
                                     char* aValue){
 
1133
  if(!m_ordered){
 
1134
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
 
1135
  }
 
1136
  
 
1137
  int id = attrInfo->getColumnNo();                // In "real" table
 
1138
  assert(m_accessTable->m_index);
 
1139
  int sz = (int)m_accessTable->m_index->m_key_ids.size();
 
1140
  if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
 
1141
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
 
1142
  }
 
1143
  
 
1144
  assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
 
1145
  Uint32 marker = theTupleKeyDefined[id][0];
 
1146
  
 
1147
  if(marker == SETBOUND_EQ){
 
1148
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
 
1149
  } else if(marker == API_PTR){
 
1150
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
 
1151
  }
 
1152
  
 
1153
  assert(marker == FAKE_PTR);
 
1154
  
 
1155
  UintPtr oldVal;
 
1156
  oldVal = theTupleKeyDefined[id][1];
 
1157
#if (SIZEOF_CHARP == 8)
 
1158
  oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
 
1159
#endif
 
1160
  theTupleKeyDefined[id][0] = API_PTR;
 
1161
 
 
1162
  NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
 
1163
  tmp->setup(attrInfo, aValue);
 
1164
 
 
1165
  return tmp;
 
1166
}
 
1167
 
 
1168
#include <AttributeHeader.hpp>
 
1169
/*
 
1170
 * Define bound on index column in range scan.
 
1171
 */
 
1172
int
 
1173
NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, 
 
1174
                                int type, const void* aValue)
 
1175
{
 
1176
  if (!tAttrInfo)
 
1177
  {
 
1178
    setErrorCodeAbort(4318);    // Invalid attribute
 
1179
    return -1;
 
1180
  }
 
1181
  if (theOperationType == OpenRangeScanRequest &&
 
1182
      (0 <= type && type <= 4)) {
 
1183
    // insert bound type
 
1184
    Uint32 currLen = theTotalNrOfKeyWordInSignal;
 
1185
    Uint32 remaining = KeyInfo::DataLength - currLen;
 
1186
    bool tDistrKey = tAttrInfo->m_distributionKey;
 
1187
 
 
1188
    Uint32 len = 0;
 
1189
    if (aValue != NULL)
 
1190
      if (! tAttrInfo->get_var_length(aValue, len)) {
 
1191
        setErrorCodeAbort(4209);
 
1192
        return -1;
 
1193
      }
 
1194
 
 
1195
    // insert attribute header
 
1196
    Uint32 tIndexAttrId = tAttrInfo->m_attrId;
 
1197
    Uint32 sizeInWords = (len + 3) / 4;
 
1198
    AttributeHeader ah(tIndexAttrId, sizeInWords << 2);
 
1199
    const Uint32 ahValue = ah.m_value;
 
1200
 
 
1201
    const Uint32 align = (UintPtr(aValue) & 7);
 
1202
    const bool aligned = (tDistrKey && type == BoundEQ) ? 
 
1203
      (align == 0) : (align & 3) == 0;
 
1204
 
 
1205
    const bool nobytes = (len & 0x3) == 0;
 
1206
    const Uint32 totalLen = 2 + sizeInWords;
 
1207
    Uint32 tupKeyLen = theTupKeyLen;
 
1208
    union {
 
1209
      Uint32 tempData[2000];
 
1210
      Uint64 __my_align;
 
1211
    };
 
1212
    Uint64 *valPtr;
 
1213
    if(remaining > totalLen && aligned && nobytes){
 
1214
      Uint32 * dst = theKEYINFOptr + currLen;
 
1215
      * dst ++ = type;
 
1216
      * dst ++ = ahValue;
 
1217
      memcpy(dst, aValue, 4 * sizeInWords);
 
1218
      theTotalNrOfKeyWordInSignal = currLen + totalLen;
 
1219
      valPtr = (Uint64*)aValue;
 
1220
    } else {
 
1221
      if(!aligned || !nobytes){
 
1222
        tempData[0] = type;
 
1223
        tempData[1] = ahValue;
 
1224
        tempData[2 + (len >> 2)] = 0;
 
1225
        memcpy(tempData+2, aValue, len);
 
1226
        insertBOUNDS(tempData, 2+sizeInWords);
 
1227
        valPtr = (Uint64*)(tempData+2);
 
1228
      } else {
 
1229
        Uint32 buf[2] = { type, ahValue };
 
1230
        insertBOUNDS(buf, 2);
 
1231
        insertBOUNDS((Uint32*)aValue, sizeInWords);
 
1232
        valPtr = (Uint64*)aValue;
 
1233
      }
 
1234
    }
 
1235
    theTupKeyLen = tupKeyLen + totalLen;
 
1236
 
 
1237
    /**
 
1238
     * Do sorted stuff
 
1239
     */
 
1240
 
 
1241
    /**
 
1242
     * The primary keys for an ordered index is defined in the beginning
 
1243
     * so it's safe to use [tIndexAttrId] 
 
1244
     * (instead of looping as is NdbOperation::equal_impl)
 
1245
     */
 
1246
    if(type == BoundEQ && tDistrKey && !m_multi_range)
 
1247
    {
 
1248
      theNoOfTupKeyLeft--;
 
1249
      return handle_distribution_key(valPtr, sizeInWords);
 
1250
    }
 
1251
    return 0;
 
1252
  } else {
 
1253
    setErrorCodeAbort(4228);    // XXX wrong code
 
1254
    return -1;
 
1255
  }
 
1256
}
 
1257
 
 
1258
int
 
1259
NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
 
1260
  Uint32 len;
 
1261
  Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
 
1262
  Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
 
1263
  do {
 
1264
    len = (sz < remaining ? sz : remaining);
 
1265
    memcpy(dst, data, 4 * len);
 
1266
    
 
1267
    if(sz >= remaining){
 
1268
      NdbApiSignal* tCurr = theLastKEYINFO;
 
1269
      tCurr->setLength(KeyInfo::MaxSignalLength);
 
1270
      NdbApiSignal* tSignal = tCurr->next();
 
1271
      if(tSignal)
 
1272
        ;
 
1273
      else if((tSignal = theNdb->getSignal()) != 0)
 
1274
      {
 
1275
        tCurr->next(tSignal);
 
1276
        tSignal->setSignal(GSN_KEYINFO);
 
1277
      } else {
 
1278
        goto error;
 
1279
      }
 
1280
      theLastKEYINFO = tSignal;
 
1281
      theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
 
1282
      remaining = KeyInfo::DataLength;
 
1283
      sz -= len;
 
1284
      data += len;
 
1285
    } else {
 
1286
      len = (KeyInfo::DataLength - remaining) + len;
 
1287
      break;
 
1288
    }
 
1289
  } while(true);   
 
1290
  theTotalNrOfKeyWordInSignal = len;
 
1291
  return 0;
 
1292
 
 
1293
error:
 
1294
  setErrorCodeAbort(4228);    // XXX wrong code
 
1295
  return -1;
 
1296
}
 
1297
 
 
1298
Uint32
 
1299
NdbIndexScanOperation::getKeyFromSCANTABREQ(Uint32* data, Uint32 size)
 
1300
{
 
1301
  DBUG_ENTER("NdbIndexScanOperation::getKeyFromSCANTABREQ");
 
1302
  assert(size >= theTotalNrOfKeyWordInSignal);
 
1303
  size = theTotalNrOfKeyWordInSignal;
 
1304
  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
 
1305
  Uint32 pos = 0;
 
1306
  while (pos < size) {
 
1307
    assert(tSignal != NULL);
 
1308
    Uint32* tData = tSignal->getDataPtrSend();
 
1309
    Uint32 rem = size - pos;
 
1310
    if (rem > KeyInfo::DataLength)
 
1311
      rem = KeyInfo::DataLength;
 
1312
    Uint32 i = 0;
 
1313
    while (i < rem) {
 
1314
      data[pos + i] = tData[KeyInfo::HeaderLength + i];
 
1315
      i++;
 
1316
    }
 
1317
    pos += rem;
 
1318
  }
 
1319
  DBUG_DUMP("key", (uchar*) data, size << 2);
 
1320
  DBUG_RETURN(size);
 
1321
}
 
1322
 
 
1323
int
 
1324
NdbIndexScanOperation::readTuples(LockMode lm,
 
1325
                                  Uint32 scan_flags,
 
1326
                                  Uint32 parallel,
 
1327
                                  Uint32 batch)
 
1328
{
 
1329
  const bool order_by = scan_flags & SF_OrderBy;
 
1330
  const bool order_desc = scan_flags & SF_Descending;
 
1331
  const bool read_range_no = scan_flags & SF_ReadRangeNo;
 
1332
  m_multi_range = scan_flags & SF_MultiRange;
 
1333
 
 
1334
  int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
 
1335
  if(!res && read_range_no)
 
1336
  {
 
1337
    m_read_range_no = 1;
 
1338
    Uint32 word = 0;
 
1339
    AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0);
 
1340
    if(insertATTRINFO(word) == -1)
 
1341
      res = -1;
 
1342
  }
 
1343
  if (!res)
 
1344
  {
 
1345
    /**
 
1346
     * Note that it is valid to have order_desc true and order_by false.
 
1347
     *
 
1348
     * This means that there will be no merge sort among partitions, but
 
1349
     * each partition will still be returned in descending sort order.
 
1350
     *
 
1351
     * This is useful eg. if it is known that the scan spans only one
 
1352
     * partition.
 
1353
     */
 
1354
    if (order_desc) {
 
1355
      m_descending = true;
 
1356
      ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
 
1357
      ScanTabReq::setDescendingFlag(req->requestInfo, true);
 
1358
    }
 
1359
    if (order_by) {
 
1360
      m_ordered = true;
 
1361
      Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
 
1362
      m_sort_columns = cnt; // -1 for NDB$NODE
 
1363
      m_current_api_receiver = m_sent_receivers_count;
 
1364
      m_api_receivers_count = m_sent_receivers_count;
 
1365
    
 
1366
      m_sort_columns = cnt;
 
1367
      for(Uint32 i = 0; i<cnt; i++){
 
1368
        const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
 
1369
        const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
 
1370
        NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
 
1371
        UintPtr newVal = UintPtr(tmp);
 
1372
        theTupleKeyDefined[i][0] = FAKE_PTR;
 
1373
        theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
 
1374
#if (SIZEOF_CHARP == 8)
 
1375
        theTupleKeyDefined[i][2] = (newVal >> 32);
 
1376
#endif
 
1377
      }
 
1378
    }
 
1379
  }
 
1380
  m_this_bound_start = 0;
 
1381
  m_first_bound_word = theKEYINFOptr;
 
1382
  
 
1383
  return res;
 
1384
}
 
1385
 
 
1386
void
 
1387
NdbIndexScanOperation::fix_get_values(){
 
1388
  /**
 
1389
   * Loop through all getValues and set buffer pointer to "API" pointer
 
1390
   */
 
1391
  NdbRecAttr * curr = theReceiver.theFirstRecAttr;
 
1392
  Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
 
1393
  assert(cnt <  NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
 
1394
  
 
1395
  for(Uint32 i = 0; i<cnt; i++){
 
1396
    Uint32 val = theTupleKeyDefined[i][0];
 
1397
    switch(val){
 
1398
    case FAKE_PTR:
 
1399
      curr->setup(curr->m_column, 0);
 
1400
    case API_PTR:
 
1401
      curr = curr->next();
 
1402
      break;
 
1403
    case SETBOUND_EQ:
 
1404
      break;
 
1405
#ifdef VM_TRACE
 
1406
    default:
 
1407
      abort();
 
1408
#endif
 
1409
    }
 
1410
  }
 
1411
}
 
1412
 
 
1413
int
 
1414
NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, 
 
1415
                               const NdbReceiver* t1, 
 
1416
                               const NdbReceiver* t2){
 
1417
 
 
1418
  NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
 
1419
  NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
 
1420
 
 
1421
  r1 = (skip ? r1->next() : r1);
 
1422
  r2 = (skip ? r2->next() : r2);
 
1423
  const int jdir = 1 - 2 * (int)m_descending;
 
1424
  assert(jdir == 1 || jdir == -1);
 
1425
  while(cols > 0){
 
1426
    Uint32 * d1 = (Uint32*)r1->aRef();
 
1427
    Uint32 * d2 = (Uint32*)r2->aRef();
 
1428
    unsigned r1_null = r1->isNULL();
 
1429
    if((r1_null ^ (unsigned)r2->isNULL())){
 
1430
      return (r1_null ? -1 : 1) * jdir;
 
1431
    }
 
1432
    const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
 
1433
    Uint32 len1 = r1->get_size_in_bytes();
 
1434
    Uint32 len2 = r2->get_size_in_bytes();
 
1435
    if(!r1_null){
 
1436
      const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
 
1437
      int r = (*sqlType.m_cmp)(col.m_cs, d1, len1, d2, len2, true);
 
1438
      if(r){
 
1439
        assert(r != NdbSqlUtil::CmpUnknown);
 
1440
        return r * jdir;
 
1441
      }
 
1442
    }
 
1443
    cols--;
 
1444
    r1 = r1->next();
 
1445
    r2 = r2->next();
 
1446
  }
 
1447
  return 0;
 
1448
}
 
1449
 
 
1450
int
 
1451
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
 
1452
                                           bool forceSend){
 
1453
  
 
1454
  m_curr_row = 0;
 
1455
  Uint32 u_idx = 0, u_last = 0;
 
1456
  Uint32 s_idx   = m_current_api_receiver; // first sorted
 
1457
  Uint32 s_last  = theParallelism;         // last sorted
 
1458
 
 
1459
  NdbReceiver** arr = m_api_receivers;
 
1460
  NdbReceiver* tRec = arr[s_idx];
 
1461
  
 
1462
  if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
 
1463
                                 fetchAllowed, 
 
1464
                                 (s_idx < s_last ? tRec->nextResult() : 0));
 
1465
  
 
1466
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
 
1467
                                 u_idx, u_last,
 
1468
                                 s_idx, s_last);
 
1469
  
 
1470
  bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
 
1471
  
 
1472
  if(fetchNeeded){
 
1473
    if(fetchAllowed){
 
1474
      if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
 
1475
      TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
 
1476
      /*
 
1477
        The PollGuard has an implicit call of unlock_and_signal through the
 
1478
        ~PollGuard method. This method is called implicitly by the compiler
 
1479
        in all places where the object is out of context due to a return,
 
1480
        break, continue or simply end of statement block
 
1481
      */
 
1482
      PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
 
1483
                           theNdb->theNdbBlockNumber);
 
1484
      if(theError.code)
 
1485
        return -1;
 
1486
      Uint32 seq = theNdbCon->theNodeSequence;
 
1487
      Uint32 nodeId = theNdbCon->theDBnode;
 
1488
      Uint32 timeout = tp->m_waitfor_timeout;
 
1489
      if(seq == tp->getNodeSequence(nodeId) &&
 
1490
         !send_next_scan_ordered(s_idx)){
 
1491
        Uint32 tmp = m_sent_receivers_count;
 
1492
        s_idx = m_current_api_receiver; 
 
1493
        while(m_sent_receivers_count > 0 && !theError.code){
 
1494
          int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
 
1495
          if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
 
1496
            continue;
 
1497
          }
 
1498
          if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
 
1499
          if(ret_code == -1){
 
1500
            setErrorCode(4008);
 
1501
          } else {
 
1502
            setErrorCode(4028);
 
1503
          }
 
1504
          return -1;
 
1505
        }
 
1506
        
 
1507
        if(theError.code){
 
1508
          setErrorCode(theError.code);
 
1509
          if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
 
1510
          return -1;
 
1511
        }
 
1512
        
 
1513
        u_idx = 0;
 
1514
        u_last = m_conf_receivers_count;
 
1515
        m_conf_receivers_count = 0;
 
1516
        memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
 
1517
        
 
1518
        if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
 
1519
      } else {
 
1520
        setErrorCode(4028);
 
1521
        return -1;
 
1522
      }
 
1523
    } else {
 
1524
      if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
 
1525
      return 2;
 
1526
    }
 
1527
  } else {
 
1528
    u_idx = s_idx;
 
1529
    u_last = s_idx + 1;
 
1530
    s_idx++;
 
1531
  }
 
1532
  
 
1533
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
 
1534
                                 u_idx, u_last,
 
1535
                                 s_idx, s_last);
 
1536
 
 
1537
 
 
1538
  Uint32 cols = m_sort_columns + m_read_range_no;
 
1539
  Uint32 skip = m_keyInfo;
 
1540
  while(u_idx < u_last){
 
1541
    u_last--;
 
1542
    tRec = arr[u_last];
 
1543
    
 
1544
    // Do binary search instead to find place
 
1545
    Uint32 place = s_idx;
 
1546
    for(; place < s_last; place++){
 
1547
      if(compare(skip, cols, tRec, arr[place]) <= 0){
 
1548
        break;
 
1549
      }
 
1550
    }
 
1551
    
 
1552
    if(place != s_idx){
 
1553
      if(DEBUG_NEXT_RESULT) 
 
1554
        ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
 
1555
      memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
 
1556
    }
 
1557
    
 
1558
    if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
 
1559
    m_api_receivers[place-1] = tRec;
 
1560
    s_idx--;
 
1561
  }
 
1562
 
 
1563
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
 
1564
                                 u_idx, u_last,
 
1565
                                 s_idx, s_last);
 
1566
  
 
1567
  m_current_api_receiver = s_idx;
 
1568
  
 
1569
  if(DEBUG_NEXT_RESULT)
 
1570
    for(Uint32 i = s_idx; i<s_last; i++)
 
1571
      ndbout_c("%p", arr[i]);
 
1572
  
 
1573
  tRec = m_api_receivers[s_idx];    
 
1574
  if(s_idx < s_last && tRec->nextResult()){
 
1575
    m_curr_row = tRec->copyout(theReceiver);      
 
1576
    if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
 
1577
    return 0;
 
1578
  }
 
1579
 
 
1580
  theError.code = -1;
 
1581
  if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
 
1582
  return 1;
 
1583
}
 
1584
 
 
1585
int
 
1586
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
 
1587
{
 
1588
  if(idx == theParallelism)
 
1589
    return 0;
 
1590
  
 
1591
  NdbReceiver* tRec = m_api_receivers[idx];
 
1592
  NdbApiSignal tSignal(theNdb->theMyRef);
 
1593
  tSignal.setSignal(GSN_SCAN_NEXTREQ);
 
1594
  
 
1595
  Uint32 last = m_sent_receivers_count;
 
1596
  Uint32* theData = tSignal.getDataPtrSend();
 
1597
  Uint32* prep_array = theData + 4;
 
1598
  
 
1599
  m_current_api_receiver = idx + 1;
 
1600
  if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
 
1601
  {
 
1602
    if(DEBUG_NEXT_RESULT)
 
1603
      ndbout_c("receiver completed, don't send");
 
1604
    return 0;
 
1605
  }
 
1606
  
 
1607
  theData[0] = theNdbCon->theTCConPtr;
 
1608
  theData[1] = 0;
 
1609
  Uint64 transId = theNdbCon->theTransactionId;
 
1610
  theData[2] = transId;
 
1611
  theData[3] = (Uint32) (transId >> 32);
 
1612
  
 
1613
  /**
 
1614
   * Prepare ops
 
1615
   */
 
1616
  m_sent_receivers[last] = tRec;
 
1617
  tRec->m_list_index = last;
 
1618
  tRec->prepareSend();
 
1619
  m_sent_receivers_count = last + 1;
 
1620
  
 
1621
  Uint32 nodeId = theNdbCon->theDBnode;
 
1622
  TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
 
1623
  tSignal.setLength(4+1);
 
1624
  int ret= tp->sendSignal(&tSignal, nodeId);
 
1625
  return ret;
 
1626
}
 
1627
 
 
1628
int
 
1629
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
 
1630
                             PollGuard *poll_guard)
 
1631
{
 
1632
  Uint32 seq = theNdbCon->theNodeSequence;
 
1633
  Uint32 nodeId = theNdbCon->theDBnode;
 
1634
  
 
1635
  if(seq != tp->getNodeSequence(nodeId))
 
1636
  {
 
1637
    theNdbCon->theReleaseOnClose = true;
 
1638
    return -1;
 
1639
  }
 
1640
  
 
1641
  Uint32 timeout = tp->m_waitfor_timeout;
 
1642
  /**
 
1643
   * Wait for outstanding
 
1644
   */
 
1645
  while(theError.code == 0 && m_sent_receivers_count)
 
1646
  {
 
1647
    int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
 
1648
    switch(return_code){
 
1649
    case 0:
 
1650
      break;
 
1651
    case -1:
 
1652
      setErrorCode(4008);
 
1653
    case -2:
 
1654
      m_api_receivers_count = 0;
 
1655
      m_conf_receivers_count = 0;
 
1656
      m_sent_receivers_count = 0;
 
1657
      theNdbCon->theReleaseOnClose = true;
 
1658
      return -1;
 
1659
    }
 
1660
  }
 
1661
 
 
1662
  if(theError.code)
 
1663
  {
 
1664
    m_api_receivers_count = 0;
 
1665
    m_current_api_receiver = m_ordered ? theParallelism : 0;
 
1666
  }
 
1667
 
 
1668
 
 
1669
  /**
 
1670
   * move all conf'ed into api
 
1671
   *   so that send_next_scan can check if they needs to be closed
 
1672
   */
 
1673
  Uint32 api = m_api_receivers_count;
 
1674
  Uint32 conf = m_conf_receivers_count;
 
1675
 
 
1676
  if(m_ordered)
 
1677
  {
 
1678
    /**
 
1679
     * Ordered scan, keep the m_api_receivers "to the right"
 
1680
     */
 
1681
    memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, 
 
1682
            (theParallelism - m_current_api_receiver) * sizeof(char*));
 
1683
    api = (theParallelism - m_current_api_receiver);
 
1684
    m_api_receivers_count = api;
 
1685
  }
 
1686
  
 
1687
  if(DEBUG_NEXT_RESULT)
 
1688
    ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
 
1689
             m_ordered, api, conf, 
 
1690
             m_sent_receivers_count, m_current_api_receiver, theParallelism);
 
1691
  
 
1692
  if(api+conf)
 
1693
  {
 
1694
    /**
 
1695
     * There's something to close
 
1696
     *   setup m_api_receivers (for send_next_scan)
 
1697
     */
 
1698
    memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
 
1699
    m_api_receivers_count = api + conf;
 
1700
    m_conf_receivers_count = 0;
 
1701
  }
 
1702
  
 
1703
  // Send close scan
 
1704
  if(send_next_scan(api+conf, true) == -1)
 
1705
  {
 
1706
    theNdbCon->theReleaseOnClose = true;
 
1707
    return -1;
 
1708
  }
 
1709
  
 
1710
  /**
 
1711
   * wait for close scan conf
 
1712
   */
 
1713
  while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
 
1714
  {
 
1715
    int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
 
1716
    switch(return_code){
 
1717
    case 0:
 
1718
      break;
 
1719
    case -1:
 
1720
      setErrorCode(4008);
 
1721
    case -2:
 
1722
      m_api_receivers_count = 0;
 
1723
      m_conf_receivers_count = 0;
 
1724
      m_sent_receivers_count = 0;
 
1725
      theNdbCon->theReleaseOnClose = true;
 
1726
      return -1;
 
1727
    }
 
1728
  }
 
1729
  
 
1730
  return 0;
 
1731
}
 
1732
 
 
1733
void
 
1734
NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
 
1735
  for(Uint32 i = 0; i<parallell; i++){
 
1736
    m_receivers[i]->m_list_index = i;
 
1737
    m_prepared_receivers[i] = m_receivers[i]->getId();
 
1738
    m_sent_receivers[i] = m_receivers[i];
 
1739
    m_conf_receivers[i] = 0;
 
1740
    m_api_receivers[i] = 0;
 
1741
    m_receivers[i]->prepareSend();
 
1742
  }
 
1743
  
 
1744
  m_api_receivers_count = 0;
 
1745
  m_current_api_receiver = 0;
 
1746
  m_sent_receivers_count = 0;
 
1747
  m_conf_receivers_count = 0;
 
1748
}
 
1749
 
 
1750
int
 
1751
NdbScanOperation::restart(bool forceSend)
 
1752
{
 
1753
  
 
1754
  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
 
1755
  /*
 
1756
    The PollGuard has an implicit call of unlock_and_signal through the
 
1757
    ~PollGuard method. This method is called implicitly by the compiler
 
1758
    in all places where the object is out of context due to a return,
 
1759
    break, continue or simply end of statement block
 
1760
  */
 
1761
  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
 
1762
                       theNdb->theNdbBlockNumber);
 
1763
  Uint32 nodeId = theNdbCon->theDBnode;
 
1764
  
 
1765
  {
 
1766
    int res;
 
1767
    if((res= close_impl(tp, forceSend, &poll_guard)))
 
1768
    {
 
1769
      return res;
 
1770
    }
 
1771
  }
 
1772
  
 
1773
  /**
 
1774
   * Reset receivers
 
1775
   */
 
1776
  reset_receivers(theParallelism, m_ordered);
 
1777
  
 
1778
  theError.code = 0;
 
1779
  if (doSendScan(nodeId) == -1)
 
1780
    return -1;
 
1781
  return 0;
 
1782
}
 
1783
 
 
1784
int
 
1785
NdbIndexScanOperation::reset_bounds(bool forceSend){
 
1786
  int res;
 
1787
  
 
1788
  {
 
1789
    TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
 
1790
    /*
 
1791
      The PollGuard has an implicit call of unlock_and_signal through the
 
1792
      ~PollGuard method. This method is called implicitly by the compiler
 
1793
      in all places where the object is out of context due to a return,
 
1794
      break, continue or simply end of statement block
 
1795
    */
 
1796
    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
 
1797
                         theNdb->theNdbBlockNumber);
 
1798
    res= close_impl(tp, forceSend, &poll_guard);
 
1799
  }
 
1800
 
 
1801
  if(!res)
 
1802
  {
 
1803
    theError.code = 0;
 
1804
    reset_receivers(theParallelism, m_ordered);
 
1805
    
 
1806
    theLastKEYINFO = theSCAN_TABREQ->next();
 
1807
    theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;
 
1808
    theTupKeyLen = 0;
 
1809
    theTotalNrOfKeyWordInSignal = 0;
 
1810
    theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
 
1811
    theDistrKeyIndicator_ = 0;
 
1812
    m_this_bound_start = 0;
 
1813
    m_first_bound_word = theKEYINFOptr;
 
1814
    m_transConnection
 
1815
      ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
 
1816
                    this);
 
1817
    m_transConnection->define_scan_op(this);
 
1818
    return 0;
 
1819
  }
 
1820
  return res;
 
1821
}
 
1822
 
 
1823
int
 
1824
NdbIndexScanOperation::end_of_bound(Uint32 no)
 
1825
{
 
1826
  DBUG_ENTER("end_of_bound");
 
1827
  DBUG_PRINT("info", ("Range number %u", no));
 
1828
  /* Check that SF_MultiRange has been specified if more
 
1829
     than one range is specified */
 
1830
  if (no > 0 && !m_multi_range)
 
1831
    DBUG_RETURN(-1);
 
1832
  if(no < (1 << 13)) // Only 12-bits no of ranges
 
1833
  {
 
1834
    Uint32 bound_head = * m_first_bound_word;
 
1835
    bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);
 
1836
    * m_first_bound_word = bound_head;
 
1837
    
 
1838
    m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;
 
1839
    m_this_bound_start = theTupKeyLen;
 
1840
    DBUG_RETURN(0);
 
1841
  }
 
1842
  DBUG_RETURN(-1);
 
1843
}
 
1844
 
 
1845
int
 
1846
NdbIndexScanOperation::get_range_no()
 
1847
{
 
1848
  NdbRecAttr* tRecAttr = m_curr_row;
 
1849
  if(m_read_range_no && tRecAttr)
 
1850
  {
 
1851
    if(m_keyInfo)
 
1852
      tRecAttr = tRecAttr->next();
 
1853
    Uint32 ret = *(Uint32*)tRecAttr->aRef();
 
1854
    return ret;
 
1855
  }
 
1856
  return -1;
 
1857
}