~vlad-lesin/percona-server/mysql-5.0.33-original

« back to all changes in this revision

Viewing changes to ndb/src/ndbapi/NdbBlob.cpp

  • Committer: Vlad Lesin
  • Date: 2012-07-31 09:21:34 UTC
  • Revision ID: vladislav.lesin@percona.com-20120731092134-zfodx022b7992wsi
VirginĀ 5.0.33

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; either version 2 of the License, or
 
6
   (at your option) any later version.
 
7
 
 
8
   This program is distributed in the hope that it will be useful,
 
9
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
   GNU General Public License for more details.
 
12
 
 
13
   You should have received a copy of the GNU General Public License
 
14
   along with this program; if not, write to the Free Software
 
15
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
16
 
 
17
#include <Ndb.hpp>
 
18
#include <NdbDictionaryImpl.hpp>
 
19
#include <NdbTransaction.hpp>
 
20
#include <NdbOperation.hpp>
 
21
#include <NdbIndexOperation.hpp>
 
22
#include <NdbRecAttr.hpp>
 
23
#include <NdbBlob.hpp>
 
24
#include "NdbBlobImpl.hpp"
 
25
#include <NdbScanOperation.hpp>
 
26
#include <signaldata/TcKeyReq.hpp>
 
27
 
 
28
/*
 
29
 * Reading index table directly (as a table) is faster but there are
 
30
 * bugs or limitations.  Keep the code and make possible to choose.
 
31
 */
 
32
static const bool g_ndb_blob_ok_to_read_index_table = false;
 
33
 
 
34
// state (inline)
 
35
 
 
36
inline void
 
37
NdbBlob::setState(State newState)
 
38
{
 
39
  DBUG_ENTER("NdbBlob::setState");
 
40
  DBUG_PRINT("info", ("this=%p newState=%u", this, newState));
 
41
  theState = newState;
 
42
  DBUG_VOID_RETURN;
 
43
}
 
44
 
 
45
// define blob table
 
46
 
 
47
int
 
48
NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
 
49
{
 
50
  NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
 
51
  if (t == NULL)
 
52
    return -1;
 
53
  NdbColumnImpl* c = t->getColumn(columnName);
 
54
  if (c == NULL)
 
55
    return -1;
 
56
  getBlobTableName(btname, t, c);
 
57
  return 0;
 
58
}
 
59
 
 
60
void
 
61
NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
 
62
{
 
63
  assert(t != 0 && c != 0 && c->getBlobType());
 
64
  memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
 
65
  sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_tableId, (int)c->m_attrId);
 
66
}
 
67
 
 
68
void
 
69
NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c)
 
70
{
 
71
  char btname[NdbBlobImpl::BlobTableNameSize];
 
72
  getBlobTableName(btname, t, c);
 
73
  bt.setName(btname);
 
74
  bt.setLogging(t->getLogging());
 
75
  bt.setFragmentType(t->getFragmentType());
 
76
  { NdbDictionary::Column bc("PK");
 
77
    bc.setType(NdbDictionary::Column::Unsigned);
 
78
    assert(t->m_keyLenInWords != 0);
 
79
    bc.setLength(t->m_keyLenInWords);
 
80
    bc.setPrimaryKey(true);
 
81
    bc.setDistributionKey(true);
 
82
    bt.addColumn(bc);
 
83
  }
 
84
  { NdbDictionary::Column bc("DIST");
 
85
    bc.setType(NdbDictionary::Column::Unsigned);
 
86
    bc.setPrimaryKey(true);
 
87
    bc.setDistributionKey(true);
 
88
    bt.addColumn(bc);
 
89
  }
 
90
  { NdbDictionary::Column bc("PART");
 
91
    bc.setType(NdbDictionary::Column::Unsigned);
 
92
    bc.setPrimaryKey(true);
 
93
    bc.setDistributionKey(false);
 
94
    bt.addColumn(bc);
 
95
  }
 
96
  { NdbDictionary::Column bc("DATA");
 
97
    switch (c->m_type) {
 
98
    case NdbDictionary::Column::Blob:
 
99
      bc.setType(NdbDictionary::Column::Binary);
 
100
      break;
 
101
    case NdbDictionary::Column::Text:
 
102
      bc.setType(NdbDictionary::Column::Char);
 
103
      break;
 
104
    default:
 
105
      assert(false);
 
106
      break;
 
107
    }
 
108
    bc.setLength(c->getPartSize());
 
109
    bt.addColumn(bc);
 
110
  }
 
111
}
 
112
 
 
113
// initialization
 
114
 
 
115
NdbBlob::NdbBlob(Ndb*)
 
116
{
 
117
  init();
 
118
}
 
119
 
 
120
void
 
121
NdbBlob::init()
 
122
{
 
123
  theState = Idle;
 
124
  theNdb = NULL;
 
125
  theNdbCon = NULL;
 
126
  theNdbOp = NULL;
 
127
  theTable = NULL;
 
128
  theAccessTable = NULL;
 
129
  theBlobTable = NULL;
 
130
  theColumn = NULL;
 
131
  theFillChar = 0;
 
132
  theInlineSize = 0;
 
133
  thePartSize = 0;
 
134
  theStripeSize = 0;
 
135
  theGetFlag = false;
 
136
  theGetBuf = NULL;
 
137
  theSetFlag = false;
 
138
  theSetBuf = NULL;
 
139
  theGetSetBytes = 0;
 
140
  thePendingBlobOps = 0;
 
141
  theActiveHook = NULL;
 
142
  theActiveHookArg = NULL;
 
143
  theHead = NULL;
 
144
  theInlineData = NULL;
 
145
  theHeadInlineRecAttr = NULL;
 
146
  theHeadInlineReadOp = NULL;
 
147
  theHeadInlineUpdateFlag = false;
 
148
  theNullFlag = -1;
 
149
  theLength = 0;
 
150
  thePos = 0;
 
151
  theNext = NULL;
 
152
}
 
153
 
 
154
void
 
155
NdbBlob::release()
 
156
{
 
157
  setState(Idle);
 
158
}
 
159
 
 
160
// buffers
 
161
 
 
162
NdbBlob::Buf::Buf() :
 
163
  data(NULL),
 
164
  size(0),
 
165
  maxsize(0)
 
166
{
 
167
}
 
168
 
 
169
NdbBlob::Buf::~Buf()
 
170
{
 
171
  delete [] data;
 
172
}
 
173
 
 
174
void
 
175
NdbBlob::Buf::alloc(unsigned n)
 
176
{
 
177
  size = n;
 
178
  if (maxsize < n) {
 
179
    delete [] data;
 
180
    // align to Uint64
 
181
    if (n % 8 != 0)
 
182
      n += 8 - n % 8;
 
183
    data = new char [n];
 
184
    maxsize = n;
 
185
  }
 
186
#ifdef VM_TRACE
 
187
  memset(data, 'X', maxsize);
 
188
#endif
 
189
}
 
190
 
 
191
void
 
192
NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
 
193
{
 
194
  assert(size == src.size);
 
195
  memcpy(data, src.data, size);
 
196
}
 
197
 
 
198
// classify operations (inline)
 
199
 
 
200
inline bool
 
201
NdbBlob::isTableOp()
 
202
{
 
203
  return theTable == theAccessTable;
 
204
}
 
205
 
 
206
inline bool
 
207
NdbBlob::isIndexOp()
 
208
{
 
209
  return theTable != theAccessTable;
 
210
}
 
211
 
 
212
inline bool
 
213
NdbBlob::isKeyOp()
 
214
{
 
215
  return
 
216
    theNdbOp->theOperationType == NdbOperation::InsertRequest ||
 
217
    theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
 
218
    theNdbOp->theOperationType == NdbOperation::WriteRequest ||
 
219
    theNdbOp->theOperationType == NdbOperation::ReadRequest ||
 
220
    theNdbOp->theOperationType == NdbOperation::ReadExclusive ||
 
221
    theNdbOp->theOperationType == NdbOperation::DeleteRequest;
 
222
}
 
223
 
 
224
inline bool
 
225
NdbBlob::isReadOp()
 
226
{
 
227
  return
 
228
    theNdbOp->theOperationType == NdbOperation::ReadRequest ||
 
229
    theNdbOp->theOperationType == NdbOperation::ReadExclusive;
 
230
}
 
231
 
 
232
inline bool
 
233
NdbBlob::isInsertOp()
 
234
{
 
235
  return
 
236
    theNdbOp->theOperationType == NdbOperation::InsertRequest;
 
237
}
 
238
 
 
239
inline bool
 
240
NdbBlob::isUpdateOp()
 
241
{
 
242
  return
 
243
    theNdbOp->theOperationType == NdbOperation::UpdateRequest;
 
244
}
 
245
 
 
246
inline bool
 
247
NdbBlob::isWriteOp()
 
248
{
 
249
  return
 
250
    theNdbOp->theOperationType == NdbOperation::WriteRequest;
 
251
}
 
252
 
 
253
inline bool
 
254
NdbBlob::isDeleteOp()
 
255
{
 
256
  return
 
257
    theNdbOp->theOperationType == NdbOperation::DeleteRequest;
 
258
}
 
259
 
 
260
inline bool
 
261
NdbBlob::isScanOp()
 
262
{
 
263
  return
 
264
    theNdbOp->theOperationType == NdbOperation::OpenScanRequest ||
 
265
    theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest;
 
266
}
 
267
 
 
268
inline bool
 
269
NdbBlob::isReadOnlyOp()
 
270
{
 
271
  return ! (
 
272
    theNdbOp->theOperationType == NdbOperation::InsertRequest ||
 
273
    theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
 
274
    theNdbOp->theOperationType == NdbOperation::WriteRequest
 
275
  );
 
276
}
 
277
 
 
278
inline bool
 
279
NdbBlob::isTakeOverOp()
 
280
{
 
281
  return
 
282
    TcKeyReq::getTakeOverScanFlag(theNdbOp->theScanInfo);
 
283
}
 
284
 
 
285
// computations (inline)
 
286
 
 
287
inline Uint32
 
288
NdbBlob::getPartNumber(Uint64 pos)
 
289
{
 
290
  assert(thePartSize != 0 && pos >= theInlineSize);
 
291
  return (pos - theInlineSize) / thePartSize;
 
292
}
 
293
 
 
294
inline Uint32
 
295
NdbBlob::getPartCount()
 
296
{
 
297
  if (theLength <= theInlineSize)
 
298
    return 0;
 
299
  return 1 + getPartNumber(theLength - 1);
 
300
}
 
301
 
 
302
inline Uint32
 
303
NdbBlob::getDistKey(Uint32 part)
 
304
{
 
305
  assert(theStripeSize != 0);
 
306
  return (part / theStripeSize) % theStripeSize;
 
307
}
 
308
 
 
309
// getters and setters
 
310
 
 
311
int
 
312
NdbBlob::getTableKeyValue(NdbOperation* anOp)
 
313
{
 
314
  DBUG_ENTER("NdbBlob::getTableKeyValue");
 
315
  Uint32* data = (Uint32*)theKeyBuf.data;
 
316
  unsigned pos = 0;
 
317
  for (unsigned i = 0; i < theTable->m_columns.size(); i++) {
 
318
    NdbColumnImpl* c = theTable->m_columns[i];
 
319
    assert(c != NULL);
 
320
    if (c->m_pk) {
 
321
      unsigned len = c->m_attrSize * c->m_arraySize;
 
322
      if (anOp->getValue_impl(c, (char*)&data[pos]) == NULL) {
 
323
        setErrorCode(anOp);
 
324
        DBUG_RETURN(-1);
 
325
      }
 
326
      // odd bytes receive no data and must be zeroed
 
327
      while (len % 4 != 0) {
 
328
        char* p = (char*)&data[pos] + len++;
 
329
        *p = 0;
 
330
      }
 
331
      pos += len / 4;
 
332
    }
 
333
  }
 
334
  assert(pos == theKeyBuf.size / 4);
 
335
  DBUG_RETURN(0);
 
336
}
 
337
 
 
338
int
 
339
NdbBlob::setTableKeyValue(NdbOperation* anOp)
 
340
{
 
341
  DBUG_ENTER("NdbBlob::setTableKeyValue");
 
342
  DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords);
 
343
  const Uint32* data = (const Uint32*)theKeyBuf.data;
 
344
  const unsigned columns = theTable->m_columns.size();
 
345
  unsigned pos = 0;
 
346
  for (unsigned i = 0; i < columns; i++) {
 
347
    NdbColumnImpl* c = theTable->m_columns[i];
 
348
    assert(c != NULL);
 
349
    if (c->m_pk) {
 
350
      unsigned len = c->m_attrSize * c->m_arraySize;
 
351
      if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) {
 
352
        setErrorCode(anOp);
 
353
        DBUG_RETURN(-1);
 
354
      }
 
355
      pos += (len + 3) / 4;
 
356
    }
 
357
  }
 
358
  assert(pos == theKeyBuf.size / 4);
 
359
  DBUG_RETURN(0);
 
360
}
 
361
 
 
362
int
 
363
NdbBlob::setAccessKeyValue(NdbOperation* anOp)
 
364
{
 
365
  DBUG_ENTER("NdbBlob::setAccessKeyValue");
 
366
  DBUG_DUMP("info", theAccessKeyBuf.data, 4 * theAccessTable->m_keyLenInWords);
 
367
  const Uint32* data = (const Uint32*)theAccessKeyBuf.data;
 
368
  const unsigned columns = theAccessTable->m_columns.size();
 
369
  unsigned pos = 0;
 
370
  for (unsigned i = 0; i < columns; i++) {
 
371
    NdbColumnImpl* c = theAccessTable->m_columns[i];
 
372
    assert(c != NULL);
 
373
    if (c->m_pk) {
 
374
      unsigned len = c->m_attrSize * c->m_arraySize;
 
375
      if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) {
 
376
        setErrorCode(anOp);
 
377
        DBUG_RETURN(-1);
 
378
      }
 
379
      pos += (len + 3) / 4;
 
380
    }
 
381
  }
 
382
  assert(pos == theAccessKeyBuf.size / 4);
 
383
  DBUG_RETURN(0);
 
384
}
 
385
 
 
386
int
 
387
NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
 
388
{
 
389
  DBUG_ENTER("NdbBlob::setPartKeyValue");
 
390
  DBUG_PRINT("info", ("dist=%u part=%u key=", getDistKey(part), part));
 
391
  DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords);
 
392
  Uint32* data = (Uint32*)theKeyBuf.data;
 
393
  unsigned size = theTable->m_keyLenInWords;
 
394
  // TODO use attr ids after compatibility with 4.1.7 not needed
 
395
  if (anOp->equal("PK", theKeyBuf.data) == -1 ||
 
396
      anOp->equal("DIST", getDistKey(part)) == -1 ||
 
397
      anOp->equal("PART", part) == -1) {
 
398
    setErrorCode(anOp);
 
399
    DBUG_RETURN(-1);
 
400
  }
 
401
  DBUG_RETURN(0);
 
402
}
 
403
 
 
404
int
 
405
NdbBlob::getHeadInlineValue(NdbOperation* anOp)
 
406
{
 
407
  DBUG_ENTER("NdbBlob::getHeadInlineValue");
 
408
  theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
 
409
  if (theHeadInlineRecAttr == NULL) {
 
410
    setErrorCode(anOp);
 
411
    DBUG_RETURN(-1);
 
412
  }
 
413
  DBUG_RETURN(0);
 
414
}
 
415
 
 
416
void
 
417
NdbBlob::getHeadFromRecAttr()
 
418
{
 
419
  DBUG_ENTER("NdbBlob::getHeadFromRecAttr");
 
420
  assert(theHeadInlineRecAttr != NULL);
 
421
  theNullFlag = theHeadInlineRecAttr->isNULL();
 
422
  assert(theNullFlag != -1);
 
423
  theLength = ! theNullFlag ? theHead->length : 0;
 
424
  DBUG_VOID_RETURN;
 
425
}
 
426
 
 
427
int
 
428
NdbBlob::setHeadInlineValue(NdbOperation* anOp)
 
429
{
 
430
  DBUG_ENTER("NdbBlob::setHeadInlineValue");
 
431
  theHead->length = theLength;
 
432
  if (theLength < theInlineSize)
 
433
    memset(theInlineData + theLength, 0, theInlineSize - theLength);
 
434
  assert(theNullFlag != -1);
 
435
  const char* aValue = theNullFlag ? 0 : theHeadInlineBuf.data;
 
436
  if (anOp->setValue(theColumn, aValue, theHeadInlineBuf.size) == -1) {
 
437
    setErrorCode(anOp);
 
438
    DBUG_RETURN(-1);
 
439
  }
 
440
  theHeadInlineUpdateFlag = false;
 
441
  DBUG_RETURN(0);
 
442
}
 
443
 
 
444
// getValue/setValue
 
445
 
 
446
int
 
447
NdbBlob::getValue(void* data, Uint32 bytes)
 
448
{
 
449
  DBUG_ENTER("NdbBlob::getValue");
 
450
  DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
 
451
  if (! isReadOp() && ! isScanOp()) {
 
452
    setErrorCode(NdbBlobImpl::ErrCompat);
 
453
    DBUG_RETURN(-1);
 
454
  }
 
455
  if (theGetFlag || theState != Prepared) {
 
456
    setErrorCode(NdbBlobImpl::ErrState);
 
457
    DBUG_RETURN(-1);
 
458
  }
 
459
  if (data == NULL && bytes != 0) {
 
460
    setErrorCode(NdbBlobImpl::ErrUsage);
 
461
    DBUG_RETURN(-1);
 
462
  }
 
463
  theGetFlag = true;
 
464
  theGetBuf = static_cast<char*>(data);
 
465
  theGetSetBytes = bytes;
 
466
  DBUG_RETURN(0);
 
467
}
 
468
 
 
469
int
 
470
NdbBlob::setValue(const void* data, Uint32 bytes)
 
471
{
 
472
  DBUG_ENTER("NdbBlob::setValue");
 
473
  DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
 
474
  if (isReadOnlyOp()) {
 
475
    setErrorCode(NdbBlobImpl::ErrCompat);
 
476
    DBUG_RETURN(-1);
 
477
  }
 
478
  if (theSetFlag || theState != Prepared) {
 
479
    setErrorCode(NdbBlobImpl::ErrState);
 
480
    DBUG_RETURN(-1);
 
481
  }
 
482
  if (data == NULL && bytes != 0) {
 
483
    setErrorCode(NdbBlobImpl::ErrUsage);
 
484
    DBUG_RETURN(-1);
 
485
  }
 
486
  theSetFlag = true;
 
487
  theSetBuf = static_cast<const char*>(data);
 
488
  theGetSetBytes = bytes;
 
489
  if (isInsertOp()) {
 
490
    // write inline part now
 
491
    if (theSetBuf != NULL) {
 
492
      Uint32 n = theGetSetBytes;
 
493
      if (n > theInlineSize)
 
494
        n = theInlineSize;
 
495
      assert(thePos == 0);
 
496
      if (writeDataPrivate(theSetBuf, n) == -1)
 
497
        DBUG_RETURN(-1);
 
498
    } else {
 
499
      theNullFlag = true;
 
500
      theLength = 0;
 
501
    }
 
502
    if (setHeadInlineValue(theNdbOp) == -1)
 
503
      DBUG_RETURN(-1);
 
504
  }
 
505
  DBUG_RETURN(0);
 
506
}
 
507
 
 
508
// activation hook
 
509
 
 
510
int
 
511
NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
 
512
{
 
513
  DBUG_ENTER("NdbBlob::setActiveHook");
 
514
  DBUG_PRINT("info", ("hook=%p arg=%p", (void*)activeHook, arg));
 
515
  if (theState != Prepared) {
 
516
    setErrorCode(NdbBlobImpl::ErrState);
 
517
    DBUG_RETURN(-1);
 
518
  }
 
519
  theActiveHook = activeHook;
 
520
  theActiveHookArg = arg;
 
521
  DBUG_RETURN(0);
 
522
}
 
523
 
 
524
// misc operations
 
525
 
 
526
int
 
527
NdbBlob::getNull(bool& isNull)
 
528
{
 
529
  DBUG_ENTER("NdbBlob::getNull");
 
530
  if (theState == Prepared && theSetFlag) {
 
531
    isNull = (theSetBuf == NULL);
 
532
    DBUG_RETURN(0);
 
533
  }
 
534
  if (theNullFlag == -1) {
 
535
    setErrorCode(NdbBlobImpl::ErrState);
 
536
    DBUG_RETURN(-1);
 
537
  }
 
538
  isNull = theNullFlag;
 
539
  DBUG_RETURN(0);
 
540
}
 
541
 
 
542
int
 
543
NdbBlob::setNull()
 
544
{
 
545
  DBUG_ENTER("NdbBlob::setNull");
 
546
  if (isReadOnlyOp()) {
 
547
    setErrorCode(NdbBlobImpl::ErrCompat);
 
548
    DBUG_RETURN(-1);
 
549
  }
 
550
  if (theNullFlag == -1) {
 
551
    if (theState == Prepared) {
 
552
      DBUG_RETURN(setValue(0, 0));
 
553
    }
 
554
    setErrorCode(NdbBlobImpl::ErrState);
 
555
    DBUG_RETURN(-1);
 
556
  }
 
557
  if (theNullFlag)
 
558
    DBUG_RETURN(0);
 
559
  if (deleteParts(0, getPartCount()) == -1)
 
560
    DBUG_RETURN(-1);
 
561
  theNullFlag = true;
 
562
  theLength = 0;
 
563
  theHeadInlineUpdateFlag = true;
 
564
  DBUG_RETURN(0);
 
565
}
 
566
 
 
567
int
 
568
NdbBlob::getLength(Uint64& len)
 
569
{
 
570
  DBUG_ENTER("NdbBlob::getLength");
 
571
  if (theState == Prepared && theSetFlag) {
 
572
    len = theGetSetBytes;
 
573
    DBUG_RETURN(0);
 
574
  }
 
575
  if (theNullFlag == -1) {
 
576
    setErrorCode(NdbBlobImpl::ErrState);
 
577
    DBUG_RETURN(-1);
 
578
  }
 
579
  len = theLength;
 
580
  DBUG_RETURN(0);
 
581
}
 
582
 
 
583
int
 
584
NdbBlob::truncate(Uint64 length)
 
585
{
 
586
  DBUG_ENTER("NdbBlob::truncate");
 
587
  DBUG_PRINT("info", ("length=%llu", length));
 
588
  if (isReadOnlyOp()) {
 
589
    setErrorCode(NdbBlobImpl::ErrCompat);
 
590
    DBUG_RETURN(-1);
 
591
  }
 
592
  if (theNullFlag == -1) {
 
593
    setErrorCode(NdbBlobImpl::ErrState);
 
594
    DBUG_RETURN(-1);
 
595
  }
 
596
  if (theLength > length) {
 
597
    if (length > theInlineSize) {
 
598
      Uint32 part1 = getPartNumber(length - 1);
 
599
      Uint32 part2 = getPartNumber(theLength - 1);
 
600
      assert(part2 >= part1);
 
601
      if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1)
 
602
        DBUG_RETURN(-1);
 
603
    } else {
 
604
      if (deleteParts(0, getPartCount()) == -1)
 
605
        DBUG_RETURN(-1);
 
606
    }
 
607
    theLength = length;
 
608
    theHeadInlineUpdateFlag = true;
 
609
    if (thePos > length)
 
610
      thePos = length;
 
611
  }
 
612
  DBUG_RETURN(0);
 
613
}
 
614
 
 
615
int
 
616
NdbBlob::getPos(Uint64& pos)
 
617
{
 
618
  DBUG_ENTER("NdbBlob::getPos");
 
619
  if (theNullFlag == -1) {
 
620
    setErrorCode(NdbBlobImpl::ErrState);
 
621
    DBUG_RETURN(-1);
 
622
  }
 
623
  pos = thePos;
 
624
  DBUG_RETURN(0);
 
625
}
 
626
 
 
627
int
 
628
NdbBlob::setPos(Uint64 pos)
 
629
{
 
630
  DBUG_ENTER("NdbBlob::setPos");
 
631
  DBUG_PRINT("info", ("pos=%llu", pos));
 
632
  if (theNullFlag == -1) {
 
633
    setErrorCode(NdbBlobImpl::ErrState);
 
634
    DBUG_RETURN(-1);
 
635
  }
 
636
  if (pos > theLength) {
 
637
    setErrorCode(NdbBlobImpl::ErrSeek);
 
638
    DBUG_RETURN(-1);
 
639
  }
 
640
  thePos = pos;
 
641
  DBUG_RETURN(0);
 
642
}
 
643
 
 
644
// read/write
 
645
 
 
646
int
 
647
NdbBlob::readData(void* data, Uint32& bytes)
 
648
{
 
649
  DBUG_ENTER("NdbBlob::readData");
 
650
  if (theState != Active) {
 
651
    setErrorCode(NdbBlobImpl::ErrState);
 
652
    DBUG_RETURN(-1);
 
653
  }
 
654
  char* buf = static_cast<char*>(data);
 
655
  int ret = readDataPrivate(buf, bytes);
 
656
  DBUG_RETURN(ret);
 
657
}
 
658
 
 
659
int
 
660
NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
 
661
{
 
662
  DBUG_ENTER("NdbBlob::readDataPrivate");
 
663
  DBUG_PRINT("info", ("bytes=%u", bytes));
 
664
  assert(thePos <= theLength);
 
665
  Uint64 pos = thePos;
 
666
  if (bytes > theLength - pos)
 
667
    bytes = theLength - pos;
 
668
  Uint32 len = bytes;
 
669
  if (len > 0) {
 
670
    // inline part
 
671
    if (pos < theInlineSize) {
 
672
      Uint32 n = theInlineSize - pos;
 
673
      if (n > len)
 
674
        n = len;
 
675
      memcpy(buf, theInlineData + pos, n);
 
676
      pos += n;
 
677
      buf += n;
 
678
      len -= n;
 
679
    }
 
680
  }
 
681
  if (len > 0 && thePartSize == 0) {
 
682
    setErrorCode(NdbBlobImpl::ErrSeek);
 
683
    DBUG_RETURN(-1);
 
684
  }
 
685
  if (len > 0) {
 
686
    assert(pos >= theInlineSize);
 
687
    Uint32 off = (pos - theInlineSize) % thePartSize;
 
688
    // partial first block
 
689
    if (off != 0) {
 
690
      DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len));
 
691
      Uint32 part = (pos - theInlineSize) / thePartSize;
 
692
      if (readParts(thePartBuf.data, part, 1) == -1)
 
693
        DBUG_RETURN(-1);
 
694
      // need result now
 
695
      if (executePendingBlobReads() == -1)
 
696
        DBUG_RETURN(-1);
 
697
      Uint32 n = thePartSize - off;
 
698
      if (n > len)
 
699
        n = len;
 
700
      memcpy(buf, thePartBuf.data + off, n);
 
701
      pos += n;
 
702
      buf += n;
 
703
      len -= n;
 
704
    }
 
705
  }
 
706
  if (len > 0) {
 
707
    assert((pos - theInlineSize) % thePartSize == 0);
 
708
    // complete blocks in the middle
 
709
    if (len >= thePartSize) {
 
710
      Uint32 part = (pos - theInlineSize) / thePartSize;
 
711
      Uint32 count = len / thePartSize;
 
712
      if (readParts(buf, part, count) == -1)
 
713
        DBUG_RETURN(-1);
 
714
      Uint32 n = thePartSize * count;
 
715
      pos += n;
 
716
      buf += n;
 
717
      len -= n;
 
718
    }
 
719
  }
 
720
  if (len > 0) {
 
721
    // partial last block
 
722
    DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len));
 
723
    assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
 
724
    Uint32 part = (pos - theInlineSize) / thePartSize;
 
725
    if (readParts(thePartBuf.data, part, 1) == -1)
 
726
      DBUG_RETURN(-1);
 
727
    // need result now
 
728
    if (executePendingBlobReads() == -1)
 
729
      DBUG_RETURN(-1);
 
730
    memcpy(buf, thePartBuf.data, len);
 
731
    Uint32 n = len;
 
732
    pos += n;
 
733
    buf += n;
 
734
    len -= n;
 
735
  }
 
736
  assert(len == 0);
 
737
  thePos = pos;
 
738
  assert(thePos <= theLength);
 
739
  DBUG_RETURN(0);
 
740
}
 
741
 
 
742
int
 
743
NdbBlob::writeData(const void* data, Uint32 bytes)
 
744
{
 
745
  DBUG_ENTER("NdbBlob::writeData");
 
746
  if (isReadOnlyOp()) {
 
747
    setErrorCode(NdbBlobImpl::ErrCompat);
 
748
    DBUG_RETURN(-1);
 
749
  }
 
750
  if (theState != Active) {
 
751
    setErrorCode(NdbBlobImpl::ErrState);
 
752
    DBUG_RETURN(-1);
 
753
  }
 
754
  const char* buf = static_cast<const char*>(data);
 
755
  int ret = writeDataPrivate(buf, bytes);
 
756
  DBUG_RETURN(ret);
 
757
}
 
758
 
 
759
int
 
760
NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes)
 
761
{
 
762
  DBUG_ENTER("NdbBlob::writeDataPrivate");
 
763
  DBUG_PRINT("info", ("bytes=%u", bytes));
 
764
  assert(thePos <= theLength);
 
765
  Uint64 pos = thePos;
 
766
  Uint32 len = bytes;
 
767
  // any write makes blob not NULL
 
768
  if (theNullFlag) {
 
769
    theNullFlag = false;
 
770
    theHeadInlineUpdateFlag = true;
 
771
  }
 
772
  if (len > 0) {
 
773
    // inline part
 
774
    if (pos < theInlineSize) {
 
775
      Uint32 n = theInlineSize - pos;
 
776
      if (n > len)
 
777
        n = len;
 
778
      memcpy(theInlineData + pos, buf, n);
 
779
      theHeadInlineUpdateFlag = true;
 
780
      pos += n;
 
781
      buf += n;
 
782
      len -= n;
 
783
    }
 
784
  }
 
785
  if (len > 0 && thePartSize == 0) {
 
786
    setErrorCode(NdbBlobImpl::ErrSeek);
 
787
    DBUG_RETURN(-1);
 
788
  }
 
789
  if (len > 0) {
 
790
    assert(pos >= theInlineSize);
 
791
    Uint32 off = (pos - theInlineSize) % thePartSize;
 
792
    // partial first block
 
793
    if (off != 0) {
 
794
      DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len));
 
795
      // flush writes to guarantee correct read
 
796
      if (executePendingBlobWrites() == -1)
 
797
        DBUG_RETURN(-1);
 
798
      Uint32 part = (pos - theInlineSize) / thePartSize;
 
799
      if (readParts(thePartBuf.data, part, 1) == -1)
 
800
        DBUG_RETURN(-1);
 
801
      // need result now
 
802
      if (executePendingBlobReads() == -1)
 
803
        DBUG_RETURN(-1);
 
804
      Uint32 n = thePartSize - off;
 
805
      if (n > len) {
 
806
        memset(thePartBuf.data + off + len, theFillChar, n - len);
 
807
        n = len;
 
808
      }
 
809
      memcpy(thePartBuf.data + off, buf, n);
 
810
      if (updateParts(thePartBuf.data, part, 1) == -1)
 
811
        DBUG_RETURN(-1);
 
812
      pos += n;
 
813
      buf += n;
 
814
      len -= n;
 
815
    }
 
816
  }
 
817
  if (len > 0) {
 
818
    assert((pos - theInlineSize) % thePartSize == 0);
 
819
    // complete blocks in the middle
 
820
    if (len >= thePartSize) {
 
821
      Uint32 part = (pos - theInlineSize) / thePartSize;
 
822
      Uint32 count = len / thePartSize;
 
823
      for (unsigned i = 0; i < count; i++) {
 
824
        if (part + i < getPartCount()) {
 
825
          if (updateParts(buf, part + i, 1) == -1)
 
826
            DBUG_RETURN(-1);
 
827
        } else {
 
828
          if (insertParts(buf, part + i, 1) == -1)
 
829
            DBUG_RETURN(-1);
 
830
        }
 
831
        Uint32 n = thePartSize;
 
832
        pos += n;
 
833
        buf += n;
 
834
        len -= n;
 
835
      }
 
836
    }
 
837
  }
 
838
  if (len > 0) {
 
839
    // partial last block
 
840
    DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len));
 
841
    assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
 
842
    Uint32 part = (pos - theInlineSize) / thePartSize;
 
843
    if (theLength > pos + len) {
 
844
      // flush writes to guarantee correct read
 
845
      if (executePendingBlobWrites() == -1)
 
846
        DBUG_RETURN(-1);
 
847
      if (readParts(thePartBuf.data, part, 1) == -1)
 
848
        DBUG_RETURN(-1);
 
849
      // need result now
 
850
      if (executePendingBlobReads() == -1)
 
851
        DBUG_RETURN(-1);
 
852
      memcpy(thePartBuf.data, buf, len);
 
853
      if (updateParts(thePartBuf.data, part, 1) == -1)
 
854
        DBUG_RETURN(-1);
 
855
    } else {
 
856
      memcpy(thePartBuf.data, buf, len);
 
857
      memset(thePartBuf.data + len, theFillChar, thePartSize - len);
 
858
      if (part < getPartCount()) {
 
859
        if (updateParts(thePartBuf.data, part, 1) == -1)
 
860
          DBUG_RETURN(-1);
 
861
      } else {
 
862
        if (insertParts(thePartBuf.data, part, 1) == -1)
 
863
          DBUG_RETURN(-1);
 
864
      }
 
865
    }
 
866
    Uint32 n = len;
 
867
    pos += n;
 
868
    buf += n;
 
869
    len -= n;
 
870
  }
 
871
  assert(len == 0);
 
872
  if (theLength < pos) {
 
873
    theLength = pos;
 
874
    theHeadInlineUpdateFlag = true;
 
875
  }
 
876
  thePos = pos;
 
877
  assert(thePos <= theLength);
 
878
  DBUG_RETURN(0);
 
879
}
 
880
 
 
881
int
 
882
NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
 
883
{
 
884
  DBUG_ENTER("NdbBlob::readParts");
 
885
  DBUG_PRINT("info", ("part=%u count=%u", part, count));
 
886
  Uint32 n = 0;
 
887
  while (n < count) {
 
888
    NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
 
889
    if (tOp == NULL ||
 
890
        tOp->committedRead() == -1 ||
 
891
        setPartKeyValue(tOp, part + n) == -1 ||
 
892
        tOp->getValue((Uint32)3, buf) == NULL) {
 
893
      setErrorCode(tOp);
 
894
      DBUG_RETURN(-1);
 
895
    }
 
896
    tOp->m_abortOption = NdbTransaction::AbortOnError;
 
897
    buf += thePartSize;
 
898
    n++;
 
899
    thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
 
900
    theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
 
901
  }
 
902
  DBUG_RETURN(0);
 
903
}
 
904
 
 
905
int
 
906
NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
 
907
{
 
908
  DBUG_ENTER("NdbBlob::insertParts");
 
909
  DBUG_PRINT("info", ("part=%u count=%u", part, count));
 
910
  Uint32 n = 0;
 
911
  while (n < count) {
 
912
    NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
 
913
    if (tOp == NULL ||
 
914
        tOp->insertTuple() == -1 ||
 
915
        setPartKeyValue(tOp, part + n) == -1 ||
 
916
        tOp->setValue((Uint32)3, buf) == -1) {
 
917
      setErrorCode(tOp);
 
918
      DBUG_RETURN(-1);
 
919
    }
 
920
    tOp->m_abortOption = NdbTransaction::AbortOnError;
 
921
    buf += thePartSize;
 
922
    n++;
 
923
    thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
 
924
    theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
 
925
  }
 
926
  DBUG_RETURN(0);
 
927
}
 
928
 
 
929
int
 
930
NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
 
931
{
 
932
  DBUG_ENTER("NdbBlob::updateParts");
 
933
  DBUG_PRINT("info", ("part=%u count=%u", part, count));
 
934
  Uint32 n = 0;
 
935
  while (n < count) {
 
936
    NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
 
937
    if (tOp == NULL ||
 
938
        tOp->updateTuple() == -1 ||
 
939
        setPartKeyValue(tOp, part + n) == -1 ||
 
940
        tOp->setValue((Uint32)3, buf) == -1) {
 
941
      setErrorCode(tOp);
 
942
      DBUG_RETURN(-1);
 
943
    }
 
944
    tOp->m_abortOption = NdbTransaction::AbortOnError;
 
945
    buf += thePartSize;
 
946
    n++;
 
947
    thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
 
948
    theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
 
949
  }
 
950
  DBUG_RETURN(0);
 
951
}
 
952
 
 
953
int
 
954
NdbBlob::deleteParts(Uint32 part, Uint32 count)
 
955
{
 
956
  DBUG_ENTER("NdbBlob::deleteParts");
 
957
  DBUG_PRINT("info", ("part=%u count=%u", part, count));
 
958
  Uint32 n = 0;
 
959
  while (n < count) {
 
960
    NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
 
961
    if (tOp == NULL ||
 
962
        tOp->deleteTuple() == -1 ||
 
963
        setPartKeyValue(tOp, part + n) == -1) {
 
964
      setErrorCode(tOp);
 
965
      DBUG_RETURN(-1);
 
966
    }
 
967
    tOp->m_abortOption = NdbTransaction::AbortOnError;
 
968
    n++;
 
969
    thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
 
970
    theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
 
971
  }
 
972
  DBUG_RETURN(0);
 
973
}
 
974
 
 
975
/*
 
976
 * Number of blob parts not known.  Used to check for race condition
 
977
 * when writeTuple is used for insert.  Deletes all parts found.
 
978
 */
 
979
int
 
980
NdbBlob::deletePartsUnknown(Uint32 part)
 
981
{
 
982
  DBUG_ENTER("NdbBlob::deletePartsUnknown");
 
983
  DBUG_PRINT("info", ("part=%u count=all", part));
 
984
  if (thePartSize == 0) // tinyblob
 
985
    DBUG_RETURN(0);
 
986
  static const unsigned maxbat = 256;
 
987
  static const unsigned minbat = 1;
 
988
  unsigned bat = minbat;
 
989
  NdbOperation* tOpList[maxbat];
 
990
  Uint32 count = 0;
 
991
  while (true) {
 
992
    Uint32 n;
 
993
    n = 0;
 
994
    while (n < bat) {
 
995
      NdbOperation*& tOp = tOpList[n];  // ref
 
996
      tOp = theNdbCon->getNdbOperation(theBlobTable);
 
997
      if (tOp == NULL ||
 
998
          tOp->deleteTuple() == -1 ||
 
999
          setPartKeyValue(tOp, part + count + n) == -1) {
 
1000
        setErrorCode(tOp);
 
1001
        DBUG_RETURN(-1);
 
1002
      }
 
1003
      tOp->m_abortOption= NdbTransaction::AO_IgnoreError;
 
1004
      n++;
 
1005
    }
 
1006
    DBUG_PRINT("info", ("bat=%u", bat));
 
1007
    if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
 
1008
      DBUG_RETURN(-1);
 
1009
    n = 0;
 
1010
    while (n < bat) {
 
1011
      NdbOperation* tOp = tOpList[n];
 
1012
      if (tOp->theError.code != 0) {
 
1013
        if (tOp->theError.code != 626) {
 
1014
          setErrorCode(tOp);
 
1015
          DBUG_RETURN(-1);
 
1016
        }
 
1017
        // first non-existent part
 
1018
        DBUG_PRINT("info", ("count=%u", count));
 
1019
        DBUG_RETURN(0);
 
1020
      }
 
1021
      n++;
 
1022
      count++;
 
1023
    }
 
1024
    bat *= 4;
 
1025
    if (bat > maxbat)
 
1026
      bat = maxbat;
 
1027
  }
 
1028
}
 
1029
 
 
1030
// pending ops
 
1031
 
 
1032
int
 
1033
NdbBlob::executePendingBlobReads()
 
1034
{
 
1035
  DBUG_ENTER("NdbBlob::executePendingBlobReads");
 
1036
  Uint8 flags = (1 << NdbOperation::ReadRequest);
 
1037
  if (thePendingBlobOps & flags) {
 
1038
    if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
 
1039
      DBUG_RETURN(-1);
 
1040
    thePendingBlobOps = 0;
 
1041
    theNdbCon->thePendingBlobOps = 0;
 
1042
  }
 
1043
  DBUG_RETURN(0);
 
1044
}
 
1045
 
 
1046
int
 
1047
NdbBlob::executePendingBlobWrites()
 
1048
{
 
1049
  DBUG_ENTER("NdbBlob::executePendingBlobWrites");
 
1050
  Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest);
 
1051
  if (thePendingBlobOps & flags) {
 
1052
    if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
 
1053
      DBUG_RETURN(-1);
 
1054
    thePendingBlobOps = 0;
 
1055
    theNdbCon->thePendingBlobOps = 0;
 
1056
  }
 
1057
  DBUG_RETURN(0);
 
1058
}
 
1059
 
 
1060
// callbacks
 
1061
 
 
1062
int
 
1063
NdbBlob::invokeActiveHook()
 
1064
{
 
1065
  DBUG_ENTER("NdbBlob::invokeActiveHook");
 
1066
  assert(theState == Active && theActiveHook != NULL);
 
1067
  int ret = (*theActiveHook)(this, theActiveHookArg);
 
1068
  if (ret != 0) {
 
1069
    // no error is set on blob level
 
1070
    DBUG_RETURN(-1);
 
1071
  }
 
1072
  DBUG_RETURN(0);
 
1073
}
 
1074
 
 
1075
// blob handle maintenance
 
1076
 
 
1077
/*
 
1078
 * Prepare blob handle linked to an operation.  Checks blob table.
 
1079
 * Allocates buffers.  For key operation fetches key data from signal
 
1080
 * data.  For read operation adds read of head+inline.
 
1081
 */
 
1082
int
 
1083
NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn)
 
1084
{
 
1085
  DBUG_ENTER("NdbBlob::atPrepare");
 
1086
  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, anOp, aCon));
 
1087
  assert(theState == Idle);
 
1088
  // ndb api stuff
 
1089
  theNdb = anOp->theNdb;
 
1090
  theNdbCon = aCon;     // for scan, this is the real transaction (m_transConnection)
 
1091
  theNdbOp = anOp;
 
1092
  theTable = anOp->m_currentTable;
 
1093
  theAccessTable = anOp->m_accessTable;
 
1094
  theColumn = aColumn;
 
1095
  NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
 
1096
  switch (theColumn->getType()) {
 
1097
  case NdbDictionary::Column::Blob:
 
1098
    partType = NdbDictionary::Column::Binary;
 
1099
    theFillChar = 0x0;
 
1100
    break;
 
1101
  case NdbDictionary::Column::Text:
 
1102
    partType = NdbDictionary::Column::Char;
 
1103
    theFillChar = 0x20;
 
1104
    break;
 
1105
  default:
 
1106
    setErrorCode(NdbBlobImpl::ErrUsage);
 
1107
    DBUG_RETURN(-1);
 
1108
  }
 
1109
  // sizes
 
1110
  theInlineSize = theColumn->getInlineSize();
 
1111
  thePartSize = theColumn->getPartSize();
 
1112
  theStripeSize = theColumn->getStripeSize();
 
1113
  // sanity check
 
1114
  assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
 
1115
  assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
 
1116
  if (thePartSize > 0) {
 
1117
    const NdbDictionary::Table* bt = NULL;
 
1118
    const NdbDictionary::Column* bc = NULL;
 
1119
    if (theStripeSize == 0 ||
 
1120
        (bt = theColumn->getBlobTable()) == NULL ||
 
1121
        (bc = bt->getColumn("DATA")) == NULL ||
 
1122
        bc->getType() != partType ||
 
1123
        bc->getLength() != (int)thePartSize) {
 
1124
      setErrorCode(NdbBlobImpl::ErrTable);
 
1125
      DBUG_RETURN(-1);
 
1126
    }
 
1127
    theBlobTable = &NdbTableImpl::getImpl(*bt);
 
1128
  }
 
1129
  // buffers
 
1130
  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
 
1131
  theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
 
1132
  theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
 
1133
  theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
 
1134
  thePartBuf.alloc(thePartSize);
 
1135
  theHead = (Head*)theHeadInlineBuf.data;
 
1136
  theInlineData = theHeadInlineBuf.data + sizeof(Head);
 
1137
  // handle different operation types
 
1138
  bool supportedOp = false;
 
1139
  if (isKeyOp()) {
 
1140
    if (isTableOp()) {
 
1141
      // get table key
 
1142
      Uint32* data = (Uint32*)theKeyBuf.data;
 
1143
      unsigned size = theTable->m_keyLenInWords;
 
1144
      if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
 
1145
        setErrorCode(NdbBlobImpl::ErrUsage);
 
1146
        DBUG_RETURN(-1);
 
1147
      }
 
1148
    }
 
1149
    if (isIndexOp()) {
 
1150
      // get index key
 
1151
      Uint32* data = (Uint32*)theAccessKeyBuf.data;
 
1152
      unsigned size = theAccessTable->m_keyLenInWords;
 
1153
      if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
 
1154
        setErrorCode(NdbBlobImpl::ErrUsage);
 
1155
        DBUG_RETURN(-1);
 
1156
      }
 
1157
    }
 
1158
    if (isReadOp()) {
 
1159
      // upgrade lock mode
 
1160
      if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
 
1161
        theNdbOp->theLockMode = NdbOperation::LM_Read;
 
1162
      // add read of head+inline in this op
 
1163
      if (getHeadInlineValue(theNdbOp) == -1)
 
1164
        DBUG_RETURN(-1);
 
1165
    }
 
1166
    if (isInsertOp()) {
 
1167
      // becomes NULL unless set before execute
 
1168
      theNullFlag = true;
 
1169
      theLength = 0;
 
1170
    }
 
1171
    if (isWriteOp()) {
 
1172
      // becomes NULL unless set before execute
 
1173
      theNullFlag = true;
 
1174
      theLength = 0;
 
1175
      theHeadInlineUpdateFlag = true;
 
1176
    }
 
1177
    supportedOp = true;
 
1178
  }
 
1179
  if (isScanOp()) {
 
1180
    // upgrade lock mode
 
1181
    if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
 
1182
      theNdbOp->theLockMode = NdbOperation::LM_Read;
 
1183
    // add read of head+inline in this op
 
1184
    if (getHeadInlineValue(theNdbOp) == -1)
 
1185
      DBUG_RETURN(-1);
 
1186
    supportedOp = true;
 
1187
  }
 
1188
  if (! supportedOp) {
 
1189
    setErrorCode(NdbBlobImpl::ErrUsage);
 
1190
    DBUG_RETURN(-1);
 
1191
  }
 
1192
  setState(Prepared);
 
1193
  DBUG_RETURN(0);
 
1194
}
 
1195
 
 
1196
/*
 
1197
 * Before execute of prepared operation.  May add new operations before
 
1198
 * this one.  May ask that this operation and all before it (a "batch")
 
1199
 * is executed immediately in no-commit mode.  In this case remaining
 
1200
 * prepared operations are saved in a separate list.  They are added
 
1201
 * back after postExecute.
 
1202
 */
 
1203
int
 
1204
NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
 
1205
{
 
1206
  DBUG_ENTER("NdbBlob::preExecute");
 
1207
  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
 
1208
  if (theState == Invalid)
 
1209
    DBUG_RETURN(-1);
 
1210
  assert(theState == Prepared);
 
1211
  // handle different operation types
 
1212
  assert(isKeyOp());
 
1213
  if (isReadOp()) {
 
1214
    if (theGetFlag && theGetSetBytes > theInlineSize) {
 
1215
      // need blob head before proceeding
 
1216
      batch = true;
 
1217
    }
 
1218
  }
 
1219
  if (isInsertOp()) {
 
1220
    if (theSetFlag && theGetSetBytes > theInlineSize) {
 
1221
      // add ops to write rest of a setValue
 
1222
      assert(theSetBuf != NULL);
 
1223
      const char* buf = theSetBuf + theInlineSize;
 
1224
      Uint32 bytes = theGetSetBytes - theInlineSize;
 
1225
      assert(thePos == theInlineSize);
 
1226
      if (writeDataPrivate(buf, bytes) == -1)
 
1227
        DBUG_RETURN(-1);
 
1228
      if (theHeadInlineUpdateFlag) {
 
1229
          // add an operation to update head+inline
 
1230
          NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
 
1231
          if (tOp == NULL ||
 
1232
              tOp->updateTuple() == -1 ||
 
1233
              setTableKeyValue(tOp) == -1 ||
 
1234
              setHeadInlineValue(tOp) == -1) {
 
1235
            setErrorCode(NdbBlobImpl::ErrAbort);
 
1236
            DBUG_RETURN(-1);
 
1237
          }
 
1238
          DBUG_PRINT("info", ("add op to update head+inline"));
 
1239
      }
 
1240
    }
 
1241
  }
 
1242
  if (isTableOp()) {
 
1243
    if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
 
1244
      // add operation before this one to read head+inline
 
1245
      NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp);
 
1246
      /*
 
1247
       * If main op is from take over scan lock, the added read is done
 
1248
       * as committed read:
 
1249
       *
 
1250
       * In normal transactional case, the row is locked by us and
 
1251
       * committed read returns same as normal read.
 
1252
       *
 
1253
       * In current TRUNCATE TABLE, the deleting trans is committed in
 
1254
       * batches and then restarted with new trans id.  A normal read
 
1255
       * would hang on the scan delete lock and then fail.
 
1256
       */
 
1257
      NdbOperation::LockMode lockMode =
 
1258
        ! isTakeOverOp() ?
 
1259
          NdbOperation::LM_Read : NdbOperation::LM_CommittedRead;
 
1260
      if (tOp == NULL ||
 
1261
          tOp->readTuple(lockMode) == -1 ||
 
1262
          setTableKeyValue(tOp) == -1 ||
 
1263
          getHeadInlineValue(tOp) == -1) {
 
1264
        setErrorCode(tOp);
 
1265
        DBUG_RETURN(-1);
 
1266
      }
 
1267
      if (isWriteOp()) {
 
1268
        tOp->m_abortOption = NdbTransaction::AO_IgnoreError;
 
1269
      }
 
1270
      theHeadInlineReadOp = tOp;
 
1271
      // execute immediately
 
1272
      batch = true;
 
1273
      DBUG_PRINT("info", ("add op before to read head+inline"));
 
1274
    }
 
1275
  }
 
1276
  if (isIndexOp()) {
 
1277
    // add op before this one to read table key
 
1278
    NdbBlob* tFirstBlob = theNdbOp->theBlobList;
 
1279
    if (this == tFirstBlob) {
 
1280
      // first blob does it for all
 
1281
      if (g_ndb_blob_ok_to_read_index_table) {
 
1282
        Uint32 pkAttrId = theAccessTable->getNoOfColumns() - 1;
 
1283
        NdbOperation* tOp = theNdbCon->getNdbOperation(theAccessTable, theNdbOp);
 
1284
        if (tOp == NULL ||
 
1285
            tOp->readTuple() == -1 ||
 
1286
            setAccessKeyValue(tOp) == -1 ||
 
1287
            tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) {
 
1288
          setErrorCode(tOp);
 
1289
          DBUG_RETURN(-1);
 
1290
        }
 
1291
      } else {
 
1292
        NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
 
1293
        if (tOp == NULL ||
 
1294
            tOp->readTuple() == -1 ||
 
1295
            setAccessKeyValue(tOp) == -1 ||
 
1296
            getTableKeyValue(tOp) == -1) {
 
1297
          setErrorCode(tOp);
 
1298
          DBUG_RETURN(-1);
 
1299
        }
 
1300
      }
 
1301
    }
 
1302
    DBUG_PRINT("info", ("added op before to read table key"));
 
1303
    if (isUpdateOp() || isDeleteOp()) {
 
1304
      // add op before this one to read head+inline via index
 
1305
      NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
 
1306
      if (tOp == NULL ||
 
1307
          tOp->readTuple() == -1 ||
 
1308
          setAccessKeyValue(tOp) == -1 ||
 
1309
          getHeadInlineValue(tOp) == -1) {
 
1310
        setErrorCode(tOp);
 
1311
        DBUG_RETURN(-1);
 
1312
      }
 
1313
      if (isWriteOp()) {
 
1314
        tOp->m_abortOption = NdbTransaction::AO_IgnoreError;
 
1315
      }
 
1316
      theHeadInlineReadOp = tOp;
 
1317
      // execute immediately
 
1318
      batch = true;
 
1319
      DBUG_PRINT("info", ("added index op before to read head+inline"));
 
1320
    }
 
1321
    if (isWriteOp()) {
 
1322
      // XXX until IgnoreError fixed for index op
 
1323
      batch = true;
 
1324
    }
 
1325
  }
 
1326
  if (isWriteOp()) {
 
1327
    if (theSetFlag) {
 
1328
      // write head+inline now
 
1329
      theNullFlag = true;
 
1330
      theLength = 0;
 
1331
      if (theSetBuf != NULL) {
 
1332
        Uint32 n = theGetSetBytes;
 
1333
        if (n > theInlineSize)
 
1334
          n = theInlineSize;
 
1335
        assert(thePos == 0);
 
1336
        if (writeDataPrivate(theSetBuf, n) == -1)
 
1337
          DBUG_RETURN(-1);
 
1338
      }
 
1339
      if (setHeadInlineValue(theNdbOp) == -1)
 
1340
        DBUG_RETURN(-1);
 
1341
      // the read op before us may overwrite
 
1342
      theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf);
 
1343
    }
 
1344
  }
 
1345
  if (theActiveHook != NULL) {
 
1346
    // need blob head for callback
 
1347
    batch = true;
 
1348
  }
 
1349
  DBUG_PRINT("info", ("batch=%u", batch));
 
1350
  DBUG_RETURN(0);
 
1351
}
 
1352
 
 
1353
/*
 
1354
 * After execute, for any operation.  If already Active, this routine
 
1355
 * has been done previously.  Operations which requested a no-commit
 
1356
 * batch can add new operations after this one.  They are added before
 
1357
 * any remaining prepared operations.
 
1358
 */
 
1359
int
 
1360
NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
 
1361
{
 
1362
  DBUG_ENTER("NdbBlob::postExecute");
 
1363
  DBUG_PRINT("info", ("this=%p op=%p con=%p anExecType=%u", this, theNdbOp, theNdbCon, anExecType));
 
1364
  if (theState == Invalid)
 
1365
    DBUG_RETURN(-1);
 
1366
  if (theState == Active) {
 
1367
    setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
 
1368
    DBUG_PRINT("info", ("skip active"));
 
1369
    DBUG_RETURN(0);
 
1370
  }
 
1371
  assert(theState == Prepared);
 
1372
  setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
 
1373
  assert(isKeyOp());
 
1374
  if (isIndexOp()) {
 
1375
    NdbBlob* tFirstBlob = theNdbOp->theBlobList;
 
1376
    if (this != tFirstBlob) {
 
1377
      // copy key from first blob
 
1378
      assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size);
 
1379
      memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size);
 
1380
    }
 
1381
  }
 
1382
  if (isReadOp()) {
 
1383
    getHeadFromRecAttr();
 
1384
    if (setPos(0) == -1)
 
1385
      DBUG_RETURN(-1);
 
1386
    if (theGetFlag) {
 
1387
      assert(theGetSetBytes == 0 || theGetBuf != 0);
 
1388
      assert(theGetSetBytes <= theInlineSize ||
 
1389
             anExecType == NdbTransaction::NoCommit);
 
1390
      Uint32 bytes = theGetSetBytes;
 
1391
      if (readDataPrivate(theGetBuf, bytes) == -1)
 
1392
        DBUG_RETURN(-1);
 
1393
    }
 
1394
  }
 
1395
  if (isUpdateOp()) {
 
1396
    assert(anExecType == NdbTransaction::NoCommit);
 
1397
    getHeadFromRecAttr();
 
1398
    if (theSetFlag) {
 
1399
      // setValue overwrites everything
 
1400
      if (theSetBuf != NULL) {
 
1401
        if (truncate(0) == -1)
 
1402
          DBUG_RETURN(-1);
 
1403
        assert(thePos == 0);
 
1404
        if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1)
 
1405
          DBUG_RETURN(-1);
 
1406
      } else {
 
1407
        if (setNull() == -1)
 
1408
          DBUG_RETURN(-1);
 
1409
      }
 
1410
    }
 
1411
  }
 
1412
  if (isWriteOp() && isTableOp()) {
 
1413
    assert(anExecType == NdbTransaction::NoCommit);
 
1414
    if (theHeadInlineReadOp->theError.code == 0) {
 
1415
      int tNullFlag = theNullFlag;
 
1416
      Uint64 tLength = theLength;
 
1417
      Uint64 tPos = thePos;
 
1418
      getHeadFromRecAttr();
 
1419
      DBUG_PRINT("info", ("tuple found"));
 
1420
      if (truncate(0) == -1)
 
1421
        DBUG_RETURN(-1);
 
1422
      // restore previous head+inline
 
1423
      theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf);
 
1424
      theNullFlag = tNullFlag;
 
1425
      theLength = tLength;
 
1426
      thePos = tPos;
 
1427
    } else {
 
1428
      if (theHeadInlineReadOp->theError.code != 626) {
 
1429
        setErrorCode(theHeadInlineReadOp);
 
1430
        DBUG_RETURN(-1);
 
1431
      }
 
1432
      DBUG_PRINT("info", ("tuple not found"));
 
1433
      /*
 
1434
       * Read found no tuple but it is possible that a tuple was
 
1435
       * created after the read by another transaction.  Delete all
 
1436
       * blob parts which may exist.
 
1437
       */
 
1438
      if (deletePartsUnknown(0) == -1)
 
1439
        DBUG_RETURN(-1);
 
1440
    }
 
1441
    if (theSetFlag && theGetSetBytes > theInlineSize) {
 
1442
      assert(theSetBuf != NULL);
 
1443
      const char* buf = theSetBuf + theInlineSize;
 
1444
      Uint32 bytes = theGetSetBytes - theInlineSize;
 
1445
      assert(thePos == theInlineSize);
 
1446
      if (writeDataPrivate(buf, bytes) == -1)
 
1447
          DBUG_RETURN(-1);
 
1448
    }
 
1449
  }
 
1450
  if (isWriteOp() && isIndexOp()) {
 
1451
    // XXX until IgnoreError fixed for index op
 
1452
    if (deletePartsUnknown(0) == -1)
 
1453
      DBUG_RETURN(-1);
 
1454
    if (theSetFlag && theGetSetBytes > theInlineSize) {
 
1455
      assert(theSetBuf != NULL);
 
1456
      const char* buf = theSetBuf + theInlineSize;
 
1457
      Uint32 bytes = theGetSetBytes - theInlineSize;
 
1458
      assert(thePos == theInlineSize);
 
1459
      if (writeDataPrivate(buf, bytes) == -1)
 
1460
          DBUG_RETURN(-1);
 
1461
    }
 
1462
  }
 
1463
  if (isDeleteOp()) {
 
1464
    assert(anExecType == NdbTransaction::NoCommit);
 
1465
    getHeadFromRecAttr();
 
1466
    if (deleteParts(0, getPartCount()) == -1)
 
1467
      DBUG_RETURN(-1);
 
1468
  }
 
1469
  setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
 
1470
  // activation callback
 
1471
  if (theActiveHook != NULL) {
 
1472
    if (invokeActiveHook() == -1)
 
1473
      DBUG_RETURN(-1);
 
1474
  }
 
1475
  if (anExecType == NdbTransaction::NoCommit && theHeadInlineUpdateFlag) {
 
1476
    NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
 
1477
    if (tOp == NULL ||
 
1478
       tOp->updateTuple() == -1 ||
 
1479
       setTableKeyValue(tOp) == -1 ||
 
1480
       setHeadInlineValue(tOp) == -1) {
 
1481
      setErrorCode(NdbBlobImpl::ErrAbort);
 
1482
      DBUG_RETURN(-1);
 
1483
    }
 
1484
    tOp->m_abortOption = NdbTransaction::AbortOnError;
 
1485
    DBUG_PRINT("info", ("added op to update head+inline"));
 
1486
  }
 
1487
  DBUG_RETURN(0);
 
1488
}
 
1489
 
 
1490
/*
 
1491
 * Before commit of completed operation.  For write add operation to
 
1492
 * update head+inline.
 
1493
 */
 
1494
int
 
1495
NdbBlob::preCommit()
 
1496
{
 
1497
  DBUG_ENTER("NdbBlob::preCommit");
 
1498
  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
 
1499
  if (theState == Invalid)
 
1500
    DBUG_RETURN(-1);
 
1501
  assert(theState == Active);
 
1502
  assert(isKeyOp());
 
1503
  if (isInsertOp() || isUpdateOp() || isWriteOp()) {
 
1504
    if (theHeadInlineUpdateFlag) {
 
1505
        // add an operation to update head+inline
 
1506
        NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
 
1507
        if (tOp == NULL ||
 
1508
            tOp->updateTuple() == -1 ||
 
1509
            setTableKeyValue(tOp) == -1 ||
 
1510
            setHeadInlineValue(tOp) == -1) {
 
1511
          setErrorCode(NdbBlobImpl::ErrAbort);
 
1512
          DBUG_RETURN(-1);
 
1513
        }
 
1514
        tOp->m_abortOption = NdbTransaction::AbortOnError;
 
1515
        DBUG_PRINT("info", ("added op to update head+inline"));
 
1516
    }
 
1517
  }
 
1518
  DBUG_RETURN(0);
 
1519
}
 
1520
 
 
1521
/*
 
1522
 * After next scan result.  Handle like read op above.
 
1523
 */
 
1524
int
 
1525
NdbBlob::atNextResult()
 
1526
{
 
1527
  DBUG_ENTER("NdbBlob::atNextResult");
 
1528
  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
 
1529
  if (theState == Invalid)
 
1530
    DBUG_RETURN(-1);
 
1531
  assert(isScanOp());
 
1532
  // get primary key
 
1533
  { Uint32* data = (Uint32*)theKeyBuf.data;
 
1534
    unsigned size = theTable->m_keyLenInWords;
 
1535
    if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
 
1536
      setErrorCode(NdbBlobImpl::ErrUsage);
 
1537
      DBUG_RETURN(-1);
 
1538
    }
 
1539
  }
 
1540
  getHeadFromRecAttr();
 
1541
  if (setPos(0) == -1)
 
1542
    DBUG_RETURN(-1);
 
1543
  if (theGetFlag) {
 
1544
    assert(theGetSetBytes == 0 || theGetBuf != 0);
 
1545
    Uint32 bytes = theGetSetBytes;
 
1546
    if (readDataPrivate(theGetBuf, bytes) == -1)
 
1547
      DBUG_RETURN(-1);
 
1548
  }
 
1549
  setState(Active);
 
1550
  // activation callback
 
1551
  if (theActiveHook != NULL) {
 
1552
    if (invokeActiveHook() == -1)
 
1553
      DBUG_RETURN(-1);
 
1554
  }
 
1555
  DBUG_RETURN(0);
 
1556
}
 
1557
 
 
1558
// misc
 
1559
 
 
1560
const NdbDictionary::Column*
 
1561
NdbBlob::getColumn()
 
1562
{
 
1563
  return theColumn;
 
1564
}
 
1565
 
 
1566
// errors
 
1567
 
 
1568
void
 
1569
NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag)
 
1570
{
 
1571
  DBUG_ENTER("NdbBlob::setErrorCode");
 
1572
  DBUG_PRINT("info", ("this=%p code=%u", this, anErrorCode));
 
1573
  theError.code = anErrorCode;
 
1574
  // conditionally copy error to operation level
 
1575
  if (theNdbOp != NULL && theNdbOp->theError.code == 0)
 
1576
    theNdbOp->setErrorCode(theError.code);
 
1577
  if (invalidFlag)
 
1578
    setState(Invalid);
 
1579
  DBUG_VOID_RETURN;
 
1580
}
 
1581
 
 
1582
void
 
1583
NdbBlob::setErrorCode(NdbOperation* anOp, bool invalidFlag)
 
1584
{
 
1585
  int code = 0;
 
1586
  if (anOp != NULL && (code = anOp->theError.code) != 0)
 
1587
    ;
 
1588
  else if ((code = theNdbCon->theError.code) != 0)
 
1589
    ;
 
1590
  else if ((code = theNdb->theError.code) != 0)
 
1591
    ;
 
1592
  else
 
1593
    code = NdbBlobImpl::ErrUnknown;
 
1594
  setErrorCode(code, invalidFlag);
 
1595
}
 
1596
 
 
1597
void
 
1598
NdbBlob::setErrorCode(NdbTransaction* aCon, bool invalidFlag)
 
1599
{
 
1600
  int code = 0;
 
1601
  if (theNdbCon != NULL && (code = theNdbCon->theError.code) != 0)
 
1602
    ;
 
1603
  else if ((code = theNdb->theError.code) != 0)
 
1604
    ;
 
1605
  else
 
1606
    code = NdbBlobImpl::ErrUnknown;
 
1607
  setErrorCode(code, invalidFlag);
 
1608
}
 
1609
 
 
1610
// info about all blobs in this operation
 
1611
 
 
1612
NdbBlob*
 
1613
NdbBlob::blobsFirstBlob()
 
1614
{
 
1615
  return theNdbOp->theBlobList;
 
1616
}
 
1617
 
 
1618
NdbBlob*
 
1619
NdbBlob::blobsNextBlob()
 
1620
{
 
1621
  return theNext;
 
1622
}
 
1623
 
 
1624
// debug
 
1625
 
 
1626
#ifdef VM_TRACE
 
1627
inline int
 
1628
NdbBlob::getOperationType() const
 
1629
{
 
1630
  return theNdbOp != NULL ? theNdbOp->theOperationType : -1;
 
1631
}
 
1632
 
 
1633
NdbOut&
 
1634
operator<<(NdbOut& out, const NdbBlob& blob)
 
1635
{
 
1636
  ndbout << dec << "o=" << blob.getOperationType();
 
1637
  ndbout << dec << " s=" << (Uint32) blob.theState;
 
1638
  ndbout << dec << " n=" << blob.theNullFlag;;
 
1639
  ndbout << dec << " l=" << blob.theLength;
 
1640
  ndbout << dec << " p=" << blob.thePos;
 
1641
  ndbout << dec << " u=" << (Uint32)blob.theHeadInlineUpdateFlag;
 
1642
  ndbout << dec << " g=" << (Uint32)blob.theGetSetBytes;
 
1643
  return out;
 
1644
}
 
1645
#endif