~akopytov/percona-xtrabackup/bug1166888-2.1

« back to all changes in this revision

Viewing changes to src/compress.c

  • Committer: Alexey Kopytov
  • Date: 2012-02-10 20:05:56 UTC
  • mto: (391.1.5 staging)
  • mto: This revision was merged to the branch mainline in revision 390.
  • Revision ID: akopytov@gmail.com-20120210200556-6kx41z8wwrqfucro
Rebase of the parallel compression patch onto the new trunk, plus
post-review fixes.

Implementation of parallel compression and streaming for XtraBackup.

This revision implements the following changes:

* InnoDB files are now streamed by the xtrabackup binary rather than
innobackupex. As a result, integrity is now verified by xtrabackup and
thus tar4ibd is no longer needed, so it was removed.

* The xtrabackup binary now accepts a new '--stream' option, which has
exactly the same semantics as the '--stream' option in
innobackupex: it tells xtrabackup to stream all files to the standard
output in the specified format rather than storing them locally.

* The xtrabackup binary can now do parallel compression using the
quicklz library. Two new options were added to xtrabackup to support
this feature:

- '--compress' tells xtrabackup to compress all output data, including
the transaction log file and meta data files, using the specified
compression algorithm. The only currently supported algorithm is
'quicklz'. The resulting files have the qpress archive format,
i.e. every *.qp file produced by xtrabackup is essentially a one-file
qpress archive and can be extracted and uncompressed by the qpress
file archiver (http://www.quicklz.com/).

- '--compress-threads' specifies the number of worker threads used by
xtrabackup for parallel data compression. This option defaults to 1.

Parallel compression ('--compress-threads') can be used together with
parallel file copying ('--parallel'). For example, '--parallel=4
--compress --compress-threads=2' will create 4 IO threads that will
read the data and pipe it to 2 compression threads.

* To support simultaneous compression and streaming, a new custom
streaming format called 'xbstream' was introduced to XtraBackup in
addition to the 'tar' format. That was required to overcome some
limitations of traditional archive formats such as 'tar', 'cpio' and
others that do not allow streaming dynamically generated files, for
example, dynamically compressed files. Other advantages of xbstream over
traditional streaming/archive formats include the ability to stream multiple
files concurrently (so it is possible to use streaming in the xbstream
format together with the --parallel option) and more compact data
storage.

* To allow streaming and extracting files to/from the xbstream format
produced by xtrabackup, a new utility aptly called 'xbstream' was
added to the XtraBackup distribution. This utility has a tar-like
interface:

- with the '-x' option it extracts files from the stream read from its
standard input to the current directory unless specified otherwise
with the '-C' option.

- with the '-c' option it streams files specified on the command line
to its standard output.

The utility also tries to minimize its impact on the OS page cache by
using the appropriate posix_fadvise() calls when available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/******************************************************
 
2
Copyright (c) 2011 Percona Inc.
 
3
 
 
4
Compressing datasink implementation for XtraBackup.
 
5
 
 
6
This program is free software; you can redistribute it and/or modify
 
7
it under the terms of the GNU General Public License as published by
 
8
the Free Software Foundation; version 2 of the License.
 
9
 
 
10
This program is distributed in the hope that it will be useful,
 
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
GNU General Public License for more details.
 
14
 
 
15
You should have received a copy of the GNU General Public License
 
16
along with this program; if not, write to the Free Software
 
17
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 
 
19
*******************************************************/
 
20
 
 
21
 
 
22
#include <my_base.h>
 
23
#include <quicklz.h>
 
24
#include <univ.i>
 
25
#include <zlib.h>
 
26
#include "common.h"
 
27
#include "datasink.h"
 
28
#include "stream.h"
 
29
#include "local.h"
 
30
 
 
31
/* Size of one uncompressed chunk handed to a worker thread (64 KiB). It is
also recorded as the chunk size in the qpress archive header. */
#define COMPRESS_CHUNK_SIZE (65536UL)

/* Extra room added to each worker's output buffer on top of
COMPRESS_CHUNK_SIZE; QuickLZ documents that the destination of
qlz_compress() must be at least the input size plus 400 bytes. */
#define MY_QLZ_COMPRESS_OVERHEAD 400
 
33
 
 
34
typedef struct {
 
35
        pthread_t               id;
 
36
        uint                    num;
 
37
        pthread_mutex_t         ctrl_mutex;
 
38
        pthread_cond_t          ctrl_cond;
 
39
        pthread_mutex_t         data_mutex;
 
40
        pthread_cond_t          data_cond;
 
41
        my_bool                 started;
 
42
        my_bool                 data_avail;
 
43
        my_bool                 cancelled;
 
44
        const char              *from;
 
45
        size_t                  from_len;
 
46
        char                    *to;
 
47
        size_t                  to_len;
 
48
        qlz_state_compress      state;
 
49
        ulong                   adler;
 
50
} comp_thread_ctxt_t;
 
51
 
 
52
typedef struct {
 
53
        ds_ctxt_t               *dest_ctxt;
 
54
        comp_thread_ctxt_t      *threads;
 
55
        uint                    nthreads;
 
56
} ds_compress_ctxt_t;
 
57
 
 
58
typedef struct {
 
59
        datasink_t              *dest_ds;
 
60
        ds_file_t               *dest_file;
 
61
        ds_compress_ctxt_t      *comp_ctxt;
 
62
        size_t                  bytes_processed;
 
63
} ds_compress_file_t;
 
64
 
 
65
extern ibool    xtrabackup_stream;
 
66
extern uint     xtrabackup_parallel;
 
67
extern ibool    xtrabackup_compress_threads;
 
68
 
 
69
static ds_ctxt_t *compress_init(const char *root);
 
70
static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path,
 
71
                                MY_STAT *mystat);
 
72
static int compress_write(ds_file_t *file, const void *buf, size_t len);
 
73
static int compress_close(ds_file_t *file);
 
74
static void compress_deinit(ds_ctxt_t *ctxt);
 
75
 
 
76
datasink_t datasink_compress = {
 
77
        &compress_init,
 
78
        &compress_open,
 
79
        &compress_write,
 
80
        &compress_close,
 
81
        &compress_deinit
 
82
};
 
83
 
 
84
static inline int write_uint32_le(datasink_t *sink, ds_file_t *file,
 
85
                                  ulong n);
 
86
static inline int write_uint64_le(datasink_t *sink, ds_file_t *file,
 
87
                                  ulonglong n);
 
88
 
 
89
static comp_thread_ctxt_t *create_worker_threads(uint n);
 
90
static void destroy_worker_threads(comp_thread_ctxt_t *threads, uint n);
 
91
static void *compress_worker_thread_func(void *arg);
 
92
 
 
93
static
 
94
ds_ctxt_t *
 
95
compress_init(const char *root)
 
96
{
 
97
        ds_ctxt_t               *ctxt;
 
98
        ds_compress_ctxt_t      *compress_ctxt;
 
99
        datasink_t              *dest_ds;
 
100
        ds_ctxt_t               *dest_ctxt;
 
101
        comp_thread_ctxt_t      *threads;
 
102
 
 
103
        /* Decide whether the compressed data will be stored in local files or
 
104
        streamed to an archive */
 
105
        dest_ds = xtrabackup_stream ? &datasink_stream : &datasink_local;
 
106
 
 
107
        dest_ctxt = dest_ds->init(root);
 
108
        if (dest_ctxt == NULL) {
 
109
                msg("compress: failed to initialize the target datasink.\n");
 
110
                return NULL;
 
111
        }
 
112
 
 
113
        /* Create and initialize the worker threads */
 
114
        threads = create_worker_threads(xtrabackup_compress_threads);
 
115
        if (threads == NULL) {
 
116
                msg("compress: failed to create worker threads.\n");
 
117
                dest_ds->deinit(dest_ctxt);
 
118
                return NULL;
 
119
        }
 
120
 
 
121
        ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t) +
 
122
                                       sizeof(ds_compress_ctxt_t),
 
123
                                       MYF(MY_FAE));
 
124
 
 
125
        compress_ctxt = (ds_compress_ctxt_t *) (ctxt + 1);
 
126
        compress_ctxt->dest_ctxt = dest_ctxt;
 
127
        compress_ctxt->threads = threads;
 
128
        compress_ctxt->nthreads = xtrabackup_compress_threads;
 
129
 
 
130
        ctxt->datasink = &datasink_compress;
 
131
        ctxt->ptr = compress_ctxt;
 
132
 
 
133
        return ctxt;
 
134
}
 
135
 
 
136
static
 
137
ds_file_t *
 
138
compress_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
 
139
{
 
140
        ds_compress_ctxt_t      *comp_ctxt;
 
141
        datasink_t              *dest_ds;
 
142
        ds_ctxt_t               *dest_ctxt;
 
143
        ds_file_t               *dest_file;
 
144
        char                    new_name[FN_REFLEN];
 
145
        size_t                  name_len;
 
146
        ds_file_t               *file;
 
147
        ds_compress_file_t      *comp_file;
 
148
 
 
149
        comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;
 
150
        dest_ctxt = comp_ctxt->dest_ctxt;
 
151
        dest_ds = dest_ctxt->datasink;
 
152
 
 
153
        /* Append the .qp extension to the filename */
 
154
        fn_format(new_name, path, "", ".qp", MYF(MY_APPEND_EXT));
 
155
 
 
156
        dest_file = dest_ds->open(dest_ctxt, new_name, mystat);
 
157
        if (dest_file == NULL) {
 
158
                return NULL;
 
159
        }
 
160
 
 
161
        /* Write the qpress archive header */
 
162
        if (dest_ds->write(dest_file, "qpress10", 8) ||
 
163
            write_uint64_le(dest_ds, dest_file, COMPRESS_CHUNK_SIZE)) {
 
164
                goto err;
 
165
        }
 
166
 
 
167
        /* We are going to create a one-file "flat" (i.e. with no
 
168
        subdirectories) archive. So strip the directory part from the path and
 
169
        remove the '.qp' suffix. */
 
170
        fn_format(new_name, path, "", "", MYF(MY_REPLACE_DIR));
 
171
 
 
172
        /* Write the qpress file header */
 
173
        name_len = strlen(new_name);
 
174
        if (dest_ds->write(dest_file, "F", 1) ||
 
175
            write_uint32_le(dest_ds, dest_file, name_len) ||
 
176
            /* we want to write the terminating \0 as well */
 
177
            dest_ds->write(dest_file, new_name, name_len + 1)) {
 
178
                goto err;
 
179
        }
 
180
 
 
181
        file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
 
182
                                       sizeof(ds_compress_file_t),
 
183
                                       MYF(MY_FAE));
 
184
        comp_file = (ds_compress_file_t *) (file + 1);
 
185
        comp_file->dest_file = dest_file;
 
186
        comp_file->dest_ds = dest_ds;
 
187
        comp_file->comp_ctxt = comp_ctxt;
 
188
        comp_file->bytes_processed = 0;
 
189
 
 
190
        file->ptr = comp_file;
 
191
        file->path = dest_file->path;
 
192
 
 
193
        return file;
 
194
 
 
195
err:
 
196
        dest_ds->close(dest_file);
 
197
        return NULL;
 
198
}
 
199
 
 
200
static
 
201
int
 
202
compress_write(ds_file_t *file, const void *buf, size_t len)
 
203
{
 
204
        ds_compress_file_t      *comp_file;
 
205
        ds_compress_ctxt_t      *comp_ctxt;
 
206
        comp_thread_ctxt_t      *threads;
 
207
        comp_thread_ctxt_t      *thd;
 
208
        uint                    nthreads;
 
209
        uint                    i;
 
210
        const char              *ptr;
 
211
        datasink_t              *dest_ds;
 
212
        ds_file_t               *dest_file;
 
213
 
 
214
        comp_file = (ds_compress_file_t *) file->ptr;
 
215
        comp_ctxt = comp_file->comp_ctxt;
 
216
        dest_ds = comp_file->dest_ds;
 
217
        dest_file = comp_file->dest_file;
 
218
 
 
219
        threads = comp_ctxt->threads;
 
220
        nthreads = comp_ctxt->nthreads;
 
221
 
 
222
        ptr = (const char *) buf;
 
223
        while (len > 0) {
 
224
                uint max_thread;
 
225
 
 
226
                /* Send data to worker threads for compression */
 
227
                for (i = 0; i < nthreads; i++) {
 
228
                        size_t chunk_len;
 
229
 
 
230
                        thd = threads + i;
 
231
 
 
232
                        pthread_mutex_lock(&thd->ctrl_mutex);
 
233
 
 
234
                        chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
 
235
                                COMPRESS_CHUNK_SIZE : len;
 
236
                        thd->from = ptr;
 
237
                        thd->from_len = chunk_len;
 
238
 
 
239
                        pthread_mutex_lock(&thd->data_mutex);
 
240
                        thd->data_avail = TRUE;
 
241
                        pthread_cond_signal(&thd->data_cond);
 
242
                        pthread_mutex_unlock(&thd->data_mutex);
 
243
 
 
244
                        len -= chunk_len;
 
245
                        if (len == 0) {
 
246
                                break;
 
247
                        }
 
248
                        ptr += chunk_len;
 
249
                }
 
250
 
 
251
                max_thread = (i < nthreads) ? i :  nthreads - 1;
 
252
 
 
253
                /* Reap and stream the compressed data */
 
254
                for (i = 0; i <= max_thread; i++) {
 
255
                        thd = threads + i;
 
256
 
 
257
                        pthread_mutex_lock(&thd->data_mutex);
 
258
                        while (thd->data_avail == TRUE) {
 
259
                                pthread_cond_wait(&thd->data_cond,
 
260
                                                  &thd->data_mutex);
 
261
                        }
 
262
 
 
263
                        ut_a(threads[i].to_len > 0);
 
264
 
 
265
                        if (dest_ds->write(dest_file, "NEWBNEWB", 8) ||
 
266
                            write_uint64_le(dest_ds, dest_file,
 
267
                                            comp_file->bytes_processed)) {
 
268
                                msg("compress: write to the destination stream "
 
269
                                    "failed.\n");
 
270
                                return 1;
 
271
                        }
 
272
 
 
273
                        comp_file->bytes_processed += threads[i].from_len;
 
274
 
 
275
                        if (write_uint32_le(dest_ds, dest_file,
 
276
                                            threads[i].adler) ||
 
277
                            dest_ds->write(dest_file, threads[i].to,
 
278
                                           threads[i].to_len)) {
 
279
                                msg("compress: write to the destination stream "
 
280
                                    "failed.\n");
 
281
                                return 1;
 
282
                        }
 
283
 
 
284
                        pthread_mutex_unlock(&threads[i].data_mutex);
 
285
                        pthread_mutex_unlock(&threads[i].ctrl_mutex);
 
286
                }
 
287
        }
 
288
 
 
289
        return 0;
 
290
}
 
291
 
 
292
static
 
293
int
 
294
compress_close(ds_file_t *file)
 
295
{
 
296
        ds_compress_file_t      *comp_file;
 
297
        datasink_t              *dest_ds;
 
298
        ds_file_t               *dest_file;
 
299
 
 
300
        comp_file = (ds_compress_file_t *) file->ptr;
 
301
        dest_ds = comp_file->dest_ds;
 
302
        dest_file = comp_file->dest_file;
 
303
 
 
304
        /* Write the qpress file trailer */
 
305
        dest_ds->write(dest_file, "ENDSENDS", 8);
 
306
 
 
307
        /* Supposedly the number of written bytes should be written as a
 
308
        "recovery information" in the file trailer, but in reality qpress
 
309
        always writes 8 zeros here. Let's do the same */
 
310
 
 
311
        write_uint64_le(dest_ds, dest_file, 0);
 
312
 
 
313
        dest_ds->close(dest_file);
 
314
 
 
315
        MY_FREE(file);
 
316
 
 
317
        return 0;
 
318
}
 
319
 
 
320
static
 
321
void
 
322
compress_deinit(ds_ctxt_t *ctxt)
 
323
{
 
324
        ds_compress_ctxt_t      *comp_ctxt;
 
325
        ds_ctxt_t               *dest_ctxt;
 
326
        datasink_t              *dest_ds;
 
327
 
 
328
        comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
 
329
 
 
330
        destroy_worker_threads(comp_ctxt->threads, comp_ctxt->nthreads);
 
331
 
 
332
        dest_ctxt = comp_ctxt->dest_ctxt;
 
333
        dest_ds = dest_ctxt->datasink;
 
334
 
 
335
        dest_ds->deinit(dest_ctxt);
 
336
 
 
337
        MY_FREE(ctxt);
 
338
}
 
339
 
 
340
static inline
 
341
int
 
342
write_uint32_le(datasink_t *sink, ds_file_t *file, ulong n)
 
343
{
 
344
        char tmp[4];
 
345
 
 
346
        int4store(tmp, n);
 
347
        return sink->write(file, tmp, sizeof(tmp));
 
348
}
 
349
 
 
350
static inline
 
351
int
 
352
write_uint64_le(datasink_t *sink, ds_file_t *file, ulonglong n)
 
353
{
 
354
        char tmp[8];
 
355
 
 
356
        int8store(tmp, n);
 
357
        return sink->write(file, tmp, sizeof(tmp));
 
358
}
 
359
 
 
360
static
 
361
comp_thread_ctxt_t *
 
362
create_worker_threads(uint n)
 
363
{
 
364
        comp_thread_ctxt_t      *threads;
 
365
        uint                    i;
 
366
 
 
367
        threads = (comp_thread_ctxt_t *)
 
368
                my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
 
369
 
 
370
        for (i = 0; i < n; i++) {
 
371
                comp_thread_ctxt_t *thd = threads + i;
 
372
 
 
373
                thd->num = i + 1;
 
374
                thd->started = FALSE;
 
375
                thd->cancelled = FALSE;
 
376
                thd->data_avail = FALSE;
 
377
 
 
378
                thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
 
379
                                                   MY_QLZ_COMPRESS_OVERHEAD,
 
380
                                                   MYF(MY_FAE));
 
381
 
 
382
                /* Initialize the control mutex and condition var */
 
383
                if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
 
384
                    pthread_cond_init(&thd->ctrl_cond, NULL)) {
 
385
                        goto err;
 
386
                }
 
387
 
 
388
                /* Initialize and data mutex and condition var */
 
389
                if (pthread_mutex_init(&thd->data_mutex, NULL) ||
 
390
                    pthread_cond_init(&thd->data_cond, NULL)) {
 
391
                        goto err;
 
392
                }
 
393
 
 
394
                pthread_mutex_lock(&thd->ctrl_mutex);
 
395
 
 
396
                if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
 
397
                                   thd)) {
 
398
                        msg("compress: pthread_create() failed: "
 
399
                            "errno = %d\n", errno);
 
400
                        goto err;
 
401
                }
 
402
        }
 
403
 
 
404
        /* Wait for the threads to start */
 
405
        for (i = 0; i < n; i++) {
 
406
                comp_thread_ctxt_t *thd = threads + i;
 
407
 
 
408
                while (thd->started == FALSE)
 
409
                        pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
 
410
                pthread_mutex_unlock(&thd->ctrl_mutex);
 
411
        }
 
412
 
 
413
        return threads;
 
414
 
 
415
err:
 
416
        return NULL;
 
417
}
 
418
 
 
419
static
 
420
void
 
421
destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
 
422
{
 
423
        uint i;
 
424
 
 
425
        for (i = 0; i < n; i++) {
 
426
                comp_thread_ctxt_t *thd = threads + i;
 
427
 
 
428
                pthread_mutex_lock(&thd->data_mutex);
 
429
                threads[i].cancelled = TRUE;
 
430
                pthread_cond_signal(&thd->data_cond);
 
431
                pthread_mutex_unlock(&thd->data_mutex);
 
432
 
 
433
                pthread_join(thd->id, NULL);
 
434
 
 
435
                pthread_cond_destroy(&thd->data_cond);
 
436
                pthread_mutex_destroy(&thd->data_mutex);
 
437
                pthread_cond_destroy(&thd->ctrl_cond);
 
438
                pthread_mutex_destroy(&thd->ctrl_mutex);
 
439
 
 
440
                MY_FREE(thd->to);
 
441
        }
 
442
 
 
443
        MY_FREE(threads);
 
444
}
 
445
 
 
446
static
 
447
void *
 
448
compress_worker_thread_func(void *arg)
 
449
{
 
450
        comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
 
451
 
 
452
        pthread_mutex_lock(&thd->ctrl_mutex);
 
453
 
 
454
        pthread_mutex_lock(&thd->data_mutex);
 
455
 
 
456
        thd->started = TRUE;
 
457
        pthread_cond_signal(&thd->ctrl_cond);
 
458
 
 
459
        pthread_mutex_unlock(&thd->ctrl_mutex);
 
460
 
 
461
        while (1) {
 
462
                thd->data_avail = FALSE;
 
463
                pthread_cond_signal(&thd->data_cond);
 
464
 
 
465
                while (!thd->data_avail && !thd->cancelled) {
 
466
                        pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
 
467
                }
 
468
 
 
469
                if (thd->cancelled)
 
470
                        break;
 
471
 
 
472
                thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
 
473
                                           &thd->state);
 
474
 
 
475
                /* qpress uses 0x00010000 as the initial value, but its own
 
476
                Adler-32 implementation treats the value differently:
 
477
                  1. higher order bits are the sum of all bytes in the sequence
 
478
                  2. lower order bits are the sum of resulting values at every
 
479
                     step.
 
480
                So it's the other way around as compared to zlib's adler32().
 
481
                That's why  0x00000001 is being passed here to be compatible
 
482
                with qpress implementation. */
 
483
 
 
484
                thd->adler = adler32(0x00000001, (uchar *) thd->to,
 
485
                                     thd->to_len);
 
486
        }
 
487
 
 
488
        pthread_mutex_unlock(&thd->data_mutex);
 
489
 
 
490
        return NULL;
 
491
}