/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#define DBTUP_SCAN_CPP
19
#include <signaldata/AccScan.hpp>
20
#include <signaldata/NextScan.hpp>
21
#include <signaldata/AccLock.hpp>
22
#include <md5_hash.hpp>
26
#define jam() { jamLine(32000 + __LINE__); }
27
#define jamEntry() { jamEntryLine(32000 + __LINE__); }
30
#define dbg(x) globalSignalLoggers.log x
36
// NOTE(review): corrupted extraction -- the bare numeric lines below are
// leftover original line numbers, and the gaps in that numbering show that
// source lines (return type, braces, declarations such as tablePtr/scanPtr/
// bits) are missing from this view. Code is kept byte-identical; only
// comments are added.
//
// Handler for ACC_SCANREQ: sets up a TUP scan operation on one fragment of
// one table, chooses scan flags (LCP / disk-data / varsize / lock mode /
// node-recovery) from requestInfo, initializes the ScanOp record, and replies
// with ACC_SCANCONF -- or ACC_SCANREF (error code 0x313) on seize failure.
Dbtup::execACC_SCANREQ(Signal* signal)
39
const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
40
const AccScanReq* const req = &reqCopy;
44
// find table and fragment
46
tablePtr.i = req->tableId;
47
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
48
FragrecordPtr fragPtr;
49
Uint32 fragId = req->fragmentNo;
51
getFragmentrec(fragPtr, fragId, tablePtr.p);
52
ndbrequire(fragPtr.i != RNIL);
53
Fragrecord& frag = *fragPtr.p;
58
// an LCP scan reuses the globally pre-seized LCP scan op (c_lcp_scan_op)
if (AccScanReq::getLcpScanFlag(req->requestInfo))
61
bits |= ScanOp::SCAN_LCP;
62
c_scanOpPool.getPtr(scanPtr, c_lcp_scan_op);
66
// seize from pool and link to per-fragment list
67
LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
68
if (! list.seize(scanPtr)) {
74
// scan disk part only when the table actually has disk attributes
if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
75
&& tablePtr.p->m_no_of_disk_attributes)
77
bits |= ScanOp::SCAN_DD;
80
bool mm = (bits & ScanOp::SCAN_DD);
81
if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
82
bits |= ScanOp::SCAN_VS;
84
// disk pages have fixed page format
85
ndbrequire(! (bits & ScanOp::SCAN_DD));
87
// locking scans: LockMode 0 = shared, otherwise exclusive
if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
88
if (AccScanReq::getLockMode(req->requestInfo) == 0)
89
bits |= ScanOp::SCAN_LOCK_SH;
91
bits |= ScanOp::SCAN_LOCK_EX;
94
if (AccScanReq::getNRScanFlag(req->requestInfo))
97
bits |= ScanOp::SCAN_NR;
98
scanPtr.p->m_endPage = req->maxPage;
99
if (req->maxPage != RNIL && req->maxPage > frag.noOfPages)
101
// NOTE(review): format string has 4 conversions but only 2 arguments are
// visible here -- presumably more arguments were on the missing lines.
ndbout_c("%u %u endPage: %u (noOfPages: %u)",
103
req->maxPage, fragPtr.p->noOfPages);
109
scanPtr.p->m_endPage = RNIL;
112
if (AccScanReq::getLcpScanFlag(req->requestInfo))
115
// LCP scans never touch disk data and never take row locks
ndbrequire((bits & ScanOp::SCAN_DD) == 0);
116
ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
120
// placement-new re-initializes the (re)used ScanOp record
new (scanPtr.p) ScanOp();
121
ScanOp& scan = *scanPtr.p;
122
scan.m_state = ScanOp::First;
124
scan.m_userPtr = req->senderData;
125
scan.m_userRef = req->senderRef;
126
scan.m_tableId = tablePtr.i;
127
scan.m_fragId = frag.fragmentId;
128
scan.m_fragPtrI = fragPtr.i;
129
scan.m_transId1 = req->transId1;
130
scan.m_transId2 = req->transId2;
131
scan.m_savePointId = req->savePointId;
134
// success reply to the requesting block (normally LQH)
AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
135
conf->scanPtr = req->senderData;
136
conf->accPtr = scanPtr.i;
137
conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
138
sendSignal(req->senderRef, GSN_ACC_SCANCONF,
139
signal, AccScanConf::SignalLength, JBB);
142
// failure path: release anything partially seized, then REF
if (scanPtr.i != RNIL) {
144
releaseScanOp(scanPtr);
146
// LQH does not handle REF
147
signal->theData[0] = 0x313;
148
sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
152
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps in them show that source lines (braces,
// declarations, case terminators) are missing. Code kept byte-identical.
//
// Handler for NEXT_SCANREQ from LQH: dispatches on scanFlag to either
// unlock a row (NEXT/NEXT_COMMIT/COMMIT), close the scan (ZSCAN_CLOSE,
// aborting any pending or held ACC lock first), or continue to the next
// row via an inline ACC_CHECK_SCAN.
Dbtup::execNEXT_SCANREQ(Signal* signal)
155
const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
156
const NextScanReq* const req = &reqCopy;
158
c_scanOpPool.getPtr(scanPtr, req->accPtr);
159
ScanOp& scan = *scanPtr.p;
160
switch (req->scanFlag) {
161
case NextScanReq::ZSCAN_NEXT:
164
case NextScanReq::ZSCAN_NEXT_COMMIT:
166
case NextScanReq::ZSCAN_COMMIT:
168
// locking scan: release the row lock LQH is done with
if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
170
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
171
lockReq->returnCode = RNIL;
172
lockReq->requestInfo = AccLockReq::Unlock;
173
lockReq->accOpPtr = req->accOperationPtr;
174
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
175
signal, AccLockReq::UndoSignalLength);
177
ndbrequire(lockReq->returnCode == AccLockReq::Success);
178
removeAccLockOp(scan, req->accOperationPtr);
180
// pure COMMIT: confirm and return without fetching another row
if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
181
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
182
conf->scanPtr = scan.m_userPtr;
183
unsigned signalLength = 1;
184
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
185
signal, signalLength, JBB);
189
case NextScanReq::ZSCAN_CLOSE:
191
// still waiting for a lock reply -- abort it with confirmation so the
// close completes later in execACC_ABORTCONF
if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
193
ndbrequire(scan.m_accLockOp != RNIL);
194
// use ACC_ABORTCONF to flush out any reply in job buffer
195
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
196
lockReq->returnCode = RNIL;
197
lockReq->requestInfo = AccLockReq::AbortWithConf;
198
lockReq->accOpPtr = scan.m_accLockOp;
199
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
200
signal, AccLockReq::UndoSignalLength);
202
ndbrequire(lockReq->returnCode == AccLockReq::Success);
203
scan.m_state = ScanOp::Aborting;
206
// lock already held on current row -- abort it synchronously
if (scan.m_state == ScanOp::Locked) {
208
ndbrequire(scan.m_accLockOp != RNIL);
209
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
210
lockReq->returnCode = RNIL;
211
lockReq->requestInfo = AccLockReq::Abort;
212
lockReq->accOpPtr = scan.m_accLockOp;
213
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
214
signal, AccLockReq::UndoSignalLength);
216
ndbrequire(lockReq->returnCode == AccLockReq::Success);
217
scan.m_accLockOp = RNIL;
219
scan.m_state = ScanOp::Aborting;
220
scanClose(signal, scanPtr);
222
case NextScanReq::ZSCAN_NEXT_ABORT:
229
// start looking for next scan result
230
AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
231
checkReq->accPtr = scanPtr.i;
232
checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
233
EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
238
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing source lines (braces, early
// returns). Code kept byte-identical; comments only added.
//
// Handler for ACC_CHECK_SCAN: drives the scan state machine -- lets LQH
// check for LCP stop, reports "ask again" while a lock wait is pending,
// runs scanFirst()/scanNext() as needed, and finally sends the result via
// scanReply().
Dbtup::execACC_CHECK_SCAN(Signal* signal)
241
const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
242
const AccCheckScan* const req = &reqCopy;
244
c_scanOpPool.getPtr(scanPtr, req->accPtr);
245
ScanOp& scan = *scanPtr.p;
247
FragrecordPtr fragPtr;
248
fragPtr.i = scan.m_fragPtrI;
249
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
250
Fragrecord& frag = *fragPtr.p;
251
// give LQH a chance to pause the scan for an LCP
if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
253
signal->theData[0] = scan.m_userPtr;
254
signal->theData[1] = true;
255
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
259
if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
261
// LQH asks if we are waiting for lock and we tell it to ask again
262
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
263
conf->scanPtr = scan.m_userPtr;
264
conf->accOperationPtr = RNIL; // no tuple returned
265
conf->fragId = frag.fragmentId;
266
unsigned signalLength = 3;
267
// if TC has ordered scan close, it will be detected here
268
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
269
signal, signalLength, JBB);
272
if (scan.m_state == ScanOp::First) {
274
scanFirst(signal, scanPtr);
276
if (scan.m_state == ScanOp::Next) {
278
// scanNext() may suspend (time-slice); presumably `immediate` being
// false means the reply comes later via callback -- the handling code
// is on missing lines.
bool immediate = scanNext(signal, scanPtr);
281
// time-slicing via TUP or PGMAN
285
scanReply(signal, scanPtr);
289
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing source lines (braces, case
// breaks, jam() calls, pkSize declaration). Code kept byte-identical.
//
// Sends the scan result for the current state to LQH. In Current state a
// locking scan first acquires the row lock from DBACC (handling Success /
// IsBlocked / Refused / NoFreeOp); in Locked state the row (and any lock,
// now handed over to LQH) is returned in NEXT_SCANCONF; in Last/Invalid
// state an end-of-scan NEXT_SCANCONF (accOperationPtr == RNIL) is sent.
Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
291
ScanOp& scan = *scanPtr.p;
292
FragrecordPtr fragPtr;
293
fragPtr.i = scan.m_fragPtrI;
294
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
295
Fragrecord& frag = *fragPtr.p;
296
// for reading tuple key in Current state
297
Uint32* pkData = (Uint32*)c_dataBuffer;
299
if (scan.m_state == ScanOp::Current) {
300
// found an entry to return
302
ndbrequire(scan.m_accLockOp == RNIL);
303
if (scan.m_bits & ScanOp::SCAN_LOCK) {
305
// read tuple key - use TUX routine
306
const ScanPos& pos = scan.m_scanPos;
307
const Local_key& key_mm = pos.m_key_mm;
308
// NOTE(review): the remaining tuxReadPk arguments and the pkSize
// declaration are on missing lines.
int ret = tuxReadPk(fragPtr.i, pos.m_realpid_mm, key_mm.m_page_idx,
312
dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
313
// get read lock or exclusive lock
314
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
315
lockReq->returnCode = RNIL;
316
lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
317
AccLockReq::LockShared : AccLockReq::LockExclusive;
318
lockReq->accOpPtr = RNIL;
319
lockReq->userPtr = scanPtr.i;
320
lockReq->userRef = reference();
321
lockReq->tableId = scan.m_tableId;
322
lockReq->fragId = frag.fragmentId;
323
lockReq->fragPtrI = RNIL; // no cached frag ptr yet
324
lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
325
lockReq->tupAddr = key_mm.ref();
326
lockReq->transId1 = scan.m_transId1;
327
lockReq->transId2 = scan.m_transId2;
328
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
329
signal, AccLockReq::LockSignalLength);
331
switch (lockReq->returnCode) {
332
case AccLockReq::Success:
334
scan.m_state = ScanOp::Locked;
335
scan.m_accLockOp = lockReq->accOpPtr;
337
case AccLockReq::IsBlocked:
340
// lock queued; ACCKEYCONF/ACCKEYREF will arrive later
scan.m_state = ScanOp::Blocked;
341
scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
342
scan.m_accLockOp = lockReq->accOpPtr;
343
// LQH will wake us up
344
signal->theData[0] = scan.m_userPtr;
345
signal->theData[1] = true;
346
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
350
case AccLockReq::Refused:
352
// we cannot see deleted tuple (assert only)
355
scan.m_state = ScanOp::Next;
356
signal->theData[0] = scan.m_userPtr;
357
signal->theData[1] = true;
358
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
362
case AccLockReq::NoFreeOp:
364
// max ops should depend on max scans (assert only)
366
// stay in Current state
367
scan.m_state = ScanOp::Current;
368
signal->theData[0] = scan.m_userPtr;
369
signal->theData[1] = true;
370
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
379
// non-locking scan: proceed directly to Locked (no lock needed)
scan.m_state = ScanOp::Locked;
383
if (scan.m_state == ScanOp::Locked) {
384
// we have lock or do not need one
387
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
388
conf->scanPtr = scan.m_userPtr;
389
// the lock is passed to LQH
390
Uint32 accLockOp = scan.m_accLockOp;
391
if (accLockOp != RNIL) {
392
scan.m_accLockOp = RNIL;
393
// remember it until LQH unlocks it
394
addAccLockOp(scan, accLockOp);
396
ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
397
// operation RNIL in LQH would signal no tuple returned
398
accLockOp = (Uint32)-1;
400
const ScanPos& pos = scan.m_scanPos;
401
conf->accOperationPtr = accLockOp;
402
conf->fragId = frag.fragmentId;
403
conf->localKey[0] = pos.m_key_mm.ref();
404
conf->localKey[1] = 0;
405
conf->localKeyLength = 1;
406
unsigned signalLength = 6;
407
// locking scans reply via buffered signal; non-locking take the fast
// direct-execute path
if (scan.m_bits & ScanOp::SCAN_LOCK) {
408
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
409
signal, signalLength, JBB);
411
Uint32 blockNo = refToBlock(scan.m_userRef);
412
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
415
// next time look for next entry
416
scan.m_state = ScanOp::Next;
419
if (scan.m_state == ScanOp::Last ||
420
scan.m_state == ScanOp::Invalid) {
422
// end of scan: accOperationPtr == RNIL tells LQH there is no tuple
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
423
conf->scanPtr = scan.m_userPtr;
424
conf->accOperationPtr = RNIL;
426
unsigned signalLength = 3;
427
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
428
signal, signalLength, JBB);
435
/*
 * Lock succeeded (after delay) in ACC. If the lock is for the current
 * entry, set state to Locked. If the lock is for an entry we were
 * moved away from, simply unlock it. Finally, if we are closing the
 * scan, do nothing since we have already sent an abort request.
 */
441
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing source lines (braces, scanPtr
// declaration, else-branches). Code kept byte-identical.
//
// Delayed lock grant from DBACC (see comment above): clears SCAN_LOCK_WAIT;
// if still Blocked on the same entry, go to Locked; otherwise the scan has
// moved on, so the now-useless lock is aborted; in Aborting state only the
// bookkeeping is reset and the close resumes at ACC_ABORTCONF.
Dbtup::execACCKEYCONF(Signal* signal)
445
scanPtr.i = signal->theData[0];
446
c_scanOpPool.getPtr(scanPtr);
447
ScanOp& scan = *scanPtr.p;
448
ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
449
scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
450
if (scan.m_state == ScanOp::Blocked) {
451
// the lock wait was for current entry
453
scan.m_state = ScanOp::Locked;
457
if (scan.m_state != ScanOp::Aborting) {
458
// we were moved, release lock
460
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
461
lockReq->returnCode = RNIL;
462
lockReq->requestInfo = AccLockReq::Abort;
463
lockReq->accOpPtr = scan.m_accLockOp;
464
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
466
ndbrequire(lockReq->returnCode == AccLockReq::Success);
467
scan.m_accLockOp = RNIL;
472
scan.m_accLockOp = RNIL;
473
// continue at ACC_ABORTCONF
477
/*
 * Lock failed (after delay) in ACC. Probably means somebody ahead of
 * us in the lock queue deleted the tuple.
 */
481
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing source lines (braces, scanPtr
// declaration, else-branches). Code kept byte-identical.
//
// Delayed lock refusal from DBACC (see comment above): clears
// SCAN_LOCK_WAIT and aborts the pending lock operation; if the refusal hit
// the current (Blocked) entry the scan simply moves on -- for NR scans the
// same row id is refetched first.
Dbtup::execACCKEYREF(Signal* signal)
485
scanPtr.i = signal->theData[0];
486
c_scanOpPool.getPtr(scanPtr);
487
ScanOp& scan = *scanPtr.p;
488
ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
489
scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
490
if (scan.m_state != ScanOp::Aborting) {
492
// release the operation
493
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
494
lockReq->returnCode = RNIL;
495
lockReq->requestInfo = AccLockReq::Abort;
496
lockReq->accOpPtr = scan.m_accLockOp;
497
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
499
ndbrequire(lockReq->returnCode == AccLockReq::Success);
500
scan.m_accLockOp = RNIL;
501
// scan position should already have been moved (assert only)
502
if (scan.m_state == ScanOp::Blocked) {
505
if (scan.m_bits & ScanOp::SCAN_NR)
508
// node-recovery scan: refetch the same tuple position
scan.m_state = ScanOp::Next;
509
scan.m_scanPos.m_get = ScanPos::Get_tuple;
510
ndbout_c("Ignoring scan.m_state == ScanOp::Blocked, refetch");
515
scan.m_state = ScanOp::Next;
516
ndbout_c("Ignoring scan.m_state == ScanOp::Blocked");
523
scan.m_accLockOp = RNIL;
524
// continue at ACC_ABORTCONF
528
/*
 * Received when scan is closing. This signal arrives after any
 * ACCKEYCONF or ACCKEYREF which may have been in the job buffer.
 */
532
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing source lines (braces, scanPtr
// declaration). Code kept byte-identical.
//
// Final step of an aborted lock wait during scan close (see comment above):
// clears the wait flag and completes the close via scanClose().
Dbtup::execACC_ABORTCONF(Signal* signal)
536
scanPtr.i = signal->theData[0];
537
c_scanOpPool.getPtr(scanPtr);
538
ScanOp& scan = *scanPtr.p;
539
ndbrequire(scan.m_state == ScanOp::Aborting);
540
// most likely we are still in lock wait
541
if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
543
scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
544
scan.m_accLockOp = RNIL;
546
scanClose(signal, scanPtr);
550
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing source lines (braces, early
// returns, else-branches). Code kept byte-identical.
//
// Positions a new scan at its starting point: for main-memory scans, page 0
// with a cleared cached real page id; for disk-data scans, the first page of
// the first extent. An empty fragment (no pages / no extents) goes straight
// to the Last state; otherwise state becomes Next and scanNext() does the
// actual work.
Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
552
ScanOp& scan = *scanPtr.p;
553
ScanPos& pos = scan.m_scanPos;
554
Local_key& key = pos.m_key;
555
const Uint32 bits = scan.m_bits;
557
FragrecordPtr fragPtr;
558
fragPtr.i = scan.m_fragPtrI;
559
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
560
Fragrecord& frag = *fragPtr.p;
561
// in the future should not pre-allocate pages
562
if (frag.noOfPages == 0 && ((bits & ScanOp::SCAN_NR) == 0)) {
564
scan.m_state = ScanOp::Last;
567
if (! (bits & ScanOp::SCAN_DD)) {
568
key.m_file_no = ZNIL;
570
pos.m_get = ScanPos::Get_page_mm;
571
// for MM scan real page id is cached for efficiency
572
pos.m_realpid_mm = RNIL;
574
Disk_alloc_info& alloc = frag.m_disk_alloc_info;
575
// for now must check disk part explicitly
576
if (alloc.m_extent_list.firstItem == RNIL) {
578
scan.m_state = ScanOp::Last;
581
pos.m_extent_info_ptr_i = alloc.m_extent_list.firstItem;
582
Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
583
key.m_file_no = ext->m_key.m_file_no;
584
key.m_page_no = ext->m_first_page_no;
585
pos.m_get = ScanPos::Get_page_dd;
588
// let scanNext() do the work
589
scan.m_state = ScanOp::Next;
593
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers, and the large gaps in that numbering show that a
// substantial part of this function is missing (the enclosing while/switch
// structure, labels `found_tuple`/`found_deleted_rowid`, several
// declarations such as pagePtr/thbits/foundGCI/tmp/flags, braces and break
// statements). Code below is kept byte-identical; comments only are added.
//
// Core scan loop: advances the scan position (pos.m_get) through a state
// machine over pages and fixed-size tuple slots -- separate paths for
// main-memory (..._mm) and disk-data (..._dd) pages, with PGMAN read-ahead
// for disk scans -- skipping free/LCP_SKIP tuples, and for node-recovery
// (SCAN_NR) scans also reporting deleted row ids with their GCI. Work is
// time-sliced via a CONTINUEB (ZTUP_SCAN) after 32 loop iterations. The
// trailing section drains the fragment's LCP keep list.
Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
595
ScanOp& scan = *scanPtr.p;
596
ScanPos& pos = scan.m_scanPos;
597
Local_key& key = pos.m_key;
598
const Uint32 bits = scan.m_bits;
600
TablerecPtr tablePtr;
601
tablePtr.i = scan.m_tableId;
602
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
603
Tablerec& table = *tablePtr.p;
605
FragrecordPtr fragPtr;
606
fragPtr.i = scan.m_fragPtrI;
607
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
608
Fragrecord& frag = *fragPtr.p;
610
Tuple_header* th = 0;
612
Uint32 loop_count = 0;
613
Uint32 scanGCI = scanPtr.p->m_scanGCI;
616
const bool mm = (bits & ScanOp::SCAN_DD);
617
const bool lcp = (bits & ScanOp::SCAN_LCP);
619
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
620
Uint32 size = table.m_offsets[mm].m_fix_header_size;
622
// LCP scan with pending keep-list entries is handled at the end (missing
// control flow on dropped lines presumably jumps there)
if (lcp && lcp_list != RNIL)
626
case ScanPos::Get_next_tuple:
627
case ScanPos::Get_next_tuple_fs:
629
key.m_page_idx += size;
631
case ScanPos::Get_tuple:
632
case ScanPos::Get_tuple_fs:
635
* We need to refetch page after timeslice
637
pos.m_get = ScanPos::Get_page;
645
case ScanPos::Get_next_page:
649
if (! (bits & ScanOp::SCAN_DD))
650
pos.m_get = ScanPos::Get_next_page_mm;
652
pos.m_get = ScanPos::Get_next_page_dd;
655
case ScanPos::Get_page:
659
if (! (bits & ScanOp::SCAN_DD))
660
pos.m_get = ScanPos::Get_page_mm;
662
pos.m_get = ScanPos::Get_page_dd;
665
case ScanPos::Get_next_page_mm:
666
// move to next logical TUP page
670
if (key.m_page_no >= frag.noOfPages) {
673
// NR scans may continue past allocated pages up to m_endPage
if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
676
if (key.m_page_no < scan.m_endPage)
679
ndbout_c("scanning page %u", key.m_page_no);
683
// no more pages, scan ends
684
pos.m_get = ScanPos::Get_undef;
685
scan.m_state = ScanOp::Last;
690
pos.m_get = ScanPos::Get_page_mm;
691
// clear cached value
692
pos.m_realpid_mm = RNIL;
695
case ScanPos::Get_page_mm:
699
if (pos.m_realpid_mm == RNIL) {
701
if (key.m_page_no < frag.noOfPages)
702
pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
705
ndbassert(bits & ScanOp::SCAN_NR);
710
c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
712
if (pagePtr.p->page_state == ZEMPTY_MM) {
715
// empty page: ordinary scans skip it; NR scans still report row ids
if (! (bits & ScanOp::SCAN_NR))
717
pos.m_get = ScanPos::Get_next_page_mm;
718
break; // incr loop count
723
pos.m_realpid_mm = RNIL;
727
pos.m_page = pagePtr.p;
728
pos.m_get = ScanPos::Get_tuple;
731
case ScanPos::Get_next_page_dd:
732
// move to next disk page
735
Disk_alloc_info& alloc = frag.m_disk_alloc_info;
736
Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
737
Ptr<Extent_info> ext_ptr;
738
c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
739
Extent_info* ext = ext_ptr.p;
741
if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
742
// no more pages in this extent
744
if (! list.next(ext_ptr)) {
745
// no more extents, scan ends
747
pos.m_get = ScanPos::Get_undef;
748
scan.m_state = ScanOp::Last;
751
// move to next extent
753
pos.m_extent_info_ptr_i = ext_ptr.i;
754
ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
755
key.m_file_no = ext->m_key.m_file_no;
756
key.m_page_no = ext->m_first_page_no;
760
pos.m_get = ScanPos::Get_page_dd;
762
read ahead for scan in disk order
763
do read ahead every 8:th page
765
if ((bits & ScanOp::SCAN_DD) &&
766
(((key.m_page_no - ext->m_first_page_no) & 7) == 0))
769
// initialize PGMAN request
770
Page_cache_client::Request preq;
771
preq.m_page = pos.m_key;
772
preq.m_callback = TheNULLCallback;
774
// set maximum read ahead
775
Uint32 read_ahead = m_max_page_read_ahead;
779
// prepare page read ahead in current extent
780
Uint32 page_no = preq.m_page.m_page_no;
781
Uint32 page_no_limit = page_no + read_ahead;
782
Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
783
if (page_no_limit > limit)
786
// read ahead crosses extent, set limit for this extent
787
read_ahead = page_no_limit - limit;
788
page_no_limit = limit;
789
// and make sure we only read one extra extent next time around
790
if (read_ahead > alloc.m_extent_size)
791
read_ahead = alloc.m_extent_size;
796
read_ahead = 0; // no more to read ahead after this
798
// do read ahead pages for this extent
799
while (page_no < page_no_limit)
801
// page request to PGMAN
803
preq.m_page.m_page_no = page_no;
806
// `flags` is declared on a missing line -- presumably a read-ahead
// request flag; verify against the original source.
m_pgman.get_page(signal, preq, flags);
810
if (!read_ahead || !list.next(ext_ptr))
812
// no more extents after this or read ahead done
816
// move to next extent and initialize PGMAN request accordingly
817
Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
818
preq.m_page.m_file_no = ext->m_key.m_file_no;
819
preq.m_page.m_page_no = ext->m_first_page_no;
821
} // if ScanOp::SCAN_DD read ahead
824
case ScanPos::Get_page_dd:
825
// get global page in PGMAN cache
828
// check if page is un-allocated or empty
829
if (likely(! (bits & ScanOp::SCAN_NR)))
831
Tablespace_client tsman(signal, c_tsman,
834
frag.m_tablespace_id);
835
unsigned uncommitted, committed;
836
uncommitted = committed = ~(unsigned)0;
837
int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
838
ndbrequire(ret == 0);
839
if (committed == 0 && uncommitted == 0) {
842
// fully free page -- nothing to scan, skip it
pos.m_get = ScanPos::Get_next_page_dd;
843
break; // incr loop count
846
// page request to PGMAN
847
Page_cache_client::Request preq;
848
preq.m_page = pos.m_key;
849
preq.m_callback.m_callbackData = scanPtr.i;
850
preq.m_callback.m_callbackFunction =
851
safe_cast(&Dbtup::disk_page_tup_scan_callback);
853
// if the page is not resident, the callback above resumes the scan
// later via scanCont() (handling of `res` is on missing lines)
int res = m_pgman.get_page(signal, preq, flags);
858
pos.m_get = ScanPos::Get_tuple;
862
pos.m_page = (Page*)m_pgman.m_ptr.p;
864
pos.m_get = ScanPos::Get_tuple;
867
// move to next tuple
868
case ScanPos::Get_next_tuple:
869
case ScanPos::Get_next_tuple_fs:
870
// move to next fixed size tuple
873
key.m_page_idx += size;
874
pos.m_get = ScanPos::Get_tuple_fs;
877
case ScanPos::Get_tuple:
878
case ScanPos::Get_tuple_fs:
879
// get fixed size tuple
882
Fix_page* page = (Fix_page*)pos.m_page;
883
if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
885
pos.m_get = ScanPos::Get_next_tuple_fs;
886
th = (Tuple_header*)&page->m_data[key.m_page_idx];
888
if (likely(! (bits & ScanOp::SCAN_NR)))
891
thbits = th->m_header_bits;
892
if (! (thbits & Tuple_header::FREE))
899
if (pos.m_realpid_mm == RNIL)
903
goto found_deleted_rowid;
905
thbits = th->m_header_bits;
906
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
909
if (! (thbits & Tuple_header::FREE))
916
goto found_deleted_rowid;
919
else if (thbits != Fix_page::FREE_RECORD &&
920
th->m_operation_ptr_i != RNIL)
923
goto found_tuple; // Locked tuple...
929
// no more tuples on this page
930
pos.m_get = ScanPos::Get_next_page;
933
break; // incr loop count
935
// found possible tuple to return
938
// caller has already set pos.m_get to next tuple
939
if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
940
Local_key& key_mm = pos.m_key_mm;
941
if (! (bits & ScanOp::SCAN_DD)) {
943
// real page id is already set
945
key_mm.assref(th->m_base_record_ref);
946
// recompute for each disk tuple
947
pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
949
// TUPKEYREQ handles savepoint stuff
950
scan.m_state = ScanOp::Current;
954
// clear it so that it will show up in next LCP
955
th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
956
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
958
setChecksum(th, tablePtr.p);
966
// found_deleted_rowid path (label on a missing line): NR scan reports
// the deleted row id with its GCI directly to the requesting block
ndbassert(bits & ScanOp::SCAN_NR);
967
Local_key& key_mm = pos.m_key_mm;
968
if (! (bits & ScanOp::SCAN_DD)) {
970
// caller has already set pos.m_get to next tuple
971
// real page id is already set
973
key_mm.assref(th->m_base_record_ref);
974
// recompute for each disk tuple
975
pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
977
Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
978
th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
979
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
982
if (! (thbits & Tuple_header::FREE))
987
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
988
conf->scanPtr = scan.m_userPtr;
989
conf->accOperationPtr = RNIL;
990
conf->fragId = frag.fragmentId;
991
conf->localKey[0] = pos.m_key_mm.ref();
992
conf->localKey[1] = 0;
993
conf->localKeyLength = 1;
994
conf->gci = foundGCI;
995
Uint32 blockNo = refToBlock(scan.m_userRef);
996
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
999
// TUPKEYREQ handles savepoint stuff
1001
scan.m_state = ScanOp::Next;
1004
break; // incr loop count
1009
// time-slice: yield after 32 iterations via CONTINUEB to ourselves
if (++loop_count >= 32)
1012
// TODO: at drop table we have to flush and terminate these
1014
signal->theData[0] = ZTUP_SCAN;
1015
signal->theData[1] = scanPtr.i;
1016
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1021
// LCP keep-list handling (entry control flow is on missing lines):
// return the kept copy to the LCP scan, then free the record
tmp.assref(lcp_list);
1022
tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
1025
c_page_pool.getPtr(pagePtr, tmp.m_page_no);
1026
Tuple_header* ptr = (Tuple_header*)
1027
((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
1028
Uint32 headerbits = ptr->m_header_bits;
1029
ndbrequire(headerbits & Tuple_header::LCP_KEEP);
1031
Uint32 next = ptr->m_operation_ptr_i;
1032
ptr->m_operation_ptr_i = RNIL;
1033
ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
1035
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
1037
setChecksum(ptr, tablePtr.p);
1040
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1041
conf->scanPtr = scan.m_userPtr;
1042
conf->accOperationPtr = (Uint32)-1;
1043
conf->fragId = frag.fragmentId;
1044
conf->localKey[0] = lcp_list;
1045
conf->localKey[1] = 0;
1046
conf->localKeyLength = 1;
1048
Uint32 blockNo = refToBlock(scan.m_userRef);
1049
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
1051
fragPtr.p->m_lcp_keep_list = next;
1052
ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
1053
if (headerbits & Tuple_header::FREED)
1055
if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
1058
free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
1061
free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
1068
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing lines (braces, handling of
// `immediate`). Code kept byte-identical.
//
// Resumes a time-sliced scan (CONTINUEB / PGMAN callback): runs scanNext()
// again and sends the result via scanReply().
Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
1070
bool immediate = scanNext(signal, scanPtr);
1073
// time-slicing again
1076
scanReply(signal, scanPtr);
1080
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing lines (braces, scanPtr
// declaration). Code kept byte-identical.
//
// PGMAN completion callback for a disk page requested by scanNext():
// installs the now-resident page into the scan position and resumes the
// scan via scanCont().
Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
1083
c_scanOpPool.getPtr(scanPtr, scanPtrI);
1084
ScanOp& scan = *scanPtr.p;
1085
ScanPos& pos = scan.m_scanPos;
1087
Ptr<GlobalPage> gptr;
1088
m_global_page_pool.getPtr(gptr, page_i);
1089
pos.m_page = (Page*)gptr.p;
1091
scanCont(signal, scanPtr);
1095
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing lines (braces). Code kept
// byte-identical.
//
// Closes a scan: aborts every row lock still held on LQH's behalf, sends
// the final NEXT_SCANCONF (accOperationPtr and fragId RNIL signal scan
// close), and releases the scan record.
Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
1097
ScanOp& scan = *scanPtr.p;
1098
ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) && scan.m_accLockOp == RNIL);
1099
// unlock all not unlocked by LQH
1100
LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1101
ScanLockPtr lockPtr;
1102
while (list.first(lockPtr)) {
1104
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
1105
lockReq->returnCode = RNIL;
1106
lockReq->requestInfo = AccLockReq::Abort;
1107
lockReq->accOpPtr = lockPtr.p->m_accLockOp;
1108
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
1110
ndbrequire(lockReq->returnCode == AccLockReq::Success);
1111
list.release(lockPtr);
1114
// send NEXT_SCANCONF with fragId == RNIL to signal the close to LQH
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1115
conf->scanPtr = scanPtr.p->m_userPtr;
1116
conf->accOperationPtr = RNIL;
1117
conf->fragId = RNIL;
1118
unsigned signalLength = 3;
1119
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
1120
signal, signalLength, JBB);
1121
releaseScanOp(scanPtr);
1125
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing lines (braces, loop advance,
// ndbrequire on `ok`). Code kept byte-identical.
//
// Records an ACC lock operation handed over to LQH in the scan's lock
// list; the visible loop asserts it is not already present (duplicate
// check), then a new list entry is seized and filled in.
Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
1127
LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1128
ScanLockPtr lockPtr;
1130
list.first(lockPtr);
1131
while (lockPtr.i != RNIL) {
1132
ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
1136
bool ok = list.seize(lockPtr);
1138
lockPtr.p->m_accLockOp = accLockOp;
1142
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing lines (braces, loop advance).
// Code kept byte-identical.
//
// Removes a previously recorded ACC lock operation from the scan's lock
// list once LQH has unlocked it; the entry is required to exist.
Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
1144
LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1145
ScanLockPtr lockPtr;
1146
list.first(lockPtr);
1147
while (lockPtr.i != RNIL) {
1148
if (lockPtr.p->m_accLockOp == accLockOp) {
1154
ndbrequire(lockPtr.i != RNIL);
1155
list.release(lockPtr);
1159
// NOTE(review): corrupted extraction -- bare numeric lines are leftover
// original line numbers; gaps show missing lines (braces, else-branch).
// Code kept byte-identical.
//
// Returns a scan record to its pool: an LCP scan op is merely detached
// from the fragment (it is a singleton kept in c_lcp_scan_op), while an
// ordinary scan op is released back into the per-fragment scan list's pool.
Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
1161
FragrecordPtr fragPtr;
1162
fragPtr.i = scanPtr.p->m_fragPtrI;
1163
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
1165
if(scanPtr.p->m_bits & ScanOp::SCAN_LCP)
1168
fragPtr.p->m_lcp_scan_op = RNIL;
1169
scanPtr.p->m_fragPtrI = RNIL;
1174
LocalDLList<ScanOp> list(c_scanOpPool, fragPtr.p->m_scanList);
1175
list.release(scanPtr);
1180
Dbtup::execLCP_FRAG_ORD(Signal* signal)
1182
LcpFragOrd* req= (LcpFragOrd*)signal->getDataPtr();
1184
TablerecPtr tablePtr;
1185
tablePtr.i = req->tableId;
1186
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
1188
if (tablePtr.p->m_no_of_disk_attributes)
1191
FragrecordPtr fragPtr;
1192
Uint32 fragId = req->fragmentId;
1194
getFragmentrec(fragPtr, fragId, tablePtr.p);
1195
ndbrequire(fragPtr.i != RNIL);
1196
Fragrecord& frag = *fragPtr.p;
1198
ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
1199
frag.m_lcp_scan_op = c_lcp_scan_op;
1201
c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
1202
ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
1203
scanPtr.p->m_fragPtrI = fragPtr.i;
1205
scanFirst(signal, scanPtr);
1206
scanPtr.p->m_state = ScanOp::First;