611
611
ProcessStandbyHSFeedbackMessage(void)
613
613
StandbyHSFeedbackMessage reply;
614
TransactionId newxmin = InvalidTransactionId;
614
TransactionId nextXid;
616
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
617
/* Decipher the reply message */
618
pq_copymsgbytes(&reply_message, (char *) &reply,
619
sizeof(StandbyHSFeedbackMessage));
618
621
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
623
* Update the WalSender's proc xmin to allow it to be visible to
624
* snapshots. This will hold back the removal of dead rows and thereby
625
* prevent the generation of cleanup conflicts on the standby server.
627
if (TransactionIdIsValid(reply.xmin))
629
TransactionId nextXid;
631
bool epochOK = false;
633
GetNextXidAndEpoch(&nextXid, &nextEpoch);
636
* Epoch of oldestXmin should be same as standby or if the counter has
637
* wrapped, then one less than reply.
639
if (reply.xmin <= nextXid)
641
if (reply.epoch == nextEpoch)
646
if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
651
* Feedback from standby must not go backwards, nor should it go
652
* forwards further than our most recent xid.
654
if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
656
if (!TransactionIdIsValid(MyProc->xmin))
658
TransactionId oldestXmin = GetOldestXmin(true, true);
660
if (TransactionIdPrecedes(oldestXmin, reply.xmin))
661
newxmin = reply.xmin;
663
newxmin = oldestXmin;
667
if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
668
newxmin = reply.xmin;
670
newxmin = MyProc->xmin; /* stay the same */
676
* Grab the ProcArrayLock to set xmin, or invalidate for bad reply
678
if (MyProc->xmin != newxmin)
680
LWLockAcquire(ProcArrayLock, LW_SHARED);
681
MyProc->xmin = newxmin;
682
LWLockRelease(ProcArrayLock);
625
/* Ignore invalid xmin (can't actually happen with current walreceiver) */
626
if (!TransactionIdIsNormal(reply.xmin))
630
* Check that the provided xmin/epoch are sane, that is, not in the future
631
* and not so far back as to be already wrapped around. Ignore if not.
633
* Epoch of nextXid should be same as standby, or if the counter has
634
* wrapped, then one greater than standby.
636
GetNextXidAndEpoch(&nextXid, &nextEpoch);
638
if (reply.xmin <= nextXid)
640
if (reply.epoch != nextEpoch)
645
if (reply.epoch + 1 != nextEpoch)
649
if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
650
return; /* epoch OK, but it's wrapped around */
653
* Set the WalSender's xmin equal to the standby's requested xmin, so that
654
* the xmin will be taken into account by GetOldestXmin. This will hold
655
* back the removal of dead rows and thereby prevent the generation of
656
* cleanup conflicts on the standby server.
658
* There is a small window for a race condition here: although we just
659
* checked that reply.xmin precedes nextXid, the nextXid could have gotten
660
* advanced between our fetching it and applying the xmin below, perhaps
661
* far enough to make reply.xmin wrap around. In that case the xmin we
662
* set here would be "in the future" and have no effect. No point in
663
* worrying about this since it's too late to save the desired data
664
* anyway. Assuming that the standby sends us an increasing sequence of
665
* xmins, this could only happen during the first reply cycle, else our
666
* own xmin would prevent nextXid from advancing so far.
668
* We don't bother taking the ProcArrayLock here. Setting the xmin field
669
* is assumed atomic, and there's no real need to prevent a concurrent
670
* GetOldestXmin. (If we're moving our xmin forward, this is obviously
671
* safe, and if we're moving it backwards, well, the data is at risk
672
* already since a VACUUM could have just finished calling GetOldestXmin.)
674
MyProc->xmin = reply.xmin;
686
677
/* Main loop of walsender process */