1
/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
8
#include "master-service.h"
9
#include "replicator-queue.h"
10
#include "notify-connection.h"
14
/* Buffer size handed to i_stream_create_fd() when a connection's input
   stream is created in notify_connection_create(). */
#define MAX_INBUF_SIZE (1024*64)
15
/* Major protocol version that notify_connection_input() passes to
   version_string_verify() when checking the client's first line. */
#define NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION 1
16
/* Minor protocol version; no use of it is visible in this excerpt. */
#define NOTIFY_CLIENT_PROTOCOL_MINOR_VERSION 0
18
/* State kept for one notify client connection. Only part of the member
   list is visible in this excerpt; code further down also uses refcount,
   fd and io members. */
struct notify_connection {
19
/* Links for the file-level "connections" list, maintained with
   DLLIST_PREPEND() / DLLIST_REMOVE(). */
struct notify_connection *prev, *next;
24
/* Stream the client's lines are read from. */
struct istream *input;
25
/* Stream that sync results are written to by notify_sync_callback(). */
struct ostream *output;
27
/* Queue that parsed update requests are added to. */
struct replicator_queue *queue;
29
/* Set to TRUE once the client's version line has passed
   version_string_verify(). */
unsigned int version_received:1;
30
/* Set to TRUE by notify_connection_destroy(). */
unsigned int destroyed:1;
33
/* Context given to replicator_queue_add_sync() and handed back to
   notify_sync_callback(). The callback also reads request->id, but that
   member's declaration is not visible in this excerpt. */
struct notify_sync_request {
34
/* Connection whose output stream receives the sync result; the callback
   releases a reference on it via notify_connection_unref(). */
struct notify_connection *conn;
38
/* Head of the doubly linked list of connections: entries are prepended in
   notify_connection_create() and removed in notify_connection_destroy(). */
static struct notify_connection *connections;
40
/* Forward declaration; the input handlers call this before its definition. */
static void notify_connection_destroy(struct notify_connection *conn);
42
/* Completion callback passed to replicator_queue_add_sync().
   success: whether the sync succeeded.
   context: the struct notify_sync_request created for the request.
   Writes one reply line to the requesting connection and then drops a
   reference on that connection. */
static void notify_sync_callback(bool success, void *context)
44
struct notify_sync_request *request = context;
46
/* Reply format: '+' (success) or '-' (failure), a tab, the request id and
   a newline. */
o_stream_send_str(request->conn->output, t_strdup_printf(
47
"%c\t%u\n", success ? '+' : '-', request->id));
49
/* Releases a connection reference; presumably the one taken with
   notify_connection_ref() when the request was queued -- the assignment
   of request->conn is not visible in this excerpt. */
notify_connection_unref(&request->conn);
54
/* Handles one command line received from a notify client.
   conn: connection the line arrived on; its queue receives the request.
   line: the raw line, split on tabs with t_strsplit_tabescaped().
   Requests with a priority other than REPLICATION_PRIORITY_SYNC are queued
   with replicator_queue_add(); sync requests need a numeric sync id and
   are queued with replicator_queue_add_sync() so notify_sync_callback()
   can report the result. Malformed input is logged with i_error().
   The return type and return statements are not visible in this excerpt;
   the caller stores the result in "ret". */
notify_connection_input_line(struct notify_connection *conn, const char *line)
56
struct notify_sync_request *request;
57
const char *const *args;
58
enum replication_priority priority;
61
/* U \t <username> \t <priority> [\t <sync id>] */
62
args = t_strsplit_tabescaped(line);
63
/* NOTE(review): this accepts an array of exactly two fields, yet args[2]
   is read below. The args[3] == NULL test further down suggests the array
   is NULL-terminated, in which case args[2] would be NULL here -- confirm
   that replication_priority_parse() tolerates NULL or tighten to "< 3". */
if (str_array_length(args) < 2) {
64
i_error("notify client sent invalid input: %s", line);
67
/* "U" is the only command recognized in the visible code. */
if (strcmp(args[0], "U") != 0) {
68
i_error("notify client sent unknown command: %s", args[0]);
71
if (replication_priority_parse(args[2], &priority) < 0) {
72
i_error("notify client sent invalid priority: %s", args[2]);
75
/* Non-sync priorities need no reply, so they are simply queued. */
if (priority != REPLICATION_PRIORITY_SYNC)
76
replicator_queue_add(conn->queue, args[1], priority);
77
/* Sync priority: the fourth field must be present and parse as an
   unsigned integer. */
else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
78
i_error("notify client sent invalid sync id: %s", line);
81
request = i_new(struct notify_sync_request, 1);
84
/* Take a reference before queueing the sync; notify_sync_callback()
   calls notify_connection_unref() when the result is reported. */
notify_connection_ref(conn);
85
replicator_queue_add_sync(conn->queue, args[1],
86
notify_sync_callback, request);
91
/* Read callback registered with io_add() in notify_connection_create().
   Reads from the connection's input stream, verifies the client's version
   line before anything else, and then passes each complete line to
   notify_connection_input_line(). Several paths destroy the connection;
   the case labels and return statements that separate those paths are not
   visible in this excerpt. */
static void notify_connection_input(struct notify_connection *conn)
96
/* The visible branches of this switch both end in
   notify_connection_destroy(); one of them first logs that the client
   sent too much data. */
switch (i_stream_read(conn->input)) {
98
i_error("BUG: Client connection sent too much data");
99
notify_connection_destroy(conn);
102
notify_connection_destroy(conn);
106
/* The first line from the client must be a version line for
   "replicator-notify" with a matching major version. */
if (!conn->version_received) {
107
/* No complete line buffered yet. */
if ((line = i_stream_next_line(conn->input)) == NULL)
110
if (!version_string_verify(line, "replicator-notify",
111
NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION)) {
112
i_error("Notify client not compatible with this server "
113
"(mixed old and new binaries?)");
114
notify_connection_destroy(conn);
117
conn->version_received = TRUE;
120
/* Process every complete line currently buffered. */
while ((line = i_stream_next_line(conn->input)) != NULL) {
122
ret = notify_connection_input_line(conn, line);
125
/* Presumably reached only when the line handler reported failure; the
   condition guarding this call is not visible in this excerpt. */
notify_connection_destroy(conn);
131
/* Creates the connection state for a notify client.
   fd: descriptor used for both the input and the output stream and for the
       read callback.
   queue: replicator queue the connection's requests go to (the assignment
          to conn->queue is not visible in this excerpt).
   The new connection is prepended to the file-level "connections" list.
   The return statement is not visible here; the declared return type is a
   pointer to the connection. */
struct notify_connection *
132
notify_connection_create(int fd, struct replicator_queue *queue)
134
struct notify_connection *conn;
138
conn = i_new(struct notify_connection, 1);
142
/* Input stream is created with MAX_INBUF_SIZE as its size argument. */
conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE);
143
/* Output stream is created with (size_t)-1 as its size argument. */
conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
144
/* Call notify_connection_input() on IO_READ events for fd. */
conn->io = io_add(fd, IO_READ, notify_connection_input, conn);
147
DLLIST_PREPEND(&connections, conn);
151
/* Tears down a connection: marks it destroyed, unlinks it from the
   "connections" list, removes its I/O handler, closes both streams and the
   descriptor, drops a reference and notifies the master service.
   NOTE(review): notify_connection_unref() also calls this function; any
   guard against re-entry (presumably a check of conn->destroyed) is not
   visible in this excerpt -- confirm it exists. */
static void notify_connection_destroy(struct notify_connection *conn)
155
conn->destroyed = TRUE;
157
DLLIST_REMOVE(&connections, conn);
159
io_remove(&conn->io);
160
/* The streams are only closed here; notify_connection_unref() is what
   calls i_stream_unref() / o_stream_unref() on them. */
i_stream_close(conn->input);
161
o_stream_close(conn->output);
162
/* A close() failure is logged but does not stop the teardown. */
if (close(conn->fd) < 0)
163
i_error("close(notify connection) failed: %m");
166
notify_connection_unref(&conn);
167
master_service_client_connection_destroyed(master_service);
170
/* Takes a reference on conn. Only the sanity check is visible in this
   excerpt; the statement that changes the count is not shown. */
void notify_connection_ref(struct notify_connection *conn)
172
/* A connection must already hold at least one reference here. */
i_assert(conn->refcount > 0);
177
/* Drops one reference on *_conn. While references remain after the
   decrement, the statement following the check applies (not visible in
   this excerpt; presumably an early return). Otherwise the connection is
   destroyed and its streams are unreferenced. Any further cleanup, such as
   freeing the struct or clearing *_conn, is not visible here. */
void notify_connection_unref(struct notify_connection **_conn)
179
struct notify_connection *conn = *_conn;
181
i_assert(conn->refcount > 0);
184
if (--conn->refcount > 0)
187
/* NOTE(review): notify_connection_destroy() itself calls
   notify_connection_unref(); confirm against the full file how this
   mutual call terminates. */
notify_connection_destroy(conn);
188
i_stream_unref(&conn->input);
189
o_stream_unref(&conn->output);
193
/* Destroys every connection on the file-level "connections" list. */
void notify_connections_destroy_all(void)
195
/* Terminates because notify_connection_destroy() removes the connection
   it is given from the list with DLLIST_REMOVE(), advancing the head. */
while (connections != NULL)
196
notify_connection_destroy(connections);