88
director_has_outgoing_connection(struct director *dir,
89
struct director_host *host)
91
struct director_connection *const *connp;
93
array_foreach(&dir->connections, connp) {
94
if (director_connection_get_host(*connp) == host &&
95
!director_connection_is_incoming(*connp))
83
101
int director_connect_host(struct director *dir, struct director_host *host)
85
103
unsigned int port;
88
if (director_connection_find_outgoing(dir, host) != NULL)
106
if (director_has_outgoing_connection(dir, host))
95
113
port = dir->test_port != 0 ? dir->test_port : host->port;
96
114
fd = net_connect_ip(&host->ip, port, &dir->self_ip);
98
host->last_failed = ioloop_time;
116
host->last_network_failure = ioloop_time;
99
117
i_error("connect(%s) failed: %m", host->name);
102
120
/* Reset timestamp so that director_connect() won't skip this host
103
121
while we're still trying to connect to it */
104
host->last_failed = 0;
122
host->last_network_failure = 0;
106
124
director_connection_init_out(dir, fd, host);
110
128
static struct director_host *
111
129
director_get_preferred_right_host(struct director *dir)
113
struct director_host *const *hosts;
114
unsigned int count, self_idx;
131
struct director_host *const *hosts, *host;
132
unsigned int i, count, self_idx;
116
134
hosts = array_get(&dir->dir_hosts, &count);
120
140
self_idx = director_find_self_idx(dir);
121
return hosts[(self_idx + 1) % count];
141
for (i = 0; i < count; i++) {
142
host = hosts[(self_idx + i + 1) % count];
146
/* self, with some removed hosts */
150
static bool director_wait_for_others(struct director *dir)
152
struct director_host *const *hostp;
154
/* don't assume we're alone until we've attempted to connect
155
to others for a while */
156
if (dir->ring_first_alone != 0 &&
157
ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS)
160
if (dir->ring_first_alone == 0)
161
dir->ring_first_alone = ioloop_time;
162
/* reset all failures and try again */
163
array_foreach(&dir->dir_hosts, hostp) {
164
(*hostp)->last_network_failure = 0;
165
(*hostp)->last_protocol_failure = 0;
167
if (dir->to_reconnect != NULL)
168
timeout_remove(&dir->to_reconnect);
169
dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS,
170
director_connect, dir);
124
174
void director_connect(struct director *dir)
135
184
for (i = 1; i < count; i++) {
136
185
unsigned int idx = (self_idx + i) % count;
138
if (hosts[idx]->last_failed +
187
if (hosts[idx]->removed)
190
if (hosts[idx]->last_network_failure +
139
191
DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) {
140
/* failed recently, don't try retrying here */
144
if (director_connect_host(dir, hosts[idx]) == 0)
148
/* we're the only one */
150
i_debug("director: Couldn't connect to right side, "
151
"we must be the only director left");
153
if (dir->left != NULL) {
154
/* since we couldn't connect to it,
155
it must have failed recently */
156
director_connection_deinit(&dir->left);
158
if (!dir->ring_handshaked)
159
director_set_ring_handshaked(dir);
161
director_set_ring_synced(dir);
192
/* connection failed recently, don't try retrying here */
195
if (hosts[idx]->last_protocol_failure +
196
DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS > ioloop_time) {
197
/* the director recently sent invalid protocol data,
198
don't try retrying yet */
202
if (director_connect_host(dir, hosts[idx]) == 0) {
208
if (count > 1 && director_wait_for_others(dir))
211
/* we're the only one */
213
i_warning("director: Couldn't connect to right side, "
214
"we must be the only director left");
216
if (dir->left != NULL) {
217
/* since we couldn't connect to it,
218
it must have failed recently */
219
director_connection_deinit(&dir->left);
221
dir->ring_min_version = DIRECTOR_VERSION_MINOR;
222
if (!dir->ring_handshaked)
223
director_set_ring_handshaked(dir);
225
director_set_ring_synced(dir);
165
228
void director_set_ring_handshaked(struct director *dir)
214
279
host = dir->right == NULL ? NULL :
215
280
director_connection_get_host(dir->right);
282
if (dir->to_reconnect != NULL)
283
timeout_remove(&dir->to_reconnect);
216
284
if (host != director_get_preferred_right_host(dir)) {
217
285
/* try to reconnect to preferred host later */
218
if (dir->to_reconnect == NULL) {
220
timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
221
director_reconnect_timeout, dir);
224
if (dir->to_reconnect != NULL)
225
timeout_remove(&dir->to_reconnect);
287
timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
288
director_reconnect_timeout, dir);
291
if (dir->left != NULL)
292
director_connection_set_synced(dir->left, TRUE);
293
if (dir->right != NULL)
294
director_connection_set_synced(dir->right, TRUE);
295
if (dir->to_sync != NULL)
296
timeout_remove(&dir->to_sync);
228
297
dir->ring_synced = TRUE;
298
dir->ring_last_sync_time = ioloop_time;
229
299
director_set_state_changed(dir);
302
void director_sync_send(struct director *dir, struct director_host *host,
303
uint32_t seq, unsigned int minor_version)
307
str = t_str_new(128);
308
str_printfa(str, "SYNC\t%s\t%u\t%u",
309
net_ip2addr(&host->ip), host->port, seq);
310
if (minor_version > 0 &&
311
director_connection_get_minor_version(dir->right) > 0) {
312
/* only minor_version>0 supports this parameter */
313
str_printfa(str, "\t%u", minor_version);
315
str_append_c(str, '\n');
316
director_connection_send(dir->right, str_c(str));
318
/* ping our connections in case either of them are hanging.
319
if they are, we want to know it fast. */
320
if (dir->left != NULL)
321
director_connection_ping(dir->left);
322
if (dir->right != NULL)
323
director_connection_ping(dir->right);
326
bool director_resend_sync(struct director *dir)
328
if (!dir->ring_synced && dir->left != NULL && dir->right != NULL) {
329
/* send a new SYNC in case the previous one got dropped */
330
director_sync_send(dir, dir->self_host, dir->sync_seq,
331
DIRECTOR_VERSION_MINOR);
332
if (dir->to_sync != NULL)
333
timeout_reset(dir->to_sync);
339
static void director_sync_timeout(struct director *dir)
341
i_assert(!dir->ring_synced);
343
if (director_resend_sync(dir))
344
i_error("Ring SYNC appears to have got lost, resending");
347
void director_set_ring_unsynced(struct director *dir)
349
if (dir->ring_synced) {
350
dir->ring_synced = FALSE;
351
dir->ring_last_sync_time = ioloop_time;
354
if (dir->to_sync == NULL) {
355
dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS,
356
director_sync_timeout, dir);
358
timeout_reset(dir->to_sync);
232
362
static void director_sync(struct director *dir)
364
/* we're synced again when we receive this SYNC back */
366
director_set_ring_unsynced(dir);
234
368
if (dir->sync_frozen) {
235
369
dir->sync_pending = TRUE;
244
/* we're synced again when we receive this SYNC back */
246
dir->ring_synced = FALSE;
248
378
if (dir->debug) {
249
379
i_debug("Ring is desynced (seq=%u, sending SYNC to %s)",
250
380
dir->sync_seq, dir->right == NULL ? "(nowhere)" :
251
381
director_connection_get_name(dir->right));
384
/* send PINGs to our connections more rapidly until we've synced again.
385
if the connection has actually died, we don't need to wait (and
386
delay requests) for as long to detect it */
254
387
if (dir->left != NULL)
255
director_connection_wait_sync(dir->left);
256
director_connection_wait_sync(dir->right);
257
director_connection_send(dir->right, t_strdup_printf(
258
"SYNC\t%s\t%u\t%u\n", net_ip2addr(&dir->self_ip),
259
dir->self_port, dir->sync_seq));
388
director_connection_set_synced(dir->left, FALSE);
389
director_connection_set_synced(dir->right, FALSE);
390
director_sync_send(dir, dir->self_host, dir->sync_seq,
391
DIRECTOR_VERSION_MINOR);
262
394
void director_sync_freeze(struct director *dir)
396
struct director_connection *const *connp;
264
398
i_assert(!dir->sync_frozen);
265
399
i_assert(!dir->sync_pending);
267
if (dir->left != NULL)
268
director_connection_cork(dir->left);
269
if (dir->right != NULL)
270
director_connection_cork(dir->right);
401
array_foreach(&dir->connections, connp)
402
director_connection_cork(*connp);
271
403
dir->sync_frozen = TRUE;
274
406
void director_sync_thaw(struct director *dir)
408
struct director_connection *const *connp;
276
410
i_assert(dir->sync_frozen);
278
412
dir->sync_frozen = FALSE;
280
414
dir->sync_pending = FALSE;
281
415
director_sync(dir);
283
if (dir->left != NULL)
284
director_connection_uncork(dir->left);
285
if (dir->right != NULL)
286
director_connection_uncork(dir->right);
417
array_foreach(&dir->connections, connp)
418
director_connection_uncork(*connp);
421
void director_notify_ring_added(struct director_host *added_host,
422
struct director_host *src)
426
cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n",
427
net_ip2addr(&added_host->ip), added_host->port);
428
director_update_send(added_host->dir, src, cmd);
431
static void director_delayed_dir_remove_timeout(struct director *dir)
433
struct director_host *const *hosts, *host;
434
unsigned int i, count;
436
timeout_remove(&dir->to_remove_dirs);
438
hosts = array_get(&dir->dir_hosts, &count);
439
for (i = 0; i < count; ) {
440
if (hosts[i]->removed) {
442
director_host_free(&host);
443
hosts = array_get(&dir->dir_hosts, &count);
450
void director_ring_remove(struct director_host *removed_host,
451
struct director_host *src)
453
struct director *dir = removed_host->dir;
454
struct director_connection *const *conns, *conn;
455
unsigned int i, count;
458
if (removed_host->self) {
459
/* others will just disconnect us */
463
/* mark the host as removed and fully remove it later. this delay is
464
needed, because the removal may trigger director reconnections,
465
which may send the director back and we don't want to re-add it */
466
removed_host->removed = TRUE;
467
if (dir->to_remove_dirs == NULL) {
468
dir->to_remove_dirs =
469
timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS,
470
director_delayed_dir_remove_timeout, dir);
473
/* disconnect any connections to the host */
474
conns = array_get(&dir->connections, &count);
475
for (i = 0; i < count; ) {
477
if (director_connection_get_host(conn) != removed_host)
480
director_connection_deinit(&conn);
481
conns = array_get(&dir->connections, &count);
484
if (dir->right == NULL)
485
director_connect(dir);
487
cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n",
488
net_ip2addr(&removed_host->ip),
490
director_update_send_version(dir, src,
491
DIRECTOR_VERSION_RING_REMOVE, cmd);
289
494
void director_update_host(struct director *dir, struct director_host *src,
346
551
void director_update_user(struct director *dir, struct director_host *src,
347
552
struct user *user)
554
i_assert(src != NULL);
556
i_assert(!user->weak);
557
director_update_send(dir, src, t_strdup_printf("USER\t%u\t%s\n",
558
user->username_hash, net_ip2addr(&user->host->ip)));
561
void director_update_user_weak(struct director *dir, struct director_host *src,
562
struct director_host *orig_src,
565
i_assert(src != NULL);
566
i_assert(user->weak);
568
if (orig_src == NULL) {
569
orig_src = dir->self_host;
570
orig_src->last_seq++;
349
573
director_update_send(dir, src, t_strdup_printf(
350
"USER\t%u\t%s\n", user->username_hash,
351
net_ip2addr(&user->host->ip)));
574
"USER-WEAK\t%s\t%u\t%u\t%u\t%s\n",
575
net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
576
user->username_hash, net_ip2addr(&user->host->ip)));
354
579
struct director_user_kill_finish_ctx {
562
787
void director_update_send(struct director *dir, struct director_host *src,
790
director_update_send_version(dir, src, 0, cmd);
793
void director_update_send_version(struct director *dir,
794
struct director_host *src,
795
unsigned int min_version, const char *cmd)
797
struct director_connection *const *connp;
565
799
i_assert(src != NULL);
567
if (dir->left != NULL)
568
director_connection_send_except(dir->left, src, cmd);
569
if (dir->right != NULL && dir->right != dir->left)
570
director_connection_send_except(dir->right, src, cmd);
801
array_foreach(&dir->connections, connp) {
802
if (director_connection_get_host(*connp) != src &&
803
director_connection_get_minor_version(*connp) >= min_version)
804
director_connection_send(*connp, cmd);
573
808
struct director *
585
819
dir->state_change_callback = callback;
586
820
i_array_init(&dir->dir_hosts, 16);
587
821
i_array_init(&dir->pending_requests, 16);
588
dir->users = user_directory_init(set->director_user_expire);
822
i_array_init(&dir->connections, 8);
823
dir->users = user_directory_init(set->director_user_expire,
824
set->director_username_hash);
589
825
dir->mail_hosts = mail_hosts_init();
591
path = t_strconcat(set->base_dir, "/" DIRECTOR_IPC_PROXY_PATH, NULL);
592
dir->ipc_proxy = ipc_client_init(path);
827
dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);
828
dir->ring_min_version = DIRECTOR_VERSION_MINOR;
596
832
void director_deinit(struct director **_dir)
598
834
struct director *dir = *_dir;
599
struct director_host *const *hostp;
835
struct director_host *const *hostp, *host;
836
struct director_connection *conn, *const *connp;
603
director_connections_deinit(dir);
840
while (array_count(&dir->connections) > 0) {
841
connp = array_idx(&dir->connections, 0);
843
director_connection_deinit(&conn);
604
846
user_directory_deinit(&dir->users);
605
847
mail_hosts_deinit(&dir->mail_hosts);
606
848
mail_hosts_deinit(&dir->orig_config_hosts);
612
854
timeout_remove(&dir->to_handshake_warning);
613
855
if (dir->to_request != NULL)
614
856
timeout_remove(&dir->to_request);
615
array_foreach(&dir->dir_hosts, hostp)
616
director_host_free(*hostp);
857
if (dir->to_sync != NULL)
858
timeout_remove(&dir->to_sync);
859
if (dir->to_remove_dirs != NULL)
860
timeout_remove(&dir->to_remove_dirs);
861
while (array_count(&dir->dir_hosts) > 0) {
862
hostp = array_idx(&dir->dir_hosts, 0);
864
director_host_free(&host);
617
866
array_free(&dir->pending_requests);
618
867
array_free(&dir->dir_hosts);
868
array_free(&dir->connections);