31
31
/* fd and filename for currently open WAL file */
32
32
static int walfile = -1;
33
static char current_walfile_name[MAXPGPATH] = "";
33
static char current_walfile_name[MAXPGPATH] = "";
35
35
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
36
36
uint32 timeline, char *basedir,
37
stream_stop_callback stream_stop, int standby_message_timeout,
37
stream_stop_callback stream_stop, int standby_message_timeout,
38
38
char *partial_suffix, XLogRecPtr *stoppos);
40
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
41
44
* Open a new WAL file in the specified directory.
218
221
localTimestampDifference(int64 start_time, int64 stop_time,
219
222
long *secs, int *microsecs)
221
int64 diff = stop_time - start_time;
224
int64 diff = stop_time - start_time;
307
310
* Write into a temp file name.
309
snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
312
snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
411
414
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
413
416
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
416
419
replybuf[len] = 'r';
418
sendint64(blockpos, &replybuf[len]); /* write */
420
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
422
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
424
sendint64(now, &replybuf[len]); /* sendTime */
426
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
421
sendint64(blockpos, &replybuf[len]); /* write */
423
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
425
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
427
sendint64(now, &replybuf[len]); /* sendTime */
429
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
429
432
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
461
464
if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
463
466
const char *serverver = PQparameterStatus(conn, "server_version");
464
468
fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
466
470
serverver ? serverver : "'unknown'",
570
574
/* FIXME: we might send it ok, but get an error */
571
575
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
572
progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
576
progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
584
588
_("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
585
progname, PQntuples(res), PQnfields(res), 1, 2);
589
progname, PQntuples(res), PQnfields(res), 1, 2);
588
592
/* Write the history file to disk */
597
* Before we start streaming from the requested location, check
598
* if the callback tells us to stop here.
601
* Before we start streaming from the requested location, check if the
602
* callback tells us to stop here.
600
604
if (stream_stop(startpos, timeline, false))
625
629
* Streaming finished.
627
* There are two possible reasons for that: a controlled shutdown,
628
* or we reached the end of the current timeline. In case of
631
* There are two possible reasons for that: a controlled shutdown, or
632
* we reached the end of the current timeline. In case of
629
633
* end-of-timeline, the server sends a result set after Copy has
630
* finished, containing the next timeline's ID. Read that, and
631
* restart streaming from the next timeline.
634
* finished, containing information about the next timeline. Read
635
* that, and restart streaming from the next timeline. In case of
636
* controlled shutdown, stop here.
634
638
if (PQresultStatus(res) == PGRES_TUPLES_OK)
637
* End-of-timeline. Read the next timeline's ID.
641
* End-of-timeline. Read the next timeline's ID and starting
642
* position. Usually, the starting position will match the end of
643
* the previous timeline, but there are corner cases like if the
644
* server had sent us half of a WAL record, when it was promoted.
645
* The new timeline will begin at the end of the last complete
646
* record in that case, overlapping the partial WAL record on the
639
649
uint32 newtimeline;
641
newtimeline = atoi(PQgetvalue(res, 0, 0));
652
parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
657
/* Sanity check the values the server gave us */
644
658
if (newtimeline <= timeline)
646
/* shouldn't happen */
648
"server reported unexpected next timeline %u, following timeline %u\n",
649
newtimeline, timeline);
661
_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
662
progname, newtimeline, timeline);
665
if (startpos > stoppos)
668
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
670
timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
671
newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
655
677
if (PQresultStatus(res) != PGRES_COMMAND_OK)
658
_("%s: unexpected termination of replication stream: %s"),
680
_("%s: unexpected termination of replication stream: %s"),
659
681
progname, PQresultErrorMessage(res));
665
* Loop back to start streaming from the new timeline.
666
* Always start streaming at the beginning of a segment.
687
* Loop back to start streaming from the new timeline. Always
688
* start streaming at the beginning of a segment.
668
690
timeline = newtimeline;
669
startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
691
startpos = startpos - (startpos % XLOG_SEG_SIZE);
672
694
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
730
* Helper function to parse the result set returned by server after streaming
731
* has finished. On failure, prints an error to stderr and returns false.
734
ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
736
uint32 startpos_xlogid,
740
* The result set consists of one row and two columns, e.g:
742
* next_tli | next_tli_startpos
743
* ----------+-------------------
746
* next_tli is the timeline ID of the next timeline after the one that
747
* just finished streaming. next_tli_startpos is the XLOG position where
748
* the server switched to it.
751
if (PQnfields(res) < 2 || PQntuples(res) != 1)
754
_("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
755
progname, PQntuples(res), PQnfields(res), 1, 2);
759
*timeline = atoi(PQgetvalue(res, 0, 0));
760
if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
761
&startpos_xrecoff) != 2)
764
_("%s: could not parse next timeline's starting point \"%s\"\n"),
765
progname, PQgetvalue(res, 0, 1));
768
*startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
708
774
* The main loop of ReceiveXLogStream. Handles the COPY stream after
709
775
* initiating streaming with the START_STREAMING command.
778
* No data available. Wait for some to appear, but not longer
779
* than the specified timeout, so that we can ping the server.
844
* No data available. Wait for some to appear, but not longer than
845
* the specified timeout, so that we can ping the server.
781
847
fd_set input_mask;
782
848
struct timeval timeout;
875
941
/* Check the message type. */
876
942
if (copybuf[0] == 'k')
882
948
* Parse the keepalive message, enclosed in the CopyData message.
883
949
* We just check if the server requested a reply, and ignore the
886
pos = 1; /* skip msgtype 'k' */
887
pos += 8; /* skip walEnd */
888
pos += 8; /* skip sendTime */
952
pos = 1; /* skip msgtype 'k' */
953
pos += 8; /* skip walEnd */
954
pos += 8; /* skip sendTime */
918
984
* CopyData message. We only need the WAL location field
919
985
* (dataStart), the rest of the header is ignored.
921
hdr_len = 1; /* msgtype 'w' */
922
hdr_len += 8; /* dataStart */
923
hdr_len += 8; /* walEnd */
924
hdr_len += 8; /* sendTime */
987
hdr_len = 1; /* msgtype 'w' */
988
hdr_len += 8; /* dataStart */
989
hdr_len += 8; /* walEnd */
990
hdr_len += 8; /* sendTime */
925
991
if (r < hdr_len + 1)
927
993
fprintf(stderr, _("%s: streaming header too small: %d\n"),
955
1021
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
958
_("%s: got WAL data offset %08x, expected %08x\n"),
959
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
1024
_("%s: got WAL data offset %08x, expected %08x\n"),
1025
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));