1
/*-------------------------------------------------------------------------
4
* Two-phase commit support functions.
6
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7
* Portions Copyright (c) 1994, Regents of the University of California
10
* src/backend/access/transam/twophase.c
13
* Each global transaction is associated with a global transaction
14
* identifier (GID). The client assigns a GID to a postgres
15
* transaction with the PREPARE TRANSACTION command.
17
* We keep all active global transactions in a shared memory array.
18
* When the PREPARE TRANSACTION command is issued, the GID is
19
* reserved for the transaction in the array. This is done before
20
* a WAL entry is made, because the reservation checks for duplicate
21
* GIDs and aborts the transaction if there already is a global
22
* transaction in prepared state with the same GID.
24
* A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25
* what keeps the XID considered running by TransactionIdIsInProgress.
26
* It is also convenient as a PGPROC to hook the gxact's locks to.
28
* Information to recover prepared transactions in case of crash is
29
* now stored in WAL for the common case. In some cases there will be
30
* an extended period between preparing a GXACT and commit/abort, in
31
* which case we need to separately record prepared transaction data
32
* in permanent storage. This includes locking information, pending
33
* notifications etc. All that state information is written to the
34
* per-transaction state file in the pg_twophase directory.
35
* All prepared transactions will be written prior to shutdown.
37
* Life track of state data is following:
39
* * On PREPARE TRANSACTION backend writes state data only to the WAL and
40
* stores pointer to the start of the WAL record in
41
* gxact->prepare_start_lsn.
42
* * If COMMIT occurs before checkpoint then backend reads data from WAL
43
* using prepare_start_lsn.
44
* * On checkpoint state data copied to files in pg_twophase directory and
46
* * If COMMIT happens after checkpoint then backend reads state data from
49
* During replay and replication, TwoPhaseState also holds information
50
* about active prepared transactions that haven't been moved to disk yet.
52
* Replay of twophase records happens by the following rules:
54
* * At the beginning of recovery, pg_twophase is scanned once, filling
55
* TwoPhaseState with entries marked with gxact->inredo and
56
* gxact->ondisk. Two-phase file data older than the XID horizon of
57
* the redo position are discarded.
58
* * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59
* gxact->inredo is set to true for such entries.
60
* * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61
* that have gxact->inredo set and are behind the redo_horizon. We
62
* save them to disk and then switch gxact->ondisk to true.
63
* * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64
* If gxact->ondisk is true, the corresponding entry from the disk
65
* is additionally deleted.
66
* * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67
* and PrescanPreparedTransactions() have been modified to go through
68
* gxact->inredo entries that have not made it to disk.
70
*-------------------------------------------------------------------------
79
#include "access/commit_ts.h"
80
#include "access/htup_details.h"
81
#include "access/subtrans.h"
82
#include "access/transam.h"
83
#include "access/twophase.h"
84
#include "access/twophase_rmgr.h"
85
#include "access/xact.h"
86
#include "access/xlog.h"
87
#include "access/xloginsert.h"
88
#include "access/xlogutils.h"
89
#include "access/xlogreader.h"
90
#include "catalog/pg_type.h"
91
#include "catalog/storage.h"
93
#include "miscadmin.h"
96
#include "replication/origin.h"
97
#include "replication/syncrep.h"
98
#include "replication/walsender.h"
99
#include "storage/fd.h"
100
#include "storage/ipc.h"
101
#include "storage/predicate.h"
102
#include "storage/proc.h"
103
#include "storage/procarray.h"
104
#include "storage/sinvaladt.h"
105
#include "storage/smgr.h"
106
#include "utils/builtins.h"
107
#include "utils/memutils.h"
108
#include "utils/timestamp.h"
112
* Directory where Two-phase commit files reside within PGDATA
114
#define TWOPHASE_DIR "pg_twophase"
116
/* GUC variable, can't be changed after startup */
117
int max_prepared_xacts = 0;
120
* This struct describes one global transaction that is in prepared state
121
* or attempting to become prepared.
123
* The lifecycle of a global transaction is:
125
* 1. After checking that the requested GID is not in use, set up an entry in
126
* the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127
* and mark it as locked by my backend.
129
* 2. After successfully completing prepare, set valid = true and enter the
130
* referenced PGPROC into the global ProcArray.
132
* 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133
* valid and not locked, then mark the entry as locked by storing my current
134
* backend ID into locking_backend. This prevents concurrent attempts to
135
* commit or rollback the same prepared xact.
137
* 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138
* from the ProcArray and the TwoPhaseState->prepXacts array and return it to
141
* Note that if the preparing transaction fails between steps 1 and 2, the
142
* entry must be removed so that the GID and the GlobalTransaction struct
143
* can be reused. See AtAbort_Twophase().
145
* typedef struct GlobalTransactionData *GlobalTransaction appears in
149
typedef struct GlobalTransactionData
151
GlobalTransaction next; /* list link for free list */
152
int pgprocno; /* ID of associated dummy PGPROC */
153
BackendId dummyBackendId; /* similar to backend id for backends */
154
TimestampTz prepared_at; /* time of preparation */
157
* Note that we need to keep track of two LSNs for each GXACT. We keep
158
* track of the start LSN because this is the address we must use to read
159
* state data back from WAL when committing a prepared GXACT. We keep
160
* track of the end LSN because that is the LSN we need to wait for prior
163
XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
164
XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
165
TransactionId xid; /* The GXACT id */
167
Oid owner; /* ID of user that executed the xact */
168
BackendId locking_backend; /* backend currently working on the xact */
169
bool valid; /* true if PGPROC entry is in proc array */
170
bool ondisk; /* true if prepare state file is on disk */
171
bool inredo; /* true if entry was added via xlog_redo */
172
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
173
} GlobalTransactionData;
176
* Two Phase Commit shared state. Access to this struct is protected
177
* by TwoPhaseStateLock.
179
typedef struct TwoPhaseStateData
181
/* Head of linked list of free GlobalTransactionData structs */
182
GlobalTransaction freeGXacts;
184
/* Number of valid prepXacts entries. */
187
/* There are max_prepared_xacts items in this array */
188
GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
191
static TwoPhaseStateData *TwoPhaseState;
194
* Global transaction entry currently locked by us, if any. Note that any
195
* access to the entry pointed to by this variable must be protected by
196
* TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
197
* (since it's just local memory).
199
static GlobalTransaction MyLockedGxact = NULL;
201
static bool twophaseExitRegistered = false;
203
static void RecordTransactionCommitPrepared(TransactionId xid,
205
TransactionId *children,
209
SharedInvalidationMessage *invalmsgs,
212
static void RecordTransactionAbortPrepared(TransactionId xid,
214
TransactionId *children,
218
static void ProcessRecords(char *bufptr, TransactionId xid,
219
const TwoPhaseCallback callbacks[]);
220
static void RemoveGXact(GlobalTransaction gxact);
222
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
223
static char *ProcessTwoPhaseBuffer(TransactionId xid,
224
XLogRecPtr prepare_start_lsn,
225
bool fromdisk, bool setParent, bool setNextXid);
226
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
227
const char *gid, TimestampTz prepared_at, Oid owner,
229
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
230
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
233
* Initialization of shared memory
236
TwoPhaseShmemSize(void)
240
/* Need the fixed struct, the array of pointers, and the GTD structs */
241
size = offsetof(TwoPhaseStateData, prepXacts);
242
size = add_size(size, mul_size(max_prepared_xacts,
243
sizeof(GlobalTransaction)));
244
size = MAXALIGN(size);
245
size = add_size(size, mul_size(max_prepared_xacts,
246
sizeof(GlobalTransactionData)));
252
TwoPhaseShmemInit(void)
256
TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
259
if (!IsUnderPostmaster)
261
GlobalTransaction gxacts;
265
TwoPhaseState->freeGXacts = NULL;
266
TwoPhaseState->numPrepXacts = 0;
269
* Initialize the linked list of free GlobalTransactionData structs
271
gxacts = (GlobalTransaction)
272
((char *) TwoPhaseState +
273
MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
274
sizeof(GlobalTransaction) * max_prepared_xacts));
275
for (i = 0; i < max_prepared_xacts; i++)
277
/* insert into linked list */
278
gxacts[i].next = TwoPhaseState->freeGXacts;
279
TwoPhaseState->freeGXacts = &gxacts[i];
281
/* associate it with a PGPROC assigned by InitProcGlobal */
282
gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
285
* Assign a unique ID for each dummy proc, so that the range of
286
* dummy backend IDs immediately follows the range of normal
287
* backend IDs. We don't dare to assign a real backend ID to dummy
288
* procs, because prepared transactions don't take part in cache
289
* invalidation like a real backend ID would imply, but having a
290
* unique ID for them is nevertheless handy. This arrangement
291
* allows you to allocate an array of size (MaxBackends +
292
* max_prepared_xacts + 1), and have a slot for every backend and
293
* prepared transaction. Currently multixact.c uses that
296
gxacts[i].dummyBackendId = MaxBackends + 1 + i;
304
* Exit hook to unlock the global transaction entry we're working on.
307
AtProcExit_Twophase(int code, Datum arg)
309
/* same logic as abort */
314
* Abort hook to unlock the global transaction entry we're working on.
317
AtAbort_Twophase(void)
319
if (MyLockedGxact == NULL)
323
* What to do with the locked global transaction entry? If we were in the
324
* process of preparing the transaction, but haven't written the WAL
325
* record and state file yet, the transaction must not be considered as
326
* prepared. Likewise, if we are in the process of finishing an
327
* already-prepared transaction, and fail after having already written the
328
* 2nd phase commit or rollback record to the WAL, the transaction should
329
* not be considered as prepared anymore. In those cases, just remove the
330
* entry from shared memory.
332
* Otherwise, the entry must be left in place so that the transaction can
333
* be finished later, so just unlock it.
335
* If we abort during prepare, after having written the WAL record, we
336
* might not have transferred all locks and other state to the prepared
337
* transaction yet. Likewise, if we abort during commit or rollback,
338
* after having written the WAL record, we might not have released all the
339
* resources held by the transaction yet. In those cases, the in-memory
340
* state can be wrong, but it's too late to back out.
342
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
343
if (!MyLockedGxact->valid)
344
RemoveGXact(MyLockedGxact);
346
MyLockedGxact->locking_backend = InvalidBackendId;
347
LWLockRelease(TwoPhaseStateLock);
349
MyLockedGxact = NULL;
353
* This is called after we have finished transferring state to the prepared
357
PostPrepare_Twophase(void)
359
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
360
MyLockedGxact->locking_backend = InvalidBackendId;
361
LWLockRelease(TwoPhaseStateLock);
363
MyLockedGxact = NULL;
369
* Reserve the GID for the given transaction.
372
MarkAsPreparing(TransactionId xid, const char *gid,
373
TimestampTz prepared_at, Oid owner, Oid databaseid)
375
GlobalTransaction gxact;
378
if (strlen(gid) >= GIDSIZE)
380
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
381
errmsg("transaction identifier \"%s\" is too long",
384
/* fail immediately if feature is disabled */
385
if (max_prepared_xacts == 0)
387
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388
errmsg("prepared transactions are disabled"),
389
errhint("Set max_prepared_transactions to a nonzero value.")));
391
/* on first call, register the exit hook */
392
if (!twophaseExitRegistered)
394
before_shmem_exit(AtProcExit_Twophase, 0);
395
twophaseExitRegistered = true;
398
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
400
/* Check for conflicting GID */
401
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
403
gxact = TwoPhaseState->prepXacts[i];
404
if (strcmp(gxact->gid, gid) == 0)
407
(errcode(ERRCODE_DUPLICATE_OBJECT),
408
errmsg("transaction identifier \"%s\" is already in use",
413
/* Get a free gxact from the freelist */
414
if (TwoPhaseState->freeGXacts == NULL)
416
(errcode(ERRCODE_OUT_OF_MEMORY),
417
errmsg("maximum number of prepared transactions reached"),
418
errhint("Increase max_prepared_transactions (currently %d).",
419
max_prepared_xacts)));
420
gxact = TwoPhaseState->freeGXacts;
421
TwoPhaseState->freeGXacts = gxact->next;
423
MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
425
gxact->ondisk = false;
427
/* And insert it into the active array */
428
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
429
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
431
LWLockRelease(TwoPhaseStateLock);
437
* MarkAsPreparingGuts
439
* This uses a gxact struct and puts it into the active array.
440
* NOTE: this is also used when reloading a gxact after a crash; so avoid
441
* assuming that we can use very much backend context.
443
* Note: This function should be called with appropriate locks held.
446
MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
447
TimestampTz prepared_at, Oid owner, Oid databaseid)
453
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
455
Assert(gxact != NULL);
456
proc = &ProcGlobal->allProcs[gxact->pgprocno];
457
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
459
/* Initialize the PGPROC entry */
460
MemSet(proc, 0, sizeof(PGPROC));
461
proc->pgprocno = gxact->pgprocno;
462
SHMQueueElemInit(&(proc->links));
463
proc->waitStatus = STATUS_OK;
464
/* We set up the gxact's VXID as InvalidBackendId/XID */
465
proc->lxid = (LocalTransactionId) xid;
467
pgxact->xmin = InvalidTransactionId;
468
pgxact->delayChkpt = false;
469
pgxact->vacuumFlags = 0;
471
proc->backendId = InvalidBackendId;
472
proc->databaseId = databaseid;
473
proc->roleId = owner;
474
proc->isBackgroundWorker = false;
475
proc->lwWaiting = false;
476
proc->lwWaitMode = 0;
477
proc->waitLock = NULL;
478
proc->waitProcLock = NULL;
479
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
480
SHMQueueInit(&(proc->myProcLocks[i]));
481
/* subxid data must be filled later by GXactLoadSubxactData */
482
pgxact->overflowed = false;
485
gxact->prepared_at = prepared_at;
487
gxact->owner = owner;
488
gxact->locking_backend = MyBackendId;
489
gxact->valid = false;
490
gxact->inredo = false;
491
strcpy(gxact->gid, gid);
494
* Remember that we have this GlobalTransaction entry locked for us. If we
495
* abort after this, we must release it.
497
MyLockedGxact = gxact;
501
* GXactLoadSubxactData
503
* If the transaction being persisted had any subtransactions, this must
504
* be called before MarkAsPrepared() to load information into the dummy
508
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
509
TransactionId *children)
511
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
512
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
514
/* We need no extra lock since the GXACT isn't valid yet */
515
if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
517
pgxact->overflowed = true;
518
nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
522
memcpy(proc->subxids.xids, children,
523
nsubxacts * sizeof(TransactionId));
524
pgxact->nxids = nsubxacts;
530
* Mark the GXACT as fully valid, and enter it into the global ProcArray.
532
* lock_held indicates whether caller already holds TwoPhaseStateLock.
535
MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
537
/* Lock here may be overkill, but I'm not convinced of that ... */
539
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
540
Assert(!gxact->valid);
543
LWLockRelease(TwoPhaseStateLock);
546
* Put it into the global ProcArray so TransactionIdIsInProgress considers
547
* the XID as still running.
549
ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
554
* Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
556
static GlobalTransaction
557
LockGXact(const char *gid, Oid user)
561
/* on first call, register the exit hook */
562
if (!twophaseExitRegistered)
564
before_shmem_exit(AtProcExit_Twophase, 0);
565
twophaseExitRegistered = true;
568
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
570
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
572
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
573
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
575
/* Ignore not-yet-valid GIDs */
578
if (strcmp(gxact->gid, gid) != 0)
581
/* Found it, but has someone else got it locked? */
582
if (gxact->locking_backend != InvalidBackendId)
584
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
585
errmsg("prepared transaction with identifier \"%s\" is busy",
588
if (user != gxact->owner && !superuser_arg(user))
590
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
591
errmsg("permission denied to finish prepared transaction"),
592
errhint("Must be superuser or the user that prepared the transaction.")));
595
* Note: it probably would be possible to allow committing from
596
* another database; but at the moment NOTIFY is known not to work and
597
* there may be some other issues as well. Hence disallow until
598
* someone gets motivated to make it work.
600
if (MyDatabaseId != proc->databaseId)
602
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
603
errmsg("prepared transaction belongs to another database"),
604
errhint("Connect to the database where the transaction was prepared to finish it.")));
606
/* OK for me to lock it */
607
gxact->locking_backend = MyBackendId;
608
MyLockedGxact = gxact;
610
LWLockRelease(TwoPhaseStateLock);
615
LWLockRelease(TwoPhaseStateLock);
618
(errcode(ERRCODE_UNDEFINED_OBJECT),
619
errmsg("prepared transaction with identifier \"%s\" does not exist",
628
* Remove the prepared transaction from the shared memory array.
630
* NB: caller should have already removed it from ProcArray
633
RemoveGXact(GlobalTransaction gxact)
637
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
639
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
641
if (gxact == TwoPhaseState->prepXacts[i])
643
/* remove from the active array */
644
TwoPhaseState->numPrepXacts--;
645
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
647
/* and put it back in the freelist */
648
gxact->next = TwoPhaseState->freeGXacts;
649
TwoPhaseState->freeGXacts = gxact;
655
elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
659
* Returns an array of all prepared transactions for the user-level
660
* function pg_prepared_xact.
662
* The returned array and all its elements are copies of internal data
663
* structures, to minimize the time we need to hold the TwoPhaseStateLock.
665
* WARNING -- we return even those transactions that are not fully prepared
666
* yet. The caller should filter them out if he doesn't want them.
668
* The returned array is palloc'd.
671
GetPreparedTransactionList(GlobalTransaction *gxacts)
673
GlobalTransaction array;
677
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
679
if (TwoPhaseState->numPrepXacts == 0)
681
LWLockRelease(TwoPhaseStateLock);
687
num = TwoPhaseState->numPrepXacts;
688
array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
690
for (i = 0; i < num; i++)
691
memcpy(array + i, TwoPhaseState->prepXacts[i],
692
sizeof(GlobalTransactionData));
694
LWLockRelease(TwoPhaseStateLock);
700
/* Working status for pg_prepared_xact */
703
GlobalTransaction array;
710
* Produce a view with one row per prepared transaction.
712
* This function is here so we don't have to export the
713
* GlobalTransactionData struct definition.
716
pg_prepared_xact(PG_FUNCTION_ARGS)
718
FuncCallContext *funcctx;
719
Working_State *status;
721
if (SRF_IS_FIRSTCALL())
724
MemoryContext oldcontext;
726
/* create a function context for cross-call persistence */
727
funcctx = SRF_FIRSTCALL_INIT();
730
* Switch to memory context appropriate for multiple function calls
732
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
734
/* build tupdesc for result tuples */
735
/* this had better match pg_prepared_xacts view in system_views.sql */
736
tupdesc = CreateTemplateTupleDesc(5, false);
737
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
739
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
741
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
742
TIMESTAMPTZOID, -1, 0);
743
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
745
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
748
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
751
* Collect all the 2PC status information that we will format and send
752
* out as a result set.
754
status = (Working_State *) palloc(sizeof(Working_State));
755
funcctx->user_fctx = (void *) status;
757
status->ngxacts = GetPreparedTransactionList(&status->array);
760
MemoryContextSwitchTo(oldcontext);
763
funcctx = SRF_PERCALL_SETUP();
764
status = (Working_State *) funcctx->user_fctx;
766
while (status->array != NULL && status->currIdx < status->ngxacts)
768
GlobalTransaction gxact = &status->array[status->currIdx++];
769
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
770
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
780
* Form tuple with appropriate data.
782
MemSet(values, 0, sizeof(values));
783
MemSet(nulls, 0, sizeof(nulls));
785
values[0] = TransactionIdGetDatum(pgxact->xid);
786
values[1] = CStringGetTextDatum(gxact->gid);
787
values[2] = TimestampTzGetDatum(gxact->prepared_at);
788
values[3] = ObjectIdGetDatum(gxact->owner);
789
values[4] = ObjectIdGetDatum(proc->databaseId);
791
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
792
result = HeapTupleGetDatum(tuple);
793
SRF_RETURN_NEXT(funcctx, result);
796
SRF_RETURN_DONE(funcctx);
801
* Get the GlobalTransaction struct for a prepared transaction
804
static GlobalTransaction
805
TwoPhaseGetGXact(TransactionId xid)
807
GlobalTransaction result = NULL;
810
static TransactionId cached_xid = InvalidTransactionId;
811
static GlobalTransaction cached_gxact = NULL;
814
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
815
* repeatedly for the same XID. We can save work with a simple cache.
817
if (xid == cached_xid)
820
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
822
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
824
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
825
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
827
if (pgxact->xid == xid)
834
LWLockRelease(TwoPhaseStateLock);
836
if (result == NULL) /* should not happen */
837
elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
840
cached_gxact = result;
846
* TwoPhaseGetDummyProc
847
* Get the dummy backend ID for prepared transaction specified by XID
849
* Dummy backend IDs are similar to real backend IDs of real backends.
850
* They start at MaxBackends + 1, and are unique across all currently active
851
* real backends and prepared transactions.
854
TwoPhaseGetDummyBackendId(TransactionId xid)
856
GlobalTransaction gxact = TwoPhaseGetGXact(xid);
858
return gxact->dummyBackendId;
862
* TwoPhaseGetDummyProc
863
* Get the PGPROC that represents a prepared transaction specified by XID
866
TwoPhaseGetDummyProc(TransactionId xid)
868
GlobalTransaction gxact = TwoPhaseGetGXact(xid);
870
return &ProcGlobal->allProcs[gxact->pgprocno];
873
/************************************************************************/
874
/* State file support */
875
/************************************************************************/
877
#define TwoPhaseFilePath(path, xid) \
878
snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
881
* 2PC state file format:
883
* 1. TwoPhaseFileHeader
884
* 2. TransactionId[] (subtransactions)
885
* 3. RelFileNode[] (files to be deleted at commit)
886
* 4. RelFileNode[] (files to be deleted at abort)
887
* 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
888
* 6. TwoPhaseRecordOnDisk
890
* 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
891
* 9. checksum (CRC-32C)
893
* Each segment except the final checksum is MAXALIGN'd.
897
* Header for a 2PC state file
899
#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
901
typedef struct TwoPhaseFileHeader
903
uint32 magic; /* format identifier */
904
uint32 total_len; /* actual file length */
905
TransactionId xid; /* original transaction XID */
906
Oid database; /* OID of database it was in */
907
TimestampTz prepared_at; /* time of preparation */
908
Oid owner; /* user running the transaction */
909
int32 nsubxacts; /* number of following subxact XIDs */
910
int32 ncommitrels; /* number of delete-on-commit rels */
911
int32 nabortrels; /* number of delete-on-abort rels */
912
int32 ninvalmsgs; /* number of cache invalidation messages */
913
bool initfileinval; /* does relcache init file need invalidation? */
914
uint16 gidlen; /* length of the GID - GID follows the header */
915
XLogRecPtr origin_lsn; /* lsn of this record at origin node */
916
TimestampTz origin_timestamp; /* time of prepare at origin node */
917
} TwoPhaseFileHeader;
920
* Header for each record in a state file
922
* NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
923
* The rmgr data will be stored starting on a MAXALIGN boundary.
925
typedef struct TwoPhaseRecordOnDisk
927
uint32 len; /* length of rmgr data */
928
TwoPhaseRmgrId rmid; /* resource manager for this record */
929
uint16 info; /* flag bits for use by rmgr */
930
} TwoPhaseRecordOnDisk;
933
* During prepare, the state file is assembled in memory before writing it
934
* to WAL and the actual state file. We use a chain of StateFileChunk blocks
937
typedef struct StateFileChunk
941
struct StateFileChunk *next;
946
StateFileChunk *head; /* first data block in the chain */
947
StateFileChunk *tail; /* last block in chain */
949
uint32 bytes_free; /* free bytes left in tail block */
950
uint32 total_len; /* total data bytes in chain */
955
* Append a block of data to records data structure.
957
* NB: each block is padded to a MAXALIGN multiple. This must be
958
* accounted for when the file is later read!
960
* The data is copied, so the caller is free to modify it afterwards.
963
save_state_data(const void *data, uint32 len)
965
uint32 padlen = MAXALIGN(len);
967
if (padlen > records.bytes_free)
969
records.tail->next = palloc0(sizeof(StateFileChunk));
970
records.tail = records.tail->next;
971
records.tail->len = 0;
972
records.tail->next = NULL;
973
records.num_chunks++;
975
records.bytes_free = Max(padlen, 512);
976
records.tail->data = palloc(records.bytes_free);
979
memcpy(((char *) records.tail->data) + records.tail->len, data, len);
980
records.tail->len += padlen;
981
records.bytes_free -= padlen;
982
records.total_len += padlen;
986
* Start preparing a state file.
988
* Initializes data structure and inserts the 2PC file header record.
991
StartPrepare(GlobalTransaction gxact)
993
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
994
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
995
TransactionId xid = pgxact->xid;
996
TwoPhaseFileHeader hdr;
997
TransactionId *children;
998
RelFileNode *commitrels;
999
RelFileNode *abortrels;
1000
SharedInvalidationMessage *invalmsgs;
1002
/* Initialize linked list */
1003
records.head = palloc0(sizeof(StateFileChunk));
1004
records.head->len = 0;
1005
records.head->next = NULL;
1007
records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1008
records.head->data = palloc(records.bytes_free);
1010
records.tail = records.head;
1011
records.num_chunks = 1;
1013
records.total_len = 0;
1016
hdr.magic = TWOPHASE_MAGIC;
1017
hdr.total_len = 0; /* EndPrepare will fill this in */
1019
hdr.database = proc->databaseId;
1020
hdr.prepared_at = gxact->prepared_at;
1021
hdr.owner = gxact->owner;
1022
hdr.nsubxacts = xactGetCommittedChildren(&children);
1023
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1024
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1025
hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1026
&hdr.initfileinval);
1027
hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1029
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1030
save_state_data(gxact->gid, hdr.gidlen);
1033
* Add the additional info about subxacts, deletable files and cache
1034
* invalidation messages.
1036
if (hdr.nsubxacts > 0)
1038
save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1039
/* While we have the child-xact data, stuff it in the gxact too */
1040
GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1042
if (hdr.ncommitrels > 0)
1044
save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1047
if (hdr.nabortrels > 0)
1049
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1052
if (hdr.ninvalmsgs > 0)
1054
save_state_data(invalmsgs,
1055
hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1061
* Finish preparing state data and writing it to WAL.
1064
EndPrepare(GlobalTransaction gxact)
1066
TwoPhaseFileHeader *hdr;
1067
StateFileChunk *record;
1070
/* Add the end sentinel to the list of 2PC records */
1071
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1074
/* Go back and fill in total_len in the file header record */
1075
hdr = (TwoPhaseFileHeader *) records.head->data;
1076
Assert(hdr->magic == TWOPHASE_MAGIC);
1077
hdr->total_len = records.total_len + sizeof(pg_crc32c);
1079
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1080
replorigin_session_origin != DoNotReplicateId);
1084
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
1085
hdr->origin_lsn = replorigin_session_origin_lsn;
1086
hdr->origin_timestamp = replorigin_session_origin_timestamp;
1090
hdr->origin_lsn = InvalidXLogRecPtr;
1091
hdr->origin_timestamp = 0;
1095
* If the data size exceeds MaxAllocSize, we won't be able to read it in
1096
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
1097
* where we write data to file and then re-read at commit time.
1099
if (hdr->total_len > MaxAllocSize)
1101
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1102
errmsg("two-phase state file maximum length exceeded")));
1105
* Now writing 2PC state data to WAL. We let the WAL's CRC protection
1106
* cover us, so no need to calculate a separate CRC.
1108
* We have to set delayChkpt here, too; otherwise a checkpoint starting
1109
* immediately after the WAL record is inserted could complete without
1110
* fsync'ing our state file. (This is essentially the same kind of race
1111
* condition as the COMMIT-to-clog-write case that RecordTransactionCommit
1112
* uses delayChkpt for; see notes there.)
1114
* We save the PREPARE record's location in the gxact for later use by
1115
* CheckPointTwoPhase.
1117
XLogEnsureRecordSpace(0, records.num_chunks);
1119
START_CRIT_SECTION();
1121
MyPgXact->delayChkpt = true;
1124
for (record = records.head; record != NULL; record = record->next)
1125
XLogRegisterData(record->data, record->len);
1127
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1129
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1133
/* Move LSNs forward for this replication origin */
1134
replorigin_session_advance(replorigin_session_origin_lsn,
1135
gxact->prepare_end_lsn);
1138
XLogFlush(gxact->prepare_end_lsn);
1140
/* If we crash now, we have prepared: WAL replay will fix things */
1142
/* Store record's start location to read that later on Commit */
1143
gxact->prepare_start_lsn = ProcLastRecPtr;
1146
* Mark the prepared transaction as valid. As soon as xact.c marks
1147
* MyPgXact as not running our XID (which it will do immediately after
1148
* this function returns), others can commit/rollback the xact.
1150
* NB: a side effect of this is to make a dummy ProcArray entry for the
1151
* prepared XID. This must happen before we clear the XID from MyPgXact,
1152
* else there is a window where the XID is not running according to
1153
* TransactionIdIsInProgress, and onlookers would be entitled to assume
1154
* the xact crashed. Instead we have a window where the same XID appears
1155
* twice in ProcArray, which is OK.
1157
MarkAsPrepared(gxact, false);
1160
* Now we can mark ourselves as out of the commit critical section: a
1161
* checkpoint starting after this will certainly see the gxact as a
1162
* candidate for fsyncing.
1164
MyPgXact->delayChkpt = false;
1167
* Remember that we have this GlobalTransaction entry locked for us. If
1168
* we crash after this point, it's too late to abort, but we must unlock
1169
* it so that the prepared transaction can be committed or rolled back.
1171
MyLockedGxact = gxact;
1176
* Wait for synchronous replication, if required.
1178
* Note that at this stage we have marked the prepare, but still show as
1179
* running in the procarray (twice!) and continue to hold locks.
1181
SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1183
records.tail = records.head = NULL;
1184
records.num_chunks = 0;
1188
* Register a 2PC record to be written to state file.
1191
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1192
const void *data, uint32 len)
1194
TwoPhaseRecordOnDisk record;
1199
save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1201
save_state_data(data, len);
1206
* Read and validate the state file for xid.
1208
* If it looks OK (has a valid magic number and CRC), return the palloc'd
1209
* contents of the file. Otherwise return NULL.
1212
ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1214
char path[MAXPGPATH];
1216
TwoPhaseFileHeader *hdr;
1223
TwoPhaseFilePath(path, xid);
1225
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1230
(errcode_for_file_access(),
1231
errmsg("could not open two-phase state file \"%s\": %m",
1237
* Check file length. We can determine a lower bound pretty easily. We
1238
* set an upper bound to avoid palloc() failure on a corrupt file, though
1239
* we can't guarantee that we won't get an out of memory error anyway,
1240
* even on a valid file.
1242
if (fstat(fd, &stat))
1244
CloseTransientFile(fd);
1247
(errcode_for_file_access(),
1248
errmsg("could not stat two-phase state file \"%s\": %m",
1253
if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1254
MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1255
sizeof(pg_crc32c)) ||
1256
stat.st_size > MaxAllocSize)
1258
CloseTransientFile(fd);
1262
crc_offset = stat.st_size - sizeof(pg_crc32c);
1263
if (crc_offset != MAXALIGN(crc_offset))
1265
CloseTransientFile(fd);
1270
* OK, slurp in the file.
1272
buf = (char *) palloc(stat.st_size);
1274
pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1275
if (read(fd, buf, stat.st_size) != stat.st_size)
1277
pgstat_report_wait_end();
1278
CloseTransientFile(fd);
1281
(errcode_for_file_access(),
1282
errmsg("could not read two-phase state file \"%s\": %m",
1288
pgstat_report_wait_end();
1289
CloseTransientFile(fd);
1291
hdr = (TwoPhaseFileHeader *) buf;
1292
if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1298
INIT_CRC32C(calc_crc);
1299
COMP_CRC32C(calc_crc, buf, crc_offset);
1300
FIN_CRC32C(calc_crc);
1302
file_crc = *((pg_crc32c *) (buf + crc_offset));
1304
if (!EQ_CRC32C(calc_crc, file_crc))
1314
* ParsePrepareRecord
1317
ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
1319
TwoPhaseFileHeader *hdr;
1322
hdr = (TwoPhaseFileHeader *) xlrec;
1323
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1325
parsed->origin_lsn = hdr->origin_lsn;
1326
parsed->origin_timestamp = hdr->origin_timestamp;
1327
parsed->twophase_xid = hdr->xid;
1328
parsed->dbId = hdr->database;
1329
parsed->nsubxacts = hdr->nsubxacts;
1330
parsed->nrels = hdr->ncommitrels;
1331
parsed->nabortrels = hdr->nabortrels;
1332
parsed->nmsgs = hdr->ninvalmsgs;
1334
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
1335
bufptr += MAXALIGN(hdr->gidlen);
1337
parsed->subxacts = (TransactionId *) bufptr;
1338
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1340
parsed->xnodes = (RelFileNode *) bufptr;
1341
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1343
parsed->abortnodes = (RelFileNode *) bufptr;
1344
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1346
parsed->msgs = (SharedInvalidationMessage *) bufptr;
1347
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1353
* Reads 2PC data from xlog. During checkpoint this data will be moved to
1354
* twophase files and ReadTwoPhaseFile should be used instead.
1356
* Note clearly that this function can access WAL during normal operation,
1357
* similarly to the way WALSender or Logical Decoding would do.
1361
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1364
XLogReaderState *xlogreader;
1367
xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
1371
(errcode(ERRCODE_OUT_OF_MEMORY),
1372
errmsg("out of memory"),
1373
errdetail("Failed while allocating a WAL reading processor.")));
1375
record = XLogReadRecord(xlogreader, lsn, &errormsg);
1378
(errcode_for_file_access(),
1379
errmsg("could not read two-phase state from WAL at %X/%X",
1380
(uint32) (lsn >> 32),
1383
if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1384
(XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1386
(errcode_for_file_access(),
1387
errmsg("expected two-phase state data is not present in WAL at %X/%X",
1388
(uint32) (lsn >> 32),
1392
*len = XLogRecGetDataLen(xlogreader);
1394
*buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1395
memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1397
XLogReaderFree(xlogreader);
1402
* Confirms an xid is prepared, during recovery
1405
StandbyTransactionIdIsPrepared(TransactionId xid)
1408
TwoPhaseFileHeader *hdr;
1411
Assert(TransactionIdIsValid(xid));
1413
if (max_prepared_xacts <= 0)
1414
return false; /* nothing to do */
1416
/* Read and validate file */
1417
buf = ReadTwoPhaseFile(xid, false);
1421
/* Check header also */
1422
hdr = (TwoPhaseFileHeader *) buf;
1423
result = TransactionIdEquals(hdr->xid, xid);
1430
* FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1433
FinishPreparedTransaction(const char *gid, bool isCommit)
1435
GlobalTransaction gxact;
1441
TwoPhaseFileHeader *hdr;
1442
TransactionId latestXid;
1443
TransactionId *children;
1444
RelFileNode *commitrels;
1445
RelFileNode *abortrels;
1446
RelFileNode *delrels;
1448
SharedInvalidationMessage *invalmsgs;
1452
* Validate the GID, and lock the GXACT to ensure that two backends do not
1453
* try to commit the same GID at once.
1455
gxact = LockGXact(gid, GetUserId());
1456
proc = &ProcGlobal->allProcs[gxact->pgprocno];
1457
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1461
* Read and validate 2PC state data. State data will typically be stored
1462
* in WAL files if the LSN is after the last checkpoint record, or moved
1463
* to disk if for some reason they have lived for a long time.
1466
buf = ReadTwoPhaseFile(xid, true);
1468
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1472
* Disassemble the header area
1474
hdr = (TwoPhaseFileHeader *) buf;
1475
Assert(TransactionIdEquals(hdr->xid, xid));
1476
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1477
bufptr += MAXALIGN(hdr->gidlen);
1478
children = (TransactionId *) bufptr;
1479
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1480
commitrels = (RelFileNode *) bufptr;
1481
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1482
abortrels = (RelFileNode *) bufptr;
1483
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1484
invalmsgs = (SharedInvalidationMessage *) bufptr;
1485
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1487
/* compute latestXid among all children */
1488
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1490
/* Prevent cancel/die interrupt while cleaning up */
1494
* The order of operations here is critical: make the XLOG entry for
1495
* commit or abort, then mark the transaction committed or aborted in
1496
* pg_xact, then remove its PGPROC from the global ProcArray (which means
1497
* TransactionIdIsInProgress will stop saying the prepared xact is in
1498
* progress), then run the post-commit or post-abort callbacks. The
1499
* callbacks will release the locks the transaction held.
1502
RecordTransactionCommitPrepared(xid,
1503
hdr->nsubxacts, children,
1504
hdr->ncommitrels, commitrels,
1505
hdr->ninvalmsgs, invalmsgs,
1506
hdr->initfileinval, gid);
1508
RecordTransactionAbortPrepared(xid,
1509
hdr->nsubxacts, children,
1510
hdr->nabortrels, abortrels,
1513
ProcArrayRemove(proc, latestXid);
1516
* In case we fail while running the callbacks, mark the gxact invalid so
1517
* no one else will try to commit/rollback, and so it will be recycled if
1518
* we fail after this point. It is still locked by our backend so it
1519
* won't go away yet.
1521
* (We assume it's safe to do this without taking TwoPhaseStateLock.)
1523
gxact->valid = false;
1526
* We have to remove any files that were supposed to be dropped. For
1527
* consistency with the regular xact.c code paths, must do this before
1528
* releasing locks, so do it before running the callbacks.
1530
* NB: this code knows that we couldn't be dropping any temp rels ...
1534
delrels = commitrels;
1535
ndelrels = hdr->ncommitrels;
1539
delrels = abortrels;
1540
ndelrels = hdr->nabortrels;
1542
for (i = 0; i < ndelrels; i++)
1544
SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
1546
smgrdounlink(srel, false);
1551
* Handle cache invalidation messages.
1553
* Relcache init file invalidation requires processing both before and
1554
* after we send the SI messages. See AtEOXact_Inval()
1556
if (hdr->initfileinval)
1557
RelationCacheInitFilePreInvalidate();
1558
SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1559
if (hdr->initfileinval)
1560
RelationCacheInitFilePostInvalidate();
1562
/* And now do the callbacks */
1564
ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1566
ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1568
PredicateLockTwoPhaseFinish(xid, isCommit);
1570
/* Count the prepared xact as committed or aborted */
1571
AtEOXact_PgStat(isCommit);
1574
* And now we can clean up any files we may have left.
1577
RemoveTwoPhaseFile(xid, true);
1579
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1581
LWLockRelease(TwoPhaseStateLock);
1582
MyLockedGxact = NULL;
1584
RESUME_INTERRUPTS();
1590
* Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1593
ProcessRecords(char *bufptr, TransactionId xid,
1594
const TwoPhaseCallback callbacks[])
1598
TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1600
Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1601
if (record->rmid == TWOPHASE_RM_END_ID)
1604
bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1606
if (callbacks[record->rmid] != NULL)
1607
callbacks[record->rmid] (xid, record->info,
1608
(void *) bufptr, record->len);
1610
bufptr += MAXALIGN(record->len);
1615
* Remove the 2PC file for the specified XID.
1617
* If giveWarning is false, do not complain about file-not-present;
1618
* this is an expected case during WAL replay.
1621
RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1623
char path[MAXPGPATH];
1625
TwoPhaseFilePath(path, xid);
1627
if (errno != ENOENT || giveWarning)
1629
(errcode_for_file_access(),
1630
errmsg("could not remove two-phase state file \"%s\": %m",
1635
* Recreates a state file. This is used in WAL replay and during
1636
* checkpoint creation.
1638
* Note: content and len don't include CRC.
1641
RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1643
char path[MAXPGPATH];
1644
pg_crc32c statefile_crc;
1648
INIT_CRC32C(statefile_crc);
1649
COMP_CRC32C(statefile_crc, content, len);
1650
FIN_CRC32C(statefile_crc);
1652
TwoPhaseFilePath(path, xid);
1654
fd = OpenTransientFile(path,
1655
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1658
(errcode_for_file_access(),
1659
errmsg("could not recreate two-phase state file \"%s\": %m",
1662
/* Write content and CRC */
1663
pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1664
if (write(fd, content, len) != len)
1666
pgstat_report_wait_end();
1667
CloseTransientFile(fd);
1669
(errcode_for_file_access(),
1670
errmsg("could not write two-phase state file: %m")));
1672
if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1674
pgstat_report_wait_end();
1675
CloseTransientFile(fd);
1677
(errcode_for_file_access(),
1678
errmsg("could not write two-phase state file: %m")));
1680
pgstat_report_wait_end();
1683
* We must fsync the file because the end-of-replay checkpoint will not do
1684
* so, there being no GXACT in shared memory yet to tell it to.
1686
pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1687
if (pg_fsync(fd) != 0)
1689
CloseTransientFile(fd);
1691
(errcode_for_file_access(),
1692
errmsg("could not fsync two-phase state file: %m")));
1694
pgstat_report_wait_end();
1696
if (CloseTransientFile(fd) != 0)
1698
(errcode_for_file_access(),
1699
errmsg("could not close two-phase state file: %m")));
1703
* CheckPointTwoPhase -- handle 2PC component of checkpointing.
1705
* We must fsync the state file of any GXACT that is valid or has been
1706
* generated during redo and has a PREPARE LSN <= the checkpoint's redo
1707
* horizon. (If the gxact isn't valid yet, has not been generated in
1708
* redo, or has a later LSN, this checkpoint is not responsible for
1711
* This is deliberately run as late as possible in the checkpoint sequence,
1712
* because GXACTs ordinarily have short lifespans, and so it is quite
1713
* possible that GXACTs that were valid at checkpoint start will no longer
1714
* exist if we wait a little bit. With typical checkpoint settings this
1715
* will be about 3 minutes for an online checkpoint, so as a result we
1716
* we expect that there will be no GXACTs that need to be copied to disk.
1718
* If a GXACT remains valid across multiple checkpoints, it will already
1719
* be on disk so we don't bother to repeat that write.
1722
CheckPointTwoPhase(XLogRecPtr redo_horizon)
1725
int serialized_xacts = 0;
1727
if (max_prepared_xacts <= 0)
1728
return; /* nothing to do */
1730
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1733
* We are expecting there to be zero GXACTs that need to be copied to
1734
* disk, so we perform all I/O while holding TwoPhaseStateLock for
1735
* simplicity. This prevents any new xacts from preparing while this
1736
* occurs, which shouldn't be a problem since the presence of long-lived
1737
* prepared xacts indicates the transaction manager isn't active.
1739
* It's also possible to move I/O out of the lock, but on every error we
1740
* should check whether somebody committed our transaction in different
1741
* backend. Let's leave this optimization for future, if somebody will
1742
* spot that this place cause bottleneck.
1744
* Note that it isn't possible for there to be a GXACT with a
1745
* prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1746
* because of the efforts with delayChkpt.
1748
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1749
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1752
* Note that we are using gxact not pgxact so this works in recovery
1755
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1757
if ((gxact->valid || gxact->inredo) &&
1759
gxact->prepare_end_lsn <= redo_horizon)
1764
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1765
RecreateTwoPhaseFile(gxact->xid, buf, len);
1766
gxact->ondisk = true;
1767
gxact->prepare_start_lsn = InvalidXLogRecPtr;
1768
gxact->prepare_end_lsn = InvalidXLogRecPtr;
1773
LWLockRelease(TwoPhaseStateLock);
1776
* Flush unconditionally the parent directory to make any information
1777
* durable on disk. Two-phase files could have been removed and those
1778
* removals need to be made persistent as well as any files newly created
1779
* previously since the last checkpoint.
1781
fsync_fname(TWOPHASE_DIR, true);
1783
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1785
if (log_checkpoints && serialized_xacts > 0)
1787
(errmsg_plural("%u two-phase state file was written "
1788
"for a long-running prepared transaction",
1789
"%u two-phase state files were written "
1790
"for long-running prepared transactions",
1792
serialized_xacts)));
1796
* restoreTwoPhaseData
1798
* Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1799
* This is called once at the beginning of recovery, saving any extra
1800
* lookups in the future. Two-phase files that are newer than the
1801
* minimum XID horizon are discarded on the way.
1804
restoreTwoPhaseData(void)
1807
struct dirent *clde;
1809
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1810
cldir = AllocateDir(TWOPHASE_DIR);
1811
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1813
if (strlen(clde->d_name) == 8 &&
1814
strspn(clde->d_name, "0123456789ABCDEF") == 8)
1819
xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1821
buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1822
true, false, false);
1826
PrepareRedoAdd(buf, InvalidXLogRecPtr,
1827
InvalidXLogRecPtr, InvalidRepOriginId);
1830
LWLockRelease(TwoPhaseStateLock);
1835
* PrescanPreparedTransactions
1837
* Scan the shared memory entries of TwoPhaseState and determine the range
1838
* of valid XIDs present. This is run during database startup, after we
1839
* have completed reading WAL. ShmemVariableCache->nextXid has been set to
1840
* one more than the highest XID for which evidence exists in WAL.
1842
* We throw away any prepared xacts with main XID beyond nextXid --- if any
1843
* are present, it suggests that the DBA has done a PITR recovery to an
1844
* earlier point in time without cleaning out pg_twophase. We dare not
1845
* try to recover such prepared xacts since they likely depend on database
1846
* state that doesn't exist now.
1848
* However, we will advance nextXid beyond any subxact XIDs belonging to
1849
* valid prepared xacts. We need to do this since subxact commit doesn't
1850
* write a WAL entry, and so there might be no evidence in WAL of those
1853
* Our other responsibility is to determine and return the oldest valid XID
1854
* among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1855
* This is needed to synchronize pg_subtrans startup properly.
1857
* If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1858
* top-level xids is stored in *xids_p. The number of entries in the array
1859
* is returned in *nxids_p.
1862
PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1864
TransactionId origNextXid = ShmemVariableCache->nextXid;
1865
TransactionId result = origNextXid;
1866
TransactionId *xids = NULL;
1871
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1872
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1876
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1878
Assert(gxact->inredo);
1882
buf = ProcessTwoPhaseBuffer(xid,
1883
gxact->prepare_start_lsn,
1884
gxact->ondisk, false, true);
1890
* OK, we think this file is valid. Incorporate xid into the
1891
* running-minimum result.
1893
if (TransactionIdPrecedes(xid, result))
1898
if (nxids == allocsize)
1903
xids = palloc(allocsize * sizeof(TransactionId));
1907
allocsize = allocsize * 2;
1908
xids = repalloc(xids, allocsize * sizeof(TransactionId));
1911
xids[nxids++] = xid;
1916
LWLockRelease(TwoPhaseStateLock);
1928
* StandbyRecoverPreparedTransactions
1930
* Scan the shared memory entries of TwoPhaseState and setup all the required
1931
* information to allow standby queries to treat prepared transactions as still
1934
* This is never called at the end of recovery - we use
1935
* RecoverPreparedTransactions() at that point.
1937
* The lack of calls to SubTransSetParent() calls here is by design;
1938
* those calls are made by RecoverPreparedTransactions() at the end of recovery
1939
* for those xacts that need this.
1942
StandbyRecoverPreparedTransactions(void)
1946
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1947
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1951
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1953
Assert(gxact->inredo);
1957
buf = ProcessTwoPhaseBuffer(xid,
1958
gxact->prepare_start_lsn,
1959
gxact->ondisk, false, false);
1963
LWLockRelease(TwoPhaseStateLock);
1967
* RecoverPreparedTransactions
1969
* Scan the shared memory entries of TwoPhaseState and reload the state for
1970
* each prepared transaction (reacquire locks, etc).
1972
* This is run at the end of recovery, but before we allow backends to write
1975
* At the end of recovery the way we take snapshots will change. We now need
1976
* to mark all running transactions with their full SubTransSetParent() info
1977
* to allow normal snapshots to work correctly if snapshots overflow.
1978
* We do this here because by definition prepared transactions are the only
1979
* type of write transaction still running, so this is necessary and
1983
RecoverPreparedTransactions(void)
1987
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1988
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1992
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1994
TwoPhaseFileHeader *hdr;
1995
TransactionId *subxids;
2001
* Reconstruct subtrans state for the transaction --- needed because
2002
* pg_subtrans is not preserved over a restart. Note that we are
2003
* linking all the subtransactions directly to the top-level XID;
2004
* there may originally have been a more complex hierarchy, but
2005
* there's no need to restore that exactly. It's possible that
2006
* SubTransSetParent has been set before, if the prepared transaction
2007
* generated xid assignment records.
2009
buf = ProcessTwoPhaseBuffer(xid,
2010
gxact->prepare_start_lsn,
2011
gxact->ondisk, true, false);
2016
(errmsg("recovering prepared transaction %u from shared memory", xid)));
2018
hdr = (TwoPhaseFileHeader *) buf;
2019
Assert(TransactionIdEquals(hdr->xid, xid));
2020
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2021
gid = (const char *) bufptr;
2022
bufptr += MAXALIGN(hdr->gidlen);
2023
subxids = (TransactionId *) bufptr;
2024
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2025
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2026
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2027
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2030
* Recreate its GXACT and dummy PGPROC. But, check whether it was
2031
* added in redo and already has a shmem entry for it.
2033
MarkAsPreparingGuts(gxact, xid, gid,
2035
hdr->owner, hdr->database);
2037
/* recovered, so reset the flag for entries generated by redo */
2038
gxact->inredo = false;
2040
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2041
MarkAsPrepared(gxact, true);
2043
LWLockRelease(TwoPhaseStateLock);
2046
* Recover other state (notably locks) using resource managers.
2048
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2051
* Release locks held by the standby process after we process each
2052
* prepared transaction. As a result, we don't need too many
2053
* additional locks at any one time.
2056
StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2059
* We're done with recovering this transaction. Clear MyLockedGxact,
2060
* like we do in PrepareTransaction() during normal operation.
2062
PostPrepare_Twophase();
2066
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2069
LWLockRelease(TwoPhaseStateLock);
2073
* ProcessTwoPhaseBuffer
2075
* Given a transaction id, read it either from disk or read it directly
2076
* via shmem xlog record pointer using the provided "prepare_start_lsn".
2078
* If setParent is true, set up subtransaction parent linkages.
2080
* If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2084
ProcessTwoPhaseBuffer(TransactionId xid,
2085
XLogRecPtr prepare_start_lsn,
2087
bool setParent, bool setNextXid)
2089
TransactionId origNextXid = ShmemVariableCache->nextXid;
2090
TransactionId *subxids;
2092
TwoPhaseFileHeader *hdr;
2095
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2098
Assert(prepare_start_lsn != InvalidXLogRecPtr);
2100
/* Already processed? */
2101
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2106
(errmsg("removing stale two-phase state file for transaction %u",
2108
RemoveTwoPhaseFile(xid, true);
2113
(errmsg("removing stale two-phase state from memory for transaction %u",
2115
PrepareRedoRemove(xid, true);
2120
/* Reject XID if too new */
2121
if (TransactionIdFollowsOrEquals(xid, origNextXid))
2126
(errmsg("removing future two-phase state file for transaction %u",
2128
RemoveTwoPhaseFile(xid, true);
2133
(errmsg("removing future two-phase state from memory for transaction %u",
2135
PrepareRedoRemove(xid, true);
2142
/* Read and validate file */
2143
buf = ReadTwoPhaseFile(xid, true);
2147
(errmsg("removing corrupt two-phase state file for transaction %u",
2149
RemoveTwoPhaseFile(xid, true);
2155
/* Read xlog data */
2156
XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2159
/* Deconstruct header */
2160
hdr = (TwoPhaseFileHeader *) buf;
2161
if (!TransactionIdEquals(hdr->xid, xid))
2166
(errmsg("removing corrupt two-phase state file for transaction %u",
2168
RemoveTwoPhaseFile(xid, true);
2173
(errmsg("removing corrupt two-phase state from memory for transaction %u",
2175
PrepareRedoRemove(xid, true);
2182
* Examine subtransaction XIDs ... they should all follow main XID, and
2183
* they may force us to advance nextXid.
2185
subxids = (TransactionId *) (buf +
2186
MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2187
MAXALIGN(hdr->gidlen));
2188
for (i = 0; i < hdr->nsubxacts; i++)
2190
TransactionId subxid = subxids[i];
2192
Assert(TransactionIdFollows(subxid, xid));
2194
/* update nextXid if needed */
2196
TransactionIdFollowsOrEquals(subxid,
2197
ShmemVariableCache->nextXid))
2200
* We don't expect anyone else to modify nextXid, hence we don't
2201
* need to hold a lock while examining it. We still acquire the
2202
* lock to modify it, though, so we recheck.
2204
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
2205
if (TransactionIdFollowsOrEquals(subxid,
2206
ShmemVariableCache->nextXid))
2208
ShmemVariableCache->nextXid = subxid;
2209
TransactionIdAdvance(ShmemVariableCache->nextXid);
2211
LWLockRelease(XidGenLock);
2215
SubTransSetParent(subxid, xid);
2223
* RecordTransactionCommitPrepared
2225
* This is basically the same as RecordTransactionCommit (q.v. if you change
2226
* this function): in particular, we must set the delayChkpt flag to avoid a
2229
* We know the transaction made at least one XLOG entry (its PREPARE),
2230
* so it is never possible to optimize out the commit record.
2233
RecordTransactionCommitPrepared(TransactionId xid,
2235
TransactionId *children,
2239
SharedInvalidationMessage *invalmsgs,
2244
TimestampTz committs = GetCurrentTimestamp();
2248
* Are we using the replication origins feature? Or, in other words, are
2249
* we replaying remote actions?
2251
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2252
replorigin_session_origin != DoNotReplicateId);
2254
START_CRIT_SECTION();
2256
/* See notes in RecordTransactionCommit */
2257
MyPgXact->delayChkpt = true;
2260
* Emit the XLOG commit record. Note that we mark 2PC commits as
2261
* potentially having AccessExclusiveLocks since we don't know whether or
2264
recptr = XactLogCommitRecord(committs,
2265
nchildren, children, nrels, rels,
2266
ninvalmsgs, invalmsgs,
2267
initfileinval, false,
2268
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2273
/* Move LSNs forward for this replication origin */
2274
replorigin_session_advance(replorigin_session_origin_lsn,
2278
* Record commit timestamp. The value comes from plain commit timestamp
2279
* if replorigin is not enabled, or replorigin already set a value for us
2280
* in replorigin_session_origin_timestamp otherwise.
2282
* We don't need to WAL-log anything here, as the commit record written
2283
* above already contains the data.
2285
if (!replorigin || replorigin_session_origin_timestamp == 0)
2286
replorigin_session_origin_timestamp = committs;
2288
TransactionTreeSetCommitTsData(xid, nchildren, children,
2289
replorigin_session_origin_timestamp,
2290
replorigin_session_origin, false);
2293
* We don't currently try to sleep before flush here ... nor is there any
2294
* support for async commit of a prepared xact (the very idea is probably
2298
/* Flush XLOG to disk */
2301
/* Mark the transaction committed in pg_xact */
2302
TransactionIdCommitTree(xid, nchildren, children);
2304
/* Checkpoint can proceed now */
2305
MyPgXact->delayChkpt = false;
2310
* Wait for synchronous replication, if required.
2312
* Note that at this stage we have marked clog, but still show as running
2313
* in the procarray and continue to hold locks.
2315
SyncRepWaitForLSN(recptr, true);
2319
* RecordTransactionAbortPrepared
2321
* This is basically the same as RecordTransactionAbort.
2323
* We know the transaction made at least one XLOG entry (its PREPARE),
2324
* so it is never possible to optimize out the abort record.
2327
RecordTransactionAbortPrepared(TransactionId xid,
2329
TransactionId *children,
2337
* Catch the scenario where we aborted partway through
2338
* RecordTransactionCommitPrepared ...
2340
if (TransactionIdDidCommit(xid))
2341
elog(PANIC, "cannot abort transaction %u, it was already committed",
2344
START_CRIT_SECTION();
2347
* Emit the XLOG commit record. Note that we mark 2PC aborts as
2348
* potentially having AccessExclusiveLocks since we don't know whether or
2351
recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2352
nchildren, children,
2354
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2357
/* Always flush, since we're about to remove the 2PC state file */
2361
* Mark the transaction aborted in clog. This is not absolutely necessary
2362
* but we may as well do it while we are here.
2364
TransactionIdAbortTree(xid, nchildren, children);
2369
* Wait for synchronous replication, if required.
2371
* Note that at this stage we have marked clog, but still show as running
2372
* in the procarray and continue to hold locks.
2374
SyncRepWaitForLSN(recptr, false);
2380
* Store pointers to the start/end of the WAL record along with the xid in
2381
* a gxact entry in shared memory TwoPhaseState structure. If caller
2382
* specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2383
* data, the entry is marked as located on disk.
2386
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2387
XLogRecPtr end_lsn, RepOriginId origin_id)
2389
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2392
GlobalTransaction gxact;
2394
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2395
Assert(RecoveryInProgress());
2397
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2398
gid = (const char *) bufptr;
2401
* Reserve the GID for the given transaction in the redo code path.
2403
* This creates a gxact struct and puts it into the active array.
2405
* In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2406
* shared memory. Hence, we only fill up the bare minimum contents here.
2407
* The gxact also gets marked with gxact->inredo set to true to indicate
2408
* that it got added in the redo phase
2411
/* Get a free gxact from the freelist */
2412
if (TwoPhaseState->freeGXacts == NULL)
2414
(errcode(ERRCODE_OUT_OF_MEMORY),
2415
errmsg("maximum number of prepared transactions reached"),
2416
errhint("Increase max_prepared_transactions (currently %d).",
2417
max_prepared_xacts)));
2418
gxact = TwoPhaseState->freeGXacts;
2419
TwoPhaseState->freeGXacts = gxact->next;
2421
gxact->prepared_at = hdr->prepared_at;
2422
gxact->prepare_start_lsn = start_lsn;
2423
gxact->prepare_end_lsn = end_lsn;
2424
gxact->xid = hdr->xid;
2425
gxact->owner = hdr->owner;
2426
gxact->locking_backend = InvalidBackendId;
2427
gxact->valid = false;
2428
gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2429
gxact->inredo = true; /* yes, added in redo */
2430
strcpy(gxact->gid, gid);
2432
/* And insert it into the active array */
2433
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2434
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2436
if (origin_id != InvalidRepOriginId)
2438
/* recover apply progress */
2439
replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2440
false /* backward */ , false /* WAL */ );
2443
elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2449
* Remove the corresponding gxact entry from TwoPhaseState. Also remove
2450
* the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2452
* Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2456
PrepareRedoRemove(TransactionId xid, bool giveWarning)
2458
GlobalTransaction gxact = NULL;
2462
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2463
Assert(RecoveryInProgress());
2465
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2467
gxact = TwoPhaseState->prepXacts[i];
2469
if (gxact->xid == xid)
2471
Assert(gxact->inredo);
2478
* Just leave if there is nothing, this is expected during WAL replay.
2484
* And now we can clean up any files we may have left.
2486
elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2488
RemoveTwoPhaseFile(xid, giveWarning);