2
* Copyright (c) 2013 BalaBit IT Ltd, Budapest, Hungary
3
* Copyright (c) 2013 Gergely Nagy <algernon@balabit.hu>
5
* This library is free software; you can redistribute it and/or
6
* modify it under the terms of the GNU Lesser General Public
7
* License as published by the Free Software Foundation; either
8
* version 2.1 of the License, or (at your option) any later version.
10
* This library is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13
* Lesser General Public License for more details.
15
* You should have received a copy of the GNU Lesser General Public
16
* License along with this library; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* As an additional exemption you are allowed to compile & link against the
20
* OpenSSL libraries as published by the OpenSSL project. See the file
21
* COPYING for details.
25
#include "logthrdestdrv.h"
29
/*
 * Put the writer thread into the suspended state.
 *
 * Sets self->writer_thread_suspended to TRUE and stores in
 * self->writer_thread_suspend_target the current time advanced by
 * self->time_reopen * 1000000 microseconds.  The worker loop uses that
 * target as the deadline of its timed wait.
 */
log_threaded_dest_driver_suspend(LogThrDestDriver *self)
31
self->writer_thread_suspended = TRUE;
32
g_get_current_time(&self->writer_thread_suspend_target);
33
/* g_time_val_add() takes microseconds, hence the 1000000 factor
 * (time_reopen is presumably expressed in seconds -- confirm). */
g_time_val_add(&self->writer_thread_suspend_target,
34
self->time_reopen * 1000000);
38
/*
 * Callback handed to log_queue_check_items() by the worker thread.
 *
 * user_data is the LogThrDestDriver instance.  The callback signals
 * writer_thread_wakeup_cond while holding suspend_mutex, which wakes a
 * worker that is blocked waiting on that condition.
 */
log_threaded_dest_driver_message_became_available_in_the_queue(gpointer user_data)
40
LogThrDestDriver *self = (LogThrDestDriver *) user_data;
42
g_mutex_lock(self->suspend_mutex);
43
g_cond_signal(self->writer_thread_wakeup_cond);
44
g_mutex_unlock(self->suspend_mutex);
48
/*
 * Entry point of the writer (worker) thread; arg is the LogThrDestDriver.
 *
 * Visible flow:
 *   - log a debug message and call the optional worker.thread_init hook;
 *   - loop while writer_thread_terminate is not set:
 *       - under suspend_mutex, either wait out a suspension (timed wait
 *         until writer_thread_suspend_target), or wait on the wakeup
 *         condition when log_queue_check_items() returns a false value,
 *         or simply release the mutex;
 *       - re-check writer_thread_terminate;
 *       - call worker.insert; when it returns a false value, call the
 *         optional worker.disconnect hook and suspend the thread;
 *   - finally call the optional worker.disconnect and worker.thread_deinit
 *     hooks and log a second debug message.
 *
 * NOTE(review): braces, some call arguments and the statements guarded by
 * a few conditions are not visible in this view; structure notes below
 * marked "presumably" are inferred from the visible lines only.
 */
log_threaded_dest_driver_worker_thread_main(gpointer arg)
50
LogThrDestDriver *self = (LogThrDestDriver *)arg;
52
msg_debug("Worker thread started",
53
evt_tag_str("driver", self->super.super.id),
56
/* thread_init is an optional hook: only called when set. */
if (self->worker.thread_init)
57
self->worker.thread_init(self);
59
while (!self->writer_thread_terminate)
61
g_mutex_lock(self->suspend_mutex);
62
if (self->writer_thread_suspended)
64
/* Suspended: sleep until the suspend target time or until the wakeup
 * condition is signalled, then clear the suspended flag. */
g_cond_timed_wait(self->writer_thread_wakeup_cond,
66
&self->writer_thread_suspend_target);
67
self->writer_thread_suspended = FALSE;
68
g_mutex_unlock(self->suspend_mutex);
70
/* Not suspended: log_queue_check_items() is given the "message became
 * available" callback; when it returns a false value the thread blocks
 * on the wakeup condition that the callback signals. */
else if (!log_queue_check_items(self->queue, NULL,
71
log_threaded_dest_driver_message_became_available_in_the_queue,
74
g_cond_wait(self->writer_thread_wakeup_cond, self->suspend_mutex);
75
g_mutex_unlock(self->suspend_mutex);
78
/* Presumably the remaining (else) branch: nothing to wait for, just
 * release the mutex taken at the top of the iteration. */
g_mutex_unlock(self->suspend_mutex);
80
/* The flag may have been set while this thread was waiting; the guarded
 * statement is not visible here (presumably it leaves the loop). */
if (self->writer_thread_terminate)
83
/* A false return from insert leads to a disconnect (when the hook is
 * set) followed by a suspension of the thread. */
if (!self->worker.insert(self))
85
if (self->worker.disconnect)
86
self->worker.disconnect(self);
87
log_threaded_dest_driver_suspend(self);
91
/* Presumably after the loop: shut the worker down via its optional hooks. */
if (self->worker.disconnect)
92
self->worker.disconnect(self);
94
if (self->worker.thread_deinit)
95
self->worker.thread_deinit(self);
97
msg_debug("Worker thread finished",
98
evt_tag_str("driver", self->super.super.id),
105
/*
 * Create the writer thread and remember its handle.
 *
 * The thread runs log_threaded_dest_driver_worker_thread_main; the value
 * returned by create_worker_thread() is stored in self->writer_thread,
 * which log_threaded_dest_driver_stop_thread() later joins.
 *
 * NOTE(review): the remaining arguments of create_worker_thread() are not
 * visible in this view.
 */
log_threaded_dest_driver_start_thread(LogThrDestDriver *self)
107
self->writer_thread = create_worker_thread(log_threaded_dest_driver_worker_thread_main,
112
/*
 * Ask the writer thread to terminate and wait for it to finish.
 *
 * Sets writer_thread_terminate, signals writer_thread_wakeup_cond while
 * holding suspend_mutex so a waiting worker wakes up, then joins
 * self->writer_thread.
 *
 * NOTE(review): the terminate flag is written before suspend_mutex is
 * taken, and the worker loop tests it without holding the mutex.  Verify
 * that the worker cannot test the flag, miss this signal and then block
 * in g_cond_wait(), which would make the join below wait until another
 * wakeup arrives.
 */
log_threaded_dest_driver_stop_thread(LogThrDestDriver *self)
114
self->writer_thread_terminate = TRUE;
115
g_mutex_lock(self->suspend_mutex);
116
g_cond_signal(self->writer_thread_wakeup_cond);
117
g_mutex_unlock(self->suspend_mutex);
118
g_thread_join(self->writer_thread);
122
/*
 * Start the threaded destination driver; installed as the pipe's init
 * callback by log_threaded_dest_driver_init_instance().
 *
 * Visible steps:
 *   - copy time_reopen from the pipe's GlobalConfig;
 *   - acquire the queue, keyed by self->format.persist_name(self);
 *   - register the "stored" and "dropped" counters and attach them to the
 *     queue;
 *   - start the writer thread.
 *
 * NOTE(review): the return type, any error checks and the return
 * statements are not visible in this view.
 */
log_threaded_dest_driver_start(LogPipe *s)
124
LogThrDestDriver *self = (LogThrDestDriver *)s;
125
GlobalConfig *cfg = log_pipe_get_config(s);
128
self->time_reopen = cfg->time_reopen;
130
self->queue = log_dest_driver_acquire_queue(&self->super,
131
self->format.persist_name(self));
134
/* Both counters use the same source, id and instance name; only the
 * counter type and the destination pointer differ. */
stats_register_counter(0, self->stats_source | SCS_DESTINATION, self->super.super.id,
135
self->format.stats_instance(self),
136
SC_TYPE_STORED, &self->stored_messages);
137
stats_register_counter(0, self->stats_source | SCS_DESTINATION, self->super.super.id,
138
self->format.stats_instance(self),
139
SC_TYPE_DROPPED, &self->dropped_messages);
142
log_queue_set_counters(self->queue, self->stored_messages,
143
self->dropped_messages);
145
log_threaded_dest_driver_start_thread(self);
151
/*
 * Stop the threaded destination driver; installed as the pipe's deinit
 * callback by log_threaded_dest_driver_init_instance().
 *
 * Visible steps:
 *   - stop and join the writer thread;
 *   - reset the queue's parallel push state and detach its counters;
 *   - unregister the "stored" and "dropped" counters, using the same
 *     source, id and instance name they were registered with in
 *     log_threaded_dest_driver_start();
 *   - call the base log_dest_driver_deinit_method() and test its result.
 *
 * NOTE(review): the return type and the statements that depend on the
 * base deinit result are not visible in this view.
 */
log_threaded_dest_driver_deinit_method(LogPipe *s)
153
LogThrDestDriver *self = (LogThrDestDriver *)s;
155
/* The thread is joined first, so it no longer runs when the queue and
 * counters are touched below. */
log_threaded_dest_driver_stop_thread(self);
156
log_queue_reset_parallel_push(self->queue);
158
log_queue_set_counters(self->queue, NULL, NULL);
161
stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->super.super.id,
162
self->format.stats_instance(self),
163
SC_TYPE_STORED, &self->stored_messages);
164
stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->super.super.id,
165
self->format.stats_instance(self),
166
SC_TYPE_DROPPED, &self->dropped_messages);
169
if (!log_dest_driver_deinit_method(s))
176
/*
 * Release the resources owned by the driver; installed as the pipe's
 * free_fn by log_threaded_dest_driver_init_instance().
 *
 * Frees the mutex and condition created in
 * log_threaded_dest_driver_init_instance(), drops the driver's reference
 * to the queue, then hands the instance to log_dest_driver_free().
 */
log_threaded_dest_driver_free(LogPipe *s)
178
LogThrDestDriver *self = (LogThrDestDriver *)s;
180
g_mutex_free(self->suspend_mutex);
181
g_cond_free(self->writer_thread_wakeup_cond);
184
log_queue_unref(self->queue);
186
log_dest_driver_free((LogPipe *)self);
190
/*
 * Queue a message for the writer thread; installed as the pipe's queue
 * callback by log_threaded_dest_driver_init_instance().
 *
 * Visible steps:
 *   - when flow control is not requested, replace path_options with the
 *     result of log_msg_break_ack(), which is given local_options as
 *     storage;
 *   - call the optional self->queue_method hook;
 *   - add an ack to the message and push a new reference to it onto the
 *     tail of self->queue;
 *   - forward the message to log_dest_driver_queue_method().
 *
 * NOTE(review): the end of the parameter list is not visible in this
 * view; user_data, used in the last call, is presumably the final
 * parameter.
 */
log_threaded_dest_driver_queue(LogPipe *s, LogMessage *msg,
191
const LogPathOptions *path_options,
194
LogThrDestDriver *self = (LogThrDestDriver *)s;
195
LogPathOptions local_options;
197
if (!path_options->flow_control_requested)
198
path_options = log_msg_break_ack(msg, path_options, &local_options);
200
/* queue_method is an optional hook: only called when set. */
if (self->queue_method)
201
self->queue_method(self);
203
log_msg_add_ack(msg, path_options);
204
/* The queue receives its own reference, taken with log_msg_ref(). */
log_queue_push_tail(self->queue, log_msg_ref(msg), path_options);
206
log_dest_driver_queue_method(s, msg, path_options, user_data);
210
/*
 * Initialise a LogThrDestDriver instance.
 *
 * Runs the base log_dest_driver_init_instance(), creates the wakeup
 * condition and the suspend mutex used by the writer thread, and installs
 * this file's start, deinit, queue and free functions as the pipe's
 * init, deinit, queue and free_fn callbacks.
 *
 * NOTE(review): only the statements visible in this view are described;
 * the definition may continue beyond them.
 */
log_threaded_dest_driver_init_instance(LogThrDestDriver *self)
212
log_dest_driver_init_instance(&self->super);
214
self->writer_thread_wakeup_cond = g_cond_new();
215
self->suspend_mutex = g_mutex_new();
217
self->super.super.super.init = log_threaded_dest_driver_start;
218
self->super.super.super.deinit = log_threaded_dest_driver_deinit_method;
219
self->super.super.super.queue = log_threaded_dest_driver_queue;
220
self->super.super.super.free_fn = log_threaded_dest_driver_free;