~laurynas-biveinis/percona-xtrabackup/xtrabackup-page-filters

« back to all changes in this revision

Viewing changes to src/libarchive/libarchive/archive_read_support_compression_program.c

merge parallel compression branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * Copyright (c) 2007 Joerg Sonnenberger
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 *
 
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
 
15
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
16
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
17
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
 
18
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
19
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
20
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
21
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
22
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
23
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
24
 */
 
25
 
 
26
#include "archive_platform.h"
 
27
__FBSDID("$FreeBSD: head/lib/libarchive/archive_read_support_compression_program.c 201112 2009-12-28 06:59:35Z kientzle $");
 
28
 
 
29
#ifdef HAVE_SYS_WAIT_H
 
30
#  include <sys/wait.h>
 
31
#endif
 
32
#ifdef HAVE_ERRNO_H
 
33
#  include <errno.h>
 
34
#endif
 
35
#ifdef HAVE_FCNTL_H
 
36
#  include <fcntl.h>
 
37
#endif
 
38
#ifdef HAVE_LIMITS_H
 
39
#  include <limits.h>
 
40
#endif
 
41
#ifdef HAVE_SIGNAL_H
 
42
#  include <signal.h>
 
43
#endif
 
44
#ifdef HAVE_STDLIB_H
 
45
#  include <stdlib.h>
 
46
#endif
 
47
#ifdef HAVE_STRING_H
 
48
#  include <string.h>
 
49
#endif
 
50
#ifdef HAVE_UNISTD_H
 
51
#  include <unistd.h>
 
52
#endif
 
53
 
 
54
#include "archive.h"
 
55
#include "archive_private.h"
 
56
#include "archive_read_private.h"
 
57
 
 
58
int
 
59
archive_read_support_compression_program(struct archive *a, const char *cmd)
 
60
{
 
61
        return (archive_read_support_compression_program_signature(a, cmd, NULL, 0));
 
62
}
 
63
 
 
64
 
 
65
/* This capability is only available on POSIX systems. */
 
66
#if (!defined(HAVE_PIPE) || !defined(HAVE_FCNTL) || \
 
67
    !(defined(HAVE_FORK) || defined(HAVE_VFORK))) && (!defined(_WIN32) || defined(__CYGWIN__))
 
68
 
 
69
/*
 
70
 * On non-Posix systems, allow the program to build, but choke if
 
71
 * this function is actually invoked.
 
72
 */
 
73
int
 
74
archive_read_support_compression_program_signature(struct archive *_a,
 
75
    const char *cmd, void *signature, size_t signature_len)
 
76
{
 
77
        (void)_a; /* UNUSED */
 
78
        (void)cmd; /* UNUSED */
 
79
        (void)signature; /* UNUSED */
 
80
        (void)signature_len; /* UNUSED */
 
81
 
 
82
        archive_set_error(_a, -1,
 
83
            "External compression programs not supported on this platform");
 
84
        return (ARCHIVE_FATAL);
 
85
}
 
86
 
 
87
int
 
88
__archive_read_program(struct archive_read_filter *self, const char *cmd)
 
89
{
 
90
        (void)self; /* UNUSED */
 
91
        (void)cmd; /* UNUSED */
 
92
 
 
93
        archive_set_error(&self->archive->archive, -1,
 
94
            "External compression programs not supported on this platform");
 
95
        return (ARCHIVE_FATAL);
 
96
}
 
97
 
 
98
#else
 
99
 
 
100
#include "filter_fork.h"
 
101
 
 
102
/*
 
103
 * The bidder object stores the command and the signature to watch for.
 
104
 * The 'inhibit' entry here is used to ensure that unchecked filters never
 
105
 * bid twice in the same pipeline.
 
106
 */
 
107
struct program_bidder {
 
108
        char *cmd;
 
109
        void *signature;
 
110
        size_t signature_len;
 
111
        int inhibit;
 
112
};
 
113
 
 
114
static int      program_bidder_bid(struct archive_read_filter_bidder *,
 
115
                    struct archive_read_filter *upstream);
 
116
static int      program_bidder_init(struct archive_read_filter *);
 
117
static int      program_bidder_free(struct archive_read_filter_bidder *);
 
118
 
 
119
/*
 
120
 * The actual filter needs to track input and output data.
 
121
 */
 
122
struct program_filter {
 
123
        char            *description;
 
124
        pid_t            child;
 
125
        int              exit_status;
 
126
        int              waitpid_return;
 
127
        int              child_stdin, child_stdout;
 
128
 
 
129
        char            *out_buf;
 
130
        size_t           out_buf_len;
 
131
};
 
132
 
 
133
static ssize_t  program_filter_read(struct archive_read_filter *,
 
134
                    const void **);
 
135
static int      program_filter_close(struct archive_read_filter *);
 
136
 
 
137
int
 
138
archive_read_support_compression_program_signature(struct archive *_a,
 
139
    const char *cmd, const void *signature, size_t signature_len)
 
140
{
 
141
        struct archive_read *a = (struct archive_read *)_a;
 
142
        struct archive_read_filter_bidder *bidder;
 
143
        struct program_bidder *state;
 
144
 
 
145
        /*
 
146
         * Get a bidder object from the read core.
 
147
         */
 
148
        bidder = __archive_read_get_bidder(a);
 
149
        if (bidder == NULL)
 
150
                return (ARCHIVE_FATAL);
 
151
 
 
152
        /*
 
153
         * Allocate our private state.
 
154
         */
 
155
        state = (struct program_bidder *)calloc(sizeof (*state), 1);
 
156
        if (state == NULL)
 
157
                return (ARCHIVE_FATAL);
 
158
        state->cmd = strdup(cmd);
 
159
        if (signature != NULL && signature_len > 0) {
 
160
                state->signature_len = signature_len;
 
161
                state->signature = malloc(signature_len);
 
162
                memcpy(state->signature, signature, signature_len);
 
163
        }
 
164
 
 
165
        /*
 
166
         * Fill in the bidder object.
 
167
         */
 
168
        bidder->data = state;
 
169
        bidder->bid = program_bidder_bid;
 
170
        bidder->init = program_bidder_init;
 
171
        bidder->options = NULL;
 
172
        bidder->free = program_bidder_free;
 
173
        return (ARCHIVE_OK);
 
174
}
 
175
 
 
176
static int
 
177
program_bidder_free(struct archive_read_filter_bidder *self)
 
178
{
 
179
        struct program_bidder *state = (struct program_bidder *)self->data;
 
180
        free(state->cmd);
 
181
        free(state->signature);
 
182
        free(self->data);
 
183
        return (ARCHIVE_OK);
 
184
}
 
185
 
 
186
/*
 
187
 * If we do have a signature, bid only if that matches.
 
188
 *
 
189
 * If there's no signature, we bid INT_MAX the first time
 
190
 * we're called, then never bid again.
 
191
 */
 
192
static int
 
193
program_bidder_bid(struct archive_read_filter_bidder *self,
 
194
    struct archive_read_filter *upstream)
 
195
{
 
196
        struct program_bidder *state = self->data;
 
197
        const char *p;
 
198
 
 
199
        /* If we have a signature, use that to match. */
 
200
        if (state->signature_len > 0) {
 
201
                p = __archive_read_filter_ahead(upstream,
 
202
                    state->signature_len, NULL);
 
203
                if (p == NULL)
 
204
                        return (0);
 
205
                /* No match, so don't bid. */
 
206
                if (memcmp(p, state->signature, state->signature_len) != 0)
 
207
                        return (0);
 
208
                return ((int)state->signature_len * 8);
 
209
        }
 
210
 
 
211
        /* Otherwise, bid once and then never bid again. */
 
212
        if (state->inhibit)
 
213
                return (0);
 
214
        state->inhibit = 1;
 
215
        return (INT_MAX);
 
216
}
 
217
 
 
218
/*
 
219
 * Shut down the child, return ARCHIVE_OK if it exited normally.
 
220
 *
 
221
 * Note that the return value is sticky; if we're called again,
 
222
 * we won't reap the child again, but we will return the same status
 
223
 * (including error message if the child came to a bad end).
 
224
 */
 
225
static int
 
226
child_stop(struct archive_read_filter *self, struct program_filter *state)
 
227
{
 
228
        /* Close our side of the I/O with the child. */
 
229
        if (state->child_stdin != -1) {
 
230
                close(state->child_stdin);
 
231
                state->child_stdin = -1;
 
232
        }
 
233
        if (state->child_stdout != -1) {
 
234
                close(state->child_stdout);
 
235
                state->child_stdout = -1;
 
236
        }
 
237
 
 
238
        if (state->child != 0) {
 
239
                /* Reap the child. */
 
240
                do {
 
241
                        state->waitpid_return
 
242
                            = waitpid(state->child, &state->exit_status, 0);
 
243
                } while (state->waitpid_return == -1 && errno == EINTR);
 
244
                state->child = 0;
 
245
        }
 
246
 
 
247
        if (state->waitpid_return < 0) {
 
248
                /* waitpid() failed?  This is ugly. */
 
249
                archive_set_error(&self->archive->archive, ARCHIVE_ERRNO_MISC,
 
250
                    "Child process exited badly");
 
251
                return (ARCHIVE_WARN);
 
252
        }
 
253
 
 
254
#if !defined(_WIN32) || defined(__CYGWIN__)
 
255
        if (WIFSIGNALED(state->exit_status)) {
 
256
#ifdef SIGPIPE
 
257
                /* If the child died because we stopped reading before
 
258
                 * it was done, that's okay.  Some archive formats
 
259
                 * have padding at the end that we routinely ignore. */
 
260
                /* The alternative to this would be to add a step
 
261
                 * before close(child_stdout) above to read from the
 
262
                 * child until the child has no more to write. */
 
263
                if (WTERMSIG(state->exit_status) == SIGPIPE)
 
264
                        return (ARCHIVE_OK);
 
265
#endif
 
266
                archive_set_error(&self->archive->archive, ARCHIVE_ERRNO_MISC,
 
267
                    "Child process exited with signal %d",
 
268
                    WTERMSIG(state->exit_status));
 
269
                return (ARCHIVE_WARN);
 
270
        }
 
271
#endif /* !_WIN32 || __CYGWIN__ */
 
272
 
 
273
        if (WIFEXITED(state->exit_status)) {
 
274
                if (WEXITSTATUS(state->exit_status) == 0)
 
275
                        return (ARCHIVE_OK);
 
276
 
 
277
                archive_set_error(&self->archive->archive,
 
278
                    ARCHIVE_ERRNO_MISC,
 
279
                    "Child process exited with status %d",
 
280
                    WEXITSTATUS(state->exit_status));
 
281
                return (ARCHIVE_WARN);
 
282
        }
 
283
 
 
284
        return (ARCHIVE_WARN);
 
285
}
 
286
 
 
287
/*
 
288
 * Use select() to decide whether the child is ready for read or write.
 
289
 */
 
290
static ssize_t
 
291
child_read(struct archive_read_filter *self, char *buf, size_t buf_len)
 
292
{
 
293
        struct program_filter *state = self->data;
 
294
        ssize_t ret, requested, avail;
 
295
        const char *p;
 
296
 
 
297
        requested = buf_len > SSIZE_MAX ? SSIZE_MAX : buf_len;
 
298
 
 
299
        for (;;) {
 
300
                do {
 
301
                        ret = read(state->child_stdout, buf, requested);
 
302
                } while (ret == -1 && errno == EINTR);
 
303
 
 
304
                if (ret > 0)
 
305
                        return (ret);
 
306
                if (ret == 0 || (ret == -1 && errno == EPIPE))
 
307
                        /* Child has closed its output; reap the child
 
308
                         * and return the status. */
 
309
                        return (child_stop(self, state));
 
310
                if (ret == -1 && errno != EAGAIN)
 
311
                        return (-1);
 
312
 
 
313
                if (state->child_stdin == -1) {
 
314
                        /* Block until child has some I/O ready. */
 
315
                        __archive_check_child(state->child_stdin,
 
316
                            state->child_stdout);
 
317
                        continue;
 
318
                }
 
319
 
 
320
                /* Get some more data from upstream. */
 
321
                p = __archive_read_filter_ahead(self->upstream, 1, &avail);
 
322
                if (p == NULL) {
 
323
                        close(state->child_stdin);
 
324
                        state->child_stdin = -1;
 
325
                        fcntl(state->child_stdout, F_SETFL, 0);
 
326
                        if (avail < 0)
 
327
                                return (avail);
 
328
                        continue;
 
329
                }
 
330
 
 
331
                do {
 
332
                        ret = write(state->child_stdin, p, avail);
 
333
                } while (ret == -1 && errno == EINTR);
 
334
 
 
335
                if (ret > 0) {
 
336
                        /* Consume whatever we managed to write. */
 
337
                        __archive_read_filter_consume(self->upstream, ret);
 
338
                } else if (ret == -1 && errno == EAGAIN) {
 
339
                        /* Block until child has some I/O ready. */
 
340
                        __archive_check_child(state->child_stdin,
 
341
                            state->child_stdout);
 
342
                } else {
 
343
                        /* Write failed. */
 
344
                        close(state->child_stdin);
 
345
                        state->child_stdin = -1;
 
346
                        fcntl(state->child_stdout, F_SETFL, 0);
 
347
                        /* If it was a bad error, we're done; otherwise
 
348
                         * it was EPIPE or EOF, and we can still read
 
349
                         * from the child. */
 
350
                        if (ret == -1 && errno != EPIPE)
 
351
                                return (-1);
 
352
                }
 
353
        }
 
354
}
 
355
 
 
356
int
 
357
__archive_read_program(struct archive_read_filter *self, const char *cmd)
 
358
{
 
359
        struct program_filter   *state;
 
360
        static const size_t out_buf_len = 65536;
 
361
        char *out_buf;
 
362
        char *description;
 
363
        const char *prefix = "Program: ";
 
364
 
 
365
        state = (struct program_filter *)calloc(1, sizeof(*state));
 
366
        out_buf = (char *)malloc(out_buf_len);
 
367
        description = (char *)malloc(strlen(prefix) + strlen(cmd) + 1);
 
368
        if (state == NULL || out_buf == NULL || description == NULL) {
 
369
                archive_set_error(&self->archive->archive, ENOMEM,
 
370
                    "Can't allocate input data");
 
371
                free(state);
 
372
                free(out_buf);
 
373
                free(description);
 
374
                return (ARCHIVE_FATAL);
 
375
        }
 
376
 
 
377
        self->code = ARCHIVE_COMPRESSION_PROGRAM;
 
378
        state->description = description;
 
379
        strcpy(state->description, prefix);
 
380
        strcat(state->description, cmd);
 
381
        self->name = state->description;
 
382
 
 
383
        state->out_buf = out_buf;
 
384
        state->out_buf_len = out_buf_len;
 
385
 
 
386
        if ((state->child = __archive_create_child(cmd,
 
387
                 &state->child_stdin, &state->child_stdout)) == -1) {
 
388
                free(state->out_buf);
 
389
                free(state);
 
390
                archive_set_error(&self->archive->archive, EINVAL,
 
391
                    "Can't initialise filter");
 
392
                return (ARCHIVE_FATAL);
 
393
        }
 
394
 
 
395
        self->data = state;
 
396
        self->read = program_filter_read;
 
397
        self->skip = NULL;
 
398
        self->close = program_filter_close;
 
399
 
 
400
        /* XXX Check that we can read at least one byte? */
 
401
        return (ARCHIVE_OK);
 
402
}
 
403
 
 
404
static int
 
405
program_bidder_init(struct archive_read_filter *self)
 
406
{
 
407
        struct program_bidder   *bidder_state;
 
408
 
 
409
        bidder_state = (struct program_bidder *)self->bidder->data;
 
410
        return (__archive_read_program(self, bidder_state->cmd));
 
411
}
 
412
 
 
413
static ssize_t
 
414
program_filter_read(struct archive_read_filter *self, const void **buff)
 
415
{
 
416
        struct program_filter *state;
 
417
        ssize_t bytes;
 
418
        size_t total;
 
419
        char *p;
 
420
 
 
421
        state = (struct program_filter *)self->data;
 
422
 
 
423
        total = 0;
 
424
        p = state->out_buf;
 
425
        while (state->child_stdout != -1 && total < state->out_buf_len) {
 
426
                bytes = child_read(self, p, state->out_buf_len - total);
 
427
                if (bytes < 0)
 
428
                        /* No recovery is possible if we can no longer
 
429
                         * read from the child. */
 
430
                        return (ARCHIVE_FATAL);
 
431
                if (bytes == 0)
 
432
                        /* We got EOF from the child. */
 
433
                        break;
 
434
                total += bytes;
 
435
                p += bytes;
 
436
        }
 
437
 
 
438
        *buff = state->out_buf;
 
439
        return (total);
 
440
}
 
441
 
 
442
static int
 
443
program_filter_close(struct archive_read_filter *self)
 
444
{
 
445
        struct program_filter   *state;
 
446
        int e;
 
447
 
 
448
        state = (struct program_filter *)self->data;
 
449
        e = child_stop(self, state);
 
450
 
 
451
        /* Release our private data. */
 
452
        free(state->out_buf);
 
453
        free(state->description);
 
454
        free(state);
 
455
 
 
456
        return (e);
 
457
}
 
458
 
 
459
#endif /* !defined(HAVE_PIPE) || !defined(HAVE_VFORK) || !defined(HAVE_FCNTL) */