2
* Licensed to the Apache Software Foundation (ASF) under one or more
3
* contributor license agreements. See the NOTICE file distributed with
4
* this work for additional information regarding copyright ownership.
5
* The ASF licenses this file to you under the Apache License, Version 2.0
6
* (the "License"); you may not use this file except in compliance with
7
* the License. You may obtain a copy of the License at
9
* http://www.apache.org/licenses/LICENSE-2.0
11
* Unless required by applicable law or agreed to in writing, software
12
* distributed under the License is distributed on an "AS IS" BASIS,
13
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14
* implied. See the License for the specific language governing
15
* permissions and limitations under the License.
18
#include "avro_private.h"
19
#include "avro/allocation.h"
20
#include "avro/generic.h"
21
#include "avro/errors.h"
22
#include "avro/value.h"
32
/* Reader-side state for an Avro object container file (fields as visible
 * in this listing). */
struct avro_file_reader_t_ {
33
/* Writer's schema, filled in by file_read_header() when the file is opened. */
avro_schema_t writers_schema;
35
/* Memory reader that file_read_block_count() points at the decoded block. */
avro_reader_t block_reader;
40
/* Allocated size, in bytes, of current_blockdata. */
int64_t current_blocklen;
41
/* Buffer that receives the raw bytes of each block before decoding. */
char * current_blockdata;
44
/* Writer-side state for an Avro object container file (fields as visible
 * in this listing). */
struct avro_file_writer_t_ {
45
/* Schema of the data being written; write_header() serializes it as JSON. */
avro_schema_t writers_schema;
51
/* Memory writer over datum_buffer; appended data accumulates here until
 * file_write_block() emits it. */
avro_writer_t datum_writer;
53
/* Size, in bytes, of the datum_buffer allocation. */
size_t datum_buffer_size;
54
/* Scratch space that write_header() wraps in a memory writer to hold the
 * JSON form of the schema (capacity 64 KiB). */
char schema_buf[64 * 1024];
57
/* Size, in bytes, of the datum buffer used when the caller passes a block
 * size of 0.  The replacement list is parenthesized so the macro expands as
 * a single value inside larger expressions (e.g. `x / DEFAULT_BLOCK_SIZE`). */
#define DEFAULT_BLOCK_SIZE (16 * 1024)
59
/* Note: We should not just read /dev/random here, because it may not
60
 * exist on all platforms, e.g. Win32. */
62
/* Fill the writer's sync marker (w->sync) with pseudo-random bytes taken
 * from rand(). */
static void generate_sync(avro_file_writer_t w)
66
for (i = 0; i < sizeof(w->sync); i++) {
67
/* rand() / (RAND_MAX + 1.0) lies in [0, 1).
 * NOTE(review): scaling by 255 yields values in 0..254 only, so the byte
 * value 255 is never produced; 256 may have been intended -- confirm. */
w->sync[i] = ((double)rand() / (RAND_MAX + 1.0)) * 255;
71
/* Write the writer's sync marker (all of w->sync) to the output writer.
 * Returns whatever avro_write() returns. */
static int write_sync(avro_file_writer_t w)
73
return avro_write(w->writer, w->sync, sizeof(w->sync));
76
/* Write the container-file header through w->writer: the "Obj" magic and a
 * version byte, a two-entry metadata map ("avro.codec" and "avro.schema"),
 * and finally the sync marker.  On the visible success path the result of
 * write_sync() is returned. */
static int write_header(avro_file_writer_t w)
80
/* TODO: remove this static buffer */
81
avro_writer_t schema_writer;
82
const avro_encoding_t *enc = &avro_binary_encoding;
85
/* Generate random sync */
88
/* Magic bytes followed by a one-byte format version. */
check(rval, avro_write(w->writer, "Obj", 3));
89
check(rval, avro_write(w->writer, &version, 1));
91
/* Metadata map: an entry count of 2, then the key/value pairs. */
check(rval, enc->write_long(w->writer, 2));
92
check(rval, enc->write_string(w->writer, "avro.codec"));
93
check(rval, enc->write_bytes(w->writer, w->codec->name, strlen(w->codec->name)));
94
check(rval, enc->write_string(w->writer, "avro.schema"));
96
/* The schema is first rendered as JSON into w->schema_buf through a
 * temporary memory writer, so its length is known before it is written. */
avro_writer_memory(&w->schema_buf[0], sizeof(w->schema_buf));
97
rval = avro_schema_to_json(w->writers_schema, schema_writer);
99
/* Presumably the failure branch of the call above (condition not visible). */
avro_writer_free(schema_writer);
102
schema_len = avro_writer_tell(schema_writer);
103
avro_writer_free(schema_writer);
105
enc->write_bytes(w->writer, w->schema_buf, schema_len));
106
/* A zero long follows the two entries (Avro maps end with a zero count). */
check(rval, enc->write_long(w->writer, 0));
107
return write_sync(w);
111
/* Prepare w->writer for output to `path`.
 *
 * Visible steps: open `path` with fopen() using `mode`, then wrap the
 * stream in a file writer via avro_writer_file_fp(), passing `should_close`
 * through.  The conditions guarding each step are not visible here;
 * presumably fopen() is only used when the caller supplied no `fp` --
 * confirm. */
file_writer_init_fp(FILE *fp, const char *path, int should_close, const char *mode, avro_file_writer_t w)
114
fp = fopen(path, mode);
118
avro_set_error("Cannot open file for %s", path);
121
w->writer = avro_writer_file_fp(fp, should_close);
126
avro_set_error("Cannot create file writer for %s", path);
132
/* Exclusive file writing is supported by GCC using the mode
133
 * "wx". Win32 does not support exclusive file writing, so on Win32 we
134
 * fall back to non-exclusive file writing. */
137
/* fopen() mode used for the first attempt at creating a new file: "wb" is
 * the plain (non-exclusive) form, "wbx" additionally requests exclusive
 * creation.
 * NOTE(review): both definitions appear back to back here; presumably a
 * platform conditional (not visible) selects exactly one -- confirm. */
#define EXCLUSIVE_WRITE_MODE "wb"
139
#define EXCLUSIVE_WRITE_MODE "wbx"
143
/* Set up a freshly allocated writer `w` for a new file: open the output,
 * allocate the datum buffer of `block_size` bytes and a memory writer over
 * it, take a reference on `schema`, and write the file header.  On the
 * visible success path the result of write_header() is returned. */
file_writer_create(FILE *fp, const char *path, int should_close, avro_schema_t schema, avro_file_writer_t w, size_t block_size)
148
/* First attempt uses the exclusive-create mode. */
rval = file_writer_init_fp(fp, path, should_close, EXCLUSIVE_WRITE_MODE, w);
150
/* Second attempt in plain "wb" mode -- presumably only reached when the
 * attempt above failed (the guarding condition is not visible). */
check(rval, file_writer_init_fp(fp, path, should_close, "wb", w));
153
w->datum_buffer_size = block_size;
154
w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
156
if(!w->datum_buffer) {
157
/* NOTE(review): unlike the other messages in this file, this one ends
 * with a newline. */
avro_set_error("Could not allocate datum buffer\n");
158
avro_writer_free(w->writer);
163
/* Memory writer that accumulates encoded data inside datum_buffer. */
avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
164
if (!w->datum_writer) {
165
avro_set_error("Cannot create datum writer for file %s", path);
166
/* Undo the file writer and the buffer allocated above. */
avro_writer_free(w->writer);
167
avro_free(w->datum_buffer, w->datum_buffer_size);
171
w->writers_schema = avro_schema_incref(schema);
172
return write_header(w);
176
/* Create a new container file at `path` for `schema`.  Convenience wrapper
 * around avro_file_writer_create_with_codec_fp() with no caller-supplied
 * FILE, should_close = 1, the "null" codec, and block size 0 (which that
 * function replaces with DEFAULT_BLOCK_SIZE). */
avro_file_writer_create(const char *path, avro_schema_t schema,
177
avro_file_writer_t * writer)
179
return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, "null", 0);
183
/* Same as avro_file_writer_create(), but forwards a caller-supplied `fp`
 * and `should_close` flag.  Uses the "null" codec and block size 0 (which
 * avro_file_writer_create_with_codec_fp() replaces with
 * DEFAULT_BLOCK_SIZE). */
avro_file_writer_create_fp(FILE *fp, const char *path, int should_close, avro_schema_t schema,
184
avro_file_writer_t * writer)
186
return avro_file_writer_create_with_codec_fp(fp, path, should_close, schema, writer, "null", 0);
189
/* Create a new container file at `path` using the named `codec` and the
 * given `block_size`.  Forwards to avro_file_writer_create_with_codec_fp()
 * with no caller-supplied FILE and should_close = 1. */
int avro_file_writer_create_with_codec(const char *path,
190
avro_schema_t schema, avro_file_writer_t * writer,
191
const char *codec, size_t block_size)
193
return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, codec, block_size);
196
/* Allocate a file writer and its codec, select the codec by name, and hand
 * off to file_writer_create() to open the file and write the header.
 *
 * `path`, `schema`, `writer` and `codec` are validated with check_param
 * (EINVAL).  A `block_size` of 0 selects DEFAULT_BLOCK_SIZE.  Each visible
 * failure path releases what was allocated before it. */
int avro_file_writer_create_with_codec_fp(FILE *fp, const char *path, int should_close,
197
avro_schema_t schema, avro_file_writer_t * writer,
198
const char *codec, size_t block_size)
200
avro_file_writer_t w;
202
check_param(EINVAL, path, "path");
203
check_param(EINVAL, is_avro_schema(schema), "schema");
204
check_param(EINVAL, writer, "writer");
205
check_param(EINVAL, codec, "codec");
207
if (block_size == 0) {
208
block_size = DEFAULT_BLOCK_SIZE;
211
w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
213
avro_set_error("Cannot allocate new file writer");
216
w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
218
avro_set_error("Cannot allocate new codec");
219
avro_freet(struct avro_file_writer_t_, w);
222
/* Configure the codec from its name. */
rval = avro_codec(w->codec, codec);
224
/* Cleanup that follows the avro_codec() call -- presumably its failure
 * branch (condition not visible). */
avro_codec_reset(w->codec);
225
avro_freet(struct avro_codec_t_, w->codec);
226
avro_freet(struct avro_file_writer_t_, w);
229
rval = file_writer_create(fp, path, should_close, schema, w, block_size);
231
/* Cleanup that follows file_writer_create() -- presumably its failure
 * branch (condition not visible). */
avro_codec_reset(w->codec);
232
avro_freet(struct avro_codec_t_, w->codec);
233
avro_freet(struct avro_file_writer_t_, w);
241
/* Read and validate the container-file header from `reader`.
 *
 * Visible steps: check the "Obj" magic, read the metadata map into a
 * generic value, configure `codec` from the "avro.codec" entry (or call
 * avro_codec(codec, NULL) on the other branch), parse the "avro.schema"
 * entry into *writers_schema, and finally read `synclen` bytes of sync
 * marker into `sync`.  On the visible success path the result of that last
 * avro_read() is returned. */
static int file_read_header(avro_reader_t reader,
242
avro_schema_t * writers_schema, avro_codec_t codec,
243
char *sync, int synclen)
246
avro_schema_t meta_schema;
247
avro_schema_t meta_values_schema;
248
avro_value_iface_t *meta_iface;
251
avro_value_t codec_val;
252
avro_value_t schema_bytes;
256
check(rval, avro_read(reader, magic, sizeof(magic)));
257
if (magic[0] != 'O' || magic[1] != 'b' || magic[2] != 'j'
259
avro_set_error("Incorrect Avro container file magic number");
263
/* The metadata is read as a map whose values are bytes. */
meta_values_schema = avro_schema_bytes();
264
meta_schema = avro_schema_map(meta_values_schema);
265
meta_iface = avro_generic_class_from_schema(meta_schema);
266
if (meta_iface == NULL) {
269
check(rval, avro_generic_value_new(meta_iface, &meta));
270
rval = avro_value_read(reader, &meta);
272
avro_prefix_error("Cannot read file header: ");
275
avro_schema_decref(meta_schema);
277
rval = avro_value_get_by_name(&meta, "avro.codec", &codec_val, NULL);
279
/* Branch for a header without a codec entry (per the error text below):
 * the codec is set up with a NULL name instead. */
if (avro_codec(codec, NULL) != 0) {
280
avro_set_error("Codec not specified in header and unable to set 'null' codec");
281
avro_value_decref(&meta);
289
avro_type_t type = avro_value_get_type(&codec_val);
291
if (type != AVRO_BYTES) {
292
avro_set_error("Value type of codec is unexpected");
293
avro_value_decref(&meta);
297
avro_value_get_bytes(&codec_val, &buf, &size);
298
/* codec_name is zeroed first and at most 10 bytes are copied, so the name
 * stays NUL-terminated provided codec_name holds at least 11 bytes
 * (declaration not visible -- confirm). */
memset(codec_name, 0, sizeof(codec_name));
299
strncpy(codec_name, (const char *) buf, size < 10 ? size : 10);
301
if (avro_codec(codec, codec_name) != 0) {
302
avro_set_error("File header contains an unknown codec");
303
avro_value_decref(&meta);
308
rval = avro_value_get_by_name(&meta, "avro.schema", &schema_bytes, NULL);
310
avro_set_error("File header doesn't contain a schema");
311
avro_value_decref(&meta);
315
/* The schema entry holds JSON text; parse it using its explicit length. */
avro_value_get_bytes(&schema_bytes, &p, &len);
316
rval = avro_schema_from_json_length((const char *) p, len, writers_schema);
318
avro_prefix_error("Cannot parse file header: ");
319
avro_value_decref(&meta);
323
/* Success path: drop the metadata value and its interface, then read the
 * sync marker. */
avro_value_decref(&meta);
324
avro_value_iface_decref(meta_iface);
325
return avro_read(reader, sync, synclen);
329
/* Open an existing container file at `path` for appending.
 *
 * Visible steps: open the file in "r+b" mode, read its header through a
 * temporary reader (filling w->writers_schema, w->codec and w->sync), seek
 * to the end of the file, create the file writer, and allocate the datum
 * buffer and its memory writer.  A `block_size` of 0 selects
 * DEFAULT_BLOCK_SIZE. */
file_writer_open(const char *path, avro_file_writer_t w, size_t block_size)
333
avro_reader_t reader;
335
/* Open for read AND write */
336
fp = fopen(path, "r+b");
338
avro_set_error("Error opening file: %s",
343
/* Don't close the underlying file descriptor, logrotate can
344
 * vanish it from sight. */
345
reader = avro_reader_file_fp(fp, 0);
348
avro_set_error("Cannot create file reader for %s", path);
352
file_read_header(reader, &w->writers_schema, w->codec, w->sync,
355
avro_reader_free(reader);
363
/* Position to end of file and get ready to write */
364
/* NOTE(review): the result of fseek() is not checked here. */
fseek(fp, 0, SEEK_END);
366
w->writer = avro_writer_file(fp);
369
avro_set_error("Cannot create file writer for %s", path);
373
if (block_size == 0) {
374
block_size = DEFAULT_BLOCK_SIZE;
377
w->datum_buffer_size = block_size;
378
w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
380
if(!w->datum_buffer) {
381
avro_set_error("Could not allocate datum buffer\n");
382
avro_writer_free(w->writer);
387
/* Memory writer that accumulates encoded data inside datum_buffer. */
avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
388
if (!w->datum_writer) {
389
avro_set_error("Cannot create datum writer for file %s", path);
390
/* Undo the file writer and the buffer allocated above. */
avro_writer_free(w->writer);
391
avro_free(w->datum_buffer, w->datum_buffer_size);
399
/* Open an existing container file for appending with a caller-chosen block
 * size.  Allocates the writer and its codec, initializes the codec with a
 * NULL name, and delegates to file_writer_open().  `path` and `writer` are
 * validated with check_param (EINVAL). */
avro_file_writer_open_bs(const char *path, avro_file_writer_t * writer,
402
avro_file_writer_t w;
404
check_param(EINVAL, path, "path");
405
check_param(EINVAL, writer, "writer");
407
w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
409
avro_set_error("Cannot create new file writer for %s", path);
412
w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
414
avro_set_error("Cannot allocate new codec");
415
avro_freet(struct avro_file_writer_t_, w);
418
/* NOTE(review): the result of this avro_codec() call is not checked. */
avro_codec(w->codec, NULL);
419
rval = file_writer_open(path, w, block_size);
421
/* Cleanup that follows file_writer_open() -- presumably its failure branch
 * (condition not visible). */
avro_codec_reset(w->codec);
422
avro_freet(struct avro_codec_t_, w->codec);
423
avro_freet(struct avro_file_writer_t_, w);
432
/* Open an existing container file for appending.  Forwards to
 * avro_file_writer_open_bs() with a block size of 0. */
avro_file_writer_open(const char *path, avro_file_writer_t * writer)
434
return avro_file_writer_open_bs(path, writer, 0);
437
/* Read the next block from the file: its object count (into
 * r->blocks_total), its byte length, and the block bytes themselves; then
 * decode the bytes with the reader's codec and point r->block_reader at
 * the decoded data. */
static int file_read_block_count(avro_file_reader_t r)
441
const avro_encoding_t *enc = &avro_binary_encoding;
442
check_prefix(rval, enc->read_long(r->reader, &r->blocks_total),
443
"Cannot read file block count: ");
444
check_prefix(rval, enc->read_long(r->reader, &len),
445
"Cannot read file block size: ");
447
/* Grow the block buffer only when this block is larger than the current
 * allocation; allocate it on first use.
 * NOTE(review): `len` comes straight from the file, and no NULL check of
 * the avro_realloc()/avro_malloc() results is visible before the buffer
 * is passed to avro_read() below -- confirm. */
if (r->current_blockdata && len > r->current_blocklen) {
448
r->current_blockdata = (char *) avro_realloc(r->current_blockdata, r->current_blocklen, len);
449
r->current_blocklen = len;
450
} else if (!r->current_blockdata) {
451
r->current_blockdata = (char *) avro_malloc(len);
452
r->current_blocklen = len;
455
check_prefix(rval, avro_read(r->reader, r->current_blockdata, len),
456
"Cannot read file block: ");
458
check_prefix(rval, avro_codec_decode(r->codec, r->current_blockdata, len),
459
"Cannot decode file block: ");
461
/* Records are subsequently read from the codec's decoded output. */
avro_reader_memory_set_source(r->block_reader, (const char *) r->codec->block_data, r->codec->used_size);
467
/* Create a container-file reader on top of `fp`.
 *
 * Visible steps: allocate the reader struct, wrap `fp` in a file reader
 * (passing `should_close` through), create an empty memory reader for
 * block data, allocate and initialize the codec with a NULL name, read the
 * file header, and read the first block.  `path` is used in the visible
 * error messages. */
int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
468
avro_file_reader_t * reader)
471
avro_file_reader_t r = (avro_file_reader_t) avro_new(struct avro_file_reader_t_);
476
avro_set_error("Cannot allocate file reader for %s", path);
480
r->reader = avro_reader_file_fp(fp, should_close);
485
avro_set_error("Cannot allocate reader for file %s", path);
486
avro_freet(struct avro_file_reader_t_, r);
489
/* Starts with no source; file_read_block_count() sets it per block. */
r->block_reader = avro_reader_memory(0, 0);
490
if (!r->block_reader) {
491
avro_set_error("Cannot allocate block reader for file %s", path);
492
avro_reader_free(r->reader);
493
avro_freet(struct avro_file_reader_t_, r);
497
r->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
499
avro_set_error("Could not allocate codec for file %s", path);
500
/* NOTE(review): r->block_reader is not released in the visible lines of
 * this cleanup or the two below -- possible leak, confirm. */
avro_reader_free(r->reader);
501
avro_freet(struct avro_file_reader_t_, r);
504
avro_codec(r->codec, NULL);
506
rval = file_read_header(r->reader, &r->writers_schema, r->codec,
507
r->sync, sizeof(r->sync));
509
/* Cleanup that follows file_read_header() -- presumably its failure branch
 * (condition not visible). */
avro_reader_free(r->reader);
510
avro_codec_reset(r->codec);
511
avro_freet(struct avro_codec_t_, r->codec);
512
avro_freet(struct avro_file_reader_t_, r);
516
/* No block buffer yet; file_read_block_count() allocates it on demand. */
r->current_blockdata = NULL;
517
r->current_blocklen = 0;
519
rval = file_read_block_count(r);
521
/* Cleanup that follows file_read_block_count() -- presumably its failure
 * branch (condition not visible). */
avro_reader_free(r->reader);
522
avro_codec_reset(r->codec);
523
avro_freet(struct avro_codec_t_, r->codec);
524
avro_freet(struct avro_file_reader_t_, r);
532
/* Open the container file at `path` in "rb" mode and build a reader on it
 * via avro_file_reader_fp(), passing should_close = 1. */
int avro_file_reader(const char *path, avro_file_reader_t * reader)
536
fp = fopen(path, "rb");
541
return avro_file_reader_fp(fp, path, 1, reader);
545
/* Return the writer's schema of reader `r`, passed through
 * avro_schema_incref().  check_param is invoked with NULL as its first
 * argument for the `r` check. */
avro_file_reader_get_writer_schema(avro_file_reader_t r)
547
check_param(NULL, r, "reader");
548
return avro_schema_incref(r->writers_schema);
551
/* Emit the buffered data as one file block.  Nothing in the visible body
 * runs unless w->block_count is non-zero.  A block consists of the object
 * count, the codec-encoded length, the encoded bytes, and the sync marker;
 * afterwards the datum writer is reset. */
static int file_write_block(avro_file_writer_t w)
553
const avro_encoding_t *enc = &avro_binary_encoding;
556
if (w->block_count) {
557
/* Write the block count */
558
check_prefix(rval, enc->write_long(w->writer, w->block_count),
559
"Cannot write file block count: ");
560
/* Encode the block */
561
check_prefix(rval, avro_codec_encode(w->codec, w->datum_buffer, w->block_size),
562
"Cannot encode file block: ");
563
/* Write the block length */
564
check_prefix(rval, enc->write_long(w->writer, w->codec->used_size),
565
"Cannot write file block size: ");
566
/* Write the block */
567
check_prefix(rval, avro_write(w->writer, w->codec->block_data, w->codec->used_size),
568
"Cannot write file block: ");
569
/* Write the sync marker */
570
check_prefix(rval, write_sync(w),
571
"Cannot write sync marker: ");
572
/* Reset the datum writer */
573
avro_writer_reset(w->datum_writer);
580
/* Append `datum` to the current block.  The datum is encoded into the
 * datum buffer; the visible retry sequence writes out the pending block
 * with file_write_block() and encodes the datum again, reporting "Datum too
 * large for file block size" if that fails too.  The conditions guarding
 * the retry are not visible -- presumably a failed first attempt.
 * `w` and `datum` are validated with check_param (EINVAL). */
int avro_file_writer_append(avro_file_writer_t w, avro_datum_t datum)
583
check_param(EINVAL, w, "writer");
584
check_param(EINVAL, datum, "datum");
586
rval = avro_write_data(w->datum_writer, w->writers_schema, datum);
588
check(rval, file_write_block(w));
590
avro_write_data(w->datum_writer, w->writers_schema, datum);
592
avro_set_error("Datum too large for file block size");
593
/* TODO: if the encoded datum is larger than our buffer,
594
just write a single large datum */
599
/* Record how many bytes of the datum buffer are now in use. */
w->block_size = avro_writer_tell(w->datum_writer);
604
/* Append `value` to the current block.  The value is encoded into the
 * datum buffer; the visible retry sequence writes out the pending block
 * with file_write_block() and encodes the value again, reporting "Value too
 * large for file block size" if that fails too.  The conditions guarding
 * the retry are not visible -- presumably a failed first attempt.
 * `w` and `value` are validated with check_param (EINVAL). */
avro_file_writer_append_value(avro_file_writer_t w, avro_value_t *value)
607
check_param(EINVAL, w, "writer");
608
check_param(EINVAL, value, "value");
610
rval = avro_value_write(w->datum_writer, value);
612
check(rval, file_write_block(w));
613
rval = avro_value_write(w->datum_writer, value);
615
avro_set_error("Value too large for file block size");
616
/* TODO: if the encoded value is larger than our buffer,
617
just write a single large datum */
622
/* Record how many bytes of the datum buffer are now in use. */
w->block_size = avro_writer_tell(w->datum_writer);
627
/* Append `len` bytes from `buf` to the current block as-is, using
 * avro_write() on the datum writer (no encoding step is visible).  The
 * visible retry sequence writes out the pending block with
 * file_write_block() and copies the bytes again, reporting "Value too large
 * for file block size" if that fails too.  The conditions guarding the
 * retry are not visible -- presumably a failed first attempt.
 * Only `w` is validated in the visible lines. */
avro_file_writer_append_encoded(avro_file_writer_t w,
628
const void *buf, int64_t len)
631
check_param(EINVAL, w, "writer");
633
rval = avro_write(w->datum_writer, (void *) buf, len);
635
check(rval, file_write_block(w));
636
rval = avro_write(w->datum_writer, (void *) buf, len);
638
avro_set_error("Value too large for file block size");
639
/* TODO: if the encoded value is larger than our buffer,
640
just write a single large datum */
645
/* Record how many bytes of the datum buffer are now in use. */
w->block_size = avro_writer_tell(w->datum_writer);
649
/* Write out the pending block (if any) via file_write_block() and return
 * its result. */
int avro_file_writer_sync(avro_file_writer_t w)
651
return file_write_block(w);
654
/* Write out the pending block (if any) via file_write_block(), then flush
 * the underlying file writer. */
int avro_file_writer_flush(avro_file_writer_t w)
657
check(rval, file_write_block(w));
658
/* NOTE(review): the result of avro_writer_flush() is not used here. */
avro_writer_flush(w->writer);
662
/* Flush the writer and release everything it owns: the schema reference,
 * the datum writer, the file writer, the datum buffer, the codec, and the
 * writer struct itself. */
int avro_file_writer_close(avro_file_writer_t w)
665
/* NOTE(review): if check() returns early when the flush fails, none of the
 * releases below run -- confirm against the check() macro. */
check(rval, avro_file_writer_flush(w));
666
avro_schema_decref(w->writers_schema);
667
avro_writer_free(w->datum_writer);
668
avro_writer_free(w->writer);
669
avro_free(w->datum_buffer, w->datum_buffer_size);
670
avro_codec_reset(w->codec);
671
avro_freet(struct avro_codec_t_, w->codec);
672
avro_freet(struct avro_file_writer_t_, w);
676
/* Read the next datum from the current block using the writer's schema and
 * `readers_schema`.  Once every object of the block has been read
 * (blocks_read == blocks_total), the sync marker that follows is read and
 * compared with the one from the header, and the next block is loaded.
 * `r` and `datum` are validated with check_param (EINVAL). */
int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
677
avro_datum_t * datum)
682
check_param(EINVAL, r, "reader");
683
check_param(EINVAL, datum, "datum");
686
avro_read_data(r->block_reader, r->writers_schema, readers_schema,
690
if (r->blocks_read == r->blocks_total) {
691
check(rval, avro_read(r->reader, sync, sizeof(sync)));
692
if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
693
/* wrong sync bytes */
694
avro_set_error("Incorrect sync bytes");
697
/* For now, ignore errors (e.g. EOF) */
698
/* NOTE(review): avro_file_reader_read_value() checks for EOF and checks
 * the result of this call; here the result is deliberately dropped. */
file_read_block_count(r);
704
/* Read the next value from the file into `value`.  When every object of
 * the current block has been consumed (blocks_read == blocks_total), the
 * sync marker is read and compared with the one from the header, end of
 * file is tested, and the next block is loaded before the value is read
 * from the block reader.
 * `r` and `value` are validated with check_param (EINVAL). */
avro_file_reader_read_value(avro_file_reader_t r, avro_value_t *value)
709
check_param(EINVAL, r, "reader");
710
check_param(EINVAL, value, "value");
712
if (r->blocks_read == r->blocks_total) {
713
check(rval, avro_read(r->reader, sync, sizeof(sync)));
714
if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
715
/* wrong sync bytes */
716
avro_set_error("Incorrect sync bytes");
720
/* Did we just hit the end of the file? */
721
if (avro_reader_is_eof(r->reader))
724
check(rval, file_read_block_count(r));
727
check(rval, avro_value_read(r->block_reader, value));
733
/* Release everything owned by `reader`: the schema reference, the file
 * reader, the block reader, the codec, the block buffer (if one was
 * allocated), and the reader struct itself.
 * NOTE(review): no check_param on `reader` is visible here, unlike the
 * other public entry points -- confirm. */
int avro_file_reader_close(avro_file_reader_t reader)
735
avro_schema_decref(reader->writers_schema);
736
avro_reader_free(reader->reader);
737
avro_reader_free(reader->block_reader);
738
avro_codec_reset(reader->codec);
739
avro_freet(struct avro_codec_t_, reader->codec);
740
if (reader->current_blockdata) {
741
/* The size passed is the buffer's allocated size, not the last block's
 * length. */
avro_free(reader->current_blockdata, reader->current_blocklen);
743
avro_freet(struct avro_file_reader_t_, reader);