1
/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
1
/* Copyright (c) 2013 Dovecot authors, see the included COPYING file */
4
#include "connection.h"
9
7
#include "strescape.h"
8
#include "wildcard-match.h"
9
#include "master-service.h"
10
#include "replicator-queue.h"
10
11
#include "doveadm-connection.h"
12
13
#include <unistd.h>
14
/* Delay handed to timeout_add() when doveadm_connect() fails; when it fires,
   doveadm_fail_timeout() reports DOVEADM_REPLY_FAIL to the caller. */
#define DOVEADM_FAIL_TIMEOUT_MSECS (1000*5)
15
/* Handshake line written to the output stream right after connecting. */
#define DOVEADM_HANDSHAKE "VERSION\tdoveadm-server\t1\t0\n"
16
/* Maximum buffer size given to i_stream_create_fd() for the input stream. */
#define MAX_INBUF_SIZE 1024
15
/* Values stored in doveadm_conn_set.major_version / .minor_version. */
#define REPLICATOR_DOVEADM_MAJOR_VERSION 1
16
#define REPLICATOR_DOVEADM_MINOR_VERSION 0
18
18
/* Per-connection state.
   NOTE(review): the members below are used by two different groups of
   functions in this file (conn->input/output/callback versus
   client->conn/client->queue), and other members that the code references
   (path, fd, io, to, context) are not visible in this span - confirm against
   the complete definition. */
struct doveadm_connection {
22
struct istream *input;
23
struct ostream *output;
26
/* non-NULL while a request is pending; see doveadm_connection_is_busy() */
doveadm_callback_t *callback;
29
/* ioloop_time of the last failed net_connect_unix(); reset to 0 on success */
time_t last_connect_failure;
30
/* set to TRUE by doveadm_input_line() as part of handshake handling */
unsigned int handshaked:1;
31
/* toggled by doveadm_input_line() while processing a reply */
unsigned int end_of_print:1;
19
/* passed to connection_init_server() / connection_deinit() */
struct connection conn;
20
/* queue assigned in doveadm_connection_create() */
struct replicator_queue *queue;
34
/* Allocates a doveadm_connection with i_new() and stores a copy of path
   (made with i_strdup()) in conn->path.
   NOTE(review): the rest of the body, including the return statement, is not
   visible in this span. */
struct doveadm_connection *doveadm_connection_init(const char *path)
36
struct doveadm_connection *conn;
38
conn = i_new(struct doveadm_connection, 1);
39
conn->path = i_strdup(path);
44
/* Delivers reply to the pending request's callback.
   The callback pointer and its context are copied to locals, the timeout in
   conn->to is removed and conn->callback is cleared before the callback is
   invoked; doveadm_connection_is_busy() tests conn->callback, so the
   connection no longer reports busy while the callback runs. */
static void doveadm_callback(struct doveadm_connection *conn,
45
enum doveadm_reply reply)
47
doveadm_callback_t *callback = conn->callback;
48
void *context = conn->context;
51
timeout_remove(&conn->to);
53
conn->callback = NULL;
55
callback(reply, context);
58
/* Tears down the connection: destroys the output and input streams, closes
   conn->fd (logging an error if close() fails) and, if a request is still
   pending, reports DOVEADM_REPLY_FAIL through doveadm_callback().
   NOTE(review): several lines of this function are not visible in this
   span. */
static void doveadm_disconnect(struct doveadm_connection *conn)
64
o_stream_destroy(&conn->output);
65
i_stream_destroy(&conn->input);
66
if (close(conn->fd) < 0)
67
i_error("close(doveadm) failed: %m");
70
/* a pending request cannot be answered any more */
if (conn->callback != NULL)
71
doveadm_callback(conn, DOVEADM_REPLY_FAIL);
74
/* Shuts down the connection referenced by *_conn by calling
   doveadm_disconnect() on it.
   NOTE(review): the remaining lines (presumably clearing *_conn and freeing
   the structure) are not visible in this span - confirm. */
void doveadm_connection_deinit(struct doveadm_connection **_conn)
76
struct doveadm_connection *conn = *_conn;
80
doveadm_disconnect(conn);
85
/* Handles one input line read from the doveadm server.
   Until the handshake has completed, any line other than "+" is logged as an
   unexpected handshake.
   NOTE(review): this span ends in the middle of the i_error() call; the
   function continues further down in the file. */
static int doveadm_input_line(struct doveadm_connection *conn, const char *line)
87
if (!conn->handshaked) {
88
if (strcmp(line, "+") != 0) {
89
i_error("%s: Unexpected handshake: %s",
22
/* Connection list created in doveadm_connections_init(), used by
   doveadm_connection_create() and released in doveadm_connections_deinit(). */
static struct connection_list *doveadm_connections;
24
/* Builds a summary of the replicator queue: walks every user in
   client->queue and counts them per priority and per failed / full-resync
   category. The counters are formatted and sent later in this function.
   NOTE(review): user_count and next_secs have no visible initialisation in
   this span, and user_count is printed later - confirm they are set on lines
   not shown here. */
static int client_input_status_overview(struct doveadm_connection *client)
26
struct replicator_queue_iter *iter;
27
struct replicator_user *user;
28
enum replication_priority priority;
29
/* indexed by enum replication_priority */
unsigned int pending_counts[REPLICATION_PRIORITY_SYNC+1];
30
unsigned int user_count, next_secs, pending_failed_count;
31
unsigned int pending_full_resync_count, waiting_failed_count;
32
string_t *str = t_str_new(256);
34
memset(pending_counts, 0, sizeof(pending_counts));
35
pending_failed_count = 0; waiting_failed_count = 0;
36
pending_full_resync_count = 0;
39
iter = replicator_queue_iter_init(client->queue);
40
while ((user = replicator_queue_iter_next(iter)) != NULL) {
41
/* users with an explicit priority are counted under that priority */
if (user->priority != REPLICATION_PRIORITY_NONE)
42
pending_counts[user->priority]++;
43
/* NOTE(review): the rest of this condition and the else keywords that
   separate the branches below are not visible in this span */
else if (replicator_queue_want_sync_now(client->queue,
45
if (user->last_sync_failed)
46
pending_failed_count++;
48
pending_full_resync_count++;
50
if (user->last_sync_failed)
51
waiting_failed_count++;
93
/* Continuation of doveadm_input_line(): the handshake is marked complete,
   then the reply to the pending request is interpreted.
   NOTE(review): the conditions, else keywords and return statements between
   the visible lines are missing from this span. */
conn->handshaked = TRUE;
96
/* a line that arrives while no request is pending is logged as unexpected */
if (conn->callback == NULL) {
97
i_error("%s: Unexpected input: %s", conn->path, line);
100
if (!conn->end_of_print) {
102
conn->end_of_print = TRUE;
106
doveadm_callback(conn, DOVEADM_REPLY_OK);
107
/* a line starting with '-' is an error reply; "-NOUSER" is reported as
   DOVEADM_REPLY_NOUSER */
else if (line[0] == '-') {
108
if (strcmp(line+1, "NOUSER") == 0)
109
doveadm_callback(conn, DOVEADM_REPLY_NOUSER);
111
doveadm_callback(conn, DOVEADM_REPLY_FAIL);
113
i_error("%s: Invalid input: %s", conn->path, line);
116
conn->end_of_print = FALSE;
117
/* FIXME: disconnect after each request for now.
118
doveadm server's getopt() handling seems to break otherwise */
55
/* Continuation of client_input_status_overview(): the iterator is released
   and the collected counters are formatted as "<label>\t<count>" lines,
   terminated by an empty line, and written to the client's output stream. */
replicator_queue_iter_deinit(&iter);
57
/* from REPLICATION_PRIORITY_SYNC downwards; index 0 is not printed */
for (priority = REPLICATION_PRIORITY_SYNC; priority > 0; priority--) {
58
str_printfa(str, "Queued '%s' requests\t%u\n",
59
replicator_priority_to_str(priority),
60
pending_counts[priority]);
62
str_printfa(str, "Queued 'failed' requests\t%u\n",
63
pending_failed_count);
64
str_printfa(str, "Queued 'full resync' requests\t%u\n",
65
pending_full_resync_count);
66
str_printfa(str, "Waiting 'failed' requests\t%u\n",
67
waiting_failed_count);
68
str_printfa(str, "Total number of known users\t%u\n", user_count);
69
/* empty line ends the reply */
str_append_c(str, '\n');
70
o_stream_send(client->conn.output, str_data(str), str_len(str));
75
/* STATUS command handler. args[0] is used as a wildcard mask: for each user
   in client->queue that is not skipped by the wildcard_match() test, one
   tab-separated line is sent containing the tab-escaped username, the
   priority string, last_fast_sync, last_full_sync and last_sync_failed. A
   final "\n" ends the reply.
   NOTE(review): the return type, the condition guarding the
   client_input_status_overview() call and the statements inside the loop
   between the visible lines are missing from this span. */
client_input_status(struct doveadm_connection *client, const char *const *args)
77
struct replicator_queue_iter *iter;
78
struct replicator_user *user;
79
const char *mask = args[0];
80
string_t *str = t_str_new(128);
83
return client_input_status_overview(client);
85
iter = replicator_queue_iter_init(client->queue);
86
while ((user = replicator_queue_iter_next(iter)) != NULL) {
87
if (!wildcard_match(user->username, mask))
91
str_append_tabescaped(str, user->username);
92
str_append_c(str, '\t');
93
str_append(str, replicator_priority_to_str(user->priority));
94
/* time values are cast to long long to match the %lld conversions */
str_printfa(str, "\t%lld\t%lld\t%d\n",
95
(long long)user->last_fast_sync,
96
(long long)user->last_full_sync,
97
user->last_sync_failed);
98
o_stream_send(client->conn.output, str_data(str), str_len(str));
100
replicator_queue_iter_deinit(&iter);
101
o_stream_send(client->conn.output, "\n", 1);
106
/* REPLICATE command handler. Expects exactly two arguments, a priority and a
   username or wildcard mask. A mask without '*' or '?' is added to the queue
   directly and answered with "+1"; otherwise the queue is iterated, users
   are tested with wildcard_match(), replicator_queue_add() is called for
   them with the requested priority, and "+<match_count>" is sent.
   NOTE(review): the return type, the assignment of usermask, the
   initialisation/increment of match_count and the return statements are not
   visible in this span. */
client_input_replicate(struct doveadm_connection *client, const char *const *args)
108
struct replicator_queue_iter *iter;
109
struct replicator_user *user;
110
const char *usermask;
111
enum replication_priority priority;
112
unsigned int match_count;
114
/* <priority> <username>|<mask> */
115
if (str_array_length(args) != 2) {
116
i_error("%s: REPLICATE: Invalid parameters", client->conn.name);
119
if (replication_priority_parse(args[0], &priority) < 0) {
120
o_stream_send_str(client->conn.output, "-Invalid priority\n");
124
/* no wildcard characters: treat the mask as a single username */
if (strchr(usermask, '*') == NULL && strchr(usermask, '?') == NULL) {
125
replicator_queue_add(client->queue, usermask, priority);
126
o_stream_send_str(client->conn.output, "+1\n");
131
iter = replicator_queue_iter_init(client->queue);
132
while ((user = replicator_queue_iter_next(iter)) != NULL) {
133
if (!wildcard_match(user->username, usermask))
135
replicator_queue_add(client->queue, user->username, priority);
138
replicator_queue_iter_deinit(&iter);
139
o_stream_send_str(client->conn.output,
140
t_strdup_printf("+%u\n", match_count));
145
/* REMOVE command handler. Expects exactly one argument, which is looked up
   in client->queue with replicator_queue_lookup(). The visible replies are
   "-User not found" and, after replicator_queue_remove(), "+".
   NOTE(review): the return type, the test on the lookup result and the
   return statements are not visible in this span. */
client_input_remove(struct doveadm_connection *client, const char *const *args)
147
struct replicator_user *user;
150
if (str_array_length(args) != 1) {
151
i_error("%s: REMOVE: Invalid parameters", client->conn.name);
154
user = replicator_queue_lookup(client->queue, args[0]);
156
o_stream_send_str(client->conn.output, "-User not found\n");
158
replicator_queue_remove(client->queue, &user);
159
o_stream_send_str(client->conn.output, "+\n");
165
/* NOTIFY command handler. Requires at least three arguments
   (<username> <flags> <state>). The user is added to the queue with
   REPLICATION_PRIORITY_NONE, its sync timestamps are set to ioloop_time, a
   non-empty state string is copied with i_strdup(), and "+" is sent.
   NOTE(review): the return type, the return statements and any lines between
   the visible ones (for example freeing a previous user->state) are missing
   from this span - confirm. */
client_input_notify(struct doveadm_connection *client, const char *const *args)
167
struct replicator_user *user;
169
/* <username> <flags> <state> */
170
if (str_array_length(args) < 3) {
171
i_error("%s: NOTIFY: Invalid parameters", client->conn.name);
175
user = replicator_queue_add(client->queue, args[0],
176
REPLICATION_PRIORITY_NONE);
177
/* flags beginning with 'f' also update last_full_sync */
if (args[1][0] == 'f')
178
user->last_full_sync = ioloop_time;
179
user->last_fast_sync = ioloop_time;
180
user->last_update = ioloop_time;
182
if (args[2][0] != '\0') {
184
user->state = i_strdup(args[2]);
186
o_stream_send_str(client->conn.output, "+\n");
190
/* Input handler installed as doveadm_conn_vfuncs.input_args. Takes args[0]
   as the command name and dispatches STATUS, REPLICATE, REMOVE and NOTIFY to
   their handlers; any other command is logged as unknown.
   NOTE(review): the empty-command test, the lines between it and the
   dispatch chain, and the final return are not visible in this span. */
static int client_input_args(struct connection *conn, const char *const *args)
192
/* conn is cast back to the doveadm_connection that was registered with
   connection_init_server() */
struct doveadm_connection *client = (struct doveadm_connection *)conn;
193
const char *cmd = args[0];
196
i_error("%s: Empty command", conn->name);
201
if (strcmp(cmd, "STATUS") == 0)
202
return client_input_status(client, args);
203
else if (strcmp(cmd, "REPLICATE") == 0)
204
return client_input_replicate(client, args);
205
else if (strcmp(cmd, "REMOVE") == 0)
206
return client_input_remove(client, args);
207
else if (strcmp(cmd, "NOTIFY") == 0)
208
return client_input_notify(client, args);
209
i_error("%s: Unknown command: %s", conn->name, cmd);
122
/* I/O callback registered with io_add() in doveadm_connect(). Reads complete
   lines from conn->input and passes each one to doveadm_input_line(); the
   connection is dropped when a line is rejected (negative return) or when
   the input stream has reached EOF.
   NOTE(review): the declaration of line and the return after the first
   doveadm_disconnect() are not visible in this span. */
static void doveadm_input(struct doveadm_connection *conn)
126
while ((line = i_stream_read_next_line(conn->input)) != NULL) {
127
if (doveadm_input_line(conn, line) < 0) {
128
doveadm_disconnect(conn);
132
if (conn->input->eof)
133
doveadm_disconnect(conn);
136
/* Connects to the UNIX socket at conn->path. On failure the error is logged
   and ioloop_time is remembered in conn->last_connect_failure. On success the
   failure timestamp is cleared, a read I/O handler and the input/output
   streams are created on the new fd, and the handshake line is sent.
   NOTE(review): the return statements, including the one controlled by the
   last_connect_failure == ioloop_time test, are not visible in this span. */
static int doveadm_connect(struct doveadm_connection *conn)
141
/* a connect already failed at this same ioloop_time value */
if (conn->last_connect_failure == ioloop_time)
144
conn->fd = net_connect_unix(conn->path);
145
if (conn->fd == -1) {
146
i_error("net_connect_unix(%s) failed: %m", conn->path);
147
conn->last_connect_failure = ioloop_time;
150
conn->last_connect_failure = 0;
151
conn->io = io_add(conn->fd, IO_READ, doveadm_input, conn);
152
conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE, FALSE);
153
conn->output = o_stream_create_fd(conn->fd, (size_t)-1, FALSE);
154
o_stream_send_str(conn->output, DOVEADM_HANDSHAKE);
158
/* Timeout callback scheduled by doveadm_connection_sync() after a failed
   connect; reports DOVEADM_REPLY_FAIL to the pending request. */
static void doveadm_fail_timeout(struct doveadm_connection *conn)
160
doveadm_callback(conn, DOVEADM_REPLY_FAIL);
163
/* Starts a sync request for username on an idle connection. callback must be
   non-NULL and the connection must not be busy (both asserted). The callback
   and context are stored first. If doveadm_connect() fails, a timeout is
   scheduled that reports the failure through doveadm_fail_timeout().
   Otherwise a tab-separated command line with an empty flags field, the
   tab-escaped username and "sync -d" is built and sent.
   NOTE(review): the declaration of cmd, the else branch structure and the
   condition guarding the "-f" argument (presumably the full parameter) are
   not visible in this span - confirm. */
void doveadm_connection_sync(struct doveadm_connection *conn,
164
const char *username, bool full,
165
doveadm_callback_t *callback, void *context)
169
i_assert(callback != NULL);
170
i_assert(!doveadm_connection_is_busy(conn));
172
conn->callback = callback;
173
conn->context = context;
175
if (doveadm_connect(conn) < 0) {
176
i_assert(conn->to == NULL);
177
conn->to = timeout_add(DOVEADM_FAIL_TIMEOUT_MSECS,
178
doveadm_fail_timeout, conn);
180
/* <flags> <username> <command> [<args>] */
181
cmd = t_str_new(256);
182
/* the flags field is left empty, so the line starts with its separator */
str_append_c(cmd, '\t');
183
str_tabescape_write(cmd, username);
184
str_append(cmd, "\tsync\t-d");
186
str_append(cmd, "\t-f");
187
str_append_c(cmd, '\n');
188
o_stream_send(conn->output, str_data(cmd), str_len(cmd));
192
/* Returns TRUE while a request is pending, i.e. while conn->callback is set
   (doveadm_callback() clears it before invoking the callback). */
bool doveadm_connection_is_busy(struct doveadm_connection *conn)
194
return conn->callback != NULL;
213
/* Destroy handler installed as doveadm_conn_vfuncs.destroy. Deinitialises
   the embedded connection and tells the master service that a client
   connection has been destroyed.
   NOTE(review): lines between the visible ones (for example freeing client)
   are missing from this span - confirm. */
static void client_destroy(struct connection *conn)
215
struct doveadm_connection *client = (struct doveadm_connection *)conn;
217
connection_deinit(&client->conn);
220
master_service_client_connection_destroyed(master_service);
223
/* Creates the state for a newly accepted doveadm client on fd: allocates a
   doveadm_connection, remembers queue in it and registers the connection in
   doveadm_connections as a server-side connection, using fd as both
   descriptor arguments. */
void doveadm_connection_create(struct replicator_queue *queue, int fd)
225
struct doveadm_connection *client;
227
client = i_new(struct doveadm_connection, 1);
228
client->queue = queue;
229
connection_init_server(doveadm_connections, &client->conn,
230
"(doveadm client)", fd, fd);
233
/* Settings passed to connection_list_init(): service names for each
   direction, the protocol version and the input/output size limits, both set
   to (size_t)-1.
   NOTE(review): any further members and the closing brace are not visible in
   this span. */
static struct connection_settings doveadm_conn_set = {
234
.service_name_in = "replicator-doveadm-client",
235
.service_name_out = "replicator-doveadm-server",
236
.major_version = REPLICATOR_DOVEADM_MAJOR_VERSION,
237
.minor_version = REPLICATOR_DOVEADM_MINOR_VERSION,
239
.input_max_size = (size_t)-1,
240
.output_max_size = (size_t)-1,
244
/* Callback table passed to connection_list_init(): client_destroy() is the
   destroy handler and client_input_args() the input_args handler. */
static const struct connection_vfuncs doveadm_conn_vfuncs = {
245
.destroy = client_destroy,
246
.input_args = client_input_args
249
/* Creates the global doveadm_connections list from doveadm_conn_set and
   doveadm_conn_vfuncs. */
void doveadm_connections_init(void)
251
doveadm_connections = connection_list_init(&doveadm_conn_set,
252
&doveadm_conn_vfuncs);
255
/* Releases the global doveadm_connections list via connection_list_deinit(). */
void doveadm_connections_deinit(void)
257
connection_list_deinit(&doveadm_connections);