~akopytov/percona-xtrabackup/bug1166888-2.0

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_write_set_compression_compress.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2008 Joerg Sonnenberger
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
/*-
 
27
 * Copyright (c) 1985, 1986, 1992, 1993
 
28
 *      The Regents of the University of California.  All rights reserved.
 
29
 *
 
30
 * This code is derived from software contributed to Berkeley by
 
31
 * Diomidis Spinellis and James A. Woods, derived from original
 
32
 * work by Spencer Thomas and Joseph Orost.
 
33
 *
 
34
 * Redistribution and use in source and binary forms, with or without
 
35
 * modification, are permitted provided that the following conditions
 
36
 * are met:
 
37
 * 1. Redistributions of source code must retain the above copyright
 
38
 *    notice, this list of conditions and the following disclaimer.
 
39
 * 2. Redistributions in binary form must reproduce the above copyright
 
40
 *    notice, this list of conditions and the following disclaimer in the
 
41
 *    documentation and/or other materials provided with the distribution.
 
42
 * 3. Neither the name of the University nor the names of its contributors
 
43
 *    may be used to endorse or promote products derived from this software
 
44
 *    without specific prior written permission.
 
45
 *
 
46
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 
47
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
48
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
49
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 
50
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 
51
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 
52
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 
53
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
54
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 
55
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 
56
 * SUCH DAMAGE.
 
57
 */
 
58
 
 
59
#include "archive_platform.h"
 
60
 
 
61
__FBSDID("$FreeBSD: head/lib/libarchive/archive_write_set_compression_compress.c 201111 2009-12-28 03:33:05Z kientzle $");
 
62
 
 
63
#ifdef HAVE_ERRNO_H
 
64
#include <errno.h>
 
65
#endif
 
66
#ifdef HAVE_STDLIB_H
 
67
#include <stdlib.h>
 
68
#endif
 
69
#ifdef HAVE_STRING_H
 
70
#include <string.h>
 
71
#endif
 
72
 
 
73
#include "archive.h"
 
74
#include "archive_private.h"
 
75
#include "archive_write_private.h"
 
76
 
 
77
#define HSIZE           69001   /* 95% occupancy */
 
78
#define HSHIFT          8       /* 8 - trunc(log2(HSIZE / 65536)) */
 
79
#define CHECK_GAP 10000         /* Ratio check interval. */
 
80
 
 
81
#define MAXCODE(bits)   ((1 << (bits)) - 1)
 
82
 
 
83
/*
 
84
 * the next two codes should not be changed lightly, as they must not
 
85
 * lie within the contiguous general code space.
 
86
 */
 
87
#define FIRST   257             /* First free entry. */
 
88
#define CLEAR   256             /* Table clear output code. */
 
89
 
 
90
struct private_data {
 
91
        off_t in_count, out_count, checkpoint;
 
92
 
 
93
        int code_len;                   /* Number of bits/code. */
 
94
        int cur_maxcode;                /* Maximum code, given n_bits. */
 
95
        int max_maxcode;                /* Should NEVER generate this code. */
 
96
        int hashtab [HSIZE];
 
97
        unsigned short codetab [HSIZE];
 
98
        int first_free;         /* First unused entry. */
 
99
        int compress_ratio;
 
100
 
 
101
        int cur_code, cur_fcode;
 
102
 
 
103
        int bit_offset;
 
104
        unsigned char bit_buf;
 
105
 
 
106
        unsigned char   *compressed;
 
107
        size_t           compressed_buffer_size;
 
108
        size_t           compressed_offset;
 
109
};
 
110
 
 
111
static int      archive_compressor_compress_finish(struct archive_write *);
 
112
static int      archive_compressor_compress_init(struct archive_write *);
 
113
static int      archive_compressor_compress_write(struct archive_write *,
 
114
                    const void *, size_t);
 
115
 
 
116
/*
 
117
 * Allocate, initialize and return a archive object.
 
118
 */
 
119
int
 
120
archive_write_set_compression_compress(struct archive *_a)
 
121
{
 
122
        struct archive_write *a = (struct archive_write *)_a;
 
123
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
124
            ARCHIVE_STATE_NEW, "archive_write_set_compression_compress");
 
125
        a->compressor.init = &archive_compressor_compress_init;
 
126
        a->archive.compression_code = ARCHIVE_COMPRESSION_COMPRESS;
 
127
        a->archive.compression_name = "compress";
 
128
        return (ARCHIVE_OK);
 
129
}
 
130
 
 
131
/*
 
132
 * Setup callback.
 
133
 */
 
134
static int
 
135
archive_compressor_compress_init(struct archive_write *a)
 
136
{
 
137
        int ret;
 
138
        struct private_data *state;
 
139
 
 
140
        a->archive.compression_code = ARCHIVE_COMPRESSION_COMPRESS;
 
141
        a->archive.compression_name = "compress";
 
142
 
 
143
        if (a->bytes_per_block < 4) {
 
144
                archive_set_error(&a->archive, EINVAL,
 
145
                    "Can't write Compress header as single block");
 
146
                return (ARCHIVE_FATAL);
 
147
        }
 
148
 
 
149
        if (a->client_opener != NULL) {
 
150
                ret = (a->client_opener)(&a->archive, a->client_data);
 
151
                if (ret != ARCHIVE_OK)
 
152
                        return (ret);
 
153
        }
 
154
 
 
155
        state = (struct private_data *)malloc(sizeof(*state));
 
156
        if (state == NULL) {
 
157
                archive_set_error(&a->archive, ENOMEM,
 
158
                    "Can't allocate data for compression");
 
159
                return (ARCHIVE_FATAL);
 
160
        }
 
161
        memset(state, 0, sizeof(*state));
 
162
 
 
163
        state->compressed_buffer_size = a->bytes_per_block;
 
164
        state->compressed = malloc(state->compressed_buffer_size);
 
165
 
 
166
        if (state->compressed == NULL) {
 
167
                archive_set_error(&a->archive, ENOMEM,
 
168
                    "Can't allocate data for compression buffer");
 
169
                free(state);
 
170
                return (ARCHIVE_FATAL);
 
171
        }
 
172
 
 
173
        a->compressor.write = archive_compressor_compress_write;
 
174
        a->compressor.finish = archive_compressor_compress_finish;
 
175
 
 
176
        state->max_maxcode = 0x10000;   /* Should NEVER generate this code. */
 
177
        state->in_count = 0;            /* Length of input. */
 
178
        state->bit_buf = 0;
 
179
        state->bit_offset = 0;
 
180
        state->out_count = 3;           /* Includes 3-byte header mojo. */
 
181
        state->compress_ratio = 0;
 
182
        state->checkpoint = CHECK_GAP;
 
183
        state->code_len = 9;
 
184
        state->cur_maxcode = MAXCODE(state->code_len);
 
185
        state->first_free = FIRST;
 
186
 
 
187
        memset(state->hashtab, 0xff, sizeof(state->hashtab));
 
188
 
 
189
        /* Prime output buffer with a gzip header. */
 
190
        state->compressed[0] = 0x1f; /* Compress */
 
191
        state->compressed[1] = 0x9d;
 
192
        state->compressed[2] = 0x90; /* Block mode, 16bit max */
 
193
        state->compressed_offset = 3;
 
194
 
 
195
        a->compressor.data = state;
 
196
        return (0);
 
197
}
 
198
 
 
199
/*-
 
200
 * Output the given code.
 
201
 * Inputs:
 
202
 *      code:   A n_bits-bit integer.  If == -1, then EOF.  This assumes
 
203
 *              that n_bits =< (long)wordsize - 1.
 
204
 * Outputs:
 
205
 *      Outputs code to the file.
 
206
 * Assumptions:
 
207
 *      Chars are 8 bits long.
 
208
 * Algorithm:
 
209
 *      Maintain a BITS character long buffer (so that 8 codes will
 
210
 * fit in it exactly).  Use the VAX insv instruction to insert each
 
211
 * code in turn.  When the buffer fills up empty it and start over.
 
212
 */
 
213
 
 
214
static unsigned char rmask[9] =
 
215
        {0x00, 0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f, 0xff};
 
216
 
 
217
static int
 
218
output_byte(struct archive_write *a, unsigned char c)
 
219
{
 
220
        struct private_data *state = a->compressor.data;
 
221
        ssize_t bytes_written;
 
222
 
 
223
        state->compressed[state->compressed_offset++] = c;
 
224
        ++state->out_count;
 
225
 
 
226
        if (state->compressed_buffer_size == state->compressed_offset) {
 
227
                bytes_written = (a->client_writer)(&a->archive,
 
228
                    a->client_data,
 
229
                    state->compressed, state->compressed_buffer_size);
 
230
                if (bytes_written <= 0)
 
231
                        return ARCHIVE_FATAL;
 
232
                a->archive.raw_position += bytes_written;
 
233
                state->compressed_offset = 0;
 
234
        }
 
235
 
 
236
        return ARCHIVE_OK;
 
237
}
 
238
 
 
239
static int
 
240
output_code(struct archive_write *a, int ocode)
 
241
{
 
242
        struct private_data *state = a->compressor.data;
 
243
        int bits, ret, clear_flg, bit_offset;
 
244
 
 
245
        clear_flg = ocode == CLEAR;
 
246
 
 
247
        /*
 
248
         * Since ocode is always >= 8 bits, only need to mask the first
 
249
         * hunk on the left.
 
250
         */
 
251
        bit_offset = state->bit_offset % 8;
 
252
        state->bit_buf |= (ocode << bit_offset) & 0xff;
 
253
        output_byte(a, state->bit_buf);
 
254
 
 
255
        bits = state->code_len - (8 - bit_offset);
 
256
        ocode >>= 8 - bit_offset;
 
257
        /* Get any 8 bit parts in the middle (<=1 for up to 16 bits). */
 
258
        if (bits >= 8) {
 
259
                output_byte(a, ocode & 0xff);
 
260
                ocode >>= 8;
 
261
                bits -= 8;
 
262
        }
 
263
        /* Last bits. */
 
264
        state->bit_offset += state->code_len;
 
265
        state->bit_buf = ocode & rmask[bits];
 
266
        if (state->bit_offset == state->code_len * 8)
 
267
                state->bit_offset = 0;
 
268
 
 
269
        /*
 
270
         * If the next entry is going to be too big for the ocode size,
 
271
         * then increase it, if possible.
 
272
         */
 
273
        if (clear_flg || state->first_free > state->cur_maxcode) {
 
274
               /*
 
275
                * Write the whole buffer, because the input side won't
 
276
                * discover the size increase until after it has read it.
 
277
                */
 
278
                if (state->bit_offset > 0) {
 
279
                        while (state->bit_offset < state->code_len * 8) {
 
280
                                ret = output_byte(a, state->bit_buf);
 
281
                                if (ret != ARCHIVE_OK)
 
282
                                        return ret;
 
283
                                state->bit_offset += 8;
 
284
                                state->bit_buf = 0;
 
285
                        }
 
286
                }
 
287
                state->bit_buf = 0;
 
288
                state->bit_offset = 0;
 
289
 
 
290
                if (clear_flg) {
 
291
                        state->code_len = 9;
 
292
                        state->cur_maxcode = MAXCODE(state->code_len);
 
293
                } else {
 
294
                        state->code_len++;
 
295
                        if (state->code_len == 16)
 
296
                                state->cur_maxcode = state->max_maxcode;
 
297
                        else
 
298
                                state->cur_maxcode = MAXCODE(state->code_len);
 
299
                }
 
300
        }
 
301
 
 
302
        return (ARCHIVE_OK);
 
303
}
 
304
 
 
305
static int
 
306
output_flush(struct archive_write *a)
 
307
{
 
308
        struct private_data *state = a->compressor.data;
 
309
        int ret;
 
310
 
 
311
        /* At EOF, write the rest of the buffer. */
 
312
        if (state->bit_offset % 8) {
 
313
                state->code_len = (state->bit_offset % 8 + 7) / 8;
 
314
                ret = output_byte(a, state->bit_buf);
 
315
                if (ret != ARCHIVE_OK)
 
316
                        return ret;
 
317
        }
 
318
 
 
319
        return (ARCHIVE_OK);
 
320
}
 
321
 
 
322
/*
 
323
 * Write data to the compressed stream.
 
324
 */
 
325
static int
 
326
archive_compressor_compress_write(struct archive_write *a, const void *buff,
 
327
    size_t length)
 
328
{
 
329
        struct private_data *state;
 
330
        int i;
 
331
        int ratio;
 
332
        int c, disp, ret;
 
333
        const unsigned char *bp;
 
334
 
 
335
        state = (struct private_data *)a->compressor.data;
 
336
        if (a->client_writer == NULL) {
 
337
                archive_set_error(&a->archive, ARCHIVE_ERRNO_PROGRAMMER,
 
338
                    "No write callback is registered?  "
 
339
                    "This is probably an internal programming error.");
 
340
                return (ARCHIVE_FATAL);
 
341
        }
 
342
 
 
343
        if (length == 0)
 
344
                return ARCHIVE_OK;
 
345
 
 
346
        bp = buff;
 
347
 
 
348
        if (state->in_count == 0) {
 
349
                state->cur_code = *bp++;
 
350
                ++state->in_count;
 
351
                --length;
 
352
        }
 
353
 
 
354
        while (length--) {
 
355
                c = *bp++;
 
356
                state->in_count++;
 
357
                state->cur_fcode = (c << 16) + state->cur_code;
 
358
                i = ((c << HSHIFT) ^ state->cur_code);  /* Xor hashing. */
 
359
 
 
360
                if (state->hashtab[i] == state->cur_fcode) {
 
361
                        state->cur_code = state->codetab[i];
 
362
                        continue;
 
363
                }
 
364
                if (state->hashtab[i] < 0)      /* Empty slot. */
 
365
                        goto nomatch;
 
366
                /* Secondary hash (after G. Knott). */
 
367
                if (i == 0)
 
368
                        disp = 1;
 
369
                else
 
370
                        disp = HSIZE - i;
 
371
 probe:         
 
372
                if ((i -= disp) < 0)
 
373
                        i += HSIZE;
 
374
 
 
375
                if (state->hashtab[i] == state->cur_fcode) {
 
376
                        state->cur_code = state->codetab[i];
 
377
                        continue;
 
378
                }
 
379
                if (state->hashtab[i] >= 0)
 
380
                        goto probe;
 
381
 nomatch:       
 
382
                ret = output_code(a, state->cur_code);
 
383
                if (ret != ARCHIVE_OK)
 
384
                        return ret;
 
385
                state->cur_code = c;
 
386
                if (state->first_free < state->max_maxcode) {
 
387
                        state->codetab[i] = state->first_free++;        /* code -> hashtable */
 
388
                        state->hashtab[i] = state->cur_fcode;
 
389
                        continue;
 
390
                }
 
391
                if (state->in_count < state->checkpoint)
 
392
                        continue;
 
393
 
 
394
                state->checkpoint = state->in_count + CHECK_GAP;
 
395
 
 
396
                if (state->in_count <= 0x007fffff)
 
397
                        ratio = state->in_count * 256 / state->out_count;
 
398
                else if ((ratio = state->out_count / 256) == 0)
 
399
                        ratio = 0x7fffffff;
 
400
                else
 
401
                        ratio = state->in_count / ratio;
 
402
 
 
403
                if (ratio > state->compress_ratio)
 
404
                        state->compress_ratio = ratio;
 
405
                else {
 
406
                        state->compress_ratio = 0;
 
407
                        memset(state->hashtab, 0xff, sizeof(state->hashtab));
 
408
                        state->first_free = FIRST;
 
409
                        ret = output_code(a, CLEAR);
 
410
                        if (ret != ARCHIVE_OK)
 
411
                                return ret;
 
412
                }
 
413
        }
 
414
 
 
415
        return (ARCHIVE_OK);
 
416
}
 
417
 
 
418
 
 
419
/*
 
420
 * Finish the compression...
 
421
 */
 
422
static int
 
423
archive_compressor_compress_finish(struct archive_write *a)
 
424
{
 
425
        ssize_t block_length, target_block_length, bytes_written;
 
426
        int ret;
 
427
        struct private_data *state;
 
428
        size_t tocopy;
 
429
 
 
430
        state = (struct private_data *)a->compressor.data;
 
431
        if (a->client_writer == NULL) {
 
432
                archive_set_error(&a->archive, ARCHIVE_ERRNO_PROGRAMMER,
 
433
                    "No write callback is registered?  "
 
434
                    "This is probably an internal programming error.");
 
435
                ret = ARCHIVE_FATAL;
 
436
                goto cleanup;
 
437
        }
 
438
 
 
439
        /* By default, always pad the uncompressed data. */
 
440
        if (a->pad_uncompressed) {
 
441
                while (state->in_count % a->bytes_per_block != 0) {
 
442
                        tocopy = a->bytes_per_block -
 
443
                            (state->in_count % a->bytes_per_block);
 
444
                        if (tocopy > a->null_length)
 
445
                                tocopy = a->null_length;
 
446
                        ret = archive_compressor_compress_write(a, a->nulls,
 
447
                            tocopy);
 
448
                        if (ret != ARCHIVE_OK)
 
449
                                goto cleanup;
 
450
                }
 
451
        }
 
452
 
 
453
        ret = output_code(a, state->cur_code);
 
454
        if (ret != ARCHIVE_OK)
 
455
                goto cleanup;
 
456
        ret = output_flush(a);
 
457
        if (ret != ARCHIVE_OK)
 
458
                goto cleanup;
 
459
 
 
460
        /* Optionally, pad the final compressed block. */
 
461
        block_length = state->compressed_offset;
 
462
 
 
463
        /* Tricky calculation to determine size of last block. */
 
464
        if (a->bytes_in_last_block <= 0)
 
465
                /* Default or Zero: pad to full block */
 
466
                target_block_length = a->bytes_per_block;
 
467
        else
 
468
                /* Round length to next multiple of bytes_in_last_block. */
 
469
                target_block_length = a->bytes_in_last_block *
 
470
                    ( (block_length + a->bytes_in_last_block - 1) /
 
471
                        a->bytes_in_last_block);
 
472
        if (target_block_length > a->bytes_per_block)
 
473
                target_block_length = a->bytes_per_block;
 
474
        if (block_length < target_block_length) {
 
475
                memset(state->compressed + state->compressed_offset, 0,
 
476
                    target_block_length - block_length);
 
477
                block_length = target_block_length;
 
478
        }
 
479
 
 
480
        /* Write the last block */
 
481
        bytes_written = (a->client_writer)(&a->archive, a->client_data,
 
482
            state->compressed, block_length);
 
483
        if (bytes_written <= 0)
 
484
                ret = ARCHIVE_FATAL;
 
485
        else
 
486
                a->archive.raw_position += bytes_written;
 
487
 
 
488
cleanup:
 
489
        free(state->compressed);
 
490
        free(state);
 
491
        return (ret);
 
492
}