~ubuntu-branches/ubuntu/trusty/syslog-ng/trusty-proposed

« back to all changes in this revision

Viewing changes to .pc/logproto-Fix-log_proto_file_writer_flush-s-partial-construction.patch/lib/logproto.c

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS), Gergely Nagy
  • Date: 2011-10-11 14:30:48 UTC
  • mfrom: (1.3.7)
  • Revision ID: package-import@ubuntu.com-20111011143048-r1iljux9xbvj3lwh
Tags: 3.3.1.dfsg-1
* New upstream release with important fixes from upstream git tree with
  non-free manpages removed.
* Drop syslog-ng.conf(5) (closes: #496521).
* syslog-ng(8) is generated, and does not mention -Q anymore
  (closes: #616069).
* Supports CAP_SYSLOG on recent kernels (closes: #630172).
* Does not use g_timeout_add_seconds anymore (closes: #609154).

[ Gergely Nagy <algernon@madhouse-project.org> ]
* Update debian/copyright to DEP-5 format.
* Simplified the logrotate file by merging identical entries.
* Include local configuration files from /etc/syslog-ng/conf.d/ (Closes:
  #609050).
* Update syslog-ng.conf to be fully 3.3 compliant.
* Compress both source and binaries (except the syslog-ng meta
  package) with xz, instead of gzip.
* Use dpkg triggers to restart syslog-ng when appropriate.
* Include DFSG-free manual pages for all binaries.
* Build with Hardening enabled.
* Mention syslog(3) in /etc/default/syslog-ng, instead of
  <linux/kernel.h> (Closes: #608605)
* Support 'status' in the init script.
  Patch from Peter Eisentraut <petere@debian.org> (Closes: #644458)
* Build-Depend on libevtlog-dev (>= 0.2.12-5~) for correct shlibs.
* Use [linux-any] in Build-Depends instead of hardcoded links.
  (Closes: #634715)
* Use $SYSLOGNG_OPTS in the init script when reloading syslog-ng.
  (Closes: #589081)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
 
3
 * Copyright (c) 1998-2010 Balázs Scheidler
 
4
 *
 
5
 * This library is free software; you can redistribute it and/or
 
6
 * modify it under the terms of the GNU Lesser General Public
 
7
 * License as published by the Free Software Foundation; either
 
8
 * version 2.1 of the License, or (at your option) any later version.
 
9
 *
 
10
 * This library is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
13
 * Lesser General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU Lesser General Public
 
16
 * License along with this library; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 *
 
19
 * As an additional exemption you are allowed to compile & link against the
 
20
 * OpenSSL libraries as published by the OpenSSL project. See the file
 
21
 * COPYING for details.
 
22
 *
 
23
 */
 
24
 
 
25
 
 
26
#include "logproto.h"
 
27
#include "messages.h"
 
28
#include "persist-state.h"
 
29
#include "compat.h"
 
30
 
 
31
#include <ctype.h>
 
32
#include <string.h>
 
33
#include <sys/stat.h>
 
34
#include <stdlib.h>
 
35
#include <sys/uio.h>
 
36
#include <limits.h>
 
37
 
 
38
gboolean
 
39
log_proto_set_encoding(LogProto *self, const gchar *encoding)
 
40
{
 
41
  if (self->convert != (GIConv) -1)
 
42
    {
 
43
      g_iconv_close(self->convert);
 
44
      self->convert = (GIConv) -1;
 
45
    }
 
46
  if (self->encoding)
 
47
    {
 
48
      g_free(self->encoding);
 
49
      self->encoding = NULL;
 
50
    }
 
51
 
 
52
  self->convert = g_iconv_open("utf-8", encoding);
 
53
  if (self->convert == (GIConv) -1)
 
54
    return FALSE;
 
55
 
 
56
  self->encoding = g_strdup(encoding);
 
57
  return TRUE;
 
58
}
 
59
 
 
60
void
 
61
log_proto_free(LogProto *s)
 
62
{
 
63
  if (s->free_fn)
 
64
    s->free_fn(s);
 
65
  if (s->convert != (GIConv) -1)
 
66
    g_iconv_close(s->convert);
 
67
  if (s->encoding)
 
68
    g_free(s->encoding);
 
69
  log_transport_free(s->transport);
 
70
  g_free(s);
 
71
}
 
72
 
 
73
 
 
74
typedef struct _LogProtoTextClient
 
75
{
 
76
  LogProto super;
 
77
  guchar *partial;
 
78
  gsize partial_len, partial_pos;
 
79
} LogProtoTextClient;
 
80
 
 
81
static gboolean
 
82
log_proto_text_client_prepare(LogProto *s, gint *fd, GIOCondition *cond)
 
83
{
 
84
  LogProtoTextClient *self = (LogProtoTextClient *) s;
 
85
  
 
86
  *fd = self->super.transport->fd;
 
87
  *cond = self->super.transport->cond;
 
88
 
 
89
  /* if there's no pending I/O in the transport layer, then we want to do a write */
 
90
  if (*cond == 0)
 
91
    *cond = G_IO_OUT;
 
92
  return !!self->partial;
 
93
}
 
94
 
 
95
static LogProtoStatus
 
96
log_proto_text_client_flush(LogProto *s)
 
97
{
 
98
  LogProtoTextClient *self = (LogProtoTextClient *) s;
 
99
  gint rc;
 
100
 
 
101
  /* attempt to flush previously buffered data */
 
102
  if (self->partial)
 
103
    {
 
104
      gint len = self->partial_len - self->partial_pos;
 
105
      
 
106
      rc = log_transport_write(self->super.transport, &self->partial[self->partial_pos], len);
 
107
      if (rc < 0)
 
108
        {
 
109
          if (errno != EAGAIN && errno != EINTR)
 
110
            {
 
111
              msg_error("I/O error occurred while writing",
 
112
                        evt_tag_int("fd", self->super.transport->fd),
 
113
                        evt_tag_errno(EVT_TAG_OSERROR, errno),
 
114
                        NULL);
 
115
              return LPS_ERROR;
 
116
            }
 
117
          return LPS_SUCCESS;
 
118
        }
 
119
      else if (rc != len)
 
120
        {
 
121
          self->partial_pos += rc;
 
122
          return LPS_SUCCESS;
 
123
        }
 
124
      else
 
125
        {
 
126
          g_free(self->partial);
 
127
          self->partial = NULL;
 
128
          /* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
 
129
          return LPS_SUCCESS;
 
130
        }
 
131
    }
 
132
  return LPS_SUCCESS;
 
133
}
 
134
 
 
135
/*
 
136
 * log_proto_text_client_post:
 
137
 * @msg: formatted log message to send (this might be consumed by this function)
 
138
 * @msg_len: length of @msg
 
139
 * @consumed: pointer to a gboolean that gets set if the message was consumed by this function
 
140
 * @error: error information, if any
 
141
 *
 
142
 * This function posts a message to the log transport, performing buffering
 
143
 * of partially sent data if needed. The return value indicates whether we
 
144
 * successfully sent this message, or if it should be resent by the caller.
 
145
 **/
 
146
static LogProtoStatus
 
147
log_proto_text_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
 
148
{
 
149
  LogProtoTextClient *self = (LogProtoTextClient *) s;
 
150
  gint rc;
 
151
 
 
152
  /* NOTE: the client does not support charset conversion for now */
 
153
  g_assert(self->super.convert == (GIConv) -1);
 
154
 
 
155
  *consumed = FALSE;
 
156
  rc = log_proto_flush(s);
 
157
  if (rc == LPS_ERROR)
 
158
    {
 
159
      goto write_error;
 
160
    }
 
161
  else if (self->partial)
 
162
    {
 
163
      /* NOTE: the partial buffer has not been emptied yet even with the
 
164
       * flush above, we shouldn't attempt to write again.
 
165
       *
 
166
       * Otherwise: with the framed protocol this could case the frame
 
167
       * header to be split, and interleaved with message payload, as in:
 
168
       *
 
169
       *     First bytes of frame header || payload || tail of frame header.
 
170
       *
 
171
       * This obviously would cause the framing to break. Also libssl
 
172
       * returns an error in this case, which is how this was discovered.
 
173
       */
 
174
      return rc;
 
175
    }
 
176
 
 
177
  /* OK, partial buffer empty, now flush msg that we just got */
 
178
  
 
179
  rc = log_transport_write(self->super.transport, msg, msg_len);
 
180
  
 
181
  if (rc < 0 || rc != msg_len)
 
182
    {
 
183
      /* error OR partial flush, we sent _some_ of the message that we got, save it to self->partial and tell the caller that we consumed it */
 
184
      if (rc < 0 && errno != EAGAIN && errno != EINTR)
 
185
        goto write_error;
 
186
      
 
187
      /* NOTE: log_proto_framed_client_post depends on our current
 
188
       * behaviour, that we consume every message that we can, even if we
 
189
       * couldn't write a single byte out.
 
190
       *
 
191
       * If we return LPS_SUCCESS and self->partial == NULL, it assumes that
 
192
       * the message was sent.
 
193
       */
 
194
      
 
195
        
 
196
      self->partial = msg;
 
197
      self->partial_len = msg_len;
 
198
      self->partial_pos = rc > 0 ? rc : 0;
 
199
      *consumed = TRUE;
 
200
    }
 
201
  else
 
202
    {
 
203
      /* all data was nicely sent */
 
204
      g_free(msg);
 
205
      *consumed = TRUE;
 
206
    }
 
207
  return LPS_SUCCESS;
 
208
 
 
209
 write_error:
 
210
  if (errno != EAGAIN && errno != EINTR)
 
211
    {
 
212
      msg_error("I/O error occurred while writing",
 
213
                evt_tag_int("fd", self->super.transport->fd),
 
214
                evt_tag_errno(EVT_TAG_OSERROR, errno),
 
215
                NULL);
 
216
      return LPS_ERROR;
 
217
    }
 
218
 
 
219
  return LPS_SUCCESS;
 
220
}
 
221
 
 
222
LogProto *
 
223
log_proto_text_client_new(LogTransport *transport)
 
224
{
 
225
  LogProtoTextClient *self = g_new0(LogProtoTextClient, 1);
 
226
 
 
227
  self->super.prepare = log_proto_text_client_prepare;
 
228
  self->super.flush = log_proto_text_client_flush;
 
229
  self->super.post = log_proto_text_client_post;
 
230
  self->super.transport = transport;
 
231
  self->super.convert = (GIConv) -1;
 
232
  return &self->super;
 
233
}
 
234
 
 
235
typedef struct _LogProtoFileWriter
 
236
{
 
237
  LogProto super;
 
238
  guchar *partial;
 
239
  gsize partial_len, partial_pos;
 
240
  gint buf_size;
 
241
  gint buf_count;
 
242
  gint fd;
 
243
  gint sum_len;
 
244
  struct iovec buffer[0];
 
245
} LogProtoFileWriter;
 
246
 
 
247
/*
 
248
 * log_proto_file_writer_flush:
 
249
 *
 
250
 * this function flushes the file output buffer
 
251
 * it is called either form log_proto_file_writer_post (normal mode: the buffer is full)
 
252
 * or from log_proto_flush (foced flush: flush time, exit, etc)
 
253
 *
 
254
 */
 
255
static LogProtoStatus
 
256
log_proto_file_writer_flush(LogProto *s)
 
257
{
 
258
  LogProtoFileWriter *self = (LogProtoFileWriter *)s;
 
259
  gint rc, i, i0, sum, ofs;
 
260
 
 
261
  /* we might be called from log_writer_deinit() without having a buffer at all */
 
262
 
 
263
  if (self->buf_count == 0)
 
264
    return LPS_SUCCESS;
 
265
 
 
266
  /* lseek() is used instead of O_APPEND, as on NFS  O_APPEND performs
 
267
   * poorly, as reported on the mailing list 2008/05/29 */
 
268
 
 
269
  lseek(self->fd, 0, SEEK_END);
 
270
  rc = writev(self->fd, self->buffer, self->buf_count);
 
271
 
 
272
  if (rc < 0)
 
273
    {
 
274
      if (errno != EAGAIN && errno != EINTR)
 
275
        {
 
276
          msg_error("I/O error occurred while writing",
 
277
                    evt_tag_int("fd", self->super.transport->fd),
 
278
                    evt_tag_errno(EVT_TAG_OSERROR, errno),
 
279
                    NULL);
 
280
          return LPS_ERROR;
 
281
        }
 
282
 
 
283
      return LPS_SUCCESS;
 
284
    }
 
285
  else if (rc != self->sum_len)
 
286
    {
 
287
      /* partial success: not everything has been written out */
 
288
      /* look for the first chunk that has been cut */
 
289
      sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */
 
290
      i = 0;
 
291
      while (rc > sum)
 
292
        sum += self->buffer[++i].iov_len;
 
293
      self->partial_len = sum - rc; /* this is the length of the first non-written chunk */
 
294
      i0 = i;
 
295
      ++i;
 
296
      /* add the lengths of the following messages */
 
297
      while (i < self->buf_count)
 
298
        self->partial_len += self->buffer[i++].iov_len;
 
299
      /* allocate and copy the remaning data */
 
300
      self->partial = (guchar *)g_malloc(self->partial_len);
 
301
      ofs = sum - rc; /* the length of the remaning (not processed) chunk in the first message */
 
302
        memcpy(self->partial, self->buffer[i0].iov_base + rc - (i0 > 0 ? (sum - self->buffer[i0 - 1].iov_len) : 0), ofs);
 
303
      i = i0 + 1;
 
304
      while (i < self->buf_count)
 
305
        {
 
306
          memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len);
 
307
          ofs += self->buffer[i].iov_len;
 
308
          ++i;
 
309
        }
 
310
      self->partial_pos = 0;
 
311
    }
 
312
 
 
313
  /* free the previous message strings (the remaning part has been copied to the partial buffer) */
 
314
  for (i = 0; i < self->buf_count; ++i)
 
315
    g_free(self->buffer[i].iov_base);
 
316
  self->buf_count = 0;
 
317
  self->sum_len = 0;
 
318
 
 
319
  return LPS_SUCCESS;
 
320
}
 
321
 
 
322
/*
 
323
 * log_proto_file_writer_post:
 
324
 * @msg: formatted log message to send (this might be consumed by this function)
 
325
 * @msg_len: length of @msg
 
326
 * @consumed: pointer to a gboolean that gets set if the message was consumed by this function
 
327
 * @error: error information, if any
 
328
 *
 
329
 * This function posts a message to the log transport, performing buffering
 
330
 * of partially sent data if needed. The return value indicates whether we
 
331
 * successfully sent this message, or if it should be resent by the caller.
 
332
 **/
 
333
static LogProtoStatus
 
334
log_proto_file_writer_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
 
335
{
 
336
  LogProtoFileWriter *self = (LogProtoFileWriter *)s;
 
337
  gint rc;
 
338
 
 
339
  if (self->buf_count >= self->buf_size)
 
340
    {
 
341
      rc = log_proto_file_writer_flush(s);
 
342
      if (rc != LPS_SUCCESS || self->buf_count >= self->buf_size)
 
343
        {
 
344
          /* don't consume a new message if flush failed, or even after the flush we don't have any free slots */
 
345
          return rc;
 
346
        }
 
347
    }
 
348
 
 
349
  *consumed = FALSE;
 
350
  if (self->partial)
 
351
    {
 
352
      /* there is still some data from the previous file writing process */
 
353
      gint len = self->partial_len - self->partial_pos;
 
354
 
 
355
      rc = write(self->fd, self->partial + self->partial_pos, len);
 
356
      if (rc < 0)
 
357
        {
 
358
          goto write_error;
 
359
        }
 
360
      else if (rc != len)
 
361
        {
 
362
          self->partial_pos += rc;
 
363
          return LPS_SUCCESS;
 
364
        }
 
365
      else
 
366
        {
 
367
          g_free(self->partial);
 
368
          self->partial = NULL;
 
369
          /* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
 
370
          return LPS_SUCCESS;
 
371
        }
 
372
    }
 
373
 
 
374
  /* register the new message */
 
375
  self->buffer[self->buf_count].iov_base = (void *) msg;
 
376
  self->buffer[self->buf_count].iov_len = msg_len;
 
377
  ++self->buf_count;
 
378
  self->sum_len += msg_len;
 
379
  *consumed = TRUE;
 
380
 
 
381
  if (self->buf_count == self->buf_size)
 
382
    {
 
383
      /* we have reached the max buffer size -> we need to write the messages */
 
384
      return log_proto_file_writer_flush(s);
 
385
    }
 
386
 
 
387
  return LPS_SUCCESS;
 
388
 
 
389
write_error:
 
390
  if (errno != EAGAIN && errno != EINTR)
 
391
    {
 
392
      msg_error("I/O error occurred while writing",
 
393
                evt_tag_int("fd", self->super.transport->fd),
 
394
                evt_tag_errno(EVT_TAG_OSERROR, errno),
 
395
                NULL);
 
396
      return LPS_ERROR;
 
397
    }
 
398
 
 
399
  return LPS_SUCCESS;
 
400
}
 
401
 
 
402
static gboolean
 
403
log_proto_file_writer_prepare(LogProto *s, gint *fd, GIOCondition *cond)
 
404
{
 
405
  LogProtoFileWriter *self = (LogProtoFileWriter *) s;
 
406
 
 
407
  *fd = self->super.transport->fd;
 
408
  *cond = self->super.transport->cond;
 
409
 
 
410
  /* if there's no pending I/O in the transport layer, then we want to do a write */
 
411
  if (*cond == 0)
 
412
    *cond = G_IO_OUT;
 
413
  return self->buf_count > 0 || self->partial;
 
414
}
 
415
 
 
416
LogProto *
 
417
log_proto_file_writer_new(LogTransport *transport, gint flush_lines)
 
418
{
 
419
  if (flush_lines == 0)
 
420
    /* the flush-lines option has not been specified, use a default value */
 
421
    flush_lines = 1;
 
422
  if (flush_lines > IOV_MAX)
 
423
    /* limit the flush_lines according to the current platform */
 
424
    flush_lines = IOV_MAX;
 
425
 
 
426
  /* allocate the structure with the proper number of items at the end */
 
427
  LogProtoFileWriter *self = (LogProtoFileWriter *)g_malloc0(sizeof(LogProtoFileWriter) + sizeof(struct iovec)*flush_lines);
 
428
 
 
429
  self->fd = transport->fd;
 
430
  self->buf_size = flush_lines;
 
431
  self->super.prepare = log_proto_file_writer_prepare;
 
432
  self->super.post = log_proto_file_writer_post;
 
433
  self->super.flush = log_proto_file_writer_flush;
 
434
  self->super.transport = transport;
 
435
  self->super.convert = (GIConv) -1;
 
436
  return &self->super;
 
437
}
 
438
 
 
439
 
 
440
 
 
441
typedef struct _LogProtoBufferedServerState
 
442
{
 
443
  /* NOTE: that if you add/remove structure members you have to update
 
444
   * the byte order swap code in LogProtoFileReader for mulit-byte
 
445
   * members. */
 
446
 
 
447
  guint8 version;
 
448
 
 
449
  /* this indicates that the members in the struct are stored in
 
450
   * big-endian byte order. if the byte ordering of the struct doesn't
 
451
   * match the current CPU byte ordering, then the members are
 
452
   * byte-swapped when the state is loaded.
 
453
   */
 
454
  guint8 big_endian:1;
 
455
  guint8 raw_buffer_leftover_size;
 
456
  guint8 __padding1[1];
 
457
  guint32 buffer_pos;
 
458
  guint32 pending_buffer_end;
 
459
  guint32 buffer_size;
 
460
  guint32 buffer_cached_eol;
 
461
  guint32 pending_buffer_pos;
 
462
 
 
463
  /* the stream position where we converted out current buffer from (offset in file) */
 
464
  gint64 raw_stream_pos;
 
465
  gint64 pending_raw_stream_pos;
 
466
  /* the size of raw data (measured in bytes) that got converted from raw_stream_pos into our buffer */
 
467
  gint32 raw_buffer_size;
 
468
  gint32 pending_raw_buffer_size;
 
469
  guchar raw_buffer_leftover[8];
 
470
 
 
471
  gint64 file_size;
 
472
  gint64 file_inode;
 
473
} LogProtoBufferedServerState;
 
474
 
 
475
typedef struct _LogProtoBufferedServer LogProtoBufferedServer;
 
476
struct _LogProtoBufferedServer
 
477
{
 
478
  LogProto super;
 
479
  gboolean (*fetch_from_buf)(LogProtoBufferedServer *self, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest);
 
480
  gint (*read_data)(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa);
 
481
 
 
482
  LogProtoBufferedServerState *state1;
 
483
  PersistState *persist_state;
 
484
  PersistEntryHandle persist_handle;
 
485
 
 
486
  gint max_buffer_size;
 
487
  gint init_buffer_size;
 
488
  guchar *buffer;
 
489
  GSockAddr *prev_saddr;
 
490
  LogProtoStatus status;
 
491
};
 
492
 
 
493
static LogProtoBufferedServerState *
 
494
log_proto_buffered_server_get_state(LogProtoBufferedServer *self)
 
495
{
 
496
  if (self->persist_state)
 
497
    {
 
498
      g_assert(self->persist_handle != 0);
 
499
      return persist_state_map_entry(self->persist_state, self->persist_handle);
 
500
    }
 
501
  if (G_UNLIKELY(!self->state1))
 
502
    {
 
503
      self->state1 = g_new0(LogProtoBufferedServerState, 1);
 
504
    }
 
505
  return self->state1;
 
506
}
 
507
 
 
508
static void
 
509
log_proto_buffered_server_put_state(LogProtoBufferedServer *self)
 
510
{
 
511
  if (self->persist_state && self->persist_handle)
 
512
    persist_state_unmap_entry(self->persist_state, self->persist_handle);
 
513
}
 
514
 
 
515
static gboolean
 
516
log_proto_buffered_server_convert_from_raw(LogProtoBufferedServer *self, const guchar *raw_buffer, gsize raw_buffer_len)
 
517
{
 
518
  /* some data was read */
 
519
  gsize avail_in = raw_buffer_len;
 
520
  gsize avail_out;
 
521
  gchar *out;
 
522
  gint  ret = -1;
 
523
  gboolean success = FALSE;
 
524
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
 
525
 
 
526
  do
 
527
    {
 
528
      avail_out = state->buffer_size - state->pending_buffer_end;
 
529
      out = (gchar *) self->buffer + state->pending_buffer_end;
 
530
 
 
531
      ret = g_iconv(self->super.convert, (gchar **) &raw_buffer, &avail_in, (gchar **) &out, &avail_out);
 
532
      if (ret == (gsize) -1)
 
533
        {
 
534
          switch (errno)
 
535
            {
 
536
            case EINVAL:
 
537
              /* Incomplete text, do not report an error, rather try to read again */
 
538
              state->pending_buffer_end = state->buffer_size - avail_out;
 
539
 
 
540
              if (avail_in > 0)
 
541
                {
 
542
                  if (avail_in > sizeof(state->raw_buffer_leftover))
 
543
                    {
 
544
                      msg_error("Invalid byte sequence, the remaining raw buffer is larger than the supported leftover size",
 
545
                                evt_tag_str("encoding", self->super.encoding),
 
546
                                evt_tag_int("avail_in", avail_in),
 
547
                                evt_tag_int("leftover_size", sizeof(state->raw_buffer_leftover)),
 
548
                                NULL);
 
549
                      goto error;
 
550
                    }
 
551
                  memcpy(state->raw_buffer_leftover, raw_buffer, avail_in);
 
552
                  state->raw_buffer_leftover_size = avail_in;
 
553
                  state->raw_buffer_size -= avail_in;
 
554
                  msg_trace("Leftover characters remained after conversion, delaying message until another chunk arrives",
 
555
                            evt_tag_str("encoding", self->super.encoding),
 
556
                            evt_tag_int("avail_in", avail_in),
 
557
                            NULL);
 
558
                  goto success;
 
559
                }
 
560
              break;
 
561
            case E2BIG:
 
562
              state->pending_buffer_end = state->buffer_size - avail_out;
 
563
              /* extend the buffer */
 
564
 
 
565
              if ((state->buffer_size < self->max_buffer_size))
 
566
                {
 
567
                  state->buffer_size *= 2;
 
568
                  if (state->buffer_size > self->max_buffer_size)
 
569
                    state->buffer_size = self->max_buffer_size;
 
570
 
 
571
                  self->buffer = g_realloc(self->buffer, state->buffer_size);
 
572
 
 
573
                  /* recalculate the out pointer, and add what we have now */
 
574
                  ret = -1;
 
575
                }
 
576
              else
 
577
                {
 
578
                  msg_error("Incoming byte stream requires a too large conversion buffer, probably invalid character sequence",
 
579
                            evt_tag_str("encoding", self->super.encoding),
 
580
                            evt_tag_printf("buffer", "%.*s", (gint) state->pending_buffer_end, self->buffer),
 
581
                            NULL);
 
582
                  goto error;
 
583
                }
 
584
              break;
 
585
            case EILSEQ:
 
586
            default:
 
587
              msg_notice("Invalid byte sequence or other error while converting input, skipping character",
 
588
                         evt_tag_str("encoding", self->super.encoding),
 
589
                         evt_tag_printf("char", "0x%02x", *(guchar *) raw_buffer),
 
590
                         NULL);
 
591
              goto error;
 
592
            }
 
593
        }
 
594
      else
 
595
        {
 
596
          state->pending_buffer_end = state->buffer_size - avail_out;
 
597
        }
 
598
    }
 
599
  while (avail_in > 0);
 
600
 
 
601
 success:
 
602
  success = TRUE;
 
603
 error:
 
604
  log_proto_buffered_server_put_state(self);
 
605
  return success;
 
606
}
 
607
 
 
608
static void
 
609
log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntryHandle handle, const gchar *persist_name)
 
610
{
 
611
  struct stat st;
 
612
  gint64 ofs = 0;
 
613
  LogProtoBufferedServerState *state;
 
614
  gint fd;
 
615
 
 
616
  fd = self->super.transport->fd;
 
617
  self->persist_handle = handle;
 
618
 
 
619
  if (fstat(fd, &st) < 0)
 
620
    return;
 
621
 
 
622
  state = log_proto_buffered_server_get_state(self);
 
623
 
 
624
  if (!self->buffer)
 
625
    {
 
626
      self->buffer = g_malloc(state->buffer_size);
 
627
    }
 
628
  state->pending_buffer_end = 0;
 
629
 
 
630
  if (state->file_inode &&
 
631
      state->file_inode == st.st_ino &&
 
632
      state->file_size <= st.st_size &&
 
633
      state->raw_stream_pos <= st.st_size)
 
634
    {
 
635
      ofs = state->raw_stream_pos;
 
636
 
 
637
      lseek(fd, ofs, SEEK_SET);
 
638
    }
 
639
  else
 
640
    {
 
641
      if (state->file_inode)
 
642
        {
 
643
          /* the stored state does not match the current file */
 
644
          msg_notice("The current log file has a mismatching size/inode information, restarting from the beginning",
 
645
                     evt_tag_str("state", persist_name),
 
646
                     evt_tag_int("stored_inode", state->file_inode),
 
647
                     evt_tag_int("cur_file_inode", st.st_ino),
 
648
                     evt_tag_int("stored_size", state->file_size),
 
649
                     evt_tag_int("cur_file_size", st.st_size),
 
650
                     evt_tag_int("raw_stream_pos", state->raw_stream_pos),
 
651
                     NULL);
 
652
        }
 
653
      goto error;
 
654
    }
 
655
  if (state->raw_buffer_size)
 
656
    {
 
657
      gssize rc;
 
658
      guchar *raw_buffer;
 
659
 
 
660
      if (!self->super.encoding)
 
661
        {
 
662
          /* no conversion, we read directly into our buffer */
 
663
          if (state->raw_buffer_size > state->buffer_size)
 
664
            {
 
665
              msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than buffer_size and no encoding is set, restarting from the beginning",
 
666
                         evt_tag_str("state", persist_name),
 
667
                         evt_tag_int("raw_buffer_size", state->raw_buffer_size),
 
668
                         evt_tag_int("buffer_size", state->buffer_size),
 
669
                         evt_tag_int("init_buffer_size", self->init_buffer_size),
 
670
                         NULL);
 
671
              goto error;
 
672
            }
 
673
          raw_buffer = self->buffer;
 
674
        }
 
675
      else
 
676
        {
 
677
          if (state->raw_buffer_size > self->init_buffer_size)
 
678
            {
 
679
              msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than init_buffer_size, restarting from the beginning",
 
680
                         evt_tag_str("state", persist_name),
 
681
                         evt_tag_int("raw_buffer_size", state->raw_buffer_size),
 
682
                         evt_tag_int("init_buffer_size", self->init_buffer_size),
 
683
                         NULL);
 
684
              goto error;
 
685
            }
 
686
          raw_buffer = g_alloca(state->raw_buffer_size);
 
687
        }
 
688
 
 
689
      rc = log_transport_read(self->super.transport, raw_buffer, state->raw_buffer_size, NULL);
 
690
      if (rc != state->raw_buffer_size)
 
691
        {
 
692
          msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning",
 
693
                     evt_tag_str("state", persist_name),
 
694
                     NULL);
 
695
          goto error;
 
696
        }
 
697
 
 
698
      state->pending_buffer_end = 0;
 
699
      if (self->super.encoding)
 
700
        {
 
701
          if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
 
702
            {
 
703
              msg_notice("Error re-converting buffer contents of the file to be continued, restarting from the beginning",
 
704
                         evt_tag_str("state", persist_name),
 
705
                         NULL);
 
706
              goto error;
 
707
            }
 
708
        }
 
709
      else
 
710
        {
 
711
          state->pending_buffer_end += rc;
 
712
        }
 
713
 
 
714
      if (state->buffer_pos > state->pending_buffer_end ||
 
715
          state->buffer_cached_eol > state->pending_buffer_end)
 
716
        {
 
717
          msg_notice("Converted buffer contents is smaller than the current buffer position, starting from the beginning of the buffer, some lines may be duplicated",
 
718
                     evt_tag_str("state", persist_name),
 
719
                     NULL);
 
720
          state->buffer_pos = state->pending_buffer_pos = state->buffer_cached_eol = 0;
 
721
        }
 
722
    }
 
723
  else
 
724
    {
 
725
      /* although we do have buffer position information, but the
 
726
       * complete contents of the buffer is already processed, instead
 
727
       * of reading and then dropping it, position the file after the
 
728
       * indicated block */
 
729
 
 
730
      state->raw_stream_pos += state->raw_buffer_size;
 
731
      ofs = state->raw_stream_pos;
 
732
      state->raw_buffer_size = 0;
 
733
      state->buffer_pos = state->pending_buffer_end = 0;
 
734
 
 
735
      lseek(fd, state->raw_stream_pos, SEEK_SET);
 
736
    }
 
737
  goto exit;
 
738
 
 
739
 error:
 
740
  ofs = 0;
 
741
  state->buffer_pos = 0;
 
742
  state->pending_buffer_end = 0;
 
743
  state->buffer_cached_eol = 0;
 
744
  state->raw_stream_pos = 0;
 
745
  state->raw_buffer_size = 0;
 
746
  state->raw_buffer_leftover_size = 0;
 
747
  lseek(fd, 0, SEEK_SET);
 
748
 
 
749
 exit:
 
750
  state->file_inode = st.st_ino;
 
751
  state->file_size = st.st_size;
 
752
  state->raw_stream_pos = ofs;
 
753
  state->pending_buffer_pos = state->buffer_pos;
 
754
  state->pending_raw_stream_pos = state->raw_stream_pos;
 
755
  state->pending_raw_buffer_size = state->raw_buffer_size;
 
756
 
 
757
  state = NULL;
 
758
  log_proto_buffered_server_put_state(self);
 
759
}
 
760
 
 
761
static PersistEntryHandle
 
762
log_proto_buffered_server_alloc_state(LogProtoBufferedServer *self, PersistState *persist_state, const gchar *persist_name)
 
763
{
 
764
  LogProtoBufferedServerState *state;
 
765
  PersistEntryHandle handle;
 
766
 
 
767
  handle = persist_state_alloc_entry(persist_state, persist_name, sizeof(LogProtoBufferedServerState));
 
768
  if (handle)
 
769
    {
 
770
      state = persist_state_map_entry(persist_state, handle);
 
771
 
 
772
      state->version = 0;
 
773
      state->big_endian = (G_BYTE_ORDER == G_BIG_ENDIAN);
 
774
 
 
775
      persist_state_unmap_entry(persist_state, handle);
 
776
 
 
777
    }
 
778
  return handle;
 
779
}
 
780
 
 
781
static gboolean
 
782
log_proto_buffered_server_convert_state(LogProtoBufferedServer *self, guint8 persist_version, gpointer old_state, gsize old_state_size, LogProtoBufferedServerState *state)
 
783
{
 
784
  if (persist_version <= 2)
 
785
    {
 
786
      state->version = 0;
 
787
      state->file_inode = 0;
 
788
      state->raw_stream_pos = strtoll((gchar *) old_state, NULL, 10);
 
789
      state->file_size = 0;
 
790
 
 
791
      return TRUE;
 
792
    }
 
793
  else if (persist_version == 3)
 
794
    {
 
795
      SerializeArchive *archive;
 
796
      guint32 read_length;
 
797
      gint64 cur_size;
 
798
      gint64 cur_inode;
 
799
      gint64 cur_pos;
 
800
      guint16 version;
 
801
      gchar *buffer;
 
802
      gsize buffer_len;
 
803
 
 
804
      cur_inode = -1;
 
805
      cur_pos = 0;
 
806
      cur_size = 0;
 
807
      archive = serialize_buffer_archive_new(old_state, old_state_size);
 
808
 
 
809
      /* NOTE: the v23 conversion code adds an extra length field which we
 
810
       * need to read out. */
 
811
      g_assert(serialize_read_uint32(archive, &read_length) && read_length == old_state_size - sizeof(read_length));
 
812
 
 
813
      /* original v3 format starts here */
 
814
      if (!serialize_read_uint16(archive, &version) || version != 0)
 
815
        {
 
816
          msg_error("Internal error restoring log reader state, stored data has incorrect version",
 
817
                    evt_tag_int("version", version));
 
818
          goto error_converting_v3;
 
819
        }
 
820
 
 
821
      if (!serialize_read_uint64(archive, (guint64 *) &cur_pos) ||
 
822
          !serialize_read_uint64(archive, (guint64 *) &cur_inode) ||
 
823
          !serialize_read_uint64(archive, (guint64 *) &cur_size))
 
824
        {
 
825
          msg_error("Internal error restoring information about the current file position, restarting from the beginning",
 
826
                    NULL);
 
827
          goto error_converting_v3;
 
828
        }
 
829
 
 
830
      if (!serialize_read_uint16(archive, &version) || version != 0)
 
831
        {
 
832
          msg_error("Internal error, protocol state has incorrect version",
 
833
                    evt_tag_int("version", version),
 
834
                    NULL);
 
835
          goto error_converting_v3;
 
836
        }
 
837
 
 
838
      if (!serialize_read_cstring(archive, &buffer, &buffer_len))
 
839
        {
 
840
          msg_error("Internal error, error reading buffer contents",
 
841
                    evt_tag_int("version", version),
 
842
                    NULL);
 
843
          goto error_converting_v3;
 
844
        }
 
845
 
 
846
      if (!self->buffer || state->buffer_size < buffer_len)
 
847
        {
 
848
          gsize buffer_size = MAX(self->init_buffer_size, buffer_len);
 
849
          self->buffer = g_realloc(self->buffer, buffer_size);
 
850
        }
 
851
      serialize_archive_free(archive);
 
852
 
 
853
      memcpy(self->buffer, buffer, buffer_len);
 
854
      state->buffer_pos = 0;
 
855
      state->pending_buffer_end = buffer_len;
 
856
      g_free(buffer);
 
857
 
 
858
      state->version = 0;
 
859
      state->file_inode = cur_inode;
 
860
      state->raw_stream_pos = cur_pos;
 
861
      state->file_size = cur_size;
 
862
      return TRUE;
 
863
    error_converting_v3:
 
864
      serialize_archive_free(archive);
 
865
    }
 
866
  return FALSE;
 
867
}
 
868
 
 
869
gboolean
 
870
log_proto_buffered_server_restart_with_state(LogProto *s, PersistState *persist_state, const gchar *persist_name)
 
871
{
 
872
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
 
873
  guint8 persist_version;
 
874
  PersistEntryHandle old_state_handle;
 
875
  gpointer old_state;
 
876
  gsize old_state_size;
 
877
  PersistEntryHandle new_state_handle = 0;
 
878
  gpointer new_state = NULL;
 
879
  gboolean success;
 
880
 
 
881
  self->persist_state = persist_state;
 
882
  old_state_handle = persist_state_lookup_entry(persist_state, persist_name, &old_state_size, &persist_version);
 
883
  if (!old_state_handle)
 
884
    {
 
885
      new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
 
886
      if (!new_state_handle)
 
887
        goto fallback_non_persistent;
 
888
      log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
 
889
      return TRUE;
 
890
    }
 
891
  if (persist_version < 4)
 
892
    {
 
893
      new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
 
894
      if (!new_state_handle)
 
895
        goto fallback_non_persistent;
 
896
 
 
897
      old_state = persist_state_map_entry(persist_state, old_state_handle);
 
898
      new_state = persist_state_map_entry(persist_state, new_state_handle);
 
899
      success = log_proto_buffered_server_convert_state(self, persist_version, old_state, old_state_size, new_state);
 
900
      persist_state_unmap_entry(persist_state, old_state_handle);
 
901
      persist_state_unmap_entry(persist_state, new_state_handle);
 
902
 
 
903
      /* we're using the newly allocated state structure regardless if
 
904
       * conversion succeeded. If the conversion went wrong then
 
905
       * new_state is still in its freshly initialized form since the
 
906
       * convert function will not touch the state in the error
 
907
       * branches.
 
908
       */
 
909
 
 
910
      log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
 
911
      return success;
 
912
    }
 
913
  else if (persist_version == 4)
 
914
    {
 
915
      LogProtoBufferedServerState *state;
 
916
 
 
917
      old_state = persist_state_map_entry(persist_state, old_state_handle);
 
918
      state = old_state;
 
919
      if ((state->big_endian && G_BYTE_ORDER == G_LITTLE_ENDIAN) ||
 
920
          (!state->big_endian && G_BYTE_ORDER == G_BIG_ENDIAN))
 
921
        {
 
922
 
 
923
          /* byte order conversion in order to avoid the hassle with
 
924
             scattered byte order conversions in the code */
 
925
 
 
926
          state->big_endian = !state->big_endian;
 
927
          state->buffer_pos = GUINT32_SWAP_LE_BE(state->buffer_pos);
 
928
          state->pending_buffer_pos = GUINT32_SWAP_LE_BE(state->pending_buffer_pos);
 
929
          state->pending_buffer_end = GUINT32_SWAP_LE_BE(state->pending_buffer_end);
 
930
          state->buffer_size = GUINT32_SWAP_LE_BE(state->buffer_size);
 
931
          state->buffer_cached_eol = GUINT32_SWAP_LE_BE(state->buffer_cached_eol);
 
932
          state->raw_stream_pos = GUINT64_SWAP_LE_BE(state->raw_stream_pos);
 
933
          state->raw_buffer_size = GUINT32_SWAP_LE_BE(state->raw_buffer_size);
 
934
          state->pending_raw_stream_pos = GUINT64_SWAP_LE_BE(state->pending_raw_stream_pos);
 
935
          state->pending_raw_buffer_size = GUINT32_SWAP_LE_BE(state->pending_raw_buffer_size);
 
936
          state->file_size = GUINT64_SWAP_LE_BE(state->file_size);
 
937
          state->file_inode = GUINT64_SWAP_LE_BE(state->file_inode);
 
938
        }
 
939
 
 
940
      if (state->version > 0)
 
941
        {
 
942
          msg_error("Internal error restoring log reader state, stored data is too new",
 
943
                    evt_tag_int("version", state->version));
 
944
          goto error;
 
945
        }
 
946
      persist_state_unmap_entry(persist_state, old_state_handle);
 
947
      log_proto_buffered_server_apply_state(self, old_state_handle, persist_name);
 
948
      return TRUE;
 
949
    }
 
950
  else
 
951
    {
 
952
      msg_error("Internal error restoring log reader state, stored data is too new",
 
953
                evt_tag_int("version", persist_version));
 
954
      goto error;
 
955
    }
 
956
  return TRUE;
 
957
 fallback_non_persistent:
 
958
  new_state = g_new0(LogProtoBufferedServerState, 1);
 
959
 error:
 
960
  if (!new_state)
 
961
    {
 
962
      new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
 
963
      if (!new_state_handle)
 
964
        goto fallback_non_persistent;
 
965
      new_state = persist_state_map_entry(persist_state, new_state_handle);
 
966
    }
 
967
  if (new_state)
 
968
    {
 
969
      LogProtoBufferedServerState *state = new_state;
 
970
 
 
971
      /* error happened,  restart the file from the beginning */
 
972
      state->raw_stream_pos = 0;
 
973
      state->file_inode = 0;
 
974
      state->file_size = 0;
 
975
      if (new_state_handle)
 
976
        log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
 
977
      else
 
978
        self->state1 = new_state;
 
979
    }
 
980
  if (new_state_handle)
 
981
    {
 
982
      persist_state_unmap_entry(persist_state, new_state_handle);
 
983
    }
 
984
  return FALSE;
 
985
}
 
986
 
 
987
static gboolean
 
988
log_proto_buffered_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
 
989
{
 
990
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
 
991
 
 
992
  *fd = self->super.transport->fd;
 
993
  *cond = self->super.transport->cond;
 
994
 
 
995
  /* if there's no pending I/O in the transport layer, then we want to do a read */
 
996
  if (*cond == 0)
 
997
    *cond = G_IO_IN;
 
998
 
 
999
  return FALSE;
 
1000
}
 
1001
 
 
1002
 
 
1003
static gint
 
1004
log_proto_buffered_server_read_data(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa)
 
1005
{
 
1006
  return log_transport_read(self->super.transport, buf, len, sa);
 
1007
}
 
1008
 
 
1009
static LogProtoStatus
 
1010
log_proto_buffered_server_fetch_from_buf(LogProtoBufferedServer *self, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
 
1011
{
 
1012
  gsize buffer_bytes;
 
1013
  const guchar *buffer_start;
 
1014
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
 
1015
  gboolean success = FALSE;
 
1016
 
 
1017
  buffer_start = self->buffer + state->pending_buffer_pos;
 
1018
  buffer_bytes = state->pending_buffer_end - state->pending_buffer_pos;
 
1019
 
 
1020
  if (buffer_bytes == 0)
 
1021
    {
 
1022
      /* if buffer_bytes is zero bytes, it means that we completely
 
1023
       * processed our buffer without having a fraction of a line still
 
1024
       * there.  It is important to reset
 
1025
       * pending_buffer_pos/pending_buffer_end to zero as the caller assumes
 
1026
       * that if we return no message from the buffer, then buffer_pos is
 
1027
       * _zero_.
 
1028
       */
 
1029
 
 
1030
      if (G_UNLIKELY(self->super.flags & LPBS_POS_TRACKING))
 
1031
        {
 
1032
          state->pending_raw_stream_pos += state->pending_raw_buffer_size;
 
1033
          state->pending_raw_buffer_size = 0;
 
1034
        }
 
1035
      state->pending_buffer_pos = state->pending_buffer_end = 0;
 
1036
      goto exit;
 
1037
    }
 
1038
 
 
1039
  success = self->fetch_from_buf(self, buffer_start, buffer_bytes, msg, msg_len, flush_the_rest);
 
1040
 exit:
 
1041
  log_proto_buffered_server_put_state(self);
 
1042
  return success;
 
1043
}
 
1044
 
 
1045
/**
 
1046
 * Returns: TRUE to indicate success, FALSE otherwise. The returned
 
1047
 * msg can be NULL even if no failure occurred.
 
1048
 **/
 
1049
static LogProtoStatus
 
1050
log_proto_buffered_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
 
1051
{
 
1052
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
 
1053
  gint rc;
 
1054
  guchar *raw_buffer = NULL;
 
1055
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
 
1056
  LogProtoStatus result = self->status;
 
1057
 
 
1058
  if (G_UNLIKELY(!self->buffer))
 
1059
    {
 
1060
      self->buffer = g_malloc(self->init_buffer_size);
 
1061
      state->buffer_size = self->init_buffer_size;
 
1062
    }
 
1063
 
 
1064
  if (sa)
 
1065
    *sa = NULL;
 
1066
 
 
1067
  if (self->status != LPS_SUCCESS)
 
1068
    {
 
1069
      goto exit;
 
1070
    }
 
1071
 
 
1072
  if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
 
1073
    {
 
1074
      if (sa && self->prev_saddr)
 
1075
        *sa = g_sockaddr_ref(self->prev_saddr);
 
1076
      goto exit;
 
1077
    }
 
1078
 
 
1079
  /* ok, no more messages in the buffer, read a chunk */
 
1080
  while (*may_read)
 
1081
    {
 
1082
      gint avail;
 
1083
 
 
1084
      if (self->super.flags & LPBS_NOMREAD)
 
1085
        *may_read = FALSE;
 
1086
 
 
1087
      /* read the next chunk to be processed */
 
1088
 
 
1089
      if (self->prev_saddr)
 
1090
        {
 
1091
          /* new chunk of data, potentially new sockaddr, forget the previous value */
 
1092
          g_sockaddr_unref(self->prev_saddr);
 
1093
          self->prev_saddr = NULL;
 
1094
        }
 
1095
 
 
1096
      if (!self->super.encoding)
 
1097
        {
 
1098
          /* no conversion, we read directly into our buffer */
 
1099
          raw_buffer = self->buffer + state->pending_buffer_end;
 
1100
          avail = state->buffer_size - state->pending_buffer_end;
 
1101
        }
 
1102
      else
 
1103
        {
 
1104
          /* if conversion is needed, we first read into an on-stack
 
1105
           * buffer, and then convert it into our internal buffer */
 
1106
 
 
1107
          raw_buffer = g_alloca(self->init_buffer_size + state->raw_buffer_leftover_size);
 
1108
          memcpy(raw_buffer, state->raw_buffer_leftover, state->raw_buffer_leftover_size);
 
1109
          avail = self->init_buffer_size;
 
1110
        }
 
1111
 
 
1112
      rc = self->read_data(self, raw_buffer + state->raw_buffer_leftover_size, avail, sa);
 
1113
      if (sa && *sa)
 
1114
        self->prev_saddr = *sa;
 
1115
      if (rc < 0)
 
1116
        {
 
1117
          if (errno == EAGAIN)
 
1118
            {
 
1119
              /* ok we don't have any more data to read, return to main poll loop */
 
1120
              break;
 
1121
            }
 
1122
          else
 
1123
            {
 
1124
              /* an error occurred while reading */
 
1125
              msg_error("I/O error occurred while reading",
 
1126
                        evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
 
1127
                        evt_tag_errno(EVT_TAG_OSERROR, errno),
 
1128
                        NULL);
 
1129
 
 
1130
              /* we set self->status explicitly as we want to return
 
1131
               * LPS_ERROR on the _next_ invocation, not now */
 
1132
              self->status = LPS_ERROR;
 
1133
              if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
 
1134
                {
 
1135
                  if (sa && self->prev_saddr)
 
1136
                    *sa = g_sockaddr_ref(self->prev_saddr);
 
1137
                  goto exit;
 
1138
                }
 
1139
              result = self->status;
 
1140
              goto exit;
 
1141
            }
 
1142
        }
 
1143
      else if (rc == 0)
 
1144
        {
 
1145
          if ((self->super.flags & LPBS_IGNORE_EOF) == 0)
 
1146
            {
 
1147
              /* EOF read */
 
1148
              msg_verbose("EOF occurred while reading",
 
1149
                          evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
 
1150
                          NULL);
 
1151
              if (state->raw_buffer_leftover_size > 0)
 
1152
                {
 
1153
                  msg_error("EOF read on a channel with leftovers from previous character conversion, dropping input",
 
1154
                            NULL);
 
1155
                  result = LPS_EOF;
 
1156
                  goto exit;
 
1157
                }
 
1158
              self->status = LPS_EOF;
 
1159
              if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
 
1160
                {
 
1161
                  if (sa && self->prev_saddr)
 
1162
                    *sa = g_sockaddr_ref(self->prev_saddr);
 
1163
                  goto exit;
 
1164
                }
 
1165
              result = self->status;
 
1166
              goto exit;
 
1167
            }
 
1168
          else
 
1169
            {
 
1170
              *msg = NULL;
 
1171
              *msg_len = 0;
 
1172
              goto exit;
 
1173
            }
 
1174
        }
 
1175
      else
 
1176
        {
 
1177
          state->pending_raw_buffer_size += rc;
 
1178
          rc += state->raw_buffer_leftover_size;
 
1179
          state->raw_buffer_leftover_size = 0;
 
1180
 
 
1181
          if (self->super.encoding)
 
1182
            {
 
1183
              if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
 
1184
                {
 
1185
                  result = LPS_ERROR;
 
1186
                  goto exit;
 
1187
                }
 
1188
            }
 
1189
          else
 
1190
            {
 
1191
              state->pending_buffer_end += rc;
 
1192
            }
 
1193
 
 
1194
          if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
 
1195
            {
 
1196
              if (sa && self->prev_saddr)
 
1197
                *sa = g_sockaddr_ref(self->prev_saddr);
 
1198
              goto exit;
 
1199
            }
 
1200
        }
 
1201
    }
 
1202
 exit:
 
1203
 
 
1204
  /* result contains our result, but once an error happens, the error condition remains persistent */
 
1205
  log_proto_buffered_server_put_state(self);
 
1206
  if (result != LPS_SUCCESS)
 
1207
    self->status = result;
 
1208
  return result;
 
1209
}
 
1210
 
 
1211
void
 
1212
log_proto_buffered_server_queued(LogProto *s)
 
1213
{
 
1214
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
 
1215
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
 
1216
 
 
1217
  /* NOTE: we modify the current file position _after_ updating
 
1218
     buffer_pos, since if we crash right here, at least we
 
1219
     won't lose data on the next restart, but rather we
 
1220
     duplicate some data */
 
1221
 
 
1222
  state->buffer_pos = state->pending_buffer_pos;
 
1223
  state->raw_stream_pos = state->pending_raw_stream_pos;
 
1224
  state->raw_buffer_size = state->pending_raw_buffer_size;
 
1225
  if (state->pending_buffer_pos == state->pending_buffer_end)
 
1226
    {
 
1227
      state->pending_buffer_end = 0;
 
1228
      state->buffer_pos = state->pending_buffer_pos = 0;
 
1229
    }
 
1230
  if (self->super.flags & LPBS_POS_TRACKING)
 
1231
    {
 
1232
      if (state->buffer_pos == state->pending_buffer_end)
 
1233
        {
 
1234
          state->raw_stream_pos += state->raw_buffer_size;
 
1235
          state->raw_buffer_size = 0;
 
1236
        }
 
1237
    }
 
1238
  msg_trace("Last message got confirmed",
 
1239
            evt_tag_int("raw_stream_pos", state->raw_stream_pos),
 
1240
            evt_tag_int("raw_buffer_len", state->raw_buffer_size),
 
1241
            evt_tag_int("buffer_pos", state->buffer_pos),
 
1242
            evt_tag_int("buffer_end", state->pending_buffer_end),
 
1243
            evt_tag_int("buffer_cached_eol", state->buffer_cached_eol),
 
1244
            NULL);
 
1245
  log_proto_buffered_server_put_state(self);
 
1246
}
 
1247
 
 
1248
void
 
1249
log_proto_buffered_server_free(LogProto *s)
 
1250
{
 
1251
  LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
 
1252
 
 
1253
  g_sockaddr_unref(self->prev_saddr);
 
1254
 
 
1255
  g_free(self->buffer);
 
1256
  if (self->state1)
 
1257
    {
 
1258
      g_free(self->state1);
 
1259
    }
 
1260
}
 
1261
 
 
1262
void
 
1263
log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *transport, gint max_buffer_size, gint init_buffer_size, guint flags)
 
1264
{
 
1265
  self->super.prepare = log_proto_buffered_server_prepare;
 
1266
  self->super.fetch = log_proto_buffered_server_fetch;
 
1267
  self->super.queued = log_proto_buffered_server_queued;
 
1268
  self->super.free_fn = log_proto_buffered_server_free;
 
1269
  self->super.transport = transport;
 
1270
  self->super.convert = (GIConv) -1;
 
1271
  self->super.restart_with_state = log_proto_buffered_server_restart_with_state;
 
1272
  self->read_data = log_proto_buffered_server_read_data;
 
1273
 
 
1274
  self->super.flags = flags;
 
1275
 
 
1276
  self->init_buffer_size = init_buffer_size;
 
1277
  self->max_buffer_size = max_buffer_size;
 
1278
}
 
1279
 
 
1280
struct _LogProtoTextServer
 
1281
{
 
1282
  LogProtoBufferedServer super;
 
1283
  GIConv reverse_convert;
 
1284
  gchar *reverse_buffer;
 
1285
  gsize reverse_buffer_len;
 
1286
  gint convert_scale;
 
1287
};
 
1288
 
 
1289
/**
 
1290
 * This function is called in cases when several files are continously
 
1291
 * polled for changes.  Whenever the caller would like to switch to another
 
1292
 * file, it will call this function to check whether it should be allowed to do so.
 
1293
 *
 
1294
 * This function returns true if the current state of this LogProto would
 
1295
 * allow preemption, e.g.  the contents of the current buffer can be
 
1296
 * discarded.
 
1297
 **/
 
1298
gboolean
 
1299
log_proto_text_server_is_preemptable(LogProtoTextServer *self)
 
1300
{
 
1301
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
 
1302
  gboolean preemptable;
 
1303
 
 
1304
  preemptable = (state->buffer_cached_eol == 0);
 
1305
  log_proto_buffered_server_put_state(&self->super);
 
1306
  return preemptable;
 
1307
}
 
1308
 
 
1309
static gboolean
 
1310
log_proto_text_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
 
1311
{
 
1312
  LogProtoTextServer *self = (LogProtoTextServer *) s;
 
1313
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
 
1314
  gboolean avail;
 
1315
 
 
1316
  if (log_proto_buffered_server_prepare(s, fd, cond))
 
1317
    {
 
1318
      log_proto_buffered_server_put_state(&self->super);
 
1319
      return TRUE;
 
1320
    }
 
1321
 
 
1322
  avail = (state->buffer_cached_eol != 0);
 
1323
  log_proto_buffered_server_put_state(&self->super);
 
1324
  return avail;
 
1325
}
 
1326
 
 
1327
/**
 
1328
 * Find the character terminating the buffer.
 
1329
 *
 
1330
 * NOTE: when looking for the end-of-message here, it either needs to be
 
1331
 * terminated via NUL or via NL, when terminating via NL we have to make
 
1332
 * sure that there's no NUL left in the message. This function iterates over
 
1333
 * the input data and returns a pointer to the first occurence of NL or NUL.
 
1334
 *
 
1335
 * It uses an algorithm similar to what there's in libc memchr/strchr.
 
1336
 *
 
1337
 * NOTE: find_eom is not static as it is used by a unit test program.
 
1338
 **/
 
1339
const guchar *
 
1340
find_eom(const guchar *s, gsize n)
 
1341
{
 
1342
  const guchar *char_ptr;
 
1343
  const gulong *longword_ptr;
 
1344
  gulong longword, magic_bits, charmask;
 
1345
  gchar c;
 
1346
 
 
1347
  c = '\n';
 
1348
 
 
1349
  /* align input to long boundary */
 
1350
  for (char_ptr = s; n > 0 && ((gulong) char_ptr & (sizeof(longword) - 1)) != 0; ++char_ptr, n--)
 
1351
    {
 
1352
      if (*char_ptr == c || *char_ptr == '\0')
 
1353
        return char_ptr;
 
1354
    }
 
1355
 
 
1356
  longword_ptr = (gulong *) char_ptr;
 
1357
 
 
1358
#if GLIB_SIZEOF_LONG == 8
 
1359
  magic_bits = 0x7efefefefefefeffL;
 
1360
#elif GLIB_SIZEOF_LONG == 4
 
1361
  magic_bits = 0x7efefeffL;
 
1362
#else
 
1363
  #error "unknown architecture"
 
1364
#endif
 
1365
  memset(&charmask, c, sizeof(charmask));
 
1366
 
 
1367
  while (n > sizeof(longword))
 
1368
    {
 
1369
      longword = *longword_ptr++;
 
1370
      if ((((longword + magic_bits) ^ ~longword) & ~magic_bits) != 0 ||
 
1371
          ((((longword ^ charmask) + magic_bits) ^ ~(longword ^ charmask)) & ~magic_bits) != 0)
 
1372
        {
 
1373
          gint i;
 
1374
 
 
1375
          char_ptr = (const guchar *) (longword_ptr - 1);
 
1376
 
 
1377
          for (i = 0; i < sizeof(longword); i++)
 
1378
            {
 
1379
              if (*char_ptr == c || *char_ptr == '\0')
 
1380
                return char_ptr;
 
1381
              char_ptr++;
 
1382
            }
 
1383
        }
 
1384
      n -= sizeof(longword);
 
1385
    }
 
1386
 
 
1387
  char_ptr = (const guchar *) longword_ptr;
 
1388
 
 
1389
  while (n-- > 0)
 
1390
    {
 
1391
      if (*char_ptr == c || *char_ptr == '\0')
 
1392
        return char_ptr;
 
1393
      ++char_ptr;
 
1394
    }
 
1395
 
 
1396
  return NULL;
 
1397
}
 
1398
 
 
1399
struct
 
1400
{
 
1401
  const gchar *prefix;
 
1402
  gint scale;
 
1403
} fixed_encodings[] = {
 
1404
  { "ascii", 1 },
 
1405
  { "us-ascii", 1 },
 
1406
  { "iso-8859", 1 },
 
1407
  { "iso8859", 1 },
 
1408
  { "latin", 1 },
 
1409
  { "ucs2", 2 },
 
1410
  { "ucs-2", 2 },
 
1411
  { "ucs4", 4 },
 
1412
  { "ucs-4", 4 },
 
1413
  { "koi", 1 },
 
1414
  { "unicode", 2 },
 
1415
  { "windows", 1 },
 
1416
  { "wchar_t", sizeof(wchar_t) },
 
1417
  { NULL, 0 }
 
1418
};
 
1419
 
 
1420
/*
 
1421
 * returns the number of bytes that represent the UTF8 encoding buffer
 
1422
 * in the original encoding that the user specified.
 
1423
 *
 
1424
 * NOTE: this is slow, but we only call this for the remainder of our
 
1425
 * buffer (e.g. the partial line at the end of our last chunk of read
 
1426
 * data). Also, this is only invoked if the file uses an encoding.
 
1427
 */
 
1428
static gsize
 
1429
log_proto_text_server_get_raw_size_of_buffer(LogProtoTextServer *self, const guchar *buffer, gsize buffer_len)
 
1430
{
 
1431
  gchar *out;
 
1432
  const guchar *in;
 
1433
  gsize avail_out, avail_in;
 
1434
  gint ret;
 
1435
 
 
1436
  if (self->reverse_convert == ((GIConv) -1) && !self->convert_scale)
 
1437
    {
 
1438
      gint i;
 
1439
 
 
1440
      /* try to speed up raw size calculation by recognizing the most
 
1441
       * prominent character encodings and in the case the encoding
 
1442
       * uses fixed size characters set that in self->convert_scale,
 
1443
       * which in turn will speed up the reversal of the UTF8 buffer
 
1444
       * size to raw buffer sizes.
 
1445
       */
 
1446
      for (i = 0; fixed_encodings[i].prefix; i++)
 
1447
        {
 
1448
          if (strncasecmp(self->super.super.encoding, fixed_encodings[i].prefix, strlen(fixed_encodings[i].prefix) == 0))
 
1449
            {
 
1450
              self->convert_scale = fixed_encodings[i].scale;
 
1451
              break;
 
1452
            }
 
1453
        }
 
1454
      if (!fixed_encodings[i].prefix)
 
1455
        {
 
1456
          self->reverse_convert = g_iconv_open(self->super.super.encoding, "utf-8");
 
1457
        }
 
1458
    }
 
1459
 
 
1460
  if (self->convert_scale)
 
1461
    return g_utf8_strlen((gchar *) buffer, buffer_len) * self->convert_scale;
 
1462
 
 
1463
  if (self->reverse_buffer_len < buffer_len * 6)
 
1464
    {
 
1465
      /* we free and malloc, since we never need the data still in reverse buffer */
 
1466
      g_free(self->reverse_buffer);
 
1467
      self->reverse_buffer_len = buffer_len * 6;
 
1468
      self->reverse_buffer = g_malloc(buffer_len * 6);
 
1469
    }
 
1470
 
 
1471
  avail_out = self->reverse_buffer_len;
 
1472
  out = self->reverse_buffer;
 
1473
 
 
1474
  avail_in = buffer_len;
 
1475
  in = buffer;
 
1476
 
 
1477
  ret = g_iconv(self->reverse_convert, (gchar **) &in, &avail_in, &out, &avail_out);
 
1478
  if (ret == (gsize) -1)
 
1479
    {
 
1480
      /* oops, we cannot reverse that we ourselves converted to UTF-8,
 
1481
       * this is simply impossible, but never say never */
 
1482
      msg_error("Internal error, couldn't reverse the internal UTF8 string to the original encoding",
 
1483
                evt_tag_printf("buffer", "%.*s", (gint) buffer_len, buffer),
 
1484
                NULL);
 
1485
      return 0;
 
1486
    }
 
1487
  else
 
1488
    {
 
1489
      return self->reverse_buffer_len - avail_out;
 
1490
    }
 
1491
}
 
1492
 
 
1493
 
 
1494
/**
 
1495
 * log_proto_text_server_fetch_from_buf:
 
1496
 * @self: LogReader instance
 
1497
 * @saddr: socket address to be assigned to new messages (consumed!)
 
1498
 * @flush: whether to flush the input buffer
 
1499
 * @msg_counter: the number of messages processed in the current poll iteration
 
1500
 *
 
1501
 * Returns TRUE if a message was found in the buffer, FALSE if we need to read again.
 
1502
 **/
 
1503
static gboolean
 
1504
log_proto_text_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
 
1505
{
 
1506
  LogProtoTextServer *self = (LogProtoTextServer *) s;
 
1507
  const guchar *eol;
 
1508
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
 
1509
  gboolean result = FALSE;
 
1510
 
 
1511
  if (flush_the_rest)
 
1512
    {
 
1513
      /*
 
1514
       * we are set to packet terminating mode or the connection is to
 
1515
       * be teared down and we have partial data in our buffer.
 
1516
       */
 
1517
      *msg = buffer_start;
 
1518
      *msg_len = buffer_bytes;
 
1519
      state->pending_buffer_pos = state->pending_buffer_end;
 
1520
      goto success;
 
1521
    }
 
1522
 
 
1523
  if (state->buffer_cached_eol)
 
1524
    {
 
1525
      /* previous invocation was nice enough to save a cached EOL
 
1526
       * pointer, no need to look it up again */
 
1527
 
 
1528
      eol = self->super.buffer + state->buffer_cached_eol;
 
1529
      state->buffer_cached_eol = 0;
 
1530
    }
 
1531
  else
 
1532
    {
 
1533
      eol = find_eom(buffer_start, buffer_bytes);
 
1534
    }
 
1535
  if ((!eol && (buffer_bytes == state->buffer_size)))
 
1536
    {
 
1537
      /* our buffer is full and no EOL was found */
 
1538
      *msg_len = buffer_bytes;
 
1539
      state->pending_buffer_pos = state->pending_buffer_end;
 
1540
      *msg = buffer_start;
 
1541
      goto success;
 
1542
    }
 
1543
  else if (!eol)
 
1544
    {
 
1545
      gsize raw_split_size;
 
1546
 
 
1547
      /* buffer is not full, but no EOL is present, move partial line
 
1548
       * to the beginning of the buffer to make space for new data.
 
1549
       */
 
1550
 
 
1551
      memmove(self->super.buffer, buffer_start, buffer_bytes);
 
1552
      state->pending_buffer_pos = 0;
 
1553
      state->pending_buffer_end = buffer_bytes;
 
1554
 
 
1555
      if (G_UNLIKELY(self->super.super.flags & LPBS_POS_TRACKING))
 
1556
        {
 
1557
          /* NOTE: we modify the current file position _after_ updating
 
1558
             buffer_pos, since if we crash right here, at least we
 
1559
             won't lose data on the next restart, but rather we
 
1560
             duplicate some data */
 
1561
 
 
1562
          if (self->super.super.encoding)
 
1563
            raw_split_size = log_proto_text_server_get_raw_size_of_buffer(self, buffer_start, buffer_bytes);
 
1564
          else
 
1565
            raw_split_size = buffer_bytes;
 
1566
 
 
1567
          state->pending_raw_stream_pos += (gint64) (state->pending_raw_buffer_size - raw_split_size);
 
1568
          state->pending_raw_buffer_size = raw_split_size;
 
1569
 
 
1570
          msg_trace("Buffer split",
 
1571
                    evt_tag_int("raw_split_size", raw_split_size),
 
1572
                    evt_tag_int("buffer_bytes", buffer_bytes),
 
1573
                    NULL);
 
1574
        }
 
1575
      goto exit;
 
1576
    }
 
1577
  else
 
1578
    {
 
1579
      const guchar *msg_end = eol;
 
1580
 
 
1581
      /* eol points at the newline character. end points at the
 
1582
       * character terminating the line, which may be a carriage
 
1583
       * return preceeding the newline. */
 
1584
 
 
1585
      while ((msg_end > buffer_start) && (msg_end[-1] == '\r' || msg_end[-1] == '\n' || msg_end[-1] == 0))
 
1586
        msg_end--;
 
1587
 
 
1588
      *msg_len = msg_end - buffer_start;
 
1589
      *msg = buffer_start;
 
1590
      state->pending_buffer_pos = eol + 1 - self->super.buffer;
 
1591
 
 
1592
      if (state->pending_buffer_end != state->pending_buffer_pos)
 
1593
        {
 
1594
          const guchar *eom;
 
1595
          /* store the end of the next line, it indicates whether we need
 
1596
           * to read further data, or the buffer already contains a
 
1597
           * complete line */
 
1598
          eom = find_eom(self->super.buffer + state->pending_buffer_pos, state->pending_buffer_end - state->pending_buffer_pos);
 
1599
          if (eom)
 
1600
            state->buffer_cached_eol = eom - self->super.buffer;
 
1601
          else
 
1602
            state->buffer_cached_eol = 0;
 
1603
        }
 
1604
      else
 
1605
        {
 
1606
          state->pending_buffer_pos = state->pending_buffer_end;
 
1607
        }
 
1608
      goto success;
 
1609
    }
 
1610
 success:
 
1611
  result = TRUE;
 
1612
 exit:
 
1613
  log_proto_buffered_server_put_state(&self->super);
 
1614
  return result;
 
1615
}
 
1616
 
 
1617
void
 
1618
log_proto_text_server_free(LogProtoTextServer *self)
 
1619
{
 
1620
  if (self->reverse_convert != (GIConv) -1)
 
1621
    g_iconv_close(self->reverse_convert);
 
1622
 
 
1623
  g_free(self->reverse_buffer);
 
1624
  log_proto_buffered_server_free(&self->super.super);
 
1625
}
 
1626
 
 
1627
static void
 
1628
log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, gint max_msg_size, guint flags)
 
1629
{
 
1630
  log_proto_buffered_server_init(&self->super, transport, max_msg_size * 6, max_msg_size, flags);
 
1631
  self->super.fetch_from_buf = log_proto_text_server_fetch_from_buf;
 
1632
  self->super.super.prepare = log_proto_text_server_prepare;
 
1633
  self->reverse_convert = (GIConv) -1;
 
1634
}
 
1635
 
 
1636
LogProto *
 
1637
log_proto_text_server_new(LogTransport *transport, gint max_msg_size, guint flags)
 
1638
{
 
1639
  LogProtoTextServer *self = g_new0(LogProtoTextServer, 1);
 
1640
 
 
1641
  log_proto_text_server_init(self, transport, max_msg_size, flags);
 
1642
  return &self->super.super;
 
1643
}
 
1644
 
 
1645
/* proto that reads the stream in even sized chunks */
 
1646
typedef struct _LogProtoRecordServer LogProtoRecordServer;
 
1647
struct _LogProtoRecordServer
 
1648
{
 
1649
  LogProtoBufferedServer super;
 
1650
  gsize record_size;
 
1651
};
 
1652
 
 
1653
static gboolean
 
1654
log_proto_record_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
 
1655
{
 
1656
  LogProtoRecordServer *self = (LogProtoRecordServer *) s;
 
1657
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(s);
 
1658
  const guchar *eol;
 
1659
 
 
1660
  if (!(self->super.super.flags & LPRS_BINARY))
 
1661
    {
 
1662
      eol = find_eom(buffer_start, buffer_bytes);
 
1663
      *msg_len = (eol ? eol - buffer_start : buffer_bytes);
 
1664
    }
 
1665
  else
 
1666
    {
 
1667
      *msg_len = buffer_bytes;
 
1668
    }
 
1669
  state->pending_buffer_pos = state->pending_buffer_end;
 
1670
  *msg = buffer_start;
 
1671
  log_proto_buffered_server_put_state(s);
 
1672
  return TRUE;
 
1673
}
 
1674
 
 
1675
static gint
 
1676
log_proto_record_server_read_data(LogProtoBufferedServer *s, guchar *buf, gsize len, GSockAddr **sa)
 
1677
{
 
1678
  LogProtoRecordServer *self = (LogProtoRecordServer *) s;
 
1679
  gint rc;
 
1680
 
 
1681
  g_assert(len <= self->record_size);
 
1682
  len = self->record_size;
 
1683
  rc = log_transport_read(self->super.super.transport, buf, len, sa);
 
1684
  if (rc > 0 && rc != self->record_size)
 
1685
    {
 
1686
      msg_error("Padding was set, and couldn't read enough bytes",
 
1687
                evt_tag_int(EVT_TAG_FD, self->super.super.transport->fd),
 
1688
                evt_tag_int("padding", self->record_size),
 
1689
                evt_tag_int("read", rc),
 
1690
                NULL);
 
1691
      errno = EIO;
 
1692
      return -1;
 
1693
    }
 
1694
  return rc;
 
1695
}
 
1696
 
 
1697
LogProto *
 
1698
log_proto_record_server_new(LogTransport *transport, gint record_size, guint flags)
 
1699
{
 
1700
  LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
 
1701
 
 
1702
  log_proto_buffered_server_init(&self->super, transport, record_size * 6, record_size, flags);
 
1703
  self->super.fetch_from_buf = log_proto_record_server_fetch_from_buf;
 
1704
  self->super.read_data = log_proto_record_server_read_data;
 
1705
  self->record_size = record_size;
 
1706
  return &self->super.super;
 
1707
}
 
1708
 
 
1709
/* proto that reads the stream in even sized chunks */
 
1710
typedef struct _LogProtoDGramServer LogProtoDGramServer;
 
1711
struct _LogProtoDGramServer
 
1712
{
 
1713
  LogProtoBufferedServer super;
 
1714
};
 
1715
 
 
1716
static gboolean
 
1717
log_proto_dgram_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
 
1718
{
 
1719
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(s);
 
1720
 
 
1721
  /*
 
1722
   * we are set to packet terminating mode
 
1723
   */
 
1724
  *msg = buffer_start;
 
1725
  *msg_len = buffer_bytes;
 
1726
  state->pending_buffer_pos = state->pending_buffer_end;
 
1727
  log_proto_buffered_server_put_state(s);
 
1728
  return TRUE;
 
1729
}
 
1730
 
 
1731
LogProto *
 
1732
log_proto_dgram_server_new(LogTransport *transport, gint max_msg_size, guint flags)
 
1733
{
 
1734
  LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
 
1735
 
 
1736
  log_proto_buffered_server_init(&self->super, transport, max_msg_size * 6, max_msg_size, flags | LPBS_IGNORE_EOF);
 
1737
  self->super.fetch_from_buf = log_proto_dgram_server_fetch_from_buf;
 
1738
  return &self->super.super;
 
1739
}
 
1740
 
 
1741
#define LPFCS_FRAME_INIT    0
 
1742
#define LPFCS_FRAME_SEND    1
 
1743
#define LPFCS_MESSAGE_SEND  2
 
1744
 
 
1745
typedef struct _LogProtoFramedClient
 
1746
{
 
1747
  LogProtoTextClient super;
 
1748
  gint state;
 
1749
  gchar frame_hdr_buf[9];
 
1750
  gint frame_hdr_len, frame_hdr_pos;
 
1751
} LogProtoFramedClient;
 
1752
 
 
1753
static LogProtoStatus
 
1754
log_proto_framed_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
 
1755
{
 
1756
  LogProtoFramedClient *self = (LogProtoFramedClient *) s;
 
1757
  gint rc;
 
1758
 
 
1759
  if (msg_len > 9999999)
 
1760
    {
 
1761
      static const guchar *warn_msg;
 
1762
      
 
1763
      if (warn_msg != msg)
 
1764
        {
 
1765
          msg_warning("Error, message length too large for framed protocol, truncated",
 
1766
                      evt_tag_int("length", msg_len),
 
1767
                      NULL);
 
1768
          warn_msg = msg;
 
1769
        }
 
1770
      msg_len = 9999999;
 
1771
    }
 
1772
  switch (self->state)
 
1773
    {
 
1774
    case LPFCS_FRAME_INIT:
 
1775
      self->frame_hdr_len = g_snprintf(self->frame_hdr_buf, sizeof(self->frame_hdr_buf), "%" G_GSIZE_FORMAT" ", msg_len);
 
1776
      self->frame_hdr_pos = 0;
 
1777
      self->state = LPFCS_FRAME_SEND;
 
1778
    case LPFCS_FRAME_SEND:
 
1779
      rc = log_transport_write(s->transport, &self->frame_hdr_buf[self->frame_hdr_pos], self->frame_hdr_len - self->frame_hdr_pos);
 
1780
      if (rc < 0)
 
1781
        {
 
1782
          if (errno != EAGAIN)
 
1783
            {
 
1784
              msg_error("I/O error occurred while writing",
 
1785
                        evt_tag_int("fd", self->super.super.transport->fd),
 
1786
                        evt_tag_errno(EVT_TAG_OSERROR, errno),
 
1787
                        NULL);
 
1788
              return LPS_ERROR;
 
1789
            }
 
1790
          break;
 
1791
        }
 
1792
      else
 
1793
        {
 
1794
          self->frame_hdr_pos += rc;
 
1795
          if (self->frame_hdr_pos != self->frame_hdr_len)
 
1796
            break;
 
1797
          self->state = LPFCS_MESSAGE_SEND;
 
1798
        }
 
1799
    case LPFCS_MESSAGE_SEND:
 
1800
      rc = log_proto_text_client_post(s, msg, msg_len, consumed);
 
1801
      
 
1802
      /* NOTE: we don't check *consumed here, as we might have a pending
 
1803
       * message in self->partial before we begin, in which case *consumed
 
1804
       * will be FALSE. */
 
1805
      
 
1806
      if (rc == LPS_SUCCESS && self->super.partial == NULL)
 
1807
        {
 
1808
          self->state = LPFCS_FRAME_INIT;
 
1809
        }
 
1810
      return rc;
 
1811
    default:
 
1812
      g_assert_not_reached();
 
1813
    }
 
1814
  return LPS_SUCCESS;
 
1815
}
 
1816
 
 
1817
LogProto *
 
1818
log_proto_framed_client_new(LogTransport *transport)
 
1819
{
 
1820
  LogProtoFramedClient *self = g_new0(LogProtoFramedClient, 1);
 
1821
 
 
1822
  self->super.super.prepare = log_proto_text_client_prepare;
 
1823
  self->super.super.post = log_proto_framed_client_post;
 
1824
  self->super.super.flush = log_proto_text_client_flush;
 
1825
  self->super.super.transport = transport;
 
1826
  self->super.super.convert = (GIConv) -1;
 
1827
  return &self->super.super;  
 
1828
}
 
1829
 
 
1830
#define LPFSS_FRAME_READ    0
 
1831
#define LPFSS_MESSAGE_READ  1
 
1832
 
 
1833
#define LPFS_FRAME_BUFFER 10
 
1834
 
 
1835
typedef struct _LogProtoFramedServer
 
1836
{
 
1837
  LogProto super;
 
1838
  gint state;
 
1839
 
 
1840
  guchar *buffer;
 
1841
  gsize buffer_size, buffer_pos, buffer_end;
 
1842
  gsize frame_len;
 
1843
  gsize max_msg_size;
 
1844
  gboolean half_message_in_buffer;
 
1845
  GSockAddr *prev_saddr;
 
1846
  LogProtoStatus status;
 
1847
} LogProtoFramedServer;
 
1848
 
 
1849
static gboolean
 
1850
log_proto_framed_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
 
1851
{
 
1852
  LogProtoFramedServer *self = (LogProtoFramedServer *) s;
 
1853
 
 
1854
  *fd = self->super.transport->fd;
 
1855
  *cond = self->super.transport->cond;
 
1856
 
 
1857
  /* there is a half message in our buffer so try to wait */
 
1858
  if (!self->half_message_in_buffer)
 
1859
    {
 
1860
      if (self->buffer_pos != self->buffer_end)
 
1861
        {
 
1862
          /* we have a full message in our buffer so parse it without reading new data from the transport layer */
 
1863
          return TRUE;
 
1864
        }
 
1865
    }
 
1866
 
 
1867
  /* if there's no pending I/O in the transport layer, then we want to do a read */
 
1868
  if (*cond == 0)
 
1869
    *cond = G_IO_IN;
 
1870
 
 
1871
  return FALSE;
 
1872
}
 
1873
 
 
1874
static LogProtoStatus
 
1875
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read)
 
1876
{
 
1877
  gint rc;
 
1878
 
 
1879
  if (self->buffer_pos == self->buffer_end)
 
1880
    self->buffer_pos = self->buffer_end = 0;
 
1881
 
 
1882
  if (self->buffer_size == self->buffer_end)
 
1883
    {
 
1884
      /* no more space in the buffer, we can't fetch further data. Move the
 
1885
       * things we already have to the beginning of the buffer to make
 
1886
       * space. */
 
1887
 
 
1888
      memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
 
1889
      self->buffer_end = self->buffer_end - self->buffer_pos;
 
1890
      self->buffer_pos = 0;
 
1891
    }
 
1892
 
 
1893
  if (!(*may_read))
 
1894
    return LPS_SUCCESS;
 
1895
 
 
1896
  rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end, NULL);
 
1897
      
 
1898
  if (rc < 0)
 
1899
    {
 
1900
      if (errno != EAGAIN)
 
1901
        {
 
1902
          msg_error("Error reading frame header",
 
1903
                    evt_tag_int("fd", self->super.transport->fd),
 
1904
                    evt_tag_errno("error", errno),
 
1905
                    NULL);
 
1906
          return LPS_ERROR;
 
1907
        }
 
1908
      else
 
1909
        {
 
1910
          /* we need more data to parse this message but the data is not available yet */
 
1911
          self->half_message_in_buffer = TRUE;
 
1912
        }
 
1913
    }
 
1914
  else if (rc == 0)
 
1915
    {
 
1916
      msg_verbose("EOF occurred while reading",
 
1917
                  evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
 
1918
                  NULL);
 
1919
      return LPS_EOF;
 
1920
    }
 
1921
  else 
 
1922
    {
 
1923
      self->buffer_end += rc;
 
1924
    }
 
1925
  return LPS_SUCCESS;
 
1926
  
 
1927
}
 
1928
 
 
1929
static gboolean
 
1930
log_proto_framed_server_extract_frame_length(LogProtoFramedServer *self, gboolean *need_more_data)
 
1931
{
 
1932
  gint i;
 
1933
 
 
1934
  *need_more_data = TRUE;
 
1935
  self->frame_len = 0;
 
1936
  for (i = self->buffer_pos; i < self->buffer_end; i++)
 
1937
    {
 
1938
      if (isdigit(self->buffer[i]) && (i - self->buffer_pos < 10))
 
1939
        {
 
1940
          self->frame_len = self->frame_len * 10 + (self->buffer[i] - '0');
 
1941
        }
 
1942
      else if (self->buffer[i] == ' ')
 
1943
        {
 
1944
          *need_more_data = FALSE;
 
1945
          self->buffer_pos = i + 1;
 
1946
          return TRUE;
 
1947
        }
 
1948
      else
 
1949
        {
 
1950
          msg_error("Invalid frame header", 
 
1951
                    evt_tag_printf("header", "%.*s", (gint) (i - self->buffer_pos), &self->buffer[self->buffer_pos]),
 
1952
                    NULL);
 
1953
          return FALSE;
 
1954
        }
 
1955
    }
 
1956
  /* couldn't extract frame header, no error but need more data */
 
1957
  return TRUE;
 
1958
}
 
1959
 
 
1960
static LogProtoStatus
 
1961
log_proto_framed_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
 
1962
{
 
1963
  LogProtoFramedServer *self = (LogProtoFramedServer *) s;
 
1964
  LogProtoStatus status;
 
1965
  gboolean try_read, need_more_data;
 
1966
 
 
1967
  if (sa)
 
1968
    *sa = NULL;
 
1969
  switch (self->state)
 
1970
    {
 
1971
    case LPFSS_FRAME_READ:
 
1972
 
 
1973
      try_read = TRUE;
 
1974
 
 
1975
    read_frame:
 
1976
      if (!log_proto_framed_server_extract_frame_length(self, &need_more_data))
 
1977
        {
 
1978
          /* invalid frame header */
 
1979
          return LPS_ERROR;
 
1980
        }
 
1981
      if (need_more_data && try_read)
 
1982
        {
 
1983
          status = log_proto_framed_server_fetch_data(self, may_read);
 
1984
          if (status != LPS_SUCCESS)
 
1985
            return status;
 
1986
          try_read = FALSE;
 
1987
          goto read_frame;
 
1988
        }
 
1989
 
 
1990
      if (!need_more_data)
 
1991
        {
 
1992
          self->state = LPFSS_MESSAGE_READ;
 
1993
          if (self->frame_len > self->max_msg_size)
 
1994
            {
 
1995
              msg_error("Incoming frame larger than log_msg_size()",
 
1996
                        evt_tag_int("log_msg_size", self->buffer_size - LPFS_FRAME_BUFFER),
 
1997
                        evt_tag_int("frame_length", self->frame_len),
 
1998
                        NULL);
 
1999
              return LPS_ERROR;
 
2000
            }
 
2001
          if (self->buffer_pos + self->frame_len > self->buffer_size)
 
2002
            {
 
2003
              /* message would be too large to fit into the buffer at
 
2004
               * buffer_pos, we need to move data to the beginning of
 
2005
               * the buffer to make space, and once we are at it, move
 
2006
               * to the beginning to make space for the maximum number
 
2007
               * of messages before the next shift */
 
2008
              memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
 
2009
              self->buffer_end = self->buffer_end - self->buffer_pos;
 
2010
              self->buffer_pos = 0;
 
2011
            }
 
2012
          goto read_message;
 
2013
        }
 
2014
      break;
 
2015
    case LPFSS_MESSAGE_READ:
 
2016
 
 
2017
      try_read = TRUE;
 
2018
    read_message:
 
2019
      /* NOTE: here we can assume that the complete message fits into
 
2020
       * the buffer because of the checks/move operation in the 
 
2021
       * LPFSS_FRAME_READ state */
 
2022
      if (self->buffer_end - self->buffer_pos >= self->frame_len)
 
2023
        {
 
2024
          /* ok, we already have the complete message */
 
2025
 
 
2026
          *msg = &self->buffer[self->buffer_pos];
 
2027
          *msg_len = self->frame_len;
 
2028
          self->buffer_pos += self->frame_len;
 
2029
          self->state = LPFSS_FRAME_READ;
 
2030
 
 
2031
          /* we have the full message here so reset the half message flag */
 
2032
          self->half_message_in_buffer = FALSE;
 
2033
          return LPS_SUCCESS;
 
2034
        }
 
2035
      if (try_read)
 
2036
        {
 
2037
          status = log_proto_framed_server_fetch_data(self, may_read);
 
2038
          if (status != LPS_SUCCESS)
 
2039
            return status;
 
2040
          try_read = FALSE;
 
2041
          goto read_message;
 
2042
        }
 
2043
      break;
 
2044
    default:
 
2045
      break;
 
2046
    }
 
2047
  return LPS_SUCCESS;
 
2048
}
 
2049
 
 
2050
static void
 
2051
log_proto_framed_server_free(LogProto *s)
 
2052
{
 
2053
  LogProtoFramedServer *self = (LogProtoFramedServer *) s;
 
2054
  g_free(self->buffer);
 
2055
}
 
2056
 
 
2057
LogProto *
 
2058
log_proto_framed_server_new(LogTransport *transport, gint max_msg_size)
 
2059
{
 
2060
  LogProtoFramedServer *self = g_new0(LogProtoFramedServer, 1);
 
2061
 
 
2062
  self->super.prepare = log_proto_framed_server_prepare;
 
2063
  self->super.fetch = log_proto_framed_server_fetch;
 
2064
  self->super.free_fn = log_proto_framed_server_free;
 
2065
  self->super.transport = transport;
 
2066
  self->super.convert = (GIConv) -1;
 
2067
  /* max message + frame header */
 
2068
  self->max_msg_size = max_msg_size;
 
2069
  self->buffer_size = max_msg_size + LPFS_FRAME_BUFFER;
 
2070
  self->buffer = g_malloc(self->buffer_size);
 
2071
  self->half_message_in_buffer = FALSE;
 
2072
  return &self->super;
 
2073
}
 
2074