~ubuntu-branches/ubuntu/hardy/postgresql-8.4/hardy-backports

« back to all changes in this revision

Viewing changes to src/backend/utils/sort/logtape.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2009-03-20 12:00:13 UTC
  • Revision ID: james.westby@ubuntu.com-20090320120013-hogj7egc5mjncc5g
Tags: upstream-8.4~0cvs20090328
ImportĀ upstreamĀ versionĀ 8.4~0cvs20090328

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * logtape.c
 
4
 *        Management of "logical tapes" within temporary files.
 
5
 *
 
6
 * This module exists to support sorting via multiple merge passes (see
 
7
 * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
 
8
 * we implement it on disk by creating a separate file for each "tape",
 
9
 * there is an annoying problem: the peak space usage is at least twice
 
10
 * the volume of actual data to be sorted.      (This must be so because each
 
11
 * datum will appear in both the input and output tapes of the final
 
12
 * merge pass.  For seven-tape polyphase merge, which is otherwise a
 
13
 * pretty good algorithm, peak usage is more like 4x actual data volume.)
 
14
 *
 
15
 * We can work around this problem by recognizing that any one tape
 
16
 * dataset (with the possible exception of the final output) is written
 
17
 * and read exactly once in a perfectly sequential manner.      Therefore,
 
18
 * a datum once read will not be required again, and we can recycle its
 
19
 * space for use by the new tape dataset(s) being generated.  In this way,
 
20
 * the total space usage is essentially just the actual data volume, plus
 
21
 * insignificant bookkeeping and start/stop overhead.
 
22
 *
 
23
 * Few OSes allow arbitrary parts of a file to be released back to the OS,
 
24
 * so we have to implement this space-recycling ourselves within a single
 
25
 * logical file.  logtape.c exists to perform this bookkeeping and provide
 
26
 * the illusion of N independent tape devices to tuplesort.c.  Note that
 
27
 * logtape.c itself depends on buffile.c to provide a "logical file" of
 
28
 * larger size than the underlying OS may support.
 
29
 *
 
30
 * For simplicity, we allocate and release space in the underlying file
 
31
 * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
 
32
 * of which blocks in the underlying file belong to which logical tape,
 
33
 * plus any blocks that are free (recycled and not yet reused).
 
34
 * The blocks in each logical tape are remembered using a method borrowed
 
35
 * from the Unix HFS filesystem: we store data block numbers in an
 
36
 * "indirect block".  If an indirect block fills up, we write it out to
 
37
 * the underlying file and remember its location in a second-level indirect
 
38
 * block.  In the same way second-level blocks are remembered in third-
 
39
 * level blocks, and so on if necessary (of course we're talking huge
 
40
 * amounts of data here).  The topmost indirect block of a given logical
 
41
 * tape is never actually written out to the physical file, but all lower-
 
42
 * level indirect blocks will be.
 
43
 *
 
44
 * The initial write pass is guaranteed to fill the underlying file
 
45
 * perfectly sequentially, no matter how data is divided into logical tapes.
 
46
 * Once we begin merge passes, the access pattern becomes considerably
 
47
 * less predictable --- but the seeking involved should be comparable to
 
48
 * what would happen if we kept each logical tape in a separate file,
 
49
 * so there's no serious performance penalty paid to obtain the space
 
50
 * savings of recycling.  We try to localize the write accesses by always
 
51
 * writing to the lowest-numbered free block when we have a choice; it's
 
52
 * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
 
53
 * policy for free blocks would be better?)
 
54
 *
 
55
 * To support the above policy of writing to the lowest free block,
 
56
 * ltsGetFreeBlock sorts the list of free block numbers into decreasing
 
57
 * order each time it is asked for a block and the list isn't currently
 
58
 * sorted.      This is an efficient way to handle it because we expect cycles
 
59
 * of releasing many blocks followed by re-using many blocks, due to
 
60
 * tuplesort.c's "preread" behavior.
 
61
 *
 
62
 * Since all the bookkeeping and buffer memory is allocated with palloc(),
 
63
 * and the underlying file(s) are made with OpenTemporaryFile, all resources
 
64
 * for a logical tape set are certain to be cleaned up even if processing
 
65
 * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
 
66
 * care that all calls for a single LogicalTapeSet are made in the same
 
67
 * palloc context.
 
68
 *
 
69
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 
70
 * Portions Copyright (c) 1994, Regents of the University of California
 
71
 *
 
72
 * IDENTIFICATION
 
73
 *        $PostgreSQL$
 
74
 *
 
75
 *-------------------------------------------------------------------------
 
76
 */
 
77
 
 
78
#include "postgres.h"
 
79
 
 
80
#include "storage/buffile.h"
 
81
#include "utils/logtape.h"
 
82
 
 
83
/*
 
84
 * Block indexes are "long"s, so we can fit this many per indirect block.
 
85
 * NB: we assume this is an exact fit!
 
86
 */
 
87
#define BLOCKS_PER_INDIR_BLOCK  ((int) (BLCKSZ / sizeof(long)))
 
88
 
 
89
/*
 
90
 * We use a struct like this for each active indirection level of each
 
91
 * logical tape.  If the indirect block is not the highest level of its
 
92
 * tape, the "nextup" link points to the next higher level.  Only the
 
93
 * "ptrs" array is written out if we have to dump the indirect block to
 
94
 * disk.  If "ptrs" is not completely full, we store -1L in the first
 
95
 * unused slot at completion of the write phase for the logical tape.
 
96
 */
 
97
typedef struct IndirectBlock
 
98
{
 
99
        int                     nextSlot;               /* next pointer slot to write or read */
 
100
        struct IndirectBlock *nextup;           /* parent indirect level, or NULL if
 
101
                                                                                 * top */
 
102
        long            ptrs[BLOCKS_PER_INDIR_BLOCK];   /* indexes of contained blocks */
 
103
} IndirectBlock;
 
104
 
 
105
/*
 
106
 * This data structure represents a single "logical tape" within the set
 
107
 * of logical tapes stored in the same file.  We must keep track of the
 
108
 * current partially-read-or-written data block as well as the active
 
109
 * indirect block level(s).
 
110
 */
 
111
typedef struct LogicalTape
 
112
{
 
113
        IndirectBlock *indirect;        /* bottom of my indirect-block hierarchy */
 
114
        bool            writing;                /* T while in write phase */
 
115
        bool            frozen;                 /* T if blocks should not be freed when read */
 
116
        bool            dirty;                  /* does buffer need to be written? */
 
117
 
 
118
        /*
 
119
         * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
 
120
         * lastBlockBytes.      BUT: we do not update lastBlockBytes during writing,
 
121
         * only at completion of a write phase.
 
122
         */
 
123
        long            numFullBlocks;  /* number of complete blocks in log tape */
 
124
        int                     lastBlockBytes; /* valid bytes in last (incomplete) block */
 
125
 
 
126
        /*
 
127
         * Buffer for current data block.  Note we don't bother to store the
 
128
         * actual file block number of the data block (during the write phase it
 
129
         * hasn't been assigned yet, and during read we don't care anymore). But
 
130
         * we do need the relative block number so we can detect end-of-tape while
 
131
         * reading.
 
132
         */
 
133
        char       *buffer;                     /* physical buffer (separately palloc'd) */
 
134
        long            curBlockNumber; /* this block's logical blk# within tape */
 
135
        int                     pos;                    /* next read/write position in buffer */
 
136
        int                     nbytes;                 /* total # of valid bytes in buffer */
 
137
} LogicalTape;
 
138
 
 
139
/*
 
140
 * This data structure represents a set of related "logical tapes" sharing
 
141
 * space in a single underlying file.  (But that "file" may be multiple files
 
142
 * if needed to escape OS limits on file size; buffile.c handles that for us.)
 
143
 * The number of tapes is fixed at creation.
 
144
 */
 
145
struct LogicalTapeSet
 
146
{
 
147
        BufFile    *pfile;                      /* underlying file for whole tape set */
 
148
        long            nFileBlocks;    /* # of blocks used in underlying file */
 
149
 
 
150
        /*
 
151
         * We store the numbers of recycled-and-available blocks in freeBlocks[].
 
152
         * When there are no such blocks, we extend the underlying file.
 
153
         *
 
154
         * If forgetFreeSpace is true then any freed blocks are simply forgotten
 
155
         * rather than being remembered in freeBlocks[].  See notes for
 
156
         * LogicalTapeSetForgetFreeSpace().
 
157
         *
 
158
         * If blocksSorted is true then the block numbers in freeBlocks are in
 
159
         * *decreasing* order, so that removing the last entry gives us the lowest
 
160
         * free block.  We re-sort the blocks whenever a block is demanded; this
 
161
         * should be reasonably efficient given the expected usage pattern.
 
162
         */
 
163
        bool            forgetFreeSpace;        /* are we remembering free blocks? */
 
164
        bool            blocksSorted;   /* is freeBlocks[] currently in order? */
 
165
        long       *freeBlocks;         /* resizable array */
 
166
        int                     nFreeBlocks;    /* # of currently free blocks */
 
167
        int                     freeBlocksLen;  /* current allocated length of freeBlocks[] */
 
168
 
 
169
        /*
 
170
         * tapes[] is declared size 1 since C wants a fixed size, but actually it
 
171
         * is of length nTapes.
 
172
         */
 
173
        int                     nTapes;                 /* # of logical tapes in set */
 
174
        LogicalTape tapes[1];           /* must be last in struct! */
 
175
};
 
176
 
 
177
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 
178
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 
179
static long ltsGetFreeBlock(LogicalTapeSet *lts);
 
180
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
 
181
static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
 
182
                                  long blocknum);
 
183
static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
 
184
                                           IndirectBlock *indirect,
 
185
                                           bool freezing);
 
186
static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
 
187
                                                         IndirectBlock *indirect);
 
188
static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
 
189
                                          IndirectBlock *indirect,
 
190
                                          bool frozen);
 
191
static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
 
192
                                          IndirectBlock *indirect);
 
193
static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
 
194
 
 
195
 
 
196
/*
 
197
 * Write a block-sized buffer to the specified block of the underlying file.
 
198
 *
 
199
 * NB: should not attempt to write beyond current end of file (ie, create
 
200
 * "holes" in file), since BufFile doesn't allow that.  The first write pass
 
201
 * must write blocks sequentially.
 
202
 *
 
203
 * No need for an error return convention; we ereport() on any error.
 
204
 */
 
205
static void
 
206
ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
 
207
{
 
208
        if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
 
209
                BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
 
210
                ereport(ERROR,
 
211
                /* XXX is it okay to assume errno is correct? */
 
212
                                (errcode_for_file_access(),
 
213
                                 errmsg("could not write block %ld of temporary file: %m",
 
214
                                                blocknum),
 
215
                                 errhint("Perhaps out of disk space?")));
 
216
}
 
217
 
 
218
/*
 
219
 * Read a block-sized buffer from the specified block of the underlying file.
 
220
 *
 
221
 * No need for an error return convention; we ereport() on any error.   This
 
222
 * module should never attempt to read a block it doesn't know is there.
 
223
 */
 
224
static void
 
225
ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
 
226
{
 
227
        if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
 
228
                BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
 
229
                ereport(ERROR,
 
230
                /* XXX is it okay to assume errno is correct? */
 
231
                                (errcode_for_file_access(),
 
232
                                 errmsg("could not read block %ld of temporary file: %m",
 
233
                                                blocknum)));
 
234
}
 
235
 
 
236
/*
 
237
 * qsort comparator for sorting freeBlocks[] into decreasing order.
 
238
 */
 
239
static int
 
240
freeBlocks_cmp(const void *a, const void *b)
 
241
{
 
242
        long            ablk = *((const long *) a);
 
243
        long            bblk = *((const long *) b);
 
244
 
 
245
        /* can't just subtract because long might be wider than int */
 
246
        if (ablk < bblk)
 
247
                return 1;
 
248
        if (ablk > bblk)
 
249
                return -1;
 
250
        return 0;
 
251
}
 
252
 
 
253
/*
 
254
 * Select a currently unused block for writing to.
 
255
 *
 
256
 * NB: should only be called when writer is ready to write immediately,
 
257
 * to ensure that first write pass is sequential.
 
258
 */
 
259
static long
 
260
ltsGetFreeBlock(LogicalTapeSet *lts)
 
261
{
 
262
        /*
 
263
         * If there are multiple free blocks, we select the one appearing last in
 
264
         * freeBlocks[] (after sorting the array if needed).  If there are none,
 
265
         * assign the next block at the end of the file.
 
266
         */
 
267
        if (lts->nFreeBlocks > 0)
 
268
        {
 
269
                if (!lts->blocksSorted)
 
270
                {
 
271
                        qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
 
272
                                  sizeof(long), freeBlocks_cmp);
 
273
                        lts->blocksSorted = true;
 
274
                }
 
275
                return lts->freeBlocks[--lts->nFreeBlocks];
 
276
        }
 
277
        else
 
278
                return lts->nFileBlocks++;
 
279
}
 
280
 
 
281
/*
 
282
 * Return a block# to the freelist.
 
283
 */
 
284
static void
 
285
ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 
286
{
 
287
        int                     ndx;
 
288
 
 
289
        /*
 
290
         * Do nothing if we're no longer interested in remembering free space.
 
291
         */
 
292
        if (lts->forgetFreeSpace)
 
293
                return;
 
294
 
 
295
        /*
 
296
         * Enlarge freeBlocks array if full.
 
297
         */
 
298
        if (lts->nFreeBlocks >= lts->freeBlocksLen)
 
299
        {
 
300
                lts->freeBlocksLen *= 2;
 
301
                lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
 
302
                                                                                  lts->freeBlocksLen * sizeof(long));
 
303
        }
 
304
 
 
305
        /*
 
306
         * Add blocknum to array, and mark the array unsorted if it's no longer in
 
307
         * decreasing order.
 
308
         */
 
309
        ndx = lts->nFreeBlocks++;
 
310
        lts->freeBlocks[ndx] = blocknum;
 
311
        if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
 
312
                lts->blocksSorted = false;
 
313
}
 
314
 
 
315
/*
 
316
 * These routines manipulate indirect-block hierarchies.  All are recursive
 
317
 * so that they don't have any specific limit on the depth of hierarchy.
 
318
 */
 
319
 
 
320
/*
 
321
 * Record a data block number in a logical tape's lowest indirect block,
 
322
 * or record an indirect block's number in the next higher indirect level.
 
323
 */
 
324
static void
 
325
ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
 
326
                                  long blocknum)
 
327
{
 
328
        if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
 
329
        {
 
330
                /*
 
331
                 * This indirect block is full, so dump it out and recursively save
 
332
                 * its address in the next indirection level.  Create a new
 
333
                 * indirection level if there wasn't one before.
 
334
                 */
 
335
                long            indirblock = ltsGetFreeBlock(lts);
 
336
 
 
337
                ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
 
338
                if (indirect->nextup == NULL)
 
339
                {
 
340
                        indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
 
341
                        indirect->nextup->nextSlot = 0;
 
342
                        indirect->nextup->nextup = NULL;
 
343
                }
 
344
                ltsRecordBlockNum(lts, indirect->nextup, indirblock);
 
345
 
 
346
                /*
 
347
                 * Reset to fill another indirect block at this level.
 
348
                 */
 
349
                indirect->nextSlot = 0;
 
350
        }
 
351
        indirect->ptrs[indirect->nextSlot++] = blocknum;
 
352
}
 
353
 
 
354
/*
 
355
 * Reset a logical tape's indirect-block hierarchy after a write pass
 
356
 * to prepare for reading.      We dump out partly-filled blocks except
 
357
 * at the top of the hierarchy, and we rewind each level to the start.
 
358
 * This call returns the first data block number, or -1L if the tape
 
359
 * is empty.
 
360
 *
 
361
 * Unless 'freezing' is true, release indirect blocks to the free pool after
 
362
 * reading them.
 
363
 */
 
364
static long
 
365
ltsRewindIndirectBlock(LogicalTapeSet *lts,
 
366
                                           IndirectBlock *indirect,
 
367
                                           bool freezing)
 
368
{
 
369
        /* Handle case of never-written-to tape */
 
370
        if (indirect == NULL)
 
371
                return -1L;
 
372
 
 
373
        /* Insert sentinel if block is not full */
 
374
        if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
 
375
                indirect->ptrs[indirect->nextSlot] = -1L;
 
376
 
 
377
        /*
 
378
         * If block is not topmost, write it out, and recurse to obtain address of
 
379
         * first block in this hierarchy level.  Read that one in.
 
380
         */
 
381
        if (indirect->nextup != NULL)
 
382
        {
 
383
                long            indirblock = ltsGetFreeBlock(lts);
 
384
 
 
385
                ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
 
386
                ltsRecordBlockNum(lts, indirect->nextup, indirblock);
 
387
                indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
 
388
                Assert(indirblock != -1L);
 
389
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
390
                if (!freezing)
 
391
                        ltsReleaseBlock(lts, indirblock);
 
392
        }
 
393
 
 
394
        /*
 
395
         * Reset my next-block pointer, and then fetch a block number if any.
 
396
         */
 
397
        indirect->nextSlot = 0;
 
398
        if (indirect->ptrs[0] == -1L)
 
399
                return -1L;
 
400
        return indirect->ptrs[indirect->nextSlot++];
 
401
}
 
402
 
 
403
/*
 
404
 * Rewind a previously-frozen indirect-block hierarchy for another read pass.
 
405
 * This call returns the first data block number, or -1L if the tape
 
406
 * is empty.
 
407
 */
 
408
static long
 
409
ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
 
410
                                                         IndirectBlock *indirect)
 
411
{
 
412
        /* Handle case of never-written-to tape */
 
413
        if (indirect == NULL)
 
414
                return -1L;
 
415
 
 
416
        /*
 
417
         * If block is not topmost, recurse to obtain address of first block in
 
418
         * this hierarchy level.  Read that one in.
 
419
         */
 
420
        if (indirect->nextup != NULL)
 
421
        {
 
422
                long            indirblock;
 
423
 
 
424
                indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
 
425
                Assert(indirblock != -1L);
 
426
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
427
        }
 
428
 
 
429
        /*
 
430
         * Reset my next-block pointer, and then fetch a block number if any.
 
431
         */
 
432
        indirect->nextSlot = 0;
 
433
        if (indirect->ptrs[0] == -1L)
 
434
                return -1L;
 
435
        return indirect->ptrs[indirect->nextSlot++];
 
436
}
 
437
 
 
438
/*
 
439
 * Obtain next data block number in the forward direction, or -1L if no more.
 
440
 *
 
441
 * Unless 'frozen' is true, release indirect blocks to the free pool after
 
442
 * reading them.
 
443
 */
 
444
static long
 
445
ltsRecallNextBlockNum(LogicalTapeSet *lts,
 
446
                                          IndirectBlock *indirect,
 
447
                                          bool frozen)
 
448
{
 
449
        /* Handle case of never-written-to tape */
 
450
        if (indirect == NULL)
 
451
                return -1L;
 
452
 
 
453
        if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
 
454
                indirect->ptrs[indirect->nextSlot] == -1L)
 
455
        {
 
456
                long            indirblock;
 
457
 
 
458
                if (indirect->nextup == NULL)
 
459
                        return -1L;                     /* nothing left at this level */
 
460
                indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
 
461
                if (indirblock == -1L)
 
462
                        return -1L;                     /* nothing left at this level */
 
463
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
464
                if (!frozen)
 
465
                        ltsReleaseBlock(lts, indirblock);
 
466
                indirect->nextSlot = 0;
 
467
        }
 
468
        if (indirect->ptrs[indirect->nextSlot] == -1L)
 
469
                return -1L;
 
470
        return indirect->ptrs[indirect->nextSlot++];
 
471
}
 
472
 
 
473
/*
 
474
 * Obtain next data block number in the reverse direction, or -1L if no more.
 
475
 *
 
476
 * Note this fetches the block# before the one last returned, no matter which
 
477
 * direction of call returned that one.  If we fail, no change in state.
 
478
 *
 
479
 * This routine can only be used in 'frozen' state, so there's no need to
 
480
 * pass a parameter telling whether to release blocks ... we never do.
 
481
 */
 
482
static long
 
483
ltsRecallPrevBlockNum(LogicalTapeSet *lts,
 
484
                                          IndirectBlock *indirect)
 
485
{
 
486
        /* Handle case of never-written-to tape */
 
487
        if (indirect == NULL)
 
488
                return -1L;
 
489
 
 
490
        if (indirect->nextSlot <= 1)
 
491
        {
 
492
                long            indirblock;
 
493
 
 
494
                if (indirect->nextup == NULL)
 
495
                        return -1L;                     /* nothing left at this level */
 
496
                indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
 
497
                if (indirblock == -1L)
 
498
                        return -1L;                     /* nothing left at this level */
 
499
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
500
 
 
501
                /*
 
502
                 * The previous block would only have been written out if full, so we
 
503
                 * need not search it for a -1 sentinel.
 
504
                 */
 
505
                indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
 
506
        }
 
507
        indirect->nextSlot--;
 
508
        return indirect->ptrs[indirect->nextSlot - 1];
 
509
}
 
510
 
 
511
 
 
512
/*
 
513
 * Create a set of logical tapes in a temporary underlying file.
 
514
 *
 
515
 * Each tape is initialized in write state.
 
516
 */
 
517
LogicalTapeSet *
 
518
LogicalTapeSetCreate(int ntapes)
 
519
{
 
520
        LogicalTapeSet *lts;
 
521
        LogicalTape *lt;
 
522
        int                     i;
 
523
 
 
524
        /*
 
525
         * Create top-level struct including per-tape LogicalTape structs. First
 
526
         * LogicalTape struct is already counted in sizeof(LogicalTapeSet).
 
527
         */
 
528
        Assert(ntapes > 0);
 
529
        lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) +
 
530
                                                                        (ntapes - 1) *sizeof(LogicalTape));
 
531
        lts->pfile = BufFileCreateTemp(false);
 
532
        lts->nFileBlocks = 0L;
 
533
        lts->forgetFreeSpace = false;
 
534
        lts->blocksSorted = true;       /* a zero-length array is sorted ... */
 
535
        lts->freeBlocksLen = 32;        /* reasonable initial guess */
 
536
        lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
 
537
        lts->nFreeBlocks = 0;
 
538
        lts->nTapes = ntapes;
 
539
 
 
540
        /*
 
541
         * Initialize per-tape structs.  Note we allocate the I/O buffer and
 
542
         * first-level indirect block for a tape only when it is first actually
 
543
         * written to.  This avoids wasting memory space when tuplesort.c
 
544
         * overestimates the number of tapes needed.
 
545
         */
 
546
        for (i = 0; i < ntapes; i++)
 
547
        {
 
548
                lt = &lts->tapes[i];
 
549
                lt->indirect = NULL;
 
550
                lt->writing = true;
 
551
                lt->frozen = false;
 
552
                lt->dirty = false;
 
553
                lt->numFullBlocks = 0L;
 
554
                lt->lastBlockBytes = 0;
 
555
                lt->buffer = NULL;
 
556
                lt->curBlockNumber = 0L;
 
557
                lt->pos = 0;
 
558
                lt->nbytes = 0;
 
559
        }
 
560
        return lts;
 
561
}
 
562
 
 
563
/*
 
564
 * Close a logical tape set and release all resources.
 
565
 */
 
566
void
 
567
LogicalTapeSetClose(LogicalTapeSet *lts)
 
568
{
 
569
        LogicalTape *lt;
 
570
        IndirectBlock *ib,
 
571
                           *nextib;
 
572
        int                     i;
 
573
 
 
574
        BufFileClose(lts->pfile);
 
575
        for (i = 0; i < lts->nTapes; i++)
 
576
        {
 
577
                lt = &lts->tapes[i];
 
578
                for (ib = lt->indirect; ib != NULL; ib = nextib)
 
579
                {
 
580
                        nextib = ib->nextup;
 
581
                        pfree(ib);
 
582
                }
 
583
                if (lt->buffer)
 
584
                        pfree(lt->buffer);
 
585
        }
 
586
        pfree(lts->freeBlocks);
 
587
        pfree(lts);
 
588
}
 
589
 
 
590
/*
 
591
 * Mark a logical tape set as not needing management of free space anymore.
 
592
 *
 
593
 * This should be called if the caller does not intend to write any more data
 
594
 * into the tape set, but is reading from un-frozen tapes.      Since no more
 
595
 * writes are planned, remembering free blocks is no longer useful.  Setting
 
596
 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
 
597
 * is not designed to handle large numbers of free blocks.
 
598
 */
 
599
void
 
600
LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
 
601
{
 
602
        lts->forgetFreeSpace = true;
 
603
}
 
604
 
 
605
/*
 
606
 * Dump the dirty buffer of a logical tape.
 
607
 */
 
608
static void
 
609
ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 
610
{
 
611
        long            datablock = ltsGetFreeBlock(lts);
 
612
 
 
613
        Assert(lt->dirty);
 
614
        ltsWriteBlock(lts, datablock, (void *) lt->buffer);
 
615
        ltsRecordBlockNum(lts, lt->indirect, datablock);
 
616
        lt->dirty = false;
 
617
        /* Caller must do other state update as needed */
 
618
}
 
619
 
 
620
/*
 
621
 * Write to a logical tape.
 
622
 *
 
623
 * There are no error returns; we ereport() on failure.
 
624
 */
 
625
void
 
626
LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 
627
                                 void *ptr, size_t size)
 
628
{
 
629
        LogicalTape *lt;
 
630
        size_t          nthistime;
 
631
 
 
632
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
633
        lt = &lts->tapes[tapenum];
 
634
        Assert(lt->writing);
 
635
 
 
636
        /* Allocate data buffer and first indirect block on first write */
 
637
        if (lt->buffer == NULL)
 
638
                lt->buffer = (char *) palloc(BLCKSZ);
 
639
        if (lt->indirect == NULL)
 
640
        {
 
641
                lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
 
642
                lt->indirect->nextSlot = 0;
 
643
                lt->indirect->nextup = NULL;
 
644
        }
 
645
 
 
646
        while (size > 0)
 
647
        {
 
648
                if (lt->pos >= BLCKSZ)
 
649
                {
 
650
                        /* Buffer full, dump it out */
 
651
                        if (lt->dirty)
 
652
                                ltsDumpBuffer(lts, lt);
 
653
                        else
 
654
                        {
 
655
                                /* Hmm, went directly from reading to writing? */
 
656
                                elog(ERROR, "invalid logtape state: should be dirty");
 
657
                        }
 
658
                        lt->numFullBlocks++;
 
659
                        lt->curBlockNumber++;
 
660
                        lt->pos = 0;
 
661
                        lt->nbytes = 0;
 
662
                }
 
663
 
 
664
                nthistime = BLCKSZ - lt->pos;
 
665
                if (nthistime > size)
 
666
                        nthistime = size;
 
667
                Assert(nthistime > 0);
 
668
 
 
669
                memcpy(lt->buffer + lt->pos, ptr, nthistime);
 
670
 
 
671
                lt->dirty = true;
 
672
                lt->pos += nthistime;
 
673
                if (lt->nbytes < lt->pos)
 
674
                        lt->nbytes = lt->pos;
 
675
                ptr = (void *) ((char *) ptr + nthistime);
 
676
                size -= nthistime;
 
677
        }
 
678
}
 
679
 
 
680
/*
 
681
 * Rewind logical tape and switch from writing to reading or vice versa.
 
682
 *
 
683
 * Unless the tape has been "frozen" in read state, forWrite must be the
 
684
 * opposite of the previous tape state.
 
685
 */
 
686
void
 
687
LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 
688
{
 
689
        LogicalTape *lt;
 
690
        long            datablocknum;
 
691
 
 
692
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
693
        lt = &lts->tapes[tapenum];
 
694
 
 
695
        if (!forWrite)
 
696
        {
 
697
                if (lt->writing)
 
698
                {
 
699
                        /*
 
700
                         * Completion of a write phase.  Flush last partial data block,
 
701
                         * flush any partial indirect blocks, rewind for normal
 
702
                         * (destructive) read.
 
703
                         */
 
704
                        if (lt->dirty)
 
705
                                ltsDumpBuffer(lts, lt);
 
706
                        lt->lastBlockBytes = lt->nbytes;
 
707
                        lt->writing = false;
 
708
                        datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
 
709
                }
 
710
                else
 
711
                {
 
712
                        /*
 
713
                         * This is only OK if tape is frozen; we rewind for (another) read
 
714
                         * pass.
 
715
                         */
 
716
                        Assert(lt->frozen);
 
717
                        datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
 
718
                }
 
719
                /* Read the first block, or reset if tape is empty */
 
720
                lt->curBlockNumber = 0L;
 
721
                lt->pos = 0;
 
722
                lt->nbytes = 0;
 
723
                if (datablocknum != -1L)
 
724
                {
 
725
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
726
                        if (!lt->frozen)
 
727
                                ltsReleaseBlock(lts, datablocknum);
 
728
                        lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
729
                                BLCKSZ : lt->lastBlockBytes;
 
730
                }
 
731
        }
 
732
        else
 
733
        {
 
734
                /*
 
735
                 * Completion of a read phase.  Rewind and prepare for write.
 
736
                 *
 
737
                 * NOTE: we assume the caller has read the tape to the end; otherwise
 
738
                 * untouched data and indirect blocks will not have been freed. We
 
739
                 * could add more code to free any unread blocks, but in current usage
 
740
                 * of this module it'd be useless code.
 
741
                 */
 
742
                IndirectBlock *ib,
 
743
                                   *nextib;
 
744
 
 
745
                Assert(!lt->writing && !lt->frozen);
 
746
                /* Must truncate the indirect-block hierarchy down to one level. */
 
747
                if (lt->indirect)
 
748
                {
 
749
                        for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
 
750
                        {
 
751
                                nextib = ib->nextup;
 
752
                                pfree(ib);
 
753
                        }
 
754
                        lt->indirect->nextSlot = 0;
 
755
                        lt->indirect->nextup = NULL;
 
756
                }
 
757
                lt->writing = true;
 
758
                lt->dirty = false;
 
759
                lt->numFullBlocks = 0L;
 
760
                lt->lastBlockBytes = 0;
 
761
                lt->curBlockNumber = 0L;
 
762
                lt->pos = 0;
 
763
                lt->nbytes = 0;
 
764
        }
 
765
}
 
766
 
 
767
/*
 
768
 * Read from a logical tape.
 
769
 *
 
770
 * Early EOF is indicated by return value less than #bytes requested.
 
771
 */
 
772
size_t
 
773
LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 
774
                                void *ptr, size_t size)
 
775
{
 
776
        LogicalTape *lt;
 
777
        size_t          nread = 0;
 
778
        size_t          nthistime;
 
779
 
 
780
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
781
        lt = &lts->tapes[tapenum];
 
782
        Assert(!lt->writing);
 
783
 
 
784
        while (size > 0)
 
785
        {
 
786
                if (lt->pos >= lt->nbytes)
 
787
                {
 
788
                        /* Try to load more data into buffer. */
 
789
                        long            datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
 
790
                                                                                                                         lt->frozen);
 
791
 
 
792
                        if (datablocknum == -1L)
 
793
                                break;                  /* EOF */
 
794
                        lt->curBlockNumber++;
 
795
                        lt->pos = 0;
 
796
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
797
                        if (!lt->frozen)
 
798
                                ltsReleaseBlock(lts, datablocknum);
 
799
                        lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
800
                                BLCKSZ : lt->lastBlockBytes;
 
801
                        if (lt->nbytes <= 0)
 
802
                                break;                  /* EOF (possible here?) */
 
803
                }
 
804
 
 
805
                nthistime = lt->nbytes - lt->pos;
 
806
                if (nthistime > size)
 
807
                        nthistime = size;
 
808
                Assert(nthistime > 0);
 
809
 
 
810
                memcpy(ptr, lt->buffer + lt->pos, nthistime);
 
811
 
 
812
                lt->pos += nthistime;
 
813
                ptr = (void *) ((char *) ptr + nthistime);
 
814
                size -= nthistime;
 
815
                nread += nthistime;
 
816
        }
 
817
 
 
818
        return nread;
 
819
}
 
820
 
 
821
/*
 
822
 * "Freeze" the contents of a tape so that it can be read multiple times
 
823
 * and/or read backwards.  Once a tape is frozen, its contents will not
 
824
 * be released until the LogicalTapeSet is destroyed.  This is expected
 
825
 * to be used only for the final output pass of a merge.
 
826
 *
 
827
 * This *must* be called just at the end of a write pass, before the
 
828
 * tape is rewound (after rewind is too late!).  It performs a rewind
 
829
 * and switch to read mode "for free".  An immediately following rewind-
 
830
 * for-read call is OK but not necessary.
 
831
 */
 
832
void
 
833
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
 
834
{
 
835
        LogicalTape *lt;
 
836
        long            datablocknum;
 
837
 
 
838
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
839
        lt = &lts->tapes[tapenum];
 
840
        Assert(lt->writing);
 
841
 
 
842
        /*
 
843
         * Completion of a write phase.  Flush last partial data block, flush any
 
844
         * partial indirect blocks, rewind for nondestructive read.
 
845
         */
 
846
        if (lt->dirty)
 
847
                ltsDumpBuffer(lts, lt);
 
848
        lt->lastBlockBytes = lt->nbytes;
 
849
        lt->writing = false;
 
850
        lt->frozen = true;
 
851
        datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
 
852
        /* Read the first block, or reset if tape is empty */
 
853
        lt->curBlockNumber = 0L;
 
854
        lt->pos = 0;
 
855
        lt->nbytes = 0;
 
856
        if (datablocknum != -1L)
 
857
        {
 
858
                ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
859
                lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
860
                        BLCKSZ : lt->lastBlockBytes;
 
861
        }
 
862
}
 
863
 
 
864
/*
 
865
 * Backspace the tape a given number of bytes.  (We also support a more
 
866
 * general seek interface, see below.)
 
867
 *
 
868
 * *Only* a frozen-for-read tape can be backed up; we don't support
 
869
 * random access during write, and an unfrozen read tape may have
 
870
 * already discarded the desired data!
 
871
 *
 
872
 * Return value is TRUE if seek successful, FALSE if there isn't that much
 
873
 * data before the current point (in which case there's no state change).
 
874
 */
 
875
bool
 
876
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
 
877
{
 
878
        LogicalTape *lt;
 
879
        long            nblocks;
 
880
        int                     newpos;
 
881
 
 
882
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
883
        lt = &lts->tapes[tapenum];
 
884
        Assert(lt->frozen);
 
885
 
 
886
        /*
 
887
         * Easy case for seek within current block.
 
888
         */
 
889
        if (size <= (size_t) lt->pos)
 
890
        {
 
891
                lt->pos -= (int) size;
 
892
                return true;
 
893
        }
 
894
 
 
895
        /*
 
896
         * Not-so-easy case.  Figure out whether it's possible at all.
 
897
         */
 
898
        size -= (size_t) lt->pos;       /* part within this block */
 
899
        nblocks = size / BLCKSZ;
 
900
        size = size % BLCKSZ;
 
901
        if (size)
 
902
        {
 
903
                nblocks++;
 
904
                newpos = (int) (BLCKSZ - size);
 
905
        }
 
906
        else
 
907
                newpos = 0;
 
908
        if (nblocks > lt->curBlockNumber)
 
909
                return false;                   /* a seek too far... */
 
910
 
 
911
        /*
 
912
         * OK, we need to back up nblocks blocks.  This implementation would be
 
913
         * pretty inefficient for long seeks, but we really aren't expecting that
 
914
         * (a seek over one tuple is typical).
 
915
         */
 
916
        while (nblocks-- > 0)
 
917
        {
 
918
                long            datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
 
919
 
 
920
                if (datablocknum == -1L)
 
921
                        elog(ERROR, "unexpected end of tape");
 
922
                lt->curBlockNumber--;
 
923
                if (nblocks == 0)
 
924
                {
 
925
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
926
                        lt->nbytes = BLCKSZ;
 
927
                }
 
928
        }
 
929
        lt->pos = newpos;
 
930
        return true;
 
931
}
 
932
 
 
933
/*
 
934
 * Seek to an arbitrary position in a logical tape.
 
935
 *
 
936
 * *Only* a frozen-for-read tape can be seeked.
 
937
 *
 
938
 * Return value is TRUE if seek successful, FALSE if there isn't that much
 
939
 * data in the tape (in which case there's no state change).
 
940
 */
 
941
bool
 
942
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 
943
                                long blocknum, int offset)
 
944
{
 
945
        LogicalTape *lt;
 
946
 
 
947
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
948
        lt = &lts->tapes[tapenum];
 
949
        Assert(lt->frozen);
 
950
        Assert(offset >= 0 && offset <= BLCKSZ);
 
951
 
 
952
        /*
 
953
         * Easy case for seek within current block.
 
954
         */
 
955
        if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
 
956
        {
 
957
                lt->pos = offset;
 
958
                return true;
 
959
        }
 
960
 
 
961
        /*
 
962
         * Not-so-easy case.  Figure out whether it's possible at all.
 
963
         */
 
964
        if (blocknum < 0 || blocknum > lt->numFullBlocks ||
 
965
                (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
 
966
                return false;
 
967
 
 
968
        /*
 
969
         * OK, advance or back up to the target block.  This implementation would
 
970
         * be pretty inefficient for long seeks, but we really aren't expecting
 
971
         * that (a seek over one tuple is typical).
 
972
         */
 
973
        while (lt->curBlockNumber > blocknum)
 
974
        {
 
975
                long            datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
 
976
 
 
977
                if (datablocknum == -1L)
 
978
                        elog(ERROR, "unexpected end of tape");
 
979
                if (--lt->curBlockNumber == blocknum)
 
980
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
981
        }
 
982
        while (lt->curBlockNumber < blocknum)
 
983
        {
 
984
                long            datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
 
985
                                                                                                                 lt->frozen);
 
986
 
 
987
                if (datablocknum == -1L)
 
988
                        elog(ERROR, "unexpected end of tape");
 
989
                if (++lt->curBlockNumber == blocknum)
 
990
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
991
        }
 
992
        lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
993
                BLCKSZ : lt->lastBlockBytes;
 
994
        lt->pos = offset;
 
995
        return true;
 
996
}
 
997
 
 
998
/*
 
999
 * Obtain current position in a form suitable for a later LogicalTapeSeek.
 
1000
 *
 
1001
 * NOTE: it'd be OK to do this during write phase with intention of using
 
1002
 * the position for a seek after freezing.      Not clear if anyone needs that.
 
1003
 */
 
1004
void
 
1005
LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 
1006
                                long *blocknum, int *offset)
 
1007
{
 
1008
        LogicalTape *lt;
 
1009
 
 
1010
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
1011
        lt = &lts->tapes[tapenum];
 
1012
        *blocknum = lt->curBlockNumber;
 
1013
        *offset = lt->pos;
 
1014
}
 
1015
 
 
1016
/*
 
1017
 * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
 
1018
 */
 
1019
long
 
1020
LogicalTapeSetBlocks(LogicalTapeSet *lts)
 
1021
{
 
1022
        return lts->nFileBlocks;
 
1023
}