~jlukas79/+junk/mysql-server

« back to all changes in this revision

Viewing changes to storage/maria/ma_create.c

manual merge 6.0-main --> 6.0-bka-review

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/* Create a MARIA table */
 
17
 
 
18
#include "ma_ftdefs.h"
 
19
#include "ma_sp_defs.h"
 
20
#include <my_bit.h>
 
21
#include "ma_blockrec.h"
 
22
#include "trnman_public.h"
 
23
 
 
24
#if defined(MSDOS) || defined(__WIN__)
 
25
#ifdef __WIN__
 
26
#include <fcntl.h>
 
27
#else
 
28
#include <process.h>                    /* Prototype for getpid */
 
29
#endif
 
30
#endif
 
31
#include <m_ctype.h>
 
32
 
 
33
static int compare_columns(MARIA_COLUMNDEF **a, MARIA_COLUMNDEF **b);
 
34
 
 
35
/*
 
36
  Old options is used when recreating database, from maria_chk
 
37
*/
 
38
 
 
39
int maria_create(const char *name, enum data_file_type datafile_type,
 
40
                 uint keys,MARIA_KEYDEF *keydefs,
 
41
                 uint columns, MARIA_COLUMNDEF *columndef,
 
42
                 uint uniques, MARIA_UNIQUEDEF *uniquedefs,
 
43
                 MARIA_CREATE_INFO *ci,uint flags)
 
44
{
 
45
  register uint i,j;
 
46
  File dfile,file;
 
47
  int errpos,save_errno, create_mode= O_RDWR | O_TRUNC, res;
 
48
  myf create_flag;
 
49
  uint length,max_key_length,packed,pack_bytes,pointer,real_length_diff,
 
50
       key_length,info_length,key_segs,options,min_key_length,
 
51
       base_pos,long_varchar_count,varchar_length,
 
52
       unique_key_parts,fulltext_keys,offset, not_block_record_extra_length;
 
53
  uint max_field_lengths, extra_header_size, column_nr;
 
54
  ulong reclength, real_reclength,min_pack_length;
 
55
  char filename[FN_REFLEN], linkname[FN_REFLEN], *linkname_ptr;
 
56
  ulong pack_reclength;
 
57
  ulonglong tot_length,max_rows, tmp;
 
58
  enum en_fieldtype type;
 
59
  enum data_file_type org_datafile_type= datafile_type;
 
60
  MARIA_SHARE share;
 
61
  MARIA_KEYDEF *keydef,tmp_keydef;
 
62
  MARIA_UNIQUEDEF *uniquedef;
 
63
  HA_KEYSEG *keyseg,tmp_keyseg;
 
64
  MARIA_COLUMNDEF *column, *end_column;
 
65
  double *rec_per_key_part;
 
66
  ulong  *nulls_per_key_part;
 
67
  uint16 *column_array;
 
68
  my_off_t key_root[HA_MAX_POSSIBLE_KEY], kfile_size_before_extension;
 
69
  MARIA_CREATE_INFO tmp_create_info;
 
70
  my_bool tmp_table= FALSE; /* cache for presence of HA_OPTION_TMP_TABLE */
 
71
  my_bool forced_packed;
 
72
  myf     sync_dir=  0;
 
73
  uchar   *log_data= NULL;
 
74
  DBUG_ENTER("maria_create");
 
75
  DBUG_PRINT("enter", ("keys: %u  columns: %u  uniques: %u  flags: %u",
 
76
                      keys, columns, uniques, flags));
 
77
 
 
78
  DBUG_ASSERT(maria_inited);
 
79
  LINT_INIT(dfile);
 
80
  LINT_INIT(file);
 
81
 
 
82
  if (!ci)
 
83
  {
 
84
    bzero((char*) &tmp_create_info,sizeof(tmp_create_info));
 
85
    ci=&tmp_create_info;
 
86
  }
 
87
 
 
88
  if (keys + uniques > MARIA_MAX_KEY)
 
89
  {
 
90
    DBUG_RETURN(my_errno=HA_WRONG_CREATE_OPTION);
 
91
  }
 
92
  errpos=0;
 
93
  options=0;
 
94
  bzero((uchar*) &share,sizeof(share));
 
95
 
 
96
  if (flags & HA_DONT_TOUCH_DATA)
 
97
  {
 
98
    /* We come here from recreate table */
 
99
    org_datafile_type= ci->org_data_file_type;
 
100
    if (!(ci->old_options & HA_OPTION_TEMP_COMPRESS_RECORD))
 
101
      options= (ci->old_options &
 
102
                (HA_OPTION_COMPRESS_RECORD | HA_OPTION_PACK_RECORD |
 
103
                 HA_OPTION_READ_ONLY_DATA | HA_OPTION_CHECKSUM |
 
104
                 HA_OPTION_TMP_TABLE | HA_OPTION_DELAY_KEY_WRITE |
 
105
                 HA_OPTION_LONG_BLOB_PTR | HA_OPTION_PAGE_CHECKSUM));
 
106
    else
 
107
    {
 
108
      /* Uncompressing rows */
 
109
      options= (ci->old_options &
 
110
                (HA_OPTION_CHECKSUM | HA_OPTION_TMP_TABLE |
 
111
                 HA_OPTION_DELAY_KEY_WRITE | HA_OPTION_LONG_BLOB_PTR |
 
112
                 HA_OPTION_PAGE_CHECKSUM));
 
113
    }
 
114
  }
 
115
  else
 
116
  {
 
117
    /* Transactional tables must be of type BLOCK_RECORD */
 
118
    if (ci->transactional)
 
119
      datafile_type= BLOCK_RECORD;
 
120
  }
 
121
 
 
122
  if (ci->reloc_rows > ci->max_rows)
 
123
    ci->reloc_rows=ci->max_rows;                /* Check if wrong parameter */
 
124
 
 
125
  if (!(rec_per_key_part=
 
126
        (double*) my_malloc((keys + uniques)*HA_MAX_KEY_SEG*sizeof(double) +
 
127
                            (keys + uniques)*HA_MAX_KEY_SEG*sizeof(ulong) +
 
128
                            sizeof(uint16) * columns,
 
129
                            MYF(MY_WME | MY_ZEROFILL))))
 
130
    DBUG_RETURN(my_errno);
 
131
  nulls_per_key_part= (ulong*) (rec_per_key_part +
 
132
                                (keys + uniques) * HA_MAX_KEY_SEG);
 
133
  column_array= (uint16*) (nulls_per_key_part +
 
134
                           (keys + uniques) * HA_MAX_KEY_SEG);
 
135
 
 
136
 
 
137
  /* Start by checking fields and field-types used */
 
138
  varchar_length=long_varchar_count=packed= not_block_record_extra_length=
 
139
    pack_reclength= max_field_lengths= 0;
 
140
  reclength= min_pack_length= ci->null_bytes;
 
141
  forced_packed= 0;
 
142
  column_nr= 0;
 
143
 
 
144
  for (column= columndef, end_column= column + columns ;
 
145
       column != end_column ;
 
146
       column++)
 
147
  {
 
148
    /* Fill in not used struct parts */
 
149
    column->column_nr= column_nr++;
 
150
    column->offset= reclength;
 
151
    column->empty_pos= 0;
 
152
    column->empty_bit= 0;
 
153
    column->fill_length= column->length;
 
154
    if (column->null_bit)
 
155
      options|= HA_OPTION_NULL_FIELDS;
 
156
 
 
157
    reclength+= column->length;
 
158
    type= column->type;
 
159
    if (datafile_type == BLOCK_RECORD)
 
160
    {
 
161
      if (type == FIELD_SKIP_PRESPACE)
 
162
        type= column->type= FIELD_NORMAL; /* SKIP_PRESPACE not supported */
 
163
      if (type == FIELD_NORMAL &&
 
164
          column->length > FULL_PAGE_SIZE(maria_block_size))
 
165
      {
 
166
        /* FIELD_NORMAL can't be split over many blocks, convert to a CHAR */
 
167
        type= column->type= FIELD_SKIP_ENDSPACE;
 
168
      }
 
169
    }
 
170
 
 
171
    if (type != FIELD_NORMAL && type != FIELD_CHECK)
 
172
    {
 
173
      column->empty_pos= packed/8;
 
174
      column->empty_bit= (1 << (packed & 7));
 
175
      if (type == FIELD_BLOB)
 
176
      {
 
177
        forced_packed= 1;
 
178
        packed++;
 
179
        share.base.blobs++;
 
180
        if (pack_reclength != INT_MAX32)
 
181
        {
 
182
          if (column->length == 4+portable_sizeof_char_ptr)
 
183
            pack_reclength= INT_MAX32;
 
184
          else
 
185
          {
 
186
            /* Add max possible blob length */
 
187
            pack_reclength+= (1 << ((column->length-
 
188
                                     portable_sizeof_char_ptr)*8));
 
189
          }
 
190
        }
 
191
        max_field_lengths+= (column->length - portable_sizeof_char_ptr);
 
192
      }
 
193
      else if (type == FIELD_SKIP_PRESPACE ||
 
194
               type == FIELD_SKIP_ENDSPACE)
 
195
      {
 
196
        forced_packed= 1;
 
197
        max_field_lengths+= column->length > 255 ? 2 : 1;
 
198
        not_block_record_extra_length++;
 
199
        packed++;
 
200
      }
 
201
      else if (type == FIELD_VARCHAR)
 
202
      {
 
203
        varchar_length+= column->length-1; /* Used for min_pack_length */
 
204
        pack_reclength++;
 
205
        not_block_record_extra_length++;
 
206
        max_field_lengths++;
 
207
        packed++;
 
208
        column->fill_length= 1;
 
209
        options|= HA_OPTION_NULL_FIELDS;        /* Use ma_checksum() */
 
210
 
 
211
        /* We must test for 257 as length includes pack-length */
 
212
        if (test(column->length >= 257))
 
213
        {
 
214
          long_varchar_count++;
 
215
          max_field_lengths++;
 
216
          column->fill_length= 2;
 
217
        }
 
218
      }
 
219
      else if (type == FIELD_SKIP_ZERO)
 
220
        packed++;
 
221
      else
 
222
      {
 
223
        if (!column->null_bit)
 
224
          min_pack_length+= column->length;
 
225
        else
 
226
        {
 
227
          /* Only BLOCK_RECORD skips NULL fields for all field values */
 
228
          not_block_record_extra_length+= column->length;
 
229
        }
 
230
        column->empty_pos= 0;
 
231
        column->empty_bit= 0;
 
232
      }
 
233
    }
 
234
    else                                        /* FIELD_NORMAL */
 
235
    {
 
236
      if (!column->null_bit)
 
237
      {
 
238
        min_pack_length+= column->length;
 
239
        share.base.fixed_not_null_fields++;
 
240
        share.base.fixed_not_null_fields_length+= column->length;
 
241
      }
 
242
      else
 
243
        not_block_record_extra_length+= column->length;
 
244
    }
 
245
  }
 
246
 
 
247
  if (datafile_type == STATIC_RECORD && forced_packed)
 
248
  {
 
249
    /* Can't use fixed length records, revert to block records */
 
250
    datafile_type= BLOCK_RECORD;
 
251
  }
 
252
 
 
253
  if (datafile_type == DYNAMIC_RECORD)
 
254
    options|= HA_OPTION_PACK_RECORD;    /* Must use packed records */
 
255
 
 
256
  if (datafile_type == STATIC_RECORD)
 
257
  {
 
258
    /* We can't use checksum with static length rows */
 
259
    flags&= ~HA_CREATE_CHECKSUM;
 
260
    options&= ~HA_OPTION_CHECKSUM;
 
261
    min_pack_length= reclength;
 
262
    packed= 0;
 
263
  }
 
264
  else if (datafile_type != BLOCK_RECORD)
 
265
    min_pack_length+= not_block_record_extra_length;
 
266
  else
 
267
    min_pack_length+= 5;                        /* Min row overhead */
 
268
 
 
269
  if (flags & HA_CREATE_TMP_TABLE)
 
270
  {
 
271
    options|= HA_OPTION_TMP_TABLE;
 
272
    tmp_table= TRUE;
 
273
    create_mode|= O_NOFOLLOW;
 
274
    /* "CREATE TEMPORARY" tables are not crash-safe (dropped at restart) */
 
275
    ci->transactional= FALSE;
 
276
    flags&= ~HA_CREATE_PAGE_CHECKSUM;
 
277
  }
 
278
  share.base.null_bytes= ci->null_bytes;
 
279
  share.base.original_null_bytes= ci->null_bytes;
 
280
  share.base.born_transactional= ci->transactional;
 
281
  share.base.max_field_lengths= max_field_lengths;
 
282
  share.base.field_offsets= 0;                  /* for future */
 
283
 
 
284
  if (flags & HA_CREATE_CHECKSUM || (options & HA_OPTION_CHECKSUM))
 
285
  {
 
286
    options|= HA_OPTION_CHECKSUM;
 
287
    min_pack_length++;
 
288
    pack_reclength++;
 
289
  }
 
290
  if (pack_reclength < INT_MAX32)
 
291
    pack_reclength+= max_field_lengths + long_varchar_count;
 
292
  else
 
293
    pack_reclength= INT_MAX32;
 
294
 
 
295
  if (flags & HA_CREATE_DELAY_KEY_WRITE)
 
296
    options|= HA_OPTION_DELAY_KEY_WRITE;
 
297
  if (flags & HA_CREATE_RELIES_ON_SQL_LAYER)
 
298
    options|= HA_OPTION_RELIES_ON_SQL_LAYER;
 
299
  if (flags & HA_CREATE_PAGE_CHECKSUM)
 
300
    options|= HA_OPTION_PAGE_CHECKSUM;
 
301
 
 
302
  pack_bytes= (packed + 7) / 8;
 
303
  if (pack_reclength != INT_MAX32)
 
304
    pack_reclength+= reclength+pack_bytes +
 
305
      test(test_all_bits(options, HA_OPTION_CHECKSUM | HA_OPTION_PACK_RECORD));
 
306
  min_pack_length+= pack_bytes;
 
307
  /* Calculate min possible row length for rows-in-block */
 
308
  extra_header_size= MAX_FIXED_HEADER_SIZE;
 
309
  if (ci->transactional)
 
310
  {
 
311
    extra_header_size= TRANS_MAX_FIXED_HEADER_SIZE;
 
312
    DBUG_PRINT("info",("creating a transactional table"));
 
313
  }
 
314
  share.base.min_block_length= (extra_header_size + share.base.null_bytes +
 
315
                                pack_bytes);
 
316
  if (!ci->data_file_length && ci->max_rows)
 
317
  {
 
318
    if (pack_reclength == INT_MAX32 ||
 
319
             (~(ulonglong) 0)/ci->max_rows < (ulonglong) pack_reclength)
 
320
      ci->data_file_length= ~(ulonglong) 0;
 
321
    else
 
322
      ci->data_file_length=(ulonglong) ci->max_rows*pack_reclength;
 
323
  }
 
324
  else if (!ci->max_rows)
 
325
  {
 
326
    if (datafile_type == BLOCK_RECORD)
 
327
    {
 
328
      uint rows_per_page= ((maria_block_size - PAGE_OVERHEAD_SIZE) /
 
329
                           (min_pack_length + extra_header_size +
 
330
                            DIR_ENTRY_SIZE));
 
331
      ulonglong data_file_length= ci->data_file_length;
 
332
      if (!data_file_length)
 
333
        data_file_length= ((((ulonglong) 1 << ((BLOCK_RECORD_POINTER_SIZE-1) *
 
334
                                               8)) -1) * maria_block_size);
 
335
      if (rows_per_page > 0)
 
336
      {
 
337
        set_if_smaller(rows_per_page, MAX_ROWS_PER_PAGE);
 
338
        ci->max_rows= data_file_length / maria_block_size * rows_per_page;
 
339
      }
 
340
      else
 
341
        ci->max_rows= data_file_length / (min_pack_length +
 
342
                                          extra_header_size +
 
343
                                          DIR_ENTRY_SIZE);
 
344
    }
 
345
    else
 
346
      ci->max_rows=(ha_rows) (ci->data_file_length/(min_pack_length +
 
347
                                                    ((options &
 
348
                                                      HA_OPTION_PACK_RECORD) ?
 
349
                                                     3 : 0)));
 
350
  }
 
351
  max_rows= (ulonglong) ci->max_rows;
 
352
  if (datafile_type == BLOCK_RECORD)
 
353
  {
 
354
    /*
 
355
      The + 1 is for record position withing page
 
356
      The / 2 is because we need one bit for knowing if there is transid's
 
357
      after the row pointer
 
358
    */
 
359
    pointer= maria_get_pointer_length((ci->data_file_length /
 
360
                                       (maria_block_size * 2)), 3) + 1;
 
361
    set_if_smaller(pointer, BLOCK_RECORD_POINTER_SIZE);
 
362
 
 
363
    if (!max_rows)
 
364
      max_rows= (((((ulonglong) 1 << ((pointer-1)*8)) -1) * maria_block_size) /
 
365
                 min_pack_length / 2);
 
366
                                      }
 
367
  else
 
368
  {
 
369
    if (datafile_type != STATIC_RECORD)
 
370
      pointer= maria_get_pointer_length(ci->data_file_length,
 
371
                                        maria_data_pointer_size);
 
372
    else
 
373
      pointer= maria_get_pointer_length(ci->max_rows, maria_data_pointer_size);
 
374
    if (!max_rows)
 
375
      max_rows= ((((ulonglong) 1 << (pointer*8)) -1) / min_pack_length);
 
376
  }
 
377
 
 
378
  real_reclength=reclength;
 
379
  if (datafile_type == STATIC_RECORD)
 
380
  {
 
381
    if (reclength <= pointer)
 
382
      reclength=pointer+1;              /* reserve place for delete link */
 
383
  }
 
384
  else
 
385
    reclength+= long_varchar_count;     /* We need space for varchar! */
 
386
 
 
387
  max_key_length=0; tot_length=0 ; key_segs=0;
 
388
  fulltext_keys=0;
 
389
  share.state.rec_per_key_part=   rec_per_key_part;
 
390
  share.state.nulls_per_key_part= nulls_per_key_part;
 
391
  share.state.key_root=key_root;
 
392
  share.state.key_del= HA_OFFSET_ERROR;
 
393
  if (uniques)
 
394
    max_key_length= MARIA_UNIQUE_HASH_LENGTH + pointer;
 
395
 
 
396
  for (i=0, keydef=keydefs ; i < keys ; i++ , keydef++)
 
397
  {
 
398
    share.state.key_root[i]= HA_OFFSET_ERROR;
 
399
    length= real_length_diff= 0;
 
400
    min_key_length= key_length= pointer;
 
401
 
 
402
    if (keydef->key_alg == HA_KEY_ALG_RTREE)
 
403
      keydef->flag|= HA_RTREE_INDEX;            /* For easier tests */
 
404
 
 
405
    if (keydef->flag & HA_SPATIAL)
 
406
    {
 
407
#ifdef HAVE_SPATIAL
 
408
      /* BAR TODO to support 3D and more dimensions in the future */
 
409
      uint sp_segs=SPDIMS*2;
 
410
      keydef->flag=HA_SPATIAL;
 
411
 
 
412
      if (flags & HA_DONT_TOUCH_DATA)
 
413
      {
 
414
        /*
 
415
          Called by maria_chk - i.e. table structure was taken from
 
416
          MYI file and SPATIAL key *does have* additional sp_segs keysegs.
 
417
          keydef->seg here points right at the GEOMETRY segment,
 
418
          so we only need to decrease keydef->keysegs.
 
419
          (see maria_recreate_table() in _ma_check.c)
 
420
        */
 
421
        keydef->keysegs-=sp_segs-1;
 
422
      }
 
423
 
 
424
      for (j=0, keyseg=keydef->seg ; (int) j < keydef->keysegs ;
 
425
           j++, keyseg++)
 
426
      {
 
427
        if (keyseg->type != HA_KEYTYPE_BINARY &&
 
428
            keyseg->type != HA_KEYTYPE_VARBINARY1 &&
 
429
            keyseg->type != HA_KEYTYPE_VARBINARY2)
 
430
        {
 
431
          my_errno=HA_WRONG_CREATE_OPTION;
 
432
          goto err_no_lock;
 
433
        }
 
434
      }
 
435
      keydef->keysegs+=sp_segs;
 
436
      key_length+=SPLEN*sp_segs;
 
437
      length++;                              /* At least one length uchar */
 
438
      min_key_length++;
 
439
#else
 
440
      my_errno= HA_ERR_UNSUPPORTED;
 
441
      goto err_no_lock;
 
442
#endif /*HAVE_SPATIAL*/
 
443
    }
 
444
    else if (keydef->flag & HA_FULLTEXT)
 
445
    {
 
446
      keydef->flag=HA_FULLTEXT | HA_PACK_KEY | HA_VAR_LENGTH_KEY;
 
447
      options|=HA_OPTION_PACK_KEYS;             /* Using packed keys */
 
448
 
 
449
      for (j=0, keyseg=keydef->seg ; (int) j < keydef->keysegs ;
 
450
           j++, keyseg++)
 
451
      {
 
452
        if (keyseg->type != HA_KEYTYPE_TEXT &&
 
453
            keyseg->type != HA_KEYTYPE_VARTEXT1 &&
 
454
            keyseg->type != HA_KEYTYPE_VARTEXT2)
 
455
        {
 
456
          my_errno=HA_WRONG_CREATE_OPTION;
 
457
          goto err_no_lock;
 
458
        }
 
459
        if (!(keyseg->flag & HA_BLOB_PART) &&
 
460
            (keyseg->type == HA_KEYTYPE_VARTEXT1 ||
 
461
             keyseg->type == HA_KEYTYPE_VARTEXT2))
 
462
        {
 
463
          /* Make a flag that this is a VARCHAR */
 
464
          keyseg->flag|= HA_VAR_LENGTH_PART;
 
465
          /* Store in bit_start number of bytes used to pack the length */
 
466
          keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1)?
 
467
                              1 : 2);
 
468
        }
 
469
      }
 
470
 
 
471
      fulltext_keys++;
 
472
      key_length+= HA_FT_MAXBYTELEN+HA_FT_WLEN;
 
473
      length++;                              /* At least one length uchar */
 
474
      min_key_length+= 1 + HA_FT_WLEN;
 
475
      real_length_diff=HA_FT_MAXBYTELEN-FT_MAX_WORD_LEN_FOR_SORT;
 
476
    }
 
477
    else
 
478
    {
 
479
      /* Test if prefix compression */
 
480
      if (keydef->flag & HA_PACK_KEY)
 
481
      {
 
482
        /* Can't use space_compression on number keys */
 
483
        if ((keydef->seg[0].flag & HA_SPACE_PACK) &&
 
484
            keydef->seg[0].type == (int) HA_KEYTYPE_NUM)
 
485
          keydef->seg[0].flag&= ~HA_SPACE_PACK;
 
486
 
 
487
        /* Only use HA_PACK_KEY when first segment is a variable length key */
 
488
        if (!(keydef->seg[0].flag & (HA_SPACE_PACK | HA_BLOB_PART |
 
489
                                     HA_VAR_LENGTH_PART)))
 
490
        {
 
491
          /* pack relative to previous key */
 
492
          keydef->flag&= ~HA_PACK_KEY;
 
493
          keydef->flag|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
 
494
        }
 
495
        else
 
496
        {
 
497
          keydef->seg[0].flag|=HA_PACK_KEY;     /* for easyer intern test */
 
498
          keydef->flag|=HA_VAR_LENGTH_KEY;
 
499
          options|=HA_OPTION_PACK_KEYS;         /* Using packed keys */
 
500
        }
 
501
      }
 
502
      if (keydef->flag & HA_BINARY_PACK_KEY)
 
503
        options|=HA_OPTION_PACK_KEYS;           /* Using packed keys */
 
504
 
 
505
      if (keydef->flag & HA_AUTO_KEY && ci->with_auto_increment)
 
506
        share.base.auto_key=i+1;
 
507
      for (j=0, keyseg=keydef->seg ; j < keydef->keysegs ; j++, keyseg++)
 
508
      {
 
509
        /* numbers are stored with high by first to make compression easier */
 
510
        switch (keyseg->type) {
 
511
        case HA_KEYTYPE_SHORT_INT:
 
512
        case HA_KEYTYPE_LONG_INT:
 
513
        case HA_KEYTYPE_FLOAT:
 
514
        case HA_KEYTYPE_DOUBLE:
 
515
        case HA_KEYTYPE_USHORT_INT:
 
516
        case HA_KEYTYPE_ULONG_INT:
 
517
        case HA_KEYTYPE_LONGLONG:
 
518
        case HA_KEYTYPE_ULONGLONG:
 
519
        case HA_KEYTYPE_INT24:
 
520
        case HA_KEYTYPE_UINT24:
 
521
        case HA_KEYTYPE_INT8:
 
522
          keyseg->flag|= HA_SWAP_KEY;
 
523
          break;
 
524
        case HA_KEYTYPE_VARTEXT1:
 
525
        case HA_KEYTYPE_VARTEXT2:
 
526
        case HA_KEYTYPE_VARBINARY1:
 
527
        case HA_KEYTYPE_VARBINARY2:
 
528
          if (!(keyseg->flag & HA_BLOB_PART))
 
529
          {
 
530
            /* Make a flag that this is a VARCHAR */
 
531
            keyseg->flag|= HA_VAR_LENGTH_PART;
 
532
            /* Store in bit_start number of bytes used to pack the length */
 
533
            keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1 ||
 
534
                                 keyseg->type == HA_KEYTYPE_VARBINARY1) ?
 
535
                                1 : 2);
 
536
          }
 
537
          break;
 
538
        default:
 
539
          break;
 
540
        }
 
541
        if (keyseg->flag & HA_SPACE_PACK)
 
542
        {
 
543
          DBUG_ASSERT(!(keyseg->flag & (HA_VAR_LENGTH_PART | HA_BLOB_PART)));
 
544
          keydef->flag |= HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY;
 
545
          options|=HA_OPTION_PACK_KEYS;         /* Using packed keys */
 
546
          length++;                             /* At least one length uchar */
 
547
          if (!keyseg->null_bit)
 
548
            min_key_length++;
 
549
          key_length+= keyseg->length;
 
550
          if (keyseg->length >= 255)
 
551
          {
 
552
            /* prefix may be 3 bytes */
 
553
            length+= 2;
 
554
          }
 
555
        }
 
556
        else if (keyseg->flag & (HA_VAR_LENGTH_PART | HA_BLOB_PART))
 
557
        {
 
558
          DBUG_ASSERT(!test_all_bits(keyseg->flag,
 
559
                                    (HA_VAR_LENGTH_PART | HA_BLOB_PART)));
 
560
          keydef->flag|=HA_VAR_LENGTH_KEY;
 
561
          length++;                             /* At least one length uchar */
 
562
          if (!keyseg->null_bit)
 
563
            min_key_length++;
 
564
          options|=HA_OPTION_PACK_KEYS;         /* Using packed keys */
 
565
          key_length+= keyseg->length;
 
566
          if (keyseg->length >= 255)
 
567
          {
 
568
            /* prefix may be 3 bytes */
 
569
            length+= 2;
 
570
          }
 
571
        }
 
572
        else
 
573
        {
 
574
          key_length+= keyseg->length;
 
575
          if (!keyseg->null_bit)
 
576
            min_key_length+= keyseg->length;
 
577
        }
 
578
        if (keyseg->null_bit)
 
579
        {
 
580
          key_length++;
 
581
          /* min key part is 1 byte */
 
582
          min_key_length++;
 
583
          options|=HA_OPTION_PACK_KEYS;
 
584
          keyseg->flag|=HA_NULL_PART;
 
585
          keydef->flag|=HA_VAR_LENGTH_KEY | HA_NULL_PART_KEY;
 
586
        }
 
587
      }
 
588
    } /* if HA_FULLTEXT */
 
589
    key_segs+=keydef->keysegs;
 
590
    if (keydef->keysegs > HA_MAX_KEY_SEG)
 
591
    {
 
592
      my_errno=HA_WRONG_CREATE_OPTION;
 
593
      goto err_no_lock;
 
594
    }
 
595
    /*
 
596
      key_segs may be 0 in the case when we only want to be able to
 
597
      add on row into the table. This can happen with some DISTINCT queries
 
598
      in MySQL
 
599
    */
 
600
    if ((keydef->flag & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME &&
 
601
        key_segs)
 
602
      share.state.rec_per_key_part[key_segs-1]=1L;
 
603
    length+=key_length;
 
604
    /*
 
605
      A key can't be longer than than half a index block (as we have
 
606
      to be able to put at least 2 keys on an index block for the key
 
607
      algorithms to work).
 
608
    */
 
609
    if (length > maria_max_key_length())
 
610
    {
 
611
      my_errno=HA_WRONG_CREATE_OPTION;
 
612
      goto err_no_lock;
 
613
    }
 
614
    keydef->block_length= (uint16) maria_block_size;
 
615
    keydef->keylength= (uint16) key_length;
 
616
    keydef->minlength= (uint16) min_key_length;
 
617
    keydef->maxlength= (uint16) length;
 
618
 
 
619
    if (length > max_key_length)
 
620
      max_key_length= length;
 
621
    tot_length+= ((max_rows/(ulong) (((uint) maria_block_size -
 
622
                                      MAX_KEYPAGE_HEADER_SIZE -
 
623
                                      KEYPAGE_CHECKSUM_SIZE)/
 
624
                                     (length*2))) *
 
625
                  maria_block_size);
 
626
  }
 
627
 
 
628
  unique_key_parts=0;
 
629
  for (i=0, uniquedef=uniquedefs ; i < uniques ; i++ , uniquedef++)
 
630
  {
 
631
    uniquedef->key=keys+i;
 
632
    unique_key_parts+=uniquedef->keysegs;
 
633
    share.state.key_root[keys+i]= HA_OFFSET_ERROR;
 
634
    tot_length+= (max_rows/(ulong) (((uint) maria_block_size -
 
635
                                     MAX_KEYPAGE_HEADER_SIZE -
 
636
                                     KEYPAGE_CHECKSUM_SIZE) /
 
637
                         ((MARIA_UNIQUE_HASH_LENGTH + pointer)*2)))*
 
638
                         (ulong) maria_block_size;
 
639
  }
 
640
  keys+=uniques;                                /* Each unique has 1 key */
 
641
  key_segs+=uniques;                            /* Each unique has 1 key seg */
 
642
 
 
643
  base_pos=(MARIA_STATE_INFO_SIZE + keys * MARIA_STATE_KEY_SIZE +
 
644
            key_segs * MARIA_STATE_KEYSEG_SIZE);
 
645
  info_length= base_pos+(uint) (MARIA_BASE_INFO_SIZE+
 
646
                                keys * MARIA_KEYDEF_SIZE+
 
647
                                uniques * MARIA_UNIQUEDEF_SIZE +
 
648
                                (key_segs + unique_key_parts)*HA_KEYSEG_SIZE+
 
649
                                columns*(MARIA_COLUMNDEF_SIZE + 2));
 
650
 
 
651
 DBUG_PRINT("info", ("info_length: %u", info_length));
 
652
  /* There are only 16 bits for the total header length. */
 
653
  if (info_length > 65535)
 
654
  {
 
655
    my_printf_error(0, "Maria table '%s' has too many columns and/or "
 
656
                    "indexes and/or unique constraints.",
 
657
                    MYF(0), name + dirname_length(name));
 
658
    my_errno= HA_WRONG_CREATE_OPTION;
 
659
    goto err_no_lock;
 
660
  }
 
661
 
 
662
  bmove(share.state.header.file_version,(uchar*) maria_file_magic,4);
 
663
  ci->old_options=options | (ci->old_options & HA_OPTION_TEMP_COMPRESS_RECORD ?
 
664
                             HA_OPTION_COMPRESS_RECORD |
 
665
                             HA_OPTION_TEMP_COMPRESS_RECORD: 0);
 
666
  mi_int2store(share.state.header.options,ci->old_options);
 
667
  mi_int2store(share.state.header.header_length,info_length);
 
668
  mi_int2store(share.state.header.state_info_length,MARIA_STATE_INFO_SIZE);
 
669
  mi_int2store(share.state.header.base_info_length,MARIA_BASE_INFO_SIZE);
 
670
  mi_int2store(share.state.header.base_pos,base_pos);
 
671
  share.state.header.data_file_type= share.data_file_type= datafile_type;
 
672
  share.state.header.org_data_file_type= org_datafile_type;
 
673
  share.state.header.language= (ci->language ?
 
674
                                ci->language : default_charset_info->number);
 
675
 
 
676
  share.state.dellink = HA_OFFSET_ERROR;
 
677
  share.state.first_bitmap_with_space= 0;
 
678
#ifdef EXTERNAL_LOCKING
 
679
  share.state.process=  (ulong) getpid();
 
680
#endif
 
681
  share.state.version=  (ulong) time((time_t*) 0);
 
682
  share.state.sortkey=  (ushort) ~0;
 
683
  share.state.auto_increment=ci->auto_increment;
 
684
  share.options=options;
 
685
  share.base.rec_reflength=pointer;
 
686
  share.base.block_size= maria_block_size;
 
687
 
 
688
  /*
 
689
    Get estimate for index file length (this may be wrong for FT keys)
 
690
    This is used for pointers to other key pages.
 
691
  */
 
692
  tmp= (tot_length + maria_block_size * keys *
 
693
        MARIA_INDEX_BLOCK_MARGIN) / maria_block_size;
 
694
 
 
695
  /*
 
696
    use maximum of key_file_length we calculated and key_file_length value we
 
697
    got from MAI file header (see also mariapack.c:save_state)
 
698
  */
 
699
  share.base.key_reflength=
 
700
    maria_get_pointer_length(max(ci->key_file_length,tmp),3);
 
701
  share.base.keys= share.state.header.keys= keys;
 
702
  share.state.header.uniques= uniques;
 
703
  share.state.header.fulltext_keys= fulltext_keys;
 
704
  mi_int2store(share.state.header.key_parts,key_segs);
 
705
  mi_int2store(share.state.header.unique_key_parts,unique_key_parts);
 
706
 
 
707
  maria_set_all_keys_active(share.state.key_map, keys);
 
708
 
 
709
  share.base.keystart = share.state.state.key_file_length=
 
710
    MY_ALIGN(info_length, maria_block_size);
 
711
  share.base.max_key_block_length= maria_block_size;
 
712
  share.base.max_key_length=ALIGN_SIZE(max_key_length+4);
 
713
  share.base.records=ci->max_rows;
 
714
  share.base.reloc=  ci->reloc_rows;
 
715
  share.base.reclength=real_reclength;
 
716
  share.base.pack_reclength=reclength+ test(options & HA_OPTION_CHECKSUM);
 
717
  share.base.max_pack_length=pack_reclength;
 
718
  share.base.min_pack_length=min_pack_length;
 
719
  share.base.pack_bytes= pack_bytes;
 
720
  share.base.fields= columns;
 
721
  share.base.pack_fields= packed;
 
722
 
 
723
  if (share.data_file_type == BLOCK_RECORD)
 
724
  {
 
725
    /*
 
726
      we are going to create a first bitmap page, set data_file_length
 
727
      to reflect this, before the state goes to disk
 
728
    */
 
729
    share.state.state.data_file_length= maria_block_size;
 
730
    /* Add length of packed fields + length */
 
731
    share.base.pack_reclength+= share.base.max_field_lengths+3;
 
732
 
 
733
    /* Adjust max_pack_length, to be used if we have short rows */
 
734
    if (share.base.max_pack_length < maria_block_size)
 
735
    {
 
736
      share.base.max_pack_length+= FLAG_SIZE;
 
737
      if (ci->transactional)
 
738
        share.base.max_pack_length+= TRANSID_SIZE * 2;
 
739
    }
 
740
  }
 
741
 
 
742
  /* max_data_file_length and max_key_file_length are recalculated on open */
 
743
  if (tmp_table)
 
744
    share.base.max_data_file_length= (my_off_t) ci->data_file_length;
 
745
  else if (ci->transactional && translog_status == TRANSLOG_OK &&
 
746
           !maria_in_recovery)
 
747
  {
 
748
    /*
 
749
      we have checked translog_inited above, because maria_chk may call us
 
750
      (via maria_recreate_table()) and it does not have a log.
 
751
    */
 
752
    sync_dir= MY_SYNC_DIR;
 
753
  }
 
754
 
 
755
  if (datafile_type == DYNAMIC_RECORD)
 
756
  {
 
757
    share.base.min_block_length=
 
758
      (share.base.pack_reclength+3 < MARIA_EXTEND_BLOCK_LENGTH &&
 
759
       ! share.base.blobs) ?
 
760
      max(share.base.pack_reclength,MARIA_MIN_BLOCK_LENGTH) :
 
761
      MARIA_EXTEND_BLOCK_LENGTH;
 
762
  }
 
763
  else if (datafile_type == STATIC_RECORD)
 
764
    share.base.min_block_length= share.base.pack_reclength;
 
765
 
 
766
  if (! (flags & HA_DONT_TOUCH_DATA))
 
767
    share.state.create_time= (long) time((time_t*) 0);
 
768
 
 
769
  pthread_mutex_lock(&THR_LOCK_maria);
 
770
 
 
771
  /*
 
772
    NOTE: For test_if_reopen() we need a real path name. Hence we need
 
773
    MY_RETURN_REAL_PATH for every fn_format(filename, ...).
 
774
  */
 
775
  if (ci->index_file_name)
 
776
  {
 
777
    char *iext= strrchr(ci->index_file_name, '.');
 
778
    int have_iext= iext && !strcmp(iext, MARIA_NAME_IEXT);
 
779
    if (tmp_table)
 
780
    {
 
781
      char *path;
 
782
      /* chop off the table name, tempory tables use generated name */
 
783
      if ((path= strrchr(ci->index_file_name, FN_LIBCHAR)))
 
784
        *path= '\0';
 
785
      fn_format(filename, name, ci->index_file_name, MARIA_NAME_IEXT,
 
786
                MY_REPLACE_DIR | MY_UNPACK_FILENAME |
 
787
                MY_RETURN_REAL_PATH | MY_APPEND_EXT);
 
788
    }
 
789
    else
 
790
    {
 
791
      fn_format(filename, ci->index_file_name, "", MARIA_NAME_IEXT,
 
792
                MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH |
 
793
                (have_iext ? MY_REPLACE_EXT : MY_APPEND_EXT));
 
794
    }
 
795
    fn_format(linkname, name, "", MARIA_NAME_IEXT,
 
796
              MY_UNPACK_FILENAME|MY_APPEND_EXT);
 
797
    linkname_ptr= linkname;
 
798
    /*
 
799
      Don't create the table if the link or file exists to ensure that one
 
800
      doesn't accidently destroy another table.
 
801
      Don't sync dir now if the data file has the same path.
 
802
    */
 
803
    create_flag=
 
804
      (ci->data_file_name &&
 
805
       !strcmp(ci->index_file_name, ci->data_file_name)) ? 0 : sync_dir;
 
806
  }
 
807
  else
 
808
  {
 
809
    char *iext= strrchr(name, '.');
 
810
    int have_iext= iext && !strcmp(iext, MARIA_NAME_IEXT);
 
811
    fn_format(filename, name, "", MARIA_NAME_IEXT,
 
812
              MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH |
 
813
              (have_iext ? MY_REPLACE_EXT : MY_APPEND_EXT));
 
814
    linkname_ptr= NullS;
 
815
    /*
 
816
      Replace the current file.
 
817
      Don't sync dir now if the data file has the same path.
 
818
    */
 
819
    create_flag=  (flags & HA_CREATE_KEEP_FILES) ? 0 : MY_DELETE_OLD;
 
820
    create_flag|= (!ci->data_file_name ? 0 : sync_dir);
 
821
  }
 
822
 
 
823
  /*
 
824
    If a MRG_MARIA table is in use, the mapped MARIA tables are open,
 
825
    but no entry is made in the table cache for them.
 
826
    A TRUNCATE command checks for the table in the cache only and could
 
827
    be fooled to believe, the table is not open.
 
828
    Pull the emergency brake in this situation. (Bug #8306)
 
829
 
 
830
 
 
831
    NOTE: The filename is compared against unique_file_name of every
 
832
    open table. Hence we need a real path here.
 
833
  */
 
834
  if (_ma_test_if_reopen(filename))
 
835
  {
 
836
    my_printf_error(0, "MARIA table '%s' is in use "
 
837
                    "(most likely by a MERGE table). Try FLUSH TABLES.",
 
838
                    MYF(0), name + dirname_length(name));
 
839
    goto err;
 
840
  }
 
841
 
 
842
  if ((file= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
 
843
                                    MYF(MY_WME|create_flag))) < 0)
 
844
    goto err;
 
845
  errpos=1;
 
846
 
 
847
  DBUG_PRINT("info", ("write state info and base info"));
 
848
  if (_ma_state_info_write_sub(file, &share.state, 2) ||
 
849
      _ma_base_info_write(file, &share.base))
 
850
    goto err;
 
851
  DBUG_PRINT("info", ("base_pos: %d  base_info_size: %d",
 
852
                      base_pos, MARIA_BASE_INFO_SIZE));
 
853
  DBUG_ASSERT(my_tell(file,MYF(0)) == base_pos+ MARIA_BASE_INFO_SIZE);
 
854
 
 
855
  /* Write key and keyseg definitions */
 
856
  DBUG_PRINT("info", ("write key and keyseg definitions"));
 
857
  for (i=0 ; i < share.base.keys - uniques; i++)
 
858
  {
 
859
    uint sp_segs=(keydefs[i].flag & HA_SPATIAL) ? 2*SPDIMS : 0;
 
860
 
 
861
    if (_ma_keydef_write(file, &keydefs[i]))
 
862
      goto err;
 
863
    for (j=0 ; j < keydefs[i].keysegs-sp_segs ; j++)
 
864
      if (_ma_keyseg_write(file, &keydefs[i].seg[j]))
 
865
       goto err;
 
866
#ifdef HAVE_SPATIAL
 
867
    for (j=0 ; j < sp_segs ; j++)
 
868
    {
 
869
      HA_KEYSEG sseg;
 
870
      sseg.type=SPTYPE;
 
871
      sseg.language= 7;                         /* Binary */
 
872
      sseg.null_bit=0;
 
873
      sseg.bit_start=0;
 
874
      sseg.bit_end=0;
 
875
      sseg.bit_length= 0;
 
876
      sseg.bit_pos= 0;
 
877
      sseg.length=SPLEN;
 
878
      sseg.null_pos=0;
 
879
      sseg.start=j*SPLEN;
 
880
      sseg.flag= HA_SWAP_KEY;
 
881
      if (_ma_keyseg_write(file, &sseg))
 
882
        goto err;
 
883
    }
 
884
#endif
 
885
  }
 
886
  /* Create extra keys for unique definitions */
 
887
  offset= real_reclength - uniques*MARIA_UNIQUE_HASH_LENGTH;
 
888
  bzero((char*) &tmp_keydef,sizeof(tmp_keydef));
 
889
  bzero((char*) &tmp_keyseg,sizeof(tmp_keyseg));
 
890
  for (i=0; i < uniques ; i++)
 
891
  {
 
892
    tmp_keydef.keysegs=1;
 
893
    tmp_keydef.flag=            HA_UNIQUE_CHECK;
 
894
    tmp_keydef.block_length=    (uint16) maria_block_size;
 
895
    tmp_keydef.keylength=       MARIA_UNIQUE_HASH_LENGTH + pointer;
 
896
    tmp_keydef.minlength=tmp_keydef.maxlength=tmp_keydef.keylength;
 
897
    tmp_keyseg.type=            MARIA_UNIQUE_HASH_TYPE;
 
898
    tmp_keyseg.length=          MARIA_UNIQUE_HASH_LENGTH;
 
899
    tmp_keyseg.start=           offset;
 
900
    offset+=                    MARIA_UNIQUE_HASH_LENGTH;
 
901
    if (_ma_keydef_write(file,&tmp_keydef) ||
 
902
        _ma_keyseg_write(file,(&tmp_keyseg)))
 
903
      goto err;
 
904
  }
 
905
 
 
906
  /* Save unique definition */
 
907
  DBUG_PRINT("info", ("write unique definitions"));
 
908
  for (i=0 ; i < share.state.header.uniques ; i++)
 
909
  {
 
910
    HA_KEYSEG *keyseg_end;
 
911
    keyseg= uniquedefs[i].seg;
 
912
    if (_ma_uniquedef_write(file, &uniquedefs[i]))
 
913
      goto err;
 
914
    for (keyseg= uniquedefs[i].seg, keyseg_end= keyseg+ uniquedefs[i].keysegs;
 
915
         keyseg < keyseg_end;
 
916
         keyseg++)
 
917
    {
 
918
      switch (keyseg->type) {
 
919
      case HA_KEYTYPE_VARTEXT1:
 
920
      case HA_KEYTYPE_VARTEXT2:
 
921
      case HA_KEYTYPE_VARBINARY1:
 
922
      case HA_KEYTYPE_VARBINARY2:
 
923
        if (!(keyseg->flag & HA_BLOB_PART))
 
924
        {
 
925
          keyseg->flag|= HA_VAR_LENGTH_PART;
 
926
          keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1 ||
 
927
                               keyseg->type == HA_KEYTYPE_VARBINARY1) ?
 
928
                              1 : 2);
 
929
        }
 
930
        break;
 
931
      default:
 
932
        DBUG_ASSERT((keyseg->flag & HA_VAR_LENGTH_PART) == 0);
 
933
        break;
 
934
      }
 
935
      if (_ma_keyseg_write(file, keyseg))
 
936
        goto err;
 
937
    }
 
938
  }
 
939
  DBUG_PRINT("info", ("write field definitions"));
 
940
  if (datafile_type == BLOCK_RECORD)
 
941
  {
 
942
    /* Store columns in a more efficent order */
 
943
    MARIA_COLUMNDEF **col_order, **pos;
 
944
    if (!(col_order= (MARIA_COLUMNDEF**) my_malloc(share.base.fields *
 
945
                                                   sizeof(MARIA_COLUMNDEF*),
 
946
                                                   MYF(MY_WME))))
 
947
      goto err;
 
948
    for (column= columndef, pos= col_order ;
 
949
         column != end_column ;
 
950
         column++, pos++)
 
951
      *pos= column;
 
952
    qsort(col_order, share.base.fields, sizeof(*col_order),
 
953
          (qsort_cmp) compare_columns);
 
954
    for (i=0 ; i < share.base.fields ; i++)
 
955
    {
 
956
      column_array[col_order[i]->column_nr]= i;
 
957
      if (_ma_columndef_write(file, col_order[i]))
 
958
      {
 
959
        my_free((uchar*) col_order, MYF(0));
 
960
        goto err;
 
961
      }
 
962
    }
 
963
    my_free((uchar*) col_order, MYF(0));
 
964
  }
 
965
  else
 
966
  {
 
967
    for (i=0 ; i < share.base.fields ; i++)
 
968
    {
 
969
      column_array[i]= (uint16) i;
 
970
      if (_ma_columndef_write(file, &columndef[i]))
 
971
        goto err;
 
972
    }
 
973
  }
 
974
  if (_ma_column_nr_write(file, column_array, columns))
 
975
    goto err;
 
976
 
 
977
  if ((kfile_size_before_extension= my_tell(file,MYF(0))) == MY_FILEPOS_ERROR)
 
978
    goto err;
 
979
#ifndef DBUG_OFF
 
980
  if (kfile_size_before_extension != info_length)
 
981
    DBUG_PRINT("warning",("info_length: %u  != used_length: %u",
 
982
                          info_length, (uint)kfile_size_before_extension));
 
983
#endif
 
984
 
 
985
  if (sync_dir)
 
986
  {
 
987
    /*
 
988
      we log the first bytes and then the size to which we extend; this is
 
989
      not log 1 KB of mostly zeroes if this is a small table.
 
990
    */
 
991
    char empty_string[]= "";
 
992
    LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
 
993
    translog_size_t total_rec_length= 0;
 
994
    uint k;
 
995
    LSN lsn;
 
996
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= 1 + 2 + 2 +
 
997
      (uint) kfile_size_before_extension;
 
998
    /* we are needing maybe 64 kB, so don't use the stack */
 
999
    log_data= my_malloc(log_array[TRANSLOG_INTERNAL_PARTS + 1].length, MYF(0));
 
1000
    if ((log_data == NULL) ||
 
1001
        my_pread(file, 1 + 2 + 2 + log_data,
 
1002
                 (size_t) kfile_size_before_extension, 0, MYF(MY_NABP)))
 
1003
      goto err;
 
1004
    /*
 
1005
      remember if the data file was created or not, to know if Recovery can
 
1006
      do it or not, in the future
 
1007
    */
 
1008
    log_data[0]= test(flags & HA_DONT_TOUCH_DATA);
 
1009
    int2store(log_data + 1, kfile_size_before_extension);
 
1010
    int2store(log_data + 1 + 2, share.base.keystart);
 
1011
    log_array[TRANSLOG_INTERNAL_PARTS + 0].str= name;
 
1012
    /* we store the end-zero, for Recovery to just pass it to my_create() */
 
1013
    log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
 
1014
      strlen(log_array[TRANSLOG_INTERNAL_PARTS + 0].str) + 1;
 
1015
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str= log_data;
 
1016
    /* symlink description is also needed for re-creation by Recovery: */
 
1017
    log_array[TRANSLOG_INTERNAL_PARTS + 2].str=
 
1018
      (ci->data_file_name ? ci->data_file_name : empty_string);
 
1019
    log_array[TRANSLOG_INTERNAL_PARTS + 2].length=
 
1020
      strlen(log_array[TRANSLOG_INTERNAL_PARTS + 2].str) + 1;
 
1021
    log_array[TRANSLOG_INTERNAL_PARTS + 3].str=
 
1022
      (ci->index_file_name ? ci->index_file_name : empty_string);
 
1023
    log_array[TRANSLOG_INTERNAL_PARTS + 3].length=
 
1024
      strlen(log_array[TRANSLOG_INTERNAL_PARTS + 3].str) + 1;
 
1025
    for (k= TRANSLOG_INTERNAL_PARTS;
 
1026
         k < (sizeof(log_array)/sizeof(log_array[0])); k++)
 
1027
      total_rec_length+= (translog_size_t) log_array[k].length;
 
1028
    /**
 
1029
       For this record to be of any use for Recovery, we need the upper
 
1030
       MySQL layer to be crash-safe, which it is not now (that would require
 
1031
       work using the ddl_log of sql/sql_table.cc); when it is, we should
 
1032
       reconsider the moment of writing this log record (before or after op,
 
1033
       under THR_LOCK_maria or not...), how to use it in Recovery.
 
1034
       For now this record can serve when we apply logs to a backup,
 
1035
       so we sync it. This happens before the data file is created. If the
 
1036
       data file was created before, and we crashed before writing the log
 
1037
       record, at restart the table may be used, so we would not have a
 
1038
       trustable history in the log (impossible to apply this log to a
 
1039
       backup). The way we do it, if we crash before writing the log record
 
1040
       then there is no data file and the table cannot be used.
 
1041
       @todo Note that in case of TRUNCATE TABLE we also come here; for
 
1042
       Recovery to be able to finish TRUNCATE TABLE, instead of leaving a
 
1043
       half-truncated table, we should log the record at start of
 
1044
       maria_create(); for that we shouldn't write to the index file but to a
 
1045
       buffer (DYNAMIC_STRING), put the buffer into the record, then put the
 
1046
       buffer into the index file (so, change _ma_keydef_write() etc). That
 
1047
       would also enable Recovery to finish a CREATE TABLE. The final result
 
1048
       would be that we would be able to finish what the SQL layer has asked
 
1049
       for: it would be atomic.
 
1050
       When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not
 
1051
       called external_lock(), so have no TRN. It does not matter, as all
 
1052
       these operations are non-transactional and sync their files.
 
1053
    */
 
1054
    if (unlikely(translog_write_record(&lsn,
 
1055
                                       LOGREC_REDO_CREATE_TABLE,
 
1056
                                       &dummy_transaction_object, NULL,
 
1057
                                       total_rec_length,
 
1058
                                       sizeof(log_array)/sizeof(log_array[0]),
 
1059
                                       log_array, NULL, NULL) ||
 
1060
                 translog_flush(lsn)))
 
1061
      goto err;
 
1062
    /*
 
1063
      store LSN into file, needed for Recovery to not be confused if a
 
1064
      DROP+CREATE happened (applying REDOs to the wrong table).
 
1065
    */
 
1066
    share.kfile.file= file;
 
1067
    if (_ma_update_state_lsns_sub(&share, lsn, trnman_get_min_safe_trid(),
 
1068
                                  FALSE, TRUE))
 
1069
      goto err;
 
1070
    my_free(log_data, MYF(0));
 
1071
  }
 
1072
 
 
1073
  if (!(flags & HA_DONT_TOUCH_DATA))
 
1074
  {
 
1075
    if (ci->data_file_name)
 
1076
    {
 
1077
      char *dext= strrchr(ci->data_file_name, '.');
 
1078
      int have_dext= dext && !strcmp(dext, MARIA_NAME_DEXT);
 
1079
 
 
1080
      if (tmp_table)
 
1081
      {
 
1082
        char *path;
 
1083
        /* chop off the table name, tempory tables use generated name */
 
1084
        if ((path= strrchr(ci->data_file_name, FN_LIBCHAR)))
 
1085
          *path= '\0';
 
1086
        fn_format(filename, name, ci->data_file_name, MARIA_NAME_DEXT,
 
1087
                  MY_REPLACE_DIR | MY_UNPACK_FILENAME | MY_APPEND_EXT);
 
1088
      }
 
1089
      else
 
1090
      {
 
1091
        fn_format(filename, ci->data_file_name, "", MARIA_NAME_DEXT,
 
1092
                  MY_UNPACK_FILENAME |
 
1093
                  (have_dext ? MY_REPLACE_EXT : MY_APPEND_EXT));
 
1094
      }
 
1095
      fn_format(linkname, name, "",MARIA_NAME_DEXT,
 
1096
                MY_UNPACK_FILENAME | MY_APPEND_EXT);
 
1097
      linkname_ptr= linkname;
 
1098
      create_flag=0;
 
1099
    }
 
1100
    else
 
1101
    {
 
1102
      fn_format(filename,name,"", MARIA_NAME_DEXT,
 
1103
                MY_UNPACK_FILENAME | MY_APPEND_EXT);
 
1104
      linkname_ptr= NullS;
 
1105
      create_flag= (flags & HA_CREATE_KEEP_FILES) ? 0 : MY_DELETE_OLD;
 
1106
    }
 
1107
    if ((dfile=
 
1108
         my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
 
1109
                                MYF(MY_WME | create_flag | sync_dir))) < 0)
 
1110
      goto err;
 
1111
    errpos=3;
 
1112
 
 
1113
    if (_ma_initialize_data_file(&share, dfile))
 
1114
      goto err;
 
1115
  }
 
1116
 
 
1117
        /* Enlarge files */
 
1118
  DBUG_PRINT("info", ("enlarge to keystart: %lu",
 
1119
                      (ulong) share.base.keystart));
 
1120
  if (my_chsize(file,(ulong) share.base.keystart,0,MYF(0)))
 
1121
    goto err;
 
1122
 
 
1123
  if (sync_dir && my_sync(file, MYF(0)))
 
1124
    goto err;
 
1125
 
 
1126
  if (! (flags & HA_DONT_TOUCH_DATA))
 
1127
  {
 
1128
#ifdef USE_RELOC
 
1129
    if (my_chsize(dfile,share.base.min_pack_length*ci->reloc_rows,0,MYF(0)))
 
1130
      goto err;
 
1131
#endif
 
1132
    if (sync_dir && my_sync(dfile, MYF(0)))
 
1133
      goto err;
 
1134
    if (my_close(dfile,MYF(0)))
 
1135
      goto err;
 
1136
  }
 
1137
  pthread_mutex_unlock(&THR_LOCK_maria);
 
1138
  res= 0;
 
1139
  my_free((char*) rec_per_key_part,MYF(0));
 
1140
  errpos=0;
 
1141
  if (my_close(file,MYF(0)))
 
1142
    res= my_errno;
 
1143
  DBUG_RETURN(res);
 
1144
 
 
1145
err:
 
1146
  pthread_mutex_unlock(&THR_LOCK_maria);
 
1147
 
 
1148
err_no_lock:
 
1149
  save_errno=my_errno;
 
1150
  switch (errpos) {
 
1151
  case 3:
 
1152
    (void)(my_close(dfile,MYF(0)));
 
1153
    /* fall through */
 
1154
  case 2:
 
1155
  if (! (flags & HA_DONT_TOUCH_DATA))
 
1156
    my_delete_with_symlink(fn_format(filename,name,"",MARIA_NAME_DEXT,
 
1157
                                     MY_UNPACK_FILENAME | MY_APPEND_EXT),
 
1158
                           sync_dir);
 
1159
    /* fall through */
 
1160
  case 1:
 
1161
    (void)(my_close(file,MYF(0)));
 
1162
    if (! (flags & HA_DONT_TOUCH_DATA))
 
1163
      my_delete_with_symlink(fn_format(filename,name,"",MARIA_NAME_IEXT,
 
1164
                                       MY_UNPACK_FILENAME | MY_APPEND_EXT),
 
1165
                             sync_dir);
 
1166
  }
 
1167
  my_free(log_data, MYF(MY_ALLOW_ZERO_PTR));
 
1168
  my_free((char*) rec_per_key_part, MYF(0));
 
1169
  DBUG_RETURN(my_errno=save_errno);             /* return the fatal errno */
 
1170
}
 
1171
 
 
1172
 
 
1173
uint maria_get_pointer_length(ulonglong file_length, uint def)
 
1174
{
 
1175
  DBUG_ASSERT(def >= 2 && def <= 7);
 
1176
  if (file_length)                              /* If not default */
 
1177
  {
 
1178
#ifdef NOT_YET_READY_FOR_8_BYTE_POINTERS
 
1179
    if (file_length >= (ULL(1) << 56))
 
1180
      def=8;
 
1181
    else
 
1182
#endif
 
1183
      if (file_length >= (ULL(1) << 48))
 
1184
      def=7;
 
1185
    else if (file_length >= (ULL(1) << 40))
 
1186
      def=6;
 
1187
    else if (file_length >= (ULL(1) << 32))
 
1188
      def=5;
 
1189
    else if (file_length >= (ULL(1) << 24))
 
1190
      def=4;
 
1191
    else if (file_length >= (ULL(1) << 16))
 
1192
      def=3;
 
1193
    else
 
1194
      def=2;
 
1195
  }
 
1196
  return def;
 
1197
}
 
1198
 
 
1199
 
 
1200
/*
 
1201
  Sort columns for records-in-block
 
1202
 
 
1203
  IMPLEMENTATION
 
1204
   Sort columns in following order:
 
1205
 
 
1206
   Fixed size, not null columns
 
1207
   Fixed length, null fields
 
1208
   Numbers (zero fill fields)
 
1209
   Variable length fields (CHAR, VARCHAR) according to length
 
1210
   Blobs
 
1211
 
 
1212
   For same kind of fields, keep fields in original order
 
1213
*/
 
1214
 
 
1215
static inline int sign(long a)
 
1216
{
 
1217
  return a < 0 ? -1 : (a > 0 ? 1 : 0);
 
1218
}
 
1219
 
 
1220
 
 
1221
static int compare_columns(MARIA_COLUMNDEF **a_ptr, MARIA_COLUMNDEF **b_ptr)
 
1222
{
 
1223
  MARIA_COLUMNDEF *a= *a_ptr, *b= *b_ptr;
 
1224
  enum en_fieldtype a_type, b_type;
 
1225
 
 
1226
  a_type= (a->type == FIELD_CHECK) ? FIELD_NORMAL : a->type;
 
1227
  b_type= (b->type == FIELD_CHECK) ? FIELD_NORMAL : b->type;
 
1228
 
 
1229
  if (a_type == FIELD_NORMAL && !a->null_bit)
 
1230
  {
 
1231
    if (b_type != FIELD_NORMAL || b->null_bit)
 
1232
      return -1;
 
1233
    return sign((long) a->offset - (long) b->offset);
 
1234
  }
 
1235
  if (b_type == FIELD_NORMAL && !b->null_bit)
 
1236
    return 1;
 
1237
  if (a_type == b_type)
 
1238
    return sign((long) a->offset - (long) b->offset);
 
1239
  if (a_type == FIELD_NORMAL)
 
1240
    return -1;
 
1241
  if (b_type == FIELD_NORMAL)
 
1242
    return 1;
 
1243
  if (a_type == FIELD_SKIP_ZERO)
 
1244
    return -1;
 
1245
  if (b_type == FIELD_SKIP_ZERO)
 
1246
    return 1;
 
1247
  if (a->type != FIELD_BLOB && b->type != FIELD_BLOB)
 
1248
    if (a->length != b->length)
 
1249
      return sign((long) a->length - (long) b->length);
 
1250
  if (a_type == FIELD_BLOB)
 
1251
    return 1;
 
1252
  if (b_type == FIELD_BLOB)
 
1253
    return -1;
 
1254
  return sign((long) a->offset - (long) b->offset);
 
1255
}
 
1256
 
 
1257
 
 
1258
/**
 
1259
   @brief Initialize data file
 
1260
 
 
1261
   @note
 
1262
   In BLOCK_RECORD, a freshly created datafile is one page long; while in
 
1263
   other formats it is 0-byte long.
 
1264
 */
 
1265
 
 
1266
int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
 
1267
{
 
1268
  if (share->data_file_type == BLOCK_RECORD)
 
1269
  {
 
1270
    share->bitmap.block_size= share->base.block_size;
 
1271
    share->bitmap.file.file = dfile;
 
1272
    return _ma_bitmap_create_first(share);
 
1273
  }
 
1274
  return 0;
 
1275
}
 
1276
 
 
1277
 
 
1278
/**
 
1279
   @brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
 
1280
   can force.
 
1281
 
 
1282
   This is for special cases where:
 
1283
   - we don't want to write the full state to disk (so, not call
 
1284
   _ma_state_info_write()) because some parts of the state may be
 
1285
   currently inconsistent, or because it would be overkill
 
1286
   - we must sync these LSNs immediately for correctness.
 
1287
   It acquires intern_lock to protect the LSNs and state write.
 
1288
 
 
1289
   @param  share           table's share
 
1290
   @param  lsn             LSN to write to log files
 
1291
   @param  create_trid     Trid to be used as state.create_trid
 
1292
   @param  do_sync         if the write should be forced to disk
 
1293
   @param  update_create_rename_lsn if this LSN should be updated or not
 
1294
 
 
1295
   @return Operation status
 
1296
     @retval 0      ok
 
1297
     @retval 1      error (disk problem)
 
1298
*/
 
1299
 
 
1300
int _ma_update_state_lsns(MARIA_SHARE *share, LSN lsn, TrID create_trid,
 
1301
                          my_bool do_sync, my_bool update_create_rename_lsn)
 
1302
{
 
1303
  int res;
 
1304
  pthread_mutex_lock(&share->intern_lock);
 
1305
  res= _ma_update_state_lsns_sub(share, lsn, create_trid, do_sync,
 
1306
                                 update_create_rename_lsn);
 
1307
  pthread_mutex_unlock(&share->intern_lock);
 
1308
  return res;
 
1309
}
 
1310
 
 
1311
 
 
1312
/**
 
1313
   @brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
 
1314
   can force.
 
1315
 
 
1316
   Shortcut of _ma_update_state_lsns() when we know that intern_lock is not
 
1317
   needed (when creating a table or opening it for the first time).
 
1318
 
 
1319
   @param  share           table's share
 
1320
   @param  lsn             LSN to write to log files
 
1321
   @param  create_trid     Trid to be used as state.create_trid
 
1322
   @param  do_sync         if the write should be forced to disk
 
1323
   @param  update_create_rename_lsn if this LSN should be updated or not
 
1324
 
 
1325
   @return Operation status
 
1326
     @retval 0      ok
 
1327
     @retval 1      error (disk problem)
 
1328
*/
 
1329
 
 
1330
#if (_MSC_VER == 1310)
 
1331
/*
 
1332
 Visual Studio 2003 compiler produces internal compiler error
 
1333
 in this function. Disable optimizations to workaround.
 
1334
*/
 
1335
#pragma optimize("",off)
 
1336
#endif
 
1337
int _ma_update_state_lsns_sub(MARIA_SHARE *share, LSN lsn, TrID create_trid,
 
1338
                              my_bool do_sync,
 
1339
                              my_bool update_create_rename_lsn)
 
1340
{
 
1341
  uchar buf[LSN_STORE_SIZE * 3], *ptr;
 
1342
  uchar trid_buff[8];
 
1343
  File file= share->kfile.file;
 
1344
  DBUG_ASSERT(file >= 0);
 
1345
  for (ptr= buf; ptr < (buf + sizeof(buf)); ptr+= LSN_STORE_SIZE)
 
1346
    lsn_store(ptr, lsn);
 
1347
  share->state.skip_redo_lsn= share->state.is_of_horizon= lsn;
 
1348
  share->state.create_trid= create_trid;
 
1349
  mi_int8store(trid_buff, create_trid);
 
1350
  if (update_create_rename_lsn)
 
1351
  {
 
1352
    share->state.create_rename_lsn= lsn;
 
1353
    if (share->id != 0)
 
1354
    {
 
1355
      /*
 
1356
        If OP is the operation which is calling us, if table is later written,
 
1357
        we could see in the log:
 
1358
        FILE_ID ... REDO_OP ... REDO_INSERT.
 
1359
        (that can happen in real life at least with OP=REPAIR).
 
1360
        As FILE_ID will be ignored by Recovery because it is <
 
1361
        create_rename_lsn, REDO_INSERT would be ignored too, wrongly.
 
1362
        To avoid that, we force a LOGREC_FILE_ID to be logged at next write:
 
1363
      */
 
1364
      translog_deassign_id_from_share(share);
 
1365
    }
 
1366
  }
 
1367
  else
 
1368
    lsn_store(buf, share->state.create_rename_lsn);
 
1369
  return (my_pwrite(file, buf, sizeof(buf),
 
1370
                    sizeof(share->state.header) +
 
1371
                    MARIA_FILE_CREATE_RENAME_LSN_OFFSET, MYF(MY_NABP)) ||
 
1372
          my_pwrite(file, trid_buff, sizeof(trid_buff),
 
1373
                    sizeof(share->state.header) +
 
1374
                    MARIA_FILE_CREATE_TRID_OFFSET, MYF(MY_NABP)) ||
 
1375
          (do_sync && my_sync(file, MYF(0))));
 
1376
}
 
1377
#if (_MSC_VER == 1310)
 
1378
#pragma optimize("",on)
 
1379
#endif /*VS2003 compiler bug workaround*/