~stewart/percona-xtrabackup/bug1213036

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_support_compression_program.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2007 Joerg Sonnenberger
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
__FBSDID("$FreeBSD: head/lib/libarchive/archive_read_support_compression_program.c 201112 2009-12-28 06:59:35Z kientzle $");
 
28
 
 
29
#ifdef HAVE_SYS_WAIT_H
 
30
#  include <sys/wait.h>
 
31
#endif
 
32
#ifdef HAVE_ERRNO_H
 
33
#  include <errno.h>
 
34
#endif
 
35
#ifdef HAVE_FCNTL_H
 
36
#  include <fcntl.h>
 
37
#endif
 
38
#ifdef HAVE_LIMITS_H
 
39
#  include <limits.h>
 
40
#endif
 
41
#ifdef HAVE_SIGNAL_H
 
42
#  include <signal.h>
 
43
#endif
 
44
#ifdef HAVE_STDLIB_H
 
45
#  include <stdlib.h>
 
46
#endif
 
47
#ifdef HAVE_STRING_H
 
48
#  include <string.h>
 
49
#endif
 
50
#ifdef HAVE_UNISTD_H
 
51
#  include <unistd.h>
 
52
#endif
 
53
 
 
54
#include "archive.h"
 
55
#include "archive_private.h"
 
56
#include "archive_read_private.h"
 
57
 
 
58
int
 
59
archive_read_support_compression_program(struct archive *a, const char *cmd)
 
60
{
 
61
        return (archive_read_support_compression_program_signature(a, cmd, NULL, 0));
 
62
}
 
63
 
 
64
 
 
65
/* This capability is only available on POSIX systems. */
 
66
#if (!defined(HAVE_PIPE) || !defined(HAVE_FCNTL) || \
 
67
    !(defined(HAVE_FORK) || defined(HAVE_VFORK))) && (!defined(_WIN32) || defined(__CYGWIN__))
 
68
 
 
69
/*
 
70
 * On non-Posix systems, allow the program to build, but choke if
 
71
 * this function is actually invoked.
 
72
 */
 
73
int
 
74
archive_read_support_compression_program_signature(struct archive *_a,
 
75
    const char *cmd, void *signature, size_t signature_len)
 
76
{
 
77
        (void)_a; /* UNUSED */
 
78
        (void)cmd; /* UNUSED */
 
79
        (void)signature; /* UNUSED */
 
80
        (void)signature_len; /* UNUSED */
 
81
 
 
82
        archive_set_error(_a, -1,
 
83
            "External compression programs not supported on this platform");
 
84
        return (ARCHIVE_FATAL);
 
85
}
 
86
 
 
87
int
 
88
__archive_read_program(struct archive_read_filter *self, const char *cmd)
 
89
{
 
90
        (void)self; /* UNUSED */
 
91
        (void)cmd; /* UNUSED */
 
92
 
 
93
        archive_set_error(&self->archive->archive, -1,
 
94
            "External compression programs not supported on this platform");
 
95
        return (ARCHIVE_FATAL);
 
96
}
 
97
 
 
98
#else
 
99
 
 
100
#include "filter_fork.h"
 
101
 
 
102
/*
 
103
 * The bidder object stores the command and the signature to watch for.
 
104
 * The 'inhibit' entry here is used to ensure that unchecked filters never
 
105
 * bid twice in the same pipeline.
 
106
 */
 
107
struct program_bidder {
 
108
        char *cmd;
 
109
        void *signature;
 
110
        size_t signature_len;
 
111
        int inhibit;
 
112
};
 
113
 
 
114
static int      program_bidder_bid(struct archive_read_filter_bidder *,
 
115
                    struct archive_read_filter *upstream);
 
116
static int      program_bidder_init(struct archive_read_filter *);
 
117
static int      program_bidder_free(struct archive_read_filter_bidder *);
 
118
 
 
119
/*
 
120
 * The actual filter needs to track input and output data.
 
121
 */
 
122
struct program_filter {
 
123
        char            *description;
 
124
        pid_t            child;
 
125
        int              exit_status;
 
126
        int              waitpid_return;
 
127
        int              child_stdin, child_stdout;
 
128
 
 
129
        char            *out_buf;
 
130
        size_t           out_buf_len;
 
131
};
 
132
 
 
133
static ssize_t  program_filter_read(struct archive_read_filter *,
 
134
                    const void **);
 
135
static int      program_filter_close(struct archive_read_filter *);
 
136
 
 
137
int
 
138
archive_read_support_compression_program_signature(struct archive *_a,
 
139
    const char *cmd, const void *signature, size_t signature_len)
 
140
{
 
141
        struct archive_read *a = (struct archive_read *)_a;
 
142
        struct archive_read_filter_bidder *bidder;
 
143
        struct program_bidder *state;
 
144
 
 
145
        /*
 
146
         * Get a bidder object from the read core.
 
147
         */
 
148
        bidder = __archive_read_get_bidder(a);
 
149
        if (bidder == NULL)
 
150
                return (ARCHIVE_FATAL);
 
151
 
 
152
        /*
 
153
         * Allocate our private state.
 
154
         */
 
155
        state = (struct program_bidder *)calloc(sizeof (*state), 1);
 
156
        if (state == NULL)
 
157
                return (ARCHIVE_FATAL);
 
158
        state->cmd = strdup(cmd);
 
159
        if (signature != NULL && signature_len > 0) {
 
160
                state->signature_len = signature_len;
 
161
                state->signature = malloc(signature_len);
 
162
                memcpy(state->signature, signature, signature_len);
 
163
        }
 
164
 
 
165
        /*
 
166
         * Fill in the bidder object.
 
167
         */
 
168
        bidder->data = state;
 
169
        bidder->bid = program_bidder_bid;
 
170
        bidder->init = program_bidder_init;
 
171
        bidder->options = NULL;
 
172
        bidder->free = program_bidder_free;
 
173
        return (ARCHIVE_OK);
 
174
}
 
175
 
 
176
static int
 
177
program_bidder_free(struct archive_read_filter_bidder *self)
 
178
{
 
179
        struct program_bidder *state = (struct program_bidder *)self->data;
 
180
        free(state->cmd);
 
181
        free(state->signature);
 
182
        free(self->data);
 
183
        return (ARCHIVE_OK);
 
184
}
 
185
 
 
186
/*
 
187
 * If we do have a signature, bid only if that matches.
 
188
 *
 
189
 * If there's no signature, we bid INT_MAX the first time
 
190
 * we're called, then never bid again.
 
191
 */
 
192
static int
 
193
program_bidder_bid(struct archive_read_filter_bidder *self,
 
194
    struct archive_read_filter *upstream)
 
195
{
 
196
        struct program_bidder *state = self->data;
 
197
        const char *p;
 
198
 
 
199
        /* If we have a signature, use that to match. */
 
200
        if (state->signature_len > 0) {
 
201
                p = __archive_read_filter_ahead(upstream,
 
202
                    state->signature_len, NULL);
 
203
                if (p == NULL)
 
204
                        return (0);
 
205
                /* No match, so don't bid. */
 
206
                if (memcmp(p, state->signature, state->signature_len) != 0)
 
207
                        return (0);
 
208
                return ((int)state->signature_len * 8);
 
209
        }
 
210
 
 
211
        /* Otherwise, bid once and then never bid again. */
 
212
        if (state->inhibit)
 
213
                return (0);
 
214
        state->inhibit = 1;
 
215
        return (INT_MAX);
 
216
}
 
217
 
 
218
/*
 
219
 * Shut down the child, return ARCHIVE_OK if it exited normally.
 
220
 *
 
221
 * Note that the return value is sticky; if we're called again,
 
222
 * we won't reap the child again, but we will return the same status
 
223
 * (including error message if the child came to a bad end).
 
224
 */
 
225
static int
 
226
child_stop(struct archive_read_filter *self, struct program_filter *state)
 
227
{
 
228
        /* Close our side of the I/O with the child. */
 
229
        if (state->child_stdin != -1) {
 
230
                close(state->child_stdin);
 
231
                state->child_stdin = -1;
 
232
        }
 
233
        if (state->child_stdout != -1) {
 
234
                close(state->child_stdout);
 
235
                state->child_stdout = -1;
 
236
        }
 
237
 
 
238
        if (state->child != 0) {
 
239
                /* Reap the child. */
 
240
                do {
 
241
                        state->waitpid_return
 
242
                            = waitpid(state->child, &state->exit_status, 0);
 
243
                } while (state->waitpid_return == -1 && errno == EINTR);
 
244
                state->child = 0;
 
245
        }
 
246
 
 
247
        if (state->waitpid_return < 0) {
 
248
                /* waitpid() failed?  This is ugly. */
 
249
                archive_set_error(&self->archive->archive, ARCHIVE_ERRNO_MISC,
 
250
                    "Child process exited badly");
 
251
                return (ARCHIVE_WARN);
 
252
        }
 
253
 
 
254
#if !defined(_WIN32) || defined(__CYGWIN__)
 
255
        if (WIFSIGNALED(state->exit_status)) {
 
256
#ifdef SIGPIPE
 
257
                /* If the child died because we stopped reading before
 
258
                 * it was done, that's okay.  Some archive formats
 
259
                 * have padding at the end that we routinely ignore. */
 
260
                /* The alternative to this would be to add a step
 
261
                 * before close(child_stdout) above to read from the
 
262
                 * child until the child has no more to write. */
 
263
                if (WTERMSIG(state->exit_status) == SIGPIPE)
 
264
                        return (ARCHIVE_OK);
 
265
#endif
 
266
                archive_set_error(&self->archive->archive, ARCHIVE_ERRNO_MISC,
 
267
                    "Child process exited with signal %d",
 
268
                    WTERMSIG(state->exit_status));
 
269
                return (ARCHIVE_WARN);
 
270
        }
 
271
#endif /* !_WIN32 || __CYGWIN__ */
 
272
 
 
273
        if (WIFEXITED(state->exit_status)) {
 
274
                if (WEXITSTATUS(state->exit_status) == 0)
 
275
                        return (ARCHIVE_OK);
 
276
 
 
277
                archive_set_error(&self->archive->archive,
 
278
                    ARCHIVE_ERRNO_MISC,
 
279
                    "Child process exited with status %d",
 
280
                    WEXITSTATUS(state->exit_status));
 
281
                return (ARCHIVE_WARN);
 
282
        }
 
283
 
 
284
        return (ARCHIVE_WARN);
 
285
}
 
286
 
 
287
/*
 
288
 * Use select() to decide whether the child is ready for read or write.
 
289
 */
 
290
static ssize_t
 
291
child_read(struct archive_read_filter *self, char *buf, size_t buf_len)
 
292
{
 
293
        struct program_filter *state = self->data;
 
294
        ssize_t ret, requested, avail;
 
295
        const char *p;
 
296
 
 
297
        requested = buf_len > SSIZE_MAX ? SSIZE_MAX : buf_len;
 
298
 
 
299
        for (;;) {
 
300
                do {
 
301
                        ret = read(state->child_stdout, buf, requested);
 
302
                } while (ret == -1 && errno == EINTR);
 
303
 
 
304
                if (ret > 0)
 
305
                        return (ret);
 
306
                if (ret == 0 || (ret == -1 && errno == EPIPE))
 
307
                        /* Child has closed its output; reap the child
 
308
                         * and return the status. */
 
309
                        return (child_stop(self, state));
 
310
                if (ret == -1 && errno != EAGAIN)
 
311
                        return (-1);
 
312
 
 
313
                if (state->child_stdin == -1) {
 
314
                        /* Block until child has some I/O ready. */
 
315
                        __archive_check_child(state->child_stdin,
 
316
                            state->child_stdout);
 
317
                        continue;
 
318
                }
 
319
 
 
320
                /* Get some more data from upstream. */
 
321
                p = __archive_read_filter_ahead(self->upstream, 1, &avail);
 
322
                if (p == NULL) {
 
323
                        close(state->child_stdin);
 
324
                        state->child_stdin = -1;
 
325
                        fcntl(state->child_stdout, F_SETFL, 0);
 
326
                        if (avail < 0)
 
327
                                return (avail);
 
328
                        continue;
 
329
                }
 
330
 
 
331
                do {
 
332
                        ret = write(state->child_stdin, p, avail);
 
333
                } while (ret == -1 && errno == EINTR);
 
334
 
 
335
                if (ret > 0) {
 
336
                        /* Consume whatever we managed to write. */
 
337
                        __archive_read_filter_consume(self->upstream, ret);
 
338
                } else if (ret == -1 && errno == EAGAIN) {
 
339
                        /* Block until child has some I/O ready. */
 
340
                        __archive_check_child(state->child_stdin,
 
341
                            state->child_stdout);
 
342
                } else {
 
343
                        /* Write failed. */
 
344
                        close(state->child_stdin);
 
345
                        state->child_stdin = -1;
 
346
                        fcntl(state->child_stdout, F_SETFL, 0);
 
347
                        /* If it was a bad error, we're done; otherwise
 
348
                         * it was EPIPE or EOF, and we can still read
 
349
                         * from the child. */
 
350
                        if (ret == -1 && errno != EPIPE)
 
351
                                return (-1);
 
352
                }
 
353
        }
 
354
}
 
355
 
 
356
int
 
357
__archive_read_program(struct archive_read_filter *self, const char *cmd)
 
358
{
 
359
        struct program_filter   *state;
 
360
        static const size_t out_buf_len = 65536;
 
361
        char *out_buf;
 
362
        char *description;
 
363
        const char *prefix = "Program: ";
 
364
 
 
365
        state = (struct program_filter *)calloc(1, sizeof(*state));
 
366
        out_buf = (char *)malloc(out_buf_len);
 
367
        description = (char *)malloc(strlen(prefix) + strlen(cmd) + 1);
 
368
        if (state == NULL || out_buf == NULL || description == NULL) {
 
369
                archive_set_error(&self->archive->archive, ENOMEM,
 
370
                    "Can't allocate input data");
 
371
                free(state);
 
372
                free(out_buf);
 
373
                free(description);
 
374
                return (ARCHIVE_FATAL);
 
375
        }
 
376
 
 
377
        self->code = ARCHIVE_COMPRESSION_PROGRAM;
 
378
        state->description = description;
 
379
        strcpy(state->description, prefix);
 
380
        strcat(state->description, cmd);
 
381
        self->name = state->description;
 
382
 
 
383
        state->out_buf = out_buf;
 
384
        state->out_buf_len = out_buf_len;
 
385
 
 
386
        if ((state->child = __archive_create_child(cmd,
 
387
                 &state->child_stdin, &state->child_stdout)) == -1) {
 
388
                free(state->out_buf);
 
389
                free(state);
 
390
                archive_set_error(&self->archive->archive, EINVAL,
 
391
                    "Can't initialise filter");
 
392
                return (ARCHIVE_FATAL);
 
393
        }
 
394
 
 
395
        self->data = state;
 
396
        self->read = program_filter_read;
 
397
        self->skip = NULL;
 
398
        self->close = program_filter_close;
 
399
 
 
400
        /* XXX Check that we can read at least one byte? */
 
401
        return (ARCHIVE_OK);
 
402
}
 
403
 
 
404
static int
 
405
program_bidder_init(struct archive_read_filter *self)
 
406
{
 
407
        struct program_bidder   *bidder_state;
 
408
 
 
409
        bidder_state = (struct program_bidder *)self->bidder->data;
 
410
        return (__archive_read_program(self, bidder_state->cmd));
 
411
}
 
412
 
 
413
static ssize_t
 
414
program_filter_read(struct archive_read_filter *self, const void **buff)
 
415
{
 
416
        struct program_filter *state;
 
417
        ssize_t bytes;
 
418
        size_t total;
 
419
        char *p;
 
420
 
 
421
        state = (struct program_filter *)self->data;
 
422
 
 
423
        total = 0;
 
424
        p = state->out_buf;
 
425
        while (state->child_stdout != -1 && total < state->out_buf_len) {
 
426
                bytes = child_read(self, p, state->out_buf_len - total);
 
427
                if (bytes < 0)
 
428
                        /* No recovery is possible if we can no longer
 
429
                         * read from the child. */
 
430
                        return (ARCHIVE_FATAL);
 
431
                if (bytes == 0)
 
432
                        /* We got EOF from the child. */
 
433
                        break;
 
434
                total += bytes;
 
435
                p += bytes;
 
436
        }
 
437
 
 
438
        *buff = state->out_buf;
 
439
        return (total);
 
440
}
 
441
 
 
442
static int
 
443
program_filter_close(struct archive_read_filter *self)
 
444
{
 
445
        struct program_filter   *state;
 
446
        int e;
 
447
 
 
448
        state = (struct program_filter *)self->data;
 
449
        e = child_stop(self, state);
 
450
 
 
451
        /* Release our private data. */
 
452
        free(state->out_buf);
 
453
        free(state->description);
 
454
        free(state);
 
455
 
 
456
        return (e);
 
457
}
 
458
 
 
459
#endif /* !defined(HAVE_PIPE) || !defined(HAVE_VFORK) || !defined(HAVE_FCNTL) */