2
* Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
3
* Copyright (c) 1998-2010 Balázs Scheidler
5
* This library is free software; you can redistribute it and/or
6
* modify it under the terms of the GNU Lesser General Public
7
* License as published by the Free Software Foundation; either
8
* version 2.1 of the License, or (at your option) any later version.
10
* This library is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13
* Lesser General Public License for more details.
15
* You should have received a copy of the GNU Lesser General Public
16
* License along with this library; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* As an additional exemption you are allowed to compile & link against the
20
* OpenSSL libraries as published by the OpenSSL project. See the file
21
* COPYING for details.
28
#include "persist-state.h"
39
log_proto_set_encoding(LogProto *self, const gchar *encoding)
41
if (self->convert != (GIConv) -1)
43
g_iconv_close(self->convert);
44
self->convert = (GIConv) -1;
48
g_free(self->encoding);
49
self->encoding = NULL;
52
self->convert = g_iconv_open("utf-8", encoding);
53
if (self->convert == (GIConv) -1)
56
self->encoding = g_strdup(encoding);
61
log_proto_free(LogProto *s)
65
if (s->convert != (GIConv) -1)
66
g_iconv_close(s->convert);
69
log_transport_free(s->transport);
74
typedef struct _LogProtoTextClient
78
gsize partial_len, partial_pos;
82
log_proto_text_client_prepare(LogProto *s, gint *fd, GIOCondition *cond)
84
LogProtoTextClient *self = (LogProtoTextClient *) s;
86
*fd = self->super.transport->fd;
87
*cond = self->super.transport->cond;
89
/* if there's no pending I/O in the transport layer, then we want to do a write */
92
return !!self->partial;
96
log_proto_text_client_flush(LogProto *s)
98
LogProtoTextClient *self = (LogProtoTextClient *) s;
101
/* attempt to flush previously buffered data */
104
gint len = self->partial_len - self->partial_pos;
106
rc = log_transport_write(self->super.transport, &self->partial[self->partial_pos], len);
109
if (errno != EAGAIN && errno != EINTR)
111
msg_error("I/O error occurred while writing",
112
evt_tag_int("fd", self->super.transport->fd),
113
evt_tag_errno(EVT_TAG_OSERROR, errno),
121
self->partial_pos += rc;
126
g_free(self->partial);
127
self->partial = NULL;
128
/* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
136
* log_proto_text_client_post:
137
* @msg: formatted log message to send (this might be consumed by this function)
138
* @msg_len: length of @msg
139
* @consumed: pointer to a gboolean that gets set if the message was consumed by this function
140
* @error: error information, if any
142
* This function posts a message to the log transport, performing buffering
143
* of partially sent data if needed. The return value indicates whether we
144
* successfully sent this message, or if it should be resent by the caller.
146
static LogProtoStatus
147
log_proto_text_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
149
LogProtoTextClient *self = (LogProtoTextClient *) s;
152
/* NOTE: the client does not support charset conversion for now */
153
g_assert(self->super.convert == (GIConv) -1);
156
rc = log_proto_flush(s);
161
else if (self->partial)
163
/* NOTE: the partial buffer has not been emptied yet even with the
164
* flush above, we shouldn't attempt to write again.
166
* Otherwise: with the framed protocol this could cause the frame
167
* header to be split, and interleaved with message payload, as in:
169
* First bytes of frame header || payload || tail of frame header.
171
* This obviously would cause the framing to break. Also libssl
172
* returns an error in this case, which is how this was discovered.
177
/* OK, partial buffer empty, now flush msg that we just got */
179
rc = log_transport_write(self->super.transport, msg, msg_len);
181
if (rc < 0 || rc != msg_len)
183
/* error OR partial flush, we sent _some_ of the message that we got, save it to self->partial and tell the caller that we consumed it */
184
if (rc < 0 && errno != EAGAIN && errno != EINTR)
187
/* NOTE: log_proto_framed_client_post depends on our current
188
* behaviour, that we consume every message that we can, even if we
189
* couldn't write a single byte out.
191
* If we return LPS_SUCCESS and self->partial == NULL, it assumes that
192
* the message was sent.
197
self->partial_len = msg_len;
198
self->partial_pos = rc > 0 ? rc : 0;
203
/* all data was nicely sent */
210
if (errno != EAGAIN && errno != EINTR)
212
msg_error("I/O error occurred while writing",
213
evt_tag_int("fd", self->super.transport->fd),
214
evt_tag_errno(EVT_TAG_OSERROR, errno),
223
log_proto_text_client_new(LogTransport *transport)
225
LogProtoTextClient *self = g_new0(LogProtoTextClient, 1);
227
self->super.prepare = log_proto_text_client_prepare;
228
self->super.flush = log_proto_text_client_flush;
229
self->super.post = log_proto_text_client_post;
230
self->super.transport = transport;
231
self->super.convert = (GIConv) -1;
235
typedef struct _LogProtoFileWriter
239
gsize partial_len, partial_pos;
244
struct iovec buffer[0];
245
} LogProtoFileWriter;
248
* log_proto_file_writer_flush:
250
* this function flushes the file output buffer
251
* it is called either from log_proto_file_writer_post (normal mode: the buffer is full)
252
* or from log_proto_flush (forced flush: flush time, exit, etc)
255
static LogProtoStatus
256
log_proto_file_writer_flush(LogProto *s)
258
LogProtoFileWriter *self = (LogProtoFileWriter *)s;
259
gint rc, i, i0, sum, ofs;
261
/* we might be called from log_writer_deinit() without having a buffer at all */
263
if (self->buf_count == 0)
266
/* lseek() is used instead of O_APPEND, as on NFS O_APPEND performs
267
* poorly, as reported on the mailing list 2008/05/29 */
269
lseek(self->fd, 0, SEEK_END);
270
rc = writev(self->fd, self->buffer, self->buf_count);
274
if (errno != EAGAIN && errno != EINTR)
276
msg_error("I/O error occurred while writing",
277
evt_tag_int("fd", self->super.transport->fd),
278
evt_tag_errno(EVT_TAG_OSERROR, errno),
285
else if (rc != self->sum_len)
287
/* partial success: not everything has been written out */
288
/* look for the first chunk that has been cut */
289
sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */
292
sum += self->buffer[++i].iov_len;
293
self->partial_len = sum - rc; /* this is the length of the first non-written chunk */
296
/* add the lengths of the following messages */
297
while (i < self->buf_count)
298
self->partial_len += self->buffer[i++].iov_len;
299
/* allocate and copy the remaining data */
300
self->partial = (guchar *)g_malloc(self->partial_len);
301
ofs = sum - rc; /* the length of the remaning (not processed) chunk in the first message */
302
memcpy(self->partial, self->buffer[i0].iov_base + rc - (i0 > 0 ? (sum - self->buffer[i0 - 1].iov_len) : 0), ofs);
304
while (i < self->buf_count)
306
memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len);
307
ofs += self->buffer[i].iov_len;
310
self->partial_pos = 0;
313
/* free the previous message strings (the remaining part has been copied to the partial buffer) */
314
for (i = 0; i < self->buf_count; ++i)
315
g_free(self->buffer[i].iov_base);
323
* log_proto_file_writer_post:
324
* @msg: formatted log message to send (this might be consumed by this function)
325
* @msg_len: length of @msg
326
* @consumed: pointer to a gboolean that gets set if the message was consumed by this function
327
* @error: error information, if any
329
* This function posts a message to the log transport, performing buffering
330
* of partially sent data if needed. The return value indicates whether we
331
* successfully sent this message, or if it should be resent by the caller.
333
static LogProtoStatus
334
log_proto_file_writer_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
336
LogProtoFileWriter *self = (LogProtoFileWriter *)s;
339
if (self->buf_count >= self->buf_size)
341
rc = log_proto_file_writer_flush(s);
342
if (rc != LPS_SUCCESS || self->buf_count >= self->buf_size)
344
/* don't consume a new message if flush failed, or even after the flush we don't have any free slots */
352
/* there is still some data from the previous file writing process */
353
gint len = self->partial_len - self->partial_pos;
355
rc = write(self->fd, self->partial + self->partial_pos, len);
362
self->partial_pos += rc;
367
g_free(self->partial);
368
self->partial = NULL;
369
/* NOTE: we return here to give a chance to the framed protocol to send the frame header. */
374
/* register the new message */
375
self->buffer[self->buf_count].iov_base = (void *) msg;
376
self->buffer[self->buf_count].iov_len = msg_len;
378
self->sum_len += msg_len;
381
if (self->buf_count == self->buf_size)
383
/* we have reached the max buffer size -> we need to write the messages */
384
return log_proto_file_writer_flush(s);
390
if (errno != EAGAIN && errno != EINTR)
392
msg_error("I/O error occurred while writing",
393
evt_tag_int("fd", self->super.transport->fd),
394
evt_tag_errno(EVT_TAG_OSERROR, errno),
403
log_proto_file_writer_prepare(LogProto *s, gint *fd, GIOCondition *cond)
405
LogProtoFileWriter *self = (LogProtoFileWriter *) s;
407
*fd = self->super.transport->fd;
408
*cond = self->super.transport->cond;
410
/* if there's no pending I/O in the transport layer, then we want to do a write */
413
return self->buf_count > 0 || self->partial;
417
log_proto_file_writer_new(LogTransport *transport, gint flush_lines)
419
if (flush_lines == 0)
420
/* the flush-lines option has not been specified, use a default value */
422
if (flush_lines > IOV_MAX)
423
/* limit the flush_lines according to the current platform */
424
flush_lines = IOV_MAX;
426
/* allocate the structure with the proper number of items at the end */
427
LogProtoFileWriter *self = (LogProtoFileWriter *)g_malloc0(sizeof(LogProtoFileWriter) + sizeof(struct iovec)*flush_lines);
429
self->fd = transport->fd;
430
self->buf_size = flush_lines;
431
self->super.prepare = log_proto_file_writer_prepare;
432
self->super.post = log_proto_file_writer_post;
433
self->super.flush = log_proto_file_writer_flush;
434
self->super.transport = transport;
435
self->super.convert = (GIConv) -1;
441
typedef struct _LogProtoBufferedServerState
443
/* NOTE: that if you add/remove structure members you have to update
444
* the byte order swap code in LogProtoFileReader for multi-byte
449
/* this indicates that the members in the struct are stored in
450
* big-endian byte order. if the byte ordering of the struct doesn't
451
* match the current CPU byte ordering, then the members are
452
* byte-swapped when the state is loaded.
455
guint8 raw_buffer_leftover_size;
456
guint8 __padding1[1];
458
guint32 pending_buffer_end;
460
guint32 buffer_cached_eol;
461
guint32 pending_buffer_pos;
463
/* the stream position where we converted our current buffer from (offset in file) */
464
gint64 raw_stream_pos;
465
gint64 pending_raw_stream_pos;
466
/* the size of raw data (measured in bytes) that got converted from raw_stream_pos into our buffer */
467
gint32 raw_buffer_size;
468
gint32 pending_raw_buffer_size;
469
guchar raw_buffer_leftover[8];
473
} LogProtoBufferedServerState;
475
typedef struct _LogProtoBufferedServer LogProtoBufferedServer;
476
struct _LogProtoBufferedServer
479
gboolean (*fetch_from_buf)(LogProtoBufferedServer *self, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest);
480
gint (*read_data)(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa);
482
LogProtoBufferedServerState *state1;
483
PersistState *persist_state;
484
PersistEntryHandle persist_handle;
486
gint max_buffer_size;
487
gint init_buffer_size;
489
GSockAddr *prev_saddr;
490
LogProtoStatus status;
493
static LogProtoBufferedServerState *
494
log_proto_buffered_server_get_state(LogProtoBufferedServer *self)
496
if (self->persist_state)
498
g_assert(self->persist_handle != 0);
499
return persist_state_map_entry(self->persist_state, self->persist_handle);
501
if (G_UNLIKELY(!self->state1))
503
self->state1 = g_new0(LogProtoBufferedServerState, 1);
509
log_proto_buffered_server_put_state(LogProtoBufferedServer *self)
511
if (self->persist_state && self->persist_handle)
512
persist_state_unmap_entry(self->persist_state, self->persist_handle);
516
log_proto_buffered_server_convert_from_raw(LogProtoBufferedServer *self, const guchar *raw_buffer, gsize raw_buffer_len)
518
/* some data was read */
519
gsize avail_in = raw_buffer_len;
523
gboolean success = FALSE;
524
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
528
avail_out = state->buffer_size - state->pending_buffer_end;
529
out = (gchar *) self->buffer + state->pending_buffer_end;
531
ret = g_iconv(self->super.convert, (gchar **) &raw_buffer, &avail_in, (gchar **) &out, &avail_out);
532
if (ret == (gsize) -1)
537
/* Incomplete text, do not report an error, rather try to read again */
538
state->pending_buffer_end = state->buffer_size - avail_out;
542
if (avail_in > sizeof(state->raw_buffer_leftover))
544
msg_error("Invalid byte sequence, the remaining raw buffer is larger than the supported leftover size",
545
evt_tag_str("encoding", self->super.encoding),
546
evt_tag_int("avail_in", avail_in),
547
evt_tag_int("leftover_size", sizeof(state->raw_buffer_leftover)),
551
memcpy(state->raw_buffer_leftover, raw_buffer, avail_in);
552
state->raw_buffer_leftover_size = avail_in;
553
state->raw_buffer_size -= avail_in;
554
msg_trace("Leftover characters remained after conversion, delaying message until another chunk arrives",
555
evt_tag_str("encoding", self->super.encoding),
556
evt_tag_int("avail_in", avail_in),
562
state->pending_buffer_end = state->buffer_size - avail_out;
563
/* extend the buffer */
565
if ((state->buffer_size < self->max_buffer_size))
567
state->buffer_size *= 2;
568
if (state->buffer_size > self->max_buffer_size)
569
state->buffer_size = self->max_buffer_size;
571
self->buffer = g_realloc(self->buffer, state->buffer_size);
573
/* recalculate the out pointer, and add what we have now */
578
msg_error("Incoming byte stream requires a too large conversion buffer, probably invalid character sequence",
579
evt_tag_str("encoding", self->super.encoding),
580
evt_tag_printf("buffer", "%.*s", (gint) state->pending_buffer_end, self->buffer),
587
msg_notice("Invalid byte sequence or other error while converting input, skipping character",
588
evt_tag_str("encoding", self->super.encoding),
589
evt_tag_printf("char", "0x%02x", *(guchar *) raw_buffer),
596
state->pending_buffer_end = state->buffer_size - avail_out;
599
while (avail_in > 0);
604
log_proto_buffered_server_put_state(self);
609
log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntryHandle handle, const gchar *persist_name)
613
LogProtoBufferedServerState *state;
616
fd = self->super.transport->fd;
617
self->persist_handle = handle;
619
if (fstat(fd, &st) < 0)
622
state = log_proto_buffered_server_get_state(self);
626
self->buffer = g_malloc(state->buffer_size);
628
state->pending_buffer_end = 0;
630
if (state->file_inode &&
631
state->file_inode == st.st_ino &&
632
state->file_size <= st.st_size &&
633
state->raw_stream_pos <= st.st_size)
635
ofs = state->raw_stream_pos;
637
lseek(fd, ofs, SEEK_SET);
641
if (state->file_inode)
643
/* the stored state does not match the current file */
644
msg_notice("The current log file has a mismatching size/inode information, restarting from the beginning",
645
evt_tag_str("state", persist_name),
646
evt_tag_int("stored_inode", state->file_inode),
647
evt_tag_int("cur_file_inode", st.st_ino),
648
evt_tag_int("stored_size", state->file_size),
649
evt_tag_int("cur_file_size", st.st_size),
650
evt_tag_int("raw_stream_pos", state->raw_stream_pos),
655
if (state->raw_buffer_size)
660
if (!self->super.encoding)
662
/* no conversion, we read directly into our buffer */
663
if (state->raw_buffer_size > state->buffer_size)
665
msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than buffer_size and no encoding is set, restarting from the beginning",
666
evt_tag_str("state", persist_name),
667
evt_tag_int("raw_buffer_size", state->raw_buffer_size),
668
evt_tag_int("buffer_size", state->buffer_size),
669
evt_tag_int("init_buffer_size", self->init_buffer_size),
673
raw_buffer = self->buffer;
677
if (state->raw_buffer_size > self->init_buffer_size)
679
msg_notice("Invalid LogProtoBufferedServerState.raw_buffer_size, larger than init_buffer_size, restarting from the beginning",
680
evt_tag_str("state", persist_name),
681
evt_tag_int("raw_buffer_size", state->raw_buffer_size),
682
evt_tag_int("init_buffer_size", self->init_buffer_size),
686
raw_buffer = g_alloca(state->raw_buffer_size);
689
rc = log_transport_read(self->super.transport, raw_buffer, state->raw_buffer_size, NULL);
690
if (rc != state->raw_buffer_size)
692
msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning",
693
evt_tag_str("state", persist_name),
698
state->pending_buffer_end = 0;
699
if (self->super.encoding)
701
if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
703
msg_notice("Error re-converting buffer contents of the file to be continued, restarting from the beginning",
704
evt_tag_str("state", persist_name),
711
state->pending_buffer_end += rc;
714
if (state->buffer_pos > state->pending_buffer_end ||
715
state->buffer_cached_eol > state->pending_buffer_end)
717
msg_notice("Converted buffer contents is smaller than the current buffer position, starting from the beginning of the buffer, some lines may be duplicated",
718
evt_tag_str("state", persist_name),
720
state->buffer_pos = state->pending_buffer_pos = state->buffer_cached_eol = 0;
725
/* although we do have buffer position information, but the
726
* complete contents of the buffer is already processed, instead
727
* of reading and then dropping it, position the file after the
730
state->raw_stream_pos += state->raw_buffer_size;
731
ofs = state->raw_stream_pos;
732
state->raw_buffer_size = 0;
733
state->buffer_pos = state->pending_buffer_end = 0;
735
lseek(fd, state->raw_stream_pos, SEEK_SET);
741
state->buffer_pos = 0;
742
state->pending_buffer_end = 0;
743
state->buffer_cached_eol = 0;
744
state->raw_stream_pos = 0;
745
state->raw_buffer_size = 0;
746
state->raw_buffer_leftover_size = 0;
747
lseek(fd, 0, SEEK_SET);
750
state->file_inode = st.st_ino;
751
state->file_size = st.st_size;
752
state->raw_stream_pos = ofs;
753
state->pending_buffer_pos = state->buffer_pos;
754
state->pending_raw_stream_pos = state->raw_stream_pos;
755
state->pending_raw_buffer_size = state->raw_buffer_size;
758
log_proto_buffered_server_put_state(self);
761
static PersistEntryHandle
762
log_proto_buffered_server_alloc_state(LogProtoBufferedServer *self, PersistState *persist_state, const gchar *persist_name)
764
LogProtoBufferedServerState *state;
765
PersistEntryHandle handle;
767
handle = persist_state_alloc_entry(persist_state, persist_name, sizeof(LogProtoBufferedServerState));
770
state = persist_state_map_entry(persist_state, handle);
773
state->big_endian = (G_BYTE_ORDER == G_BIG_ENDIAN);
775
persist_state_unmap_entry(persist_state, handle);
782
log_proto_buffered_server_convert_state(LogProtoBufferedServer *self, guint8 persist_version, gpointer old_state, gsize old_state_size, LogProtoBufferedServerState *state)
784
if (persist_version <= 2)
787
state->file_inode = 0;
788
state->raw_stream_pos = strtoll((gchar *) old_state, NULL, 10);
789
state->file_size = 0;
793
else if (persist_version == 3)
795
SerializeArchive *archive;
807
archive = serialize_buffer_archive_new(old_state, old_state_size);
809
/* NOTE: the v23 conversion code adds an extra length field which we
810
* need to read out. */
811
g_assert(serialize_read_uint32(archive, &read_length) && read_length == old_state_size - sizeof(read_length));
813
/* original v3 format starts here */
814
if (!serialize_read_uint16(archive, &version) || version != 0)
816
msg_error("Internal error restoring log reader state, stored data has incorrect version",
817
evt_tag_int("version", version));
818
goto error_converting_v3;
821
if (!serialize_read_uint64(archive, (guint64 *) &cur_pos) ||
822
!serialize_read_uint64(archive, (guint64 *) &cur_inode) ||
823
!serialize_read_uint64(archive, (guint64 *) &cur_size))
825
msg_error("Internal error restoring information about the current file position, restarting from the beginning",
827
goto error_converting_v3;
830
if (!serialize_read_uint16(archive, &version) || version != 0)
832
msg_error("Internal error, protocol state has incorrect version",
833
evt_tag_int("version", version),
835
goto error_converting_v3;
838
if (!serialize_read_cstring(archive, &buffer, &buffer_len))
840
msg_error("Internal error, error reading buffer contents",
841
evt_tag_int("version", version),
843
goto error_converting_v3;
846
if (!self->buffer || state->buffer_size < buffer_len)
848
gsize buffer_size = MAX(self->init_buffer_size, buffer_len);
849
self->buffer = g_realloc(self->buffer, buffer_size);
851
serialize_archive_free(archive);
853
memcpy(self->buffer, buffer, buffer_len);
854
state->buffer_pos = 0;
855
state->pending_buffer_end = buffer_len;
859
state->file_inode = cur_inode;
860
state->raw_stream_pos = cur_pos;
861
state->file_size = cur_size;
864
serialize_archive_free(archive);
870
log_proto_buffered_server_restart_with_state(LogProto *s, PersistState *persist_state, const gchar *persist_name)
872
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
873
guint8 persist_version;
874
PersistEntryHandle old_state_handle;
876
gsize old_state_size;
877
PersistEntryHandle new_state_handle = 0;
878
gpointer new_state = NULL;
881
self->persist_state = persist_state;
882
old_state_handle = persist_state_lookup_entry(persist_state, persist_name, &old_state_size, &persist_version);
883
if (!old_state_handle)
885
new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
886
if (!new_state_handle)
887
goto fallback_non_persistent;
888
log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
891
if (persist_version < 4)
893
new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
894
if (!new_state_handle)
895
goto fallback_non_persistent;
897
old_state = persist_state_map_entry(persist_state, old_state_handle);
898
new_state = persist_state_map_entry(persist_state, new_state_handle);
899
success = log_proto_buffered_server_convert_state(self, persist_version, old_state, old_state_size, new_state);
900
persist_state_unmap_entry(persist_state, old_state_handle);
901
persist_state_unmap_entry(persist_state, new_state_handle);
903
/* we're using the newly allocated state structure regardless if
904
* conversion succeeded. If the conversion went wrong then
905
* new_state is still in its freshly initialized form since the
906
* convert function will not touch the state in the error
910
log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
913
else if (persist_version == 4)
915
LogProtoBufferedServerState *state;
917
old_state = persist_state_map_entry(persist_state, old_state_handle);
919
if ((state->big_endian && G_BYTE_ORDER == G_LITTLE_ENDIAN) ||
920
(!state->big_endian && G_BYTE_ORDER == G_BIG_ENDIAN))
923
/* byte order conversion in order to avoid the hassle with
924
scattered byte order conversions in the code */
926
state->big_endian = !state->big_endian;
927
state->buffer_pos = GUINT32_SWAP_LE_BE(state->buffer_pos);
928
state->pending_buffer_pos = GUINT32_SWAP_LE_BE(state->pending_buffer_pos);
929
state->pending_buffer_end = GUINT32_SWAP_LE_BE(state->pending_buffer_end);
930
state->buffer_size = GUINT32_SWAP_LE_BE(state->buffer_size);
931
state->buffer_cached_eol = GUINT32_SWAP_LE_BE(state->buffer_cached_eol);
932
state->raw_stream_pos = GUINT64_SWAP_LE_BE(state->raw_stream_pos);
933
state->raw_buffer_size = GUINT32_SWAP_LE_BE(state->raw_buffer_size);
934
state->pending_raw_stream_pos = GUINT64_SWAP_LE_BE(state->pending_raw_stream_pos);
935
state->pending_raw_buffer_size = GUINT32_SWAP_LE_BE(state->pending_raw_buffer_size);
936
state->file_size = GUINT64_SWAP_LE_BE(state->file_size);
937
state->file_inode = GUINT64_SWAP_LE_BE(state->file_inode);
940
if (state->version > 0)
942
msg_error("Internal error restoring log reader state, stored data is too new",
943
evt_tag_int("version", state->version));
946
persist_state_unmap_entry(persist_state, old_state_handle);
947
log_proto_buffered_server_apply_state(self, old_state_handle, persist_name);
952
msg_error("Internal error restoring log reader state, stored data is too new",
953
evt_tag_int("version", persist_version));
957
fallback_non_persistent:
958
new_state = g_new0(LogProtoBufferedServerState, 1);
962
new_state_handle = log_proto_buffered_server_alloc_state(self, persist_state, persist_name);
963
if (!new_state_handle)
964
goto fallback_non_persistent;
965
new_state = persist_state_map_entry(persist_state, new_state_handle);
969
LogProtoBufferedServerState *state = new_state;
971
/* error happened, restart the file from the beginning */
972
state->raw_stream_pos = 0;
973
state->file_inode = 0;
974
state->file_size = 0;
975
if (new_state_handle)
976
log_proto_buffered_server_apply_state(self, new_state_handle, persist_name);
978
self->state1 = new_state;
980
if (new_state_handle)
982
persist_state_unmap_entry(persist_state, new_state_handle);
988
log_proto_buffered_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
990
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
992
*fd = self->super.transport->fd;
993
*cond = self->super.transport->cond;
995
/* if there's no pending I/O in the transport layer, then we want to do a read */
1004
log_proto_buffered_server_read_data(LogProtoBufferedServer *self, guchar *buf, gsize len, GSockAddr **sa)
1006
return log_transport_read(self->super.transport, buf, len, sa);
1009
static LogProtoStatus
1010
log_proto_buffered_server_fetch_from_buf(LogProtoBufferedServer *self, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1013
const guchar *buffer_start;
1014
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1015
gboolean success = FALSE;
1017
buffer_start = self->buffer + state->pending_buffer_pos;
1018
buffer_bytes = state->pending_buffer_end - state->pending_buffer_pos;
1020
if (buffer_bytes == 0)
1022
/* if buffer_bytes is zero bytes, it means that we completely
1023
* processed our buffer without having a fraction of a line still
1024
* there. It is important to reset
1025
* pending_buffer_pos/pending_buffer_end to zero as the caller assumes
1026
* that if we return no message from the buffer, then buffer_pos is
1030
if (G_UNLIKELY(self->super.flags & LPBS_POS_TRACKING))
1032
state->pending_raw_stream_pos += state->pending_raw_buffer_size;
1033
state->pending_raw_buffer_size = 0;
1035
state->pending_buffer_pos = state->pending_buffer_end = 0;
1039
success = self->fetch_from_buf(self, buffer_start, buffer_bytes, msg, msg_len, flush_the_rest);
1041
log_proto_buffered_server_put_state(self);
1046
* Returns: TRUE to indicate success, FALSE otherwise. The returned
1047
* msg can be NULL even if no failure occurred.
1049
static LogProtoStatus
1050
log_proto_buffered_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
1052
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1054
guchar *raw_buffer = NULL;
1055
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1056
LogProtoStatus result = self->status;
1058
if (G_UNLIKELY(!self->buffer))
1060
self->buffer = g_malloc(self->init_buffer_size);
1061
state->buffer_size = self->init_buffer_size;
1067
if (self->status != LPS_SUCCESS)
1072
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
1074
if (sa && self->prev_saddr)
1075
*sa = g_sockaddr_ref(self->prev_saddr);
1079
/* ok, no more messages in the buffer, read a chunk */
1084
if (self->super.flags & LPBS_NOMREAD)
1087
/* read the next chunk to be processed */
1089
if (self->prev_saddr)
1091
/* new chunk of data, potentially new sockaddr, forget the previous value */
1092
g_sockaddr_unref(self->prev_saddr);
1093
self->prev_saddr = NULL;
1096
if (!self->super.encoding)
1098
/* no conversion, we read directly into our buffer */
1099
raw_buffer = self->buffer + state->pending_buffer_end;
1100
avail = state->buffer_size - state->pending_buffer_end;
1104
/* if conversion is needed, we first read into an on-stack
1105
* buffer, and then convert it into our internal buffer */
1107
raw_buffer = g_alloca(self->init_buffer_size + state->raw_buffer_leftover_size);
1108
memcpy(raw_buffer, state->raw_buffer_leftover, state->raw_buffer_leftover_size);
1109
avail = self->init_buffer_size;
1112
rc = self->read_data(self, raw_buffer + state->raw_buffer_leftover_size, avail, sa);
1114
self->prev_saddr = *sa;
1117
if (errno == EAGAIN)
1119
/* ok we don't have any more data to read, return to main poll loop */
1124
/* an error occurred while reading */
1125
msg_error("I/O error occurred while reading",
1126
evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1127
evt_tag_errno(EVT_TAG_OSERROR, errno),
1130
/* we set self->status explicitly as we want to return
1131
* LPS_ERROR on the _next_ invocation, not now */
1132
self->status = LPS_ERROR;
1133
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
1135
if (sa && self->prev_saddr)
1136
*sa = g_sockaddr_ref(self->prev_saddr);
1139
result = self->status;
1145
if ((self->super.flags & LPBS_IGNORE_EOF) == 0)
1148
msg_verbose("EOF occurred while reading",
1149
evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1151
if (state->raw_buffer_leftover_size > 0)
1153
msg_error("EOF read on a channel with leftovers from previous character conversion, dropping input",
1158
self->status = LPS_EOF;
1159
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, TRUE))
1161
if (sa && self->prev_saddr)
1162
*sa = g_sockaddr_ref(self->prev_saddr);
1165
result = self->status;
1177
state->pending_raw_buffer_size += rc;
1178
rc += state->raw_buffer_leftover_size;
1179
state->raw_buffer_leftover_size = 0;
1181
if (self->super.encoding)
1183
if (!log_proto_buffered_server_convert_from_raw(self, raw_buffer, rc))
1191
state->pending_buffer_end += rc;
1194
if (log_proto_buffered_server_fetch_from_buf(self, msg, msg_len, FALSE))
1196
if (sa && self->prev_saddr)
1197
*sa = g_sockaddr_ref(self->prev_saddr);
1204
/* result contains our result, but once an error happens, the error condition remains persistent */
1205
log_proto_buffered_server_put_state(self);
1206
if (result != LPS_SUCCESS)
1207
self->status = result;
1212
log_proto_buffered_server_queued(LogProto *s)
1214
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1215
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(self);
1217
/* NOTE: we modify the current file position _after_ updating
1218
buffer_pos, since if we crash right here, at least we
1219
won't lose data on the next restart, but rather we
1220
duplicate some data */
1222
state->buffer_pos = state->pending_buffer_pos;
1223
state->raw_stream_pos = state->pending_raw_stream_pos;
1224
state->raw_buffer_size = state->pending_raw_buffer_size;
1225
if (state->pending_buffer_pos == state->pending_buffer_end)
1227
state->pending_buffer_end = 0;
1228
state->buffer_pos = state->pending_buffer_pos = 0;
1230
if (self->super.flags & LPBS_POS_TRACKING)
1232
if (state->buffer_pos == state->pending_buffer_end)
1234
state->raw_stream_pos += state->raw_buffer_size;
1235
state->raw_buffer_size = 0;
1238
msg_trace("Last message got confirmed",
1239
evt_tag_int("raw_stream_pos", state->raw_stream_pos),
1240
evt_tag_int("raw_buffer_len", state->raw_buffer_size),
1241
evt_tag_int("buffer_pos", state->buffer_pos),
1242
evt_tag_int("buffer_end", state->pending_buffer_end),
1243
evt_tag_int("buffer_cached_eol", state->buffer_cached_eol),
1245
log_proto_buffered_server_put_state(self);
1249
log_proto_buffered_server_free(LogProto *s)
1251
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
1253
g_sockaddr_unref(self->prev_saddr);
1255
g_free(self->buffer);
1258
g_free(self->state1);
1263
log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *transport, gint max_buffer_size, gint init_buffer_size, guint flags)
1265
self->super.prepare = log_proto_buffered_server_prepare;
1266
self->super.fetch = log_proto_buffered_server_fetch;
1267
self->super.queued = log_proto_buffered_server_queued;
1268
self->super.free_fn = log_proto_buffered_server_free;
1269
self->super.transport = transport;
1270
self->super.convert = (GIConv) -1;
1271
self->super.restart_with_state = log_proto_buffered_server_restart_with_state;
1272
self->read_data = log_proto_buffered_server_read_data;
1274
self->super.flags = flags;
1276
self->init_buffer_size = init_buffer_size;
1277
self->max_buffer_size = max_buffer_size;
1280
// Line oriented server protocol layered on LogProtoBufferedServer.  The
// reverse_* members are used by log_proto_text_server_get_raw_size_of_buffer
// to convert buffered UTF-8 data back to the configured source encoding.
struct _LogProtoTextServer
1282
// base object; other code in this file reaches the LogProto through super.super
LogProtoBufferedServer super;
1283
// iconv handle for the UTF-8 to source-encoding direction; (GIConv) -1 until opened
GIConv reverse_convert;
1284
// scratch output buffer for the reverse conversion, reallocated when too small
gchar *reverse_buffer;
1285
// allocated size of reverse_buffer in bytes
gsize reverse_buffer_len;
1290
* This function is called in cases when several files are continuously
1291
* polled for changes. Whenever the caller would like to switch to another
1292
* file, it will call this function to check whether it should be allowed to do so.
1294
* This function returns true if the current state of this LogProto would
1295
* allow preemption, e.g. the contents of the current buffer can be
1299
log_proto_text_server_is_preemptable(LogProtoTextServer *self)
1301
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1302
gboolean preemptable;
1304
preemptable = (state->buffer_cached_eol == 0);
1305
log_proto_buffered_server_put_state(&self->super);
1310
log_proto_text_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1312
LogProtoTextServer *self = (LogProtoTextServer *) s;
1313
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1316
if (log_proto_buffered_server_prepare(s, fd, cond))
1318
log_proto_buffered_server_put_state(&self->super);
1322
avail = (state->buffer_cached_eol != 0);
1323
log_proto_buffered_server_put_state(&self->super);
1328
* Find the character terminating the buffer.
1330
* NOTE: when looking for the end-of-message here, it either needs to be
1331
* terminated via NUL or via NL, when terminating via NL we have to make
1332
* sure that there's no NUL left in the message. This function iterates over
1333
* the input data and returns a pointer to the first occurrence of NL or NUL.
1335
* It uses an algorithm similar to what there's in libc memchr/strchr.
1337
* NOTE: find_eom is not static as it is used by a unit test program.
1340
find_eom(const guchar *s, gsize n)
1342
const guchar *char_ptr;
1343
const gulong *longword_ptr;
1344
gulong longword, magic_bits, charmask;
1349
/* align input to long boundary */
1350
for (char_ptr = s; n > 0 && ((gulong) char_ptr & (sizeof(longword) - 1)) != 0; ++char_ptr, n--)
1352
if (*char_ptr == c || *char_ptr == '\0')
1356
longword_ptr = (gulong *) char_ptr;
1358
#if GLIB_SIZEOF_LONG == 8
1359
magic_bits = 0x7efefefefefefeffL;
1360
#elif GLIB_SIZEOF_LONG == 4
1361
magic_bits = 0x7efefeffL;
1363
#error "unknown architecture"
1365
memset(&charmask, c, sizeof(charmask));
1367
while (n > sizeof(longword))
1369
longword = *longword_ptr++;
1370
if ((((longword + magic_bits) ^ ~longword) & ~magic_bits) != 0 ||
1371
((((longword ^ charmask) + magic_bits) ^ ~(longword ^ charmask)) & ~magic_bits) != 0)
1375
char_ptr = (const guchar *) (longword_ptr - 1);
1377
for (i = 0; i < sizeof(longword); i++)
1379
if (*char_ptr == c || *char_ptr == '\0')
1384
n -= sizeof(longword);
1387
char_ptr = (const guchar *) longword_ptr;
1391
if (*char_ptr == c || *char_ptr == '\0')
1401
// Table of encoding-name prefixes paired with a per-character scale.
// log_proto_text_server_get_raw_size_of_buffer walks it until it reaches an
// entry whose prefix is NULL and multiplies the UTF-8 character count by the
// scale of the matching entry.
const gchar *prefix;
1403
} fixed_encodings[] = {
1416
{ "wchar_t", sizeof(wchar_t) },
1421
* returns the number of bytes that represent the UTF8 encoding buffer
1422
* in the original encoding that the user specified.
1424
* NOTE: this is slow, but we only call this for the remainder of our
1425
* buffer (e.g. the partial line at the end of our last chunk of read
1426
* data). Also, this is only invoked if the file uses an encoding.
1429
/*
 * Compute how many bytes @buffer (UTF-8, @buffer_len bytes) occupies in the
 * encoding configured for this source.
 *
 * On first use the encoding name is matched against fixed_encodings; for a
 * fixed-width encoding the answer is simply character count * scale.
 * Otherwise the data is converted back with iconv into a scratch buffer and
 * the number of produced bytes is returned.
 *
 * Returns 0 (after logging an error) when the reverse conversion fails.
 * The caller must ensure that an encoding is set on the LogProto.
 */
static gsize
log_proto_text_server_get_raw_size_of_buffer(LogProtoTextServer *self, const guchar *buffer, gsize buffer_len)
{
  gchar *out;
  const guchar *in;
  gsize avail_out, avail_in;
  gsize ret;

  if (self->reverse_convert == ((GIConv) -1) && !self->convert_scale)
    {
      const gchar *encoding = self->super.super.encoding;
      gint i;

      /* try to speed up raw size calculation by recognizing the most
       * prominent character encodings and in the case the encoding
       * uses fixed size characters set that in self->convert_scale,
       * which in turn will speed up the reversal of the UTF8 buffer
       * size to raw buffer sizes.
       */
      for (i = 0; fixed_encodings[i].prefix; i++)
        {
          /* the comparison with 0 must apply to the result of
           * strncasecmp(); it used to be part of the length argument,
           * which made the length 0 and the match impossible */
          if (strncasecmp(encoding, fixed_encodings[i].prefix, strlen(fixed_encodings[i].prefix)) == 0)
            {
              self->convert_scale = fixed_encodings[i].scale;
              break;
            }
        }
      if (!fixed_encodings[i].prefix)
        {
          /* not a known fixed-width encoding, fall back to iconv */
          self->reverse_convert = g_iconv_open(encoding, "utf-8");
        }
    }

  if (self->convert_scale)
    return g_utf8_strlen((gchar *) buffer, buffer_len) * self->convert_scale;

  if (self->reverse_buffer_len < buffer_len * 6)
    {
      /* we free and malloc, since we never need the data still in reverse buffer */
      g_free(self->reverse_buffer);
      self->reverse_buffer_len = buffer_len * 6;
      self->reverse_buffer = g_malloc(buffer_len * 6);
    }

  avail_out = self->reverse_buffer_len;
  out = self->reverse_buffer;

  avail_in = buffer_len;
  in = buffer;

  ret = g_iconv(self->reverse_convert, (gchar **) &in, &avail_in, &out, &avail_out);
  if (ret == (gsize) -1)
    {
      /* oops, we cannot reverse that we ourselves converted to UTF-8,
       * this is simply impossible, but never say never */
      msg_error("Internal error, couldn't reverse the internal UTF8 string to the original encoding",
                evt_tag_printf("buffer", "%.*s", (gint) buffer_len, buffer),
                NULL);
      return 0;
    }

  return self->reverse_buffer_len - avail_out;
}
1495
* log_proto_text_server_fetch_from_buf:
1496
* @self: LogReader instance
1497
* @saddr: socket address to be assigned to new messages (consumed!)
1498
* @flush: whether to flush the input buffer
1499
* @msg_counter: the number of messages processed in the current poll iteration
1501
* Returns TRUE if a message was found in the buffer, FALSE if we need to read again.
1504
log_proto_text_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
1506
LogProtoTextServer *self = (LogProtoTextServer *) s;
1508
LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
1509
gboolean result = FALSE;
1514
* we are set to packet terminating mode or the connection is to
1515
* be teared down and we have partial data in our buffer.
1517
*msg = buffer_start;
1518
*msg_len = buffer_bytes;
1519
state->pending_buffer_pos = state->pending_buffer_end;
1523
if (state->buffer_cached_eol)
1525
/* previous invocation was nice enough to save a cached EOL
1526
* pointer, no need to look it up again */
1528
eol = self->super.buffer + state->buffer_cached_eol;
1529
state->buffer_cached_eol = 0;
1533
eol = find_eom(buffer_start, buffer_bytes);
1535
if ((!eol && (buffer_bytes == state->buffer_size)))
1537
/* our buffer is full and no EOL was found */
1538
*msg_len = buffer_bytes;
1539
state->pending_buffer_pos = state->pending_buffer_end;
1540
*msg = buffer_start;
1545
gsize raw_split_size;
1547
/* buffer is not full, but no EOL is present, move partial line
1548
* to the beginning of the buffer to make space for new data.
1551
memmove(self->super.buffer, buffer_start, buffer_bytes);
1552
state->pending_buffer_pos = 0;
1553
state->pending_buffer_end = buffer_bytes;
1555
if (G_UNLIKELY(self->super.super.flags & LPBS_POS_TRACKING))
1557
/* NOTE: we modify the current file position _after_ updating
1558
buffer_pos, since if we crash right here, at least we
1559
won't lose data on the next restart, but rather we
1560
duplicate some data */
1562
if (self->super.super.encoding)
1563
raw_split_size = log_proto_text_server_get_raw_size_of_buffer(self, buffer_start, buffer_bytes);
1565
raw_split_size = buffer_bytes;
1567
state->pending_raw_stream_pos += (gint64) (state->pending_raw_buffer_size - raw_split_size);
1568
state->pending_raw_buffer_size = raw_split_size;
1570
msg_trace("Buffer split",
1571
evt_tag_int("raw_split_size", raw_split_size),
1572
evt_tag_int("buffer_bytes", buffer_bytes),
1579
const guchar *msg_end = eol;
1581
/* eol points at the newline character. end points at the
1582
* character terminating the line, which may be a carriage
1583
* return preceeding the newline. */
1585
while ((msg_end > buffer_start) && (msg_end[-1] == '\r' || msg_end[-1] == '\n' || msg_end[-1] == 0))
1588
*msg_len = msg_end - buffer_start;
1589
*msg = buffer_start;
1590
state->pending_buffer_pos = eol + 1 - self->super.buffer;
1592
if (state->pending_buffer_end != state->pending_buffer_pos)
1595
/* store the end of the next line, it indicates whether we need
1596
* to read further data, or the buffer already contains a
1598
eom = find_eom(self->super.buffer + state->pending_buffer_pos, state->pending_buffer_end - state->pending_buffer_pos);
1600
state->buffer_cached_eol = eom - self->super.buffer;
1602
state->buffer_cached_eol = 0;
1606
state->pending_buffer_pos = state->pending_buffer_end;
1613
log_proto_buffered_server_put_state(&self->super);
1618
/*
 * Release the text server specific resources (reverse iconv handle and
 * its scratch buffer), then the buffered server part.
 */
void
log_proto_text_server_free(LogProtoTextServer *self)
{
  GIConv reverse = self->reverse_convert;

  /* (GIConv) -1 means the reverse converter was never opened */
  if (reverse != (GIConv) -1)
    g_iconv_close(reverse);

  g_free(self->reverse_buffer);
  log_proto_buffered_server_free(&self->super.super);
}
1628
/*
 * Initialise a text server: set up the buffered base with a maximum buffer
 * of six times max_msg_size and an initial buffer of max_msg_size, then
 * install the text specific fetch_from_buf and prepare methods.
 *
 * NOTE(review): free_fn is left as installed by
 * log_proto_buffered_server_init(); log_proto_text_server_free() is not
 * registered here -- confirm that reverse_convert/reverse_buffer are
 * released through some other path.
 */
void
log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, gint max_msg_size, guint flags)
{
  LogProtoBufferedServer *buffered = &self->super;

  log_proto_buffered_server_init(buffered, transport, max_msg_size * 6, max_msg_size, flags);

  /* the reverse converter is opened lazily, mark it as not yet open */
  self->reverse_convert = (GIConv) -1;

  buffered->super.prepare = log_proto_text_server_prepare;
  buffered->fetch_from_buf = log_proto_text_server_fetch_from_buf;
}
1637
log_proto_text_server_new(LogTransport *transport, gint max_msg_size, guint flags)
1639
LogProtoTextServer *self = g_new0(LogProtoTextServer, 1);
1641
log_proto_text_server_init(self, transport, max_msg_size, flags);
1642
return &self->super.super;
1645
/* proto that reads the stream in even sized chunks */
// Each read asks the transport for exactly record_size bytes
// (see log_proto_record_server_read_data).
typedef struct _LogProtoRecordServer LogProtoRecordServer;
1647
struct _LogProtoRecordServer
1649
// base object; functions in this file cast between the two pointer types
LogProtoBufferedServer super;
1654
/*
 * fetch_from_buf() method of the record server.
 *
 * The whole buffer is always consumed as one record.  Unless LPRS_BINARY
 * is set, the reported message is cut at the first NL or NUL byte found
 * in the record.  Always returns TRUE.
 */
static gboolean
log_proto_record_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
{
  LogProtoRecordServer *self = (LogProtoRecordServer *) s;
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(s);
  gsize length = buffer_bytes;

  if (!(self->super.super.flags & LPRS_BINARY))
    {
      const guchar *terminator = find_eom(buffer_start, buffer_bytes);

      if (terminator)
        length = terminator - buffer_start;
    }

  *msg_len = length;
  /* mark the complete record as consumed, padding included */
  state->pending_buffer_pos = state->pending_buffer_end;
  *msg = buffer_start;
  log_proto_buffered_server_put_state(s);
  return TRUE;
}
1676
log_proto_record_server_read_data(LogProtoBufferedServer *s, guchar *buf, gsize len, GSockAddr **sa)
1678
LogProtoRecordServer *self = (LogProtoRecordServer *) s;
1681
g_assert(len <= self->record_size);
1682
len = self->record_size;
1683
rc = log_transport_read(self->super.super.transport, buf, len, sa);
1684
if (rc > 0 && rc != self->record_size)
1686
msg_error("Padding was set, and couldn't read enough bytes",
1687
evt_tag_int(EVT_TAG_FD, self->super.super.transport->fd),
1688
evt_tag_int("padding", self->record_size),
1689
evt_tag_int("read", rc),
1698
log_proto_record_server_new(LogTransport *transport, gint record_size, guint flags)
1700
LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
1702
log_proto_buffered_server_init(&self->super, transport, record_size * 6, record_size, flags);
1703
self->super.fetch_from_buf = log_proto_record_server_fetch_from_buf;
1704
self->super.read_data = log_proto_record_server_read_data;
1705
self->record_size = record_size;
1706
return &self->super.super;
1709
/* proto that hands over each buffer it reads as one single message (see log_proto_dgram_server_fetch_from_buf) */
typedef struct _LogProtoDGramServer LogProtoDGramServer;
1711
struct _LogProtoDGramServer
1713
// base object; no further members are visible in this excerpt
LogProtoBufferedServer super;
1717
/*
 * fetch_from_buf() method of the datagram server: the complete buffer
 * content is returned as one message and marked as consumed.  Always
 * returns TRUE; flush_the_rest is not consulted.
 */
static gboolean
log_proto_dgram_server_fetch_from_buf(LogProtoBufferedServer *s, const guchar *buffer_start, gsize buffer_bytes, const guchar **msg, gsize *msg_len, gboolean flush_the_rest)
{
  LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(s);

  /* packet terminating mode: one buffer is one message */
  *msg_len = buffer_bytes;
  *msg = buffer_start;
  state->pending_buffer_pos = state->pending_buffer_end;

  log_proto_buffered_server_put_state(s);
  return TRUE;
}
1732
log_proto_dgram_server_new(LogTransport *transport, gint max_msg_size, guint flags)
1734
LogProtoRecordServer *self = g_new0(LogProtoRecordServer, 1);
1736
log_proto_buffered_server_init(&self->super, transport, max_msg_size * 6, max_msg_size, flags | LPBS_IGNORE_EOF);
1737
self->super.fetch_from_buf = log_proto_dgram_server_fetch_from_buf;
1738
return &self->super.super;
1741
// States of the framed client (see the switch in log_proto_framed_client_post):
// FRAME_INIT formats a new frame header, FRAME_SEND writes the header,
// MESSAGE_SEND writes the message itself.
#define LPFCS_FRAME_INIT 0
1742
#define LPFCS_FRAME_SEND 1
1743
#define LPFCS_MESSAGE_SEND 2
1745
// Client side of the framed protocol: every message is preceded by its
// decimal length and one space (see log_proto_framed_client_post).
typedef struct _LogProtoFramedClient
1747
// base object; the message body is sent through the text client code
LogProtoTextClient super;
1749
// formatted frame header; 9 bytes hold a 7 digit length, one space and the NUL
gchar frame_hdr_buf[9];
1750
// length of the formatted header and number of header bytes already written
gint frame_hdr_len, frame_hdr_pos;
1751
} LogProtoFramedClient;
1753
static LogProtoStatus
1754
log_proto_framed_client_post(LogProto *s, guchar *msg, gsize msg_len, gboolean *consumed)
1756
LogProtoFramedClient *self = (LogProtoFramedClient *) s;
1759
if (msg_len > 9999999)
1761
static const guchar *warn_msg;
1763
if (warn_msg != msg)
1765
msg_warning("Error, message length too large for framed protocol, truncated",
1766
evt_tag_int("length", msg_len),
1772
switch (self->state)
1774
case LPFCS_FRAME_INIT:
1775
self->frame_hdr_len = g_snprintf(self->frame_hdr_buf, sizeof(self->frame_hdr_buf), "%" G_GSIZE_FORMAT" ", msg_len);
1776
self->frame_hdr_pos = 0;
1777
self->state = LPFCS_FRAME_SEND;
1778
case LPFCS_FRAME_SEND:
1779
rc = log_transport_write(s->transport, &self->frame_hdr_buf[self->frame_hdr_pos], self->frame_hdr_len - self->frame_hdr_pos);
1782
if (errno != EAGAIN)
1784
msg_error("I/O error occurred while writing",
1785
evt_tag_int("fd", self->super.super.transport->fd),
1786
evt_tag_errno(EVT_TAG_OSERROR, errno),
1794
self->frame_hdr_pos += rc;
1795
if (self->frame_hdr_pos != self->frame_hdr_len)
1797
self->state = LPFCS_MESSAGE_SEND;
1799
case LPFCS_MESSAGE_SEND:
1800
rc = log_proto_text_client_post(s, msg, msg_len, consumed);
1802
/* NOTE: we don't check *consumed here, as we might have a pending
1803
* message in self->partial before we begin, in which case *consumed
1806
if (rc == LPS_SUCCESS && self->super.partial == NULL)
1808
self->state = LPFCS_FRAME_INIT;
1812
g_assert_not_reached();
1818
log_proto_framed_client_new(LogTransport *transport)
1820
LogProtoFramedClient *self = g_new0(LogProtoFramedClient, 1);
1822
self->super.super.prepare = log_proto_text_client_prepare;
1823
self->super.super.post = log_proto_framed_client_post;
1824
self->super.super.flush = log_proto_text_client_flush;
1825
self->super.super.transport = transport;
1826
self->super.super.convert = (GIConv) -1;
1827
return &self->super.super;
1830
// States of the framed server: reading the length header, then reading the message.
#define LPFSS_FRAME_READ 0
1831
#define LPFSS_MESSAGE_READ 1
1833
// Bytes added to the maximum message size when the receive buffer is sized,
// leaving room for the frame header (see log_proto_framed_server_new).
#define LPFS_FRAME_BUFFER 10
1835
// Server side of the framed protocol.  Only part of the member list is
// visible in this excerpt.
typedef struct _LogProtoFramedServer
1841
// buffer_size: allocated size of the receive buffer
// buffer_pos: offset of the first unconsumed byte
// buffer_end: offset one past the last byte received
gsize buffer_size, buffer_pos, buffer_end;
1844
// set when a read reported EAGAIN while more data was needed, cleared when
// a complete message is returned; prepare waits for I/O while it is set
gboolean half_message_in_buffer;
1845
// NOTE(review): not referenced by the code visible in this excerpt
GSockAddr *prev_saddr;
1846
// NOTE(review): not referenced by the code visible in this excerpt
LogProtoStatus status;
1847
} LogProtoFramedServer;
1850
log_proto_framed_server_prepare(LogProto *s, gint *fd, GIOCondition *cond)
1852
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
1854
*fd = self->super.transport->fd;
1855
*cond = self->super.transport->cond;
1857
/* there is a half message in our buffer so try to wait */
1858
if (!self->half_message_in_buffer)
1860
if (self->buffer_pos != self->buffer_end)
1862
/* we have a full message in our buffer so parse it without reading new data from the transport layer */
1867
/* if there's no pending I/O in the transport layer, then we want to do a read */
1874
// Makes room in the receive buffer if needed and reads more bytes from the
// transport into the free space after buffer_end.
// NOTE(review): the return statements, the branch conditions on rc and any
// use of may_read fall on lines that are not visible in this excerpt.
static LogProtoStatus
1875
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read)
1879
// everything was consumed, so restart at the beginning of the buffer
if (self->buffer_pos == self->buffer_end)
1880
self->buffer_pos = self->buffer_end = 0;
1882
// the buffer is filled up to its end
if (self->buffer_size == self->buffer_end)
1884
/* no more space in the buffer, we can't fetch further data. Move the
1885
* things we already have to the beginning of the buffer to make
1888
memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
1889
self->buffer_end = self->buffer_end - self->buffer_pos;
1890
self->buffer_pos = 0;
1896
// read at most as many bytes as are free after buffer_end
rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end, NULL);
1900
// EAGAIN is handled separately below and is not logged as an error
if (errno != EAGAIN)
1902
msg_error("Error reading frame header",
1903
evt_tag_int("fd", self->super.transport->fd),
1904
evt_tag_errno("error", errno),
1910
/* we need more data to parse this message but the data is not available yet */
1911
self->half_message_in_buffer = TRUE;
1916
msg_verbose("EOF occurred while reading",
1917
evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
1923
// account for the bytes just received
self->buffer_end += rc;
1930
/*
 * Parse the decimal frame length starting at buffer_pos.
 *
 * On success with a complete header (digits followed by a space) the
 * length is stored in self->frame_len, buffer_pos is moved past the space
 * and *need_more_data is set to FALSE.  When the buffered data ends before
 * the space, TRUE is returned with *need_more_data left TRUE.  Any other
 * character, or a digit after ten header characters, is logged as an
 * invalid header and FALSE is returned.
 *
 * NOTE(review): up to ten digits are accumulated without a range check;
 * confirm that the type of frame_len can hold such values.
 */
static gboolean
log_proto_framed_server_extract_frame_length(LogProtoFramedServer *self, gboolean *need_more_data)
{
  gint i;

  *need_more_data = TRUE;
  self->frame_len = 0;

  for (i = self->buffer_pos; i < self->buffer_end; i++)
    {
      if (self->buffer[i] == ' ')
        {
          /* end of header, the message starts right after the space */
          self->buffer_pos = i + 1;
          *need_more_data = FALSE;
          return TRUE;
        }

      if (!isdigit(self->buffer[i]) || !(i - self->buffer_pos < 10))
        {
          msg_error("Invalid frame header",
                    evt_tag_printf("header", "%.*s", (gint) (i - self->buffer_pos), &self->buffer[self->buffer_pos]),
                    NULL);
          return FALSE;
        }

      self->frame_len = self->frame_len * 10 + (self->buffer[i] - '0');
    }

  /* couldn't extract frame header, no error but need more data */
  return TRUE;
}
1960
// fetch() method of the framed server.  Works as a two state machine:
// LPFSS_FRAME_READ parses the length header, LPFSS_MESSAGE_READ hands out
// the message once frame_len bytes are buffered.  On success *msg points
// into the internal buffer and *msg_len is the frame length.
// NOTE(review): labels, goto/return/break statements and the assignments to
// try_read fall on lines that are not visible in this excerpt, so the exact
// control flow between the visible statements cannot be stated from here.
static LogProtoStatus
1961
log_proto_framed_server_fetch(LogProto *s, const guchar **msg, gsize *msg_len, GSockAddr **sa, gboolean *may_read)
1963
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
1964
LogProtoStatus status;
1965
// try_read guards the calls to fetch_data in the header stage;
// need_more_data is filled in by the header parser
gboolean try_read, need_more_data;
1969
switch (self->state)
1971
case LPFSS_FRAME_READ:
1976
// parse the length header from the bytes buffered so far
if (!log_proto_framed_server_extract_frame_length(self, &need_more_data))
1978
/* invalid frame header */
1981
// header incomplete: try to obtain more data from the transport
if (need_more_data && try_read)
1983
status = log_proto_framed_server_fetch_data(self, may_read);
1984
if (status != LPS_SUCCESS)
1990
// header complete: validate the length and make room for the message
if (!need_more_data)
1992
self->state = LPFSS_MESSAGE_READ;
1993
if (self->frame_len > self->max_msg_size)
1995
msg_error("Incoming frame larger than log_msg_size()",
1996
evt_tag_int("log_msg_size", self->buffer_size - LPFS_FRAME_BUFFER),
1997
evt_tag_int("frame_length", self->frame_len),
2001
if (self->buffer_pos + self->frame_len > self->buffer_size)
2003
/* message would be too large to fit into the buffer at
2004
* buffer_pos, we need to move data to the beginning of
2005
* the buffer to make space, and once we are at it, move
2006
* to the beginning to make space for the maximum number
2007
* of messages before the next shift */
2008
memmove(self->buffer, &self->buffer[self->buffer_pos], self->buffer_end - self->buffer_pos);
2009
self->buffer_end = self->buffer_end - self->buffer_pos;
2010
self->buffer_pos = 0;
2015
case LPFSS_MESSAGE_READ:
2019
/* NOTE: here we can assume that the complete message fits into
2020
* the buffer because of the checks/move operation in the
2021
* LPFSS_FRAME_READ state */
2022
// enough bytes are buffered to cover the whole frame
if (self->buffer_end - self->buffer_pos >= self->frame_len)
2024
/* ok, we already have the complete message */
2026
*msg = &self->buffer[self->buffer_pos];
2027
*msg_len = self->frame_len;
2028
// consume the message and expect a new header next
self->buffer_pos += self->frame_len;
2029
self->state = LPFSS_FRAME_READ;
2031
/* we have the full message here so reset the half message flag */
2032
self->half_message_in_buffer = FALSE;
2037
// message incomplete: read more data from the transport
status = log_proto_framed_server_fetch_data(self, may_read);
2038
if (status != LPS_SUCCESS)
2051
/*
 * free_fn() method of the framed server: releases the receive buffer.
 */
static void
log_proto_framed_server_free(LogProto *s)
{
  LogProtoFramedServer *server = (LogProtoFramedServer *) s;

  g_free(server->buffer);
}
2058
log_proto_framed_server_new(LogTransport *transport, gint max_msg_size)
2060
LogProtoFramedServer *self = g_new0(LogProtoFramedServer, 1);
2062
self->super.prepare = log_proto_framed_server_prepare;
2063
self->super.fetch = log_proto_framed_server_fetch;
2064
self->super.free_fn = log_proto_framed_server_free;
2065
self->super.transport = transport;
2066
self->super.convert = (GIConv) -1;
2067
/* max message + frame header */
2068
self->max_msg_size = max_msg_size;
2069
self->buffer_size = max_msg_size + LPFS_FRAME_BUFFER;
2070
self->buffer = g_malloc(self->buffer_size);
2071
self->half_message_in_buffer = FALSE;
2072
return &self->super;