1
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
/* Write a row to a MARIA table */
18
#include "ma_fulltext.h"
19
#include "ma_rt_index.h"
21
#include "ma_key_recover.h"
22
#include "ma_blockrec.h"
24
/*
  Slack added to keyinfo->maxlength*2 in the packed-length checks of
  _ma_insert(); a t_length at or beyond that bound is reported as
  HA_ERR_CRASHED there.
*/
#define MAX_POINTER_LENGTH 8
26
/* Functions declared in this file */
28
/*
  Prototypes for the static helpers of this file.
  NOTE(review): several prototypes below have lost parameter lines in this
  extract; compare them with the definitions before relying on them.
*/
static int w_search(register MARIA_HA *info, uint32 comp_flag,
29
MARIA_KEY *key, my_off_t page,
30
my_off_t father_page, uchar *father_buff,
31
MARIA_PINNED_PAGE *father_page_link, uchar *father_keypos,
33
static int _ma_balance_page(MARIA_HA *info,MARIA_KEYDEF *keyinfo,
34
MARIA_KEY *key, uchar *curr_buff, my_off_t page,
35
my_off_t father_page, uchar *father_buff,
37
MARIA_KEY_PARAM *s_temp);
38
static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key,
39
uchar *page, uchar **after_key);
40
static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key);
41
static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key);
42
static int _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key,
43
my_off_t *root, uint32 comp_flag);
44
/*
  Helpers with the _ma_log_ prefix.  The _ma_log_split() call visible in
  _ma_split_page() is made only when share->now_transactional is set.
*/
static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff,
45
uint org_length, uint new_length,
47
uint key_length, int move_length,
48
enum en_key_op prefix_or_suffix,
49
const uchar *data, uint data_length,
51
static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page,
53
uint org_length, uint new_length,
54
const uchar *key_pos, uint key_length,
56
static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page,
59
uint data_added_first,
60
uint data_changed_first,
61
uint data_deleted_last,
63
uint key_length, int move_length);
66
@brief Default handler for returning position to new row
69
This is only called for non transactional tables and not for block format
70
which is why we use info->state here.
73
/*
  Pick the position for a new row.

  Returns info->s->state.dellink when it differs from HA_OFFSET_ERROR and
  info->append_insert_at_end is not set; otherwise returns
  info->state->data_file_length.

  NOTE(review): the second parameter (marked unused) and the braces are
  missing from this extract.
*/
MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info,
75
__attribute__((unused)))
77
return ((info->s->state.dellink != HA_OFFSET_ERROR &&
78
!info->append_insert_at_end) ?
79
info->s->state.dellink :
80
info->state->data_file_length);
83
/*
  Default write-abort handler; the info argument is marked unused.
  NOTE(review): the body of this function is not present in this extract.
*/
my_bool _ma_write_abort_default(MARIA_HA *info __attribute__((unused)))
89
/* Write new record to a table */
91
/*
  Insert one row together with its index entries.

  info    Open table handler.
  record  Row image to write.

  Visible flow (NOTE(review): this extract has lost lines, including braces,
  some declarations, goto statements and labels, so the control flow between
  the statements below must be confirmed against the full file):
  - Returns my_errno=EACCES when share->options has HA_OPTION_READ_ONLY_DATA.
  - Calls _ma_readinfo(info,F_WRLCK,1) and returns my_errno if it fails.
  - Checks every unique constraint with _ma_check_unique().
  - filepos is set to HA_OFFSET_ERROR when OPT_NO_ROWS is set in
    info->opt_flag; share->write_record_init also assigns filepos
    (presumably in the else branch).
  - For every active key: _ma_ft_add() for HA_FULLTEXT keys, otherwise
    keyinfo->ck_insert() on the key built by keyinfo->make_key().
  - Calls share->write_record when filepos is not HA_OFFSET_ERROR.
  - The code further down deletes keys with _ma_ft_del() or _ma_ck_delete(),
    calls share->write_record_abort and ends with
    DBUG_RETURN(my_errno=save_errno).  The return statement of the success
    path is not visible here.
*/
int maria_write(MARIA_HA *info, uchar *record)
93
MARIA_SHARE *share= info->s;
96
MARIA_RECORD_POS filepos;
98
my_bool lock_tree= share->lock_key_trees;
100
MARIA_KEYDEF *keyinfo;
101
DBUG_ENTER("maria_write");
102
DBUG_PRINT("enter",("index_file: %d data_file: %d",
103
share->kfile.file, info->dfile.file));
105
DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
106
maria_print_error(info->s, HA_ERR_CRASHED);
107
DBUG_RETURN(my_errno= HA_ERR_CRASHED););
108
if (share->options & HA_OPTION_READ_ONLY_DATA)
110
DBUG_RETURN(my_errno=EACCES);
112
if (_ma_readinfo(info,F_WRLCK,1))
113
DBUG_RETURN(my_errno);
114
dont_break(); /* Dont allow SIGHUP or SIGINT */
116
/* base.reloc, base.records and the current row count all equal 1 */
if (share->base.reloc == (ha_rows) 1 &&
117
share->base.records == (ha_rows) 1 &&
118
share->state.state.records == (ha_rows) 1)
120
my_errno=HA_ERR_RECORD_FILE_FULL;
123
/* Key file length has reached base.margin_key_file_length */
if (share->state.state.key_file_length >= share->base.margin_key_file_length)
125
my_errno=HA_ERR_INDEX_FILE_FULL;
128
if (_ma_mark_file_changed(info))
131
/* Calculate and check all unique constraints */
132
for (i=0 ; i < share->state.header.uniques ; i++)
134
if (_ma_check_unique(info,share->uniqueinfo+i,record,
135
_ma_unique_hash(share->uniqueinfo+i,record),
140
/* Ensure we don't try to restore auto_increment if it doesn't change */
141
info->last_auto_increment= ~(ulonglong) 0;
143
if ((info->opt_flag & OPT_NO_ROWS))
144
filepos= HA_OFFSET_ERROR;
148
This may either calculate a record or, or write the record and return
151
if ((filepos= (*share->write_record_init)(info, record)) ==
156
/* Write all keys to indextree */
157
buff= info->lastkey_buff2;
158
for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
161
if (maria_is_key_active(share->state.key_map, i))
163
/*
  local_lock_tree is true only when lock_tree is set and this key has no
  initialized bulk-insert tree.
  NOTE(review): the test that presumably guards the lock calls below is
  not visible in this extract.
*/
my_bool local_lock_tree= (lock_tree &&
164
!(info->bulk_insert &&
165
is_tree_inited(&info->bulk_insert[i])));
168
rw_wrlock(&keyinfo->root_lock);
171
if (keyinfo->flag & HA_FULLTEXT )
173
if (_ma_ft_add(info,i, buff,record,filepos))
176
rw_unlock(&keyinfo->root_lock);
177
DBUG_PRINT("error",("Got error: %d on write",my_errno));
183
if (keyinfo->ck_insert(info,
184
(*keyinfo->make_key)(info, &int_key, i,
185
buff, record, filepos,
189
rw_unlock(&keyinfo->root_lock);
190
DBUG_PRINT("error",("Got error: %d on write",my_errno));
195
/* The above changed info->lastkey2. Inform maria_rnext_same(). */
196
info->update&= ~HA_STATE_RNEXT_SAME;
199
rw_unlock(&keyinfo->root_lock);
202
if (share->calc_write_checksum)
203
info->cur_row.checksum= (*share->calc_write_checksum)(info,record);
204
if (filepos != HA_OFFSET_ERROR)
206
if ((*share->write_record)(info,record))
208
info->state->checksum+= info->cur_row.checksum;
210
if (!share->now_transactional)
212
/*
  base.auto_key is 1-based: the key definition used is
  share->keyinfo[auto_key-1], and the value is read from the record at the
  start offset of its first key segment.
*/
if (share->base.auto_key != 0)
214
const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg;
215
const uchar *key= record + keyseg->start;
216
set_if_bigger(share->state.auto_increment,
217
ma_retrieve_auto_increment(key, keyseg->type));
220
info->state->records++;
221
info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
222
HA_STATE_ROW_CHANGED);
223
share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
225
info->cur_row.lastpos= filepos;
226
(void)(_ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE));
227
if (info->invalidator != 0)
229
DBUG_PRINT("info", ("invalidator... '%s' (update)",
230
share->open_file_name));
231
(*info->invalidator)(share->open_file_name);
236
Update status of the table. We need to do so after each row write
237
for the log tables, as we want the new row to become visible to
238
other threads as soon as possible. We don't lock mutex here
239
(as it is required by pthread memory visibility rules) as (1) it's
240
not critical to use outdated share->is_log_table value (2) locking
241
mutex here for every write is too expensive.
243
if (share->is_log_table)
244
_ma_update_status((void*) info);
246
allow_break(); /* Allow SIGHUP & SIGINT */
250
/* Keep my_errno; it is assigned back in the final DBUG_RETURN below */
save_errno= my_errno;
252
if (my_errno == HA_ERR_FOUND_DUPP_KEY ||
253
my_errno == HA_ERR_RECORD_FILE_FULL ||
254
my_errno == HA_ERR_NULL_IN_SPATIAL ||
255
my_errno == HA_ERR_OUT_OF_MEM)
257
/* Flush the bulk-insert tree of every key */
if (info->bulk_insert)
260
for (j=0 ; j < share->base.keys ; j++)
261
maria_flush_bulk_insert(info, j);
263
info->errkey= (int) i;
265
We delete keys in the reverse order of insertion. This is the order that
266
a rollback would do and is important for CLR_ENDs generated by
267
_ma_ft|ck_delete() and write_record_abort() to work (with any other
268
order they would cause wrong jumps in the chain).
272
if (maria_is_key_active(share->state.key_map, i))
274
my_bool local_lock_tree= (lock_tree &&
275
!(info->bulk_insert &&
276
is_tree_inited(&info->bulk_insert[i])));
277
keyinfo= share->keyinfo + i;
279
rw_wrlock(&keyinfo->root_lock);
282
The key deletes below should generate CLR_ENDs
284
if (keyinfo->flag & HA_FULLTEXT)
286
if (_ma_ft_del(info,i,buff,record,filepos))
289
rw_unlock(&keyinfo->root_lock);
296
if (_ma_ck_delete(info,
297
(*keyinfo->make_key)(info, &key, i, buff, record,
298
filepos, info->trn->trid)))
301
rw_unlock(&keyinfo->root_lock);
306
rw_unlock(&keyinfo->root_lock);
313
/*
  A failing write_record_abort is followed by printing HA_ERR_CRASHED and
  marking the table as crashed.
*/
if ((*share->write_record_abort)(info))
317
maria_print_error(info->s, HA_ERR_CRASHED);
318
maria_mark_crashed(info);
321
info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
325
DBUG_ASSERT(save_errno);
327
save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */
328
DBUG_PRINT("error", ("got error: %d", save_errno));
329
(void)(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
330
allow_break(); /* Allow SIGHUP & SIGINT */
331
DBUG_RETURN(my_errno=save_errno);
336
Write one key to btree
339
Remove this function and have bulk insert change keyinfo->ck_insert
340
to point to the right function
343
/*
  Dispatch a key write.

  When info->bulk_insert is set and the tree for the key number of this key
  is initialized, the key goes to _ma_ck_write_tree(); otherwise it goes to
  _ma_ck_write_btree().  The result of the chosen function is returned.
*/
my_bool _ma_ck_write(MARIA_HA *info, MARIA_KEY *key)
345
DBUG_ENTER("_ma_ck_write");
347
if (info->bulk_insert &&
348
is_tree_inited(&info->bulk_insert[key->keyinfo->key_nr]))
350
DBUG_RETURN(_ma_ck_write_tree(info, key));
352
DBUG_RETURN(_ma_ck_write_btree(info, key));
356
/**********************************************************************
357
Insert key into btree (normal case)
358
**********************************************************************/
360
/*
  Write key into the b-tree whose root is stored in
  info->s->state.key_root[key_nr].

  The compare flag passed on is keyinfo->write_comp_flag combined with
  key->flag.  When info->ft1_to_ft2 is set afterwards,
  _ma_ft_convert_to_ft2() is called and the dynamic array is released with
  delete_dynamic() and my_free().

  Returns error != 0.

  NOTE(review): braces, the declaration of error and possibly other lines
  inside the ft1_to_ft2 branch are missing from this extract.
*/
static my_bool _ma_ck_write_btree(MARIA_HA *info, MARIA_KEY *key)
363
MARIA_KEYDEF *keyinfo= key->keyinfo;
364
my_off_t *root= &info->s->state.key_root[keyinfo->key_nr];
365
DBUG_ENTER("_ma_ck_write_btree");
367
error= _ma_ck_write_btree_with_log(info, key, root,
368
keyinfo->write_comp_flag | key->flag);
369
if (info->ft1_to_ft2)
372
error= _ma_ft_convert_to_ft2(info, key);
373
delete_dynamic(info->ft1_to_ft2);
374
my_free((uchar*)info->ft1_to_ft2, MYF(0));
377
DBUG_RETURN(error != 0);
378
} /* _ma_ck_write_btree */
382
@brief Write a key to the b-tree
388
/*
  Write key to the b-tree and, for transactional tables, log an undo record.

  info       Open table handler.
  key        Key to insert.
  root       Pointer to the stored root position; the insert itself works
             on the local copy new_root.
  comp_flag  Compare flags passed to _ma_ck_real_write_btree().

  Visible steps:
  - When share->now_transactional is set, the key bytes (data_length +
    ref_length) are copied into key_buff before the insert.
  - _ma_ck_real_write_btree() is called with the address of new_root.
  - On success for a transactional table _ma_write_undo_key_insert() gets
    both root and new_root.
  - _ma_fast_unlock_key_del() and _ma_unpin_all_pages_and_finalize_row()
    are called with the lsn.

  NOTE(review): several lines (org_key setup, the non-transactional update
  of the root, the return statement) are missing from this extract.
*/
static int _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key,
389
my_off_t *root, uint32 comp_flag)
391
MARIA_SHARE *share= info->s;
392
LSN lsn= LSN_IMPOSSIBLE;
394
my_off_t new_root= *root;
395
uchar key_buff[MARIA_MAX_KEY_BUFF];
397
DBUG_ENTER("_ma_ck_write_btree_with_log");
399
LINT_INIT_STRUCT(org_key);
400
if (share->now_transactional)
402
/* Save original value as the key may change */
404
memcpy(key_buff, key->data, key->data_length + key->ref_length);
407
error= _ma_ck_real_write_btree(info, key, &new_root, comp_flag);
408
if (!error && share->now_transactional)
410
/* Log the original value */
413
error= _ma_write_undo_key_insert(info, key, root, new_root, &lsn);
418
_ma_fast_unlock_key_del(info);
420
_ma_unpin_all_pages_and_finalize_row(info, lsn);
423
} /* _ma_ck_write_btree_with_log */
427
@brief Write a key to the b-tree
433
/*
  Insert key into the tree rooted at *root.

  When *root is HA_OFFSET_ERROR, or when w_search() returns a value greater
  than 0, _ma_enlarge_root() is called with key and root.  w_search() is
  started with no father page (all father arguments are 0) and with 1 as
  its last argument.

  NOTE(review): the last parameter line, the declaration of error and the
  return statement are missing from this extract.
*/
int _ma_ck_real_write_btree(MARIA_HA *info, MARIA_KEY *key, my_off_t *root,
437
DBUG_ENTER("_ma_ck_real_write_btree");
439
/* key_length parameter is used only if comp_flag is SEARCH_FIND */
440
if (*root == HA_OFFSET_ERROR ||
441
(error= w_search(info, comp_flag, key, *root, (my_off_t) 0, (uchar*) 0,
442
(MARIA_PINNED_PAGE *) 0, (uchar*) 0, 1)) > 0)
443
error= _ma_enlarge_root(info, key, root);
445
} /* _ma_ck_real_write_btree */
449
@brief Make a new root with key as only pointer
455
/*
  Build a new root page in info->buff that holds only key, and store its
  position in *root.

  Visible steps:
  - nod_flag is share->base.key_reflength when a root already exists
    (*root != HA_OFFSET_ERROR), otherwise 0.
  - The old root position is stored right after the page header with
    _ma_kpointer().
  - The key is packed with keyinfo->pack_key and stored with
    keyinfo->store_key; page_length is header + packed length + nod_flag.
  - The page position comes from _ma_new(); the rest of the block is zeroed,
    _ma_log_new() is called when share->now_transactional is set, and the
    page is written with _ma_write_keypage().

  NOTE(review): the initialization of page_flag, the test in front of the
  KEYPAGE_FLAG_ISNOD line, the error returns and the final return are
  missing from this extract.
*/
int _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root)
457
uint t_length, page_flag, nod_flag, page_length;
458
MARIA_KEY_PARAM s_temp;
459
MARIA_SHARE *share= info->s;
460
MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
461
MARIA_KEYDEF *keyinfo= key->keyinfo;
463
DBUG_ENTER("_ma_enlarge_root");
465
nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
466
/* Store pointer to prev page if nod */
467
_ma_kpointer(info, info->buff + share->keypage_header, *root);
468
t_length= (*keyinfo->pack_key)(key, nod_flag, (uchar*) 0,
469
(uchar*) 0, (uchar*) 0, &s_temp);
470
page_length= share->keypage_header + t_length + nod_flag;
472
/*
  Only the header bytes are cleared here; the pointer stored after the
  header by _ma_kpointer() above is left in place.
*/
bzero(info->buff, share->keypage_header);
473
_ma_store_keynr(share, info->buff, keyinfo->key_nr);
474
_ma_store_page_used(share, info->buff, page_length);
477
page_flag|= KEYPAGE_FLAG_ISNOD;
478
if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
479
page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
480
_ma_store_keypage_flag(share, info->buff, page_flag);
481
(*keyinfo->store_key)(keyinfo, info->buff + share->keypage_header +
484
/* Mark that info->buff was used */
485
info->keyread_buff_used= info->page_changed= 1;
486
if ((*root= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
491
Clear unitialized part of page to avoid valgrind/purify warnings
492
and to get a clean page that is easier to compress and compare with
493
pages generated with redo
495
bzero(info->buff + page_length, share->block_size - page_length);
497
if (share->now_transactional &&
498
_ma_log_new(info, *root, info->buff, page_length, keyinfo->key_nr, 1))
500
if (_ma_write_keypage(info, keyinfo, *root, page_link->write_lock,
501
PAGECACHE_PRIORITY_HIGH, info->buff))
505
} /* _ma_enlarge_root */
509
Search after a position for a key and store it there
514
@retval > 0 Key should be stored in higher tree
517
/*
  Search the key page at position page for key and insert the key there or
  in a child page.

  Visible steps (NOTE(review): this extract has lost lines, including the
  last parameter line, braces, several declarations, the tests on flag and
  the return statements, so confirm the control flow against the full file):
  - temp_buff is allocated with my_alloca() (block_length plus
    MARIA_MAX_KEY_BUFF*2 bytes) and filled by _ma_fetch_keypage() with
    PAGECACHE_LOCK_WRITE.
  - The key is located with keyinfo->bin_search.
  - Duplicate handling: the page key is read into tmp_key to get
    dup_key_pos.  HA_FULLTEXT keys are handled separately; for other keys
    info->dup_key_pos is set and my_errno becomes HA_ERR_FOUND_DUPP_KEY.
  - Otherwise the child position is read with _ma_kpos().  When there is no
    child (HA_OFFSET_ERROR) or the recursive call returns a value greater
    than 0, the key is inserted with _ma_insert() and the page is written
    back with _ma_write_keypage().
  - temp_buff is released with my_afree() on each visible exit path.
*/
static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
518
my_off_t page, my_off_t father_page, uchar *father_buff,
519
MARIA_PINNED_PAGE *father_page_link, uchar *father_keypos,
523
uint page_flag, nod_flag;
524
uchar *temp_buff,*keypos;
525
uchar keybuff[MARIA_MAX_KEY_BUFF];
526
my_bool was_last_key;
527
my_off_t next_page, dup_key_pos;
528
MARIA_PINNED_PAGE *page_link;
529
MARIA_SHARE *share= info->s;
530
MARIA_KEYDEF *keyinfo= key->keyinfo;
531
DBUG_ENTER("w_search");
532
DBUG_PRINT("enter",("page: %ld", (long) page));
534
if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
535
MARIA_MAX_KEY_BUFF*2)))
537
if (!_ma_fetch_keypage(info, keyinfo, page, PAGECACHE_LOCK_WRITE,
538
DFLT_INIT_HITS, temp_buff, 0, &page_link))
541
flag= (*keyinfo->bin_search)(key, temp_buff, comp_flag, &keypos,
542
keybuff, &was_last_key);
543
page_flag= _ma_get_keypage_flag(share, temp_buff);
544
nod_flag= _ma_test_if_nod(share, temp_buff);
548
/* get position to record with duplicated key */
550
tmp_key.keyinfo= keyinfo;
551
tmp_key.data= keybuff;
553
if ((*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &keypos))
554
dup_key_pos= _ma_row_pos_from_key(&tmp_key);
556
dup_key_pos= HA_OFFSET_ERROR;
558
if (keyinfo->flag & HA_FULLTEXT)
563
get_key_full_length_rdonly(off, keybuff);
564
subkeys=ft_sintXkorr(keybuff+off);
565
comp_flag=SEARCH_SAME;
568
/* normal word, one-level tree structure */
569
flag=(*keyinfo->bin_search)(key, temp_buff, comp_flag,
570
&keypos, keybuff, &was_last_key);
574
/* popular word. two-level tree. going down */
575
/* dup_key_pos is used as the root of the second-level tree */
my_off_t root=dup_key_pos;
576
keyinfo= &share->ft2_keyinfo;
577
get_key_full_length_rdonly(off, key);
579
/* we'll modify key entry 'in vivo' */
580
keypos-= keyinfo->keylength + nod_flag;
581
error= _ma_ck_real_write_btree(info, key, &root, comp_flag);
582
/* The possibly changed root and the decremented counter go back into the page */
_ma_dpointer(share, keypos+HA_FT_WLEN, root);
583
subkeys--; /* should there be underflow protection ? */
584
DBUG_ASSERT(subkeys < 0);
585
ft_intXstore(keypos, subkeys);
588
page_link->changed= 1;
589
error= _ma_write_keypage(info, keyinfo, page,
590
PAGECACHE_LOCK_LEFT_WRITELOCKED,
591
DFLT_INIT_HITS, temp_buff);
593
my_afree((uchar*) temp_buff);
597
else /* not HA_FULLTEXT, normal HA_NOSAME key */
599
DBUG_PRINT("warning", ("Duplicate key"));
600
info->dup_key_pos= dup_key_pos;
601
my_afree((uchar*) temp_buff);
602
my_errno=HA_ERR_FOUND_DUPP_KEY;
606
if (flag == MARIA_FOUND_WRONG_KEY)
610
next_page= _ma_kpos(nod_flag,keypos);
611
/* This page and its buffer are passed down as the father of the child */
if (next_page == HA_OFFSET_ERROR ||
612
(error= w_search(info, comp_flag, key, next_page,
613
page, temp_buff, page_link, keypos, insert_last)) > 0)
615
error= _ma_insert(info, key, temp_buff, keypos, page, keybuff,
616
father_page, father_buff, father_page_link,
617
father_keypos, insert_last);
618
page_link->changed= 1;
619
if (_ma_write_keypage(info, keyinfo, page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
620
DFLT_INIT_HITS,temp_buff))
623
my_afree((uchar*) temp_buff);
626
my_afree((uchar*) temp_buff);
627
DBUG_PRINT("exit",("Error: %d",my_errno));
637
info Open table information.
638
keyinfo Key definition information.
640
anc_buff Key page (beginning).
641
key_pos Position in key page where to insert.
642
anc_page Page number for anc_buff
643
key_buff Copy of previous key if keys were packed.
644
father_page position of parent key page in file.
645
father_buff parent key page for balancing.
646
father_page_link Link to father page for marking page changed
647
father_key_pos position in parent key page for balancing.
648
insert_last If to append at end of page.
651
Insert new key at right of key_pos.
652
Note that caller must save anc_buff
654
This function writes log records for all changed pages
655
(Including anc_buff and father page)
660
1 If key contains key to upper level (from balance page)
661
2 If key contains key to upper level (from split space)
664
/*
  Insert key into the key page held in anc_buff at key_pos.

  Visible steps (NOTE(review): this extract has lost lines, including
  braces, the declaration of t_length, the test on its sign and the update
  of a_length, so confirm the control flow against the full file):
  - The used length and nod flag of the page are read and the original
    length is kept in org_anc_length.
  - keyinfo->pack_key computes the packed length t_length.  prev_key is 0
    when key_pos is the first key position on the page, and the next-key
    argument is 0 when key_pos equals the end of the used area.
  - The rest of the page is shifted with bmove_upp() or bmove() and the key
    is stored with keyinfo->store_key.
  - When the used length fits in block_length - KEYPAGE_CHECKSUM_SIZE, the
    change is logged with _ma_log_add() if share->now_transactional is set
    and 0 is returned.
  - Otherwise the result of _ma_balance_page() or _ma_split_page() is
    returned.
*/
int _ma_insert(register MARIA_HA *info, MARIA_KEY *key, uchar *anc_buff,
665
uchar *key_pos, my_off_t anc_page, uchar *key_buff,
666
my_off_t father_page, uchar *father_buff,
667
MARIA_PINNED_PAGE *father_page_link,
668
uchar *father_key_pos, my_bool insert_last)
670
uint a_length, nod_flag, org_anc_length;
672
uchar *endpos, *prev_key;
673
MARIA_KEY_PARAM s_temp;
674
MARIA_SHARE *share= info->s;
675
MARIA_KEYDEF *keyinfo= key->keyinfo;
676
DBUG_ENTER("_ma_insert");
677
DBUG_PRINT("enter",("key_pos: 0x%lx", (ulong) key_pos));
678
DBUG_EXECUTE("key", _ma_print_key(DBUG_FILE, key););
680
_ma_get_used_and_nod(share, anc_buff, a_length, nod_flag);
681
org_anc_length= a_length;
682
endpos= anc_buff+ a_length;
683
/* No previous key when inserting at the first key position of the page */
prev_key= (key_pos == anc_buff + share->keypage_header + nod_flag ?
684
(uchar*) 0 : key_buff);
685
t_length= (*keyinfo->pack_key)(key, nod_flag,
686
(key_pos == endpos ? (uchar*) 0 : key_pos),
687
prev_key, prev_key, &s_temp);
689
if (prev_key && (keyinfo->flag & (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
691
DBUG_DUMP("prev_key",(uchar*) prev_key, _ma_keylength(keyinfo,prev_key));
693
if (keyinfo->flag & HA_PACK_KEY)
695
DBUG_PRINT("test",("t_length: %d ref_len: %d",
696
t_length,s_temp.ref_length));
697
DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: 0x%lx",
698
s_temp.n_ref_length, s_temp.n_length, (long) s_temp.key));
703
/* A length change at or beyond this bound is reported as HA_ERR_CRASHED */
if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
705
maria_print_error(share, HA_ERR_CRASHED);
706
my_errno=HA_ERR_CRASHED;
709
bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,
710
(uint) (endpos-key_pos));
714
/* Same bound for a negative t_length */
if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
716
maria_print_error(share, HA_ERR_CRASHED);
717
my_errno=HA_ERR_CRASHED;
720
bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
722
(*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
725
if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
726
_ma_mark_page_with_transid(share, anc_buff);
727
_ma_store_page_used(share, anc_buff, a_length);
730
Check if the new key fits totally into the the page
731
(anc_buff is big enough to contain a full page + one key)
733
if (a_length <= (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
735
if (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE - a_length < 32 &&
736
(keyinfo->flag & HA_FULLTEXT) && key_pos == endpos &&
737
share->base.key_reflength <= share->base.rec_reflength &&
738
share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
741
Normal word. One-level tree. Page is almost full.
742
Let's consider converting.
743
We'll compare 'key' and the first key at anc_buff
745
const uchar *a= key->data;
746
const uchar *b= anc_buff + share->keypage_header + nod_flag;
747
uint alen, blen, ft2len= share->ft2_keyinfo.keylength;
748
/* the very first key on the page is always unpacked */
749
DBUG_ASSERT((*b & 128) == 0);
750
#if HA_FT_MAXLEN >= 127
751
blen= mi_uint2korr(b); b+=2;
752
When you enable this code, as part of the MyISAM->Maria merge of
753
ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0
754
restore ft2 functionality, fix bugs.
755
Then this will enable two-level fulltext index, which is not totally
757
So remove this text and inform Guilhem so that he fixes the issue.
761
get_key_length(alen,a);
762
DBUG_ASSERT(info->ft1_to_ft2==0);
764
ha_compare_text(keyinfo->seg->charset, a, alen,
767
/* Yup. converting */
768
/*
  NOTE(review): the my_malloc() result is handed to my_init_dynamic_array()
  without a visible NULL check; confirm against the full file.
*/
info->ft1_to_ft2=(DYNAMIC_ARRAY *)
769
my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
770
my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50);
773
Now, adding all keys from the page to dynarray
774
if the page is a leaf (if not keys will be deleted later)
779
Let's leave the first key on the page, though, because
780
we cannot easily dispatch an empty page here
783
for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
784
insert_dynamic(info->ft1_to_ft2, b);
786
/* fixing the page's length - it contains only one key now */
787
_ma_store_page_used(share, anc_buff, share->keypage_header + blen +
790
/* the rest will be done when we're back from recursion */
795
if (share->now_transactional &&
796
_ma_log_add(info, anc_page, anc_buff, (uint) (endpos - anc_buff),
797
key_pos, s_temp.changed_length, t_length, 0))
800
DBUG_RETURN(0); /* There is room on page */
807
Remove 'born_transactional' here.
808
The only reason for having it here is that the current
809
_ma_balance_page_ can't handle variable length keys.
811
if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
812
father_buff && !insert_last && !info->quick_mode &&
813
!info->s->base.born_transactional)
815
s_temp.key_pos= key_pos;
816
father_page_link->changed= 1;
817
DBUG_RETURN(_ma_balance_page(info, keyinfo, key, anc_buff, anc_page,
818
father_page, father_buff, father_key_pos,
821
DBUG_RETURN(_ma_split_page(info, key, anc_page,
822
anc_buff, org_anc_length,
823
key_pos, s_temp.changed_length, t_length,
824
key_buff, insert_last));
829
@brief split a full page in two and assign emerging item to key
834
key Buffer for middle key
835
split_page Address on disk for split_buff
836
split_buff Page buffer for page that should be split
837
org_split_length Original length of split_buff before key was inserted
838
inserted_key_pos Address in buffer where key was inserted
839
changed_length Number of bytes changed at 'inserted_key_pos'
840
move_length Number of bytes buffer was moved when key was inserted
841
key_buff Key buffer to use for temporary storage of key
842
insert_last_key If we are inserting the key on the rightmost key page
845
split_buff is not stored on disk (caller has to do this)
848
@retval 2 ok (Middle key up from _ma_insert())
852
/*
  Split the page in split_buff in two and hand the middle key back in key.

  Visible steps (NOTE(review): this extract has lost lines, including two
  parameter lines, braces, the declarations of tmp_key, new_pos and res,
  the test that selects between the two position finders, the error
  returns and the final return, so confirm against the full file):
  - The split position key_pos comes from _ma_find_last_pos() or
    _ma_find_half_pos(); split_buff is shortened to that position with
    _ma_store_page_used().
  - A page position is obtained from _ma_new(); the middle key is copied
    into key and the new position is stored after it with _ma_kpointer().
  - The key following the middle key is re-packed as first key of the new
    page in info->buff and the remaining bytes of split_buff are copied
    after it.
  - The new page gets the page flag and key number of split_buff, is logged
    with _ma_log_new() when share->now_transactional is set, and is written
    with _ma_write_keypage().
  - The change to split_buff is logged with _ma_log_split() when
    share->now_transactional is set.
*/
int _ma_split_page(MARIA_HA *info, MARIA_KEY *key, my_off_t split_page,
854
uint org_split_length,
855
uchar *inserted_key_pos, uint changed_length,
857
uchar *key_buff, my_bool insert_last_key)
859
uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
860
uint page_length, split_length, page_flag;
861
uchar *key_pos,*pos, *after_key, *new_buff;
863
MARIA_KEY_PARAM s_temp;
864
MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
865
MARIA_SHARE *share= info->s;
866
MARIA_KEYDEF *keyinfo= key->keyinfo;
869
DBUG_ENTER("_ma_split_page");
871
LINT_INIT(after_key);
872
DBUG_DUMP("buff", split_buff, _ma_get_page_used(share, split_buff));
874
info->page_changed=1; /* Info->buff is used */
875
info->keyread_buff_used=1;
876
new_buff= info->buff;
877
page_flag= _ma_get_keypage_flag(share, split_buff);
878
nod_flag= _ma_test_if_nod(share, split_buff);
879
key_ref_length= share->keypage_header + nod_flag;
881
tmp_key.data= key_buff;
882
tmp_key.keyinfo= keyinfo;
884
key_pos= _ma_find_last_pos(info, &tmp_key, split_buff, &after_key);
886
key_pos= _ma_find_half_pos(info, &tmp_key, nod_flag, split_buff,
891
key_length= tmp_key.data_length + tmp_key.ref_length;
892
split_length= (uint) (key_pos - split_buff);
893
/* a_length keeps the old used length; the page header now gets split_length */
a_length= _ma_get_page_used(share, split_buff);
894
_ma_store_page_used(share, split_buff, split_length);
899
DBUG_PRINT("test",("Splitting nod"));
900
pos=key_pos-nod_flag;
901
memcpy((uchar*) new_buff + share->keypage_header, (uchar*) pos,
905
/* Move middle item to key and pointer to new page */
906
if ((new_pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
910
_ma_copy_key(key, &tmp_key);
911
_ma_kpointer(info, key->data + key_length, new_pos);
914
if (!(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &key_pos))
917
t_length=(*keyinfo->pack_key)(&tmp_key, nod_flag, (uchar *) 0,
918
(uchar*) 0, (uchar*) 0, &s_temp);
919
length=(uint) ((split_buff + a_length) - key_pos);
920
memcpy((uchar*) new_buff+key_ref_length+t_length,(uchar*) key_pos,
922
(*keyinfo->store_key)(keyinfo,new_buff+key_ref_length,&s_temp);
923
page_length= length + t_length + key_ref_length;
925
bzero(new_buff, share->keypage_header);
926
/* Copy KEYFLAG_FLAG_ISNODE and KEYPAGE_FLAG_HAS_TRANSID from parent page */
927
_ma_store_keypage_flag(share, new_buff, page_flag);
928
_ma_store_page_used(share, new_buff, page_length);
929
/* Copy key number */
930
new_buff[share->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_KEYID_SIZE -
932
split_buff[share->keypage_header - KEYPAGE_USED_SIZE -
933
KEYPAGE_KEYID_SIZE - KEYPAGE_FLAG_SIZE];
935
res= 2; /* Middle key up */
936
if (share->now_transactional &&
937
_ma_log_new(info, new_pos, new_buff, page_length, keyinfo->key_nr, 0))
939
bzero(new_buff + page_length, share->block_size - page_length);
941
if (_ma_write_keypage(info, keyinfo, new_pos, page_link->write_lock,
942
DFLT_INIT_HITS, new_buff))
945
/* Save changes to split pages */
946
if (share->now_transactional &&
947
_ma_log_split(info, split_page, split_buff, org_split_length,
949
inserted_key_pos, changed_length, move_length,
950
KEY_OP_NONE, (uchar*) 0, 0, 0))
953
DBUG_DUMP_KEY("middle_key", key);
955
} /* _ma_split_page */
959
Calculate how much to move to split a page in two
961
Returns pointer to start of key.
962
key will contain the key.
963
return_key_length will contain the length of key
964
after_key will contain the position to where the next key starts
967
/*
  Find the key close to the middle of a key page.

  info       Open table handler.
  key        Receives the key found; key->keyinfo selects the key definition.
  nod_flag   Length of the node pointer in front of each key on this page.
  page       Start of the key page buffer.
  after_key  Set to the position following the key found (visible in the
             arithmetic branch below).

  Returns lastpos.  NOTE(review): the declarations and assignments of
  lastpos and end, the braces, the start of the do loop and the returns of
  the arithmetic branch and of the error branch are missing from this
  extract.
*/
uchar *_ma_find_half_pos(MARIA_HA *info, MARIA_KEY *key, uint nod_flag,
968
uchar *page, uchar **after_key)
970
uint keys, length, key_ref_length, page_flag;
972
MARIA_SHARE *share= info->s;
973
MARIA_KEYDEF *keyinfo= key->keyinfo;
974
DBUG_ENTER("_ma_find_half_pos");
976
key_ref_length= share->keypage_header + nod_flag;
977
page_flag= _ma_get_keypage_flag(share, page);
978
length= _ma_get_page_used(share, page) - key_ref_length;
979
page+= key_ref_length; /* Point to first key */
980
/*
  No packing or variable-length flag on the key and no
  KEYPAGE_FLAG_HAS_TRANSID on the page: the position is computed by
  arithmetic from keyinfo->keylength + nod_flag.
*/
if (!(keyinfo->flag &
981
(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
982
HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
984
key_ref_length= keyinfo->keylength+nod_flag;
985
key->data_length= keyinfo->keylength - info->s->rec_reflength;
986
key->ref_length= info->s->rec_reflength;
988
keys=length/(key_ref_length*2);
989
end=page+keys*key_ref_length;
990
*after_key=end+key_ref_length;
991
memcpy(key->data, end, key_ref_length);
995
end=page+length/2-key_ref_length; /* This is aprox. half */
996
key->data[0]= 0; /* Safety */
1000
/* Step through the keys with get_key while page is below end */
if (!(length= (*keyinfo->get_key)(key, page_flag, nod_flag, &page)))
1002
} while (page < end);
1004
DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx half: 0x%lx",
1005
(long) lastpos, (long) page, (long) end));
1006
DBUG_RETURN(lastpos);
1007
} /* _ma_find_half_pos */
1011
Find second to last key on leaf page
1014
Used to split buffer at last key. In this case the next to last
1015
key will be moved to parent page and last key will be on its own page.
1018
Add one argument for 'last key value' to get_key so that one can
1019
do the loop without having to copy the found key the whole time
1022
@retval Pointer to the start of the key before the last key
1023
@retval int_key will contain the last key
1026
/*
  Find the start of the key in front of the last key on a key page.

  info       Open table handler.
  int_key    Key structure filled while scanning; int_key->keyinfo selects
             the key definition.
  page       Start of the key page buffer.
  after_key  Set to the position following the key found (visible in the
             arithmetic branch below).

  Returns prevpos.  Only the page header is skipped and get_key is called
  with a nod flag of 0.

  NOTE(review): braces, the loop statement, the assignment of key, the
  initial values of lastpos and prevpos and the returns of the arithmetic
  branch and of the error branch are missing from this extract.
*/
static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key,
1027
uchar *page, uchar **after_key)
1029
uint keys, length, key_ref_length, page_flag;
1030
uint last_data_length, last_ref_length;
1031
uchar *key, *end, *lastpos,*prevpos;
1032
uchar key_buff[MARIA_MAX_KEY_BUFF];
1033
MARIA_SHARE *share= info->s;
1034
MARIA_KEYDEF *keyinfo= int_key->keyinfo;
1035
DBUG_ENTER("_ma_find_last_pos");
1037
key_ref_length= share->keypage_header;
1038
page_flag= _ma_get_keypage_flag(share, page);
1039
length= _ma_get_page_used(share, page) - key_ref_length;
1040
page+=key_ref_length;
1041
/*
  No packing or variable-length flag on the key and no
  KEYPAGE_FLAG_HAS_TRANSID on the page: the position is computed by
  arithmetic from keyinfo->keylength.
*/
if (!(keyinfo->flag &
1042
(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1043
HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1045
keys= length / keyinfo->keylength - 2;
1046
length= keyinfo->keylength;
1047
int_key->data_length= length - info->s->rec_reflength;
1048
int_key->ref_length= info->s->rec_reflength;
1050
end=page+keys*length;
1051
*after_key=end+length;
1052
memcpy(int_key->data, end, length);
1057
LINT_INIT(last_data_length);
1058
LINT_INIT(last_ref_length);
1059
end=page+length-key_ref_length;
1064
/* Keys are unpacked into the local key_buff while scanning */
int_key->data= key_buff;
1065
key_buff[0]= 0; /* Safety */
1069
/* Remember the two latest key positions and the lengths of the latest key */
prevpos=lastpos; lastpos=page;
1070
last_data_length= int_key->data_length;
1071
last_ref_length= int_key->ref_length;
1072
memcpy(key, key_buff, length); /* previous key */
1073
if (!(length=(*keyinfo->get_key)(int_key, page_flag, 0, &page)))
1075
maria_print_error(keyinfo->share, HA_ERR_CRASHED);
1076
my_errno=HA_ERR_CRASHED;
1081
/* Hand back the lengths saved before the final get_key call */
int_key->data_length= last_data_length;
1082
int_key->ref_length= last_ref_length;
1085
DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx end: 0x%lx",
1086
(long) prevpos,(long) page,(long) end));
1087
DBUG_RETURN(prevpos);
1088
} /* _ma_find_last_pos */
1092
@brief Balance page with static size keys with page on right/left
1094
@param key Middle key will be stored here
1097
Father_buff will always be changed
1098
Caller must handle saving of curr_buff
1101
@retval 0 Balance was done (father buff is saved)
1102
@retval 1 Middle key up (father buff is not saved)
1106
static int _ma_balance_page(register MARIA_HA *info, MARIA_KEYDEF *keyinfo,
1107
MARIA_KEY *key, uchar *curr_buff,
1109
my_off_t father_page, uchar *father_buff,
1110
uchar *father_key_pos, MARIA_KEY_PARAM *s_temp)
1112
MARIA_PINNED_PAGE *next_page_link;
1113
MARIA_PINNED_PAGE tmp_page_link, *new_page_link= &tmp_page_link;
1114
MARIA_SHARE *share= info->s;
1116
uint k_length,father_length,father_keylength,nod_flag,curr_keylength;
1117
uint right_length,left_length,new_right_length,new_left_length,extra_length;
1118
uint keys, tmp_length, extra_buff_length;
1119
uchar *pos,*buff,*extra_buff, *parting_key;
1120
my_off_t next_page,new_pos;
1121
uchar tmp_part_key[MARIA_MAX_KEY_BUFF];
1122
DBUG_ENTER("_ma_balance_page");
1124
k_length=keyinfo->keylength;
1125
father_length= _ma_get_page_used(share, father_buff);
1126
father_keylength= k_length + share->base.key_reflength;
1127
nod_flag= _ma_test_if_nod(share, curr_buff);
1128
curr_keylength=k_length+nod_flag;
1129
info->page_changed=1;
1131
if ((father_key_pos != father_buff+father_length &&
1132
(info->state->records & 1)) ||
1133
father_key_pos == father_buff+ share->keypage_header +
1134
share->base.key_reflength)
1137
next_page= _ma_kpos(share->base.key_reflength,
1138
father_key_pos+father_keylength);
1140
DBUG_PRINT("info", ("use right page: %lu", (ulong) next_page));
1145
father_key_pos-=father_keylength;
1146
next_page= _ma_kpos(share->base.key_reflength,father_key_pos);
1147
/* Move curr_buff so that it's on the left */
1149
curr_buff= info->buff;
1150
DBUG_PRINT("info", ("use left page: %lu", (ulong) next_page));
1151
} /* father_key_pos ptr to parting key */
1153
if (!_ma_fetch_keypage(info,keyinfo, next_page, PAGECACHE_LOCK_WRITE,
1154
DFLT_INIT_HITS, info->buff, 0, &next_page_link))
1156
next_page_link->changed= 1;
1157
DBUG_DUMP("next", info->buff, _ma_get_page_used(share, info->buff));
1159
/* Test if there is room to share keys */
1160
left_length= _ma_get_page_used(share, curr_buff);
1161
right_length= _ma_get_page_used(share, buff);
1162
keys= ((left_length+right_length-share->keypage_header*2-nod_flag*2)/
1165
if ((right ? right_length : left_length) + curr_keylength <=
1166
(uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
1168
/* Enough space to hold all keys in the two buffers ; Balance bufferts */
1169
new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength;
1170
new_right_length=share->keypage_header+nod_flag+(((keys+1)/2)*
1172
_ma_store_page_used(share, curr_buff, new_left_length);
1173
_ma_store_page_used(share, buff, new_right_length);
1175
DBUG_PRINT("info", ("left_length: %u -> %u right_length: %u -> %u",
1176
left_length, new_left_length,
1177
right_length, new_right_length));
1178
if (left_length < new_left_length)
1181
DBUG_PRINT("info", ("move keys to end of buff"));
1183
/* Move keys buff -> curr_buff */
1184
pos=curr_buff+left_length;
1185
memcpy(pos,father_key_pos, (size_t) k_length);
1186
memcpy(pos+k_length, buff + share->keypage_header,
1187
(size_t) (length=new_left_length - left_length - k_length));
1188
pos= buff + share->keypage_header + length;
1189
memcpy(father_key_pos, pos, (size_t) k_length);
1190
bmove(buff + share->keypage_header, pos + k_length, new_right_length);
1192
if (share->now_transactional)
1197
Log changes to page on left
1198
The original page is on the left and stored in curr_buff
1199
We have on the page the newly inserted key and data
1200
from buff added last on the page
1202
if (_ma_log_split(info, curr_page, curr_buff,
1203
left_length - s_temp->move_length,
1205
s_temp->key_pos, s_temp->changed_length,
1206
s_temp->move_length,
1208
curr_buff + left_length,
1209
new_left_length - left_length,
1210
new_left_length - left_length+ k_length))
1213
Log changes to page on right
1214
This contains the original data with some keys deleted from
1217
if (_ma_log_prefix(info, next_page, buff, 0,
1218
((int) new_right_length - (int) right_length)))
1224
Log changes to page on right (the original page) which is in buff
1225
Data is removed from start of page
1226
The inserted key may be in buff or moved to curr_buff
1228
if (_ma_log_del_prefix(info, curr_page, buff,
1229
right_length - s_temp->changed_length,
1231
s_temp->key_pos, s_temp->changed_length,
1232
s_temp->move_length))
1235
Log changes to page on left, which has new data added last
1237
if (_ma_log_suffix(info, next_page, curr_buff,
1238
left_length, new_left_length))
1246
DBUG_PRINT("info", ("move keys to start of buff"));
1248
bmove_upp(buff + new_right_length, buff + right_length,
1249
right_length - share->keypage_header);
1250
length= new_right_length -right_length - k_length;
1251
memcpy(buff + share->keypage_header + length, father_key_pos,
1253
pos=curr_buff+new_left_length;
1254
memcpy(father_key_pos, pos, (size_t) k_length);
1255
memcpy(buff + share->keypage_header, pos+k_length, (size_t) length);
1257
if (share->now_transactional)
1262
Log changes to page on left
1263
The original page is on the left and stored in curr_buff
1264
The page is shortened from end and the key may be on the page
1266
if (_ma_log_split(info, curr_page, curr_buff,
1267
left_length - s_temp->move_length,
1269
s_temp->key_pos, s_temp->changed_length,
1270
s_temp->move_length,
1271
KEY_OP_NONE, (uchar*) 0, 0, 0))
1274
Log changes to page on right
1275
This contains the original data, with some data from cur_buff
1278
if (_ma_log_prefix(info, next_page, buff,
1279
(uint) (new_right_length - right_length),
1280
(int) (new_right_length - right_length)))
1286
Log changes to page on right (the original page) which is in buff
1287
We have on the page the newly inserted key and data
1288
from buff added first on the page
1290
uint diff_length= new_right_length - right_length;
1291
if (_ma_log_split(info, curr_page, buff,
1292
left_length - s_temp->move_length,
1294
s_temp->key_pos + diff_length,
1295
s_temp->changed_length,
1296
s_temp->move_length,
1298
buff + share->keypage_header,
1299
diff_length, diff_length + k_length))
1302
Log changes to page on left, which is shortened from end
1304
if (_ma_log_suffix(info, next_page, curr_buff,
1305
left_length, new_left_length))
1311
/* Log changes to father (one level up) page */
1313
if (share->now_transactional &&
1314
_ma_log_change(info, father_page, father_buff, father_key_pos,
1319
next_page_link->changed is marked as true above and fathers
1320
page_link->changed is marked as true in caller
1322
if (_ma_write_keypage(info, keyinfo, next_page,
1323
PAGECACHE_LOCK_LEFT_WRITELOCKED,
1324
DFLT_INIT_HITS, info->buff) ||
1325
_ma_write_keypage(info, keyinfo, father_page,
1326
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
1332
/* curr_buff[] and buff[] are full, lets split and make new nod */
1334
extra_buff= info->buff+share->base.max_key_block_length;
1335
new_left_length= new_right_length= (share->keypage_header + nod_flag +
1336
(keys+1) / 3 * curr_keylength);
1338
5 is the minum number of keys we can have here. This comes from
1339
the fact that each full page can store at least 2 keys and in this case
1340
we have a 'split' key, ie 2+2+1 = 5
1342
if (keys == 5) /* Too few keys to balance */
1343
new_left_length-=curr_keylength;
1344
extra_length= (nod_flag + left_length + right_length -
1345
new_left_length - new_right_length - curr_keylength);
1346
extra_buff_length= extra_length + share->keypage_header;
1347
DBUG_PRINT("info",("left_length: %d right_length: %d new_left_length: %d new_right_length: %d extra_length: %d",
1348
left_length, right_length,
1349
new_left_length, new_right_length,
1351
_ma_store_page_used(share, curr_buff, new_left_length);
1352
_ma_store_page_used(share, buff, new_right_length);
1354
bzero(extra_buff, share->keypage_header);
1356
_ma_store_keypage_flag(share, extra_buff, KEYPAGE_FLAG_ISNOD);
1357
/* Copy key number */
1358
extra_buff[share->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_KEYID_SIZE -
1360
buff[share->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_KEYID_SIZE -
1362
_ma_store_page_used(share, extra_buff, extra_buff_length);
1364
/* move first largest keys to new page */
1365
pos=buff+right_length-extra_length;
1366
memcpy(extra_buff + share->keypage_header, pos, extra_length);
1367
/* Zero old data from buffer */
1368
bzero(extra_buff + extra_buff_length,
1369
share->block_size - extra_buff_length);
1371
/* Save new parting key between buff and extra_buff */
1372
memcpy(tmp_part_key, pos-k_length,k_length);
1373
/* Make place for new keys */
1374
bmove_upp(buff+ new_right_length, pos - k_length,
1375
right_length - extra_length - k_length - share->keypage_header);
1376
/* Copy keys from left page */
1377
pos= curr_buff+new_left_length;
1378
memcpy(buff + share->keypage_header, pos + k_length,
1379
(size_t) (tmp_length= left_length - new_left_length - k_length));
1380
/* Copy old parting key */
1381
parting_key= buff + share->keypage_header + tmp_length;
1382
memcpy(parting_key, father_key_pos, (size_t) k_length);
1384
/* Move new parting keys up to caller */
1385
memcpy((right ? key->data : father_key_pos),pos,(size_t) k_length);
1386
memcpy((right ? father_key_pos : key->data),tmp_part_key, k_length);
1388
if ((new_pos= _ma_new(info, DFLT_INIT_HITS, &new_page_link))
1391
_ma_kpointer(info,key->data+k_length,new_pos);
1392
/* This is safe as long we are using not keys with transid */
1393
key->data_length= k_length - info->s->rec_reflength;
1394
key->ref_length= info->s->rec_reflength;
1396
if (share->now_transactional)
1401
Page order according to key values:
1402
orignal_page (curr_buff), next_page (buff), extra_buff
1404
cur_buff is shortened,
1405
buff is getting new keys at start and shortened from end.
1406
extra_buff is new page
1408
Note that extra_buff (largest key parts) will be stored at the
1409
place of the original 'right' page (next_page) and right page (buff)
1410
will be stored at new_pos.
1412
This makes the log entries smaller as right_page contains all
1413
data to generate the data extra_buff
1417
Log changes to page on left (page shortened page at end)
1419
if (_ma_log_split(info, curr_page, curr_buff,
1420
left_length - s_temp->move_length, new_left_length,
1421
s_temp->key_pos, s_temp->changed_length,
1422
s_temp->move_length,
1423
KEY_OP_NONE, (uchar*) 0, 0, 0))
1426
Log changes to right page (stored at next page)
1427
This contains the last 'extra_buff' from 'buff'
1429
if (_ma_log_prefix(info, next_page, extra_buff,
1430
0, (int) (extra_buff_length - right_length)))
1434
Log changes to middle page, which is stored at the new page
1437
if (_ma_log_new(info, new_pos, buff, new_right_length,
1438
keyinfo->key_nr, 0))
1444
Log changes to page on right (the original page) which is in buff
1445
This contains the original data, with some data from curr_buff
1446
added first and shortened at end
1448
int data_added_first= left_length - new_left_length;
1449
if (_ma_log_key_middle(info, curr_page, buff,
1455
s_temp->changed_length,
1456
s_temp->move_length))
1459
/* Log changes to page on left, which is shortened from end */
1460
if (_ma_log_suffix(info, next_page, curr_buff,
1461
left_length, new_left_length))
1464
/* Log change to rightmost (new) page */
1465
if (_ma_log_new(info, new_pos, extra_buff,
1466
extra_buff_length, keyinfo->key_nr, 0))
1470
/* Log changes to father (one level up) page */
1471
if (share->now_transactional &&
1472
_ma_log_change(info, father_page, father_buff, father_key_pos,
1477
if (_ma_write_keypage(info, keyinfo, (right ? new_pos : next_page),
1478
(right ? new_page_link->write_lock :
1479
PAGECACHE_LOCK_LEFT_WRITELOCKED),
1480
DFLT_INIT_HITS, info->buff) ||
1481
_ma_write_keypage(info, keyinfo, (right ? next_page : new_pos),
1482
(!right ? new_page_link->write_lock :
1483
PAGECACHE_LOCK_LEFT_WRITELOCKED),
1484
DFLT_INIT_HITS, extra_buff))
1487
DBUG_RETURN(1); /* Middle key up */
1491
} /* _ma_balance_page */
/**********************************************************************
 *                Bulk insert code                                    *
 **********************************************************************/
1501
} bulk_insert_param;
1504
static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key)
1507
uint keynr= key->keyinfo->key_nr;
1508
DBUG_ENTER("_ma_ck_write_tree");
1510
/* Store ref_length as this is always constant */
1511
info->bulk_insert_ref_length= key->ref_length;
1512
error= tree_insert(&info->bulk_insert[keynr], key->data,
1513
key->data_length + key->ref_length,
1514
info->bulk_insert[keynr].custom_arg) == 0;
1516
} /* _ma_ck_write_tree */
1519
/* typeof(_ma_keys_compare)=qsort_cmp2 */
1521
static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
1524
return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
1525
key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
1530
static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
1533
Probably I can use info->lastkey here, but I'm not sure,
1534
and to be safe I'd better use local lastkey.
1536
MARIA_SHARE *share= param->info->s;
1537
uchar lastkey[MARIA_MAX_KEY_BUFF];
1539
MARIA_KEYDEF *keyinfo= share->keyinfo + param->keynr;
1544
if (share->lock_key_trees)
1546
rw_wrlock(&keyinfo->root_lock);
1551
/* Note: keylen doesn't contain transid lengths */
1552
keylen= _ma_keylength(keyinfo, key);
1553
tmp_key.data= lastkey;
1554
tmp_key.keyinfo= keyinfo;
1555
tmp_key.data_length= keylen - share->rec_reflength;
1556
tmp_key.ref_length= param->info->bulk_insert_ref_length;
1557
tmp_key.flag= (param->info->bulk_insert_ref_length ==
1558
share->rec_reflength ? 0 : SEARCH_USER_KEY_HAS_TRANSID);
1560
We have to copy key as ma_ck_write_btree may need the buffer for
1561
copying middle key up if tree is growing
1563
memcpy(lastkey, key, tmp_key.data_length + tmp_key.ref_length);
1564
return _ma_ck_write_btree(param->info, &tmp_key);
1566
if (share->lock_key_trees)
1567
rw_unlock(&keyinfo->root_lock);
1574
int maria_init_bulk_insert(MARIA_HA *info, ulong cache_size, ha_rows rows)
1576
MARIA_SHARE *share= info->s;
1577
MARIA_KEYDEF *key=share->keyinfo;
1578
bulk_insert_param *params;
1579
uint i, num_keys, total_keylength;
1581
DBUG_ENTER("_ma_init_bulk_insert");
1582
DBUG_PRINT("enter",("cache_size: %lu", cache_size));
1584
DBUG_ASSERT(!info->bulk_insert &&
1585
(!rows || rows >= MARIA_MIN_ROWS_TO_USE_BULK_INSERT));
1587
maria_clear_all_keys_active(key_map);
1588
for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
1590
if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
1591
maria_is_key_active(share->state.key_map, i))
1594
maria_set_key_active(key_map, i);
1595
total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
1600
num_keys * MARIA_MIN_SIZE_BULK_INSERT_TREE > cache_size)
1603
if (rows && rows*total_keylength < cache_size)
1604
cache_size= (ulong)rows;
1606
cache_size/=total_keylength*16;
1608
info->bulk_insert=(TREE *)
1609
my_malloc((sizeof(TREE)*share->base.keys+
1610
sizeof(bulk_insert_param)*num_keys),MYF(0));
1612
if (!info->bulk_insert)
1613
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1615
params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
1616
for (i=0 ; i < share->base.keys ; i++)
1618
if (maria_is_key_active(key_map, i))
1622
/* Only allocate a 16'th of the buffer at a time */
1623
init_tree(&info->bulk_insert[i],
1624
cache_size * key[i].maxlength,
1625
cache_size * key[i].maxlength, 0,
1626
(qsort_cmp2)keys_compare, 0,
1627
(tree_element_free) keys_free, (void *)params++);
1630
info->bulk_insert[i].root=0;
1636
void maria_flush_bulk_insert(MARIA_HA *info, uint inx)
1638
if (info->bulk_insert)
1640
if (is_tree_inited(&info->bulk_insert[inx]))
1641
reset_tree(&info->bulk_insert[inx]);
1645
void maria_end_bulk_insert(MARIA_HA *info, my_bool abort)
1647
DBUG_ENTER("maria_end_bulk_insert");
1648
if (info->bulk_insert)
1651
for (i=0 ; i < info->s->base.keys ; i++)
1653
if (is_tree_inited(&info->bulk_insert[i]))
1656
reset_free_element(&info->bulk_insert[i]);
1657
delete_tree(&info->bulk_insert[i]);
1660
my_free(info->bulk_insert, MYF(0));
1661
info->bulk_insert= 0;
/****************************************************************************
  Dedicated functions that generate log entries
****************************************************************************/
1672
int _ma_write_undo_key_insert(MARIA_HA *info, const MARIA_KEY *key,
1673
my_off_t *root, my_off_t new_root, LSN *res_lsn)
1675
MARIA_SHARE *share= info->s;
1676
MARIA_KEYDEF *keyinfo= key->keyinfo;
1677
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1679
const uchar *key_value;
1680
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1681
struct st_msg_to_write_hook_for_undo_key msg;
1684
/* Save if we need to write a clr record */
1685
lsn_store(log_data, info->trn->undo_lsn);
1686
key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
1688
key_length= key->data_length + key->ref_length;
1689
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1690
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
1691
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data;
1692
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
1695
msg.value= new_root;
1696
msg.auto_increment= 0;
1697
key_value= key->data;
1698
if (share->base.auto_key == ((uint) keyinfo->key_nr + 1))
1700
const HA_KEYSEG *keyseg= keyinfo->seg;
1701
uchar reversed[MARIA_MAX_KEY_BUFF];
1702
if (keyseg->flag & HA_SWAP_KEY)
1704
/* We put key from log record to "data record" packing format... */
1705
const uchar *key_ptr= key->data, *key_end= key->data + keyseg->length;
1706
uchar *to= reversed + keyseg->length;
1710
} while (key_ptr != key_end);
1713
/* ... so that we can read it with: */
1715
ma_retrieve_auto_increment(key_value, keyseg->type);
1716
/* and write_hook_for_undo_key_insert() will pick this. */
1719
return translog_write_record(res_lsn, LOGREC_UNDO_KEY_INSERT,
1722
log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1724
TRANSLOG_INTERNAL_PARTS + 2, log_array,
1725
log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1730
@brief Log creation of new page
1733
We don't have to store the page_length into the log entry as we can
1734
calculate this from the length of the log entry
1740
my_bool _ma_log_new(MARIA_HA *info, my_off_t page, const uchar *buff,
1741
uint page_length, uint key_nr, my_bool root_page)
1744
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE
1746
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1747
MARIA_SHARE *share= info->s;
1748
DBUG_ENTER("_ma_log_new");
1749
DBUG_PRINT("enter", ("page: %lu", (ulong) page));
1751
DBUG_ASSERT(share->now_transactional);
1753
/* Store address of new root page */
1754
page/= share->block_size;
1755
page_store(log_data + FILEID_STORE_SIZE, page);
1757
/* Store link to next unused page */
1758
if (info->used_key_del == 2)
1759
page= 0; /* key_del not changed */
1761
page= ((share->current_key_del == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1762
share->current_key_del / share->block_size);
1764
page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
1765
key_nr_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE*2, key_nr);
1766
log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE*2 + KEY_NR_STORE_SIZE]=
1769
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1770
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
1772
page_length-= LSN_STORE_SIZE;
1773
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= buff + LSN_STORE_SIZE;
1774
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length;
1776
if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE,
1778
(translog_size_t) (sizeof(log_data) + page_length),
1779
TRANSLOG_INTERNAL_PARTS + 2, log_array,
1788
Log when some part of the key page changes
1791
my_bool _ma_log_change(MARIA_HA *info, my_off_t page, const uchar *buff,
1792
const uchar *key_pos, uint length)
1795
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 6 + 7], *log_pos;
1796
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
1797
uint offset= (uint) (key_pos - buff), translog_parts, extra_length= 0;
1798
DBUG_ENTER("_ma_log_change");
1799
DBUG_PRINT("enter", ("page: %lu length: %u", (ulong) page, length));
1801
DBUG_ASSERT(info->s->now_transactional);
1803
/* Store address of new root page */
1804
page/= info->s->block_size;
1805
page_store(log_data + FILEID_STORE_SIZE, page);
1806
log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1807
log_pos[0]= KEY_OP_OFFSET;
1808
int2store(log_pos+1, offset);
1809
log_pos[3]= KEY_OP_CHANGE;
1810
int2store(log_pos+4, length);
1812
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1813
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data) - 7;
1814
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
1815
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
1818
#ifdef EXTRA_DEBUG_KEY_CHANGES
1820
int page_length= _ma_get_page_used(info->s, buff);
1822
crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
1824
log_pos[0]= KEY_OP_CHECK;
1825
int2store(log_pos+1, page_length);
1826
int4store(log_pos+3, crc);
1827
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= (char *) log_pos;
1828
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
1834
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1836
(translog_size_t) (sizeof(log_data) - 7 + length +
1838
TRANSLOG_INTERNAL_PARTS + translog_parts,
1839
log_array, log_data, NULL))
1846
@brief Write log entry for page splitting
1849
Write log entry for page that has got a key added to the page under
1850
one and only one of the following senarios:
1851
- Page is shortened from end
1852
- Data is added to end of page
1853
- Data added at front of page
1855
@param prefix_or_suffix KEY_OP_NONE Ignored
1856
KEY_OP_ADD_PREFIX Add data to start of page
1857
KEY_OP_ADD_SUFFIX Add data to end of page
1861
static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff,
1862
uint org_length, uint new_length,
1863
const uchar *key_pos, uint key_length,
1864
int move_length, enum en_key_op prefix_or_suffix,
1865
const uchar *data, uint data_length,
1866
uint changed_length)
1869
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3+3+3+3+3+2];
1871
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
1872
uint offset= (uint) (key_pos - buff);
1873
uint translog_parts, extra_length;
1874
DBUG_ENTER("_ma_log_split");
1875
DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
1876
(ulong) page, org_length, new_length));
1878
log_pos= log_data + FILEID_STORE_SIZE;
1879
page/= info->s->block_size;
1880
page_store(log_pos, page);
1881
log_pos+= PAGE_STORE_SIZE;
1883
if (new_length <= offset || !key_pos)
1886
Page was split before inserted key. Write redo entry where
1887
we just cut current page at page_length
1889
uint length_offset= org_length - new_length;
1890
log_pos[0]= KEY_OP_DEL_SUFFIX;
1891
int2store(log_pos+1, length_offset);
1898
/* Key was added to page which was split after the inserted key */
1899
uint max_key_length;
1902
Handle case when split happened directly after the newly inserted key.
1904
max_key_length= new_length - offset;
1905
extra_length= min(key_length, max_key_length);
1907
if ((int) new_length < (int) (org_length + move_length + data_length))
1910
uint diff= org_length + move_length + data_length - new_length;
1911
log_pos[0]= KEY_OP_DEL_SUFFIX;
1912
int2store(log_pos + 1, diff);
1917
DBUG_ASSERT(new_length == org_length + move_length + data_length);
1920
log_pos[0]= KEY_OP_OFFSET;
1921
int2store(log_pos+1, offset);
1926
log_pos[0]= KEY_OP_SHIFT;
1927
int2store(log_pos+1, move_length);
1931
log_pos[0]= KEY_OP_CHANGE;
1932
int2store(log_pos+1, extra_length);
1935
/* Point to original inserted key data */
1936
if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
1937
key_pos+= data_length;
1940
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
1941
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extra_length;
1946
/* Add prefix or suffix */
1947
log_pos[0]= prefix_or_suffix;
1948
int2store(log_pos+1, data_length);
1950
if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
1952
int2store(log_pos+1, changed_length);
1954
data_length= changed_length;
1956
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= data;
1957
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= data_length;
1959
extra_length+= data_length;
1962
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1963
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
1965
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
1968
log_array[TRANSLOG_INTERNAL_PARTS +
1969
0].length + extra_length,
1970
TRANSLOG_INTERNAL_PARTS + translog_parts,
1971
log_array, log_data, NULL));
1977
Write log entry for page that has got a key added to the page
1978
and page is shortened from start of page
1980
@fn _ma_log_del_prefix()
1981
@param info Maria handler
1982
@param page Page number
1983
@param buff Page buffer
1984
@param org_length Length of buffer when read
1985
@param new_length Final length
1986
@param key_pos Where on page buffer key was added. This is position
1987
before prefix was removed
1988
@param key_length How many bytes was changed at 'key_pos'
1989
@param move_length How many bytes was moved up when key was added
1996
static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page,
1998
uint org_length, uint new_length,
1999
const uchar *key_pos, uint key_length,
2003
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 12], *log_pos;
2004
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
2005
uint offset= (uint) (key_pos - buff);
2006
uint diff_length= org_length + move_length - new_length;
2007
uint translog_parts, extra_length;
2008
DBUG_ENTER("_ma_log_del_prefix");
2009
DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
2010
(ulong) page, org_length, new_length));
2012
DBUG_ASSERT((int) diff_length > 0);
2014
log_pos= log_data + FILEID_STORE_SIZE;
2015
page/= info->s->block_size;
2016
page_store(log_pos, page);
2017
log_pos+= PAGE_STORE_SIZE;
2022
if (offset < diff_length + info->s->keypage_header)
2025
Key is not anymore on page. Move data down, but take into account that
2026
the original page had grown with 'move_length bytes'
2028
DBUG_ASSERT(offset + key_length <= diff_length + info->s->keypage_header);
2030
log_pos[0]= KEY_OP_DEL_PREFIX;
2031
int2store(log_pos+1, diff_length - move_length);
2037
Correct position to key, as data before key has been delete and key
2038
has thus been moved down
2040
offset-= diff_length;
2041
key_pos-= diff_length;
2043
/* Move data down */
2044
log_pos[0]= KEY_OP_DEL_PREFIX;
2045
int2store(log_pos+1, diff_length);
2048
log_pos[0]= KEY_OP_OFFSET;
2049
int2store(log_pos+1, offset);
2054
log_pos[0]= KEY_OP_SHIFT;
2055
int2store(log_pos+1, move_length);
2058
log_pos[0]= KEY_OP_CHANGE;
2059
int2store(log_pos+1, key_length);
2061
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
2062
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
2064
extra_length= key_length;
2066
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2067
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2069
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2072
log_array[TRANSLOG_INTERNAL_PARTS +
2073
0].length + extra_length,
2074
TRANSLOG_INTERNAL_PARTS + translog_parts,
2075
log_array, log_data, NULL));
2081
Write log entry for page that has got data added first and
2082
data deleted last. Old changed key may be part of page
2085
static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page,
2088
uint data_added_first,
2089
uint data_changed_first,
2090
uint data_deleted_last,
2091
const uchar *key_pos,
2092
uint key_length, int move_length)
2095
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3+5+3+3+3];
2097
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
2099
uint translog_parts, extra_length;
2100
DBUG_ENTER("_ma_log_key_middle");
2101
DBUG_PRINT("enter", ("page: %lu", (ulong) page));
2103
/* new place of key after changes */
2104
key_pos+= data_added_first;
2105
key_offset= (uint) (key_pos - buff);
2106
if (key_offset < new_length)
2108
/* key is on page; Calculate how much of the key is there */
2109
uint max_key_length= new_length - key_offset;
2110
if (max_key_length < key_length)
2112
/* Key is last on page */
2113
key_length= max_key_length;
2117
Take into account that new data was added as part of original key
2118
that also needs to be removed from page
2120
data_deleted_last+= move_length;
2123
page/= info->s->block_size;
2125
/* First log changes to page */
2126
log_pos= log_data + FILEID_STORE_SIZE;
2127
page_store(log_pos, page);
2128
log_pos+= PAGE_STORE_SIZE;
2130
log_pos[0]= KEY_OP_DEL_SUFFIX;
2131
int2store(log_pos+1, data_deleted_last);
2134
log_pos[0]= KEY_OP_ADD_PREFIX;
2135
int2store(log_pos+1, data_added_first);
2136
int2store(log_pos+3, data_changed_first);
2139
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2140
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2142
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (buff +
2143
info->s->keypage_header);
2144
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
2146
extra_length= data_changed_first;
2148
/* If changed key is on page, log those changes too */
2150
if (key_offset < new_length)
2152
uchar *start_log_pos= log_pos;
2154
log_pos[0]= KEY_OP_OFFSET;
2155
int2store(log_pos+1, key_offset);
2159
log_pos[0]= KEY_OP_SHIFT;
2160
int2store(log_pos+1, move_length);
2163
log_pos[0]= KEY_OP_CHANGE;
2164
int2store(log_pos+1, key_length);
2167
log_array[TRANSLOG_INTERNAL_PARTS + 2].str= start_log_pos;
2168
log_array[TRANSLOG_INTERNAL_PARTS + 2].length= (uint) (log_pos -
2171
log_array[TRANSLOG_INTERNAL_PARTS + 3].str= key_pos;
2172
log_array[TRANSLOG_INTERNAL_PARTS + 3].length= key_length;
2174
extra_length+= (uint) (log_array[TRANSLOG_INTERNAL_PARTS + 2].length +
2178
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2181
(log_array[TRANSLOG_INTERNAL_PARTS +
2182
0].length + extra_length),
2183
TRANSLOG_INTERNAL_PARTS + translog_parts,
2184
log_array, log_data, NULL));
2192
Write log entry for page that has got data added first and
2196
static my_bool _ma_log_middle(MARIA_HA *info, my_off_t page,
2198
uint data_added_first, uint data_changed_first,
2199
uint data_deleted_last)
2202
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
2203
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5], *log_pos;
2204
DBUG_ENTER("_ma_log_middle");
2205
DBUG_PRINT("enter", ("page: %lu", (ulong) page));
2207
page/= info->s->block_size;
2209
log_pos= log_data + FILEID_STORE_SIZE;
2210
page_store(log_pos, page);
2211
log_pos+= PAGE_STORE_SIZE;
2213
log_pos[0]= KEY_OP_DEL_PREFIX;
2214
int2store(log_pos+1, data_deleted_last);
2217
log_pos[0]= KEY_OP_ADD_PREFIX;
2218
int2store(log_pos+1, data_added_first);
2219
int2store(log_pos+3, data_changed_first);
2222
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2223
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2226
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ((char*) buff +
2227
info->s->keypage_header);
2228
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
2229
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2232
log_array[TRANSLOG_INTERNAL_PARTS +
2233
0].length + data_changed_first,
2234
TRANSLOG_INTERNAL_PARTS + 2,
2235
log_array, log_data, NULL));