~sergei.glushchenko/percona-xtrabackup/xb-pprint

« back to all changes in this revision

Viewing changes to src/xbstream_write.c

merge parallel compression branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/******************************************************
 
2
Copyright (c) 2011 Percona Inc.
 
3
 
 
4
The xbstream format writer implementation.
 
5
 
 
6
This program is free software; you can redistribute it and/or modify
 
7
it under the terms of the GNU General Public License as published by
 
8
the Free Software Foundation; version 2 of the License.
 
9
 
 
10
This program is distributed in the hope that it will be useful,
 
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
GNU General Public License for more details.
 
14
 
 
15
You should have received a copy of the GNU General Public License
 
16
along with this program; if not, write to the Free Software
 
17
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 
 
19
*******************************************************/
 
20
 
 
21
#include <my_base.h>
 
22
#include <zlib.h>
 
23
#include "common.h"
 
24
#include "xbstream.h"
 
25
 
 
26
/* Group writes smaller than this into a single chunk */
 
27
#define XB_STREAM_MIN_CHUNK_SIZE (64 * 1024)
 
28
 
 
29
struct xb_wstream_struct {
 
30
        pthread_mutex_t mutex;
 
31
        File            fd;
 
32
};
 
33
 
 
34
struct xb_wstream_file_struct {
 
35
        xb_wstream_t    *stream;
 
36
        char            *path;
 
37
        ulong           path_len;
 
38
        char            chunk[XB_STREAM_MIN_CHUNK_SIZE];
 
39
        char            *chunk_ptr;
 
40
        size_t          chunk_free;
 
41
        my_off_t        offset;
 
42
};
 
43
 
 
44
static int xb_stream_flush(xb_wstream_file_t *file);
 
45
static int xb_stream_write_chunk(xb_wstream_file_t *file,
 
46
                                 const void *buf, size_t len);
 
47
static int xb_stream_write_eof(xb_wstream_file_t *file);
 
48
 
 
49
xb_wstream_t *
 
50
xb_stream_write_new(void)
 
51
{
 
52
        xb_wstream_t    *stream;
 
53
 
 
54
        stream = (xb_wstream_t *) my_malloc(sizeof(xb_wstream_t), MYF(MY_FAE));
 
55
        pthread_mutex_init(&stream->mutex, NULL);
 
56
        stream->fd = fileno(stdout);
 
57
 
 
58
#ifdef __WIN__
 
59
        setmode(stream->fd, _O_BINARY);
 
60
#endif
 
61
 
 
62
        return stream;;
 
63
}
 
64
 
 
65
xb_wstream_file_t *
 
66
xb_stream_write_open(xb_wstream_t *stream, const char *path,
 
67
                     MY_STAT *mystat __attribute__((unused)))
 
68
{
 
69
        xb_wstream_file_t       *file;
 
70
        ulong                   path_len;
 
71
 
 
72
        path_len = strlen(path);
 
73
 
 
74
        if (path_len > FN_REFLEN) {
 
75
                msg("xb_stream_write_open(): file path is too long.\n");
 
76
                return NULL;
 
77
        }
 
78
 
 
79
        file = (xb_wstream_file_t *) my_malloc(sizeof(xb_wstream_file_t) +
 
80
                                               path_len + 1, MYF(MY_FAE));
 
81
 
 
82
        file->path = (char *) (file + 1);
 
83
        memcpy(file->path, path, path_len + 1);
 
84
        file->path_len = path_len;
 
85
 
 
86
        file->stream = stream;
 
87
        file->offset = 0;
 
88
        file->chunk_ptr = file->chunk;
 
89
        file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
 
90
 
 
91
        return file;
 
92
}
 
93
 
 
94
int
 
95
xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len)
 
96
{
 
97
        if (len < file->chunk_free) {
 
98
                memcpy(file->chunk_ptr, buf, len);
 
99
                file->chunk_ptr += len;
 
100
                file->chunk_free -= len;
 
101
 
 
102
                return 0;
 
103
        }
 
104
 
 
105
        if (xb_stream_flush(file))
 
106
                return 1;
 
107
 
 
108
        return xb_stream_write_chunk(file, buf, len);
 
109
}
 
110
 
 
111
int
 
112
xb_stream_write_close(xb_wstream_file_t *file)
 
113
{
 
114
        if (xb_stream_flush(file) ||
 
115
            xb_stream_write_eof(file)) {
 
116
                MY_FREE(file);
 
117
                return 1;
 
118
        }
 
119
 
 
120
        MY_FREE(file);
 
121
 
 
122
        return 0;
 
123
}
 
124
 
 
125
int
 
126
xb_stream_write_done(xb_wstream_t *stream)
 
127
{
 
128
        pthread_mutex_destroy(&stream->mutex);
 
129
 
 
130
        MY_FREE(stream);
 
131
 
 
132
        return 0;
 
133
}
 
134
 
 
135
static
 
136
int
 
137
xb_stream_flush(xb_wstream_file_t *file)
 
138
{
 
139
        if (file->chunk_ptr == file->chunk) {
 
140
                return 0;
 
141
        }
 
142
 
 
143
        if (xb_stream_write_chunk(file, file->chunk,
 
144
                                  file->chunk_ptr - file->chunk)) {
 
145
                return 1;
 
146
        }
 
147
 
 
148
        file->chunk_ptr = file->chunk;
 
149
        file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
 
150
 
 
151
        return 0;
 
152
}
 
153
 
 
154
#define F_WRITE(ptr,size)                                               \
 
155
        do {                                                            \
 
156
                if (my_write(fd, ptr, size, MYF(MY_WME | MY_NABP)))     \
 
157
                        goto err;                                       \
 
158
        } while (0)
 
159
 
 
160
static
 
161
int
 
162
xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len)
 
163
{
 
164
        /* Chunk magic + flags + chunk type + path_len + path + len + offset +
 
165
        checksum */
 
166
        uchar           tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
 
167
                               FN_REFLEN + 8 + 8 + 4];
 
168
        uchar           *ptr;
 
169
        xb_wstream_t    *stream = file->stream;
 
170
        File            fd = stream->fd;
 
171
        ulong           checksum;
 
172
 
 
173
        /* Write xbstream header */
 
174
        ptr = tmpbuf;
 
175
 
 
176
        /* Chunk magic */
 
177
        memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
 
178
        ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
 
179
 
 
180
        *ptr++ = 0;                              /* Chunk flags */
 
181
 
 
182
        *ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD;  /* Chunk type */
 
183
 
 
184
        int4store(ptr, file->path_len);          /* Path length */
 
185
        ptr += 4;
 
186
 
 
187
        memcpy(ptr, file->path, file->path_len); /* Path */
 
188
        ptr += file->path_len;
 
189
 
 
190
        int8store(ptr, len);                     /* Payload length */
 
191
        ptr += 8;
 
192
 
 
193
        pthread_mutex_lock(&stream->mutex);
 
194
 
 
195
        int8store(ptr, file->offset);            /* Payload offset */
 
196
        ptr += 8;
 
197
 
 
198
        checksum = crc32(0, buf, len);           /* checksum */
 
199
        int4store(ptr, checksum);
 
200
        ptr += 4;
 
201
 
 
202
        xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
 
203
 
 
204
        F_WRITE(tmpbuf, ptr - tmpbuf);
 
205
 
 
206
        F_WRITE(buf, len);                       /* Payload */
 
207
 
 
208
        file->offset+= len;
 
209
 
 
210
        pthread_mutex_unlock(&stream->mutex);
 
211
 
 
212
        return 0;
 
213
 
 
214
err:
 
215
 
 
216
        pthread_mutex_unlock(&stream->mutex);
 
217
 
 
218
        return 1;
 
219
}
 
220
 
 
221
static
 
222
int
 
223
xb_stream_write_eof(xb_wstream_file_t *file)
 
224
{
 
225
        /* Chunk magic + flags + chunk type + path_len + path */
 
226
        uchar           tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
 
227
                               FN_REFLEN];
 
228
        uchar           *ptr;
 
229
        xb_wstream_t    *stream = file->stream;
 
230
        File            fd = stream->fd;
 
231
 
 
232
        pthread_mutex_lock(&stream->mutex);
 
233
 
 
234
        /* Write xbstream header */
 
235
        ptr = tmpbuf;
 
236
 
 
237
        /* Chunk magic */
 
238
        memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
 
239
        ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
 
240
 
 
241
        *ptr++ = 0;                              /* Chunk flags */
 
242
 
 
243
        *ptr++ = (uchar) XB_CHUNK_TYPE_EOF;      /* Chunk type */
 
244
 
 
245
        int4store(ptr, file->path_len);          /* Path length */
 
246
        ptr += 4;
 
247
 
 
248
        memcpy(ptr, file->path, file->path_len); /* Path */
 
249
        ptr += file->path_len;
 
250
 
 
251
        xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
 
252
 
 
253
        F_WRITE(tmpbuf, (ulonglong) (ptr - tmpbuf));;
 
254
 
 
255
        pthread_mutex_unlock(&stream->mutex);
 
256
 
 
257
        return 0;
 
258
err:
 
259
 
 
260
        pthread_mutex_unlock(&stream->mutex);
 
261
 
 
262
        return 1;
 
263
}