~linuxjedi/drizzle/trunk-bug-667053

« back to all changes in this revision

Viewing changes to storage/myisam/sort.c

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Creates a index for a database by reading keys, sorting them and outputing
 
18
  them in sorted order through SORT_INFO functions.
 
19
*/
 
20
 
 
21
#include "fulltext.h"
 
22
#if defined(MSDOS) || defined(__WIN__)
 
23
#include <fcntl.h>
 
24
#else
 
25
#include <stddef.h>
 
26
#endif
 
27
#include <queues.h>
 
28
 
 
29
/* static variables */
 
30
 
 
31
#undef MIN_SORT_MEMORY
 
32
#undef MYF_RW
 
33
#undef DISK_BUFFER_SIZE
 
34
 
 
35
#define MERGEBUFF 15
 
36
#define MERGEBUFF2 31
 
37
#define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
 
38
#define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
 
39
#define DISK_BUFFER_SIZE (IO_SIZE*16)
 
40
 
 
41
 
 
42
/*
 
43
 Pointers of functions for store and read keys from temp file
 
44
*/
 
45
 
 
46
extern void print_error _VARARGS((const char *fmt,...));
 
47
 
 
48
/* Functions defined in this file */
 
49
 
 
50
static ha_rows  find_all_keys(MI_SORT_PARAM *info,uint keys,
 
51
                                    uchar **sort_keys,
 
52
                                    DYNAMIC_ARRAY *buffpek,int *maxbuffer,
 
53
                                    IO_CACHE *tempfile,
 
54
                                    IO_CACHE *tempfile_for_exceptions);
 
55
static int  write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
 
56
                             uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
 
57
static int  write_key(MI_SORT_PARAM *info, uchar *key,
 
58
                            IO_CACHE *tempfile);
 
59
static int  write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
 
60
                              uint count);
 
61
static int  merge_many_buff(MI_SORT_PARAM *info,uint keys,
 
62
                                  uchar * *sort_keys,
 
63
                                  BUFFPEK *buffpek,int *maxbuffer,
 
64
                                  IO_CACHE *t_file);
 
65
static uint  read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
 
66
                                  uint sort_length);
 
67
static int  merge_buffers(MI_SORT_PARAM *info,uint keys,
 
68
                                IO_CACHE *from_file, IO_CACHE *to_file,
 
69
                                uchar * *sort_keys, BUFFPEK *lastbuff,
 
70
                                BUFFPEK *Fb, BUFFPEK *Tb);
 
71
static int  merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
 
72
                              IO_CACHE *);
 
73
static int flush_ft_buf(MI_SORT_PARAM *info);
 
74
 
 
75
static int  write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
 
76
                                    uint count, BUFFPEK *buffpek,
 
77
                                    IO_CACHE *tempfile);
 
78
static uint  read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
 
79
                                         uint sort_length);
 
80
static int  write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
 
81
                                  uchar *key, uint sort_length, uint count);
 
82
static int  write_merge_key_varlen(MI_SORT_PARAM *info,
 
83
                                         IO_CACHE *to_file,
 
84
                                         uchar* key, uint sort_length,
 
85
                                         uint count);
 
86
static inline int
 
87
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
 
88
 
 
89
/*
 
90
  Creates a index of sorted keys
 
91
 
 
92
  SYNOPSIS
 
93
    _create_index_by_sort()
 
94
    info                Sort parameters
 
95
    no_messages         Set to 1 if no output
 
96
    sortbuff_size       Size if sortbuffer to allocate
 
97
 
 
98
  RESULT
 
99
    0   ok
 
100
   <> 0 Error
 
101
*/
 
102
 
 
103
int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
 
104
                          ulong sortbuff_size)
 
105
{
 
106
  int error,maxbuffer,skr;
 
107
  uint memavl,old_memavl,keys,sort_length;
 
108
  DYNAMIC_ARRAY buffpek;
 
109
  ha_rows records;
 
110
  uchar **sort_keys;
 
111
  IO_CACHE tempfile, tempfile_for_exceptions;
 
112
  DBUG_ENTER("_create_index_by_sort");
 
113
  DBUG_PRINT("enter",("sort_length: %d", info->key_length));
 
114
 
 
115
  if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
 
116
  {
 
117
    info->write_keys=write_keys_varlen;
 
118
    info->read_to_buffer=read_to_buffer_varlen;
 
119
    info->write_key= write_merge_key_varlen;
 
120
  }
 
121
  else
 
122
  {
 
123
    info->write_keys=write_keys;
 
124
    info->read_to_buffer=read_to_buffer;
 
125
    info->write_key=write_merge_key;
 
126
  }
 
127
 
 
128
  my_b_clear(&tempfile);
 
129
  my_b_clear(&tempfile_for_exceptions);
 
130
  bzero((char*) &buffpek,sizeof(buffpek));
 
131
  sort_keys= (uchar **) NULL; error= 1;
 
132
  maxbuffer=1;
 
133
 
 
134
  memavl=max(sortbuff_size,MIN_SORT_MEMORY);
 
135
  records=      info->sort_info->max_records;
 
136
  sort_length=  info->key_length;
 
137
 
 
138
  while (memavl >= MIN_SORT_MEMORY)
 
139
  {
 
140
    if ((records < UINT_MAX32) && 
 
141
       ((my_off_t) (records + 1) * 
 
142
        (sort_length + sizeof(char*)) <= (my_off_t) memavl))
 
143
      keys= (uint)records+1;
 
144
    else
 
145
      do
 
146
      {
 
147
        skr=maxbuffer;
 
148
        if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
 
149
            (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
 
150
             (sort_length+sizeof(char*))) <= 1 ||
 
151
            keys < (uint) maxbuffer)
 
152
        {
 
153
          mi_check_print_error(info->sort_info->param,
 
154
                               "myisam_sort_buffer_size is too small");
 
155
          goto err;
 
156
        }
 
157
      }
 
158
      while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
 
159
 
 
160
    if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
 
161
                                       HA_FT_MAXBYTELEN, MYF(0))))
 
162
    {
 
163
      if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
 
164
                             maxbuffer/2))
 
165
      {
 
166
        my_free((uchar*) sort_keys,MYF(0));
 
167
        sort_keys= 0;
 
168
      }
 
169
      else
 
170
        break;
 
171
    }
 
172
    old_memavl=memavl;
 
173
    if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
 
174
      memavl=MIN_SORT_MEMORY;
 
175
  }
 
176
  if (memavl < MIN_SORT_MEMORY)
 
177
  {
 
178
    mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
 
179
    goto err; /* purecov: tested */
 
180
  }
 
181
  (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
 
182
 
 
183
  if (!no_messages)
 
184
    printf("  - Searching for keys, allocating buffer for %d keys\n",keys);
 
185
 
 
186
  if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
 
187
                                  &tempfile,&tempfile_for_exceptions))
 
188
      == HA_POS_ERROR)
 
189
    goto err; /* purecov: tested */
 
190
  if (maxbuffer == 0)
 
191
  {
 
192
    if (!no_messages)
 
193
      printf("  - Dumping %lu keys\n", (ulong) records);
 
194
    if (write_index(info,sort_keys, (uint) records))
 
195
      goto err; /* purecov: inspected */
 
196
  }
 
197
  else
 
198
  {
 
199
    keys=(keys*(sort_length+sizeof(char*)))/sort_length;
 
200
    if (maxbuffer >= MERGEBUFF2)
 
201
    {
 
202
      if (!no_messages)
 
203
        printf("  - Merging %lu keys\n", (ulong) records); /* purecov: tested */
 
204
      if (merge_many_buff(info,keys,sort_keys,
 
205
                  dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
 
206
        goto err;                               /* purecov: inspected */
 
207
    }
 
208
    if (flush_io_cache(&tempfile) ||
 
209
        reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
 
210
      goto err;                                 /* purecov: inspected */
 
211
    if (!no_messages)
 
212
      printf("  - Last merge and dumping keys\n"); /* purecov: tested */
 
213
    if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
 
214
                    maxbuffer,&tempfile))
 
215
      goto err;                                 /* purecov: inspected */
 
216
  }
 
217
 
 
218
  if (flush_ft_buf(info) || flush_pending_blocks(info))
 
219
    goto err;
 
220
 
 
221
  if (my_b_inited(&tempfile_for_exceptions))
 
222
  {
 
223
    MI_INFO *idx=info->sort_info->info;
 
224
    uint     keyno=info->key;
 
225
    uint     key_length, ref_length=idx->s->rec_reflength;
 
226
 
 
227
    if (!no_messages)
 
228
      printf("  - Adding exceptions\n"); /* purecov: tested */
 
229
    if (flush_io_cache(&tempfile_for_exceptions) ||
 
230
        reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
 
231
      goto err;
 
232
 
 
233
    while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
 
234
                      sizeof(key_length))
 
235
        && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
 
236
                      (uint) key_length))
 
237
    {
 
238
        if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
 
239
          goto err;
 
240
    }
 
241
  }
 
242
 
 
243
  error =0;
 
244
 
 
245
err:
 
246
  if (sort_keys)
 
247
    my_free((uchar*) sort_keys,MYF(0));
 
248
  delete_dynamic(&buffpek);
 
249
  close_cached_file(&tempfile);
 
250
  close_cached_file(&tempfile_for_exceptions);
 
251
 
 
252
  DBUG_RETURN(error ? -1 : 0);
 
253
} /* _create_index_by_sort */
 
254
 
 
255
 
 
256
/* Search after all keys and place them in a temp. file */
 
257
 
 
258
static ha_rows  find_all_keys(MI_SORT_PARAM *info, uint keys,
 
259
                                    uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
 
260
                                    int *maxbuffer, IO_CACHE *tempfile,
 
261
                                    IO_CACHE *tempfile_for_exceptions)
 
262
{
 
263
  int error;
 
264
  uint idx;
 
265
  DBUG_ENTER("find_all_keys");
 
266
 
 
267
  idx=error=0;
 
268
  sort_keys[0]=(uchar*) (sort_keys+keys);
 
269
 
 
270
  while (!(error=(*info->key_read)(info,sort_keys[idx])))
 
271
  {
 
272
    if (info->real_key_length > info->key_length)
 
273
    {
 
274
      if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
 
275
        DBUG_RETURN(HA_POS_ERROR);              /* purecov: inspected */
 
276
      continue;
 
277
    }
 
278
 
 
279
    if (++idx == keys)
 
280
    {
 
281
      if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
 
282
                     tempfile))
 
283
      DBUG_RETURN(HA_POS_ERROR);                /* purecov: inspected */
 
284
 
 
285
      sort_keys[0]=(uchar*) (sort_keys+keys);
 
286
      memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
 
287
      idx=1;
 
288
    }
 
289
    sort_keys[idx]=sort_keys[idx-1]+info->key_length;
 
290
  }
 
291
  if (error > 0)
 
292
    DBUG_RETURN(HA_POS_ERROR);          /* Aborted by get_key */ /* purecov: inspected */
 
293
  if (buffpek->elements)
 
294
  {
 
295
    if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
 
296
                   tempfile))
 
297
      DBUG_RETURN(HA_POS_ERROR);                /* purecov: inspected */
 
298
    *maxbuffer=buffpek->elements-1;
 
299
  }
 
300
  else
 
301
    *maxbuffer=0;
 
302
 
 
303
  DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
 
304
} /* find_all_keys */
 
305
 
 
306
 
 
307
#ifdef THREAD
 
308
/* Search after all keys and place them in a temp. file */
 
309
 
 
310
pthread_handler_t thr_find_all_keys(void *arg)
 
311
{
 
312
  MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
 
313
  int error;
 
314
  uint memavl,old_memavl,keys,sort_length;
 
315
  uint idx, maxbuffer;
 
316
  uchar **sort_keys=0;
 
317
 
 
318
  error=1;
 
319
 
 
320
  if (my_thread_init())
 
321
    goto err;
 
322
 
 
323
  { /* Add extra block since DBUG_ENTER declare variables */
 
324
    DBUG_ENTER("thr_find_all_keys");
 
325
    DBUG_PRINT("enter", ("master: %d", sort_param->master));
 
326
    if (sort_param->sort_info->got_error)
 
327
      goto err;
 
328
 
 
329
    if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
 
330
    {
 
331
      sort_param->write_keys=     write_keys_varlen;
 
332
      sort_param->read_to_buffer= read_to_buffer_varlen;
 
333
      sort_param->write_key=      write_merge_key_varlen;
 
334
    }
 
335
    else
 
336
    {
 
337
      sort_param->write_keys=     write_keys;
 
338
      sort_param->read_to_buffer= read_to_buffer;
 
339
      sort_param->write_key=      write_merge_key;
 
340
    }
 
341
 
 
342
    my_b_clear(&sort_param->tempfile);
 
343
    my_b_clear(&sort_param->tempfile_for_exceptions);
 
344
    bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
 
345
    bzero((char*) &sort_param->unique,  sizeof(sort_param->unique));
 
346
    sort_keys= (uchar **) NULL;
 
347
 
 
348
    memavl=       max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
 
349
    idx=          (uint)sort_param->sort_info->max_records;
 
350
    sort_length=  sort_param->key_length;
 
351
    maxbuffer=    1;
 
352
 
 
353
    while (memavl >= MIN_SORT_MEMORY)
 
354
    {
 
355
      if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
 
356
          (my_off_t) memavl)
 
357
        keys= idx+1;
 
358
      else
 
359
      {
 
360
        uint skr;
 
361
        do
 
362
        {
 
363
          skr= maxbuffer;
 
364
          if (memavl < sizeof(BUFFPEK)*maxbuffer ||
 
365
              (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
 
366
               (sort_length+sizeof(char*))) <= 1 ||
 
367
              keys < (uint) maxbuffer)
 
368
          {
 
369
            mi_check_print_error(sort_param->sort_info->param,
 
370
                                 "myisam_sort_buffer_size is too small");
 
371
            goto err;
 
372
          }
 
373
        }
 
374
        while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
 
375
      }
 
376
      if ((sort_keys= (uchar**)
 
377
           my_malloc(keys*(sort_length+sizeof(char*))+
 
378
                     ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
 
379
                      HA_FT_MAXBYTELEN : 0), MYF(0))))
 
380
      {
 
381
        if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
 
382
                                  maxbuffer, maxbuffer/2))
 
383
        {
 
384
          my_free((uchar*) sort_keys,MYF(0));
 
385
          sort_keys= (uchar **) NULL; /* for err: label */
 
386
        }
 
387
        else
 
388
          break;
 
389
      }
 
390
      old_memavl= memavl;
 
391
      if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
 
392
          old_memavl > MIN_SORT_MEMORY)
 
393
        memavl= MIN_SORT_MEMORY;
 
394
    }
 
395
    if (memavl < MIN_SORT_MEMORY)
 
396
    {
 
397
      mi_check_print_error(sort_param->sort_info->param,
 
398
                           "MyISAM sort buffer too small");
 
399
      goto err; /* purecov: tested */
 
400
    }
 
401
 
 
402
    if (sort_param->sort_info->param->testflag & T_VERBOSE)
 
403
      printf("Key %d - Allocating buffer for %d keys\n",
 
404
             sort_param->key + 1, keys);
 
405
    sort_param->sort_keys= sort_keys;
 
406
 
 
407
    idx= error= 0;
 
408
    sort_keys[0]= (uchar*) (sort_keys+keys);
 
409
 
 
410
    DBUG_PRINT("info", ("reading keys"));
 
411
    while (!(error= sort_param->sort_info->got_error) &&
 
412
           !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
 
413
    {
 
414
      if (sort_param->real_key_length > sort_param->key_length)
 
415
      {
 
416
        if (write_key(sort_param, sort_keys[idx],
 
417
                      &sort_param->tempfile_for_exceptions))
 
418
          goto err;
 
419
        continue;
 
420
      }
 
421
 
 
422
      if (++idx == keys)
 
423
      {
 
424
        if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
 
425
                                   (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
 
426
                                   &sort_param->tempfile))
 
427
          goto err;
 
428
        sort_keys[0]= (uchar*) (sort_keys+keys);
 
429
        memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
 
430
        idx= 1;
 
431
      }
 
432
      sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
 
433
    }
 
434
    if (error > 0)
 
435
      goto err;
 
436
    if (sort_param->buffpek.elements)
 
437
    {
 
438
      if (sort_param->write_keys(sort_param, sort_keys, idx,
 
439
                                 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
 
440
                                 &sort_param->tempfile))
 
441
        goto err;
 
442
      sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
 
443
    }
 
444
    else
 
445
      sort_param->keys= idx;
 
446
 
 
447
    sort_param->sort_keys_length= keys;
 
448
    goto ok;
 
449
 
 
450
err:
 
451
    DBUG_PRINT("error", ("got some error"));
 
452
    sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
 
453
    if (sort_keys)
 
454
      my_free((uchar*) sort_keys,MYF(0));
 
455
    sort_param->sort_keys= 0;
 
456
    delete_dynamic(& sort_param->buffpek);
 
457
    close_cached_file(&sort_param->tempfile);
 
458
    close_cached_file(&sort_param->tempfile_for_exceptions);
 
459
 
 
460
ok:
 
461
    free_root(&sort_param->wordroot, MYF(0));
 
462
    /*
 
463
      Detach from the share if the writer is involved. Avoid others to
 
464
      be blocked. This includes a flush of the write buffer. This will
 
465
      also indicate EOF to the readers.
 
466
    */
 
467
    if (sort_param->sort_info->info->rec_cache.share)
 
468
      remove_io_thread(&sort_param->sort_info->info->rec_cache);
 
469
 
 
470
    /* Readers detach from the share if any. Avoid others to be blocked. */
 
471
    if (sort_param->read_cache.share)
 
472
      remove_io_thread(&sort_param->read_cache);
 
473
 
 
474
    pthread_mutex_lock(&sort_param->sort_info->mutex);
 
475
    if (!--sort_param->sort_info->threads_running)
 
476
      pthread_cond_signal(&sort_param->sort_info->cond);
 
477
    pthread_mutex_unlock(&sort_param->sort_info->mutex);
 
478
    DBUG_PRINT("exit", ("======== ending thread ========"));
 
479
  }
 
480
  my_thread_end();
 
481
  return NULL;
 
482
}
 
483
 
 
484
 
 
485
int thr_write_keys(MI_SORT_PARAM *sort_param)
 
486
{
 
487
  SORT_INFO *sort_info= sort_param->sort_info;
 
488
  MI_CHECK *param= sort_info->param;
 
489
  ulong length= 0, keys;
 
490
  ulong *rec_per_key_part=param->rec_per_key_part;
 
491
  int got_error=sort_info->got_error;
 
492
  uint i;
 
493
  MI_INFO *info=sort_info->info;
 
494
  MYISAM_SHARE *share=info->s;
 
495
  MI_SORT_PARAM *sinfo;
 
496
  uchar *mergebuf=0;
 
497
  DBUG_ENTER("thr_write_keys");
 
498
 
 
499
  for (i= 0, sinfo= sort_param ;
 
500
       i < sort_info->total_keys ;
 
501
       i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
 
502
  {
 
503
    if (!sinfo->sort_keys)
 
504
    {
 
505
      got_error=1;
 
506
      my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff),
 
507
              MYF(MY_ALLOW_ZERO_PTR));
 
508
      continue;
 
509
    }
 
510
    if (!got_error)
 
511
    {
 
512
      mi_set_key_active(share->state.key_map, sinfo->key);
 
513
      if (!sinfo->buffpek.elements)
 
514
      {
 
515
        if (param->testflag & T_VERBOSE)
 
516
        {
 
517
          printf("Key %d  - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
 
518
          fflush(stdout);
 
519
        }
 
520
        if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
 
521
            flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
 
522
          got_error=1;
 
523
      }
 
524
      if (!got_error && param->testflag & T_STATISTICS)
 
525
        update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
 
526
                         param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
 
527
                         sinfo->notnull: NULL,
 
528
                         (ulonglong) info->state->records);
 
529
    }
 
530
    my_free((uchar*) sinfo->sort_keys,MYF(0));
 
531
    my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff),
 
532
            MYF(MY_ALLOW_ZERO_PTR));
 
533
    sinfo->sort_keys=0;
 
534
  }
 
535
 
 
536
  for (i= 0, sinfo= sort_param ;
 
537
       i < sort_info->total_keys ;
 
538
       i++,
 
539
         delete_dynamic(&sinfo->buffpek),
 
540
         close_cached_file(&sinfo->tempfile),
 
541
         close_cached_file(&sinfo->tempfile_for_exceptions),
 
542
         sinfo++)
 
543
  {
 
544
    if (got_error)
 
545
      continue;
 
546
    if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
 
547
    {
 
548
      sinfo->write_keys=write_keys_varlen;
 
549
      sinfo->read_to_buffer=read_to_buffer_varlen;
 
550
      sinfo->write_key=write_merge_key_varlen;
 
551
    }
 
552
    else
 
553
    {
 
554
      sinfo->write_keys=write_keys;
 
555
      sinfo->read_to_buffer=read_to_buffer;
 
556
      sinfo->write_key=write_merge_key;
 
557
    }
 
558
    if (sinfo->buffpek.elements)
 
559
    {
 
560
      uint maxbuffer=sinfo->buffpek.elements-1;
 
561
      if (!mergebuf)
 
562
      {
 
563
        length=param->sort_buffer_length;
 
564
        while (length >= MIN_SORT_MEMORY)
 
565
        {
 
566
          if ((mergebuf= my_malloc(length, MYF(0))))
 
567
              break;
 
568
          length=length*3/4;
 
569
        }
 
570
        if (!mergebuf)
 
571
        {
 
572
          got_error=1;
 
573
          continue;
 
574
        }
 
575
      }
 
576
      keys=length/sinfo->key_length;
 
577
      if (maxbuffer >= MERGEBUFF2)
 
578
      {
 
579
        if (param->testflag & T_VERBOSE)
 
580
          printf("Key %d  - Merging %u keys\n",sinfo->key+1, sinfo->keys);
 
581
        if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
 
582
                            dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
 
583
                            (int*) &maxbuffer, &sinfo->tempfile))
 
584
        {
 
585
          got_error=1;
 
586
          continue;
 
587
        }
 
588
      }
 
589
      if (flush_io_cache(&sinfo->tempfile) ||
 
590
          reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
 
591
      {
 
592
        got_error=1;
 
593
        continue;
 
594
      }
 
595
      if (param->testflag & T_VERBOSE)
 
596
        printf("Key %d  - Last merge and dumping keys\n", sinfo->key+1);
 
597
      if (merge_index(sinfo, keys, (uchar **)mergebuf,
 
598
                      dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
 
599
                      maxbuffer,&sinfo->tempfile) ||
 
600
          flush_ft_buf(sinfo) ||
 
601
          flush_pending_blocks(sinfo))
 
602
      {
 
603
        got_error=1;
 
604
        continue;
 
605
      }
 
606
    }
 
607
    if (my_b_inited(&sinfo->tempfile_for_exceptions))
 
608
    {
 
609
      uint key_length;
 
610
 
 
611
      if (param->testflag & T_VERBOSE)
 
612
        printf("Key %d  - Dumping 'long' keys\n", sinfo->key+1);
 
613
 
 
614
      if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
 
615
          reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
 
616
      {
 
617
        got_error=1;
 
618
        continue;
 
619
      }
 
620
 
 
621
      while (!got_error &&
 
622
             !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
 
623
                        sizeof(key_length)))
 
624
      {
 
625
        uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
 
626
        if (key_length > sizeof(ft_buf) ||
 
627
            my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
 
628
                      (uint)key_length) ||
 
629
            _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
 
630
                         key_length - info->s->rec_reflength))
 
631
          got_error=1;
 
632
      }
 
633
    }
 
634
  }
 
635
  my_free((uchar*) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
 
636
  DBUG_RETURN(got_error);
 
637
}
 
638
#endif /* THREAD */
 
639
 
 
640
        /* Write all keys in memory to file for later merge */
 
641
 
 
642
static int  write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
 
643
                             uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
 
644
{
 
645
  uchar **end;
 
646
  uint sort_length=info->key_length;
 
647
  DBUG_ENTER("write_keys");
 
648
 
 
649
  my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
 
650
            info);
 
651
  if (!my_b_inited(tempfile) &&
 
652
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
 
653
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
 
654
    DBUG_RETURN(1); /* purecov: inspected */
 
655
 
 
656
  buffpek->file_pos=my_b_tell(tempfile);
 
657
  buffpek->count=count;
 
658
 
 
659
  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
 
660
  {
 
661
    if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
 
662
      DBUG_RETURN(1); /* purecov: inspected */
 
663
  }
 
664
  DBUG_RETURN(0);
 
665
} /* write_keys */
 
666
 
 
667
 
 
668
static inline int
 
669
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
 
670
{
 
671
  int err;
 
672
  uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
 
673
 
 
674
  /* The following is safe as this is a local file */
 
675
  if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
 
676
    return (err);
 
677
  if ((err= my_b_write(to_file,bufs, (uint) len)))
 
678
    return (err);
 
679
  return (0);
 
680
}
 
681
 
 
682
 
 
683
static int  write_keys_varlen(MI_SORT_PARAM *info,
 
684
                                    register uchar **sort_keys,
 
685
                                    uint count, BUFFPEK *buffpek,
 
686
                                    IO_CACHE *tempfile)
 
687
{
 
688
  uchar **end;
 
689
  int err;
 
690
  DBUG_ENTER("write_keys_varlen");
 
691
 
 
692
  my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
 
693
            info);
 
694
  if (!my_b_inited(tempfile) &&
 
695
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
 
696
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
 
697
    DBUG_RETURN(1); /* purecov: inspected */
 
698
 
 
699
  buffpek->file_pos=my_b_tell(tempfile);
 
700
  buffpek->count=count;
 
701
  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
 
702
  {
 
703
    if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
 
704
      DBUG_RETURN(err);
 
705
  }
 
706
  DBUG_RETURN(0);
 
707
} /* write_keys_varlen */
 
708
 
 
709
 
 
710
static int  write_key(MI_SORT_PARAM *info, uchar *key,
 
711
                            IO_CACHE *tempfile)
 
712
{
 
713
  uint key_length=info->real_key_length;
 
714
  DBUG_ENTER("write_key");
 
715
 
 
716
  if (!my_b_inited(tempfile) &&
 
717
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
 
718
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
 
719
    DBUG_RETURN(1);
 
720
 
 
721
  if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
 
722
      my_b_write(tempfile,(uchar*)key,(uint) key_length))
 
723
    DBUG_RETURN(1);
 
724
  DBUG_RETURN(0);
 
725
} /* write_key */
 
726
 
 
727
 
 
728
/* Write index */
 
729
 
 
730
static int  write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
 
731
                              register uint count)
 
732
{
 
733
  DBUG_ENTER("write_index");
 
734
 
 
735
  my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
 
736
           (qsort2_cmp) info->key_cmp,info);
 
737
  while (count--)
 
738
  {
 
739
    if ((*info->key_write)(info,*sort_keys++))
 
740
      DBUG_RETURN(-1); /* purecov: inspected */
 
741
  }
 
742
  DBUG_RETURN(0);
 
743
} /* write_index */
 
744
 
 
745
 
 
746
        /* Merge buffers to make < MERGEBUFF2 buffers */
 
747
 
 
748
static int  merge_many_buff(MI_SORT_PARAM *info, uint keys,
 
749
                                  uchar **sort_keys, BUFFPEK *buffpek,
 
750
                                  int *maxbuffer, IO_CACHE *t_file)
 
751
{
 
752
  register int i;
 
753
  IO_CACHE t_file2, *from_file, *to_file, *temp;
 
754
  BUFFPEK *lastbuff;
 
755
  DBUG_ENTER("merge_many_buff");
 
756
 
 
757
  if (*maxbuffer < MERGEBUFF2)
 
758
    DBUG_RETURN(0);                             /* purecov: inspected */
 
759
  if (flush_io_cache(t_file) ||
 
760
      open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
 
761
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
 
762
    DBUG_RETURN(1);                             /* purecov: inspected */
 
763
 
 
764
  from_file= t_file ; to_file= &t_file2;
 
765
  while (*maxbuffer >= MERGEBUFF2)
 
766
  {
 
767
    reinit_io_cache(from_file,READ_CACHE,0L,0,0);
 
768
    reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
 
769
    lastbuff=buffpek;
 
770
    for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
 
771
    {
 
772
      if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
 
773
                        buffpek+i,buffpek+i+MERGEBUFF-1))
 
774
        goto cleanup;
 
775
    }
 
776
    if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
 
777
                      buffpek+i,buffpek+ *maxbuffer))
 
778
      break; /* purecov: inspected */
 
779
    if (flush_io_cache(to_file))
 
780
      break;                                    /* purecov: inspected */
 
781
    temp=from_file; from_file=to_file; to_file=temp;
 
782
    *maxbuffer= (int) (lastbuff-buffpek)-1;
 
783
  }
 
784
cleanup:
 
785
  close_cached_file(to_file);                   /* This holds old result */
 
786
  if (to_file == t_file)
 
787
    *t_file=t_file2;                            /* Copy result file */
 
788
 
 
789
  DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
 
790
} /* merge_many_buff */
 
791
 
 
792
 
 
793
/*
 
794
   Read data to buffer
 
795
 
 
796
  SYNOPSIS
 
797
    read_to_buffer()
 
798
    fromfile            File to read from
 
799
    buffpek             Where to read from
 
800
    sort_length         max length to read
 
801
  RESULT
 
802
    > 0 Ammount of bytes read
 
803
    -1  Error
 
804
*/
 
805
 
 
806
static uint  read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
 
807
                                  uint sort_length)
 
808
{
 
809
  register uint count;
 
810
  uint length;
 
811
 
 
812
  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
 
813
  {
 
814
    if (my_pread(fromfile->file,(uchar*) buffpek->base,
 
815
                 (length= sort_length*count),buffpek->file_pos,MYF_RW))
 
816
      return((uint) -1);                        /* purecov: inspected */
 
817
    buffpek->key=buffpek->base;
 
818
    buffpek->file_pos+= length;                 /* New filepos */
 
819
    buffpek->count-=    count;
 
820
    buffpek->mem_count= count;
 
821
  }
 
822
  return (count*sort_length);
 
823
} /* read_to_buffer */
 
824
 
 
825
static uint  read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
 
826
                                         uint sort_length)
 
827
{
 
828
  register uint count;
 
829
  uint16 length_of_key = 0;
 
830
  uint idx;
 
831
  uchar *buffp;
 
832
 
 
833
  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
 
834
  {
 
835
    buffp = buffpek->base;
 
836
 
 
837
    for (idx=1;idx<=count;idx++)
 
838
    {
 
839
      if (my_pread(fromfile->file,(uchar*)&length_of_key,sizeof(length_of_key),
 
840
                   buffpek->file_pos,MYF_RW))
 
841
        return((uint) -1);
 
842
      buffpek->file_pos+=sizeof(length_of_key);
 
843
      if (my_pread(fromfile->file,(uchar*) buffp,length_of_key,
 
844
                   buffpek->file_pos,MYF_RW))
 
845
        return((uint) -1);
 
846
      buffpek->file_pos+=length_of_key;
 
847
      buffp = buffp + sort_length;
 
848
    }
 
849
    buffpek->key=buffpek->base;
 
850
    buffpek->count-=    count;
 
851
    buffpek->mem_count= count;
 
852
  }
 
853
  return (count*sort_length);
 
854
} /* read_to_buffer_varlen */
 
855
 
 
856
 
 
857
static int  write_merge_key_varlen(MI_SORT_PARAM *info,
 
858
                                         IO_CACHE *to_file, uchar* key,
 
859
                                         uint sort_length, uint count)
 
860
{
 
861
  uint idx;
 
862
  uchar *bufs = key;
 
863
 
 
864
  for (idx=1;idx<=count;idx++)
 
865
  {
 
866
    int err;
 
867
    if ((err= my_var_write(info, to_file, bufs)))
 
868
      return (err);
 
869
    bufs=bufs+sort_length;
 
870
  }
 
871
  return(0);
 
872
}
 
873
 
 
874
 
 
875
static int  write_merge_key(MI_SORT_PARAM *info __attribute__((unused)),
 
876
                                  IO_CACHE *to_file, uchar *key,
 
877
                                  uint sort_length, uint count)
 
878
{
 
879
  return my_b_write(to_file, key, (size_t) sort_length*count);
 
880
}
 
881
 
 
882
/*
 
883
  Merge buffers to one buffer
 
884
  If to_file == 0 then use info->key_write
 
885
*/
 
886
 
 
887
static int
 
888
merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
 
889
              IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
 
890
              BUFFPEK *Fb, BUFFPEK *Tb)
 
891
{
 
892
  int error;
 
893
  uint sort_length,maxcount;
 
894
  ha_rows count;
 
895
  my_off_t to_start_filepos= 0;
 
896
  uchar *strpos;
 
897
  BUFFPEK *buffpek,**refpek;
 
898
  QUEUE queue;
 
899
  volatile int *killed= killed_ptr(info->sort_info->param);
 
900
  DBUG_ENTER("merge_buffers");
 
901
 
 
902
  count=error=0;
 
903
  maxcount=keys/((uint) (Tb-Fb) +1);
 
904
  DBUG_ASSERT(maxcount > 0);
 
905
  if (to_file)
 
906
    to_start_filepos=my_b_tell(to_file);
 
907
  strpos=(uchar*) sort_keys;
 
908
  sort_length=info->key_length;
 
909
 
 
910
  if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
 
911
                 (int (*)(void*, uchar *,uchar*)) info->key_cmp,
 
912
                 (void*) info))
 
913
    DBUG_RETURN(1); /* purecov: inspected */
 
914
 
 
915
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
 
916
  {
 
917
    count+= buffpek->count;
 
918
    buffpek->base= strpos;
 
919
    buffpek->max_keys=maxcount;
 
920
    strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
 
921
                                                      sort_length));
 
922
    if (error == -1)
 
923
      goto err; /* purecov: inspected */
 
924
    queue_insert(&queue,(uchar*) buffpek);
 
925
  }
 
926
 
 
927
  while (queue.elements > 1)
 
928
  {
 
929
    for (;;)
 
930
    {
 
931
      if (*killed)
 
932
      {
 
933
        error=1; goto err;
 
934
      }
 
935
      buffpek=(BUFFPEK*) queue_top(&queue);
 
936
      if (to_file)
 
937
      {
 
938
        if (info->write_key(info,to_file,(uchar*) buffpek->key,
 
939
                            (uint) sort_length,1))
 
940
        {
 
941
          error=1; goto err; /* purecov: inspected */
 
942
        }
 
943
      }
 
944
      else
 
945
      {
 
946
        if ((*info->key_write)(info,(void*) buffpek->key))
 
947
        {
 
948
          error=1; goto err; /* purecov: inspected */
 
949
        }
 
950
      }
 
951
      buffpek->key+=sort_length;
 
952
      if (! --buffpek->mem_count)
 
953
      {
 
954
        if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
 
955
        {
 
956
          uchar *base=buffpek->base;
 
957
          uint max_keys=buffpek->max_keys;
 
958
 
 
959
          VOID(queue_remove(&queue,0));
 
960
 
 
961
          /* Put room used by buffer to use in other buffer */
 
962
          for (refpek= (BUFFPEK**) &queue_top(&queue);
 
963
               refpek <= (BUFFPEK**) &queue_end(&queue);
 
964
               refpek++)
 
965
          {
 
966
            buffpek= *refpek;
 
967
            if (buffpek->base+buffpek->max_keys*sort_length == base)
 
968
            {
 
969
              buffpek->max_keys+=max_keys;
 
970
              break;
 
971
            }
 
972
            else if (base+max_keys*sort_length == buffpek->base)
 
973
            {
 
974
              buffpek->base=base;
 
975
              buffpek->max_keys+=max_keys;
 
976
              break;
 
977
            }
 
978
          }
 
979
          break;                /* One buffer have been removed */
 
980
        }
 
981
      }
 
982
      else if (error == -1)
 
983
        goto err;               /* purecov: inspected */
 
984
      queue_replaced(&queue);   /* Top element has been replaced */
 
985
    }
 
986
  }
 
987
  buffpek=(BUFFPEK*) queue_top(&queue);
 
988
  buffpek->base=(uchar *) sort_keys;
 
989
  buffpek->max_keys=keys;
 
990
  do
 
991
  {
 
992
    if (to_file)
 
993
    {
 
994
      if (info->write_key(info,to_file,(uchar*) buffpek->key,
 
995
                         sort_length,buffpek->mem_count))
 
996
      {
 
997
        error=1; goto err; /* purecov: inspected */
 
998
      }
 
999
    }
 
1000
    else
 
1001
    {
 
1002
      register uchar *end;
 
1003
      strpos= buffpek->key;
 
1004
      for (end=strpos+buffpek->mem_count*sort_length;
 
1005
           strpos != end ;
 
1006
           strpos+=sort_length)
 
1007
      {
 
1008
        if ((*info->key_write)(info,(void*) strpos))
 
1009
        {
 
1010
          error=1; goto err; /* purecov: inspected */
 
1011
        }
 
1012
      }
 
1013
    }
 
1014
  }
 
1015
  while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
 
1016
         error != 0);
 
1017
 
 
1018
  lastbuff->count=count;
 
1019
  if (to_file)
 
1020
    lastbuff->file_pos=to_start_filepos;
 
1021
err:
 
1022
  delete_queue(&queue);
 
1023
  DBUG_RETURN(error);
 
1024
} /* merge_buffers */
 
1025
 
 
1026
 
 
1027
        /* Do a merge to output-file (save only positions) */
 
1028
 
 
1029
static int
 
1030
merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
 
1031
            BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
 
1032
{
 
1033
  DBUG_ENTER("merge_index");
 
1034
  if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
 
1035
                    buffpek+maxbuffer))
 
1036
    DBUG_RETURN(1); /* purecov: inspected */
 
1037
  DBUG_RETURN(0);
 
1038
} /* merge_index */
 
1039
 
 
1040
static int
 
1041
flush_ft_buf(MI_SORT_PARAM *info)
 
1042
{
 
1043
  int err=0;
 
1044
  if (info->sort_info->ft_buf)
 
1045
  {
 
1046
    err=sort_ft_buf_flush(info);
 
1047
    my_free((uchar*)info->sort_info->ft_buf, MYF(0));
 
1048
    info->sort_info->ft_buf=0;
 
1049
  }
 
1050
  return err;
 
1051
}
 
1052