~akopytov/percona-xtrabackup/bug1166888-2.0

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_write_set_format_cpio_newc.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * Copyright (c) 2006 Rudolf Marek SYSGO s.r.o.
 
4
 * All rights reserved.
 
5
 *
 
6
 * Redistribution and use in source and binary forms, with or without
 
7
 * modification, are permitted provided that the following conditions
 
8
 * are met:
 
9
 * 1. Redistributions of source code must retain the above copyright
 
10
 *    notice, this list of conditions and the following disclaimer.
 
11
 * 2. Redistributions in binary form must reproduce the above copyright
 
12
 *    notice, this list of conditions and the following disclaimer in the
 
13
 *    documentation and/or other materials provided with the distribution.
 
14
 *
 
15
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
16
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
17
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
18
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
19
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
20
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
21
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
22
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
23
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
24
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
25
 */
 
26
 
 
27
#include "archive_platform.h"
 
28
__FBSDID("$FreeBSD: head/lib/libarchive/archive_write_set_format_cpio_newc.c 201160 2009-12-29 05:41:57Z kientzle $");
 
29
 
 
30
#ifdef HAVE_ERRNO_H
 
31
#include <errno.h>
 
32
#endif
 
33
#include <stdio.h>
 
34
#ifdef HAVE_STDLIB_H
 
35
#include <stdlib.h>
 
36
#endif
 
37
#ifdef HAVE_STRING_H
 
38
#include <string.h>
 
39
#endif
 
40
 
 
41
#include "archive.h"
 
42
#include "archive_entry.h"
 
43
#include "archive_private.h"
 
44
#include "archive_write_private.h"
 
45
 
 
46
static ssize_t  archive_write_newc_data(struct archive_write *,
 
47
                    const void *buff, size_t s);
 
48
static int      archive_write_newc_finish(struct archive_write *);
 
49
static int      archive_write_newc_destroy(struct archive_write *);
 
50
static int      archive_write_newc_finish_entry(struct archive_write *);
 
51
static int      archive_write_newc_header(struct archive_write *,
 
52
                    struct archive_entry *);
 
53
static int      format_hex(int64_t, void *, int);
 
54
static int64_t  format_hex_recursive(int64_t, char *, int);
 
55
 
 
56
struct cpio {
 
57
        uint64_t          entry_bytes_remaining;
 
58
        int               padding;
 
59
};
 
60
 
 
61
struct cpio_header_newc {
 
62
        char    c_magic[6];
 
63
        char    c_ino[8];
 
64
        char    c_mode[8];
 
65
        char    c_uid[8];
 
66
        char    c_gid[8];
 
67
        char    c_nlink[8];
 
68
        char    c_mtime[8];
 
69
        char    c_filesize[8];
 
70
        char    c_devmajor[8];
 
71
        char    c_devminor[8];
 
72
        char    c_rdevmajor[8];
 
73
        char    c_rdevminor[8];
 
74
        char    c_namesize[8];
 
75
        char    c_checksum[8];
 
76
};
 
77
 
 
78
/* Logic trick: difference between 'n' and next multiple of 4 */
 
79
#define PAD4(n) (3 & (1 + ~(n)))
 
80
 
 
81
/*
 
82
 * Set output format to 'cpio' format.
 
83
 */
 
84
int
 
85
archive_write_set_format_cpio_newc(struct archive *_a)
 
86
{
 
87
        struct archive_write *a = (struct archive_write *)_a;
 
88
        struct cpio *cpio;
 
89
 
 
90
        /* If someone else was already registered, unregister them. */
 
91
        if (a->format_destroy != NULL)
 
92
                (a->format_destroy)(a);
 
93
 
 
94
        cpio = (struct cpio *)malloc(sizeof(*cpio));
 
95
        if (cpio == NULL) {
 
96
                archive_set_error(&a->archive, ENOMEM, "Can't allocate cpio data");
 
97
                return (ARCHIVE_FATAL);
 
98
        }
 
99
        memset(cpio, 0, sizeof(*cpio));
 
100
        a->format_data = cpio;
 
101
 
 
102
        a->pad_uncompressed = 1;
 
103
        a->format_name = "cpio";
 
104
        a->format_write_header = archive_write_newc_header;
 
105
        a->format_write_data = archive_write_newc_data;
 
106
        a->format_finish_entry = archive_write_newc_finish_entry;
 
107
        a->format_finish = archive_write_newc_finish;
 
108
        a->format_destroy = archive_write_newc_destroy;
 
109
        a->archive.archive_format = ARCHIVE_FORMAT_CPIO_SVR4_NOCRC;
 
110
        a->archive.archive_format_name = "SVR4 cpio nocrc";
 
111
        return (ARCHIVE_OK);
 
112
}
 
113
 
 
114
static int
 
115
archive_write_newc_header(struct archive_write *a, struct archive_entry *entry)
 
116
{
 
117
        int64_t ino;
 
118
        struct cpio *cpio;
 
119
        const char *p, *path;
 
120
        int pathlength, ret, ret2;
 
121
        struct cpio_header_newc  h;
 
122
        int pad;
 
123
 
 
124
        cpio = (struct cpio *)a->format_data;
 
125
        ret2 = ARCHIVE_OK;
 
126
 
 
127
        path = archive_entry_pathname(entry);
 
128
        pathlength = (int)strlen(path) + 1; /* Include trailing null. */
 
129
 
 
130
        memset(&h, 0, sizeof(h));
 
131
        format_hex(0x070701, &h.c_magic, sizeof(h.c_magic));
 
132
        format_hex(archive_entry_devmajor(entry), &h.c_devmajor,
 
133
            sizeof(h.c_devmajor));
 
134
        format_hex(archive_entry_devminor(entry), &h.c_devminor,
 
135
            sizeof(h.c_devminor));
 
136
 
 
137
        ino = archive_entry_ino64(entry);
 
138
        if (ino > 0xffffffff) {
 
139
                archive_set_error(&a->archive, ERANGE,
 
140
                    "large inode number truncated");
 
141
                ret2 = ARCHIVE_WARN;
 
142
        }
 
143
 
 
144
        format_hex(ino & 0xffffffff, &h.c_ino, sizeof(h.c_ino));
 
145
        format_hex(archive_entry_mode(entry), &h.c_mode, sizeof(h.c_mode));
 
146
        format_hex(archive_entry_uid(entry), &h.c_uid, sizeof(h.c_uid));
 
147
        format_hex(archive_entry_gid(entry), &h.c_gid, sizeof(h.c_gid));
 
148
        format_hex(archive_entry_nlink(entry), &h.c_nlink, sizeof(h.c_nlink));
 
149
        if (archive_entry_filetype(entry) == AE_IFBLK
 
150
            || archive_entry_filetype(entry) == AE_IFCHR) {
 
151
            format_hex(archive_entry_rdevmajor(entry), &h.c_rdevmajor, sizeof(h.c_rdevmajor));
 
152
            format_hex(archive_entry_rdevminor(entry), &h.c_rdevminor, sizeof(h.c_rdevminor));
 
153
        } else {
 
154
            format_hex(0, &h.c_rdevmajor, sizeof(h.c_rdevmajor));
 
155
            format_hex(0, &h.c_rdevminor, sizeof(h.c_rdevminor));
 
156
        }
 
157
        format_hex(archive_entry_mtime(entry), &h.c_mtime, sizeof(h.c_mtime));
 
158
        format_hex(pathlength, &h.c_namesize, sizeof(h.c_namesize));
 
159
        format_hex(0, &h.c_checksum, sizeof(h.c_checksum));
 
160
 
 
161
        /* Non-regular files don't store bodies. */
 
162
        if (archive_entry_filetype(entry) != AE_IFREG)
 
163
                archive_entry_set_size(entry, 0);
 
164
 
 
165
        /* Symlinks get the link written as the body of the entry. */
 
166
        p = archive_entry_symlink(entry);
 
167
        if (p != NULL  &&  *p != '\0')
 
168
                format_hex(strlen(p), &h.c_filesize, sizeof(h.c_filesize));
 
169
        else
 
170
                format_hex(archive_entry_size(entry),
 
171
                    &h.c_filesize, sizeof(h.c_filesize));
 
172
 
 
173
        ret = (a->compressor.write)(a, &h, sizeof(h));
 
174
        if (ret != ARCHIVE_OK)
 
175
                return (ARCHIVE_FATAL);
 
176
 
 
177
        /* Pad pathname to even length. */
 
178
        ret = (a->compressor.write)(a, path, pathlength);
 
179
        if (ret != ARCHIVE_OK)
 
180
                return (ARCHIVE_FATAL);
 
181
        pad = PAD4(pathlength + sizeof(struct cpio_header_newc));
 
182
        if (pad)
 
183
                ret = (a->compressor.write)(a, "\0\0\0", pad);
 
184
        if (ret != ARCHIVE_OK)
 
185
                return (ARCHIVE_FATAL);
 
186
 
 
187
        cpio->entry_bytes_remaining = archive_entry_size(entry);
 
188
        cpio->padding = PAD4(cpio->entry_bytes_remaining);
 
189
 
 
190
        /* Write the symlink now. */
 
191
        if (p != NULL  &&  *p != '\0') {
 
192
                ret = (a->compressor.write)(a, p, strlen(p));
 
193
                if (ret != ARCHIVE_OK)
 
194
                        return (ARCHIVE_FATAL);
 
195
                pad = PAD4(strlen(p));
 
196
                ret = (a->compressor.write)(a, "\0\0\0", pad);
 
197
        }
 
198
 
 
199
        if (ret == ARCHIVE_OK)
 
200
                ret = ret2;
 
201
        return (ret);
 
202
}
 
203
 
 
204
static ssize_t
 
205
archive_write_newc_data(struct archive_write *a, const void *buff, size_t s)
 
206
{
 
207
        struct cpio *cpio;
 
208
        int ret;
 
209
 
 
210
        cpio = (struct cpio *)a->format_data;
 
211
        if (s > cpio->entry_bytes_remaining)
 
212
                s = cpio->entry_bytes_remaining;
 
213
 
 
214
        ret = (a->compressor.write)(a, buff, s);
 
215
        cpio->entry_bytes_remaining -= s;
 
216
        if (ret >= 0)
 
217
                return (s);
 
218
        else
 
219
                return (ret);
 
220
}
 
221
 
 
222
/*
 
223
 * Format a number into the specified field.
 
224
 */
 
225
static int
 
226
format_hex(int64_t v, void *p, int digits)
 
227
{
 
228
        int64_t max;
 
229
        int     ret;
 
230
 
 
231
        max = (((int64_t)1) << (digits * 4)) - 1;
 
232
        if (v >= 0  &&  v <= max) {
 
233
            format_hex_recursive(v, (char *)p, digits);
 
234
            ret = 0;
 
235
        } else {
 
236
            format_hex_recursive(max, (char *)p, digits);
 
237
            ret = -1;
 
238
        }
 
239
        return (ret);
 
240
}
 
241
 
 
242
static int64_t
 
243
format_hex_recursive(int64_t v, char *p, int s)
 
244
{
 
245
        if (s == 0)
 
246
                return (v);
 
247
        v = format_hex_recursive(v, p+1, s-1);
 
248
        *p = "0123456789abcdef"[v & 0xf];
 
249
        return (v >> 4);
 
250
}
 
251
 
 
252
static int
 
253
archive_write_newc_finish(struct archive_write *a)
 
254
{
 
255
        int er;
 
256
        struct archive_entry *trailer;
 
257
 
 
258
        trailer = archive_entry_new();
 
259
        archive_entry_set_nlink(trailer, 1);
 
260
        archive_entry_set_pathname(trailer, "TRAILER!!!");
 
261
        er = archive_write_newc_header(a, trailer);
 
262
        archive_entry_free(trailer);
 
263
        return (er);
 
264
}
 
265
 
 
266
static int
 
267
archive_write_newc_destroy(struct archive_write *a)
 
268
{
 
269
        struct cpio *cpio;
 
270
 
 
271
        cpio = (struct cpio *)a->format_data;
 
272
        free(cpio);
 
273
        a->format_data = NULL;
 
274
        return (ARCHIVE_OK);
 
275
}
 
276
 
 
277
static int
 
278
archive_write_newc_finish_entry(struct archive_write *a)
 
279
{
 
280
        struct cpio *cpio;
 
281
        size_t to_write;
 
282
        int ret;
 
283
 
 
284
        cpio = (struct cpio *)a->format_data;
 
285
        while (cpio->entry_bytes_remaining > 0) {
 
286
                to_write = cpio->entry_bytes_remaining < a->null_length ?
 
287
                    cpio->entry_bytes_remaining : a->null_length;
 
288
                ret = (a->compressor.write)(a, a->nulls, to_write);
 
289
                if (ret != ARCHIVE_OK)
 
290
                        return (ret);
 
291
                cpio->entry_bytes_remaining -= to_write;
 
292
        }
 
293
        ret = (a->compressor.write)(a, a->nulls, cpio->padding);
 
294
        return (ret);
 
295
}