~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
#define DBTUP_C
 
18
#include <Dblqh.hpp>
 
19
#include "Dbtup.hpp"
 
20
#include <RefConvert.hpp>
 
21
#include <ndb_limits.h>
 
22
#include <pc.hpp>
 
23
#include <AttributeDescriptor.hpp>
 
24
#include "AttributeOffset.hpp"
 
25
#include <AttributeHeader.hpp>
 
26
#include <Interpreter.hpp>
 
27
#include <signaldata/TupKey.hpp>
 
28
#include <signaldata/AttrInfo.hpp>
 
29
#include <NdbSqlUtil.hpp>
 
30
 
 
31
/* ----------------------------------------------------------------- */
 
32
/* -----------       INIT_STORED_OPERATIONREC         -------------- */
 
33
/* ----------------------------------------------------------------- */
 
34
int Dbtup::initStoredOperationrec(Operationrec* regOperPtr,
 
35
                                  KeyReqStruct* req_struct,
 
36
                                  Uint32 storedId) 
 
37
{
 
38
  jam();
 
39
  StoredProcPtr storedPtr;
 
40
  c_storedProcPool.getPtr(storedPtr, storedId);
 
41
  if (storedPtr.i != RNIL) {
 
42
    if (storedPtr.p->storedCode == ZSCAN_PROCEDURE) {
 
43
      storedPtr.p->storedCounter++;
 
44
      regOperPtr->firstAttrinbufrec= storedPtr.p->storedLinkFirst;
 
45
      regOperPtr->lastAttrinbufrec= storedPtr.p->storedLinkLast;
 
46
      regOperPtr->currentAttrinbufLen= storedPtr.p->storedProcLength;
 
47
      req_struct->attrinfo_len= storedPtr.p->storedProcLength;
 
48
      return ZOK;
 
49
    }
 
50
  }
 
51
  terrorCode= ZSTORED_PROC_ID_ERROR;
 
52
  return terrorCode;
 
53
}
 
54
 
 
55
void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
 
56
                         Uint32* inBuffer)
 
57
{
 
58
  AttrbufrecPtr copyAttrBufPtr;
 
59
  Uint32 RnoOfAttrBufrec= cnoOfAttrbufrec;
 
60
  int RbufLen;
 
61
  Uint32 RinBufIndex= 0;
 
62
  Uint32 Rnext;
 
63
  Uint32 Rfirst;
 
64
  Uint32 TstoredProcedure= (regOperPtr->storedProcedureId != ZNIL);
 
65
  Uint32 RnoFree= cnoFreeAttrbufrec;
 
66
 
 
67
//-------------------------------------------------------------------------
 
68
// As a prelude to the execution of the TUPKEYREQ we will copy the program
 
69
// into the inBuffer to enable easy execution without any complex jumping
 
70
// between the buffers. In particular this will make the interpreter less
 
71
// complex. Hopefully it does also improve performance.
 
72
//-------------------------------------------------------------------------
 
73
  copyAttrBufPtr.i= regOperPtr->firstAttrinbufrec;
 
74
  while (copyAttrBufPtr.i != RNIL) {
 
75
    jam();
 
76
    ndbrequire(copyAttrBufPtr.i < RnoOfAttrBufrec);
 
77
    ptrAss(copyAttrBufPtr, attrbufrec);
 
78
    RbufLen = copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
 
79
    Rnext = copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
 
80
    Rfirst = cfirstfreeAttrbufrec;
 
81
    /*
 
82
     * ATTRINFO comes from 2 mutually exclusive places:
 
83
     * 1) TUPKEYREQ (also interpreted part)
 
84
     * 2) STORED_PROCREQ before scan start
 
85
     * Assert here that both have a check for overflow.
 
86
     * The "<" instead of "<=" is intentional.
 
87
     */
 
88
    ndbrequire(RinBufIndex + RbufLen < ZATTR_BUFFER_SIZE);
 
89
    MEMCOPY_NO_WORDS(&inBuffer[RinBufIndex],
 
90
                     &copyAttrBufPtr.p->attrbuf[0],
 
91
                     RbufLen);
 
92
    RinBufIndex += RbufLen;
 
93
    if (!TstoredProcedure) {
 
94
      copyAttrBufPtr.p->attrbuf[ZBUF_NEXT]= Rfirst;
 
95
      cfirstfreeAttrbufrec= copyAttrBufPtr.i;
 
96
      RnoFree++;
 
97
    }
 
98
    copyAttrBufPtr.i= Rnext;
 
99
  }
 
100
  cnoFreeAttrbufrec= RnoFree;
 
101
  if (TstoredProcedure) {
 
102
    jam();
 
103
    StoredProcPtr storedPtr;
 
104
    c_storedProcPool.getPtr(storedPtr, (Uint32)regOperPtr->storedProcedureId);
 
105
    ndbrequire(storedPtr.p->storedCode == ZSCAN_PROCEDURE);
 
106
    storedPtr.p->storedCounter--;
 
107
  }
 
108
  // Release the ATTRINFO buffers
 
109
  regOperPtr->storedProcedureId= RNIL;
 
110
  regOperPtr->firstAttrinbufrec= RNIL;
 
111
  regOperPtr->lastAttrinbufrec= RNIL;
 
112
  regOperPtr->m_any_value= 0;
 
113
}
 
114
 
 
115
void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
 
116
                                       const Uint32 *data,
 
117
                                       Uint32 len,
 
118
                                       Operationrec * regOperPtr) 
 
119
{
 
120
  while(len)
 
121
  {
 
122
    Uint32 length = len > AttrInfo::DataLength ? AttrInfo::DataLength : len;
 
123
 
 
124
    AttrbufrecPtr TAttrinbufptr;
 
125
    TAttrinbufptr.i= cfirstfreeAttrbufrec;
 
126
    if ((cfirstfreeAttrbufrec < cnoOfAttrbufrec) &&
 
127
        (cnoFreeAttrbufrec > MIN_ATTRBUF)) {
 
128
      ptrAss(TAttrinbufptr, attrbufrec);
 
129
      MEMCOPY_NO_WORDS(&TAttrinbufptr.p->attrbuf[0],
 
130
                       data,
 
131
                       length);
 
132
      Uint32 RnoFree= cnoFreeAttrbufrec;
 
133
      Uint32 Rnext= TAttrinbufptr.p->attrbuf[ZBUF_NEXT];
 
134
      TAttrinbufptr.p->attrbuf[ZBUF_DATA_LEN]= length;
 
135
      TAttrinbufptr.p->attrbuf[ZBUF_NEXT]= RNIL;
 
136
      
 
137
      AttrbufrecPtr locAttrinbufptr;
 
138
      Uint32 RnewLen= regOperPtr->currentAttrinbufLen;
 
139
      
 
140
      locAttrinbufptr.i= regOperPtr->lastAttrinbufrec;
 
141
      cfirstfreeAttrbufrec= Rnext;
 
142
      cnoFreeAttrbufrec= RnoFree - 1;
 
143
      RnewLen += length;
 
144
      regOperPtr->lastAttrinbufrec= TAttrinbufptr.i;
 
145
      regOperPtr->currentAttrinbufLen= RnewLen;
 
146
      if (locAttrinbufptr.i == RNIL) {
 
147
        regOperPtr->firstAttrinbufrec= TAttrinbufptr.i;
 
148
      } else {
 
149
        jam();
 
150
        ptrCheckGuard(locAttrinbufptr, cnoOfAttrbufrec, attrbufrec);
 
151
        locAttrinbufptr.p->attrbuf[ZBUF_NEXT]= TAttrinbufptr.i;
 
152
      }
 
153
      if (RnewLen < ZATTR_BUFFER_SIZE) {
 
154
      } else {
 
155
        jam();
 
156
        set_trans_state(regOperPtr, TRANS_TOO_MUCH_AI);
 
157
        return;
 
158
      }
 
159
    } else if (cnoFreeAttrbufrec <= MIN_ATTRBUF) {
 
160
      jam();
 
161
      set_trans_state(regOperPtr, TRANS_ERROR_WAIT_TUPKEYREQ);
 
162
    } else {
 
163
      ndbrequire(false);
 
164
    }
 
165
    
 
166
    len -= length;
 
167
    data += length;    
 
168
  }
 
169
}
 
170
 
 
171
void Dbtup::execATTRINFO(Signal* signal) 
 
172
{
 
173
  Uint32 Rsig0= signal->theData[0];
 
174
  Uint32 Rlen= signal->length();
 
175
  jamEntry();
 
176
 
 
177
  receive_attrinfo(signal, Rsig0, signal->theData+3, Rlen-3);
 
178
}
 
179
 
 
180
void
 
181
Dbtup::receive_attrinfo(Signal* signal, Uint32 op, 
 
182
                        const Uint32* data, Uint32 Rlen)
 
183
 
184
  OperationrecPtr regOpPtr;
 
185
  regOpPtr.i= op;
 
186
  c_operation_pool.getPtr(regOpPtr, op);
 
187
  TransState trans_state= get_trans_state(regOpPtr.p);
 
188
  if (trans_state == TRANS_IDLE) {
 
189
    handleATTRINFOforTUPKEYREQ(signal, data, Rlen, regOpPtr.p);
 
190
    return;
 
191
  } else if (trans_state == TRANS_WAIT_STORED_PROCEDURE_ATTR_INFO) {
 
192
    storedProcedureAttrInfo(signal, regOpPtr.p, data, Rlen, false);
 
193
    return;
 
194
  }
 
195
  switch (trans_state) {
 
196
  case TRANS_ERROR_WAIT_STORED_PROCREQ:
 
197
    jam();
 
198
  case TRANS_TOO_MUCH_AI:
 
199
    jam();
 
200
  case TRANS_ERROR_WAIT_TUPKEYREQ:
 
201
    jam();
 
202
    return;     /* IGNORE ATTRINFO IN THOSE STATES, WAITING FOR ABORT SIGNAL */
 
203
  case TRANS_DISCONNECTED:
 
204
    jam();
 
205
  case TRANS_STARTED:
 
206
    jam();
 
207
  default:
 
208
    ndbrequire(false);
 
209
  }
 
210
}
 
211
 
 
212
void
 
213
Dbtup::setChecksum(Tuple_header* tuple_ptr,
 
214
                   Tablerec* regTabPtr)
 
215
{
 
216
  tuple_ptr->m_checksum= 0;
 
217
  tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
 
218
}
 
219
 
 
220
Uint32
 
221
Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
 
222
                         Tablerec* regTabPtr)
 
223
{
 
224
  Uint32 checksum;
 
225
  Uint32 i, rec_size, *tuple_header;
 
226
  rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
 
227
  tuple_header= tuple_ptr->m_data;
 
228
  checksum= 0;
 
229
  // includes tupVersion
 
230
  //printf("%p - ", tuple_ptr);
 
231
  
 
232
  for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
 
233
    checksum ^= tuple_header[i];
 
234
    //printf("%.8x ", tuple_header[i]);
 
235
  }
 
236
  
 
237
  //printf("-> %.8x\n", checksum);
 
238
 
 
239
#if 0
 
240
  if (var_sized) {
 
241
    /*
 
242
    if (! req_struct->fix_var_together) {
 
243
      jam();
 
244
      checksum ^= tuple_header[rec_size];
 
245
    }
 
246
    */
 
247
    jam();
 
248
    var_data_part= req_struct->var_data_start;
 
249
    vsize_words= calculate_total_var_size(req_struct->var_len_array,
 
250
                                          regTabPtr->no_var_attr);
 
251
    ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
 
252
    for (i= 0; i < vsize_words; i++) {
 
253
      checksum ^= var_data_part[i];
 
254
    }
 
255
  }
 
256
#endif
 
257
  return checksum;
 
258
}
 
259
 
 
260
/* ----------------------------------------------------------------- */
 
261
/* -----------       INSERT_ACTIVE_OP_LIST            -------------- */
 
262
/* ----------------------------------------------------------------- */
 
263
bool 
 
264
Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
 
265
                          KeyReqStruct* req_struct)
 
266
{
 
267
  OperationrecPtr prevOpPtr;
 
268
  ndbrequire(!regOperPtr.p->op_struct.in_active_list);
 
269
  regOperPtr.p->op_struct.in_active_list= true;
 
270
  req_struct->prevOpPtr.i= 
 
271
    prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
 
272
  regOperPtr.p->prevActiveOp= prevOpPtr.i;
 
273
  regOperPtr.p->nextActiveOp= RNIL;
 
274
  regOperPtr.p->m_undo_buffer_space= 0;
 
275
  req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
 
276
  if (prevOpPtr.i == RNIL) {
 
277
    set_change_mask_state(regOperPtr.p, USE_SAVED_CHANGE_MASK);
 
278
    regOperPtr.p->saved_change_mask[0] = 0;
 
279
    regOperPtr.p->saved_change_mask[1] = 0;
 
280
    return true;
 
281
  } else {
 
282
    req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
 
283
    prevOpPtr.p->nextActiveOp= regOperPtr.i;
 
284
 
 
285
    regOperPtr.p->op_struct.m_wait_log_buffer= 
 
286
      prevOpPtr.p->op_struct.m_wait_log_buffer;
 
287
    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 
 
288
      prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
 
289
    regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
 
290
    // start with prev mask (matters only for UPD o UPD)
 
291
    set_change_mask_state(regOperPtr.p, get_change_mask_state(prevOpPtr.p));
 
292
    regOperPtr.p->saved_change_mask[0] = prevOpPtr.p->saved_change_mask[0];
 
293
    regOperPtr.p->saved_change_mask[1] = prevOpPtr.p->saved_change_mask[1];
 
294
 
 
295
    regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;
 
296
 
 
297
    prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
 
298
    prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
 
299
 
 
300
    if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
 
301
    {
 
302
      Uint32 op= regOperPtr.p->op_struct.op_type;
 
303
      Uint32 prevOp= prevOpPtr.p->op_struct.op_type;
 
304
      if (prevOp == ZDELETE)
 
305
      {
 
306
        if(op == ZINSERT)
 
307
        {
 
308
          // mark both
 
309
          prevOpPtr.p->op_struct.delete_insert_flag= true;
 
310
          regOperPtr.p->op_struct.delete_insert_flag= true;
 
311
          return true;
 
312
        } else {
 
313
          terrorCode= ZTUPLE_DELETED_ERROR;
 
314
          return false;
 
315
        }
 
316
      } 
 
317
      else if(op == ZINSERT && prevOp != ZDELETE)
 
318
      {
 
319
        terrorCode= ZINSERT_ERROR;
 
320
        return false;
 
321
      }
 
322
      return true;
 
323
    }
 
324
    else
 
325
    {
 
326
      terrorCode= ZMUST_BE_ABORTED_ERROR;
 
327
      return false;
 
328
    }
 
329
  }
 
330
}
 
331
 
 
332
bool
 
333
Dbtup::setup_read(KeyReqStruct *req_struct,
 
334
                  Operationrec* regOperPtr,
 
335
                  Fragrecord* regFragPtr,
 
336
                  Tablerec* regTabPtr,
 
337
                  bool disk)
 
338
{
 
339
  OperationrecPtr currOpPtr;
 
340
  currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
 
341
  if (currOpPtr.i == RNIL)
 
342
  {
 
343
    if (regTabPtr->need_expand(disk))
 
344
      prepare_read(req_struct, regTabPtr, disk);
 
345
    return true;
 
346
  }
 
347
 
 
348
  do {
 
349
    Uint32 savepointId= regOperPtr->savepointId;
 
350
    bool dirty= req_struct->dirty_op;
 
351
    
 
352
    c_operation_pool.getPtr(currOpPtr);
 
353
    bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
 
354
                                         req_struct->trans_id1,
 
355
                                         req_struct->trans_id2);
 
356
    /**
 
357
     * Read committed in same trans reads latest copy
 
358
     */
 
359
    if(dirty && !sameTrans)
 
360
    {
 
361
      savepointId= 0;
 
362
    }
 
363
    else if(sameTrans)
 
364
    {
 
365
      // Use savepoint even in read committed mode
 
366
      dirty= false;
 
367
    }
 
368
 
 
369
    bool found= find_savepoint(currOpPtr, savepointId);
 
370
    
 
371
    Uint32 currOp= currOpPtr.p->op_struct.op_type;
 
372
    
 
373
    if((found && currOp == ZDELETE) || 
 
374
       ((dirty || !found) && currOp == ZINSERT))
 
375
    {
 
376
      terrorCode= ZTUPLE_DELETED_ERROR;
 
377
      break;
 
378
    }
 
379
    
 
380
    if(dirty || !found)
 
381
    {
 
382
      
 
383
    }
 
384
    else
 
385
    {
 
386
      req_struct->m_tuple_ptr= (Tuple_header*)
 
387
        c_undo_buffer.get_ptr(&currOpPtr.p->m_copy_tuple_location);
 
388
    }      
 
389
 
 
390
    if (regTabPtr->need_expand(disk))
 
391
      prepare_read(req_struct, regTabPtr, disk);
 
392
    
 
393
#if 0
 
394
    ndbout_c("reading copy");
 
395
    Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
 
396
    req_struct->m_tuple_ptr= fixed_ptr;
 
397
    req_struct->fix_var_together= true;  
 
398
    req_struct->var_len_array= (Uint16*)var_ptr;
 
399
    req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
 
400
    Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
 
401
                                        req_struct->var_pos_array,
 
402
                                        regTabPtr->no_var_attr);
 
403
    req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
 
404
#endif
 
405
    return true;
 
406
  } while(0);
 
407
  
 
408
  return false;
 
409
}
 
410
 
 
411
int
 
412
Dbtup::load_diskpage(Signal* signal, 
 
413
                     Uint32 opRec, Uint32 fragPtrI, 
 
414
                     Uint32 local_key, Uint32 flags)
 
415
{
 
416
  c_operation_pool.getPtr(operPtr, opRec);
 
417
  fragptr.i= fragPtrI;
 
418
  ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
 
419
  
 
420
  Operationrec *  regOperPtr= operPtr.p;
 
421
  Fragrecord * regFragPtr= fragptr.p;
 
422
  
 
423
  tabptr.i = regFragPtr->fragTableId;
 
424
  ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
 
425
  Tablerec* regTabPtr = tabptr.p;
 
426
  
 
427
  if(local_key == ~(Uint32)0)
 
428
  {
 
429
    jam();
 
430
    regOperPtr->op_struct.m_wait_log_buffer= 1;
 
431
    regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
 
432
    return 1;
 
433
  }
 
434
  
 
435
  jam();
 
436
  Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
 
437
  Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
 
438
  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
 
439
                                                     frag_page_id);
 
440
  regOperPtr->m_tuple_location.m_page_idx= page_idx;
 
441
  
 
442
  PagePtr page_ptr;
 
443
  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
 
444
  Tuple_header* ptr= (Tuple_header*)tmp;
 
445
  
 
446
  int res= 1;
 
447
  if(ptr->m_header_bits & Tuple_header::DISK_PART)
 
448
  {
 
449
    Page_cache_client::Request req;
 
450
    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
 
451
    req.m_callback.m_callbackData= opRec;
 
452
    req.m_callback.m_callbackFunction= 
 
453
      safe_cast(&Dbtup::disk_page_load_callback);
 
454
 
 
455
#ifdef ERROR_INSERT
 
456
    if (ERROR_INSERTED(4022))
 
457
    {
 
458
      flags |= Page_cache_client::DELAY_REQ;
 
459
      req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
 
460
    }
 
461
#endif
 
462
    
 
463
    if((res= m_pgman.get_page(signal, req, flags)) > 0)
 
464
    {
 
465
      //ndbout_c("in cache");
 
466
      // In cache
 
467
    } 
 
468
    else if(res == 0)
 
469
    {
 
470
      //ndbout_c("waiting for callback");
 
471
      // set state
 
472
    }
 
473
    else 
 
474
    {
 
475
      // Error
 
476
    }
 
477
  }
 
478
 
 
479
  switch(flags & 7)
 
480
  {
 
481
  case ZREAD:
 
482
  case ZREAD_EX:
 
483
    break;
 
484
  case ZDELETE:
 
485
  case ZUPDATE:
 
486
  case ZINSERT:
 
487
  case ZWRITE:
 
488
    regOperPtr->op_struct.m_wait_log_buffer= 1;
 
489
    regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
 
490
  }
 
491
  return res;
 
492
}
 
493
 
 
494
void
 
495
Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
 
496
{
 
497
  c_operation_pool.getPtr(operPtr, opRec);
 
498
  c_lqh->acckeyconf_load_diskpage_callback(signal, 
 
499
                                           operPtr.p->userpointer, page_id);
 
500
}
 
501
 
 
502
int
 
503
Dbtup::load_diskpage_scan(Signal* signal, 
 
504
                          Uint32 opRec, Uint32 fragPtrI, 
 
505
                          Uint32 local_key, Uint32 flags)
 
506
{
 
507
  c_operation_pool.getPtr(operPtr, opRec);
 
508
  fragptr.i= fragPtrI;
 
509
  ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
 
510
  
 
511
  Operationrec *  regOperPtr= operPtr.p;
 
512
  Fragrecord * regFragPtr= fragptr.p;
 
513
  
 
514
  tabptr.i = regFragPtr->fragTableId;
 
515
  ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
 
516
  Tablerec* regTabPtr = tabptr.p;
 
517
  
 
518
  jam();
 
519
  Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
 
520
  Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
 
521
  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
 
522
                                                     frag_page_id);
 
523
  regOperPtr->m_tuple_location.m_page_idx= page_idx;
 
524
  regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
 
525
  
 
526
  PagePtr page_ptr;
 
527
  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
 
528
  Tuple_header* ptr= (Tuple_header*)tmp;
 
529
  
 
530
  int res= 1;
 
531
  if(ptr->m_header_bits & Tuple_header::DISK_PART)
 
532
  {
 
533
    Page_cache_client::Request req;
 
534
    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
 
535
    req.m_callback.m_callbackData= opRec;
 
536
    req.m_callback.m_callbackFunction= 
 
537
      safe_cast(&Dbtup::disk_page_load_scan_callback);
 
538
    
 
539
    if((res= m_pgman.get_page(signal, req, flags)) > 0)
 
540
    {
 
541
      // ndbout_c("in cache");
 
542
      // In cache
 
543
    } 
 
544
    else if(res == 0)
 
545
    {
 
546
      //ndbout_c("waiting for callback");
 
547
      // set state
 
548
    }
 
549
    else 
 
550
    {
 
551
      // Error
 
552
    }
 
553
  }
 
554
  return res;
 
555
}
 
556
 
 
557
void
 
558
Dbtup::disk_page_load_scan_callback(Signal* signal, 
 
559
                                    Uint32 opRec, Uint32 page_id)
 
560
{
 
561
  c_operation_pool.getPtr(operPtr, opRec);
 
562
  c_lqh->next_scanconf_load_diskpage_callback(signal, 
 
563
                                              operPtr.p->userpointer, page_id);
 
564
}
 
565
 
 
566
void Dbtup::execTUPKEYREQ(Signal* signal) 
 
567
{
 
568
   TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
 
569
   KeyReqStruct req_struct;
 
570
   Uint32 sig1, sig2, sig3, sig4;
 
571
 
 
572
   Uint32 RoperPtr= tupKeyReq->connectPtr;
 
573
   Uint32 Rfragptr= tupKeyReq->fragPtr;
 
574
 
 
575
   Uint32 RnoOfFragrec= cnoOfFragrec;
 
576
   Uint32 RnoOfTablerec= cnoOfTablerec;
 
577
 
 
578
   jamEntry();
 
579
   fragptr.i= Rfragptr;
 
580
 
 
581
   ndbrequire(Rfragptr < RnoOfFragrec);
 
582
 
 
583
   c_operation_pool.getPtr(operPtr, RoperPtr);
 
584
   ptrAss(fragptr, fragrecord);
 
585
 
 
586
   Uint32 TrequestInfo= tupKeyReq->request;
 
587
 
 
588
   Operationrec *  regOperPtr= operPtr.p;
 
589
   Fragrecord * regFragPtr= fragptr.p;
 
590
 
 
591
   tabptr.i = regFragPtr->fragTableId;
 
592
   ptrCheckGuard(tabptr, RnoOfTablerec, tablerec);
 
593
   Tablerec* regTabPtr = tabptr.p;
 
594
 
 
595
   req_struct.signal= signal;
 
596
   req_struct.dirty_op= TrequestInfo & 1;
 
597
   req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
 
598
   req_struct.no_fired_triggers= 0;
 
599
   req_struct.read_length= 0;
 
600
   req_struct.max_attr_id_updated= 0;
 
601
   req_struct.no_changed_attrs= 0;
 
602
   req_struct.last_row= false;
 
603
   req_struct.changeMask.clear();
 
604
 
 
605
   if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
 
606
   {
 
607
     TUPKEY_abort(signal, 39);
 
608
     return;
 
609
   }
 
610
 
 
611
 /* ----------------------------------------------------------------- */
 
612
 // Operation is ZREAD when we arrive here so no need to worry about the
 
613
 // abort process.
 
614
 /* ----------------------------------------------------------------- */
 
615
 /* -----------    INITIATE THE OPERATION RECORD       -------------- */
 
616
 /* ----------------------------------------------------------------- */
 
617
   Uint32 Rstoredid= tupKeyReq->storedProcedure;
 
618
 
 
619
   regOperPtr->fragmentPtr= Rfragptr;
 
620
   regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0xf;
 
621
   regOperPtr->op_struct.delete_insert_flag = false;
 
622
   regOperPtr->storedProcedureId= Rstoredid;
 
623
 
 
624
   regOperPtr->m_copy_tuple_location.setNull();
 
625
   regOperPtr->tupVersion= ZNIL;
 
626
 
 
627
   sig1= tupKeyReq->savePointId;
 
628
   sig2= tupKeyReq->primaryReplica;
 
629
   sig3= tupKeyReq->keyRef2;
 
630
   
 
631
   regOperPtr->savepointId= sig1;
 
632
   regOperPtr->op_struct.primary_replica= sig2;
 
633
   Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
 
634
 
 
635
   sig1= tupKeyReq->opRef;
 
636
   sig2= tupKeyReq->tcOpIndex;
 
637
   sig3= tupKeyReq->coordinatorTC;
 
638
   sig4= tupKeyReq->keyRef1;
 
639
 
 
640
   req_struct.tc_operation_ptr= sig1;
 
641
   req_struct.TC_index= sig2;
 
642
   req_struct.TC_ref= sig3;
 
643
   Uint32 pageid = req_struct.frag_page_id= sig4;
 
644
   req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
 
645
 
 
646
   sig1= tupKeyReq->attrBufLen;
 
647
   sig2= tupKeyReq->applRef;
 
648
   sig3= tupKeyReq->transId1;
 
649
   sig4= tupKeyReq->transId2;
 
650
 
 
651
   Uint32 disk_page= tupKeyReq->disk_page;
 
652
   
 
653
   req_struct.log_size= sig1;
 
654
   req_struct.attrinfo_len= sig1;
 
655
   req_struct.rec_blockref= sig2;
 
656
   req_struct.trans_id1= sig3;
 
657
   req_struct.trans_id2= sig4;
 
658
   req_struct.m_disk_page_ptr.i= disk_page;
 
659
 
 
660
   sig1 = tupKeyReq->m_row_id_page_no;
 
661
   sig2 = tupKeyReq->m_row_id_page_idx;
 
662
 
 
663
   req_struct.m_row_id.m_page_no = sig1;
 
664
   req_struct.m_row_id.m_page_idx = sig2;
 
665
   
 
666
   Uint32 Roptype = regOperPtr->op_struct.op_type;
 
667
 
 
668
   if (Rstoredid != ZNIL) {
 
669
     ndbrequire(initStoredOperationrec(regOperPtr,
 
670
                                       &req_struct,
 
671
                                       Rstoredid) == ZOK);
 
672
   }
 
673
 
 
674
   copyAttrinfo(regOperPtr, &cinBuffer[0]);
 
675
   
 
676
   Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
 
677
   if (Roptype == ZINSERT && localkey == ~ (Uint32) 0)
 
678
   {
 
679
     // No tuple allocatated yet
 
680
     goto do_insert;
 
681
   }
 
682
 
 
683
   /**
 
684
    * Get pointer to tuple
 
685
    */
 
686
   regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr, 
 
687
                                                      req_struct.frag_page_id);
 
688
   
 
689
   setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
 
690
   
 
691
   /**
 
692
    * Check operation
 
693
    */
 
694
   if (Roptype == ZREAD) {
 
695
     jam();
 
696
     
 
697
     if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr, 
 
698
                    disk_page != RNIL))
 
699
     {
 
700
       if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1) 
 
701
       {
 
702
         req_struct.log_size= 0;
 
703
         sendTUPKEYCONF(signal, &req_struct, regOperPtr);
 
704
         /* ---------------------------------------------------------------- */
 
705
         // Read Operations need not to be taken out of any lists. 
 
706
         // We also do not need to wait for commit since there is no changes 
 
707
         // to commit. Thus we
 
708
         // prepare the operation record already now for the next operation.
 
709
         // Write operations have set the state to STARTED above indicating 
 
710
         // that they are waiting for the Commit or Abort decision.
 
711
         /* ---------------------------------------------------------------- */
 
712
         set_trans_state(regOperPtr, TRANS_IDLE);
 
713
         regOperPtr->currentAttrinbufLen= 0;
 
714
       }
 
715
       return;
 
716
     }
 
717
     tupkeyErrorLab(signal);
 
718
     return;
 
719
   }
 
720
   
 
721
   if(insertActiveOpList(operPtr, &req_struct))
 
722
   {
 
723
     if(Roptype == ZINSERT)
 
724
     {
 
725
       jam();
 
726
   do_insert:
 
727
       if (handleInsertReq(signal, operPtr,
 
728
                           fragptr, regTabPtr, &req_struct) == -1) 
 
729
       {
 
730
         return;
 
731
       }
 
732
       if (!regTabPtr->tuxCustomTriggers.isEmpty()) 
 
733
       {
 
734
         jam();
 
735
         if (executeTuxInsertTriggers(signal,
 
736
                                      regOperPtr,
 
737
                                      regFragPtr,
 
738
                                      regTabPtr) != 0) {
 
739
           jam();
 
740
           /*
 
741
            * TUP insert succeeded but add of TUX entries failed.  All
 
742
            * TUX changes have been rolled back at this point.
 
743
            *
 
744
            * We will abort via tupkeyErrorLab() as usual.  This routine
 
745
            * however resets the operation to ZREAD.  The TUP_ABORTREQ
 
746
            * arriving later cannot then undo the insert.
 
747
            *
 
748
            * Therefore we call TUP_ABORTREQ already now.  Diskdata etc
 
749
            * should be in memory and timeslicing cannot occur.  We must
 
750
            * skip TUX abort triggers since TUX is already aborted.
 
751
            */
 
752
           signal->theData[0] = operPtr.i;
 
753
           do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
 
754
           tupkeyErrorLab(signal);
 
755
           return;
 
756
         }
 
757
       }
 
758
       checkImmediateTriggersAfterInsert(&req_struct,
 
759
                                         regOperPtr,
 
760
                                         regTabPtr,
 
761
                                         disk_page != RNIL);
 
762
       set_change_mask_state(regOperPtr, SET_ALL_MASK);
 
763
       sendTUPKEYCONF(signal, &req_struct, regOperPtr);
 
764
       return;
 
765
     }
 
766
 
 
767
     if (Roptype == ZUPDATE) {
 
768
       jam();
 
769
       if (handleUpdateReq(signal, regOperPtr,
 
770
                           regFragPtr, regTabPtr, &req_struct, disk_page != RNIL) == -1) {
 
771
         return;
 
772
       }
 
773
       // If update operation is done on primary, 
 
774
       // check any after op triggers
 
775
       terrorCode= 0;
 
776
       if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
 
777
         jam();
 
778
         if (executeTuxUpdateTriggers(signal,
 
779
                                      regOperPtr,
 
780
                                      regFragPtr,
 
781
                                      regTabPtr) != 0) {
 
782
           jam();
 
783
           /*
 
784
            * See insert case.
 
785
            */
 
786
           signal->theData[0] = operPtr.i;
 
787
           do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
 
788
           tupkeyErrorLab(signal);
 
789
           return;
 
790
         }
 
791
       }
 
792
       checkImmediateTriggersAfterUpdate(&req_struct,
 
793
                                         regOperPtr,
 
794
                                         regTabPtr,
 
795
                                         disk_page != RNIL);
 
796
       // XXX use terrorCode for now since all methods are void
 
797
       if (terrorCode != 0) 
 
798
       {
 
799
         tupkeyErrorLab(signal);
 
800
         return;
 
801
       }
 
802
       update_change_mask_info(&req_struct, regOperPtr);
 
803
       sendTUPKEYCONF(signal, &req_struct, regOperPtr);
 
804
       return;
 
805
     } 
 
806
     else if(Roptype == ZDELETE)
 
807
     {
 
808
       jam();
 
809
       req_struct.log_size= 0;
 
810
       if (handleDeleteReq(signal, regOperPtr,
 
811
                           regFragPtr, regTabPtr, 
 
812
                           &req_struct,
 
813
                           disk_page != RNIL) == -1) {
 
814
         return;
 
815
       }
 
816
       /*
 
817
        * TUX doesn't need to check for triggers at delete since entries in
 
818
        * the index are kept until commit time.
 
819
        */
 
820
 
 
821
       /*
 
822
        * Secondary index triggers fire on the primary after a delete.
 
823
        */
 
824
       checkImmediateTriggersAfterDelete(&req_struct,
 
825
                                         regOperPtr, 
 
826
                                         regTabPtr,
 
827
                                         disk_page != RNIL);
 
828
       set_change_mask_state(regOperPtr, DELETE_CHANGES);
 
829
       sendTUPKEYCONF(signal, &req_struct, regOperPtr);
 
830
       return;
 
831
     }
 
832
     else
 
833
     {
 
834
       ndbrequire(false); // Invalid op type
 
835
     }
 
836
   }
 
837
 
 
838
   tupkeyErrorLab(signal);
 
839
 }
 
840
 
 
841
void
 
842
Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
 
843
                        Operationrec* regOperPtr,
 
844
                        Tablerec* regTabPtr)
 
845
{
 
846
  PagePtr page_ptr;
 
847
  Uint32* ptr= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
 
848
  req_struct->m_page_ptr = page_ptr;
 
849
  req_struct->m_tuple_ptr = (Tuple_header*)ptr;
 
850
  
 
851
  ndbassert(regOperPtr->op_struct.op_type == ZINSERT || (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
 
852
  
 
853
  req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
 
854
  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
 
855
  
 
856
  Uint32 num_attr= regTabPtr->m_no_of_attributes;
 
857
  Uint32 descr_start= regTabPtr->tabDescriptor;
 
858
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
 
859
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
 
860
  req_struct->attr_descr= tab_descr; 
 
861
}
 
862
 
 
863
 /* ---------------------------------------------------------------- */
 
864
 /* ------------------------ CONFIRM REQUEST ----------------------- */
 
865
 /* ---------------------------------------------------------------- */
 
866
 void Dbtup::sendTUPKEYCONF(Signal* signal,
 
867
                            KeyReqStruct *req_struct,
 
868
                            Operationrec * regOperPtr)
 
869
{
 
870
  TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();  
 
871
  
 
872
  Uint32 Rcreate_rowid = req_struct->m_use_rowid;
 
873
  Uint32 RuserPointer= regOperPtr->userpointer;
 
874
  Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
 
875
  Uint32 log_size= req_struct->log_size;
 
876
  Uint32 read_length= req_struct->read_length;
 
877
  Uint32 last_row= req_struct->last_row;
 
878
  
 
879
  set_trans_state(regOperPtr, TRANS_STARTED);
 
880
  set_tuple_state(regOperPtr, TUPLE_PREPARED);
 
881
  tupKeyConf->userPtr= RuserPointer;
 
882
  tupKeyConf->readLength= read_length;
 
883
  tupKeyConf->writeLength= log_size;
 
884
  tupKeyConf->noFiredTriggers= RnoFiredTriggers;
 
885
  tupKeyConf->lastRow= last_row;
 
886
  tupKeyConf->rowid = Rcreate_rowid;
 
887
  
 
888
  EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
 
889
                 TupKeyConf::SignalLength);
 
890
  
 
891
}
 
892
 
 
893
 
 
894
#define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData))
 
895
 
 
896
/* ---------------------------------------------------------------- */
 
897
/* ----------------------------- READ  ---------------------------- */
 
898
/* ---------------------------------------------------------------- */
 
899
int Dbtup::handleReadReq(Signal* signal,
 
900
                         Operationrec* regOperPtr,
 
901
                         Tablerec* regTabPtr,
 
902
                         KeyReqStruct* req_struct)
 
903
{
 
904
  Uint32 *dst;
 
905
  Uint32 dstLen, start_index;
 
906
  const BlockReference sendBref= req_struct->rec_blockref;
 
907
  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
 
908
      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) {
 
909
    jam();
 
910
    ndbout_c("here2");
 
911
    terrorCode= ZTUPLE_CORRUPTED_ERROR;
 
912
    tupkeyErrorLab(signal);
 
913
    return -1;
 
914
  }
 
915
 
 
916
  const Uint32 node = refToNode(sendBref);
 
917
  if(node != 0 && node != getOwnNodeId()) {
 
918
    start_index= 25;
 
919
  } else {
 
920
    jam();
 
921
    /**
 
922
     * execute direct
 
923
     */
 
924
    start_index= 3;
 
925
  }
 
926
  dst= &signal->theData[start_index];
 
927
  dstLen= (MAX_READ / 4) - start_index;
 
928
  if (!req_struct->interpreted_exec) {
 
929
    jam();
 
930
    int ret = readAttributes(req_struct,
 
931
                             &cinBuffer[0],
 
932
                             req_struct->attrinfo_len,
 
933
                             dst,
 
934
                             dstLen,
 
935
                             false);
 
936
    if (likely(ret != -1)) {
 
937
/* ------------------------------------------------------------------------- */
 
938
// We have read all data into coutBuffer. Now send it to the API.
 
939
/* ------------------------------------------------------------------------- */
 
940
      jam();
 
941
      Uint32 TnoOfDataRead= (Uint32) ret;
 
942
      req_struct->read_length= TnoOfDataRead;
 
943
      sendReadAttrinfo(signal, req_struct, TnoOfDataRead, regOperPtr);
 
944
      return 0;
 
945
    }
 
946
  } else {
 
947
    jam();
 
948
    if (likely(interpreterStartLab(signal, req_struct) != -1)) {
 
949
      return 0;
 
950
    }
 
951
    return -1;
 
952
  }
 
953
 
 
954
  jam();
 
955
  tupkeyErrorLab(signal);
 
956
  return -1;
 
957
}
 
958
 
 
959
/* ---------------------------------------------------------------- */
 
960
/* ---------------------------- UPDATE ---------------------------- */
 
961
/* ---------------------------------------------------------------- */
 
962
int Dbtup::handleUpdateReq(Signal* signal,
 
963
                           Operationrec* operPtrP,
 
964
                           Fragrecord* regFragPtr,
 
965
                           Tablerec* regTabPtr,
 
966
                           KeyReqStruct* req_struct,
 
967
                           bool disk) 
 
968
{
 
969
  Uint32 *dst;
 
970
  Tuple_header *base= req_struct->m_tuple_ptr, *org;
 
971
  if ((dst= c_undo_buffer.alloc_copy_tuple(&operPtrP->m_copy_tuple_location,
 
972
                                           regTabPtr->total_rec_size)) == 0)
 
973
  {
 
974
    terrorCode= ZMEM_NOMEM_ERROR;
 
975
    goto error;
 
976
  }
 
977
 
 
978
  Uint32 tup_version;
 
979
  if(operPtrP->is_first_operation())
 
980
  {
 
981
    org= req_struct->m_tuple_ptr;
 
982
    tup_version= org->get_tuple_version();
 
983
  }
 
984
  else
 
985
  {
 
986
    Operationrec* prevOp= req_struct->prevOpPtr.p;
 
987
    tup_version= prevOp->tupVersion;
 
988
    org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
 
989
  }
 
990
 
 
991
  /**
 
992
   * Check consistency before update/delete
 
993
   */
 
994
  req_struct->m_tuple_ptr= org;
 
995
  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
 
996
      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) 
 
997
  {
 
998
    terrorCode= ZTUPLE_CORRUPTED_ERROR;
 
999
    goto error;
 
1000
  }
 
1001
 
 
1002
  req_struct->m_tuple_ptr= (Tuple_header*)dst;
 
1003
 
 
1004
  union {
 
1005
    Uint32 sizes[4];
 
1006
    Uint64 cmp[2];
 
1007
  };
 
1008
  
 
1009
  disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
 
1010
  if (regTabPtr->need_expand(disk))
 
1011
  {
 
1012
    expand_tuple(req_struct, sizes, org, regTabPtr, disk);
 
1013
    if(disk && operPtrP->m_undo_buffer_space == 0)
 
1014
    {
 
1015
      operPtrP->op_struct.m_wait_log_buffer = 1;
 
1016
      operPtrP->op_struct.m_load_diskpage_on_commit = 1;
 
1017
      Uint32 sz= operPtrP->m_undo_buffer_space= 
 
1018
        (sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
 
1019
      
 
1020
      terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
 
1021
                                           sz);
 
1022
      if(unlikely(terrorCode))
 
1023
      {
 
1024
        operPtrP->m_undo_buffer_space= 0;
 
1025
        goto error;
 
1026
      }
 
1027
    }
 
1028
  }
 
1029
  else
 
1030
  {
 
1031
    memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
 
1032
  }
 
1033
  
 
1034
  tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
 
1035
  operPtrP->tupVersion= tup_version;
 
1036
  
 
1037
  if (!req_struct->interpreted_exec) {
 
1038
    jam();
 
1039
    int retValue = updateAttributes(req_struct,
 
1040
                                    &cinBuffer[0],
 
1041
                                    req_struct->attrinfo_len);
 
1042
    if (unlikely(retValue == -1))
 
1043
      goto error;
 
1044
  } else {
 
1045
    jam();
 
1046
    if (unlikely(interpreterStartLab(signal, req_struct) == -1))
 
1047
      return -1;
 
1048
  }
 
1049
  
 
1050
  if (regTabPtr->need_shrink())
 
1051
  {  
 
1052
    shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
 
1053
    if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
 
1054
                                                            base,
 
1055
                                                            operPtrP,
 
1056
                                                            regFragPtr,
 
1057
                                                            regTabPtr,
 
1058
                                                            sizes)) {
 
1059
      goto error;
 
1060
    }
 
1061
  }
 
1062
  
 
1063
  req_struct->m_tuple_ptr->set_tuple_version(tup_version);
 
1064
  if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
 
1065
    jam();
 
1066
    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
 
1067
  }
 
1068
  return 0;
 
1069
  
 
1070
error:
 
1071
  tupkeyErrorLab(signal);  
 
1072
  return -1;
 
1073
}
 
1074
 
 
1075
/* ---------------------------------------------------------------- */
 
1076
/* ----------------------------- INSERT --------------------------- */
 
1077
/* ---------------------------------------------------------------- */
 
1078
void
 
1079
Dbtup::prepare_initial_insert(KeyReqStruct *req_struct, 
 
1080
                              Operationrec* regOperPtr,
 
1081
                              Tablerec* regTabPtr)
 
1082
{
 
1083
  Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ? 
 
1084
    sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
 
1085
  regOperPtr->nextActiveOp= RNIL;
 
1086
  regOperPtr->prevActiveOp= RNIL;
 
1087
  regOperPtr->op_struct.in_active_list= true;
 
1088
  regOperPtr->m_undo_buffer_space= disk_undo; 
 
1089
  
 
1090
  req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
 
1091
  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
 
1092
  
 
1093
  Uint32 num_attr= regTabPtr->m_no_of_attributes;
 
1094
  Uint32 descr_start= regTabPtr->tabDescriptor;
 
1095
  Uint32 order_desc= regTabPtr->m_real_order_descriptor;
 
1096
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
 
1097
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
 
1098
  req_struct->attr_descr= tab_descr; 
 
1099
  Uint16* order= (Uint16*)&tableDescriptor[order_desc];
 
1100
 
 
1101
  const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
 
1102
  const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
 
1103
  Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
 
1104
  Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
 
1105
 
 
1106
  if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
 
1107
  {
 
1108
    ref->m_page_no = RNIL;
 
1109
    ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
 
1110
  }
 
1111
  
 
1112
  if(cnt1)
 
1113
  {
 
1114
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
 
1115
    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt1+1);
 
1116
    dst->m_offset_array_ptr= req_struct->var_pos_array;
 
1117
    dst->m_var_len_offset= cnt1;
 
1118
    dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
 
1119
    // Disk part is 32-bit aligned
 
1120
    ptr= ALIGN_WORD(dst->m_data_ptr+regTabPtr->m_offsets[MM].m_max_var_offset);
 
1121
    order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
 
1122
    Uint32 pos= 0;
 
1123
    Uint16 *pos_ptr = req_struct->var_pos_array;
 
1124
    Uint16 *len_ptr = pos_ptr + cnt1;
 
1125
    for(Uint32 i= 0; i<cnt1; i++)
 
1126
    {
 
1127
      * pos_ptr++ = pos;
 
1128
      * len_ptr++ = pos;
 
1129
      pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
 
1130
    }
 
1131
  } 
 
1132
 
 
1133
  req_struct->m_disk_ptr= (Tuple_header*)ptr;
 
1134
  
 
1135
  ndbrequire(cnt2 == 0);
 
1136
  
 
1137
  // Set all null bits
 
1138
  memset(req_struct->m_tuple_ptr->m_null_bits+
 
1139
         regTabPtr->m_offsets[MM].m_null_offset, 0xFF, 
 
1140
         4*regTabPtr->m_offsets[MM].m_null_words);
 
1141
  memset(req_struct->m_disk_ptr->m_null_bits+
 
1142
         regTabPtr->m_offsets[DD].m_null_offset, 0xFF, 
 
1143
         4*regTabPtr->m_offsets[DD].m_null_words);
 
1144
  req_struct->m_tuple_ptr->m_header_bits= 
 
1145
    disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
 
1146
}
 
1147
 
 
1148
int Dbtup::handleInsertReq(Signal* signal,
 
1149
                           Ptr<Operationrec> regOperPtr,
 
1150
                           Ptr<Fragrecord> fragPtr,
 
1151
                           Tablerec* regTabPtr,
 
1152
                           KeyReqStruct *req_struct)
 
1153
{
 
1154
  Uint32 tup_version = 1;
 
1155
  Fragrecord* regFragPtr = fragPtr.p;
 
1156
  Uint32 *dst, *ptr= 0;
 
1157
  Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
 
1158
  Tuple_header *tuple_ptr;
 
1159
    
 
1160
  bool disk = regTabPtr->m_no_of_disk_attributes > 0;
 
1161
  bool mem_insert = regOperPtr.p->is_first_operation();
 
1162
  bool disk_insert = mem_insert && disk;
 
1163
  bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
 
1164
  bool rowid = req_struct->m_use_rowid;
 
1165
  Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
 
1166
  Uint32 frag_page_id = req_struct->frag_page_id;
 
1167
 
 
1168
  union {
 
1169
    Uint32 sizes[4];
 
1170
    Uint64 cmp[2];
 
1171
  };
 
1172
 
 
1173
  if (ERROR_INSERTED(4014))
 
1174
  {
 
1175
    dst = 0;
 
1176
    goto undo_buffer_error;
 
1177
  }
 
1178
 
 
1179
  dst= c_undo_buffer.alloc_copy_tuple(&regOperPtr.p->m_copy_tuple_location,
 
1180
                                      regTabPtr->total_rec_size);
 
1181
  if (unlikely(dst == 0))
 
1182
  {
 
1183
    goto undo_buffer_error;
 
1184
  }
 
1185
  tuple_ptr= req_struct->m_tuple_ptr= (Tuple_header*)dst;
 
1186
 
 
1187
  if(mem_insert)
 
1188
  {
 
1189
    jam();
 
1190
    prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
 
1191
  }
 
1192
  else
 
1193
  {
 
1194
    Operationrec* prevOp= req_struct->prevOpPtr.p;
 
1195
    ndbassert(prevOp->op_struct.op_type == ZDELETE);
 
1196
    tup_version= prevOp->tupVersion + 1;
 
1197
    
 
1198
    if(!prevOp->is_first_operation())
 
1199
      org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
 
1200
    if (regTabPtr->need_expand())
 
1201
    {
 
1202
      expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
 
1203
      memset(req_struct->m_disk_ptr->m_null_bits+
 
1204
             regTabPtr->m_offsets[DD].m_null_offset, 0xFF, 
 
1205
             4*regTabPtr->m_offsets[DD].m_null_words);
 
1206
    } 
 
1207
    else
 
1208
    {
 
1209
      memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
 
1210
    }
 
1211
    memset(tuple_ptr->m_null_bits+
 
1212
           regTabPtr->m_offsets[MM].m_null_offset, 0xFF, 
 
1213
           4*regTabPtr->m_offsets[MM].m_null_words);
 
1214
  }
 
1215
  
 
1216
  if (disk_insert)
 
1217
  {
 
1218
    int res;
 
1219
    
 
1220
    if (ERROR_INSERTED(4015))
 
1221
    {
 
1222
      terrorCode = 1501;
 
1223
      goto log_space_error;
 
1224
    }
 
1225
 
 
1226
    res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
 
1227
                                  regOperPtr.p->m_undo_buffer_space);
 
1228
    if(unlikely(res))
 
1229
    {
 
1230
      terrorCode= res;
 
1231
      goto log_space_error;
 
1232
    }
 
1233
  }
 
1234
  
 
1235
  regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
 
1236
  tuple_ptr->set_tuple_version(tup_version);
 
1237
 
 
1238
  if (ERROR_INSERTED(4016))
 
1239
  {
 
1240
    terrorCode = ZAI_INCONSISTENCY_ERROR;
 
1241
    goto update_error;
 
1242
  }
 
1243
 
 
1244
  if(unlikely(updateAttributes(req_struct, &cinBuffer[0], 
 
1245
                               req_struct->attrinfo_len) == -1))
 
1246
  {
 
1247
    goto update_error;
 
1248
  }
 
1249
 
 
1250
  if (ERROR_INSERTED(4017))
 
1251
  {
 
1252
    goto null_check_error;
 
1253
  }
 
1254
  if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
 
1255
  {
 
1256
    goto null_check_error;
 
1257
  }
 
1258
  
 
1259
  if (regTabPtr->need_shrink())
 
1260
  {  
 
1261
    shrink_tuple(req_struct, sizes+2, regTabPtr, true);
 
1262
  }
 
1263
 
 
1264
  if (ERROR_INSERTED(4025))
 
1265
  {
 
1266
    goto mem_error;
 
1267
  }
 
1268
 
 
1269
  if (ERROR_INSERTED(4026))
 
1270
  {
 
1271
    CLEAR_ERROR_INSERT_VALUE;
 
1272
    goto mem_error;
 
1273
  }
 
1274
 
 
1275
  if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
 
1276
  {
 
1277
    goto mem_error;
 
1278
  }
 
1279
 
 
1280
  if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
 
1281
  {
 
1282
    CLEAR_ERROR_INSERT_VALUE;
 
1283
    goto mem_error;
 
1284
  }
 
1285
  
 
1286
  /**
 
1287
   * Alloc memory
 
1288
   */
 
1289
  if(mem_insert)
 
1290
  {
 
1291
    if (!rowid)
 
1292
    {
 
1293
      if (ERROR_INSERTED(4018))
 
1294
      {
 
1295
        goto mem_error;
 
1296
      }
 
1297
 
 
1298
      if (!varsize)
 
1299
      {
 
1300
        jam();
 
1301
        ptr= alloc_fix_rec(regFragPtr,
 
1302
                           regTabPtr,
 
1303
                           &regOperPtr.p->m_tuple_location,
 
1304
                           &frag_page_id);
 
1305
      } 
 
1306
      else 
 
1307
      {
 
1308
        jam();
 
1309
        regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
 
1310
        ptr= alloc_var_rec(regFragPtr, regTabPtr,
 
1311
                           sizes[2+MM],
 
1312
                           &regOperPtr.p->m_tuple_location,
 
1313
                           &frag_page_id);
 
1314
      }
 
1315
      if (unlikely(ptr == 0))
 
1316
      {
 
1317
        goto mem_error;
 
1318
      }
 
1319
      req_struct->m_use_rowid = true;
 
1320
    }
 
1321
    else
 
1322
    {
 
1323
      regOperPtr.p->m_tuple_location = req_struct->m_row_id;
 
1324
      if (ERROR_INSERTED(4019))
 
1325
      {
 
1326
        terrorCode = ZROWID_ALLOCATED;
 
1327
        goto alloc_rowid_error;
 
1328
      }
 
1329
      
 
1330
      if (!varsize)
 
1331
      {
 
1332
        jam();
 
1333
        ptr= alloc_fix_rowid(regFragPtr,
 
1334
                             regTabPtr,
 
1335
                             &regOperPtr.p->m_tuple_location,
 
1336
                             &frag_page_id);
 
1337
      } 
 
1338
      else 
 
1339
      {
 
1340
        jam();
 
1341
        regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
 
1342
        ptr= alloc_var_rowid(regFragPtr, regTabPtr,
 
1343
                             sizes[2+MM],
 
1344
                             &regOperPtr.p->m_tuple_location,
 
1345
                             &frag_page_id);
 
1346
      }
 
1347
      if (unlikely(ptr == 0))
 
1348
      {
 
1349
        jam();
 
1350
        goto alloc_rowid_error;
 
1351
      }
 
1352
    }
 
1353
    real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
 
1354
    regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
 
1355
    c_lqh->accminupdate(signal,
 
1356
                        regOperPtr.p->userpointer,
 
1357
                        &regOperPtr.p->m_tuple_location);
 
1358
    
 
1359
    base = (Tuple_header*)ptr;
 
1360
    base->m_operation_ptr_i= regOperPtr.i;
 
1361
    base->m_header_bits= Tuple_header::ALLOC | 
 
1362
      (varsize ? Tuple_header::CHAINED_ROW : 0);
 
1363
    regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
 
1364
  }
 
1365
  else 
 
1366
  {
 
1367
    int ret;
 
1368
    if (ERROR_INSERTED(4020))
 
1369
    {
 
1370
      goto size_change_error;
 
1371
    }
 
1372
 
 
1373
    if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
 
1374
        unlikely(ret = handle_size_change_after_update(req_struct,
 
1375
                                                       base,
 
1376
                                                       regOperPtr.p,
 
1377
                                                       regFragPtr,
 
1378
                                                       regTabPtr,
 
1379
                                                       sizes)))
 
1380
    {
 
1381
      goto size_change_error;
 
1382
    }
 
1383
    req_struct->m_use_rowid = false;
 
1384
    base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
 
1385
  }
 
1386
 
 
1387
  base->m_header_bits |= Tuple_header::ALLOC & 
 
1388
    (regOperPtr.p->is_first_operation() ? ~0 : 1);
 
1389
  
 
1390
  if (disk_insert)
 
1391
  {
 
1392
    Local_key tmp;
 
1393
    Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? 
 
1394
      1 : sizes[2+DD];
 
1395
    
 
1396
    if (ERROR_INSERTED(4021))
 
1397
    {
 
1398
      terrorCode = 1601;
 
1399
      goto disk_prealloc_error;
 
1400
    }
 
1401
    
 
1402
    int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
 
1403
    if (unlikely(ret < 0))
 
1404
    {
 
1405
      terrorCode = -ret;
 
1406
      goto disk_prealloc_error;
 
1407
    }
 
1408
    
 
1409
    regOperPtr.p->op_struct.m_disk_preallocated= 1;
 
1410
    tmp.m_page_idx= size;
 
1411
    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
 
1412
    
 
1413
    /**
 
1414
     * Set ref from disk to mm
 
1415
     */
 
1416
    Local_key ref = regOperPtr.p->m_tuple_location;
 
1417
    ref.m_page_no = frag_page_id;
 
1418
    
 
1419
    Tuple_header* disk_ptr= req_struct->m_disk_ptr;
 
1420
    disk_ptr->m_header_bits = 0;
 
1421
    disk_ptr->m_base_record_ref= ref.ref();
 
1422
  }
 
1423
  
 
1424
  if (regTabPtr->m_bits & Tablerec::TR_Checksum) 
 
1425
  {
 
1426
    jam();
 
1427
    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
 
1428
  }
 
1429
  return 0;
 
1430
  
 
1431
size_change_error:
 
1432
  jam();
 
1433
  terrorCode = ZMEM_NOMEM_ERROR;
 
1434
  goto exit_error;
 
1435
  
 
1436
undo_buffer_error:
 
1437
  jam();
 
1438
  terrorCode= ZMEM_NOMEM_ERROR;
 
1439
  regOperPtr.p->m_undo_buffer_space = 0;
 
1440
  if (mem_insert)
 
1441
    regOperPtr.p->m_tuple_location.setNull();
 
1442
  regOperPtr.p->m_copy_tuple_location.setNull();
 
1443
  tupkeyErrorLab(signal);  
 
1444
  return -1;
 
1445
  
 
1446
null_check_error:
 
1447
  jam();
 
1448
  terrorCode= ZNO_ILLEGAL_NULL_ATTR;
 
1449
  goto update_error;
 
1450
 
 
1451
mem_error:
 
1452
  jam();
 
1453
  terrorCode= ZMEM_NOMEM_ERROR;
 
1454
  goto update_error;
 
1455
 
 
1456
log_space_error:
 
1457
  jam();
 
1458
  regOperPtr.p->m_undo_buffer_space = 0;
 
1459
alloc_rowid_error:
 
1460
  jam();
 
1461
update_error:
 
1462
  jam();
 
1463
  if (mem_insert)
 
1464
  {
 
1465
    regOperPtr.p->op_struct.in_active_list = false;
 
1466
    regOperPtr.p->m_tuple_location.setNull();
 
1467
  }
 
1468
exit_error:
 
1469
  tupkeyErrorLab(signal);
 
1470
  return -1;
 
1471
 
 
1472
disk_prealloc_error:
 
1473
  base->m_header_bits |= Tuple_header::FREED;
 
1474
  goto exit_error;
 
1475
}
 
1476
 
 
1477
/* ---------------------------------------------------------------- */
 
1478
/* ---------------------------- DELETE ---------------------------- */
 
1479
/* ---------------------------------------------------------------- */
 
1480
int Dbtup::handleDeleteReq(Signal* signal,
 
1481
                           Operationrec* regOperPtr,
 
1482
                           Fragrecord* regFragPtr,
 
1483
                           Tablerec* regTabPtr,
 
1484
                           KeyReqStruct *req_struct,
 
1485
                           bool disk)
 
1486
{
 
1487
  // delete must set but not increment tupVersion
 
1488
  if (!regOperPtr->is_first_operation())
 
1489
  {
 
1490
    Operationrec* prevOp= req_struct->prevOpPtr.p;
 
1491
    regOperPtr->tupVersion= prevOp->tupVersion;
 
1492
    // make copy since previous op is committed before this one
 
1493
    const Uint32* org = c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
 
1494
    Uint32* dst = c_undo_buffer.alloc_copy_tuple(
 
1495
        &regOperPtr->m_copy_tuple_location, regTabPtr->total_rec_size);
 
1496
    if (dst == 0) {
 
1497
      terrorCode = ZMEM_NOMEM_ERROR;
 
1498
      goto error;
 
1499
    }
 
1500
    memcpy(dst, org, regTabPtr->total_rec_size << 2);
 
1501
    req_struct->m_tuple_ptr = (Tuple_header*)dst;
 
1502
  } 
 
1503
  else 
 
1504
  {
 
1505
    regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
 
1506
  }
 
1507
 
 
1508
  if(disk && regOperPtr->m_undo_buffer_space == 0)
 
1509
  {
 
1510
    regOperPtr->op_struct.m_wait_log_buffer = 1;
 
1511
    regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
 
1512
    Uint32 sz= regOperPtr->m_undo_buffer_space= 
 
1513
      (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
 
1514
      regTabPtr->m_offsets[DD].m_fix_header_size - 1;
 
1515
    
 
1516
    terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
 
1517
                                         sz);
 
1518
    if(unlikely(terrorCode))
 
1519
    {
 
1520
      regOperPtr->m_undo_buffer_space= 0;
 
1521
      goto error;
 
1522
    }
 
1523
  }
 
1524
  if (req_struct->attrinfo_len == 0)
 
1525
  {
 
1526
    return 0;
 
1527
  }
 
1528
 
 
1529
  if (regTabPtr->need_expand(disk))
 
1530
  {
 
1531
    prepare_read(req_struct, regTabPtr, disk);
 
1532
  }
 
1533
  
 
1534
  {
 
1535
    Uint32 RlogSize;
 
1536
    int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
 
1537
    if (ret == 0 && (RlogSize= req_struct->log_size))
 
1538
    {
 
1539
      jam();
 
1540
      sendLogAttrinfo(signal, RlogSize, regOperPtr);
 
1541
    }
 
1542
    return ret;
 
1543
  }
 
1544
 
 
1545
error:
 
1546
  tupkeyErrorLab(signal);
 
1547
  return -1;
 
1548
}
 
1549
 
 
1550
bool
 
1551
Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
 
1552
                           Tablerec* regTabPtr)
 
1553
{
 
1554
// Implement checking of updating all not null attributes in an insert here.
 
1555
  Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;  
 
1556
  /* 
 
1557
   * The idea here is maybe that changeMask is not-null attributes
 
1558
   * and must contain notNullAttributeMask.  But:
 
1559
   *
 
1560
   * 1. changeMask has all bits set on insert
 
1561
   * 2. not-null is checked in each UpdateFunction
 
1562
   * 3. the code below does not work except trivially due to 1.
 
1563
   *
 
1564
   * XXX remove or fix
 
1565
   */
 
1566
  attributeMask.clear();
 
1567
  attributeMask.bitOR(req_struct->changeMask);
 
1568
  attributeMask.bitAND(regTabPtr->notNullAttributeMask);
 
1569
  attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
 
1570
  if (!attributeMask.isclear()) {
 
1571
    return false;
 
1572
  }
 
1573
  return true;
 
1574
}
 
1575
 
 
1576
/* ---------------------------------------------------------------- */
 
1577
/* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE    */
 
1578
/* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
 
1579
/* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL  */
 
1580
/* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR  PHASES*/
 
1581
/* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
 
1582
/* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
 
1583
/* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE        */
 
1584
/* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
 
1585
/* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL.  */
 
1586
/* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
 
1587
/* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN   */
 
1588
/* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND      */
 
1589
/* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM  */
 
1590
/* TO THE CLIENT APPLICATION.                                       */
 
1591
/* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
 
1592
/* THE INTERPRETER EXECUTION REGION.                                */
 
1593
/* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVEE REGIONS   */
 
1594
/*                                                                  */
 
1595
/* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
 
1596
/* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
 
1597
/* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED      */
 
1598
/* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER.                    */
 
1599
/*                                                                  */
 
1600
/* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY   */
 
1601
/* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO     */
 
1602
/* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN     */
 
1603
/* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS.      */
 
1604
/*                                                                  */
 
1605
/*                                                                  */
 
1606
/*       -----------------------------------------                  */
 
1607
/*       +   INITIAL READ REGION                 +                  */
 
1608
/*       -----------------------------------------                  */
 
1609
/*       +   INTERPRETED EXECUTE  REGION         +                  */
 
1610
/*       -----------------------------------------                  */
 
1611
/*       +   FINAL UPDATE REGION                 +                  */
 
1612
/*       -----------------------------------------                  */
 
1613
/*       +   FINAL READ REGION                   +                  */
 
1614
/*       -----------------------------------------                  */
 
1615
/*       +   SUBROUTINE REGION                   +                  */
 
1616
/*       -----------------------------------------                  */
 
1617
/* ---------------------------------------------------------------- */
 
1618
/* ---------------------------------------------------------------- */
 
1619
/* ----------------- INTERPRETED EXECUTION  ----------------------- */
 
1620
/* ---------------------------------------------------------------- */
 
1621
int Dbtup::interpreterStartLab(Signal* signal,
 
1622
                               KeyReqStruct *req_struct)
 
1623
{
 
1624
  Operationrec *  const regOperPtr= operPtr.p;
 
1625
  int TnoDataRW;
 
1626
  Uint32 RtotalLen, start_index, dstLen;
 
1627
  Uint32 *dst;
 
1628
 
 
1629
  Uint32 RinitReadLen= cinBuffer[0];
 
1630
  Uint32 RexecRegionLen= cinBuffer[1];
 
1631
  Uint32 RfinalUpdateLen= cinBuffer[2];
 
1632
  Uint32 RfinalRLen= cinBuffer[3];
 
1633
  Uint32 RsubLen= cinBuffer[4];
 
1634
 
 
1635
  Uint32 RattrinbufLen= req_struct->attrinfo_len;
 
1636
  const BlockReference sendBref= req_struct->rec_blockref;
 
1637
 
 
1638
  const Uint32 node = refToNode(sendBref);
 
1639
  if(node != 0 && node != getOwnNodeId()) {
 
1640
    start_index= 25;
 
1641
  } else {
 
1642
    jam();
 
1643
    /**
 
1644
     * execute direct
 
1645
     */
 
1646
    start_index= 3;
 
1647
  }
 
1648
  dst= &signal->theData[start_index];
 
1649
  dstLen= (MAX_READ / 4) - start_index;
 
1650
  
 
1651
  RtotalLen= RinitReadLen;
 
1652
  RtotalLen += RexecRegionLen;
 
1653
  RtotalLen += RfinalUpdateLen;
 
1654
  RtotalLen += RfinalRLen;
 
1655
  RtotalLen += RsubLen;
 
1656
 
 
1657
  Uint32 RattroutCounter= 0;
 
1658
  Uint32 RinstructionCounter= 5;
 
1659
  Uint32 RlogSize= 0;
 
1660
  if (((RtotalLen + 5) == RattrinbufLen) &&
 
1661
      (RattrinbufLen >= 5) &&
 
1662
      (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
 
1663
    /* ---------------------------------------------------------------- */
 
1664
    // We start by checking consistency. We must have the first five
 
1665
    // words of the ATTRINFO to give us the length of the regions. The
 
1666
    // size of these regions must be the same as the total ATTRINFO
 
1667
    // length and finally the total length must be within the limits.
 
1668
    /* ---------------------------------------------------------------- */
 
1669
 
 
1670
    if (RinitReadLen > 0) {
 
1671
      jam();
 
1672
      /* ---------------------------------------------------------------- */
 
1673
      // The first step that can be taken in the interpreter is to read
 
1674
      // data of the tuple before any updates have been applied.
 
1675
      /* ---------------------------------------------------------------- */
 
1676
      TnoDataRW= readAttributes(req_struct,
 
1677
                                 &cinBuffer[5],
 
1678
                                 RinitReadLen,
 
1679
                                 &dst[0],
 
1680
                                 dstLen,
 
1681
                                 false);
 
1682
      if (TnoDataRW != -1) {
 
1683
        RattroutCounter= TnoDataRW;
 
1684
        RinstructionCounter += RinitReadLen;
 
1685
      } else {
 
1686
        jam();
 
1687
        tupkeyErrorLab(signal);
 
1688
        return -1;
 
1689
      }
 
1690
    }
 
1691
    if (RexecRegionLen > 0) {
 
1692
      jam();
 
1693
      /* ---------------------------------------------------------------- */
 
1694
      // The next step is the actual interpreted execution. This executes
 
1695
      // a register-based virtual machine which can read and write attributes
 
1696
      // to and from registers.
 
1697
      /* ---------------------------------------------------------------- */
 
1698
      Uint32 RsubPC= RinstructionCounter + RfinalUpdateLen + RfinalRLen;     
 
1699
      TnoDataRW= interpreterNextLab(signal,
 
1700
                                     req_struct,
 
1701
                                     &clogMemBuffer[0],
 
1702
                                     &cinBuffer[RinstructionCounter],
 
1703
                                     RexecRegionLen,
 
1704
                                     &cinBuffer[RsubPC],
 
1705
                                     RsubLen,
 
1706
                                     &coutBuffer[0],
 
1707
                                     sizeof(coutBuffer) / 4);
 
1708
      if (TnoDataRW != -1) {
 
1709
        RinstructionCounter += RexecRegionLen;
 
1710
        RlogSize= TnoDataRW;
 
1711
      } else {
 
1712
        jam();
 
1713
        /**
 
1714
         * TUPKEY REF is sent from within interpreter
 
1715
         */
 
1716
        return -1;
 
1717
      }
 
1718
    }
 
1719
    if (RfinalUpdateLen > 0) {
 
1720
      jam();
 
1721
      /* ---------------------------------------------------------------- */
 
1722
      // We can also apply a set of updates without any conditions as part
 
1723
      // of the interpreted execution.
 
1724
      /* ---------------------------------------------------------------- */
 
1725
      if (regOperPtr->op_struct.op_type == ZUPDATE) {
 
1726
        TnoDataRW= updateAttributes(req_struct,
 
1727
                                     &cinBuffer[RinstructionCounter],
 
1728
                                     RfinalUpdateLen);
 
1729
        if (TnoDataRW != -1) {
 
1730
          MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
 
1731
                           &cinBuffer[RinstructionCounter],
 
1732
                           RfinalUpdateLen);
 
1733
          RinstructionCounter += RfinalUpdateLen;
 
1734
          RlogSize += RfinalUpdateLen;
 
1735
        } else {
 
1736
          jam();
 
1737
          tupkeyErrorLab(signal);
 
1738
          return -1;
 
1739
        }
 
1740
      } else {
 
1741
        return TUPKEY_abort(signal, 19);
 
1742
      }
 
1743
    }
 
1744
    if (RfinalRLen > 0) {
 
1745
      jam();
 
1746
      /* ---------------------------------------------------------------- */
 
1747
      // The final action is that we can also read the tuple after it has
 
1748
      // been updated.
 
1749
      /* ---------------------------------------------------------------- */
 
1750
      TnoDataRW= readAttributes(req_struct,
 
1751
                                 &cinBuffer[RinstructionCounter],
 
1752
                                 RfinalRLen,
 
1753
                                 &dst[RattroutCounter],
 
1754
                                 (dstLen - RattroutCounter),
 
1755
                                 false);
 
1756
      if (TnoDataRW != -1) {
 
1757
        RattroutCounter += TnoDataRW;
 
1758
      } else {
 
1759
        jam();
 
1760
        tupkeyErrorLab(signal);
 
1761
        return -1;
 
1762
      }
 
1763
    }
 
1764
    req_struct->log_size= RlogSize;
 
1765
    req_struct->read_length= RattroutCounter;
 
1766
    sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
 
1767
    if (RlogSize > 0) {
 
1768
      sendLogAttrinfo(signal, RlogSize, regOperPtr);
 
1769
    }
 
1770
    return 0;
 
1771
  } else {
 
1772
    return TUPKEY_abort(signal, 22);
 
1773
  }
 
1774
}
 
1775
 
 
1776
/* ---------------------------------------------------------------- */
 
1777
/*       WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
 
1778
/*       BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY  */
 
1779
/*       NODES.                                                     */
 
1780
/*       INPUT:  LOG_ATTRINFOPTR         WHERE TO FETCH DATA FROM   */
 
1781
/*               TLOG_START              FIRST INDEX TO LOG         */
 
1782
/*               TLOG_END                LAST INDEX + 1 TO LOG      */
 
1783
/* ---------------------------------------------------------------- */
 
1784
void Dbtup::sendLogAttrinfo(Signal* signal,
 
1785
                            Uint32 TlogSize,
 
1786
                            Operationrec *  const regOperPtr)
 
1787
 
 
1788
{
 
1789
  Uint32 TbufferIndex= 0;
 
1790
  signal->theData[0]= regOperPtr->userpointer;
 
1791
  while (TlogSize > 22) {
 
1792
    MEMCOPY_NO_WORDS(&signal->theData[3],
 
1793
                     &clogMemBuffer[TbufferIndex],
 
1794
                     22);
 
1795
    EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 25);
 
1796
    TbufferIndex += 22;
 
1797
    TlogSize -= 22;
 
1798
  }
 
1799
  MEMCOPY_NO_WORDS(&signal->theData[3],
 
1800
                   &clogMemBuffer[TbufferIndex],
 
1801
                   TlogSize);
 
1802
  EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
 
1803
}
 
1804
 
 
1805
inline
 
1806
Uint32 
 
1807
brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
 
1808
{         
 
1809
  Uint32 TbranchDirection= TheInstruction >> 31;
 
1810
  Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
 
1811
  TprogramCounter--;
 
1812
  if (TbranchDirection == 1) {
 
1813
    jam();
 
1814
    /* ---------------------------------------------------------------- */
 
1815
    /*       WE JUMP BACKWARDS.                                         */
 
1816
    /* ---------------------------------------------------------------- */
 
1817
    return (TprogramCounter - TbranchLength);
 
1818
  } else {
 
1819
    jam();
 
1820
    /* ---------------------------------------------------------------- */
 
1821
    /*       WE JUMP FORWARD.                                           */
 
1822
    /* ---------------------------------------------------------------- */
 
1823
    return (TprogramCounter + TbranchLength);
 
1824
  }
 
1825
}
 
1826
 
 
1827
int Dbtup::interpreterNextLab(Signal* signal,
 
1828
                              KeyReqStruct* req_struct,
 
1829
                              Uint32* logMemory,
 
1830
                              Uint32* mainProgram,
 
1831
                              Uint32 TmainProgLen,
 
1832
                              Uint32* subroutineProg,
 
1833
                              Uint32 TsubroutineLen,
 
1834
                              Uint32 * tmpArea,
 
1835
                              Uint32 tmpAreaSz)
 
1836
{
 
1837
  register Uint32* TcurrentProgram= mainProgram;
 
1838
  register Uint32 TcurrentSize= TmainProgLen;
 
1839
  register Uint32 RnoOfInstructions= 0;
 
1840
  register Uint32 TprogramCounter= 0;
 
1841
  register Uint32 theInstruction;
 
1842
  register Uint32 theRegister;
 
1843
  Uint32 TdataWritten= 0;
 
1844
  Uint32 RstackPtr= 0;
 
1845
  union {
 
1846
    Uint32 TregMemBuffer[32];
 
1847
    Uint64 align[16];
 
1848
  };
 
1849
  Uint32 TstackMemBuffer[32];
 
1850
 
 
1851
  /* ---------------------------------------------------------------- */
 
1852
  // Initialise all 8 registers to contain the NULL value.
 
1853
  // In this version we can handle 32 and 64 bit unsigned integers.
 
1854
  // They are handled as 64 bit values. Thus the 32 most significant
 
1855
  // bits are zeroed for 32 bit values.
 
1856
  /* ---------------------------------------------------------------- */
 
1857
  TregMemBuffer[0]= 0;
 
1858
  TregMemBuffer[4]= 0;
 
1859
  TregMemBuffer[8]= 0;
 
1860
  TregMemBuffer[12]= 0;
 
1861
  TregMemBuffer[16]= 0;
 
1862
  TregMemBuffer[20]= 0;
 
1863
  TregMemBuffer[24]= 0;
 
1864
  TregMemBuffer[28]= 0;
 
1865
  Uint32 tmpHabitant= ~0;
 
1866
 
 
1867
  while (RnoOfInstructions < 8000) {
 
1868
    /* ---------------------------------------------------------------- */
 
1869
    /* EXECUTE THE NEXT INTERPRETER INSTRUCTION.                        */
 
1870
    /* ---------------------------------------------------------------- */
 
1871
    RnoOfInstructions++;
 
1872
    theInstruction= TcurrentProgram[TprogramCounter];
 
1873
    theRegister= Interpreter::getReg1(theInstruction) << 2;
 
1874
    if (TprogramCounter < TcurrentSize) {
 
1875
      TprogramCounter++;
 
1876
      switch (Interpreter::getOpCode(theInstruction)) {
 
1877
      case Interpreter::READ_ATTR_INTO_REG:
 
1878
        jam();
 
1879
        /* ---------------------------------------------------------------- */
 
1880
        // Read an attribute from the tuple into a register.
 
1881
        // While reading an attribute we allow the attribute to be an array
 
1882
        // as long as it fits in the 64 bits of the register.
 
1883
        /* ---------------------------------------------------------------- */
 
1884
        {
 
1885
          Uint32 theAttrinfo= theInstruction;
 
1886
          int TnoDataRW= readAttributes(req_struct,
 
1887
                                     &theAttrinfo,
 
1888
                                     (Uint32)1,
 
1889
                                     &TregMemBuffer[theRegister],
 
1890
                                     (Uint32)3,
 
1891
                                     false);
 
1892
          if (TnoDataRW == 2) {
 
1893
            /* ------------------------------------------------------------- */
 
1894
            // Two words read means that we get the instruction plus one 32 
 
1895
            // word read. Thus we set the register to be a 32 bit register.
 
1896
            /* ------------------------------------------------------------- */
 
1897
            TregMemBuffer[theRegister]= 0x50;
 
1898
            // arithmetic conversion if big-endian
 
1899
            * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
 
1900
          } else if (TnoDataRW == 3) {
 
1901
            /* ------------------------------------------------------------- */
 
1902
            // Three words read means that we get the instruction plus two 
 
1903
            // 32 words read. Thus we set the register to be a 64 bit register.
 
1904
            /* ------------------------------------------------------------- */
 
1905
            TregMemBuffer[theRegister]= 0x60;
 
1906
            TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
 
1907
            TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
 
1908
          } else if (TnoDataRW == 1) {
 
1909
            /* ------------------------------------------------------------- */
 
1910
            // One word read means that we must have read a NULL value. We set
 
1911
            // the register to indicate a NULL value.
 
1912
            /* ------------------------------------------------------------- */
 
1913
            TregMemBuffer[theRegister]= 0;
 
1914
            TregMemBuffer[theRegister + 2]= 0;
 
1915
            TregMemBuffer[theRegister + 3]= 0;
 
1916
          } else if (TnoDataRW == -1) {
 
1917
            jam();
 
1918
            tupkeyErrorLab(signal);
 
1919
            return -1;
 
1920
          } else {
 
1921
            /* ------------------------------------------------------------- */
 
1922
            // Any other return value from the read attribute here is not 
 
1923
            // allowed and will lead to a system crash.
 
1924
            /* ------------------------------------------------------------- */
 
1925
            ndbrequire(false);
 
1926
          }
 
1927
          break;
 
1928
        }
 
1929
 
 
1930
      case Interpreter::WRITE_ATTR_FROM_REG:
 
1931
        jam();
 
1932
        {
 
1933
          Uint32 TattrId= theInstruction >> 16;
 
1934
          Uint32 TattrDescrIndex= tabptr.p->tabDescriptor +
 
1935
            (TattrId << ZAD_LOG_SIZE);
 
1936
          Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
 
1937
          Uint32 TregType= TregMemBuffer[theRegister];
 
1938
 
 
1939
          /* --------------------------------------------------------------- */
 
1940
          // Calculate the number of words of this attribute.
 
1941
          // We allow writes into arrays as long as they fit into the 64 bit
 
1942
          // register size.
 
1943
          /* --------------------------------------------------------------- */
 
1944
          Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
 
1945
          Uint32 Toptype = operPtr.p->op_struct.op_type;
 
1946
          Uint32 TdataForUpdate[3];
 
1947
          Uint32 Tlen;
 
1948
 
 
1949
          AttributeHeader ah(TattrId, TattrNoOfWords << 2);
 
1950
          TdataForUpdate[0]= ah.m_value;
 
1951
          TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
 
1952
          TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
 
1953
          Tlen= TattrNoOfWords + 1;
 
1954
          if (Toptype == ZUPDATE) {
 
1955
            if (TattrNoOfWords <= 2) {
 
1956
              if (TattrNoOfWords == 1) {
 
1957
                // arithmetic conversion if big-endian
 
1958
                TdataForUpdate[1] = *(Int64*)&TregMemBuffer[theRegister + 2];
 
1959
                TdataForUpdate[2] = 0;
 
1960
              }
 
1961
              if (TregType == 0) {
 
1962
                /* --------------------------------------------------------- */
 
1963
                // Write a NULL value into the attribute
 
1964
                /* --------------------------------------------------------- */
 
1965
                ah.setNULL();
 
1966
                TdataForUpdate[0]= ah.m_value;
 
1967
                Tlen= 1;
 
1968
              }
 
1969
              int TnoDataRW= updateAttributes(req_struct,
 
1970
                                           &TdataForUpdate[0],
 
1971
                                           Tlen);
 
1972
              if (TnoDataRW != -1) {
 
1973
                /* --------------------------------------------------------- */
 
1974
                // Write the written data also into the log buffer so that it 
 
1975
                // will be logged.
 
1976
                /* --------------------------------------------------------- */
 
1977
                logMemory[TdataWritten + 0]= TdataForUpdate[0];
 
1978
                logMemory[TdataWritten + 1]= TdataForUpdate[1];
 
1979
                logMemory[TdataWritten + 2]= TdataForUpdate[2];
 
1980
                TdataWritten += Tlen;
 
1981
              } else {
 
1982
                tupkeyErrorLab(signal);
 
1983
                return -1;
 
1984
              }
 
1985
            } else {
 
1986
              return TUPKEY_abort(signal, 15);
 
1987
            }
 
1988
          } else {
 
1989
            return TUPKEY_abort(signal, 16);
 
1990
          }
 
1991
          break;
 
1992
        }
 
1993
 
 
1994
      case Interpreter::LOAD_CONST_NULL:
 
1995
        jam();
 
1996
        TregMemBuffer[theRegister]= 0;  /* NULL INDICATOR */
 
1997
        break;
 
1998
 
 
1999
      case Interpreter::LOAD_CONST16:
 
2000
        jam();
 
2001
        TregMemBuffer[theRegister]= 0x50;       /* 32 BIT UNSIGNED CONSTANT */
 
2002
        * (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
 
2003
        break;
 
2004
 
 
2005
      case Interpreter::LOAD_CONST32:
 
2006
        jam();
 
2007
        TregMemBuffer[theRegister]= 0x50;       /* 32 BIT UNSIGNED CONSTANT */
 
2008
        * (Int64*)(TregMemBuffer+theRegister+2)= * 
 
2009
          (TcurrentProgram+TprogramCounter);
 
2010
        TprogramCounter++;
 
2011
        break;
 
2012
 
 
2013
      case Interpreter::LOAD_CONST64:
 
2014
        jam();
 
2015
        TregMemBuffer[theRegister]= 0x60;       /* 64 BIT UNSIGNED CONSTANT */
 
2016
        TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
 
2017
                                             TprogramCounter++);
 
2018
        TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
 
2019
                                             TprogramCounter++);
 
2020
        break;
 
2021
 
 
2022
      case Interpreter::ADD_REG_REG:
 
2023
        jam();
 
2024
        {
 
2025
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2026
          Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
 
2027
 
 
2028
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2029
          Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 
2030
          
 
2031
 
 
2032
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2033
          Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 
2034
         
 
2035
          if ((TleftType | TrightType) != 0) {
 
2036
            Uint64 Tdest0= Tleft0 + Tright0;
 
2037
            * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
 
2038
            TregMemBuffer[TdestRegister]= 0x60;
 
2039
          } else {
 
2040
            return TUPKEY_abort(signal, 20);
 
2041
          }
 
2042
          break;
 
2043
        }
 
2044
 
 
2045
      case Interpreter::SUB_REG_REG:
 
2046
        jam();
 
2047
        {
 
2048
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2049
          Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
 
2050
 
 
2051
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2052
          Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 
2053
          
 
2054
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2055
          Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 
2056
         
 
2057
          if ((TleftType | TrightType) != 0) {
 
2058
            Int64 Tdest0= Tleft0 - Tright0;
 
2059
            * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
 
2060
            TregMemBuffer[TdestRegister]= 0x60;
 
2061
          } else {
 
2062
            return TUPKEY_abort(signal, 20);
 
2063
          }
 
2064
          break;
 
2065
        }
 
2066
 
 
2067
      case Interpreter::BRANCH:
 
2068
        TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2069
        break;
 
2070
 
 
2071
      case Interpreter::BRANCH_REG_EQ_NULL:
 
2072
        if (TregMemBuffer[theRegister] != 0) {
 
2073
          jam();
 
2074
          continue;
 
2075
        } else {
 
2076
          jam();
 
2077
          TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2078
        }
 
2079
        break;
 
2080
 
 
2081
      case Interpreter::BRANCH_REG_NE_NULL:
 
2082
        if (TregMemBuffer[theRegister] == 0) {
 
2083
          jam();
 
2084
          continue;
 
2085
        } else {
 
2086
          jam();
 
2087
          TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2088
        }
 
2089
        break;
 
2090
 
 
2091
 
 
2092
      case Interpreter::BRANCH_EQ_REG_REG:
 
2093
        {
 
2094
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2095
 
 
2096
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2097
          Uint32 Tleft0= TregMemBuffer[theRegister + 2];
 
2098
          Uint32 Tleft1= TregMemBuffer[theRegister + 3];
 
2099
 
 
2100
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2101
          Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
 
2102
          Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
 
2103
          if ((TrightType | TleftType) != 0) {
 
2104
            jam();
 
2105
            if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
 
2106
              TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2107
            }
 
2108
          } else {
 
2109
            return TUPKEY_abort(signal, 23);
 
2110
          }
 
2111
          break;
 
2112
        }
 
2113
 
 
2114
      case Interpreter::BRANCH_NE_REG_REG:
 
2115
        {
 
2116
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2117
 
 
2118
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2119
          Uint32 Tleft0= TregMemBuffer[theRegister + 2];
 
2120
          Uint32 Tleft1= TregMemBuffer[theRegister + 3];
 
2121
 
 
2122
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2123
          Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
 
2124
          Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
 
2125
          if ((TrightType | TleftType) != 0) {
 
2126
            jam();
 
2127
            if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
 
2128
              TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2129
            }
 
2130
          } else {
 
2131
            return TUPKEY_abort(signal, 24);
 
2132
          }
 
2133
          break;
 
2134
        }
 
2135
 
 
2136
      case Interpreter::BRANCH_LT_REG_REG:
 
2137
        {
 
2138
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2139
 
 
2140
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2141
          Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 
2142
          
 
2143
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2144
          Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 
2145
         
 
2146
 
 
2147
          if ((TrightType | TleftType) != 0) {
 
2148
            jam();
 
2149
            if (Tleft0 < Tright0) {
 
2150
              TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2151
            }
 
2152
          } else {
 
2153
            return TUPKEY_abort(signal, 24);
 
2154
          }
 
2155
          break;
 
2156
        }
 
2157
 
 
2158
      case Interpreter::BRANCH_LE_REG_REG:
 
2159
        {
 
2160
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2161
 
 
2162
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2163
          Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 
2164
          
 
2165
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2166
          Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 
2167
          
 
2168
 
 
2169
          if ((TrightType | TleftType) != 0) {
 
2170
            jam();
 
2171
            if (Tleft0 <= Tright0) {
 
2172
              TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2173
            }
 
2174
          } else {
 
2175
            return TUPKEY_abort(signal, 26);
 
2176
          }
 
2177
          break;
 
2178
        }
 
2179
 
 
2180
      case Interpreter::BRANCH_GT_REG_REG:
 
2181
        {
 
2182
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2183
 
 
2184
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2185
          Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 
2186
          
 
2187
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2188
          Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 
2189
          
 
2190
 
 
2191
          if ((TrightType | TleftType) != 0) {
 
2192
            jam();
 
2193
            if (Tleft0 > Tright0){
 
2194
              TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2195
            }
 
2196
          } else {
 
2197
            return TUPKEY_abort(signal, 27);
 
2198
          }
 
2199
          break;
 
2200
        }
 
2201
 
 
2202
      case Interpreter::BRANCH_GE_REG_REG:
 
2203
        {
 
2204
          Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
2205
 
 
2206
          Uint32 TrightType= TregMemBuffer[TrightRegister];
 
2207
          Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 
2208
          
 
2209
          Uint32 TleftType= TregMemBuffer[theRegister];
 
2210
          Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 
2211
          
 
2212
 
 
2213
          if ((TrightType | TleftType) != 0) {
 
2214
            jam();
 
2215
            if (Tleft0 >= Tright0){
 
2216
              TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2217
            }
 
2218
          } else {
 
2219
            return TUPKEY_abort(signal, 28);
 
2220
          }
 
2221
          break;
 
2222
        }
 
2223
 
 
2224
      case Interpreter::BRANCH_ATTR_OP_ARG:{
 
2225
        jam();
 
2226
        Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
 
2227
        Uint32 ins2 = TcurrentProgram[TprogramCounter];
 
2228
        Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
 
2229
        Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
 
2230
 
 
2231
        if(tmpHabitant != attrId){
 
2232
          Int32 TnoDataR = readAttributes(req_struct,
 
2233
                                          &attrId, 1,
 
2234
                                          tmpArea, tmpAreaSz,
 
2235
                                          false);
 
2236
          
 
2237
          if (TnoDataR == -1) {
 
2238
            jam();
 
2239
            tupkeyErrorLab(signal);
 
2240
            return -1;
 
2241
          }
 
2242
          tmpHabitant= attrId;
 
2243
        }
 
2244
 
 
2245
        // get type
 
2246
        attrId >>= 16;
 
2247
        Uint32 TattrDescrIndex = tabptr.p->tabDescriptor +
 
2248
          (attrId << ZAD_LOG_SIZE);
 
2249
        Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
 
2250
        Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
 
2251
        Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
 
2252
        void * cs = 0;
 
2253
        if(AttributeOffset::getCharsetFlag(TattrDesc2))
 
2254
        {
 
2255
          Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
 
2256
          cs = tabptr.p->charsetArray[pos];
 
2257
        }
 
2258
        const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
 
2259
 
 
2260
        // get data
 
2261
        AttributeHeader ah(tmpArea[0]);
 
2262
        const char* s1 = (char*)&tmpArea[1];
 
2263
        const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
 
2264
        // fixed length in 5.0
 
2265
        Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
 
2266
 
 
2267
        bool r1_null = ah.isNULL();
 
2268
        bool r2_null = argLen == 0;
 
2269
        int res1;
 
2270
        if (cond != Interpreter::LIKE &&
 
2271
            cond != Interpreter::NOT_LIKE) {
 
2272
          if (r1_null || r2_null) {
 
2273
            // NULL==NULL and NULL<not-NULL
 
2274
            res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
 
2275
          } else {
 
2276
            jam();
 
2277
            if (unlikely(sqlType.m_cmp == 0))
 
2278
            {
 
2279
              return TUPKEY_abort(signal, 40);
 
2280
            }
 
2281
            res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
 
2282
          }
 
2283
        } else {
 
2284
          if (r1_null || r2_null) {
 
2285
            // NULL like NULL is true (has no practical use)
 
2286
            res1 =  r1_null && r2_null ? 0 : -1;
 
2287
          } else {
 
2288
            jam();
 
2289
            if (unlikely(sqlType.m_like == 0))
 
2290
            {
 
2291
              return TUPKEY_abort(signal, 40);
 
2292
            }
 
2293
            res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
 
2294
          }
 
2295
        }
 
2296
 
 
2297
        int res = 0;
 
2298
        switch ((Interpreter::BinaryCondition)cond) {
 
2299
        case Interpreter::EQ:
 
2300
          res = (res1 == 0);
 
2301
          break;
 
2302
        case Interpreter::NE:
 
2303
          res = (res1 != 0);
 
2304
          break;
 
2305
        // note the condition is backwards
 
2306
        case Interpreter::LT:
 
2307
          res = (res1 > 0);
 
2308
          break;
 
2309
        case Interpreter::LE:
 
2310
          res = (res1 >= 0);
 
2311
          break;
 
2312
        case Interpreter::GT:
 
2313
          res = (res1 < 0);
 
2314
          break;
 
2315
        case Interpreter::GE:
 
2316
          res = (res1 <= 0);
 
2317
          break;
 
2318
        case Interpreter::LIKE:
 
2319
          res = (res1 == 0);
 
2320
          break;
 
2321
        case Interpreter::NOT_LIKE:
 
2322
          res = (res1 == 1);
 
2323
          break;
 
2324
          // XXX handle invalid value
 
2325
        }
 
2326
#ifdef TRACE_INTERPRETER
 
2327
        ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
 
2328
                 cond, attrId >> 16,
 
2329
                 attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
 
2330
#endif
 
2331
        if (res)
 
2332
          TprogramCounter = brancher(theInstruction, TprogramCounter);
 
2333
        else 
 
2334
        {
 
2335
          Uint32 tmp = ((argLen + 3) >> 2) + 1;
 
2336
          TprogramCounter += tmp;
 
2337
        }
 
2338
        break;
 
2339
      }
 
2340
        
 
2341
      case Interpreter::BRANCH_ATTR_EQ_NULL:{
 
2342
        jam();
 
2343
        Uint32 ins2= TcurrentProgram[TprogramCounter];
 
2344
        Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
 
2345
        
 
2346
        if (tmpHabitant != attrId){
 
2347
          Int32 TnoDataR= readAttributes(req_struct,
 
2348
                                          &attrId, 1,
 
2349
                                          tmpArea, tmpAreaSz,
 
2350
                                          false);
 
2351
          
 
2352
          if (TnoDataR == -1) {
 
2353
            jam();
 
2354
            tupkeyErrorLab(signal);
 
2355
            return -1;
 
2356
          }
 
2357
          tmpHabitant= attrId;
 
2358
        }
 
2359
        
 
2360
        AttributeHeader ah(tmpArea[0]);
 
2361
        if (ah.isNULL()){
 
2362
          TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2363
        } else {
 
2364
          TprogramCounter ++;
 
2365
        }
 
2366
        break;
 
2367
      }
 
2368
 
 
2369
      case Interpreter::BRANCH_ATTR_NE_NULL:{
 
2370
        jam();
 
2371
        Uint32 ins2= TcurrentProgram[TprogramCounter];
 
2372
        Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
 
2373
        
 
2374
        if (tmpHabitant != attrId){
 
2375
          Int32 TnoDataR= readAttributes(req_struct,
 
2376
                                          &attrId, 1,
 
2377
                                          tmpArea, tmpAreaSz,
 
2378
                                          false);
 
2379
          
 
2380
          if (TnoDataR == -1) {
 
2381
            jam();
 
2382
            tupkeyErrorLab(signal);
 
2383
            return -1;
 
2384
          }
 
2385
          tmpHabitant= attrId;
 
2386
        }
 
2387
        
 
2388
        AttributeHeader ah(tmpArea[0]);
 
2389
        if (ah.isNULL()){
 
2390
          TprogramCounter ++;
 
2391
        } else {
 
2392
          TprogramCounter= brancher(theInstruction, TprogramCounter);
 
2393
        }
 
2394
        break;
 
2395
      }
 
2396
        
 
2397
      case Interpreter::EXIT_OK:
 
2398
        jam();
 
2399
#ifdef TRACE_INTERPRETER
 
2400
        ndbout_c(" - exit_ok");
 
2401
#endif
 
2402
        return TdataWritten;
 
2403
 
 
2404
      case Interpreter::EXIT_OK_LAST:
 
2405
        jam();
 
2406
#ifdef TRACE_INTERPRETER
 
2407
        ndbout_c(" - exit_ok_last");
 
2408
#endif
 
2409
        req_struct->last_row= true;
 
2410
        return TdataWritten;
 
2411
        
 
2412
      case Interpreter::EXIT_REFUSE:
 
2413
        jam();
 
2414
#ifdef TRACE_INTERPRETER
 
2415
        ndbout_c(" - exit_nok");
 
2416
#endif
 
2417
        terrorCode= theInstruction >> 16;
 
2418
        return TUPKEY_abort(signal, 29);
 
2419
 
 
2420
      case Interpreter::CALL:
 
2421
        jam();
 
2422
        RstackPtr++;
 
2423
        if (RstackPtr < 32) {
 
2424
          TstackMemBuffer[RstackPtr]= TprogramCounter + 1;
 
2425
          TprogramCounter= theInstruction >> 16;
 
2426
          if (TprogramCounter < TsubroutineLen) {
 
2427
            TcurrentProgram= subroutineProg;
 
2428
            TcurrentSize= TsubroutineLen;
 
2429
          } else {
 
2430
            return TUPKEY_abort(signal, 30);
 
2431
          }
 
2432
        } else {
 
2433
          return TUPKEY_abort(signal, 31);
 
2434
        }
 
2435
        break;
 
2436
 
 
2437
      case Interpreter::RETURN:
 
2438
        jam();
 
2439
        if (RstackPtr > 0) {
 
2440
          TprogramCounter= TstackMemBuffer[RstackPtr];
 
2441
          RstackPtr--;
 
2442
          if (RstackPtr == 0) {
 
2443
            jam();
 
2444
            /* ------------------------------------------------------------- */
 
2445
            // We are back to the main program.
 
2446
            /* ------------------------------------------------------------- */
 
2447
            TcurrentProgram= mainProgram;
 
2448
            TcurrentSize= TmainProgLen;
 
2449
          }
 
2450
        } else {
 
2451
          return TUPKEY_abort(signal, 32);
 
2452
        }
 
2453
        break;
 
2454
 
 
2455
      default:
 
2456
        return TUPKEY_abort(signal, 33);
 
2457
      }
 
2458
    } else {
 
2459
      return TUPKEY_abort(signal, 34);
 
2460
    }
 
2461
  }
 
2462
  return TUPKEY_abort(signal, 35);
 
2463
}
 
2464
 
 
2465
/**
 
2466
 * expand_var_part - copy packed variable attributes to fully expanded size
 
2467
 * 
 
2468
 * dst:        where to start writing attribute data
 
2469
 * dst_off_ptr where to write attribute offsets
 
2470
 * src         pointer to packed attributes
 
2471
 * tabDesc     array of attribute descriptors (used for getting max size)
 
2472
 * no_of_attr  no of atributes to expand
 
2473
 */
 
2474
Uint32*
 
2475
expand_var_part(Dbtup::KeyReqStruct::Var_data *dst, 
 
2476
                const Uint32* src, 
 
2477
                const Uint32 * tabDesc, 
 
2478
                const Uint16* order)
 
2479
{
 
2480
  char* dst_ptr= dst->m_data_ptr;
 
2481
  Uint32 no_attr= dst->m_var_len_offset;
 
2482
  Uint16* dst_off_ptr= dst->m_offset_array_ptr;
 
2483
  Uint16* dst_len_ptr= dst_off_ptr + no_attr;
 
2484
  const Uint16* src_off_ptr= (const Uint16*)src;
 
2485
  const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
 
2486
  
 
2487
  Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
 
2488
  for(Uint32 i = 0; i<no_attr; i++)
 
2489
  {
 
2490
    next_pos= *src_off_ptr++;
 
2491
    len= next_pos - tmp;
 
2492
    
 
2493
    *dst_off_ptr++ = dst_off; 
 
2494
    *dst_len_ptr++ = dst_off + len;
 
2495
    memcpy(dst_ptr, src_ptr, len);
 
2496
    src_ptr += len;
 
2497
    
 
2498
    max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
 
2499
    dst_ptr += max_len; // Max size
 
2500
    dst_off += max_len;
 
2501
    
 
2502
    tmp= next_pos;
 
2503
  }
 
2504
  
 
2505
  return ALIGN_WORD(dst_ptr);
 
2506
}
 
2507
 
 
2508
void
 
2509
Dbtup::expand_tuple(KeyReqStruct* req_struct, 
 
2510
                    Uint32 sizes[2],
 
2511
                    Tuple_header* src, 
 
2512
                    const Tablerec* tabPtrP,
 
2513
                    bool disk)
 
2514
{
 
2515
  Uint32 bits= src->m_header_bits;
 
2516
  Tuple_header* ptr= req_struct->m_tuple_ptr;
 
2517
  
 
2518
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
 
2519
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
 
2520
  Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
 
2521
  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
 
2522
 
 
2523
  Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
 
2524
  const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
 
2525
  const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
 
2526
  const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
 
2527
  const Uint32 *desc= (Uint32*)req_struct->attr_descr;
 
2528
  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
 
2529
  order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
 
2530
  
 
2531
  if(mm_vars)
 
2532
  {
 
2533
 
 
2534
    Uint32 step; // in bytes
 
2535
    const Uint32 *src_data= src_ptr;
 
2536
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
 
2537
    if(bits & Tuple_header::CHAINED_ROW)
 
2538
    {
 
2539
      Ptr<Page> var_page;
 
2540
      src_data= get_ptr(&var_page, *var_ref);
 
2541
      step= 4;
 
2542
      sizes[MM]= (2 + (mm_vars << 1) + ((Uint16*)src_data)[mm_vars] + 3) >> 2;
 
2543
      req_struct->m_varpart_page_ptr = var_page;
 
2544
    }
 
2545
    else
 
2546
    {
 
2547
      step= (2 + (mm_vars << 1) + ((Uint16*)src_ptr)[mm_vars]);
 
2548
      sizes[MM]= (step + 3) >> 2;
 
2549
      req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
 
2550
    }
 
2551
    dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
 
2552
    dst->m_offset_array_ptr= req_struct->var_pos_array;
 
2553
    dst->m_var_len_offset= mm_vars;
 
2554
    dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
 
2555
    
 
2556
    dst_ptr= expand_var_part(dst, src_data, desc, order);
 
2557
    ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
 
2558
    ndbassert((UintPtr(src_ptr) & 3) == 0);
 
2559
    src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
 
2560
    
 
2561
    sizes[MM] += fix_size;
 
2562
    memcpy(ptr, src, 4*fix_size);
 
2563
  } 
 
2564
  else 
 
2565
  {
 
2566
    sizes[MM]= 1;
 
2567
    memcpy(ptr, src, 4*fix_size);
 
2568
  }
 
2569
 
 
2570
  src->m_header_bits= bits & 
 
2571
    ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
 
2572
  
 
2573
  sizes[DD]= 0;
 
2574
  if(disk && dd_tot)
 
2575
  {
 
2576
    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
 
2577
    order += mm_vars;
 
2578
    
 
2579
    if(bits & Tuple_header::DISK_INLINE)
 
2580
    {
 
2581
      // Only on copy tuple
 
2582
      ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
 
2583
    }
 
2584
    else
 
2585
    {
 
2586
      Local_key key;
 
2587
      memcpy(&key, disk_ref, sizeof(key));
 
2588
      key.m_page_no= req_struct->m_disk_page_ptr.i;
 
2589
      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
 
2590
    }
 
2591
    bits |= Tuple_header::DISK_INLINE;
 
2592
 
 
2593
    // Fix diskpart
 
2594
    req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
 
2595
    memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
 
2596
    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
 
2597
    
 
2598
    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
 
2599
    
 
2600
    ndbrequire(dd_vars == 0);
 
2601
  }
 
2602
  
 
2603
  ptr->m_header_bits= (bits & ~(Uint32)(Tuple_header::CHAINED_ROW));
 
2604
}
 
2605
 
 
2606
void
 
2607
Dbtup::prepare_read(KeyReqStruct* req_struct, 
 
2608
                    Tablerec* tabPtrP, bool disk)
 
2609
{
 
2610
  Tuple_header* ptr= req_struct->m_tuple_ptr;
 
2611
  
 
2612
  Uint32 bits= ptr->m_header_bits;
 
2613
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
 
2614
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
 
2615
  
 
2616
  const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
 
2617
  const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
 
2618
  const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
 
2619
  if(mm_vars)
 
2620
  {
 
2621
    const Uint32 *src_data= src_ptr;
 
2622
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
 
2623
    if(bits & Tuple_header::CHAINED_ROW)
 
2624
    {
 
2625
#if VM_TRACE
 
2626
      
 
2627
#endif
 
2628
      src_data= get_ptr(* var_ref);
 
2629
    }
 
2630
    dst->m_data_ptr= (char*)(((Uint16*)src_data)+mm_vars+1);
 
2631
    dst->m_offset_array_ptr= (Uint16*)src_data;
 
2632
    dst->m_var_len_offset= 1;
 
2633
    dst->m_max_var_offset= ((Uint16*)src_data)[mm_vars];
 
2634
    
 
2635
    // disk part start after varsize (aligned)
 
2636
    src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
 
2637
  } 
 
2638
  
 
2639
  if(disk && dd_tot)
 
2640
  {
 
2641
    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
 
2642
    
 
2643
    if(bits & Tuple_header::DISK_INLINE)
 
2644
    {
 
2645
      // Only on copy tuple
 
2646
      ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
 
2647
    }
 
2648
    else
 
2649
    {
 
2650
      // XXX
 
2651
      Local_key key;
 
2652
      memcpy(&key, disk_ref, sizeof(key));
 
2653
      key.m_page_no= req_struct->m_disk_page_ptr.i;
 
2654
      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
 
2655
    }
 
2656
    // Fix diskpart
 
2657
    req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
 
2658
    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
 
2659
    ndbrequire(dd_vars == 0);
 
2660
  }
 
2661
}
 
2662
 
 
2663
void
 
2664
Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
 
2665
                    const Tablerec* tabPtrP, bool disk)
 
2666
{
 
2667
  ndbassert(tabPtrP->need_shrink());
 
2668
  Tuple_header* ptr= req_struct->m_tuple_ptr;
 
2669
  
 
2670
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
 
2671
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
 
2672
  Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
 
2673
  
 
2674
  Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
 
2675
  Uint16* src_off_ptr= req_struct->var_pos_array;
 
2676
 
 
2677
  sizes[MM] = 1;
 
2678
  sizes[DD] = 0;
 
2679
  if(mm_vars)
 
2680
  {
 
2681
    Uint16* dst_off_ptr= (Uint16*)dst_ptr;
 
2682
    char*  dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
 
2683
    char*  src_data_ptr= dst_data_ptr;
 
2684
    Uint32 off= 0;
 
2685
    for(Uint32 i= 0; i<mm_vars; i++)
 
2686
    {
 
2687
      const char* data_ptr= src_data_ptr + *src_off_ptr;
 
2688
      Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
 
2689
      * dst_off_ptr++= off;
 
2690
      memmove(dst_data_ptr, data_ptr, len);
 
2691
      off += len;
 
2692
      src_off_ptr++;
 
2693
      dst_data_ptr += len;
 
2694
    }
 
2695
    *dst_off_ptr= off;
 
2696
    ndbassert(dst_data_ptr <= ((char*)ptr) + 8192);
 
2697
    ndbassert((UintPtr(ptr) & 3) == 0);
 
2698
    sizes[MM]= (dst_data_ptr + 3 - ((char*)ptr)) >> 2;
 
2699
 
 
2700
    dst_ptr = ALIGN_WORD(dst_data_ptr);
 
2701
  }
 
2702
  
 
2703
  if(disk && dd_tot)
 
2704
  {
 
2705
    Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
 
2706
    req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
 
2707
    ndbrequire(dd_vars == 0);
 
2708
    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
 
2709
    memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
 
2710
  }
 
2711
}
 
2712
 
 
2713
void
 
2714
Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
 
2715
{
 
2716
  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
 
2717
  Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size + 
 
2718
    Tuple_header::HeaderSize;
 
2719
    
 
2720
  if(mm_vars == 0)
 
2721
    return;
 
2722
  
 
2723
  for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
 
2724
  {
 
2725
    FragrecordPtr fragPtr;
 
2726
 
 
2727
    if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
 
2728
      continue;
 
2729
 
 
2730
    ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
2731
    for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
 
2732
    {
 
2733
      Uint32 real= getRealpid(fragPtr.p, P);
 
2734
      Var_page* page= (Var_page*)c_page_pool.getPtr(real);
 
2735
 
 
2736
      for(Uint32 i=1; i<page->high_index; i++)
 
2737
      {
 
2738
        Uint32 idx= page->get_index_word(i);
 
2739
        Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
 
2740
        if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
 
2741
        {
 
2742
          Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
 
2743
          Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
 
2744
          if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
 
2745
          {
 
2746
            ndbassert(len == fix_sz + 1);
 
2747
            Local_key tmp; tmp.assref(*part);
 
2748
            Ptr<Page> tmpPage;
 
2749
            part= get_ptr(&tmpPage, *(Var_part_ref*)part);
 
2750
            len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
 
2751
            Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
 
2752
            ndbassert(len >= ((sz + 3) >> 2));
 
2753
          } 
 
2754
          else
 
2755
          {
 
2756
            Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
 
2757
            ndbassert(len >= ((sz+3)>>2)+fix_sz);
 
2758
          }
 
2759
          if(ptr->m_operation_ptr_i != RNIL)
 
2760
          {
 
2761
            c_operation_pool.getPtr(ptr->m_operation_ptr_i);
 
2762
          }
 
2763
        } 
 
2764
        else if(!(idx & Var_page::FREE))
 
2765
        {
 
2766
          /**
 
2767
           * Chain
 
2768
           */
 
2769
          Uint32 *part= page->get_ptr(i);
 
2770
          Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
 
2771
          ndbassert(len >= ((sz + 3) >> 2));
 
2772
        } 
 
2773
        else 
 
2774
        {
 
2775
          
 
2776
        }
 
2777
      }
 
2778
      if(p == 0 && page->high_index > 1)
 
2779
        page->reorg((Var_page*)ctemp_page);
 
2780
    }
 
2781
  }
 
2782
  
 
2783
  if(p == 0)
 
2784
  {
 
2785
    validate_page(regTabPtr, (Var_page*)1);
 
2786
  }
 
2787
}
 
2788
 
 
2789
int
 
2790
Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
 
2791
                                       Tuple_header* org,
 
2792
                                       Operationrec* regOperPtr,
 
2793
                                       Fragrecord* regFragPtr,
 
2794
                                       Tablerec* regTabPtr,
 
2795
                                       Uint32 sizes[4])
 
2796
{
 
2797
  ndbrequire(sizes[1] == sizes[3]);
 
2798
  //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
 
2799
  if(0)
 
2800
    printf("%p %d %d - handle_size_change_after_update ",
 
2801
           req_struct->m_tuple_ptr,
 
2802
           regOperPtr->m_tuple_location.m_page_no,
 
2803
           regOperPtr->m_tuple_location.m_page_idx);
 
2804
  
 
2805
  Uint32 bits= org->m_header_bits;
 
2806
  Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;
 
2807
  Uint32 fix_sz = regTabPtr->m_offsets[MM].m_fix_header_size;
 
2808
  
 
2809
  if(sizes[MM] == sizes[2+MM])
 
2810
    ;
 
2811
  else if(sizes[MM] > sizes[2+MM])
 
2812
  {
 
2813
    if(0) ndbout_c("shrink");
 
2814
    copy_bits |= Tuple_header::MM_SHRINK;
 
2815
  }
 
2816
  else
 
2817
  {
 
2818
    if(0) printf("grow - ");
 
2819
    Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
 
2820
    Var_page* pageP= (Var_page*)pagePtr.p;
 
2821
    Uint32 idx, alloc, needed;
 
2822
    Var_part_ref *refptr = org->get_var_part_ref_ptr(regTabPtr);
 
2823
    ndbassert(bits & Tuple_header::CHAINED_ROW);
 
2824
 
 
2825
    Local_key ref;
 
2826
    refptr->copyout(&ref);
 
2827
    idx= ref.m_page_idx;
 
2828
    if (! (copy_bits & Tuple_header::CHAINED_ROW))
 
2829
    {
 
2830
      c_page_pool.getPtr(pagePtr, ref.m_page_no);
 
2831
      pageP = (Var_page*)pagePtr.p;
 
2832
    }
 
2833
    alloc= pageP->get_entry_len(idx);
 
2834
#ifdef VM_TRACE
 
2835
    if(!pageP->get_entry_chain(idx))
 
2836
      ndbout << *pageP << endl;
 
2837
#endif
 
2838
    ndbassert(pageP->get_entry_chain(idx));
 
2839
    needed= sizes[2+MM] - fix_sz;
 
2840
    
 
2841
    if(needed <= alloc)
 
2842
    {
 
2843
      //ndbassert(!regOperPtr->is_first_operation());
 
2844
      if (0) ndbout_c(" no grow");
 
2845
      return 0;
 
2846
    }
 
2847
    copy_bits |= Tuple_header::MM_GROWN;
 
2848
    if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr, 
 
2849
                                  refptr, alloc, needed)))
 
2850
      return -1;
 
2851
 
 
2852
    if (regTabPtr->m_bits & Tablerec::TR_Checksum) 
 
2853
    {
 
2854
      jam();
 
2855
      setChecksum(org, regTabPtr);
 
2856
    }
 
2857
  }
 
2858
  req_struct->m_tuple_ptr->m_header_bits = copy_bits;
 
2859
  return 0;
 
2860
}
 
2861
 
 
2862
int
 
2863
Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
 
2864
{
 
2865
  FragrecordPtr fragPtr;
 
2866
  fragPtr.i= fragPtrI;
 
2867
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
2868
  TablerecPtr tablePtr;
 
2869
  tablePtr.i= fragPtr.p->fragTableId;
 
2870
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
2871
 
 
2872
  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
 
2873
  {
 
2874
    Local_key tmp = *key;
 
2875
    PagePtr page_ptr;
 
2876
 
 
2877
    int ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
 
2878
 
 
2879
    if (ret)
 
2880
      return -1;
 
2881
    
 
2882
    Tuple_header* ptr = (Tuple_header*)
 
2883
      ((Fix_page*)page_ptr.p)->get_ptr(tmp.m_page_idx, 0);
 
2884
    
 
2885
    ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
 
2886
    *ptr->get_mm_gci(tablePtr.p) = gci;
 
2887
  }
 
2888
  return 0;
 
2889
}
 
2890
 
 
2891
int
 
2892
Dbtup::nr_read_pk(Uint32 fragPtrI, 
 
2893
                  const Local_key* key, Uint32* dst, bool& copy)
 
2894
{
 
2895
  
 
2896
  FragrecordPtr fragPtr;
 
2897
  fragPtr.i= fragPtrI;
 
2898
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
2899
  TablerecPtr tablePtr;
 
2900
  tablePtr.i= fragPtr.p->fragTableId;
 
2901
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
2902
 
 
2903
  Local_key tmp = *key;
 
2904
  
 
2905
  
 
2906
  PagePtr page_ptr;
 
2907
  int ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
 
2908
  if (ret)
 
2909
    return -1;
 
2910
  
 
2911
  KeyReqStruct req_struct;
 
2912
  Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
 
2913
  
 
2914
  req_struct.m_page_ptr = page_ptr;
 
2915
  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
 
2916
  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
 
2917
 
 
2918
  ret = 0;
 
2919
  copy = false;
 
2920
  if (! (bits & Tuple_header::FREE))
 
2921
  {
 
2922
    if (bits & Tuple_header::ALLOC)
 
2923
    {
 
2924
      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
 
2925
      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
 
2926
      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
 
2927
      req_struct.m_tuple_ptr= (Tuple_header*)
 
2928
        c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
 
2929
      copy = true;
 
2930
    }
 
2931
    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
 
2932
    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
 
2933
    
 
2934
    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
 
2935
    Uint32 descr_start= tablePtr.p->tabDescriptor;
 
2936
    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
 
2937
    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
 
2938
    req_struct.attr_descr= tab_descr; 
 
2939
 
 
2940
    if (tablePtr.p->need_expand())
 
2941
      prepare_read(&req_struct, tablePtr.p, false);
 
2942
    
 
2943
    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
 
2944
    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
 
2945
    // read pk attributes from original tuple
 
2946
    
 
2947
    // new globals
 
2948
    tabptr= tablePtr;
 
2949
    fragptr= fragPtr;
 
2950
    operPtr.i= RNIL;
 
2951
    operPtr.p= NULL;
 
2952
    
 
2953
    // do it
 
2954
    ret = readAttributes(&req_struct,
 
2955
                         attrIds,
 
2956
                         numAttrs,
 
2957
                         dst,
 
2958
                         ZNIL, false);
 
2959
    
 
2960
    // done
 
2961
    if (likely(ret != -1)) {
 
2962
      // remove headers
 
2963
      Uint32 n= 0;
 
2964
      Uint32 i= 0;
 
2965
      while (n < numAttrs) {
 
2966
        const AttributeHeader ah(dst[i]);
 
2967
        Uint32 size= ah.getDataSize();
 
2968
        ndbrequire(size != 0);
 
2969
        for (Uint32 j= 0; j < size; j++) {
 
2970
          dst[i + j - n]= dst[i + j + 1];
 
2971
        }
 
2972
        n+= 1;
 
2973
        i+= 1 + size;
 
2974
      }
 
2975
      ndbrequire((int)i == ret);
 
2976
      ret -= numAttrs;
 
2977
    } else {
 
2978
      return terrorCode ? (-(int)terrorCode) : -1;
 
2979
    }
 
2980
  }
 
2981
    
 
2982
  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
 
2983
  {
 
2984
    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
 
2985
  }
 
2986
  else
 
2987
  {
 
2988
    dst[ret] = 0;
 
2989
  }
 
2990
  return ret;
 
2991
}
 
2992
 
 
2993
#include <signaldata/TuxMaint.hpp>
 
2994
 
 
2995
int
 
2996
Dbtup::nr_delete(Signal* signal, Uint32 senderData,
 
2997
                 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
 
2998
{
 
2999
  FragrecordPtr fragPtr;
 
3000
  fragPtr.i= fragPtrI;
 
3001
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
3002
  TablerecPtr tablePtr;
 
3003
  tablePtr.i= fragPtr.p->fragTableId;
 
3004
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
3005
 
 
3006
  Local_key tmp = * key;
 
3007
  tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no); 
 
3008
  
 
3009
  PagePtr pagePtr;
 
3010
  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);
 
3011
 
 
3012
  if (!tablePtr.p->tuxCustomTriggers.isEmpty()) 
 
3013
  {
 
3014
    jam();
 
3015
    TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
 
3016
    req->tableId = fragPtr.p->fragTableId;
 
3017
    req->fragId = fragPtr.p->fragmentId;
 
3018
    req->pageId = tmp.m_page_no;
 
3019
    req->pageIndex = tmp.m_page_idx;
 
3020
    req->tupVersion = ptr->get_tuple_version();
 
3021
    req->opInfo = TuxMaintReq::OpRemove;
 
3022
    removeTuxEntries(signal, tablePtr.p);
 
3023
  }
 
3024
  
 
3025
  Local_key disk;
 
3026
  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
 
3027
  
 
3028
  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
 
3029
  {
 
3030
    jam();
 
3031
    free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
 
3032
  } else {
 
3033
    jam();
 
3034
    free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
 
3035
  }
 
3036
 
 
3037
  if (tablePtr.p->m_no_of_disk_attributes)
 
3038
  {
 
3039
    jam();
 
3040
 
 
3041
    Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
 
3042
      tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
 
3043
    
 
3044
    int res = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
 
3045
    ndbrequire(res == 0);
 
3046
    
 
3047
    /**
 
3048
     * 1) alloc log buffer
 
3049
     * 2) get page
 
3050
     * 3) get log buffer
 
3051
     * 4) delete tuple
 
3052
     */
 
3053
    Page_cache_client::Request preq;
 
3054
    preq.m_page = disk;
 
3055
    preq.m_callback.m_callbackData = senderData;
 
3056
    preq.m_callback.m_callbackFunction =
 
3057
      safe_cast(&Dbtup::nr_delete_page_callback);
 
3058
    int flags = Page_cache_client::COMMIT_REQ;
 
3059
    
 
3060
#ifdef ERROR_INSERT
 
3061
    if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
 
3062
    {
 
3063
      int rnd = rand() % 100;
 
3064
      int slp = 0;
 
3065
      if (ERROR_INSERTED(4024))
 
3066
      {
 
3067
        slp = 3000;
 
3068
      }
 
3069
      else if (rnd > 90)
 
3070
      {
 
3071
        slp = 3000;
 
3072
      }
 
3073
      else if (rnd > 70)
 
3074
      {
 
3075
        slp = 100;
 
3076
      }
 
3077
      
 
3078
      ndbout_c("rnd: %d slp: %d", rnd, slp);
 
3079
      
 
3080
      if (slp)
 
3081
      {
 
3082
        flags |= Page_cache_client::DELAY_REQ;
 
3083
        preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
 
3084
      }
 
3085
    }
 
3086
#endif
 
3087
    
 
3088
    res = m_pgman.get_page(signal, preq, flags);
 
3089
    if (res == 0)
 
3090
    {
 
3091
      goto timeslice;
 
3092
    }
 
3093
    else if (unlikely(res == -1))
 
3094
    {
 
3095
      return -1;
 
3096
    }
 
3097
 
 
3098
    PagePtr disk_page = *(PagePtr*)&m_pgman.m_ptr;
 
3099
    disk_page_set_dirty(disk_page);
 
3100
 
 
3101
    preq.m_callback.m_callbackFunction =
 
3102
      safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
 
3103
    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
 
3104
    res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
 
3105
    switch(res){
 
3106
    case 0:
 
3107
      signal->theData[2] = disk_page.i;
 
3108
      goto timeslice;
 
3109
    case -1:
 
3110
      ndbrequire("NOT YET IMPLEMENTED" == 0);
 
3111
      break;
 
3112
    }
 
3113
 
 
3114
    if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
 
3115
    disk_page_free(signal, tablePtr.p, fragPtr.p,
 
3116
                   &disk, *(PagePtr*)&disk_page, gci);
 
3117
    return 0;
 
3118
  }
 
3119
  
 
3120
  return 0;
 
3121
 
 
3122
timeslice:
 
3123
  memcpy(signal->theData, &disk, sizeof(disk));
 
3124
  return 1;
 
3125
}
 
3126
 
 
3127
void
 
3128
Dbtup::nr_delete_page_callback(Signal* signal, 
 
3129
                               Uint32 userpointer, Uint32 page_id)
 
3130
{
 
3131
  Ptr<GlobalPage> gpage;
 
3132
  m_global_page_pool.getPtr(gpage, page_id);
 
3133
  PagePtr pagePtr= *(PagePtr*)&gpage;
 
3134
  disk_page_set_dirty(pagePtr);
 
3135
  Dblqh::Nr_op_info op;
 
3136
  op.m_ptr_i = userpointer;
 
3137
  op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
 
3138
  op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
 
3139
  c_lqh->get_nr_op_info(&op, page_id);
 
3140
 
 
3141
  Ptr<Fragrecord> fragPtr;
 
3142
  fragPtr.i= op.m_tup_frag_ptr_i;
 
3143
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
3144
 
 
3145
  Ptr<Tablerec> tablePtr;
 
3146
  tablePtr.i = fragPtr.p->fragTableId;
 
3147
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
3148
  
 
3149
  Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
 
3150
    tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
 
3151
  
 
3152
  Callback cb;
 
3153
  cb.m_callbackData = userpointer;
 
3154
  cb.m_callbackFunction =
 
3155
    safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
 
3156
  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
 
3157
  int res= lgman.get_log_buffer(signal, sz, &cb);
 
3158
  switch(res){
 
3159
  case 0:
 
3160
    return;
 
3161
  case -1:
 
3162
    ndbrequire("NOT YET IMPLEMENTED" == 0);
 
3163
    break;
 
3164
  }
 
3165
    
 
3166
  if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
 
3167
  disk_page_free(signal, tablePtr.p, fragPtr.p,
 
3168
                 &op.m_disk_ref, pagePtr, op.m_gci);
 
3169
  
 
3170
  c_lqh->nr_delete_complete(signal, &op);
 
3171
  return;
 
3172
}
 
3173
 
 
3174
void
 
3175
Dbtup::nr_delete_log_buffer_callback(Signal* signal, 
 
3176
                                    Uint32 userpointer, 
 
3177
                                    Uint32 unused)
 
3178
{
 
3179
  Dblqh::Nr_op_info op;
 
3180
  op.m_ptr_i = userpointer;
 
3181
  c_lqh->get_nr_op_info(&op, RNIL);
 
3182
  
 
3183
  Ptr<Fragrecord> fragPtr;
 
3184
  fragPtr.i= op.m_tup_frag_ptr_i;
 
3185
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
3186
 
 
3187
  Ptr<Tablerec> tablePtr;
 
3188
  tablePtr.i = fragPtr.p->fragTableId;
 
3189
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
3190
 
 
3191
  Ptr<GlobalPage> gpage;
 
3192
  m_global_page_pool.getPtr(gpage, op.m_page_id);
 
3193
  PagePtr pagePtr= *(PagePtr*)&gpage;
 
3194
 
 
3195
  /**
 
3196
   * reset page no
 
3197
   */
 
3198
  if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
 
3199
  
 
3200
  disk_page_free(signal, tablePtr.p, fragPtr.p,
 
3201
                 &op.m_disk_ref, pagePtr, op.m_gci);
 
3202
  
 
3203
  c_lqh->nr_delete_complete(signal, &op);
 
3204
}