~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/utils/sort/logtape.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * logtape.c
 
4
 *        Management of "logical tapes" within temporary files.
 
5
 *
 
6
 * This module exists to support sorting via multiple merge passes (see
 
7
 * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
 
8
 * we implement it on disk by creating a separate file for each "tape",
 
9
 * there is an annoying problem: the peak space usage is at least twice
 
10
 * the volume of actual data to be sorted.      (This must be so because each
 
11
 * datum will appear in both the input and output tapes of the final
 
12
 * merge pass.  For seven-tape polyphase merge, which is otherwise a
 
13
 * pretty good algorithm, peak usage is more like 4x actual data volume.)
 
14
 *
 
15
 * We can work around this problem by recognizing that any one tape
 
16
 * dataset (with the possible exception of the final output) is written
 
17
 * and read exactly once in a perfectly sequential manner.      Therefore,
 
18
 * a datum once read will not be required again, and we can recycle its
 
19
 * space for use by the new tape dataset(s) being generated.  In this way,
 
20
 * the total space usage is essentially just the actual data volume, plus
 
21
 * insignificant bookkeeping and start/stop overhead.
 
22
 *
 
23
 * Few OSes allow arbitrary parts of a file to be released back to the OS,
 
24
 * so we have to implement this space-recycling ourselves within a single
 
25
 * logical file.  logtape.c exists to perform this bookkeeping and provide
 
26
 * the illusion of N independent tape devices to tuplesort.c.  Note that
 
27
 * logtape.c itself depends on buffile.c to provide a "logical file" of
 
28
 * larger size than the underlying OS may support.
 
29
 *
 
30
 * For simplicity, we allocate and release space in the underlying file
 
31
 * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
 
32
 * of which blocks in the underlying file belong to which logical tape,
 
33
 * plus any blocks that are free (recycled and not yet reused).  Normally
 
34
 * there are not very many free blocks, so we just keep those in a list.
 
35
 * The blocks in each logical tape are remembered using a method borrowed
 
36
 * from the Unix HFS filesystem: we store data block numbers in an
 
37
 * "indirect block".  If an indirect block fills up, we write it out to
 
38
 * the underlying file and remember its location in a second-level indirect
 
39
 * block.  In the same way second-level blocks are remembered in third-
 
40
 * level blocks, and so on if necessary (of course we're talking huge
 
41
 * amounts of data here).  The topmost indirect block of a given logical
 
42
 * tape is never actually written out to the physical file, but all lower-
 
43
 * level indirect blocks will be.
 
44
 *
 
45
 * The initial write pass is guaranteed to fill the underlying file
 
46
 * perfectly sequentially, no matter how data is divided into logical tapes.
 
47
 * Once we begin merge passes, the access pattern becomes considerably
 
48
 * less predictable --- but the seeking involved should be comparable to
 
49
 * what would happen if we kept each logical tape in a separate file,
 
50
 * so there's no serious performance penalty paid to obtain the space
 
51
 * savings of recycling.  We try to localize the write accesses by always
 
52
 * writing to the lowest-numbered free block when we have a choice; it's
 
53
 * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
 
54
 * policy for free blocks would be better?)
 
55
 *
 
56
 * Since all the bookkeeping and buffer memory is allocated with palloc(),
 
57
 * and the underlying file(s) are made with OpenTemporaryFile, all resources
 
58
 * for a logical tape set are certain to be cleaned up even if processing
 
59
 * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
 
60
 * care that all calls for a single LogicalTapeSet are made in the same
 
61
 * palloc context.
 
62
 *
 
63
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
64
 * Portions Copyright (c) 1994, Regents of the University of California
 
65
 *
 
66
 * IDENTIFICATION
 
67
 *        $PostgreSQL: pgsql/src/backend/utils/sort/logtape.c,v 1.15 2004-12-31 22:02:52 pgsql Exp $
 
68
 *
 
69
 *-------------------------------------------------------------------------
 
70
 */
 
71
 
 
72
#include "postgres.h"
 
73
 
 
74
#include "storage/buffile.h"
 
75
#include "utils/logtape.h"
 
76
 
 
77
/*
 
78
 * Block indexes are "long"s, so we can fit this many per indirect block.
 
79
 * NB: we assume this is an exact fit!
 
80
 */
 
81
#define BLOCKS_PER_INDIR_BLOCK  ((int) (BLCKSZ / sizeof(long)))
 
82
 
 
83
/*
 
84
 * We use a struct like this for each active indirection level of each
 
85
 * logical tape.  If the indirect block is not the highest level of its
 
86
 * tape, the "nextup" link points to the next higher level.  Only the
 
87
 * "ptrs" array is written out if we have to dump the indirect block to
 
88
 * disk.  If "ptrs" is not completely full, we store -1L in the first
 
89
 * unused slot at completion of the write phase for the logical tape.
 
90
 */
 
91
typedef struct IndirectBlock
 
92
{
 
93
        int                     nextSlot;               /* next pointer slot to write or read */
 
94
        struct IndirectBlock *nextup;           /* parent indirect level, or NULL
 
95
                                                                                 * if top */
 
96
        long            ptrs[BLOCKS_PER_INDIR_BLOCK];   /* indexes of contained
 
97
                                                                                                 * blocks */
 
98
} IndirectBlock;
 
99
 
 
100
/*
 
101
 * This data structure represents a single "logical tape" within the set
 
102
 * of logical tapes stored in the same file.  We must keep track of the
 
103
 * current partially-read-or-written data block as well as the active
 
104
 * indirect block level(s).
 
105
 */
 
106
typedef struct LogicalTape
 
107
{
 
108
        IndirectBlock *indirect;        /* bottom of my indirect-block hierarchy */
 
109
        bool            writing;                /* T while in write phase */
 
110
        bool            frozen;                 /* T if blocks should not be freed when
 
111
                                                                 * read */
 
112
        bool            dirty;                  /* does buffer need to be written? */
 
113
 
 
114
        /*
 
115
         * The total data volume in the logical tape is numFullBlocks * BLCKSZ
 
116
         * + lastBlockBytes.  BUT: we do not update lastBlockBytes during
 
117
         * writing, only at completion of a write phase.
 
118
         */
 
119
        long            numFullBlocks;  /* number of complete blocks in log tape */
 
120
        int                     lastBlockBytes; /* valid bytes in last (incomplete) block */
 
121
 
 
122
        /*
 
123
         * Buffer for current data block.  Note we don't bother to store the
 
124
         * actual file block number of the data block (during the write phase
 
125
         * it hasn't been assigned yet, and during read we don't care
 
126
         * anymore). But we do need the relative block number so we can detect
 
127
         * end-of-tape while reading.
 
128
         */
 
129
        long            curBlockNumber; /* this block's logical blk# within tape */
 
130
        int                     pos;                    /* next read/write position in buffer */
 
131
        int                     nbytes;                 /* total # of valid bytes in buffer */
 
132
        char            buffer[BLCKSZ];
 
133
} LogicalTape;
 
134
 
 
135
/*
 
136
 * This data structure represents a set of related "logical tapes" sharing
 
137
 * space in a single underlying file.  (But that "file" may be multiple files
 
138
 * if needed to escape OS limits on file size; buffile.c handles that for us.)
 
139
 * The number of tapes is fixed at creation.
 
140
 */
 
141
struct LogicalTapeSet
 
142
{
 
143
        BufFile    *pfile;                      /* underlying file for whole tape set */
 
144
        long            nFileBlocks;    /* # of blocks used in underlying file */
 
145
 
 
146
        /*
 
147
         * We store the numbers of recycled-and-available blocks in
 
148
         * freeBlocks[]. When there are no such blocks, we extend the
 
149
         * underlying file.  Note that the block numbers in freeBlocks are
 
150
         * always in *decreasing* order, so that removing the last entry gives
 
151
         * us the lowest free block.
 
152
         */
 
153
        long       *freeBlocks;         /* resizable array */
 
154
        int                     nFreeBlocks;    /* # of currently free blocks */
 
155
        int                     freeBlocksLen;  /* current allocated length of
 
156
                                                                 * freeBlocks[] */
 
157
 
 
158
        /*
 
159
         * tapes[] is declared size 1 since C wants a fixed size, but actually
 
160
         * it is of length nTapes.
 
161
         */
 
162
        int                     nTapes;                 /* # of logical tapes in set */
 
163
        LogicalTape *tapes[1];          /* must be last in struct! */
 
164
};
 
165
 
 
166
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 
167
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 
168
static long ltsGetFreeBlock(LogicalTapeSet *lts);
 
169
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
 
170
static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
 
171
                                  long blocknum);
 
172
static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
 
173
                                           IndirectBlock *indirect,
 
174
                                           bool freezing);
 
175
static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
 
176
                                                         IndirectBlock *indirect);
 
177
static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
 
178
                                          IndirectBlock *indirect,
 
179
                                          bool frozen);
 
180
static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
 
181
                                          IndirectBlock *indirect);
 
182
static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
 
183
 
 
184
 
 
185
/*
 
186
 * Write a block-sized buffer to the specified block of the underlying file.
 
187
 *
 
188
 * NB: should not attempt to write beyond current end of file (ie, create
 
189
 * "holes" in file), since BufFile doesn't allow that.  The first write pass
 
190
 * must write blocks sequentially.
 
191
 *
 
192
 * No need for an error return convention; we ereport() on any error.
 
193
 */
 
194
static void
 
195
ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
 
196
{
 
197
        if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
 
198
                BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
 
199
                ereport(ERROR,
 
200
                /* XXX is it okay to assume errno is correct? */
 
201
                                (errcode_for_file_access(),
 
202
                                 errmsg("could not write block %ld of temporary file: %m",
 
203
                                                blocknum),
 
204
                                 errhint("Perhaps out of disk space?")));
 
205
}
 
206
 
 
207
/*
 
208
 * Read a block-sized buffer from the specified block of the underlying file.
 
209
 *
 
210
 * No need for an error return convention; we ereport() on any error.   This
 
211
 * module should never attempt to read a block it doesn't know is there.
 
212
 */
 
213
static void
 
214
ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
 
215
{
 
216
        if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
 
217
                BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
 
218
                ereport(ERROR,
 
219
                /* XXX is it okay to assume errno is correct? */
 
220
                                (errcode_for_file_access(),
 
221
                                 errmsg("could not read block %ld of temporary file: %m",
 
222
                                                blocknum)));
 
223
}
 
224
 
 
225
/*
 
226
 * Select a currently unused block for writing to.
 
227
 *
 
228
 * NB: should only be called when writer is ready to write immediately,
 
229
 * to ensure that first write pass is sequential.
 
230
 */
 
231
static long
 
232
ltsGetFreeBlock(LogicalTapeSet *lts)
 
233
{
 
234
        /*
 
235
         * If there are multiple free blocks, we select the one appearing last
 
236
         * in freeBlocks[].  If there are none, assign the next block at the
 
237
         * end of the file.
 
238
         */
 
239
        if (lts->nFreeBlocks > 0)
 
240
                return lts->freeBlocks[--lts->nFreeBlocks];
 
241
        else
 
242
                return lts->nFileBlocks++;
 
243
}
 
244
 
 
245
/*
 
246
 * Return a block# to the freelist.
 
247
 */
 
248
static void
 
249
ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 
250
{
 
251
        int                     ndx;
 
252
        long       *ptr;
 
253
 
 
254
        /*
 
255
         * Enlarge freeBlocks array if full.
 
256
         */
 
257
        if (lts->nFreeBlocks >= lts->freeBlocksLen)
 
258
        {
 
259
                lts->freeBlocksLen *= 2;
 
260
                lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
 
261
                                                                          lts->freeBlocksLen * sizeof(long));
 
262
        }
 
263
 
 
264
        /*
 
265
         * Insert blocknum into array, preserving decreasing order (so that
 
266
         * ltsGetFreeBlock returns the lowest available block number). This
 
267
         * could get fairly slow if there were many free blocks, but we don't
 
268
         * expect there to be very many at one time.
 
269
         */
 
270
        ndx = lts->nFreeBlocks++;
 
271
        ptr = lts->freeBlocks + ndx;
 
272
        while (ndx > 0 && ptr[-1] < blocknum)
 
273
        {
 
274
                ptr[0] = ptr[-1];
 
275
                ndx--, ptr--;
 
276
        }
 
277
        ptr[0] = blocknum;
 
278
}
 
279
 
 
280
/*
 
281
 * These routines manipulate indirect-block hierarchies.  All are recursive
 
282
 * so that they don't have any specific limit on the depth of hierarchy.
 
283
 */
 
284
 
 
285
/*
 
286
 * Record a data block number in a logical tape's lowest indirect block,
 
287
 * or record an indirect block's number in the next higher indirect level.
 
288
 */
 
289
static void
 
290
ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
 
291
                                  long blocknum)
 
292
{
 
293
        if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
 
294
        {
 
295
                /*
 
296
                 * This indirect block is full, so dump it out and recursively
 
297
                 * save its address in the next indirection level.      Create a new
 
298
                 * indirection level if there wasn't one before.
 
299
                 */
 
300
                long            indirblock = ltsGetFreeBlock(lts);
 
301
 
 
302
                ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
 
303
                if (indirect->nextup == NULL)
 
304
                {
 
305
                        indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
 
306
                        indirect->nextup->nextSlot = 0;
 
307
                        indirect->nextup->nextup = NULL;
 
308
                }
 
309
                ltsRecordBlockNum(lts, indirect->nextup, indirblock);
 
310
 
 
311
                /*
 
312
                 * Reset to fill another indirect block at this level.
 
313
                 */
 
314
                indirect->nextSlot = 0;
 
315
        }
 
316
        indirect->ptrs[indirect->nextSlot++] = blocknum;
 
317
}
 
318
 
 
319
/*
 
320
 * Reset a logical tape's indirect-block hierarchy after a write pass
 
321
 * to prepare for reading.      We dump out partly-filled blocks except
 
322
 * at the top of the hierarchy, and we rewind each level to the start.
 
323
 * This call returns the first data block number, or -1L if the tape
 
324
 * is empty.
 
325
 *
 
326
 * Unless 'freezing' is true, release indirect blocks to the free pool after
 
327
 * reading them.
 
328
 */
 
329
static long
 
330
ltsRewindIndirectBlock(LogicalTapeSet *lts,
 
331
                                           IndirectBlock *indirect,
 
332
                                           bool freezing)
 
333
{
 
334
        /* Insert sentinel if block is not full */
 
335
        if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
 
336
                indirect->ptrs[indirect->nextSlot] = -1L;
 
337
 
 
338
        /*
 
339
         * If block is not topmost, write it out, and recurse to obtain
 
340
         * address of first block in this hierarchy level.      Read that one in.
 
341
         */
 
342
        if (indirect->nextup != NULL)
 
343
        {
 
344
                long            indirblock = ltsGetFreeBlock(lts);
 
345
 
 
346
                ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
 
347
                ltsRecordBlockNum(lts, indirect->nextup, indirblock);
 
348
                indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
 
349
                Assert(indirblock != -1L);
 
350
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
351
                if (!freezing)
 
352
                        ltsReleaseBlock(lts, indirblock);
 
353
        }
 
354
 
 
355
        /*
 
356
         * Reset my next-block pointer, and then fetch a block number if any.
 
357
         */
 
358
        indirect->nextSlot = 0;
 
359
        if (indirect->ptrs[0] == -1L)
 
360
                return -1L;
 
361
        return indirect->ptrs[indirect->nextSlot++];
 
362
}
 
363
 
 
364
/*
 
365
 * Rewind a previously-frozen indirect-block hierarchy for another read pass.
 
366
 * This call returns the first data block number, or -1L if the tape
 
367
 * is empty.
 
368
 */
 
369
static long
 
370
ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
 
371
                                                         IndirectBlock *indirect)
 
372
{
 
373
        /*
 
374
         * If block is not topmost, recurse to obtain address of first block
 
375
         * in this hierarchy level.  Read that one in.
 
376
         */
 
377
        if (indirect->nextup != NULL)
 
378
        {
 
379
                long            indirblock;
 
380
 
 
381
                indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
 
382
                Assert(indirblock != -1L);
 
383
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
384
        }
 
385
 
 
386
        /*
 
387
         * Reset my next-block pointer, and then fetch a block number if any.
 
388
         */
 
389
        indirect->nextSlot = 0;
 
390
        if (indirect->ptrs[0] == -1L)
 
391
                return -1L;
 
392
        return indirect->ptrs[indirect->nextSlot++];
 
393
}
 
394
 
 
395
/*
 
396
 * Obtain next data block number in the forward direction, or -1L if no more.
 
397
 *
 
398
 * Unless 'frozen' is true, release indirect blocks to the free pool after
 
399
 * reading them.
 
400
 */
 
401
static long
 
402
ltsRecallNextBlockNum(LogicalTapeSet *lts,
 
403
                                          IndirectBlock *indirect,
 
404
                                          bool frozen)
 
405
{
 
406
        if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
 
407
                indirect->ptrs[indirect->nextSlot] == -1L)
 
408
        {
 
409
                long            indirblock;
 
410
 
 
411
                if (indirect->nextup == NULL)
 
412
                        return -1L;                     /* nothing left at this level */
 
413
                indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
 
414
                if (indirblock == -1L)
 
415
                        return -1L;                     /* nothing left at this level */
 
416
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
417
                if (!frozen)
 
418
                        ltsReleaseBlock(lts, indirblock);
 
419
                indirect->nextSlot = 0;
 
420
        }
 
421
        if (indirect->ptrs[indirect->nextSlot] == -1L)
 
422
                return -1L;
 
423
        return indirect->ptrs[indirect->nextSlot++];
 
424
}
 
425
 
 
426
/*
 
427
 * Obtain next data block number in the reverse direction, or -1L if no more.
 
428
 *
 
429
 * Note this fetches the block# before the one last returned, no matter which
 
430
 * direction of call returned that one.  If we fail, no change in state.
 
431
 *
 
432
 * This routine can only be used in 'frozen' state, so there's no need to
 
433
 * pass a parameter telling whether to release blocks ... we never do.
 
434
 */
 
435
static long
 
436
ltsRecallPrevBlockNum(LogicalTapeSet *lts,
 
437
                                          IndirectBlock *indirect)
 
438
{
 
439
        if (indirect->nextSlot <= 1)
 
440
        {
 
441
                long            indirblock;
 
442
 
 
443
                if (indirect->nextup == NULL)
 
444
                        return -1L;                     /* nothing left at this level */
 
445
                indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
 
446
                if (indirblock == -1L)
 
447
                        return -1L;                     /* nothing left at this level */
 
448
                ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
 
449
 
 
450
                /*
 
451
                 * The previous block would only have been written out if full, so
 
452
                 * we need not search it for a -1 sentinel.
 
453
                 */
 
454
                indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
 
455
        }
 
456
        indirect->nextSlot--;
 
457
        return indirect->ptrs[indirect->nextSlot - 1];
 
458
}
 
459
 
 
460
 
 
461
/*
 
462
 * Create a set of logical tapes in a temporary underlying file.
 
463
 *
 
464
 * Each tape is initialized in write state.
 
465
 */
 
466
LogicalTapeSet *
 
467
LogicalTapeSetCreate(int ntapes)
 
468
{
 
469
        LogicalTapeSet *lts;
 
470
        LogicalTape *lt;
 
471
        int                     i;
 
472
 
 
473
        /*
 
474
         * Create top-level struct.  First LogicalTape pointer is already
 
475
         * counted in sizeof(LogicalTapeSet).
 
476
         */
 
477
        Assert(ntapes > 0);
 
478
        lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) +
 
479
                                                                        (ntapes - 1) *sizeof(LogicalTape *));
 
480
        lts->pfile = BufFileCreateTemp(false);
 
481
        lts->nFileBlocks = 0L;
 
482
        lts->freeBlocksLen = 32;        /* reasonable initial guess */
 
483
        lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
 
484
        lts->nFreeBlocks = 0;
 
485
        lts->nTapes = ntapes;
 
486
 
 
487
        /*
 
488
         * Create per-tape structs, including first-level indirect blocks.
 
489
         */
 
490
        for (i = 0; i < ntapes; i++)
 
491
        {
 
492
                lt = (LogicalTape *) palloc(sizeof(LogicalTape));
 
493
                lts->tapes[i] = lt;
 
494
                lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
 
495
                lt->indirect->nextSlot = 0;
 
496
                lt->indirect->nextup = NULL;
 
497
                lt->writing = true;
 
498
                lt->frozen = false;
 
499
                lt->dirty = false;
 
500
                lt->numFullBlocks = 0L;
 
501
                lt->lastBlockBytes = 0;
 
502
                lt->curBlockNumber = 0L;
 
503
                lt->pos = 0;
 
504
                lt->nbytes = 0;
 
505
        }
 
506
        return lts;
 
507
}
 
508
 
 
509
/*
 
510
 * Close a logical tape set and release all resources.
 
511
 */
 
512
void
 
513
LogicalTapeSetClose(LogicalTapeSet *lts)
 
514
{
 
515
        LogicalTape *lt;
 
516
        IndirectBlock *ib,
 
517
                           *nextib;
 
518
        int                     i;
 
519
 
 
520
        BufFileClose(lts->pfile);
 
521
        for (i = 0; i < lts->nTapes; i++)
 
522
        {
 
523
                lt = lts->tapes[i];
 
524
                for (ib = lt->indirect; ib != NULL; ib = nextib)
 
525
                {
 
526
                        nextib = ib->nextup;
 
527
                        pfree(ib);
 
528
                }
 
529
                pfree(lt);
 
530
        }
 
531
        pfree(lts->freeBlocks);
 
532
        pfree(lts);
 
533
}
 
534
 
 
535
/*
 
536
 * Dump the dirty buffer of a logical tape.
 
537
 */
 
538
static void
 
539
ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 
540
{
 
541
        long            datablock = ltsGetFreeBlock(lts);
 
542
 
 
543
        Assert(lt->dirty);
 
544
        ltsWriteBlock(lts, datablock, (void *) lt->buffer);
 
545
        ltsRecordBlockNum(lts, lt->indirect, datablock);
 
546
        lt->dirty = false;
 
547
        /* Caller must do other state update as needed */
 
548
}
 
549
 
 
550
/*
 
551
 * Write to a logical tape.
 
552
 *
 
553
 * There are no error returns; we ereport() on failure.
 
554
 */
 
555
void
 
556
LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 
557
                                 void *ptr, size_t size)
 
558
{
 
559
        LogicalTape *lt;
 
560
        size_t          nthistime;
 
561
 
 
562
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
563
        lt = lts->tapes[tapenum];
 
564
        Assert(lt->writing);
 
565
 
 
566
        while (size > 0)
 
567
        {
 
568
                if (lt->pos >= BLCKSZ)
 
569
                {
 
570
                        /* Buffer full, dump it out */
 
571
                        if (lt->dirty)
 
572
                                ltsDumpBuffer(lts, lt);
 
573
                        else
 
574
                        {
 
575
                                /* Hmm, went directly from reading to writing? */
 
576
                                elog(ERROR, "invalid logtape state: should be dirty");
 
577
                        }
 
578
                        lt->numFullBlocks++;
 
579
                        lt->curBlockNumber++;
 
580
                        lt->pos = 0;
 
581
                        lt->nbytes = 0;
 
582
                }
 
583
 
 
584
                nthistime = BLCKSZ - lt->pos;
 
585
                if (nthistime > size)
 
586
                        nthistime = size;
 
587
                Assert(nthistime > 0);
 
588
 
 
589
                memcpy(lt->buffer + lt->pos, ptr, nthistime);
 
590
 
 
591
                lt->dirty = true;
 
592
                lt->pos += nthistime;
 
593
                if (lt->nbytes < lt->pos)
 
594
                        lt->nbytes = lt->pos;
 
595
                ptr = (void *) ((char *) ptr + nthistime);
 
596
                size -= nthistime;
 
597
        }
 
598
}
 
599
 
 
600
/*
 
601
 * Rewind logical tape and switch from writing to reading or vice versa.
 
602
 *
 
603
 * Unless the tape has been "frozen" in read state, forWrite must be the
 
604
 * opposite of the previous tape state.
 
605
 */
 
606
void
 
607
LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 
608
{
 
609
        LogicalTape *lt;
 
610
        long            datablocknum;
 
611
 
 
612
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
613
        lt = lts->tapes[tapenum];
 
614
 
 
615
        if (!forWrite)
 
616
        {
 
617
                if (lt->writing)
 
618
                {
 
619
                        /*
 
620
                         * Completion of a write phase.  Flush last partial data
 
621
                         * block, flush any partial indirect blocks, rewind for normal
 
622
                         * (destructive) read.
 
623
                         */
 
624
                        if (lt->dirty)
 
625
                                ltsDumpBuffer(lts, lt);
 
626
                        lt->lastBlockBytes = lt->nbytes;
 
627
                        lt->writing = false;
 
628
                        datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
 
629
                }
 
630
                else
 
631
                {
 
632
                        /*
 
633
                         * This is only OK if tape is frozen; we rewind for (another)
 
634
                         * read pass.
 
635
                         */
 
636
                        Assert(lt->frozen);
 
637
                        datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
 
638
                }
 
639
                /* Read the first block, or reset if tape is empty */
 
640
                lt->curBlockNumber = 0L;
 
641
                lt->pos = 0;
 
642
                lt->nbytes = 0;
 
643
                if (datablocknum != -1L)
 
644
                {
 
645
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
646
                        if (!lt->frozen)
 
647
                                ltsReleaseBlock(lts, datablocknum);
 
648
                        lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
649
                                BLCKSZ : lt->lastBlockBytes;
 
650
                }
 
651
        }
 
652
        else
 
653
        {
 
654
                /*
 
655
                 * Completion of a read phase.  Rewind and prepare for write.
 
656
                 *
 
657
                 * NOTE: we assume the caller has read the tape to the end; otherwise
 
658
                 * untouched data and indirect blocks will not have been freed. We
 
659
                 * could add more code to free any unread blocks, but in current
 
660
                 * usage of this module it'd be useless code.
 
661
                 */
 
662
                IndirectBlock *ib,
 
663
                                   *nextib;
 
664
 
 
665
                Assert(!lt->writing && !lt->frozen);
 
666
                /* Must truncate the indirect-block hierarchy down to one level. */
 
667
                for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
 
668
                {
 
669
                        nextib = ib->nextup;
 
670
                        pfree(ib);
 
671
                }
 
672
                lt->indirect->nextSlot = 0;
 
673
                lt->indirect->nextup = NULL;
 
674
                lt->writing = true;
 
675
                lt->dirty = false;
 
676
                lt->numFullBlocks = 0L;
 
677
                lt->lastBlockBytes = 0;
 
678
                lt->curBlockNumber = 0L;
 
679
                lt->pos = 0;
 
680
                lt->nbytes = 0;
 
681
        }
 
682
}
 
683
 
 
684
/*
 
685
 * Read from a logical tape.
 
686
 *
 
687
 * Early EOF is indicated by return value less than #bytes requested.
 
688
 */
 
689
size_t
 
690
LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 
691
                                void *ptr, size_t size)
 
692
{
 
693
        LogicalTape *lt;
 
694
        size_t          nread = 0;
 
695
        size_t          nthistime;
 
696
 
 
697
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
698
        lt = lts->tapes[tapenum];
 
699
        Assert(!lt->writing);
 
700
 
 
701
        while (size > 0)
 
702
        {
 
703
                if (lt->pos >= lt->nbytes)
 
704
                {
 
705
                        /* Try to load more data into buffer. */
 
706
                        long            datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
 
707
                                                                                                                         lt->frozen);
 
708
 
 
709
                        if (datablocknum == -1L)
 
710
                                break;                  /* EOF */
 
711
                        lt->curBlockNumber++;
 
712
                        lt->pos = 0;
 
713
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
714
                        if (!lt->frozen)
 
715
                                ltsReleaseBlock(lts, datablocknum);
 
716
                        lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
717
                                BLCKSZ : lt->lastBlockBytes;
 
718
                        if (lt->nbytes <= 0)
 
719
                                break;                  /* EOF (possible here?) */
 
720
                }
 
721
 
 
722
                nthistime = lt->nbytes - lt->pos;
 
723
                if (nthistime > size)
 
724
                        nthistime = size;
 
725
                Assert(nthistime > 0);
 
726
 
 
727
                memcpy(ptr, lt->buffer + lt->pos, nthistime);
 
728
 
 
729
                lt->pos += nthistime;
 
730
                ptr = (void *) ((char *) ptr + nthistime);
 
731
                size -= nthistime;
 
732
                nread += nthistime;
 
733
        }
 
734
 
 
735
        return nread;
 
736
}
 
737
 
 
738
/*
 
739
 * "Freeze" the contents of a tape so that it can be read multiple times
 
740
 * and/or read backwards.  Once a tape is frozen, its contents will not
 
741
 * be released until the LogicalTapeSet is destroyed.  This is expected
 
742
 * to be used only for the final output pass of a merge.
 
743
 *
 
744
 * This *must* be called just at the end of a write pass, before the
 
745
 * tape is rewound (after rewind is too late!).  It performs a rewind
 
746
 * and switch to read mode "for free".  An immediately following rewind-
 
747
 * for-read call is OK but not necessary.
 
748
 */
 
749
void
 
750
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
 
751
{
 
752
        LogicalTape *lt;
 
753
        long            datablocknum;
 
754
 
 
755
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
756
        lt = lts->tapes[tapenum];
 
757
        Assert(lt->writing);
 
758
 
 
759
        /*
 
760
         * Completion of a write phase.  Flush last partial data block, flush
 
761
         * any partial indirect blocks, rewind for nondestructive read.
 
762
         */
 
763
        if (lt->dirty)
 
764
                ltsDumpBuffer(lts, lt);
 
765
        lt->lastBlockBytes = lt->nbytes;
 
766
        lt->writing = false;
 
767
        lt->frozen = true;
 
768
        datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
 
769
        /* Read the first block, or reset if tape is empty */
 
770
        lt->curBlockNumber = 0L;
 
771
        lt->pos = 0;
 
772
        lt->nbytes = 0;
 
773
        if (datablocknum != -1L)
 
774
        {
 
775
                ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
776
                lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
777
                        BLCKSZ : lt->lastBlockBytes;
 
778
        }
 
779
}
 
780
 
 
781
/*
 
782
 * Backspace the tape a given number of bytes.  (We also support a more
 
783
 * general seek interface, see below.)
 
784
 *
 
785
 * *Only* a frozen-for-read tape can be backed up; we don't support
 
786
 * random access during write, and an unfrozen read tape may have
 
787
 * already discarded the desired data!
 
788
 *
 
789
 * Return value is TRUE if seek successful, FALSE if there isn't that much
 
790
 * data before the current point (in which case there's no state change).
 
791
 */
 
792
bool
 
793
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
 
794
{
 
795
        LogicalTape *lt;
 
796
        long            nblocks;
 
797
        int                     newpos;
 
798
 
 
799
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
800
        lt = lts->tapes[tapenum];
 
801
        Assert(lt->frozen);
 
802
 
 
803
        /*
 
804
         * Easy case for seek within current block.
 
805
         */
 
806
        if (size <= (size_t) lt->pos)
 
807
        {
 
808
                lt->pos -= (int) size;
 
809
                return true;
 
810
        }
 
811
 
 
812
        /*
 
813
         * Not-so-easy case.  Figure out whether it's possible at all.
 
814
         */
 
815
        size -= (size_t) lt->pos;       /* part within this block */
 
816
        nblocks = size / BLCKSZ;
 
817
        size = size % BLCKSZ;
 
818
        if (size)
 
819
        {
 
820
                nblocks++;
 
821
                newpos = (int) (BLCKSZ - size);
 
822
        }
 
823
        else
 
824
                newpos = 0;
 
825
        if (nblocks > lt->curBlockNumber)
 
826
                return false;                   /* a seek too far... */
 
827
 
 
828
        /*
 
829
         * OK, we need to back up nblocks blocks.  This implementation would
 
830
         * be pretty inefficient for long seeks, but we really aren't
 
831
         * expecting that (a seek over one tuple is typical).
 
832
         */
 
833
        while (nblocks-- > 0)
 
834
        {
 
835
                long            datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
 
836
 
 
837
                if (datablocknum == -1L)
 
838
                        elog(ERROR, "unexpected end of tape");
 
839
                lt->curBlockNumber--;
 
840
                if (nblocks == 0)
 
841
                {
 
842
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
843
                        lt->nbytes = BLCKSZ;
 
844
                }
 
845
        }
 
846
        lt->pos = newpos;
 
847
        return true;
 
848
}
 
849
 
 
850
/*
 
851
 * Seek to an arbitrary position in a logical tape.
 
852
 *
 
853
 * *Only* a frozen-for-read tape can be seeked.
 
854
 *
 
855
 * Return value is TRUE if seek successful, FALSE if there isn't that much
 
856
 * data in the tape (in which case there's no state change).
 
857
 */
 
858
bool
 
859
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 
860
                                long blocknum, int offset)
 
861
{
 
862
        LogicalTape *lt;
 
863
 
 
864
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
865
        lt = lts->tapes[tapenum];
 
866
        Assert(lt->frozen);
 
867
        Assert(offset >= 0 && offset <= BLCKSZ);
 
868
 
 
869
        /*
 
870
         * Easy case for seek within current block.
 
871
         */
 
872
        if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
 
873
        {
 
874
                lt->pos = offset;
 
875
                return true;
 
876
        }
 
877
 
 
878
        /*
 
879
         * Not-so-easy case.  Figure out whether it's possible at all.
 
880
         */
 
881
        if (blocknum < 0 || blocknum > lt->numFullBlocks ||
 
882
                (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
 
883
                return false;
 
884
 
 
885
        /*
 
886
         * OK, advance or back up to the target block.  This implementation
 
887
         * would be pretty inefficient for long seeks, but we really aren't
 
888
         * expecting that (a seek over one tuple is typical).
 
889
         */
 
890
        while (lt->curBlockNumber > blocknum)
 
891
        {
 
892
                long            datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
 
893
 
 
894
                if (datablocknum == -1L)
 
895
                        elog(ERROR, "unexpected end of tape");
 
896
                if (--lt->curBlockNumber == blocknum)
 
897
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
898
        }
 
899
        while (lt->curBlockNumber < blocknum)
 
900
        {
 
901
                long            datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
 
902
                                                                                                                 lt->frozen);
 
903
 
 
904
                if (datablocknum == -1L)
 
905
                        elog(ERROR, "unexpected end of tape");
 
906
                if (++lt->curBlockNumber == blocknum)
 
907
                        ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
 
908
        }
 
909
        lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 
910
                BLCKSZ : lt->lastBlockBytes;
 
911
        lt->pos = offset;
 
912
        return true;
 
913
}
 
914
 
 
915
/*
 
916
 * Obtain current position in a form suitable for a later LogicalTapeSeek.
 
917
 *
 
918
 * NOTE: it'd be OK to do this during write phase with intention of using
 
919
 * the position for a seek after freezing.      Not clear if anyone needs that.
 
920
 */
 
921
void
 
922
LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 
923
                                long *blocknum, int *offset)
 
924
{
 
925
        LogicalTape *lt;
 
926
 
 
927
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
 
928
        lt = lts->tapes[tapenum];
 
929
        *blocknum = lt->curBlockNumber;
 
930
        *offset = lt->pos;
 
931
}