~ubuntu-branches/ubuntu/trusty/syslog-ng/trusty-proposed

« back to all changes in this revision

Viewing changes to .pc/LogQueue-added-keep_on_reload-method.patch/lib/logqueue-fifo.c

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS), Gergely Nagy
  • Date: 2011-10-11 14:30:48 UTC
  • mfrom: (1.3.7)
  • Revision ID: package-import@ubuntu.com-20111011143048-r1iljux9xbvj3lwh
Tags: 3.3.1.dfsg-1
* New upstream release with important fixes from upstream git tree with
  non-free manpages removed.
* Drop syslog-ng.conf(5) (closes: #496521).
* syslog-ng(8) is generated, and does not mention -Q anymore
  (closes: #616069).
* Supports CAP_SYSLOG on recent kernels (closes: #630172).
* Does not use g_timeout_add_seconds anymore (closes: #609154).

[ Gergely Nagy <algernon@madhouse-project.org> ]
* Update debian/copyright to DEP-5 format.
* Simplified the logrotate file by merging identical entries.
* Include local configuration files from /etc/syslog-ng/conf.d/ (Closes:
  #609050).
* Update syslog-ng.conf to be fully 3.3 compliant.
* Compress both source and binaries (except the syslog-ng meta
  package) with xz, instead of gzip.
* Use dpkg triggers to restart syslog-ng when appropriate.
* Include DFSG-free manual pages for all binaries.
* Build with Hardening enabled.
* Mention syslog(3) in /etc/default/syslog-ng, instead of
  <linux/kernel.h> (Closes: #608605)
* Support 'status' in the init script.
  Patch from Peter Eisentraut <petere@debian.org> (Closes: #644458)
* Build-Depend on libevtlog-dev (>= 0.2.12-5~) for correct shlibs.
* Use [linux-any] in Build-Depends instead of hardcoded links.
  (Closes: #634715)
* Use $SYSLOGNG_OPTS in the init script when reloading syslog-ng.
  (Closes: #589081)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
 
3
 * Copyright (c) 1998-2010 Balázs Scheidler
 
4
 *
 
5
 * This library is free software; you can redistribute it and/or
 
6
 * modify it under the terms of the GNU Lesser General Public
 
7
 * License as published by the Free Software Foundation; either
 
8
 * version 2.1 of the License, or (at your option) any later version.
 
9
 *
 
10
 * This library is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
13
 * Lesser General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU Lesser General Public
 
16
 * License along with this library; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 *
 
19
 * As an additional exemption you are allowed to compile & link against the
 
20
 * OpenSSL libraries as published by the OpenSSL project. See the file
 
21
 * COPYING for details.
 
22
 *
 
23
 */
 
24
 
 
25
#include "logqueue.h"
 
26
#include "logpipe.h"
 
27
#include "messages.h"
 
28
#include "serialize.h"
 
29
#include "stats.h"
 
30
#include "mainloop.h"
 
31
 
 
32
#include <sys/types.h>
 
33
#include <sys/stat.h>
 
34
#include <fcntl.h>
 
35
#include <unistd.h>
 
36
#include <string.h>
 
37
#include <iv_thread.h>
 
38
 
 
39
/*
 
40
 * LogFifo is a scalable first-in-first-output queue implementation, that:
 
41
 *
 
42
 *   - has a per-thread, unlocked input queue where threads can put their items
 
43
 *
 
44
 *   - has a locked wait-queue where items go once the per-thread input
 
45
 *     would be overflown or if the input thread goes to sleep (e.g.  one
 
46
 *     lock acquisition per a longer period)
 
47
 *
 
48
 *   - has an unlocked output queue where items from the wait queue go, once
 
49
 *     it becomes depleted.
 
50
 *
 
51
 * This means that items flow in this sequence from one list to the next:
 
52
 *
 
53
 *    input queue (per-thread) -> wait queue (locked) -> output queue (single-threaded)
 
54
 *
 
55
 * Fastpath is:
 
56
 *   - input threads putting elements on their per-thread queue (lockless)
 
57
 *   - output threads removing elements from the output queue (lockless)
 
58
 *
 
59
 * Slowpath:
 
60
 *   - input queue is overflown (or the input thread goes to sleep), wait
 
61
 *     queue mutex is grabbed, all elements are put to the wait queue.
 
62
 *
 
63
 *   - output queue is depleted, wait queue mutex is grabbed, all elements
 
64
 *     on the wait queue is put to the output queue
 
65
 *
 
66
 * Threading assumptions:
 
67
 *   - the head of the queue is only manipulated from the output thread
 
68
 *   - the tail of the queue is only manipulated from the input threads
 
69
 *
 
70
 */
 
71
 
 
72
 
 
73
typedef struct _LogQueueFifo
 
74
{
 
75
  LogQueue super;
 
76
  
 
77
  /* scalable qoverflow implementation */
 
78
  struct list_head qoverflow_output;
 
79
  struct list_head qoverflow_wait;
 
80
  gint qoverflow_wait_len;
 
81
  gint qoverflow_output_len;
 
82
  gint qoverflow_size; /* in number of elements */
 
83
 
 
84
  struct list_head qbacklog;    /* entries that were sent but not acked yet */
 
85
  gint qbacklog_len;
 
86
 
 
87
  struct
 
88
  {
 
89
    struct list_head items;
 
90
    MainLoopIOWorkerFinishCallback cb;
 
91
    guint16 len;
 
92
    guint16 finish_cb_registered;
 
93
  } qoverflow_input[0];
 
94
} LogQueueFifo;
 
95
 
 
96
/* NOTE: this is inherently racy, unless protected by LogQueue->lock */
 
97
static gint64
 
98
log_queue_fifo_get_length(LogQueue *s)
 
99
{
 
100
  LogQueueFifo *self = (LogQueueFifo *) s;
 
101
 
 
102
  return self->qoverflow_wait_len + self->qoverflow_output_len;
 
103
}
 
104
 
 
105
/* move items from the per-thread input queue to the lock-protected "wait" queue */
 
106
static void
 
107
log_queue_fifo_move_input_unlocked(LogQueueFifo *self, gint thread_id)
 
108
{
 
109
  if (log_queue_fifo_get_length(&self->super) + self->qoverflow_input[thread_id].len > self->qoverflow_size)
 
110
    {
 
111
      /* slow path, the input thread's queue would overflow the queue, let's drop some messages */
 
112
 
 
113
      LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
 
114
      gint n = self->qoverflow_input[thread_id].len - (self->qoverflow_size - log_queue_fifo_get_length(&self->super));
 
115
      gint i;
 
116
 
 
117
      for (i = 0; i < n; i++)
 
118
        {
 
119
          LogMessageQueueNode *node = list_entry(self->qoverflow_input[thread_id].items.next, LogMessageQueueNode, list);
 
120
          LogMessage *msg = node->msg;
 
121
 
 
122
          list_del(&node->list);
 
123
          self->qoverflow_input[thread_id].len--;
 
124
          path_options.ack_needed = node->ack_needed;
 
125
          stats_counter_inc(self->super.dropped_messages);
 
126
          log_msg_free_queue_node(node);
 
127
          log_msg_drop(msg, &path_options);
 
128
        }
 
129
      msg_debug("Destination queue full, dropping messages",
 
130
                evt_tag_int("queue_len", log_queue_fifo_get_length(&self->super)),
 
131
                evt_tag_int("log_fifo_size", self->qoverflow_size),
 
132
                evt_tag_int("count", n),
 
133
                NULL);
 
134
    }
 
135
  stats_counter_add(self->super.stored_messages, self->qoverflow_input[thread_id].len);
 
136
  list_splice_tail_init(&self->qoverflow_input[thread_id].items, &self->qoverflow_wait);
 
137
  self->qoverflow_wait_len += self->qoverflow_input[thread_id].len;
 
138
  self->qoverflow_input[thread_id].len = 0;
 
139
}
 
140
 
 
141
/* move items from the per-thread input queue to the lock-protected
 
142
 * "wait" queue, but grabbing locks first. This is registered as a
 
143
 * callback to be called when the input worker thread finishes its
 
144
 * job.
 
145
 */
 
146
static gpointer
 
147
log_queue_fifo_move_input(gpointer user_data)
 
148
{
 
149
  LogQueueFifo *self = (LogQueueFifo *) user_data;
 
150
  gint thread_id;
 
151
 
 
152
  thread_id = main_loop_io_worker_thread_id();
 
153
 
 
154
  g_assert(thread_id >= 0);
 
155
 
 
156
  g_static_mutex_lock(&self->super.lock);
 
157
  log_queue_fifo_move_input_unlocked(self, thread_id);
 
158
  log_queue_push_notify(&self->super);
 
159
  g_static_mutex_unlock(&self->super.lock);
 
160
  self->qoverflow_input[thread_id].finish_cb_registered = FALSE;
 
161
  return NULL;
 
162
}
 
163
 
 
164
/**
 
165
 * Assumed to be called from one of the input threads. If the thread_id
 
166
 * cannot be determined, the item is put directly in the wait queue.
 
167
 *
 
168
 * Puts the message to the queue, and logs an error if it caused the
 
169
 * queue to be full.
 
170
 *
 
171
 * It attempts to put the item to the per-thread input queue.
 
172
 **/
 
173
static void
 
174
log_queue_fifo_push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
 
175
{
 
176
  LogQueueFifo *self = (LogQueueFifo *) s;
 
177
  gint thread_id;
 
178
  LogMessageQueueNode *node;
 
179
 
 
180
  thread_id = main_loop_io_worker_thread_id();
 
181
 
 
182
  g_assert(thread_id < 0 || log_queue_max_threads > thread_id);
 
183
 
 
184
  /* NOTE: we don't use high-water marks for now, as log_fetch_limit
 
185
   * limits the number of items placed on the per-thread input queue
 
186
   * anyway, and any sane number decreased the performance measurably.
 
187
   *
 
188
   * This means that per-thread input queues contain _all_ items that
 
189
   * a single poll iteration produces. And once the reader is finished
 
190
   * (either because the input is depleted or because of
 
191
   * log_fetch_limit / window_size) the whole bunch is propagated to
 
192
   * the "wait" queue.
 
193
   */
 
194
 
 
195
  if (thread_id >= 0) {
 
196
      /* fastpath, use per-thread input FIFOs */
 
197
      if (!self->qoverflow_input[thread_id].finish_cb_registered)
 
198
        {
 
199
          /* this is the first item in the input FIFO, register a finish
 
200
           * callback to make sure it gets moved to the wait_queue if the
 
201
           * input thread finishes */
 
202
 
 
203
          main_loop_io_worker_register_finish_callback(&self->qoverflow_input[thread_id].cb);
 
204
          self->qoverflow_input[thread_id].finish_cb_registered = TRUE;
 
205
        }
 
206
 
 
207
      node = log_msg_alloc_queue_node(msg, path_options);
 
208
      list_add_tail(&node->list, &self->qoverflow_input[thread_id].items);
 
209
      self->qoverflow_input[thread_id].len++;
 
210
      log_msg_unref(msg);
 
211
      return;
 
212
    }
 
213
 
 
214
  /* slow path, put the pending item and the whole input queue to the wait_queue */
 
215
 
 
216
  g_static_mutex_lock(&self->super.lock);
 
217
  
 
218
  if (thread_id >= 0)
 
219
    log_queue_fifo_move_input_unlocked(self, thread_id);
 
220
  
 
221
  if (log_queue_fifo_get_length(s) < self->qoverflow_size)
 
222
    {
 
223
      node = log_msg_alloc_queue_node(msg, path_options);
 
224
 
 
225
      list_add_tail(&node->list, &self->qoverflow_wait);
 
226
      self->qoverflow_wait_len++;
 
227
      log_queue_push_notify(&self->super);
 
228
 
 
229
      stats_counter_inc(self->super.stored_messages);
 
230
      g_static_mutex_unlock(&self->super.lock);
 
231
 
 
232
      log_msg_unref(msg);
 
233
    }
 
234
  else
 
235
    {
 
236
      stats_counter_inc(self->super.dropped_messages);
 
237
      g_static_mutex_unlock(&self->super.lock);
 
238
      log_msg_drop(msg, path_options);
 
239
 
 
240
      msg_debug("Destination queue full, dropping message",
 
241
                evt_tag_int("queue_len", log_queue_fifo_get_length(&self->super)),
 
242
                evt_tag_int("log_fifo_size", self->qoverflow_size),
 
243
                NULL);
 
244
    }
 
245
  return;
 
246
}
 
247
 
 
248
/*
 
249
 * Put an item back to the front of the queue.
 
250
 *
 
251
 * This is assumed to be called only from the output thread.
 
252
 */
 
253
static void
 
254
log_queue_fifo_push_head(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
 
255
{
 
256
  LogQueueFifo *self = (LogQueueFifo *) s;
 
257
  LogMessageQueueNode *node;
 
258
 
 
259
  /* we don't check limits when putting items "in-front", as it
 
260
   * normally happens when we start processing an item, but at the end
 
261
   * can't deliver it. No checks, no drops either. */
 
262
 
 
263
  log_queue_assert_output_thread(s);
 
264
 
 
265
  node = log_msg_alloc_dynamic_queue_node(msg, path_options);
 
266
  list_add(&node->list, &self->qoverflow_output);
 
267
  self->qoverflow_output_len++;
 
268
 
 
269
  stats_counter_inc(self->super.stored_messages);
 
270
}
 
271
 
 
272
/*
 
273
 * Can only run from the output thread.
 
274
 */
 
275
static gboolean
 
276
log_queue_fifo_pop_head(LogQueue *s, LogMessage **msg, LogPathOptions *path_options, gboolean push_to_backlog, gboolean ignore_throttle)
 
277
{
 
278
  LogQueueFifo *self = (LogQueueFifo *) s;
 
279
  LogMessageQueueNode *node;
 
280
 
 
281
  log_queue_assert_output_thread(s);
 
282
 
 
283
  if (!ignore_throttle && self->super.throttle && self->super.throttle_buckets == 0)
 
284
    {
 
285
      return FALSE;
 
286
    }
 
287
    
 
288
  if (self->qoverflow_output_len == 0)
 
289
    {
 
290
      /* slow path, output queue is empty, get some elements from the wait queue */
 
291
      g_static_mutex_lock(&self->super.lock);
 
292
      list_splice_tail_init(&self->qoverflow_wait, &self->qoverflow_output);
 
293
      self->qoverflow_output_len = self->qoverflow_wait_len;
 
294
      self->qoverflow_wait_len = 0;
 
295
      g_static_mutex_unlock(&self->super.lock);
 
296
    }
 
297
 
 
298
  if (self->qoverflow_output_len > 0)
 
299
    {
 
300
      node = list_entry(self->qoverflow_output.next, LogMessageQueueNode, list);
 
301
 
 
302
      *msg = node->msg;
 
303
      path_options->ack_needed = node->ack_needed;
 
304
      self->qoverflow_output_len--;
 
305
      if (!push_to_backlog)
 
306
        {
 
307
          list_del(&node->list);
 
308
          log_msg_free_queue_node(node);
 
309
        }
 
310
      else
 
311
        {
 
312
          list_del_init(&node->list);
 
313
        }
 
314
    }
 
315
  else
 
316
    {
 
317
      /* no items either on the wait queue nor the output queue.
 
318
       *
 
319
       * NOTE: the input queues may contain items even in this case,
 
320
       * however we don't touch them here, they'll be migrated to the
 
321
       * wait_queue once the input threads finish their processing (or
 
322
       * the high watermark is reached). Also, they are unlocked, so
 
323
       * no way to touch them safely.
 
324
       */
 
325
      return FALSE;
 
326
    }
 
327
  stats_counter_dec(self->super.stored_messages);
 
328
 
 
329
  if (push_to_backlog)
 
330
    {
 
331
      log_msg_ref(*msg);
 
332
      list_add_tail(&node->list, &self->qbacklog);
 
333
      self->qbacklog_len++;
 
334
    }
 
335
  if (!ignore_throttle)
 
336
    {
 
337
      self->super.throttle_buckets--;
 
338
    }
 
339
 
 
340
  return TRUE;
 
341
}
 
342
 
 
343
/*
 
344
 * Can only run from the output thread.
 
345
 */
 
346
static void
 
347
log_queue_fifo_ack_backlog(LogQueue *s, gint n)
 
348
{
 
349
  LogQueueFifo *self = (LogQueueFifo *) s;
 
350
  LogMessage *msg;
 
351
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
 
352
  gint i;
 
353
 
 
354
  log_queue_assert_output_thread(s);
 
355
 
 
356
  for (i = 0; i < n && self->qbacklog_len > 0; i++)
 
357
    {
 
358
      LogMessageQueueNode *node;
 
359
 
 
360
      node = list_entry(self->qbacklog.next, LogMessageQueueNode, list);
 
361
      msg = node->msg;
 
362
      path_options.ack_needed = node->ack_needed;
 
363
 
 
364
      list_del(&node->list);
 
365
      log_msg_free_queue_node(node);
 
366
      self->qbacklog_len--;
 
367
 
 
368
      log_msg_ack(msg, &path_options);
 
369
      log_msg_unref(msg);
 
370
    }
 
371
}
 
372
 
 
373
 
 
374
/*
 
375
 * log_queue_rewind_backlog:
 
376
 *
 
377
 * Move items on our backlog back to our qoverflow queue. Please note that this
 
378
 * function does not really care about qoverflow size, it has to put the backlog
 
379
 * somewhere. The backlog is emptied as that will be filled if we send the
 
380
 * items again.
 
381
 *
 
382
 * NOTE: this is assumed to be called from the output thread.
 
383
 */
 
384
static void
 
385
log_queue_fifo_rewind_backlog(LogQueue *s)
 
386
{
 
387
  LogQueueFifo *self = (LogQueueFifo *) s;
 
388
 
 
389
  log_queue_assert_output_thread(s);
 
390
 
 
391
  list_splice_tail_init(&self->qbacklog, &self->qoverflow_output);
 
392
  self->qoverflow_output_len += self->qbacklog_len;
 
393
  stats_counter_add(self->super.stored_messages, self->qbacklog_len);
 
394
  self->qbacklog_len = 0;
 
395
}
 
396
 
 
397
static void
 
398
log_queue_fifo_free_queue(struct list_head *q)
 
399
{
 
400
  while (!list_empty(q))
 
401
    {
 
402
      LogMessageQueueNode *node;
 
403
      LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
 
404
      LogMessage *msg;
 
405
 
 
406
      node = list_entry(q->next, LogMessageQueueNode, list);
 
407
      list_del(&node->list);
 
408
 
 
409
      path_options.ack_needed = node->ack_needed;
 
410
      msg = node->msg;
 
411
      log_msg_free_queue_node(node);
 
412
      log_msg_ack(msg, &path_options);
 
413
      log_msg_unref(msg);
 
414
    }
 
415
}
 
416
 
 
417
static void
 
418
log_queue_fifo_free(LogQueue *s)
 
419
{
 
420
  LogQueueFifo *self = (LogQueueFifo *) s;
 
421
  gint i;
 
422
 
 
423
  for (i = 0; i < log_queue_max_threads; i++)
 
424
    log_queue_fifo_free_queue(&self->qoverflow_input[i].items);
 
425
 
 
426
  log_queue_fifo_free_queue(&self->qoverflow_wait);
 
427
  log_queue_fifo_free_queue(&self->qoverflow_output);
 
428
  log_queue_fifo_free_queue(&self->qbacklog);
 
429
  log_queue_free_method(s);
 
430
}
 
431
 
 
432
LogQueue *
 
433
log_queue_fifo_new(gint qoverflow_size, const gchar *persist_name)
 
434
{
 
435
  LogQueueFifo *self;
 
436
  gint i;
 
437
 
 
438
  self = g_malloc0(sizeof(LogQueueFifo) + log_queue_max_threads * sizeof(self->qoverflow_input[0]));
 
439
 
 
440
  log_queue_init_instance(&self->super, persist_name);
 
441
  self->super.get_length = log_queue_fifo_get_length;
 
442
  self->super.push_tail = log_queue_fifo_push_tail;
 
443
  self->super.push_head = log_queue_fifo_push_head;
 
444
  self->super.pop_head = log_queue_fifo_pop_head;
 
445
  self->super.ack_backlog = log_queue_fifo_ack_backlog;
 
446
  self->super.rewind_backlog = log_queue_fifo_rewind_backlog;
 
447
 
 
448
  self->super.free_fn = log_queue_fifo_free;
 
449
  
 
450
  for (i = 0; i < log_queue_max_threads; i++)
 
451
    {
 
452
      INIT_LIST_HEAD(&self->qoverflow_input[i].items);
 
453
      main_loop_io_worker_finish_callback_init(&self->qoverflow_input[i].cb);
 
454
      self->qoverflow_input[i].cb.user_data = self;
 
455
      self->qoverflow_input[i].cb.func = log_queue_fifo_move_input;
 
456
    }
 
457
  INIT_LIST_HEAD(&self->qoverflow_wait);
 
458
  INIT_LIST_HEAD(&self->qoverflow_output);
 
459
  INIT_LIST_HEAD(&self->qbacklog);
 
460
 
 
461
  self->qoverflow_size = qoverflow_size;
 
462
  return &self->super;
 
463
}