~ubuntu-branches/ubuntu/wily/syslog-ng/wily-proposed

« back to all changes in this revision

Viewing changes to lib/logproto.c

  • Committer: Package Import Robot
  • Author(s): Gergely Nagy, Gergely Nagy
  • Date: 2013-11-04 15:27:37 UTC
  • mfrom: (1.3.12)
  • Revision ID: package-import@ubuntu.com-20131104152737-mqh6eqtna2xk97jq
Tags: 3.5.1-1
[ Gergely Nagy <algernon@madhouse-project.org> ]
* New upstream release.
  + Support auto-loading modules (Closes: #650814)
  + The SMTP module is available in syslog-ng-mod-smtp (Closes: #722746)
  + New modules: amqp, geoip, stomp, redis and smtp.
  + Multi-line input support (indented multiline and regexp-based)
  + Template type hinting for the MongoDB destination and $(format-json)
  + Support for unit suffixes in the configuration file
  + New filters, template functions and other miscellaneous changes
* New (team) maintainer, Laszlo Boszormenyi, Attila Szalay and myself
  added to Uploaders.
* Ship /var/lib/syslog-ng in the syslog-ng-core package, instead of
  creating it in the init script. Thanks Michael Biebl
  <biebl@debian.org> for the report & assistance. (Closes: #699942, #719910)
* Use dh-systemd for proper systemd-related maintainer scripts. Based on
  a patch by Michael Biebl <biebl@debian.org>. (Closes: #713982,
  #690067)
* Do not wait for syslog-ng to settle down during installation / update.
  This also fixes installing via debootstrap and a fake
  start-stop-daemon. (Closes: #714254)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
3
 
 * Copyright (c) 1998-2010 Balázs Scheidler
4
 
 *
5
 
 * This library is free software; you can redistribute it and/or
6
 
 * modify it under the terms of the GNU Lesser General Public
7
 
 * License as published by the Free Software Foundation; either
8
 
 * version 2.1 of the License, or (at your option) any later version.
9
 
 *
10
 
 * This library is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 
 * Lesser General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU Lesser General Public
16
 
 * License along with this library; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 *
19
 
 * As an additional exemption you are allowed to compile & link against the
20
 
 * OpenSSL libraries as published by the OpenSSL project. See the file
21
 
 * COPYING for details.
22
 
 *
23
 
 */
24
 
 
25
 
 
26
 
#include "logproto.h"
27
 
#include "messages.h"
28
 
#include "persist-state.h"
29
 
#include "compat.h"
30
 
 
31
 
#include <ctype.h>
32
 
#include <string.h>
33
 
#include <sys/stat.h>
34
 
#include <stdlib.h>
35
 
#include <sys/uio.h>
36
 
#include <limits.h>
37
 
 
38
 
gboolean
39
 
log_proto_set_encoding(LogProto *self, const gchar *encoding)
40
 
{
41
 
  if (self->convert != (GIConv) -1)
42
 
    {
43
 
      g_iconv_close(self->convert);
44
 
      self->convert = (GIConv) -1;
45
 
    }
46
 
  if (self->encoding)
47
 
    {
48
 
      g_free(self->encoding);
49
 
      self->encoding = NULL;
50
 
    }
51
 
 
52
 
  self->convert = g_iconv_open("utf-8", encoding);
53
 
  if (self->convert == (GIConv) -1)
54
 
    return FALSE;
55
 
 
56
 
  self->encoding = g_strdup(encoding);
57
 
  return TRUE;
58
 
}
59
 
 
60
 
void
61
 
log_proto_free(LogProto *s)
62
 
{
63
 
  if (s->free_fn)
64
 
    s->free_fn(s);
65
 
  if (s->convert != (GIConv) -1)
66
 
    g_iconv_close(s->convert);
67
 
  if (s->encoding)
68
 
    g_free(s->encoding);
69
 
  log_transport_free(s->transport);
70
 
  g_free(s);
71
 
}
72
 
 
73
 
 
74
 
typedef struct _LogProtoTextClient
75
 
{
76
 
  LogProto super;
77
 
  gint state, next_state;
78
 
  guchar *partial;
79
 
  GDestroyNotify partial_free;
80
 
  gsize partial_len, partial_pos;
81
 
} LogProtoTextClient;
82
 
 
83
 
static gboolean
84
 
log_proto_text_client_prepare(LogProto *s, gint *fd, GIOCondition *cond)
85
 
{
86
 
  LogProtoTextClient *self = (LogProtoTextClient *) s;
87
 
  
88
 
  *fd = self->super.transport->fd;
89
 
  *cond = self->super.transport->cond;
90
 
 
91
 
  /* if there's no pending I/O in the transport layer, then we want to do a write */
92
 
  if (*cond == 0)
93
 
    *cond = G_IO_OUT;
94
 
  return self->partial != NULL;
95
 
}
96
 
 
97
 
static LogProtoStatus
98
 
log_proto_text_client_flush(LogProto *s)
99
 
{
100
 
  LogProtoTextClient *self = (LogProtoTextClient *) s;
101
 
  gint rc;
102
 
 
103
 
  /* attempt to flush previously buffered data */
104
 
  if (self->partial)
105
 
    {
106
 
      gint len = self->partial_len - self->partial_pos;
107
 
      
108
 
      rc = log_transport_write(self->super.transport, &self->partial[self->partial_pos], len);
109
 
      if (rc < 0)
110
 
        {
111
 
          if (errno != EAGAIN && errno != EINTR)
112
 
            {
113
 
              msg_error("I/O error occurred while writing",
114
 
                        evt_tag_int("fd", self->super.transport->fd),
115
 
                        evt_tag_errno(EVT_TAG_OSERROR, errno),
116
 
                        NULL);
117
 
              return LPS_ERROR;
118
 
            }
119
 
          return LPS_SUCCESS;
120
 
        }
121
 
      else if (rc != len)
122
 
        {
123
 
          self->partial_pos += rc;
124
 
          return LPS_SUCCESS;
125
 
        }
126
 
      else
127
 
        {
128
 
          if (self->partial_free)
129
 
            self->partial_free(self->partial);
130
 
          self->partial = NULL;
131
 
          if (self->next_state >= 0)
132
 
            {
133
 
              self->state = self->next_state;
134
 
              self->next_state = -1;
135
 
            }
136
 
          /* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
137
 
          return LPS_SUCCESS;
138
 
        }
139
 
    }
140
 
  return LPS_SUCCESS;
141
 
}
142
 
 
143
 
static LogProtoStatus
144
 
log_proto_text_client_submit_write(LogProto *s, guchar *msg, gsize msg_len, GDestroyNotify msg_free, gint next_state)
145
 
{
146
 
  LogProtoTextClient *self = (LogProtoTextClient *) s;
147
 
 
148
 
  g_assert(self->partial == NULL);
149
 
  self->partial = msg;
150
 
  self->partial_len = msg_len;
151
 
  self->partial_pos = 0;
152
 
  self->partial_free = msg_free;
153
 
  self->next_state = next_state;
154
 
  return log_proto_text_client_flush(s);
155
 
}
156
 
 
157
 
 
158
 
/*
159
 
 * log_proto_text_client_post:
160
 
 * @msg: formatted log message to send (this might be consumed by this function)
161
 
 * @msg_len: length of @msg
162
 
 * @consumed: pointer to a gboolean that gets set if the message was consumed by this function
163
 
 * @error: error information, if any
164
 
 *
165
 
 * This function posts a message to the log transport, performing buffering
166
 
 * of partially sent data if needed. The return value indicates whether we
167
 
 * successfully sent this message, or if it should be resent by the caller.
168
 
 **/
169
 
static LogProtoStatus
170
 
log_proto_text_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
171
 
{
172
 
  LogProtoTextClient *self = (LogProtoTextClient *) s;
173
 
  gint rc;
174
 
 
175
 
  /* NOTE: the client does not support charset conversion for now */
176
 
  g_assert(self->super.convert == (GIConv) -1);
177
 
 
178
 
  /* try to flush already buffered data */
179
 
  *consumed = FALSE;
180
 
  rc = log_proto_text_client_flush(s);
181
 
  if (rc == LPS_ERROR)
182
 
    {
183
 
      /* log_proto_flush() already logs in the case of an error */
184
 
      return rc;
185
 
    }
186
 
 
187
 
  if (self->partial)
188
 
    {
189
 
      /* NOTE: the partial buffer has not been emptied yet even with the
190
 
       * flush above, we shouldn't attempt to write again.
191
 
       *
192
 
       * Otherwise: with the framed protocol this could case the frame
193
 
       * header to be split, and interleaved with message payload, as in:
194
 
       *
195
 
       *     First bytes of frame header || payload || tail of frame header.
196
 
       *
197
 
       * This obviously would cause the framing to break. Also libssl
198
 
       * returns an error in this case, which is how this was discovered.
199
 
       */
200
 
      return rc;
201
 
    }
202
 
 
203
 
  *consumed = TRUE;
204
 
  return log_proto_text_client_submit_write(s, msg, msg_len, (GDestroyNotify) g_free, -1);
205
 
}
206
 
 
207
 
LogProto *
208
 
log_proto_text_client_new(LogTransport *transport)
209
 
{
210
 
  LogProtoTextClient *self = g_new0(LogProtoTextClient, 1);
211
 
 
212
 
  self->super.prepare = log_proto_text_client_prepare;
213
 
  self->super.flush = log_proto_text_client_flush;
214
 
  self->super.post = log_proto_text_client_post;
215
 
  self->super.transport = transport;
216
 
  self->super.convert = (GIConv) -1;
217
 
  self->next_state = -1;
218
 
  return &self->super;
219
 
}
220
 
 
221
 
typedef struct _LogProtoFileWriter
222
 
{
223
 
  LogProto super;
224
 
  guchar *partial;
225
 
  gsize partial_len, partial_pos;
226
 
  gint buf_size;
227
 
  gint buf_count;
228
 
  gint fd;
229
 
  gint sum_len;
230
 
  gboolean fsync;
231
 
  struct iovec buffer[0];
232
 
} LogProtoFileWriter;
233
 
 
234
 
/*
235
 
 * log_proto_file_writer_flush:
236
 
 *
237
 
 * this function flushes the file output buffer
238
 
 * it is called either form log_proto_file_writer_post (normal mode: the buffer is full)
239
 
 * or from log_proto_flush (foced flush: flush time, exit, etc)
240
 
 *
241
 
 */
242
 
static LogProtoStatus
243
 
log_proto_file_writer_flush(LogProto *s)
244
 
{
245
 
  LogProtoFileWriter *self = (LogProtoFileWriter *)s;
246
 
  gint rc, i, i0, sum, ofs, pos;
247
 
 
248
 
  /* we might be called from log_writer_deinit() without having a buffer at all */
249
 
 
250
 
  if (self->buf_count == 0)
251
 
    return LPS_SUCCESS;
252
 
 
253
 
  /* lseek() is used instead of O_APPEND, as on NFS  O_APPEND performs
254
 
   * poorly, as reported on the mailing list 2008/05/29 */
255
 
 
256
 
  lseek(self->fd, 0, SEEK_END);
257
 
  rc = writev(self->fd, self->buffer, self->buf_count);
258
 
  if (rc > 0 && self->fsync)
259
 
    fsync(self->fd);
260
 
 
261
 
  if (rc < 0)
262
 
    {
263
 
      if (errno != EAGAIN && errno != EINTR)
264
 
        {
265
 
          msg_error("I/O error occurred while writing",
266
 
                    evt_tag_int("fd", self->super.transport->fd),
267
 
                    evt_tag_errno(EVT_TAG_OSERROR, errno),
268
 
                    NULL);
269
 
          return LPS_ERROR;
270
 
        }
271
 
 
272
 
      return LPS_SUCCESS;
273
 
    }
274
 
  else if (rc != self->sum_len)
275
 
    {
276
 
      /* partial success: not everything has been written out */
277
 
      /* look for the first chunk that has been cut */
278
 
      sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */
279
 
      i = 0;
280
 
      while (rc > sum)
281
 
        sum += self->buffer[++i].iov_len;
282
 
      self->partial_len = sum - rc; /* this is the length of the first non-written chunk */
283
 
      i0 = i;
284
 
      ++i;
285
 
      /* add the lengths of the following messages */
286
 
      while (i < self->buf_count)
287
 
        self->partial_len += self->buffer[i++].iov_len;
288
 
      /* allocate and copy the remaning data */
289
 
      self->partial = (guchar *)g_malloc(self->partial_len);
290
 
      ofs = sum - rc; /* the length of the remaning (not processed) chunk in the first message */
291
 
      pos = self->buffer[i0].iov_len - ofs;
292
 
      memcpy(self->partial, self->buffer[i0].iov_base + pos, ofs);
293
 
      i = i0 + 1;
294
 
      while (i < self->buf_count)
295
 
        {
296
 
          memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len);
297
 
          ofs += self->buffer[i].iov_len;
298
 
          ++i;
299
 
        }
300
 
      self->partial_pos = 0;
301
 
    }
302
 
 
303
 
  /* free the previous message strings (the remaning part has been copied to the partial buffer) */
304
 
  for (i = 0; i < self->buf_count; ++i)
305
 
    g_free(self->buffer[i].iov_base);
306
 
  self->buf_count = 0;
307
 
  self->sum_len = 0;
308
 
 
309
 
  return LPS_SUCCESS;
310
 
}
311
 
 
312
 
/*
313
 
 * log_proto_file_writer_post:
314
 
 * @msg: formatted log message to send (this might be consumed by this function)
315
 
 * @msg_len: length of @msg
316
 
 * @consumed: pointer to a gboolean that gets set if the message was consumed by this function
317
 
 * @error: error information, if any
318
 
 *
319
 
 * This function posts a message to the log transport, performing buffering
320
 
 * of partially sent data if needed. The return value indicates whether we
321
 
 * successfully sent this message, or if it should be resent by the caller.
322
 
 **/
323
 
static LogProtoStatus
324
 
log_proto_file_writer_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
325
 
{
326
 
  LogProtoFileWriter *self = (LogProtoFileWriter *)s;
327
 
  gint rc;
328
 
 
329
 
  if (self->buf_count >= self->buf_size)
330
 
    {
331
 
      rc = log_proto_file_writer_flush(s);
332
 
      if (rc != LPS_SUCCESS || self->buf_count >= self->buf_size)
333
 
        {
334
 
          /* don't consume a new message if flush failed, or even after the flush we don't have any free slots */
335
 
          return rc;
336
 
        }
337
 
    }
338
 
 
339
 
  *consumed = FALSE;
340
 
  if (self->partial)
341
 
    {
342
 
      /* there is still some data from the previous file writing process */
343
 
      gint len = self->partial_len - self->partial_pos;
344
 
 
345
 
      rc = write(self->fd, self->partial + self->partial_pos, len);
346
 
      if (rc > 0 && self->fsync)
347
 
        fsync(self->fd);
348
 
      if (rc < 0)
349
 
        {
350
 
          goto write_error;
351
 
        }
352
 
      else if (rc != len)
353
 
        {
354
 
          self->partial_pos += rc;
355
 
          return LPS_SUCCESS;
356
 
        }
357
 
      else
358
 
        {
359
 
          g_free(self->partial);
360
 
          self->partial = NULL;
361
 
          /* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
362
 
          return LPS_SUCCESS;
363
 
        }
364
 
    }
365
 
 
366
 
  /* register the new message */
367
 
  self->buffer[self->buf_count].iov_base = (void *) msg;
368
 
  self->buffer[self->buf_count].iov_len = msg_len;
369
 
  ++self->buf_count;
370
 
  self->sum_len += msg_len;
371
 
  *consumed = TRUE;
372
 
 
373
 
  if (self->buf_count == self->buf_size)
374
 
    {
375
 
      /* we have reached the max buffer size -> we need to write the messages */
376
 
      return log_proto_file_writer_flush(s);
377
 
    }
378
 
 
379
 
  return LPS_SUCCESS;
380
 
 
381
 
write_error:
382
 
  if (errno != EAGAIN && errno != EINTR)
383
 
    {
384
 
      msg_error("I/O error occurred while writing",
385
 
                evt_tag_int("fd", self->super.transport->fd),
386
 
                evt_tag_errno(EVT_TAG_OSERROR, errno),
387
 
                NULL);
388
 
      return LPS_ERROR;
389
 
    }
390
 
 
391
 
  return LPS_SUCCESS;
392
 
}
393
 
 
394
 
static gboolean
395
 
log_proto_file_writer_prepare(LogProto *s, gint *fd, GIOCondition *cond)
396
 
{
397
 
  LogProtoFileWriter *self = (LogProtoFileWriter *) s;
398
 
 
399
 
  *fd = self->super.transport->fd;
400
 
  *cond = self->super.transport->cond;
401
 
 
402
 
  /* if there's no pending I/O in the transport layer, then we want to do a write */
403
 
  if (*cond == 0)
404
 
    *cond = G_IO_OUT;
405
 
  return self->buf_count > 0 || self->partial;
406
 
}
407
 
 
408
 
LogProto *
409
 
log_proto_file_writer_new(LogTransport *transport, gint flush_lines, gboolean fsync)
410
 
{
411
 
  if (flush_lines == 0)
412
 
    /* the flush-lines option has not been specified, use a default value */
413
 
    flush_lines = 1;
414
 
#ifdef IOV_MAX
415
 
  if (flush_lines > IOV_MAX)
416
 
    /* limit the flush_lines according to the current platform */
417
 
    flush_lines = IOV_MAX;
418
 
#endif
419
 
 
420
 
  /* allocate the structure with the proper number of items at the end */
421
 
  LogProtoFileWriter *self = (LogProtoFileWriter *)g_malloc0(sizeof(LogProtoFileWriter) + sizeof(struct iovec)*flush_lines);
422
 
 
423
 
  self->fd = transport->fd;
424
 
  self->buf_size = flush_lines;
425
 
  self->fsync = fsync;
426
 
  self->super.prepare = log_proto_file_writer_prepare;
427
 
  self->super.post = log_proto_file_writer_post;
428
 
  self->super.flush = log_proto_file_writer_flush;
429
 
  self->super.transport = transport;
430
 
  self->super.convert = (GIConv) -1;
431
 
  return &self->super;
432
 
}
433
 
 
434
 
 
435
 
 
436
 
typedef struct _LogProtoBufferedServerState
437
 
{
438
 
  /* NOTE: that if you add/remove structure members you have to update
439
 
   * the byte order swap code in LogProtoFileReader for mulit-byte
440
 
   * members. */
441
 
 
442
 
  guint8 version;
443
 
 
444
 
  /* this indicates that the members in the struct are stored in
445
 
   * big-endian byte order. if the byte ordering of the struct doesn't
446
 
   * match the current CPU byte ordering, then the members are
447
 
   * byte-swapped when the state is loaded.
448
 
   */
449
 
  guint8 big_endian:1;
450
 
  guint8 raw_buffer_leftover_size;
451
 
  guint8 __padding1[1];
452
 
  guint32 buffer_pos;
453
 
  guint32 pending_buffer_end;
454
 
  guint32 buffer_size;
455
 
  guint32 buffer_cached_eol;
456
 
  guint32 pending_buffer_pos;
457
 
 
458
 
  /* the stream position where we converted out current buffer from (offset in file) */
459
 
  gint64 raw_stream_pos;
460
 
  gint64 pending_raw_stream_pos;
461
 
  /* the size of raw data (measured in bytes) that got converted from raw_stream_pos into our buffer */
462
 
  gint32 raw_buffer_size;
463
 
  gint32 pending_raw_buffer_size;
464
 
  guchar raw_buffer_leftover[8];
465
 
 
466
 
  gint64 file_size;
467
 
  gint64 file_inode;
468
 
} LogProtoBufferedServerState;
469
 
 
470
 
typedef struct _LogProtoBufferedServer LogProtoBufferedServer;
471
 
struct _LogProtoBufferedServer
472
 
{
473
 
  LogProto super;
474
 
  gboolean (*fetch_from_buf)(LogProtoBufferedServer *self, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest);
475
 
  gint (*read_data)(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa);
476
 
 
477
 
  LogProtoBufferedServerState *state1;
478
 
  PersistState *persist_state;
479
 
  PersistEntryHandle persist_handle;
480
 
 
481
 
  gint max_buffer_size;
482
 
  gint init_buffer_size;
483
 
  guchar *buffer;
484
 
  GSockAddr *prev_saddr;
485
 
};
486
 
 
487
 
 
488
 
/*
489
 
 * log_proto_get_fixed_encoding_scale:
490
 
 *
491
 
 * This function returns the number of bytes of a single character in the
492
 
 * encoding specified by the @encoding parameter, provided it is listed in
493
 
 * the limited set hard-wired in the fixed_encodings array above.
494
 
 *
495
 
 * syslog-ng sometimes needs to calculate the size of the original, raw data
496
 
 * that relates to its already utf8 converted input buffer.  For that the
497
 
 * slow solution is to actually perform the utf8 -> raw conversion, however
498
 
 * since we don't really need the actual conversion, just the size of the
499
 
 * data in bytes we can be faster than that by multiplying the number of
500
 
 * input characters with the size of the character in the known
501
 
 * fixed-length-encodings in the list above.
502
 
 *
503
 
 * This function returns 0 if the encoding is not known, in which case the
504
 
 * slow path is to be executed.
505
 
 */
506
 
gint
507
 
log_proto_get_char_size_for_fixed_encoding(const gchar *encoding)
508
 
{
509
 
  static struct
510
 
  {
511
 
    const gchar *prefix;
512
 
    gint scale;
513
 
  } fixed_encodings[] = {
514
 
    { "ascii", 1 },
515
 
    { "us-ascii", 1 },
516
 
    { "iso-8859", 1 },
517
 
    { "iso8859", 1 },
518
 
    { "latin", 1 },
519
 
    { "ucs2", 2 },
520
 
    { "ucs-2", 2 },
521
 
    { "ucs4", 4 },
522
 
    { "ucs-4", 4 },
523
 
    { "koi", 1 },
524
 
    { "unicode", 2 },
525
 
    { "windows", 1 },
526
 
    { "wchar_t", sizeof(wchar_t) },
527
 
    { NULL, 0 }
528
 
  };
529
 
  gint scale = 0;
530
 
  gint i;
531
 
 
532
 
  for (i = 0; fixed_encodings[i].prefix; i++)
533
 
   {
534
 
     if (strncasecmp(encoding, fixed_encodings[i].prefix, strlen(fixed_encodings[i].prefix)) == 0)
535
 
       {
536
 
         scale = fixed_encodings[i].scale;
537
 
         break;
538
 
       }
539
 
   }
540
 
  return scale;
541
 
}
542
 
 
543
 
static LogProtoBufferedServerState *
544
 
log_proto_buffered_server_get_state(LogProtoBufferedServer *self)
545
 
{
546
 
  if (self->persist_state)
547
 
    {
548
 
      g_assert(self->persist_handle != 0);
549
 
      return persist_state_map_entry(self->persist_state, self->persist_handle);
550
 
    }
551
 
  if (G_UNLIKELY(!self->state1))
552
 
    {
553
 
      self->state1 = g_new0(LogProtoBufferedServerState, 1);
554
 
    }
555
 
  return self->state1;
556
 
}
557
 
 
558
 
static void
559
 
log_proto_buffered_server_put_state(LogProtoBufferedServer *self)
560
 
{
561
 
  if (self->persist_state && self->persist_handle)
562
 
    persist_state_unmap_entry(self->persist_state, self->persist_handle);
563
 
}
564
 
 
565
 
static gboolean
566
 
log_proto_buffered_server_convert_from_raw(LogProtoBufferedServer *self, const guchar *raw_buffer, gsize raw_buffer_len)
567
 
{
568
 
  /* some data was read */
569
 
  gsize avail_in = raw_buffer_len;
570
 
  gsize avail_out;
571
 
  gchar *out;
572
 
  gint  ret = -1;
573
 
  gboolean success = FALSE;
574
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
575
 
 
576
 
  do
577
 
    {
578
 
      avail_out = state->buffer_size - state->pending_buffer_end;
579
 
      out = (gchar *) self->buffer + state->pending_buffer_end;
580
 
 
581
 
      ret = g_iconv(self->super.convert, (gchar **) &raw_buffer, &avail_in, (gchar **) &out, &avail_out);
582
 
      if (ret == (gsize) -1)
583
 
        {
584
 
          switch (errno)
585
 
            {
586
 
            case EINVAL:
587
 
              if ((self->super.flags & LPBS_RECORD) == 0)
588
 
                {
589
 
                  /* Incomplete text, do not report an error, rather try to read again */
590
 
                  state->pending_buffer_end = state->buffer_size - avail_out;
591
 
 
592
 
                  if (avail_in > 0)
593
 
                    {
594
 
                      if (avail_in > sizeof(state->raw_buffer_leftover))
595
 
                        {
596
 
                          msg_error("Invalid byte sequence, the remaining raw buffer is larger than the supported leftover size",
597
 
                                    evt_tag_str("encoding", self->super.encoding),
598
 
                                    evt_tag_int("avail_in", avail_in),
599
 
                                    evt_tag_int("leftover_size", sizeof(state->raw_buffer_leftover)),
600
 
                                    NULL);
601
 
                          goto error;
602
 
                        }
603
 
                      memcpy(state->raw_buffer_leftover, raw_buffer, avail_in);
604
 
                      state->raw_buffer_leftover_size = avail_in;
605
 
                      state->raw_buffer_size -= avail_in;
606
 
                      msg_trace("Leftover characters remained after conversion, delaying message until another chunk arrives",
607
 
                                evt_tag_str("encoding", self->super.encoding),
608
 
                                evt_tag_int("avail_in", avail_in),
609
 
                                NULL);
610
 
                      goto success;
611
 
                    }
612
 
                }
613
 
              else
614
 
                {
615
 
                  msg_error("Byte sequence too short, cannot convert an individual frame in its entirety",
616
 
                            evt_tag_str("encoding", self->super.encoding),
617
 
                            evt_tag_int("avail_in", avail_in),
618
 
                            NULL);
619
 
                  goto error;
620
 
                }
621
 
              break;
622
 
            case E2BIG:
623
 
              state->pending_buffer_end = state->buffer_size - avail_out;
624
 
              /* extend the buffer */
625
 
 
626
 
              if (state->buffer_size < self->max_buffer_size)
627
 
                {
628
 
                  state->buffer_size *= 2;
629
 
                  if (state->buffer_size > self->max_buffer_size)
630
 
                    state->buffer_size = self->max_buffer_size;
631
 
 
632
 
                  self->buffer = g_realloc(self->buffer, state->buffer_size);
633
 
 
634
 
                  /* recalculate the out pointer, and add what we have now */
635
 
                  ret = -1;
636
 
                }
637
 
              else
638
 
                {
639
 
                  msg_error("Incoming byte stream requires a too large conversion buffer, probably invalid character sequence",
640
 
                            evt_tag_str("encoding", self->super.encoding),
641
 
                            evt_tag_printf("buffer", "%.*s", (gint) state->pending_buffer_end, self->buffer),
642
 
                            NULL);
643
 
                  goto error;
644
 
                }
645
 
              break;
646
 
            case EILSEQ:
647
 
            default:
648
 
              msg_notice("Invalid byte sequence or other error while converting input, skipping character",
649
 
                         evt_tag_str("encoding", self->super.encoding),
650
 
                         evt_tag_printf("char", "0x%02x", *(guchar *) raw_buffer),
651
 
                         NULL);
652
 
              goto error;
653
 
            }
654
 
        }
655
 
      else
656
 
        {
657
 
          state->pending_buffer_end = state->buffer_size - avail_out;
658
 
        }
659
 
    }
660
 
  while (avail_in > 0);
661
 
 
662
 
 success:
663
 
  success = TRUE;
664
 
 error:
665
 
  log_proto_buffered_server_put_state(self);
666
 
  return success;
667
 
}
668
 
 
669
 
static void
670
 
log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntryHandle handle, const gchar *persist_name)
671
 
{
672
 
  struct stat st;
673
 
  gint64 ofs = 0;
674
 
  LogProtoBufferedServerState *state;
675
 
  gint fd;
676
 
 
677
 
  fd = self->super.transport->fd;
678
 
  self->persist_handle = handle;
679
 
 
680
 
  if (fstat(fd, &st) < 0)
681
 
    return;
682
 
 
683
 
  state = log_proto_buffered_server_get_state(self);
684
 
 
685
 
  if (!self->buffer)
686
 
    {
687
 
      self->buffer = g_malloc(state->buffer_size);
688
 
    }
689
 
  state->pending_buffer_end = 0;
690
 
 
691
 
  if (state->file_inode &&
692
 
      state->file_inode == st.st_ino &&
693
 
      state->file_size <= st.st_size &&
694
 
      state->raw_stream_pos <= st.st_size)
695
 
    {
696
 
      ofs = state->raw_stream_pos;
697
 
 
698
 
      lseek(fd, ofs, SEEK_SET);
699
 
    }
700
 
  else
701
 
    {
702
 
      if (state->file_inode)
703
 
        {
704
 
          /* the stored state does not match the current file */
705
 
          msg_notice("The current log file has a mismatching size/inode information, restarting from the beginning",
706
 
                     evt_tag_str("state", persist_name),
707
 
                     evt_tag_int("stored_inode", state->file_inode),
708
 
                     evt_tag_int("cur_file_inode", st.st_ino),
709
 
                     evt_tag_int("stored_size", state->file_size),
710
 
                     evt_tag_int("cur_file_size", st.st_size),
711
 
                     evt_tag_int("raw_stream_pos", state->raw_stream_pos),
712
 
                     NULL);
713
 
        }
714
 
      goto error;
715
 
    }
716
 
  if (state->raw_buffer_size)
717
 
    {
718
 
      gssize rc;
719
 
      guchar *raw_buffer;
720
 
 
721
 
      if (!self->super.encoding)
722
 
        {
723
 
          /* no conversion, we read directly into our buffer */
724
 
          if (state->raw_buffer_size > state->buffer_size)
725
 
            {
726
 
              msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than buffer_size and no encoding is set, restarting from the beginning",
727
 
                         evt_tag_str("state", persist_name),
728
 
                         evt_tag_int("raw_buffer_size", state->raw_buffer_size),
729
 
                         evt_tag_int("buffer_size", state->buffer_size),
730
 
                         evt_tag_int("init_buffer_size", self->init_buffer_size),
731
 
                         NULL);
732
 
              goto error;
733
 
            }
734
 
          raw_buffer = self->buffer;
735
 
        }
736
 
      else
737
 
        {
738
 
          if (state->raw_buffer_size > self->max_buffer_size)
739
 
            {
740
 
              msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than max_buffer_size, restarting from the beginning",
741
 
                         evt_tag_str("state", persist_name),
742
 
                         evt_tag_int("raw_buffer_size", state->raw_buffer_size),
743
 
                         evt_tag_int("init_buffer_size", self->init_buffer_size),
744
 
                         evt_tag_int("max_buffer_size", self->max_buffer_size),
745
 
                         NULL);
746
 
              goto error;
747
 
            }
748
 
          raw_buffer = g_alloca(state->raw_buffer_size);
749
 
        }
750
 
 
751
 
      rc = log_transport_read(self->super.transport, raw_buffer, state->raw_buffer_size, NULL);
752
 
      if (rc != state->raw_buffer_size)
753
 
        {
754
 
          msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning",
755
 
                     evt_tag_str("state", persist_name),
756
 
                     NULL);
757
 
          goto error;
758
 
        }
759
 
 
760
 
      state->pending_buffer_end = 0;
761
 
      if (self->super.encoding)
762
 
        {
763
 
          if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
764
 
            {
765
 
              msg_notice("Error re-converting buffer contents of the file to be continued, restarting from the beginning",
766
 
                         evt_tag_str("state", persist_name),
767
 
                         NULL);
768
 
              goto error;
769
 
            }
770
 
        }
771
 
      else
772
 
        {
773
 
          state->pending_buffer_end += rc;
774
 
        }
775
 
 
776
 
      if (state->buffer_pos > state->pending_buffer_end ||
777
 
          state->buffer_cached_eol > state->pending_buffer_end)
778
 
        {
779
 
          msg_notice("Converted buffer contents is smaller than the current buffer position, starting from the beginning of the buffer, some lines may be duplicated",
780
 
                     evt_tag_str("state", persist_name),
781
 
                     NULL);
782
 
          state->buffer_pos = state->pending_buffer_pos = state->buffer_cached_eol = 0;
783
 
        }
784
 
    }
785
 
  else
786
 
    {
787
 
      /* although we do have buffer position information, but the
788
 
       * complete contents of the buffer is already processed, instead
789
 
       * of reading and then dropping it, position the file after the
790
 
       * indicated block */
791
 
 
792
 
      state->raw_stream_pos += state->raw_buffer_size;
793
 
      ofs = state->raw_stream_pos;
794
 
      state->raw_buffer_size = 0;
795
 
      state->buffer_pos = state->pending_buffer_end = 0;
796
 
 
797
 
      lseek(fd, state->raw_stream_pos, SEEK_SET);
798
 
    }
799
 
  goto exit;
800
 
 
801
 
 error:
802
 
  ofs = 0;
803
 
  state->buffer_pos = 0;
804
 
  state->pending_buffer_end = 0;
805
 
  state->buffer_cached_eol = 0;
806
 
  state->raw_stream_pos = 0;
807
 
  state->raw_buffer_size = 0;
808
 
  state->raw_buffer_leftover_size = 0;
809
 
  lseek(fd, 0, SEEK_SET);
810
 
 
811
 
 exit:
812
 
  state->file_inode = st.st_ino;
813
 
  state->file_size = st.st_size;
814
 
  state->raw_stream_pos = ofs;
815
 
  state->pending_buffer_pos = state->buffer_pos;
816
 
  state->pending_raw_stream_pos = state->raw_stream_pos;
817
 
  state->pending_raw_buffer_size = state->raw_buffer_size;
818
 
 
819
 
  state = NULL;
820
 
  log_proto_buffered_server_put_state(self);
821
 
}
822
 
 
823
 
static PersistEntryHandle
824
 
log_proto_buffered_server_alloc_state(LogProtoBufferedServer *self, PersistState *persist_state, const gchar *persist_name)
825
 
{
826
 
  LogProtoBufferedServerState *state;
827
 
  PersistEntryHandle handle;
828
 
 
829
 
  handle = persist_state_alloc_entry(persist_state, persist_name, sizeof(LogProtoBufferedServerState));
830
 
  if (handle)
831
 
    {
832
 
      state = persist_state_map_entry(persist_state, handle);
833
 
 
834
 
      state->version = 0;
835
 
      state->big_endian = (G_BYTE_ORDER == G_BIG_ENDIAN);
836
 
 
837
 
      persist_state_unmap_entry(persist_state, handle);
838
 
 
839
 
    }
840
 
  return handle;
841
 
}
842
 
 
843
 
static gboolean
844
 
log_proto_buffered_server_convert_state(LogProtoBufferedServer *self, guint8 persist_version, gpointer old_state, gsize old_state_size, LogProtoBufferedServerState *state)
845
 
{
846
 
  if (persist_version <= 2)
847
 
    {
848
 
      state->version = 0;
849
 
      state->file_inode = 0;
850
 
      state->raw_stream_pos = strtoll((gchar *) old_state, NULL, 10);
851
 
      state->file_size = 0;
852
 
 
853
 
      return TRUE;
854
 
    }
855
 
  else if (persist_version == 3)
856
 
    {
857
 
      SerializeArchive *archive;
858
 
      guint32 read_length;
859
 
      gint64 cur_size;
860
 
      gint64 cur_inode;
861
 
      gint64 cur_pos;
862
 
      guint16 version;
863
 
      gchar *buffer;
864
 
      gsize buffer_len;
865
 
 
866
 
      cur_inode = -1;
867
 
      cur_pos = 0;
868
 
      cur_size = 0;
869
 
      archive = serialize_buffer_archive_new(old_state, old_state_size);
870
 
 
871
 
      /* NOTE: the v23 conversion code adds an extra length field which we
872
 
       * need to read out. */
873
 
      g_assert(serialize_read_uint32(archive, &read_length) && read_length == old_state_size - sizeof(read_length));
874
 
 
875
 
      /* original v3 format starts here */
876
 
      if (!serialize_read_uint16(archive, &version) || version != 0)
877
 
        {
878
 
          msg_error("Internal error restoring log reader state, stored data has incorrect version",
879
 
                    evt_tag_int("version", version));
880
 
          goto error_converting_v3;
881
 
        }
882
 
 
883
 
      if (!serialize_read_uint64(archive, (guint64 *) &cur_pos) ||
884
 
          !serialize_read_uint64(archive, (guint64 *) &cur_inode) ||
885
 
          !serialize_read_uint64(archive, (guint64 *) &cur_size))
886
 
        {
887
 
          msg_error("Internal error restoring information about the current file position, restarting from the beginning",
888
 
                    NULL);
889
 
          goto error_converting_v3;
890
 
        }
891
 
 
892
 
      if (!serialize_read_uint16(archive, &version) || version != 0)
893
 
        {
894
 
          msg_error("Internal error, protocol state has incorrect version",
895
 
                    evt_tag_int("version", version),
896
 
                    NULL);
897
 
          goto error_converting_v3;
898
 
        }
899
 
 
900
 
      if (!serialize_read_cstring(archive, &buffer, &buffer_len))
901
 
        {
902
 
          msg_error("Internal error, error reading buffer contents",
903
 
                    evt_tag_int("version", version),
904
 
                    NULL);
905
 
          goto error_converting_v3;
906
 
        }
907
 
 
908
 
      if (!self->buffer || state->buffer_size < buffer_len)
909
 
        {
910
 
          gsize buffer_size = MAX(self->init_buffer_size, buffer_len);
911
 
          self->buffer = g_realloc(self->buffer, buffer_size);
912
 
        }
913
 
      serialize_archive_free(archive);
914
 
 
915
 
      memcpy(self->buffer, buffer, buffer_len);
916
 
      state->buffer_pos = 0;
917
 
      state->pending_buffer_end = buffer_len;
918
 
      g_free(buffer);
919
 
 
920
 
      state->version = 0;
921
 
      state->file_inode = cur_inode;
922
 
      state->raw_stream_pos = cur_pos;
923
 
      state->file_size = cur_size;
924
 
      return TRUE;
925
 
    error_converting_v3:
926
 
      serialize_archive_free(archive);
927
 
    }
928
 
  return FALSE;
929
 
}
930
 
 
931
 
gboolean
932
 
log_proto_buffered_server_restart_with_state(LogProto *s, PersistState *persist_state, const gchar *persist_name)
933
 
{
934
 
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
935
 
  guint8 persist_version;
936
 
  PersistEntryHandle old_state_handle;
937
 
  gpointer old_state;
938
 
  gsize old_state_size;
939
 
  PersistEntryHandle new_state_handle = 0;
940
 
  gpointer new_state = NULL;
941
 
  gboolean success;
942
 
 
943
 
  self->persist_state = persist_state;
944
 
  old_state_handle = persist_state_lookup_entry(persist_state, persist_name, &old_state_size, &persist_version);
945
 
  if (!old_state_handle)
946
 
    {
947
 
      new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
948
 
      if (!new_state_handle)
949
 
        goto fallback_non_persistent;
950
 
      log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
951
 
      return TRUE;
952
 
    }
953
 
  if (persist_version < 4)
954
 
    {
955
 
      new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
956
 
      if (!new_state_handle)
957
 
        goto fallback_non_persistent;
958
 
 
959
 
      old_state = persist_state_map_entry(persist_state, old_state_handle);
960
 
      new_state = persist_state_map_entry(persist_state, new_state_handle);
961
 
      success = log_proto_buffered_server_convert_state(self, persist_version, old_state, old_state_size, new_state);
962
 
      persist_state_unmap_entry(persist_state, old_state_handle);
963
 
      persist_state_unmap_entry(persist_state, new_state_handle);
964
 
 
965
 
      /* we're using the newly allocated state structure regardless if
966
 
       * conversion succeeded. If the conversion went wrong then
967
 
       * new_state is still in its freshly initialized form since the
968
 
       * convert function will not touch the state in the error
969
 
       * branches.
970
 
       */
971
 
 
972
 
      log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
973
 
      return success;
974
 
    }
975
 
  else if (persist_version == 4)
976
 
    {
977
 
      LogProtoBufferedServerState *state;
978
 
 
979
 
      old_state = persist_state_map_entry(persist_state, old_state_handle);
980
 
      state = old_state;
981
 
      if ((state->big_endian && G_BYTE_ORDER == G_LITTLE_ENDIAN) ||
982
 
          (!state->big_endian && G_BYTE_ORDER == G_BIG_ENDIAN))
983
 
        {
984
 
 
985
 
          /* byte order conversion in order to avoid the hassle with
986
 
             scattered byte order conversions in the code */
987
 
 
988
 
          state->big_endian = !state->big_endian;
989
 
          state->buffer_pos = GUINT32_SWAP_LE_BE(state->buffer_pos);
990
 
          state->pending_buffer_pos = GUINT32_SWAP_LE_BE(state->pending_buffer_pos);
991
 
          state->pending_buffer_end = GUINT32_SWAP_LE_BE(state->pending_buffer_end);
992
 
          state->buffer_size = GUINT32_SWAP_LE_BE(state->buffer_size);
993
 
          state->buffer_cached_eol = GUINT32_SWAP_LE_BE(state->buffer_cached_eol);
994
 
          state->raw_stream_pos = GUINT64_SWAP_LE_BE(state->raw_stream_pos);
995
 
          state->raw_buffer_size = GUINT32_SWAP_LE_BE(state->raw_buffer_size);
996
 
          state->pending_raw_stream_pos = GUINT64_SWAP_LE_BE(state->pending_raw_stream_pos);
997
 
          state->pending_raw_buffer_size = GUINT32_SWAP_LE_BE(state->pending_raw_buffer_size);
998
 
          state->file_size = GUINT64_SWAP_LE_BE(state->file_size);
999
 
          state->file_inode = GUINT64_SWAP_LE_BE(state->file_inode);
1000
 
        }
1001
 
 
1002
 
      if (state->version > 0)
1003
 
        {
1004
 
          msg_error("Internal error restoring log reader state, stored data is too new",
1005
 
                    evt_tag_int("version", state->version));
1006
 
          goto error;
1007
 
        }
1008
 
      persist_state_unmap_entry(persist_state, old_state_handle);
1009
 
      log_proto_buffered_server_apply_state(self, old_state_handle, persist_name);
1010
 
      return TRUE;
1011
 
    }
1012
 
  else
1013
 
    {
1014
 
      msg_error("Internal error restoring log reader state, stored data is too new",
1015
 
                evt_tag_int("version", persist_version));
1016
 
      goto error;
1017
 
    }
1018
 
  return TRUE;
1019
 
 fallback_non_persistent:
1020
 
  new_state = g_new0(LogProtoBufferedServerState, 1);
1021
 
 error:
1022
 
  if (!new_state)
1023
 
    {
1024
 
      new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
1025
 
      if (!new_state_handle)
1026
 
        goto fallback_non_persistent;
1027
 
      new_state = persist_state_map_entry(persist_state, new_state_handle);
1028
 
    }
1029
 
  if (new_state)
1030
 
    {
1031
 
      LogProtoBufferedServerState *state = new_state;
1032
 
 
1033
 
      /* error happened,  restart the file from the beginning */
1034
 
      state->raw_stream_pos = 0;
1035
 
      state->file_inode = 0;
1036
 
      state->file_size = 0;
1037
 
      if (new_state_handle)
1038
 
        log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
1039
 
      else
1040
 
        self->state1 = new_state;
1041
 
    }
1042
 
  if (new_state_handle)
1043
 
    {
1044
 
      persist_state_unmap_entry(persist_state, new_state_handle);
1045
 
    }
1046
 
  return FALSE;
1047
 
}
1048
 
 
1049
 
static gboolean
1050
 
log_proto_buffered_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1051
 
{
1052
 
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1053
 
 
1054
 
  *fd = self->super.transport->fd;
1055
 
  *cond = self->super.transport->cond;
1056
 
 
1057
 
  /* if there's no pending I/O in the transport layer, then we want to do a read */
1058
 
  if (*cond == 0)
1059
 
    *cond = G_IO_IN;
1060
 
 
1061
 
  return FALSE;
1062
 
}
1063
 
 
1064
 
 
1065
 
static gint
1066
 
log_proto_buffered_server_read_data(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa)
1067
 
{
1068
 
  return log_transport_read(self->super.transport, buf, len, sa);
1069
 
}
1070
 
 
1071
 
static LogProtoStatus
1072
 
log_proto_buffered_server_fetch_from_buf(LogProtoBufferedServer *self, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1073
 
{
1074
 
  gsize buffer_bytes;
1075
 
  const guchar *buffer_start;
1076
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1077
 
  gboolean success = FALSE;
1078
 
 
1079
 
  buffer_start = self->buffer + state->pending_buffer_pos;
1080
 
  buffer_bytes = state->pending_buffer_end - state->pending_buffer_pos;
1081
 
 
1082
 
  if (buffer_bytes == 0)
1083
 
    {
1084
 
      /* if buffer_bytes is zero bytes, it means that we completely
1085
 
       * processed our buffer without having a fraction of a line still
1086
 
       * there.  It is important to reset
1087
 
       * pending_buffer_pos/pending_buffer_end to zero as the caller assumes
1088
 
       * that if we return no message from the buffer, then buffer_pos is
1089
 
       * _zero_.
1090
 
       */
1091
 
 
1092
 
      if (G_UNLIKELY(self->super.flags & LPBS_POS_TRACKING))
1093
 
        {
1094
 
          state->pending_raw_stream_pos += state->pending_raw_buffer_size;
1095
 
          state->pending_raw_buffer_size = 0;
1096
 
        }
1097
 
      state->pending_buffer_pos = state->pending_buffer_end = 0;
1098
 
      goto exit;
1099
 
    }
1100
 
 
1101
 
  success = self->fetch_from_buf(self, buffer_start, buffer_bytes, msg, msg_len, flush_the_rest);
1102
 
 exit:
1103
 
  log_proto_buffered_server_put_state(self);
1104
 
  return success;
1105
 
}
1106
 
 
1107
 
/**
1108
 
 * Returns: TRUE to indicate success, FALSE otherwise. The returned
1109
 
 * msg can be NULL even if no failure occurred.
1110
 
 **/
1111
 
static LogProtoStatus
1112
 
log_proto_buffered_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
1113
 
{
1114
 
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1115
 
  gint rc;
1116
 
  guchar *raw_buffer = NULL;
1117
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1118
 
  LogProtoStatus result = self->super.status;
1119
 
 
1120
 
  if (G_UNLIKELY(!self->buffer))
1121
 
    {
1122
 
      self->buffer = g_malloc(self->init_buffer_size);
1123
 
      state->buffer_size = self->init_buffer_size;
1124
 
    }
1125
 
 
1126
 
  if (sa)
1127
 
    *sa = NULL;
1128
 
 
1129
 
  if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
1130
 
    {
1131
 
      if (sa && self->prev_saddr)
1132
 
        *sa = g_sockaddr_ref(self->prev_saddr);
1133
 
      goto exit;
1134
 
    }
1135
 
 
1136
 
  /* ok, no more messages in the buffer, read a chunk */
1137
 
  while (*may_read)
1138
 
    {
1139
 
      gint avail;
1140
 
 
1141
 
      if (self->super.flags & LPBS_NOMREAD)
1142
 
        *may_read = FALSE;
1143
 
 
1144
 
      /* read the next chunk to be processed */
1145
 
 
1146
 
      if (self->prev_saddr)
1147
 
        {
1148
 
          /* new chunk of data, potentially new sockaddr, forget the previous value */
1149
 
          g_sockaddr_unref(self->prev_saddr);
1150
 
          self->prev_saddr = NULL;
1151
 
        }
1152
 
 
1153
 
      if (!self->super.encoding)
1154
 
        {
1155
 
          /* no conversion, we read directly into our buffer */
1156
 
          raw_buffer = self->buffer + state->pending_buffer_end;
1157
 
          avail = state->buffer_size - state->pending_buffer_end;
1158
 
        }
1159
 
      else
1160
 
        {
1161
 
          /* if conversion is needed, we first read into an on-stack
1162
 
           * buffer, and then convert it into our internal buffer */
1163
 
 
1164
 
          raw_buffer = g_alloca(self->init_buffer_size + state->raw_buffer_leftover_size);
1165
 
          memcpy(raw_buffer, state->raw_buffer_leftover, state->raw_buffer_leftover_size);
1166
 
          avail = self->init_buffer_size;
1167
 
        }
1168
 
 
1169
 
      rc = self->read_data(self, raw_buffer + state->raw_buffer_leftover_size, avail, sa);
1170
 
      if (sa && *sa)
1171
 
        self->prev_saddr = *sa;
1172
 
      if (rc < 0)
1173
 
        {
1174
 
          if (errno == EAGAIN)
1175
 
            {
1176
 
              /* ok we don't have any more data to read, return to main poll loop */
1177
 
              break;
1178
 
            }
1179
 
          else
1180
 
            {
1181
 
              /* an error occurred while reading */
1182
 
              msg_error("I/O error occurred while reading",
1183
 
                        evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1184
 
                        evt_tag_errno(EVT_TAG_OSERROR, errno),
1185
 
                        NULL);
1186
 
 
1187
 
              /* we set self->status explicitly as we want to return
1188
 
               * LPS_ERROR on the _next_ invocation, not now */
1189
 
              self->super.status = LPS_ERROR;
1190
 
              if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
1191
 
                {
1192
 
                  if (sa && self->prev_saddr)
1193
 
                    *sa = g_sockaddr_ref(self->prev_saddr);
1194
 
                  goto exit;
1195
 
                }
1196
 
              result = self->super.status;
1197
 
              goto exit;
1198
 
            }
1199
 
        }
1200
 
      else if (rc == 0)
1201
 
        {
1202
 
          if ((self->super.flags & LPBS_IGNORE_EOF) == 0)
1203
 
            {
1204
 
              /* EOF read */
1205
 
              msg_verbose("EOF occurred while reading",
1206
 
                          evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1207
 
                          NULL);
1208
 
              if (state->raw_buffer_leftover_size > 0)
1209
 
                {
1210
 
                  msg_error("EOF read on a channel with leftovers from previous character conversion, dropping input",
1211
 
                            NULL);
1212
 
                  result = LPS_EOF;
1213
 
                  goto exit;
1214
 
                }
1215
 
              self->super.status = LPS_EOF;
1216
 
              if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
1217
 
                {
1218
 
                  if (sa && self->prev_saddr)
1219
 
                    *sa = g_sockaddr_ref(self->prev_saddr);
1220
 
                  goto exit;
1221
 
                }
1222
 
              result = self->super.status;
1223
 
              goto exit;
1224
 
            }
1225
 
          else
1226
 
            {
1227
 
              *msg = NULL;
1228
 
              *msg_len = 0;
1229
 
              goto exit;
1230
 
            }
1231
 
        }
1232
 
      else
1233
 
        {
1234
 
          state->pending_raw_buffer_size += rc;
1235
 
          rc += state->raw_buffer_leftover_size;
1236
 
          state->raw_buffer_leftover_size = 0;
1237
 
 
1238
 
          if (self->super.encoding)
1239
 
            {
1240
 
              if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
1241
 
                {
1242
 
                  result = LPS_ERROR;
1243
 
                  goto exit;
1244
 
                }
1245
 
            }
1246
 
          else
1247
 
            {
1248
 
              state->pending_buffer_end += rc;
1249
 
            }
1250
 
 
1251
 
          if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
1252
 
            {
1253
 
              if (sa && self->prev_saddr)
1254
 
                *sa = g_sockaddr_ref(self->prev_saddr);
1255
 
              goto exit;
1256
 
            }
1257
 
        }
1258
 
    }
1259
 
 exit:
1260
 
 
1261
 
  /* result contains our result, but once an error happens, the error condition remains persistent */
1262
 
  log_proto_buffered_server_put_state(self);
1263
 
  if (result != LPS_SUCCESS)
1264
 
    self->super.status = result;
1265
 
  return result;
1266
 
}
1267
 
 
1268
 
void
1269
 
log_proto_buffered_server_queued(LogProto *s)
1270
 
{
1271
 
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1272
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1273
 
 
1274
 
  /* NOTE: we modify the current file position _after_ updating
1275
 
     buffer_pos, since if we crash right here, at least we
1276
 
     won't lose data on the next restart, but rather we
1277
 
     duplicate some data */
1278
 
 
1279
 
  state->buffer_pos = state->pending_buffer_pos;
1280
 
  state->raw_stream_pos = state->pending_raw_stream_pos;
1281
 
  state->raw_buffer_size = state->pending_raw_buffer_size;
1282
 
  if (state->pending_buffer_pos == state->pending_buffer_end)
1283
 
    {
1284
 
      state->pending_buffer_end = 0;
1285
 
      state->buffer_pos = state->pending_buffer_pos = 0;
1286
 
    }
1287
 
  if (self->super.flags & LPBS_POS_TRACKING)
1288
 
    {
1289
 
      if (state->buffer_pos == state->pending_buffer_end)
1290
 
        {
1291
 
          state->raw_stream_pos += state->raw_buffer_size;
1292
 
          state->raw_buffer_size = 0;
1293
 
        }
1294
 
    }
1295
 
  msg_trace("Last message got confirmed",
1296
 
            evt_tag_int("raw_stream_pos", state->raw_stream_pos),
1297
 
            evt_tag_int("raw_buffer_len", state->raw_buffer_size),
1298
 
            evt_tag_int("buffer_pos", state->buffer_pos),
1299
 
            evt_tag_int("buffer_end", state->pending_buffer_end),
1300
 
            evt_tag_int("buffer_cached_eol", state->buffer_cached_eol),
1301
 
            NULL);
1302
 
  log_proto_buffered_server_put_state(self);
1303
 
}
1304
 
 
1305
 
void
1306
 
log_proto_buffered_server_free(LogProto *s)
1307
 
{
1308
 
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1309
 
 
1310
 
  g_sockaddr_unref(self->prev_saddr);
1311
 
 
1312
 
  g_free(self->buffer);
1313
 
  if (self->state1)
1314
 
    {
1315
 
      g_free(self->state1);
1316
 
    }
1317
 
}
1318
 
 
1319
 
void
1320
 
log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *transport, gint max_buffer_size, gint init_buffer_size, guint flags)
1321
 
{
1322
 
  self->super.prepare = log_proto_buffered_server_prepare;
1323
 
  self->super.fetch = log_proto_buffered_server_fetch;
1324
 
  self->super.queued = log_proto_buffered_server_queued;
1325
 
  self->super.free_fn = log_proto_buffered_server_free;
1326
 
  self->super.transport = transport;
1327
 
  self->super.convert = (GIConv) -1;
1328
 
  self->super.restart_with_state = log_proto_buffered_server_restart_with_state;
1329
 
  self->read_data = log_proto_buffered_server_read_data;
1330
 
 
1331
 
  self->super.flags = flags;
1332
 
 
1333
 
  self->init_buffer_size = init_buffer_size;
1334
 
  self->max_buffer_size = max_buffer_size;
1335
 
}
1336
 
 
1337
 
struct _LogProtoTextServer
1338
 
{
1339
 
  LogProtoBufferedServer super;
1340
 
  GIConv reverse_convert;
1341
 
  gchar *reverse_buffer;
1342
 
  gsize reverse_buffer_len;
1343
 
  gint convert_scale;
1344
 
};
1345
 
 
1346
 
/**
1347
 
 * This function is called in cases when several files are continously
1348
 
 * polled for changes.  Whenever the caller would like to switch to another
1349
 
 * file, it will call this function to check whether it should be allowed to do so.
1350
 
 *
1351
 
 * This function returns true if the current state of this LogProto would
1352
 
 * allow preemption, e.g.  the contents of the current buffer can be
1353
 
 * discarded.
1354
 
 **/
1355
 
gboolean
1356
 
log_proto_text_server_is_preemptable(LogProtoTextServer *self)
1357
 
{
1358
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1359
 
  gboolean preemptable;
1360
 
 
1361
 
  preemptable = (state->buffer_cached_eol == 0);
1362
 
  log_proto_buffered_server_put_state(&self->super);
1363
 
  return preemptable;
1364
 
}
1365
 
 
1366
 
static gboolean
1367
 
log_proto_text_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1368
 
{
1369
 
  LogProtoTextServer *self = (LogProtoTextServer *) s;
1370
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1371
 
  gboolean avail;
1372
 
 
1373
 
  if (log_proto_buffered_server_prepare(s, fd, cond))
1374
 
    {
1375
 
      log_proto_buffered_server_put_state(&self->super);
1376
 
      return TRUE;
1377
 
    }
1378
 
 
1379
 
  avail = (state->buffer_cached_eol != 0);
1380
 
  log_proto_buffered_server_put_state(&self->super);
1381
 
  return avail;
1382
 
}
1383
 
 
1384
 
/**
1385
 
 * Find the character terminating the buffer.
1386
 
 *
1387
 
 * NOTE: when looking for the end-of-message here, it either needs to be
1388
 
 * terminated via NUL or via NL, when terminating via NL we have to make
1389
 
 * sure that there's no NUL left in the message. This function iterates over
1390
 
 * the input data and returns a pointer to the first occurence of NL or NUL.
1391
 
 *
1392
 
 * It uses an algorithm similar to what there's in libc memchr/strchr.
1393
 
 *
1394
 
 * NOTE: find_eom is not static as it is used by a unit test program.
1395
 
 **/
1396
 
const guchar *
1397
 
find_eom(const guchar *s, gsize n)
1398
 
{
1399
 
  const guchar *char_ptr;
1400
 
  const gulong *longword_ptr;
1401
 
  gulong longword, magic_bits, charmask;
1402
 
  gchar c;
1403
 
 
1404
 
  c = '\n';
1405
 
 
1406
 
  /* align input to long boundary */
1407
 
  for (char_ptr = s; n > 0 && ((gulong) char_ptr & (sizeof(longword) - 1)) != 0; ++char_ptr, n--)
1408
 
    {
1409
 
      if (*char_ptr == c || *char_ptr == '\0')
1410
 
        return char_ptr;
1411
 
    }
1412
 
 
1413
 
  longword_ptr = (gulong *) char_ptr;
1414
 
 
1415
 
#if GLIB_SIZEOF_LONG == 8
1416
 
  magic_bits = 0x7efefefefefefeffL;
1417
 
#elif GLIB_SIZEOF_LONG == 4
1418
 
  magic_bits = 0x7efefeffL;
1419
 
#else
1420
 
  #error "unknown architecture"
1421
 
#endif
1422
 
  memset(&charmask, c, sizeof(charmask));
1423
 
 
1424
 
  while (n > sizeof(longword))
1425
 
    {
1426
 
      longword = *longword_ptr++;
1427
 
      if ((((longword + magic_bits) ^ ~longword) & ~magic_bits) != 0 ||
1428
 
          ((((longword ^ charmask) + magic_bits) ^ ~(longword ^ charmask)) & ~magic_bits) != 0)
1429
 
        {
1430
 
          gint i;
1431
 
 
1432
 
          char_ptr = (const guchar *) (longword_ptr - 1);
1433
 
 
1434
 
          for (i = 0; i < sizeof(longword); i++)
1435
 
            {
1436
 
              if (*char_ptr == c || *char_ptr == '\0')
1437
 
                return char_ptr;
1438
 
              char_ptr++;
1439
 
            }
1440
 
        }
1441
 
      n -= sizeof(longword);
1442
 
    }
1443
 
 
1444
 
  char_ptr = (const guchar *) longword_ptr;
1445
 
 
1446
 
  while (n-- > 0)
1447
 
    {
1448
 
      if (*char_ptr == c || *char_ptr == '\0')
1449
 
        return char_ptr;
1450
 
      ++char_ptr;
1451
 
    }
1452
 
 
1453
 
  return NULL;
1454
 
}
1455
 
 
1456
 
/*
1457
 
 * returns the number of bytes that represent the UTF8 encoding buffer
1458
 
 * in the original encoding that the user specified.
1459
 
 *
1460
 
 * NOTE: this is slow, but we only call this for the remainder of our
1461
 
 * buffer (e.g. the partial line at the end of our last chunk of read
1462
 
 * data). Also, this is only invoked if the file uses an encoding.
1463
 
 */
1464
 
static gsize
1465
 
log_proto_text_server_get_raw_size_of_buffer(LogProtoTextServer *self, const guchar *buffer, gsize buffer_len)
1466
 
{
1467
 
  gchar *out;
1468
 
  const guchar *in;
1469
 
  gsize avail_out, avail_in;
1470
 
  gint ret;
1471
 
 
1472
 
  if (self->reverse_convert == ((GIConv) -1) && !self->convert_scale)
1473
 
    {
1474
 
      /* try to speed up raw size calculation by recognizing the most
1475
 
       * prominent character encodings and in the case the encoding
1476
 
       * uses fixed size characters set that in self->convert_scale,
1477
 
       * which in turn will speed up the reversal of the UTF8 buffer
1478
 
       * size to raw buffer sizes.
1479
 
       */
1480
 
      self->convert_scale = log_proto_get_char_size_for_fixed_encoding(self->super.super.encoding);
1481
 
      if (self->convert_scale == 0)
1482
 
        {
1483
 
          /* this encoding is not known, do the conversion for real :( */
1484
 
          self->reverse_convert = g_iconv_open(self->super.super.encoding, "utf-8");
1485
 
        }
1486
 
    }
1487
 
 
1488
 
  if (self->convert_scale)
1489
 
    return g_utf8_strlen((gchar *) buffer, buffer_len) * self->convert_scale;
1490
 
 
1491
 
  if (self->reverse_buffer_len < buffer_len * 6)
1492
 
    {
1493
 
      /* we free and malloc, since we never need the data still in reverse buffer */
1494
 
      g_free(self->reverse_buffer);
1495
 
      self->reverse_buffer_len = buffer_len * 6;
1496
 
      self->reverse_buffer = g_malloc(buffer_len * 6);
1497
 
    }
1498
 
 
1499
 
  avail_out = self->reverse_buffer_len;
1500
 
  out = self->reverse_buffer;
1501
 
 
1502
 
  avail_in = buffer_len;
1503
 
  in = buffer;
1504
 
 
1505
 
  ret = g_iconv(self->reverse_convert, (gchar **) &in, &avail_in, &out, &avail_out);
1506
 
  if (ret == (gsize) -1)
1507
 
    {
1508
 
      /* oops, we cannot reverse that we ourselves converted to UTF-8,
1509
 
       * this is simply impossible, but never say never */
1510
 
      msg_error("Internal error, couldn't reverse the internal UTF8 string to the original encoding",
1511
 
                evt_tag_printf("buffer", "%.*s", (gint) buffer_len, buffer),
1512
 
                NULL);
1513
 
      return 0;
1514
 
    }
1515
 
  else
1516
 
    {
1517
 
      return self->reverse_buffer_len - avail_out;
1518
 
    }
1519
 
}
1520
 
 
1521
 
 
1522
 
/**
1523
 
 * log_proto_text_server_fetch_from_buf:
1524
 
 * @self: LogReader instance
1525
 
 * @saddr: socket address to be assigned to new messages (consumed!)
1526
 
 * @flush: whether to flush the input buffer
1527
 
 * @msg_counter: the number of messages processed in the current poll iteration
1528
 
 *
1529
 
 * Returns TRUE if a message was found in the buffer, FALSE if we need to read again.
1530
 
 **/
1531
 
static gboolean
1532
 
log_proto_text_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1533
 
{
1534
 
  LogProtoTextServer *self = (LogProtoTextServer *) s;
1535
 
  const guchar *eol;
1536
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1537
 
  gboolean result = FALSE;
1538
 
 
1539
 
  if (flush_the_rest)
1540
 
    {
1541
 
      /*
1542
 
       * we are set to packet terminating mode or the connection is to
1543
 
       * be teared down and we have partial data in our buffer.
1544
 
       */
1545
 
      *msg = buffer_start;
1546
 
      *msg_len = buffer_bytes;
1547
 
      state->pending_buffer_pos = state->pending_buffer_end;
1548
 
      goto success;
1549
 
    }
1550
 
 
1551
 
  if (state->buffer_cached_eol)
1552
 
    {
1553
 
      /* previous invocation was nice enough to save a cached EOL
1554
 
       * pointer, no need to look it up again */
1555
 
 
1556
 
      eol = self->super.buffer + state->buffer_cached_eol;
1557
 
      state->buffer_cached_eol = 0;
1558
 
    }
1559
 
  else
1560
 
    {
1561
 
      eol = find_eom(buffer_start, buffer_bytes);
1562
 
    }
1563
 
  if ((!eol && (buffer_bytes == state->buffer_size)))
1564
 
    {
1565
 
      /* our buffer is full and no EOL was found */
1566
 
      *msg_len = buffer_bytes;
1567
 
      state->pending_buffer_pos = state->pending_buffer_end;
1568
 
      *msg = buffer_start;
1569
 
      goto success;
1570
 
    }
1571
 
  else if (!eol)
1572
 
    {
1573
 
      gsize raw_split_size;
1574
 
 
1575
 
      /* buffer is not full, but no EOL is present, move partial line
1576
 
       * to the beginning of the buffer to make space for new data.
1577
 
       */
1578
 
 
1579
 
      memmove(self->super.buffer, buffer_start, buffer_bytes);
1580
 
      state->pending_buffer_pos = 0;
1581
 
      state->pending_buffer_end = buffer_bytes;
1582
 
 
1583
 
      if (G_UNLIKELY(self->super.super.flags & LPBS_POS_TRACKING))
1584
 
        {
1585
 
          /* NOTE: we modify the current file position _after_ updating
1586
 
             buffer_pos, since if we crash right here, at least we
1587
 
             won't lose data on the next restart, but rather we
1588
 
             duplicate some data */
1589
 
 
1590
 
          if (self->super.super.encoding)
1591
 
            raw_split_size = log_proto_text_server_get_raw_size_of_buffer(self, buffer_start, buffer_bytes);
1592
 
          else
1593
 
            raw_split_size = buffer_bytes;
1594
 
 
1595
 
          state->pending_raw_stream_pos += (gint64) (state->pending_raw_buffer_size - raw_split_size);
1596
 
          state->pending_raw_buffer_size = raw_split_size;
1597
 
 
1598
 
          msg_trace("Buffer split",
1599
 
                    evt_tag_int("raw_split_size", raw_split_size),
1600
 
                    evt_tag_int("buffer_bytes", buffer_bytes),
1601
 
                    NULL);
1602
 
        }
1603
 
      goto exit;
1604
 
    }
1605
 
  else
1606
 
    {
1607
 
      const guchar *msg_end = eol;
1608
 
 
1609
 
      /* eol points at the newline character. end points at the
1610
 
       * character terminating the line, which may be a carriage
1611
 
       * return preceeding the newline. */
1612
 
 
1613
 
      while ((msg_end > buffer_start) && (msg_end[-1] == '\r' || msg_end[-1] == '\n' || msg_end[-1] == 0))
1614
 
        msg_end--;
1615
 
 
1616
 
      *msg_len = msg_end - buffer_start;
1617
 
      *msg = buffer_start;
1618
 
      state->pending_buffer_pos = eol + 1 - self->super.buffer;
1619
 
 
1620
 
      if (state->pending_buffer_end != state->pending_buffer_pos)
1621
 
        {
1622
 
          const guchar *eom;
1623
 
          /* store the end of the next line, it indicates whether we need
1624
 
           * to read further data, or the buffer already contains a
1625
 
           * complete line */
1626
 
          eom = find_eom(self->super.buffer + state->pending_buffer_pos, state->pending_buffer_end - state->pending_buffer_pos);
1627
 
          if (eom)
1628
 
            state->buffer_cached_eol = eom - self->super.buffer;
1629
 
          else
1630
 
            state->buffer_cached_eol = 0;
1631
 
        }
1632
 
      else
1633
 
        {
1634
 
          state->pending_buffer_pos = state->pending_buffer_end;
1635
 
        }
1636
 
      goto success;
1637
 
    }
1638
 
 success:
1639
 
  result = TRUE;
1640
 
 exit:
1641
 
  log_proto_buffered_server_put_state(&self->super);
1642
 
  return result;
1643
 
}
1644
 
 
1645
 
static void
1646
 
log_proto_text_server_free(LogProto *s)
1647
 
{
1648
 
  LogProtoTextServer *self = (LogProtoTextServer *) s;
1649
 
  if (self->reverse_convert != (GIConv) -1)
1650
 
    g_iconv_close(self->reverse_convert);
1651
 
 
1652
 
  g_free(self->reverse_buffer);
1653
 
  log_proto_buffered_server_free(&self->super.super);
1654
 
}
1655
 
 
1656
 
static void
1657
 
log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, gint max_msg_size, guint flags)
1658
 
{
1659
 
  log_proto_buffered_server_init(&self->super, transport, max_msg_size * 6, max_msg_size, flags);
1660
 
  self->super.fetch_from_buf = log_proto_text_server_fetch_from_buf;
1661
 
  self->super.super.prepare = log_proto_text_server_prepare;
1662
 
  self->super.super.free_fn = log_proto_text_server_free;
1663
 
  self->reverse_convert = (GIConv) -1;
1664
 
}
1665
 
 
1666
 
LogProto *
1667
 
log_proto_text_server_new(LogTransport *transport, gint max_msg_size, guint flags)
1668
 
{
1669
 
  LogProtoTextServer *self = g_new0(LogProtoTextServer, 1);
1670
 
 
1671
 
  log_proto_text_server_init(self, transport, max_msg_size, flags);
1672
 
  return &self->super.super;
1673
 
}
1674
 
 
1675
 
/* proto that reads the stream in even sized chunks */
1676
 
typedef struct _LogProtoRecordServer LogProtoRecordServer;
1677
 
struct _LogProtoRecordServer
1678
 
{
1679
 
  LogProtoBufferedServer super;
1680
 
  gsize record_size;
1681
 
};
1682
 
 
1683
 
static gboolean
1684
 
log_proto_record_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1685
 
{
1686
 
  LogProtoRecordServer *self = (LogProtoRecordServer *) s;
1687
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(s);
1688
 
  const guchar *eol;
1689
 
 
1690
 
  if (!(self->super.super.flags & LPRS_BINARY))
1691
 
    {
1692
 
      eol = find_eom(buffer_start, buffer_bytes);
1693
 
      *msg_len = (eol ? eol - buffer_start : buffer_bytes);
1694
 
    }
1695
 
  else
1696
 
    {
1697
 
      *msg_len = buffer_bytes;
1698
 
    }
1699
 
  state->pending_buffer_pos = state->pending_buffer_end;
1700
 
  *msg = buffer_start;
1701
 
  log_proto_buffered_server_put_state(s);
1702
 
  return TRUE;
1703
 
}
1704
 
 
1705
 
static gint
1706
 
log_proto_record_server_read_data(LogProtoBufferedServer *s, guchar *buf, gsize len, GSockAddr **sa)
1707
 
{
1708
 
  LogProtoRecordServer *self = (LogProtoRecordServer *) s;
1709
 
  gint rc;
1710
 
 
1711
 
  g_assert(len <= self->record_size);
1712
 
  len = self->record_size;
1713
 
  rc = log_transport_read(self->super.super.transport, buf, len, sa);
1714
 
  if (rc > 0 && rc != self->record_size)
1715
 
    {
1716
 
      msg_error("Padding was set, and couldn't read enough bytes",
1717
 
                evt_tag_int(EVT_TAG_FD, self->super.super.transport->fd),
1718
 
                evt_tag_int("padding", self->record_size),
1719
 
                evt_tag_int("read", rc),
1720
 
                NULL);
1721
 
      errno = EIO;
1722
 
      return -1;
1723
 
    }
1724
 
  return rc;
1725
 
}
1726
 
 
1727
 
LogProto *
1728
 
log_proto_record_server_new(LogTransport *transport, gint record_size, guint flags)
1729
 
{
1730
 
  LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
1731
 
 
1732
 
  log_proto_buffered_server_init(&self->super, transport, record_size * 6, record_size, flags | LPBS_RECORD);
1733
 
  self->super.fetch_from_buf = log_proto_record_server_fetch_from_buf;
1734
 
  self->super.read_data = log_proto_record_server_read_data;
1735
 
  self->record_size = record_size;
1736
 
  return &self->super.super;
1737
 
}
1738
 
 
1739
 
/* proto that reads the stream in even sized chunks */
1740
 
typedef struct _LogProtoDGramServer LogProtoDGramServer;
1741
 
struct _LogProtoDGramServer
1742
 
{
1743
 
  LogProtoBufferedServer super;
1744
 
};
1745
 
 
1746
 
static gboolean
1747
 
log_proto_dgram_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1748
 
{
1749
 
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(s);
1750
 
 
1751
 
  /*
1752
 
   * we are set to packet terminating mode
1753
 
   */
1754
 
  *msg = buffer_start;
1755
 
  *msg_len = buffer_bytes;
1756
 
  state->pending_buffer_pos = state->pending_buffer_end;
1757
 
  log_proto_buffered_server_put_state(s);
1758
 
  return TRUE;
1759
 
}
1760
 
 
1761
 
LogProto *
1762
 
log_proto_dgram_server_new(LogTransport *transport, gint max_msg_size, guint flags)
1763
 
{
1764
 
  LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
1765
 
 
1766
 
  log_proto_buffered_server_init(&self->super, transport, max_msg_size * 6, max_msg_size, flags | LPBS_IGNORE_EOF | LPBS_RECORD);
1767
 
  self->super.fetch_from_buf = log_proto_dgram_server_fetch_from_buf;
1768
 
  return &self->super.super;
1769
 
}
1770
 
 
1771
 
#define LPFCS_FRAME_SEND    0
1772
 
#define LPFCS_MESSAGE_SEND  1
1773
 
 
1774
 
typedef struct _LogProtoFramedClient
1775
 
{
1776
 
  LogProtoTextClient super;
1777
 
  guchar frame_hdr_buf[9];
1778
 
} LogProtoFramedClient;
1779
 
 
1780
 
static LogProtoStatus
1781
 
log_proto_framed_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
1782
 
{
1783
 
  LogProtoFramedClient *self = (LogProtoFramedClient *) s;
1784
 
  gint frame_hdr_len;
1785
 
  gint rc;
1786
 
 
1787
 
  if (msg_len > 9999999)
1788
 
    {
1789
 
      static const guchar *warn_msg;
1790
 
      
1791
 
      if (warn_msg != msg)
1792
 
        {
1793
 
          msg_warning("Error, message length too large for framed protocol, truncated",
1794
 
                      evt_tag_int("length", msg_len),
1795
 
                      NULL);
1796
 
          warn_msg = msg;
1797
 
        }
1798
 
      msg_len = 9999999;
1799
 
    }
1800
 
 
1801
 
  rc = LPS_SUCCESS;
1802
 
  while (rc == LPS_SUCCESS && !(*consumed) && self->super.partial == NULL)
1803
 
    {
1804
 
      switch (self->super.state)
1805
 
        {
1806
 
        case LPFCS_FRAME_SEND:
1807
 
          frame_hdr_len = g_snprintf((gchar *) self->frame_hdr_buf, sizeof(self->frame_hdr_buf), "%" G_GSIZE_FORMAT" ", msg_len);
1808
 
          rc = log_proto_text_client_submit_write(s, self->frame_hdr_buf, frame_hdr_len, NULL, LPFCS_MESSAGE_SEND);
1809
 
          break;
1810
 
        case LPFCS_MESSAGE_SEND:
1811
 
          *consumed = TRUE;
1812
 
          rc = log_proto_text_client_submit_write(s, msg, msg_len, (GDestroyNotify) g_free, LPFCS_FRAME_SEND);
1813
 
          break;
1814
 
        default:
1815
 
          g_assert_not_reached();
1816
 
        }
1817
 
    }
1818
 
 
1819
 
  return rc;
1820
 
}
1821
 
 
1822
 
LogProto *
1823
 
log_proto_framed_client_new(LogTransport *transport)
1824
 
{
1825
 
  LogProtoFramedClient *self = g_new0(LogProtoFramedClient, 1);
1826
 
 
1827
 
  self->super.super.prepare = log_proto_text_client_prepare;
1828
 
  self->super.super.flush = log_proto_text_client_flush;
1829
 
  self->super.super.post = log_proto_framed_client_post;
1830
 
  self->super.super.transport = transport;
1831
 
  self->super.super.convert = (GIConv) -1;
1832
 
  self->super.state = LPFCS_FRAME_SEND;
1833
 
  return &self->super.super;  
1834
 
}
1835
 
 
1836
 
#define LPFSS_FRAME_READ    0
1837
 
#define LPFSS_MESSAGE_READ  1
1838
 
 
1839
 
#define LPFS_FRAME_BUFFER 10
1840
 
 
1841
 
typedef struct _LogProtoFramedServer
1842
 
{
1843
 
  LogProto super;
1844
 
  gint state;
1845
 
 
1846
 
  guchar *buffer;
1847
 
  gsize buffer_size, buffer_pos, buffer_end;
1848
 
  gsize frame_len;
1849
 
  gsize max_msg_size;
1850
 
  gboolean half_message_in_buffer;
1851
 
  GSockAddr *prev_saddr;
1852
 
  LogProtoStatus status;
1853
 
} LogProtoFramedServer;
1854
 
 
1855
 
static gboolean
1856
 
log_proto_framed_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1857
 
{
1858
 
  LogProtoFramedServer *self = (LogProtoFramedServer *) s;
1859
 
 
1860
 
  *fd = self->super.transport->fd;
1861
 
  *cond = self->super.transport->cond;
1862
 
 
1863
 
  /* there is a half message in our buffer so try to wait */
1864
 
  if (!self->half_message_in_buffer)
1865
 
    {
1866
 
      if (self->buffer_pos != self->buffer_end)
1867
 
        {
1868
 
          /* we have a full message in our buffer so parse it without reading new data from the transport layer */
1869
 
          return TRUE;
1870
 
        }
1871
 
    }
1872
 
 
1873
 
  /* if there's no pending I/O in the transport layer, then we want to do a read */
1874
 
  if (*cond == 0)
1875
 
    *cond = G_IO_IN;
1876
 
 
1877
 
  return FALSE;
1878
 
}
1879
 
 
1880
 
static LogProtoStatus
1881
 
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read)
1882
 
{
1883
 
  gint rc;
1884
 
 
1885
 
  if (self->buffer_pos == self->buffer_end)
1886
 
    self->buffer_pos = self->buffer_end = 0;
1887
 
 
1888
 
  if (self->buffer_size == self->buffer_end)
1889
 
    {
1890
 
      /* no more space in the buffer, we can't fetch further data. Move the
1891
 
       * things we already have to the beginning of the buffer to make
1892
 
       * space. */
1893
 
 
1894
 
      memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
1895
 
      self->buffer_end = self->buffer_end - self->buffer_pos;
1896
 
      self->buffer_pos = 0;
1897
 
    }
1898
 
 
1899
 
  if (!(*may_read))
1900
 
    return LPS_SUCCESS;
1901
 
 
1902
 
  rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end, NULL);
1903
 
      
1904
 
  if (rc < 0)
1905
 
    {
1906
 
      if (errno != EAGAIN)
1907
 
        {
1908
 
          msg_error("Error reading RFC5428 style framed data",
1909
 
                    evt_tag_int("fd", self->super.transport->fd),
1910
 
                    evt_tag_errno("error", errno),
1911
 
                    NULL);
1912
 
          return LPS_ERROR;
1913
 
        }
1914
 
      else
1915
 
        {
1916
 
          /* we need more data to parse this message but the data is not available yet */
1917
 
          self->half_message_in_buffer = TRUE;
1918
 
        }
1919
 
    }
1920
 
  else if (rc == 0)
1921
 
    {
1922
 
      msg_verbose("EOF occurred while reading",
1923
 
                  evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1924
 
                  NULL);
1925
 
      return LPS_EOF;
1926
 
    }
1927
 
  else 
1928
 
    {
1929
 
      self->buffer_end += rc;
1930
 
    }
1931
 
  return LPS_SUCCESS;
1932
 
  
1933
 
}
1934
 
 
1935
 
static gboolean
1936
 
log_proto_framed_server_extract_frame_length(LogProtoFramedServer *self, gboolean *need_more_data)
1937
 
{
1938
 
  gint i;
1939
 
 
1940
 
  *need_more_data = TRUE;
1941
 
  self->frame_len = 0;
1942
 
  for (i = self->buffer_pos; i < self->buffer_end; i++)
1943
 
    {
1944
 
      if (isdigit(self->buffer[i]) && (i - self->buffer_pos < 10))
1945
 
        {
1946
 
          self->frame_len = self->frame_len * 10 + (self->buffer[i] - '0');
1947
 
        }
1948
 
      else if (self->buffer[i] == ' ')
1949
 
        {
1950
 
          *need_more_data = FALSE;
1951
 
          self->buffer_pos = i + 1;
1952
 
          return TRUE;
1953
 
        }
1954
 
      else
1955
 
        {
1956
 
          msg_error("Invalid frame header", 
1957
 
                    evt_tag_printf("header", "%.*s", (gint) (i - self->buffer_pos), &self->buffer[self->buffer_pos]),
1958
 
                    NULL);
1959
 
          return FALSE;
1960
 
        }
1961
 
    }
1962
 
  /* couldn't extract frame header, no error but need more data */
1963
 
  return TRUE;
1964
 
}
1965
 
 
1966
 
static LogProtoStatus
1967
 
log_proto_framed_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
1968
 
{
1969
 
  LogProtoFramedServer *self = (LogProtoFramedServer *) s;
1970
 
  LogProtoStatus status;
1971
 
  gboolean try_read, need_more_data;
1972
 
 
1973
 
  if (sa)
1974
 
    *sa = NULL;
1975
 
  switch (self->state)
1976
 
    {
1977
 
    case LPFSS_FRAME_READ:
1978
 
 
1979
 
      try_read = TRUE;
1980
 
 
1981
 
    read_frame:
1982
 
      if (!log_proto_framed_server_extract_frame_length(self, &need_more_data))
1983
 
        {
1984
 
          /* invalid frame header */
1985
 
          return LPS_ERROR;
1986
 
        }
1987
 
      if (need_more_data && try_read)
1988
 
        {
1989
 
          status = log_proto_framed_server_fetch_data(self, may_read);
1990
 
          if (status != LPS_SUCCESS)
1991
 
            return status;
1992
 
          try_read = FALSE;
1993
 
          goto read_frame;
1994
 
        }
1995
 
 
1996
 
      if (!need_more_data)
1997
 
        {
1998
 
          self->state = LPFSS_MESSAGE_READ;
1999
 
          if (self->frame_len > self->max_msg_size)
2000
 
            {
2001
 
              msg_error("Incoming frame larger than log_msg_size()",
2002
 
                        evt_tag_int("log_msg_size", self->buffer_size - LPFS_FRAME_BUFFER),
2003
 
                        evt_tag_int("frame_length", self->frame_len),
2004
 
                        NULL);
2005
 
              return LPS_ERROR;
2006
 
            }
2007
 
          if (self->buffer_pos + self->frame_len > self->buffer_size)
2008
 
            {
2009
 
              /* message would be too large to fit into the buffer at
2010
 
               * buffer_pos, we need to move data to the beginning of
2011
 
               * the buffer to make space, and once we are at it, move
2012
 
               * to the beginning to make space for the maximum number
2013
 
               * of messages before the next shift */
2014
 
              memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
2015
 
              self->buffer_end = self->buffer_end - self->buffer_pos;
2016
 
              self->buffer_pos = 0;
2017
 
            }
2018
 
          goto read_message;
2019
 
        }
2020
 
      break;
2021
 
    case LPFSS_MESSAGE_READ:
2022
 
 
2023
 
      try_read = TRUE;
2024
 
    read_message:
2025
 
      /* NOTE: here we can assume that the complete message fits into
2026
 
       * the buffer because of the checks/move operation in the 
2027
 
       * LPFSS_FRAME_READ state */
2028
 
      if (self->buffer_end - self->buffer_pos >= self->frame_len)
2029
 
        {
2030
 
          /* ok, we already have the complete message */
2031
 
 
2032
 
          *msg = &self->buffer[self->buffer_pos];
2033
 
          *msg_len = self->frame_len;
2034
 
          self->buffer_pos += self->frame_len;
2035
 
          self->state = LPFSS_FRAME_READ;
2036
 
 
2037
 
          /* we have the full message here so reset the half message flag */
2038
 
          self->half_message_in_buffer = FALSE;
2039
 
          return LPS_SUCCESS;
2040
 
        }
2041
 
      if (try_read)
2042
 
        {
2043
 
          status = log_proto_framed_server_fetch_data(self, may_read);
2044
 
          if (status != LPS_SUCCESS)
2045
 
            return status;
2046
 
          try_read = FALSE;
2047
 
          goto read_message;
2048
 
        }
2049
 
      break;
2050
 
    default:
2051
 
      break;
2052
 
    }
2053
 
  return LPS_SUCCESS;
2054
 
}
2055
 
 
2056
 
static void
2057
 
log_proto_framed_server_free(LogProto *s)
2058
 
{
2059
 
  LogProtoFramedServer *self = (LogProtoFramedServer *) s;
2060
 
  g_free(self->buffer);
2061
 
}
2062
 
 
2063
 
LogProto *
2064
 
log_proto_framed_server_new(LogTransport *transport, gint max_msg_size)
2065
 
{
2066
 
  LogProtoFramedServer *self = g_new0(LogProtoFramedServer, 1);
2067
 
 
2068
 
  self->super.prepare = log_proto_framed_server_prepare;
2069
 
  self->super.fetch = log_proto_framed_server_fetch;
2070
 
  self->super.free_fn = log_proto_framed_server_free;
2071
 
  self->super.transport = transport;
2072
 
  self->super.convert = (GIConv) -1;
2073
 
  /* max message + frame header */
2074
 
  self->max_msg_size = max_msg_size;
2075
 
  self->buffer_size = max_msg_size + LPFS_FRAME_BUFFER;
2076
 
  self->buffer = g_malloc(self->buffer_size);
2077
 
  self->half_message_in_buffer = FALSE;
2078
 
  return &self->super;
2079
 
}
2080