~akopytov/percona-xtrabackup/bug1166888-2.0

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_entry_link_resolver.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
__FBSDID("$FreeBSD: head/lib/libarchive/archive_entry_link_resolver.c 201100 2009-12-28 03:05:31Z kientzle $");
 
28
 
 
29
#ifdef HAVE_SYS_STAT_H
 
30
#include <sys/stat.h>
 
31
#endif
 
32
#ifdef HAVE_ERRNO_H
 
33
#include <errno.h>
 
34
#endif
 
35
#include <stdio.h>
 
36
#ifdef HAVE_STDLIB_H
 
37
#include <stdlib.h>
 
38
#endif
 
39
#ifdef HAVE_STRING_H
 
40
#include <string.h>
 
41
#endif
 
42
 
 
43
#include "archive.h"
 
44
#include "archive_entry.h"
 
45
 
 
46
/*
 
47
 * This is mostly a pretty straightforward hash table implementation.
 
48
 * The only interesting bit is the different strategies used to
 
49
 * match up links.  These strategies match those used by various
 
50
 * archiving formats:
 
51
 *   tar - content stored with first link, remainder refer back to it.
 
52
 *       This requires us to match each subsequent link up with the
 
53
 *       first appearance.
 
54
 *   cpio - Old cpio just stored body with each link, match-ups were
 
55
 *       implicit.  This is trivial.
 
56
 *   new cpio - New cpio only stores body with last link, match-ups
 
57
 *       are implicit.  This is actually quite tricky; see the notes
 
58
 *       below.
 
59
 */
 
60
 
 
61
/* Users pass us a format code, we translate that into a strategy here. */
 
62
#define ARCHIVE_ENTRY_LINKIFY_LIKE_TAR  0
 
63
#define ARCHIVE_ENTRY_LINKIFY_LIKE_MTREE 1
 
64
#define ARCHIVE_ENTRY_LINKIFY_LIKE_OLD_CPIO 2
 
65
#define ARCHIVE_ENTRY_LINKIFY_LIKE_NEW_CPIO 3
 
66
 
 
67
/* Initial size of link cache. */
 
68
#define links_cache_initial_size 1024
 
69
 
 
70
struct links_entry {
 
71
        struct links_entry      *next;
 
72
        struct links_entry      *previous;
 
73
        int                      links; /* # links not yet seen */
 
74
        int                      hash;
 
75
        struct archive_entry    *canonical;
 
76
        struct archive_entry    *entry;
 
77
};
 
78
 
 
79
struct archive_entry_linkresolver {
 
80
        struct links_entry      **buckets;
 
81
        struct links_entry       *spare;
 
82
        unsigned long             number_entries;
 
83
        size_t                    number_buckets;
 
84
        int                       strategy;
 
85
};
 
86
 
 
87
static struct links_entry *find_entry(struct archive_entry_linkresolver *,
 
88
                    struct archive_entry *);
 
89
static void grow_hash(struct archive_entry_linkresolver *);
 
90
static struct links_entry *insert_entry(struct archive_entry_linkresolver *,
 
91
                    struct archive_entry *);
 
92
static struct links_entry *next_entry(struct archive_entry_linkresolver *);
 
93
 
 
94
struct archive_entry_linkresolver *
 
95
archive_entry_linkresolver_new(void)
 
96
{
 
97
        struct archive_entry_linkresolver *res;
 
98
        size_t i;
 
99
 
 
100
        res = malloc(sizeof(struct archive_entry_linkresolver));
 
101
        if (res == NULL)
 
102
                return (NULL);
 
103
        memset(res, 0, sizeof(struct archive_entry_linkresolver));
 
104
        res->number_buckets = links_cache_initial_size;
 
105
        res->buckets = malloc(res->number_buckets *
 
106
            sizeof(res->buckets[0]));
 
107
        if (res->buckets == NULL) {
 
108
                free(res);
 
109
                return (NULL);
 
110
        }
 
111
        for (i = 0; i < res->number_buckets; i++)
 
112
                res->buckets[i] = NULL;
 
113
        return (res);
 
114
}
 
115
 
 
116
void
 
117
archive_entry_linkresolver_set_strategy(struct archive_entry_linkresolver *res,
 
118
    int fmt)
 
119
{
 
120
        int fmtbase = fmt & ARCHIVE_FORMAT_BASE_MASK;
 
121
 
 
122
        switch (fmtbase) {
 
123
        case ARCHIVE_FORMAT_CPIO:
 
124
                switch (fmt) {
 
125
                case ARCHIVE_FORMAT_CPIO_SVR4_NOCRC:
 
126
                case ARCHIVE_FORMAT_CPIO_SVR4_CRC:
 
127
                        res->strategy = ARCHIVE_ENTRY_LINKIFY_LIKE_NEW_CPIO;
 
128
                        break;
 
129
                default:
 
130
                        res->strategy = ARCHIVE_ENTRY_LINKIFY_LIKE_OLD_CPIO;
 
131
                        break;
 
132
                }
 
133
                break;
 
134
        case ARCHIVE_FORMAT_MTREE:
 
135
                res->strategy = ARCHIVE_ENTRY_LINKIFY_LIKE_MTREE;
 
136
                break;
 
137
        case ARCHIVE_FORMAT_TAR:
 
138
                res->strategy = ARCHIVE_ENTRY_LINKIFY_LIKE_TAR;
 
139
                break;
 
140
        default:
 
141
                res->strategy = ARCHIVE_ENTRY_LINKIFY_LIKE_TAR;
 
142
                break;
 
143
        }
 
144
}
 
145
 
 
146
void
 
147
archive_entry_linkresolver_free(struct archive_entry_linkresolver *res)
 
148
{
 
149
        struct links_entry *le;
 
150
 
 
151
        if (res == NULL)
 
152
                return;
 
153
 
 
154
        if (res->buckets != NULL) {
 
155
                while ((le = next_entry(res)) != NULL)
 
156
                        archive_entry_free(le->entry);
 
157
                free(res->buckets);
 
158
                res->buckets = NULL;
 
159
        }
 
160
        free(res);
 
161
}
 
162
 
 
163
void
 
164
archive_entry_linkify(struct archive_entry_linkresolver *res,
 
165
    struct archive_entry **e, struct archive_entry **f)
 
166
{
 
167
        struct links_entry *le;
 
168
        struct archive_entry *t;
 
169
 
 
170
        *f = NULL; /* Default: Don't return a second entry. */
 
171
 
 
172
        if (*e == NULL) {
 
173
                le = next_entry(res);
 
174
                if (le != NULL) {
 
175
                        *e = le->entry;
 
176
                        le->entry = NULL;
 
177
                }
 
178
                return;
 
179
        }
 
180
 
 
181
        /* If it has only one link, then we're done. */
 
182
        if (archive_entry_nlink(*e) == 1)
 
183
                return;
 
184
        /* Directories, devices never have hardlinks. */
 
185
        if (archive_entry_filetype(*e) == AE_IFDIR
 
186
            || archive_entry_filetype(*e) == AE_IFBLK
 
187
            || archive_entry_filetype(*e) == AE_IFCHR)
 
188
                return;
 
189
 
 
190
        switch (res->strategy) {
 
191
        case ARCHIVE_ENTRY_LINKIFY_LIKE_TAR:
 
192
                le = find_entry(res, *e);
 
193
                if (le != NULL) {
 
194
                        archive_entry_unset_size(*e);
 
195
                        archive_entry_copy_hardlink(*e,
 
196
                            archive_entry_pathname(le->canonical));
 
197
                } else
 
198
                        insert_entry(res, *e);
 
199
                return;
 
200
        case ARCHIVE_ENTRY_LINKIFY_LIKE_MTREE:
 
201
                le = find_entry(res, *e);
 
202
                if (le != NULL) {
 
203
                        archive_entry_copy_hardlink(*e,
 
204
                            archive_entry_pathname(le->canonical));
 
205
                } else
 
206
                        insert_entry(res, *e);
 
207
                return;
 
208
        case ARCHIVE_ENTRY_LINKIFY_LIKE_OLD_CPIO:
 
209
                /* This one is trivial. */
 
210
                return;
 
211
        case ARCHIVE_ENTRY_LINKIFY_LIKE_NEW_CPIO:
 
212
                le = find_entry(res, *e);
 
213
                if (le != NULL) {
 
214
                        /*
 
215
                         * Put the new entry in le, return the
 
216
                         * old entry from le.
 
217
                         */
 
218
                        t = *e;
 
219
                        *e = le->entry;
 
220
                        le->entry = t;
 
221
                        /* Make the old entry into a hardlink. */
 
222
                        archive_entry_unset_size(*e);
 
223
                        archive_entry_copy_hardlink(*e,
 
224
                            archive_entry_pathname(le->canonical));
 
225
                        /* If we ran out of links, return the
 
226
                         * final entry as well. */
 
227
                        if (le->links == 0) {
 
228
                                *f = le->entry;
 
229
                                le->entry = NULL;
 
230
                        }
 
231
                } else {
 
232
                        /*
 
233
                         * If we haven't seen it, tuck it away
 
234
                         * for future use.
 
235
                         */
 
236
                        le = insert_entry(res, *e);
 
237
                        le->entry = *e;
 
238
                        *e = NULL;
 
239
                }
 
240
                return;
 
241
        default:
 
242
                break;
 
243
        }
 
244
        return;
 
245
}
 
246
 
 
247
static struct links_entry *
 
248
find_entry(struct archive_entry_linkresolver *res,
 
249
    struct archive_entry *entry)
 
250
{
 
251
        struct links_entry      *le;
 
252
        int                      hash, bucket;
 
253
        dev_t                    dev;
 
254
        int64_t                  ino;
 
255
 
 
256
        /* Free a held entry. */
 
257
        if (res->spare != NULL) {
 
258
                archive_entry_free(res->spare->canonical);
 
259
                archive_entry_free(res->spare->entry);
 
260
                free(res->spare);
 
261
                res->spare = NULL;
 
262
        }
 
263
 
 
264
        /* If the links cache overflowed and got flushed, don't bother. */
 
265
        if (res->buckets == NULL)
 
266
                return (NULL);
 
267
 
 
268
        dev = archive_entry_dev(entry);
 
269
        ino = archive_entry_ino64(entry);
 
270
        hash = (int)(dev ^ ino);
 
271
 
 
272
        /* Try to locate this entry in the links cache. */
 
273
        bucket = hash % res->number_buckets;
 
274
        for (le = res->buckets[bucket]; le != NULL; le = le->next) {
 
275
                if (le->hash == hash
 
276
                    && dev == archive_entry_dev(le->canonical)
 
277
                    && ino == archive_entry_ino64(le->canonical)) {
 
278
                        /*
 
279
                         * Decrement link count each time and release
 
280
                         * the entry if it hits zero.  This saves
 
281
                         * memory and is necessary for detecting
 
282
                         * missed links.
 
283
                         */
 
284
                        --le->links;
 
285
                        if (le->links > 0)
 
286
                                return (le);
 
287
                        /* Remove it from this hash bucket. */
 
288
                        if (le->previous != NULL)
 
289
                                le->previous->next = le->next;
 
290
                        if (le->next != NULL)
 
291
                                le->next->previous = le->previous;
 
292
                        if (res->buckets[bucket] == le)
 
293
                                res->buckets[bucket] = le->next;
 
294
                        res->number_entries--;
 
295
                        /* Defer freeing this entry. */
 
296
                        res->spare = le;
 
297
                        return (le);
 
298
                }
 
299
        }
 
300
        return (NULL);
 
301
}
 
302
 
 
303
static struct links_entry *
 
304
next_entry(struct archive_entry_linkresolver *res)
 
305
{
 
306
        struct links_entry      *le;
 
307
        size_t                   bucket;
 
308
 
 
309
        /* Free a held entry. */
 
310
        if (res->spare != NULL) {
 
311
                archive_entry_free(res->spare->canonical);
 
312
                free(res->spare);
 
313
                res->spare = NULL;
 
314
        }
 
315
 
 
316
        /* If the links cache overflowed and got flushed, don't bother. */
 
317
        if (res->buckets == NULL)
 
318
                return (NULL);
 
319
 
 
320
        /* Look for next non-empty bucket in the links cache. */
 
321
        for (bucket = 0; bucket < res->number_buckets; bucket++) {
 
322
                le = res->buckets[bucket];
 
323
                if (le != NULL) {
 
324
                        /* Remove it from this hash bucket. */
 
325
                        if (le->next != NULL)
 
326
                                le->next->previous = le->previous;
 
327
                        res->buckets[bucket] = le->next;
 
328
                        res->number_entries--;
 
329
                        /* Defer freeing this entry. */
 
330
                        res->spare = le;
 
331
                        return (le);
 
332
                }
 
333
        }
 
334
        return (NULL);
 
335
}
 
336
 
 
337
static struct links_entry *
 
338
insert_entry(struct archive_entry_linkresolver *res,
 
339
    struct archive_entry *entry)
 
340
{
 
341
        struct links_entry *le;
 
342
        int                      hash, bucket;
 
343
 
 
344
        /* Add this entry to the links cache. */
 
345
        le = malloc(sizeof(struct links_entry));
 
346
        if (le == NULL)
 
347
                return (NULL);
 
348
        memset(le, 0, sizeof(*le));
 
349
        le->canonical = archive_entry_clone(entry);
 
350
 
 
351
        /* If the links cache is getting too full, enlarge the hash table. */
 
352
        if (res->number_entries > res->number_buckets * 2)
 
353
                grow_hash(res);
 
354
 
 
355
        hash = archive_entry_dev(entry) ^ archive_entry_ino64(entry);
 
356
        bucket = hash % res->number_buckets;
 
357
 
 
358
        /* If we could allocate the entry, record it. */
 
359
        if (res->buckets[bucket] != NULL)
 
360
                res->buckets[bucket]->previous = le;
 
361
        res->number_entries++;
 
362
        le->next = res->buckets[bucket];
 
363
        le->previous = NULL;
 
364
        res->buckets[bucket] = le;
 
365
        le->hash = hash;
 
366
        le->links = archive_entry_nlink(entry) - 1;
 
367
        return (le);
 
368
}
 
369
 
 
370
static void
 
371
grow_hash(struct archive_entry_linkresolver *res)
 
372
{
 
373
        struct links_entry *le, **new_buckets;
 
374
        size_t new_size;
 
375
        size_t i, bucket;
 
376
 
 
377
        /* Try to enlarge the bucket list. */
 
378
        new_size = res->number_buckets * 2;
 
379
        new_buckets = malloc(new_size * sizeof(struct links_entry *));
 
380
 
 
381
        if (new_buckets != NULL) {
 
382
                memset(new_buckets, 0,
 
383
                    new_size * sizeof(struct links_entry *));
 
384
                for (i = 0; i < res->number_buckets; i++) {
 
385
                        while (res->buckets[i] != NULL) {
 
386
                                /* Remove entry from old bucket. */
 
387
                                le = res->buckets[i];
 
388
                                res->buckets[i] = le->next;
 
389
 
 
390
                                /* Add entry to new bucket. */
 
391
                                bucket = le->hash % new_size;
 
392
 
 
393
                                if (new_buckets[bucket] != NULL)
 
394
                                        new_buckets[bucket]->previous =
 
395
                                            le;
 
396
                                le->next = new_buckets[bucket];
 
397
                                le->previous = NULL;
 
398
                                new_buckets[bucket] = le;
 
399
                        }
 
400
                }
 
401
                free(res->buckets);
 
402
                res->buckets = new_buckets;
 
403
                res->number_buckets = new_size;
 
404
        }
 
405
}