~ubuntu-branches/ubuntu/oneiric/postgresql-9.1/oneiric-security

« back to all changes in this revision

Viewing changes to src/backend/access/nbtree/nbtree.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2011-05-11 10:41:53 UTC
  • Revision ID: james.westby@ubuntu.com-20110511104153-psbh2o58553fv1m0
Tags: upstream-9.1~beta1
ImportĀ upstreamĀ versionĀ 9.1~beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * nbtree.c
 
4
 *        Implementation of Lehman and Yao's btree management algorithm for
 
5
 *        Postgres.
 
6
 *
 
7
 * NOTES
 
8
 *        This file contains only the public interface routines.
 
9
 *
 
10
 *
 
11
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 
12
 * Portions Copyright (c) 1994, Regents of the University of California
 
13
 *
 
14
 * IDENTIFICATION
 
15
 *        src/backend/access/nbtree/nbtree.c
 
16
 *
 
17
 *-------------------------------------------------------------------------
 
18
 */
 
19
#include "postgres.h"
 
20
 
 
21
#include "access/genam.h"
 
22
#include "access/nbtree.h"
 
23
#include "access/relscan.h"
 
24
#include "catalog/index.h"
 
25
#include "catalog/storage.h"
 
26
#include "commands/vacuum.h"
 
27
#include "storage/bufmgr.h"
 
28
#include "storage/freespace.h"
 
29
#include "storage/indexfsm.h"
 
30
#include "storage/ipc.h"
 
31
#include "storage/lmgr.h"
 
32
#include "storage/predicate.h"
 
33
#include "storage/smgr.h"
 
34
#include "tcop/tcopprot.h"
 
35
#include "utils/memutils.h"
 
36
 
 
37
 
 
38
/* Working state for btbuild and its callback */
 
39
typedef struct
 
40
{
 
41
        bool            isUnique;
 
42
        bool            haveDead;
 
43
        Relation        heapRel;
 
44
        BTSpool    *spool;
 
45
 
 
46
        /*
 
47
         * spool2 is needed only when the index is an unique index. Dead tuples
 
48
         * are put into spool2 instead of spool in order to avoid uniqueness
 
49
         * check.
 
50
         */
 
51
        BTSpool    *spool2;
 
52
        double          indtuples;
 
53
} BTBuildState;
 
54
 
 
55
/* Working state needed by btvacuumpage */
 
56
typedef struct
 
57
{
 
58
        IndexVacuumInfo *info;
 
59
        IndexBulkDeleteResult *stats;
 
60
        IndexBulkDeleteCallback callback;
 
61
        void       *callback_state;
 
62
        BTCycleId       cycleid;
 
63
        BlockNumber lastBlockVacuumed;          /* last blkno reached by Vacuum scan */
 
64
        BlockNumber lastUsedPage;       /* blkno of last non-recyclable page */
 
65
        BlockNumber totFreePages;       /* true total # of free pages */
 
66
        MemoryContext pagedelcontext;
 
67
} BTVacState;
 
68
 
 
69
 
 
70
static void btbuildCallback(Relation index,
 
71
                                HeapTuple htup,
 
72
                                Datum *values,
 
73
                                bool *isnull,
 
74
                                bool tupleIsAlive,
 
75
                                void *state);
 
76
static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 
77
                         IndexBulkDeleteCallback callback, void *callback_state,
 
78
                         BTCycleId cycleid);
 
79
static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
 
80
                         BlockNumber orig_blkno);
 
81
 
 
82
 
 
83
/*
 
84
 *      btbuild() -- build a new btree index.
 
85
 */
 
86
Datum
 
87
btbuild(PG_FUNCTION_ARGS)
 
88
{
 
89
        Relation        heap = (Relation) PG_GETARG_POINTER(0);
 
90
        Relation        index = (Relation) PG_GETARG_POINTER(1);
 
91
        IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
 
92
        IndexBuildResult *result;
 
93
        double          reltuples;
 
94
        BTBuildState buildstate;
 
95
 
 
96
        buildstate.isUnique = indexInfo->ii_Unique;
 
97
        buildstate.haveDead = false;
 
98
        buildstate.heapRel = heap;
 
99
        buildstate.spool = NULL;
 
100
        buildstate.spool2 = NULL;
 
101
        buildstate.indtuples = 0;
 
102
 
 
103
#ifdef BTREE_BUILD_STATS
 
104
        if (log_btree_build_stats)
 
105
                ResetUsage();
 
106
#endif   /* BTREE_BUILD_STATS */
 
107
 
 
108
        /*
 
109
         * We expect to be called exactly once for any index relation. If that's
 
110
         * not the case, big trouble's what we have.
 
111
         */
 
112
        if (RelationGetNumberOfBlocks(index) != 0)
 
113
                elog(ERROR, "index \"%s\" already contains data",
 
114
                         RelationGetRelationName(index));
 
115
 
 
116
        buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique, false);
 
117
 
 
118
        /*
 
119
         * If building a unique index, put dead tuples in a second spool to keep
 
120
         * them out of the uniqueness check.
 
121
         */
 
122
        if (indexInfo->ii_Unique)
 
123
                buildstate.spool2 = _bt_spoolinit(index, false, true);
 
124
 
 
125
        /* do the heap scan */
 
126
        reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
 
127
                                                                   btbuildCallback, (void *) &buildstate);
 
128
 
 
129
        /* okay, all heap tuples are indexed */
 
130
        if (buildstate.spool2 && !buildstate.haveDead)
 
131
        {
 
132
                /* spool2 turns out to be unnecessary */
 
133
                _bt_spooldestroy(buildstate.spool2);
 
134
                buildstate.spool2 = NULL;
 
135
        }
 
136
 
 
137
        /*
 
138
         * Finish the build by (1) completing the sort of the spool file, (2)
 
139
         * inserting the sorted tuples into btree pages and (3) building the upper
 
140
         * levels.
 
141
         */
 
142
        _bt_leafbuild(buildstate.spool, buildstate.spool2);
 
143
        _bt_spooldestroy(buildstate.spool);
 
144
        if (buildstate.spool2)
 
145
                _bt_spooldestroy(buildstate.spool2);
 
146
 
 
147
#ifdef BTREE_BUILD_STATS
 
148
        if (log_btree_build_stats)
 
149
        {
 
150
                ShowUsage("BTREE BUILD STATS");
 
151
                ResetUsage();
 
152
        }
 
153
#endif   /* BTREE_BUILD_STATS */
 
154
 
 
155
        /*
 
156
         * If we are reindexing a pre-existing index, it is critical to send out a
 
157
         * relcache invalidation SI message to ensure all backends re-read the
 
158
         * index metapage.      We expect that the caller will ensure that happens
 
159
         * (typically as a side effect of updating index stats, but it must happen
 
160
         * even if the stats don't change!)
 
161
         */
 
162
 
 
163
        /*
 
164
         * Return statistics
 
165
         */
 
166
        result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
 
167
 
 
168
        result->heap_tuples = reltuples;
 
169
        result->index_tuples = buildstate.indtuples;
 
170
 
 
171
        PG_RETURN_POINTER(result);
 
172
}
 
173
 
 
174
/*
 
175
 * Per-tuple callback from IndexBuildHeapScan
 
176
 */
 
177
static void
 
178
btbuildCallback(Relation index,
 
179
                                HeapTuple htup,
 
180
                                Datum *values,
 
181
                                bool *isnull,
 
182
                                bool tupleIsAlive,
 
183
                                void *state)
 
184
{
 
185
        BTBuildState *buildstate = (BTBuildState *) state;
 
186
        IndexTuple      itup;
 
187
 
 
188
        /* form an index tuple and point it at the heap tuple */
 
189
        itup = index_form_tuple(RelationGetDescr(index), values, isnull);
 
190
        itup->t_tid = htup->t_self;
 
191
 
 
192
        /*
 
193
         * insert the index tuple into the appropriate spool file for subsequent
 
194
         * processing
 
195
         */
 
196
        if (tupleIsAlive || buildstate->spool2 == NULL)
 
197
                _bt_spool(itup, buildstate->spool);
 
198
        else
 
199
        {
 
200
                /* dead tuples are put into spool2 */
 
201
                buildstate->haveDead = true;
 
202
                _bt_spool(itup, buildstate->spool2);
 
203
        }
 
204
 
 
205
        buildstate->indtuples += 1;
 
206
 
 
207
        pfree(itup);
 
208
}
 
209
 
 
210
/*
 
211
 *      btbuildempty() -- build an empty btree index in the initialization fork
 
212
 */
 
213
Datum
 
214
btbuildempty(PG_FUNCTION_ARGS)
 
215
{
 
216
        Relation        index = (Relation) PG_GETARG_POINTER(0);
 
217
        Page            metapage;
 
218
 
 
219
        /* Construct metapage. */
 
220
        metapage = (Page) palloc(BLCKSZ);
 
221
        _bt_initmetapage(metapage, P_NONE, 0);
 
222
 
 
223
        /* Write the page.      If archiving/streaming, XLOG it. */
 
224
        smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
 
225
                          (char *) metapage, true);
 
226
        if (XLogIsNeeded())
 
227
                log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
 
228
                                        BTREE_METAPAGE, metapage);
 
229
 
 
230
        /*
 
231
         * An immediate sync is require even if we xlog'd the page, because the
 
232
         * write did not go through shared_buffers and therefore a concurrent
 
233
         * checkpoint may have move the redo pointer past our xlog record.
 
234
         */
 
235
        smgrimmedsync(index->rd_smgr, INIT_FORKNUM);
 
236
 
 
237
        PG_RETURN_VOID();
 
238
}
 
239
 
 
240
/*
 
241
 *      btinsert() -- insert an index tuple into a btree.
 
242
 *
 
243
 *              Descend the tree recursively, find the appropriate location for our
 
244
 *              new tuple, and put it there.
 
245
 */
 
246
Datum
 
247
btinsert(PG_FUNCTION_ARGS)
 
248
{
 
249
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
 
250
        Datum      *values = (Datum *) PG_GETARG_POINTER(1);
 
251
        bool       *isnull = (bool *) PG_GETARG_POINTER(2);
 
252
        ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
 
253
        Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
 
254
        IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
 
255
        bool            result;
 
256
        IndexTuple      itup;
 
257
 
 
258
        /* generate an index tuple */
 
259
        itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
 
260
        itup->t_tid = *ht_ctid;
 
261
 
 
262
        result = _bt_doinsert(rel, itup, checkUnique, heapRel);
 
263
 
 
264
        pfree(itup);
 
265
 
 
266
        PG_RETURN_BOOL(result);
 
267
}
 
268
 
 
269
/*
 
270
 *      btgettuple() -- Get the next tuple in the scan.
 
271
 */
 
272
Datum
 
273
btgettuple(PG_FUNCTION_ARGS)
 
274
{
 
275
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
 
276
        ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
 
277
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
278
        bool            res;
 
279
 
 
280
        /* btree indexes are never lossy */
 
281
        scan->xs_recheck = false;
 
282
 
 
283
        /*
 
284
         * If we've already initialized this scan, we can just advance it in the
 
285
         * appropriate direction.  If we haven't done so yet, we call a routine to
 
286
         * get the first item in the scan.
 
287
         */
 
288
        if (BTScanPosIsValid(so->currPos))
 
289
        {
 
290
                /*
 
291
                 * Check to see if we should kill the previously-fetched tuple.
 
292
                 */
 
293
                if (scan->kill_prior_tuple)
 
294
                {
 
295
                        /*
 
296
                         * Yes, remember it for later.  (We'll deal with all such tuples
 
297
                         * at once right before leaving the index page.)  The test for
 
298
                         * numKilled overrun is not just paranoia: if the caller reverses
 
299
                         * direction in the indexscan then the same item might get entered
 
300
                         * multiple times.      It's not worth trying to optimize that, so we
 
301
                         * don't detect it, but instead just forget any excess entries.
 
302
                         */
 
303
                        if (so->killedItems == NULL)
 
304
                                so->killedItems = (int *)
 
305
                                        palloc(MaxIndexTuplesPerPage * sizeof(int));
 
306
                        if (so->numKilled < MaxIndexTuplesPerPage)
 
307
                                so->killedItems[so->numKilled++] = so->currPos.itemIndex;
 
308
                }
 
309
 
 
310
                /*
 
311
                 * Now continue the scan.
 
312
                 */
 
313
                res = _bt_next(scan, dir);
 
314
        }
 
315
        else
 
316
                res = _bt_first(scan, dir);
 
317
 
 
318
        PG_RETURN_BOOL(res);
 
319
}
 
320
 
 
321
/*
 
322
 * btgetbitmap() -- gets all matching tuples, and adds them to a bitmap
 
323
 */
 
324
Datum
 
325
btgetbitmap(PG_FUNCTION_ARGS)
 
326
{
 
327
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
 
328
        TIDBitmap  *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
 
329
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
330
        int64           ntids = 0;
 
331
        ItemPointer heapTid;
 
332
 
 
333
        /* Fetch the first page & tuple. */
 
334
        if (!_bt_first(scan, ForwardScanDirection))
 
335
        {
 
336
                /* empty scan */
 
337
                PG_RETURN_INT64(0);
 
338
        }
 
339
        /* Save tuple ID, and continue scanning */
 
340
        heapTid = &scan->xs_ctup.t_self;
 
341
        tbm_add_tuples(tbm, heapTid, 1, false);
 
342
        ntids++;
 
343
 
 
344
        for (;;)
 
345
        {
 
346
                /*
 
347
                 * Advance to next tuple within page.  This is the same as the easy
 
348
                 * case in _bt_next().
 
349
                 */
 
350
                if (++so->currPos.itemIndex > so->currPos.lastItem)
 
351
                {
 
352
                        /* let _bt_next do the heavy lifting */
 
353
                        if (!_bt_next(scan, ForwardScanDirection))
 
354
                                break;
 
355
                }
 
356
 
 
357
                /* Save tuple ID, and continue scanning */
 
358
                heapTid = &so->currPos.items[so->currPos.itemIndex].heapTid;
 
359
                tbm_add_tuples(tbm, heapTid, 1, false);
 
360
                ntids++;
 
361
        }
 
362
 
 
363
        PG_RETURN_INT64(ntids);
 
364
}
 
365
 
 
366
/*
 
367
 *      btbeginscan() -- start a scan on a btree index
 
368
 */
 
369
Datum
 
370
btbeginscan(PG_FUNCTION_ARGS)
 
371
{
 
372
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
 
373
        int                     nkeys = PG_GETARG_INT32(1);
 
374
        int                     norderbys = PG_GETARG_INT32(2);
 
375
        IndexScanDesc scan;
 
376
        BTScanOpaque so;
 
377
 
 
378
        /* no order by operators allowed */
 
379
        Assert(norderbys == 0);
 
380
 
 
381
        /* get the scan */
 
382
        scan = RelationGetIndexScan(rel, nkeys, norderbys);
 
383
 
 
384
        /* allocate private workspace */
 
385
        so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
 
386
        so->currPos.buf = so->markPos.buf = InvalidBuffer;
 
387
        if (scan->numberOfKeys > 0)
 
388
                so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
 
389
        else
 
390
                so->keyData = NULL;
 
391
        so->killedItems = NULL;         /* until needed */
 
392
        so->numKilled = 0;
 
393
        scan->opaque = so;
 
394
 
 
395
        PG_RETURN_POINTER(scan);
 
396
}
 
397
 
 
398
/*
 
399
 *      btrescan() -- rescan an index relation
 
400
 */
 
401
Datum
 
402
btrescan(PG_FUNCTION_ARGS)
 
403
{
 
404
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
 
405
        ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(1);
 
406
 
 
407
        /* remaining arguments are ignored */
 
408
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
409
 
 
410
        /* we aren't holding any read locks, but gotta drop the pins */
 
411
        if (BTScanPosIsValid(so->currPos))
 
412
        {
 
413
                /* Before leaving current page, deal with any killed items */
 
414
                if (so->numKilled > 0)
 
415
                        _bt_killitems(scan, false);
 
416
                ReleaseBuffer(so->currPos.buf);
 
417
                so->currPos.buf = InvalidBuffer;
 
418
        }
 
419
 
 
420
        if (BTScanPosIsValid(so->markPos))
 
421
        {
 
422
                ReleaseBuffer(so->markPos.buf);
 
423
                so->markPos.buf = InvalidBuffer;
 
424
        }
 
425
        so->markItemIndex = -1;
 
426
 
 
427
        /*
 
428
         * Reset the scan keys. Note that keys ordering stuff moved to _bt_first.
 
429
         * - vadim 05/05/97
 
430
         */
 
431
        if (scankey && scan->numberOfKeys > 0)
 
432
                memmove(scan->keyData,
 
433
                                scankey,
 
434
                                scan->numberOfKeys * sizeof(ScanKeyData));
 
435
        so->numberOfKeys = 0;           /* until _bt_preprocess_keys sets it */
 
436
 
 
437
        PG_RETURN_VOID();
 
438
}
 
439
 
 
440
/*
 
441
 *      btendscan() -- close down a scan
 
442
 */
 
443
Datum
 
444
btendscan(PG_FUNCTION_ARGS)
 
445
{
 
446
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
 
447
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
448
 
 
449
        /* we aren't holding any read locks, but gotta drop the pins */
 
450
        if (BTScanPosIsValid(so->currPos))
 
451
        {
 
452
                /* Before leaving current page, deal with any killed items */
 
453
                if (so->numKilled > 0)
 
454
                        _bt_killitems(scan, false);
 
455
                ReleaseBuffer(so->currPos.buf);
 
456
                so->currPos.buf = InvalidBuffer;
 
457
        }
 
458
 
 
459
        if (BTScanPosIsValid(so->markPos))
 
460
        {
 
461
                ReleaseBuffer(so->markPos.buf);
 
462
                so->markPos.buf = InvalidBuffer;
 
463
        }
 
464
        so->markItemIndex = -1;
 
465
 
 
466
        if (so->killedItems != NULL)
 
467
                pfree(so->killedItems);
 
468
        if (so->keyData != NULL)
 
469
                pfree(so->keyData);
 
470
        pfree(so);
 
471
 
 
472
        PG_RETURN_VOID();
 
473
}
 
474
 
 
475
/*
 
476
 *      btmarkpos() -- save current scan position
 
477
 */
 
478
Datum
 
479
btmarkpos(PG_FUNCTION_ARGS)
 
480
{
 
481
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
 
482
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
483
 
 
484
        /* we aren't holding any read locks, but gotta drop the pin */
 
485
        if (BTScanPosIsValid(so->markPos))
 
486
        {
 
487
                ReleaseBuffer(so->markPos.buf);
 
488
                so->markPos.buf = InvalidBuffer;
 
489
        }
 
490
 
 
491
        /*
 
492
         * Just record the current itemIndex.  If we later step to next page
 
493
         * before releasing the marked position, _bt_steppage makes a full copy of
 
494
         * the currPos struct in markPos.  If (as often happens) the mark is moved
 
495
         * before we leave the page, we don't have to do that work.
 
496
         */
 
497
        if (BTScanPosIsValid(so->currPos))
 
498
                so->markItemIndex = so->currPos.itemIndex;
 
499
        else
 
500
                so->markItemIndex = -1;
 
501
 
 
502
        PG_RETURN_VOID();
 
503
}
 
504
 
 
505
/*
 
506
 *      btrestrpos() -- restore scan to last saved position
 
507
 */
 
508
Datum
 
509
btrestrpos(PG_FUNCTION_ARGS)
 
510
{
 
511
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
 
512
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
513
 
 
514
        if (so->markItemIndex >= 0)
 
515
        {
 
516
                /*
 
517
                 * The mark position is on the same page we are currently on. Just
 
518
                 * restore the itemIndex.
 
519
                 */
 
520
                so->currPos.itemIndex = so->markItemIndex;
 
521
        }
 
522
        else
 
523
        {
 
524
                /* we aren't holding any read locks, but gotta drop the pin */
 
525
                if (BTScanPosIsValid(so->currPos))
 
526
                {
 
527
                        /* Before leaving current page, deal with any killed items */
 
528
                        if (so->numKilled > 0 &&
 
529
                                so->currPos.buf != so->markPos.buf)
 
530
                                _bt_killitems(scan, false);
 
531
                        ReleaseBuffer(so->currPos.buf);
 
532
                        so->currPos.buf = InvalidBuffer;
 
533
                }
 
534
 
 
535
                if (BTScanPosIsValid(so->markPos))
 
536
                {
 
537
                        /* bump pin on mark buffer for assignment to current buffer */
 
538
                        IncrBufferRefCount(so->markPos.buf);
 
539
                        memcpy(&so->currPos, &so->markPos,
 
540
                                   offsetof(BTScanPosData, items[1]) +
 
541
                                   so->markPos.lastItem * sizeof(BTScanPosItem));
 
542
                }
 
543
        }
 
544
 
 
545
        PG_RETURN_VOID();
 
546
}
 
547
 
 
548
/*
 
549
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 
550
 * The set of target tuples is specified via a callback routine that tells
 
551
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 
552
 *
 
553
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 
554
 */
 
555
Datum
 
556
btbulkdelete(PG_FUNCTION_ARGS)
 
557
{
 
558
        IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
 
559
        IndexBulkDeleteResult *volatile stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
 
560
        IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
 
561
        void       *callback_state = (void *) PG_GETARG_POINTER(3);
 
562
        Relation        rel = info->index;
 
563
        BTCycleId       cycleid;
 
564
 
 
565
        /* allocate stats if first time through, else re-use existing struct */
 
566
        if (stats == NULL)
 
567
                stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
 
568
 
 
569
        /* Establish the vacuum cycle ID to use for this scan */
 
570
        /* The ENSURE stuff ensures we clean up shared memory on failure */
 
571
        PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
 
572
        {
 
573
                cycleid = _bt_start_vacuum(rel);
 
574
 
 
575
                btvacuumscan(info, stats, callback, callback_state, cycleid);
 
576
        }
 
577
        PG_END_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
 
578
        _bt_end_vacuum(rel);
 
579
 
 
580
        PG_RETURN_POINTER(stats);
 
581
}
 
582
 
 
583
/*
 
584
 * Post-VACUUM cleanup.
 
585
 *
 
586
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 
587
 */
 
588
Datum
 
589
btvacuumcleanup(PG_FUNCTION_ARGS)
 
590
{
 
591
        IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
 
592
        IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
 
593
 
 
594
        /* No-op in ANALYZE ONLY mode */
 
595
        if (info->analyze_only)
 
596
                PG_RETURN_POINTER(stats);
 
597
 
 
598
        /*
 
599
         * If btbulkdelete was called, we need not do anything, just return the
 
600
         * stats from the latest btbulkdelete call.  If it wasn't called, we must
 
601
         * still do a pass over the index, to recycle any newly-recyclable pages
 
602
         * and to obtain index statistics.
 
603
         *
 
604
         * Since we aren't going to actually delete any leaf items, there's no
 
605
         * need to go through all the vacuum-cycle-ID pushups.
 
606
         */
 
607
        if (stats == NULL)
 
608
        {
 
609
                stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
 
610
                btvacuumscan(info, stats, NULL, NULL, 0);
 
611
        }
 
612
 
 
613
        /* Finally, vacuum the FSM */
 
614
        IndexFreeSpaceMapVacuum(info->index);
 
615
 
 
616
        /*
 
617
         * It's quite possible for us to be fooled by concurrent page splits into
 
618
         * double-counting some index tuples, so disbelieve any total that exceeds
 
619
         * the underlying heap's count ... if we know that accurately.  Otherwise
 
620
         * this might just make matters worse.
 
621
         */
 
622
        if (!info->estimated_count)
 
623
        {
 
624
                if (stats->num_index_tuples > info->num_heap_tuples)
 
625
                        stats->num_index_tuples = info->num_heap_tuples;
 
626
        }
 
627
 
 
628
        PG_RETURN_POINTER(stats);
 
629
}
 
630
 
 
631
/*
 
632
 * btvacuumscan --- scan the index for VACUUMing purposes
 
633
 *
 
634
 * This combines the functions of looking for leaf tuples that are deletable
 
635
 * according to the vacuum callback, looking for empty pages that can be
 
636
 * deleted, and looking for old deleted pages that can be recycled.  Both
 
637
 * btbulkdelete and btvacuumcleanup invoke this (the latter only if no
 
638
 * btbulkdelete call occurred).
 
639
 *
 
640
 * The caller is responsible for initially allocating/zeroing a stats struct
 
641
 * and for obtaining a vacuum cycle ID if necessary.
 
642
 */
 
643
static void
 
644
btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 
645
                         IndexBulkDeleteCallback callback, void *callback_state,
 
646
                         BTCycleId cycleid)
 
647
{
 
648
        Relation        rel = info->index;
 
649
        BTVacState      vstate;
 
650
        BlockNumber num_pages;
 
651
        BlockNumber blkno;
 
652
        bool            needLock;
 
653
 
 
654
        /*
 
655
         * Reset counts that will be incremented during the scan; needed in case
 
656
         * of multiple scans during a single VACUUM command
 
657
         */
 
658
        stats->estimated_count = false;
 
659
        stats->num_index_tuples = 0;
 
660
        stats->pages_deleted = 0;
 
661
 
 
662
        /* Set up info to pass down to btvacuumpage */
 
663
        vstate.info = info;
 
664
        vstate.stats = stats;
 
665
        vstate.callback = callback;
 
666
        vstate.callback_state = callback_state;
 
667
        vstate.cycleid = cycleid;
 
668
        vstate.lastBlockVacuumed = BTREE_METAPAGE;      /* Initialise at first block */
 
669
        vstate.lastUsedPage = BTREE_METAPAGE;
 
670
        vstate.totFreePages = 0;
 
671
 
 
672
        /* Create a temporary memory context to run _bt_pagedel in */
 
673
        vstate.pagedelcontext = AllocSetContextCreate(CurrentMemoryContext,
 
674
                                                                                                  "_bt_pagedel",
 
675
                                                                                                  ALLOCSET_DEFAULT_MINSIZE,
 
676
                                                                                                  ALLOCSET_DEFAULT_INITSIZE,
 
677
                                                                                                  ALLOCSET_DEFAULT_MAXSIZE);
 
678
 
 
679
        /*
 
680
         * The outer loop iterates over all index pages except the metapage, in
 
681
         * physical order (we hope the kernel will cooperate in providing
 
682
         * read-ahead for speed).  It is critical that we visit all leaf pages,
 
683
         * including ones added after we start the scan, else we might fail to
 
684
         * delete some deletable tuples.  Hence, we must repeatedly check the
 
685
         * relation length.  We must acquire the relation-extension lock while
 
686
         * doing so to avoid a race condition: if someone else is extending the
 
687
         * relation, there is a window where bufmgr/smgr have created a new
 
688
         * all-zero page but it hasn't yet been write-locked by _bt_getbuf(). If
 
689
         * we manage to scan such a page here, we'll improperly assume it can be
 
690
         * recycled.  Taking the lock synchronizes things enough to prevent a
 
691
         * problem: either num_pages won't include the new page, or _bt_getbuf
 
692
         * already has write lock on the buffer and it will be fully initialized
 
693
         * before we can examine it.  (See also vacuumlazy.c, which has the same
 
694
         * issue.)      Also, we need not worry if a page is added immediately after
 
695
         * we look; the page splitting code already has write-lock on the left
 
696
         * page before it adds a right page, so we must already have processed any
 
697
         * tuples due to be moved into such a page.
 
698
         *
 
699
         * We can skip locking for new or temp relations, however, since no one
 
700
         * else could be accessing them.
 
701
         */
 
702
        needLock = !RELATION_IS_LOCAL(rel);
 
703
 
 
704
        blkno = BTREE_METAPAGE + 1;
 
705
        for (;;)
 
706
        {
 
707
                /* Get the current relation length */
 
708
                if (needLock)
 
709
                        LockRelationForExtension(rel, ExclusiveLock);
 
710
                num_pages = RelationGetNumberOfBlocks(rel);
 
711
                if (needLock)
 
712
                        UnlockRelationForExtension(rel, ExclusiveLock);
 
713
 
 
714
                /* Quit if we've scanned the whole relation */
 
715
                if (blkno >= num_pages)
 
716
                        break;
 
717
                /* Iterate over pages, then loop back to recheck length */
 
718
                for (; blkno < num_pages; blkno++)
 
719
                {
 
720
                        btvacuumpage(&vstate, blkno, blkno);
 
721
                }
 
722
        }
 
723
 
 
724
        /*
 
725
         * InHotStandby we need to scan right up to the end of the index for
 
726
         * correct locking, so we may need to write a WAL record for the final
 
727
         * block in the index if it was not vacuumed. It's possible that VACUUMing
 
728
         * has actually removed zeroed pages at the end of the index so we need to
 
729
         * take care to issue the record for last actual block and not for the
 
730
         * last block that was scanned. Ignore empty indexes.
 
731
         */
 
732
        if (XLogStandbyInfoActive() &&
 
733
                num_pages > 1 && vstate.lastBlockVacuumed < (num_pages - 1))
 
734
        {
 
735
                Buffer          buf;
 
736
 
 
737
                /*
 
738
                 * We can't use _bt_getbuf() here because it always applies
 
739
                 * _bt_checkpage(), which will barf on an all-zero page. We want to
 
740
                 * recycle all-zero pages, not fail.  Also, we want to use a
 
741
                 * nondefault buffer access strategy.
 
742
                 */
 
743
                buf = ReadBufferExtended(rel, MAIN_FORKNUM, num_pages - 1, RBM_NORMAL,
 
744
                                                                 info->strategy);
 
745
                LockBufferForCleanup(buf);
 
746
                _bt_delitems_vacuum(rel, buf, NULL, 0, vstate.lastBlockVacuumed);
 
747
                _bt_relbuf(rel, buf);
 
748
        }
 
749
 
 
750
        MemoryContextDelete(vstate.pagedelcontext);
 
751
 
 
752
        /* update statistics */
 
753
        stats->num_pages = num_pages;
 
754
        stats->pages_free = vstate.totFreePages;
 
755
}
 
756
 
 
757
/*
 
758
 * btvacuumpage --- VACUUM one page
 
759
 *
 
760
 * This processes a single page for btvacuumscan().  In some cases we
 
761
 * must go back and re-examine previously-scanned pages; this routine
 
762
 * recurses when necessary to handle that case.
 
763
 *
 
764
 * blkno is the page to process.  orig_blkno is the highest block number
 
765
 * reached by the outer btvacuumscan loop (the same as blkno, unless we
 
766
 * are recursing to re-examine a previous page).
 
767
 */
 
768
static void
 
769
btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
 
770
{
 
771
        IndexVacuumInfo *info = vstate->info;
 
772
        IndexBulkDeleteResult *stats = vstate->stats;
 
773
        IndexBulkDeleteCallback callback = vstate->callback;
 
774
        void       *callback_state = vstate->callback_state;
 
775
        Relation        rel = info->index;
 
776
        bool            delete_now;
 
777
        BlockNumber recurse_to;
 
778
        Buffer          buf;
 
779
        Page            page;
 
780
        BTPageOpaque opaque;
 
781
 
 
782
restart:
 
783
        delete_now = false;
 
784
        recurse_to = P_NONE;
 
785
 
 
786
        /* call vacuum_delay_point while not holding any buffer lock */
 
787
        vacuum_delay_point();
 
788
 
 
789
        /*
 
790
         * We can't use _bt_getbuf() here because it always applies
 
791
         * _bt_checkpage(), which will barf on an all-zero page. We want to
 
792
         * recycle all-zero pages, not fail.  Also, we want to use a nondefault
 
793
         * buffer access strategy.
 
794
         */
 
795
        buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
 
796
                                                         info->strategy);
 
797
        LockBuffer(buf, BT_READ);
 
798
        page = BufferGetPage(buf);
 
799
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
800
        if (!PageIsNew(page))
 
801
                _bt_checkpage(rel, buf);
 
802
 
 
803
        /*
 
804
         * If we are recursing, the only case we want to do anything with is a
 
805
         * live leaf page having the current vacuum cycle ID.  Any other state
 
806
         * implies we already saw the page (eg, deleted it as being empty).
 
807
         */
 
808
        if (blkno != orig_blkno)
 
809
        {
 
810
                if (_bt_page_recyclable(page) ||
 
811
                        P_IGNORE(opaque) ||
 
812
                        !P_ISLEAF(opaque) ||
 
813
                        opaque->btpo_cycleid != vstate->cycleid)
 
814
                {
 
815
                        _bt_relbuf(rel, buf);
 
816
                        return;
 
817
                }
 
818
        }
 
819
 
 
820
        /* If the page is in use, update lastUsedPage */
 
821
        if (!_bt_page_recyclable(page) && vstate->lastUsedPage < blkno)
 
822
                vstate->lastUsedPage = blkno;
 
823
 
 
824
        /* Page is valid, see what to do with it */
 
825
        if (_bt_page_recyclable(page))
 
826
        {
 
827
                /* Okay to recycle this page */
 
828
                RecordFreeIndexPage(rel, blkno);
 
829
                vstate->totFreePages++;
 
830
                stats->pages_deleted++;
 
831
        }
 
832
        else if (P_ISDELETED(opaque))
 
833
        {
 
834
                /* Already deleted, but can't recycle yet */
 
835
                stats->pages_deleted++;
 
836
        }
 
837
        else if (P_ISHALFDEAD(opaque))
 
838
        {
 
839
                /* Half-dead, try to delete */
 
840
                delete_now = true;
 
841
        }
 
842
        else if (P_ISLEAF(opaque))
 
843
        {
 
844
                OffsetNumber deletable[MaxOffsetNumber];
 
845
                int                     ndeletable;
 
846
                OffsetNumber offnum,
 
847
                                        minoff,
 
848
                                        maxoff;
 
849
 
 
850
                /*
 
851
                 * Trade in the initial read lock for a super-exclusive write lock on
 
852
                 * this page.  We must get such a lock on every leaf page over the
 
853
                 * course of the vacuum scan, whether or not it actually contains any
 
854
                 * deletable tuples --- see nbtree/README.
 
855
                 */
 
856
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
857
                LockBufferForCleanup(buf);
 
858
 
 
859
                /*
 
860
                 * Check whether we need to recurse back to earlier pages.      What we
 
861
                 * are concerned about is a page split that happened since we started
 
862
                 * the vacuum scan.  If the split moved some tuples to a lower page
 
863
                 * then we might have missed 'em.  If so, set up for tail recursion.
 
864
                 * (Must do this before possibly clearing btpo_cycleid below!)
 
865
                 */
 
866
                if (vstate->cycleid != 0 &&
 
867
                        opaque->btpo_cycleid == vstate->cycleid &&
 
868
                        !(opaque->btpo_flags & BTP_SPLIT_END) &&
 
869
                        !P_RIGHTMOST(opaque) &&
 
870
                        opaque->btpo_next < orig_blkno)
 
871
                        recurse_to = opaque->btpo_next;
 
872
 
 
873
                /*
 
874
                 * Scan over all items to see which ones need deleted according to the
 
875
                 * callback function.
 
876
                 */
 
877
                ndeletable = 0;
 
878
                minoff = P_FIRSTDATAKEY(opaque);
 
879
                maxoff = PageGetMaxOffsetNumber(page);
 
880
                if (callback)
 
881
                {
 
882
                        for (offnum = minoff;
 
883
                                 offnum <= maxoff;
 
884
                                 offnum = OffsetNumberNext(offnum))
 
885
                        {
 
886
                                IndexTuple      itup;
 
887
                                ItemPointer htup;
 
888
 
 
889
                                itup = (IndexTuple) PageGetItem(page,
 
890
                                                                                                PageGetItemId(page, offnum));
 
891
                                htup = &(itup->t_tid);
 
892
 
 
893
                                /*
 
894
                                 * During Hot Standby we currently assume that
 
895
                                 * XLOG_BTREE_VACUUM records do not produce conflicts. That is
 
896
                                 * only true as long as the callback function depends only
 
897
                                 * upon whether the index tuple refers to heap tuples removed
 
898
                                 * in the initial heap scan. When vacuum starts it derives a
 
899
                                 * value of OldestXmin. Backends taking later snapshots could
 
900
                                 * have a RecentGlobalXmin with a later xid than the vacuum's
 
901
                                 * OldestXmin, so it is possible that row versions deleted
 
902
                                 * after OldestXmin could be marked as killed by other
 
903
                                 * backends. The callback function *could* look at the index
 
904
                                 * tuple state in isolation and decide to delete the index
 
905
                                 * tuple, though currently it does not. If it ever did, we
 
906
                                 * would need to reconsider whether XLOG_BTREE_VACUUM records
 
907
                                 * should cause conflicts. If they did cause conflicts they
 
908
                                 * would be fairly harsh conflicts, since we haven't yet
 
909
                                 * worked out a way to pass a useful value for
 
910
                                 * latestRemovedXid on the XLOG_BTREE_VACUUM records. This
 
911
                                 * applies to *any* type of index that marks index tuples as
 
912
                                 * killed.
 
913
                                 */
 
914
                                if (callback(htup, callback_state))
 
915
                                        deletable[ndeletable++] = offnum;
 
916
                        }
 
917
                }
 
918
 
 
919
                /*
 
920
                 * Apply any needed deletes.  We issue just one _bt_delitems() call
 
921
                 * per page, so as to minimize WAL traffic.
 
922
                 */
 
923
                if (ndeletable > 0)
 
924
                {
 
925
                        BlockNumber lastBlockVacuumed = BufferGetBlockNumber(buf);
 
926
 
 
927
                        _bt_delitems_vacuum(rel, buf, deletable, ndeletable, vstate->lastBlockVacuumed);
 
928
 
 
929
                        /*
 
930
                         * Keep track of the block number of the lastBlockVacuumed, so we
 
931
                         * can scan those blocks as well during WAL replay. This then
 
932
                         * provides concurrency protection and allows btrees to be used
 
933
                         * while in recovery.
 
934
                         */
 
935
                        if (lastBlockVacuumed > vstate->lastBlockVacuumed)
 
936
                                vstate->lastBlockVacuumed = lastBlockVacuumed;
 
937
 
 
938
                        stats->tuples_removed += ndeletable;
 
939
                        /* must recompute maxoff */
 
940
                        maxoff = PageGetMaxOffsetNumber(page);
 
941
                }
 
942
                else
 
943
                {
 
944
                        /*
 
945
                         * If the page has been split during this vacuum cycle, it seems
 
946
                         * worth expending a write to clear btpo_cycleid even if we don't
 
947
                         * have any deletions to do.  (If we do, _bt_delitems takes care
 
948
                         * of this.)  This ensures we won't process the page again.
 
949
                         *
 
950
                         * We treat this like a hint-bit update because there's no need to
 
951
                         * WAL-log it.
 
952
                         */
 
953
                        if (vstate->cycleid != 0 &&
 
954
                                opaque->btpo_cycleid == vstate->cycleid)
 
955
                        {
 
956
                                opaque->btpo_cycleid = 0;
 
957
                                SetBufferCommitInfoNeedsSave(buf);
 
958
                        }
 
959
                }
 
960
 
 
961
                /*
 
962
                 * If it's now empty, try to delete; else count the live tuples. We
 
963
                 * don't delete when recursing, though, to avoid putting entries into
 
964
                 * freePages out-of-order (doesn't seem worth any extra code to handle
 
965
                 * the case).
 
966
                 */
 
967
                if (minoff > maxoff)
 
968
                        delete_now = (blkno == orig_blkno);
 
969
                else
 
970
                        stats->num_index_tuples += maxoff - minoff + 1;
 
971
        }
 
972
 
 
973
        if (delete_now)
 
974
        {
 
975
                MemoryContext oldcontext;
 
976
                int                     ndel;
 
977
 
 
978
                /* Run pagedel in a temp context to avoid memory leakage */
 
979
                MemoryContextReset(vstate->pagedelcontext);
 
980
                oldcontext = MemoryContextSwitchTo(vstate->pagedelcontext);
 
981
 
 
982
                ndel = _bt_pagedel(rel, buf, NULL);
 
983
 
 
984
                /* count only this page, else may double-count parent */
 
985
                if (ndel)
 
986
                        stats->pages_deleted++;
 
987
 
 
988
                MemoryContextSwitchTo(oldcontext);
 
989
                /* pagedel released buffer, so we shouldn't */
 
990
        }
 
991
        else
 
992
                _bt_relbuf(rel, buf);
 
993
 
 
994
        /*
 
995
         * This is really tail recursion, but if the compiler is too stupid to
 
996
         * optimize it as such, we'd eat an uncomfortably large amount of stack
 
997
         * space per recursion level (due to the deletable[] array). A failure is
 
998
         * improbable since the number of levels isn't likely to be large ... but
 
999
         * just in case, let's hand-optimize into a loop.
 
1000
         */
 
1001
        if (recurse_to != P_NONE)
 
1002
        {
 
1003
                blkno = recurse_to;
 
1004
                goto restart;
 
1005
        }
 
1006
}