~ubuntu-branches/ubuntu/precise/mysql-5.5/precise-201203300109

« back to all changes in this revision

Viewing changes to storage/innobase/row/row0ins.c

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2011-11-08 11:31:13 UTC
  • Revision ID: package-import@ubuntu.com-20111108113113-3ulw01fvi4vn8m25
Tags: upstream-5.5.17
ImportĀ upstreamĀ versionĀ 5.5.17

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*****************************************************************************
 
2
 
 
3
Copyright (c) 1996, 2010, Innobase Oy. All Rights Reserved.
 
4
 
 
5
This program is free software; you can redistribute it and/or modify it under
 
6
the terms of the GNU General Public License as published by the Free Software
 
7
Foundation; version 2 of the License.
 
8
 
 
9
This program is distributed in the hope that it will be useful, but WITHOUT
 
10
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 
11
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 
12
 
 
13
You should have received a copy of the GNU General Public License along with
 
14
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
 
15
Place, Suite 330, Boston, MA 02111-1307 USA
 
16
 
 
17
*****************************************************************************/
 
18
 
 
19
/**************************************************//**
 
20
@file row/row0ins.c
 
21
Insert into a table
 
22
 
 
23
Created 4/20/1996 Heikki Tuuri
 
24
*******************************************************/
 
25
 
 
26
#include "row0ins.h"
 
27
 
 
28
#ifdef UNIV_NONINL
 
29
#include "row0ins.ic"
 
30
#endif
 
31
 
 
32
#include "ha_prototypes.h"
 
33
#include "dict0dict.h"
 
34
#include "dict0boot.h"
 
35
#include "trx0undo.h"
 
36
#include "btr0btr.h"
 
37
#include "btr0cur.h"
 
38
#include "mach0data.h"
 
39
#include "que0que.h"
 
40
#include "row0upd.h"
 
41
#include "row0sel.h"
 
42
#include "row0row.h"
 
43
#include "rem0cmp.h"
 
44
#include "lock0lock.h"
 
45
#include "log0log.h"
 
46
#include "eval0eval.h"
 
47
#include "data0data.h"
 
48
#include "usr0sess.h"
 
49
#include "buf0lru.h"
 
50
 
 
51
#define ROW_INS_PREV    1
 
52
#define ROW_INS_NEXT    2
 
53
 
 
54
/*************************************************************************
 
55
IMPORTANT NOTE: Any operation that generates redo MUST check that there
 
56
is enough space in the redo log before for that operation. This is
 
57
done by calling log_free_check(). The reason for checking the
 
58
availability of the redo log space before the start of the operation is
 
59
that we MUST not hold any synchonization objects when performing the
 
60
check.
 
61
If you make a change in this module make sure that no codepath is
 
62
introduced where a call to log_free_check() is bypassed. */
 
63
 
 
64
/*********************************************************************//**
 
65
Creates an insert node struct.
 
66
@return own: insert node struct */
 
67
UNIV_INTERN
 
68
ins_node_t*
 
69
ins_node_create(
 
70
/*============*/
 
71
        ulint           ins_type,       /*!< in: INS_VALUES, ... */
 
72
        dict_table_t*   table,          /*!< in: table where to insert */
 
73
        mem_heap_t*     heap)           /*!< in: mem heap where created */
 
74
{
 
75
        ins_node_t*     node;
 
76
 
 
77
        node = mem_heap_alloc(heap, sizeof(ins_node_t));
 
78
 
 
79
        node->common.type = QUE_NODE_INSERT;
 
80
 
 
81
        node->ins_type = ins_type;
 
82
 
 
83
        node->state = INS_NODE_SET_IX_LOCK;
 
84
        node->table = table;
 
85
        node->index = NULL;
 
86
        node->entry = NULL;
 
87
 
 
88
        node->select = NULL;
 
89
 
 
90
        node->trx_id = 0;
 
91
 
 
92
        node->entry_sys_heap = mem_heap_create(128);
 
93
 
 
94
        node->magic_n = INS_NODE_MAGIC_N;
 
95
 
 
96
        return(node);
 
97
}
 
98
 
 
99
/***********************************************************//**
 
100
Creates an entry template for each index of a table. */
 
101
UNIV_INTERN
 
102
void
 
103
ins_node_create_entry_list(
 
104
/*=======================*/
 
105
        ins_node_t*     node)   /*!< in: row insert node */
 
106
{
 
107
        dict_index_t*   index;
 
108
        dtuple_t*       entry;
 
109
 
 
110
        ut_ad(node->entry_sys_heap);
 
111
 
 
112
        UT_LIST_INIT(node->entry_list);
 
113
 
 
114
        index = dict_table_get_first_index(node->table);
 
115
 
 
116
        while (index != NULL) {
 
117
                entry = row_build_index_entry(node->row, NULL, index,
 
118
                                              node->entry_sys_heap);
 
119
                UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
 
120
 
 
121
                /* We will include all indexes (include those corrupted
 
122
                secondary indexes) in the entry list. Filteration of
 
123
                these corrupted index will be done in row_ins() */
 
124
                index = dict_table_get_next_index(index);
 
125
        }
 
126
}
 
127
 
 
128
/*****************************************************************//**
 
129
Adds system field buffers to a row. */
 
130
static
 
131
void
 
132
row_ins_alloc_sys_fields(
 
133
/*=====================*/
 
134
        ins_node_t*     node)   /*!< in: insert node */
 
135
{
 
136
        dtuple_t*               row;
 
137
        dict_table_t*           table;
 
138
        mem_heap_t*             heap;
 
139
        const dict_col_t*       col;
 
140
        dfield_t*               dfield;
 
141
        byte*                   ptr;
 
142
 
 
143
        row = node->row;
 
144
        table = node->table;
 
145
        heap = node->entry_sys_heap;
 
146
 
 
147
        ut_ad(row && table && heap);
 
148
        ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
 
149
 
 
150
        /* 1. Allocate buffer for row id */
 
151
 
 
152
        col = dict_table_get_sys_col(table, DATA_ROW_ID);
 
153
 
 
154
        dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
 
155
 
 
156
        ptr = mem_heap_zalloc(heap, DATA_ROW_ID_LEN);
 
157
 
 
158
        dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
 
159
 
 
160
        node->row_id_buf = ptr;
 
161
 
 
162
        /* 3. Allocate buffer for trx id */
 
163
 
 
164
        col = dict_table_get_sys_col(table, DATA_TRX_ID);
 
165
 
 
166
        dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
 
167
        ptr = mem_heap_zalloc(heap, DATA_TRX_ID_LEN);
 
168
 
 
169
        dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
 
170
 
 
171
        node->trx_id_buf = ptr;
 
172
 
 
173
        /* 4. Allocate buffer for roll ptr */
 
174
 
 
175
        col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
 
176
 
 
177
        dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
 
178
        ptr = mem_heap_zalloc(heap, DATA_ROLL_PTR_LEN);
 
179
 
 
180
        dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
 
181
}
 
182
 
 
183
/*********************************************************************//**
 
184
Sets a new row to insert for an INS_DIRECT node. This function is only used
 
185
if we have constructed the row separately, which is a rare case; this
 
186
function is quite slow. */
 
187
UNIV_INTERN
 
188
void
 
189
ins_node_set_new_row(
 
190
/*=================*/
 
191
        ins_node_t*     node,   /*!< in: insert node */
 
192
        dtuple_t*       row)    /*!< in: new row (or first row) for the node */
 
193
{
 
194
        node->state = INS_NODE_SET_IX_LOCK;
 
195
        node->index = NULL;
 
196
        node->entry = NULL;
 
197
 
 
198
        node->row = row;
 
199
 
 
200
        mem_heap_empty(node->entry_sys_heap);
 
201
 
 
202
        /* Create templates for index entries */
 
203
 
 
204
        ins_node_create_entry_list(node);
 
205
 
 
206
        /* Allocate from entry_sys_heap buffers for sys fields */
 
207
 
 
208
        row_ins_alloc_sys_fields(node);
 
209
 
 
210
        /* As we allocated a new trx id buf, the trx id should be written
 
211
        there again: */
 
212
 
 
213
        node->trx_id = 0;
 
214
}
 
215
 
 
216
/*******************************************************************//**
 
217
Does an insert operation by updating a delete-marked existing record
 
218
in the index. This situation can occur if the delete-marked record is
 
219
kept in the index for consistent reads.
 
220
@return DB_SUCCESS or error code */
 
221
static
 
222
ulint
 
223
row_ins_sec_index_entry_by_modify(
 
224
/*==============================*/
 
225
        ulint           mode,   /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
 
226
                                depending on whether mtr holds just a leaf
 
227
                                latch or also a tree latch */
 
228
        btr_cur_t*      cursor, /*!< in: B-tree cursor */
 
229
        const dtuple_t* entry,  /*!< in: index entry to insert */
 
230
        que_thr_t*      thr,    /*!< in: query thread */
 
231
        mtr_t*          mtr)    /*!< in: mtr; must be committed before
 
232
                                latching any further pages */
 
233
{
 
234
        big_rec_t*      dummy_big_rec;
 
235
        mem_heap_t*     heap;
 
236
        upd_t*          update;
 
237
        rec_t*          rec;
 
238
        ulint           err;
 
239
 
 
240
        rec = btr_cur_get_rec(cursor);
 
241
 
 
242
        ut_ad(!dict_index_is_clust(cursor->index));
 
243
        ut_ad(rec_get_deleted_flag(rec,
 
244
                                   dict_table_is_comp(cursor->index->table)));
 
245
 
 
246
        /* We know that in the alphabetical ordering, entry and rec are
 
247
        identified. But in their binary form there may be differences if
 
248
        there are char fields in them. Therefore we have to calculate the
 
249
        difference. */
 
250
 
 
251
        heap = mem_heap_create(1024);
 
252
 
 
253
        update = row_upd_build_sec_rec_difference_binary(
 
254
                cursor->index, entry, rec, thr_get_trx(thr), heap);
 
255
        if (mode == BTR_MODIFY_LEAF) {
 
256
                /* Try an optimistic updating of the record, keeping changes
 
257
                within the page */
 
258
 
 
259
                err = btr_cur_optimistic_update(BTR_KEEP_SYS_FLAG, cursor,
 
260
                                                update, 0, thr, mtr);
 
261
                switch (err) {
 
262
                case DB_OVERFLOW:
 
263
                case DB_UNDERFLOW:
 
264
                case DB_ZIP_OVERFLOW:
 
265
                        err = DB_FAIL;
 
266
                }
 
267
        } else {
 
268
                ut_a(mode == BTR_MODIFY_TREE);
 
269
                if (buf_LRU_buf_pool_running_out()) {
 
270
 
 
271
                        err = DB_LOCK_TABLE_FULL;
 
272
 
 
273
                        goto func_exit;
 
274
                }
 
275
 
 
276
                err = btr_cur_pessimistic_update(BTR_KEEP_SYS_FLAG, cursor,
 
277
                                                 &heap, &dummy_big_rec, update,
 
278
                                                 0, thr, mtr);
 
279
                ut_ad(!dummy_big_rec);
 
280
        }
 
281
func_exit:
 
282
        mem_heap_free(heap);
 
283
 
 
284
        return(err);
 
285
}
 
286
 
 
287
/*******************************************************************//**
 
288
Does an insert operation by delete unmarking and updating a delete marked
 
289
existing record in the index. This situation can occur if the delete marked
 
290
record is kept in the index for consistent reads.
 
291
@return DB_SUCCESS, DB_FAIL, or error code */
 
292
static
 
293
ulint
 
294
row_ins_clust_index_entry_by_modify(
 
295
/*================================*/
 
296
        ulint           mode,   /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
 
297
                                depending on whether mtr holds just a leaf
 
298
                                latch or also a tree latch */
 
299
        btr_cur_t*      cursor, /*!< in: B-tree cursor */
 
300
        mem_heap_t**    heap,   /*!< in/out: pointer to memory heap, or NULL */
 
301
        big_rec_t**     big_rec,/*!< out: possible big rec vector of fields
 
302
                                which have to be stored externally by the
 
303
                                caller */
 
304
        const dtuple_t* entry,  /*!< in: index entry to insert */
 
305
        que_thr_t*      thr,    /*!< in: query thread */
 
306
        mtr_t*          mtr)    /*!< in: mtr; must be committed before
 
307
                                latching any further pages */
 
308
{
 
309
        rec_t*          rec;
 
310
        upd_t*          update;
 
311
        ulint           err;
 
312
 
 
313
        ut_ad(dict_index_is_clust(cursor->index));
 
314
 
 
315
        *big_rec = NULL;
 
316
 
 
317
        rec = btr_cur_get_rec(cursor);
 
318
 
 
319
        ut_ad(rec_get_deleted_flag(rec,
 
320
                                   dict_table_is_comp(cursor->index->table)));
 
321
 
 
322
        if (!*heap) {
 
323
                *heap = mem_heap_create(1024);
 
324
        }
 
325
 
 
326
        /* Build an update vector containing all the fields to be modified;
 
327
        NOTE that this vector may NOT contain system columns trx_id or
 
328
        roll_ptr */
 
329
 
 
330
        update = row_upd_build_difference_binary(cursor->index, entry, rec,
 
331
                                                 thr_get_trx(thr), *heap);
 
332
        if (mode == BTR_MODIFY_LEAF) {
 
333
                /* Try optimistic updating of the record, keeping changes
 
334
                within the page */
 
335
 
 
336
                err = btr_cur_optimistic_update(0, cursor, update, 0, thr,
 
337
                                                mtr);
 
338
                switch (err) {
 
339
                case DB_OVERFLOW:
 
340
                case DB_UNDERFLOW:
 
341
                case DB_ZIP_OVERFLOW:
 
342
                        err = DB_FAIL;
 
343
                }
 
344
        } else {
 
345
                ut_a(mode == BTR_MODIFY_TREE);
 
346
                if (buf_LRU_buf_pool_running_out()) {
 
347
 
 
348
                        return(DB_LOCK_TABLE_FULL);
 
349
 
 
350
                }
 
351
                err = btr_cur_pessimistic_update(0, cursor,
 
352
                                                 heap, big_rec, update,
 
353
                                                 0, thr, mtr);
 
354
        }
 
355
 
 
356
        return(err);
 
357
}
 
358
 
 
359
/*********************************************************************//**
 
360
Returns TRUE if in a cascaded update/delete an ancestor node of node
 
361
updates (not DELETE, but UPDATE) table.
 
362
@return TRUE if an ancestor updates table */
 
363
static
 
364
ibool
 
365
row_ins_cascade_ancestor_updates_table(
 
366
/*===================================*/
 
367
        que_node_t*     node,   /*!< in: node in a query graph */
 
368
        dict_table_t*   table)  /*!< in: table */
 
369
{
 
370
        que_node_t*     parent;
 
371
        upd_node_t*     upd_node;
 
372
 
 
373
        parent = que_node_get_parent(node);
 
374
 
 
375
        while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
 
376
 
 
377
                upd_node = parent;
 
378
 
 
379
                if (upd_node->table == table && upd_node->is_delete == FALSE) {
 
380
 
 
381
                        return(TRUE);
 
382
                }
 
383
 
 
384
                parent = que_node_get_parent(parent);
 
385
 
 
386
                ut_a(parent);
 
387
        }
 
388
 
 
389
        return(FALSE);
 
390
}
 
391
 
 
392
/*********************************************************************//**
 
393
Returns the number of ancestor UPDATE or DELETE nodes of a
 
394
cascaded update/delete node.
 
395
@return number of ancestors */
 
396
static
 
397
ulint
 
398
row_ins_cascade_n_ancestors(
 
399
/*========================*/
 
400
        que_node_t*     node)   /*!< in: node in a query graph */
 
401
{
 
402
        que_node_t*     parent;
 
403
        ulint           n_ancestors = 0;
 
404
 
 
405
        parent = que_node_get_parent(node);
 
406
 
 
407
        while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
 
408
                n_ancestors++;
 
409
 
 
410
                parent = que_node_get_parent(parent);
 
411
 
 
412
                ut_a(parent);
 
413
        }
 
414
 
 
415
        return(n_ancestors);
 
416
}
 
417
 
 
418
/******************************************************************//**
 
419
Calculates the update vector node->cascade->update for a child table in
 
420
a cascaded update.
 
421
@return number of fields in the calculated update vector; the value
 
422
can also be 0 if no foreign key fields changed; the returned value is
 
423
ULINT_UNDEFINED if the column type in the child table is too short to
 
424
fit the new value in the parent table: that means the update fails */
 
425
static
 
426
ulint
 
427
row_ins_cascade_calc_update_vec(
 
428
/*============================*/
 
429
        upd_node_t*     node,           /*!< in: update node of the parent
 
430
                                        table */
 
431
        dict_foreign_t* foreign,        /*!< in: foreign key constraint whose
 
432
                                        type is != 0 */
 
433
        mem_heap_t*     heap)           /*!< in: memory heap to use as
 
434
                                        temporary storage */
 
435
{
 
436
        upd_node_t*     cascade         = node->cascade_node;
 
437
        dict_table_t*   table           = foreign->foreign_table;
 
438
        dict_index_t*   index           = foreign->foreign_index;
 
439
        upd_t*          update;
 
440
        upd_field_t*    ufield;
 
441
        dict_table_t*   parent_table;
 
442
        dict_index_t*   parent_index;
 
443
        upd_t*          parent_update;
 
444
        upd_field_t*    parent_ufield;
 
445
        ulint           n_fields_updated;
 
446
        ulint           parent_field_no;
 
447
        ulint           i;
 
448
        ulint           j;
 
449
 
 
450
        ut_a(node);
 
451
        ut_a(foreign);
 
452
        ut_a(cascade);
 
453
        ut_a(table);
 
454
        ut_a(index);
 
455
 
 
456
        /* Calculate the appropriate update vector which will set the fields
 
457
        in the child index record to the same value (possibly padded with
 
458
        spaces if the column is a fixed length CHAR or FIXBINARY column) as
 
459
        the referenced index record will get in the update. */
 
460
 
 
461
        parent_table = node->table;
 
462
        ut_a(parent_table == foreign->referenced_table);
 
463
        parent_index = foreign->referenced_index;
 
464
        parent_update = node->update;
 
465
 
 
466
        update = cascade->update;
 
467
 
 
468
        update->info_bits = 0;
 
469
        update->n_fields = foreign->n_fields;
 
470
 
 
471
        n_fields_updated = 0;
 
472
 
 
473
        for (i = 0; i < foreign->n_fields; i++) {
 
474
 
 
475
                parent_field_no = dict_table_get_nth_col_pos(
 
476
                        parent_table,
 
477
                        dict_index_get_nth_col_no(parent_index, i));
 
478
 
 
479
                for (j = 0; j < parent_update->n_fields; j++) {
 
480
                        parent_ufield = parent_update->fields + j;
 
481
 
 
482
                        if (parent_ufield->field_no == parent_field_no) {
 
483
 
 
484
                                ulint                   min_size;
 
485
                                const dict_col_t*       col;
 
486
                                ulint                   ufield_len;
 
487
 
 
488
                                col = dict_index_get_nth_col(index, i);
 
489
 
 
490
                                /* A field in the parent index record is
 
491
                                updated. Let us make the update vector
 
492
                                field for the child table. */
 
493
 
 
494
                                ufield = update->fields + n_fields_updated;
 
495
 
 
496
                                ufield->field_no
 
497
                                        = dict_table_get_nth_col_pos(
 
498
                                        table, dict_col_get_no(col));
 
499
                                ufield->exp = NULL;
 
500
 
 
501
                                ufield->new_val = parent_ufield->new_val;
 
502
                                ufield_len = dfield_get_len(&ufield->new_val);
 
503
 
 
504
                                /* Clear the "external storage" flag */
 
505
                                dfield_set_len(&ufield->new_val, ufield_len);
 
506
 
 
507
                                /* Do not allow a NOT NULL column to be
 
508
                                updated as NULL */
 
509
 
 
510
                                if (dfield_is_null(&ufield->new_val)
 
511
                                    && (col->prtype & DATA_NOT_NULL)) {
 
512
 
 
513
                                        return(ULINT_UNDEFINED);
 
514
                                }
 
515
 
 
516
                                /* If the new value would not fit in the
 
517
                                column, do not allow the update */
 
518
 
 
519
                                if (!dfield_is_null(&ufield->new_val)
 
520
                                    && dtype_get_at_most_n_mbchars(
 
521
                                        col->prtype, col->mbminmaxlen,
 
522
                                        col->len,
 
523
                                        ufield_len,
 
524
                                        dfield_get_data(&ufield->new_val))
 
525
                                    < ufield_len) {
 
526
 
 
527
                                        return(ULINT_UNDEFINED);
 
528
                                }
 
529
 
 
530
                                /* If the parent column type has a different
 
531
                                length than the child column type, we may
 
532
                                need to pad with spaces the new value of the
 
533
                                child column */
 
534
 
 
535
                                min_size = dict_col_get_min_size(col);
 
536
 
 
537
                                /* Because UNIV_SQL_NULL (the marker
 
538
                                of SQL NULL values) exceeds all possible
 
539
                                values of min_size, the test below will
 
540
                                not hold for SQL NULL columns. */
 
541
 
 
542
                                if (min_size > ufield_len) {
 
543
 
 
544
                                        byte*   pad;
 
545
                                        ulint   pad_len;
 
546
                                        byte*   padded_data;
 
547
                                        ulint   mbminlen;
 
548
 
 
549
                                        padded_data = mem_heap_alloc(
 
550
                                                heap, min_size);
 
551
 
 
552
                                        pad = padded_data + ufield_len;
 
553
                                        pad_len = min_size - ufield_len;
 
554
 
 
555
                                        memcpy(padded_data,
 
556
                                               dfield_get_data(&ufield
 
557
                                                               ->new_val),
 
558
                                               ufield_len);
 
559
 
 
560
                                        mbminlen = dict_col_get_mbminlen(col);
 
561
 
 
562
                                        ut_ad(!(ufield_len % mbminlen));
 
563
                                        ut_ad(!(min_size % mbminlen));
 
564
 
 
565
                                        if (mbminlen == 1
 
566
                                            && dtype_get_charset_coll(
 
567
                                                    col->prtype)
 
568
                                            == DATA_MYSQL_BINARY_CHARSET_COLL) {
 
569
                                                /* Do not pad BINARY columns */
 
570
                                                return(ULINT_UNDEFINED);
 
571
                                        }
 
572
 
 
573
                                        row_mysql_pad_col(mbminlen,
 
574
                                                          pad, pad_len);
 
575
                                        dfield_set_data(&ufield->new_val,
 
576
                                                        padded_data, min_size);
 
577
                                }
 
578
 
 
579
                                n_fields_updated++;
 
580
                        }
 
581
                }
 
582
        }
 
583
 
 
584
        update->n_fields = n_fields_updated;
 
585
 
 
586
        return(n_fields_updated);
 
587
}
 
588
 
 
589
/*********************************************************************//**
 
590
Set detailed error message associated with foreign key errors for
 
591
the given transaction. */
 
592
static
 
593
void
 
594
row_ins_set_detailed(
 
595
/*=================*/
 
596
        trx_t*          trx,            /*!< in: transaction */
 
597
        dict_foreign_t* foreign)        /*!< in: foreign key constraint */
 
598
{
 
599
        mutex_enter(&srv_misc_tmpfile_mutex);
 
600
        rewind(srv_misc_tmpfile);
 
601
 
 
602
        if (os_file_set_eof(srv_misc_tmpfile)) {
 
603
                ut_print_name(srv_misc_tmpfile, trx, TRUE,
 
604
                              foreign->foreign_table_name);
 
605
                dict_print_info_on_foreign_key_in_create_format(
 
606
                        srv_misc_tmpfile, trx, foreign, FALSE);
 
607
                trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
 
608
        } else {
 
609
                trx_set_detailed_error(trx, "temp file operation failed");
 
610
        }
 
611
 
 
612
        mutex_exit(&srv_misc_tmpfile_mutex);
 
613
}
 
614
 
 
615
/*********************************************************************//**
 
616
Reports a foreign key error associated with an update or a delete of a
 
617
parent table index entry. */
 
618
static
 
619
void
 
620
row_ins_foreign_report_err(
 
621
/*=======================*/
 
622
        const char*     errstr,         /*!< in: error string from the viewpoint
 
623
                                        of the parent table */
 
624
        que_thr_t*      thr,            /*!< in: query thread whose run_node
 
625
                                        is an update node */
 
626
        dict_foreign_t* foreign,        /*!< in: foreign key constraint */
 
627
        const rec_t*    rec,            /*!< in: a matching index record in the
 
628
                                        child table */
 
629
        const dtuple_t* entry)          /*!< in: index entry in the parent
 
630
                                        table */
 
631
{
 
632
        FILE*   ef      = dict_foreign_err_file;
 
633
        trx_t*  trx     = thr_get_trx(thr);
 
634
 
 
635
        row_ins_set_detailed(trx, foreign);
 
636
 
 
637
        mutex_enter(&dict_foreign_err_mutex);
 
638
        rewind(ef);
 
639
        ut_print_timestamp(ef);
 
640
        fputs(" Transaction:\n", ef);
 
641
        trx_print(ef, trx, 600);
 
642
 
 
643
        fputs("Foreign key constraint fails for table ", ef);
 
644
        ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
 
645
        fputs(":\n", ef);
 
646
        dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
 
647
                                                        TRUE);
 
648
        putc('\n', ef);
 
649
        fputs(errstr, ef);
 
650
        fputs(" in parent table, in index ", ef);
 
651
        ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
 
652
        if (entry) {
 
653
                fputs(" tuple:\n", ef);
 
654
                dtuple_print(ef, entry);
 
655
        }
 
656
        fputs("\nBut in child table ", ef);
 
657
        ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
 
658
        fputs(", in index ", ef);
 
659
        ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
 
660
        if (rec) {
 
661
                fputs(", there is a record:\n", ef);
 
662
                rec_print(ef, rec, foreign->foreign_index);
 
663
        } else {
 
664
                fputs(", the record is not available\n", ef);
 
665
        }
 
666
        putc('\n', ef);
 
667
 
 
668
        mutex_exit(&dict_foreign_err_mutex);
 
669
}
 
670
 
 
671
/*********************************************************************//**
 
672
Reports a foreign key error to dict_foreign_err_file when we are trying
 
673
to add an index entry to a child table. Note that the adding may be the result
 
674
of an update, too. */
 
675
static
 
676
void
 
677
row_ins_foreign_report_add_err(
 
678
/*===========================*/
 
679
        trx_t*          trx,            /*!< in: transaction */
 
680
        dict_foreign_t* foreign,        /*!< in: foreign key constraint */
 
681
        const rec_t*    rec,            /*!< in: a record in the parent table:
 
682
                                        it does not match entry because we
 
683
                                        have an error! */
 
684
        const dtuple_t* entry)          /*!< in: index entry to insert in the
 
685
                                        child table */
 
686
{
 
687
        FILE*   ef      = dict_foreign_err_file;
 
688
 
 
689
        row_ins_set_detailed(trx, foreign);
 
690
 
 
691
        mutex_enter(&dict_foreign_err_mutex);
 
692
        rewind(ef);
 
693
        ut_print_timestamp(ef);
 
694
        fputs(" Transaction:\n", ef);
 
695
        trx_print(ef, trx, 600);
 
696
        fputs("Foreign key constraint fails for table ", ef);
 
697
        ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
 
698
        fputs(":\n", ef);
 
699
        dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
 
700
                                                        TRUE);
 
701
        fputs("\nTrying to add in child table, in index ", ef);
 
702
        ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
 
703
        if (entry) {
 
704
                fputs(" tuple:\n", ef);
 
705
                /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
 
706
                It would be better to only display the user columns. */
 
707
                dtuple_print(ef, entry);
 
708
        }
 
709
        fputs("\nBut in parent table ", ef);
 
710
        ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
 
711
        fputs(", in index ", ef);
 
712
        ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
 
713
        fputs(",\nthe closest match we can find is record:\n", ef);
 
714
        if (rec && page_rec_is_supremum(rec)) {
 
715
                /* If the cursor ended on a supremum record, it is better
 
716
                to report the previous record in the error message, so that
 
717
                the user gets a more descriptive error message. */
 
718
                rec = page_rec_get_prev_const(rec);
 
719
        }
 
720
 
 
721
        if (rec) {
 
722
                rec_print(ef, rec, foreign->referenced_index);
 
723
        }
 
724
        putc('\n', ef);
 
725
 
 
726
        mutex_exit(&dict_foreign_err_mutex);
 
727
}
 
728
 
 
729
/*********************************************************************//**
 
730
Invalidate the query cache for the given table. */
 
731
static
 
732
void
 
733
row_ins_invalidate_query_cache(
 
734
/*===========================*/
 
735
        que_thr_t*      thr,            /*!< in: query thread whose run_node
 
736
                                        is an update node */
 
737
        const char*     name)           /*!< in: table name prefixed with
 
738
                                        database name and a '/' character */
 
739
{
 
740
        char*   buf;
 
741
        char*   ptr;
 
742
        ulint   len = strlen(name) + 1;
 
743
 
 
744
        buf = mem_strdupl(name, len);
 
745
 
 
746
        ptr = strchr(buf, '/');
 
747
        ut_a(ptr);
 
748
        *ptr = '\0';
 
749
 
 
750
        innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
 
751
        mem_free(buf);
 
752
}
 
753
 
 
754
/*********************************************************************//**
 
755
Perform referential actions or checks when a parent row is deleted or updated
 
756
and the constraint had an ON DELETE or ON UPDATE condition which was not
 
757
RESTRICT.
 
758
@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
 
759
static
 
760
ulint
 
761
row_ins_foreign_check_on_constraint(
 
762
/*================================*/
 
763
        que_thr_t*      thr,            /*!< in: query thread whose run_node
 
764
                                        is an update node */
 
765
        dict_foreign_t* foreign,        /*!< in: foreign key constraint whose
 
766
                                        type is != 0 */
 
767
        btr_pcur_t*     pcur,           /*!< in: cursor placed on a matching
 
768
                                        index record in the child table */
 
769
        dtuple_t*       entry,          /*!< in: index entry in the parent
 
770
                                        table */
 
771
        mtr_t*          mtr)            /*!< in: mtr holding the latch of pcur
 
772
                                        page */
 
773
{
 
774
        upd_node_t*     node;
 
775
        upd_node_t*     cascade;
 
776
        dict_table_t*   table           = foreign->foreign_table;
 
777
        dict_index_t*   index;
 
778
        dict_index_t*   clust_index;
 
779
        dtuple_t*       ref;
 
780
        mem_heap_t*     upd_vec_heap    = NULL;
 
781
        const rec_t*    rec;
 
782
        const rec_t*    clust_rec;
 
783
        const buf_block_t* clust_block;
 
784
        upd_t*          update;
 
785
        ulint           n_to_update;
 
786
        ulint           err;
 
787
        ulint           i;
 
788
        trx_t*          trx;
 
789
        mem_heap_t*     tmp_heap        = NULL;
 
790
 
 
791
        ut_a(thr);
 
792
        ut_a(foreign);
 
793
        ut_a(pcur);
 
794
        ut_a(mtr);
 
795
 
 
796
        trx = thr_get_trx(thr);
 
797
 
 
798
        /* Since we are going to delete or update a row, we have to invalidate
 
799
        the MySQL query cache for table. A deadlock of threads is not possible
 
800
        here because the caller of this function does not hold any latches with
 
801
        the sync0sync.h rank above the kernel mutex. The query cache mutex has
 
802
        a rank just above the kernel mutex. */
 
803
 
 
804
        row_ins_invalidate_query_cache(thr, table->name);
 
805
 
 
806
        node = thr->run_node;
 
807
 
 
808
        if (node->is_delete && 0 == (foreign->type
 
809
                                     & (DICT_FOREIGN_ON_DELETE_CASCADE
 
810
                                        | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
 
811
 
 
812
                row_ins_foreign_report_err("Trying to delete",
 
813
                                           thr, foreign,
 
814
                                           btr_pcur_get_rec(pcur), entry);
 
815
 
 
816
                return(DB_ROW_IS_REFERENCED);
 
817
        }
 
818
 
 
819
        if (!node->is_delete && 0 == (foreign->type
 
820
                                      & (DICT_FOREIGN_ON_UPDATE_CASCADE
 
821
                                         | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
 
822
 
 
823
                /* This is an UPDATE */
 
824
 
 
825
                row_ins_foreign_report_err("Trying to update",
 
826
                                           thr, foreign,
 
827
                                           btr_pcur_get_rec(pcur), entry);
 
828
 
 
829
                return(DB_ROW_IS_REFERENCED);
 
830
        }
 
831
 
 
832
        if (node->cascade_node == NULL) {
 
833
                /* Extend our query graph by creating a child to current
 
834
                update node. The child is used in the cascade or set null
 
835
                operation. */
 
836
 
 
837
                node->cascade_heap = mem_heap_create(128);
 
838
                node->cascade_node = row_create_update_node_for_mysql(
 
839
                        table, node->cascade_heap);
 
840
                que_node_set_parent(node->cascade_node, node);
 
841
        }
 
842
 
 
843
        /* Initialize cascade_node to do the operation we want. Note that we
 
844
        use the SAME cascade node to do all foreign key operations of the
 
845
        SQL DELETE: the table of the cascade node may change if there are
 
846
        several child tables to the table where the delete is done! */
 
847
 
 
848
        cascade = node->cascade_node;
 
849
 
 
850
        cascade->table = table;
 
851
 
 
852
        cascade->foreign = foreign;
 
853
 
 
854
        if (node->is_delete
 
855
            && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
 
856
                cascade->is_delete = TRUE;
 
857
        } else {
 
858
                cascade->is_delete = FALSE;
 
859
 
 
860
                if (foreign->n_fields > cascade->update_n_fields) {
 
861
                        /* We have to make the update vector longer */
 
862
 
 
863
                        cascade->update = upd_create(foreign->n_fields,
 
864
                                                     node->cascade_heap);
 
865
                        cascade->update_n_fields = foreign->n_fields;
 
866
                }
 
867
        }
 
868
 
 
869
        /* We do not allow cyclic cascaded updating (DELETE is allowed,
 
870
        but not UPDATE) of the same table, as this can lead to an infinite
 
871
        cycle. Check that we are not updating the same table which is
 
872
        already being modified in this cascade chain. We have to check
 
873
        this also because the modification of the indexes of a 'parent'
 
874
        table may still be incomplete, and we must avoid seeing the indexes
 
875
        of the parent table in an inconsistent state! */
 
876
 
 
877
        if (!cascade->is_delete
 
878
            && row_ins_cascade_ancestor_updates_table(cascade, table)) {
 
879
 
 
880
                /* We do not know if this would break foreign key
 
881
                constraints, but play safe and return an error */
 
882
 
 
883
                err = DB_ROW_IS_REFERENCED;
 
884
 
 
885
                row_ins_foreign_report_err(
 
886
                        "Trying an update, possibly causing a cyclic"
 
887
                        " cascaded update\n"
 
888
                        "in the child table,", thr, foreign,
 
889
                        btr_pcur_get_rec(pcur), entry);
 
890
 
 
891
                goto nonstandard_exit_func;
 
892
        }
 
893
 
 
894
        if (row_ins_cascade_n_ancestors(cascade) >= 15) {
 
895
                err = DB_ROW_IS_REFERENCED;
 
896
 
 
897
                row_ins_foreign_report_err(
 
898
                        "Trying a too deep cascaded delete or update\n",
 
899
                        thr, foreign, btr_pcur_get_rec(pcur), entry);
 
900
 
 
901
                goto nonstandard_exit_func;
 
902
        }
 
903
 
 
904
        index = btr_pcur_get_btr_cur(pcur)->index;
 
905
 
 
906
        ut_a(index == foreign->foreign_index);
 
907
 
 
908
        rec = btr_pcur_get_rec(pcur);
 
909
 
 
910
        if (dict_index_is_clust(index)) {
 
911
                /* pcur is already positioned in the clustered index of
 
912
                the child table */
 
913
 
 
914
                clust_index = index;
 
915
                clust_rec = rec;
 
916
                clust_block = btr_pcur_get_block(pcur);
 
917
        } else {
 
918
                /* We have to look for the record in the clustered index
 
919
                in the child table */
 
920
 
 
921
                clust_index = dict_table_get_first_index(table);
 
922
 
 
923
                tmp_heap = mem_heap_create(256);
 
924
 
 
925
                ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
 
926
                                        tmp_heap);
 
927
                btr_pcur_open_with_no_init(clust_index, ref,
 
928
                                           PAGE_CUR_LE, BTR_SEARCH_LEAF,
 
929
                                           cascade->pcur, 0, mtr);
 
930
 
 
931
                clust_rec = btr_pcur_get_rec(cascade->pcur);
 
932
                clust_block = btr_pcur_get_block(cascade->pcur);
 
933
 
 
934
                if (!page_rec_is_user_rec(clust_rec)
 
935
                    || btr_pcur_get_low_match(cascade->pcur)
 
936
                    < dict_index_get_n_unique(clust_index)) {
 
937
 
 
938
                        fputs("InnoDB: error in cascade of a foreign key op\n"
 
939
                              "InnoDB: ", stderr);
 
940
                        dict_index_name_print(stderr, trx, index);
 
941
 
 
942
                        fputs("\n"
 
943
                              "InnoDB: record ", stderr);
 
944
                        rec_print(stderr, rec, index);
 
945
                        fputs("\n"
 
946
                              "InnoDB: clustered record ", stderr);
 
947
                        rec_print(stderr, clust_rec, clust_index);
 
948
                        fputs("\n"
 
949
                              "InnoDB: Submit a detailed bug report to"
 
950
                              " http://bugs.mysql.com\n", stderr);
 
951
 
 
952
                        err = DB_SUCCESS;
 
953
 
 
954
                        goto nonstandard_exit_func;
 
955
                }
 
956
        }
 
957
 
 
958
        /* Set an X-lock on the row to delete or update in the child table */
 
959
 
 
960
        err = lock_table(0, table, LOCK_IX, thr);
 
961
 
 
962
        if (err == DB_SUCCESS) {
 
963
                /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
 
964
                we already have a normal shared lock on the appropriate
 
965
                gap if the search criterion was not unique */
 
966
 
 
967
                err = lock_clust_rec_read_check_and_lock_alt(
 
968
                        0, clust_block, clust_rec, clust_index,
 
969
                        LOCK_X, LOCK_REC_NOT_GAP, thr);
 
970
        }
 
971
 
 
972
        if (err != DB_SUCCESS) {
 
973
 
 
974
                goto nonstandard_exit_func;
 
975
        }
 
976
 
 
977
        if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
 
978
                /* This can happen if there is a circular reference of
 
979
                rows such that cascading delete comes to delete a row
 
980
                already in the process of being delete marked */
 
981
                err = DB_SUCCESS;
 
982
 
 
983
                goto nonstandard_exit_func;
 
984
        }
 
985
 
 
986
        if ((node->is_delete
 
987
             && (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL))
 
988
            || (!node->is_delete
 
989
                && (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
 
990
 
 
991
                /* Build the appropriate update vector which sets
 
992
                foreign->n_fields first fields in rec to SQL NULL */
 
993
 
 
994
                update = cascade->update;
 
995
 
 
996
                update->info_bits = 0;
 
997
                update->n_fields = foreign->n_fields;
 
998
 
 
999
                for (i = 0; i < foreign->n_fields; i++) {
 
1000
                        upd_field_t*    ufield = &update->fields[i];
 
1001
 
 
1002
                        ufield->field_no = dict_table_get_nth_col_pos(
 
1003
                                table,
 
1004
                                dict_index_get_nth_col_no(index, i));
 
1005
                        ufield->orig_len = 0;
 
1006
                        ufield->exp = NULL;
 
1007
                        dfield_set_null(&ufield->new_val);
 
1008
                }
 
1009
        }
 
1010
 
 
1011
        if (!node->is_delete
 
1012
            && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
 
1013
 
 
1014
                /* Build the appropriate update vector which sets changing
 
1015
                foreign->n_fields first fields in rec to new values */
 
1016
 
 
1017
                upd_vec_heap = mem_heap_create(256);
 
1018
 
 
1019
                n_to_update = row_ins_cascade_calc_update_vec(node, foreign,
 
1020
                                                              upd_vec_heap);
 
1021
                if (n_to_update == ULINT_UNDEFINED) {
 
1022
                        err = DB_ROW_IS_REFERENCED;
 
1023
 
 
1024
                        row_ins_foreign_report_err(
 
1025
                                "Trying a cascaded update where the"
 
1026
                                " updated value in the child\n"
 
1027
                                "table would not fit in the length"
 
1028
                                " of the column, or the value would\n"
 
1029
                                "be NULL and the column is"
 
1030
                                " declared as not NULL in the child table,",
 
1031
                                thr, foreign, btr_pcur_get_rec(pcur), entry);
 
1032
 
 
1033
                        goto nonstandard_exit_func;
 
1034
                }
 
1035
 
 
1036
                if (cascade->update->n_fields == 0) {
 
1037
 
 
1038
                        /* The update does not change any columns referred
 
1039
                        to in this foreign key constraint: no need to do
 
1040
                        anything */
 
1041
 
 
1042
                        err = DB_SUCCESS;
 
1043
 
 
1044
                        goto nonstandard_exit_func;
 
1045
                }
 
1046
        }
 
1047
 
 
1048
        /* Store pcur position and initialize or store the cascade node
 
1049
        pcur stored position */
 
1050
 
 
1051
        btr_pcur_store_position(pcur, mtr);
 
1052
 
 
1053
        if (index == clust_index) {
 
1054
                btr_pcur_copy_stored_position(cascade->pcur, pcur);
 
1055
        } else {
 
1056
                btr_pcur_store_position(cascade->pcur, mtr);
 
1057
        }
 
1058
 
 
1059
        mtr_commit(mtr);
 
1060
 
 
1061
        ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
 
1062
 
 
1063
        cascade->state = UPD_NODE_UPDATE_CLUSTERED;
 
1064
 
 
1065
        err = row_update_cascade_for_mysql(thr, cascade,
 
1066
                                           foreign->foreign_table);
 
1067
 
 
1068
        if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
 
1069
                fprintf(stderr,
 
1070
                        "InnoDB: error: table %s has the counter 0"
 
1071
                        " though there is\n"
 
1072
                        "InnoDB: a FOREIGN KEY check running on it.\n",
 
1073
                        foreign->foreign_table->name);
 
1074
        }
 
1075
 
 
1076
        /* Release the data dictionary latch for a while, so that we do not
 
1077
        starve other threads from doing CREATE TABLE etc. if we have a huge
 
1078
        cascaded operation running. The counter n_foreign_key_checks_running
 
1079
        will prevent other users from dropping or ALTERing the table when we
 
1080
        release the latch. */
 
1081
 
 
1082
        row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
 
1083
        row_mysql_freeze_data_dictionary(thr_get_trx(thr));
 
1084
 
 
1085
        mtr_start(mtr);
 
1086
 
 
1087
        /* Restore pcur position */
 
1088
 
 
1089
        btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
 
1090
 
 
1091
        if (tmp_heap) {
 
1092
                mem_heap_free(tmp_heap);
 
1093
        }
 
1094
 
 
1095
        if (upd_vec_heap) {
 
1096
                mem_heap_free(upd_vec_heap);
 
1097
        }
 
1098
 
 
1099
        return(err);
 
1100
 
 
1101
nonstandard_exit_func:
 
1102
        if (tmp_heap) {
 
1103
                mem_heap_free(tmp_heap);
 
1104
        }
 
1105
 
 
1106
        if (upd_vec_heap) {
 
1107
                mem_heap_free(upd_vec_heap);
 
1108
        }
 
1109
 
 
1110
        btr_pcur_store_position(pcur, mtr);
 
1111
 
 
1112
        mtr_commit(mtr);
 
1113
        mtr_start(mtr);
 
1114
 
 
1115
        btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
 
1116
 
 
1117
        return(err);
 
1118
}
 
1119
 
 
1120
/*********************************************************************//**
 
1121
Sets a shared lock on a record. Used in locking possible duplicate key
 
1122
records and also in checking foreign key constraints.
 
1123
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
 
1124
static
 
1125
enum db_err
 
1126
row_ins_set_shared_rec_lock(
 
1127
/*========================*/
 
1128
        ulint                   type,   /*!< in: LOCK_ORDINARY, LOCK_GAP, or
 
1129
                                        LOCK_REC_NOT_GAP type lock */
 
1130
        const buf_block_t*      block,  /*!< in: buffer block of rec */
 
1131
        const rec_t*            rec,    /*!< in: record */
 
1132
        dict_index_t*           index,  /*!< in: index */
 
1133
        const ulint*            offsets,/*!< in: rec_get_offsets(rec, index) */
 
1134
        que_thr_t*              thr)    /*!< in: query thread */
 
1135
{
 
1136
        enum db_err     err;
 
1137
 
 
1138
        ut_ad(rec_offs_validate(rec, index, offsets));
 
1139
 
 
1140
        if (dict_index_is_clust(index)) {
 
1141
                err = lock_clust_rec_read_check_and_lock(
 
1142
                        0, block, rec, index, offsets, LOCK_S, type, thr);
 
1143
        } else {
 
1144
                err = lock_sec_rec_read_check_and_lock(
 
1145
                        0, block, rec, index, offsets, LOCK_S, type, thr);
 
1146
        }
 
1147
 
 
1148
        return(err);
 
1149
}
 
1150
 
 
1151
/*********************************************************************//**
 
1152
Sets a exclusive lock on a record. Used in locking possible duplicate key
 
1153
records
 
1154
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
 
1155
static
 
1156
enum db_err
 
1157
row_ins_set_exclusive_rec_lock(
 
1158
/*===========================*/
 
1159
        ulint                   type,   /*!< in: LOCK_ORDINARY, LOCK_GAP, or
 
1160
                                        LOCK_REC_NOT_GAP type lock */
 
1161
        const buf_block_t*      block,  /*!< in: buffer block of rec */
 
1162
        const rec_t*            rec,    /*!< in: record */
 
1163
        dict_index_t*           index,  /*!< in: index */
 
1164
        const ulint*            offsets,/*!< in: rec_get_offsets(rec, index) */
 
1165
        que_thr_t*              thr)    /*!< in: query thread */
 
1166
{
 
1167
        enum db_err     err;
 
1168
 
 
1169
        ut_ad(rec_offs_validate(rec, index, offsets));
 
1170
 
 
1171
        if (dict_index_is_clust(index)) {
 
1172
                err = lock_clust_rec_read_check_and_lock(
 
1173
                        0, block, rec, index, offsets, LOCK_X, type, thr);
 
1174
        } else {
 
1175
                err = lock_sec_rec_read_check_and_lock(
 
1176
                        0, block, rec, index, offsets, LOCK_X, type, thr);
 
1177
        }
 
1178
 
 
1179
        return(err);
 
1180
}
 
1181
 
 
1182
/***************************************************************//**
 
1183
Checks if foreign key constraint fails for an index entry. Sets shared locks
 
1184
which lock either the success or the failure of the constraint. NOTE that
 
1185
the caller must have a shared latch on dict_operation_lock.
 
1186
@return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
 
1187
UNIV_INTERN
 
1188
ulint
 
1189
row_ins_check_foreign_constraint(
 
1190
/*=============================*/
 
1191
        ibool           check_ref,/*!< in: TRUE if we want to check that
 
1192
                                the referenced table is ok, FALSE if we
 
1193
                                want to check the foreign key table */
 
1194
        dict_foreign_t* foreign,/*!< in: foreign constraint; NOTE that the
 
1195
                                tables mentioned in it must be in the
 
1196
                                dictionary cache if they exist at all */
 
1197
        dict_table_t*   table,  /*!< in: if check_ref is TRUE, then the foreign
 
1198
                                table, else the referenced table */
 
1199
        dtuple_t*       entry,  /*!< in: index entry for index */
 
1200
        que_thr_t*      thr)    /*!< in: query thread */
 
1201
{
 
1202
        upd_node_t*     upd_node;
 
1203
        dict_table_t*   check_table;
 
1204
        dict_index_t*   check_index;
 
1205
        ulint           n_fields_cmp;
 
1206
        btr_pcur_t      pcur;
 
1207
        int             cmp;
 
1208
        ulint           err;
 
1209
        ulint           i;
 
1210
        mtr_t           mtr;
 
1211
        trx_t*          trx             = thr_get_trx(thr);
 
1212
        mem_heap_t*     heap            = NULL;
 
1213
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
1214
        ulint*          offsets         = offsets_;
 
1215
        rec_offs_init(offsets_);
 
1216
 
 
1217
run_again:
 
1218
#ifdef UNIV_SYNC_DEBUG
 
1219
        ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
 
1220
#endif /* UNIV_SYNC_DEBUG */
 
1221
 
 
1222
        err = DB_SUCCESS;
 
1223
 
 
1224
        if (trx->check_foreigns == FALSE) {
 
1225
                /* The user has suppressed foreign key checks currently for
 
1226
                this session */
 
1227
                goto exit_func;
 
1228
        }
 
1229
 
 
1230
        /* If any of the foreign key fields in entry is SQL NULL, we
 
1231
        suppress the foreign key check: this is compatible with Oracle,
 
1232
        for example */
 
1233
 
 
1234
        for (i = 0; i < foreign->n_fields; i++) {
 
1235
                if (UNIV_SQL_NULL == dfield_get_len(
 
1236
                            dtuple_get_nth_field(entry, i))) {
 
1237
 
 
1238
                        goto exit_func;
 
1239
                }
 
1240
        }
 
1241
 
 
1242
        if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
 
1243
                upd_node = thr->run_node;
 
1244
 
 
1245
                if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
 
1246
                        /* If a cascaded update is done as defined by a
 
1247
                        foreign key constraint, do not check that
 
1248
                        constraint for the child row. In ON UPDATE CASCADE
 
1249
                        the update of the parent row is only half done when
 
1250
                        we come here: if we would check the constraint here
 
1251
                        for the child row it would fail.
 
1252
 
 
1253
                        A QUESTION remains: if in the child table there are
 
1254
                        several constraints which refer to the same parent
 
1255
                        table, we should merge all updates to the child as
 
1256
                        one update? And the updates can be contradictory!
 
1257
                        Currently we just perform the update associated
 
1258
                        with each foreign key constraint, one after
 
1259
                        another, and the user has problems predicting in
 
1260
                        which order they are performed. */
 
1261
 
 
1262
                        goto exit_func;
 
1263
                }
 
1264
        }
 
1265
 
 
1266
        if (check_ref) {
 
1267
                check_table = foreign->referenced_table;
 
1268
                check_index = foreign->referenced_index;
 
1269
        } else {
 
1270
                check_table = foreign->foreign_table;
 
1271
                check_index = foreign->foreign_index;
 
1272
        }
 
1273
 
 
1274
        if (check_table == NULL || check_table->ibd_file_missing) {
 
1275
                if (check_ref) {
 
1276
                        FILE*   ef = dict_foreign_err_file;
 
1277
 
 
1278
                        row_ins_set_detailed(trx, foreign);
 
1279
 
 
1280
                        mutex_enter(&dict_foreign_err_mutex);
 
1281
                        rewind(ef);
 
1282
                        ut_print_timestamp(ef);
 
1283
                        fputs(" Transaction:\n", ef);
 
1284
                        trx_print(ef, trx, 600);
 
1285
                        fputs("Foreign key constraint fails for table ", ef);
 
1286
                        ut_print_name(ef, trx, TRUE,
 
1287
                                      foreign->foreign_table_name);
 
1288
                        fputs(":\n", ef);
 
1289
                        dict_print_info_on_foreign_key_in_create_format(
 
1290
                                ef, trx, foreign, TRUE);
 
1291
                        fputs("\nTrying to add to index ", ef);
 
1292
                        ut_print_name(ef, trx, FALSE,
 
1293
                                      foreign->foreign_index->name);
 
1294
                        fputs(" tuple:\n", ef);
 
1295
                        dtuple_print(ef, entry);
 
1296
                        fputs("\nBut the parent table ", ef);
 
1297
                        ut_print_name(ef, trx, TRUE,
 
1298
                                      foreign->referenced_table_name);
 
1299
                        fputs("\nor its .ibd file does"
 
1300
                              " not currently exist!\n", ef);
 
1301
                        mutex_exit(&dict_foreign_err_mutex);
 
1302
 
 
1303
                        err = DB_NO_REFERENCED_ROW;
 
1304
                }
 
1305
 
 
1306
                goto exit_func;
 
1307
        }
 
1308
 
 
1309
        ut_a(check_table);
 
1310
        ut_a(check_index);
 
1311
 
 
1312
        if (check_table != table) {
 
1313
                /* We already have a LOCK_IX on table, but not necessarily
 
1314
                on check_table */
 
1315
 
 
1316
                err = lock_table(0, check_table, LOCK_IS, thr);
 
1317
 
 
1318
                if (err != DB_SUCCESS) {
 
1319
 
 
1320
                        goto do_possible_lock_wait;
 
1321
                }
 
1322
        }
 
1323
 
 
1324
        mtr_start(&mtr);
 
1325
 
 
1326
        /* Store old value on n_fields_cmp */
 
1327
 
 
1328
        n_fields_cmp = dtuple_get_n_fields_cmp(entry);
 
1329
 
 
1330
        dtuple_set_n_fields_cmp(entry, foreign->n_fields);
 
1331
 
 
1332
        btr_pcur_open(check_index, entry, PAGE_CUR_GE,
 
1333
                      BTR_SEARCH_LEAF, &pcur, &mtr);
 
1334
 
 
1335
        /* Scan index records and check if there is a matching record */
 
1336
 
 
1337
        do {
 
1338
                const rec_t*            rec = btr_pcur_get_rec(&pcur);
 
1339
                const buf_block_t*      block = btr_pcur_get_block(&pcur);
 
1340
 
 
1341
                if (page_rec_is_infimum(rec)) {
 
1342
 
 
1343
                        continue;
 
1344
                }
 
1345
 
 
1346
                offsets = rec_get_offsets(rec, check_index,
 
1347
                                          offsets, ULINT_UNDEFINED, &heap);
 
1348
 
 
1349
                if (page_rec_is_supremum(rec)) {
 
1350
 
 
1351
                        err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
 
1352
                                                          rec, check_index,
 
1353
                                                          offsets, thr);
 
1354
                        switch (err) {
 
1355
                        case DB_SUCCESS_LOCKED_REC:
 
1356
                        case DB_SUCCESS:
 
1357
                                continue;
 
1358
                        default:
 
1359
                                goto end_scan;
 
1360
                        }
 
1361
                }
 
1362
 
 
1363
                cmp = cmp_dtuple_rec(entry, rec, offsets);
 
1364
 
 
1365
                if (cmp == 0) {
 
1366
                        if (rec_get_deleted_flag(rec,
 
1367
                                                 rec_offs_comp(offsets))) {
 
1368
                                err = row_ins_set_shared_rec_lock(
 
1369
                                        LOCK_ORDINARY, block,
 
1370
                                        rec, check_index, offsets, thr);
 
1371
                                switch (err) {
 
1372
                                case DB_SUCCESS_LOCKED_REC:
 
1373
                                case DB_SUCCESS:
 
1374
                                        break;
 
1375
                                default:
 
1376
                                        goto end_scan;
 
1377
                                }
 
1378
                        } else {
 
1379
                                /* Found a matching record. Lock only
 
1380
                                a record because we can allow inserts
 
1381
                                into gaps */
 
1382
 
 
1383
                                err = row_ins_set_shared_rec_lock(
 
1384
                                        LOCK_REC_NOT_GAP, block,
 
1385
                                        rec, check_index, offsets, thr);
 
1386
 
 
1387
                                switch (err) {
 
1388
                                case DB_SUCCESS_LOCKED_REC:
 
1389
                                case DB_SUCCESS:
 
1390
                                        break;
 
1391
                                default:
 
1392
                                        goto end_scan;
 
1393
                                }
 
1394
 
 
1395
                                if (check_ref) {
 
1396
                                        err = DB_SUCCESS;
 
1397
 
 
1398
                                        goto end_scan;
 
1399
                                } else if (foreign->type != 0) {
 
1400
                                        /* There is an ON UPDATE or ON DELETE
 
1401
                                        condition: check them in a separate
 
1402
                                        function */
 
1403
 
 
1404
                                        err = row_ins_foreign_check_on_constraint(
 
1405
                                                thr, foreign, &pcur, entry,
 
1406
                                                &mtr);
 
1407
                                        if (err != DB_SUCCESS) {
 
1408
                                                /* Since reporting a plain
 
1409
                                                "duplicate key" error
 
1410
                                                message to the user in
 
1411
                                                cases where a long CASCADE
 
1412
                                                operation would lead to a
 
1413
                                                duplicate key in some
 
1414
                                                other table is very
 
1415
                                                confusing, map duplicate
 
1416
                                                key errors resulting from
 
1417
                                                FK constraints to a
 
1418
                                                separate error code. */
 
1419
 
 
1420
                                                if (err == DB_DUPLICATE_KEY) {
 
1421
                                                        err = DB_FOREIGN_DUPLICATE_KEY;
 
1422
                                                }
 
1423
 
 
1424
                                                goto end_scan;
 
1425
                                        }
 
1426
 
 
1427
                                        /* row_ins_foreign_check_on_constraint
 
1428
                                        may have repositioned pcur on a
 
1429
                                        different block */
 
1430
                                        block = btr_pcur_get_block(&pcur);
 
1431
                                } else {
 
1432
                                        row_ins_foreign_report_err(
 
1433
                                                "Trying to delete or update",
 
1434
                                                thr, foreign, rec, entry);
 
1435
 
 
1436
                                        err = DB_ROW_IS_REFERENCED;
 
1437
                                        goto end_scan;
 
1438
                                }
 
1439
                        }
 
1440
                } else {
 
1441
                        ut_a(cmp < 0);
 
1442
 
 
1443
                        err = row_ins_set_shared_rec_lock(
 
1444
                                LOCK_GAP, block,
 
1445
                                rec, check_index, offsets, thr);
 
1446
 
 
1447
                        switch (err) {
 
1448
                        case DB_SUCCESS_LOCKED_REC:
 
1449
                        case DB_SUCCESS:
 
1450
                                if (check_ref) {
 
1451
                                        err = DB_NO_REFERENCED_ROW;
 
1452
                                        row_ins_foreign_report_add_err(
 
1453
                                                trx, foreign, rec, entry);
 
1454
                                } else {
 
1455
                                        err = DB_SUCCESS;
 
1456
                                }
 
1457
                        }
 
1458
 
 
1459
                        goto end_scan;
 
1460
                }
 
1461
        } while (btr_pcur_move_to_next(&pcur, &mtr));
 
1462
 
 
1463
        if (check_ref) {
 
1464
                row_ins_foreign_report_add_err(
 
1465
                        trx, foreign, btr_pcur_get_rec(&pcur), entry);
 
1466
                err = DB_NO_REFERENCED_ROW;
 
1467
        } else {
 
1468
                err = DB_SUCCESS;
 
1469
        }
 
1470
 
 
1471
end_scan:
 
1472
        btr_pcur_close(&pcur);
 
1473
 
 
1474
        mtr_commit(&mtr);
 
1475
 
 
1476
        /* Restore old value */
 
1477
        dtuple_set_n_fields_cmp(entry, n_fields_cmp);
 
1478
 
 
1479
do_possible_lock_wait:
 
1480
        if (err == DB_LOCK_WAIT) {
 
1481
                trx->error_state = err;
 
1482
 
 
1483
                que_thr_stop_for_mysql(thr);
 
1484
 
 
1485
                srv_suspend_mysql_thread(thr);
 
1486
 
 
1487
                if (trx->error_state == DB_SUCCESS) {
 
1488
 
 
1489
                        goto run_again;
 
1490
                }
 
1491
 
 
1492
                err = trx->error_state;
 
1493
        }
 
1494
 
 
1495
exit_func:
 
1496
        if (UNIV_LIKELY_NULL(heap)) {
 
1497
                mem_heap_free(heap);
 
1498
        }
 
1499
        return(err);
 
1500
}
 
1501
 
 
1502
/***************************************************************//**
 
1503
Checks if foreign key constraints fail for an index entry. If index
 
1504
is not mentioned in any constraint, this function does nothing,
 
1505
Otherwise does searches to the indexes of referenced tables and
 
1506
sets shared locks which lock either the success or the failure of
 
1507
a constraint.
 
1508
@return DB_SUCCESS or error code */
 
1509
static
 
1510
ulint
 
1511
row_ins_check_foreign_constraints(
 
1512
/*==============================*/
 
1513
        dict_table_t*   table,  /*!< in: table */
 
1514
        dict_index_t*   index,  /*!< in: index */
 
1515
        dtuple_t*       entry,  /*!< in: index entry for index */
 
1516
        que_thr_t*      thr)    /*!< in: query thread */
 
1517
{
 
1518
        dict_foreign_t* foreign;
 
1519
        ulint           err;
 
1520
        trx_t*          trx;
 
1521
        ibool           got_s_lock      = FALSE;
 
1522
 
 
1523
        trx = thr_get_trx(thr);
 
1524
 
 
1525
        foreign = UT_LIST_GET_FIRST(table->foreign_list);
 
1526
 
 
1527
        while (foreign) {
 
1528
                if (foreign->foreign_index == index) {
 
1529
 
 
1530
                        if (foreign->referenced_table == NULL) {
 
1531
                                dict_table_get(foreign->referenced_table_name_lookup,
 
1532
                                               FALSE);
 
1533
                        }
 
1534
 
 
1535
                        if (0 == trx->dict_operation_lock_mode) {
 
1536
                                got_s_lock = TRUE;
 
1537
 
 
1538
                                row_mysql_freeze_data_dictionary(trx);
 
1539
                        }
 
1540
 
 
1541
                        if (foreign->referenced_table) {
 
1542
                                mutex_enter(&(dict_sys->mutex));
 
1543
 
 
1544
                                (foreign->referenced_table
 
1545
                                 ->n_foreign_key_checks_running)++;
 
1546
 
 
1547
                                mutex_exit(&(dict_sys->mutex));
 
1548
                        }
 
1549
 
 
1550
                        /* NOTE that if the thread ends up waiting for a lock
 
1551
                        we will release dict_operation_lock temporarily!
 
1552
                        But the counter on the table protects the referenced
 
1553
                        table from being dropped while the check is running. */
 
1554
 
 
1555
                        err = row_ins_check_foreign_constraint(
 
1556
                                TRUE, foreign, table, entry, thr);
 
1557
 
 
1558
                        if (foreign->referenced_table) {
 
1559
                                mutex_enter(&(dict_sys->mutex));
 
1560
 
 
1561
                                ut_a(foreign->referenced_table
 
1562
                                     ->n_foreign_key_checks_running > 0);
 
1563
                                (foreign->referenced_table
 
1564
                                 ->n_foreign_key_checks_running)--;
 
1565
 
 
1566
                                mutex_exit(&(dict_sys->mutex));
 
1567
                        }
 
1568
 
 
1569
                        if (got_s_lock) {
 
1570
                                row_mysql_unfreeze_data_dictionary(trx);
 
1571
                        }
 
1572
 
 
1573
                        if (err != DB_SUCCESS) {
 
1574
                                return(err);
 
1575
                        }
 
1576
                }
 
1577
 
 
1578
                foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
 
1579
        }
 
1580
 
 
1581
        return(DB_SUCCESS);
 
1582
}
 
1583
 
 
1584
/***************************************************************//**
 
1585
Checks if a unique key violation to rec would occur at the index entry
 
1586
insert.
 
1587
@return TRUE if error */
 
1588
static
 
1589
ibool
 
1590
row_ins_dupl_error_with_rec(
 
1591
/*========================*/
 
1592
        const rec_t*    rec,    /*!< in: user record; NOTE that we assume
 
1593
                                that the caller already has a record lock on
 
1594
                                the record! */
 
1595
        const dtuple_t* entry,  /*!< in: entry to insert */
 
1596
        dict_index_t*   index,  /*!< in: index */
 
1597
        const ulint*    offsets)/*!< in: rec_get_offsets(rec, index) */
 
1598
{
 
1599
        ulint   matched_fields;
 
1600
        ulint   matched_bytes;
 
1601
        ulint   n_unique;
 
1602
        ulint   i;
 
1603
 
 
1604
        ut_ad(rec_offs_validate(rec, index, offsets));
 
1605
 
 
1606
        n_unique = dict_index_get_n_unique(index);
 
1607
 
 
1608
        matched_fields = 0;
 
1609
        matched_bytes = 0;
 
1610
 
 
1611
        cmp_dtuple_rec_with_match(entry, rec, offsets,
 
1612
                                  &matched_fields, &matched_bytes);
 
1613
 
 
1614
        if (matched_fields < n_unique) {
 
1615
 
 
1616
                return(FALSE);
 
1617
        }
 
1618
 
 
1619
        /* In a unique secondary index we allow equal key values if they
 
1620
        contain SQL NULLs */
 
1621
 
 
1622
        if (!dict_index_is_clust(index)) {
 
1623
 
 
1624
                for (i = 0; i < n_unique; i++) {
 
1625
                        if (UNIV_SQL_NULL == dfield_get_len(
 
1626
                                    dtuple_get_nth_field(entry, i))) {
 
1627
 
 
1628
                                return(FALSE);
 
1629
                        }
 
1630
                }
 
1631
        }
 
1632
 
 
1633
        return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
 
1634
}
 
1635
 
 
1636
/***************************************************************//**
 
1637
Scans a unique non-clustered index at a given index entry to determine
 
1638
whether a uniqueness violation has occurred for the key value of the entry.
 
1639
Set shared locks on possible duplicate records.
 
1640
@return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
 
1641
static
 
1642
ulint
 
1643
row_ins_scan_sec_index_for_duplicate(
 
1644
/*=================================*/
 
1645
        dict_index_t*   index,  /*!< in: non-clustered unique index */
 
1646
        dtuple_t*       entry,  /*!< in: index entry */
 
1647
        que_thr_t*      thr)    /*!< in: query thread */
 
1648
{
 
1649
        ulint           n_unique;
 
1650
        ulint           i;
 
1651
        int             cmp;
 
1652
        ulint           n_fields_cmp;
 
1653
        btr_pcur_t      pcur;
 
1654
        ulint           err             = DB_SUCCESS;
 
1655
        unsigned        allow_duplicates;
 
1656
        mtr_t           mtr;
 
1657
        mem_heap_t*     heap            = NULL;
 
1658
        ulint           offsets_[REC_OFFS_NORMAL_SIZE];
 
1659
        ulint*          offsets         = offsets_;
 
1660
        rec_offs_init(offsets_);
 
1661
 
 
1662
        n_unique = dict_index_get_n_unique(index);
 
1663
 
 
1664
        /* If the secondary index is unique, but one of the fields in the
 
1665
        n_unique first fields is NULL, a unique key violation cannot occur,
 
1666
        since we define NULL != NULL in this case */
 
1667
 
 
1668
        for (i = 0; i < n_unique; i++) {
 
1669
                if (UNIV_SQL_NULL == dfield_get_len(
 
1670
                            dtuple_get_nth_field(entry, i))) {
 
1671
 
 
1672
                        return(DB_SUCCESS);
 
1673
                }
 
1674
        }
 
1675
 
 
1676
        mtr_start(&mtr);
 
1677
 
 
1678
        /* Store old value on n_fields_cmp */
 
1679
 
 
1680
        n_fields_cmp = dtuple_get_n_fields_cmp(entry);
 
1681
 
 
1682
        dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
 
1683
 
 
1684
        btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
 
1685
 
 
1686
        allow_duplicates = thr_get_trx(thr)->duplicates & TRX_DUP_IGNORE;
 
1687
 
 
1688
        /* Scan index records and check if there is a duplicate */
 
1689
 
 
1690
        do {
 
1691
                const rec_t*            rec     = btr_pcur_get_rec(&pcur);
 
1692
                const buf_block_t*      block   = btr_pcur_get_block(&pcur);
 
1693
 
 
1694
                if (page_rec_is_infimum(rec)) {
 
1695
 
 
1696
                        continue;
 
1697
                }
 
1698
 
 
1699
                offsets = rec_get_offsets(rec, index, offsets,
 
1700
                                          ULINT_UNDEFINED, &heap);
 
1701
 
 
1702
                if (allow_duplicates) {
 
1703
 
 
1704
                        /* If the SQL-query will update or replace
 
1705
                        duplicate key we will take X-lock for
 
1706
                        duplicates ( REPLACE, LOAD DATAFILE REPLACE,
 
1707
                        INSERT ON DUPLICATE KEY UPDATE). */
 
1708
 
 
1709
                        err = row_ins_set_exclusive_rec_lock(
 
1710
                                LOCK_ORDINARY, block,
 
1711
                                rec, index, offsets, thr);
 
1712
                } else {
 
1713
 
 
1714
                        err = row_ins_set_shared_rec_lock(
 
1715
                                LOCK_ORDINARY, block,
 
1716
                                rec, index, offsets, thr);
 
1717
                }
 
1718
 
 
1719
                switch (err) {
 
1720
                case DB_SUCCESS_LOCKED_REC:
 
1721
                        err = DB_SUCCESS;
 
1722
                case DB_SUCCESS:
 
1723
                        break;
 
1724
                default:
 
1725
                        goto end_scan;
 
1726
                }
 
1727
 
 
1728
                if (page_rec_is_supremum(rec)) {
 
1729
 
 
1730
                        continue;
 
1731
                }
 
1732
 
 
1733
                cmp = cmp_dtuple_rec(entry, rec, offsets);
 
1734
 
 
1735
                if (cmp == 0) {
 
1736
                        if (row_ins_dupl_error_with_rec(rec, entry,
 
1737
                                                        index, offsets)) {
 
1738
                                err = DB_DUPLICATE_KEY;
 
1739
 
 
1740
                                thr_get_trx(thr)->error_info = index;
 
1741
 
 
1742
                                goto end_scan;
 
1743
                        }
 
1744
                } else {
 
1745
                        ut_a(cmp < 0);
 
1746
                        goto end_scan;
 
1747
                }
 
1748
        } while (btr_pcur_move_to_next(&pcur, &mtr));
 
1749
 
 
1750
end_scan:
 
1751
        if (UNIV_LIKELY_NULL(heap)) {
 
1752
                mem_heap_free(heap);
 
1753
        }
 
1754
        mtr_commit(&mtr);
 
1755
 
 
1756
        /* Restore old value */
 
1757
        dtuple_set_n_fields_cmp(entry, n_fields_cmp);
 
1758
 
 
1759
        return(err);
 
1760
}
 
1761
 
 
1762
/***************************************************************//**
 
1763
Checks if a unique key violation error would occur at an index entry
 
1764
insert. Sets shared locks on possible duplicate records. Works only
 
1765
for a clustered index!
 
1766
@return DB_SUCCESS if no error, DB_DUPLICATE_KEY if error,
 
1767
DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
 
1768
record */
 
1769
static
 
1770
ulint
 
1771
row_ins_duplicate_error_in_clust(
 
1772
/*=============================*/
 
1773
        btr_cur_t*      cursor, /*!< in: B-tree cursor */
 
1774
        const dtuple_t* entry,  /*!< in: entry to insert */
 
1775
        que_thr_t*      thr,    /*!< in: query thread */
 
1776
        mtr_t*          mtr)    /*!< in: mtr */
 
1777
{
 
1778
        ulint   err;
 
1779
        rec_t*  rec;
 
1780
        ulint   n_unique;
 
1781
        trx_t*  trx             = thr_get_trx(thr);
 
1782
        mem_heap_t*heap         = NULL;
 
1783
        ulint   offsets_[REC_OFFS_NORMAL_SIZE];
 
1784
        ulint*  offsets         = offsets_;
 
1785
        rec_offs_init(offsets_);
 
1786
 
 
1787
        UT_NOT_USED(mtr);
 
1788
 
 
1789
        ut_a(dict_index_is_clust(cursor->index));
 
1790
        ut_ad(dict_index_is_unique(cursor->index));
 
1791
 
 
1792
        /* NOTE: For unique non-clustered indexes there may be any number
 
1793
        of delete marked records with the same value for the non-clustered
 
1794
        index key (remember multiversioning), and which differ only in
 
1795
        the row refererence part of the index record, containing the
 
1796
        clustered index key fields. For such a secondary index record,
 
1797
        to avoid race condition, we must FIRST do the insertion and after
 
1798
        that check that the uniqueness condition is not breached! */
 
1799
 
 
1800
        /* NOTE: A problem is that in the B-tree node pointers on an
 
1801
        upper level may match more to the entry than the actual existing
 
1802
        user records on the leaf level. So, even if low_match would suggest
 
1803
        that a duplicate key violation may occur, this may not be the case. */
 
1804
 
 
1805
        n_unique = dict_index_get_n_unique(cursor->index);
 
1806
 
 
1807
        if (cursor->low_match >= n_unique) {
 
1808
 
 
1809
                rec = btr_cur_get_rec(cursor);
 
1810
 
 
1811
                if (!page_rec_is_infimum(rec)) {
 
1812
                        offsets = rec_get_offsets(rec, cursor->index, offsets,
 
1813
                                                  ULINT_UNDEFINED, &heap);
 
1814
 
 
1815
                        /* We set a lock on the possible duplicate: this
 
1816
                        is needed in logical logging of MySQL to make
 
1817
                        sure that in roll-forward we get the same duplicate
 
1818
                        errors as in original execution */
 
1819
 
 
1820
                        if (trx->duplicates & TRX_DUP_IGNORE) {
 
1821
 
 
1822
                                /* If the SQL-query will update or replace
 
1823
                                duplicate key we will take X-lock for
 
1824
                                duplicates ( REPLACE, LOAD DATAFILE REPLACE,
 
1825
                                INSERT ON DUPLICATE KEY UPDATE). */
 
1826
 
 
1827
                                err = row_ins_set_exclusive_rec_lock(
 
1828
                                        LOCK_REC_NOT_GAP,
 
1829
                                        btr_cur_get_block(cursor),
 
1830
                                        rec, cursor->index, offsets, thr);
 
1831
                        } else {
 
1832
 
 
1833
                                err = row_ins_set_shared_rec_lock(
 
1834
                                        LOCK_REC_NOT_GAP,
 
1835
                                        btr_cur_get_block(cursor), rec,
 
1836
                                        cursor->index, offsets, thr);
 
1837
                        }
 
1838
 
 
1839
                        switch (err) {
 
1840
                        case DB_SUCCESS_LOCKED_REC:
 
1841
                        case DB_SUCCESS:
 
1842
                                break;
 
1843
                        default:
 
1844
                                goto func_exit;
 
1845
                        }
 
1846
 
 
1847
                        if (row_ins_dupl_error_with_rec(
 
1848
                                    rec, entry, cursor->index, offsets)) {
 
1849
                                trx->error_info = cursor->index;
 
1850
                                err = DB_DUPLICATE_KEY;
 
1851
                                goto func_exit;
 
1852
                        }
 
1853
                }
 
1854
        }
 
1855
 
 
1856
        if (cursor->up_match >= n_unique) {
 
1857
 
 
1858
                rec = page_rec_get_next(btr_cur_get_rec(cursor));
 
1859
 
 
1860
                if (!page_rec_is_supremum(rec)) {
 
1861
                        offsets = rec_get_offsets(rec, cursor->index, offsets,
 
1862
                                                  ULINT_UNDEFINED, &heap);
 
1863
 
 
1864
                        if (trx->duplicates & TRX_DUP_IGNORE) {
 
1865
 
 
1866
                                /* If the SQL-query will update or replace
 
1867
                                duplicate key we will take X-lock for
 
1868
                                duplicates ( REPLACE, LOAD DATAFILE REPLACE,
 
1869
                                INSERT ON DUPLICATE KEY UPDATE). */
 
1870
 
 
1871
                                err = row_ins_set_exclusive_rec_lock(
 
1872
                                        LOCK_REC_NOT_GAP,
 
1873
                                        btr_cur_get_block(cursor),
 
1874
                                        rec, cursor->index, offsets, thr);
 
1875
                        } else {
 
1876
 
 
1877
                                err = row_ins_set_shared_rec_lock(
 
1878
                                        LOCK_REC_NOT_GAP,
 
1879
                                        btr_cur_get_block(cursor),
 
1880
                                        rec, cursor->index, offsets, thr);
 
1881
                        }
 
1882
 
 
1883
                        switch (err) {
 
1884
                        case DB_SUCCESS_LOCKED_REC:
 
1885
                        case DB_SUCCESS:
 
1886
                                break;
 
1887
                        default:
 
1888
                                goto func_exit;
 
1889
                        }
 
1890
 
 
1891
                        if (row_ins_dupl_error_with_rec(
 
1892
                                    rec, entry, cursor->index, offsets)) {
 
1893
                                trx->error_info = cursor->index;
 
1894
                                err = DB_DUPLICATE_KEY;
 
1895
                                goto func_exit;
 
1896
                        }
 
1897
                }
 
1898
 
 
1899
                ut_a(!dict_index_is_clust(cursor->index));
 
1900
                /* This should never happen */
 
1901
        }
 
1902
 
 
1903
        err = DB_SUCCESS;
 
1904
func_exit:
 
1905
        if (UNIV_LIKELY_NULL(heap)) {
 
1906
                mem_heap_free(heap);
 
1907
        }
 
1908
        return(err);
 
1909
}
 
1910
 
 
1911
/***************************************************************//**
 
1912
Checks if an index entry has long enough common prefix with an existing
 
1913
record so that the intended insert of the entry must be changed to a modify of
 
1914
the existing record. In the case of a clustered index, the prefix must be
 
1915
n_unique fields long, and in the case of a secondary index, all fields must be
 
1916
equal.
 
1917
@return 0 if no update, ROW_INS_PREV if previous should be updated;
 
1918
currently we do the search so that only the low_match record can match
 
1919
enough to the search tuple, not the next record */
 
1920
UNIV_INLINE
 
1921
ulint
 
1922
row_ins_must_modify(
 
1923
/*================*/
 
1924
        btr_cur_t*      cursor) /*!< in: B-tree cursor */
 
1925
{
 
1926
        ulint   enough_match;
 
1927
        rec_t*  rec;
 
1928
 
 
1929
        /* NOTE: (compare to the note in row_ins_duplicate_error) Because node
 
1930
        pointers on upper levels of the B-tree may match more to entry than
 
1931
        to actual user records on the leaf level, we have to check if the
 
1932
        candidate record is actually a user record. In a clustered index
 
1933
        node pointers contain index->n_unique first fields, and in the case
 
1934
        of a secondary index, all fields of the index. */
 
1935
 
 
1936
        enough_match = dict_index_get_n_unique_in_tree(cursor->index);
 
1937
 
 
1938
        if (cursor->low_match >= enough_match) {
 
1939
 
 
1940
                rec = btr_cur_get_rec(cursor);
 
1941
 
 
1942
                if (!page_rec_is_infimum(rec)) {
 
1943
 
 
1944
                        return(ROW_INS_PREV);
 
1945
                }
 
1946
        }
 
1947
 
 
1948
        return(0);
 
1949
}
 
1950
 
 
1951
/***************************************************************//**
 
1952
Tries to insert an index entry to an index. If the index is clustered
 
1953
and a record with the same unique key is found, the other record is
 
1954
necessarily marked deleted by a committed transaction, or a unique key
 
1955
violation error occurs. The delete marked record is then updated to an
 
1956
existing record, and we must write an undo log record on the delete
 
1957
marked record. If the index is secondary, and a record with exactly the
 
1958
same fields is found, the other record is necessarily marked deleted.
 
1959
It is then unmarked. Otherwise, the entry is just inserted to the index.
 
1960
@return DB_SUCCESS, DB_LOCK_WAIT, DB_FAIL if pessimistic retry needed,
 
1961
or error code */
 
1962
static
 
1963
ulint
 
1964
row_ins_index_entry_low(
 
1965
/*====================*/
 
1966
        ulint           mode,   /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
 
1967
                                depending on whether we wish optimistic or
 
1968
                                pessimistic descent down the index tree */
 
1969
        dict_index_t*   index,  /*!< in: index */
 
1970
        dtuple_t*       entry,  /*!< in/out: index entry to insert */
 
1971
        ulint           n_ext,  /*!< in: number of externally stored columns */
 
1972
        que_thr_t*      thr)    /*!< in: query thread */
 
1973
{
 
1974
        btr_cur_t       cursor;
 
1975
        ulint           search_mode;
 
1976
        ulint           modify = 0; /* remove warning */
 
1977
        rec_t*          insert_rec;
 
1978
        rec_t*          rec;
 
1979
        ulint           err;
 
1980
        ulint           n_unique;
 
1981
        big_rec_t*      big_rec                 = NULL;
 
1982
        mtr_t           mtr;
 
1983
        mem_heap_t*     heap                    = NULL;
 
1984
 
 
1985
        log_free_check();
 
1986
 
 
1987
        mtr_start(&mtr);
 
1988
 
 
1989
        cursor.thr = thr;
 
1990
 
 
1991
        /* Note that we use PAGE_CUR_LE as the search mode, because then
 
1992
        the function will return in both low_match and up_match of the
 
1993
        cursor sensible values */
 
1994
 
 
1995
        if (dict_index_is_clust(index)) {
 
1996
                search_mode = mode;
 
1997
        } else if (!(thr_get_trx(thr)->check_unique_secondary)) {
 
1998
                search_mode = mode | BTR_INSERT | BTR_IGNORE_SEC_UNIQUE;
 
1999
        } else {
 
2000
                search_mode = mode | BTR_INSERT;
 
2001
        }
 
2002
 
 
2003
        btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
 
2004
                                    search_mode,
 
2005
                                    &cursor, 0, __FILE__, __LINE__, &mtr);
 
2006
 
 
2007
        if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
 
2008
                /* The insertion was made to the insert buffer already during
 
2009
                the search: we are done */
 
2010
 
 
2011
                ut_ad(search_mode & BTR_INSERT);
 
2012
                err = DB_SUCCESS;
 
2013
 
 
2014
                goto function_exit;
 
2015
        }
 
2016
 
 
2017
#ifdef UNIV_DEBUG
 
2018
        {
 
2019
                page_t* page = btr_cur_get_page(&cursor);
 
2020
                rec_t*  first_rec = page_rec_get_next(
 
2021
                        page_get_infimum_rec(page));
 
2022
 
 
2023
                ut_ad(page_rec_is_supremum(first_rec)
 
2024
                      || rec_get_n_fields(first_rec, index)
 
2025
                      == dtuple_get_n_fields(entry));
 
2026
        }
 
2027
#endif
 
2028
 
 
2029
        n_unique = dict_index_get_n_unique(index);
 
2030
 
 
2031
        if (dict_index_is_unique(index) && (cursor.up_match >= n_unique
 
2032
                                            || cursor.low_match >= n_unique)) {
 
2033
 
 
2034
                if (dict_index_is_clust(index)) {
 
2035
                        /* Note that the following may return also
 
2036
                        DB_LOCK_WAIT */
 
2037
 
 
2038
                        err = row_ins_duplicate_error_in_clust(
 
2039
                                &cursor, entry, thr, &mtr);
 
2040
                        if (err != DB_SUCCESS) {
 
2041
 
 
2042
                                goto function_exit;
 
2043
                        }
 
2044
                } else {
 
2045
                        mtr_commit(&mtr);
 
2046
                        err = row_ins_scan_sec_index_for_duplicate(
 
2047
                                index, entry, thr);
 
2048
                        mtr_start(&mtr);
 
2049
 
 
2050
                        if (err != DB_SUCCESS) {
 
2051
                                goto function_exit;
 
2052
                        }
 
2053
 
 
2054
                        /* We did not find a duplicate and we have now
 
2055
                        locked with s-locks the necessary records to
 
2056
                        prevent any insertion of a duplicate by another
 
2057
                        transaction. Let us now reposition the cursor and
 
2058
                        continue the insertion. */
 
2059
 
 
2060
                        btr_cur_search_to_nth_level(index, 0, entry,
 
2061
                                                    PAGE_CUR_LE,
 
2062
                                                    mode | BTR_INSERT,
 
2063
                                                    &cursor, 0,
 
2064
                                                    __FILE__, __LINE__, &mtr);
 
2065
                }
 
2066
        }
 
2067
 
 
2068
        modify = row_ins_must_modify(&cursor);
 
2069
 
 
2070
        if (modify != 0) {
 
2071
                /* There is already an index entry with a long enough common
 
2072
                prefix, we must convert the insert into a modify of an
 
2073
                existing record */
 
2074
 
 
2075
                if (modify == ROW_INS_NEXT) {
 
2076
                        rec = page_rec_get_next(btr_cur_get_rec(&cursor));
 
2077
 
 
2078
                        btr_cur_position(index, rec,
 
2079
                                         btr_cur_get_block(&cursor),&cursor);
 
2080
                }
 
2081
 
 
2082
                if (dict_index_is_clust(index)) {
 
2083
                        err = row_ins_clust_index_entry_by_modify(
 
2084
                                mode, &cursor, &heap, &big_rec, entry,
 
2085
                                thr, &mtr);
 
2086
                } else {
 
2087
                        ut_ad(!n_ext);
 
2088
                        err = row_ins_sec_index_entry_by_modify(
 
2089
                                mode, &cursor, entry, thr, &mtr);
 
2090
                }
 
2091
        } else {
 
2092
                if (mode == BTR_MODIFY_LEAF) {
 
2093
                        err = btr_cur_optimistic_insert(
 
2094
                                0, &cursor, entry, &insert_rec, &big_rec,
 
2095
                                n_ext, thr, &mtr);
 
2096
                } else {
 
2097
                        ut_a(mode == BTR_MODIFY_TREE);
 
2098
                        if (buf_LRU_buf_pool_running_out()) {
 
2099
 
 
2100
                                err = DB_LOCK_TABLE_FULL;
 
2101
 
 
2102
                                goto function_exit;
 
2103
                        }
 
2104
                        err = btr_cur_pessimistic_insert(
 
2105
                                0, &cursor, entry, &insert_rec, &big_rec,
 
2106
                                n_ext, thr, &mtr);
 
2107
                }
 
2108
        }
 
2109
 
 
2110
function_exit:
 
2111
        mtr_commit(&mtr);
 
2112
 
 
2113
        if (UNIV_LIKELY_NULL(big_rec)) {
 
2114
                rec_t*  rec;
 
2115
                ulint*  offsets;
 
2116
                mtr_start(&mtr);
 
2117
 
 
2118
                btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
 
2119
                                            BTR_MODIFY_TREE, &cursor, 0,
 
2120
                                            __FILE__, __LINE__, &mtr);
 
2121
                rec = btr_cur_get_rec(&cursor);
 
2122
                offsets = rec_get_offsets(rec, index, NULL,
 
2123
                                          ULINT_UNDEFINED, &heap);
 
2124
 
 
2125
                err = btr_store_big_rec_extern_fields(
 
2126
                        index, btr_cur_get_block(&cursor),
 
2127
                        rec, offsets, &mtr, FALSE, big_rec);
 
2128
 
 
2129
                if (modify) {
 
2130
                        dtuple_big_rec_free(big_rec);
 
2131
                } else {
 
2132
                        dtuple_convert_back_big_rec(index, entry, big_rec);
 
2133
                }
 
2134
 
 
2135
                mtr_commit(&mtr);
 
2136
        }
 
2137
 
 
2138
        if (UNIV_LIKELY_NULL(heap)) {
 
2139
                mem_heap_free(heap);
 
2140
        }
 
2141
        return(err);
 
2142
}
 
2143
 
 
2144
/***************************************************************//**
 
2145
Inserts an index entry to index. Tries first optimistic, then pessimistic
 
2146
descent down the tree. If the entry matches enough to a delete marked record,
 
2147
performs the insert by updating or delete unmarking the delete marked
 
2148
record.
 
2149
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
 
2150
UNIV_INTERN
 
2151
ulint
 
2152
row_ins_index_entry(
 
2153
/*================*/
 
2154
        dict_index_t*   index,  /*!< in: index */
 
2155
        dtuple_t*       entry,  /*!< in/out: index entry to insert */
 
2156
        ulint           n_ext,  /*!< in: number of externally stored columns */
 
2157
        ibool           foreign,/*!< in: TRUE=check foreign key constraints
 
2158
                                (foreign=FALSE only during CREATE INDEX) */
 
2159
        que_thr_t*      thr)    /*!< in: query thread */
 
2160
{
 
2161
        ulint   err;
 
2162
 
 
2163
        if (foreign && UT_LIST_GET_FIRST(index->table->foreign_list)) {
 
2164
                err = row_ins_check_foreign_constraints(index->table, index,
 
2165
                                                        entry, thr);
 
2166
                if (err != DB_SUCCESS) {
 
2167
 
 
2168
                        return(err);
 
2169
                }
 
2170
        }
 
2171
 
 
2172
        /* Try first optimistic descent to the B-tree */
 
2173
 
 
2174
        err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
 
2175
                                      n_ext, thr);
 
2176
        if (err != DB_FAIL) {
 
2177
 
 
2178
                return(err);
 
2179
        }
 
2180
 
 
2181
        /* Try then pessimistic descent to the B-tree */
 
2182
 
 
2183
        err = row_ins_index_entry_low(BTR_MODIFY_TREE, index, entry,
 
2184
                                      n_ext, thr);
 
2185
        return(err);
 
2186
}
 
2187
 
 
2188
/***********************************************************//**
 
2189
Sets the values of the dtuple fields in entry from the values of appropriate
 
2190
columns in row. */
 
2191
static
 
2192
void
 
2193
row_ins_index_entry_set_vals(
 
2194
/*=========================*/
 
2195
        dict_index_t*   index,  /*!< in: index */
 
2196
        dtuple_t*       entry,  /*!< in: index entry to make */
 
2197
        const dtuple_t* row)    /*!< in: row */
 
2198
{
 
2199
        ulint   n_fields;
 
2200
        ulint   i;
 
2201
 
 
2202
        ut_ad(entry && row);
 
2203
 
 
2204
        n_fields = dtuple_get_n_fields(entry);
 
2205
 
 
2206
        for (i = 0; i < n_fields; i++) {
 
2207
                dict_field_t*   ind_field;
 
2208
                dfield_t*       field;
 
2209
                const dfield_t* row_field;
 
2210
                ulint           len;
 
2211
 
 
2212
                field = dtuple_get_nth_field(entry, i);
 
2213
                ind_field = dict_index_get_nth_field(index, i);
 
2214
                row_field = dtuple_get_nth_field(row, ind_field->col->ind);
 
2215
                len = dfield_get_len(row_field);
 
2216
 
 
2217
                /* Check column prefix indexes */
 
2218
                if (ind_field->prefix_len > 0
 
2219
                    && dfield_get_len(row_field) != UNIV_SQL_NULL) {
 
2220
 
 
2221
                        const   dict_col_t*     col
 
2222
                                = dict_field_get_col(ind_field);
 
2223
 
 
2224
                        len = dtype_get_at_most_n_mbchars(
 
2225
                                col->prtype, col->mbminmaxlen,
 
2226
                                ind_field->prefix_len,
 
2227
                                len, dfield_get_data(row_field));
 
2228
 
 
2229
                        ut_ad(!dfield_is_ext(row_field));
 
2230
                }
 
2231
 
 
2232
                dfield_set_data(field, dfield_get_data(row_field), len);
 
2233
                if (dfield_is_ext(row_field)) {
 
2234
                        ut_ad(dict_index_is_clust(index));
 
2235
                        dfield_set_ext(field);
 
2236
                }
 
2237
        }
 
2238
}
 
2239
 
 
2240
/***********************************************************//**
 
2241
Inserts a single index entry to the table.
 
2242
@return DB_SUCCESS if operation successfully completed, else error
 
2243
code or DB_LOCK_WAIT */
 
2244
static
 
2245
ulint
 
2246
row_ins_index_entry_step(
 
2247
/*=====================*/
 
2248
        ins_node_t*     node,   /*!< in: row insert node */
 
2249
        que_thr_t*      thr)    /*!< in: query thread */
 
2250
{
 
2251
        ulint   err;
 
2252
 
 
2253
        ut_ad(dtuple_check_typed(node->row));
 
2254
 
 
2255
        row_ins_index_entry_set_vals(node->index, node->entry, node->row);
 
2256
 
 
2257
        ut_ad(dtuple_check_typed(node->entry));
 
2258
 
 
2259
        err = row_ins_index_entry(node->index, node->entry, 0, TRUE, thr);
 
2260
 
 
2261
        return(err);
 
2262
}
 
2263
 
 
2264
/***********************************************************//**
 
2265
Allocates a row id for row and inits the node->index field. */
 
2266
UNIV_INLINE
 
2267
void
 
2268
row_ins_alloc_row_id_step(
 
2269
/*======================*/
 
2270
        ins_node_t*     node)   /*!< in: row insert node */
 
2271
{
 
2272
        row_id_t        row_id;
 
2273
 
 
2274
        ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
 
2275
 
 
2276
        if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
 
2277
 
 
2278
                /* No row id is stored if the clustered index is unique */
 
2279
 
 
2280
                return;
 
2281
        }
 
2282
 
 
2283
        /* Fill in row id value to row */
 
2284
 
 
2285
        row_id = dict_sys_get_new_row_id();
 
2286
 
 
2287
        dict_sys_write_row_id(node->row_id_buf, row_id);
 
2288
}
 
2289
 
 
2290
/***********************************************************//**
 
2291
Gets a row to insert from the values list. */
 
2292
UNIV_INLINE
 
2293
void
 
2294
row_ins_get_row_from_values(
 
2295
/*========================*/
 
2296
        ins_node_t*     node)   /*!< in: row insert node */
 
2297
{
 
2298
        que_node_t*     list_node;
 
2299
        dfield_t*       dfield;
 
2300
        dtuple_t*       row;
 
2301
        ulint           i;
 
2302
 
 
2303
        /* The field values are copied in the buffers of the select node and
 
2304
        it is safe to use them until we fetch from select again: therefore
 
2305
        we can just copy the pointers */
 
2306
 
 
2307
        row = node->row;
 
2308
 
 
2309
        i = 0;
 
2310
        list_node = node->values_list;
 
2311
 
 
2312
        while (list_node) {
 
2313
                eval_exp(list_node);
 
2314
 
 
2315
                dfield = dtuple_get_nth_field(row, i);
 
2316
                dfield_copy_data(dfield, que_node_get_val(list_node));
 
2317
 
 
2318
                i++;
 
2319
                list_node = que_node_get_next(list_node);
 
2320
        }
 
2321
}
 
2322
 
 
2323
/***********************************************************//**
 
2324
Gets a row to insert from the select list. */
 
2325
UNIV_INLINE
 
2326
void
 
2327
row_ins_get_row_from_select(
 
2328
/*========================*/
 
2329
        ins_node_t*     node)   /*!< in: row insert node */
 
2330
{
 
2331
        que_node_t*     list_node;
 
2332
        dfield_t*       dfield;
 
2333
        dtuple_t*       row;
 
2334
        ulint           i;
 
2335
 
 
2336
        /* The field values are copied in the buffers of the select node and
 
2337
        it is safe to use them until we fetch from select again: therefore
 
2338
        we can just copy the pointers */
 
2339
 
 
2340
        row = node->row;
 
2341
 
 
2342
        i = 0;
 
2343
        list_node = node->select->select_list;
 
2344
 
 
2345
        while (list_node) {
 
2346
                dfield = dtuple_get_nth_field(row, i);
 
2347
                dfield_copy_data(dfield, que_node_get_val(list_node));
 
2348
 
 
2349
                i++;
 
2350
                list_node = que_node_get_next(list_node);
 
2351
        }
 
2352
}
 
2353
 
 
2354
/***********************************************************//**
 
2355
Inserts a row to a table.
 
2356
@return DB_SUCCESS if operation successfully completed, else error
 
2357
code or DB_LOCK_WAIT */
 
2358
static
 
2359
ulint
 
2360
row_ins(
 
2361
/*====*/
 
2362
        ins_node_t*     node,   /*!< in: row insert node */
 
2363
        que_thr_t*      thr)    /*!< in: query thread */
 
2364
{
 
2365
        ulint   err;
 
2366
 
 
2367
        ut_ad(node && thr);
 
2368
 
 
2369
        if (node->state == INS_NODE_ALLOC_ROW_ID) {
 
2370
 
 
2371
                row_ins_alloc_row_id_step(node);
 
2372
 
 
2373
                node->index = dict_table_get_first_index(node->table);
 
2374
                node->entry = UT_LIST_GET_FIRST(node->entry_list);
 
2375
 
 
2376
                if (node->ins_type == INS_SEARCHED) {
 
2377
 
 
2378
                        row_ins_get_row_from_select(node);
 
2379
 
 
2380
                } else if (node->ins_type == INS_VALUES) {
 
2381
 
 
2382
                        row_ins_get_row_from_values(node);
 
2383
                }
 
2384
 
 
2385
                node->state = INS_NODE_INSERT_ENTRIES;
 
2386
        }
 
2387
 
 
2388
        ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
 
2389
 
 
2390
        while (node->index != NULL) {
 
2391
                err = row_ins_index_entry_step(node, thr);
 
2392
 
 
2393
                if (err != DB_SUCCESS) {
 
2394
 
 
2395
                        return(err);
 
2396
                }
 
2397
 
 
2398
                node->index = dict_table_get_next_index(node->index);
 
2399
                node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
 
2400
 
 
2401
                /* Skip corrupted secondar index and its entry */
 
2402
                while (node->index && dict_index_is_corrupted(node->index)) {
 
2403
 
 
2404
                        node->index = dict_table_get_next_index(node->index);
 
2405
                        node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
 
2406
                }
 
2407
        }
 
2408
 
 
2409
        ut_ad(node->entry == NULL);
 
2410
 
 
2411
        node->state = INS_NODE_ALLOC_ROW_ID;
 
2412
 
 
2413
        return(DB_SUCCESS);
 
2414
}
 
2415
 
 
2416
/***********************************************************//**
 
2417
Inserts a row to a table. This is a high-level function used in SQL execution
 
2418
graphs.
 
2419
@return query thread to run next or NULL */
 
2420
UNIV_INTERN
 
2421
que_thr_t*
 
2422
row_ins_step(
 
2423
/*=========*/
 
2424
        que_thr_t*      thr)    /*!< in: query thread */
 
2425
{
 
2426
        ins_node_t*     node;
 
2427
        que_node_t*     parent;
 
2428
        sel_node_t*     sel_node;
 
2429
        trx_t*          trx;
 
2430
        ulint           err;
 
2431
 
 
2432
        ut_ad(thr);
 
2433
 
 
2434
        trx = thr_get_trx(thr);
 
2435
 
 
2436
        trx_start_if_not_started(trx);
 
2437
 
 
2438
        node = thr->run_node;
 
2439
 
 
2440
        ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
 
2441
 
 
2442
        parent = que_node_get_parent(node);
 
2443
        sel_node = node->select;
 
2444
 
 
2445
        if (thr->prev_node == parent) {
 
2446
                node->state = INS_NODE_SET_IX_LOCK;
 
2447
        }
 
2448
 
 
2449
        /* If this is the first time this node is executed (or when
 
2450
        execution resumes after wait for the table IX lock), set an
 
2451
        IX lock on the table and reset the possible select node. MySQL's
 
2452
        partitioned table code may also call an insert within the same
 
2453
        SQL statement AFTER it has used this table handle to do a search.
 
2454
        This happens, for example, when a row update moves it to another
 
2455
        partition. In that case, we have already set the IX lock on the
 
2456
        table during the search operation, and there is no need to set
 
2457
        it again here. But we must write trx->id to node->trx_id_buf. */
 
2458
 
 
2459
        trx_write_trx_id(node->trx_id_buf, trx->id);
 
2460
 
 
2461
        if (node->state == INS_NODE_SET_IX_LOCK) {
 
2462
 
 
2463
                /* It may be that the current session has not yet started
 
2464
                its transaction, or it has been committed: */
 
2465
 
 
2466
                if (trx->id == node->trx_id) {
 
2467
                        /* No need to do IX-locking */
 
2468
 
 
2469
                        goto same_trx;
 
2470
                }
 
2471
 
 
2472
                err = lock_table(0, node->table, LOCK_IX, thr);
 
2473
 
 
2474
                if (err != DB_SUCCESS) {
 
2475
 
 
2476
                        goto error_handling;
 
2477
                }
 
2478
 
 
2479
                node->trx_id = trx->id;
 
2480
same_trx:
 
2481
                node->state = INS_NODE_ALLOC_ROW_ID;
 
2482
 
 
2483
                if (node->ins_type == INS_SEARCHED) {
 
2484
                        /* Reset the cursor */
 
2485
                        sel_node->state = SEL_NODE_OPEN;
 
2486
 
 
2487
                        /* Fetch a row to insert */
 
2488
 
 
2489
                        thr->run_node = sel_node;
 
2490
 
 
2491
                        return(thr);
 
2492
                }
 
2493
        }
 
2494
 
 
2495
        if ((node->ins_type == INS_SEARCHED)
 
2496
            && (sel_node->state != SEL_NODE_FETCH)) {
 
2497
 
 
2498
                ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
 
2499
 
 
2500
                /* No more rows to insert */
 
2501
                thr->run_node = parent;
 
2502
 
 
2503
                return(thr);
 
2504
        }
 
2505
 
 
2506
        /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
 
2507
 
 
2508
        err = row_ins(node, thr);
 
2509
 
 
2510
error_handling:
 
2511
        trx->error_state = err;
 
2512
 
 
2513
        if (err != DB_SUCCESS) {
 
2514
                /* err == DB_LOCK_WAIT or SQL error detected */
 
2515
                return(NULL);
 
2516
        }
 
2517
 
 
2518
        /* DO THE TRIGGER ACTIONS HERE */
 
2519
 
 
2520
        if (node->ins_type == INS_SEARCHED) {
 
2521
                /* Fetch a row to insert */
 
2522
 
 
2523
                thr->run_node = sel_node;
 
2524
        } else {
 
2525
                thr->run_node = que_node_get_parent(node);
 
2526
        }
 
2527
 
 
2528
        return(thr);
 
2529
}