~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_support_compression_rpm.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and metadata files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2009 Michihiro NAKAJIMA
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
 
 
28
#ifdef HAVE_ERRNO_H
 
29
#include <errno.h>
 
30
#endif
 
31
#ifdef HAVE_STDLIB_H
 
32
#include <stdlib.h>
 
33
#endif
 
34
 
 
35
#include "archive.h"
 
36
#include "archive_endian.h"
 
37
#include "archive_private.h"
 
38
#include "archive_read_private.h"
 
39
 
 
40
/*
 * Private state of one rpm read filter instance.  The filter walks the
 * upstream bytes through the states below and only passes data on once
 * it has reached the 'Archive' section.
 */
struct rpm {
        /* Number of upstream bytes consumed so far. */
        int64_t          total_in;
        /* Bytes of the current 'Header' section processed so far. */
        size_t           hpos;
        /* Total length of the current 'Header' section (0 until known). */
        size_t           hlen;
        /* Copy of the first 16 bytes of the current 'Header' section. */
        unsigned char    header[16];
        enum {
                /* Skipping the fixed-size 'Lead' section. */
                ST_LEAD,
                /* Collecting the first 16 bytes of a 'Header' section. */
                ST_HEADER,
                /* Skipping the remainder of a 'Header' section. */
                ST_HEADER_DATA,
                /* Skipping zero padding that follows a 'Header' section. */
                ST_PADDING,
                /* Passing the 'Archive' section through. */
                ST_ARCHIVE
        }                state;
        /* Non-zero until the first valid 'Header' magic has been seen. */
        int              first_header;
};

/* Size of the 'Lead' section, in bytes. */
#define RPM_LEAD_SIZE   96
 
57
 
 
58
/* Bidder callbacks: recognise an rpm stream and set up a filter for it. */
static int      rpm_bidder_bid(struct archive_read_filter_bidder *self,
                    struct archive_read_filter *filter);
static int      rpm_bidder_init(struct archive_read_filter *self);

/* Filter callbacks: produce the 'Archive' section and release state. */
static ssize_t  rpm_filter_read(struct archive_read_filter *self,
                    const void **buff);
static int      rpm_filter_close(struct archive_read_filter *self);
 
65
 
 
66
int
 
67
archive_read_support_compression_rpm(struct archive *_a)
 
68
{
 
69
        struct archive_read *a = (struct archive_read *)_a;
 
70
        struct archive_read_filter_bidder *bidder;
 
71
 
 
72
        bidder = __archive_read_get_bidder(a);
 
73
        archive_clear_error(_a);
 
74
        if (bidder == NULL)
 
75
                return (ARCHIVE_FATAL);
 
76
 
 
77
        bidder->data = NULL;
 
78
        bidder->bid = rpm_bidder_bid;
 
79
        bidder->init = rpm_bidder_init;
 
80
        bidder->options = NULL;
 
81
        bidder->free = NULL;
 
82
        return (ARCHIVE_OK);
 
83
}
 
84
 
 
85
/*
 * Decide whether the upstream data looks like an rpm package.
 *
 * Peeks at the first 8 bytes and returns the number of bits that were
 * verified when every check passes, or 0 when the data is unavailable
 * or any check fails.
 */
static int
rpm_bidder_bid(struct archive_read_filter_bidder *self,
    struct archive_read_filter *filter)
{
        /* Header Magic Bytes : 0xed 0xab 0xee 0xdb */
        static const unsigned char lead_magic[4] = {
                0xed, 0xab, 0xee, 0xdb
        };
        const unsigned char *lead;
        ssize_t avail;
        int i, score;

        (void)self; /* UNUSED */

        lead = __archive_read_filter_ahead(filter, 8, &avail);
        if (lead == NULL)
                return (0);

        score = 0;
        for (i = 0; i < 4; i++) {
                if (lead[i] != lead_magic[i])
                        return (0);
                score += 8;
        }

        /* Major version must be 3 or 4; byte 5 is not examined. */
        if (lead[4] != 3 && lead[4] != 4)
                return (0);
        score += 8;

        /* Package type (binary or source): bytes 6-7 hold 0 or 1. */
        if (lead[6] != 0)
                return (0);
        score += 8;
        if (lead[7] != 0 && lead[7] != 1)
                return (0);
        score += 8;

        return (score);
}
 
133
 
 
134
static int
 
135
rpm_bidder_init(struct archive_read_filter *self)
 
136
{
 
137
        struct rpm   *rpm;
 
138
 
 
139
        self->code = ARCHIVE_COMPRESSION_RPM;
 
140
        self->name = "rpm";
 
141
        self->read = rpm_filter_read;
 
142
        self->skip = NULL; /* not supported */
 
143
        self->close = rpm_filter_close;
 
144
 
 
145
        rpm = (struct rpm *)calloc(sizeof(*rpm), 1);
 
146
        if (rpm == NULL) {
 
147
                archive_set_error(&self->archive->archive, ENOMEM,
 
148
                    "Can't allocate data for rpm");
 
149
                return (ARCHIVE_FATAL);
 
150
        }
 
151
 
 
152
        self->data = rpm;
 
153
        rpm->state = ST_LEAD;
 
154
 
 
155
        return (ARCHIVE_OK);
 
156
}
 
157
 
 
158
static ssize_t
 
159
rpm_filter_read(struct archive_read_filter *self, const void **buff)
 
160
{
 
161
        struct rpm *rpm;
 
162
        const unsigned char *b;
 
163
        ssize_t avail_in, total;
 
164
        size_t used, n;
 
165
        uint32_t section;
 
166
        uint32_t bytes;
 
167
 
 
168
        rpm = (struct rpm *)self->data;
 
169
        *buff = NULL;
 
170
        total = avail_in = 0;
 
171
        b = NULL;
 
172
        used = 0;
 
173
        do {
 
174
                if (b == NULL) {
 
175
                        b = __archive_read_filter_ahead(self->upstream, 1,
 
176
                            &avail_in);
 
177
                        if (b == NULL) {
 
178
                                if (avail_in < 0)
 
179
                                        return (ARCHIVE_FATAL);
 
180
                                else
 
181
                                        break;
 
182
                        }
 
183
                }
 
184
 
 
185
                switch (rpm->state) {
 
186
                case ST_LEAD:
 
187
                        if (rpm->total_in + avail_in < RPM_LEAD_SIZE)
 
188
                                used += avail_in;
 
189
                        else {
 
190
                                n = RPM_LEAD_SIZE - rpm->total_in;
 
191
                                used += n;
 
192
                                b += n;
 
193
                                rpm->state = ST_HEADER;
 
194
                                rpm->hpos = 0;
 
195
                                rpm->hlen = 0;
 
196
                                rpm->first_header = 1;
 
197
                        }
 
198
                        break;
 
199
                case ST_HEADER:
 
200
                        n = 16 - rpm->hpos;
 
201
                        if (n > avail_in - used)
 
202
                                n = avail_in - used;
 
203
                        memcpy(rpm->header+rpm->hpos, b, n);
 
204
                        b += n;
 
205
                        used += n;
 
206
                        rpm->hpos += n;
 
207
 
 
208
                        if (rpm->hpos == 16) {
 
209
                                if (rpm->header[0] != 0x8e ||
 
210
                                    rpm->header[1] != 0xad ||
 
211
                                    rpm->header[2] != 0xe8 ||
 
212
                                    rpm->header[3] != 0x01) {
 
213
                                        if (rpm->first_header) {
 
214
                                                archive_set_error(
 
215
                                                    &self->archive->archive,
 
216
                                                    ARCHIVE_ERRNO_FILE_FORMAT,
 
217
                                                    "Unrecoginized rpm header");
 
218
                                                return (ARCHIVE_FATAL);
 
219
                                        }
 
220
                                        rpm->state = ST_ARCHIVE;
 
221
                                        *buff = rpm->header;
 
222
                                        total = rpm->hpos;
 
223
                                        break;
 
224
                                }
 
225
                                /* Calculate 'Header' length. */
 
226
                                section = archive_be32dec(rpm->header+8);
 
227
                                bytes = archive_be32dec(rpm->header+12);
 
228
                                rpm->hlen = 16 + section * 16 + bytes;
 
229
                                rpm->state = ST_HEADER_DATA;
 
230
                                rpm->first_header = 0;
 
231
                        }
 
232
                        break;
 
233
                case ST_HEADER_DATA:
 
234
                        n = rpm->hlen - rpm->hpos;
 
235
                        if (n > avail_in - used)
 
236
                                n = avail_in - used;
 
237
                        b += n;
 
238
                        used += n;
 
239
                        rpm->hpos += n;
 
240
                        if (rpm->hpos == rpm->hlen)
 
241
                                rpm->state = ST_PADDING;
 
242
                        break;
 
243
                case ST_PADDING:
 
244
                        while (used < (size_t)avail_in) {
 
245
                                if (*b != 0) {
 
246
                                        /* Read next header. */
 
247
                                        rpm->state = ST_HEADER;
 
248
                                        rpm->hpos = 0;
 
249
                                        rpm->hlen = 0;
 
250
                                        break;
 
251
                                }
 
252
                                b++;
 
253
                                used++;
 
254
                        }
 
255
                        break;
 
256
                case ST_ARCHIVE:
 
257
                        *buff = b;
 
258
                        total = avail_in;
 
259
                        used = avail_in;
 
260
                        break;
 
261
                }
 
262
                if (used == (size_t)avail_in) {
 
263
                        rpm->total_in += used;
 
264
                        __archive_read_filter_consume(self->upstream, used);
 
265
                        b = NULL;
 
266
                        used = 0;
 
267
                }
 
268
        } while (total == 0 && avail_in > 0);
 
269
 
 
270
        if (used > 0 && b != NULL) {
 
271
                rpm->total_in += used;
 
272
                __archive_read_filter_consume(self->upstream, used);
 
273
        }
 
274
        return (total);
 
275
}
 
276
 
 
277
static int
 
278
rpm_filter_close(struct archive_read_filter *self)
 
279
{
 
280
        struct rpm *rpm;
 
281
 
 
282
        rpm = (struct rpm *)self->data;
 
283
        free(rpm);
 
284
 
 
285
        return (ARCHIVE_OK);
 
286
}
 
287