~ubuntu-branches/ubuntu/saucy/drizzle/saucy-proposed

« back to all changes in this revision

Viewing changes to .pc/debian-changes-2010.12.06-0ubuntu4/drizzled/filesort.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2011-01-04 09:31:58 UTC
  • mfrom: (1.2.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20110104093158-smhgvkfdi2y9au3i
Tags: 2011.01.07-0ubuntu1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2006 MySQL AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
 
 
16
 
 
17
 
/**
18
 
  @file
19
 
 
20
 
  @brief
21
 
  Sorts a database
22
 
*/
23
 
 
24
 
#include "config.h"
25
 
 
26
 
#include <float.h>
27
 
#include <limits.h>
28
 
 
29
 
#include <queue>
30
 
#include <algorithm>
31
 
#include <iostream>
32
 
 
33
 
#include "drizzled/drizzled.h"
34
 
#include "drizzled/sql_sort.h"
35
 
#include "drizzled/filesort.h"
36
 
#include "drizzled/error.h"
37
 
#include "drizzled/probes.h"
38
 
#include "drizzled/session.h"
39
 
#include "drizzled/table.h"
40
 
#include "drizzled/table_list.h"
41
 
#include "drizzled/optimizer/range.h"
42
 
#include "drizzled/records.h"
43
 
#include "drizzled/internal/iocache.h"
44
 
#include "drizzled/internal/my_sys.h"
45
 
#include "plugin/myisam/myisam.h"
46
 
#include "drizzled/plugin/transactional_storage_engine.h"
47
 
#include "drizzled/atomics.h"
48
 
#include "drizzled/global_buffer.h"
49
 
 
50
 
 
51
 
using namespace std;
52
 
 
53
 
namespace drizzled
54
 
{
55
 
 
56
 
/* Defines used by filesort and uniques */
57
 
#define MERGEBUFF               7
58
 
#define MERGEBUFF2              15
59
 
 
60
 
class BufferCompareContext
61
 
{
62
 
public:
63
 
  qsort_cmp2 key_compare;
64
 
  void *key_compare_arg;
65
 
 
66
 
  BufferCompareContext() :
67
 
    key_compare(0),
68
 
    key_compare_arg(0)
69
 
  { }
70
 
 
71
 
};
72
 
 
73
 
class SortParam {
74
 
public:
75
 
  uint32_t rec_length;          /* Length of sorted records */
76
 
  uint32_t sort_length;                 /* Length of sorted columns */
77
 
  uint32_t ref_length;                  /* Length of record ref. */
78
 
  uint32_t addon_length;        /* Length of added packed fields */
79
 
  uint32_t res_length;          /* Length of records in final sorted file/buffer */
80
 
  uint32_t keys;                                /* Max keys / buffer */
81
 
  ha_rows max_rows,examined_rows;
82
 
  Table *sort_form;                     /* For quicker make_sortkey */
83
 
  SortField *local_sortorder;
84
 
  SortField *end;
85
 
  sort_addon_field *addon_field; /* Descriptors for companion fields */
86
 
  unsigned char *unique_buff;
87
 
  bool not_killable;
88
 
  char *tmp_buffer;
89
 
  /* The fields below are used only by Unique class */
90
 
  qsort2_cmp compare;
91
 
  BufferCompareContext cmp_context;
92
 
 
93
 
  SortParam() :
94
 
    rec_length(0),
95
 
    sort_length(0),
96
 
    ref_length(0),
97
 
    addon_length(0),
98
 
    res_length(0),
99
 
    keys(0),
100
 
    max_rows(0),
101
 
    examined_rows(0),
102
 
    sort_form(0),
103
 
    local_sortorder(0),
104
 
    end(0),
105
 
    addon_field(0),
106
 
    unique_buff(0),
107
 
    not_killable(0),
108
 
    tmp_buffer(0),
109
 
    compare(0)
110
 
  {
111
 
  }
112
 
 
113
 
  ~SortParam()
114
 
  {
115
 
    if (tmp_buffer)
116
 
      free(tmp_buffer);
117
 
  }
118
 
 
119
 
  int write_keys(unsigned char * *sort_keys,
120
 
                 uint32_t count,
121
 
                 internal::IO_CACHE *buffer_file,
122
 
                 internal::IO_CACHE *tempfile);
123
 
 
124
 
  void make_sortkey(unsigned char *to,
125
 
                    unsigned char *ref_pos);
126
 
  void register_used_fields();
127
 
  bool save_index(unsigned char **sort_keys,
128
 
                  uint32_t count,
129
 
                  filesort_info *table_sort);
130
 
 
131
 
};
132
 
 
133
 
/* functions defined in this file */
134
 
 
135
 
static char **make_char_array(char **old_pos, register uint32_t fields,
136
 
                              uint32_t length);
137
 
 
138
 
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffer_file,
139
 
                                             uint32_t count,
140
 
                                             unsigned char *buf);
141
 
 
142
 
static uint32_t suffix_length(uint32_t string_length);
143
 
static void unpack_addon_fields(sort_addon_field *addon_field,
144
 
                                unsigned char *buff);
145
 
 
146
 
FileSort::FileSort(Session &arg) :
147
 
  _session(arg)
148
 
149
 
}
150
 
 
151
 
/**
152
 
  Sort a table.
153
 
  Creates a set of pointers that can be used to read the rows
154
 
  in sorted order. This should be done with the functions
155
 
  in records.cc.
156
 
 
157
 
  Before calling filesort, one must have done
158
 
  table->file->info(HA_STATUS_VARIABLE)
159
 
 
160
 
  The result set is stored in table->io_cache or
161
 
  table->record_pointers.
162
 
 
163
 
  @param table          Table to sort
164
 
  @param sortorder      How to sort the table
165
 
  @param s_length       Number of elements in sortorder
166
 
  @param select         condition to apply to the rows
167
 
  @param max_rows       Return only this many rows
168
 
  @param sort_positions Set to 1 if we want to force sorting by position
169
 
                        (Needed by UPDATE/INSERT or ALTER Table)
170
 
  @param examined_rows  Store number of examined rows here
171
 
 
172
 
  @todo
173
 
    check why we do this (param.keys--)
174
 
  @note
175
 
    If we sort by position (like if sort_positions is 1) filesort() will
176
 
    call table->prepare_for_position().
177
 
 
178
 
  @retval
179
 
    HA_POS_ERROR        Error
180
 
  @retval
181
 
    \#                  Number of rows
182
 
  @retval
183
 
    examined_rows       will be set to number of examined rows
184
 
*/
185
 
 
186
 
ha_rows FileSort::run(Table *table, SortField *sortorder, uint32_t s_length,
187
 
                      optimizer::SqlSelect *select, ha_rows max_rows,
188
 
                      bool sort_positions, ha_rows &examined_rows)
189
 
{
190
 
  int error= 1;
191
 
  uint32_t memavl= 0, min_sort_memory;
192
 
  uint32_t maxbuffer;
193
 
  size_t allocated_sort_memory= 0;
194
 
  buffpek *buffpek_inst= 0;
195
 
  ha_rows records= HA_POS_ERROR;
196
 
  unsigned char **sort_keys= 0;
197
 
  internal::IO_CACHE tempfile;
198
 
  internal::IO_CACHE buffpek_pointers;
199
 
  internal::IO_CACHE *selected_records_file;
200
 
  internal::IO_CACHE *outfile;
201
 
  SortParam param;
202
 
  bool multi_byte_charset;
203
 
 
204
 
  /*
205
 
    Don't use table->sort in filesort as it is also used by
206
 
    QuickIndexMergeSelect. Work with a copy and put it back at the end
207
 
    when index_merge select has finished with it.
208
 
  */
209
 
  filesort_info table_sort(table->sort);
210
 
  table->sort.io_cache= NULL;
211
 
 
212
 
  TableList *tab= table->pos_in_table_list;
213
 
  Item_subselect *subselect= tab ? tab->containing_subselect() : 0;
214
 
 
215
 
  DRIZZLE_FILESORT_START(table->getShare()->getSchemaName(), table->getShare()->getTableName());
216
 
 
217
 
  /*
218
 
   Release InnoDB's adaptive hash index latch (if holding) before
219
 
   running a sort.
220
 
  */
221
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(&getSession());
222
 
 
223
 
 
224
 
  outfile= table_sort.io_cache;
225
 
  assert(tempfile.buffer == 0);
226
 
  assert(buffpek_pointers.buffer == 0);
227
 
 
228
 
  param.sort_length= sortlength(sortorder, s_length, &multi_byte_charset);
229
 
  param.ref_length= table->cursor->ref_length;
230
 
 
231
 
  if (!(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) && !sort_positions)
232
 
  {
233
 
    /*
234
 
      Get the descriptors of all fields whose values are appended
235
 
      to sorted fields and get its total length in param.spack_length.
236
 
    */
237
 
    param.addon_field= get_addon_fields(table->getFields(),
238
 
                                        param.sort_length,
239
 
                                        &param.addon_length);
240
 
  }
241
 
 
242
 
  table_sort.addon_buf= 0;
243
 
  table_sort.addon_length= param.addon_length;
244
 
  table_sort.addon_field= param.addon_field;
245
 
  table_sort.unpack= unpack_addon_fields;
246
 
  if (param.addon_field)
247
 
  {
248
 
    param.res_length= param.addon_length;
249
 
    if (!(table_sort.addon_buf= (unsigned char *) malloc(param.addon_length)))
250
 
    {
251
 
      goto err;
252
 
    }
253
 
  }
254
 
  else
255
 
  {
256
 
    param.res_length= param.ref_length;
257
 
    /*
258
 
      The reference to the record is considered
259
 
      as an additional sorted field
260
 
    */
261
 
    param.sort_length+= param.ref_length;
262
 
  }
263
 
  param.rec_length= param.sort_length+param.addon_length;
264
 
  param.max_rows= max_rows;
265
 
 
266
 
  if (select && select->quick)
267
 
  {
268
 
    getSession().status_var.filesort_range_count++;
269
 
  }
270
 
  else
271
 
  {
272
 
    getSession().status_var.filesort_scan_count++;
273
 
  }
274
 
#ifdef CAN_TRUST_RANGE
275
 
  if (select && select->quick && select->quick->records > 0L)
276
 
  {
277
 
    records= min((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
278
 
                 table->cursor->stats.records)+EXTRA_RECORDS;
279
 
    selected_records_file=0;
280
 
  }
281
 
  else
282
 
#endif
283
 
  {
284
 
    records= table->cursor->estimate_rows_upper_bound();
285
 
    /*
286
 
      If number of records is not known, use as much of sort buffer
287
 
      as possible.
288
 
    */
289
 
    if (records == HA_POS_ERROR)
290
 
      records--;  // we use 'records+1' below.
291
 
    selected_records_file= 0;
292
 
  }
293
 
 
294
 
  if (multi_byte_charset && !(param.tmp_buffer= (char*) malloc(param.sort_length)))
295
 
  {
296
 
    goto err;
297
 
  }
298
 
 
299
 
  memavl= getSession().variables.sortbuff_size;
300
 
  min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
301
 
  while (memavl >= min_sort_memory)
302
 
  {
303
 
    uint32_t old_memavl;
304
 
    uint32_t keys= memavl/(param.rec_length+sizeof(char*));
305
 
    param.keys= (uint32_t) min(records+1, (ha_rows)keys);
306
 
 
307
 
    allocated_sort_memory= param.keys * param.rec_length;
308
 
    if (not global_sort_buffer.add(allocated_sort_memory))
309
 
    {
310
 
      my_error(ER_OUT_OF_GLOBAL_SORTMEMORY, MYF(ME_ERROR+ME_WAITTANG));
311
 
      goto err;
312
 
    }
313
 
 
314
 
    if ((table_sort.sort_keys=
315
 
         (unsigned char **) make_char_array((char **) table_sort.sort_keys,
316
 
                                            param.keys, param.rec_length)))
317
 
      break;
318
 
 
319
 
    global_sort_buffer.sub(allocated_sort_memory);
320
 
    old_memavl= memavl;
321
 
    if ((memavl= memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
322
 
      memavl= min_sort_memory;
323
 
  }
324
 
  sort_keys= table_sort.sort_keys;
325
 
  if (memavl < min_sort_memory)
326
 
  {
327
 
    my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
328
 
    goto err;
329
 
  }
330
 
 
331
 
  if (buffpek_pointers.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
332
 
  {
333
 
    goto err;
334
 
  }
335
 
 
336
 
  param.keys--;                         /* TODO: check why we do this */
337
 
  param.sort_form= table;
338
 
  param.end=(param.local_sortorder=sortorder)+s_length;
339
 
  if ((records= find_all_keys(&param,select,sort_keys, &buffpek_pointers,
340
 
                              &tempfile, selected_records_file)) == HA_POS_ERROR)
341
 
  {
342
 
    goto err;
343
 
  }
344
 
  maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek_inst));
345
 
 
346
 
  if (maxbuffer == 0)                   // The whole set is in memory
347
 
  {
348
 
    if (param.save_index(sort_keys,(uint32_t) records, &table_sort))
349
 
    {
350
 
      goto err;
351
 
    }
352
 
  }
353
 
  else
354
 
  {
355
 
    if (table_sort.buffpek && table_sort.buffpek_len < maxbuffer)
356
 
    {
357
 
      if (table_sort.buffpek)
358
 
        free(table_sort.buffpek);
359
 
      table_sort.buffpek = 0;
360
 
    }
361
 
    if (!(table_sort.buffpek=
362
 
          (unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer, table_sort.buffpek)))
363
 
    {
364
 
      goto err;
365
 
    }
366
 
    buffpek_inst= (buffpek *) table_sort.buffpek;
367
 
    table_sort.buffpek_len= maxbuffer;
368
 
    buffpek_pointers.close_cached_file();
369
 
        /* Open cached file if it isn't open */
370
 
    if (! my_b_inited(outfile) && outfile->open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME)))
371
 
    {
372
 
      goto err;
373
 
    }
374
 
 
375
 
    if (outfile->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
376
 
    {
377
 
      goto err;
378
 
    }
379
 
 
380
 
    /*
381
 
      Use also the space previously used by string pointers in sort_buffer
382
 
      for temporary key storage.
383
 
    */
384
 
    param.keys=((param.keys*(param.rec_length+sizeof(char*))) / param.rec_length-1);
385
 
 
386
 
    maxbuffer--;                                // Offset from 0
387
 
    if (merge_many_buff(&param,(unsigned char*) sort_keys,buffpek_inst,&maxbuffer, &tempfile))
388
 
    {
389
 
      goto err;
390
 
    }
391
 
 
392
 
    if (flush_io_cache(&tempfile) || tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
393
 
    {
394
 
      goto err;
395
 
    }
396
 
 
397
 
    if (merge_index(&param,(unsigned char*) sort_keys,buffpek_inst,maxbuffer,&tempfile, outfile))
398
 
    {
399
 
      goto err;
400
 
    }
401
 
  }
402
 
 
403
 
  if (records > param.max_rows)
404
 
  {
405
 
    records= param.max_rows;
406
 
  }
407
 
  error =0;
408
 
 
409
 
 err:
410
 
  if (not subselect || not subselect->is_uncacheable())
411
 
  {
412
 
    free(sort_keys);
413
 
    table_sort.sort_keys= 0;
414
 
    free(buffpek_inst);
415
 
    table_sort.buffpek= 0;
416
 
    table_sort.buffpek_len= 0;
417
 
  }
418
 
 
419
 
  tempfile.close_cached_file();
420
 
  buffpek_pointers.close_cached_file();
421
 
 
422
 
  if (my_b_inited(outfile))
423
 
  {
424
 
    if (flush_io_cache(outfile))
425
 
    {
426
 
      error=1;
427
 
    }
428
 
    {
429
 
      internal::my_off_t save_pos= outfile->pos_in_file;
430
 
      /* For following reads */
431
 
      if (outfile->reinit_io_cache(internal::READ_CACHE,0L,0,0))
432
 
      {
433
 
        error=1;
434
 
      }
435
 
      outfile->end_of_file=save_pos;
436
 
    }
437
 
  }
438
 
 
439
 
  if (error)
440
 
  {
441
 
    my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
442
 
               MYF(ME_ERROR+ME_WAITTANG));
443
 
  }
444
 
  else
445
 
  {
446
 
    getSession().status_var.filesort_rows+= (uint32_t) records;
447
 
  }
448
 
  examined_rows= param.examined_rows;
449
 
  global_sort_buffer.sub(allocated_sort_memory);
450
 
  table->sort= table_sort;
451
 
  DRIZZLE_FILESORT_DONE(error, records);
452
 
  return (error ? HA_POS_ERROR : records);
453
 
} /* filesort */
454
 
 
455
 
/** Make a array of string pointers. */
456
 
 
457
 
static char **make_char_array(char **old_pos, register uint32_t fields,
458
 
                              uint32_t length)
459
 
{
460
 
  register char **pos;
461
 
  char *char_pos;
462
 
 
463
 
  if (old_pos ||
464
 
      (old_pos= (char**) malloc((uint32_t) fields*(length+sizeof(char*)))))
465
 
  {
466
 
    pos=old_pos; char_pos=((char*) (pos+fields)) -length;
467
 
    while (fields--) *(pos++) = (char_pos+= length);
468
 
  }
469
 
 
470
 
  return(old_pos);
471
 
} /* make_char_array */
472
 
 
473
 
 
474
 
/** Read 'count' number of buffer pointers into memory. */
475
 
 
476
 
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffpek_pointers, uint32_t count,
477
 
                                     unsigned char *buf)
478
 
{
479
 
  uint32_t length= sizeof(buffpek)*count;
480
 
  unsigned char *tmp= buf;
481
 
  if (count > UINT_MAX/sizeof(buffpek))
482
 
    return 0; /* sizeof(buffpek)*count will overflow */
483
 
  if (!tmp)
484
 
    tmp= (unsigned char *)malloc(length);
485
 
  if (tmp)
486
 
  {
487
 
    if (buffpek_pointers->reinit_io_cache(internal::READ_CACHE,0L,0,0) ||
488
 
        my_b_read(buffpek_pointers, (unsigned char*) tmp, length))
489
 
    {
490
 
      free((char*) tmp);
491
 
      tmp=0;
492
 
    }
493
 
  }
494
 
  return(tmp);
495
 
}
496
 
 
497
 
 
498
 
/**
499
 
  Search after sort_keys and write them into tempfile.
500
 
  All produced sequences are guaranteed to be non-empty.
501
 
 
502
 
  @param param             Sorting parameter
503
 
  @param select            Use this to get source data
504
 
  @param sort_keys         Array of pointers to sort key + addon buffers.
505
 
  @param buffpek_pointers  File to write buffpeks describing sorted segments
506
 
                           in tempfile.
507
 
  @param tempfile          File to write sorted sequences of sortkeys to.
508
 
  @param indexfile         If !NULL, use it for source data (contains rowids)
509
 
 
510
 
  @note
511
 
    Basic idea:
512
 
    @verbatim
513
 
     while (get_next_sortkey())
514
 
     {
515
 
       if (no free space in sort_keys buffers)
516
 
       {
517
 
         sort sort_keys buffer;
518
 
         dump sorted sequence to 'tempfile';
519
 
         dump buffpek describing sequence location into 'buffpek_pointers';
520
 
       }
521
 
       put sort key into 'sort_keys';
522
 
     }
523
 
     if (sort_keys has some elements && dumped at least once)
524
 
       sort-dump-dump as above;
525
 
     else
526
 
       don't sort, leave sort_keys array to be sorted by caller.
527
 
  @endverbatim
528
 
 
529
 
  @retval
530
 
    Number of records written on success.
531
 
  @retval
532
 
    HA_POS_ERROR on error.
533
 
*/
534
 
 
535
 
ha_rows FileSort::find_all_keys(SortParam *param, 
536
 
                                optimizer::SqlSelect *select,
537
 
                                unsigned char **sort_keys,
538
 
                                internal::IO_CACHE *buffpek_pointers,
539
 
                                internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
540
 
{
541
 
  int error,flag,quick_select;
542
 
  uint32_t idx,indexpos,ref_length;
543
 
  unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
544
 
  internal::my_off_t record;
545
 
  Table *sort_form;
546
 
  volatile Session::killed_state_t *killed= getSession().getKilledPtr();
547
 
  Cursor *file;
548
 
  boost::dynamic_bitset<> *save_read_set= NULL;
549
 
  boost::dynamic_bitset<> *save_write_set= NULL;
550
 
 
551
 
  idx=indexpos=0;
552
 
  error=quick_select=0;
553
 
  sort_form=param->sort_form;
554
 
  file= sort_form->cursor;
555
 
  ref_length=param->ref_length;
556
 
  ref_pos= ref_buff;
557
 
  quick_select=select && select->quick;
558
 
  record=0;
559
 
  flag= ((!indexfile && ! file->isOrdered())
560
 
         || quick_select);
561
 
  if (indexfile || flag)
562
 
    ref_pos= &file->ref[0];
563
 
  next_pos=ref_pos;
564
 
  if (! indexfile && ! quick_select)
565
 
  {
566
 
    next_pos=(unsigned char*) 0;                        /* Find records in sequence */
567
 
    file->startTableScan(1);
568
 
    file->extra_opt(HA_EXTRA_CACHE, getSession().variables.read_buff_size);
569
 
  }
570
 
 
571
 
  ReadRecord read_record_info;
572
 
  if (quick_select)
573
 
  {
574
 
    if (select->quick->reset())
575
 
      return(HA_POS_ERROR);
576
 
 
577
 
    read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1);
578
 
  }
579
 
 
580
 
  /* Remember original bitmaps */
581
 
  save_read_set=  sort_form->read_set;
582
 
  save_write_set= sort_form->write_set;
583
 
  /* Set up temporary column read map for columns used by sort */
584
 
  sort_form->tmp_set.reset();
585
 
  /* Temporary set for register_used_fields and register_field_in_read_map */
586
 
  sort_form->read_set= &sort_form->tmp_set;
587
 
  param->register_used_fields();
588
 
  if (select && select->cond)
589
 
    select->cond->walk(&Item::register_field_in_read_map, 1,
590
 
                       (unsigned char*) sort_form);
591
 
  sort_form->column_bitmaps_set(sort_form->tmp_set, sort_form->tmp_set);
592
 
 
593
 
  for (;;)
594
 
  {
595
 
    if (quick_select)
596
 
    {
597
 
      if ((error= read_record_info.read_record(&read_record_info)))
598
 
      {
599
 
        error= HA_ERR_END_OF_FILE;
600
 
        break;
601
 
      }
602
 
      file->position(sort_form->getInsertRecord());
603
 
    }
604
 
    else                                        /* Not quick-select */
605
 
    {
606
 
      if (indexfile)
607
 
      {
608
 
        if (my_b_read(indexfile,(unsigned char*) ref_pos,ref_length))
609
 
        {
610
 
          error= errno ? errno : -1;            /* Abort */
611
 
          break;
612
 
        }
613
 
        error=file->rnd_pos(sort_form->getInsertRecord(),next_pos);
614
 
      }
615
 
      else
616
 
      {
617
 
        error=file->rnd_next(sort_form->getInsertRecord());
618
 
 
619
 
        if (!flag)
620
 
        {
621
 
          internal::my_store_ptr(ref_pos,ref_length,record); // Position to row
622
 
          record+= sort_form->getShare()->db_record_offset;
623
 
        }
624
 
        else if (!error)
625
 
          file->position(sort_form->getInsertRecord());
626
 
      }
627
 
      if (error && error != HA_ERR_RECORD_DELETED)
628
 
        break;
629
 
    }
630
 
 
631
 
    if (*killed)
632
 
    {
633
 
      if (!indexfile && !quick_select)
634
 
      {
635
 
        (void) file->extra(HA_EXTRA_NO_CACHE);
636
 
        file->endTableScan();
637
 
      }
638
 
      return(HA_POS_ERROR);
639
 
    }
640
 
    if (error == 0)
641
 
      param->examined_rows++;
642
 
    if (error == 0 && (!select || select->skip_record() == 0))
643
 
    {
644
 
      if (idx == param->keys)
645
 
      {
646
 
        if (param->write_keys(sort_keys, idx, buffpek_pointers, tempfile))
647
 
          return(HA_POS_ERROR);
648
 
        idx=0;
649
 
        indexpos++;
650
 
      }
651
 
      param->make_sortkey(sort_keys[idx++], ref_pos);
652
 
    }
653
 
    else
654
 
    {
655
 
      file->unlock_row();
656
 
    }
657
 
 
658
 
    /* It does not make sense to read more keys in case of a fatal error */
659
 
    if (getSession().is_error())
660
 
      break;
661
 
  }
662
 
  if (quick_select)
663
 
  {
664
 
    /*
665
 
      index_merge quick select uses table->sort when retrieving rows, so free
666
 
      resoures it has allocated.
667
 
    */
668
 
    read_record_info.end_read_record();
669
 
  }
670
 
  else
671
 
  {
672
 
    (void) file->extra(HA_EXTRA_NO_CACHE);      /* End cacheing of records */
673
 
    if (!next_pos)
674
 
      file->endTableScan();
675
 
  }
676
 
 
677
 
  if (getSession().is_error())
678
 
    return(HA_POS_ERROR);
679
 
 
680
 
  /* Signal we should use orignal column read and write maps */
681
 
  sort_form->column_bitmaps_set(*save_read_set, *save_write_set);
682
 
 
683
 
  if (error != HA_ERR_END_OF_FILE)
684
 
  {
685
 
    sort_form->print_error(error,MYF(ME_ERROR | ME_WAITTANG));
686
 
    return(HA_POS_ERROR);
687
 
  }
688
 
 
689
 
  if (indexpos && idx && param->write_keys(sort_keys,idx,buffpek_pointers,tempfile))
690
 
  {
691
 
    return(HA_POS_ERROR);
692
 
  }
693
 
 
694
 
  return(my_b_inited(tempfile) ?
695
 
              (ha_rows) (my_b_tell(tempfile)/param->rec_length) :
696
 
              idx);
697
 
} /* find_all_keys */
698
 
 
699
 
 
700
 
/**
701
 
  @details
702
 
  Sort the buffer and write:
703
 
  -# the sorted sequence to tempfile
704
 
  -# a buffpek describing the sorted sequence position to buffpek_pointers
705
 
 
706
 
    (was: Skriver en buffert med nycklar till filen)
707
 
 
708
 
  @param param             Sort parameters
709
 
  @param sort_keys         Array of pointers to keys to sort
710
 
  @param count             Number of elements in sort_keys array
711
 
  @param buffpek_pointers  One 'buffpek' struct will be written into this file.
712
 
                           The buffpek::{file_pos, count} will indicate where
713
 
                           the sorted data was stored.
714
 
  @param tempfile          The sorted sequence will be written into this file.
715
 
 
716
 
  @retval
717
 
    0 OK
718
 
  @retval
719
 
    1 Error
720
 
*/
721
 
 
722
 
int SortParam::write_keys(register unsigned char **sort_keys, uint32_t count,
723
 
                          internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
724
 
{
725
 
  buffpek buffpek;
726
 
 
727
 
  internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
728
 
  if (!my_b_inited(tempfile) &&
729
 
      tempfile->open_cached_file(drizzle_tmpdir.c_str(), TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
730
 
  {
731
 
    return 1;
732
 
  }
733
 
  /* check we won't have more buffpeks than we can possibly keep in memory */
734
 
  if (my_b_tell(buffpek_pointers) + sizeof(buffpek) > (uint64_t)UINT_MAX)
735
 
  {
736
 
    return 1;
737
 
  }
738
 
 
739
 
  buffpek.file_pos= my_b_tell(tempfile);
740
 
  if ((ha_rows) count > max_rows)
741
 
    count=(uint32_t) max_rows;
742
 
 
743
 
  buffpek.count=(ha_rows) count;
744
 
 
745
 
  for (unsigned char **ptr= sort_keys + count ; sort_keys != ptr ; sort_keys++)
746
 
  {
747
 
    if (my_b_write(tempfile, (unsigned char*) *sort_keys, (uint32_t) rec_length))
748
 
    {
749
 
      return 1;
750
 
    }
751
 
  }
752
 
 
753
 
  if (my_b_write(buffpek_pointers, (unsigned char*) &buffpek, sizeof(buffpek)))
754
 
  {
755
 
    return 1;
756
 
  }
757
 
 
758
 
  return 0;
759
 
} /* write_keys */
760
 
 
761
 
 
762
 
/**
763
 
  Store length as suffix in high-byte-first order.
764
 
*/
765
 
 
766
 
static inline void store_length(unsigned char *to, uint32_t length, uint32_t pack_length)
767
 
{
768
 
  switch (pack_length) {
769
 
  case 1:
770
 
    *to= (unsigned char) length;
771
 
    break;
772
 
  case 2:
773
 
    mi_int2store(to, length);
774
 
    break;
775
 
  case 3:
776
 
    mi_int3store(to, length);
777
 
    break;
778
 
  default:
779
 
    mi_int4store(to, length);
780
 
    break;
781
 
  }
782
 
}
783
 
 
784
 
 
785
 
/** Make a sort-key from record. */
786
 
 
787
 
void SortParam::make_sortkey(register unsigned char *to, unsigned char *ref_pos)
788
 
{
789
 
  Field *field;
790
 
  SortField *sort_field;
791
 
  size_t length;
792
 
 
793
 
  for (sort_field= local_sortorder ;
794
 
       sort_field != end ;
795
 
       sort_field++)
796
 
  {
797
 
    bool maybe_null=0;
798
 
    if ((field=sort_field->field))
799
 
    {                                           // Field
800
 
      if (field->maybe_null())
801
 
      {
802
 
        if (field->is_null())
803
 
        {
804
 
          if (sort_field->reverse)
805
 
            memset(to, 255, sort_field->length+1);
806
 
          else
807
 
            memset(to, 0, sort_field->length+1);
808
 
          to+= sort_field->length+1;
809
 
          continue;
810
 
        }
811
 
        else
812
 
          *to++=1;
813
 
      }
814
 
      field->sort_string(to, sort_field->length);
815
 
    }
816
 
    else
817
 
    {                                           // Item
818
 
      Item *item=sort_field->item;
819
 
      maybe_null= item->maybe_null;
820
 
 
821
 
      switch (sort_field->result_type) {
822
 
      case STRING_RESULT:
823
 
        {
824
 
          const CHARSET_INFO * const cs=item->collation.collation;
825
 
          char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
826
 
          int diff;
827
 
          uint32_t sort_field_length;
828
 
 
829
 
          if (maybe_null)
830
 
            *to++=1;
831
 
          /* All item->str() to use some extra byte for end null.. */
832
 
          String tmp((char*) to,sort_field->length+4,cs);
833
 
          String *res= item->str_result(&tmp);
834
 
          if (!res)
835
 
          {
836
 
            if (maybe_null)
837
 
              memset(to-1, 0, sort_field->length+1);
838
 
            else
839
 
            {
840
 
              /*
841
 
                This should only happen during extreme conditions if we run out
842
 
                of memory or have an item marked not null when it can be null.
843
 
                This code is here mainly to avoid a hard crash in this case.
844
 
              */
845
 
              assert(0);
846
 
              memset(to, 0, sort_field->length);        // Avoid crash
847
 
            }
848
 
            break;
849
 
          }
850
 
          length= res->length();
851
 
          sort_field_length= sort_field->length - sort_field->suffix_length;
852
 
          diff=(int) (sort_field_length - length);
853
 
          if (diff < 0)
854
 
          {
855
 
            diff=0;
856
 
            length= sort_field_length;
857
 
          }
858
 
          if (sort_field->suffix_length)
859
 
          {
860
 
            /* Store length last in result_string */
861
 
            store_length(to + sort_field_length, length,
862
 
                         sort_field->suffix_length);
863
 
          }
864
 
          if (sort_field->need_strxnfrm)
865
 
          {
866
 
            char *from=(char*) res->ptr();
867
 
            uint32_t tmp_length;
868
 
            if ((unsigned char*) from == to)
869
 
            {
870
 
              set_if_smaller(length,sort_field->length);
871
 
              memcpy(tmp_buffer,from,length);
872
 
              from= tmp_buffer;
873
 
            }
874
 
            tmp_length= my_strnxfrm(cs,to,sort_field->length,
875
 
                                    (unsigned char*) from, length);
876
 
            assert(tmp_length == sort_field->length);
877
 
          }
878
 
          else
879
 
          {
880
 
            my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
881
 
            cs->cset->fill(cs, (char *)to+length,diff,fill_char);
882
 
          }
883
 
          break;
884
 
        }
885
 
      case INT_RESULT:
886
 
        {
887
 
          int64_t value= item->val_int_result();
888
 
          if (maybe_null)
889
 
          {
890
 
            *to++=1;
891
 
            if (item->null_value)
892
 
            {
893
 
              if (maybe_null)
894
 
                memset(to-1, 0, sort_field->length+1);
895
 
              else
896
 
              {
897
 
                memset(to, 0, sort_field->length);
898
 
              }
899
 
              break;
900
 
            }
901
 
          }
902
 
          to[7]= (unsigned char) value;
903
 
          to[6]= (unsigned char) (value >> 8);
904
 
          to[5]= (unsigned char) (value >> 16);
905
 
          to[4]= (unsigned char) (value >> 24);
906
 
          to[3]= (unsigned char) (value >> 32);
907
 
          to[2]= (unsigned char) (value >> 40);
908
 
          to[1]= (unsigned char) (value >> 48);
909
 
          if (item->unsigned_flag)                    /* Fix sign */
910
 
            to[0]= (unsigned char) (value >> 56);
911
 
          else
912
 
            to[0]= (unsigned char) (value >> 56) ^ 128; /* Reverse signbit */
913
 
          break;
914
 
        }
915
 
      case DECIMAL_RESULT:
916
 
        {
917
 
          my_decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
918
 
          if (maybe_null)
919
 
          {
920
 
            if (item->null_value)
921
 
            {
922
 
              memset(to, 0, sort_field->length+1);
923
 
              to++;
924
 
              break;
925
 
            }
926
 
            *to++=1;
927
 
          }
928
 
          my_decimal2binary(E_DEC_FATAL_ERROR, dec_val, to,
929
 
                            item->max_length - (item->decimals ? 1:0),
930
 
                            item->decimals);
931
 
          break;
932
 
        }
933
 
      case REAL_RESULT:
934
 
        {
935
 
          double value= item->val_result();
936
 
          if (maybe_null)
937
 
          {
938
 
            if (item->null_value)
939
 
            {
940
 
              memset(to, 0, sort_field->length+1);
941
 
              to++;
942
 
              break;
943
 
            }
944
 
            *to++=1;
945
 
          }
946
 
          change_double_for_sort(value,(unsigned char*) to);
947
 
          break;
948
 
        }
949
 
      case ROW_RESULT:
950
 
      default:
951
 
        // This case should never be choosen
952
 
        assert(0);
953
 
        break;
954
 
      }
955
 
    }
956
 
 
957
 
    if (sort_field->reverse)
958
 
    {                                                   /* Revers key */
959
 
      if (maybe_null)
960
 
        to[-1]= ~to[-1];
961
 
      length=sort_field->length;
962
 
      while (length--)
963
 
      {
964
 
        *to = (unsigned char) (~ *to);
965
 
        to++;
966
 
      }
967
 
    }
968
 
    else
969
 
    {
970
 
      to+= sort_field->length;
971
 
    }
972
 
  }
973
 
 
974
 
  if (addon_field)
975
 
  {
976
 
    /*
977
 
      Save field values appended to sorted fields.
978
 
      First null bit indicators are appended then field values follow.
979
 
      In this implementation we use fixed layout for field values -
980
 
      the same for all records.
981
 
    */
982
 
    sort_addon_field *addonf= addon_field;
983
 
    unsigned char *nulls= to;
984
 
    assert(addonf != 0);
985
 
    memset(nulls, 0, addonf->offset);
986
 
    to+= addonf->offset;
987
 
    for ( ; (field= addonf->field) ; addonf++)
988
 
    {
989
 
      if (addonf->null_bit && field->is_null())
990
 
      {
991
 
        nulls[addonf->null_offset]|= addonf->null_bit;
992
 
#ifdef HAVE_VALGRIND
993
 
        memset(to, 0, addonf->length);
994
 
#endif
995
 
      }
996
 
      else
997
 
      {
998
 
#ifdef HAVE_VALGRIND
999
 
        unsigned char *end= field->pack(to, field->ptr);
1000
 
        uint32_t local_length= (uint32_t) ((to + addonf->length) - end);
1001
 
        assert((int) local_length >= 0);
1002
 
        if (local_length)
1003
 
          memset(end, 0, local_length);
1004
 
#else
1005
 
        (void) field->pack(to, field->ptr);
1006
 
#endif
1007
 
      }
1008
 
      to+= addonf->length;
1009
 
    }
1010
 
  }
1011
 
  else
1012
 
  {
1013
 
    /* Save filepos last */
1014
 
    memcpy(to, ref_pos, (size_t) ref_length);
1015
 
  }
1016
 
}
1017
 
 
1018
 
 
1019
 
/*
1020
 
  Register fields used by sorting in the sorted table's read set
1021
 
*/
1022
 
 
1023
 
void SortParam::register_used_fields()
1024
 
{
1025
 
  SortField *sort_field;
1026
 
  Table *table= sort_form;
1027
 
 
1028
 
  for (sort_field= local_sortorder ;
1029
 
       sort_field != end ;
1030
 
       sort_field++)
1031
 
  {
1032
 
    Field *field;
1033
 
    if ((field= sort_field->field))
1034
 
    {
1035
 
      if (field->getTable() == table)
1036
 
        table->setReadSet(field->position());
1037
 
    }
1038
 
    else
1039
 
    {                                           // Item
1040
 
      sort_field->item->walk(&Item::register_field_in_read_map, 1,
1041
 
                             (unsigned char *) table);
1042
 
    }
1043
 
  }
1044
 
 
1045
 
  if (addon_field)
1046
 
  {
1047
 
    sort_addon_field *addonf= addon_field;
1048
 
    Field *field;
1049
 
    for ( ; (field= addonf->field) ; addonf++)
1050
 
      table->setReadSet(field->position());
1051
 
  }
1052
 
  else
1053
 
  {
1054
 
    /* Save filepos last */
1055
 
    table->prepare_for_position();
1056
 
  }
1057
 
}
1058
 
 
1059
 
 
1060
 
bool SortParam::save_index(unsigned char **sort_keys, uint32_t count, filesort_info *table_sort)
1061
 
{
1062
 
  uint32_t offset;
1063
 
  unsigned char *to;
1064
 
 
1065
 
  internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
1066
 
  offset= rec_length - res_length;
1067
 
 
1068
 
  if ((ha_rows) count > max_rows)
1069
 
    count=(uint32_t) max_rows;
1070
 
 
1071
 
  if (!(to= table_sort->record_pointers= (unsigned char*) malloc(res_length*count)))
1072
 
    return true;
1073
 
 
1074
 
  for (unsigned char **end_ptr= sort_keys+count ; sort_keys != end_ptr ; sort_keys++)
1075
 
  {
1076
 
    memcpy(to, *sort_keys+offset, res_length);
1077
 
    to+= res_length;
1078
 
  }
1079
 
 
1080
 
  return false;
1081
 
}
1082
 
 
1083
 
 
1084
 
/** Merge buffers to make < MERGEBUFF2 buffers. */
1085
 
 
1086
 
int FileSort::merge_many_buff(SortParam *param, unsigned char *sort_buffer,
1087
 
                              buffpek *buffpek_inst, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
1088
 
{
1089
 
  internal::IO_CACHE t_file2,*from_file,*to_file,*temp;
1090
 
  buffpek *lastbuff;
1091
 
 
1092
 
  if (*maxbuffer < MERGEBUFF2)
1093
 
    return 0;
1094
 
  if (flush_io_cache(t_file) ||
1095
 
      t_file2.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,DISK_BUFFER_SIZE, MYF(MY_WME)))
1096
 
  {
1097
 
    return 1;
1098
 
  }
1099
 
 
1100
 
  from_file= t_file ; to_file= &t_file2;
1101
 
  while (*maxbuffer >= MERGEBUFF2)
1102
 
  {
1103
 
    register uint32_t i;
1104
 
 
1105
 
    if (from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0))
1106
 
    {
1107
 
      break;
1108
 
    }
1109
 
 
1110
 
    if (to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
1111
 
    {
1112
 
      break;
1113
 
    }
1114
 
 
1115
 
    lastbuff=buffpek_inst;
1116
 
    for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
1117
 
    {
1118
 
      if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
1119
 
                        buffpek_inst+i,buffpek_inst+i+MERGEBUFF-1,0))
1120
 
      {
1121
 
        goto cleanup;
1122
 
      }
1123
 
    }
1124
 
 
1125
 
    if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
1126
 
                      buffpek_inst+i,buffpek_inst+ *maxbuffer,0))
1127
 
    {
1128
 
      break;
1129
 
    }
1130
 
 
1131
 
    if (flush_io_cache(to_file))
1132
 
    {
1133
 
      break;
1134
 
    }
1135
 
 
1136
 
    temp=from_file; from_file=to_file; to_file=temp;
1137
 
    from_file->setup_io_cache();
1138
 
    to_file->setup_io_cache();
1139
 
    *maxbuffer= (uint32_t) (lastbuff-buffpek_inst)-1;
1140
 
  }
1141
 
 
1142
 
cleanup:
1143
 
  to_file->close_cached_file();                 // This holds old result
1144
 
  if (to_file == t_file)
1145
 
  {
1146
 
    *t_file=t_file2;                            // Copy result file
1147
 
    t_file->setup_io_cache();
1148
 
  }
1149
 
 
1150
 
  return(*maxbuffer >= MERGEBUFF2);     /* Return 1 if interrupted */
1151
 
} /* merge_many_buff */
1152
 
 
1153
 
 
1154
 
/**
1155
 
  Read data to buffer.
1156
 
 
1157
 
  @retval
1158
 
    (uint32_t)-1 if something goes wrong
1159
 
*/
1160
 
 
1161
 
uint32_t FileSort::read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst, uint32_t rec_length)
1162
 
{
1163
 
  register uint32_t count;
1164
 
  uint32_t length;
1165
 
 
1166
 
  if ((count= (uint32_t) min((ha_rows) buffpek_inst->max_keys,buffpek_inst->count)))
1167
 
  {
1168
 
    if (pread(fromfile->file,(unsigned char*) buffpek_inst->base, (length= rec_length*count),buffpek_inst->file_pos) == 0)
1169
 
      return((uint32_t) -1);
1170
 
 
1171
 
    buffpek_inst->key= buffpek_inst->base;
1172
 
    buffpek_inst->file_pos+= length;                    /* New filepos */
1173
 
    buffpek_inst->count-= count;
1174
 
    buffpek_inst->mem_count= count;
1175
 
  }
1176
 
  return (count*rec_length);
1177
 
} /* read_to_buffer */
1178
 
 
1179
 
 
1180
 
class compare_functor
1181
 
{
1182
 
  qsort2_cmp key_compare;
1183
 
  void *key_compare_arg;
1184
 
 
1185
 
  public:
1186
 
  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg) :
1187
 
    key_compare(in_key_compare),
1188
 
    key_compare_arg(in_compare_arg)
1189
 
  { }
1190
 
  
1191
 
  inline bool operator()(const buffpek *i, const buffpek *j) const
1192
 
  {
1193
 
    int val= key_compare(key_compare_arg, &i->key, &j->key);
1194
 
 
1195
 
    return (val >= 0);
1196
 
  }
1197
 
};
1198
 
 
1199
 
 
1200
 
/**
1201
 
  Merge buffers to one buffer.
1202
 
 
1203
 
  @param param        Sort parameter
1204
 
  @param from_file    File with source data (buffpeks point to this file)
1205
 
  @param to_file      File to write the sorted result data.
1206
 
  @param sort_buffer  Buffer for data to store up to MERGEBUFF2 sort keys.
1207
 
  @param lastbuff     OUT Store here buffpek describing data written to to_file
1208
 
  @param Fb           First element in source buffpeks array
1209
 
  @param Tb           Last element in source buffpeks array
1210
 
  @param flag
1211
 
 
1212
 
  @retval
1213
 
    0      OK
1214
 
  @retval
1215
 
    other  error
1216
 
*/
1217
 
 
1218
 
int FileSort::merge_buffers(SortParam *param, internal::IO_CACHE *from_file,
1219
 
                            internal::IO_CACHE *to_file, unsigned char *sort_buffer,
1220
 
                            buffpek *lastbuff, buffpek *Fb, buffpek *Tb,
1221
 
                            int flag)
1222
 
{
1223
 
  int error;
1224
 
  uint32_t rec_length,res_length,offset;
1225
 
  size_t sort_length;
1226
 
  uint32_t maxcount;
1227
 
  ha_rows max_rows,org_max_rows;
1228
 
  internal::my_off_t to_start_filepos;
1229
 
  unsigned char *strpos;
1230
 
  buffpek *buffpek_inst;
1231
 
  qsort2_cmp cmp;
1232
 
  void *first_cmp_arg;
1233
 
  volatile Session::killed_state_t *killed= getSession().getKilledPtr();
1234
 
  Session::killed_state_t not_killable;
1235
 
 
1236
 
  getSession().status_var.filesort_merge_passes++;
1237
 
  if (param->not_killable)
1238
 
  {
1239
 
    killed= &not_killable;
1240
 
    not_killable= Session::NOT_KILLED;
1241
 
  }
1242
 
 
1243
 
  error=0;
1244
 
  rec_length= param->rec_length;
1245
 
  res_length= param->res_length;
1246
 
  sort_length= param->sort_length;
1247
 
  offset= rec_length-res_length;
1248
 
  maxcount= (uint32_t) (param->keys/((uint32_t) (Tb-Fb) +1));
1249
 
  to_start_filepos= my_b_tell(to_file);
1250
 
  strpos= (unsigned char*) sort_buffer;
1251
 
  org_max_rows=max_rows= param->max_rows;
1252
 
 
1253
 
  /* The following will fire if there is not enough space in sort_buffer */
1254
 
  assert(maxcount!=0);
1255
 
 
1256
 
  if (param->unique_buff)
1257
 
  {
1258
 
    cmp= param->compare;
1259
 
    first_cmp_arg= (void *) &param->cmp_context;
1260
 
  }
1261
 
  else
1262
 
  {
1263
 
    cmp= internal::get_ptr_compare(sort_length);
1264
 
    first_cmp_arg= (void*) &sort_length;
1265
 
  }
1266
 
  priority_queue<buffpek *, vector<buffpek *>, compare_functor >
1267
 
    queue(compare_functor(cmp, first_cmp_arg));
1268
 
  for (buffpek_inst= Fb ; buffpek_inst <= Tb ; buffpek_inst++)
1269
 
  {
1270
 
    buffpek_inst->base= strpos;
1271
 
    buffpek_inst->max_keys= maxcount;
1272
 
    strpos+= (uint32_t) (error= (int) read_to_buffer(from_file, buffpek_inst,
1273
 
                                                                         rec_length));
1274
 
    if (error == -1)
1275
 
      return -1;
1276
 
 
1277
 
    buffpek_inst->max_keys= buffpek_inst->mem_count;    // If less data in buffers than expected
1278
 
    queue.push(buffpek_inst);
1279
 
  }
1280
 
 
1281
 
  if (param->unique_buff)
1282
 
  {
1283
 
    /*
1284
 
       Called by Unique::get()
1285
 
       Copy the first argument to param->unique_buff for unique removal.
1286
 
       Store it also in 'to_file'.
1287
 
 
1288
 
       This is safe as we know that there is always more than one element
1289
 
       in each block to merge (This is guaranteed by the Unique:: algorithm
1290
 
    */
1291
 
    buffpek_inst= queue.top();
1292
 
    memcpy(param->unique_buff, buffpek_inst->key, rec_length);
1293
 
    if (my_b_write(to_file, (unsigned char*) buffpek_inst->key, rec_length))
1294
 
    {
1295
 
      return 1;
1296
 
    }
1297
 
    buffpek_inst->key+= rec_length;
1298
 
    buffpek_inst->mem_count--;
1299
 
    if (!--max_rows)
1300
 
    {
1301
 
      error= 0;
1302
 
      goto end;
1303
 
    }
1304
 
    /* Top element has been used */
1305
 
    queue.pop();
1306
 
    queue.push(buffpek_inst);
1307
 
  }
1308
 
  else
1309
 
  {
1310
 
    cmp= 0;                                        // Not unique
1311
 
  }
1312
 
 
1313
 
  while (queue.size() > 1)
1314
 
  {
1315
 
    if (*killed)
1316
 
    {
1317
 
      return 1;
1318
 
    }
1319
 
    for (;;)
1320
 
    {
1321
 
      buffpek_inst= queue.top();
1322
 
      if (cmp)                                        // Remove duplicates
1323
 
      {
1324
 
        if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
1325
 
                    (unsigned char**) &buffpek_inst->key))
1326
 
              goto skip_duplicate;
1327
 
            memcpy(param->unique_buff, buffpek_inst->key, rec_length);
1328
 
      }
1329
 
      if (flag == 0)
1330
 
      {
1331
 
        if (my_b_write(to_file,(unsigned char*) buffpek_inst->key, rec_length))
1332
 
        {
1333
 
          return 1;
1334
 
        }
1335
 
      }
1336
 
      else
1337
 
      {
1338
 
        if (my_b_write(to_file, (unsigned char*) buffpek_inst->key+offset, res_length))
1339
 
        {
1340
 
          return 1;
1341
 
        }
1342
 
      }
1343
 
      if (!--max_rows)
1344
 
      {
1345
 
        error= 0;
1346
 
        goto end;
1347
 
      }
1348
 
 
1349
 
    skip_duplicate:
1350
 
      buffpek_inst->key+= rec_length;
1351
 
      if (! --buffpek_inst->mem_count)
1352
 
      {
1353
 
        if (!(error= (int) read_to_buffer(from_file,buffpek_inst,
1354
 
                                          rec_length)))
1355
 
        {
1356
 
          queue.pop();
1357
 
          break;                        /* One buffer have been removed */
1358
 
        }
1359
 
        else if (error == -1)
1360
 
        {
1361
 
          return -1;
1362
 
        }
1363
 
      }
1364
 
      /* Top element has been replaced */
1365
 
      queue.pop();
1366
 
      queue.push(buffpek_inst);
1367
 
    }
1368
 
  }
1369
 
  buffpek_inst= queue.top();
1370
 
  buffpek_inst->base= sort_buffer;
1371
 
  buffpek_inst->max_keys= param->keys;
1372
 
 
1373
 
  /*
1374
 
    As we know all entries in the buffer are unique, we only have to
1375
 
    check if the first one is the same as the last one we wrote
1376
 
  */
1377
 
  if (cmp)
1378
 
  {
1379
 
    if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (unsigned char**) &buffpek_inst->key))
1380
 
    {
1381
 
      buffpek_inst->key+= rec_length;         // Remove duplicate
1382
 
      --buffpek_inst->mem_count;
1383
 
    }
1384
 
  }
1385
 
 
1386
 
  do
1387
 
  {
1388
 
    if ((ha_rows) buffpek_inst->mem_count > max_rows)
1389
 
    {                                        /* Don't write too many records */
1390
 
      buffpek_inst->mem_count= (uint32_t) max_rows;
1391
 
      buffpek_inst->count= 0;                        /* Don't read more */
1392
 
    }
1393
 
    max_rows-= buffpek_inst->mem_count;
1394
 
    if (flag == 0)
1395
 
    {
1396
 
      if (my_b_write(to_file,(unsigned char*) buffpek_inst->key,
1397
 
                     (rec_length*buffpek_inst->mem_count)))
1398
 
      {
1399
 
        return 1;
1400
 
      }
1401
 
    }
1402
 
    else
1403
 
    {
1404
 
      register unsigned char *end;
1405
 
      strpos= buffpek_inst->key+offset;
1406
 
      for (end= strpos+buffpek_inst->mem_count*rec_length ;
1407
 
           strpos != end ;
1408
 
           strpos+= rec_length)
1409
 
      {
1410
 
        if (my_b_write(to_file, (unsigned char *) strpos, res_length))
1411
 
        {
1412
 
          return 1;
1413
 
        }
1414
 
      }
1415
 
    }
1416
 
  }
1417
 
 
1418
 
  while ((error=(int) read_to_buffer(from_file,buffpek_inst, rec_length))
1419
 
         != -1 && error != 0);
1420
 
 
1421
 
end:
1422
 
  lastbuff->count= min(org_max_rows-max_rows, param->max_rows);
1423
 
  lastbuff->file_pos= to_start_filepos;
1424
 
 
1425
 
  return error;
1426
 
} /* merge_buffers */
1427
 
 
1428
 
 
1429
 
        /* Do a merge to output-file (save only positions) */
1430
 
 
1431
 
int FileSort::merge_index(SortParam *param, unsigned char *sort_buffer,
1432
 
                          buffpek *buffpek_inst, uint32_t maxbuffer,
1433
 
                          internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
1434
 
{
1435
 
  if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek_inst,buffpek_inst,
1436
 
                    buffpek_inst+maxbuffer,1))
1437
 
    return 1;
1438
 
 
1439
 
  return 0;
1440
 
} /* merge_index */
1441
 
 
1442
 
 
1443
 
static uint32_t suffix_length(uint32_t string_length)
1444
 
{
1445
 
  if (string_length < 256)
1446
 
    return 1;
1447
 
  if (string_length < 256L*256L)
1448
 
    return 2;
1449
 
  if (string_length < 256L*256L*256L)
1450
 
    return 3;
1451
 
  return 4;                                     // Can't sort longer than 4G
1452
 
}
1453
 
 
1454
 
 
1455
 
 
1456
 
/**
1457
 
  Calculate length of sort key.
1458
 
 
1459
 
  @param sortorder                Order of items to sort
1460
 
  @param s_length                 Number of items to sort
1461
 
  @param[out] multi_byte_charset Set to 1 if we are using multi-byte charset
1462
 
                                 (In which case we have to use strxnfrm())
1463
 
 
1464
 
  @note
1465
 
    sortorder->length is updated for each sort item.
1466
 
  @n
1467
 
    sortorder->need_strxnfrm is set 1 if we have to use strxnfrm
1468
 
 
1469
 
  @return
1470
 
    Total length of sort buffer in bytes
1471
 
*/
1472
 
 
1473
 
uint32_t FileSort::sortlength(SortField *sortorder, uint32_t s_length, bool *multi_byte_charset)
1474
 
{
1475
 
  register uint32_t length;
1476
 
  const CHARSET_INFO *cs;
1477
 
  *multi_byte_charset= 0;
1478
 
 
1479
 
  length=0;
1480
 
  for (; s_length-- ; sortorder++)
1481
 
  {
1482
 
    sortorder->need_strxnfrm= 0;
1483
 
    sortorder->suffix_length= 0;
1484
 
    if (sortorder->field)
1485
 
    {
1486
 
      cs= sortorder->field->sort_charset();
1487
 
      sortorder->length= sortorder->field->sort_length();
1488
 
 
1489
 
      if (use_strnxfrm((cs=sortorder->field->sort_charset())))
1490
 
      {
1491
 
        sortorder->need_strxnfrm= 1;
1492
 
        *multi_byte_charset= 1;
1493
 
        sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1494
 
      }
1495
 
      if (sortorder->field->maybe_null())
1496
 
        length++;                               // Place for NULL marker
1497
 
    }
1498
 
    else
1499
 
    {
1500
 
      sortorder->result_type= sortorder->item->result_type();
1501
 
      if (sortorder->item->result_as_int64_t())
1502
 
        sortorder->result_type= INT_RESULT;
1503
 
 
1504
 
      switch (sortorder->result_type) {
1505
 
      case STRING_RESULT:
1506
 
        sortorder->length=sortorder->item->max_length;
1507
 
        set_if_smaller(sortorder->length,
1508
 
                       getSession().variables.max_sort_length);
1509
 
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1510
 
        {
1511
 
          sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1512
 
          sortorder->need_strxnfrm= 1;
1513
 
          *multi_byte_charset= 1;
1514
 
        }
1515
 
        else if (cs == &my_charset_bin)
1516
 
        {
1517
 
          /* Store length last to be able to sort blob/varbinary */
1518
 
          sortorder->suffix_length= suffix_length(sortorder->length);
1519
 
          sortorder->length+= sortorder->suffix_length;
1520
 
        }
1521
 
        break;
1522
 
      case INT_RESULT:
1523
 
        sortorder->length=8;                    // Size of intern int64_t
1524
 
        break;
1525
 
      case DECIMAL_RESULT:
1526
 
        sortorder->length=
1527
 
          my_decimal_get_binary_size(sortorder->item->max_length -
1528
 
                                     (sortorder->item->decimals ? 1 : 0),
1529
 
                                     sortorder->item->decimals);
1530
 
        break;
1531
 
      case REAL_RESULT:
1532
 
        sortorder->length=sizeof(double);
1533
 
        break;
1534
 
      case ROW_RESULT:
1535
 
        // This case should never be choosen
1536
 
        assert(0);
1537
 
        break;
1538
 
      }
1539
 
      if (sortorder->item->maybe_null)
1540
 
        length++;                               // Place for NULL marker
1541
 
    }
1542
 
    set_if_smaller(sortorder->length, (size_t)getSession().variables.max_sort_length);
1543
 
    length+=sortorder->length;
1544
 
  }
1545
 
  sortorder->field= (Field*) 0;                 // end marker
1546
 
  return length;
1547
 
}
1548
 
 
1549
 
 
1550
 
/**
1551
 
  Get descriptors of fields appended to sorted fields and
1552
 
  calculate its total length.
1553
 
 
1554
 
  The function first finds out what fields are used in the result set.
1555
 
  Then it calculates the length of the buffer to store the values of
1556
 
  these fields together with the value of sort values.
1557
 
  If the calculated length is not greater than max_length_for_sort_data
1558
 
  the function allocates memory for an array of descriptors containing
1559
 
  layouts for the values of the non-sorted fields in the buffer and
1560
 
  fills them.
1561
 
 
1562
 
  @param ptabfield           Array of references to the table fields
1563
 
  @param sortlength          Total length of sorted fields
1564
 
  @param[out] plength        Total length of appended fields
1565
 
 
1566
 
  @note
1567
 
    The null bits for the appended values are supposed to be put together
1568
 
    and stored the buffer just ahead of the value of the first field.
1569
 
 
1570
 
  @return
1571
 
    Pointer to the layout descriptors for the appended fields, if any
1572
 
  @retval
1573
 
    NULL   if we do not store field values with sort data.
1574
 
*/
1575
 
 
1576
 
sort_addon_field *FileSort::get_addon_fields(Field **ptabfield, uint32_t sortlength_arg, uint32_t *plength)
1577
 
{
1578
 
  Field **pfield;
1579
 
  Field *field;
1580
 
  sort_addon_field *addonf;
1581
 
  uint32_t length= 0;
1582
 
  uint32_t fields= 0;
1583
 
  uint32_t null_fields= 0;
1584
 
 
1585
 
  /*
1586
 
    If there is a reference to a field in the query add it
1587
 
    to the the set of appended fields.
1588
 
    Note for future refinement:
1589
 
    This this a too strong condition.
1590
 
    Actually we need only the fields referred in the
1591
 
    result set. And for some of them it makes sense to use
1592
 
    the values directly from sorted fields.
1593
 
  */
1594
 
  *plength= 0;
1595
 
 
1596
 
  for (pfield= ptabfield; (field= *pfield) ; pfield++)
1597
 
  {
1598
 
    if (!(field->isReadSet()))
1599
 
      continue;
1600
 
    if (field->flags & BLOB_FLAG)
1601
 
      return 0;
1602
 
    length+= field->max_packed_col_length(field->pack_length());
1603
 
    if (field->maybe_null())
1604
 
      null_fields++;
1605
 
    fields++;
1606
 
  }
1607
 
  if (!fields)
1608
 
    return 0;
1609
 
  length+= (null_fields+7)/8;
1610
 
 
1611
 
  if (length+sortlength_arg > getSession().variables.max_length_for_sort_data ||
1612
 
      !(addonf= (sort_addon_field *) malloc(sizeof(sort_addon_field)*
1613
 
                                            (fields+1))))
1614
 
    return 0;
1615
 
 
1616
 
  *plength= length;
1617
 
  length= (null_fields+7)/8;
1618
 
  null_fields= 0;
1619
 
  for (pfield= ptabfield; (field= *pfield) ; pfield++)
1620
 
  {
1621
 
    if (!(field->isReadSet()))
1622
 
      continue;
1623
 
    addonf->field= field;
1624
 
    addonf->offset= length;
1625
 
    if (field->maybe_null())
1626
 
    {
1627
 
      addonf->null_offset= null_fields/8;
1628
 
      addonf->null_bit= 1<<(null_fields & 7);
1629
 
      null_fields++;
1630
 
    }
1631
 
    else
1632
 
    {
1633
 
      addonf->null_offset= 0;
1634
 
      addonf->null_bit= 0;
1635
 
    }
1636
 
    addonf->length= field->max_packed_col_length(field->pack_length());
1637
 
    length+= addonf->length;
1638
 
    addonf++;
1639
 
  }
1640
 
  addonf->field= 0;     // Put end marker
1641
 
 
1642
 
  return (addonf-fields);
1643
 
}
1644
 
 
1645
 
 
1646
 
/**
1647
 
  Copy (unpack) values appended to sorted fields from a buffer back to
1648
 
  their regular positions specified by the Field::ptr pointers.
1649
 
 
1650
 
  @param addon_field     Array of descriptors for appended fields
1651
 
  @param buff            Buffer which to unpack the value from
1652
 
 
1653
 
  @note
1654
 
    The function is supposed to be used only as a callback function
1655
 
    when getting field values for the sorted result set.
1656
 
 
1657
 
  @return
1658
 
    void.
1659
 
*/
1660
 
 
1661
 
static void
1662
 
unpack_addon_fields(sort_addon_field *addon_field, unsigned char *buff)
1663
 
{
1664
 
  Field *field;
1665
 
  sort_addon_field *addonf= addon_field;
1666
 
 
1667
 
  for ( ; (field= addonf->field) ; addonf++)
1668
 
  {
1669
 
    if (addonf->null_bit && (addonf->null_bit & buff[addonf->null_offset]))
1670
 
    {
1671
 
      field->set_null();
1672
 
      continue;
1673
 
    }
1674
 
    field->set_notnull();
1675
 
    field->unpack(field->ptr, buff + addonf->offset);
1676
 
  }
1677
 
}
1678
 
 
1679
 
/*
1680
 
** functions to change a double or float to a sortable string
1681
 
** The following should work for IEEE
1682
 
*/
1683
 
 
1684
 
#define DBL_EXP_DIG (sizeof(double)*8-DBL_MANT_DIG)
1685
 
 
1686
 
void change_double_for_sort(double nr,unsigned char *to)
1687
 
{
1688
 
  unsigned char *tmp=(unsigned char*) to;
1689
 
  if (nr == 0.0)
1690
 
  {                                             /* Change to zero string */
1691
 
    tmp[0]=(unsigned char) 128;
1692
 
    memset(tmp+1, 0, sizeof(nr)-1);
1693
 
  }
1694
 
  else
1695
 
  {
1696
 
#ifdef WORDS_BIGENDIAN
1697
 
    memcpy(tmp,&nr,sizeof(nr));
1698
 
#else
1699
 
    {
1700
 
      unsigned char *ptr= (unsigned char*) &nr;
1701
 
#if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1702
 
      tmp[0]= ptr[3]; tmp[1]=ptr[2]; tmp[2]= ptr[1]; tmp[3]=ptr[0];
1703
 
      tmp[4]= ptr[7]; tmp[5]=ptr[6]; tmp[6]= ptr[5]; tmp[7]=ptr[4];
1704
 
#else
1705
 
      tmp[0]= ptr[7]; tmp[1]=ptr[6]; tmp[2]= ptr[5]; tmp[3]=ptr[4];
1706
 
      tmp[4]= ptr[3]; tmp[5]=ptr[2]; tmp[6]= ptr[1]; tmp[7]=ptr[0];
1707
 
#endif
1708
 
    }
1709
 
#endif
1710
 
    if (tmp[0] & 128)                           /* Negative */
1711
 
    {                                           /* make complement */
1712
 
      uint32_t i;
1713
 
      for (i=0 ; i < sizeof(nr); i++)
1714
 
        tmp[i]=tmp[i] ^ (unsigned char) 255;
1715
 
    }
1716
 
    else
1717
 
    {                                   /* Set high and move exponent one up */
1718
 
      uint16_t exp_part=(((uint16_t) tmp[0] << 8) | (uint16_t) tmp[1] |
1719
 
                       (uint16_t) 32768);
1720
 
      exp_part+= (uint16_t) 1 << (16-1-DBL_EXP_DIG);
1721
 
      tmp[0]= (unsigned char) (exp_part >> 8);
1722
 
      tmp[1]= (unsigned char) exp_part;
1723
 
    }
1724
 
  }
1725
 
}
1726
 
 
1727
 
} /* namespace drizzled */