718
721
/* Normal exit from the walsender is here */
719
722
if (walsender_shutdown_requested)
721
/* Inform the standby that XLOG streaming was done */
724
/* Inform the standby that XLOG streaming is done */
722
725
pq_puttextmessage('C', "COPY 0");
731
/* Check for input from the client */
732
ProcessRepliesIfAny();
729
735
* If we don't have any pending data in the output buffer, try to send
736
* some more. If there is some, we don't bother to call XLogSend
737
* again until we've flushed it ... but we'd better assume we are not
732
740
if (!pq_is_send_pending())
741
XLogSend(output_message, &caughtup);
745
/* Try to flush pending output to the client */
746
if (pq_flush_if_writable() != 0)
749
/* If nothing remains to be sent right now ... */
750
if (caughtup && !pq_is_send_pending())
734
XLogSend(output_message, &caughtup);
737
* Even if we wrote all the WAL that was available when we started
738
* sending, more might have arrived while we were sending this
739
* batch. We had the latch set while sending, so we have not
740
* received any signals from that time. Let's arm the latch again,
741
* and after that check that we're still up-to-date.
743
if (caughtup && !pq_is_send_pending())
745
ResetLatch(&MyWalSnd->latch);
753
* If we're in catchup state, move to streaming. This is an
754
* important state change for users to know about, since before
755
* this point data loss might occur if the primary dies and we
756
* need to failover to the standby. The state change is also
757
* important for synchronous replication, since commits that
758
* started to wait at that point might wait for some time.
760
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
763
(errmsg("standby \"%s\" has now caught up with primary",
765
WalSndSetState(WALSNDSTATE_STREAMING);
769
* When SIGUSR2 arrives, we send any outstanding logs up to the
770
* shutdown checkpoint record (i.e., the latest record) and exit.
771
* This may be a normal termination at shutdown, or a promotion,
772
* the walsender is not sure which.
774
if (walsender_ready_to_stop)
776
/* ... let's just be real sure we're caught up ... */
747
777
XLogSend(output_message, &caughtup);
778
if (caughtup && !pq_is_send_pending())
780
walsender_shutdown_requested = true;
781
continue; /* don't want to wait more */
751
/* Flush pending output to the client */
752
if (pq_flush_if_writable() != 0)
756
* When SIGUSR2 arrives, we send any outstanding logs up to the
757
* shutdown checkpoint record (i.e., the latest record) and exit.
787
* We don't block if not caught up, unless there is unsent data
788
* pending in which case we'd better block until the socket is
789
* write-ready. This test is only needed for the case where XLogSend
790
* loaded a subset of the available data but then pq_flush_if_writable
791
* flushed it all --- we should immediately try to send more.
759
if (walsender_ready_to_stop && !pq_is_send_pending())
761
XLogSend(output_message, &caughtup);
762
ProcessRepliesIfAny();
763
if (caughtup && !pq_is_send_pending())
764
walsender_shutdown_requested = true;
767
if ((caughtup || pq_is_send_pending()) &&
769
!walsender_shutdown_requested)
793
if (caughtup || pq_is_send_pending())
771
795
TimestampTz finish_time = 0;
774
/* Reschedule replication timeout */
798
/* Determine time until replication timeout */
775
799
if (replication_timeout > 0)
819
* If we're in catchup state, see if it's time to move to streaming.
820
* This is an important state change for users, since before this
821
* point data loss might occur if the primary dies and we need to
822
* failover to the standby. The state change is also important for
823
* synchronous replication, since commits that started to wait at that
824
* point might wait for some time.
826
if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
829
(errmsg("standby \"%s\" has now caught up with primary",
831
WalSndSetState(WALSNDSTATE_STREAMING);
834
ProcessRepliesIfAny();