~ubuntu-branches/ubuntu/trusty/syslog-ng/trusty-proposed

« back to all changes in this revision

Viewing changes to modules/afmongodb/libmongo-client/src/sync-gridfs-stream.c

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS), Gergely Nagy
  • Date: 2011-10-11 14:30:48 UTC
  • mfrom: (1.3.7)
  • Revision ID: package-import@ubuntu.com-20111011143048-r1iljux9xbvj3lwh
Tags: 3.3.1.dfsg-1
* New upstream release with important fixes from upstream git tree with
  non-free manpages removed.
* Drop syslog-ng.conf(5) (closes: #496521).
* syslog-ng(8) is generated, and does not mention -Q anymore
  (closes: #616069).
* Supports CAP_SYSLOG on recent kernels (closes: #630172).
* Does not use g_timeout_add_seconds anymore (closes: #609154).

[ Gergely Nagy <algernon@madhouse-project.org> ]
* Update debian/copyright to DEP-5 format.
* Simplified the logrotate file by merging identical entries.
* Include local configuration files from /etc/syslog-ng/conf.d/ (Closes:
  #609050).
* Update syslog-ng.conf to be fully 3.3 compliant.
* Compress both source and binaries (except the syslog-ng meta
  package) with xz, instead of gzip.
* Use dpkg triggers to restart syslog-ng when appropriate.
* Include DFSG-free manual pages for all binaries.
* Build with Hardening enabled.
* Mention syslog(3) in /etc/default/syslog-ng, instead of
  <linux/kernel.h> (Closes: #608605)
* Support 'status' in the init script.
  Patch from Peter Eisentraut <petere@debian.org> (Closes: #644458)
* Build-Depend on libevtlog-dev (>= 0.2.12-5~) for correct shlibs.
* Use [linux-any] in Build-Depends instead of hardcoded links.
  (Closes: #634715)
* Use $SYSLOGNG_OPTS in the init script when reloading syslog-ng.
  (Closes: #589081)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* sync-gridfs-stream.c - libmongo-client GridFS streaming implementation
 
2
 * Copyright 2011 Gergely Nagy <algernon@balabit.hu>
 
3
 *
 
4
 * Licensed under the Apache License, Version 2.0 (the "License");
 
5
 * you may not use this file except in compliance with the License.
 
6
 * You may obtain a copy of the License at
 
7
 *
 
8
 *     http://www.apache.org/licenses/LICENSE-2.0
 
9
 *
 
10
 * Unless required by applicable law or agreed to in writing, software
 
11
 * distributed under the License is distributed on an "AS IS" BASIS,
 
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
 * See the License for the specific language governing permissions and
 
14
 * limitations under the License.
 
15
 */
 
16
 
 
17
/** @file src/sync-gridfs-stream.c
 
18
 * MongoDB GridFS Streaming API implementation.
 
19
 */
 
20
 
 
21
#include "sync-gridfs-stream.h"
 
22
#include "libmongo-private.h"
 
23
 
 
24
#include <unistd.h>
 
25
#include <errno.h>
 
26
 
 
27
mongo_sync_gridfs_stream *
 
28
mongo_sync_gridfs_stream_find (mongo_sync_gridfs *gfs,
 
29
                               const bson *query)
 
30
{
 
31
  mongo_sync_gridfs_stream *stream;
 
32
  bson *meta = NULL;
 
33
  bson_cursor *c;
 
34
  mongo_packet *p;
 
35
  const guint8 *oid;
 
36
 
 
37
  if (!gfs)
 
38
    {
 
39
      errno = ENOTCONN;
 
40
      return NULL;
 
41
    }
 
42
  if (!query)
 
43
    {
 
44
      errno = EINVAL;
 
45
      return NULL;
 
46
    }
 
47
 
 
48
  p = mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 1, query, NULL);
 
49
  if (!p)
 
50
    return NULL;
 
51
 
 
52
  stream = g_new0 (mongo_sync_gridfs_stream, 1);
 
53
  stream->gfs = gfs;
 
54
  stream->file.type = LMC_GRIDFS_FILE_STREAM_READER;
 
55
 
 
56
  mongo_wire_reply_packet_get_nth_document (p, 1, &meta);
 
57
  bson_finish (meta);
 
58
  mongo_wire_packet_free (p);
 
59
 
 
60
  c = bson_find (meta, "_id");
 
61
  if (!bson_cursor_get_oid (c, &oid))
 
62
    {
 
63
      bson_cursor_free (c);
 
64
      bson_free (meta);
 
65
      g_free (stream);
 
66
 
 
67
      errno = EPROTO;
 
68
      return NULL;
 
69
    }
 
70
  stream->file.id = g_malloc (12);
 
71
  memcpy (stream->file.id, oid, 12);
 
72
 
 
73
  bson_cursor_find (c, "length");
 
74
  bson_cursor_get_int64 (c, &stream->file.length);
 
75
  if (stream->file.length == 0)
 
76
    {
 
77
      gint32 i = 0;
 
78
 
 
79
      bson_cursor_get_int32 (c, &i);
 
80
      stream->file.length = i;
 
81
    }
 
82
 
 
83
  bson_cursor_find (c, "chunkSize");
 
84
  bson_cursor_get_int32 (c, &stream->file.chunk_size);
 
85
 
 
86
  bson_cursor_free (c);
 
87
  bson_free (meta);
 
88
 
 
89
  if (stream->file.length == 0 ||
 
90
      stream->file.chunk_size == 0)
 
91
    {
 
92
      g_free (stream->file.id);
 
93
      g_free (stream);
 
94
 
 
95
      errno = EPROTO;
 
96
      return NULL;
 
97
    }
 
98
 
 
99
  return stream;
 
100
}
 
101
 
 
102
mongo_sync_gridfs_stream *
 
103
mongo_sync_gridfs_stream_new (mongo_sync_gridfs *gfs,
 
104
                              const bson *metadata)
 
105
{
 
106
  mongo_sync_gridfs_stream *stream;
 
107
  bson_cursor *c;
 
108
 
 
109
  if (!gfs)
 
110
    {
 
111
      errno = ENOTCONN;
 
112
      return NULL;
 
113
    }
 
114
 
 
115
  stream = g_new0 (mongo_sync_gridfs_stream, 1);
 
116
  stream->file.type = LMC_GRIDFS_FILE_STREAM_WRITER;
 
117
  stream->gfs = gfs;
 
118
 
 
119
  stream->file.chunk_size = gfs->chunk_size;
 
120
 
 
121
  stream->writer.metadata = bson_new_from_data (bson_data (metadata),
 
122
                                                bson_size (metadata) - 1);
 
123
 
 
124
  c = bson_find (metadata, "_id");
 
125
  if (!c)
 
126
    {
 
127
      stream->file.id = mongo_util_oid_new
 
128
        (mongo_connection_get_requestid ((mongo_connection *)gfs->conn));
 
129
      if (!stream->file.id)
 
130
        {
 
131
          bson_free (stream->writer.metadata);
 
132
          g_free (stream);
 
133
 
 
134
          errno = EFAULT;
 
135
          return NULL;
 
136
        }
 
137
      bson_append_oid (stream->writer.metadata, "_id", stream->file.id);
 
138
    }
 
139
  else
 
140
    {
 
141
      const guint8 *oid;
 
142
 
 
143
      if (!bson_cursor_get_oid (c, &oid))
 
144
        {
 
145
          bson_cursor_free (c);
 
146
          bson_free (stream->writer.metadata);
 
147
          g_free (stream);
 
148
 
 
149
          errno = EPROTO;
 
150
          return NULL;
 
151
        }
 
152
 
 
153
      stream->file.id = g_malloc (12);
 
154
      memcpy (stream->file.id, oid, 12);
 
155
    }
 
156
  bson_cursor_free (c);
 
157
  bson_finish (stream->writer.metadata);
 
158
 
 
159
  stream->writer.buffer = g_malloc (stream->file.chunk_size);
 
160
  stream->writer.checksum = g_checksum_new (G_CHECKSUM_MD5);
 
161
 
 
162
  return stream;
 
163
}
 
164
 
 
165
static inline gboolean
 
166
_stream_seek_chunk (mongo_sync_gridfs_stream *stream,
 
167
                    gint64 chunk)
 
168
{
 
169
  bson *b;
 
170
  mongo_packet *p;
 
171
  bson_cursor *c;
 
172
  bson_binary_subtype subt = BSON_BINARY_SUBTYPE_USER_DEFINED;
 
173
  gboolean r;
 
174
 
 
175
  b = bson_new_sized (32);
 
176
  bson_append_oid (b, "files_id", stream->file.id);
 
177
  bson_append_int64 (b, "n", chunk);
 
178
  bson_finish (b);
 
179
 
 
180
  p = mongo_sync_cmd_query (stream->gfs->conn,
 
181
                            stream->gfs->ns.chunks, 0,
 
182
                            0, 1, b, NULL);
 
183
  bson_free (b);
 
184
 
 
185
  bson_free (stream->reader.bson);
 
186
  stream->reader.bson = NULL;
 
187
  stream->reader.chunk.data = NULL;
 
188
 
 
189
  mongo_wire_reply_packet_get_nth_document (p, 1, &stream->reader.bson);
 
190
  mongo_wire_packet_free (p);
 
191
  bson_finish (stream->reader.bson);
 
192
 
 
193
  c = bson_find (stream->reader.bson, "data");
 
194
  r = bson_cursor_get_binary (c, &subt, &stream->reader.chunk.data,
 
195
                              &stream->reader.chunk.size);
 
196
  if (!r || subt != BSON_BINARY_SUBTYPE_GENERIC)
 
197
    {
 
198
      bson_cursor_free (c);
 
199
      bson_free (stream->reader.bson);
 
200
      stream->reader.bson = NULL;
 
201
      stream->reader.chunk.data = NULL;
 
202
 
 
203
      errno = EPROTO;
 
204
      return FALSE;
 
205
    }
 
206
  bson_cursor_free (c);
 
207
 
 
208
  stream->reader.chunk.offset = 0;
 
209
  return TRUE;
 
210
}
 
211
 
 
212
gint64
 
213
mongo_sync_gridfs_stream_read (mongo_sync_gridfs_stream *stream,
 
214
                               guint8 *buffer,
 
215
                               gint64 size)
 
216
{
 
217
  gint64 pos = 0;
 
218
 
 
219
  if (!stream)
 
220
    {
 
221
      errno = ENOENT;
 
222
      return -1;
 
223
    }
 
224
  if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
 
225
    {
 
226
      errno = EOPNOTSUPP;
 
227
      return -1;
 
228
    }
 
229
  if (!buffer || size <= 0)
 
230
    {
 
231
      errno = EINVAL;
 
232
      return -1;
 
233
    }
 
234
 
 
235
  if (!stream->reader.chunk.data)
 
236
    {
 
237
      if (!_stream_seek_chunk (stream, 0))
 
238
        return -1;
 
239
    }
 
240
 
 
241
  while (pos < size && stream->file.offset < stream->file.length)
 
242
    {
 
243
      gint32 csize = stream->reader.chunk.size - stream->reader.chunk.offset;
 
244
 
 
245
      if (size - pos < csize)
 
246
        csize = size - pos;
 
247
 
 
248
      memcpy (buffer + pos,
 
249
              stream->reader.chunk.data + stream->reader.chunk.offset, csize);
 
250
 
 
251
      stream->reader.chunk.offset += csize;
 
252
      stream->file.offset += csize;
 
253
      pos += csize;
 
254
 
 
255
      if (stream->reader.chunk.offset >= stream->reader.chunk.size &&
 
256
          stream->file.offset < stream->file.length)
 
257
        {
 
258
          stream->file.current_chunk++;
 
259
          if (!_stream_seek_chunk (stream, stream->file.current_chunk))
 
260
            return -1;
 
261
        }
 
262
    }
 
263
 
 
264
  return pos;
 
265
}
 
266
 
 
267
static gboolean
 
268
_stream_chunk_write (mongo_sync_gridfs *gfs,
 
269
                     const guint8 *oid, gint64 n,
 
270
                     const guint8 *buffer, gint32 size)
 
271
{
 
272
  bson *chunk;
 
273
 
 
274
  chunk = bson_new_sized (size + 128);
 
275
  bson_append_oid (chunk, "files_id", oid);
 
276
  bson_append_int64 (chunk, "n", n);
 
277
  bson_append_binary (chunk, "data", BSON_BINARY_SUBTYPE_GENERIC,
 
278
                      buffer, size);
 
279
  bson_finish (chunk);
 
280
 
 
281
  if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.chunks, chunk, NULL))
 
282
    {
 
283
      int e = errno;
 
284
 
 
285
      bson_free (chunk);
 
286
      errno = e;
 
287
      return FALSE;
 
288
    }
 
289
  bson_free (chunk);
 
290
 
 
291
  return TRUE;
 
292
}
 
293
 
 
294
gboolean
 
295
mongo_sync_gridfs_stream_write (mongo_sync_gridfs_stream *stream,
 
296
                                const guint8 *buffer,
 
297
                                gint64 size)
 
298
{
 
299
  gint64 pos = 0;
 
300
 
 
301
  if (!stream)
 
302
    {
 
303
      errno = ENOENT;
 
304
      return FALSE;
 
305
    }
 
306
  if (stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
 
307
    {
 
308
      errno = EOPNOTSUPP;
 
309
      return FALSE;
 
310
    }
 
311
  if (!buffer || size <= 0)
 
312
    {
 
313
      errno = EINVAL;
 
314
      return FALSE;
 
315
    }
 
316
 
 
317
  while (pos < size)
 
318
    {
 
319
      gint32 csize = stream->file.chunk_size - stream->writer.buffer_offset;
 
320
 
 
321
      if (size - pos < csize)
 
322
        csize = size - pos;
 
323
 
 
324
      memcpy (stream->writer.buffer + stream->writer.buffer_offset,
 
325
              buffer + pos, csize);
 
326
      stream->writer.buffer_offset += csize;
 
327
      stream->file.offset += csize;
 
328
      stream->file.length += csize;
 
329
      pos += csize;
 
330
 
 
331
      if (stream->writer.buffer_offset == stream->file.chunk_size)
 
332
        {
 
333
          if (!_stream_chunk_write (stream->gfs,
 
334
                                    stream->file.id,
 
335
                                    stream->file.current_chunk,
 
336
                                    stream->writer.buffer,
 
337
                                    stream->file.chunk_size))
 
338
            return FALSE;
 
339
          g_checksum_update (stream->writer.checksum, stream->writer.buffer,
 
340
                             stream->file.chunk_size);
 
341
 
 
342
          stream->writer.buffer_offset = 0;
 
343
          stream->file.current_chunk++;
 
344
        }
 
345
    }
 
346
 
 
347
  return TRUE;
 
348
}
 
349
 
 
350
gboolean
 
351
mongo_sync_gridfs_stream_seek (mongo_sync_gridfs_stream *stream,
 
352
                               gint64 pos,
 
353
                               gint whence)
 
354
{
 
355
  gint64 real_pos = 0;
 
356
  gint64 chunk;
 
357
  gint32 offs;
 
358
 
 
359
  if (!stream)
 
360
    {
 
361
      errno = ENOENT;
 
362
      return FALSE;
 
363
    }
 
364
  if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
 
365
    {
 
366
      errno = EOPNOTSUPP;
 
367
      return FALSE;
 
368
    }
 
369
 
 
370
  switch (whence)
 
371
    {
 
372
    case SEEK_SET:
 
373
      if (pos == stream->file.offset)
 
374
        return TRUE;
 
375
      if (pos < 0 || pos > stream->file.length)
 
376
        {
 
377
          errno = ERANGE;
 
378
          return FALSE;
 
379
        }
 
380
      real_pos = pos;
 
381
      break;
 
382
    case SEEK_CUR:
 
383
      if (pos + stream->file.offset < 0 ||
 
384
          pos + stream->file.offset > stream->file.length)
 
385
        {
 
386
          errno = ERANGE;
 
387
          return FALSE;
 
388
        }
 
389
      if (pos == 0)
 
390
        return TRUE;
 
391
      real_pos = pos + stream->file.offset;
 
392
      break;
 
393
    case SEEK_END:
 
394
      if (pos > 0 || pos + stream->file.length < 0)
 
395
        {
 
396
          errno = ERANGE;
 
397
          return FALSE;
 
398
        }
 
399
      real_pos = pos + stream->file.length;
 
400
      break;
 
401
    default:
 
402
      errno = EINVAL;
 
403
      return FALSE;
 
404
    }
 
405
 
 
406
  chunk = real_pos / stream->file.chunk_size;
 
407
  offs = real_pos % stream->file.chunk_size;
 
408
 
 
409
  if (!_stream_seek_chunk (stream, chunk))
 
410
    return FALSE;
 
411
 
 
412
  stream->reader.chunk.offset = offs;
 
413
  stream->file.current_chunk = chunk;
 
414
  stream->file.offset = real_pos;
 
415
 
 
416
  return TRUE;
 
417
}
 
418
 
 
419
gboolean
 
420
mongo_sync_gridfs_stream_close (mongo_sync_gridfs_stream *stream)
 
421
{
 
422
  if (!stream)
 
423
    {
 
424
      errno = ENOENT;
 
425
      return FALSE;
 
426
    }
 
427
 
 
428
  if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER &&
 
429
      stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
 
430
    {
 
431
      errno = EINVAL;
 
432
      return FALSE;
 
433
    }
 
434
 
 
435
  if (stream->file.type == LMC_GRIDFS_FILE_STREAM_WRITER)
 
436
    {
 
437
      bson *meta;
 
438
      gint64 upload_date;
 
439
      GTimeVal tv;
 
440
      gboolean closed = FALSE;
 
441
 
 
442
      if (stream->writer.buffer_offset > 0)
 
443
        {
 
444
          closed = _stream_chunk_write (stream->gfs,
 
445
                                        stream->file.id,
 
446
                                        stream->file.current_chunk,
 
447
                                        stream->writer.buffer,
 
448
                                        stream->writer.buffer_offset);
 
449
 
 
450
          if (closed)
 
451
            g_checksum_update (stream->writer.checksum,
 
452
                               stream->writer.buffer,
 
453
                               stream->writer.buffer_offset);
 
454
        }
 
455
 
 
456
      if (closed)
 
457
        {
 
458
          g_get_current_time (&tv);
 
459
          upload_date =  (((gint64) tv.tv_sec) * 1000) +
 
460
            (gint64)(tv.tv_usec / 1000);
 
461
 
 
462
          /* _id is guaranteed by _stream_new() */
 
463
          meta = bson_new_from_data (bson_data (stream->writer.metadata),
 
464
                                     bson_size (stream->writer.metadata) - 1);
 
465
          bson_append_int64 (meta, "length", stream->file.length);
 
466
          bson_append_int32 (meta, "chunkSize", stream->file.chunk_size);
 
467
          bson_append_utc_datetime (meta, "uploadDate", upload_date);
 
468
          if (stream->file.length)
 
469
            bson_append_string (meta, "md5",
 
470
                                g_checksum_get_string (stream->writer.checksum), -1);
 
471
          bson_finish (meta);
 
472
 
 
473
          if (!mongo_sync_cmd_insert (stream->gfs->conn,
 
474
                                      stream->gfs->ns.files, meta, NULL))
 
475
            {
 
476
              int e = errno;
 
477
 
 
478
              bson_free (meta);
 
479
              errno = e;
 
480
              return FALSE;
 
481
            }
 
482
          bson_free (meta);
 
483
        }
 
484
 
 
485
      bson_free (stream->writer.metadata);
 
486
      g_checksum_free (stream->writer.checksum);
 
487
      g_free (stream->writer.buffer);
 
488
    }
 
489
  else
 
490
    bson_free (stream->reader.bson);
 
491
 
 
492
  g_free (stream->file.id);
 
493
  g_free (stream);
 
494
  return TRUE;
 
495
}