~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/utils/sort/tuplesort.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * tuplesort.c
 
4
 *        Generalized tuple sorting routines.
 
5
 *
 
6
 * This module handles sorting of heap tuples, index tuples, or single
 
7
 * Datums (and could easily support other kinds of sortable objects,
 
8
 * if necessary).  It works efficiently for both small and large amounts
 
9
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 
10
 * amounts are sorted using temporary files and a standard external sort
 
11
 * algorithm.
 
12
 *
 
13
 * See Knuth, volume 3, for more than you want to know about the external
 
14
 * sorting algorithm.  We divide the input into sorted runs using replacement
 
15
 * selection, in the form of a priority tree implemented as a heap
 
16
 * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
 
17
 * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
 
18
 * are implemented by logtape.c, which avoids space wastage by recycling
 
19
 * disk space as soon as each block is read from its "tape".
 
20
 *
 
21
 * We do not form the initial runs using Knuth's recommended replacement
 
22
 * selection data structure (Algorithm 5.4.1R), because it uses a fixed
 
23
 * number of records in memory at all times.  Since we are dealing with
 
24
 * tuples that may vary considerably in size, we want to be able to vary
 
25
 * the number of records kept in memory to ensure full utilization of the
 
26
 * allowed sort memory space.  So, we keep the tuples in a variable-size
 
27
 * heap, with the next record to go out at the top of the heap.  Like
 
28
 * Algorithm 5.4.1R, each record is stored with the run number that it
 
29
 * must go into, and we use (run number, key) as the ordering key for the
 
30
 * heap.  When the run number at the top of the heap changes, we know that
 
31
 * no more records of the prior run are left in the heap.
 
32
 *
 
33
 * The approximate amount of memory allowed for any one sort operation
 
34
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 
35
 * we absorb tuples and simply store them in an unsorted array as long as
 
36
 * we haven't exceeded workMem.  If we reach the end of the input without
 
37
 * exceeding workMem, we sort the array using qsort() and subsequently return
 
38
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 
39
 * workMem, we construct a heap using Algorithm H and begin to emit tuples
 
40
 * into sorted runs in temporary tapes, emitting just enough tuples at each
 
41
 * step to get back within the workMem limit.  Whenever the run number at
 
42
 * the top of the heap changes, we begin a new run with a new output tape
 
43
 * (selected per Algorithm D).  After the end of the input is reached,
 
44
 * we dump out remaining tuples in memory into a final run (or two),
 
45
 * then merge the runs using Algorithm D.
 
46
 *
 
47
 * When merging runs, we use a heap containing just the frontmost tuple from
 
48
 * each source run; we repeatedly output the smallest tuple and insert the
 
49
 * next tuple from its source tape (if any).  When the heap empties, the merge
 
50
 * is complete.  The basic merge algorithm thus needs very little memory ---
 
51
 * only M tuples for an M-way merge, and M is at most six in the present code.
 
52
 * However, we can still make good use of our full workMem allocation by
 
53
 * pre-reading additional tuples from each source tape.  Without prereading,
 
54
 * our access pattern to the temporary file would be very erratic; on average
 
55
 * we'd read one block from each of M source tapes during the same time that
 
56
 * we're writing M blocks to the output tape, so there is no sequentiality of
 
57
 * access at all, defeating the read-ahead methods used by most Unix kernels.
 
58
 * Worse, the output tape gets written into a very random sequence of blocks
 
59
 * of the temp file, ensuring that things will be even worse when it comes
 
60
 * time to read that tape.      A straightforward merge pass thus ends up doing a
 
61
 * lot of waiting for disk seeks.  We can improve matters by prereading from
 
62
 * each source tape sequentially, loading about workMem/M bytes from each tape
 
63
 * in turn.  Then we run the merge algorithm, writing but not reading until
 
64
 * one of the preloaded tuple series runs out.  Then we switch back to preread
 
65
 * mode, fill memory again, and repeat.  This approach helps to localize both
 
66
 * read and write accesses.
 
67
 *
 
68
 * When the caller requests random access to the sort result, we form
 
69
 * the final sorted run on a logical tape which is then "frozen", so
 
70
 * that we can access it randomly.      When the caller does not need random
 
71
 * access, we return from tuplesort_performsort() as soon as we are down
 
72
 * to one run per logical tape.  The final merge is then performed
 
73
 * on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
 
74
 * saves one cycle of writing all the data out to disk and reading it in.
 
75
 *
 
76
 *
 
77
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
78
 * Portions Copyright (c) 1994, Regents of the University of California
 
79
 *
 
80
 * IDENTIFICATION
 
81
 *        $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.45.4.1 2005-02-02 22:40:19 tgl Exp $
 
82
 *
 
83
 *-------------------------------------------------------------------------
 
84
 */
 
85
 
 
86
#include "postgres.h"
 
87
 
 
88
#include "access/heapam.h"
 
89
#include "access/nbtree.h"
 
90
#include "catalog/pg_amop.h"
 
91
#include "catalog/pg_operator.h"
 
92
#include "miscadmin.h"
 
93
#include "utils/catcache.h"
 
94
#include "utils/datum.h"
 
95
#include "utils/logtape.h"
 
96
#include "utils/lsyscache.h"
 
97
#include "utils/syscache.h"
 
98
#include "utils/tuplesort.h"
 
99
 
 
100
 
 
101
/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory limit */
	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
	TSS_FINALMERGE				/* Performing final merge on-the-fly */
} TupSortStatus;
 
114
 
 
115
/*
 * We use a seven-tape polyphase merge, which is the "sweet spot" on the
 * tapes-to-passes curve according to Knuth's figure 70 (section 5.4.2).
 */
#define MAXTAPES		7		/* Knuth's T */
#define TAPERANGE		(MAXTAPES-1)	/* Knuth's P */
 
121
 
 
122
/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
	TupSortStatus status;		/* enumerated value as shown above */
	bool		randomAccess;	/* did caller request random access? */
	long		availMem;		/* remaining memory available, in bytes */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp
								 * file */

	/*
	 * These function pointers decouple the routines that must know what
	 * kind of tuple we are sorting from the routines that don't need to
	 * know it. They are set up by the tuplesort_begin_xxx routines.
	 *
	 * Function to compare two tuples; result is per qsort() convention, ie:
	 *
	 * <0, 0, >0 according as a<b, a=b, a>b.
	 */
	int			(*comparetup) (Tuplesortstate *state, const void *a, const void *b);

	/*
	 * Function to copy a supplied input tuple into palloc'd space. (NB:
	 * we assume that a single pfree() is enough to release the tuple
	 * later, so the representation must be "flat" in one palloc chunk.)
	 * state->availMem must be decreased by the amount of space used.
	 */
	void	   *(*copytup) (Tuplesortstate *state, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of
	 * the tuple on tape need not be the same as it is in memory;
	 * requirements on the tape representation are given below.  After
	 * writing the tuple, pfree() it, and increase state->availMem by the
	 * amount of memory space thereby released.
	 */
	void		(*writetup) (Tuplesortstate *state, int tapenum, void *tup);

	/*
	 * Function to read a stored tuple from tape back into memory. 'len'
	 * is the already-read length of the stored tuple.  Create and return
	 * a palloc'd copy, and decrease state->availMem by the amount of
	 * memory space consumed.
	 */
	void	   *(*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);

	/*
	 * This array holds pointers to tuples in sort memory.  If we are in
	 * state INITIAL, the tuples are in no particular order; if we are in
	 * state SORTEDINMEM, the tuples are in final sorted order; in states
	 * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
	 * per Algorithm H.  (Note that memtupcount only counts the tuples
	 * that are part of the heap --- during merge passes, memtuples[]
	 * entries beyond TAPERANGE are never in the heap and are used to hold
	 * pre-read tuples.)  In state SORTEDONTAPE, the array is not used.
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */

	/*
	 * While building initial runs, this array holds the run number for
	 * each tuple in memtuples[].  During merge passes, we re-use it to
	 * hold the input tape number that each tuple in the heap was read
	 * from, or to hold the index of the next tuple pre-read from the same
	 * tape in the case of pre-read entries.  This array is never
	 * allocated unless we need to use tapes.  Whenever it is allocated,
	 * it has the same length as memtuples[].
	 */
	int		   *memtupindex;	/* index value associated with
								 * memtuples[i] */

	/*
	 * While building initial runs, this is the current output run number
	 * (starting at 0).  Afterwards, it is the number of initial runs we
	 * made.
	 */
	int			currentRun;

	/*
	 * These variables are only used during merge passes.  mergeactive[i]
	 * is true if we are reading an input run from (actual) tape number i
	 * and have not yet exhausted that run.  mergenext[i] is the memtuples
	 * index of the next pre-read tuple (next to be loaded into the heap)
	 * for tape i, or 0 if we are out of pre-read tuples.  mergelast[i]
	 * similarly points to the last pre-read tuple from each tape.
	 * mergeavailmem[i] is the amount of unused space allocated for tape
	 * i. mergefreelist and mergefirstfree keep track of unused locations
	 * in the memtuples[] array.  memtupindex[] links together pre-read
	 * tuples for each tape as well as recycled locations in
	 * mergefreelist. It is OK to use 0 as a null link in these lists,
	 * because memtuples[0] is part of the merge heap and is never a
	 * pre-read tuple.
	 */
	bool		mergeactive[MAXTAPES];	/* Active input run source? */
	int			mergenext[MAXTAPES];	/* first preread tuple for each
										 * source */
	int			mergelast[MAXTAPES];	/* last preread tuple for each
										 * source */
	long		mergeavailmem[MAXTAPES];		/* availMem for prereading
												 * tapes */
	long		spacePerTape;	/* actual per-tape target usage */
	int			mergefreelist;	/* head of freelist of recycled slots */
	int			mergefirstfree; /* first slot never used in this merge */

	/*
	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
	 * "logical" and "actual" tape numbers straight!
	 */
	int			Level;			/* Knuth's l */
	int			destTape;		/* current output tape (Knuth's j, less 1) */
	int			tp_fib[MAXTAPES];		/* Target Fibonacci run counts
										 * (A[]) */
	int			tp_runs[MAXTAPES];		/* # of real runs on each tape */
	int			tp_dummy[MAXTAPES];		/* # of dummy runs for each tape
										 * (D[]) */
	int			tp_tapenum[MAXTAPES];	/* Actual tape numbers (TAPE[]) */

	/*
	 * These variables are used after completion of sorting to keep track
	 * of the next tuple to return.  (In the tape case, the tape's current
	 * read position is also critical state.)
	 */
	int			result_tape;	/* actual tape number of finished output */
	int			current;		/* array index (only used if SORTEDINMEM) */
	bool		eof_reached;	/* reached EOF (needed for cursors) */

	/* markpos_xxx holds marked position for mark and restore */
	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
	int			markpos_offset; /* saved "current", or offset in tape
								 * block */
	bool		markpos_eof;	/* saved "eof_reached" */

	/*
	 * These variables are specific to the HeapTuple case; they are set by
	 * tuplesort_begin_heap and used only by the HeapTuple routines.
	 */
	TupleDesc	tupDesc;
	int			nKeys;
	ScanKey		scanKeys;
	SortFunctionKind *sortFnKinds;

	/*
	 * These variables are specific to the IndexTuple case; they are set
	 * by tuplesort_begin_index and used only by the IndexTuple routines.
	 */
	Relation	indexRel;
	ScanKey		indexScanKey;
	bool		enforceUnique;	/* complain if we find duplicate tuples */

	/*
	 * These variables are specific to the Datum case; they are set by
	 * tuplesort_begin_datum and used only by the DatumTuple routines.
	 */
	Oid			datumType;
	Oid			sortOperator;
	FmgrInfo	sortOpFn;		/* cached lookup data for sortOperator */
	SortFunctionKind sortFnKind;
	/* we need typelen and byval in order to know how to copy the Datums. */
	int			datumTypeLen;
	bool		datumTypeByVal;
};
 
286
 
 
287
/* Shorthand for invoking the per-tuple-kind function pointers */
#define COMPARETUP(state,a,b)	((*(state)->comparetup) (state, a, b))
#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
#define WRITETUP(state,tape,tup)	((*(state)->writetup) (state, tape, tup))
#define READTUP(state,tape,len) ((*(state)->readtup) (state, tape, len))
/* Memory accounting against state->availMem (negative means over budget) */
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
 
294
 
 
295
/*--------------------
 
296
 *
 
297
 * NOTES about on-tape representation of tuples:
 
298
 *
 
299
 * We require the first "unsigned int" of a stored tuple to be the total size
 
300
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 
301
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 
302
 * may or may not match the in-memory representation of the tuple ---
 
303
 * any conversion needed is the job of the writetup and readtup routines.
 
304
 *
 
305
 * If state->randomAccess is true, then the stored representation of the
 
306
 * tuple must be followed by another "unsigned int" that is a copy of the
 
307
 * length --- so the total tape space used is actually sizeof(unsigned int)
 
308
 * more than the stored length value.  This allows read-backwards.      When
 
309
 * randomAccess is not true, the write/read routines may omit the extra
 
310
 * length word.
 
311
 *
 
312
 * writetup is expected to write both length words as well as the tuple
 
313
 * data.  When readtup is called, the tape is positioned just after the
 
314
 * front length word; readtup must read the tuple data and advance past
 
315
 * the back length word (if present).
 
316
 *
 
317
 * The write/read routines can make use of the tuple description data
 
318
 * stored in the Tuplesortstate record, if needed.      They are also expected
 
319
 * to adjust state->availMem by the amount of memory space (not tape space!)
 
320
 * released or consumed.  There is no error return from either writetup
 
321
 * or readtup; they should ereport() on failure.
 
322
 *
 
323
 *
 
324
 * NOTES about memory consumption calculations:
 
325
 *
 
326
 * We count space allocated for tuples against the workMem limit, plus
 
327
 * the space used by the variable-size arrays memtuples and memtupindex.
 
328
 * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
 
329
 * counted.
 
330
 *
 
331
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 
332
 * rather than the originally-requested size.  This is important since
 
333
 * palloc can add substantial overhead.  It's not a complete answer since
 
334
 * we won't count any wasted space in palloc allocation blocks, but it's
 
335
 * a lot better than what we were doing before 7.3.
 
336
 *
 
337
 *--------------------
 
338
 */
 
339
 
 
340
/*
 * For sorting single Datums, we build "pseudo tuples" that just carry
 * the datum's value and null flag.  For pass-by-reference data types,
 * the actual data value appears after the DatumTupleHeader (MAXALIGNed,
 * of course), and the value field in the header is just a pointer to it.
 */

typedef struct
{
	Datum		val;
	bool		isNull;
} DatumTuple;
 
352
 
 
353
 
 
354
/* Forward declarations of local routines */
static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(Tuplesortstate *state, void *tuple);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
					  int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int	qsort_comparetup(const void *a, const void *b);
static int comparetup_heap(Tuplesortstate *state,
				const void *a, const void *b);
static void *copytup_heap(Tuplesortstate *state, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_heap(Tuplesortstate *state, int tapenum,
			 unsigned int len);
static int comparetup_index(Tuplesortstate *state,
				 const void *a, const void *b);
static void *copytup_index(Tuplesortstate *state, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_index(Tuplesortstate *state, int tapenum,
			  unsigned int len);
static int comparetup_datum(Tuplesortstate *state,
				 const void *a, const void *b);
static void *copytup_datum(Tuplesortstate *state, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_datum(Tuplesortstate *state, int tapenum,
			  unsigned int len);
 
387
 
 
388
/*
 * Since qsort(3) will not pass any context info to qsort_comparetup(),
 * we have to use this ugly static variable.  It is set to point to the
 * active Tuplesortstate object just before calling qsort.  It should
 * not be used directly by anything except qsort_comparetup().
 */
static Tuplesortstate *qsort_tuplesortstate;
 
395
 
 
396
 
 
397
/*
 
398
 *              tuplesort_begin_xxx
 
399
 *
 
400
 * Initialize for a tuple sort operation.
 
401
 *
 
402
 * After calling tuplesort_begin, the caller should call tuplesort_puttuple
 
403
 * zero or more times, then call tuplesort_performsort when all the tuples
 
404
 * have been supplied.  After performsort, retrieve the tuples in sorted
 
405
 * order by calling tuplesort_gettuple until it returns NULL.  (If random
 
406
 * access was requested, rescan, markpos, and restorepos can also be called.)
 
407
 * For Datum sorts, putdatum/getdatum are used instead of puttuple/gettuple.
 
408
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 
409
 *
 
410
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 
411
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 
412
 * (The normal value of this parameter is work_mem, but some callers use
 
413
 * other values.)  Each variant also has a randomAccess parameter specifying
 
414
 * whether the caller needs non-sequential access to the sort result.
 
415
 */
 
416
 
 
417
static Tuplesortstate *
 
418
tuplesort_begin_common(int workMem, bool randomAccess)
 
419
{
 
420
        Tuplesortstate *state;
 
421
 
 
422
        state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
 
423
 
 
424
        state->status = TSS_INITIAL;
 
425
        state->randomAccess = randomAccess;
 
426
        state->availMem = workMem * 1024L;
 
427
        state->tapeset = NULL;
 
428
 
 
429
        state->memtupcount = 0;
 
430
        state->memtupsize = 1024;       /* initial guess */
 
431
        state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
 
432
 
 
433
        state->memtupindex = NULL;      /* until and unless needed */
 
434
 
 
435
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 
436
 
 
437
        state->currentRun = 0;
 
438
 
 
439
        /* Algorithm D variables will be initialized by inittapes, if needed */
 
440
 
 
441
        state->result_tape = -1;        /* flag that result tape has not been
 
442
                                                                 * formed */
 
443
 
 
444
        return state;
 
445
}
 
446
 
 
447
Tuplesortstate *
 
448
tuplesort_begin_heap(TupleDesc tupDesc,
 
449
                                         int nkeys,
 
450
                                         Oid *sortOperators, AttrNumber *attNums,
 
451
                                         int workMem, bool randomAccess)
 
452
{
 
453
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
 
454
        int                     i;
 
455
 
 
456
        AssertArg(nkeys > 0);
 
457
 
 
458
        state->comparetup = comparetup_heap;
 
459
        state->copytup = copytup_heap;
 
460
        state->writetup = writetup_heap;
 
461
        state->readtup = readtup_heap;
 
462
 
 
463
        state->tupDesc = tupDesc;
 
464
        state->nKeys = nkeys;
 
465
        state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
 
466
        state->sortFnKinds = (SortFunctionKind *)
 
467
                palloc0(nkeys * sizeof(SortFunctionKind));
 
468
 
 
469
        for (i = 0; i < nkeys; i++)
 
470
        {
 
471
                RegProcedure sortFunction;
 
472
 
 
473
                AssertArg(sortOperators[i] != 0);
 
474
                AssertArg(attNums[i] != 0);
 
475
 
 
476
                /* select a function that implements the sort operator */
 
477
                SelectSortFunction(sortOperators[i], &sortFunction,
 
478
                                                   &state->sortFnKinds[i]);
 
479
 
 
480
                /*
 
481
                 * We needn't fill in sk_strategy or sk_subtype since these
 
482
                 * scankeys will never be passed to an index.
 
483
                 */
 
484
                ScanKeyInit(&state->scanKeys[i],
 
485
                                        attNums[i],
 
486
                                        InvalidStrategy,
 
487
                                        sortFunction,
 
488
                                        (Datum) 0);
 
489
        }
 
490
 
 
491
        return state;
 
492
}
 
493
 
 
494
Tuplesortstate *
 
495
tuplesort_begin_index(Relation indexRel,
 
496
                                          bool enforceUnique,
 
497
                                          int workMem, bool randomAccess)
 
498
{
 
499
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
 
500
 
 
501
        state->comparetup = comparetup_index;
 
502
        state->copytup = copytup_index;
 
503
        state->writetup = writetup_index;
 
504
        state->readtup = readtup_index;
 
505
 
 
506
        state->indexRel = indexRel;
 
507
        /* see comments below about btree dependence of this code... */
 
508
        state->indexScanKey = _bt_mkscankey_nodata(indexRel);
 
509
        state->enforceUnique = enforceUnique;
 
510
 
 
511
        return state;
 
512
}
 
513
 
 
514
Tuplesortstate *
 
515
tuplesort_begin_datum(Oid datumType,
 
516
                                          Oid sortOperator,
 
517
                                          int workMem, bool randomAccess)
 
518
{
 
519
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
 
520
        RegProcedure sortFunction;
 
521
        int16           typlen;
 
522
        bool            typbyval;
 
523
 
 
524
        state->comparetup = comparetup_datum;
 
525
        state->copytup = copytup_datum;
 
526
        state->writetup = writetup_datum;
 
527
        state->readtup = readtup_datum;
 
528
 
 
529
        state->datumType = datumType;
 
530
        state->sortOperator = sortOperator;
 
531
 
 
532
        /* select a function that implements the sort operator */
 
533
        SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
 
534
        /* and look up the function */
 
535
        fmgr_info(sortFunction, &state->sortOpFn);
 
536
 
 
537
        /* lookup necessary attributes of the datum type */
 
538
        get_typlenbyval(datumType, &typlen, &typbyval);
 
539
        state->datumTypeLen = typlen;
 
540
        state->datumTypeByVal = typbyval;
 
541
 
 
542
        return state;
 
543
}
 
544
 
 
545
/*
 
546
 * tuplesort_end
 
547
 *
 
548
 *      Release resources and clean up.
 
549
 */
 
550
void
 
551
tuplesort_end(Tuplesortstate *state)
 
552
{
 
553
        int                     i;
 
554
 
 
555
        if (state->tapeset)
 
556
                LogicalTapeSetClose(state->tapeset);
 
557
        if (state->memtuples)
 
558
        {
 
559
                for (i = 0; i < state->memtupcount; i++)
 
560
                        pfree(state->memtuples[i]);
 
561
                pfree(state->memtuples);
 
562
        }
 
563
        if (state->memtupindex)
 
564
                pfree(state->memtupindex);
 
565
 
 
566
        /*
 
567
         * this stuff might better belong in a variant-specific shutdown
 
568
         * routine
 
569
         */
 
570
        if (state->scanKeys)
 
571
                pfree(state->scanKeys);
 
572
        if (state->sortFnKinds)
 
573
                pfree(state->sortFnKinds);
 
574
 
 
575
        pfree(state);
 
576
}
 
577
 
 
578
/*
 
579
 * Accept one tuple while collecting input data for sort.
 
580
 *
 
581
 * Note that the input tuple is always copied; the caller need not save it.
 
582
 */
 
583
void
 
584
tuplesort_puttuple(Tuplesortstate *state, void *tuple)
 
585
{
 
586
        /*
 
587
         * Copy the given tuple into memory we control, and decrease availMem.
 
588
         * Then call the code shared with the Datum case.
 
589
         */
 
590
        tuple = COPYTUP(state, tuple);
 
591
 
 
592
        puttuple_common(state, tuple);
 
593
}
 
594
 
 
595
/*
 * Accept one Datum while collecting input data for sort.
 *
 * If the Datum is pass-by-ref type, the value will be copied.
 */
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
	DatumTuple *tuple;

	/*
	 * Build pseudo-tuple carrying the datum, and decrease availMem.
	 */
	if (isNull || state->datumTypeByVal)
	{
		/* NULLs and by-value datums fit entirely in the fixed-size header */
		tuple = (DatumTuple *) palloc(sizeof(DatumTuple));
		tuple->val = val;
		tuple->isNull = isNull;
	}
	else
	{
		Size		datalen;
		Size		tuplelen;
		char	   *newVal;

		/*
		 * Pass-by-reference, non-null: allocate header plus data in a
		 * single palloc chunk.  MAXALIGN pads the header so the copied
		 * datum that follows it is properly aligned.
		 */
		datalen = datumGetSize(val, false, state->datumTypeLen);
		tuplelen = datalen + MAXALIGN(sizeof(DatumTuple));
		tuple = (DatumTuple *) palloc(tuplelen);
		newVal = ((char *) tuple) + MAXALIGN(sizeof(DatumTuple));
		memcpy(newVal, DatumGetPointer(val), datalen);
		/* point the pseudo-tuple's val at its own trailing copy */
		tuple->val = PointerGetDatum(newVal);
		tuple->isNull = false;
	}

	/* charge the pseudo-tuple's actual chunk size against the sort budget */
	USEMEM(state, GetMemoryChunkSpace(tuple));

	puttuple_common(state, (void *) tuple);
}
 
633
 
 
634
/*
 * Shared code for tuple and datum cases.
 *
 * On entry the tuple is already a sort-owned copy (made by COPYTUP or
 * tuplesort_putdatum) and its space has been charged to availMem; this
 * routine takes ownership of it.
 */
static void
puttuple_common(Tuplesortstate *state, void *tuple)
{
	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * Save the copied tuple into the unsorted array.
			 */
			if (state->memtupcount >= state->memtupsize)
			{
				/* Grow the unsorted array as needed. */
				FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
				state->memtupsize *= 2;
				state->memtuples = (void **)
					repalloc(state->memtuples,
							 state->memtupsize * sizeof(void *));
				USEMEM(state, GetMemoryChunkSpace(state->memtuples));
			}
			state->memtuples[state->memtupcount++] = tuple;

			/*
			 * Done if we still fit in available memory.
			 */
			if (!LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.
			 */
			inittapes(state);

			/*
			 * Dump tuples until we are back under the limit.
			 */
			dumptuples(state, false);
			break;
		case TSS_BUILDRUNS:

			/*
			 * Insert the copied tuple into the heap, with run number
			 * currentRun if it can go into the current run, else run
			 * number currentRun+1.  The tuple can go into the current run
			 * if it is >= the first not-yet-output tuple.  (Actually, it
			 * could go into the current run if it is >= the most recently
			 * output tuple ... but that would require keeping around the
			 * tuple we last output, and it's simplest to let writetup
			 * free each tuple as soon as it's written.)
			 *
			 * Note there will always be at least one tuple in the heap at
			 * this point; see dumptuples.
			 */
			Assert(state->memtupcount > 0);
			if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
				tuplesort_heap_insert(state, tuple, state->currentRun, true);
			else
				tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);

			/*
			 * If we are over the memory limit, dump tuples till we're
			 * under.
			 */
			dumptuples(state, false);
			break;
		default:
			/* TSS_SORTEDINMEM etc.: too late to add input */
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}
 
707
 
 
708
/*
 * All tuples have been provided; finish the sort.
 *
 * NOTE(review): the in-memory path passes state to the comparator through
 * the file-level variable qsort_tuplesortstate, so this function is not
 * re-entrant across concurrently active sorts within one backend.
 */
void
tuplesort_performsort(Tuplesortstate *state)
{
	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * We were able to accumulate all the tuples within the
			 * allowed amount of memory.  Just qsort 'em and we're done.
			 */
			if (state->memtupcount > 1)
			{
				qsort_tuplesortstate = state;
				qsort((void *) state->memtuples, state->memtupcount,
					  sizeof(void *), qsort_comparetup);
			}
			/* reset scan position for the fetch phase */
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			state->status = TSS_SORTEDINMEM;
			break;
		case TSS_BUILDRUNS:

			/*
			 * Finish tape-based sort.      First, flush all tuples remaining
			 * in memory out to tape; then merge until we have a single
			 * remaining run (or, if !randomAccess, one run per tape).
			 * Note that mergeruns sets the correct state->status.
			 */
			dumptuples(state, true);
			mergeruns(state);
			state->eof_reached = false;
			state->markpos_block = 0L;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}
 
754
 
 
755
/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.      If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 *
 * Backward fetches are only legal when the sort was created with
 * randomAccess (asserted below).  On tape, each tuple is stored with a
 * length word both before and after it, which is what makes the backward
 * backspace arithmetic below possible.
 */
void *
tuplesort_gettuple(Tuplesortstate *state, bool forward,
				   bool *should_free)
{
	unsigned int tuplen;
	void	   *tup;

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			/* tuples live in memtuples[]; caller must NOT free them */
			Assert(forward || state->randomAccess);
			*should_free = false;
			if (forward)
			{
				if (state->current < state->memtupcount)
					return state->memtuples[state->current++];
				state->eof_reached = true;
				return NULL;
			}
			else
			{
				if (state->current <= 0)
					return NULL;

				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else - tuple before last returned.
				 */
				if (state->eof_reached)
					state->eof_reached = false;
				else
				{
					state->current--;	/* last returned tuple */
					if (state->current <= 0)
						return NULL;
				}
				return state->memtuples[state->current - 1];
			}
			break;

		case TSS_SORTEDONTAPE:
			/* READTUP pallocs a fresh copy; caller owns and must free it */
			Assert(forward || state->randomAccess);
			*should_free = true;
			if (forward)
			{
				if (state->eof_reached)
					return NULL;
				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
				{
					tup = READTUP(state, state->result_tape, tuplen);
					return tup;
				}
				else
				{
					state->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
			 * if all tuples are fetched already then we return last tuple,
			 * else - tuple before last returned.
			 */
			if (state->eof_reached)
			{
				/*
				 * Seek position is pointing just past the zero tuplen at
				 * the end of file; back up to fetch last tuple's ending
				 * length word.  If seek fails we must have a completely
				 * empty file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  2 * sizeof(unsigned int)))
					return NULL;
				state->eof_reached = false;
			}
			else
			{
				/*
				 * Back up and fetch previously-returned tuple's ending
				 * length word.  If seek fails, assume we are at start of
				 * file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  sizeof(unsigned int)))
					return NULL;
				tuplen = getlen(state, state->result_tape, false);

				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
									  tuplen + 2 * sizeof(unsigned int)))
				{
					/*
					 * If that fails, presumably the prev tuple is the
					 * first in the file.  Back up so that it becomes next
					 * to read in forward direction (not obviously right,
					 * but that is what in-memory case does).
					 */
					if (!LogicalTapeBackspace(state->tapeset,
											  state->result_tape,
										  tuplen + sizeof(unsigned int)))
						elog(ERROR, "bogus tuple length in backward scan");
					return NULL;
				}
			}

			tuplen = getlen(state, state->result_tape, false);

			/*
			 * Now we have the length of the prior tuple, back up and read
			 * it. Note: READTUP expects we are positioned after the
			 * initial length word of the tuple, so back up to that point.
			 */
			if (!LogicalTapeBackspace(state->tapeset,
									  state->result_tape,
									  tuplen))
				elog(ERROR, "bogus tuple length in backward scan");
			tup = READTUP(state, state->result_tape, tuplen);
			return tup;

		case TSS_FINALMERGE:
			Assert(forward);
			*should_free = true;

			/*
			 * This code should match the inner loop of mergeonerun().
			 */
			if (state->memtupcount > 0)
			{
				int			srcTape = state->memtupindex[0];
				Size		tuplen;	/* NOTE(review): shadows the outer
									 * unsigned-int tuplen; harmless here
									 * but worth renaming someday */
				int			tupIndex;
				void	   *newtup;

				tup = state->memtuples[0];
				/* returned tuple is no longer counted in our memory space */
				tuplen = GetMemoryChunkSpace(tup);
				state->availMem += tuplen;
				state->mergeavailmem[srcTape] += tuplen;
				tuplesort_heap_siftup(state, false);
				if ((tupIndex = state->mergenext[srcTape]) == 0)
				{
					/*
					 * out of preloaded data on this tape, try to read
					 * more
					 */
					mergepreread(state);

					/*
					 * if still no data, we've reached end of run on this
					 * tape
					 */
					if ((tupIndex = state->mergenext[srcTape]) == 0)
						return tup;
				}
				/* pull next preread tuple from list, insert in heap */
				newtup = state->memtuples[tupIndex];
				state->mergenext[srcTape] = state->memtupindex[tupIndex];
				if (state->mergenext[srcTape] == 0)
					state->mergelast[srcTape] = 0;
				state->memtupindex[tupIndex] = state->mergefreelist;
				state->mergefreelist = tupIndex;
				tuplesort_heap_insert(state, newtup, srcTape, false);
				return tup;
			}
			return NULL;

		default:
			elog(ERROR, "invalid tuplesort state");
			return NULL;		/* keep compiler quiet */
	}
}
 
939
 
 
940
/*
 
941
 * Fetch the next Datum in either forward or back direction.
 
942
 * Returns FALSE if no more datums.
 
943
 *
 
944
 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
 
945
 * and is now owned by the caller.
 
946
 */
 
947
bool
 
948
tuplesort_getdatum(Tuplesortstate *state, bool forward,
 
949
                                   Datum *val, bool *isNull)
 
950
{
 
951
        DatumTuple *tuple;
 
952
        bool            should_free;
 
953
 
 
954
        tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);
 
955
 
 
956
        if (tuple == NULL)
 
957
                return false;
 
958
 
 
959
        if (tuple->isNull || state->datumTypeByVal)
 
960
        {
 
961
                *val = tuple->val;
 
962
                *isNull = tuple->isNull;
 
963
        }
 
964
        else
 
965
        {
 
966
                *val = datumCopy(tuple->val, false, state->datumTypeLen);
 
967
                *isNull = false;
 
968
        }
 
969
 
 
970
        if (should_free)
 
971
                pfree(tuple);
 
972
 
 
973
        return true;
 
974
}
 
975
 
 
976
 
 
977
/*
 * inittapes - initialize for tape sorting.
 *
 * This is called only if we have found we don't have room to sort in memory.
 */
static void
inittapes(Tuplesortstate *state)
{
	int			ntuples,
				j;

	state->tapeset = LogicalTapeSetCreate(MAXTAPES);

	/*
	 * Allocate the memtupindex array, same size as memtuples.
	 */
	state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));

	USEMEM(state, GetMemoryChunkSpace(state->memtupindex));

	/*
	 * Convert the unsorted contents of memtuples[] into a heap. Each
	 * tuple is marked as belonging to run number zero.
	 *
	 * NOTE: we pass false for checkIndex since there's no point in comparing
	 * indexes in this step, even though we do intend the indexes to be
	 * part of the sort key...
	 */
	ntuples = state->memtupcount;
	state->memtupcount = 0;		/* make the heap empty */
	for (j = 0; j < ntuples; j++)
		tuplesort_heap_insert(state, state->memtuples[j], 0, false);
	Assert(state->memtupcount == ntuples);

	state->currentRun = 0;

	/*
	 * Initialize variables of Algorithm D (step D1).
	 * Every tape starts with one fibonacci "slot" and one dummy run;
	 * the last tape (TAPERANGE) is the initial output tape and gets none.
	 */
	for (j = 0; j < MAXTAPES; j++)
	{
		state->tp_fib[j] = 1;
		state->tp_runs[j] = 0;
		state->tp_dummy[j] = 1;
		state->tp_tapenum[j] = j;
	}
	state->tp_fib[TAPERANGE] = 0;
	state->tp_dummy[TAPERANGE] = 0;

	state->Level = 1;
	state->destTape = 0;

	state->status = TSS_BUILDRUNS;
}
 
1031
 
 
1032
/*
 * selectnewtape -- select new tape for new initial run.
 *
 * This is called after finishing a run when we know another run
 * must be started.  This implements steps D3, D4 of Algorithm D.
 */
static void
selectnewtape(Tuplesortstate *state)
{
	int			j;
	int			a;

	/* Step D3: advance j (destTape) */
	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
	{
		state->destTape++;
		return;
	}
	if (state->tp_dummy[state->destTape] != 0)
	{
		/* still dummy runs to fill at the front; restart from tape 0 */
		state->destTape = 0;
		return;
	}

	/*
	 * Step D4: increase level.  Recompute the fibonacci targets and dummy
	 * run counts for the new level.  Note tp_dummy[j] must be computed
	 * BEFORE tp_fib[j] is overwritten, since it uses the old value.
	 */
	state->Level++;
	a = state->tp_fib[0];
	for (j = 0; j < TAPERANGE; j++)
	{
		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
		state->tp_fib[j] = a + state->tp_fib[j + 1];
	}
	state->destTape = 0;
}
 
1066
 
 
1067
/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Algorithm D.  All input data has
 * already been written to initial runs on tape (see dumptuples).
 *
 * On exit, state->status is TSS_SORTEDONTAPE (with result_tape set) or
 * TSS_FINALMERGE (final merge deferred until tuples are fetched).
 */
static void
mergeruns(Tuplesortstate *state)
{
	int			tapenum,
				svTape,
				svRuns,
				svDummy;

	Assert(state->status == TSS_BUILDRUNS);
	Assert(state->memtupcount == 0);

	/*
	 * If we produced only one initial run (quite likely if the total data
	 * volume is between 1X and 2X workMem), we can just use that tape as
	 * the finished output, rather than doing a useless merge.
	 */
	if (state->currentRun == 1)
	{
		state->result_tape = state->tp_tapenum[state->destTape];
		/* must freeze and rewind the finished output tape */
		LogicalTapeFreeze(state->tapeset, state->result_tape);
		state->status = TSS_SORTEDONTAPE;
		return;
	}

	/* End of step D2: rewind all output tapes to prepare for merging */
	for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
		LogicalTapeRewind(state->tapeset, tapenum, false);

	for (;;)
	{
		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
		while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
		{
			bool		allDummy = true;
			bool		allOneRun = true;

			for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
			{
				if (state->tp_dummy[tapenum] == 0)
					allDummy = false;
				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
					allOneRun = false;
			}

			/*
			 * If we don't have to produce a materialized sorted tape,
			 * quit as soon as we're down to one real/dummy run per tape.
			 */
			if (!state->randomAccess && allOneRun)
			{
				Assert(!allDummy);
				/* Initialize for the final merge pass */
				beginmerge(state);
				state->status = TSS_FINALMERGE;
				return;
			}
			if (allDummy)
			{
				/* merging dummy runs: just adjust the counts, no I/O */
				state->tp_dummy[TAPERANGE]++;
				for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
					state->tp_dummy[tapenum]--;
			}
			else
				mergeonerun(state);
		}
		/* Step D6: decrease level */
		if (--state->Level == 0)
			break;
		/* rewind output tape T to use as new input */
		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
						  false);
		/* rewind used-up input tape P, and prepare it for write pass */
		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
						  true);
		state->tp_runs[TAPERANGE - 1] = 0;

		/*
		 * reassign tape units per step D6; note we no longer care about
		 * A[]
		 */
		svTape = state->tp_tapenum[TAPERANGE];
		svDummy = state->tp_dummy[TAPERANGE];
		svRuns = state->tp_runs[TAPERANGE];
		for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
		{
			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
		}
		state->tp_tapenum[0] = svTape;
		state->tp_dummy[0] = svDummy;
		state->tp_runs[0] = svRuns;
	}

	/*
	 * Done.  Knuth says that the result is on TAPE[1], but since we
	 * exited the loop without performing the last iteration of step D6,
	 * we have not rearranged the tape unit assignment, and therefore the
	 * result is on TAPE[T].  We need to do it this way so that we can
	 * freeze the final output tape while rewinding it.  The last
	 * iteration of step D6 would be a waste of cycles anyway...
	 */
	state->result_tape = state->tp_tapenum[TAPERANGE];
	LogicalTapeFreeze(state->tapeset, state->result_tape);
	state->status = TSS_SORTEDONTAPE;
}
 
1180
 
 
1181
/*
 * Merge one run from each input tape, except ones with dummy runs.
 *
 * This is the inner loop of Algorithm D step D5.  We know that the
 * output tape is TAPE[T].
 */
static void
mergeonerun(Tuplesortstate *state)
{
	int			destTape = state->tp_tapenum[TAPERANGE];
	int			srcTape;
	int			tupIndex;
	void	   *tup;
	long		priorAvail,
				spaceFreed;

	/*
	 * Start the merge by loading one tuple from each active source tape
	 * into the heap.  We can also decrease the input run/dummy run
	 * counts.
	 */
	beginmerge(state);

	/*
	 * Execute merge by repeatedly extracting lowest tuple in heap,
	 * writing it out, and replacing it with next tuple from same tape (if
	 * there is another one).
	 */
	while (state->memtupcount > 0)
	{
		CHECK_FOR_INTERRUPTS();
		/* write the tuple to destTape */
		priorAvail = state->availMem;
		srcTape = state->memtupindex[0];
		WRITETUP(state, destTape, state->memtuples[0]);
		/* writetup adjusted total free space, now fix per-tape space */
		spaceFreed = state->availMem - priorAvail;
		state->mergeavailmem[srcTape] += spaceFreed;
		/* compact the heap */
		tuplesort_heap_siftup(state, false);
		if ((tupIndex = state->mergenext[srcTape]) == 0)
		{
			/* out of preloaded data on this tape, try to read more */
			mergepreread(state);
			/* if still no data, we've reached end of run on this tape */
			if ((tupIndex = state->mergenext[srcTape]) == 0)
				continue;
		}
		/*
		 * pull next preread tuple from list, insert in heap; the vacated
		 * memtuples slot goes onto the mergefreelist for reuse
		 */
		tup = state->memtuples[tupIndex];
		state->mergenext[srcTape] = state->memtupindex[tupIndex];
		if (state->mergenext[srcTape] == 0)
			state->mergelast[srcTape] = 0;
		state->memtupindex[tupIndex] = state->mergefreelist;
		state->mergefreelist = tupIndex;
		tuplesort_heap_insert(state, tup, srcTape, false);
	}

	/*
	 * When the heap empties, we're done.  Write an end-of-run marker on
	 * the output tape, and increment its count of real runs.
	 */
	markrunend(state, destTape);
	state->tp_runs[TAPERANGE]++;
}
 
1246
 
 
1247
/*
 
1248
 * beginmerge - initialize for a merge pass
 
1249
 *
 
1250
 * We decrease the counts of real and dummy runs for each tape, and mark
 
1251
 * which tapes contain active input runs in mergeactive[].      Then, load
 
1252
 * as many tuples as we can from each active input tape, and finally
 
1253
 * fill the merge heap with the first tuple from each active tape.
 
1254
 */
 
1255
static void
beginmerge(Tuplesortstate *state)
{
	int			activeTapes;
	int			tapenum;
	int			srcTape;

	/* Heap should be empty here */
	Assert(state->memtupcount == 0);

	/* Clear merge-pass state variables */
	memset(state->mergeactive, 0, sizeof(state->mergeactive));
	memset(state->mergenext, 0, sizeof(state->mergenext));
	memset(state->mergelast, 0, sizeof(state->mergelast));
	memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
	state->mergefreelist = 0;	/* nothing in the freelist */
	state->mergefirstfree = MAXTAPES;	/* first slot available for
										 * preread */

	/*
	 * Adjust run counts and mark the active tapes.  Per Knuth's Algorithm
	 * D, a tape with a positive dummy-run count contributes a dummy (empty)
	 * run to this merge and is not read; otherwise it must supply a real
	 * run, so decrement its real-run count and flag it active.
	 */
	activeTapes = 0;
	for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
	{
		if (state->tp_dummy[tapenum] > 0)
			state->tp_dummy[tapenum]--;
		else
		{
			Assert(state->tp_runs[tapenum] > 0);
			state->tp_runs[tapenum]--;
			srcTape = state->tp_tapenum[tapenum];
			state->mergeactive[srcTape] = true;
			activeTapes++;
		}
	}

	/*
	 * Initialize space allocation to let each active input tape have an
	 * equal share of preread space.
	 */
	Assert(activeTapes > 0);
	state->spacePerTape = state->availMem / activeTapes;
	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		if (state->mergeactive[srcTape])
			state->mergeavailmem[srcTape] = state->spacePerTape;
	}

	/*
	 * Preread as many tuples as possible (and at least one) from each
	 * active tape
	 */
	mergepreread(state);

	/* Load the merge heap with the first tuple from each input tape */
	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		int			tupIndex = state->mergenext[srcTape];
		void	   *tup;

		if (tupIndex)
		{
			/*
			 * Unlink the head of this tape's preread list, return its
			 * memtuples[] slot to the freelist, and push the tuple into
			 * the merge heap tagged with its source tape number.
			 */
			tup = state->memtuples[tupIndex];
			state->mergenext[srcTape] = state->memtupindex[tupIndex];
			if (state->mergenext[srcTape] == 0)
				state->mergelast[srcTape] = 0;
			state->memtupindex[tupIndex] = state->mergefreelist;
			state->mergefreelist = tupIndex;
			tuplesort_heap_insert(state, tup, srcTape, false);
		}
	}
}
 
1326
 
 
1327
/*
 
1328
 * mergepreread - load tuples from merge input tapes
 
1329
 *
 
1330
 * This routine exists to improve sequentiality of reads during a merge pass,
 
1331
 * as explained in the header comments of this file.  Load tuples from each
 
1332
 * active source tape until the tape's run is exhausted or it has used up
 
1333
 * its fair share of available memory.  In any case, we guarantee that there
 
1334
 * is at one preread tuple available from each unexhausted input tape.
 
1335
 */
 
1336
static void
mergepreread(Tuplesortstate *state)
{
	int			srcTape;
	unsigned int tuplen;
	void	   *tup;
	int			tupIndex;
	long		priorAvail,
				spaceUsed;

	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		/* Only tapes still supplying an unexhausted run are read */
		if (!state->mergeactive[srcTape])
			continue;

		/*
		 * Skip reading from any tape that still has at least half of its
		 * target memory filled with tuples (threshold fraction may need
		 * adjustment?).  This avoids reading just a few tuples when the
		 * incoming runs are not being consumed evenly.
		 */
		if (state->mergenext[srcTape] != 0 &&
			state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
			continue;

		/*
		 * Read tuples from this tape until it has used up its free
		 * memory, but ensure that we have at least one.
		 *
		 * Trick: temporarily substitute this tape's private memory budget
		 * for the global availMem, so that LACKMEM() and the USEMEM calls
		 * inside READTUP charge against the per-tape allowance.  The
		 * global count is reconciled after the loop.
		 */
		priorAvail = state->availMem;
		state->availMem = state->mergeavailmem[srcTape];
		while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
		{
			/* read next tuple, if any; zero length word = end of run */
			if ((tuplen = getlen(state, srcTape, true)) == 0)
			{
				state->mergeactive[srcTape] = false;
				break;
			}
			tup = READTUP(state, srcTape, tuplen);
			/* find or make a free slot in memtuples[] for it */
			tupIndex = state->mergefreelist;
			if (tupIndex)
				state->mergefreelist = state->memtupindex[tupIndex];
			else
			{
				tupIndex = state->mergefirstfree++;
				/* Might need to enlarge arrays! */
				if (tupIndex >= state->memtupsize)
				{
					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
					FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
					state->memtupsize *= 2;
					state->memtuples = (void **)
						repalloc(state->memtuples,
								 state->memtupsize * sizeof(void *));
					state->memtupindex = (int *)
						repalloc(state->memtupindex,
								 state->memtupsize * sizeof(int));
					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
					USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
				}
			}
			/* store tuple, append to singly-linked list for its tape */
			state->memtuples[tupIndex] = tup;
			state->memtupindex[tupIndex] = 0;
			if (state->mergelast[srcTape])
				state->memtupindex[state->mergelast[srcTape]] = tupIndex;
			else
				state->mergenext[srcTape] = tupIndex;
			state->mergelast[srcTape] = tupIndex;
		}
		/* update per-tape and global availmem counts */
		spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
		state->mergeavailmem[srcTape] = state->availMem;
		state->availMem = priorAvail - spaceUsed;
	}
}
 
1414
 
 
1415
/*
 
1416
 * dumptuples - remove tuples from heap and write to tape
 
1417
 *
 
1418
 * This is used during initial-run building, but not during merging.
 
1419
 *
 
1420
 * When alltuples = false, dump only enough tuples to get under the
 
1421
 * availMem limit (and leave at least one tuple in the heap in any case,
 
1422
 * since puttuple assumes it always has a tuple to compare to).
 
1423
 *
 
1424
 * When alltuples = true, dump everything currently in memory.
 
1425
 * (This case is only used at end of input data.)
 
1426
 *
 
1427
 * If we empty the heap, close out the current run and return (this should
 
1428
 * only happen at end of input data).  If we see that the tuple run number
 
1429
 * at the top of the heap has changed, start a new run.
 
1430
 */
 
1431
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
	while (alltuples ||
		   (LACKMEM(state) && state->memtupcount > 1))
	{
		/*
		 * Dump the heap's frontmost entry, and sift up to remove it from
		 * the heap.
		 */
		Assert(state->memtupcount > 0);
		WRITETUP(state, state->tp_tapenum[state->destTape],
				 state->memtuples[0]);
		tuplesort_heap_siftup(state, true);

		/*
		 * If the heap is empty *or* top run number has changed, we've
		 * finished the current run.  (During run building, memtupindex[]
		 * holds each tuple's run number, so the heap top carries the
		 * smallest pending run number.)
		 */
		if (state->memtupcount == 0 ||
			state->currentRun != state->memtupindex[0])
		{
			markrunend(state, state->tp_tapenum[state->destTape]);
			state->currentRun++;
			state->tp_runs[state->destTape]++;
			state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

			/*
			 * Done if heap is empty, else prepare for new run.
			 */
			if (state->memtupcount == 0)
				break;
			Assert(state->currentRun == state->memtupindex[0]);
			selectnewtape(state);
		}
	}
}
 
1468
 
 
1469
/*
 
1470
 * tuplesort_rescan             - rewind and replay the scan
 
1471
 */
 
1472
void
 
1473
tuplesort_rescan(Tuplesortstate *state)
 
1474
{
 
1475
        Assert(state->randomAccess);
 
1476
 
 
1477
        switch (state->status)
 
1478
        {
 
1479
                case TSS_SORTEDINMEM:
 
1480
                        state->current = 0;
 
1481
                        state->eof_reached = false;
 
1482
                        state->markpos_offset = 0;
 
1483
                        state->markpos_eof = false;
 
1484
                        break;
 
1485
                case TSS_SORTEDONTAPE:
 
1486
                        LogicalTapeRewind(state->tapeset,
 
1487
                                                          state->result_tape,
 
1488
                                                          false);
 
1489
                        state->eof_reached = false;
 
1490
                        state->markpos_block = 0L;
 
1491
                        state->markpos_offset = 0;
 
1492
                        state->markpos_eof = false;
 
1493
                        break;
 
1494
                default:
 
1495
                        elog(ERROR, "invalid tuplesort state");
 
1496
                        break;
 
1497
        }
 
1498
}
 
1499
 
 
1500
/*
 
1501
 * tuplesort_markpos    - saves current position in the merged sort file
 
1502
 */
 
1503
void
tuplesort_markpos(Tuplesortstate *state)
{
	/* Mark/restore is only supported for random-access sorts */
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			/* In-memory: the position is just the current array index */
			state->markpos_offset = state->current;
			state->markpos_eof = state->eof_reached;
			break;
		case TSS_SORTEDONTAPE:
			/* On-tape: record the tape's block/offset read position */
			LogicalTapeTell(state->tapeset,
							state->result_tape,
							&state->markpos_block,
							&state->markpos_offset);
			state->markpos_eof = state->eof_reached;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}
 
1526
 
 
1527
/*
 
1528
 * tuplesort_restorepos - restores current position in merged sort file to
 
1529
 *                                                last saved position
 
1530
 */
 
1531
void
tuplesort_restorepos(Tuplesortstate *state)
{
	/* Mark/restore is only supported for random-access sorts */
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			/* In-memory: restore the saved array index */
			state->current = state->markpos_offset;
			state->eof_reached = state->markpos_eof;
			break;
		case TSS_SORTEDONTAPE:
			/* On-tape: seek the result tape back to the saved position */
			if (!LogicalTapeSeek(state->tapeset,
								 state->result_tape,
								 state->markpos_block,
								 state->markpos_offset))
				elog(ERROR, "tuplesort_restorepos failed");
			state->eof_reached = state->markpos_eof;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}
 
1555
 
 
1556
 
 
1557
/*
 
1558
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 
1559
 *
 
1560
 * The heap lives in state->memtuples[], with parallel data storage
 
1561
 * for indexes in state->memtupindex[].  If checkIndex is true, use
 
1562
 * the tuple index as the front of the sort key; otherwise, no.
 
1563
 */
 
1564
 
 
1565
/*
 * HEAPCOMPARE: three-way comparison of two heap entries.  When checkIndex
 * is true (run building), the tuple/run index is the leading sort key, so
 * tuples of the current run drain before any tuple assigned to the next
 * run; ties fall through to the real tuple comparison.  NB: relies on a
 * local variable named "checkIndex" (and "state") in the caller.
 */
#define HEAPCOMPARE(tup1,index1,tup2,index2) \
	(checkIndex && (index1 != index2) ? index1 - index2 : \
	 COMPARETUP(state, tup1, tup2))
 
1568
 
 
1569
/*
 
1570
 * Insert a new tuple into an empty or existing heap, maintaining the
 
1571
 * heap invariant.
 
1572
 */
 
1573
static void
tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
					  int tupleindex, bool checkIndex)
{
	void	  **memtuples;
	int		   *memtupindex;
	int			j;

	/*
	 * Make sure memtuples[] can handle another entry.  Both parallel
	 * arrays are doubled together, with the memory accounting adjusted
	 * around the repallocs.
	 */
	if (state->memtupcount >= state->memtupsize)
	{
		FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
		FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
		state->memtupsize *= 2;
		state->memtuples = (void **)
			repalloc(state->memtuples,
					 state->memtupsize * sizeof(void *));
		state->memtupindex = (int *)
			repalloc(state->memtupindex,
					 state->memtupsize * sizeof(int));
		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
		USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
	}
	memtuples = state->memtuples;
	memtupindex = state->memtupindex;

	/*
	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
	 * is using 1-based array indexes, not 0-based.  Parents greater than
	 * the new entry are moved down until the correct hole is found.
	 */
	j = state->memtupcount++;
	while (j > 0)
	{
		int			i = (j - 1) >> 1;	/* parent of slot j */

		if (HEAPCOMPARE(tuple, tupleindex,
						memtuples[i], memtupindex[i]) >= 0)
			break;
		memtuples[j] = memtuples[i];
		memtupindex[j] = memtupindex[i];
		j = i;
	}
	memtuples[j] = tuple;
	memtupindex[j] = tupleindex;
}
 
1620
 
 
1621
/*
 
1622
 * The tuple at state->memtuples[0] has been removed from the heap.
 
1623
 * Decrement memtupcount, and sift up to maintain the heap invariant.
 
1624
 */
 
1625
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
	void	  **memtuples = state->memtuples;
	int		   *memtupindex = state->memtupindex;
	void	   *tuple;
	int			tupindex,
				i,
				n;

	if (--state->memtupcount <= 0)
		return;					/* heap is now empty; nothing to fix */
	n = state->memtupcount;
	tuple = memtuples[n];		/* tuple that must be reinserted */
	tupindex = memtupindex[n];
	i = 0;						/* i is where the "hole" is */
	for (;;)
	{
		int			j = 2 * i + 1;	/* left child of slot i */

		if (j >= n)
			break;
		/* pick the smaller of the two children to compare against */
		if (j + 1 < n &&
			HEAPCOMPARE(memtuples[j], memtupindex[j],
						memtuples[j + 1], memtupindex[j + 1]) > 0)
			j++;
		if (HEAPCOMPARE(tuple, tupindex,
						memtuples[j], memtupindex[j]) <= 0)
			break;
		/* move the smaller child up into the hole and descend */
		memtuples[i] = memtuples[j];
		memtupindex[i] = memtupindex[j];
		i = j;
	}
	memtuples[i] = tuple;
	memtupindex[i] = tupindex;
}
 
1661
 
 
1662
 
 
1663
/*
 
1664
 * Tape interface routines
 
1665
 */
 
1666
 
 
1667
static unsigned int
 
1668
getlen(Tuplesortstate *state, int tapenum, bool eofOK)
 
1669
{
 
1670
        unsigned int len;
 
1671
 
 
1672
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
 
1673
                                                sizeof(len)) != sizeof(len))
 
1674
                elog(ERROR, "unexpected end of tape");
 
1675
        if (len == 0 && !eofOK)
 
1676
                elog(ERROR, "unexpected end of data");
 
1677
        return len;
 
1678
}
 
1679
 
 
1680
static void
 
1681
markrunend(Tuplesortstate *state, int tapenum)
 
1682
{
 
1683
        unsigned int len = 0;
 
1684
 
 
1685
        LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
 
1686
}
 
1687
 
 
1688
 
 
1689
/*
 
1690
 * qsort interface
 
1691
 */
 
1692
 
 
1693
static int
 
1694
qsort_comparetup(const void *a, const void *b)
 
1695
{
 
1696
        /* The passed pointers are pointers to void * ... */
 
1697
 
 
1698
        return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);
 
1699
}
 
1700
 
 
1701
 
 
1702
/*
 
1703
 * This routine selects an appropriate sorting function to implement
 
1704
 * a sort operator as efficiently as possible.  The straightforward
 
1705
 * method is to use the operator's implementation proc --- ie, "<"
 
1706
 * comparison.  However, that way often requires two calls of the function
 
1707
 * per comparison.      If we can find a btree three-way comparator function
 
1708
 * associated with the operator, we can use it to do the comparisons
 
1709
 * more efficiently.  We also support the possibility that the operator
 
1710
 * is ">" (descending sort), in which case we have to reverse the output
 
1711
 * of the btree comparator.
 
1712
 *
 
1713
 * Possibly this should live somewhere else (backend/catalog/, maybe?).
 
1714
 */
 
1715
void
SelectSortFunction(Oid sortOperator,
				   RegProcedure *sortFunction,
				   SortFunctionKind *kind)
{
	CatCList   *catlist;
	int			i;
	HeapTuple	tuple;
	Form_pg_operator optup;
	Oid			opclass = InvalidOid;

	/*
	 * Search pg_amop to see if the target operator is registered as the
	 * "<" or ">" operator of any btree opclass.  It's possible that it
	 * might be registered both ways (eg, if someone were to build a
	 * "reverse sort" opclass for some reason); prefer the "<" case if so.
	 * If the operator is registered the same way in multiple opclasses,
	 * assume we can use the associated comparator function from any one.
	 */
	catlist = SearchSysCacheList(AMOPOPID, 1,
								 ObjectIdGetDatum(sortOperator),
								 0, 0, 0);

	for (i = 0; i < catlist->n_members; i++)
	{
		Form_pg_amop aform;

		tuple = &catlist->members[i]->tuple;
		aform = (Form_pg_amop) GETSTRUCT(tuple);

		/* only btree opclasses carry a usable 3-way comparator */
		if (!opclass_is_btree(aform->amopclaid))
			continue;
		/* must be of default subtype, too */
		if (aform->amopsubtype != InvalidOid)
			continue;

		if (aform->amopstrategy == BTLessStrategyNumber)
		{
			opclass = aform->amopclaid;
			*kind = SORTFUNC_CMP;
			break;				/* done looking */
		}
		else if (aform->amopstrategy == BTGreaterStrategyNumber)
		{
			opclass = aform->amopclaid;
			*kind = SORTFUNC_REVCMP;
			/* keep scanning in hopes of finding a BTLess entry */
		}
	}

	ReleaseSysCacheList(catlist);

	if (OidIsValid(opclass))
	{
		/* Found a suitable opclass, get its default comparator function */
		*sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
		Assert(RegProcedureIsValid(*sortFunction));
		return;
	}

	/*
	 * Can't find a comparator, so use the operator as-is.  Decide whether
	 * it is forward or reverse sort by looking at its name (grotty, but
	 * this only matters for deciding which end NULLs should get sorted
	 * to).  XXX possibly better idea: see whether its selectivity
	 * function is scalargtcmp?
	 */
	tuple = SearchSysCache(OPEROID,
						   ObjectIdGetDatum(sortOperator),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for operator %u", sortOperator);
	optup = (Form_pg_operator) GETSTRUCT(tuple);
	if (strcmp(NameStr(optup->oprname), ">") == 0)
		*kind = SORTFUNC_REVLT;
	else
		*kind = SORTFUNC_LT;
	*sortFunction = optup->oprcode;
	ReleaseSysCache(tuple);

	Assert(RegProcedureIsValid(*sortFunction));
}
 
1797
 
 
1798
/*
 
1799
 * Inline-able copy of FunctionCall2() to save some cycles in sorting.
 
1800
 */
 
1801
static inline Datum
 
1802
myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2)
 
1803
{
 
1804
        FunctionCallInfoData fcinfo;
 
1805
        Datum           result;
 
1806
 
 
1807
        fcinfo.flinfo = flinfo;
 
1808
        fcinfo.context = NULL;
 
1809
        fcinfo.resultinfo = NULL;
 
1810
        fcinfo.isnull = false;
 
1811
        fcinfo.nargs = 2;
 
1812
 
 
1813
        fcinfo.arg[0] = arg1;
 
1814
        fcinfo.arg[1] = arg2;
 
1815
        fcinfo.argnull[0] = false;
 
1816
        fcinfo.argnull[1] = false;
 
1817
 
 
1818
        result = FunctionCallInvoke(&fcinfo);
 
1819
 
 
1820
        /* Check for null result, since caller is clearly not expecting one */
 
1821
        if (fcinfo.isnull)
 
1822
                elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
 
1823
 
 
1824
        return result;
 
1825
}
 
1826
 
 
1827
/*
 
1828
 * Apply a sort function (by now converted to fmgr lookup form)
 
1829
 * and return a 3-way comparison result.  This takes care of handling
 
1830
 * NULLs and sort ordering direction properly.
 
1831
 */
 
1832
static inline int32
inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
						Datum datum1, bool isNull1,
						Datum datum2, bool isNull2)
{
	switch (kind)
	{
		case SORTFUNC_LT:
			/* "<" operator: two boolean calls give a 3-way result */
			if (isNull1)
			{
				if (isNull2)
					return 0;
				return 1;		/* NULL sorts after non-NULL */
			}
			if (isNull2)
				return -1;
			if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
				return -1;		/* a < b */
			if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
				return 1;		/* a > b */
			return 0;

		case SORTFUNC_REVLT:
			/* We reverse the ordering of NULLs, but not the operator */
			if (isNull1)
			{
				if (isNull2)
					return 0;
				return -1;		/* NULL sorts before non-NULL */
			}
			if (isNull2)
				return 1;
			if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
				return -1;		/* a < b */
			if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
				return 1;		/* a > b */
			return 0;

		case SORTFUNC_CMP:
			/* btree comparator: one call yields the 3-way result */
			if (isNull1)
			{
				if (isNull2)
					return 0;
				return 1;		/* NULL sorts after non-NULL */
			}
			if (isNull2)
				return -1;
			return DatumGetInt32(myFunctionCall2(sortFunction,
												 datum1, datum2));

		case SORTFUNC_REVCMP:
			/* reversed comparator: negate result, flip NULL ordering */
			if (isNull1)
			{
				if (isNull2)
					return 0;
				return -1;		/* NULL sorts before non-NULL */
			}
			if (isNull2)
				return 1;
			return -DatumGetInt32(myFunctionCall2(sortFunction,
												  datum1, datum2));

		default:
			elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
			return 0;			/* can't get here, but keep compiler quiet */
	}
}
 
1899
 
 
1900
/*
 
1901
 * Non-inline ApplySortFunction() --- this is needed only to conform to
 
1902
 * C99's brain-dead notions about how to implement inline functions...
 
1903
 */
 
1904
int32
 
1905
ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
 
1906
                                  Datum datum1, bool isNull1,
 
1907
                                  Datum datum2, bool isNull2)
 
1908
{
 
1909
        return inlineApplySortFunction(sortFunction, kind,
 
1910
                                                                   datum1, isNull1,
 
1911
                                                                   datum2, isNull2);
 
1912
}
 
1913
 
 
1914
 
 
1915
/*
 
1916
 * Routines specialized for HeapTuple case
 
1917
 */
 
1918
 
 
1919
static int
comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
{
	HeapTuple	ltup = (HeapTuple) a;
	HeapTuple	rtup = (HeapTuple) b;
	TupleDesc	tupDesc = state->tupDesc;
	int			nkey;

	/*
	 * Compare key by key, returning at the first key whose values
	 * differ; the tuples are equal only if every key compares equal.
	 */
	for (nkey = 0; nkey < state->nKeys; nkey++)
	{
		ScanKey		scanKey = state->scanKeys + nkey;
		AttrNumber	attno = scanKey->sk_attno;
		Datum		datum1,
					datum2;
		bool		isnull1,
					isnull2;
		int32		compare;

		datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
		datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);

		/* handles NULL ordering and sort direction per sortFnKinds */
		compare = inlineApplySortFunction(&scanKey->sk_func,
										  state->sortFnKinds[nkey],
										  datum1, isnull1,
										  datum2, isnull2);
		if (compare != 0)
			return compare;
	}

	return 0;
}
 
1950
 
 
1951
static void *
 
1952
copytup_heap(Tuplesortstate *state, void *tup)
 
1953
{
 
1954
        HeapTuple       tuple = (HeapTuple) tup;
 
1955
 
 
1956
        tuple = heap_copytuple(tuple);
 
1957
        USEMEM(state, GetMemoryChunkSpace(tuple));
 
1958
        return (void *) tuple;
 
1959
}
 
1960
 
 
1961
/*
 
1962
 * We don't bother to write the HeapTupleData part of the tuple.
 
1963
 */
 
1964
 
 
1965
static void
 
1966
writetup_heap(Tuplesortstate *state, int tapenum, void *tup)
 
1967
{
 
1968
        HeapTuple       tuple = (HeapTuple) tup;
 
1969
        unsigned int tuplen;
 
1970
 
 
1971
        tuplen = tuple->t_len + sizeof(tuplen);
 
1972
        LogicalTapeWrite(state->tapeset, tapenum,
 
1973
                                         (void *) &tuplen, sizeof(tuplen));
 
1974
        LogicalTapeWrite(state->tapeset, tapenum,
 
1975
                                         (void *) tuple->t_data, tuple->t_len);
 
1976
        if (state->randomAccess)        /* need trailing length word? */
 
1977
                LogicalTapeWrite(state->tapeset, tapenum,
 
1978
                                                 (void *) &tuplen, sizeof(tuplen));
 
1979
 
 
1980
        FREEMEM(state, GetMemoryChunkSpace(tuple));
 
1981
        heap_freetuple(tuple);
 
1982
}
 
1983
 
 
1984
static void *
 
1985
readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
 
1986
{
 
1987
        unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
 
1988
        HeapTuple       tuple = (HeapTuple) palloc(tuplen);
 
1989
 
 
1990
        USEMEM(state, GetMemoryChunkSpace(tuple));
 
1991
        /* reconstruct the HeapTupleData portion */
 
1992
        tuple->t_len = len - sizeof(unsigned int);
 
1993
        ItemPointerSetInvalid(&(tuple->t_self));
 
1994
        tuple->t_datamcxt = CurrentMemoryContext;
 
1995
        tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
 
1996
        /* read in the tuple proper */
 
1997
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple->t_data,
 
1998
                                                tuple->t_len) != tuple->t_len)
 
1999
                elog(ERROR, "unexpected end of data");
 
2000
        if (state->randomAccess)        /* need trailing length word? */
 
2001
                if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
 
2002
                                                        sizeof(tuplen)) != sizeof(tuplen))
 
2003
                        elog(ERROR, "unexpected end of data");
 
2004
        return (void *) tuple;
 
2005
}
 
2006
 
 
2007
 
 
2008
/*
 
2009
 * Routines specialized for IndexTuple case
 
2010
 *
 
2011
 * NOTE: actually, these are specialized for the btree case; it's not
 
2012
 * clear whether you could use them for a non-btree index.      Possibly
 
2013
 * you'd need to make another set of routines if you needed to sort
 
2014
 * according to another kind of index.
 
2015
 */
 
2016
 
 
2017
/*
 * Compare two index tuples on all key columns, tracking NULLs so that
 * unique-index enforcement can be done here; ties are broken by heap TID
 * to give the sort a total order.
 */
static int
comparetup_index(Tuplesortstate *state, const void *a, const void *b)
{
	/*
	 * This is almost the same as _bt_tuplecompare(), but we need to keep
	 * track of whether any null fields are present.  Also see the special
	 * treatment for equal keys at the end.
	 */
	IndexTuple	tuple1 = (IndexTuple) a;
	IndexTuple	tuple2 = (IndexTuple) b;
	Relation	rel = state->indexRel;
	int			keysz = RelationGetNumberOfAttributes(rel);
	ScanKey		scankey = state->indexScanKey;
	TupleDesc	tupDes;
	int			i;
	bool		equal_hasnull = false;

	tupDes = RelationGetDescr(rel);

	/* attribute numbers are 1-based */
	for (i = 1; i <= keysz; i++)
	{
		ScanKey		entry = &scankey[i - 1];
		Datum		datum1,
					datum2;
		bool		isnull1,
					isnull2;
		int32		compare;

		datum1 = index_getattr(tuple1, i, tupDes, &isnull1);
		datum2 = index_getattr(tuple2, i, tupDes, &isnull2);

		/* see comments about NULLs handling in btbuild */

		/* the comparison function is always of CMP type */
		compare = inlineApplySortFunction(&entry->sk_func, SORTFUNC_CMP,
										  datum1, isnull1,
										  datum2, isnull2);

		if (compare != 0)
			return (int) compare;		/* done when we find unequal
										 * attributes */

		/* they are equal, so we only need to examine one null flag */
		if (isnull1)
			equal_hasnull = true;
	}

	/*
	 * If btree has asked us to enforce uniqueness, complain if two equal
	 * tuples are detected (unless there was at least one NULL field).
	 *
	 * It is sufficient to make the test here, because if two tuples are
	 * equal they *must* get compared at some stage of the sort ---
	 * otherwise the sort algorithm wouldn't have checked whether one must
	 * appear before the other.
	 *
	 * Some rather brain-dead implementations of qsort will sometimes call
	 * the comparison routine to compare a value to itself.  (At this
	 * writing only QNX 4 is known to do such silly things.)  Don't raise
	 * a bogus error in that case.
	 */
	if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
		ereport(ERROR,
				(errcode(ERRCODE_UNIQUE_VIOLATION),
				 errmsg("could not create unique index"),
				 errdetail("Table contains duplicated values.")));

	/*
	 * If key values are equal, we sort on ItemPointer.  This does not
	 * affect validity of the finished index, but it offers cheap
	 * insurance against performance problems with bad qsort
	 * implementations that have trouble with large numbers of equal keys.
	 */
	{
		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);

		if (blk1 != blk2)
			return (blk1 < blk2) ? -1 : 1;
	}
	{
		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);

		if (pos1 != pos2)
			return (pos1 < pos2) ? -1 : 1;
	}

	/* same TID too: genuinely equal (should only happen for self-compare) */
	return 0;
}
 
2107
 
 
2108
static void *
 
2109
copytup_index(Tuplesortstate *state, void *tup)
 
2110
{
 
2111
        IndexTuple      tuple = (IndexTuple) tup;
 
2112
        unsigned int tuplen = IndexTupleSize(tuple);
 
2113
        IndexTuple      newtuple;
 
2114
 
 
2115
        newtuple = (IndexTuple) palloc(tuplen);
 
2116
        USEMEM(state, GetMemoryChunkSpace(newtuple));
 
2117
 
 
2118
        memcpy(newtuple, tuple, tuplen);
 
2119
 
 
2120
        return (void *) newtuple;
 
2121
}
 
2122
 
 
2123
static void
 
2124
writetup_index(Tuplesortstate *state, int tapenum, void *tup)
 
2125
{
 
2126
        IndexTuple      tuple = (IndexTuple) tup;
 
2127
        unsigned int tuplen;
 
2128
 
 
2129
        tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
 
2130
        LogicalTapeWrite(state->tapeset, tapenum,
 
2131
                                         (void *) &tuplen, sizeof(tuplen));
 
2132
        LogicalTapeWrite(state->tapeset, tapenum,
 
2133
                                         (void *) tuple, IndexTupleSize(tuple));
 
2134
        if (state->randomAccess)        /* need trailing length word? */
 
2135
                LogicalTapeWrite(state->tapeset, tapenum,
 
2136
                                                 (void *) &tuplen, sizeof(tuplen));
 
2137
 
 
2138
        FREEMEM(state, GetMemoryChunkSpace(tuple));
 
2139
        pfree(tuple);
 
2140
}
 
2141
 
 
2142
static void *
 
2143
readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
 
2144
{
 
2145
        unsigned int tuplen = len - sizeof(unsigned int);
 
2146
        IndexTuple      tuple = (IndexTuple) palloc(tuplen);
 
2147
 
 
2148
        USEMEM(state, GetMemoryChunkSpace(tuple));
 
2149
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
 
2150
                                                tuplen) != tuplen)
 
2151
                elog(ERROR, "unexpected end of data");
 
2152
        if (state->randomAccess)        /* need trailing length word? */
 
2153
                if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
 
2154
                                                        sizeof(tuplen)) != sizeof(tuplen))
 
2155
                        elog(ERROR, "unexpected end of data");
 
2156
        return (void *) tuple;
 
2157
}
 
2158
 
 
2159
 
 
2160
/*
 
2161
 * Routines specialized for DatumTuple case
 
2162
 */
 
2163
 
 
2164
static int
 
2165
comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
 
2166
{
 
2167
        DatumTuple *ltup = (DatumTuple *) a;
 
2168
        DatumTuple *rtup = (DatumTuple *) b;
 
2169
 
 
2170
        return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
 
2171
                                                                   ltup->val, ltup->isNull,
 
2172
                                                                   rtup->val, rtup->isNull);
 
2173
}
 
2174
 
 
2175
static void *
 
2176
copytup_datum(Tuplesortstate *state, void *tup)
 
2177
{
 
2178
        /* Not currently needed */
 
2179
        elog(ERROR, "copytup_datum() should not be called");
 
2180
        return NULL;
 
2181
}
 
2182
 
 
2183
static void
 
2184
writetup_datum(Tuplesortstate *state, int tapenum, void *tup)
 
2185
{
 
2186
        DatumTuple *tuple = (DatumTuple *) tup;
 
2187
        unsigned int tuplen;
 
2188
        unsigned int writtenlen;
 
2189
 
 
2190
        if (tuple->isNull || state->datumTypeByVal)
 
2191
                tuplen = sizeof(DatumTuple);
 
2192
        else
 
2193
        {
 
2194
                Size            datalen;
 
2195
 
 
2196
                datalen = datumGetSize(tuple->val, false, state->datumTypeLen);
 
2197
                tuplen = datalen + MAXALIGN(sizeof(DatumTuple));
 
2198
        }
 
2199
 
 
2200
        writtenlen = tuplen + sizeof(unsigned int);
 
2201
 
 
2202
        LogicalTapeWrite(state->tapeset, tapenum,
 
2203
                                         (void *) &writtenlen, sizeof(writtenlen));
 
2204
        LogicalTapeWrite(state->tapeset, tapenum,
 
2205
                                         (void *) tuple, tuplen);
 
2206
        if (state->randomAccess)        /* need trailing length word? */
 
2207
                LogicalTapeWrite(state->tapeset, tapenum,
 
2208
                                                 (void *) &writtenlen, sizeof(writtenlen));
 
2209
 
 
2210
        FREEMEM(state, GetMemoryChunkSpace(tuple));
 
2211
        pfree(tuple);
 
2212
}
 
2213
 
 
2214
static void *
 
2215
readtup_datum(Tuplesortstate *state, int tapenum, unsigned int len)
 
2216
{
 
2217
        unsigned int tuplen = len - sizeof(unsigned int);
 
2218
        DatumTuple *tuple = (DatumTuple *) palloc(tuplen);
 
2219
 
 
2220
        USEMEM(state, GetMemoryChunkSpace(tuple));
 
2221
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
 
2222
                                                tuplen) != tuplen)
 
2223
                elog(ERROR, "unexpected end of data");
 
2224
        if (state->randomAccess)        /* need trailing length word? */
 
2225
                if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
 
2226
                                                        sizeof(tuplen)) != sizeof(tuplen))
 
2227
                        elog(ERROR, "unexpected end of data");
 
2228
 
 
2229
        if (!tuple->isNull && !state->datumTypeByVal)
 
2230
                tuple->val = PointerGetDatum(((char *) tuple) +
 
2231
                                                                         MAXALIGN(sizeof(DatumTuple)));
 
2232
        return (void *) tuple;
 
2233
}