/******************************************************
Copyright (c) 2011 Percona Inc.

The xbstream format writer implementation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

*******************************************************/
26
/* Group writes smaller than this into a single chunk.  The value (64 KiB)
is also the size of the per-file staging buffer that collects such writes
before they are emitted as one chunk. */
27
#define XB_STREAM_MIN_CHUNK_SIZE (64 * 1024)
29
/* State of one output stream.  File objects keep a pointer to it and use
its mutex and descriptor when they emit chunks. */
struct xb_wstream_struct {
30
/* Held around the writes to the stream descriptor in
xb_stream_write_chunk() and xb_stream_write_eof(). */
pthread_mutex_t mutex;
34
/* State of one file that is being written into a stream. */
struct xb_wstream_file_struct {
38
/* Staging buffer: xb_stream_write_data() copies writes that fit into the
free part of this buffer here instead of emitting them immediately. */
char chunk[XB_STREAM_MIN_CHUNK_SIZE];
44
/* Forward declarations of the file-local helpers.
xb_stream_flush() emits whatever is currently held in file->chunk. */
static int xb_stream_flush(xb_wstream_file_t *file);
45
/* Emits one XB_CHUNK_TYPE_PAYLOAD chunk carrying len bytes from buf. */
static int xb_stream_write_chunk(xb_wstream_file_t *file,
46
const void *buf, size_t len);
47
/* Emits the XB_CHUNK_TYPE_EOF chunk for the file. */
static int xb_stream_write_eof(xb_wstream_file_t *file);
50
/* Create a new output stream object.
Allocates an xb_wstream_t with my_malloc(), initializes its mutex with
default attributes and binds the stream to the file descriptor of stdout.
NOTE(review): the allocation result is used without a NULL check;
presumably MY_FAE makes my_malloc() treat failure as fatal -- confirm. */
xb_stream_write_new(void)
54
stream = (xb_wstream_t *) my_malloc(sizeof(xb_wstream_t), MYF(MY_FAE));
55
pthread_mutex_init(&stream->mutex, NULL);
56
/* All chunks of this stream go to standard output. */
stream->fd = fileno(stdout);
59
/* Switch the descriptor to binary mode (_O_BINARY). */
setmode(stream->fd, _O_BINARY);
66
/* Begin writing a new file into the stream.
stream: the output stream the file's chunks will be written to.
path:   NUL-terminated file name; it is copied and later placed into the
        header of each chunk written for this file.
mystat: not used by this function.
A path longer than FN_REFLEN is rejected with a message.  Otherwise a file
object is allocated together with a private copy of the path and its
staging buffer is set up as empty. */
xb_stream_write_open(xb_wstream_t *stream, const char *path,
67
MY_STAT *mystat __attribute__((unused)))
69
xb_wstream_file_t *file;
72
path_len = strlen(path);
74
/* The header buffer in xb_stream_write_chunk() reserves FN_REFLEN bytes
for the path, so longer names cannot be accepted. */
if (path_len > FN_REFLEN) {
75
msg("xb_stream_write_open(): file path is too long.\n");
79
/* One allocation holds the struct followed by the path copy, including
its terminating NUL (hence path_len + 1). */
file = (xb_wstream_file_t *) my_malloc(sizeof(xb_wstream_file_t) +
80
path_len + 1, MYF(MY_FAE));
82
/* The path storage starts right behind the struct itself. */
file->path = (char *) (file + 1);
83
memcpy(file->path, path, path_len + 1);
84
file->path_len = path_len;
86
file->stream = stream;
88
/* Start with an empty staging buffer. */
file->chunk_ptr = file->chunk;
89
file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
95
/* Append len bytes from buf to the file.
Data that is strictly smaller than the free space of the staging buffer is
copied into that buffer.  The other visible path flushes the staging buffer
and then passes buf directly to xb_stream_write_chunk(), whose result is
returned.
NOTE(review): the buffered branch presumably returns before the flush is
reached -- confirm against the full function body. */
xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len)
97
/* Strict '<': a write that would fill the buffer exactly is not buffered. */
if (len < file->chunk_free) {
98
memcpy(file->chunk_ptr, buf, len);
99
file->chunk_ptr += len;
100
file->chunk_free -= len;
105
/* Emit anything buffered so far before buf is written as its own chunk. */
if (xb_stream_flush(file))
108
return xb_stream_write_chunk(file, buf, len);
112
/* Finish writing a file: flush the staging buffer, then write the EOF
chunk.  Because of the short-circuit '||', the EOF chunk is not attempted
when the flush returns non-zero; the branch below is taken if either step
returns non-zero. */
xb_stream_write_close(xb_wstream_file_t *file)
114
if (xb_stream_flush(file) ||
115
xb_stream_write_eof(file)) {
126
/* Tear down an output stream; the visible step destroys the mutex that
xb_stream_write_new() initialized. */
xb_stream_write_done(xb_wstream_t *stream)
128
pthread_mutex_destroy(&stream->mutex);
137
/* Write the contents of the file's staging buffer as one chunk and mark the
buffer as empty again. */
xb_stream_flush(xb_wstream_file_t *file)
139
/* chunk_ptr still at the start of the buffer: nothing has been buffered. */
if (file->chunk_ptr == file->chunk) {
143
/* The number of buffered bytes is the distance from the buffer start. */
if (xb_stream_write_chunk(file, file->chunk,
144
file->chunk_ptr - file->chunk)) {
148
/* Reset the buffer to empty. */
file->chunk_ptr = file->chunk;
149
file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
154
/* Helper for the chunk writers: write size bytes starting at ptr with
my_write() to a variable named fd that must be in scope at the point of use.
The condition is true when my_write() returns non-zero.
NOTE(review): with MY_NABP a non-zero result presumably means the write
failed or was incomplete -- confirm against the mysys documentation. */
#define F_WRITE(ptr,size) \
156
if (my_write(fd, ptr, size, MYF(MY_WME | MY_NABP))) \
162
/* Write one payload chunk for the file.
file: the file the chunk belongs to; its path and offset go into the header.
buf:  payload bytes.
len:  number of payload bytes.
The header is assembled in a stack buffer in this order: chunk magic, flags,
chunk type, path length, path, payload length, payload offset, checksum.
The header and then the payload are written to the stream descriptor. */
xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len)
164
/* Chunk magic + flags + chunk type + path_len + path + len + offset +
166
uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
167
FN_REFLEN + 8 + 8 + 4];
169
xb_wstream_t *stream = file->stream;
170
File fd = stream->fd;
173
/* Write xbstream header */
177
/* sizeof() - 1: presumably the magic is a string literal and its
terminating NUL is not part of the header. */
memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
178
ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
180
*ptr++ = 0; /* Chunk flags */
182
*ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD; /* Chunk type */
184
int4store(ptr, file->path_len); /* Path length */
187
memcpy(ptr, file->path, file->path_len); /* Path */
188
ptr += file->path_len;
190
int8store(ptr, len); /* Payload length */
193
/* From here on the stream mutex is held: the offset is stored and both
writes below are issued under it. */
pthread_mutex_lock(&stream->mutex);
195
int8store(ptr, file->offset); /* Payload offset */
198
/* crc32() over the payload only, starting from an initial value of 0. */
checksum = crc32(0, buf, len); /* checksum */
199
int4store(ptr, checksum);
202
/* Check that the assembled header did not run past the end of tmpbuf. */
xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
204
/* Header first ... */
F_WRITE(tmpbuf, ptr - tmpbuf);
206
F_WRITE(buf, len); /* Payload */
210
pthread_mutex_unlock(&stream->mutex);
216
/* NOTE(review): this second unlock presumably belongs to the error path
taken when F_WRITE fails -- confirm. */
pthread_mutex_unlock(&stream->mutex);
223
/* Write the end-of-file chunk for the file.
The chunk consists of a header only: chunk magic, flags, chunk type
(XB_CHUNK_TYPE_EOF), path length and path.  Unlike in
xb_stream_write_chunk(), the stream mutex is taken before the header is
assembled. */
xb_stream_write_eof(xb_wstream_file_t *file)
225
/* Chunk magic + flags + chunk type + path_len + path */
226
uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
229
xb_wstream_t *stream = file->stream;
230
File fd = stream->fd;
232
pthread_mutex_lock(&stream->mutex);
234
/* Write xbstream header */
238
/* sizeof() - 1: presumably the magic is a string literal and its
terminating NUL is not part of the header. */
memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
239
ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
241
*ptr++ = 0; /* Chunk flags */
243
*ptr++ = (uchar) XB_CHUNK_TYPE_EOF; /* Chunk type */
245
int4store(ptr, file->path_len); /* Path length */
248
memcpy(ptr, file->path, file->path_len); /* Path */
249
ptr += file->path_len;
251
/* Check that the assembled header did not run past the end of tmpbuf. */
xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
253
F_WRITE(tmpbuf, (ulonglong) (ptr - tmpbuf));;
255
pthread_mutex_unlock(&stream->mutex);
260
/* NOTE(review): this second unlock presumably belongs to the error path
taken when F_WRITE fails -- confirm. */
pthread_mutex_unlock(&stream->mutex);