~linuxjedi/drizzle/trunk-bug-667053

« back to all changes in this revision

Viewing changes to storage/innobase/page/page0cur.c

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/************************************************************************
 
2
The page cursor
 
3
 
 
4
(c) 1994-1996 Innobase Oy
 
5
 
 
6
Created 10/4/1994 Heikki Tuuri
 
7
*************************************************************************/
 
8
 
 
9
#include "page0cur.h"
 
10
#ifdef UNIV_NONINL
 
11
#include "page0cur.ic"
 
12
#endif
 
13
 
 
14
#include "rem0cmp.h"
 
15
#include "mtr0log.h"
 
16
#include "log0recv.h"
 
17
#include "rem0cmp.h"
 
18
 
 
19
static ulint    page_rnd        = 976722341;
 
20
 
 
21
#ifdef PAGE_CUR_ADAPT
 
22
# ifdef UNIV_SEARCH_PERF_STAT
 
23
ulint   page_cur_short_succ     = 0;
 
24
# endif /* UNIV_SEARCH_PERF_STAT */
 
25
 
 
26
/********************************************************************
 
27
Tries a search shortcut based on the last insert. */
 
28
UNIV_INLINE
 
29
ibool
 
30
page_cur_try_search_shortcut(
 
31
/*=========================*/
 
32
                                /* out: TRUE on success */
 
33
        page_t*         page,   /* in: index page */
 
34
        dict_index_t*   index,  /* in: record descriptor */
 
35
        dtuple_t*       tuple,  /* in: data tuple */
 
36
        ulint*          iup_matched_fields,
 
37
                                /* in/out: already matched fields in upper
 
38
                                limit record */
 
39
        ulint*          iup_matched_bytes,
 
40
                                /* in/out: already matched bytes in a field
 
41
                                not yet completely matched */
 
42
        ulint*          ilow_matched_fields,
 
43
                                /* in/out: already matched fields in lower
 
44
                                limit record */
 
45
        ulint*          ilow_matched_bytes,
 
46
                                /* in/out: already matched bytes in a field
 
47
                                not yet completely matched */
 
48
        page_cur_t*     cursor) /* out: page cursor */
 
49
{
 
50
        rec_t*  rec;
 
51
        rec_t*  next_rec;
 
52
        ulint   low_match;
 
53
        ulint   low_bytes;
 
54
        ulint   up_match;
 
55
        ulint   up_bytes;
 
56
#ifdef UNIV_SEARCH_DEBUG
 
57
        page_cur_t cursor2;
 
58
#endif
 
59
        ibool           success         = FALSE;
 
60
        mem_heap_t*     heap            = NULL;
 
61
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
62
        ulint*          offsets         = offsets_;
 
63
        *offsets_ = (sizeof offsets_) / sizeof *offsets_;
 
64
 
 
65
        ut_ad(dtuple_check_typed(tuple));
 
66
 
 
67
        rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
 
68
        offsets = rec_get_offsets(rec, index, offsets,
 
69
                                  dtuple_get_n_fields(tuple), &heap);
 
70
 
 
71
        ut_ad(rec);
 
72
        ut_ad(page_rec_is_user_rec(rec));
 
73
 
 
74
        ut_pair_min(&low_match, &low_bytes,
 
75
                    *ilow_matched_fields, *ilow_matched_bytes,
 
76
                    *iup_matched_fields, *iup_matched_bytes);
 
77
 
 
78
        up_match = low_match;
 
79
        up_bytes = low_bytes;
 
80
 
 
81
        if (page_cmp_dtuple_rec_with_match(tuple, rec, offsets,
 
82
                                           &low_match, &low_bytes) < 0) {
 
83
                goto exit_func;
 
84
        }
 
85
 
 
86
        next_rec = page_rec_get_next(rec);
 
87
        offsets = rec_get_offsets(next_rec, index, offsets,
 
88
                                  dtuple_get_n_fields(tuple), &heap);
 
89
 
 
90
        if (page_cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
 
91
                                           &up_match, &up_bytes) >= 0) {
 
92
                goto exit_func;
 
93
        }
 
94
 
 
95
        cursor->rec = rec;
 
96
 
 
97
#ifdef UNIV_SEARCH_DEBUG
 
98
        page_cur_search_with_match(page, index, tuple, PAGE_CUR_DBG,
 
99
                                   iup_matched_fields,
 
100
                                   iup_matched_bytes,
 
101
                                   ilow_matched_fields,
 
102
                                   ilow_matched_bytes,
 
103
                                   &cursor2);
 
104
        ut_a(cursor2.rec == cursor->rec);
 
105
 
 
106
        if (next_rec != page_get_supremum_rec(page)) {
 
107
 
 
108
                ut_a(*iup_matched_fields == up_match);
 
109
                ut_a(*iup_matched_bytes == up_bytes);
 
110
        }
 
111
 
 
112
        ut_a(*ilow_matched_fields == low_match);
 
113
        ut_a(*ilow_matched_bytes == low_bytes);
 
114
#endif
 
115
        if (!page_rec_is_supremum(next_rec)) {
 
116
 
 
117
                *iup_matched_fields = up_match;
 
118
                *iup_matched_bytes = up_bytes;
 
119
        }
 
120
 
 
121
        *ilow_matched_fields = low_match;
 
122
        *ilow_matched_bytes = low_bytes;
 
123
 
 
124
#ifdef UNIV_SEARCH_PERF_STAT
 
125
        page_cur_short_succ++;
 
126
#endif
 
127
        success = TRUE;
 
128
exit_func:
 
129
        if (UNIV_LIKELY_NULL(heap)) {
 
130
                mem_heap_free(heap);
 
131
        }
 
132
        return(success);
 
133
}
 
134
 
 
135
#endif
 
136
 
 
137
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
138
/********************************************************************
 
139
Checks if the nth field in a record is a character type field which extends
 
140
the nth field in tuple, i.e., the field is longer or equal in length and has
 
141
common first characters. */
 
142
static
 
143
ibool
 
144
page_cur_rec_field_extends(
 
145
/*=======================*/
 
146
                                /* out: TRUE if rec field
 
147
                                extends tuple field */
 
148
        dtuple_t*       tuple,  /* in: data tuple */
 
149
        rec_t*          rec,    /* in: record */
 
150
        const ulint*    offsets,/* in: array returned by rec_get_offsets() */
 
151
        ulint           n)      /* in: compare nth field */
 
152
{
 
153
        dtype_t* type;
 
154
        dfield_t* dfield;
 
155
        byte*     rec_f;
 
156
        ulint     rec_f_len;
 
157
 
 
158
        ut_ad(rec_offs_validate(rec, NULL, offsets));
 
159
        dfield = dtuple_get_nth_field(tuple, n);
 
160
 
 
161
        type = dfield_get_type(dfield);
 
162
 
 
163
        rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
 
164
 
 
165
        if (type->mtype == DATA_VARCHAR
 
166
            || type->mtype == DATA_CHAR
 
167
            || type->mtype == DATA_FIXBINARY
 
168
            || type->mtype == DATA_BINARY
 
169
            || type->mtype == DATA_BLOB
 
170
            || type->mtype == DATA_VARMYSQL
 
171
            || type->mtype == DATA_MYSQL) {
 
172
 
 
173
                if (dfield_get_len(dfield) != UNIV_SQL_NULL
 
174
                    && rec_f_len != UNIV_SQL_NULL
 
175
                    && rec_f_len >= dfield_get_len(dfield)
 
176
                    && !cmp_data_data_slow(type,
 
177
                                           dfield_get_data(dfield),
 
178
                                           dfield_get_len(dfield),
 
179
                                           rec_f, dfield_get_len(dfield))) {
 
180
 
 
181
                        return(TRUE);
 
182
                }
 
183
        }
 
184
 
 
185
        return(FALSE);
 
186
}
 
187
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
188
 
 
189
/********************************************************************
 
190
Searches the right position for a page cursor. */
 
191
 
 
192
void
 
193
page_cur_search_with_match(
 
194
/*=======================*/
 
195
        page_t*         page,   /* in: index page */
 
196
        dict_index_t*   index,  /* in: record descriptor */
 
197
        dtuple_t*       tuple,  /* in: data tuple */
 
198
        ulint           mode,   /* in: PAGE_CUR_L, PAGE_CUR_LE, PAGE_CUR_G,
 
199
                                or PAGE_CUR_GE */
 
200
        ulint*          iup_matched_fields,
 
201
                                /* in/out: already matched fields in upper
 
202
                                limit record */
 
203
        ulint*          iup_matched_bytes,
 
204
                                /* in/out: already matched bytes in a field
 
205
                                not yet completely matched */
 
206
        ulint*          ilow_matched_fields,
 
207
                                /* in/out: already matched fields in lower
 
208
                                limit record */
 
209
        ulint*          ilow_matched_bytes,
 
210
                                /* in/out: already matched bytes in a field
 
211
                                not yet completely matched */
 
212
        page_cur_t*     cursor) /* out: page cursor */
 
213
{
 
214
        ulint   up;
 
215
        ulint   low;
 
216
        ulint   mid;
 
217
        page_dir_slot_t* slot;
 
218
        rec_t*  up_rec;
 
219
        rec_t*  low_rec;
 
220
        rec_t*  mid_rec;
 
221
        ulint   up_matched_fields;
 
222
        ulint   up_matched_bytes;
 
223
        ulint   low_matched_fields;
 
224
        ulint   low_matched_bytes;
 
225
        ulint   cur_matched_fields;
 
226
        ulint   cur_matched_bytes;
 
227
        int     cmp;
 
228
#ifdef UNIV_SEARCH_DEBUG
 
229
        int     dbg_cmp;
 
230
        ulint   dbg_matched_fields;
 
231
        ulint   dbg_matched_bytes;
 
232
#endif
 
233
        mem_heap_t*     heap            = NULL;
 
234
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
235
        ulint*          offsets         = offsets_;
 
236
        *offsets_ = (sizeof offsets_) / sizeof *offsets_;
 
237
 
 
238
        ut_ad(page && tuple && iup_matched_fields && iup_matched_bytes
 
239
              && ilow_matched_fields && ilow_matched_bytes && cursor);
 
240
        ut_ad(dtuple_validate(tuple));
 
241
        ut_ad(dtuple_check_typed(tuple));
 
242
#ifdef UNIV_DEBUG
 
243
# ifdef PAGE_CUR_DBG
 
244
        if (mode != PAGE_CUR_DBG)
 
245
# endif /* PAGE_CUR_DBG */
 
246
# ifdef PAGE_CUR_LE_OR_EXTENDS
 
247
                if (mode != PAGE_CUR_LE_OR_EXTENDS)
 
248
# endif /* PAGE_CUR_LE_OR_EXTENDS */
 
249
                        ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
 
250
                              || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
 
251
#endif /* UNIV_DEBUG */
 
252
 
 
253
        page_check_dir(page);
 
254
 
 
255
#ifdef PAGE_CUR_ADAPT
 
256
        if ((page_header_get_field(page, PAGE_LEVEL) == 0)
 
257
            && (mode == PAGE_CUR_LE)
 
258
            && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
 
259
            && (page_header_get_ptr(page, PAGE_LAST_INSERT))
 
260
            && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {
 
261
 
 
262
                if (page_cur_try_search_shortcut(
 
263
                            page, index, tuple,
 
264
                            iup_matched_fields, iup_matched_bytes,
 
265
                            ilow_matched_fields, ilow_matched_bytes,
 
266
                            cursor)) {
 
267
                        return;
 
268
                }
 
269
        }
 
270
# ifdef PAGE_CUR_DBG
 
271
        if (mode == PAGE_CUR_DBG) {
 
272
                mode = PAGE_CUR_LE;
 
273
        }
 
274
# endif
 
275
#endif
 
276
 
 
277
        /* The following flag does not work for non-latin1 char sets because
 
278
        cmp_full_field does not tell how many bytes matched */
 
279
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
280
        ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
 
281
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
282
 
 
283
        /* If mode PAGE_CUR_G is specified, we are trying to position the
 
284
        cursor to answer a query of the form "tuple < X", where tuple is
 
285
        the input parameter, and X denotes an arbitrary physical record on
 
286
        the page. We want to position the cursor on the first X which
 
287
        satisfies the condition. */
 
288
 
 
289
        up_matched_fields  = *iup_matched_fields;
 
290
        up_matched_bytes   = *iup_matched_bytes;
 
291
        low_matched_fields = *ilow_matched_fields;
 
292
        low_matched_bytes  = *ilow_matched_bytes;
 
293
 
 
294
        /* Perform binary search. First the search is done through the page
 
295
        directory, after that as a linear search in the list of records
 
296
        owned by the upper limit directory slot. */
 
297
 
 
298
        low = 0;
 
299
        up = page_dir_get_n_slots(page) - 1;
 
300
 
 
301
        /* Perform binary search until the lower and upper limit directory
 
302
        slots come to the distance 1 of each other */
 
303
 
 
304
        while (up - low > 1) {
 
305
                mid = (low + up) / 2;
 
306
                slot = page_dir_get_nth_slot(page, mid);
 
307
                mid_rec = page_dir_slot_get_rec(slot);
 
308
 
 
309
                ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
 
310
                            low_matched_fields, low_matched_bytes,
 
311
                            up_matched_fields, up_matched_bytes);
 
312
 
 
313
                offsets = rec_get_offsets(mid_rec, index, offsets,
 
314
                                          dtuple_get_n_fields_cmp(tuple),
 
315
                                          &heap);
 
316
 
 
317
                cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
 
318
                                                &cur_matched_fields,
 
319
                                                &cur_matched_bytes);
 
320
                if (UNIV_LIKELY(cmp > 0)) {
 
321
low_slot_match:
 
322
                        low = mid;
 
323
                        low_matched_fields = cur_matched_fields;
 
324
                        low_matched_bytes = cur_matched_bytes;
 
325
 
 
326
                } else if (UNIV_EXPECT(cmp, -1)) {
 
327
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
328
                        if (mode == PAGE_CUR_LE_OR_EXTENDS
 
329
                            && page_cur_rec_field_extends(
 
330
                                    tuple, mid_rec, offsets,
 
331
                                    cur_matched_fields)) {
 
332
 
 
333
                                goto low_slot_match;
 
334
                        }
 
335
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
336
up_slot_match:
 
337
                        up = mid;
 
338
                        up_matched_fields = cur_matched_fields;
 
339
                        up_matched_bytes = cur_matched_bytes;
 
340
 
 
341
                } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
 
342
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
343
                           || mode == PAGE_CUR_LE_OR_EXTENDS
 
344
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
345
                           ) {
 
346
 
 
347
                        goto low_slot_match;
 
348
                } else {
 
349
 
 
350
                        goto up_slot_match;
 
351
                }
 
352
        }
 
353
 
 
354
        slot = page_dir_get_nth_slot(page, low);
 
355
        low_rec = page_dir_slot_get_rec(slot);
 
356
        slot = page_dir_get_nth_slot(page, up);
 
357
        up_rec = page_dir_slot_get_rec(slot);
 
358
 
 
359
        /* Perform linear search until the upper and lower records come to
 
360
        distance 1 of each other. */
 
361
 
 
362
        while (page_rec_get_next(low_rec) != up_rec) {
 
363
 
 
364
                mid_rec = page_rec_get_next(low_rec);
 
365
 
 
366
                ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
 
367
                            low_matched_fields, low_matched_bytes,
 
368
                            up_matched_fields, up_matched_bytes);
 
369
 
 
370
                offsets = rec_get_offsets(mid_rec, index, offsets,
 
371
                                          dtuple_get_n_fields_cmp(tuple),
 
372
                                          &heap);
 
373
 
 
374
                cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
 
375
                                                &cur_matched_fields,
 
376
                                                &cur_matched_bytes);
 
377
                if (UNIV_LIKELY(cmp > 0)) {
 
378
low_rec_match:
 
379
                        low_rec = mid_rec;
 
380
                        low_matched_fields = cur_matched_fields;
 
381
                        low_matched_bytes = cur_matched_bytes;
 
382
 
 
383
                } else if (UNIV_EXPECT(cmp, -1)) {
 
384
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
385
                        if (mode == PAGE_CUR_LE_OR_EXTENDS
 
386
                            && page_cur_rec_field_extends(
 
387
                                    tuple, mid_rec, offsets,
 
388
                                    cur_matched_fields)) {
 
389
 
 
390
                                goto low_rec_match;
 
391
                        }
 
392
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
393
up_rec_match:
 
394
                        up_rec = mid_rec;
 
395
                        up_matched_fields = cur_matched_fields;
 
396
                        up_matched_bytes = cur_matched_bytes;
 
397
                } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
 
398
#ifdef PAGE_CUR_LE_OR_EXTENDS
 
399
                           || mode == PAGE_CUR_LE_OR_EXTENDS
 
400
#endif /* PAGE_CUR_LE_OR_EXTENDS */
 
401
                           ) {
 
402
 
 
403
                        goto low_rec_match;
 
404
                } else {
 
405
 
 
406
                        goto up_rec_match;
 
407
                }
 
408
        }
 
409
 
 
410
#ifdef UNIV_SEARCH_DEBUG
 
411
 
 
412
        /* Check that the lower and upper limit records have the
 
413
        right alphabetical order compared to tuple. */
 
414
        dbg_matched_fields = 0;
 
415
        dbg_matched_bytes = 0;
 
416
 
 
417
        offsets = rec_get_offsets(low_rec, index, offsets,
 
418
                                  ULINT_UNDEFINED, &heap);
 
419
        dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, low_rec, offsets,
 
420
                                                 &dbg_matched_fields,
 
421
                                                 &dbg_matched_bytes);
 
422
        if (mode == PAGE_CUR_G) {
 
423
                ut_a(dbg_cmp >= 0);
 
424
        } else if (mode == PAGE_CUR_GE) {
 
425
                ut_a(dbg_cmp == 1);
 
426
        } else if (mode == PAGE_CUR_L) {
 
427
                ut_a(dbg_cmp == 1);
 
428
        } else if (mode == PAGE_CUR_LE) {
 
429
                ut_a(dbg_cmp >= 0);
 
430
        }
 
431
 
 
432
        if (low_rec != page_get_infimum_rec(page)) {
 
433
 
 
434
                ut_a(low_matched_fields == dbg_matched_fields);
 
435
                ut_a(low_matched_bytes == dbg_matched_bytes);
 
436
        }
 
437
 
 
438
        dbg_matched_fields = 0;
 
439
        dbg_matched_bytes = 0;
 
440
 
 
441
        offsets = rec_get_offsets(up_rec, index, offsets,
 
442
                                  ULINT_UNDEFINED, &heap);
 
443
        dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, up_rec, offsets,
 
444
                                                 &dbg_matched_fields,
 
445
                                                 &dbg_matched_bytes);
 
446
        if (mode == PAGE_CUR_G) {
 
447
                ut_a(dbg_cmp == -1);
 
448
        } else if (mode == PAGE_CUR_GE) {
 
449
                ut_a(dbg_cmp <= 0);
 
450
        } else if (mode == PAGE_CUR_L) {
 
451
                ut_a(dbg_cmp <= 0);
 
452
        } else if (mode == PAGE_CUR_LE) {
 
453
                ut_a(dbg_cmp == -1);
 
454
        }
 
455
 
 
456
        if (up_rec != page_get_supremum_rec(page)) {
 
457
 
 
458
                ut_a(up_matched_fields == dbg_matched_fields);
 
459
                ut_a(up_matched_bytes == dbg_matched_bytes);
 
460
        }
 
461
#endif
 
462
        if (mode <= PAGE_CUR_GE) {
 
463
                cursor->rec = up_rec;
 
464
        } else {
 
465
                cursor->rec = low_rec;
 
466
        }
 
467
 
 
468
        *iup_matched_fields  = up_matched_fields;
 
469
        *iup_matched_bytes   = up_matched_bytes;
 
470
        *ilow_matched_fields = low_matched_fields;
 
471
        *ilow_matched_bytes  = low_matched_bytes;
 
472
        if (UNIV_LIKELY_NULL(heap)) {
 
473
                mem_heap_free(heap);
 
474
        }
 
475
}
 
476
 
 
477
/***************************************************************
 
478
Positions a page cursor on a randomly chosen user record on a page. If there
 
479
are no user records, sets the cursor on the infimum record. */
 
480
 
 
481
void
 
482
page_cur_open_on_rnd_user_rec(
 
483
/*==========================*/
 
484
        page_t*         page,   /* in: page */
 
485
        page_cur_t*     cursor) /* in/out: page cursor */
 
486
{
 
487
        ulint   rnd;
 
488
        rec_t*  rec;
 
489
 
 
490
        if (page_get_n_recs(page) == 0) {
 
491
                page_cur_position(page_get_infimum_rec(page), cursor);
 
492
 
 
493
                return;
 
494
        }
 
495
 
 
496
        page_rnd += 87584577;
 
497
 
 
498
        rnd = page_rnd % page_get_n_recs(page);
 
499
 
 
500
        rec = page_get_infimum_rec(page);
 
501
 
 
502
        rec = page_rec_get_next(rec);
 
503
 
 
504
        while (rnd > 0) {
 
505
                rec = page_rec_get_next(rec);
 
506
 
 
507
                rnd--;
 
508
        }
 
509
 
 
510
        page_cur_position(rec, cursor);
 
511
}
 
512
 
 
513
/***************************************************************
 
514
Writes the log record of a record insert on a page. */
 
515
static
 
516
void
 
517
page_cur_insert_rec_write_log(
 
518
/*==========================*/
 
519
        rec_t*          insert_rec,     /* in: inserted physical record */
 
520
        ulint           rec_size,       /* in: insert_rec size */
 
521
        rec_t*          cursor_rec,     /* in: record the
 
522
                                        cursor is pointing to */
 
523
        dict_index_t*   index,          /* in: record descriptor */
 
524
        mtr_t*          mtr)            /* in: mini-transaction handle */
 
525
{
 
526
        ulint   cur_rec_size;
 
527
        ulint   extra_size;
 
528
        ulint   cur_extra_size;
 
529
        ulint   min_rec_size;
 
530
        byte*   ins_ptr;
 
531
        byte*   cur_ptr;
 
532
        ulint   extra_info_yes;
 
533
        byte*   log_ptr;
 
534
        byte*   log_end;
 
535
        ulint   i;
 
536
        ulint   comp;
 
537
 
 
538
        ut_a(rec_size < UNIV_PAGE_SIZE);
 
539
        ut_ad(buf_frame_align(insert_rec) == buf_frame_align(cursor_rec));
 
540
        ut_ad(!page_rec_is_comp(insert_rec)
 
541
              == !dict_table_is_comp(index->table));
 
542
        comp = page_rec_is_comp(insert_rec);
 
543
 
 
544
        {
 
545
                mem_heap_t*     heap            = NULL;
 
546
                ulint           cur_offs_[REC_OFFS_NORMAL_SIZE];
 
547
                ulint           ins_offs_[REC_OFFS_NORMAL_SIZE];
 
548
 
 
549
                ulint*          cur_offs;
 
550
                ulint*          ins_offs;
 
551
 
 
552
                *cur_offs_ = (sizeof cur_offs_) / sizeof *cur_offs_;
 
553
                *ins_offs_ = (sizeof ins_offs_) / sizeof *ins_offs_;
 
554
 
 
555
                cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
 
556
                                           ULINT_UNDEFINED, &heap);
 
557
                ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
 
558
                                           ULINT_UNDEFINED, &heap);
 
559
 
 
560
                extra_size = rec_offs_extra_size(ins_offs);
 
561
                cur_extra_size = rec_offs_extra_size(cur_offs);
 
562
                ut_ad(rec_size == rec_offs_size(ins_offs));
 
563
                cur_rec_size = rec_offs_size(cur_offs);
 
564
 
 
565
                if (UNIV_LIKELY_NULL(heap)) {
 
566
                        mem_heap_free(heap);
 
567
                }
 
568
        }
 
569
 
 
570
        ins_ptr = insert_rec - extra_size;
 
571
 
 
572
        i = 0;
 
573
 
 
574
        if (cur_extra_size == extra_size) {
 
575
                min_rec_size = ut_min(cur_rec_size, rec_size);
 
576
 
 
577
                cur_ptr = cursor_rec - cur_extra_size;
 
578
 
 
579
                /* Find out the first byte in insert_rec which differs from
 
580
                cursor_rec; skip the bytes in the record info */
 
581
 
 
582
                for (;;) {
 
583
                        if (i >= min_rec_size) {
 
584
 
 
585
                                break;
 
586
                        } else if (*ins_ptr == *cur_ptr) {
 
587
                                i++;
 
588
                                ins_ptr++;
 
589
                                cur_ptr++;
 
590
                        } else if ((i < extra_size)
 
591
                                   && (i >= extra_size
 
592
                                       - (comp
 
593
                                          ? REC_N_NEW_EXTRA_BYTES
 
594
                                          : REC_N_OLD_EXTRA_BYTES))) {
 
595
                                i = extra_size;
 
596
                                ins_ptr = insert_rec;
 
597
                                cur_ptr = cursor_rec;
 
598
                        } else {
 
599
                                break;
 
600
                        }
 
601
                }
 
602
        }
 
603
 
 
604
        if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
 
605
 
 
606
                log_ptr = mlog_open_and_write_index(mtr, insert_rec, index,
 
607
                                                    comp
 
608
                                                    ? MLOG_COMP_REC_INSERT
 
609
                                                    : MLOG_REC_INSERT,
 
610
                                                    2 + 5 + 1 + 5 + 5
 
611
                                                    + MLOG_BUF_MARGIN);
 
612
 
 
613
                if (!log_ptr) {
 
614
                        /* Logging in mtr is switched off during crash
 
615
                        recovery: in that case mlog_open returns NULL */
 
616
                        return;
 
617
                }
 
618
 
 
619
                log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
 
620
                /* Write the cursor rec offset as a 2-byte ulint */
 
621
                mach_write_to_2(log_ptr, cursor_rec
 
622
                                - buf_frame_align(cursor_rec));
 
623
                log_ptr += 2;
 
624
        } else {
 
625
                log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
 
626
                if (!log_ptr) {
 
627
                        /* Logging in mtr is switched off during crash
 
628
                        recovery: in that case mlog_open returns NULL */
 
629
                        return;
 
630
                }
 
631
                log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
 
632
        }
 
633
 
 
634
        if ((rec_get_info_and_status_bits(insert_rec, comp)
 
635
             != rec_get_info_and_status_bits(cursor_rec, comp))
 
636
            || (extra_size != cur_extra_size)
 
637
            || (rec_size != cur_rec_size)) {
 
638
 
 
639
                extra_info_yes = 1;
 
640
        } else {
 
641
                extra_info_yes = 0;
 
642
        }
 
643
 
 
644
        /* Write the record end segment length and the extra info storage
 
645
        flag */
 
646
        log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i)
 
647
                                         + extra_info_yes);
 
648
        if (extra_info_yes) {
 
649
                /* Write the info bits */
 
650
                mach_write_to_1(log_ptr,
 
651
                                rec_get_info_and_status_bits(insert_rec,
 
652
                                                             comp));
 
653
                log_ptr++;
 
654
 
 
655
                /* Write the record origin offset */
 
656
                log_ptr += mach_write_compressed(log_ptr, extra_size);
 
657
 
 
658
                /* Write the mismatch index */
 
659
                log_ptr += mach_write_compressed(log_ptr, i);
 
660
 
 
661
                ut_a(i < UNIV_PAGE_SIZE);
 
662
                ut_a(extra_size < UNIV_PAGE_SIZE);
 
663
        }
 
664
 
 
665
        /* Write to the log the inserted index record end segment which
 
666
        differs from the cursor record */
 
667
 
 
668
        rec_size -= i;
 
669
 
 
670
        if (log_ptr + rec_size <= log_end) {
 
671
                memcpy(log_ptr, ins_ptr, rec_size);
 
672
                mlog_close(mtr, log_ptr + rec_size);
 
673
        } else {
 
674
                mlog_close(mtr, log_ptr);
 
675
                ut_a(rec_size < UNIV_PAGE_SIZE);
 
676
                mlog_catenate_string(mtr, ins_ptr, rec_size);
 
677
        }
 
678
}
 
679
 
 
680
/***************************************************************
 
681
Parses a log record of a record insert on a page. */
 
682
 
 
683
byte*
 
684
page_cur_parse_insert_rec(
 
685
/*======================*/
 
686
                                /* out: end of log record or NULL */
 
687
        ibool           is_short,/* in: TRUE if short inserts */
 
688
        byte*           ptr,    /* in: buffer */
 
689
        byte*           end_ptr,/* in: buffer end */
 
690
        dict_index_t*   index,  /* in: record descriptor */
 
691
        page_t*         page,   /* in: page or NULL */
 
692
        mtr_t*          mtr)    /* in: mtr or NULL */
 
693
{
 
694
        ulint   extra_info_yes;
 
695
        ulint   offset = 0; /* remove warning */
 
696
        ulint   origin_offset;
 
697
        ulint   end_seg_len;
 
698
        ulint   mismatch_index;
 
699
        rec_t*  cursor_rec;
 
700
        byte    buf1[1024];
 
701
        byte*   buf;
 
702
        byte*   ptr2 = ptr;
 
703
        ulint   info_and_status_bits = 0; /* remove warning */
 
704
        page_cur_t cursor;
 
705
        mem_heap_t*     heap            = NULL;
 
706
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
707
        ulint*          offsets         = offsets_;
 
708
        *offsets_ = (sizeof offsets_) / sizeof *offsets_;
 
709
 
 
710
        if (!is_short) {
 
711
                /* Read the cursor rec offset as a 2-byte ulint */
 
712
 
 
713
                if (end_ptr < ptr + 2) {
 
714
 
 
715
                        return(NULL);
 
716
                }
 
717
 
 
718
                offset = mach_read_from_2(ptr);
 
719
 
 
720
                if (offset >= UNIV_PAGE_SIZE) {
 
721
 
 
722
                        recv_sys->found_corrupt_log = TRUE;
 
723
 
 
724
                        return(NULL);
 
725
                }
 
726
 
 
727
                ptr += 2;
 
728
        }
 
729
 
 
730
        ptr = mach_parse_compressed(ptr, end_ptr, &end_seg_len);
 
731
 
 
732
        if (ptr == NULL) {
 
733
 
 
734
                return(NULL);
 
735
        }
 
736
 
 
737
        extra_info_yes = end_seg_len & 0x1UL;
 
738
        end_seg_len >>= 1;
 
739
 
 
740
        if (end_seg_len >= UNIV_PAGE_SIZE) {
 
741
                recv_sys->found_corrupt_log = TRUE;
 
742
 
 
743
                return(NULL);
 
744
        }
 
745
 
 
746
        if (extra_info_yes) {
 
747
                /* Read the info bits */
 
748
 
 
749
                if (end_ptr < ptr + 1) {
 
750
 
 
751
                        return(NULL);
 
752
                }
 
753
 
 
754
                info_and_status_bits = mach_read_from_1(ptr);
 
755
                ptr++;
 
756
 
 
757
                ptr = mach_parse_compressed(ptr, end_ptr, &origin_offset);
 
758
 
 
759
                if (ptr == NULL) {
 
760
 
 
761
                        return(NULL);
 
762
                }
 
763
 
 
764
                ut_a(origin_offset < UNIV_PAGE_SIZE);
 
765
 
 
766
                ptr = mach_parse_compressed(ptr, end_ptr, &mismatch_index);
 
767
 
 
768
                if (ptr == NULL) {
 
769
 
 
770
                        return(NULL);
 
771
                }
 
772
 
 
773
                ut_a(mismatch_index < UNIV_PAGE_SIZE);
 
774
        }
 
775
 
 
776
        if (end_ptr < ptr + end_seg_len) {
 
777
 
 
778
                return(NULL);
 
779
        }
 
780
 
 
781
        if (page == NULL) {
 
782
 
 
783
                return(ptr + end_seg_len);
 
784
        }
 
785
 
 
786
        ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
 
787
 
 
788
        /* Read from the log the inserted index record end segment which
 
789
        differs from the cursor record */
 
790
 
 
791
        if (is_short) {
 
792
                cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
 
793
        } else {
 
794
                cursor_rec = page + offset;
 
795
        }
 
796
 
 
797
        offsets = rec_get_offsets(cursor_rec, index, offsets,
 
798
                                  ULINT_UNDEFINED, &heap);
 
799
 
 
800
        if (extra_info_yes == 0) {
 
801
                info_and_status_bits = rec_get_info_and_status_bits(
 
802
                        cursor_rec, page_is_comp(page));
 
803
                origin_offset = rec_offs_extra_size(offsets);
 
804
                mismatch_index = rec_offs_size(offsets) - end_seg_len;
 
805
        }
 
806
 
 
807
        if (mismatch_index + end_seg_len < sizeof buf1) {
 
808
                buf = buf1;
 
809
        } else {
 
810
                buf = mem_alloc(mismatch_index + end_seg_len);
 
811
        }
 
812
 
 
813
        /* Build the inserted record to buf */
 
814
 
 
815
        if (mismatch_index >= UNIV_PAGE_SIZE) {
 
816
                fprintf(stderr,
 
817
                        "Is short %lu, info_and_status_bits %lu, offset %lu, "
 
818
                        "o_offset %lu\n"
 
819
                        "mismatch index %lu, end_seg_len %lu\n"
 
820
                        "parsed len %lu\n",
 
821
                        (ulong) is_short, (ulong) info_and_status_bits,
 
822
                        (ulong) offset,
 
823
                        (ulong) origin_offset,
 
824
                        (ulong) mismatch_index, (ulong) end_seg_len,
 
825
                        (ulong) (ptr - ptr2));
 
826
 
 
827
                fputs("Dump of 300 bytes of log:\n", stderr);
 
828
                ut_print_buf(stderr, ptr2, 300);
 
829
 
 
830
                buf_page_print(page);
 
831
 
 
832
                ut_error;
 
833
        }
 
834
 
 
835
        ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
 
836
        ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
 
837
 
 
838
        rec_set_info_and_status_bits(buf + origin_offset, page_is_comp(page),
 
839
                                     info_and_status_bits);
 
840
 
 
841
        page_cur_position(cursor_rec, &cursor);
 
842
 
 
843
        offsets = rec_get_offsets(buf + origin_offset, index, offsets,
 
844
                                  ULINT_UNDEFINED, &heap);
 
845
        page_cur_rec_insert(&cursor, buf + origin_offset, index, offsets, mtr);
 
846
 
 
847
        if (buf != buf1) {
 
848
 
 
849
                mem_free(buf);
 
850
        }
 
851
 
 
852
        if (UNIV_LIKELY_NULL(heap)) {
 
853
                mem_heap_free(heap);
 
854
        }
 
855
 
 
856
        return(ptr + end_seg_len);
 
857
}
 
858
 
 
859
/***************************************************************
 
860
Inserts a record next to page cursor. Returns pointer to inserted record if
 
861
succeed, i.e., enough space available, NULL otherwise. The record to be
 
862
inserted can be in a data tuple or as a physical record. The other parameter
 
863
must then be NULL. The cursor stays at the same position. */
 
864
 
 
865
rec_t*
 
866
page_cur_insert_rec_low(
 
867
/*====================*/
 
868
                                /* out: pointer to record if succeed, NULL
 
869
                                otherwise */
 
870
        page_cur_t*     cursor, /* in: a page cursor */
 
871
        dtuple_t*       tuple,  /* in: pointer to a data tuple or NULL */
 
872
        dict_index_t*   index,  /* in: record descriptor */
 
873
        rec_t*          rec,    /* in: pointer to a physical record or NULL */
 
874
        ulint*          offsets,/* in: rec_get_offsets(rec, index) or NULL */
 
875
        mtr_t*          mtr)    /* in: mini-transaction handle */
 
876
{
 
877
        byte*           insert_buf      = NULL;
 
878
        ulint           rec_size;
 
879
        byte*           page;           /* the relevant page */
 
880
        rec_t*          last_insert;    /* cursor position at previous
 
881
                                        insert */
 
882
        rec_t*          insert_rec;     /* inserted record */
 
883
        ulint           heap_no;        /* heap number of the inserted
 
884
                                        record */
 
885
        rec_t*          current_rec;    /* current record after which the
 
886
                                        new record is inserted */
 
887
        rec_t*          next_rec;       /* next record after current before
 
888
                                        the insertion */
 
889
        ulint           owner_slot;     /* the slot which owns the
 
890
                                        inserted record */
 
891
        rec_t*          owner_rec;
 
892
        ulint           n_owned;
 
893
        mem_heap_t*     heap            = NULL;
 
894
        ulint           comp;
 
895
 
 
896
        ut_ad(cursor && mtr);
 
897
        ut_ad(tuple || rec);
 
898
        ut_ad(!(tuple && rec));
 
899
        ut_ad(rec || dtuple_check_typed(tuple));
 
900
 
 
901
        page = page_cur_get_page(cursor);
 
902
        comp = page_is_comp(page);
 
903
        ut_ad(dict_table_is_comp(index->table) == !!comp);
 
904
 
 
905
        ut_ad(cursor->rec != page_get_supremum_rec(page));
 
906
 
 
907
        /* 1. Get the size of the physical record in the page */
 
908
        if (tuple != NULL) {
 
909
                rec_size = rec_get_converted_size(index, tuple);
 
910
        } else {
 
911
                if (!offsets) {
 
912
                        offsets = rec_get_offsets(rec, index, offsets,
 
913
                                                  ULINT_UNDEFINED, &heap);
 
914
                }
 
915
                ut_ad(rec_offs_validate(rec, index, offsets));
 
916
                rec_size = rec_offs_size(offsets);
 
917
        }
 
918
 
 
919
        /* 2. Try to find suitable space from page memory management */
 
920
        insert_buf = page_mem_alloc(page, rec_size, index, &heap_no);
 
921
 
 
922
        if (insert_buf == NULL) {
 
923
                if (UNIV_LIKELY_NULL(heap)) {
 
924
                        mem_heap_free(heap);
 
925
                }
 
926
                return(NULL);
 
927
        }
 
928
 
 
929
        /* 3. Create the record */
 
930
        if (tuple != NULL) {
 
931
                insert_rec = rec_convert_dtuple_to_rec(insert_buf,
 
932
                                                       index, tuple);
 
933
                offsets = rec_get_offsets(insert_rec, index, offsets,
 
934
                                          ULINT_UNDEFINED, &heap);
 
935
        } else {
 
936
                insert_rec = rec_copy(insert_buf, rec, offsets);
 
937
                ut_ad(rec_offs_validate(rec, index, offsets));
 
938
                rec_offs_make_valid(insert_rec, index, offsets);
 
939
        }
 
940
 
 
941
        ut_ad(insert_rec);
 
942
        ut_ad(rec_size == rec_offs_size(offsets));
 
943
 
 
944
        /* 4. Insert the record in the linked list of records */
 
945
        current_rec = cursor->rec;
 
946
 
 
947
        ut_ad(!comp || rec_get_status(current_rec) <= REC_STATUS_INFIMUM);
 
948
        ut_ad(!comp || rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
 
949
 
 
950
        next_rec = page_rec_get_next(current_rec);
 
951
        ut_ad(!comp || rec_get_status(next_rec) != REC_STATUS_INFIMUM);
 
952
        page_rec_set_next(insert_rec, next_rec);
 
953
        page_rec_set_next(current_rec, insert_rec);
 
954
 
 
955
        page_header_set_field(page, PAGE_N_RECS, 1 + page_get_n_recs(page));
 
956
 
 
957
        /* 5. Set the n_owned field in the inserted record to zero,
 
958
        and set the heap_no field */
 
959
 
 
960
        rec_set_n_owned(insert_rec, comp, 0);
 
961
        rec_set_heap_no(insert_rec, comp, heap_no);
 
962
 
 
963
        /* 6. Update the last insertion info in page header */
 
964
 
 
965
        last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
 
966
        ut_ad(!last_insert || !comp
 
967
              || rec_get_node_ptr_flag(last_insert)
 
968
              == rec_get_node_ptr_flag(insert_rec));
 
969
 
 
970
        if (last_insert == NULL) {
 
971
                page_header_set_field(page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
 
972
                page_header_set_field(page, PAGE_N_DIRECTION, 0);
 
973
 
 
974
        } else if ((last_insert == current_rec)
 
975
                   && (page_header_get_field(page, PAGE_DIRECTION)
 
976
                       != PAGE_LEFT)) {
 
977
 
 
978
                page_header_set_field(page, PAGE_DIRECTION, PAGE_RIGHT);
 
979
                page_header_set_field(page, PAGE_N_DIRECTION,
 
980
                                      page_header_get_field(
 
981
                                              page, PAGE_N_DIRECTION) + 1);
 
982
 
 
983
        } else if ((page_rec_get_next(insert_rec) == last_insert)
 
984
                   && (page_header_get_field(page, PAGE_DIRECTION)
 
985
                       != PAGE_RIGHT)) {
 
986
 
 
987
                page_header_set_field(page, PAGE_DIRECTION, PAGE_LEFT);
 
988
                page_header_set_field(page, PAGE_N_DIRECTION,
 
989
                                      page_header_get_field(
 
990
                                              page, PAGE_N_DIRECTION) + 1);
 
991
        } else {
 
992
                page_header_set_field(page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
 
993
                page_header_set_field(page, PAGE_N_DIRECTION, 0);
 
994
        }
 
995
 
 
996
        page_header_set_ptr(page, PAGE_LAST_INSERT, insert_rec);
 
997
 
 
998
        /* 7. It remains to update the owner record. */
 
999
 
 
1000
        owner_rec = page_rec_find_owner_rec(insert_rec);
 
1001
        n_owned = rec_get_n_owned(owner_rec, comp);
 
1002
        rec_set_n_owned(owner_rec, comp, n_owned + 1);
 
1003
 
 
1004
        /* 8. Now we have incremented the n_owned field of the owner
 
1005
        record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
 
1006
        we have to split the corresponding directory slot in two. */
 
1007
 
 
1008
        if (n_owned == PAGE_DIR_SLOT_MAX_N_OWNED) {
 
1009
                owner_slot = page_dir_find_owner_slot(owner_rec);
 
1010
                page_dir_split_slot(page, owner_slot);
 
1011
        }
 
1012
 
 
1013
        /* 9. Write log record of the insert */
 
1014
        page_cur_insert_rec_write_log(insert_rec, rec_size, current_rec,
 
1015
                                      index, mtr);
 
1016
 
 
1017
        if (UNIV_LIKELY_NULL(heap)) {
 
1018
                mem_heap_free(heap);
 
1019
        }
 
1020
        return(insert_rec);
 
1021
}
 
1022
 
 
1023
/**************************************************************
 
1024
Writes a log record of copying a record list end to a new created page. */
 
1025
UNIV_INLINE
 
1026
byte*
 
1027
page_copy_rec_list_to_created_page_write_log(
 
1028
/*=========================================*/
 
1029
                                /* out: 4-byte field where to
 
1030
                                write the log data length */
 
1031
        page_t*         page,   /* in: index page */
 
1032
        dict_index_t*   index,  /* in: record descriptor */
 
1033
        mtr_t*          mtr)    /* in: mtr */
 
1034
{
 
1035
        byte*   log_ptr;
 
1036
 
 
1037
        ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
 
1038
 
 
1039
        log_ptr = mlog_open_and_write_index(mtr, page, index,
 
1040
                                            page_is_comp(page)
 
1041
                                            ? MLOG_COMP_LIST_END_COPY_CREATED
 
1042
                                            : MLOG_LIST_END_COPY_CREATED, 4);
 
1043
        ut_a(log_ptr);
 
1044
        mlog_close(mtr, log_ptr + 4);
 
1045
 
 
1046
        return(log_ptr);
 
1047
}
 
1048
 
 
1049
/**************************************************************
 
1050
Parses a log record of copying a record list end to a new created page. */
 
1051
 
 
1052
byte*
 
1053
page_parse_copy_rec_list_to_created_page(
 
1054
/*=====================================*/
 
1055
                                /* out: end of log record or NULL */
 
1056
        byte*           ptr,    /* in: buffer */
 
1057
        byte*           end_ptr,/* in: buffer end */
 
1058
        dict_index_t*   index,  /* in: record descriptor */
 
1059
        page_t*         page,   /* in: page or NULL */
 
1060
        mtr_t*          mtr)    /* in: mtr or NULL */
 
1061
{
 
1062
        byte*   rec_end;
 
1063
        ulint   log_data_len;
 
1064
 
 
1065
        if (ptr + 4 > end_ptr) {
 
1066
 
 
1067
                return(NULL);
 
1068
        }
 
1069
 
 
1070
        log_data_len = mach_read_from_4(ptr);
 
1071
        ptr += 4;
 
1072
 
 
1073
        rec_end = ptr + log_data_len;
 
1074
 
 
1075
        if (rec_end > end_ptr) {
 
1076
 
 
1077
                return(NULL);
 
1078
        }
 
1079
 
 
1080
        if (!page) {
 
1081
 
 
1082
                return(rec_end);
 
1083
        }
 
1084
 
 
1085
        while (ptr < rec_end) {
 
1086
                ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
 
1087
                                                index, page, mtr);
 
1088
        }
 
1089
 
 
1090
        ut_a(ptr == rec_end);
 
1091
 
 
1092
        page_header_set_ptr(page, PAGE_LAST_INSERT, NULL);
 
1093
        page_header_set_field(page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
 
1094
        page_header_set_field(page, PAGE_N_DIRECTION, 0);
 
1095
 
 
1096
        return(rec_end);
 
1097
}
 
1098
 
 
1099
/*****************************************************************
 
1100
Copies records from page to a newly created page, from a given record onward,
 
1101
including that record. Infimum and supremum records are not copied. */
 
1102
 
 
1103
void
 
1104
page_copy_rec_list_end_to_created_page(
 
1105
/*===================================*/
 
1106
        page_t*         new_page,       /* in: index page to copy to */
 
1107
        page_t*         page,           /* in: index page */
 
1108
        rec_t*          rec,            /* in: first record to copy */
 
1109
        dict_index_t*   index,          /* in: record descriptor */
 
1110
        mtr_t*          mtr)            /* in: mtr */
 
1111
{
 
1112
        page_dir_slot_t* slot = 0; /* remove warning */
 
1113
        byte*   heap_top;
 
1114
        rec_t*  insert_rec = 0; /* remove warning */
 
1115
        rec_t*  prev_rec;
 
1116
        ulint   count;
 
1117
        ulint   n_recs;
 
1118
        ulint   slot_index;
 
1119
        ulint   rec_size;
 
1120
        ulint   log_mode;
 
1121
        byte*   log_ptr;
 
1122
        ulint   log_data_len;
 
1123
        ulint           comp            = page_is_comp(page);
 
1124
        mem_heap_t*     heap            = NULL;
 
1125
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
1126
        ulint*          offsets         = offsets_;
 
1127
        *offsets_ = (sizeof offsets_) / sizeof *offsets_;
 
1128
 
 
1129
        ut_ad(page_dir_get_n_heap(new_page) == 2);
 
1130
        ut_ad(page != new_page);
 
1131
        ut_ad(comp == page_is_comp(new_page));
 
1132
 
 
1133
        if (rec == page_get_infimum_rec(page)) {
 
1134
 
 
1135
                rec = page_rec_get_next(rec);
 
1136
        }
 
1137
 
 
1138
        if (rec == page_get_supremum_rec(page)) {
 
1139
 
 
1140
                return;
 
1141
        }
 
1142
 
 
1143
#ifdef UNIV_DEBUG
 
1144
        /* To pass the debug tests we have to set these dummy values
 
1145
        in the debug version */
 
1146
        page_dir_set_n_slots(new_page, UNIV_PAGE_SIZE / 2);
 
1147
        page_header_set_ptr(new_page, PAGE_HEAP_TOP,
 
1148
                            new_page + UNIV_PAGE_SIZE - 1);
 
1149
#endif
 
1150
 
 
1151
        log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
 
1152
                                                               index, mtr);
 
1153
 
 
1154
        log_data_len = dyn_array_get_data_size(&(mtr->log));
 
1155
 
 
1156
        /* Individual inserts are logged in a shorter form */
 
1157
 
 
1158
        log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
 
1159
 
 
1160
        prev_rec = page_get_infimum_rec(new_page);
 
1161
        if (comp) {
 
1162
                heap_top = new_page + PAGE_NEW_SUPREMUM_END;
 
1163
        } else {
 
1164
                heap_top = new_page + PAGE_OLD_SUPREMUM_END;
 
1165
        }
 
1166
        count = 0;
 
1167
        slot_index = 0;
 
1168
        n_recs = 0;
 
1169
 
 
1170
        /* should be do ... until, comment by Jani */
 
1171
        while (rec != page_get_supremum_rec(page)) {
 
1172
                offsets = rec_get_offsets(rec, index, offsets,
 
1173
                                          ULINT_UNDEFINED, &heap);
 
1174
                insert_rec = rec_copy(heap_top, rec, offsets);
 
1175
 
 
1176
                rec_set_next_offs(prev_rec, comp, insert_rec - new_page);
 
1177
 
 
1178
                rec_set_n_owned(insert_rec, comp, 0);
 
1179
                rec_set_heap_no(insert_rec, comp, 2 + n_recs);
 
1180
 
 
1181
                rec_size = rec_offs_size(offsets);
 
1182
 
 
1183
                heap_top = heap_top + rec_size;
 
1184
 
 
1185
                ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);
 
1186
 
 
1187
                count++;
 
1188
                n_recs++;
 
1189
 
 
1190
                if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {
 
1191
 
 
1192
                        slot_index++;
 
1193
 
 
1194
                        slot = page_dir_get_nth_slot(new_page, slot_index);
 
1195
 
 
1196
                        page_dir_slot_set_rec(slot, insert_rec);
 
1197
                        page_dir_slot_set_n_owned(slot, count);
 
1198
 
 
1199
                        count = 0;
 
1200
                }
 
1201
 
 
1202
                page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
 
1203
                                              index, mtr);
 
1204
                prev_rec = insert_rec;
 
1205
                rec = page_rec_get_next(rec);
 
1206
        }
 
1207
 
 
1208
        if ((slot_index > 0) && (count + 1
 
1209
                                 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
 
1210
                                 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
 
1211
                /* We can merge the two last dir slots. This operation is
 
1212
                here to make this function imitate exactly the equivalent
 
1213
                task made using page_cur_insert_rec, which we use in database
 
1214
                recovery to reproduce the task performed by this function.
 
1215
                To be able to check the correctness of recovery, it is good
 
1216
                that it imitates exactly. */
 
1217
 
 
1218
                count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
 
1219
 
 
1220
                page_dir_slot_set_n_owned(slot, 0);
 
1221
 
 
1222
                slot_index--;
 
1223
        }
 
1224
 
 
1225
        if (UNIV_LIKELY_NULL(heap)) {
 
1226
                mem_heap_free(heap);
 
1227
        }
 
1228
 
 
1229
        log_data_len = dyn_array_get_data_size(&(mtr->log)) - log_data_len;
 
1230
 
 
1231
        ut_a(log_data_len < 100 * UNIV_PAGE_SIZE);
 
1232
 
 
1233
        mach_write_to_4(log_ptr, log_data_len);
 
1234
 
 
1235
        rec_set_next_offs(insert_rec, comp,
 
1236
                          comp ? PAGE_NEW_SUPREMUM : PAGE_OLD_SUPREMUM);
 
1237
 
 
1238
        slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
 
1239
 
 
1240
        page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
 
1241
        page_dir_slot_set_n_owned(slot, count + 1);
 
1242
 
 
1243
        page_dir_set_n_slots(new_page, 2 + slot_index);
 
1244
        page_header_set_ptr(new_page, PAGE_HEAP_TOP, heap_top);
 
1245
        page_dir_set_n_heap(new_page, 2 + n_recs);
 
1246
        page_header_set_field(new_page, PAGE_N_RECS, n_recs);
 
1247
 
 
1248
        page_header_set_ptr(new_page, PAGE_LAST_INSERT, NULL);
 
1249
        page_header_set_field(new_page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
 
1250
        page_header_set_field(new_page, PAGE_N_DIRECTION, 0);
 
1251
 
 
1252
        /* Restore the log mode */
 
1253
 
 
1254
        mtr_set_log_mode(mtr, log_mode);
 
1255
}
 
1256
 
 
1257
/***************************************************************
 
1258
Writes log record of a record delete on a page. */
 
1259
UNIV_INLINE
 
1260
void
 
1261
page_cur_delete_rec_write_log(
 
1262
/*==========================*/
 
1263
        rec_t*          rec,    /* in: record to be deleted */
 
1264
        dict_index_t*   index,  /* in: record descriptor */
 
1265
        mtr_t*          mtr)    /* in: mini-transaction handle */
 
1266
{
 
1267
        byte*   log_ptr;
 
1268
 
 
1269
        ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
 
1270
 
 
1271
        log_ptr = mlog_open_and_write_index(mtr, rec, index,
 
1272
                                            page_rec_is_comp(rec)
 
1273
                                            ? MLOG_COMP_REC_DELETE
 
1274
                                            : MLOG_REC_DELETE, 2);
 
1275
 
 
1276
        if (!log_ptr) {
 
1277
                /* Logging in mtr is switched off during crash recovery:
 
1278
                in that case mlog_open returns NULL */
 
1279
                return;
 
1280
        }
 
1281
 
 
1282
        /* Write the cursor rec offset as a 2-byte ulint */
 
1283
        mach_write_to_2(log_ptr, page_offset(rec));
 
1284
 
 
1285
        mlog_close(mtr, log_ptr + 2);
 
1286
}
 
1287
 
 
1288
/***************************************************************
 
1289
Parses log record of a record delete on a page. */
 
1290
 
 
1291
byte*
 
1292
page_cur_parse_delete_rec(
 
1293
/*======================*/
 
1294
                                /* out: pointer to record end or NULL */
 
1295
        byte*           ptr,    /* in: buffer */
 
1296
        byte*           end_ptr,/* in: buffer end */
 
1297
        dict_index_t*   index,  /* in: record descriptor */
 
1298
        page_t*         page,   /* in: page or NULL */
 
1299
        mtr_t*          mtr)    /* in: mtr or NULL */
 
1300
{
 
1301
        ulint           offset;
 
1302
        page_cur_t      cursor;
 
1303
 
 
1304
        if (end_ptr < ptr + 2) {
 
1305
 
 
1306
                return(NULL);
 
1307
        }
 
1308
 
 
1309
        /* Read the cursor rec offset as a 2-byte ulint */
 
1310
        offset = mach_read_from_2(ptr);
 
1311
        ptr += 2;
 
1312
 
 
1313
        ut_a(offset <= UNIV_PAGE_SIZE);
 
1314
 
 
1315
        if (page) {
 
1316
                mem_heap_t*     heap            = NULL;
 
1317
                ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
1318
                rec_t*          rec             = page + offset;
 
1319
                *offsets_ = (sizeof offsets_) / sizeof *offsets_;
 
1320
 
 
1321
                page_cur_position(rec, &cursor);
 
1322
 
 
1323
                page_cur_delete_rec(&cursor, index,
 
1324
                                    rec_get_offsets(rec, index, offsets_,
 
1325
                                                    ULINT_UNDEFINED, &heap),
 
1326
                                    mtr);
 
1327
                if (UNIV_LIKELY_NULL(heap)) {
 
1328
                        mem_heap_free(heap);
 
1329
                }
 
1330
        }
 
1331
 
 
1332
        return(ptr);
 
1333
}
 
1334
 
 
1335
/***************************************************************
 
1336
Deletes a record at the page cursor. The cursor is moved to the next
 
1337
record after the deleted one. */
 
1338
 
 
1339
void
 
1340
page_cur_delete_rec(
 
1341
/*================*/
 
1342
        page_cur_t*     cursor, /* in: a page cursor */
 
1343
        dict_index_t*   index,  /* in: record descriptor */
 
1344
        const ulint*    offsets,/* in: rec_get_offsets(cursor->rec, index) */
 
1345
        mtr_t*          mtr)    /* in: mini-transaction handle */
 
1346
{
 
1347
        page_dir_slot_t* cur_dir_slot;
 
1348
        page_dir_slot_t* prev_slot;
 
1349
        page_t*         page;
 
1350
        rec_t*          current_rec;
 
1351
        rec_t*          prev_rec        = NULL;
 
1352
        rec_t*          next_rec;
 
1353
        ulint           cur_slot_no;
 
1354
        ulint           cur_n_owned;
 
1355
        rec_t*          rec;
 
1356
 
 
1357
        ut_ad(cursor && mtr);
 
1358
 
 
1359
        page = page_cur_get_page(cursor);
 
1360
        current_rec = cursor->rec;
 
1361
        ut_ad(rec_offs_validate(current_rec, index, offsets));
 
1362
        ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
 
1363
 
 
1364
        /* The record must not be the supremum or infimum record. */
 
1365
        ut_ad(current_rec != page_get_supremum_rec(page));
 
1366
        ut_ad(current_rec != page_get_infimum_rec(page));
 
1367
 
 
1368
        /* Save to local variables some data associated with current_rec */
 
1369
        cur_slot_no = page_dir_find_owner_slot(current_rec);
 
1370
        cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
 
1371
        cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
 
1372
 
 
1373
        /* 0. Write the log record */
 
1374
        page_cur_delete_rec_write_log(current_rec, index, mtr);
 
1375
 
 
1376
        /* 1. Reset the last insert info in the page header and increment
 
1377
        the modify clock for the frame */
 
1378
 
 
1379
        page_header_set_ptr(page, PAGE_LAST_INSERT, NULL);
 
1380
 
 
1381
        /* The page gets invalid for optimistic searches: increment the
 
1382
        frame modify clock */
 
1383
 
 
1384
        buf_frame_modify_clock_inc(page);
 
1385
 
 
1386
        /* 2. Find the next and the previous record. Note that the cursor is
 
1387
        left at the next record. */
 
1388
 
 
1389
        ut_ad(cur_slot_no > 0);
 
1390
        prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
 
1391
 
 
1392
        rec = page_dir_slot_get_rec(prev_slot);
 
1393
 
 
1394
        /* rec now points to the record of the previous directory slot. Look
 
1395
        for the immediate predecessor of current_rec in a loop. */
 
1396
 
 
1397
        while(current_rec != rec) {
 
1398
                prev_rec = rec;
 
1399
                rec = page_rec_get_next(rec);
 
1400
        }
 
1401
 
 
1402
        page_cur_move_to_next(cursor);
 
1403
        next_rec = cursor->rec;
 
1404
 
 
1405
        /* 3. Remove the record from the linked list of records */
 
1406
 
 
1407
        page_rec_set_next(prev_rec, next_rec);
 
1408
        page_header_set_field(page, PAGE_N_RECS,
 
1409
                              (ulint)(page_get_n_recs(page) - 1));
 
1410
 
 
1411
        /* 4. If the deleted record is pointed to by a dir slot, update the
 
1412
        record pointer in slot. In the following if-clause we assume that
 
1413
        prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
 
1414
        >= 2. */
 
1415
 
 
1416
#if PAGE_DIR_SLOT_MIN_N_OWNED < 2
 
1417
# error "PAGE_DIR_SLOT_MIN_N_OWNED < 2"
 
1418
#endif
 
1419
        ut_ad(cur_n_owned > 1);
 
1420
 
 
1421
        if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
 
1422
                page_dir_slot_set_rec(cur_dir_slot, prev_rec);
 
1423
        }
 
1424
 
 
1425
        /* 5. Update the number of owned records of the slot */
 
1426
 
 
1427
        page_dir_slot_set_n_owned(cur_dir_slot, cur_n_owned - 1);
 
1428
 
 
1429
        /* 6. Free the memory occupied by the record */
 
1430
        page_mem_free(page, current_rec, offsets);
 
1431
 
 
1432
        /* 7. Now we have decremented the number of owned records of the slot.
 
1433
        If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
 
1434
        slots. */
 
1435
 
 
1436
        if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
 
1437
                page_dir_balance_slot(page, cur_slot_no);
 
1438
        }
 
1439
}