~ubuntu-branches/ubuntu/wily/syslog-ng/wily-proposed

« back to all changes in this revision

Viewing changes to lib/logthrdestdrv.c

  • Committer: Package Import Robot
  • Author(s): Gergely Nagy, Gergely Nagy
  • Date: 2013-11-04 15:27:37 UTC
  • mfrom: (1.3.12)
  • Revision ID: package-import@ubuntu.com-20131104152737-mqh6eqtna2xk97jq
Tags: 3.5.1-1
[ Gergely Nagy <algernon@madhouse-project.org> ]
* New upstream release.
  + Support auto-loading modules (Closes: #650814)
  + The SMTP module is available in syslog-ng-mod-smtp (Closes: #722746)
  + New modules: amqp, geoip, stomp, redis and smtp.
  + Multi-line input support (indented multiline and regexp-based)
  + Template type hinting for the MongoDB destination and $(format-json)
  + Support for unit suffixes in the configuration file
  + New filters, template functions and other miscellaneous changes
* New (team) maintainer, Laszlo Boszormenyi, Attila Szalay and myself
  added to Uploaders.
* Ship /var/lib/syslog-ng in the syslog-ng-core package, instead of
  creating it in the init script. Thanks Michael Biebl
  <biebl@debian.org> for the report & assistance. (Closes: #699942, #719910)
* Use dh-systemd for proper systemd-related maintainer scripts. Based on
  a patch by Michael Biebl <biebl@debian.org>. (Closes: #713982,
  #690067)
* Do not wait for syslog-ng to settle down during installation / update.
  This also fixes installing via debootstrap and a fake
  start-stop-daemon. (Closes: #714254)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2013 BalaBit IT Ltd, Budapest, Hungary
 
3
 * Copyright (c) 2013 Gergely Nagy <algernon@balabit.hu>
 
4
 *
 
5
 * This library is free software; you can redistribute it and/or
 
6
 * modify it under the terms of the GNU Lesser General Public
 
7
 * License as published by the Free Software Foundation; either
 
8
 * version 2.1 of the License, or (at your option) any later version.
 
9
 *
 
10
 * This library is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
13
 * Lesser General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU Lesser General Public
 
16
 * License along with this library; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 *
 
19
 * As an additional exemption you are allowed to compile & link against the
 
20
 * OpenSSL libraries as published by the OpenSSL project. See the file
 
21
 * COPYING for details.
 
22
 *
 
23
 */
 
24
 
 
25
#include "logthrdestdrv.h"
 
26
#include "misc.h"
 
27
 
 
28
void
 
29
log_threaded_dest_driver_suspend(LogThrDestDriver *self)
 
30
{
 
31
  self->writer_thread_suspended = TRUE;
 
32
  g_get_current_time(&self->writer_thread_suspend_target);
 
33
  g_time_val_add(&self->writer_thread_suspend_target,
 
34
                 self->time_reopen * 1000000);
 
35
}
 
36
 
 
37
static void
 
38
log_threaded_dest_driver_message_became_available_in_the_queue(gpointer user_data)
 
39
{
 
40
  LogThrDestDriver *self = (LogThrDestDriver *) user_data;
 
41
 
 
42
  g_mutex_lock(self->suspend_mutex);
 
43
  g_cond_signal(self->writer_thread_wakeup_cond);
 
44
  g_mutex_unlock(self->suspend_mutex);
 
45
}
 
46
 
 
47
static gpointer
 
48
log_threaded_dest_driver_worker_thread_main(gpointer arg)
 
49
{
 
50
  LogThrDestDriver *self = (LogThrDestDriver *)arg;
 
51
 
 
52
  msg_debug("Worker thread started",
 
53
            evt_tag_str("driver", self->super.super.id),
 
54
            NULL);
 
55
 
 
56
  if (self->worker.thread_init)
 
57
    self->worker.thread_init(self);
 
58
 
 
59
  while (!self->writer_thread_terminate)
 
60
    {
 
61
      g_mutex_lock(self->suspend_mutex);
 
62
      if (self->writer_thread_suspended)
 
63
        {
 
64
          g_cond_timed_wait(self->writer_thread_wakeup_cond,
 
65
                            self->suspend_mutex,
 
66
                            &self->writer_thread_suspend_target);
 
67
          self->writer_thread_suspended = FALSE;
 
68
          g_mutex_unlock(self->suspend_mutex);
 
69
        }
 
70
      else if (!log_queue_check_items(self->queue, NULL,
 
71
                                      log_threaded_dest_driver_message_became_available_in_the_queue,
 
72
                                      self, NULL))
 
73
        {
 
74
          g_cond_wait(self->writer_thread_wakeup_cond, self->suspend_mutex);
 
75
          g_mutex_unlock(self->suspend_mutex);
 
76
        }
 
77
      else
 
78
        g_mutex_unlock(self->suspend_mutex);
 
79
 
 
80
      if (self->writer_thread_terminate)
 
81
        break;
 
82
 
 
83
      if (!self->worker.insert(self))
 
84
        {
 
85
          if (self->worker.disconnect)
 
86
            self->worker.disconnect(self);
 
87
          log_threaded_dest_driver_suspend(self);
 
88
        }
 
89
    }
 
90
 
 
91
  if (self->worker.disconnect)
 
92
    self->worker.disconnect(self);
 
93
 
 
94
  if (self->worker.thread_deinit)
 
95
    self->worker.thread_deinit(self);
 
96
 
 
97
  msg_debug("Worker thread finished",
 
98
            evt_tag_str("driver", self->super.super.id),
 
99
            NULL);
 
100
 
 
101
  return NULL;
 
102
}
 
103
 
 
104
static void
 
105
log_threaded_dest_driver_start_thread(LogThrDestDriver *self)
 
106
{
 
107
  self->writer_thread = create_worker_thread(log_threaded_dest_driver_worker_thread_main,
 
108
                                             self, TRUE, NULL);
 
109
}
 
110
 
 
111
static void
 
112
log_threaded_dest_driver_stop_thread(LogThrDestDriver *self)
 
113
{
 
114
  self->writer_thread_terminate = TRUE;
 
115
  g_mutex_lock(self->suspend_mutex);
 
116
  g_cond_signal(self->writer_thread_wakeup_cond);
 
117
  g_mutex_unlock(self->suspend_mutex);
 
118
  g_thread_join(self->writer_thread);
 
119
}
 
120
 
 
121
gboolean
 
122
log_threaded_dest_driver_start(LogPipe *s)
 
123
{
 
124
  LogThrDestDriver *self = (LogThrDestDriver *)s;
 
125
  GlobalConfig *cfg = log_pipe_get_config(s);
 
126
 
 
127
  if (cfg)
 
128
    self->time_reopen = cfg->time_reopen;
 
129
 
 
130
  self->queue = log_dest_driver_acquire_queue(&self->super,
 
131
                                              self->format.persist_name(self));
 
132
 
 
133
  stats_lock();
 
134
  stats_register_counter(0, self->stats_source | SCS_DESTINATION, self->super.super.id,
 
135
                         self->format.stats_instance(self),
 
136
                         SC_TYPE_STORED, &self->stored_messages);
 
137
  stats_register_counter(0, self->stats_source | SCS_DESTINATION, self->super.super.id,
 
138
                         self->format.stats_instance(self),
 
139
                         SC_TYPE_DROPPED, &self->dropped_messages);
 
140
  stats_unlock();
 
141
 
 
142
  log_queue_set_counters(self->queue, self->stored_messages,
 
143
                         self->dropped_messages);
 
144
 
 
145
  log_threaded_dest_driver_start_thread(self);
 
146
 
 
147
  return TRUE;
 
148
}
 
149
 
 
150
gboolean
 
151
log_threaded_dest_driver_deinit_method(LogPipe *s)
 
152
{
 
153
  LogThrDestDriver *self = (LogThrDestDriver *)s;
 
154
 
 
155
  log_threaded_dest_driver_stop_thread(self);
 
156
  log_queue_reset_parallel_push(self->queue);
 
157
 
 
158
  log_queue_set_counters(self->queue, NULL, NULL);
 
159
 
 
160
  stats_lock();
 
161
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->super.super.id,
 
162
                           self->format.stats_instance(self),
 
163
                           SC_TYPE_STORED, &self->stored_messages);
 
164
  stats_unregister_counter(self->stats_source | SCS_DESTINATION, self->super.super.id,
 
165
                           self->format.stats_instance(self),
 
166
                           SC_TYPE_DROPPED, &self->dropped_messages);
 
167
  stats_unlock();
 
168
 
 
169
  if (!log_dest_driver_deinit_method(s))
 
170
    return FALSE;
 
171
 
 
172
  return TRUE;
 
173
}
 
174
 
 
175
void
 
176
log_threaded_dest_driver_free(LogPipe *s)
 
177
{
 
178
  LogThrDestDriver *self = (LogThrDestDriver *)s;
 
179
 
 
180
  g_mutex_free(self->suspend_mutex);
 
181
  g_cond_free(self->writer_thread_wakeup_cond);
 
182
 
 
183
  if (self->queue)
 
184
    log_queue_unref(self->queue);
 
185
 
 
186
  log_dest_driver_free((LogPipe *)self);
 
187
}
 
188
 
 
189
static void
 
190
log_threaded_dest_driver_queue(LogPipe *s, LogMessage *msg,
 
191
                               const LogPathOptions *path_options,
 
192
                               gpointer user_data)
 
193
{
 
194
  LogThrDestDriver *self = (LogThrDestDriver *)s;
 
195
  LogPathOptions local_options;
 
196
 
 
197
  if (!path_options->flow_control_requested)
 
198
    path_options = log_msg_break_ack(msg, path_options, &local_options);
 
199
 
 
200
  if (self->queue_method)
 
201
    self->queue_method(self);
 
202
 
 
203
  log_msg_add_ack(msg, path_options);
 
204
  log_queue_push_tail(self->queue, log_msg_ref(msg), path_options);
 
205
 
 
206
  log_dest_driver_queue_method(s, msg, path_options, user_data);
 
207
}
 
208
 
 
209
void
 
210
log_threaded_dest_driver_init_instance(LogThrDestDriver *self)
 
211
{
 
212
  log_dest_driver_init_instance(&self->super);
 
213
 
 
214
  self->writer_thread_wakeup_cond = g_cond_new();
 
215
  self->suspend_mutex = g_mutex_new();
 
216
 
 
217
  self->super.super.super.init = log_threaded_dest_driver_start;
 
218
  self->super.super.super.deinit = log_threaded_dest_driver_deinit_method;
 
219
  self->super.super.super.queue = log_threaded_dest_driver_queue;
 
220
  self->super.super.super.free_fn = log_threaded_dest_driver_free;
 
221
}