~gl-az/percona-xtrabackup/2.1-io-block-size

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_write.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
__FBSDID("$FreeBSD: head/lib/libarchive/archive_write.c 201099 2009-12-28 03:03:00Z kientzle $");
 
28
 
 
29
/*
 
30
 * This file contains the "essential" portions of the write API, that
 
31
 * is, stuff that will essentially always be used by any client that
 
32
 * actually needs to write a archive.  Optional pieces have been, as
 
33
 * far as possible, separated out into separate files to reduce
 
34
 * needlessly bloating statically-linked clients.
 
35
 */
 
36
 
 
37
#ifdef HAVE_SYS_WAIT_H
 
38
#include <sys/wait.h>
 
39
#endif
 
40
#ifdef HAVE_LIMITS_H
 
41
#include <limits.h>
 
42
#endif
 
43
#include <stdio.h>
 
44
#ifdef HAVE_STDLIB_H
 
45
#include <stdlib.h>
 
46
#endif
 
47
#ifdef HAVE_STRING_H
 
48
#include <string.h>
 
49
#endif
 
50
#include <time.h>
 
51
#ifdef HAVE_UNISTD_H
 
52
#include <unistd.h>
 
53
#endif
 
54
 
 
55
#include "archive.h"
 
56
#include "archive_entry.h"
 
57
#include "archive_private.h"
 
58
#include "archive_write_private.h"
 
59
 
 
60
static struct archive_vtable *archive_write_vtable(void);
 
61
 
 
62
static int      _archive_write_close(struct archive *);
 
63
static int      _archive_write_finish(struct archive *);
 
64
static int      _archive_write_header(struct archive *, struct archive_entry *);
 
65
static int      _archive_write_finish_entry(struct archive *);
 
66
static ssize_t  _archive_write_data(struct archive *, const void *, size_t);
 
67
 
 
68
static struct archive_vtable *
 
69
archive_write_vtable(void)
 
70
{
 
71
        static struct archive_vtable av;
 
72
        static int inited = 0;
 
73
 
 
74
        if (!inited) {
 
75
                av.archive_close = _archive_write_close;
 
76
                av.archive_finish = _archive_write_finish;
 
77
                av.archive_write_header = _archive_write_header;
 
78
                av.archive_write_finish_entry = _archive_write_finish_entry;
 
79
                av.archive_write_data = _archive_write_data;
 
80
        }
 
81
        return (&av);
 
82
}
 
83
 
 
84
/*
 
85
 * Allocate, initialize and return an archive object.
 
86
 */
 
87
struct archive *
 
88
archive_write_new(void)
 
89
{
 
90
        struct archive_write *a;
 
91
        unsigned char *nulls;
 
92
 
 
93
        a = (struct archive_write *)malloc(sizeof(*a));
 
94
        if (a == NULL)
 
95
                return (NULL);
 
96
        memset(a, 0, sizeof(*a));
 
97
        a->archive.magic = ARCHIVE_WRITE_MAGIC;
 
98
        a->archive.state = ARCHIVE_STATE_NEW;
 
99
        a->archive.vtable = archive_write_vtable();
 
100
        /*
 
101
         * The value 10240 here matches the traditional tar default,
 
102
         * but is otherwise arbitrary.
 
103
         * TODO: Set the default block size from the format selected.
 
104
         */
 
105
        a->bytes_per_block = 10240;
 
106
        a->bytes_in_last_block = -1;    /* Default */
 
107
 
 
108
        /* Initialize a block of nulls for padding purposes. */
 
109
        a->null_length = 1024;
 
110
        nulls = (unsigned char *)malloc(a->null_length);
 
111
        if (nulls == NULL) {
 
112
                free(a);
 
113
                return (NULL);
 
114
        }
 
115
        memset(nulls, 0, a->null_length);
 
116
        a->nulls = nulls;
 
117
        /*
 
118
         * Set default compression, but don't set a default format.
 
119
         * Were we to set a default format here, we would force every
 
120
         * client to link in support for that format, even if they didn't
 
121
         * ever use it.
 
122
         */
 
123
        archive_write_set_compression_none(&a->archive);
 
124
        return (&a->archive);
 
125
}
 
126
 
 
127
/*
 
128
 * Set write options for the format. Returns 0 if successful.
 
129
 */
 
130
int
 
131
archive_write_set_format_options(struct archive *_a, const char *s)
 
132
{
 
133
        struct archive_write *a = (struct archive_write *)_a;
 
134
        char key[64], val[64];
 
135
        int len, r, ret = ARCHIVE_OK;
 
136
 
 
137
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
138
            ARCHIVE_STATE_NEW, "archive_write_set_format_options");
 
139
        archive_clear_error(&a->archive);
 
140
 
 
141
        if (s == NULL || *s == '\0')
 
142
                return (ARCHIVE_OK);
 
143
        if (a->format_options == NULL)
 
144
                /* This format does not support option. */
 
145
                return (ARCHIVE_OK);
 
146
 
 
147
        while ((len = __archive_parse_options(s, a->format_name,
 
148
            sizeof(key), key, sizeof(val), val)) > 0) {
 
149
                if (val[0] == '\0')
 
150
                        r = a->format_options(a, key, NULL);
 
151
                else
 
152
                        r = a->format_options(a, key, val);
 
153
                if (r == ARCHIVE_FATAL)
 
154
                        return (r);
 
155
                if (r < ARCHIVE_OK) { /* This key was not handled. */
 
156
                        archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
157
                            "Unsupported option ``%s''", key);
 
158
                        ret = ARCHIVE_WARN;
 
159
                }
 
160
                s += len;
 
161
        }
 
162
        if (len < 0) {
 
163
                archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
164
                    "Malformed options string.");
 
165
                return (ARCHIVE_WARN);
 
166
        }
 
167
        return (ret);
 
168
}
 
169
 
 
170
/*
 
171
 * Set write options for the compressor. Returns 0 if successful.
 
172
 */
 
173
int
 
174
archive_write_set_compressor_options(struct archive *_a, const char *s)
 
175
{
 
176
        struct archive_write *a = (struct archive_write *)_a;
 
177
        char key[64], val[64];
 
178
        int len, r;
 
179
        int ret = ARCHIVE_OK;
 
180
 
 
181
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
182
            ARCHIVE_STATE_NEW, "archive_write_set_compressor_options");
 
183
        archive_clear_error(&a->archive);
 
184
 
 
185
        if (s == NULL || *s == '\0')
 
186
                return (ARCHIVE_OK);
 
187
        if (a->compressor.options == NULL) {
 
188
                archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
189
                    "Unsupported option ``%s''", s);
 
190
                /* This compressor does not support option. */
 
191
                return (ARCHIVE_WARN);
 
192
        }
 
193
 
 
194
        while ((len = __archive_parse_options(s, a->archive.compression_name,
 
195
            sizeof(key), key, sizeof(val), val)) > 0) {
 
196
                if (val[0] == '\0')
 
197
                        r = a->compressor.options(a, key, NULL);
 
198
                else
 
199
                        r = a->compressor.options(a, key, val);
 
200
                if (r == ARCHIVE_FATAL)
 
201
                        return (r);
 
202
                if (r < ARCHIVE_OK) {
 
203
                        archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
204
                            "Unsupported option ``%s''", key);
 
205
                        ret = ARCHIVE_WARN;
 
206
                }
 
207
                s += len;
 
208
        }
 
209
        if (len < 0) {
 
210
                archive_set_error(&a->archive, ARCHIVE_ERRNO_MISC,
 
211
                    "Illegal format options.");
 
212
                return (ARCHIVE_WARN);
 
213
        }
 
214
        return (ret);
 
215
}
 
216
 
 
217
/*
 
218
 * Set write options for the format and the compressor. Returns 0 if successful.
 
219
 */
 
220
int
 
221
archive_write_set_options(struct archive *_a, const char *s)
 
222
{
 
223
        int r1, r2;
 
224
 
 
225
        r1 = archive_write_set_format_options(_a, s);
 
226
        if (r1 < ARCHIVE_WARN)
 
227
                return (r1);
 
228
        r2 = archive_write_set_compressor_options(_a, s);
 
229
        if (r2 < ARCHIVE_WARN)
 
230
                return (r2);
 
231
        if (r1 == ARCHIVE_WARN && r2 == ARCHIVE_WARN)
 
232
                return (ARCHIVE_WARN);
 
233
        return (ARCHIVE_OK);
 
234
}
 
235
 
 
236
/*
 
237
 * Set the block size.  Returns 0 if successful.
 
238
 */
 
239
int
 
240
archive_write_set_bytes_per_block(struct archive *_a, int bytes_per_block)
 
241
{
 
242
        struct archive_write *a = (struct archive_write *)_a;
 
243
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
244
            ARCHIVE_STATE_NEW, "archive_write_set_bytes_per_block");
 
245
        a->bytes_per_block = bytes_per_block;
 
246
        return (ARCHIVE_OK);
 
247
}
 
248
 
 
249
/*
 
250
 * Get the current block size.  -1 if it has never been set.
 
251
 */
 
252
int
 
253
archive_write_get_bytes_per_block(struct archive *_a)
 
254
{
 
255
        struct archive_write *a = (struct archive_write *)_a;
 
256
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
257
            ARCHIVE_STATE_ANY, "archive_write_get_bytes_per_block");
 
258
        return (a->bytes_per_block);
 
259
}
 
260
 
 
261
/*
 
262
 * Set the size for the last block.
 
263
 * Returns 0 if successful.
 
264
 */
 
265
int
 
266
archive_write_set_bytes_in_last_block(struct archive *_a, int bytes)
 
267
{
 
268
        struct archive_write *a = (struct archive_write *)_a;
 
269
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
270
            ARCHIVE_STATE_ANY, "archive_write_set_bytes_in_last_block");
 
271
        a->bytes_in_last_block = bytes;
 
272
        return (ARCHIVE_OK);
 
273
}
 
274
 
 
275
/*
 
276
 * Return the value set above.  -1 indicates it has not been set.
 
277
 */
 
278
int
 
279
archive_write_get_bytes_in_last_block(struct archive *_a)
 
280
{
 
281
        struct archive_write *a = (struct archive_write *)_a;
 
282
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
283
            ARCHIVE_STATE_ANY, "archive_write_get_bytes_in_last_block");
 
284
        return (a->bytes_in_last_block);
 
285
}
 
286
 
 
287
 
 
288
/*
 
289
 * dev/ino of a file to be rejected.  Used to prevent adding
 
290
 * an archive to itself recursively.
 
291
 */
 
292
int
 
293
archive_write_set_skip_file(struct archive *_a, dev_t d, ino_t i)
 
294
{
 
295
        struct archive_write *a = (struct archive_write *)_a;
 
296
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
297
            ARCHIVE_STATE_ANY, "archive_write_set_skip_file");
 
298
        a->skip_file_dev = d;
 
299
        a->skip_file_ino = i;
 
300
        return (ARCHIVE_OK);
 
301
}
 
302
 
 
303
 
 
304
/*
 
305
 * Open the archive using the current settings.
 
306
 */
 
307
int
 
308
archive_write_open(struct archive *_a, void *client_data,
 
309
    archive_open_callback *opener, archive_write_callback *writer,
 
310
    archive_close_callback *closer)
 
311
{
 
312
        struct archive_write *a = (struct archive_write *)_a;
 
313
        int ret;
 
314
 
 
315
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
316
            ARCHIVE_STATE_NEW, "archive_write_open");
 
317
        archive_clear_error(&a->archive);
 
318
        a->archive.state = ARCHIVE_STATE_HEADER;
 
319
        a->client_data = client_data;
 
320
        a->client_writer = writer;
 
321
        a->client_opener = opener;
 
322
        a->client_closer = closer;
 
323
        ret = (a->compressor.init)(a);
 
324
        if (a->format_init && ret == ARCHIVE_OK)
 
325
                ret = (a->format_init)(a);
 
326
        return (ret);
 
327
}
 
328
 
 
329
 
 
330
/*
 
331
 * Close out the archive.
 
332
 *
 
333
 * Be careful: user might just call write_new and then write_finish.
 
334
 * Don't assume we actually wrote anything or performed any non-trivial
 
335
 * initialization.
 
336
 */
 
337
static int
 
338
_archive_write_close(struct archive *_a)
 
339
{
 
340
        struct archive_write *a = (struct archive_write *)_a;
 
341
        int r = ARCHIVE_OK, r1 = ARCHIVE_OK;
 
342
 
 
343
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
344
            ARCHIVE_STATE_ANY, "archive_write_close");
 
345
 
 
346
        /* Finish the last entry. */
 
347
        if (a->archive.state & ARCHIVE_STATE_DATA)
 
348
                r = ((a->format_finish_entry)(a));
 
349
 
 
350
        /* Finish off the archive. */
 
351
        if (a->format_finish != NULL) {
 
352
                r1 = (a->format_finish)(a);
 
353
                if (r1 < r)
 
354
                        r = r1;
 
355
        }
 
356
 
 
357
        /* Release format resources. */
 
358
        if (a->format_destroy != NULL) {
 
359
                r1 = (a->format_destroy)(a);
 
360
                if (r1 < r)
 
361
                        r = r1;
 
362
        }
 
363
 
 
364
        /* Finish the compression and close the stream. */
 
365
        if (a->compressor.finish != NULL) {
 
366
                r1 = (a->compressor.finish)(a);
 
367
                if (r1 < r)
 
368
                        r = r1;
 
369
        }
 
370
 
 
371
        /* Close out the client stream. */
 
372
        if (a->client_closer != NULL) {
 
373
                r1 = (a->client_closer)(&a->archive, a->client_data);
 
374
                if (r1 < r)
 
375
                        r = r1;
 
376
        }
 
377
 
 
378
        a->archive.state = ARCHIVE_STATE_CLOSED;
 
379
        return (r);
 
380
}
 
381
 
 
382
/*
 
383
 * Destroy the archive structure.
 
384
 */
 
385
static int
 
386
_archive_write_finish(struct archive *_a)
 
387
{
 
388
        struct archive_write *a = (struct archive_write *)_a;
 
389
        int r = ARCHIVE_OK;
 
390
 
 
391
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
392
            ARCHIVE_STATE_ANY, "archive_write_finish");
 
393
        if (a->archive.state != ARCHIVE_STATE_CLOSED)
 
394
                r = archive_write_close(&a->archive);
 
395
 
 
396
        /* Release various dynamic buffers. */
 
397
        free((void *)(uintptr_t)(const void *)a->nulls);
 
398
        archive_string_free(&a->archive.error_string);
 
399
        a->archive.magic = 0;
 
400
        free(a);
 
401
        return (r);
 
402
}
 
403
 
 
404
/*
 
405
 * Write the appropriate header.
 
406
 */
 
407
static int
 
408
_archive_write_header(struct archive *_a, struct archive_entry *entry)
 
409
{
 
410
        struct archive_write *a = (struct archive_write *)_a;
 
411
        int ret, r2;
 
412
 
 
413
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
414
            ARCHIVE_STATE_DATA | ARCHIVE_STATE_HEADER, "archive_write_header");
 
415
        archive_clear_error(&a->archive);
 
416
 
 
417
        /* In particular, "retry" and "fatal" get returned immediately. */
 
418
        ret = archive_write_finish_entry(&a->archive);
 
419
        if (ret < ARCHIVE_OK && ret != ARCHIVE_WARN)
 
420
                return (ret);
 
421
 
 
422
        if (a->skip_file_dev != 0 &&
 
423
            archive_entry_dev(entry) == a->skip_file_dev &&
 
424
            a->skip_file_ino != 0 &&
 
425
            archive_entry_ino64(entry) == a->skip_file_ino) {
 
426
                archive_set_error(&a->archive, 0,
 
427
                    "Can't add archive to itself");
 
428
                return (ARCHIVE_FAILED);
 
429
        }
 
430
 
 
431
        /* Format and write header. */
 
432
        r2 = ((a->format_write_header)(a, entry));
 
433
        if (r2 < ret)
 
434
                ret = r2;
 
435
 
 
436
        a->archive.state = ARCHIVE_STATE_DATA;
 
437
        return (ret);
 
438
}
 
439
 
 
440
static int
 
441
_archive_write_finish_entry(struct archive *_a)
 
442
{
 
443
        struct archive_write *a = (struct archive_write *)_a;
 
444
        int ret = ARCHIVE_OK;
 
445
 
 
446
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
447
            ARCHIVE_STATE_HEADER | ARCHIVE_STATE_DATA,
 
448
            "archive_write_finish_entry");
 
449
        if (a->archive.state & ARCHIVE_STATE_DATA)
 
450
                ret = (a->format_finish_entry)(a);
 
451
        a->archive.state = ARCHIVE_STATE_HEADER;
 
452
        return (ret);
 
453
}
 
454
 
 
455
/*
 
456
 * Note that the compressor is responsible for blocking.
 
457
 */
 
458
static ssize_t
 
459
_archive_write_data(struct archive *_a, const void *buff, size_t s)
 
460
{
 
461
        struct archive_write *a = (struct archive_write *)_a;
 
462
        __archive_check_magic(&a->archive, ARCHIVE_WRITE_MAGIC,
 
463
            ARCHIVE_STATE_DATA, "archive_write_data");
 
464
        archive_clear_error(&a->archive);
 
465
        return ((a->format_write_data)(a, buff, s));
 
466
}