72
static gboolean log_writer_flush(LogWriter *self, gboolean flush_all);
116
static gboolean log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode);
73
117
static void log_writer_broken(LogWriter *self, gint notify_code);
74
static gboolean log_writer_throttling(LogWriter *self);
78
log_writer_fd_prepare(GSource *source,
81
LogWriterWatch *self = (LogWriterWatch *) source;
82
gint64 num_elements = log_queue_get_length(self->writer->queue);
84
GIOCondition proto_cond;
86
self->pollfd.events = G_IO_ERR;
87
self->pollfd.revents = 0;
89
g_source_get_current_time(source, &now);
90
if (log_proto_prepare(self->proto, &self->pollfd.fd, &proto_cond, timeout))
93
/* recalculate buckets */
95
if (self->writer->options->throttle > 0)
118
static void log_writer_start_watches(LogWriter *self);
119
static void log_writer_stop_watches(LogWriter *self);
120
static void log_writer_update_watches(LogWriter *self);
121
static void log_writer_suspend(LogWriter *self);
124
log_writer_work_perform(gpointer s)
126
LogWriter *self = (LogWriter *) s;
128
g_assert((self->super.flags & PIF_INITIALIZED) != 0);
129
self->work_result = log_writer_flush(self, self->flush_waiting_for_timeout ? LW_FLUSH_BUFFER : LW_FLUSH_NORMAL);
133
log_writer_work_finished(gpointer s)
135
LogWriter *self = (LogWriter *) s;
137
main_loop_assert_main_thread();
138
self->flush_waiting_for_timeout = FALSE;
140
if (self->pending_proto_present)
100
/* throttling is enabled, calculate new buckets */
101
if (self->last_throttle_check.tv_sec != 0)
103
diff = g_time_val_diff(&now, &self->last_throttle_check);
108
self->last_throttle_check = now;
110
new_buckets = (self->writer->options->throttle * diff) / G_USEC_PER_SEC;
114
/* if new_buckets is zero, we don't save the current time as
115
* last_throttle_check. The reason is that new_buckets could be
116
* rounded to zero when only a minimal interval passes between
119
self->writer->throttle_buckets = MIN(self->writer->options->throttle, self->writer->throttle_buckets + new_buckets);
120
self->last_throttle_check = now;
142
/* pending proto is only set in the main thread, so no need to
143
* lock it before coming here. After we're syncing with the
144
* log_writer_reopen() call, quite possibly coming from a
145
* non-main thread. */
147
g_static_mutex_lock(&self->pending_proto_lock);
149
log_proto_free(self->proto);
151
self->proto = self->pending_proto;
152
self->pending_proto = NULL;
153
self->pending_proto_present = FALSE;
155
g_cond_signal(self->pending_proto_cond);
156
g_static_mutex_unlock(&self->pending_proto_lock);
124
if (G_UNLIKELY(self->error_suspend))
159
if (!self->work_result)
126
*timeout = g_time_val_diff(&self->error_suspend_target, &now) / 1000;
161
log_writer_broken(self, NC_WRITE_ERROR);
129
msg_notice("Error suspend timeout has elapsed, attempting to write again",
164
log_writer_suspend(self);
165
msg_notice("Suspending write operation because of an I/O error",
130
166
evt_tag_int("fd", log_proto_get_fd(self->proto)),
167
evt_tag_int("time_reopen", self->options->time_reopen),
132
self->error_suspend = FALSE;
141
if ((self->writer->options->flush_lines == 0 && (!log_writer_throttling(self->writer) && num_elements != 0)) ||
142
(self->writer->options->flush_lines > 0 && (!log_writer_throttling(self->writer) && num_elements >= self->writer->options->flush_lines)))
144
/* we need to flush our buffers */
145
self->pollfd.events |= proto_cond;
147
else if (num_elements && !log_writer_throttling(self->writer))
149
/* our buffer does not contain enough elements to flush, but we do not
150
* want to wait more than flush_timeout time */
152
if (!self->flush_waiting_for_timeout)
156
*timeout = self->writer->options->flush_timeout;
157
g_source_get_current_time(source, &self->flush_target);
158
g_time_val_add(&self->flush_target, *timeout * 1000);
159
self->flush_waiting_for_timeout = TRUE;
163
glong to = g_time_val_diff(&self->flush_target, &now) / 1000;
166
/* timeout elapsed, start polling again */
167
if (self->writer->flags & LW_ALWAYS_WRITABLE)
169
self->pollfd.events = proto_cond;
173
if ((self->super.flags & PIF_INITIALIZED) && self->proto)
175
/* reenable polling the source, but only if we're still initialized */
176
log_writer_start_watches(self);
180
log_pipe_unref(&self->super);
184
log_writer_io_flush_output(gpointer s)
186
LogWriter *self = (LogWriter *) s;
188
main_loop_assert_main_thread();
190
log_writer_stop_watches(self);
191
log_pipe_ref(&self->super);
192
if ((self->options->options & LWO_THREADED))
194
main_loop_io_worker_job_submit(&self->io_job);
198
log_writer_work_perform(s);
199
log_writer_work_finished(s);
204
log_writer_io_error(gpointer s)
206
LogWriter *self = (LogWriter *) s;
208
if (self->fd_watch.handler_out == NULL && self->fd_watch.handler_in == NULL)
210
msg_debug("POLLERR occurred while idle",
211
evt_tag_int("fd", log_proto_get_fd(self->proto)),
213
log_writer_broken(self, NC_WRITE_ERROR);
218
/* in case we have an error state but we also asked for read/write
219
* polling, the error should be handled by the I/O callback. But we
220
* need not call that explicitly as ivykis does that for us. */
222
log_writer_update_watches(self);
226
log_writer_io_check_eof(gpointer s)
228
LogWriter *self = (LogWriter *) s;
230
msg_error("EOF occurred while idle",
231
evt_tag_int("fd", log_proto_get_fd(self->proto)),
233
log_writer_broken(self, NC_CLOSE);
237
log_writer_error_suspend_elapsed(gpointer s)
239
LogWriter *self = (LogWriter *) s;
241
self->suspended = FALSE;
242
msg_notice("Error suspend timeout has elapsed, attempting to write again",
243
evt_tag_int("fd", log_proto_get_fd(self->proto)),
245
log_writer_update_watches(self);
249
log_writer_update_fd_callbacks(LogWriter *self, GIOCondition cond)
251
main_loop_assert_main_thread();
252
if (self->pollable_state > 0)
254
if (self->flags & LW_DETECT_EOF && (cond & G_IO_IN) == 0 && (cond & G_IO_OUT))
256
/* if output is enabled, and we're in DETECT_EOF mode, and input is
257
* not needed by the log protocol, install the eof check callback to
258
* destroy the connection if an EOF is received. */
260
iv_fd_set_handler_in(&self->fd_watch, log_writer_io_check_eof);
262
else if (cond & G_IO_IN)
264
/* in case the protocol requested G_IO_IN, it means that it needs to
265
* invoke read in the flush code, so just install the flush_output
266
* handler for input */
268
iv_fd_set_handler_in(&self->fd_watch, log_writer_io_flush_output);
272
/* otherwise we're not interested in input */
273
iv_fd_set_handler_in(&self->fd_watch, NULL);
276
iv_fd_set_handler_out(&self->fd_watch, log_writer_io_flush_output);
278
iv_fd_set_handler_out(&self->fd_watch, NULL);
280
iv_fd_set_handler_err(&self->fd_watch, log_writer_io_error);
284
/* fd is not pollable, assume it is always writable */
287
if (!iv_task_registered(&self->immed_io_task))
288
iv_task_register(&self->immed_io_task);
290
else if (iv_task_registered(&self->immed_io_task))
292
iv_task_unregister(&self->immed_io_task);
298
log_writer_arm_suspend_timer(LogWriter *self, void (*handler)(void *), gint timeout_msec)
300
if (iv_timer_registered(&self->suspend_timer))
301
iv_timer_unregister(&self->suspend_timer);
303
self->suspend_timer.handler = handler;
304
self->suspend_timer.expires = iv_now;
305
timespec_add_msec(&self->suspend_timer.expires, timeout_msec);
306
iv_timer_register(&self->suspend_timer);
310
log_writer_queue_filled(gpointer s)
312
LogWriter *self = (LogWriter *) s;
314
main_loop_assert_main_thread();
317
* NOTE: This theory is somewhat questionable, e.g. I'm not 100% sure it
318
* is the right scenario, but the race was closed. So take this with a
321
* The queue_filled callback is running in the main thread. Because of the
322
* possible delay caused by iv_event_post() the callback might be
323
* delivered even after stop_watches() has been called.
325
* - log_writer_schedule_update_watches() is called by the reader
326
* thread, which calls iv_event_post()
327
* - the main thread calls stop_watches() in work_perform
328
* - the event is delivered in the main thread
330
* But since stop/start watches always run in the main thread and we do
331
* too, we can check if this is the case. A LogWriter without watches
332
* running is busy writing out data to the destination, e.g. a
333
* start_watches is to be expected once log_writer_work_finished() is run
334
* at the end of the deferred work, executed by the I/O threads.
336
if (self->watches_running)
337
log_writer_update_watches((LogWriter *) s);
340
/* NOTE: runs in the source thread */
342
log_writer_schedule_update_watches(LogWriter *self)
344
iv_event_post(&self->queue_filled);
348
log_writer_suspend(LogWriter *self)
350
/* flush code indicates that we need to suspend our writing activities
351
* until time_reopen elapses */
353
log_writer_arm_suspend_timer(self, log_writer_error_suspend_elapsed, self->options->time_reopen * 1e3);
354
self->suspended = TRUE;
358
log_writer_update_watches(LogWriter *self)
361
GIOCondition cond = 0;
362
gboolean partial_batch;
363
gint timeout_msec = 0;
365
main_loop_assert_main_thread();
367
/* NOTE: we either start the suspend_timer or enable the fd_watch. The two MUST not happen at the same time. */
369
if (log_proto_prepare(self->proto, &fd, &cond) ||
370
self->flush_waiting_for_timeout ||
371
log_queue_check_items(self->queue, self->options->flush_lines, &partial_batch, &timeout_msec,
372
(LogQueuePushNotifyFunc) log_writer_schedule_update_watches, self, NULL))
374
/* flush_lines number of elements is already available and throttle would permit us to send. */
375
log_writer_update_fd_callbacks(self, cond);
377
else if (partial_batch || timeout_msec)
379
/* few elements are available, but less than flush_lines, we need to start a timer to initiate a flush */
381
log_writer_update_fd_callbacks(self, 0);
382
self->flush_waiting_for_timeout = TRUE;
383
log_writer_arm_suspend_timer(self, (void (*)(void *)) log_writer_update_watches, timeout_msec ? timeout_msec : self->options->flush_timeout);
387
/* no elements or no throttle space, wait for a wakeup by the queue
388
* when the required number of items are added. see the
389
* log_queue_check_items and its parallel_push argument above
391
log_writer_update_fd_callbacks(self, 0);
396
is_file_regular(gint fd)
400
if (fstat(fd, &st) >= 0)
402
return S_ISREG(st.st_mode);
405
/* if stat fails, that's interesting, but we should probably poll
406
* it, hopefully that's less likely to cause spinning */
412
log_writer_start_watches(LogWriter *self)
417
if (!self->watches_running)
419
log_proto_prepare(self->proto, &fd, &cond);
421
if (self->pollable_state < 0)
423
if (is_file_regular(fd))
424
self->pollable_state = 0;
180
if (num_elements && log_writer_throttling(self->writer))
182
/* we are unable to send because of throttling, make sure that we
183
* wake up when the rate limit lets us send at least 1 message */
184
*timeout = (1000 / self->writer->options->throttle) + 1;
185
msg_debug("Throttling output",
186
evt_tag_int("wait", *timeout),
191
if (self->writer->flags & LW_DETECT_EOF && (self->pollfd.events & G_IO_IN) == 0)
193
self->pollfd.events |= G_IO_HUP | G_IO_IN;
194
self->input_means_connection_broken = TRUE;
198
self->input_means_connection_broken = FALSE;
201
self->flush_waiting_for_timeout = FALSE;
203
if ((self->pollfd.events & G_IO_OUT) && (self->writer->flags & LW_ALWAYS_WRITABLE))
205
self->pollfd.revents = G_IO_OUT;
212
log_writer_fd_check(GSource *source)
214
LogWriterWatch *self = (LogWriterWatch *) source;
215
gint64 num_elements = log_queue_get_length(self->writer->queue);
217
if (self->error_suspend)
220
if (num_elements && !log_writer_throttling(self->writer))
222
/* we have data to flush */
223
if (self->flush_waiting_for_timeout)
227
/* check if timeout elapsed */
228
g_source_get_current_time(source, &tv);
229
if (!(self->flush_target.tv_sec <= tv.tv_sec || (self->flush_target.tv_sec == tv.tv_sec && self->flush_target.tv_usec <= tv.tv_usec)))
231
if ((self->writer->flags & LW_ALWAYS_WRITABLE))
235
return !!(self->pollfd.revents & (G_IO_OUT | G_IO_ERR | G_IO_HUP | G_IO_IN));
239
log_writer_fd_dispatch(GSource *source,
240
GSourceFunc callback,
243
LogWriterWatch *self = (LogWriterWatch *) source;
244
gint64 num_elements = log_queue_get_length(self->writer->queue);
246
if (self->pollfd.revents & (G_IO_HUP | G_IO_IN) && self->input_means_connection_broken)
248
msg_error("EOF occurred while idle",
249
evt_tag_int("fd", log_proto_get_fd(self->proto)),
251
log_writer_broken(self->writer, NC_CLOSE);
254
else if (self->pollfd.revents & (G_IO_ERR) && num_elements == 0)
256
msg_error("POLLERR occurred while idle",
257
evt_tag_int("fd", log_proto_get_fd(self->proto)),
259
log_writer_broken(self->writer, NC_WRITE_ERROR);
261
else if (num_elements)
263
if (!log_writer_flush(self->writer, FALSE))
265
self->error_suspend = TRUE;
266
g_source_get_current_time(source, &self->error_suspend_target);
267
g_time_val_add(&self->error_suspend_target, self->writer->options->time_reopen * 1e6);
269
log_writer_broken(self->writer, NC_WRITE_ERROR);
271
if (self->writer->source == (GSource *) self)
273
msg_notice("Suspending write operation because of an I/O error",
274
evt_tag_int("fd", log_proto_get_fd(self->proto)),
275
evt_tag_int("time_reopen", self->writer->options->time_reopen),
285
log_writer_fd_finalize(GSource *source)
287
LogWriterWatch *self = (LogWriterWatch *) source;
289
log_proto_free(self->proto);
290
log_pipe_unref(&self->writer->super);
293
GSourceFuncs log_writer_source_funcs =
295
log_writer_fd_prepare,
297
log_writer_fd_dispatch,
298
log_writer_fd_finalize
302
log_writer_watch_new(LogWriter *writer, LogProto *proto)
304
LogWriterWatch *self = (LogWriterWatch *) g_source_new(&log_writer_source_funcs, sizeof(LogWriterWatch));
306
self->writer = writer;
308
log_pipe_ref(&self->writer->super);
309
g_source_set_priority(&self->super, LOG_PRIORITY_WRITER);
311
if ((writer->flags & LW_ALWAYS_WRITABLE) == 0)
312
g_source_add_poll(&self->super, &self->pollfd);
317
log_writer_throttling(LogWriter *self)
319
return self->options->throttle > 0 && self->throttle_buckets == 0;
426
self->pollable_state = iv_fd_pollable(fd);
429
if (self->pollable_state)
431
self->fd_watch.fd = fd;
432
iv_fd_register(&self->fd_watch);
435
log_writer_update_watches(self);
436
self->watches_running = TRUE;
441
log_writer_stop_watches(LogWriter *self)
443
if (self->watches_running)
445
if (iv_timer_registered(&self->suspend_timer))
446
iv_timer_unregister(&self->suspend_timer);
447
if (iv_fd_registered(&self->fd_watch))
448
iv_fd_unregister(&self->fd_watch);
449
if (iv_task_registered(&self->immed_io_task))
450
iv_task_unregister(&self->immed_io_task);
452
log_queue_reset_parallel_push(self->queue);
454
self->watches_running = FALSE;
458
/* function called using main_loop_call() in case the suppress timer needs
461
log_writer_perform_suppress_timer_update(LogWriter *self)
463
main_loop_assert_main_thread();
465
if (iv_timer_registered(&self->suppress_timer))
466
iv_timer_unregister(&self->suppress_timer);
467
g_static_mutex_lock(&self->suppress_lock);
468
self->suppress_timer.expires = self->suppress_timer_expires;
469
self->suppress_timer_updated = TRUE;
470
g_static_mutex_unlock(&self->suppress_lock);
471
if (self->suppress_timer.expires.tv_sec > 0)
472
iv_timer_register(&self->suppress_timer);
473
log_pipe_unref(&self->super);
477
* Update the suppress timer in a deferred manner, possibly batching the
478
* results of multiple updates to the suppress timer. This is necessary as
479
* suppress timer updates must run in the main thread, and updating it every
480
* time a new message comes in would cause enormous latency in the fast
481
* path. By collecting multiple updates
483
* msec == 0 means to turn off the suppress timer
484
* msec > 0 to enable the timer with the specified timeout
486
* NOTE: suppress_lock must be held.
489
log_writer_update_suppress_timer(LogWriter *self, glong sec)
492
struct timespec next_expires;
496
/* we deliberately use nsec == 0 in order to increase the likelihood that
497
* we target the same second, in case only a fraction of a second has
498
* passed between two updates. */
501
next_expires.tv_nsec = 0;
502
next_expires.tv_sec = iv_now.tv_sec + sec;
506
next_expires.tv_sec = 0;
507
next_expires.tv_nsec = 0;
509
/* last update was finished, we need to invoke the updater again */
510
invoke = ((next_expires.tv_sec != self->suppress_timer_expires.tv_sec) || (next_expires.tv_nsec != self->suppress_timer_expires.tv_nsec)) && self->suppress_timer_updated;
511
self->suppress_timer_updated = FALSE;
515
self->suppress_timer_expires = next_expires;
516
g_static_mutex_unlock(&self->suppress_lock);
517
log_pipe_ref(&self->super);
518
main_loop_call((void *(*)(void *)) log_writer_perform_suppress_timer_update, self, FALSE);
519
g_static_mutex_lock(&self->suppress_lock);
525
* NOTE: suppress_lock must be held.
323
528
log_writer_last_msg_release(LogWriter *self)
325
if (self->last_msg_timerid)
326
g_source_remove(self->last_msg_timerid);
530
log_writer_update_suppress_timer(self, 0);
328
531
if (self->last_msg)
329
532
log_msg_unref(self->last_msg);
331
534
self->last_msg = NULL;
332
535
self->last_msg_count = 0;
333
self->last_msg_timerid = 0;
539
* NOTE: suppress_lock must be held.
337
542
log_writer_last_msg_flush(LogWriter *self)
810
1130
log_writer_free(LogPipe *s)
812
1132
LogWriter *self = (LogWriter *) s;
1135
log_proto_free(self->proto);
1137
if (self->line_buffer)
1138
g_string_free(self->line_buffer, TRUE);
814
1139
if (self->queue)
815
log_queue_free(self->queue);
816
log_writer_last_msg_release(self);
1140
log_queue_unref(self->queue);
1142
log_msg_unref(self->last_msg);
817
1143
g_free(self->stats_id);
818
1144
g_free(self->stats_instance);
1145
g_static_mutex_free(&self->suppress_lock);
1146
g_static_mutex_free(&self->pending_proto_lock);
1147
g_cond_free(self->pending_proto_cond);
819
1149
log_pipe_free_method(s);
1152
/* FIXME: this is inherently racy */
1154
log_writer_has_pending_writes(LogWriter *self)
1156
return log_queue_get_length(self->queue) > 0 || !self->watches_running;
1160
log_writer_opened(LogWriter *self)
1162
return self->proto != NULL;
1165
/* run in the main thread in reaction to a log_writer_reopen to change
1166
* the destination LogProto instance. It needs to be run in the main
1167
* thread as it reregisters the watches associated with the main
1170
log_writer_reopen_deferred(gpointer s)
1172
gpointer *args = (gpointer *) s;
1173
LogWriter *self = args[0];
1174
LogProto *proto = args[1];
1176
init_sequence_number(&self->seq_num);
1178
if (self->io_job.working)
1180
/* NOTE: proto can be NULL */
1181
self->pending_proto = proto;
1182
self->pending_proto_present = TRUE;
1186
log_writer_stop_watches(self);
1189
log_proto_free(self->proto);
1191
self->proto = proto;
1194
log_writer_start_watches(self);
1198
* This function can be called from any thread, from the main thread
1199
* as well as I/O worker threads. It takes care of going to the
1200
* main thread to actually switch LogProto under this writer.
1202
* The writer may still be operating, (e.g. log_pipe_deinit/init is
1205
* In case we're running in a non-main thread, then by the time this
1206
* function returns, the reopen has finished. In case it is called
1207
* from the main thread, this function may defer updating self->proto
1208
* until the worker thread has finished. The reason for this
1211
* - if LogWriter is busy, then updating the LogProto instance is
1212
* deferred to log_writer_work_finished(), but that runs in the
1215
* - normally, even this deferred update is waited for, but in case
1216
* we're in the main thread, we can't block.
1218
* This situation could probably be improved, maybe the synchronous
1219
* return of log_writer_reopen() is not needed by call sites, but I
1220
* was not sure, and right before release I didn't want to take the
823
1224
log_writer_reopen(LogPipe *s, LogProto *proto)
825
1226
LogWriter *self = (LogWriter *) s;
827
/* old fd is freed by the source */
830
g_source_destroy(self->source);
831
g_source_unref(self->source);
837
self->source = log_writer_watch_new(self, proto);
838
g_source_attach(self->source, NULL);
840
init_sequence_number(&self->seq_num);
1227
gpointer args[] = { s, proto };
1229
main_loop_call((MainLoopTaskFunc) log_writer_reopen_deferred, args, TRUE);
1231
if (!main_loop_is_main_thread())
1233
g_static_mutex_lock(&self->pending_proto_lock);
1234
while (self->pending_proto_present)
1236
g_cond_wait(self->pending_proto_cond, g_static_mutex_get_mutex(&self->pending_proto_lock));
1238
g_static_mutex_unlock(&self->pending_proto_lock);