~akopytov/percona-xtrabackup/bug1168513

« back to all changes in this revision

Viewing changes to src/ds_xbstream.c

Merged lp:~gl-az/percona-xtrabackup/BT-23557-2.1-encrypted_stream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <mysql_version.h>
22
22
#include <my_base.h>
23
 
#include <archive.h>
24
 
#include <archive_entry.h>
25
23
#include "common.h"
26
24
#include "datasink.h"
27
25
#include "xbstream.h"
28
26
 
29
27
typedef struct {
30
 
        struct archive  *archive;
31
28
        xb_wstream_t    *xbstream;
 
29
        ds_file_t       *dest_file;
32
30
} ds_stream_ctxt_t;
33
31
 
34
32
typedef struct {
35
 
        struct archive_entry    *entry;
36
33
        xb_wstream_file_t       *xbstream_file;
37
34
        ds_stream_ctxt_t        *stream_ctxt;
38
35
} ds_stream_file_t;
39
36
 
40
 
extern xb_stream_fmt_t xtrabackup_stream_fmt;
41
 
 
42
37
/***********************************************************************
43
38
General streaming interface */
44
39
 
45
 
static ds_ctxt_t *stream_init(const char *root);
46
 
static ds_file_t *stream_open(ds_ctxt_t *ctxt, const char *path,
 
40
static ds_ctxt_t *xbstream_init(const char *root);
 
41
static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path,
47
42
                              MY_STAT *mystat);
48
 
static int stream_write(ds_file_t *file, const void *buf, size_t len);
49
 
static int stream_close(ds_file_t *file);
50
 
static void stream_deinit(ds_ctxt_t *ctxt);
 
43
static int xbstream_write(ds_file_t *file, const void *buf, size_t len);
 
44
static int xbstream_close(ds_file_t *file);
 
45
static void xbstream_deinit(ds_ctxt_t *ctxt);
51
46
 
52
 
datasink_t datasink_stream = {
53
 
        &stream_init,
54
 
        &stream_open,
55
 
        &stream_write,
56
 
        &stream_close,
57
 
        &stream_deinit
 
47
datasink_t datasink_xbstream = {
 
48
        &xbstream_init,
 
49
        &xbstream_open,
 
50
        &xbstream_write,
 
51
        &xbstream_close,
 
52
        &xbstream_deinit
58
53
};
59
54
 
60
55
static
 
56
ssize_t
 
57
my_xbstream_write_callback(xb_wstream_file_t *f __attribute__((unused)),
 
58
                       void *userdata, const void *buf, size_t len)
 
59
{
 
60
        ds_stream_ctxt_t        *stream_ctxt;
 
61
 
 
62
        stream_ctxt = (ds_stream_ctxt_t *) userdata;
 
63
 
 
64
        xb_ad(stream_ctxt != NULL);
 
65
        xb_ad(stream_ctxt->dest_file != NULL);
 
66
 
 
67
        if (!ds_write(stream_ctxt->dest_file, buf, len)) {
 
68
                return len;
 
69
        }
 
70
        return -1;
 
71
}
 
72
 
 
73
static
61
74
ds_ctxt_t *
62
 
stream_init(const char *root __attribute__((unused)))
 
75
xbstream_init(const char *root __attribute__((unused)))
63
76
{
64
77
        ds_ctxt_t               *ctxt;
65
78
        ds_stream_ctxt_t        *stream_ctxt;
68
81
                         MYF(MY_FAE));
69
82
        stream_ctxt = (ds_stream_ctxt_t *)(ctxt + 1);
70
83
 
71
 
        if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
72
 
                /* xbstream format */
73
 
                xb_wstream_t *xbstream;
74
 
 
75
 
                xbstream = xb_stream_write_new();
76
 
                if (xbstream == NULL) {
77
 
                        msg("xb_stream_write_new() failed.\n");
78
 
                        goto err;
79
 
                }
80
 
                stream_ctxt->xbstream = xbstream;
81
 
        } else {
82
 
                /* Tar format */
83
 
                struct archive          *a;
84
 
 
85
 
                a = archive_write_new();
86
 
                if (a == NULL) {
87
 
                        msg("archive_write_new() failed.\n");
88
 
                        goto err;
89
 
                }
90
 
 
91
 
                if (archive_write_set_compression_none(a) != ARCHIVE_OK ||
92
 
                    archive_write_set_format_pax_restricted(a) != ARCHIVE_OK ||
93
 
                    /* disable internal buffering so we don't have to flush the
94
 
                    output in xtrabackup */
95
 
                    archive_write_set_bytes_per_block(a, 0) != ARCHIVE_OK) {
96
 
                        msg("failed to set libarchive stream options: %s\n",
97
 
                            archive_error_string(a));
98
 
                        archive_write_finish(a);
99
 
                        goto err;
100
 
                }
101
 
 
102
 
                if (archive_write_open_fd(a, fileno(stdout)) != ARCHIVE_OK) {
103
 
                        msg("cannot open output stream.\n");
104
 
                        goto err;
105
 
                }
106
 
                stream_ctxt->archive = a;
 
84
        xb_wstream_t *xbstream;
 
85
 
 
86
        xbstream = xb_stream_write_new();
 
87
        if (xbstream == NULL) {
 
88
                msg("xb_stream_write_new() failed.\n");
 
89
                goto err;
107
90
        }
108
 
 
 
91
        stream_ctxt->xbstream = xbstream;
 
92
        stream_ctxt->dest_file = NULL;
109
93
 
110
94
        ctxt->ptr = stream_ctxt;
111
95
 
118
102
 
119
103
static
120
104
ds_file_t *
121
 
stream_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
 
105
xbstream_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
122
106
{
123
107
        ds_file_t               *file;
124
108
        ds_stream_file_t        *stream_file;
125
109
        ds_stream_ctxt_t        *stream_ctxt;
 
110
        ds_ctxt_t               *dest_ctxt;
 
111
 
 
112
        xb_ad(ctxt->pipe_ctxt != NULL);
 
113
        dest_ctxt = ctxt->pipe_ctxt;
126
114
 
127
115
        stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
128
116
 
 
117
        if (stream_ctxt->dest_file == NULL) {
 
118
                stream_ctxt->dest_file = ds_open(dest_ctxt, path, mystat);
 
119
                if (stream_ctxt->dest_file == NULL) {
 
120
                        return NULL;
 
121
                }
 
122
        }
 
123
 
129
124
        file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
130
125
                                       sizeof(ds_stream_file_t),
131
126
                                       MYF(MY_FAE));
132
127
        stream_file = (ds_stream_file_t *) (file + 1);
133
128
 
134
 
        if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
135
 
                /* xbstream format */
136
 
 
137
 
                xb_wstream_t            *xbstream;
138
 
                xb_wstream_file_t       *xbstream_file;
139
 
 
140
 
                xbstream = stream_ctxt->xbstream;
141
 
 
142
 
                xbstream_file = xb_stream_write_open(xbstream, path, mystat);
143
 
                if (xbstream_file == NULL) {
144
 
                        msg("xb_stream_write_open() failed.\n");
145
 
                        goto err;
146
 
                }
147
 
 
148
 
                stream_file->xbstream_file = xbstream_file;
149
 
        } else {
150
 
                /* Tar format */
151
 
 
152
 
                struct archive          *a;
153
 
                struct archive_entry    *entry;
154
 
 
155
 
                a = stream_ctxt->archive;
156
 
 
157
 
                entry = archive_entry_new();
158
 
                if (entry == NULL) {
159
 
                        msg("archive_entry_new() failed.\n");
160
 
                        goto err;
161
 
                }
162
 
 
163
 
                archive_entry_set_size(entry, mystat->st_size);
164
 
                archive_entry_set_mode(entry, 0660);
165
 
                archive_entry_set_filetype(entry, AE_IFREG);
166
 
                archive_entry_set_pathname(entry, path);
167
 
                archive_entry_set_mtime(entry, mystat->st_mtime, 0);
168
 
 
169
 
                if (archive_write_header(a, entry) != ARCHIVE_OK) {
170
 
                        msg("archive_write_header() failed.\n");
171
 
                        archive_entry_free(entry);
172
 
                        goto err;
173
 
                }
174
 
                stream_file->entry = entry;
 
129
        xb_wstream_t            *xbstream;
 
130
        xb_wstream_file_t       *xbstream_file;
 
131
 
 
132
        xbstream = stream_ctxt->xbstream;
 
133
 
 
134
        xbstream_file = xb_stream_write_open(xbstream, path, mystat,
 
135
                                             stream_ctxt,
 
136
                                             my_xbstream_write_callback);
 
137
 
 
138
        if (xbstream_file == NULL) {
 
139
                msg("xb_stream_write_open() failed.\n");
 
140
                goto err;
175
141
        }
176
142
 
 
143
        stream_file->xbstream_file = xbstream_file;
177
144
        stream_file->stream_ctxt = stream_ctxt;
178
145
        file->ptr = stream_file;
179
 
        file->path = "<STDOUT>";
 
146
        file->path = stream_ctxt->dest_file->path;
180
147
 
181
148
        return file;
182
149
 
183
150
err:
 
151
        if (stream_ctxt->dest_file) {
 
152
                ds_close(stream_ctxt->dest_file);
 
153
                stream_ctxt->dest_file = NULL;
 
154
        }
184
155
        MY_FREE(file);
185
156
 
186
157
        return NULL;
188
159
 
189
160
static
190
161
int
191
 
stream_write(ds_file_t *file, const void *buf, size_t len)
 
162
xbstream_write(ds_file_t *file, const void *buf, size_t len)
192
163
{
193
164
        ds_stream_file_t        *stream_file;
194
165
 
195
166
        stream_file = (ds_stream_file_t *) file->ptr;
196
167
 
197
 
        if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
198
 
                /* xbstream format */
199
 
 
200
 
                xb_wstream_file_t       *xbstream_file;
201
 
 
202
 
                xbstream_file = stream_file->xbstream_file;
203
 
 
204
 
                if (xb_stream_write_data(xbstream_file, buf, len)) {
205
 
                        msg("xb_stream_write_data() failed.\n");
206
 
                        return 1;
207
 
                }
208
 
        } else {
209
 
                /* Tar format */
210
 
 
211
 
                struct archive          *a;
212
 
 
213
 
                a = stream_file->stream_ctxt->archive;
214
 
 
215
 
                if (archive_write_data(a, buf, len) < 0) {
216
 
                        msg("archive_write_data() failed: %s (errno = %d)\n",
217
 
                            archive_error_string(a), archive_errno(a));
218
 
                        return 1;
219
 
                }
 
168
        xb_wstream_file_t       *xbstream_file;
 
169
 
 
170
        xbstream_file = stream_file->xbstream_file;
 
171
 
 
172
        if (xb_stream_write_data(xbstream_file, buf, len)) {
 
173
                msg("xb_stream_write_data() failed.\n");
 
174
                return 1;
220
175
        }
221
176
 
222
177
        return 0;
224
179
 
225
180
static
226
181
int
227
 
stream_close(ds_file_t *file)
 
182
xbstream_close(ds_file_t *file)
228
183
{
229
184
        ds_stream_file_t        *stream_file;
230
185
        int                     rc = 0;
231
186
 
232
187
        stream_file = (ds_stream_file_t *)file->ptr;
233
188
 
234
 
        if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
235
 
                rc = xb_stream_write_close(stream_file->xbstream_file);
236
 
        } else {
237
 
                archive_entry_free(stream_file->entry);
238
 
        }
 
189
        rc = xb_stream_write_close(stream_file->xbstream_file);
239
190
 
240
191
        MY_FREE(file);
241
192
 
244
195
 
245
196
static
246
197
void
247
 
stream_deinit(ds_ctxt_t *ctxt)
 
198
xbstream_deinit(ds_ctxt_t *ctxt)
248
199
{
249
200
        ds_stream_ctxt_t        *stream_ctxt;
250
201
 
251
202
        stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
252
203
 
253
 
        if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
254
 
                if (xb_stream_write_done(stream_ctxt->xbstream)) {
255
 
                        msg("xb_stream_done() failed.\n");
256
 
                }
257
 
        } else {
258
 
                struct archive *a;
259
 
 
260
 
                a = stream_ctxt->archive;
261
 
 
262
 
                if (archive_write_close(a) != ARCHIVE_OK) {
263
 
                        msg("archive_write_close() failed.\n");
264
 
                }
265
 
                archive_write_finish(a);
266
 
        }
267
 
 
 
204
        if (xb_stream_write_done(stream_ctxt->xbstream)) {
 
205
                msg("xb_stream_done() failed.\n");
 
206
        }
 
207
 
 
208
        if (stream_ctxt->dest_file) {
 
209
                ds_close(stream_ctxt->dest_file);
 
210
                stream_ctxt->dest_file = NULL;
 
211
        }
268
212
        MY_FREE(ctxt);
269
213
}