~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/xbstream_write.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch on new trunk + post-review
fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* xtrabackup binary now accepts the new '--stream' option which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example dynamically compressed files.  Other advantages of xbstream over
traditional streaming/archive formats include ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/******************************************************
 
2
Copyright (c) 2011 Percona Inc.
 
3
 
 
4
The xbstream format writer implementation.
 
5
 
 
6
This program is free software; you can redistribute it and/or modify
 
7
it under the terms of the GNU General Public License as published by
 
8
the Free Software Foundation; version 2 of the License.
 
9
 
 
10
This program is distributed in the hope that it will be useful,
 
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
GNU General Public License for more details.
 
14
 
 
15
You should have received a copy of the GNU General Public License
 
16
along with this program; if not, write to the Free Software
 
17
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 
 
19
*******************************************************/
 
20
 
 
21
#include <my_base.h>
 
22
#include <zlib.h>
 
23
#include "common.h"
 
24
#include "xbstream.h"
 
25
 
 
26
/* Group writes smaller than this into a single chunk */
 
27
#define XB_STREAM_MIN_CHUNK_SIZE (64 * 1024)
 
28
 
 
29
struct xb_wstream_struct {
 
30
        pthread_mutex_t mutex;
 
31
        File            fd;
 
32
};
 
33
 
 
34
struct xb_wstream_file_struct {
 
35
        xb_wstream_t    *stream;
 
36
        char            *path;
 
37
        ulong           path_len;
 
38
        char            chunk[XB_STREAM_MIN_CHUNK_SIZE];
 
39
        char            *chunk_ptr;
 
40
        size_t          chunk_free;
 
41
        my_off_t        offset;
 
42
};
 
43
 
 
44
static int xb_stream_flush(xb_wstream_file_t *file);
 
45
static int xb_stream_write_chunk(xb_wstream_file_t *file,
 
46
                                 const void *buf, size_t len);
 
47
static int xb_stream_write_eof(xb_wstream_file_t *file);
 
48
 
 
49
xb_wstream_t *
 
50
xb_stream_write_new(void)
 
51
{
 
52
        xb_wstream_t    *stream;
 
53
 
 
54
        stream = (xb_wstream_t *) my_malloc(sizeof(xb_wstream_t), MYF(MY_FAE));
 
55
        pthread_mutex_init(&stream->mutex, NULL);
 
56
        stream->fd = fileno(stdout);
 
57
 
 
58
#ifdef __WIN__
 
59
        setmode(stream->fd, _O_BINARY);
 
60
#endif
 
61
 
 
62
        return stream;;
 
63
}
 
64
 
 
65
xb_wstream_file_t *
 
66
xb_stream_write_open(xb_wstream_t *stream, const char *path,
 
67
                     MY_STAT *mystat __attribute__((unused)))
 
68
{
 
69
        xb_wstream_file_t       *file;
 
70
        ulong                   path_len;
 
71
 
 
72
        path_len = strlen(path);
 
73
 
 
74
        if (path_len > FN_REFLEN) {
 
75
                msg("xb_stream_write_open(): file path is too long.\n");
 
76
                return NULL;
 
77
        }
 
78
 
 
79
        file = (xb_wstream_file_t *) my_malloc(sizeof(xb_wstream_file_t) +
 
80
                                               path_len + 1, MYF(MY_FAE));
 
81
 
 
82
        file->path = (char *) (file + 1);
 
83
        memcpy(file->path, path, path_len + 1);
 
84
        file->path_len = path_len;
 
85
 
 
86
        file->stream = stream;
 
87
        file->offset = 0;
 
88
        file->chunk_ptr = file->chunk;
 
89
        file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
 
90
 
 
91
        return file;
 
92
}
 
93
 
 
94
int
 
95
xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len)
 
96
{
 
97
        if (len < file->chunk_free) {
 
98
                memcpy(file->chunk_ptr, buf, len);
 
99
                file->chunk_ptr += len;
 
100
                file->chunk_free -= len;
 
101
 
 
102
                return 0;
 
103
        }
 
104
 
 
105
        if (xb_stream_flush(file))
 
106
                return 1;
 
107
 
 
108
        return xb_stream_write_chunk(file, buf, len);
 
109
}
 
110
 
 
111
int
 
112
xb_stream_write_close(xb_wstream_file_t *file)
 
113
{
 
114
        if (xb_stream_flush(file) ||
 
115
            xb_stream_write_eof(file)) {
 
116
                MY_FREE(file);
 
117
                return 1;
 
118
        }
 
119
 
 
120
        MY_FREE(file);
 
121
 
 
122
        return 0;
 
123
}
 
124
 
 
125
int
 
126
xb_stream_write_done(xb_wstream_t *stream)
 
127
{
 
128
        pthread_mutex_destroy(&stream->mutex);
 
129
 
 
130
        MY_FREE(stream);
 
131
 
 
132
        return 0;
 
133
}
 
134
 
 
135
static
 
136
int
 
137
xb_stream_flush(xb_wstream_file_t *file)
 
138
{
 
139
        if (file->chunk_ptr == file->chunk) {
 
140
                return 0;
 
141
        }
 
142
 
 
143
        if (xb_stream_write_chunk(file, file->chunk,
 
144
                                  file->chunk_ptr - file->chunk)) {
 
145
                return 1;
 
146
        }
 
147
 
 
148
        file->chunk_ptr = file->chunk;
 
149
        file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
 
150
 
 
151
        return 0;
 
152
}
 
153
 
 
154
#define F_WRITE(ptr,size)                                               \
 
155
        do {                                                            \
 
156
                if (my_write(fd, ptr, size, MYF(MY_WME | MY_NABP)))     \
 
157
                        goto err;                                       \
 
158
        } while (0)
 
159
 
 
160
static
 
161
int
 
162
xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len)
 
163
{
 
164
        /* Chunk magic + flags + chunk type + path_len + path + len + offset +
 
165
        checksum */
 
166
        uchar           tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
 
167
                               FN_REFLEN + 8 + 8 + 4];
 
168
        uchar           *ptr;
 
169
        xb_wstream_t    *stream = file->stream;
 
170
        File            fd = stream->fd;
 
171
        ulong           checksum;
 
172
 
 
173
        /* Write xbstream header */
 
174
        ptr = tmpbuf;
 
175
 
 
176
        /* Chunk magic */
 
177
        memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
 
178
        ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
 
179
 
 
180
        *ptr++ = 0;                              /* Chunk flags */
 
181
 
 
182
        *ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD;  /* Chunk type */
 
183
 
 
184
        int4store(ptr, file->path_len);          /* Path length */
 
185
        ptr += 4;
 
186
 
 
187
        memcpy(ptr, file->path, file->path_len); /* Path */
 
188
        ptr += file->path_len;
 
189
 
 
190
        int8store(ptr, len);                     /* Payload length */
 
191
        ptr += 8;
 
192
 
 
193
        pthread_mutex_lock(&stream->mutex);
 
194
 
 
195
        int8store(ptr, file->offset);            /* Payload offset */
 
196
        ptr += 8;
 
197
 
 
198
        checksum = crc32(0, buf, len);           /* checksum */
 
199
        int4store(ptr, checksum);
 
200
        ptr += 4;
 
201
 
 
202
        xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
 
203
 
 
204
        F_WRITE(tmpbuf, ptr - tmpbuf);
 
205
 
 
206
        F_WRITE(buf, len);                       /* Payload */
 
207
 
 
208
        file->offset+= len;
 
209
 
 
210
        pthread_mutex_unlock(&stream->mutex);
 
211
 
 
212
        return 0;
 
213
 
 
214
err:
 
215
 
 
216
        pthread_mutex_unlock(&stream->mutex);
 
217
 
 
218
        return 1;
 
219
}
 
220
 
 
221
static
 
222
int
 
223
xb_stream_write_eof(xb_wstream_file_t *file)
 
224
{
 
225
        /* Chunk magic + flags + chunk type + path_len + path */
 
226
        uchar           tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
 
227
                               FN_REFLEN];
 
228
        uchar           *ptr;
 
229
        xb_wstream_t    *stream = file->stream;
 
230
        File            fd = stream->fd;
 
231
 
 
232
        pthread_mutex_lock(&stream->mutex);
 
233
 
 
234
        /* Write xbstream header */
 
235
        ptr = tmpbuf;
 
236
 
 
237
        /* Chunk magic */
 
238
        memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
 
239
        ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
 
240
 
 
241
        *ptr++ = 0;                              /* Chunk flags */
 
242
 
 
243
        *ptr++ = (uchar) XB_CHUNK_TYPE_EOF;      /* Chunk type */
 
244
 
 
245
        int4store(ptr, file->path_len);          /* Path length */
 
246
        ptr += 4;
 
247
 
 
248
        memcpy(ptr, file->path, file->path_len); /* Path */
 
249
        ptr += file->path_len;
 
250
 
 
251
        xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
 
252
 
 
253
        F_WRITE(tmpbuf, (ulonglong) (ptr - tmpbuf));;
 
254
 
 
255
        pthread_mutex_unlock(&stream->mutex);
 
256
 
 
257
        return 0;
 
258
err:
 
259
 
 
260
        pthread_mutex_unlock(&stream->mutex);
 
261
 
 
262
        return 1;
 
263
}