~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/xbstream_read.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/******************************************************
 
2
Copyright (c) 2011 Percona Inc.
 
3
 
 
4
The xbstream format reader implementation.
 
5
 
 
6
This program is free software; you can redistribute it and/or modify
 
7
it under the terms of the GNU General Public License as published by
 
8
the Free Software Foundation; version 2 of the License.
 
9
 
 
10
This program is distributed in the hope that it will be useful,
 
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
GNU General Public License for more details.
 
14
 
 
15
You should have received a copy of the GNU General Public License
 
16
along with this program; if not, write to the Free Software
 
17
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 
 
19
*******************************************************/
 
20
 
 
21
#include <my_base.h>
 
22
#include <zlib.h>
 
23
#include "common.h"
 
24
#include "xbstream.h"
 
25
 
 
26
/* Allocate 1 MB for the payload buffer initially */
 
27
#define INIT_BUFFER_LEN (1024 * 1024)
 
28
 
 
29
#ifndef MY_OFF_T_MAX
 
30
#define MY_OFF_T_MAX (~(my_off_t)0UL)
 
31
#endif
 
32
 
 
33
struct xb_rstream_struct {
 
34
        my_off_t        offset;
 
35
        File            fd;
 
36
        void            *buffer;
 
37
        size_t          buflen;
 
38
};
 
39
 
 
40
xb_rstream_t *
 
41
xb_stream_read_new(void)
 
42
{
 
43
        xb_rstream_t *stream;
 
44
 
 
45
        stream = (xb_rstream_t *) my_malloc(sizeof(xb_rstream_t), MYF(MY_FAE));
 
46
 
 
47
        stream->buffer = my_malloc(INIT_BUFFER_LEN, MYF(MY_FAE));
 
48
        stream->buflen = INIT_BUFFER_LEN;
 
49
 
 
50
        stream->fd = fileno(stdin);
 
51
        stream->offset = 0;
 
52
 
 
53
#ifdef __WIN__
 
54
        setmode(stream->fd, _O_BINARY);
 
55
#endif
 
56
 
 
57
        return stream;
 
58
}
 
59
 
 
60
static inline
 
61
xb_chunk_type_t
 
62
validate_chunk_type(uchar code)
 
63
{
 
64
        switch ((xb_chunk_type_t) code) {
 
65
        case XB_CHUNK_TYPE_PAYLOAD:
 
66
        case XB_CHUNK_TYPE_EOF:
 
67
                return (xb_chunk_type_t) code;
 
68
        default:
 
69
                return XB_CHUNK_TYPE_UNKNOWN;
 
70
        }
 
71
}
 
72
 
 
73
#define F_READ(buf,len)                                                 \
 
74
        do {                                                            \
 
75
                if (my_read(fd, buf, len, MYF(MY_WME|MY_FULL_IO)) ==    \
 
76
                    MY_FILE_ERROR) {                                    \
 
77
                        msg("xb_stream_read_chunk(): my_read() failed.\n"); \
 
78
                        goto err;                                       \
 
79
                }                                                       \
 
80
        } while (0)
 
81
 
 
82
xb_rstream_result_t
 
83
xb_stream_read_chunk(xb_rstream_t *stream, xb_rstream_chunk_t *chunk)
 
84
{
 
85
        uchar           tmpbuf[16];
 
86
        uchar           *ptr = tmpbuf;
 
87
        uint            pathlen;
 
88
        size_t          tlen;
 
89
        ulonglong       ullval;
 
90
        ulong           checksum_exp;
 
91
        ulong           checksum;;
 
92
        File            fd = stream->fd;
 
93
 
 
94
        /* This is the only place where we expect EOF, so read with my_read()
 
95
        rather than F_READ() */
 
96
        tlen = my_read(fd, tmpbuf, sizeof(XB_STREAM_CHUNK_MAGIC) + 5,
 
97
                       MYF(MY_WME));
 
98
        if (tlen == 0) {
 
99
                return XB_STREAM_READ_EOF;
 
100
        } else if (tlen < sizeof(XB_STREAM_CHUNK_MAGIC) + 4) {
 
101
                msg("xb_stream_read_chunk(): unexpected end of stream at "
 
102
                    "offset 0x%llx.\n", stream->offset);
 
103
                goto err;
 
104
        }
 
105
 
 
106
        /* Chunk magic value */
 
107
        if (memcmp(tmpbuf, XB_STREAM_CHUNK_MAGIC, 8)) {
 
108
                msg("xb_stream_read_chunk(): wrong chunk magic at offset "
 
109
                    "0x%llx.\n", (ulonglong) stream->offset);
 
110
                goto err;
 
111
        }
 
112
        ptr += 8;
 
113
        stream->offset += 8;
 
114
 
 
115
        /* Chunk flags */
 
116
        chunk->flags = *ptr++;
 
117
        stream->offset++;
 
118
 
 
119
        /* Chunk type, ignore unknown ones if ignorable flag is set */
 
120
        chunk->type = validate_chunk_type(*ptr);
 
121
        if (chunk->type == XB_CHUNK_TYPE_UNKNOWN &&
 
122
            !(chunk->flags & XB_STREAM_FLAG_IGNORABLE)) {
 
123
                msg("xb_stream_read_chunk(): unknown chunk type 0x%lu at "
 
124
                    "offset 0x%llx.\n", (ulong) *ptr,
 
125
                    (ulonglong) stream->offset);
 
126
                goto err;
 
127
        }
 
128
        ptr++;
 
129
        stream->offset++;
 
130
 
 
131
        /* Path length */
 
132
        pathlen = uint4korr(ptr);
 
133
        if (pathlen >= FN_REFLEN) {
 
134
                msg("xb_stream_read_chunk(): path length (%lu) is too large at "
 
135
                    "offset 0x%llx.\n", (ulong) pathlen, stream->offset);
 
136
                goto err;
 
137
        }
 
138
        chunk->pathlen = pathlen;
 
139
        stream->offset +=4;
 
140
 
 
141
        /* Path */
 
142
        if (chunk->pathlen > 0) {
 
143
                F_READ((uchar *) chunk->path, pathlen);
 
144
                stream->offset += pathlen;
 
145
        }
 
146
        chunk->path[pathlen] = '\0';
 
147
 
 
148
        if (chunk->type == XB_CHUNK_TYPE_EOF) {
 
149
                return XB_STREAM_READ_CHUNK;
 
150
        }
 
151
 
 
152
        /* Payload length */
 
153
        F_READ(tmpbuf, 16);
 
154
        ullval = uint8korr(tmpbuf);
 
155
        if (ullval > (ulonglong) SIZE_T_MAX) {
 
156
                msg("xb_stream_read_chunk(): chunk length is too large at "
 
157
                    "offset 0x%llx: 0x%llx.\n", (ulonglong) stream->offset,
 
158
                    ullval);
 
159
                goto err;
 
160
        }
 
161
        chunk->length = (size_t) ullval;
 
162
        stream->offset += 8;
 
163
 
 
164
        /* Payload offset */
 
165
        ullval = uint8korr(tmpbuf + 8);
 
166
        if (ullval > (ulonglong) MY_OFF_T_MAX) {
 
167
                msg("xb_stream_read_chunk(): chunk offset is too large at "
 
168
                    "offset 0x%llx: 0x%llx.\n", (ulonglong) stream->offset,
 
169
                    ullval);
 
170
                goto err;
 
171
        }
 
172
        chunk->offset = (my_off_t) ullval;
 
173
        stream->offset += 8;
 
174
 
 
175
        /* Reallocate the buffer if needed */
 
176
        if (chunk->length > stream->buflen) {
 
177
                stream->buffer = my_realloc(stream->buffer, chunk->length,
 
178
                                            MYF(MY_WME));
 
179
                if (stream->buffer == NULL) {
 
180
                        msg("xb_stream_read_chunk(): failed to increase buffer "
 
181
                            "to %lu bytes.\n", (ulong) chunk->length);
 
182
                        goto err;
 
183
                }
 
184
                stream->buflen = chunk->length;
 
185
        }
 
186
 
 
187
        /* Checksum */
 
188
        F_READ(tmpbuf, 4);
 
189
        checksum_exp = uint4korr(tmpbuf);
 
190
 
 
191
        /* Payload */
 
192
        if (chunk->length > 0) {
 
193
                F_READ(stream->buffer, chunk->length);
 
194
                stream->offset += chunk->length;
 
195
        }
 
196
 
 
197
        checksum = crc32(0, stream->buffer, chunk->length);
 
198
        if (checksum != checksum_exp) {
 
199
                msg("xb_stream_read_chunk(): invalid checksum at offset "
 
200
                    "0x%llx: expected 0x%lx, read 0x%lx.\n",
 
201
                    (ulonglong) stream->offset, checksum_exp, checksum);
 
202
                goto err;
 
203
        }
 
204
        stream->offset += 4;
 
205
 
 
206
        chunk->data = stream->buffer;
 
207
        chunk->checksum = checksum;
 
208
 
 
209
        return XB_STREAM_READ_CHUNK;
 
210
 
 
211
err:
 
212
        return XB_STREAM_READ_ERROR;
 
213
}
 
214
 
 
215
int
 
216
xb_stream_read_done(xb_rstream_t *stream)
 
217
{
 
218
        MY_FREE(stream->buffer);
 
219
        MY_FREE(stream);
 
220
 
 
221
        return 0;
 
222
}