~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_open_filename.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2003-2007 Tim Kientzle
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
__FBSDID("$FreeBSD: head/lib/libarchive/archive_read_open_filename.c 201093 2009-12-28 02:28:44Z kientzle $");
 
28
 
 
29
#ifdef HAVE_SYS_STAT_H
 
30
#include <sys/stat.h>
 
31
#endif
 
32
#ifdef HAVE_ERRNO_H
 
33
#include <errno.h>
 
34
#endif
 
35
#ifdef HAVE_FCNTL_H
 
36
#include <fcntl.h>
 
37
#endif
 
38
#ifdef HAVE_IO_H
 
39
#include <io.h>
 
40
#endif
 
41
#ifdef HAVE_STDLIB_H
 
42
#include <stdlib.h>
 
43
#endif
 
44
#ifdef HAVE_STRING_H
 
45
#include <string.h>
 
46
#endif
 
47
#ifdef HAVE_UNISTD_H
 
48
#include <unistd.h>
 
49
#endif
 
50
 
 
51
#include "archive.h"
 
52
 
 
53
#ifndef O_BINARY
 
54
#define O_BINARY 0
 
55
#endif
 
56
 
 
57
struct read_file_data {
 
58
        int      fd;
 
59
        size_t   block_size;
 
60
        void    *buffer;
 
61
        mode_t   st_mode;  /* Mode bits for opened file. */
 
62
        char     can_skip; /* This file supports skipping. */
 
63
        char     filename[1]; /* Must be last! */
 
64
};
 
65
 
 
66
static int      file_close(struct archive *, void *);
 
67
static ssize_t  file_read(struct archive *, void *, const void **buff);
 
68
#if ARCHIVE_API_VERSION < 2
 
69
static ssize_t  file_skip(struct archive *, void *, size_t request);
 
70
#else
 
71
static off_t    file_skip(struct archive *, void *, off_t request);
 
72
#endif
 
73
 
 
74
int
 
75
archive_read_open_file(struct archive *a, const char *filename,
 
76
    size_t block_size)
 
77
{
 
78
        return (archive_read_open_filename(a, filename, block_size));
 
79
}
 
80
 
 
81
int
 
82
archive_read_open_filename(struct archive *a, const char *filename,
 
83
    size_t block_size)
 
84
{
 
85
        struct stat st;
 
86
        struct read_file_data *mine;
 
87
        void *b;
 
88
        int fd;
 
89
 
 
90
        archive_clear_error(a);
 
91
        if (filename == NULL || filename[0] == '\0') {
 
92
                /* We used to invoke archive_read_open_fd(a,0,block_size)
 
93
                 * here, but that doesn't (and shouldn't) handle the
 
94
                 * end-of-file flush when reading stdout from a pipe.
 
95
                 * Basically, read_open_fd() is intended for folks who
 
96
                 * are willing to handle such details themselves.  This
 
97
                 * API is intended to be a little smarter for folks who
 
98
                 * want easy handling of the common case.
 
99
                 */
 
100
                filename = ""; /* Normalize NULL to "" */
 
101
                fd = 0;
 
102
#if defined(__CYGWIN__) || defined(_WIN32)
 
103
                setmode(0, O_BINARY);
 
104
#endif
 
105
        } else {
 
106
                fd = open(filename, O_RDONLY | O_BINARY);
 
107
                if (fd < 0) {
 
108
                        archive_set_error(a, errno,
 
109
                            "Failed to open '%s'", filename);
 
110
                        return (ARCHIVE_FATAL);
 
111
                }
 
112
        }
 
113
        if (fstat(fd, &st) != 0) {
 
114
                archive_set_error(a, errno, "Can't stat '%s'", filename);
 
115
                return (ARCHIVE_FATAL);
 
116
        }
 
117
 
 
118
        mine = (struct read_file_data *)calloc(1,
 
119
            sizeof(*mine) + strlen(filename));
 
120
        b = malloc(block_size);
 
121
        if (mine == NULL || b == NULL) {
 
122
                archive_set_error(a, ENOMEM, "No memory");
 
123
                free(mine);
 
124
                free(b);
 
125
                return (ARCHIVE_FATAL);
 
126
        }
 
127
        strcpy(mine->filename, filename);
 
128
        mine->block_size = block_size;
 
129
        mine->buffer = b;
 
130
        mine->fd = fd;
 
131
        /* Remember mode so close can decide whether to flush. */
 
132
        mine->st_mode = st.st_mode;
 
133
        /* If we're reading a file from disk, ensure that we don't
 
134
           overwrite it with an extracted file. */
 
135
        if (S_ISREG(st.st_mode)) {
 
136
                archive_read_extract_set_skip_file(a, st.st_dev, st.st_ino);
 
137
                /*
 
138
                 * Enabling skip here is a performance optimization
 
139
                 * for anything that supports lseek().  On FreeBSD
 
140
                 * (and probably many other systems), only regular
 
141
                 * files and raw disk devices support lseek() (on
 
142
                 * other input types, lseek() returns success but
 
143
                 * doesn't actually change the file pointer, which
 
144
                 * just completely screws up the position-tracking
 
145
                 * logic).  In addition, I've yet to find a portable
 
146
                 * way to determine if a device is a raw disk device.
 
147
                 * So I don't see a way to do much better than to only
 
148
                 * enable this optimization for regular files.
 
149
                 */
 
150
                mine->can_skip = 1;
 
151
        }
 
152
        return (archive_read_open2(a, mine,
 
153
                NULL, file_read, file_skip, file_close));
 
154
}
 
155
 
 
156
static ssize_t
 
157
file_read(struct archive *a, void *client_data, const void **buff)
 
158
{
 
159
        struct read_file_data *mine = (struct read_file_data *)client_data;
 
160
        ssize_t bytes_read;
 
161
 
 
162
        *buff = mine->buffer;
 
163
        for (;;) {
 
164
                bytes_read = read(mine->fd, mine->buffer, mine->block_size);
 
165
                if (bytes_read < 0) {
 
166
                        if (errno == EINTR)
 
167
                                continue;
 
168
                        else if (mine->filename[0] == '\0')
 
169
                                archive_set_error(a, errno, "Error reading stdin");
 
170
                        else
 
171
                                archive_set_error(a, errno, "Error reading '%s'",
 
172
                                    mine->filename);
 
173
                }
 
174
                return (bytes_read);
 
175
        }
 
176
}
 
177
 
 
178
#if ARCHIVE_API_VERSION < 2
 
179
static ssize_t
 
180
file_skip(struct archive *a, void *client_data, size_t request)
 
181
#else
 
182
static off_t
 
183
file_skip(struct archive *a, void *client_data, off_t request)
 
184
#endif
 
185
{
 
186
        struct read_file_data *mine = (struct read_file_data *)client_data;
 
187
        off_t old_offset, new_offset;
 
188
 
 
189
        if (!mine->can_skip) /* We can't skip, so ... */
 
190
                return (0); /* ... skip zero bytes. */
 
191
 
 
192
        /* Reduce request to the next smallest multiple of block_size */
 
193
        request = (request / mine->block_size) * mine->block_size;
 
194
        if (request == 0)
 
195
                return (0);
 
196
 
 
197
        /*
 
198
         * Hurray for lazy evaluation: if the first lseek fails, the second
 
199
         * one will not be executed.
 
200
         */
 
201
        if (((old_offset = lseek(mine->fd, 0, SEEK_CUR)) < 0) ||
 
202
            ((new_offset = lseek(mine->fd, request, SEEK_CUR)) < 0))
 
203
        {
 
204
                /* If skip failed once, it will probably fail again. */
 
205
                mine->can_skip = 0;
 
206
 
 
207
                if (errno == ESPIPE)
 
208
                {
 
209
                        /*
 
210
                         * Failure to lseek() can be caused by the file
 
211
                         * descriptor pointing to a pipe, socket or FIFO.
 
212
                         * Return 0 here, so the compression layer will use
 
213
                         * read()s instead to advance the file descriptor.
 
214
                         * It's slower of course, but works as well.
 
215
                         */
 
216
                        return (0);
 
217
                }
 
218
                /*
 
219
                 * There's been an error other than ESPIPE. This is most
 
220
                 * likely caused by a programmer error (too large request)
 
221
                 * or a corrupted archive file.
 
222
                 */
 
223
                if (mine->filename[0] == '\0')
 
224
                        /*
 
225
                         * Should never get here, since lseek() on stdin ought
 
226
                         * to return an ESPIPE error.
 
227
                         */
 
228
                        archive_set_error(a, errno, "Error seeking in stdin");
 
229
                else
 
230
                        archive_set_error(a, errno, "Error seeking in '%s'",
 
231
                            mine->filename);
 
232
                return (-1);
 
233
        }
 
234
        return (new_offset - old_offset);
 
235
}
 
236
 
 
237
static int
 
238
file_close(struct archive *a, void *client_data)
 
239
{
 
240
        struct read_file_data *mine = (struct read_file_data *)client_data;
 
241
 
 
242
        (void)a; /* UNUSED */
 
243
 
 
244
        /* Only flush and close if open succeeded. */
 
245
        if (mine->fd >= 0) {
 
246
                /*
 
247
                 * Sometimes, we should flush the input before closing.
 
248
                 *   Regular files: faster to just close without flush.
 
249
                 *   Devices: must not flush (user might need to
 
250
                 *      read the "next" item on a non-rewind device).
 
251
                 *   Pipes and sockets:  must flush (otherwise, the
 
252
                 *      program feeding the pipe or socket may complain).
 
253
                 * Here, I flush everything except for regular files and
 
254
                 * device nodes.
 
255
                 */
 
256
                if (!S_ISREG(mine->st_mode)
 
257
                    && !S_ISCHR(mine->st_mode)
 
258
                    && !S_ISBLK(mine->st_mode)) {
 
259
                        ssize_t bytesRead;
 
260
                        do {
 
261
                                bytesRead = read(mine->fd, mine->buffer,
 
262
                                    mine->block_size);
 
263
                        } while (bytesRead > 0);
 
264
                }
 
265
                /* If a named file was opened, then it needs to be closed. */
 
266
                if (mine->filename[0] != '\0')
 
267
                        close(mine->fd);
 
268
        }
 
269
        free(mine->buffer);
 
270
        free(mine);
 
271
        return (ARCHIVE_OK);
 
272
}