21
21
#include <mysql_version.h>
22
22
#include <my_base.h>
24
#include <archive_entry.h>
25
23
#include "common.h"
26
24
#include "datasink.h"
27
25
#include "xbstream.h"
30
struct archive *archive;
31
28
xb_wstream_t *xbstream;
32
30
} ds_stream_ctxt_t;
35
struct archive_entry *entry;
36
33
xb_wstream_file_t *xbstream_file;
37
34
ds_stream_ctxt_t *stream_ctxt;
38
35
} ds_stream_file_t;
40
extern xb_stream_fmt_t xtrabackup_stream_fmt;
42
37
/***********************************************************************
43
38
General streaming interface */
45
static ds_ctxt_t *stream_init(const char *root);
46
static ds_file_t *stream_open(ds_ctxt_t *ctxt, const char *path,
40
static ds_ctxt_t *xbstream_init(const char *root);
41
static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path,
48
static int stream_write(ds_file_t *file, const void *buf, size_t len);
49
static int stream_close(ds_file_t *file);
50
static void stream_deinit(ds_ctxt_t *ctxt);
43
static int xbstream_write(ds_file_t *file, const void *buf, size_t len);
44
static int xbstream_close(ds_file_t *file);
45
static void xbstream_deinit(ds_ctxt_t *ctxt);
52
datasink_t datasink_stream = {
47
datasink_t datasink_xbstream = {
57
my_xbstream_write_callback(xb_wstream_file_t *f __attribute__((unused)),
58
void *userdata, const void *buf, size_t len)
60
ds_stream_ctxt_t *stream_ctxt;
62
stream_ctxt = (ds_stream_ctxt_t *) userdata;
64
xb_ad(stream_ctxt != NULL);
65
xb_ad(stream_ctxt->dest_file != NULL);
67
if (!ds_write(stream_ctxt->dest_file, buf, len)) {
62
stream_init(const char *root __attribute__((unused)))
75
xbstream_init(const char *root __attribute__((unused)))
65
78
ds_stream_ctxt_t *stream_ctxt;
69
82
stream_ctxt = (ds_stream_ctxt_t *)(ctxt + 1);
71
if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
73
xb_wstream_t *xbstream;
75
xbstream = xb_stream_write_new();
76
if (xbstream == NULL) {
77
msg("xb_stream_write_new() failed.\n");
80
stream_ctxt->xbstream = xbstream;
85
a = archive_write_new();
87
msg("archive_write_new() failed.\n");
91
if (archive_write_set_compression_none(a) != ARCHIVE_OK ||
92
archive_write_set_format_pax_restricted(a) != ARCHIVE_OK ||
93
/* disable internal buffering so we don't have to flush the
94
output in xtrabackup */
95
archive_write_set_bytes_per_block(a, 0) != ARCHIVE_OK) {
96
msg("failed to set libarchive stream options: %s\n",
97
archive_error_string(a));
98
archive_write_finish(a);
102
if (archive_write_open_fd(a, fileno(stdout)) != ARCHIVE_OK) {
103
msg("cannot open output stream.\n");
106
stream_ctxt->archive = a;
84
xb_wstream_t *xbstream;
86
xbstream = xb_stream_write_new();
87
if (xbstream == NULL) {
88
msg("xb_stream_write_new() failed.\n");
91
stream_ctxt->xbstream = xbstream;
92
stream_ctxt->dest_file = NULL;
110
94
ctxt->ptr = stream_ctxt;
121
stream_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
105
xbstream_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
124
108
ds_stream_file_t *stream_file;
125
109
ds_stream_ctxt_t *stream_ctxt;
110
ds_ctxt_t *dest_ctxt;
112
xb_ad(ctxt->pipe_ctxt != NULL);
113
dest_ctxt = ctxt->pipe_ctxt;
127
115
stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
117
if (stream_ctxt->dest_file == NULL) {
118
stream_ctxt->dest_file = ds_open(dest_ctxt, path, mystat);
119
if (stream_ctxt->dest_file == NULL) {
129
124
file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
130
125
sizeof(ds_stream_file_t),
132
127
stream_file = (ds_stream_file_t *) (file + 1);
134
if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
135
/* xbstream format */
137
xb_wstream_t *xbstream;
138
xb_wstream_file_t *xbstream_file;
140
xbstream = stream_ctxt->xbstream;
142
xbstream_file = xb_stream_write_open(xbstream, path, mystat);
143
if (xbstream_file == NULL) {
144
msg("xb_stream_write_open() failed.\n");
148
stream_file->xbstream_file = xbstream_file;
153
struct archive_entry *entry;
155
a = stream_ctxt->archive;
157
entry = archive_entry_new();
159
msg("archive_entry_new() failed.\n");
163
archive_entry_set_size(entry, mystat->st_size);
164
archive_entry_set_mode(entry, 0660);
165
archive_entry_set_filetype(entry, AE_IFREG);
166
archive_entry_set_pathname(entry, path);
167
archive_entry_set_mtime(entry, mystat->st_mtime, 0);
169
if (archive_write_header(a, entry) != ARCHIVE_OK) {
170
msg("archive_write_header() failed.\n");
171
archive_entry_free(entry);
174
stream_file->entry = entry;
129
xb_wstream_t *xbstream;
130
xb_wstream_file_t *xbstream_file;
132
xbstream = stream_ctxt->xbstream;
134
xbstream_file = xb_stream_write_open(xbstream, path, mystat,
136
my_xbstream_write_callback);
138
if (xbstream_file == NULL) {
139
msg("xb_stream_write_open() failed.\n");
143
stream_file->xbstream_file = xbstream_file;
177
144
stream_file->stream_ctxt = stream_ctxt;
178
145
file->ptr = stream_file;
179
file->path = "<STDOUT>";
146
file->path = stream_ctxt->dest_file->path;
151
if (stream_ctxt->dest_file) {
152
ds_close(stream_ctxt->dest_file);
153
stream_ctxt->dest_file = NULL;
191
stream_write(ds_file_t *file, const void *buf, size_t len)
162
xbstream_write(ds_file_t *file, const void *buf, size_t len)
193
164
ds_stream_file_t *stream_file;
195
166
stream_file = (ds_stream_file_t *) file->ptr;
197
if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
198
/* xbstream format */
200
xb_wstream_file_t *xbstream_file;
202
xbstream_file = stream_file->xbstream_file;
204
if (xb_stream_write_data(xbstream_file, buf, len)) {
205
msg("xb_stream_write_data() failed.\n");
213
a = stream_file->stream_ctxt->archive;
215
if (archive_write_data(a, buf, len) < 0) {
216
msg("archive_write_data() failed: %s (errno = %d)\n",
217
archive_error_string(a), archive_errno(a));
168
xb_wstream_file_t *xbstream_file;
170
xbstream_file = stream_file->xbstream_file;
172
if (xb_stream_write_data(xbstream_file, buf, len)) {
173
msg("xb_stream_write_data() failed.\n");
227
stream_close(ds_file_t *file)
182
xbstream_close(ds_file_t *file)
229
184
ds_stream_file_t *stream_file;
232
187
stream_file = (ds_stream_file_t *)file->ptr;
234
if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
235
rc = xb_stream_write_close(stream_file->xbstream_file);
237
archive_entry_free(stream_file->entry);
189
rc = xb_stream_write_close(stream_file->xbstream_file);
247
stream_deinit(ds_ctxt_t *ctxt)
198
xbstream_deinit(ds_ctxt_t *ctxt)
249
200
ds_stream_ctxt_t *stream_ctxt;
251
202
stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
253
if (xtrabackup_stream_fmt == XB_STREAM_FMT_XBSTREAM) {
254
if (xb_stream_write_done(stream_ctxt->xbstream)) {
255
msg("xb_stream_done() failed.\n");
260
a = stream_ctxt->archive;
262
if (archive_write_close(a) != ARCHIVE_OK) {
263
msg("archive_write_close() failed.\n");
265
archive_write_finish(a);
204
if (xb_stream_write_done(stream_ctxt->xbstream)) {
205
msg("xb_stream_done() failed.\n");
208
if (stream_ctxt->dest_file) {
209
ds_close(stream_ctxt->dest_file);
210
stream_ctxt->dest_file = NULL;