1
/******************************************************
2
Copyright (c) 2011 Percona Inc.
4
Compressing datasink implementation for XtraBackup.
6
This program is free software; you can redistribute it and/or modify
7
it under the terms of the GNU General Public License as published by
8
the Free Software Foundation; version 2 of the License.
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
15
You should have received a copy of the GNU General Public License
16
along with this program; if not, write to the Free Software
17
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
*******************************************************/
31
/* Upper bound on the number of input bytes handed to one worker per chunk
(64 KiB). The same value is written into the qpress archive header by
compress_open(). */
#define COMPRESS_CHUNK_SIZE (64 * 1024UL)
32
/* Extra bytes added on top of COMPRESS_CHUNK_SIZE when sizing each worker's
output buffer in create_worker_threads(). */
#define MY_QLZ_COMPRESS_OVERHEAD 400
37
/* NOTE(review): the struct declarations that own the fields below are not
visible in this excerpt; the comments describe how the fields are used by the
functions in this file. */
/* Held by compress_write() from the moment a chunk is assigned to the worker
until its output has been reaped; also paired with ctrl_cond for the
start-up handshake in create_worker_threads(). */
pthread_mutex_t ctrl_mutex;
38
pthread_cond_t ctrl_cond;
39
/* Guards the data_avail / cancelled hand-off. data_cond is signalled both
when work is posted and when the worker has finished a chunk. */
pthread_mutex_t data_mutex;
40
pthread_cond_t data_cond;
48
/* NOTE(review): presumably the QuickLZ compression state of one worker; its
use is not visible in this excerpt -- confirm. */
qlz_state_compress state;
54
/* Array of worker thread contexts owned by the compress datasink context. */
comp_thread_ctxt_t *threads;
61
/* Back-pointer from an open file to the compress datasink context. */
ds_compress_ctxt_t *comp_ctxt;
62
/* Running total of input (uncompressed) bytes; written after the "NEWBNEWB"
marker of every chunk by compress_write(). */
size_t bytes_processed;
65
/* Selects the destination datasink in compress_init(): the stream datasink
when set, the local-file datasink otherwise. */
extern ibool xtrabackup_stream;
66
/* Not referenced by any line visible in this excerpt. */
extern uint xtrabackup_parallel;
67
/* Passed to create_worker_threads() as the number of workers and stored as
nthreads in the compress context.
NOTE(review): declared as ibool here although it is used as a thread count --
verify that this matches the type at the variable's definition. */
extern ibool xtrabackup_compress_threads;
69
/* Datasink callbacks implemented in this file. */
static ds_ctxt_t *compress_init(const char *root);
70
static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path,
72
static int compress_write(ds_file_t *file, const void *buf, size_t len);
73
static int compress_close(ds_file_t *file);
74
static void compress_deinit(ds_ctxt_t *ctxt);
76
/* Datasink descriptor of the compressing sink; compress_init() stores its
address in the context it returns. NOTE(review): the initializer list is not
visible in this excerpt. */
datasink_t datasink_compress = {
84
/* Helpers that emit an integer through the given datasink's write callback. */
static inline int write_uint32_le(datasink_t *sink, ds_file_t *file,
86
static inline int write_uint64_le(datasink_t *sink, ds_file_t *file,
89
/* Worker thread pool management. */
static comp_thread_ctxt_t *create_worker_threads(uint n);
90
static void destroy_worker_threads(comp_thread_ctxt_t *threads, uint n);
91
static void *compress_worker_thread_func(void *arg);
95
/* Set up the compressing datasink for the given root.
Steps visible here: pick the destination datasink (stream or local) based on
xtrabackup_stream, initialize it with 'root', start the compression worker
threads, then allocate the datasink context and fill in the compress-specific
part (destination context, worker array, worker count).
NOTE(review): the return statements and some declarations of this function
are not visible in this excerpt. */
compress_init(const char *root)
98
ds_compress_ctxt_t *compress_ctxt;
100
ds_ctxt_t *dest_ctxt;
101
comp_thread_ctxt_t *threads;
103
/* Decide whether the compressed data will be stored in local files or
104
streamed to an archive */
105
dest_ds = xtrabackup_stream ? &datasink_stream : &datasink_local;
107
dest_ctxt = dest_ds->init(root);
108
if (dest_ctxt == NULL) {
109
msg("compress: failed to initialize the target datasink.\n");
113
/* Create and initialize the worker threads */
114
threads = create_worker_threads(xtrabackup_compress_threads);
115
if (threads == NULL) {
116
msg("compress: failed to create worker threads.\n");
117
/* Undo the destination datasink initialization done above. */
dest_ds->deinit(dest_ctxt);
121
/* A single allocation holds the generic ds_ctxt_t immediately followed by
the ds_compress_ctxt_t; (ctxt + 1) below points at the latter. */
ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t) +
122
sizeof(ds_compress_ctxt_t),
125
compress_ctxt = (ds_compress_ctxt_t *) (ctxt + 1);
126
compress_ctxt->dest_ctxt = dest_ctxt;
127
compress_ctxt->threads = threads;
128
compress_ctxt->nthreads = xtrabackup_compress_threads;
130
ctxt->datasink = &datasink_compress;
131
ctxt->ptr = compress_ctxt;
138
/* Open a compressed output file for 'path' in the destination datasink.
The destination file is opened under the name 'path' + ".qp". Two headers are
then written to it: the qpress archive header ("qpress10" followed by
COMPRESS_CHUNK_SIZE as a 64-bit value) and the qpress file header ("F", the
length of the directory-less file name, and the name including its
terminating NUL). Finally a ds_file_t with a trailing ds_compress_file_t is
allocated and linked to the destination file.
NOTE(review): the error branches and return statements are not visible in
this excerpt. */
compress_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
140
ds_compress_ctxt_t *comp_ctxt;
142
ds_ctxt_t *dest_ctxt;
143
ds_file_t *dest_file;
144
char new_name[FN_REFLEN];
147
ds_compress_file_t *comp_file;
149
comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;
150
dest_ctxt = comp_ctxt->dest_ctxt;
151
dest_ds = dest_ctxt->datasink;
153
/* Append the .qp extension to the filename */
154
fn_format(new_name, path, "", ".qp", MYF(MY_APPEND_EXT));
156
dest_file = dest_ds->open(dest_ctxt, new_name, mystat);
157
if (dest_file == NULL) {
161
/* Write the qpress archive header */
162
if (dest_ds->write(dest_file, "qpress10", 8) ||
163
write_uint64_le(dest_ds, dest_file, COMPRESS_CHUNK_SIZE)) {
167
/* We are going to create a one-file "flat" (i.e. with no
168
subdirectories) archive. So strip the directory part from the path and
169
remove the '.qp' suffix. */
170
/* new_name is rebuilt from the original 'path', so it does not carry the
".qp" extension added above. */
fn_format(new_name, path, "", "", MYF(MY_REPLACE_DIR));
172
/* Write the qpress file header */
173
name_len = strlen(new_name);
174
if (dest_ds->write(dest_file, "F", 1) ||
175
write_uint32_le(dest_ds, dest_file, name_len) ||
176
/* we want to write the terminating \0 as well */
177
dest_ds->write(dest_file, new_name, name_len + 1)) {
181
/* A single allocation holds the generic ds_file_t immediately followed by
the ds_compress_file_t; (file + 1) below points at the latter. */
file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
182
sizeof(ds_compress_file_t),
184
comp_file = (ds_compress_file_t *) (file + 1);
185
comp_file->dest_file = dest_file;
186
comp_file->dest_ds = dest_ds;
187
comp_file->comp_ctxt = comp_ctxt;
188
comp_file->bytes_processed = 0;
190
file->ptr = comp_file;
191
/* The path pointer is shared with the destination file, not copied. */
file->path = dest_file->path;
196
/* NOTE(review): presumably the error path that releases the destination
file; the label or branch leading here is not visible -- confirm. */
dest_ds->close(dest_file);
202
/* Compress 'len' bytes from 'buf' and write them to the destination file.
The visible code works in two phases. Dispatch: for each worker, lock its
ctrl_mutex, assign a chunk of at most COMPRESS_CHUNK_SIZE bytes, and flag
data_avail under data_mutex. Reap: for each dispatched worker in index order,
wait until it has cleared data_avail, then write a chunk record ("NEWBNEWB",
the running input offset, a 32-bit field, the compressed bytes) and release
both of the worker's mutexes.
NOTE(review): the enclosing loop over the whole input, the assignment of
'thd', the error returns and several declarations are not visible in this
excerpt. */
compress_write(ds_file_t *file, const void *buf, size_t len)
204
ds_compress_file_t *comp_file;
205
ds_compress_ctxt_t *comp_ctxt;
206
comp_thread_ctxt_t *threads;
207
comp_thread_ctxt_t *thd;
212
ds_file_t *dest_file;
214
comp_file = (ds_compress_file_t *) file->ptr;
215
comp_ctxt = comp_file->comp_ctxt;
216
dest_ds = comp_file->dest_ds;
217
dest_file = comp_file->dest_file;
219
threads = comp_ctxt->threads;
220
nthreads = comp_ctxt->nthreads;
222
ptr = (const char *) buf;
226
/* Send data to worker threads for compression */
227
for (i = 0; i < nthreads; i++) {
232
/* ctrl_mutex is not released in this loop; the reap loop below unlocks it
once the worker's output has been written. */
pthread_mutex_lock(&thd->ctrl_mutex);
234
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
235
COMPRESS_CHUNK_SIZE : len;
237
thd->from_len = chunk_len;
239
/* Post the chunk: the worker waits on data_cond for data_avail. */
pthread_mutex_lock(&thd->data_mutex);
240
thd->data_avail = TRUE;
241
pthread_cond_signal(&thd->data_cond);
242
pthread_mutex_unlock(&thd->data_mutex);
251
/* Highest worker index the reap loop below will visit. */
max_thread = (i < nthreads) ? i : nthreads - 1;
253
/* Reap and stream the compressed data */
254
for (i = 0; i <= max_thread; i++) {
257
pthread_mutex_lock(&thd->data_mutex);
258
/* The worker resets data_avail to FALSE and signals data_cond when it is
done with the chunk. */
while (thd->data_avail == TRUE) {
259
pthread_cond_wait(&thd->data_cond,
263
ut_a(threads[i].to_len > 0);
265
/* Chunk record starts with a marker and the input offset of the chunk. */
if (dest_ds->write(dest_file, "NEWBNEWB", 8) ||
266
write_uint64_le(dest_ds, dest_file,
267
comp_file->bytes_processed)) {
268
msg("compress: write to the destination stream "
273
/* Advance the offset by the uncompressed size of this chunk. */
comp_file->bytes_processed += threads[i].from_len;
275
if (write_uint32_le(dest_ds, dest_file,
277
dest_ds->write(dest_file, threads[i].to,
278
threads[i].to_len)) {
279
msg("compress: write to the destination stream "
284
pthread_mutex_unlock(&threads[i].data_mutex);
285
pthread_mutex_unlock(&threads[i].ctrl_mutex);
294
/* Finish a compressed file: write the qpress file trailer ("ENDSENDS"
followed by a 64-bit zero) and close the destination file.
NOTE(review): in the visible lines the results of both trailer writes are
ignored; the handling of the close() result and the return statement are not
visible in this excerpt. */
compress_close(ds_file_t *file)
296
ds_compress_file_t *comp_file;
298
ds_file_t *dest_file;
300
comp_file = (ds_compress_file_t *) file->ptr;
301
dest_ds = comp_file->dest_ds;
302
dest_file = comp_file->dest_file;
304
/* Write the qpress file trailer */
305
dest_ds->write(dest_file, "ENDSENDS", 8);
307
/* Supposedly the number of written bytes should be written as a
308
"recovery information" in the file trailer, but in reality qpress
309
always writes 8 zeros here. Let's do the same */
311
write_uint64_le(dest_ds, dest_file, 0);
313
dest_ds->close(dest_file);
322
/* Tear down the compressing datasink: stop and destroy the worker threads,
then deinitialize the destination datasink.
NOTE(review): the release of 'ctxt' itself is not visible in this excerpt. */
compress_deinit(ds_ctxt_t *ctxt)
324
ds_compress_ctxt_t *comp_ctxt;
325
ds_ctxt_t *dest_ctxt;
328
comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
330
/* Workers are stopped first, before the destination is deinitialized. */
destroy_worker_threads(comp_ctxt->threads, comp_ctxt->nthreads);
332
dest_ctxt = comp_ctxt->dest_ctxt;
333
dest_ds = dest_ctxt->datasink;
335
dest_ds->deinit(dest_ctxt);
342
/* Write 'n' to 'file' through sink->write() and return that call's result.
NOTE(review): the declaration of 'tmp' and the encoding step are not visible
in this excerpt; the name suggests a 4-byte little-endian encoding --
confirm. */
write_uint32_le(datasink_t *sink, ds_file_t *file, ulong n)
347
return sink->write(file, tmp, sizeof(tmp));
352
/* Write 'n' to 'file' through sink->write() and return that call's result.
NOTE(review): the declaration of 'tmp' and the encoding step are not visible
in this excerpt; the name suggests an 8-byte little-endian encoding --
confirm. */
write_uint64_le(datasink_t *sink, ds_file_t *file, ulonglong n)
357
return sink->write(file, tmp, sizeof(tmp));
362
/* Allocate and start 'n' compression worker threads.
For every worker the visible code resets its state flags, allocates an output
buffer of COMPRESS_CHUNK_SIZE + MY_QLZ_COMPRESS_OVERHEAD bytes, initializes
both mutex/condition pairs, locks ctrl_mutex and creates the thread. A second
loop then waits on ctrl_cond until each worker has set 'started' and releases
its ctrl_mutex.
NOTE(review): the error branches and return statements are not visible in
this excerpt. */
create_worker_threads(uint n)
364
comp_thread_ctxt_t *threads;
367
threads = (comp_thread_ctxt_t *)
368
my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
370
for (i = 0; i < n; i++) {
371
comp_thread_ctxt_t *thd = threads + i;
374
thd->started = FALSE;
375
thd->cancelled = FALSE;
376
thd->data_avail = FALSE;
378
thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
379
MY_QLZ_COMPRESS_OVERHEAD,
382
/* Initialize the control mutex and condition var */
383
if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
384
pthread_cond_init(&thd->ctrl_cond, NULL)) {
388
/* Initialize the data mutex and condition var */
389
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
390
pthread_cond_init(&thd->data_cond, NULL)) {
394
/* Taken before the thread is created: the new worker blocks on ctrl_mutex
until the wait loop below releases it inside pthread_cond_wait(). */
pthread_mutex_lock(&thd->ctrl_mutex);
396
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
398
/* NOTE(review): pthread_create() reports failure through its return value
and is not specified to set errno, so the number logged here may be
unrelated to the failure. */
msg("compress: pthread_create() failed: "
399
"errno = %d\n", errno);
404
/* Wait for the threads to start */
405
for (i = 0; i < n; i++) {
406
comp_thread_ctxt_t *thd = threads + i;
408
while (thd->started == FALSE)
409
pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
410
pthread_mutex_unlock(&thd->ctrl_mutex);
421
/* Stop and clean up 'n' worker threads.
For every worker: set 'cancelled' under data_mutex and signal data_cond so a
waiting worker wakes up, join the thread, then destroy its condition
variables and mutexes.
NOTE(review): the release of the per-worker output buffers and of the
'threads' array is not visible in this excerpt. */
destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
425
for (i = 0; i < n; i++) {
426
comp_thread_ctxt_t *thd = threads + i;
428
pthread_mutex_lock(&thd->data_mutex);
429
threads[i].cancelled = TRUE;
430
pthread_cond_signal(&thd->data_cond);
431
pthread_mutex_unlock(&thd->data_mutex);
433
/* The synchronization objects are destroyed only after the thread has
exited. */
pthread_join(thd->id, NULL);
435
pthread_cond_destroy(&thd->data_cond);
436
pthread_mutex_destroy(&thd->data_mutex);
437
pthread_cond_destroy(&thd->ctrl_cond);
438
pthread_mutex_destroy(&thd->ctrl_mutex);
448
/* Entry point of a compression worker; 'arg' is its comp_thread_ctxt_t.
Start-up: take ctrl_mutex, take data_mutex, signal ctrl_cond to the creator
and release ctrl_mutex. Work cycle: clear data_avail and signal data_cond,
wait on data_cond until data_avail or cancelled is set, compress 'from' into
'to' and compute a checksum over the compressed output. data_mutex is
released before the function ends.
NOTE(review): the enclosing loop, the assignment to 'started', the
cancellation exit and the return statement are not visible in this
excerpt. */
compress_worker_thread_func(void *arg)
450
comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
452
/* Blocks here until create_worker_threads() waits on ctrl_cond, because the
creator holds ctrl_mutex while calling pthread_create(). */
pthread_mutex_lock(&thd->ctrl_mutex);
454
pthread_mutex_lock(&thd->data_mutex);
457
pthread_cond_signal(&thd->ctrl_cond);
459
pthread_mutex_unlock(&thd->ctrl_mutex);
462
/* Tell compress_write(), which waits while data_avail is TRUE, that the
previous chunk (if any) has been processed. */
thd->data_avail = FALSE;
463
pthread_cond_signal(&thd->data_cond);
465
/* Sleep until a new chunk is posted or the thread is cancelled. */
while (!thd->data_avail && !thd->cancelled) {
466
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
472
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
475
/* qpress uses 0x00010000 as the initial value, but its own
476
Adler-32 implementation treats the value differently:
477
1. higher order bits are the sum of all bytes in the sequence
478
2. lower order bits are the sum of resulting values at every
480
So it's the other way around as compared to zlib's adler32().
481
That's why 0x00000001 is being passed here to be compatible
482
with qpress implementation. */
484
thd->adler = adler32(0x00000001, (uchar *) thd->to,
488
pthread_mutex_unlock(&thd->data_mutex);