~ubuntu-branches/ubuntu/trusty/syslog-ng/trusty-proposed

« back to all changes in this revision

Viewing changes to lib/logwriter.c

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS), Gergely Nagy
  • Date: 2011-10-11 14:30:48 UTC
  • mfrom: (1.3.7)
  • Revision ID: package-import@ubuntu.com-20111011143048-r1iljux9xbvj3lwh
Tags: 3.3.1.dfsg-1
* New upstream release with important fixes from upstream git tree with
  non-free manpages removed.
* Drop syslog-ng.conf(5) (closes: #496521).
* syslog-ng(8) is generated, and does not mention -Q anymore
  (closes: #616069).
* Supports CAP_SYSLOG on recent kernels (closes: #630172).
* Does not use g_timeout_add_seconds anymore (closes: #609154).

[ Gergely Nagy <algernon@madhouse-project.org> ]
* Update debian/copyright to DEP-5 format.
* Simplified the logrotate file by merging identical entries.
* Include local configuration files from /etc/syslog-ng/conf.d/ (Closes:
  #609050).
* Update syslog-ng.conf to be fully 3.3 compliant.
* Compress both source and binaries (except the syslog-ng meta
  package) with xz, instead of gzip.
* Use dpkg triggers to restart syslog-ng when appropriate.
* Include DFSG-free manual pages for all binaries.
* Build with Hardening enabled.
* Mention syslog(3) in /etc/default/syslog-ng, instead of
  <linux/kernel.h> (Closes: #608605)
* Support 'status' in the init script.
  Patch from Peter Eisentraut <petere@debian.org> (Closes: #644458)
* Build-Depend on libevtlog-dev (>= 0.2.12-5~) for correct shlibs.
* Use [linux-any] in Build-Depends instead of hardcoded links.
  (Closes: #634715)
* Use $SYSLOGNG_OPTS in the init script when reloading syslog-ng.
  (Closes: #589081)

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
#include "messages.h"
27
27
#include "stats.h"
28
28
#include "misc.h"
 
29
#include "mainloop.h"
 
30
#include "str-format.h"
29
31
 
30
32
#include <unistd.h>
31
33
#include <assert.h>
32
34
#include <string.h>
33
35
#include <stdlib.h>
34
 
 
35
 
typedef struct _LogWriterWatch
36
 
{
37
 
  GSource super;
38
 
  GPollFD pollfd;
39
 
  LogWriter *writer;
40
 
  LogProto *proto;
41
 
  GTimeVal flush_target;
42
 
  GTimeVal error_suspend_target;
43
 
  GTimeVal last_throttle_check;
44
 
  gboolean flush_waiting_for_timeout:1,
45
 
           input_means_connection_broken:1,
46
 
           error_suspend:1;
47
 
} LogWriterWatch;
 
36
#include <sys/stat.h>
 
37
#include <unistd.h>
 
38
#include <iv.h>
 
39
#include <iv_event.h>
 
40
#include <iv_work.h>
 
41
 
 
42
typedef enum
 
43
{
 
44
  /* flush modes */
 
45
 
 
46
  /* business as usual, flush when the buffer is full */
 
47
  LW_FLUSH_NORMAL,
 
48
  /* flush the buffer immediately please */
 
49
  LW_FLUSH_BUFFER,
 
50
  /* pull off any queued items, at maximum speed, even ignoring throttle, and flush the buffer too */
 
51
  LW_FLUSH_QUEUE,
 
52
} LogWriterFlushMode;
 
53
 
 
54
struct _LogWriter
 
55
{
 
56
  LogPipe super;
 
57
  LogQueue *queue;
 
58
  guint32 flags:31;
 
59
  gint32 seq_num;
 
60
  StatsCounterItem *dropped_messages;
 
61
  StatsCounterItem *suppressed_messages;
 
62
  StatsCounterItem *processed_messages;
 
63
  StatsCounterItem *stored_messages;
 
64
  LogPipe *control;
 
65
  LogWriterOptions *options;
 
66
  GStaticMutex suppress_lock;
 
67
  LogMessage *last_msg;
 
68
  guint32 last_msg_count;
 
69
  GString *line_buffer;
 
70
 
 
71
  gint stats_level;
 
72
  guint16 stats_source;
 
73
  gchar *stats_id;
 
74
  gchar *stats_instance;
 
75
 
 
76
  struct iv_fd fd_watch;
 
77
  struct iv_timer suspend_timer;
 
78
  struct iv_task immed_io_task;
 
79
  struct iv_event queue_filled;
 
80
  MainLoopIOWorkerJob io_job;
 
81
  struct iv_timer suppress_timer;
 
82
  struct timespec suppress_timer_expires;
 
83
  gboolean suppress_timer_updated;
 
84
  gboolean work_result;
 
85
  gint pollable_state;
 
86
  LogProto *proto, *pending_proto;
 
87
  gboolean watches_running:1, suspended:1, working:1, flush_waiting_for_timeout:1;
 
88
  gboolean pending_proto_present;
 
89
  GCond *pending_proto_cond;
 
90
  GStaticMutex pending_proto_lock;
 
91
};
48
92
 
49
93
/**
50
94
 * LogWriter behaviour
69
113
 *
70
114
 **/
71
115
 
72
 
static gboolean log_writer_flush(LogWriter *self, gboolean flush_all);
 
116
static gboolean log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode);
73
117
static void log_writer_broken(LogWriter *self, gint notify_code);
74
 
static gboolean log_writer_throttling(LogWriter *self);
75
 
 
76
 
 
77
 
static gboolean
78
 
log_writer_fd_prepare(GSource *source,
79
 
                      gint *timeout)
80
 
{
81
 
  LogWriterWatch *self = (LogWriterWatch *) source;
82
 
  gint64 num_elements = log_queue_get_length(self->writer->queue);
83
 
  GTimeVal now;
84
 
  GIOCondition proto_cond;
85
 
 
86
 
  self->pollfd.events = G_IO_ERR;
87
 
  self->pollfd.revents = 0;
88
 
 
89
 
  g_source_get_current_time(source, &now);
90
 
  if (log_proto_prepare(self->proto, &self->pollfd.fd, &proto_cond, timeout))
91
 
    return TRUE;
92
 
  
93
 
  /* recalculate buckets */
94
 
  
95
 
  if (self->writer->options->throttle > 0)
 
118
static void log_writer_start_watches(LogWriter *self);
 
119
static void log_writer_stop_watches(LogWriter *self);
 
120
static void log_writer_update_watches(LogWriter *self);
 
121
static void log_writer_suspend(LogWriter *self);
 
122
 
 
123
static void
 
124
log_writer_work_perform(gpointer s)
 
125
{
 
126
  LogWriter *self = (LogWriter *) s;
 
127
 
 
128
  g_assert((self->super.flags & PIF_INITIALIZED) != 0);
 
129
  self->work_result = log_writer_flush(self, self->flush_waiting_for_timeout ? LW_FLUSH_BUFFER : LW_FLUSH_NORMAL);
 
130
}
 
131
 
 
132
static void
 
133
log_writer_work_finished(gpointer s)
 
134
{
 
135
  LogWriter *self = (LogWriter *) s;
 
136
 
 
137
  main_loop_assert_main_thread();
 
138
  self->flush_waiting_for_timeout = FALSE;
 
139
 
 
140
  if (self->pending_proto_present)
96
141
    {
97
 
      gint64 diff;
98
 
      gint new_buckets;
99
 
      
100
 
      /* throttling is enabled, calculate new buckets */
101
 
      if (self->last_throttle_check.tv_sec != 0)
102
 
        {
103
 
          diff = g_time_val_diff(&now, &self->last_throttle_check);
104
 
        }
105
 
      else
106
 
        {
107
 
          diff = 0;
108
 
          self->last_throttle_check = now;
109
 
        }
110
 
      new_buckets = (self->writer->options->throttle * diff) / G_USEC_PER_SEC;
111
 
      if (new_buckets)
112
 
        {
113
 
          
114
 
          /* if new_buckets is zero, we don't save the current time as
115
 
           * last_throttle_check. The reason is that new_buckets could be
116
 
           * rounded to zero when only a minimal interval passes between
117
 
           * poll iterations.
118
 
           */
119
 
          self->writer->throttle_buckets = MIN(self->writer->options->throttle, self->writer->throttle_buckets + new_buckets);
120
 
          self->last_throttle_check = now;
121
 
        }
 
142
      /* pending proto is only set in the main thread, so no need to
 
143
       * lock it before coming here. After that, we're syncing with the
 
144
       * log_writer_reopen() call, quite possibly coming from a
 
145
       * non-main thread. */
 
146
 
 
147
      g_static_mutex_lock(&self->pending_proto_lock);
 
148
      if (self->proto)
 
149
        log_proto_free(self->proto);
 
150
 
 
151
      self->proto = self->pending_proto;
 
152
      self->pending_proto = NULL;
 
153
      self->pending_proto_present = FALSE;
 
154
 
 
155
      g_cond_signal(self->pending_proto_cond);
 
156
      g_static_mutex_unlock(&self->pending_proto_lock);
122
157
    }
123
158
 
124
 
  if (G_UNLIKELY(self->error_suspend))
 
159
  if (!self->work_result)
125
160
    {
126
 
      *timeout = g_time_val_diff(&self->error_suspend_target, &now) / 1000;
127
 
      if (*timeout <= 0)
 
161
      log_writer_broken(self, NC_WRITE_ERROR);
 
162
      if (self->proto)
128
163
        {
129
 
          msg_notice("Error suspend timeout has elapsed, attempting to write again",
 
164
          log_writer_suspend(self);
 
165
          msg_notice("Suspending write operation because of an I/O error",
130
166
                     evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
167
                     evt_tag_int("time_reopen", self->options->time_reopen),
131
168
                     NULL);
132
 
          self->error_suspend = FALSE;
133
 
          *timeout = -1;
134
 
        }
135
 
      else
136
 
        {
137
 
          return FALSE;
138
 
        }
139
 
    }
140
 
  
141
 
  if ((self->writer->options->flush_lines == 0 && (!log_writer_throttling(self->writer) && num_elements != 0)) ||
142
 
      (self->writer->options->flush_lines > 0  && (!log_writer_throttling(self->writer) && num_elements >= self->writer->options->flush_lines)))
143
 
    {
144
 
      /* we need to flush our buffers */
145
 
      self->pollfd.events |= proto_cond;
146
 
    }
147
 
  else if (num_elements && !log_writer_throttling(self->writer))
148
 
    {
149
 
      /* our buffer does not contain enough elements to flush, but we do not
150
 
       * want to wait more than flush_timeout time */
151
 
      
152
 
      if (!self->flush_waiting_for_timeout)
153
 
        {
154
 
          /* start waiting */
155
 
 
156
 
          *timeout = self->writer->options->flush_timeout;
157
 
          g_source_get_current_time(source, &self->flush_target);
158
 
          g_time_val_add(&self->flush_target, *timeout * 1000);
159
 
          self->flush_waiting_for_timeout = TRUE;
160
 
        }
161
 
      else
162
 
        {
163
 
          glong to = g_time_val_diff(&self->flush_target, &now) / 1000;
164
 
          if (to <= 0)
165
 
            {
166
 
              /* timeout elapsed, start polling again */
167
 
              if (self->writer->flags & LW_ALWAYS_WRITABLE)
168
 
                return TRUE;
169
 
              self->pollfd.events = proto_cond;
170
 
            }
 
169
        }
 
170
      goto exit;
 
171
    }
 
172
 
 
173
  if ((self->super.flags & PIF_INITIALIZED) && self->proto)
 
174
    {
 
175
      /* reenable polling the source, but only if we're still initialized */
 
176
      log_writer_start_watches(self);
 
177
    }
 
178
 
 
179
exit:
 
180
  log_pipe_unref(&self->super);
 
181
}
 
182
 
 
183
static void
 
184
log_writer_io_flush_output(gpointer s)
 
185
{
 
186
  LogWriter *self = (LogWriter *) s;
 
187
 
 
188
  main_loop_assert_main_thread();
 
189
 
 
190
  log_writer_stop_watches(self);
 
191
  log_pipe_ref(&self->super);
 
192
  if ((self->options->options & LWO_THREADED))
 
193
    {
 
194
      main_loop_io_worker_job_submit(&self->io_job);
 
195
    }
 
196
  else
 
197
    {
 
198
      log_writer_work_perform(s);
 
199
      log_writer_work_finished(s);
 
200
    }
 
201
}
 
202
 
 
203
static void
 
204
log_writer_io_error(gpointer s)
 
205
{
 
206
  LogWriter *self = (LogWriter *) s;
 
207
 
 
208
  if (self->fd_watch.handler_out == NULL && self->fd_watch.handler_in == NULL)
 
209
    {
 
210
      msg_debug("POLLERR occurred while idle",
 
211
                evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
212
                NULL);
 
213
      log_writer_broken(self, NC_WRITE_ERROR);
 
214
      return;
 
215
    }
 
216
  else
 
217
    {
 
218
      /* in case we have an error state but we also asked for read/write
 
219
       * polling, the error should be handled by the I/O callback.  But we
 
220
       * need not call that explicitly as ivykis does that for us.  */
 
221
    }
 
222
  log_writer_update_watches(self);
 
223
}
 
224
 
 
225
static void
 
226
log_writer_io_check_eof(gpointer s)
 
227
{
 
228
  LogWriter *self = (LogWriter *) s;
 
229
 
 
230
  msg_error("EOF occurred while idle",
 
231
            evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
232
            NULL);
 
233
  log_writer_broken(self, NC_CLOSE);
 
234
}
 
235
 
 
236
static void
 
237
log_writer_error_suspend_elapsed(gpointer s)
 
238
{
 
239
  LogWriter *self = (LogWriter *) s;
 
240
 
 
241
  self->suspended = FALSE;
 
242
  msg_notice("Error suspend timeout has elapsed, attempting to write again",
 
243
             evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
244
             NULL);
 
245
  log_writer_update_watches(self);
 
246
}
 
247
 
 
248
static void
 
249
log_writer_update_fd_callbacks(LogWriter *self, GIOCondition cond)
 
250
{
 
251
  main_loop_assert_main_thread();
 
252
  if (self->pollable_state > 0)
 
253
    {
 
254
      if (self->flags & LW_DETECT_EOF && (cond & G_IO_IN) == 0 && (cond & G_IO_OUT))
 
255
        {
 
256
          /* if output is enabled, and we're in DETECT_EOF mode, and input is
 
257
           * not needed by the log protocol, install the eof check callback to
 
258
           * destroy the connection if an EOF is received. */
 
259
 
 
260
          iv_fd_set_handler_in(&self->fd_watch, log_writer_io_check_eof);
 
261
        }
 
262
      else if (cond & G_IO_IN)
 
263
        {
 
264
          /* in case the protocol requested G_IO_IN, it means that it needs to
 
265
           * invoke read in the flush code, so just install the flush_output
 
266
           * handler for input */
 
267
 
 
268
          iv_fd_set_handler_in(&self->fd_watch, log_writer_io_flush_output);
 
269
        }
 
270
      else
 
271
        {
 
272
          /* otherwise we're not interested in input */
 
273
          iv_fd_set_handler_in(&self->fd_watch, NULL);
 
274
        }
 
275
      if (cond & G_IO_OUT)
 
276
        iv_fd_set_handler_out(&self->fd_watch, log_writer_io_flush_output);
 
277
      else
 
278
        iv_fd_set_handler_out(&self->fd_watch, NULL);
 
279
 
 
280
      iv_fd_set_handler_err(&self->fd_watch, log_writer_io_error);
 
281
    }
 
282
  else
 
283
    {
 
284
      /* fd is not pollable, assume it is always writable */
 
285
      if (cond & G_IO_OUT)
 
286
        {
 
287
          if (!iv_task_registered(&self->immed_io_task))
 
288
            iv_task_register(&self->immed_io_task);
 
289
        }
 
290
      else if (iv_task_registered(&self->immed_io_task))
 
291
        {
 
292
          iv_task_unregister(&self->immed_io_task);
 
293
        }
 
294
    }
 
295
}
 
296
 
 
297
void
 
298
log_writer_arm_suspend_timer(LogWriter *self, void (*handler)(void *), gint timeout_msec)
 
299
{
 
300
  if (iv_timer_registered(&self->suspend_timer))
 
301
    iv_timer_unregister(&self->suspend_timer);
 
302
  iv_validate_now();
 
303
  self->suspend_timer.handler = handler;
 
304
  self->suspend_timer.expires = iv_now;
 
305
  timespec_add_msec(&self->suspend_timer.expires, timeout_msec);
 
306
  iv_timer_register(&self->suspend_timer);
 
307
}
 
308
 
 
309
static void
 
310
log_writer_queue_filled(gpointer s)
 
311
{
 
312
  LogWriter *self = (LogWriter *) s;
 
313
 
 
314
  main_loop_assert_main_thread();
 
315
 
 
316
  /*
 
317
   * NOTE: This theory is somewhat questionable, e.g. I'm not 100% sure it
 
318
   * is the right scenario, but the race was closed.  So take this with a
 
319
   * grain of salt.
 
320
   *
 
321
   * The queue_filled callback is running in the main thread. Because of the
 
322
   * possible delay caused by iv_event_post() the callback might be
 
323
   * delivered even after stop_watches() has been called.
 
324
   *
 
325
   *   - log_writer_schedule_update_watches() is called by the reader
 
326
   *     thread, which calls iv_event_post()
 
327
   *   - the main thread calls stop_watches() in work_perform
 
328
   *   - the event is delivered in the main thread
 
329
   *
 
330
   * But since stop/start watches always run in the main thread and we do
 
331
   * too, we can check if this is the case.  A LogWriter without watches
 
332
   * running is busy writing out data to the destination, e.g.  a
 
333
   * start_watches is to be expected once log_writer_work_finished() is run
 
334
   * at the end of the deferred work, executed by the I/O threads.
 
335
   */
 
336
  if (self->watches_running)
 
337
    log_writer_update_watches((LogWriter *) s);
 
338
}
 
339
 
 
340
/* NOTE: runs in the source thread */
 
341
static void
 
342
log_writer_schedule_update_watches(LogWriter *self)
 
343
{
 
344
  iv_event_post(&self->queue_filled);
 
345
}
 
346
 
 
347
static void
 
348
log_writer_suspend(LogWriter *self)
 
349
{
 
350
  /* flush code indicates that we need to suspend our writing activities
 
351
   * until time_reopen elapses */
 
352
 
 
353
  log_writer_arm_suspend_timer(self, log_writer_error_suspend_elapsed, self->options->time_reopen * 1e3);
 
354
  self->suspended = TRUE;
 
355
}
 
356
 
 
357
static void
 
358
log_writer_update_watches(LogWriter *self)
 
359
{
 
360
  gint fd;
 
361
  GIOCondition cond = 0;
 
362
  gboolean partial_batch;
 
363
  gint timeout_msec = 0;
 
364
 
 
365
  main_loop_assert_main_thread();
 
366
 
 
367
  /* NOTE: we either start the suspend_timer or enable the fd_watch. The two MUST NOT happen at the same time. */
 
368
 
 
369
  if (log_proto_prepare(self->proto, &fd, &cond) ||
 
370
      self->flush_waiting_for_timeout ||
 
371
      log_queue_check_items(self->queue, self->options->flush_lines, &partial_batch, &timeout_msec,
 
372
                            (LogQueuePushNotifyFunc) log_writer_schedule_update_watches, self, NULL))
 
373
    {
 
374
      /* flush_lines number of elements are already available and throttle would permit us to send. */
 
375
      log_writer_update_fd_callbacks(self, cond);
 
376
    }
 
377
  else if (partial_batch || timeout_msec)
 
378
    {
 
379
      /* a few elements are available, but fewer than flush_lines, so we need to start a timer to initiate a flush */
 
380
 
 
381
      log_writer_update_fd_callbacks(self, 0);
 
382
      self->flush_waiting_for_timeout = TRUE;
 
383
      log_writer_arm_suspend_timer(self, (void (*)(void *)) log_writer_update_watches, timeout_msec ? timeout_msec : self->options->flush_timeout);
 
384
    }
 
385
  else
 
386
    {
 
387
      /* no elements or no throttle space, wait for a wakeup by the queue
 
388
       * when the required number of items are added.  see the
 
389
       * log_queue_check_items and its parallel_push argument above
 
390
       */
 
391
      log_writer_update_fd_callbacks(self, 0);
 
392
    }
 
393
}
 
394
 
 
395
static gboolean
 
396
is_file_regular(gint fd)
 
397
{
 
398
  struct stat st;
 
399
 
 
400
  if (fstat(fd, &st) >= 0)
 
401
    {
 
402
      return S_ISREG(st.st_mode);
 
403
    }
 
404
 
 
405
  /* if stat fails, that's interesting, but we should probably poll
 
406
   * it, hopefully that's less likely to cause spinning */
 
407
 
 
408
  return FALSE;
 
409
}
 
410
 
 
411
static void
 
412
log_writer_start_watches(LogWriter *self)
 
413
{
 
414
  gint fd;
 
415
  GIOCondition cond;
 
416
 
 
417
  if (!self->watches_running)
 
418
    {
 
419
      log_proto_prepare(self->proto, &fd, &cond);
 
420
 
 
421
      if (self->pollable_state < 0)
 
422
        {
 
423
          if (is_file_regular(fd))
 
424
            self->pollable_state = 0;
171
425
          else
172
 
            {
173
 
              *timeout = to;
174
 
            }
175
 
        }
176
 
      return FALSE;
177
 
    }
178
 
  else
179
 
    {
180
 
      if (num_elements && log_writer_throttling(self->writer))
181
 
        {
182
 
          /* we are unable to send because of throttling, make sure that we
183
 
           * wake up when the rate limits lets us send at least 1 message */
184
 
          *timeout = (1000 / self->writer->options->throttle) + 1;
185
 
          msg_debug("Throttling output", 
186
 
                    evt_tag_int("wait", *timeout), 
187
 
                    NULL);
188
 
        }
189
 
    }
190
 
  
191
 
  if (self->writer->flags & LW_DETECT_EOF && (self->pollfd.events & G_IO_IN) == 0)
192
 
    {
193
 
      self->pollfd.events |= G_IO_HUP | G_IO_IN;
194
 
      self->input_means_connection_broken = TRUE;
195
 
    }
196
 
  else
197
 
    {
198
 
      self->input_means_connection_broken = FALSE;
199
 
    }
200
 
    
201
 
  self->flush_waiting_for_timeout = FALSE;
202
 
  
203
 
  if ((self->pollfd.events & G_IO_OUT) && (self->writer->flags & LW_ALWAYS_WRITABLE))
204
 
    {
205
 
      self->pollfd.revents = G_IO_OUT;
206
 
      return TRUE;
207
 
    }
208
 
  return FALSE;
209
 
}
210
 
 
211
 
static gboolean
212
 
log_writer_fd_check(GSource *source)
213
 
{
214
 
  LogWriterWatch *self = (LogWriterWatch *) source;
215
 
  gint64 num_elements = log_queue_get_length(self->writer->queue);
216
 
  
217
 
  if (self->error_suspend)
218
 
    return FALSE;
219
 
  
220
 
  if (num_elements && !log_writer_throttling(self->writer))
221
 
    {
222
 
      /* we have data to flush */
223
 
      if (self->flush_waiting_for_timeout)
224
 
        {
225
 
          GTimeVal tv;
226
 
 
227
 
          /* check if timeout elapsed */
228
 
          g_source_get_current_time(source, &tv);
229
 
          if (!(self->flush_target.tv_sec <= tv.tv_sec || (self->flush_target.tv_sec == tv.tv_sec && self->flush_target.tv_usec <= tv.tv_usec)))
230
 
            return FALSE;
231
 
          if ((self->writer->flags & LW_ALWAYS_WRITABLE))
232
 
            return TRUE;
233
 
        }
234
 
    }
235
 
  return !!(self->pollfd.revents & (G_IO_OUT | G_IO_ERR | G_IO_HUP | G_IO_IN));
236
 
}
237
 
 
238
 
static gboolean
239
 
log_writer_fd_dispatch(GSource *source,
240
 
                       GSourceFunc callback,
241
 
                       gpointer user_data)
242
 
{
243
 
  LogWriterWatch *self = (LogWriterWatch *) source;
244
 
  gint64 num_elements = log_queue_get_length(self->writer->queue);
245
 
 
246
 
  if (self->pollfd.revents & (G_IO_HUP | G_IO_IN) && self->input_means_connection_broken)
247
 
    {
248
 
      msg_error("EOF occurred while idle",
249
 
                evt_tag_int("fd", log_proto_get_fd(self->proto)),
250
 
                NULL);
251
 
      log_writer_broken(self->writer, NC_CLOSE);
252
 
      return FALSE;
253
 
    }
254
 
  else if (self->pollfd.revents & (G_IO_ERR) && num_elements == 0)
255
 
    {
256
 
      msg_error("POLLERR occurred while idle",
257
 
                evt_tag_int("fd", log_proto_get_fd(self->proto)),
258
 
                NULL);
259
 
      log_writer_broken(self->writer, NC_WRITE_ERROR);
260
 
    }
261
 
  else if (num_elements)
262
 
    {
263
 
      if (!log_writer_flush(self->writer, FALSE))
264
 
        {
265
 
          self->error_suspend = TRUE;
266
 
          g_source_get_current_time(source, &self->error_suspend_target);
267
 
          g_time_val_add(&self->error_suspend_target, self->writer->options->time_reopen * 1e6);
268
 
 
269
 
          log_writer_broken(self->writer, NC_WRITE_ERROR);
270
 
          
271
 
          if (self->writer->source == (GSource *) self)
272
 
            {
273
 
              msg_notice("Suspending write operation because of an I/O error",
274
 
                         evt_tag_int("fd", log_proto_get_fd(self->proto)),
275
 
                         evt_tag_int("time_reopen", self->writer->options->time_reopen),
276
 
                         NULL);
277
 
            }
278
 
          return TRUE;
279
 
        }
280
 
    }
281
 
  return TRUE;
282
 
}
283
 
 
284
 
static void
285
 
log_writer_fd_finalize(GSource *source)
286
 
{
287
 
  LogWriterWatch *self = (LogWriterWatch *) source;
288
 
 
289
 
  log_proto_free(self->proto);
290
 
  log_pipe_unref(&self->writer->super);
291
 
}
292
 
 
293
 
GSourceFuncs log_writer_source_funcs =
294
 
{
295
 
  log_writer_fd_prepare,
296
 
  log_writer_fd_check,
297
 
  log_writer_fd_dispatch,
298
 
  log_writer_fd_finalize
299
 
};
300
 
 
301
 
static GSource *
302
 
log_writer_watch_new(LogWriter *writer, LogProto *proto)
303
 
{
304
 
  LogWriterWatch *self = (LogWriterWatch *) g_source_new(&log_writer_source_funcs, sizeof(LogWriterWatch));
305
 
  
306
 
  self->writer = writer;
307
 
  self->proto = proto;
308
 
  log_pipe_ref(&self->writer->super);
309
 
  g_source_set_priority(&self->super, LOG_PRIORITY_WRITER);
310
 
  
311
 
  if ((writer->flags & LW_ALWAYS_WRITABLE) == 0)
312
 
    g_source_add_poll(&self->super, &self->pollfd);
313
 
  return &self->super;
314
 
}
315
 
 
316
 
static gboolean
317
 
log_writer_throttling(LogWriter *self)
318
 
{
319
 
  return self->options->throttle > 0 && self->throttle_buckets == 0;
320
 
}
321
 
 
 
426
            self->pollable_state = iv_fd_pollable(fd);
 
427
        }
 
428
 
 
429
      if (self->pollable_state)
 
430
        {
 
431
          self->fd_watch.fd = fd;
 
432
          iv_fd_register(&self->fd_watch);
 
433
        }
 
434
 
 
435
      log_writer_update_watches(self);
 
436
      self->watches_running = TRUE;
 
437
    }
 
438
}
 
439
 
 
440
static void
 
441
log_writer_stop_watches(LogWriter *self)
 
442
{
 
443
  if (self->watches_running)
 
444
    {
 
445
      if (iv_timer_registered(&self->suspend_timer))
 
446
        iv_timer_unregister(&self->suspend_timer);
 
447
      if (iv_fd_registered(&self->fd_watch))
 
448
        iv_fd_unregister(&self->fd_watch);
 
449
      if (iv_task_registered(&self->immed_io_task))
 
450
        iv_task_unregister(&self->immed_io_task);
 
451
 
 
452
      log_queue_reset_parallel_push(self->queue);
 
453
 
 
454
      self->watches_running = FALSE;
 
455
    }
 
456
}
 
457
 
 
458
/* function called using main_loop_call() in case the suppress timer needs
 
459
 * to be updated */
 
460
static void
 
461
log_writer_perform_suppress_timer_update(LogWriter *self)
 
462
{
 
463
  main_loop_assert_main_thread();
 
464
 
 
465
  if (iv_timer_registered(&self->suppress_timer))
 
466
    iv_timer_unregister(&self->suppress_timer);
 
467
  g_static_mutex_lock(&self->suppress_lock);
 
468
  self->suppress_timer.expires = self->suppress_timer_expires;
 
469
  self->suppress_timer_updated = TRUE;
 
470
  g_static_mutex_unlock(&self->suppress_lock);
 
471
  if (self->suppress_timer.expires.tv_sec > 0)
 
472
    iv_timer_register(&self->suppress_timer);
 
473
  log_pipe_unref(&self->super);
 
474
}
 
475
 
 
476
/*
 
477
 * Update the suppress timer in a deferred manner, possibly batching the
 
478
 * results of multiple updates to the suppress timer.  This is necessary as
 
479
 * suppress timer updates must run in the main thread, and updating it every
 
480
 * time a new message comes in would cause enormous latency in the fast
 
481
 * path. By collecting multiple updates, a new main-thread call is only scheduled once the previous one has finished.
 
482
 *
 
483
 * sec == 0 means to turn off the suppress timer
 
484
 * sec >  0 means to enable the timer with the specified timeout (in seconds)
 
485
 *
 
486
 * NOTE: suppress_lock must be held.
 
487
 */
 
488
static void
 
489
log_writer_update_suppress_timer(LogWriter *self, glong sec)
 
490
{
 
491
  gboolean invoke;
 
492
  struct timespec next_expires;
 
493
 
 
494
  iv_validate_now();
 
495
 
 
496
  /* we deliberately use nsec == 0 in order to increase the likelihood that
 
497
   * we target the same second, in case only a fraction of a second has
 
498
   * passed between two updates.  */
 
499
  if (sec)
 
500
    {
 
501
      next_expires.tv_nsec = 0;
 
502
      next_expires.tv_sec = iv_now.tv_sec + sec;
 
503
    }
 
504
  else
 
505
    {
 
506
      next_expires.tv_sec = 0;
 
507
      next_expires.tv_nsec = 0;
 
508
    }
 
509
  /* last update was finished, we need to invoke the updater again */
 
510
  invoke = ((next_expires.tv_sec != self->suppress_timer_expires.tv_sec) || (next_expires.tv_nsec != self->suppress_timer_expires.tv_nsec)) && self->suppress_timer_updated;
 
511
  self->suppress_timer_updated = FALSE;
 
512
 
 
513
  if (invoke)
 
514
    {
 
515
      self->suppress_timer_expires = next_expires;
 
516
      g_static_mutex_unlock(&self->suppress_lock);
 
517
      log_pipe_ref(&self->super);
 
518
      main_loop_call((void *(*)(void *)) log_writer_perform_suppress_timer_update, self, FALSE);
 
519
      g_static_mutex_lock(&self->suppress_lock);
 
520
    }
 
521
 
 
522
}
 
523
 
 
524
/*
 
525
 * NOTE: suppress_lock must be held.
 
526
 */
322
527
static void
323
528
log_writer_last_msg_release(LogWriter *self)
324
529
{
325
 
  if (self->last_msg_timerid)
326
 
    g_source_remove(self->last_msg_timerid);
327
 
 
 
530
  log_writer_update_suppress_timer(self, 0);
328
531
  if (self->last_msg)
329
532
    log_msg_unref(self->last_msg);
330
533
 
331
534
  self->last_msg = NULL;
332
535
  self->last_msg_count = 0;
333
 
  self->last_msg_timerid = 0;
334
536
}
335
537
 
 
538
/*
 
539
 * NOTE: suppress_lock must be held.
 
540
 */
336
541
static void
337
542
log_writer_last_msg_flush(LogWriter *self)
338
543
{
357
562
  p = log_msg_get_value(self->last_msg, LM_V_PROGRAM, &len);
358
563
  log_msg_set_value(m, LM_V_PROGRAM, p, len);
359
564
 
360
 
  len = g_snprintf(buf, sizeof(buf), "Last message '%.20s' repeated %d times, supressed by syslog-ng on %s",
 
565
  len = g_snprintf(buf, sizeof(buf), "Last message '%.20s' repeated %d times, suppressed by syslog-ng on %s",
361
566
                   log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL),
362
567
                   self->last_msg_count,
363
568
                   hostname);
364
569
  log_msg_set_value(m, LM_V_MESSAGE, buf, len);
365
570
 
366
 
  path_options.flow_control = FALSE;
367
 
  if (!log_queue_push_tail(self->queue, m, &path_options))
368
 
    {
369
 
      stats_counter_inc(self->dropped_messages);
370
 
      msg_debug("Destination queue full, dropping suppressed message",
371
 
                evt_tag_int("queue_len", log_queue_get_length(self->queue)),
372
 
                evt_tag_int("mem_fifo_size", self->options->mem_fifo_size),
373
 
                NULL);
374
 
      log_msg_drop(m, &path_options);
375
 
    }
 
571
  path_options.ack_needed = FALSE;
376
572
 
 
573
  log_queue_push_tail(self->queue, m, &path_options);
377
574
  log_writer_last_msg_release(self);
378
575
}
379
576
 
380
 
static gboolean
381
 
last_msg_timer(gpointer pt)
382
 
{
383
 
  LogWriter *self = (LogWriter *) pt;
384
 
 
385
 
  log_writer_last_msg_flush(self);
386
 
 
387
 
  return FALSE;
388
 
}
389
577
 
390
578
/**
391
579
 * Remember the last message for dup detection.
 
580
 *
 
581
 * NOTE: suppress_lock must be held.
392
582
 **/
393
583
static void
394
584
log_writer_last_msg_record(LogWriter *self, LogMessage *lm)
401
591
  self->last_msg_count = 0;
402
592
}
403
593
 
 
594
static gboolean
 
595
log_writer_last_msg_timer(gpointer pt)
 
596
{
 
597
  LogWriter *self = (LogWriter *) pt;
 
598
 
 
599
  main_loop_assert_main_thread();
 
600
 
 
601
  g_static_mutex_lock(&self->suppress_lock);
 
602
  log_writer_last_msg_flush(self);
 
603
  g_static_mutex_unlock(&self->suppress_lock);
 
604
 
 
605
  return FALSE;
 
606
}
 
607
 
404
608
/**
405
609
 * log_writer_last_msg_check:
406
610
 *
411
615
static gboolean
412
616
log_writer_last_msg_check(LogWriter *self, LogMessage *lm, const LogPathOptions *path_options)
413
617
{
 
618
  g_static_mutex_lock(&self->suppress_lock);
414
619
  if (self->last_msg)
415
620
    {
416
 
      if (self->last_msg->timestamps[LM_TS_RECVD].time.tv_sec >= lm->timestamps[LM_TS_RECVD].time.tv_sec - self->options->suppress &&
 
621
      if (self->last_msg->timestamps[LM_TS_RECVD].tv_sec >= lm->timestamps[LM_TS_RECVD].tv_sec - self->options->suppress &&
417
622
          strcmp(log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL), log_msg_get_value(lm, LM_V_MESSAGE, NULL)) == 0 &&
418
623
          strcmp(log_msg_get_value(self->last_msg, LM_V_HOST, NULL), log_msg_get_value(lm, LM_V_HOST, NULL)) == 0 &&
419
624
          strcmp(log_msg_get_value(self->last_msg, LM_V_PROGRAM, NULL), log_msg_get_value(lm, LM_V_PROGRAM, NULL)) == 0 &&
421
626
          strcmp(log_msg_get_value(lm, LM_V_MESSAGE, NULL), "-- MARK --") != 0)
422
627
        {
423
628
          stats_counter_inc(self->suppressed_messages);
 
629
          self->last_msg_count++;
 
630
          
 
631
          if (self->last_msg_count == 1)
 
632
            {
 
633
              /* we only create the timer if this is the first suppressed message, otherwise it is already running. */
 
634
 
 
635
              log_writer_update_suppress_timer(self, self->options->suppress);
 
636
            }
 
637
          g_static_mutex_unlock(&self->suppress_lock);
 
638
 
424
639
          msg_debug("Suppressing duplicate message",
425
640
                    evt_tag_str("host", log_msg_get_value(lm, LM_V_HOST, NULL)),
426
641
                    evt_tag_str("msg", log_msg_get_value(lm, LM_V_MESSAGE, NULL)),
427
642
                    NULL);
428
 
          self->last_msg_count++;
429
 
          
430
 
          if (self->last_msg_count == 1)
431
 
            {
432
 
              /* we only create the timer if there's at least one suppressed message */
433
 
              self->last_msg_timerid = g_timeout_add(self->options->suppress * 1000, last_msg_timer, self);
434
 
            }
435
643
          log_msg_drop(lm, path_options);
436
644
          return TRUE;
437
645
        }
443
651
    }
444
652
 
445
653
  log_writer_last_msg_record(self, lm);
446
 
 
 
654
  g_static_mutex_unlock(&self->suppress_lock);
447
655
  return FALSE;
448
656
}
449
657
 
 
658
/* NOTE: runs in the reader thread */
450
659
/*
 * Queue callback of the writer (assigned to self->super.queue in the
 * constructor).
 *
 * This span is a rendered diff, so two revisions are interleaved: the
 * earlier three-argument signature and "queue full" drop branch (old line
 * numbers only), and the later four-argument signature and unconditional
 * push (new line numbers only).
 *
 * Later revision, as shown:
 *  - when flow control was not requested and the writer is suspended or
 *    lacks LW_SOFT_FLOW_CONTROL, path_options is replaced by the result of
 *    log_msg_break_ack();
 *  - with suppress > 0, a TRUE result of log_writer_last_msg_check() ends
 *    processing;
 *  - otherwise processed_messages is incremented and the message is pushed
 *    to the tail of self->queue.
 */
static void
451
 
log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options)
 
660
log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options, gpointer user_data)
452
661
{
453
662
  LogWriter *self = (LogWriter *) s;
454
 
 
 
663
  LogPathOptions local_options;
 
664
 
 
665
  if (!path_options->flow_control_requested &&
 
666
      (self->suspended || !(self->flags & LW_SOFT_FLOW_CONTROL)))
 
667
    {
 
668
      /* NOTE: this code ACKs the message back if there's a write error in
 
669
       * order not to hang the client in case of a disk full */
 
670
 
 
671
      path_options = log_msg_break_ack(lm, path_options, &local_options);
 
672
    }
455
673
  /* log_writer_last_msg_check() calls log_msg_drop() itself before returning TRUE */
  if (self->options->suppress > 0 && log_writer_last_msg_check(self, lm, path_options))
456
674
    return;
457
 
  
 
675
 
458
676
  stats_counter_inc(self->processed_messages);
459
 
  if (!log_queue_push_tail(self->queue, lm, path_options))
460
 
    {
461
 
      /* drop incoming message, we must ack here, otherwise the sender might
462
 
       * block forever, however this should not happen unless the sum of
463
 
       * window_sizes of sources feeding this writer exceeds log_fifo_size
464
 
       * or if flow control is not turned on.
465
 
       */
466
 
      
467
 
      /* we don't send a message here since the system is draining anyway */
468
 
      
469
 
      stats_counter_inc(self->dropped_messages);
470
 
      msg_debug("Destination queue full, dropping message",
471
 
                evt_tag_int("queue_len", log_queue_get_length(self->queue)),
472
 
                evt_tag_int("mem_fifo_size", self->options->mem_fifo_size),
473
 
                NULL);
474
 
      log_msg_drop(lm, path_options);
475
 
      return;
476
 
    }
 
677
  /* NOTE(review): the later revision ignores the push result; presumably the queue now accounts for drops itself -- confirm */
  log_queue_push_tail(self->queue, lm, path_options);
477
678
}
478
679
 
479
680
static void
500
701
    g_string_append_c(result, ' ');
501
702
}
502
703
 
 
704
static void
 
705
log_writer_do_padding(LogWriter *self, GString *result)
 
706
{
 
707
  if (!self->options->padding)
 
708
    return;
 
709
 
 
710
  if(G_UNLIKELY(self->options->padding < result->len))
 
711
    {
 
712
      msg_warning("Padding is too small to hold the full message",
 
713
               evt_tag_int("padding", self->options->padding),
 
714
               evt_tag_int("msg_size", result->len),
 
715
               NULL);
 
716
      g_string_set_size(result, self->options->padding);
 
717
      return;
 
718
    }
 
719
  /* store the original length of the result */
 
720
  gint len = result->len;
 
721
  gint padd_bytes = self->options->padding - result->len;
 
722
  /* set the size to the padded size, this will allocate the string */
 
723
  g_string_set_size(result, self->options->padding);
 
724
  memset(result->str + len - 1, '\0', padd_bytes);
 
725
}
 
726
 
503
727
void
504
728
log_writer_format_log(LogWriter *self, LogMessage *lm, GString *result)
505
729
{
521
745
      gssize seqid_length;
522
746
 
523
747
      seqid = log_msg_get_value(lm, meta_seqid, &seqid_length);
524
 
      seqid = APPEND_ZERO(seqid, seqid_length);
 
748
      APPEND_ZERO(seqid, seqid, seqid_length);
525
749
      if (seqid[0])
526
750
        seq_num = strtol(seqid, NULL, 10);
527
751
      else
531
755
  /* no template was specified, use default */
532
756
  stamp = &lm->timestamps[LM_TS_STAMP];
533
757
 
 
758
  g_string_truncate(result, 0);
 
759
 
534
760
  if ((self->flags & LW_SYSLOG_PROTOCOL) || (self->options->options & LWO_SYSLOG_PROTOCOL))
535
761
    {
536
762
      gint len;
537
763
       
538
764
      /* we currently hard-wire version 1 */
539
 
      g_string_sprintf(result, "<%d>%d ", lm->pri, 1);
 
765
      g_string_append_c(result, '<');
 
766
      format_uint32_padded(result, 0, 0, 10, lm->pri);
 
767
      g_string_append_c(result, '>');
 
768
      g_string_append_c(result, '1');
 
769
      g_string_append_c(result, ' ');
540
770
 
541
771
      log_stamp_append_format(stamp, result, TS_FMT_ISO, 
542
 
                              time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->time.tv_sec),
 
772
                              time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
543
773
                              self->options->template_options.frac_digits);
544
774
      g_string_append_c(result, ' ');
545
775
      
558
788
        }
559
789
#endif
560
790
      len = result->len;
561
 
      log_msg_append_format_sdata(lm, result);
 
791
      log_msg_append_format_sdata(lm, result, seq_num);
562
792
      if (len == result->len)
563
793
        {
564
794
          /* NOTE: sd_param format did not generate any output, take it as an empty SD string */
573
803
          log_template_append_format(self->options->template, lm, 
574
804
                                     &self->options->template_options,
575
805
                                     LTZ_SEND,
576
 
                                     seq_num,
 
806
                                     seq_num, NULL,
577
807
                                     result);
578
808
        }
579
809
      else
592
822
            }
593
823
        }
594
824
      g_string_append_c(result, '\n');
 
825
      log_writer_do_padding(self, result);
595
826
    }
596
827
  else
597
828
    {
614
845
          log_template_format(template, lm, 
615
846
                              &self->options->template_options,
616
847
                              LTZ_SEND,
617
 
                              seq_num,
 
848
                              seq_num, NULL,
618
849
                              result);
619
850
 
620
851
        }
626
857
          if (self->flags & LW_FORMAT_FILE)
627
858
            {
628
859
              log_stamp_format(stamp, result, self->options->template_options.ts_format,
629
 
                               time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->time.tv_sec),
 
860
                               time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
630
861
                               self->options->template_options.frac_digits);
631
862
            }
632
863
          else if (self->flags & LW_FORMAT_PROTO)
633
864
            {
634
 
              g_string_sprintf(result, "<%d>", lm->pri);
 
865
              g_string_append_c(result, '<');
 
866
              format_uint32_padded(result, 0, 0, 10, lm->pri);
 
867
              g_string_append_c(result, '>');
635
868
 
636
869
              /* always use BSD timestamp by default, the user can override this using a custom template */
637
870
              log_stamp_append_format(stamp, result, TS_FMT_BSD,
638
 
                                      time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->time.tv_sec),
 
871
                                      time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
639
872
                                      self->options->template_options.frac_digits);
640
873
            }
641
874
          g_string_append_c(result, ' ');
668
901
          p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
669
902
          g_string_append_len(result, p, len);
670
903
          g_string_append_c(result, '\n');
671
 
 
 
904
          log_writer_do_padding(self, result);
672
905
        }
673
906
    }
674
907
  if (self->options->options & LWO_NO_MULTI_LINE)
689
922
static void
690
923
log_writer_broken(LogWriter *self, gint notify_code)
691
924
{
 
925
  log_writer_stop_watches(self);
692
926
  log_pipe_notify(self->control, &self->super, notify_code, self);
693
927
}
694
928
 
 
929
/*
 
930
 * Write messages to the underlying file descriptor using the installed
 
931
 * LogProto instance.  This is called whenever the output is ready to accept
 
932
 * further messages, and once during config deinitialization, in order to
 
933
 * flush messages still in the queue, in the hope that most of them can be
 
934
 * written out.
 
935
 *
 
936
 * In threaded mode, this function is invoked as part of the "output" task
 
937
 * (in essence, this is the function that performs the output task).
 
938
 *
 
939
 * @flush_mode specifies how hard LogWriter is trying to send messages to
 
940
 * the actual destination:
 
941
 *
 
942
 *
 
943
 * LW_FLUSH_NORMAL    - business as usual, flush when the buffer is full
 
944
 * LW_FLUSH_BUFFER    - flush the buffer immediately please
 
945
 * LW_FLUSH_QUEUE     - pull off any queued items, at maximum speed, even
 
946
 *                      ignoring throttle, and flush the buffer too
 
947
 *
 
948
 */
695
949
gboolean
696
 
log_writer_flush(LogWriter *self, gboolean flush_all)
 
950
log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode)
697
951
{
698
 
  GString *line = NULL;
699
 
  gint64 num_elements = 0;
700
 
  LogProto *proto = self->source ? ((LogWriterWatch *) self->source)->proto : NULL;
701
 
 
 
952
  LogProto *proto = self->proto;
 
953
  gint count = 0;
 
954
  gboolean ignore_throttle = (flush_mode >= LW_FLUSH_QUEUE);
 
955
  
702
956
  if (!proto)
703
957
    return FALSE;
704
 
  
705
 
  line = g_string_sized_new(128);
706
 
  if (self->queue)
707
 
    num_elements = log_queue_get_length(self->queue);
708
 
  else
709
 
    num_elements = 0;
710
 
 
711
 
  while (num_elements > 0 && (flush_all || !log_writer_throttling(self)))
 
958
 
 
959
  /* NOTE: in case we're reloading or exiting we flush all queued items as
 
960
   * long as the destination can consume it.  This is not going to be an
 
961
   * infinite loop, since the reader will cease to produce new messages when
 
962
   * main_loop_io_worker_job_quit() is set. */
 
963
 
 
964
  while (!main_loop_io_worker_job_quit() || flush_mode >= LW_FLUSH_QUEUE)
712
965
    {
713
966
      LogMessage *lm;
714
967
      LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
715
968
      gboolean consumed = FALSE;
716
969
      
717
 
      if (!log_queue_pop_head(self->queue, &lm, &path_options, FALSE))
718
 
        g_assert_not_reached();
 
970
      if (!log_queue_pop_head(self->queue, &lm, &path_options, FALSE, ignore_throttle))
 
971
        {
 
972
          /* no more items are available */
 
973
          break;
 
974
        }
719
975
 
 
976
      log_msg_refcache_start_consumer(lm, &path_options);
720
977
      msg_set_context(lm);
721
978
 
722
 
      log_writer_format_log(self, lm, line);
723
 
      
724
 
      /* account this message against the throttle rate */
725
 
      self->throttle_buckets--;
726
 
      
727
 
      if (line->len)
 
979
      log_writer_format_log(self, lm, self->line_buffer);
 
980
      
 
981
      if (self->line_buffer->len)
728
982
        {
729
983
          LogProtoStatus status;
730
984
 
731
 
          status = log_proto_post(proto, (guchar *) line->str, line->len, &consumed);
 
985
          status = log_proto_post(proto, (guchar *) self->line_buffer->str, self->line_buffer->len, &consumed);
732
986
          if (status == LPS_ERROR)
733
987
            {
734
 
              msg_set_context(NULL);
735
 
              g_string_free(line, TRUE);
736
 
              return FALSE;
 
988
              if ((self->options->options & LWO_IGNORE_ERRORS) == 0)
 
989
                {
 
990
                  msg_set_context(NULL);
 
991
                  log_msg_refcache_stop();
 
992
                  return FALSE;
 
993
                }
 
994
              else
 
995
                {
 
996
                  if (!consumed)
 
997
                    g_free(self->line_buffer->str);
 
998
                  consumed = TRUE;
 
999
                }
737
1000
            }
738
1001
          if (consumed)
739
1002
            {
740
 
              line->str = g_malloc0(1);
741
 
              line->allocated_len = 1;
742
 
              line->len = 0;
 
1003
              self->line_buffer->str = g_malloc(self->line_buffer->allocated_len);
 
1004
              self->line_buffer->str[0] = 0;
 
1005
              self->line_buffer->len = 0;
743
1006
            }
744
1007
        }
745
1008
      if (consumed)
754
1017
          /* push back to the queue */
755
1018
          log_queue_push_head(self->queue, lm, &path_options);
756
1019
 
757
 
          /* force exit of the loop */
758
 
          num_elements = 1;
 
1020
          msg_set_context(NULL);
 
1021
          log_msg_refcache_stop();
 
1022
          break;
759
1023
        }
760
1024
        
761
1025
      msg_set_context(NULL);
762
 
      num_elements--;
763
 
    }
764
 
  g_string_free(line, TRUE);
 
1026
      log_msg_refcache_stop();
 
1027
      count++;
 
1028
    }
 
1029
 
 
1030
  if (flush_mode >= LW_FLUSH_BUFFER || count == 0)
 
1031
    {
 
1032
      if (log_proto_flush(proto) == LPS_ERROR)
 
1033
        return FALSE;
 
1034
    }
 
1035
 
765
1036
  return TRUE;
766
1037
}
767
1038
 
 
1039
static void
 
1040
log_writer_init_watches(LogWriter *self)
 
1041
{
 
1042
  IV_FD_INIT(&self->fd_watch);
 
1043
  self->fd_watch.cookie = self;
 
1044
 
 
1045
  IV_TASK_INIT(&self->immed_io_task);
 
1046
  self->immed_io_task.cookie = self;
 
1047
  self->immed_io_task.handler = log_writer_io_flush_output;
 
1048
 
 
1049
  IV_TIMER_INIT(&self->suspend_timer);
 
1050
  self->suspend_timer.cookie = self;
 
1051
 
 
1052
  IV_TIMER_INIT(&self->suppress_timer);
 
1053
  self->suppress_timer.cookie = self;
 
1054
  self->suppress_timer.handler = (void (*)(void *)) log_writer_last_msg_timer;
 
1055
 
 
1056
  IV_EVENT_INIT(&self->queue_filled);
 
1057
  self->queue_filled.cookie = self;
 
1058
  self->queue_filled.handler = log_writer_queue_filled;
 
1059
 
 
1060
  main_loop_io_worker_job_init(&self->io_job);
 
1061
  self->io_job.user_data = self;
 
1062
  self->io_job.work = (void (*)(void *)) log_writer_work_perform;
 
1063
  self->io_job.completion = (void (*)(void *)) log_writer_work_finished;
 
1064
}
 
1065
 
768
1066
/*
 * Init callback of the writer pipe.  Always returns TRUE.
 *
 * This span is a rendered diff.  The lines carrying only an old line number
 * (creating the queue on demand) belong to the earlier revision; the later
 * revision asserts that a queue is already set.
 *
 * Later revision, as shown:
 *  - registers the queue_filled event;
 *  - registers the stats counters unless LWO_NO_STATS is set or
 *    dropped_messages is already non-NULL;
 *  - hands the stored/dropped counters to the queue;
 *  - if a proto is already attached, re-installs it through
 *    log_writer_reopen().
 */
static gboolean
769
1067
log_writer_init(LogPipe *s)
770
1068
{
771
1069
  LogWriter *self = (LogWriter *) s;
772
1070
 
773
 
  if (!self->queue)
774
 
    self->queue = log_queue_new(self->options->mem_fifo_size);
775
 
  
 
1071
  g_assert(self->queue != NULL);
 
1072
  iv_event_register(&self->queue_filled);
 
1073
 
776
1074
  if ((self->options->options & LWO_NO_STATS) == 0 && !self->dropped_messages)
777
1075
    {
 
1076
      /* counter registration is bracketed by stats_lock()/stats_unlock() in the later revision */
      stats_lock();
778
1077
      stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_DROPPED, &self->dropped_messages);
779
1078
      if (self->options->suppress > 0)
780
1079
        stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
781
1080
      stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->processed_messages);
782
1081
      
783
1082
      stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_STORED, &self->stored_messages);
 
1083
      stats_unlock();
 
1084
    }
 
1085
  self->suppress_timer_updated = TRUE;
 
1086
  log_queue_set_counters(self->queue, self->stored_messages, self->dropped_messages);
 
1087
  if (self->proto)
 
1088
    {
 
1089
      LogProto *proto;
 
1090
 
 
1091
      /* self->proto is cleared first: log_writer_reopen_deferred() frees
       * whatever self->proto holds before installing the new instance */
      proto = self->proto;
 
1092
      self->proto = NULL;
 
1093
      log_writer_reopen(&self->super, proto);
784
1094
    }
785
1095
  return TRUE;
786
1096
}
790
1100
{
791
1101
  LogWriter *self = (LogWriter *) s;
792
1102
 
793
 
  log_writer_flush(self, TRUE);
794
 
  if (self->source)
795
 
    {
796
 
      g_source_destroy(self->source);
797
 
      g_source_unref(self->source);
798
 
      self->source = NULL;
799
 
    }
800
 
 
 
1103
  main_loop_assert_main_thread();
 
1104
 
 
1105
  log_queue_reset_parallel_push(self->queue);
 
1106
  log_writer_flush(self, LW_FLUSH_QUEUE);
 
1107
  /* FIXME: by the time we arrive here, it must be guaranteed that no
 
1108
   * _queue() call is running in a different thread, otherwise we'd need
 
1109
   * some kind of locking. */
 
1110
 
 
1111
  log_writer_stop_watches(self);
 
1112
  iv_event_unregister(&self->queue_filled);
 
1113
 
 
1114
  if (iv_timer_registered(&self->suppress_timer))
 
1115
    iv_timer_unregister(&self->suppress_timer);
 
1116
 
 
1117
  log_queue_set_counters(self->queue, NULL, NULL);
 
1118
 
 
1119
  stats_lock();
801
1120
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_DROPPED, &self->dropped_messages);
802
1121
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
803
1122
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->processed_messages);
804
1123
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_STORED, &self->stored_messages);
 
1124
  stats_unlock();
805
1125
  
806
1126
  return TRUE;
807
1127
}
810
1130
log_writer_free(LogPipe *s)
811
1131
{
812
1132
  LogWriter *self = (LogWriter *) s;
813
 
  
 
1133
 
 
1134
  if (self->proto)
 
1135
    log_proto_free(self->proto);
 
1136
 
 
1137
  if (self->line_buffer)
 
1138
    g_string_free(self->line_buffer, TRUE);
814
1139
  if (self->queue)
815
 
    log_queue_free(self->queue);
816
 
  log_writer_last_msg_release(self);
 
1140
    log_queue_unref(self->queue);
 
1141
  if (self->last_msg)
 
1142
    log_msg_unref(self->last_msg);
817
1143
  g_free(self->stats_id);
818
1144
  g_free(self->stats_instance);
 
1145
  g_static_mutex_free(&self->suppress_lock);
 
1146
  g_static_mutex_free(&self->pending_proto_lock);
 
1147
  g_cond_free(self->pending_proto_cond);
 
1148
 
819
1149
  log_pipe_free_method(s);
820
1150
}
821
1151
 
822
 
gboolean
 
1152
/* FIXME: this is inherently racy */
 
1153
gboolean
 
1154
log_writer_has_pending_writes(LogWriter *self)
 
1155
{
 
1156
  return log_queue_get_length(self->queue) > 0 || !self->watches_running;
 
1157
}
 
1158
 
 
1159
gboolean
 
1160
log_writer_opened(LogWriter *self)
 
1161
{
 
1162
  return self->proto != NULL;
 
1163
}
 
1164
 
 
1165
/* run in the main thread in reaction to a log_writer_reopen to change
 
1166
 * the destination LogProto instance. It needs to be ran in the main
 
1167
 * thread as it reregisters the watches associated with the main
 
1168
 * thread. */
 
1169
void
 
1170
log_writer_reopen_deferred(gpointer s)
 
1171
{
 
1172
  gpointer *args = (gpointer *) s;
 
1173
  LogWriter *self = args[0];
 
1174
  LogProto *proto = args[1];
 
1175
 
 
1176
  init_sequence_number(&self->seq_num);
 
1177
 
 
1178
  if (self->io_job.working)
 
1179
    {
 
1180
      /* NOTE: proto can be NULL */
 
1181
      self->pending_proto = proto;
 
1182
      self->pending_proto_present = TRUE;
 
1183
      return;
 
1184
    }
 
1185
 
 
1186
  log_writer_stop_watches(self);
 
1187
 
 
1188
  if (self->proto)
 
1189
    log_proto_free(self->proto);
 
1190
 
 
1191
  self->proto = proto;
 
1192
 
 
1193
  if (proto)
 
1194
    log_writer_start_watches(self);
 
1195
}
 
1196
 
 
1197
/*
 
1198
 * This function can be called from any thread, from the main thread
 
1199
 * as well as I/O worker threads. It takes care of going to the
 
1200
 * main thread to actually switch LogProto under this writer.
 
1201
 *
 
1202
 * The writer may still be operating, (e.g. log_pipe_deinit/init is
 
1203
 * not needed).
 
1204
 *
 
1205
 * In case we're running in a non-main thread, then by the time this
 
1206
 * function returns, the reopen has finished. In case it is called
 
1207
 * from the main thread, this function may defer updating self->proto
 
1208
 * until the worker thread has finished. The reason for this
 
1209
 * difference is:
 
1210
 *
 
1211
 *   - if LogWriter is busy, then updating the LogProto instance is
 
1212
 *     deferred to log_writer_work_finished(), but that runs in the
 
1213
 *     main thread.
 
1214
 *
 
1215
 *   - normally, even this deferred update is waited for, but in case
 
1216
 *     we're in the main thread, we can't block.
 
1217
 *
 
1218
 * This situation could probably be improved, maybe the synchronous
 
1219
 * return of log_writer_reopen() is not needed by call sites, but I
 
1220
 * was not sure, and right before release I didn't want to take the
 
1221
 * risky approach.
 
1222
 */
 
1223
/*
 * Replaces the LogProto instance of the writer with @proto.
 *
 * This span is a rendered diff.  The lines carrying only an old line number
 * (GSource based re-attachment, "return TRUE") belong to the earlier
 * revision.  The later revision hands the switch over to
 * log_writer_reopen_deferred() through main_loop_call().
 */
void
823
1224
log_writer_reopen(LogPipe *s, LogProto *proto)
824
1225
{
825
1226
  LogWriter *self = (LogWriter *) s;
826
 
  
827
 
  /* old fd is freed by the source */
828
 
  if (self->source)
829
 
    {
830
 
      g_source_destroy(self->source);
831
 
      g_source_unref(self->source);
832
 
      self->source = NULL;
833
 
    }
834
 
  
835
 
  if (proto)
836
 
    {
837
 
      self->source = log_writer_watch_new(self, proto);
838
 
      g_source_attach(self->source, NULL);
839
 
    }
840
 
  init_sequence_number(&self->seq_num);
841
 
  
842
 
  return TRUE;
 
1227
  /* argument pack read back by log_writer_reopen_deferred(): { writer, proto } */
  gpointer args[] = { s, proto };
 
1228
 
 
1229
  /* NOTE(review): args lives on this stack frame, so the TRUE argument presumably makes main_loop_call() wait for the call to finish -- confirm */
  main_loop_call((MainLoopTaskFunc) log_writer_reopen_deferred, args, TRUE);
 
1230
 
 
1231
  if (!main_loop_is_main_thread())
 
1232
    {
 
1233
      /* outside the main thread: block until pending_proto_present is cleared (the code clearing it is not part of this span) */
      g_static_mutex_lock(&self->pending_proto_lock);
 
1234
      while (self->pending_proto_present)
 
1235
        {
 
1236
          g_cond_wait(self->pending_proto_cond, g_static_mutex_get_mutex(&self->pending_proto_lock));
 
1237
        }
 
1238
      g_static_mutex_unlock(&self->pending_proto_lock);
 
1239
    }
843
1240
}
844
1241
 
845
1242
void
850
1247
 
851
1248
  self->stats_level = stats_level;
852
1249
  self->stats_source = stats_source;
 
1250
 
 
1251
  if (self->stats_id)
 
1252
    g_free(self->stats_id);
853
1253
  self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
 
1254
 
 
1255
  if (self->stats_instance)
 
1256
    g_free(self->stats_instance);
854
1257
  self->stats_instance = stats_instance ? g_strdup(stats_instance) : NULL;
855
 
 
856
 
  self->throttle_buckets = self->options->throttle;
857
1258
}
858
1259
 
859
1260
LogPipe *
866
1267
  self->super.deinit = log_writer_deinit;
867
1268
  self->super.queue = log_writer_queue;
868
1269
  self->super.free_fn = log_writer_free;
869
 
 
870
1270
  self->flags = flags;
871
 
  self->last_msg = NULL;
872
 
  self->last_msg_count = 0;
873
 
  self->last_msg_timerid = 0;
 
1271
  self->line_buffer = g_string_sized_new(128);
 
1272
  self->pollable_state = -1;
 
1273
  init_sequence_number(&self->seq_num);
 
1274
 
 
1275
  log_writer_init_watches(self);
 
1276
  g_static_mutex_init(&self->suppress_lock);
 
1277
  g_static_mutex_init(&self->pending_proto_lock);
 
1278
  self->pending_proto_cond = g_cond_new();
 
1279
 
874
1280
  return &self->super;
875
1281
}
876
1282
 
 
1283
/* returns a reference */
 
1284
LogQueue *
 
1285
log_writer_get_queue(LogPipe *s)
 
1286
{
 
1287
  LogWriter *self = (LogWriter *) s;
 
1288
 
 
1289
  return log_queue_ref(self->queue);
 
1290
}
 
1291
 
 
1292
/* consumes the reference */
 
1293
void
 
1294
log_writer_set_queue(LogPipe *s, LogQueue *queue)
 
1295
{
 
1296
  LogWriter *self = (LogWriter *)s;
 
1297
 
 
1298
  if (self->queue)
 
1299
    log_queue_unref(self->queue);
 
1300
  self->queue = queue;
 
1301
}
 
1302
 
877
1303
/*
 * Resets @options to its defaults.
 *
 * This span is a rendered diff.  Lines carrying only an old line number
 * (mem_fifo_size = -1, suppress = 0) belong to the earlier revision; lines
 * carrying only a new one (suppress = -1, padding = 0) to the later one.
 *
 * A value of -1 marks a setting as "not set": log_writer_options_init()
 * replaces such values with the corresponding global configuration value.
 */
void 
878
1304
log_writer_options_defaults(LogWriterOptions *options)
879
1305
{
880
 
  options->mem_fifo_size = -1;
881
1306
  options->template = NULL;
882
1307
  options->flush_lines = -1;
883
1308
  options->flush_timeout = -1;
884
1309
  log_template_options_defaults(&options->template_options);
885
1310
  options->time_reopen = -1;
886
 
  options->suppress = 0;
 
1311
  /* later revision: -1 so that log_writer_options_init() can fall back to cfg->suppress */
  options->suppress = -1;
 
1312
  /* 0 disables padding: log_writer_do_padding() returns early when padding is 0 */
  options->padding = 0;
887
1313
}
888
1314
 
889
1315
void 
952
1378
    }
953
1379
  log_template_options_init(&options->template_options, cfg);
954
1380
  options->options |= option_flags;
955
 
  if (options->mem_fifo_size == -1)
956
 
    options->mem_fifo_size = MAX(1000, cfg->log_fifo_size);
957
1381
    
958
1382
  if (options->flush_lines == -1)
959
1383
    options->flush_lines = cfg->flush_lines;
960
1384
  if (options->flush_timeout == -1)
961
1385
    options->flush_timeout = cfg->flush_timeout;
962
 
    
963
 
  if (options->mem_fifo_size < options->flush_lines)
964
 
    {
965
 
      msg_error("The value of flush_lines must be less than log_fifo_size",
966
 
                evt_tag_int("log_fifo_size", options->mem_fifo_size),
967
 
                evt_tag_int("flush_lines", options->flush_lines),
968
 
                NULL);
969
 
      options->flush_lines = options->mem_fifo_size - 1;
970
 
    }
971
 
 
 
1386
  if (options->suppress == -1)
 
1387
    options->suppress = cfg->suppress;
972
1388
  if (options->time_reopen == -1)
973
1389
    options->time_reopen = cfg->time_reopen;
974
1390
  options->file_template = log_template_ref(cfg->file_template);
975
1391
  options->proto_template = log_template_ref(cfg->proto_template);
 
1392
  if (cfg->threaded)
 
1393
    options->options |= LWO_THREADED;
976
1394
}
977
1395
 
978
1396
void
991
1409
    return LWO_SYSLOG_PROTOCOL;
992
1410
  if (strcmp(flag, "no-multi-line") == 0 || strcmp(flag, "no_multi_line") == 0)
993
1411
    return LWO_NO_MULTI_LINE;
 
1412
  if (strcmp(flag, "threaded") == 0)
 
1413
    return LWO_THREADED;
 
1414
  if (strcmp(flag, "ignore-errors") == 0 || strcmp(flag, "ignore_errors") == 0)
 
1415
    return LWO_IGNORE_ERRORS;
994
1416
  msg_error("Unknown dest writer flag", evt_tag_str("flag", flag), NULL);
995
1417
  return 0;
996
1418
}