~ubuntu-branches/ubuntu/trusty/syslog-ng/trusty-proposed

« back to all changes in this revision

Viewing changes to lib/logreader.c

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS), Gergely Nagy
  • Date: 2011-10-11 14:30:48 UTC
  • mfrom: (1.3.7)
  • Revision ID: package-import@ubuntu.com-20111011143048-r1iljux9xbvj3lwh
Tags: 3.3.1.dfsg-1
* New upstream release with important fixes from upstream git tree with
  non-free manpages removed.
* Drop syslog-ng.conf(5) (closes: #496521).
* syslog-ng(8) is generated, and does not mention -Q anymore
  (closes: #616069).
* Supports CAP_SYSLOG on recent kernels (closes: #630172).
* Does not use g_timeout_add_seconds anymore (closes: #609154).

[ Gergely Nagy <algernon@madhouse-project.org> ]
* Update debian/copyright to DEP-5 format.
* Simplified the logrotate file by merging identical entries.
* Include local configuration files from /etc/syslog-ng/conf.d/ (Closes:
  #609050).
* Update syslog-ng.conf to be fully 3.3 compliant.
* Compress both source and binaries (except the syslog-ng meta
  package) with xz, instead of gzip.
* Use dpkg triggers to restart syslog-ng when appropriate.
* Include DFSG-free manual pages for all binaries.
* Build with Hardening enabled.
* Mention syslog(3) in /etc/default/syslog-ng, instead of
  <linux/kernel.h> (Closes: #608605)
* Support 'status' in the init script.
  Patch from Peter Eisentraut <petere@debian.org> (Closes: #644458)
* Build-Depend on libevtlog-dev (>= 0.2.12-5~) for correct shlibs.
* Use [linux-any] in Build-Depends instead of hardcoded links.
  (Closes: #634715)
* Use $SYSLOGNG_OPTS in the init script when reloading syslog-ng.
  (Closes: #589081)

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
#include "stats.h"
30
30
#include "tags.h"
31
31
#include "cfg-parser.h"
 
32
#include "timeutils.h"
32
33
#include "compat.h"
 
34
#include "mainloop.h"
33
35
 
34
36
#include <sys/types.h>
35
37
#include <sys/socket.h>
41
43
#include <errno.h>
42
44
#include <stdio.h>
43
45
#include <stdlib.h>
 
46
#include <iv.h>
 
47
#include <iv_work.h>
44
48
 
45
49
/**
46
50
 * FIXME: LogReader has grown big enough that it is difficult to
69
73
 * Of course a similar change can be applied to LogWriters as well.
70
74
 **/
71
75
 
72
 
 
73
 
static gboolean log_reader_fetch_log(LogReader *self, LogProto *proto);
74
 
 
75
 
struct _LogReaderWatch
 
76
struct _LogReader
76
77
{
77
 
  GSource super;
78
 
  LogReader *reader;
79
 
  GPollFD pollfd;
 
78
  LogSource super;
80
79
  LogProto *proto;
81
 
  GTimeVal last_follow_freq_check;
 
80
  LogReaderWatch *source;
 
81
  gboolean immediate_check;
 
82
  gboolean waiting_for_preemption;
 
83
  LogPipe *control;
 
84
  LogReaderOptions *options;
 
85
  GSockAddr *peer_addr;
 
86
  gchar *follow_filename;
 
87
  ino_t inode;
 
88
  gint64 size;
 
89
 
 
90
  /* NOTE: these used to be LogReaderWatch members, which were merged into
 
91
   * LogReader with the multi-thread refactorization */
 
92
 
 
93
  struct iv_fd fd_watch;
 
94
  struct iv_timer follow_timer;
 
95
  struct iv_task restart_task;
 
96
  struct iv_event schedule_wakeup;
 
97
  MainLoopIOWorkerJob io_job;
 
98
  gboolean suspended:1;
 
99
  gint pollable_state;
 
100
  gint notify_code;
82
101
};
83
102
 
84
 
static gboolean
85
 
log_reader_fd_prepare(GSource *source,
86
 
                      gint *timeout)
87
 
{
88
 
  LogReaderWatch *self = (LogReaderWatch *) source;
89
 
  GIOCondition proto_cond;
90
 
 
91
 
  self->pollfd.revents = 0;
92
 
  self->pollfd.events = G_IO_ERR;
93
 
 
94
 
  /* never indicate readability if flow control prevents us from sending messages */
95
 
  
96
 
  if (!log_source_free_to_send(&self->reader->super))
97
 
    return FALSE;
98
 
 
99
 
  if (log_proto_prepare(self->proto, &self->pollfd.fd, &proto_cond, timeout))
100
 
    return TRUE;
101
 
 
102
 
  if (self->reader->immediate_check)
103
 
    {
104
 
      *timeout = 0;
105
 
      self->reader->immediate_check = FALSE;
106
 
      return FALSE;
107
 
    }
108
 
 
109
 
  if (self->reader->options->follow_freq > 0)
110
 
    {
111
 
      *timeout = self->reader->options->follow_freq;
112
 
      return FALSE;
113
 
    }
114
 
  
115
 
  self->pollfd.events |= proto_cond;
116
 
  return FALSE;
117
 
}
118
 
 
119
 
static gboolean
120
 
log_reader_fd_check(GSource *source)
121
 
{
122
 
  LogReaderWatch *self = (LogReaderWatch *) source;
123
 
 
124
 
  if (!log_source_free_to_send(&self->reader->super))
125
 
    return FALSE;
126
 
 
127
 
  if (self->reader->options->follow_freq > 0)
128
 
    {
129
 
      struct stat st, followed_st;
130
 
      off_t pos = -1;
131
 
      gint fd = log_proto_get_fd(self->reader->proto);
132
 
 
133
 
      /* FIXME: this is a _HUGE_ layering violation... */
134
 
      if (fd >= 0)
135
 
        {
136
 
          pos = lseek(fd, 0, SEEK_CUR);      
137
 
          if (pos == (off_t) -1)
138
 
            {
139
 
              msg_error("Error invoking seek on followed file",
140
 
                        evt_tag_errno("error", errno),
141
 
                        NULL);
142
 
              return FALSE;
143
 
            }
144
 
 
145
 
          if (fstat(fd, &st) < 0)
146
 
            {
147
 
              if (errno == ESTALE)
148
 
                {
149
 
                  msg_trace("log_reader_fd_check file moved ESTALE",
150
 
                            evt_tag_str("follow_filename",self->reader->follow_filename),
151
 
                            NULL);
152
 
                  log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_MOVED, self);
153
 
                  return FALSE;
154
 
                }
155
 
              else
156
 
                {
157
 
                  msg_error("Error invoking fstat() on followed file",
158
 
                            evt_tag_errno("error", errno),
159
 
                            NULL);
160
 
                  return FALSE;
161
 
                }
162
 
            }
163
 
 
164
 
          msg_trace("log_reader_fd_check",
165
 
                    evt_tag_int("pos", pos),
166
 
                    evt_tag_int("size", st.st_size),
167
 
                    NULL);
168
 
 
169
 
          if (pos < st.st_size)
170
 
            {
171
 
              return TRUE;
172
 
            }
173
 
          else if (pos == st.st_size)
174
 
            {
175
 
              log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_EOF, self);
176
 
            }
177
 
          else if (pos > st.st_size)
178
 
            { 
179
 
              /* reopen the file */
180
 
              log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_MOVED, self);
181
 
            }
182
 
        } 
183
 
        
184
 
      if (self->reader->follow_filename && stat(self->reader->follow_filename, &followed_st) != -1)
 
103
static gboolean log_reader_fetch_log(LogReader *self);
 
104
 
 
105
static gboolean log_reader_start_watches(LogReader *self);
 
106
static void log_reader_stop_watches(LogReader *self);
 
107
static void log_reader_update_watches(LogReader *self);
 
108
 
 
109
 
 
110
static void
 
111
log_reader_work_perform(void *s)
 
112
{
 
113
  LogReader *self = (LogReader *) s;
 
114
 
 
115
  self->notify_code = log_reader_fetch_log(self);
 
116
}
 
117
 
 
118
static void
 
119
log_reader_work_finished(void *s)
 
120
{
 
121
  LogReader *self = (LogReader *) s;
 
122
 
 
123
  if (self->notify_code == 0)
 
124
    {
 
125
      if (self->super.super.flags & PIF_INITIALIZED)
 
126
        {
 
127
          /* reenable polling the source assuming that we're still in
 
128
           * business (e.g. the reader hasn't been uninitialized) */
 
129
 
 
130
          log_reader_start_watches(self);
 
131
        }
 
132
    }
 
133
  else
 
134
    {
 
135
      log_pipe_notify(self->control, &self->super.super, self->notify_code, self);
 
136
    }
 
137
  log_pipe_unref(&self->super.super);
 
138
}
 
139
 
 
140
static void
 
141
log_reader_wakeup_triggered(gpointer s)
 
142
{
 
143
  LogReader *self = (LogReader *) s;
 
144
 
 
145
  if (!self->io_job.working && self->suspended)
 
146
    {
 
147
      /* NOTE: by the time working is set to FALSE we're over an
 
148
       * update_watches call.  So it is called either here (when
 
149
       * work_finished has done its work) or from work_finished above. The
 
150
       * two are not racing as both run in the main thread
 
151
       */
 
152
      log_reader_update_watches(self);
 
153
    }
 
154
}
 
155
 
 
156
/* NOTE: may be running in the destination's thread, thus proper locking must be used */
 
157
static void
 
158
log_reader_wakeup(LogSource *s)
 
159
{
 
160
  LogReader *self = (LogReader *) s;
 
161
 
 
162
  /*
 
163
   * We might get called even after this LogReader has been
 
164
   * deinitialized, in which case we must not do anything (since the
 
165
   * iv_event triggered here is not registered).
 
166
   *
 
167
   * This happens when log_writer_deinit() flushes its output queue
 
168
   * after the reader which produced the message has already been
 
169
   * deinited. Since init/deinit calls are made in the main thread, no
 
170
   * locking is needed.
 
171
   *
 
172
   */
 
173
 
 
174
  if (self->super.super.flags & PIF_INITIALIZED)
 
175
    iv_event_post(&self->schedule_wakeup);
 
176
}
 
177
 
 
178
static void
 
179
log_reader_io_process_input(gpointer s)
 
180
{
 
181
  LogReader *self = (LogReader *) s;
 
182
 
 
183
  log_reader_stop_watches(self);
 
184
  log_pipe_ref(&self->super.super);
 
185
  if ((self->options->flags & LR_THREADED))
 
186
    {
 
187
      main_loop_io_worker_job_submit(&self->io_job);
 
188
    }
 
189
  else
 
190
    {
 
191
      log_reader_work_perform(s);
 
192
      log_reader_work_finished(s);
 
193
    }
 
194
}
 
195
 
 
196
/* follow timer callback. Check if the file has new content, or deleted or
 
197
 * moved.  Ran every follow_freq seconds.  */
 
198
static void
 
199
log_reader_io_follow_file(gpointer s)
 
200
{
 
201
  LogReader *self = (LogReader *) s;
 
202
  struct stat st, followed_st;
 
203
  off_t pos = -1;
 
204
  gint fd = log_proto_get_fd(self->proto);
 
205
 
 
206
  msg_trace("Checking if the followed file has new lines",
 
207
            evt_tag_str("follow_filename", self->follow_filename),
 
208
            NULL);
 
209
  if (fd >= 0)
 
210
    {
 
211
      pos = lseek(fd, 0, SEEK_CUR);
 
212
      if (pos == (off_t) -1)
 
213
        {
 
214
          msg_error("Error invoking seek on followed file",
 
215
                    evt_tag_errno("error", errno),
 
216
                    NULL);
 
217
          goto reschedule;
 
218
        }
 
219
 
 
220
      if (fstat(fd, &st) < 0)
 
221
        {
 
222
          if (errno == ESTALE)
 
223
            {
 
224
              msg_trace("log_reader_fd_check file moved ESTALE",
 
225
                        evt_tag_str("follow_filename", self->follow_filename),
 
226
                        NULL);
 
227
              log_pipe_notify(self->control, &self->super.super, NC_FILE_MOVED, self);
 
228
              return;
 
229
            }
 
230
          else
 
231
            {
 
232
              msg_error("Error invoking fstat() on followed file",
 
233
                        evt_tag_errno("error", errno),
 
234
                        NULL);
 
235
              goto reschedule;
 
236
            }
 
237
        }
 
238
 
 
239
      msg_trace("log_reader_fd_check",
 
240
                evt_tag_int("pos", pos),
 
241
                evt_tag_int("size", st.st_size),
 
242
                NULL);
 
243
 
 
244
      if (pos < st.st_size)
 
245
        {
 
246
          /* we have data to read */
 
247
          log_reader_io_process_input(s);
 
248
          return;
 
249
        }
 
250
      else if (pos == st.st_size)
 
251
        {
 
252
          /* we are at EOF */
 
253
          log_pipe_notify(self->control, &self->super.super, NC_FILE_EOF, self);
 
254
        }
 
255
      else if (pos > st.st_size)
 
256
        {
 
257
          /* the last known position is larger than the current size of the file. it got truncated. Restart from the beginning. */
 
258
          log_pipe_notify(self->control, &self->super.super, NC_FILE_MOVED, self);
 
259
 
 
260
          /* we may be freed by the time the notification above returns */
 
261
          return;
 
262
        }
 
263
    }
 
264
 
 
265
  if (self->follow_filename)
 
266
    {
 
267
      if (stat(self->follow_filename, &followed_st) != -1)
185
268
        {
186
269
          if (fd < 0 || (st.st_ino != followed_st.st_ino && followed_st.st_size > 0))
187
270
            {
188
271
              msg_trace("log_reader_fd_check file moved eof",
189
272
                        evt_tag_int("pos", pos),
190
273
                        evt_tag_int("size", followed_st.st_size),
191
 
                        evt_tag_str("follow_filename",self->reader->follow_filename),
 
274
                        evt_tag_str("follow_filename", self->follow_filename),
192
275
                        NULL);
193
276
              /* file was moved and we are at EOF, follow the new file */
194
 
              log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_MOVED, self);
 
277
              log_pipe_notify(self->control, &self->super.super, NC_FILE_MOVED, self);
 
278
              /* we may be freed by the time the notification above returns */
 
279
              return;
195
280
            }
196
281
        }
197
 
      else if (self->reader->follow_filename)
 
282
      else
198
283
        {
199
 
          GTimeVal now;
200
 
 
201
 
          g_source_get_current_time(source, &now);
202
 
          if (g_time_val_diff(&now, &self->last_follow_freq_check) > self->reader->options->follow_freq * 1000)
203
 
            {
204
 
              msg_verbose("Follow mode file still does not exist",
205
 
                          evt_tag_str("filename", self->reader->follow_filename),
206
 
                          NULL);
207
 
              self->last_follow_freq_check = now;
208
 
            }
 
284
          msg_verbose("Follow mode file still does not exist",
 
285
                      evt_tag_str("filename", self->follow_filename),
 
286
                      NULL);
209
287
        }
210
 
      return FALSE;
211
288
    }
212
 
    
213
 
  return !!(self->pollfd.revents & (G_IO_IN | G_IO_OUT | G_IO_ERR | G_IO_HUP));
214
 
}
215
 
 
 
289
 reschedule:
 
290
  log_reader_update_watches(self);
 
291
}
 
292
 
 
293
static void
 
294
log_reader_init_watches(LogReader *self)
 
295
{
 
296
  gint fd;
 
297
  GIOCondition cond;
 
298
 
 
299
  log_proto_prepare(self->proto, &fd, &cond);
 
300
 
 
301
  IV_FD_INIT(&self->fd_watch);
 
302
  self->fd_watch.cookie = self;
 
303
 
 
304
  IV_TIMER_INIT(&self->follow_timer);
 
305
  self->follow_timer.cookie = self;
 
306
  self->follow_timer.handler = log_reader_io_follow_file;
 
307
 
 
308
  IV_TASK_INIT(&self->restart_task);
 
309
  self->restart_task.cookie = self;
 
310
  self->restart_task.handler = log_reader_io_process_input;
 
311
 
 
312
  IV_EVENT_INIT(&self->schedule_wakeup);
 
313
  self->schedule_wakeup.cookie = self;
 
314
  self->schedule_wakeup.handler = log_reader_wakeup_triggered;
 
315
 
 
316
  main_loop_io_worker_job_init(&self->io_job);
 
317
  self->io_job.user_data = self;
 
318
  self->io_job.work = (void (*)(void *)) log_reader_work_perform;
 
319
  self->io_job.completion = (void (*)(void *)) log_reader_work_finished;
 
320
}
 
321
 
 
322
/* NOTE: the return value is only used during initialization, and it is not
 
323
 * expected that it'd change once it returns success */
216
324
static gboolean
217
 
log_reader_fd_dispatch(GSource *source,
218
 
                       GSourceFunc callback,
219
 
                       gpointer user_data)
 
325
log_reader_start_watches(LogReader *self)
220
326
{
221
 
  LogReaderWatch *self = (LogReaderWatch *) source;
222
 
 
223
 
  /* The window status can change between check() and dispatch()
224
 
   * because multiple tcp connections can have messages ready
225
 
   * from the same source at check() time, but the queue may
226
 
   * fill before we dispatch() them all
227
 
   */
228
 
  if (!log_source_free_to_send(&self->reader->super))
229
 
    return TRUE;
230
 
 
231
 
  if (!log_reader_fetch_log(self->reader, self->proto))
232
 
    {
233
 
      return FALSE;
234
 
    }
235
 
    
 
327
  gint fd;
 
328
  GIOCondition cond;
 
329
 
 
330
  log_proto_prepare(self->proto, &fd, &cond);
 
331
 
 
332
  if (self->pollable_state < 0 && fd >= 0)
 
333
    self->pollable_state = iv_fd_pollable(fd);
 
334
 
 
335
  if (self->options->follow_freq > 0)
 
336
    {
 
337
      /* follow freq specified (only the file source does that, go into timed polling */
 
338
 
 
339
      /* NOTE: the fd may not be set here, as it may not have been opened yet */
 
340
      iv_timer_register(&self->follow_timer);
 
341
    }
 
342
  else if (fd < 0)
 
343
    {
 
344
      msg_error("In order to poll non-yet-existing files, follow_freq() must be set",
 
345
                NULL);
 
346
      return FALSE;
 
347
    }
 
348
  else if (self->pollable_state > 0)
 
349
    {
 
350
      /* we have an FD, it is possible to poll it, register it  */
 
351
      self->fd_watch.fd = fd;
 
352
      iv_fd_register(&self->fd_watch);
 
353
    }
 
354
  else
 
355
    {
 
356
      msg_error("Unable to determine how to monitor this fd, follow_freq() not set and it is not possible to poll it with the current ivykis polling method, try changing IV_EXCLUDE_POLL_METHOD environment variable",
 
357
                evt_tag_int("fd", fd),
 
358
                NULL);
 
359
      return FALSE;
 
360
    }
 
361
 
 
362
  log_reader_update_watches(self);
236
363
  return TRUE;
237
364
}
238
365
 
 
366
 
239
367
static void
240
 
log_reader_fd_finalize(GSource *source)
 
368
log_reader_stop_watches(LogReader *self)
241
369
{
242
 
  LogReaderWatch *self = (LogReaderWatch *) source;
243
 
  log_proto_free(self->proto);
244
 
  log_pipe_unref(&self->reader->super.super);
 
370
  if (iv_fd_registered(&self->fd_watch))
 
371
    iv_fd_unregister(&self->fd_watch);
 
372
  if (iv_timer_registered(&self->follow_timer))
 
373
    iv_timer_unregister(&self->follow_timer);
 
374
  if (iv_task_registered(&self->restart_task))
 
375
    iv_task_unregister(&self->restart_task);
245
376
}
246
377
 
247
 
GSourceFuncs log_reader_source_funcs =
 
378
static void
 
379
log_reader_update_watches(LogReader *self)
248
380
{
249
 
  log_reader_fd_prepare,
250
 
  log_reader_fd_check,
251
 
  log_reader_fd_dispatch,
252
 
  log_reader_fd_finalize
253
 
};
 
381
  gint fd;
 
382
  GIOCondition cond;
 
383
  gboolean free_to_send;
254
384
 
255
 
static LogReaderWatch *
256
 
log_reader_watch_new(LogReader *reader, LogProto *proto)
257
 
{
258
 
  LogReaderWatch *self = (LogReaderWatch *) g_source_new(&log_reader_source_funcs, sizeof(LogReaderWatch));
 
385
  main_loop_assert_main_thread();
259
386
  
260
 
  log_pipe_ref(&reader->super.super);
261
 
  self->reader = reader;
262
 
  self->proto = proto;
263
 
  g_source_set_priority(&self->super, LOG_PRIORITY_READER);
264
 
  g_source_add_poll(&self->super, &self->pollfd);
265
 
  return self;
 
387
  self->suspended = FALSE;
 
388
  free_to_send = log_source_free_to_send(&self->super);
 
389
  if (!free_to_send ||
 
390
      self->immediate_check ||
 
391
      log_proto_prepare(self->proto, &fd, &cond))
 
392
    {
 
393
      /* we disable all I/O related callbacks here because we either know
 
394
       * that we can continue (e.g.  immediate_check == TRUE) or we know
 
395
       * that we can't continue even if data would be available (e.g.
 
396
       * free_to_send == FALSE)
 
397
       */
 
398
 
 
399
      self->immediate_check = FALSE;
 
400
      if (iv_fd_registered(&self->fd_watch))
 
401
        {
 
402
          iv_fd_set_handler_in(&self->fd_watch, NULL);
 
403
          iv_fd_set_handler_out(&self->fd_watch, NULL);
 
404
 
 
405
          /* we disable the error handler too, as it might be
 
406
           * triggered even when we don't want to read data
 
407
           * (e.g. log_source_free_to_send() is FALSE).
 
408
           *
 
409
           * And at least on Linux, it may happen that EPOLLERR is
 
410
           * set, while there's still data in the socket buffer.  Thus
 
411
           * in reaction to an EPOLLERR, we could possibly send
 
412
           * further messages without validating the
 
413
           * log_source_free_to_send() would allow us to, potentially
 
414
           * overflowing our window (and causing a failed assertion in
 
415
           * log_source_queue().
 
416
           */
 
417
 
 
418
          iv_fd_set_handler_err(&self->fd_watch, NULL);
 
419
        }
 
420
 
 
421
      if (iv_timer_registered(&self->follow_timer))
 
422
        iv_timer_unregister(&self->follow_timer);
 
423
 
 
424
      if (free_to_send)
 
425
        {
 
426
          /* we have data in our input buffer, we need to start working
 
427
           * on it immediately, without waiting for I/O events */
 
428
          if (!iv_task_registered(&self->restart_task))
 
429
            {
 
430
              iv_task_register(&self->restart_task);
 
431
            }
 
432
        }
 
433
      else
 
434
        {
 
435
          self->suspended = TRUE;
 
436
        }
 
437
      return;
 
438
    }
 
439
 
 
440
  if (iv_fd_registered(&self->fd_watch))
 
441
    {
 
442
      /* this branch is executed when our fd is connected to a non-file
 
443
       * source (e.g. TCP, UDP socket). We set up I/O callbacks here.
 
444
       * files cannot be polled using epoll, as it causes an I/O error
 
445
       * (thus abort in ivykis).
 
446
       */
 
447
      if (cond & G_IO_IN)
 
448
        iv_fd_set_handler_in(&self->fd_watch, log_reader_io_process_input);
 
449
      else
 
450
        iv_fd_set_handler_in(&self->fd_watch, NULL);
 
451
 
 
452
      if (cond & G_IO_OUT)
 
453
        iv_fd_set_handler_out(&self->fd_watch, log_reader_io_process_input);
 
454
      else
 
455
        iv_fd_set_handler_out(&self->fd_watch, NULL);
 
456
 
 
457
      if (cond & (G_IO_IN + G_IO_OUT))
 
458
        iv_fd_set_handler_err(&self->fd_watch, log_reader_io_process_input);
 
459
      else
 
460
        iv_fd_set_handler_err(&self->fd_watch, NULL);
 
461
 
 
462
    }
 
463
  else
 
464
    {
 
465
      if (self->options->follow_freq > 0)
 
466
        {
 
467
          if (iv_timer_registered(&self->follow_timer))
 
468
            iv_timer_unregister(&self->follow_timer);
 
469
          iv_validate_now();
 
470
          self->follow_timer.expires = iv_now;
 
471
          timespec_add_msec(&self->follow_timer.expires, self->options->follow_freq);
 
472
          iv_timer_register(&self->follow_timer);
 
473
        }
 
474
      else
 
475
        {
 
476
          /* NOTE: we don't need to unregister the timer here as follow_freq
 
477
           * never changes during runtime, thus if ever it was registered that
 
478
           * also means that we go into the if branch above. */
 
479
        }
 
480
    }
266
481
}
267
482
 
268
 
 
269
483
static gboolean
270
484
log_reader_handle_line(LogReader *self, const guchar *line, gint length, GSockAddr *saddr)
271
485
{
272
486
  LogMessage *m;
273
487
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
274
 
  gint i;
275
488
  
276
489
  msg_debug("Incoming log entry", 
277
490
            evt_tag_printf("line", "%.*s", length, line),
280
493
  m = log_msg_new((gchar *) line, length,
281
494
                  saddr,
282
495
                  &self->options->parse_options);
283
 
  
 
496
 
 
497
  log_msg_refcache_start_producer(m);
284
498
  if (!m->saddr && self->peer_addr)
285
499
    {
286
500
      m->saddr = g_sockaddr_ref(self->peer_addr);
287
501
    }
288
502
 
289
 
  if (self->options->tags)
290
 
    {
291
 
      for (i = 0; i < self->options->tags->len; i++)
292
 
        {
293
 
          log_msg_set_tag_by_id(m, g_array_index(self->options->tags, LogTagId, i));
294
 
        }
295
 
    }
296
 
 
297
 
  log_msg_set_tag_by_id(m, self->super.options->source_group_tag);
298
 
 
299
503
  log_pipe_queue(&self->super.super, m, &path_options);
 
504
  log_msg_refcache_stop();
300
505
  return log_source_free_to_send(&self->super);
301
506
}
302
507
 
303
 
static gboolean
304
 
log_reader_fetch_log(LogReader *self, LogProto *proto)
 
508
/* returns: notify_code (NC_XXXX) or 0 for success */
 
509
static gint
 
510
log_reader_fetch_log(LogReader *self)
305
511
{
306
512
  GSockAddr *sa;
307
513
  gint msg_count = 0;
309
515
 
310
516
  if (self->waiting_for_preemption)
311
517
    may_read = FALSE;
312
 
    
 
518
 
313
519
  /* NOTE: this loop is here to decrease the load on the main loop, we try
314
520
   * to fetch a couple of messages in a single run (but only up to
315
521
   * fetch_limit).
316
522
   */
317
 
  while (msg_count < self->options->fetch_limit)
 
523
  while (msg_count < self->options->fetch_limit && !main_loop_io_worker_job_quit())
318
524
    {
319
525
      const guchar *msg;
320
526
      gsize msg_len;
334
540
        {
335
541
        case LPS_EOF:
336
542
        case LPS_ERROR:
337
 
          log_pipe_notify(self->control, &self->super.super, status == LPS_ERROR ? NC_READ_ERROR : NC_CLOSE, self);
338
543
          g_sockaddr_unref(sa);
339
 
          return FALSE;
 
544
          return status == LPS_ERROR ? NC_READ_ERROR : NC_CLOSE;
340
545
        case LPS_SUCCESS:
341
546
          break;
342
547
        default:
378
583
    }
379
584
  if (msg_count == self->options->fetch_limit)
380
585
    self->immediate_check = TRUE;
381
 
  return TRUE;
 
586
  return 0;
382
587
}
383
588
 
384
589
static gboolean
417
622
                NULL);
418
623
      return FALSE;
419
624
    }
420
 
  /* the source added below references this logreader, it will be unref'd
421
 
     when the source is destroyed */ 
422
 
  self->source = log_reader_watch_new(self, self->proto);
423
 
  g_source_attach(&self->source->super, NULL);
424
 
    
 
625
  if (!log_reader_start_watches(self))
 
626
    return FALSE;
 
627
  iv_event_register(&self->schedule_wakeup);
 
628
 
425
629
  return TRUE;
426
630
}
427
631
 
430
634
{
431
635
  LogReader *self = (LogReader *) s;
432
636
  
433
 
  if (self->source)
434
 
    {
435
 
      g_source_destroy(&self->source->super);
436
 
      g_source_unref(&self->source->super);
437
 
      self->source = NULL;
438
 
    }
439
 
    
 
637
  main_loop_assert_main_thread();
 
638
 
 
639
  iv_event_unregister(&self->schedule_wakeup);
 
640
  log_reader_stop_watches(self);
440
641
  if (!log_source_deinit(s))
441
642
    return FALSE;
442
643
 
450
651
  
451
652
  /* when this function is called the source is already removed, because it
452
653
     holds a reference to this reader */
 
654
  log_proto_free(self->proto);
453
655
  log_pipe_unref(self->control);
454
656
  g_sockaddr_unref(self->peer_addr);
455
657
  g_free(self->follow_filename);
461
663
{
462
664
  LogReader *self = (LogReader *) s;
463
665
 
464
 
  log_source_set_options(&self->super, &options->super, stats_level, stats_source, stats_id, stats_instance);
 
666
  log_source_set_options(&self->super, &options->super, stats_level, stats_source, stats_id, stats_instance, (options->flags & LR_THREADED));
465
667
 
466
668
  log_pipe_unref(self->control);
467
669
  log_pipe_ref(control);
495
697
  self->super.super.init = log_reader_init;
496
698
  self->super.super.deinit = log_reader_deinit;
497
699
  self->super.super.free_fn = log_reader_free;
 
700
  self->super.wakeup = log_reader_wakeup;
498
701
  self->proto = proto;
499
702
  self->immediate_check = FALSE;
 
703
  self->pollable_state = -1;
 
704
  log_reader_init_watches(self);
500
705
  return &self->super.super;
501
706
}
502
707
 
514
719
  log_source_options_defaults(&options->super);
515
720
  msg_format_options_defaults(&options->parse_options);
516
721
  options->padding = 0;
517
 
  options->fetch_limit = -1;
 
722
  options->fetch_limit = 10;
518
723
  options->msg_size = -1;
519
724
  options->follow_freq = -1; 
520
725
  options->text_encoding = NULL;
568
773
  options->parse_options.recv_time_zone_info = NULL;
569
774
  text_encoding = options->text_encoding;
570
775
  options->text_encoding = NULL;
571
 
  tags = options->tags;
572
 
  options->tags = NULL;
573
776
 
574
777
  /* NOTE: having to save super's variables is a crude hack, but I know of
575
778
   * no other way to do it in the scheme described above. Be sure that you
576
779
   * know what you are doing when you modify this code. */
577
780
  
 
781
  tags = options->super.tags;
 
782
  options->super.tags = NULL;
578
783
  host_override = options->super.host_override;
579
784
  options->super.host_override = NULL;
580
785
  program_override = options->super.program_override;
585
790
  format_handler = options->parse_options.format_handler;
586
791
  options->parse_options.format_handler = NULL;
587
792
 
 
793
  /***********************************************************************
 
794
   * PLEASE NOTE THIS. please read the comment at the top of the function
 
795
   ***********************************************************************/
588
796
  log_reader_options_destroy(options);
589
797
 
590
798
  options->parse_options.format = format;
592
800
 
593
801
  options->super.host_override = host_override;
594
802
  options->super.program_override = program_override;
 
803
  options->super.tags = tags;
595
804
  
596
805
  options->parse_options.recv_time_zone = recv_time_zone;
597
806
  options->parse_options.recv_time_zone_info = recv_time_zone_info;
598
807
  options->text_encoding = text_encoding;
599
 
  options->tags = tags;
600
808
  options->parse_options.format = format;
601
809
 
602
810
  log_source_options_init(&options->super, cfg, group_name);
603
811
  msg_format_options_init(&options->parse_options, cfg);
604
812
 
605
 
  if (options->fetch_limit == -1)
606
 
    options->fetch_limit = cfg->log_fetch_limit;
607
813
  if (options->msg_size == -1)
608
814
    options->msg_size = cfg->log_msg_size;
609
815
  if (options->follow_freq == -1)
623
829
    }
624
830
  if (options->text_encoding)
625
831
    options->parse_options.flags |= LP_ASSUME_UTF8;
 
832
  if (cfg->threaded)
 
833
    options->flags |= LR_THREADED;
626
834
}
627
835
 
628
836
void
635
843
      g_free(options->text_encoding);
636
844
      options->text_encoding = NULL;
637
845
    }
638
 
  if (options->tags)
639
 
    {
640
 
      g_array_free(options->tags, TRUE);
641
 
      options->tags = NULL;
642
 
    }
643
846
}
644
847
 
645
848
CfgFlagHandler log_reader_flag_handlers[] =
646
849
{
647
850
  /* LogParseOptions */
 
851
  /* NOTE: underscores are automatically converted to dashes */
648
852
  { "no-parse",                   CFH_SET, offsetof(LogReaderOptions, parse_options.flags), LP_NOPARSE },
649
853
  { "check-hostname",             CFH_SET, offsetof(LogReaderOptions, parse_options.flags), LP_CHECK_HOSTNAME },
650
854
  { "syslog-protocol",            CFH_SET, offsetof(LogReaderOptions, parse_options.flags), LP_SYSLOG_PROTOCOL },
659
863
  /* LogReaderOptions */
660
864
  { "kernel",                     CFH_SET, offsetof(LogReaderOptions, flags),               LR_KERNEL },
661
865
  { "empty-lines",                CFH_SET, offsetof(LogReaderOptions, flags),               LR_EMPTY_LINES },
 
866
  { "threaded",                   CFH_SET, offsetof(LogReaderOptions, flags),               LR_THREADED },
 
867
  { NULL },
662
868
};
663
869
 
664
870
gboolean
666
872
{
667
873
  return cfg_process_flag(log_reader_flag_handlers, options, flag);
668
874
}
669
 
 
670
 
void
671
 
log_reader_options_set_tags(LogReaderOptions *options, GList *tags)
672
 
{
673
 
  LogTagId id;
674
 
 
675
 
  if (!options->tags)
676
 
    options->tags = g_array_new(FALSE, FALSE, sizeof(LogTagId));
677
 
 
678
 
  while (tags)
679
 
    {
680
 
      id = log_tags_get_by_name((gchar *) tags->data);
681
 
      g_array_append_val(options->tags, id);
682
 
 
683
 
      g_free(tags->data);
684
 
      tags = g_list_delete_link(tags, tags);
685
 
    }
686
 
}