~ubuntu-branches/debian/experimental/postgresql-11/experimental

« back to all changes in this revision

Viewing changes to src/backend/access/transam/twophase.c

  • Committer: Package Import Robot
  • Author(s): Christoph Berg
  • Date: 2018-05-22 14:19:08 UTC
  • Revision ID: package-import@ubuntu.com-20180522141908-0oy9ujs1b5vrda74
Tags: upstream-11~beta1
ImportĀ upstreamĀ versionĀ 11~beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * twophase.c
 
4
 *              Two-phase commit support functions.
 
5
 *
 
6
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 
7
 * Portions Copyright (c) 1994, Regents of the University of California
 
8
 *
 
9
 * IDENTIFICATION
 
10
 *              src/backend/access/transam/twophase.c
 
11
 *
 
12
 * NOTES
 
13
 *              Each global transaction is associated with a global transaction
 
14
 *              identifier (GID). The client assigns a GID to a postgres
 
15
 *              transaction with the PREPARE TRANSACTION command.
 
16
 *
 
17
 *              We keep all active global transactions in a shared memory array.
 
18
 *              When the PREPARE TRANSACTION command is issued, the GID is
 
19
 *              reserved for the transaction in the array. This is done before
 
20
 *              a WAL entry is made, because the reservation checks for duplicate
 
21
 *              GIDs and aborts the transaction if there already is a global
 
22
 *              transaction in prepared state with the same GID.
 
23
 *
 
24
 *              A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
 
25
 *              what keeps the XID considered running by TransactionIdIsInProgress.
 
26
 *              It is also convenient as a PGPROC to hook the gxact's locks to.
 
27
 *
 
28
 *              Information to recover prepared transactions in case of crash is
 
29
 *              now stored in WAL for the common case. In some cases there will be
 
30
 *              an extended period between preparing a GXACT and commit/abort, in
 
31
 *              which case we need to separately record prepared transaction data
 
32
 *              in permanent storage. This includes locking information, pending
 
33
 *              notifications etc. All that state information is written to the
 
34
 *              per-transaction state file in the pg_twophase directory.
 
35
 *              All prepared transactions will be written prior to shutdown.
 
36
 *
 
37
 *              Life track of state data is following:
 
38
 *
 
39
 *              * On PREPARE TRANSACTION backend writes state data only to the WAL and
 
40
 *                stores pointer to the start of the WAL record in
 
41
 *                gxact->prepare_start_lsn.
 
42
 *              * If COMMIT occurs before checkpoint then backend reads data from WAL
 
43
 *                using prepare_start_lsn.
 
44
 *              * On checkpoint state data copied to files in pg_twophase directory and
 
45
 *                fsynced
 
46
 *              * If COMMIT happens after checkpoint then backend reads state data from
 
47
 *                files
 
48
 *
 
49
 *              During replay and replication, TwoPhaseState also holds information
 
50
 *              about active prepared transactions that haven't been moved to disk yet.
 
51
 *
 
52
 *              Replay of twophase records happens by the following rules:
 
53
 *
 
54
 *              * At the beginning of recovery, pg_twophase is scanned once, filling
 
55
 *                TwoPhaseState with entries marked with gxact->inredo and
 
56
 *                gxact->ondisk.  Two-phase file data older than the XID horizon of
 
57
 *                the redo position are discarded.
 
58
 *              * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
 
59
 *                gxact->inredo is set to true for such entries.
 
60
 *              * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
 
61
 *                that have gxact->inredo set and are behind the redo_horizon. We
 
62
 *                save them to disk and then switch gxact->ondisk to true.
 
63
 *              * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
 
64
 *                If gxact->ondisk is true, the corresponding entry from the disk
 
65
 *                is additionally deleted.
 
66
 *              * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
 
67
 *                and PrescanPreparedTransactions() have been modified to go through
 
68
 *                gxact->inredo entries that have not made it to disk.
 
69
 *
 
70
 *-------------------------------------------------------------------------
 
71
 */
 
72
#include "postgres.h"
 
73
 
 
74
#include <fcntl.h>
 
75
#include <sys/stat.h>
 
76
#include <time.h>
 
77
#include <unistd.h>
 
78
 
 
79
#include "access/commit_ts.h"
 
80
#include "access/htup_details.h"
 
81
#include "access/subtrans.h"
 
82
#include "access/transam.h"
 
83
#include "access/twophase.h"
 
84
#include "access/twophase_rmgr.h"
 
85
#include "access/xact.h"
 
86
#include "access/xlog.h"
 
87
#include "access/xloginsert.h"
 
88
#include "access/xlogutils.h"
 
89
#include "access/xlogreader.h"
 
90
#include "catalog/pg_type.h"
 
91
#include "catalog/storage.h"
 
92
#include "funcapi.h"
 
93
#include "miscadmin.h"
 
94
#include "pg_trace.h"
 
95
#include "pgstat.h"
 
96
#include "replication/origin.h"
 
97
#include "replication/syncrep.h"
 
98
#include "replication/walsender.h"
 
99
#include "storage/fd.h"
 
100
#include "storage/ipc.h"
 
101
#include "storage/predicate.h"
 
102
#include "storage/proc.h"
 
103
#include "storage/procarray.h"
 
104
#include "storage/sinvaladt.h"
 
105
#include "storage/smgr.h"
 
106
#include "utils/builtins.h"
 
107
#include "utils/memutils.h"
 
108
#include "utils/timestamp.h"
 
109
 
 
110
 
 
111
/*
 
112
 * Directory where Two-phase commit files reside within PGDATA
 
113
 */
 
114
#define TWOPHASE_DIR "pg_twophase"
 
115
 
 
116
/* GUC variable, can't be changed after startup */
 
117
int                     max_prepared_xacts = 0;
 
118
 
 
119
/*
 
120
 * This struct describes one global transaction that is in prepared state
 
121
 * or attempting to become prepared.
 
122
 *
 
123
 * The lifecycle of a global transaction is:
 
124
 *
 
125
 * 1. After checking that the requested GID is not in use, set up an entry in
 
126
 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
 
127
 * and mark it as locked by my backend.
 
128
 *
 
129
 * 2. After successfully completing prepare, set valid = true and enter the
 
130
 * referenced PGPROC into the global ProcArray.
 
131
 *
 
132
 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
 
133
 * valid and not locked, then mark the entry as locked by storing my current
 
134
 * backend ID into locking_backend.  This prevents concurrent attempts to
 
135
 * commit or rollback the same prepared xact.
 
136
 *
 
137
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
 
138
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
 
139
 * the freelist.
 
140
 *
 
141
 * Note that if the preparing transaction fails between steps 1 and 2, the
 
142
 * entry must be removed so that the GID and the GlobalTransaction struct
 
143
 * can be reused.  See AtAbort_Twophase().
 
144
 *
 
145
 * typedef struct GlobalTransactionData *GlobalTransaction appears in
 
146
 * twophase.h
 
147
 */
 
148
 
 
149
typedef struct GlobalTransactionData
 
150
{
 
151
        GlobalTransaction next;         /* list link for free list */
 
152
        int                     pgprocno;               /* ID of associated dummy PGPROC */
 
153
        BackendId       dummyBackendId; /* similar to backend id for backends */
 
154
        TimestampTz prepared_at;        /* time of preparation */
 
155
 
 
156
        /*
 
157
         * Note that we need to keep track of two LSNs for each GXACT. We keep
 
158
         * track of the start LSN because this is the address we must use to read
 
159
         * state data back from WAL when committing a prepared GXACT. We keep
 
160
         * track of the end LSN because that is the LSN we need to wait for prior
 
161
         * to commit.
 
162
         */
 
163
        XLogRecPtr      prepare_start_lsn;      /* XLOG offset of prepare record start */
 
164
        XLogRecPtr      prepare_end_lsn;        /* XLOG offset of prepare record end */
 
165
        TransactionId xid;                      /* The GXACT id */
 
166
 
 
167
        Oid                     owner;                  /* ID of user that executed the xact */
 
168
        BackendId       locking_backend;        /* backend currently working on the xact */
 
169
        bool            valid;                  /* true if PGPROC entry is in proc array */
 
170
        bool            ondisk;                 /* true if prepare state file is on disk */
 
171
        bool            inredo;                 /* true if entry was added via xlog_redo */
 
172
        char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
 
173
}                       GlobalTransactionData;
 
174
 
 
175
/*
 
176
 * Two Phase Commit shared state.  Access to this struct is protected
 
177
 * by TwoPhaseStateLock.
 
178
 */
 
179
typedef struct TwoPhaseStateData
 
180
{
 
181
        /* Head of linked list of free GlobalTransactionData structs */
 
182
        GlobalTransaction freeGXacts;
 
183
 
 
184
        /* Number of valid prepXacts entries. */
 
185
        int                     numPrepXacts;
 
186
 
 
187
        /* There are max_prepared_xacts items in this array */
 
188
        GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
 
189
} TwoPhaseStateData;
 
190
 
 
191
static TwoPhaseStateData *TwoPhaseState;
 
192
 
 
193
/*
 
194
 * Global transaction entry currently locked by us, if any.  Note that any
 
195
 * access to the entry pointed to by this variable must be protected by
 
196
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
 
197
 * (since it's just local memory).
 
198
 */
 
199
static GlobalTransaction MyLockedGxact = NULL;
 
200
 
 
201
static bool twophaseExitRegistered = false;
 
202
 
 
203
static void RecordTransactionCommitPrepared(TransactionId xid,
 
204
                                                                int nchildren,
 
205
                                                                TransactionId *children,
 
206
                                                                int nrels,
 
207
                                                                RelFileNode *rels,
 
208
                                                                int ninvalmsgs,
 
209
                                                                SharedInvalidationMessage *invalmsgs,
 
210
                                                                bool initfileinval,
 
211
                                                                const char *gid);
 
212
static void RecordTransactionAbortPrepared(TransactionId xid,
 
213
                                                           int nchildren,
 
214
                                                           TransactionId *children,
 
215
                                                           int nrels,
 
216
                                                           RelFileNode *rels,
 
217
                                                           const char *gid);
 
218
static void ProcessRecords(char *bufptr, TransactionId xid,
 
219
                           const TwoPhaseCallback callbacks[]);
 
220
static void RemoveGXact(GlobalTransaction gxact);
 
221
 
 
222
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
 
223
static char *ProcessTwoPhaseBuffer(TransactionId xid,
 
224
                                          XLogRecPtr prepare_start_lsn,
 
225
                                          bool fromdisk, bool setParent, bool setNextXid);
 
226
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
 
227
                                        const char *gid, TimestampTz prepared_at, Oid owner,
 
228
                                        Oid databaseid);
 
229
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
 
230
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
 
231
 
 
232
/*
 
233
 * Initialization of shared memory
 
234
 */
 
235
Size
 
236
TwoPhaseShmemSize(void)
 
237
{
 
238
        Size            size;
 
239
 
 
240
        /* Need the fixed struct, the array of pointers, and the GTD structs */
 
241
        size = offsetof(TwoPhaseStateData, prepXacts);
 
242
        size = add_size(size, mul_size(max_prepared_xacts,
 
243
                                                                   sizeof(GlobalTransaction)));
 
244
        size = MAXALIGN(size);
 
245
        size = add_size(size, mul_size(max_prepared_xacts,
 
246
                                                                   sizeof(GlobalTransactionData)));
 
247
 
 
248
        return size;
 
249
}
 
250
 
 
251
void
 
252
TwoPhaseShmemInit(void)
 
253
{
 
254
        bool            found;
 
255
 
 
256
        TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
 
257
                                                                        TwoPhaseShmemSize(),
 
258
                                                                        &found);
 
259
        if (!IsUnderPostmaster)
 
260
        {
 
261
                GlobalTransaction gxacts;
 
262
                int                     i;
 
263
 
 
264
                Assert(!found);
 
265
                TwoPhaseState->freeGXacts = NULL;
 
266
                TwoPhaseState->numPrepXacts = 0;
 
267
 
 
268
                /*
 
269
                 * Initialize the linked list of free GlobalTransactionData structs
 
270
                 */
 
271
                gxacts = (GlobalTransaction)
 
272
                        ((char *) TwoPhaseState +
 
273
                         MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
 
274
                                          sizeof(GlobalTransaction) * max_prepared_xacts));
 
275
                for (i = 0; i < max_prepared_xacts; i++)
 
276
                {
 
277
                        /* insert into linked list */
 
278
                        gxacts[i].next = TwoPhaseState->freeGXacts;
 
279
                        TwoPhaseState->freeGXacts = &gxacts[i];
 
280
 
 
281
                        /* associate it with a PGPROC assigned by InitProcGlobal */
 
282
                        gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
 
283
 
 
284
                        /*
 
285
                         * Assign a unique ID for each dummy proc, so that the range of
 
286
                         * dummy backend IDs immediately follows the range of normal
 
287
                         * backend IDs. We don't dare to assign a real backend ID to dummy
 
288
                         * procs, because prepared transactions don't take part in cache
 
289
                         * invalidation like a real backend ID would imply, but having a
 
290
                         * unique ID for them is nevertheless handy. This arrangement
 
291
                         * allows you to allocate an array of size (MaxBackends +
 
292
                         * max_prepared_xacts + 1), and have a slot for every backend and
 
293
                         * prepared transaction. Currently multixact.c uses that
 
294
                         * technique.
 
295
                         */
 
296
                        gxacts[i].dummyBackendId = MaxBackends + 1 + i;
 
297
                }
 
298
        }
 
299
        else
 
300
                Assert(found);
 
301
}
 
302
 
 
303
/*
 
304
 * Exit hook to unlock the global transaction entry we're working on.
 
305
 */
 
306
static void
 
307
AtProcExit_Twophase(int code, Datum arg)
 
308
{
 
309
        /* same logic as abort */
 
310
        AtAbort_Twophase();
 
311
}
 
312
 
 
313
/*
 
314
 * Abort hook to unlock the global transaction entry we're working on.
 
315
 */
 
316
void
 
317
AtAbort_Twophase(void)
 
318
{
 
319
        if (MyLockedGxact == NULL)
 
320
                return;
 
321
 
 
322
        /*
 
323
         * What to do with the locked global transaction entry?  If we were in the
 
324
         * process of preparing the transaction, but haven't written the WAL
 
325
         * record and state file yet, the transaction must not be considered as
 
326
         * prepared.  Likewise, if we are in the process of finishing an
 
327
         * already-prepared transaction, and fail after having already written the
 
328
         * 2nd phase commit or rollback record to the WAL, the transaction should
 
329
         * not be considered as prepared anymore.  In those cases, just remove the
 
330
         * entry from shared memory.
 
331
         *
 
332
         * Otherwise, the entry must be left in place so that the transaction can
 
333
         * be finished later, so just unlock it.
 
334
         *
 
335
         * If we abort during prepare, after having written the WAL record, we
 
336
         * might not have transferred all locks and other state to the prepared
 
337
         * transaction yet.  Likewise, if we abort during commit or rollback,
 
338
         * after having written the WAL record, we might not have released all the
 
339
         * resources held by the transaction yet.  In those cases, the in-memory
 
340
         * state can be wrong, but it's too late to back out.
 
341
         */
 
342
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
343
        if (!MyLockedGxact->valid)
 
344
                RemoveGXact(MyLockedGxact);
 
345
        else
 
346
                MyLockedGxact->locking_backend = InvalidBackendId;
 
347
        LWLockRelease(TwoPhaseStateLock);
 
348
 
 
349
        MyLockedGxact = NULL;
 
350
}
 
351
 
 
352
/*
 
353
 * This is called after we have finished transferring state to the prepared
 
354
 * PGXACT entry.
 
355
 */
 
356
void
 
357
PostPrepare_Twophase(void)
 
358
{
 
359
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
360
        MyLockedGxact->locking_backend = InvalidBackendId;
 
361
        LWLockRelease(TwoPhaseStateLock);
 
362
 
 
363
        MyLockedGxact = NULL;
 
364
}
 
365
 
 
366
 
 
367
/*
 
368
 * MarkAsPreparing
 
369
 *              Reserve the GID for the given transaction.
 
370
 */
 
371
GlobalTransaction
 
372
MarkAsPreparing(TransactionId xid, const char *gid,
 
373
                                TimestampTz prepared_at, Oid owner, Oid databaseid)
 
374
{
 
375
        GlobalTransaction gxact;
 
376
        int                     i;
 
377
 
 
378
        if (strlen(gid) >= GIDSIZE)
 
379
                ereport(ERROR,
 
380
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 
381
                                 errmsg("transaction identifier \"%s\" is too long",
 
382
                                                gid)));
 
383
 
 
384
        /* fail immediately if feature is disabled */
 
385
        if (max_prepared_xacts == 0)
 
386
                ereport(ERROR,
 
387
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
388
                                 errmsg("prepared transactions are disabled"),
 
389
                                 errhint("Set max_prepared_transactions to a nonzero value.")));
 
390
 
 
391
        /* on first call, register the exit hook */
 
392
        if (!twophaseExitRegistered)
 
393
        {
 
394
                before_shmem_exit(AtProcExit_Twophase, 0);
 
395
                twophaseExitRegistered = true;
 
396
        }
 
397
 
 
398
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
399
 
 
400
        /* Check for conflicting GID */
 
401
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
402
        {
 
403
                gxact = TwoPhaseState->prepXacts[i];
 
404
                if (strcmp(gxact->gid, gid) == 0)
 
405
                {
 
406
                        ereport(ERROR,
 
407
                                        (errcode(ERRCODE_DUPLICATE_OBJECT),
 
408
                                         errmsg("transaction identifier \"%s\" is already in use",
 
409
                                                        gid)));
 
410
                }
 
411
        }
 
412
 
 
413
        /* Get a free gxact from the freelist */
 
414
        if (TwoPhaseState->freeGXacts == NULL)
 
415
                ereport(ERROR,
 
416
                                (errcode(ERRCODE_OUT_OF_MEMORY),
 
417
                                 errmsg("maximum number of prepared transactions reached"),
 
418
                                 errhint("Increase max_prepared_transactions (currently %d).",
 
419
                                                 max_prepared_xacts)));
 
420
        gxact = TwoPhaseState->freeGXacts;
 
421
        TwoPhaseState->freeGXacts = gxact->next;
 
422
 
 
423
        MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
 
424
 
 
425
        gxact->ondisk = false;
 
426
 
 
427
        /* And insert it into the active array */
 
428
        Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
 
429
        TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
 
430
 
 
431
        LWLockRelease(TwoPhaseStateLock);
 
432
 
 
433
        return gxact;
 
434
}
 
435
 
 
436
/*
 
437
 * MarkAsPreparingGuts
 
438
 *
 
439
 * This uses a gxact struct and puts it into the active array.
 
440
 * NOTE: this is also used when reloading a gxact after a crash; so avoid
 
441
 * assuming that we can use very much backend context.
 
442
 *
 
443
 * Note: This function should be called with appropriate locks held.
 
444
 */
 
445
static void
 
446
MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
 
447
                                        TimestampTz prepared_at, Oid owner, Oid databaseid)
 
448
{
 
449
        PGPROC     *proc;
 
450
        PGXACT     *pgxact;
 
451
        int                     i;
 
452
 
 
453
        Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
454
 
 
455
        Assert(gxact != NULL);
 
456
        proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
457
        pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
458
 
 
459
        /* Initialize the PGPROC entry */
 
460
        MemSet(proc, 0, sizeof(PGPROC));
 
461
        proc->pgprocno = gxact->pgprocno;
 
462
        SHMQueueElemInit(&(proc->links));
 
463
        proc->waitStatus = STATUS_OK;
 
464
        /* We set up the gxact's VXID as InvalidBackendId/XID */
 
465
        proc->lxid = (LocalTransactionId) xid;
 
466
        pgxact->xid = xid;
 
467
        pgxact->xmin = InvalidTransactionId;
 
468
        pgxact->delayChkpt = false;
 
469
        pgxact->vacuumFlags = 0;
 
470
        proc->pid = 0;
 
471
        proc->backendId = InvalidBackendId;
 
472
        proc->databaseId = databaseid;
 
473
        proc->roleId = owner;
 
474
        proc->isBackgroundWorker = false;
 
475
        proc->lwWaiting = false;
 
476
        proc->lwWaitMode = 0;
 
477
        proc->waitLock = NULL;
 
478
        proc->waitProcLock = NULL;
 
479
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
 
480
                SHMQueueInit(&(proc->myProcLocks[i]));
 
481
        /* subxid data must be filled later by GXactLoadSubxactData */
 
482
        pgxact->overflowed = false;
 
483
        pgxact->nxids = 0;
 
484
 
 
485
        gxact->prepared_at = prepared_at;
 
486
        gxact->xid = xid;
 
487
        gxact->owner = owner;
 
488
        gxact->locking_backend = MyBackendId;
 
489
        gxact->valid = false;
 
490
        gxact->inredo = false;
 
491
        strcpy(gxact->gid, gid);
 
492
 
 
493
        /*
 
494
         * Remember that we have this GlobalTransaction entry locked for us. If we
 
495
         * abort after this, we must release it.
 
496
         */
 
497
        MyLockedGxact = gxact;
 
498
}
 
499
 
 
500
/*
 
501
 * GXactLoadSubxactData
 
502
 *
 
503
 * If the transaction being persisted had any subtransactions, this must
 
504
 * be called before MarkAsPrepared() to load information into the dummy
 
505
 * PGPROC.
 
506
 */
 
507
static void
 
508
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
 
509
                                         TransactionId *children)
 
510
{
 
511
        PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
512
        PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
513
 
 
514
        /* We need no extra lock since the GXACT isn't valid yet */
 
515
        if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
 
516
        {
 
517
                pgxact->overflowed = true;
 
518
                nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
 
519
        }
 
520
        if (nsubxacts > 0)
 
521
        {
 
522
                memcpy(proc->subxids.xids, children,
 
523
                           nsubxacts * sizeof(TransactionId));
 
524
                pgxact->nxids = nsubxacts;
 
525
        }
 
526
}
 
527
 
 
528
/*
 
529
 * MarkAsPrepared
 
530
 *              Mark the GXACT as fully valid, and enter it into the global ProcArray.
 
531
 *
 
532
 * lock_held indicates whether caller already holds TwoPhaseStateLock.
 
533
 */
 
534
static void
 
535
MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
 
536
{
 
537
        /* Lock here may be overkill, but I'm not convinced of that ... */
 
538
        if (!lock_held)
 
539
                LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
540
        Assert(!gxact->valid);
 
541
        gxact->valid = true;
 
542
        if (!lock_held)
 
543
                LWLockRelease(TwoPhaseStateLock);
 
544
 
 
545
        /*
 
546
         * Put it into the global ProcArray so TransactionIdIsInProgress considers
 
547
         * the XID as still running.
 
548
         */
 
549
        ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
 
550
}
 
551
 
 
552
/*
 
553
 * LockGXact
 
554
 *              Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
 
555
 */
 
556
static GlobalTransaction
 
557
LockGXact(const char *gid, Oid user)
 
558
{
 
559
        int                     i;
 
560
 
 
561
        /* on first call, register the exit hook */
 
562
        if (!twophaseExitRegistered)
 
563
        {
 
564
                before_shmem_exit(AtProcExit_Twophase, 0);
 
565
                twophaseExitRegistered = true;
 
566
        }
 
567
 
 
568
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
569
 
 
570
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
571
        {
 
572
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
573
                PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
574
 
 
575
                /* Ignore not-yet-valid GIDs */
 
576
                if (!gxact->valid)
 
577
                        continue;
 
578
                if (strcmp(gxact->gid, gid) != 0)
 
579
                        continue;
 
580
 
 
581
                /* Found it, but has someone else got it locked? */
 
582
                if (gxact->locking_backend != InvalidBackendId)
 
583
                        ereport(ERROR,
 
584
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
585
                                         errmsg("prepared transaction with identifier \"%s\" is busy",
 
586
                                                        gid)));
 
587
 
 
588
                if (user != gxact->owner && !superuser_arg(user))
 
589
                        ereport(ERROR,
 
590
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 
591
                                         errmsg("permission denied to finish prepared transaction"),
 
592
                                         errhint("Must be superuser or the user that prepared the transaction.")));
 
593
 
 
594
                /*
 
595
                 * Note: it probably would be possible to allow committing from
 
596
                 * another database; but at the moment NOTIFY is known not to work and
 
597
                 * there may be some other issues as well.  Hence disallow until
 
598
                 * someone gets motivated to make it work.
 
599
                 */
 
600
                if (MyDatabaseId != proc->databaseId)
 
601
                        ereport(ERROR,
 
602
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 
603
                                         errmsg("prepared transaction belongs to another database"),
 
604
                                         errhint("Connect to the database where the transaction was prepared to finish it.")));
 
605
 
 
606
                /* OK for me to lock it */
 
607
                gxact->locking_backend = MyBackendId;
 
608
                MyLockedGxact = gxact;
 
609
 
 
610
                LWLockRelease(TwoPhaseStateLock);
 
611
 
 
612
                return gxact;
 
613
        }
 
614
 
 
615
        LWLockRelease(TwoPhaseStateLock);
 
616
 
 
617
        ereport(ERROR,
 
618
                        (errcode(ERRCODE_UNDEFINED_OBJECT),
 
619
                         errmsg("prepared transaction with identifier \"%s\" does not exist",
 
620
                                        gid)));
 
621
 
 
622
        /* NOTREACHED */
 
623
        return NULL;
 
624
}
 
625
 
 
626
/*
 
627
 * RemoveGXact
 
628
 *              Remove the prepared transaction from the shared memory array.
 
629
 *
 
630
 * NB: caller should have already removed it from ProcArray
 
631
 */
 
632
static void
 
633
RemoveGXact(GlobalTransaction gxact)
 
634
{
 
635
        int                     i;
 
636
 
 
637
        Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
638
 
 
639
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
640
        {
 
641
                if (gxact == TwoPhaseState->prepXacts[i])
 
642
                {
 
643
                        /* remove from the active array */
 
644
                        TwoPhaseState->numPrepXacts--;
 
645
                        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
 
646
 
 
647
                        /* and put it back in the freelist */
 
648
                        gxact->next = TwoPhaseState->freeGXacts;
 
649
                        TwoPhaseState->freeGXacts = gxact;
 
650
 
 
651
                        return;
 
652
                }
 
653
        }
 
654
 
 
655
        elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
 
656
}
 
657
 
 
658
/*
 
659
 * Returns an array of all prepared transactions for the user-level
 
660
 * function pg_prepared_xact.
 
661
 *
 
662
 * The returned array and all its elements are copies of internal data
 
663
 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
 
664
 *
 
665
 * WARNING -- we return even those transactions that are not fully prepared
 
666
 * yet.  The caller should filter them out if he doesn't want them.
 
667
 *
 
668
 * The returned array is palloc'd.
 
669
 */
 
670
static int
 
671
GetPreparedTransactionList(GlobalTransaction *gxacts)
 
672
{
 
673
        GlobalTransaction array;
 
674
        int                     num;
 
675
        int                     i;
 
676
 
 
677
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
678
 
 
679
        if (TwoPhaseState->numPrepXacts == 0)
 
680
        {
 
681
                LWLockRelease(TwoPhaseStateLock);
 
682
 
 
683
                *gxacts = NULL;
 
684
                return 0;
 
685
        }
 
686
 
 
687
        num = TwoPhaseState->numPrepXacts;
 
688
        array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
 
689
        *gxacts = array;
 
690
        for (i = 0; i < num; i++)
 
691
                memcpy(array + i, TwoPhaseState->prepXacts[i],
 
692
                           sizeof(GlobalTransactionData));
 
693
 
 
694
        LWLockRelease(TwoPhaseStateLock);
 
695
 
 
696
        return num;
 
697
}
 
698
 
 
699
 
 
700
/* Working status for pg_prepared_xact */
 
701
typedef struct
 
702
{
 
703
        GlobalTransaction array;
 
704
        int                     ngxacts;
 
705
        int                     currIdx;
 
706
} Working_State;
 
707
 
 
708
/*
 
709
 * pg_prepared_xact
 
710
 *              Produce a view with one row per prepared transaction.
 
711
 *
 
712
 * This function is here so we don't have to export the
 
713
 * GlobalTransactionData struct definition.
 
714
 */
 
715
Datum
 
716
pg_prepared_xact(PG_FUNCTION_ARGS)
 
717
{
 
718
        FuncCallContext *funcctx;
 
719
        Working_State *status;
 
720
 
 
721
        if (SRF_IS_FIRSTCALL())
 
722
        {
 
723
                TupleDesc       tupdesc;
 
724
                MemoryContext oldcontext;
 
725
 
 
726
                /* create a function context for cross-call persistence */
 
727
                funcctx = SRF_FIRSTCALL_INIT();
 
728
 
 
729
                /*
 
730
                 * Switch to memory context appropriate for multiple function calls
 
731
                 */
 
732
                oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
733
 
 
734
                /* build tupdesc for result tuples */
 
735
                /* this had better match pg_prepared_xacts view in system_views.sql */
 
736
                tupdesc = CreateTemplateTupleDesc(5, false);
 
737
                TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
 
738
                                                   XIDOID, -1, 0);
 
739
                TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
 
740
                                                   TEXTOID, -1, 0);
 
741
                TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
 
742
                                                   TIMESTAMPTZOID, -1, 0);
 
743
                TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
 
744
                                                   OIDOID, -1, 0);
 
745
                TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
 
746
                                                   OIDOID, -1, 0);
 
747
 
 
748
                funcctx->tuple_desc = BlessTupleDesc(tupdesc);
 
749
 
 
750
                /*
 
751
                 * Collect all the 2PC status information that we will format and send
 
752
                 * out as a result set.
 
753
                 */
 
754
                status = (Working_State *) palloc(sizeof(Working_State));
 
755
                funcctx->user_fctx = (void *) status;
 
756
 
 
757
                status->ngxacts = GetPreparedTransactionList(&status->array);
 
758
                status->currIdx = 0;
 
759
 
 
760
                MemoryContextSwitchTo(oldcontext);
 
761
        }
 
762
 
 
763
        funcctx = SRF_PERCALL_SETUP();
 
764
        status = (Working_State *) funcctx->user_fctx;
 
765
 
 
766
        while (status->array != NULL && status->currIdx < status->ngxacts)
 
767
        {
 
768
                GlobalTransaction gxact = &status->array[status->currIdx++];
 
769
                PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
770
                PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
771
                Datum           values[5];
 
772
                bool            nulls[5];
 
773
                HeapTuple       tuple;
 
774
                Datum           result;
 
775
 
 
776
                if (!gxact->valid)
 
777
                        continue;
 
778
 
 
779
                /*
 
780
                 * Form tuple with appropriate data.
 
781
                 */
 
782
                MemSet(values, 0, sizeof(values));
 
783
                MemSet(nulls, 0, sizeof(nulls));
 
784
 
 
785
                values[0] = TransactionIdGetDatum(pgxact->xid);
 
786
                values[1] = CStringGetTextDatum(gxact->gid);
 
787
                values[2] = TimestampTzGetDatum(gxact->prepared_at);
 
788
                values[3] = ObjectIdGetDatum(gxact->owner);
 
789
                values[4] = ObjectIdGetDatum(proc->databaseId);
 
790
 
 
791
                tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
 
792
                result = HeapTupleGetDatum(tuple);
 
793
                SRF_RETURN_NEXT(funcctx, result);
 
794
        }
 
795
 
 
796
        SRF_RETURN_DONE(funcctx);
 
797
}
 
798
 
 
799
/*
 
800
 * TwoPhaseGetGXact
 
801
 *              Get the GlobalTransaction struct for a prepared transaction
 
802
 *              specified by XID
 
803
 */
 
804
static GlobalTransaction
 
805
TwoPhaseGetGXact(TransactionId xid)
 
806
{
 
807
        GlobalTransaction result = NULL;
 
808
        int                     i;
 
809
 
 
810
        static TransactionId cached_xid = InvalidTransactionId;
 
811
        static GlobalTransaction cached_gxact = NULL;
 
812
 
 
813
        /*
 
814
         * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
 
815
         * repeatedly for the same XID.  We can save work with a simple cache.
 
816
         */
 
817
        if (xid == cached_xid)
 
818
                return cached_gxact;
 
819
 
 
820
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
821
 
 
822
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
823
        {
 
824
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
825
                PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
826
 
 
827
                if (pgxact->xid == xid)
 
828
                {
 
829
                        result = gxact;
 
830
                        break;
 
831
                }
 
832
        }
 
833
 
 
834
        LWLockRelease(TwoPhaseStateLock);
 
835
 
 
836
        if (result == NULL)                     /* should not happen */
 
837
                elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
 
838
 
 
839
        cached_xid = xid;
 
840
        cached_gxact = result;
 
841
 
 
842
        return result;
 
843
}
 
844
 
 
845
/*
 
846
 * TwoPhaseGetDummyProc
 
847
 *              Get the dummy backend ID for prepared transaction specified by XID
 
848
 *
 
849
 * Dummy backend IDs are similar to real backend IDs of real backends.
 
850
 * They start at MaxBackends + 1, and are unique across all currently active
 
851
 * real backends and prepared transactions.
 
852
 */
 
853
BackendId
 
854
TwoPhaseGetDummyBackendId(TransactionId xid)
 
855
{
 
856
        GlobalTransaction gxact = TwoPhaseGetGXact(xid);
 
857
 
 
858
        return gxact->dummyBackendId;
 
859
}
 
860
 
 
861
/*
 
862
 * TwoPhaseGetDummyProc
 
863
 *              Get the PGPROC that represents a prepared transaction specified by XID
 
864
 */
 
865
PGPROC *
 
866
TwoPhaseGetDummyProc(TransactionId xid)
 
867
{
 
868
        GlobalTransaction gxact = TwoPhaseGetGXact(xid);
 
869
 
 
870
        return &ProcGlobal->allProcs[gxact->pgprocno];
 
871
}
 
872
 
 
873
/************************************************************************/
 
874
/* State file support                                                                                                   */
 
875
/************************************************************************/
 
876
 
 
877
#define TwoPhaseFilePath(path, xid) \
 
878
        snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
 
879
 
 
880
/*
 
881
 * 2PC state file format:
 
882
 *
 
883
 *      1. TwoPhaseFileHeader
 
884
 *      2. TransactionId[] (subtransactions)
 
885
 *      3. RelFileNode[] (files to be deleted at commit)
 
886
 *      4. RelFileNode[] (files to be deleted at abort)
 
887
 *      5. SharedInvalidationMessage[] (inval messages to be sent at commit)
 
888
 *      6. TwoPhaseRecordOnDisk
 
889
 *      7. ...
 
890
 *      8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 
891
 *      9. checksum (CRC-32C)
 
892
 *
 
893
 * Each segment except the final checksum is MAXALIGN'd.
 
894
 */
 
895
 
 
896
/*
 
897
 * Header for a 2PC state file
 
898
 */
 
899
#define TWOPHASE_MAGIC  0x57F94534      /* format identifier */
 
900
 
 
901
typedef struct TwoPhaseFileHeader
 
902
{
 
903
        uint32          magic;                  /* format identifier */
 
904
        uint32          total_len;              /* actual file length */
 
905
        TransactionId xid;                      /* original transaction XID */
 
906
        Oid                     database;               /* OID of database it was in */
 
907
        TimestampTz prepared_at;        /* time of preparation */
 
908
        Oid                     owner;                  /* user running the transaction */
 
909
        int32           nsubxacts;              /* number of following subxact XIDs */
 
910
        int32           ncommitrels;    /* number of delete-on-commit rels */
 
911
        int32           nabortrels;             /* number of delete-on-abort rels */
 
912
        int32           ninvalmsgs;             /* number of cache invalidation messages */
 
913
        bool            initfileinval;  /* does relcache init file need invalidation? */
 
914
        uint16          gidlen;                 /* length of the GID - GID follows the header */
 
915
        XLogRecPtr      origin_lsn;             /* lsn of this record at origin node */
 
916
        TimestampTz origin_timestamp;   /* time of prepare at origin node */
 
917
} TwoPhaseFileHeader;
 
918
 
 
919
/*
 
920
 * Header for each record in a state file
 
921
 *
 
922
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 
923
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 
924
 */
 
925
typedef struct TwoPhaseRecordOnDisk
 
926
{
 
927
        uint32          len;                    /* length of rmgr data */
 
928
        TwoPhaseRmgrId rmid;            /* resource manager for this record */
 
929
        uint16          info;                   /* flag bits for use by rmgr */
 
930
} TwoPhaseRecordOnDisk;
 
931
 
 
932
/*
 
933
 * During prepare, the state file is assembled in memory before writing it
 
934
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
 
935
 * for that.
 
936
 */
 
937
typedef struct StateFileChunk
 
938
{
 
939
        char       *data;
 
940
        uint32          len;
 
941
        struct StateFileChunk *next;
 
942
} StateFileChunk;
 
943
 
 
944
static struct xllist
 
945
{
 
946
        StateFileChunk *head;           /* first data block in the chain */
 
947
        StateFileChunk *tail;           /* last block in chain */
 
948
        uint32          num_chunks;
 
949
        uint32          bytes_free;             /* free bytes left in tail block */
 
950
        uint32          total_len;              /* total data bytes in chain */
 
951
}                       records;
 
952
 
 
953
 
 
954
/*
 
955
 * Append a block of data to records data structure.
 
956
 *
 
957
 * NB: each block is padded to a MAXALIGN multiple.  This must be
 
958
 * accounted for when the file is later read!
 
959
 *
 
960
 * The data is copied, so the caller is free to modify it afterwards.
 
961
 */
 
962
static void
 
963
save_state_data(const void *data, uint32 len)
 
964
{
 
965
        uint32          padlen = MAXALIGN(len);
 
966
 
 
967
        if (padlen > records.bytes_free)
 
968
        {
 
969
                records.tail->next = palloc0(sizeof(StateFileChunk));
 
970
                records.tail = records.tail->next;
 
971
                records.tail->len = 0;
 
972
                records.tail->next = NULL;
 
973
                records.num_chunks++;
 
974
 
 
975
                records.bytes_free = Max(padlen, 512);
 
976
                records.tail->data = palloc(records.bytes_free);
 
977
        }
 
978
 
 
979
        memcpy(((char *) records.tail->data) + records.tail->len, data, len);
 
980
        records.tail->len += padlen;
 
981
        records.bytes_free -= padlen;
 
982
        records.total_len += padlen;
 
983
}
 
984
 
 
985
/*
 
986
 * Start preparing a state file.
 
987
 *
 
988
 * Initializes data structure and inserts the 2PC file header record.
 
989
 */
 
990
void
 
991
StartPrepare(GlobalTransaction gxact)
 
992
{
 
993
        PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
994
        PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
995
        TransactionId xid = pgxact->xid;
 
996
        TwoPhaseFileHeader hdr;
 
997
        TransactionId *children;
 
998
        RelFileNode *commitrels;
 
999
        RelFileNode *abortrels;
 
1000
        SharedInvalidationMessage *invalmsgs;
 
1001
 
 
1002
        /* Initialize linked list */
 
1003
        records.head = palloc0(sizeof(StateFileChunk));
 
1004
        records.head->len = 0;
 
1005
        records.head->next = NULL;
 
1006
 
 
1007
        records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
 
1008
        records.head->data = palloc(records.bytes_free);
 
1009
 
 
1010
        records.tail = records.head;
 
1011
        records.num_chunks = 1;
 
1012
 
 
1013
        records.total_len = 0;
 
1014
 
 
1015
        /* Create header */
 
1016
        hdr.magic = TWOPHASE_MAGIC;
 
1017
        hdr.total_len = 0;                      /* EndPrepare will fill this in */
 
1018
        hdr.xid = xid;
 
1019
        hdr.database = proc->databaseId;
 
1020
        hdr.prepared_at = gxact->prepared_at;
 
1021
        hdr.owner = gxact->owner;
 
1022
        hdr.nsubxacts = xactGetCommittedChildren(&children);
 
1023
        hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
 
1024
        hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
 
1025
        hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
 
1026
                                                                                                                  &hdr.initfileinval);
 
1027
        hdr.gidlen = strlen(gxact->gid) + 1;    /* Include '\0' */
 
1028
 
 
1029
        save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
 
1030
        save_state_data(gxact->gid, hdr.gidlen);
 
1031
 
 
1032
        /*
 
1033
         * Add the additional info about subxacts, deletable files and cache
 
1034
         * invalidation messages.
 
1035
         */
 
1036
        if (hdr.nsubxacts > 0)
 
1037
        {
 
1038
                save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
 
1039
                /* While we have the child-xact data, stuff it in the gxact too */
 
1040
                GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
 
1041
        }
 
1042
        if (hdr.ncommitrels > 0)
 
1043
        {
 
1044
                save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
 
1045
                pfree(commitrels);
 
1046
        }
 
1047
        if (hdr.nabortrels > 0)
 
1048
        {
 
1049
                save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
 
1050
                pfree(abortrels);
 
1051
        }
 
1052
        if (hdr.ninvalmsgs > 0)
 
1053
        {
 
1054
                save_state_data(invalmsgs,
 
1055
                                                hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
 
1056
                pfree(invalmsgs);
 
1057
        }
 
1058
}
 
1059
 
 
1060
/*
 
1061
 * Finish preparing state data and writing it to WAL.
 
1062
 */
 
1063
void
 
1064
EndPrepare(GlobalTransaction gxact)
 
1065
{
 
1066
        TwoPhaseFileHeader *hdr;
 
1067
        StateFileChunk *record;
 
1068
        bool            replorigin;
 
1069
 
 
1070
        /* Add the end sentinel to the list of 2PC records */
 
1071
        RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
 
1072
                                                   NULL, 0);
 
1073
 
 
1074
        /* Go back and fill in total_len in the file header record */
 
1075
        hdr = (TwoPhaseFileHeader *) records.head->data;
 
1076
        Assert(hdr->magic == TWOPHASE_MAGIC);
 
1077
        hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
1078
 
 
1079
        replorigin = (replorigin_session_origin != InvalidRepOriginId &&
 
1080
                                  replorigin_session_origin != DoNotReplicateId);
 
1081
 
 
1082
        if (replorigin)
 
1083
        {
 
1084
                Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
 
1085
                hdr->origin_lsn = replorigin_session_origin_lsn;
 
1086
                hdr->origin_timestamp = replorigin_session_origin_timestamp;
 
1087
        }
 
1088
        else
 
1089
        {
 
1090
                hdr->origin_lsn = InvalidXLogRecPtr;
 
1091
                hdr->origin_timestamp = 0;
 
1092
        }
 
1093
 
 
1094
        /*
 
1095
         * If the data size exceeds MaxAllocSize, we won't be able to read it in
 
1096
         * ReadTwoPhaseFile. Check for that now, rather than fail in the case
 
1097
         * where we write data to file and then re-read at commit time.
 
1098
         */
 
1099
        if (hdr->total_len > MaxAllocSize)
 
1100
                ereport(ERROR,
 
1101
                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 
1102
                                 errmsg("two-phase state file maximum length exceeded")));
 
1103
 
 
1104
        /*
 
1105
         * Now writing 2PC state data to WAL. We let the WAL's CRC protection
 
1106
         * cover us, so no need to calculate a separate CRC.
 
1107
         *
 
1108
         * We have to set delayChkpt here, too; otherwise a checkpoint starting
 
1109
         * immediately after the WAL record is inserted could complete without
 
1110
         * fsync'ing our state file.  (This is essentially the same kind of race
 
1111
         * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
 
1112
         * uses delayChkpt for; see notes there.)
 
1113
         *
 
1114
         * We save the PREPARE record's location in the gxact for later use by
 
1115
         * CheckPointTwoPhase.
 
1116
         */
 
1117
        XLogEnsureRecordSpace(0, records.num_chunks);
 
1118
 
 
1119
        START_CRIT_SECTION();
 
1120
 
 
1121
        MyPgXact->delayChkpt = true;
 
1122
 
 
1123
        XLogBeginInsert();
 
1124
        for (record = records.head; record != NULL; record = record->next)
 
1125
                XLogRegisterData(record->data, record->len);
 
1126
 
 
1127
        XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
1128
 
 
1129
        gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
 
1130
 
 
1131
        if (replorigin)
 
1132
        {
 
1133
                /* Move LSNs forward for this replication origin */
 
1134
                replorigin_session_advance(replorigin_session_origin_lsn,
 
1135
                                                                   gxact->prepare_end_lsn);
 
1136
        }
 
1137
 
 
1138
        XLogFlush(gxact->prepare_end_lsn);
 
1139
 
 
1140
        /* If we crash now, we have prepared: WAL replay will fix things */
 
1141
 
 
1142
        /* Store record's start location to read that later on Commit */
 
1143
        gxact->prepare_start_lsn = ProcLastRecPtr;
 
1144
 
 
1145
        /*
 
1146
         * Mark the prepared transaction as valid.  As soon as xact.c marks
 
1147
         * MyPgXact as not running our XID (which it will do immediately after
 
1148
         * this function returns), others can commit/rollback the xact.
 
1149
         *
 
1150
         * NB: a side effect of this is to make a dummy ProcArray entry for the
 
1151
         * prepared XID.  This must happen before we clear the XID from MyPgXact,
 
1152
         * else there is a window where the XID is not running according to
 
1153
         * TransactionIdIsInProgress, and onlookers would be entitled to assume
 
1154
         * the xact crashed.  Instead we have a window where the same XID appears
 
1155
         * twice in ProcArray, which is OK.
 
1156
         */
 
1157
        MarkAsPrepared(gxact, false);
 
1158
 
 
1159
        /*
 
1160
         * Now we can mark ourselves as out of the commit critical section: a
 
1161
         * checkpoint starting after this will certainly see the gxact as a
 
1162
         * candidate for fsyncing.
 
1163
         */
 
1164
        MyPgXact->delayChkpt = false;
 
1165
 
 
1166
        /*
 
1167
         * Remember that we have this GlobalTransaction entry locked for us.  If
 
1168
         * we crash after this point, it's too late to abort, but we must unlock
 
1169
         * it so that the prepared transaction can be committed or rolled back.
 
1170
         */
 
1171
        MyLockedGxact = gxact;
 
1172
 
 
1173
        END_CRIT_SECTION();
 
1174
 
 
1175
        /*
 
1176
         * Wait for synchronous replication, if required.
 
1177
         *
 
1178
         * Note that at this stage we have marked the prepare, but still show as
 
1179
         * running in the procarray (twice!) and continue to hold locks.
 
1180
         */
 
1181
        SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
 
1182
 
 
1183
        records.tail = records.head = NULL;
 
1184
        records.num_chunks = 0;
 
1185
}
 
1186
 
 
1187
/*
 
1188
 * Register a 2PC record to be written to state file.
 
1189
 */
 
1190
void
 
1191
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
 
1192
                                           const void *data, uint32 len)
 
1193
{
 
1194
        TwoPhaseRecordOnDisk record;
 
1195
 
 
1196
        record.rmid = rmid;
 
1197
        record.info = info;
 
1198
        record.len = len;
 
1199
        save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
 
1200
        if (len > 0)
 
1201
                save_state_data(data, len);
 
1202
}
 
1203
 
 
1204
 
 
1205
/*
 
1206
 * Read and validate the state file for xid.
 
1207
 *
 
1208
 * If it looks OK (has a valid magic number and CRC), return the palloc'd
 
1209
 * contents of the file.  Otherwise return NULL.
 
1210
 */
 
1211
static char *
 
1212
ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 
1213
{
 
1214
        char            path[MAXPGPATH];
 
1215
        char       *buf;
 
1216
        TwoPhaseFileHeader *hdr;
 
1217
        int                     fd;
 
1218
        struct stat stat;
 
1219
        uint32          crc_offset;
 
1220
        pg_crc32c       calc_crc,
 
1221
                                file_crc;
 
1222
 
 
1223
        TwoPhaseFilePath(path, xid);
 
1224
 
 
1225
        fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
 
1226
        if (fd < 0)
 
1227
        {
 
1228
                if (give_warnings)
 
1229
                        ereport(WARNING,
 
1230
                                        (errcode_for_file_access(),
 
1231
                                         errmsg("could not open two-phase state file \"%s\": %m",
 
1232
                                                        path)));
 
1233
                return NULL;
 
1234
        }
 
1235
 
 
1236
        /*
 
1237
         * Check file length.  We can determine a lower bound pretty easily. We
 
1238
         * set an upper bound to avoid palloc() failure on a corrupt file, though
 
1239
         * we can't guarantee that we won't get an out of memory error anyway,
 
1240
         * even on a valid file.
 
1241
         */
 
1242
        if (fstat(fd, &stat))
 
1243
        {
 
1244
                CloseTransientFile(fd);
 
1245
                if (give_warnings)
 
1246
                        ereport(WARNING,
 
1247
                                        (errcode_for_file_access(),
 
1248
                                         errmsg("could not stat two-phase state file \"%s\": %m",
 
1249
                                                        path)));
 
1250
                return NULL;
 
1251
        }
 
1252
 
 
1253
        if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
 
1254
                                                MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
 
1255
                                                sizeof(pg_crc32c)) ||
 
1256
                stat.st_size > MaxAllocSize)
 
1257
        {
 
1258
                CloseTransientFile(fd);
 
1259
                return NULL;
 
1260
        }
 
1261
 
 
1262
        crc_offset = stat.st_size - sizeof(pg_crc32c);
 
1263
        if (crc_offset != MAXALIGN(crc_offset))
 
1264
        {
 
1265
                CloseTransientFile(fd);
 
1266
                return NULL;
 
1267
        }
 
1268
 
 
1269
        /*
 
1270
         * OK, slurp in the file.
 
1271
         */
 
1272
        buf = (char *) palloc(stat.st_size);
 
1273
 
 
1274
        pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
 
1275
        if (read(fd, buf, stat.st_size) != stat.st_size)
 
1276
        {
 
1277
                pgstat_report_wait_end();
 
1278
                CloseTransientFile(fd);
 
1279
                if (give_warnings)
 
1280
                        ereport(WARNING,
 
1281
                                        (errcode_for_file_access(),
 
1282
                                         errmsg("could not read two-phase state file \"%s\": %m",
 
1283
                                                        path)));
 
1284
                pfree(buf);
 
1285
                return NULL;
 
1286
        }
 
1287
 
 
1288
        pgstat_report_wait_end();
 
1289
        CloseTransientFile(fd);
 
1290
 
 
1291
        hdr = (TwoPhaseFileHeader *) buf;
 
1292
        if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
 
1293
        {
 
1294
                pfree(buf);
 
1295
                return NULL;
 
1296
        }
 
1297
 
 
1298
        INIT_CRC32C(calc_crc);
 
1299
        COMP_CRC32C(calc_crc, buf, crc_offset);
 
1300
        FIN_CRC32C(calc_crc);
 
1301
 
 
1302
        file_crc = *((pg_crc32c *) (buf + crc_offset));
 
1303
 
 
1304
        if (!EQ_CRC32C(calc_crc, file_crc))
 
1305
        {
 
1306
                pfree(buf);
 
1307
                return NULL;
 
1308
        }
 
1309
 
 
1310
        return buf;
 
1311
}
 
1312
 
 
1313
/*
 
1314
 * ParsePrepareRecord
 
1315
 */
 
1316
void
 
1317
ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
 
1318
{
 
1319
        TwoPhaseFileHeader *hdr;
 
1320
        char       *bufptr;
 
1321
 
 
1322
        hdr = (TwoPhaseFileHeader *) xlrec;
 
1323
        bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
 
1324
 
 
1325
        parsed->origin_lsn = hdr->origin_lsn;
 
1326
        parsed->origin_timestamp = hdr->origin_timestamp;
 
1327
        parsed->twophase_xid = hdr->xid;
 
1328
        parsed->dbId = hdr->database;
 
1329
        parsed->nsubxacts = hdr->nsubxacts;
 
1330
        parsed->nrels = hdr->ncommitrels;
 
1331
        parsed->nabortrels = hdr->nabortrels;
 
1332
        parsed->nmsgs = hdr->ninvalmsgs;
 
1333
 
 
1334
        strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
 
1335
        bufptr += MAXALIGN(hdr->gidlen);
 
1336
 
 
1337
        parsed->subxacts = (TransactionId *) bufptr;
 
1338
        bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
 
1339
 
 
1340
        parsed->xnodes = (RelFileNode *) bufptr;
 
1341
        bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
 
1342
 
 
1343
        parsed->abortnodes = (RelFileNode *) bufptr;
 
1344
        bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
 
1345
 
 
1346
        parsed->msgs = (SharedInvalidationMessage *) bufptr;
 
1347
        bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
1348
}
 
1349
 
 
1350
 
 
1351
 
 
1352
/*
 
1353
 * Reads 2PC data from xlog. During checkpoint this data will be moved to
 
1354
 * twophase files and ReadTwoPhaseFile should be used instead.
 
1355
 *
 
1356
 * Note clearly that this function can access WAL during normal operation,
 
1357
 * similarly to the way WALSender or Logical Decoding would do.
 
1358
 *
 
1359
 */
 
1360
static void
 
1361
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 
1362
{
 
1363
        XLogRecord *record;
 
1364
        XLogReaderState *xlogreader;
 
1365
        char       *errormsg;
 
1366
 
 
1367
        xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
 
1368
                                                                        NULL);
 
1369
        if (!xlogreader)
 
1370
                ereport(ERROR,
 
1371
                                (errcode(ERRCODE_OUT_OF_MEMORY),
 
1372
                                 errmsg("out of memory"),
 
1373
                                 errdetail("Failed while allocating a WAL reading processor.")));
 
1374
 
 
1375
        record = XLogReadRecord(xlogreader, lsn, &errormsg);
 
1376
        if (record == NULL)
 
1377
                ereport(ERROR,
 
1378
                                (errcode_for_file_access(),
 
1379
                                 errmsg("could not read two-phase state from WAL at %X/%X",
 
1380
                                                (uint32) (lsn >> 32),
 
1381
                                                (uint32) lsn)));
 
1382
 
 
1383
        if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
 
1384
                (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
 
1385
                ereport(ERROR,
 
1386
                                (errcode_for_file_access(),
 
1387
                                 errmsg("expected two-phase state data is not present in WAL at %X/%X",
 
1388
                                                (uint32) (lsn >> 32),
 
1389
                                                (uint32) lsn)));
 
1390
 
 
1391
        if (len != NULL)
 
1392
                *len = XLogRecGetDataLen(xlogreader);
 
1393
 
 
1394
        *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
 
1395
        memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
 
1396
 
 
1397
        XLogReaderFree(xlogreader);
 
1398
}
 
1399
 
 
1400
 
 
1401
/*
 
1402
 * Confirms an xid is prepared, during recovery
 
1403
 */
 
1404
bool
 
1405
StandbyTransactionIdIsPrepared(TransactionId xid)
 
1406
{
 
1407
        char       *buf;
 
1408
        TwoPhaseFileHeader *hdr;
 
1409
        bool            result;
 
1410
 
 
1411
        Assert(TransactionIdIsValid(xid));
 
1412
 
 
1413
        if (max_prepared_xacts <= 0)
 
1414
                return false;                   /* nothing to do */
 
1415
 
 
1416
        /* Read and validate file */
 
1417
        buf = ReadTwoPhaseFile(xid, false);
 
1418
        if (buf == NULL)
 
1419
                return false;
 
1420
 
 
1421
        /* Check header also */
 
1422
        hdr = (TwoPhaseFileHeader *) buf;
 
1423
        result = TransactionIdEquals(hdr->xid, xid);
 
1424
        pfree(buf);
 
1425
 
 
1426
        return result;
 
1427
}
 
1428
 
 
1429
/*
 
1430
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 
1431
 */
 
1432
void
 
1433
FinishPreparedTransaction(const char *gid, bool isCommit)
 
1434
{
 
1435
        GlobalTransaction gxact;
 
1436
        PGPROC     *proc;
 
1437
        PGXACT     *pgxact;
 
1438
        TransactionId xid;
 
1439
        char       *buf;
 
1440
        char       *bufptr;
 
1441
        TwoPhaseFileHeader *hdr;
 
1442
        TransactionId latestXid;
 
1443
        TransactionId *children;
 
1444
        RelFileNode *commitrels;
 
1445
        RelFileNode *abortrels;
 
1446
        RelFileNode *delrels;
 
1447
        int                     ndelrels;
 
1448
        SharedInvalidationMessage *invalmsgs;
 
1449
        int                     i;
 
1450
 
 
1451
        /*
 
1452
         * Validate the GID, and lock the GXACT to ensure that two backends do not
 
1453
         * try to commit the same GID at once.
 
1454
         */
 
1455
        gxact = LockGXact(gid, GetUserId());
 
1456
        proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
1457
        pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
1458
        xid = pgxact->xid;
 
1459
 
 
1460
        /*
 
1461
         * Read and validate 2PC state data. State data will typically be stored
 
1462
         * in WAL files if the LSN is after the last checkpoint record, or moved
 
1463
         * to disk if for some reason they have lived for a long time.
 
1464
         */
 
1465
        if (gxact->ondisk)
 
1466
                buf = ReadTwoPhaseFile(xid, true);
 
1467
        else
 
1468
                XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
 
1469
 
 
1470
 
 
1471
        /*
 
1472
         * Disassemble the header area
 
1473
         */
 
1474
        hdr = (TwoPhaseFileHeader *) buf;
 
1475
        Assert(TransactionIdEquals(hdr->xid, xid));
 
1476
        bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
 
1477
        bufptr += MAXALIGN(hdr->gidlen);
 
1478
        children = (TransactionId *) bufptr;
 
1479
        bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
 
1480
        commitrels = (RelFileNode *) bufptr;
 
1481
        bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
 
1482
        abortrels = (RelFileNode *) bufptr;
 
1483
        bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
 
1484
        invalmsgs = (SharedInvalidationMessage *) bufptr;
 
1485
        bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
1486
 
 
1487
        /* compute latestXid among all children */
 
1488
        latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
 
1489
 
 
1490
        /* Prevent cancel/die interrupt while cleaning up */
 
1491
        HOLD_INTERRUPTS();
 
1492
 
 
1493
        /*
 
1494
         * The order of operations here is critical: make the XLOG entry for
 
1495
         * commit or abort, then mark the transaction committed or aborted in
 
1496
         * pg_xact, then remove its PGPROC from the global ProcArray (which means
 
1497
         * TransactionIdIsInProgress will stop saying the prepared xact is in
 
1498
         * progress), then run the post-commit or post-abort callbacks. The
 
1499
         * callbacks will release the locks the transaction held.
 
1500
         */
 
1501
        if (isCommit)
 
1502
                RecordTransactionCommitPrepared(xid,
 
1503
                                                                                hdr->nsubxacts, children,
 
1504
                                                                                hdr->ncommitrels, commitrels,
 
1505
                                                                                hdr->ninvalmsgs, invalmsgs,
 
1506
                                                                                hdr->initfileinval, gid);
 
1507
        else
 
1508
                RecordTransactionAbortPrepared(xid,
 
1509
                                                                           hdr->nsubxacts, children,
 
1510
                                                                           hdr->nabortrels, abortrels,
 
1511
                                                                           gid);
 
1512
 
 
1513
        ProcArrayRemove(proc, latestXid);
 
1514
 
 
1515
        /*
 
1516
         * In case we fail while running the callbacks, mark the gxact invalid so
 
1517
         * no one else will try to commit/rollback, and so it will be recycled if
 
1518
         * we fail after this point.  It is still locked by our backend so it
 
1519
         * won't go away yet.
 
1520
         *
 
1521
         * (We assume it's safe to do this without taking TwoPhaseStateLock.)
 
1522
         */
 
1523
        gxact->valid = false;
 
1524
 
 
1525
        /*
 
1526
         * We have to remove any files that were supposed to be dropped. For
 
1527
         * consistency with the regular xact.c code paths, must do this before
 
1528
         * releasing locks, so do it before running the callbacks.
 
1529
         *
 
1530
         * NB: this code knows that we couldn't be dropping any temp rels ...
 
1531
         */
 
1532
        if (isCommit)
 
1533
        {
 
1534
                delrels = commitrels;
 
1535
                ndelrels = hdr->ncommitrels;
 
1536
        }
 
1537
        else
 
1538
        {
 
1539
                delrels = abortrels;
 
1540
                ndelrels = hdr->nabortrels;
 
1541
        }
 
1542
        for (i = 0; i < ndelrels; i++)
 
1543
        {
 
1544
                SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
 
1545
 
 
1546
                smgrdounlink(srel, false);
 
1547
                smgrclose(srel);
 
1548
        }
 
1549
 
 
1550
        /*
 
1551
         * Handle cache invalidation messages.
 
1552
         *
 
1553
         * Relcache init file invalidation requires processing both before and
 
1554
         * after we send the SI messages. See AtEOXact_Inval()
 
1555
         */
 
1556
        if (hdr->initfileinval)
 
1557
                RelationCacheInitFilePreInvalidate();
 
1558
        SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
 
1559
        if (hdr->initfileinval)
 
1560
                RelationCacheInitFilePostInvalidate();
 
1561
 
 
1562
        /* And now do the callbacks */
 
1563
        if (isCommit)
 
1564
                ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
 
1565
        else
 
1566
                ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
 
1567
 
 
1568
        PredicateLockTwoPhaseFinish(xid, isCommit);
 
1569
 
 
1570
        /* Count the prepared xact as committed or aborted */
 
1571
        AtEOXact_PgStat(isCommit);
 
1572
 
 
1573
        /*
 
1574
         * And now we can clean up any files we may have left.
 
1575
         */
 
1576
        if (gxact->ondisk)
 
1577
                RemoveTwoPhaseFile(xid, true);
 
1578
 
 
1579
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
1580
        RemoveGXact(gxact);
 
1581
        LWLockRelease(TwoPhaseStateLock);
 
1582
        MyLockedGxact = NULL;
 
1583
 
 
1584
        RESUME_INTERRUPTS();
 
1585
 
 
1586
        pfree(buf);
 
1587
}
 
1588
 
 
1589
/*
 
1590
 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
 
1591
 */
 
1592
static void
 
1593
ProcessRecords(char *bufptr, TransactionId xid,
 
1594
                           const TwoPhaseCallback callbacks[])
 
1595
{
 
1596
        for (;;)
 
1597
        {
 
1598
                TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
 
1599
 
 
1600
                Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
 
1601
                if (record->rmid == TWOPHASE_RM_END_ID)
 
1602
                        break;
 
1603
 
 
1604
                bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
 
1605
 
 
1606
                if (callbacks[record->rmid] != NULL)
 
1607
                        callbacks[record->rmid] (xid, record->info,
 
1608
                                                                         (void *) bufptr, record->len);
 
1609
 
 
1610
                bufptr += MAXALIGN(record->len);
 
1611
        }
 
1612
}
 
1613
 
 
1614
/*
 
1615
 * Remove the 2PC file for the specified XID.
 
1616
 *
 
1617
 * If giveWarning is false, do not complain about file-not-present;
 
1618
 * this is an expected case during WAL replay.
 
1619
 */
 
1620
static void
 
1621
RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
 
1622
{
 
1623
        char            path[MAXPGPATH];
 
1624
 
 
1625
        TwoPhaseFilePath(path, xid);
 
1626
        if (unlink(path))
 
1627
                if (errno != ENOENT || giveWarning)
 
1628
                        ereport(WARNING,
 
1629
                                        (errcode_for_file_access(),
 
1630
                                         errmsg("could not remove two-phase state file \"%s\": %m",
 
1631
                                                        path)));
 
1632
}
 
1633
 
 
1634
/*
 
1635
 * Recreates a state file. This is used in WAL replay and during
 
1636
 * checkpoint creation.
 
1637
 *
 
1638
 * Note: content and len don't include CRC.
 
1639
 */
 
1640
static void
 
1641
RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 
1642
{
 
1643
        char            path[MAXPGPATH];
 
1644
        pg_crc32c       statefile_crc;
 
1645
        int                     fd;
 
1646
 
 
1647
        /* Recompute CRC */
 
1648
        INIT_CRC32C(statefile_crc);
 
1649
        COMP_CRC32C(statefile_crc, content, len);
 
1650
        FIN_CRC32C(statefile_crc);
 
1651
 
 
1652
        TwoPhaseFilePath(path, xid);
 
1653
 
 
1654
        fd = OpenTransientFile(path,
 
1655
                                                   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
 
1656
        if (fd < 0)
 
1657
                ereport(ERROR,
 
1658
                                (errcode_for_file_access(),
 
1659
                                 errmsg("could not recreate two-phase state file \"%s\": %m",
 
1660
                                                path)));
 
1661
 
 
1662
        /* Write content and CRC */
 
1663
        pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
 
1664
        if (write(fd, content, len) != len)
 
1665
        {
 
1666
                pgstat_report_wait_end();
 
1667
                CloseTransientFile(fd);
 
1668
                ereport(ERROR,
 
1669
                                (errcode_for_file_access(),
 
1670
                                 errmsg("could not write two-phase state file: %m")));
 
1671
        }
 
1672
        if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
 
1673
        {
 
1674
                pgstat_report_wait_end();
 
1675
                CloseTransientFile(fd);
 
1676
                ereport(ERROR,
 
1677
                                (errcode_for_file_access(),
 
1678
                                 errmsg("could not write two-phase state file: %m")));
 
1679
        }
 
1680
        pgstat_report_wait_end();
 
1681
 
 
1682
        /*
 
1683
         * We must fsync the file because the end-of-replay checkpoint will not do
 
1684
         * so, there being no GXACT in shared memory yet to tell it to.
 
1685
         */
 
1686
        pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
 
1687
        if (pg_fsync(fd) != 0)
 
1688
        {
 
1689
                CloseTransientFile(fd);
 
1690
                ereport(ERROR,
 
1691
                                (errcode_for_file_access(),
 
1692
                                 errmsg("could not fsync two-phase state file: %m")));
 
1693
        }
 
1694
        pgstat_report_wait_end();
 
1695
 
 
1696
        if (CloseTransientFile(fd) != 0)
 
1697
                ereport(ERROR,
 
1698
                                (errcode_for_file_access(),
 
1699
                                 errmsg("could not close two-phase state file: %m")));
 
1700
}
 
1701
 
 
1702
/*
 
1703
 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
 
1704
 *
 
1705
 * We must fsync the state file of any GXACT that is valid or has been
 
1706
 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
 
1707
 * horizon.  (If the gxact isn't valid yet, has not been generated in
 
1708
 * redo, or has a later LSN, this checkpoint is not responsible for
 
1709
 * fsyncing it.)
 
1710
 *
 
1711
 * This is deliberately run as late as possible in the checkpoint sequence,
 
1712
 * because GXACTs ordinarily have short lifespans, and so it is quite
 
1713
 * possible that GXACTs that were valid at checkpoint start will no longer
 
1714
 * exist if we wait a little bit. With typical checkpoint settings this
 
1715
 * will be about 3 minutes for an online checkpoint, so as a result we
 
1716
 * we expect that there will be no GXACTs that need to be copied to disk.
 
1717
 *
 
1718
 * If a GXACT remains valid across multiple checkpoints, it will already
 
1719
 * be on disk so we don't bother to repeat that write.
 
1720
 */
 
1721
void
 
1722
CheckPointTwoPhase(XLogRecPtr redo_horizon)
 
1723
{
 
1724
        int                     i;
 
1725
        int                     serialized_xacts = 0;
 
1726
 
 
1727
        if (max_prepared_xacts <= 0)
 
1728
                return;                                 /* nothing to do */
 
1729
 
 
1730
        TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
 
1731
 
 
1732
        /*
 
1733
         * We are expecting there to be zero GXACTs that need to be copied to
 
1734
         * disk, so we perform all I/O while holding TwoPhaseStateLock for
 
1735
         * simplicity. This prevents any new xacts from preparing while this
 
1736
         * occurs, which shouldn't be a problem since the presence of long-lived
 
1737
         * prepared xacts indicates the transaction manager isn't active.
 
1738
         *
 
1739
         * It's also possible to move I/O out of the lock, but on every error we
 
1740
         * should check whether somebody committed our transaction in different
 
1741
         * backend. Let's leave this optimization for future, if somebody will
 
1742
         * spot that this place cause bottleneck.
 
1743
         *
 
1744
         * Note that it isn't possible for there to be a GXACT with a
 
1745
         * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
 
1746
         * because of the efforts with delayChkpt.
 
1747
         */
 
1748
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
1749
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
1750
        {
 
1751
                /*
 
1752
                 * Note that we are using gxact not pgxact so this works in recovery
 
1753
                 * also
 
1754
                 */
 
1755
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
1756
 
 
1757
                if ((gxact->valid || gxact->inredo) &&
 
1758
                        !gxact->ondisk &&
 
1759
                        gxact->prepare_end_lsn <= redo_horizon)
 
1760
                {
 
1761
                        char       *buf;
 
1762
                        int                     len;
 
1763
 
 
1764
                        XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
 
1765
                        RecreateTwoPhaseFile(gxact->xid, buf, len);
 
1766
                        gxact->ondisk = true;
 
1767
                        gxact->prepare_start_lsn = InvalidXLogRecPtr;
 
1768
                        gxact->prepare_end_lsn = InvalidXLogRecPtr;
 
1769
                        pfree(buf);
 
1770
                        serialized_xacts++;
 
1771
                }
 
1772
        }
 
1773
        LWLockRelease(TwoPhaseStateLock);
 
1774
 
 
1775
        /*
 
1776
         * Flush unconditionally the parent directory to make any information
 
1777
         * durable on disk.  Two-phase files could have been removed and those
 
1778
         * removals need to be made persistent as well as any files newly created
 
1779
         * previously since the last checkpoint.
 
1780
         */
 
1781
        fsync_fname(TWOPHASE_DIR, true);
 
1782
 
 
1783
        TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
 
1784
 
 
1785
        if (log_checkpoints && serialized_xacts > 0)
 
1786
                ereport(LOG,
 
1787
                                (errmsg_plural("%u two-phase state file was written "
 
1788
                                                           "for a long-running prepared transaction",
 
1789
                                                           "%u two-phase state files were written "
 
1790
                                                           "for long-running prepared transactions",
 
1791
                                                           serialized_xacts,
 
1792
                                                           serialized_xacts)));
 
1793
}
 
1794
 
 
1795
/*
 
1796
 * restoreTwoPhaseData
 
1797
 *
 
1798
 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
 
1799
 * This is called once at the beginning of recovery, saving any extra
 
1800
 * lookups in the future.  Two-phase files that are newer than the
 
1801
 * minimum XID horizon are discarded on the way.
 
1802
 */
 
1803
void
 
1804
restoreTwoPhaseData(void)
 
1805
{
 
1806
        DIR                *cldir;
 
1807
        struct dirent *clde;
 
1808
 
 
1809
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
1810
        cldir = AllocateDir(TWOPHASE_DIR);
 
1811
        while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
 
1812
        {
 
1813
                if (strlen(clde->d_name) == 8 &&
 
1814
                        strspn(clde->d_name, "0123456789ABCDEF") == 8)
 
1815
                {
 
1816
                        TransactionId xid;
 
1817
                        char       *buf;
 
1818
 
 
1819
                        xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
 
1820
 
 
1821
                        buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
 
1822
                                                                                true, false, false);
 
1823
                        if (buf == NULL)
 
1824
                                continue;
 
1825
 
 
1826
                        PrepareRedoAdd(buf, InvalidXLogRecPtr,
 
1827
                                                   InvalidXLogRecPtr, InvalidRepOriginId);
 
1828
                }
 
1829
        }
 
1830
        LWLockRelease(TwoPhaseStateLock);
 
1831
        FreeDir(cldir);
 
1832
}
 
1833
 
 
1834
/*
 
1835
 * PrescanPreparedTransactions
 
1836
 *
 
1837
 * Scan the shared memory entries of TwoPhaseState and determine the range
 
1838
 * of valid XIDs present.  This is run during database startup, after we
 
1839
 * have completed reading WAL.  ShmemVariableCache->nextXid has been set to
 
1840
 * one more than the highest XID for which evidence exists in WAL.
 
1841
 *
 
1842
 * We throw away any prepared xacts with main XID beyond nextXid --- if any
 
1843
 * are present, it suggests that the DBA has done a PITR recovery to an
 
1844
 * earlier point in time without cleaning out pg_twophase.  We dare not
 
1845
 * try to recover such prepared xacts since they likely depend on database
 
1846
 * state that doesn't exist now.
 
1847
 *
 
1848
 * However, we will advance nextXid beyond any subxact XIDs belonging to
 
1849
 * valid prepared xacts.  We need to do this since subxact commit doesn't
 
1850
 * write a WAL entry, and so there might be no evidence in WAL of those
 
1851
 * subxact XIDs.
 
1852
 *
 
1853
 * Our other responsibility is to determine and return the oldest valid XID
 
1854
 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
 
1855
 * This is needed to synchronize pg_subtrans startup properly.
 
1856
 *
 
1857
 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
 
1858
 * top-level xids is stored in *xids_p. The number of entries in the array
 
1859
 * is returned in *nxids_p.
 
1860
 */
 
1861
TransactionId
 
1862
PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 
1863
{
 
1864
        TransactionId origNextXid = ShmemVariableCache->nextXid;
 
1865
        TransactionId result = origNextXid;
 
1866
        TransactionId *xids = NULL;
 
1867
        int                     nxids = 0;
 
1868
        int                     allocsize = 0;
 
1869
        int                     i;
 
1870
 
 
1871
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
1872
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
1873
        {
 
1874
                TransactionId xid;
 
1875
                char       *buf;
 
1876
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
1877
 
 
1878
                Assert(gxact->inredo);
 
1879
 
 
1880
                xid = gxact->xid;
 
1881
 
 
1882
                buf = ProcessTwoPhaseBuffer(xid,
 
1883
                                                                        gxact->prepare_start_lsn,
 
1884
                                                                        gxact->ondisk, false, true);
 
1885
 
 
1886
                if (buf == NULL)
 
1887
                        continue;
 
1888
 
 
1889
                /*
 
1890
                 * OK, we think this file is valid.  Incorporate xid into the
 
1891
                 * running-minimum result.
 
1892
                 */
 
1893
                if (TransactionIdPrecedes(xid, result))
 
1894
                        result = xid;
 
1895
 
 
1896
                if (xids_p)
 
1897
                {
 
1898
                        if (nxids == allocsize)
 
1899
                        {
 
1900
                                if (nxids == 0)
 
1901
                                {
 
1902
                                        allocsize = 10;
 
1903
                                        xids = palloc(allocsize * sizeof(TransactionId));
 
1904
                                }
 
1905
                                else
 
1906
                                {
 
1907
                                        allocsize = allocsize * 2;
 
1908
                                        xids = repalloc(xids, allocsize * sizeof(TransactionId));
 
1909
                                }
 
1910
                        }
 
1911
                        xids[nxids++] = xid;
 
1912
                }
 
1913
 
 
1914
                pfree(buf);
 
1915
        }
 
1916
        LWLockRelease(TwoPhaseStateLock);
 
1917
 
 
1918
        if (xids_p)
 
1919
        {
 
1920
                *xids_p = xids;
 
1921
                *nxids_p = nxids;
 
1922
        }
 
1923
 
 
1924
        return result;
 
1925
}
 
1926
 
 
1927
/*
 
1928
 * StandbyRecoverPreparedTransactions
 
1929
 *
 
1930
 * Scan the shared memory entries of TwoPhaseState and setup all the required
 
1931
 * information to allow standby queries to treat prepared transactions as still
 
1932
 * active.
 
1933
 *
 
1934
 * This is never called at the end of recovery - we use
 
1935
 * RecoverPreparedTransactions() at that point.
 
1936
 *
 
1937
 * The lack of calls to SubTransSetParent() calls here is by design;
 
1938
 * those calls are made by RecoverPreparedTransactions() at the end of recovery
 
1939
 * for those xacts that need this.
 
1940
 */
 
1941
void
 
1942
StandbyRecoverPreparedTransactions(void)
 
1943
{
 
1944
        int                     i;
 
1945
 
 
1946
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
1947
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
1948
        {
 
1949
                TransactionId xid;
 
1950
                char       *buf;
 
1951
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
1952
 
 
1953
                Assert(gxact->inredo);
 
1954
 
 
1955
                xid = gxact->xid;
 
1956
 
 
1957
                buf = ProcessTwoPhaseBuffer(xid,
 
1958
                                                                        gxact->prepare_start_lsn,
 
1959
                                                                        gxact->ondisk, false, false);
 
1960
                if (buf != NULL)
 
1961
                        pfree(buf);
 
1962
        }
 
1963
        LWLockRelease(TwoPhaseStateLock);
 
1964
}
 
1965
 
 
1966
/*
 
1967
 * RecoverPreparedTransactions
 
1968
 *
 
1969
 * Scan the shared memory entries of TwoPhaseState and reload the state for
 
1970
 * each prepared transaction (reacquire locks, etc).
 
1971
 *
 
1972
 * This is run at the end of recovery, but before we allow backends to write
 
1973
 * WAL.
 
1974
 *
 
1975
 * At the end of recovery the way we take snapshots will change. We now need
 
1976
 * to mark all running transactions with their full SubTransSetParent() info
 
1977
 * to allow normal snapshots to work correctly if snapshots overflow.
 
1978
 * We do this here because by definition prepared transactions are the only
 
1979
 * type of write transaction still running, so this is necessary and
 
1980
 * complete.
 
1981
 */
 
1982
void
 
1983
RecoverPreparedTransactions(void)
 
1984
{
 
1985
        int                     i;
 
1986
 
 
1987
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
1988
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
1989
        {
 
1990
                TransactionId xid;
 
1991
                char       *buf;
 
1992
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
1993
                char       *bufptr;
 
1994
                TwoPhaseFileHeader *hdr;
 
1995
                TransactionId *subxids;
 
1996
                const char *gid;
 
1997
 
 
1998
                xid = gxact->xid;
 
1999
 
 
2000
                /*
 
2001
                 * Reconstruct subtrans state for the transaction --- needed because
 
2002
                 * pg_subtrans is not preserved over a restart.  Note that we are
 
2003
                 * linking all the subtransactions directly to the top-level XID;
 
2004
                 * there may originally have been a more complex hierarchy, but
 
2005
                 * there's no need to restore that exactly. It's possible that
 
2006
                 * SubTransSetParent has been set before, if the prepared transaction
 
2007
                 * generated xid assignment records.
 
2008
                 */
 
2009
                buf = ProcessTwoPhaseBuffer(xid,
 
2010
                                                                        gxact->prepare_start_lsn,
 
2011
                                                                        gxact->ondisk, true, false);
 
2012
                if (buf == NULL)
 
2013
                        continue;
 
2014
 
 
2015
                ereport(LOG,
 
2016
                                (errmsg("recovering prepared transaction %u from shared memory", xid)));
 
2017
 
 
2018
                hdr = (TwoPhaseFileHeader *) buf;
 
2019
                Assert(TransactionIdEquals(hdr->xid, xid));
 
2020
                bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
 
2021
                gid = (const char *) bufptr;
 
2022
                bufptr += MAXALIGN(hdr->gidlen);
 
2023
                subxids = (TransactionId *) bufptr;
 
2024
                bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
 
2025
                bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
 
2026
                bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
 
2027
                bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
2028
 
 
2029
                /*
 
2030
                 * Recreate its GXACT and dummy PGPROC. But, check whether it was
 
2031
                 * added in redo and already has a shmem entry for it.
 
2032
                 */
 
2033
                MarkAsPreparingGuts(gxact, xid, gid,
 
2034
                                                        hdr->prepared_at,
 
2035
                                                        hdr->owner, hdr->database);
 
2036
 
 
2037
                /* recovered, so reset the flag for entries generated by redo */
 
2038
                gxact->inredo = false;
 
2039
 
 
2040
                GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
 
2041
                MarkAsPrepared(gxact, true);
 
2042
 
 
2043
                LWLockRelease(TwoPhaseStateLock);
 
2044
 
 
2045
                /*
 
2046
                 * Recover other state (notably locks) using resource managers.
 
2047
                 */
 
2048
                ProcessRecords(bufptr, xid, twophase_recover_callbacks);
 
2049
 
 
2050
                /*
 
2051
                 * Release locks held by the standby process after we process each
 
2052
                 * prepared transaction. As a result, we don't need too many
 
2053
                 * additional locks at any one time.
 
2054
                 */
 
2055
                if (InHotStandby)
 
2056
                        StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
 
2057
 
 
2058
                /*
 
2059
                 * We're done with recovering this transaction. Clear MyLockedGxact,
 
2060
                 * like we do in PrepareTransaction() during normal operation.
 
2061
                 */
 
2062
                PostPrepare_Twophase();
 
2063
 
 
2064
                pfree(buf);
 
2065
 
 
2066
                LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
2067
        }
 
2068
 
 
2069
        LWLockRelease(TwoPhaseStateLock);
 
2070
}
 
2071
 
 
2072
/*
 
2073
 * ProcessTwoPhaseBuffer
 
2074
 *
 
2075
 * Given a transaction id, read it either from disk or read it directly
 
2076
 * via shmem xlog record pointer using the provided "prepare_start_lsn".
 
2077
 *
 
2078
 * If setParent is true, set up subtransaction parent linkages.
 
2079
 *
 
2080
 * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
 
2081
 * value scanned.
 
2082
 */
 
2083
static char *
 
2084
ProcessTwoPhaseBuffer(TransactionId xid,
 
2085
                                          XLogRecPtr prepare_start_lsn,
 
2086
                                          bool fromdisk,
 
2087
                                          bool setParent, bool setNextXid)
 
2088
{
 
2089
        TransactionId origNextXid = ShmemVariableCache->nextXid;
 
2090
        TransactionId *subxids;
 
2091
        char       *buf;
 
2092
        TwoPhaseFileHeader *hdr;
 
2093
        int                     i;
 
2094
 
 
2095
        Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
2096
 
 
2097
        if (!fromdisk)
 
2098
                Assert(prepare_start_lsn != InvalidXLogRecPtr);
 
2099
 
 
2100
        /* Already processed? */
 
2101
        if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
 
2102
        {
 
2103
                if (fromdisk)
 
2104
                {
 
2105
                        ereport(WARNING,
 
2106
                                        (errmsg("removing stale two-phase state file for transaction %u",
 
2107
                                                        xid)));
 
2108
                        RemoveTwoPhaseFile(xid, true);
 
2109
                }
 
2110
                else
 
2111
                {
 
2112
                        ereport(WARNING,
 
2113
                                        (errmsg("removing stale two-phase state from memory for transaction %u",
 
2114
                                                        xid)));
 
2115
                        PrepareRedoRemove(xid, true);
 
2116
                }
 
2117
                return NULL;
 
2118
        }
 
2119
 
 
2120
        /* Reject XID if too new */
 
2121
        if (TransactionIdFollowsOrEquals(xid, origNextXid))
 
2122
        {
 
2123
                if (fromdisk)
 
2124
                {
 
2125
                        ereport(WARNING,
 
2126
                                        (errmsg("removing future two-phase state file for transaction %u",
 
2127
                                                        xid)));
 
2128
                        RemoveTwoPhaseFile(xid, true);
 
2129
                }
 
2130
                else
 
2131
                {
 
2132
                        ereport(WARNING,
 
2133
                                        (errmsg("removing future two-phase state from memory for transaction %u",
 
2134
                                                        xid)));
 
2135
                        PrepareRedoRemove(xid, true);
 
2136
                }
 
2137
                return NULL;
 
2138
        }
 
2139
 
 
2140
        if (fromdisk)
 
2141
        {
 
2142
                /* Read and validate file */
 
2143
                buf = ReadTwoPhaseFile(xid, true);
 
2144
                if (buf == NULL)
 
2145
                {
 
2146
                        ereport(WARNING,
 
2147
                                        (errmsg("removing corrupt two-phase state file for transaction %u",
 
2148
                                                        xid)));
 
2149
                        RemoveTwoPhaseFile(xid, true);
 
2150
                        return NULL;
 
2151
                }
 
2152
        }
 
2153
        else
 
2154
        {
 
2155
                /* Read xlog data */
 
2156
                XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
 
2157
        }
 
2158
 
 
2159
        /* Deconstruct header */
 
2160
        hdr = (TwoPhaseFileHeader *) buf;
 
2161
        if (!TransactionIdEquals(hdr->xid, xid))
 
2162
        {
 
2163
                if (fromdisk)
 
2164
                {
 
2165
                        ereport(WARNING,
 
2166
                                        (errmsg("removing corrupt two-phase state file for transaction %u",
 
2167
                                                        xid)));
 
2168
                        RemoveTwoPhaseFile(xid, true);
 
2169
                }
 
2170
                else
 
2171
                {
 
2172
                        ereport(WARNING,
 
2173
                                        (errmsg("removing corrupt two-phase state from memory for transaction %u",
 
2174
                                                        xid)));
 
2175
                        PrepareRedoRemove(xid, true);
 
2176
                }
 
2177
                pfree(buf);
 
2178
                return NULL;
 
2179
        }
 
2180
 
 
2181
        /*
 
2182
         * Examine subtransaction XIDs ... they should all follow main XID, and
 
2183
         * they may force us to advance nextXid.
 
2184
         */
 
2185
        subxids = (TransactionId *) (buf +
 
2186
                                                                 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
 
2187
                                                                 MAXALIGN(hdr->gidlen));
 
2188
        for (i = 0; i < hdr->nsubxacts; i++)
 
2189
        {
 
2190
                TransactionId subxid = subxids[i];
 
2191
 
 
2192
                Assert(TransactionIdFollows(subxid, xid));
 
2193
 
 
2194
                /* update nextXid if needed */
 
2195
                if (setNextXid &&
 
2196
                        TransactionIdFollowsOrEquals(subxid,
 
2197
                                                                                 ShmemVariableCache->nextXid))
 
2198
                {
 
2199
                        /*
 
2200
                         * We don't expect anyone else to modify nextXid, hence we don't
 
2201
                         * need to hold a lock while examining it.  We still acquire the
 
2202
                         * lock to modify it, though, so we recheck.
 
2203
                         */
 
2204
                        LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
 
2205
                        if (TransactionIdFollowsOrEquals(subxid,
 
2206
                                                                                         ShmemVariableCache->nextXid))
 
2207
                        {
 
2208
                                ShmemVariableCache->nextXid = subxid;
 
2209
                                TransactionIdAdvance(ShmemVariableCache->nextXid);
 
2210
                        }
 
2211
                        LWLockRelease(XidGenLock);
 
2212
                }
 
2213
 
 
2214
                if (setParent)
 
2215
                        SubTransSetParent(subxid, xid);
 
2216
        }
 
2217
 
 
2218
        return buf;
 
2219
}
 
2220
 
 
2221
 
 
2222
/*
 
2223
 *      RecordTransactionCommitPrepared
 
2224
 *
 
2225
 * This is basically the same as RecordTransactionCommit (q.v. if you change
 
2226
 * this function): in particular, we must set the delayChkpt flag to avoid a
 
2227
 * race condition.
 
2228
 *
 
2229
 * We know the transaction made at least one XLOG entry (its PREPARE),
 
2230
 * so it is never possible to optimize out the commit record.
 
2231
 */
 
2232
static void
 
2233
RecordTransactionCommitPrepared(TransactionId xid,
 
2234
                                                                int nchildren,
 
2235
                                                                TransactionId *children,
 
2236
                                                                int nrels,
 
2237
                                                                RelFileNode *rels,
 
2238
                                                                int ninvalmsgs,
 
2239
                                                                SharedInvalidationMessage *invalmsgs,
 
2240
                                                                bool initfileinval,
 
2241
                                                                const char *gid)
 
2242
{
 
2243
        XLogRecPtr      recptr;
 
2244
        TimestampTz committs = GetCurrentTimestamp();
 
2245
        bool            replorigin;
 
2246
 
 
2247
        /*
 
2248
         * Are we using the replication origins feature?  Or, in other words, are
 
2249
         * we replaying remote actions?
 
2250
         */
 
2251
        replorigin = (replorigin_session_origin != InvalidRepOriginId &&
 
2252
                                  replorigin_session_origin != DoNotReplicateId);
 
2253
 
 
2254
        START_CRIT_SECTION();
 
2255
 
 
2256
        /* See notes in RecordTransactionCommit */
 
2257
        MyPgXact->delayChkpt = true;
 
2258
 
 
2259
        /*
 
2260
         * Emit the XLOG commit record. Note that we mark 2PC commits as
 
2261
         * potentially having AccessExclusiveLocks since we don't know whether or
 
2262
         * not they do.
 
2263
         */
 
2264
        recptr = XactLogCommitRecord(committs,
 
2265
                                                                 nchildren, children, nrels, rels,
 
2266
                                                                 ninvalmsgs, invalmsgs,
 
2267
                                                                 initfileinval, false,
 
2268
                                                                 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
 
2269
                                                                 xid, gid);
 
2270
 
 
2271
 
 
2272
        if (replorigin)
 
2273
                /* Move LSNs forward for this replication origin */
 
2274
                replorigin_session_advance(replorigin_session_origin_lsn,
 
2275
                                                                   XactLastRecEnd);
 
2276
 
 
2277
        /*
 
2278
         * Record commit timestamp.  The value comes from plain commit timestamp
 
2279
         * if replorigin is not enabled, or replorigin already set a value for us
 
2280
         * in replorigin_session_origin_timestamp otherwise.
 
2281
         *
 
2282
         * We don't need to WAL-log anything here, as the commit record written
 
2283
         * above already contains the data.
 
2284
         */
 
2285
        if (!replorigin || replorigin_session_origin_timestamp == 0)
 
2286
                replorigin_session_origin_timestamp = committs;
 
2287
 
 
2288
        TransactionTreeSetCommitTsData(xid, nchildren, children,
 
2289
                                                                   replorigin_session_origin_timestamp,
 
2290
                                                                   replorigin_session_origin, false);
 
2291
 
 
2292
        /*
 
2293
         * We don't currently try to sleep before flush here ... nor is there any
 
2294
         * support for async commit of a prepared xact (the very idea is probably
 
2295
         * a contradiction)
 
2296
         */
 
2297
 
 
2298
        /* Flush XLOG to disk */
 
2299
        XLogFlush(recptr);
 
2300
 
 
2301
        /* Mark the transaction committed in pg_xact */
 
2302
        TransactionIdCommitTree(xid, nchildren, children);
 
2303
 
 
2304
        /* Checkpoint can proceed now */
 
2305
        MyPgXact->delayChkpt = false;
 
2306
 
 
2307
        END_CRIT_SECTION();
 
2308
 
 
2309
        /*
 
2310
         * Wait for synchronous replication, if required.
 
2311
         *
 
2312
         * Note that at this stage we have marked clog, but still show as running
 
2313
         * in the procarray and continue to hold locks.
 
2314
         */
 
2315
        SyncRepWaitForLSN(recptr, true);
 
2316
}
 
2317
 
 
2318
/*
 
2319
 *      RecordTransactionAbortPrepared
 
2320
 *
 
2321
 * This is basically the same as RecordTransactionAbort.
 
2322
 *
 
2323
 * We know the transaction made at least one XLOG entry (its PREPARE),
 
2324
 * so it is never possible to optimize out the abort record.
 
2325
 */
 
2326
static void
 
2327
RecordTransactionAbortPrepared(TransactionId xid,
 
2328
                                                           int nchildren,
 
2329
                                                           TransactionId *children,
 
2330
                                                           int nrels,
 
2331
                                                           RelFileNode *rels,
 
2332
                                                           const char *gid)
 
2333
{
 
2334
        XLogRecPtr      recptr;
 
2335
 
 
2336
        /*
 
2337
         * Catch the scenario where we aborted partway through
 
2338
         * RecordTransactionCommitPrepared ...
 
2339
         */
 
2340
        if (TransactionIdDidCommit(xid))
 
2341
                elog(PANIC, "cannot abort transaction %u, it was already committed",
 
2342
                         xid);
 
2343
 
 
2344
        START_CRIT_SECTION();
 
2345
 
 
2346
        /*
 
2347
         * Emit the XLOG commit record. Note that we mark 2PC aborts as
 
2348
         * potentially having AccessExclusiveLocks since we don't know whether or
 
2349
         * not they do.
 
2350
         */
 
2351
        recptr = XactLogAbortRecord(GetCurrentTimestamp(),
 
2352
                                                                nchildren, children,
 
2353
                                                                nrels, rels,
 
2354
                                                                MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
 
2355
                                                                xid, gid);
 
2356
 
 
2357
        /* Always flush, since we're about to remove the 2PC state file */
 
2358
        XLogFlush(recptr);
 
2359
 
 
2360
        /*
 
2361
         * Mark the transaction aborted in clog.  This is not absolutely necessary
 
2362
         * but we may as well do it while we are here.
 
2363
         */
 
2364
        TransactionIdAbortTree(xid, nchildren, children);
 
2365
 
 
2366
        END_CRIT_SECTION();
 
2367
 
 
2368
        /*
 
2369
         * Wait for synchronous replication, if required.
 
2370
         *
 
2371
         * Note that at this stage we have marked clog, but still show as running
 
2372
         * in the procarray and continue to hold locks.
 
2373
         */
 
2374
        SyncRepWaitForLSN(recptr, false);
 
2375
}
 
2376
 
 
2377
/*
 
2378
 * PrepareRedoAdd
 
2379
 *
 
2380
 * Store pointers to the start/end of the WAL record along with the xid in
 
2381
 * a gxact entry in shared memory TwoPhaseState structure.  If caller
 
2382
 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
 
2383
 * data, the entry is marked as located on disk.
 
2384
 */
 
2385
void
 
2386
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 
2387
                           XLogRecPtr end_lsn, RepOriginId origin_id)
 
2388
{
 
2389
        TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
 
2390
        char       *bufptr;
 
2391
        const char *gid;
 
2392
        GlobalTransaction gxact;
 
2393
 
 
2394
        Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
2395
        Assert(RecoveryInProgress());
 
2396
 
 
2397
        bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
 
2398
        gid = (const char *) bufptr;
 
2399
 
 
2400
        /*
 
2401
         * Reserve the GID for the given transaction in the redo code path.
 
2402
         *
 
2403
         * This creates a gxact struct and puts it into the active array.
 
2404
         *
 
2405
         * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
 
2406
         * shared memory. Hence, we only fill up the bare minimum contents here.
 
2407
         * The gxact also gets marked with gxact->inredo set to true to indicate
 
2408
         * that it got added in the redo phase
 
2409
         */
 
2410
 
 
2411
        /* Get a free gxact from the freelist */
 
2412
        if (TwoPhaseState->freeGXacts == NULL)
 
2413
                ereport(ERROR,
 
2414
                                (errcode(ERRCODE_OUT_OF_MEMORY),
 
2415
                                 errmsg("maximum number of prepared transactions reached"),
 
2416
                                 errhint("Increase max_prepared_transactions (currently %d).",
 
2417
                                                 max_prepared_xacts)));
 
2418
        gxact = TwoPhaseState->freeGXacts;
 
2419
        TwoPhaseState->freeGXacts = gxact->next;
 
2420
 
 
2421
        gxact->prepared_at = hdr->prepared_at;
 
2422
        gxact->prepare_start_lsn = start_lsn;
 
2423
        gxact->prepare_end_lsn = end_lsn;
 
2424
        gxact->xid = hdr->xid;
 
2425
        gxact->owner = hdr->owner;
 
2426
        gxact->locking_backend = InvalidBackendId;
 
2427
        gxact->valid = false;
 
2428
        gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
 
2429
        gxact->inredo = true;           /* yes, added in redo */
 
2430
        strcpy(gxact->gid, gid);
 
2431
 
 
2432
        /* And insert it into the active array */
 
2433
        Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
 
2434
        TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
 
2435
 
 
2436
        if (origin_id != InvalidRepOriginId)
 
2437
        {
 
2438
                /* recover apply progress */
 
2439
                replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
 
2440
                                                   false /* backward */ , false /* WAL */ );
 
2441
        }
 
2442
 
 
2443
        elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
 
2444
}
 
2445
 
 
2446
/*
 
2447
 * PrepareRedoRemove
 
2448
 *
 
2449
 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
 
2450
 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
 
2451
 *
 
2452
 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
 
2453
 * is updated.
 
2454
 */
 
2455
void
 
2456
PrepareRedoRemove(TransactionId xid, bool giveWarning)
 
2457
{
 
2458
        GlobalTransaction gxact = NULL;
 
2459
        int                     i;
 
2460
        bool            found = false;
 
2461
 
 
2462
        Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
2463
        Assert(RecoveryInProgress());
 
2464
 
 
2465
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 
2466
        {
 
2467
                gxact = TwoPhaseState->prepXacts[i];
 
2468
 
 
2469
                if (gxact->xid == xid)
 
2470
                {
 
2471
                        Assert(gxact->inredo);
 
2472
                        found = true;
 
2473
                        break;
 
2474
                }
 
2475
        }
 
2476
 
 
2477
        /*
 
2478
         * Just leave if there is nothing, this is expected during WAL replay.
 
2479
         */
 
2480
        if (!found)
 
2481
                return;
 
2482
 
 
2483
        /*
 
2484
         * And now we can clean up any files we may have left.
 
2485
         */
 
2486
        elog(DEBUG2, "removing 2PC data for transaction %u", xid);
 
2487
        if (gxact->ondisk)
 
2488
                RemoveTwoPhaseFile(xid, giveWarning);
 
2489
        RemoveGXact(gxact);
 
2490
 
 
2491
        return;
 
2492
}