~skinny.moey/drizzle/branch-rev

« back to all changes in this revision

Viewing changes to sql/filesort.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB  
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @file
 
19
 
 
20
  @brief
 
21
  Sorts a database
 
22
*/
 
23
 
 
24
#include "mysql_priv.h"
 
25
#ifdef HAVE_STDDEF_H
 
26
#include <stddef.h>                     /* for macro offsetof */
 
27
#endif
 
28
#include <m_ctype.h>
 
29
#include "sql_sort.h"
 
30
 
 
31
#ifndef THREAD
 
32
#define SKIP_DBUG_IN_FILESORT
 
33
#endif
 
34
 
 
35
/// How to write record_ref.
 
36
#define WRITE_REF(file,from) \
 
37
if (my_b_write((file),(uchar*) (from),param->ref_length)) \
 
38
  DBUG_RETURN(1);
 
39
 
 
40
        /* functions defined in this file */
 
41
 
 
42
static char **make_char_array(char **old_pos, register uint fields,
 
43
                              uint length, myf my_flag);
 
44
static uchar *read_buffpek_from_file(IO_CACHE *buffer_file, uint count,
 
45
                                     uchar *buf);
 
46
static ha_rows find_all_keys(SORTPARAM *param,SQL_SELECT *select,
 
47
                             uchar * *sort_keys, IO_CACHE *buffer_file,
 
48
                             IO_CACHE *tempfile,IO_CACHE *indexfile);
 
49
static int write_keys(SORTPARAM *param,uchar * *sort_keys,
 
50
                      uint count, IO_CACHE *buffer_file, IO_CACHE *tempfile);
 
51
static void make_sortkey(SORTPARAM *param,uchar *to, uchar *ref_pos);
 
52
static void register_used_fields(SORTPARAM *param);
 
53
static int merge_index(SORTPARAM *param,uchar *sort_buffer,
 
54
                       BUFFPEK *buffpek,
 
55
                       uint maxbuffer,IO_CACHE *tempfile,
 
56
                       IO_CACHE *outfile);
 
57
static bool save_index(SORTPARAM *param,uchar **sort_keys, uint count, 
 
58
                       FILESORT_INFO *table_sort);
 
59
static uint suffix_length(ulong string_length);
 
60
static uint sortlength(THD *thd, SORT_FIELD *sortorder, uint s_length,
 
61
                       bool *multi_byte_charset);
 
62
static SORT_ADDON_FIELD *get_addon_fields(THD *thd, Field **ptabfield,
 
63
                                          uint sortlength, uint *plength);
 
64
static void unpack_addon_fields(struct st_sort_addon_field *addon_field,
 
65
                                uchar *buff);
 
66
/**
 
67
  Sort a table.
 
68
  Creates a set of pointers that can be used to read the rows
 
69
  in sorted order. This should be done with the functions
 
70
  in records.cc.
 
71
 
 
72
  Before calling filesort, one must have done
 
73
  table->file->info(HA_STATUS_VARIABLE)
 
74
 
 
75
  The result set is stored in table->io_cache or
 
76
  table->record_pointers.
 
77
 
 
78
  @param thd           Current thread
 
79
  @param table          Table to sort
 
80
  @param sortorder      How to sort the table
 
81
  @param s_length       Number of elements in sortorder
 
82
  @param select         condition to apply to the rows
 
83
  @param max_rows       Return only this many rows
 
84
  @param sort_positions Set to 1 if we want to force sorting by position
 
85
                        (Needed by UPDATE/INSERT or ALTER TABLE)
 
86
  @param examined_rows  Store number of examined rows here
 
87
 
 
88
  @todo
 
89
    check why we do this (param.keys--)
 
90
  @note
 
91
    If we sort by position (like if sort_positions is 1) filesort() will
 
92
    call table->prepare_for_position().
 
93
 
 
94
  @retval
 
95
    HA_POS_ERROR        Error
 
96
  @retval
 
97
    \#                  Number of rows
 
98
  @retval
 
99
    examined_rows       will be set to number of examined rows
 
100
*/
 
101
 
 
102
ha_rows filesort(THD *thd, TABLE *table, SORT_FIELD *sortorder, uint s_length,
 
103
                 SQL_SELECT *select, ha_rows max_rows,
 
104
                 bool sort_positions, ha_rows *examined_rows)
 
105
{
 
106
  int error;
 
107
  ulong memavl, min_sort_memory;
 
108
  uint maxbuffer;
 
109
  BUFFPEK *buffpek;
 
110
  ha_rows records= HA_POS_ERROR;
 
111
  uchar **sort_keys= 0;
 
112
  IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile; 
 
113
  SORTPARAM param;
 
114
  bool multi_byte_charset;
 
115
  DBUG_ENTER("filesort");
 
116
  DBUG_EXECUTE("info",TEST_filesort(sortorder,s_length););
 
117
#ifdef SKIP_DBUG_IN_FILESORT
 
118
  DBUG_PUSH("");                /* No DBUG here */
 
119
#endif
 
120
  FILESORT_INFO table_sort;
 
121
  TABLE_LIST *tab= table->pos_in_table_list;
 
122
  Item_subselect *subselect= tab ? tab->containing_subselect() : 0;
 
123
 
 
124
  MYSQL_FILESORT_START();
 
125
 
 
126
  /*
 
127
   Release InnoDB's adaptive hash index latch (if holding) before
 
128
   running a sort.
 
129
  */
 
130
  ha_release_temporary_latches(thd);
 
131
 
 
132
  /* 
 
133
    Don't use table->sort in filesort as it is also used by 
 
134
    QUICK_INDEX_MERGE_SELECT. Work with a copy and put it back at the end 
 
135
    when index_merge select has finished with it.
 
136
  */
 
137
  memcpy(&table_sort, &table->sort, sizeof(FILESORT_INFO));
 
138
  table->sort.io_cache= NULL;
 
139
  
 
140
  outfile= table_sort.io_cache;
 
141
  my_b_clear(&tempfile);
 
142
  my_b_clear(&buffpek_pointers);
 
143
  buffpek=0;
 
144
  error= 1;
 
145
  bzero((char*) &param,sizeof(param));
 
146
  param.sort_length= sortlength(thd, sortorder, s_length, &multi_byte_charset);
 
147
  param.ref_length= table->file->ref_length;
 
148
  param.addon_field= 0;
 
149
  param.addon_length= 0;
 
150
  if (!(table->file->ha_table_flags() & HA_FAST_KEY_READ) &&
 
151
      !table->fulltext_searched && !sort_positions)
 
152
  {
 
153
    /* 
 
154
      Get the descriptors of all fields whose values are appended 
 
155
      to sorted fields and get its total length in param.spack_length.
 
156
    */
 
157
    param.addon_field= get_addon_fields(thd, table->field, 
 
158
                                        param.sort_length,
 
159
                                        &param.addon_length);
 
160
  }
 
161
 
 
162
  table_sort.addon_buf= 0;
 
163
  table_sort.addon_length= param.addon_length;
 
164
  table_sort.addon_field= param.addon_field;
 
165
  table_sort.unpack= unpack_addon_fields;
 
166
  if (param.addon_field)
 
167
  {
 
168
    param.res_length= param.addon_length;
 
169
    if (!(table_sort.addon_buf= (uchar *) my_malloc(param.addon_length,
 
170
                                                    MYF(MY_WME))))
 
171
      goto err;
 
172
  }
 
173
  else
 
174
  {
 
175
    param.res_length= param.ref_length;
 
176
    /* 
 
177
      The reference to the record is considered 
 
178
      as an additional sorted field
 
179
    */
 
180
    param.sort_length+= param.ref_length;
 
181
  }
 
182
  param.rec_length= param.sort_length+param.addon_length;
 
183
  param.max_rows= max_rows;
 
184
 
 
185
  if (select && select->quick)
 
186
  {
 
187
    status_var_increment(thd->status_var.filesort_range_count);
 
188
  }
 
189
  else
 
190
  {
 
191
    status_var_increment(thd->status_var.filesort_scan_count);
 
192
  }
 
193
#ifdef CAN_TRUST_RANGE
 
194
  if (select && select->quick && select->quick->records > 0L)
 
195
  {
 
196
    records=min((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
 
197
                table->file->stats.records)+EXTRA_RECORDS;
 
198
    selected_records_file=0;
 
199
  }
 
200
  else
 
201
#endif
 
202
  {
 
203
    records= table->file->estimate_rows_upper_bound();
 
204
    /*
 
205
      If number of records is not known, use as much of sort buffer 
 
206
      as possible. 
 
207
    */
 
208
    if (records == HA_POS_ERROR)
 
209
      records--;  // we use 'records+1' below.
 
210
    selected_records_file= 0;
 
211
  }
 
212
 
 
213
  if (multi_byte_charset &&
 
214
      !(param.tmp_buffer= (char*) my_malloc(param.sort_length,MYF(MY_WME))))
 
215
    goto err;
 
216
 
 
217
  memavl= thd->variables.sortbuff_size;
 
218
  min_sort_memory= max(MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
 
219
  while (memavl >= min_sort_memory)
 
220
  {
 
221
    ulong old_memavl;
 
222
    ulong keys= memavl/(param.rec_length+sizeof(char*));
 
223
    param.keys=(uint) min(records+1, keys);
 
224
    if ((table_sort.sort_keys=
 
225
         (uchar **) make_char_array((char **) table_sort.sort_keys,
 
226
                                    param.keys, param.rec_length, MYF(0))))
 
227
      break;
 
228
    old_memavl=memavl;
 
229
    if ((memavl=memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
 
230
      memavl= min_sort_memory;
 
231
  }
 
232
  sort_keys= table_sort.sort_keys;
 
233
  if (memavl < min_sort_memory)
 
234
  {
 
235
    my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
 
236
    goto err;
 
237
  }
 
238
  if (open_cached_file(&buffpek_pointers,mysql_tmpdir,TEMP_PREFIX,
 
239
                       DISK_BUFFER_SIZE, MYF(MY_WME)))
 
240
    goto err;
 
241
 
 
242
  param.keys--;                         /* TODO: check why we do this */
 
243
  param.sort_form= table;
 
244
  param.end=(param.local_sortorder=sortorder)+s_length;
 
245
  if ((records=find_all_keys(&param,select,sort_keys, &buffpek_pointers,
 
246
                             &tempfile, selected_records_file)) ==
 
247
      HA_POS_ERROR)
 
248
    goto err;
 
249
  maxbuffer= (uint) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek));
 
250
 
 
251
  if (maxbuffer == 0)                   // The whole set is in memory
 
252
  {
 
253
    if (save_index(&param,sort_keys,(uint) records, &table_sort))
 
254
      goto err;
 
255
  }
 
256
  else
 
257
  {
 
258
    if (table_sort.buffpek && table_sort.buffpek_len < maxbuffer)
 
259
    {
 
260
      x_free(table_sort.buffpek);
 
261
      table_sort.buffpek= 0;
 
262
    }
 
263
    if (!(table_sort.buffpek=
 
264
          (uchar *) read_buffpek_from_file(&buffpek_pointers, maxbuffer,
 
265
                                 table_sort.buffpek)))
 
266
      goto err;
 
267
    buffpek= (BUFFPEK *) table_sort.buffpek;
 
268
    table_sort.buffpek_len= maxbuffer;
 
269
    close_cached_file(&buffpek_pointers);
 
270
        /* Open cached file if it isn't open */
 
271
    if (! my_b_inited(outfile) &&
 
272
        open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
 
273
                          MYF(MY_WME)))
 
274
      goto err;
 
275
    if (reinit_io_cache(outfile,WRITE_CACHE,0L,0,0))
 
276
      goto err;
 
277
 
 
278
    /*
 
279
      Use also the space previously used by string pointers in sort_buffer
 
280
      for temporary key storage.
 
281
    */
 
282
    param.keys=((param.keys*(param.rec_length+sizeof(char*))) /
 
283
                param.rec_length-1);
 
284
    maxbuffer--;                                // Offset from 0
 
285
    if (merge_many_buff(&param,(uchar*) sort_keys,buffpek,&maxbuffer,
 
286
                        &tempfile))
 
287
      goto err;
 
288
    if (flush_io_cache(&tempfile) ||
 
289
        reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
 
290
      goto err;
 
291
    if (merge_index(&param,(uchar*) sort_keys,buffpek,maxbuffer,&tempfile,
 
292
                    outfile))
 
293
      goto err;
 
294
  }
 
295
  if (records > param.max_rows)
 
296
    records=param.max_rows;
 
297
  error =0;
 
298
 
 
299
 err:
 
300
  if (param.tmp_buffer)
 
301
    x_free(param.tmp_buffer);
 
302
  if (!subselect || !subselect->is_uncacheable())
 
303
  {
 
304
    x_free((uchar*) sort_keys);
 
305
    table_sort.sort_keys= 0;
 
306
    x_free((uchar*) buffpek);
 
307
    table_sort.buffpek= 0;
 
308
    table_sort.buffpek_len= 0;
 
309
  }
 
310
  close_cached_file(&tempfile);
 
311
  close_cached_file(&buffpek_pointers);
 
312
  if (my_b_inited(outfile))
 
313
  {
 
314
    if (flush_io_cache(outfile))
 
315
      error=1;
 
316
    {
 
317
      my_off_t save_pos=outfile->pos_in_file;
 
318
      /* For following reads */
 
319
      if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
 
320
        error=1;
 
321
      outfile->end_of_file=save_pos;
 
322
    }
 
323
  }
 
324
  if (error)
 
325
    my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
 
326
               MYF(ME_ERROR+ME_WAITTANG));
 
327
  else
 
328
    statistic_add(thd->status_var.filesort_rows,
 
329
                  (ulong) records, &LOCK_status);
 
330
  *examined_rows= param.examined_rows;
 
331
#ifdef SKIP_DBUG_IN_FILESORT
 
332
  DBUG_POP();                   /* Ok to DBUG */
 
333
#endif
 
334
  memcpy(&table->sort, &table_sort, sizeof(FILESORT_INFO));
 
335
  DBUG_PRINT("exit",("records: %ld", (long) records));
 
336
  MYSQL_FILESORT_END();
 
337
  DBUG_RETURN(error ? HA_POS_ERROR : records);
 
338
} /* filesort */
 
339
 
 
340
 
 
341
void filesort_free_buffers(TABLE *table, bool full)
 
342
{
 
343
  if (table->sort.record_pointers)
 
344
  {
 
345
    my_free((uchar*) table->sort.record_pointers,MYF(0));
 
346
    table->sort.record_pointers=0;
 
347
  }
 
348
  if (full)
 
349
  {
 
350
    if (table->sort.sort_keys )
 
351
    {
 
352
      x_free((uchar*) table->sort.sort_keys);
 
353
      table->sort.sort_keys= 0;
 
354
    }
 
355
    if (table->sort.buffpek)
 
356
    {
 
357
      x_free((uchar*) table->sort.buffpek);
 
358
      table->sort.buffpek= 0;
 
359
      table->sort.buffpek_len= 0;
 
360
    }
 
361
  }
 
362
  if (table->sort.addon_buf)
 
363
  {
 
364
    my_free((char *) table->sort.addon_buf, MYF(0));
 
365
    my_free((char *) table->sort.addon_field, MYF(MY_ALLOW_ZERO_PTR));
 
366
    table->sort.addon_buf=0;
 
367
    table->sort.addon_field=0;
 
368
  }
 
369
}
 
370
 
 
371
/** Make a array of string pointers. */
 
372
 
 
373
static char **make_char_array(char **old_pos, register uint fields,
 
374
                              uint length, myf my_flag)
 
375
{
 
376
  register char **pos;
 
377
  char *char_pos;
 
378
  DBUG_ENTER("make_char_array");
 
379
 
 
380
  if (old_pos ||
 
381
      (old_pos= (char**) my_malloc((uint) fields*(length+sizeof(char*)),
 
382
                                   my_flag)))
 
383
  {
 
384
    pos=old_pos; char_pos=((char*) (pos+fields)) -length;
 
385
    while (fields--) *(pos++) = (char_pos+= length);
 
386
  }
 
387
 
 
388
  DBUG_RETURN(old_pos);
 
389
} /* make_char_array */
 
390
 
 
391
 
 
392
/** Read 'count' number of buffer pointers into memory. */
 
393
 
 
394
static uchar *read_buffpek_from_file(IO_CACHE *buffpek_pointers, uint count,
 
395
                                     uchar *buf)
 
396
{
 
397
  ulong length= sizeof(BUFFPEK)*count;
 
398
  uchar *tmp= buf;
 
399
  DBUG_ENTER("read_buffpek_from_file");
 
400
  if (count > UINT_MAX/sizeof(BUFFPEK))
 
401
    return 0; /* sizeof(BUFFPEK)*count will overflow */
 
402
  if (!tmp)
 
403
    tmp= (uchar *)my_malloc(length, MYF(MY_WME));
 
404
  if (tmp)
 
405
  {
 
406
    if (reinit_io_cache(buffpek_pointers,READ_CACHE,0L,0,0) ||
 
407
        my_b_read(buffpek_pointers, (uchar*) tmp, length))
 
408
    {
 
409
      my_free((char*) tmp, MYF(0));
 
410
      tmp=0;
 
411
    }
 
412
  }
 
413
  DBUG_RETURN(tmp);
 
414
}
 
415
 
 
416
 
 
417
/**
 
418
  Search after sort_keys and write them into tempfile.
 
419
  All produced sequences are guaranteed to be non-empty.
 
420
 
 
421
  @param param             Sorting parameter
 
422
  @param select            Use this to get source data
 
423
  @param sort_keys         Array of pointers to sort key + addon buffers.
 
424
  @param buffpek_pointers  File to write BUFFPEKs describing sorted segments
 
425
                           in tempfile.
 
426
  @param tempfile          File to write sorted sequences of sortkeys to.
 
427
  @param indexfile         If !NULL, use it for source data (contains rowids)
 
428
 
 
429
  @note
 
430
    Basic idea:
 
431
    @verbatim
 
432
     while (get_next_sortkey())
 
433
     {
 
434
       if (no free space in sort_keys buffers)
 
435
       {
 
436
         sort sort_keys buffer;
 
437
         dump sorted sequence to 'tempfile';
 
438
         dump BUFFPEK describing sequence location into 'buffpek_pointers';
 
439
       }
 
440
       put sort key into 'sort_keys';
 
441
     }
 
442
     if (sort_keys has some elements && dumped at least once)
 
443
       sort-dump-dump as above;
 
444
     else
 
445
       don't sort, leave sort_keys array to be sorted by caller.
 
446
  @endverbatim
 
447
 
 
448
  @retval
 
449
    Number of records written on success.
 
450
  @retval
 
451
    HA_POS_ERROR on error.
 
452
*/
 
453
 
 
454
static ha_rows find_all_keys(SORTPARAM *param, SQL_SELECT *select,
 
455
                             uchar **sort_keys,
 
456
                             IO_CACHE *buffpek_pointers,
 
457
                             IO_CACHE *tempfile, IO_CACHE *indexfile)
 
458
{
 
459
  int error,flag,quick_select;
 
460
  uint idx,indexpos,ref_length;
 
461
  uchar *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
 
462
  my_off_t record;
 
463
  TABLE *sort_form;
 
464
  THD *thd= current_thd;
 
465
  volatile THD::killed_state *killed= &thd->killed;
 
466
  handler *file;
 
467
  MY_BITMAP *save_read_set, *save_write_set;
 
468
  DBUG_ENTER("find_all_keys");
 
469
  DBUG_PRINT("info",("using: %s",
 
470
                     (select ? select->quick ? "ranges" : "where":
 
471
                      "every row")));
 
472
 
 
473
  idx=indexpos=0;
 
474
  error=quick_select=0;
 
475
  sort_form=param->sort_form;
 
476
  file=sort_form->file;
 
477
  ref_length=param->ref_length;
 
478
  ref_pos= ref_buff;
 
479
  quick_select=select && select->quick;
 
480
  record=0;
 
481
  flag= ((!indexfile && file->ha_table_flags() & HA_REC_NOT_IN_SEQ)
 
482
         || quick_select);
 
483
  if (indexfile || flag)
 
484
    ref_pos= &file->ref[0];
 
485
  next_pos=ref_pos;
 
486
  if (! indexfile && ! quick_select)
 
487
  {
 
488
    next_pos=(uchar*) 0;                        /* Find records in sequence */
 
489
    file->ha_rnd_init(1);
 
490
    file->extra_opt(HA_EXTRA_CACHE,
 
491
                    current_thd->variables.read_buff_size);
 
492
  }
 
493
 
 
494
  READ_RECORD read_record_info;
 
495
  if (quick_select)
 
496
  {
 
497
    if (select->quick->reset())
 
498
      DBUG_RETURN(HA_POS_ERROR);
 
499
    init_read_record(&read_record_info, current_thd, select->quick->head,
 
500
                     select, 1, 1);
 
501
  }
 
502
 
 
503
  /* Remember original bitmaps */
 
504
  save_read_set=  sort_form->read_set;
 
505
  save_write_set= sort_form->write_set;
 
506
  /* Set up temporary column read map for columns used by sort */
 
507
  bitmap_clear_all(&sort_form->tmp_set);
 
508
  /* Temporary set for register_used_fields and register_field_in_read_map */
 
509
  sort_form->read_set= &sort_form->tmp_set;
 
510
  register_used_fields(param);
 
511
  if (select && select->cond)
 
512
    select->cond->walk(&Item::register_field_in_read_map, 1,
 
513
                       (uchar*) sort_form);
 
514
  sort_form->column_bitmaps_set(&sort_form->tmp_set, &sort_form->tmp_set);
 
515
 
 
516
  for (;;)
 
517
  {
 
518
    if (quick_select)
 
519
    {
 
520
      if ((error= read_record_info.read_record(&read_record_info)))
 
521
      {
 
522
        error= HA_ERR_END_OF_FILE;
 
523
        break;
 
524
      }
 
525
      file->position(sort_form->record[0]);
 
526
    }
 
527
    else                                        /* Not quick-select */
 
528
    {
 
529
      if (indexfile)
 
530
      {
 
531
        if (my_b_read(indexfile,(uchar*) ref_pos,ref_length)) /* purecov: deadcode */
 
532
        {
 
533
          error= my_errno ? my_errno : -1;              /* Abort */
 
534
          break;
 
535
        }
 
536
        error=file->rnd_pos(sort_form->record[0],next_pos);
 
537
      }
 
538
      else
 
539
      {
 
540
        error=file->rnd_next(sort_form->record[0]);
 
541
        if (!flag)
 
542
        {
 
543
          my_store_ptr(ref_pos,ref_length,record); // Position to row
 
544
          record+= sort_form->s->db_record_offset;
 
545
        }
 
546
        else if (!error)
 
547
          file->position(sort_form->record[0]);
 
548
      }
 
549
      if (error && error != HA_ERR_RECORD_DELETED)
 
550
        break;
 
551
    }
 
552
 
 
553
    if (*killed)
 
554
    {
 
555
      DBUG_PRINT("info",("Sort killed by user"));
 
556
      if (!indexfile && !quick_select)
 
557
      {
 
558
        (void) file->extra(HA_EXTRA_NO_CACHE);
 
559
        file->ha_rnd_end();
 
560
      }
 
561
      DBUG_RETURN(HA_POS_ERROR);                /* purecov: inspected */
 
562
    }
 
563
    if (error == 0)
 
564
      param->examined_rows++;
 
565
    if (error == 0 && (!select || select->skip_record() == 0))
 
566
    {
 
567
      if (idx == param->keys)
 
568
      {
 
569
        if (write_keys(param,sort_keys,idx,buffpek_pointers,tempfile))
 
570
          DBUG_RETURN(HA_POS_ERROR);
 
571
        idx=0;
 
572
        indexpos++;
 
573
      }
 
574
      make_sortkey(param,sort_keys[idx++],ref_pos);
 
575
    }
 
576
    else
 
577
      file->unlock_row();
 
578
    /* It does not make sense to read more keys in case of a fatal error */
 
579
    if (thd->is_error())
 
580
      break;
 
581
  }
 
582
  if (quick_select)
 
583
  {
 
584
    /*
 
585
      index_merge quick select uses table->sort when retrieving rows, so free
 
586
      resoures it has allocated.
 
587
    */
 
588
    end_read_record(&read_record_info);
 
589
  }
 
590
  else
 
591
  {
 
592
    (void) file->extra(HA_EXTRA_NO_CACHE);      /* End cacheing of records */
 
593
    if (!next_pos)
 
594
      file->ha_rnd_end();
 
595
  }
 
596
 
 
597
  if (thd->is_error())
 
598
    DBUG_RETURN(HA_POS_ERROR);
 
599
  
 
600
  /* Signal we should use orignal column read and write maps */
 
601
  sort_form->column_bitmaps_set(save_read_set, save_write_set);
 
602
 
 
603
  DBUG_PRINT("test",("error: %d  indexpos: %d",error,indexpos));
 
604
  if (error != HA_ERR_END_OF_FILE)
 
605
  {
 
606
    file->print_error(error,MYF(ME_ERROR | ME_WAITTANG)); /* purecov: inspected */
 
607
    DBUG_RETURN(HA_POS_ERROR);                  /* purecov: inspected */
 
608
  }
 
609
  if (indexpos && idx &&
 
610
      write_keys(param,sort_keys,idx,buffpek_pointers,tempfile))
 
611
    DBUG_RETURN(HA_POS_ERROR);                  /* purecov: inspected */
 
612
  DBUG_RETURN(my_b_inited(tempfile) ?
 
613
              (ha_rows) (my_b_tell(tempfile)/param->rec_length) :
 
614
              idx);
 
615
} /* find_all_keys */
 
616
 
 
617
 
 
618
/**
 
619
  @details
 
620
  Sort the buffer and write:
 
621
  -# the sorted sequence to tempfile
 
622
  -# a BUFFPEK describing the sorted sequence position to buffpek_pointers
 
623
 
 
624
    (was: Skriver en buffert med nycklar till filen)
 
625
 
 
626
  @param param             Sort parameters
 
627
  @param sort_keys         Array of pointers to keys to sort
 
628
  @param count             Number of elements in sort_keys array
 
629
  @param buffpek_pointers  One 'BUFFPEK' struct will be written into this file.
 
630
                           The BUFFPEK::{file_pos, count} will indicate where
 
631
                           the sorted data was stored.
 
632
  @param tempfile          The sorted sequence will be written into this file.
 
633
 
 
634
  @retval
 
635
    0 OK
 
636
  @retval
 
637
    1 Error
 
638
*/
 
639
 
 
640
static int
 
641
write_keys(SORTPARAM *param, register uchar **sort_keys, uint count,
 
642
           IO_CACHE *buffpek_pointers, IO_CACHE *tempfile)
 
643
{
 
644
  size_t sort_length, rec_length;
 
645
  uchar **end;
 
646
  BUFFPEK buffpek;
 
647
  DBUG_ENTER("write_keys");
 
648
 
 
649
  sort_length= param->sort_length;
 
650
  rec_length= param->rec_length;
 
651
#ifdef MC68000
 
652
  quicksort(sort_keys,count,sort_length);
 
653
#else
 
654
  my_string_ptr_sort((uchar*) sort_keys, (uint) count, sort_length);
 
655
#endif
 
656
  if (!my_b_inited(tempfile) &&
 
657
      open_cached_file(tempfile, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
 
658
                       MYF(MY_WME)))
 
659
    goto err;                                   /* purecov: inspected */
 
660
  /* check we won't have more buffpeks than we can possibly keep in memory */
 
661
  if (my_b_tell(buffpek_pointers) + sizeof(BUFFPEK) > (ulonglong)UINT_MAX)
 
662
    goto err;
 
663
  buffpek.file_pos= my_b_tell(tempfile);
 
664
  if ((ha_rows) count > param->max_rows)
 
665
    count=(uint) param->max_rows;               /* purecov: inspected */
 
666
  buffpek.count=(ha_rows) count;
 
667
  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
 
668
    if (my_b_write(tempfile, (uchar*) *sort_keys, (uint) rec_length))
 
669
      goto err;
 
670
  if (my_b_write(buffpek_pointers, (uchar*) &buffpek, sizeof(buffpek)))
 
671
    goto err;
 
672
  DBUG_RETURN(0);
 
673
 
 
674
err:
 
675
  DBUG_RETURN(1);
 
676
} /* write_keys */
 
677
 
 
678
 
 
679
/**
 
680
  Store length as suffix in high-byte-first order.
 
681
*/
 
682
 
 
683
static inline void store_length(uchar *to, uint length, uint pack_length)
 
684
{
 
685
  switch (pack_length) {
 
686
  case 1:
 
687
    *to= (uchar) length;
 
688
    break;
 
689
  case 2:
 
690
    mi_int2store(to, length);
 
691
    break;
 
692
  case 3:
 
693
    mi_int3store(to, length);
 
694
    break;
 
695
  default:
 
696
    mi_int4store(to, length);
 
697
    break;
 
698
  }
 
699
}
 
700
 
 
701
 
 
702
/** Make a sort-key from record. */
 
703
 
 
704
static void make_sortkey(register SORTPARAM *param,
 
705
                         register uchar *to, uchar *ref_pos)
 
706
{
 
707
  register Field *field;
 
708
  register SORT_FIELD *sort_field;
 
709
  register uint length;
 
710
 
 
711
  for (sort_field=param->local_sortorder ;
 
712
       sort_field != param->end ;
 
713
       sort_field++)
 
714
  {
 
715
    bool maybe_null=0;
 
716
    if ((field=sort_field->field))
 
717
    {                                           // Field
 
718
      if (field->maybe_null())
 
719
      {
 
720
        if (field->is_null())
 
721
        {
 
722
          if (sort_field->reverse)
 
723
            bfill(to,sort_field->length+1,(char) 255);
 
724
          else
 
725
            bzero((char*) to,sort_field->length+1);
 
726
          to+= sort_field->length+1;
 
727
          continue;
 
728
        }
 
729
        else
 
730
          *to++=1;
 
731
      }
 
732
      field->sort_string(to, sort_field->length);
 
733
    }
 
734
    else
 
735
    {                                           // Item
 
736
      Item *item=sort_field->item;
 
737
      maybe_null= item->maybe_null;
 
738
      switch (sort_field->result_type) {
 
739
      case STRING_RESULT:
 
740
      {
 
741
        CHARSET_INFO *cs=item->collation.collation;
 
742
        char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
 
743
        int diff;
 
744
        uint sort_field_length;
 
745
 
 
746
        if (maybe_null)
 
747
          *to++=1;
 
748
        /* All item->str() to use some extra byte for end null.. */
 
749
        String tmp((char*) to,sort_field->length+4,cs);
 
750
        String *res= item->str_result(&tmp);
 
751
        if (!res)
 
752
        {
 
753
          if (maybe_null)
 
754
            bzero((char*) to-1,sort_field->length+1);
 
755
          else
 
756
          {
 
757
            /* purecov: begin deadcode */
 
758
            /*
 
759
              This should only happen during extreme conditions if we run out
 
760
              of memory or have an item marked not null when it can be null.
 
761
              This code is here mainly to avoid a hard crash in this case.
 
762
            */
 
763
            DBUG_ASSERT(0);
 
764
            DBUG_PRINT("warning",
 
765
                       ("Got null on something that shouldn't be null"));
 
766
            bzero((char*) to,sort_field->length);       // Avoid crash
 
767
            /* purecov: end */
 
768
          }
 
769
          break;
 
770
        }
 
771
        length= res->length();
 
772
        sort_field_length= sort_field->length - sort_field->suffix_length;
 
773
        diff=(int) (sort_field_length - length);
 
774
        if (diff < 0)
 
775
        {
 
776
          diff=0;
 
777
          length= sort_field_length;
 
778
        }
 
779
        if (sort_field->suffix_length)
 
780
        {
 
781
          /* Store length last in result_string */
 
782
          store_length(to + sort_field_length, length,
 
783
                       sort_field->suffix_length);
 
784
        }
 
785
        if (sort_field->need_strxnfrm)
 
786
        {
 
787
          char *from=(char*) res->ptr();
 
788
          uint tmp_length;
 
789
          if ((uchar*) from == to)
 
790
          {
 
791
            set_if_smaller(length,sort_field->length);
 
792
            memcpy(param->tmp_buffer,from,length);
 
793
            from=param->tmp_buffer;
 
794
          }
 
795
          tmp_length= my_strnxfrm(cs,to,sort_field->length,
 
796
                                  (uchar*) from, length);
 
797
          DBUG_ASSERT(tmp_length == sort_field->length);
 
798
        }
 
799
        else
 
800
        {
 
801
          my_strnxfrm(cs,(uchar*)to,length,(const uchar*)res->ptr(),length);
 
802
          cs->cset->fill(cs, (char *)to+length,diff,fill_char);
 
803
        }
 
804
        break;
 
805
      }
 
806
      case INT_RESULT:
 
807
        {
 
808
          longlong value= item->val_int_result();
 
809
          if (maybe_null)
 
810
          {
 
811
            *to++=1;                            /* purecov: inspected */
 
812
            if (item->null_value)
 
813
            {
 
814
              if (maybe_null)
 
815
                bzero((char*) to-1,sort_field->length+1);
 
816
              else
 
817
              {
 
818
                DBUG_PRINT("warning",
 
819
                           ("Got null on something that shouldn't be null"));
 
820
                bzero((char*) to,sort_field->length);
 
821
              }
 
822
              break;
 
823
            }
 
824
          }
 
825
#if SIZEOF_LONG_LONG > 4
 
826
          to[7]= (uchar) value;
 
827
          to[6]= (uchar) (value >> 8);
 
828
          to[5]= (uchar) (value >> 16);
 
829
          to[4]= (uchar) (value >> 24);
 
830
          to[3]= (uchar) (value >> 32);
 
831
          to[2]= (uchar) (value >> 40);
 
832
          to[1]= (uchar) (value >> 48);
 
833
          if (item->unsigned_flag)                    /* Fix sign */
 
834
            to[0]= (uchar) (value >> 56);
 
835
          else
 
836
            to[0]= (uchar) (value >> 56) ^ 128; /* Reverse signbit */
 
837
#else
 
838
          to[3]= (uchar) value;
 
839
          to[2]= (uchar) (value >> 8);
 
840
          to[1]= (uchar) (value >> 16);
 
841
          if (item->unsigned_flag)                    /* Fix sign */
 
842
            to[0]= (uchar) (value >> 24);
 
843
          else
 
844
            to[0]= (uchar) (value >> 24) ^ 128; /* Reverse signbit */
 
845
#endif
 
846
          break;
 
847
        }
 
848
      case DECIMAL_RESULT:
 
849
        {
 
850
          my_decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
 
851
          if (maybe_null)
 
852
          {
 
853
            if (item->null_value)
 
854
            { 
 
855
              bzero((char*)to, sort_field->length+1);
 
856
              to++;
 
857
              break;
 
858
            }
 
859
            *to++=1;
 
860
          }
 
861
          my_decimal2binary(E_DEC_FATAL_ERROR, dec_val, to,
 
862
                            item->max_length - (item->decimals ? 1:0),
 
863
                            item->decimals);
 
864
         break;
 
865
        }
 
866
      case REAL_RESULT:
 
867
        {
 
868
          double value= item->val_result();
 
869
          if (maybe_null)
 
870
          {
 
871
            if (item->null_value)
 
872
            {
 
873
              bzero((char*) to,sort_field->length+1);
 
874
              to++;
 
875
              break;
 
876
            }
 
877
            *to++=1;
 
878
          }
 
879
          change_double_for_sort(value,(uchar*) to);
 
880
          break;
 
881
        }
 
882
      case ROW_RESULT:
 
883
      default: 
 
884
        // This case should never be choosen
 
885
        DBUG_ASSERT(0);
 
886
        break;
 
887
      }
 
888
    }
 
889
    if (sort_field->reverse)
 
890
    {                                                   /* Revers key */
 
891
      if (maybe_null)
 
892
        to[-1]= ~to[-1];
 
893
      length=sort_field->length;
 
894
      while (length--)
 
895
      {
 
896
        *to = (uchar) (~ *to);
 
897
        to++;
 
898
      }
 
899
    }
 
900
    else
 
901
      to+= sort_field->length;
 
902
  }
 
903
 
 
904
  if (param->addon_field)
 
905
  {
 
906
    /* 
 
907
      Save field values appended to sorted fields.
 
908
      First null bit indicators are appended then field values follow.
 
909
      In this implementation we use fixed layout for field values -
 
910
      the same for all records.
 
911
    */
 
912
    SORT_ADDON_FIELD *addonf= param->addon_field;
 
913
    uchar *nulls= to;
 
914
    DBUG_ASSERT(addonf != 0);
 
915
    bzero((char *) nulls, addonf->offset);
 
916
    to+= addonf->offset;
 
917
    for ( ; (field= addonf->field) ; addonf++)
 
918
    {
 
919
      if (addonf->null_bit && field->is_null())
 
920
      {
 
921
        nulls[addonf->null_offset]|= addonf->null_bit;
 
922
#ifdef HAVE_purify
 
923
        bzero(to, addonf->length);
 
924
#endif
 
925
      }
 
926
      else
 
927
      {
 
928
#ifdef HAVE_purify
 
929
        uchar *end= field->pack(to, field->ptr);
 
930
        uint length= (uint) ((to + addonf->length) - end);
 
931
        DBUG_ASSERT((int) length >= 0);
 
932
        if (length)
 
933
          bzero(end, length);
 
934
#else
 
935
        (void) field->pack(to, field->ptr);
 
936
#endif
 
937
      }
 
938
      to+= addonf->length;
 
939
    }
 
940
  }
 
941
  else
 
942
  {
 
943
    /* Save filepos last */
 
944
    memcpy((uchar*) to, ref_pos, (size_t) param->ref_length);
 
945
  }
 
946
  return;
 
947
}
 
948
 
 
949
 
 
950
/*
 
951
  Register fields used by sorting in the sorted table's read set
 
952
*/
 
953
 
 
954
static void register_used_fields(SORTPARAM *param)
 
955
{
 
956
  register SORT_FIELD *sort_field;
 
957
  TABLE *table=param->sort_form;
 
958
  MY_BITMAP *bitmap= table->read_set;
 
959
 
 
960
  for (sort_field= param->local_sortorder ;
 
961
       sort_field != param->end ;
 
962
       sort_field++)
 
963
  {
 
964
    Field *field;
 
965
    if ((field= sort_field->field))
 
966
    {
 
967
      if (field->table == table)
 
968
      bitmap_set_bit(bitmap, field->field_index);
 
969
    }
 
970
    else
 
971
    {                                           // Item
 
972
      sort_field->item->walk(&Item::register_field_in_read_map, 1,
 
973
                             (uchar *) table);
 
974
    }
 
975
  }
 
976
 
 
977
  if (param->addon_field)
 
978
  {
 
979
    SORT_ADDON_FIELD *addonf= param->addon_field;
 
980
    Field *field;
 
981
    for ( ; (field= addonf->field) ; addonf++)
 
982
      bitmap_set_bit(bitmap, field->field_index);
 
983
  }
 
984
  else
 
985
  {
 
986
    /* Save filepos last */
 
987
    table->prepare_for_position();
 
988
  }
 
989
}
 
990
 
 
991
 
 
992
static bool save_index(SORTPARAM *param, uchar **sort_keys, uint count, 
 
993
                       FILESORT_INFO *table_sort)
 
994
{
 
995
  uint offset,res_length;
 
996
  uchar *to;
 
997
  DBUG_ENTER("save_index");
 
998
 
 
999
  my_string_ptr_sort((uchar*) sort_keys, (uint) count, param->sort_length);
 
1000
  res_length= param->res_length;
 
1001
  offset= param->rec_length-res_length;
 
1002
  if ((ha_rows) count > param->max_rows)
 
1003
    count=(uint) param->max_rows;
 
1004
  if (!(to= table_sort->record_pointers= 
 
1005
        (uchar*) my_malloc(res_length*count, MYF(MY_WME))))
 
1006
    DBUG_RETURN(1);                 /* purecov: inspected */
 
1007
  for (uchar **end= sort_keys+count ; sort_keys != end ; sort_keys++)
 
1008
  {
 
1009
    memcpy(to, *sort_keys+offset, res_length);
 
1010
    to+= res_length;
 
1011
  }
 
1012
  DBUG_RETURN(0);
 
1013
}
 
1014
 
 
1015
 
 
1016
/** Merge buffers to make < MERGEBUFF2 buffers. */
 
1017
 
 
1018
int merge_many_buff(SORTPARAM *param, uchar *sort_buffer,
 
1019
                    BUFFPEK *buffpek, uint *maxbuffer, IO_CACHE *t_file)
 
1020
{
 
1021
  register uint i;
 
1022
  IO_CACHE t_file2,*from_file,*to_file,*temp;
 
1023
  BUFFPEK *lastbuff;
 
1024
  DBUG_ENTER("merge_many_buff");
 
1025
 
 
1026
  if (*maxbuffer < MERGEBUFF2)
 
1027
    DBUG_RETURN(0);                             /* purecov: inspected */
 
1028
  if (flush_io_cache(t_file) ||
 
1029
      open_cached_file(&t_file2,mysql_tmpdir,TEMP_PREFIX,DISK_BUFFER_SIZE,
 
1030
                        MYF(MY_WME)))
 
1031
    DBUG_RETURN(1);                             /* purecov: inspected */
 
1032
 
 
1033
  from_file= t_file ; to_file= &t_file2;
 
1034
  while (*maxbuffer >= MERGEBUFF2)
 
1035
  {
 
1036
    if (reinit_io_cache(from_file,READ_CACHE,0L,0,0))
 
1037
      goto cleanup;
 
1038
    if (reinit_io_cache(to_file,WRITE_CACHE,0L,0,0))
 
1039
      goto cleanup;
 
1040
    lastbuff=buffpek;
 
1041
    for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
 
1042
    {
 
1043
      if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
 
1044
                        buffpek+i,buffpek+i+MERGEBUFF-1,0))
 
1045
      goto cleanup;
 
1046
    }
 
1047
    if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
 
1048
                      buffpek+i,buffpek+ *maxbuffer,0))
 
1049
      break;                                    /* purecov: inspected */
 
1050
    if (flush_io_cache(to_file))
 
1051
      break;                                    /* purecov: inspected */
 
1052
    temp=from_file; from_file=to_file; to_file=temp;
 
1053
    setup_io_cache(from_file);
 
1054
    setup_io_cache(to_file);
 
1055
    *maxbuffer= (uint) (lastbuff-buffpek)-1;
 
1056
  }
 
1057
cleanup:
 
1058
  close_cached_file(to_file);                   // This holds old result
 
1059
  if (to_file == t_file)
 
1060
  {
 
1061
    *t_file=t_file2;                            // Copy result file
 
1062
    setup_io_cache(t_file);
 
1063
  }
 
1064
 
 
1065
  DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
 
1066
} /* merge_many_buff */
 
1067
 
 
1068
 
 
1069
/**
 
1070
  Read data to buffer.
 
1071
 
 
1072
  @retval
 
1073
    (uint)-1 if something goes wrong
 
1074
*/
 
1075
 
 
1076
uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
 
1077
                    uint rec_length)
 
1078
{
 
1079
  register uint count;
 
1080
  uint length;
 
1081
 
 
1082
  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
 
1083
  {
 
1084
    if (my_pread(fromfile->file,(uchar*) buffpek->base,
 
1085
                 (length= rec_length*count),buffpek->file_pos,MYF_RW))
 
1086
      return((uint) -1);                        /* purecov: inspected */
 
1087
    buffpek->key=buffpek->base;
 
1088
    buffpek->file_pos+= length;                 /* New filepos */
 
1089
    buffpek->count-=    count;
 
1090
    buffpek->mem_count= count;
 
1091
  }
 
1092
  return (count*rec_length);
 
1093
} /* read_to_buffer */
 
1094
 
 
1095
 
 
1096
/**
 
1097
  Put all room used by freed buffer to use in adjacent buffer.
 
1098
 
 
1099
  Note, that we can't simply distribute memory evenly between all buffers,
 
1100
  because new areas must not overlap with old ones.
 
1101
 
 
1102
  @param[in] queue      list of non-empty buffers, without freed buffer
 
1103
  @param[in] reuse      empty buffer
 
1104
  @param[in] key_length key length
 
1105
*/
 
1106
 
 
1107
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint key_length)
 
1108
{
 
1109
  uchar *reuse_end= reuse->base + reuse->max_keys * key_length;
 
1110
  for (uint i= 0; i < queue->elements; ++i)
 
1111
  {
 
1112
    BUFFPEK *bp= (BUFFPEK *) queue_element(queue, i);
 
1113
    if (bp->base + bp->max_keys * key_length == reuse->base)
 
1114
    {
 
1115
      bp->max_keys+= reuse->max_keys;
 
1116
      return;
 
1117
    }
 
1118
    else if (bp->base == reuse_end)
 
1119
    {
 
1120
      bp->base= reuse->base;
 
1121
      bp->max_keys+= reuse->max_keys;
 
1122
      return;
 
1123
    }
 
1124
  }
 
1125
  DBUG_ASSERT(0);
 
1126
}
 
1127
 
 
1128
 
 
1129
/**
 
1130
  Merge buffers to one buffer.
 
1131
 
 
1132
  @param param        Sort parameter
 
1133
  @param from_file    File with source data (BUFFPEKs point to this file)
 
1134
  @param to_file      File to write the sorted result data.
 
1135
  @param sort_buffer  Buffer for data to store up to MERGEBUFF2 sort keys.
 
1136
  @param lastbuff     OUT Store here BUFFPEK describing data written to to_file
 
1137
  @param Fb           First element in source BUFFPEKs array
 
1138
  @param Tb           Last element in source BUFFPEKs array
 
1139
  @param flag
 
1140
 
 
1141
  @retval
 
1142
    0      OK
 
1143
  @retval
 
1144
    other  error
 
1145
*/
 
1146
 
 
1147
int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
 
1148
                  IO_CACHE *to_file, uchar *sort_buffer,
 
1149
                  BUFFPEK *lastbuff, BUFFPEK *Fb, BUFFPEK *Tb,
 
1150
                  int flag)
 
1151
{
 
1152
  int error;
 
1153
  uint rec_length,res_length,offset;
 
1154
  size_t sort_length;
 
1155
  ulong maxcount;
 
1156
  ha_rows max_rows,org_max_rows;
 
1157
  my_off_t to_start_filepos;
 
1158
  uchar *strpos;
 
1159
  BUFFPEK *buffpek;
 
1160
  QUEUE queue;
 
1161
  qsort2_cmp cmp;
 
1162
  void *first_cmp_arg;
 
1163
  volatile THD::killed_state *killed= &current_thd->killed;
 
1164
  THD::killed_state not_killable;
 
1165
  DBUG_ENTER("merge_buffers");
 
1166
 
 
1167
  status_var_increment(current_thd->status_var.filesort_merge_passes);
 
1168
  if (param->not_killable)
 
1169
  {
 
1170
    killed= &not_killable;
 
1171
    not_killable= THD::NOT_KILLED;
 
1172
  }
 
1173
 
 
1174
  error=0;
 
1175
  rec_length= param->rec_length;
 
1176
  res_length= param->res_length;
 
1177
  sort_length= param->sort_length;
 
1178
  offset= rec_length-res_length;
 
1179
  maxcount= (ulong) (param->keys/((uint) (Tb-Fb) +1));
 
1180
  to_start_filepos= my_b_tell(to_file);
 
1181
  strpos= (uchar*) sort_buffer;
 
1182
  org_max_rows=max_rows= param->max_rows;
 
1183
 
 
1184
  /* The following will fire if there is not enough space in sort_buffer */
 
1185
  DBUG_ASSERT(maxcount!=0);
 
1186
  
 
1187
  if (param->unique_buff)
 
1188
  {
 
1189
    cmp= param->compare;
 
1190
    first_cmp_arg= (void *) &param->cmp_context;
 
1191
  }
 
1192
  else
 
1193
  {
 
1194
    cmp= get_ptr_compare(sort_length);
 
1195
    first_cmp_arg= (void*) &sort_length;
 
1196
  }
 
1197
  if (init_queue(&queue, (uint) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
 
1198
                 (queue_compare) cmp, first_cmp_arg))
 
1199
    DBUG_RETURN(1);                                /* purecov: inspected */
 
1200
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
 
1201
  {
 
1202
    buffpek->base= strpos;
 
1203
    buffpek->max_keys= maxcount;
 
1204
    strpos+= (uint) (error= (int) read_to_buffer(from_file, buffpek,
 
1205
                                                                         rec_length));
 
1206
    if (error == -1)
 
1207
      goto err;                                 /* purecov: inspected */
 
1208
    buffpek->max_keys= buffpek->mem_count;      // If less data in buffers than expected
 
1209
    queue_insert(&queue, (uchar*) buffpek);
 
1210
  }
 
1211
 
 
1212
  if (param->unique_buff)
 
1213
  {
 
1214
    /* 
 
1215
       Called by Unique::get()
 
1216
       Copy the first argument to param->unique_buff for unique removal.
 
1217
       Store it also in 'to_file'.
 
1218
 
 
1219
       This is safe as we know that there is always more than one element
 
1220
       in each block to merge (This is guaranteed by the Unique:: algorithm
 
1221
    */
 
1222
    buffpek= (BUFFPEK*) queue_top(&queue);
 
1223
    memcpy(param->unique_buff, buffpek->key, rec_length);
 
1224
    if (my_b_write(to_file, (uchar*) buffpek->key, rec_length))
 
1225
    {
 
1226
      error=1; goto err;                        /* purecov: inspected */
 
1227
    }
 
1228
    buffpek->key+= rec_length;
 
1229
    buffpek->mem_count--;
 
1230
    if (!--max_rows)
 
1231
    {
 
1232
      error= 0;                                       /* purecov: inspected */
 
1233
      goto end;                                       /* purecov: inspected */
 
1234
    }
 
1235
    queue_replaced(&queue);                        // Top element has been used
 
1236
  }
 
1237
  else
 
1238
    cmp= 0;                                        // Not unique
 
1239
 
 
1240
  while (queue.elements > 1)
 
1241
  {
 
1242
    if (*killed)
 
1243
    {
 
1244
      error= 1; goto err;                        /* purecov: inspected */
 
1245
    }
 
1246
    for (;;)
 
1247
    {
 
1248
      buffpek= (BUFFPEK*) queue_top(&queue);
 
1249
      if (cmp)                                        // Remove duplicates
 
1250
      {
 
1251
        if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
 
1252
                    (uchar**) &buffpek->key))
 
1253
              goto skip_duplicate;
 
1254
            memcpy(param->unique_buff, (uchar*) buffpek->key, rec_length);
 
1255
      }
 
1256
      if (flag == 0)
 
1257
      {
 
1258
        if (my_b_write(to_file,(uchar*) buffpek->key, rec_length))
 
1259
        {
 
1260
          error=1; goto err;                        /* purecov: inspected */
 
1261
        }
 
1262
      }
 
1263
      else
 
1264
      {
 
1265
        if (my_b_write(to_file, (uchar*) buffpek->key+offset, res_length))
 
1266
        {
 
1267
          error=1; goto err;                        /* purecov: inspected */
 
1268
        }
 
1269
      }
 
1270
      if (!--max_rows)
 
1271
      {
 
1272
        error= 0;                               /* purecov: inspected */
 
1273
        goto end;                               /* purecov: inspected */
 
1274
      }
 
1275
 
 
1276
    skip_duplicate:
 
1277
      buffpek->key+= rec_length;
 
1278
      if (! --buffpek->mem_count)
 
1279
      {
 
1280
        if (!(error= (int) read_to_buffer(from_file,buffpek,
 
1281
                                          rec_length)))
 
1282
        {
 
1283
          VOID(queue_remove(&queue,0));
 
1284
          reuse_freed_buff(&queue, buffpek, rec_length);
 
1285
          break;                        /* One buffer have been removed */
 
1286
        }
 
1287
        else if (error == -1)
 
1288
          goto err;                        /* purecov: inspected */
 
1289
      }
 
1290
      queue_replaced(&queue);              /* Top element has been replaced */
 
1291
    }
 
1292
  }
 
1293
  buffpek= (BUFFPEK*) queue_top(&queue);
 
1294
  buffpek->base= sort_buffer;
 
1295
  buffpek->max_keys= param->keys;
 
1296
 
 
1297
  /*
 
1298
    As we know all entries in the buffer are unique, we only have to
 
1299
    check if the first one is the same as the last one we wrote
 
1300
  */
 
1301
  if (cmp)
 
1302
  {
 
1303
    if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (uchar**) &buffpek->key))
 
1304
    {
 
1305
      buffpek->key+= rec_length;         // Remove duplicate
 
1306
      --buffpek->mem_count;
 
1307
    }
 
1308
  }
 
1309
 
 
1310
  do
 
1311
  {
 
1312
    if ((ha_rows) buffpek->mem_count > max_rows)
 
1313
    {                                        /* Don't write too many records */
 
1314
      buffpek->mem_count= (uint) max_rows;
 
1315
      buffpek->count= 0;                        /* Don't read more */
 
1316
    }
 
1317
    max_rows-= buffpek->mem_count;
 
1318
    if (flag == 0)
 
1319
    {
 
1320
      if (my_b_write(to_file,(uchar*) buffpek->key,
 
1321
                     (rec_length*buffpek->mem_count)))
 
1322
      {
 
1323
        error= 1; goto err;                        /* purecov: inspected */
 
1324
      }
 
1325
    }
 
1326
    else
 
1327
    {
 
1328
      register uchar *end;
 
1329
      strpos= buffpek->key+offset;
 
1330
      for (end= strpos+buffpek->mem_count*rec_length ;
 
1331
           strpos != end ;
 
1332
           strpos+= rec_length)
 
1333
      {     
 
1334
        if (my_b_write(to_file, (uchar *) strpos, res_length))
 
1335
        {
 
1336
          error=1; goto err;                        
 
1337
        }
 
1338
      }
 
1339
    }
 
1340
  }
 
1341
  while ((error=(int) read_to_buffer(from_file,buffpek, rec_length))
 
1342
         != -1 && error != 0);
 
1343
 
 
1344
end:
 
1345
  lastbuff->count= min(org_max_rows-max_rows, param->max_rows);
 
1346
  lastbuff->file_pos= to_start_filepos;
 
1347
err:
 
1348
  delete_queue(&queue);
 
1349
  DBUG_RETURN(error);
 
1350
} /* merge_buffers */
 
1351
 
 
1352
 
 
1353
        /* Do a merge to output-file (save only positions) */
 
1354
 
 
1355
static int merge_index(SORTPARAM *param, uchar *sort_buffer,
 
1356
                       BUFFPEK *buffpek, uint maxbuffer,
 
1357
                       IO_CACHE *tempfile, IO_CACHE *outfile)
 
1358
{
 
1359
  DBUG_ENTER("merge_index");
 
1360
  if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek,buffpek,
 
1361
                    buffpek+maxbuffer,1))
 
1362
    DBUG_RETURN(1);                             /* purecov: inspected */
 
1363
  DBUG_RETURN(0);
 
1364
} /* merge_index */
 
1365
 
 
1366
 
 
1367
static uint suffix_length(ulong string_length)
 
1368
{
 
1369
  if (string_length < 256)
 
1370
    return 1;
 
1371
  if (string_length < 256L*256L)
 
1372
    return 2;
 
1373
  if (string_length < 256L*256L*256L)
 
1374
    return 3;
 
1375
  return 4;                                     // Can't sort longer than 4G
 
1376
}
 
1377
 
 
1378
 
 
1379
 
 
1380
/**
 
1381
  Calculate length of sort key.
 
1382
 
 
1383
  @param thd                      Thread handler
 
1384
  @param sortorder                Order of items to sort
 
1385
  @param s_length                 Number of items to sort
 
1386
  @param[out] multi_byte_charset Set to 1 if we are using multi-byte charset
 
1387
                                 (In which case we have to use strxnfrm())
 
1388
 
 
1389
  @note
 
1390
    sortorder->length is updated for each sort item.
 
1391
  @n
 
1392
    sortorder->need_strxnfrm is set 1 if we have to use strxnfrm
 
1393
 
 
1394
  @return
 
1395
    Total length of sort buffer in bytes
 
1396
*/
 
1397
 
 
1398
static uint
 
1399
sortlength(THD *thd, SORT_FIELD *sortorder, uint s_length,
 
1400
           bool *multi_byte_charset)
 
1401
{
 
1402
  register uint length;
 
1403
  CHARSET_INFO *cs;
 
1404
  *multi_byte_charset= 0;
 
1405
 
 
1406
  length=0;
 
1407
  for (; s_length-- ; sortorder++)
 
1408
  {
 
1409
    sortorder->need_strxnfrm= 0;
 
1410
    sortorder->suffix_length= 0;
 
1411
    if (sortorder->field)
 
1412
    {
 
1413
      cs= sortorder->field->sort_charset();
 
1414
      sortorder->length= sortorder->field->sort_length();
 
1415
 
 
1416
      if (use_strnxfrm((cs=sortorder->field->sort_charset())))
 
1417
      {
 
1418
        sortorder->need_strxnfrm= 1;
 
1419
        *multi_byte_charset= 1;
 
1420
        sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
 
1421
      }
 
1422
      if (sortorder->field->maybe_null())
 
1423
        length++;                               // Place for NULL marker
 
1424
    }
 
1425
    else
 
1426
    {
 
1427
      sortorder->result_type= sortorder->item->result_type();
 
1428
      if (sortorder->item->result_as_longlong())
 
1429
        sortorder->result_type= INT_RESULT;
 
1430
      switch (sortorder->result_type) {
 
1431
      case STRING_RESULT:
 
1432
        sortorder->length=sortorder->item->max_length;
 
1433
        set_if_smaller(sortorder->length, thd->variables.max_sort_length);
 
1434
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
 
1435
        { 
 
1436
          sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
 
1437
          sortorder->need_strxnfrm= 1;
 
1438
          *multi_byte_charset= 1;
 
1439
        }
 
1440
        else if (cs == &my_charset_bin)
 
1441
        {
 
1442
          /* Store length last to be able to sort blob/varbinary */
 
1443
          sortorder->suffix_length= suffix_length(sortorder->length);
 
1444
          sortorder->length+= sortorder->suffix_length;
 
1445
        }
 
1446
        break;
 
1447
      case INT_RESULT:
 
1448
#if SIZEOF_LONG_LONG > 4
 
1449
        sortorder->length=8;                    // Size of intern longlong
 
1450
#else
 
1451
        sortorder->length=4;
 
1452
#endif
 
1453
        break;
 
1454
      case DECIMAL_RESULT:
 
1455
        sortorder->length=
 
1456
          my_decimal_get_binary_size(sortorder->item->max_length - 
 
1457
                                     (sortorder->item->decimals ? 1 : 0),
 
1458
                                     sortorder->item->decimals);
 
1459
        break;
 
1460
      case REAL_RESULT:
 
1461
        sortorder->length=sizeof(double);
 
1462
        break;
 
1463
      case ROW_RESULT:
 
1464
      default: 
 
1465
        // This case should never be choosen
 
1466
        DBUG_ASSERT(0);
 
1467
        break;
 
1468
      }
 
1469
      if (sortorder->item->maybe_null)
 
1470
        length++;                               // Place for NULL marker
 
1471
    }
 
1472
    set_if_smaller(sortorder->length, thd->variables.max_sort_length);
 
1473
    length+=sortorder->length;
 
1474
  }
 
1475
  sortorder->field= (Field*) 0;                 // end marker
 
1476
  DBUG_PRINT("info",("sort_length: %d",length));
 
1477
  return length;
 
1478
}
 
1479
 
 
1480
 
 
1481
/**
 
1482
  Get descriptors of fields appended to sorted fields and
 
1483
  calculate its total length.
 
1484
 
 
1485
  The function first finds out what fields are used in the result set.
 
1486
  Then it calculates the length of the buffer to store the values of
 
1487
  these fields together with the value of sort values. 
 
1488
  If the calculated length is not greater than max_length_for_sort_data
 
1489
  the function allocates memory for an array of descriptors containing
 
1490
  layouts for the values of the non-sorted fields in the buffer and
 
1491
  fills them.
 
1492
 
 
1493
  @param thd                 Current thread
 
1494
  @param ptabfield           Array of references to the table fields
 
1495
  @param sortlength          Total length of sorted fields
 
1496
  @param[out] plength        Total length of appended fields
 
1497
 
 
1498
  @note
 
1499
    The null bits for the appended values are supposed to be put together
 
1500
    and stored the buffer just ahead of the value of the first field.
 
1501
 
 
1502
  @return
 
1503
    Pointer to the layout descriptors for the appended fields, if any
 
1504
  @retval
 
1505
    NULL   if we do not store field values with sort data.
 
1506
*/
 
1507
 
 
1508
static SORT_ADDON_FIELD *
 
1509
get_addon_fields(THD *thd, Field **ptabfield, uint sortlength, uint *plength)
 
1510
{
 
1511
  Field **pfield;
 
1512
  Field *field;
 
1513
  SORT_ADDON_FIELD *addonf;
 
1514
  uint length= 0;
 
1515
  uint fields= 0;
 
1516
  uint null_fields= 0;
 
1517
  MY_BITMAP *read_set= (*ptabfield)->table->read_set;
 
1518
 
 
1519
  /*
 
1520
    If there is a reference to a field in the query add it
 
1521
    to the the set of appended fields.
 
1522
    Note for future refinement:
 
1523
    This this a too strong condition.
 
1524
    Actually we need only the fields referred in the
 
1525
    result set. And for some of them it makes sense to use 
 
1526
    the values directly from sorted fields.
 
1527
  */
 
1528
  *plength= 0;
 
1529
 
 
1530
  for (pfield= ptabfield; (field= *pfield) ; pfield++)
 
1531
  {
 
1532
    if (!bitmap_is_set(read_set, field->field_index))
 
1533
      continue;
 
1534
    if (field->flags & BLOB_FLAG)
 
1535
      return 0;
 
1536
    length+= field->max_packed_col_length(field->pack_length());
 
1537
    if (field->maybe_null())
 
1538
      null_fields++;
 
1539
    fields++;
 
1540
  } 
 
1541
  if (!fields)
 
1542
    return 0;
 
1543
  length+= (null_fields+7)/8;
 
1544
 
 
1545
  if (length+sortlength > thd->variables.max_length_for_sort_data ||
 
1546
      !(addonf= (SORT_ADDON_FIELD *) my_malloc(sizeof(SORT_ADDON_FIELD)*
 
1547
                                               (fields+1), MYF(MY_WME))))
 
1548
    return 0;
 
1549
 
 
1550
  *plength= length;
 
1551
  length= (null_fields+7)/8;
 
1552
  null_fields= 0;
 
1553
  for (pfield= ptabfield; (field= *pfield) ; pfield++)
 
1554
  {
 
1555
    if (!bitmap_is_set(read_set, field->field_index))
 
1556
      continue;
 
1557
    addonf->field= field;
 
1558
    addonf->offset= length;
 
1559
    if (field->maybe_null())
 
1560
    {
 
1561
      addonf->null_offset= null_fields/8;
 
1562
      addonf->null_bit= 1<<(null_fields & 7);
 
1563
      null_fields++;
 
1564
    }
 
1565
    else
 
1566
    {
 
1567
      addonf->null_offset= 0;
 
1568
      addonf->null_bit= 0;
 
1569
    }
 
1570
    addonf->length= field->max_packed_col_length(field->pack_length());
 
1571
    length+= addonf->length;
 
1572
    addonf++;
 
1573
  }
 
1574
  addonf->field= 0;     // Put end marker
 
1575
  
 
1576
  DBUG_PRINT("info",("addon_length: %d",length));
 
1577
  return (addonf-fields);
 
1578
}
 
1579
 
 
1580
 
 
1581
/**
 
1582
  Copy (unpack) values appended to sorted fields from a buffer back to
 
1583
  their regular positions specified by the Field::ptr pointers.
 
1584
 
 
1585
  @param addon_field     Array of descriptors for appended fields
 
1586
  @param buff            Buffer which to unpack the value from
 
1587
 
 
1588
  @note
 
1589
    The function is supposed to be used only as a callback function
 
1590
    when getting field values for the sorted result set.
 
1591
 
 
1592
  @return
 
1593
    void.
 
1594
*/
 
1595
 
 
1596
static void 
 
1597
unpack_addon_fields(struct st_sort_addon_field *addon_field, uchar *buff)
 
1598
{
 
1599
  Field *field;
 
1600
  SORT_ADDON_FIELD *addonf= addon_field;
 
1601
 
 
1602
  for ( ; (field= addonf->field) ; addonf++)
 
1603
  {
 
1604
    if (addonf->null_bit && (addonf->null_bit & buff[addonf->null_offset]))
 
1605
    {
 
1606
      field->set_null();
 
1607
      continue;
 
1608
    }
 
1609
    field->set_notnull();
 
1610
    field->unpack(field->ptr, buff + addonf->offset);
 
1611
  }
 
1612
}
 
1613
 
 
1614
/*
 
1615
** functions to change a double or float to a sortable string
 
1616
** The following should work for IEEE
 
1617
*/
 
1618
 
 
1619
#define DBL_EXP_DIG (sizeof(double)*8-DBL_MANT_DIG)
 
1620
 
 
1621
void change_double_for_sort(double nr,uchar *to)
 
1622
{
 
1623
  uchar *tmp=(uchar*) to;
 
1624
  if (nr == 0.0)
 
1625
  {                                             /* Change to zero string */
 
1626
    tmp[0]=(uchar) 128;
 
1627
    bzero((char*) tmp+1,sizeof(nr)-1);
 
1628
  }
 
1629
  else
 
1630
  {
 
1631
#ifdef WORDS_BIGENDIAN
 
1632
    memcpy_fixed(tmp,&nr,sizeof(nr));
 
1633
#else
 
1634
    {
 
1635
      uchar *ptr= (uchar*) &nr;
 
1636
#if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
 
1637
      tmp[0]= ptr[3]; tmp[1]=ptr[2]; tmp[2]= ptr[1]; tmp[3]=ptr[0];
 
1638
      tmp[4]= ptr[7]; tmp[5]=ptr[6]; tmp[6]= ptr[5]; tmp[7]=ptr[4];
 
1639
#else
 
1640
      tmp[0]= ptr[7]; tmp[1]=ptr[6]; tmp[2]= ptr[5]; tmp[3]=ptr[4];
 
1641
      tmp[4]= ptr[3]; tmp[5]=ptr[2]; tmp[6]= ptr[1]; tmp[7]=ptr[0];
 
1642
#endif
 
1643
    }
 
1644
#endif
 
1645
    if (tmp[0] & 128)                           /* Negative */
 
1646
    {                                           /* make complement */
 
1647
      uint i;
 
1648
      for (i=0 ; i < sizeof(nr); i++)
 
1649
        tmp[i]=tmp[i] ^ (uchar) 255;
 
1650
    }
 
1651
    else
 
1652
    {                                   /* Set high and move exponent one up */
 
1653
      ushort exp_part=(((ushort) tmp[0] << 8) | (ushort) tmp[1] |
 
1654
                       (ushort) 32768);
 
1655
      exp_part+= (ushort) 1 << (16-1-DBL_EXP_DIG);
 
1656
      tmp[0]= (uchar) (exp_part >> 8);
 
1657
      tmp[1]= (uchar) exp_part;
 
1658
    }
 
1659
  }
 
1660
}