~ubuntu-branches/ubuntu/oneiric/syslog-ng/oneiric

« back to all changes in this revision

Viewing changes to src/logsource.c

  • Committer: Bazaar Package Importer
  • Author(s): Laszlo Boszormenyi (GCS)
  • Date: 2011-05-16 22:02:46 UTC
  • mfrom: (26.1.1 sid)
  • Revision ID: james.westby@ubuntu.com-20110516220246-nknmeu831n49bx1z
Tags: 3.2.4-1
* New upstream release, fixing infinite loop via PCRE and global. No CVE
  number yet, Vigil@nce id is 10648.
* Remove all patches, they were applied upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 * Copyright (c) 2002-2009 BalaBit IT Ltd, Budapest, Hungary
3
 
 *
4
 
 * This program is free software; you can redistribute it and/or modify it
5
 
 * under the terms of the GNU General Public License version 2 as published
6
 
 * by the Free Software Foundation.
7
 
 *
8
 
 * Note that this permission is granted for only version 2 of the GPL.
9
 
 *
10
 
 * As an additional exemption you are allowed to compile & link against the
11
 
 * OpenSSL libraries as published by the OpenSSL project. See the file
12
 
 * COPYING for details.
13
 
 *
14
 
 * This program is distributed in the hope that it will be useful,
15
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 
 * GNU General Public License for more details.
18
 
 *
19
 
 * You should have received a copy of the GNU General Public License
20
 
 * along with this program; if not, write to the Free Software
21
 
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22
 
 */
23
 
  
24
 
#include "logsource.h"
25
 
#include "messages.h"
26
 
#include "misc.h"
27
 
#include "timeutils.h"
28
 
#include "stats.h"
29
 
#include "tags.h"
30
 
 
31
 
#include <string.h>
32
 
 
33
 
static void
34
 
log_source_msg_ack(LogMessage *msg, gpointer user_data)
35
 
{
36
 
  LogSource *self = (LogSource *) user_data;
37
 
  guint32 old_window_size;
38
 
  
39
 
  old_window_size = g_atomic_counter_get(&self->options->window_size);
40
 
  g_atomic_counter_inc(&self->options->window_size);
41
 
  
42
 
  /* NOTE: this check could be racy, but it isn't for the following reasons:
43
 
   *  - if the current window size went down to zero, the source does not emit new messages
44
 
   *  - the only possible change in this case is: the destinations processed a 
45
 
   *    message and it got acked back
46
 
   *  - so the 0 -> change can only happen because of the destinations
47
 
   *  - from all the possible destinations the ACK happens at the last one, thus there 
48
 
   *    might be no concurrencies here, even if all destinations are in different threads
49
 
   *    
50
 
   */
51
 
  if (old_window_size == 0)
52
 
    main_loop_wakeup();
53
 
  log_msg_unref(msg);
54
 
  
55
 
  log_pipe_unref(&self->super);
56
 
}
57
 
 
58
 
void
59
 
log_source_mangle_hostname(LogSource *self, LogMessage *msg)
60
 
{
61
 
  gchar resolved_name[256];
62
 
  gsize resolved_name_len = sizeof(resolved_name);
63
 
  const gchar *orig_host;
64
 
  
65
 
  resolve_sockaddr(resolved_name, &resolved_name_len, msg->saddr, self->options->use_dns, self->options->use_fqdn, self->options->use_dns_cache, self->options->normalize_hostnames);
66
 
  log_msg_set_value(msg, LM_V_HOST_FROM, resolved_name, resolved_name_len);
67
 
 
68
 
  orig_host = log_msg_get_value(msg, LM_V_HOST, NULL);
69
 
  if (!self->options->keep_hostname || !orig_host || !orig_host[0])
70
 
    {
71
 
      gchar host[256];
72
 
      gint host_len = -1;
73
 
      if (G_UNLIKELY(self->options->chain_hostnames)) 
74
 
        {
75
 
          msg->flags |= LF_CHAINED_HOSTNAME;
76
 
          if (msg->flags & LF_LOCAL) 
77
 
            {
78
 
              /* local */
79
 
              host_len = g_snprintf(host, sizeof(host), "%s@%s", self->options->group_name, resolved_name);
80
 
            }
81
 
          else if (!orig_host || !orig_host[0])
82
 
            {
83
 
              /* remote && no hostname */
84
 
              host_len = g_snprintf(host, sizeof(host), "%s/%s", resolved_name, resolved_name);
85
 
            } 
86
 
          else 
87
 
            {
88
 
              /* everything else, append source hostname */
89
 
              if (orig_host && orig_host[0])
90
 
                host_len = g_snprintf(host, sizeof(host), "%s/%s", orig_host, resolved_name);
91
 
              else
92
 
                {
93
 
                  strncpy(host, resolved_name, sizeof(host));
94
 
                  /* just in case it is not zero terminated */
95
 
                  host[255] = 0;
96
 
                }
97
 
            }
98
 
          log_msg_set_value(msg, LM_V_HOST, host, host_len);
99
 
        }
100
 
      else
101
 
        {
102
 
          log_msg_set_value(msg, LM_V_HOST, resolved_name, resolved_name_len);
103
 
        }
104
 
    }
105
 
}
106
 
 
107
 
gboolean
108
 
log_source_init(LogPipe *s)
109
 
{
110
 
  LogSource *self = (LogSource *) s;
111
 
  stats_register_counter(self->stats_level, self->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->recvd_messages);
112
 
  stats_register_counter(self->stats_level, self->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance, SC_TYPE_STAMP, &self->last_message_seen);
113
 
  return TRUE;
114
 
}
115
 
 
116
 
gboolean
117
 
log_source_deinit(LogPipe *s)
118
 
{
119
 
  LogSource *self = (LogSource *) s;
120
 
  
121
 
  stats_unregister_counter(self->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance, SC_TYPE_PROCESSED, &self->recvd_messages);
122
 
  stats_unregister_counter(self->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance, SC_TYPE_STAMP, &self->last_message_seen);
123
 
  return TRUE;
124
 
}
125
 
 
126
 
 
127
 
static void
128
 
log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
129
 
{
130
 
  LogSource *self = (LogSource *) s;
131
 
  LogPathOptions local_options = *path_options;
132
 
  guint32 *processed_counter, *stamp;
133
 
  gboolean new;
134
 
  StatsCounter *handle;
135
 
  
136
 
  msg_set_context(msg);
137
 
 
138
 
  if (msg->timestamps[LM_TS_STAMP].time.tv_sec == -1 || !self->options->keep_timestamp)
139
 
    msg->timestamps[LM_TS_STAMP] = msg->timestamps[LM_TS_RECVD];
140
 
    
141
 
  g_assert(msg->timestamps[LM_TS_STAMP].zone_offset != -1);
142
 
 
143
 
  log_source_mangle_hostname(self, msg);
144
 
 
145
 
  if (self->options->program_override)
146
 
    {
147
 
      if (self->options->program_override_len < 0)
148
 
        self->options->program_override_len = strlen(self->options->program_override);
149
 
      log_msg_set_value(msg, LM_V_PROGRAM, self->options->program_override, self->options->program_override_len);
150
 
    }
151
 
  if (self->options->host_override)
152
 
    {
153
 
      if (self->options->host_override_len < 0)
154
 
        self->options->host_override_len = strlen(self->options->host_override);
155
 
      log_msg_set_value(msg, LM_V_HOST, self->options->host_override, self->options->host_override_len);
156
 
    }
157
 
    
158
 
  handle = stats_register_dynamic_counter(2, SCS_HOST | SCS_SOURCE, NULL, log_msg_get_value(msg, LM_V_HOST, NULL), SC_TYPE_PROCESSED, &processed_counter, &new);
159
 
  stats_register_associated_counter(handle, SC_TYPE_STAMP, &stamp);
160
 
  stats_counter_inc(processed_counter);
161
 
  stats_counter_set(stamp, msg->timestamps[LM_TS_RECVD].time.tv_sec);
162
 
  stats_unregister_dynamic_counter(handle, SC_TYPE_PROCESSED, &processed_counter);
163
 
  stats_unregister_dynamic_counter(handle, SC_TYPE_STAMP, &stamp);
164
 
 
165
 
  /* NOTE: we start by enabling flow-control, thus we need an acknowledgement */
166
 
  local_options.flow_control = TRUE;
167
 
  log_msg_ref(msg);
168
 
  log_msg_add_ack(msg, &local_options);
169
 
  msg->ack_func = log_source_msg_ack;
170
 
  msg->ack_userdata = log_pipe_ref(s);
171
 
    
172
 
  g_atomic_counter_dec_and_test(&self->options->window_size);
173
 
 
174
 
  /* NOTE: we don't need to be very accurate here, it is a bug if
175
 
   * window_size goes below 0, but an atomic operation is expensive and
176
 
   * reading it without locks does not always get the most accurate value,
177
 
   * but the only outcome might be that window_size is larger (since the
178
 
   * update we can lose is the atomic decrement in the consuming thread,
179
 
   * most probably SQL), thus the assert is safe.
180
 
   */
181
 
 
182
 
  g_assert(g_atomic_counter_racy_get(&self->options->window_size) >= 0);
183
 
 
184
 
  stats_counter_inc(self->recvd_messages);
185
 
  stats_counter_set(self->last_message_seen, msg->timestamps[LM_TS_RECVD].time.tv_sec);
186
 
  log_pipe_queue(s->pipe_next, msg, &local_options);
187
 
  msg_set_context(NULL);
188
 
}
189
 
 
190
 
void
191
 
log_source_set_options(LogSource *self, LogSourceOptions *options, gint stats_level, gint stats_source, const gchar *stats_id, const gchar *stats_instance)
192
 
{
193
 
  
194
 
  if (self->options)
195
 
    {
196
 
      gint current_window;
197
 
 
198
 
      /* this LogSource instance was originally used for an old
199
 
       * configuration, and after config reload it has become part of the
200
 
       * new config.  Make sure our window is propagated to avoid filling up
201
 
       * the queue with SIGHUPs.
202
 
       *
203
 
       * NOTE: although atomic_get/set without locks is racy, this is not,
204
 
       * since the old configuration is already deinitialized, thus any
205
 
       * threads that might acknowledge back messages is already stopped.
206
 
       */
207
 
 
208
 
      current_window = g_atomic_counter_get(&self->options->window_size);
209
 
      g_atomic_counter_set(&options->window_size, current_window);
210
 
    }
211
 
  self->options = options;
212
 
  self->stats_level = stats_level;
213
 
  self->stats_source = stats_source;
214
 
  self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
215
 
  self->stats_instance = stats_instance ? g_strdup(stats_instance): NULL;  
216
 
}
217
 
 
218
 
void
219
 
log_source_init_instance(LogSource *self)
220
 
{
221
 
  log_pipe_init_instance(&self->super);
222
 
  self->super.queue = log_source_queue;
223
 
  self->super.free_fn = log_source_free;
224
 
  self->super.init = log_source_init;
225
 
  self->super.deinit = log_source_deinit;
226
 
}
227
 
 
228
 
void
229
 
log_source_free(LogPipe *s)
230
 
{
231
 
  LogSource *self = (LogSource *) s;
232
 
  
233
 
  g_free(self->stats_id);
234
 
  g_free(self->stats_instance);
235
 
  log_pipe_free(s);
236
 
}
237
 
 
238
 
void
239
 
log_source_options_defaults(LogSourceOptions *options)
240
 
{
241
 
  options->init_window_size = -1;
242
 
  g_atomic_counter_set(&options->window_size, -1);
243
 
  options->keep_hostname = -1;
244
 
  options->chain_hostnames = -1;
245
 
  options->use_dns = -1;
246
 
  options->use_fqdn = -1;
247
 
  options->use_dns_cache = -1;
248
 
  options->normalize_hostnames = -1;
249
 
  options->keep_timestamp = -1;
250
 
}
251
 
 
252
 
/*
253
 
 * NOTE: options_init and options_destroy are a bit weird, because their
254
 
 * invocation is not completely symmetric:
255
 
 *
256
 
 *   - init is called from driver init (e.g. affile_sd_init), 
257
 
 *   - destroy is called from driver free method (e.g. affile_sd_free, NOT affile_sd_deinit)
258
 
 *
259
 
 * The reason:
260
 
 *   - when initializing the reloaded configuration fails for some reason,
261
 
 *     we have to fall back to the old configuration, thus we cannot dump
262
 
 *     the information stored in the Options structure.
263
 
 *
264
 
 * For the reasons above, init and destroy behave the following way:
265
 
 *
266
 
 *   - init is idempotent, it can be called multiple times without leaking
267
 
 *     memory, and without loss of information
268
 
 *   - destroy is only called once, when the options are indeed to be destroyed
269
 
 *
270
 
 * As init allocates memory, it has to take care about freeing memory
271
 
 * allocated by the previous init call (or it has to reuse those).
272
 
 *   
273
 
 */
274
 
void
275
 
log_source_options_init(LogSourceOptions *options, GlobalConfig *cfg, const gchar *group_name)
276
 
{
277
 
  gchar *host_override, *program_override;
278
 
  gchar *source_group_name;
279
 
  
280
 
  host_override = options->host_override;
281
 
  options->host_override = NULL;
282
 
  program_override = options->program_override;
283
 
  options->program_override = NULL;
284
 
  log_source_options_destroy(options);
285
 
  
286
 
  options->host_override = host_override;
287
 
  options->host_override_len = -1;
288
 
  options->program_override = program_override;
289
 
  options->program_override_len = -1;
290
 
  
291
 
  if (options->init_window_size == -1)
292
 
    options->init_window_size = cfg->log_iw_size;
293
 
  if (g_atomic_counter_get(&options->window_size) == -1)
294
 
    g_atomic_counter_set(&options->window_size, options->init_window_size);
295
 
  if (options->keep_hostname == -1)
296
 
    options->keep_hostname = cfg->keep_hostname;
297
 
  if (options->chain_hostnames == -1)
298
 
    options->chain_hostnames = cfg->chain_hostnames;
299
 
  if (options->use_dns == -1)
300
 
    options->use_dns = cfg->use_dns;
301
 
  if (options->use_fqdn == -1)
302
 
    options->use_fqdn = cfg->use_fqdn;
303
 
  if (options->use_dns_cache == -1)
304
 
    options->use_dns_cache = cfg->use_dns_cache;
305
 
  if (options->normalize_hostnames == -1)
306
 
    options->normalize_hostnames = cfg->normalize_hostnames;
307
 
  if (options->keep_timestamp == -1)
308
 
    options->keep_timestamp = cfg->keep_timestamp;
309
 
  options->group_name = group_name;
310
 
 
311
 
  source_group_name = g_strdup_printf(".source.%s", group_name);
312
 
  options->source_group_tag = log_tags_get_by_name(source_group_name);
313
 
  g_free(source_group_name);
314
 
}
315
 
 
316
 
void
317
 
log_source_options_destroy(LogSourceOptions *options)
318
 
{
319
 
  if (options->program_override)
320
 
    g_free(options->program_override);
321
 
  if (options->host_override)
322
 
    g_free(options->host_override);
323
 
}