2
* Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
3
* Copyright (c) 1998-2010 Balázs Scheidler
5
* This library is free software; you can redistribute it and/or
6
* modify it under the terms of the GNU Lesser General Public
7
* License as published by the Free Software Foundation; either
8
* version 2.1 of the License, or (at your option) any later version.
10
* This library is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13
* Lesser General Public License for more details.
15
* You should have received a copy of the GNU Lesser General Public
16
* License along with this library; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* As an additional exemption you are allowed to compile & link against the
20
* OpenSSL libraries as published by the OpenSSL project. See the file
21
* COPYING for details.
25
#include "logwriter.h"
30
#include "str-format.h"
46
/* business as usual, flush when the buffer is full */
48
/* flush the buffer immediately please */
50
/* pull off any queued items, at maximum speed, even ignoring throttle, and flush the buffer too */
60
StatsCounterItem *dropped_messages;
61
StatsCounterItem *suppressed_messages;
62
StatsCounterItem *processed_messages;
63
StatsCounterItem *stored_messages;
65
LogWriterOptions *options;
66
GStaticMutex suppress_lock;
68
guint32 last_msg_count;
74
gchar *stats_instance;
76
struct iv_fd fd_watch;
77
struct iv_timer suspend_timer;
78
struct iv_task immed_io_task;
79
struct iv_event queue_filled;
80
MainLoopIOWorkerJob io_job;
81
struct iv_timer suppress_timer;
82
struct timespec suppress_timer_expires;
83
gboolean suppress_timer_updated;
86
LogProto *proto, *pending_proto;
87
gboolean watches_running:1, suspended:1, working:1, flush_waiting_for_timeout:1;
88
gboolean pending_proto_present;
89
GCond *pending_proto_cond;
90
GStaticMutex pending_proto_lock;
97
* LogWriter is a core element of syslog-ng sending messages out to some
98
* kind of destination represented by a UNIX fd. Outgoing messages are sent
99
* to the target asynchronously, first by placing them to a queue and then
100
* sending messages when poll() indicates that the fd is writable.
105
* For a simple log writer without a disk buffer messages are placed on a
106
* GQueue and they are acknowledged when the send() system call returned
107
* success. This is more complex when disk buffering is used, in which case
108
* messages are put to the "disk buffer" first and acknowledged immediately.
109
* (this way the reader never stops when the disk buffer area is not yet
110
* full). When disk buffer reaches its limit, messages are added to the
111
* usual GQueue and messages get acknowledged when they are moved to the
116
static gboolean log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode);
117
static void log_writer_broken(LogWriter *self, gint notify_code);
118
static void log_writer_start_watches(LogWriter *self);
119
static void log_writer_stop_watches(LogWriter *self);
120
static void log_writer_update_watches(LogWriter *self);
121
static void log_writer_suspend(LogWriter *self);
124
/* Work function of the output job: asserts that the pipe is initialized,
 * flushes the writer and stores the result in self->work_result.  The flush
 * mode is LW_FLUSH_BUFFER when flush_waiting_for_timeout is set and
 * LW_FLUSH_NORMAL otherwise.
 *
 * @s: the LogWriter instance, passed as an untyped pointer. */
log_writer_work_perform(gpointer s)
126
LogWriter *self = (LogWriter *) s;
128
g_assert((self->super.flags & PIF_INITIALIZED) != 0);
129
self->work_result = log_writer_flush(self, self->flush_waiting_for_timeout ? LW_FLUSH_BUFFER : LW_FLUSH_NORMAL);
133
/* Completion function of the output job; asserts that it runs in the main
 * thread.  It clears flush_waiting_for_timeout, installs a pending LogProto
 * if one was stored while the job was running, reacts to a failed flush
 * (work_result is FALSE) by marking the writer broken, suspending it and
 * logging a notice, restarts the watches if the pipe is still initialized
 * and has a proto, and finally drops one reference on the pipe.
 *
 * @s: the LogWriter instance, passed as an untyped pointer. */
log_writer_work_finished(gpointer s)
135
LogWriter *self = (LogWriter *) s;
137
main_loop_assert_main_thread();
138
self->flush_waiting_for_timeout = FALSE;
140
if (self->pending_proto_present)
142
/* pending proto is only set in the main thread, so no need to
143
* lock it before coming here. After we're syncing with the
144
* log_writer_reopen() call, quite possibly coming from a
145
* non-main thread. */
147
g_static_mutex_lock(&self->pending_proto_lock);
149
log_proto_free(self->proto);
151
self->proto = self->pending_proto;
152
self->pending_proto = NULL;
153
self->pending_proto_present = FALSE;
155
/* wake up a log_writer_reopen() caller waiting on pending_proto_cond */
g_cond_signal(self->pending_proto_cond);
156
g_static_mutex_unlock(&self->pending_proto_lock);
159
if (!self->work_result)
161
log_writer_broken(self, NC_WRITE_ERROR);
164
log_writer_suspend(self);
165
msg_notice("Suspending write operation because of an I/O error",
166
evt_tag_int("fd", log_proto_get_fd(self->proto)),
167
evt_tag_int("time_reopen", self->options->time_reopen),
173
if ((self->super.flags & PIF_INITIALIZED) && self->proto)
175
/* reenable polling the source, but only if we're still initialized */
176
log_writer_start_watches(self);
180
/* balances the log_pipe_ref() taken in log_writer_io_flush_output() */
log_pipe_unref(&self->super);
184
/* I/O callback that starts an output run; asserts that it runs in the main
 * thread.  It stops the watches and takes a reference on the pipe, then
 * either submits io_job to the I/O workers (when LWO_THREADED is set) or
 * calls log_writer_work_perform() and log_writer_work_finished() directly.
 * NOTE(review): the line separating the two branches is not visible in
 * this excerpt -- presumably an else; confirm against the full file.
 *
 * @s: the LogWriter instance, passed as an untyped pointer. */
log_writer_io_flush_output(gpointer s)
186
LogWriter *self = (LogWriter *) s;
188
main_loop_assert_main_thread();
190
log_writer_stop_watches(self);
191
log_pipe_ref(&self->super);
192
if ((self->options->options & LWO_THREADED))
194
main_loop_io_worker_job_submit(&self->io_job);
198
log_writer_work_perform(s);
199
log_writer_work_finished(s);
204
/* Error callback of fd_watch.  When neither an input nor an output handler
 * is installed on fd_watch, the error is logged at debug level and the
 * writer is reported broken with NC_WRITE_ERROR.  The function also calls
 * log_writer_update_watches(); the branch structure around that call is
 * not fully visible in this excerpt.
 *
 * @s: the LogWriter instance, passed as an untyped pointer. */
log_writer_io_error(gpointer s)
206
LogWriter *self = (LogWriter *) s;
208
if (self->fd_watch.handler_out == NULL && self->fd_watch.handler_in == NULL)
210
msg_debug("POLLERR occurred while idle",
211
evt_tag_int("fd", log_proto_get_fd(self->proto)),
213
log_writer_broken(self, NC_WRITE_ERROR);
218
/* in case we have an error state but we also asked for read/write
219
* polling, the error should be handled by the I/O callback. But we
220
* need not call that explicitly as ivykis does that for us. */
222
log_writer_update_watches(self);
226
/* Input callback installed by log_writer_update_fd_callbacks() for EOF
 * detection: logs an error carrying the fd of the current proto and
 * reports the writer broken with NC_CLOSE.
 *
 * @s: the LogWriter instance, passed as an untyped pointer. */
log_writer_io_check_eof(gpointer s)
228
LogWriter *self = (LogWriter *) s;
230
msg_error("EOF occurred while idle",
231
evt_tag_int("fd", log_proto_get_fd(self->proto)),
233
log_writer_broken(self, NC_CLOSE);
237
/* Timer handler armed by log_writer_suspend(): clears the suspended flag,
 * logs a notice and re-evaluates the watches so that writing is attempted
 * again.
 *
 * @s: the LogWriter instance, passed as an untyped pointer. */
log_writer_error_suspend_elapsed(gpointer s)
239
LogWriter *self = (LogWriter *) s;
241
self->suspended = FALSE;
242
msg_notice("Error suspend timeout has elapsed, attempting to write again",
243
evt_tag_int("fd", log_proto_get_fd(self->proto)),
245
log_writer_update_watches(self);
249
/* Installs or removes the I/O callbacks according to @cond; asserts that it
 * runs in the main thread.
 *
 * When pollable_state is positive the fd_watch handlers are set: the input
 * handler becomes the EOF check, the flush function or NULL; the output
 * handler becomes the flush function or NULL; the error handler is always
 * log_writer_io_error.  Otherwise the fd is treated as always writable and
 * immed_io_task is registered or unregistered instead.
 * NOTE(review): several branch lines (else / the G_IO_OUT test) are not
 * visible in this excerpt; the description follows the visible comments.
 *
 * @self: the writer whose callbacks are updated
 * @cond: the GIOCondition bits requested (G_IO_IN / G_IO_OUT are tested) */
log_writer_update_fd_callbacks(LogWriter *self, GIOCondition cond)
251
main_loop_assert_main_thread();
252
if (self->pollable_state > 0)
254
if (self->flags & LW_DETECT_EOF && (cond & G_IO_IN) == 0 && (cond & G_IO_OUT))
256
/* if output is enabled, and we're in DETECT_EOF mode, and input is
257
* not needed by the log protocol, install the eof check callback to
258
* destroy the connection if an EOF is received. */
260
iv_fd_set_handler_in(&self->fd_watch, log_writer_io_check_eof);
262
else if (cond & G_IO_IN)
264
/* in case the protocol requested G_IO_IN, it means that it needs to
265
* invoke read in the flush code, so just install the flush_output
266
* handler for input */
268
iv_fd_set_handler_in(&self->fd_watch, log_writer_io_flush_output);
272
/* otherwise we're not interested in input */
273
iv_fd_set_handler_in(&self->fd_watch, NULL);
276
iv_fd_set_handler_out(&self->fd_watch, log_writer_io_flush_output);
278
iv_fd_set_handler_out(&self->fd_watch, NULL);
280
iv_fd_set_handler_err(&self->fd_watch, log_writer_io_error);
284
/* fd is not pollable, assume it is always writable */
287
/* guard against double registration of the immediate I/O task */
if (!iv_task_registered(&self->immed_io_task))
288
iv_task_register(&self->immed_io_task);
290
else if (iv_task_registered(&self->immed_io_task))
292
iv_task_unregister(&self->immed_io_task);
298
/* (Re)arms suspend_timer: an already registered timer is unregistered
 * first, then the handler is set, the expiry is computed as iv_now plus
 * @timeout_msec milliseconds and the timer is registered.
 *
 * @self: the writer owning the timer
 * @handler: function to run when the timer expires
 * @timeout_msec: delay in milliseconds, relative to iv_now */
log_writer_arm_suspend_timer(LogWriter *self, void (*handler)(void *), gint timeout_msec)
300
if (iv_timer_registered(&self->suspend_timer))
301
iv_timer_unregister(&self->suspend_timer);
303
self->suspend_timer.handler = handler;
304
self->suspend_timer.expires = iv_now;
305
timespec_add_msec(&self->suspend_timer.expires, timeout_msec);
306
iv_timer_register(&self->suspend_timer);
310
/* Handler of the queue_filled event; asserts that it runs in the main
 * thread.  It re-evaluates the watches, but only while watches_running is
 * set; the long note below gives the reasoning for that check.
 *
 * @s: the LogWriter instance, passed as an untyped pointer. */
log_writer_queue_filled(gpointer s)
312
LogWriter *self = (LogWriter *) s;
314
main_loop_assert_main_thread();
317
* NOTE: This theory is somewhat questionable, e.g. I'm not 100% sure it
318
* is the right scenario, but the race was closed. So take this with a
321
* The queue_filled callback is running in the main thread. Because of the
322
* possible delay caused by iv_event_post() the callback might be
323
* delivered event after stop_watches() has been called.
325
* - log_writer_schedule_update_watches() is called by the reader
326
* thread, which calls iv_event_post()
327
* - the main thread calls stop_watches() in work_perform
328
* - the event is delivered in the main thread
330
* But since stop/start watches always run in the main thread and we do
331
* too, we can check if this is the case. A LogWriter without watches
332
* running is busy writing out data to the destination, e.g. a
333
* start_watches is to be expected once log_writer_work_finished() is run
334
* at the end of the deferred work, executed by the I/O threads.
336
if (self->watches_running)
337
log_writer_update_watches((LogWriter *) s);
340
/* NOTE: runs in the source thread */
342
/* Posts the queue_filled event, whose handler (log_writer_queue_filled)
 * re-evaluates the watches.  It is passed to log_queue_check_items() as
 * the push-notify callback in log_writer_update_watches().
 *
 * @self: the writer to notify */
log_writer_schedule_update_watches(LogWriter *self)
344
iv_event_post(&self->queue_filled);
348
/* Suspends writing: arms the suspend timer with
 * log_writer_error_suspend_elapsed as handler and a delay of
 * options->time_reopen * 1e3 milliseconds, then sets the suspended flag.
 *
 * @self: the writer to suspend */
log_writer_suspend(LogWriter *self)
350
/* flush code indicates that we need to suspend our writing activities
351
* until time_reopen elapses */
353
log_writer_arm_suspend_timer(self, log_writer_error_suspend_elapsed, self->options->time_reopen * 1e3);
354
self->suspended = TRUE;
358
/* Decides how the writer waits for its next output run; asserts that it
 * runs in the main thread.
 *
 * - If log_proto_prepare() returns true, a flush timeout is already
 *   pending, or log_queue_check_items() reports that enough items are
 *   available, the fd callbacks are installed with the condition returned
 *   by the proto.
 * - Otherwise, if a partial batch is queued or a timeout was returned, the
 *   fd callbacks are cleared, flush_waiting_for_timeout is set and the
 *   suspend timer is armed to call this function again after timeout_msec
 *   (or options->flush_timeout when timeout_msec is zero).
 * - In the remaining case the fd callbacks are cleared and the queue is
 *   expected to wake the writer up through
 *   log_writer_schedule_update_watches().
 * NOTE(review): the declaration of fd and the final branch keyword are not
 * visible in this excerpt.
 *
 * @self: the writer whose watches are re-evaluated */
log_writer_update_watches(LogWriter *self)
361
GIOCondition cond = 0;
362
gboolean partial_batch;
363
gint timeout_msec = 0;
365
main_loop_assert_main_thread();
367
/* NOTE: we either start the suspend_timer or enable the fd_watch. The two MUST not happen at the same time. */
369
if (log_proto_prepare(self->proto, &fd, &cond) ||
370
self->flush_waiting_for_timeout ||
371
log_queue_check_items(self->queue, self->options->flush_lines, &partial_batch, &timeout_msec,
372
(LogQueuePushNotifyFunc) log_writer_schedule_update_watches, self, NULL))
374
/* flush_lines number of element is already available and throttle would permit us to send. */
375
log_writer_update_fd_callbacks(self, cond);
377
else if (partial_batch || timeout_msec)
379
/* few elements are available, but less than flush_lines, we need to start a timer to initiate a flush */
381
log_writer_update_fd_callbacks(self, 0);
382
self->flush_waiting_for_timeout = TRUE;
383
log_writer_arm_suspend_timer(self, (void (*)(void *)) log_writer_update_watches, timeout_msec ? timeout_msec : self->options->flush_timeout);
387
/* no elements or no throttle space, wait for a wakeup by the queue
388
* when the required number of items are added. see the
389
* log_queue_check_items and its parallel_push argument above
391
log_writer_update_fd_callbacks(self, 0);
396
/* Reports whether @fd refers to a regular file: when fstat() succeeds the
 * result is S_ISREG() of the returned mode.
 * NOTE(review): the declaration of st and the value returned when fstat()
 * fails are not visible in this excerpt.
 *
 * @fd: file descriptor to examine */
is_file_regular(gint fd)
400
if (fstat(fd, &st) >= 0)
402
return S_ISREG(st.st_mode);
405
/* if stat fails, that's interesting, but we should probably poll
406
* it, hopefully that's less likely to cause spinning */
412
/* Starts the watches unless they are already running.  The fd is obtained
 * from log_proto_prepare().  While pollable_state is still negative
 * (log_writer_new() initializes it to -1) it is determined here: 0 for
 * regular files, the result of iv_fd_pollable() otherwise.  For a pollable
 * fd the fd_watch is registered; then the watches are evaluated and
 * watches_running is set.
 * NOTE(review): the declarations of fd and cond are not visible in this
 * excerpt.
 *
 * @self: the writer whose watches are started */
log_writer_start_watches(LogWriter *self)
417
if (!self->watches_running)
419
log_proto_prepare(self->proto, &fd, &cond);
421
if (self->pollable_state < 0)
423
if (is_file_regular(fd))
424
self->pollable_state = 0;
426
self->pollable_state = iv_fd_pollable(fd);
429
if (self->pollable_state)
431
self->fd_watch.fd = fd;
432
iv_fd_register(&self->fd_watch);
435
log_writer_update_watches(self);
436
self->watches_running = TRUE;
441
/* Stops the watches if they are running: unregisters the suspend timer,
 * the fd watch and the immediate I/O task (each only when registered),
 * resets the parallel push notification of the queue and clears
 * watches_running.
 *
 * @self: the writer whose watches are stopped */
log_writer_stop_watches(LogWriter *self)
443
if (self->watches_running)
445
if (iv_timer_registered(&self->suspend_timer))
446
iv_timer_unregister(&self->suspend_timer);
447
if (iv_fd_registered(&self->fd_watch))
448
iv_fd_unregister(&self->fd_watch);
449
if (iv_task_registered(&self->immed_io_task))
450
iv_task_unregister(&self->immed_io_task);
452
log_queue_reset_parallel_push(self->queue);
454
self->watches_running = FALSE;
458
/* function called using main_loop_call() in case the suppress timer needs
461
/* Applies a scheduled suppress timer update; asserts that it runs in the
 * main thread.  The timer is unregistered if registered, the new expiry is
 * copied from suppress_timer_expires and suppress_timer_updated is set
 * (both under suppress_lock), and the timer is registered again when the
 * expiry has a positive tv_sec.  Finally it drops the pipe reference that
 * log_writer_update_suppress_timer() took before scheduling this call.
 *
 * @self: the writer whose suppress timer is updated */
log_writer_perform_suppress_timer_update(LogWriter *self)
463
main_loop_assert_main_thread();
465
if (iv_timer_registered(&self->suppress_timer))
466
iv_timer_unregister(&self->suppress_timer);
467
g_static_mutex_lock(&self->suppress_lock);
468
self->suppress_timer.expires = self->suppress_timer_expires;
469
self->suppress_timer_updated = TRUE;
470
g_static_mutex_unlock(&self->suppress_lock);
471
/* tv_sec == 0 means the timer is to stay switched off */
if (self->suppress_timer.expires.tv_sec > 0)
472
iv_timer_register(&self->suppress_timer);
473
log_pipe_unref(&self->super);
477
* Update the suppress timer in a deferred manner, possibly batching the
478
* results of multiple updates to the suppress timer. This is necessary as
479
* suppress timer updates must run in the main thread, and updating it every
480
* time a new message comes in would cause enormous latency in the fast
481
* path. By collecting multiple updates
483
* sec == 0 means to turn off the suppress timer
484
* sec > 0 to enable the timer with the specified timeout (in seconds)
486
* NOTE: suppress_lock must be held.
489
/* Records a new expiry for the suppress timer and, when needed, schedules
 * log_writer_perform_suppress_timer_update() through main_loop_call().
 *
 * The new expiry is either iv_now.tv_sec + @sec with zero nanoseconds, or
 * all zeros.  The updater is invoked only when the expiry differs from the
 * stored one and the previous update has finished (suppress_timer_updated).
 * suppress_lock is released around main_loop_call() and re-acquired
 * afterwards; a pipe reference is taken for the scheduled call.
 * NOTE(review): the declaration of invoke and the conditions selecting the
 * branches are not visible in this excerpt.
 *
 * @self: the writer whose suppress timer is updated
 * @sec: timeout in seconds, added to iv_now.tv_sec */
log_writer_update_suppress_timer(LogWriter *self, glong sec)
492
struct timespec next_expires;
496
/* we deliberately use nsec == 0 in order to increase the likelihood that
497
* we target the same second, in case only a fraction of a second has
498
* passed between two updates. */
501
next_expires.tv_nsec = 0;
502
next_expires.tv_sec = iv_now.tv_sec + sec;
506
next_expires.tv_sec = 0;
507
next_expires.tv_nsec = 0;
509
/* last update was finished, we need to invoke the updater again */
510
invoke = ((next_expires.tv_sec != self->suppress_timer_expires.tv_sec) || (next_expires.tv_nsec != self->suppress_timer_expires.tv_nsec)) && self->suppress_timer_updated;
511
self->suppress_timer_updated = FALSE;
515
self->suppress_timer_expires = next_expires;
516
g_static_mutex_unlock(&self->suppress_lock);
517
log_pipe_ref(&self->super);
518
main_loop_call((void *(*)(void *)) log_writer_perform_suppress_timer_update, self, FALSE);
519
g_static_mutex_lock(&self->suppress_lock);
525
* NOTE: suppress_lock must be held.
528
/* Forgets the remembered last message: requests the suppress timer to be
 * turned off (timeout 0), drops the reference on last_msg and resets
 * last_msg and last_msg_count.
 *
 * @self: the writer whose duplicate-detection state is cleared */
log_writer_last_msg_release(LogWriter *self)
530
log_writer_update_suppress_timer(self, 0);
532
log_msg_unref(self->last_msg);
534
self->last_msg = NULL;
535
self->last_msg_count = 0;
539
* NOTE: suppress_lock must be held.
542
/* Emits the suppression summary for the remembered last message.  A new
 * empty message is created with the priority, HOST and PROGRAM of
 * last_msg, flagged LF_INTERNAL | LF_LOCAL, and with a MESSAGE text that
 * reports how many times the last message was repeated.  The summary is
 * pushed to the tail of the queue with ack_needed cleared, then the last
 * message is released.
 * NOTE(review): the declarations of m, hostname, buf, p and len are not
 * visible in this excerpt.
 *
 * @self: the writer whose suppressed-message summary is emitted */
log_writer_last_msg_flush(LogWriter *self)
545
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
551
msg_debug("Suppress timer elapsed, emitting suppression summary",
554
getlonghostname(hostname, sizeof(hostname));
555
m = log_msg_new_empty();
556
m->timestamps[LM_TS_STAMP] = m->timestamps[LM_TS_RECVD];
557
m->pri = self->last_msg->pri;
558
m->flags = LF_INTERNAL | LF_LOCAL;
560
p = log_msg_get_value(self->last_msg, LM_V_HOST, &len);
561
log_msg_set_value(m, LM_V_HOST, p, len);
562
p = log_msg_get_value(self->last_msg, LM_V_PROGRAM, &len);
563
log_msg_set_value(m, LM_V_PROGRAM, p, len);
565
len = g_snprintf(buf, sizeof(buf), "Last message '%.20s' repeated %d times, suppressed by syslog-ng on %s",
566
log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL),
567
self->last_msg_count,
569
log_msg_set_value(m, LM_V_MESSAGE, buf, len);
571
path_options.ack_needed = FALSE;
573
log_queue_push_tail(self->queue, m, &path_options);
574
log_writer_last_msg_release(self);
579
* Remember the last message for dup detection.
581
* NOTE: suppress_lock must be held.
584
/* Replaces the remembered last message: the reference on the previous
 * last_msg is dropped and last_msg_count is reset.
 * NOTE(review): the line storing @lm in self->last_msg is not visible in
 * this excerpt.
 *
 * @self: the writer recording the message
 * @lm: the message to remember for duplicate detection */
log_writer_last_msg_record(LogWriter *self, LogMessage *lm)
587
log_msg_unref(self->last_msg);
591
self->last_msg_count = 0;
595
/* Handler of suppress_timer; asserts that it runs in the main thread.  It
 * emits the suppression summary while holding suppress_lock.
 *
 * @pt: the LogWriter instance, passed as an untyped pointer. */
log_writer_last_msg_timer(gpointer pt)
597
LogWriter *self = (LogWriter *) pt;
599
main_loop_assert_main_thread();
601
g_static_mutex_lock(&self->suppress_lock);
602
log_writer_last_msg_flush(self);
603
g_static_mutex_unlock(&self->suppress_lock);
609
* log_writer_last_msg_check:
611
* This function is called to suppress duplicate messages from a given host.
613
* Returns TRUE to indicate that the message was consumed.
616
/* Duplicate detection, performed under suppress_lock.  @lm counts as a
 * repetition of last_msg when last_msg was received no more than
 * options->suppress seconds before @lm, when MESSAGE, HOST, PROGRAM and PID
 * are equal, and when the message is not "-- MARK --".
 *
 * For a repetition the suppressed counter and last_msg_count are
 * incremented, the suppress timer is started on the first repetition, and
 * @lm is dropped.  Otherwise a pending summary is flushed (or the last
 * message released) and @lm is recorded as the new last message.
 * NOTE(review): the guard on self->last_msg being set, the else lines and
 * the return statements are not visible in this excerpt.
 *
 * @self: the writer performing the check
 * @lm: the incoming message
 * @path_options: path options passed on to log_msg_drop() */
log_writer_last_msg_check(LogWriter *self, LogMessage *lm, const LogPathOptions *path_options)
618
g_static_mutex_lock(&self->suppress_lock);
621
if (self->last_msg->timestamps[LM_TS_RECVD].tv_sec >= lm->timestamps[LM_TS_RECVD].tv_sec - self->options->suppress &&
622
strcmp(log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL), log_msg_get_value(lm, LM_V_MESSAGE, NULL)) == 0 &&
623
strcmp(log_msg_get_value(self->last_msg, LM_V_HOST, NULL), log_msg_get_value(lm, LM_V_HOST, NULL)) == 0 &&
624
strcmp(log_msg_get_value(self->last_msg, LM_V_PROGRAM, NULL), log_msg_get_value(lm, LM_V_PROGRAM, NULL)) == 0 &&
625
strcmp(log_msg_get_value(self->last_msg, LM_V_PID, NULL), log_msg_get_value(lm, LM_V_PID, NULL)) == 0 &&
626
strcmp(log_msg_get_value(lm, LM_V_MESSAGE, NULL), "-- MARK --") != 0)
628
stats_counter_inc(self->suppressed_messages);
629
self->last_msg_count++;
631
if (self->last_msg_count == 1)
633
/* we only create the timer if this is the first suppressed message, otherwise it is already running. */
635
log_writer_update_suppress_timer(self, self->options->suppress);
637
g_static_mutex_unlock(&self->suppress_lock);
639
msg_debug("Suppressing duplicate message",
640
evt_tag_str("host", log_msg_get_value(lm, LM_V_HOST, NULL)),
641
evt_tag_str("msg", log_msg_get_value(lm, LM_V_MESSAGE, NULL)),
643
log_msg_drop(lm, path_options);
647
if (self->last_msg_count)
648
log_writer_last_msg_flush(self);
650
log_writer_last_msg_release(self);
653
log_writer_last_msg_record(self, lm);
654
g_static_mutex_unlock(&self->suppress_lock);
658
/* NOTE: runs in the reader thread */
660
/* Queue method of the pipe (installed as super.queue in log_writer_new()).
 *
 * When flow control was not requested and the writer is either suspended
 * or lacks LW_SOFT_FLOW_CONTROL, the ack is broken off the message first.
 * When suppression is enabled the message is passed through duplicate
 * detection.  Otherwise the processed counter is incremented and the
 * message is pushed to the tail of the queue.
 * NOTE(review): the statement executed when log_writer_last_msg_check()
 * returns TRUE is not visible in this excerpt -- presumably a return.
 *
 * @s: the LogWriter instance as a LogPipe
 * @lm: the message to queue
 * @path_options: path options of the message
 * @user_data: not referenced in the visible lines */
log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options, gpointer user_data)
662
LogWriter *self = (LogWriter *) s;
663
LogPathOptions local_options;
665
if (!path_options->flow_control_requested &&
666
(self->suspended || !(self->flags & LW_SOFT_FLOW_CONTROL)))
668
/* NOTE: this code ACKs the message back if there's a write error in
669
* order not to hang the client in case of a disk full */
671
path_options = log_msg_break_ack(lm, path_options, &local_options);
673
if (self->options->suppress > 0 && log_writer_last_msg_check(self, lm, path_options))
676
stats_counter_inc(self->processed_messages);
677
log_queue_push_tail(self->queue, lm, path_options);
681
/* Appends the value of @handle from @lm to @result.  With @use_nil set, an
 * empty value is written as '-'.  Otherwise the value is searched for a
 * space and appended either in full or only up to that space.  A trailing
 * space is appended as well.
 * NOTE(review): the declarations and the conditions selecting between the
 * two append calls and guarding the trailing space (presumably
 * @append_space) are not visible in this excerpt.
 *
 * @result: string being built
 * @lm: message supplying the value
 * @handle: name-value handle to look up
 * @use_nil: write '-' for an empty value
 * @append_space: not referenced in the visible lines */
log_writer_append_value(GString *result, LogMessage *lm, NVHandle handle, gboolean use_nil, gboolean append_space)
686
value = log_msg_get_value(lm, handle, &value_len);
687
if (use_nil && value_len == 0)
688
g_string_append_c(result, '-');
693
space = strchr(value, ' ');
696
g_string_append_len(result, value, value_len);
698
g_string_append_len(result, value, space - value);
701
g_string_append_c(result, ' ');
705
/* Pads @result to options->padding bytes.  Nothing is tested further when
 * padding is zero.  A message longer than the padding triggers a warning
 * and is cut to the padding size.  Otherwise the string is grown to the
 * padding size and the added area is filled with NUL bytes.
 * NOTE(review): the return statements after the first two checks are not
 * visible in this excerpt.
 *
 * @self: writer supplying the padding option
 * @result: formatted message, modified in place */
log_writer_do_padding(LogWriter *self, GString *result)
707
if (!self->options->padding)
710
if(G_UNLIKELY(self->options->padding < result->len))
712
msg_warning("Padding is too small to hold the full message",
713
evt_tag_int("padding", self->options->padding),
714
evt_tag_int("msg_size", result->len),
716
g_string_set_size(result, self->options->padding);
719
/* store the original length of the result */
720
gint len = result->len;
721
gint padd_bytes = self->options->padding - result->len;
722
/* set the size to the padded size, this will allocate the string */
723
g_string_set_size(result, self->options->padding);
/* NOTE(review): the fill starts at len - 1, so it overwrites the last
 * byte of the original text and stops one byte short of the padded end
 * (index padding - 1 is not written by this memset) -- confirm whether
 * this offset is intended. */
724
memset(result->str + len - 1, '\0', padd_bytes);
728
/* Formats @lm into @result according to the writer flags and options.
 *
 * The sequence number is taken from self->seq_num for LF_LOCAL messages,
 * otherwise parsed from the ".SDATA.meta.sequenceId" value of the message.
 *
 * Three output shapes are visible:
 * - syslog-protocol (LW_SYSLOG_PROTOCOL flag or LWO_SYSLOG_PROTOCOL
 *   option): "<pri>1 ", ISO timestamp, HOST, PROGRAM, PID, MSGID,
 *   structured data (or '-'), then either the template output or the
 *   MESSAGE value, a newline and padding;
 * - template based: options->template, else file_template for
 *   LW_FORMAT_FILE, else proto_template for LW_FORMAT_PROTO;
 * - built-in default: timestamp (file) or "<pri>" plus BSD timestamp
 *   (proto), HOST, legacy header or PROGRAM[PID], ": ", MESSAGE, newline
 *   and padding.
 *
 * With LWO_NO_MULTI_LINE the result is scanned for CR/LF characters,
 * leaving the trailing newline out of the scanned range.
 * NOTE(review): many declarations, braces and branch lines of this function
 * are not visible in this excerpt; the summary follows the visible calls.
 *
 * @self: writer supplying flags, options and the sequence number
 * @lm: message to format
 * @result: output string, truncated before formatting */
log_writer_format_log(LogWriter *self, LogMessage *lm, GString *result)
730
LogTemplate *template = NULL;
733
static NVHandle meta_seqid = 0;
736
meta_seqid = log_msg_get_value_handle(".SDATA.meta.sequenceId");
738
if (lm->flags & LF_LOCAL)
740
seq_num = self->seq_num;
747
seqid = log_msg_get_value(lm, meta_seqid, &seqid_length);
748
APPEND_ZERO(seqid, seqid, seqid_length);
750
seq_num = strtol(seqid, NULL, 10);
755
/* no template was specified, use default */
756
stamp = &lm->timestamps[LM_TS_STAMP];
758
g_string_truncate(result, 0);
760
if ((self->flags & LW_SYSLOG_PROTOCOL) || (self->options->options & LWO_SYSLOG_PROTOCOL))
764
/* we currently hard-wire version 1 */
765
g_string_append_c(result, '<');
766
format_uint32_padded(result, 0, 0, 10, lm->pri);
767
g_string_append_c(result, '>');
768
g_string_append_c(result, '1');
769
g_string_append_c(result, ' ');
771
log_stamp_append_format(stamp, result, TS_FMT_ISO,
772
time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
773
self->options->template_options.frac_digits);
774
g_string_append_c(result, ' ');
776
log_writer_append_value(result, lm, LM_V_HOST, TRUE, TRUE);
777
log_writer_append_value(result, lm, LM_V_PROGRAM, TRUE, TRUE);
778
log_writer_append_value(result, lm, LM_V_PID, TRUE, TRUE);
779
log_writer_append_value(result, lm, LM_V_MSGID, TRUE, TRUE);
782
if (lm->flags & LF_LOCAL)
784
gchar sequence_id[16];
786
g_snprintf(sequence_id, sizeof(sequence_id), "%d", seq_num);
787
log_msg_update_sdata(lm, "meta", "sequenceId", sequence_id);
791
log_msg_append_format_sdata(lm, result, seq_num);
792
if (len == result->len)
794
/* NOTE: sd_param format did not generate any output, take it as an empty SD string */
795
g_string_append_c(result, '-');
798
if (self->options->template)
800
g_string_append_c(result, ' ');
801
if (lm->flags & LF_UTF8)
802
g_string_append_len(result, "\xEF\xBB\xBF", 3);
803
log_template_append_format(self->options->template, lm,
804
&self->options->template_options,
814
p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
815
g_string_append_c(result, ' ');
818
if (lm->flags & LF_UTF8)
819
g_string_append_len(result, "\xEF\xBB\xBF", 3);
821
g_string_append_len(result, p, len);
824
g_string_append_c(result, '\n');
825
log_writer_do_padding(self, result);
830
if (self->options->template)
832
template = self->options->template;
834
else if (self->flags & LW_FORMAT_FILE)
836
template = self->options->file_template;
838
else if ((self->flags & LW_FORMAT_PROTO))
840
template = self->options->proto_template;
845
log_template_format(template, lm,
846
&self->options->template_options,
857
if (self->flags & LW_FORMAT_FILE)
859
log_stamp_format(stamp, result, self->options->template_options.ts_format,
860
time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
861
self->options->template_options.frac_digits);
863
else if (self->flags & LW_FORMAT_PROTO)
865
g_string_append_c(result, '<');
866
format_uint32_padded(result, 0, 0, 10, lm->pri);
867
g_string_append_c(result, '>');
869
/* always use BSD timestamp by default, the user can override this using a custom template */
870
log_stamp_append_format(stamp, result, TS_FMT_BSD,
871
time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
872
self->options->template_options.frac_digits);
874
g_string_append_c(result, ' ');
876
p = log_msg_get_value(lm, LM_V_HOST, &len);
877
g_string_append_len(result, p, len);
878
g_string_append_c(result, ' ');
880
if ((lm->flags & LF_LEGACY_MSGHDR))
882
p = log_msg_get_value(lm, LM_V_LEGACY_MSGHDR, &len);
883
g_string_append_len(result, p, len);
887
p = log_msg_get_value(lm, LM_V_PROGRAM, &len);
890
g_string_append_len(result, p, len);
891
p = log_msg_get_value(lm, LM_V_PID, &len);
894
g_string_append_c(result, '[');
895
g_string_append_len(result, p, len);
896
g_string_append_c(result, ']');
898
g_string_append_len(result, ": ", 2);
901
p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
902
g_string_append_len(result, p, len);
903
g_string_append_c(result, '\n');
904
log_writer_do_padding(self, result);
907
if (self->options->options & LWO_NO_MULTI_LINE)
912
/* NOTE: the size is calculated to leave trailing new line */
913
while ((p = find_cr_or_lf(p, result->str + result->len - p - 1)))
923
/* Marks the writer as broken: stops the watches and notifies the control
 * pipe with @notify_code, passing the writer as user data.
 *
 * @self: the writer that failed
 * @notify_code: code forwarded to log_pipe_notify() (callers in this file
 *               use NC_WRITE_ERROR and NC_CLOSE) */
log_writer_broken(LogWriter *self, gint notify_code)
925
log_writer_stop_watches(self);
926
log_pipe_notify(self->control, &self->super, notify_code, self);
930
* Write messages to the underlying file descriptor using the installed
931
* LogProto instance. This is called whenever the output is ready to accept
932
* further messages, and once during config deinitialization, in order to
933
* flush messages still in the queue, in the hope that most of them can be
936
* In threaded mode, this function is invoked as part of the "output" task
937
* (in essence, this is the function that performs the output task).
939
* @flush_mode specifies how hard LogWriter is trying to send messages to
940
* the actual destination:
943
* LW_FLUSH_NORMAL - business as usual, flush when the buffer is full
944
* LW_FLUSH_BUFFER - flush the buffer immediately please
945
* LW_FLUSH_QUEUE - pull off any queued items, at maximum speed, even
946
* ignoring throttle, and flush the buffer too
950
/* Sends queued messages through the current LogProto.
 *
 * The loop runs while the worker job was not asked to quit, or always when
 * @flush_mode is at least LW_FLUSH_QUEUE (in which case throttling is also
 * ignored when popping).  Each popped message is formatted into
 * line_buffer and, when non-empty, posted to the proto.  On LPS_ERROR the
 * error is acted upon unless LWO_IGNORE_ERRORS is set.  Messages are acked
 * (stepping the sequence number for LF_LOCAL ones) or pushed back to the
 * head of the queue.  After the loop the proto is flushed when @flush_mode
 * is at least LW_FLUSH_BUFFER or count is zero.
 * NOTE(review): the declarations of lm and count, the return statements
 * and the conditions around the line_buffer reallocation and the ack /
 * push-back choice are not visible in this excerpt.
 *
 * @self: the writer to flush
 * @flush_mode: LW_FLUSH_NORMAL, LW_FLUSH_BUFFER or LW_FLUSH_QUEUE */
log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode)
952
LogProto *proto = self->proto;
954
gboolean ignore_throttle = (flush_mode >= LW_FLUSH_QUEUE);
959
/* NOTE: in case we're reloading or exiting we flush all queued items as
960
* long as the destination can consume it. This is not going to be an
961
* infinite loop, since the reader will cease to produce new messages when
962
* main_loop_io_worker_job_quit() is set. */
964
while (!main_loop_io_worker_job_quit() || flush_mode >= LW_FLUSH_QUEUE)
967
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
968
gboolean consumed = FALSE;
970
if (!log_queue_pop_head(self->queue, &lm, &path_options, FALSE, ignore_throttle))
972
/* no more items are available */
976
log_msg_refcache_start_consumer(lm, &path_options);
979
log_writer_format_log(self, lm, self->line_buffer);
981
if (self->line_buffer->len)
983
LogProtoStatus status;
985
status = log_proto_post(proto, (guchar *) self->line_buffer->str, self->line_buffer->len, &consumed);
986
if (status == LPS_ERROR)
988
if ((self->options->options & LWO_IGNORE_ERRORS) == 0)
990
msg_set_context(NULL);
991
log_msg_refcache_stop();
997
g_free(self->line_buffer->str);
1003
/* replace the buffer storage with a fresh, empty allocation of the same capacity */
self->line_buffer->str = g_malloc(self->line_buffer->allocated_len);
1004
self->line_buffer->str[0] = 0;
1005
self->line_buffer->len = 0;
1010
if (lm->flags & LF_LOCAL)
1011
step_sequence_number(&self->seq_num);
1012
log_msg_ack(lm, &path_options);
1017
/* push back to the queue */
1018
log_queue_push_head(self->queue, lm, &path_options);
1020
msg_set_context(NULL);
1021
log_msg_refcache_stop();
1025
msg_set_context(NULL);
1026
log_msg_refcache_stop();
1030
if (flush_mode >= LW_FLUSH_BUFFER || count == 0)
1032
if (log_proto_flush(proto) == LPS_ERROR)
1040
/* Initializes every event source of the writer and points each cookie (or
 * user_data) back at the writer: the fd watch, the immediate I/O task, the
 * suspend and suppress timers, the queue_filled event and the I/O worker
 * job with its work and completion functions.  Nothing is registered here.
 *
 * @self: the writer being set up */
log_writer_init_watches(LogWriter *self)
1042
IV_FD_INIT(&self->fd_watch);
1043
self->fd_watch.cookie = self;
1045
IV_TASK_INIT(&self->immed_io_task);
1046
self->immed_io_task.cookie = self;
1047
self->immed_io_task.handler = log_writer_io_flush_output;
1049
/* the suspend timer handler is assigned later, in log_writer_arm_suspend_timer() */
IV_TIMER_INIT(&self->suspend_timer);
1050
self->suspend_timer.cookie = self;
1052
IV_TIMER_INIT(&self->suppress_timer);
1053
self->suppress_timer.cookie = self;
1054
self->suppress_timer.handler = (void (*)(void *)) log_writer_last_msg_timer;
1056
IV_EVENT_INIT(&self->queue_filled);
1057
self->queue_filled.cookie = self;
1058
self->queue_filled.handler = log_writer_queue_filled;
1060
main_loop_io_worker_job_init(&self->io_job);
1061
self->io_job.user_data = self;
1062
self->io_job.work = (void (*)(void *)) log_writer_work_perform;
1063
self->io_job.completion = (void (*)(void *)) log_writer_work_finished;
1067
/* Init method of the pipe.  Requires a queue to be set, registers the
 * queue_filled event, registers the statistics counters (unless
 * LWO_NO_STATS is set or the dropped counter already exists; the
 * suppressed counter only when suppression is enabled), marks the suppress
 * timer as updated, hands the stored/dropped counters to the queue and
 * re-opens the writer with its current proto.
 * NOTE(review): the declaration of proto, the condition guarding the
 * reopen and the return value are not visible in this excerpt.
 *
 * @s: the LogWriter instance as a LogPipe */
log_writer_init(LogPipe *s)
1069
LogWriter *self = (LogWriter *) s;
1071
g_assert(self->queue != NULL);
1072
iv_event_register(&self->queue_filled);
1074
if ((self->options->options & LWO_NO_STATS) == 0 && !self->dropped_messages)
1077
stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_DROPPED, &self->dropped_messages);
1078
if (self->options->suppress > 0)
1079
stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
1080
stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->processed_messages);
1082
stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_STORED, &self->stored_messages);
1085
self->suppress_timer_updated = TRUE;
1086
log_queue_set_counters(self->queue, self->stored_messages, self->dropped_messages);
1091
proto = self->proto;
1093
log_writer_reopen(&self->super, proto);
1099
/* Deinit method of the pipe; asserts that it runs in the main thread.  It
 * resets the parallel push notification of the queue, flushes everything
 * still queued with LW_FLUSH_QUEUE, stops the watches, unregisters the
 * queue_filled event and the suppress timer, detaches the counters from
 * the queue and unregisters the four statistics counters.
 * NOTE(review): the return value is not visible in this excerpt.
 *
 * @s: the LogWriter instance as a LogPipe */
log_writer_deinit(LogPipe *s)
1101
LogWriter *self = (LogWriter *) s;
1103
main_loop_assert_main_thread();
1105
log_queue_reset_parallel_push(self->queue);
1106
log_writer_flush(self, LW_FLUSH_QUEUE);
1107
/* FIXME: by the time we arrive here, it must be guaranteed that no
1108
* _queue() call is running in a different thread, otherwise we'd need
1109
* some kind of locking. */
1111
log_writer_stop_watches(self);
1112
iv_event_unregister(&self->queue_filled);
1114
if (iv_timer_registered(&self->suppress_timer))
1115
iv_timer_unregister(&self->suppress_timer);
1117
log_queue_set_counters(self->queue, NULL, NULL);
1120
stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_DROPPED, &self->dropped_messages);
1121
stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
1122
stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->processed_messages);
1123
stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_STORED, &self->stored_messages);
1130
/* Free method of the pipe: releases the proto, the line buffer (when
 * allocated), the queue reference, the remembered last message, the stats
 * id/instance strings, both mutexes and the condition variable, then calls
 * the base free method.
 *
 * @s: the LogWriter instance as a LogPipe */
log_writer_free(LogPipe *s)
1132
LogWriter *self = (LogWriter *) s;
1135
log_proto_free(self->proto);
1137
if (self->line_buffer)
1138
g_string_free(self->line_buffer, TRUE);
1140
log_queue_unref(self->queue);
1142
log_msg_unref(self->last_msg);
1143
g_free(self->stats_id);
1144
g_free(self->stats_instance);
1145
g_static_mutex_free(&self->suppress_lock);
1146
g_static_mutex_free(&self->pending_proto_lock);
1147
g_cond_free(self->pending_proto_cond);
1149
log_pipe_free_method(s);
1152
/* FIXME: this is inherently racy */
1154
/* Returns true when the queue holds at least one item or the watches are
 * not running (log_writer_stop_watches() clears that flag before an output
 * run starts).
 *
 * @self: the writer to query */
log_writer_has_pending_writes(LogWriter *self)
1156
return log_queue_get_length(self->queue) > 0 || !self->watches_running;
1160
/* Returns true when the writer currently has a LogProto instance.
 *
 * @self: the writer to query */
log_writer_opened(LogWriter *self)
1162
return self->proto != NULL;
1165
/* run in the main thread in reaction to a log_writer_reopen to change
1166
* the destination LogProto instance. It needs to be run in the main
1167
* thread as it reregisters the watches associated with the main
1170
/* Switches the LogProto of a writer; scheduled by log_writer_reopen()
 * through main_loop_call().  The sequence number is re-initialized.  While
 * the I/O job is working the new proto is only stored as pending (to be
 * installed by log_writer_work_finished()); otherwise the watches are
 * stopped, the old proto is freed, the new one is installed and the
 * watches are started again.
 * NOTE(review): the return after the pending branch and any condition
 * around the final log_writer_start_watches() call are not visible in this
 * excerpt.
 *
 * @s: array of two pointers: the LogWriter and the new LogProto (the proto
 *     may be NULL) */
log_writer_reopen_deferred(gpointer s)
1172
gpointer *args = (gpointer *) s;
1173
LogWriter *self = args[0];
1174
LogProto *proto = args[1];
1176
init_sequence_number(&self->seq_num);
1178
if (self->io_job.working)
1180
/* NOTE: proto can be NULL */
1181
self->pending_proto = proto;
1182
self->pending_proto_present = TRUE;
1186
log_writer_stop_watches(self);
1189
log_proto_free(self->proto);
1191
self->proto = proto;
1194
log_writer_start_watches(self);
1198
* This function can be called from any threads, from the main thread
1199
* as well as I/O worker threads. It takes care of going to the
1200
* main thread to actually switch LogProto under this writer.
1202
* The writer may still be operating, (e.g. log_pipe_deinit/init is
1205
* In case we're running in a non-main thread, then by the time this
1206
* function returns, the reopen has finished. In case it is called
1207
* from the main thread, this function may defer updating self->proto
1208
* until the worker thread has finished. The reason for this
1211
* - if LogWriter is busy, then updating the LogProto instance is
1212
* deferred to log_writer_work_finished(), but that runs in the
1215
* - normally, even this deferred update is waited for, but in case
1216
* we're in the main thread, we can't block.
1218
* This situation could probably be improved, maybe the synchronous
1219
* return of log_writer_reopen() is not needed by call sites, but I
1220
* was not sure, and right before release I didn't want to take the
1224
/* Replaces the LogProto of the writer.  The switch itself is performed by
 * log_writer_reopen_deferred(), invoked through main_loop_call() with the
 * wait flag set.  When called outside the main thread, the function then
 * waits on pending_proto_cond until no pending proto remains, i.e. until a
 * deferred switch has been completed by log_writer_work_finished().
 *
 * @s: the LogWriter instance as a LogPipe
 * @proto: the new LogProto; log_writer_reopen_deferred() notes it may be
 *         NULL */
log_writer_reopen(LogPipe *s, LogProto *proto)
1226
LogWriter *self = (LogWriter *) s;
1227
gpointer args[] = { s, proto };
1229
main_loop_call((MainLoopTaskFunc) log_writer_reopen_deferred, args, TRUE);
1231
if (!main_loop_is_main_thread())
1233
g_static_mutex_lock(&self->pending_proto_lock);
1234
/* loop to re-check the predicate after every wakeup */
while (self->pending_proto_present)
1236
g_cond_wait(self->pending_proto_cond, g_static_mutex_get_mutex(&self->pending_proto_lock));
1238
g_static_mutex_unlock(&self->pending_proto_lock);
1243
/* Stores the control pipe, the options and the statistics parameters in
 * the writer.  @stats_id and @stats_instance are duplicated when non-NULL
 * (they are released in log_writer_free()); the previous values of those
 * fields are overwritten without being freed here.
 *
 * @self: the writer being configured
 * @control: pipe notified by log_writer_broken()
 * @options: writer options, stored by pointer
 * @stats_level, @stats_source: statistics registration parameters
 * @stats_id, @stats_instance: statistics identifiers, may be NULL */
log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options, gint stats_level, gint stats_source, const gchar *stats_id, const gchar *stats_instance)
1245
self->control = control;
1246
self->options = options;
1248
self->stats_level = stats_level;
1249
self->stats_source = stats_source;
1250
self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
1251
self->stats_instance = stats_instance ? g_strdup(stats_instance) : NULL;
1256
/* Allocates a zero-filled LogWriter, initializes the base pipe, installs
 * the init/deinit/queue/free methods, stores @flags, creates a line buffer
 * with an initial size of 128, sets pollable_state to -1 (not yet
 * determined), initializes the sequence number, the watches, both mutexes
 * and the condition variable.
 *
 * @flags: writer flags (LW_* bits are tested elsewhere in this file)
 *
 * Returns the new writer as a pointer to its embedded LogPipe. */
log_writer_new(guint32 flags)
1258
LogWriter *self = g_new0(LogWriter, 1);
1260
log_pipe_init_instance(&self->super);
1261
self->super.init = log_writer_init;
1262
self->super.deinit = log_writer_deinit;
1263
self->super.queue = log_writer_queue;
1264
self->super.free_fn = log_writer_free;
1265
self->flags = flags;
1266
self->line_buffer = g_string_sized_new(128);
1267
self->pollable_state = -1;
1268
init_sequence_number(&self->seq_num);
1270
log_writer_init_watches(self);
1271
g_static_mutex_init(&self->suppress_lock);
1272
g_static_mutex_init(&self->pending_proto_lock);
1273
self->pending_proto_cond = g_cond_new();
1275
return &self->super;
1278
/* returns a reference */
1280
/* Returns the queue of the writer with an additional reference taken via
 * log_queue_ref().
 *
 * @s: the LogWriter instance as a LogPipe */
log_writer_get_queue(LogPipe *s)
1282
LogWriter *self = (LogWriter *) s;
1284
return log_queue_ref(self->queue);
1287
/* consumes the reference */
1289
/* Replaces the queue of the writer: the reference on the current queue is
 * dropped and @queue is stored without taking a new reference.
 * NOTE(review): one line between the cast and the unref is not visible in
 * this excerpt -- possibly a guard on self->queue.
 *
 * @s: the LogWriter instance as a LogPipe
 * @queue: the new queue; its reference is taken over by the writer */
log_writer_set_queue(LogPipe *s, LogQueue *queue)
1291
LogWriter *self = (LogWriter *)s;
1294
log_queue_unref(self->queue);
1295
self->queue = queue;
1299
/* Resets @options to defaults.  flush_lines, flush_timeout, time_reopen
 * and suppress are set to -1, which log_writer_options_init() replaces
 * with the global configuration value; template is NULL and padding is 0.
 *
 * @options: the options structure to reset */
log_writer_options_defaults(LogWriterOptions *options)
1301
options->template = NULL;
1302
options->flush_lines = -1;
1303
options->flush_timeout = -1;
1304
log_template_options_defaults(&options->template_options);
1305
options->time_reopen = -1;
1306
options->suppress = -1;
1307
options->padding = 0;
1311
/* Sets macro escaping on the template of @options, which is only done for
 * a template that exists and is defined inline; an error is logged for the
 * other case.
 * NOTE(review): the else line between the two statements is not visible in
 * this excerpt.
 *
 * @options: writer options holding the template
 * @enable: whether escaping is turned on */
log_writer_options_set_template_escape(LogWriterOptions *options, gboolean enable)
1313
if (options->template && options->template->def_inline)
1315
log_template_set_escape(options->template, enable);
1319
msg_error("Macro escaping can only be specified for inline templates", NULL);
1325
* NOTE: options_init and options_destroy are a bit weird, because their
1326
* invocation is not completely symmetric:
1328
* - init is called from driver init (e.g. affile_dd_init),
1329
* - destroy is called from driver free method (e.g. affile_sd_free, NOT affile_dd_deinit)
1332
* - when initializing the reloaded configuration fails for some reason,
1333
* we have to fall back to the old configuration, thus we cannot dump
1334
* the information stored in the Options structure.
1336
* For the reasons above, init and destroy behave the following way:
1338
* - init is idempotent, it can be called multiple times without leaking
1339
* memory, and without loss of information
1340
* - destroy is only called once, when the options are indeed to be destroyed
1342
* As init allocates memory, it has to take care about freeing memory
1343
* allocated by the previous init call (or it has to reuse those).
1347
/* Initializes @options against the configuration @cfg; written so that it
 * can be called repeatedly.  The template reference and the time zone
 * settings are saved, the options are destroyed, and the saved values are
 * put back before the template options are initialized again.  Settings
 * still at -1 (flush_lines, flush_timeout, suppress, time_reopen) take the
 * value from @cfg, and references to the file and proto templates of @cfg
 * are stored.
 * NOTE(review): the declaration of i and the condition guarding the final
 * LWO_THREADED assignment are not visible in this excerpt.
 *
 * @options: the options to initialize
 * @cfg: global configuration supplying defaults
 * @option_flags: flag bits OR-ed into options->options */
log_writer_options_init(LogWriterOptions *options, GlobalConfig *cfg, guint32 option_flags)
1349
LogTemplate *template;
/* NOTE(review): both arrays have 2 elements while the loops below run up
 * to LTZ_MAX -- assumes LTZ_MAX <= 2; confirm against its definition. */
1350
gchar *time_zone[2];
1351
TimeZoneInfo *time_zone_info[2];
1354
template = log_template_ref(options->template);
1356
for (i = 0; i < LTZ_MAX; i++)
1358
time_zone[i] = options->template_options.time_zone[i];
1359
time_zone_info[i] = options->template_options.time_zone_info[i];
1360
options->template_options.time_zone[i] = NULL;
1361
options->template_options.time_zone_info[i] = NULL;
1364
/* NOTE(review): log_writer_options_destroy() already destroys the template
 * options, so the next call destroys them a second time -- confirm that
 * this is harmless. */
log_writer_options_destroy(options);
1365
log_template_options_destroy(&options->template_options);
1367
/* restore the config */
1368
options->template = template;
1369
for (i = 0; i < LTZ_MAX; i++)
1371
options->template_options.time_zone[i] = time_zone[i];
1372
options->template_options.time_zone_info[i] = time_zone_info[i];
1374
log_template_options_init(&options->template_options, cfg);
1375
options->options |= option_flags;
1377
if (options->flush_lines == -1)
1378
options->flush_lines = cfg->flush_lines;
1379
if (options->flush_timeout == -1)
1380
options->flush_timeout = cfg->flush_timeout;
1381
if (options->suppress == -1)
1382
options->suppress = cfg->suppress;
1383
if (options->time_reopen == -1)
1384
options->time_reopen = cfg->time_reopen;
1385
options->file_template = log_template_ref(cfg->file_template);
1386
options->proto_template = log_template_ref(cfg->proto_template);
1388
options->options |= LWO_THREADED;
1392
/*
 * Releases what @options holds: the embedded template options are
 * destroyed with log_template_options_destroy(), then the template, the
 * file template and the proto template are each passed to
 * log_template_unref(), in that order.  The pointers stored in @options
 * are left untouched.
 */
void
log_writer_options_destroy(LogWriterOptions *options)
{
  LogTemplate *held[] =
    {
      options->template,
      options->file_template,
      options->proto_template,
    };
  unsigned int k;

  log_template_options_destroy(&options->template_options);

  for (k = 0; k < sizeof(held) / sizeof(held[0]); k++)
    log_template_unref(held[k]);
}
1401
log_writer_options_lookup_flag(const gchar *flag)
1403
if (strcmp(flag, "syslog_protocol") == 0 || strcmp(flag, "syslog-protocol") == 0)
1404
return LWO_SYSLOG_PROTOCOL;
1405
if (strcmp(flag, "no-multi-line") == 0 || strcmp(flag, "no_multi_line") == 0)
1406
return LWO_NO_MULTI_LINE;
1407
if (strcmp(flag, "threaded") == 0)
1408
return LWO_THREADED;
1409
if (strcmp(flag, "ignore-errors") == 0 || strcmp(flag, "ignore_errors") == 0)
1410
return LWO_IGNORE_ERRORS;
1411
msg_error("Unknown dest writer flag", evt_tag_str("flag", flag), NULL);