~vcs-imports/mammoth-replicator/trunk

Viewing changes to src/backend/commands/vacuum.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *        The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.299 2004-12-31 21:59:42 pgsql Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <sys/time.h>
#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/subtrans.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"

typedef struct VacPageData
{
        BlockNumber blkno;                      /* BlockNumber of this Page */
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be free */
        OffsetNumber offsets[1];        /* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
        BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
        int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
        VacPage    *pagedesc;           /* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

typedef struct VTupleLinkData
{
        ItemPointerData new_tid;
        ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

typedef struct VTupleMoveData
{
        ItemPointerData tid;            /* tuple ID */
        VacPage         vacpage;                /* where to move */
        bool            cleanVpd;               /* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

typedef struct VRelStats
{
        BlockNumber rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
        int                     num_vtlinks;
        VTupleLink      vtlinks;
} VRelStats;

/*----------------------------------------------------------------------
 * ExecContext:
 *
 * As these variables always appear together, we put them into one struct
 * and pull initialization and cleanup into separate routines.
 * ExecContext is used by repair_frag() and move_xxx_tuple().  More
 * accurately:  It is *used* only in move_xxx_tuple(), but because this
 * routine is called many times, we initialize the struct just once in
 * repair_frag() and pass it on to move_xxx_tuple().
 */
typedef struct ExecContextData
{
        ResultRelInfo *resultRelInfo;
        EState     *estate;
        TupleTable      tupleTable;
        TupleTableSlot *slot;
} ExecContextData;
typedef ExecContextData *ExecContext;

static void
ExecContext_Init(ExecContext ec, Relation rel)
{
        TupleDesc       tupdesc = RelationGetDescr(rel);

        /*
         * We need a ResultRelInfo and an EState so we can use the regular
         * executor's index-entry-making machinery.
         */
        ec->estate = CreateExecutorState();

        ec->resultRelInfo = makeNode(ResultRelInfo);
        ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
        ec->resultRelInfo->ri_RelationDesc = rel;
        ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */

        ExecOpenIndices(ec->resultRelInfo);

        ec->estate->es_result_relations = ec->resultRelInfo;
        ec->estate->es_num_result_relations = 1;
        ec->estate->es_result_relation_info = ec->resultRelInfo;

        /* Set up a dummy tuple table too */
        ec->tupleTable = ExecCreateTupleTable(1);
        ec->slot = ExecAllocTableSlot(ec->tupleTable);
        ExecSetSlotDescriptor(ec->slot, tupdesc, false);
}

static void
ExecContext_Finish(ExecContext ec)
{
        ExecDropTupleTable(ec->tupleTable, true);
        ExecCloseIndices(ec->resultRelInfo);
        FreeExecutorState(ec->estate);
}

/*
 * End of ExecContext Implementation
 *----------------------------------------------------------------------
 */

static MemoryContext vac_context = NULL;

static int      elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;


/* non-export function prototypes */
static List *get_rel_oids(const RangeVar *vacrel, const char *stmttype);
static void vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
                                  TransactionId frozenXID);
static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
static void move_chain_tuple(Relation rel,
                                 Buffer old_buf, Page old_page, HeapTuple old_tup,
                                 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
                                 ExecContext ec, ItemPointer ctid, bool cleanVpd);
static void move_plain_tuple(Relation rel,
                                 Buffer old_buf, Page old_page, HeapTuple old_tup,
                                 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
                                 ExecContext ec);
static void update_hint_bits(Relation rel, VacPageList fraged_pages,
                                 int num_fraged_pages, BlockNumber last_move_dest_block,
                                 int num_moved);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
                         double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                           BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
                        size_t nelem, size_t size,
                        int (*compar) (const void *, const void *));
static int      vac_cmp_blk(const void *left, const void *right);
static int      vac_cmp_offno(const void *left, const void *right);
static int      vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);


/****************************************************************************
 *                                                                          *
 *              Code common to all flavors of VACUUM and ANALYZE            *
 *                                                                          *
 ****************************************************************************
 */

/*
 * Primary entry point for VACUUM and ANALYZE commands.
 */
void
vacuum(VacuumStmt *vacstmt)
{
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
        TransactionId initialOldestXmin = InvalidTransactionId;
        TransactionId initialFreezeLimit = InvalidTransactionId;
        volatile MemoryContext anl_context = NULL;
        volatile bool all_rels,
                                in_outer_xact,
                                use_own_xacts;
        List       *relations;

        if (vacstmt->verbose)
                elevel = INFO;
        else
                elevel = DEBUG2;

        /*
         * We cannot run VACUUM inside a user transaction block; if we were
         * inside a transaction, then our commit- and
         * start-transaction-command calls would not have the intended effect!
         * Furthermore, the forced commit that occurs before truncating the
         * relation's file would have the effect of committing the rest of the
         * user's transaction too, which would certainly not be the desired
         * behavior.  (This only applies to VACUUM FULL, though.  We could in
         * theory run lazy VACUUM inside a transaction block, but we choose to
         * disallow that case because we'd rather commit as soon as possible
         * after finishing the vacuum.  This is mainly so that we can let go
         * the AccessExclusiveLock that we may be holding.)
         *
         * ANALYZE (without VACUUM) can run either way.
         */
        if (vacstmt->vacuum)
        {
                PreventTransactionChain((void *) vacstmt, stmttype);
                in_outer_xact = false;
        }
        else
                in_outer_xact = IsInTransactionChain((void *) vacstmt);

        /*
         * Disallow the combination VACUUM FULL FREEZE; although it would mostly
         * work, VACUUM FULL's ability to move tuples around means that it is
         * injecting its own XID into tuple visibility checks.  We'd have to
         * guarantee that every moved tuple is properly marked XMIN_COMMITTED or
         * XMIN_INVALID before the end of the operation.  There are corner cases
         * where this does not happen, and getting rid of them all seems hard
         * (not to mention fragile to maintain).  On the whole it's not worth it
         * compared to telling people to use two operations.  See pgsql-hackers
         * discussion of 27-Nov-2004, and comments below for update_hint_bits().
         *
         * Note: this is enforced here, and not in the grammar, since (a) we can
         * give a better error message, and (b) we might want to allow it again
         * someday.
         */
        if (vacstmt->vacuum && vacstmt->full && vacstmt->freeze)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("VACUUM FULL FREEZE is not supported"),
                                 errhint("Use VACUUM FULL, then VACUUM FREEZE.")));

        /*
         * Send info about dead objects to the statistics collector
         */
        if (vacstmt->vacuum)
                pgstat_vacuum_tabstat();

        /*
         * Create special memory context for cross-transaction storage.
         *
         * Since it is a child of PortalContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
        vac_context = AllocSetContextCreate(PortalContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /* Assume we are processing everything unless one table is mentioned */
        all_rels = (vacstmt->relation == NULL);

        /* Build list of relations to process (note this lives in vac_context) */
        relations = get_rel_oids(vacstmt->relation, stmttype);

        if (vacstmt->vacuum && all_rels)
        {
                /*
                 * It's a database-wide VACUUM.
                 *
                 * Compute the initially applicable OldestXmin and FreezeLimit XIDs,
                 * so that we can record these values at the end of the VACUUM.
                 * Note that individual tables may well be processed with newer
                 * values, but we can guarantee that no (non-shared) relations are
                 * processed with older ones.
                 *
                 * It is okay to record non-shared values in pg_database, even though
                 * we may vacuum shared relations with older cutoffs, because only
                 * the minimum of the values present in pg_database matters.  We
                 * can be sure that shared relations have at some time been
                 * vacuumed with cutoffs no worse than the global minimum; for, if
                 * there is a backend in some other DB with xmin = OLDXMIN that's
                 * determining the cutoff with which we vacuum shared relations,
                 * it is not possible for that database to have a cutoff newer
                 * than OLDXMIN recorded in pg_database.
                 */
                vacuum_set_xid_limits(vacstmt, false,
                                                          &initialOldestXmin,
                                                          &initialFreezeLimit);
        }

        /*
         * Decide whether we need to start/commit our own transactions.
         *
         * For VACUUM (with or without ANALYZE): always do so, so that we can
         * release locks as soon as possible.  (We could possibly use the
         * outer transaction for a one-table VACUUM, but handling TOAST tables
         * would be problematic.)
         *
         * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
         * start/commit our own transactions.  Also, there's no need to do so
         * if only processing one relation.  For multiple relations when not
         * within a transaction block, use own transactions so we can release
         * locks sooner.
         */
        if (vacstmt->vacuum)
                use_own_xacts = true;
        else
        {
                Assert(vacstmt->analyze);
                if (in_outer_xact)
                        use_own_xacts = false;
                else if (list_length(relations) > 1)
                        use_own_xacts = true;
                else
                        use_own_xacts = false;
        }

        /*
         * If we are running ANALYZE without per-table transactions, we'll
         * need a memory context with table lifetime.
         */
        if (!use_own_xacts)
                anl_context = AllocSetContextCreate(PortalContext,
                                                                                        "Analyze",
                                                                                        ALLOCSET_DEFAULT_MINSIZE,
                                                                                        ALLOCSET_DEFAULT_INITSIZE,
                                                                                        ALLOCSET_DEFAULT_MAXSIZE);

        /*
         * vacuum_rel expects to be entered with no transaction active; it
         * will start and commit its own transaction.  But we are called by an
         * SQL command, and so we are executing inside a transaction already.
         * We commit the transaction started in PostgresMain() here, and start
         * another one before exiting to match the commit waiting for us back
         * in PostgresMain().
         */
        if (use_own_xacts)
        {
                /* matches the StartTransaction in PostgresMain() */
                CommitTransactionCommand();
        }

        /* Turn vacuum cost accounting on or off */
        PG_TRY();
        {
                ListCell   *cur;

                VacuumCostActive = (VacuumCostDelay > 0);
                VacuumCostBalance = 0;

                /*
                 * Loop to process each selected relation.
                 */
                foreach(cur, relations)
                {
                        Oid                     relid = lfirst_oid(cur);

                        if (vacstmt->vacuum)
                        {
                                if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
                                        all_rels = false;       /* forget about updating dbstats */
                        }
                        if (vacstmt->analyze)
                        {
                                MemoryContext old_context = NULL;

                                /*
                                 * If using separate xacts, start one for analyze.
                                 * Otherwise, we can use the outer transaction, but we
                                 * still need to call analyze_rel in a memory context that
                                 * will be cleaned up on return (else we leak memory while
                                 * processing multiple tables).
                                 */
                                if (use_own_xacts)
                                {
                                        StartTransactionCommand();
                                        /* functions in indexes may want a snapshot set */
                                        ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
                                }
                                else
                                        old_context = MemoryContextSwitchTo(anl_context);

                                /*
                                 * Tell the buffer replacement strategy that vacuum is
                                 * causing the IO
                                 */
                                StrategyHintVacuum(true);

                                analyze_rel(relid, vacstmt);

                                StrategyHintVacuum(false);

                                if (use_own_xacts)
                                        CommitTransactionCommand();
                                else
                                {
                                        MemoryContextSwitchTo(old_context);
                                        MemoryContextResetAndDeleteChildren(anl_context);
                                }
                        }
                }
        }
        PG_CATCH();
        {
                /* Make sure cost accounting is turned off after error */
                VacuumCostActive = false;
                PG_RE_THROW();
        }
        PG_END_TRY();

        /* Turn off vacuum cost accounting */
        VacuumCostActive = false;

        /*
         * Finish up processing.
         */
        if (use_own_xacts)
        {
                /* here, we are not in a transaction */

                /*
                 * This matches the CommitTransaction waiting for us in
                 * PostgresMain().
                 */
                StartTransactionCommand();
        }

        if (vacstmt->vacuum)
        {
                /*
                 * If it was a database-wide VACUUM, print FSM usage statistics
                 * (we don't make you be superuser to see these).
                 */
                if (vacstmt->relation == NULL)
                        PrintFreeSpaceMapStatistics(elevel);

                /*
                 * If we completed a database-wide VACUUM without skipping any
                 * relations, update the database's pg_database row with info
                 * about the transaction IDs used, and try to truncate pg_clog.
                 */
                if (all_rels)
                {
                        vac_update_dbstats(MyDatabaseId,
                                                           initialOldestXmin, initialFreezeLimit);
                        vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
                }
        }

        /*
         * Clean up working storage --- note we must do this after
         * StartTransactionCommand, else we might be trying to delete the
         * active context!
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;

        if (anl_context)
                MemoryContextDelete(anl_context);
}

/*
 * Build a list of Oids for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static List *
get_rel_oids(const RangeVar *vacrel, const char *stmttype)
{
        List       *oid_list = NIL;
        MemoryContext oldcontext;

        if (vacrel)
        {
                /* Process a specific relation */
                Oid                     relid;

                relid = RangeVarGetRelid(vacrel, false);

                /* Make a relation list entry for this guy */
                oldcontext = MemoryContextSwitchTo(vac_context);
                oid_list = lappend_oid(oid_list, relid);
                MemoryContextSwitchTo(oldcontext);
        }
        else
        {
                /* Process all plain relations listed in pg_class */
                Relation        pgclass;
                HeapScanDesc scan;
                HeapTuple       tuple;
                ScanKeyData key;

                ScanKeyInit(&key,
                                        Anum_pg_class_relkind,
                                        BTEqualStrategyNumber, F_CHAREQ,
                                        CharGetDatum(RELKIND_RELATION));

                pgclass = heap_openr(RelationRelationName, AccessShareLock);

                scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);

                while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
                {
                        /* Make a relation list entry for this guy */
                        oldcontext = MemoryContextSwitchTo(vac_context);
                        oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
                        MemoryContextSwitchTo(oldcontext);
                }

                heap_endscan(scan);
                heap_close(pgclass, AccessShareLock);
        }

        return oid_list;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
                                          TransactionId *oldestXmin,
                                          TransactionId *freezeLimit)
{
        TransactionId limit;

        *oldestXmin = GetOldestXmin(sharedRel);

        Assert(TransactionIdIsNormal(*oldestXmin));

        if (vacstmt->freeze)
        {
                /* FREEZE option: use oldest Xmin as freeze cutoff too */
                limit = *oldestXmin;
        }
        else
        {
                /*
                 * Normal case: freeze cutoff is well in the past, to wit, about
                 * halfway to the wrap horizon
                 */
                limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
        }

        /*
         * Be careful not to generate a "permanent" XID
         */
        if (!TransactionIdIsNormal(limit))
                limit = FirstNormalTransactionId;

        /*
         * Ensure sane relationship of limits
         */
        if (TransactionIdFollows(limit, *oldestXmin))
        {
                ereport(WARNING,
                                (errmsg("oldest xmin is far in the past"),
                                 errhint("Close open transactions soon to avoid wraparound problems.")));
                limit = *oldestXmin;
        }

        *freezeLimit = limit;
}


/*
 *      vac_update_relstats() -- update statistics for one relation
 *
 *              Update the whole-relation statistics that are kept in its pg_class
 *              row.  There are additional stats that will be updated if we are
 *              doing ANALYZE, but we always update these stats.  This routine works
 *              for both index and heap relation entries in pg_class.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the pg_class tuple that's already on
 *              the page.  The reason for this is that if we updated these tuples in
 *              the usual way, vacuuming pg_class itself wouldn't work very well ---
 *              by the time we got done with a vacuum cycle, most of the tuples in
 *              pg_class would've been obsoleted.  Of course, this only works for
 *              fixed-size never-null columns, but these are.
 *
 *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *              ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
                                        bool hasindex)
{
        Relation        rd;
        HeapTupleData rtup;
        HeapTuple       ctup;
        Form_pg_class pgcform;
        Buffer          buffer;

        /*
         * update number of tuples and number of pages in pg_class
         */
        rd = heap_openr(RelationRelationName, RowExclusiveLock);

        ctup = SearchSysCache(RELOID,
                                                  ObjectIdGetDatum(relid),
                                                  0, 0, 0);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
        if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* ensure no one else does this at the same time */
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->relpages = (int32) num_pages;
        pgcform->reltuples = (float4) num_tuples;
        pgcform->relhasindex = hasindex;

        /*
         * If we have discovered that there are no indexes, then there's no
         * primary key either.  This could be done more thoroughly...
         */
        if (!hasindex)
                pgcform->relhaspkey = false;

        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

        /*
         * Invalidate the tuple in the catcaches; this also arranges to flush
         * the relation's relcache entry.  (If we fail to commit for some
         * reason, no flush will occur, but no great harm is done since there
         * are no noncritical state updates here.)
         */
        CacheInvalidateHeapTuple(rd, &rtup);

        /* Write the buffer */
        WriteBuffer(buffer);

        heap_close(rd, RowExclusiveLock);
}


/*
 *      vac_update_dbstats() -- update statistics for one database
 *
 *              Update the whole-database statistics that are kept in its pg_database
 *              row.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the tuple that's already on the page.
 *              As with vac_update_relstats, this avoids leaving dead tuples behind
 *              after a VACUUM; which is good since GetRawDatabaseInfo
 *              can get confused by finding dead tuples in pg_database.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID)
{
        Relation        relation;
        ScanKeyData entry[1];
        HeapScanDesc scan;
        HeapTuple       tuple;
        Form_pg_database dbform;

        relation = heap_openr(DatabaseRelationName, RowExclusiveLock);

        /* Must use a heap scan, since there's no syscache for pg_database */
        ScanKeyInit(&entry[0],
                                ObjectIdAttributeNumber,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(dbid));

        scan = heap_beginscan(relation, SnapshotNow, 1, entry);

        tuple = heap_getnext(scan, ForwardScanDirection);

        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for database %u", dbid);

        /* ensure no one else does this at the same time */
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);

        dbform = (Form_pg_database) GETSTRUCT(tuple);

        /* overwrite the existing statistics in the tuple */
        dbform->datvacuumxid = vacuumXID;
        dbform->datfrozenxid = frozenXID;

        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);

        /* invalidate the tuple in the cache and write the buffer */
        CacheInvalidateHeapTuple(relation, tuple);
        WriteNoReleaseBuffer(scan->rs_cbuf);

        heap_endscan(scan);

        heap_close(relation, RowExclusiveLock);
}


/*
 *      vac_truncate_clog() -- attempt to truncate the commit log
 *
 *              Scan pg_database to determine the system-wide oldest datvacuumxid,
 *              and use it to truncate the transaction commit log (pg_clog).
 *              Also generate a warning if the system-wide oldest datfrozenxid
 *              seems to be in danger of wrapping around.
 *
 *              The passed XIDs are simply the ones I just wrote into my pg_database
 *              entry.  They're used to initialize the "min" calculations.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
        TransactionId myXID = GetCurrentTransactionId();
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
        int32           age;
        bool            vacuumAlreadyWrapped = false;
        bool            frozenAlreadyWrapped = false;


        relation = heap_openr(DatabaseRelationName, AccessShareLock);

        scan = heap_beginscan(relation, SnapshotNow, 0, NULL);

        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

                /* Ignore non-connectable databases (eg, template0) */
                /* It's assumed that these have been frozen correctly */
                if (!dbform->datallowconn)
                        continue;

                if (TransactionIdIsNormal(dbform->datvacuumxid))
                {
                        if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
                                vacuumAlreadyWrapped = true;
                        else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
                                vacuumXID = dbform->datvacuumxid;
                }
                if (TransactionIdIsNormal(dbform->datfrozenxid))
                {
                        if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
                                frozenAlreadyWrapped = true;
                        else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
                                frozenXID = dbform->datfrozenxid;
                }
        }

        heap_endscan(scan);

        heap_close(relation, AccessShareLock);

        /*
         * Do not truncate CLOG if we seem to have suffered wraparound
         * already; the computed minimum XID might be bogus.
         */
        if (vacuumAlreadyWrapped)
        {
                ereport(WARNING,
                                (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
                                 errdetail("You may have already suffered transaction-wraparound data loss.")));
                return;
        }

        /* Truncate CLOG to the oldest vacuumxid */
        TruncateCLOG(vacuumXID);

        /* Give warning about impending wraparound problems */
        if (frozenAlreadyWrapped)
        {
                ereport(WARNING,
                                (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
                                 errhint("Better vacuum them soon, or you may have a wraparound failure.")));
        }
        else
        {
                age = (int32) (myXID - frozenXID);
                if (age > (int32) ((MaxTransactionId >> 3) * 3))
                        ereport(WARNING,
                                        (errmsg("some databases have not been vacuumed in %d transactions",
                                                        age),
                                         errhint("Better vacuum them within %d transactions, "
                                                         "or you may have a wraparound failure.",
                                                         (int32) (MaxTransactionId >> 1) - age)));
        }
}


/****************************************************************************
 *                                                                          *
 *              Code common to both flavors of VACUUM                       *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      vacuum_rel() -- vacuum one heap relation
 *
 *              Returns TRUE if we actually processed the relation (or can ignore it
 *              for some reason), FALSE if we failed to process it due to permissions
 *              or other reasons.  (A FALSE result really means that some data
 *              may have been left unvacuumed, so we can't update XID stats.)
 *
 *              Doing one heap at a time incurs extra overhead, since we need to
 *              check that the heap exists again just before we vacuum it.  The
 *              reason that we do this is so that vacuuming can be spread across
 *              many small transactions.  Otherwise, two-phase locking would require
 *              us to lock the entire database during one pass of the vacuum cleaner.
 *
 *              At entry and exit, we are not inside a transaction.
 */
static bool
vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
        LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
        Oid                     toast_relid;
        bool            result;

        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand();
        /* functions in indexes may want a snapshot set */
        ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

        /*
         * Tell the cache replacement strategy that vacuum is causing all
         * following IO
         */
        StrategyHintVacuum(true);

        /*
         * Check for user-requested abort.  Note we want this to be inside a
         * transaction, so xact.c doesn't issue useless WARNING.
         */
        CHECK_FOR_INTERRUPTS();

        /*
         * Race condition -- if the pg_class tuple has gone away since the
         * last time we saw it, we don't need to vacuum it.
         */
        if (!SearchSysCacheExists(RELOID,
                                                          ObjectIdGetDatum(relid),
                                                          0, 0, 0))
        {
                StrategyHintVacuum(false);
                CommitTransactionCommand();
                return true;                    /* okay 'cause no data there */
        }

        /*
         * Determine the type of lock we want --- hard exclusive lock for a
         * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
         * vacuum.  Either way, we can be sure that no other backend is
         * vacuuming the same table.
         */
        lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

        /*
         * Open the class, get an appropriate lock on it, and check
         * permissions.
         *
         * We allow the user to vacuum a table if he is superuser, the table
         * owner, or the database owner (but in the latter case, only if it's
         * not a shared relation).  pg_class_ownercheck includes the superuser
         * case.
         *
         * Note we choose to treat permissions failure as a WARNING and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
        onerel = relation_open(relid, lmode);

        if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
                  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
        {
                ereport(WARNING,
                                (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
                                                RelationGetRelationName(onerel))));
                relation_close(onerel, lmode);
                StrategyHintVacuum(false);
                CommitTransactionCommand();
                return false;
        }

        /*
         * Check that it's a plain table; we used to do this in get_rel_oids()
         * but seems safer to check after we've locked the relation.
         */
        if (onerel->rd_rel->relkind != expected_relkind)
        {
                ereport(WARNING,
                                (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
                                                RelationGetRelationName(onerel))));
                relation_close(onerel, lmode);
                StrategyHintVacuum(false);
                CommitTransactionCommand();
                return false;
        }

        /*
         * Silently ignore tables that are temp tables of other backends ---
         * trying to vacuum these will lead to great unhappiness, since their
         * contents are probably not up-to-date on disk.  (We don't throw a
         * warning here; it would just lead to chatter during a database-wide
         * VACUUM.)
         */
        if (isOtherTempNamespace(RelationGetNamespace(onerel)))
        {
                relation_close(onerel, lmode);
                StrategyHintVacuum(false);
                CommitTransactionCommand();
                return true;                    /* assume no long-lived data in temp
                                                                 * tables */
        }

        /*
         * Get a session-level lock too. This will protect our access to the
         * relation across multiple transactions, so that we can vacuum the
         * relation's TOAST table (if any) secure in the knowledge that no one
         * is deleting the parent relation.
         *
         * NOTE: this cannot block, even if someone else is waiting for access,
         * because the lock manager knows that both lock requests are from the
         * same process.
         */
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationForSession(&onerelid, lmode);

        /*
         * Remember the relation's TOAST relation for later
         */
        toast_relid = onerel->rd_rel->reltoastrelid;

        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
        if (vacstmt->full)
                full_vacuum_rel(onerel, vacstmt);
        else
                lazy_vacuum_rel(onerel, vacstmt);

        result = true;                          /* did the vacuum */

        /* all done with this class, but hold lock until commit */
        relation_close(onerel, NoLock);

        /*
         * Complete the transaction and free all temporary memory used.
         */
        StrategyHintVacuum(false);
        CommitTransactionCommand();

        /*
         * If the relation has a secondary toast rel, vacuum that too while we
         * still hold the session lock on the master table.  Note however that
         * "analyze" will not get done on the toast table.  This is good,
         * because the toaster always uses hardcoded index access and
         * statistics are totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
        {
                if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
                        result = false;         /* failed to vacuum the TOAST table? */
        }

        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationForSession(&onerelid, lmode);

        return result;
}


/****************************************************************************
 *                                                                          *
 *              Code for VACUUM FULL (only)                                 *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *              This routine vacuums a single heap, cleans out its indexes, and
 *              updates its num_pages and num_tuples statistics.
 *
 *              At entry, we have already established a transaction and opened
 *              and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
        VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
                                                                                 * clean indexes */
        VacPageListData fraged_pages;           /* List of pages with space enough
                                                                                 * for re-using */
        Relation   *Irel;
        int                     nindexes,
                                i;
        VRelStats  *vacrelstats;

        vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
                                                  &OldestXmin, &FreezeLimit);

        /*
         * Set up statistics-gathering machinery.
         */
        vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
        vacrelstats->rel_pages = 0;
        vacrelstats->rel_tuples = 0;
        vacrelstats->hasindex = false;

        /* scan the heap */
        vacuum_pages.num_pages = fraged_pages.num_pages = 0;
        scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

        /* Now open all indexes of the relation */
        vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
        if (nindexes > 0)
                vacrelstats->hasindex = true;

        /* Clean/scan index relation(s) */
        if (Irel != NULL)
        {
                if (vacuum_pages.num_pages > 0)
                {
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
                                                         vacrelstats->rel_tuples, 0);
                }
                else
                {
                        /* just scan indexes to update statistic */
                        for (i = 0; i < nindexes; i++)
                                scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }

        if (fraged_pages.num_pages > 0)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
                                        nindexes, Irel);
                vac_close_indexes(nindexes, Irel, NoLock);
        }
        else
        {
                vac_close_indexes(nindexes, Irel, NoLock);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
                        vacuum_heap(vacrelstats, onerel, &vacuum_pages);
                }
                else
                {
                        /*
                         * Flush dirty pages out to disk.  We must do this even if we
                         * didn't do anything else, because we want to ensure that all
                         * tuples have correct on-row commit status on disk (see
                         * bufmgr.c's comments for FlushRelationBuffers()).
                         */
                        FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                }
        }

        /* update shared free space map with final free space info */
        vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

        /* update statistics in pg_class */
        vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
                                                vacrelstats->rel_tuples, vacrelstats->hasindex);
}

1141
 
 
1142
 
 
1143
/*
 
1144
 *      scan_heap() -- scan an open heap relation
 
1145
 *
 
1146
 *              This routine sets commit status bits, constructs vacuum_pages (list
 
1147
 *              of pages we need to compact free space on and/or clean indexes of
 
1148
 *              deleted tuples), constructs fraged_pages (list of pages with free
 
1149
 *              space that tuples could be moved into), and calculates statistics
 
1150
 *              on the number of live tuples in the heap.
 
1151
 */
 
1152
static void
 
1153
scan_heap(VRelStats *vacrelstats, Relation onerel,
 
1154
                  VacPageList vacuum_pages, VacPageList fraged_pages)
 
1155
{
 
1156
        BlockNumber nblocks,
 
1157
                                blkno;
 
1158
        HeapTupleData tuple;
 
1159
        char       *relname;
 
1160
        VacPage         vacpage;
 
1161
        BlockNumber empty_pages,
 
1162
                                empty_end_pages;
 
1163
        double          num_tuples,
 
1164
                                tups_vacuumed,
 
1165
                                nkeep,
 
1166
                                nunused;
 
1167
        double          free_space,
 
1168
                                usable_free_space;
 
1169
        Size            min_tlen = MaxTupleSize;
 
1170
        Size            max_tlen = 0;
 
1171
        bool            do_shrinking = true;
 
1172
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
 
1173
        int                     num_vtlinks = 0;
 
1174
        int                     free_vtlinks = 100;
 
1175
        VacRUsage       ru0;
 
1176
 
 
1177
        vac_init_rusage(&ru0);
 
1178
 
 
1179
        relname = RelationGetRelationName(onerel);
 
1180
        ereport(elevel,
 
1181
                        (errmsg("vacuuming \"%s.%s\"",
 
1182
                                        get_namespace_name(RelationGetNamespace(onerel)),
 
1183
                                        relname)));
 
1184
 
 
1185
        empty_pages = empty_end_pages = 0;
 
1186
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
 
1187
        free_space = 0;
 
1188
 
 
1189
        nblocks = RelationGetNumberOfBlocks(onerel);
 
1190
 
 
1191
        /*
 
1192
         * We initially create each VacPage item in a maximal-sized workspace,
 
1193
         * then copy the workspace into a just-large-enough copy.
 
1194
         */
 
1195
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
 
	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Page		page,
					tempPage = NULL;
		bool		do_reap,
					do_frag;
		Buffer		buf;
		OffsetNumber offnum,
					maxoff;
		bool		pgchanged,
					notup;

		vacuum_delay_point();

		buf = ReadBuffer(onerel, blkno);
		page = BufferGetPage(buf);

		/*
		 * We don't bother to do LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE)
		 * because we assume that holding exclusive lock on the relation
		 * will keep other backends from looking at the page.
		 */

		vacpage->blkno = blkno;
		vacpage->offsets_used = 0;
		vacpage->offsets_free = 0;

		if (PageIsNew(page))
		{
			VacPage		vacpagecopy;

			ereport(WARNING,
			(errmsg("relation \"%s\" page %u is uninitialized --- fixing",
					relname, blkno)));
			PageInit(page, BufferGetPageSize(buf), 0);
			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			WriteBuffer(buf);
			continue;
		}

		if (PageIsEmpty(page))
		{
			VacPage		vacpagecopy;

			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			ReleaseBuffer(buf);
			continue;
		}

		pgchanged = false;
		notup = true;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid = PageGetItemId(page, offnum);
			bool		tupgone = false;

			/*
			 * Collect un-used items too - it's possible to have indexes
			 * pointing here after crash.
			 */
			if (!ItemIdIsUsed(itemid))
			{
				vacpage->offsets[vacpage->offsets_free++] = offnum;
				nunused += 1;
				continue;
			}

			tuple.t_datamcxt = NULL;
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
			{
				case HEAPTUPLE_DEAD:
					tupgone = true;		/* we can delete the tuple */
					break;
				case HEAPTUPLE_LIVE:

					/*
					 * Tuple is good.  Consider whether to replace its
					 * xmin value with FrozenTransactionId.
					 */
					if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
						TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
											  FreezeLimit))
					{
						HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
						/* infomask should be okay already */
						Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
						pgchanged = true;
					}
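					/*
					 * Once xmin is replaced with FrozenTransactionId the
					 * tuple stays visible regardless of later transaction-ID
					 * wraparound; that is the point of freezing tuples whose
					 * xmin precedes FreezeLimit.  Setting pgchanged ensures
					 * the modified page is written out below.
					 */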
 
					/*
					 * Other checks...
					 */
					if (onerel->rd_rel->relhasoids &&
						!OidIsValid(HeapTupleGetOid(&tuple)))
						elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
							 relname, blkno, offnum);
					break;
				case HEAPTUPLE_RECENTLY_DEAD:

					/*
					 * If the tuple was deleted only recently, we must not
					 * remove it from the relation.
					 */
					nkeep += 1;

					/*
					 * If we are still planning to shrink the relation and
					 * this tuple has been updated, remember the link so we
					 * can reconstruct the update-chain dependencies.
					 */
					if (do_shrinking &&
						!(ItemPointerEquals(&(tuple.t_self),
											&(tuple.t_data->t_ctid))))
					{
						if (free_vtlinks == 0)
						{
							free_vtlinks = 1000;
							vtlinks = (VTupleLink) repalloc(vtlinks,
										   (free_vtlinks + num_vtlinks) *
												 sizeof(VTupleLinkData));
						}
						vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
						vtlinks[num_vtlinks].this_tid = tuple.t_self;
						free_vtlinks--;
						num_vtlinks++;
					}
					break;
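					/*
					 * The vtlinks array built above records, for each
					 * recently-dead updated tuple, the link from its TID
					 * (this_tid) to its successor version (new_tid, taken
					 * from t_ctid).  After the scan it is sorted and then
					 * searched by repair_frag() to reconstruct whole update
					 * chains before moving them.
					 */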
 
				case HEAPTUPLE_INSERT_IN_PROGRESS:

					/*
					 * This should not happen, since we hold exclusive
					 * lock on the relation; shouldn't we raise an error?
					 * (Actually, it can happen in system catalogs, since
					 * we tend to release write lock before commit there.)
					 */
					ereport(NOTICE,
							(errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
									relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
					do_shrinking = false;
					break;
				case HEAPTUPLE_DELETE_IN_PROGRESS:

					/*
					 * This should not happen, since we hold exclusive
					 * lock on the relation; shouldn't we raise an error?
					 * (Actually, it can happen in system catalogs, since
					 * we tend to release write lock before commit there.)
					 */
					ereport(NOTICE,
							(errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
									relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
					do_shrinking = false;
					break;
				default:
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
					break;
			}

			if (tupgone)
			{
				ItemId		lpp;

				/*
				 * Here we are building a temporary copy of the page with
				 * dead tuples removed.  Below we will apply
				 * PageRepairFragmentation to the copy, so that we can
				 * determine how much space will be available after
				 * removal of dead tuples.  But note we are NOT changing
				 * the real page yet...
				 */
				if (tempPage == NULL)
				{
					Size		pageSize;

					pageSize = PageGetPageSize(page);
					tempPage = (Page) palloc(pageSize);
					memcpy(tempPage, page, pageSize);
				}

				/* mark it unused on the temp page */
				lpp = PageGetItemId(tempPage, offnum);
				lpp->lp_flags &= ~LP_USED;

				vacpage->offsets[vacpage->offsets_free++] = offnum;
				tups_vacuumed += 1;
			}
			else
			{
				num_tuples += 1;
				notup = false;
				if (tuple.t_len < min_tlen)
					min_tlen = tuple.t_len;
				if (tuple.t_len > max_tlen)
					max_tlen = tuple.t_len;
			}
		}						/* scan along page */

		if (tempPage != NULL)
		{
			/* Some tuples are removable; figure free space after removal */
			PageRepairFragmentation(tempPage, NULL);
			vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
			pfree(tempPage);
			do_reap = true;
		}
		else
		{
			/* Just use current available space */
			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			/* Need to reap the page if it has ~LP_USED line pointers */
			do_reap = (vacpage->offsets_free > 0);
		}

		free_space += vacpage->free;

		/*
		 * Add the page to fraged_pages if it has a useful amount of free
		 * space.  "Useful" means enough for a minimal-sized tuple. But we
		 * don't know that accurately near the start of the relation, so
		 * add pages unconditionally if they have >= BLCKSZ/10 free space.
		 */
		do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);

		if (do_reap || do_frag)
		{
			VacPage		vacpagecopy = copy_vac_page(vacpage);

			if (do_reap)
				vpage_insert(vacuum_pages, vacpagecopy);
			if (do_frag)
				vpage_insert(fraged_pages, vacpagecopy);
		}
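		/*
		 * Note the division of labor: vacuum_pages collects pages that
		 * contain dead or unused items and so need cleanup (do_reap), while
		 * fraged_pages collects pages usable as move destinations (do_frag).
		 * A single copied VacPage may be inserted into both lists.
		 */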
 
		/*
		 * Include the page in empty_end_pages if it will be empty after
		 * vacuuming; this is to keep us from using it as a move
		 * destination.
		 */
		if (notup)
		{
			empty_pages++;
			empty_end_pages++;
		}
		else
			empty_end_pages = 0;

		if (pgchanged)
			WriteBuffer(buf);
		else
			ReleaseBuffer(buf);
	}

	pfree(vacpage);

	/* save stats in the rel list for use later */
	vacrelstats->rel_tuples = num_tuples;
	vacrelstats->rel_pages = nblocks;
	if (num_tuples == 0)
		min_tlen = max_tlen = 0;
	vacrelstats->min_tlen = min_tlen;
	vacrelstats->max_tlen = max_tlen;

	vacuum_pages->empty_end_pages = empty_end_pages;
	fraged_pages->empty_end_pages = empty_end_pages;

	/*
	 * Clear the fraged_pages list if we found we couldn't shrink. Else,
	 * remove any "empty" end-pages from the list, and compute usable free
	 * space = free space in remaining pages.
	 */
	if (do_shrinking)
	{
		int			i;

		Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
		fraged_pages->num_pages -= empty_end_pages;
		usable_free_space = 0;
		for (i = 0; i < fraged_pages->num_pages; i++)
			usable_free_space += fraged_pages->pagedesc[i]->free;
	}
	else
	{
		fraged_pages->num_pages = 0;
		usable_free_space = 0;
	}
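	/*
	 * usable_free_space therefore counts only free space on pages that
	 * would survive a shrink: trailing empty pages are dropped from
	 * fraged_pages because they are expected to be truncated away rather
	 * than refilled.
	 */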
 
	/* don't bother to save vtlinks if we will not call repair_frag */
	if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
	{
		qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
			  vac_cmp_vtlinks);
		vacrelstats->vtlinks = vtlinks;
		vacrelstats->num_vtlinks = num_vtlinks;
	}
	else
	{
		vacrelstats->vtlinks = NULL;
		vacrelstats->num_vtlinks = 0;
		pfree(vtlinks);
	}

	ereport(elevel,
			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
					RelationGetRelationName(onerel),
					tups_vacuumed, num_tuples, nblocks),
			 errdetail("%.0f dead row versions cannot be removed yet.\n"
					   "Nonremovable row versions range from %lu to %lu bytes long.\n"
					   "There were %.0f unused item pointers.\n"
					   "Total free space (including removable row versions) is %.0f bytes.\n"
					   "%u pages are or will become empty, including %u at the end of the table.\n"
					   "%u pages containing %.0f free bytes are potential move destinations.\n"
					   "%s",
					   nkeep,
					   (unsigned long) min_tlen, (unsigned long) max_tlen,
					   nunused,
					   free_space,
					   empty_pages, empty_end_pages,
					   fraged_pages->num_pages, usable_free_space,
					   vac_show_rusage(&ru0))));
}
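/*
 * Purely illustrative (all values hypothetical): with elevel = INFO the
 * report above comes out roughly as
 *
 *   INFO:  "onek": found 3000 removable, 1000 nonremovable row versions
 *          in 345 pages
 *   DETAIL:  0 dead row versions cannot be removed yet. ...
 */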
 
/*
 *	repair_frag() -- try to repair relation's fragmentation
 *
 *		This routine marks dead tuples as unused and tries to re-use dead
 *		space by moving tuples (and inserting index entries if needed).  It
 *		constructs Nvacpagelist, a list of pages freed by moving tuples off
 *		them, and cleans their index entries after committing the current
 *		transaction (in a hackish manner - without releasing locks or
 *		freeing memory!).  It truncates the relation if some end blocks
 *		have become empty.
 */
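/*
 * In outline (see the code below): pages are scanned backwards from the
 * end of the relation, and each tuple that can be relocated is copied into
 * a page from fraged_pages, with the original marked MOVED_OFF under this
 * VACUUM's XID; whole update chains are moved as a unit.  Once nothing
 * more can be moved, the freed end pages can be truncated away.
 */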
 
static void
repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
			int nindexes, Relation *Irel)
{
	TransactionId myXID = GetCurrentTransactionId();
	Buffer		dst_buffer = InvalidBuffer;
	BlockNumber nblocks,
				blkno;
	BlockNumber last_move_dest_block = 0,
				last_vacuum_block;
	Page		dst_page = NULL;
	ExecContextData ec;
	VacPageListData Nvacpagelist;
	VacPage		dst_vacpage = NULL,
				last_vacuum_page,
				vacpage,
			   *curpage;
	int			i;
	int			num_moved = 0,
				num_fraged_pages,
				vacuumed_pages;
	int			keep_tuples = 0;
	VacRUsage	ru0;

	vac_init_rusage(&ru0);

	ExecContext_Init(&ec, onerel);

	Nvacpagelist.num_pages = 0;
	num_fraged_pages = fraged_pages->num_pages;
	Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
	vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
	if (vacuumed_pages > 0)
	{
		/* get last reaped page from vacuum_pages */
		last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
		last_vacuum_block = last_vacuum_page->blkno;
	}
	else
	{
		last_vacuum_page = NULL;
		last_vacuum_block = InvalidBlockNumber;
	}

	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
	vacpage->offsets_used = vacpage->offsets_free = 0;

	/*
	 * Scan pages backwards from the last nonempty page, trying to move
	 * tuples down to lower pages.  Quit when we reach a page that we have
	 * moved any tuples onto, or the first page if we haven't moved
	 * anything, or when we find a page we cannot completely empty (this
	 * last condition is handled by "break" statements within the loop).
	 *
	 * NB: this code depends on the vacuum_pages and fraged_pages lists being
	 * in order by blkno.
	 */
	nblocks = vacrelstats->rel_pages;
	for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
		 blkno > last_move_dest_block;
		 blkno--)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber offnum,
					maxoff;
		bool		isempty,
					dowrite,
					chain_tuple_moved;

		vacuum_delay_point();

		/*
		 * Forget fraged_pages pages at or after this one; they're no
		 * longer useful as move targets, since we only want to move down.
		 * Note that since we stop the outer loop at last_move_dest_block,
		 * pages removed here cannot have had anything moved onto them
		 * already.
		 *
		 * Also note that we don't change the stored fraged_pages list, only
		 * our local variable num_fraged_pages; so the forgotten pages are
		 * still available to be loaded into the free space map later.
		 */
		while (num_fraged_pages > 0 &&
			   fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
		{
			Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
			--num_fraged_pages;
		}

		/*
		 * Process this page of relation.
		 */
		buf = ReadBuffer(onerel, blkno);
		page = BufferGetPage(buf);

		vacpage->offsets_free = 0;

		isempty = PageIsEmpty(page);

		dowrite = false;

		/* Is the page in the vacuum_pages list? */
		if (blkno == last_vacuum_block)
		{
			if (last_vacuum_page->offsets_free > 0)
			{
				/* there are dead tuples on this page - clean them */
				Assert(!isempty);
				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
				vacuum_page(onerel, buf, last_vacuum_page);
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
				dowrite = true;
			}
			else
				Assert(isempty);
			--vacuumed_pages;
			if (vacuumed_pages > 0)
			{
				/* get prev reaped page from vacuum_pages */
				last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
				last_vacuum_block = last_vacuum_page->blkno;
			}
			else
			{
				last_vacuum_page = NULL;
				last_vacuum_block = InvalidBlockNumber;
			}
			if (isempty)
			{
				ReleaseBuffer(buf);
				continue;
			}
		}
		else
			Assert(!isempty);

		chain_tuple_moved = false;		/* no chain tuples have been moved
										 * off this page yet */
		vacpage->blkno = blkno;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			Size		tuple_len;
			HeapTupleData tuple;
			ItemId		itemid = PageGetItemId(page, offnum);

			if (!ItemIdIsUsed(itemid))
				continue;

			tuple.t_datamcxt = NULL;
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple_len = tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			/*
			 * VACUUM FULL has an exclusive lock on the relation.  So
			 * normally no other transaction can have pending INSERTs or
			 * DELETEs in this relation.  A tuple is either (a) a tuple in
			 * a system catalog, inserted or deleted by a not yet
			 * committed transaction or (b) dead (XMIN_INVALID or
			 * XMAX_COMMITTED) or (c) inserted by a committed xact
			 * (XMIN_COMMITTED) or (d) moved by the currently running
			 * VACUUM. In case (a) we wouldn't be in repair_frag() at all.
			 * In case (b) we cannot be here, because scan_heap() has
			 * already marked the item as unused, see continue above. Case
			 * (c) is what normally is to be expected. Case (d) is only
			 * possible, if a whole tuple chain has been moved while
			 * processing this or a higher numbered block.
			 */
			if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
			{
				if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");
				if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
					elog(ERROR, "HEAP_MOVED_OFF was expected");

				/*
				 * MOVED_OFF by another VACUUM would have caused the
				 * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
				 */
				if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
					elog(ERROR, "invalid XVAC in tuple header");

				/*
				 * If this (chain) tuple was already moved by this VACUUM,
				 * we have to check whether it is recorded in vacpage or
				 * not - i.e. whether it was moved while cleaning this page
				 * or some previous one.
				 */

				/* Can't we Assert(keep_tuples > 0) here? */
				if (keep_tuples == 0)
					continue;
				if (chain_tuple_moved)
				{
					/* some chains were moved while cleaning this page */
					Assert(vacpage->offsets_free > 0);
					for (i = 0; i < vacpage->offsets_free; i++)
					{
						if (vacpage->offsets[i] == offnum)
							break;
					}
					if (i >= vacpage->offsets_free)		/* not found */
					{
						vacpage->offsets[vacpage->offsets_free++] = offnum;
						keep_tuples--;
					}
				}
				else
				{
					vacpage->offsets[vacpage->offsets_free++] = offnum;
					keep_tuples--;
				}
				continue;
			}

			/*
			 * If this tuple is part of a chain of tuples created by
			 * updates from "recent" transactions, then we have to move
			 * the whole chain of tuples to other places.
			 *
			 * NOTE: this test is not 100% accurate: it is possible for a
			 * tuple to be an updated one with recent xmin, and yet not
			 * have a corresponding tuple in the vtlinks list.  Presumably
			 * there was once a parent tuple with xmax matching the xmin,
			 * but it's possible that that tuple has been removed --- for
			 * example, if it had xmin = xmax then
			 * HeapTupleSatisfiesVacuum would deem it removable as soon as
			 * the xmin xact completes.
			 *
			 * To be on the safe side, we abandon the repair_frag process if
			 * we cannot find the parent tuple in vtlinks.  This may be
			 * overly conservative; AFAICS it would be safe to move the
			 * chain.
			 */
			if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
				 !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
										OldestXmin)) ||
				(!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
											   HEAP_MARKED_FOR_UPDATE)) &&
				 !(ItemPointerEquals(&(tuple.t_self),
									 &(tuple.t_data->t_ctid)))))
			{
				Buffer		Cbuf = buf;
				bool		freeCbuf = false;
				bool		chain_move_failed = false;
				ItemPointerData Ctid;
				HeapTupleData tp = tuple;
				Size		tlen = tuple_len;
				VTupleMove	vtmove;
				int			num_vtmove;
				int			free_vtmove;
				VacPage		to_vacpage = NULL;
				int			to_item = 0;
				int			ti;

				if (dst_buffer != InvalidBuffer)
				{
					WriteBuffer(dst_buffer);
					dst_buffer = InvalidBuffer;
				}

				/* Quick exit if we have no vtlinks to search in */
				if (vacrelstats->vtlinks == NULL)
				{
					elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
					break;		/* out of walk-along-page loop */
				}

				vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
				num_vtmove = 0;
				free_vtmove = 100;

				/*
				 * If this tuple is at the beginning or in the middle of
				 * the chain, then we have to walk to the end of the chain
				 * first.
				 */
				while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
												  HEAP_MARKED_FOR_UPDATE)) &&
					   !(ItemPointerEquals(&(tp.t_self),
										   &(tp.t_data->t_ctid))))
				{
					Page		Cpage;
					ItemId		Citemid;
					ItemPointerData Ctid;

					Ctid = tp.t_data->t_ctid;
					if (freeCbuf)
						ReleaseBuffer(Cbuf);
					freeCbuf = true;
					Cbuf = ReadBuffer(onerel,
									  ItemPointerGetBlockNumber(&Ctid));
					Cpage = BufferGetPage(Cbuf);
					Citemid = PageGetItemId(Cpage,
											ItemPointerGetOffsetNumber(&Ctid));
					if (!ItemIdIsUsed(Citemid))
					{
						/*
						 * This means that somewhere in the middle of the
						 * chain there was a tuple updated by a transaction
						 * older than OldestXmin, and that tuple has already
						 * been deleted by this VACUUM.  Really the upper
						 * part of the chain should be removed, and it seems
						 * that ought to be handled in scan_heap(), but it's
						 * not implemented at the moment, so we just stop
						 * shrinking here.
						 */
						elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
						chain_move_failed = true;
						break;	/* out of loop to move to chain end */
					}
					tp.t_datamcxt = NULL;
					tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
					tp.t_self = Ctid;
					tlen = tp.t_len = ItemIdGetLength(Citemid);
				}
				if (chain_move_failed)
				{
					if (freeCbuf)
						ReleaseBuffer(Cbuf);
					pfree(vtmove);
					break;		/* out of walk-along-page loop */
				}
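				/*
				 * At this point tp is the newest member of the chain.  The
				 * loop below walks backwards from it toward the chain's
				 * start, using vac_bsearch() on the sorted vtlinks array to
				 * find each prior version, and reserves space for every
				 * member on some page from fraged_pages, recording the
				 * planned placements in vtmove.  If any member cannot be
				 * placed, the whole chain move is abandoned.
				 */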
 
				/*
				 * Check if all items in chain can be moved
				 */
				for (;;)
				{
					Buffer		Pbuf;
					Page		Ppage;
					ItemId		Pitemid;
					HeapTupleData Ptp;
					VTupleLinkData vtld,
							   *vtlp;

					if (to_vacpage == NULL ||
						!enough_space(to_vacpage, tlen))
					{
						for (i = 0; i < num_fraged_pages; i++)
						{
							if (enough_space(fraged_pages->pagedesc[i], tlen))
								break;
						}

						if (i == num_fraged_pages)
						{
							/* can't move item anywhere */
							chain_move_failed = true;
							break;		/* out of check-all-items loop */
						}
						to_item = i;
						to_vacpage = fraged_pages->pagedesc[to_item];
					}
					to_vacpage->free -= MAXALIGN(tlen);
					if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
						to_vacpage->free -= sizeof(ItemIdData);
					(to_vacpage->offsets_used)++;
					if (free_vtmove == 0)
					{
						free_vtmove = 1000;
						vtmove = (VTupleMove)
							repalloc(vtmove,
									 (free_vtmove + num_vtmove) *
									 sizeof(VTupleMoveData));
					}
					vtmove[num_vtmove].tid = tp.t_self;
					vtmove[num_vtmove].vacpage = to_vacpage;
					if (to_vacpage->offsets_used == 1)
						vtmove[num_vtmove].cleanVpd = true;
					else
						vtmove[num_vtmove].cleanVpd = false;
					free_vtmove--;
					num_vtmove++;

					/* At beginning of chain? */
					if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
						TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
											  OldestXmin))
						break;

					/* No, move to tuple with prior row version */
					vtld.new_tid = tp.t_self;
					vtlp = (VTupleLink)
						vac_bsearch((void *) &vtld,
									(void *) (vacrelstats->vtlinks),
									vacrelstats->num_vtlinks,
									sizeof(VTupleLinkData),
									vac_cmp_vtlinks);
					if (vtlp == NULL)
					{
						/* see discussion above */
						elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
						chain_move_failed = true;
						break;	/* out of check-all-items loop */
					}
					tp.t_self = vtlp->this_tid;
					Pbuf = ReadBuffer(onerel,
									  ItemPointerGetBlockNumber(&(tp.t_self)));
					Ppage = BufferGetPage(Pbuf);
					Pitemid = PageGetItemId(Ppage,
											ItemPointerGetOffsetNumber(&(tp.t_self)));
					/* this can't happen since we saw tuple earlier: */
					if (!ItemIdIsUsed(Pitemid))
						elog(ERROR, "parent itemid marked as unused");
					Ptp.t_datamcxt = NULL;
					Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);

					/* ctid should not have changed since we saved it */
					Assert(ItemPointerEquals(&(vtld.new_tid),
											 &(Ptp.t_data->t_ctid)));

					/*
					 * See the note above about the case where
					 * !ItemIdIsUsed(Citemid) (the child item has been
					 * removed)...  Because we don't currently remove the
					 * no-longer-useful part of an update chain, it is
					 * possible to find a too-old parent row here.  As in
					 * that case, we just stop shrinking here.  We could
					 * try to find the real parent row, but it's not worth
					 * doing because a real solution will be implemented
					 * anyway, later, and we are too close to the 6.5
					 * release. - vadim 06/11/99
					 */
					if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
											  HeapTupleHeaderGetXmin(tp.t_data))))
					{
						ReleaseBuffer(Pbuf);
						elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
						chain_move_failed = true;
						break;	/* out of check-all-items loop */
					}
					tp.t_datamcxt = Ptp.t_datamcxt;
					tp.t_data = Ptp.t_data;
					tlen = tp.t_len = ItemIdGetLength(Pitemid);
					if (freeCbuf)
						ReleaseBuffer(Cbuf);
					Cbuf = Pbuf;
					freeCbuf = true;
				}				/* end of check-all-items loop */

				if (freeCbuf)
					ReleaseBuffer(Cbuf);
				freeCbuf = false;

				if (chain_move_failed)
				{
					/*
					 * Undo changes to offsets_used state.  We don't
					 * bother cleaning up the amount-free state, since
					 * we're not going to do any further tuple motion.
					 */
					for (i = 0; i < num_vtmove; i++)
					{
						Assert(vtmove[i].vacpage->offsets_used > 0);
						(vtmove[i].vacpage->offsets_used)--;
					}
					pfree(vtmove);
					break;		/* out of walk-along-page loop */
				}

				/*
				 * Okay, move the whole tuple chain
				 */
				ItemPointerSetInvalid(&Ctid);
				for (ti = 0; ti < num_vtmove; ti++)
				{
					VacPage		destvacpage = vtmove[ti].vacpage;
					Page		Cpage;
					ItemId		Citemid;

					/* Get page to move from */
					tuple.t_self = vtmove[ti].tid;
					Cbuf = ReadBuffer(onerel,
									  ItemPointerGetBlockNumber(&(tuple.t_self)));

					/* Get page to move to */
					dst_buffer = ReadBuffer(onerel, destvacpage->blkno);

					LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
					if (dst_buffer != Cbuf)
						LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);

					dst_page = BufferGetPage(dst_buffer);
					Cpage = BufferGetPage(Cbuf);

					Citemid = PageGetItemId(Cpage,
											ItemPointerGetOffsetNumber(&(tuple.t_self)));
					tuple.t_datamcxt = NULL;
					tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
					tuple_len = tuple.t_len = ItemIdGetLength(Citemid);

					/*
					 * make a copy of the source tuple, and then mark the
					 * source tuple MOVED_OFF.
					 */
					move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
									 dst_buffer, dst_page, destvacpage,
									 &ec, &Ctid, vtmove[ti].cleanVpd);

					num_moved++;
					if (destvacpage->blkno > last_move_dest_block)
						last_move_dest_block = destvacpage->blkno;

					/*
					 * Remember that we moved tuple from the current page
					 * (corresponding index tuple will be cleaned).
					 */
					if (Cbuf == buf)
						vacpage->offsets[vacpage->offsets_free++] =
							ItemPointerGetOffsetNumber(&(tuple.t_self));
					else
						keep_tuples++;

					WriteBuffer(dst_buffer);
					WriteBuffer(Cbuf);
				}				/* end of move-the-tuple-chain loop */

				dst_buffer = InvalidBuffer;
				pfree(vtmove);
				chain_tuple_moved = true;

				/* advance to next tuple in walk-along-page loop */
				continue;
			}					/* end of is-tuple-in-chain test */
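			/*
			 * Tuples that are not part of an update chain take the simpler
			 * path below: keep (or find) a destination buffer with enough
			 * free space, then move_plain_tuple() copies the tuple there
			 * and marks the original MOVED_OFF, after which the old offset
			 * is remembered so its index entries can be cleaned later.
			 */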
 
			/* try to find new page for this tuple */
			if (dst_buffer == InvalidBuffer ||
				!enough_space(dst_vacpage, tuple_len))
			{
				if (dst_buffer != InvalidBuffer)
				{
					WriteBuffer(dst_buffer);
					dst_buffer = InvalidBuffer;
				}
				for (i = 0; i < num_fraged_pages; i++)
				{
					if (enough_space(fraged_pages->pagedesc[i], tuple_len))
						break;
				}
				if (i == num_fraged_pages)
					break;		/* can't move item anywhere */
				dst_vacpage = fraged_pages->pagedesc[i];
				dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
				LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
				dst_page = BufferGetPage(dst_buffer);
				/* if this page was not used before - clean it */
				if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
					vacuum_page(onerel, dst_buffer, dst_vacpage);
			}
			else
				LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			move_plain_tuple(onerel, buf, page, &tuple,
							 dst_buffer, dst_page, dst_vacpage, &ec);

			num_moved++;
			if (dst_vacpage->blkno > last_move_dest_block)
				last_move_dest_block = dst_vacpage->blkno;

			/*
			 * Remember that we moved tuple from the current page
			 * (corresponding index tuple will be cleaned).
			 */
			vacpage->offsets[vacpage->offsets_free++] = offnum;
		}						/* walk along page */

		/*
		 * If we broke out of the walk-along-page loop early (ie, still
		 * have offnum <= maxoff), then we failed to move some tuple off
		 * this page.  No point in shrinking any more, so clean up and
		 * exit the per-page loop.
		 */
		if (offnum < maxoff && keep_tuples > 0)
		{
			OffsetNumber off;

			/*
			 * Fix vacpage state for any unvisited tuples remaining on
			 * page
			 */
			for (off = OffsetNumberNext(offnum);
				 off <= maxoff;
				 off = OffsetNumberNext(off))
			{
				ItemId		itemid = PageGetItemId(page, off);
				HeapTupleHeader htup;

				if (!ItemIdIsUsed(itemid))
					continue;
				htup = (HeapTupleHeader) PageGetItem(page, itemid);
				if (htup->t_infomask & HEAP_XMIN_COMMITTED)
					continue;

				/*
				 * See comments in the walk-along-page loop above about
				 * why only MOVED_OFF tuples should be found here.
				 */
				if (htup->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");
				if (!(htup->t_infomask & HEAP_MOVED_OFF))
					elog(ERROR, "HEAP_MOVED_OFF was expected");
				if (HeapTupleHeaderGetXvac(htup) != myXID)
					elog(ERROR, "invalid XVAC in tuple header");

				if (chain_tuple_moved)
				{
					/* some chains were moved while cleaning this page */
					Assert(vacpage->offsets_free > 0);
					for (i = 0; i < vacpage->offsets_free; i++)
					{
						if (vacpage->offsets[i] == off)
							break;
					}
					if (i >= vacpage->offsets_free)		/* not found */
					{
						vacpage->offsets[vacpage->offsets_free++] = off;
						Assert(keep_tuples > 0);
						keep_tuples--;
 
2172
                                        }
 
2173
                                }
 
2174
                                else
 
2175
                                {
 
2176
                                        vacpage->offsets[vacpage->offsets_free++] = off;
 
2177
                                        Assert(keep_tuples > 0);
 
2178
                                        keep_tuples--;
 
2179
                                }
 
2180
                        }
 
2181
                }
 
2182
 
 
2183
                if (vacpage->offsets_free > 0)  /* some tuples were moved */
 
2184
                {
 
2185
                        if (chain_tuple_moved)          /* else - they are ordered */
 
2186
                        {
 
2187
                                qsort((char *) (vacpage->offsets), vacpage->offsets_free,
 
2188
                                          sizeof(OffsetNumber), vac_cmp_offno);
 
2189
                        }
 
2190
                        vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
 
2191
                        WriteBuffer(buf);
 
2192
                }
 
2193
                else if (dowrite)
 
2194
                        WriteBuffer(buf);
 
2195
                else
 
2196
                        ReleaseBuffer(buf);
 
2197
 
 
2198
                if (offnum <= maxoff)
 
2199
                        break;                          /* had to quit early, see above note */
 
2200
 
 
2201
        }                                                       /* walk along relation */
 
2202
 
 
2203
        blkno++;                                        /* new number of blocks */
 
2204
 
 
2205
        if (dst_buffer != InvalidBuffer)
 
2206
        {
 
2207
                Assert(num_moved > 0);
 
2208
                WriteBuffer(dst_buffer);
 
2209
        }
 
2210
 
 
2211
        if (num_moved > 0)
 
2212
        {
 
2213
                /*
 
2214
                 * We have to commit our tuple moves before we truncate the
 
2215
                 * relation.  Ideally we should do Commit/StartTransactionCommand
 
2216
                 * here, relying on the session-level table lock to protect our
 
2217
                 * exclusive access to the relation.  However, that would require
 
2218
                 * a lot of extra code to close and re-open the relation, indexes,
 
2219
                 * etc.  For now, a quick hack: record status of current
 
2220
                 * transaction as committed, and continue.
 
2221
                 */
 
2222
                RecordTransactionCommit();
 
2223
        }
 
2224
 
 
2225
        /*
 
2226
         * We are not going to move any more tuples across pages, but we still
 
2227
         * need to apply vacuum_page to compact free space in the remaining
 
2228
         * pages in vacuum_pages list.  Note that some of these pages may also
 
2229
         * be in the fraged_pages list, and may have had tuples moved onto
 
2230
         * them; if so, we already did vacuum_page and needn't do it again.
 
2231
         */
 
2232
        for (i = 0, curpage = vacuum_pages->pagedesc;
 
2233
                 i < vacuumed_pages;
 
2234
                 i++, curpage++)
 
2235
        {
 
2236
                vacuum_delay_point();
 
2237
 
 
2238
                Assert((*curpage)->blkno < blkno);
 
2239
                if ((*curpage)->offsets_used == 0)
 
2240
                {
 
2241
                        Buffer          buf;
 
2242
                        Page            page;
 
2243
 
 
2244
                        /* this page was not used as a move target, so must clean it */
 
2245
                        buf = ReadBuffer(onerel, (*curpage)->blkno);
 
2246
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
2247
                        page = BufferGetPage(buf);
 
2248
                        if (!PageIsEmpty(page))
 
2249
                                vacuum_page(onerel, buf, *curpage);
 
2250
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
2251
                        WriteBuffer(buf);
 
2252
                }
 
2253
        }
 
2254
 
 
2255
        /*
 
2256
         * Now scan all the pages that we moved tuples onto and update tuple
 
2257
         * status bits.  This is not really necessary, but will save time for
 
2258
         * future transactions examining these tuples.
 
2259
         */
 
2260
        update_hint_bits(onerel, fraged_pages, num_fraged_pages,
 
2261
                                         last_move_dest_block, num_moved);
 
2262
 
 
2263
        /*
 
2264
         * It'd be cleaner to make this report at the bottom of this routine,
 
2265
         * but then the rusage would double-count the second pass of index
 
2266
         * vacuuming.  So do it here and ignore the relatively small amount of
 
2267
         * processing that occurs below.
 
2268
         */
 
2269
        ereport(elevel,
 
2270
           (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
 
2271
                           RelationGetRelationName(onerel),
 
2272
                           num_moved, nblocks, blkno),
 
2273
                errdetail("%s",
 
2274
                                  vac_show_rusage(&ru0))));
 
2275
 
 
2276
        /*
 
2277
         * Reflect the motion of system tuples to the catalog cache here.
 
2278
         */
 
2279
        CommandCounterIncrement();
 
2280
 
 
2281
        if (Nvacpagelist.num_pages > 0)
 
2282
        {
 
2283
                /* vacuum indexes again if needed */
 
2284
                if (Irel != NULL)
 
2285
                {
 
2286
                        VacPage    *vpleft,
 
2287
                                           *vpright,
 
2288
                                                vpsave;
 
2289
 
 
2290
                        /* re-sort Nvacpagelist.pagedesc */
 
2291
                        for (vpleft = Nvacpagelist.pagedesc,
 
2292
                        vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
 
2293
                                 vpleft < vpright; vpleft++, vpright--)
 
2294
                        {
 
2295
                                vpsave = *vpleft;
 
2296
                                *vpleft = *vpright;
 
2297
                                *vpright = vpsave;
 
2298
                        }
 
2299
 
 
2300
                        /*
 
2301
                         * keep_tuples is the number of tuples that have been moved
 
2302
                         * off a page during chain moves but not been scanned over
 
2303
                         * subsequently.  The tuple ids of these tuples are not
 
2304
                         * recorded as free offsets for any VacPage, so they will not
 
2305
                         * be cleared from the indexes.
 
2306
                         */
 
2307
                        Assert(keep_tuples >= 0);
 
2308
                        for (i = 0; i < nindexes; i++)
 
2309
                                vacuum_index(&Nvacpagelist, Irel[i],
 
2310
                                                         vacrelstats->rel_tuples, keep_tuples);
 
2311
                }
 
2312
 
 
2313
                /*
 
2314
                 * Clean moved-off tuples from last page in Nvacpagelist list.
 
2315
                 *
 
2316
                 * We need only do this in this one page, because higher-numbered
 
2317
                 * pages are going to be truncated from the relation entirely.
 
2318
                 * But see comments for update_hint_bits().
 
2319
                 */
 
2320
                if (vacpage->blkno == (blkno - 1) &&
 
2321
                        vacpage->offsets_free > 0)
 
2322
                {
 
2323
                        Buffer          buf;
 
2324
                        Page            page;
 
2325
                        OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
 
2326
                        OffsetNumber offnum,
 
2327
                                                maxoff;
 
2328
                        int                     uncnt;
 
2329
                        int                     num_tuples = 0;
 
2330
 
 
2331
                        buf = ReadBuffer(onerel, vacpage->blkno);
 
2332
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
2333
                        page = BufferGetPage(buf);
 
2334
                        maxoff = PageGetMaxOffsetNumber(page);
 
2335
                        for (offnum = FirstOffsetNumber;
 
2336
                                 offnum <= maxoff;
 
2337
                                 offnum = OffsetNumberNext(offnum))
 
2338
                        {
 
2339
                                ItemId          itemid = PageGetItemId(page, offnum);
 
2340
                                HeapTupleHeader htup;
 
2341
 
 
2342
                                if (!ItemIdIsUsed(itemid))
 
2343
                                        continue;
 
2344
                                htup = (HeapTupleHeader) PageGetItem(page, itemid);
 
2345
                                if (htup->t_infomask & HEAP_XMIN_COMMITTED)
 
2346
                                        continue;
 
2347
 
 
2348
                                /*
 
2349
                                 * See comments in the walk-along-page loop above about
 
2350
                                 * why only MOVED_OFF tuples should be found here.
 
2351
                                 */
 
2352
                                if (htup->t_infomask & HEAP_MOVED_IN)
 
2353
                                        elog(ERROR, "HEAP_MOVED_IN was not expected");
 
2354
                                if (!(htup->t_infomask & HEAP_MOVED_OFF))
 
2355
                                        elog(ERROR, "HEAP_MOVED_OFF was expected");
 
2356
                                if (HeapTupleHeaderGetXvac(htup) != myXID)
 
2357
                                        elog(ERROR, "invalid XVAC in tuple header");
 
2358
 
 
2359
                                itemid->lp_flags &= ~LP_USED;
 
2360
                                num_tuples++;
 
2361
                        }
 
2362
                        Assert(vacpage->offsets_free == num_tuples);
 
2363
 
 
2364
                        START_CRIT_SECTION();
 
2365
 
 
2366
                        uncnt = PageRepairFragmentation(page, unused);
 
2367
 
 
2368
                        /* XLOG stuff */
 
2369
                        if (!onerel->rd_istemp)
 
2370
                        {
 
2371
                                XLogRecPtr      recptr;
 
2372
 
 
2373
                                recptr = log_heap_clean(onerel, buf, unused, uncnt);
 
2374
                                PageSetLSN(page, recptr);
 
2375
                                PageSetTLI(page, ThisTimeLineID);
 
2376
                        }
 
2377
                        else
 
2378
                        {
 
2379
                                /*
 
2380
                                 * No XLOG record, but still need to flag that XID exists
 
2381
                                 * on disk
 
2382
                                 */
 
2383
                                MyXactMadeTempRelUpdate = true;
 
2384
                        }
 
2385
 
 
2386
                        END_CRIT_SECTION();
 
2387
 
 
2388
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
2389
                        WriteBuffer(buf);
 
2390
                }
 
2391
 
 
2392
                /* now - free new list of reaped pages */
 
2393
                curpage = Nvacpagelist.pagedesc;
 
2394
                for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
 
2395
                        pfree(*curpage);
 
2396
                pfree(Nvacpagelist.pagedesc);
 
2397
        }
 
2398
 
 
2399
        /*
 
2400
         * Flush dirty pages out to disk.  We do this unconditionally, even if
 
2401
         * we don't need to truncate, because we want to ensure that all
 
2402
         * tuples have correct on-row commit status on disk (see bufmgr.c's
 
2403
         * comments for FlushRelationBuffers()).
 
2404
         */
 
2405
        FlushRelationBuffers(onerel, blkno);
 
2406
 
 
2407
        /* truncate relation, if needed */
 
2408
        if (blkno < nblocks)
 
2409
        {
 
2410
                RelationTruncate(onerel, blkno);
 
2411
                vacrelstats->rel_pages = blkno; /* set new number of blocks */
 
2412
        }
 
2413
 
 
2414
        /* clean up */
 
2415
        pfree(vacpage);
 
2416
        if (vacrelstats->vtlinks != NULL)
 
2417
                pfree(vacrelstats->vtlinks);
 
2418
 
 
2419
        ExecContext_Finish(&ec);
 
2420
}
 
2421
 
 
2422
/*
 
2423
 *      move_chain_tuple() -- move one tuple that is part of a tuple chain
 
2424
 *
 
2425
 *              This routine moves old_tup from old_page to dst_page.
 
2426
 *              old_page and dst_page might be the same page.
 
2427
 *              On entry old_buf and dst_buf are locked exclusively, both locks (or
 
2428
 *              the single lock, if this is an intra-page move) are released before
 
2429
 *              exit.
 
2430
 *
 
2431
 *              Yes, a routine with ten parameters is ugly, but it's still better
 
2432
 *              than having these 120 lines of code in repair_frag(), which is
 
2433
 *              already too long and almost unreadable.
 
2434
 */
 
2435
static void
 
2436
move_chain_tuple(Relation rel,
 
2437
                                 Buffer old_buf, Page old_page, HeapTuple old_tup,
 
2438
                                 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
 
2439
                                 ExecContext ec, ItemPointer ctid, bool cleanVpd)
 
2440
{
 
2441
        TransactionId myXID = GetCurrentTransactionId();
 
2442
        HeapTupleData newtup;
 
2443
        OffsetNumber newoff;
 
2444
        ItemId          newitemid;
 
2445
        Size            tuple_len = old_tup->t_len;
 
2446
 
 
2447
        heap_copytuple_with_tuple(old_tup, &newtup);
 
2448
 
 
2449
        /*
 
2450
         * register invalidation of source tuple in catcaches.
 
2451
         */
 
2452
        CacheInvalidateHeapTuple(rel, old_tup);
 
2453
 
 
2454
        /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
 
2455
        START_CRIT_SECTION();
 
2456
 
 
2457
        old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
 
2458
                                                                         HEAP_XMIN_INVALID |
 
2459
                                                                         HEAP_MOVED_IN);
 
2460
        old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
 
2461
        HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
 
2462
 
 
2463
        /*
 
2464
         * If this page was not used before - clean it.
 
2465
         *
 
2466
         * NOTE: a nasty bug used to lurk here.  It is possible for the source
 
2467
         * and destination pages to be the same (since this tuple-chain member
 
2468
         * can be on a page lower than the one we're currently processing in
 
2469
         * the outer loop).  If that's true, then after vacuum_page() the
 
2470
         * source tuple will have been moved, and old_tup->t_data will be
 
2471
         * pointing at garbage.  Therefore we must do everything that uses
 
2472
         * old_tup->t_data BEFORE this step!!
 
2473
         *
 
2474
         * This path is different from the other callers of vacuum_page, because
 
2475
         * we have already incremented the vacpage's offsets_used field to
 
2476
         * account for the tuple(s) we expect to move onto the page. Therefore
 
2477
         * vacuum_page's check for offsets_used == 0 is wrong. But since
 
2478
         * that's a good debugging check for all other callers, we work around
 
2479
         * it here rather than remove it.
 
2480
         */
 
2481
        if (!PageIsEmpty(dst_page) && cleanVpd)
 
2482
        {
 
2483
                int                     sv_offsets_used = dst_vacpage->offsets_used;
 
2484
 
 
2485
                dst_vacpage->offsets_used = 0;
 
2486
                vacuum_page(rel, dst_buf, dst_vacpage);
 
2487
                dst_vacpage->offsets_used = sv_offsets_used;
 
2488
        }
 
2489
 
 
2490
        /*
 
2491
         * Update the state of the copied tuple, and store it on the
 
2492
         * destination page.
 
2493
         */
 
2494
        newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
 
2495
                                                                   HEAP_XMIN_INVALID |
 
2496
                                                                   HEAP_MOVED_OFF);
 
2497
        newtup.t_data->t_infomask |= HEAP_MOVED_IN;
 
2498
        HeapTupleHeaderSetXvac(newtup.t_data, myXID);
 
2499
        newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
 
2500
                                                 InvalidOffsetNumber, LP_USED);
 
2501
        if (newoff == InvalidOffsetNumber)
 
2502
        {
 
2503
                elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
 
2504
                         (unsigned long) tuple_len, dst_vacpage->blkno);
 
2505
        }
 
2506
        newitemid = PageGetItemId(dst_page, newoff);
 
2507
        pfree(newtup.t_data);
 
2508
        newtup.t_datamcxt = NULL;
 
2509
        newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
 
2510
        ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
 
2511
 
 
2512
        /* XLOG stuff */
 
2513
        if (!rel->rd_istemp)
 
2514
        {
 
2515
                XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
 
2516
                                                                                   dst_buf, &newtup);
 
2517
 
 
2518
                if (old_buf != dst_buf)
 
2519
                {
 
2520
                        PageSetLSN(old_page, recptr);
 
2521
                        PageSetTLI(old_page, ThisTimeLineID);
 
2522
                }
 
2523
                PageSetLSN(dst_page, recptr);
 
2524
                PageSetTLI(dst_page, ThisTimeLineID);
 
2525
        }
 
2526
        else
 
2527
        {
 
2528
                /*
 
2529
                 * No XLOG record, but still need to flag that XID exists on disk
 
2530
                 */
 
2531
                MyXactMadeTempRelUpdate = true;
 
2532
        }
 
2533
 
 
2534
        END_CRIT_SECTION();
 
2535
 
 
2536
        /*
 
2537
         * Set new tuple's t_ctid pointing to itself for last tuple in chain,
 
2538
         * and to next tuple in chain otherwise.
 
2539
         */
 
2540
        /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
 
2541
        if (!ItemPointerIsValid(ctid))
 
2542
                newtup.t_data->t_ctid = newtup.t_self;
 
2543
        else
 
2544
                newtup.t_data->t_ctid = *ctid;
 
2545
        *ctid = newtup.t_self;
 
2546
 
 
2547
        LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
 
2548
        if (dst_buf != old_buf)
 
2549
                LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
 
2550
 
 
2551
        /* Create index entries for the moved tuple */
 
2552
        if (ec->resultRelInfo->ri_NumIndices > 0)
 
2553
        {
 
2554
                ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
 
2555
                ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
 
2556
                ResetPerTupleExprContext(ec->estate);
 
2557
        }
 
2558
}
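/*
 * A minimal standalone sketch of the ctid threading done at the end of
 * move_chain_tuple(): the first call for a chain passes an invalid ctid, so
 * that copy points at itself; each later call links its copy to the
 * previously placed one and records its own location for the next caller.
 * Tid and set_chain_ctid are illustrative names, not part of this file.
 */
typedef struct Tid
{
	unsigned	blkno;
	unsigned	offnum;			/* 0 is treated as "invalid" here */
} Tid;

static void
set_chain_ctid(Tid new_self, Tid *ctid, Tid *new_ctid_out)
{
	/* last tuple in the chain points at itself, others at the next tuple */
	*new_ctid_out = (ctid->offnum != 0) ? *ctid : new_self;
	/* remember our new location for the next (older) chain member */
	*ctid = new_self;
}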
 
2559
 
 
2560
/*
 
2561
 *      move_plain_tuple() -- move one tuple that is not part of a chain
 
2562
 *
 
2563
 *              This routine moves old_tup from old_page to dst_page.
 
2564
 *              On entry old_buf and dst_buf are locked exclusively, both locks are
 
2565
 *              released before exit.
 
2566
 *
 
2567
 *              Yes, a routine with eight parameters is ugly, but it's still better
 
2568
 *              than having these 90 lines of code in repair_frag() which is already
 
2569
 *              than having these 90 lines of code in repair_frag(), which is already
 
2570
 */
 
2571
static void
 
2572
move_plain_tuple(Relation rel,
 
2573
                                 Buffer old_buf, Page old_page, HeapTuple old_tup,
 
2574
                                 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
 
2575
                                 ExecContext ec)
 
2576
{
 
2577
        TransactionId myXID = GetCurrentTransactionId();
 
2578
        HeapTupleData newtup;
 
2579
        OffsetNumber newoff;
 
2580
        ItemId          newitemid;
 
2581
        Size            tuple_len = old_tup->t_len;
 
2582
 
 
2583
        /* copy tuple */
 
2584
        heap_copytuple_with_tuple(old_tup, &newtup);
 
2585
 
 
2586
        /*
 
2587
         * register invalidation of source tuple in catcaches.
 
2588
         *
 
2589
         * (Note: we do not need to register the copied tuple, because we are not
 
2590
         * changing the tuple contents and so there cannot be any need to
 
2591
         * flush negative catcache entries.)
 
2592
         */
 
2593
        CacheInvalidateHeapTuple(rel, old_tup);
 
2594
 
 
2595
        /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
 
2596
        START_CRIT_SECTION();
 
2597
 
 
2598
        /*
 
2599
         * Mark new tuple as MOVED_IN by me.
 
2600
         */
 
2601
        newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
 
2602
                                                                   HEAP_XMIN_INVALID |
 
2603
                                                                   HEAP_MOVED_OFF);
 
2604
        newtup.t_data->t_infomask |= HEAP_MOVED_IN;
 
2605
        HeapTupleHeaderSetXvac(newtup.t_data, myXID);
 
2606
 
 
2607
        /* add tuple to the page */
 
2608
        newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
 
2609
                                                 InvalidOffsetNumber, LP_USED);
 
2610
        if (newoff == InvalidOffsetNumber)
 
2611
        {
 
2612
                elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
 
2613
                         (unsigned long) tuple_len,
 
2614
                         dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
 
2615
                         dst_vacpage->offsets_used, dst_vacpage->offsets_free);
 
2616
        }
 
2617
        newitemid = PageGetItemId(dst_page, newoff);
 
2618
        pfree(newtup.t_data);
 
2619
        newtup.t_datamcxt = NULL;
 
2620
        newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
 
2621
        ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
 
2622
        newtup.t_self = newtup.t_data->t_ctid;
 
2623
 
 
2624
        /*
 
2625
         * Mark old tuple as MOVED_OFF by me.
 
2626
         */
 
2627
        old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
 
2628
                                                                         HEAP_XMIN_INVALID |
 
2629
                                                                         HEAP_MOVED_IN);
 
2630
        old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
 
2631
        HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
 
2632
 
 
2633
        /* XLOG stuff */
 
2634
        if (!rel->rd_istemp)
 
2635
        {
 
2636
                XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
 
2637
                                                                                   dst_buf, &newtup);
 
2638
 
 
2639
                PageSetLSN(old_page, recptr);
 
2640
                PageSetTLI(old_page, ThisTimeLineID);
 
2641
                PageSetLSN(dst_page, recptr);
 
2642
                PageSetTLI(dst_page, ThisTimeLineID);
 
2643
        }
 
2644
        else
 
2645
        {
 
2646
                /*
 
2647
                 * No XLOG record, but still need to flag that XID exists on disk
 
2648
                 */
 
2649
                MyXactMadeTempRelUpdate = true;
 
2650
        }
 
2651
 
 
2652
        END_CRIT_SECTION();
 
2653
 
 
2654
        dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
 
2655
                ((PageHeader) dst_page)->pd_lower;
 
2656
        LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
 
2657
        LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
 
2658
 
 
2659
        dst_vacpage->offsets_used++;
 
2660
 
 
2661
        /* insert index tuples if needed */
 
2662
        if (ec->resultRelInfo->ri_NumIndices > 0)
 
2663
        {
 
2664
                ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
 
2665
                ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
 
2666
                ResetPerTupleExprContext(ec->estate);
 
2667
        }
 
2668
}
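/*
 * A small illustrative sketch of why the "free" recomputation above is
 * pd_upper minus pd_lower: in a slotted page the line-pointer array grows up
 * to pd_lower while tuple data grows down to pd_upper, so the gap between the
 * two is the usable free space.  ToyPageHeader is an assumed, simplified
 * layout, not the real PageHeaderData.
 */
#include <stddef.h>

typedef struct ToyPageHeader
{
	unsigned short pd_lower;	/* offset where the line-pointer array ends */
	unsigned short pd_upper;	/* offset where the tuple data begins */
} ToyPageHeader;

static size_t
toy_page_free_space(const ToyPageHeader *hdr)
{
	return (size_t) (hdr->pd_upper - hdr->pd_lower);
}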
 
2669
 
 
2670
/*
 
2671
 *      update_hint_bits() -- update hint bits in destination pages
 
2672
 *
 
2673
 * Scan all the pages that we moved tuples onto and update tuple status bits.
 
2674
 * This is normally not really necessary, but it will save time for future
 
2675
 * transactions examining these tuples.
 
2676
 *
 
2677
 * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
 
2678
 * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
 
2679
 *
 
2680
 * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
 
2681
 * pages that were move source pages but not move dest pages.  The bulk
 
2682
 * of the move source pages will be physically truncated from the relation,
 
2683
 * and the last page remaining in the rel will be fixed separately in
 
2684
 * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
 
2685
 * hint bits updated are tuples that are moved as part of a chain and were
 
2686
 * on pages that were neither move destinations nor at the end of the rel.
 
2687
 * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
 
2688
 * to remember and revisit those pages too.
 
2689
 *
 
2690
 * Because of this omission, VACUUM FULL FREEZE is not a safe combination;
 
2691
 * it's possible that the VACUUM's own XID remains exposed as something that
 
2692
 * tqual tests would need to check.
 
2693
 *
 
2694
 * For the non-freeze case, one wonders whether it wouldn't be better to skip
 
2695
 * this work entirely, and let the tuple status updates happen someplace
 
2696
 * that's not holding an exclusive lock on the relation.
 
2697
 */
 
2698
static void
 
2699
update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
 
2700
                                 BlockNumber last_move_dest_block, int num_moved)
 
2701
{
 
2702
        TransactionId myXID = GetCurrentTransactionId();
 
2703
        int                     checked_moved = 0;
 
2704
        int                     i;
 
2705
        VacPage    *curpage;
 
2706
 
 
2707
        for (i = 0, curpage = fraged_pages->pagedesc;
 
2708
                 i < num_fraged_pages;
 
2709
                 i++, curpage++)
 
2710
        {
 
2711
                Buffer          buf;
 
2712
                Page            page;
 
2713
                OffsetNumber max_offset;
 
2714
                OffsetNumber off;
 
2715
                int                     num_tuples = 0;
 
2716
 
 
2717
                vacuum_delay_point();
 
2718
 
 
2719
                if ((*curpage)->blkno > last_move_dest_block)
 
2720
                        break;                          /* no need to scan any further */
 
2721
                if ((*curpage)->offsets_used == 0)
 
2722
                        continue;                       /* this page was never used as a move dest */
 
2723
                buf = ReadBuffer(rel, (*curpage)->blkno);
 
2724
                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
2725
                page = BufferGetPage(buf);
 
2726
                max_offset = PageGetMaxOffsetNumber(page);
 
2727
                for (off = FirstOffsetNumber;
 
2728
                         off <= max_offset;
 
2729
                         off = OffsetNumberNext(off))
 
2730
                {
 
2731
                        ItemId          itemid = PageGetItemId(page, off);
 
2732
                        HeapTupleHeader htup;
 
2733
 
 
2734
                        if (!ItemIdIsUsed(itemid))
 
2735
                                continue;
 
2736
                        htup = (HeapTupleHeader) PageGetItem(page, itemid);
 
2737
                        if (htup->t_infomask & HEAP_XMIN_COMMITTED)
 
2738
                                continue;
 
2739
 
 
2740
                        /*
 
2741
                         * Here we may see either MOVED_OFF or MOVED_IN tuples.
 
2742
                         */
 
2743
                        if (!(htup->t_infomask & HEAP_MOVED))
 
2744
                                elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
 
2745
                        if (HeapTupleHeaderGetXvac(htup) != myXID)
 
2746
                                elog(ERROR, "invalid XVAC in tuple header");
 
2747
 
 
2748
                        if (htup->t_infomask & HEAP_MOVED_IN)
 
2749
                        {
 
2750
                                htup->t_infomask |= HEAP_XMIN_COMMITTED;
 
2751
                                htup->t_infomask &= ~HEAP_MOVED;
 
2752
                                num_tuples++;
 
2753
                        }
 
2754
                        else
 
2755
                                htup->t_infomask |= HEAP_XMIN_INVALID;
 
2756
                }
 
2757
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
2758
                WriteBuffer(buf);
 
2759
                Assert((*curpage)->offsets_used == num_tuples);
 
2760
                checked_moved += num_tuples;
 
2761
        }
 
2762
        Assert(num_moved == checked_moved);
 
2763
}
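/*
 * A tiny self-contained sketch of the infomask rewriting performed above: a
 * MOVED_IN tuple is stamped XMIN_COMMITTED and its MOVED bits cleared, while
 * a MOVED_OFF tuple is stamped XMIN_INVALID.  The F_* values are made-up
 * stand-ins for the real HEAP_* bits in htup.h; only the set/clear pattern is
 * meant to carry over.
 */
#include <stdint.h>

#define F_XMIN_COMMITTED	0x0001
#define F_XMIN_INVALID		0x0002
#define F_MOVED_OFF			0x0004
#define F_MOVED_IN			0x0008
#define F_MOVED				(F_MOVED_OFF | F_MOVED_IN)

static void
settle_moved_tuple(uint16_t *infomask)
{
	if (*infomask & F_MOVED_IN)
	{
		*infomask |= F_XMIN_COMMITTED;	/* the insertion is now known good */
		*infomask &= ~F_MOVED;			/* future readers need not check XVAC */
	}
	else if (*infomask & F_MOVED_OFF)
		*infomask |= F_XMIN_INVALID;	/* the old copy is dead */
}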
 
2764
 
 
2765
/*
 
2766
 *      vacuum_heap() -- free dead tuples
 
2767
 *
 
2768
 *              This routine marks dead tuples as unused and truncates the relation
 
2769
 *              if there are "empty" end-blocks.
 
2770
 */
 
2771
static void
 
2772
vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 
2773
{
 
2774
        Buffer          buf;
 
2775
        VacPage    *vacpage;
 
2776
        BlockNumber relblocks;
 
2777
        int                     nblocks;
 
2778
        int                     i;
 
2779
 
 
2780
        nblocks = vacuum_pages->num_pages;
 
2781
        nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
 
2782
 
 
2783
        for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
 
2784
        {
 
2785
                vacuum_delay_point();
 
2786
 
 
2787
                if ((*vacpage)->offsets_free > 0)
 
2788
                {
 
2789
                        buf = ReadBuffer(onerel, (*vacpage)->blkno);
 
2790
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
2791
                        vacuum_page(onerel, buf, *vacpage);
 
2792
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
2793
                        WriteBuffer(buf);
 
2794
                }
 
2795
        }
 
2796
 
 
2797
        /*
 
2798
         * Flush dirty pages out to disk.  We do this unconditionally, even if
 
2799
         * we don't need to truncate, because we want to ensure that all
 
2800
         * tuples have correct on-row commit status on disk (see bufmgr.c's
 
2801
         * comments for FlushRelationBuffers()).
 
2802
         */
 
2803
        Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
 
2804
        relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
 
2805
 
 
2806
        FlushRelationBuffers(onerel, relblocks);
 
2807
 
 
2808
        /* truncate relation if there are some empty end-pages */
 
2809
        if (vacuum_pages->empty_end_pages > 0)
 
2810
        {
 
2811
                ereport(elevel,
 
2812
                                (errmsg("\"%s\": truncated %u to %u pages",
 
2813
                                                RelationGetRelationName(onerel),
 
2814
                                                vacrelstats->rel_pages, relblocks)));
 
2815
                RelationTruncate(onerel, relblocks);
 
2816
                vacrelstats->rel_pages = relblocks;             /* set new number of
 
2817
                                                                                                 * blocks */
 
2818
        }
 
2819
}
 
2820
 
 
2821
/*
 
2822
 *      vacuum_page() -- free dead tuples on a page
 
2823
 *                                       and repair its fragmentation.
 
2824
 */
 
2825
static void
 
2826
vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
 
2827
{
 
2828
        OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
 
2829
        int                     uncnt;
 
2830
        Page            page = BufferGetPage(buffer);
 
2831
        ItemId          itemid;
 
2832
        int                     i;
 
2833
 
 
2834
        /* There shouldn't be any tuples moved onto the page yet! */
 
2835
        Assert(vacpage->offsets_used == 0);
 
2836
 
 
2837
        START_CRIT_SECTION();
 
2838
 
 
2839
        for (i = 0; i < vacpage->offsets_free; i++)
 
2840
        {
 
2841
                itemid = PageGetItemId(page, vacpage->offsets[i]);
 
2842
                itemid->lp_flags &= ~LP_USED;
 
2843
        }
 
2844
 
 
2845
        uncnt = PageRepairFragmentation(page, unused);
 
2846
 
 
2847
        /* XLOG stuff */
 
2848
        if (!onerel->rd_istemp)
 
2849
        {
 
2850
                XLogRecPtr      recptr;
 
2851
 
 
2852
                recptr = log_heap_clean(onerel, buffer, unused, uncnt);
 
2853
                PageSetLSN(page, recptr);
 
2854
                PageSetTLI(page, ThisTimeLineID);
 
2855
        }
 
2856
        else
 
2857
        {
 
2858
                /* No XLOG record, but still need to flag that XID exists on disk */
 
2859
                MyXactMadeTempRelUpdate = true;
 
2860
        }
 
2861
 
 
2862
        END_CRIT_SECTION();
 
2863
}
 
2864
 
 
2865
/*
 
2866
 *      scan_index() -- scan one index relation to update statistics.
 
2867
 *
 
2868
 * We use this when we have no deletions to do.
 
2869
 */
 
2870
static void
 
2871
scan_index(Relation indrel, double num_tuples)
 
2872
{
 
2873
        IndexBulkDeleteResult *stats;
 
2874
        IndexVacuumCleanupInfo vcinfo;
 
2875
        VacRUsage       ru0;
 
2876
 
 
2877
        vac_init_rusage(&ru0);
 
2878
 
 
2879
        /*
 
2880
         * Even though we're not planning to delete anything, we use the
 
2881
         * ambulkdelete call, because (a) the scan happens within the index AM
 
2882
         * for more speed, and (b) it may want to pass private statistics to
 
2883
         * the amvacuumcleanup call.
 
2884
         */
 
2885
        stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
 
2886
 
 
2887
        /* Do post-VACUUM cleanup, even though we deleted nothing */
 
2888
        vcinfo.vacuum_full = true;
 
2889
        vcinfo.message_level = elevel;
 
2890
 
 
2891
        stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
 
2892
 
 
2893
        if (!stats)
 
2894
                return;
 
2895
 
 
2896
        /* now update statistics in pg_class */
 
2897
        vac_update_relstats(RelationGetRelid(indrel),
 
2898
                                                stats->num_pages, stats->num_index_tuples,
 
2899
                                                false);
 
2900
 
 
2901
        ereport(elevel,
 
2902
           (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
 
2903
                           RelationGetRelationName(indrel),
 
2904
                           stats->num_index_tuples,
 
2905
                           stats->num_pages),
 
2906
                errdetail("%u index pages have been deleted, %u are currently reusable.\n"
 
2907
                                  "%s",
 
2908
                                  stats->pages_deleted, stats->pages_free,
 
2909
                                  vac_show_rusage(&ru0))));
 
2910
 
 
2911
        /*
 
2912
         * Check for tuple count mismatch.      If the index is partial, then it's
 
2913
 * OK for it to have fewer tuples than the heap; else we have trouble.
 
2914
         */
 
2915
        if (stats->num_index_tuples != num_tuples)
 
2916
        {
 
2917
                if (stats->num_index_tuples > num_tuples ||
 
2918
                        !vac_is_partial_index(indrel))
 
2919
                        ereport(WARNING,
 
2920
                                        (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
 
2921
                                                        RelationGetRelationName(indrel),
 
2922
                                                        stats->num_index_tuples, num_tuples),
 
2923
                                         errhint("Rebuild the index with REINDEX.")));
 
2924
        }
 
2925
 
 
2926
        pfree(stats);
 
2927
}
 
2928
 
 
2929
/*
 
2930
 *      vacuum_index() -- vacuum one index relation.
 
2931
 *
 
2932
 *              vacpagelist is the VacPageList of the heap we're currently vacuuming.
 
2933
 *              It's locked. Indrel is an index relation on the vacuumed heap.
 
2934
 *
 
2935
 *              We don't bother to set locks on the index relation here, since
 
2936
 *              the parent table is exclusive-locked already.
 
2937
 *
 
2938
 *              Finally, we arrange to update the index relation's statistics in
 
2939
 *              pg_class.
 
2940
 */
 
2941
static void
 
2942
vacuum_index(VacPageList vacpagelist, Relation indrel,
 
2943
                         double num_tuples, int keep_tuples)
 
2944
{
 
2945
        IndexBulkDeleteResult *stats;
 
2946
        IndexVacuumCleanupInfo vcinfo;
 
2947
        VacRUsage       ru0;
 
2948
 
 
2949
        vac_init_rusage(&ru0);
 
2950
 
 
2951
        /* Do bulk deletion */
 
2952
        stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
 
2953
 
 
2954
        /* Do post-VACUUM cleanup */
 
2955
        vcinfo.vacuum_full = true;
 
2956
        vcinfo.message_level = elevel;
 
2957
 
 
2958
        stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
 
2959
 
 
2960
        if (!stats)
 
2961
                return;
 
2962
 
 
2963
        /* now update statistics in pg_class */
 
2964
        vac_update_relstats(RelationGetRelid(indrel),
 
2965
                                                stats->num_pages, stats->num_index_tuples,
 
2966
                                                false);
 
2967
 
 
2968
        ereport(elevel,
 
2969
           (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
 
2970
                           RelationGetRelationName(indrel),
 
2971
                           stats->num_index_tuples,
 
2972
                           stats->num_pages),
 
2973
                errdetail("%.0f index row versions were removed.\n"
 
2974
                 "%u index pages have been deleted, %u are currently reusable.\n"
 
2975
                                  "%s",
 
2976
                                  stats->tuples_removed,
 
2977
                                  stats->pages_deleted, stats->pages_free,
 
2978
                                  vac_show_rusage(&ru0))));
 
2979
 
 
2980
        /*
 
2981
         * Check for tuple count mismatch.      If the index is partial, then it's
 
2982
 * OK for it to have fewer tuples than the heap; else we have trouble.
 
2983
         */
 
2984
        if (stats->num_index_tuples != num_tuples + keep_tuples)
 
2985
        {
 
2986
                if (stats->num_index_tuples > num_tuples + keep_tuples ||
 
2987
                        !vac_is_partial_index(indrel))
 
2988
                        ereport(WARNING,
 
2989
                                        (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
 
2990
                                                        RelationGetRelationName(indrel),
 
2991
                                          stats->num_index_tuples, num_tuples + keep_tuples),
 
2992
                                         errhint("Rebuild the index with REINDEX.")));
 
2993
        }
 
2994
 
 
2995
        pfree(stats);
 
2996
}
 
2997
 
 
2998
/*
 
2999
 *      tid_reaped() -- is a particular tid reaped?
 
3000
 *
 
3001
 *              This has the right signature to be an IndexBulkDeleteCallback.
 
3002
 *
 
3003
 *              vacpagelist->pagedesc is sorted in the right order.
 
3004
 */
 
3005
static bool
 
3006
tid_reaped(ItemPointer itemptr, void *state)
 
3007
{
 
3008
        VacPageList vacpagelist = (VacPageList) state;
 
3009
        OffsetNumber ioffno;
 
3010
        OffsetNumber *voff;
 
3011
        VacPage         vp,
 
3012
                           *vpp;
 
3013
        VacPageData vacpage;
 
3014
 
 
3015
        vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
 
3016
        ioffno = ItemPointerGetOffsetNumber(itemptr);
 
3017
 
 
3018
        vp = &vacpage;
 
3019
        vpp = (VacPage *) vac_bsearch((void *) &vp,
 
3020
                                                                  (void *) (vacpagelist->pagedesc),
 
3021
                                                                  vacpagelist->num_pages,
 
3022
                                                                  sizeof(VacPage),
 
3023
                                                                  vac_cmp_blk);
 
3024
 
 
3025
        if (vpp == NULL)
 
3026
                return false;
 
3027
 
 
3028
        /* ok - we are on a partially or fully reaped page */
 
3029
        vp = *vpp;
 
3030
 
 
3031
        if (vp->offsets_free == 0)
 
3032
        {
 
3033
                /* this is EmptyPage, so claim all tuples on it are reaped!!! */
 
3034
                return true;
 
3035
        }
 
3036
 
 
3037
        voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
 
3038
                                                                                (void *) (vp->offsets),
 
3039
                                                                                vp->offsets_free,
 
3040
                                                                                sizeof(OffsetNumber),
 
3041
                                                                                vac_cmp_offno);
 
3042
 
 
3043
        if (voff == NULL)
 
3044
                return false;
 
3045
 
 
3046
        /* tid is reaped */
 
3047
        return true;
 
3048
}
 
3049
 
 
3050
/*
 
3051
 * Dummy version for scan_index.
 
3052
 */
 
3053
static bool
 
3054
dummy_tid_reaped(ItemPointer itemptr, void *state)
 
3055
{
 
3056
        return false;
 
3057
}
 
3058
 
 
3059
/*
 
3060
 * Update the shared Free Space Map with the info we now have about
 
3061
 * free space in the relation, discarding any old info the map may have.
 
3062
 */
 
3063
static void
 
3064
vac_update_fsm(Relation onerel, VacPageList fraged_pages,
 
3065
                           BlockNumber rel_pages)
 
3066
{
 
3067
        int                     nPages = fraged_pages->num_pages;
 
3068
        VacPage    *pagedesc = fraged_pages->pagedesc;
 
3069
        Size            threshold;
 
3070
        PageFreeSpaceInfo *pageSpaces;
 
3071
        int                     outPages;
 
3072
        int                     i;
 
3073
 
 
3074
        /*
 
3075
         * We only report pages with free space at least equal to the average
 
3076
         * request size --- this avoids cluttering FSM with uselessly-small
 
3077
         * bits of space.  Although FSM would discard pages with little free
 
3078
         * space anyway, it's important to do this prefiltering because (a) it
 
3079
         * reduces the time spent holding the FSM lock in
 
3080
         * RecordRelationFreeSpace, and (b) FSM uses the number of pages
 
3081
         * reported as a statistic for guiding space management.  If we didn't
 
3082
         * threshold our reports the same way vacuumlazy.c does, we'd be
 
3083
         * skewing that statistic.
 
3084
         */
 
3085
        threshold = GetAvgFSMRequestSize(&onerel->rd_node);
 
3086
 
 
3087
        pageSpaces = (PageFreeSpaceInfo *)
 
3088
                palloc(nPages * sizeof(PageFreeSpaceInfo));
 
3089
        outPages = 0;
 
3090
 
 
3091
        for (i = 0; i < nPages; i++)
 
3092
        {
 
3093
                /*
 
3094
                 * fraged_pages may contain entries for pages that we later
 
3095
                 * decided to truncate from the relation; don't enter them into
 
3096
                 * the free space map!
 
3097
                 */
 
3098
                if (pagedesc[i]->blkno >= rel_pages)
 
3099
                        break;
 
3100
 
 
3101
                if (pagedesc[i]->free >= threshold)
 
3102
                {
 
3103
                        pageSpaces[outPages].blkno = pagedesc[i]->blkno;
 
3104
                        pageSpaces[outPages].avail = pagedesc[i]->free;
 
3105
                        outPages++;
 
3106
                }
 
3107
        }
 
3108
 
 
3109
        RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
 
3110
 
 
3111
        pfree(pageSpaces);
 
3112
}
 
3113
 
 
3114
/* Copy a VacPage structure */
 
3115
static VacPage
 
3116
copy_vac_page(VacPage vacpage)
 
3117
{
 
3118
        VacPage         newvacpage;
 
3119
 
 
3120
        /* allocate a VacPageData entry */
 
3121
        newvacpage = (VacPage) palloc(sizeof(VacPageData) +
 
3122
                                                   vacpage->offsets_free * sizeof(OffsetNumber));
 
3123
 
 
3124
        /* fill it in */
 
3125
        if (vacpage->offsets_free > 0)
 
3126
                memcpy(newvacpage->offsets, vacpage->offsets,
 
3127
                           vacpage->offsets_free * sizeof(OffsetNumber));
 
3128
        newvacpage->blkno = vacpage->blkno;
 
3129
        newvacpage->free = vacpage->free;
 
3130
        newvacpage->offsets_used = vacpage->offsets_used;
 
3131
        newvacpage->offsets_free = vacpage->offsets_free;
 
3132
 
 
3133
        return newvacpage;
 
3134
}
 
3135
 
 
3136
/*
 
3137
 * Add a VacPage pointer to a VacPageList.
 
3138
 *
 
3139
 *              As a side effect of the way that scan_heap works,
 
3140
 *              higher pages come after lower pages in the array
 
3141
 *              (and highest tid on a page is last).
 
3142
 */
 
3143
static void
 
3144
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
 
3145
{
 
3146
#define PG_NPAGEDESC 1024
 
3147
 
 
3148
        /* allocate a VacPage entry if needed */
 
3149
        if (vacpagelist->num_pages == 0)
 
3150
        {
 
3151
                vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
 
3152
                vacpagelist->num_allocated_pages = PG_NPAGEDESC;
 
3153
        }
 
3154
        else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
 
3155
        {
 
3156
                vacpagelist->num_allocated_pages *= 2;
 
3157
                vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
 
3158
        }
 
3159
        vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
 
3160
        (vacpagelist->num_pages)++;
 
3161
}
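/*
 * A self-contained sketch of the same growth strategy as vpage_insert():
 * allocate a fixed initial chunk, then double the array whenever it fills.
 * IntList, intlist_append and INITIAL_CAP are illustrative names; allocation
 * failure handling is omitted for brevity.
 */
#include <stdlib.h>

typedef struct IntList
{
	int		   *items;
	int			num_items;
	int			num_allocated;
} IntList;

#define INITIAL_CAP 1024

static void
intlist_append(IntList *list, int value)
{
	if (list->num_items == 0)
	{
		list->items = (int *) malloc(INITIAL_CAP * sizeof(int));
		list->num_allocated = INITIAL_CAP;
	}
	else if (list->num_items >= list->num_allocated)
	{
		list->num_allocated *= 2;
		list->items = (int *) realloc(list->items,
									  list->num_allocated * sizeof(int));
	}
	list->items[list->num_items++] = value;
}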
 
3162
 
 
3163
/*
 
3164
 * vac_bsearch: just like standard C library routine bsearch(),
 
3165
 * except that we first test to see whether the target key is outside
 
3166
 * the range of the table entries.      This case is handled relatively slowly
 
3167
 * by the normal binary search algorithm (ie, no faster than any other key)
 
3168
 * but it occurs often enough in VACUUM to be worth optimizing.
 
3169
 */
 
3170
static void *
 
3171
vac_bsearch(const void *key, const void *base,
 
3172
                        size_t nelem, size_t size,
 
3173
                        int (*compar) (const void *, const void *))
 
3174
{
 
3175
        int                     res;
 
3176
        const void *last;
 
3177
 
 
3178
        if (nelem == 0)
 
3179
                return NULL;
 
3180
        res = compar(key, base);
 
3181
        if (res < 0)
 
3182
                return NULL;
 
3183
        if (res == 0)
 
3184
                return (void *) base;
 
3185
        if (nelem > 1)
 
3186
        {
 
3187
                last = (const void *) ((const char *) base + (nelem - 1) * size);
 
3188
                res = compar(key, last);
 
3189
                if (res > 0)
 
3190
                        return NULL;
 
3191
                if (res == 0)
 
3192
                        return (void *) last;
 
3193
        }
 
3194
        if (nelem <= 2)
 
3195
                return NULL;                    /* already checked 'em all */
 
3196
        return bsearch(key, base, nelem, size, compar);
 
3197
}
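/*
 * A minimal standalone sketch of the same idea as vac_bsearch(): check the
 * table's endpoints before falling back to bsearch(), because out-of-range
 * keys are common during VACUUM and can be rejected cheaply.  The int array
 * and the names cmp_int / bounded_bsearch are illustrative only.
 */
#include <stdlib.h>

static int
cmp_int(const void *a, const void *b)
{
	int			la = *(const int *) a;
	int			lb = *(const int *) b;

	return (la < lb) ? -1 : (la > lb) ? 1 : 0;
}

static void *
bounded_bsearch(const int *key, const int *base, size_t nelem)
{
	if (nelem == 0 || *key < base[0] || *key > base[nelem - 1])
		return NULL;			/* outside the table's range: cheap reject */
	return bsearch(key, base, nelem, sizeof(int), cmp_int);
}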
 
3198
 
 
3199
/*
 
3200
 * Comparator routines for use with qsort() and bsearch().
 
3201
 */
 
3202
static int
 
3203
vac_cmp_blk(const void *left, const void *right)
 
3204
{
 
3205
        BlockNumber lblk,
 
3206
                                rblk;
 
3207
 
 
3208
        lblk = (*((VacPage *) left))->blkno;
 
3209
        rblk = (*((VacPage *) right))->blkno;
 
3210
 
 
3211
        if (lblk < rblk)
 
3212
                return -1;
 
3213
        if (lblk == rblk)
 
3214
                return 0;
 
3215
        return 1;
 
3216
}
 
3217
 
 
3218
static int
 
3219
vac_cmp_offno(const void *left, const void *right)
 
3220
{
 
3221
        if (*(OffsetNumber *) left < *(OffsetNumber *) right)
 
3222
                return -1;
 
3223
        if (*(OffsetNumber *) left == *(OffsetNumber *) right)
 
3224
                return 0;
 
3225
        return 1;
 
3226
}
 
3227
 
 
3228
static int
 
3229
vac_cmp_vtlinks(const void *left, const void *right)
 
3230
{
 
3231
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
 
3232
                ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
 
3233
                return -1;
 
3234
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
 
3235
                ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
 
3236
                return 1;
 
3237
        /* bi_hi-es are equal */
 
3238
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
 
3239
                ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
 
3240
                return -1;
 
3241
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
 
3242
                ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
 
3243
                return 1;
 
3244
        /* bi_lo-es are equal */
 
3245
        if (((VTupleLink) left)->new_tid.ip_posid <
 
3246
                ((VTupleLink) right)->new_tid.ip_posid)
 
3247
                return -1;
 
3248
        if (((VTupleLink) left)->new_tid.ip_posid >
 
3249
                ((VTupleLink) right)->new_tid.ip_posid)
 
3250
                return 1;
 
3251
        return 0;
 
3252
}
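/*
 * The comparison above is simply a lexicographic compare on the three TID
 * fields (bi_hi, bi_lo, ip_posid).  A compact equivalent for a generic
 * three-field key might look like the sketch below; Key and cmp_key are
 * illustrative stand-ins, not types from this file.
 */
typedef struct Key
{
	unsigned short hi;
	unsigned short lo;
	unsigned short pos;
} Key;

static int
cmp_key(const void *left, const void *right)
{
	const Key  *l = (const Key *) left;
	const Key  *r = (const Key *) right;

	if (l->hi != r->hi)
		return (l->hi < r->hi) ? -1 : 1;
	if (l->lo != r->lo)
		return (l->lo < r->lo) ? -1 : 1;
	if (l->pos != r->pos)
		return (l->pos < r->pos) ? -1 : 1;
	return 0;
}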
 
3253
 
 
3254
 
 
3255
/*
 
3256
 * Open all the indexes of the given relation, obtaining the specified kind
 
3257
 * of lock on each.  Return an array of Relation pointers for the indexes
 
3258
 * into *Irel, and the number of indexes into *nindexes.
 
3259
 */
 
3260
void
 
3261
vac_open_indexes(Relation relation, LOCKMODE lockmode,
 
3262
                                 int *nindexes, Relation **Irel)
 
3263
{
 
3264
        List       *indexoidlist;
 
3265
        ListCell   *indexoidscan;
 
3266
        int                     i;
 
3267
 
 
3268
        indexoidlist = RelationGetIndexList(relation);
 
3269
 
 
3270
        *nindexes = list_length(indexoidlist);
 
3271
 
 
3272
        if (*nindexes > 0)
 
3273
                *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
 
3274
        else
 
3275
                *Irel = NULL;
 
3276
 
 
3277
        i = 0;
 
3278
        foreach(indexoidscan, indexoidlist)
 
3279
        {
 
3280
                Oid                     indexoid = lfirst_oid(indexoidscan);
 
3281
                Relation        ind;
 
3282
 
 
3283
                ind = index_open(indexoid);
 
3284
                (*Irel)[i++] = ind;
 
3285
                LockRelation(ind, lockmode);
 
3286
        }
 
3287
 
 
3288
        list_free(indexoidlist);
 
3289
}
 
3290
 
 
3291
/*
 
3292
 * Release the resources acquired by vac_open_indexes.  Optionally release
 
3293
 * the locks (say NoLock to keep 'em).
 
3294
 */
 
3295
void
 
3296
vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
 
3297
{
 
3298
        if (Irel == NULL)
 
3299
                return;
 
3300
 
 
3301
        while (nindexes--)
 
3302
        {
 
3303
                Relation        ind = Irel[nindexes];
 
3304
 
 
3305
                if (lockmode != NoLock)
 
3306
                        UnlockRelation(ind, lockmode);
 
3307
                index_close(ind);
 
3308
        }
 
3309
        pfree(Irel);
 
3310
}
 
/*
 * Is an index partial (i.e., could it contain fewer tuples than the heap)?
 */
bool
vac_is_partial_index(Relation indrel)
{
        /*
         * If the index's AM doesn't support nulls, it's partial for our
         * purposes.
         */
        if (!indrel->rd_am->amindexnulls)
                return true;

        /* Otherwise, look to see if there's a partial-index predicate */
        if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
                return true;

        return false;
}
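
/*
 * Added explanatory note (not in the original file): "partial" here means
 * the index may legitimately hold fewer entries than the heap has tuples,
 * for example an index created with a WHERE predicate, presumably so that
 * callers can decide whether the index entry count is comparable to the
 * heap tuple count.
 */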
 
static bool
enough_space(VacPage vacpage, Size len)
{
        len = MAXALIGN(len);

        if (len > vacpage->free)
                return false;

        /* if there are free itemid(s) and len <= free_space... */
        if (vacpage->offsets_used < vacpage->offsets_free)
                return true;

        /* noff_used >= noff_free and so we'll have to allocate new itemid */
        if (len + sizeof(ItemIdData) <= vacpage->free)
                return true;

        return false;
}
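
/*
 * Worked example (added for illustration, assuming 8-byte MAXALIGN and a
 * 4-byte ItemIdData): a 61-byte tuple rounds up to 64 bytes.  If the target
 * page reports 66 bytes free and has no reusable line pointers, the move is
 * rejected because 64 + 4 > 66; if at least one line pointer can be reused,
 * 64 <= 66 and the move is allowed.
 */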
 
/*
 * Initialize usage snapshot.
 */
void
vac_init_rusage(VacRUsage *ru0)
{
        struct timezone tz;

        getrusage(RUSAGE_SELF, &ru0->ru);
        gettimeofday(&ru0->tv, &tz);
}

/*
 * Compute elapsed time since the ru0 usage snapshot, and format it into a
 * displayable string.  The result lives in a static buffer, which is tacky,
 * but no one ever claimed that the Postgres backend is thread-safe...
 */
const char *
vac_show_rusage(VacRUsage *ru0)
{
        static char result[100];
        VacRUsage       ru1;

        vac_init_rusage(&ru1);

        if (ru1.tv.tv_usec < ru0->tv.tv_usec)
        {
                ru1.tv.tv_sec--;
                ru1.tv.tv_usec += 1000000;
        }
        if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
        {
                ru1.ru.ru_stime.tv_sec--;
                ru1.ru.ru_stime.tv_usec += 1000000;
        }
        if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
        {
                ru1.ru.ru_utime.tv_sec--;
                ru1.ru.ru_utime.tv_usec += 1000000;
        }

        snprintf(result, sizeof(result),
                         "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
                         (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
                         (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
                         (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
                         (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
                         (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
                         (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);

        return result;
}
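
/*
 * Example usage (illustrative only, not part of the original file): take a
 * snapshot before a unit of work and report the cost afterwards.  The
 * function name, log level and message text are hypothetical.
 */
#ifdef NOT_USED
static void
example_report_rusage(void)
{
        VacRUsage       ru0;

        vac_init_rusage(&ru0);

        /* ... per-relation vacuum work would happen here ... */

        elog(DEBUG2, "example step complete: %s", vac_show_rusage(&ru0));
}
#endif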
 
/*
 * vacuum_delay_point --- check for interrupts and cost-based delay.
 *
 * This should be called in each major loop of VACUUM processing,
 * typically once per page processed.
 */
void
vacuum_delay_point(void)
{
        /* Always check for interrupts */
        CHECK_FOR_INTERRUPTS();

        /* Nap if appropriate */
        if (VacuumCostActive && !InterruptPending &&
                VacuumCostBalance >= VacuumCostLimit)
        {
                int                     msec;

                msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
                if (msec > VacuumCostDelay * 4)
                        msec = VacuumCostDelay * 4;

                pg_usleep(msec * 1000L);

                VacuumCostBalance = 0;

                /* Might have gotten an interrupt while sleeping */
                CHECK_FOR_INTERRUPTS();
        }
}
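
/*
 * Example (illustrative only, not part of the original file): how a per-page
 * scan loop is expected to invoke vacuum_delay_point(), per the header
 * comment above.  The function and variable names are hypothetical.
 */
#ifdef NOT_USED
static void
example_scan_loop(BlockNumber nblocks)
{
        BlockNumber blkno;

        for (blkno = 0; blkno < nblocks; blkno++)
        {
                /* Check for interrupts and honor cost-based vacuum delay. */
                vacuum_delay_point();

                /* ... read and process the page here ... */
        }
}
#endif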