~gl-az/percona-xtrabackup/2.1-io-block-size

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_support_compression_bzip2.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
 
 
28
__FBSDID("$FreeBSD: head/lib/libarchive/archive_read_support_compression_bzip2.c 201108 2009-12-28 03:28:21Z kientzle $");
 
29
 
 
30
#ifdef HAVE_ERRNO_H
 
31
#include <errno.h>
 
32
#endif
 
33
#include <stdio.h>
 
34
#ifdef HAVE_STDLIB_H
 
35
#include <stdlib.h>
 
36
#endif
 
37
#ifdef HAVE_STRING_H
 
38
#include <string.h>
 
39
#endif
 
40
#ifdef HAVE_UNISTD_H
 
41
#include <unistd.h>
 
42
#endif
 
43
#ifdef HAVE_BZLIB_H
 
44
#include <bzlib.h>
 
45
#endif
 
46
 
 
47
#include "archive.h"
 
48
#include "archive_private.h"
 
49
#include "archive_read_private.h"
 
50
 
 
51
#if defined(HAVE_BZLIB_H) && defined(BZ_CONFIG_ERROR)
 
52
struct private_data {
 
53
        bz_stream        stream;
 
54
        char            *out_block;
 
55
        size_t           out_block_size;
 
56
        char             valid; /* True = decompressor is initialized */
 
57
        char             eof; /* True = found end of compressed data. */
 
58
};
 
59
 
 
60
/* Bzip2 filter */
 
61
static ssize_t  bzip2_filter_read(struct archive_read_filter *, const void **);
 
62
static int      bzip2_filter_close(struct archive_read_filter *);
 
63
#endif
 
64
 
 
65
/*
 
66
 * Note that we can detect bzip2 archives even if we can't decompress
 
67
 * them.  (In fact, we like detecting them because we can give better
 
68
 * error messages.)  So the bid framework here gets compiled even
 
69
 * if bzlib is unavailable.
 
70
 */
 
71
static int      bzip2_reader_bid(struct archive_read_filter_bidder *, struct archive_read_filter *);
 
72
static int      bzip2_reader_init(struct archive_read_filter *);
 
73
static int      bzip2_reader_free(struct archive_read_filter_bidder *);
 
74
 
 
75
int
 
76
archive_read_support_compression_bzip2(struct archive *_a)
 
77
{
 
78
        struct archive_read *a = (struct archive_read *)_a;
 
79
        struct archive_read_filter_bidder *reader = __archive_read_get_bidder(a);
 
80
 
 
81
        if (reader == NULL)
 
82
                return (ARCHIVE_FATAL);
 
83
 
 
84
        reader->data = NULL;
 
85
        reader->bid = bzip2_reader_bid;
 
86
        reader->init = bzip2_reader_init;
 
87
        reader->options = NULL;
 
88
        reader->free = bzip2_reader_free;
 
89
#if defined(HAVE_BZLIB_H) && defined(BZ_CONFIG_ERROR)
 
90
        return (ARCHIVE_OK);
 
91
#else
 
92
        archive_set_error(_a, ARCHIVE_ERRNO_MISC,
 
93
            "Using external bunzip2 program");
 
94
        return (ARCHIVE_WARN);
 
95
#endif
 
96
}
 
97
 
 
98
static int
 
99
bzip2_reader_free(struct archive_read_filter_bidder *self){
 
100
        (void)self; /* UNUSED */
 
101
        return (ARCHIVE_OK);
 
102
}
 
103
 
 
104
/*
 
105
 * Test whether we can handle this data.
 
106
 *
 
107
 * This logic returns zero if any part of the signature fails.  It
 
108
 * also tries to Do The Right Thing if a very short buffer prevents us
 
109
 * from verifying as much as we would like.
 
110
 */
 
111
static int
 
112
bzip2_reader_bid(struct archive_read_filter_bidder *self, struct archive_read_filter *filter)
 
113
{
 
114
        const unsigned char *buffer;
 
115
        ssize_t avail;
 
116
        int bits_checked;
 
117
 
 
118
        (void)self; /* UNUSED */
 
119
 
 
120
        /* Minimal bzip2 archive is 14 bytes. */
 
121
        buffer = __archive_read_filter_ahead(filter, 14, &avail);
 
122
        if (buffer == NULL)
 
123
                return (0);
 
124
 
 
125
        /* First three bytes must be "BZh" */
 
126
        bits_checked = 0;
 
127
        if (buffer[0] != 'B' || buffer[1] != 'Z' || buffer[2] != 'h')
 
128
                return (0);
 
129
        bits_checked += 24;
 
130
 
 
131
        /* Next follows a compression flag which must be an ASCII digit. */
 
132
        if (buffer[3] < '1' || buffer[3] > '9')
 
133
                return (0);
 
134
        bits_checked += 5;
 
135
 
 
136
        /* After BZh[1-9], there must be either a data block
 
137
         * which begins with 0x314159265359 or an end-of-data
 
138
         * marker of 0x177245385090. */
 
139
        if (memcmp(buffer + 4, "\x31\x41\x59\x26\x53\x59", 6) == 0)
 
140
                bits_checked += 48;
 
141
        else if (memcmp(buffer + 4, "\x17\x72\x45\x38\x50\x90", 6) == 0)
 
142
                bits_checked += 48;
 
143
        else
 
144
                return (0);
 
145
 
 
146
        return (bits_checked);
 
147
}
 
148
 
 
149
#if !defined(HAVE_BZLIB_H) || !defined(BZ_CONFIG_ERROR)
 
150
 
 
151
/*
 
152
 * If we don't have the library on this system, we can't actually do the
 
153
 * decompression.  We can, however, still detect compressed archives
 
154
 * and emit a useful message.
 
155
 */
 
156
static int
 
157
bzip2_reader_init(struct archive_read_filter *self)
 
158
{
 
159
        int r;
 
160
 
 
161
        r = __archive_read_program(self, "bunzip2");
 
162
        /* Note: We set the format here even if __archive_read_program()
 
163
         * above fails.  We do, after all, know what the format is
 
164
         * even if we weren't able to read it. */
 
165
        self->code = ARCHIVE_COMPRESSION_BZIP2;
 
166
        self->name = "bzip2";
 
167
        return (r);
 
168
}
 
169
 
 
170
 
 
171
#else
 
172
 
 
173
/*
 
174
 * Setup the callbacks.
 
175
 */
 
176
static int
 
177
bzip2_reader_init(struct archive_read_filter *self)
 
178
{
 
179
        static const size_t out_block_size = 64 * 1024;
 
180
        void *out_block;
 
181
        struct private_data *state;
 
182
 
 
183
        self->code = ARCHIVE_COMPRESSION_BZIP2;
 
184
        self->name = "bzip2";
 
185
 
 
186
        state = (struct private_data *)calloc(sizeof(*state), 1);
 
187
        out_block = (unsigned char *)malloc(out_block_size);
 
188
        if (self == NULL || state == NULL || out_block == NULL) {
 
189
                archive_set_error(&self->archive->archive, ENOMEM,
 
190
                    "Can't allocate data for bzip2 decompression");
 
191
                free(out_block);
 
192
                free(state);
 
193
                return (ARCHIVE_FATAL);
 
194
        }
 
195
 
 
196
        self->data = state;
 
197
        state->out_block_size = out_block_size;
 
198
        state->out_block = out_block;
 
199
        self->read = bzip2_filter_read;
 
200
        self->skip = NULL; /* not supported */
 
201
        self->close = bzip2_filter_close;
 
202
 
 
203
        return (ARCHIVE_OK);
 
204
}
 
205
 
 
206
/*
 
207
 * Return the next block of decompressed data.
 
208
 */
 
209
static ssize_t
 
210
bzip2_filter_read(struct archive_read_filter *self, const void **p)
 
211
{
 
212
        struct private_data *state;
 
213
        size_t decompressed;
 
214
        const char *read_buf;
 
215
        ssize_t ret;
 
216
 
 
217
        state = (struct private_data *)self->data;
 
218
 
 
219
        if (state->eof) {
 
220
                *p = NULL;
 
221
                return (0);
 
222
        }
 
223
 
 
224
        /* Empty our output buffer. */
 
225
        state->stream.next_out = state->out_block;
 
226
        state->stream.avail_out = state->out_block_size;
 
227
 
 
228
        /* Try to fill the output buffer. */
 
229
        for (;;) {
 
230
                if (!state->valid) {
 
231
                        if (bzip2_reader_bid(self->bidder, self->upstream) == 0) {
 
232
                                state->eof = 1;
 
233
                                *p = state->out_block;
 
234
                                decompressed = state->stream.next_out
 
235
                                    - state->out_block;
 
236
                                return (decompressed);
 
237
                        }
 
238
                        /* Initialize compression library. */
 
239
                        ret = BZ2_bzDecompressInit(&(state->stream),
 
240
                                           0 /* library verbosity */,
 
241
                                           0 /* don't use low-mem algorithm */);
 
242
 
 
243
                        /* If init fails, try low-memory algorithm instead. */
 
244
                        if (ret == BZ_MEM_ERROR)
 
245
                                ret = BZ2_bzDecompressInit(&(state->stream),
 
246
                                           0 /* library verbosity */,
 
247
                                           1 /* do use low-mem algo */);
 
248
 
 
249
                        if (ret != BZ_OK) {
 
250
                                const char *detail = NULL;
 
251
                                int err = ARCHIVE_ERRNO_MISC;
 
252
                                switch (ret) {
 
253
                                case BZ_PARAM_ERROR:
 
254
                                        detail = "invalid setup parameter";
 
255
                                        break;
 
256
                                case BZ_MEM_ERROR:
 
257
                                        err = ENOMEM;
 
258
                                        detail = "out of memory";
 
259
                                        break;
 
260
                                case BZ_CONFIG_ERROR:
 
261
                                        detail = "mis-compiled library";
 
262
                                        break;
 
263
                                }
 
264
                                archive_set_error(&self->archive->archive, err,
 
265
                                    "Internal error initializing decompressor%s%s",
 
266
                                    detail == NULL ? "" : ": ",
 
267
                                    detail);
 
268
                                return (ARCHIVE_FATAL);
 
269
                        }
 
270
                        state->valid = 1;
 
271
                }
 
272
 
 
273
                /* stream.next_in is really const, but bzlib
 
274
                 * doesn't declare it so. <sigh> */
 
275
                read_buf =
 
276
                    __archive_read_filter_ahead(self->upstream, 1, &ret);
 
277
                if (read_buf == NULL)
 
278
                        return (ARCHIVE_FATAL);
 
279
                state->stream.next_in = (char *)(uintptr_t)read_buf;
 
280
                state->stream.avail_in = ret;
 
281
                /* There is no more data, return whatever we have. */
 
282
                if (ret == 0) {
 
283
                        state->eof = 1;
 
284
                        *p = state->out_block;
 
285
                        decompressed = state->stream.next_out
 
286
                            - state->out_block;
 
287
                        return (decompressed);
 
288
                }
 
289
 
 
290
                /* Decompress as much as we can in one pass. */
 
291
                ret = BZ2_bzDecompress(&(state->stream));
 
292
                __archive_read_filter_consume(self->upstream,
 
293
                    state->stream.next_in - read_buf);
 
294
 
 
295
                switch (ret) {
 
296
                case BZ_STREAM_END: /* Found end of stream. */
 
297
                        switch (BZ2_bzDecompressEnd(&(state->stream))) {
 
298
                        case BZ_OK:
 
299
                                break;
 
300
                        default:
 
301
                                archive_set_error(&(self->archive->archive),
 
302
                                          ARCHIVE_ERRNO_MISC,
 
303
                                          "Failed to clean up decompressor");
 
304
                                return (ARCHIVE_FATAL);
 
305
                        }
 
306
                        state->valid = 0;
 
307
                        /* FALLTHROUGH */
 
308
                case BZ_OK: /* Decompressor made some progress. */
 
309
                        /* If we filled our buffer, update stats and return. */
 
310
                        if (state->stream.avail_out == 0) {
 
311
                                *p = state->out_block;
 
312
                                decompressed = state->stream.next_out
 
313
                                    - state->out_block;
 
314
                                return (decompressed);
 
315
                        }
 
316
                        break;
 
317
                default: /* Return an error. */
 
318
                        archive_set_error(&self->archive->archive,
 
319
                            ARCHIVE_ERRNO_MISC, "bzip decompression failed");
 
320
                        return (ARCHIVE_FATAL);
 
321
                }
 
322
        }
 
323
}
 
324
 
 
325
/*
 
326
 * Clean up the decompressor.
 
327
 */
 
328
static int
 
329
bzip2_filter_close(struct archive_read_filter *self)
 
330
{
 
331
        struct private_data *state;
 
332
        int ret = ARCHIVE_OK;
 
333
 
 
334
        state = (struct private_data *)self->data;
 
335
 
 
336
        if (state->valid) {
 
337
                switch (BZ2_bzDecompressEnd(&state->stream)) {
 
338
                case BZ_OK:
 
339
                        break;
 
340
                default:
 
341
                        archive_set_error(&self->archive->archive,
 
342
                                          ARCHIVE_ERRNO_MISC,
 
343
                                          "Failed to clean up decompressor");
 
344
                        ret = ARCHIVE_FATAL;
 
345
                }
 
346
        }
 
347
 
 
348
        free(state->out_block);
 
349
        free(state);
 
350
        return (ret);
 
351
}
 
352
 
 
353
#endif /* HAVE_BZLIB_H && BZ_CONFIG_ERROR */