~aglenyoung/+junk/postgres-9.3-dtrace

« back to all changes in this revision

Viewing changes to src/bin/pg_basebackup/receivelog.c

  • Committer: Package Import Robot
  • Author(s): Martin Pitt, Christoph Berg, Martin Pitt
  • Date: 2013-06-26 15:13:32 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130626151332-p34yjpn0txbdsdzd
Tags: 9.3~beta2-1
[ Christoph Berg ]
* hurd-i386: Ignore testsuite failures so we have a working libpq5 (they
  don't implement semaphores so the server won't even start).
* Mark postgresql-9.3 as beta in the description, suggested by Joshua D.
  Drake.

[ Martin Pitt ]
* New upstream release 9.3 beta2.

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
 
31
31
/* fd and filename for currently open WAL file */
32
32
static int      walfile = -1;
33
 
static char     current_walfile_name[MAXPGPATH] = "";
 
33
static char current_walfile_name[MAXPGPATH] = "";
34
34
 
35
35
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
36
36
                                 uint32 timeline, char *basedir,
37
 
                                 stream_stop_callback stream_stop, int standby_message_timeout,
 
37
                           stream_stop_callback stream_stop, int standby_message_timeout,
38
38
                                 char *partial_suffix, XLogRecPtr *stoppos);
39
39
 
 
40
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 
41
                                                 uint32 *timeline);
 
42
 
40
43
/*
41
44
 * Open a new WAL file in the specified directory.
42
45
 *
197
200
static int64
198
201
localGetCurrentTimestamp(void)
199
202
{
200
 
        int64 result;
 
203
        int64           result;
201
204
        struct timeval tp;
202
205
 
203
206
        gettimeofday(&tp, NULL);
218
221
localTimestampDifference(int64 start_time, int64 stop_time,
219
222
                                                 long *secs, int *microsecs)
220
223
{
221
 
        int64 diff = stop_time - start_time;
 
224
        int64           diff = stop_time - start_time;
222
225
 
223
226
        if (diff <= 0)
224
227
        {
241
244
                                                                int64 stop_time,
242
245
                                                                int msec)
243
246
{
244
 
        int64 diff = stop_time - start_time;
 
247
        int64           diff = stop_time - start_time;
245
248
 
246
249
        return (diff >= msec * INT64CONST(1000));
247
250
}
306
309
        /*
307
310
         * Write into a temp file name.
308
311
         */
309
 
        snprintf(tmppath, MAXPGPATH,  "%s.tmp", path);
 
312
        snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
310
313
 
311
314
        unlink(tmppath);
312
315
 
411
414
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
412
415
{
413
416
        char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
414
 
        int             len = 0;
 
417
        int                     len = 0;
415
418
 
416
419
        replybuf[len] = 'r';
417
420
        len += 1;
418
 
        sendint64(blockpos, &replybuf[len]);                    /* write */
419
 
        len += 8;
420
 
        sendint64(InvalidXLogRecPtr, &replybuf[len]);   /* flush */
421
 
        len += 8;
422
 
        sendint64(InvalidXLogRecPtr, &replybuf[len]);   /* apply */
423
 
        len += 8;
424
 
        sendint64(now, &replybuf[len]);                                 /* sendTime */
425
 
        len += 8;
426
 
        replybuf[len] = replyRequested ? 1 : 0;                 /* replyRequested */
 
421
        sendint64(blockpos, &replybuf[len]);            /* write */
 
422
        len += 8;
 
423
        sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* flush */
 
424
        len += 8;
 
425
        sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* apply */
 
426
        len += 8;
 
427
        sendint64(now, &replybuf[len]);         /* sendTime */
 
428
        len += 8;
 
429
        replybuf[len] = replyRequested ? 1 : 0;         /* replyRequested */
427
430
        len += 1;
428
431
 
429
432
        if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
461
464
        if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
462
465
        {
463
466
                const char *serverver = PQparameterStatus(conn, "server_version");
 
467
 
464
468
                fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
465
469
                                progname,
466
470
                                serverver ? serverver : "'unknown'",
547
551
                if (timeline > atoi(PQgetvalue(res, 0, 1)))
548
552
                {
549
553
                        fprintf(stderr,
550
 
                                        _("%s: starting timeline %u is not present in the server\n"),
 
554
                                _("%s: starting timeline %u is not present in the server\n"),
551
555
                                        progname, timeline);
552
556
                        PQclear(res);
553
557
                        return false;
558
562
        while (1)
559
563
        {
560
564
                /*
561
 
                 * Fetch the timeline history file for this timeline, if we don't
562
 
                 * have it already.
 
565
                 * Fetch the timeline history file for this timeline, if we don't have
 
566
                 * it already.
563
567
                 */
564
568
                if (!existsTimeLineHistoryFile(basedir, timeline))
565
569
                {
569
573
                        {
570
574
                                /* FIXME: we might send it ok, but get an error */
571
575
                                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
572
 
                                                progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
 
576
                                        progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
573
577
                                PQclear(res);
574
578
                                return false;
575
579
                        }
582
586
                        {
583
587
                                fprintf(stderr,
584
588
                                                _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
585
 
                                        progname, PQntuples(res), PQnfields(res), 1, 2);
 
589
                                                progname, PQntuples(res), PQnfields(res), 1, 2);
586
590
                        }
587
591
 
588
592
                        /* Write the history file to disk */
594
598
                }
595
599
 
596
600
                /*
597
 
                 * Before we start streaming from the requested location, check
598
 
                 * if the callback tells us to stop here.
 
601
                 * Before we start streaming from the requested location, check if the
 
602
                 * callback tells us to stop here.
599
603
                 */
600
604
                if (stream_stop(startpos, timeline, false))
601
605
                        return true;
624
628
                /*
625
629
                 * Streaming finished.
626
630
                 *
627
 
                 * There are two possible reasons for that: a controlled shutdown,
628
 
                 * or we reached the end of the current timeline. In case of
 
631
                 * There are two possible reasons for that: a controlled shutdown, or
 
632
                 * we reached the end of the current timeline. In case of
629
633
                 * end-of-timeline, the server sends a result set after Copy has
630
 
                 * finished, containing the next timeline's ID. Read that, and
631
 
                 * restart streaming from the next timeline.
 
634
                 * finished, containing information about the next timeline. Read
 
635
                 * that, and restart streaming from the next timeline. In case of
 
636
                 * controlled shutdown, stop here.
632
637
                 */
633
 
 
634
638
                if (PQresultStatus(res) == PGRES_TUPLES_OK)
635
639
                {
636
640
                        /*
637
 
                         * End-of-timeline. Read the next timeline's ID.
 
641
                         * End-of-timeline. Read the next timeline's ID and starting
 
642
                         * position. Usually, the starting position will match the end of
 
643
                         * the previous timeline, but there are corner cases like if the
 
644
                         * server had sent us half of a WAL record, when it was promoted.
 
645
                         * The new timeline will begin at the end of the last complete
 
646
                         * record in that case, overlapping the partial WAL record on the
 
647
                         * the old timeline.
638
648
                         */
639
649
                        uint32          newtimeline;
 
650
                        bool            parsed;
640
651
 
641
 
                        newtimeline = atoi(PQgetvalue(res, 0, 0));
 
652
                        parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
642
653
                        PQclear(res);
 
654
                        if (!parsed)
 
655
                                goto error;
643
656
 
 
657
                        /* Sanity check the values the server gave us */
644
658
                        if (newtimeline <= timeline)
645
659
                        {
646
 
                                /* shouldn't happen */
647
 
                                fprintf(stderr,
648
 
                                                "server reported unexpected next timeline %u, following timeline %u\n",
649
 
                                                newtimeline, timeline);
 
660
                                fprintf(stderr,
 
661
                                                _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
 
662
                                                progname, newtimeline, timeline);
 
663
                                goto error;
 
664
                        }
 
665
                        if (startpos > stoppos)
 
666
                        {
 
667
                                fprintf(stderr,
 
668
                                                _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
 
669
                                                progname,
 
670
                                                timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
 
671
                                  newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
650
672
                                goto error;
651
673
                        }
652
674
 
655
677
                        if (PQresultStatus(res) != PGRES_COMMAND_OK)
656
678
                        {
657
679
                                fprintf(stderr,
658
 
                                                _("%s: unexpected termination of replication stream: %s"),
 
680
                                   _("%s: unexpected termination of replication stream: %s"),
659
681
                                                progname, PQresultErrorMessage(res));
660
682
                                goto error;
661
683
                        }
662
684
                        PQclear(res);
663
685
 
664
686
                        /*
665
 
                         * Loop back to start streaming from the new timeline.
666
 
                         * Always start streaming at the beginning of a segment.
 
687
                         * Loop back to start streaming from the new timeline. Always
 
688
                         * start streaming at the beginning of a segment.
667
689
                         */
668
690
                        timeline = newtimeline;
669
 
                        startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
 
691
                        startpos = startpos - (startpos % XLOG_SEG_SIZE);
670
692
                        continue;
671
693
                }
672
694
                else if (PQresultStatus(res) == PGRES_COMMAND_OK)
705
727
}
706
728
 
707
729
/*
 
730
 * Helper function to parse the result set returned by server after streaming
 
731
 * has finished. On failure, prints an error to stderr and returns false.
 
732
 */
 
733
static bool
 
734
ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
 
735
{
 
736
        uint32          startpos_xlogid,
 
737
                                startpos_xrecoff;
 
738
 
 
739
        /*----------
 
740
         * The result set consists of one row and two columns, e.g:
 
741
         *
 
742
         *      next_tli | next_tli_startpos
 
743
         * ----------+-------------------
 
744
         *                 4 | 0/9949AE0
 
745
         *
 
746
         * next_tli is the timeline ID of the next timeline after the one that
 
747
         * just finished streaming. next_tli_startpos is the XLOG position where
 
748
         * the server switched to it.
 
749
         *----------
 
750
         */
 
751
        if (PQnfields(res) < 2 || PQntuples(res) != 1)
 
752
        {
 
753
                fprintf(stderr,
 
754
                                _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
 
755
                                progname, PQntuples(res), PQnfields(res), 1, 2);
 
756
                return false;
 
757
        }
 
758
 
 
759
        *timeline = atoi(PQgetvalue(res, 0, 0));
 
760
        if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
 
761
                           &startpos_xrecoff) != 2)
 
762
        {
 
763
                fprintf(stderr,
 
764
                        _("%s: could not parse next timeline's starting point \"%s\"\n"),
 
765
                                progname, PQgetvalue(res, 0, 1));
 
766
                return false;
 
767
        }
 
768
        *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
 
769
 
 
770
        return true;
 
771
}
 
772
 
 
773
/*
708
774
 * The main loop of ReceiveXLogStream. Handles the COPY stream after
709
775
 * initiating streaming with the START_STREAMING command.
710
776
 *
775
841
                if (r == 0)
776
842
                {
777
843
                        /*
778
 
                         * No data available. Wait for some to appear, but not longer
779
 
                         * than the specified timeout, so that we can ping the server.
 
844
                         * No data available. Wait for some to appear, but not longer than
 
845
                         * the specified timeout, so that we can ping the server.
780
846
                         */
781
847
                        fd_set          input_mask;
782
848
                        struct timeval timeout;
810
876
                        {
811
877
                                /*
812
878
                                 * Got a timeout or signal. Continue the loop and either
813
 
                                 * deliver a status packet to the server or just go back
814
 
                                 * into blocking.
 
879
                                 * deliver a status packet to the server or just go back into
 
880
                                 * blocking.
815
881
                                 */
816
882
                                continue;
817
883
                        }
875
941
                /* Check the message type. */
876
942
                if (copybuf[0] == 'k')
877
943
                {
878
 
                        int             pos;
879
 
                        bool    replyRequested;
 
944
                        int                     pos;
 
945
                        bool            replyRequested;
880
946
 
881
947
                        /*
882
948
                         * Parse the keepalive message, enclosed in the CopyData message.
883
949
                         * We just check if the server requested a reply, and ignore the
884
950
                         * rest.
885
951
                         */
886
 
                        pos = 1;        /* skip msgtype 'k' */
887
 
                        pos += 8;       /* skip walEnd */
888
 
                        pos += 8;       /* skip sendTime */
 
952
                        pos = 1;                        /* skip msgtype 'k' */
 
953
                        pos += 8;                       /* skip walEnd */
 
954
                        pos += 8;                       /* skip sendTime */
889
955
 
890
956
                        if (r < pos + 1)
891
957
                        {
918
984
                         * CopyData message. We only need the WAL location field
919
985
                         * (dataStart), the rest of the header is ignored.
920
986
                         */
921
 
                        hdr_len = 1;    /* msgtype 'w' */
922
 
                        hdr_len += 8;   /* dataStart */
923
 
                        hdr_len += 8;   /* walEnd */
924
 
                        hdr_len += 8;   /* sendTime */
 
987
                        hdr_len = 1;            /* msgtype 'w' */
 
988
                        hdr_len += 8;           /* dataStart */
 
989
                        hdr_len += 8;           /* walEnd */
 
990
                        hdr_len += 8;           /* sendTime */
925
991
                        if (r < hdr_len + 1)
926
992
                        {
927
993
                                fprintf(stderr, _("%s: streaming header too small: %d\n"),
934
1000
                        xlogoff = blockpos % XLOG_SEG_SIZE;
935
1001
 
936
1002
                        /*
937
 
                         * Verify that the initial location in the stream matches where
938
 
                         * we think we are.
 
1003
                         * Verify that the initial location in the stream matches where we
 
1004
                         * think we are.
939
1005
                         */
940
1006
                        if (walfile == -1)
941
1007
                        {
955
1021
                                if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
956
1022
                                {
957
1023
                                        fprintf(stderr,
958
 
                                                        _("%s: got WAL data offset %08x, expected %08x\n"),
959
 
                                                        progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
 
1024
                                                  _("%s: got WAL data offset %08x, expected %08x\n"),
 
1025
                                           progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
960
1026
                                        goto error;
961
1027
                                }
962
1028
                        }
1022
1088
                                                        goto error;
1023
1089
                                                }
1024
1090
                                                still_sending = false;
1025
 
                                                break; /* ignore the rest of this XLogData packet */
 
1091
                                                break;  /* ignore the rest of this XLogData packet */
1026
1092
                                        }
1027
1093
                                }
1028
1094
                        }