~akopytov/percona-xtrabackup/bug1166888-2.0

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_write_set_compression_bzip2.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
 
 
28
__FBSDID("$FreeBSD: head/lib/libarchive/archive_write_set_compression_bzip2.c 201091 2009-12-28 02:22:41Z kientzle $");
 
29
 
 
30
#ifdef HAVE_ERRNO_H
 
31
#include <errno.h>
 
32
#endif
 
33
#include <stdio.h>
 
34
#ifdef HAVE_STDLIB_H
 
35
#include <stdlib.h>
 
36
#endif
 
37
#ifdef HAVE_STRING_H
 
38
#include <string.h>
 
39
#endif
 
40
#ifdef HAVE_BZLIB_H
 
41
#include <bzlib.h>
 
42
#endif
 
43
 
 
44
#include "archive.h"
 
45
#include "archive_private.h"
 
46
#include "archive_write_private.h"
 
47
 
 
48
#if !defined(HAVE_BZLIB_H) || !defined(BZ_CONFIG_ERROR)
 
49
int
 
50
archive_write_set_compression_bzip2(struct archive *a)
 
51
{
 
52
        archive_set_error(a, ARCHIVE_ERRNO_MISC,
 
53
            "bzip2 compression not supported on this platform");
 
54
        return (ARCHIVE_FATAL);
 
55
}
 
56
#else
 
57
/* Don't compile this if we don't have bzlib. */
 
58
 
 
59
struct private_data {
 
60
        bz_stream        stream;
 
61
        int64_t          total_in;
 
62
        char            *compressed;
 
63
        size_t           compressed_buffer_size;
 
64
};
 
65
 
 
66
struct private_config {
 
67
        int              compression_level;
 
68
};
 
69
 
 
70
/*
 
71
 * Yuck.  bzlib.h is not const-correct, so I need this one bit
 
72
 * of ugly hackery to convert a const * pointer to a non-const pointer.
 
73
 */
 
74
#define SET_NEXT_IN(st,src)                                     \
 
75
        (st)->stream.next_in = (char *)(uintptr_t)(const void *)(src)
 
76
 
 
77
static int      archive_compressor_bzip2_finish(struct archive_write *);
 
78
static int      archive_compressor_bzip2_init(struct archive_write *);
 
79
static int      archive_compressor_bzip2_options(struct archive_write *,
 
80
                    const char *, const char *);
 
81
static int      archive_compressor_bzip2_write(struct archive_write *,
 
82
                    const void *, size_t);
 
83
static int      drive_compressor(struct archive_write *, struct private_data *,
 
84
                    int finishing);
 
85
 
 
86
/*
 
87
 * Allocate, initialize and return an archive object.
 
88
 */
 
89
int
 
90
archive_write_set_compression_bzip2(struct archive *_a)
 
91
{
 
92
        struct archive_write *a = (struct archive_write *)_a;
 
93
        struct private_config *config;
 
94
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
95
            ARCHIVE_STATE_NEW, "archive_write_set_compression_bzip2");
 
96
        config = malloc(sizeof(*config));
 
97
        if (config == NULL) {
 
98
                archive_set_error(&a->archive, ENOMEM, "Out of memory");
 
99
                return (ARCHIVE_FATAL);
 
100
        }
 
101
        a->compressor.config = config;
 
102
        a->compressor.finish = archive_compressor_bzip2_finish;
 
103
        config->compression_level = 9; /* default */
 
104
        a->compressor.init = &archive_compressor_bzip2_init;
 
105
        a->compressor.options = &archive_compressor_bzip2_options;
 
106
        a->archive.compression_code = ARCHIVE_COMPRESSION_BZIP2;
 
107
        a->archive.compression_name = "bzip2";
 
108
        return (ARCHIVE_OK);
 
109
}
 
110
 
 
111
/*
 
112
 * Setup callback.
 
113
 */
 
114
static int
 
115
archive_compressor_bzip2_init(struct archive_write *a)
 
116
{
 
117
        int ret;
 
118
        struct private_data *state;
 
119
        struct private_config *config;
 
120
 
 
121
        config = (struct private_config *)a->compressor.config;
 
122
        if (a->client_opener != NULL) {
 
123
                ret = (a->client_opener)(&a->archive, a->client_data);
 
124
                if (ret != 0)
 
125
                        return (ret);
 
126
        }
 
127
 
 
128
        state = (struct private_data *)malloc(sizeof(*state));
 
129
        if (state == NULL) {
 
130
                archive_set_error(&a->archive, ENOMEM,
 
131
                    "Can't allocate data for compression");
 
132
                return (ARCHIVE_FATAL);
 
133
        }
 
134
        memset(state, 0, sizeof(*state));
 
135
 
 
136
        state->compressed_buffer_size = a->bytes_per_block;
 
137
        state->compressed = (char *)malloc(state->compressed_buffer_size);
 
138
 
 
139
        if (state->compressed == NULL) {
 
140
                archive_set_error(&a->archive, ENOMEM,
 
141
                    "Can't allocate data for compression buffer");
 
142
                free(state);
 
143
                return (ARCHIVE_FATAL);
 
144
        }
 
145
 
 
146
        state->stream.next_out = state->compressed;
 
147
        state->stream.avail_out = state->compressed_buffer_size;
 
148
        a->compressor.write = archive_compressor_bzip2_write;
 
149
 
 
150
        /* Initialize compression library */
 
151
        ret = BZ2_bzCompressInit(&(state->stream),
 
152
            config->compression_level, 0, 30);
 
153
        if (ret == BZ_OK) {
 
154
                a->compressor.data = state;
 
155
                return (ARCHIVE_OK);
 
156
        }
 
157
 
 
158
        /* Library setup failed: clean up. */
 
159
        archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
160
            "Internal error initializing compression library");
 
161
        free(state->compressed);
 
162
        free(state);
 
163
 
 
164
        /* Override the error message if we know what really went wrong. */
 
165
        switch (ret) {
 
166
        case BZ_PARAM_ERROR:
 
167
                archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
168
                    "Internal error initializing compression library: "
 
169
                    "invalid setup parameter");
 
170
                break;
 
171
        case BZ_MEM_ERROR:
 
172
                archive_set_error(&a->archive, ENOMEM,
 
173
                    "Internal error initializing compression library: "
 
174
                    "out of memory");
 
175
                break;
 
176
        case BZ_CONFIG_ERROR:
 
177
                archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
178
                    "Internal error initializing compression library: "
 
179
                    "mis-compiled library");
 
180
                break;
 
181
        }
 
182
 
 
183
        return (ARCHIVE_FATAL);
 
184
 
 
185
}
 
186
 
 
187
/*
 
188
 * Set write options.
 
189
 */
 
190
static int
 
191
archive_compressor_bzip2_options(struct archive_write *a, const char *key,
 
192
    const char *value)
 
193
{
 
194
        struct private_config *config;
 
195
 
 
196
        config = (struct private_config *)a->compressor.config;
 
197
        if (strcmp(key, "compression-level") == 0) {
 
198
                if (value == NULL || !(value[0] >= '0' && value[0] <= '9') ||
 
199
                    value[1] != '\0')
 
200
                        return (ARCHIVE_WARN);
 
201
                config->compression_level = value[0] - '0';
 
202
                /* Make '0' be a synonym for '1'. */
 
203
                /* This way, bzip2 compressor supports the same 0..9
 
204
                 * range of levels as gzip. */
 
205
                if (config->compression_level < 1)
 
206
                        config->compression_level = 1;
 
207
                return (ARCHIVE_OK);
 
208
        }
 
209
 
 
210
        return (ARCHIVE_WARN);
 
211
}
 
212
 
 
213
/*
 
214
 * Write data to the compressed stream.
 
215
 *
 
216
 * Returns ARCHIVE_OK if all data written, error otherwise.
 
217
 */
 
218
static int
 
219
archive_compressor_bzip2_write(struct archive_write *a, const void *buff,
 
220
    size_t length)
 
221
{
 
222
        struct private_data *state;
 
223
 
 
224
        state = (struct private_data *)a->compressor.data;
 
225
        if (a->client_writer == NULL) {
 
226
                archive_set_error(&a->archive, ARCHIVE_ERRNO_PROGRAMMER,
 
227
                    "No write callback is registered?  "
 
228
                    "This is probably an internal programming error.");
 
229
                return (ARCHIVE_FATAL);
 
230
        }
 
231
 
 
232
        /* Update statistics */
 
233
        state->total_in += length;
 
234
 
 
235
        /* Compress input data to output buffer */
 
236
        SET_NEXT_IN(state, buff);
 
237
        state->stream.avail_in = length;
 
238
        if (drive_compressor(a, state, 0))
 
239
                return (ARCHIVE_FATAL);
 
240
        a->archive.file_position += length;
 
241
        return (ARCHIVE_OK);
 
242
}
 
243
 
 
244
 
 
245
/*
 
246
 * Finish the compression.
 
247
 */
 
248
static int
 
249
archive_compressor_bzip2_finish(struct archive_write *a)
 
250
{
 
251
        ssize_t block_length;
 
252
        int ret;
 
253
        struct private_data *state;
 
254
        ssize_t target_block_length;
 
255
        ssize_t bytes_written;
 
256
        unsigned tocopy;
 
257
 
 
258
        ret = ARCHIVE_OK;
 
259
        state = (struct private_data *)a->compressor.data;
 
260
        if (state != NULL) {
 
261
                if (a->client_writer == NULL) {
 
262
                        archive_set_error(&a->archive,
 
263
                            ARCHIVE_ERRNO_PROGRAMMER,
 
264
                            "No write callback is registered?\n"
 
265
                            "This is probably an internal programming error.");
 
266
                        ret = ARCHIVE_FATAL;
 
267
                        goto cleanup;
 
268
                }
 
269
 
 
270
                /* By default, always pad the uncompressed data. */
 
271
                if (a->pad_uncompressed) {
 
272
                        tocopy = a->bytes_per_block -
 
273
                            (state->total_in % a->bytes_per_block);
 
274
                        while (tocopy > 0 && tocopy < (unsigned)a->bytes_per_block) {
 
275
                                SET_NEXT_IN(state, a->nulls);
 
276
                                state->stream.avail_in = tocopy < a->null_length ?
 
277
                                    tocopy : a->null_length;
 
278
                                state->total_in += state->stream.avail_in;
 
279
                                tocopy -= state->stream.avail_in;
 
280
                                ret = drive_compressor(a, state, 0);
 
281
                                if (ret != ARCHIVE_OK)
 
282
                                        goto cleanup;
 
283
                        }
 
284
                }
 
285
 
 
286
                /* Finish compression cycle. */
 
287
                if ((ret = drive_compressor(a, state, 1)))
 
288
                        goto cleanup;
 
289
 
 
290
                /* Optionally, pad the final compressed block. */
 
291
                block_length = state->stream.next_out - state->compressed;
 
292
 
 
293
                /* Tricky calculation to determine size of last block. */
 
294
                if (a->bytes_in_last_block <= 0)
 
295
                        /* Default or Zero: pad to full block */
 
296
                        target_block_length = a->bytes_per_block;
 
297
                else
 
298
                        /* Round length to next multiple of bytes_in_last_block. */
 
299
                        target_block_length = a->bytes_in_last_block *
 
300
                            ( (block_length + a->bytes_in_last_block - 1) /
 
301
                                a->bytes_in_last_block);
 
302
                if (target_block_length > a->bytes_per_block)
 
303
                        target_block_length = a->bytes_per_block;
 
304
                if (block_length < target_block_length) {
 
305
                        memset(state->stream.next_out, 0,
 
306
                            target_block_length - block_length);
 
307
                        block_length = target_block_length;
 
308
                }
 
309
 
 
310
                /* Write the last block */
 
311
                bytes_written = (a->client_writer)(&a->archive, a->client_data,
 
312
                    state->compressed, block_length);
 
313
 
 
314
                /* TODO: Handle short write of final block. */
 
315
                if (bytes_written <= 0)
 
316
                        ret = ARCHIVE_FATAL;
 
317
                else {
 
318
                        a->archive.raw_position += ret;
 
319
                        ret = ARCHIVE_OK;
 
320
                }
 
321
 
 
322
                /* Cleanup: shut down compressor, release memory, etc. */
 
323
cleanup:
 
324
                switch (BZ2_bzCompressEnd(&(state->stream))) {
 
325
                case BZ_OK:
 
326
                        break;
 
327
                default:
 
328
                        archive_set_error(&a->archive, ARCHIVE_ERRNO_PROGRAMMER,
 
329
                            "Failed to clean up compressor");
 
330
                        ret = ARCHIVE_FATAL;
 
331
                }
 
332
 
 
333
                free(state->compressed);
 
334
                free(state);
 
335
        }
 
336
        /* Free configuration data even if we were never fully initialized. */
 
337
        free(a->compressor.config);
 
338
        a->compressor.config = NULL;
 
339
        return (ret);
 
340
}
 
341
 
 
342
/*
 
343
 * Utility function to push input data through compressor, writing
 
344
 * full output blocks as necessary.
 
345
 *
 
346
 * Note that this handles both the regular write case (finishing ==
 
347
 * false) and the end-of-archive case (finishing == true).
 
348
 */
 
349
static int
 
350
drive_compressor(struct archive_write *a, struct private_data *state, int finishing)
 
351
{
 
352
        ssize_t bytes_written;
 
353
        int ret;
 
354
 
 
355
        for (;;) {
 
356
                if (state->stream.avail_out == 0) {
 
357
                        bytes_written = (a->client_writer)(&a->archive,
 
358
                            a->client_data, state->compressed,
 
359
                            state->compressed_buffer_size);
 
360
                        if (bytes_written <= 0) {
 
361
                                /* TODO: Handle this write failure */
 
362
                                return (ARCHIVE_FATAL);
 
363
                        } else if ((size_t)bytes_written < state->compressed_buffer_size) {
 
364
                                /* Short write: Move remainder to
 
365
                                 * front and keep filling */
 
366
                                memmove(state->compressed,
 
367
                                    state->compressed + bytes_written,
 
368
                                    state->compressed_buffer_size - bytes_written);
 
369
                        }
 
370
 
 
371
                        a->archive.raw_position += bytes_written;
 
372
                        state->stream.next_out = state->compressed +
 
373
                            state->compressed_buffer_size - bytes_written;
 
374
                        state->stream.avail_out = bytes_written;
 
375
                }
 
376
 
 
377
                /* If there's nothing to do, we're done. */
 
378
                if (!finishing && state->stream.avail_in == 0)
 
379
                        return (ARCHIVE_OK);
 
380
 
 
381
                ret = BZ2_bzCompress(&(state->stream),
 
382
                    finishing ? BZ_FINISH : BZ_RUN);
 
383
 
 
384
                switch (ret) {
 
385
                case BZ_RUN_OK:
 
386
                        /* In non-finishing case, did compressor
 
387
                         * consume everything? */
 
388
                        if (!finishing && state->stream.avail_in == 0)
 
389
                                return (ARCHIVE_OK);
 
390
                        break;
 
391
                case BZ_FINISH_OK:  /* Finishing: There's more work to do */
 
392
                        break;
 
393
                case BZ_STREAM_END: /* Finishing: all done */
 
394
                        /* Only occurs in finishing case */
 
395
                        return (ARCHIVE_OK);
 
396
                default:
 
397
                        /* Any other return value indicates an error */
 
398
                        archive_set_error(&a->archive,
 
399
                            ARCHIVE_ERRNO_PROGRAMMER,
 
400
                            "Bzip2 compression failed;"
 
401
                            " BZ2_bzCompress() returned %d",
 
402
                            ret);
 
403
                        return (ARCHIVE_FATAL);
 
404
                }
 
405
        }
 
406
}
 
407
 
 
408
#endif /* HAVE_BZLIB_H && BZ_CONFIG_ERROR */