~ubuntu-branches/ubuntu/trusty/syslog-ng/trusty-proposed

« back to all changes in this revision

Viewing changes to .pc/log_writer_set_options-fixed-a-memory-leak.patch/lib/logwriter.c

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS), Gergely Nagy
  • Date: 2011-10-11 14:30:48 UTC
  • mfrom: (1.3.7)
  • Revision ID: package-import@ubuntu.com-20111011143048-r1iljux9xbvj3lwh
Tags: 3.3.1.dfsg-1
* New upstream release with important fixes from upstream git tree with
  non-free manpages removed.
* Drop syslog-ng.conf(5) (closes: #496521).
* syslog-ng(8) is generated, and does not mention -Q anymore
  (closes: #616069).
* Supports CAP_SYSLOG on recent kernels (closes: #630172).
* Does not use g_timeout_add_seconds anymore (closes: #609154).

[ Gergely Nagy <algernon@madhouse-project.org> ]
* Update debian/copyright to DEP-5 format.
* Simplified the logrotate file by merging identical entries.
* Include local configuration files from /etc/syslog-ng/conf.d/ (Closes:
  #609050).
* Update syslog-ng.conf to be fully 3.3 compliant.
* Compress both source and binaries (except the syslog-ng meta
  package) with xz, instead of gzip.
* Use dpkg triggers to restart syslog-ng when appropriate.
* Include DFSG-free manual pages for all binaries.
* Build with Hardening enabled.
* Mention syslog(3) in /etc/default/syslog-ng, instead of
  <linux/kernel.h> (Closes: #608605)
* Support 'status' in the init script.
  Patch from Peter Eisentraut <petere@debian.org> (Closes: #644458)
* Build-Depend on libevtlog-dev (>= 0.2.12-5~) for correct shlibs.
* Use [linux-any] in Build-Depends instead of hardcoded links.
  (Closes: #634715)
* Use $SYSLOGNG_OPTS in the init script when reloading syslog-ng.
  (Closes: #589081)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
 
3
 * Copyright (c) 1998-2010 Balázs Scheidler
 
4
 *
 
5
 * This library is free software; you can redistribute it and/or
 
6
 * modify it under the terms of the GNU Lesser General Public
 
7
 * License as published by the Free Software Foundation; either
 
8
 * version 2.1 of the License, or (at your option) any later version.
 
9
 *
 
10
 * This library is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
13
 * Lesser General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU Lesser General Public
 
16
 * License along with this library; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 *
 
19
 * As an additional exemption you are allowed to compile & link against the
 
20
 * OpenSSL libraries as published by the OpenSSL project. See the file
 
21
 * COPYING for details.
 
22
 *
 
23
 */
 
24
  
 
25
#include "logwriter.h"
 
26
#include "messages.h"
 
27
#include "stats.h"
 
28
#include "misc.h"
 
29
#include "mainloop.h"
 
30
#include "str-format.h"
 
31
 
 
32
#include <unistd.h>
 
33
#include <assert.h>
 
34
#include <string.h>
 
35
#include <stdlib.h>
 
36
#include <sys/stat.h>
 
37
#include <unistd.h>
 
38
#include <iv.h>
 
39
#include <iv_event.h>
 
40
#include <iv_work.h>
 
41
 
 
42
typedef enum
 
43
{
 
44
  /* flush modes */
 
45
 
 
46
  /* business as usual, flush when the buffer is full */
 
47
  LW_FLUSH_NORMAL,
 
48
  /* flush the buffer immediately please */
 
49
  LW_FLUSH_BUFFER,
 
50
  /* pull off any queued items, at maximum speed, even ignoring throttle, and flush the buffer too */
 
51
  LW_FLUSH_QUEUE,
 
52
} LogWriterFlushMode;
 
53
 
 
54
struct _LogWriter
 
55
{
 
56
  LogPipe super;
 
57
  LogQueue *queue;
 
58
  guint32 flags:31;
 
59
  gint32 seq_num;
 
60
  StatsCounterItem *dropped_messages;
 
61
  StatsCounterItem *suppressed_messages;
 
62
  StatsCounterItem *processed_messages;
 
63
  StatsCounterItem *stored_messages;
 
64
  LogPipe *control;
 
65
  LogWriterOptions *options;
 
66
  GStaticMutex suppress_lock;
 
67
  LogMessage *last_msg;
 
68
  guint32 last_msg_count;
 
69
  GString *line_buffer;
 
70
 
 
71
  gint stats_level;
 
72
  guint16 stats_source;
 
73
  gchar *stats_id;
 
74
  gchar *stats_instance;
 
75
 
 
76
  struct iv_fd fd_watch;
 
77
  struct iv_timer suspend_timer;
 
78
  struct iv_task immed_io_task;
 
79
  struct iv_event queue_filled;
 
80
  MainLoopIOWorkerJob io_job;
 
81
  struct iv_timer suppress_timer;
 
82
  struct timespec suppress_timer_expires;
 
83
  gboolean suppress_timer_updated;
 
84
  gboolean work_result;
 
85
  gint pollable_state;
 
86
  LogProto *proto, *pending_proto;
 
87
  gboolean watches_running:1, suspended:1, working:1, flush_waiting_for_timeout:1;
 
88
  gboolean pending_proto_present;
 
89
  GCond *pending_proto_cond;
 
90
  GStaticMutex pending_proto_lock;
 
91
};
 
92
 
 
93
/**
 
94
 * LogWriter behaviour
 
95
 * ~~~~~~~~~~~~~~~~~~~
 
96
 *
 
97
 * LogWriter is a core element of syslog-ng sending messages out to some
 
98
 * kind of destination represented by a UNIX fd. Outgoing messages are sent
 
99
 * to the target asynchronously, first by placing them to a queue and then
 
100
 * sending messages when poll() indicates that the fd is writable.
 
101
 *
 
102
 * 
 
103
 * Flow control
 
104
 * ------------
 
105
 * For a simple log writer without a disk buffer messages are placed on a
 
106
 * GQueue and they are acknowledged when the send() system call returned
 
107
 * success. This is more complex when disk buffering is used, in which case
 
108
 * messages are put to the "disk buffer" first and acknowledged immediately. 
 
109
 * (this way the reader never stops when the disk buffer area is not yet
 
110
 * full). When disk buffer reaches its limit, messages are added to the the
 
111
 * usual GQueue and messages get acknowledged when they are moved to the
 
112
 * disk buffer.
 
113
 *
 
114
 **/
 
115
 
 
116
static gboolean log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode);
 
117
static void log_writer_broken(LogWriter *self, gint notify_code);
 
118
static void log_writer_start_watches(LogWriter *self);
 
119
static void log_writer_stop_watches(LogWriter *self);
 
120
static void log_writer_update_watches(LogWriter *self);
 
121
static void log_writer_suspend(LogWriter *self);
 
122
 
 
123
static void
 
124
log_writer_work_perform(gpointer s)
 
125
{
 
126
  LogWriter *self = (LogWriter *) s;
 
127
 
 
128
  g_assert((self->super.flags & PIF_INITIALIZED) != 0);
 
129
  self->work_result = log_writer_flush(self, self->flush_waiting_for_timeout ? LW_FLUSH_BUFFER : LW_FLUSH_NORMAL);
 
130
}
 
131
 
 
132
static void
 
133
log_writer_work_finished(gpointer s)
 
134
{
 
135
  LogWriter *self = (LogWriter *) s;
 
136
 
 
137
  main_loop_assert_main_thread();
 
138
  self->flush_waiting_for_timeout = FALSE;
 
139
 
 
140
  if (self->pending_proto_present)
 
141
    {
 
142
      /* pending proto is only set in the main thread, so no need to
 
143
       * lock it before coming here. After we're syncing with the
 
144
       * log_writer_reopen() call, quite possibly coming from a
 
145
       * non-main thread. */
 
146
 
 
147
      g_static_mutex_lock(&self->pending_proto_lock);
 
148
      if (self->proto)
 
149
        log_proto_free(self->proto);
 
150
 
 
151
      self->proto = self->pending_proto;
 
152
      self->pending_proto = NULL;
 
153
      self->pending_proto_present = FALSE;
 
154
 
 
155
      g_cond_signal(self->pending_proto_cond);
 
156
      g_static_mutex_unlock(&self->pending_proto_lock);
 
157
    }
 
158
 
 
159
  if (!self->work_result)
 
160
    {
 
161
      log_writer_broken(self, NC_WRITE_ERROR);
 
162
      if (self->proto)
 
163
        {
 
164
          log_writer_suspend(self);
 
165
          msg_notice("Suspending write operation because of an I/O error",
 
166
                     evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
167
                     evt_tag_int("time_reopen", self->options->time_reopen),
 
168
                     NULL);
 
169
        }
 
170
      goto exit;
 
171
    }
 
172
 
 
173
  if ((self->super.flags & PIF_INITIALIZED) && self->proto)
 
174
    {
 
175
      /* reenable polling the source, but only if we're still initialized */
 
176
      log_writer_start_watches(self);
 
177
    }
 
178
 
 
179
exit:
 
180
  log_pipe_unref(&self->super);
 
181
}
 
182
 
 
183
static void
 
184
log_writer_io_flush_output(gpointer s)
 
185
{
 
186
  LogWriter *self = (LogWriter *) s;
 
187
 
 
188
  main_loop_assert_main_thread();
 
189
 
 
190
  log_writer_stop_watches(self);
 
191
  log_pipe_ref(&self->super);
 
192
  if ((self->options->options & LWO_THREADED))
 
193
    {
 
194
      main_loop_io_worker_job_submit(&self->io_job);
 
195
    }
 
196
  else
 
197
    {
 
198
      log_writer_work_perform(s);
 
199
      log_writer_work_finished(s);
 
200
    }
 
201
}
 
202
 
 
203
static void
 
204
log_writer_io_error(gpointer s)
 
205
{
 
206
  LogWriter *self = (LogWriter *) s;
 
207
 
 
208
  if (self->fd_watch.handler_out == NULL && self->fd_watch.handler_in == NULL)
 
209
    {
 
210
      msg_debug("POLLERR occurred while idle",
 
211
                evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
212
                NULL);
 
213
      log_writer_broken(self, NC_WRITE_ERROR);
 
214
      return;
 
215
    }
 
216
  else
 
217
    {
 
218
      /* in case we have an error state but we also asked for read/write
 
219
       * polling, the error should be handled by the I/O callback.  But we
 
220
       * need not call that explicitly as ivykis does that for us.  */
 
221
    }
 
222
  log_writer_update_watches(self);
 
223
}
 
224
 
 
225
static void
 
226
log_writer_io_check_eof(gpointer s)
 
227
{
 
228
  LogWriter *self = (LogWriter *) s;
 
229
 
 
230
  msg_error("EOF occurred while idle",
 
231
            evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
232
            NULL);
 
233
  log_writer_broken(self, NC_CLOSE);
 
234
}
 
235
 
 
236
static void
 
237
log_writer_error_suspend_elapsed(gpointer s)
 
238
{
 
239
  LogWriter *self = (LogWriter *) s;
 
240
 
 
241
  self->suspended = FALSE;
 
242
  msg_notice("Error suspend timeout has elapsed, attempting to write again",
 
243
             evt_tag_int("fd", log_proto_get_fd(self->proto)),
 
244
             NULL);
 
245
  log_writer_update_watches(self);
 
246
}
 
247
 
 
248
static void
 
249
log_writer_update_fd_callbacks(LogWriter *self, GIOCondition cond)
 
250
{
 
251
  main_loop_assert_main_thread();
 
252
  if (self->pollable_state > 0)
 
253
    {
 
254
      if (self->flags & LW_DETECT_EOF && (cond & G_IO_IN) == 0 && (cond & G_IO_OUT))
 
255
        {
 
256
          /* if output is enabled, and we're in DETECT_EOF mode, and input is
 
257
           * not needed by the log protocol, install the eof check callback to
 
258
           * destroy the connection if an EOF is received. */
 
259
 
 
260
          iv_fd_set_handler_in(&self->fd_watch, log_writer_io_check_eof);
 
261
        }
 
262
      else if (cond & G_IO_IN)
 
263
        {
 
264
          /* in case the protocol requested G_IO_IN, it means that it needs to
 
265
           * invoke read in the flush code, so just install the flush_output
 
266
           * handler for input */
 
267
 
 
268
          iv_fd_set_handler_in(&self->fd_watch, log_writer_io_flush_output);
 
269
        }
 
270
      else
 
271
        {
 
272
          /* otherwise we're not interested in input */
 
273
          iv_fd_set_handler_in(&self->fd_watch, NULL);
 
274
        }
 
275
      if (cond & G_IO_OUT)
 
276
        iv_fd_set_handler_out(&self->fd_watch, log_writer_io_flush_output);
 
277
      else
 
278
        iv_fd_set_handler_out(&self->fd_watch, NULL);
 
279
 
 
280
      iv_fd_set_handler_err(&self->fd_watch, log_writer_io_error);
 
281
    }
 
282
  else
 
283
    {
 
284
      /* fd is not pollable, assume it is always writable */
 
285
      if (cond & G_IO_OUT)
 
286
        {
 
287
          if (!iv_task_registered(&self->immed_io_task))
 
288
            iv_task_register(&self->immed_io_task);
 
289
        }
 
290
      else if (iv_task_registered(&self->immed_io_task))
 
291
        {
 
292
          iv_task_unregister(&self->immed_io_task);
 
293
        }
 
294
    }
 
295
}
 
296
 
 
297
void
 
298
log_writer_arm_suspend_timer(LogWriter *self, void (*handler)(void *), gint timeout_msec)
 
299
{
 
300
  if (iv_timer_registered(&self->suspend_timer))
 
301
    iv_timer_unregister(&self->suspend_timer);
 
302
  iv_validate_now();
 
303
  self->suspend_timer.handler = handler;
 
304
  self->suspend_timer.expires = iv_now;
 
305
  timespec_add_msec(&self->suspend_timer.expires, timeout_msec);
 
306
  iv_timer_register(&self->suspend_timer);
 
307
}
 
308
 
 
309
static void
 
310
log_writer_queue_filled(gpointer s)
 
311
{
 
312
  LogWriter *self = (LogWriter *) s;
 
313
 
 
314
  main_loop_assert_main_thread();
 
315
 
 
316
  /*
 
317
   * NOTE: This theory is somewhat questionable, e.g. I'm not 100% sure it
 
318
   * is the right scenario, but the race was closed.  So take this with a
 
319
   * grain of salt.
 
320
   *
 
321
   * The queue_filled callback is running in the main thread. Because of the
 
322
   * possible delay caused by iv_event_post() the callback might be
 
323
   * delivered event after stop_watches() has been called.
 
324
   *
 
325
   *   - log_writer_schedule_update_watches() is called by the reader
 
326
   *     thread, which calls iv_event_post()
 
327
   *   - the main thread calls stop_watches() in work_perform
 
328
   *   - the event is delivered in the main thread
 
329
   *
 
330
   * But since stop/start watches always run in the main thread and we do
 
331
   * too, we can check if this is the case.  A LogWriter without watches
 
332
   * running is busy writing out data to the destination, e.g.  a
 
333
   * start_watches is to be expected once log_writer_work_finished() is run
 
334
   * at the end of the deferred work, executed by the I/O threads.
 
335
   */
 
336
  if (self->watches_running)
 
337
    log_writer_update_watches((LogWriter *) s);
 
338
}
 
339
 
 
340
/* NOTE: runs in the source thread */
 
341
static void
 
342
log_writer_schedule_update_watches(LogWriter *self)
 
343
{
 
344
  iv_event_post(&self->queue_filled);
 
345
}
 
346
 
 
347
static void
 
348
log_writer_suspend(LogWriter *self)
 
349
{
 
350
  /* flush code indicates that we need to suspend our writing activities
 
351
   * until time_reopen elapses */
 
352
 
 
353
  log_writer_arm_suspend_timer(self, log_writer_error_suspend_elapsed, self->options->time_reopen * 1e3);
 
354
  self->suspended = TRUE;
 
355
}
 
356
 
 
357
static void
 
358
log_writer_update_watches(LogWriter *self)
 
359
{
 
360
  gint fd;
 
361
  GIOCondition cond = 0;
 
362
  gboolean partial_batch;
 
363
  gint timeout_msec = 0;
 
364
 
 
365
  main_loop_assert_main_thread();
 
366
 
 
367
  /* NOTE: we either start the suspend_timer or enable the fd_watch. The two MUST not happen at the same time. */
 
368
 
 
369
  if (log_proto_prepare(self->proto, &fd, &cond) ||
 
370
      self->flush_waiting_for_timeout ||
 
371
      log_queue_check_items(self->queue, self->options->flush_lines, &partial_batch, &timeout_msec,
 
372
                            (LogQueuePushNotifyFunc) log_writer_schedule_update_watches, self, NULL))
 
373
    {
 
374
      /* flush_lines number of element is already available and throttle would permit us to send. */
 
375
      log_writer_update_fd_callbacks(self, cond);
 
376
    }
 
377
  else if (partial_batch || timeout_msec)
 
378
    {
 
379
      /* few elements are available, but less than flush_lines, we need to start a timer to initiate a flush */
 
380
 
 
381
      log_writer_update_fd_callbacks(self, 0);
 
382
      self->flush_waiting_for_timeout = TRUE;
 
383
      log_writer_arm_suspend_timer(self, (void (*)(void *)) log_writer_update_watches, timeout_msec ? timeout_msec : self->options->flush_timeout);
 
384
    }
 
385
  else
 
386
    {
 
387
      /* no elements or no throttle space, wait for a wakeup by the queue
 
388
       * when the required number of items are added.  see the
 
389
       * log_queue_check_items and its parallel_push argument above
 
390
       */
 
391
      log_writer_update_fd_callbacks(self, 0);
 
392
    }
 
393
}
 
394
 
 
395
static gboolean
 
396
is_file_regular(gint fd)
 
397
{
 
398
  struct stat st;
 
399
 
 
400
  if (fstat(fd, &st) >= 0)
 
401
    {
 
402
      return S_ISREG(st.st_mode);
 
403
    }
 
404
 
 
405
  /* if stat fails, that's interesting, but we should probably poll
 
406
   * it, hopefully that's less likely to cause spinning */
 
407
 
 
408
  return FALSE;
 
409
}
 
410
 
 
411
static void
 
412
log_writer_start_watches(LogWriter *self)
 
413
{
 
414
  gint fd;
 
415
  GIOCondition cond;
 
416
 
 
417
  if (!self->watches_running)
 
418
    {
 
419
      log_proto_prepare(self->proto, &fd, &cond);
 
420
 
 
421
      if (self->pollable_state < 0)
 
422
        {
 
423
          if (is_file_regular(fd))
 
424
            self->pollable_state = 0;
 
425
          else
 
426
            self->pollable_state = iv_fd_pollable(fd);
 
427
        }
 
428
 
 
429
      if (self->pollable_state)
 
430
        {
 
431
          self->fd_watch.fd = fd;
 
432
          iv_fd_register(&self->fd_watch);
 
433
        }
 
434
 
 
435
      log_writer_update_watches(self);
 
436
      self->watches_running = TRUE;
 
437
    }
 
438
}
 
439
 
 
440
static void
 
441
log_writer_stop_watches(LogWriter *self)
 
442
{
 
443
  if (self->watches_running)
 
444
    {
 
445
      if (iv_timer_registered(&self->suspend_timer))
 
446
        iv_timer_unregister(&self->suspend_timer);
 
447
      if (iv_fd_registered(&self->fd_watch))
 
448
        iv_fd_unregister(&self->fd_watch);
 
449
      if (iv_task_registered(&self->immed_io_task))
 
450
        iv_task_unregister(&self->immed_io_task);
 
451
 
 
452
      log_queue_reset_parallel_push(self->queue);
 
453
 
 
454
      self->watches_running = FALSE;
 
455
    }
 
456
}
 
457
 
 
458
/* function called using main_loop_call() in case the suppress timer needs
 
459
 * to be updated */
 
460
static void
 
461
log_writer_perform_suppress_timer_update(LogWriter *self)
 
462
{
 
463
  main_loop_assert_main_thread();
 
464
 
 
465
  if (iv_timer_registered(&self->suppress_timer))
 
466
    iv_timer_unregister(&self->suppress_timer);
 
467
  g_static_mutex_lock(&self->suppress_lock);
 
468
  self->suppress_timer.expires = self->suppress_timer_expires;
 
469
  self->suppress_timer_updated = TRUE;
 
470
  g_static_mutex_unlock(&self->suppress_lock);
 
471
  if (self->suppress_timer.expires.tv_sec > 0)
 
472
    iv_timer_register(&self->suppress_timer);
 
473
  log_pipe_unref(&self->super);
 
474
}
 
475
 
 
476
/*
 
477
 * Update the suppress timer in a deferred manner, possibly batching the
 
478
 * results of multiple updates to the suppress timer.  This is necessary as
 
479
 * suppress timer updates must run in the main thread, and updating it every
 
480
 * time a new message comes in would cause enormous latency in the fast
 
481
 * path. By collecting multiple updates
 
482
 *
 
483
 * msec == 0 means to turn off the suppress timer
 
484
 * msec >  0 to enable the timer with the specified timeout
 
485
 *
 
486
 * NOTE: suppress_lock must be held.
 
487
 */
 
488
static void
 
489
log_writer_update_suppress_timer(LogWriter *self, glong sec)
 
490
{
 
491
  gboolean invoke;
 
492
  struct timespec next_expires;
 
493
 
 
494
  iv_validate_now();
 
495
 
 
496
  /* we deliberately use nsec == 0 in order to increase the likelyhood that
 
497
   * we target the same second, in case only a fraction of a second has
 
498
   * passed between two updates.  */
 
499
  if (sec)
 
500
    {
 
501
      next_expires.tv_nsec = 0;
 
502
      next_expires.tv_sec = iv_now.tv_sec + sec;
 
503
    }
 
504
  else
 
505
    {
 
506
      next_expires.tv_sec = 0;
 
507
      next_expires.tv_nsec = 0;
 
508
    }
 
509
  /* last update was finished, we need to invoke the updater again */
 
510
  invoke = ((next_expires.tv_sec != self->suppress_timer_expires.tv_sec) || (next_expires.tv_nsec != self->suppress_timer_expires.tv_nsec)) && self->suppress_timer_updated;
 
511
  self->suppress_timer_updated = FALSE;
 
512
 
 
513
  if (invoke)
 
514
    {
 
515
      self->suppress_timer_expires = next_expires;
 
516
      g_static_mutex_unlock(&self->suppress_lock);
 
517
      log_pipe_ref(&self->super);
 
518
      main_loop_call((void *(*)(void *)) log_writer_perform_suppress_timer_update, self, FALSE);
 
519
      g_static_mutex_lock(&self->suppress_lock);
 
520
    }
 
521
 
 
522
}
 
523
 
 
524
/*
 
525
 * NOTE: suppress_lock must be held.
 
526
 */
 
527
static void
 
528
log_writer_last_msg_release(LogWriter *self)
 
529
{
 
530
  log_writer_update_suppress_timer(self, 0);
 
531
  if (self->last_msg)
 
532
    log_msg_unref(self->last_msg);
 
533
 
 
534
  self->last_msg = NULL;
 
535
  self->last_msg_count = 0;
 
536
}
 
537
 
 
538
/*
 
539
 * NOTE: suppress_lock must be held.
 
540
 */
 
541
static void
 
542
log_writer_last_msg_flush(LogWriter *self)
 
543
{
 
544
  LogMessage *m;
 
545
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
 
546
  gchar hostname[256];
 
547
  gchar buf[1024];
 
548
  gssize len;
 
549
  const gchar *p;
 
550
 
 
551
  msg_debug("Suppress timer elapsed, emitting suppression summary", 
 
552
            NULL);
 
553
 
 
554
  getlonghostname(hostname, sizeof(hostname));
 
555
  m = log_msg_new_empty();
 
556
  m->timestamps[LM_TS_STAMP] = m->timestamps[LM_TS_RECVD];
 
557
  m->pri = self->last_msg->pri;
 
558
  m->flags = LF_INTERNAL | LF_LOCAL;
 
559
 
 
560
  p = log_msg_get_value(self->last_msg, LM_V_HOST, &len);
 
561
  log_msg_set_value(m, LM_V_HOST, p, len);
 
562
  p = log_msg_get_value(self->last_msg, LM_V_PROGRAM, &len);
 
563
  log_msg_set_value(m, LM_V_PROGRAM, p, len);
 
564
 
 
565
  len = g_snprintf(buf, sizeof(buf), "Last message '%.20s' repeated %d times, suppressed by syslog-ng on %s",
 
566
                   log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL),
 
567
                   self->last_msg_count,
 
568
                   hostname);
 
569
  log_msg_set_value(m, LM_V_MESSAGE, buf, len);
 
570
 
 
571
  path_options.ack_needed = FALSE;
 
572
 
 
573
  log_queue_push_tail(self->queue, m, &path_options);
 
574
  log_writer_last_msg_release(self);
 
575
}
 
576
 
 
577
 
 
578
/**
 
579
 * Remember the last message for dup detection.
 
580
 *
 
581
 * NOTE: suppress_lock must be held.
 
582
 **/
 
583
static void
 
584
log_writer_last_msg_record(LogWriter *self, LogMessage *lm)
 
585
{
 
586
  if (self->last_msg)
 
587
    log_msg_unref(self->last_msg);
 
588
 
 
589
  log_msg_ref(lm);
 
590
  self->last_msg = lm;
 
591
  self->last_msg_count = 0;
 
592
}
 
593
 
 
594
static gboolean
 
595
log_writer_last_msg_timer(gpointer pt)
 
596
{
 
597
  LogWriter *self = (LogWriter *) pt;
 
598
 
 
599
  main_loop_assert_main_thread();
 
600
 
 
601
  g_static_mutex_lock(&self->suppress_lock);
 
602
  log_writer_last_msg_flush(self);
 
603
  g_static_mutex_unlock(&self->suppress_lock);
 
604
 
 
605
  return FALSE;
 
606
}
 
607
 
 
608
/**
 
609
 * log_writer_last_msg_check:
 
610
 *
 
611
 * This function is called to suppress duplicate messages from a given host.
 
612
 *
 
613
 * Returns TRUE to indicate that the message was consumed.
 
614
 **/
 
615
static gboolean
 
616
log_writer_last_msg_check(LogWriter *self, LogMessage *lm, const LogPathOptions *path_options)
 
617
{
 
618
  g_static_mutex_lock(&self->suppress_lock);
 
619
  if (self->last_msg)
 
620
    {
 
621
      if (self->last_msg->timestamps[LM_TS_RECVD].tv_sec >= lm->timestamps[LM_TS_RECVD].tv_sec - self->options->suppress &&
 
622
          strcmp(log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL), log_msg_get_value(lm, LM_V_MESSAGE, NULL)) == 0 &&
 
623
          strcmp(log_msg_get_value(self->last_msg, LM_V_HOST, NULL), log_msg_get_value(lm, LM_V_HOST, NULL)) == 0 &&
 
624
          strcmp(log_msg_get_value(self->last_msg, LM_V_PROGRAM, NULL), log_msg_get_value(lm, LM_V_PROGRAM, NULL)) == 0 &&
 
625
          strcmp(log_msg_get_value(self->last_msg, LM_V_PID, NULL), log_msg_get_value(lm, LM_V_PID, NULL)) == 0 &&
 
626
          strcmp(log_msg_get_value(lm, LM_V_MESSAGE, NULL), "-- MARK --") != 0)
 
627
        {
 
628
          stats_counter_inc(self->suppressed_messages);
 
629
          self->last_msg_count++;
 
630
          
 
631
          if (self->last_msg_count == 1)
 
632
            {
 
633
              /* we only create the timer if this is the first suppressed message, otherwise it is already running. */
 
634
 
 
635
              log_writer_update_suppress_timer(self, self->options->suppress);
 
636
            }
 
637
          g_static_mutex_unlock(&self->suppress_lock);
 
638
 
 
639
          msg_debug("Suppressing duplicate message",
 
640
                    evt_tag_str("host", log_msg_get_value(lm, LM_V_HOST, NULL)),
 
641
                    evt_tag_str("msg", log_msg_get_value(lm, LM_V_MESSAGE, NULL)),
 
642
                    NULL);
 
643
          log_msg_drop(lm, path_options);
 
644
          return TRUE;
 
645
        }
 
646
 
 
647
      if (self->last_msg_count)
 
648
        log_writer_last_msg_flush(self);
 
649
      else
 
650
        log_writer_last_msg_release(self);
 
651
    }
 
652
 
 
653
  log_writer_last_msg_record(self, lm);
 
654
  g_static_mutex_unlock(&self->suppress_lock);
 
655
  return FALSE;
 
656
}
 
657
 
 
658
/* NOTE: runs in the reader thread */
 
659
static void
 
660
log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options, gpointer user_data)
 
661
{
 
662
  LogWriter *self = (LogWriter *) s;
 
663
  LogPathOptions local_options;
 
664
 
 
665
  if (!path_options->flow_control_requested &&
 
666
      (self->suspended || !(self->flags & LW_SOFT_FLOW_CONTROL)))
 
667
    {
 
668
      /* NOTE: this code ACKs the message back if there's a write error in
 
669
       * order not to hang the client in case of a disk full */
 
670
 
 
671
      path_options = log_msg_break_ack(lm, path_options, &local_options);
 
672
    }
 
673
  if (self->options->suppress > 0 && log_writer_last_msg_check(self, lm, path_options))
 
674
    return;
 
675
 
 
676
  stats_counter_inc(self->processed_messages);
 
677
  log_queue_push_tail(self->queue, lm, path_options);
 
678
}
 
679
 
 
680
static void
 
681
log_writer_append_value(GString *result, LogMessage *lm, NVHandle handle, gboolean use_nil, gboolean append_space)
 
682
{
 
683
  const gchar *value;
 
684
  gssize value_len;
 
685
 
 
686
  value = log_msg_get_value(lm, handle, &value_len);
 
687
  if (use_nil && value_len == 0)
 
688
    g_string_append_c(result, '-');
 
689
  else
 
690
    {
 
691
      gchar *space;
 
692
      
 
693
      space = strchr(value, ' ');
 
694
      
 
695
      if (!space)
 
696
        g_string_append_len(result, value, value_len);
 
697
      else
 
698
        g_string_append_len(result, value, space - value);
 
699
    }
 
700
  if (append_space)
 
701
    g_string_append_c(result, ' ');
 
702
}
 
703
 
 
704
static void
 
705
log_writer_do_padding(LogWriter *self, GString *result)
 
706
{
 
707
  if (!self->options->padding)
 
708
    return;
 
709
 
 
710
  if(G_UNLIKELY(self->options->padding < result->len))
 
711
    {
 
712
      msg_warning("Padding is too small to hold the full message",
 
713
               evt_tag_int("padding", self->options->padding),
 
714
               evt_tag_int("msg_size", result->len),
 
715
               NULL);
 
716
      g_string_set_size(result, self->options->padding);
 
717
      return;
 
718
    }
 
719
  /* store the original length of the result */
 
720
  gint len = result->len;
 
721
  gint padd_bytes = self->options->padding - result->len;
 
722
  /* set the size to the padded size, this will allocate the string */
 
723
  g_string_set_size(result, self->options->padding);
 
724
  memset(result->str + len - 1, '\0', padd_bytes);
 
725
}
 
726
 
 
727
void
 
728
log_writer_format_log(LogWriter *self, LogMessage *lm, GString *result)
 
729
{
 
730
  LogTemplate *template = NULL;
 
731
  LogStamp *stamp;
 
732
  guint32 seq_num;
 
733
  static NVHandle meta_seqid = 0;
 
734
 
 
735
  if (!meta_seqid)
 
736
    meta_seqid = log_msg_get_value_handle(".SDATA.meta.sequenceId");
 
737
 
 
738
  if (lm->flags & LF_LOCAL)
 
739
    {
 
740
      seq_num = self->seq_num;
 
741
    }
 
742
  else
 
743
    {
 
744
      const gchar *seqid;
 
745
      gssize seqid_length;
 
746
 
 
747
      seqid = log_msg_get_value(lm, meta_seqid, &seqid_length);
 
748
      APPEND_ZERO(seqid, seqid, seqid_length);
 
749
      if (seqid[0])
 
750
        seq_num = strtol(seqid, NULL, 10);
 
751
      else
 
752
        seq_num = 0;
 
753
    }
 
754
  
 
755
  /* no template was specified, use default */
 
756
  stamp = &lm->timestamps[LM_TS_STAMP];
 
757
 
 
758
  g_string_truncate(result, 0);
 
759
 
 
760
  if ((self->flags & LW_SYSLOG_PROTOCOL) || (self->options->options & LWO_SYSLOG_PROTOCOL))
 
761
    {
 
762
      gint len;
 
763
       
 
764
      /* we currently hard-wire version 1 */
 
765
      g_string_append_c(result, '<');
 
766
      format_uint32_padded(result, 0, 0, 10, lm->pri);
 
767
      g_string_append_c(result, '>');
 
768
      g_string_append_c(result, '1');
 
769
      g_string_append_c(result, ' ');
 
770
 
 
771
      log_stamp_append_format(stamp, result, TS_FMT_ISO, 
 
772
                              time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
 
773
                              self->options->template_options.frac_digits);
 
774
      g_string_append_c(result, ' ');
 
775
      
 
776
      log_writer_append_value(result, lm, LM_V_HOST, TRUE, TRUE);
 
777
      log_writer_append_value(result, lm, LM_V_PROGRAM, TRUE, TRUE);
 
778
      log_writer_append_value(result, lm, LM_V_PID, TRUE, TRUE);
 
779
      log_writer_append_value(result, lm, LM_V_MSGID, TRUE, TRUE);
 
780
 
 
781
#if 0
 
782
      if (lm->flags & LF_LOCAL)
 
783
        {
 
784
          gchar sequence_id[16];
 
785
          
 
786
          g_snprintf(sequence_id, sizeof(sequence_id), "%d", seq_num);
 
787
          log_msg_update_sdata(lm, "meta", "sequenceId", sequence_id);
 
788
        }
 
789
#endif
 
790
      len = result->len;
 
791
      log_msg_append_format_sdata(lm, result, seq_num);
 
792
      if (len == result->len)
 
793
        {
 
794
          /* NOTE: sd_param format did not generate any output, take it as an empty SD string */
 
795
          g_string_append_c(result, '-');
 
796
        }
 
797
       
 
798
      if (self->options->template)
 
799
        {
 
800
          g_string_append_c(result, ' ');
 
801
          if (lm->flags & LF_UTF8)
 
802
            g_string_append_len(result, "\xEF\xBB\xBF", 3);
 
803
          log_template_append_format(self->options->template, lm, 
 
804
                                     &self->options->template_options,
 
805
                                     LTZ_SEND,
 
806
                                     seq_num, NULL,
 
807
                                     result);
 
808
        }
 
809
      else
 
810
        {
 
811
          const gchar *p;
 
812
          gssize len;
 
813
 
 
814
          p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
 
815
          g_string_append_c(result, ' ');
 
816
          if (len != 0)
 
817
            {
 
818
              if (lm->flags & LF_UTF8)
 
819
                g_string_append_len(result, "\xEF\xBB\xBF", 3);
 
820
 
 
821
              g_string_append_len(result, p, len);
 
822
            }
 
823
        }
 
824
      g_string_append_c(result, '\n');
 
825
      log_writer_do_padding(self, result);
 
826
    }
 
827
  else
 
828
    {
 
829
 
 
830
      if (self->options->template)
 
831
        {
 
832
          template = self->options->template;
 
833
        }
 
834
      else if (self->flags & LW_FORMAT_FILE)
 
835
        {
 
836
          template = self->options->file_template;
 
837
        }
 
838
      else if ((self->flags & LW_FORMAT_PROTO))
 
839
        {
 
840
          template = self->options->proto_template;
 
841
        }
 
842
      
 
843
      if (template)
 
844
        {
 
845
          log_template_format(template, lm, 
 
846
                              &self->options->template_options,
 
847
                              LTZ_SEND,
 
848
                              seq_num, NULL,
 
849
                              result);
 
850
 
 
851
        }
 
852
      else 
 
853
        {
 
854
          const gchar *p;
 
855
          gssize len;
 
856
 
 
857
          if (self->flags & LW_FORMAT_FILE)
 
858
            {
 
859
              log_stamp_format(stamp, result, self->options->template_options.ts_format,
 
860
                               time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
 
861
                               self->options->template_options.frac_digits);
 
862
            }
 
863
          else if (self->flags & LW_FORMAT_PROTO)
 
864
            {
 
865
              g_string_append_c(result, '<');
 
866
              format_uint32_padded(result, 0, 0, 10, lm->pri);
 
867
              g_string_append_c(result, '>');
 
868
 
 
869
              /* always use BSD timestamp by default, the use can override this using a custom template */
 
870
              log_stamp_append_format(stamp, result, TS_FMT_BSD,
 
871
                                      time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->tv_sec),
 
872
                                      self->options->template_options.frac_digits);
 
873
            }
 
874
          g_string_append_c(result, ' ');
 
875
 
 
876
          p = log_msg_get_value(lm, LM_V_HOST, &len);
 
877
          g_string_append_len(result, p, len);
 
878
          g_string_append_c(result, ' ');
 
879
 
 
880
          if ((lm->flags & LF_LEGACY_MSGHDR))
 
881
            {
 
882
              p = log_msg_get_value(lm, LM_V_LEGACY_MSGHDR, &len);
 
883
              g_string_append_len(result, p, len);
 
884
            }
 
885
          else
 
886
            {
 
887
              p = log_msg_get_value(lm, LM_V_PROGRAM, &len);
 
888
              if (len > 0)
 
889
                {
 
890
                  g_string_append_len(result, p, len);
 
891
                  p = log_msg_get_value(lm, LM_V_PID, &len);
 
892
                  if (len > 0)
 
893
                    {
 
894
                      g_string_append_c(result, '[');
 
895
                      g_string_append_len(result, p, len);
 
896
                      g_string_append_c(result, ']');
 
897
                    }
 
898
                  g_string_append_len(result, ": ", 2);
 
899
                }
 
900
            }
 
901
          p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
 
902
          g_string_append_len(result, p, len);
 
903
          g_string_append_c(result, '\n');
 
904
          log_writer_do_padding(self, result);
 
905
        }
 
906
    }
 
907
  if (self->options->options & LWO_NO_MULTI_LINE)
 
908
    {
 
909
      gchar *p;
 
910
 
 
911
      p = result->str;
 
912
      /* NOTE: the size is calculated to leave trailing new line */
 
913
      while ((p = find_cr_or_lf(p, result->str + result->len - p - 1)))
 
914
        {
 
915
          *p = ' ';
 
916
          p++;
 
917
        }
 
918
 
 
919
    }
 
920
}
 
921
 
 
922
static void
 
923
log_writer_broken(LogWriter *self, gint notify_code)
 
924
{
 
925
  log_writer_stop_watches(self);
 
926
  log_pipe_notify(self->control, &self->super, notify_code, self);
 
927
}
 
928
 
 
929
/*
 
930
 * Write messages to the underlying file descriptor using the installed
 
931
 * LogProto instance.  This is called whenever the output is ready to accept
 
932
 * further messages, and once during config deinitialization, in order to
 
933
 * flush messages still in the queue, in the hope that most of them can be
 
934
 * written out.
 
935
 *
 
936
 * In threaded mode, this function is invoked as part of the "output" task
 
937
 * (in essence, this is the function that performs the output task).
 
938
 *
 
939
 * @flush_mode specifies how hard LogWriter is trying to send messages to
 
940
 * the actual destination:
 
941
 *
 
942
 *
 
943
 * LW_FLUSH_NORMAL    - business as usual, flush when the buffer is full
 
944
 * LW_FLUSH_BUFFER    - flush the buffer immediately please
 
945
 * LW_FLUSH_QUEUE     - pull off any queued items, at maximum speed, even
 
946
 *                      ignoring throttle, and flush the buffer too
 
947
 *
 
948
 */
 
949
gboolean
 
950
log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode)
 
951
{
 
952
  LogProto *proto = self->proto;
 
953
  gint count = 0;
 
954
  gboolean ignore_throttle = (flush_mode >= LW_FLUSH_QUEUE);
 
955
  
 
956
  if (!proto)
 
957
    return FALSE;
 
958
 
 
959
  /* NOTE: in case we're reloading or exiting we flush all queued items as
 
960
   * long as the destination can consume it.  This is not going to be an
 
961
   * infinite loop, since the reader will cease to produce new messages when
 
962
   * main_loop_io_worker_job_quit() is set. */
 
963
 
 
964
  while (!main_loop_io_worker_job_quit() || flush_mode >= LW_FLUSH_QUEUE)
 
965
    {
 
966
      LogMessage *lm;
 
967
      LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
 
968
      gboolean consumed = FALSE;
 
969
      
 
970
      if (!log_queue_pop_head(self->queue, &lm, &path_options, FALSE, ignore_throttle))
 
971
        {
 
972
          /* no more items are available */
 
973
          break;
 
974
        }
 
975
 
 
976
      log_msg_refcache_start_consumer(lm, &path_options);
 
977
      msg_set_context(lm);
 
978
 
 
979
      log_writer_format_log(self, lm, self->line_buffer);
 
980
      
 
981
      if (self->line_buffer->len)
 
982
        {
 
983
          LogProtoStatus status;
 
984
 
 
985
          status = log_proto_post(proto, (guchar *) self->line_buffer->str, self->line_buffer->len, &consumed);
 
986
          if (status == LPS_ERROR)
 
987
            {
 
988
              if ((self->options->options & LWO_IGNORE_ERRORS) == 0)
 
989
                {
 
990
                  msg_set_context(NULL);
 
991
                  log_msg_refcache_stop();
 
992
                  return FALSE;
 
993
                }
 
994
              else
 
995
                {
 
996
                  if (!consumed)
 
997
                    g_free(self->line_buffer->str);
 
998
                  consumed = TRUE;
 
999
                }
 
1000
            }
 
1001
          if (consumed)
 
1002
            {
 
1003
              self->line_buffer->str = g_malloc(self->line_buffer->allocated_len);
 
1004
              self->line_buffer->str[0] = 0;
 
1005
              self->line_buffer->len = 0;
 
1006
            }
 
1007
        }
 
1008
      if (consumed)
 
1009
        {
 
1010
          if (lm->flags & LF_LOCAL)
 
1011
            step_sequence_number(&self->seq_num);
 
1012
          log_msg_ack(lm, &path_options);
 
1013
          log_msg_unref(lm);
 
1014
        }
 
1015
      else
 
1016
        {
 
1017
          /* push back to the queue */
 
1018
          log_queue_push_head(self->queue, lm, &path_options);
 
1019
 
 
1020
          msg_set_context(NULL);
 
1021
          log_msg_refcache_stop();
 
1022
          break;
 
1023
        }
 
1024
        
 
1025
      msg_set_context(NULL);
 
1026
      log_msg_refcache_stop();
 
1027
      count++;
 
1028
    }
 
1029
 
 
1030
  if (flush_mode >= LW_FLUSH_BUFFER || count == 0)
 
1031
    {
 
1032
      if (log_proto_flush(proto) == LPS_ERROR)
 
1033
        return FALSE;
 
1034
    }
 
1035
 
 
1036
  return TRUE;
 
1037
}
 
1038
 
 
1039
static void
 
1040
log_writer_init_watches(LogWriter *self)
 
1041
{
 
1042
  IV_FD_INIT(&self->fd_watch);
 
1043
  self->fd_watch.cookie = self;
 
1044
 
 
1045
  IV_TASK_INIT(&self->immed_io_task);
 
1046
  self->immed_io_task.cookie = self;
 
1047
  self->immed_io_task.handler = log_writer_io_flush_output;
 
1048
 
 
1049
  IV_TIMER_INIT(&self->suspend_timer);
 
1050
  self->suspend_timer.cookie = self;
 
1051
 
 
1052
  IV_TIMER_INIT(&self->suppress_timer);
 
1053
  self->suppress_timer.cookie = self;
 
1054
  self->suppress_timer.handler = (void (*)(void *)) log_writer_last_msg_timer;
 
1055
 
 
1056
  IV_EVENT_INIT(&self->queue_filled);
 
1057
  self->queue_filled.cookie = self;
 
1058
  self->queue_filled.handler = log_writer_queue_filled;
 
1059
 
 
1060
  main_loop_io_worker_job_init(&self->io_job);
 
1061
  self->io_job.user_data = self;
 
1062
  self->io_job.work = (void (*)(void *)) log_writer_work_perform;
 
1063
  self->io_job.completion = (void (*)(void *)) log_writer_work_finished;
 
1064
}
 
1065
 
 
1066
static gboolean
 
1067
log_writer_init(LogPipe *s)
 
1068
{
 
1069
  LogWriter *self = (LogWriter *) s;
 
1070
 
 
1071
  g_assert(self->queue != NULL);
 
1072
  iv_event_register(&self->queue_filled);
 
1073
 
 
1074
  if ((self->options->options & LWO_NO_STATS) == 0 && !self->dropped_messages)
 
1075
    {
 
1076
      stats_lock();
 
1077
      stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_DROPPED, &self->dropped_messages);
 
1078
      if (self->options->suppress > 0)
 
1079
        stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
 
1080
      stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->processed_messages);
 
1081
      
 
1082
      stats_register_counter(self->stats_level, self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_STORED, &self->stored_messages);
 
1083
      stats_unlock();
 
1084
    }
 
1085
  self->suppress_timer_updated = TRUE;
 
1086
  log_queue_set_counters(self->queue, self->stored_messages, self->dropped_messages);
 
1087
  if (self->proto)
 
1088
    {
 
1089
      LogProto *proto;
 
1090
 
 
1091
      proto = self->proto;
 
1092
      self->proto = NULL;
 
1093
      log_writer_reopen(&self->super, proto);
 
1094
    }
 
1095
  return TRUE;
 
1096
}
 
1097
 
 
1098
static gboolean
 
1099
log_writer_deinit(LogPipe *s)
 
1100
{
 
1101
  LogWriter *self = (LogWriter *) s;
 
1102
 
 
1103
  main_loop_assert_main_thread();
 
1104
 
 
1105
  log_queue_reset_parallel_push(self->queue);
 
1106
  log_writer_flush(self, LW_FLUSH_QUEUE);
 
1107
  /* FIXME: by the time we arrive here, it must be guaranteed that no
 
1108
   * _queue() call is running in a different thread, otherwise we'd need
 
1109
   * some kind of locking. */
 
1110
 
 
1111
  log_writer_stop_watches(self);
 
1112
  iv_event_unregister(&self->queue_filled);
 
1113
 
 
1114
  if (iv_timer_registered(&self->suppress_timer))
 
1115
    iv_timer_unregister(&self->suppress_timer);
 
1116
 
 
1117
  log_queue_set_counters(self->queue, NULL, NULL);
 
1118
 
 
1119
  stats_lock();
 
1120
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_DROPPED, &self->dropped_messages);
 
1121
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
 
1122
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->processed_messages);
 
1123
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->stats_id, self->stats_instance, SC_TYPE_STORED, &self->stored_messages);
 
1124
  stats_unlock();
 
1125
  
 
1126
  return TRUE;
 
1127
}
 
1128
 
 
1129
static void
 
1130
log_writer_free(LogPipe *s)
 
1131
{
 
1132
  LogWriter *self = (LogWriter *) s;
 
1133
 
 
1134
  if (self->proto)
 
1135
    log_proto_free(self->proto);
 
1136
 
 
1137
  if (self->line_buffer)
 
1138
    g_string_free(self->line_buffer, TRUE);
 
1139
  if (self->queue)
 
1140
    log_queue_unref(self->queue);
 
1141
  if (self->last_msg)
 
1142
    log_msg_unref(self->last_msg);
 
1143
  g_free(self->stats_id);
 
1144
  g_free(self->stats_instance);
 
1145
  g_static_mutex_free(&self->suppress_lock);
 
1146
  g_static_mutex_free(&self->pending_proto_lock);
 
1147
  g_cond_free(self->pending_proto_cond);
 
1148
 
 
1149
  log_pipe_free_method(s);
 
1150
}
 
1151
 
 
1152
/* FIXME: this is inherently racy */
 
1153
gboolean
 
1154
log_writer_has_pending_writes(LogWriter *self)
 
1155
{
 
1156
  return log_queue_get_length(self->queue) > 0 || !self->watches_running;
 
1157
}
 
1158
 
 
1159
gboolean
 
1160
log_writer_opened(LogWriter *self)
 
1161
{
 
1162
  return self->proto != NULL;
 
1163
}
 
1164
 
 
1165
/* run in the main thread in reaction to a log_writer_reopen to change
 
1166
 * the destination LogProto instance. It needs to be ran in the main
 
1167
 * thread as it reregisters the watches associated with the main
 
1168
 * thread. */
 
1169
void
 
1170
log_writer_reopen_deferred(gpointer s)
 
1171
{
 
1172
  gpointer *args = (gpointer *) s;
 
1173
  LogWriter *self = args[0];
 
1174
  LogProto *proto = args[1];
 
1175
 
 
1176
  init_sequence_number(&self->seq_num);
 
1177
 
 
1178
  if (self->io_job.working)
 
1179
    {
 
1180
      /* NOTE: proto can be NULL */
 
1181
      self->pending_proto = proto;
 
1182
      self->pending_proto_present = TRUE;
 
1183
      return;
 
1184
    }
 
1185
 
 
1186
  log_writer_stop_watches(self);
 
1187
 
 
1188
  if (self->proto)
 
1189
    log_proto_free(self->proto);
 
1190
 
 
1191
  self->proto = proto;
 
1192
 
 
1193
  if (proto)
 
1194
    log_writer_start_watches(self);
 
1195
}
 
1196
 
 
1197
/*
 
1198
 * This function can be called from any threads, from the main thread
 
1199
 * as well as I/O worker threads. It takes care about going to the
 
1200
 * main thread to actually switch LogProto under this writer.
 
1201
 *
 
1202
 * The writer may still be operating, (e.g. log_pipe_deinit/init is
 
1203
 * not needed).
 
1204
 *
 
1205
 * In case we're running in a non-main thread, then by the time this
 
1206
 * function returns, the reopen has finished. In case it is called
 
1207
 * from the main thread, this function may defer updating self->proto
 
1208
 * until the worker thread has finished. The reason for this
 
1209
 * difference is:
 
1210
 *
 
1211
 *   - if LogWriter is busy, then updating the LogProto instance is
 
1212
 *     deferred to log_writer_work_finished(), but that runs in the
 
1213
 *     main thread.
 
1214
 *
 
1215
 *   - normally, even this deferred update is waited for, but in case
 
1216
 *     we're in the main thread, we can't block.
 
1217
 *
 
1218
 * This situation could probably be improved, maybe the synchonous
 
1219
 * return of log_writer_reopen() is not needed by call sites, but I
 
1220
 * was not sure, and right before release I didn't want to take the
 
1221
 * risky approach.
 
1222
 */
 
1223
void
 
1224
log_writer_reopen(LogPipe *s, LogProto *proto)
 
1225
{
 
1226
  LogWriter *self = (LogWriter *) s;
 
1227
  gpointer args[] = { s, proto };
 
1228
 
 
1229
  main_loop_call((MainLoopTaskFunc) log_writer_reopen_deferred, args, TRUE);
 
1230
 
 
1231
  if (!main_loop_is_main_thread())
 
1232
    {
 
1233
      g_static_mutex_lock(&self->pending_proto_lock);
 
1234
      while (self->pending_proto_present)
 
1235
        {
 
1236
          g_cond_wait(self->pending_proto_cond, g_static_mutex_get_mutex(&self->pending_proto_lock));
 
1237
        }
 
1238
      g_static_mutex_unlock(&self->pending_proto_lock);
 
1239
    }
 
1240
}
 
1241
 
 
1242
void
 
1243
log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options, gint stats_level, gint stats_source, const gchar *stats_id, const gchar *stats_instance)
 
1244
{
 
1245
  self->control = control;
 
1246
  self->options = options;
 
1247
 
 
1248
  self->stats_level = stats_level;
 
1249
  self->stats_source = stats_source;
 
1250
  self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
 
1251
  self->stats_instance = stats_instance ? g_strdup(stats_instance) : NULL;
 
1252
 
 
1253
}
 
1254
 
 
1255
LogPipe *
 
1256
log_writer_new(guint32 flags)
 
1257
{
 
1258
  LogWriter *self = g_new0(LogWriter, 1);
 
1259
  
 
1260
  log_pipe_init_instance(&self->super);
 
1261
  self->super.init = log_writer_init;
 
1262
  self->super.deinit = log_writer_deinit;
 
1263
  self->super.queue = log_writer_queue;
 
1264
  self->super.free_fn = log_writer_free;
 
1265
  self->flags = flags;
 
1266
  self->line_buffer = g_string_sized_new(128);
 
1267
  self->pollable_state = -1;
 
1268
  init_sequence_number(&self->seq_num);
 
1269
 
 
1270
  log_writer_init_watches(self);
 
1271
  g_static_mutex_init(&self->suppress_lock);
 
1272
  g_static_mutex_init(&self->pending_proto_lock);
 
1273
  self->pending_proto_cond = g_cond_new();
 
1274
 
 
1275
  return &self->super;
 
1276
}
 
1277
 
 
1278
/* returns a reference */
 
1279
LogQueue *
 
1280
log_writer_get_queue(LogPipe *s)
 
1281
{
 
1282
  LogWriter *self = (LogWriter *) s;
 
1283
 
 
1284
  return log_queue_ref(self->queue);
 
1285
}
 
1286
 
 
1287
/* consumes the reference */
 
1288
void
 
1289
log_writer_set_queue(LogPipe *s, LogQueue *queue)
 
1290
{
 
1291
  LogWriter *self = (LogWriter *)s;
 
1292
 
 
1293
  if (self->queue)
 
1294
    log_queue_unref(self->queue);
 
1295
  self->queue = queue;
 
1296
}
 
1297
 
 
1298
void 
 
1299
log_writer_options_defaults(LogWriterOptions *options)
 
1300
{
 
1301
  options->template = NULL;
 
1302
  options->flush_lines = -1;
 
1303
  options->flush_timeout = -1;
 
1304
  log_template_options_defaults(&options->template_options);
 
1305
  options->time_reopen = -1;
 
1306
  options->suppress = -1;
 
1307
  options->padding = 0;
 
1308
}
 
1309
 
 
1310
void 
 
1311
log_writer_options_set_template_escape(LogWriterOptions *options, gboolean enable)
 
1312
{
 
1313
  if (options->template && options->template->def_inline)
 
1314
    {
 
1315
      log_template_set_escape(options->template, enable);
 
1316
    }
 
1317
  else
 
1318
    {
 
1319
      msg_error("Macro escaping can only be specified for inline templates", NULL);
 
1320
    }
 
1321
}
 
1322
 
 
1323
 
 
1324
/*
 
1325
 * NOTE: options_init and options_destroy are a bit weird, because their
 
1326
 * invocation is not completely symmetric:
 
1327
 *
 
1328
 *   - init is called from driver init (e.g. affile_dd_init), 
 
1329
 *   - destroy is called from driver free method (e.g. affile_sd_free, NOT affile_dd_deinit)
 
1330
 *
 
1331
 * The reason:
 
1332
 *   - when initializing the reloaded configuration fails for some reason,
 
1333
 *     we have to fall back to the old configuration, thus we cannot dump
 
1334
 *     the information stored in the Options structure.
 
1335
 *
 
1336
 * For the reasons above, init and destroy behave the following way:
 
1337
 *
 
1338
 *   - init is idempotent, it can be called multiple times without leaking
 
1339
 *     memory, and without loss of information
 
1340
 *   - destroy is only called once, when the options are indeed to be destroyed
 
1341
 *
 
1342
 * As init allocates memory, it has to take care about freeing memory
 
1343
 * allocated by the previous init call (or it has to reuse those).
 
1344
 *   
 
1345
 */
 
1346
void
 
1347
log_writer_options_init(LogWriterOptions *options, GlobalConfig *cfg, guint32 option_flags)
 
1348
{
 
1349
  LogTemplate *template;
 
1350
  gchar *time_zone[2];
 
1351
  TimeZoneInfo *time_zone_info[2];
 
1352
  gint i;
 
1353
 
 
1354
  template = log_template_ref(options->template);
 
1355
 
 
1356
  for (i = 0; i < LTZ_MAX; i++)
 
1357
    {
 
1358
      time_zone[i] = options->template_options.time_zone[i];
 
1359
      time_zone_info[i] = options->template_options.time_zone_info[i];
 
1360
      options->template_options.time_zone[i] = NULL;
 
1361
      options->template_options.time_zone_info[i] = NULL;
 
1362
    }
 
1363
 
 
1364
  log_writer_options_destroy(options);
 
1365
  log_template_options_destroy(&options->template_options);
 
1366
  
 
1367
  /* restroe the config */
 
1368
  options->template = template;
 
1369
  for (i = 0; i < LTZ_MAX; i++)
 
1370
    {
 
1371
      options->template_options.time_zone[i] = time_zone[i];
 
1372
      options->template_options.time_zone_info[i] = time_zone_info[i];
 
1373
    }
 
1374
  log_template_options_init(&options->template_options, cfg);
 
1375
  options->options |= option_flags;
 
1376
    
 
1377
  if (options->flush_lines == -1)
 
1378
    options->flush_lines = cfg->flush_lines;
 
1379
  if (options->flush_timeout == -1)
 
1380
    options->flush_timeout = cfg->flush_timeout;
 
1381
  if (options->suppress == -1)
 
1382
    options->suppress = cfg->suppress;
 
1383
  if (options->time_reopen == -1)
 
1384
    options->time_reopen = cfg->time_reopen;
 
1385
  options->file_template = log_template_ref(cfg->file_template);
 
1386
  options->proto_template = log_template_ref(cfg->proto_template);
 
1387
  if (cfg->threaded)
 
1388
    options->options |= LWO_THREADED;
 
1389
}
 
1390
 
 
1391
void
 
1392
log_writer_options_destroy(LogWriterOptions *options)
 
1393
{
 
1394
  log_template_options_destroy(&options->template_options);
 
1395
  log_template_unref(options->template);
 
1396
  log_template_unref(options->file_template);
 
1397
  log_template_unref(options->proto_template);
 
1398
}
 
1399
 
 
1400
gint
 
1401
log_writer_options_lookup_flag(const gchar *flag)
 
1402
{
 
1403
  if (strcmp(flag, "syslog_protocol") == 0 || strcmp(flag, "syslog-protocol") == 0)
 
1404
    return LWO_SYSLOG_PROTOCOL;
 
1405
  if (strcmp(flag, "no-multi-line") == 0 || strcmp(flag, "no_multi_line") == 0)
 
1406
    return LWO_NO_MULTI_LINE;
 
1407
  if (strcmp(flag, "threaded") == 0)
 
1408
    return LWO_THREADED;
 
1409
  if (strcmp(flag, "ignore-errors") == 0 || strcmp(flag, "ignore_errors") == 0)
 
1410
    return LWO_IGNORE_ERRORS;
 
1411
  msg_error("Unknown dest writer flag", evt_tag_str("flag", flag), NULL);
 
1412
  return 0;
 
1413
}