~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/commands/cluster.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * cluster.c
 
4
 *        CLUSTER a table on an index.
 
5
 *
 
6
 * There is hardly anything left of Paul Brown's original implementation...
 
7
 *
 
8
 *
 
9
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
10
 * Portions Copyright (c) 1994-5, Regents of the University of California
 
11
 *
 
12
 *
 
13
 * IDENTIFICATION
 
14
 *        $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.131.4.1 2005-02-06 20:19:24 tgl Exp $
 
15
 *
 
16
 *-------------------------------------------------------------------------
 
17
 */
 
18
#include "postgres.h"
 
19
 
 
20
#include "access/genam.h"
 
21
#include "access/heapam.h"
 
22
#include "catalog/catalog.h"
 
23
#include "catalog/catname.h"
 
24
#include "catalog/dependency.h"
 
25
#include "catalog/heap.h"
 
26
#include "catalog/index.h"
 
27
#include "catalog/indexing.h"
 
28
#include "catalog/namespace.h"
 
29
#include "commands/cluster.h"
 
30
#include "commands/tablecmds.h"
 
31
#include "miscadmin.h"
 
32
#include "utils/acl.h"
 
33
#include "utils/fmgroids.h"
 
34
#include "utils/inval.h"
 
35
#include "utils/lsyscache.h"
 
36
#include "utils/syscache.h"
 
37
#include "utils/relcache.h"
 
38
 
 
39
 
 
40
/*
 
41
 * This struct is used to pass around the information on tables to be
 
42
 * clustered. We need this so we can make a list of them when invoked without
 
43
 * a specific table/index pair.
 
44
 */
 
45
typedef struct
 
46
{
 
47
        Oid                     tableOid;
 
48
        Oid                     indexOid;
 
49
} RelToCluster;
 
50
 
 
51
 
 
52
static void cluster_rel(RelToCluster *rv, bool recheck);
 
53
static void rebuild_relation(Relation OldHeap, Oid indexOid);
 
54
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
 
55
static List *get_tables_to_cluster(MemoryContext cluster_context);
 
56
 
 
57
 
 
58
 
 
59
/*---------------------------------------------------------------------------
 
60
 * This cluster code allows for clustering multiple tables at once. Because
 
61
 * of this, we cannot just run everything on a single transaction, or we
 
62
 * would be forced to acquire exclusive locks on all the tables being
 
63
 * clustered, simultaneously --- very likely leading to deadlock.
 
64
 *
 
65
 * To solve this we follow a similar strategy to VACUUM code,
 
66
 * clustering each relation in a separate transaction. For this to work,
 
67
 * we need to:
 
68
 *      - provide a separate memory context so that we can pass information in
 
69
 *        a way that survives across transactions
 
70
 *      - start a new transaction every time a new relation is clustered
 
71
 *      - check for validity of the information on to-be-clustered relations,
 
72
 *        as someone might have deleted a relation behind our back, or
 
73
 *        clustered one on a different index
 
74
 *      - end the transaction
 
75
 *
 
76
 * The single-relation case does not have any such overhead.
 
77
 *
 
78
 * We also allow a relation being specified without index.      In that case,
 
79
 * the indisclustered bit will be looked up, and an ERROR will be thrown
 
80
 * if there is no index with the bit set.
 
81
 *---------------------------------------------------------------------------
 
82
 */
 
83
void
 
84
cluster(ClusterStmt *stmt)
 
85
{
 
86
        if (stmt->relation != NULL)
 
87
        {
 
88
                /* This is the single-relation case. */
 
89
                Oid                     tableOid,
 
90
                                        indexOid = InvalidOid;
 
91
                Relation        rel;
 
92
                RelToCluster rvtc;
 
93
 
 
94
                /* Find and lock the table */
 
95
                rel = heap_openrv(stmt->relation, AccessExclusiveLock);
 
96
 
 
97
                tableOid = RelationGetRelid(rel);
 
98
 
 
99
                /* Check permissions */
 
100
                if (!pg_class_ownercheck(tableOid, GetUserId()))
 
101
                        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
 
102
                                                   RelationGetRelationName(rel));
 
103
 
 
104
                if (stmt->indexname == NULL)
 
105
                {
 
106
                        ListCell   *index;
 
107
 
 
108
                        /* We need to find the index that has indisclustered set. */
 
109
                        foreach(index, RelationGetIndexList(rel))
 
110
                        {
 
111
                                HeapTuple       idxtuple;
 
112
                                Form_pg_index indexForm;
 
113
 
 
114
                                indexOid = lfirst_oid(index);
 
115
                                idxtuple = SearchSysCache(INDEXRELID,
 
116
                                                                                  ObjectIdGetDatum(indexOid),
 
117
                                                                                  0, 0, 0);
 
118
                                if (!HeapTupleIsValid(idxtuple))
 
119
                                        elog(ERROR, "cache lookup failed for index %u", indexOid);
 
120
                                indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
 
121
                                if (indexForm->indisclustered)
 
122
                                {
 
123
                                        ReleaseSysCache(idxtuple);
 
124
                                        break;
 
125
                                }
 
126
                                ReleaseSysCache(idxtuple);
 
127
                                indexOid = InvalidOid;
 
128
                        }
 
129
 
 
130
                        if (!OidIsValid(indexOid))
 
131
                                ereport(ERROR,
 
132
                                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
133
                                                 errmsg("there is no previously clustered index for table \"%s\"",
 
134
                                                                stmt->relation->relname)));
 
135
                }
 
136
                else
 
137
                {
 
138
                        /*
 
139
                         * The index is expected to be in the same namespace as the
 
140
                         * relation.
 
141
                         */
 
142
                        indexOid = get_relname_relid(stmt->indexname,
 
143
                                                                                 rel->rd_rel->relnamespace);
 
144
                        if (!OidIsValid(indexOid))
 
145
                                ereport(ERROR,
 
146
                                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
147
                                   errmsg("index \"%s\" for table \"%s\" does not exist",
 
148
                                                  stmt->indexname, stmt->relation->relname)));
 
149
                }
 
150
 
 
151
                /* All other checks are done in cluster_rel() */
 
152
                rvtc.tableOid = tableOid;
 
153
                rvtc.indexOid = indexOid;
 
154
 
 
155
                /* close relation, keep lock till commit */
 
156
                heap_close(rel, NoLock);
 
157
 
 
158
                /* Do the job */
 
159
                cluster_rel(&rvtc, false);
 
160
        }
 
161
        else
 
162
        {
 
163
                /*
 
164
                 * This is the "multi relation" case. We need to cluster all
 
165
                 * tables that have some index with indisclustered set.
 
166
                 */
 
167
                MemoryContext cluster_context;
 
168
                List       *rvs;
 
169
                ListCell   *rv;
 
170
 
 
171
                /*
 
172
                 * We cannot run this form of CLUSTER inside a user transaction
 
173
                 * block; we'd be holding locks way too long.
 
174
                 */
 
175
                PreventTransactionChain((void *) stmt, "CLUSTER");
 
176
 
 
177
                /*
 
178
                 * Create special memory context for cross-transaction storage.
 
179
                 *
 
180
                 * Since it is a child of PortalContext, it will go away even in case
 
181
                 * of error.
 
182
                 */
 
183
                cluster_context = AllocSetContextCreate(PortalContext,
 
184
                                                                                                "Cluster",
 
185
                                                                                                ALLOCSET_DEFAULT_MINSIZE,
 
186
                                                                                                ALLOCSET_DEFAULT_INITSIZE,
 
187
                                                                                                ALLOCSET_DEFAULT_MAXSIZE);
 
188
 
 
189
                /*
 
190
                 * Build the list of relations to cluster.      Note that this lives
 
191
                 * in cluster_context.
 
192
                 */
 
193
                rvs = get_tables_to_cluster(cluster_context);
 
194
 
 
195
                /* Commit to get out of starting transaction */
 
196
                CommitTransactionCommand();
 
197
 
 
198
                /* Ok, now that we've got them all, cluster them one by one */
 
199
                foreach(rv, rvs)
 
200
                {
 
201
                        RelToCluster *rvtc = (RelToCluster *) lfirst(rv);
 
202
 
 
203
                        /* Start a new transaction for each relation. */
 
204
                        StartTransactionCommand();
 
205
                        /* functions in indexes may want a snapshot set */
 
206
                        ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
 
207
                        cluster_rel(rvtc, true);
 
208
                        CommitTransactionCommand();
 
209
                }
 
210
 
 
211
                /* Start a new transaction for the cleanup work. */
 
212
                StartTransactionCommand();
 
213
 
 
214
                /* Clean up working storage */
 
215
                MemoryContextDelete(cluster_context);
 
216
        }
 
217
}
 
218
 
 
219
/*
 
220
 * cluster_rel
 
221
 *
 
222
 * This clusters the table by creating a new, clustered table and
 
223
 * swapping the relfilenodes of the new table and the old table, so
 
224
 * the OID of the original table is preserved.  Thus we do not lose
 
225
 * GRANT, inheritance nor references to this table (this was a bug
 
226
 * in releases thru 7.3).
 
227
 *
 
228
 * Also create new indexes and swap the filenodes with the old indexes the
 
229
 * same way we do for the relation.  Since we are effectively bulk-loading
 
230
 * the new table, it's better to create the indexes afterwards than to fill
 
231
 * them incrementally while we load the table.
 
232
 */
 
233
static void
 
234
cluster_rel(RelToCluster *rvtc, bool recheck)
 
235
{
 
236
        Relation        OldHeap;
 
237
 
 
238
        /* Check for user-requested abort. */
 
239
        CHECK_FOR_INTERRUPTS();
 
240
 
 
241
        /*
 
242
         * Since we may open a new transaction for each relation, we have to
 
243
         * check that the relation still is what we think it is.
 
244
         *
 
245
         * If this is a single-transaction CLUSTER, we can skip these tests. We
 
246
         * *must* skip the one on indisclustered since it would reject an
 
247
         * attempt to cluster a not-previously-clustered index.
 
248
         */
 
249
        if (recheck)
 
250
        {
 
251
                HeapTuple       tuple;
 
252
                Form_pg_index indexForm;
 
253
 
 
254
                /*
 
255
                 * Check if the relation and index still exist before opening them
 
256
                 */
 
257
                if (!SearchSysCacheExists(RELOID,
 
258
                                                                  ObjectIdGetDatum(rvtc->tableOid),
 
259
                                                                  0, 0, 0) ||
 
260
                        !SearchSysCacheExists(RELOID,
 
261
                                                                  ObjectIdGetDatum(rvtc->indexOid),
 
262
                                                                  0, 0, 0))
 
263
                        return;
 
264
 
 
265
                /* Check that the user still owns the relation */
 
266
                if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))
 
267
                        return;
 
268
 
 
269
                /*
 
270
                 * Check that the index is still the one with indisclustered set.
 
271
                 */
 
272
                tuple = SearchSysCache(INDEXRELID,
 
273
                                                           ObjectIdGetDatum(rvtc->indexOid),
 
274
                                                           0, 0, 0);
 
275
                if (!HeapTupleIsValid(tuple))
 
276
                        return;                         /* could have gone away... */
 
277
                indexForm = (Form_pg_index) GETSTRUCT(tuple);
 
278
                if (!indexForm->indisclustered)
 
279
                {
 
280
                        ReleaseSysCache(tuple);
 
281
                        return;
 
282
                }
 
283
                ReleaseSysCache(tuple);
 
284
        }
 
285
 
 
286
        /*
 
287
         * We grab exclusive access to the target rel and index for the
 
288
         * duration of the transaction.  (This is redundant for the single-
 
289
         * transaction case, since cluster() already did it.)  The index lock
 
290
         * is taken inside check_index_is_clusterable.
 
291
         */
 
292
        OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
 
293
 
 
294
        /* Check index is valid to cluster on */
 
295
        check_index_is_clusterable(OldHeap, rvtc->indexOid);
 
296
 
 
297
        /* rebuild_relation does all the dirty work */
 
298
        rebuild_relation(OldHeap, rvtc->indexOid);
 
299
 
 
300
        /* NB: rebuild_relation does heap_close() on OldHeap */
 
301
}
 
302
 
 
303
/*
 
304
 * Verify that the specified index is a legitimate index to cluster on
 
305
 *
 
306
 * Side effect: obtains exclusive lock on the index.  The caller should
 
307
 * already have exclusive lock on the table, so the index lock is likely
 
308
 * redundant, but it seems best to grab it anyway to ensure the index
 
309
 * definition can't change under us.
 
310
 */
 
311
void
 
312
check_index_is_clusterable(Relation OldHeap, Oid indexOid)
 
313
{
 
314
        Relation        OldIndex;
 
315
 
 
316
        OldIndex = index_open(indexOid);
 
317
        LockRelation(OldIndex, AccessExclusiveLock);
 
318
 
 
319
        /*
 
320
         * Check that index is in fact an index on the given relation
 
321
         */
 
322
        if (OldIndex->rd_index == NULL ||
 
323
                OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
 
324
                ereport(ERROR,
 
325
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
326
                                 errmsg("\"%s\" is not an index for table \"%s\"",
 
327
                                                RelationGetRelationName(OldIndex),
 
328
                                                RelationGetRelationName(OldHeap))));
 
329
 
 
330
        /*
 
331
         * Disallow clustering on incomplete indexes (those that might not
 
332
         * index every row of the relation).  We could relax this by making a
 
333
         * separate seqscan pass over the table to copy the missing rows, but
 
334
         * that seems expensive and tedious.
 
335
         */
 
336
        if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred))
 
337
                ereport(ERROR,
 
338
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
339
                                 errmsg("cannot cluster on partial index")));
 
340
        if (!OldIndex->rd_am->amindexnulls)
 
341
        {
 
342
                AttrNumber      colno;
 
343
 
 
344
                /*
 
345
                 * If the AM doesn't index nulls, then it's a partial index unless
 
346
                 * we can prove all the rows are non-null.      Note we only need look
 
347
                 * at the first column; multicolumn-capable AMs are *required* to
 
348
                 * index nulls in columns after the first.
 
349
                 */
 
350
                colno = OldIndex->rd_index->indkey[0];
 
351
                if (colno > 0)
 
352
                {
 
353
                        /* ordinary user attribute */
 
354
                        if (!OldHeap->rd_att->attrs[colno - 1]->attnotnull)
 
355
                                ereport(ERROR,
 
356
                                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
357
                                                 errmsg("cannot cluster when index access method does not handle null values"),
 
358
                                                 errhint("You may be able to work around this by marking column \"%s\" NOT NULL.",
 
359
                                  NameStr(OldHeap->rd_att->attrs[colno - 1]->attname))));
 
360
                }
 
361
                else if (colno < 0)
 
362
                {
 
363
                        /* system column --- okay, always non-null */
 
364
                }
 
365
                else
 
366
                {
 
367
                        /* index expression, lose... */
 
368
                        ereport(ERROR,
 
369
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
370
                                         errmsg("cannot cluster on expressional index when index access method does not handle null values")));
 
371
                }
 
372
        }
 
373
 
 
374
        /*
 
375
         * Disallow clustering system relations.  This will definitely NOT
 
376
         * work for shared relations (we have no way to update pg_class rows
 
377
         * in other databases), nor for nailed-in-cache relations (the
 
378
         * relfilenode values for those are hardwired, see relcache.c).  It
 
379
         * might work for other system relations, but I ain't gonna risk it.
 
380
         */
 
381
        if (IsSystemRelation(OldHeap))
 
382
                ereport(ERROR,
 
383
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
384
                                 errmsg("\"%s\" is a system catalog",
 
385
                                                RelationGetRelationName(OldHeap))));
 
386
 
 
387
        /*
 
388
         * Don't allow cluster on temp tables of other backends ... their
 
389
         * local buffer manager is not going to cope.
 
390
         */
 
391
        if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
 
392
                ereport(ERROR,
 
393
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
394
                   errmsg("cannot cluster temporary tables of other sessions")));
 
395
 
 
396
        /* Drop relcache refcnt on OldIndex, but keep lock */
 
397
        index_close(OldIndex);
 
398
}
 
399
 
 
400
/*
 
401
 * mark_index_clustered: mark the specified index as the one clustered on
 
402
 *
 
403
 * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
 
404
 */
 
405
void
 
406
mark_index_clustered(Relation rel, Oid indexOid)
 
407
{
 
408
        HeapTuple       indexTuple;
 
409
        Form_pg_index indexForm;
 
410
        Relation        pg_index;
 
411
        ListCell   *index;
 
412
 
 
413
        /*
 
414
         * If the index is already marked clustered, no need to do anything.
 
415
         */
 
416
        if (OidIsValid(indexOid))
 
417
        {
 
418
                indexTuple = SearchSysCache(INDEXRELID,
 
419
                                                                        ObjectIdGetDatum(indexOid),
 
420
                                                                        0, 0, 0);
 
421
                if (!HeapTupleIsValid(indexTuple))
 
422
                        elog(ERROR, "cache lookup failed for index %u", indexOid);
 
423
                indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
 
424
 
 
425
                if (indexForm->indisclustered)
 
426
                {
 
427
                        ReleaseSysCache(indexTuple);
 
428
                        return;
 
429
                }
 
430
 
 
431
                ReleaseSysCache(indexTuple);
 
432
        }
 
433
 
 
434
        /*
 
435
         * Check each index of the relation and set/clear the bit as needed.
 
436
         */
 
437
        pg_index = heap_openr(IndexRelationName, RowExclusiveLock);
 
438
 
 
439
        foreach(index, RelationGetIndexList(rel))
 
440
        {
 
441
                Oid                     thisIndexOid = lfirst_oid(index);
 
442
 
 
443
                indexTuple = SearchSysCacheCopy(INDEXRELID,
 
444
                                                                                ObjectIdGetDatum(thisIndexOid),
 
445
                                                                                0, 0, 0);
 
446
                if (!HeapTupleIsValid(indexTuple))
 
447
                        elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
 
448
                indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
 
449
 
 
450
                /*
 
451
                 * Unset the bit if set.  We know it's wrong because we checked
 
452
                 * this earlier.
 
453
                 */
 
454
                if (indexForm->indisclustered)
 
455
                {
 
456
                        indexForm->indisclustered = false;
 
457
                        simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
 
458
                        CatalogUpdateIndexes(pg_index, indexTuple);
 
459
                        /* Ensure we see the update in the index's relcache entry */
 
460
                        CacheInvalidateRelcacheByRelid(thisIndexOid);
 
461
                }
 
462
                else if (thisIndexOid == indexOid)
 
463
                {
 
464
                        indexForm->indisclustered = true;
 
465
                        simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
 
466
                        CatalogUpdateIndexes(pg_index, indexTuple);
 
467
                        /* Ensure we see the update in the index's relcache entry */
 
468
                        CacheInvalidateRelcacheByRelid(thisIndexOid);
 
469
                }
 
470
                heap_freetuple(indexTuple);
 
471
        }
 
472
 
 
473
        heap_close(pg_index, RowExclusiveLock);
 
474
}
 
475
 
 
476
/*
 
477
 * rebuild_relation: rebuild an existing relation in index order
 
478
 *
 
479
 * OldHeap: table to rebuild --- must be opened and exclusive-locked!
 
480
 * indexOid: index to cluster by
 
481
 *
 
482
 * NB: this routine closes OldHeap at the right time; caller should not.
 
483
 */
 
484
static void
 
485
rebuild_relation(Relation OldHeap, Oid indexOid)
 
486
{
 
487
        Oid                     tableOid = RelationGetRelid(OldHeap);
 
488
        Oid                     tableSpace = OldHeap->rd_rel->reltablespace;
 
489
        Oid                     OIDNewHeap;
 
490
        char            NewHeapName[NAMEDATALEN];
 
491
        ObjectAddress object;
 
492
 
 
493
        /* Mark the correct index as clustered */
 
494
        mark_index_clustered(OldHeap, indexOid);
 
495
 
 
496
        /* Close relcache entry, but keep lock until transaction commit */
 
497
        heap_close(OldHeap, NoLock);
 
498
 
 
499
        /*
 
500
         * Create the new heap, using a temporary name in the same namespace
 
501
         * as the existing table.  NOTE: there is some risk of collision with
 
502
         * user relnames.  Working around this seems more trouble than it's
 
503
         * worth; in particular, we can't create the new heap in a different
 
504
         * namespace from the old, or we will have problems with the TEMP
 
505
         * status of temp tables.
 
506
         */
 
507
        snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", tableOid);
 
508
 
 
509
        OIDNewHeap = make_new_heap(tableOid, NewHeapName, tableSpace);
 
510
 
 
511
        /*
 
512
         * We don't need CommandCounterIncrement() because make_new_heap did
 
513
         * it.
 
514
         */
 
515
 
 
516
        /*
 
517
         * Copy the heap data into the new table in the desired order.
 
518
         */
 
519
        copy_heap_data(OIDNewHeap, tableOid, indexOid);
 
520
 
 
521
        /* To make the new heap's data visible (probably not needed?). */
 
522
        CommandCounterIncrement();
 
523
 
 
524
        /* Swap the physical files of the old and new heaps. */
 
525
        swap_relation_files(tableOid, OIDNewHeap);
 
526
 
 
527
        CommandCounterIncrement();
 
528
 
 
529
        /* Destroy new heap with old filenode */
 
530
        object.classId = RelOid_pg_class;
 
531
        object.objectId = OIDNewHeap;
 
532
        object.objectSubId = 0;
 
533
 
 
534
        /*
 
535
         * The new relation is local to our transaction and we know nothing
 
536
         * depends on it, so DROP_RESTRICT should be OK.
 
537
         */
 
538
        performDeletion(&object, DROP_RESTRICT);
 
539
 
 
540
        /* performDeletion does CommandCounterIncrement at end */
 
541
 
 
542
        /*
 
543
         * Rebuild each index on the relation (but not the toast table, which
 
544
         * is all-new at this point).  We do not need
 
545
         * CommandCounterIncrement() because reindex_relation does it.
 
546
         */
 
547
        reindex_relation(tableOid, false);
 
548
}
 
549
 
 
550
/*
 
551
 * Create the new table that we will fill with correctly-ordered data.
 
552
 */
 
553
Oid
 
554
make_new_heap(Oid OIDOldHeap, const char *NewName, Oid NewTableSpace)
 
555
{
 
556
        TupleDesc       OldHeapDesc,
 
557
                                tupdesc;
 
558
        Oid                     OIDNewHeap;
 
559
        Relation        OldHeap;
 
560
 
 
561
        OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
 
562
        OldHeapDesc = RelationGetDescr(OldHeap);
 
563
 
 
564
        /*
 
565
         * Need to make a copy of the tuple descriptor, since
 
566
         * heap_create_with_catalog modifies it.
 
567
         */
 
568
        tupdesc = CreateTupleDescCopyConstr(OldHeapDesc);
 
569
 
 
570
        OIDNewHeap = heap_create_with_catalog(NewName,
 
571
                                                                                  RelationGetNamespace(OldHeap),
 
572
                                                                                  NewTableSpace,
 
573
                                                                                  tupdesc,
 
574
                                                                                  OldHeap->rd_rel->relkind,
 
575
                                                                                  OldHeap->rd_rel->relisshared,
 
576
                                                                                  true,
 
577
                                                                                  0,
 
578
                                                                                  ONCOMMIT_NOOP,
 
579
                                                                                  allowSystemTableMods);
 
580
 
 
581
        /*
 
582
         * Advance command counter so that the newly-created relation's
 
583
         * catalog tuples will be visible to heap_open.
 
584
         */
 
585
        CommandCounterIncrement();
 
586
 
 
587
        /*
 
588
         * If necessary, create a TOAST table for the new relation. Note that
 
589
         * AlterTableCreateToastTable ends with CommandCounterIncrement(), so
 
590
         * that the TOAST table will be visible for insertion.
 
591
         */
 
592
        AlterTableCreateToastTable(OIDNewHeap, true);
 
593
 
 
594
        heap_close(OldHeap, NoLock);
 
595
 
 
596
        return OIDNewHeap;
 
597
}
 
598
 
 
599
/*
 
600
 * Do the physical copying of heap data.
 
601
 */
 
602
static void
 
603
copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
 
604
{
 
605
        Relation        NewHeap,
 
606
                                OldHeap,
 
607
                                OldIndex;
 
608
        TupleDesc       oldTupDesc;
 
609
        TupleDesc       newTupDesc;
 
610
        int                     natts;
 
611
        Datum      *values;
 
612
        char       *nulls;
 
613
        IndexScanDesc scan;
 
614
        HeapTuple       tuple;
 
615
 
 
616
        /*
 
617
         * Open the relations we need.
 
618
         */
 
619
        NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
 
620
        OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
 
621
        OldIndex = index_open(OIDOldIndex);
 
622
 
 
623
        /*
 
624
         * Their tuple descriptors should be exactly alike, but here we only
 
625
         * need assume that they have the same number of columns.
 
626
         */
 
627
        oldTupDesc = RelationGetDescr(OldHeap);
 
628
        newTupDesc = RelationGetDescr(NewHeap);
 
629
        Assert(newTupDesc->natts == oldTupDesc->natts);
 
630
 
 
631
        /* Preallocate values/nulls arrays */
 
632
        natts = newTupDesc->natts;
 
633
        values = (Datum *) palloc0(natts * sizeof(Datum));
 
634
        nulls = (char *) palloc(natts * sizeof(char));
 
635
        memset(nulls, 'n', natts * sizeof(char));
 
636
 
 
637
        /*
 
638
         * Scan through the OldHeap on the OldIndex and copy each tuple into the
 
639
         * NewHeap.
 
640
         */
 
641
        scan = index_beginscan(OldHeap, OldIndex, SnapshotNow, 0, (ScanKey) NULL);
 
642
 
 
643
        while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
 
644
        {
 
645
                /*
 
646
                 * We cannot simply pass the tuple to heap_insert(), for several
 
647
                 * reasons:
 
648
                 *
 
649
                 * 1. heap_insert() will overwrite the commit-status fields of the
 
650
                 * tuple it's handed.  This would trash the source relation, which is
 
651
                 * bad news if we abort later on.  (This was a bug in releases thru
 
652
                 * 7.0)
 
653
                 *
 
654
                 * 2. We'd like to squeeze out the values of any dropped columns,
 
655
                 * both to save space and to ensure we have no corner-case failures.
 
656
                 * (It's possible for example that the new table hasn't got a TOAST
 
657
                 * table and so is unable to store any large values of dropped cols.)
 
658
                 *
 
659
                 * 3. The tuple might not even be legal for the new table; this is
 
660
                 * currently only known to happen as an after-effect of ALTER TABLE
 
661
                 * SET WITHOUT OIDS.
 
662
                 *
 
663
                 * So, we must reconstruct the tuple from component Datums.
 
664
                 */
 
665
                HeapTuple       copiedTuple;
 
666
                int                     i;
 
667
 
 
668
                heap_deformtuple(tuple, oldTupDesc, values, nulls);
 
669
 
 
670
                /* Be sure to null out any dropped columns */
 
671
                for (i = 0; i < natts; i++)
 
672
                {
 
673
                        if (newTupDesc->attrs[i]->attisdropped)
 
674
                                nulls[i] = 'n';
 
675
                }
 
676
 
 
677
                copiedTuple = heap_formtuple(newTupDesc, values, nulls);
 
678
 
 
679
                /* Preserve OID, if any */
 
680
                if (NewHeap->rd_rel->relhasoids)
 
681
                        HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
 
682
 
 
683
                simple_heap_insert(NewHeap, copiedTuple);
 
684
 
 
685
                heap_freetuple(copiedTuple);
 
686
 
 
687
                CHECK_FOR_INTERRUPTS();
 
688
        }
 
689
 
 
690
        index_endscan(scan);
 
691
 
 
692
        pfree(values);
 
693
        pfree(nulls);
 
694
 
 
695
        index_close(OldIndex);
 
696
        heap_close(OldHeap, NoLock);
 
697
        heap_close(NewHeap, NoLock);
 
698
}
 
699
 
 
700
/*
 
701
 * Swap the physical files of two given relations.
 
702
 *
 
703
 * We swap the physical identity (reltablespace and relfilenode) while
 
704
 * keeping the same logical identities of the two relations.
 
705
 *
 
706
 * Also swap any TOAST links, so that the toast data moves along with
 
707
 * the main-table data.
 
708
 */
 
709
void
 
710
swap_relation_files(Oid r1, Oid r2)
 
711
{
 
712
        Relation        relRelation,
 
713
                                rel;
 
714
        HeapTuple       reltup1,
 
715
                                reltup2;
 
716
        Form_pg_class relform1,
 
717
                                relform2;
 
718
        Oid                     swaptemp;
 
719
        CatalogIndexState indstate;
 
720
 
 
721
        /* We need writable copies of both pg_class tuples. */
 
722
        relRelation = heap_openr(RelationRelationName, RowExclusiveLock);
 
723
 
 
724
        reltup1 = SearchSysCacheCopy(RELOID,
 
725
                                                                 ObjectIdGetDatum(r1),
 
726
                                                                 0, 0, 0);
 
727
        if (!HeapTupleIsValid(reltup1))
 
728
                elog(ERROR, "cache lookup failed for relation %u", r1);
 
729
        relform1 = (Form_pg_class) GETSTRUCT(reltup1);
 
730
 
 
731
        reltup2 = SearchSysCacheCopy(RELOID,
 
732
                                                                 ObjectIdGetDatum(r2),
 
733
                                                                 0, 0, 0);
 
734
        if (!HeapTupleIsValid(reltup2))
 
735
                elog(ERROR, "cache lookup failed for relation %u", r2);
 
736
        relform2 = (Form_pg_class) GETSTRUCT(reltup2);
 
737
 
 
738
        /*
 
739
         * The buffer manager gets confused if we swap relfilenodes for
 
740
         * relations that are not both local or non-local to this transaction.
 
741
         * Flush the buffers on both relations so the buffer manager can
 
742
         * forget about'em.  (XXX this might not be necessary anymore?)
 
743
         */
 
744
        rel = relation_open(r1, NoLock);
 
745
        FlushRelationBuffers(rel, 0);
 
746
        relation_close(rel, NoLock);
 
747
 
 
748
        rel = relation_open(r2, NoLock);
 
749
        FlushRelationBuffers(rel, 0);
 
750
        relation_close(rel, NoLock);
 
751
 
 
752
        /*
 
753
         * Actually swap the fields in the two tuples
 
754
         */
 
755
        swaptemp = relform1->relfilenode;
 
756
        relform1->relfilenode = relform2->relfilenode;
 
757
        relform2->relfilenode = swaptemp;
 
758
 
 
759
        swaptemp = relform1->reltablespace;
 
760
        relform1->reltablespace = relform2->reltablespace;
 
761
        relform2->reltablespace = swaptemp;
 
762
 
 
763
        swaptemp = relform1->reltoastrelid;
 
764
        relform1->reltoastrelid = relform2->reltoastrelid;
 
765
        relform2->reltoastrelid = swaptemp;
 
766
 
 
767
        /* we should not swap reltoastidxid */
 
768
 
 
769
        /* swap size statistics too, since new rel has freshly-updated stats */
 
770
        {
 
771
                int4            swap_pages;
 
772
                float4          swap_tuples;
 
773
 
 
774
                swap_pages = relform1->relpages;
 
775
                relform1->relpages = relform2->relpages;
 
776
                relform2->relpages = swap_pages;
 
777
 
 
778
                swap_tuples = relform1->reltuples;
 
779
                relform1->reltuples = relform2->reltuples;
 
780
                relform2->reltuples = swap_tuples;
 
781
        }
 
782
 
 
783
        /* Update the tuples in pg_class */
 
784
        simple_heap_update(relRelation, &reltup1->t_self, reltup1);
 
785
        simple_heap_update(relRelation, &reltup2->t_self, reltup2);
 
786
 
 
787
        /* Keep system catalogs current */
 
788
        indstate = CatalogOpenIndexes(relRelation);
 
789
        CatalogIndexInsert(indstate, reltup1);
 
790
        CatalogIndexInsert(indstate, reltup2);
 
791
        CatalogCloseIndexes(indstate);
 
792
 
 
793
        /*
 
794
         * If we have toast tables associated with the relations being
 
795
         * swapped, change their dependency links to re-associate them with
 
796
         * their new owning relations.  Otherwise the wrong one will get
 
797
         * dropped ...
 
798
         *
 
799
         * NOTE: it is possible that only one table has a toast table; this can
 
800
         * happen in CLUSTER if there were dropped columns in the old table,
 
801
         * and in ALTER TABLE when adding or changing type of columns.
 
802
         *
 
803
         * NOTE: at present, a TOAST table's only dependency is the one on its
 
804
         * owning table.  If more are ever created, we'd need to use something
 
805
         * more selective than deleteDependencyRecordsFor() to get rid of only
 
806
         * the link we want.
 
807
         */
 
808
        if (relform1->reltoastrelid || relform2->reltoastrelid)
 
809
        {
 
810
                ObjectAddress baseobject,
 
811
                                        toastobject;
 
812
                long            count;
 
813
 
 
814
                /* Delete old dependencies */
 
815
                if (relform1->reltoastrelid)
 
816
                {
 
817
                        count = deleteDependencyRecordsFor(RelOid_pg_class,
 
818
                                                                                           relform1->reltoastrelid);
 
819
                        if (count != 1)
 
820
                                elog(ERROR, "expected one dependency record for TOAST table, found %ld",
 
821
                                         count);
 
822
                }
 
823
                if (relform2->reltoastrelid)
 
824
                {
 
825
                        count = deleteDependencyRecordsFor(RelOid_pg_class,
 
826
                                                                                           relform2->reltoastrelid);
 
827
                        if (count != 1)
 
828
                                elog(ERROR, "expected one dependency record for TOAST table, found %ld",
 
829
                                         count);
 
830
                }
 
831
 
 
832
                /* Register new dependencies */
 
833
                baseobject.classId = RelOid_pg_class;
 
834
                baseobject.objectSubId = 0;
 
835
                toastobject.classId = RelOid_pg_class;
 
836
                toastobject.objectSubId = 0;
 
837
 
 
838
                if (relform1->reltoastrelid)
 
839
                {
 
840
                        baseobject.objectId = r1;
 
841
                        toastobject.objectId = relform1->reltoastrelid;
 
842
                        recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
 
843
                }
 
844
 
 
845
                if (relform2->reltoastrelid)
 
846
                {
 
847
                        baseobject.objectId = r2;
 
848
                        toastobject.objectId = relform2->reltoastrelid;
 
849
                        recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
 
850
                }
 
851
        }
 
852
 
 
853
        /*
 
854
         * Blow away the old relcache entries now.      We need this kluge because
 
855
         * relcache.c keeps a link to the smgr relation for the physical file,
 
856
         * and that will be out of date as soon as we do
 
857
         * CommandCounterIncrement. Whichever of the rels is the second to be
 
858
         * cleared during cache invalidation will have a dangling reference to
 
859
         * an already-deleted smgr relation.  Rather than trying to avoid this
 
860
         * by ordering operations just so, it's easiest to not have the
 
861
         * relcache entries there at all. (Fortunately, since one of the
 
862
         * entries is local in our transaction, it's sufficient to clear out
 
863
         * our own relcache this way; the problem cannot arise for other
 
864
         * backends when they see our update on the non-local relation.)
 
865
         */
 
866
        RelationForgetRelation(r1);
 
867
        RelationForgetRelation(r2);
 
868
 
 
869
        /* Clean up. */
 
870
        heap_freetuple(reltup1);
 
871
        heap_freetuple(reltup2);
 
872
 
 
873
        heap_close(relRelation, RowExclusiveLock);
 
874
}
 
875
 
 
876
/*
 
877
 * Get a list of tables that the current user owns and
 
878
 * have indisclustered set.  Return the list as a List * of RelToCluster
 
879
 * with the tableOid and the indexOid on which the table is already
 
880
 * clustered.
 
881
 */
 
882
static List *
 
883
get_tables_to_cluster(MemoryContext cluster_context)
 
884
{
 
885
        Relation        indRelation;
 
886
        HeapScanDesc scan;
 
887
        ScanKeyData entry;
 
888
        HeapTuple       indexTuple;
 
889
        Form_pg_index index;
 
890
        MemoryContext old_context;
 
891
        RelToCluster *rvtc;
 
892
        List       *rvs = NIL;
 
893
 
 
894
        /*
 
895
         * Get all indexes that have indisclustered set and are owned by
 
896
         * appropriate user. System relations or nailed-in relations cannot
 
897
         * ever have indisclustered set, because CLUSTER will refuse to set it
 
898
         * when called with one of them as argument.
 
899
         */
 
900
        indRelation = relation_openr(IndexRelationName, AccessShareLock);
 
901
        ScanKeyInit(&entry,
 
902
                                Anum_pg_index_indisclustered,
 
903
                                BTEqualStrategyNumber, F_BOOLEQ,
 
904
                                BoolGetDatum(true));
 
905
        scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
 
906
        while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 
907
        {
 
908
                index = (Form_pg_index) GETSTRUCT(indexTuple);
 
909
 
 
910
                if (!pg_class_ownercheck(index->indrelid, GetUserId()))
 
911
                        continue;
 
912
 
 
913
                /*
 
914
                 * We have to build the list in a different memory context so it
 
915
                 * will survive the cross-transaction processing
 
916
                 */
 
917
                old_context = MemoryContextSwitchTo(cluster_context);
 
918
 
 
919
                rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
 
920
                rvtc->tableOid = index->indrelid;
 
921
                rvtc->indexOid = index->indexrelid;
 
922
                rvs = lcons(rvtc, rvs);
 
923
 
 
924
                MemoryContextSwitchTo(old_context);
 
925
        }
 
926
        heap_endscan(scan);
 
927
 
 
928
        relation_close(indRelation, AccessShareLock);
 
929
 
 
930
        return rvs;
 
931
}