~gl-az/percona-xtrabackup/2.1-io-block-size

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_support_compression_compress.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
/*
 
27
 * This code borrows heavily from "compress" source code, which is
 
28
 * protected by the following copyright.  (Clause 3 dropped by request
 
29
 * of the Regents.)
 
30
 */
 
31
 
 
32
/*-
 
33
 * Copyright (c) 1985, 1986, 1992, 1993
 
34
 *      The Regents of the University of California.  All rights reserved.
 
35
 *
 
36
 * This code is derived from software contributed to Berkeley by
 
37
 * Diomidis Spinellis and James A. Woods, derived from original
 
38
 * work by Spencer Thomas and Joseph Orost.
 
39
 *
 
40
 * Redistribution and use in source and binary forms, with or without
 
41
 * modification, are permitted provided that the following conditions
 
42
 * are met:
 
43
 * 1. Redistributions of source code must retain the above copyright
 
44
 *    notice, this list of conditions and the following disclaimer.
 
45
 * 2. Redistributions in binary form must reproduce the above copyright
 
46
 *    notice, this list of conditions and the following disclaimer in the
 
47
 *    documentation and/or other materials provided with the distribution.
 
48
 * 4. Neither the name of the University nor the names of its contributors
 
49
 *    may be used to endorse or promote products derived from this software
 
50
 *    without specific prior written permission.
 
51
 *
 
52
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 
53
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
54
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
55
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 
56
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 
57
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 
58
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 
59
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
60
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 
61
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 
62
 * SUCH DAMAGE.
 
63
 */
 
64
 
 
65
 
 
66
#include "archive_platform.h"
 
67
__FBSDID("$FreeBSD: head/lib/libarchive/archive_read_support_compression_compress.c 201094 2009-12-28 02:29:21Z kientzle $");
 
68
 
 
69
#ifdef HAVE_ERRNO_H
 
70
#include <errno.h>
 
71
#endif
 
72
#ifdef HAVE_STDLIB_H
 
73
#include <stdlib.h>
 
74
#endif
 
75
#ifdef HAVE_STRING_H
 
76
#include <string.h>
 
77
#endif
 
78
#ifdef HAVE_UNISTD_H
 
79
#include <unistd.h>
 
80
#endif
 
81
 
 
82
#include "archive.h"
 
83
#include "archive_private.h"
 
84
#include "archive_read_private.h"
 
85
 
 
86
/*
 
87
 * Because LZW decompression is pretty simple, I've just implemented
 
88
 * the whole decompressor here (cribbing from "compress" source code,
 
89
 * of course), rather than relying on an external library.  I have
 
90
 * made an effort to clarify and simplify the algorithm, so the
 
91
 * names and structure here don't exactly match those used by compress.
 
92
 */
 
93
 
 
94
struct private_data {
 
95
        /* Input variables. */
 
96
        const unsigned char     *next_in;
 
97
        size_t                   avail_in;
 
98
        int                      bit_buffer;
 
99
        int                      bits_avail;
 
100
        size_t                   bytes_in_section;
 
101
 
 
102
        /* Output variables. */
 
103
        size_t                   out_block_size;
 
104
        void                    *out_block;
 
105
 
 
106
        /* Decompression status variables. */
 
107
        int                      use_reset_code;
 
108
        int                      end_of_stream; /* EOF status. */
 
109
        int                      maxcode;       /* Largest code. */
 
110
        int                      maxcode_bits;  /* Length of largest code. */
 
111
        int                      section_end_code; /* When to increase bits. */
 
112
        int                      bits;          /* Current code length. */
 
113
        int                      oldcode;       /* Previous code. */
 
114
        int                      finbyte;       /* Last byte of prev code. */
 
115
 
 
116
        /* Dictionary. */
 
117
        int                      free_ent;       /* Next dictionary entry. */
 
118
        unsigned char            suffix[65536];
 
119
        uint16_t                 prefix[65536];
 
120
 
 
121
        /*
 
122
         * Scratch area for expanding dictionary entries.  Note:
 
123
         * "worst" case here comes from compressing /dev/zero: the
 
124
         * last code in the dictionary will code a sequence of
 
125
         * 65536-256 zero bytes.  Thus, we need stack space to expand
 
126
         * a 65280-byte dictionary entry.  (Of course, 32640:1
 
127
         * compression could also be considered the "best" case. ;-)
 
128
         */
 
129
        unsigned char           *stackp;
 
130
        unsigned char            stack[65300];
 
131
};
 
132
 
 
133
static int      compress_bidder_bid(struct archive_read_filter_bidder *, struct archive_read_filter *);
 
134
static int      compress_bidder_init(struct archive_read_filter *);
 
135
static int      compress_bidder_free(struct archive_read_filter_bidder *);
 
136
 
 
137
static ssize_t  compress_filter_read(struct archive_read_filter *, const void **);
 
138
static int      compress_filter_close(struct archive_read_filter *);
 
139
 
 
140
static int      getbits(struct archive_read_filter *, int n);
 
141
static int      next_code(struct archive_read_filter *);
 
142
 
 
143
int
 
144
archive_read_support_compression_compress(struct archive *_a)
 
145
{
 
146
        struct archive_read *a = (struct archive_read *)_a;
 
147
        struct archive_read_filter_bidder *bidder = __archive_read_get_bidder(a);
 
148
 
 
149
        if (bidder == NULL)
 
150
                return (ARCHIVE_FATAL);
 
151
 
 
152
        bidder->data = NULL;
 
153
        bidder->bid = compress_bidder_bid;
 
154
        bidder->init = compress_bidder_init;
 
155
        bidder->options = NULL;
 
156
        bidder->free = compress_bidder_free;
 
157
        return (ARCHIVE_OK);
 
158
}
 
159
 
 
160
/*
 
161
 * Test whether we can handle this data.
 
162
 *
 
163
 * This logic returns zero if any part of the signature fails.  It
 
164
 * also tries to Do The Right Thing if a very short buffer prevents us
 
165
 * from verifying as much as we would like.
 
166
 */
 
167
static int
 
168
compress_bidder_bid(struct archive_read_filter_bidder *self,
 
169
    struct archive_read_filter *filter)
 
170
{
 
171
        const unsigned char *buffer;
 
172
        ssize_t avail;
 
173
        int bits_checked;
 
174
 
 
175
        (void)self; /* UNUSED */
 
176
 
 
177
        buffer = __archive_read_filter_ahead(filter, 2, &avail);
 
178
 
 
179
        if (buffer == NULL)
 
180
                return (0);
 
181
 
 
182
        bits_checked = 0;
 
183
        if (buffer[0] != 037)   /* Verify first ID byte. */
 
184
                return (0);
 
185
        bits_checked += 8;
 
186
 
 
187
        if (buffer[1] != 0235)  /* Verify second ID byte. */
 
188
                return (0);
 
189
        bits_checked += 8;
 
190
 
 
191
        /*
 
192
         * TODO: Verify more.
 
193
         */
 
194
 
 
195
        return (bits_checked);
 
196
}
 
197
 
 
198
/*
 
199
 * Setup the callbacks.
 
200
 */
 
201
static int
 
202
compress_bidder_init(struct archive_read_filter *self)
 
203
{
 
204
        struct private_data *state;
 
205
        static const size_t out_block_size = 64 * 1024;
 
206
        void *out_block;
 
207
        int code;
 
208
 
 
209
        self->code = ARCHIVE_COMPRESSION_COMPRESS;
 
210
        self->name = "compress (.Z)";
 
211
 
 
212
        state = (struct private_data *)calloc(sizeof(*state), 1);
 
213
        out_block = malloc(out_block_size);
 
214
        if (state == NULL || out_block == NULL) {
 
215
                free(out_block);
 
216
                free(state);
 
217
                archive_set_error(&self->archive->archive, ENOMEM,
 
218
                    "Can't allocate data for %s decompression",
 
219
                    self->name);
 
220
                return (ARCHIVE_FATAL);
 
221
        }
 
222
 
 
223
        self->data = state;
 
224
        state->out_block_size = out_block_size;
 
225
        state->out_block = out_block;
 
226
        self->read = compress_filter_read;
 
227
        self->skip = NULL; /* not supported */
 
228
        self->close = compress_filter_close;
 
229
 
 
230
        /* XXX MOVE THE FOLLOWING OUT OF INIT() XXX */
 
231
 
 
232
        (void)getbits(self, 8); /* Skip first signature byte. */
 
233
        (void)getbits(self, 8); /* Skip second signature byte. */
 
234
 
 
235
        code = getbits(self, 8);
 
236
        state->maxcode_bits = code & 0x1f;
 
237
        state->maxcode = (1 << state->maxcode_bits);
 
238
        state->use_reset_code = code & 0x80;
 
239
 
 
240
        /* Initialize decompressor. */
 
241
        state->free_ent = 256;
 
242
        state->stackp = state->stack;
 
243
        if (state->use_reset_code)
 
244
                state->free_ent++;
 
245
        state->bits = 9;
 
246
        state->section_end_code = (1<<state->bits) - 1;
 
247
        state->oldcode = -1;
 
248
        for (code = 255; code >= 0; code--) {
 
249
                state->prefix[code] = 0;
 
250
                state->suffix[code] = code;
 
251
        }
 
252
        next_code(self);
 
253
 
 
254
        return (ARCHIVE_OK);
 
255
}
 
256
 
 
257
/*
 
258
 * Return a block of data from the decompression buffer.  Decompress more
 
259
 * as necessary.
 
260
 */
 
261
static ssize_t
 
262
compress_filter_read(struct archive_read_filter *self, const void **pblock)
 
263
{
 
264
        struct private_data *state;
 
265
        unsigned char *p, *start, *end;
 
266
        int ret;
 
267
 
 
268
        state = (struct private_data *)self->data;
 
269
        if (state->end_of_stream) {
 
270
                *pblock = NULL;
 
271
                return (0);
 
272
        }
 
273
        p = start = (unsigned char *)state->out_block;
 
274
        end = start + state->out_block_size;
 
275
 
 
276
        while (p < end && !state->end_of_stream) {
 
277
                if (state->stackp > state->stack) {
 
278
                        *p++ = *--state->stackp;
 
279
                } else {
 
280
                        ret = next_code(self);
 
281
                        if (ret == -1)
 
282
                                state->end_of_stream = ret;
 
283
                        else if (ret != ARCHIVE_OK)
 
284
                                return (ret);
 
285
                }
 
286
        }
 
287
 
 
288
        *pblock = start;
 
289
        return (p - start);
 
290
}
 
291
 
 
292
/*
 
293
 * Clean up the reader.
 
294
 */
 
295
static int
 
296
compress_bidder_free(struct archive_read_filter_bidder *self)
 
297
{
 
298
        self->data = NULL;
 
299
        return (ARCHIVE_OK);
 
300
}
 
301
 
 
302
/*
 
303
 * Close and release the filter.
 
304
 */
 
305
static int
 
306
compress_filter_close(struct archive_read_filter *self)
 
307
{
 
308
        struct private_data *state = (struct private_data *)self->data;
 
309
 
 
310
        free(state->out_block);
 
311
        free(state);
 
312
        return (ARCHIVE_OK);
 
313
}
 
314
 
 
315
/*
 
316
 * Process the next code and fill the stack with the expansion
 
317
 * of the code.  Returns ARCHIVE_FATAL if there is a fatal I/O or
 
318
 * format error, ARCHIVE_EOF if we hit end of data, ARCHIVE_OK otherwise.
 
319
 */
 
320
static int
 
321
next_code(struct archive_read_filter *self)
 
322
{
 
323
        struct private_data *state = (struct private_data *)self->data;
 
324
        int code, newcode;
 
325
 
 
326
        static int debug_buff[1024];
 
327
        static unsigned debug_index;
 
328
 
 
329
        code = newcode = getbits(self, state->bits);
 
330
        if (code < 0)
 
331
                return (code);
 
332
 
 
333
        debug_buff[debug_index++] = code;
 
334
        if (debug_index >= sizeof(debug_buff)/sizeof(debug_buff[0]))
 
335
                debug_index = 0;
 
336
 
 
337
        /* If it's a reset code, reset the dictionary. */
 
338
        if ((code == 256) && state->use_reset_code) {
 
339
                /*
 
340
                 * The original 'compress' implementation blocked its
 
341
                 * I/O in a manner that resulted in junk bytes being
 
342
                 * inserted after every reset.  The next section skips
 
343
                 * this junk.  (Yes, the number of *bytes* to skip is
 
344
                 * a function of the current *bit* length.)
 
345
                 */
 
346
                int skip_bytes =  state->bits -
 
347
                    (state->bytes_in_section % state->bits);
 
348
                skip_bytes %= state->bits;
 
349
                state->bits_avail = 0; /* Discard rest of this byte. */
 
350
                while (skip_bytes-- > 0) {
 
351
                        code = getbits(self, 8);
 
352
                        if (code < 0)
 
353
                                return (code);
 
354
                }
 
355
                /* Now, actually do the reset. */
 
356
                state->bytes_in_section = 0;
 
357
                state->bits = 9;
 
358
                state->section_end_code = (1 << state->bits) - 1;
 
359
                state->free_ent = 257;
 
360
                state->oldcode = -1;
 
361
                return (next_code(self));
 
362
        }
 
363
 
 
364
        if (code > state->free_ent) {
 
365
                /* An invalid code is a fatal error. */
 
366
                archive_set_error(&(self->archive->archive), -1,
 
367
                    "Invalid compressed data");
 
368
                return (ARCHIVE_FATAL);
 
369
        }
 
370
 
 
371
        /* Special case for KwKwK string. */
 
372
        if (code >= state->free_ent) {
 
373
                *state->stackp++ = state->finbyte;
 
374
                code = state->oldcode;
 
375
        }
 
376
 
 
377
        /* Generate output characters in reverse order. */
 
378
        while (code >= 256) {
 
379
                *state->stackp++ = state->suffix[code];
 
380
                code = state->prefix[code];
 
381
        }
 
382
        *state->stackp++ = state->finbyte = code;
 
383
 
 
384
        /* Generate the new entry. */
 
385
        code = state->free_ent;
 
386
        if (code < state->maxcode && state->oldcode >= 0) {
 
387
                state->prefix[code] = state->oldcode;
 
388
                state->suffix[code] = state->finbyte;
 
389
                ++state->free_ent;
 
390
        }
 
391
        if (state->free_ent > state->section_end_code) {
 
392
                state->bits++;
 
393
                state->bytes_in_section = 0;
 
394
                if (state->bits == state->maxcode_bits)
 
395
                        state->section_end_code = state->maxcode;
 
396
                else
 
397
                        state->section_end_code = (1 << state->bits) - 1;
 
398
        }
 
399
 
 
400
        /* Remember previous code. */
 
401
        state->oldcode = newcode;
 
402
        return (ARCHIVE_OK);
 
403
}
 
404
 
 
405
/*
 
406
 * Return next 'n' bits from stream.
 
407
 *
 
408
 * -1 indicates end of available data.
 
409
 */
 
410
static int
 
411
getbits(struct archive_read_filter *self, int n)
 
412
{
 
413
        struct private_data *state = (struct private_data *)self->data;
 
414
        int code;
 
415
        ssize_t ret;
 
416
        static const int mask[] = {
 
417
                0x00, 0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f, 0xff,
 
418
                0x1ff, 0x3ff, 0x7ff, 0xfff, 0x1fff, 0x3fff, 0x7fff, 0xffff
 
419
        };
 
420
 
 
421
        while (state->bits_avail < n) {
 
422
                if (state->avail_in <= 0) {
 
423
                        state->next_in
 
424
                            = __archive_read_filter_ahead(self->upstream,
 
425
                                1, &ret);
 
426
                        if (ret == 0)
 
427
                                return (-1);
 
428
                        if (ret < 0 || state->next_in == NULL)
 
429
                                return (ARCHIVE_FATAL);
 
430
                        state->avail_in = ret;
 
431
                        __archive_read_filter_consume(self->upstream, ret);
 
432
                }
 
433
                state->bit_buffer |= *state->next_in++ << state->bits_avail;
 
434
                state->avail_in--;
 
435
                state->bits_avail += 8;
 
436
                state->bytes_in_section++;
 
437
        }
 
438
 
 
439
        code = state->bit_buffer;
 
440
        state->bit_buffer >>= n;
 
441
        state->bits_avail -= n;
 
442
 
 
443
        return (code & mask[n]);
 
444
}