~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_write_set_compression_program.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2007 Joerg Sonnenberger
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
 
 
28
__FBSDID("$FreeBSD: head/lib/libarchive/archive_write_set_compression_program.c 201104 2009-12-28 03:14:30Z kientzle $");
 
29
 
 
30
/* This capability is only available on POSIX systems. */
 
31
#if (!defined(HAVE_PIPE) || !defined(HAVE_FCNTL) || \
 
32
    !(defined(HAVE_FORK) || defined(HAVE_VFORK))) && (!defined(_WIN32) || defined(__CYGWIN__))
 
33
#include "archive.h"
 
34
 
 
35
/*
 
36
 * On non-Posix systems, allow the program to build, but choke if
 
37
 * this function is actually invoked.
 
38
 */
 
39
int
 
40
archive_write_set_compression_program(struct archive *_a, const char *cmd)
 
41
{
 
42
        archive_set_error(_a, -1,
 
43
            "External compression programs not supported on this platform");
 
44
        return (ARCHIVE_FATAL);
 
45
}
 
46
 
 
47
#else
 
48
 
 
49
#ifdef HAVE_SYS_WAIT_H
 
50
#  include <sys/wait.h>
 
51
#endif
 
52
#ifdef HAVE_ERRNO_H
 
53
#  include <errno.h>
 
54
#endif
 
55
#ifdef HAVE_FCNTL_H
 
56
#  include <fcntl.h>
 
57
#endif
 
58
#ifdef HAVE_STDLIB_H
 
59
#  include <stdlib.h>
 
60
#endif
 
61
#ifdef HAVE_STRING_H
 
62
#  include <string.h>
 
63
#endif
 
64
 
 
65
#include "archive.h"
 
66
#include "archive_private.h"
 
67
#include "archive_write_private.h"
 
68
 
 
69
#include "filter_fork.h"
 
70
 
 
71
struct private_data {
 
72
        char            *description;
 
73
        pid_t            child;
 
74
        int              child_stdin, child_stdout;
 
75
 
 
76
        char            *child_buf;
 
77
        size_t           child_buf_len, child_buf_avail;
 
78
};
 
79
 
 
80
static int      archive_compressor_program_finish(struct archive_write *);
 
81
static int      archive_compressor_program_init(struct archive_write *);
 
82
static int      archive_compressor_program_write(struct archive_write *,
 
83
                    const void *, size_t);
 
84
 
 
85
/*
 
86
 * Allocate, initialize and return a archive object.
 
87
 */
 
88
int
 
89
archive_write_set_compression_program(struct archive *_a, const char *cmd)
 
90
{
 
91
        struct archive_write *a = (struct archive_write *)_a;
 
92
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
93
            ARCHIVE_STATE_NEW, "archive_write_set_compression_program");
 
94
        a->compressor.init = &archive_compressor_program_init;
 
95
        a->compressor.config = strdup(cmd);
 
96
        return (ARCHIVE_OK);
 
97
}
 
98
 
 
99
/*
 
100
 * Setup callback.
 
101
 */
 
102
static int
 
103
archive_compressor_program_init(struct archive_write *a)
 
104
{
 
105
        int ret;
 
106
        struct private_data *state;
 
107
        static const char *prefix = "Program: ";
 
108
        char *cmd = a->compressor.config;
 
109
 
 
110
        if (a->client_opener != NULL) {
 
111
                ret = (a->client_opener)(&a->archive, a->client_data);
 
112
                if (ret != ARCHIVE_OK)
 
113
                        return (ret);
 
114
        }
 
115
 
 
116
        state = (struct private_data *)malloc(sizeof(*state));
 
117
        if (state == NULL) {
 
118
                archive_set_error(&a->archive, ENOMEM,
 
119
                    "Can't allocate data for compression");
 
120
                return (ARCHIVE_FATAL);
 
121
        }
 
122
        memset(state, 0, sizeof(*state));
 
123
 
 
124
        a->archive.compression_code = ARCHIVE_COMPRESSION_PROGRAM;
 
125
        state->description = (char *)malloc(strlen(prefix) + strlen(cmd) + 1);
 
126
        strcpy(state->description, prefix);
 
127
        strcat(state->description, cmd);
 
128
        a->archive.compression_name = state->description;
 
129
 
 
130
        state->child_buf_len = a->bytes_per_block;
 
131
        state->child_buf_avail = 0;
 
132
        state->child_buf = malloc(state->child_buf_len);
 
133
 
 
134
        if (state->child_buf == NULL) {
 
135
                archive_set_error(&a->archive, ENOMEM,
 
136
                    "Can't allocate data for compression buffer");
 
137
                free(state);
 
138
                return (ARCHIVE_FATAL);
 
139
        }
 
140
 
 
141
        if ((state->child = __archive_create_child(cmd,
 
142
                 &state->child_stdin, &state->child_stdout)) == -1) {
 
143
                archive_set_error(&a->archive, EINVAL,
 
144
                    "Can't initialise filter");
 
145
                free(state->child_buf);
 
146
                free(state);
 
147
                return (ARCHIVE_FATAL);
 
148
        }
 
149
 
 
150
        a->compressor.write = archive_compressor_program_write;
 
151
        a->compressor.finish = archive_compressor_program_finish;
 
152
 
 
153
        a->compressor.data = state;
 
154
        return (0);
 
155
}
 
156
 
 
157
static ssize_t
 
158
child_write(struct archive_write *a, const char *buf, size_t buf_len)
 
159
{
 
160
        struct private_data *state = a->compressor.data;
 
161
        ssize_t ret;
 
162
 
 
163
        if (state->child_stdin == -1)
 
164
                return (-1);
 
165
 
 
166
        if (buf_len == 0)
 
167
                return (-1);
 
168
 
 
169
restart_write:
 
170
        do {
 
171
                ret = write(state->child_stdin, buf, buf_len);
 
172
        } while (ret == -1 && errno == EINTR);
 
173
 
 
174
        if (ret > 0)
 
175
                return (ret);
 
176
        if (ret == 0) {
 
177
                close(state->child_stdin);
 
178
                state->child_stdin = -1;
 
179
                fcntl(state->child_stdout, F_SETFL, 0);
 
180
                return (0);
 
181
        }
 
182
        if (ret == -1 && errno != EAGAIN)
 
183
                return (-1);
 
184
 
 
185
        if (state->child_stdout == -1) {
 
186
                fcntl(state->child_stdin, F_SETFL, 0);
 
187
                __archive_check_child(state->child_stdin, state->child_stdout);
 
188
                goto restart_write;
 
189
        }
 
190
 
 
191
        do {
 
192
                ret = read(state->child_stdout,
 
193
                    state->child_buf + state->child_buf_avail,
 
194
                    state->child_buf_len - state->child_buf_avail);
 
195
        } while (ret == -1 && errno == EINTR);
 
196
 
 
197
        if (ret == 0 || (ret == -1 && errno == EPIPE)) {
 
198
                close(state->child_stdout);
 
199
                state->child_stdout = -1;
 
200
                fcntl(state->child_stdin, F_SETFL, 0);
 
201
                goto restart_write;
 
202
        }
 
203
        if (ret == -1 && errno == EAGAIN) {
 
204
                __archive_check_child(state->child_stdin, state->child_stdout);
 
205
                goto restart_write;
 
206
        }
 
207
        if (ret == -1)
 
208
                return (-1);
 
209
 
 
210
        state->child_buf_avail += ret;
 
211
 
 
212
        ret = (a->client_writer)(&a->archive, a->client_data,
 
213
            state->child_buf, state->child_buf_avail);
 
214
        if (ret <= 0)
 
215
                return (-1);
 
216
 
 
217
        if ((size_t)ret < state->child_buf_avail) {
 
218
                memmove(state->child_buf, state->child_buf + ret,
 
219
                    state->child_buf_avail - ret);
 
220
        }
 
221
        state->child_buf_avail -= ret;
 
222
        a->archive.raw_position += ret;
 
223
        goto restart_write;
 
224
}
 
225
 
 
226
/*
 
227
 * Write data to the compressed stream.
 
228
 */
 
229
static int
 
230
archive_compressor_program_write(struct archive_write *a, const void *buff,
 
231
    size_t length)
 
232
{
 
233
        ssize_t ret;
 
234
        const char *buf;
 
235
 
 
236
        if (a->client_writer == NULL) {
 
237
                archive_set_error(&a->archive, ARCHIVE_ERRNO_PROGRAMMER,
 
238
                    "No write callback is registered?  "
 
239
                    "This is probably an internal programming error.");
 
240
                return (ARCHIVE_FATAL);
 
241
        }
 
242
 
 
243
        buf = buff;
 
244
        while (length > 0) {
 
245
                ret = child_write(a, buf, length);
 
246
                if (ret == -1 || ret == 0) {
 
247
                        archive_set_error(&a->archive, EIO,
 
248
                            "Can't write to filter");
 
249
                        return (ARCHIVE_FATAL);
 
250
                }
 
251
                length -= ret;
 
252
                buf += ret;
 
253
        }
 
254
 
 
255
        a->archive.file_position += length;
 
256
        return (ARCHIVE_OK);
 
257
}
 
258
 
 
259
 
 
260
/*
 
261
 * Finish the compression...
 
262
 */
 
263
static int
 
264
archive_compressor_program_finish(struct archive_write *a)
 
265
{
 
266
        int ret, status;
 
267
        ssize_t bytes_read, bytes_written;
 
268
        struct private_data *state;
 
269
 
 
270
        state = (struct private_data *)a->compressor.data;
 
271
        ret = 0;
 
272
        if (a->client_writer == NULL) {
 
273
                archive_set_error(&a->archive, ARCHIVE_ERRNO_PROGRAMMER,
 
274
                    "No write callback is registered?  "
 
275
                    "This is probably an internal programming error.");
 
276
                ret = ARCHIVE_FATAL;
 
277
                goto cleanup;
 
278
        }
 
279
 
 
280
        /* XXX pad compressed data. */
 
281
 
 
282
        close(state->child_stdin);
 
283
        state->child_stdin = -1;
 
284
        fcntl(state->child_stdout, F_SETFL, 0);
 
285
 
 
286
        for (;;) {
 
287
                do {
 
288
                        bytes_read = read(state->child_stdout,
 
289
                            state->child_buf + state->child_buf_avail,
 
290
                            state->child_buf_len - state->child_buf_avail);
 
291
                } while (bytes_read == -1 && errno == EINTR);
 
292
 
 
293
                if (bytes_read == 0 || (bytes_read == -1 && errno == EPIPE))
 
294
                        break;
 
295
 
 
296
                if (bytes_read == -1) {
 
297
                        archive_set_error(&a->archive, errno,
 
298
                            "Read from filter failed unexpectedly.");
 
299
                        ret = ARCHIVE_FATAL;
 
300
                        goto cleanup;
 
301
                }
 
302
                state->child_buf_avail += bytes_read;
 
303
 
 
304
                bytes_written = (a->client_writer)(&a->archive, a->client_data,
 
305
                    state->child_buf, state->child_buf_avail);
 
306
                if (bytes_written <= 0) {
 
307
                        ret = ARCHIVE_FATAL;
 
308
                        goto cleanup;
 
309
                }
 
310
                if ((size_t)bytes_written < state->child_buf_avail) {
 
311
                        memmove(state->child_buf,
 
312
                            state->child_buf + bytes_written,
 
313
                            state->child_buf_avail - bytes_written);
 
314
                }
 
315
                state->child_buf_avail -= bytes_written;
 
316
                a->archive.raw_position += bytes_written;
 
317
        }
 
318
 
 
319
        /* XXX pad final compressed block. */
 
320
 
 
321
cleanup:
 
322
        /* Shut down the child. */
 
323
        if (state->child_stdin != -1)
 
324
                close(state->child_stdin);
 
325
        if (state->child_stdout != -1)
 
326
                close(state->child_stdout);
 
327
        while (waitpid(state->child, &status, 0) == -1 && errno == EINTR)
 
328
                continue;
 
329
 
 
330
        if (status != 0) {
 
331
                archive_set_error(&a->archive, EIO,
 
332
                    "Filter exited with failure.");
 
333
                ret = ARCHIVE_FATAL;
 
334
        }
 
335
 
 
336
        /* Release our configuration data. */
 
337
        free(a->compressor.config);
 
338
        a->compressor.config = NULL;
 
339
 
 
340
        /* Release our private state data. */
 
341
        free(state->child_buf);
 
342
        free(state->description);
 
343
        free(state);
 
344
        return (ret);
 
345
}
 
346
 
 
347
#endif /* !defined(HAVE_PIPE) || !defined(HAVE_VFORK) || !defined(HAVE_FCNTL) */