2
* Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
3
* Copyright (c) 1998-2010 Balázs Scheidler
5
* This program is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 as published
7
* by the Free Software Foundation, or (at your option) any later version.
9
* This library is distributed in the hope that it will be useful,
10
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
* Lesser General Public License for more details.
14
* You should have received a copy of the GNU Lesser General Public
15
* License along with this library; if not, write to the Free Software
16
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
* As an additional exemption you are allowed to compile & link against the
19
* OpenSSL libraries as published by the OpenSSL project. See the file
20
* COPYING for details.
28
#include "logwriter.h"
30
#include "tlstransport.h"
39
#include <sys/types.h>
40
#include <sys/socket.h>
41
#include <netinet/in.h>
43
#include <arpa/inet.h>
47
/* Globals that exist only when TCP wrapper support is compiled in.
 * NOTE(review): presumably these are the severity variables libwrap expects
 * the host program to define -- confirm against tcpd.h / hosts_access(3).
 * The matching #endif is not visible in this listing. */
#if ENABLE_TCP_WRAPPER
49
int allow_severity = 0;
50
int deny_severity = 0;
55
/* Per-connection state of a socket source.
 * Only the `owner` back-pointer (the driver this connection belongs to) is
 * visible in this listing; the code in this file also accesses the members
 * super, sock, peer_addr and reader, whose declarations are not shown. */
typedef struct _AFSocketSourceConnection
58
struct _AFSocketSourceDriver *owner;
62
} AFSocketSourceConnection;
64
/* Forward declaration: afsocket_sc_notify() calls this before it is defined. */
static void afsocket_sd_close_connection(AFSocketSourceDriver *self, AFSocketSourceConnection *sc);
67
/* Apply the configured socket options to `fd`.
 *
 * fd:           socket to configure
 * sock_options: option values (rcvbuf, sndbuf, broadcast, keepalive)
 * dir:          bitmask of AFSOCKET_DIR_RECV / AFSOCKET_DIR_SEND
 *
 * Buffer sizes and the broadcast flag are only applied when non-zero.
 * None of the visible setsockopt() calls has its return value checked.
 * NOTE(review): braces and the return statement are missing from this
 * listing; callers use the result as a boolean -- confirm what is returned
 * and which direction branch SO_BROADCAST belongs to. */
afsocket_setup_socket(gint fd, SocketOptions *sock_options, AFSocketDirection dir)
69
if (dir & AFSOCKET_DIR_RECV)
71
if (sock_options->rcvbuf)
72
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sock_options->rcvbuf, sizeof(sock_options->rcvbuf));
74
if (dir & AFSOCKET_DIR_SEND)
76
if (sock_options->sndbuf)
77
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sock_options->sndbuf, sizeof(sock_options->sndbuf));
78
if (sock_options->broadcast)
79
setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &sock_options->broadcast, sizeof(sock_options->broadcast));
81
/* keepalive is passed to the kernel without a non-zero guard */
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &sock_options->keepalive, sizeof(sock_options->keepalive));
86
/* Create a socket in the address family of `bind_addr` and bind it.
 *
 * bind_addr:       address to bind to; its sa_family selects the domain
 * stream_or_dgram: presumably selects SOCK_STREAM vs. SOCK_DGRAM (the
 *                  condition line is not visible -- confirm)
 * fd:              NOTE(review): presumably receives the new descriptor;
 *                  the assignment is not visible in this listing
 *
 * The socket is made non-blocking and close-on-exec.  CAP_NET_BIND_SERVICE
 * and CAP_DAC_OVERRIDE are raised around g_bind() and the saved capability
 * set is restored afterwards on the visible paths.
 *
 * NOTE(review): on bind failure errno is logged after
 * g_process_cap_restore() has run; if that call can change errno the
 * reported error may be wrong -- confirm. */
afsocket_open_socket(GSockAddr *bind_addr, int stream_or_dgram, int *fd)
91
sock = socket(bind_addr->sa.sa_family, SOCK_STREAM, 0);
93
sock = socket(bind_addr->sa.sa_family, SOCK_DGRAM, 0);
99
g_fd_set_nonblock(sock, TRUE);
100
g_fd_set_cloexec(sock, TRUE);
101
saved_caps = g_process_cap_save();
102
g_process_cap_modify(CAP_NET_BIND_SERVICE, TRUE);
103
g_process_cap_modify(CAP_DAC_OVERRIDE, TRUE);
104
if (g_bind(sock, bind_addr) != G_IO_STATUS_NORMAL)
108
g_process_cap_restore(saved_caps);
109
msg_error("Error binding socket",
110
evt_tag_str("addr", g_sockaddr_format(bind_addr, buf, sizeof(buf), GSA_FULL)),
111
evt_tag_errno(EVT_TAG_OSERROR, errno),
116
g_process_cap_restore(saved_caps);
123
msg_error("Error creating socket",
124
evt_tag_errno(EVT_TAG_OSERROR, errno),
131
/* Pick the statistics source constant (SCS_*) for a source connection.
 *
 * When the owner is not using the syslog protocol, the constant is chosen
 * from the bind address family and from whether AFSOCKET_STREAM is set
 * (UNIX stream/dgram, TCP/UDP, TCP6/UDP6).  An unexpected family hits
 * g_assert_not_reached().
 * NOTE(review): the case labels, the syslog-protocol branch and the return
 * statement are not visible in this listing. */
afsocket_sc_stats_source(AFSocketSourceConnection *self)
135
if ((self->owner->flags & AFSOCKET_SYSLOG_PROTOCOL) == 0)
137
switch (self->owner->bind_addr->sa.sa_family)
140
source = !!(self->owner->flags & AFSOCKET_STREAM) ? SCS_UNIX_STREAM : SCS_UNIX_DGRAM;
143
source = !!(self->owner->flags & AFSOCKET_STREAM) ? SCS_TCP : SCS_UDP;
147
source = !!(self->owner->flags & AFSOCKET_STREAM) ? SCS_TCP6 : SCS_UDP6;
151
g_assert_not_reached();
163
/* Build the statistics instance name of a source connection.
 *
 * Without the syslog protocol flag the name is the peer address alone;
 * otherwise it is "<transport>,<peer address>".  The text is formatted into
 * a function-static buffer.
 * NOTE(review): the return statements (including the one for a missing
 * peer_addr) are not visible; if the static buffer is what is returned, the
 * result is only valid until the next call -- confirm. */
afsocket_sc_stats_instance(AFSocketSourceConnection *self)
165
static gchar buf[256];
167
if (!self->peer_addr)
171
if ((self->owner->flags & AFSOCKET_SYSLOG_PROTOCOL) == 0)
173
g_sockaddr_format(self->peer_addr, buf, sizeof(buf), GSA_ADDRESS_ONLY);
177
gchar peer_addr[MAX_SOCKADDR_STRING];
179
g_sockaddr_format(self->peer_addr, peer_addr, sizeof(peer_addr), GSA_ADDRESS_ONLY);
180
g_snprintf(buf, sizeof(buf), "%s,%s", self->owner->transport, peer_addr);
186
/* LogPipe init method of a source connection: build the transport, the
 * protocol handler and the LogReader for the connection's socket.
 *
 * - Datagram owners read with LTF_RECV.
 * - A TLS transport is used when the owner has a tls_context, a plain one
 *   otherwise.
 * - Legacy (non syslog-protocol) mode uses a dgram, record (when padding is
 *   configured) or text server proto; syslog-protocol mode uses a dgram or
 *   framed server proto.
 * - The reader is configured with the owner's reader options, the peer
 *   address, is appended in front of this pipe and then initialised.
 *
 * NOTE(review): the else lines, the handling of a failed TLS session setup
 * and the return statements are not visible in this listing; the reader is
 * unreferenced on one of the outcomes of log_pipe_init() -- confirm which. */
afsocket_sc_init(LogPipe *s)
188
AFSocketSourceConnection *self = (AFSocketSourceConnection *) s;
190
LogTransport *transport;
193
read_flags = ((self->owner->flags & AFSOCKET_DGRAM) ? LTF_RECV : 0);
197
if (self->owner->tls_context)
199
TLSSession *tls_session = tls_context_setup_session(self->owner->tls_context);
202
transport = log_transport_tls_new(tls_session, self->sock, read_flags);
206
transport = log_transport_plain_new(self->sock, read_flags);
208
if ((self->owner->flags & AFSOCKET_SYSLOG_PROTOCOL) == 0)
212
if (self->owner->flags & AFSOCKET_DGRAM)
213
proto = log_proto_dgram_server_new(transport, self->owner->reader_options.msg_size, 0);
214
else if (self->owner->reader_options.padding)
215
proto = log_proto_record_server_new(transport, self->owner->reader_options.padding, 0);
217
proto = log_proto_text_server_new(transport, self->owner->reader_options.msg_size, 0);
221
if (self->owner->flags & AFSOCKET_DGRAM)
224
proto = log_proto_dgram_server_new(transport, self->owner->reader_options.msg_size, 0);
228
/* framed protocol */
229
proto = log_proto_framed_server_new(transport, self->owner->reader_options.msg_size);
233
self->reader = log_reader_new(proto);
235
log_reader_set_options(self->reader, s, &self->owner->reader_options, 1, afsocket_sc_stats_source(self), self->owner->super.super.id, afsocket_sc_stats_instance(self));
236
log_reader_set_peer_addr(self->reader, self->peer_addr);
237
log_pipe_append(self->reader, s);
238
if (log_pipe_init(self->reader, NULL))
244
log_pipe_unref(self->reader);
251
/* LogPipe deinit method of a source connection: drops a reference on the
 * owning driver and deinitialises the reader.
 * NOTE(review): lines between the visible statements (e.g. clearing
 * self->owner) and the return value are missing from this listing. */
afsocket_sc_deinit(LogPipe *s)
253
AFSocketSourceConnection *self = (AFSocketSourceConnection *) s;
255
log_pipe_unref(&self->owner->super.super.super);
258
log_pipe_deinit(self->reader);
263
/* LogPipe notify method of a source connection.  For stream owners the
 * connection is closed through afsocket_sd_close_connection().
 * NOTE(review): the notify_code dispatch that guards this is not visible in
 * this listing -- confirm which codes trigger the close. */
afsocket_sc_notify(LogPipe *s, LogPipe *sender, gint notify_code, gpointer user_data)
265
AFSocketSourceConnection *self = (AFSocketSourceConnection *) s;
272
if (self->owner->flags & AFSOCKET_STREAM)
273
afsocket_sd_close_connection(self->owner, self);
280
/* Re-attach a connection to the driver `owner`: a reference on the previous
 * owner is dropped, a reference on the new owner is taken, and the
 * connection pipe is appended in front of the new owner's pipe.
 * NOTE(review): the guard around the unref of the old owner and the
 * assignment of self->owner are not visible in this listing. */
afsocket_sc_set_owner(AFSocketSourceConnection *self, AFSocketSourceDriver *owner)
284
log_pipe_unref(&self->owner->super.super.super);
287
log_pipe_ref(&owner->super.super.super);
289
log_pipe_append(&self->super, &owner->super.super.super);
294
This should be called by log_reader_free -> log_pipe_unref
295
because this is the control pipe of the reader
298
/* LogPipe free_fn of a source connection: releases the peer address
 * reference and then runs the generic LogPipe free method. */
afsocket_sc_free(LogPipe *s)
300
AFSocketSourceConnection *self = (AFSocketSourceConnection *) s;
301
g_sockaddr_unref(self->peer_addr);
302
log_pipe_free_method(s);
305
/* Allocate a zero-initialised source connection for `owner`.
 *
 * Installs the init/deinit/notify/free_fn methods, takes a reference on the
 * owner's pipe and on `peer_addr`.
 * NOTE(review): the assignments of self->owner and self->sock (from `fd`)
 * and the return statement are not visible in this listing. */
AFSocketSourceConnection *
306
afsocket_sc_new(AFSocketSourceDriver *owner, GSockAddr *peer_addr, int fd)
308
AFSocketSourceConnection *self = g_new0(AFSocketSourceConnection, 1);
310
log_pipe_init_instance(&self->super);
311
self->super.init = afsocket_sc_init;
312
self->super.deinit = afsocket_sc_deinit;
313
self->super.notify = afsocket_sc_notify;
314
self->super.free_fn = afsocket_sc_free;
315
log_pipe_ref(&owner->super.super.super);
319
self->peer_addr = g_sockaddr_ref(peer_addr);
325
/* Replace the source driver's transport name with a private copy of
 * `transport`; the previously stored string is freed first. */
afsocket_sd_set_transport(LogDriver *s, const gchar *transport)
327
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
330
g_free(self->transport);
331
self->transport = g_strdup(transport);
335
/* Record `connection` at the head of the driver's connection list. */
afsocket_sd_add_connection(AFSocketSourceDriver *self, AFSocketSourceConnection *connection)
337
self->connections = g_list_prepend(self->connections,connection);
341
/* Unlink `connection` from the driver's list, deinitialise it, drop the
 * reference it holds on its reader (clearing the pointer) and finally drop
 * the list's reference on the connection itself. */
afsocket_sd_remove_and_kill_connection(AFSocketSourceDriver *self, AFSocketSourceConnection *connection)
343
self->connections = g_list_remove(self->connections, connection);
345
log_pipe_deinit(&connection->super);
347
/* Remove the circular reference between the connection and its
348
* reader (through the connection->reader and reader->control
349
* pointers these have a circular references).
351
log_pipe_unref(connection->reader);
352
connection->reader = NULL;
354
log_pipe_unref(&connection->super);
358
/* Tear down every connection in `list` via
 * afsocket_sd_remove_and_kill_connection(), using each connection's own
 * owner.  The loop advances through a separately kept `next` pointer.
 * NOTE(review): the assignment of `next` is not visible in this listing;
 * presumably it is saved before the current node is removed -- confirm. */
afsocket_sd_kill_connection_list(GList *list)
362
for (l = list; l; l = next)
364
AFSocketSourceConnection *connection = (AFSocketSourceConnection *) l->data;
367
afsocket_sd_remove_and_kill_connection(connection->owner, connection);
372
/* Set or clear AFSOCKET_KEEP_ALIVE in the source driver's flags.
 * NOTE(review): the condition on `enable` that selects between the two
 * statements is not visible in this listing. */
afsocket_sd_set_keep_alive(LogDriver *s, gint enable)
374
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
377
self->flags |= AFSOCKET_KEEP_ALIVE;
379
self->flags &= ~AFSOCKET_KEEP_ALIVE;
383
/* Store the connection limit of the source driver.  The value is not
 * validated here; afsocket_sd_init() later divides by it. */
afsocket_sd_set_max_connections(LogDriver *s, gint max_connections)
385
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
387
self->max_connections = max_connections;
392
/* Store `tls_context` in the source driver.  The pointer is stored as-is;
 * no reference is taken and any previous value is not released here. */
afsocket_sd_set_tls_context(LogDriver *s, TLSContext *tls_context)
394
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
396
self->tls_context = tls_context;
400
/* Format the persist-config key for this source driver.
 *
 * listener_name: TRUE selects the "afsocket_sd_listen_fd(...)" key, FALSE
 *                the "afsocket_sd_connections(...)" key.
 *
 * The key embeds "stream"/"dgram" and the full bind address and is written
 * into a function-static buffer.
 * NOTE(review): the return statement is not visible; if the static buffer
 * is returned, the result is overwritten by the next call -- confirm. */
static inline gchar *
401
afsocket_sd_format_persist_name(AFSocketSourceDriver *self, gboolean listener_name)
403
static gchar persist_name[128];
406
g_snprintf(persist_name, sizeof(persist_name),
407
listener_name ? "afsocket_sd_listen_fd(%s,%s)" : "afsocket_sd_connections(%s,%s)",
408
!!(self->flags & AFSOCKET_STREAM) ? "stream" : "dgram",
409
g_sockaddr_format(self->bind_addr, buf, sizeof(buf), GSA_FULL));
414
/* Admit a new connection on `fd` to the source driver.
 *
 * client_addr: peer address; may be NULL (afsocket_sd_init() passes NULL
 *              for the datagram case)
 * local_addr:  local address, used for log messages
 * fd:          connected / bound socket
 *
 * With TCP wrapper support, AF_INET (and AF_INET6) peers are checked with
 * hosts_access() and rejected with an error message when denied.  A
 * connection is also refused once num_connections has reached
 * max_connections.  Otherwise a connection object is created and
 * initialised; on success it is added to the list, counted and linked in
 * front of the driver's pipe.
 *
 * NOTE(review): return statements, the #endif lines and the failure branch
 * around the final log_pipe_unref() are not visible in this listing. */
afsocket_sd_process_connection(AFSocketSourceDriver *self, GSockAddr *client_addr, GSockAddr *local_addr, gint fd)
416
gchar buf[MAX_SOCKADDR_STRING], buf2[MAX_SOCKADDR_STRING];
417
#if ENABLE_TCP_WRAPPER
418
if (client_addr && (client_addr->sa.sa_family == AF_INET
420
|| client_addr->sa.sa_family == AF_INET6
424
struct request_info req;
426
request_init(&req, RQ_DAEMON, "syslog-ng", RQ_FILE, fd, 0);
428
if (hosts_access(&req) == 0)
431
msg_error("Syslog connection rejected by tcpd",
432
evt_tag_str("client", g_sockaddr_format(client_addr, buf, sizeof(buf), GSA_FULL)),
433
evt_tag_str("local", g_sockaddr_format(local_addr, buf2, sizeof(buf2), GSA_FULL)),
441
if (self->num_connections >= self->max_connections)
443
msg_error("Number of allowed concurrent connections reached, rejecting connection",
444
evt_tag_str("client", g_sockaddr_format(client_addr, buf, sizeof(buf), GSA_FULL)),
445
evt_tag_str("local", g_sockaddr_format(local_addr, buf2, sizeof(buf2), GSA_FULL)),
446
evt_tag_int("max", self->max_connections),
452
AFSocketSourceConnection *conn;
454
conn = afsocket_sc_new(self, client_addr, fd);
455
if (log_pipe_init(&conn->super, NULL))
457
afsocket_sd_add_connection(self,conn);
458
self->num_connections++;
459
log_pipe_append(&conn->super, &self->super.super.super);
463
log_pipe_unref(&conn->super);
470
/* Upper bound on the number of connections accepted in one invocation of
 * afsocket_sd_accept(). */
#define MAX_ACCEPTS_AT_A_TIME 30
473
/* Readiness handler of the listening socket (registered as handler_in in
 * afsocket_sd_start_watches()).
 *
 * Accepts pending connections in a loop bounded by MAX_ACCEPTS_AT_A_TIME.
 * G_IO_STATUS_AGAIN means nothing is left to accept; any other non-normal
 * status is logged.  Each accepted fd is passed to the optional
 * setup_socket hook, made non-blocking and close-on-exec, and handed to
 * afsocket_sd_process_connection().  The acceptance is logged at notice
 * level for non-UNIX peers and at verbose level otherwise; the peer address
 * reference is dropped afterwards.
 *
 * NOTE(review): loop exits, the increment of `accepts`, the handling of a
 * failed setup_socket/process_connection (closing new_fd) are not visible
 * in this listing. */
afsocket_sd_accept(gpointer s)
475
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
476
GSockAddr *peer_addr;
477
gchar buf1[256], buf2[256];
482
while (accepts < MAX_ACCEPTS_AT_A_TIME)
486
status = g_accept(self->fd, &new_fd, &peer_addr);
487
if (status == G_IO_STATUS_AGAIN)
489
/* no more connections to accept */
492
else if (status != G_IO_STATUS_NORMAL)
494
msg_error("Error accepting new connection",
495
evt_tag_errno(EVT_TAG_OSERROR, errno),
499
if (self->setup_socket && !self->setup_socket(self, new_fd))
505
g_fd_set_nonblock(new_fd, TRUE);
506
g_fd_set_cloexec(new_fd, TRUE);
508
res = afsocket_sd_process_connection(self, peer_addr, self->bind_addr, new_fd);
512
if (peer_addr->sa.sa_family != AF_UNIX)
513
msg_notice("Syslog connection accepted",
514
evt_tag_int("fd", new_fd),
515
evt_tag_str("client", g_sockaddr_format(peer_addr, buf1, sizeof(buf1), GSA_FULL)),
516
evt_tag_str("local", g_sockaddr_format(self->bind_addr, buf2, sizeof(buf2), GSA_FULL)),
519
msg_verbose("Syslog connection accepted",
520
evt_tag_int("fd", new_fd),
521
evt_tag_str("client", g_sockaddr_format(peer_addr, buf1, sizeof(buf1), GSA_FULL)),
522
evt_tag_str("local", g_sockaddr_format(self->bind_addr, buf2, sizeof(buf2), GSA_FULL)),
530
g_sockaddr_unref(peer_addr);
537
/* Close the source connection `sc` belonging to driver `self`.
 *
 * Logs "Syslog connection closed" (notice for non-UNIX peers, verbose
 * otherwise), deinitialises the connection, removes and destroys it, and
 * decrements the driver's connection counter.
 *
 * sc->peer_addr is dereferenced without a NULL check.
 * NOTE(review): log_pipe_deinit() is called here and again inside
 * afsocket_sd_remove_and_kill_connection(); confirm that a second deinit
 * of the same pipe is harmless. */
afsocket_sd_close_connection(AFSocketSourceDriver *self, AFSocketSourceConnection *sc)
539
gchar buf1[MAX_SOCKADDR_STRING], buf2[MAX_SOCKADDR_STRING];
541
if (sc->peer_addr->sa.sa_family != AF_UNIX)
542
msg_notice("Syslog connection closed",
543
evt_tag_int("fd", sc->sock),
544
evt_tag_str("client", g_sockaddr_format(sc->peer_addr, buf1, sizeof(buf1), GSA_FULL)),
545
evt_tag_str("local", g_sockaddr_format(self->bind_addr, buf2, sizeof(buf2), GSA_FULL)),
548
msg_verbose("Syslog connection closed",
549
evt_tag_int("fd", sc->sock),
550
evt_tag_str("client", g_sockaddr_format(sc->peer_addr, buf1, sizeof(buf1), GSA_FULL)),
551
evt_tag_str("local", g_sockaddr_format(self->bind_addr, buf2, sizeof(buf2), GSA_FULL)),
553
log_pipe_deinit(&sc->super);
554
afsocket_sd_remove_and_kill_connection(self, sc);
555
self->num_connections--;
559
/* Register the listening descriptor (self->fd) with the event loop so that
 * afsocket_sd_accept() runs, with the driver as cookie, when it becomes
 * readable. */
afsocket_sd_start_watches(AFSocketSourceDriver *self)
561
IV_FD_INIT(&self->listen_fd);
562
self->listen_fd.fd = self->fd;
563
self->listen_fd.cookie = self;
564
self->listen_fd.handler_in = afsocket_sd_accept;
565
iv_fd_register(&self->listen_fd);
569
/* Unregister the listening descriptor from the event loop, if it is
 * currently registered. */
afsocket_sd_stop_watches(AFSocketSourceDriver *self)
571
if (iv_fd_registered (&self->listen_fd))
572
iv_fd_unregister(&self->listen_fd);
576
/* LogPipe init method of the socket source driver.
 *
 * Visible steps:
 *  1. Run the generic source-driver init and apply the transport; the
 *     transport name and bind address must be set afterwards (asserted).
 *  2. For stream sources, once (guarded by AFSOCKET_WNDSIZE_INITED), divide
 *     the initial window size by max_connections and clamp it to at least
 *     100, emitting a warning when clamping.
 *  3. Initialise the reader options.
 *  4. With keep-alive, fetch connections preserved in the persist config,
 *     re-own and re-initialise them.
 *  5. Stream: reuse a persisted listener fd (stored as fd + 1) or acquire /
 *     open a new socket, listen(), run the setup_socket hook and start the
 *     accept watch.  Datagram: when no connection was restored, acquire /
 *     open a socket, run setup_socket and create the single connection.
 *
 * When acquiring or opening the socket fails, the driver's `optional`
 * setting is returned.
 *
 * NOTE(review): the division in step 2 uses max_connections unchecked; a
 * configured value of 0 would divide by zero -- confirm it is validated
 * elsewhere.
 * NOTE(review): the "orig_log_iw_size" tag is evaluated after the division,
 * so it reports the divided value rather than the configured one.
 * NOTE(review): else branches, error-path cleanup and return statements are
 * not visible in this listing. */
afsocket_sd_init(LogPipe *s)
578
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
580
gboolean res = FALSE;
581
GlobalConfig *cfg = log_pipe_get_config(s);
583
if (!log_src_driver_init_method(s))
586
if (!afsocket_sd_apply_transport(self))
589
g_assert(self->transport);
590
g_assert(self->bind_addr);
592
if ((self->flags & (AFSOCKET_STREAM + AFSOCKET_WNDSIZE_INITED)) == AFSOCKET_STREAM)
594
/* distribute the window evenly between each of our possible
595
* connections. This is quite pessimistic and can result in very low
596
* window sizes. Increase that but warn the user at the same time
599
self->reader_options.super.init_window_size /= self->max_connections;
600
if (self->reader_options.super.init_window_size < 100)
602
msg_warning("WARNING: window sizing for tcp sources were changed in syslog-ng 3.3, the configuration value was divided by the value of max-connections(). The result was too small, clamping to 100 entries. Ensure you have a proper log_fifo_size setting to avoid message loss.",
603
evt_tag_int("orig_log_iw_size", self->reader_options.super.init_window_size),
604
evt_tag_int("new_log_iw_size", 100),
605
evt_tag_int("min_log_fifo_size", 100 * self->max_connections),
607
self->reader_options.super.init_window_size = 100;
609
self->flags |= AFSOCKET_WNDSIZE_INITED;
611
log_reader_options_init(&self->reader_options, cfg, self->super.super.group);
613
/* fetch persistent connections first */
614
if ((self->flags & AFSOCKET_KEEP_ALIVE))
618
self->connections = cfg_persist_config_fetch(cfg, afsocket_sd_format_persist_name(self, FALSE));
620
for (p = self->connections; p; p = p->next)
622
afsocket_sc_set_owner((AFSocketSourceConnection *) p->data, self);
623
log_pipe_init((LogPipe *) p->data, NULL);
627
/* ok, we have connection list, check if we need to open a listener */
629
if (self->flags & AFSOCKET_STREAM)
631
if (self->flags & AFSOCKET_KEEP_ALIVE)
633
/* NOTE: this assumes that fd 0 will never be used for listening fds,
634
* main.c opens fd 0 so this assumption can hold */
635
sock = GPOINTER_TO_UINT(cfg_persist_config_fetch(cfg, afsocket_sd_format_persist_name(self, TRUE))) - 1;
640
if (!afsocket_sd_acquire_socket(self, &sock))
641
return self->super.super.optional;
642
if (sock == -1 && !afsocket_open_socket(self->bind_addr, !!(self->flags & AFSOCKET_STREAM), &sock))
643
return self->super.super.optional;
646
/* set up listening source */
647
if (listen(sock, self->listen_backlog) < 0)
649
msg_error("Error during listen()",
650
evt_tag_errno(EVT_TAG_OSERROR, errno),
656
if (self->setup_socket && !self->setup_socket(self, sock))
663
afsocket_sd_start_watches(self);
668
if (!self->connections)
670
if (!afsocket_sd_acquire_socket(self, &sock))
671
return self->super.super.optional;
672
if (sock == -1 && !afsocket_open_socket(self->bind_addr, !!(self->flags & AFSOCKET_STREAM), &sock))
673
return self->super.super.optional;
677
if (!self->setup_socket(self, sock))
683
/* we either have self->connections != NULL, or sock contains a new fd */
684
if (self->connections || afsocket_sd_process_connection(self, NULL, self->bind_addr, sock))
691
/* Destroy-notify for a listener fd kept in the persist config.  The stored
 * value is fd + 1 (see afsocket_sd_deinit()), so 1 is subtracted to recover
 * the descriptor.
 * NOTE(review): the statement that closes the fd is not visible in this
 * listing. */
afsocket_sd_close_fd(gpointer value)
693
gint fd = GPOINTER_TO_UINT(value) - 1;
698
/* LogPipe deinit method of the socket source driver.
 *
 * Without keep-alive (or without a persist store) all connections are
 * destroyed.  Otherwise every connection is deinitialised and the list is
 * handed to the persist config, with afsocket_sd_kill_connection_list() as
 * its destructor.  For stream drivers the accept watch is stopped and the
 * listener fd is either closed (no keep-alive) or stored in the persist
 * config as fd + 1.  Finally the generic source-driver deinit runs.
 *
 * NOTE(review): else lines, the close() of the listener and the return
 * statements are not visible in this listing. */
afsocket_sd_deinit(LogPipe *s)
700
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
701
GlobalConfig *cfg = log_pipe_get_config(s);
703
if ((self->flags & AFSOCKET_KEEP_ALIVE) == 0 || !cfg->persist)
705
afsocket_sd_kill_connection_list(self->connections);
711
/* for AFSOCKET_STREAM source drivers this is a list, for
712
* AFSOCKET_DGRAM this is a single connection */
714
for (p = self->connections; p; p = p->next)
716
log_pipe_deinit((LogPipe *) p->data);
718
cfg_persist_config_add(cfg, afsocket_sd_format_persist_name(self, FALSE), self->connections, (GDestroyNotify) afsocket_sd_kill_connection_list, FALSE);
720
self->connections = NULL;
722
if (self->flags & AFSOCKET_STREAM)
724
afsocket_sd_stop_watches(self);
725
if ((self->flags & AFSOCKET_KEEP_ALIVE) == 0)
727
msg_verbose("Closing listener fd",
728
evt_tag_int("fd", self->fd),
734
/* NOTE: the fd is incremented by one when added to persistent config
735
* as persist config cannot store NULL */
737
cfg_persist_config_add(cfg, afsocket_sd_format_persist_name(self, TRUE), GUINT_TO_POINTER(self->fd + 1), afsocket_sd_close_fd, FALSE);
740
else if (self->flags & AFSOCKET_DGRAM)
742
/* we don't need to close the listening fd here as we have a
743
* single connection which will close it */
748
if (!log_src_driver_deinit_method(s))
755
/* LogPipe notify method of the source driver.  The only visible statement
 * is g_assert_not_reached().
 * NOTE(review): the surrounding switch/case lines are missing from this
 * listing -- confirm which notify codes lead to the assertion. */
afsocket_sd_notify(LogPipe *s, LogPipe *sender, gint notify_code, gpointer user_data)
762
g_assert_not_reached();
769
/* Default setup_socket hook of source drivers: applies the driver's socket
 * options to `fd` for the receive direction and returns the result of
 * afsocket_setup_socket(). */
afsocket_sd_setup_socket(AFSocketSourceDriver *self, gint fd)
771
return afsocket_setup_socket(fd, self->sock_options_ptr, AFSOCKET_DIR_RECV);
775
/* LogPipe free_fn of the source driver: destroys the reader options,
 * releases the bind address (and clears the pointer), frees the transport
 * string and then runs the generic source-driver free. */
afsocket_sd_free(LogPipe *s)
777
AFSocketSourceDriver *self = (AFSocketSourceDriver *) s;
779
log_reader_options_destroy(&self->reader_options);
780
g_sockaddr_unref(self->bind_addr);
781
self->bind_addr = NULL;
782
g_free(self->transport);
784
log_src_driver_free(s);
788
/* Initialise a socket source driver instance.
 *
 * self:         driver to initialise
 * sock_options: socket options; the pointer is stored, not copied
 * family:       address family, stored in address_family
 * flags:        AFSOCKET_* flags; AFSOCKET_KEEP_ALIVE is always added
 *
 * Installs the init/deinit/free_fn/notify methods, the default
 * setup_socket hook and the defaults max_connections = 10 and
 * listen_backlog = 255.  Stream sources get an initial window size of 1000.
 * AFSOCKET_LOCAL sources set LP_LOCAL; a compatibility warning about the
 * changed unix-domain message format is issued for configurations older
 * than version 0x0302.  AFSOCKET_SYSLOG_PROTOCOL sets LP_SYSLOG_PROTOCOL.
 *
 * NOTE(review): the use of the static `warned` flag and the branch that
 * clears LP_EXPECT_HOSTNAME are only partially visible in this listing. */
afsocket_sd_init_instance(AFSocketSourceDriver *self, SocketOptions *sock_options, gint family, guint32 flags)
790
log_src_driver_init_instance(&self->super);
792
self->super.super.super.init = afsocket_sd_init;
793
self->super.super.super.deinit = afsocket_sd_deinit;
794
self->super.super.super.free_fn = afsocket_sd_free;
795
/* NULL behaves as if log_pipe_forward_msg was specified */
796
self->super.super.super.queue = NULL;
797
self->super.super.super.notify = afsocket_sd_notify;
798
self->sock_options_ptr = sock_options;
799
self->setup_socket = afsocket_sd_setup_socket;
800
self->address_family = family;
801
self->max_connections = 10;
802
self->listen_backlog = 255;
803
self->flags = flags | AFSOCKET_KEEP_ALIVE;
804
log_reader_options_defaults(&self->reader_options);
805
if (self->flags & AFSOCKET_STREAM)
806
self->reader_options.super.init_window_size = 1000;
808
if (self->flags & AFSOCKET_LOCAL)
810
static gboolean warned = FALSE;
812
self->reader_options.parse_options.flags |= LP_LOCAL;
813
if (configuration && configuration->version < 0x0302)
817
msg_warning("WARNING: the expected message format is being changed for unix-domain transports to improve "
818
"syslogd compatibity with syslog-ng 3.2. If you are using custom "
819
"applications which bypass the syslog() API, you might "
820
"need the 'expect-hostname' flag to get the old behaviour back", NULL);
826
self->reader_options.parse_options.flags &= ~LP_EXPECT_HOSTNAME;
829
if ((self->flags & AFSOCKET_SYSLOG_PROTOCOL))
831
self->reader_options.parse_options.flags |= LP_SYSLOG_PROTOCOL;
835
/* socket destinations */
838
/* Replace the destination driver's transport name with a private copy of
 * `transport`; the previously stored string is freed first. */
afsocket_dd_set_transport(LogDriver *s, const gchar *transport)
840
AFSocketDestDriver *self = (AFSocketDestDriver *) s;
843
g_free(self->transport);
844
self->transport = g_strdup(transport);
849
/* Store `tls_context` in the destination driver.  The pointer is stored
 * as-is; no reference is taken and a previous value is not released here. */
afsocket_dd_set_tls_context(LogDriver *s, TLSContext *tls_context)
851
AFSocketDestDriver *self = (AFSocketDestDriver *) s;
853
self->tls_context = tls_context;
858
/* Set or clear AFSOCKET_KEEP_ALIVE in the destination driver's flags.
 * NOTE(review): the condition on `enable` that selects between the two
 * statements is not visible in this listing. */
afsocket_dd_set_keep_alive(LogDriver *s, gint enable)
860
AFSocketDestDriver *self = (AFSocketDestDriver *) s;
863
self->flags |= AFSOCKET_KEEP_ALIVE;
865
self->flags &= ~AFSOCKET_KEEP_ALIVE;
870
/* Format the persist-config key for this destination driver.
 *
 * qfile: TRUE selects the "afsocket_dd_qfile(...)" key, FALSE the
 *        "afsocket_dd_connection(...)" key.
 *
 * The key embeds "stream"/"dgram" and is written into a function-static
 * buffer.
 * NOTE(review): the last argument of the g_snprintf() call and the return
 * statement are not visible in this listing. */
afsocket_dd_format_persist_name(AFSocketDestDriver *self, gboolean qfile)
872
static gchar persist_name[128];
874
g_snprintf(persist_name, sizeof(persist_name),
875
qfile ? "afsocket_dd_qfile(%s,%s)" : "afsocket_dd_connection(%s,%s)",
876
!!(self->flags & AFSOCKET_STREAM) ? "stream" : "dgram",
883
/* Pick the statistics source constant (SCS_*) for a destination driver.
 *
 * When the syslog protocol flag is not set, the constant is chosen from the
 * destination address family and from whether AFSOCKET_STREAM is set (UNIX
 * stream/dgram, TCP/UDP, TCP6/UDP6).  An unexpected family hits
 * g_assert_not_reached().
 * NOTE(review): the case labels, the syslog-protocol branch and the return
 * statement are not visible in this listing. */
afsocket_dd_stats_source(AFSocketDestDriver *self)
887
if ((self->flags & AFSOCKET_SYSLOG_PROTOCOL) == 0)
889
switch (self->dest_addr->sa.sa_family)
892
source = !!(self->flags & AFSOCKET_STREAM) ? SCS_UNIX_STREAM : SCS_UNIX_DGRAM;
895
source = !!(self->flags & AFSOCKET_STREAM) ? SCS_TCP : SCS_UDP;
899
source = !!(self->flags & AFSOCKET_STREAM) ? SCS_TCP6 : SCS_UDP6;
903
g_assert_not_reached();
915
/* Build the statistics instance name of a destination driver: dest_name
 * itself in legacy mode, "<transport>,<dest_name>" formatted into a
 * function-static buffer otherwise.
 * NOTE(review): the return of the static buffer is not visible in this
 * listing; if it is returned, the result is overwritten by the next call. */
afsocket_dd_stats_instance(AFSocketDestDriver *self)
917
if ((self->flags & AFSOCKET_SYSLOG_PROTOCOL) == 0)
919
return self->dest_name;
923
static gchar buf[256];
925
g_snprintf(buf, sizeof(buf), "%s,%s", self->transport, self->dest_name);
932
/* TLS verification callback for destination connections.
 *
 * When verification has succeeded so far, the certificate being checked is
 * the one stored in ctx->cert, a hostname is configured and the verify mode
 * includes TVM_TRUSTED, the certificate name is additionally checked
 * against self->hostname and that result replaces `ok`.
 *
 * NOTE(review): the X509_STORE_CTX members are accessed directly; newer
 * OpenSSL releases make this structure opaque and provide accessor
 * functions instead -- confirm the targeted OpenSSL version.
 * NOTE(review): the return statement is not visible in this listing. */
afsocket_dd_tls_verify_callback(gint ok, X509_STORE_CTX *ctx, gpointer user_data)
934
AFSocketDestDriver *self = (AFSocketDestDriver *) user_data;
936
if (ok && ctx->current_cert == ctx->cert && self->hostname && (self->tls_context->verify_mode & TVM_TRUSTED))
938
ok = tls_verify_certificate_name(ctx->cert, self->hostname);
945
/* Forward declarations: both functions are installed as event-loop handlers
 * in afsocket_dd_init_watches() before they are defined. */
static gboolean afsocket_dd_connected(AFSocketDestDriver *self);
946
static void afsocket_dd_reconnect(AFSocketDestDriver *self);
949
/* Prepare (but do not register) the destination driver's event-loop
 * objects: the connect fd watch calls afsocket_dd_connected() when the
 * socket becomes writable, the reconnect timer calls
 * afsocket_dd_reconnect().  Both use the driver as cookie.
 * Note that afsocket_dd_connected() returns gboolean and is cast to a
 * void-returning handler type. */
afsocket_dd_init_watches(AFSocketDestDriver *self)
951
IV_FD_INIT(&self->connect_fd);
952
self->connect_fd.cookie = self;
953
self->connect_fd.handler_out = (void (*)(void *)) afsocket_dd_connected;
955
IV_TIMER_INIT(&self->reconnect_timer);
956
self->reconnect_timer.cookie = self;
957
self->reconnect_timer.handler = (void (*)(void *)) afsocket_dd_reconnect;
961
/* Start watching the connecting socket (self->fd) for writability.  Must be
 * called from the main thread (asserted). */
afsocket_dd_start_watches(AFSocketDestDriver *self)
963
main_loop_assert_main_thread();
965
self->connect_fd.fd = self->fd;
966
iv_fd_register(&self->connect_fd);
970
/* Cancel a pending connect watch and a pending reconnect timer.  Must be
 * called from the main thread (asserted).
 * NOTE(review): per the in-line comment the not-yet-established fd is
 * closed here, but the close() call itself is not visible in this
 * listing. */
afsocket_dd_stop_watches(AFSocketDestDriver *self)
972
main_loop_assert_main_thread();
974
if (iv_fd_registered(&self->connect_fd))
976
iv_fd_unregister(&self->connect_fd);
978
/* need to close the fd in this case as it wasn't established yet */
979
msg_verbose("Closing connecting fd",
980
evt_tag_int("fd", self->fd),
984
if (iv_timer_registered(&self->reconnect_timer))
985
iv_timer_unregister(&self->reconnect_timer);
989
/* (Re)arm the reconnect timer to fire time_reopen seconds from now; an
 * already registered timer is unregistered first.  Must be called from the
 * main thread (asserted). */
afsocket_dd_start_reconnect_timer(AFSocketDestDriver *self)
991
main_loop_assert_main_thread();
993
if (iv_timer_registered(&self->reconnect_timer))
994
iv_timer_unregister(&self->reconnect_timer);
997
self->reconnect_timer.expires = iv_now;
998
/* time_reopen is in seconds, the helper takes milliseconds */
timespec_add_msec(&self->reconnect_timer.expires, self->time_reopen * 1000);
999
iv_timer_register(&self->reconnect_timer);
1003
/* Complete an outgoing connection; called directly after an immediately
 * successful connect and as the writability handler of the connect watch.
 *
 * - Unregisters the connect watch if it is registered.
 * - For stream sockets, adds LTF_SHUTDOWN to the transport flags and reads
 *   SO_ERROR; a getsockopt() failure or (presumably) a pending socket error
 *   is logged and leads to the reconnect path.
 * - Logs "Syslog connection established".
 * - Builds a TLS transport (installing afsocket_dd_tls_verify_callback) or
 *   a plain transport.
 * - Chooses the client proto: framed for syslog-protocol over stream, text
 *   otherwise, and reopens the writer with it.
 * - The reconnect path starts the reconnect timer.
 *
 * NOTE(review): the `if (error)` line, the check of the TLS session, the
 * error_reconnect label, closing of the fd and the return values are not
 * visible in this listing. */
afsocket_dd_connected(AFSocketDestDriver *self)
1005
gchar buf1[256], buf2[256];
1007
socklen_t errorlen = sizeof(error);
1008
LogTransport *transport;
1010
guint32 transport_flags = 0;
1012
main_loop_assert_main_thread();
1014
if (iv_fd_registered(&self->connect_fd))
1015
iv_fd_unregister(&self->connect_fd);
1017
if (self->flags & AFSOCKET_STREAM)
1019
transport_flags |= LTF_SHUTDOWN;
1020
if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &error, &errorlen) == -1)
1022
msg_error("getsockopt(SOL_SOCKET, SO_ERROR) failed for connecting socket",
1023
evt_tag_int("fd", self->fd),
1024
evt_tag_str("server", g_sockaddr_format(self->dest_addr, buf2, sizeof(buf2), GSA_FULL)),
1025
evt_tag_errno(EVT_TAG_OSERROR, errno),
1026
evt_tag_int("time_reopen", self->time_reopen),
1028
goto error_reconnect;
1032
msg_error("Syslog connection failed",
1033
evt_tag_int("fd", self->fd),
1034
evt_tag_str("server", g_sockaddr_format(self->dest_addr, buf2, sizeof(buf2), GSA_FULL)),
1035
evt_tag_errno(EVT_TAG_OSERROR, error),
1036
evt_tag_int("time_reopen", self->time_reopen),
1038
goto error_reconnect;
1041
msg_notice("Syslog connection established",
1042
evt_tag_int("fd", self->fd),
1043
evt_tag_str("server", g_sockaddr_format(self->dest_addr, buf2, sizeof(buf2), GSA_FULL)),
1044
evt_tag_str("local", g_sockaddr_format(self->bind_addr, buf1, sizeof(buf1), GSA_FULL)),
1049
if (self->tls_context)
1051
TLSSession *tls_session;
1053
tls_session = tls_context_setup_session(self->tls_context);
1056
goto error_reconnect;
1059
tls_session_set_verify(tls_session, afsocket_dd_tls_verify_callback, self, NULL);
1060
transport = log_transport_tls_new(tls_session, self->fd, transport_flags);
1064
transport = log_transport_plain_new(self->fd, transport_flags);
1066
if (self->flags & AFSOCKET_SYSLOG_PROTOCOL)
1068
if (self->flags & AFSOCKET_STREAM)
1069
proto = log_proto_framed_client_new(transport);
1071
proto = log_proto_text_client_new(transport);
1075
proto = log_proto_text_client_new(transport);
1078
log_writer_reopen(self->writer, proto);
1083
afsocket_dd_start_reconnect_timer(self);
1088
/* Open a socket and start connecting to dest_addr.
 *
 * The socket is created and bound via afsocket_open_socket(), passed to the
 * optional setup_socket hook and connected with g_connect().  An immediate
 * success goes straight to afsocket_dd_connected(); EINPROGRESS registers
 * the connect watch so completion is handled asynchronously; any other
 * result is logged as "Connection failed".  Must be called from the main
 * thread (asserted).
 *
 * NOTE(review): the assignment of self->fd, closing of the socket on the
 * failure paths and the return statements are not visible in this
 * listing. */
afsocket_dd_start_connect(AFSocketDestDriver *self)
1091
gchar buf1[MAX_SOCKADDR_STRING], buf2[MAX_SOCKADDR_STRING];
1093
main_loop_assert_main_thread();
1094
if (!afsocket_open_socket(self->bind_addr, !!(self->flags & AFSOCKET_STREAM), &sock))
1099
if (self->setup_socket && !self->setup_socket(self, sock))
1105
rc = g_connect(sock, self->dest_addr);
1106
if (rc == G_IO_STATUS_NORMAL)
1109
afsocket_dd_connected(self);
1111
else if (rc == G_IO_STATUS_ERROR && errno == EINPROGRESS)
1113
/* we must wait until connect succeeds */
1116
afsocket_dd_start_watches(self);
1120
/* error establishing connection */
1121
msg_error("Connection failed",
1122
evt_tag_int("fd", sock),
1123
evt_tag_str("server", g_sockaddr_format(self->dest_addr, buf2, sizeof(buf2), GSA_FULL)),
1124
evt_tag_str("local", g_sockaddr_format(self->bind_addr, buf1, sizeof(buf1), GSA_FULL)),
1125
evt_tag_errno(EVT_TAG_OSERROR, errno),
1135
/* Try to start a new connection (also the reconnect timer's handler).  If
 * the attempt cannot even be initiated, log it and arm the reconnect timer
 * so another attempt is made after time_reopen seconds. */
afsocket_dd_reconnect(AFSocketDestDriver *self)
1137
if (!afsocket_dd_start_connect(self))
1139
msg_error("Initiating connection failed, reconnecting",
1140
evt_tag_int("time_reopen", self->time_reopen),
1142
afsocket_dd_start_reconnect_timer(self);
1147
/* LogPipe init method of the socket destination driver.
 *
 * Runs the generic destination-driver init and applies the transport, which
 * must have filled in transport, bind_addr, dest_addr, hostname and
 * dest_name (asserted).  time_reopen is taken from the global config and
 * the writer options are initialised.  A writer preserved in the persist
 * config is fetched; a new one is created with LW_FORMAT_PROTO plus
 * EOF-detection and syslog-protocol flags.  The writer is configured, given
 * its queue, initialised and appended after this pipe; if it has no open
 * connection a (re)connect is started.
 *
 * NOTE(review): two alternative LW_DETECT_EOF expressions are visible (one
 * excludes TLS); presumably they sit in different preprocessor branches
 * whose #if/#else lines are missing from this listing -- confirm.
 * NOTE(review): the condition guarding log_writer_new() and the return
 * statements are not visible in this listing. */
afsocket_dd_init(LogPipe *s)
1149
AFSocketDestDriver *self = (AFSocketDestDriver *) s;
1150
GlobalConfig *cfg = log_pipe_get_config(s);
1152
if (!log_dest_driver_init_method(s))
1155
if (!afsocket_dd_apply_transport(self))
1158
/* these fields must be set up by apply_transport, so let's check if it indeed did */
1159
g_assert(self->transport);
1160
g_assert(self->bind_addr);
1161
g_assert(self->dest_addr);
1162
g_assert(self->hostname);
1163
g_assert(self->dest_name);
1167
self->time_reopen = cfg->time_reopen;
1170
log_writer_options_init(&self->writer_options, cfg, 0);
1171
self->writer = cfg_persist_config_fetch(cfg, afsocket_dd_format_persist_name(self, FALSE));
1174
/* NOTE: we open our writer with no fd, so we can send messages down there
1175
* even while the connection is not established */
1177
self->writer = log_writer_new(LW_FORMAT_PROTO |
1179
(((self->flags & AFSOCKET_STREAM) && !self->tls_context) ? LW_DETECT_EOF : 0) |
1181
((self->flags & AFSOCKET_STREAM) ? LW_DETECT_EOF : 0) |
1183
(self->flags & AFSOCKET_SYSLOG_PROTOCOL ? LW_SYSLOG_PROTOCOL : 0));
1186
log_writer_set_options((LogWriter *) self->writer, &self->super.super.super, &self->writer_options, 0, afsocket_dd_stats_source(self), self->super.super.id, afsocket_dd_stats_instance(self));
1187
log_writer_set_queue(self->writer, log_dest_driver_acquire_queue(&self->super, afsocket_dd_format_persist_name(self, TRUE)));
1189
log_pipe_init(self->writer, NULL);
1190
log_pipe_append(&self->super.super.super, self->writer);
1192
if (!log_writer_opened((LogWriter *) self->writer))
1193
afsocket_dd_reconnect(self);
1198
/* LogPipe deinit method of the destination driver: stops the connect watch
 * and reconnect timer and deinitialises the writer.  With keep-alive the
 * writer is handed over to the persist config (destroyed there with
 * log_pipe_unref) and the driver's pointer is cleared.  Finally the generic
 * destination-driver deinit runs.
 * NOTE(review): a guard on self->writer and the return statements are not
 * visible in this listing. */
afsocket_dd_deinit(LogPipe *s)
1200
AFSocketDestDriver *self = (AFSocketDestDriver *) s;
1201
GlobalConfig *cfg = log_pipe_get_config(s);
1203
afsocket_dd_stop_watches(self);
1206
log_pipe_deinit(self->writer);
1208
if (self->flags & AFSOCKET_KEEP_ALIVE)
1210
cfg_persist_config_add(cfg, afsocket_dd_format_persist_name(self, FALSE), self->writer, (GDestroyNotify) log_pipe_unref, FALSE);
1211
self->writer = NULL;
1214
if (!log_dest_driver_deinit_method(s))
1221
/* LogPipe notify method of the destination driver.  On NC_WRITE_ERROR the
 * writer is detached from its transport (reopened with NULL), the broken
 * connection is logged and the reconnect timer is started.
 * NOTE(review): other case labels that may share this branch are not
 * visible in this listing. */
afsocket_dd_notify(LogPipe *s, LogPipe *sender, gint notify_code, gpointer user_data)
1223
AFSocketDestDriver *self = (AFSocketDestDriver *) s;
1224
gchar buf[MAX_SOCKADDR_STRING];
1226
switch (notify_code)
1229
case NC_WRITE_ERROR:
1230
log_writer_reopen(self->writer, NULL);
1232
msg_notice("Syslog connection broken",
1233
evt_tag_int("fd", self->fd),
1234
evt_tag_str("server", g_sockaddr_format(self->dest_addr, buf, sizeof(buf), GSA_FULL)),
1235
evt_tag_int("time_reopen", self->time_reopen),
1237
afsocket_dd_start_reconnect_timer(self);
1243
/* Default setup_socket hook of destination drivers: applies the driver's
 * socket options to `fd` for the send direction and returns the result of
 * afsocket_setup_socket(). */
afsocket_dd_setup_socket(AFSocketDestDriver *self, gint fd)
1245
return afsocket_setup_socket(fd, self->sock_options_ptr, AFSOCKET_DIR_SEND);
1249
/* LogPipe free_fn of the destination driver: destroys the writer options,
 * releases both socket addresses and the writer reference, frees the
 * hostname, dest_name and transport strings, then runs the generic
 * destination-driver free. */
afsocket_dd_free(LogPipe *s)
1251
AFSocketDestDriver *self = (AFSocketDestDriver *) s;
1253
log_writer_options_destroy(&self->writer_options);
1254
g_sockaddr_unref(self->bind_addr);
1255
g_sockaddr_unref(self->dest_addr);
1256
log_pipe_unref(self->writer);
1257
g_free(self->hostname);
1258
g_free(self->dest_name);
1259
g_free(self->transport);
1260
log_dest_driver_free(s);
1264
/* Initialise a socket destination driver instance.
 *
 * self:         driver to initialise
 * sock_options: socket options; the pointer is stored, not copied
 * family:       address family, stored in address_family
 * hostname:     destination host name; a private copy is stored
 * flags:        AFSOCKET_* flags; AFSOCKET_KEEP_ALIVE is always added
 *
 * Sets writer option defaults, installs the init/deinit/free_fn/notify
 * methods and the default setup_socket hook, and prepares the connect watch
 * and reconnect timer. */
afsocket_dd_init_instance(AFSocketDestDriver *self, SocketOptions *sock_options, gint family, const gchar *hostname, guint32 flags)
1266
log_dest_driver_init_instance(&self->super);
1268
log_writer_options_defaults(&self->writer_options);
1269
self->super.super.super.init = afsocket_dd_init;
1270
self->super.super.super.deinit = afsocket_dd_deinit;
1271
/* NULL behaves as if log_pipe_forward_msg was specified */
1272
self->super.super.super.queue = NULL;
1273
self->super.super.super.free_fn = afsocket_dd_free;
1274
self->super.super.super.notify = afsocket_dd_notify;
1275
self->setup_socket = afsocket_dd_setup_socket;
1276
self->sock_options_ptr = sock_options;
1277
self->address_family = family;
1278
self->flags = flags | AFSOCKET_KEEP_ALIVE;
1280
self->hostname = g_strdup(hostname);
1282
afsocket_dd_init_watches(self);