~akopytov/percona-xtrabackup/bug1166888-2.0

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_support_compression_gzip.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
 
 
28
__FBSDID("$FreeBSD: head/lib/libarchive/archive_read_support_compression_gzip.c 201082 2009-12-28 02:05:28Z kientzle $");
 
29
 
 
30
 
 
31
#ifdef HAVE_ERRNO_H
 
32
#include <errno.h>
 
33
#endif
 
34
#ifdef HAVE_STDLIB_H
 
35
#include <stdlib.h>
 
36
#endif
 
37
#ifdef HAVE_STRING_H
 
38
#include <string.h>
 
39
#endif
 
40
#ifdef HAVE_UNISTD_H
 
41
#include <unistd.h>
 
42
#endif
 
43
#ifdef HAVE_ZLIB_H
 
44
#include <zlib.h>
 
45
#endif
 
46
 
 
47
#include "archive.h"
 
48
#include "archive_private.h"
 
49
#include "archive_read_private.h"
 
50
 
 
51
#ifdef HAVE_ZLIB_H
 
52
struct private_data {
 
53
        z_stream         stream;
 
54
        char             in_stream;
 
55
        unsigned char   *out_block;
 
56
        size_t           out_block_size;
 
57
        int64_t          total_out;
 
58
        unsigned long    crc;
 
59
        char             eof; /* True = found end of compressed data. */
 
60
};
 
61
 
 
62
/* Gzip Filter. */
 
63
static ssize_t  gzip_filter_read(struct archive_read_filter *, const void **);
 
64
static int      gzip_filter_close(struct archive_read_filter *);
 
65
#endif
 
66
 
 
67
/*
 
68
 * Note that we can detect gzip archives even if we can't decompress
 
69
 * them.  (In fact, we like detecting them because we can give better
 
70
 * error messages.)  So the bid framework here gets compiled even
 
71
 * if zlib is unavailable.
 
72
 *
 
73
 * TODO: If zlib is unavailable, gzip_bidder_init() should
 
74
 * use the compress_program framework to try to fire up an external
 
75
 * gunzip program.
 
76
 */
 
77
static int      gzip_bidder_bid(struct archive_read_filter_bidder *,
 
78
                    struct archive_read_filter *);
 
79
static int      gzip_bidder_init(struct archive_read_filter *);
 
80
 
 
81
int
 
82
archive_read_support_compression_gzip(struct archive *_a)
 
83
{
 
84
        struct archive_read *a = (struct archive_read *)_a;
 
85
        struct archive_read_filter_bidder *bidder = __archive_read_get_bidder(a);
 
86
 
 
87
        if (bidder == NULL)
 
88
                return (ARCHIVE_FATAL);
 
89
 
 
90
        bidder->data = NULL;
 
91
        bidder->bid = gzip_bidder_bid;
 
92
        bidder->init = gzip_bidder_init;
 
93
        bidder->options = NULL;
 
94
        bidder->free = NULL; /* No data, so no cleanup necessary. */
 
95
        /* Signal the extent of gzip support with the return value here. */
 
96
#if HAVE_ZLIB_H
 
97
        return (ARCHIVE_OK);
 
98
#else
 
99
        archive_set_error(_a, ARCHIVE_ERRNO_MISC,
 
100
            "Using external gunzip program");
 
101
        return (ARCHIVE_WARN);
 
102
#endif
 
103
}
 
104
 
 
105
/*
 
106
 * Read and verify the header.
 
107
 *
 
108
 * Returns zero if the header couldn't be validated, else returns
 
109
 * number of bytes in header.  If pbits is non-NULL, it receives a
 
110
 * count of bits verified, suitable for use by bidder.
 
111
 */
 
112
static int
 
113
peek_at_header(struct archive_read_filter *filter, int *pbits)
 
114
{
 
115
        const unsigned char *p;
 
116
        ssize_t avail, len;
 
117
        int bits = 0;
 
118
        int header_flags;
 
119
 
 
120
        /* Start by looking at the first ten bytes of the header, which
 
121
         * is all fixed layout. */
 
122
        len = 10;
 
123
        p = __archive_read_filter_ahead(filter, len, &avail);
 
124
        if (p == NULL || avail == 0)
 
125
                return (0);
 
126
        if (p[0] != 037)
 
127
                return (0);
 
128
        bits += 8;
 
129
        if (p[1] != 0213)
 
130
                return (0);
 
131
        bits += 8;
 
132
        if (p[2] != 8) /* We only support deflation. */
 
133
                return (0);
 
134
        bits += 8;
 
135
        if ((p[3] & 0xE0)!= 0)  /* No reserved flags set. */
 
136
                return (0);
 
137
        bits += 3;
 
138
        header_flags = p[3];
 
139
        /* Bytes 4-7 are mod time. */
 
140
        /* Byte 8 is deflate flags. */
 
141
        /* XXXX TODO: return deflate flags back to consume_header for use
 
142
           in initializing the decompressor. */
 
143
        /* Byte 9 is OS. */
 
144
 
 
145
        /* Optional extra data:  2 byte length plus variable body. */
 
146
        if (header_flags & 4) {
 
147
                p = __archive_read_filter_ahead(filter, len + 2, &avail);
 
148
                if (p == NULL)
 
149
                        return (0);
 
150
                len += ((int)p[len + 1] << 8) | (int)p[len];
 
151
                len += 2;
 
152
        }
 
153
 
 
154
        /* Null-terminated optional filename. */
 
155
        if (header_flags & 8) {
 
156
                do {
 
157
                        ++len;
 
158
                        if (avail < len)
 
159
                                p = __archive_read_filter_ahead(filter,
 
160
                                    len, &avail);
 
161
                        if (p == NULL)
 
162
                                return (0);
 
163
                } while (p[len - 1] != 0);
 
164
        }
 
165
 
 
166
        /* Null-terminated optional comment. */
 
167
        if (header_flags & 16) {
 
168
                do {
 
169
                        ++len;
 
170
                        if (avail < len)
 
171
                                p = __archive_read_filter_ahead(filter,
 
172
                                    len, &avail);
 
173
                        if (p == NULL)
 
174
                                return (0);
 
175
                } while (p[len - 1] != 0);
 
176
        }
 
177
 
 
178
        /* Optional header CRC */
 
179
        if ((header_flags & 2)) {
 
180
                p = __archive_read_filter_ahead(filter, len + 2, &avail);
 
181
                if (p == NULL)
 
182
                        return (0);
 
183
#if 0
 
184
        int hcrc = ((int)p[len + 1] << 8) | (int)p[len];
 
185
        int crc = /* XXX TODO: Compute header CRC. */;
 
186
        if (crc != hcrc)
 
187
                return (0);
 
188
        bits += 16;
 
189
#endif
 
190
                len += 2;
 
191
        }
 
192
 
 
193
        if (pbits != NULL)
 
194
                *pbits = bits;
 
195
        return (len);
 
196
}
 
197
 
 
198
/*
 
199
 * Bidder just verifies the header and returns the number of verified bits.
 
200
 */
 
201
static int
 
202
gzip_bidder_bid(struct archive_read_filter_bidder *self,
 
203
    struct archive_read_filter *filter)
 
204
{
 
205
        int bits_checked;
 
206
 
 
207
        (void)self; /* UNUSED */
 
208
 
 
209
        if (peek_at_header(filter, &bits_checked))
 
210
                return (bits_checked);
 
211
        return (0);
 
212
}
 
213
 
 
214
 
 
215
#ifndef HAVE_ZLIB_H
 
216
 
 
217
/*
 
218
 * If we don't have the library on this system, we can't do the
 
219
 * decompression directly.  We can, however, try to run gunzip
 
220
 * in case that's available.
 
221
 */
 
222
static int
 
223
gzip_bidder_init(struct archive_read_filter *self)
 
224
{
 
225
        int r;
 
226
 
 
227
        r = __archive_read_program(self, "gunzip");
 
228
        /* Note: We set the format here even if __archive_read_program()
 
229
         * above fails.  We do, after all, know what the format is
 
230
         * even if we weren't able to read it. */
 
231
        self->code = ARCHIVE_COMPRESSION_GZIP;
 
232
        self->name = "gzip";
 
233
        return (r);
 
234
}
 
235
 
 
236
#else
 
237
 
 
238
/*
 
239
 * Initialize the filter object.
 
240
 */
 
241
static int
 
242
gzip_bidder_init(struct archive_read_filter *self)
 
243
{
 
244
        struct private_data *state;
 
245
        static const size_t out_block_size = 64 * 1024;
 
246
        void *out_block;
 
247
 
 
248
        self->code = ARCHIVE_COMPRESSION_GZIP;
 
249
        self->name = "gzip";
 
250
 
 
251
        state = (struct private_data *)calloc(sizeof(*state), 1);
 
252
        out_block = (unsigned char *)malloc(out_block_size);
 
253
        if (state == NULL || out_block == NULL) {
 
254
                free(out_block);
 
255
                free(state);
 
256
                archive_set_error(&self->archive->archive, ENOMEM,
 
257
                    "Can't allocate data for gzip decompression");
 
258
                return (ARCHIVE_FATAL);
 
259
        }
 
260
 
 
261
        self->data = state;
 
262
        state->out_block_size = out_block_size;
 
263
        state->out_block = out_block;
 
264
        self->read = gzip_filter_read;
 
265
        self->skip = NULL; /* not supported */
 
266
        self->close = gzip_filter_close;
 
267
 
 
268
        state->in_stream = 0; /* We're not actually within a stream yet. */
 
269
 
 
270
        return (ARCHIVE_OK);
 
271
}
 
272
 
 
273
static int
 
274
consume_header(struct archive_read_filter *self)
 
275
{
 
276
        struct private_data *state;
 
277
        ssize_t avail;
 
278
        size_t len;
 
279
        int ret;
 
280
 
 
281
        state = (struct private_data *)self->data;
 
282
 
 
283
        /* If this is a real header, consume it. */
 
284
        len = peek_at_header(self->upstream, NULL);
 
285
        if (len == 0)
 
286
                return (ARCHIVE_EOF);
 
287
        __archive_read_filter_consume(self->upstream, len);
 
288
 
 
289
        /* Initialize CRC accumulator. */
 
290
        state->crc = crc32(0L, NULL, 0);
 
291
 
 
292
        /* Initialize compression library. */
 
293
        state->stream.next_in = (unsigned char *)(uintptr_t)
 
294
            __archive_read_filter_ahead(self->upstream, 1, &avail);
 
295
        state->stream.avail_in = avail;
 
296
        ret = inflateInit2(&(state->stream),
 
297
            -15 /* Don't check for zlib header */);
 
298
 
 
299
        /* Decipher the error code. */
 
300
        switch (ret) {
 
301
        case Z_OK:
 
302
                state->in_stream = 1;
 
303
                return (ARCHIVE_OK);
 
304
        case Z_STREAM_ERROR:
 
305
                archive_set_error(&self->archive->archive,
 
306
                    ARCHIVE_ERRNO_MISC,
 
307
                    "Internal error initializing compression library: "
 
308
                    "invalid setup parameter");
 
309
                break;
 
310
        case Z_MEM_ERROR:
 
311
                archive_set_error(&self->archive->archive, ENOMEM,
 
312
                    "Internal error initializing compression library: "
 
313
                    "out of memory");
 
314
                break;
 
315
        case Z_VERSION_ERROR:
 
316
                archive_set_error(&self->archive->archive,
 
317
                    ARCHIVE_ERRNO_MISC,
 
318
                    "Internal error initializing compression library: "
 
319
                    "invalid library version");
 
320
                break;
 
321
        default:
 
322
                archive_set_error(&self->archive->archive,
 
323
                    ARCHIVE_ERRNO_MISC,
 
324
                    "Internal error initializing compression library: "
 
325
                    " Zlib error %d", ret);
 
326
                break;
 
327
        }
 
328
        return (ARCHIVE_FATAL);
 
329
}
 
330
 
 
331
static int
 
332
consume_trailer(struct archive_read_filter *self)
 
333
{
 
334
        struct private_data *state;
 
335
        const unsigned char *p;
 
336
        ssize_t avail;
 
337
 
 
338
        state = (struct private_data *)self->data;
 
339
 
 
340
        state->in_stream = 0;
 
341
        switch (inflateEnd(&(state->stream))) {
 
342
        case Z_OK:
 
343
                break;
 
344
        default:
 
345
                archive_set_error(&self->archive->archive,
 
346
                    ARCHIVE_ERRNO_MISC,
 
347
                    "Failed to clean up gzip decompressor");
 
348
                return (ARCHIVE_FATAL);
 
349
        }
 
350
 
 
351
        /* GZip trailer is a fixed 8 byte structure. */
 
352
        p = __archive_read_filter_ahead(self->upstream, 8, &avail);
 
353
        if (p == NULL || avail == 0)
 
354
                return (ARCHIVE_FATAL);
 
355
 
 
356
        /* XXX TODO: Verify the length and CRC. */
 
357
 
 
358
        /* We've verified the trailer, so consume it now. */
 
359
        __archive_read_filter_consume(self->upstream, 8);
 
360
 
 
361
        return (ARCHIVE_OK);
 
362
}
 
363
 
 
364
static ssize_t
 
365
gzip_filter_read(struct archive_read_filter *self, const void **p)
 
366
{
 
367
        struct private_data *state;
 
368
        size_t decompressed;
 
369
        ssize_t avail_in;
 
370
        int ret;
 
371
 
 
372
        state = (struct private_data *)self->data;
 
373
 
 
374
        /* Empty our output buffer. */
 
375
        state->stream.next_out = state->out_block;
 
376
        state->stream.avail_out = state->out_block_size;
 
377
 
 
378
        /* Try to fill the output buffer. */
 
379
        while (state->stream.avail_out > 0 && !state->eof) {
 
380
                /* If we're not in a stream, read a header
 
381
                 * and initialize the decompression library. */
 
382
                if (!state->in_stream) {
 
383
                        ret = consume_header(self);
 
384
                        if (ret == ARCHIVE_EOF) {
 
385
                                state->eof = 1;
 
386
                                break;
 
387
                        }
 
388
                        if (ret < ARCHIVE_OK)
 
389
                                return (ret);
 
390
                }
 
391
 
 
392
                /* Peek at the next available data. */
 
393
                /* ZLib treats stream.next_in as const but doesn't declare
 
394
                 * it so, hence this ugly cast. */
 
395
                state->stream.next_in = (unsigned char *)(uintptr_t)
 
396
                    __archive_read_filter_ahead(self->upstream, 1, &avail_in);
 
397
                if (state->stream.next_in == NULL)
 
398
                        return (ARCHIVE_FATAL);
 
399
                state->stream.avail_in = avail_in;
 
400
 
 
401
                /* Decompress and consume some of that data. */
 
402
                ret = inflate(&(state->stream), 0);
 
403
                switch (ret) {
 
404
                case Z_OK: /* Decompressor made some progress. */
 
405
                        __archive_read_filter_consume(self->upstream,
 
406
                            avail_in - state->stream.avail_in);
 
407
                        break;
 
408
                case Z_STREAM_END: /* Found end of stream. */
 
409
                        __archive_read_filter_consume(self->upstream,
 
410
                            avail_in - state->stream.avail_in);
 
411
                        /* Consume the stream trailer; release the
 
412
                         * decompression library. */
 
413
                        ret = consume_trailer(self);
 
414
                        if (ret < ARCHIVE_OK)
 
415
                                return (ret);
 
416
                        break;
 
417
                default:
 
418
                        /* Return an error. */
 
419
                        archive_set_error(&self->archive->archive,
 
420
                            ARCHIVE_ERRNO_MISC,
 
421
                            "gzip decompression failed");
 
422
                        return (ARCHIVE_FATAL);
 
423
                }
 
424
        }
 
425
 
 
426
        /* We've read as much as we can. */
 
427
        decompressed = state->stream.next_out - state->out_block;
 
428
        state->total_out += decompressed;
 
429
        if (decompressed == 0)
 
430
                *p = NULL;
 
431
        else
 
432
                *p = state->out_block;
 
433
        return (decompressed);
 
434
}
 
435
 
 
436
/*
 
437
 * Clean up the decompressor.
 
438
 */
 
439
static int
 
440
gzip_filter_close(struct archive_read_filter *self)
 
441
{
 
442
        struct private_data *state;
 
443
        int ret;
 
444
 
 
445
        state = (struct private_data *)self->data;
 
446
        ret = ARCHIVE_OK;
 
447
 
 
448
        if (state->in_stream) {
 
449
                switch (inflateEnd(&(state->stream))) {
 
450
                case Z_OK:
 
451
                        break;
 
452
                default:
 
453
                        archive_set_error(&(self->archive->archive),
 
454
                            ARCHIVE_ERRNO_MISC,
 
455
                            "Failed to clean up gzip compressor");
 
456
                        ret = ARCHIVE_FATAL;
 
457
                }
 
458
        }
 
459
 
 
460
        free(state->out_block);
 
461
        free(state);
 
462
        return (ret);
 
463
}
 
464
 
 
465
#endif /* HAVE_ZLIB_H */