2
* Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
3
* Copyright (c) 1998-2010 Balázs Scheidler
5
* This library is free software; you can redistribute it and/or
6
* modify it under the terms of the GNU Lesser General Public
7
* License as published by the Free Software Foundation; either
8
* version 2.1 of the License, or (at your option) any later version.
10
* This library is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13
* Lesser General Public License for more details.
15
* You should have received a copy of the GNU Lesser General Public
16
* License along with this library; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* As an additional exemption you are allowed to compile & link against the
20
* OpenSSL libraries as published by the OpenSSL project. See the file
21
* COPYING for details.
28
#include "persist-state.h"
39
/*
 * Switch the character set converter of @self to @encoding.
 *
 * Visible steps: an already open iconv descriptor is closed and reset to
 * (GIConv) -1, the stored encoding name is freed and cleared, a new
 * descriptor converting from @encoding to "utf-8" is opened, and a copy of
 * @encoding is kept in self->encoding.
 *
 * NOTE(review): several lines of this function (braces, guarding
 * conditions, return statements) are absent from this listing, so what
 * happens when g_iconv_open() fails cannot be confirmed from here.
 */
log_proto_set_encoding(LogProto *self, const gchar *encoding)
41
/* (GIConv) -1 is the "no converter open" marker used throughout this file */
if (self->convert != (GIConv) -1)
43
g_iconv_close(self->convert);
44
self->convert = (GIConv) -1;
48
g_free(self->encoding);
49
self->encoding = NULL;
52
/* the target charset is always utf-8, @encoding names the source charset */
self->convert = g_iconv_open("utf-8", encoding);
53
if (self->convert == (GIConv) -1)
56
/* keep a private copy of the name, it is released again by the g_free() above on the next call */
self->encoding = g_strdup(encoding);
61
/*
 * Release resources held by the LogProto instance @s. The lines visible
 * here close the iconv descriptor (when one is open) and free the
 * transport.
 *
 * NOTE(review): other lines of this function are missing from this
 * listing; confirm against the full file what else is released.
 */
log_proto_free(LogProto *s)
65
if (s->convert != (GIConv) -1)
66
g_iconv_close(s->convert);
69
log_transport_free(s->transport);
74
/*
 * Text client: keeps track of a buffer that has been handed over for
 * writing but has not been written out completely yet.
 *
 * NOTE(review): some members (e.g. the ones accessed as self->super and
 * self->partial elsewhere in this file) are not visible in this listing.
 */
typedef struct _LogProtoTextClient
77
/* log_proto_text_client_flush() copies next_state into state when it is >= 0, then resets it to -1 */
gint state, next_state;
79
/* when non-NULL, invoked on the partial buffer by log_proto_text_client_flush() before the pointer is cleared */
GDestroyNotify partial_free;
80
/* length of the partial buffer and the offset of the first byte not written yet */
gsize partial_len, partial_pos;
84
/*
 * Poll preparation for the text client.
 *
 * @fd:   receives the file descriptor of the underlying transport
 * @cond: receives the I/O condition stored in the transport
 *
 * The visible return statement reports whether a partially written buffer
 * is still pending.
 */
log_proto_text_client_prepare(LogProto *s, gint *fd, GIOCondition *cond)
86
LogProtoTextClient *self = (LogProtoTextClient *) s;
88
*fd = self->super.transport->fd;
89
*cond = self->super.transport->cond;
91
/* if there's no pending I/O in the transport layer, then we want to do a write */
94
return self->partial != NULL;
98
/*
 * Try to write out the pending part of self->partial through the
 * transport.
 *
 * Visible steps: the remaining byte count is partial_len - partial_pos;
 * write errors other than EAGAIN/EINTR are logged; partial_pos is advanced
 * by the number of bytes written; the buffer is released through
 * partial_free (when set) and the pointer is cleared; a queued next_state
 * (>= 0) becomes the current state.
 *
 * NOTE(review): the conditions and return statements connecting these steps
 * are missing from this listing, so the exact return values are not
 * documented here.
 */
log_proto_text_client_flush(LogProto *s)
100
LogProtoTextClient *self = (LogProtoTextClient *) s;
103
/* attempt to flush previously buffered data */
106
/* number of bytes of the partial buffer that are still unwritten */
gint len = self->partial_len - self->partial_pos;
108
rc = log_transport_write(self->super.transport, &self->partial[self->partial_pos], len);
111
/* EAGAIN and EINTR are not reported as errors */
if (errno != EAGAIN && errno != EINTR)
113
msg_error("I/O error occurred while writing",
114
evt_tag_int("fd", self->super.transport->fd),
115
evt_tag_errno(EVT_TAG_OSERROR, errno),
123
self->partial_pos += rc;
128
/* partial_free may be NULL, in which case the buffer is simply dropped */
if (self->partial_free)
129
self->partial_free(self->partial);
130
self->partial = NULL;
131
/* a negative next_state means that no state change was requested */
if (self->next_state >= 0)
133
self->state = self->next_state;
134
self->next_state = -1;
136
/* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
143
/*
 * Register a new buffer as the pending write and attempt to send it right
 * away.
 *
 * @msg_len:    stored as the length of the pending buffer
 * @msg_free:   stored as partial_free, see log_proto_text_client_flush()
 * @next_state: stored in self->next_state; flush applies it when it is >= 0
 *
 * May only be called while no earlier buffer is pending (asserted).
 * Returns whatever log_proto_text_client_flush() returns.
 *
 * NOTE(review): the line that stores @msg itself is not visible in this
 * listing.
 */
static LogProtoStatus
144
log_proto_text_client_submit_write(LogProto *s, guchar *msg, gsize msg_len, GDestroyNotify msg_free, gint next_state)
146
LogProtoTextClient *self = (LogProtoTextClient *) s;
148
g_assert(self->partial == NULL);
150
self->partial_len = msg_len;
151
/* nothing has been written from the new buffer yet */
self->partial_pos = 0;
152
self->partial_free = msg_free;
153
self->next_state = next_state;
154
return log_proto_text_client_flush(s);
159
* log_proto_text_client_post:
160
* @msg: formatted log message to send (this might be consumed by this function)
161
* @msg_len: length of @msg
162
* @consumed: pointer to a gboolean that gets set if the message was consumed by this function
163
* @error: error information, if any
165
* This function posts a message to the log transport, performing buffering
166
* of partially sent data if needed. The return value indicates whether we
167
* successfully sent this message, or if it should be resent by the caller.
169
static LogProtoStatus
170
log_proto_text_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
172
LogProtoTextClient *self = (LogProtoTextClient *) s;
175
/* NOTE: the client does not support charset conversion for now */
176
g_assert(self->super.convert == (GIConv) -1);
178
/* try to flush already buffered data */
180
rc = log_proto_text_client_flush(s);
183
/* log_proto_flush() already logs in the case of an error */
189
/* NOTE: the partial buffer has not been emptied yet even with the
190
* flush above, we shouldn't attempt to write again.
192
* Otherwise: with the framed protocol this could cause the frame
193
* header to be split, and interleaved with message payload, as in:
195
* First bytes of frame header || payload || tail of frame header.
197
* This obviously would cause the framing to break. Also libssl
198
* returns an error in this case, which is how this was discovered.
204
return log_proto_text_client_submit_write(s, msg, msg_len, (GDestroyNotify) g_free, -1);
208
/*
 * Create a text client on top of @transport.
 *
 * The structure is allocated zero-initialized, the text client's
 * prepare/flush/post callbacks are installed, no charset converter is open
 * (convert is (GIConv) -1) and no state change is pending (next_state is
 * -1).
 */
log_proto_text_client_new(LogTransport *transport)
210
LogProtoTextClient *self = g_new0(LogProtoTextClient, 1);
212
self->super.prepare = log_proto_text_client_prepare;
213
self->super.flush = log_proto_text_client_flush;
214
self->super.post = log_proto_text_client_post;
215
self->super.transport = transport;
216
self->super.convert = (GIConv) -1;
217
self->next_state = -1;
221
/*
 * File writer: collects posted messages in an iovec array and writes them
 * out with writev().
 *
 * NOTE(review): several members used elsewhere in this file (fd, buf_size,
 * buf_count, sum_len, fsync, partial, super) are not visible in this
 * listing.
 */
typedef struct _LogProtoFileWriter
225
/* length of the partial buffer and the offset of the first byte not written yet */
gsize partial_len, partial_pos;
231
/* trailing array; log_proto_file_writer_new() allocates flush_lines entries right after the structure */
struct iovec buffer[0];
232
} LogProtoFileWriter;
235
* log_proto_file_writer_flush:
237
* this function flushes the file output buffer
238
* it is called either from log_proto_file_writer_post (normal mode: the buffer is full)
239
* or from log_proto_flush (forced flush: flush time, exit, etc)
242
/*
 * NOTE(review): this listing lacks several lines of the function (braces,
 * some conditions, loop headers and return statements); the comments below
 * only describe the statements that are visible.
 */
static LogProtoStatus
243
log_proto_file_writer_flush(LogProto *s)
245
LogProtoFileWriter *self = (LogProtoFileWriter *)s;
246
gint rc, i, i0, sum, ofs, pos;
248
/* we might be called from log_writer_deinit() without having a buffer at all */
250
if (self->buf_count == 0)
253
/* lseek() is used instead of O_APPEND, as on NFS O_APPEND performs
254
* poorly, as reported on the mailing list 2008/05/29 */
256
lseek(self->fd, 0, SEEK_END);
/* all queued messages are submitted with a single writev() call */
257
rc = writev(self->fd, self->buffer, self->buf_count);
258
if (rc > 0 && self->fsync)
263
/* EAGAIN and EINTR are not reported as errors */
if (errno != EAGAIN && errno != EINTR)
265
msg_error("I/O error occurred while writing",
266
evt_tag_int("fd", self->super.transport->fd),
267
evt_tag_errno(EVT_TAG_OSERROR, errno),
274
/* sum_len is the total length of the queued messages, accumulated by log_proto_file_writer_post() */
else if (rc != self->sum_len)
276
/* partial success: not everything has been written out */
277
/* look for the first chunk that has been cut */
278
sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */
281
sum += self->buffer[++i].iov_len;
282
self->partial_len = sum - rc; /* this is the length of the first non-written chunk */
285
/* add the lengths of the following messages */
286
while (i < self->buf_count)
287
self->partial_len += self->buffer[i++].iov_len;
288
/* allocate and copy the remaining data */
289
self->partial = (guchar *)g_malloc(self->partial_len);
290
ofs = sum - rc; /* the length of the remaining (not processed) chunk in the first message */
291
/* pos is the offset inside chunk i0 at which its unwritten tail starts */
pos = self->buffer[i0].iov_len - ofs;
292
memcpy(self->partial, self->buffer[i0].iov_base + pos, ofs);
294
while (i < self->buf_count)
296
/* from here on ofs is the write offset inside the partial buffer */
memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len);
297
ofs += self->buffer[i].iov_len;
300
self->partial_pos = 0;
303
/* free the previous message strings (the remaining part has been copied to the partial buffer) */
304
for (i = 0; i < self->buf_count; ++i)
305
g_free(self->buffer[i].iov_base);
313
* log_proto_file_writer_post:
314
* @msg: formatted log message to send (this might be consumed by this function)
315
* @msg_len: length of @msg
316
* @consumed: pointer to a gboolean that gets set if the message was consumed by this function
317
* @error: error information, if any
319
* This function posts a message to the log transport, performing buffering
320
* of partially sent data if needed. The return value indicates whether we
321
* successfully sent this message, or if it should be resent by the caller.
323
/*
 * NOTE(review): this listing lacks several lines of the function (braces,
 * some conditions and return statements); the comments below only describe
 * the statements that are visible.
 */
static LogProtoStatus
324
log_proto_file_writer_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
326
LogProtoFileWriter *self = (LogProtoFileWriter *)s;
329
/* every slot of the iovec array is taken: try to make room by flushing first */
if (self->buf_count >= self->buf_size)
331
rc = log_proto_file_writer_flush(s);
332
if (rc != LPS_SUCCESS || self->buf_count >= self->buf_size)
334
/* don't consume a new message if flush failed, or even after the flush we don't have any free slots */
342
/* there is still some data from the previous file writing process */
343
gint len = self->partial_len - self->partial_pos;
345
/* the leftover is a single contiguous buffer, so plain write() is used here instead of writev() */
rc = write(self->fd, self->partial + self->partial_pos, len);
346
if (rc > 0 && self->fsync)
354
self->partial_pos += rc;
359
g_free(self->partial);
360
self->partial = NULL;
361
/* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
366
/* register the new message */
367
self->buffer[self->buf_count].iov_base = (void *) msg;
368
self->buffer[self->buf_count].iov_len = msg_len;
370
/* sum_len is compared against the writev() result in log_proto_file_writer_flush() */
self->sum_len += msg_len;
373
if (self->buf_count == self->buf_size)
375
/* we have reached the max buffer size -> we need to write the messages */
376
return log_proto_file_writer_flush(s);
382
/* EAGAIN and EINTR are not reported as errors */
if (errno != EAGAIN && errno != EINTR)
384
msg_error("I/O error occurred while writing",
385
evt_tag_int("fd", self->super.transport->fd),
386
evt_tag_errno(EVT_TAG_OSERROR, errno),
395
/*
 * Poll preparation for the file writer.
 *
 * @fd:   receives the file descriptor of the underlying transport
 * @cond: receives the I/O condition stored in the transport
 *
 * The visible return statement reports whether there is anything left to
 * write: either queued messages or a partially written buffer.
 */
log_proto_file_writer_prepare(LogProto *s, gint *fd, GIOCondition *cond)
397
LogProtoFileWriter *self = (LogProtoFileWriter *) s;
399
*fd = self->super.transport->fd;
400
*cond = self->super.transport->cond;
402
/* if there's no pending I/O in the transport layer, then we want to do a write */
405
return self->buf_count > 0 || self->partial;
409
/*
 * Create a file writer on top of @transport.
 *
 * @flush_lines: number of iovec slots allocated for queued messages; it is
 *               capped at IOV_MAX, and 0 selects a default (the default
 *               value itself is not visible in this listing)
 * @fsync:       NOTE(review): the line storing this flag is not visible
 *               here; log_proto_file_writer_flush()/post() test self->fsync
 *               after a successful write
 *
 * The structure and its trailing iovec array are allocated in one
 * zero-initialized chunk, the file writer's prepare/post/flush callbacks
 * are installed and no charset converter is open.
 */
log_proto_file_writer_new(LogTransport *transport, gint flush_lines, gboolean fsync)
411
if (flush_lines == 0)
412
/* the flush-lines option has not been specified, use a default value */
415
if (flush_lines > IOV_MAX)
416
/* limit the flush_lines according to the current platform */
417
flush_lines = IOV_MAX;
420
/* allocate the structure with the proper number of items at the end */
421
LogProtoFileWriter *self = (LogProtoFileWriter *)g_malloc0(sizeof(LogProtoFileWriter) + sizeof(struct iovec)*flush_lines);
423
/* the writer uses its own copy of the transport's fd for lseek()/writev()/write() */
self->fd = transport->fd;
424
self->buf_size = flush_lines;
426
self->super.prepare = log_proto_file_writer_prepare;
427
self->super.post = log_proto_file_writer_post;
428
self->super.flush = log_proto_file_writer_flush;
429
self->super.transport = transport;
430
self->super.convert = (GIConv) -1;
436
typedef struct _LogProtoBufferedServerState
438
/* NOTE: that if you add/remove structure members you have to update
439
* the byte order swap code in LogProtoFileReader for multi-byte
444
/* this indicates that the members in the struct are stored in
445
* big-endian byte order. if the byte ordering of the struct doesn't
446
* match the current CPU byte ordering, then the members are
447
* byte-swapped when the state is loaded.
450
guint8 raw_buffer_leftover_size;
451
guint8 __padding1[1];
453
guint32 pending_buffer_end;
455
guint32 buffer_cached_eol;
456
guint32 pending_buffer_pos;
458
/* the stream position where we converted our current buffer from (offset in file) */
459
gint64 raw_stream_pos;
460
gint64 pending_raw_stream_pos;
461
/* the size of raw data (measured in bytes) that got converted from raw_stream_pos into our buffer */
462
gint32 raw_buffer_size;
463
gint32 pending_raw_buffer_size;
464
guchar raw_buffer_leftover[8];
468
} LogProtoBufferedServerState;
470
/*
 * Buffered server: common base of the readers in this file that read raw
 * data into a buffer and extract messages from it.
 *
 * NOTE(review): some members used elsewhere in this file (e.g. super and
 * buffer) are not visible in this listing.
 */
typedef struct _LogProtoBufferedServer LogProtoBufferedServer;
471
struct _LogProtoBufferedServer
474
/* callback invoked by log_proto_buffered_server_fetch_from_buf() to extract a message from the buffered bytes */
gboolean (*fetch_from_buf)(LogProtoBufferedServer *self, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest);
475
/* callback used to read raw data; log_proto_buffered_server_init() installs log_proto_buffered_server_read_data() */
gint (*read_data)(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa);
477
/* heap allocated state, created lazily by log_proto_buffered_server_get_state() when no persist_state is set */
LogProtoBufferedServerState *state1;
478
/* when set, the state lives in this persistent store under persist_handle */
PersistState *persist_state;
479
PersistEntryHandle persist_handle;
481
/* upper limit and initial value for the size of the buffer */
gint max_buffer_size;
482
gint init_buffer_size;
484
/* referenced and handed to the caller of fetch together with messages returned from the buffer */
GSockAddr *prev_saddr;
489
* log_proto_get_char_size_for_fixed_encoding:
491
* This function returns the number of bytes of a single character in the
492
* encoding specified by the @encoding parameter, provided it is listed in
493
* the limited set hard-wired in the fixed_encodings array below.
495
* syslog-ng sometimes needs to calculate the size of the original, raw data
496
* that relates to its already utf8 converted input buffer. For that the
497
* slow solution is to actually perform the utf8 -> raw conversion, however
498
* since we don't really need the actual conversion, just the size of the
499
* data in bytes we can be faster than that by multiplying the number of
500
* input characters with the size of the character in the known
501
* fixed-length-encodings in the list below.
503
* This function returns 0 if the encoding is not known, in which case the
504
* slow path is to be executed.
507
/*
 * NOTE(review): most of the fixed_encodings table and the declarations and
 * return statement of this function are missing from this listing.
 */
log_proto_get_char_size_for_fixed_encoding(const gchar *encoding)
513
} fixed_encodings[] = {
526
{ "wchar_t", sizeof(wchar_t) },
532
/* the loop stops at the first entry whose prefix is NULL */
for (i = 0; fixed_encodings[i].prefix; i++)
534
/* case-insensitive match on the leading strlen(prefix) characters of @encoding */
if (strncasecmp(encoding, fixed_encodings[i].prefix, strlen(fixed_encodings[i].prefix)) == 0)
536
scale = fixed_encodings[i].scale;
543
/*
 * Return the state structure of @self.
 *
 * With a persist_state set, the entry identified by persist_handle is
 * mapped and returned; the handle must be non-zero (asserted). Otherwise
 * the heap allocated self->state1 is used, which is created
 * zero-initialized on first use.
 *
 * Callers in this file pair every call with
 * log_proto_buffered_server_put_state().
 *
 * NOTE(review): the final return statement is not visible in this listing.
 */
static LogProtoBufferedServerState *
544
log_proto_buffered_server_get_state(LogProtoBufferedServer *self)
546
if (self->persist_state)
548
g_assert(self->persist_handle != 0);
549
return persist_state_map_entry(self->persist_state, self->persist_handle);
551
/* non-persistent case: allocate the fallback state lazily */
if (G_UNLIKELY(!self->state1))
553
self->state1 = g_new0(LogProtoBufferedServerState, 1);
559
/*
 * Counterpart of log_proto_buffered_server_get_state(): unmaps the
 * persistent entry when both persist_state and persist_handle are set.
 * Nothing is done for the heap allocated fallback state.
 */
log_proto_buffered_server_put_state(LogProtoBufferedServer *self)
561
if (self->persist_state && self->persist_handle)
562
persist_state_unmap_entry(self->persist_state, self->persist_handle);
566
log_proto_buffered_server_convert_from_raw(LogProtoBufferedServer *self, const guchar *raw_buffer, gsize raw_buffer_len)
568
/* some data was read */
569
gsize avail_in = raw_buffer_len;
573
gboolean success = FALSE;
574
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
578
avail_out = state->buffer_size - state->pending_buffer_end;
579
out = (gchar *) self->buffer + state->pending_buffer_end;
581
ret = g_iconv(self->super.convert, (gchar **) &raw_buffer, &avail_in, (gchar **) &out, &avail_out);
582
if (ret == (gsize) -1)
587
if ((self->super.flags & LPBS_RECORD) == 0)
589
/* Incomplete text, do not report an error, rather try to read again */
590
state->pending_buffer_end = state->buffer_size - avail_out;
594
if (avail_in > sizeof(state->raw_buffer_leftover))
596
msg_error("Invalid byte sequence, the remaining raw buffer is larger than the supported leftover size",
597
evt_tag_str("encoding", self->super.encoding),
598
evt_tag_int("avail_in", avail_in),
599
evt_tag_int("leftover_size", sizeof(state->raw_buffer_leftover)),
603
memcpy(state->raw_buffer_leftover, raw_buffer, avail_in);
604
state->raw_buffer_leftover_size = avail_in;
605
state->raw_buffer_size -= avail_in;
606
msg_trace("Leftover characters remained after conversion, delaying message until another chunk arrives",
607
evt_tag_str("encoding", self->super.encoding),
608
evt_tag_int("avail_in", avail_in),
615
msg_error("Byte sequence too short, cannot convert an individual frame in its entirety",
616
evt_tag_str("encoding", self->super.encoding),
617
evt_tag_int("avail_in", avail_in),
623
state->pending_buffer_end = state->buffer_size - avail_out;
624
/* extend the buffer */
626
if (state->buffer_size < self->max_buffer_size)
628
state->buffer_size *= 2;
629
if (state->buffer_size > self->max_buffer_size)
630
state->buffer_size = self->max_buffer_size;
632
self->buffer = g_realloc(self->buffer, state->buffer_size);
634
/* recalculate the out pointer, and add what we have now */
639
msg_error("Incoming byte stream requires a too large conversion buffer, probably invalid character sequence",
640
evt_tag_str("encoding", self->super.encoding),
641
evt_tag_printf("buffer", "%.*s", (gint) state->pending_buffer_end, self->buffer),
648
msg_notice("Invalid byte sequence or other error while converting input, skipping character",
649
evt_tag_str("encoding", self->super.encoding),
650
evt_tag_printf("char", "0x%02x", *(guchar *) raw_buffer),
657
state->pending_buffer_end = state->buffer_size - avail_out;
660
while (avail_in > 0);
665
log_proto_buffered_server_put_state(self);
670
log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntryHandle handle, const gchar *persist_name)
674
LogProtoBufferedServerState *state;
677
fd = self->super.transport->fd;
678
self->persist_handle = handle;
680
if (fstat(fd, &st) < 0)
683
state = log_proto_buffered_server_get_state(self);
687
self->buffer = g_malloc(state->buffer_size);
689
state->pending_buffer_end = 0;
691
if (state->file_inode &&
692
state->file_inode == st.st_ino &&
693
state->file_size <= st.st_size &&
694
state->raw_stream_pos <= st.st_size)
696
ofs = state->raw_stream_pos;
698
lseek(fd, ofs, SEEK_SET);
702
if (state->file_inode)
704
/* the stored state does not match the current file */
705
msg_notice("The current log file has a mismatching size/inode information, restarting from the beginning",
706
evt_tag_str("state", persist_name),
707
evt_tag_int("stored_inode", state->file_inode),
708
evt_tag_int("cur_file_inode", st.st_ino),
709
evt_tag_int("stored_size", state->file_size),
710
evt_tag_int("cur_file_size", st.st_size),
711
evt_tag_int("raw_stream_pos", state->raw_stream_pos),
716
if (state->raw_buffer_size)
721
if (!self->super.encoding)
723
/* no conversion, we read directly into our buffer */
724
if (state->raw_buffer_size > state->buffer_size)
726
msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than buffer_size and no encoding is set, restarting from the beginning",
727
evt_tag_str("state", persist_name),
728
evt_tag_int("raw_buffer_size", state->raw_buffer_size),
729
evt_tag_int("buffer_size", state->buffer_size),
730
evt_tag_int("init_buffer_size", self->init_buffer_size),
734
raw_buffer = self->buffer;
738
if (state->raw_buffer_size > self->max_buffer_size)
740
msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than max_buffer_size, restarting from the beginning",
741
evt_tag_str("state", persist_name),
742
evt_tag_int("raw_buffer_size", state->raw_buffer_size),
743
evt_tag_int("init_buffer_size", self->init_buffer_size),
744
evt_tag_int("max_buffer_size", self->max_buffer_size),
748
raw_buffer = g_alloca(state->raw_buffer_size);
751
rc = log_transport_read(self->super.transport, raw_buffer, state->raw_buffer_size, NULL);
752
if (rc != state->raw_buffer_size)
754
msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning",
755
evt_tag_str("state", persist_name),
760
state->pending_buffer_end = 0;
761
if (self->super.encoding)
763
if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
765
msg_notice("Error re-converting buffer contents of the file to be continued, restarting from the beginning",
766
evt_tag_str("state", persist_name),
773
state->pending_buffer_end += rc;
776
if (state->buffer_pos > state->pending_buffer_end ||
777
state->buffer_cached_eol > state->pending_buffer_end)
779
msg_notice("Converted buffer contents is smaller than the current buffer position, starting from the beginning of the buffer, some lines may be duplicated",
780
evt_tag_str("state", persist_name),
782
state->buffer_pos = state->pending_buffer_pos = state->buffer_cached_eol = 0;
787
/* although we do have buffer position information, but the
788
* complete contents of the buffer is already processed, instead
789
* of reading and then dropping it, position the file after the
792
state->raw_stream_pos += state->raw_buffer_size;
793
ofs = state->raw_stream_pos;
794
state->raw_buffer_size = 0;
795
state->buffer_pos = state->pending_buffer_end = 0;
797
lseek(fd, state->raw_stream_pos, SEEK_SET);
803
state->buffer_pos = 0;
804
state->pending_buffer_end = 0;
805
state->buffer_cached_eol = 0;
806
state->raw_stream_pos = 0;
807
state->raw_buffer_size = 0;
808
state->raw_buffer_leftover_size = 0;
809
lseek(fd, 0, SEEK_SET);
812
state->file_inode = st.st_ino;
813
state->file_size = st.st_size;
814
state->raw_stream_pos = ofs;
815
state->pending_buffer_pos = state->buffer_pos;
816
state->pending_raw_stream_pos = state->raw_stream_pos;
817
state->pending_raw_buffer_size = state->raw_buffer_size;
820
log_proto_buffered_server_put_state(self);
823
/*
 * Allocate a persistent entry named @persist_name in @persist_state, sized
 * for a LogProtoBufferedServerState, and record the byte order of the
 * current CPU in it.
 *
 * NOTE(review): the handling of a failed allocation and the return
 * statement are not visible in this listing; callers in this file treat a
 * zero handle as failure.
 */
static PersistEntryHandle
824
log_proto_buffered_server_alloc_state(LogProtoBufferedServer *self, PersistState *persist_state, const gchar *persist_name)
826
LogProtoBufferedServerState *state;
827
PersistEntryHandle handle;
829
handle = persist_state_alloc_entry(persist_state, persist_name, sizeof(LogProtoBufferedServerState));
832
state = persist_state_map_entry(persist_state, handle);
835
/* stored so that a later load can detect a byte order mismatch and swap the members */
state->big_endian = (G_BYTE_ORDER == G_BIG_ENDIAN);
837
persist_state_unmap_entry(persist_state, handle);
844
log_proto_buffered_server_convert_state(LogProtoBufferedServer *self, guint8 persist_version, gpointer old_state, gsize old_state_size, LogProtoBufferedServerState *state)
846
if (persist_version <= 2)
849
state->file_inode = 0;
850
state->raw_stream_pos = strtoll((gchar *) old_state, NULL, 10);
851
state->file_size = 0;
855
else if (persist_version == 3)
857
SerializeArchive *archive;
869
archive = serialize_buffer_archive_new(old_state, old_state_size);
871
/* NOTE: the v23 conversion code adds an extra length field which we
872
* need to read out. */
873
g_assert(serialize_read_uint32(archive, &read_length) && read_length == old_state_size - sizeof(read_length));
875
/* original v3 format starts here */
876
if (!serialize_read_uint16(archive, &version) || version != 0)
878
msg_error("Internal error restoring log reader state, stored data has incorrect version",
879
evt_tag_int("version", version));
880
goto error_converting_v3;
883
if (!serialize_read_uint64(archive, (guint64 *) &cur_pos) ||
884
!serialize_read_uint64(archive, (guint64 *) &cur_inode) ||
885
!serialize_read_uint64(archive, (guint64 *) &cur_size))
887
msg_error("Internal error restoring information about the current file position, restarting from the beginning",
889
goto error_converting_v3;
892
if (!serialize_read_uint16(archive, &version) || version != 0)
894
msg_error("Internal error, protocol state has incorrect version",
895
evt_tag_int("version", version),
897
goto error_converting_v3;
900
if (!serialize_read_cstring(archive, &buffer, &buffer_len))
902
msg_error("Internal error, error reading buffer contents",
903
evt_tag_int("version", version),
905
goto error_converting_v3;
908
if (!self->buffer || state->buffer_size < buffer_len)
910
gsize buffer_size = MAX(self->init_buffer_size, buffer_len);
911
self->buffer = g_realloc(self->buffer, buffer_size);
913
serialize_archive_free(archive);
915
memcpy(self->buffer, buffer, buffer_len);
916
state->buffer_pos = 0;
917
state->pending_buffer_end = buffer_len;
921
state->file_inode = cur_inode;
922
state->raw_stream_pos = cur_pos;
923
state->file_size = cur_size;
926
serialize_archive_free(archive);
932
log_proto_buffered_server_restart_with_state(LogProto *s, PersistState *persist_state, const gchar *persist_name)
934
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
935
guint8 persist_version;
936
PersistEntryHandle old_state_handle;
938
gsize old_state_size;
939
PersistEntryHandle new_state_handle = 0;
940
gpointer new_state = NULL;
943
self->persist_state = persist_state;
944
old_state_handle = persist_state_lookup_entry(persist_state, persist_name, &old_state_size, &persist_version);
945
if (!old_state_handle)
947
new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
948
if (!new_state_handle)
949
goto fallback_non_persistent;
950
log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
953
if (persist_version < 4)
955
new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
956
if (!new_state_handle)
957
goto fallback_non_persistent;
959
old_state = persist_state_map_entry(persist_state, old_state_handle);
960
new_state = persist_state_map_entry(persist_state, new_state_handle);
961
success = log_proto_buffered_server_convert_state(self, persist_version, old_state, old_state_size, new_state);
962
persist_state_unmap_entry(persist_state, old_state_handle);
963
persist_state_unmap_entry(persist_state, new_state_handle);
965
/* we're using the newly allocated state structure regardless if
966
* conversion succeeded. If the conversion went wrong then
967
* new_state is still in its freshly initialized form since the
968
* convert function will not touch the state in the error
972
log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
975
else if (persist_version == 4)
977
LogProtoBufferedServerState *state;
979
old_state = persist_state_map_entry(persist_state, old_state_handle);
981
if ((state->big_endian && G_BYTE_ORDER == G_LITTLE_ENDIAN) ||
982
(!state->big_endian && G_BYTE_ORDER == G_BIG_ENDIAN))
985
/* byte order conversion in order to avoid the hassle with
986
scattered byte order conversions in the code */
988
state->big_endian = !state->big_endian;
989
state->buffer_pos = GUINT32_SWAP_LE_BE(state->buffer_pos);
990
state->pending_buffer_pos = GUINT32_SWAP_LE_BE(state->pending_buffer_pos);
991
state->pending_buffer_end = GUINT32_SWAP_LE_BE(state->pending_buffer_end);
992
state->buffer_size = GUINT32_SWAP_LE_BE(state->buffer_size);
993
state->buffer_cached_eol = GUINT32_SWAP_LE_BE(state->buffer_cached_eol);
994
state->raw_stream_pos = GUINT64_SWAP_LE_BE(state->raw_stream_pos);
995
state->raw_buffer_size = GUINT32_SWAP_LE_BE(state->raw_buffer_size);
996
state->pending_raw_stream_pos = GUINT64_SWAP_LE_BE(state->pending_raw_stream_pos);
997
state->pending_raw_buffer_size = GUINT32_SWAP_LE_BE(state->pending_raw_buffer_size);
998
state->file_size = GUINT64_SWAP_LE_BE(state->file_size);
999
state->file_inode = GUINT64_SWAP_LE_BE(state->file_inode);
1002
if (state->version > 0)
1004
msg_error("Internal error restoring log reader state, stored data is too new",
1005
evt_tag_int("version", state->version));
1008
persist_state_unmap_entry(persist_state, old_state_handle);
1009
log_proto_buffered_server_apply_state(self, old_state_handle, persist_name);
1014
msg_error("Internal error restoring log reader state, stored data is too new",
1015
evt_tag_int("version", persist_version));
1019
fallback_non_persistent:
1020
new_state = g_new0(LogProtoBufferedServerState, 1);
1024
new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
1025
if (!new_state_handle)
1026
goto fallback_non_persistent;
1027
new_state = persist_state_map_entry(persist_state, new_state_handle);
1031
LogProtoBufferedServerState *state = new_state;
1033
/* error happened, restart the file from the beginning */
1034
state->raw_stream_pos = 0;
1035
state->file_inode = 0;
1036
state->file_size = 0;
1037
if (new_state_handle)
1038
log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
1040
self->state1 = new_state;
1042
if (new_state_handle)
1044
persist_state_unmap_entry(persist_state, new_state_handle);
1050
/*
 * Poll preparation for the buffered server.
 *
 * @fd:   receives the file descriptor of the underlying transport
 * @cond: receives the I/O condition stored in the transport
 *
 * NOTE(review): the statements following the last comment, including the
 * return statement, are not visible in this listing.
 */
log_proto_buffered_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1052
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1054
*fd = self->super.transport->fd;
1055
*cond = self->super.transport->cond;
1057
/* if there's no pending I/O in the transport layer, then we want to do a read */
1066
/*
 * Default read_data callback: reads up to @len bytes into @buf straight
 * from the transport, passing @sa through to log_transport_read().
 */
log_proto_buffered_server_read_data(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa)
1068
return log_transport_read(self->super.transport, buf, len, sa);
1071
static LogProtoStatus
1072
log_proto_buffered_server_fetch_from_buf(LogProtoBufferedServer *self, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1075
const guchar *buffer_start;
1076
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1077
gboolean success = FALSE;
1079
buffer_start = self->buffer + state->pending_buffer_pos;
1080
buffer_bytes = state->pending_buffer_end - state->pending_buffer_pos;
1082
if (buffer_bytes == 0)
1084
/* if buffer_bytes is zero bytes, it means that we completely
1085
* processed our buffer without having a fraction of a line still
1086
* there. It is important to reset
1087
* pending_buffer_pos/pending_buffer_end to zero as the caller assumes
1088
* that if we return no message from the buffer, then buffer_pos is
1092
if (G_UNLIKELY(self->super.flags & LPBS_POS_TRACKING))
1094
state->pending_raw_stream_pos += state->pending_raw_buffer_size;
1095
state->pending_raw_buffer_size = 0;
1097
state->pending_buffer_pos = state->pending_buffer_end = 0;
1101
success = self->fetch_from_buf(self, buffer_start, buffer_bytes, msg, msg_len, flush_the_rest);
1103
log_proto_buffered_server_put_state(self);
1108
* Returns: TRUE to indicate success, FALSE otherwise. The returned
1109
* msg can be NULL even if no failure occurred.
1111
static LogProtoStatus
1112
log_proto_buffered_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
1114
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1116
guchar *raw_buffer = NULL;
1117
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1118
LogProtoStatus result = self->super.status;
1120
if (G_UNLIKELY(!self->buffer))
1122
self->buffer = g_malloc(self->init_buffer_size);
1123
state->buffer_size = self->init_buffer_size;
1129
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
1131
if (sa && self->prev_saddr)
1132
*sa = g_sockaddr_ref(self->prev_saddr);
1136
/* ok, no more messages in the buffer, read a chunk */
1141
if (self->super.flags & LPBS_NOMREAD)
1144
/* read the next chunk to be processed */
1146
if (self->prev_saddr)
1148
/* new chunk of data, potentially new sockaddr, forget the previous value */
1149
g_sockaddr_unref(self->prev_saddr);
1150
self->prev_saddr = NULL;
1153
if (!self->super.encoding)
1155
/* no conversion, we read directly into our buffer */
1156
raw_buffer = self->buffer + state->pending_buffer_end;
1157
avail = state->buffer_size - state->pending_buffer_end;
1161
/* if conversion is needed, we first read into an on-stack
1162
* buffer, and then convert it into our internal buffer */
1164
raw_buffer = g_alloca(self->init_buffer_size + state->raw_buffer_leftover_size);
1165
memcpy(raw_buffer, state->raw_buffer_leftover, state->raw_buffer_leftover_size);
1166
avail = self->init_buffer_size;
1169
rc = self->read_data(self, raw_buffer + state->raw_buffer_leftover_size, avail, sa);
1171
self->prev_saddr = *sa;
1174
if (errno == EAGAIN)
1176
/* ok we don't have any more data to read, return to main poll loop */
1181
/* an error occurred while reading */
1182
msg_error("I/O error occurred while reading",
1183
evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1184
evt_tag_errno(EVT_TAG_OSERROR, errno),
1187
/* we set self->status explicitly as we want to return
1188
* LPS_ERROR on the _next_ invocation, not now */
1189
self->super.status = LPS_ERROR;
1190
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
1192
if (sa && self->prev_saddr)
1193
*sa = g_sockaddr_ref(self->prev_saddr);
1196
result = self->super.status;
1202
if ((self->super.flags & LPBS_IGNORE_EOF) == 0)
1205
msg_verbose("EOF occurred while reading",
1206
evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1208
if (state->raw_buffer_leftover_size > 0)
1210
msg_error("EOF read on a channel with leftovers from previous character conversion, dropping input",
1215
self->super.status = LPS_EOF;
1216
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
1218
if (sa && self->prev_saddr)
1219
*sa = g_sockaddr_ref(self->prev_saddr);
1222
result = self->super.status;
1234
state->pending_raw_buffer_size += rc;
1235
rc += state->raw_buffer_leftover_size;
1236
state->raw_buffer_leftover_size = 0;
1238
if (self->super.encoding)
1240
if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
1248
state->pending_buffer_end += rc;
1251
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
1253
if (sa && self->prev_saddr)
1254
*sa = g_sockaddr_ref(self->prev_saddr);
1261
/* result contains our result, but once an error happens, the error condition remains persistent */
1262
log_proto_buffered_server_put_state(self);
1263
if (result != LPS_SUCCESS)
1264
self->super.status = result;
1269
/*
 * Commit the pending read position: the pending_* members of the state are
 * copied into their confirmed counterparts (buffer_pos, raw_stream_pos,
 * raw_buffer_size). Installed as the "queued" callback by
 * log_proto_buffered_server_init().
 *
 * NOTE(review): braces are missing from this listing, so the exact nesting
 * of the conditions below cannot be confirmed from here.
 */
log_proto_buffered_server_queued(LogProto *s)
1271
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1272
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1274
/* NOTE: we modify the current file position _after_ updating
1275
buffer_pos, since if we crash right here, at least we
1276
won't lose data on the next restart, but rather we
1277
duplicate some data */
1279
state->buffer_pos = state->pending_buffer_pos;
1280
state->raw_stream_pos = state->pending_raw_stream_pos;
1281
state->raw_buffer_size = state->pending_raw_buffer_size;
1282
/* the buffer has been consumed up to its end: reset all positions to the start */
if (state->pending_buffer_pos == state->pending_buffer_end)
1284
state->pending_buffer_end = 0;
1285
state->buffer_pos = state->pending_buffer_pos = 0;
1287
if (self->super.flags & LPBS_POS_TRACKING)
1289
if (state->buffer_pos == state->pending_buffer_end)
1291
/* move the stream position past the raw bytes that produced the buffer */
state->raw_stream_pos += state->raw_buffer_size;
1292
state->raw_buffer_size = 0;
1295
msg_trace("Last message got confirmed",
1296
evt_tag_int("raw_stream_pos", state->raw_stream_pos),
1297
evt_tag_int("raw_buffer_len", state->raw_buffer_size),
1298
evt_tag_int("buffer_pos", state->buffer_pos),
1299
evt_tag_int("buffer_end", state->pending_buffer_end),
1300
evt_tag_int("buffer_cached_eol", state->buffer_cached_eol),
1302
log_proto_buffered_server_put_state(self);
1306
/*
 * free_fn callback of the buffered server: drops the reference on the
 * remembered peer address and frees the buffer and the heap allocated
 * fallback state.
 *
 * NOTE(review): other lines of this function are missing from this
 * listing.
 */
log_proto_buffered_server_free(LogProto *s)
1308
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1310
g_sockaddr_unref(self->prev_saddr);
1312
g_free(self->buffer);
1315
/* state1 is only allocated in the non-persistent case; presumably NULL otherwise - verify */
g_free(self->state1);
1320
/*
 * Initialize the buffered server part of @self.
 *
 * @transport:        transport to read from
 * @max_buffer_size:  stored as the upper limit for the buffer size
 * @init_buffer_size: stored as the initial buffer size
 * @flags:            stored in super.flags (LPBS_* values are tested
 *                    elsewhere in this file)
 *
 * Installs the buffered server's prepare/fetch/queued/free_fn/
 * restart_with_state callbacks and the default read_data callback; no
 * charset converter is open.
 */
log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *transport, gint max_buffer_size, gint init_buffer_size, guint flags)
1322
self->super.prepare = log_proto_buffered_server_prepare;
1323
self->super.fetch = log_proto_buffered_server_fetch;
1324
self->super.queued = log_proto_buffered_server_queued;
1325
self->super.free_fn = log_proto_buffered_server_free;
1326
self->super.transport = transport;
1327
self->super.convert = (GIConv) -1;
1328
self->super.restart_with_state = log_proto_buffered_server_restart_with_state;
1329
self->read_data = log_proto_buffered_server_read_data;
1331
self->super.flags = flags;
1333
self->init_buffer_size = init_buffer_size;
1334
self->max_buffer_size = max_buffer_size;
1337
struct _LogProtoTextServer
1339
LogProtoBufferedServer super;
1340
GIConv reverse_convert;
1341
gchar *reverse_buffer;
1342
gsize reverse_buffer_len;
1347
* This function is called in cases when several files are continously
1348
* polled for changes. Whenever the caller would like to switch to another
1349
* file, it will call this function to check whether it should be allowed to do so.
1351
* This function returns true if the current state of this LogProto would
1352
* allow preemption, e.g. the contents of the current buffer can be
1356
log_proto_text_server_is_preemptable(LogProtoTextServer *self)
1358
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1359
gboolean preemptable;
1361
preemptable = (state->buffer_cached_eol == 0);
1362
log_proto_buffered_server_put_state(&self->super);
1367
log_proto_text_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1369
LogProtoTextServer *self = (LogProtoTextServer *) s;
1370
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1373
if (log_proto_buffered_server_prepare(s, fd, cond))
1375
log_proto_buffered_server_put_state(&self->super);
1379
avail = (state->buffer_cached_eol != 0);
1380
log_proto_buffered_server_put_state(&self->super);
1385
* Find the character terminating the buffer.
1387
* NOTE: when looking for the end-of-message here, it either needs to be
1388
* terminated via NUL or via NL, when terminating via NL we have to make
1389
* sure that there's no NUL left in the message. This function iterates over
1390
* the input data and returns a pointer to the first occurence of NL or NUL.
1392
* It uses an algorithm similar to what there's in libc memchr/strchr.
1394
* NOTE: find_eom is not static as it is used by a unit test program.
1397
find_eom(const guchar *s, gsize n)
1399
const guchar *char_ptr;
1400
const gulong *longword_ptr;
1401
gulong longword, magic_bits, charmask;
1406
/* align input to long boundary */
1407
for (char_ptr = s; n > 0 && ((gulong) char_ptr & (sizeof(longword) - 1)) != 0; ++char_ptr, n--)
1409
if (*char_ptr == c || *char_ptr == '\0')
1413
longword_ptr = (gulong *) char_ptr;
1415
#if GLIB_SIZEOF_LONG == 8
1416
magic_bits = 0x7efefefefefefeffL;
1417
#elif GLIB_SIZEOF_LONG == 4
1418
magic_bits = 0x7efefeffL;
1420
#error "unknown architecture"
1422
memset(&charmask, c, sizeof(charmask));
1424
while (n > sizeof(longword))
1426
longword = *longword_ptr++;
1427
if ((((longword + magic_bits) ^ ~longword) & ~magic_bits) != 0 ||
1428
((((longword ^ charmask) + magic_bits) ^ ~(longword ^ charmask)) & ~magic_bits) != 0)
1432
char_ptr = (const guchar *) (longword_ptr - 1);
1434
for (i = 0; i < sizeof(longword); i++)
1436
if (*char_ptr == c || *char_ptr == '\0')
1441
n -= sizeof(longword);
1444
char_ptr = (const guchar *) longword_ptr;
1448
if (*char_ptr == c || *char_ptr == '\0')
1457
* returns the number of bytes that represent the UTF8 encoding buffer
1458
* in the original encoding that the user specified.
1460
* NOTE: this is slow, but we only call this for the remainder of our
1461
* buffer (e.g. the partial line at the end of our last chunk of read
1462
* data). Also, this is only invoked if the file uses an encoding.
1465
log_proto_text_server_get_raw_size_of_buffer(LogProtoTextServer *self, const guchar *buffer, gsize buffer_len)
1469
gsize avail_out, avail_in;
1472
if (self->reverse_convert == ((GIConv) -1) && !self->convert_scale)
1474
/* try to speed up raw size calculation by recognizing the most
1475
* prominent character encodings and in the case the encoding
1476
* uses fixed size characters set that in self->convert_scale,
1477
* which in turn will speed up the reversal of the UTF8 buffer
1478
* size to raw buffer sizes.
1480
self->convert_scale = log_proto_get_char_size_for_fixed_encoding(self->super.super.encoding);
1481
if (self->convert_scale == 0)
1483
/* this encoding is not known, do the conversion for real :( */
1484
self->reverse_convert = g_iconv_open(self->super.super.encoding, "utf-8");
1488
if (self->convert_scale)
1489
return g_utf8_strlen((gchar *) buffer, buffer_len) * self->convert_scale;
1491
if (self->reverse_buffer_len < buffer_len * 6)
1493
/* we free and malloc, since we never need the data still in reverse buffer */
1494
g_free(self->reverse_buffer);
1495
self->reverse_buffer_len = buffer_len * 6;
1496
self->reverse_buffer = g_malloc(buffer_len * 6);
1499
avail_out = self->reverse_buffer_len;
1500
out = self->reverse_buffer;
1502
avail_in = buffer_len;
1505
ret = g_iconv(self->reverse_convert, (gchar **) &in, &avail_in, &out, &avail_out);
1506
if (ret == (gsize) -1)
1508
/* oops, we cannot reverse that we ourselves converted to UTF-8,
1509
* this is simply impossible, but never say never */
1510
msg_error("Internal error, couldn't reverse the internal UTF8 string to the original encoding",
1511
evt_tag_printf("buffer", "%.*s", (gint) buffer_len, buffer),
1517
return self->reverse_buffer_len - avail_out;
1523
* log_proto_text_server_fetch_from_buf:
1524
* @self: LogReader instance
1525
* @saddr: socket address to be assigned to new messages (consumed!)
1526
* @flush: whether to flush the input buffer
1527
* @msg_counter: the number of messages processed in the current poll iteration
1529
* Returns TRUE if a message was found in the buffer, FALSE if we need to read again.
1532
log_proto_text_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1534
LogProtoTextServer *self = (LogProtoTextServer *) s;
1536
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1537
gboolean result = FALSE;
1542
* we are set to packet terminating mode or the connection is to
1543
* be teared down and we have partial data in our buffer.
1545
*msg = buffer_start;
1546
*msg_len = buffer_bytes;
1547
state->pending_buffer_pos = state->pending_buffer_end;
1551
if (state->buffer_cached_eol)
1553
/* previous invocation was nice enough to save a cached EOL
1554
* pointer, no need to look it up again */
1556
eol = self->super.buffer + state->buffer_cached_eol;
1557
state->buffer_cached_eol = 0;
1561
eol = find_eom(buffer_start, buffer_bytes);
1563
if ((!eol && (buffer_bytes == state->buffer_size)))
1565
/* our buffer is full and no EOL was found */
1566
*msg_len = buffer_bytes;
1567
state->pending_buffer_pos = state->pending_buffer_end;
1568
*msg = buffer_start;
1573
gsize raw_split_size;
1575
/* buffer is not full, but no EOL is present, move partial line
1576
* to the beginning of the buffer to make space for new data.
1579
memmove(self->super.buffer, buffer_start, buffer_bytes);
1580
state->pending_buffer_pos = 0;
1581
state->pending_buffer_end = buffer_bytes;
1583
if (G_UNLIKELY(self->super.super.flags & LPBS_POS_TRACKING))
1585
/* NOTE: we modify the current file position _after_ updating
1586
buffer_pos, since if we crash right here, at least we
1587
won't lose data on the next restart, but rather we
1588
duplicate some data */
1590
if (self->super.super.encoding)
1591
raw_split_size = log_proto_text_server_get_raw_size_of_buffer(self, buffer_start, buffer_bytes);
1593
raw_split_size = buffer_bytes;
1595
state->pending_raw_stream_pos += (gint64) (state->pending_raw_buffer_size - raw_split_size);
1596
state->pending_raw_buffer_size = raw_split_size;
1598
msg_trace("Buffer split",
1599
evt_tag_int("raw_split_size", raw_split_size),
1600
evt_tag_int("buffer_bytes", buffer_bytes),
1607
const guchar *msg_end = eol;
1609
/* eol points at the newline character. end points at the
1610
* character terminating the line, which may be a carriage
1611
* return preceeding the newline. */
1613
while ((msg_end > buffer_start) && (msg_end[-1] == '\r' || msg_end[-1] == '\n' || msg_end[-1] == 0))
1616
*msg_len = msg_end - buffer_start;
1617
*msg = buffer_start;
1618
state->pending_buffer_pos = eol + 1 - self->super.buffer;
1620
if (state->pending_buffer_end != state->pending_buffer_pos)
1623
/* store the end of the next line, it indicates whether we need
1624
* to read further data, or the buffer already contains a
1626
eom = find_eom(self->super.buffer + state->pending_buffer_pos, state->pending_buffer_end - state->pending_buffer_pos);
1628
state->buffer_cached_eol = eom - self->super.buffer;
1630
state->buffer_cached_eol = 0;
1634
state->pending_buffer_pos = state->pending_buffer_end;
1641
log_proto_buffered_server_put_state(&self->super);
1646
/* free_fn() method of the text server: closes the reverse converter if
 * one was opened, frees the reverse scratch buffer and then lets the
 * buffered server release its own members. */
static void
log_proto_text_server_free(LogProto *s)
{
  LogProtoTextServer *text = (LogProtoTextServer *) s;
  gboolean converter_open = (text->reverse_convert != (GIConv) -1);

  if (converter_open)
    g_iconv_close(text->reverse_convert);

  g_free(text->reverse_buffer);
  log_proto_buffered_server_free(&text->super.super);
}
1657
/* Initialize a text server: set up the buffered server part with an
 * initial buffer of max_msg_size and a maximum of six times that, then
 * override the methods the text server implements itself. */
void
log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, gint max_msg_size, guint flags)
{
  LogProtoBufferedServer *buffered = &self->super;

  log_proto_buffered_server_init(buffered, transport, max_msg_size * 6, max_msg_size, flags);

  buffered->fetch_from_buf = log_proto_text_server_fetch_from_buf;
  buffered->super.prepare = log_proto_text_server_prepare;
  buffered->super.free_fn = log_proto_text_server_free;

  /* no reverse converter is open yet */
  self->reverse_convert = (GIConv) -1;
}
1667
log_proto_text_server_new(LogTransport *transport, gint max_msg_size, guint flags)
1669
LogProtoTextServer *self = g_new0(LogProtoTextServer, 1);
1671
log_proto_text_server_init(self, transport, max_msg_size, flags);
1672
return &self->super.super;
1675
/* proto that reads the stream in even sized chunks */
1676
typedef struct _LogProtoRecordServer LogProtoRecordServer;
1677
struct _LogProtoRecordServer
1679
LogProtoBufferedServer super;
1684
/* fetch_from_buf() method of the record server.
 *
 * The whole buffer is consumed as one record.  Unless LPRS_BINARY is
 * set, the reported message length stops at the first NL or NUL found
 * in the record.  Always returns TRUE.
 */
static gboolean
log_proto_record_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
{
  LogProtoRecordServer *self = (LogProtoRecordServer *) s;
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(s);
  gsize length = buffer_bytes;

  if (!(self->super.super.flags & LPRS_BINARY))
    {
      const guchar *terminator = find_eom(buffer_start, buffer_bytes);

      if (terminator)
        length = terminator - buffer_start;
    }

  *msg_len = length;
  state->pending_buffer_pos = state->pending_buffer_end;
  *msg = buffer_start;

  log_proto_buffered_server_put_state(s);
  return TRUE;
}
1706
log_proto_record_server_read_data(LogProtoBufferedServer *s, guchar *buf, gsize len, GSockAddr **sa)
1708
LogProtoRecordServer *self = (LogProtoRecordServer *) s;
1711
g_assert(len <= self->record_size);
1712
len = self->record_size;
1713
rc = log_transport_read(self->super.super.transport, buf, len, sa);
1714
if (rc > 0 && rc != self->record_size)
1716
msg_error("Padding was set, and couldn't read enough bytes",
1717
evt_tag_int(EVT_TAG_FD, self->super.super.transport->fd),
1718
evt_tag_int("padding", self->record_size),
1719
evt_tag_int("read", rc),
1728
log_proto_record_server_new(LogTransport *transport, gint record_size, guint flags)
1730
LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
1732
log_proto_buffered_server_init(&self->super, transport, record_size * 6, record_size, flags | LPBS_RECORD);
1733
self->super.fetch_from_buf = log_proto_record_server_fetch_from_buf;
1734
self->super.read_data = log_proto_record_server_read_data;
1735
self->record_size = record_size;
1736
return &self->super.super;
1739
/* proto that reads the stream in even sized chunks */
1740
typedef struct _LogProtoDGramServer LogProtoDGramServer;
1741
struct _LogProtoDGramServer
1743
LogProtoBufferedServer super;
1747
/* fetch_from_buf() method of the datagram server: the complete buffer
 * content is returned as one message and marked as consumed.  Always
 * returns TRUE. */
static gboolean
log_proto_dgram_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
{
  LogProtoBufferedServerState *state;

  /* packet terminating mode: no search for a terminator is done */
  *msg_len = buffer_bytes;
  *msg = buffer_start;

  state = log_proto_buffered_server_get_state(s);
  state->pending_buffer_pos = state->pending_buffer_end;
  log_proto_buffered_server_put_state(s);

  return TRUE;
}
1762
log_proto_dgram_server_new(LogTransport *transport, gint max_msg_size, guint flags)
1764
LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
1766
log_proto_buffered_server_init(&self->super, transport, max_msg_size * 6, max_msg_size, flags | LPBS_IGNORE_EOF | LPBS_RECORD);
1767
self->super.fetch_from_buf = log_proto_dgram_server_fetch_from_buf;
1768
return &self->super.super;
1771
#define LPFCS_FRAME_SEND 0
1772
#define LPFCS_MESSAGE_SEND 1
1774
typedef struct _LogProtoFramedClient
1776
LogProtoTextClient super;
1777
guchar frame_hdr_buf[9];
1778
} LogProtoFramedClient;
1780
static LogProtoStatus
1781
log_proto_framed_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
1783
LogProtoFramedClient *self = (LogProtoFramedClient *) s;
1787
if (msg_len > 9999999)
1789
static const guchar *warn_msg;
1791
if (warn_msg != msg)
1793
msg_warning("Error, message length too large for framed protocol, truncated",
1794
evt_tag_int("length", msg_len),
1802
while (rc == LPS_SUCCESS && !(*consumed) && self->super.partial == NULL)
1804
switch (self->super.state)
1806
case LPFCS_FRAME_SEND:
1807
frame_hdr_len = g_snprintf((gchar *) self->frame_hdr_buf, sizeof(self->frame_hdr_buf), "%" G_GSIZE_FORMAT" ", msg_len);
1808
rc = log_proto_text_client_submit_write(s, self->frame_hdr_buf, frame_hdr_len, NULL, LPFCS_MESSAGE_SEND);
1810
case LPFCS_MESSAGE_SEND:
1812
rc = log_proto_text_client_submit_write(s, msg, msg_len, (GDestroyNotify) g_free, LPFCS_FRAME_SEND);
1815
g_assert_not_reached();
1823
log_proto_framed_client_new(LogTransport *transport)
1825
LogProtoFramedClient *self = g_new0(LogProtoFramedClient, 1);
1827
self->super.super.prepare = log_proto_text_client_prepare;
1828
self->super.super.flush = log_proto_text_client_flush;
1829
self->super.super.post = log_proto_framed_client_post;
1830
self->super.super.transport = transport;
1831
self->super.super.convert = (GIConv) -1;
1832
self->super.state = LPFCS_FRAME_SEND;
1833
return &self->super.super;
1836
#define LPFSS_FRAME_READ 0
1837
#define LPFSS_MESSAGE_READ 1
1839
#define LPFS_FRAME_BUFFER 10
1841
typedef struct _LogProtoFramedServer
1847
gsize buffer_size, buffer_pos, buffer_end;
1850
gboolean half_message_in_buffer;
1851
GSockAddr *prev_saddr;
1852
LogProtoStatus status;
1853
} LogProtoFramedServer;
1856
log_proto_framed_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1858
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
1860
*fd = self->super.transport->fd;
1861
*cond = self->super.transport->cond;
1863
/* there is a half message in our buffer so try to wait */
1864
if (!self->half_message_in_buffer)
1866
if (self->buffer_pos != self->buffer_end)
1868
/* we have a full message in our buffer so parse it without reading new data from the transport layer */
1873
/* if there's no pending I/O in the transport layer, then we want to do a read */
1880
static LogProtoStatus
1881
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read)
1885
if (self->buffer_pos == self->buffer_end)
1886
self->buffer_pos = self->buffer_end = 0;
1888
if (self->buffer_size == self->buffer_end)
1890
/* no more space in the buffer, we can't fetch further data. Move the
1891
* things we already have to the beginning of the buffer to make
1894
memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
1895
self->buffer_end = self->buffer_end - self->buffer_pos;
1896
self->buffer_pos = 0;
1902
rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end, NULL);
1906
if (errno != EAGAIN)
1908
msg_error("Error reading RFC5428 style framed data",
1909
evt_tag_int("fd", self->super.transport->fd),
1910
evt_tag_errno("error", errno),
1916
/* we need more data to parse this message but the data is not available yet */
1917
self->half_message_in_buffer = TRUE;
1922
msg_verbose("EOF occurred while reading",
1923
evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1929
self->buffer_end += rc;
1936
/* Parse the "<digits> " frame header found at buffer_pos.
 *
 * On a complete header frame_len holds the parsed length, buffer_pos is
 * moved past the space and *need_more_data is FALSE.  When the buffered
 * data ends before the space is seen, *need_more_data stays TRUE.  Both
 * cases return TRUE.
 *
 * Returns FALSE, after logging, when a character other than a digit or
 * the terminating space is met, or when there are more than 10 digits.
 */
static gboolean
log_proto_framed_server_extract_frame_length(LogProtoFramedServer *self, gboolean *need_more_data)
{
  gint i;

  *need_more_data = TRUE;
  self->frame_len = 0;

  for (i = self->buffer_pos; i < self->buffer_end; i++)
    {
      guchar ch = self->buffer[i];

      if (isdigit(ch) && (i - self->buffer_pos < 10))
        {
          self->frame_len = self->frame_len * 10 + (ch - '0');
          continue;
        }

      if (ch == ' ')
        {
          /* header complete, the payload starts right after the space */
          *need_more_data = FALSE;
          self->buffer_pos = i + 1;
          return TRUE;
        }

      msg_error("Invalid frame header",
                evt_tag_printf("header", "%.*s", (gint) (i - self->buffer_pos), &self->buffer[self->buffer_pos]),
                NULL);
      return FALSE;
    }

  /* couldn't extract frame header, no error but need more data */
  return TRUE;
}
1966
static LogProtoStatus
1967
log_proto_framed_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
1969
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
1970
LogProtoStatus status;
1971
gboolean try_read, need_more_data;
1975
switch (self->state)
1977
case LPFSS_FRAME_READ:
1982
if (!log_proto_framed_server_extract_frame_length(self, &need_more_data))
1984
/* invalid frame header */
1987
if (need_more_data && try_read)
1989
status = log_proto_framed_server_fetch_data(self, may_read);
1990
if (status != LPS_SUCCESS)
1996
if (!need_more_data)
1998
self->state = LPFSS_MESSAGE_READ;
1999
if (self->frame_len > self->max_msg_size)
2001
msg_error("Incoming frame larger than log_msg_size()",
2002
evt_tag_int("log_msg_size", self->buffer_size - LPFS_FRAME_BUFFER),
2003
evt_tag_int("frame_length", self->frame_len),
2007
if (self->buffer_pos + self->frame_len > self->buffer_size)
2009
/* message would be too large to fit into the buffer at
2010
* buffer_pos, we need to move data to the beginning of
2011
* the buffer to make space, and once we are at it, move
2012
* to the beginning to make space for the maximum number
2013
* of messages before the next shift */
2014
memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
2015
self->buffer_end = self->buffer_end - self->buffer_pos;
2016
self->buffer_pos = 0;
2021
case LPFSS_MESSAGE_READ:
2025
/* NOTE: here we can assume that the complete message fits into
2026
* the buffer because of the checks/move operation in the
2027
* LPFSS_FRAME_READ state */
2028
if (self->buffer_end - self->buffer_pos >= self->frame_len)
2030
/* ok, we already have the complete message */
2032
*msg = &self->buffer[self->buffer_pos];
2033
*msg_len = self->frame_len;
2034
self->buffer_pos += self->frame_len;
2035
self->state = LPFSS_FRAME_READ;
2037
/* we have the full message here so reset the half message flag */
2038
self->half_message_in_buffer = FALSE;
2043
status = log_proto_framed_server_fetch_data(self, may_read);
2044
if (status != LPS_SUCCESS)
2057
/* free_fn() method of the framed server: releases the input buffer. */
static void
log_proto_framed_server_free(LogProto *s)
{
  LogProtoFramedServer *server = (LogProtoFramedServer *) s;

  g_free(server->buffer);
}
2064
log_proto_framed_server_new(LogTransport *transport, gint max_msg_size)
2066
LogProtoFramedServer *self = g_new0(LogProtoFramedServer, 1);
2068
self->super.prepare = log_proto_framed_server_prepare;
2069
self->super.fetch = log_proto_framed_server_fetch;
2070
self->super.free_fn = log_proto_framed_server_free;
2071
self->super.transport = transport;
2072
self->super.convert = (GIConv) -1;
2073
/* max message + frame header */
2074
self->max_msg_size = max_msg_size;
2075
self->buffer_size = max_msg_size + LPFS_FRAME_BUFFER;
2076
self->buffer = g_malloc(self->buffer_size);
2077
self->half_message_in_buffer = FALSE;
2078
return &self->super;