69
73
* Of course a similar change can be applied to LogWriters as well.
73
static gboolean log_reader_fetch_log(LogReader *self, LogProto *proto);
75
struct _LogReaderWatch
81
GTimeVal last_follow_freq_check;
80
LogReaderWatch *source;
81
gboolean immediate_check;
82
gboolean waiting_for_preemption;
84
LogReaderOptions *options;
86
gchar *follow_filename;
90
/* NOTE: these used to be LogReaderWatch members, which were merged into
91
* LogReader with the multi-threading refactoring */
93
struct iv_fd fd_watch;
94
struct iv_timer follow_timer;
95
struct iv_task restart_task;
96
struct iv_event schedule_wakeup;
97
MainLoopIOWorkerJob io_job;
85
log_reader_fd_prepare(GSource *source,
88
LogReaderWatch *self = (LogReaderWatch *) source;
89
GIOCondition proto_cond;
91
self->pollfd.revents = 0;
92
self->pollfd.events = G_IO_ERR;
94
/* never indicate readability if flow control prevents us from sending messages */
96
if (!log_source_free_to_send(&self->reader->super))
99
if (log_proto_prepare(self->proto, &self->pollfd.fd, &proto_cond, timeout))
102
if (self->reader->immediate_check)
105
self->reader->immediate_check = FALSE;
109
if (self->reader->options->follow_freq > 0)
111
*timeout = self->reader->options->follow_freq;
115
self->pollfd.events |= proto_cond;
120
log_reader_fd_check(GSource *source)
122
LogReaderWatch *self = (LogReaderWatch *) source;
124
if (!log_source_free_to_send(&self->reader->super))
127
if (self->reader->options->follow_freq > 0)
129
struct stat st, followed_st;
131
gint fd = log_proto_get_fd(self->reader->proto);
133
/* FIXME: this is a _HUGE_ layering violation... */
136
pos = lseek(fd, 0, SEEK_CUR);
137
if (pos == (off_t) -1)
139
msg_error("Error invoking seek on followed file",
140
evt_tag_errno("error", errno),
145
if (fstat(fd, &st) < 0)
149
msg_trace("log_reader_fd_check file moved ESTALE",
150
evt_tag_str("follow_filename",self->reader->follow_filename),
152
log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_MOVED, self);
157
msg_error("Error invoking fstat() on followed file",
158
evt_tag_errno("error", errno),
164
msg_trace("log_reader_fd_check",
165
evt_tag_int("pos", pos),
166
evt_tag_int("size", st.st_size),
169
if (pos < st.st_size)
173
else if (pos == st.st_size)
175
log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_EOF, self);
177
else if (pos > st.st_size)
179
/* reopen the file */
180
log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_MOVED, self);
184
if (self->reader->follow_filename && stat(self->reader->follow_filename, &followed_st) != -1)
103
static gboolean log_reader_fetch_log(LogReader *self);
105
static gboolean log_reader_start_watches(LogReader *self);
106
static void log_reader_stop_watches(LogReader *self);
107
static void log_reader_update_watches(LogReader *self);
111
log_reader_work_perform(void *s)
113
LogReader *self = (LogReader *) s;
115
self->notify_code = log_reader_fetch_log(self);
119
log_reader_work_finished(void *s)
121
LogReader *self = (LogReader *) s;
123
if (self->notify_code == 0)
125
if (self->super.super.flags & PIF_INITIALIZED)
127
/* reenable polling the source assuming that we're still in
128
* business (e.g. the reader hasn't been uninitialized) */
130
log_reader_start_watches(self);
135
log_pipe_notify(self->control, &self->super.super, self->notify_code, self);
137
log_pipe_unref(&self->super.super);
141
log_reader_wakeup_triggered(gpointer s)
143
LogReader *self = (LogReader *) s;
145
if (!self->io_job.working && self->suspended)
147
/* NOTE: by the time working is set to FALSE we're over an
148
* update_watches call. So it is called either here (when
149
* work_finished has done its work) or from work_finished above. The
150
* two are not racing as both run in the main thread
152
log_reader_update_watches(self);
156
/* NOTE: may be running in the destination's thread, thus proper locking must be used */
158
log_reader_wakeup(LogSource *s)
160
LogReader *self = (LogReader *) s;
163
* We might get called even after this LogReader has been
164
* deinitialized, in which case we must not do anything (since the
165
* iv_event triggered here is not registered).
167
* This happens when log_writer_deinit() flushes its output queue
168
* after the reader which produced the message has already been
169
* deinited. Since init/deinit calls are made in the main thread, no
174
if (self->super.super.flags & PIF_INITIALIZED)
175
iv_event_post(&self->schedule_wakeup);
179
log_reader_io_process_input(gpointer s)
181
LogReader *self = (LogReader *) s;
183
log_reader_stop_watches(self);
184
log_pipe_ref(&self->super.super);
185
if ((self->options->flags & LR_THREADED))
187
main_loop_io_worker_job_submit(&self->io_job);
191
log_reader_work_perform(s);
192
log_reader_work_finished(s);
196
/* follow timer callback. Check if the file has new content, or deleted or
197
* moved. Runs every follow_freq milliseconds. */
199
log_reader_io_follow_file(gpointer s)
201
LogReader *self = (LogReader *) s;
202
struct stat st, followed_st;
204
gint fd = log_proto_get_fd(self->proto);
206
msg_trace("Checking if the followed file has new lines",
207
evt_tag_str("follow_filename", self->follow_filename),
211
pos = lseek(fd, 0, SEEK_CUR);
212
if (pos == (off_t) -1)
214
msg_error("Error invoking seek on followed file",
215
evt_tag_errno("error", errno),
220
if (fstat(fd, &st) < 0)
224
msg_trace("log_reader_fd_check file moved ESTALE",
225
evt_tag_str("follow_filename", self->follow_filename),
227
log_pipe_notify(self->control, &self->super.super, NC_FILE_MOVED, self);
232
msg_error("Error invoking fstat() on followed file",
233
evt_tag_errno("error", errno),
239
msg_trace("log_reader_fd_check",
240
evt_tag_int("pos", pos),
241
evt_tag_int("size", st.st_size),
244
if (pos < st.st_size)
246
/* we have data to read */
247
log_reader_io_process_input(s);
250
else if (pos == st.st_size)
253
log_pipe_notify(self->control, &self->super.super, NC_FILE_EOF, self);
255
else if (pos > st.st_size)
257
/* The last known position is larger than the current size of the file, so it got truncated. Restart from the beginning. */
258
log_pipe_notify(self->control, &self->super.super, NC_FILE_MOVED, self);
260
/* we may be freed by the time the notification above returns */
265
if (self->follow_filename)
267
if (stat(self->follow_filename, &followed_st) != -1)
186
269
if (fd < 0 || (st.st_ino != followed_st.st_ino && followed_st.st_size > 0))
188
271
msg_trace("log_reader_fd_check file moved eof",
189
272
evt_tag_int("pos", pos),
190
273
evt_tag_int("size", followed_st.st_size),
191
evt_tag_str("follow_filename",self->reader->follow_filename),
274
evt_tag_str("follow_filename", self->follow_filename),
193
276
/* file was moved and we are at EOF, follow the new file */
194
log_pipe_notify(self->reader->control, &self->reader->super.super, NC_FILE_MOVED, self);
277
log_pipe_notify(self->control, &self->super.super, NC_FILE_MOVED, self);
278
/* we may be freed by the time the notification above returns */
197
else if (self->reader->follow_filename)
201
g_source_get_current_time(source, &now);
202
if (g_time_val_diff(&now, &self->last_follow_freq_check) > self->reader->options->follow_freq * 1000)
204
msg_verbose("Follow mode file still does not exist",
205
evt_tag_str("filename", self->reader->follow_filename),
207
self->last_follow_freq_check = now;
284
msg_verbose("Follow mode file still does not exist",
285
evt_tag_str("filename", self->follow_filename),
213
return !!(self->pollfd.revents & (G_IO_IN | G_IO_OUT | G_IO_ERR | G_IO_HUP));
290
log_reader_update_watches(self);
294
log_reader_init_watches(LogReader *self)
299
log_proto_prepare(self->proto, &fd, &cond);
301
IV_FD_INIT(&self->fd_watch);
302
self->fd_watch.cookie = self;
304
IV_TIMER_INIT(&self->follow_timer);
305
self->follow_timer.cookie = self;
306
self->follow_timer.handler = log_reader_io_follow_file;
308
IV_TASK_INIT(&self->restart_task);
309
self->restart_task.cookie = self;
310
self->restart_task.handler = log_reader_io_process_input;
312
IV_EVENT_INIT(&self->schedule_wakeup);
313
self->schedule_wakeup.cookie = self;
314
self->schedule_wakeup.handler = log_reader_wakeup_triggered;
316
main_loop_io_worker_job_init(&self->io_job);
317
self->io_job.user_data = self;
318
self->io_job.work = (void (*)(void *)) log_reader_work_perform;
319
self->io_job.completion = (void (*)(void *)) log_reader_work_finished;
322
/* NOTE: the return value is only used during initialization, and it is not
323
* expected that it'd change once it returns success */
217
log_reader_fd_dispatch(GSource *source,
218
GSourceFunc callback,
325
log_reader_start_watches(LogReader *self)
221
LogReaderWatch *self = (LogReaderWatch *) source;
223
/* The window status can change between check() and dispatch()
224
* because multiple tcp connections can have messages ready
225
* from the same source at check() time, but the queue may
226
* fill before we dispatch() them all
228
if (!log_source_free_to_send(&self->reader->super))
231
if (!log_reader_fetch_log(self->reader, self->proto))
330
log_proto_prepare(self->proto, &fd, &cond);
332
if (self->pollable_state < 0 && fd >= 0)
333
self->pollable_state = iv_fd_pollable(fd);
335
if (self->options->follow_freq > 0)
337
/* follow_freq specified (only the file source does that), go into timed polling */
339
/* NOTE: the fd may not be set here, as it may not have been opened yet */
340
iv_timer_register(&self->follow_timer);
344
msg_error("In order to poll non-yet-existing files, follow_freq() must be set",
348
else if (self->pollable_state > 0)
350
/* we have an FD, it is possible to poll it, register it */
351
self->fd_watch.fd = fd;
352
iv_fd_register(&self->fd_watch);
356
msg_error("Unable to determine how to monitor this fd, follow_freq() not set and it is not possible to poll it with the current ivykis polling method, try changing IV_EXCLUDE_POLL_METHOD environment variable",
357
evt_tag_int("fd", fd),
362
log_reader_update_watches(self);
240
log_reader_fd_finalize(GSource *source)
368
log_reader_stop_watches(LogReader *self)
242
LogReaderWatch *self = (LogReaderWatch *) source;
243
log_proto_free(self->proto);
244
log_pipe_unref(&self->reader->super.super);
370
if (iv_fd_registered(&self->fd_watch))
371
iv_fd_unregister(&self->fd_watch);
372
if (iv_timer_registered(&self->follow_timer))
373
iv_timer_unregister(&self->follow_timer);
374
if (iv_task_registered(&self->restart_task))
375
iv_task_unregister(&self->restart_task);
247
GSourceFuncs log_reader_source_funcs =
379
log_reader_update_watches(LogReader *self)
249
log_reader_fd_prepare,
251
log_reader_fd_dispatch,
252
log_reader_fd_finalize
383
gboolean free_to_send;
255
static LogReaderWatch *
256
log_reader_watch_new(LogReader *reader, LogProto *proto)
258
LogReaderWatch *self = (LogReaderWatch *) g_source_new(&log_reader_source_funcs, sizeof(LogReaderWatch));
385
main_loop_assert_main_thread();
260
log_pipe_ref(&reader->super.super);
261
self->reader = reader;
263
g_source_set_priority(&self->super, LOG_PRIORITY_READER);
264
g_source_add_poll(&self->super, &self->pollfd);
387
self->suspended = FALSE;
388
free_to_send = log_source_free_to_send(&self->super);
390
self->immediate_check ||
391
log_proto_prepare(self->proto, &fd, &cond))
393
/* we disable all I/O related callbacks here because we either know
394
* that we can continue (e.g. immediate_check == TRUE) or we know
395
* that we can't continue even if data would be available (e.g.
396
* free_to_send == FALSE)
399
self->immediate_check = FALSE;
400
if (iv_fd_registered(&self->fd_watch))
402
iv_fd_set_handler_in(&self->fd_watch, NULL);
403
iv_fd_set_handler_out(&self->fd_watch, NULL);
405
/* we disable the error handler too, as it might be
406
* triggered even when we don't want to read data
407
* (e.g. log_source_free_to_send() is FALSE).
409
* And at least on Linux, it may happen that EPOLLERR is
410
* set, while there's still data in the socket buffer. Thus
411
* in reaction to an EPOLLERR, we could possibly send
412
* further messages without validating that
413
* log_source_free_to_send() would allow us to, potentially
414
* overflowing our window (and causing a failed assertion in
415
* log_source_queue()).
418
iv_fd_set_handler_err(&self->fd_watch, NULL);
421
if (iv_timer_registered(&self->follow_timer))
422
iv_timer_unregister(&self->follow_timer);
426
/* we have data in our input buffer, we need to start working
427
* on it immediately, without waiting for I/O events */
428
if (!iv_task_registered(&self->restart_task))
430
iv_task_register(&self->restart_task);
435
self->suspended = TRUE;
440
if (iv_fd_registered(&self->fd_watch))
442
/* this branch is executed when our fd is connected to a non-file
443
* source (e.g. TCP, UDP socket). We set up I/O callbacks here.
444
* files cannot be polled using epoll, as it causes an I/O error
445
* (thus abort in ivykis).
448
iv_fd_set_handler_in(&self->fd_watch, log_reader_io_process_input);
450
iv_fd_set_handler_in(&self->fd_watch, NULL);
453
iv_fd_set_handler_out(&self->fd_watch, log_reader_io_process_input);
455
iv_fd_set_handler_out(&self->fd_watch, NULL);
457
if (cond & (G_IO_IN + G_IO_OUT))
458
iv_fd_set_handler_err(&self->fd_watch, log_reader_io_process_input);
460
iv_fd_set_handler_err(&self->fd_watch, NULL);
465
if (self->options->follow_freq > 0)
467
if (iv_timer_registered(&self->follow_timer))
468
iv_timer_unregister(&self->follow_timer);
470
self->follow_timer.expires = iv_now;
471
timespec_add_msec(&self->follow_timer.expires, self->options->follow_freq);
472
iv_timer_register(&self->follow_timer);
476
/* NOTE: we don't need to unregister the timer here as follow_freq
477
* never changes during runtime, thus if ever it was registered that
478
* also means that we go into the if branch above. */
270
484
log_reader_handle_line(LogReader *self, const guchar *line, gint length, GSockAddr *saddr)
273
487
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
276
489
msg_debug("Incoming log entry",
277
490
evt_tag_printf("line", "%.*s", length, line),