~stub/ubuntu/trusty/avro-c/trunk

« back to all changes in this revision

Viewing changes to src/datafile.c

  • Committer: Stuart Bishop
  • Date: 2015-05-14 11:53:53 UTC
  • Revision ID: stuart@stuartbishop.net-20150514115353-0cvnrcyohcq5l7yj
Tags: upstream-1.7.7
Import upstream version 1.7.7

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
 
3
 * contributor license agreements.  See the NOTICE file distributed with
 
4
 * this work for additional information regarding copyright ownership.
 
5
 * The ASF licenses this file to you under the Apache License, Version 2.0 
 
6
 * (the "License"); you may not use this file except in compliance with
 
7
 * the License.  You may obtain a copy of the License at
 
8
 * 
 
9
 * http://www.apache.org/licenses/LICENSE-2.0
 
10
 * 
 
11
 * Unless required by applicable law or agreed to in writing, software
 
12
 * distributed under the License is distributed on an "AS IS" BASIS,
 
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
14
 * implied.  See the License for the specific language governing
 
15
 * permissions and limitations under the License. 
 
16
 */
 
17
 
 
18
#include "avro_private.h"
 
19
#include "avro/allocation.h"
 
20
#include "avro/generic.h"
 
21
#include "avro/errors.h"
 
22
#include "avro/value.h"
 
23
#include "encoding.h"
 
24
#include "codec.h"
 
25
#include <stdio.h>
 
26
#include <stdlib.h>
 
27
#include <errno.h>
 
28
#include <fcntl.h>
 
29
#include <time.h>
 
30
#include <string.h>
 
31
 
 
32
struct avro_file_reader_t_ {
 
33
        avro_schema_t writers_schema;
 
34
        avro_reader_t reader;
 
35
        avro_reader_t block_reader;
 
36
        avro_codec_t codec;
 
37
        char sync[16];
 
38
        int64_t blocks_read;
 
39
        int64_t blocks_total;
 
40
        int64_t current_blocklen;
 
41
        char * current_blockdata;
 
42
};
 
43
 
 
44
struct avro_file_writer_t_ {
 
45
        avro_schema_t writers_schema;
 
46
        avro_writer_t writer;
 
47
        avro_codec_t codec;
 
48
        char sync[16];
 
49
        int block_count;
 
50
        size_t block_size;
 
51
        avro_writer_t datum_writer;
 
52
        char* datum_buffer;
 
53
        size_t datum_buffer_size;
 
54
        char schema_buf[64 * 1024];
 
55
};
 
56
 
 
57
/* Datum buffer size used when a caller passes a block_size of 0.
 * Parenthesized so the macro expands correctly inside larger expressions. */
#define DEFAULT_BLOCK_SIZE (16 * 1024)
 
58
 
 
59
/* Note: We should not just read /dev/random here, because it may not
 
60
 * exist on all platforms e.g. Win32.
 
61
 */
 
62
static void generate_sync(avro_file_writer_t w)
 
63
{
 
64
        unsigned int i;
 
65
        srand(time(NULL));
 
66
        for (i = 0; i < sizeof(w->sync); i++) {
 
67
                w->sync[i] = ((double)rand() / (RAND_MAX + 1.0)) * 255;
 
68
        }
 
69
}
 
70
 
 
71
/* Emit the writer's 16-byte sync marker onto the output stream.
 * Returns whatever avro_write reports (0 on success). */
static int write_sync(avro_file_writer_t w)
{
        const int64_t marker_len = sizeof(w->sync);

        return avro_write(w->writer, w->sync, marker_len);
}
 
75
 
 
76
/* Write the container-file header to w->writer and generate w->sync.
 *
 * The header is written in this order:
 *   - the bytes "Obj" and version byte 1;
 *   - a metadata map with two entries, "avro.codec" (the codec name) and
 *     "avro.schema" (the schema as JSON), closed by a zero count;
 *   - the sync marker.
 *
 * The schema JSON is rendered into w->schema_buf, so it must fit in
 * sizeof(w->schema_buf) bytes.
 *
 * Returns 0 on success, ENOMEM if the schema writer cannot be created,
 * or the code from the first failing write.
 */
static int write_header(avro_file_writer_t w)
{
        int rval;
        uint8_t version = 1;
        avro_writer_t schema_writer;
        const avro_encoding_t *enc = &avro_binary_encoding;
        int64_t schema_len;

        /* Generate random sync */
        generate_sync(w);

        check(rval, avro_write(w->writer, "Obj", 3));
        check(rval, avro_write(w->writer, &version, 1));

        /* Metadata map: one block of two entries. */
        check(rval, enc->write_long(w->writer, 2));
        check(rval, enc->write_string(w->writer, "avro.codec"));
        check(rval, enc->write_bytes(w->writer, w->codec->name, strlen(w->codec->name)));
        check(rval, enc->write_string(w->writer, "avro.schema"));

        /* TODO: remove this static buffer */
        schema_writer =
            avro_writer_memory(&w->schema_buf[0], sizeof(w->schema_buf));
        if (!schema_writer) {
                avro_set_error("Cannot create schema writer");
                return ENOMEM;
        }
        rval = avro_schema_to_json(w->writers_schema, schema_writer);
        if (rval) {
                avro_writer_free(schema_writer);
                return rval;
        }
        schema_len = avro_writer_tell(schema_writer);
        avro_writer_free(schema_writer);
        check(rval,
              enc->write_bytes(w->writer, w->schema_buf, schema_len));

        /* A zero block count terminates the metadata map. */
        check(rval, enc->write_long(w->writer, 0));
        return write_sync(w);
}
 
109
 
 
110
static int
 
111
file_writer_init_fp(FILE *fp, const char *path, int should_close, const char *mode, avro_file_writer_t w)
 
112
{
 
113
        if (!fp) {
 
114
                fp = fopen(path, mode);
 
115
        }
 
116
 
 
117
        if (!fp) {
 
118
                avro_set_error("Cannot open file for %s", path);
 
119
                return ENOMEM;
 
120
        }
 
121
        w->writer = avro_writer_file_fp(fp, should_close);
 
122
        if (!w->writer) {
 
123
                if (should_close) {
 
124
                        fclose(fp);
 
125
                }
 
126
                avro_set_error("Cannot create file writer for %s", path);
 
127
                return ENOMEM;
 
128
        }
 
129
        return 0;
 
130
}
 
131
 
 
132
/* fopen() mode for the first attempt at creating a new container file.
 * The "x" flag (standard since C11, also provided by glibc) makes fopen()
 * fail when the file already exists.  Win32 is treated as lacking this
 * flag, so Windows builds fall back to plain, non-exclusive "wb".
 */
#if defined(_WIN32)
#define EXCLUSIVE_WRITE_MODE "wb"
#else
#define EXCLUSIVE_WRITE_MODE "wbx"
#endif
 
141
 
 
142
static int
 
143
file_writer_create(FILE *fp, const char *path, int should_close, avro_schema_t schema, avro_file_writer_t w, size_t block_size)
 
144
{
 
145
        int rval;
 
146
 
 
147
        w->block_count = 0;
 
148
        rval = file_writer_init_fp(fp, path, should_close, EXCLUSIVE_WRITE_MODE, w);
 
149
        if (rval) {
 
150
                check(rval, file_writer_init_fp(fp, path, should_close, "wb", w));
 
151
        }
 
152
 
 
153
        w->datum_buffer_size = block_size;
 
154
        w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
 
155
 
 
156
        if(!w->datum_buffer) {
 
157
                avro_set_error("Could not allocate datum buffer\n");
 
158
                avro_writer_free(w->writer);
 
159
                return ENOMEM;
 
160
        }
 
161
 
 
162
        w->datum_writer =
 
163
            avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
 
164
        if (!w->datum_writer) {
 
165
                avro_set_error("Cannot create datum writer for file %s", path);
 
166
                avro_writer_free(w->writer);
 
167
                avro_free(w->datum_buffer, w->datum_buffer_size);
 
168
                return ENOMEM;
 
169
        }
 
170
 
 
171
        w->writers_schema = avro_schema_incref(schema);
 
172
        return write_header(w);
 
173
}
 
174
 
 
175
int
 
176
avro_file_writer_create(const char *path, avro_schema_t schema,
 
177
                        avro_file_writer_t * writer)
 
178
{
 
179
        return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, "null", 0);
 
180
}
 
181
 
 
182
int
 
183
avro_file_writer_create_fp(FILE *fp, const char *path, int should_close, avro_schema_t schema,
 
184
                        avro_file_writer_t * writer)
 
185
{
 
186
        return avro_file_writer_create_with_codec_fp(fp, path, should_close, schema, writer, "null", 0);
 
187
}
 
188
 
 
189
int avro_file_writer_create_with_codec(const char *path,
 
190
                        avro_schema_t schema, avro_file_writer_t * writer,
 
191
                        const char *codec, size_t block_size)
 
192
{
 
193
        return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, codec, block_size);
 
194
}
 
195
 
 
196
int avro_file_writer_create_with_codec_fp(FILE *fp, const char *path, int should_close,
 
197
                        avro_schema_t schema, avro_file_writer_t * writer,
 
198
                        const char *codec, size_t block_size)
 
199
{
 
200
        avro_file_writer_t w;
 
201
        int rval;
 
202
        check_param(EINVAL, path, "path");
 
203
        check_param(EINVAL, is_avro_schema(schema), "schema");
 
204
        check_param(EINVAL, writer, "writer");
 
205
        check_param(EINVAL, codec, "codec");
 
206
 
 
207
        if (block_size == 0) {
 
208
                block_size = DEFAULT_BLOCK_SIZE;
 
209
        }
 
210
 
 
211
        w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
 
212
        if (!w) {
 
213
                avro_set_error("Cannot allocate new file writer");
 
214
                return ENOMEM;
 
215
        }
 
216
        w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
 
217
        if (!w->codec) {
 
218
                avro_set_error("Cannot allocate new codec");
 
219
                avro_freet(struct avro_file_writer_t_, w);
 
220
                return ENOMEM;
 
221
        }
 
222
        rval = avro_codec(w->codec, codec);
 
223
        if (rval) {
 
224
                avro_codec_reset(w->codec);
 
225
                avro_freet(struct avro_codec_t_, w->codec);
 
226
                avro_freet(struct avro_file_writer_t_, w);
 
227
                return rval;
 
228
        }
 
229
        rval = file_writer_create(fp, path, should_close, schema, w, block_size);
 
230
        if (rval) {
 
231
                avro_codec_reset(w->codec);
 
232
                avro_freet(struct avro_codec_t_, w->codec);
 
233
                avro_freet(struct avro_file_writer_t_, w);
 
234
                return rval;
 
235
        }
 
236
        *writer = w;
 
237
 
 
238
        return 0;
 
239
}
 
240
 
 
241
static int file_read_header(avro_reader_t reader,
 
242
                            avro_schema_t * writers_schema, avro_codec_t codec,
 
243
                            char *sync, int synclen)
 
244
{
 
245
        int rval;
 
246
        avro_schema_t meta_schema;
 
247
        avro_schema_t meta_values_schema;
 
248
        avro_value_iface_t *meta_iface;
 
249
        avro_value_t meta;
 
250
        char magic[4];
 
251
        avro_value_t codec_val;
 
252
        avro_value_t schema_bytes;
 
253
        const void *p;
 
254
        size_t len;
 
255
 
 
256
        check(rval, avro_read(reader, magic, sizeof(magic)));
 
257
        if (magic[0] != 'O' || magic[1] != 'b' || magic[2] != 'j'
 
258
            || magic[3] != 1) {
 
259
                avro_set_error("Incorrect Avro container file magic number");
 
260
                return EILSEQ;
 
261
        }
 
262
 
 
263
        meta_values_schema = avro_schema_bytes();
 
264
        meta_schema = avro_schema_map(meta_values_schema);
 
265
        meta_iface = avro_generic_class_from_schema(meta_schema);
 
266
        if (meta_iface == NULL) {
 
267
                return EILSEQ;
 
268
        }
 
269
        check(rval, avro_generic_value_new(meta_iface, &meta));
 
270
        rval = avro_value_read(reader, &meta);
 
271
        if (rval) {
 
272
                avro_prefix_error("Cannot read file header: ");
 
273
                return EILSEQ;
 
274
        }
 
275
        avro_schema_decref(meta_schema);
 
276
 
 
277
        rval = avro_value_get_by_name(&meta, "avro.codec", &codec_val, NULL);
 
278
        if (rval) {
 
279
                if (avro_codec(codec, NULL) != 0) {
 
280
                        avro_set_error("Codec not specified in header and unable to set 'null' codec");
 
281
                        avro_value_decref(&meta);
 
282
                        return EILSEQ;
 
283
                }
 
284
        } else {
 
285
                const void *buf;
 
286
                size_t size;
 
287
                char codec_name[11];
 
288
 
 
289
                avro_type_t type = avro_value_get_type(&codec_val);
 
290
 
 
291
                if (type != AVRO_BYTES) {
 
292
                        avro_set_error("Value type of codec is unexpected");
 
293
                        avro_value_decref(&meta);
 
294
                        return EILSEQ;
 
295
                }
 
296
 
 
297
                avro_value_get_bytes(&codec_val, &buf, &size);
 
298
                memset(codec_name, 0, sizeof(codec_name));
 
299
                strncpy(codec_name, (const char *) buf, size < 10 ? size : 10);
 
300
 
 
301
                if (avro_codec(codec, codec_name) != 0) {
 
302
                        avro_set_error("File header contains an unknown codec");
 
303
                        avro_value_decref(&meta);
 
304
                        return EILSEQ;
 
305
                }
 
306
        }
 
307
 
 
308
        rval = avro_value_get_by_name(&meta, "avro.schema", &schema_bytes, NULL);
 
309
        if (rval) {
 
310
                avro_set_error("File header doesn't contain a schema");
 
311
                avro_value_decref(&meta);
 
312
                return EILSEQ;
 
313
        }
 
314
 
 
315
        avro_value_get_bytes(&schema_bytes, &p, &len);
 
316
        rval = avro_schema_from_json_length((const char *) p, len, writers_schema);
 
317
        if (rval) {
 
318
                avro_prefix_error("Cannot parse file header: ");
 
319
                avro_value_decref(&meta);
 
320
                return rval;
 
321
        }
 
322
 
 
323
        avro_value_decref(&meta);
 
324
        avro_value_iface_decref(meta_iface);
 
325
        return avro_read(reader, sync, synclen);
 
326
}
 
327
 
 
328
static int
 
329
file_writer_open(const char *path, avro_file_writer_t w, size_t block_size)
 
330
{
 
331
        int rval;
 
332
        FILE *fp;
 
333
        avro_reader_t reader;
 
334
 
 
335
        /* Open for read AND write */
 
336
        fp = fopen(path, "r+b");
 
337
        if (!fp) {
 
338
                avro_set_error("Error opening file: %s",
 
339
                               strerror(errno));
 
340
                return errno;
 
341
        }
 
342
 
 
343
        /* Don`t close the underlying file descriptor, logrotate can
 
344
         * vanish it from sight. */
 
345
        reader = avro_reader_file_fp(fp, 0);
 
346
        if (!reader) {
 
347
                fclose(fp);
 
348
                avro_set_error("Cannot create file reader for %s", path);
 
349
                return ENOMEM;
 
350
        }
 
351
        rval =
 
352
            file_read_header(reader, &w->writers_schema, w->codec, w->sync,
 
353
                             sizeof(w->sync));
 
354
 
 
355
        avro_reader_free(reader);
 
356
        if (rval) {
 
357
                fclose(fp);
 
358
                return rval;
 
359
        }
 
360
 
 
361
        w->block_count = 0;
 
362
 
 
363
        /* Position to end of file and get ready to write */
 
364
        fseek(fp, 0, SEEK_END);
 
365
 
 
366
        w->writer = avro_writer_file(fp);
 
367
        if (!w->writer) {
 
368
                fclose(fp);
 
369
                avro_set_error("Cannot create file writer for %s", path);
 
370
                return ENOMEM;
 
371
        }
 
372
 
 
373
        if (block_size == 0) {
 
374
                block_size = DEFAULT_BLOCK_SIZE;
 
375
        }
 
376
 
 
377
        w->datum_buffer_size = block_size;
 
378
        w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
 
379
 
 
380
        if(!w->datum_buffer) {
 
381
                avro_set_error("Could not allocate datum buffer\n");
 
382
                avro_writer_free(w->writer);
 
383
                return ENOMEM;
 
384
        }
 
385
 
 
386
        w->datum_writer =
 
387
            avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
 
388
        if (!w->datum_writer) {
 
389
                avro_set_error("Cannot create datum writer for file %s", path);
 
390
                avro_writer_free(w->writer);
 
391
                avro_free(w->datum_buffer, w->datum_buffer_size);
 
392
                return ENOMEM;
 
393
        }
 
394
 
 
395
        return 0;
 
396
}
 
397
 
 
398
int
 
399
avro_file_writer_open_bs(const char *path, avro_file_writer_t * writer,
 
400
                         size_t block_size)
 
401
{
 
402
        avro_file_writer_t w;
 
403
        int rval;
 
404
        check_param(EINVAL, path, "path");
 
405
        check_param(EINVAL, writer, "writer");
 
406
 
 
407
        w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
 
408
        if (!w) {
 
409
                avro_set_error("Cannot create new file writer for %s", path);
 
410
                return ENOMEM;
 
411
        }
 
412
        w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
 
413
        if (!w->codec) {
 
414
                avro_set_error("Cannot allocate new codec");
 
415
                avro_freet(struct avro_file_writer_t_, w);
 
416
                return ENOMEM;
 
417
        }
 
418
        avro_codec(w->codec, NULL);
 
419
        rval = file_writer_open(path, w, block_size);
 
420
        if (rval) {
 
421
                avro_codec_reset(w->codec);
 
422
                avro_freet(struct avro_codec_t_, w->codec);
 
423
                avro_freet(struct avro_file_writer_t_, w);
 
424
                return rval;
 
425
        }
 
426
 
 
427
        *writer = w;
 
428
        return 0;
 
429
}
 
430
 
 
431
int
 
432
avro_file_writer_open(const char *path, avro_file_writer_t * writer)
 
433
{
 
434
        return avro_file_writer_open_bs(path, writer, 0);
 
435
}
 
436
 
 
437
/* Load the next data block of r.
 *
 * Steps, in order:
 *   - read the block's record count into r->blocks_total, then its byte length;
 *   - read that many bytes into r->current_blockdata (grown as needed, never
 *     shrunk);
 *   - decode them with r->codec;
 *   - point r->block_reader at the decoded data and reset r->blocks_read to 0.
 *
 * Returns 0 on success.  Otherwise returns:
 *   - EILSEQ for a negative count or length, or a length that does not fit
 *     in size_t;
 *   - ENOMEM when the block buffer cannot be grown (the old buffer is kept);
 *   - or the code from the failing read/decode.
 */
static int file_read_block_count(avro_file_reader_t r)
{
        int rval;
        int64_t len;
        const avro_encoding_t *enc = &avro_binary_encoding;
        check_prefix(rval, enc->read_long(r->reader, &r->blocks_total),
                     "Cannot read file block count: ");
        check_prefix(rval, enc->read_long(r->reader, &len),
                     "Cannot read file block size: ");

        /* Both values come straight from the file.  Reject ones that would
         * reach the allocator or the reader as nonsensical sizes. */
        if (r->blocks_total < 0 || len < 0 ||
            (uint64_t) (size_t) len != (uint64_t) len) {
                avro_set_error("Invalid file block count or size");
                return EILSEQ;
        }

        if (r->current_blockdata == NULL || len > r->current_blocklen) {
                char *buf;
                if (r->current_blockdata == NULL) {
                        buf = (char *) avro_malloc((size_t) len);
                } else {
                        buf = (char *) avro_realloc(r->current_blockdata,
                                                    (size_t) r->current_blocklen,
                                                    (size_t) len);
                }
                /* A zero-byte allocation may legitimately return NULL. */
                if (buf == NULL && len > 0) {
                        avro_set_error("Cannot allocate file block buffer");
                        return ENOMEM;
                }
                r->current_blockdata = buf;
                r->current_blocklen = len;
        }

        check_prefix(rval, avro_read(r->reader, r->current_blockdata, len),
                     "Cannot read file block: ");

        check_prefix(rval, avro_codec_decode(r->codec, r->current_blockdata, len),
                     "Cannot decode file block: ");

        avro_reader_memory_set_source(r->block_reader, (const char *) r->codec->block_data, r->codec->used_size);

        r->blocks_read = 0;
        return 0;
}
 
466
 
 
467
int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
 
468
                        avro_file_reader_t * reader)
 
469
{
 
470
        int rval;
 
471
        avro_file_reader_t r = (avro_file_reader_t) avro_new(struct avro_file_reader_t_);
 
472
        if (!r) {
 
473
                if (should_close) {
 
474
                        fclose(fp);
 
475
                }
 
476
                avro_set_error("Cannot allocate file reader for %s", path);
 
477
                return ENOMEM;
 
478
        }
 
479
 
 
480
        r->reader = avro_reader_file_fp(fp, should_close);
 
481
        if (!r->reader) {
 
482
                if (should_close) {
 
483
                        fclose(fp);
 
484
                }
 
485
                avro_set_error("Cannot allocate reader for file %s", path);
 
486
                avro_freet(struct avro_file_reader_t_, r);
 
487
                return ENOMEM;
 
488
        }
 
489
        r->block_reader = avro_reader_memory(0, 0);
 
490
        if (!r->block_reader) {
 
491
                avro_set_error("Cannot allocate block reader for file %s", path);
 
492
                avro_reader_free(r->reader);
 
493
                avro_freet(struct avro_file_reader_t_, r);
 
494
                return ENOMEM;
 
495
        }
 
496
 
 
497
        r->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
 
498
        if (!r->codec) {
 
499
                avro_set_error("Could not allocate codec for file %s", path);
 
500
                avro_reader_free(r->reader);
 
501
                avro_freet(struct avro_file_reader_t_, r);
 
502
                return ENOMEM;
 
503
        }
 
504
        avro_codec(r->codec, NULL);
 
505
 
 
506
        rval = file_read_header(r->reader, &r->writers_schema, r->codec,
 
507
                                r->sync, sizeof(r->sync));
 
508
        if (rval) {
 
509
                avro_reader_free(r->reader);
 
510
                avro_codec_reset(r->codec);
 
511
                avro_freet(struct avro_codec_t_, r->codec);
 
512
                avro_freet(struct avro_file_reader_t_, r);
 
513
                return rval;
 
514
        }
 
515
 
 
516
        r->current_blockdata = NULL;
 
517
        r->current_blocklen = 0;
 
518
 
 
519
        rval = file_read_block_count(r);
 
520
        if (rval) {
 
521
                avro_reader_free(r->reader);
 
522
                avro_codec_reset(r->codec);
 
523
                avro_freet(struct avro_codec_t_, r->codec);
 
524
                avro_freet(struct avro_file_reader_t_, r);
 
525
                return rval;
 
526
        }
 
527
 
 
528
        *reader = r;
 
529
        return rval;
 
530
}
 
531
 
 
532
/* Open the container file at path for reading and store the new reader in
 * *reader.  The stream opened here is handed over with should_close = 1.
 *
 * Returns 0 on success.  Otherwise returns:
 *   - EINVAL for a NULL argument;
 *   - the fopen errno (EIO when errno was unset), with an error message set;
 *   - or the error from avro_file_reader_fp.
 */
int avro_file_reader(const char *path, avro_file_reader_t * reader)
{
        FILE *fp;

        check_param(EINVAL, path, "path");
        check_param(EINVAL, reader, "reader");

        errno = 0;
        fp = fopen(path, "rb");
        if (!fp) {
                /* Save errno before avro_set_error can overwrite it. */
                int err = errno ? errno : EIO;
                avro_set_error("Error opening file: %s", strerror(err));
                return err;
        }

        return avro_file_reader_fp(fp, path, 1, reader);
}
 
543
 
 
544
avro_schema_t
 
545
avro_file_reader_get_writer_schema(avro_file_reader_t r)
 
546
{
 
547
        check_param(NULL, r, "reader");
 
548
        return avro_schema_incref(r->writers_schema);
 
549
}
 
550
 
 
551
/* Write the records buffered in w->datum_buffer as one container block.
 *
 * The block is written as: record count, encoded length, codec-encoded
 * payload, sync marker.  Afterwards the datum writer is rewound and the
 * block counters are cleared.  Does nothing when no records are pending.
 *
 * Returns 0 on success.  Otherwise returns the first failing call's code,
 * with a prefixed error message.  The buffered records are then left in
 * place, although part of the block may already have been written.
 */
static int file_write_block(avro_file_writer_t w)
{
        const avro_encoding_t *enc = &avro_binary_encoding;
        int rval;

        if (w->block_count == 0) {
                return 0;       /* nothing buffered */
        }

        check_prefix(rval, enc->write_long(w->writer, w->block_count),
                     "Cannot write file block count: ");
        check_prefix(rval, avro_codec_encode(w->codec, w->datum_buffer, w->block_size),
                     "Cannot encode file block: ");
        check_prefix(rval, enc->write_long(w->writer, w->codec->used_size),
                     "Cannot write file block size: ");
        check_prefix(rval, avro_write(w->writer, w->codec->block_data, w->codec->used_size),
                     "Cannot write file block: ");
        check_prefix(rval, write_sync(w),
                     "Cannot write sync marker: ");

        /* Start an empty block. */
        avro_writer_reset(w->datum_writer);
        w->block_count = 0;
        w->block_size = 0;
        return 0;
}
 
579
 
 
580
/* Append one datum, encoded with the writer schema, to the current block.
 *
 * If the datum does not fit in the remaining buffer space, the pending
 * block is written out first and the datum is encoded again into the now
 * empty buffer.  If it still does not fit, the encoder's error is returned
 * and "Datum too large for file block size" is set.
 *
 * A failed encode can leave part of the datum in the buffer.  That residue
 * is discarded before retrying and after the final failure.  Otherwise it
 * would be counted into the next block and corrupt the file.  Discarding
 * is safe at those points because no complete record is pending: either
 * block_count was already 0, or file_write_block just flushed the block.
 */
int avro_file_writer_append(avro_file_writer_t w, avro_datum_t datum)
{
        int rval;
        check_param(EINVAL, w, "writer");
        check_param(EINVAL, datum, "datum");

        rval = avro_write_data(w->datum_writer, w->writers_schema, datum);
        if (rval) {
                check(rval, file_write_block(w));
                avro_writer_reset(w->datum_writer);
                rval =
                    avro_write_data(w->datum_writer, w->writers_schema, datum);
                if (rval) {
                        avro_set_error("Datum too large for file block size");
                        avro_writer_reset(w->datum_writer);
                        /* TODO: if the datum encoder larger than our buffer,
                           just write a single large datum */
                        return rval;
                }
        }
        w->block_count++;
        w->block_size = avro_writer_tell(w->datum_writer);
        return 0;
}
 
602
 
 
603
int
 
604
avro_file_writer_append_value(avro_file_writer_t w, avro_value_t *value)
 
605
{
 
606
        int rval;
 
607
        check_param(EINVAL, w, "writer");
 
608
        check_param(EINVAL, value, "value");
 
609
 
 
610
        rval = avro_value_write(w->datum_writer, value);
 
611
        if (rval) {
 
612
                check(rval, file_write_block(w));
 
613
                rval = avro_value_write(w->datum_writer, value);
 
614
                if (rval) {
 
615
                        avro_set_error("Value too large for file block size");
 
616
                        /* TODO: if the value encoder larger than our buffer,
 
617
                           just write a single large datum */
 
618
                        return rval;
 
619
                }
 
620
        }
 
621
        w->block_count++;
 
622
        w->block_size = avro_writer_tell(w->datum_writer);
 
623
        return 0;
 
624
}
 
625
 
 
626
int
 
627
avro_file_writer_append_encoded(avro_file_writer_t w,
 
628
                                const void *buf, int64_t len)
 
629
{
 
630
        int rval;
 
631
        check_param(EINVAL, w, "writer");
 
632
 
 
633
        rval = avro_write(w->datum_writer, (void *) buf, len);
 
634
        if (rval) {
 
635
                check(rval, file_write_block(w));
 
636
                rval = avro_write(w->datum_writer, (void *) buf, len);
 
637
                if (rval) {
 
638
                        avro_set_error("Value too large for file block size");
 
639
                        /* TODO: if the value encoder larger than our buffer,
 
640
                           just write a single large datum */
 
641
                        return rval;
 
642
                }
 
643
        }
 
644
        w->block_count++;
 
645
        w->block_size = avro_writer_tell(w->datum_writer);
 
646
        return 0;
 
647
}
 
648
 
 
649
/* Write any buffered records out as a complete block (ending in the sync
 * marker).  Unlike avro_file_writer_flush, this does not flush the
 * underlying output stream. */
int avro_file_writer_sync(avro_file_writer_t w)
{
        int rval = file_write_block(w);

        return rval;
}
 
653
 
 
654
/* Write any buffered records as a block, then flush the output stream.
 * Returns the block-writing error, if any; the stream flush itself reports
 * nothing. */
int avro_file_writer_flush(avro_file_writer_t w)
{
        int rval = file_write_block(w);

        if (rval != 0) {
                return rval;
        }
        avro_writer_flush(w->writer);
        return 0;
}
 
661
 
 
662
/* Flush pending records, then release everything the writer holds,
 * including w itself.  If the flush fails, its error is returned and
 * nothing is freed. */
int avro_file_writer_close(avro_file_writer_t w)
{
        int rval = avro_file_writer_flush(w);

        if (rval != 0) {
                return rval;
        }

        avro_schema_decref(w->writers_schema);

        /* The memory writer is released before the buffer it points into. */
        avro_writer_free(w->datum_writer);
        avro_writer_free(w->writer);
        avro_free(w->datum_buffer, w->datum_buffer_size);

        avro_codec_reset(w->codec);
        avro_freet(struct avro_codec_t_, w->codec);

        avro_freet(struct avro_file_writer_t_, w);
        return 0;
}
 
675
 
 
676
int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
 
677
                          avro_datum_t * datum)
 
678
{
 
679
        int rval;
 
680
        char sync[16];
 
681
 
 
682
        check_param(EINVAL, r, "reader");
 
683
        check_param(EINVAL, datum, "datum");
 
684
 
 
685
        check(rval,
 
686
              avro_read_data(r->block_reader, r->writers_schema, readers_schema,
 
687
                             datum));
 
688
        r->blocks_read++;
 
689
 
 
690
        if (r->blocks_read == r->blocks_total) {
 
691
                check(rval, avro_read(r->reader, sync, sizeof(sync)));
 
692
                if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
 
693
                        /* wrong sync bytes */
 
694
                        avro_set_error("Incorrect sync bytes");
 
695
                        return EILSEQ;
 
696
                }
 
697
                /* For now, ignore errors (e.g. EOF) */
 
698
                file_read_block_count(r);
 
699
        }
 
700
        return 0;
 
701
}
 
702
 
 
703
int
 
704
avro_file_reader_read_value(avro_file_reader_t r, avro_value_t *value)
 
705
{
 
706
        int rval;
 
707
        char sync[16];
 
708
 
 
709
        check_param(EINVAL, r, "reader");
 
710
        check_param(EINVAL, value, "value");
 
711
 
 
712
        if (r->blocks_read == r->blocks_total) {
 
713
                check(rval, avro_read(r->reader, sync, sizeof(sync)));
 
714
                if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
 
715
                        /* wrong sync bytes */
 
716
                        avro_set_error("Incorrect sync bytes");
 
717
                        return EILSEQ;
 
718
                }
 
719
 
 
720
                /* Did we just hit the end of the file? */
 
721
                if (avro_reader_is_eof(r->reader))
 
722
                        return EOF;
 
723
 
 
724
                check(rval, file_read_block_count(r));
 
725
        }
 
726
 
 
727
        check(rval, avro_value_read(r->block_reader, value));
 
728
        r->blocks_read++;
 
729
 
 
730
        return 0;
 
731
}
 
732
 
 
733
/* Release everything owned by reader, including the reader itself.
 * Always returns 0. */
int avro_file_reader_close(avro_file_reader_t reader)
{
        avro_codec_t codec = reader->codec;

        avro_schema_decref(reader->writers_schema);
        avro_reader_free(reader->reader);
        avro_reader_free(reader->block_reader);

        avro_codec_reset(codec);
        avro_freet(struct avro_codec_t_, codec);

        if (reader->current_blockdata != NULL) {
                avro_free(reader->current_blockdata, reader->current_blocklen);
        }

        avro_freet(struct avro_file_reader_t_, reader);
        return 0;
}