~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_write_set_compression_gzip.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
 
 
28
__FBSDID("$FreeBSD: head/lib/libarchive/archive_write_set_compression_gzip.c 201081 2009-12-28 02:04:42Z kientzle $");
 
29
 
 
30
#ifdef HAVE_ERRNO_H
 
31
#include <errno.h>
 
32
#endif
 
33
#ifdef HAVE_STDLIB_H
 
34
#include <stdlib.h>
 
35
#endif
 
36
#ifdef HAVE_STRING_H
 
37
#include <string.h>
 
38
#endif
 
39
#include <time.h>
 
40
#ifdef HAVE_ZLIB_H
 
41
#include <zlib.h>
 
42
#endif
 
43
 
 
44
#include "archive.h"
 
45
#include "archive_private.h"
 
46
#include "archive_write_private.h"
 
47
 
 
48
#ifndef HAVE_ZLIB_H
 
49
int
 
50
archive_write_set_compression_gzip(struct archive *a)
 
51
{
 
52
        archive_set_error(a, ARCHIVE_ERRNO_MISC,
 
53
            "gzip compression not supported on this platform");
 
54
        return (ARCHIVE_FATAL);
 
55
}
 
56
#else
 
57
/* Don't compile this if we don't have zlib. */
 
58
 
 
59
struct private_data {
 
60
        z_stream         stream;
 
61
        int64_t          total_in;
 
62
        unsigned char   *compressed;
 
63
        size_t           compressed_buffer_size;
 
64
        unsigned long    crc;
 
65
};
 
66
 
 
67
struct private_config {
 
68
        int              compression_level;
 
69
};
 
70
 
 
71
 
 
72
/*
 
73
 * Yuck.  zlib.h is not const-correct, so I need this one bit
 
74
 * of ugly hackery to convert a const * pointer to a non-const pointer.
 
75
 */
 
76
#define SET_NEXT_IN(st,src)                                     \
 
77
        (st)->stream.next_in = (Bytef *)(uintptr_t)(const void *)(src)
 
78
 
 
79
static int      archive_compressor_gzip_finish(struct archive_write *);
 
80
static int      archive_compressor_gzip_init(struct archive_write *);
 
81
static int      archive_compressor_gzip_options(struct archive_write *,
 
82
                    const char *, const char *);
 
83
static int      archive_compressor_gzip_write(struct archive_write *,
 
84
                    const void *, size_t);
 
85
static int      drive_compressor(struct archive_write *, struct private_data *,
 
86
                    int finishing);
 
87
 
 
88
 
 
89
/*
 
90
 * Allocate, initialize and return a archive object.
 
91
 */
 
92
int
 
93
archive_write_set_compression_gzip(struct archive *_a)
 
94
{
 
95
        struct archive_write *a = (struct archive_write *)_a;
 
96
        struct private_config *config;
 
97
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
98
            ARCHIVE_STATE_NEW, "archive_write_set_compression_gzip");
 
99
        config = malloc(sizeof(*config));
 
100
        if (config == NULL) {
 
101
                archive_set_error(&a->archive, ENOMEM, "Out of memory");
 
102
                return (ARCHIVE_FATAL);
 
103
        }
 
104
        a->compressor.config = config;
 
105
        a->compressor.finish = &archive_compressor_gzip_finish;
 
106
        config->compression_level = Z_DEFAULT_COMPRESSION;
 
107
        a->compressor.init = &archive_compressor_gzip_init;
 
108
        a->compressor.options = &archive_compressor_gzip_options;
 
109
        a->archive.compression_code = ARCHIVE_COMPRESSION_GZIP;
 
110
        a->archive.compression_name = "gzip";
 
111
        return (ARCHIVE_OK);
 
112
}
 
113
 
 
114
/*
 
115
 * Setup callback.
 
116
 */
 
117
static int
 
118
archive_compressor_gzip_init(struct archive_write *a)
 
119
{
 
120
        int ret;
 
121
        struct private_data *state;
 
122
        struct private_config *config;
 
123
        time_t t;
 
124
 
 
125
        config = (struct private_config *)a->compressor.config;
 
126
 
 
127
        if (a->client_opener != NULL) {
 
128
                ret = (a->client_opener)(&a->archive, a->client_data);
 
129
                if (ret != ARCHIVE_OK)
 
130
                        return (ret);
 
131
        }
 
132
 
 
133
        /*
 
134
         * The next check is a temporary workaround until the gzip
 
135
         * code can be overhauled some.  The code should not require
 
136
         * that compressed_buffer_size == bytes_per_block.  Removing
 
137
         * this assumption will allow us to compress larger chunks at
 
138
         * a time, which should improve overall performance
 
139
         * marginally.  As a minor side-effect, such a cleanup would
 
140
         * allow us to support truly arbitrary block sizes.
 
141
         */
 
142
        if (a->bytes_per_block < 10) {
 
143
                archive_set_error(&a->archive, EINVAL,
 
144
                    "GZip compressor requires a minimum 10 byte block size");
 
145
                return (ARCHIVE_FATAL);
 
146
        }
 
147
 
 
148
        state = (struct private_data *)malloc(sizeof(*state));
 
149
        if (state == NULL) {
 
150
                archive_set_error(&a->archive, ENOMEM,
 
151
                    "Can't allocate data for compression");
 
152
                return (ARCHIVE_FATAL);
 
153
        }
 
154
        memset(state, 0, sizeof(*state));
 
155
 
 
156
        /*
 
157
         * See comment above.  We should set compressed_buffer_size to
 
158
         * max(bytes_per_block, 65536), but the code can't handle that yet.
 
159
         */
 
160
        state->compressed_buffer_size = a->bytes_per_block;
 
161
        state->compressed = (unsigned char *)malloc(state->compressed_buffer_size);
 
162
        state->crc = crc32(0L, NULL, 0);
 
163
 
 
164
        if (state->compressed == NULL) {
 
165
                archive_set_error(&a->archive, ENOMEM,
 
166
                    "Can't allocate data for compression buffer");
 
167
                free(state);
 
168
                return (ARCHIVE_FATAL);
 
169
        }
 
170
 
 
171
        state->stream.next_out = state->compressed;
 
172
        state->stream.avail_out = state->compressed_buffer_size;
 
173
 
 
174
        /* Prime output buffer with a gzip header. */
 
175
        t = time(NULL);
 
176
        state->compressed[0] = 0x1f; /* GZip signature bytes */
 
177
        state->compressed[1] = 0x8b;
 
178
        state->compressed[2] = 0x08; /* "Deflate" compression */
 
179
        state->compressed[3] = 0; /* No options */
 
180
        state->compressed[4] = (t)&0xff;  /* Timestamp */
 
181
        state->compressed[5] = (t>>8)&0xff;
 
182
        state->compressed[6] = (t>>16)&0xff;
 
183
        state->compressed[7] = (t>>24)&0xff;
 
184
        state->compressed[8] = 0; /* No deflate options */
 
185
        state->compressed[9] = 3; /* OS=Unix */
 
186
        state->stream.next_out += 10;
 
187
        state->stream.avail_out -= 10;
 
188
 
 
189
        a->compressor.write = archive_compressor_gzip_write;
 
190
 
 
191
        /* Initialize compression library. */
 
192
        ret = deflateInit2(&(state->stream),
 
193
            config->compression_level,
 
194
            Z_DEFLATED,
 
195
            -15 /* < 0 to suppress zlib header */,
 
196
            8,
 
197
            Z_DEFAULT_STRATEGY);
 
198
 
 
199
        if (ret == Z_OK) {
 
200
                a->compressor.data = state;
 
201
                return (0);
 
202
        }
 
203
 
 
204
        /* Library setup failed: clean up. */
 
205
        archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC, "Internal error "
 
206
            "initializing compression library");
 
207
        free(state->compressed);
 
208
        free(state);
 
209
 
 
210
        /* Override the error message if we know what really went wrong. */
 
211
        switch (ret) {
 
212
        case Z_STREAM_ERROR:
 
213
                archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
214
                    "Internal error initializing "
 
215
                    "compression library: invalid setup parameter");
 
216
                break;
 
217
        case Z_MEM_ERROR:
 
218
                archive_set_error(&a->archive, ENOMEM, "Internal error initializing "
 
219
                    "compression library");
 
220
                break;
 
221
        case Z_VERSION_ERROR:
 
222
                archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
223
                    "Internal error initializing "
 
224
                    "compression library: invalid library version");
 
225
                break;
 
226
        }
 
227
 
 
228
        return (ARCHIVE_FATAL);
 
229
}
 
230
 
 
231
/*
 
232
 * Set write options.
 
233
 */
 
234
static int
 
235
archive_compressor_gzip_options(struct archive_write *a, const char *key,
 
236
    const char *value)
 
237
{
 
238
        struct private_config *config;
 
239
 
 
240
        config = (struct private_config *)a->compressor.config;
 
241
        if (strcmp(key, "compression-level") == 0) {
 
242
                if (value == NULL || !(value[0] >= '0' && value[0] <= '9') ||
 
243
                    value[1] != '\0')
 
244
                        return (ARCHIVE_WARN);
 
245
                config->compression_level = value[0] - '0';
 
246
                return (ARCHIVE_OK);
 
247
        }
 
248
 
 
249
        return (ARCHIVE_WARN);
 
250
}
 
251
 
 
252
/*
 
253
 * Write data to the compressed stream.
 
254
 */
 
255
static int
 
256
archive_compressor_gzip_write(struct archive_write *a, const void *buff,
 
257
    size_t length)
 
258
{
 
259
        struct private_data *state;
 
260
        int ret;
 
261
 
 
262
        state = (struct private_data *)a->compressor.data;
 
263
        if (a->client_writer == NULL) {
 
264
                archive_set_error(&a->archive, ARCHIVE_ERRNO_PROGRAMMER,
 
265
                    "No write callback is registered?  "
 
266
                    "This is probably an internal programming error.");
 
267
                return (ARCHIVE_FATAL);
 
268
        }
 
269
 
 
270
        /* Update statistics */
 
271
        state->crc = crc32(state->crc, (const Bytef *)buff, length);
 
272
        state->total_in += length;
 
273
 
 
274
        /* Compress input data to output buffer */
 
275
        SET_NEXT_IN(state, buff);
 
276
        state->stream.avail_in = length;
 
277
        if ((ret = drive_compressor(a, state, 0)) != ARCHIVE_OK)
 
278
                return (ret);
 
279
 
 
280
        a->archive.file_position += length;
 
281
        return (ARCHIVE_OK);
 
282
}
 
283
 
 
284
/*
 
285
 * Finish the compression...
 
286
 */
 
287
static int
 
288
archive_compressor_gzip_finish(struct archive_write *a)
 
289
{
 
290
        ssize_t block_length, target_block_length, bytes_written;
 
291
        int ret;
 
292
        struct private_data *state;
 
293
        unsigned tocopy;
 
294
        unsigned char trailer[8];
 
295
 
 
296
        state = (struct private_data *)a->compressor.data;
 
297
        ret = 0;
 
298
        if (state != NULL) {
 
299
                if (a->client_writer == NULL) {
 
300
                        archive_set_error(&a->archive,
 
301
                            ARCHIVE_ERRNO_PROGRAMMER,
 
302
                            "No write callback is registered?  "
 
303
                            "This is probably an internal programming error.");
 
304
                        ret = ARCHIVE_FATAL;
 
305
                        goto cleanup;
 
306
                }
 
307
 
 
308
                /* By default, always pad the uncompressed data. */
 
309
                if (a->pad_uncompressed) {
 
310
                        tocopy = a->bytes_per_block -
 
311
                            (state->total_in % a->bytes_per_block);
 
312
                        while (tocopy > 0 && tocopy < (unsigned)a->bytes_per_block) {
 
313
                                SET_NEXT_IN(state, a->nulls);
 
314
                                state->stream.avail_in = tocopy < a->null_length ?
 
315
                                    tocopy : a->null_length;
 
316
                                state->crc = crc32(state->crc, a->nulls,
 
317
                                    state->stream.avail_in);
 
318
                                state->total_in += state->stream.avail_in;
 
319
                                tocopy -= state->stream.avail_in;
 
320
                                ret = drive_compressor(a, state, 0);
 
321
                                if (ret != ARCHIVE_OK)
 
322
                                        goto cleanup;
 
323
                        }
 
324
                }
 
325
 
 
326
                /* Finish compression cycle */
 
327
                if (((ret = drive_compressor(a, state, 1))) != ARCHIVE_OK)
 
328
                        goto cleanup;
 
329
 
 
330
                /* Build trailer: 4-byte CRC and 4-byte length. */
 
331
                trailer[0] = (state->crc)&0xff;
 
332
                trailer[1] = (state->crc >> 8)&0xff;
 
333
                trailer[2] = (state->crc >> 16)&0xff;
 
334
                trailer[3] = (state->crc >> 24)&0xff;
 
335
                trailer[4] = (state->total_in)&0xff;
 
336
                trailer[5] = (state->total_in >> 8)&0xff;
 
337
                trailer[6] = (state->total_in >> 16)&0xff;
 
338
                trailer[7] = (state->total_in >> 24)&0xff;
 
339
 
 
340
                /* Add trailer to current block. */
 
341
                tocopy = 8;
 
342
                if (tocopy > state->stream.avail_out)
 
343
                        tocopy = state->stream.avail_out;
 
344
                memcpy(state->stream.next_out, trailer, tocopy);
 
345
                state->stream.next_out += tocopy;
 
346
                state->stream.avail_out -= tocopy;
 
347
 
 
348
                /* If it overflowed, flush and start a new block. */
 
349
                if (tocopy < 8) {
 
350
                        bytes_written = (a->client_writer)(&a->archive, a->client_data,
 
351
                            state->compressed, state->compressed_buffer_size);
 
352
                        if (bytes_written <= 0) {
 
353
                                ret = ARCHIVE_FATAL;
 
354
                                goto cleanup;
 
355
                        }
 
356
                        a->archive.raw_position += bytes_written;
 
357
                        state->stream.next_out = state->compressed;
 
358
                        state->stream.avail_out = state->compressed_buffer_size;
 
359
                        memcpy(state->stream.next_out, trailer + tocopy, 8-tocopy);
 
360
                        state->stream.next_out += 8-tocopy;
 
361
                        state->stream.avail_out -= 8-tocopy;
 
362
                }
 
363
 
 
364
                /* Optionally, pad the final compressed block. */
 
365
                block_length = state->stream.next_out - state->compressed;
 
366
 
 
367
                /* Tricky calculation to determine size of last block. */
 
368
                if (a->bytes_in_last_block <= 0)
 
369
                        /* Default or Zero: pad to full block */
 
370
                        target_block_length = a->bytes_per_block;
 
371
                else
 
372
                        /* Round length to next multiple of bytes_in_last_block. */
 
373
                        target_block_length = a->bytes_in_last_block *
 
374
                            ( (block_length + a->bytes_in_last_block - 1) /
 
375
                                a->bytes_in_last_block);
 
376
                if (target_block_length > a->bytes_per_block)
 
377
                        target_block_length = a->bytes_per_block;
 
378
                if (block_length < target_block_length) {
 
379
                        memset(state->stream.next_out, 0,
 
380
                            target_block_length - block_length);
 
381
                        block_length = target_block_length;
 
382
                }
 
383
 
 
384
                /* Write the last block */
 
385
                bytes_written = (a->client_writer)(&a->archive, a->client_data,
 
386
                    state->compressed, block_length);
 
387
                if (bytes_written <= 0) {
 
388
                        ret = ARCHIVE_FATAL;
 
389
                        goto cleanup;
 
390
                }
 
391
                a->archive.raw_position += bytes_written;
 
392
 
 
393
                /* Cleanup: shut down compressor, release memory, etc. */
 
394
        cleanup:
 
395
                switch (deflateEnd(&(state->stream))) {
 
396
                case Z_OK:
 
397
                        break;
 
398
                default:
 
399
                        archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
400
                            "Failed to clean up compressor");
 
401
                        ret = ARCHIVE_FATAL;
 
402
                }
 
403
                free(state->compressed);
 
404
                free(state);
 
405
        }
 
406
        /* Clean up config area even if we never initialized. */
 
407
        free(a->compressor.config);
 
408
        a->compressor.config = NULL;
 
409
        return (ret);
 
410
}
 
411
 
 
412
/*
 
413
 * Utility function to push input data through compressor,
 
414
 * writing full output blocks as necessary.
 
415
 *
 
416
 * Note that this handles both the regular write case (finishing ==
 
417
 * false) and the end-of-archive case (finishing == true).
 
418
 */
 
419
static int
 
420
drive_compressor(struct archive_write *a, struct private_data *state, int finishing)
 
421
{
 
422
        ssize_t bytes_written;
 
423
        int ret;
 
424
 
 
425
        for (;;) {
 
426
                if (state->stream.avail_out == 0) {
 
427
                        bytes_written = (a->client_writer)(&a->archive,
 
428
                            a->client_data, state->compressed,
 
429
                            state->compressed_buffer_size);
 
430
                        if (bytes_written <= 0) {
 
431
                                /* TODO: Handle this write failure */
 
432
                                return (ARCHIVE_FATAL);
 
433
                        } else if ((size_t)bytes_written < state->compressed_buffer_size) {
 
434
                                /* Short write: Move remaining to
 
435
                                 * front of block and keep filling */
 
436
                                memmove(state->compressed,
 
437
                                    state->compressed + bytes_written,
 
438
                                    state->compressed_buffer_size - bytes_written);
 
439
                        }
 
440
                        a->archive.raw_position += bytes_written;
 
441
                        state->stream.next_out
 
442
                            = state->compressed +
 
443
                            state->compressed_buffer_size - bytes_written;
 
444
                        state->stream.avail_out = bytes_written;
 
445
                }
 
446
 
 
447
                /* If there's nothing to do, we're done. */
 
448
                if (!finishing && state->stream.avail_in == 0)
 
449
                        return (ARCHIVE_OK);
 
450
 
 
451
                ret = deflate(&(state->stream),
 
452
                    finishing ? Z_FINISH : Z_NO_FLUSH );
 
453
 
 
454
                switch (ret) {
 
455
                case Z_OK:
 
456
                        /* In non-finishing case, check if compressor
 
457
                         * consumed everything */
 
458
                        if (!finishing && state->stream.avail_in == 0)
 
459
                                return (ARCHIVE_OK);
 
460
                        /* In finishing case, this return always means
 
461
                         * there's more work */
 
462
                        break;
 
463
                case Z_STREAM_END:
 
464
                        /* This return can only occur in finishing case. */
 
465
                        return (ARCHIVE_OK);
 
466
                default:
 
467
                        /* Any other return value indicates an error. */
 
468
                        archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
469
                            "GZip compression failed:"
 
470
                            " deflate() call returned status %d",
 
471
                            ret);
 
472
                        return (ARCHIVE_FATAL);
 
473
                }
 
474
        }
 
475
}
 
476
 
 
477
#endif /* HAVE_ZLIB_H */