1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; either version 2 of the License, or
6
(at your option) any later version.
8
This program is distributed in the hope that it will be useful,
9
but WITHOUT ANY WARRANTY; without even the implied warranty of
10
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
GNU General Public License for more details.
13
You should have received a copy of the GNU General Public License
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
#include <NdbDictionaryImpl.hpp>
19
#include <NdbTransaction.hpp>
20
#include <NdbOperation.hpp>
21
#include <NdbIndexOperation.hpp>
22
#include <NdbRecAttr.hpp>
23
#include <NdbBlob.hpp>
24
#include "NdbBlobImpl.hpp"
25
#include <NdbScanOperation.hpp>
26
#include <signaldata/TcKeyReq.hpp>
29
* Reading index table directly (as a table) is faster but there are
30
* bugs or limitations. Keep the code and make it possible to choose.
32
static const bool g_ndb_blob_ok_to_read_index_table = false;
37
// State setter. The lines visible here only debug-trace the handle address
// and the requested state.
// NOTE(review): the statement that stores newState is not present in this
// listing - confirm against the full file.
NdbBlob::setState(State newState)
39
DBUG_ENTER("NdbBlob::setState");
40
DBUG_PRINT("info", ("this=%p newState=%u", this, newState));
48
// Name-based overload: looks the table up through anNdb's dictionary
// implementation, fetches the named column from that table, and forwards both
// to the (btname, table, column) overload which formats the name.
// NOTE(review): any NULL checks on t and c sit on lines missing from this
// listing - confirm they exist before t->getColumn() is reached.
NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
50
NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
53
NdbColumnImpl* c = t->getColumn(columnName);
56
getBlobTableName(btname, t, c);
61
// Formats the blob parts table name "NDB$BLOB_<tableId>_<attrId>" into btname.
// Preconditions (asserted): t and c are non-null and c reports a blob type.
// btname is zero-filled for NdbBlobImpl::BlobTableNameSize bytes first, so the
// caller must pass a buffer of at least that size.
// NOTE(review): sprintf is unbounded; this is safe only if BlobTableNameSize
// covers the longest possible formatted name - confirm.
NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
63
assert(t != 0 && c != 0 && c->getBlobType());
64
memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
65
sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_tableId, (int)c->m_attrId);
69
// Describes the parts table for blob column c of table t in bt.
// Visible steps: derive the table name, copy logging and fragment type from
// the main table, then set up four column descriptors: PK, DIST, PART, DATA.
// NOTE(review): the calls that apply the name and attach each column to bt
// are on lines missing from this listing - confirm.
NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c)
71
char btname[NdbBlobImpl::BlobTableNameSize];
72
getBlobTableName(btname, t, c);
74
bt.setLogging(t->getLogging());
75
bt.setFragmentType(t->getFragmentType());
76
// PK: Unsigned array as long as the main table key (in 32-bit words);
// part of the primary key and of the distribution key.
{ NdbDictionary::Column bc("PK");
77
bc.setType(NdbDictionary::Column::Unsigned);
78
assert(t->m_keyLenInWords != 0);
79
bc.setLength(t->m_keyLenInWords);
80
bc.setPrimaryKey(true);
81
bc.setDistributionKey(true);
84
// DIST: Unsigned; part of the primary key and of the distribution key.
{ NdbDictionary::Column bc("DIST");
85
bc.setType(NdbDictionary::Column::Unsigned);
86
bc.setPrimaryKey(true);
87
bc.setDistributionKey(true);
90
// PART: Unsigned; part of the primary key but not of the distribution key.
{ NdbDictionary::Column bc("PART");
91
bc.setType(NdbDictionary::Column::Unsigned);
92
bc.setPrimaryKey(true);
93
bc.setDistributionKey(false);
96
// DATA: Binary for Blob columns, Char for Text columns (the switch header is
// not visible here); its length is the blob column's part size.
{ NdbDictionary::Column bc("DATA");
98
case NdbDictionary::Column::Blob:
99
bc.setType(NdbDictionary::Column::Binary);
101
case NdbDictionary::Column::Text:
102
bc.setType(NdbDictionary::Column::Char);
108
bc.setLength(c->getPartSize());
115
// Constructor; the Ndb* argument is unnamed and therefore unused here.
// The assignments below reset handle members to NULL / 0 / false.
// NOTE(review): original lines 116-127 are missing from this listing, so it is
// unclear whether these resets belong to the constructor body or to a separate
// initialisation routine whose signature was dropped - confirm.
NdbBlob::NdbBlob(Ndb*)
128
theAccessTable = NULL;
140
thePendingBlobOps = 0;
141
theActiveHook = NULL;
142
theActiveHookArg = NULL;
144
theInlineData = NULL;
145
theHeadInlineRecAttr = NULL;
146
theHeadInlineReadOp = NULL;
147
theHeadInlineUpdateFlag = false;
162
NdbBlob::Buf::Buf() :
175
// Sizes the buffer for n bytes. Only the final fill is visible: the whole
// buffer (maxsize bytes) is painted with 'X'.
// NOTE(review): the allocation / reuse logic is on lines missing from this
// listing; the 'X' fill is presumably a debugging aid to make unwritten bytes
// recognisable - confirm.
NdbBlob::Buf::alloc(unsigned n)
187
memset(data, 'X', maxsize);
192
// Copies the contents of src into this buffer. Both buffers must already have
// the same size (asserted); no resizing is done in the visible code.
NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
194
assert(size == src.size);
195
memcpy(data, src.data, size);
198
// classify operations (inline)
203
return theTable == theAccessTable;
209
return theTable != theAccessTable;
216
theNdbOp->theOperationType == NdbOperation::InsertRequest ||
217
theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
218
theNdbOp->theOperationType == NdbOperation::WriteRequest ||
219
theNdbOp->theOperationType == NdbOperation::ReadRequest ||
220
theNdbOp->theOperationType == NdbOperation::ReadExclusive ||
221
theNdbOp->theOperationType == NdbOperation::DeleteRequest;
228
theNdbOp->theOperationType == NdbOperation::ReadRequest ||
229
theNdbOp->theOperationType == NdbOperation::ReadExclusive;
233
// Classifier: compares the owning operation's type with InsertRequest.
NdbBlob::isInsertOp()
236
theNdbOp->theOperationType == NdbOperation::InsertRequest;
240
// Classifier: compares the owning operation's type with UpdateRequest.
NdbBlob::isUpdateOp()
243
theNdbOp->theOperationType == NdbOperation::UpdateRequest;
250
theNdbOp->theOperationType == NdbOperation::WriteRequest;
254
// Classifier: compares the owning operation's type with DeleteRequest.
NdbBlob::isDeleteOp()
257
theNdbOp->theOperationType == NdbOperation::DeleteRequest;
264
theNdbOp->theOperationType == NdbOperation::OpenScanRequest ||
265
theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest;
269
// Classifier for operations that must not modify the blob.
// The visible operands test for the three modifying request types
// (insert, update, write).
// NOTE(review): the enclosing expression is on a missing line; given the
// function name it is presumably a negation of this disjunction - confirm.
NdbBlob::isReadOnlyOp()
272
theNdbOp->theOperationType == NdbOperation::InsertRequest ||
273
theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
274
theNdbOp->theOperationType == NdbOperation::WriteRequest
279
// Classifier: extracts the take-over-scan flag from the owning operation's
// scan info word using the TcKeyReq signal helper.
NdbBlob::isTakeOverOp()
282
TcKeyReq::getTakeOverScanFlag(theNdbOp->theScanInfo);
285
// computations (inline)
288
// Maps an absolute byte position in the blob to a zero-based part index.
// Preconditions (asserted): the column has parts (thePartSize != 0) and pos
// lies at or beyond the end of the inline area.
NdbBlob::getPartNumber(Uint64 pos)
290
assert(thePartSize != 0 && pos >= theInlineSize);
291
return (pos - theInlineSize) / thePartSize;
295
// Number of parts occupied by the current blob length.
// When the value extends past the inline area the count is one more than the
// index of the part holding the last byte (theLength - 1).
// NOTE(review): the value returned when theLength <= theInlineSize is on a
// missing line (presumably 0) - confirm.
NdbBlob::getPartCount()
297
if (theLength <= theInlineSize)
299
return 1 + getPartNumber(theLength - 1);
303
// Computes the DIST key value stored with a part:
// (part / theStripeSize) % theStripeSize.
// Precondition (asserted): theStripeSize is non-zero.
NdbBlob::getDistKey(Uint32 part)
305
assert(theStripeSize != 0);
306
return (part / theStripeSize) % theStripeSize;
309
// getters and setters
312
// Adds reads to anOp that deliver table column values directly into
// theKeyBuf, which is treated as an array of 32-bit words.
// NOTE(review): the test that selects which columns are read (presumably only
// primary-key columns), the declaration of pos and its advance are on lines
// missing from this listing - confirm.
NdbBlob::getTableKeyValue(NdbOperation* anOp)
314
DBUG_ENTER("NdbBlob::getTableKeyValue");
315
Uint32* data = (Uint32*)theKeyBuf.data;
317
for (unsigned i = 0; i < theTable->m_columns.size(); i++) {
318
NdbColumnImpl* c = theTable->m_columns[i];
321
// byte length of this column's value
unsigned len = c->m_attrSize * c->m_arraySize;
322
// a NULL result means the read could not be added to the operation
if (anOp->getValue_impl(c, (char*)&data[pos]) == NULL) {
326
// odd bytes receive no data and must be zeroed
327
while (len % 4 != 0) {
328
char* p = (char*)&data[pos] + len++;
334
// the columns read must fill the key buffer exactly
assert(pos == theKeyBuf.size / 4);
339
// Supplies the main-table key held in theKeyBuf to anOp as equality
// conditions, one column at a time. The buffer is read as 32-bit words and
// each column value starts on a word boundary.
// NOTE(review): the test that restricts the loop to key columns and the
// declaration of pos are on missing lines - confirm.
NdbBlob::setTableKeyValue(NdbOperation* anOp)
341
DBUG_ENTER("NdbBlob::setTableKeyValue");
342
DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords);
343
const Uint32* data = (const Uint32*)theKeyBuf.data;
344
const unsigned columns = theTable->m_columns.size();
346
for (unsigned i = 0; i < columns; i++) {
347
NdbColumnImpl* c = theTable->m_columns[i];
350
// byte length of this column's value
unsigned len = c->m_attrSize * c->m_arraySize;
351
if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) {
355
// advance by the value length rounded up to whole words
pos += (len + 3) / 4;
358
// the key columns must consume the key buffer exactly
assert(pos == theKeyBuf.size / 4);
363
// Same as setTableKeyValue but for the access table: supplies the key held in
// theAccessKeyBuf to anOp as equality conditions on theAccessTable's columns.
// NOTE(review): the test that restricts the loop to key columns and the
// declaration of pos are on missing lines - confirm.
NdbBlob::setAccessKeyValue(NdbOperation* anOp)
365
DBUG_ENTER("NdbBlob::setAccessKeyValue");
366
DBUG_DUMP("info", theAccessKeyBuf.data, 4 * theAccessTable->m_keyLenInWords);
367
const Uint32* data = (const Uint32*)theAccessKeyBuf.data;
368
const unsigned columns = theAccessTable->m_columns.size();
370
for (unsigned i = 0; i < columns; i++) {
371
NdbColumnImpl* c = theAccessTable->m_columns[i];
374
// byte length of this column's value
unsigned len = c->m_attrSize * c->m_arraySize;
375
if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) {
379
// advance by the value length rounded up to whole words
pos += (len + 3) / 4;
382
// the key columns must consume the access key buffer exactly
assert(pos == theAccessKeyBuf.size / 4);
387
// Sets the three-column key of one blob parts row on anOp:
//   PK   = the main-table key bytes in theKeyBuf,
//   DIST = getDistKey(part),
//   PART = part.
// Columns are addressed by name rather than attribute id (see the TODO).
// NOTE(review): the locals data and size are not referenced by any line
// visible in this function; they may be dead - confirm against the full file.
NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
389
DBUG_ENTER("NdbBlob::setPartKeyValue");
390
DBUG_PRINT("info", ("dist=%u part=%u key=", getDistKey(part), part));
391
DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords);
392
Uint32* data = (Uint32*)theKeyBuf.data;
393
unsigned size = theTable->m_keyLenInWords;
394
// TODO use attr ids after compatibility with 4.1.7 not needed
395
if (anOp->equal("PK", theKeyBuf.data) == -1 ||
396
anOp->equal("DIST", getDistKey(part)) == -1 ||
397
anOp->equal("PART", part) == -1) {
405
// Adds a read of the blob column itself (head + inline bytes) to anOp, with
// theHeadInlineBuf as the destination, and remembers the resulting record
// attribute in theHeadInlineRecAttr. A NULL result means the read could not
// be added; the handling of that case is on lines not shown here.
NdbBlob::getHeadInlineValue(NdbOperation* anOp)
407
DBUG_ENTER("NdbBlob::getHeadInlineValue");
408
theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
409
if (theHeadInlineRecAttr == NULL) {
417
// Refreshes theNullFlag and theLength from the most recent head+inline read.
// Preconditions (asserted): getHeadInlineValue has registered a record
// attribute, and that attribute's null state is defined (not -1).
NdbBlob::getHeadFromRecAttr()
419
DBUG_ENTER("NdbBlob::getHeadFromRecAttr");
420
assert(theHeadInlineRecAttr != NULL);
421
theNullFlag = theHeadInlineRecAttr->isNULL();
422
assert(theNullFlag != -1);
423
// a NULL blob has length 0; otherwise the length comes from the head
theLength = ! theNullFlag ? theHead->length : 0;
428
// Writes the blob column (head + inline bytes) through anOp.
// Stores the current length in the head, zero-fills the unused tail of the
// inline area, and passes a null value pointer when the blob is NULL.
// Clears theHeadInlineUpdateFlag once the value has been handed over.
NdbBlob::setHeadInlineValue(NdbOperation* anOp)
430
DBUG_ENTER("NdbBlob::setHeadInlineValue");
431
theHead->length = theLength;
432
// keep bytes past the end of a short value deterministic
if (theLength < theInlineSize)
433
memset(theInlineData + theLength, 0, theInlineSize - theLength);
434
assert(theNullFlag != -1);
435
const char* aValue = theNullFlag ? 0 : theHeadInlineBuf.data;
436
if (anOp->setValue(theColumn, aValue, theHeadInlineBuf.size) == -1) {
440
theHeadInlineUpdateFlag = false;
447
// Registers a caller-owned buffer that is to receive up to `bytes` bytes of
// the blob value.
// Checks visible here:
//   - the operation must be a read or a scan, else ErrCompat;
//   - must not have been called before and the handle must be Prepared,
//     else ErrState;
//   - a NULL buffer is only accepted with bytes == 0, else ErrUsage.
// On acceptance the pointer and byte count are stored for later use.
NdbBlob::getValue(void* data, Uint32 bytes)
449
DBUG_ENTER("NdbBlob::getValue");
450
DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
451
if (! isReadOp() && ! isScanOp()) {
452
setErrorCode(NdbBlobImpl::ErrCompat);
455
if (theGetFlag || theState != Prepared) {
456
setErrorCode(NdbBlobImpl::ErrState);
459
if (data == NULL && bytes != 0) {
460
setErrorCode(NdbBlobImpl::ErrUsage);
464
theGetBuf = static_cast<char*>(data);
465
theGetSetBytes = bytes;
470
// Registers caller-owned data that is to become the blob value.
// Checks visible here:
//   - read-only operations are rejected with ErrCompat;
//   - must not have been called before and the handle must be Prepared,
//     else ErrState;
//   - NULL data is only accepted with bytes == 0, else ErrUsage.
// After storing the pointer and size, the inline portion is written and
// head+inline is set on the owning operation.
// NOTE(review): the condition guarding the "write inline part now" section
// and the clamp of n to theInlineSize are on missing lines - confirm.
NdbBlob::setValue(const void* data, Uint32 bytes)
472
DBUG_ENTER("NdbBlob::setValue");
473
DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
474
if (isReadOnlyOp()) {
475
setErrorCode(NdbBlobImpl::ErrCompat);
478
if (theSetFlag || theState != Prepared) {
479
setErrorCode(NdbBlobImpl::ErrState);
482
if (data == NULL && bytes != 0) {
483
setErrorCode(NdbBlobImpl::ErrUsage);
487
theSetBuf = static_cast<const char*>(data);
488
theGetSetBytes = bytes;
490
// write inline part now
491
if (theSetBuf != NULL) {
492
Uint32 n = theGetSetBytes;
493
if (n > theInlineSize)
496
if (writeDataPrivate(theSetBuf, n) == -1)
502
if (setHeadInlineValue(theNdbOp) == -1)
511
// Installs a callback and its opaque argument on the handle.
// Only allowed while the handle is Prepared; otherwise ErrState is set.
NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
513
DBUG_ENTER("NdbBlob::setActiveHook");
514
DBUG_PRINT("info", ("hook=%p arg=%p", (void*)activeHook, arg));
515
if (theState != Prepared) {
516
setErrorCode(NdbBlobImpl::ErrState);
519
theActiveHook = activeHook;
520
theActiveHookArg = arg;
527
// Reports through isNull whether the blob is NULL.
// Before execution, if setValue has already been called, the answer is taken
// from the pending set buffer. Otherwise the null state must be known
// (theNullFlag != -1), else ErrState is set.
NdbBlob::getNull(bool& isNull)
529
DBUG_ENTER("NdbBlob::getNull");
530
if (theState == Prepared && theSetFlag) {
531
isNull = (theSetBuf == NULL);
534
if (theNullFlag == -1) {
535
setErrorCode(NdbBlobImpl::ErrState);
538
isNull = theNullFlag;
545
// Body of NdbBlob::setNull (the signature line is missing from this listing).
// Makes the blob NULL:
//   - read-only operations are rejected with ErrCompat;
//   - if the null state is not yet known, a Prepared handle treats this as
//     setValue(0, 0); any other state yields ErrState;
//   - otherwise all existing parts are deleted and the head is flagged for
//     update.
// NOTE(review): the assignments that reset theNullFlag / theLength are on
// lines not shown here - confirm.
DBUG_ENTER("NdbBlob::setNull");
546
if (isReadOnlyOp()) {
547
setErrorCode(NdbBlobImpl::ErrCompat);
550
if (theNullFlag == -1) {
551
if (theState == Prepared) {
552
DBUG_RETURN(setValue(0, 0));
554
setErrorCode(NdbBlobImpl::ErrState);
559
if (deleteParts(0, getPartCount()) == -1)
563
theHeadInlineUpdateFlag = true;
568
// Reports the blob length through len.
// Before execution, if setValue has already been called, the pending byte
// count is reported. Otherwise the null state must be known
// (theNullFlag != -1), else ErrState is set.
// NOTE(review): the assignment of len on the normal path is on a missing
// line - confirm.
NdbBlob::getLength(Uint64& len)
570
DBUG_ENTER("NdbBlob::getLength");
571
if (theState == Prepared && theSetFlag) {
572
len = theGetSetBytes;
575
if (theNullFlag == -1) {
576
setErrorCode(NdbBlobImpl::ErrState);
584
// Shortens the blob to `length` bytes when it is currently longer.
// Read-only operations are rejected (ErrCompat) and the null state must be
// known (else ErrState).
// If the new length still reaches into the parts, only the parts after the
// one holding the last retained byte are deleted; the deleteParts(0, ...) call
// removes every part.
// NOTE(review): the `else` joining the two deleteParts branches and the
// assignment of the new length are on missing lines - confirm.
NdbBlob::truncate(Uint64 length)
586
DBUG_ENTER("NdbBlob::truncate");
587
DBUG_PRINT("info", ("length=%llu", length));
588
if (isReadOnlyOp()) {
589
setErrorCode(NdbBlobImpl::ErrCompat);
592
if (theNullFlag == -1) {
593
setErrorCode(NdbBlobImpl::ErrState);
596
if (theLength > length) {
597
if (length > theInlineSize) {
598
// part1: part holding the last byte kept; part2: part holding the current last byte
Uint32 part1 = getPartNumber(length - 1);
599
Uint32 part2 = getPartNumber(theLength - 1);
600
assert(part2 >= part1);
601
if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1)
604
if (deleteParts(0, getPartCount()) == -1)
608
theHeadInlineUpdateFlag = true;
616
// Reports the current read/write position through pos.
// Requires the null state to be known (theNullFlag != -1), else ErrState.
// NOTE(review): the assignment of pos is on a line not shown here - confirm.
NdbBlob::getPos(Uint64& pos)
618
DBUG_ENTER("NdbBlob::getPos");
619
if (theNullFlag == -1) {
620
setErrorCode(NdbBlobImpl::ErrState);
628
// Moves the read/write position.
// Requires the null state to be known (else ErrState). Positions beyond the
// current length are rejected with ErrSeek; pos == theLength is allowed.
// NOTE(review): the store into thePos is on a line not shown here - confirm.
NdbBlob::setPos(Uint64 pos)
630
DBUG_ENTER("NdbBlob::setPos");
631
DBUG_PRINT("info", ("pos=%llu", pos));
632
if (theNullFlag == -1) {
633
setErrorCode(NdbBlobImpl::ErrState);
636
if (pos > theLength) {
637
setErrorCode(NdbBlobImpl::ErrSeek);
647
// Public read entry point. Requires the handle to be Active (else ErrState),
// then delegates to readDataPrivate; `bytes` is passed by reference so the
// callee can adjust it.
// NOTE(review): what happens with ret after the call is on lines not shown.
NdbBlob::readData(void* data, Uint32& bytes)
649
DBUG_ENTER("NdbBlob::readData");
650
if (theState != Active) {
651
setErrorCode(NdbBlobImpl::ErrState);
654
char* buf = static_cast<char*>(data);
655
int ret = readDataPrivate(buf, bytes);
660
// Reads blob data into buf starting at the current position.
// `bytes` is clamped to what remains between the position and theLength.
// The work is done in phases: inline area, partial first part, whole parts,
// partial last part.
// NOTE(review): the declarations of pos and len, the clamps of n, the
// pos/len/buf advances after each phase and several `if` guards are on lines
// missing from this listing - confirm against the full file.
NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
662
DBUG_ENTER("NdbBlob::readDataPrivate");
663
DBUG_PRINT("info", ("bytes=%u", bytes));
664
assert(thePos <= theLength);
666
// never read past the end of the value
if (bytes > theLength - pos)
667
bytes = theLength - pos;
671
// phase 1: bytes that live in the inline area
if (pos < theInlineSize) {
672
Uint32 n = theInlineSize - pos;
675
memcpy(buf, theInlineData + pos, n);
681
// data remains but this column has no parts table
if (len > 0 && thePartSize == 0) {
682
setErrorCode(NdbBlobImpl::ErrSeek);
686
assert(pos >= theInlineSize);
687
// byte offset of pos within its part
Uint32 off = (pos - theInlineSize) % thePartSize;
688
// partial first block
690
DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len));
691
Uint32 part = (pos - theInlineSize) / thePartSize;
692
if (readParts(thePartBuf.data, part, 1) == -1)
695
// the part must have arrived in thePartBuf before it is copied out
if (executePendingBlobReads() == -1)
697
Uint32 n = thePartSize - off;
700
memcpy(buf, thePartBuf.data + off, n);
707
assert((pos - theInlineSize) % thePartSize == 0);
708
// complete blocks in the middle
709
if (len >= thePartSize) {
710
Uint32 part = (pos - theInlineSize) / thePartSize;
711
Uint32 count = len / thePartSize;
712
// whole parts are read straight into the caller's buffer
if (readParts(buf, part, count) == -1)
714
Uint32 n = thePartSize * count;
721
// partial last block
722
DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len));
723
assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
724
Uint32 part = (pos - theInlineSize) / thePartSize;
725
if (readParts(thePartBuf.data, part, 1) == -1)
728
if (executePendingBlobReads() == -1)
730
memcpy(buf, thePartBuf.data, len);
738
assert(thePos <= theLength);
743
// Public write entry point. Read-only operations are rejected (ErrCompat) and
// the handle must be Active (else ErrState); the actual work is delegated to
// writeDataPrivate.
// NOTE(review): what happens with ret after the call is on lines not shown.
NdbBlob::writeData(const void* data, Uint32 bytes)
745
DBUG_ENTER("NdbBlob::writeData");
746
if (isReadOnlyOp()) {
747
setErrorCode(NdbBlobImpl::ErrCompat);
750
if (theState != Active) {
751
setErrorCode(NdbBlobImpl::ErrState);
754
const char* buf = static_cast<const char*>(data);
755
int ret = writeDataPrivate(buf, bytes);
760
// Writes `bytes` bytes from buf at the current position.
// Phases mirror readDataPrivate: inline area, partial first part, whole parts,
// partial last part. Partial parts that already hold data are read back,
// patched in thePartBuf and written again; new trailing space is padded with
// theFillChar.
// NOTE(review): the declarations of pos and len, the clamps of n, the
// pos/len/buf advances, several `if`/`else` lines and the final length and
// position updates are on lines missing from this listing - confirm.
NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes)
762
DBUG_ENTER("NdbBlob::writeDataPrivate");
763
DBUG_PRINT("info", ("bytes=%u", bytes));
764
assert(thePos <= theLength);
767
// any write makes blob not NULL
770
theHeadInlineUpdateFlag = true;
774
// phase 1: bytes that land in the inline area
if (pos < theInlineSize) {
775
Uint32 n = theInlineSize - pos;
778
memcpy(theInlineData + pos, buf, n);
779
theHeadInlineUpdateFlag = true;
785
// data remains but this column has no parts table
if (len > 0 && thePartSize == 0) {
786
setErrorCode(NdbBlobImpl::ErrSeek);
790
assert(pos >= theInlineSize);
791
// byte offset of pos within its part
Uint32 off = (pos - theInlineSize) % thePartSize;
792
// partial first block
794
DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len));
795
// flush writes to guarantee correct read
796
if (executePendingBlobWrites() == -1)
798
Uint32 part = (pos - theInlineSize) / thePartSize;
799
if (readParts(thePartBuf.data, part, 1) == -1)
802
if (executePendingBlobReads() == -1)
804
Uint32 n = thePartSize - off;
806
// NOTE(review): presumably guarded by a check that the write ends inside
// this part (n > len); the guard line is not visible here
memset(thePartBuf.data + off + len, theFillChar, n - len);
809
memcpy(thePartBuf.data + off, buf, n);
810
if (updateParts(thePartBuf.data, part, 1) == -1)
818
assert((pos - theInlineSize) % thePartSize == 0);
819
// complete blocks in the middle
820
if (len >= thePartSize) {
821
Uint32 part = (pos - theInlineSize) / thePartSize;
822
Uint32 count = len / thePartSize;
823
for (unsigned i = 0; i < count; i++) {
824
// parts below the current part count are tested for update; insertParts
// follows (the joining else is not visible)
if (part + i < getPartCount()) {
825
if (updateParts(buf, part + i, 1) == -1)
828
if (insertParts(buf, part + i, 1) == -1)
831
Uint32 n = thePartSize;
839
// partial last block
840
DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len));
841
assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
842
Uint32 part = (pos - theInlineSize) / thePartSize;
843
// existing data continues past the end of this write: read, patch, update
if (theLength > pos + len) {
844
// flush writes to guarantee correct read
845
if (executePendingBlobWrites() == -1)
847
if (readParts(thePartBuf.data, part, 1) == -1)
850
if (executePendingBlobReads() == -1)
852
memcpy(thePartBuf.data, buf, len);
853
if (updateParts(thePartBuf.data, part, 1) == -1)
856
// the write reaches the end of the value: pad the rest of the part
memcpy(thePartBuf.data, buf, len);
857
memset(thePartBuf.data + len, theFillChar, thePartSize - len);
858
if (part < getPartCount()) {
859
if (updateParts(thePartBuf.data, part, 1) == -1)
862
if (insertParts(thePartBuf.data, part, 1) == -1)
872
// the value grew: the head must be rewritten
if (theLength < pos) {
874
theHeadInlineUpdateFlag = true;
877
assert(thePos <= theLength);
882
// Queues committed-read operations on the blob parts table for `count` parts
// starting at `part`, each delivering attribute id 3 into buf.
// Each operation aborts on error, and a pending read is flagged on both the
// handle and the transaction.
// NOTE(review): attribute id 3 is presumably the DATA column, the fourth one
// set up in getBlobTable. The loop over n, the NULL check on tOp and the
// advance of buf are on missing lines - confirm.
NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
884
DBUG_ENTER("NdbBlob::readParts");
885
DBUG_PRINT("info", ("part=%u count=%u", part, count));
888
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
890
tOp->committedRead() == -1 ||
891
setPartKeyValue(tOp, part + n) == -1 ||
892
tOp->getValue((Uint32)3, buf) == NULL) {
896
tOp->m_abortOption = NdbTransaction::AbortOnError;
899
thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
900
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
906
// Queues insert operations on the blob parts table for `count` parts starting
// at `part`, each setting attribute id 3 from buf.
// Each operation aborts on error, and a pending insert is flagged on both the
// handle and the transaction.
// NOTE(review): attribute id 3 is presumably the DATA column. The loop over n,
// the NULL check on tOp and the advance of buf are on missing lines - confirm.
NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
908
DBUG_ENTER("NdbBlob::insertParts");
909
DBUG_PRINT("info", ("part=%u count=%u", part, count));
912
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
914
tOp->insertTuple() == -1 ||
915
setPartKeyValue(tOp, part + n) == -1 ||
916
tOp->setValue((Uint32)3, buf) == -1) {
920
tOp->m_abortOption = NdbTransaction::AbortOnError;
923
thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
924
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
930
// Queues update operations on the blob parts table for `count` parts starting
// at `part`, each setting attribute id 3 from buf.
// Each operation aborts on error, and a pending update is flagged on both the
// handle and the transaction.
// NOTE(review): attribute id 3 is presumably the DATA column. The loop over n,
// the NULL check on tOp and the advance of buf are on missing lines - confirm.
NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
932
DBUG_ENTER("NdbBlob::updateParts");
933
DBUG_PRINT("info", ("part=%u count=%u", part, count));
936
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
938
tOp->updateTuple() == -1 ||
939
setPartKeyValue(tOp, part + n) == -1 ||
940
tOp->setValue((Uint32)3, buf) == -1) {
944
tOp->m_abortOption = NdbTransaction::AbortOnError;
947
thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
948
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
954
// Queues delete operations on the blob parts table for `count` parts starting
// at `part`. Each operation aborts on error, and a pending delete is flagged
// on both the handle and the transaction.
// NOTE(review): the loop over n and the NULL check on tOp are on missing
// lines - confirm.
NdbBlob::deleteParts(Uint32 part, Uint32 count)
956
DBUG_ENTER("NdbBlob::deleteParts");
957
DBUG_PRINT("info", ("part=%u count=%u", part, count));
960
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
962
tOp->deleteTuple() == -1 ||
963
setPartKeyValue(tOp, part + n) == -1) {
967
tOp->m_abortOption = NdbTransaction::AbortOnError;
969
thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
970
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
976
* Number of blob parts not known. Used to check for race condition
977
* when writeTuple is used for insert. Deletes all parts found.
980
// Deletes parts starting at `part` when the number of existing parts is not
// known. Deletes are queued in batches with errors ignored and executed
// immediately (NoCommit); afterwards each operation's error code is inspected.
// Error code 626 is the case the existing comment calls "first non-existent
// part"; any other non-zero code takes a different path.
// Columns without parts (thePartSize == 0) have nothing to delete.
// NOTE(review): the outer loop, the declaration and update of count, the
// growth of bat from minbat towards maxbat, and the handling of non-626
// errors are on lines missing from this listing - confirm.
NdbBlob::deletePartsUnknown(Uint32 part)
982
DBUG_ENTER("NdbBlob::deletePartsUnknown");
983
DBUG_PRINT("info", ("part=%u count=all", part));
984
if (thePartSize == 0) // tinyblob
986
// bounds for the number of deletes issued per round trip
static const unsigned maxbat = 256;
987
static const unsigned minbat = 1;
988
unsigned bat = minbat;
989
// operations of the current batch, kept so their errors can be checked
NdbOperation* tOpList[maxbat];
995
NdbOperation*& tOp = tOpList[n]; // ref
996
tOp = theNdbCon->getNdbOperation(theBlobTable);
998
tOp->deleteTuple() == -1 ||
999
setPartKeyValue(tOp, part + count + n) == -1) {
1003
// a missing part must not abort the transaction
tOp->m_abortOption= NdbTransaction::AO_IgnoreError;
1006
DBUG_PRINT("info", ("bat=%u", bat));
1007
if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
1011
NdbOperation* tOp = tOpList[n];
1012
if (tOp->theError.code != 0) {
1013
if (tOp->theError.code != 626) {
1017
// first non-existent part
1018
DBUG_PRINT("info", ("count=%u", count));
1033
// If this handle has a read pending, executes the transaction's queued
// operations in NoCommit mode (without blob processing) and then clears the
// pending-operation masks on both the handle and the transaction.
NdbBlob::executePendingBlobReads()
1035
DBUG_ENTER("NdbBlob::executePendingBlobReads");
1036
// mask with only the read bit set
Uint8 flags = (1 << NdbOperation::ReadRequest);
1037
if (thePendingBlobOps & flags) {
1038
if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
1040
thePendingBlobOps = 0;
1041
theNdbCon->thePendingBlobOps = 0;
1047
// If this handle has any non-read operation pending, executes the
// transaction's queued operations in NoCommit mode (without blob processing)
// and then clears the pending-operation masks on both the handle and the
// transaction.
NdbBlob::executePendingBlobWrites()
1049
DBUG_ENTER("NdbBlob::executePendingBlobWrites");
1050
// mask with every bit except the read bit
Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest);
1051
if (thePendingBlobOps & flags) {
1052
if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
1054
thePendingBlobOps = 0;
1055
theNdbCon->thePendingBlobOps = 0;
1063
// Calls the installed callback with this handle and the stored argument.
// Preconditions (asserted): the handle is Active and a callback is installed.
// NOTE(review): how ret is translated into this function's return value is on
// lines not shown here.
NdbBlob::invokeActiveHook()
1065
DBUG_ENTER("NdbBlob::invokeActiveHook");
1066
assert(theState == Active && theActiveHook != NULL);
1067
int ret = (*theActiveHook)(this, theActiveHookArg);
1069
// no error is set on blob level
1075
// blob handle maintenance
1078
* Prepare blob handle linked to an operation. Checks blob table.
1079
* Allocates buffers. For key operation fetches key data from signal
1080
* data. For read operation adds read of head+inline.
1083
// Binds an Idle handle to a transaction, an operation and a blob column.
// Visible steps:
//   1. record Ndb, transaction, tables and column;
//   2. derive the expected DATA column type from the blob column type;
//   3. cache inline / part / stripe sizes and sanity-check the head layout;
//   4. for columns with parts, validate the blob parts table (ErrTable);
//   5. allocate key, head+inline and part buffers;
//   6. per operation type: fetch the key, add the head+inline read, or flag
//      the head for update.
// NOTE(review): the store into theNdbOp, the `if` lines selecting the
// operation type, the assignments to supportedOp and every return statement
// are on lines missing from this listing - confirm.
NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn)
1085
DBUG_ENTER("NdbBlob::atPrepare");
1086
DBUG_PRINT("info", ("this=%p op=%p con=%p", this, anOp, aCon));
1087
assert(theState == Idle);
1089
theNdb = anOp->theNdb;
1090
theNdbCon = aCon; // for scan, this is the real transaction (m_transConnection)
1092
theTable = anOp->m_currentTable;
1093
theAccessTable = anOp->m_accessTable;
1094
theColumn = aColumn;
1095
// type the DATA column of the parts table must have for this blob column
NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
1096
switch (theColumn->getType()) {
1097
case NdbDictionary::Column::Blob:
1098
partType = NdbDictionary::Column::Binary;
1101
case NdbDictionary::Column::Text:
1102
partType = NdbDictionary::Column::Char;
1106
setErrorCode(NdbBlobImpl::ErrUsage);
1110
theInlineSize = theColumn->getInlineSize();
1111
thePartSize = theColumn->getPartSize();
1112
theStripeSize = theColumn->getStripeSize();
1114
// the column must be exactly one Head followed by the inline bytes
assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
1115
assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
1116
if (thePartSize > 0) {
1117
const NdbDictionary::Table* bt = NULL;
1118
const NdbDictionary::Column* bc = NULL;
1119
// parts table must exist and its DATA column must match type and part size
if (theStripeSize == 0 ||
1120
(bt = theColumn->getBlobTable()) == NULL ||
1121
(bc = bt->getColumn("DATA")) == NULL ||
1122
bc->getType() != partType ||
1123
bc->getLength() != (int)thePartSize) {
1124
setErrorCode(NdbBlobImpl::ErrTable);
1127
theBlobTable = &NdbTableImpl::getImpl(*bt);
1130
// key lengths are kept in 32-bit words; buffers are sized in bytes
theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
1131
theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
1132
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
1133
theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
1134
thePartBuf.alloc(thePartSize);
1135
theHead = (Head*)theHeadInlineBuf.data;
1136
theInlineData = theHeadInlineBuf.data + sizeof(Head);
1137
// handle different operation types
1138
bool supportedOp = false;
1142
// main-table key taken from the operation's request data
Uint32* data = (Uint32*)theKeyBuf.data;
1143
unsigned size = theTable->m_keyLenInWords;
1144
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
1145
setErrorCode(NdbBlobImpl::ErrUsage);
1151
// access-table key taken from the operation's request data
Uint32* data = (Uint32*)theAccessKeyBuf.data;
1152
unsigned size = theAccessTable->m_keyLenInWords;
1153
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
1154
setErrorCode(NdbBlobImpl::ErrUsage);
1159
// upgrade lock mode
1160
if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
1161
theNdbOp->theLockMode = NdbOperation::LM_Read;
1162
// add read of head+inline in this op
1163
if (getHeadInlineValue(theNdbOp) == -1)
1167
// becomes NULL unless set before execute
1172
// becomes NULL unless set before execute
1175
theHeadInlineUpdateFlag = true;
1180
// upgrade lock mode
1181
if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
1182
theNdbOp->theLockMode = NdbOperation::LM_Read;
1183
// add read of head+inline in this op
1184
if (getHeadInlineValue(theNdbOp) == -1)
1188
if (! supportedOp) {
1189
setErrorCode(NdbBlobImpl::ErrUsage);
1197
* Before execute of prepared operation. May add new operations before
1198
* this one. May ask that this operation and all before it (a "batch")
1199
* is executed immediately in no-commit mode. In this case remaining
1200
* prepared operations are saved in a separate list. They are added
1201
* back after postExecute.
1204
// Runs on a Prepared handle before its operation is executed. Depending on the
// operation type it queues extra operations (head+inline update, head+inline
// read, key read via the index) and may request, through `batch`, immediate
// execution of the operations queued so far.
// NOTE(review): the `if` lines selecting the operation type, every assignment
// to `batch`, the NULL checks on tOp and all return statements are on lines
// missing from this listing, so the section notes below describe only the
// statements that are visible - confirm.
NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
1206
DBUG_ENTER("NdbBlob::preExecute");
1207
DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
1208
if (theState == Invalid)
1210
assert(theState == Prepared);
1211
// handle different operation types
1214
// a getValue request larger than the inline area cannot be satisfied from
// the head+inline read alone
if (theGetFlag && theGetSetBytes > theInlineSize) {
1215
// need blob head before proceeding
1220
if (theSetFlag && theGetSetBytes > theInlineSize) {
1221
// add ops to write rest of a setValue
1222
assert(theSetBuf != NULL);
1223
const char* buf = theSetBuf + theInlineSize;
1224
Uint32 bytes = theGetSetBytes - theInlineSize;
1225
assert(thePos == theInlineSize);
1226
if (writeDataPrivate(buf, bytes) == -1)
1228
if (theHeadInlineUpdateFlag) {
1229
// add an operation to update head+inline
1230
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
1232
tOp->updateTuple() == -1 ||
1233
setTableKeyValue(tOp) == -1 ||
1234
setHeadInlineValue(tOp) == -1) {
1235
setErrorCode(NdbBlobImpl::ErrAbort);
1238
DBUG_PRINT("info", ("add op to update head+inline"));
1243
if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
1244
// add operation before this one to read head+inline
1245
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp);
1247
* If main op is from take over scan lock, the added read is done
1248
* as committed read:
1250
* In normal transactional case, the row is locked by us and
1251
* committed read returns same as normal read.
1253
* In current TRUNCATE TABLE, the deleting trans is committed in
1254
* batches and then restarted with new trans id. A normal read
1255
* would hang on the scan delete lock and then fail.
1257
NdbOperation::LockMode lockMode =
1259
NdbOperation::LM_Read : NdbOperation::LM_CommittedRead;
1261
tOp->readTuple(lockMode) == -1 ||
1262
setTableKeyValue(tOp) == -1 ||
1263
getHeadInlineValue(tOp) == -1) {
1268
// the row may legitimately be absent; postExecute inspects this op's error
tOp->m_abortOption = NdbTransaction::AO_IgnoreError;
1270
theHeadInlineReadOp = tOp;
1271
// execute immediately
1273
DBUG_PRINT("info", ("add op before to read head+inline"));
1277
// add op before this one to read table key
1278
NdbBlob* tFirstBlob = theNdbOp->theBlobList;
1279
if (this == tFirstBlob) {
1280
// first blob does it for all
1281
// compile-time switch declared at file scope; false selects the index
// operation path below
if (g_ndb_blob_ok_to_read_index_table) {
1282
Uint32 pkAttrId = theAccessTable->getNoOfColumns() - 1;
1283
NdbOperation* tOp = theNdbCon->getNdbOperation(theAccessTable, theNdbOp);
1285
tOp->readTuple() == -1 ||
1286
setAccessKeyValue(tOp) == -1 ||
1287
tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) {
1292
NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
1294
tOp->readTuple() == -1 ||
1295
setAccessKeyValue(tOp) == -1 ||
1296
getTableKeyValue(tOp) == -1) {
1302
DBUG_PRINT("info", ("added op before to read table key"));
1303
if (isUpdateOp() || isDeleteOp()) {
1304
// add op before this one to read head+inline via index
1305
NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
1307
tOp->readTuple() == -1 ||
1308
setAccessKeyValue(tOp) == -1 ||
1309
getHeadInlineValue(tOp) == -1) {
1314
tOp->m_abortOption = NdbTransaction::AO_IgnoreError;
1316
theHeadInlineReadOp = tOp;
1317
// execute immediately
1319
DBUG_PRINT("info", ("added index op before to read head+inline"));
1322
// XXX until IgnoreError fixed for index op
1328
// write head+inline now
1331
if (theSetBuf != NULL) {
1332
Uint32 n = theGetSetBytes;
1333
if (n > theInlineSize)
1335
assert(thePos == 0);
1336
if (writeDataPrivate(theSetBuf, n) == -1)
1339
if (setHeadInlineValue(theNdbOp) == -1)
1341
// the read op before us may overwrite
1342
theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf);
1345
if (theActiveHook != NULL) {
1346
// need blob head for callback
1349
DBUG_PRINT("info", ("batch=%u", batch));
1354
* After execute, for any operation. If already Active, this routine
1355
* has been done previously. Operations which requested a no-commit
1356
* batch can add new operations after this one. They are added before
1357
* any remaining prepared operations.
1360
// Runs after the owning operation has been executed. Moves a Prepared handle
// to Active (NoCommit) or Closed (any other exec type) and performs the
// deferred work for the operation type: copying the key from the first blob,
// picking up the head from the read, reading data requested by getValue,
// rewriting the value for setValue, removing parts, invoking the callback and
// queuing a head+inline update.
// A handle that is already Active only has its state refreshed.
// NOTE(review): the `if` lines selecting the operation type, several `else`
// lines, the NULL checks on tOp and all return statements are on lines
// missing from this listing, so the section notes below describe only the
// statements that are visible - confirm.
NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
1362
DBUG_ENTER("NdbBlob::postExecute");
1363
DBUG_PRINT("info", ("this=%p op=%p con=%p anExecType=%u", this, theNdbOp, theNdbCon, anExecType));
1364
if (theState == Invalid)
1366
if (theState == Active) {
1367
setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
1368
DBUG_PRINT("info", ("skip active"));
1371
assert(theState == Prepared);
1372
setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
1375
NdbBlob* tFirstBlob = theNdbOp->theBlobList;
1376
if (this != tFirstBlob) {
1377
// copy key from first blob
1378
assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size);
1379
memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size);
1383
getHeadFromRecAttr();
1384
if (setPos(0) == -1)
1387
// deliver the data requested through getValue
assert(theGetSetBytes == 0 || theGetBuf != 0);
1388
// data beyond the inline area requires further reads in this transaction
assert(theGetSetBytes <= theInlineSize ||
1389
anExecType == NdbTransaction::NoCommit);
1390
Uint32 bytes = theGetSetBytes;
1391
if (readDataPrivate(theGetBuf, bytes) == -1)
1396
assert(anExecType == NdbTransaction::NoCommit);
1397
getHeadFromRecAttr();
1399
// setValue overwrites everything
1400
if (theSetBuf != NULL) {
1401
if (truncate(0) == -1)
1403
assert(thePos == 0);
1404
if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1)
1407
if (setNull() == -1)
1412
if (isWriteOp() && isTableOp()) {
1413
assert(anExecType == NdbTransaction::NoCommit);
1414
if (theHeadInlineReadOp->theError.code == 0) {
1415
// save the state prepared for the new value before the old head is loaded
int tNullFlag = theNullFlag;
1416
Uint64 tLength = theLength;
1417
Uint64 tPos = thePos;
1418
getHeadFromRecAttr();
1419
DBUG_PRINT("info", ("tuple found"));
1420
// truncate(0) acts on the old value just loaded from the read
if (truncate(0) == -1)
1422
// restore previous head+inline
1423
theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf);
1424
theNullFlag = tNullFlag;
1425
theLength = tLength;
1428
if (theHeadInlineReadOp->theError.code != 626) {
1429
setErrorCode(theHeadInlineReadOp);
1432
DBUG_PRINT("info", ("tuple not found"));
1434
* Read found no tuple but it is possible that a tuple was
1435
* created after the read by another transaction. Delete all
1436
* blob parts which may exist.
1438
if (deletePartsUnknown(0) == -1)
1441
if (theSetFlag && theGetSetBytes > theInlineSize) {
1442
assert(theSetBuf != NULL);
1443
const char* buf = theSetBuf + theInlineSize;
1444
Uint32 bytes = theGetSetBytes - theInlineSize;
1445
assert(thePos == theInlineSize);
1446
if (writeDataPrivate(buf, bytes) == -1)
1450
if (isWriteOp() && isIndexOp()) {
1451
// XXX until IgnoreError fixed for index op
1452
if (deletePartsUnknown(0) == -1)
1454
if (theSetFlag && theGetSetBytes > theInlineSize) {
1455
assert(theSetBuf != NULL);
1456
const char* buf = theSetBuf + theInlineSize;
1457
Uint32 bytes = theGetSetBytes - theInlineSize;
1458
assert(thePos == theInlineSize);
1459
if (writeDataPrivate(buf, bytes) == -1)
1464
assert(anExecType == NdbTransaction::NoCommit);
1465
getHeadFromRecAttr();
1466
if (deleteParts(0, getPartCount()) == -1)
1469
setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
1470
// activation callback
1471
if (theActiveHook != NULL) {
1472
if (invokeActiveHook() == -1)
1475
// the transaction stays open and the head changed: queue its update now
if (anExecType == NdbTransaction::NoCommit && theHeadInlineUpdateFlag) {
1476
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
1478
tOp->updateTuple() == -1 ||
1479
setTableKeyValue(tOp) == -1 ||
1480
setHeadInlineValue(tOp) == -1) {
1481
setErrorCode(NdbBlobImpl::ErrAbort);
1484
tOp->m_abortOption = NdbTransaction::AbortOnError;
1485
DBUG_PRINT("info", ("added op to update head+inline"));
1491
* Before commit of completed operation. For write add operation to
1492
* update head+inline.
1495
// Runs on an Active handle before commit. For insert, update and write
// operations whose head is flagged as changed, queues an update of head+inline
// on the main table keyed by the stored table key; failure to build that
// operation sets ErrAbort.
// NOTE(review): the early return for Invalid handles, the NULL check on tOp
// and the return statements are on lines missing from this listing - confirm.
NdbBlob::preCommit()
1497
DBUG_ENTER("NdbBlob::preCommit");
1498
DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
1499
if (theState == Invalid)
1501
assert(theState == Active);
1503
if (isInsertOp() || isUpdateOp() || isWriteOp()) {
1504
if (theHeadInlineUpdateFlag) {
1505
// add an operation to update head+inline
1506
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
1508
tOp->updateTuple() == -1 ||
1509
setTableKeyValue(tOp) == -1 ||
1510
setHeadInlineValue(tOp) == -1) {
1511
setErrorCode(NdbBlobImpl::ErrAbort);
1514
tOp->m_abortOption = NdbTransaction::AbortOnError;
1515
DBUG_PRINT("info", ("added op to update head+inline"));
1522
* After next scan result. Handle like read op above.
1525
// Runs after a scan has delivered its next row. Fetches the row's key into
// theKeyBuf (ErrUsage on failure), reloads null flag and length from the
// head+inline read, rewinds the position to 0, copies out the data requested
// through getValue, and finally invokes the callback if one is installed.
// NOTE(review): the early return for Invalid handles, the state change and
// the return statements are on lines missing from this listing - confirm.
NdbBlob::atNextResult()
1527
DBUG_ENTER("NdbBlob::atNextResult");
1528
DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
1529
if (theState == Invalid)
1533
// the owning operation is cast to a scan operation to fetch the row key
{ Uint32* data = (Uint32*)theKeyBuf.data;
1534
unsigned size = theTable->m_keyLenInWords;
1535
if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
1536
setErrorCode(NdbBlobImpl::ErrUsage);
1540
getHeadFromRecAttr();
1541
if (setPos(0) == -1)
1544
assert(theGetSetBytes == 0 || theGetBuf != 0);
1545
Uint32 bytes = theGetSetBytes;
1546
if (readDataPrivate(theGetBuf, bytes) == -1)
1550
// activation callback
1551
if (theActiveHook != NULL) {
1552
if (invokeActiveHook() == -1)
1560
const NdbDictionary::Column*
1561
NdbBlob::getColumn()
1569
// Records anErrorCode on the blob handle and, when an owning operation exists
// and has no error of its own yet, copies the code to that operation so the
// first error reported at operation level is kept.
// NOTE(review): the use of invalidFlag is on lines not shown in this listing.
NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag)
1571
DBUG_ENTER("NdbBlob::setErrorCode");
1572
DBUG_PRINT("info", ("this=%p code=%u", this, anErrorCode));
1573
theError.code = anErrorCode;
1574
// conditionally copy error to operation level
1575
if (theNdbOp != NULL && theNdbOp->theError.code == 0)
1576
theNdbOp->setErrorCode(theError.code);
1583
// Derives an error code from the most specific source that has one, checked
// in this order: anOp (if non-NULL), the transaction, the Ndb object. If none
// carries a non-zero code, ErrUnknown is used. The chosen code is then passed
// to the integer overload.
NdbBlob::setErrorCode(NdbOperation* anOp, bool invalidFlag)
1586
if (anOp != NULL && (code = anOp->theError.code) != 0)
1588
else if ((code = theNdbCon->theError.code) != 0)
1590
else if ((code = theNdb->theError.code) != 0)
1593
code = NdbBlobImpl::ErrUnknown;
1594
setErrorCode(code, invalidFlag);
1598
// Derives an error code from the transaction, then from the Ndb object, and
// falls back to ErrUnknown; the chosen code is passed to the integer overload.
// NOTE(review): the parameter aCon is not referenced by any visible line; the
// member theNdbCon is consulted instead. Confirm whether that is intended.
NdbBlob::setErrorCode(NdbTransaction* aCon, bool invalidFlag)
1601
if (theNdbCon != NULL && (code = theNdbCon->theError.code) != 0)
1603
else if ((code = theNdb->theError.code) != 0)
1606
code = NdbBlobImpl::ErrUnknown;
1607
setErrorCode(code, invalidFlag);
1610
// info about all blobs in this operation
1613
// Returns the head of the owning operation's blob list.
NdbBlob::blobsFirstBlob()
1615
return theNdbOp->theBlobList;
1619
NdbBlob::blobsNextBlob()
1628
// Returns the owning operation's type, or -1 when no operation is attached.
NdbBlob::getOperationType() const
1630
return theNdbOp != NULL ? theNdbOp->theOperationType : -1;
1634
// Debug printer: emits operation type (o), state (s), null flag (n),
// length (l), position (p), head-update flag (u) and get/set byte count (g)
// in decimal.
// NOTE(review): every visible write goes to the global ndbout rather than to
// the `out` parameter, so printing to any other stream would not work as a
// caller expects. The " n=" line also ends in a stray second semicolon.
// The remainder of this function lies beyond the end of this listing.
operator<<(NdbOut& out, const NdbBlob& blob)
1636
ndbout << dec << "o=" << blob.getOperationType();
1637
ndbout << dec << " s=" << (Uint32) blob.theState;
1638
ndbout << dec << " n=" << blob.theNullFlag;;
1639
ndbout << dec << " l=" << blob.theLength;
1640
ndbout << dec << " p=" << blob.thePos;
1641
ndbout << dec << " u=" << (Uint32)blob.theHeadInlineUpdateFlag;
1642
ndbout << dec << " g=" << (Uint32)blob.theGetSetBytes;