~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_extract.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
__FBSDID("$FreeBSD: src/lib/libarchive/archive_read_extract.c,v 1.61 2008/05/26 17:00:22 kientzle Exp $");
 
28
 
 
29
#ifdef HAVE_SYS_TYPES_H
 
30
#include <sys/types.h>
 
31
#endif
 
32
#ifdef HAVE_ERRNO_H
 
33
#include <errno.h>
 
34
#endif
 
35
#ifdef HAVE_STDLIB_H
 
36
#include <stdlib.h>
 
37
#endif
 
38
#ifdef HAVE_STRING_H
 
39
#include <string.h>
 
40
#endif
 
41
 
 
42
#include "archive.h"
 
43
#include "archive_private.h"
 
44
#include "archive_read_private.h"
 
45
#include "archive_write_disk_private.h"
 
46
 
 
47
struct extract {
 
48
        struct archive *ad; /* archive_write_disk object */
 
49
 
 
50
        /* Progress function invoked during extract. */
 
51
        void                    (*extract_progress)(void *);
 
52
        void                     *extract_progress_user_data;
 
53
};
 
54
 
 
55
static int      archive_read_extract_cleanup(struct archive_read *);
 
56
static int      copy_data(struct archive *ar, struct archive *aw);
 
57
static struct extract *get_extract(struct archive_read *);
 
58
 
 
59
static struct extract *
 
60
get_extract(struct archive_read *a)
 
61
{
 
62
        /* If we haven't initialized, do it now. */
 
63
        /* This also sets up a lot of global state. */
 
64
        if (a->extract == NULL) {
 
65
                a->extract = (struct extract *)malloc(sizeof(*a->extract));
 
66
                if (a->extract == NULL) {
 
67
                        archive_set_error(&a->archive, ENOMEM, "Can't extract");
 
68
                        return (NULL);
 
69
                }
 
70
                memset(a->extract, 0, sizeof(*a->extract));
 
71
                a->extract->ad = archive_write_disk_new();
 
72
                if (a->extract->ad == NULL) {
 
73
                        archive_set_error(&a->archive, ENOMEM, "Can't extract");
 
74
                        return (NULL);
 
75
                }
 
76
                archive_write_disk_set_standard_lookup(a->extract->ad);
 
77
                a->cleanup_archive_extract = archive_read_extract_cleanup;
 
78
        }
 
79
        return (a->extract);
 
80
}
 
81
 
 
82
int
 
83
archive_read_extract(struct archive *_a, struct archive_entry *entry, int flags)
 
84
{
 
85
        struct extract *extract;
 
86
 
 
87
        extract = get_extract((struct archive_read *)_a);
 
88
        if (extract == NULL)
 
89
                return (ARCHIVE_FATAL);
 
90
        archive_write_disk_set_options(extract->ad, flags);
 
91
        return (archive_read_extract2(_a, entry, extract->ad));
 
92
}
 
93
 
 
94
int
 
95
archive_read_extract2(struct archive *_a, struct archive_entry *entry,
 
96
    struct archive *ad)
 
97
{
 
98
        struct archive_read *a = (struct archive_read *)_a;
 
99
        int r, r2;
 
100
 
 
101
        /* Set up for this particular entry. */
 
102
        archive_write_disk_set_skip_file(ad,
 
103
            a->skip_file_dev, a->skip_file_ino);
 
104
        r = archive_write_header(ad, entry);
 
105
        if (r < ARCHIVE_WARN)
 
106
                r = ARCHIVE_WARN;
 
107
        if (r != ARCHIVE_OK)
 
108
                /* If _write_header failed, copy the error. */
 
109
                archive_copy_error(&a->archive, ad);
 
110
        else
 
111
                /* Otherwise, pour data into the entry. */
 
112
                r = copy_data(_a, ad);
 
113
        r2 = archive_write_finish_entry(ad);
 
114
        if (r2 < ARCHIVE_WARN)
 
115
                r2 = ARCHIVE_WARN;
 
116
        /* Use the first message. */
 
117
        if (r2 != ARCHIVE_OK && r == ARCHIVE_OK)
 
118
                archive_copy_error(&a->archive, ad);
 
119
        /* Use the worst error return. */
 
120
        if (r2 < r)
 
121
                r = r2;
 
122
        return (r);
 
123
}
 
124
 
 
125
void
 
126
archive_read_extract_set_progress_callback(struct archive *_a,
 
127
    void (*progress_func)(void *), void *user_data)
 
128
{
 
129
        struct archive_read *a = (struct archive_read *)_a;
 
130
        struct extract *extract = get_extract(a);
 
131
        if (extract != NULL) {
 
132
                extract->extract_progress = progress_func;
 
133
                extract->extract_progress_user_data = user_data;
 
134
        }
 
135
}
 
136
 
 
137
static int
 
138
copy_data(struct archive *ar, struct archive *aw)
 
139
{
 
140
        off_t offset;
 
141
        const void *buff;
 
142
        struct extract *extract;
 
143
        size_t size;
 
144
        int r;
 
145
 
 
146
        extract = get_extract((struct archive_read *)ar);
 
147
        for (;;) {
 
148
                r = archive_read_data_block(ar, &buff, &size, &offset);
 
149
                if (r == ARCHIVE_EOF)
 
150
                        return (ARCHIVE_OK);
 
151
                if (r != ARCHIVE_OK)
 
152
                        return (r);
 
153
                r = archive_write_data_block(aw, buff, size, offset);
 
154
                if (r < ARCHIVE_WARN)
 
155
                        r = ARCHIVE_WARN;
 
156
                if (r != ARCHIVE_OK) {
 
157
                        archive_set_error(ar, archive_errno(aw),
 
158
                            "%s", archive_error_string(aw));
 
159
                        return (r);
 
160
                }
 
161
                if (extract->extract_progress)
 
162
                        (extract->extract_progress)
 
163
                            (extract->extract_progress_user_data);
 
164
        }
 
165
}
 
166
 
 
167
/*
 
168
 * Cleanup function for archive_extract.
 
169
 */
 
170
static int
 
171
archive_read_extract_cleanup(struct archive_read *a)
 
172
{
 
173
        int ret = ARCHIVE_OK;
 
174
 
 
175
#if ARCHIVE_API_VERSION > 1
 
176
        ret =
 
177
#endif
 
178
            archive_write_finish(a->extract->ad);
 
179
        free(a->extract);
 
180
        a->extract = NULL;
 
181
        return (ret);
 
182
}