19
46
#include <stdlib.h>
20
47
#include <unistd.h>
22
#define DIRECTOR_VERSION_NAME "director"
23
#define DIRECTOR_VERSION_MAJOR 1
24
#define DIRECTOR_VERSION_MINOR 0
26
49
#define MAX_INBUF_SIZE 1024
27
50
#define MAX_OUTBUF_SIZE (1024*1024*10)
28
51
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
29
/* Max idling time while connecting/handshaking before disconnecting */
30
#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000)
31
/* How long to wait for PONG after PING request */
32
#define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000)
52
/* Max idling time before "ME" command must have been received,
53
or we'll disconnect. */
54
#define DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS (10*1000)
55
/* Max time to wait for USERs in handshake to be sent. With a lot of users the
56
kernel may quickly eat up everything we send, while the receiver is busy
58
#define DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS (30*1000)
59
/* Max idling time before "DONE" command must have been received,
60
or we'll disconnect. */
61
#define DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS (30*1000)
62
/* How long to wait for PONG for an idling connection */
63
#define DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS (10*1000)
64
/* Maximum time to wait for PONG reply */
65
#define DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS (60*1000)
33
66
/* How long to wait to send PING when connection is idle */
34
67
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
35
68
/* How long to wait before sending PING while waiting for SYNC reply */
36
#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000
69
#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000
37
70
/* If outgoing director connection exists for less than this many seconds,
38
71
mark the host as failed so we won't try to reconnect to it immediately */
39
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 10
72
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40
73
#define DIRECTOR_WAIT_DISCONNECT_SECS 10
75
#if DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS
76
# error DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS is too low
79
#if DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS
80
# error DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS is too low
83
#define CMD_IS_USER_HANDHAKE(args) \
84
(str_array_length(args) > 2)
41
86
struct director_connection {
42
struct director_connection *prev, *next;
44
87
struct director *dir;
90
unsigned int minor_version;
48
92
/* for incoming connections the director host isn't known until
49
93
ME-line is received */
65
112
unsigned int ignore_host_events:1;
66
113
unsigned int handshake_sending_hosts:1;
67
114
unsigned int ping_waiting:1;
68
unsigned int sync_ping:1;
115
unsigned int synced:1;
116
unsigned int wrong_host:1;
117
unsigned int verifying_left:1;
71
static void director_connection_ping(struct director_connection *conn);
72
120
static void director_connection_disconnected(struct director_connection **conn);
121
static void director_connection_reconnect(struct director_connection **conn);
123
static void ATTR_FORMAT(2, 3)
124
director_cmd_error(struct director_connection *conn, const char *fmt, ...)
129
i_error("director(%s): Command %s: %s (input: %s)", conn->name,
130
conn->cur_cmd, t_strdup_vprintf(fmt, args), conn->cur_line);
133
conn->host->last_protocol_failure = ioloop_time;
137
director_connection_init_timeout(struct director_connection *conn)
139
unsigned int secs = ioloop_time - conn->created;
141
if (!conn->connected) {
142
i_error("director(%s): Connect timed out (%u secs)",
144
} else if (conn->io == NULL) {
145
i_error("director(%s): Sending handshake (%u secs)",
147
} else if (!conn->me_received) {
148
i_error("director(%s): Handshaking ME timed out (%u secs)",
151
i_error("director(%s): Handshaking DONE timed out (%u secs)",
154
director_connection_disconnected(&conn);
158
director_connection_set_ping_timeout(struct director_connection *conn)
162
msecs = conn->synced || !conn->handshake_received ?
163
DIRECTOR_CONNECTION_PING_INTERVAL_MSECS :
164
DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS;
166
timeout_remove(&conn->to_ping);
167
conn->to_ping = timeout_add(msecs, director_connection_ping, conn);
170
static void director_connection_wait_timeout(struct director_connection *conn)
172
director_connection_deinit(&conn);
175
static void director_connection_send_connect(struct director_connection *conn,
176
struct director_host *host)
178
const char *connect_str;
180
if (conn->to_disconnect != NULL)
183
connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
184
net_ip2addr(&host->ip), host->port);
185
director_connection_send(conn, connect_str);
186
(void)o_stream_flush(conn->output);
187
o_stream_uncork(conn->output);
189
conn->to_disconnect =
190
timeout_add(DIRECTOR_WAIT_DISCONNECT_SECS*1000,
191
director_connection_wait_timeout, conn);
194
static void director_connection_assigned(struct director_connection *conn)
196
struct director *dir = conn->dir;
198
if (dir->left != NULL && dir->right != NULL) {
199
/* we're connected to both directors. see if the ring is
200
finished by sending a SYNC. if we get it back, it's done. */
202
director_set_ring_unsynced(dir);
203
director_sync_send(dir, dir->self_host, dir->sync_seq,
204
DIRECTOR_VERSION_MINOR);
206
director_connection_set_ping_timeout(conn);
209
static bool director_connection_assign_left(struct director_connection *conn)
211
struct director *dir = conn->dir;
214
i_assert(dir->left != conn);
216
/* make sure this is the correct incoming connection */
217
if (conn->host->self) {
218
i_error("Connection from self, dropping");
220
} else if (dir->left == NULL) {
221
/* no conflicts yet */
222
} else if (dir->left->host == conn->host) {
223
i_info("Dropping existing connection %s "
224
"in favor of its new connection %s",
225
dir->left->host->name, conn->host->name);
226
director_connection_deinit(&dir->left);
227
} else if (dir->left->verifying_left) {
228
/* we're waiting to verify if our current left is still
229
working. if we don't receive a PONG, the current left
230
gets disconnected and a new left gets assigned. if we do
231
receive a PONG, we'll wait until the current left
232
disconnects us and then reassign the new left. */
234
} else if (director_host_cmp_to_self(dir->left->host, conn->host,
235
dir->self_host) < 0) {
236
/* the old connection is the correct one.
237
refer the client there (FIXME: do we ever get here?) */
238
i_warning("Director connection %s tried to connect to "
239
"us, should use %s instead",
240
conn->name, dir->left->host->name);
241
director_connection_send_connect(conn, dir->left->host);
244
/* this new connection is the correct one, but wait until the
245
old connection gets disconnected before using this one.
246
that guarantees that the director inserting itself into
247
the ring has finished handshaking its left side, so the
248
switch will be fast. */
253
conn->name = i_strdup_printf("%s/left", conn->host->name);
254
director_connection_assigned(conn);
258
static void director_assign_left(struct director *dir)
260
struct director_connection *conn, *const *connp;
262
array_foreach(&dir->connections, connp) {
265
if (conn->in && conn->handshake_received &&
266
conn->to_disconnect == NULL && conn != dir->left) {
267
/* either use this or disconnect it */
268
if (!director_connection_assign_left(conn)) {
269
/* we don't want this */
270
director_connection_deinit(&conn);
271
director_assign_left(dir);
278
static bool director_has_outgoing_connections(struct director *dir)
280
struct director_connection *const *connp;
282
array_foreach(&dir->connections, connp) {
283
if (!(*connp)->in && (*connp)->to_disconnect == NULL)
289
static bool director_connection_assign_right(struct director_connection *conn)
291
struct director *dir = conn->dir;
295
if (dir->right != NULL) {
296
/* see if we should disconnect or keep the existing
298
if (director_host_cmp_to_self(conn->host, dir->right->host,
299
dir->self_host) <= 0) {
300
/* the old connection is the correct one */
301
i_warning("Aborting incorrect outgoing connection to %s "
302
"(already connected to correct one: %s)",
303
conn->host->name, dir->right->host->name);
304
conn->wrong_host = TRUE;
307
i_info("Replacing right director connection %s with %s",
308
dir->right->host->name, conn->host->name);
309
director_connection_deinit(&dir->right);
313
conn->name = i_strdup_printf("%s/right", conn->host->name);
314
director_connection_assigned(conn);
75
319
director_args_parse_ip_port(struct director_connection *conn,
76
320
const char *const *args,
77
321
struct ip_addr *ip_r, unsigned int *port_r)
323
if (args[0] == NULL || args[1] == NULL) {
324
director_cmd_error(conn, "Missing IP+port parameters");
79
327
if (net_addr2ip(args[0], ip_r) < 0) {
80
i_error("director(%s): Command has invalid IP address: %s",
328
director_cmd_error(conn, "Invalid IP address: %s", args[0]);
84
331
if (str_to_uint(args[1], port_r) < 0) {
85
i_error("director(%s): Command has invalid port: %s",
332
director_cmd_error(conn, "Invalid port: %s", args[1]);
109
359
net_ip2addr(&ip), port);
112
host = director_host_get(dir, &ip, port);
113
/* the host is up now, make sure we can connect to it immediately
115
host->last_failed = 0;
116
362
conn->me_received = TRUE;
364
timeout_remove(&conn->to_ping);
365
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS,
366
director_connection_init_timeout, conn);
122
conn->name = i_strdup_printf("%s/left", host->name);
371
/* Incoming connection:
373
a) we don't have an established ring yet. make sure we're connecting
374
to our right side (which might become our left side).
376
b) it's our current "left" connection. the previous connection
379
c) we have an existing ring. tell our current "left" to connect to
380
it with CONNECT command.
382
d) the incoming connection doesn't belong to us at all, refer it
383
elsewhere with CONNECT. however, before disconnecting it verify
384
first that our left side is actually still functional.
386
i_assert(conn->host == NULL);
387
conn->host = director_host_get(dir, &ip, port);
388
/* the host shouldn't be removed at this point, but if for some
389
reason it is we don't want to crash */
390
conn->host->removed = FALSE;
391
director_host_ref(conn->host);
124
392
/* make sure we don't keep old sequence values across restarts */
393
conn->host->last_seq = 0;
127
connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
128
net_ip2addr(&host->ip), host->port);
129
/* make sure this is the correct incoming connection */
131
/* probably we're trying to find our own ip. it's no */
132
i_error("Connection from self, dropping");
395
next_comm_attempt = conn->host->last_protocol_failure +
396
DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS;
397
if (next_comm_attempt > ioloop_time) {
398
/* the director recently sent invalid protocol data,
399
don't try retrying yet */
400
i_error("director(%s): Remote sent invalid protocol data recently, "
401
"waiting %u secs before allowing further communication",
402
conn->name, (unsigned int)(next_comm_attempt-ioloop_time));
134
404
} else if (dir->left == NULL) {
135
/* no conflicts yet */
136
} else if (dir->left->host == host) {
137
i_warning("Dropping existing connection %s "
138
"in favor of its new connection %s",
139
dir->left->host->name, host->name);
405
/* a) - just in case the left is also our right side reset
406
its failed state, so we can connect to it */
407
conn->host->last_network_failure = 0;
408
if (!director_has_outgoing_connections(dir))
409
director_connect(dir);
410
} else if (dir->left->host == conn->host) {
412
i_assert(dir->left != conn);
140
413
director_connection_deinit(&dir->left);
142
if (director_host_cmp_to_self(dir->left->host, host,
143
dir->self_host) < 0) {
144
/* the old connection is the correct one.
145
refer the client there. */
146
i_warning("Director connection %s tried to connect to "
147
"us, should use %s instead",
148
host->name, dir->left->host->name);
149
director_connection_send(conn, t_strdup_printf(
151
net_ip2addr(&dir->left->host->ip),
152
dir->left->host->port));
153
/* also make sure that the connection is alive */
154
director_connection_ping(dir->left);
158
/* this new connection is the correct one. disconnect the old
159
one, but before that tell it to connect to the new one.
160
that message might not reach it, so also send the same
161
message to right side. */
162
i_warning("Replacing director connection %s with %s",
163
dir->left->host->name, host->name);
414
} else if (director_host_cmp_to_self(conn->host, dir->left->host,
415
dir->self_host) < 0) {
417
connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
418
net_ip2addr(&conn->host->ip),
164
420
director_connection_send(dir->left, connect_str);
165
(void)o_stream_flush(dir->left->output);
166
director_connection_deinit(&dir->left);
170
/* tell the ring's right side to connect to this new director. */
171
if (dir->right != NULL) {
172
if (dir->left->host != dir->right->host)
173
director_connection_send(dir->right, connect_str);
175
/* there are only two directors, and we already have
176
a connection to this server. */
179
/* there are only two directors. connect to the other one. */
180
(void)director_connect_host(dir, host);
423
dir->left->verifying_left = TRUE;
424
director_connection_ping(dir->left);
186
director_user_refresh(struct director *dir, unsigned int username_hash,
187
struct mail_host *host, time_t timestamp,
188
struct user **user_r)
430
director_user_refresh(struct director_connection *conn,
431
unsigned int username_hash, struct mail_host *host,
432
time_t timestamp, bool weak, struct user **user_r)
434
struct director *dir = conn->dir;
190
435
struct user *user;
436
bool ret = FALSE, unset_weak_user = FALSE;
193
438
user = user_directory_lookup(dir->users, username_hash);
194
439
if (user == NULL) {
195
440
*user_r = user_directory_add(dir->users, username_hash,
196
441
host, timestamp);
442
(*user_r)->weak = weak;
199
if (timestamp == ioloop_time && (time_t)user->timestamp != timestamp) {
200
user_directory_refresh(dir->users, user);
448
/* removing user's weakness */
449
unset_weak_user = TRUE;
453
/* weak user marked again as weak */
456
!user_directory_user_is_recently_updated(dir->users, user)) {
457
/* mark the user as weak */
204
if (user->host != host) {
205
i_error("User hash %u is being redirected to two hosts: "
206
"%s and %s", username_hash,
207
net_ip2addr(&user->host->ip),
208
net_ip2addr(&host->ip));
460
} else if (user->host != host) {
461
/* non-weak user received a non-weak update with
462
conflicting host. this shouldn't happen. */
463
string_t *str = t_str_new(128);
465
str_printfa(str, "User hash %u "
466
"is being redirected to two hosts: %s and %s",
467
username_hash, net_ip2addr(&user->host->ip),
468
net_ip2addr(&host->ip));
469
str_printfa(str, " (old_ts=%ld", (long)user->timestamp);
471
if (!conn->handshake_received) {
472
str_printfa(str, ",handshaking,recv_ts=%ld",
475
if (user->to_move != NULL)
476
str_append(str, ",moving");
477
if (user->kill_state != USER_KILL_STATE_NONE)
478
str_printfa(str, ",kill_state=%d", user->kill_state);
479
str_append_c(str, ')');
480
i_error("%s", str_c(str));
210
482
/* we want all the directors to redirect the user to same
211
483
server, but we don't want two directors fighting over which
286
581
struct director_host *host;
287
582
struct ip_addr ip;
288
583
unsigned int port;
584
bool forward = FALSE;
290
586
if (!director_args_parse_ip_port(conn, args, &ip, &port))
293
589
host = director_host_lookup(conn->dir, &ip, port);
294
590
if (host != NULL) {
295
/* already have this. just reset its last_failed timestamp,
296
since it might be up now. */
297
host->last_failed = 0;
301
/* save the director and forward it */
302
director_host_add(conn->dir, &ip, port);
303
director_connection_send(conn->dir->right,
304
t_strdup_printf("DIRECTOR\t%s\t%u\n", net_ip2addr(&ip), port));
591
if (host == conn->dir->self_host) {
592
/* ignore updates to ourself */
596
/* ignore re-adds of removed directors */
600
/* already have this. just reset its last_network_failure
601
timestamp, since it might be up now. */
602
host->last_network_failure = 0;
603
if (host->last_seq != 0) {
604
/* it also may have been restarted, reset last_seq */
609
/* save the director and forward it */
610
host = director_host_add(conn->dir, &ip, port);
614
director_notify_ring_added(host,
615
director_connection_get_host(conn));
620
static bool director_cmd_director_remove(struct director_connection *conn,
621
const char *const *args)
623
struct director_host *host;
627
if (!director_args_parse_ip_port(conn, args, &ip, &port))
630
host = director_host_lookup(conn->dir, &ip, port);
631
if (host != NULL && !host->removed)
632
director_ring_remove(host, director_connection_get_host(conn));
545
static void director_handshake_cmd_done(struct director_connection *conn)
919
static bool director_handshake_cmd_done(struct director_connection *conn)
547
921
struct director *dir = conn->dir;
550
i_debug("Handshaked to %s", conn->host->name);
552
conn->host->last_failed = 0;
924
unsigned int secs = time(NULL)-conn->created;
926
i_debug("director(%s): Handshake took %u secs, "
927
"bytes in=%"PRIuUOFF_T" out=%"PRIuUOFF_T,
928
conn->name, secs, conn->input->v_offset,
929
conn->output->offset);
932
/* the host is up now, make sure we can connect to it immediately
934
conn->host->last_network_failure = 0;
553
936
conn->handshake_received = TRUE;
555
938
/* handshaked to left side. tell it we've received the
556
939
whole handshake. */
557
940
director_connection_send(conn, "DONE\n");
559
/* tell the right director about the left one */
560
if (dir->right != NULL) {
561
director_connection_send(dir->right,
562
t_strdup_printf("DIRECTOR\t%s\t%u\n",
563
net_ip2addr(&conn->host->ip),
568
if (dir->left != NULL && dir->right != NULL &&
569
dir->left->handshake_received && dir->right->handshake_received) {
570
/* we're connected to both directors. see if the ring is
571
finished by sending a SYNC. if we get it back, it's done. */
573
dir->ring_synced = FALSE;
574
director_connection_send(dir->right,
575
t_strdup_printf("SYNC\t%s\t%u\t%u\n",
576
net_ip2addr(&dir->self_ip),
577
dir->self_port, dir->sync_seq));
579
if (conn->to_ping != NULL)
580
timeout_remove(&conn->to_ping);
581
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
582
director_connection_ping, conn);
942
/* tell the "right" director about the "left" one */
943
director_update_send(dir, director_connection_get_host(conn),
944
t_strdup_printf("DIRECTOR\t%s\t%u\n",
945
net_ip2addr(&conn->host->ip),
947
/* this is our "left" side. */
948
return director_connection_assign_left(conn);
950
/* handshaked to "right" side. */
951
return director_connection_assign_right(conn);
586
956
director_connection_handle_handshake(struct director_connection *conn,
587
957
const char *cmd, const char *const *args)
589
struct director_host *host;
593
959
/* both incoming and outgoing connections get VERSION and ME */
594
960
if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) {
595
961
if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) {
596
962
i_error("director(%s): Wrong protocol in socket "
598
964
conn->name, args[0], DIRECTOR_VERSION_NAME);
600
966
} else if (atoi(args[1]) != DIRECTOR_VERSION_MAJOR) {
601
967
i_error("director(%s): Incompatible protocol version: "
602
968
"%u vs %u", conn->name, atoi(args[1]),
603
969
DIRECTOR_VERSION_MAJOR);
972
conn->minor_version = atoi(args[2]);
606
973
conn->version_received = TRUE;
609
976
if (!conn->version_received) {
610
i_error("director(%s): Incompatible protocol", conn->name);
614
if (strcmp(cmd, "ME") == 0 && !conn->me_received &&
615
str_array_length(args) == 2)
616
return director_cmd_me(conn, args);
618
/* only outgoing connections get a CONNECT reference */
619
if (!conn->in && strcmp(cmd, "CONNECT") == 0 &&
620
str_array_length(args) == 2) {
621
/* remote wants us to connect elsewhere */
622
if (!director_args_parse_ip_port(conn, args, &ip, &port))
625
conn->dir->right = NULL;
626
host = director_host_get(conn->dir, &ip, port);
627
/* reset failure timestamp so we'll actually try to
629
host->last_failed = 0;
630
if (conn->dir->debug)
631
i_debug("Received CONNECT reference to %s", host->name);
632
(void)director_connect_host(conn->dir, host);
635
/* only incoming connections get DIRECTOR and HOST lists */
636
if (conn->in && strcmp(cmd, "DIRECTOR") == 0 && conn->me_received)
637
return director_cmd_director(conn, args);
639
if (strcmp(cmd, "HOST") == 0) {
640
/* allow hosts from all connections always,
641
this could be a host update */
642
if (conn->handshake_sending_hosts)
643
return director_cmd_host_handshake(conn, args);
645
return director_cmd_host(conn, args);
647
if (conn->handshake_sending_hosts &&
648
strcmp(cmd, "HOST-HAND-END") == 0) {
649
conn->ignore_host_events = FALSE;
650
conn->handshake_sending_hosts = FALSE;
653
if (conn->in && strcmp(cmd, "HOST-HAND-START") == 0 &&
655
return director_cmd_host_hand_start(conn, args);
657
/* only incoming connections get a full USER list, but outgoing
658
connections can also receive USER updates during handshake and
659
it wouldn't be safe to ignore them. */
660
if (strcmp(cmd, "USER") == 0 && conn->me_received) {
662
return director_handshake_cmd_user(conn, args);
664
return director_cmd_user(conn, args);
977
director_cmd_error(conn, "Incompatible protocol");
981
if (strcmp(cmd, "ME") == 0)
982
return director_cmd_me(conn, args) ? 1 : -1;
983
if (!conn->me_received) {
984
director_cmd_error(conn, "Expecting ME command first");
988
/* incoming connections get a HOST list */
989
if (conn->handshake_sending_hosts) {
990
if (strcmp(cmd, "HOST") == 0)
991
return director_cmd_host_handshake(conn, args) ? 1 : -1;
992
if (strcmp(cmd, "HOST-HAND-END") == 0) {
993
conn->ignore_host_events = FALSE;
994
conn->handshake_sending_hosts = FALSE;
997
director_cmd_error(conn, "Unexpected command during host list");
1000
if (strcmp(cmd, "HOST-HAND-START") == 0) {
1002
director_cmd_error(conn,
1003
"Host list is only for incoming connections");
1006
return director_cmd_host_hand_start(conn, args) ? 1 : -1;
1009
if (conn->in && strcmp(cmd, "USER") == 0 && CMD_IS_USER_HANDHAKE(args))
1010
return director_handshake_cmd_user(conn, args) ? 1 : -1;
666
1012
/* both get DONE */
667
if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
668
!conn->handshake_sending_hosts) {
669
director_handshake_cmd_done(conn);
672
i_error("director(%s): Invalid handshake command: %s "
673
"(in=%d me_received=%d)", conn->name, cmd,
674
conn->in, conn->me_received);
1013
if (strcmp(cmd, "DONE") == 0)
1014
return director_handshake_cmd_done(conn) ? 1 : -1;
678
static bool director_connection_sync(struct director_connection *conn,
679
const char *const *args, const char *line)
1019
director_connection_sync_host(struct director_connection *conn,
1020
struct director_host *host,
1021
uint32_t seq, unsigned int minor_version)
681
1023
struct director *dir = conn->dir;
682
struct director_host *host;
684
unsigned int port, seq;
686
if (str_array_length(args) != 3 ||
687
!director_args_parse_ip_port(conn, args, &ip, &port) ||
688
str_to_uint(args[2], &seq) < 0) {
689
i_error("director(%s): Invalid SYNC args", conn->name);
1025
if (minor_version > DIRECTOR_VERSION_MINOR) {
1026
/* we're not up to date */
1027
minor_version = DIRECTOR_VERSION_MINOR;
693
/* find the originating director. if we don't see it, it was already
694
removed and we can ignore this sync. */
695
host = director_host_lookup(dir, &ip, port);
699
1030
if (host->self) {
700
1031
if (dir->sync_seq != seq) {
701
1032
/* stale SYNC event */
1035
/* sync_seq increases when we get disconnected, so we must be
1036
successfully connected to both directions */
1037
i_assert(dir->left != NULL && dir->right != NULL);
1039
dir->ring_min_version = minor_version;
705
1040
if (!dir->ring_handshaked) {
706
1041
/* the ring is handshaked */
707
1042
director_set_ring_handshaked(dir);
708
1043
} else if (dir->ring_synced) {
709
i_error("Received SYNC from %s (seq=%u) "
710
"while already synced", conn->name, seq);
1044
/* duplicate SYNC (which was sent just in case the
1045
previous one got lost) */
713
1047
if (dir->debug) {
714
1048
i_debug("Ring is synced (%s sent seq=%u)",
1136
static void director_disconnect_wrong_lefts(struct director *dir)
1138
struct director_connection *const *connp, *conn;
1140
array_foreach(&dir->connections, connp) {
1143
if (conn->in && conn != dir->left && conn->me_received &&
1144
conn->to_disconnect == NULL &&
1145
director_host_cmp_to_self(dir->left->host, conn->host,
1146
dir->self_host) < 0) {
1147
i_warning("Director connection %s tried to connect to "
1148
"us, should use %s instead",
1149
conn->name, dir->left->host->name);
1150
director_connection_send_connect(conn, dir->left->host);
780
1155
static bool director_cmd_pong(struct director_connection *conn)
782
1157
if (!conn->ping_waiting)
785
1159
conn->ping_waiting = FALSE;
786
timeout_remove(&conn->to_ping);
787
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
788
director_connection_ping, conn);
1160
timeout_remove(&conn->to_pong);
1162
if (conn->verifying_left) {
1163
conn->verifying_left = FALSE;
1164
if (conn == conn->dir->left) {
1165
/* our left side is functional. tell all the wrong
1166
incoming connections to connect to it instead. */
1167
director_disconnect_wrong_lefts(conn->dir);
1171
director_connection_set_ping_timeout(conn);
793
director_connection_handle_line(struct director_connection *conn,
1176
director_connection_handle_cmd(struct director_connection *conn,
1177
const char *cmd, const char *const *args)
796
const char *cmd, *const *args;
798
args = t_strsplit(line, "\t");
799
cmd = args[0]; args++;
801
i_error("director(%s): Received empty line", conn->name);
1181
if (!conn->handshake_received) {
1182
ret = director_connection_handle_handshake(conn, cmd, args);
1186
/* invalid commands during handshake,
1187
we probably don't want to reconnect here */
1190
/* allow also other commands during handshake */
805
/* ping/pong is always handled */
806
1193
if (strcmp(cmd, "PING") == 0) {
807
1194
director_connection_send(conn, "PONG\n");
810
1197
if (strcmp(cmd, "PONG") == 0)
811
1198
return director_cmd_pong(conn);
813
if (!conn->handshake_received) {
814
if (!director_connection_handle_handshake(conn, cmd, args)) {
815
/* invalid commands during handshake,
816
we probably don't want to reconnect here */
817
if (conn->dir->debug) {
818
i_debug("director(%s): Handshaking failed",
821
if (conn->host != NULL)
822
conn->host->last_failed = ioloop_time;
828
1199
if (strcmp(cmd, "USER") == 0)
829
1200
return director_cmd_user(conn, args);
1201
if (strcmp(cmd, "USER-WEAK") == 0)
1202
return director_cmd_user_weak(conn, args);
830
1203
if (strcmp(cmd, "HOST") == 0)
831
1204
return director_cmd_host(conn, args);
832
1205
if (strcmp(cmd, "HOST-REMOVE") == 0)
1033
1428
if ((err = net_geterror(conn->fd)) != 0) {
1034
conn->host->last_failed = ioloop_time;
1035
1429
i_error("director(%s): connect() failed: %s", conn->name,
1036
1430
strerror(err));
1037
1431
director_connection_disconnected(&conn);
1040
1434
conn->connected = TRUE;
1042
if (dir->right != NULL) {
1043
/* see if we should disconnect or keep the existing
1045
if (director_host_cmp_to_self(conn->host, dir->right->host,
1046
dir->self_host) <= 0) {
1047
/* the old connection is the correct one */
1048
i_warning("Aborting incorrect outgoing connection to %s "
1049
"(already connected to correct one: %s)",
1050
conn->host->name, dir->right->host->name);
1051
director_connection_deinit(&conn);
1054
i_warning("Replacing director connection %s with %s",
1055
dir->right->host->name, conn->host->name);
1056
director_connection_deinit(&dir->right);
1060
conn->name = i_strdup_printf("%s/right", conn->host->name);
1435
o_stream_set_flush_callback(conn->output,
1436
director_connection_output, conn);
1062
1438
io_remove(&conn->io);
1439
conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
1441
timeout_remove(&conn->to_ping);
1442
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS,
1443
director_connection_init_timeout, conn);
1445
o_stream_cork(conn->output);
1064
1446
director_connection_send_handshake(conn);
1065
1447
director_connection_send_directors(conn, str);
1066
1448
director_connection_send_hosts(conn, str);
1067
1449
director_connection_send(conn, str_c(str));
1069
1451
conn->user_iter = user_directory_iter_init(dir->users);
1070
(void)director_connection_send_users(conn);
1452
if (director_connection_send_users(conn) == 0)
1453
o_stream_set_flush_pending(conn->output, TRUE);
1454
o_stream_uncork(conn->output);
1073
1457
struct director_connection *
1077
1461
struct director_connection *conn;
1463
i_assert(!host->removed);
1079
1465
/* make sure we don't keep old sequence values across restarts */
1080
1466
host->last_seq = 0;
1082
1468
conn = director_connection_init_common(dir, fd);
1083
1469
conn->name = i_strdup_printf("%s/out", host->name);
1084
1470
conn->host = host;
1085
/* use IO_READ instead of IO_WRITE, so that we don't assign
1086
dir->right until remote has actually sent some data */
1087
conn->io = io_add(conn->fd, IO_READ,
1471
director_host_ref(host);
1472
conn->io = io_add(conn->fd, IO_WRITE,
1088
1473
director_connection_connected, conn);
1092
1477
void director_connection_deinit(struct director_connection **_conn)
1094
struct director_connection *conn = *_conn;
1479
struct director_connection *const *conns, *conn = *_conn;
1095
1480
struct director *dir = conn->dir;
1481
unsigned int i, count;
1099
1485
if (dir->debug && conn->host != NULL)
1100
1486
i_debug("Disconnecting from %s", conn->host->name);
1102
if (conn->host != NULL && !conn->in &&
1103
conn->created + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time)
1104
conn->host->last_failed = ioloop_time;
1106
DLLIST_REMOVE(&dir->connections, conn);
1107
if (dir->left == conn)
1488
conns = array_get(&dir->connections, &count);
1489
for (i = 0; i < count; i++) {
1490
if (conns[i] == conn) {
1491
array_delete(&dir->connections, i, 1);
1495
i_assert(i < count);
1496
if (dir->left == conn) {
1108
1497
dir->left = NULL;
1498
/* if there is already another handshaked incoming connection,
1499
use it as the new "left" */
1500
director_assign_left(dir);
1109
1502
if (dir->right == conn)
1110
1503
dir->right = NULL;
1504
if (conn->host != NULL)
1505
director_host_unref(conn->host);
1112
1507
if (conn->user_iter != NULL)
1113
1508
user_directory_iter_deinit(&conn->user_iter);
1114
if (conn->to != NULL)
1115
timeout_remove(&conn->to);
1116
if (conn->to_ping != NULL)
1117
timeout_remove(&conn->to_ping);
1509
if (conn->to_disconnect != NULL)
1510
timeout_remove(&conn->to_disconnect);
1511
if (conn->to_pong != NULL)
1512
timeout_remove(&conn->to_pong);
1513
timeout_remove(&conn->to_ping);
1118
1514
if (conn->io != NULL)
1119
1515
io_remove(&conn->io);
1120
1516
i_stream_unref(&conn->input);
1167
1575
"disconnecting", conn->name);
1169
1577
o_stream_close(conn->output);
1170
conn->to = timeout_add(0, director_connection_timeout, conn);
1174
void director_connection_send_except(struct director_connection *conn,
1175
struct director_host *skip_host,
1178
if (conn->host != skip_host)
1179
director_connection_send(conn, data);
1182
static void director_connection_ping_timeout(struct director_connection *conn)
1582
director_connection_ping_idle_timeout(struct director_connection *conn)
1184
1584
i_error("director(%s): Ping timed out, disconnecting", conn->name);
1185
1585
director_connection_disconnected(&conn);
1188
static void director_connection_ping(struct director_connection *conn)
1190
conn->sync_ping = FALSE;
1588
static void director_connection_pong_timeout(struct director_connection *conn)
1590
i_error("director(%s): PONG reply not received although other "
1591
"input keeps coming, disconnecting", conn->name);
1592
director_connection_disconnected(&conn);
1595
void director_connection_ping(struct director_connection *conn)
1191
1597
if (conn->ping_waiting)
1194
if (conn->to_ping != NULL)
1195
timeout_remove(&conn->to_ping);
1196
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
1197
director_connection_ping_timeout, conn);
1600
timeout_remove(&conn->to_ping);
1601
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS,
1602
director_connection_ping_idle_timeout, conn);
1603
conn->to_pong = timeout_add(DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS,
1604
director_connection_pong_timeout, conn);
1198
1605
director_connection_send(conn, "PING\n");
1199
1606
conn->ping_waiting = TRUE;