~jlukas79/+junk/mysql-server

« back to all changes in this revision

Viewing changes to storage/maria/ma_write.c

manual merge 6.0-main --> 6.0-bka-review

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/* Write a row to a MARIA table */
 
17
 
 
18
#include "ma_fulltext.h"
 
19
#include "ma_rt_index.h"
 
20
#include "trnman.h"
 
21
#include "ma_key_recover.h"
 
22
#include "ma_blockrec.h"
 
23
 
 
24
#define MAX_POINTER_LENGTH 8
 
25
 
 
26
        /* Functions declared in this file */
 
27
 
 
28
static int w_search(register MARIA_HA *info, uint32 comp_flag,
 
29
                    MARIA_KEY *key, my_off_t page,
 
30
                    my_off_t father_page, uchar *father_buff,
 
31
                    MARIA_PINNED_PAGE *father_page_link, uchar *father_keypos,
 
32
                    my_bool insert_last);
 
33
static int _ma_balance_page(MARIA_HA *info,MARIA_KEYDEF *keyinfo,
 
34
                            MARIA_KEY *key, uchar *curr_buff, my_off_t page,
 
35
                            my_off_t father_page, uchar *father_buff,
 
36
                            uchar *father_keypos,
 
37
                            MARIA_KEY_PARAM *s_temp);
 
38
static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key,
 
39
                                uchar *page, uchar **after_key);
 
40
static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key);
 
41
static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key);
 
42
static int _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key,
 
43
                                       my_off_t *root, uint32 comp_flag);
 
44
static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff,
 
45
                             uint org_length, uint new_length,
 
46
                             const uchar *key_pos,
 
47
                             uint key_length, int move_length,
 
48
                             enum en_key_op prefix_or_suffix,
 
49
                             const uchar *data, uint data_length,
 
50
                             uint changed_length);
 
51
static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page,
 
52
                                  const uchar *buff,
 
53
                                  uint org_length, uint new_length,
 
54
                                  const uchar *key_pos, uint key_length,
 
55
                                  int move_length);
 
56
static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page,
 
57
                                  const uchar *buff,
 
58
                                  uint new_length,
 
59
                                  uint data_added_first,
 
60
                                  uint data_changed_first,
 
61
                                  uint data_deleted_last,
 
62
                                  const uchar *key_pos,
 
63
                                  uint key_length, int move_length);
 
64
 
 
65
/*
 
66
  @brief Default handler for returing position to new row
 
67
 
 
68
  @note
 
69
    This is only called for non transactional tables and not for block format
 
70
    which is why we use info->state here.
 
71
*/
 
72
 
 
73
MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info,
 
74
                                        const uchar *record
 
75
                                        __attribute__((unused)))
 
76
{
 
77
  return ((info->s->state.dellink != HA_OFFSET_ERROR &&
 
78
           !info->append_insert_at_end) ?
 
79
          info->s->state.dellink :
 
80
          info->state->data_file_length);
 
81
}
 
82
 
 
83
my_bool _ma_write_abort_default(MARIA_HA *info __attribute__((unused)))
 
84
{
 
85
  return 0;
 
86
}
 
87
 
 
88
 
 
89
/* Write new record to a table */
 
90
 
 
91
int maria_write(MARIA_HA *info, uchar *record)
 
92
{
 
93
  MARIA_SHARE *share= info->s;
 
94
  uint i;
 
95
  int save_errno;
 
96
  MARIA_RECORD_POS filepos;
 
97
  uchar *buff;
 
98
  my_bool lock_tree= share->lock_key_trees;
 
99
  my_bool fatal_error;
 
100
  MARIA_KEYDEF *keyinfo;
 
101
  DBUG_ENTER("maria_write");
 
102
  DBUG_PRINT("enter",("index_file: %d  data_file: %d",
 
103
                      share->kfile.file, info->dfile.file));
 
104
 
 
105
  DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
 
106
                  maria_print_error(info->s, HA_ERR_CRASHED);
 
107
                  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
 
108
  if (share->options & HA_OPTION_READ_ONLY_DATA)
 
109
  {
 
110
    DBUG_RETURN(my_errno=EACCES);
 
111
  }
 
112
  if (_ma_readinfo(info,F_WRLCK,1))
 
113
    DBUG_RETURN(my_errno);
 
114
  dont_break();                         /* Dont allow SIGHUP or SIGINT */
 
115
 
 
116
  if (share->base.reloc == (ha_rows) 1 &&
 
117
      share->base.records == (ha_rows) 1 &&
 
118
      share->state.state.records == (ha_rows) 1)
 
119
  {                                             /* System file */
 
120
    my_errno=HA_ERR_RECORD_FILE_FULL;
 
121
    goto err2;
 
122
  }
 
123
  if (share->state.state.key_file_length >= share->base.margin_key_file_length)
 
124
  {
 
125
    my_errno=HA_ERR_INDEX_FILE_FULL;
 
126
    goto err2;
 
127
  }
 
128
  if (_ma_mark_file_changed(info))
 
129
    goto err2;
 
130
 
 
131
  /* Calculate and check all unique constraints */
 
132
  for (i=0 ; i < share->state.header.uniques ; i++)
 
133
  {
 
134
    if (_ma_check_unique(info,share->uniqueinfo+i,record,
 
135
                         _ma_unique_hash(share->uniqueinfo+i,record),
 
136
                         HA_OFFSET_ERROR))
 
137
      goto err2;
 
138
  }
 
139
 
 
140
  /* Ensure we don't try to restore auto_increment if it doesn't change */
 
141
  info->last_auto_increment= ~(ulonglong) 0;
 
142
 
 
143
  if ((info->opt_flag & OPT_NO_ROWS))
 
144
    filepos= HA_OFFSET_ERROR;
 
145
  else
 
146
  {
 
147
    /*
 
148
      This may either calculate a record or, or write the record and return
 
149
      the record id
 
150
    */
 
151
    if ((filepos= (*share->write_record_init)(info, record)) ==
 
152
        HA_OFFSET_ERROR)
 
153
      goto err2;
 
154
  }
 
155
 
 
156
  /* Write all keys to indextree */
 
157
  buff= info->lastkey_buff2;
 
158
  for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
 
159
  {
 
160
    MARIA_KEY int_key;
 
161
    if (maria_is_key_active(share->state.key_map, i))
 
162
    {
 
163
      my_bool local_lock_tree= (lock_tree &&
 
164
                                !(info->bulk_insert &&
 
165
                                  is_tree_inited(&info->bulk_insert[i])));
 
166
      if (local_lock_tree)
 
167
      {
 
168
        rw_wrlock(&keyinfo->root_lock);
 
169
        keyinfo->version++;
 
170
      }
 
171
      if (keyinfo->flag & HA_FULLTEXT )
 
172
      {
 
173
        if (_ma_ft_add(info,i, buff,record,filepos))
 
174
        {
 
175
          if (local_lock_tree)
 
176
            rw_unlock(&keyinfo->root_lock);
 
177
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
 
178
          goto err;
 
179
        }
 
180
      }
 
181
      else
 
182
      {
 
183
        if (keyinfo->ck_insert(info,
 
184
                               (*keyinfo->make_key)(info, &int_key, i,
 
185
                                                    buff, record, filepos,
 
186
                                                    info->trn->trid)))
 
187
        {
 
188
          if (local_lock_tree)
 
189
            rw_unlock(&keyinfo->root_lock);
 
190
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
 
191
          goto err;
 
192
        }
 
193
      }
 
194
 
 
195
      /* The above changed info->lastkey2. Inform maria_rnext_same(). */
 
196
      info->update&= ~HA_STATE_RNEXT_SAME;
 
197
 
 
198
      if (local_lock_tree)
 
199
        rw_unlock(&keyinfo->root_lock);
 
200
    }
 
201
  }
 
202
  if (share->calc_write_checksum)
 
203
    info->cur_row.checksum= (*share->calc_write_checksum)(info,record);
 
204
  if (filepos != HA_OFFSET_ERROR)
 
205
  {
 
206
    if ((*share->write_record)(info,record))
 
207
      goto err;
 
208
    info->state->checksum+= info->cur_row.checksum;
 
209
  }
 
210
  if (!share->now_transactional)
 
211
  {
 
212
    if (share->base.auto_key != 0)
 
213
    {
 
214
      const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg;
 
215
      const uchar *key= record + keyseg->start;
 
216
      set_if_bigger(share->state.auto_increment,
 
217
                    ma_retrieve_auto_increment(key, keyseg->type));
 
218
    }
 
219
  }
 
220
  info->state->records++;
 
221
  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
 
222
                 HA_STATE_ROW_CHANGED);
 
223
  share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
 
224
 
 
225
  info->cur_row.lastpos= filepos;
 
226
  (void)(_ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE));
 
227
  if (info->invalidator != 0)
 
228
  {
 
229
    DBUG_PRINT("info", ("invalidator... '%s' (update)",
 
230
                        share->open_file_name));
 
231
    (*info->invalidator)(share->open_file_name);
 
232
    info->invalidator=0;
 
233
  }
 
234
 
 
235
  /*
 
236
    Update status of the table. We need to do so after each row write
 
237
    for the log tables, as we want the new row to become visible to
 
238
    other threads as soon as possible. We don't lock mutex here
 
239
    (as it is required by pthread memory visibility rules) as (1) it's
 
240
    not critical to use outdated share->is_log_table value (2) locking
 
241
    mutex here for every write is too expensive.
 
242
  */
 
243
  if (share->is_log_table)
 
244
    _ma_update_status((void*) info);
 
245
 
 
246
  allow_break();                                /* Allow SIGHUP & SIGINT */
 
247
  DBUG_RETURN(0);
 
248
 
 
249
err:
 
250
  save_errno= my_errno;
 
251
  fatal_error= 0;
 
252
  if (my_errno == HA_ERR_FOUND_DUPP_KEY ||
 
253
      my_errno == HA_ERR_RECORD_FILE_FULL ||
 
254
      my_errno == HA_ERR_NULL_IN_SPATIAL ||
 
255
      my_errno == HA_ERR_OUT_OF_MEM)
 
256
  {
 
257
    if (info->bulk_insert)
 
258
    {
 
259
      uint j;
 
260
      for (j=0 ; j < share->base.keys ; j++)
 
261
        maria_flush_bulk_insert(info, j);
 
262
    }
 
263
    info->errkey= (int) i;
 
264
    /*
 
265
      We delete keys in the reverse order of insertion. This is the order that
 
266
      a rollback would do and is important for CLR_ENDs generated by
 
267
      _ma_ft|ck_delete() and write_record_abort() to work (with any other
 
268
      order they would cause wrong jumps in the chain).
 
269
    */
 
270
    while ( i-- > 0)
 
271
    {
 
272
      if (maria_is_key_active(share->state.key_map, i))
 
273
      {
 
274
        my_bool local_lock_tree= (lock_tree &&
 
275
                                  !(info->bulk_insert &&
 
276
                                    is_tree_inited(&info->bulk_insert[i])));
 
277
        keyinfo= share->keyinfo + i;
 
278
        if (local_lock_tree)
 
279
          rw_wrlock(&keyinfo->root_lock);
 
280
        /**
 
281
           @todo RECOVERY BUG
 
282
           The key deletes below should generate CLR_ENDs
 
283
        */
 
284
        if (keyinfo->flag & HA_FULLTEXT)
 
285
        {
 
286
          if (_ma_ft_del(info,i,buff,record,filepos))
 
287
          {
 
288
            if (local_lock_tree)
 
289
              rw_unlock(&keyinfo->root_lock);
 
290
            break;
 
291
          }
 
292
        }
 
293
        else
 
294
        {
 
295
          MARIA_KEY key;
 
296
          if (_ma_ck_delete(info,
 
297
                            (*keyinfo->make_key)(info, &key, i, buff, record,
 
298
                                                 filepos, info->trn->trid)))
 
299
          {
 
300
            if (local_lock_tree)
 
301
              rw_unlock(&keyinfo->root_lock);
 
302
            break;
 
303
          }
 
304
        }
 
305
        if (local_lock_tree)
 
306
          rw_unlock(&keyinfo->root_lock);
 
307
      }
 
308
    }
 
309
  }
 
310
  else
 
311
    fatal_error= 1;
 
312
 
 
313
  if ((*share->write_record_abort)(info))
 
314
    fatal_error= 1;
 
315
  if (fatal_error)
 
316
  {
 
317
    maria_print_error(info->s, HA_ERR_CRASHED);
 
318
    maria_mark_crashed(info);
 
319
  }
 
320
 
 
321
  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
 
322
  my_errno=save_errno;
 
323
err2:
 
324
  save_errno=my_errno;
 
325
  DBUG_ASSERT(save_errno);
 
326
  if (!save_errno)
 
327
    save_errno= HA_ERR_INTERNAL_ERROR;          /* Should never happen */
 
328
  DBUG_PRINT("error", ("got error: %d", save_errno));
 
329
  (void)(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
 
330
  allow_break();                        /* Allow SIGHUP & SIGINT */
 
331
  DBUG_RETURN(my_errno=save_errno);
 
332
} /* maria_write */
 
333
 
 
334
 
 
335
/*
 
336
  Write one key to btree
 
337
 
 
338
  TODO
 
339
    Remove this function and have bulk insert change keyinfo->ck_insert
 
340
    to point to the right function
 
341
*/
 
342
 
 
343
my_bool _ma_ck_write(MARIA_HA *info, MARIA_KEY *key)
 
344
{
 
345
  DBUG_ENTER("_ma_ck_write");
 
346
 
 
347
  if (info->bulk_insert &&
 
348
      is_tree_inited(&info->bulk_insert[key->keyinfo->key_nr]))
 
349
  {
 
350
    DBUG_RETURN(_ma_ck_write_tree(info, key));
 
351
  }
 
352
  DBUG_RETURN(_ma_ck_write_btree(info, key));
 
353
} /* _ma_ck_write */
 
354
 
 
355
 
 
356
/**********************************************************************
 
357
  Insert key into btree (normal case)
 
358
**********************************************************************/
 
359
 
 
360
static my_bool _ma_ck_write_btree(MARIA_HA *info, MARIA_KEY *key)
 
361
{
 
362
  int error;
 
363
  MARIA_KEYDEF *keyinfo= key->keyinfo;
 
364
  my_off_t  *root= &info->s->state.key_root[keyinfo->key_nr];
 
365
  DBUG_ENTER("_ma_ck_write_btree");
 
366
 
 
367
  error= _ma_ck_write_btree_with_log(info, key, root,
 
368
                                     keyinfo->write_comp_flag | key->flag);
 
369
  if (info->ft1_to_ft2)
 
370
  {
 
371
    if (!error)
 
372
      error= _ma_ft_convert_to_ft2(info, key);
 
373
    delete_dynamic(info->ft1_to_ft2);
 
374
    my_free((uchar*)info->ft1_to_ft2, MYF(0));
 
375
    info->ft1_to_ft2=0;
 
376
  }
 
377
  DBUG_RETURN(error != 0);
 
378
} /* _ma_ck_write_btree */
 
379
 
 
380
 
 
381
/**
 
382
  @brief Write a key to the b-tree
 
383
 
 
384
  @retval -1   error
 
385
  @retval 0    ok
 
386
*/
 
387
 
 
388
static int _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key,
 
389
                                       my_off_t *root, uint32 comp_flag)
 
390
{
 
391
  MARIA_SHARE *share= info->s;
 
392
  LSN lsn= LSN_IMPOSSIBLE;
 
393
  int error;
 
394
  my_off_t new_root= *root;
 
395
  uchar key_buff[MARIA_MAX_KEY_BUFF];
 
396
  MARIA_KEY org_key;
 
397
  DBUG_ENTER("_ma_ck_write_btree_with_log");
 
398
 
 
399
  LINT_INIT_STRUCT(org_key);
 
400
  if (share->now_transactional)
 
401
  {
 
402
    /* Save original value as the key may change */
 
403
    org_key= *key;
 
404
    memcpy(key_buff, key->data, key->data_length + key->ref_length);
 
405
  }
 
406
 
 
407
  error= _ma_ck_real_write_btree(info, key, &new_root, comp_flag);
 
408
  if (!error && share->now_transactional)
 
409
  {
 
410
    /* Log the original value */
 
411
    *key= org_key;
 
412
    key->data= key_buff;
 
413
    error= _ma_write_undo_key_insert(info, key, root, new_root, &lsn);
 
414
  }
 
415
  else
 
416
  {
 
417
    *root= new_root;
 
418
    _ma_fast_unlock_key_del(info);
 
419
  }
 
420
  _ma_unpin_all_pages_and_finalize_row(info, lsn);
 
421
 
 
422
  DBUG_RETURN(error);
 
423
} /* _ma_ck_write_btree_with_log */
 
424
 
 
425
 
 
426
/**
 
427
  @brief Write a key to the b-tree
 
428
 
 
429
  @retval -1   error
 
430
  @retval 0    ok
 
431
*/
 
432
 
 
433
int _ma_ck_real_write_btree(MARIA_HA *info, MARIA_KEY *key, my_off_t *root,
 
434
                            uint32 comp_flag)
 
435
{
 
436
  int error;
 
437
  DBUG_ENTER("_ma_ck_real_write_btree");
 
438
 
 
439
  /* key_length parameter is used only if comp_flag is SEARCH_FIND */
 
440
  if (*root == HA_OFFSET_ERROR ||
 
441
      (error= w_search(info, comp_flag, key, *root, (my_off_t) 0, (uchar*) 0,
 
442
                       (MARIA_PINNED_PAGE *) 0, (uchar*) 0, 1)) > 0)
 
443
    error= _ma_enlarge_root(info, key, root);
 
444
  DBUG_RETURN(error);
 
445
} /* _ma_ck_real_write_btree */
 
446
 
 
447
 
 
448
/**
 
449
  @brief Make a new root with key as only pointer
 
450
 
 
451
  @retval -1   error
 
452
  @retval 0    ok
 
453
*/
 
454
 
 
455
int _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root)
 
456
{
 
457
  uint t_length, page_flag, nod_flag, page_length;
 
458
  MARIA_KEY_PARAM s_temp;
 
459
  MARIA_SHARE *share= info->s;
 
460
  MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
 
461
  MARIA_KEYDEF *keyinfo= key->keyinfo;
 
462
  int res= 0;
 
463
  DBUG_ENTER("_ma_enlarge_root");
 
464
 
 
465
  nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
 
466
  /* Store pointer to prev page if nod */
 
467
  _ma_kpointer(info, info->buff + share->keypage_header, *root);
 
468
  t_length= (*keyinfo->pack_key)(key, nod_flag, (uchar*) 0,
 
469
                                 (uchar*) 0, (uchar*) 0, &s_temp);
 
470
  page_length= share->keypage_header + t_length + nod_flag;
 
471
 
 
472
  bzero(info->buff, share->keypage_header);
 
473
  _ma_store_keynr(share, info->buff, keyinfo->key_nr);
 
474
  _ma_store_page_used(share, info->buff, page_length);
 
475
  page_flag= 0;
 
476
  if (nod_flag)
 
477
    page_flag|= KEYPAGE_FLAG_ISNOD;
 
478
  if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
 
479
    page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
 
480
  _ma_store_keypage_flag(share, info->buff, page_flag);
 
481
  (*keyinfo->store_key)(keyinfo, info->buff + share->keypage_header +
 
482
                        nod_flag, &s_temp);
 
483
 
 
484
  /* Mark that info->buff was used */
 
485
  info->keyread_buff_used= info->page_changed= 1;
 
486
  if ((*root= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
 
487
      HA_OFFSET_ERROR)
 
488
    DBUG_RETURN(-1);
 
489
 
 
490
  /*
 
491
    Clear unitialized part of page to avoid valgrind/purify warnings
 
492
    and to get a clean page that is easier to compress and compare with
 
493
    pages generated with redo
 
494
  */
 
495
  bzero(info->buff + page_length, share->block_size - page_length);
 
496
 
 
497
  if (share->now_transactional &&
 
498
      _ma_log_new(info, *root, info->buff, page_length, keyinfo->key_nr, 1))
 
499
    res= -1;
 
500
  if (_ma_write_keypage(info, keyinfo, *root, page_link->write_lock,
 
501
                        PAGECACHE_PRIORITY_HIGH, info->buff))
 
502
    res= -1;
 
503
 
 
504
  DBUG_RETURN(res);
 
505
} /* _ma_enlarge_root */
 
506
 
 
507
 
 
508
/*
 
509
  Search after a position for a key and store it there
 
510
 
 
511
  @return
 
512
  @retval -1   error
 
513
  @retval 0    ok
 
514
  @retval > 0  Key should be stored in higher tree
 
515
*/
 
516
 
 
517
static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
 
518
                    my_off_t page, my_off_t father_page, uchar *father_buff,
 
519
                    MARIA_PINNED_PAGE *father_page_link, uchar *father_keypos,
 
520
                    my_bool insert_last)
 
521
{
 
522
  int error,flag;
 
523
  uint page_flag, nod_flag;
 
524
  uchar *temp_buff,*keypos;
 
525
  uchar keybuff[MARIA_MAX_KEY_BUFF];
 
526
  my_bool was_last_key;
 
527
  my_off_t next_page, dup_key_pos;
 
528
  MARIA_PINNED_PAGE *page_link;
 
529
  MARIA_SHARE *share= info->s;
 
530
  MARIA_KEYDEF *keyinfo= key->keyinfo;
 
531
  DBUG_ENTER("w_search");
 
532
  DBUG_PRINT("enter",("page: %ld", (long) page));
 
533
 
 
534
  if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
 
535
                                      MARIA_MAX_KEY_BUFF*2)))
 
536
    DBUG_RETURN(-1);
 
537
  if (!_ma_fetch_keypage(info, keyinfo, page, PAGECACHE_LOCK_WRITE,
 
538
                         DFLT_INIT_HITS, temp_buff, 0, &page_link))
 
539
    goto err;
 
540
 
 
541
  flag= (*keyinfo->bin_search)(key, temp_buff, comp_flag, &keypos,
 
542
                               keybuff, &was_last_key);
 
543
  page_flag= _ma_get_keypage_flag(share, temp_buff);
 
544
  nod_flag=  _ma_test_if_nod(share, temp_buff);
 
545
  if (flag == 0)
 
546
  {
 
547
    MARIA_KEY tmp_key;
 
548
    /* get position to record with duplicated key */
 
549
 
 
550
    tmp_key.keyinfo= keyinfo;
 
551
    tmp_key.data= keybuff;
 
552
 
 
553
    if ((*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &keypos))
 
554
      dup_key_pos= _ma_row_pos_from_key(&tmp_key);
 
555
    else
 
556
      dup_key_pos= HA_OFFSET_ERROR;
 
557
 
 
558
    if (keyinfo->flag & HA_FULLTEXT)
 
559
    {
 
560
      uint off;
 
561
      int  subkeys;
 
562
 
 
563
      get_key_full_length_rdonly(off, keybuff);
 
564
      subkeys=ft_sintXkorr(keybuff+off);
 
565
      comp_flag=SEARCH_SAME;
 
566
      if (subkeys >= 0)
 
567
      {
 
568
        /* normal word, one-level tree structure */
 
569
        flag=(*keyinfo->bin_search)(key, temp_buff, comp_flag,
 
570
                                    &keypos, keybuff, &was_last_key);
 
571
      }
 
572
      else
 
573
      {
 
574
        /* popular word. two-level tree. going down */
 
575
        my_off_t root=dup_key_pos;
 
576
        keyinfo= &share->ft2_keyinfo;
 
577
        get_key_full_length_rdonly(off, key);
 
578
        key+=off;
 
579
        /* we'll modify key entry 'in vivo' */
 
580
        keypos-= keyinfo->keylength + nod_flag;
 
581
        error= _ma_ck_real_write_btree(info, key, &root, comp_flag);
 
582
        _ma_dpointer(share, keypos+HA_FT_WLEN, root);
 
583
        subkeys--; /* should there be underflow protection ? */
 
584
        DBUG_ASSERT(subkeys < 0);
 
585
        ft_intXstore(keypos, subkeys);
 
586
        if (!error)
 
587
        {
 
588
          page_link->changed= 1;
 
589
          error= _ma_write_keypage(info, keyinfo, page,
 
590
                                   PAGECACHE_LOCK_LEFT_WRITELOCKED,
 
591
                                   DFLT_INIT_HITS, temp_buff);
 
592
        }
 
593
        my_afree((uchar*) temp_buff);
 
594
        DBUG_RETURN(error);
 
595
      }
 
596
    }
 
597
    else /* not HA_FULLTEXT, normal HA_NOSAME key */
 
598
    {
 
599
      DBUG_PRINT("warning", ("Duplicate key"));
 
600
      info->dup_key_pos= dup_key_pos;
 
601
      my_afree((uchar*) temp_buff);
 
602
      my_errno=HA_ERR_FOUND_DUPP_KEY;
 
603
      DBUG_RETURN(-1);
 
604
    }
 
605
  }
 
606
  if (flag == MARIA_FOUND_WRONG_KEY)
 
607
    DBUG_RETURN(-1);
 
608
  if (!was_last_key)
 
609
    insert_last=0;
 
610
  next_page= _ma_kpos(nod_flag,keypos);
 
611
  if (next_page == HA_OFFSET_ERROR ||
 
612
      (error= w_search(info, comp_flag, key, next_page,
 
613
                       page, temp_buff, page_link, keypos, insert_last)) > 0)
 
614
  {
 
615
    error= _ma_insert(info, key, temp_buff, keypos, page, keybuff,
 
616
                      father_page, father_buff, father_page_link,
 
617
                      father_keypos, insert_last);
 
618
    page_link->changed= 1;
 
619
    if (_ma_write_keypage(info, keyinfo, page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
 
620
                          DFLT_INIT_HITS,temp_buff))
 
621
      goto err;
 
622
  }
 
623
  my_afree((uchar*) temp_buff);
 
624
  DBUG_RETURN(error);
 
625
err:
 
626
  my_afree((uchar*) temp_buff);
 
627
  DBUG_PRINT("exit",("Error: %d",my_errno));
 
628
  DBUG_RETURN (-1);
 
629
} /* w_search */
 
630
 
 
631
 
 
632
/*
 
633
  Insert new key.
 
634
 
 
635
  SYNOPSIS
 
636
    _ma_insert()
 
637
    info                        Open table information.
 
638
    keyinfo                     Key definition information.
 
639
    key                         New key
 
640
    anc_buff                    Key page (beginning).
 
641
    key_pos                     Position in key page where to insert.
 
642
    anc_page                    Page number for anc_buff
 
643
    key_buff                    Copy of previous key if keys where packed.
 
644
    father_page                 position of parent key page in file.
 
645
    father_buff                 parent key page for balancing.
 
646
    father_page_link            Link to father page for marking page changed
 
647
    father_key_pos              position in parent key page for balancing.
 
648
    insert_last                 If to append at end of page.
 
649
 
 
650
  DESCRIPTION
 
651
    Insert new key at right of key_pos.
 
652
    Note that caller must save anc_buff
 
653
 
 
654
    This function writes log records for all changed pages
 
655
    (Including anc_buff and father page)
 
656
 
 
657
  RETURN
 
658
    < 0         Error.
 
659
    0           OK
 
660
    1           If key contains key to upper level (from balance page)
 
661
    2           If key contains key to upper level (from split space)
 
662
*/
 
663
 
 
664
int _ma_insert(register MARIA_HA *info, MARIA_KEY *key, uchar *anc_buff,
 
665
               uchar *key_pos, my_off_t anc_page, uchar *key_buff,
 
666
               my_off_t father_page, uchar *father_buff,
 
667
               MARIA_PINNED_PAGE *father_page_link,
 
668
               uchar *father_key_pos, my_bool insert_last)
 
669
{
 
670
  uint a_length, nod_flag, org_anc_length;
 
671
  int t_length;
 
672
  uchar *endpos, *prev_key;
 
673
  MARIA_KEY_PARAM s_temp;
 
674
  MARIA_SHARE *share= info->s;
 
675
  MARIA_KEYDEF *keyinfo= key->keyinfo;
 
676
  DBUG_ENTER("_ma_insert");
 
677
  DBUG_PRINT("enter",("key_pos: 0x%lx", (ulong) key_pos));
 
678
  DBUG_EXECUTE("key", _ma_print_key(DBUG_FILE, key););
 
679
 
 
680
  _ma_get_used_and_nod(share, anc_buff, a_length, nod_flag);
 
681
  org_anc_length= a_length;
 
682
  endpos= anc_buff+ a_length;
 
683
  prev_key= (key_pos == anc_buff + share->keypage_header + nod_flag ?
 
684
             (uchar*) 0 : key_buff);
 
685
  t_length= (*keyinfo->pack_key)(key, nod_flag,
 
686
                                 (key_pos == endpos ? (uchar*) 0 : key_pos),
 
687
                                 prev_key, prev_key, &s_temp);
 
688
#ifndef DBUG_OFF
 
689
  if (prev_key && (keyinfo->flag & (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
 
690
  {
 
691
    DBUG_DUMP("prev_key",(uchar*) prev_key, _ma_keylength(keyinfo,prev_key));
 
692
  }
 
693
  if (keyinfo->flag & HA_PACK_KEY)
 
694
  {
 
695
    DBUG_PRINT("test",("t_length: %d  ref_len: %d",
 
696
                       t_length,s_temp.ref_length));
 
697
    DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: 0x%lx",
 
698
                       s_temp.n_ref_length, s_temp.n_length, (long) s_temp.key));
 
699
  }
 
700
#endif
 
701
  if (t_length > 0)
 
702
  {
 
703
    if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
 
704
    {
 
705
      maria_print_error(share, HA_ERR_CRASHED);
 
706
      my_errno=HA_ERR_CRASHED;
 
707
      DBUG_RETURN(-1);
 
708
    }
 
709
    bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,
 
710
              (uint) (endpos-key_pos));
 
711
  }
 
712
  else
 
713
  {
 
714
    if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
 
715
    {
 
716
      maria_print_error(share, HA_ERR_CRASHED);
 
717
      my_errno=HA_ERR_CRASHED;
 
718
      DBUG_RETURN(-1);
 
719
    }
 
720
    bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
 
721
  }
 
722
  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
 
723
  a_length+=t_length;
 
724
 
 
725
  if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
 
726
    _ma_mark_page_with_transid(share, anc_buff);
 
727
  _ma_store_page_used(share, anc_buff, a_length);
 
728
 
 
729
  /*
 
730
    Check if the new key fits totally into the the page
 
731
    (anc_buff is big enough to contain a full page + one key)
 
732
  */
 
733
  if (a_length <= (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
 
734
  {
 
735
    if (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE - a_length < 32 &&
 
736
        (keyinfo->flag & HA_FULLTEXT) && key_pos == endpos &&
 
737
        share->base.key_reflength <= share->base.rec_reflength &&
 
738
        share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
 
739
    {
 
740
      /*
 
741
        Normal word. One-level tree. Page is almost full.
 
742
        Let's consider converting.
 
743
        We'll compare 'key' and the first key at anc_buff
 
744
      */
 
745
      const uchar *a= key->data;
 
746
      const uchar *b= anc_buff + share->keypage_header + nod_flag;
 
747
      uint alen, blen, ft2len= share->ft2_keyinfo.keylength;
 
748
      /* the very first key on the page is always unpacked */
 
749
      DBUG_ASSERT((*b & 128) == 0);
 
750
#if HA_FT_MAXLEN >= 127
 
751
      blen= mi_uint2korr(b); b+=2;
 
752
      When you enable this code, as part of the MyISAM->Maria merge of
 
753
ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0
 
754
  restore ft2 functionality, fix bugs.
 
755
      Then this will enable two-level fulltext index, which is not totally
 
756
      recoverable yet.
 
757
      So remove this text and inform Guilhem so that he fixes the issue.
 
758
#else
 
759
      blen= *b++;
 
760
#endif
 
761
      get_key_length(alen,a);
 
762
      DBUG_ASSERT(info->ft1_to_ft2==0);
 
763
      if (alen == blen &&
 
764
          ha_compare_text(keyinfo->seg->charset, a, alen,
 
765
                          b, blen, 0, 0) == 0)
 
766
      {
 
767
        /* Yup. converting */
 
768
        info->ft1_to_ft2=(DYNAMIC_ARRAY *)
 
769
          my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
 
770
        my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50);
 
771
 
 
772
        /*
 
773
          Now, adding all keys from the page to dynarray
 
774
          if the page is a leaf (if not keys will be deleted later)
 
775
        */
 
776
        if (!nod_flag)
 
777
        {
 
778
          /*
 
779
            Let's leave the first key on the page, though, because
 
780
            we cannot easily dispatch an empty page here
 
781
          */
 
782
          b+=blen+ft2len+2;
 
783
          for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
 
784
            insert_dynamic(info->ft1_to_ft2, b);
 
785
 
 
786
          /* fixing the page's length - it contains only one key now */
 
787
          _ma_store_page_used(share, anc_buff, share->keypage_header + blen +
 
788
                              ft2len + 2);
 
789
        }
 
790
        /* the rest will be done when we're back from recursion */
 
791
      }
 
792
    }
 
793
    else
 
794
    {
 
795
      if (share->now_transactional &&
 
796
          _ma_log_add(info, anc_page, anc_buff, (uint) (endpos - anc_buff),
 
797
                      key_pos, s_temp.changed_length, t_length, 0))
 
798
        DBUG_RETURN(-1);
 
799
    }
 
800
    DBUG_RETURN(0);                             /* There is room on page */
 
801
  }
 
802
  /* Page is full */
 
803
  if (nod_flag)
 
804
    insert_last=0;
 
805
  /*
 
806
    TODO:
 
807
    Remove 'born_transactional' here.
 
808
    The only reason for having it here is that the current
 
809
    _ma_balance_page_ can't handle variable length keys.
 
810
  */
 
811
  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
 
812
      father_buff && !insert_last && !info->quick_mode &&
 
813
      !info->s->base.born_transactional)
 
814
  {
 
815
    s_temp.key_pos= key_pos;
 
816
    father_page_link->changed= 1;
 
817
    DBUG_RETURN(_ma_balance_page(info, keyinfo, key, anc_buff, anc_page,
 
818
                                 father_page, father_buff, father_key_pos,
 
819
                                 &s_temp));
 
820
  }
 
821
  DBUG_RETURN(_ma_split_page(info, key, anc_page,
 
822
                             anc_buff, org_anc_length,
 
823
                             key_pos, s_temp.changed_length, t_length,
 
824
                             key_buff, insert_last));
 
825
} /* _ma_insert */
 
826
 
 
827
 
 
828
/**
 
829
  @brief split a full page in two and assign emerging item to key
 
830
 
 
831
  @fn _ma_split_page()
 
832
    info             Maria handler
 
833
    keyinfo          Key handler
 
834
    key              Buffer for middle key
 
835
    split_page       Address on disk for split_buff
 
836
    split_buff       Page buffer for page that should be split
 
837
    org_split_length Original length of split_buff before key was inserted
 
838
    inserted_key_pos Address in buffer where key was inserted
 
839
    changed_length   Number of bytes changed at 'inserted_key_pos'
 
840
    move_length      Number of bytes buffer was moved when key was inserted
 
841
    key_buff         Key buffer to use for temporary storage of key
 
842
    insert_last_key  If we are insert key on rightmost key page
 
843
 
 
844
  @note
 
845
    split_buff is not stored on disk    (caller has to do this)
 
846
 
 
847
  @return
 
848
  @retval 2   ok  (Middle key up from _ma_insert())
 
849
  @retval -1  error
 
850
*/
 
851
 
 
852
int _ma_split_page(MARIA_HA *info, MARIA_KEY *key, my_off_t split_page,
 
853
                   uchar *split_buff,
 
854
                   uint org_split_length,
 
855
                   uchar *inserted_key_pos, uint changed_length,
 
856
                   int move_length,
 
857
                   uchar *key_buff, my_bool insert_last_key)
 
858
{
 
859
  uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
 
860
  uint page_length, split_length, page_flag;
 
861
  uchar *key_pos,*pos, *after_key, *new_buff;
 
862
  my_off_t new_pos;
 
863
  MARIA_KEY_PARAM s_temp;
 
864
  MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
 
865
  MARIA_SHARE *share= info->s;
 
866
  MARIA_KEYDEF *keyinfo= key->keyinfo;
 
867
  MARIA_KEY tmp_key;
 
868
  int res;
 
869
  DBUG_ENTER("_ma_split_page");
 
870
 
 
871
  LINT_INIT(after_key);
 
872
  DBUG_DUMP("buff", split_buff, _ma_get_page_used(share, split_buff));
 
873
 
 
874
  info->page_changed=1;                 /* Info->buff is used */
 
875
  info->keyread_buff_used=1;
 
876
  new_buff= info->buff;
 
877
  page_flag= _ma_get_keypage_flag(share, split_buff);
 
878
  nod_flag=  _ma_test_if_nod(share, split_buff);
 
879
  key_ref_length= share->keypage_header + nod_flag;
 
880
 
 
881
  tmp_key.data=   key_buff;
 
882
  tmp_key.keyinfo= keyinfo;
 
883
  if (insert_last_key)
 
884
    key_pos= _ma_find_last_pos(info, &tmp_key, split_buff, &after_key);
 
885
  else
 
886
    key_pos= _ma_find_half_pos(info, &tmp_key, nod_flag, split_buff,
 
887
                               &after_key);
 
888
  if (!key_pos)
 
889
    DBUG_RETURN(-1);
 
890
 
 
891
  key_length= tmp_key.data_length + tmp_key.ref_length;
 
892
  split_length= (uint) (key_pos - split_buff);
 
893
  a_length= _ma_get_page_used(share, split_buff);
 
894
  _ma_store_page_used(share, split_buff, split_length);
 
895
 
 
896
  key_pos=after_key;
 
897
  if (nod_flag)
 
898
  {
 
899
    DBUG_PRINT("test",("Splitting nod"));
 
900
    pos=key_pos-nod_flag;
 
901
    memcpy((uchar*) new_buff + share->keypage_header, (uchar*) pos,
 
902
           (size_t) nod_flag);
 
903
  }
 
904
 
 
905
  /* Move middle item to key and pointer to new page */
 
906
  if ((new_pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
 
907
      HA_OFFSET_ERROR)
 
908
    DBUG_RETURN(-1);
 
909
 
 
910
  _ma_copy_key(key, &tmp_key);
 
911
  _ma_kpointer(info, key->data + key_length, new_pos);
 
912
 
 
913
  /* Store new page */
 
914
  if (!(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &key_pos))
 
915
    DBUG_RETURN(-1);
 
916
 
 
917
  t_length=(*keyinfo->pack_key)(&tmp_key, nod_flag, (uchar *) 0,
 
918
                                (uchar*) 0, (uchar*) 0, &s_temp);
 
919
  length=(uint) ((split_buff + a_length) - key_pos);
 
920
  memcpy((uchar*) new_buff+key_ref_length+t_length,(uchar*) key_pos,
 
921
         (size_t) length);
 
922
  (*keyinfo->store_key)(keyinfo,new_buff+key_ref_length,&s_temp);
 
923
  page_length= length + t_length + key_ref_length;
 
924
 
 
925
  bzero(new_buff, share->keypage_header);
 
926
  /* Copy KEYFLAG_FLAG_ISNODE and KEYPAGE_FLAG_HAS_TRANSID from parent page */
 
927
  _ma_store_keypage_flag(share, new_buff, page_flag);
 
928
  _ma_store_page_used(share, new_buff, page_length);
 
929
  /* Copy key number */
 
930
  new_buff[share->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_KEYID_SIZE -
 
931
             KEYPAGE_FLAG_SIZE]=
 
932
    split_buff[share->keypage_header - KEYPAGE_USED_SIZE -
 
933
               KEYPAGE_KEYID_SIZE - KEYPAGE_FLAG_SIZE];
 
934
 
 
935
  res= 2;                                       /* Middle key up */
 
936
  if (share->now_transactional &&
 
937
      _ma_log_new(info, new_pos, new_buff, page_length, keyinfo->key_nr, 0))
 
938
    res= -1;
 
939
  bzero(new_buff + page_length, share->block_size - page_length);
 
940
 
 
941
  if (_ma_write_keypage(info, keyinfo, new_pos, page_link->write_lock,
 
942
                        DFLT_INIT_HITS, new_buff))
 
943
    res= -1;
 
944
 
 
945
  /* Save changes to split pages */
 
946
  if (share->now_transactional &&
 
947
      _ma_log_split(info, split_page, split_buff, org_split_length,
 
948
                    split_length,
 
949
                    inserted_key_pos, changed_length, move_length,
 
950
                    KEY_OP_NONE, (uchar*) 0, 0, 0))
 
951
    res= -1;
 
952
 
 
953
  DBUG_DUMP_KEY("middle_key", key);
 
954
  DBUG_RETURN(res);
 
955
} /* _ma_split_page */
 
956
 
 
957
 
 
958
/*
 
959
  Calculate how to much to move to split a page in two
 
960
 
 
961
  Returns pointer to start of key.
 
962
  key will contain the key.
 
963
  return_key_length will contain the length of key
 
964
  after_key will contain the position to where the next key starts
 
965
*/
 
966
 
 
967
uchar *_ma_find_half_pos(MARIA_HA *info, MARIA_KEY *key, uint nod_flag,
 
968
                         uchar *page, uchar **after_key)
 
969
{
 
970
  uint keys, length, key_ref_length, page_flag;
 
971
  uchar *end,*lastpos;
 
972
  MARIA_SHARE *share= info->s;
 
973
  MARIA_KEYDEF *keyinfo= key->keyinfo;
 
974
  DBUG_ENTER("_ma_find_half_pos");
 
975
 
 
976
  key_ref_length= share->keypage_header + nod_flag;
 
977
  page_flag= _ma_get_keypage_flag(share, page);
 
978
  length= _ma_get_page_used(share, page) - key_ref_length;
 
979
  page+= key_ref_length;                        /* Point to first key */
 
980
  if (!(keyinfo->flag &
 
981
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
 
982
         HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
 
983
  {
 
984
    key_ref_length=   keyinfo->keylength+nod_flag;
 
985
    key->data_length= keyinfo->keylength - info->s->rec_reflength;
 
986
    key->ref_length=  info->s->rec_reflength;
 
987
    key->flag= 0;
 
988
    keys=length/(key_ref_length*2);
 
989
    end=page+keys*key_ref_length;
 
990
    *after_key=end+key_ref_length;
 
991
    memcpy(key->data, end, key_ref_length);
 
992
    DBUG_RETURN(end);
 
993
  }
 
994
 
 
995
  end=page+length/2-key_ref_length;             /* This is aprox. half */
 
996
  key->data[0]= 0;                               /* Safety */
 
997
  do
 
998
  {
 
999
    lastpos=page;
 
1000
    if (!(length= (*keyinfo->get_key)(key, page_flag, nod_flag, &page)))
 
1001
      DBUG_RETURN(0);
 
1002
  } while (page < end);
 
1003
  *after_key= page;
 
1004
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  half: 0x%lx",
 
1005
                     (long) lastpos, (long) page, (long) end));
 
1006
  DBUG_RETURN(lastpos);
 
1007
} /* _ma_find_half_pos */
 
1008
 
 
1009
 
 
1010
/**
 
1011
  Find second to last key on leaf page
 
1012
 
 
1013
  @notes
 
1014
  Used to split buffer at last key.  In this case the next to last
 
1015
  key will be moved to parent page and last key will be on it's own page.
 
1016
  
 
1017
  @TODO
 
1018
  Add one argument for 'last key value' to get_key so that one can
 
1019
  do the loop without having to copy the found key the whole time
 
1020
 
 
1021
  @return
 
1022
  @retval Pointer to the start of the key before the last key
 
1023
  @retval int_key will contain the last key
 
1024
*/
 
1025
 
 
1026
static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key,
 
1027
                                uchar *page, uchar **after_key)
 
1028
{
 
1029
  uint keys, length, key_ref_length, page_flag;
 
1030
  uint last_data_length, last_ref_length;
 
1031
  uchar *key, *end, *lastpos,*prevpos;
 
1032
  uchar key_buff[MARIA_MAX_KEY_BUFF];
 
1033
  MARIA_SHARE *share= info->s;
 
1034
  MARIA_KEYDEF *keyinfo= int_key->keyinfo;
 
1035
  DBUG_ENTER("_ma_find_last_pos");
 
1036
 
 
1037
  key_ref_length= share->keypage_header;
 
1038
  page_flag= _ma_get_keypage_flag(share, page);
 
1039
  length= _ma_get_page_used(share, page) - key_ref_length;
 
1040
  page+=key_ref_length;
 
1041
  if (!(keyinfo->flag &
 
1042
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
 
1043
         HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
 
1044
  {
 
1045
    keys= length / keyinfo->keylength - 2;
 
1046
    length= keyinfo->keylength;
 
1047
    int_key->data_length= length - info->s->rec_reflength;
 
1048
    int_key->ref_length=  info->s->rec_reflength;
 
1049
    int_key->flag= 0;
 
1050
    end=page+keys*length;
 
1051
    *after_key=end+length;
 
1052
    memcpy(int_key->data, end, length);
 
1053
    DBUG_RETURN(end);
 
1054
  }
 
1055
 
 
1056
  LINT_INIT(prevpos);
 
1057
  LINT_INIT(last_data_length);
 
1058
  LINT_INIT(last_ref_length);
 
1059
  end=page+length-key_ref_length;
 
1060
  key= int_key->data;
 
1061
 
 
1062
  length=0;
 
1063
  lastpos=page;
 
1064
  int_key->data= key_buff;
 
1065
  key_buff[0]= 0;                               /* Safety */
 
1066
 
 
1067
  while (page < end)
 
1068
  {
 
1069
    prevpos=lastpos; lastpos=page;
 
1070
    last_data_length= int_key->data_length;
 
1071
    last_ref_length=  int_key->ref_length;
 
1072
    memcpy(key, key_buff, length);              /* previous key */
 
1073
    if (!(length=(*keyinfo->get_key)(int_key, page_flag, 0, &page)))
 
1074
    {
 
1075
      maria_print_error(keyinfo->share, HA_ERR_CRASHED);
 
1076
      my_errno=HA_ERR_CRASHED;
 
1077
      DBUG_RETURN(0);
 
1078
    }
 
1079
  }
 
1080
  int_key->data=   key;
 
1081
  int_key->data_length= last_data_length;
 
1082
  int_key->ref_length=  last_ref_length;
 
1083
 
 
1084
  *after_key=lastpos;
 
1085
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  end: 0x%lx",
 
1086
                     (long) prevpos,(long) page,(long) end));
 
1087
  DBUG_RETURN(prevpos);
 
1088
} /* _ma_find_last_pos */
 
1089
 
 
1090
 
 
1091
/**
 
1092
  @brief Balance page with static size keys with page on right/left
 
1093
 
 
1094
  @param key    Middle key will be stored here
 
1095
 
 
1096
  @notes
 
1097
    Father_buff will always be changed
 
1098
    Caller must handle saving of curr_buff
 
1099
 
 
1100
  @return
 
1101
  @retval  0   Balance was done (father buff is saved)
 
1102
  @retval  1   Middle key up    (father buff is not saved)
 
1103
  @retval  -1  Error
 
1104
*/
 
1105
 
 
1106
static int _ma_balance_page(register MARIA_HA *info, MARIA_KEYDEF *keyinfo,
 
1107
                            MARIA_KEY *key, uchar *curr_buff,
 
1108
                            my_off_t curr_page,
 
1109
                            my_off_t father_page, uchar *father_buff,
 
1110
                            uchar *father_key_pos, MARIA_KEY_PARAM *s_temp)
 
1111
{
 
1112
  MARIA_PINNED_PAGE *next_page_link;
 
1113
  MARIA_PINNED_PAGE tmp_page_link, *new_page_link= &tmp_page_link;
 
1114
  MARIA_SHARE *share= info->s;
 
1115
  my_bool right;
 
1116
  uint k_length,father_length,father_keylength,nod_flag,curr_keylength;
 
1117
  uint right_length,left_length,new_right_length,new_left_length,extra_length;
 
1118
  uint keys, tmp_length, extra_buff_length;
 
1119
  uchar *pos,*buff,*extra_buff, *parting_key;
 
1120
  my_off_t next_page,new_pos;
 
1121
  uchar tmp_part_key[MARIA_MAX_KEY_BUFF];
 
1122
  DBUG_ENTER("_ma_balance_page");
 
1123
 
 
1124
  k_length=keyinfo->keylength;
 
1125
  father_length= _ma_get_page_used(share, father_buff);
 
1126
  father_keylength= k_length + share->base.key_reflength;
 
1127
  nod_flag= _ma_test_if_nod(share, curr_buff);
 
1128
  curr_keylength=k_length+nod_flag;
 
1129
  info->page_changed=1;
 
1130
 
 
1131
  if ((father_key_pos != father_buff+father_length &&
 
1132
       (info->state->records & 1)) ||
 
1133
      father_key_pos == father_buff+ share->keypage_header +
 
1134
      share->base.key_reflength)
 
1135
  {
 
1136
    right=1;
 
1137
    next_page= _ma_kpos(share->base.key_reflength,
 
1138
                        father_key_pos+father_keylength);
 
1139
    buff=info->buff;
 
1140
    DBUG_PRINT("info", ("use right page: %lu", (ulong) next_page));
 
1141
  }
 
1142
  else
 
1143
  {
 
1144
    right=0;
 
1145
    father_key_pos-=father_keylength;
 
1146
    next_page= _ma_kpos(share->base.key_reflength,father_key_pos);
 
1147
    /* Move curr_buff so that it's on the left */
 
1148
    buff= curr_buff;
 
1149
    curr_buff= info->buff;
 
1150
    DBUG_PRINT("info", ("use left page: %lu", (ulong) next_page));
 
1151
  }                                     /* father_key_pos ptr to parting key */
 
1152
 
 
1153
  if (!_ma_fetch_keypage(info,keyinfo, next_page, PAGECACHE_LOCK_WRITE,
 
1154
                         DFLT_INIT_HITS, info->buff, 0, &next_page_link))
 
1155
    goto err;
 
1156
  next_page_link->changed= 1;
 
1157
  DBUG_DUMP("next", info->buff, _ma_get_page_used(share, info->buff));
 
1158
 
 
1159
  /* Test if there is room to share keys */
 
1160
  left_length= _ma_get_page_used(share, curr_buff);
 
1161
  right_length= _ma_get_page_used(share, buff);
 
1162
  keys= ((left_length+right_length-share->keypage_header*2-nod_flag*2)/
 
1163
         curr_keylength);
 
1164
 
 
1165
  if ((right ? right_length : left_length) + curr_keylength <=
 
1166
      (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
 
1167
  {
 
1168
    /* Enough space to hold all keys in the two buffers ; Balance bufferts */
 
1169
    new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength;
 
1170
    new_right_length=share->keypage_header+nod_flag+(((keys+1)/2)*
 
1171
                                                       curr_keylength);
 
1172
    _ma_store_page_used(share, curr_buff, new_left_length);
 
1173
    _ma_store_page_used(share, buff, new_right_length);
 
1174
 
 
1175
    DBUG_PRINT("info", ("left_length: %u -> %u  right_length: %u -> %u",
 
1176
                        left_length, new_left_length,
 
1177
                        right_length, new_right_length));
 
1178
    if (left_length < new_left_length)
 
1179
    {
 
1180
      uint length;
 
1181
      DBUG_PRINT("info", ("move keys to end of buff"));
 
1182
 
 
1183
      /* Move keys buff -> curr_buff */
 
1184
      pos=curr_buff+left_length;
 
1185
      memcpy(pos,father_key_pos, (size_t) k_length);
 
1186
      memcpy(pos+k_length, buff + share->keypage_header,
 
1187
             (size_t) (length=new_left_length - left_length - k_length));
 
1188
      pos= buff + share->keypage_header + length;
 
1189
      memcpy(father_key_pos, pos, (size_t) k_length);
 
1190
      bmove(buff + share->keypage_header, pos + k_length, new_right_length);
 
1191
 
 
1192
      if (share->now_transactional)
 
1193
      {
 
1194
        if (right)
 
1195
        {
 
1196
          /*
 
1197
            Log changes to page on left
 
1198
            The original page is on the left and stored in curr_buff
 
1199
            We have on the page the newly inserted key and data
 
1200
            from buff added last on the page
 
1201
          */
 
1202
          if (_ma_log_split(info, curr_page, curr_buff,
 
1203
                            left_length - s_temp->move_length,
 
1204
                            new_left_length,
 
1205
                            s_temp->key_pos, s_temp->changed_length,
 
1206
                            s_temp->move_length,
 
1207
                            KEY_OP_ADD_SUFFIX,
 
1208
                            curr_buff + left_length,
 
1209
                            new_left_length - left_length,
 
1210
                            new_left_length - left_length+ k_length))
 
1211
            goto err;
 
1212
          /*
 
1213
            Log changes to page on right
 
1214
            This contains the original data with some keys deleted from
 
1215
            start of page
 
1216
          */
 
1217
          if (_ma_log_prefix(info, next_page, buff, 0,
 
1218
                             ((int) new_right_length - (int) right_length)))
 
1219
            goto err;
 
1220
        }
 
1221
        else
 
1222
        {
 
1223
          /*
 
1224
            Log changes to page on right (the original page) which is in buff
 
1225
            Data is removed from start of page
 
1226
            The inserted key may be in buff or moved to curr_buff
 
1227
          */
 
1228
          if (_ma_log_del_prefix(info, curr_page, buff,
 
1229
                                 right_length - s_temp->changed_length,
 
1230
                                 new_right_length,
 
1231
                                 s_temp->key_pos, s_temp->changed_length,
 
1232
                                 s_temp->move_length))
 
1233
            goto err;
 
1234
          /*
 
1235
            Log changes to page on left, which has new data added last
 
1236
          */
 
1237
          if (_ma_log_suffix(info, next_page, curr_buff,
 
1238
                             left_length, new_left_length))
 
1239
            goto err;
 
1240
        }
 
1241
      }
 
1242
    }
 
1243
    else
 
1244
    {
 
1245
      uint length;
 
1246
      DBUG_PRINT("info", ("move keys to start of buff"));
 
1247
 
 
1248
      bmove_upp(buff + new_right_length, buff + right_length,
 
1249
                right_length - share->keypage_header);
 
1250
      length= new_right_length -right_length - k_length;
 
1251
      memcpy(buff + share->keypage_header + length, father_key_pos,
 
1252
             (size_t) k_length);
 
1253
      pos=curr_buff+new_left_length;
 
1254
      memcpy(father_key_pos, pos, (size_t) k_length);
 
1255
      memcpy(buff + share->keypage_header, pos+k_length, (size_t) length);
 
1256
 
 
1257
      if (share->now_transactional)
 
1258
      {
 
1259
        if (right)
 
1260
        {
 
1261
          /*
 
1262
            Log changes to page on left
 
1263
            The original page is on the left and stored in curr_buff
 
1264
            The page is shortened from end and the key may be on the page
 
1265
          */
 
1266
          if (_ma_log_split(info, curr_page, curr_buff,
 
1267
                            left_length - s_temp->move_length,
 
1268
                            new_left_length,
 
1269
                            s_temp->key_pos, s_temp->changed_length,
 
1270
                            s_temp->move_length,
 
1271
                            KEY_OP_NONE, (uchar*) 0, 0, 0))
 
1272
            goto err;
 
1273
          /*
 
1274
            Log changes to page on right
 
1275
            This contains the original data, with some data from cur_buff
 
1276
            added first
 
1277
          */
 
1278
          if (_ma_log_prefix(info, next_page, buff,
 
1279
                             (uint) (new_right_length - right_length),
 
1280
                             (int) (new_right_length - right_length)))
 
1281
            goto err;
 
1282
        }
 
1283
        else
 
1284
        {
 
1285
          /*
 
1286
            Log changes to page on right (the original page) which is in buff
 
1287
            We have on the page the newly inserted key and data
 
1288
            from buff added first on the page
 
1289
          */
 
1290
          uint diff_length= new_right_length - right_length;
 
1291
          if (_ma_log_split(info, curr_page, buff,
 
1292
                            left_length - s_temp->move_length,
 
1293
                            new_right_length,
 
1294
                            s_temp->key_pos + diff_length,
 
1295
                            s_temp->changed_length,
 
1296
                            s_temp->move_length,
 
1297
                            KEY_OP_ADD_PREFIX,
 
1298
                            buff + share->keypage_header,
 
1299
                            diff_length, diff_length + k_length))
 
1300
            goto err;
 
1301
          /*
 
1302
            Log changes to page on left, which is shortened from end
 
1303
          */
 
1304
          if (_ma_log_suffix(info, next_page, curr_buff,
 
1305
                             left_length, new_left_length))
 
1306
            goto err;
 
1307
        }
 
1308
      }
 
1309
    }
 
1310
 
 
1311
    /* Log changes to father (one level up) page */
 
1312
 
 
1313
    if (share->now_transactional &&
 
1314
        _ma_log_change(info, father_page, father_buff, father_key_pos,
 
1315
                       k_length))
 
1316
      goto err;
 
1317
 
 
1318
    /*
 
1319
      next_page_link->changed is marked as true above and fathers
 
1320
      page_link->changed is marked as true in caller
 
1321
    */
 
1322
    if (_ma_write_keypage(info, keyinfo, next_page,
 
1323
                          PAGECACHE_LOCK_LEFT_WRITELOCKED,
 
1324
                          DFLT_INIT_HITS, info->buff) ||
 
1325
        _ma_write_keypage(info, keyinfo, father_page,
 
1326
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
 
1327
                          father_buff))
 
1328
      goto err;
 
1329
    DBUG_RETURN(0);
 
1330
  }
 
1331
 
 
1332
  /* curr_buff[] and buff[] are full, lets split and make new nod */
 
1333
 
 
1334
  extra_buff= info->buff+share->base.max_key_block_length;
 
1335
  new_left_length= new_right_length= (share->keypage_header + nod_flag +
 
1336
                                      (keys+1) / 3 * curr_keylength);
 
1337
  /*
 
1338
    5 is the minum number of keys we can have here. This comes from
 
1339
    the fact that each full page can store at least 2 keys and in this case
 
1340
    we have a 'split' key, ie 2+2+1 = 5
 
1341
  */
 
1342
  if (keys == 5)                                /* Too few keys to balance */
 
1343
    new_left_length-=curr_keylength;
 
1344
  extra_length= (nod_flag + left_length + right_length -
 
1345
                 new_left_length - new_right_length - curr_keylength);
 
1346
  extra_buff_length= extra_length + share->keypage_header;
 
1347
  DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
 
1348
                     left_length, right_length,
 
1349
                     new_left_length, new_right_length,
 
1350
                     extra_length));
 
1351
  _ma_store_page_used(share, curr_buff, new_left_length);
 
1352
  _ma_store_page_used(share, buff, new_right_length);
 
1353
 
 
1354
  bzero(extra_buff, share->keypage_header);
 
1355
  if (nod_flag)
 
1356
    _ma_store_keypage_flag(share, extra_buff, KEYPAGE_FLAG_ISNOD);
 
1357
  /* Copy key number */
 
1358
  extra_buff[share->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_KEYID_SIZE -
 
1359
             KEYPAGE_FLAG_SIZE]=
 
1360
    buff[share->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_KEYID_SIZE -
 
1361
         KEYPAGE_FLAG_SIZE];
 
1362
  _ma_store_page_used(share, extra_buff, extra_buff_length);
 
1363
 
 
1364
  /* move first largest keys to new page  */
 
1365
  pos=buff+right_length-extra_length;
 
1366
  memcpy(extra_buff + share->keypage_header, pos, extra_length);
 
1367
  /* Zero old data from buffer */
 
1368
  bzero(extra_buff + extra_buff_length,
 
1369
        share->block_size - extra_buff_length);
 
1370
 
 
1371
  /* Save new parting key between buff and extra_buff */
 
1372
  memcpy(tmp_part_key, pos-k_length,k_length);
 
1373
  /* Make place for new keys */
 
1374
  bmove_upp(buff+ new_right_length, pos - k_length,
 
1375
            right_length - extra_length - k_length - share->keypage_header);
 
1376
  /* Copy keys from left page */
 
1377
  pos= curr_buff+new_left_length;
 
1378
  memcpy(buff + share->keypage_header, pos + k_length,
 
1379
         (size_t) (tmp_length= left_length - new_left_length - k_length));
 
1380
  /* Copy old parting key */
 
1381
  parting_key= buff + share->keypage_header + tmp_length;
 
1382
  memcpy(parting_key, father_key_pos, (size_t) k_length);
 
1383
 
 
1384
  /* Move new parting keys up to caller */
 
1385
  memcpy((right ? key->data : father_key_pos),pos,(size_t) k_length);
 
1386
  memcpy((right ? father_key_pos : key->data),tmp_part_key, k_length);
 
1387
 
 
1388
  if ((new_pos= _ma_new(info, DFLT_INIT_HITS, &new_page_link))
 
1389
      == HA_OFFSET_ERROR)
 
1390
    goto err;
 
1391
  _ma_kpointer(info,key->data+k_length,new_pos);
 
1392
  /* This is safe as long we are using not keys with transid */
 
1393
  key->data_length= k_length - info->s->rec_reflength;
 
1394
  key->ref_length= info->s->rec_reflength;
 
1395
 
 
1396
  if (share->now_transactional)
 
1397
  {
 
1398
    if (right)
 
1399
    {
 
1400
      /*
 
1401
        Page order according to key values:
 
1402
        orignal_page (curr_buff), next_page (buff), extra_buff
 
1403
 
 
1404
        cur_buff is shortened,
 
1405
        buff is getting new keys at start and shortened from end.
 
1406
        extra_buff is new page
 
1407
 
 
1408
        Note that extra_buff (largest key parts) will be stored at the
 
1409
        place of the original 'right' page (next_page) and right page (buff)
 
1410
        will be stored at new_pos.
 
1411
 
 
1412
        This makes the log entries smaller as right_page contains all
 
1413
        data to generate the data extra_buff
 
1414
      */
 
1415
 
 
1416
      /*
 
1417
        Log changes to page on left (page shortened page at end)
 
1418
      */
 
1419
      if (_ma_log_split(info, curr_page, curr_buff,
 
1420
                        left_length - s_temp->move_length, new_left_length,
 
1421
                        s_temp->key_pos, s_temp->changed_length,
 
1422
                        s_temp->move_length,
 
1423
                        KEY_OP_NONE, (uchar*) 0, 0, 0))
 
1424
        goto err;
 
1425
      /*
 
1426
        Log changes to right page (stored at next page)
 
1427
        This contains the last 'extra_buff' from 'buff'
 
1428
      */
 
1429
      if (_ma_log_prefix(info, next_page, extra_buff,
 
1430
                         0, (int) (extra_buff_length - right_length)))
 
1431
        goto err;
 
1432
 
 
1433
      /*
 
1434
        Log changes to middle page, which is stored at the new page
 
1435
        position
 
1436
      */
 
1437
      if (_ma_log_new(info, new_pos, buff, new_right_length,
 
1438
                      keyinfo->key_nr, 0))
 
1439
        goto err;
 
1440
    }
 
1441
    else
 
1442
    {
 
1443
      /*
 
1444
        Log changes to page on right (the original page) which is in buff
 
1445
        This contains the original data, with some data from curr_buff
 
1446
        added first and shortened at end
 
1447
      */
 
1448
      int data_added_first= left_length - new_left_length;
 
1449
      if (_ma_log_key_middle(info, curr_page, buff,
 
1450
                             new_right_length,
 
1451
                             data_added_first,
 
1452
                             data_added_first,
 
1453
                             extra_length,
 
1454
                             s_temp->key_pos,
 
1455
                             s_temp->changed_length,
 
1456
                             s_temp->move_length))
 
1457
        goto err;
 
1458
 
 
1459
      /* Log changes to page on left, which is shortened from end */
 
1460
      if (_ma_log_suffix(info, next_page, curr_buff,
 
1461
                         left_length, new_left_length))
 
1462
        goto err;
 
1463
 
 
1464
      /* Log change to rightmost (new) page */
 
1465
      if (_ma_log_new(info, new_pos, extra_buff,
 
1466
                      extra_buff_length, keyinfo->key_nr, 0))
 
1467
        goto err;
 
1468
    }
 
1469
 
 
1470
    /* Log changes to father (one level up) page */
 
1471
    if (share->now_transactional &&
 
1472
        _ma_log_change(info, father_page, father_buff, father_key_pos,
 
1473
                       k_length))
 
1474
      goto err;
 
1475
  }
 
1476
 
 
1477
  if (_ma_write_keypage(info, keyinfo, (right ? new_pos : next_page),
 
1478
                        (right ? new_page_link->write_lock :
 
1479
                         PAGECACHE_LOCK_LEFT_WRITELOCKED),
 
1480
                        DFLT_INIT_HITS, info->buff) ||
 
1481
      _ma_write_keypage(info, keyinfo, (right ? next_page : new_pos),
 
1482
                        (!right ? new_page_link->write_lock :
 
1483
                         PAGECACHE_LOCK_LEFT_WRITELOCKED),
 
1484
                        DFLT_INIT_HITS, extra_buff))
 
1485
    goto err;
 
1486
 
 
1487
  DBUG_RETURN(1);                               /* Middle key up */
 
1488
 
 
1489
err:
 
1490
  DBUG_RETURN(-1);
 
1491
} /* _ma_balance_page */
 
1492
 
 
1493
 
 
1494
/**********************************************************************
 
1495
 *                Bulk insert code                                    *
 
1496
 **********************************************************************/
 
1497
 
 
1498
typedef struct {
 
1499
  MARIA_HA *info;
 
1500
  uint keynr;
 
1501
} bulk_insert_param;
 
1502
 
 
1503
 
 
1504
static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key)
 
1505
{
 
1506
  my_bool error;
 
1507
  uint keynr= key->keyinfo->key_nr;
 
1508
  DBUG_ENTER("_ma_ck_write_tree");
 
1509
 
 
1510
  /* Store ref_length as this is always constant */
 
1511
  info->bulk_insert_ref_length= key->ref_length;
 
1512
  error= tree_insert(&info->bulk_insert[keynr], key->data,
 
1513
                     key->data_length + key->ref_length,
 
1514
                     info->bulk_insert[keynr].custom_arg) == 0;
 
1515
  DBUG_RETURN(error);
 
1516
} /* _ma_ck_write_tree */
 
1517
 
 
1518
 
 
1519
/* typeof(_ma_keys_compare)=qsort_cmp2 */
 
1520
 
 
1521
static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
 
1522
{
 
1523
  uint not_used[2];
 
1524
  return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
 
1525
                    key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
 
1526
                    not_used);
 
1527
}
 
1528
 
 
1529
 
 
1530
static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
 
1531
{
 
1532
  /*
 
1533
    Probably I can use info->lastkey here, but I'm not sure,
 
1534
    and to be safe I'd better use local lastkey.
 
1535
  */
 
1536
  MARIA_SHARE *share= param->info->s;
 
1537
  uchar lastkey[MARIA_MAX_KEY_BUFF];
 
1538
  uint keylen;
 
1539
  MARIA_KEYDEF *keyinfo= share->keyinfo + param->keynr;
 
1540
  MARIA_KEY tmp_key;
 
1541
 
 
1542
  switch (mode) {
 
1543
  case free_init:
 
1544
    if (share->lock_key_trees)
 
1545
    {
 
1546
      rw_wrlock(&keyinfo->root_lock);
 
1547
      keyinfo->version++;
 
1548
    }
 
1549
    return 0;
 
1550
  case free_free:
 
1551
    /* Note: keylen doesn't contain transid lengths */
 
1552
    keylen= _ma_keylength(keyinfo, key);
 
1553
    tmp_key.data=        lastkey;
 
1554
    tmp_key.keyinfo=     keyinfo;
 
1555
    tmp_key.data_length= keylen - share->rec_reflength;
 
1556
    tmp_key.ref_length=  param->info->bulk_insert_ref_length;
 
1557
    tmp_key.flag= (param->info->bulk_insert_ref_length ==
 
1558
                   share->rec_reflength ? 0 : SEARCH_USER_KEY_HAS_TRANSID);
 
1559
    /*
 
1560
      We have to copy key as ma_ck_write_btree may need the buffer for
 
1561
      copying middle key up if tree is growing
 
1562
    */
 
1563
    memcpy(lastkey, key, tmp_key.data_length + tmp_key.ref_length);
 
1564
    return _ma_ck_write_btree(param->info, &tmp_key);
 
1565
  case free_end:
 
1566
    if (share->lock_key_trees)
 
1567
      rw_unlock(&keyinfo->root_lock);
 
1568
    return 0;
 
1569
  }
 
1570
  return 1;
 
1571
}
 
1572
 
 
1573
 
 
1574
int maria_init_bulk_insert(MARIA_HA *info, ulong cache_size, ha_rows rows)
 
1575
{
 
1576
  MARIA_SHARE *share= info->s;
 
1577
  MARIA_KEYDEF *key=share->keyinfo;
 
1578
  bulk_insert_param *params;
 
1579
  uint i, num_keys, total_keylength;
 
1580
  ulonglong key_map;
 
1581
  DBUG_ENTER("_ma_init_bulk_insert");
 
1582
  DBUG_PRINT("enter",("cache_size: %lu", cache_size));
 
1583
 
 
1584
  DBUG_ASSERT(!info->bulk_insert &&
 
1585
              (!rows || rows >= MARIA_MIN_ROWS_TO_USE_BULK_INSERT));
 
1586
 
 
1587
  maria_clear_all_keys_active(key_map);
 
1588
  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
 
1589
  {
 
1590
    if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
 
1591
        maria_is_key_active(share->state.key_map, i))
 
1592
    {
 
1593
      num_keys++;
 
1594
      maria_set_key_active(key_map, i);
 
1595
      total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
 
1596
    }
 
1597
  }
 
1598
 
 
1599
  if (num_keys==0 ||
 
1600
      num_keys * MARIA_MIN_SIZE_BULK_INSERT_TREE > cache_size)
 
1601
    DBUG_RETURN(0);
 
1602
 
 
1603
  if (rows && rows*total_keylength < cache_size)
 
1604
    cache_size= (ulong)rows;
 
1605
  else
 
1606
    cache_size/=total_keylength*16;
 
1607
 
 
1608
  info->bulk_insert=(TREE *)
 
1609
    my_malloc((sizeof(TREE)*share->base.keys+
 
1610
               sizeof(bulk_insert_param)*num_keys),MYF(0));
 
1611
 
 
1612
  if (!info->bulk_insert)
 
1613
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
1614
 
 
1615
  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
 
1616
  for (i=0 ; i < share->base.keys ; i++)
 
1617
  {
 
1618
    if (maria_is_key_active(key_map, i))
 
1619
    {
 
1620
      params->info=info;
 
1621
      params->keynr=i;
 
1622
      /* Only allocate a 16'th of the buffer at a time */
 
1623
      init_tree(&info->bulk_insert[i],
 
1624
                cache_size * key[i].maxlength,
 
1625
                cache_size * key[i].maxlength, 0,
 
1626
                (qsort_cmp2)keys_compare, 0,
 
1627
                (tree_element_free) keys_free, (void *)params++);
 
1628
    }
 
1629
    else
 
1630
     info->bulk_insert[i].root=0;
 
1631
  }
 
1632
 
 
1633
  DBUG_RETURN(0);
 
1634
}
 
1635
 
 
1636
void maria_flush_bulk_insert(MARIA_HA *info, uint inx)
 
1637
{
 
1638
  if (info->bulk_insert)
 
1639
  {
 
1640
    if (is_tree_inited(&info->bulk_insert[inx]))
 
1641
      reset_tree(&info->bulk_insert[inx]);
 
1642
  }
 
1643
}
 
1644
 
 
1645
void maria_end_bulk_insert(MARIA_HA *info, my_bool abort)
 
1646
{
 
1647
  DBUG_ENTER("maria_end_bulk_insert");
 
1648
  if (info->bulk_insert)
 
1649
  {
 
1650
    uint i;
 
1651
    for (i=0 ; i < info->s->base.keys ; i++)
 
1652
    {
 
1653
      if (is_tree_inited(&info->bulk_insert[i]))
 
1654
      {
 
1655
        if (abort)
 
1656
          reset_free_element(&info->bulk_insert[i]);
 
1657
        delete_tree(&info->bulk_insert[i]);
 
1658
      }
 
1659
    }
 
1660
    my_free(info->bulk_insert, MYF(0));
 
1661
    info->bulk_insert= 0;
 
1662
  }
 
1663
  DBUG_VOID_RETURN;
 
1664
}
 
1665
 
 
1666
 
 
1667
/****************************************************************************
 
1668
  Dedicated functions that generate log entries
 
1669
****************************************************************************/
 
1670
 
 
1671
 
 
1672
int _ma_write_undo_key_insert(MARIA_HA *info, const MARIA_KEY *key,
 
1673
                              my_off_t *root, my_off_t new_root, LSN *res_lsn)
 
1674
{
 
1675
  MARIA_SHARE *share= info->s;
 
1676
  MARIA_KEYDEF *keyinfo= key->keyinfo;
 
1677
  uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
 
1678
                 KEY_NR_STORE_SIZE];
 
1679
  const uchar *key_value;
 
1680
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
 
1681
  struct st_msg_to_write_hook_for_undo_key msg;
 
1682
  uint key_length;
 
1683
 
 
1684
  /* Save if we need to write a clr record */
 
1685
  lsn_store(log_data, info->trn->undo_lsn);
 
1686
  key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
 
1687
               keyinfo->key_nr);
 
1688
  key_length= key->data_length + key->ref_length;
 
1689
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
1690
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
 
1691
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key->data;
 
1692
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
 
1693
 
 
1694
  msg.root= root;
 
1695
  msg.value= new_root;
 
1696
  msg.auto_increment= 0;
 
1697
  key_value= key->data;
 
1698
  if (share->base.auto_key == ((uint) keyinfo->key_nr + 1))
 
1699
  {
 
1700
    const HA_KEYSEG *keyseg= keyinfo->seg;
 
1701
    uchar reversed[MARIA_MAX_KEY_BUFF];
 
1702
    if (keyseg->flag & HA_SWAP_KEY)
 
1703
    {
 
1704
      /* We put key from log record to "data record" packing format... */
 
1705
      const uchar *key_ptr= key->data, *key_end= key->data + keyseg->length;
 
1706
      uchar *to= reversed + keyseg->length;
 
1707
      do
 
1708
      {
 
1709
        *--to= *key_ptr++;
 
1710
      } while (key_ptr != key_end);
 
1711
      key_value= to;
 
1712
    }
 
1713
    /* ... so that we can read it with: */
 
1714
    msg.auto_increment=
 
1715
      ma_retrieve_auto_increment(key_value, keyseg->type);
 
1716
    /* and write_hook_for_undo_key_insert() will pick this. */
 
1717
  }
 
1718
 
 
1719
  return translog_write_record(res_lsn, LOGREC_UNDO_KEY_INSERT,
 
1720
                               info->trn, info,
 
1721
                               (translog_size_t)
 
1722
                               log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
 
1723
                               key_length,
 
1724
                               TRANSLOG_INTERNAL_PARTS + 2, log_array,
 
1725
                               log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
 
1726
}
 
1727
 
 
1728
 
 
1729
/**
 
1730
  @brief Log creation of new page
 
1731
 
 
1732
  @note
 
1733
    We don't have to store the page_length into the log entry as we can
 
1734
    calculate this from the length of the log entry
 
1735
 
 
1736
  @retval 1   error
 
1737
  @retval 0    ok
 
1738
*/
 
1739
 
 
1740
my_bool _ma_log_new(MARIA_HA *info, my_off_t page, const uchar *buff,
 
1741
                    uint page_length, uint key_nr, my_bool root_page)
 
1742
{
 
1743
  LSN lsn;
 
1744
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE
 
1745
                 +1];
 
1746
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
 
1747
  MARIA_SHARE *share= info->s;
 
1748
  DBUG_ENTER("_ma_log_new");
 
1749
  DBUG_PRINT("enter", ("page: %lu", (ulong) page));
 
1750
 
 
1751
  DBUG_ASSERT(share->now_transactional);
 
1752
 
 
1753
  /* Store address of new root page */
 
1754
  page/= share->block_size;
 
1755
  page_store(log_data + FILEID_STORE_SIZE, page);
 
1756
 
 
1757
  /* Store link to next unused page */
 
1758
  if (info->used_key_del == 2)
 
1759
    page= 0;                                    /* key_del not changed */
 
1760
  else
 
1761
    page= ((share->current_key_del == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
 
1762
           share->current_key_del / share->block_size);
 
1763
 
 
1764
  page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
 
1765
  key_nr_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE*2, key_nr);
 
1766
  log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE*2 + KEY_NR_STORE_SIZE]=
 
1767
    (uchar) root_page;
 
1768
 
 
1769
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
1770
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
 
1771
 
 
1772
  page_length-= LSN_STORE_SIZE;
 
1773
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    buff + LSN_STORE_SIZE;
 
1774
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length;
 
1775
 
 
1776
  if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE,
 
1777
                            info->trn, info,
 
1778
                            (translog_size_t) (sizeof(log_data) + page_length),
 
1779
                            TRANSLOG_INTERNAL_PARTS + 2, log_array,
 
1780
                            log_data, NULL))
 
1781
    DBUG_RETURN(1);
 
1782
  DBUG_RETURN(0);
 
1783
}
 
1784
 
 
1785
 
 
1786
/**
 
1787
   @brief
 
1788
   Log when some part of the key page changes
 
1789
*/
 
1790
 
 
1791
my_bool _ma_log_change(MARIA_HA *info, my_off_t page, const uchar *buff,
 
1792
                       const uchar *key_pos, uint length)
 
1793
{
 
1794
  LSN lsn;
 
1795
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 6 + 7], *log_pos;
 
1796
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
 
1797
  uint offset= (uint) (key_pos - buff), translog_parts, extra_length= 0;
 
1798
  DBUG_ENTER("_ma_log_change");
 
1799
  DBUG_PRINT("enter", ("page: %lu length: %u", (ulong) page, length));
 
1800
 
 
1801
  DBUG_ASSERT(info->s->now_transactional);
 
1802
 
 
1803
  /* Store address of new root page */
 
1804
  page/= info->s->block_size;
 
1805
  page_store(log_data + FILEID_STORE_SIZE, page);
 
1806
  log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
 
1807
  log_pos[0]= KEY_OP_OFFSET;
 
1808
  int2store(log_pos+1, offset);
 
1809
  log_pos[3]= KEY_OP_CHANGE;
 
1810
  int2store(log_pos+4, length);
 
1811
 
 
1812
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
1813
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data) - 7;
 
1814
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
 
1815
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
 
1816
  translog_parts= 2;
 
1817
 
 
1818
#ifdef EXTRA_DEBUG_KEY_CHANGES
 
1819
  {
 
1820
    int page_length= _ma_get_page_used(info->s, buff);
 
1821
    ha_checksum crc;
 
1822
    crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
 
1823
    log_pos+= 6;
 
1824
    log_pos[0]= KEY_OP_CHECK;
 
1825
    int2store(log_pos+1, page_length);
 
1826
    int4store(log_pos+3, crc);
 
1827
    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= (char *) log_pos;
 
1828
    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
 
1829
    extra_length+= 7;
 
1830
    translog_parts++;
 
1831
  }
 
1832
#endif
 
1833
 
 
1834
  if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
 
1835
                            info->trn, info,
 
1836
                            (translog_size_t) (sizeof(log_data) - 7 + length +
 
1837
                                               extra_length),
 
1838
                            TRANSLOG_INTERNAL_PARTS + translog_parts,
 
1839
                            log_array, log_data, NULL))
 
1840
    DBUG_RETURN(1);
 
1841
  DBUG_RETURN(0);
 
1842
}
 
1843
 
 
1844
 
 
1845
/**
 
1846
   @brief Write log entry for page splitting
 
1847
 
 
1848
   @note
 
1849
     Write log entry for page that has got a key added to the page under
 
1850
     one and only one of the following senarios:
 
1851
     - Page is shortened from end
 
1852
     - Data is added to end of page
 
1853
     - Data added at front of page
 
1854
 
 
1855
   @param prefix_or_suffix  KEY_OP_NONE         Ignored
 
1856
                            KEY_OP_ADD_PREFIX   Add data to start of page
 
1857
                            KEY_OP_ADD_SUFFIX   Add data to end of page
 
1858
 
 
1859
*/
 
1860
 
 
1861
static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff,
 
1862
                             uint org_length, uint new_length,
 
1863
                             const uchar *key_pos, uint key_length,
 
1864
                             int move_length, enum en_key_op prefix_or_suffix,
 
1865
                             const uchar *data, uint data_length,
 
1866
                             uint changed_length)
 
1867
{
 
1868
  LSN lsn;
 
1869
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3+3+3+3+3+2];
 
1870
  uchar *log_pos;
 
1871
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
 
1872
  uint offset= (uint) (key_pos - buff);
 
1873
  uint translog_parts, extra_length;
 
1874
  DBUG_ENTER("_ma_log_split");
 
1875
  DBUG_PRINT("enter", ("page: %lu  org_length: %u  new_length: %u",
 
1876
                       (ulong) page, org_length, new_length));
 
1877
 
 
1878
  log_pos= log_data + FILEID_STORE_SIZE;
 
1879
  page/= info->s->block_size;
 
1880
  page_store(log_pos, page);
 
1881
  log_pos+= PAGE_STORE_SIZE;
 
1882
 
 
1883
  if (new_length <= offset || !key_pos)
 
1884
  {
 
1885
    /*
 
1886
      Page was split before inserted key. Write redo entry where
 
1887
      we just cut current page at page_length
 
1888
    */
 
1889
    uint length_offset= org_length - new_length;
 
1890
    log_pos[0]= KEY_OP_DEL_SUFFIX;
 
1891
    int2store(log_pos+1, length_offset);
 
1892
    log_pos+= 3;
 
1893
    translog_parts= 1;
 
1894
    extra_length= 0;
 
1895
  }
 
1896
  else
 
1897
  {
 
1898
    /* Key was added to page which was split after the inserted key */
 
1899
    uint max_key_length;
 
1900
 
 
1901
    /*
 
1902
      Handle case when split happened directly after the newly inserted key.
 
1903
    */
 
1904
    max_key_length= new_length - offset;
 
1905
    extra_length= min(key_length, max_key_length);
 
1906
 
 
1907
    if ((int) new_length < (int) (org_length + move_length + data_length))
 
1908
    {
 
1909
      /* Shorten page */
 
1910
      uint diff= org_length + move_length + data_length - new_length;
 
1911
      log_pos[0]= KEY_OP_DEL_SUFFIX;
 
1912
      int2store(log_pos + 1, diff);
 
1913
      log_pos+= 3;
 
1914
    }
 
1915
    else
 
1916
    {
 
1917
      DBUG_ASSERT(new_length == org_length + move_length + data_length);
 
1918
    }
 
1919
 
 
1920
    log_pos[0]= KEY_OP_OFFSET;
 
1921
    int2store(log_pos+1, offset);
 
1922
    log_pos+= 3;
 
1923
 
 
1924
    if (move_length)
 
1925
    {
 
1926
      log_pos[0]= KEY_OP_SHIFT;
 
1927
      int2store(log_pos+1, move_length);
 
1928
      log_pos+= 3;
 
1929
    }
 
1930
 
 
1931
    log_pos[0]= KEY_OP_CHANGE;
 
1932
    int2store(log_pos+1, extra_length);
 
1933
    log_pos+= 3;
 
1934
 
 
1935
    /* Point to original inserted key data */
 
1936
    if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
 
1937
      key_pos+= data_length;
 
1938
 
 
1939
    translog_parts= 2;
 
1940
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
 
1941
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extra_length;
 
1942
  }
 
1943
 
 
1944
  if (data_length)
 
1945
  {
 
1946
    /* Add prefix or suffix */
 
1947
    log_pos[0]= prefix_or_suffix;
 
1948
    int2store(log_pos+1, data_length);
 
1949
    log_pos+= 3;
 
1950
    if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
 
1951
    {
 
1952
      int2store(log_pos+1, changed_length);
 
1953
      log_pos+= 2;
 
1954
      data_length= changed_length;
 
1955
    }
 
1956
    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str=    data;
 
1957
    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= data_length;
 
1958
    translog_parts++;
 
1959
    extra_length+= data_length;
 
1960
  }
 
1961
 
 
1962
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
1963
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
 
1964
                                                         log_data);
 
1965
  DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
 
1966
                                    info->trn, info,
 
1967
                                    (translog_size_t)
 
1968
                                    log_array[TRANSLOG_INTERNAL_PARTS +
 
1969
                                              0].length + extra_length,
 
1970
                                    TRANSLOG_INTERNAL_PARTS + translog_parts,
 
1971
                                    log_array, log_data, NULL));
 
1972
}
 
1973
 
 
1974
 
 
1975
/**
 
1976
   @brief
 
1977
   Write log entry for page that has got a key added to the page
 
1978
   and page is shortened from start of page
 
1979
 
 
1980
   @fn _ma_log_del_prefix()
 
1981
   @param info          Maria handler
 
1982
   @param page          Page number
 
1983
   @param buff          Page buffer
 
1984
   @param org_length    Length of buffer when read
 
1985
   @param new_length    Final length
 
1986
   @param key_pos       Where on page buffer key was added. This is position
 
1987
                        before prefix was removed
 
1988
   @param key_length    How many bytes was changed at 'key_pos'
 
1989
   @param move_length   How many bytes was moved up when key was added
 
1990
 
 
1991
   @return
 
1992
   @retval  0  ok
 
1993
   @retval  1  error
 
1994
*/
 
1995
 
 
1996
static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page,
 
1997
                                  const uchar *buff,
 
1998
                                  uint org_length, uint new_length,
 
1999
                                  const uchar *key_pos, uint key_length,
 
2000
                                  int move_length)
 
2001
{
 
2002
  LSN lsn;
 
2003
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 12], *log_pos;
 
2004
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
 
2005
  uint offset= (uint) (key_pos - buff);
 
2006
  uint diff_length= org_length + move_length - new_length;
 
2007
  uint translog_parts, extra_length;
 
2008
  DBUG_ENTER("_ma_log_del_prefix");
 
2009
  DBUG_PRINT("enter", ("page: %lu  org_length: %u  new_length: %u",
 
2010
                       (ulong) page, org_length, new_length));
 
2011
 
 
2012
  DBUG_ASSERT((int) diff_length > 0);
 
2013
 
 
2014
  log_pos= log_data + FILEID_STORE_SIZE;
 
2015
  page/= info->s->block_size;
 
2016
  page_store(log_pos, page);
 
2017
  log_pos+= PAGE_STORE_SIZE;
 
2018
 
 
2019
  translog_parts= 1;
 
2020
  extra_length= 0;
 
2021
 
 
2022
  if (offset < diff_length + info->s->keypage_header)
 
2023
  {
 
2024
    /*
 
2025
      Key is not anymore on page. Move data down, but take into account that
 
2026
      the original page had grown with 'move_length bytes'
 
2027
    */
 
2028
    DBUG_ASSERT(offset + key_length <= diff_length + info->s->keypage_header);
 
2029
 
 
2030
    log_pos[0]= KEY_OP_DEL_PREFIX;
 
2031
    int2store(log_pos+1, diff_length - move_length);
 
2032
    log_pos+= 3;
 
2033
  }
 
2034
  else
 
2035
  {
 
2036
    /*
 
2037
      Correct position to key, as data before key has been delete and key
 
2038
      has thus been moved down
 
2039
    */
 
2040
    offset-= diff_length;
 
2041
    key_pos-= diff_length;
 
2042
 
 
2043
    /* Move data down */
 
2044
    log_pos[0]= KEY_OP_DEL_PREFIX;
 
2045
    int2store(log_pos+1, diff_length);
 
2046
    log_pos+= 3;
 
2047
 
 
2048
    log_pos[0]= KEY_OP_OFFSET;
 
2049
    int2store(log_pos+1, offset);
 
2050
    log_pos+= 3;
 
2051
 
 
2052
    if (move_length)
 
2053
    {
 
2054
      log_pos[0]= KEY_OP_SHIFT;
 
2055
      int2store(log_pos+1, move_length);
 
2056
      log_pos+= 3;
 
2057
    }
 
2058
    log_pos[0]= KEY_OP_CHANGE;
 
2059
    int2store(log_pos+1, key_length);
 
2060
    log_pos+= 3;
 
2061
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
 
2062
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
 
2063
    translog_parts= 2;
 
2064
    extra_length= key_length;
 
2065
  }
 
2066
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
2067
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
 
2068
                                                         log_data);
 
2069
  DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
 
2070
                                    info->trn, info,
 
2071
                                    (translog_size_t)
 
2072
                                    log_array[TRANSLOG_INTERNAL_PARTS +
 
2073
                                              0].length + extra_length,
 
2074
                                    TRANSLOG_INTERNAL_PARTS + translog_parts,
 
2075
                                    log_array, log_data, NULL));
 
2076
}
 
2077
 
 
2078
 
 
2079
/**
 
2080
   @brief
 
2081
   Write log entry for page that has got data added first and
 
2082
   data deleted last. Old changed key may be part of page
 
2083
*/
 
2084
 
 
2085
static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page,
 
2086
                                  const uchar *buff,
 
2087
                                  uint new_length,
 
2088
                                  uint data_added_first,
 
2089
                                  uint data_changed_first,
 
2090
                                  uint data_deleted_last,
 
2091
                                  const uchar *key_pos,
 
2092
                                  uint key_length, int move_length)
 
2093
{
 
2094
  LSN lsn;
 
2095
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3+5+3+3+3];
 
2096
  uchar *log_pos;
 
2097
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
 
2098
  uint key_offset;
 
2099
  uint translog_parts, extra_length;
 
2100
  DBUG_ENTER("_ma_log_key_middle");
 
2101
  DBUG_PRINT("enter", ("page: %lu", (ulong) page));
 
2102
 
 
2103
  /* new place of key after changes */
 
2104
  key_pos+= data_added_first;
 
2105
  key_offset= (uint) (key_pos - buff);
 
2106
  if (key_offset < new_length)
 
2107
  {
 
2108
    /* key is on page; Calculate how much of the key is there */
 
2109
    uint max_key_length= new_length - key_offset;
 
2110
    if (max_key_length < key_length)
 
2111
    {
 
2112
      /* Key is last on page */
 
2113
      key_length= max_key_length;
 
2114
      move_length= 0;
 
2115
    }
 
2116
    /*
 
2117
      Take into account that new data was added as part of original key
 
2118
      that also needs to be removed from page
 
2119
    */
 
2120
    data_deleted_last+= move_length;
 
2121
  }
 
2122
 
 
2123
  page/= info->s->block_size;
 
2124
 
 
2125
  /* First log changes to page */
 
2126
  log_pos= log_data + FILEID_STORE_SIZE;
 
2127
  page_store(log_pos, page);
 
2128
  log_pos+= PAGE_STORE_SIZE;
 
2129
 
 
2130
  log_pos[0]= KEY_OP_DEL_SUFFIX;
 
2131
  int2store(log_pos+1, data_deleted_last);
 
2132
  log_pos+= 3;
 
2133
 
 
2134
  log_pos[0]= KEY_OP_ADD_PREFIX;
 
2135
  int2store(log_pos+1, data_added_first);
 
2136
  int2store(log_pos+3, data_changed_first);
 
2137
  log_pos+= 5;
 
2138
 
 
2139
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
2140
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
 
2141
                                                         log_data);
 
2142
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    (buff +
 
2143
                                                  info->s->keypage_header);
 
2144
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
 
2145
  translog_parts= 2;
 
2146
  extra_length= data_changed_first;
 
2147
 
 
2148
  /* If changed key is on page, log those changes too */
 
2149
 
 
2150
  if (key_offset < new_length)
 
2151
  {
 
2152
    uchar *start_log_pos= log_pos;
 
2153
 
 
2154
    log_pos[0]= KEY_OP_OFFSET;
 
2155
    int2store(log_pos+1, key_offset);
 
2156
    log_pos+= 3;
 
2157
    if (move_length)
 
2158
    {
 
2159
      log_pos[0]= KEY_OP_SHIFT;
 
2160
      int2store(log_pos+1, move_length);
 
2161
      log_pos+= 3;
 
2162
    }
 
2163
    log_pos[0]= KEY_OP_CHANGE;
 
2164
    int2store(log_pos+1, key_length);
 
2165
    log_pos+= 3;
 
2166
 
 
2167
    log_array[TRANSLOG_INTERNAL_PARTS + 2].str=    start_log_pos;
 
2168
    log_array[TRANSLOG_INTERNAL_PARTS + 2].length= (uint) (log_pos -
 
2169
                                                           start_log_pos);
 
2170
 
 
2171
    log_array[TRANSLOG_INTERNAL_PARTS + 3].str=    key_pos;
 
2172
    log_array[TRANSLOG_INTERNAL_PARTS + 3].length= key_length;
 
2173
    translog_parts+=2;
 
2174
    extra_length+= (uint) (log_array[TRANSLOG_INTERNAL_PARTS + 2].length +
 
2175
                           key_length);
 
2176
  }
 
2177
 
 
2178
  DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
 
2179
                                    info->trn, info,
 
2180
                                    (translog_size_t)
 
2181
                                    (log_array[TRANSLOG_INTERNAL_PARTS +
 
2182
                                               0].length + extra_length),
 
2183
                                    TRANSLOG_INTERNAL_PARTS + translog_parts,
 
2184
                                    log_array, log_data, NULL));
 
2185
}
 
2186
 
 
2187
 
 
2188
#ifdef NOT_NEEDED
 
2189
 
 
2190
/**
 
2191
   @brief
 
2192
   Write log entry for page that has got data added first and
 
2193
   data deleted last
 
2194
*/
 
2195
 
 
2196
static my_bool _ma_log_middle(MARIA_HA *info, my_off_t page,
 
2197
                              const uchar *buff,
 
2198
                              uint data_added_first, uint data_changed_first,
 
2199
                              uint data_deleted_last)
 
2200
{
 
2201
  LSN lsn;
 
2202
  LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
 
2203
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5], *log_pos;
 
2204
  DBUG_ENTER("_ma_log_middle");
 
2205
  DBUG_PRINT("enter", ("page: %lu", (ulong) page));
 
2206
 
 
2207
  page/= info->s->block_size;
 
2208
 
 
2209
  log_pos= log_data + FILEID_STORE_SIZE;
 
2210
  page_store(log_pos, page);
 
2211
  log_pos+= PAGE_STORE_SIZE;
 
2212
 
 
2213
  log_pos[0]= KEY_OP_DEL_PREFIX;
 
2214
  int2store(log_pos+1, data_deleted_last);
 
2215
  log_pos+= 3;
 
2216
 
 
2217
  log_pos[0]= KEY_OP_ADD_PREFIX;
 
2218
  int2store(log_pos+1, data_added_first);
 
2219
  int2store(log_pos+3, data_changed_first);
 
2220
  log_pos+= 5;
 
2221
 
 
2222
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
2223
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
 
2224
                                                         log_data);
 
2225
 
 
2226
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    ((char*) buff +
 
2227
                                                  info->s->keypage_header);
 
2228
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
 
2229
  DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
 
2230
                                    info->trn, info,
 
2231
                                    (translog_size_t)
 
2232
                                    log_array[TRANSLOG_INTERNAL_PARTS +
 
2233
                                              0].length + data_changed_first,
 
2234
                                    TRANSLOG_INTERNAL_PARTS + 2,
 
2235
                                    log_array, log_data, NULL));
 
2236
}
 
2237
#endif