/*
 * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
 * Copyright (c) 1998-2010 Balázs Scheidler
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * As an additional exemption you are allowed to compile & link against the
 * OpenSSL libraries as published by the OpenSSL project. See the file
 * COPYING for details.
 */
28
#include "serialize.h"
32
#include <sys/types.h>
37
#include <iv_thread.h>
/*
 * LogFifo is a scalable first-in-first-output queue implementation, that:
 *
 * - has a per-thread, unlocked input queue where threads can put their items
 *
 * - has a locked wait-queue where items go once the per-thread input
 *   would be overflown or if the input thread goes to sleep (e.g. one
 *   lock acquisition per a longer period)
 *
 * - has an unlocked output queue where items from the wait queue go, once
 *   it becomes depleted.
 *
 * This means that items flow in this sequence from one list to the next:
 *
 *   input queue (per-thread) -> wait queue (locked) -> output queue (single-threaded)
 *
 * Fast path operation:
 * - input threads putting elements on their per-thread queue (lockless)
 * - output threads removing elements from the output queue (lockless)
 *
 * Slow path operation:
 * - input queue is overflown (or the input thread goes to sleep), wait
 *   queue mutex is grabbed, all elements are put to the wait queue.
 *
 * - output queue is depleted, wait queue mutex is grabbed, all elements
 *   on the wait queue is put to the output queue
 *
 * Threading assumptions:
 * - the head of the queue is only manipulated from the output thread
 * - the tail of the queue is only manipulated from the input threads
 */
73
typedef struct _LogQueueFifo
77
/* scalable qoverflow implementation */
78
struct list_head qoverflow_output;
79
struct list_head qoverflow_wait;
80
gint qoverflow_wait_len;
81
gint qoverflow_output_len;
82
gint qoverflow_size; /* in number of elements */
84
struct list_head qbacklog; /* entries that were sent but not acked yet */
89
struct list_head items;
90
MainLoopIOWorkerFinishCallback cb;
92
guint16 finish_cb_registered;
96
/* NOTE: this is inherently racy, unless protected by LogQueue->lock */
98
log_queue_fifo_get_length(LogQueue *s)
100
LogQueueFifo *self = (LogQueueFifo *) s;
102
return self->qoverflow_wait_len + self->qoverflow_output_len;
105
/* move items from the per-thread input queue to the lock-protected "wait" queue */
107
log_queue_fifo_move_input_unlocked(LogQueueFifo *self, gint thread_id)
109
if (log_queue_fifo_get_length(&self->super) + self->qoverflow_input[thread_id].len > self->qoverflow_size)
111
/* slow path, the input thread's queue would overflow the queue, let's drop some messages */
113
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
114
gint n = self->qoverflow_input[thread_id].len - (self->qoverflow_size - log_queue_fifo_get_length(&self->super));
117
for (i = 0; i < n; i++)
119
LogMessageQueueNode *node = list_entry(self->qoverflow_input[thread_id].items.next, LogMessageQueueNode, list);
120
LogMessage *msg = node->msg;
122
list_del(&node->list);
123
self->qoverflow_input[thread_id].len--;
124
path_options.ack_needed = node->ack_needed;
125
stats_counter_inc(self->super.dropped_messages);
126
log_msg_free_queue_node(node);
127
log_msg_drop(msg, &path_options);
129
msg_debug("Destination queue full, dropping messages",
130
evt_tag_int("queue_len", log_queue_fifo_get_length(&self->super)),
131
evt_tag_int("log_fifo_size", self->qoverflow_size),
132
evt_tag_int("count", n),
135
stats_counter_add(self->super.stored_messages, self->qoverflow_input[thread_id].len);
136
list_splice_tail_init(&self->qoverflow_input[thread_id].items, &self->qoverflow_wait);
137
self->qoverflow_wait_len += self->qoverflow_input[thread_id].len;
138
self->qoverflow_input[thread_id].len = 0;
141
/* move items from the per-thread input queue to the lock-protected
142
* "wait" queue, but grabbing locks first. This is registered as a
143
* callback to be called when the input worker thread finishes its
147
log_queue_fifo_move_input(gpointer user_data)
149
LogQueueFifo *self = (LogQueueFifo *) user_data;
152
thread_id = main_loop_io_worker_thread_id();
154
g_assert(thread_id >= 0);
156
g_static_mutex_lock(&self->super.lock);
157
log_queue_fifo_move_input_unlocked(self, thread_id);
158
log_queue_push_notify(&self->super);
159
g_static_mutex_unlock(&self->super.lock);
160
self->qoverflow_input[thread_id].finish_cb_registered = FALSE;
165
* Assumed to be called from one of the input threads. If the thread_id
166
* cannot be determined, the item is put directly in the wait queue.
168
* Puts the message to the queue, and logs an error if it caused the
171
* It attempts to put the item to the per-thread input queue.
174
log_queue_fifo_push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
176
LogQueueFifo *self = (LogQueueFifo *) s;
178
LogMessageQueueNode *node;
180
thread_id = main_loop_io_worker_thread_id();
182
g_assert(thread_id < 0 || log_queue_max_threads > thread_id);
184
/* NOTE: we don't use high-water marks for now, as log_fetch_limit
185
* limits the number of items placed on the per-thread input queue
186
* anyway, and any sane number decreased the performance measurably.
188
* This means that per-thread input queues contain _all_ items that
189
* a single poll iteration produces. And once the reader is finished
190
* (either because the input is depleted or because of
191
* log_fetch_limit / window_size) the whole bunch is propagated to
195
if (thread_id >= 0) {
196
/* fastpath, use per-thread input FIFOs */
197
if (!self->qoverflow_input[thread_id].finish_cb_registered)
199
/* this is the first item in the input FIFO, register a finish
200
* callback to make sure it gets moved to the wait_queue if the
201
* input thread finishes */
203
main_loop_io_worker_register_finish_callback(&self->qoverflow_input[thread_id].cb);
204
self->qoverflow_input[thread_id].finish_cb_registered = TRUE;
207
node = log_msg_alloc_queue_node(msg, path_options);
208
list_add_tail(&node->list, &self->qoverflow_input[thread_id].items);
209
self->qoverflow_input[thread_id].len++;
214
/* slow path, put the pending item and the whole input queue to the wait_queue */
216
g_static_mutex_lock(&self->super.lock);
219
log_queue_fifo_move_input_unlocked(self, thread_id);
221
if (log_queue_fifo_get_length(s) < self->qoverflow_size)
223
node = log_msg_alloc_queue_node(msg, path_options);
225
list_add_tail(&node->list, &self->qoverflow_wait);
226
self->qoverflow_wait_len++;
227
log_queue_push_notify(&self->super);
229
stats_counter_inc(self->super.stored_messages);
230
g_static_mutex_unlock(&self->super.lock);
236
stats_counter_inc(self->super.dropped_messages);
237
g_static_mutex_unlock(&self->super.lock);
238
log_msg_drop(msg, path_options);
240
msg_debug("Destination queue full, dropping message",
241
evt_tag_int("queue_len", log_queue_fifo_get_length(&self->super)),
242
evt_tag_int("log_fifo_size", self->qoverflow_size),
249
* Put an item back to the front of the queue.
251
* This is assumed to be called only from the output thread.
254
log_queue_fifo_push_head(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
256
LogQueueFifo *self = (LogQueueFifo *) s;
257
LogMessageQueueNode *node;
259
/* we don't check limits when putting items "in-front", as it
260
* normally happens when we start processing an item, but at the end
261
* can't deliver it. No checks, no drops either. */
263
log_queue_assert_output_thread(s);
265
node = log_msg_alloc_dynamic_queue_node(msg, path_options);
266
list_add(&node->list, &self->qoverflow_output);
267
self->qoverflow_output_len++;
269
stats_counter_inc(self->super.stored_messages);
273
* Can only run from the output thread.
276
log_queue_fifo_pop_head(LogQueue *s, LogMessage **msg, LogPathOptions *path_options, gboolean push_to_backlog, gboolean ignore_throttle)
278
LogQueueFifo *self = (LogQueueFifo *) s;
279
LogMessageQueueNode *node;
281
log_queue_assert_output_thread(s);
283
if (!ignore_throttle && self->super.throttle && self->super.throttle_buckets == 0)
288
if (self->qoverflow_output_len == 0)
290
/* slow path, output queue is empty, get some elements from the wait queue */
291
g_static_mutex_lock(&self->super.lock);
292
list_splice_tail_init(&self->qoverflow_wait, &self->qoverflow_output);
293
self->qoverflow_output_len = self->qoverflow_wait_len;
294
self->qoverflow_wait_len = 0;
295
g_static_mutex_unlock(&self->super.lock);
298
if (self->qoverflow_output_len > 0)
300
node = list_entry(self->qoverflow_output.next, LogMessageQueueNode, list);
303
path_options->ack_needed = node->ack_needed;
304
self->qoverflow_output_len--;
305
if (!push_to_backlog)
307
list_del(&node->list);
308
log_msg_free_queue_node(node);
312
list_del_init(&node->list);
317
/* no items either on the wait queue nor the output queue.
319
* NOTE: the input queues may contain items even in this case,
320
* however we don't touch them here, they'll be migrated to the
321
* wait_queue once the input threads finish their processing (or
322
* the high watermark is reached). Also, they are unlocked, so
323
* no way to touch them safely.
327
stats_counter_dec(self->super.stored_messages);
332
list_add_tail(&node->list, &self->qbacklog);
333
self->qbacklog_len++;
335
if (!ignore_throttle)
337
self->super.throttle_buckets--;
344
* Can only run from the output thread.
347
log_queue_fifo_ack_backlog(LogQueue *s, gint n)
349
LogQueueFifo *self = (LogQueueFifo *) s;
351
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
354
log_queue_assert_output_thread(s);
356
for (i = 0; i < n && self->qbacklog_len > 0; i++)
358
LogMessageQueueNode *node;
360
node = list_entry(self->qbacklog.next, LogMessageQueueNode, list);
362
path_options.ack_needed = node->ack_needed;
364
list_del(&node->list);
365
log_msg_free_queue_node(node);
366
self->qbacklog_len--;
368
log_msg_ack(msg, &path_options);
375
* log_queue_rewind_backlog:
377
* Move items on our backlog back to our qoverflow queue. Please note that this
378
* function does not really care about qoverflow size, it has to put the backlog
379
* somewhere. The backlog is emptied as that will be filled if we send the
382
* NOTE: this is assumed to be called from the output thread.
385
log_queue_fifo_rewind_backlog(LogQueue *s)
387
LogQueueFifo *self = (LogQueueFifo *) s;
389
log_queue_assert_output_thread(s);
391
list_splice_tail_init(&self->qbacklog, &self->qoverflow_output);
392
self->qoverflow_output_len += self->qbacklog_len;
393
stats_counter_add(self->super.stored_messages, self->qbacklog_len);
394
self->qbacklog_len = 0;
398
log_queue_fifo_free_queue(struct list_head *q)
400
while (!list_empty(q))
402
LogMessageQueueNode *node;
403
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
406
node = list_entry(q->next, LogMessageQueueNode, list);
407
list_del(&node->list);
409
path_options.ack_needed = node->ack_needed;
411
log_msg_free_queue_node(node);
412
log_msg_ack(msg, &path_options);
418
log_queue_fifo_free(LogQueue *s)
420
LogQueueFifo *self = (LogQueueFifo *) s;
423
for (i = 0; i < log_queue_max_threads; i++)
424
log_queue_fifo_free_queue(&self->qoverflow_input[i].items);
426
log_queue_fifo_free_queue(&self->qoverflow_wait);
427
log_queue_fifo_free_queue(&self->qoverflow_output);
428
log_queue_fifo_free_queue(&self->qbacklog);
429
log_queue_free_method(s);
433
log_queue_fifo_new(gint qoverflow_size, const gchar *persist_name)
438
self = g_malloc0(sizeof(LogQueueFifo) + log_queue_max_threads * sizeof(self->qoverflow_input[0]));
440
log_queue_init_instance(&self->super, persist_name);
441
self->super.get_length = log_queue_fifo_get_length;
442
self->super.push_tail = log_queue_fifo_push_tail;
443
self->super.push_head = log_queue_fifo_push_head;
444
self->super.pop_head = log_queue_fifo_pop_head;
445
self->super.ack_backlog = log_queue_fifo_ack_backlog;
446
self->super.rewind_backlog = log_queue_fifo_rewind_backlog;
448
self->super.free_fn = log_queue_fifo_free;
450
for (i = 0; i < log_queue_max_threads; i++)
452
INIT_LIST_HEAD(&self->qoverflow_input[i].items);
453
main_loop_io_worker_finish_callback_init(&self->qoverflow_input[i].cb);
454
self->qoverflow_input[i].cb.user_data = self;
455
self->qoverflow_input[i].cb.func = log_queue_fifo_move_input;
457
INIT_LIST_HEAD(&self->qoverflow_wait);
458
INIT_LIST_HEAD(&self->qoverflow_output);
459
INIT_LIST_HEAD(&self->qbacklog);
461
self->qoverflow_size = qoverflow_size;