~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/commands/tablecmds.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * tablecmds.c
 
4
 *        Commands for creating and altering table structures and settings
 
5
 *
 
6
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
7
 * Portions Copyright (c) 1994, Regents of the University of California
 
8
 *
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *        $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.142.4.3 2005-03-25 18:04:47 tgl Exp $
 
12
 *
 
13
 *-------------------------------------------------------------------------
 
14
 */
 
15
#include "postgres.h"
 
16
 
 
17
#include "access/genam.h"
 
18
#include "access/tuptoaster.h"
 
19
#include "catalog/catalog.h"
 
20
#include "catalog/catname.h"
 
21
#include "catalog/dependency.h"
 
22
#include "catalog/heap.h"
 
23
#include "catalog/index.h"
 
24
#include "catalog/indexing.h"
 
25
#include "catalog/namespace.h"
 
26
#include "catalog/pg_constraint.h"
 
27
#include "catalog/pg_depend.h"
 
28
#include "catalog/pg_inherits.h"
 
29
#include "catalog/pg_namespace.h"
 
30
#include "catalog/pg_opclass.h"
 
31
#include "catalog/pg_trigger.h"
 
32
#include "catalog/pg_type.h"
 
33
#include "commands/cluster.h"
 
34
#include "commands/defrem.h"
 
35
#include "commands/tablecmds.h"
 
36
#include "commands/tablespace.h"
 
37
#include "commands/trigger.h"
 
38
#include "commands/typecmds.h"
 
39
#include "executor/executor.h"
 
40
#include "lib/stringinfo.h"
 
41
#include "miscadmin.h"
 
42
#include "nodes/makefuncs.h"
 
43
#include "optimizer/clauses.h"
 
44
#include "optimizer/plancat.h"
 
45
#include "optimizer/prep.h"
 
46
#include "parser/analyze.h"
 
47
#include "parser/gramparse.h"
 
48
#include "parser/parser.h"
 
49
#include "parser/parse_clause.h"
 
50
#include "parser/parse_coerce.h"
 
51
#include "parser/parse_expr.h"
 
52
#include "parser/parse_oper.h"
 
53
#include "parser/parse_relation.h"
 
54
#include "parser/parse_type.h"
 
55
#include "rewrite/rewriteHandler.h"
 
56
#include "storage/smgr.h"
 
57
#include "utils/acl.h"
 
58
#include "utils/builtins.h"
 
59
#include "utils/fmgroids.h"
 
60
#include "utils/inval.h"
 
61
#include "utils/lsyscache.h"
 
62
#include "utils/relcache.h"
 
63
#include "utils/syscache.h"
 
64
 
 
65
 
 
66
/*
 
67
 * ON COMMIT action list
 
68
 */
 
69
typedef struct OnCommitItem
 
70
{
 
71
        Oid                     relid;                  /* relid of relation */
 
72
        OnCommitAction oncommit;        /* what to do at end of xact */
 
73
 
 
74
        /*
 
75
         * If this entry was created during the current transaction,
 
76
         * creating_subid is the ID of the creating subxact; if created in a prior
 
77
         * transaction, creating_subid is zero.  If deleted during the current
 
78
         * transaction, deleting_subid is the ID of the deleting subxact; if no
 
79
         * deletion request is pending, deleting_subid is zero.
 
80
         */
 
81
        SubTransactionId creating_subid;
 
82
        SubTransactionId deleting_subid;
 
83
} OnCommitItem;
 
84
 
 
85
static List *on_commits = NIL;
 
86
 
 
87
 
 
88
/*
 * State information for ALTER TABLE
 *
 * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
 * structs, one for each table modified by the operation (the named table
 * plus any child tables that are affected).  We save lists of subcommands
 * to apply to this table (possibly modified by parse transformation steps);
 * these lists will be executed in Phase 2.  If a Phase 3 step is needed,
 * necessary information is stored in the constraints and newvals lists.
 *
 * Phase 2 is divided into multiple passes; subcommands are executed in
 * a pass determined by subcommand type.
 *
 * The AT_PASS_* values below are consecutive integers starting at zero and
 * AT_NUM_PASSES is their count, so they can index an array of
 * AT_NUM_PASSES elements.
 */

#define AT_PASS_DROP			0		/* DROP (all flavors) */
#define AT_PASS_ALTER_TYPE		1		/* ALTER COLUMN TYPE */
#define AT_PASS_OLD_INDEX		2		/* re-add existing indexes */
#define AT_PASS_OLD_CONSTR		3		/* re-add existing constraints */
#define AT_PASS_COL_ATTRS		4		/* set other column attributes */
/* We could support a RENAME COLUMN pass here, but not currently used */
#define AT_PASS_ADD_COL			5		/* ADD COLUMN */
#define AT_PASS_ADD_INDEX		6		/* ADD indexes */
#define AT_PASS_ADD_CONSTR		7		/* ADD constraints, defaults */
#define AT_PASS_MISC			8		/* other stuff */
#define AT_NUM_PASSES			9
 
113
 
 
114
typedef struct AlteredTableInfo
 
115
{
 
116
        /* Information saved before any work commences: */
 
117
        Oid                     relid;                  /* Relation to work on */
 
118
        char            relkind;                /* Its relkind */
 
119
        TupleDesc       oldDesc;                /* Pre-modification tuple descriptor */
 
120
        /* Information saved by Phase 1 for Phase 2: */
 
121
        List       *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
 
122
        /* Information saved by Phases 1/2 for Phase 3: */
 
123
        List       *constraints;        /* List of NewConstraint */
 
124
        List       *newvals;            /* List of NewColumnValue */
 
125
        Oid                     newTableSpace;  /* new tablespace; 0 means no change */
 
126
        /* Objects to rebuild after completing ALTER TYPE operations */
 
127
        List       *changedConstraintOids;      /* OIDs of constraints to rebuild */
 
128
        List       *changedConstraintDefs;      /* string definitions of same */
 
129
        List       *changedIndexOids;           /* OIDs of indexes to rebuild */
 
130
        List       *changedIndexDefs;           /* string definitions of same */
 
131
} AlteredTableInfo;
 
132
 
 
133
/* Struct describing one new constraint to check in Phase 3 scan */
 
134
typedef struct NewConstraint
 
135
{
 
136
        char       *name;                       /* Constraint name, or NULL if none */
 
137
        ConstrType      contype;                /* CHECK, NOT_NULL, or FOREIGN */
 
138
        AttrNumber      attnum;                 /* only relevant for NOT_NULL */
 
139
        Oid                     refrelid;               /* PK rel, if FOREIGN */
 
140
        Node       *qual;                       /* Check expr or FkConstraint struct */
 
141
        List       *qualstate;          /* Execution state for CHECK */
 
142
} NewConstraint;
 
143
 
 
144
/*
 
145
 * Struct describing one new column value that needs to be computed during
 
146
 * Phase 3 copy (this could be either a new column with a non-null default, or
 
147
 * a column that we're changing the type of).  Columns without such an entry
 
148
 * are just copied from the old table during ATRewriteTable.  Note that the
 
149
 * expr is an expression over *old* table values.
 
150
 */
 
151
typedef struct NewColumnValue
 
152
{
 
153
        AttrNumber      attnum;                 /* which column */
 
154
        Expr       *expr;                       /* expression to compute */
 
155
        ExprState  *exprstate;          /* execution state */
 
156
} NewColumnValue;
 
157
 
 
158
 
 
159
/*
 * Used by attribute and relation renaming routines.
 *
 * These classify a trigger function with respect to referential integrity;
 * RI_TRIGGER_NONE is zero, so the classification can be tested as a boolean.
 */
#define RI_TRIGGER_PK	1		/* is a trigger on the PK relation */
#define RI_TRIGGER_FK	2		/* is a trigger on the FK relation */
#define RI_TRIGGER_NONE 0		/* is not an RI trigger function */
 
163
 
 
164
 
 
165
static List *MergeAttributes(List *schema, List *supers, bool istemp,
 
166
                                List **supOids, List **supconstr, int *supOidCount);
 
167
static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
 
168
static void StoreCatalogInheritance(Oid relationId, List *supers);
 
169
static int      findAttrByName(const char *attributeName, List *schema);
 
170
static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
 
171
static bool needs_toast_table(Relation rel);
 
172
static int transformColumnNameList(Oid relId, List *colList,
 
173
                                                int16 *attnums, Oid *atttypids);
 
174
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
 
175
                                                   List **attnamelist,
 
176
                                                   int16 *attnums, Oid *atttypids,
 
177
                                                   Oid *opclasses);
 
178
static Oid transformFkeyCheckAttrs(Relation pkrel,
 
179
                                                int numattrs, int16 *attnums,
 
180
                                                Oid *opclasses);
 
181
static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
 
182
                                                         Relation rel, Relation pkrel);
 
183
static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
 
184
                                                 Oid constrOid);
 
185
static char *fkMatchTypeToString(char match_type);
 
186
static void ATController(Relation rel, List *cmds, bool recurse);
 
187
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 
188
                  bool recurse, bool recursing);
 
189
static void ATRewriteCatalogs(List **wqueue);
 
190
static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
 
191
static void ATRewriteTables(List **wqueue);
 
192
static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
 
193
static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
 
194
static void ATSimplePermissions(Relation rel, bool allowView);
 
195
static void ATSimpleRecursion(List **wqueue, Relation rel,
 
196
                                  AlterTableCmd *cmd, bool recurse);
 
197
static void ATOneLevelRecursion(List **wqueue, Relation rel,
 
198
                                        AlterTableCmd *cmd);
 
199
static void find_composite_type_dependencies(Oid typeOid,
 
200
                                                                 const char *origTblName);
 
201
static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
 
202
                                AlterTableCmd *cmd);
 
203
static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
 
204
                                ColumnDef *colDef);
 
205
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
 
206
static void add_column_support_dependency(Oid relid, int32 attnum,
 
207
                                                          RangeVar *support);
 
208
static void ATExecDropNotNull(Relation rel, const char *colName);
 
209
static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
 
210
                                 const char *colName);
 
211
static void ATExecColumnDefault(Relation rel, const char *colName,
 
212
                                        Node *newDefault);
 
213
static void ATPrepSetStatistics(Relation rel, const char *colName,
 
214
                                        Node *flagValue);
 
215
static void ATExecSetStatistics(Relation rel, const char *colName,
 
216
                                        Node *newValue);
 
217
static void ATExecSetStorage(Relation rel, const char *colName,
 
218
                                 Node *newValue);
 
219
static void ATExecDropColumn(Relation rel, const char *colName,
 
220
                                 DropBehavior behavior,
 
221
                                 bool recurse, bool recursing);
 
222
static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
 
223
                           IndexStmt *stmt, bool is_rebuild);
 
224
static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
 
225
                                        Node *newConstraint);
 
226
static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
 
227
                                                  FkConstraint *fkconstraint);
 
228
static void ATPrepDropConstraint(List **wqueue, Relation rel,
 
229
                                         bool recurse, AlterTableCmd *cmd);
 
230
static void ATExecDropConstraint(Relation rel, const char *constrName,
 
231
                                         DropBehavior behavior, bool quiet);
 
232
static void ATPrepAlterColumnType(List **wqueue,
 
233
                                          AlteredTableInfo *tab, Relation rel,
 
234
                                          bool recurse, bool recursing,
 
235
                                          AlterTableCmd *cmd);
 
236
static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
 
237
                                          const char *colName, TypeName *typename);
 
238
static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
 
239
static void ATPostAlterTypeParse(char *cmd, List **wqueue);
 
240
static void ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId);
 
241
static void change_owner_recurse_to_sequences(Oid relationOid,
 
242
                                                                                          int32 newOwnerSysId);
 
243
static void ATExecClusterOn(Relation rel, const char *indexName);
 
244
static void ATExecDropCluster(Relation rel);
 
245
static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
 
246
                                        char *tablespacename);
 
247
static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
 
248
static void copy_relation_data(Relation rel, SMgrRelation dst);
 
249
static int      ri_trigger_type(Oid tgfoid);
 
250
static void update_ri_trigger_args(Oid relid,
 
251
                                           const char *oldname,
 
252
                                           const char *newname,
 
253
                                           bool fk_scan,
 
254
                                           bool update_relname);
 
255
 
 
256
 
 
257
/* ----------------------------------------------------------------
 
258
 *              DefineRelation
 
259
 *                              Creates a new relation.
 
260
 *
 
261
 * If successful, returns the OID of the new relation.
 
262
 * ----------------------------------------------------------------
 
263
 */
 
264
Oid
 
265
DefineRelation(CreateStmt *stmt, char relkind)
 
266
{
 
267
        char            relname[NAMEDATALEN];
 
268
        Oid                     namespaceId;
 
269
        List       *schema = stmt->tableElts;
 
270
        Oid                     relationId;
 
271
        Oid                     tablespaceId;
 
272
        Relation        rel;
 
273
        TupleDesc       descriptor;
 
274
        List       *inheritOids;
 
275
        List       *old_constraints;
 
276
        bool            localHasOids;
 
277
        int                     parentOidCount;
 
278
        List       *rawDefaults;
 
279
        ListCell   *listptr;
 
280
        int                     i;
 
281
        AttrNumber      attnum;
 
282
 
 
283
        /*
 
284
         * Truncate relname to appropriate length (probably a waste of time,
 
285
         * as parser should have done this already).
 
286
         */
 
287
        StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
 
288
 
 
289
        /*
 
290
         * Check consistency of arguments
 
291
         */
 
292
        if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
 
293
                ereport(ERROR,
 
294
                                (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
295
                          errmsg("ON COMMIT can only be used on temporary tables")));
 
296
 
 
297
        /*
 
298
         * Look up the namespace in which we are supposed to create the
 
299
         * relation.  Check we have permission to create there. Skip check if
 
300
         * bootstrapping, since permissions machinery may not be working yet.
 
301
         */
 
302
        namespaceId = RangeVarGetCreationNamespace(stmt->relation);
 
303
 
 
304
        if (!IsBootstrapProcessingMode())
 
305
        {
 
306
                AclResult       aclresult;
 
307
 
 
308
                aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
 
309
                                                                                  ACL_CREATE);
 
310
                if (aclresult != ACLCHECK_OK)
 
311
                        aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
 
312
                                                   get_namespace_name(namespaceId));
 
313
        }
 
314
 
 
315
        /*
 
316
         * Select tablespace to use.  If not specified, use default_tablespace
 
317
         * (which may in turn default to database's default).
 
318
         */
 
319
        if (stmt->tablespacename)
 
320
        {
 
321
                tablespaceId = get_tablespace_oid(stmt->tablespacename);
 
322
                if (!OidIsValid(tablespaceId))
 
323
                        ereport(ERROR,
 
324
                                        (errcode(ERRCODE_UNDEFINED_OBJECT),
 
325
                                         errmsg("tablespace \"%s\" does not exist",
 
326
                                                        stmt->tablespacename)));
 
327
        }
 
328
        else
 
329
        {
 
330
                tablespaceId = GetDefaultTablespace();
 
331
                /* note InvalidOid is OK in this case */
 
332
        }
 
333
 
 
334
        /* Check permissions except when using database's default */
 
335
        if (OidIsValid(tablespaceId))
 
336
        {
 
337
                AclResult       aclresult;
 
338
 
 
339
                aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
 
340
                                                                                   ACL_CREATE);
 
341
                if (aclresult != ACLCHECK_OK)
 
342
                        aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
 
343
                                                   get_tablespace_name(tablespaceId));
 
344
        }
 
345
 
 
346
        /*
 
347
         * Look up inheritance ancestors and generate relation schema,
 
348
         * including inherited attributes.
 
349
         */
 
350
        schema = MergeAttributes(schema, stmt->inhRelations,
 
351
                                                         stmt->relation->istemp,
 
352
                                                &inheritOids, &old_constraints, &parentOidCount);
 
353
 
 
354
        /*
 
355
         * Create a relation descriptor from the relation schema and create
 
356
         * the relation.  Note that in this stage only inherited (pre-cooked)
 
357
         * defaults and constraints will be included into the new relation.
 
358
         * (BuildDescForRelation takes care of the inherited defaults, but we
 
359
         * have to copy inherited constraints here.)
 
360
         */
 
361
        descriptor = BuildDescForRelation(schema);
 
362
 
 
363
        localHasOids = interpretOidsOption(stmt->hasoids);
 
364
        descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
 
365
 
 
366
        if (old_constraints != NIL)
 
367
        {
 
368
                ConstrCheck *check = (ConstrCheck *)
 
369
                palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
 
370
                int                     ncheck = 0;
 
371
 
 
372
                foreach(listptr, old_constraints)
 
373
                {
 
374
                        Constraint *cdef = (Constraint *) lfirst(listptr);
 
375
                        bool            dup = false;
 
376
 
 
377
                        if (cdef->contype != CONSTR_CHECK)
 
378
                                continue;
 
379
                        Assert(cdef->name != NULL);
 
380
                        Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
 
381
 
 
382
                        /*
 
383
                         * In multiple-inheritance situations, it's possible to
 
384
                         * inherit the same grandparent constraint through multiple
 
385
                         * parents. Hence, discard inherited constraints that match as
 
386
                         * to both name and expression.  Otherwise, gripe if the names
 
387
                         * conflict.
 
388
                         */
 
389
                        for (i = 0; i < ncheck; i++)
 
390
                        {
 
391
                                if (strcmp(check[i].ccname, cdef->name) != 0)
 
392
                                        continue;
 
393
                                if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
 
394
                                {
 
395
                                        dup = true;
 
396
                                        break;
 
397
                                }
 
398
                                ereport(ERROR,
 
399
                                                (errcode(ERRCODE_DUPLICATE_OBJECT),
 
400
                                                 errmsg("duplicate check constraint name \"%s\"",
 
401
                                                                cdef->name)));
 
402
                        }
 
403
                        if (!dup)
 
404
                        {
 
405
                                check[ncheck].ccname = cdef->name;
 
406
                                check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
 
407
                                ncheck++;
 
408
                        }
 
409
                }
 
410
                if (ncheck > 0)
 
411
                {
 
412
                        if (descriptor->constr == NULL)
 
413
                        {
 
414
                                descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
 
415
                                descriptor->constr->defval = NULL;
 
416
                                descriptor->constr->num_defval = 0;
 
417
                                descriptor->constr->has_not_null = false;
 
418
                        }
 
419
                        descriptor->constr->num_check = ncheck;
 
420
                        descriptor->constr->check = check;
 
421
                }
 
422
        }
 
423
 
 
424
        relationId = heap_create_with_catalog(relname,
 
425
                                                                                  namespaceId,
 
426
                                                                                  tablespaceId,
 
427
                                                                                  descriptor,
 
428
                                                                                  relkind,
 
429
                                                                                  false,
 
430
                                                                                  localHasOids,
 
431
                                                                                  parentOidCount,
 
432
                                                                                  stmt->oncommit,
 
433
                                                                                  allowSystemTableMods);
 
434
 
 
435
        StoreCatalogInheritance(relationId, inheritOids);
 
436
 
 
437
        /*
 
438
         * We must bump the command counter to make the newly-created relation
 
439
         * tuple visible for opening.
 
440
         */
 
441
        CommandCounterIncrement();
 
442
 
 
443
        /*
 
444
         * Open the new relation and acquire exclusive lock on it.      This isn't
 
445
         * really necessary for locking out other backends (since they can't
 
446
         * see the new rel anyway until we commit), but it keeps the lock
 
447
         * manager from complaining about deadlock risks.
 
448
         */
 
449
        rel = relation_open(relationId, AccessExclusiveLock);
 
450
 
 
451
        /*
 
452
         * Now add any newly specified column default values and CHECK
 
453
         * constraints to the new relation.  These are passed to us in the
 
454
         * form of raw parsetrees; we need to transform them to executable
 
455
         * expression trees before they can be added. The most convenient way
 
456
         * to do that is to apply the parser's transformExpr routine, but
 
457
         * transformExpr doesn't work unless we have a pre-existing relation.
 
458
         * So, the transformation has to be postponed to this final step of
 
459
         * CREATE TABLE.
 
460
         *
 
461
         * Another task that's conveniently done at this step is to add
 
462
         * dependency links between columns and supporting relations (such as
 
463
         * SERIAL sequences).
 
464
         *
 
465
         * First, scan schema to find new column defaults.
 
466
         */
 
467
        rawDefaults = NIL;
 
468
        attnum = 0;
 
469
 
 
470
        foreach(listptr, schema)
 
471
        {
 
472
                ColumnDef  *colDef = lfirst(listptr);
 
473
 
 
474
                attnum++;
 
475
 
 
476
                if (colDef->raw_default != NULL)
 
477
                {
 
478
                        RawColumnDefault *rawEnt;
 
479
 
 
480
                        Assert(colDef->cooked_default == NULL);
 
481
 
 
482
                        rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
 
483
                        rawEnt->attnum = attnum;
 
484
                        rawEnt->raw_default = colDef->raw_default;
 
485
                        rawDefaults = lappend(rawDefaults, rawEnt);
 
486
                }
 
487
 
 
488
                /* Create dependency for supporting relation for this column */
 
489
                if (colDef->support != NULL)
 
490
                        add_column_support_dependency(relationId, attnum, colDef->support);
 
491
        }
 
492
 
 
493
        /*
 
494
         * Parse and add the defaults/constraints, if any.
 
495
         */
 
496
        if (rawDefaults || stmt->constraints)
 
497
                AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
 
498
 
 
499
        /*
 
500
         * Clean up.  We keep lock on new relation (although it shouldn't be
 
501
         * visible to anyone else anyway, until commit).
 
502
         */
 
503
        relation_close(rel, NoLock);
 
504
 
 
505
        return relationId;
 
506
}
 
507
 
 
508
/*
 
509
 * RemoveRelation
 
510
 *              Deletes a relation.
 
511
 */
 
512
void
 
513
RemoveRelation(const RangeVar *relation, DropBehavior behavior)
 
514
{
 
515
        Oid                     relOid;
 
516
        ObjectAddress object;
 
517
 
 
518
        relOid = RangeVarGetRelid(relation, false);
 
519
 
 
520
        object.classId = RelOid_pg_class;
 
521
        object.objectId = relOid;
 
522
        object.objectSubId = 0;
 
523
 
 
524
        performDeletion(&object, behavior);
 
525
}
 
526
 
 
527
/*
 
528
 * TruncateRelation
 
529
 *              Removes all the rows from a relation.
 
530
 */
 
531
void
 
532
TruncateRelation(const RangeVar *relation)
 
533
{
 
534
        Relation        rel;
 
535
        Oid                     heap_relid;
 
536
        Oid                     toast_relid;
 
537
 
 
538
        /* Grab exclusive lock in preparation for truncate */
 
539
        rel = heap_openrv(relation, AccessExclusiveLock);
 
540
 
 
541
        /* Only allow truncate on regular tables */
 
542
        if (rel->rd_rel->relkind != RELKIND_RELATION)
 
543
                ereport(ERROR,
 
544
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
545
                                 errmsg("\"%s\" is not a table",
 
546
                                                RelationGetRelationName(rel))));
 
547
 
 
548
        /* Permissions checks */
 
549
        if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
 
550
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
 
551
                                           RelationGetRelationName(rel));
 
552
 
 
553
        if (!allowSystemTableMods && IsSystemRelation(rel))
 
554
                ereport(ERROR,
 
555
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
556
                                 errmsg("permission denied: \"%s\" is a system catalog",
 
557
                                                RelationGetRelationName(rel))));
 
558
 
 
559
        /*
 
560
         * We can never allow truncation of shared or nailed-in-cache
 
561
         * relations, because we can't support changing their relfilenode
 
562
         * values.
 
563
         */
 
564
        if (rel->rd_rel->relisshared || rel->rd_isnailed)
 
565
                ereport(ERROR,
 
566
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
567
                                 errmsg("cannot truncate system relation \"%s\"",
 
568
                                                RelationGetRelationName(rel))));
 
569
 
 
570
        /*
 
571
         * Don't allow truncate on temp tables of other backends ... their
 
572
         * local buffer manager is not going to cope.
 
573
         */
 
574
        if (isOtherTempNamespace(RelationGetNamespace(rel)))
 
575
                ereport(ERROR,
 
576
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
577
                  errmsg("cannot truncate temporary tables of other sessions")));
 
578
 
 
579
        /*
 
580
         * Don't allow truncate on tables which are referenced by foreign keys
 
581
         */
 
582
        heap_truncate_check_FKs(rel);
 
583
 
 
584
        /*
 
585
         * Okay, here we go: create a new empty storage file for the relation,
 
586
         * and assign it as the relfilenode value.      The old storage file is
 
587
         * scheduled for deletion at commit.
 
588
         */
 
589
        setNewRelfilenode(rel);
 
590
 
 
591
        heap_relid = RelationGetRelid(rel);
 
592
        toast_relid = rel->rd_rel->reltoastrelid;
 
593
 
 
594
        heap_close(rel, NoLock);
 
595
 
 
596
        /*
 
597
         * The same for the toast table, if any.
 
598
         */
 
599
        if (OidIsValid(toast_relid))
 
600
        {
 
601
                rel = relation_open(toast_relid, AccessExclusiveLock);
 
602
                setNewRelfilenode(rel);
 
603
                heap_close(rel, NoLock);
 
604
        }
 
605
 
 
606
        /*
 
607
         * Reconstruct the indexes to match, and we're done.
 
608
         */
 
609
        reindex_relation(heap_relid, true);
 
610
}
 
611
 
 
612
/*----------
 
613
 * MergeAttributes
 
614
 *              Returns new schema given initial schema and superclasses.
 
615
 *
 
616
 * Input arguments:
 
617
 * 'schema' is the column/attribute definition for the table. (It's a list
 
618
 *              of ColumnDef's.) It is destructively changed.
 
619
 * 'supers' is a list of names (as RangeVar nodes) of parent relations.
 
620
 * 'istemp' is TRUE if we are creating a temp relation.
 
621
 *
 
622
 * Output arguments:
 
623
 * 'supOids' receives a list of the OIDs of the parent relations.
 
624
 * 'supconstr' receives a list of constraints belonging to the parents,
 
625
 *              updated as necessary to be valid for the child.
 
626
 * 'supOidCount' is set to the number of parents that have OID columns.
 
627
 *
 
628
 * Return value:
 
629
 * Completed schema list.
 
630
 *
 
631
 * Notes:
 
632
 *        The order in which the attributes are inherited is very important.
 
633
 *        Intuitively, the inherited attributes should come first. If a table
 
634
 *        inherits from multiple parents, the order of those attributes is
 
635
 *        according to the order of the parents specified in CREATE TABLE.
 
636
 *
 
637
 *        Here's an example:
 
638
 *
 
639
 *              create table person (name text, age int4, location point);
 
640
 *              create table emp (salary int4, manager text) inherits(person);
 
641
 *              create table student (gpa float8) inherits (person);
 
642
 *              create table stud_emp (percent int4) inherits (emp, student);
 
643
 *
 
644
 *        The order of the attributes of stud_emp is:
 
645
 *
 
646
 *                                                      person {1:name, 2:age, 3:location}
 
647
 *                                                      /        \
 
648
 *                         {6:gpa}      student   emp {4:salary, 5:manager}
 
649
 *                                                      \        /
 
650
 *                                                 stud_emp {7:percent}
 
651
 *
 
652
 *         If the same attribute name appears multiple times, then it appears
 
653
 *         in the result table in the proper location for its first appearance.
 
654
 *
 
655
 *         Constraints (including NOT NULL constraints) for the child table
 
656
 *         are the union of all relevant constraints, from both the child schema
 
657
 *         and parent tables.
 
658
 *
 
659
 *         The default value for a child column is defined as:
 
660
 *              (1) If the child schema specifies a default, that value is used.
 
661
 *              (2) If neither the child nor any parent specifies a default, then
 
662
 *                      the column will not have a default.
 
663
 *              (3) If conflicting defaults are inherited from different parents
 
664
 *                      (and not overridden by the child), an error is raised.
 
665
 *              (4) Otherwise the inherited default is used.
 
666
 *              Rule (3) is new in Postgres 7.1; in earlier releases you got a
 
667
 *              rather arbitrary choice of which parent default to use.
 
668
 *----------
 
669
 */
 
670
static List *
 
671
MergeAttributes(List *schema, List *supers, bool istemp,
 
672
                                List **supOids, List **supconstr, int *supOidCount)
 
673
{
 
674
        ListCell   *entry;
 
675
        List       *inhSchema = NIL;
 
676
        List       *parentOids = NIL;
 
677
        List       *constraints = NIL;
 
678
        int                     parentsWithOids = 0;
 
679
        bool            have_bogus_defaults = false;
 
680
        char       *bogus_marker = "Bogus!";            /* marks conflicting
 
681
                                                                                                 * defaults */
 
682
        int                     child_attno;
 
683
 
 
684
        /*
 
685
         * Check for and reject tables with too many columns. We perform
 
686
         * this check relatively early for two reasons: (a) we don't run
 
687
         * the risk of overflowing an AttrNumber in subsequent code (b) an
 
688
         * O(n^2) algorithm is okay if we're processing <= 1600 columns,
 
689
         * but could take minutes to execute if the user attempts to
 
690
         * create a table with hundreds of thousands of columns.
 
691
         *
 
692
         * Note that we also need to check that any we do not exceed this
 
693
         * figure after including columns from inherited relations.
 
694
         */
 
695
        if (list_length(schema) > MaxHeapAttributeNumber)
 
696
                ereport(ERROR,
 
697
                                (errcode(ERRCODE_TOO_MANY_COLUMNS),
 
698
                                 errmsg("tables can have at most %d columns",
 
699
                                                MaxHeapAttributeNumber)));
 
700
 
 
701
        /*
 
702
         * Check for duplicate names in the explicit list of attributes.
 
703
         *
 
704
         * Although we might consider merging such entries in the same way that
 
705
         * we handle name conflicts for inherited attributes, it seems to make
 
706
         * more sense to assume such conflicts are errors.
 
707
         */
 
708
        foreach(entry, schema)
 
709
        {
 
710
                ColumnDef  *coldef = lfirst(entry);
 
711
                ListCell   *rest;
 
712
 
 
713
                for_each_cell(rest, lnext(entry))
 
714
                {
 
715
                        ColumnDef  *restdef = lfirst(rest);
 
716
 
 
717
                        if (strcmp(coldef->colname, restdef->colname) == 0)
 
718
                                ereport(ERROR,
 
719
                                                (errcode(ERRCODE_DUPLICATE_COLUMN),
 
720
                                                 errmsg("column \"%s\" duplicated",
 
721
                                                                coldef->colname)));
 
722
                }
 
723
        }
 
724
 
 
725
        /*
 
726
         * Scan the parents left-to-right, and merge their attributes to form
 
727
         * a list of inherited attributes (inhSchema).  Also check to see if
 
728
         * we need to inherit an OID column.
 
729
         */
 
730
        child_attno = 0;
 
731
        foreach(entry, supers)
 
732
        {
 
733
                RangeVar   *parent = (RangeVar *) lfirst(entry);
 
734
                Relation        relation;
 
735
                TupleDesc       tupleDesc;
 
736
                TupleConstr *constr;
 
737
                AttrNumber *newattno;
 
738
                AttrNumber      parent_attno;
 
739
 
 
740
                relation = heap_openrv(parent, AccessShareLock);
 
741
 
 
742
                if (relation->rd_rel->relkind != RELKIND_RELATION)
 
743
                        ereport(ERROR,
 
744
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
745
                                         errmsg("inherited relation \"%s\" is not a table",
 
746
                                                        parent->relname)));
 
747
                /* Permanent rels cannot inherit from temporary ones */
 
748
                if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
 
749
                        ereport(ERROR,
 
750
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
751
                                  errmsg("cannot inherit from temporary relation \"%s\"",
 
752
                                                 parent->relname)));
 
753
 
 
754
                /*
 
755
                 * We should have an UNDER permission flag for this, but for now,
 
756
                 * demand that creator of a child table own the parent.
 
757
                 */
 
758
                if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
 
759
                        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
 
760
                                                   RelationGetRelationName(relation));
 
761
 
 
762
                /*
 
763
                 * Reject duplications in the list of parents.
 
764
                 */
 
765
                if (list_member_oid(parentOids, RelationGetRelid(relation)))
 
766
                        ereport(ERROR,
 
767
                                        (errcode(ERRCODE_DUPLICATE_TABLE),
 
768
                                         errmsg("inherited relation \"%s\" duplicated",
 
769
                                                        parent->relname)));
 
770
 
 
771
                parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
 
772
 
 
773
                if (relation->rd_rel->relhasoids)
 
774
                        parentsWithOids++;
 
775
 
 
776
                tupleDesc = RelationGetDescr(relation);
 
777
                constr = tupleDesc->constr;
 
778
 
 
779
                /*
 
780
                 * newattno[] will contain the child-table attribute numbers for
 
781
                 * the attributes of this parent table.  (They are not the same
 
782
                 * for parents after the first one, nor if we have dropped
 
783
                 * columns.)
 
784
                 */
 
785
                newattno = (AttrNumber *)
 
786
                        palloc(tupleDesc->natts * sizeof(AttrNumber));
 
787
 
 
788
                for (parent_attno = 1; parent_attno <= tupleDesc->natts;
 
789
                         parent_attno++)
 
790
                {
 
791
                        Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
 
792
                        char       *attributeName = NameStr(attribute->attname);
 
793
                        int                     exist_attno;
 
794
                        ColumnDef  *def;
 
795
                        TypeName   *typename;
 
796
 
 
797
                        /*
 
798
                         * Ignore dropped columns in the parent.
 
799
                         */
 
800
                        if (attribute->attisdropped)
 
801
                        {
 
802
                                /*
 
803
                                 * change_varattnos_of_a_node asserts that this is greater
 
804
                                 * than zero, so if anything tries to use it, we should
 
805
                                 * find out.
 
806
                                 */
 
807
                                newattno[parent_attno - 1] = 0;
 
808
                                continue;
 
809
                        }
 
810
 
 
811
                        /*
 
812
                         * Does it conflict with some previously inherited column?
 
813
                         */
 
814
                        exist_attno = findAttrByName(attributeName, inhSchema);
 
815
                        if (exist_attno > 0)
 
816
                        {
 
817
                                /*
 
818
                                 * Yes, try to merge the two column definitions. They must
 
819
                                 * have the same type and typmod.
 
820
                                 */
 
821
                                ereport(NOTICE,
 
822
                                                (errmsg("merging multiple inherited definitions of column \"%s\"",
 
823
                                                                attributeName)));
 
824
                                def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
 
825
                                if (typenameTypeId(def->typename) != attribute->atttypid ||
 
826
                                        def->typename->typmod != attribute->atttypmod)
 
827
                                        ereport(ERROR,
 
828
                                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
 
829
                                        errmsg("inherited column \"%s\" has a type conflict",
 
830
                                                   attributeName),
 
831
                                                         errdetail("%s versus %s",
 
832
                                                                           TypeNameToString(def->typename),
 
833
                                                                  format_type_be(attribute->atttypid))));
 
834
                                def->inhcount++;
 
835
                                /* Merge of NOT NULL constraints = OR 'em together */
 
836
                                def->is_not_null |= attribute->attnotnull;
 
837
                                /* Default and other constraints are handled below */
 
838
                                newattno[parent_attno - 1] = exist_attno;
 
839
                        }
 
840
                        else
 
841
                        {
 
842
                                /*
 
843
                                 * No, create a new inherited column
 
844
                                 */
 
845
                                def = makeNode(ColumnDef);
 
846
                                def->colname = pstrdup(attributeName);
 
847
                                typename = makeNode(TypeName);
 
848
                                typename->typeid = attribute->atttypid;
 
849
                                typename->typmod = attribute->atttypmod;
 
850
                                def->typename = typename;
 
851
                                def->inhcount = 1;
 
852
                                def->is_local = false;
 
853
                                def->is_not_null = attribute->attnotnull;
 
854
                                def->raw_default = NULL;
 
855
                                def->cooked_default = NULL;
 
856
                                def->constraints = NIL;
 
857
                                def->support = NULL;
 
858
                                inhSchema = lappend(inhSchema, def);
 
859
                                newattno[parent_attno - 1] = ++child_attno;
 
860
                        }
 
861
 
 
862
                        /*
 
863
                         * Copy default if any
 
864
                         */
 
865
                        if (attribute->atthasdef)
 
866
                        {
 
867
                                char       *this_default = NULL;
 
868
                                AttrDefault *attrdef;
 
869
                                int                     i;
 
870
 
 
871
                                /* Find default in constraint structure */
 
872
                                Assert(constr != NULL);
 
873
                                attrdef = constr->defval;
 
874
                                for (i = 0; i < constr->num_defval; i++)
 
875
                                {
 
876
                                        if (attrdef[i].adnum == parent_attno)
 
877
                                        {
 
878
                                                this_default = attrdef[i].adbin;
 
879
                                                break;
 
880
                                        }
 
881
                                }
 
882
                                Assert(this_default != NULL);
 
883
 
 
884
                                /*
 
885
                                 * If default expr could contain any vars, we'd need to
 
886
                                 * fix 'em, but it can't; so default is ready to apply to
 
887
                                 * child.
 
888
                                 *
 
889
                                 * If we already had a default from some prior parent, check
 
890
                                 * to see if they are the same.  If so, no problem; if
 
891
                                 * not, mark the column as having a bogus default. Below,
 
892
                                 * we will complain if the bogus default isn't overridden
 
893
                                 * by the child schema.
 
894
                                 */
 
895
                                Assert(def->raw_default == NULL);
 
896
                                if (def->cooked_default == NULL)
 
897
                                        def->cooked_default = pstrdup(this_default);
 
898
                                else if (strcmp(def->cooked_default, this_default) != 0)
 
899
                                {
 
900
                                        def->cooked_default = bogus_marker;
 
901
                                        have_bogus_defaults = true;
 
902
                                }
 
903
                        }
 
904
                }
 
905
 
 
906
                /*
 
907
                 * Now copy the constraints of this parent, adjusting attnos using
 
908
                 * the completed newattno[] map
 
909
                 */
 
910
                if (constr && constr->num_check > 0)
 
911
                {
 
912
                        ConstrCheck *check = constr->check;
 
913
                        int                     i;
 
914
 
 
915
                        for (i = 0; i < constr->num_check; i++)
 
916
                        {
 
917
                                Constraint *cdef = makeNode(Constraint);
 
918
                                Node       *expr;
 
919
 
 
920
                                cdef->contype = CONSTR_CHECK;
 
921
                                cdef->name = pstrdup(check[i].ccname);
 
922
                                cdef->raw_expr = NULL;
 
923
                                /* adjust varattnos of ccbin here */
 
924
                                expr = stringToNode(check[i].ccbin);
 
925
                                change_varattnos_of_a_node(expr, newattno);
 
926
                                cdef->cooked_expr = nodeToString(expr);
 
927
                                constraints = lappend(constraints, cdef);
 
928
                        }
 
929
                }
 
930
 
 
931
                pfree(newattno);
 
932
 
 
933
                /*
 
934
                 * Close the parent rel, but keep our AccessShareLock on it until
 
935
                 * xact commit.  That will prevent someone else from deleting or
 
936
                 * ALTERing the parent before the child is committed.
 
937
                 */
 
938
                heap_close(relation, NoLock);
 
939
        }
 
940
 
 
941
        /*
 
942
         * If we had no inherited attributes, the result schema is just the
 
943
         * explicitly declared columns.  Otherwise, we need to merge the
 
944
         * declared columns into the inherited schema list.
 
945
         */
 
946
        if (inhSchema != NIL)
 
947
        {
 
948
                foreach(entry, schema)
 
949
                {
 
950
                        ColumnDef  *newdef = lfirst(entry);
 
951
                        char       *attributeName = newdef->colname;
 
952
                        int                     exist_attno;
 
953
 
 
954
                        /*
 
955
                         * Does it conflict with some previously inherited column?
 
956
                         */
 
957
                        exist_attno = findAttrByName(attributeName, inhSchema);
 
958
                        if (exist_attno > 0)
 
959
                        {
 
960
                                ColumnDef  *def;
 
961
 
 
962
                                /*
 
963
                                 * Yes, try to merge the two column definitions. They must
 
964
                                 * have the same type and typmod.
 
965
                                 */
 
966
                                ereport(NOTICE,
 
967
                                (errmsg("merging column \"%s\" with inherited definition",
 
968
                                                attributeName)));
 
969
                                def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
 
970
                                if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
 
971
                                        def->typename->typmod != newdef->typename->typmod)
 
972
                                        ereport(ERROR,
 
973
                                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
 
974
                                                         errmsg("column \"%s\" has a type conflict",
 
975
                                                                        attributeName),
 
976
                                                         errdetail("%s versus %s",
 
977
                                                                           TypeNameToString(def->typename),
 
978
                                                                   TypeNameToString(newdef->typename))));
 
979
                                /* Mark the column as locally defined */
 
980
                                def->is_local = true;
 
981
                                /* Merge of NOT NULL constraints = OR 'em together */
 
982
                                def->is_not_null |= newdef->is_not_null;
 
983
                                /* If new def has a default, override previous default */
 
984
                                if (newdef->raw_default != NULL)
 
985
                                {
 
986
                                        def->raw_default = newdef->raw_default;
 
987
                                        def->cooked_default = newdef->cooked_default;
 
988
                                }
 
989
                        }
 
990
                        else
 
991
                        {
 
992
                                /*
 
993
                                 * No, attach new column to result schema
 
994
                                 */
 
995
                                inhSchema = lappend(inhSchema, newdef);
 
996
                        }
 
997
                }
 
998
 
 
999
                schema = inhSchema;
 
1000
 
 
1001
                /*
 
1002
                 * Check that we haven't exceeded the legal # of columns after
 
1003
                 * merging in inherited columns.
 
1004
                 */
 
1005
                if (list_length(schema) > MaxHeapAttributeNumber)
 
1006
                        ereport(ERROR,
 
1007
                                        (errcode(ERRCODE_TOO_MANY_COLUMNS),
 
1008
                                         errmsg("tables can have at most %d columns",
 
1009
                                                        MaxHeapAttributeNumber)));
 
1010
        }
 
1011
 
 
1012
        /*
 
1013
         * If we found any conflicting parent default values, check to make
 
1014
         * sure they were overridden by the child.
 
1015
         */
 
1016
        if (have_bogus_defaults)
 
1017
        {
 
1018
                foreach(entry, schema)
 
1019
                {
 
1020
                        ColumnDef  *def = lfirst(entry);
 
1021
 
 
1022
                        if (def->cooked_default == bogus_marker)
 
1023
                                ereport(ERROR,
 
1024
                                                (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
 
1025
                                                 errmsg("column \"%s\" inherits conflicting default values",
 
1026
                                                                def->colname),
 
1027
                                                 errhint("To resolve the conflict, specify a default explicitly.")));
 
1028
                }
 
1029
        }
 
1030
 
 
1031
        *supOids = parentOids;
 
1032
        *supconstr = constraints;
 
1033
        *supOidCount = parentsWithOids;
 
1034
        return schema;
 
1035
}
 
1036
 
 
1037
/*
 
1038
 * complementary static functions for MergeAttributes().
 
1039
 *
 
1040
 * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
 
1041
 * constraints from parent classes, since the inherited attributes could
 
1042
 * be given different column numbers in multiple-inheritance cases.
 
1043
 *
 
1044
 * Note that the passed node tree is modified in place!
 
1045
 */
 
1046
static bool
 
1047
change_varattnos_walker(Node *node, const AttrNumber *newattno)
 
1048
{
 
1049
        if (node == NULL)
 
1050
                return false;
 
1051
        if (IsA(node, Var))
 
1052
        {
 
1053
                Var                *var = (Var *) node;
 
1054
 
 
1055
                if (var->varlevelsup == 0 && var->varno == 1 &&
 
1056
                        var->varattno > 0)
 
1057
                {
 
1058
                        /*
 
1059
                         * ??? the following may be a problem when the node is
 
1060
                         * multiply referenced though stringToNode() doesn't create
 
1061
                         * such a node currently.
 
1062
                         */
 
1063
                        Assert(newattno[var->varattno - 1] > 0);
 
1064
                        var->varattno = newattno[var->varattno - 1];
 
1065
                }
 
1066
                return false;
 
1067
        }
 
1068
        return expression_tree_walker(node, change_varattnos_walker,
 
1069
                                                                  (void *) newattno);
 
1070
}
 
1071
 
 
1072
static bool
 
1073
change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
 
1074
{
 
1075
        return change_varattnos_walker(node, newattno);
 
1076
}
 
1077
 
 
1078
/*
 
1079
 * StoreCatalogInheritance
 
1080
 *              Updates the system catalogs with proper inheritance information.
 
1081
 *
 
1082
 * supers is a list of the OIDs of the new relation's direct ancestors.
 
1083
 */
 
1084
static void
 
1085
StoreCatalogInheritance(Oid relationId, List *supers)
 
1086
{
 
1087
        Relation        relation;
 
1088
        TupleDesc       desc;
 
1089
        int16           seqNumber;
 
1090
        ListCell   *entry;
 
1091
        HeapTuple       tuple;
 
1092
 
 
1093
        /*
 
1094
         * sanity checks
 
1095
         */
 
1096
        AssertArg(OidIsValid(relationId));
 
1097
 
 
1098
        if (supers == NIL)
 
1099
                return;
 
1100
 
 
1101
        /*
 
1102
         * Store INHERITS information in pg_inherits using direct ancestors
 
1103
         * only. Also enter dependencies on the direct ancestors, and make
 
1104
         * sure they are marked with relhassubclass = true.
 
1105
         *
 
1106
         * (Once upon a time, both direct and indirect ancestors were found here
 
1107
         * and then entered into pg_ipl.  Since that catalog doesn't exist
 
1108
         * anymore, there's no need to look for indirect ancestors.)
 
1109
         */
 
1110
        relation = heap_openr(InheritsRelationName, RowExclusiveLock);
 
1111
        desc = RelationGetDescr(relation);
 
1112
 
 
1113
        seqNumber = 1;
 
1114
        foreach(entry, supers)
 
1115
        {
 
1116
                Oid                     parentOid = lfirst_oid(entry);
 
1117
                Datum           datum[Natts_pg_inherits];
 
1118
                char            nullarr[Natts_pg_inherits];
 
1119
                ObjectAddress childobject,
 
1120
                                        parentobject;
 
1121
 
 
1122
                datum[0] = ObjectIdGetDatum(relationId);                /* inhrel */
 
1123
                datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
 
1124
                datum[2] = Int16GetDatum(seqNumber);    /* inhseqno */
 
1125
 
 
1126
                nullarr[0] = ' ';
 
1127
                nullarr[1] = ' ';
 
1128
                nullarr[2] = ' ';
 
1129
 
 
1130
                tuple = heap_formtuple(desc, datum, nullarr);
 
1131
 
 
1132
                simple_heap_insert(relation, tuple);
 
1133
 
 
1134
                CatalogUpdateIndexes(relation, tuple);
 
1135
 
 
1136
                heap_freetuple(tuple);
 
1137
 
 
1138
                /*
 
1139
                 * Store a dependency too
 
1140
                 */
 
1141
                parentobject.classId = RelOid_pg_class;
 
1142
                parentobject.objectId = parentOid;
 
1143
                parentobject.objectSubId = 0;
 
1144
                childobject.classId = RelOid_pg_class;
 
1145
                childobject.objectId = relationId;
 
1146
                childobject.objectSubId = 0;
 
1147
 
 
1148
                recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
 
1149
 
 
1150
                /*
 
1151
                 * Mark the parent as having subclasses.
 
1152
                 */
 
1153
                setRelhassubclassInRelation(parentOid, true);
 
1154
 
 
1155
                seqNumber += 1;
 
1156
        }
 
1157
 
 
1158
        heap_close(relation, RowExclusiveLock);
 
1159
}
 
1160
 
 
1161
/*
 
1162
 * Look for an existing schema entry with the given name.
 
1163
 *
 
1164
 * Returns the index (starting with 1) if attribute already exists in schema,
 
1165
 * 0 if it doesn't.
 
1166
 */
 
1167
static int
 
1168
findAttrByName(const char *attributeName, List *schema)
 
1169
{
 
1170
        ListCell   *s;
 
1171
        int                     i = 1;
 
1172
 
 
1173
        foreach(s, schema)
 
1174
        {
 
1175
                ColumnDef  *def = lfirst(s);
 
1176
 
 
1177
                if (strcmp(attributeName, def->colname) == 0)
 
1178
                        return i;
 
1179
 
 
1180
                i++;
 
1181
        }
 
1182
        return 0;
 
1183
}
 
1184
 
 
1185
/*
 
1186
 * Update a relation's pg_class.relhassubclass entry to the given value
 
1187
 */
 
1188
static void
 
1189
setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
 
1190
{
 
1191
        Relation        relationRelation;
 
1192
        HeapTuple       tuple;
 
1193
        Form_pg_class classtuple;
 
1194
 
 
1195
        /*
 
1196
         * Fetch a modifiable copy of the tuple, modify it, update pg_class.
 
1197
         *
 
1198
         * If the tuple already has the right relhassubclass setting, we don't
 
1199
         * need to update it, but we still need to issue an SI inval message.
 
1200
         */
 
1201
        relationRelation = heap_openr(RelationRelationName, RowExclusiveLock);
 
1202
        tuple = SearchSysCacheCopy(RELOID,
 
1203
                                                           ObjectIdGetDatum(relationId),
 
1204
                                                           0, 0, 0);
 
1205
        if (!HeapTupleIsValid(tuple))
 
1206
                elog(ERROR, "cache lookup failed for relation %u", relationId);
 
1207
        classtuple = (Form_pg_class) GETSTRUCT(tuple);
 
1208
 
 
1209
        if (classtuple->relhassubclass != relhassubclass)
 
1210
        {
 
1211
                classtuple->relhassubclass = relhassubclass;
 
1212
                simple_heap_update(relationRelation, &tuple->t_self, tuple);
 
1213
 
 
1214
                /* keep the catalog indexes up to date */
 
1215
                CatalogUpdateIndexes(relationRelation, tuple);
 
1216
        }
 
1217
        else
 
1218
        {
 
1219
                /* no need to change tuple, but force relcache rebuild anyway */
 
1220
                CacheInvalidateRelcacheByTuple(tuple);
 
1221
        }
 
1222
 
 
1223
        heap_freetuple(tuple);
 
1224
        heap_close(relationRelation, RowExclusiveLock);
 
1225
}
 
1226
 
 
1227
 
 
1228
/*
 
1229
 *              renameatt               - changes the name of an attribute in a relation
 
1230
 *
 
1231
 *              Attname attribute is changed in attribute catalog.
 
1232
 *              No record of the previous attname is kept (correct?).
 
1233
 *
 
1234
 *              get proper relrelation from relation catalog (if not arg)
 
1235
 *              scan attribute catalog
 
1236
 *                              for name conflict (within rel)
 
1237
 *                              for original attribute (if not arg)
 
1238
 *              modify attname in attribute tuple
 
1239
 *              insert modified attribute in attribute catalog
 
1240
 *              delete original attribute from attribute catalog
 
1241
 */
 
1242
void
 
1243
renameatt(Oid myrelid,
 
1244
                  const char *oldattname,
 
1245
                  const char *newattname,
 
1246
                  bool recurse,
 
1247
                  bool recursing)
 
1248
{
 
1249
        Relation        targetrelation;
 
1250
        Relation        attrelation;
 
1251
        HeapTuple       atttup;
 
1252
        Form_pg_attribute attform;
 
1253
        int                     attnum;
 
1254
        List       *indexoidlist;
 
1255
        ListCell   *indexoidscan;
 
1256
 
 
1257
        /*
 
1258
         * Grab an exclusive lock on the target table, which we will NOT
 
1259
         * release until end of transaction.
 
1260
         */
 
1261
        targetrelation = relation_open(myrelid, AccessExclusiveLock);
 
1262
 
 
1263
        /*
 
1264
         * permissions checking.  this would normally be done in utility.c,
 
1265
         * but this particular routine is recursive.
 
1266
         *
 
1267
         * normally, only the owner of a class can change its schema.
 
1268
         */
 
1269
        if (!pg_class_ownercheck(myrelid, GetUserId()))
 
1270
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
 
1271
                                           RelationGetRelationName(targetrelation));
 
1272
        if (!allowSystemTableMods && IsSystemRelation(targetrelation))
 
1273
                ereport(ERROR,
 
1274
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
1275
                                 errmsg("permission denied: \"%s\" is a system catalog",
 
1276
                                                RelationGetRelationName(targetrelation))));
 
1277
 
 
1278
        /*
 
1279
         * if the 'recurse' flag is set then we are supposed to rename this
 
1280
         * attribute in all classes that inherit from 'relname' (as well as in
 
1281
         * 'relname').
 
1282
         *
 
1283
         * any permissions or problems with duplicate attributes will cause the
 
1284
         * whole transaction to abort, which is what we want -- all or
 
1285
         * nothing.
 
1286
         */
 
1287
        if (recurse)
 
1288
        {
 
1289
                ListCell   *child;
 
1290
                List       *children;
 
1291
 
 
1292
                /* this routine is actually in the planner */
 
1293
                children = find_all_inheritors(myrelid);
 
1294
 
 
1295
                /*
 
1296
                 * find_all_inheritors does the recursive search of the
 
1297
                 * inheritance hierarchy, so all we have to do is process all of
 
1298
                 * the relids in the list that it returns.
 
1299
                 */
 
1300
                foreach(child, children)
 
1301
                {
 
1302
                        Oid                     childrelid = lfirst_oid(child);
 
1303
 
 
1304
                        if (childrelid == myrelid)
 
1305
                                continue;
 
1306
                        /* note we need not recurse again */
 
1307
                        renameatt(childrelid, oldattname, newattname, false, true);
 
1308
                }
 
1309
        }
 
1310
        else
 
1311
        {
 
1312
                /*
 
1313
                 * If we are told not to recurse, there had better not be any
 
1314
                 * child tables; else the rename would put them out of step.
 
1315
                 */
 
1316
                if (!recursing &&
 
1317
                        find_inheritance_children(myrelid) != NIL)
 
1318
                        ereport(ERROR,
 
1319
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
1320
                                         errmsg("inherited column \"%s\" must be renamed in child tables too",
 
1321
                                                        oldattname)));
 
1322
        }
 
1323
 
 
1324
        attrelation = heap_openr(AttributeRelationName, RowExclusiveLock);
 
1325
 
 
1326
        atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
 
1327
        if (!HeapTupleIsValid(atttup))
 
1328
                ereport(ERROR,
 
1329
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
1330
                                 errmsg("column \"%s\" does not exist",
 
1331
                                                oldattname)));
 
1332
        attform = (Form_pg_attribute) GETSTRUCT(atttup);
 
1333
 
 
1334
        attnum = attform->attnum;
 
1335
        if (attnum <= 0)
 
1336
                ereport(ERROR,
 
1337
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
1338
                                 errmsg("cannot rename system column \"%s\"",
 
1339
                                                oldattname)));
 
1340
 
 
1341
        /*
 
1342
         * if the attribute is inherited, forbid the renaming, unless we are
 
1343
         * already inside a recursive rename.
 
1344
         */
 
1345
        if (attform->attinhcount > 0 && !recursing)
 
1346
                ereport(ERROR,
 
1347
                                (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
1348
                                 errmsg("cannot rename inherited column \"%s\"",
 
1349
                                                oldattname)));
 
1350
 
 
1351
        /* should not already exist */
 
1352
        /* this test is deliberately not attisdropped-aware */
 
1353
        if (SearchSysCacheExists(ATTNAME,
 
1354
                                                         ObjectIdGetDatum(myrelid),
 
1355
                                                         PointerGetDatum(newattname),
 
1356
                                                         0, 0))
 
1357
                ereport(ERROR,
 
1358
                                (errcode(ERRCODE_DUPLICATE_COLUMN),
 
1359
                                 errmsg("column \"%s\" of relation \"%s\" already exists",
 
1360
                                  newattname, RelationGetRelationName(targetrelation))));
 
1361
 
 
1362
        namestrcpy(&(attform->attname), newattname);
 
1363
 
 
1364
        simple_heap_update(attrelation, &atttup->t_self, atttup);
 
1365
 
 
1366
        /* keep system catalog indexes current */
 
1367
        CatalogUpdateIndexes(attrelation, atttup);
 
1368
 
 
1369
        heap_freetuple(atttup);
 
1370
 
 
1371
        /*
 
1372
         * Update column names of indexes that refer to the column being
 
1373
         * renamed.
 
1374
         */
 
1375
        indexoidlist = RelationGetIndexList(targetrelation);
 
1376
 
 
1377
        foreach(indexoidscan, indexoidlist)
 
1378
        {
 
1379
                Oid                     indexoid = lfirst_oid(indexoidscan);
 
1380
                HeapTuple       indextup;
 
1381
                Form_pg_index indexform;
 
1382
                int                     i;
 
1383
 
 
1384
                /*
 
1385
                 * Scan through index columns to see if there's any simple index
 
1386
                 * entries for this attribute.  We ignore expressional entries.
 
1387
                 */
 
1388
                indextup = SearchSysCache(INDEXRELID,
 
1389
                                                                  ObjectIdGetDatum(indexoid),
 
1390
                                                                  0, 0, 0);
 
1391
                if (!HeapTupleIsValid(indextup))
 
1392
                        elog(ERROR, "cache lookup failed for index %u", indexoid);
 
1393
                indexform = (Form_pg_index) GETSTRUCT(indextup);
 
1394
 
 
1395
                for (i = 0; i < indexform->indnatts; i++)
 
1396
                {
 
1397
                        if (attnum != indexform->indkey[i])
 
1398
                                continue;
 
1399
 
 
1400
                        /*
 
1401
                         * Found one, rename it.
 
1402
                         */
 
1403
                        atttup = SearchSysCacheCopy(ATTNUM,
 
1404
                                                                                ObjectIdGetDatum(indexoid),
 
1405
                                                                                Int16GetDatum(i + 1),
 
1406
                                                                                0, 0);
 
1407
                        if (!HeapTupleIsValid(atttup))
 
1408
                                continue;               /* should we raise an error? */
 
1409
 
 
1410
                        /*
 
1411
                         * Update the (copied) attribute tuple.
 
1412
                         */
 
1413
                        namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
 
1414
                                           newattname);
 
1415
 
 
1416
                        simple_heap_update(attrelation, &atttup->t_self, atttup);
 
1417
 
 
1418
                        /* keep system catalog indexes current */
 
1419
                        CatalogUpdateIndexes(attrelation, atttup);
 
1420
 
 
1421
                        heap_freetuple(atttup);
 
1422
                }
 
1423
 
 
1424
                ReleaseSysCache(indextup);
 
1425
        }
 
1426
 
 
1427
        list_free(indexoidlist);
 
1428
 
 
1429
        heap_close(attrelation, RowExclusiveLock);
 
1430
 
 
1431
        /*
 
1432
         * Update att name in any RI triggers associated with the relation.
 
1433
         */
 
1434
        if (targetrelation->rd_rel->reltriggers > 0)
 
1435
        {
 
1436
                /* update tgargs column reference where att is primary key */
 
1437
                update_ri_trigger_args(RelationGetRelid(targetrelation),
 
1438
                                                           oldattname, newattname,
 
1439
                                                           false, false);
 
1440
                /* update tgargs column reference where att is foreign key */
 
1441
                update_ri_trigger_args(RelationGetRelid(targetrelation),
 
1442
                                                           oldattname, newattname,
 
1443
                                                           true, false);
 
1444
        }
 
1445
 
 
1446
        relation_close(targetrelation, NoLock);         /* close rel but keep lock */
 
1447
}
 
1448
 
 
1449
/*
 
1450
 *              renamerel               - change the name of a relation
 
1451
 *
 
1452
 *              XXX - When renaming sequences, we don't bother to modify the
 
1453
 *                        sequence name that is stored within the sequence itself
 
1454
 *                        (this would cause problems with MVCC). In the future,
 
1455
 *                        the sequence name should probably be removed from the
 
1456
 *                        sequence, AFAIK there's no need for it to be there.
 
1457
 */
 
1458
void
 
1459
renamerel(Oid myrelid, const char *newrelname)
 
1460
{
 
1461
        Relation        targetrelation;
 
1462
        Relation        relrelation;    /* for RELATION relation */
 
1463
        HeapTuple       reltup;
 
1464
        Oid                     namespaceId;
 
1465
        char       *oldrelname;
 
1466
        char            relkind;
 
1467
        bool            relhastriggers;
 
1468
 
 
1469
        /*
 
1470
         * Grab an exclusive lock on the target table or index, which we will
 
1471
         * NOT release until end of transaction.
 
1472
         */
 
1473
        targetrelation = relation_open(myrelid, AccessExclusiveLock);
 
1474
 
 
1475
        oldrelname = pstrdup(RelationGetRelationName(targetrelation));
 
1476
        namespaceId = RelationGetNamespace(targetrelation);
 
1477
 
 
1478
        if (!allowSystemTableMods && IsSystemRelation(targetrelation))
 
1479
                ereport(ERROR,
 
1480
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
1481
                                 errmsg("permission denied: \"%s\" is a system catalog",
 
1482
                                                RelationGetRelationName(targetrelation))));
 
1483
 
 
1484
        relkind = targetrelation->rd_rel->relkind;
 
1485
        relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
 
1486
 
 
1487
        /*
 
1488
         * Find relation's pg_class tuple, and make sure newrelname isn't in
 
1489
         * use.
 
1490
         */
 
1491
        relrelation = heap_openr(RelationRelationName, RowExclusiveLock);
 
1492
 
 
1493
        reltup = SearchSysCacheCopy(RELOID,
 
1494
                                                                PointerGetDatum(myrelid),
 
1495
                                                                0, 0, 0);
 
1496
        if (!HeapTupleIsValid(reltup))          /* shouldn't happen */
 
1497
                elog(ERROR, "cache lookup failed for relation %u", myrelid);
 
1498
 
 
1499
        if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
 
1500
                ereport(ERROR,
 
1501
                                (errcode(ERRCODE_DUPLICATE_TABLE),
 
1502
                                 errmsg("relation \"%s\" already exists",
 
1503
                                                newrelname)));
 
1504
 
 
1505
        /*
 
1506
         * Update pg_class tuple with new relname.      (Scribbling on reltup is
 
1507
         * OK because it's a copy...)
 
1508
         */
 
1509
        namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
 
1510
 
 
1511
        simple_heap_update(relrelation, &reltup->t_self, reltup);
 
1512
 
 
1513
        /* keep the system catalog indexes current */
 
1514
        CatalogUpdateIndexes(relrelation, reltup);
 
1515
 
 
1516
        heap_freetuple(reltup);
 
1517
        heap_close(relrelation, RowExclusiveLock);
 
1518
 
 
1519
        /*
 
1520
         * Also rename the associated type, if any.
 
1521
         */
 
1522
        if (relkind != RELKIND_INDEX)
 
1523
                TypeRename(oldrelname, namespaceId, newrelname);
 
1524
 
 
1525
        /*
 
1526
         * Update rel name in any RI triggers associated with the relation.
 
1527
         */
 
1528
        if (relhastriggers)
 
1529
        {
 
1530
                /* update tgargs where relname is primary key */
 
1531
                update_ri_trigger_args(myrelid,
 
1532
                                                           oldrelname,
 
1533
                                                           newrelname,
 
1534
                                                           false, true);
 
1535
                /* update tgargs where relname is foreign key */
 
1536
                update_ri_trigger_args(myrelid,
 
1537
                                                           oldrelname,
 
1538
                                                           newrelname,
 
1539
                                                           true, true);
 
1540
        }
 
1541
 
 
1542
        /*
 
1543
         * Close rel, but keep exclusive lock!
 
1544
         */
 
1545
        relation_close(targetrelation, NoLock);
 
1546
}
 
1547
 
 
1548
 
 
1549
/*
 
1550
 * Given a trigger function OID, determine whether it is an RI trigger,
 
1551
 * and if so whether it is attached to PK or FK relation.
 
1552
 *
 
1553
 * XXX this probably doesn't belong here; should be exported by
 
1554
 * ri_triggers.c
 
1555
 */
 
1556
static int
 
1557
ri_trigger_type(Oid tgfoid)
 
1558
{
 
1559
        switch (tgfoid)
 
1560
        {
 
1561
                case F_RI_FKEY_CASCADE_DEL:
 
1562
                case F_RI_FKEY_CASCADE_UPD:
 
1563
                case F_RI_FKEY_RESTRICT_DEL:
 
1564
                case F_RI_FKEY_RESTRICT_UPD:
 
1565
                case F_RI_FKEY_SETNULL_DEL:
 
1566
                case F_RI_FKEY_SETNULL_UPD:
 
1567
                case F_RI_FKEY_SETDEFAULT_DEL:
 
1568
                case F_RI_FKEY_SETDEFAULT_UPD:
 
1569
                case F_RI_FKEY_NOACTION_DEL:
 
1570
                case F_RI_FKEY_NOACTION_UPD:
 
1571
                        return RI_TRIGGER_PK;
 
1572
 
 
1573
                case F_RI_FKEY_CHECK_INS:
 
1574
                case F_RI_FKEY_CHECK_UPD:
 
1575
                        return RI_TRIGGER_FK;
 
1576
        }
 
1577
 
 
1578
        return RI_TRIGGER_NONE;
 
1579
}
 
1580
 
 
1581
/*
 
1582
 * Scan pg_trigger for RI triggers that are on the specified relation
 
1583
 * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
 
1584
 * is true).  Update RI trigger args fields matching oldname to contain
 
1585
 * newname instead.  If update_relname is true, examine the relname
 
1586
 * fields; otherwise examine the attname fields.
 
1587
 */
 
1588
static void
 
1589
update_ri_trigger_args(Oid relid,
 
1590
                                           const char *oldname,
 
1591
                                           const char *newname,
 
1592
                                           bool fk_scan,
 
1593
                                           bool update_relname)
 
1594
{
 
1595
        Relation        tgrel;
 
1596
        ScanKeyData skey[1];
 
1597
        SysScanDesc trigscan;
 
1598
        HeapTuple       tuple;
 
1599
        Datum           values[Natts_pg_trigger];
 
1600
        char            nulls[Natts_pg_trigger];
 
1601
        char            replaces[Natts_pg_trigger];
 
1602
 
 
1603
        tgrel = heap_openr(TriggerRelationName, RowExclusiveLock);
 
1604
        if (fk_scan)
 
1605
        {
 
1606
                ScanKeyInit(&skey[0],
 
1607
                                        Anum_pg_trigger_tgconstrrelid,
 
1608
                                        BTEqualStrategyNumber, F_OIDEQ,
 
1609
                                        ObjectIdGetDatum(relid));
 
1610
                trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndex,
 
1611
                                                                          true, SnapshotNow,
 
1612
                                                                          1, skey);
 
1613
        }
 
1614
        else
 
1615
        {
 
1616
                ScanKeyInit(&skey[0],
 
1617
                                        Anum_pg_trigger_tgrelid,
 
1618
                                        BTEqualStrategyNumber, F_OIDEQ,
 
1619
                                        ObjectIdGetDatum(relid));
 
1620
                trigscan = systable_beginscan(tgrel, TriggerRelidNameIndex,
 
1621
                                                                          true, SnapshotNow,
 
1622
                                                                          1, skey);
 
1623
        }
 
1624
 
 
1625
        while ((tuple = systable_getnext(trigscan)) != NULL)
 
1626
        {
 
1627
                Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
 
1628
                bytea      *val;
 
1629
                bytea      *newtgargs;
 
1630
                bool            isnull;
 
1631
                int                     tg_type;
 
1632
                bool            examine_pk;
 
1633
                bool            changed;
 
1634
                int                     tgnargs;
 
1635
                int                     i;
 
1636
                int                     newlen;
 
1637
                const char *arga[RI_MAX_ARGUMENTS];
 
1638
                const char *argp;
 
1639
 
 
1640
                tg_type = ri_trigger_type(pg_trigger->tgfoid);
 
1641
                if (tg_type == RI_TRIGGER_NONE)
 
1642
                {
 
1643
                        /* Not an RI trigger, forget it */
 
1644
                        continue;
 
1645
                }
 
1646
 
 
1647
                /*
 
1648
                 * It is an RI trigger, so parse the tgargs bytea.
 
1649
                 *
 
1650
                 * NB: we assume the field will never be compressed or moved out of
 
1651
                 * line; so does trigger.c ...
 
1652
                 */
 
1653
                tgnargs = pg_trigger->tgnargs;
 
1654
                val = (bytea *) fastgetattr(tuple,
 
1655
                                                                        Anum_pg_trigger_tgargs,
 
1656
                                                                        tgrel->rd_att, &isnull);
 
1657
                if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
 
1658
                        tgnargs > RI_MAX_ARGUMENTS)
 
1659
                {
 
1660
                        /* This probably shouldn't happen, but ignore busted triggers */
 
1661
                        continue;
 
1662
                }
 
1663
                argp = (const char *) VARDATA(val);
 
1664
                for (i = 0; i < tgnargs; i++)
 
1665
                {
 
1666
                        arga[i] = argp;
 
1667
                        argp += strlen(argp) + 1;
 
1668
                }
 
1669
 
 
1670
                /*
 
1671
                 * Figure out which item(s) to look at.  If the trigger is
 
1672
                 * primary-key type and attached to my rel, I should look at the
 
1673
                 * PK fields; if it is foreign-key type and attached to my rel, I
 
1674
                 * should look at the FK fields.  But the opposite rule holds when
 
1675
                 * examining triggers found by tgconstrrel search.
 
1676
                 */
 
1677
                examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
 
1678
 
 
1679
                changed = false;
 
1680
                if (update_relname)
 
1681
                {
 
1682
                        /* Change the relname if needed */
 
1683
                        i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
 
1684
                        if (strcmp(arga[i], oldname) == 0)
 
1685
                        {
 
1686
                                arga[i] = newname;
 
1687
                                changed = true;
 
1688
                        }
 
1689
                }
 
1690
                else
 
1691
                {
 
1692
                        /* Change attname(s) if needed */
 
1693
                        i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
 
1694
                                RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
 
1695
                        for (; i < tgnargs; i += 2)
 
1696
                        {
 
1697
                                if (strcmp(arga[i], oldname) == 0)
 
1698
                                {
 
1699
                                        arga[i] = newname;
 
1700
                                        changed = true;
 
1701
                                }
 
1702
                        }
 
1703
                }
 
1704
 
 
1705
                if (!changed)
 
1706
                {
 
1707
                        /* Don't need to update this tuple */
 
1708
                        continue;
 
1709
                }
 
1710
 
 
1711
                /*
 
1712
                 * Construct modified tgargs bytea.
 
1713
                 */
 
1714
                newlen = VARHDRSZ;
 
1715
                for (i = 0; i < tgnargs; i++)
 
1716
                        newlen += strlen(arga[i]) + 1;
 
1717
                newtgargs = (bytea *) palloc(newlen);
 
1718
                VARATT_SIZEP(newtgargs) = newlen;
 
1719
                newlen = VARHDRSZ;
 
1720
                for (i = 0; i < tgnargs; i++)
 
1721
                {
 
1722
                        strcpy(((char *) newtgargs) + newlen, arga[i]);
 
1723
                        newlen += strlen(arga[i]) + 1;
 
1724
                }
 
1725
 
 
1726
                /*
 
1727
                 * Build modified tuple.
 
1728
                 */
 
1729
                for (i = 0; i < Natts_pg_trigger; i++)
 
1730
                {
 
1731
                        values[i] = (Datum) 0;
 
1732
                        replaces[i] = ' ';
 
1733
                        nulls[i] = ' ';
 
1734
                }
 
1735
                values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
 
1736
                replaces[Anum_pg_trigger_tgargs - 1] = 'r';
 
1737
 
 
1738
                tuple = heap_modifytuple(tuple, tgrel, values, nulls, replaces);
 
1739
 
 
1740
                /*
 
1741
                 * Update pg_trigger and its indexes
 
1742
                 */
 
1743
                simple_heap_update(tgrel, &tuple->t_self, tuple);
 
1744
 
 
1745
                CatalogUpdateIndexes(tgrel, tuple);
 
1746
 
 
1747
                /*
 
1748
                 * Invalidate trigger's relation's relcache entry so that other
 
1749
                 * backends (and this one too!) are sent SI message to make them
 
1750
                 * rebuild relcache entries.  (Ideally this should happen
 
1751
                 * automatically...)
 
1752
                 *
 
1753
                 * We can skip this for triggers on relid itself, since that relcache
 
1754
                 * flush will happen anyway due to the table or column rename.  We
 
1755
                 * just need to catch the far ends of RI relationships.
 
1756
                 */
 
1757
                pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
 
1758
                if (pg_trigger->tgrelid != relid)
 
1759
                        CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
 
1760
 
 
1761
                /* free up our scratch memory */
 
1762
                pfree(newtgargs);
 
1763
                heap_freetuple(tuple);
 
1764
        }
 
1765
 
 
1766
        systable_endscan(trigscan);
 
1767
 
 
1768
        heap_close(tgrel, RowExclusiveLock);
 
1769
 
 
1770
        /*
 
1771
         * Increment cmd counter to make updates visible; this is needed in
 
1772
         * case the same tuple has to be updated again by next pass (can
 
1773
         * happen in case of a self-referential FK relationship).
 
1774
         */
 
1775
        CommandCounterIncrement();
 
1776
}
 
1777
 
 
1778
/*
 
1779
 * AlterTable
 
1780
 *              Execute ALTER TABLE, which can be a list of subcommands
 
1781
 *
 
1782
 * ALTER TABLE is performed in three phases:
 
1783
 *              1. Examine subcommands and perform pre-transformation checking.
 
1784
 *              2. Update system catalogs.
 
1785
 *              3. Scan table(s) to check new constraints, and optionally recopy
 
1786
 *                 the data into new table(s).
 
1787
 * Phase 3 is not performed unless one or more of the subcommands requires
 
1788
 * it.  The intention of this design is to allow multiple independent
 
1789
 * updates of the table schema to be performed with only one pass over the
 
1790
 * data.
 
1791
 *
 
1792
 * ATPrepCmd performs phase 1.  A "work queue" entry is created for
 
1793
 * each table to be affected (there may be multiple affected tables if the
 
1794
 * commands traverse a table inheritance hierarchy).  Also we do preliminary
 
1795
 * validation of the subcommands, including parse transformation of those
 
1796
 * expressions that need to be evaluated with respect to the old table
 
1797
 * schema.
 
1798
 *
 
1799
 * ATRewriteCatalogs performs phase 2 for each affected table (note that
 
1800
 * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
 
1801
 * Certain subcommands need to be performed before others to avoid
 
1802
 * unnecessary conflicts; for example, DROP COLUMN should come before
 
1803
 * ADD COLUMN.  Therefore phase 1 divides the subcommands into multiple
 
1804
 * lists, one for each logical "pass" of phase 2.
 
1805
 *
 
1806
 * ATRewriteTables performs phase 3 for those tables that need it.
 
1807
 *
 
1808
 * Thanks to the magic of MVCC, an error anywhere along the way rolls back
 
1809
 * the whole operation; we don't have to do anything special to clean up.
 
1810
 */
 
1811
void
 
1812
AlterTable(AlterTableStmt *stmt)
 
1813
{
 
1814
        ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
 
1815
                                 stmt->cmds,
 
1816
                                 interpretInhOption(stmt->relation->inhOpt));
 
1817
}
 
1818
 
 
1819
/*
 
1820
 * AlterTableInternal
 
1821
 *
 
1822
 * ALTER TABLE with target specified by OID
 
1823
 */
 
1824
void
 
1825
AlterTableInternal(Oid relid, List *cmds, bool recurse)
 
1826
{
 
1827
        ATController(relation_open(relid, AccessExclusiveLock),
 
1828
                                 cmds,
 
1829
                                 recurse);
 
1830
}
 
1831
 
 
1832
static void
 
1833
ATController(Relation rel, List *cmds, bool recurse)
 
1834
{
 
1835
        List       *wqueue = NIL;
 
1836
        ListCell   *lcmd;
 
1837
 
 
1838
        /* Phase 1: preliminary examination of commands, create work queue */
 
1839
        foreach(lcmd, cmds)
 
1840
        {
 
1841
                AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
 
1842
 
 
1843
                ATPrepCmd(&wqueue, rel, cmd, recurse, false);
 
1844
        }
 
1845
 
 
1846
        /* Close the relation, but keep lock until commit */
 
1847
        relation_close(rel, NoLock);
 
1848
 
 
1849
        /* Phase 2: update system catalogs */
 
1850
        ATRewriteCatalogs(&wqueue);
 
1851
 
 
1852
        /* Phase 3: scan/rewrite tables as needed */
 
1853
        ATRewriteTables(&wqueue);
 
1854
}
 
1855
 
 
1856
/*
 
1857
 * ATPrepCmd
 
1858
 *
 
1859
 * Traffic cop for ALTER TABLE Phase 1 operations, including simple
 
1860
 * recursion and permission checks.
 
1861
 *
 
1862
 * Caller must have acquired AccessExclusiveLock on relation already.
 
1863
 * This lock should be held until commit.
 
1864
 */
 
1865
static void
 
1866
ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 
1867
                  bool recurse, bool recursing)
 
1868
{
 
1869
        AlteredTableInfo *tab;
 
1870
        int                     pass;
 
1871
 
 
1872
        /* Find or create work queue entry for this table */
 
1873
        tab = ATGetQueueEntry(wqueue, rel);
 
1874
 
 
1875
        /*
 
1876
         * Copy the original subcommand for each table.  This avoids conflicts
 
1877
         * when different child tables need to make different parse
 
1878
         * transformations (for example, the same column may have different
 
1879
         * column numbers in different children).
 
1880
         */
 
1881
        cmd = copyObject(cmd);
 
1882
 
 
1883
        /*
 
1884
         * Do permissions checking, recursion to child tables if needed, and
 
1885
         * any additional phase-1 processing needed.
 
1886
         */
 
1887
        switch (cmd->subtype)
 
1888
        {
 
1889
                case AT_AddColumn:              /* ADD COLUMN */
 
1890
                        ATSimplePermissions(rel, false);
 
1891
                        /* Performs own recursion */
 
1892
                        ATPrepAddColumn(wqueue, rel, recurse, cmd);
 
1893
                        pass = AT_PASS_ADD_COL;
 
1894
                        break;
 
1895
                case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
 
1896
 
 
1897
                        /*
 
1898
                         * We allow defaults on views so that INSERT into a view can
 
1899
                         * have default-ish behavior.  This works because the rewriter
 
1900
                         * substitutes default values into INSERTs before it expands
 
1901
                         * rules.
 
1902
                         */
 
1903
                        ATSimplePermissions(rel, true);
 
1904
                        ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
1905
                        /* No command-specific prep needed */
 
1906
                        pass = AT_PASS_ADD_CONSTR;
 
1907
                        break;
 
1908
                case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
 
1909
                        ATSimplePermissions(rel, false);
 
1910
                        ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
1911
                        /* No command-specific prep needed */
 
1912
                        pass = AT_PASS_DROP;
 
1913
                        break;
 
1914
                case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
 
1915
                        ATSimplePermissions(rel, false);
 
1916
                        ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
1917
                        /* No command-specific prep needed */
 
1918
                        pass = AT_PASS_ADD_CONSTR;
 
1919
                        break;
 
1920
                case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
 
1921
                        ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
1922
                        /* Performs own permission checks */
 
1923
                        ATPrepSetStatistics(rel, cmd->name, cmd->def);
 
1924
                        pass = AT_PASS_COL_ATTRS;
 
1925
                        break;
 
1926
                case AT_SetStorage:             /* ALTER COLUMN STORAGE */
 
1927
                        ATSimplePermissions(rel, false);
 
1928
                        ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
1929
                        /* No command-specific prep needed */
 
1930
                        pass = AT_PASS_COL_ATTRS;
 
1931
                        break;
 
1932
                case AT_DropColumn:             /* DROP COLUMN */
 
1933
                        ATSimplePermissions(rel, false);
 
1934
                        /* Recursion occurs during execution phase */
 
1935
                        /* No command-specific prep needed except saving recurse flag */
 
1936
                        if (recurse)
 
1937
                                cmd->subtype = AT_DropColumnRecurse;
 
1938
                        pass = AT_PASS_DROP;
 
1939
                        break;
 
1940
                case AT_AddIndex:               /* ADD INDEX */
 
1941
                        ATSimplePermissions(rel, false);
 
1942
                        /* This command never recurses */
 
1943
                        /* No command-specific prep needed */
 
1944
                        pass = AT_PASS_ADD_INDEX;
 
1945
                        break;
 
1946
                case AT_AddConstraint:  /* ADD CONSTRAINT */
 
1947
                        ATSimplePermissions(rel, false);
 
1948
 
 
1949
                        /*
 
1950
                         * Currently we recurse only for CHECK constraints, never for
 
1951
                         * foreign-key constraints.  UNIQUE/PKEY constraints won't be
 
1952
                         * seen here.
 
1953
                         */
 
1954
                        if (IsA(cmd->def, Constraint))
 
1955
                                ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
1956
                        /* No command-specific prep needed */
 
1957
                        pass = AT_PASS_ADD_CONSTR;
 
1958
                        break;
 
1959
                case AT_DropConstraint: /* DROP CONSTRAINT */
 
1960
                        ATSimplePermissions(rel, false);
 
1961
                        /* Performs own recursion */
 
1962
                        ATPrepDropConstraint(wqueue, rel, recurse, cmd);
 
1963
                        pass = AT_PASS_DROP;
 
1964
                        break;
 
1965
                case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
 
1966
                        ATSimplePermissions(rel, false);
 
1967
                        ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
1968
                        /* No command-specific prep needed */
 
1969
                        pass = AT_PASS_DROP;
 
1970
                        break;
 
1971
                case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
 
1972
                        ATSimplePermissions(rel, false);
 
1973
                        /* Performs own recursion */
 
1974
                        ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
 
1975
                        pass = AT_PASS_ALTER_TYPE;
 
1976
                        break;
 
1977
                case AT_ToastTable:             /* CREATE TOAST TABLE */
 
1978
                        ATSimplePermissions(rel, false);
 
1979
                        /* This command never recurses */
 
1980
                        /* No command-specific prep needed */
 
1981
                        pass = AT_PASS_MISC;
 
1982
                        break;
 
1983
                case AT_ChangeOwner:    /* ALTER OWNER */
 
1984
                        /* This command never recurses */
 
1985
                        /* No command-specific prep needed */
 
1986
                        pass = AT_PASS_MISC;
 
1987
                        break;
 
1988
                case AT_ClusterOn:              /* CLUSTER ON */
 
1989
                case AT_DropCluster:    /* SET WITHOUT CLUSTER */
 
1990
                        ATSimplePermissions(rel, false);
 
1991
                        /* These commands never recurse */
 
1992
                        /* No command-specific prep needed */
 
1993
                        pass = AT_PASS_MISC;
 
1994
                        break;
 
1995
                case AT_DropOids:               /* SET WITHOUT OIDS */
 
1996
                        ATSimplePermissions(rel, false);
 
1997
                        /* Performs own recursion */
 
1998
                        if (rel->rd_rel->relhasoids)
 
1999
                        {
 
2000
                                AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
 
2001
 
 
2002
                                dropCmd->subtype = AT_DropColumn;
 
2003
                                dropCmd->name = pstrdup("oid");
 
2004
                                dropCmd->behavior = cmd->behavior;
 
2005
                                ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
 
2006
                        }
 
2007
                        pass = AT_PASS_DROP;
 
2008
                        break;
 
2009
                case AT_SetTableSpace:  /* SET TABLESPACE */
 
2010
                        /* This command never recurses */
 
2011
                        ATPrepSetTableSpace(tab, rel, cmd->name);
 
2012
                        pass = AT_PASS_MISC;    /* doesn't actually matter */
 
2013
                        break;
 
2014
                default:                                /* oops */
 
2015
                        elog(ERROR, "unrecognized alter table type: %d",
 
2016
                                 (int) cmd->subtype);
 
2017
                        pass = 0;                       /* keep compiler quiet */
 
2018
                        break;
 
2019
        }
 
2020
 
 
2021
        /* Add the subcommand to the appropriate list for phase 2 */
 
2022
        tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
 
2023
}
 
2024
 
 
2025
/*
 
2026
 * ATRewriteCatalogs
 
2027
 *
 
2028
 * Traffic cop for ALTER TABLE Phase 2 operations.      Subcommands are
 
2029
 * dispatched in a "safe" execution order (designed to avoid unnecessary
 
2030
 * conflicts).
 
2031
 */
 
2032
static void
 
2033
ATRewriteCatalogs(List **wqueue)
 
2034
{
 
2035
        int                     pass;
 
2036
        ListCell   *ltab;
 
2037
 
 
2038
        /*
 
2039
         * We process all the tables "in parallel", one pass at a time.  This
 
2040
         * is needed because we may have to propagate work from one table to
 
2041
         * another (specifically, ALTER TYPE on a foreign key's PK has to
 
2042
         * dispatch the re-adding of the foreign key constraint to the other
 
2043
         * table).      Work can only be propagated into later passes, however.
 
2044
         */
 
2045
        for (pass = 0; pass < AT_NUM_PASSES; pass++)
 
2046
        {
 
2047
                /* Go through each table that needs to be processed */
 
2048
                foreach(ltab, *wqueue)
 
2049
                {
 
2050
                        AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
 
2051
                        List       *subcmds = tab->subcmds[pass];
 
2052
                        Relation        rel;
 
2053
                        ListCell   *lcmd;
 
2054
 
 
2055
                        if (subcmds == NIL)
 
2056
                                continue;
 
2057
 
 
2058
                        /*
 
2059
                         * Exclusive lock was obtained by phase 1, needn't get it
 
2060
                         * again
 
2061
                         */
 
2062
                        rel = relation_open(tab->relid, NoLock);
 
2063
 
 
2064
                        foreach(lcmd, subcmds)
 
2065
                                ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
 
2066
 
 
2067
                        /*
 
2068
                         * After the ALTER TYPE pass, do cleanup work (this is not
 
2069
                         * done in ATExecAlterColumnType since it should be done only
 
2070
                         * once if multiple columns of a table are altered).
 
2071
                         */
 
2072
                        if (pass == AT_PASS_ALTER_TYPE)
 
2073
                                ATPostAlterTypeCleanup(wqueue, tab);
 
2074
 
 
2075
                        relation_close(rel, NoLock);
 
2076
                }
 
2077
        }
 
2078
 
 
2079
        /*
 
2080
         * Do an implicit CREATE TOAST TABLE if we executed any subcommands
 
2081
         * that might have added a column or changed column storage.
 
2082
         */
 
2083
        foreach(ltab, *wqueue)
 
2084
        {
 
2085
                AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
 
2086
 
 
2087
                if (tab->relkind == RELKIND_RELATION &&
 
2088
                        (tab->subcmds[AT_PASS_ADD_COL] ||
 
2089
                         tab->subcmds[AT_PASS_ALTER_TYPE] ||
 
2090
                         tab->subcmds[AT_PASS_COL_ATTRS]))
 
2091
                        AlterTableCreateToastTable(tab->relid, true);
 
2092
        }
 
2093
}
 
2094
 
 
2095
/*
 
2096
 * ATExecCmd: dispatch a subcommand to appropriate execution routine
 
2097
 */
 
2098
static void
 
2099
ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
 
2100
{
 
2101
        switch (cmd->subtype)
 
2102
        {
 
2103
                case AT_AddColumn:              /* ADD COLUMN */
 
2104
                        ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
 
2105
                        break;
 
2106
                case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
 
2107
                        ATExecColumnDefault(rel, cmd->name, cmd->def);
 
2108
                        break;
 
2109
                case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
 
2110
                        ATExecDropNotNull(rel, cmd->name);
 
2111
                        break;
 
2112
                case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
 
2113
                        ATExecSetNotNull(tab, rel, cmd->name);
 
2114
                        break;
 
2115
                case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
 
2116
                        ATExecSetStatistics(rel, cmd->name, cmd->def);
 
2117
                        break;
 
2118
                case AT_SetStorage:             /* ALTER COLUMN STORAGE */
 
2119
                        ATExecSetStorage(rel, cmd->name, cmd->def);
 
2120
                        break;
 
2121
                case AT_DropColumn:             /* DROP COLUMN */
 
2122
                        ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
 
2123
                        break;
 
2124
                case AT_DropColumnRecurse:              /* DROP COLUMN with recursion */
 
2125
                        ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
 
2126
                        break;
 
2127
                case AT_AddIndex:               /* ADD INDEX */
 
2128
                        ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
 
2129
                        break;
 
2130
                case AT_ReAddIndex:             /* ADD INDEX */
 
2131
                        ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
 
2132
                        break;
 
2133
                case AT_AddConstraint:  /* ADD CONSTRAINT */
 
2134
                        ATExecAddConstraint(tab, rel, cmd->def);
 
2135
                        break;
 
2136
                case AT_DropConstraint: /* DROP CONSTRAINT */
 
2137
                        ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
 
2138
                        break;
 
2139
                case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
 
2140
                        ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
 
2141
                        break;
 
2142
                case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
 
2143
                        ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
 
2144
                        break;
 
2145
                case AT_ToastTable:             /* CREATE TOAST TABLE */
 
2146
                        AlterTableCreateToastTable(RelationGetRelid(rel), false);
 
2147
                        break;
 
2148
                case AT_ChangeOwner:    /* ALTER OWNER */
 
2149
                        /* get_usesysid raises an error if no such user */
 
2150
                        ATExecChangeOwner(RelationGetRelid(rel), get_usesysid(cmd->name));
 
2151
                        break;
 
2152
                case AT_ClusterOn:              /* CLUSTER ON */
 
2153
                        ATExecClusterOn(rel, cmd->name);
 
2154
                        break;
 
2155
                case AT_DropCluster:    /* SET WITHOUT CLUSTER */
 
2156
                        ATExecDropCluster(rel);
 
2157
                        break;
 
2158
                case AT_DropOids:               /* SET WITHOUT OIDS */
 
2159
 
 
2160
                        /*
 
2161
                         * Nothing to do here; we'll have generated a DropColumn
 
2162
                         * subcommand to do the real work
 
2163
                         */
 
2164
                        break;
 
2165
                case AT_SetTableSpace:  /* SET TABLESPACE */
 
2166
 
 
2167
                        /*
 
2168
                         * Nothing to do here; Phase 3 does the work
 
2169
                         */
 
2170
                        break;
 
2171
                default:                                /* oops */
 
2172
                        elog(ERROR, "unrecognized alter table type: %d",
 
2173
                                 (int) cmd->subtype);
 
2174
                        break;
 
2175
        }
 
2176
 
 
2177
        /*
 
2178
         * Bump the command counter to ensure the next subcommand in the
 
2179
         * sequence can see the changes so far
 
2180
         */
 
2181
        CommandCounterIncrement();
 
2182
}
 
2183
 
 
2184
/*
 
2185
 * ATRewriteTables: ALTER TABLE phase 3
 
2186
 */
 
2187
static void
 
2188
ATRewriteTables(List **wqueue)
 
2189
{
 
2190
        ListCell   *ltab;
 
2191
 
 
2192
        /* Go through each table that needs to be checked or rewritten */
 
2193
        foreach(ltab, *wqueue)
 
2194
        {
 
2195
                AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
 
2196
 
 
2197
                /*
 
2198
                 * We only need to rewrite the table if at least one column needs
 
2199
                 * to be recomputed.
 
2200
                 */
 
2201
                if (tab->newvals != NIL)
 
2202
                {
 
2203
                        /* Build a temporary relation and copy data */
 
2204
                        Oid                     OIDNewHeap;
 
2205
                        char            NewHeapName[NAMEDATALEN];
 
2206
                        Oid                     NewTableSpace;
 
2207
                        Relation        OldHeap;
 
2208
                        ObjectAddress object;
 
2209
 
 
2210
                        OldHeap = heap_open(tab->relid, NoLock);
 
2211
 
 
2212
                        /*
 
2213
                         * We can never allow rewriting of shared or nailed-in-cache
 
2214
                         * relations, because we can't support changing their
 
2215
                         * relfilenode values.
 
2216
                         */
 
2217
                        if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
 
2218
                                ereport(ERROR,
 
2219
                                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
2220
                                                 errmsg("cannot rewrite system relation \"%s\"",
 
2221
                                                                RelationGetRelationName(OldHeap))));
 
2222
 
 
2223
                        /*
 
2224
                         * Don't allow rewrite on temp tables of other backends ...
 
2225
                         * their local buffer manager is not going to cope.
 
2226
                         */
 
2227
                        if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
 
2228
                                ereport(ERROR,
 
2229
                                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
2230
                                                 errmsg("cannot rewrite temporary tables of other sessions")));
 
2231
 
 
2232
                        /*
 
2233
                         * Select destination tablespace (same as original unless user
 
2234
                         * requested a change)
 
2235
                         */
 
2236
                        if (tab->newTableSpace)
 
2237
                                NewTableSpace = tab->newTableSpace;
 
2238
                        else
 
2239
                                NewTableSpace = OldHeap->rd_rel->reltablespace;
 
2240
 
 
2241
                        heap_close(OldHeap, NoLock);
 
2242
 
 
2243
                        /*
 
2244
                         * Create the new heap, using a temporary name in the same
 
2245
                         * namespace as the existing table.  NOTE: there is some risk
 
2246
                         * of collision with user relnames.  Working around this seems
 
2247
                         * more trouble than it's worth; in particular, we can't
 
2248
                         * create the new heap in a different namespace from the old,
 
2249
                         * or we will have problems with the TEMP status of temp
 
2250
                         * tables.
 
2251
                         */
 
2252
                        snprintf(NewHeapName, sizeof(NewHeapName),
 
2253
                                         "pg_temp_%u", tab->relid);
 
2254
 
 
2255
                        OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
 
2256
 
 
2257
                        /*
 
2258
                         * Copy the heap data into the new table with the desired
 
2259
                         * modifications, and test the current data within the table
 
2260
                         * against new constraints generated by ALTER TABLE commands.
 
2261
                         */
 
2262
                        ATRewriteTable(tab, OIDNewHeap);
 
2263
 
 
2264
                        /* Swap the physical files of the old and new heaps. */
 
2265
                        swap_relation_files(tab->relid, OIDNewHeap);
 
2266
 
 
2267
                        CommandCounterIncrement();
 
2268
 
 
2269
                        /* Destroy new heap with old filenode */
 
2270
                        object.classId = RelOid_pg_class;
 
2271
                        object.objectId = OIDNewHeap;
 
2272
                        object.objectSubId = 0;
 
2273
 
 
2274
                        /*
 
2275
                         * The new relation is local to our transaction and we know
 
2276
                         * nothing depends on it, so DROP_RESTRICT should be OK.
 
2277
                         */
 
2278
                        performDeletion(&object, DROP_RESTRICT);
 
2279
                        /* performDeletion does CommandCounterIncrement at end */
 
2280
 
 
2281
                        /*
 
2282
                         * Rebuild each index on the relation (but not the toast
 
2283
                         * table, which is all-new anyway).  We do not need
 
2284
                         * CommandCounterIncrement() because reindex_relation does it.
 
2285
                         */
 
2286
                        reindex_relation(tab->relid, false);
 
2287
                }
 
2288
                else
 
2289
                {
 
2290
                        /*
 
2291
                         * Test the current data within the table against new
 
2292
                         * constraints generated by ALTER TABLE commands, but don't
 
2293
                         * rebuild data.
 
2294
                         */
 
2295
                        if (tab->constraints != NIL)
 
2296
                                ATRewriteTable(tab, InvalidOid);
 
2297
 
 
2298
                        /*
 
2299
                         * If we had SET TABLESPACE but no reason to reconstruct
 
2300
                         * tuples, just do a block-by-block copy.
 
2301
                         */
 
2302
                        if (tab->newTableSpace)
 
2303
                                ATExecSetTableSpace(tab->relid, tab->newTableSpace);
 
2304
                }
 
2305
        }
 
2306
 
 
2307
        /*
 
2308
         * Foreign key constraints are checked in a final pass, since (a) it's
 
2309
         * generally best to examine each one separately, and (b) it's at
 
2310
         * least theoretically possible that we have changed both relations of
 
2311
         * the foreign key, and we'd better have finished both rewrites before
 
2312
         * we try to read the tables.
 
2313
         */
 
2314
        foreach(ltab, *wqueue)
 
2315
        {
 
2316
                AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
 
2317
                Relation        rel = NULL;
 
2318
                ListCell   *lcon;
 
2319
 
 
2320
                foreach(lcon, tab->constraints)
 
2321
                {
 
2322
                        NewConstraint *con = lfirst(lcon);
 
2323
 
 
2324
                        if (con->contype == CONSTR_FOREIGN)
 
2325
                        {
 
2326
                                FkConstraint *fkconstraint = (FkConstraint *) con->qual;
 
2327
                                Relation        refrel;
 
2328
 
 
2329
                                if (rel == NULL)
 
2330
                                {
 
2331
                                        /* Long since locked, no need for another */
 
2332
                                        rel = heap_open(tab->relid, NoLock);
 
2333
                                }
 
2334
 
 
2335
                                refrel = heap_open(con->refrelid, RowShareLock);
 
2336
 
 
2337
                                validateForeignKeyConstraint(fkconstraint, rel, refrel);
 
2338
 
 
2339
                                heap_close(refrel, NoLock);
 
2340
                        }
 
2341
                }
 
2342
 
 
2343
                if (rel)
 
2344
                        heap_close(rel, NoLock);
 
2345
        }
 
2346
}
 
2347
 
 
2348
/*
 
2349
 * ATRewriteTable: scan or rewrite one table
 
2350
 *
 
2351
 * OIDNewHeap is InvalidOid if we don't need to rewrite
 
2352
 */
 
2353
static void
 
2354
ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
 
2355
{
 
2356
        Relation        oldrel;
 
2357
        Relation        newrel;
 
2358
        TupleDesc       oldTupDesc;
 
2359
        TupleDesc       newTupDesc;
 
2360
        bool            needscan = false;
 
2361
        int                     i;
 
2362
        ListCell   *l;
 
2363
        EState     *estate;
 
2364
 
 
2365
        /*
 
2366
         * Open the relation(s).  We have surely already locked the existing
 
2367
         * table.
 
2368
         */
 
2369
        oldrel = heap_open(tab->relid, NoLock);
 
2370
        oldTupDesc = tab->oldDesc;
 
2371
        newTupDesc = RelationGetDescr(oldrel);          /* includes all mods */
 
2372
 
 
2373
        if (OidIsValid(OIDNewHeap))
 
2374
                newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
 
2375
        else
 
2376
                newrel = NULL;
 
2377
 
 
2378
        /*
 
2379
         * If we need to rewrite the table, the operation has to be propagated
 
2380
         * to tables that use this table's rowtype as a column type.
 
2381
         *
 
2382
         * (Eventually this will probably become true for scans as well, but at
 
2383
         * the moment a composite type does not enforce any constraints, so
 
2384
         * it's not necessary/appropriate to enforce them just during ALTER.)
 
2385
         */
 
2386
        if (newrel)
 
2387
                find_composite_type_dependencies(oldrel->rd_rel->reltype,
 
2388
                                                                                 RelationGetRelationName(oldrel));
 
2389
 
 
2390
        /*
 
2391
         * Generate the constraint and default execution states
 
2392
         */
 
2393
 
 
2394
        estate = CreateExecutorState();
 
2395
 
 
2396
        /* Build the needed expression execution states */
 
2397
        foreach(l, tab->constraints)
 
2398
        {
 
2399
                NewConstraint *con = lfirst(l);
 
2400
 
 
2401
                switch (con->contype)
 
2402
                {
 
2403
                        case CONSTR_CHECK:
 
2404
                                needscan = true;
 
2405
                                con->qualstate = (List *)
 
2406
                                        ExecPrepareExpr((Expr *) con->qual, estate);
 
2407
                                break;
 
2408
                        case CONSTR_FOREIGN:
 
2409
                                /* Nothing to do here */
 
2410
                                break;
 
2411
                        case CONSTR_NOTNULL:
 
2412
                                needscan = true;
 
2413
                                break;
 
2414
                        default:
 
2415
                                elog(ERROR, "unrecognized constraint type: %d",
 
2416
                                         (int) con->contype);
 
2417
                }
 
2418
        }
 
2419
 
 
2420
        foreach(l, tab->newvals)
 
2421
        {
 
2422
                NewColumnValue *ex = lfirst(l);
 
2423
 
 
2424
                needscan = true;
 
2425
 
 
2426
                ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
 
2427
        }
 
2428
 
 
2429
        if (needscan)
 
2430
        {
 
2431
                ExprContext *econtext;
 
2432
                Datum      *values;
 
2433
                char       *nulls;
 
2434
                TupleTableSlot *oldslot;
 
2435
                TupleTableSlot *newslot;
 
2436
                HeapScanDesc scan;
 
2437
                HeapTuple       tuple;
 
2438
                MemoryContext oldCxt;
 
2439
                List *dropped_attrs = NIL;
 
2440
                ListCell *lc;
 
2441
 
 
2442
                econtext = GetPerTupleExprContext(estate);
 
2443
 
 
2444
                /*
 
2445
                 * Make tuple slots for old and new tuples.  Note that even when
 
2446
                 * the tuples are the same, the tupDescs might not be (consider
 
2447
                 * ADD COLUMN without a default).
 
2448
                 */
 
2449
                oldslot = MakeTupleTableSlot();
 
2450
                ExecSetSlotDescriptor(oldslot, oldTupDesc, false);
 
2451
                newslot = MakeTupleTableSlot();
 
2452
                ExecSetSlotDescriptor(newslot, newTupDesc, false);
 
2453
 
 
2454
                /* Preallocate values/nulls arrays */
 
2455
                i = Max(newTupDesc->natts, oldTupDesc->natts);
 
2456
                values = (Datum *) palloc(i * sizeof(Datum));
 
2457
                nulls = (char *) palloc(i * sizeof(char));
 
2458
                memset(values, 0, i * sizeof(Datum));
 
2459
                memset(nulls, 'n', i * sizeof(char));
 
2460
 
 
2461
                /*
 
2462
                 * Any attributes that are dropped according to the new tuple
 
2463
                 * descriptor can be set to NULL. We precompute the list of
 
2464
                 * dropped attributes to avoid needing to do so in the
 
2465
                 * per-tuple loop.
 
2466
                 */
 
2467
                for (i = 0; i < newTupDesc->natts; i++)
 
2468
                {
 
2469
                        if (newTupDesc->attrs[i]->attisdropped)
 
2470
                                dropped_attrs = lappend_int(dropped_attrs, i);
 
2471
                }
 
2472
 
 
2473
                /*
 
2474
                 * Scan through the rows, generating a new row if needed and then
 
2475
                 * checking all the constraints.
 
2476
                 */
 
2477
                scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
 
2478
 
 
2479
                /*
 
2480
                 * Switch to per-tuple memory context and reset it for each
 
2481
                 * tuple produced, so we don't leak memory.
 
2482
                 */
 
2483
                oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
2484
 
 
2485
                while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 
2486
                {
 
2487
                        if (newrel)
 
2488
                        {
 
2489
                                /* Extract data from old tuple */
 
2490
                                heap_deformtuple(tuple, oldTupDesc, values, nulls);
 
2491
 
 
2492
                                /* Set dropped attributes to null in new tuple */
 
2493
                                foreach (lc, dropped_attrs)
 
2494
                                        nulls[lfirst_int(lc)] = 'n';
 
2495
 
 
2496
                                /*
 
2497
                                 * Process supplied expressions to replace selected
 
2498
                                 * columns. Expression inputs come from the old tuple.
 
2499
                                 */
 
2500
                                ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
 
2501
                                econtext->ecxt_scantuple = oldslot;
 
2502
 
 
2503
                                foreach(l, tab->newvals)
 
2504
                                {
 
2505
                                        NewColumnValue *ex = lfirst(l);
 
2506
                                        bool            isNull;
 
2507
 
 
2508
                                        values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
 
2509
                                                                                                                  econtext,
 
2510
                                                                                                                  &isNull,
 
2511
                                                                                                                  NULL);
 
2512
                                        if (isNull)
 
2513
                                                nulls[ex->attnum - 1] = 'n';
 
2514
                                        else
 
2515
                                                nulls[ex->attnum - 1] = ' ';
 
2516
                                }
 
2517
 
 
2518
                                /*
 
2519
                                 * Form the new tuple. Note that we don't explicitly
 
2520
                                 * pfree it, since the per-tuple memory context will
 
2521
                                 * be reset shortly.
 
2522
                                 */
 
2523
                                tuple = heap_formtuple(newTupDesc, values, nulls);
 
2524
                        }
 
2525
 
 
2526
                        /* Now check any constraints on the possibly-changed tuple */
 
2527
                        ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
 
2528
                        econtext->ecxt_scantuple = newslot;
 
2529
 
 
2530
                        foreach(l, tab->constraints)
 
2531
                        {
 
2532
                                NewConstraint *con = lfirst(l);
 
2533
 
 
2534
                                switch (con->contype)
 
2535
                                {
 
2536
                                        case CONSTR_CHECK:
 
2537
                                                if (!ExecQual(con->qualstate, econtext, true))
 
2538
                                                        ereport(ERROR,
 
2539
                                                                        (errcode(ERRCODE_CHECK_VIOLATION),
 
2540
                                                                         errmsg("check constraint \"%s\" is violated by some row",
 
2541
                                                                                        con->name)));
 
2542
                                                break;
 
2543
                                        case CONSTR_NOTNULL:
 
2544
                                                {
 
2545
                                                        Datum           d;
 
2546
                                                        bool            isnull;
 
2547
 
 
2548
                                                        d = heap_getattr(tuple, con->attnum, newTupDesc,
 
2549
                                                                                         &isnull);
 
2550
                                                        if (isnull)
 
2551
                                                                ereport(ERROR,
 
2552
                                                                        (errcode(ERRCODE_NOT_NULL_VIOLATION),
 
2553
                                                                         errmsg("column \"%s\" contains null values",
 
2554
                                                                                        get_attname(tab->relid,
 
2555
                                                                                                                con->attnum))));
 
2556
                                                }
 
2557
                                                break;
 
2558
                                        case CONSTR_FOREIGN:
 
2559
                                                /* Nothing to do here */
 
2560
                                                break;
 
2561
                                        default:
 
2562
                                                elog(ERROR, "unrecognized constraint type: %d",
 
2563
                                                         (int) con->contype);
 
2564
                                }
 
2565
                        }
 
2566
 
 
2567
                        /* Write the tuple out to the new relation */
 
2568
                        if (newrel)
 
2569
                                simple_heap_insert(newrel, tuple);
 
2570
 
 
2571
                        ResetExprContext(econtext);
 
2572
 
 
2573
                        CHECK_FOR_INTERRUPTS();
 
2574
                }
 
2575
 
 
2576
                MemoryContextSwitchTo(oldCxt);
 
2577
                heap_endscan(scan);
 
2578
        }
 
2579
 
 
2580
        FreeExecutorState(estate);
 
2581
 
 
2582
        heap_close(oldrel, NoLock);
 
2583
        if (newrel)
 
2584
                heap_close(newrel, NoLock);
 
2585
}
 
2586
 
 
2587
/*
 
2588
 * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
 
2589
 */
 
2590
static AlteredTableInfo *
 
2591
ATGetQueueEntry(List **wqueue, Relation rel)
 
2592
{
 
2593
        Oid                     relid = RelationGetRelid(rel);
 
2594
        AlteredTableInfo *tab;
 
2595
        ListCell   *ltab;
 
2596
 
 
2597
        foreach(ltab, *wqueue)
 
2598
        {
 
2599
                tab = (AlteredTableInfo *) lfirst(ltab);
 
2600
                if (tab->relid == relid)
 
2601
                        return tab;
 
2602
        }
 
2603
 
 
2604
        /*
 
2605
         * Not there, so add it.  Note that we make a copy of the relation's
 
2606
         * existing descriptor before anything interesting can happen to it.
 
2607
         */
 
2608
        tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
 
2609
        tab->relid = relid;
 
2610
        tab->relkind = rel->rd_rel->relkind;
 
2611
        tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
 
2612
 
 
2613
        *wqueue = lappend(*wqueue, tab);
 
2614
 
 
2615
        return tab;
 
2616
}
 
2617
 
 
2618
/*
 
2619
 * ATSimplePermissions
 
2620
 *
 
2621
 * - Ensure that it is a relation (or possibly a view)
 
2622
 * - Ensure this user is the owner
 
2623
 * - Ensure that it is not a system table
 
2624
 */
 
2625
static void
 
2626
ATSimplePermissions(Relation rel, bool allowView)
 
2627
{
 
2628
        if (rel->rd_rel->relkind != RELKIND_RELATION)
 
2629
        {
 
2630
                if (allowView)
 
2631
                {
 
2632
                        if (rel->rd_rel->relkind != RELKIND_VIEW)
 
2633
                                ereport(ERROR,
 
2634
                                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
2635
                                                 errmsg("\"%s\" is not a table or view",
 
2636
                                                                RelationGetRelationName(rel))));
 
2637
                }
 
2638
                else
 
2639
                        ereport(ERROR,
 
2640
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
2641
                                         errmsg("\"%s\" is not a table",
 
2642
                                                        RelationGetRelationName(rel))));
 
2643
        }
 
2644
 
 
2645
        /* Permissions checks */
 
2646
        if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
 
2647
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
 
2648
                                           RelationGetRelationName(rel));
 
2649
 
 
2650
        if (!allowSystemTableMods && IsSystemRelation(rel))
 
2651
                ereport(ERROR,
 
2652
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
2653
                                 errmsg("permission denied: \"%s\" is a system catalog",
 
2654
                                                RelationGetRelationName(rel))));
 
2655
}
 
2656
 
 
2657
/*
 
2658
 * ATSimpleRecursion
 
2659
 *
 
2660
 * Simple table recursion sufficient for most ALTER TABLE operations.
 
2661
 * All direct and indirect children are processed in an unspecified order.
 
2662
 * Note that if a child inherits from the original table via multiple
 
2663
 * inheritance paths, it will be visited just once.
 
2664
 */
 
2665
static void
 
2666
ATSimpleRecursion(List **wqueue, Relation rel,
 
2667
                                  AlterTableCmd *cmd, bool recurse)
 
2668
{
 
2669
        /*
 
2670
         * Propagate to children if desired.  Non-table relations never have
 
2671
         * children, so no need to search in that case.
 
2672
         */
 
2673
        if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
 
2674
        {
 
2675
                Oid                     relid = RelationGetRelid(rel);
 
2676
                ListCell   *child;
 
2677
                List       *children;
 
2678
 
 
2679
                /* this routine is actually in the planner */
 
2680
                children = find_all_inheritors(relid);
 
2681
 
 
2682
                /*
 
2683
                 * find_all_inheritors does the recursive search of the
 
2684
                 * inheritance hierarchy, so all we have to do is process all of
 
2685
                 * the relids in the list that it returns.
 
2686
                 */
 
2687
                foreach(child, children)
 
2688
                {
 
2689
                        Oid                     childrelid = lfirst_oid(child);
 
2690
                        Relation        childrel;
 
2691
 
 
2692
                        if (childrelid == relid)
 
2693
                                continue;
 
2694
                        childrel = relation_open(childrelid, AccessExclusiveLock);
 
2695
                        ATPrepCmd(wqueue, childrel, cmd, false, true);
 
2696
                        relation_close(childrel, NoLock);
 
2697
                }
 
2698
        }
 
2699
}
 
2700
 
 
2701
/*
 
2702
 * ATOneLevelRecursion
 
2703
 *
 
2704
 * Here, we visit only direct inheritance children.  It is expected that
 
2705
 * the command's prep routine will recurse again to find indirect children.
 
2706
 * When using this technique, a multiply-inheriting child will be visited
 
2707
 * multiple times.
 
2708
 */
 
2709
static void
 
2710
ATOneLevelRecursion(List **wqueue, Relation rel,
 
2711
                                        AlterTableCmd *cmd)
 
2712
{
 
2713
        Oid                     relid = RelationGetRelid(rel);
 
2714
        ListCell   *child;
 
2715
        List       *children;
 
2716
 
 
2717
        /* this routine is actually in the planner */
 
2718
        children = find_inheritance_children(relid);
 
2719
 
 
2720
        foreach(child, children)
 
2721
        {
 
2722
                Oid                     childrelid = lfirst_oid(child);
 
2723
                Relation        childrel;
 
2724
 
 
2725
                childrel = relation_open(childrelid, AccessExclusiveLock);
 
2726
                ATPrepCmd(wqueue, childrel, cmd, true, true);
 
2727
                relation_close(childrel, NoLock);
 
2728
        }
 
2729
}
 
2730
 
 
2731
 
 
2732
/*
 
2733
 * find_composite_type_dependencies
 
2734
 *
 
2735
 * Check to see if a table's rowtype is being used as a column in some
 
2736
 * other table (possibly nested several levels deep in composite types!).
 
2737
 * Eventually, we'd like to propagate the check or rewrite operation
 
2738
 * into other such tables, but for now, just error out if we find any.
 
2739
 *
 
2740
 * We assume that functions and views depending on the type are not reasons
 
2741
 * to reject the ALTER.  (How safe is this really?)
 
2742
 */
 
2743
static void
 
2744
find_composite_type_dependencies(Oid typeOid, const char *origTblName)
 
2745
{
 
2746
        Relation        depRel;
 
2747
        ScanKeyData key[2];
 
2748
        SysScanDesc depScan;
 
2749
        HeapTuple       depTup;
 
2750
 
 
2751
        /*
 
2752
         * We scan pg_depend to find those things that depend on the rowtype.
 
2753
         * (We assume we can ignore refobjsubid for a rowtype.)
 
2754
         */
 
2755
        depRel = relation_openr(DependRelationName, AccessShareLock);
 
2756
 
 
2757
        ScanKeyInit(&key[0],
 
2758
                                Anum_pg_depend_refclassid,
 
2759
                                BTEqualStrategyNumber, F_OIDEQ,
 
2760
                                ObjectIdGetDatum(RelOid_pg_type));
 
2761
        ScanKeyInit(&key[1],
 
2762
                                Anum_pg_depend_refobjid,
 
2763
                                BTEqualStrategyNumber, F_OIDEQ,
 
2764
                                ObjectIdGetDatum(typeOid));
 
2765
 
 
2766
        depScan = systable_beginscan(depRel, DependReferenceIndex, true,
 
2767
                                                                 SnapshotNow, 2, key);
 
2768
 
 
2769
        while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
 
2770
        {
 
2771
                Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
 
2772
                Relation        rel;
 
2773
                Form_pg_attribute att;
 
2774
 
 
2775
                /* Ignore dependees that aren't user columns of relations */
 
2776
                /* (we assume system columns are never of rowtypes) */
 
2777
                if (pg_depend->classid != RelOid_pg_class ||
 
2778
                        pg_depend->objsubid <= 0)
 
2779
                        continue;
 
2780
 
 
2781
                rel = relation_open(pg_depend->objid, AccessShareLock);
 
2782
                att = rel->rd_att->attrs[pg_depend->objsubid - 1];
 
2783
 
 
2784
                if (rel->rd_rel->relkind == RELKIND_RELATION)
 
2785
                {
 
2786
                        ereport(ERROR,
 
2787
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
2788
                                         errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
 
2789
                                                        origTblName,
 
2790
                                                        RelationGetRelationName(rel),
 
2791
                                                        NameStr(att->attname))));
 
2792
                }
 
2793
                else if (OidIsValid(rel->rd_rel->reltype))
 
2794
                {
 
2795
                        /*
 
2796
                         * A view or composite type itself isn't a problem, but we
 
2797
                         * must recursively check for indirect dependencies via its
 
2798
                         * rowtype.
 
2799
                         */
 
2800
                        find_composite_type_dependencies(rel->rd_rel->reltype,
 
2801
                                                                                         origTblName);
 
2802
                }
 
2803
 
 
2804
                relation_close(rel, AccessShareLock);
 
2805
        }
 
2806
 
 
2807
        systable_endscan(depScan);
 
2808
 
 
2809
        relation_close(depRel, AccessShareLock);
 
2810
}
 
2811
 
 
2812
 
 
2813
/*
 
2814
 * ALTER TABLE ADD COLUMN
 
2815
 *
 
2816
 * Adds an additional attribute to a relation making the assumption that
 
2817
 * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
 
2818
 * AT_AddColumn AlterTableCmd by analyze.c and added as independent
 
2819
 * AlterTableCmd's.
 
2820
 */
 
2821
static void
 
2822
ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
 
2823
                                AlterTableCmd *cmd)
 
2824
{
 
2825
        /*
 
2826
         * Recurse to add the column to child classes, if requested.
 
2827
         *
 
2828
         * We must recurse one level at a time, so that multiply-inheriting
 
2829
         * children are visited the right number of times and end up with the
 
2830
         * right attinhcount.
 
2831
         */
 
2832
        if (recurse)
 
2833
        {
 
2834
                AlterTableCmd *childCmd = copyObject(cmd);
 
2835
                ColumnDef  *colDefChild = (ColumnDef *) childCmd->def;
 
2836
 
 
2837
                /* Child should see column as singly inherited */
 
2838
                colDefChild->inhcount = 1;
 
2839
                colDefChild->is_local = false;
 
2840
                /* and don't make a support dependency on the child */
 
2841
                colDefChild->support = NULL;
 
2842
 
 
2843
                ATOneLevelRecursion(wqueue, rel, childCmd);
 
2844
        }
 
2845
        else
 
2846
        {
 
2847
                /*
 
2848
                 * If we are told not to recurse, there had better not be any
 
2849
                 * child tables; else the addition would put them out of step.
 
2850
                 */
 
2851
                if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
 
2852
                        ereport(ERROR,
 
2853
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
2854
                                         errmsg("column must be added to child tables too")));
 
2855
        }
 
2856
}
 
2857
 
 
2858
static void
 
2859
ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
 
2860
                                ColumnDef *colDef)
 
2861
{
 
2862
        Oid                     myrelid = RelationGetRelid(rel);
 
2863
        Relation        pgclass,
 
2864
                                attrdesc;
 
2865
        HeapTuple       reltup;
 
2866
        HeapTuple       attributeTuple;
 
2867
        Form_pg_attribute attribute;
 
2868
        FormData_pg_attribute attributeD;
 
2869
        int                     i;
 
2870
        int                     minattnum,
 
2871
                                maxatts;
 
2872
        HeapTuple       typeTuple;
 
2873
        Oid                     typeOid;
 
2874
        Form_pg_type tform;
 
2875
        Expr       *defval;
 
2876
 
 
2877
        attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock);
 
2878
 
 
2879
        /*
 
2880
         * Are we adding the column to a recursion child?  If so, check
 
2881
         * whether to merge with an existing definition for the column.
 
2882
         */
 
2883
        if (colDef->inhcount > 0)
 
2884
        {
 
2885
                HeapTuple       tuple;
 
2886
 
 
2887
                /* Does child already have a column by this name? */
 
2888
                tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
 
2889
                if (HeapTupleIsValid(tuple))
 
2890
                {
 
2891
                        Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
 
2892
 
 
2893
                        /* Okay if child matches by type */
 
2894
                        if (typenameTypeId(colDef->typename) != childatt->atttypid ||
 
2895
                                colDef->typename->typmod != childatt->atttypmod)
 
2896
                                ereport(ERROR,
 
2897
                                                (errcode(ERRCODE_DATATYPE_MISMATCH),
 
2898
                                                 errmsg("child table \"%s\" has different type for column \"%s\"",
 
2899
                                                RelationGetRelationName(rel), colDef->colname)));
 
2900
 
 
2901
                        /* Bump the existing child att's inhcount */
 
2902
                        childatt->attinhcount++;
 
2903
                        simple_heap_update(attrdesc, &tuple->t_self, tuple);
 
2904
                        CatalogUpdateIndexes(attrdesc, tuple);
 
2905
 
 
2906
                        heap_freetuple(tuple);
 
2907
 
 
2908
                        /* Inform the user about the merge */
 
2909
                        ereport(NOTICE,
 
2910
                                        (errmsg("merging definition of column \"%s\" for child \"%s\"",
 
2911
                                                colDef->colname, RelationGetRelationName(rel))));
 
2912
 
 
2913
                        heap_close(attrdesc, RowExclusiveLock);
 
2914
                        return;
 
2915
                }
 
2916
        }
 
2917
 
 
2918
        pgclass = heap_openr(RelationRelationName, RowExclusiveLock);
 
2919
 
 
2920
        reltup = SearchSysCacheCopy(RELOID,
 
2921
                                                                ObjectIdGetDatum(myrelid),
 
2922
                                                                0, 0, 0);
 
2923
        if (!HeapTupleIsValid(reltup))
 
2924
                elog(ERROR, "cache lookup failed for relation %u", myrelid);
 
2925
 
 
2926
        /*
 
2927
         * this test is deliberately not attisdropped-aware, since if one
 
2928
         * tries to add a column matching a dropped column name, it's gonna
 
2929
         * fail anyway.
 
2930
         */
 
2931
        if (SearchSysCacheExists(ATTNAME,
 
2932
                                                         ObjectIdGetDatum(myrelid),
 
2933
                                                         PointerGetDatum(colDef->colname),
 
2934
                                                         0, 0))
 
2935
                ereport(ERROR,
 
2936
                                (errcode(ERRCODE_DUPLICATE_COLUMN),
 
2937
                                 errmsg("column \"%s\" of relation \"%s\" already exists",
 
2938
                                                colDef->colname, RelationGetRelationName(rel))));
 
2939
 
 
2940
        minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
 
2941
        maxatts = minattnum + 1;
 
2942
        if (maxatts > MaxHeapAttributeNumber)
 
2943
                ereport(ERROR,
 
2944
                                (errcode(ERRCODE_TOO_MANY_COLUMNS),
 
2945
                                 errmsg("tables can have at most %d columns",
 
2946
                                                MaxHeapAttributeNumber)));
 
2947
        i = minattnum + 1;
 
2948
 
 
2949
        typeTuple = typenameType(colDef->typename);
 
2950
        tform = (Form_pg_type) GETSTRUCT(typeTuple);
 
2951
        typeOid = HeapTupleGetOid(typeTuple);
 
2952
 
 
2953
        /* make sure datatype is legal for a column */
 
2954
        CheckAttributeType(colDef->colname, typeOid);
 
2955
 
 
2956
        attributeTuple = heap_addheader(Natts_pg_attribute,
 
2957
                                                                        false,
 
2958
                                                                        ATTRIBUTE_TUPLE_SIZE,
 
2959
                                                                        (void *) &attributeD);
 
2960
 
 
2961
        attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
 
2962
 
 
2963
        attribute->attrelid = myrelid;
 
2964
        namestrcpy(&(attribute->attname), colDef->colname);
 
2965
        attribute->atttypid = typeOid;
 
2966
        attribute->attstattarget = -1;
 
2967
        attribute->attlen = tform->typlen;
 
2968
        attribute->attcacheoff = -1;
 
2969
        attribute->atttypmod = colDef->typename->typmod;
 
2970
        attribute->attnum = i;
 
2971
        attribute->attbyval = tform->typbyval;
 
2972
        attribute->attndims = list_length(colDef->typename->arrayBounds);
 
2973
        attribute->attstorage = tform->typstorage;
 
2974
        attribute->attalign = tform->typalign;
 
2975
        attribute->attnotnull = colDef->is_not_null;
 
2976
        attribute->atthasdef = false;
 
2977
        attribute->attisdropped = false;
 
2978
        attribute->attislocal = colDef->is_local;
 
2979
        attribute->attinhcount = colDef->inhcount;
 
2980
 
 
2981
        ReleaseSysCache(typeTuple);
 
2982
 
 
2983
        simple_heap_insert(attrdesc, attributeTuple);
 
2984
 
 
2985
        /* Update indexes on pg_attribute */
 
2986
        CatalogUpdateIndexes(attrdesc, attributeTuple);
 
2987
 
 
2988
        heap_close(attrdesc, RowExclusiveLock);
 
2989
 
 
2990
        /*
 
2991
         * Update number of attributes in pg_class tuple
 
2992
         */
 
2993
        ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
 
2994
 
 
2995
        simple_heap_update(pgclass, &reltup->t_self, reltup);
 
2996
 
 
2997
        /* keep catalog indexes current */
 
2998
        CatalogUpdateIndexes(pgclass, reltup);
 
2999
 
 
3000
        heap_freetuple(reltup);
 
3001
 
 
3002
        heap_close(pgclass, RowExclusiveLock);
 
3003
 
 
3004
        /* Make the attribute's catalog entry visible */
 
3005
        CommandCounterIncrement();
 
3006
 
 
3007
        /*
 
3008
         * Store the DEFAULT, if any, in the catalogs
 
3009
         */
 
3010
        if (colDef->raw_default)
 
3011
        {
 
3012
                RawColumnDefault *rawEnt;
 
3013
 
 
3014
                rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
 
3015
                rawEnt->attnum = attribute->attnum;
 
3016
                rawEnt->raw_default = copyObject(colDef->raw_default);
 
3017
 
 
3018
                /*
 
3019
                 * This function is intended for CREATE TABLE, so it processes a
 
3020
                 * _list_ of defaults, but we just do one.
 
3021
                 */
 
3022
                AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
 
3023
 
 
3024
                /* Make the additional catalog changes visible */
 
3025
                CommandCounterIncrement();
 
3026
        }
 
3027
 
 
3028
        /*
 
3029
         * Tell Phase 3 to fill in the default expression, if there is one.
 
3030
         *
 
3031
         * If there is no default, Phase 3 doesn't have to do anything, because
 
3032
         * that effectively means that the default is NULL.  The heap tuple
 
3033
         * access routines always check for attnum > # of attributes in tuple,
 
3034
         * and return NULL if so, so without any modification of the tuple
 
3035
         * data we will get the effect of NULL values in the new column.
 
3036
         *
 
3037
         * An exception occurs when the new column is of a domain type: the
 
3038
         * domain might have a NOT NULL constraint, or a check constraint that
 
3039
         * indirectly rejects nulls.  If there are any domain constraints then
 
3040
         * we construct an explicit NULL default value that will be passed through
 
3041
         * CoerceToDomain processing.  (This is a tad inefficient, since it
 
3042
         * causes rewriting the table which we really don't have to do, but
 
3043
         * the present design of domain processing doesn't offer any simple way
 
3044
         * of checking the constraints more directly.)
 
3045
         *
 
3046
         * Note: we use build_column_default, and not just the cooked default
 
3047
         * returned by AddRelationRawConstraints, so that the right thing
 
3048
         * happens when a datatype's default applies.
 
3049
         */
 
3050
        defval = (Expr *) build_column_default(rel, attribute->attnum);
 
3051
 
 
3052
        if (!defval && GetDomainConstraints(typeOid) != NIL)
 
3053
        {
 
3054
                Oid             basetype = getBaseType(typeOid);
 
3055
 
 
3056
                defval = (Expr *) makeNullConst(basetype);
 
3057
                defval = (Expr *) coerce_to_target_type(NULL,
 
3058
                                                                                                (Node *) defval,
 
3059
                                                                                                basetype,
 
3060
                                                                                                typeOid,
 
3061
                                                                                                colDef->typename->typmod,
 
3062
                                                                                                COERCION_ASSIGNMENT,
 
3063
                                                                                                COERCE_IMPLICIT_CAST);
 
3064
                if (defval == NULL)             /* should not happen */
 
3065
                        elog(ERROR, "failed to coerce base type to domain");
 
3066
        }
 
3067
 
 
3068
        if (defval)
 
3069
        {
 
3070
                NewColumnValue *newval;
 
3071
 
 
3072
                newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
 
3073
                newval->attnum = attribute->attnum;
 
3074
                newval->expr = defval;
 
3075
 
 
3076
                tab->newvals = lappend(tab->newvals, newval);
 
3077
        }
 
3078
 
 
3079
        /*
 
3080
         * Add needed dependency entries for the new column.
 
3081
         */
 
3082
        add_column_datatype_dependency(myrelid, i, attribute->atttypid);
 
3083
        if (colDef->support != NULL)
 
3084
                add_column_support_dependency(myrelid, i, colDef->support);
 
3085
}
 
3086
 
 
3087
/*
 
3088
 * Install a column's dependency on its datatype.
 
3089
 */
 
3090
static void
 
3091
add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
 
3092
{
 
3093
        ObjectAddress myself,
 
3094
                                referenced;
 
3095
 
 
3096
        myself.classId = RelOid_pg_class;
 
3097
        myself.objectId = relid;
 
3098
        myself.objectSubId = attnum;
 
3099
        referenced.classId = RelOid_pg_type;
 
3100
        referenced.objectId = typid;
 
3101
        referenced.objectSubId = 0;
 
3102
        recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
 
3103
}
 
3104
 
 
3105
/*
 
3106
 * Install a dependency for a column's supporting relation (serial sequence).
 
3107
 */
 
3108
static void
 
3109
add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
 
3110
{
 
3111
        ObjectAddress colobject,
 
3112
                                suppobject;
 
3113
 
 
3114
        colobject.classId = RelOid_pg_class;
 
3115
        colobject.objectId = relid;
 
3116
        colobject.objectSubId = attnum;
 
3117
        suppobject.classId = RelOid_pg_class;
 
3118
        suppobject.objectId = RangeVarGetRelid(support, false);
 
3119
        suppobject.objectSubId = 0;
 
3120
        recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
 
3121
}
 
3122
 
 
3123
/*
 
3124
 * ALTER TABLE ALTER COLUMN DROP NOT NULL
 
3125
 */
 
3126
static void
 
3127
ATExecDropNotNull(Relation rel, const char *colName)
 
3128
{
 
3129
        HeapTuple       tuple;
 
3130
        AttrNumber      attnum;
 
3131
        Relation        attr_rel;
 
3132
        List       *indexoidlist;
 
3133
        ListCell   *indexoidscan;
 
3134
 
 
3135
        /*
 
3136
         * lookup the attribute
 
3137
         */
 
3138
        attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
 
3139
 
 
3140
        tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
 
3141
 
 
3142
        if (!HeapTupleIsValid(tuple))
 
3143
                ereport(ERROR,
 
3144
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
3145
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
3146
                                                colName, RelationGetRelationName(rel))));
 
3147
 
 
3148
        attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
 
3149
 
 
3150
        /* Prevent them from altering a system attribute */
 
3151
        if (attnum <= 0)
 
3152
                ereport(ERROR,
 
3153
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
3154
                                 errmsg("cannot alter system column \"%s\"",
 
3155
                                                colName)));
 
3156
 
 
3157
        /*
 
3158
         * Check that the attribute is not in a primary key
 
3159
         */
 
3160
 
 
3161
        /* Loop over all indexes on the relation */
 
3162
        indexoidlist = RelationGetIndexList(rel);
 
3163
 
 
3164
        foreach(indexoidscan, indexoidlist)
 
3165
        {
 
3166
                Oid                     indexoid = lfirst_oid(indexoidscan);
 
3167
                HeapTuple       indexTuple;
 
3168
                Form_pg_index indexStruct;
 
3169
                int                     i;
 
3170
 
 
3171
                indexTuple = SearchSysCache(INDEXRELID,
 
3172
                                                                        ObjectIdGetDatum(indexoid),
 
3173
                                                                        0, 0, 0);
 
3174
                if (!HeapTupleIsValid(indexTuple))
 
3175
                        elog(ERROR, "cache lookup failed for index %u", indexoid);
 
3176
                indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
 
3177
 
 
3178
                /* If the index is not a primary key, skip the check */
 
3179
                if (indexStruct->indisprimary)
 
3180
                {
 
3181
                        /*
 
3182
                         * Loop over each attribute in the primary key and see if it
 
3183
                         * matches the to-be-altered attribute
 
3184
                         */
 
3185
                        for (i = 0; i < indexStruct->indnatts; i++)
 
3186
                        {
 
3187
                                if (indexStruct->indkey[i] == attnum)
 
3188
                                        ereport(ERROR,
 
3189
                                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
3190
                                                         errmsg("column \"%s\" is in a primary key",
 
3191
                                                                        colName)));
 
3192
                        }
 
3193
                }
 
3194
 
 
3195
                ReleaseSysCache(indexTuple);
 
3196
        }
 
3197
 
 
3198
        list_free(indexoidlist);
 
3199
 
 
3200
        /*
 
3201
         * Okay, actually perform the catalog change ... if needed
 
3202
         */
 
3203
        if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
 
3204
        {
 
3205
                ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
 
3206
 
 
3207
                simple_heap_update(attr_rel, &tuple->t_self, tuple);
 
3208
 
 
3209
                /* keep the system catalog indexes current */
 
3210
                CatalogUpdateIndexes(attr_rel, tuple);
 
3211
        }
 
3212
 
 
3213
        heap_close(attr_rel, RowExclusiveLock);
 
3214
}
 
3215
 
 
3216
/*
 
3217
 * ALTER TABLE ALTER COLUMN SET NOT NULL
 
3218
 */
 
3219
static void
 
3220
ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
 
3221
                                 const char *colName)
 
3222
{
 
3223
        HeapTuple       tuple;
 
3224
        AttrNumber      attnum;
 
3225
        Relation        attr_rel;
 
3226
        NewConstraint *newcon;
 
3227
 
 
3228
        /*
 
3229
         * lookup the attribute
 
3230
         */
 
3231
        attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
 
3232
 
 
3233
        tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
 
3234
 
 
3235
        if (!HeapTupleIsValid(tuple))
 
3236
                ereport(ERROR,
 
3237
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
3238
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
3239
                                                colName, RelationGetRelationName(rel))));
 
3240
 
 
3241
        attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
 
3242
 
 
3243
        /* Prevent them from altering a system attribute */
 
3244
        if (attnum <= 0)
 
3245
                ereport(ERROR,
 
3246
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
3247
                                 errmsg("cannot alter system column \"%s\"",
 
3248
                                                colName)));
 
3249
 
 
3250
        /*
 
3251
         * Okay, actually perform the catalog change ... if needed
 
3252
         */
 
3253
        if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
 
3254
        {
 
3255
                ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
 
3256
 
 
3257
                simple_heap_update(attr_rel, &tuple->t_self, tuple);
 
3258
 
 
3259
                /* keep the system catalog indexes current */
 
3260
                CatalogUpdateIndexes(attr_rel, tuple);
 
3261
 
 
3262
                /* Tell Phase 3 to test the constraint */
 
3263
                newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
 
3264
                newcon->contype = CONSTR_NOTNULL;
 
3265
                newcon->attnum = attnum;
 
3266
                newcon->name = "NOT NULL";
 
3267
 
 
3268
                tab->constraints = lappend(tab->constraints, newcon);
 
3269
        }
 
3270
 
 
3271
        heap_close(attr_rel, RowExclusiveLock);
 
3272
}
 
3273
 
 
3274
/*
 
3275
 * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
 
3276
 */
 
3277
static void
 
3278
ATExecColumnDefault(Relation rel, const char *colName,
 
3279
                                        Node *newDefault)
 
3280
{
 
3281
        AttrNumber      attnum;
 
3282
 
 
3283
        /*
 
3284
         * get the number of the attribute
 
3285
         */
 
3286
        attnum = get_attnum(RelationGetRelid(rel), colName);
 
3287
        if (attnum == InvalidAttrNumber)
 
3288
                ereport(ERROR,
 
3289
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
3290
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
3291
                                                colName, RelationGetRelationName(rel))));
 
3292
 
 
3293
        /* Prevent them from altering a system attribute */
 
3294
        if (attnum <= 0)
 
3295
                ereport(ERROR,
 
3296
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
3297
                                 errmsg("cannot alter system column \"%s\"",
 
3298
                                                colName)));
 
3299
 
 
3300
        /*
 
3301
         * Remove any old default for the column.  We use RESTRICT here for
 
3302
         * safety, but at present we do not expect anything to depend on the
 
3303
         * default.
 
3304
         */
 
3305
        RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
 
3306
 
 
3307
        if (newDefault)
 
3308
        {
 
3309
                /* SET DEFAULT */
 
3310
                RawColumnDefault *rawEnt;
 
3311
 
 
3312
                rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
 
3313
                rawEnt->attnum = attnum;
 
3314
                rawEnt->raw_default = newDefault;
 
3315
 
 
3316
                /*
 
3317
                 * This function is intended for CREATE TABLE, so it processes a
 
3318
                 * _list_ of defaults, but we just do one.
 
3319
                 */
 
3320
                AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
 
3321
        }
 
3322
}
 
3323
 
 
3324
/*
 
3325
 * ALTER TABLE ALTER COLUMN SET STATISTICS
 
3326
 */
 
3327
static void
 
3328
ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
 
3329
{
 
3330
        /*
 
3331
         * We do our own permission checking because (a) we want to allow SET
 
3332
         * STATISTICS on indexes (for expressional index columns), and (b) we
 
3333
         * want to allow SET STATISTICS on system catalogs without requiring
 
3334
         * allowSystemTableMods to be turned on.
 
3335
         */
 
3336
        if (rel->rd_rel->relkind != RELKIND_RELATION &&
 
3337
                rel->rd_rel->relkind != RELKIND_INDEX)
 
3338
                ereport(ERROR,
 
3339
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
3340
                                 errmsg("\"%s\" is not a table or index",
 
3341
                                                RelationGetRelationName(rel))));
 
3342
 
 
3343
        /* Permissions checks */
 
3344
        if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
 
3345
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
 
3346
                                           RelationGetRelationName(rel));
 
3347
}
 
3348
 
 
3349
static void
 
3350
ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
 
3351
{
 
3352
        int                     newtarget;
 
3353
        Relation        attrelation;
 
3354
        HeapTuple       tuple;
 
3355
        Form_pg_attribute attrtuple;
 
3356
 
 
3357
        Assert(IsA(newValue, Integer));
 
3358
        newtarget = intVal(newValue);
 
3359
 
 
3360
        /*
 
3361
         * Limit target to a sane range
 
3362
         */
 
3363
        if (newtarget < -1)
 
3364
        {
 
3365
                ereport(ERROR,
 
3366
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 
3367
                                 errmsg("statistics target %d is too low",
 
3368
                                                newtarget)));
 
3369
        }
 
3370
        else if (newtarget > 1000)
 
3371
        {
 
3372
                newtarget = 1000;
 
3373
                ereport(WARNING,
 
3374
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 
3375
                                 errmsg("lowering statistics target to %d",
 
3376
                                                newtarget)));
 
3377
        }
 
3378
 
 
3379
        attrelation = heap_openr(AttributeRelationName, RowExclusiveLock);
 
3380
 
 
3381
        tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
 
3382
 
 
3383
        if (!HeapTupleIsValid(tuple))
 
3384
                ereport(ERROR,
 
3385
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
3386
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
3387
                                                colName, RelationGetRelationName(rel))));
 
3388
        attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
 
3389
 
 
3390
        if (attrtuple->attnum <= 0)
 
3391
                ereport(ERROR,
 
3392
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
3393
                                 errmsg("cannot alter system column \"%s\"",
 
3394
                                                colName)));
 
3395
 
 
3396
        attrtuple->attstattarget = newtarget;
 
3397
 
 
3398
        simple_heap_update(attrelation, &tuple->t_self, tuple);
 
3399
 
 
3400
        /* keep system catalog indexes current */
 
3401
        CatalogUpdateIndexes(attrelation, tuple);
 
3402
 
 
3403
        heap_freetuple(tuple);
 
3404
 
 
3405
        heap_close(attrelation, RowExclusiveLock);
 
3406
}
 
3407
 
 
3408
/*
 
3409
 * ALTER TABLE ALTER COLUMN SET STORAGE
 
3410
 */
 
3411
static void
 
3412
ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
 
3413
{
 
3414
        char       *storagemode;
 
3415
        char            newstorage;
 
3416
        Relation        attrelation;
 
3417
        HeapTuple       tuple;
 
3418
        Form_pg_attribute attrtuple;
 
3419
 
 
3420
        Assert(IsA(newValue, String));
 
3421
        storagemode = strVal(newValue);
 
3422
 
 
3423
        if (pg_strcasecmp(storagemode, "plain") == 0)
 
3424
                newstorage = 'p';
 
3425
        else if (pg_strcasecmp(storagemode, "external") == 0)
 
3426
                newstorage = 'e';
 
3427
        else if (pg_strcasecmp(storagemode, "extended") == 0)
 
3428
                newstorage = 'x';
 
3429
        else if (pg_strcasecmp(storagemode, "main") == 0)
 
3430
                newstorage = 'm';
 
3431
        else
 
3432
        {
 
3433
                ereport(ERROR,
 
3434
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 
3435
                                 errmsg("invalid storage type \"%s\"",
 
3436
                                                storagemode)));
 
3437
                newstorage = 0;                 /* keep compiler quiet */
 
3438
        }
 
3439
 
 
3440
        attrelation = heap_openr(AttributeRelationName, RowExclusiveLock);
 
3441
 
 
3442
        tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
 
3443
 
 
3444
        if (!HeapTupleIsValid(tuple))
 
3445
                ereport(ERROR,
 
3446
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
3447
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
3448
                                                colName, RelationGetRelationName(rel))));
 
3449
        attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
 
3450
 
 
3451
        if (attrtuple->attnum <= 0)
 
3452
                ereport(ERROR,
 
3453
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
3454
                                 errmsg("cannot alter system column \"%s\"",
 
3455
                                                colName)));
 
3456
 
 
3457
        /*
 
3458
         * safety check: do not allow toasted storage modes unless column
 
3459
         * datatype is TOAST-aware.
 
3460
         */
 
3461
        if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
 
3462
                attrtuple->attstorage = newstorage;
 
3463
        else
 
3464
                ereport(ERROR,
 
3465
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
3466
                                 errmsg("column data type %s can only have storage PLAIN",
 
3467
                                                format_type_be(attrtuple->atttypid))));
 
3468
 
 
3469
        simple_heap_update(attrelation, &tuple->t_self, tuple);
 
3470
 
 
3471
        /* keep system catalog indexes current */
 
3472
        CatalogUpdateIndexes(attrelation, tuple);
 
3473
 
 
3474
        heap_freetuple(tuple);
 
3475
 
 
3476
        heap_close(attrelation, RowExclusiveLock);
 
3477
}
 
3478
 
 
3479
 
 
3480
/*
 
3481
 * ALTER TABLE DROP COLUMN
 
3482
 *
 
3483
 * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
 
3484
 * because we have to decide at runtime whether to recurse or not depending
 
3485
 * on whether attinhcount goes to zero or not.  (We can't check this in a
 
3486
 * static pre-pass because it won't handle multiple inheritance situations
 
3487
 * correctly.)  Since DROP COLUMN doesn't need to create any work queue
 
3488
 * entries for Phase 3, it's okay to recurse internally in this routine
 
3489
 * without considering the work queue.
 
3490
 */
 
3491
static void
 
3492
ATExecDropColumn(Relation rel, const char *colName,
 
3493
                                 DropBehavior behavior,
 
3494
                                 bool recurse, bool recursing)
 
3495
{
 
3496
        HeapTuple       tuple;
 
3497
        Form_pg_attribute targetatt;
 
3498
        AttrNumber      attnum;
 
3499
        List       *children;
 
3500
        ObjectAddress object;
 
3501
 
 
3502
        /* At top level, permission check was done in ATPrepCmd, else do it */
 
3503
        if (recursing)
 
3504
                ATSimplePermissions(rel, false);
 
3505
 
 
3506
        /*
 
3507
         * get the number of the attribute
 
3508
         */
 
3509
        tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
 
3510
        if (!HeapTupleIsValid(tuple))
 
3511
                ereport(ERROR,
 
3512
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
3513
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
3514
                                                colName, RelationGetRelationName(rel))));
 
3515
        targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
 
3516
 
 
3517
        attnum = targetatt->attnum;
 
3518
 
 
3519
        /* Can't drop a system attribute, except OID */
 
3520
        if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
 
3521
                ereport(ERROR,
 
3522
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
3523
                                 errmsg("cannot drop system column \"%s\"",
 
3524
                                                colName)));
 
3525
 
 
3526
        /* Don't drop inherited columns */
 
3527
        if (targetatt->attinhcount > 0 && !recursing)
 
3528
                ereport(ERROR,
 
3529
                                (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
3530
                                 errmsg("cannot drop inherited column \"%s\"",
 
3531
                                                colName)));
 
3532
 
 
3533
        ReleaseSysCache(tuple);
 
3534
 
 
3535
        /*
 
3536
         * Propagate to children as appropriate.  Unlike most other ALTER
 
3537
         * routines, we have to do this one level of recursion at a time; we
 
3538
         * can't use find_all_inheritors to do it in one pass.
 
3539
         */
 
3540
        children = find_inheritance_children(RelationGetRelid(rel));
 
3541
 
 
3542
        if (children)
 
3543
        {
 
3544
                Relation        attr_rel;
 
3545
                ListCell   *child;
 
3546
 
 
3547
                attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
 
3548
                foreach(child, children)
 
3549
                {
 
3550
                        Oid                     childrelid = lfirst_oid(child);
 
3551
                        Relation        childrel;
 
3552
                        Form_pg_attribute childatt;
 
3553
 
 
3554
                        childrel = heap_open(childrelid, AccessExclusiveLock);
 
3555
 
 
3556
                        tuple = SearchSysCacheCopyAttName(childrelid, colName);
 
3557
                        if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
 
3558
                                elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
 
3559
                                         colName, childrelid);
 
3560
                        childatt = (Form_pg_attribute) GETSTRUCT(tuple);
 
3561
 
 
3562
                        if (childatt->attinhcount <= 0)         /* shouldn't happen */
 
3563
                                elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
 
3564
                                         childrelid, colName);
 
3565
 
 
3566
                        if (recurse)
 
3567
                        {
 
3568
                                /*
 
3569
                                 * If the child column has other definition sources, just
 
3570
                                 * decrement its inheritance count; if not, recurse to
 
3571
                                 * delete it.
 
3572
                                 */
 
3573
                                if (childatt->attinhcount == 1 && !childatt->attislocal)
 
3574
                                {
 
3575
                                        /* Time to delete this child column, too */
 
3576
                                        ATExecDropColumn(childrel, colName, behavior, true, true);
 
3577
                                }
 
3578
                                else
 
3579
                                {
 
3580
                                        /* Child column must survive my deletion */
 
3581
                                        childatt->attinhcount--;
 
3582
 
 
3583
                                        simple_heap_update(attr_rel, &tuple->t_self, tuple);
 
3584
 
 
3585
                                        /* keep the system catalog indexes current */
 
3586
                                        CatalogUpdateIndexes(attr_rel, tuple);
 
3587
 
 
3588
                                        /* Make update visible */
 
3589
                                        CommandCounterIncrement();
 
3590
                                }
 
3591
                        }
 
3592
                        else
 
3593
                        {
 
3594
                                /*
 
3595
                                 * If we were told to drop ONLY in this table (no
 
3596
                                 * recursion), we need to mark the inheritors' attribute
 
3597
                                 * as locally defined rather than inherited.
 
3598
                                 */
 
3599
                                childatt->attinhcount--;
 
3600
                                childatt->attislocal = true;
 
3601
 
 
3602
                                simple_heap_update(attr_rel, &tuple->t_self, tuple);
 
3603
 
 
3604
                                /* keep the system catalog indexes current */
 
3605
                                CatalogUpdateIndexes(attr_rel, tuple);
 
3606
 
 
3607
                                /* Make update visible */
 
3608
                                CommandCounterIncrement();
 
3609
                        }
 
3610
 
 
3611
                        heap_freetuple(tuple);
 
3612
 
 
3613
                        heap_close(childrel, NoLock);
 
3614
                }
 
3615
                heap_close(attr_rel, RowExclusiveLock);
 
3616
        }
 
3617
 
 
3618
        /*
 
3619
         * Perform the actual column deletion
 
3620
         */
 
3621
        object.classId = RelOid_pg_class;
 
3622
        object.objectId = RelationGetRelid(rel);
 
3623
        object.objectSubId = attnum;
 
3624
 
 
3625
        performDeletion(&object, behavior);
 
3626
 
 
3627
        /*
 
3628
         * If we dropped the OID column, must adjust pg_class.relhasoids
 
3629
         */
 
3630
        if (attnum == ObjectIdAttributeNumber)
 
3631
        {
 
3632
                Relation        class_rel;
 
3633
                Form_pg_class tuple_class;
 
3634
 
 
3635
                class_rel = heap_openr(RelationRelationName, RowExclusiveLock);
 
3636
 
 
3637
                tuple = SearchSysCacheCopy(RELOID,
 
3638
                                                                 ObjectIdGetDatum(RelationGetRelid(rel)),
 
3639
                                                                   0, 0, 0);
 
3640
                if (!HeapTupleIsValid(tuple))
 
3641
                        elog(ERROR, "cache lookup failed for relation %u",
 
3642
                                 RelationGetRelid(rel));
 
3643
                tuple_class = (Form_pg_class) GETSTRUCT(tuple);
 
3644
 
 
3645
                tuple_class->relhasoids = false;
 
3646
                simple_heap_update(class_rel, &tuple->t_self, tuple);
 
3647
 
 
3648
                /* Keep the catalog indexes up to date */
 
3649
                CatalogUpdateIndexes(class_rel, tuple);
 
3650
 
 
3651
                heap_close(class_rel, RowExclusiveLock);
 
3652
        }
 
3653
}
 
3654
 
 
3655
/*
 
3656
 * ALTER TABLE ADD INDEX
 
3657
 *
 
3658
 * There is no such command in the grammar, but the parser converts UNIQUE
 
3659
 * and PRIMARY KEY constraints into AT_AddIndex subcommands.  This lets us
 
3660
 * schedule creation of the index at the appropriate time during ALTER.
 
3661
 */
 
3662
static void
 
3663
ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
 
3664
                           IndexStmt *stmt, bool is_rebuild)
 
3665
{
 
3666
        bool            check_rights;
 
3667
        bool            skip_build;
 
3668
        bool            quiet;
 
3669
 
 
3670
        Assert(IsA(stmt, IndexStmt));
 
3671
 
 
3672
        /* suppress schema rights check when rebuilding existing index */
 
3673
        check_rights = !is_rebuild;
 
3674
        /* skip index build if phase 3 will have to rewrite table anyway */
 
3675
        skip_build = (tab->newvals != NIL);
 
3676
        /* suppress notices when rebuilding existing index */
 
3677
        quiet = is_rebuild;
 
3678
 
 
3679
        DefineIndex(stmt->relation, /* relation */
 
3680
                                stmt->idxname,  /* index name */
 
3681
                                stmt->accessMethod,             /* am name */
 
3682
                                stmt->tableSpace,
 
3683
                                stmt->indexParams,              /* parameters */
 
3684
                                (Expr *) stmt->whereClause,
 
3685
                                stmt->rangetable,
 
3686
                                stmt->unique,
 
3687
                                stmt->primary,
 
3688
                                stmt->isconstraint,
 
3689
                                true,                   /* is_alter_table */
 
3690
                                check_rights,
 
3691
                                skip_build,
 
3692
                                quiet);
 
3693
}
 
3694
 
 
3695
/*
 
3696
 * ALTER TABLE ADD CONSTRAINT
 
3697
 */
 
3698
static void
 
3699
ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
 
3700
{
 
3701
        switch (nodeTag(newConstraint))
 
3702
        {
 
3703
                case T_Constraint:
 
3704
                        {
 
3705
                                Constraint *constr = (Constraint *) newConstraint;
 
3706
 
 
3707
                                /*
 
3708
                                 * Currently, we only expect to see CONSTR_CHECK nodes
 
3709
                                 * arriving here (see the preprocessing done in
 
3710
                                 * parser/analyze.c).  Use a switch anyway to make it
 
3711
                                 * easier to add more code later.
 
3712
                                 */
 
3713
                                switch (constr->contype)
 
3714
                                {
 
3715
                                        case CONSTR_CHECK:
 
3716
                                                {
 
3717
                                                        List       *newcons;
 
3718
                                                        ListCell   *lcon;
 
3719
 
 
3720
                                                        /*
 
3721
                                                         * Call AddRelationRawConstraints to do the
 
3722
                                                         * work. It returns a list of cooked
 
3723
                                                         * constraints.
 
3724
                                                         */
 
3725
                                                        newcons = AddRelationRawConstraints(rel, NIL,
 
3726
                                                                                                         list_make1(constr));
 
3727
                                                        /* Add each constraint to Phase 3's queue */
 
3728
                                                        foreach(lcon, newcons)
 
3729
                                                        {
 
3730
                                                                CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
 
3731
                                                                NewConstraint *newcon;
 
3732
 
 
3733
                                                                newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
 
3734
                                                                newcon->name = ccon->name;
 
3735
                                                                newcon->contype = ccon->contype;
 
3736
                                                                newcon->attnum = ccon->attnum;
 
3737
                                                                /* ExecQual wants implicit-AND format */
 
3738
                                                                newcon->qual = (Node *)
 
3739
                                                                        make_ands_implicit((Expr *) ccon->expr);
 
3740
 
 
3741
                                                                tab->constraints = lappend(tab->constraints,
 
3742
                                                                                                                   newcon);
 
3743
                                                        }
 
3744
                                                        break;
 
3745
                                                }
 
3746
                                        default:
 
3747
                                                elog(ERROR, "unrecognized constraint type: %d",
 
3748
                                                         (int) constr->contype);
 
3749
                                }
 
3750
                                break;
 
3751
                        }
 
3752
                case T_FkConstraint:
 
3753
                        {
 
3754
                                FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
 
3755
 
 
3756
                                /*
 
3757
                                 * Assign or validate constraint name
 
3758
                                 */
 
3759
                                if (fkconstraint->constr_name)
 
3760
                                {
 
3761
                                        if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
 
3762
                                                                                         RelationGetRelid(rel),
 
3763
                                                                                         RelationGetNamespace(rel),
 
3764
                                                                                         fkconstraint->constr_name))
 
3765
                                                ereport(ERROR,
 
3766
                                                                (errcode(ERRCODE_DUPLICATE_OBJECT),
 
3767
                                                                 errmsg("constraint \"%s\" for relation \"%s\" already exists",
 
3768
                                                                                fkconstraint->constr_name,
 
3769
                                                                                RelationGetRelationName(rel))));
 
3770
                                }
 
3771
                                else
 
3772
                                        fkconstraint->constr_name =
 
3773
                                                ChooseConstraintName(RelationGetRelationName(rel),
 
3774
                                                                strVal(linitial(fkconstraint->fk_attrs)),
 
3775
                                                                                         "fkey",
 
3776
                                                                                         RelationGetNamespace(rel),
 
3777
                                                                                         NIL);
 
3778
 
 
3779
                                ATAddForeignKeyConstraint(tab, rel, fkconstraint);
 
3780
 
 
3781
                                break;
 
3782
                        }
 
3783
                default:
 
3784
                        elog(ERROR, "unrecognized node type: %d",
 
3785
                                 (int) nodeTag(newConstraint));
 
3786
        }
 
3787
}
 
3788
 
 
3789
/*
 
3790
 * Add a foreign-key constraint to a single table
 
3791
 *
 
3792
 * Subroutine for ATExecAddConstraint.  Must already hold exclusive
 
3793
 * lock on the rel, and have done appropriate validity/permissions checks
 
3794
 * for it.
 
3795
 */
 
3796
static void
 
3797
ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
 
3798
                                                  FkConstraint *fkconstraint)
 
3799
{
 
3800
        Relation        pkrel;
 
3801
        AclResult       aclresult;
 
3802
        int16           pkattnum[INDEX_MAX_KEYS];
 
3803
        int16           fkattnum[INDEX_MAX_KEYS];
 
3804
        Oid                     pktypoid[INDEX_MAX_KEYS];
 
3805
        Oid                     fktypoid[INDEX_MAX_KEYS];
 
3806
        Oid                     opclasses[INDEX_MAX_KEYS];
 
3807
        int                     i;
 
3808
        int                     numfks,
 
3809
                                numpks;
 
3810
        Oid                     indexOid;
 
3811
        Oid                     constrOid;
 
3812
 
 
3813
        /*
 
3814
         * Grab an exclusive lock on the pk table, so that someone doesn't
 
3815
         * delete rows out from under us. (Although a lesser lock would do for
 
3816
         * that purpose, we'll need exclusive lock anyway to add triggers to
 
3817
         * the pk table; trying to start with a lesser lock will just create a
 
3818
         * risk of deadlock.)
 
3819
         */
 
3820
        pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
 
3821
 
 
3822
        /*
 
3823
         * Validity and permissions checks
 
3824
         *
 
3825
         * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
 
3826
         * but we may as well error out sooner instead of later.
 
3827
         */
 
3828
        if (pkrel->rd_rel->relkind != RELKIND_RELATION)
 
3829
                ereport(ERROR,
 
3830
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
3831
                                 errmsg("referenced relation \"%s\" is not a table",
 
3832
                                                RelationGetRelationName(pkrel))));
 
3833
 
 
3834
        aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
 
3835
                                                                  ACL_REFERENCES);
 
3836
        if (aclresult != ACLCHECK_OK)
 
3837
                aclcheck_error(aclresult, ACL_KIND_CLASS,
 
3838
                                           RelationGetRelationName(pkrel));
 
3839
 
 
3840
        if (!allowSystemTableMods && IsSystemRelation(pkrel))
 
3841
                ereport(ERROR,
 
3842
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
3843
                                 errmsg("permission denied: \"%s\" is a system catalog",
 
3844
                                                RelationGetRelationName(pkrel))));
 
3845
 
 
3846
        aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
 
3847
                                                                  ACL_REFERENCES);
 
3848
        if (aclresult != ACLCHECK_OK)
 
3849
                aclcheck_error(aclresult, ACL_KIND_CLASS,
 
3850
                                           RelationGetRelationName(rel));
 
3851
 
 
3852
        /*
 
3853
         * Disallow reference from permanent table to temp table or vice
 
3854
         * versa. (The ban on perm->temp is for fairly obvious reasons.  The
 
3855
         * ban on temp->perm is because other backends might need to run the
 
3856
         * RI triggers on the perm table, but they can't reliably see tuples
 
3857
         * the owning backend has created in the temp table, because
 
3858
         * non-shared buffers are used for temp tables.)
 
3859
         */
 
3860
        if (isTempNamespace(RelationGetNamespace(pkrel)))
 
3861
        {
 
3862
                if (!isTempNamespace(RelationGetNamespace(rel)))
 
3863
                        ereport(ERROR,
 
3864
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
3865
                                         errmsg("cannot reference temporary table from permanent table constraint")));
 
3866
        }
 
3867
        else
 
3868
        {
 
3869
                if (isTempNamespace(RelationGetNamespace(rel)))
 
3870
                        ereport(ERROR,
 
3871
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
3872
                                         errmsg("cannot reference permanent table from temporary table constraint")));
 
3873
        }
 
3874
 
 
3875
        /*
 
3876
         * Look up the referencing attributes to make sure they exist, and
 
3877
         * record their attnums and type OIDs.
 
3878
         */
 
3879
        MemSet(pkattnum, 0, sizeof(pkattnum));
 
3880
        MemSet(fkattnum, 0, sizeof(fkattnum));
 
3881
        MemSet(pktypoid, 0, sizeof(pktypoid));
 
3882
        MemSet(fktypoid, 0, sizeof(fktypoid));
 
3883
        MemSet(opclasses, 0, sizeof(opclasses));
 
3884
 
 
3885
        numfks = transformColumnNameList(RelationGetRelid(rel),
 
3886
                                                                         fkconstraint->fk_attrs,
 
3887
                                                                         fkattnum, fktypoid);
 
3888
 
 
3889
        /*
 
3890
         * If the attribute list for the referenced table was omitted, lookup
 
3891
         * the definition of the primary key and use it.  Otherwise, validate
 
3892
         * the supplied attribute list.  In either case, discover the index
 
3893
         * OID and index opclasses, and the attnums and type OIDs of the
 
3894
         * attributes.
 
3895
         */
 
3896
        if (fkconstraint->pk_attrs == NIL)
 
3897
        {
 
3898
                numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
 
3899
                                                                                        &fkconstraint->pk_attrs,
 
3900
                                                                                        pkattnum, pktypoid,
 
3901
                                                                                        opclasses);
 
3902
        }
 
3903
        else
 
3904
        {
 
3905
                numpks = transformColumnNameList(RelationGetRelid(pkrel),
 
3906
                                                                                 fkconstraint->pk_attrs,
 
3907
                                                                                 pkattnum, pktypoid);
 
3908
                /* Look for an index matching the column list */
 
3909
                indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
 
3910
                                                                                   opclasses);
 
3911
        }
 
3912
 
 
3913
        /* Be sure referencing and referenced column types are comparable */
 
3914
        if (numfks != numpks)
 
3915
                ereport(ERROR,
 
3916
                                (errcode(ERRCODE_INVALID_FOREIGN_KEY),
 
3917
                                 errmsg("number of referencing and referenced columns for foreign key disagree")));
 
3918
 
 
3919
        for (i = 0; i < numpks; i++)
 
3920
        {
 
3921
                /*
 
3922
                 * pktypoid[i] is the primary key table's i'th key's type
 
3923
                 * fktypoid[i] is the foreign key table's i'th key's type
 
3924
                 *
 
3925
                 * Note that we look for an operator with the PK type on the left;
 
3926
                 * when the types are different this is critical because the PK
 
3927
                 * index will need operators with the indexkey on the left.
 
3928
                 * (Ordinarily both commutator operators will exist if either
 
3929
                 * does, but we won't get the right answer from the test below on
 
3930
                 * opclass membership unless we select the proper operator.)
 
3931
                 */
 
3932
                Operator        o = oper(list_make1(makeString("=")),
 
3933
                                                         pktypoid[i], fktypoid[i], true);
 
3934
 
 
3935
                if (o == NULL)
 
3936
                        ereport(ERROR,
 
3937
                                        (errcode(ERRCODE_UNDEFINED_FUNCTION),
 
3938
                                         errmsg("foreign key constraint \"%s\" "
 
3939
                                                        "cannot be implemented",
 
3940
                                                        fkconstraint->constr_name),
 
3941
                                         errdetail("Key columns \"%s\" and \"%s\" "
 
3942
                                                           "are of incompatible types: %s and %s.",
 
3943
                                                         strVal(list_nth(fkconstraint->fk_attrs, i)),
 
3944
                                                         strVal(list_nth(fkconstraint->pk_attrs, i)),
 
3945
                                                           format_type_be(fktypoid[i]),
 
3946
                                                           format_type_be(pktypoid[i]))));
 
3947
 
 
3948
                /*
 
3949
                 * Check that the found operator is compatible with the PK index,
 
3950
                 * and generate a warning if not, since otherwise costly seqscans
 
3951
                 * will be incurred to check FK validity.
 
3952
                 */
 
3953
                if (!op_in_opclass(oprid(o), opclasses[i]))
 
3954
                        ereport(WARNING,
 
3955
                                        (errmsg("foreign key constraint \"%s\" "
 
3956
                                                        "will require costly sequential scans",
 
3957
                                                        fkconstraint->constr_name),
 
3958
                                         errdetail("Key columns \"%s\" and \"%s\" "
 
3959
                                                           "are of different types: %s and %s.",
 
3960
                                                         strVal(list_nth(fkconstraint->fk_attrs, i)),
 
3961
                                                         strVal(list_nth(fkconstraint->pk_attrs, i)),
 
3962
                                                           format_type_be(fktypoid[i]),
 
3963
                                                           format_type_be(pktypoid[i]))));
 
3964
 
 
3965
                ReleaseSysCache(o);
 
3966
        }
 
3967
 
 
3968
        /*
 
3969
         * Tell Phase 3 to check that the constraint is satisfied by existing
 
3970
         * rows (we can skip this during table creation).
 
3971
         */
 
3972
        if (!fkconstraint->skip_validation)
 
3973
        {
 
3974
                NewConstraint *newcon;
 
3975
 
 
3976
                newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
 
3977
                newcon->name = fkconstraint->constr_name;
 
3978
                newcon->contype = CONSTR_FOREIGN;
 
3979
                newcon->refrelid = RelationGetRelid(pkrel);
 
3980
                newcon->qual = (Node *) fkconstraint;
 
3981
 
 
3982
                tab->constraints = lappend(tab->constraints, newcon);
 
3983
        }
 
3984
 
 
3985
        /*
 
3986
         * Record the FK constraint in pg_constraint.
 
3987
         */
 
3988
        constrOid = CreateConstraintEntry(fkconstraint->constr_name,
 
3989
                                                                          RelationGetNamespace(rel),
 
3990
                                                                          CONSTRAINT_FOREIGN,
 
3991
                                                                          fkconstraint->deferrable,
 
3992
                                                                          fkconstraint->initdeferred,
 
3993
                                                                          RelationGetRelid(rel),
 
3994
                                                                          fkattnum,
 
3995
                                                                          numfks,
 
3996
                                                                          InvalidOid,           /* not a domain
 
3997
                                                                                                                 * constraint */
 
3998
                                                                          RelationGetRelid(pkrel),
 
3999
                                                                          pkattnum,
 
4000
                                                                          numpks,
 
4001
                                                                          fkconstraint->fk_upd_action,
 
4002
                                                                          fkconstraint->fk_del_action,
 
4003
                                                                          fkconstraint->fk_matchtype,
 
4004
                                                                          indexOid,
 
4005
                                                                          NULL,         /* no check constraint */
 
4006
                                                                          NULL,
 
4007
                                                                          NULL);
 
4008
 
 
4009
        /*
 
4010
         * Create the triggers that will enforce the constraint.
 
4011
         */
 
4012
        createForeignKeyTriggers(rel, fkconstraint, constrOid);
 
4013
 
 
4014
        /*
 
4015
         * Close pk table, but keep lock until we've committed.
 
4016
         */
 
4017
        heap_close(pkrel, NoLock);
 
4018
}
 
4019
 
 
4020
 
 
4021
/*
 
4022
 * transformColumnNameList - transform list of column names
 
4023
 *
 
4024
 * Lookup each name and return its attnum and type OID
 
4025
 */
 
4026
static int
 
4027
transformColumnNameList(Oid relId, List *colList,
 
4028
                                                int16 *attnums, Oid *atttypids)
 
4029
{
 
4030
        ListCell   *l;
 
4031
        int                     attnum;
 
4032
 
 
4033
        attnum = 0;
 
4034
        foreach(l, colList)
 
4035
        {
 
4036
                char       *attname = strVal(lfirst(l));
 
4037
                HeapTuple       atttuple;
 
4038
 
 
4039
                atttuple = SearchSysCacheAttName(relId, attname);
 
4040
                if (!HeapTupleIsValid(atttuple))
 
4041
                        ereport(ERROR,
 
4042
                                        (errcode(ERRCODE_UNDEFINED_COLUMN),
 
4043
                                         errmsg("column \"%s\" referenced in foreign key constraint does not exist",
 
4044
                                                        attname)));
 
4045
                if (attnum >= INDEX_MAX_KEYS)
 
4046
                        ereport(ERROR,
 
4047
                                        (errcode(ERRCODE_TOO_MANY_COLUMNS),
 
4048
                                 errmsg("cannot have more than %d keys in a foreign key",
 
4049
                                                INDEX_MAX_KEYS)));
 
4050
                attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
 
4051
                atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
 
4052
                ReleaseSysCache(atttuple);
 
4053
                attnum++;
 
4054
        }
 
4055
 
 
4056
        return attnum;
 
4057
}
 
4058
 
 
4059
/*
 
4060
 * transformFkeyGetPrimaryKey -
 
4061
 *
 
4062
 *      Look up the names, attnums, and types of the primary key attributes
 
4063
 *      for the pkrel.  Also return the index OID and index opclasses of the
 
4064
 *      index supporting the primary key.
 
4065
 *
 
4066
 *      All parameters except pkrel are output parameters.      Also, the function
 
4067
 *      return value is the number of attributes in the primary key.
 
4068
 *
 
4069
 *      Used when the column list in the REFERENCES specification is omitted.
 
4070
 */
 
4071
static int
 
4072
transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
 
4073
                                                   List **attnamelist,
 
4074
                                                   int16 *attnums, Oid *atttypids,
 
4075
                                                   Oid *opclasses)
 
4076
{
 
4077
        List       *indexoidlist;
 
4078
        ListCell   *indexoidscan;
 
4079
        HeapTuple       indexTuple = NULL;
 
4080
        Form_pg_index indexStruct = NULL;
 
4081
        int                     i;
 
4082
 
 
4083
        /*
 
4084
         * Get the list of index OIDs for the table from the relcache, and
 
4085
         * look up each one in the pg_index syscache until we find one marked
 
4086
         * primary key (hopefully there isn't more than one such).
 
4087
         */
 
4088
        indexoidlist = RelationGetIndexList(pkrel);
 
4089
 
 
4090
        foreach(indexoidscan, indexoidlist)
 
4091
        {
 
4092
                Oid                     indexoid = lfirst_oid(indexoidscan);
 
4093
 
 
4094
                indexTuple = SearchSysCache(INDEXRELID,
 
4095
                                                                        ObjectIdGetDatum(indexoid),
 
4096
                                                                        0, 0, 0);
 
4097
                if (!HeapTupleIsValid(indexTuple))
 
4098
                        elog(ERROR, "cache lookup failed for index %u", indexoid);
 
4099
                indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
 
4100
                if (indexStruct->indisprimary)
 
4101
                {
 
4102
                        *indexOid = indexoid;
 
4103
                        break;
 
4104
                }
 
4105
                ReleaseSysCache(indexTuple);
 
4106
                indexStruct = NULL;
 
4107
        }
 
4108
 
 
4109
        list_free(indexoidlist);
 
4110
 
 
4111
        /*
 
4112
         * Check that we found it
 
4113
         */
 
4114
        if (indexStruct == NULL)
 
4115
                ereport(ERROR,
 
4116
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
4117
                        errmsg("there is no primary key for referenced table \"%s\"",
 
4118
                                   RelationGetRelationName(pkrel))));
 
4119
 
 
4120
        /*
 
4121
         * Now build the list of PK attributes from the indkey definition (we
 
4122
         * assume a primary key cannot have expressional elements)
 
4123
         */
 
4124
        *attnamelist = NIL;
 
4125
        for (i = 0; i < indexStruct->indnatts; i++)
 
4126
        {
 
4127
                int                     pkattno = indexStruct->indkey[i];
 
4128
 
 
4129
                attnums[i] = pkattno;
 
4130
                atttypids[i] = attnumTypeId(pkrel, pkattno);
 
4131
                opclasses[i] = indexStruct->indclass[i];
 
4132
                *attnamelist = lappend(*attnamelist,
 
4133
                   makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
 
4134
        }
 
4135
 
 
4136
        ReleaseSysCache(indexTuple);
 
4137
 
 
4138
        return i;
 
4139
}
 
4140
 
 
4141
/*
 
4142
 * transformFkeyCheckAttrs -
 
4143
 *
 
4144
 *      Make sure that the attributes of a referenced table belong to a unique
 
4145
 *      (or primary key) constraint.  Return the OID of the index supporting
 
4146
 *      the constraint, as well as the opclasses associated with the index
 
4147
 *      columns.
 
4148
 */
 
4149
static Oid
 
4150
transformFkeyCheckAttrs(Relation pkrel,
 
4151
                                                int numattrs, int16 *attnums,
 
4152
                                                Oid *opclasses) /* output parameter */
 
4153
{
 
4154
        Oid                     indexoid = InvalidOid;
 
4155
        bool            found = false;
 
4156
        List       *indexoidlist;
 
4157
        ListCell   *indexoidscan;
 
4158
 
 
4159
        /*
 
4160
         * Get the list of index OIDs for the table from the relcache, and
 
4161
         * look up each one in the pg_index syscache, and match unique indexes
 
4162
         * to the list of attnums we are given.
 
4163
         */
 
4164
        indexoidlist = RelationGetIndexList(pkrel);
 
4165
 
 
4166
        foreach(indexoidscan, indexoidlist)
 
4167
        {
 
4168
                HeapTuple       indexTuple;
 
4169
                Form_pg_index indexStruct;
 
4170
                int                     i,
 
4171
                                        j;
 
4172
 
 
4173
                indexoid = lfirst_oid(indexoidscan);
 
4174
                indexTuple = SearchSysCache(INDEXRELID,
 
4175
                                                                        ObjectIdGetDatum(indexoid),
 
4176
                                                                        0, 0, 0);
 
4177
                if (!HeapTupleIsValid(indexTuple))
 
4178
                        elog(ERROR, "cache lookup failed for index %u", indexoid);
 
4179
                indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
 
4180
 
 
4181
                /*
 
4182
                 * Must have the right number of columns; must be unique and not a
 
4183
                 * partial index; forget it if there are any expressions, too
 
4184
                 */
 
4185
                if (indexStruct->indnatts == numattrs &&
 
4186
                        indexStruct->indisunique &&
 
4187
                        heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
 
4188
                        heap_attisnull(indexTuple, Anum_pg_index_indexprs))
 
4189
                {
 
4190
                        /*
 
4191
                         * The given attnum list may match the index columns in any
 
4192
                         * order.  Check that each list is a subset of the other.
 
4193
                         */
 
4194
                        for (i = 0; i < numattrs; i++)
 
4195
                        {
 
4196
                                found = false;
 
4197
                                for (j = 0; j < numattrs; j++)
 
4198
                                {
 
4199
                                        if (attnums[i] == indexStruct->indkey[j])
 
4200
                                        {
 
4201
                                                found = true;
 
4202
                                                break;
 
4203
                                        }
 
4204
                                }
 
4205
                                if (!found)
 
4206
                                        break;
 
4207
                        }
 
4208
                        if (found)
 
4209
                        {
 
4210
                                for (i = 0; i < numattrs; i++)
 
4211
                                {
 
4212
                                        found = false;
 
4213
                                        for (j = 0; j < numattrs; j++)
 
4214
                                        {
 
4215
                                                if (attnums[j] == indexStruct->indkey[i])
 
4216
                                                {
 
4217
                                                        opclasses[j] = indexStruct->indclass[i];
 
4218
                                                        found = true;
 
4219
                                                        break;
 
4220
                                                }
 
4221
                                        }
 
4222
                                        if (!found)
 
4223
                                                break;
 
4224
                                }
 
4225
                        }
 
4226
                }
 
4227
                ReleaseSysCache(indexTuple);
 
4228
                if (found)
 
4229
                        break;
 
4230
        }
 
4231
 
 
4232
        if (!found)
 
4233
                ereport(ERROR,
 
4234
                                (errcode(ERRCODE_INVALID_FOREIGN_KEY),
 
4235
                                 errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
 
4236
                                                RelationGetRelationName(pkrel))));
 
4237
 
 
4238
        list_free(indexoidlist);
 
4239
 
 
4240
        return indexoid;
 
4241
}
 
4242
 
 
4243
/*
 
4244
 * Scan the existing rows in a table to verify they meet a proposed FK
 
4245
 * constraint.
 
4246
 *
 
4247
 * Caller must have opened and locked both relations.
 
4248
 */
 
4249
static void
 
4250
validateForeignKeyConstraint(FkConstraint *fkconstraint,
 
4251
                                                         Relation rel,
 
4252
                                                         Relation pkrel)
 
4253
{
 
4254
        HeapScanDesc scan;
 
4255
        HeapTuple       tuple;
 
4256
        Trigger         trig;
 
4257
        ListCell   *list;
 
4258
        int                     count;
 
4259
 
 
4260
        /*
 
4261
         * See if we can do it with a single LEFT JOIN query.  A FALSE result
 
4262
         * indicates we must proceed with the fire-the-trigger method.
 
4263
         */
 
4264
        if (RI_Initial_Check(fkconstraint, rel, pkrel))
 
4265
                return;
 
4266
 
 
4267
        /*
 
4268
         * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
 
4269
         * as if that tuple had just been inserted.  If any of those fail, it
 
4270
         * should ereport(ERROR) and that's that.
 
4271
         */
 
4272
        MemSet(&trig, 0, sizeof(trig));
 
4273
        trig.tgoid = InvalidOid;
 
4274
        trig.tgname = fkconstraint->constr_name;
 
4275
        trig.tgenabled = TRUE;
 
4276
        trig.tgisconstraint = TRUE;
 
4277
        trig.tgconstrrelid = RelationGetRelid(pkrel);
 
4278
        trig.tgdeferrable = FALSE;
 
4279
        trig.tginitdeferred = FALSE;
 
4280
 
 
4281
        trig.tgargs = (char **) palloc(sizeof(char *) *
 
4282
                                                                 (4 + list_length(fkconstraint->fk_attrs)
 
4283
                                                                  + list_length(fkconstraint->pk_attrs)));
 
4284
 
 
4285
        trig.tgargs[0] = trig.tgname;
 
4286
        trig.tgargs[1] = RelationGetRelationName(rel);
 
4287
        trig.tgargs[2] = RelationGetRelationName(pkrel);
 
4288
        trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
 
4289
        count = 4;
 
4290
        foreach(list, fkconstraint->fk_attrs)
 
4291
        {
 
4292
                char       *fk_at = strVal(lfirst(list));
 
4293
 
 
4294
                trig.tgargs[count] = fk_at;
 
4295
                count += 2;
 
4296
        }
 
4297
        count = 5;
 
4298
        foreach(list, fkconstraint->pk_attrs)
 
4299
        {
 
4300
                char       *pk_at = strVal(lfirst(list));
 
4301
 
 
4302
                trig.tgargs[count] = pk_at;
 
4303
                count += 2;
 
4304
        }
 
4305
        trig.tgnargs = count - 1;
 
4306
 
 
4307
        scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
 
4308
 
 
4309
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 
4310
        {
 
4311
                FunctionCallInfoData fcinfo;
 
4312
                TriggerData trigdata;
 
4313
 
 
4314
                /*
 
4315
                 * Make a call to the trigger function
 
4316
                 *
 
4317
                 * No parameters are passed, but we do set a context
 
4318
                 */
 
4319
                MemSet(&fcinfo, 0, sizeof(fcinfo));
 
4320
 
 
4321
                /*
 
4322
                 * We assume RI_FKey_check_ins won't look at flinfo...
 
4323
                 */
 
4324
                trigdata.type = T_TriggerData;
 
4325
                trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
 
4326
                trigdata.tg_relation = rel;
 
4327
                trigdata.tg_trigtuple = tuple;
 
4328
                trigdata.tg_newtuple = NULL;
 
4329
                trigdata.tg_trigger = &trig;
 
4330
                trigdata.tg_trigtuplebuf = scan->rs_cbuf;
 
4331
                trigdata.tg_newtuplebuf = InvalidBuffer;
 
4332
 
 
4333
                fcinfo.context = (Node *) &trigdata;
 
4334
 
 
4335
                RI_FKey_check_ins(&fcinfo);
 
4336
        }
 
4337
 
 
4338
        heap_endscan(scan);
 
4339
 
 
4340
        pfree(trig.tgargs);
 
4341
}
 
4342
 
 
4343
/*
 
4344
 * Create the triggers that implement an FK constraint.
 
4345
 */
 
4346
static void
 
4347
createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
 
4348
                                                 Oid constrOid)
 
4349
{
 
4350
        RangeVar   *myRel;
 
4351
        CreateTrigStmt *fk_trigger;
 
4352
        ListCell   *fk_attr;
 
4353
        ListCell   *pk_attr;
 
4354
        ObjectAddress trigobj,
 
4355
                                constrobj;
 
4356
 
 
4357
        /*
 
4358
         * Reconstruct a RangeVar for my relation (not passed in,
 
4359
         * unfortunately).
 
4360
         */
 
4361
        myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
 
4362
                                                 pstrdup(RelationGetRelationName(rel)));
 
4363
 
 
4364
        /*
 
4365
         * Preset objectAddress fields
 
4366
         */
 
4367
        constrobj.classId = get_system_catalog_relid(ConstraintRelationName);
 
4368
        constrobj.objectId = constrOid;
 
4369
        constrobj.objectSubId = 0;
 
4370
        trigobj.classId = get_system_catalog_relid(TriggerRelationName);
 
4371
        trigobj.objectSubId = 0;
 
4372
 
 
4373
        /* Make changes-so-far visible */
 
4374
        CommandCounterIncrement();
 
4375
 
 
4376
        /*
 
4377
         * Build and execute a CREATE CONSTRAINT TRIGGER statement for the
 
4378
         * CHECK action.
 
4379
         */
 
4380
        fk_trigger = makeNode(CreateTrigStmt);
 
4381
        fk_trigger->trigname = fkconstraint->constr_name;
 
4382
        fk_trigger->relation = myRel;
 
4383
        fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
 
4384
        fk_trigger->before = false;
 
4385
        fk_trigger->row = true;
 
4386
        fk_trigger->actions[0] = 'i';
 
4387
        fk_trigger->actions[1] = 'u';
 
4388
        fk_trigger->actions[2] = '\0';
 
4389
 
 
4390
        fk_trigger->isconstraint = true;
 
4391
        fk_trigger->deferrable = fkconstraint->deferrable;
 
4392
        fk_trigger->initdeferred = fkconstraint->initdeferred;
 
4393
        fk_trigger->constrrel = fkconstraint->pktable;
 
4394
 
 
4395
        fk_trigger->args = NIL;
 
4396
        fk_trigger->args = lappend(fk_trigger->args,
 
4397
                                                           makeString(fkconstraint->constr_name));
 
4398
        fk_trigger->args = lappend(fk_trigger->args,
 
4399
                                                           makeString(myRel->relname));
 
4400
        fk_trigger->args = lappend(fk_trigger->args,
 
4401
                                                         makeString(fkconstraint->pktable->relname));
 
4402
        fk_trigger->args = lappend(fk_trigger->args,
 
4403
                        makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
 
4404
        if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
 
4405
                ereport(ERROR,
 
4406
                                (errcode(ERRCODE_INVALID_FOREIGN_KEY),
 
4407
                                 errmsg("number of referencing and referenced columns for foreign key disagree")));
 
4408
 
 
4409
        forboth(fk_attr, fkconstraint->fk_attrs,
 
4410
                        pk_attr, fkconstraint->pk_attrs)
 
4411
        {
 
4412
                fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
 
4413
                fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
 
4414
        }
 
4415
 
 
4416
        trigobj.objectId = CreateTrigger(fk_trigger, true);
 
4417
 
 
4418
        /* Register dependency from trigger to constraint */
 
4419
        recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
 
4420
 
 
4421
        /* Make changes-so-far visible */
 
4422
        CommandCounterIncrement();
 
4423
 
 
4424
        /*
 
4425
         * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
 
4426
         * DELETE action on the referenced table.
 
4427
         */
 
4428
        fk_trigger = makeNode(CreateTrigStmt);
 
4429
        fk_trigger->trigname = fkconstraint->constr_name;
 
4430
        fk_trigger->relation = fkconstraint->pktable;
 
4431
        fk_trigger->before = false;
 
4432
        fk_trigger->row = true;
 
4433
        fk_trigger->actions[0] = 'd';
 
4434
        fk_trigger->actions[1] = '\0';
 
4435
 
 
4436
        fk_trigger->isconstraint = true;
 
4437
        fk_trigger->constrrel = myRel;
 
4438
        switch (fkconstraint->fk_del_action)
 
4439
        {
 
4440
                case FKCONSTR_ACTION_NOACTION:
 
4441
                        fk_trigger->deferrable = fkconstraint->deferrable;
 
4442
                        fk_trigger->initdeferred = fkconstraint->initdeferred;
 
4443
                        fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
 
4444
                        break;
 
4445
                case FKCONSTR_ACTION_RESTRICT:
 
4446
                        fk_trigger->deferrable = false;
 
4447
                        fk_trigger->initdeferred = false;
 
4448
                        fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
 
4449
                        break;
 
4450
                case FKCONSTR_ACTION_CASCADE:
 
4451
                        fk_trigger->deferrable = false;
 
4452
                        fk_trigger->initdeferred = false;
 
4453
                        fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
 
4454
                        break;
 
4455
                case FKCONSTR_ACTION_SETNULL:
 
4456
                        fk_trigger->deferrable = false;
 
4457
                        fk_trigger->initdeferred = false;
 
4458
                        fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
 
4459
                        break;
 
4460
                case FKCONSTR_ACTION_SETDEFAULT:
 
4461
                        fk_trigger->deferrable = false;
 
4462
                        fk_trigger->initdeferred = false;
 
4463
                        fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
 
4464
                        break;
 
4465
                default:
 
4466
                        elog(ERROR, "unrecognized FK action type: %d",
 
4467
                                 (int) fkconstraint->fk_del_action);
 
4468
                        break;
 
4469
        }
 
4470
 
 
4471
        fk_trigger->args = NIL;
 
4472
        fk_trigger->args = lappend(fk_trigger->args,
 
4473
                                                           makeString(fkconstraint->constr_name));
 
4474
        fk_trigger->args = lappend(fk_trigger->args,
 
4475
                                                           makeString(myRel->relname));
 
4476
        fk_trigger->args = lappend(fk_trigger->args,
 
4477
                                                         makeString(fkconstraint->pktable->relname));
 
4478
        fk_trigger->args = lappend(fk_trigger->args,
 
4479
                        makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
 
4480
        forboth(fk_attr, fkconstraint->fk_attrs,
 
4481
                        pk_attr, fkconstraint->pk_attrs)
 
4482
        {
 
4483
                fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
 
4484
                fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
 
4485
        }
 
4486
 
 
4487
        trigobj.objectId = CreateTrigger(fk_trigger, true);
 
4488
 
 
4489
        /* Register dependency from trigger to constraint */
 
4490
        recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
 
4491
 
 
4492
        /* Make changes-so-far visible */
 
4493
        CommandCounterIncrement();
 
4494
 
 
4495
        /*
 
4496
         * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
 
4497
         * UPDATE action on the referenced table.
 
4498
         */
 
4499
        fk_trigger = makeNode(CreateTrigStmt);
 
4500
        fk_trigger->trigname = fkconstraint->constr_name;
 
4501
        fk_trigger->relation = fkconstraint->pktable;
 
4502
        fk_trigger->before = false;
 
4503
        fk_trigger->row = true;
 
4504
        fk_trigger->actions[0] = 'u';
 
4505
        fk_trigger->actions[1] = '\0';
 
4506
        fk_trigger->isconstraint = true;
 
4507
        fk_trigger->constrrel = myRel;
 
4508
        switch (fkconstraint->fk_upd_action)
 
4509
        {
 
4510
                case FKCONSTR_ACTION_NOACTION:
 
4511
                        fk_trigger->deferrable = fkconstraint->deferrable;
 
4512
                        fk_trigger->initdeferred = fkconstraint->initdeferred;
 
4513
                        fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
 
4514
                        break;
 
4515
                case FKCONSTR_ACTION_RESTRICT:
 
4516
                        fk_trigger->deferrable = false;
 
4517
                        fk_trigger->initdeferred = false;
 
4518
                        fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
 
4519
                        break;
 
4520
                case FKCONSTR_ACTION_CASCADE:
 
4521
                        fk_trigger->deferrable = false;
 
4522
                        fk_trigger->initdeferred = false;
 
4523
                        fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
 
4524
                        break;
 
4525
                case FKCONSTR_ACTION_SETNULL:
 
4526
                        fk_trigger->deferrable = false;
 
4527
                        fk_trigger->initdeferred = false;
 
4528
                        fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
 
4529
                        break;
 
4530
                case FKCONSTR_ACTION_SETDEFAULT:
 
4531
                        fk_trigger->deferrable = false;
 
4532
                        fk_trigger->initdeferred = false;
 
4533
                        fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
 
4534
                        break;
 
4535
                default:
 
4536
                        elog(ERROR, "unrecognized FK action type: %d",
 
4537
                                 (int) fkconstraint->fk_upd_action);
 
4538
                        break;
 
4539
        }
 
4540
 
 
4541
        fk_trigger->args = NIL;
 
4542
        fk_trigger->args = lappend(fk_trigger->args,
 
4543
                                                           makeString(fkconstraint->constr_name));
 
4544
        fk_trigger->args = lappend(fk_trigger->args,
 
4545
                                                           makeString(myRel->relname));
 
4546
        fk_trigger->args = lappend(fk_trigger->args,
 
4547
                                                         makeString(fkconstraint->pktable->relname));
 
4548
        fk_trigger->args = lappend(fk_trigger->args,
 
4549
                        makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
 
4550
        forboth(fk_attr, fkconstraint->fk_attrs,
 
4551
                        pk_attr, fkconstraint->pk_attrs)
 
4552
        {
 
4553
                fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
 
4554
                fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
 
4555
        }
 
4556
 
 
4557
        trigobj.objectId = CreateTrigger(fk_trigger, true);
 
4558
 
 
4559
        /* Register dependency from trigger to constraint */
 
4560
        recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
 
4561
}
 
4562
 
 
4563
/*
 
4564
 * fkMatchTypeToString -
 
4565
 *        convert FKCONSTR_MATCH_xxx code to string to use in trigger args
 
4566
 */
 
4567
static char *
 
4568
fkMatchTypeToString(char match_type)
 
4569
{
 
4570
        switch (match_type)
 
4571
        {
 
4572
                case FKCONSTR_MATCH_FULL:
 
4573
                        return pstrdup("FULL");
 
4574
                case FKCONSTR_MATCH_PARTIAL:
 
4575
                        return pstrdup("PARTIAL");
 
4576
                case FKCONSTR_MATCH_UNSPECIFIED:
 
4577
                        return pstrdup("UNSPECIFIED");
 
4578
                default:
 
4579
                        elog(ERROR, "unrecognized match type: %d",
 
4580
                                 (int) match_type);
 
4581
        }
 
4582
        return NULL;                            /* can't get here */
 
4583
}
 
4584
 
 
4585
/*
 
4586
 * ALTER TABLE DROP CONSTRAINT
 
4587
 */
 
4588
static void
 
4589
ATPrepDropConstraint(List **wqueue, Relation rel,
 
4590
                                         bool recurse, AlterTableCmd *cmd)
 
4591
{
 
4592
        /*
 
4593
         * We don't want errors or noise from child tables, so we have to pass
 
4594
         * down a modified command.
 
4595
         */
 
4596
        if (recurse)
 
4597
        {
 
4598
                AlterTableCmd *childCmd = copyObject(cmd);
 
4599
 
 
4600
                childCmd->subtype = AT_DropConstraintQuietly;
 
4601
                ATSimpleRecursion(wqueue, rel, childCmd, recurse);
 
4602
        }
 
4603
}
 
4604
 
 
4605
static void
 
4606
ATExecDropConstraint(Relation rel, const char *constrName,
 
4607
                                         DropBehavior behavior, bool quiet)
 
4608
{
 
4609
        int                     deleted;
 
4610
 
 
4611
        deleted = RemoveRelConstraints(rel, constrName, behavior);
 
4612
 
 
4613
        if (!quiet)
 
4614
        {
 
4615
                /* If zero constraints deleted, complain */
 
4616
                if (deleted == 0)
 
4617
                        ereport(ERROR,
 
4618
                                        (errcode(ERRCODE_UNDEFINED_OBJECT),
 
4619
                                         errmsg("constraint \"%s\" does not exist",
 
4620
                                                        constrName)));
 
4621
                /* Otherwise if more than one constraint deleted, notify */
 
4622
                else if (deleted > 1)
 
4623
                        ereport(NOTICE,
 
4624
                                (errmsg("multiple constraints named \"%s\" were dropped",
 
4625
                                                constrName)));
 
4626
        }
 
4627
}
 
4628
 
 
4629
/*
 
4630
 * ALTER COLUMN TYPE
 
4631
 */
 
4632
static void
 
4633
ATPrepAlterColumnType(List **wqueue,
 
4634
                                          AlteredTableInfo *tab, Relation rel,
 
4635
                                          bool recurse, bool recursing,
 
4636
                                          AlterTableCmd *cmd)
 
4637
{
 
4638
        char       *colName = cmd->name;
 
4639
        TypeName   *typename = (TypeName *) cmd->def;
 
4640
        HeapTuple       tuple;
 
4641
        Form_pg_attribute attTup;
 
4642
        AttrNumber      attnum;
 
4643
        Oid                     targettype;
 
4644
        Node       *transform;
 
4645
        NewColumnValue *newval;
 
4646
        ParseState *pstate = make_parsestate(NULL);
 
4647
 
 
4648
        /* lookup the attribute so we can check inheritance status */
 
4649
        tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
 
4650
        if (!HeapTupleIsValid(tuple))
 
4651
                ereport(ERROR,
 
4652
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
4653
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
4654
                                                colName, RelationGetRelationName(rel))));
 
4655
        attTup = (Form_pg_attribute) GETSTRUCT(tuple);
 
4656
        attnum = attTup->attnum;
 
4657
 
 
4658
        /* Can't alter a system attribute */
 
4659
        if (attnum <= 0)
 
4660
                ereport(ERROR,
 
4661
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
4662
                                 errmsg("cannot alter system column \"%s\"",
 
4663
                                                colName)));
 
4664
 
 
4665
        /* Don't alter inherited columns */
 
4666
        if (attTup->attinhcount > 0 && !recursing)
 
4667
                ereport(ERROR,
 
4668
                                (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
4669
                                 errmsg("cannot alter inherited column \"%s\"",
 
4670
                                                colName)));
 
4671
 
 
4672
        /* Look up the target type */
 
4673
        targettype = LookupTypeName(typename);
 
4674
        if (!OidIsValid(targettype))
 
4675
                ereport(ERROR,
 
4676
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
4677
                                 errmsg("type \"%s\" does not exist",
 
4678
                                                TypeNameToString(typename))));
 
4679
 
 
4680
        /* make sure datatype is legal for a column */
 
4681
        CheckAttributeType(colName, targettype);
 
4682
 
 
4683
        /*
 
4684
         * Set up an expression to transform the old data value to the new
 
4685
         * type. If a USING option was given, transform and use that
 
4686
         * expression, else just take the old value and try to coerce it.  We
 
4687
         * do this first so that type incompatibility can be detected before
 
4688
         * we waste effort, and because we need the expression to be parsed
 
4689
         * against the original table rowtype.
 
4690
         */
 
4691
        if (cmd->transform)
 
4692
        {
 
4693
                RangeTblEntry *rte;
 
4694
 
 
4695
                /* Expression must be able to access vars of old table */
 
4696
                rte = addRangeTableEntryForRelation(pstate,
 
4697
                                                                                        RelationGetRelid(rel),
 
4698
                                                        makeAlias(RelationGetRelationName(rel), NIL),
 
4699
                                                                                        false,
 
4700
                                                                                        true);
 
4701
                addRTEtoQuery(pstate, rte, false, true);
 
4702
 
 
4703
                transform = transformExpr(pstate, cmd->transform);
 
4704
 
 
4705
                /* It can't return a set */
 
4706
                if (expression_returns_set(transform))
 
4707
                        ereport(ERROR,
 
4708
                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
 
4709
                                  errmsg("transform expression must not return a set")));
 
4710
 
 
4711
                /* No subplans or aggregates, either... */
 
4712
                if (pstate->p_hasSubLinks)
 
4713
                        ereport(ERROR,
 
4714
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
4715
                                 errmsg("cannot use subquery in transform expression")));
 
4716
                if (pstate->p_hasAggs)
 
4717
                        ereport(ERROR,
 
4718
                                        (errcode(ERRCODE_GROUPING_ERROR),
 
4719
                                         errmsg("cannot use aggregate function in transform expression")));
 
4720
        }
 
4721
        else
 
4722
        {
 
4723
                transform = (Node *) makeVar(1, attnum,
 
4724
                                                                         attTup->atttypid, attTup->atttypmod,
 
4725
                                                                         0);
 
4726
        }
 
4727
 
 
4728
        transform = coerce_to_target_type(pstate,
 
4729
                                                                          transform, exprType(transform),
 
4730
                                                                          targettype, typename->typmod,
 
4731
                                                                          COERCION_ASSIGNMENT,
 
4732
                                                                          COERCE_IMPLICIT_CAST);
 
4733
        if (transform == NULL)
 
4734
                ereport(ERROR,
 
4735
                                (errcode(ERRCODE_DATATYPE_MISMATCH),
 
4736
                                 errmsg("column \"%s\" cannot be cast to type \"%s\"",
 
4737
                                                colName, TypeNameToString(typename))));
 
4738
 
 
4739
        /*
 
4740
         * Add a work queue item to make ATRewriteTable update the column
 
4741
         * contents.
 
4742
         */
 
4743
        newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
 
4744
        newval->attnum = attnum;
 
4745
        newval->expr = (Expr *) transform;
 
4746
 
 
4747
        tab->newvals = lappend(tab->newvals, newval);
 
4748
 
 
4749
        ReleaseSysCache(tuple);
 
4750
 
 
4751
        /*
 
4752
         * The recursion case is handled by ATSimpleRecursion.  However, if we
 
4753
         * are told not to recurse, there had better not be any child tables;
 
4754
         * else the alter would put them out of step.
 
4755
         */
 
4756
        if (recurse)
 
4757
                ATSimpleRecursion(wqueue, rel, cmd, recurse);
 
4758
        else if (!recursing &&
 
4759
                         find_inheritance_children(RelationGetRelid(rel)) != NIL)
 
4760
                ereport(ERROR,
 
4761
                                (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 
4762
                                 errmsg("type of inherited column \"%s\" must be changed in child tables too",
 
4763
                                                colName)));
 
4764
}
 
4765
 
 
4766
static void
 
4767
ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
 
4768
                                          const char *colName, TypeName *typename)
 
4769
{
 
4770
        HeapTuple       heapTup;
 
4771
        Form_pg_attribute attTup;
 
4772
        AttrNumber      attnum;
 
4773
        HeapTuple       typeTuple;
 
4774
        Form_pg_type tform;
 
4775
        Oid                     targettype;
 
4776
        Node       *defaultexpr;
 
4777
        Relation        attrelation;
 
4778
        Relation        depRel;
 
4779
        ScanKeyData key[3];
 
4780
        SysScanDesc scan;
 
4781
        HeapTuple       depTup;
 
4782
 
 
4783
        attrelation = heap_openr(AttributeRelationName, RowExclusiveLock);
 
4784
 
 
4785
        /* Look up the target column */
 
4786
        heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
 
4787
        if (!HeapTupleIsValid(heapTup))         /* shouldn't happen */
 
4788
                ereport(ERROR,
 
4789
                                (errcode(ERRCODE_UNDEFINED_COLUMN),
 
4790
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
 
4791
                                                colName, RelationGetRelationName(rel))));
 
4792
        attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
 
4793
        attnum = attTup->attnum;
 
4794
 
 
4795
        /* Check for multiple ALTER TYPE on same column --- can't cope */
 
4796
        if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
 
4797
                attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
 
4798
                ereport(ERROR,
 
4799
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
4800
                                 errmsg("cannot alter type of column \"%s\" twice",
 
4801
                                                colName)));
 
4802
 
 
4803
        /* Look up the target type (should not fail, since prep found it) */
 
4804
        typeTuple = typenameType(typename);
 
4805
        tform = (Form_pg_type) GETSTRUCT(typeTuple);
 
4806
        targettype = HeapTupleGetOid(typeTuple);
 
4807
 
 
4808
        /*
 
4809
         * If there is a default expression for the column, get it and ensure
 
4810
         * we can coerce it to the new datatype.  (We must do this before
 
4811
         * changing the column type, because build_column_default itself will
 
4812
         * try to coerce, and will not issue the error message we want if it
 
4813
         * fails.)
 
4814
         *
 
4815
         * We remove any implicit coercion steps at the top level of the old
 
4816
         * default expression; this has been agreed to satisfy the principle
 
4817
         * of least surprise.  (The conversion to the new column type should
 
4818
         * act like it started from what the user sees as the stored expression,
 
4819
         * and the implicit coercions aren't going to be shown.)
 
4820
         */
 
4821
        if (attTup->atthasdef)
 
4822
        {
 
4823
                defaultexpr = build_column_default(rel, attnum);
 
4824
                Assert(defaultexpr);
 
4825
                defaultexpr = strip_implicit_coercions(defaultexpr);
 
4826
                defaultexpr = coerce_to_target_type(NULL,               /* no UNKNOWN params */
 
4827
                                                                                        defaultexpr, exprType(defaultexpr),
 
4828
                                                                                        targettype, typename->typmod,
 
4829
                                                                                        COERCION_ASSIGNMENT,
 
4830
                                                                                        COERCE_IMPLICIT_CAST);
 
4831
                if (defaultexpr == NULL)
 
4832
                        ereport(ERROR,
 
4833
                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
 
4834
                                         errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
 
4835
                                                        colName, TypeNameToString(typename))));
 
4836
        }
 
4837
        else
 
4838
                defaultexpr = NULL;
 
4839
 
 
4840
        /*
 
4841
         * Find everything that depends on the column (constraints, indexes,
 
4842
         * etc), and record enough information to let us recreate the objects.
 
4843
         *
 
4844
         * The actual recreation does not happen here, but only after we have
 
4845
         * performed all the individual ALTER TYPE operations.  We have to
 
4846
         * save the info before executing ALTER TYPE, though, else the
 
4847
         * deparser will get confused.
 
4848
         *
 
4849
         * There could be multiple entries for the same object, so we must check
 
4850
         * to ensure we process each one only once.  Note: we assume that an
 
4851
         * index that implements a constraint will not show a direct
 
4852
         * dependency on the column.
 
4853
         */
 
4854
        depRel = heap_openr(DependRelationName, RowExclusiveLock);
 
4855
 
 
4856
        ScanKeyInit(&key[0],
 
4857
                                Anum_pg_depend_refclassid,
 
4858
                                BTEqualStrategyNumber, F_OIDEQ,
 
4859
                                ObjectIdGetDatum(RelOid_pg_class));
 
4860
        ScanKeyInit(&key[1],
 
4861
                                Anum_pg_depend_refobjid,
 
4862
                                BTEqualStrategyNumber, F_OIDEQ,
 
4863
                                ObjectIdGetDatum(RelationGetRelid(rel)));
 
4864
        ScanKeyInit(&key[2],
 
4865
                                Anum_pg_depend_refobjsubid,
 
4866
                                BTEqualStrategyNumber, F_INT4EQ,
 
4867
                                Int32GetDatum((int32) attnum));
 
4868
 
 
4869
        scan = systable_beginscan(depRel, DependReferenceIndex, true,
 
4870
                                                          SnapshotNow, 3, key);
 
4871
 
 
4872
        while (HeapTupleIsValid(depTup = systable_getnext(scan)))
 
4873
        {
 
4874
                Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
 
4875
                ObjectAddress foundObject;
 
4876
 
 
4877
                /* We don't expect any PIN dependencies on columns */
 
4878
                if (foundDep->deptype == DEPENDENCY_PIN)
 
4879
                        elog(ERROR, "cannot alter type of a pinned column");
 
4880
 
 
4881
                foundObject.classId = foundDep->classid;
 
4882
                foundObject.objectId = foundDep->objid;
 
4883
                foundObject.objectSubId = foundDep->objsubid;
 
4884
 
 
4885
                switch (getObjectClass(&foundObject))
 
4886
                {
 
4887
                        case OCLASS_CLASS:
 
4888
                                {
 
4889
                                        char            relKind = get_rel_relkind(foundObject.objectId);
 
4890
 
 
4891
                                        if (relKind == RELKIND_INDEX)
 
4892
                                        {
 
4893
                                                Assert(foundObject.objectSubId == 0);
 
4894
                                                if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
 
4895
                                                {
 
4896
                                                        tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
 
4897
                                                                                                   foundObject.objectId);
 
4898
                                                        tab->changedIndexDefs = lappend(tab->changedIndexDefs,
 
4899
                                                        pg_get_indexdef_string(foundObject.objectId));
 
4900
                                                }
 
4901
                                        }
 
4902
                                        else if (relKind == RELKIND_SEQUENCE)
 
4903
                                        {
 
4904
                                                /*
 
4905
                                                 * This must be a SERIAL column's sequence.  We
 
4906
                                                 * need not do anything to it.
 
4907
                                                 */
 
4908
                                                Assert(foundObject.objectSubId == 0);
 
4909
                                        }
 
4910
                                        else
 
4911
                                        {
 
4912
                                                /* Not expecting any other direct dependencies... */
 
4913
                                                elog(ERROR, "unexpected object depending on column: %s",
 
4914
                                                         getObjectDescription(&foundObject));
 
4915
                                        }
 
4916
                                        break;
 
4917
                                }
 
4918
 
 
4919
                        case OCLASS_CONSTRAINT:
 
4920
                                Assert(foundObject.objectSubId == 0);
 
4921
                                if (!list_member_oid(tab->changedConstraintOids, foundObject.objectId))
 
4922
                                {
 
4923
                                        tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids,
 
4924
                                                                                                   foundObject.objectId);
 
4925
                                        tab->changedConstraintDefs = lappend(tab->changedConstraintDefs,
 
4926
                                          pg_get_constraintdef_string(foundObject.objectId));
 
4927
                                }
 
4928
                                break;
 
4929
 
 
4930
                        case OCLASS_REWRITE:
 
4931
                                /* XXX someday see if we can cope with revising views */
 
4932
                                ereport(ERROR,
 
4933
                                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
4934
                                                 errmsg("cannot alter type of a column used by a view or rule"),
 
4935
                                                 errdetail("%s depends on column \"%s\"",
 
4936
                                                                   getObjectDescription(&foundObject),
 
4937
                                                                   colName)));
 
4938
                                break;
 
4939
 
 
4940
                        case OCLASS_DEFAULT:
 
4941
 
 
4942
                                /*
 
4943
                                 * Ignore the column's default expression, since we will
 
4944
                                 * fix it below.
 
4945
                                 */
 
4946
                                Assert(defaultexpr);
 
4947
                                break;
 
4948
 
 
4949
                        case OCLASS_PROC:
 
4950
                        case OCLASS_TYPE:
 
4951
                        case OCLASS_CAST:
 
4952
                        case OCLASS_CONVERSION:
 
4953
                        case OCLASS_LANGUAGE:
 
4954
                        case OCLASS_OPERATOR:
 
4955
                        case OCLASS_OPCLASS:
 
4956
                        case OCLASS_TRIGGER:
 
4957
                        case OCLASS_SCHEMA:
 
4958
 
 
4959
                                /*
 
4960
                                 * We don't expect any of these sorts of objects to depend
 
4961
                                 * on a column.
 
4962
                                 */
 
4963
                                elog(ERROR, "unexpected object depending on column: %s",
 
4964
                                         getObjectDescription(&foundObject));
 
4965
                                break;
 
4966
 
 
4967
                        default:
 
4968
                                elog(ERROR, "unrecognized object class: %u",
 
4969
                                         foundObject.classId);
 
4970
                }
 
4971
        }
 
4972
 
 
4973
        systable_endscan(scan);
 
4974
 
 
4975
        /*
 
4976
         * Now scan for dependencies of this column on other things.  The only
 
4977
         * thing we should find is the dependency on the column datatype,
 
4978
         * which we want to remove.
 
4979
         */
 
4980
        ScanKeyInit(&key[0],
 
4981
                                Anum_pg_depend_classid,
 
4982
                                BTEqualStrategyNumber, F_OIDEQ,
 
4983
                                ObjectIdGetDatum(RelOid_pg_class));
 
4984
        ScanKeyInit(&key[1],
 
4985
                                Anum_pg_depend_objid,
 
4986
                                BTEqualStrategyNumber, F_OIDEQ,
 
4987
                                ObjectIdGetDatum(RelationGetRelid(rel)));
 
4988
        ScanKeyInit(&key[2],
 
4989
                                Anum_pg_depend_objsubid,
 
4990
                                BTEqualStrategyNumber, F_INT4EQ,
 
4991
                                Int32GetDatum((int32) attnum));
 
4992
 
 
4993
        scan = systable_beginscan(depRel, DependDependerIndex, true,
 
4994
                                                          SnapshotNow, 3, key);
 
4995
 
 
4996
        while (HeapTupleIsValid(depTup = systable_getnext(scan)))
 
4997
        {
 
4998
                Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
 
4999
 
 
5000
                if (foundDep->deptype != DEPENDENCY_NORMAL)
 
5001
                        elog(ERROR, "found unexpected dependency type '%c'",
 
5002
                                 foundDep->deptype);
 
5003
                if (foundDep->refclassid != RelOid_pg_type ||
 
5004
                        foundDep->refobjid != attTup->atttypid)
 
5005
                        elog(ERROR, "found unexpected dependency for column");
 
5006
 
 
5007
                simple_heap_delete(depRel, &depTup->t_self);
 
5008
        }
 
5009
 
 
5010
        systable_endscan(scan);
 
5011
 
 
5012
        heap_close(depRel, RowExclusiveLock);
 
5013
 
 
5014
        /*
 
5015
         * Here we go --- change the recorded column type.      (Note heapTup is a
 
5016
         * copy of the syscache entry, so okay to scribble on.)
 
5017
         */
 
5018
        attTup->atttypid = targettype;
 
5019
        attTup->atttypmod = typename->typmod;
 
5020
        attTup->attndims = list_length(typename->arrayBounds);
 
5021
        attTup->attlen = tform->typlen;
 
5022
        attTup->attbyval = tform->typbyval;
 
5023
        attTup->attalign = tform->typalign;
 
5024
        attTup->attstorage = tform->typstorage;
 
5025
 
 
5026
        ReleaseSysCache(typeTuple);
 
5027
 
 
5028
        simple_heap_update(attrelation, &heapTup->t_self, heapTup);
 
5029
 
 
5030
        /* keep system catalog indexes current */
 
5031
        CatalogUpdateIndexes(attrelation, heapTup);
 
5032
 
 
5033
        heap_close(attrelation, RowExclusiveLock);
 
5034
 
 
5035
        /* Install dependency on new datatype */
 
5036
        add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
 
5037
 
 
5038
        /*
 
5039
         * Drop any pg_statistic entry for the column, since it's now wrong
 
5040
         * type
 
5041
         */
 
5042
        RemoveStatistics(RelationGetRelid(rel), attnum);
 
5043
 
 
5044
        /*
 
5045
         * Update the default, if present, by brute force --- remove and
 
5046
         * re-add the default.  Probably unsafe to take shortcuts, since the
 
5047
         * new version may well have additional dependencies.  (It's okay to
 
5048
         * do this now, rather than after other ALTER TYPE commands, since the
 
5049
         * default won't depend on other column types.)
 
5050
         */
 
5051
        if (defaultexpr)
 
5052
        {
 
5053
                /* Must make new row visible since it will be updated again */
 
5054
                CommandCounterIncrement();
 
5055
 
 
5056
                /*
 
5057
                 * We use RESTRICT here for safety, but at present we do not
 
5058
                 * expect anything to depend on the default.
 
5059
                 */
 
5060
                RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
 
5061
 
 
5062
                StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
 
5063
        }
 
5064
 
 
5065
        /* Cleanup */
 
5066
        heap_freetuple(heapTup);
 
5067
}
 
5068
 
 
5069
/*
 
5070
 * Cleanup after we've finished all the ALTER TYPE operations for a
 
5071
 * particular relation.  We have to drop and recreate all the indexes
 
5072
 * and constraints that depend on the altered columns.
 
5073
 */
 
5074
static void
 
5075
ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
 
5076
{
 
5077
        ObjectAddress obj;
 
5078
        ListCell   *l;
 
5079
 
 
5080
        /*
 
5081
         * Re-parse the index and constraint definitions, and attach them to
 
5082
         * the appropriate work queue entries.  We do this before dropping
 
5083
         * because in the case of a FOREIGN KEY constraint, we might not yet
 
5084
         * have exclusive lock on the table the constraint is attached to, and
 
5085
         * we need to get that before dropping.  It's safe because the parser
 
5086
         * won't actually look at the catalogs to detect the existing entry.
 
5087
         */
 
5088
        foreach(l, tab->changedIndexDefs)
 
5089
                ATPostAlterTypeParse((char *) lfirst(l), wqueue);
 
5090
        foreach(l, tab->changedConstraintDefs)
 
5091
                ATPostAlterTypeParse((char *) lfirst(l), wqueue);
 
5092
 
 
5093
        /*
 
5094
         * Now we can drop the existing constraints and indexes ---
 
5095
         * constraints first, since some of them might depend on the indexes.
 
5096
         * It should be okay to use DROP_RESTRICT here, since nothing else
 
5097
         * should be depending on these objects.
 
5098
         */
 
5099
        if (tab->changedConstraintOids)
 
5100
                obj.classId = get_system_catalog_relid(ConstraintRelationName);
 
5101
        foreach(l, tab->changedConstraintOids)
 
5102
        {
 
5103
                obj.objectId = lfirst_oid(l);
 
5104
                obj.objectSubId = 0;
 
5105
                performDeletion(&obj, DROP_RESTRICT);
 
5106
        }
 
5107
 
 
5108
        obj.classId = RelOid_pg_class;
 
5109
        foreach(l, tab->changedIndexOids)
 
5110
        {
 
5111
                obj.objectId = lfirst_oid(l);
 
5112
                obj.objectSubId = 0;
 
5113
                performDeletion(&obj, DROP_RESTRICT);
 
5114
        }
 
5115
 
 
5116
        /*
 
5117
         * The objects will get recreated during subsequent passes over the
 
5118
         * work queue.
 
5119
         */
 
5120
}
 
5121
 
 
5122
static void
 
5123
ATPostAlterTypeParse(char *cmd, List **wqueue)
 
5124
{
 
5125
        List       *raw_parsetree_list;
 
5126
        List       *querytree_list;
 
5127
        ListCell   *list_item;
 
5128
 
 
5129
        /*
 
5130
         * We expect that we only have to do raw parsing and parse analysis,
 
5131
         * not any rule rewriting, since these will all be utility statements.
 
5132
         */
 
5133
        raw_parsetree_list = raw_parser(cmd);
 
5134
        querytree_list = NIL;
 
5135
        foreach(list_item, raw_parsetree_list)
 
5136
        {
 
5137
                Node       *parsetree = (Node *) lfirst(list_item);
 
5138
 
 
5139
                querytree_list = list_concat(querytree_list,
 
5140
                                                                         parse_analyze(parsetree, NULL, 0));
 
5141
        }
 
5142
 
 
5143
        /*
 
5144
         * Attach each generated command to the proper place in the work
 
5145
         * queue. Note this could result in creation of entirely new
 
5146
         * work-queue entries.
 
5147
         */
 
5148
        foreach(list_item, querytree_list)
 
5149
        {
 
5150
                Query      *query = (Query *) lfirst(list_item);
 
5151
                Relation        rel;
 
5152
                AlteredTableInfo *tab;
 
5153
 
 
5154
                Assert(IsA(query, Query));
 
5155
                Assert(query->commandType == CMD_UTILITY);
 
5156
                switch (nodeTag(query->utilityStmt))
 
5157
                {
 
5158
                        case T_IndexStmt:
 
5159
                                {
 
5160
                                        IndexStmt  *stmt = (IndexStmt *) query->utilityStmt;
 
5161
                                        AlterTableCmd *newcmd;
 
5162
 
 
5163
                                        rel = relation_openrv(stmt->relation, AccessExclusiveLock);
 
5164
                                        tab = ATGetQueueEntry(wqueue, rel);
 
5165
                                        newcmd = makeNode(AlterTableCmd);
 
5166
                                        newcmd->subtype = AT_ReAddIndex;
 
5167
                                        newcmd->def = (Node *) stmt;
 
5168
                                        tab->subcmds[AT_PASS_OLD_INDEX] =
 
5169
                                                lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
 
5170
                                        relation_close(rel, NoLock);
 
5171
                                        break;
 
5172
                                }
 
5173
                        case T_AlterTableStmt:
 
5174
                                {
 
5175
                                        AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
 
5176
                                        ListCell   *lcmd;
 
5177
 
 
5178
                                        rel = relation_openrv(stmt->relation, AccessExclusiveLock);
 
5179
                                        tab = ATGetQueueEntry(wqueue, rel);
 
5180
                                        foreach(lcmd, stmt->cmds)
 
5181
                                        {
 
5182
                                                AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
 
5183
 
 
5184
                                                switch (cmd->subtype)
 
5185
                                                {
 
5186
                                                        case AT_AddIndex:
 
5187
                                                                cmd->subtype = AT_ReAddIndex;
 
5188
                                                                tab->subcmds[AT_PASS_OLD_INDEX] =
 
5189
                                                                        lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
 
5190
                                                                break;
 
5191
                                                        case AT_AddConstraint:
 
5192
                                                                tab->subcmds[AT_PASS_OLD_CONSTR] =
 
5193
                                                                        lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
 
5194
                                                                break;
 
5195
                                                        default:
 
5196
                                                                elog(ERROR, "unexpected statement type: %d",
 
5197
                                                                         (int) cmd->subtype);
 
5198
                                                }
 
5199
                                        }
 
5200
                                        relation_close(rel, NoLock);
 
5201
                                        break;
 
5202
                                }
 
5203
                        default:
 
5204
                                elog(ERROR, "unexpected statement type: %d",
 
5205
                                         (int) nodeTag(query->utilityStmt));
 
5206
                }
 
5207
        }
 
5208
}
 
5209
 
 
5210
 
 
5211
/*
 
5212
 * ALTER TABLE OWNER
 
5213
 */
 
5214
static void
 
5215
ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId)
 
5216
{
 
5217
        Relation        target_rel;
 
5218
        Relation        class_rel;
 
5219
        HeapTuple       tuple;
 
5220
        Form_pg_class tuple_class;
 
5221
 
 
5222
        /*
 
5223
         * Get exclusive lock till end of transaction on the target table.
 
5224
         * Use relation_open so that we can work on indexes and sequences.
 
5225
         */
 
5226
        target_rel = relation_open(relationOid, AccessExclusiveLock);
 
5227
 
 
5228
        /* Get its pg_class tuple, too */
 
5229
        class_rel = heap_openr(RelationRelationName, RowExclusiveLock);
 
5230
 
 
5231
        tuple = SearchSysCache(RELOID,
 
5232
                                                   ObjectIdGetDatum(relationOid),
 
5233
                                                   0, 0, 0);
 
5234
        if (!HeapTupleIsValid(tuple))
 
5235
                elog(ERROR, "cache lookup failed for relation %u", relationOid);
 
5236
        tuple_class = (Form_pg_class) GETSTRUCT(tuple);
 
5237
 
 
5238
        /* Can we change the ownership of this tuple? */
 
5239
        switch (tuple_class->relkind)
 
5240
        {
 
5241
                case RELKIND_RELATION:
 
5242
                case RELKIND_INDEX:
 
5243
                case RELKIND_VIEW:
 
5244
                case RELKIND_SEQUENCE:
 
5245
                case RELKIND_TOASTVALUE:
 
5246
                        /* ok to change owner */
 
5247
                        break;
 
5248
                default:
 
5249
                        ereport(ERROR,
 
5250
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
5251
                                         errmsg("\"%s\" is not a table, TOAST table, index, view, or sequence",
 
5252
                                                        NameStr(tuple_class->relname))));
 
5253
        }
 
5254
 
 
5255
        /*
 
5256
         * If the new owner is the same as the existing owner, consider the
 
5257
         * command to have succeeded.  This is for dump restoration purposes.
 
5258
         */
 
5259
        if (tuple_class->relowner != newOwnerSysId)
 
5260
        {
 
5261
                Datum           repl_val[Natts_pg_class];
 
5262
                char            repl_null[Natts_pg_class];
 
5263
                char            repl_repl[Natts_pg_class];
 
5264
                Acl                *newAcl;
 
5265
                Datum           aclDatum;
 
5266
                bool            isNull;
 
5267
                HeapTuple       newtuple;
 
5268
 
 
5269
                /* Otherwise, check that we are the superuser */
 
5270
                if (!superuser())
 
5271
                        ereport(ERROR,
 
5272
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
5273
                                         errmsg("must be superuser to change owner")));
 
5274
 
 
5275
                memset(repl_null, ' ', sizeof(repl_null));
 
5276
                memset(repl_repl, ' ', sizeof(repl_repl));
 
5277
 
 
5278
                repl_repl[Anum_pg_class_relowner - 1] = 'r';
 
5279
                repl_val[Anum_pg_class_relowner - 1] = Int32GetDatum(newOwnerSysId);
 
5280
 
 
5281
                /*
 
5282
                 * Determine the modified ACL for the new owner.  This is only
 
5283
                 * necessary when the ACL is non-null.
 
5284
                 */
 
5285
                aclDatum = SysCacheGetAttr(RELOID, tuple,
 
5286
                                                                   Anum_pg_class_relacl,
 
5287
                                                                   &isNull);
 
5288
                if (!isNull)
 
5289
                {
 
5290
                        newAcl = aclnewowner(DatumGetAclP(aclDatum),
 
5291
                                                                 tuple_class->relowner, newOwnerSysId);
 
5292
                        repl_repl[Anum_pg_class_relacl - 1] = 'r';
 
5293
                        repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
 
5294
                }
 
5295
 
 
5296
                newtuple = heap_modifytuple(tuple, class_rel, repl_val, repl_null, repl_repl);
 
5297
 
 
5298
                simple_heap_update(class_rel, &newtuple->t_self, newtuple);
 
5299
                CatalogUpdateIndexes(class_rel, newtuple);
 
5300
 
 
5301
                heap_freetuple(newtuple);
 
5302
 
 
5303
                /*
 
5304
                 * If we are operating on a table, also change the ownership of
 
5305
                 * any indexes and sequences that belong to the table, as well as
 
5306
                 * the table's toast table (if it has one)
 
5307
                 */
 
5308
                if (tuple_class->relkind == RELKIND_RELATION ||
 
5309
                        tuple_class->relkind == RELKIND_TOASTVALUE)
 
5310
                {
 
5311
                        List       *index_oid_list;
 
5312
                        ListCell   *i;
 
5313
 
 
5314
                        /* Find all the indexes belonging to this relation */
 
5315
                        index_oid_list = RelationGetIndexList(target_rel);
 
5316
 
 
5317
                        /* For each index, recursively change its ownership */
 
5318
                        foreach(i, index_oid_list)
 
5319
                                ATExecChangeOwner(lfirst_oid(i), newOwnerSysId);
 
5320
 
 
5321
                        list_free(index_oid_list);
 
5322
                }
 
5323
 
 
5324
                if (tuple_class->relkind == RELKIND_RELATION)
 
5325
                {
 
5326
                        /* If it has a toast table, recurse to change its ownership */
 
5327
                        if (tuple_class->reltoastrelid != InvalidOid)
 
5328
                                ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerSysId);
 
5329
 
 
5330
                        /* If it has dependent sequences, recurse to change them too */
 
5331
                        change_owner_recurse_to_sequences(relationOid, newOwnerSysId);
 
5332
                }
 
5333
        }
 
5334
 
 
5335
        ReleaseSysCache(tuple);
 
5336
        heap_close(class_rel, RowExclusiveLock);
 
5337
        relation_close(target_rel, NoLock);
 
5338
}
 
5339
 
 
5340
/*
 
5341
 * change_owner_recurse_to_sequences
 
5342
 *
 
5343
 * Helper function for ATExecChangeOwner.  Examines pg_depend searching
 
5344
 * for sequences that are dependent on serial columns, and changes their
 
5345
 * ownership.
 
5346
 */
 
5347
static void
 
5348
change_owner_recurse_to_sequences(Oid relationOid, int32 newOwnerSysId)
 
5349
{
 
5350
        Relation        depRel;
 
5351
        SysScanDesc scan;
 
5352
        ScanKeyData     key[2];
 
5353
        HeapTuple       tup;
 
5354
 
 
5355
        /*
 
5356
         * SERIAL sequences are those having an internal dependency on one
 
5357
         * of the table's columns (we don't care *which* column, exactly).
 
5358
         */
 
5359
        depRel = heap_openr(DependRelationName, AccessShareLock);
 
5360
 
 
5361
        ScanKeyInit(&key[0],
 
5362
                        Anum_pg_depend_refclassid,
 
5363
                        BTEqualStrategyNumber, F_OIDEQ,
 
5364
                        ObjectIdGetDatum(RelOid_pg_class));
 
5365
        ScanKeyInit(&key[1],
 
5366
                        Anum_pg_depend_refobjid,
 
5367
                        BTEqualStrategyNumber, F_OIDEQ,
 
5368
                        ObjectIdGetDatum(relationOid));
 
5369
        /* we leave refobjsubid unspecified */
 
5370
 
 
5371
        scan = systable_beginscan(depRel, DependReferenceIndex, true,
 
5372
                                                          SnapshotNow, 2, key);
 
5373
 
 
5374
        while (HeapTupleIsValid(tup = systable_getnext(scan)))
 
5375
        {
 
5376
                Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
 
5377
                Relation        seqRel;
 
5378
 
 
5379
                /* skip dependencies other than internal dependencies on columns */
 
5380
                if (depForm->refobjsubid == 0 ||
 
5381
                        depForm->classid != RelOid_pg_class ||
 
5382
                        depForm->objsubid != 0 ||
 
5383
                        depForm->deptype != DEPENDENCY_INTERNAL)
 
5384
                        continue;
 
5385
 
 
5386
                /* Use relation_open just in case it's an index */
 
5387
                seqRel = relation_open(depForm->objid, AccessExclusiveLock);
 
5388
 
 
5389
                /* skip non-sequence relations */
 
5390
                if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
 
5391
                {
 
5392
                        /* No need to keep the lock */
 
5393
                        relation_close(seqRel, AccessExclusiveLock);
 
5394
                        continue;
 
5395
                }
 
5396
 
 
5397
                /* We don't need to close the sequence while we alter it. */
 
5398
                ATExecChangeOwner(depForm->objid, newOwnerSysId);
 
5399
 
 
5400
                /* Now we can close it.  Keep the lock till end of transaction. */
 
5401
                relation_close(seqRel, NoLock);
 
5402
        }
 
5403
 
 
5404
        systable_endscan(scan);
 
5405
 
 
5406
        relation_close(depRel, AccessShareLock);
 
5407
}
 
5408
 
 
5409
/*
 
5410
 * ALTER TABLE CLUSTER ON
 
5411
 *
 
5412
 * The only thing we have to do is to change the indisclustered bits.
 
5413
 */
 
5414
static void
 
5415
ATExecClusterOn(Relation rel, const char *indexName)
 
5416
{
 
5417
        Oid                     indexOid;
 
5418
 
 
5419
        indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
 
5420
 
 
5421
        if (!OidIsValid(indexOid))
 
5422
                ereport(ERROR,
 
5423
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
5424
                                 errmsg("index \"%s\" for table \"%s\" does not exist",
 
5425
                                                indexName, RelationGetRelationName(rel))));
 
5426
 
 
5427
        /* Check index is valid to cluster on */
 
5428
        check_index_is_clusterable(rel, indexOid);
 
5429
 
 
5430
        /* And do the work */
 
5431
        mark_index_clustered(rel, indexOid);
 
5432
}
 
5433
 
 
5434
/*
 
5435
 * ALTER TABLE SET WITHOUT CLUSTER
 
5436
 *
 
5437
 * We have to find any indexes on the table that have indisclustered bit
 
5438
 * set and turn it off.
 
5439
 */
 
5440
static void
 
5441
ATExecDropCluster(Relation rel)
 
5442
{
 
5443
        mark_index_clustered(rel, InvalidOid);
 
5444
}
 
5445
 
 
5446
/*
 
5447
 * ALTER TABLE SET TABLESPACE
 
5448
 */
 
5449
static void
 
5450
ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
 
5451
{
 
5452
        Oid                     tablespaceId;
 
5453
        AclResult       aclresult;
 
5454
 
 
5455
        /*
 
5456
         * We do our own permission checking because we want to allow this on
 
5457
         * indexes.
 
5458
         */
 
5459
        if (rel->rd_rel->relkind != RELKIND_RELATION &&
 
5460
                rel->rd_rel->relkind != RELKIND_INDEX)
 
5461
                ereport(ERROR,
 
5462
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
5463
                                 errmsg("\"%s\" is not a table or index",
 
5464
                                                RelationGetRelationName(rel))));
 
5465
 
 
5466
        /* Permissions checks */
 
5467
        if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
 
5468
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
 
5469
                                           RelationGetRelationName(rel));
 
5470
 
 
5471
        if (!allowSystemTableMods && IsSystemRelation(rel))
 
5472
                ereport(ERROR,
 
5473
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
5474
                                 errmsg("permission denied: \"%s\" is a system catalog",
 
5475
                                                RelationGetRelationName(rel))));
 
5476
 
 
5477
        /* Check that the tablespace exists */
 
5478
        tablespaceId = get_tablespace_oid(tablespacename);
 
5479
        if (!OidIsValid(tablespaceId))
 
5480
                ereport(ERROR,
 
5481
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
5482
                        errmsg("tablespace \"%s\" does not exist", tablespacename)));
 
5483
 
 
5484
        /* Check its permissions */
 
5485
        aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
 
5486
        if (aclresult != ACLCHECK_OK)
 
5487
                aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
 
5488
 
 
5489
        /* Save info for Phase 3 to do the real work */
 
5490
        if (OidIsValid(tab->newTableSpace))
 
5491
                ereport(ERROR,
 
5492
                                (errcode(ERRCODE_SYNTAX_ERROR),
 
5493
                   errmsg("cannot have multiple SET TABLESPACE subcommands")));
 
5494
        tab->newTableSpace = tablespaceId;
 
5495
}
 
5496
 
 
5497
/*
 
5498
 * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
 
5499
 * rewriting to be done, so we just want to copy the data as fast as possible.
 
5500
 */
 
5501
static void
 
5502
ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
 
5503
{
 
5504
        Relation        rel;
 
5505
        Oid                     oldTableSpace;
 
5506
        Oid                     reltoastrelid;
 
5507
        Oid                     reltoastidxid;
 
5508
        RelFileNode newrnode;
 
5509
        SMgrRelation dstrel;
 
5510
        Relation        pg_class;
 
5511
        HeapTuple       tuple;
 
5512
        Form_pg_class rd_rel;
 
5513
 
 
5514
        rel = relation_open(tableOid, NoLock);
 
5515
 
 
5516
        /*
 
5517
         * We can never allow moving of shared or nailed-in-cache relations,
 
5518
         * because we can't support changing their reltablespace values.
 
5519
         */
 
5520
        if (rel->rd_rel->relisshared || rel->rd_isnailed)
 
5521
                ereport(ERROR,
 
5522
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
5523
                                 errmsg("cannot move system relation \"%s\"",
 
5524
                                                RelationGetRelationName(rel))));
 
5525
 
 
5526
        /*
 
5527
         * Don't allow moving temp tables of other backends ... their local
 
5528
         * buffer manager is not going to cope.
 
5529
         */
 
5530
        if (isOtherTempNamespace(RelationGetNamespace(rel)))
 
5531
                ereport(ERROR,
 
5532
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
5533
                          errmsg("cannot move temporary tables of other sessions")));
 
5534
 
 
5535
        /*
 
5536
         * No work if no change in tablespace.
 
5537
         */
 
5538
        oldTableSpace = rel->rd_rel->reltablespace;
 
5539
        if (newTableSpace == oldTableSpace ||
 
5540
                (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
 
5541
        {
 
5542
                relation_close(rel, NoLock);
 
5543
                return;
 
5544
        }
 
5545
 
 
5546
        reltoastrelid = rel->rd_rel->reltoastrelid;
 
5547
        reltoastidxid = rel->rd_rel->reltoastidxid;
 
5548
 
 
5549
        /* Get a modifiable copy of the relation's pg_class row */
 
5550
        pg_class = heap_openr(RelationRelationName, RowExclusiveLock);
 
5551
 
 
5552
        tuple = SearchSysCacheCopy(RELOID,
 
5553
                                                           ObjectIdGetDatum(tableOid),
 
5554
                                                           0, 0, 0);
 
5555
        if (!HeapTupleIsValid(tuple))
 
5556
                elog(ERROR, "cache lookup failed for relation %u", tableOid);
 
5557
        rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
5558
 
 
5559
        /* create another storage file. Is it a little ugly ? */
 
5560
        /* NOTE: any conflict in relfilenode value will be caught here */
 
5561
        newrnode = rel->rd_node;
 
5562
        newrnode.spcNode = newTableSpace;
 
5563
 
 
5564
        dstrel = smgropen(newrnode);
 
5565
        smgrcreate(dstrel, rel->rd_istemp, false);
 
5566
 
 
5567
        /* copy relation data to the new physical file */
 
5568
        copy_relation_data(rel, dstrel);
 
5569
 
 
5570
        /* schedule unlinking old physical file */
 
5571
        RelationOpenSmgr(rel);
 
5572
        smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
 
5573
 
 
5574
        /*
 
5575
         * Now drop smgr references.  The source was already dropped by
 
5576
         * smgrscheduleunlink.
 
5577
         */
 
5578
        smgrclose(dstrel);
 
5579
 
 
5580
        /* update the pg_class row */
 
5581
        rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
 
5582
        simple_heap_update(pg_class, &tuple->t_self, tuple);
 
5583
        CatalogUpdateIndexes(pg_class, tuple);
 
5584
 
 
5585
        heap_freetuple(tuple);
 
5586
 
 
5587
        heap_close(pg_class, RowExclusiveLock);
 
5588
 
 
5589
        relation_close(rel, NoLock);
 
5590
 
 
5591
        /* Make sure the reltablespace change is visible */
 
5592
        CommandCounterIncrement();
 
5593
 
 
5594
        /* Move associated toast relation and/or index, too */
 
5595
        if (OidIsValid(reltoastrelid))
 
5596
                ATExecSetTableSpace(reltoastrelid, newTableSpace);
 
5597
        if (OidIsValid(reltoastidxid))
 
5598
                ATExecSetTableSpace(reltoastidxid, newTableSpace);
 
5599
}
 
5600
 
 
5601
/*
 
5602
 * Copy data, block by block
 
5603
 */
 
5604
static void
 
5605
copy_relation_data(Relation rel, SMgrRelation dst)
 
5606
{
 
5607
        SMgrRelation src;
 
5608
        bool            use_wal;
 
5609
        BlockNumber nblocks;
 
5610
        BlockNumber blkno;
 
5611
        char            buf[BLCKSZ];
 
5612
        Page            page = (Page) buf;
 
5613
 
 
5614
        /*
 
5615
         * Since we copy the data directly without looking at the shared
 
5616
         * buffers, we'd better first flush out any pages of the source
 
5617
         * relation that are in shared buffers.  We assume no new pages will
 
5618
         * get loaded into buffers while we are holding exclusive lock on the
 
5619
         * rel.
 
5620
         */
 
5621
        FlushRelationBuffers(rel, 0);
 
5622
 
 
5623
        /*
 
5624
         * We need to log the copied data in WAL iff WAL archiving is enabled
 
5625
         * AND it's not a temp rel.
 
5626
         */
 
5627
        use_wal = XLogArchivingActive() && !rel->rd_istemp;
 
5628
 
 
5629
        nblocks = RelationGetNumberOfBlocks(rel);
 
5630
        /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
 
5631
        src = rel->rd_smgr;
 
5632
 
 
5633
        for (blkno = 0; blkno < nblocks; blkno++)
 
5634
        {
 
5635
                smgrread(src, blkno, buf);
 
5636
 
 
5637
                /* XLOG stuff */
 
5638
                if (use_wal)
 
5639
                {
 
5640
                        xl_heap_newpage xlrec;
 
5641
                        XLogRecPtr      recptr;
 
5642
                        XLogRecData rdata[2];
 
5643
 
 
5644
                        /* NO ELOG(ERROR) from here till newpage op is logged */
 
5645
                        START_CRIT_SECTION();
 
5646
 
 
5647
                        xlrec.node = dst->smgr_rnode;
 
5648
                        xlrec.blkno = blkno;
 
5649
 
 
5650
                        rdata[0].buffer = InvalidBuffer;
 
5651
                        rdata[0].data = (char *) &xlrec;
 
5652
                        rdata[0].len = SizeOfHeapNewpage;
 
5653
                        rdata[0].next = &(rdata[1]);
 
5654
 
 
5655
                        rdata[1].buffer = InvalidBuffer;
 
5656
                        rdata[1].data = (char *) page;
 
5657
                        rdata[1].len = BLCKSZ;
 
5658
                        rdata[1].next = NULL;
 
5659
 
 
5660
                        recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
 
5661
 
 
5662
                        PageSetLSN(page, recptr);
 
5663
                        PageSetTLI(page, ThisTimeLineID);
 
5664
 
 
5665
                        END_CRIT_SECTION();
 
5666
                }
 
5667
 
 
5668
                /*
 
5669
                 * Now write the page.  We say isTemp = true even if it's not a
 
5670
                 * temp rel, because there's no need for smgr to schedule an fsync
 
5671
                 * for this write; we'll do it ourselves below.
 
5672
                 */
 
5673
                smgrwrite(dst, blkno, buf, true);
 
5674
        }
 
5675
 
 
5676
        /*
 
5677
         * If the rel isn't temp, we must fsync it down to disk before it's
 
5678
         * safe to commit the transaction.      (For a temp rel we don't care
 
5679
         * since the rel will be uninteresting after a crash anyway.)
 
5680
         *
 
5681
         * It's obvious that we must do this when not WAL-logging the copy. It's
 
5682
         * less obvious that we have to do it even if we did WAL-log the
 
5683
         * copied pages.  The reason is that since we're copying outside
 
5684
         * shared buffers, a CHECKPOINT occurring during the copy has no way
 
5685
         * to flush the previously written data to disk (indeed it won't know
 
5686
         * the new rel even exists).  A crash later on would replay WAL from
 
5687
         * the checkpoint, therefore it wouldn't replay our earlier WAL
 
5688
         * entries. If we do not fsync those pages here, they might still not
 
5689
         * be on disk when the crash occurs.
 
5690
         */
 
5691
        if (!rel->rd_istemp)
 
5692
                smgrimmedsync(dst);
 
5693
}
 
5694
 
 
5695
/*
 
5696
 * ALTER TABLE CREATE TOAST TABLE
 
5697
 *
 
5698
 * Note: this is also invoked from outside this module; in such cases we
 
5699
 * expect the caller to have verified that the relation is a table and we
 
5700
 * have all the right permissions.      Callers expect this function
 
5701
 * to end with CommandCounterIncrement if it makes any changes.
 
5702
 */
 
5703
void
 
5704
AlterTableCreateToastTable(Oid relOid, bool silent)
 
5705
{
 
5706
        Relation        rel;
 
5707
        HeapTuple       reltup;
 
5708
        TupleDesc       tupdesc;
 
5709
        bool            shared_relation;
 
5710
        Relation        class_rel;
 
5711
        Oid                     toast_relid;
 
5712
        Oid                     toast_idxid;
 
5713
        char            toast_relname[NAMEDATALEN];
 
5714
        char            toast_idxname[NAMEDATALEN];
 
5715
        IndexInfo  *indexInfo;
 
5716
        Oid                     classObjectId[2];
 
5717
        ObjectAddress baseobject,
 
5718
                                toastobject;
 
5719
 
 
5720
        /*
 
5721
         * Grab an exclusive lock on the target table, which we will NOT
 
5722
         * release until end of transaction.  (This is probably redundant in
 
5723
         * all present uses...)
 
5724
         */
 
5725
        rel = heap_open(relOid, AccessExclusiveLock);
 
5726
 
 
5727
        /*
 
5728
         * Toast table is shared if and only if its parent is.
 
5729
         *
 
5730
         * We cannot allow toasting a shared relation after initdb (because
 
5731
         * there's no way to mark it toasted in other databases' pg_class).
 
5732
         * Unfortunately we can't distinguish initdb from a manually started
 
5733
         * standalone backend (toasting happens after the bootstrap phase, so
 
5734
         * checking IsBootstrapProcessingMode() won't work).  However, we can
 
5735
         * at least prevent this mistake under normal multi-user operation.
 
5736
         */
 
5737
        shared_relation = rel->rd_rel->relisshared;
 
5738
        if (shared_relation && IsUnderPostmaster)
 
5739
                ereport(ERROR,
 
5740
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
5741
                                 errmsg("shared tables cannot be toasted after initdb")));
 
5742
 
 
5743
        /*
 
5744
         * Is it already toasted?
 
5745
         */
 
5746
        if (rel->rd_rel->reltoastrelid != InvalidOid)
 
5747
        {
 
5748
                if (silent)
 
5749
                {
 
5750
                        heap_close(rel, NoLock);
 
5751
                        return;
 
5752
                }
 
5753
 
 
5754
                ereport(ERROR,
 
5755
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
5756
                                 errmsg("table \"%s\" already has a TOAST table",
 
5757
                                                RelationGetRelationName(rel))));
 
5758
        }
 
5759
 
 
5760
        /*
 
5761
         * Check to see whether the table actually needs a TOAST table.
 
5762
         */
 
5763
        if (!needs_toast_table(rel))
 
5764
        {
 
5765
                if (silent)
 
5766
                {
 
5767
                        heap_close(rel, NoLock);
 
5768
                        return;
 
5769
                }
 
5770
 
 
5771
                ereport(ERROR,
 
5772
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
5773
                                 errmsg("table \"%s\" does not need a TOAST table",
 
5774
                                                RelationGetRelationName(rel))));
 
5775
        }
 
5776
 
 
5777
        /*
 
5778
         * Create the toast table and its index
 
5779
         */
 
5780
        snprintf(toast_relname, sizeof(toast_relname),
 
5781
                         "pg_toast_%u", relOid);
 
5782
        snprintf(toast_idxname, sizeof(toast_idxname),
 
5783
                         "pg_toast_%u_index", relOid);
 
5784
 
 
5785
        /* this is pretty painful...  need a tuple descriptor */
 
5786
        tupdesc = CreateTemplateTupleDesc(3, false);
 
5787
        TupleDescInitEntry(tupdesc, (AttrNumber) 1,
 
5788
                                           "chunk_id",
 
5789
                                           OIDOID,
 
5790
                                           -1, 0);
 
5791
        TupleDescInitEntry(tupdesc, (AttrNumber) 2,
 
5792
                                           "chunk_seq",
 
5793
                                           INT4OID,
 
5794
                                           -1, 0);
 
5795
        TupleDescInitEntry(tupdesc, (AttrNumber) 3,
 
5796
                                           "chunk_data",
 
5797
                                           BYTEAOID,
 
5798
                                           -1, 0);
 
5799
 
 
5800
        /*
 
5801
         * Ensure that the toast table doesn't itself get toasted, or we'll be
 
5802
         * toast :-(.  This is essential for chunk_data because type bytea is
 
5803
         * toastable; hit the other two just to be sure.
 
5804
         */
 
5805
        tupdesc->attrs[0]->attstorage = 'p';
 
5806
        tupdesc->attrs[1]->attstorage = 'p';
 
5807
        tupdesc->attrs[2]->attstorage = 'p';
 
5808
 
 
5809
        /*
 
5810
         * Note: the toast relation is placed in the regular pg_toast
 
5811
         * namespace even if its master relation is a temp table.  There
 
5812
         * cannot be any naming collision, and the toast rel will be destroyed
 
5813
         * when its master is, so there's no need to handle the toast rel as
 
5814
         * temp.
 
5815
         */
 
5816
        toast_relid = heap_create_with_catalog(toast_relname,
 
5817
                                                                                   PG_TOAST_NAMESPACE,
 
5818
                                                                                   rel->rd_rel->reltablespace,
 
5819
                                                                                   tupdesc,
 
5820
                                                                                   RELKIND_TOASTVALUE,
 
5821
                                                                                   shared_relation,
 
5822
                                                                                   true,
 
5823
                                                                                   0,
 
5824
                                                                                   ONCOMMIT_NOOP,
 
5825
                                                                                   true);
 
5826
 
 
5827
        /* make the toast relation visible, else index creation will fail */
 
5828
        CommandCounterIncrement();
 
5829
 
 
5830
        /*
 
5831
         * Create unique index on chunk_id, chunk_seq.
 
5832
         *
 
5833
         * NOTE: the normal TOAST access routines could actually function with a
 
5834
         * single-column index on chunk_id only. However, the slice access
 
5835
         * routines use both columns for faster access to an individual chunk.
 
5836
         * In addition, we want it to be unique as a check against the
 
5837
         * possibility of duplicate TOAST chunk OIDs. The index might also be
 
5838
         * a little more efficient this way, since btree isn't all that happy
 
5839
         * with large numbers of equal keys.
 
5840
         */
 
5841
 
 
5842
        indexInfo = makeNode(IndexInfo);
 
5843
        indexInfo->ii_NumIndexAttrs = 2;
 
5844
        indexInfo->ii_KeyAttrNumbers[0] = 1;
 
5845
        indexInfo->ii_KeyAttrNumbers[1] = 2;
 
5846
        indexInfo->ii_Expressions = NIL;
 
5847
        indexInfo->ii_ExpressionsState = NIL;
 
5848
        indexInfo->ii_Predicate = NIL;
 
5849
        indexInfo->ii_PredicateState = NIL;
 
5850
        indexInfo->ii_Unique = true;
 
5851
 
 
5852
        classObjectId[0] = OID_BTREE_OPS_OID;
 
5853
        classObjectId[1] = INT4_BTREE_OPS_OID;
 
5854
 
 
5855
        toast_idxid = index_create(toast_relid, toast_idxname, indexInfo,
 
5856
                                                           BTREE_AM_OID,
 
5857
                                                           rel->rd_rel->reltablespace,
 
5858
                                                           classObjectId,
 
5859
                                                           true, false, true, false);
 
5860
 
 
5861
        /*
 
5862
         * Update toast rel's pg_class entry to show that it has an index. The
 
5863
         * index OID is stored into the reltoastidxid field for easy access by
 
5864
         * the tuple toaster.
 
5865
         */
 
5866
        setRelhasindex(toast_relid, true, true, toast_idxid);
 
5867
 
 
5868
        /*
 
5869
         * Store the toast table's OID in the parent relation's pg_class row
 
5870
         */
 
5871
        class_rel = heap_openr(RelationRelationName, RowExclusiveLock);
 
5872
 
 
5873
        reltup = SearchSysCacheCopy(RELOID,
 
5874
                                                                ObjectIdGetDatum(relOid),
 
5875
                                                                0, 0, 0);
 
5876
        if (!HeapTupleIsValid(reltup))
 
5877
                elog(ERROR, "cache lookup failed for relation %u", relOid);
 
5878
 
 
5879
        ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
 
5880
 
 
5881
        simple_heap_update(class_rel, &reltup->t_self, reltup);
 
5882
 
 
5883
        /* Keep catalog indexes current */
 
5884
        CatalogUpdateIndexes(class_rel, reltup);
 
5885
 
 
5886
        heap_freetuple(reltup);
 
5887
 
 
5888
        heap_close(class_rel, RowExclusiveLock);
 
5889
 
 
5890
        /*
 
5891
         * Register dependency from the toast table to the master, so that the
 
5892
         * toast table will be deleted if the master is.
 
5893
         */
 
5894
        baseobject.classId = RelOid_pg_class;
 
5895
        baseobject.objectId = relOid;
 
5896
        baseobject.objectSubId = 0;
 
5897
        toastobject.classId = RelOid_pg_class;
 
5898
        toastobject.objectId = toast_relid;
 
5899
        toastobject.objectSubId = 0;
 
5900
 
 
5901
        recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
 
5902
 
 
5903
        /*
 
5904
         * Clean up and make changes visible
 
5905
         */
 
5906
        heap_close(rel, NoLock);
 
5907
 
 
5908
        CommandCounterIncrement();
 
5909
}
 
5910
 
 
5911
/*
 
5912
 * Check to see whether the table needs a TOAST table.  It does only if
 
5913
 * (1) there are any toastable attributes, and (2) the maximum length
 
5914
 * of a tuple could exceed TOAST_TUPLE_THRESHOLD.  (We don't want to
 
5915
 * create a toast table for something like "f1 varchar(20)".)
 
5916
 */
 
5917
static bool
 
5918
needs_toast_table(Relation rel)
 
5919
{
 
5920
        int32           data_length = 0;
 
5921
        bool            maxlength_unknown = false;
 
5922
        bool            has_toastable_attrs = false;
 
5923
        TupleDesc       tupdesc;
 
5924
        Form_pg_attribute *att;
 
5925
        int32           tuple_length;
 
5926
        int                     i;
 
5927
 
 
5928
        tupdesc = rel->rd_att;
 
5929
        att = tupdesc->attrs;
 
5930
 
 
5931
        for (i = 0; i < tupdesc->natts; i++)
 
5932
        {
 
5933
                if (att[i]->attisdropped)
 
5934
                        continue;
 
5935
                data_length = att_align(data_length, att[i]->attalign);
 
5936
                if (att[i]->attlen > 0)
 
5937
                {
 
5938
                        /* Fixed-length types are never toastable */
 
5939
                        data_length += att[i]->attlen;
 
5940
                }
 
5941
                else
 
5942
                {
 
5943
                        int32           maxlen = type_maximum_size(att[i]->atttypid,
 
5944
                                                                                                   att[i]->atttypmod);
 
5945
 
 
5946
                        if (maxlen < 0)
 
5947
                                maxlength_unknown = true;
 
5948
                        else
 
5949
                                data_length += maxlen;
 
5950
                        if (att[i]->attstorage != 'p')
 
5951
                                has_toastable_attrs = true;
 
5952
                }
 
5953
        }
 
5954
        if (!has_toastable_attrs)
 
5955
                return false;                   /* nothing to toast? */
 
5956
        if (maxlength_unknown)
 
5957
                return true;                    /* any unlimited-length attrs? */
 
5958
        tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
 
5959
                                                        BITMAPLEN(tupdesc->natts)) +
 
5960
                MAXALIGN(data_length);
 
5961
        return (tuple_length > TOAST_TUPLE_THRESHOLD);
 
5962
}
 
5963
 
 
5964
 
 
5965
/*
 
5966
 * This code supports
 
5967
 *      CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
 
5968
 *
 
5969
 * Because we only support this for TEMP tables, it's sufficient to remember
 
5970
 * the state in a backend-local data structure.
 
5971
 */
 
5972
 
 
5973
/*
 
5974
 * Register a newly-created relation's ON COMMIT action.
 
5975
 */
 
5976
void
 
5977
register_on_commit_action(Oid relid, OnCommitAction action)
 
5978
{
 
5979
        OnCommitItem *oc;
 
5980
        MemoryContext oldcxt;
 
5981
 
 
5982
        /*
 
5983
         * We needn't bother registering the relation unless there is an ON
 
5984
         * COMMIT action we need to take.
 
5985
         */
 
5986
        if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
 
5987
                return;
 
5988
 
 
5989
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
5990
 
 
5991
        oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
 
5992
        oc->relid = relid;
 
5993
        oc->oncommit = action;
 
5994
        oc->creating_subid = GetCurrentSubTransactionId();
 
5995
        oc->deleting_subid = InvalidSubTransactionId;
 
5996
 
 
5997
        on_commits = lcons(oc, on_commits);
 
5998
 
 
5999
        MemoryContextSwitchTo(oldcxt);
 
6000
}
 
6001
 
 
6002
/*
 
6003
 * Unregister any ON COMMIT action when a relation is deleted.
 
6004
 *
 
6005
 * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
 
6006
 */
 
6007
void
 
6008
remove_on_commit_action(Oid relid)
 
6009
{
 
6010
        ListCell   *l;
 
6011
 
 
6012
        foreach(l, on_commits)
 
6013
        {
 
6014
                OnCommitItem *oc = (OnCommitItem *) lfirst(l);
 
6015
 
 
6016
                if (oc->relid == relid)
 
6017
                {
 
6018
                        oc->deleting_subid = GetCurrentSubTransactionId();
 
6019
                        break;
 
6020
                }
 
6021
        }
 
6022
}
 
6023
 
 
6024
/*
 
6025
 * Perform ON COMMIT actions.
 
6026
 *
 
6027
 * This is invoked just before actually committing, since it's possible
 
6028
 * to encounter errors.
 
6029
 */
 
6030
void
 
6031
PreCommit_on_commit_actions(void)
 
6032
{
 
6033
        ListCell   *l;
 
6034
 
 
6035
        foreach(l, on_commits)
 
6036
        {
 
6037
                OnCommitItem *oc = (OnCommitItem *) lfirst(l);
 
6038
 
 
6039
                /* Ignore entry if already dropped in this xact */
 
6040
                if (oc->deleting_subid != InvalidSubTransactionId)
 
6041
                        continue;
 
6042
 
 
6043
                switch (oc->oncommit)
 
6044
                {
 
6045
                        case ONCOMMIT_NOOP:
 
6046
                        case ONCOMMIT_PRESERVE_ROWS:
 
6047
                                /* Do nothing (there shouldn't be such entries, actually) */
 
6048
                                break;
 
6049
                        case ONCOMMIT_DELETE_ROWS:
 
6050
                                heap_truncate(oc->relid);
 
6051
                                CommandCounterIncrement();              /* XXX needed? */
 
6052
                                break;
 
6053
                        case ONCOMMIT_DROP:
 
6054
                                {
 
6055
                                        ObjectAddress object;
 
6056
 
 
6057
                                        object.classId = RelOid_pg_class;
 
6058
                                        object.objectId = oc->relid;
 
6059
                                        object.objectSubId = 0;
 
6060
                                        performDeletion(&object, DROP_CASCADE);
 
6061
 
 
6062
                                        /*
 
6063
                                         * Note that table deletion will call
 
6064
                                         * remove_on_commit_action, so the entry should get
 
6065
                                         * marked as deleted.
 
6066
                                         */
 
6067
                                        Assert(oc->deleting_subid != InvalidSubTransactionId);
 
6068
                                        break;
 
6069
                                }
 
6070
                }
 
6071
        }
 
6072
}
 
6073
 
 
6074
/*
 
6075
 * Post-commit or post-abort cleanup for ON COMMIT management.
 
6076
 *
 
6077
 * All we do here is remove no-longer-needed OnCommitItem entries.
 
6078
 *
 
6079
 * During commit, remove entries that were deleted during this transaction;
 
6080
 * during abort, remove those created during this transaction.
 
6081
 */
 
6082
void
 
6083
AtEOXact_on_commit_actions(bool isCommit)
 
6084
{
 
6085
        ListCell   *cur_item;
 
6086
        ListCell   *prev_item;
 
6087
 
 
6088
        prev_item = NULL;
 
6089
        cur_item = list_head(on_commits);
 
6090
 
 
6091
        while (cur_item != NULL)
 
6092
        {
 
6093
                OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
 
6094
 
 
6095
                if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
 
6096
                        oc->creating_subid != InvalidSubTransactionId)
 
6097
                {
 
6098
                        /* cur_item must be removed */
 
6099
                        on_commits = list_delete_cell(on_commits, cur_item, prev_item);
 
6100
                        pfree(oc);
 
6101
                        if (prev_item)
 
6102
                                cur_item = lnext(prev_item);
 
6103
                        else
 
6104
                                cur_item = list_head(on_commits);
 
6105
                }
 
6106
                else
 
6107
                {
 
6108
                        /* cur_item must be preserved */
 
6109
                        oc->creating_subid = InvalidSubTransactionId;
 
6110
                        oc->deleting_subid = InvalidSubTransactionId;
 
6111
                        prev_item = cur_item;
 
6112
                        cur_item = lnext(prev_item);
 
6113
                }
 
6114
        }
 
6115
}
 
6116
 
 
6117
/*
 
6118
 * Post-subcommit or post-subabort cleanup for ON COMMIT management.
 
6119
 *
 
6120
 * During subabort, we can immediately remove entries created during this
 
6121
 * subtransaction.      During subcommit, just relabel entries marked during
 
6122
 * this subtransaction as being the parent's responsibility.
 
6123
 */
 
6124
void
 
6125
AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
 
6126
                                                          SubTransactionId parentSubid)
 
6127
{
 
6128
        ListCell   *cur_item;
 
6129
        ListCell   *prev_item;
 
6130
 
 
6131
        prev_item = NULL;
 
6132
        cur_item = list_head(on_commits);
 
6133
 
 
6134
        while (cur_item != NULL)
 
6135
        {
 
6136
                OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
 
6137
 
 
6138
                if (!isCommit && oc->creating_subid == mySubid)
 
6139
                {
 
6140
                        /* cur_item must be removed */
 
6141
                        on_commits = list_delete_cell(on_commits, cur_item, prev_item);
 
6142
                        pfree(oc);
 
6143
                        if (prev_item)
 
6144
                                cur_item = lnext(prev_item);
 
6145
                        else
 
6146
                                cur_item = list_head(on_commits);
 
6147
                }
 
6148
                else
 
6149
                {
 
6150
                        /* cur_item must be preserved */
 
6151
                        if (oc->creating_subid == mySubid)
 
6152
                                oc->creating_subid = parentSubid;
 
6153
                        if (oc->deleting_subid == mySubid)
 
6154
                                oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
 
6155
                        prev_item = cur_item;
 
6156
                        cur_item = lnext(prev_item);
 
6157
                }
 
6158
        }
 
6159
}