~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#define DBTUP_C
 
17
#define DBTUP_SCAN_CPP
 
18
#include "Dbtup.hpp"
 
19
#include <signaldata/AccScan.hpp>
 
20
#include <signaldata/NextScan.hpp>
 
21
#include <signaldata/AccLock.hpp>
 
22
#include <md5_hash.hpp>
 
23
 
 
24
#undef jam
 
25
#undef jamEntry
 
26
#define jam() { jamLine(32000 + __LINE__); }
 
27
#define jamEntry() { jamEntryLine(32000 + __LINE__); }
 
28
 
 
29
#ifdef VM_TRACE
 
30
#define dbg(x) globalSignalLoggers.log x
 
31
#else
 
32
#define dbg(x)
 
33
#endif
 
34
 
 
35
void
 
36
Dbtup::execACC_SCANREQ(Signal* signal)
 
37
{
 
38
  jamEntry();
 
39
  const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
 
40
  const AccScanReq* const req = &reqCopy;
 
41
  ScanOpPtr scanPtr;
 
42
  scanPtr.i = RNIL;
 
43
  do {
 
44
    // find table and fragment
 
45
    TablerecPtr tablePtr;
 
46
    tablePtr.i = req->tableId;
 
47
    ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
48
    FragrecordPtr fragPtr;
 
49
    Uint32 fragId = req->fragmentNo;
 
50
    fragPtr.i = RNIL;
 
51
    getFragmentrec(fragPtr, fragId, tablePtr.p);
 
52
    ndbrequire(fragPtr.i != RNIL);
 
53
    Fragrecord& frag = *fragPtr.p;
 
54
    // flags
 
55
    Uint32 bits = 0;
 
56
    
 
57
 
 
58
    if (AccScanReq::getLcpScanFlag(req->requestInfo))
 
59
    {
 
60
      jam();
 
61
      bits |= ScanOp::SCAN_LCP;
 
62
      c_scanOpPool.getPtr(scanPtr, c_lcp_scan_op);
 
63
    }
 
64
    else
 
65
    {
 
66
      // seize from pool and link to per-fragment list
 
67
      LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
 
68
      if (! list.seize(scanPtr)) {
 
69
        jam();
 
70
        break;
 
71
      }
 
72
    }
 
73
     
 
74
    if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
 
75
        && tablePtr.p->m_no_of_disk_attributes)
 
76
    {
 
77
      bits |= ScanOp::SCAN_DD;
 
78
    }
 
79
    
 
80
    bool mm = (bits & ScanOp::SCAN_DD);
 
81
    if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
 
82
      bits |= ScanOp::SCAN_VS;
 
83
      
 
84
      // disk pages have fixed page format
 
85
      ndbrequire(! (bits & ScanOp::SCAN_DD));
 
86
    }
 
87
    if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
 
88
      if (AccScanReq::getLockMode(req->requestInfo) == 0)
 
89
        bits |= ScanOp::SCAN_LOCK_SH;
 
90
      else
 
91
        bits |= ScanOp::SCAN_LOCK_EX;
 
92
    }
 
93
 
 
94
    if (AccScanReq::getNRScanFlag(req->requestInfo))
 
95
    {
 
96
      jam();
 
97
      bits |= ScanOp::SCAN_NR;
 
98
      scanPtr.p->m_endPage = req->maxPage;
 
99
      if (req->maxPage != RNIL && req->maxPage > frag.noOfPages)
 
100
      {
 
101
         ndbout_c("%u %u endPage: %u (noOfPages: %u)", 
 
102
                   tablePtr.i, fragId,
 
103
                   req->maxPage, fragPtr.p->noOfPages);
 
104
      }
 
105
    }
 
106
    else
 
107
    {
 
108
      jam();
 
109
      scanPtr.p->m_endPage = RNIL;
 
110
    }
 
111
 
 
112
    if (AccScanReq::getLcpScanFlag(req->requestInfo))
 
113
    {
 
114
      jam();
 
115
      ndbrequire((bits & ScanOp::SCAN_DD) == 0);
 
116
      ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
 
117
    }
 
118
    
 
119
    // set up scan op
 
120
    new (scanPtr.p) ScanOp();
 
121
    ScanOp& scan = *scanPtr.p;
 
122
    scan.m_state = ScanOp::First;
 
123
    scan.m_bits = bits;
 
124
    scan.m_userPtr = req->senderData;
 
125
    scan.m_userRef = req->senderRef;
 
126
    scan.m_tableId = tablePtr.i;
 
127
    scan.m_fragId = frag.fragmentId;
 
128
    scan.m_fragPtrI = fragPtr.i;
 
129
    scan.m_transId1 = req->transId1;
 
130
    scan.m_transId2 = req->transId2;
 
131
    scan.m_savePointId = req->savePointId;
 
132
 
 
133
    // conf
 
134
    AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
 
135
    conf->scanPtr = req->senderData;
 
136
    conf->accPtr = scanPtr.i;
 
137
    conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
 
138
    sendSignal(req->senderRef, GSN_ACC_SCANCONF,
 
139
        signal, AccScanConf::SignalLength, JBB);
 
140
    return;
 
141
  } while (0);
 
142
  if (scanPtr.i != RNIL) {
 
143
    jam();
 
144
    releaseScanOp(scanPtr);
 
145
  }
 
146
  // LQH does not handle REF
 
147
  signal->theData[0] = 0x313;
 
148
  sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
 
149
}
 
150
 
 
151
void
 
152
Dbtup::execNEXT_SCANREQ(Signal* signal)
 
153
{
 
154
  jamEntry();
 
155
  const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
 
156
  const NextScanReq* const req = &reqCopy;
 
157
  ScanOpPtr scanPtr;
 
158
  c_scanOpPool.getPtr(scanPtr, req->accPtr);
 
159
  ScanOp& scan = *scanPtr.p;
 
160
  switch (req->scanFlag) {
 
161
  case NextScanReq::ZSCAN_NEXT:
 
162
    jam();
 
163
    break;
 
164
  case NextScanReq::ZSCAN_NEXT_COMMIT:
 
165
    jam();
 
166
  case NextScanReq::ZSCAN_COMMIT:
 
167
    jam();
 
168
    if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
 
169
      jam();
 
170
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
 
171
      lockReq->returnCode = RNIL;
 
172
      lockReq->requestInfo = AccLockReq::Unlock;
 
173
      lockReq->accOpPtr = req->accOperationPtr;
 
174
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
 
175
          signal, AccLockReq::UndoSignalLength);
 
176
      jamEntry();
 
177
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
 
178
      removeAccLockOp(scan, req->accOperationPtr);
 
179
    }
 
180
    if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
 
181
      NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
 
182
      conf->scanPtr = scan.m_userPtr;
 
183
      unsigned signalLength = 1;
 
184
      sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
 
185
                 signal, signalLength, JBB);
 
186
      return;
 
187
    }
 
188
    break;
 
189
  case NextScanReq::ZSCAN_CLOSE:
 
190
    jam();
 
191
    if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
 
192
      jam();
 
193
      ndbrequire(scan.m_accLockOp != RNIL);
 
194
      // use ACC_ABORTCONF to flush out any reply in job buffer
 
195
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
 
196
      lockReq->returnCode = RNIL;
 
197
      lockReq->requestInfo = AccLockReq::AbortWithConf;
 
198
      lockReq->accOpPtr = scan.m_accLockOp;
 
199
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
 
200
                     signal, AccLockReq::UndoSignalLength);
 
201
      jamEntry();
 
202
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
 
203
      scan.m_state = ScanOp::Aborting;
 
204
      return;
 
205
    }
 
206
    if (scan.m_state == ScanOp::Locked) {
 
207
      jam();
 
208
      ndbrequire(scan.m_accLockOp != RNIL);
 
209
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
 
210
      lockReq->returnCode = RNIL;
 
211
      lockReq->requestInfo = AccLockReq::Abort;
 
212
      lockReq->accOpPtr = scan.m_accLockOp;
 
213
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
 
214
                     signal, AccLockReq::UndoSignalLength);
 
215
      jamEntry();
 
216
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
 
217
      scan.m_accLockOp = RNIL;
 
218
    }
 
219
    scan.m_state = ScanOp::Aborting;
 
220
    scanClose(signal, scanPtr);
 
221
    return;
 
222
  case NextScanReq::ZSCAN_NEXT_ABORT:
 
223
    jam();
 
224
  default:
 
225
    jam();
 
226
    ndbrequire(false);
 
227
    break;
 
228
  }
 
229
  // start looking for next scan result
 
230
  AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
 
231
  checkReq->accPtr = scanPtr.i;
 
232
  checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
 
233
  EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
 
234
  jamEntry();
 
235
}
 
236
 
 
237
void
 
238
Dbtup::execACC_CHECK_SCAN(Signal* signal)
 
239
{
 
240
  jamEntry();
 
241
  const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
 
242
  const AccCheckScan* const req = &reqCopy;
 
243
  ScanOpPtr scanPtr;
 
244
  c_scanOpPool.getPtr(scanPtr, req->accPtr);
 
245
  ScanOp& scan = *scanPtr.p;
 
246
  // fragment
 
247
  FragrecordPtr fragPtr;
 
248
  fragPtr.i = scan.m_fragPtrI;
 
249
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
250
  Fragrecord& frag = *fragPtr.p;
 
251
  if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
 
252
    jam();
 
253
    signal->theData[0] = scan.m_userPtr;
 
254
    signal->theData[1] = true;
 
255
    EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
 
256
    jamEntry();
 
257
    return;
 
258
  }
 
259
  if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
 
260
    jam();
 
261
    // LQH asks if we are waiting for lock and we tell it to ask again
 
262
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
 
263
    conf->scanPtr = scan.m_userPtr;
 
264
    conf->accOperationPtr = RNIL;       // no tuple returned
 
265
    conf->fragId = frag.fragmentId;
 
266
    unsigned signalLength = 3;
 
267
    // if TC has ordered scan close, it will be detected here
 
268
    sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
 
269
        signal, signalLength, JBB);
 
270
    return;     // stop
 
271
  }
 
272
  if (scan.m_state == ScanOp::First) {
 
273
    jam();
 
274
    scanFirst(signal, scanPtr);
 
275
  }
 
276
  if (scan.m_state == ScanOp::Next) {
 
277
    jam();
 
278
    bool immediate = scanNext(signal, scanPtr);
 
279
    if (! immediate) {
 
280
      jam();
 
281
      // time-slicing via TUP or PGMAN
 
282
      return;
 
283
    }
 
284
  }
 
285
  scanReply(signal, scanPtr);
 
286
}
 
287
 
 
288
void
 
289
Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
 
290
{
 
291
  ScanOp& scan = *scanPtr.p;
 
292
  FragrecordPtr fragPtr;
 
293
  fragPtr.i = scan.m_fragPtrI;
 
294
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
295
  Fragrecord& frag = *fragPtr.p;
 
296
  // for reading tuple key in Current state
 
297
  Uint32* pkData = (Uint32*)c_dataBuffer;
 
298
  unsigned pkSize = 0;
 
299
  if (scan.m_state == ScanOp::Current) {
 
300
    // found an entry to return
 
301
    jam();
 
302
    ndbrequire(scan.m_accLockOp == RNIL);
 
303
    if (scan.m_bits & ScanOp::SCAN_LOCK) {
 
304
      jam();
 
305
      // read tuple key - use TUX routine
 
306
      const ScanPos& pos = scan.m_scanPos;
 
307
      const Local_key& key_mm = pos.m_key_mm;
 
308
      int ret = tuxReadPk(fragPtr.i, pos.m_realpid_mm, key_mm.m_page_idx,
 
309
                          pkData, true);
 
310
      ndbrequire(ret > 0);
 
311
      pkSize = ret;
 
312
      dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
 
313
      // get read lock or exclusive lock
 
314
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
 
315
      lockReq->returnCode = RNIL;
 
316
      lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
 
317
        AccLockReq::LockShared : AccLockReq::LockExclusive;
 
318
      lockReq->accOpPtr = RNIL;
 
319
      lockReq->userPtr = scanPtr.i;
 
320
      lockReq->userRef = reference();
 
321
      lockReq->tableId = scan.m_tableId;
 
322
      lockReq->fragId = frag.fragmentId;
 
323
      lockReq->fragPtrI = RNIL; // no cached frag ptr yet
 
324
      lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
 
325
      lockReq->tupAddr = key_mm.ref();
 
326
      lockReq->transId1 = scan.m_transId1;
 
327
      lockReq->transId2 = scan.m_transId2;
 
328
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
 
329
          signal, AccLockReq::LockSignalLength);
 
330
      jamEntry();
 
331
      switch (lockReq->returnCode) {
 
332
      case AccLockReq::Success:
 
333
        jam();
 
334
        scan.m_state = ScanOp::Locked;
 
335
        scan.m_accLockOp = lockReq->accOpPtr;
 
336
        break;
 
337
      case AccLockReq::IsBlocked:
 
338
        jam();
 
339
        // normal lock wait
 
340
        scan.m_state = ScanOp::Blocked;
 
341
        scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
 
342
        scan.m_accLockOp = lockReq->accOpPtr;
 
343
        // LQH will wake us up
 
344
        signal->theData[0] = scan.m_userPtr;
 
345
        signal->theData[1] = true;
 
346
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
 
347
        jamEntry();
 
348
        return;
 
349
        break;
 
350
      case AccLockReq::Refused:
 
351
        jam();
 
352
        // we cannot see deleted tuple (assert only)
 
353
        ndbassert(false);
 
354
        // skip it
 
355
        scan.m_state = ScanOp::Next;
 
356
        signal->theData[0] = scan.m_userPtr;
 
357
        signal->theData[1] = true;
 
358
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
 
359
        jamEntry();
 
360
        return;
 
361
        break;
 
362
      case AccLockReq::NoFreeOp:
 
363
        jam();
 
364
        // max ops should depend on max scans (assert only)
 
365
        ndbassert(false);
 
366
        // stay in Current state
 
367
        scan.m_state = ScanOp::Current;
 
368
        signal->theData[0] = scan.m_userPtr;
 
369
        signal->theData[1] = true;
 
370
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
 
371
        jamEntry();
 
372
        return;
 
373
        break;
 
374
      default:
 
375
        ndbrequire(false);
 
376
        break;
 
377
      }
 
378
    } else {
 
379
      scan.m_state = ScanOp::Locked;
 
380
    }
 
381
  } 
 
382
 
 
383
  if (scan.m_state == ScanOp::Locked) {
 
384
    // we have lock or do not need one
 
385
    jam();
 
386
    // conf signal
 
387
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
 
388
    conf->scanPtr = scan.m_userPtr;
 
389
    // the lock is passed to LQH
 
390
    Uint32 accLockOp = scan.m_accLockOp;
 
391
    if (accLockOp != RNIL) {
 
392
      scan.m_accLockOp = RNIL;
 
393
      // remember it until LQH unlocks it
 
394
      addAccLockOp(scan, accLockOp);
 
395
    } else {
 
396
      ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
 
397
      // operation RNIL in LQH would signal no tuple returned
 
398
      accLockOp = (Uint32)-1;
 
399
    }
 
400
    const ScanPos& pos = scan.m_scanPos;
 
401
    conf->accOperationPtr = accLockOp;
 
402
    conf->fragId = frag.fragmentId;
 
403
    conf->localKey[0] = pos.m_key_mm.ref();
 
404
    conf->localKey[1] = 0;
 
405
    conf->localKeyLength = 1;
 
406
    unsigned signalLength = 6;
 
407
    if (scan.m_bits & ScanOp::SCAN_LOCK) {
 
408
      sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
 
409
          signal, signalLength, JBB);
 
410
    } else {
 
411
      Uint32 blockNo = refToBlock(scan.m_userRef);
 
412
      EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
 
413
      jamEntry();
 
414
    }
 
415
    // next time look for next entry
 
416
    scan.m_state = ScanOp::Next;
 
417
    return;
 
418
  }
 
419
  if (scan.m_state == ScanOp::Last ||
 
420
      scan.m_state == ScanOp::Invalid) {
 
421
    jam();
 
422
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
 
423
    conf->scanPtr = scan.m_userPtr;
 
424
    conf->accOperationPtr = RNIL;
 
425
    conf->fragId = RNIL;
 
426
    unsigned signalLength = 3;
 
427
    sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
 
428
        signal, signalLength, JBB);
 
429
    return;
 
430
  }
 
431
  ndbrequire(false);
 
432
}
 
433
 
 
434
/*
 
435
 * Lock succeeded (after delay) in ACC.  If the lock is for current
 
436
 * entry, set state to Locked.  If the lock is for an entry we were
 
437
 * moved away from, simply unlock it.  Finally, if we are closing the
 
438
 * scan, do nothing since we have already sent an abort request.
 
439
 */
 
440
void
 
441
Dbtup::execACCKEYCONF(Signal* signal)
 
442
{
 
443
  jamEntry();
 
444
  ScanOpPtr scanPtr;
 
445
  scanPtr.i = signal->theData[0];
 
446
  c_scanOpPool.getPtr(scanPtr);
 
447
  ScanOp& scan = *scanPtr.p;
 
448
  ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
 
449
  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
 
450
  if (scan.m_state == ScanOp::Blocked) {
 
451
    // the lock wait was for current entry
 
452
    jam();
 
453
    scan.m_state = ScanOp::Locked;
 
454
    // LQH has the ball
 
455
    return;
 
456
  }
 
457
  if (scan.m_state != ScanOp::Aborting) {
 
458
    // we were moved, release lock
 
459
    jam();
 
460
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
 
461
    lockReq->returnCode = RNIL;
 
462
    lockReq->requestInfo = AccLockReq::Abort;
 
463
    lockReq->accOpPtr = scan.m_accLockOp;
 
464
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
 
465
    jamEntry();
 
466
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
 
467
    scan.m_accLockOp = RNIL;
 
468
    // LQH has the ball
 
469
    return;
 
470
  }
 
471
  // lose the lock
 
472
  scan.m_accLockOp = RNIL;
 
473
  // continue at ACC_ABORTCONF
 
474
}
 
475
 
 
476
/*
 
477
 * Lock failed (after delay) in ACC.  Probably means somebody ahead of
 
478
 * us in lock queue deleted the tuple.
 
479
 */
 
480
void
 
481
Dbtup::execACCKEYREF(Signal* signal)
 
482
{
 
483
  jamEntry();
 
484
  ScanOpPtr scanPtr;
 
485
  scanPtr.i = signal->theData[0];
 
486
  c_scanOpPool.getPtr(scanPtr);
 
487
  ScanOp& scan = *scanPtr.p;
 
488
  ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
 
489
  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
 
490
  if (scan.m_state != ScanOp::Aborting) {
 
491
    jam();
 
492
    // release the operation
 
493
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
 
494
    lockReq->returnCode = RNIL;
 
495
    lockReq->requestInfo = AccLockReq::Abort;
 
496
    lockReq->accOpPtr = scan.m_accLockOp;
 
497
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
 
498
    jamEntry();
 
499
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
 
500
    scan.m_accLockOp = RNIL;
 
501
    // scan position should already have been moved (assert only)
 
502
    if (scan.m_state == ScanOp::Blocked) {
 
503
      jam();
 
504
      //ndbassert(false);
 
505
      if (scan.m_bits & ScanOp::SCAN_NR)
 
506
      {
 
507
        jam();
 
508
        scan.m_state = ScanOp::Next;
 
509
        scan.m_scanPos.m_get = ScanPos::Get_tuple;
 
510
        ndbout_c("Ignoring scan.m_state == ScanOp::Blocked, refetch");
 
511
      }
 
512
      else
 
513
      {
 
514
        jam();
 
515
        scan.m_state = ScanOp::Next;
 
516
        ndbout_c("Ignoring scan.m_state == ScanOp::Blocked");
 
517
      }
 
518
    }
 
519
    // LQH has the ball
 
520
    return;
 
521
  }
 
522
  // lose the lock
 
523
  scan.m_accLockOp = RNIL;
 
524
  // continue at ACC_ABORTCONF
 
525
}
 
526
 
 
527
/*
 
528
 * Received when scan is closing.  This signal arrives after any
 
529
 * ACCKEYCON or ACCKEYREF which may have been in job buffer.
 
530
 */
 
531
void
 
532
Dbtup::execACC_ABORTCONF(Signal* signal)
 
533
{
 
534
  jamEntry();
 
535
  ScanOpPtr scanPtr;
 
536
  scanPtr.i = signal->theData[0];
 
537
  c_scanOpPool.getPtr(scanPtr);
 
538
  ScanOp& scan = *scanPtr.p;
 
539
  ndbrequire(scan.m_state == ScanOp::Aborting);
 
540
  // most likely we are still in lock wait
 
541
  if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
 
542
    jam();
 
543
    scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
 
544
    scan.m_accLockOp = RNIL;
 
545
  }
 
546
  scanClose(signal, scanPtr);
 
547
}
 
548
 
 
549
void
 
550
Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
 
551
{
 
552
  ScanOp& scan = *scanPtr.p;
 
553
  ScanPos& pos = scan.m_scanPos;
 
554
  Local_key& key = pos.m_key;
 
555
  const Uint32 bits = scan.m_bits;
 
556
  // fragment
 
557
  FragrecordPtr fragPtr;
 
558
  fragPtr.i = scan.m_fragPtrI;
 
559
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
560
  Fragrecord& frag = *fragPtr.p;
 
561
  // in the future should not pre-allocate pages
 
562
  if (frag.noOfPages == 0 && ((bits & ScanOp::SCAN_NR) == 0)) {
 
563
    jam();
 
564
    scan.m_state = ScanOp::Last;
 
565
    return;
 
566
  }
 
567
  if (! (bits & ScanOp::SCAN_DD)) {
 
568
    key.m_file_no = ZNIL;
 
569
    key.m_page_no = 0;
 
570
    pos.m_get = ScanPos::Get_page_mm;
 
571
    // for MM scan real page id is cached for efficiency
 
572
    pos.m_realpid_mm = RNIL;
 
573
  } else {
 
574
    Disk_alloc_info& alloc = frag.m_disk_alloc_info;
 
575
    // for now must check disk part explicitly
 
576
    if (alloc.m_extent_list.firstItem == RNIL) {
 
577
      jam();
 
578
      scan.m_state = ScanOp::Last;
 
579
      return;
 
580
    }
 
581
    pos.m_extent_info_ptr_i = alloc.m_extent_list.firstItem;
 
582
    Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
 
583
    key.m_file_no = ext->m_key.m_file_no;
 
584
    key.m_page_no = ext->m_first_page_no;
 
585
    pos.m_get = ScanPos::Get_page_dd;
 
586
  }
 
587
  key.m_page_idx = 0;
 
588
  // let scanNext() do the work
 
589
  scan.m_state = ScanOp::Next;
 
590
}
 
591
 
 
592
bool
 
593
Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
 
594
{
 
595
  ScanOp& scan = *scanPtr.p;
 
596
  ScanPos& pos = scan.m_scanPos;
 
597
  Local_key& key = pos.m_key;
 
598
  const Uint32 bits = scan.m_bits;
 
599
  // table
 
600
  TablerecPtr tablePtr;
 
601
  tablePtr.i = scan.m_tableId;
 
602
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
603
  Tablerec& table = *tablePtr.p;
 
604
  // fragment
 
605
  FragrecordPtr fragPtr;
 
606
  fragPtr.i = scan.m_fragPtrI;
 
607
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
608
  Fragrecord& frag = *fragPtr.p;
 
609
  // tuple found
 
610
  Tuple_header* th = 0;
 
611
  Uint32 thbits = 0;
 
612
  Uint32 loop_count = 0;
 
613
  Uint32 scanGCI = scanPtr.p->m_scanGCI;
 
614
  Uint32 foundGCI;
 
615
 
 
616
  const bool mm = (bits & ScanOp::SCAN_DD);
 
617
  const bool lcp = (bits & ScanOp::SCAN_LCP);
 
618
  
 
619
  Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
 
620
  Uint32 size = table.m_offsets[mm].m_fix_header_size;
 
621
 
 
622
  if (lcp && lcp_list != RNIL)
 
623
    goto found_lcp_keep;
 
624
 
 
625
  switch(pos.m_get){
 
626
  case ScanPos::Get_next_tuple:
 
627
  case ScanPos::Get_next_tuple_fs:
 
628
    jam();
 
629
    key.m_page_idx += size;
 
630
    // fall through
 
631
  case ScanPos::Get_tuple:
 
632
  case ScanPos::Get_tuple_fs:
 
633
    jam();
 
634
    /**
 
635
     * We need to refetch page after timeslice
 
636
     */
 
637
    pos.m_get = ScanPos::Get_page;
 
638
    break;
 
639
  default:
 
640
    break;
 
641
  }
 
642
  
 
643
  while (true) {
 
644
    switch (pos.m_get) {
 
645
    case ScanPos::Get_next_page:
 
646
      // move to next page
 
647
      jam();
 
648
      {
 
649
        if (! (bits & ScanOp::SCAN_DD))
 
650
          pos.m_get = ScanPos::Get_next_page_mm;
 
651
        else
 
652
          pos.m_get = ScanPos::Get_next_page_dd;
 
653
      }
 
654
      continue;
 
655
    case ScanPos::Get_page:
 
656
      // get real page
 
657
      jam();
 
658
      {
 
659
        if (! (bits & ScanOp::SCAN_DD))
 
660
          pos.m_get = ScanPos::Get_page_mm;
 
661
        else
 
662
          pos.m_get = ScanPos::Get_page_dd;
 
663
      }
 
664
      continue;
 
665
    case ScanPos::Get_next_page_mm:
 
666
      // move to next logical TUP page
 
667
      jam();
 
668
      {
 
669
        key.m_page_no++;
 
670
        if (key.m_page_no >= frag.noOfPages) {
 
671
          jam();
 
672
 
 
673
          if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
 
674
          {
 
675
            jam();
 
676
            if (key.m_page_no < scan.m_endPage)
 
677
            {
 
678
              jam();
 
679
              ndbout_c("scanning page %u", key.m_page_no);
 
680
              goto cont;
 
681
            }
 
682
          }
 
683
          // no more pages, scan ends
 
684
          pos.m_get = ScanPos::Get_undef;
 
685
          scan.m_state = ScanOp::Last;
 
686
          return true;
 
687
        }
 
688
    cont:
 
689
        key.m_page_idx = 0;
 
690
        pos.m_get = ScanPos::Get_page_mm;
 
691
        // clear cached value
 
692
        pos.m_realpid_mm = RNIL;
 
693
      }
 
694
      /*FALLTHRU*/
 
695
    case ScanPos::Get_page_mm:
 
696
      // get TUP real page
 
697
      jam();
 
698
      {
 
699
        if (pos.m_realpid_mm == RNIL) {
 
700
          jam();
 
701
          if (key.m_page_no < frag.noOfPages)
 
702
            pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
 
703
          else
 
704
          {
 
705
            ndbassert(bits & ScanOp::SCAN_NR);
 
706
            goto nopage;
 
707
          }
 
708
        }
 
709
        PagePtr pagePtr;
 
710
        c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
 
711
 
 
712
        if (pagePtr.p->page_state == ZEMPTY_MM) {
 
713
          // skip empty page
 
714
          jam();
 
715
          if (! (bits & ScanOp::SCAN_NR))
 
716
          {
 
717
            pos.m_get = ScanPos::Get_next_page_mm;
 
718
            break; // incr loop count
 
719
          }
 
720
          else
 
721
          {
 
722
            jam();
 
723
            pos.m_realpid_mm = RNIL;
 
724
          }
 
725
        }
 
726
    nopage:
 
727
        pos.m_page = pagePtr.p;
 
728
        pos.m_get = ScanPos::Get_tuple;
 
729
      }
 
730
      continue;
 
731
    case ScanPos::Get_next_page_dd:
 
732
      // move to next disk page
 
733
      jam();
 
734
      {
 
735
        Disk_alloc_info& alloc = frag.m_disk_alloc_info;
 
736
        Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
 
737
        Ptr<Extent_info> ext_ptr;
 
738
        c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
 
739
        Extent_info* ext = ext_ptr.p;
 
740
        key.m_page_no++;
 
741
        if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
 
742
          // no more pages in this extent
 
743
          jam();
 
744
          if (! list.next(ext_ptr)) {
 
745
            // no more extents, scan ends
 
746
            jam();
 
747
            pos.m_get = ScanPos::Get_undef;
 
748
            scan.m_state = ScanOp::Last;
 
749
            return true;
 
750
          } else {
 
751
            // move to next extent
 
752
            jam();
 
753
            pos.m_extent_info_ptr_i = ext_ptr.i;
 
754
            ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
 
755
            key.m_file_no = ext->m_key.m_file_no;
 
756
            key.m_page_no = ext->m_first_page_no;
 
757
          }
 
758
        }
 
759
        key.m_page_idx = 0;
 
760
        pos.m_get = ScanPos::Get_page_dd;
 
761
        /*
 
762
          read ahead for scan in disk order
 
763
          do read ahead every 8:th page
 
764
        */
 
765
        if ((bits & ScanOp::SCAN_DD) &&
 
766
            (((key.m_page_no - ext->m_first_page_no) & 7) == 0))
 
767
        {
 
768
          jam();
 
769
          // initialize PGMAN request
 
770
          Page_cache_client::Request preq;
 
771
          preq.m_page = pos.m_key;
 
772
          preq.m_callback = TheNULLCallback;
 
773
 
 
774
          // set maximum read ahead
 
775
          Uint32 read_ahead = m_max_page_read_ahead;
 
776
 
 
777
          while (true)
 
778
          {
 
779
            // prepare page read ahead in current extent
 
780
            Uint32 page_no = preq.m_page.m_page_no;
 
781
            Uint32 page_no_limit = page_no + read_ahead;
 
782
            Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
 
783
            if (page_no_limit > limit)
 
784
            {
 
785
              jam();
 
786
              // read ahead crosses extent, set limit for this extent
 
787
              read_ahead = page_no_limit - limit;
 
788
              page_no_limit = limit;
 
789
              // and make sure we only read one extra extent next time around
 
790
              if (read_ahead > alloc.m_extent_size)
 
791
                read_ahead = alloc.m_extent_size;
 
792
            }
 
793
            else
 
794
            {
 
795
              jam();
 
796
              read_ahead = 0; // no more to read ahead after this
 
797
            }
 
798
            // do read ahead pages for this extent
 
799
            while (page_no < page_no_limit)
 
800
            {
 
801
              // page request to PGMAN
 
802
              jam();
 
803
              preq.m_page.m_page_no = page_no;
 
804
              int flags = 0;
 
805
              // ignore result
 
806
              m_pgman.get_page(signal, preq, flags);
 
807
              jamEntry();
 
808
              page_no++;
 
809
            }
 
810
            if (!read_ahead || !list.next(ext_ptr))
 
811
            {
 
812
              // no more extents after this or read ahead done
 
813
              jam();
 
814
              break;
 
815
            }
 
816
            // move to next extent and initialize PGMAN request accordingly
 
817
            Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
 
818
            preq.m_page.m_file_no = ext->m_key.m_file_no;
 
819
            preq.m_page.m_page_no = ext->m_first_page_no;
 
820
          }
 
821
        } // if ScanOp::SCAN_DD read ahead
 
822
      }
 
823
      /*FALLTHRU*/
 
824
    case ScanPos::Get_page_dd:
 
825
      // get global page in PGMAN cache
 
826
      jam();
 
827
      {
 
828
        // check if page is un-allocated or empty
 
829
        if (likely(! (bits & ScanOp::SCAN_NR)))
 
830
        {
 
831
          Tablespace_client tsman(signal, c_tsman,
 
832
                                  frag.fragTableId, 
 
833
                                  frag.fragmentId, 
 
834
                                  frag.m_tablespace_id);
 
835
          unsigned uncommitted, committed;
 
836
          uncommitted = committed = ~(unsigned)0;
 
837
          int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
 
838
          ndbrequire(ret == 0);
 
839
          if (committed == 0 && uncommitted == 0) {
 
840
            // skip empty page
 
841
            jam();
 
842
            pos.m_get = ScanPos::Get_next_page_dd;
 
843
            break; // incr loop count
 
844
          }
 
845
        }
 
846
        // page request to PGMAN
 
847
        Page_cache_client::Request preq;
 
848
        preq.m_page = pos.m_key;
 
849
        preq.m_callback.m_callbackData = scanPtr.i;
 
850
        preq.m_callback.m_callbackFunction =
 
851
          safe_cast(&Dbtup::disk_page_tup_scan_callback);
 
852
        int flags = 0;
 
853
        int res = m_pgman.get_page(signal, preq, flags);
 
854
        jamEntry();
 
855
        if (res == 0) {
 
856
          jam();
 
857
          // request queued
 
858
          pos.m_get = ScanPos::Get_tuple;
 
859
          return false;
 
860
        }
 
861
        ndbrequire(res > 0);
 
862
        pos.m_page = (Page*)m_pgman.m_ptr.p;
 
863
      }
 
864
      pos.m_get = ScanPos::Get_tuple;
 
865
      continue;
 
866
      // get tuple
 
867
      // move to next tuple
 
868
    case ScanPos::Get_next_tuple:
 
869
    case ScanPos::Get_next_tuple_fs:
 
870
      // move to next fixed size tuple
 
871
      jam();
 
872
      {
 
873
        key.m_page_idx += size;
 
874
        pos.m_get = ScanPos::Get_tuple_fs;
 
875
      }
 
876
      /*FALLTHRU*/
 
877
    case ScanPos::Get_tuple:
 
878
    case ScanPos::Get_tuple_fs:
 
879
      // get fixed size tuple
 
880
      jam();
 
881
      {
 
882
        Fix_page* page = (Fix_page*)pos.m_page;
 
883
        if (key.m_page_idx + size <= Fix_page::DATA_WORDS) 
 
884
        {
 
885
          pos.m_get = ScanPos::Get_next_tuple_fs;
 
886
          th = (Tuple_header*)&page->m_data[key.m_page_idx];
 
887
          
 
888
          if (likely(! (bits & ScanOp::SCAN_NR)))
 
889
          {
 
890
            jam();
 
891
            thbits = th->m_header_bits;
 
892
            if (! (thbits & Tuple_header::FREE))
 
893
            {
 
894
              goto found_tuple;
 
895
            } 
 
896
          }
 
897
          else
 
898
          {
 
899
            if (pos.m_realpid_mm == RNIL)
 
900
            {
 
901
              jam();
 
902
              foundGCI = 0;
 
903
              goto found_deleted_rowid;
 
904
            }
 
905
            thbits = th->m_header_bits;
 
906
            if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
 
907
                foundGCI == 0)
 
908
            {
 
909
              if (! (thbits & Tuple_header::FREE))
 
910
              {
 
911
                jam();
 
912
                goto found_tuple;
 
913
              }
 
914
              else
 
915
              {
 
916
                goto found_deleted_rowid;
 
917
              }
 
918
            }
 
919
            else if (thbits != Fix_page::FREE_RECORD && 
 
920
                     th->m_operation_ptr_i != RNIL)
 
921
            {
 
922
              jam();
 
923
              goto found_tuple; // Locked tuple...
 
924
              // skip free tuple
 
925
            }
 
926
          }
 
927
        } else {
 
928
          jam();
 
929
          // no more tuples on this page
 
930
          pos.m_get = ScanPos::Get_next_page;
 
931
        }
 
932
      }
 
933
      break; // incr loop count
 
934
  found_tuple:
 
935
      // found possible tuple to return
 
936
      jam();
 
937
      {
 
938
        // caller has already set pos.m_get to next tuple
 
939
        if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
 
940
          Local_key& key_mm = pos.m_key_mm;
 
941
          if (! (bits & ScanOp::SCAN_DD)) {
 
942
            key_mm = pos.m_key;
 
943
            // real page id is already set
 
944
          } else {
 
945
            key_mm.assref(th->m_base_record_ref);
 
946
            // recompute for each disk tuple
 
947
            pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
 
948
          }
 
949
          // TUPKEYREQ handles savepoint stuff
 
950
          scan.m_state = ScanOp::Current;
 
951
          return true;
 
952
        } else {
 
953
          jam();
 
954
          // clear it so that it will show up in next LCP
 
955
          th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
 
956
          if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
 
957
            jam();
 
958
            setChecksum(th, tablePtr.p);
 
959
          }
 
960
        }
 
961
      }
 
962
      break;
 
963
  found_deleted_rowid:
 
964
      jam();
 
965
      {
 
966
        ndbassert(bits & ScanOp::SCAN_NR);
 
967
        Local_key& key_mm = pos.m_key_mm;
 
968
        if (! (bits & ScanOp::SCAN_DD)) {
 
969
          key_mm = pos.m_key;
 
970
          // caller has already set pos.m_get to next tuple
 
971
          // real page id is already set
 
972
        } else {
 
973
          key_mm.assref(th->m_base_record_ref);
 
974
          // recompute for each disk tuple
 
975
          pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
 
976
          
 
977
          Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
 
978
          th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
 
979
          if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
 
980
              foundGCI == 0)
 
981
          {
 
982
            if (! (thbits & Tuple_header::FREE))
 
983
              break;
 
984
          }
 
985
        }
 
986
        
 
987
        NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
 
988
        conf->scanPtr = scan.m_userPtr;
 
989
        conf->accOperationPtr = RNIL;
 
990
        conf->fragId = frag.fragmentId;
 
991
        conf->localKey[0] = pos.m_key_mm.ref();
 
992
        conf->localKey[1] = 0;
 
993
        conf->localKeyLength = 1;
 
994
        conf->gci = foundGCI;
 
995
        Uint32 blockNo = refToBlock(scan.m_userRef);
 
996
        EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
 
997
        jamEntry();
 
998
 
 
999
        // TUPKEYREQ handles savepoint stuff
 
1000
        loop_count = 32;
 
1001
        scan.m_state = ScanOp::Next;
 
1002
        return false;
 
1003
      }
 
1004
      break; // incr loop count
 
1005
    default:
 
1006
      ndbrequire(false);
 
1007
      break;
 
1008
    }
 
1009
    if (++loop_count >= 32)
 
1010
      break;
 
1011
  }
 
1012
  // TODO: at drop table we have to flush and terminate these
 
1013
  jam();
 
1014
  signal->theData[0] = ZTUP_SCAN;
 
1015
  signal->theData[1] = scanPtr.i;
 
1016
  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
 
1017
  return false;
 
1018
 
 
1019
found_lcp_keep:
 
1020
  Local_key tmp;
 
1021
  tmp.assref(lcp_list);
 
1022
  tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
 
1023
  
 
1024
  Ptr<Page> pagePtr;
 
1025
  c_page_pool.getPtr(pagePtr, tmp.m_page_no);
 
1026
  Tuple_header* ptr = (Tuple_header*)
 
1027
    ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
 
1028
  Uint32 headerbits = ptr->m_header_bits;
 
1029
  ndbrequire(headerbits & Tuple_header::LCP_KEEP);
 
1030
  
 
1031
  Uint32 next = ptr->m_operation_ptr_i;
 
1032
  ptr->m_operation_ptr_i = RNIL;
 
1033
  ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
 
1034
  
 
1035
  if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
 
1036
    jam();
 
1037
    setChecksum(ptr, tablePtr.p);
 
1038
  }
 
1039
  
 
1040
  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
 
1041
  conf->scanPtr = scan.m_userPtr;
 
1042
  conf->accOperationPtr = (Uint32)-1;
 
1043
  conf->fragId = frag.fragmentId;
 
1044
  conf->localKey[0] = lcp_list;
 
1045
  conf->localKey[1] = 0;
 
1046
  conf->localKeyLength = 1;
 
1047
  conf->gci = 0;
 
1048
  Uint32 blockNo = refToBlock(scan.m_userRef);
 
1049
  EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
 
1050
  
 
1051
  fragPtr.p->m_lcp_keep_list = next;
 
1052
  ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
 
1053
  if (headerbits & Tuple_header::FREED)
 
1054
  {
 
1055
    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
 
1056
    {
 
1057
      jam();
 
1058
      free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
 
1059
    } else {
 
1060
      jam();
 
1061
      free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
 
1062
    }
 
1063
  }
 
1064
  return false;
 
1065
}
 
1066
 
 
1067
void
 
1068
Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
 
1069
{
 
1070
  bool immediate = scanNext(signal, scanPtr);
 
1071
  if (! immediate) {
 
1072
    jam();
 
1073
    // time-slicing again
 
1074
    return;
 
1075
  }
 
1076
  scanReply(signal, scanPtr);
 
1077
}
 
1078
 
 
1079
void
 
1080
Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
 
1081
{
 
1082
  ScanOpPtr scanPtr;
 
1083
  c_scanOpPool.getPtr(scanPtr, scanPtrI);
 
1084
  ScanOp& scan = *scanPtr.p;
 
1085
  ScanPos& pos = scan.m_scanPos;
 
1086
  // get cache page
 
1087
  Ptr<GlobalPage> gptr;
 
1088
  m_global_page_pool.getPtr(gptr, page_i);
 
1089
  pos.m_page = (Page*)gptr.p;
 
1090
  // continue
 
1091
  scanCont(signal, scanPtr);
 
1092
}
 
1093
 
 
1094
void
 
1095
Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
 
1096
{
 
1097
  ScanOp& scan = *scanPtr.p;
 
1098
  ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) && scan.m_accLockOp == RNIL);
 
1099
  // unlock all not unlocked by LQH
 
1100
  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
 
1101
  ScanLockPtr lockPtr;
 
1102
  while (list.first(lockPtr)) {
 
1103
    jam();
 
1104
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
 
1105
    lockReq->returnCode = RNIL;
 
1106
    lockReq->requestInfo = AccLockReq::Abort;
 
1107
    lockReq->accOpPtr = lockPtr.p->m_accLockOp;
 
1108
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
 
1109
    jamEntry();
 
1110
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
 
1111
    list.release(lockPtr);
 
1112
  }
 
1113
  // send conf
 
1114
  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
 
1115
  conf->scanPtr = scanPtr.p->m_userPtr;
 
1116
  conf->accOperationPtr = RNIL;
 
1117
  conf->fragId = RNIL;
 
1118
  unsigned signalLength = 3;
 
1119
  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
 
1120
      signal, signalLength, JBB);
 
1121
  releaseScanOp(scanPtr);
 
1122
}
 
1123
 
 
1124
void
 
1125
Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
 
1126
{
 
1127
  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
 
1128
  ScanLockPtr lockPtr;
 
1129
#ifdef VM_TRACE
 
1130
  list.first(lockPtr);
 
1131
  while (lockPtr.i != RNIL) {
 
1132
    ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
 
1133
    list.next(lockPtr);
 
1134
  }
 
1135
#endif
 
1136
  bool ok = list.seize(lockPtr);
 
1137
  ndbrequire(ok);
 
1138
  lockPtr.p->m_accLockOp = accLockOp;
 
1139
}
 
1140
 
 
1141
void
 
1142
Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
 
1143
{
 
1144
  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
 
1145
  ScanLockPtr lockPtr;
 
1146
  list.first(lockPtr);
 
1147
  while (lockPtr.i != RNIL) {
 
1148
    if (lockPtr.p->m_accLockOp == accLockOp) {
 
1149
      jam();
 
1150
      break;
 
1151
    }
 
1152
    list.next(lockPtr);
 
1153
  }
 
1154
  ndbrequire(lockPtr.i != RNIL);
 
1155
  list.release(lockPtr);
 
1156
}
 
1157
 
 
1158
void
 
1159
Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
 
1160
{
 
1161
  FragrecordPtr fragPtr;
 
1162
  fragPtr.i = scanPtr.p->m_fragPtrI;
 
1163
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
1164
 
 
1165
  if(scanPtr.p->m_bits & ScanOp::SCAN_LCP)
 
1166
  {
 
1167
    jam();
 
1168
    fragPtr.p->m_lcp_scan_op = RNIL;
 
1169
    scanPtr.p->m_fragPtrI = RNIL;
 
1170
  }
 
1171
  else
 
1172
  {
 
1173
    jam();
 
1174
    LocalDLList<ScanOp> list(c_scanOpPool, fragPtr.p->m_scanList);    
 
1175
    list.release(scanPtr);
 
1176
  }
 
1177
}
 
1178
 
 
1179
void
 
1180
Dbtup::execLCP_FRAG_ORD(Signal* signal)
 
1181
{
 
1182
  LcpFragOrd* req= (LcpFragOrd*)signal->getDataPtr();
 
1183
  
 
1184
  TablerecPtr tablePtr;
 
1185
  tablePtr.i = req->tableId;
 
1186
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
1187
 
 
1188
  if (tablePtr.p->m_no_of_disk_attributes)
 
1189
  {
 
1190
    jam();
 
1191
    FragrecordPtr fragPtr;
 
1192
    Uint32 fragId = req->fragmentId;
 
1193
    fragPtr.i = RNIL;
 
1194
    getFragmentrec(fragPtr, fragId, tablePtr.p);
 
1195
    ndbrequire(fragPtr.i != RNIL);
 
1196
    Fragrecord& frag = *fragPtr.p;
 
1197
    
 
1198
    ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
 
1199
    frag.m_lcp_scan_op = c_lcp_scan_op;
 
1200
    ScanOpPtr scanPtr;
 
1201
    c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
 
1202
    ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
 
1203
    scanPtr.p->m_fragPtrI = fragPtr.i;
 
1204
    
 
1205
    scanFirst(signal, scanPtr);
 
1206
    scanPtr.p->m_state = ScanOp::First;
 
1207
  }
 
1208
}