/* sync-gridfs-stream.c - libmongo-client GridFS streaming implementation
 * Copyright 2011 Gergely Nagy <algernon@balabit.hu>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/** @file src/sync-gridfs-stream.c
 * MongoDB GridFS Streaming API implementation.
 */
21
#include "sync-gridfs-stream.h"
22
#include "libmongo-private.h"
27
/* Create a reader stream from a file-metadata document found by running
 * `query` against the files collection (gfs->ns.files).
 *
 * NOTE(review): this listing is incomplete. The bare numbers between the
 * statements look like line numbers of the original file, and the gaps
 * between them mean lines are missing (the remaining parameters, local
 * declarations, braces, error branches and return statements). The
 * comments below describe only the statements that are visible.
 */
mongo_sync_gridfs_stream *
28
mongo_sync_gridfs_stream_find (mongo_sync_gridfs *gfs,
31
mongo_sync_gridfs_stream *stream;
/* Run the query on the files collection. The literal arguments 0, 0, 1
 * are presumably flags, skip and number-to-return -- confirm against the
 * mongo_sync_cmd_query() prototype. */
48
p = mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 1, query, NULL);
/* Zero-initialised stream object, tagged as a reader. */
52
stream = g_new0 (mongo_sync_gridfs_stream, 1);
54
stream->file.type = LMC_GRIDFS_FILE_STREAM_READER;
/* Pull document number 1 out of the reply into `meta`, then release the
 * reply packet. The return value of the fetch is not checked in the
 * visible lines. */
56
mongo_wire_reply_packet_get_nth_document (p, 1, &meta);
58
mongo_wire_packet_free (p);
/* Locate the "_id" key; the branch taken when it cannot be read as an
 * ObjectId is not visible here. */
60
c = bson_find (meta, "_id");
61
if (!bson_cursor_get_oid (c, &oid))
/* Keep a private 12-byte copy of the id (12 is presumably the ObjectId
 * size -- confirm). */
70
stream->file.id = g_malloc (12);
71
memcpy (stream->file.id, oid, 12);
/* Read "length" as an int64 first; when the result is still 0, read it
 * again as an int32 through the temporary `i`.
 * NOTE(review): none of the cursor find/get return values are checked in
 * the visible lines. If bson_cursor_get_int32() fails without writing `i`,
 * the length becomes whatever `i` held -- confirm that `i` is initialised
 * at its (not visible) declaration. */
73
bson_cursor_find (c, "length");
74
bson_cursor_get_int64 (c, &stream->file.length);
75
if (stream->file.length == 0)
79
bson_cursor_get_int32 (c, &i);
80
stream->file.length = i;
/* Read the chunk size straight into the stream. */
83
bson_cursor_find (c, "chunkSize");
84
bson_cursor_get_int32 (c, &stream->file.chunk_size);
/* A zero length or zero chunk size leads to a cleanup path that frees the
 * copied id; the rest of that path is not visible. */
89
if (stream->file.length == 0 ||
90
stream->file.chunk_size == 0)
92
g_free (stream->file.id);
102
/* Create a writer stream whose metadata starts as a copy of `metadata`
 * and whose chunk size is taken from the GridFS handle.
 *
 * NOTE(review): this listing is incomplete. The bare numbers between the
 * statements look like line numbers of the original file, and the gaps
 * between them mean lines are missing (local declarations, braces, the
 * conditions selecting between the two "_id" paths, error returns and the
 * final return). The comments below describe only the visible statements.
 */
mongo_sync_gridfs_stream *
103
mongo_sync_gridfs_stream_new (mongo_sync_gridfs *gfs,
104
const bson *metadata)
106
mongo_sync_gridfs_stream *stream;
/* Zero-initialised stream object, tagged as a writer. */
115
stream = g_new0 (mongo_sync_gridfs_stream, 1);
116
stream->file.type = LMC_GRIDFS_FILE_STREAM_WRITER;
119
stream->file.chunk_size = gfs->chunk_size;
/* Copy the caller's metadata bytes, leaving out the last byte.
 * Presumably that byte is the BSON terminator, dropped so that further
 * fields can be appended -- confirm against bson_new_from_data(). */
121
stream->writer.metadata = bson_new_from_data (bson_data (metadata),
122
bson_size (metadata) - 1);
/* Look for a caller-supplied "_id". */
124
c = bson_find (metadata, "_id");
/* Path that generates a fresh id from the connection's request id and
 * appends it to the metadata copy. The metadata copy is freed when id
 * generation fails. The condition guarding this path is not visible;
 * presumably it is taken when "_id" was not found. */
127
stream->file.id = mongo_util_oid_new
128
(mongo_connection_get_requestid ((mongo_connection *)gfs->conn));
129
if (!stream->file.id)
131
bson_free (stream->writer.metadata);
137
bson_append_oid (stream->writer.metadata, "_id", stream->file.id);
/* Path that reuses an existing "_id": when it cannot be read as an
 * ObjectId the cursor and the metadata copy are released; otherwise a
 * private 12-byte copy of the id is stored. */
143
if (!bson_cursor_get_oid (c, &oid))
145
bson_cursor_free (c);
146
bson_free (stream->writer.metadata);
153
stream->file.id = g_malloc (12);
154
memcpy (stream->file.id, oid, 12);
/* Done with the cursor; finish the metadata document. */
156
bson_cursor_free (c);
157
bson_finish (stream->writer.metadata);
/* Staging buffer of exactly one chunk, plus an MD5 checksum object. */
159
stream->writer.buffer = g_malloc (stream->file.chunk_size);
160
stream->writer.checksum = g_checksum_new (G_CHECKSUM_MD5);
165
/* Load chunk number `chunk` of the stream's file into the reader state.
 *
 * Visible steps: query the chunks collection for the document with
 * files_id == stream->file.id and n == chunk, keep the reply document in
 * stream->reader.bson, expose its "data" binary through
 * stream->reader.chunk.data / .size, and reset the in-chunk offset to 0.
 *
 * NOTE(review): this listing is incomplete. The bare numbers between the
 * statements look like line numbers of the original file, and the gaps
 * between them mean lines are missing (remaining parameters, local
 * declarations, braces, the tail of the query call, error branches and
 * return statements).
 */
static inline gboolean
166
_stream_seek_chunk (mongo_sync_gridfs_stream *stream,
/* Starts as USER_DEFINED; the check after bson_cursor_get_binary() below
 * accepts only GENERIC. */
172
bson_binary_subtype subt = BSON_BINARY_SUBTYPE_USER_DEFINED;
/* Build the chunk lookup document. */
175
b = bson_new_sized (32);
176
bson_append_oid (b, "files_id", stream->file.id);
177
bson_append_int64 (b, "n", chunk);
/* Query the chunks collection (the remaining arguments of this call are
 * not visible). */
180
p = mongo_sync_cmd_query (stream->gfs->conn,
181
stream->gfs->ns.chunks, 0,
/* Drop the chunk document held so far and clear the data pointer.
 * Presumably chunk.data points into reader.bson, which would explain why
 * both are cleared together -- confirm against bson_cursor_get_binary(). */
185
bson_free (stream->reader.bson);
186
stream->reader.bson = NULL;
187
stream->reader.chunk.data = NULL;
/* Take document number 1 of the reply as the new chunk document and
 * release the reply packet.
 * NOTE(review): the return value of the fetch is not checked in the
 * visible lines. */
189
mongo_wire_reply_packet_get_nth_document (p, 1, &stream->reader.bson);
190
mongo_wire_packet_free (p);
191
bson_finish (stream->reader.bson);
/* Read the "data" binary. On failure, or when the subtype is not GENERIC,
 * release the cursor and the chunk document and clear the reader state. */
193
c = bson_find (stream->reader.bson, "data");
194
r = bson_cursor_get_binary (c, &subt, &stream->reader.chunk.data,
195
&stream->reader.chunk.size);
196
if (!r || subt != BSON_BINARY_SUBTYPE_GENERIC)
198
bson_cursor_free (c);
199
bson_free (stream->reader.bson);
200
stream->reader.bson = NULL;
201
stream->reader.chunk.data = NULL;
/* Success path: release the cursor and start reading at the beginning of
 * the freshly loaded chunk. */
206
bson_cursor_free (c);
208
stream->reader.chunk.offset = 0;
213
/* Copy data from a reader stream into `buffer`, loading further chunks
 * through _stream_seek_chunk() as the current one is used up.
 *
 * NOTE(review): this listing is incomplete. The bare numbers between the
 * statements look like line numbers of the original file, and the gaps
 * between them mean lines are missing (the return type, remaining
 * parameters, local declarations, braces, the bodies of the checks and the
 * return statements, and presumably the update of `pos` inside the loop).
 */
mongo_sync_gridfs_stream_read (mongo_sync_gridfs_stream *stream,
/* Only reader streams are accepted. */
224
if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
/* A NULL buffer or a non-positive size is rejected. */
229
if (!buffer || size <= 0)
/* No chunk loaded yet: start with chunk 0. */
235
if (!stream->reader.chunk.data)
237
if (!_stream_seek_chunk (stream, 0))
/* Keep copying until `size` bytes were produced or the end of the file is
 * reached. */
241
while (pos < size && stream->file.offset < stream->file.length)
/* Bytes still unread in the current chunk. */
243
gint32 csize = stream->reader.chunk.size - stream->reader.chunk.offset;
/* The body of this check is not visible; presumably it limits csize to
 * the space left in the caller's buffer. */
245
if (size - pos < csize)
248
memcpy (buffer + pos,
249
stream->reader.chunk.data + stream->reader.chunk.offset, csize);
/* Advance the in-chunk and in-file positions. */
251
stream->reader.chunk.offset += csize;
252
stream->file.offset += csize;
/* Current chunk used up while file data remains: load the next chunk. */
255
if (stream->reader.chunk.offset >= stream->reader.chunk.size &&
256
stream->file.offset < stream->file.length)
258
stream->file.current_chunk++;
259
if (!_stream_seek_chunk (stream, stream->file.current_chunk))
268
/* Insert one chunk document into the chunks collection (gfs->ns.chunks).
 * The document carries "files_id" (oid), "n" (chunk number) and "data"
 * (binary, GENERIC subtype).
 *
 * NOTE(review): this listing is incomplete. The bare numbers between the
 * statements look like line numbers of the original file, and the gaps
 * between them mean lines are missing (the return type, local
 * declarations, braces, the tail of the bson_append_binary() call, and
 * everything after the insert check).
 */
_stream_chunk_write (mongo_sync_gridfs *gfs,
269
const guint8 *oid, gint64 n,
270
const guint8 *buffer, gint32 size)
/* Size the document for the payload plus 128 extra bytes for the other
 * fields. */
274
chunk = bson_new_sized (size + 128);
275
bson_append_oid (chunk, "files_id", oid);
276
bson_append_int64 (chunk, "n", n);
277
bson_append_binary (chunk, "data", BSON_BINARY_SUBTYPE_GENERIC,
/* The failure branch of the insert is not visible. */
281
if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.chunks, chunk, NULL))
295
/* Append data to a writer stream. Bytes are staged in
 * stream->writer.buffer; every time the buffer holds a full chunk it is
 * inserted through _stream_chunk_write() and fed to the running checksum.
 *
 * NOTE(review): this listing is incomplete. The bare numbers between the
 * statements look like line numbers of the original file, and the gaps
 * between them mean lines are missing (the return type, remaining
 * parameters, local declarations, braces, the loop header, the bodies of
 * the checks, one argument of the _stream_chunk_write() call and the
 * return statements).
 */
mongo_sync_gridfs_stream_write (mongo_sync_gridfs_stream *stream,
296
const guint8 *buffer,
/* Only writer streams are accepted. */
306
if (stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
/* A NULL buffer or a non-positive size is rejected. */
311
if (!buffer || size <= 0)
/* Room left in the staging buffer before it holds a full chunk. */
319
gint32 csize = stream->file.chunk_size - stream->writer.buffer_offset;
/* The body of this check is not visible; presumably it limits csize to
 * the input that is still unconsumed. */
321
if (size - pos < csize)
324
memcpy (stream->writer.buffer + stream->writer.buffer_offset,
325
buffer + pos, csize);
/* Account for the staged bytes: buffer fill level, file position and
 * total file length all grow by csize. */
326
stream->writer.buffer_offset += csize;
327
stream->file.offset += csize;
328
stream->file.length += csize;
/* Staging buffer is full: store it as chunk number current_chunk, add it
 * to the checksum, then start an empty buffer for the next chunk. */
331
if (stream->writer.buffer_offset == stream->file.chunk_size)
333
if (!_stream_chunk_write (stream->gfs,
335
stream->file.current_chunk,
336
stream->writer.buffer,
337
stream->file.chunk_size))
339
g_checksum_update (stream->writer.checksum, stream->writer.buffer,
340
stream->file.chunk_size);
342
stream->writer.buffer_offset = 0;
343
stream->file.current_chunk++;
351
/* Reposition a reader stream: compute an absolute position `real_pos`,
 * load the chunk containing it and update the stream's chunk number,
 * in-chunk offset and file offset.
 *
 * NOTE(review): this listing is incomplete. The bare numbers between the
 * statements look like line numbers of the original file, and the gaps
 * between them mean lines are missing (the return type, remaining
 * parameters, local declarations, braces, the bodies of the checks and the
 * return statements). The construct that selects between the three
 * position computations below is not visible; presumably they correspond
 * to the SEEK_SET / SEEK_CUR / SEEK_END cases of a `whence` argument.
 */
mongo_sync_gridfs_stream_seek (mongo_sync_gridfs_stream *stream,
/* Only reader streams are accepted. */
364
if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
/* Position given directly: a target equal to the current offset is
 * special-cased (body not visible); targets outside [0, length] are
 * rejected. */
373
if (pos == stream->file.offset)
375
if (pos < 0 || pos > stream->file.length)
/* Position relative to the current offset: the sum must stay within
 * [0, length]. */
383
if (pos + stream->file.offset < 0 ||
384
pos + stream->file.offset > stream->file.length)
391
real_pos = pos + stream->file.offset;
/* Position relative to the file length: `pos` must not be positive and
 * must not reach before the start of the file. */
394
if (pos > 0 || pos + stream->file.length < 0)
399
real_pos = pos + stream->file.length;
/* Split the absolute position into a chunk number and an offset within
 * that chunk.
 * NOTE(review): when real_pos equals the file length and the length is an
 * exact multiple of chunk_size, `chunk` is one past the last chunk
 * written. Confirm whether seeking exactly to end-of-file is meant to fail
 * in _stream_seek_chunk(). */
406
chunk = real_pos / stream->file.chunk_size;
407
offs = real_pos % stream->file.chunk_size;
409
if (!_stream_seek_chunk (stream, chunk))
/* Chunk loaded: commit the new position. */
412
stream->reader.chunk.offset = offs;
413
stream->file.current_chunk = chunk;
414
stream->file.offset = real_pos;
420
/* Close a stream and release what it holds. For a writer, the partially
 * filled buffer is flushed as a last chunk and the file document (length,
 * chunkSize, uploadDate and, for non-empty files, md5) is inserted into the
 * files collection before the writer resources are freed. For a reader, the
 * current chunk document is freed.
 *
 * NOTE(review): this listing is incomplete and the function continues
 * past the last visible line. The bare numbers between the statements look
 * like line numbers of the original file, and the gaps between them mean
 * lines are missing (the return type, local declarations, braces, one
 * argument of the _stream_chunk_write() call, error branches and the
 * return statements).
 */
mongo_sync_gridfs_stream_close (mongo_sync_gridfs_stream *stream)
/* Streams that are neither readers nor writers are rejected. */
428
if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER &&
429
stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
/* Writer-specific finalisation. */
435
if (stream->file.type == LMC_GRIDFS_FILE_STREAM_WRITER)
440
gboolean closed = FALSE;
/* Flush whatever is left in the staging buffer as a final, shorter chunk
 * and add it to the checksum.
 * NOTE(review): the result is stored in `closed`, but how a failed flush
 * is handled is not visible here. */
442
if (stream->writer.buffer_offset > 0)
444
closed = _stream_chunk_write (stream->gfs,
446
stream->file.current_chunk,
447
stream->writer.buffer,
448
stream->writer.buffer_offset);
451
g_checksum_update (stream->writer.checksum,
452
stream->writer.buffer,
453
stream->writer.buffer_offset);
/* Current time as a millisecond count: seconds * 1000 plus
 * microseconds / 1000.
 * NOTE(review): newer GLib releases mark g_get_current_time() as
 * deprecated -- confirm against the GLib version this project targets. */
458
g_get_current_time (&tv);
459
upload_date = (((gint64) tv.tv_sec) * 1000) +
460
(gint64)(tv.tv_usec / 1000);
462
/* _id is guaranteed by _stream_new() */
/* Build the file document from a copy of the stored metadata with its last
 * byte left out, then append the file-level fields. "md5" is added only
 * when the file is not empty. */
463
meta = bson_new_from_data (bson_data (stream->writer.metadata),
464
bson_size (stream->writer.metadata) - 1);
465
bson_append_int64 (meta, "length", stream->file.length);
466
bson_append_int32 (meta, "chunkSize", stream->file.chunk_size);
467
bson_append_utc_datetime (meta, "uploadDate", upload_date);
468
if (stream->file.length)
469
bson_append_string (meta, "md5",
470
g_checksum_get_string (stream->writer.checksum), -1);
/* Store the file document; the failure branch is not visible. */
473
if (!mongo_sync_cmd_insert (stream->gfs->conn,
474
stream->gfs->ns.files, meta, NULL))
/* Release the writer's metadata copy, checksum object and staging
 * buffer. */
485
bson_free (stream->writer.metadata);
486
g_checksum_free (stream->writer.checksum);
487
g_free (stream->writer.buffer);
/* Release the reader's current chunk document (the condition selecting
 * this path is not visible). */
490
bson_free (stream->reader.bson);
/* Release the private copy of the file id. */
492
g_free (stream->file.id);