~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2004 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#define DBTUP_C
 
17
#define DBTUP_DISK_ALLOC_CPP
 
18
#include "Dbtup.hpp"
 
19
 
 
20
static bool f_undo_done = true;
 
21
 
 
22
static
 
23
NdbOut&
 
24
operator<<(NdbOut& out, const Ptr<Dbtup::Page> & ptr)
 
25
{
 
26
  out << "[ Page: ptr.i: " << ptr.i 
 
27
      << " [ m_file_no: " << ptr.p->m_file_no
 
28
      << " m_page_no: " << ptr.p->m_page_no << "]"
 
29
      << " list_index: " << ptr.p->list_index 
 
30
      << " free_space: " << ptr.p->free_space
 
31
      << " uncommitted_used_space: " << ptr.p->uncommitted_used_space
 
32
      << " ]";
 
33
  return out;
 
34
}
 
35
 
 
36
static
 
37
NdbOut&
 
38
operator<<(NdbOut& out, const Ptr<Dbtup::Page_request> & ptr)
 
39
{
 
40
  out << "[ Page_request: ptr.i: " << ptr.i
 
41
      << " " << ptr.p->m_key
 
42
      << " m_estimated_free_space: " << ptr.p->m_estimated_free_space
 
43
      << " m_list_index: " << ptr.p->m_list_index
 
44
      << " m_frag_ptr_i: " << ptr.p->m_frag_ptr_i
 
45
      << " m_extent_info_ptr: " << ptr.p->m_extent_info_ptr
 
46
      << " m_ref_count: " << ptr.p->m_ref_count
 
47
      << " m_uncommitted_used_space: " << ptr.p->m_uncommitted_used_space
 
48
      << " ]";
 
49
  
 
50
  return out;
 
51
}
 
52
 
 
53
static
 
54
NdbOut&
 
55
operator<<(NdbOut& out, const Ptr<Dbtup::Extent_info> & ptr)
 
56
{
 
57
  out << "[ Extent_info: ptr.i " << ptr.i
 
58
      << " " << ptr.p->m_key
 
59
      << " m_first_page_no: " << ptr.p->m_first_page_no
 
60
      << " m_free_space: " << ptr.p->m_free_space
 
61
      << " m_free_matrix_pos: " << ptr.p->m_free_matrix_pos
 
62
      << " m_free_page_count: [";
 
63
 
 
64
  for(Uint32 i = 0; i<Dbtup::EXTENT_SEARCH_MATRIX_COLS; i++)
 
65
    out << " " << ptr.p->m_free_page_count[i];
 
66
  out << " ] ]";
 
67
 
 
68
  return out;
 
69
}
 
70
 
 
71
#if NOT_YET_FREE_EXTENT
 
72
static
 
73
inline
 
74
bool
 
75
check_free(const Dbtup::Extent_info* extP)
 
76
{
 
77
  Uint32 res = 0;
 
78
  for (Uint32 i = 1; i<MAX_FREE_LIST; i++)
 
79
    res += extP->m_free_page_count[i];
 
80
  return res;
 
81
}
 
82
#error "Code for deallocting extents when they get empty"
 
83
#error "This code is not yet complete"
 
84
#endif
 
85
 
 
86
#if NOT_YET_UNDO_ALLOC_EXTENT
 
87
#error "This is needed for deallocting extents when they get empty"
 
88
#error "This code is not complete yet"
 
89
#endif
 
90
 
 
91
void 
 
92
Dbtup::dump_disk_alloc(Dbtup::Disk_alloc_info & alloc)
 
93
{
 
94
  ndbout_c("dirty pages");
 
95
  for(Uint32 i = 0; i<MAX_FREE_LIST; i++)
 
96
  {
 
97
    printf("  %d : ", i);
 
98
    PagePtr ptr;
 
99
    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
 
100
    LocalDLList<Page> list(*pool, alloc.m_dirty_pages[i]);
 
101
    for(list.first(ptr); !ptr.isNull(); list.next(ptr))
 
102
    {
 
103
      ndbout << ptr << " ";
 
104
    }
 
105
    ndbout_c(" ");
 
106
  }
 
107
  ndbout_c("page requests");
 
108
  for(Uint32 i = 0; i<MAX_FREE_LIST; i++)
 
109
  {
 
110
    printf("  %d : ", i);
 
111
    Ptr<Page_request> ptr;
 
112
    Local_page_request_list list(c_page_request_pool, 
 
113
                                 alloc.m_page_requests[i]);
 
114
    for(list.first(ptr); !ptr.isNull(); list.next(ptr))
 
115
    {
 
116
      ndbout << ptr << " ";
 
117
    }
 
118
    ndbout_c(" ");
 
119
  }
 
120
 
 
121
  ndbout_c("Extent matrix");
 
122
  for(Uint32 i = 0; i<alloc.SZ; i++)
 
123
  {
 
124
    printf("  %d : ", i);
 
125
    Ptr<Extent_info> ptr;
 
126
    Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[i]);
 
127
    for(list.first(ptr); !ptr.isNull(); list.next(ptr))
 
128
    {
 
129
      ndbout << ptr << " ";
 
130
    }
 
131
    ndbout_c(" ");
 
132
  }
 
133
 
 
134
  if (alloc.m_curr_extent_info_ptr_i != RNIL)
 
135
  {
 
136
    Ptr<Extent_info> ptr;
 
137
    c_extent_pool.getPtr(ptr, alloc.m_curr_extent_info_ptr_i);
 
138
    ndbout << "current extent: " << ptr << endl;
 
139
  }
 
140
}
 
141
 
 
142
#if defined VM_TRACE || true
 
143
#define ddassert(x) do { if(unlikely(!(x))) { dump_disk_alloc(alloc); ndbrequire(false); } } while(0)
 
144
#else
 
145
#define ddassert(x)
 
146
#endif
 
147
 
 
148
Dbtup::Disk_alloc_info::Disk_alloc_info(const Tablerec* tabPtrP, 
 
149
                                        Uint32 extent_size)
 
150
{
 
151
  m_extent_size = extent_size;
 
152
  m_curr_extent_info_ptr_i = RNIL; 
 
153
  if (tabPtrP->m_no_of_disk_attributes == 0)
 
154
    return;
 
155
  
 
156
  Uint32 min_size= 4*tabPtrP->m_offsets[DD].m_fix_header_size;
 
157
  
 
158
  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
 
159
  {
 
160
    Uint32 recs_per_page= (4*Tup_fixsize_page::DATA_WORDS)/min_size;
 
161
    m_page_free_bits_map[0] = recs_per_page; // 100% free
 
162
    m_page_free_bits_map[1] = 1;
 
163
    m_page_free_bits_map[2] = 0;
 
164
    m_page_free_bits_map[3] = 0;
 
165
    
 
166
    Uint32 max= recs_per_page * extent_size;
 
167
    for(Uint32 i = 0; i<EXTENT_SEARCH_MATRIX_ROWS; i++)
 
168
    {
 
169
      m_total_extent_free_space_thresholds[i] = 
 
170
        (EXTENT_SEARCH_MATRIX_ROWS - i - 1)*max/EXTENT_SEARCH_MATRIX_ROWS;
 
171
    }
 
172
  }
 
173
  else
 
174
  {
 
175
    abort();
 
176
  }
 
177
}
 
178
 
 
179
Uint32
 
180
Dbtup::Disk_alloc_info::find_extent(Uint32 sz) const
 
181
{
 
182
  /**
 
183
   * Find an extent with sufficient space for sz
 
184
   * Find the biggest available (with most free space)
 
185
   * Return position in matrix
 
186
   */
 
187
  Uint32 col = calc_page_free_bits(sz);
 
188
  Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;
 
189
  for(Uint32 i= 0; i<EXTENT_SEARCH_MATRIX_SIZE; i++)
 
190
  {
 
191
    // Check that it can cater for request
 
192
    if (!m_free_extents[i].isEmpty())
 
193
    {
 
194
      return i;
 
195
    }
 
196
    
 
197
    if ((i & mask) >= col)
 
198
    {
 
199
      i = (i & ~mask) + mask;
 
200
    }
 
201
  }
 
202
  
 
203
  return RNIL;
 
204
}
 
205
 
 
206
Uint32
 
207
Dbtup::Disk_alloc_info::calc_extent_pos(const Extent_info* extP) const
 
208
{
 
209
  Uint32 free= extP->m_free_space;
 
210
  Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;
 
211
  
 
212
  Uint32 col= 0, row=0;
 
213
  
 
214
  /**
 
215
   * Find correct row based on total free space
 
216
   *   if zero (or very small free space) put 
 
217
   *     absolutly last
 
218
   */
 
219
  {    
 
220
    const Uint32 *arr= m_total_extent_free_space_thresholds;
 
221
    for(; free < * arr++; row++)
 
222
      assert(row < EXTENT_SEARCH_MATRIX_ROWS);
 
223
  }
 
224
 
 
225
  /**
 
226
   * Find correct col based on largest available chunk
 
227
   */
 
228
  {
 
229
    const Uint16 *arr= extP->m_free_page_count;
 
230
    for(; col < EXTENT_SEARCH_MATRIX_COLS && * arr++ == 0; col++);
 
231
  }
 
232
 
 
233
  /**
 
234
   * NOTE
 
235
   *
 
236
   *   If free space on extent is small or zero,
 
237
   *     col will be = EXTENT_SEARCH_MATRIX_COLS
 
238
   *     row will be = EXTENT_SEARCH_MATRIX_ROWS
 
239
   *   in that case pos will be col * row = max pos
 
240
   *   (as fixed by + 1 in declaration)
 
241
   */
 
242
  Uint32 pos= (row * (mask + 1)) + (col & mask);
 
243
  
 
244
  assert(pos < EXTENT_SEARCH_MATRIX_SIZE);
 
245
  return pos;
 
246
}
 
247
 
 
248
void
 
249
Dbtup::update_extent_pos(Disk_alloc_info& alloc, 
 
250
                         Ptr<Extent_info> extentPtr)
 
251
{
 
252
#ifdef VM_TRACE
 
253
  Uint32 min_free = 0;
 
254
  for(Uint32 i = 0; i<MAX_FREE_LIST; i++)
 
255
  {
 
256
    Uint32 sum = alloc.calc_page_free_space(i);
 
257
    min_free += sum * extentPtr.p->m_free_page_count[i];
 
258
  }
 
259
  ddassert(extentPtr.p->m_free_space >= min_free);
 
260
#endif
 
261
  
 
262
  Uint32 old = extentPtr.p->m_free_matrix_pos;
 
263
  if (old != RNIL)
 
264
  {
 
265
    Uint32 pos = alloc.calc_extent_pos(extentPtr.p);
 
266
    if (old != pos)
 
267
    {
 
268
      jam();
 
269
      Local_extent_info_list old_list(c_extent_pool, alloc.m_free_extents[old]);
 
270
      Local_extent_info_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
 
271
      old_list.remove(extentPtr);
 
272
      new_list.add(extentPtr);
 
273
      extentPtr.p->m_free_matrix_pos= pos;
 
274
    }
 
275
  }
 
276
  else
 
277
  {
 
278
    ddassert(alloc.m_curr_extent_info_ptr_i == extentPtr.i);
 
279
  }
 
280
}
 
281
 
 
282
void
 
283
Dbtup::restart_setup_page(Disk_alloc_info& alloc, PagePtr pagePtr)
 
284
{
 
285
  jam();
 
286
  /**
 
287
   * Link to extent, clear uncommitted_used_space
 
288
   */
 
289
  pagePtr.p->uncommitted_used_space = 0;
 
290
  pagePtr.p->m_restart_seq = globalData.m_restart_seq;
 
291
  
 
292
  Extent_info key;
 
293
  key.m_key.m_file_no = pagePtr.p->m_file_no;
 
294
  key.m_key.m_page_idx = pagePtr.p->m_extent_no;
 
295
  Ptr<Extent_info> extentPtr;
 
296
  ndbrequire(c_extent_hash.find(extentPtr, key));
 
297
  pagePtr.p->m_extent_info_ptr = extentPtr.i;
 
298
 
 
299
  Uint32 idx = pagePtr.p->list_index & ~0x8000;
 
300
  Uint32 estimated = alloc.calc_page_free_space(idx);
 
301
  Uint32 real_free = pagePtr.p->free_space;
 
302
 
 
303
  ddassert(real_free >= estimated);
 
304
  if (real_free != estimated)
 
305
  {
 
306
    jam();
 
307
    extentPtr.p->m_free_space += (real_free - estimated);
 
308
    update_extent_pos(alloc, extentPtr);
 
309
  }
 
310
 
 
311
#ifdef VM_TRACE
 
312
  {
 
313
    Local_key page;
 
314
    page.m_file_no = pagePtr.p->m_file_no;
 
315
    page.m_page_no = pagePtr.p->m_page_no;
 
316
 
 
317
    Tablespace_client tsman(0, c_tsman,
 
318
                            0, 0, 0);
 
319
    unsigned uncommitted, committed;
 
320
    uncommitted = committed = ~(unsigned)0;
 
321
    (void) tsman.get_page_free_bits(&page, &uncommitted, &committed);
 
322
    jamEntry();
 
323
    
 
324
    idx = alloc.calc_page_free_bits(real_free);
 
325
    ddassert(idx == committed);
 
326
  }
 
327
#endif
 
328
}
 
329
 
 
330
/**
 
331
 * - Page free bits -
 
332
 * 0 = 00 - free - 100% free
 
333
 * 1 = 01 - atleast 70% free, 70= pct_free + 2 * (100 - pct_free) / 3
 
334
 * 2 = 10 - atleast 40% free, 40= pct_free + (100 - pct_free) / 3
 
335
 * 3 = 11 - full - less than pct_free% free, pct_free=10%
 
336
 *
 
337
 */
 
338
 
 
339
#define DBG_DISK 0
 
340
 
 
341
int
 
342
Dbtup::disk_page_prealloc(Signal* signal, 
 
343
                          Ptr<Fragrecord> fragPtr,
 
344
                          Local_key* key, Uint32 sz)
 
345
{
 
346
  int err;
 
347
  Uint32 i, ptrI;
 
348
  Ptr<Page_request> req;
 
349
  Fragrecord* fragPtrP = fragPtr.p; 
 
350
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
 
351
  Uint32 idx= alloc.calc_page_free_bits(sz);
 
352
  Tablespace_client tsman(signal, c_tsman,
 
353
                          fragPtrP->fragTableId,
 
354
                          fragPtrP->fragmentId,
 
355
                          fragPtrP->m_tablespace_id);
 
356
  
 
357
  if (DBG_DISK)
 
358
    ndbout << "disk_page_prealloc";
 
359
 
 
360
  /**
 
361
   * 1) search current dirty pages
 
362
   */
 
363
  for(i= 0; i <= idx; i++)
 
364
  {
 
365
    if (!alloc.m_dirty_pages[i].isEmpty())
 
366
    {
 
367
      ptrI= alloc.m_dirty_pages[i].firstItem;
 
368
      Ptr<GlobalPage> gpage;
 
369
      m_global_page_pool.getPtr(gpage, ptrI);
 
370
      
 
371
      PagePtr tmp;
 
372
      tmp.i = gpage.i;
 
373
      tmp.p = reinterpret_cast<Page*>(gpage.p);
 
374
      disk_page_prealloc_dirty_page(alloc, tmp, i, sz);
 
375
      key->m_page_no= tmp.p->m_page_no;
 
376
      key->m_file_no= tmp.p->m_file_no;
 
377
      if (DBG_DISK)
 
378
        ndbout << " found dirty page " << *key << endl;
 
379
      jam();
 
380
      return 0; // Page in memory
 
381
    }
 
382
  }
 
383
  
 
384
  /**
 
385
   * Search outanding page requests
 
386
   *   callback does not need to access page request again
 
387
   *   as it's not the first request to this page
 
388
   */
 
389
  for(i= 0; i <= idx; i++)
 
390
  {
 
391
    if (!alloc.m_page_requests[i].isEmpty())
 
392
    {
 
393
      ptrI= alloc.m_page_requests[i].firstItem;
 
394
      Ptr<Page_request> req;
 
395
      c_page_request_pool.getPtr(req, ptrI);
 
396
 
 
397
      disk_page_prealloc_transit_page(alloc, req, i, sz);
 
398
      * key = req.p->m_key;
 
399
      if (DBG_DISK)
 
400
        ndbout << " found transit page " << *key << endl;
 
401
      jam();
 
402
      return 0;
 
403
    }
 
404
  }
 
405
  
 
406
  /**
 
407
   * We need to request a page...
 
408
   */
 
409
  if (!c_page_request_pool.seize(req))
 
410
  {
 
411
    jam();
 
412
    err= 1;
 
413
    //XXX set error code
 
414
    ndbout_c("no free request");
 
415
    return -err;
 
416
  }
 
417
 
 
418
  req.p->m_ref_count= 1;
 
419
  req.p->m_frag_ptr_i= fragPtr.i;
 
420
  req.p->m_uncommitted_used_space= sz;
 
421
  
 
422
  int pageBits; // received
 
423
  Ptr<Extent_info> ext;
 
424
  const Uint32 bits= alloc.calc_page_free_bits(sz); // required
 
425
  bool found= false;
 
426
 
 
427
  /**
 
428
   * Do we have a current extent
 
429
   */
 
430
  if ((ext.i= alloc.m_curr_extent_info_ptr_i) != RNIL)
 
431
  {
 
432
    jam();
 
433
    c_extent_pool.getPtr(ext);
 
434
    if ((pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits)) >= 0) 
 
435
    {
 
436
      jamEntry();
 
437
      found= true;
 
438
    }
 
439
    else
 
440
    {
 
441
      jamEntry();
 
442
      /**
 
443
       * The current extent is not in a free list
 
444
       *   and since it couldn't accomadate the request
 
445
       *   we put it on the free list
 
446
       */
 
447
      alloc.m_curr_extent_info_ptr_i = RNIL;
 
448
      Uint32 pos= alloc.calc_extent_pos(ext.p);
 
449
      ext.p->m_free_matrix_pos = pos;
 
450
      Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[pos]);
 
451
      list.add(ext);
 
452
    }
 
453
  }
 
454
  
 
455
  if (!found)
 
456
  {
 
457
    Uint32 pos;
 
458
    if ((pos= alloc.find_extent(sz)) != RNIL)
 
459
    {
 
460
      jam();
 
461
      Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[pos]);
 
462
      list.first(ext);
 
463
      list.remove(ext);
 
464
    }
 
465
    else 
 
466
    {
 
467
      jam();
 
468
      /**
 
469
       * We need to alloc an extent
 
470
       */
 
471
#if NOT_YET_UNDO_ALLOC_EXTENT
 
472
      Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id;
 
473
 
 
474
      err = c_lgman->alloc_log_space(logfile_group_id,
 
475
                                     sizeof(Disk_undo::AllocExtent)>>2);
 
476
      jamEntry();
 
477
      if(unlikely(err))
 
478
      {
 
479
        return -err;
 
480
      }
 
481
#endif
 
482
 
 
483
      if (!c_extent_pool.seize(ext))
 
484
      {
 
485
        jam();
 
486
        //XXX
 
487
        err= 2;
 
488
#if NOT_YET_UNDO_ALLOC_EXTENT
 
489
        c_lgman->free_log_space(logfile_group_id, 
 
490
                                sizeof(Disk_undo::AllocExtent)>>2);
 
491
#endif
 
492
        c_page_request_pool.release(req);
 
493
        ndbout_c("no free extent info");
 
494
        return -err;
 
495
      }
 
496
      
 
497
      if ((err= tsman.alloc_extent(&ext.p->m_key)) < 0)
 
498
      {
 
499
        jamEntry();
 
500
#if NOT_YET_UNDO_ALLOC_EXTENT
 
501
        c_lgman->free_log_space(logfile_group_id, 
 
502
                                sizeof(Disk_undo::AllocExtent)>>2);
 
503
#endif
 
504
        c_extent_pool.release(ext);
 
505
        c_page_request_pool.release(req);
 
506
        return err;
 
507
      }
 
508
 
 
509
      int pages= err;
 
510
#if NOT_YET_UNDO_ALLOC_EXTENT
 
511
      {
 
512
        /**
 
513
         * Do something here
 
514
         */
 
515
        {
 
516
          Callback cb;
 
517
          cb.m_callbackData= ext.i;
 
518
          cb.m_callbackFunction = 
 
519
            safe_cast(&Dbtup::disk_page_alloc_extent_log_buffer_callback);
 
520
          Uint32 sz= sizeof(Disk_undo::AllocExtent)>>2;
 
521
          
 
522
          Logfile_client lgman(this, c_lgman, logfile_group_id);
 
523
          int res= lgman.get_log_buffer(signal, sz, &cb);
 
524
          switch(res){
 
525
          case 0:
 
526
            break;
 
527
          case -1:
 
528
            ndbrequire("NOT YET IMPLEMENTED" == 0);
 
529
            break;
 
530
          default:
 
531
            execute(signal, cb, res);       
 
532
          }
 
533
        }
 
534
      }
 
535
#endif
 
536
      
 
537
      ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl;
 
538
      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
 
539
      bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
 
540
      ext.p->m_free_space= alloc.m_page_free_bits_map[0] * pages; 
 
541
      ext.p->m_free_page_count[0]= pages; // All pages are "free"-est
 
542
      c_extent_hash.add(ext);
 
543
 
 
544
      Local_fragment_extent_list list1(c_extent_pool, alloc.m_extent_list);
 
545
      list1.add(ext);
 
546
    }      
 
547
    
 
548
    alloc.m_curr_extent_info_ptr_i= ext.i;
 
549
    ext.p->m_free_matrix_pos= RNIL;
 
550
    pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
 
551
    jamEntry();
 
552
    ddassert(pageBits >= 0);
 
553
  }
 
554
  
 
555
  /**
 
556
   * We have a page from an extent
 
557
   */
 
558
  *key= req.p->m_key= ext.p->m_key;
 
559
 
 
560
  if (DBG_DISK)
 
561
    ndbout << " allocated page " << *key << endl;
 
562
  
 
563
  /**
 
564
   * We don't know exact free space of page
 
565
   *   but we know what page free bits it has.
 
566
   *   compute free space based on them
 
567
   */
 
568
  Uint32 size= alloc.calc_page_free_space((Uint32)pageBits);
 
569
  
 
570
  ddassert(size >= sz);
 
571
  Uint32 new_size = size - sz;   // Subtract alloc rec
 
572
  req.p->m_estimated_free_space= new_size; // Store on page request
 
573
 
 
574
  Uint32 newPageBits= alloc.calc_page_free_bits(new_size);
 
575
  if (newPageBits != (Uint32)pageBits)
 
576
  {
 
577
    jam();
 
578
    ddassert(ext.p->m_free_page_count[pageBits] > 0);
 
579
    ext.p->m_free_page_count[pageBits]--;
 
580
    ext.p->m_free_page_count[newPageBits]++;
 
581
  }
 
582
  ddassert(ext.p->m_free_space >= sz);
 
583
  ext.p->m_free_space -= sz;
 
584
  
 
585
  // And put page request in correct free list
 
586
  idx= alloc.calc_page_free_bits(new_size);
 
587
  {
 
588
    Local_page_request_list list(c_page_request_pool, 
 
589
                                 alloc.m_page_requests[idx]);
 
590
    
 
591
    list.add(req);
 
592
  }
 
593
  req.p->m_list_index= idx;
 
594
  req.p->m_extent_info_ptr= ext.i;
 
595
 
 
596
  Page_cache_client::Request preq;
 
597
  preq.m_page = *key;
 
598
  preq.m_callback.m_callbackData= req.i;
 
599
  preq.m_callback.m_callbackFunction = 
 
600
    safe_cast(&Dbtup::disk_page_prealloc_callback);
 
601
  
 
602
  int flags= Page_cache_client::ALLOC_REQ;
 
603
  if (pageBits == 0)
 
604
  {
 
605
    jam();
 
606
    //XXX empty page -> fast to map
 
607
    flags |= Page_cache_client::EMPTY_PAGE;
 
608
    preq.m_callback.m_callbackFunction = 
 
609
      safe_cast(&Dbtup::disk_page_prealloc_initial_callback);
 
610
  }
 
611
  
 
612
  int res= m_pgman.get_page(signal, preq, flags);
 
613
  jamEntry();
 
614
  switch(res)
 
615
  {
 
616
  case 0:
 
617
    jam();
 
618
    break;
 
619
  case -1:
 
620
    ndbassert(false);
 
621
    break;
 
622
  default:
 
623
    jam();
 
624
    execute(signal, preq.m_callback, res); // run callback
 
625
  }
 
626
  
 
627
  return res;
 
628
}
 
629
 
 
630
void
 
631
Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
 
632
                                     PagePtr pagePtr, 
 
633
                                     Uint32 old_idx, Uint32 sz)
 
634
{
 
635
  jam();
 
636
  ddassert(pagePtr.p->list_index == old_idx);
 
637
 
 
638
  Uint32 free= pagePtr.p->free_space;
 
639
  Uint32 used= pagePtr.p->uncommitted_used_space + sz;
 
640
  Uint32 ext= pagePtr.p->m_extent_info_ptr;
 
641
  
 
642
  ddassert(free >= used);
 
643
  Ptr<Extent_info> extentPtr;
 
644
  c_extent_pool.getPtr(extentPtr, ext);
 
645
 
 
646
  Uint32 new_idx= alloc.calc_page_free_bits(free - used);
 
647
  ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
 
648
 
 
649
  if (old_idx != new_idx)
 
650
  {
 
651
    jam();
 
652
    LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
 
653
    LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
 
654
    old_list.remove(pagePtr);
 
655
    new_list.add(pagePtr);
 
656
 
 
657
    ddassert(extentPtr.p->m_free_page_count[old_idx]);
 
658
    extentPtr.p->m_free_page_count[old_idx]--;
 
659
    extentPtr.p->m_free_page_count[new_idx]++;
 
660
    pagePtr.p->list_index= new_idx;  
 
661
  }
 
662
 
 
663
  pagePtr.p->uncommitted_used_space = used;
 
664
  ddassert(extentPtr.p->m_free_space >= sz);
 
665
  extentPtr.p->m_free_space -= sz;
 
666
  update_extent_pos(alloc, extentPtr);
 
667
}
 
668
 
 
669
 
 
670
void
 
671
Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
 
672
                                       Ptr<Page_request> req, 
 
673
                                       Uint32 old_idx, Uint32 sz)
 
674
{
 
675
  jam();
 
676
  ddassert(req.p->m_list_index == old_idx);
 
677
 
 
678
  Uint32 free= req.p->m_estimated_free_space;
 
679
  Uint32 used= req.p->m_uncommitted_used_space + sz;
 
680
  Uint32 ext= req.p->m_extent_info_ptr;
 
681
  
 
682
  Ptr<Extent_info> extentPtr;
 
683
  c_extent_pool.getPtr(extentPtr, ext);
 
684
 
 
685
  ddassert(free >= sz);
 
686
  Uint32 new_idx= alloc.calc_page_free_bits(free - sz);
 
687
  
 
688
  if (old_idx != new_idx)
 
689
  {
 
690
    jam();
 
691
    Page_request_list::Head *lists = alloc.m_page_requests;
 
692
    Local_page_request_list old_list(c_page_request_pool, lists[old_idx]);
 
693
    Local_page_request_list new_list(c_page_request_pool, lists[new_idx]);
 
694
    old_list.remove(req);
 
695
    new_list.add(req);
 
696
 
 
697
    ddassert(extentPtr.p->m_free_page_count[old_idx]);
 
698
    extentPtr.p->m_free_page_count[old_idx]--;
 
699
    extentPtr.p->m_free_page_count[new_idx]++;
 
700
    req.p->m_list_index= new_idx;  
 
701
  }
 
702
 
 
703
  req.p->m_uncommitted_used_space = used;
 
704
  req.p->m_estimated_free_space = free - sz;
 
705
  ddassert(extentPtr.p->m_free_space >= sz);
 
706
  extentPtr.p->m_free_space -= sz;
 
707
  update_extent_pos(alloc, extentPtr);
 
708
}
 
709
 
 
710
 
 
711
void
 
712
Dbtup::disk_page_prealloc_callback(Signal* signal, 
 
713
                                   Uint32 page_request, Uint32 page_id)
 
714
{
 
715
  jamEntry();
 
716
  //ndbout_c("disk_alloc_page_callback id: %d", page_id);
 
717
 
 
718
  Ptr<Page_request> req;
 
719
  c_page_request_pool.getPtr(req, page_request);
 
720
 
 
721
  Ptr<GlobalPage> gpage;
 
722
  m_global_page_pool.getPtr(gpage, page_id);
 
723
 
 
724
  Ptr<Fragrecord> fragPtr;
 
725
  fragPtr.i= req.p->m_frag_ptr_i;
 
726
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
727
 
 
728
  PagePtr pagePtr;
 
729
  pagePtr.i = gpage.i;
 
730
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);
 
731
 
 
732
  if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
 
733
  {
 
734
    restart_setup_page(fragPtr.p->m_disk_alloc_info, pagePtr);
 
735
  }
 
736
 
 
737
  disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr);
 
738
}
 
739
 
 
740
void
 
741
Dbtup::disk_page_prealloc_initial_callback(Signal*signal, 
 
742
                                           Uint32 page_request, 
 
743
                                           Uint32 page_id)
 
744
{
 
745
  jamEntry();
 
746
  //ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);  
 
747
  /**
 
748
   * 1) lookup page request
 
749
   * 2) lookup page
 
750
   * 3) lookup table
 
751
   * 4) init page (according to page type)
 
752
   * 5) call ordinary callback
 
753
   */
 
754
  Ptr<Page_request> req;
 
755
  c_page_request_pool.getPtr(req, page_request);
 
756
 
 
757
  Ptr<GlobalPage> gpage;
 
758
  m_global_page_pool.getPtr(gpage, page_id);
 
759
  PagePtr pagePtr;
 
760
  pagePtr.i = gpage.i;
 
761
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);
 
762
 
 
763
  Ptr<Fragrecord> fragPtr;
 
764
  fragPtr.i= req.p->m_frag_ptr_i;
 
765
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
 
766
 
 
767
  Ptr<Tablerec> tabPtr;
 
768
  tabPtr.i = fragPtr.p->fragTableId;
 
769
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
770
 
 
771
  Ptr<Extent_info> extentPtr;
 
772
  c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);
 
773
 
 
774
  pagePtr.p->m_page_no= req.p->m_key.m_page_no;
 
775
  pagePtr.p->m_file_no= req.p->m_key.m_file_no;
 
776
  pagePtr.p->m_table_id= fragPtr.p->fragTableId;
 
777
  pagePtr.p->m_fragment_id = fragPtr.p->fragmentId;
 
778
  pagePtr.p->m_extent_no = extentPtr.p->m_key.m_page_idx; // logical extent no
 
779
  pagePtr.p->m_extent_info_ptr= req.p->m_extent_info_ptr;
 
780
  pagePtr.p->m_restart_seq = globalData.m_restart_seq;
 
781
  pagePtr.p->list_index = 0x8000;
 
782
  pagePtr.p->uncommitted_used_space = 0;
 
783
  pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
 
784
  
 
785
  if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
 
786
  {
 
787
    convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
 
788
  }
 
789
  else
 
790
  {
 
791
    abort();
 
792
  }
 
793
  disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr);
 
794
}
 
795
 
 
796
void
 
797
Dbtup::disk_page_prealloc_callback_common(Signal* signal, 
 
798
                                          Ptr<Page_request> req, 
 
799
                                          Ptr<Fragrecord> fragPtr, 
 
800
                                          PagePtr pagePtr)
 
801
{
 
802
  /**
 
803
   * 1) remove page request from Disk_alloc_info.m_page_requests
 
804
   * 2) Add page to Disk_alloc_info.m_dirty_pages
 
805
   * 3) register callback in pgman (unmap callback)
 
806
   * 4) inform pgman about current users
 
807
   */
 
808
  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
 
809
  ddassert((pagePtr.p->list_index & 0x8000) == 0x8000);
 
810
  ddassert(pagePtr.p->m_extent_info_ptr == req.p->m_extent_info_ptr);
 
811
  ddassert(pagePtr.p->m_page_no == req.p->m_key.m_page_no);
 
812
  ddassert(pagePtr.p->m_file_no == req.p->m_key.m_file_no);
 
813
  
 
814
  Uint32 old_idx = req.p->m_list_index;
 
815
  Uint32 free= req.p->m_estimated_free_space;
 
816
  Uint32 ext = req.p->m_extent_info_ptr;
 
817
  Uint32 used= req.p->m_uncommitted_used_space;
 
818
  Uint32 real_free = pagePtr.p->free_space;
 
819
  Uint32 real_used = used + pagePtr.p->uncommitted_used_space;
 
820
 
 
821
  ddassert(real_free >= free);
 
822
  ddassert(real_free >= real_used);
 
823
  ddassert(alloc.calc_page_free_bits(free) == old_idx);
 
824
  Uint32 new_idx= alloc.calc_page_free_bits(real_free - real_used);
 
825
 
 
826
  /**
 
827
   * Add to dirty pages
 
828
   */
 
829
  ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
 
830
  LocalDLList<Page> list(* cheat_pool, alloc.m_dirty_pages[new_idx]);
 
831
  list.add(pagePtr);
 
832
  pagePtr.p->uncommitted_used_space = real_used;
 
833
  pagePtr.p->list_index = new_idx;
 
834
 
 
835
  if (old_idx != new_idx || free != real_free)
 
836
  {
 
837
    jam();
 
838
    Ptr<Extent_info> extentPtr;
 
839
    c_extent_pool.getPtr(extentPtr, ext);
 
840
 
 
841
    extentPtr.p->m_free_space += (real_free - free);
 
842
    
 
843
    if (old_idx != new_idx)
 
844
    {
 
845
      jam();
 
846
      ddassert(extentPtr.p->m_free_page_count[old_idx]);
 
847
      extentPtr.p->m_free_page_count[old_idx]--;
 
848
      extentPtr.p->m_free_page_count[new_idx]++;
 
849
    }
 
850
    
 
851
    update_extent_pos(alloc, extentPtr);
 
852
  }
 
853
  
 
854
  {
 
855
    Local_page_request_list list(c_page_request_pool, 
 
856
                                 alloc.m_page_requests[old_idx]);
 
857
    list.release(req);
 
858
  }
 
859
}
 
860
 
 
861
void
 
862
Dbtup::disk_page_set_dirty(PagePtr pagePtr)
 
863
{
 
864
  jam();
 
865
  Uint32 idx = pagePtr.p->list_index;
 
866
  if ((idx & 0x8000) == 0)
 
867
  {
 
868
    jam();
 
869
    /**
 
870
     * Already in dirty list
 
871
     */
 
872
    return ;
 
873
  }
 
874
  
 
875
  Local_key key;
 
876
  key.m_page_no = pagePtr.p->m_page_no;
 
877
  key.m_file_no = pagePtr.p->m_file_no;
 
878
 
 
879
  pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
 
880
 
 
881
  if (DBG_DISK)
 
882
    ndbout << " disk_page_set_dirty " << key << endl;
 
883
  
 
884
  Ptr<Tablerec> tabPtr;
 
885
  tabPtr.i= pagePtr.p->m_table_id;
 
886
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
887
  
 
888
  Ptr<Fragrecord> fragPtr;
 
889
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
 
890
  
 
891
  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
 
892
 
 
893
  Uint32 free = pagePtr.p->free_space;
 
894
  Uint32 used = pagePtr.p->uncommitted_used_space;
 
895
  if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
 
896
  {
 
897
    restart_setup_page(alloc, pagePtr);
 
898
    idx = alloc.calc_page_free_bits(free);
 
899
    used = 0;
 
900
  }
 
901
  else
 
902
  {
 
903
    idx &= ~0x8000;
 
904
    ddassert(idx == alloc.calc_page_free_bits(free - used));
 
905
  }
 
906
  
 
907
  ddassert(free >= used);
 
908
  
 
909
  Tablespace_client tsman(0, c_tsman,
 
910
                          fragPtr.p->fragTableId,
 
911
                          fragPtr.p->fragmentId,
 
912
                          fragPtr.p->m_tablespace_id);
 
913
  
 
914
  pagePtr.p->list_index = idx;
 
915
  ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
 
916
  LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
 
917
  list.add(pagePtr);
 
918
  
 
919
  // Make sure no one will allocate it...
 
920
  tsman.unmap_page(&key, MAX_FREE_LIST - 1);
 
921
  jamEntry();
 
922
}
 
923
 
 
924
void
 
925
Dbtup::disk_page_unmap_callback(Uint32 when,
 
926
                                Uint32 page_id, Uint32 dirty_count)
 
927
{
 
928
  jamEntry();
 
929
  Ptr<GlobalPage> gpage;
 
930
  m_global_page_pool.getPtr(gpage, page_id);
 
931
  PagePtr pagePtr;
 
932
  pagePtr.i = gpage.i;
 
933
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);
 
934
  
 
935
  Uint32 type = pagePtr.p->m_page_header.m_page_type;
 
936
  if (unlikely((type != File_formats::PT_Tup_fixsize_page &&
 
937
                type != File_formats::PT_Tup_varsize_page) ||
 
938
               f_undo_done == false))
 
939
  {
 
940
    jam();
 
941
    return ;
 
942
  }
 
943
 
 
944
  Uint32 idx = pagePtr.p->list_index;
 
945
 
 
946
  Ptr<Tablerec> tabPtr;
 
947
  tabPtr.i= pagePtr.p->m_table_id;
 
948
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
949
  
 
950
  Ptr<Fragrecord> fragPtr;
 
951
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
 
952
  
 
953
  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
 
954
  
 
955
  if (when == 0)
 
956
  {
 
957
    /**
 
958
     * Before pageout
 
959
     */
 
960
    jam();
 
961
 
 
962
    if (DBG_DISK)
 
963
    {
 
964
      Local_key key;
 
965
      key.m_page_no = pagePtr.p->m_page_no;
 
966
      key.m_file_no = pagePtr.p->m_file_no;
 
967
      ndbout << "disk_page_unmap_callback(before) " << key 
 
968
             << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
 
969
    }
 
970
 
 
971
    ndbassert((idx & 0x8000) == 0);
 
972
 
 
973
    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
 
974
    LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
 
975
    LocalDLList<Page> list2(*pool, alloc.m_unmap_pages);
 
976
    list.remove(pagePtr);
 
977
    list2.add(pagePtr);
 
978
 
 
979
    if (dirty_count == 0)
 
980
    {
 
981
      jam();
 
982
      pagePtr.p->list_index = idx | 0x8000;      
 
983
      
 
984
      Local_key key;
 
985
      key.m_page_no = pagePtr.p->m_page_no;
 
986
      key.m_file_no = pagePtr.p->m_file_no;
 
987
      
 
988
      Uint32 free = pagePtr.p->free_space;
 
989
      Uint32 used = pagePtr.p->uncommitted_used_space;
 
990
      ddassert(free >= used);
 
991
      ddassert(alloc.calc_page_free_bits(free - used) == idx);
 
992
      
 
993
      Tablespace_client tsman(0, c_tsman,
 
994
                              fragPtr.p->fragTableId,
 
995
                              fragPtr.p->fragmentId,
 
996
                              fragPtr.p->m_tablespace_id);
 
997
      
 
998
      tsman.unmap_page(&key, idx);
 
999
      jamEntry();
 
1000
    }
 
1001
  }
 
1002
  else if (when == 1)
 
1003
  {
 
1004
    /**
 
1005
     * After page out
 
1006
     */
 
1007
    jam();
 
1008
 
 
1009
    Local_key key;
 
1010
    key.m_page_no = pagePtr.p->m_page_no;
 
1011
    key.m_file_no = pagePtr.p->m_file_no;
 
1012
    Uint32 real_free = pagePtr.p->free_space;
 
1013
    
 
1014
    if (DBG_DISK)
 
1015
    {
 
1016
      ndbout << "disk_page_unmap_callback(after) " << key 
 
1017
             << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
 
1018
    }
 
1019
 
 
1020
    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
 
1021
    LocalDLList<Page> list(*pool, alloc.m_unmap_pages);
 
1022
    list.remove(pagePtr);
 
1023
 
 
1024
    Tablespace_client tsman(0, c_tsman,
 
1025
                            fragPtr.p->fragTableId,
 
1026
                            fragPtr.p->fragmentId,
 
1027
                            fragPtr.p->m_tablespace_id);
 
1028
    
 
1029
    if (DBG_DISK && alloc.calc_page_free_bits(real_free) != (idx & ~0x8000))
 
1030
    {
 
1031
      ndbout << key 
 
1032
             << " calc: " << alloc.calc_page_free_bits(real_free)
 
1033
             << " idx: " << (idx & ~0x8000)
 
1034
             << endl;
 
1035
    }
 
1036
    tsman.update_page_free_bits(&key, alloc.calc_page_free_bits(real_free));
 
1037
    jamEntry();
 
1038
  }
 
1039
}
 
1040
 
 
1041
void
 
1042
Dbtup::disk_page_alloc(Signal* signal, 
 
1043
                       Tablerec* tabPtrP, Fragrecord* fragPtrP, 
 
1044
                       Local_key* key, PagePtr pagePtr, Uint32 gci)
 
1045
{
 
1046
  jam();
 
1047
  Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
 
1048
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
 
1049
 
 
1050
  Uint64 lsn;
 
1051
  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
 
1052
  {
 
1053
    ddassert(pagePtr.p->uncommitted_used_space > 0);
 
1054
    pagePtr.p->uncommitted_used_space--;
 
1055
    key->m_page_idx= ((Fix_page*)pagePtr.p)->alloc_record();
 
1056
    lsn= disk_page_undo_alloc(pagePtr.p, key, 1, gci, logfile_group_id);
 
1057
  }
 
1058
  else
 
1059
  {
 
1060
    Uint32 sz= key->m_page_idx;
 
1061
    ddassert(pagePtr.p->uncommitted_used_space >= sz);
 
1062
    pagePtr.p->uncommitted_used_space -= sz;
 
1063
    key->m_page_idx= ((Var_page*)pagePtr.p)->
 
1064
      alloc_record(sz, (Var_page*)ctemp_page, 0);
 
1065
    
 
1066
    lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
 
1067
  }
 
1068
}
 
1069
 
 
1070
void
 
1071
Dbtup::disk_page_free(Signal *signal, 
 
1072
                      Tablerec *tabPtrP, Fragrecord * fragPtrP,
 
1073
                      Local_key* key, PagePtr pagePtr, Uint32 gci)
 
1074
{
 
1075
  jam();
 
1076
  if (DBG_DISK)
 
1077
    ndbout << " disk_page_free " << *key << endl;
 
1078
  
 
1079
  Uint32 page_idx= key->m_page_idx;
 
1080
  Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
 
1081
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
 
1082
  Uint32 old_free= pagePtr.p->free_space;
 
1083
 
 
1084
  Uint32 sz;
 
1085
  Uint64 lsn;
 
1086
  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
 
1087
  {
 
1088
    sz = 1;
 
1089
    const Uint32 *src= ((Fix_page*)pagePtr.p)->get_ptr(page_idx, 0);
 
1090
    ndbassert(* (src + 1) != Tup_fixsize_page::FREE_RECORD);
 
1091
    lsn= disk_page_undo_free(pagePtr.p, key,
 
1092
                             src, tabPtrP->m_offsets[DD].m_fix_header_size,
 
1093
                             gci, logfile_group_id);
 
1094
    
 
1095
    ((Fix_page*)pagePtr.p)->free_record(page_idx);
 
1096
  }
 
1097
  else
 
1098
  {
 
1099
    const Uint32 *src= ((Var_page*)pagePtr.p)->get_ptr(page_idx);
 
1100
    sz= ((Var_page*)pagePtr.p)->get_entry_len(page_idx);
 
1101
    lsn= disk_page_undo_free(pagePtr.p, key,
 
1102
                             src, sz,
 
1103
                             gci, logfile_group_id);
 
1104
    
 
1105
    ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
 
1106
  }    
 
1107
  
 
1108
  Uint32 new_free = pagePtr.p->free_space;
 
1109
  
 
1110
  Uint32 ext = pagePtr.p->m_extent_info_ptr;
 
1111
  Uint32 used = pagePtr.p->uncommitted_used_space;
 
1112
  Uint32 old_idx = pagePtr.p->list_index;
 
1113
  ddassert(old_free >= used);
 
1114
  ddassert(new_free >= used);
 
1115
  ddassert(new_free >= old_free);
 
1116
  ddassert((old_idx & 0x8000) == 0);
 
1117
 
 
1118
  Uint32 new_idx = alloc.calc_page_free_bits(new_free - used);
 
1119
  ddassert(alloc.calc_page_free_bits(old_free - used) == old_idx);
 
1120
  
 
1121
  Ptr<Extent_info> extentPtr;
 
1122
  c_extent_pool.getPtr(extentPtr, ext);
 
1123
  
 
1124
  if (old_idx != new_idx)
 
1125
  {
 
1126
    jam();
 
1127
    ddassert(extentPtr.p->m_free_page_count[old_idx]);
 
1128
    extentPtr.p->m_free_page_count[old_idx]--;
 
1129
    extentPtr.p->m_free_page_count[new_idx]++;
 
1130
 
 
1131
    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
 
1132
    LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
 
1133
    LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
 
1134
    old_list.remove(pagePtr);
 
1135
    new_list.add(pagePtr);
 
1136
    pagePtr.p->list_index = new_idx;
 
1137
  }
 
1138
  
 
1139
  extentPtr.p->m_free_space += sz;
 
1140
  update_extent_pos(alloc, extentPtr);
 
1141
#if NOT_YET_FREE_EXTENT
 
1142
  if (check_free(extentPtr.p) == 0)
 
1143
  {
 
1144
    ndbout_c("free: extent is free");
 
1145
  }
 
1146
#endif
 
1147
}
 
1148
 
 
1149
void
 
1150
Dbtup::disk_page_abort_prealloc(Signal *signal, Fragrecord* fragPtrP, 
 
1151
                                Local_key* key, Uint32 sz)
 
1152
{
 
1153
  jam();
 
1154
  Page_cache_client::Request req;
 
1155
  req.m_callback.m_callbackData= sz;
 
1156
  req.m_callback.m_callbackFunction = 
 
1157
    safe_cast(&Dbtup::disk_page_abort_prealloc_callback);
 
1158
  
 
1159
  int flags= Page_cache_client::DIRTY_REQ;
 
1160
  memcpy(&req.m_page, key, sizeof(Local_key));
 
1161
 
 
1162
  int res= m_pgman.get_page(signal, req, flags);
 
1163
  jamEntry();
 
1164
  switch(res)
 
1165
  {
 
1166
  case 0:
 
1167
    jam();
 
1168
    break;
 
1169
  case -1:
 
1170
    ndbrequire(false);
 
1171
    break;
 
1172
  default:
 
1173
    jam();
 
1174
    Ptr<GlobalPage> gpage;
 
1175
    m_global_page_pool.getPtr(gpage, (Uint32)res);
 
1176
    PagePtr pagePtr;
 
1177
    pagePtr.i = gpage.i;
 
1178
    pagePtr.p = reinterpret_cast<Page*>(gpage.p);
 
1179
 
 
1180
    disk_page_abort_prealloc_callback_1(signal, fragPtrP, pagePtr, sz);
 
1181
  }
 
1182
}
 
1183
 
 
1184
void
 
1185
Dbtup::disk_page_abort_prealloc_callback(Signal* signal, 
 
1186
                                         Uint32 sz, Uint32 page_id)
 
1187
{
 
1188
  //ndbout_c("disk_alloc_page_callback id: %d", page_id);
 
1189
  jamEntry();  
 
1190
  Ptr<GlobalPage> gpage;
 
1191
  m_global_page_pool.getPtr(gpage, page_id);
 
1192
  
 
1193
  PagePtr pagePtr;
 
1194
  pagePtr.i = gpage.i;
 
1195
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);
 
1196
 
 
1197
  Ptr<Tablerec> tabPtr;
 
1198
  tabPtr.i= pagePtr.p->m_table_id;
 
1199
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
1200
  
 
1201
  Ptr<Fragrecord> fragPtr;
 
1202
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
 
1203
 
 
1204
  disk_page_abort_prealloc_callback_1(signal, fragPtr.p, pagePtr, sz);
 
1205
}
 
1206
 
 
1207
void
 
1208
Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal, 
 
1209
                                           Fragrecord* fragPtrP,
 
1210
                                           PagePtr pagePtr,
 
1211
                                           Uint32 sz)
 
1212
{
 
1213
  jam();
 
1214
  disk_page_set_dirty(pagePtr);
 
1215
 
 
1216
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
 
1217
  Uint32 page_idx = pagePtr.p->list_index;
 
1218
  Uint32 used = pagePtr.p->uncommitted_used_space;
 
1219
  Uint32 free = pagePtr.p->free_space;
 
1220
  Uint32 ext = pagePtr.p->m_extent_info_ptr;
 
1221
 
 
1222
  Uint32 old_idx = page_idx & 0x7FFF;
 
1223
  ddassert(free >= used);
 
1224
  ddassert(used >= sz);
 
1225
  ddassert(alloc.calc_page_free_bits(free - used) == old_idx);
 
1226
  Uint32 new_idx = alloc.calc_page_free_bits(free - used + sz);
 
1227
 
 
1228
  Ptr<Extent_info> extentPtr;
 
1229
  c_extent_pool.getPtr(extentPtr, ext);
 
1230
  if (old_idx != new_idx)
 
1231
  {
 
1232
    jam();
 
1233
    ddassert(extentPtr.p->m_free_page_count[old_idx]);
 
1234
    extentPtr.p->m_free_page_count[old_idx]--;
 
1235
    extentPtr.p->m_free_page_count[new_idx]++;
 
1236
 
 
1237
    if (old_idx == page_idx)
 
1238
    {
 
1239
      jam();
 
1240
      ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
 
1241
      LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
 
1242
      LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
 
1243
      old_list.remove(pagePtr);
 
1244
      new_list.add(pagePtr);
 
1245
      pagePtr.p->list_index = new_idx;
 
1246
    }
 
1247
    else
 
1248
    {
 
1249
      jam();
 
1250
      pagePtr.p->list_index = new_idx | 0x8000;
 
1251
    }
 
1252
  }
 
1253
  
 
1254
  pagePtr.p->uncommitted_used_space = used - sz;
 
1255
 
 
1256
  extentPtr.p->m_free_space += sz;
 
1257
  update_extent_pos(alloc, extentPtr);
 
1258
#if NOT_YET_FREE_EXTENT
 
1259
  if (check_free(extentPtr.p) == 0)
 
1260
  {
 
1261
    ndbout_c("abort: extent is free");
 
1262
  }
 
1263
#endif
 
1264
}
 
1265
 
 
1266
#if NOT_YET_UNDO_ALLOC_EXTENT
 
1267
void
 
1268
Dbtup::disk_page_alloc_extent_log_buffer_callback(Signal* signal,
 
1269
                                                  Uint32 extentPtrI,
 
1270
                                                  Uint32 unused)
 
1271
{
 
1272
  Ptr<Extent_info> extentPtr;
 
1273
  c_extent_pool.getPtr(extentPtr, extentPtrI);
 
1274
 
 
1275
  Local_key key = extentPtr.p->m_key;
 
1276
  Tablespace_client2 tsman(signal, c_tsman, &key);
 
1277
 
 
1278
  Ptr<Tablerec> tabPtr;
 
1279
  tabPtr.i= tsman.m_table_id;
 
1280
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
1281
  
 
1282
  Ptr<Fragrecord> fragPtr;
 
1283
  getFragmentrec(fragPtr, tsman.m_fragment_id, tabPtr.p);
 
1284
 
 
1285
  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
 
1286
 
 
1287
  Disk_undo::AllocExtent alloc;
 
1288
  alloc.m_table = tabPtr.i;
 
1289
  alloc.m_fragment = tsman.m_fragment_id;
 
1290
  alloc.m_page_no = key.m_page_no;
 
1291
  alloc.m_file_no = key.m_file_no;
 
1292
  alloc.m_type_length = (Disk_undo::UNDO_ALLOC_EXTENT<<16)|(sizeof(alloc)>> 2);
 
1293
  
 
1294
  Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
 
1295
  
 
1296
  Uint64 lsn= lgman.add_entry(c, 1);
 
1297
  
 
1298
  tsman.update_lsn(&key, lsn);
 
1299
  jamEntry();
 
1300
}
 
1301
#endif
 
1302
 
 
1303
Uint64
 
1304
Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
 
1305
                            Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
 
1306
{
 
1307
  jam();
 
1308
  Logfile_client lgman(this, c_lgman, logfile_group_id);
 
1309
  
 
1310
  Disk_undo::Alloc alloc;
 
1311
  alloc.m_type_length= (Disk_undo::UNDO_ALLOC << 16) | (sizeof(alloc) >> 2);
 
1312
  alloc.m_page_no = key->m_page_no;
 
1313
  alloc.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
 
1314
  
 
1315
  Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
 
1316
  
 
1317
  Uint64 lsn= lgman.add_entry(c, 1);
 
1318
  m_pgman.update_lsn(* key, lsn);
 
1319
  jamEntry();
 
1320
 
 
1321
  return lsn;
 
1322
}
 
1323
 
 
1324
Uint64
 
1325
Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
 
1326
                             const Uint32* src, Uint32 sz,
 
1327
                             Uint32 gci, Uint32 logfile_group_id)
 
1328
{
 
1329
  jam();
 
1330
  Logfile_client lgman(this, c_lgman, logfile_group_id);
 
1331
 
 
1332
  Disk_undo::Update update;
 
1333
  update.m_page_no = key->m_page_no;
 
1334
  update.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
 
1335
  update.m_gci= gci;
 
1336
  
 
1337
  update.m_type_length= 
 
1338
    (Disk_undo::UNDO_UPDATE << 16) | (sz + (sizeof(update) >> 2) - 1);
 
1339
 
 
1340
  Logfile_client::Change c[3] = {
 
1341
    { &update, 3 },
 
1342
    { src, sz },
 
1343
    { &update.m_type_length, 1 }
 
1344
  };
 
1345
 
 
1346
  ndbassert(4*(3 + sz + 1) == (sizeof(update) + 4*sz - 4));
 
1347
    
 
1348
  Uint64 lsn= lgman.add_entry(c, 3);
 
1349
  m_pgman.update_lsn(* key, lsn);
 
1350
  jamEntry();
 
1351
 
 
1352
  return lsn;
 
1353
}
 
1354
  
 
1355
Uint64
 
1356
Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
 
1357
                           const Uint32* src, Uint32 sz,
 
1358
                           Uint32 gci, Uint32 logfile_group_id)
 
1359
{
 
1360
  jam();
 
1361
  Logfile_client lgman(this, c_lgman, logfile_group_id);
 
1362
 
 
1363
  Disk_undo::Free free;
 
1364
  free.m_page_no = key->m_page_no;
 
1365
  free.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
 
1366
  free.m_gci= gci;
 
1367
  
 
1368
  free.m_type_length= 
 
1369
    (Disk_undo::UNDO_FREE << 16) | (sz + (sizeof(free) >> 2) - 1);
 
1370
  
 
1371
  Logfile_client::Change c[3] = {
 
1372
    { &free, 3 },
 
1373
    { src, sz },
 
1374
    { &free.m_type_length, 1 }
 
1375
  };
 
1376
  
 
1377
  ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4));
 
1378
  
 
1379
  Uint64 lsn= lgman.add_entry(c, 3);
 
1380
  m_pgman.update_lsn(* key, lsn);
 
1381
  jamEntry();
 
1382
 
 
1383
  return lsn;
 
1384
}
 
1385
  
 
1386
#include <signaldata/LgmanContinueB.hpp>
 
1387
 
 
1388
static Dbtup::Apply_undo f_undo;
 
1389
 
 
1390
#define DBG_UNDO 0
 
1391
 
 
1392
void
 
1393
Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
 
1394
                         Uint32 type, const Uint32 * ptr, Uint32 len)
 
1395
{
 
1396
  f_undo_done = false;
 
1397
  f_undo.m_lsn= lsn;
 
1398
  f_undo.m_ptr= ptr;
 
1399
  f_undo.m_len= len;
 
1400
  f_undo.m_type = type;
 
1401
 
 
1402
  Page_cache_client::Request preq;
 
1403
  switch(f_undo.m_type){
 
1404
  case File_formats::Undofile::UNDO_LCP_FIRST:
 
1405
  case File_formats::Undofile::UNDO_LCP:
 
1406
  {
 
1407
    jam();
 
1408
    ndbrequire(len == 3);
 
1409
    Uint32 lcp = ptr[0];
 
1410
    Uint32 tableId = ptr[1] >> 16;
 
1411
    Uint32 fragId = ptr[1] & 0xFFFF;
 
1412
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP, lcp);
 
1413
    disk_restart_undo_next(signal);
 
1414
    
 
1415
    if (DBG_UNDO)
 
1416
    {
 
1417
      ndbout_c("UNDO LCP %u (%u, %u)", lcp, tableId, fragId);
 
1418
    }
 
1419
    return;
 
1420
  }
 
1421
  case File_formats::Undofile::UNDO_TUP_ALLOC:
 
1422
  {
 
1423
    jam();
 
1424
    Disk_undo::Alloc* rec= (Disk_undo::Alloc*)ptr;
 
1425
    preq.m_page.m_page_no = rec->m_page_no;
 
1426
    preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
 
1427
    preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;    
 
1428
    break;
 
1429
  }
 
1430
  case File_formats::Undofile::UNDO_TUP_UPDATE:
 
1431
  {
 
1432
    jam();
 
1433
    Disk_undo::Update* rec= (Disk_undo::Update*)ptr;
 
1434
    preq.m_page.m_page_no = rec->m_page_no;
 
1435
    preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
 
1436
    preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
 
1437
    break;
 
1438
  }
 
1439
  case File_formats::Undofile::UNDO_TUP_FREE:
 
1440
  {
 
1441
    jam();
 
1442
    Disk_undo::Free* rec= (Disk_undo::Free*)ptr;
 
1443
    preq.m_page.m_page_no = rec->m_page_no;
 
1444
    preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
 
1445
    preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
 
1446
    break;
 
1447
  }
 
1448
  case File_formats::Undofile::UNDO_TUP_CREATE:
 
1449
    /**
 
1450
     * 
 
1451
     */
 
1452
  {
 
1453
    jam();
 
1454
    Disk_undo::Create* rec= (Disk_undo::Create*)ptr;
 
1455
    Ptr<Tablerec> tabPtr;
 
1456
    tabPtr.i= rec->m_table;
 
1457
    ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
1458
    for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
 
1459
      if (tabPtr.p->fragrec[i] != RNIL)
 
1460
        disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i], 
 
1461
                              Fragrecord::UC_CREATE, 0);
 
1462
    disk_restart_undo_next(signal);
 
1463
 
 
1464
    if (DBG_UNDO)
 
1465
    {
 
1466
      ndbout_c("UNDO CREATE (%u)", tabPtr.i);
 
1467
    }
 
1468
    return;
 
1469
  }
 
1470
  case File_formats::Undofile::UNDO_TUP_DROP:
 
1471
  {
 
1472
    jam();
 
1473
    Disk_undo::Drop* rec = (Disk_undo::Drop*)ptr;
 
1474
    Ptr<Tablerec> tabPtr;
 
1475
    tabPtr.i= rec->m_table;
 
1476
    ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
1477
    for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
 
1478
      if (tabPtr.p->fragrec[i] != RNIL)
 
1479
        disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i], 
 
1480
                              Fragrecord::UC_CREATE, 0);
 
1481
    disk_restart_undo_next(signal);
 
1482
 
 
1483
    if (DBG_UNDO)
 
1484
    {
 
1485
      ndbout_c("UNDO DROP (%u)", tabPtr.i);
 
1486
    }
 
1487
    return;
 
1488
  }
 
1489
  case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
 
1490
    jam();
 
1491
  case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
 
1492
    jam();
 
1493
    disk_restart_undo_next(signal);
 
1494
    return;
 
1495
 
 
1496
  case File_formats::Undofile::UNDO_END:
 
1497
    jam();
 
1498
    f_undo_done = true;
 
1499
    return;
 
1500
  default:
 
1501
    ndbrequire(false);
 
1502
  }
 
1503
 
 
1504
  f_undo.m_key = preq.m_page;
 
1505
  preq.m_callback.m_callbackFunction = 
 
1506
    safe_cast(&Dbtup::disk_restart_undo_callback);
 
1507
  
 
1508
  int flags = 0;
 
1509
  int res= m_pgman.get_page(signal, preq, flags);
 
1510
  jamEntry();
 
1511
  switch(res)
 
1512
  {
 
1513
  case 0:
 
1514
    break; // Wait for callback
 
1515
  case -1:
 
1516
    ndbrequire(false);
 
1517
    break;
 
1518
  default:
 
1519
    execute(signal, preq.m_callback, res); // run callback
 
1520
  }
 
1521
}
 
1522
 
 
1523
void
 
1524
Dbtup::disk_restart_undo_next(Signal* signal)
 
1525
{
 
1526
  signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
 
1527
  sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
 
1528
}
 
1529
 
 
1530
void
 
1531
Dbtup::disk_restart_lcp_id(Uint32 tableId, Uint32 fragId, Uint32 lcpId)
 
1532
{
 
1533
  jamEntry();
 
1534
  
 
1535
  if (lcpId == RNIL)
 
1536
  {
 
1537
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_CREATE, 0);
 
1538
    if (DBG_UNDO)
 
1539
    {
 
1540
      ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
 
1541
    }
 
1542
  }
 
1543
  else
 
1544
  {
 
1545
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_SET_LCP, lcpId); 
 
1546
    if (DBG_UNDO)
 
1547
    {
 
1548
      ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
 
1549
    }
 
1550
 
 
1551
  }
 
1552
}
 
1553
 
 
1554
void
 
1555
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag, 
 
1556
                             Uint32 lcpId)
 
1557
{
 
1558
  Ptr<Tablerec> tabPtr;
 
1559
  tabPtr.i= tableId;
 
1560
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
1561
  
 
1562
  if (tabPtr.p->tableStatus == DEFINED)
 
1563
  {
 
1564
    jam();
 
1565
    FragrecordPtr fragPtr;
 
1566
    getFragmentrec(fragPtr, fragId, tabPtr.p);
 
1567
    if (!fragPtr.isNull())
 
1568
    {
 
1569
      jam();
 
1570
      switch(flag){
 
1571
      case Fragrecord::UC_CREATE:
 
1572
        jam();
 
1573
        fragPtr.p->m_undo_complete |= flag;
 
1574
        return;
 
1575
      case Fragrecord::UC_LCP:
 
1576
        jam();
 
1577
        if (fragPtr.p->m_undo_complete == 0 && 
 
1578
            fragPtr.p->m_restore_lcp_id == lcpId)
 
1579
        {
 
1580
          jam();
 
1581
          fragPtr.p->m_undo_complete |= flag;
 
1582
          if (DBG_UNDO)
 
1583
            ndbout_c("table: %u fragment: %u lcp: %u -> done", 
 
1584
                     tableId, fragId, lcpId);
 
1585
        }
 
1586
        return;
 
1587
      case Fragrecord::UC_SET_LCP:
 
1588
      {
 
1589
        jam();
 
1590
        if (DBG_UNDO)
 
1591
           ndbout_c("table: %u fragment: %u restore to lcp: %u",
 
1592
                    tableId, fragId, lcpId);
 
1593
        ndbrequire(fragPtr.p->m_undo_complete == 0);
 
1594
        ndbrequire(fragPtr.p->m_restore_lcp_id == RNIL);
 
1595
        fragPtr.p->m_restore_lcp_id = lcpId;
 
1596
        return;
 
1597
      }
 
1598
      }
 
1599
      jamLine(flag);
 
1600
      ndbrequire(false);
 
1601
    }
 
1602
  }
 
1603
}
 
1604
 
 
1605
void
 
1606
Dbtup::disk_restart_undo_callback(Signal* signal,
 
1607
                                  Uint32 id, 
 
1608
                                  Uint32 page_id)
 
1609
{
 
1610
  jamEntry();
 
1611
  Ptr<GlobalPage> gpage;
 
1612
  m_global_page_pool.getPtr(gpage, page_id);
 
1613
  PagePtr pagePtr;
 
1614
  pagePtr.i = gpage.i;
 
1615
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);
 
1616
 
 
1617
  Apply_undo* undo = &f_undo;
 
1618
  
 
1619
  bool update = false;
 
1620
  if (! (pagePtr.p->list_index & 0x8000) ||
 
1621
      pagePtr.p->nextList != RNIL ||
 
1622
      pagePtr.p->prevList != RNIL)
 
1623
  {
 
1624
    jam();
 
1625
    update = true;
 
1626
    pagePtr.p->list_index |= 0x8000;
 
1627
    pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
 
1628
  }
 
1629
  
 
1630
  Uint32 tableId= pagePtr.p->m_table_id;
 
1631
  Uint32 fragId = pagePtr.p->m_fragment_id;
 
1632
  
 
1633
  if (tableId >= cnoOfTablerec)
 
1634
  {
 
1635
    jam();
 
1636
    if (DBG_UNDO)
 
1637
      ndbout_c("UNDO table> %u", tableId);
 
1638
    disk_restart_undo_next(signal);
 
1639
    return;
 
1640
  }
 
1641
  undo->m_table_ptr.i = tableId;
 
1642
  ptrCheckGuard(undo->m_table_ptr, cnoOfTablerec, tablerec);
 
1643
  
 
1644
  if (undo->m_table_ptr.p->tableStatus != DEFINED)
 
1645
  {
 
1646
    jam();
 
1647
    if (DBG_UNDO)
 
1648
      ndbout_c("UNDO !defined (%u) ", tableId);
 
1649
    disk_restart_undo_next(signal);
 
1650
    return;
 
1651
  }
 
1652
  
 
1653
  getFragmentrec(undo->m_fragment_ptr, fragId, undo->m_table_ptr.p);
 
1654
  if(undo->m_fragment_ptr.isNull())
 
1655
  {
 
1656
    jam();
 
1657
    if (DBG_UNDO)
 
1658
      ndbout_c("UNDO fragment null %u/%u", tableId, fragId);
 
1659
    disk_restart_undo_next(signal);
 
1660
    return;
 
1661
  }
 
1662
 
 
1663
  if (undo->m_fragment_ptr.p->m_undo_complete)
 
1664
  {
 
1665
    jam();
 
1666
    if (DBG_UNDO)
 
1667
      ndbout_c("UNDO undo complete %u/%u", tableId, fragId);
 
1668
    disk_restart_undo_next(signal);
 
1669
    return;
 
1670
  }
 
1671
  
 
1672
  Local_key key = undo->m_key;
 
1673
//  key.m_page_no = pagePtr.p->m_page_no;
 
1674
//  key.m_file_no = pagePtr.p->m_file_no;
 
1675
  
 
1676
  Uint64 lsn = 0;
 
1677
  lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
 
1678
  lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
 
1679
 
 
1680
  undo->m_page_ptr = pagePtr;
 
1681
  
 
1682
  if (undo->m_lsn <= lsn)
 
1683
  {
 
1684
    jam();
 
1685
    if (DBG_UNDO)
 
1686
    {
 
1687
      ndbout << "apply: " << undo->m_lsn << "(" << lsn << " )" 
 
1688
             << key << " type: " << undo->m_type << endl;
 
1689
    }
 
1690
    
 
1691
    update = true;
 
1692
    if (DBG_UNDO)
 
1693
      ndbout_c("applying %lld", undo->m_lsn);
 
1694
    /**
 
1695
     * Apply undo record
 
1696
     */
 
1697
    switch(undo->m_type){
 
1698
    case File_formats::Undofile::UNDO_TUP_ALLOC:
 
1699
      jam();
 
1700
      disk_restart_undo_alloc(undo);
 
1701
      break;
 
1702
    case File_formats::Undofile::UNDO_TUP_UPDATE:
 
1703
      jam();
 
1704
      disk_restart_undo_update(undo);
 
1705
      break;
 
1706
    case File_formats::Undofile::UNDO_TUP_FREE:
 
1707
      jam();
 
1708
      disk_restart_undo_free(undo);
 
1709
      break;
 
1710
    default:
 
1711
      ndbrequire(false);
 
1712
    }
 
1713
 
 
1714
    if (DBG_UNDO)
 
1715
      ndbout << "disk_restart_undo: " << undo->m_type << " " 
 
1716
             << undo->m_key << endl;
 
1717
    
 
1718
    lsn = undo->m_lsn - 1; // make sure undo isn't run again...
 
1719
    
 
1720
    m_pgman.update_lsn(undo->m_key, lsn);
 
1721
    jamEntry();
 
1722
 
 
1723
    disk_restart_undo_page_bits(signal, undo);
 
1724
  }
 
1725
  else if (DBG_UNDO)
 
1726
  {
 
1727
    jam();
 
1728
    ndbout << "ignore: " << undo->m_lsn << "(" << lsn << " )" 
 
1729
           << key << " type: " << undo->m_type 
 
1730
           << " tab: " << tableId << endl;
 
1731
  }
 
1732
 
 
1733
  disk_restart_undo_next(signal);
 
1734
}
 
1735
 
 
1736
void
 
1737
Dbtup::disk_restart_undo_alloc(Apply_undo* undo)
 
1738
{
 
1739
  ndbassert(undo->m_page_ptr.p->m_file_no == undo->m_key.m_file_no);
 
1740
  ndbassert(undo->m_page_ptr.p->m_page_no == undo->m_key.m_page_no);
 
1741
  if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
 
1742
  {
 
1743
    ((Fix_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx);
 
1744
  }
 
1745
  else
 
1746
    ((Var_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx, 0);
 
1747
}
 
1748
 
 
1749
void
 
1750
Dbtup::disk_restart_undo_update(Apply_undo* undo)
 
1751
{
 
1752
  Uint32* ptr;
 
1753
  Uint32 len= undo->m_len - 4;
 
1754
  if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
 
1755
  {
 
1756
    ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx, len);
 
1757
    ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
 
1758
  }
 
1759
  else
 
1760
  {
 
1761
    ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
 
1762
    abort();
 
1763
  }  
 
1764
  
 
1765
  const Disk_undo::Update *update = (const Disk_undo::Update*)undo->m_ptr;
 
1766
  const Uint32* src= update->m_data;
 
1767
  memcpy(ptr, src, 4 * len);
 
1768
}
 
1769
 
 
1770
void
 
1771
Dbtup::disk_restart_undo_free(Apply_undo* undo)
 
1772
{
 
1773
  Uint32* ptr, idx = undo->m_key.m_page_idx;
 
1774
  Uint32 len= undo->m_len - 4;
 
1775
  if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
 
1776
  {
 
1777
    ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
 
1778
    idx= ((Fix_page*)undo->m_page_ptr.p)->alloc_record(idx);
 
1779
    ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(idx, len);
 
1780
  }
 
1781
  else
 
1782
  {
 
1783
    abort();
 
1784
  }  
 
1785
  
 
1786
  ndbrequire(idx == undo->m_key.m_page_idx);
 
1787
  const Disk_undo::Free *free = (const Disk_undo::Free*)undo->m_ptr;
 
1788
  const Uint32* src= free->m_data;
 
1789
  memcpy(ptr, src, 4 * len);
 
1790
}
 
1791
 
 
1792
void
 
1793
Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
 
1794
{
 
1795
  Fragrecord* fragPtrP = undo->m_fragment_ptr.p;
 
1796
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
 
1797
  
 
1798
  /**
 
1799
   * Set alloc.m_curr_extent_info_ptr_i to
 
1800
   *   current this extent (and move old extend into free matrix)
 
1801
   */
 
1802
  Page* pageP = undo->m_page_ptr.p;
 
1803
  Uint32 free = pageP->free_space;
 
1804
  Uint32 new_bits = alloc.calc_page_free_bits(free);
 
1805
  pageP->list_index = 0x8000 | new_bits;
 
1806
 
 
1807
  Tablespace_client tsman(signal, c_tsman,
 
1808
                          fragPtrP->fragTableId,
 
1809
                          fragPtrP->fragmentId,
 
1810
                          fragPtrP->m_tablespace_id);
 
1811
  
 
1812
  tsman.restart_undo_page_free_bits(&undo->m_key, new_bits);
 
1813
  jamEntry();
 
1814
}
 
1815
 
 
1816
int
 
1817
Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId, 
 
1818
                                 const Local_key* key, Uint32 pages)
 
1819
{
 
1820
  TablerecPtr tabPtr;
 
1821
  FragrecordPtr fragPtr;
 
1822
  tabPtr.i = tableId;
 
1823
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
1824
  if (tabPtr.p->tableStatus == DEFINED)
 
1825
  {
 
1826
    getFragmentrec(fragPtr, fragId, tabPtr.p);
 
1827
    if (fragPtr.p->m_undo_complete & Fragrecord::UC_CREATE)
 
1828
    {
 
1829
      jam();
 
1830
      return -1;
 
1831
    }
 
1832
 
 
1833
    if (!fragPtr.isNull())
 
1834
    {
 
1835
      Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
 
1836
      
 
1837
      Ptr<Extent_info> ext;
 
1838
      ndbrequire(c_extent_pool.seize(ext));
 
1839
      
 
1840
      ndbout << "allocated " << pages << " pages: " << *key << endl;
 
1841
      
 
1842
      ext.p->m_key = *key;
 
1843
      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
 
1844
      ext.p->m_free_space= 0;
 
1845
      bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
 
1846
      
 
1847
      if (alloc.m_curr_extent_info_ptr_i != RNIL)
 
1848
      {
 
1849
        jam();
 
1850
        Ptr<Extent_info> old;
 
1851
        c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
 
1852
        ndbassert(old.p->m_free_matrix_pos == RNIL);
 
1853
        Uint32 pos= alloc.calc_extent_pos(old.p);
 
1854
        Local_extent_info_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
 
1855
        new_list.add(old);
 
1856
        old.p->m_free_matrix_pos= pos;
 
1857
      }
 
1858
      
 
1859
      alloc.m_curr_extent_info_ptr_i = ext.i;
 
1860
      ext.p->m_free_matrix_pos = RNIL;
 
1861
      c_extent_hash.add(ext);
 
1862
 
 
1863
      Local_fragment_extent_list list1(c_extent_pool, alloc.m_extent_list);
 
1864
      list1.add(ext);
 
1865
      return 0;
 
1866
    }
 
1867
  }
 
1868
 
 
1869
  return -1;
 
1870
}
 
1871
 
 
1872
void
 
1873
Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
 
1874
                              const Local_key*, Uint32 bits)
 
1875
{
 
1876
  jam();
 
1877
  TablerecPtr tabPtr;
 
1878
  FragrecordPtr fragPtr;
 
1879
  tabPtr.i = tableId;
 
1880
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
 
1881
  getFragmentrec(fragPtr, fragId, tabPtr.p);
 
1882
  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
 
1883
  
 
1884
  Ptr<Extent_info> ext;
 
1885
  c_extent_pool.getPtr(ext, alloc.m_curr_extent_info_ptr_i);
 
1886
  
 
1887
  Uint32 size= alloc.calc_page_free_space(bits);  
 
1888
  
 
1889
  ext.p->m_free_space += size;
 
1890
  ext.p->m_free_page_count[bits]++;
 
1891
  ndbassert(ext.p->m_free_matrix_pos == RNIL);
 
1892
}