~james-page/ubuntu/raring/dovecot/autopkgtest

« back to all changes in this revision

Viewing changes to src/director/director-connection.c

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2012-06-11 11:11:54 UTC
  • mfrom: (1.15.2) (4.1.27 sid)
  • Revision ID: package-import@ubuntu.com-20120611111154-678cwbdj6ktgsv1h
Tags: 1:2.1.7-1ubuntu1
* Merge from Debian unstable, remaining changes:
  + Add mail-stack-delivery package:
    - Update d/rules
    - d/control: convert existing dovecot-postfix package to a dummy
      package and add new mail-stack-delivery package.
    - Update maintainer scripts.
    - Rename d/dovecot-postfix.* to debian/mail-stack-delivery.*
    - d/mail-stack-delivery.preinst: Move previously installed backups and
      config files to a new package namespace.
    - d/mail-stack-delivery.prerm: Added to handle downgrades.
  + Use Snakeoil SSL certificates by default:
    - d/control: Depend on ssl-cert.
    - d/dovecot-core.postinst: Relax grep for SSL_* a bit.
  + Add autopkgtest to debian/tests/*.
  + Add ufw integration:
    - d/dovecot-core.ufw.profile: new ufw profile.
    - d/rules: install profile in dovecot-core.
    - d/control: dovecot-core - suggest ufw.
  + d/{control,rules}: enable PIE hardening.
  + d/dovecot-core.dirs: Added usr/share/doc/dovecot-core
  + Add apport hook:
    - d/rules, d/source_dovecot.py
  + Add upstart job:
    - d/rules, d/dovecot-core.dovecot.upstart, d/control,
      d/dovecot-core.dirs, dovecot-imapd.{postrm, postinst, prerm},
      d/dovecot-pop3d.{postinst, postrm, prerm}.
      d/mail-stack-deliver.postinst: Convert init script to upstart.
  + d/control: Added Pre-Depends: dpkg (>= 1.15.6) to dovecot-dbg to support
    xz compression in Ubuntu.
  + d/control: Demote dovecot-common Recommends: to Suggests: to prevent
    install of extra packages on upgrade.
  + d/patches/dovecot-drac.patch: Updated with version for dovecot >= 2.0.0.
  + d/control: Drop B-D on systemd.
* Dropped changes:
  + d/patches/fix-racey-restart.patch: part of 2.1.x, no longer required.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2010-2011 Dovecot authors, see the included COPYING file */
 
1
/* Copyright (c) 2010-2012 Dovecot authors, see the included COPYING file */
 
2
 
 
3
/*
 
4
   Handshaking:
 
5
 
 
6
   Incoming director connections send:
 
7
 
 
8
   VERSION
 
9
   ME
 
10
   <wait for DONE from remote handshake>
 
11
   DONE
 
12
   <make this connection our "left" connection, potentially disconnecting
 
13
   another one>
 
14
 
 
15
   Outgoing director connections send:
 
16
 
 
17
   VERSION
 
18
   ME
 
19
   [0..n] DIRECTOR
 
20
   HOST-HAND-START
 
21
   [0..n] HOST
 
22
   HOST-HAND-END
 
23
   [0..n] USER
 
24
   <possibly other non-handshake commands between USERs>
 
25
   DONE
 
26
   <wait for DONE from remote>
 
27
   <make this connection our "right" connection, potentially disconnecting
 
28
   another one>
 
29
*/
2
30
 
3
31
#include "lib.h"
4
32
#include "ioloop.h"
7
35
#include "istream.h"
8
36
#include "ostream.h"
9
37
#include "str.h"
10
 
#include "llist.h"
11
38
#include "master-service.h"
12
39
#include "mail-host.h"
13
40
#include "director.h"
19
46
#include <stdlib.h>
20
47
#include <unistd.h>
21
48
 
22
 
#define DIRECTOR_VERSION_NAME "director"
23
 
#define DIRECTOR_VERSION_MAJOR 1
24
 
#define DIRECTOR_VERSION_MINOR 0
25
 
 
26
49
#define MAX_INBUF_SIZE 1024
27
50
#define MAX_OUTBUF_SIZE (1024*1024*10)
28
51
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
29
 
/* Max idling time while connecting/handshaking before disconnecting */
30
 
#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000)
31
 
/* How long to wait for PONG after PING request */
32
 
#define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000)
 
52
/* Max idling time before "ME" command must have been received,
 
53
   or we'll disconnect. */
 
54
#define DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS (10*1000)
 
55
/* Max time to wait for USERs in handshake to be sent. With a lot of users the
 
56
   kernel may quickly eat up everything we send, while the receiver is busy
 
57
   parsing the data. */
 
58
#define DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS (30*1000)
 
59
/* Max idling time before "DONE" command must have been received,
 
60
   or we'll disconnect. */
 
61
#define DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS (30*1000)
 
62
/* How long to wait for PONG for an idling connection */
 
63
#define DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS (10*1000)
 
64
/* Maximum time to wait for PONG reply */
 
65
#define DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS (60*1000)
33
66
/* How long to wait to send PING when connection is idle */
34
67
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
35
68
/* How long to wait before sending PING while waiting for SYNC reply */
36
 
#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000
 
69
#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000
37
70
/* If outgoing director connection exists for less than this many seconds,
38
71
   mark the host as failed so we won't try to reconnect to it immediately */
39
 
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 10
 
72
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40
 
73
#define DIRECTOR_WAIT_DISCONNECT_SECS 10
 
74
 
 
75
#if DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS
 
76
#  error DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS is too low
 
77
#endif
 
78
 
 
79
#if DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS
 
80
#  error DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS is too low
 
81
#endif
 
82
 
 
83
#define CMD_IS_USER_HANDHAKE(args) \
 
84
        (str_array_length(args) > 2)
40
85
 
41
86
struct director_connection {
42
 
        struct director_connection *prev, *next;
43
 
 
44
87
        struct director *dir;
45
88
        char *name;
46
89
        time_t created;
 
90
        unsigned int minor_version;
47
91
 
48
92
        /* for incoming connections the director host isn't known until
49
93
           ME-line is received */
53
97
        struct io *io;
54
98
        struct istream *input;
55
99
        struct ostream *output;
56
 
        struct timeout *to, *to_ping;
 
100
        struct timeout *to_disconnect, *to_ping, *to_pong;
57
101
 
58
102
        struct user_directory_iter *user_iter;
59
103
 
 
104
        /* set during command execution */
 
105
        const char *cur_cmd, *cur_line;
 
106
 
60
107
        unsigned int in:1;
61
108
        unsigned int connected:1;
62
109
        unsigned int version_received:1;
65
112
        unsigned int ignore_host_events:1;
66
113
        unsigned int handshake_sending_hosts:1;
67
114
        unsigned int ping_waiting:1;
68
 
        unsigned int sync_ping:1;
 
115
        unsigned int synced:1;
 
116
        unsigned int wrong_host:1;
 
117
        unsigned int verifying_left:1;
69
118
};
70
119
 
71
 
static void director_connection_ping(struct director_connection *conn);
72
120
static void director_connection_disconnected(struct director_connection **conn);
 
121
static void director_connection_reconnect(struct director_connection **conn);
 
122
 
 
123
static void ATTR_FORMAT(2, 3)
 
124
director_cmd_error(struct director_connection *conn, const char *fmt, ...)
 
125
{
 
126
        va_list args;
 
127
 
 
128
        va_start(args, fmt);
 
129
        i_error("director(%s): Command %s: %s (input: %s)", conn->name,
 
130
                conn->cur_cmd, t_strdup_vprintf(fmt, args), conn->cur_line);
 
131
        va_end(args);
 
132
 
 
133
        conn->host->last_protocol_failure = ioloop_time;
 
134
}
 
135
 
 
136
static void
 
137
director_connection_init_timeout(struct director_connection *conn)
 
138
{
 
139
        unsigned int secs = ioloop_time - conn->created;
 
140
 
 
141
        if (!conn->connected) {
 
142
                i_error("director(%s): Connect timed out (%u secs)",
 
143
                        conn->name, secs);
 
144
        } else if (conn->io == NULL) {
 
145
                i_error("director(%s): Sending handshake (%u secs)",
 
146
                        conn->name, secs);
 
147
        } else if (!conn->me_received) {
 
148
                i_error("director(%s): Handshaking ME timed out (%u secs)",
 
149
                        conn->name, secs);
 
150
        } else {
 
151
                i_error("director(%s): Handshaking DONE timed out (%u secs)",
 
152
                        conn->name, secs);
 
153
        }
 
154
        director_connection_disconnected(&conn);
 
155
}
 
156
 
 
157
static void
 
158
director_connection_set_ping_timeout(struct director_connection *conn)
 
159
{
 
160
        unsigned int msecs;
 
161
 
 
162
        msecs = conn->synced || !conn->handshake_received ?
 
163
                DIRECTOR_CONNECTION_PING_INTERVAL_MSECS :
 
164
                DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS;
 
165
 
 
166
        timeout_remove(&conn->to_ping);
 
167
        conn->to_ping = timeout_add(msecs, director_connection_ping, conn);
 
168
}
 
169
 
 
170
static void director_connection_wait_timeout(struct director_connection *conn)
 
171
{
 
172
        director_connection_deinit(&conn);
 
173
}
 
174
 
 
175
static void director_connection_send_connect(struct director_connection *conn,
 
176
                                             struct director_host *host)
 
177
{
 
178
        const char *connect_str;
 
179
 
 
180
        if (conn->to_disconnect != NULL)
 
181
                return;
 
182
 
 
183
        connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
 
184
                                      net_ip2addr(&host->ip), host->port);
 
185
        director_connection_send(conn, connect_str);
 
186
        (void)o_stream_flush(conn->output);
 
187
        o_stream_uncork(conn->output);
 
188
 
 
189
        conn->to_disconnect =
 
190
                timeout_add(DIRECTOR_WAIT_DISCONNECT_SECS*1000,
 
191
                            director_connection_wait_timeout, conn);
 
192
}
 
193
 
 
194
static void director_connection_assigned(struct director_connection *conn)
 
195
{
 
196
        struct director *dir = conn->dir;
 
197
 
 
198
        if (dir->left != NULL && dir->right != NULL) {
 
199
                /* we're connected to both directors. see if the ring is
 
200
                   finished by sending a SYNC. if we get it back, it's done. */
 
201
                dir->sync_seq++;
 
202
                director_set_ring_unsynced(dir);
 
203
                director_sync_send(dir, dir->self_host, dir->sync_seq,
 
204
                                   DIRECTOR_VERSION_MINOR);
 
205
        }
 
206
        director_connection_set_ping_timeout(conn);
 
207
}
 
208
 
 
209
static bool director_connection_assign_left(struct director_connection *conn)
 
210
{
 
211
        struct director *dir = conn->dir;
 
212
 
 
213
        i_assert(conn->in);
 
214
        i_assert(dir->left != conn);
 
215
 
 
216
        /* make sure this is the correct incoming connection */
 
217
        if (conn->host->self) {
 
218
                i_error("Connection from self, dropping");
 
219
                return FALSE;
 
220
        } else if (dir->left == NULL) {
 
221
                /* no conflicts yet */
 
222
        } else if (dir->left->host == conn->host) {
 
223
                i_info("Dropping existing connection %s "
 
224
                       "in favor of its new connection %s",
 
225
                       dir->left->host->name, conn->host->name);
 
226
                director_connection_deinit(&dir->left);
 
227
        } else if (dir->left->verifying_left) {
 
228
                /* we're waiting to verify if our current left is still
 
229
                   working. if we don't receive a PONG, the current left
 
230
                   gets disconnected and a new left gets assigned. if we do
 
231
                   receive a PONG, we'll wait until the current left
 
232
                   disconnects us and then reassign the new left. */
 
233
                return TRUE;
 
234
        } else if (director_host_cmp_to_self(dir->left->host, conn->host,
 
235
                                             dir->self_host) < 0) {
 
236
                /* the old connection is the correct one.
 
237
                   refer the client there (FIXME: do we ever get here?) */
 
238
                i_warning("Director connection %s tried to connect to "
 
239
                          "us, should use %s instead",
 
240
                          conn->name, dir->left->host->name);
 
241
                director_connection_send_connect(conn, dir->left->host);
 
242
                return TRUE;
 
243
        } else {
 
244
                /* this new connection is the correct one, but wait until the
 
245
                   old connection gets disconnected before using this one.
 
246
                   that guarantees that the director inserting itself into
 
247
                   the ring has finished handshaking its left side, so the
 
248
                   switch will be fast. */
 
249
                return TRUE;
 
250
        }
 
251
        dir->left = conn;
 
252
        i_free(conn->name);
 
253
        conn->name = i_strdup_printf("%s/left", conn->host->name);
 
254
        director_connection_assigned(conn);
 
255
        return TRUE;
 
256
}
 
257
 
 
258
static void director_assign_left(struct director *dir)
 
259
{
 
260
        struct director_connection *conn, *const *connp;
 
261
 
 
262
        array_foreach(&dir->connections, connp) {
 
263
                conn = *connp;
 
264
 
 
265
                if (conn->in && conn->handshake_received &&
 
266
                    conn->to_disconnect == NULL && conn != dir->left) {
 
267
                        /* either use this or disconnect it */
 
268
                        if (!director_connection_assign_left(conn)) {
 
269
                                /* we don't want this */
 
270
                                director_connection_deinit(&conn);
 
271
                                director_assign_left(dir);
 
272
                                break;
 
273
                        }
 
274
                }
 
275
        }
 
276
}
 
277
 
 
278
static bool director_has_outgoing_connections(struct director *dir)
 
279
{
 
280
        struct director_connection *const *connp;
 
281
 
 
282
        array_foreach(&dir->connections, connp) {
 
283
                if (!(*connp)->in && (*connp)->to_disconnect == NULL)
 
284
                        return TRUE;
 
285
        }
 
286
        return FALSE;
 
287
}
 
288
 
 
289
static bool director_connection_assign_right(struct director_connection *conn)
 
290
{
 
291
        struct director *dir = conn->dir;
 
292
 
 
293
        i_assert(!conn->in);
 
294
 
 
295
        if (dir->right != NULL) {
 
296
                /* see if we should disconnect or keep the existing
 
297
                   connection. */
 
298
                if (director_host_cmp_to_self(conn->host, dir->right->host,
 
299
                                              dir->self_host) <= 0) {
 
300
                        /* the old connection is the correct one */
 
301
                        i_warning("Aborting incorrect outgoing connection to %s "
 
302
                                  "(already connected to correct one: %s)",
 
303
                                  conn->host->name, dir->right->host->name);
 
304
                        conn->wrong_host = TRUE;
 
305
                        return FALSE;
 
306
                }
 
307
                i_info("Replacing right director connection %s with %s",
 
308
                       dir->right->host->name, conn->host->name);
 
309
                director_connection_deinit(&dir->right);
 
310
        }
 
311
        dir->right = conn;
 
312
        i_free(conn->name);
 
313
        conn->name = i_strdup_printf("%s/right", conn->host->name);
 
314
        director_connection_assigned(conn);
 
315
        return TRUE;
 
316
}
73
317
 
74
318
static bool
75
319
director_args_parse_ip_port(struct director_connection *conn,
76
320
                            const char *const *args,
77
321
                            struct ip_addr *ip_r, unsigned int *port_r)
78
322
{
 
323
        if (args[0] == NULL || args[1] == NULL) {
 
324
                director_cmd_error(conn, "Missing IP+port parameters");
 
325
                return FALSE;
 
326
        }
79
327
        if (net_addr2ip(args[0], ip_r) < 0) {
80
 
                i_error("director(%s): Command has invalid IP address: %s",
81
 
                        conn->name, args[0]);
 
328
                director_cmd_error(conn, "Invalid IP address: %s", args[0]);
82
329
                return FALSE;
83
330
        }
84
331
        if (str_to_uint(args[1], port_r) < 0) {
85
 
                i_error("director(%s): Command has invalid port: %s",
86
 
                        conn->name, args[1]);
 
332
                director_cmd_error(conn, "Invalid port: %s", args[1]);
87
333
                return FALSE;
88
334
        }
89
335
        return TRUE;
93
339
                            const char *const *args)
94
340
{
95
341
        struct director *dir = conn->dir;
96
 
        struct director_host *host;
97
342
        const char *connect_str;
98
343
        struct ip_addr ip;
99
344
        unsigned int port;
 
345
        time_t next_comm_attempt;
100
346
 
101
347
        if (!director_args_parse_ip_port(conn, args, &ip, &port))
102
348
                return FALSE;
 
349
        if (conn->me_received) {
 
350
                director_cmd_error(conn, "Duplicate ME");
 
351
                return FALSE;
 
352
        }
103
353
 
104
354
        if (!conn->in && (!net_ip_compare(&conn->host->ip, &ip) ||
105
355
                          conn->host->port != port)) {
109
359
                        net_ip2addr(&ip), port);
110
360
                return FALSE;
111
361
        }
112
 
        host = director_host_get(dir, &ip, port);
113
 
        /* the host is up now, make sure we can connect to it immediately
114
 
           if needed */
115
 
        host->last_failed = 0;
116
362
        conn->me_received = TRUE;
117
363
 
 
364
        timeout_remove(&conn->to_ping);
 
365
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS,
 
366
                                    director_connection_init_timeout, conn);
 
367
 
118
368
        if (!conn->in)
119
369
                return TRUE;
120
370
 
121
 
        i_free(conn->name);
122
 
        conn->name = i_strdup_printf("%s/left", host->name);
123
 
        conn->host = host;
 
371
        /* Incoming connection:
 
372
 
 
373
           a) we don't have an established ring yet. make sure we're connecting
 
374
           to our right side (which might become our left side).
 
375
 
 
376
           b) it's our current "left" connection. the previous connection
 
377
           is most likely dead.
 
378
 
 
379
           c) we have an existing ring. tell our current "left" to connect to
 
380
           it with CONNECT command.
 
381
 
 
382
           d) the incoming connection doesn't belong to us at all, refer it
 
383
           elsewhere with CONNECT. however, before disconnecting it verify
 
384
           first that our left side is actually still functional.
 
385
        */
 
386
        i_assert(conn->host == NULL);
 
387
        conn->host = director_host_get(dir, &ip, port);
 
388
        /* the host shouldn't be removed at this point, but if for some
 
389
           reason it is we don't want to crash */
 
390
        conn->host->removed = FALSE;
 
391
        director_host_ref(conn->host);
124
392
        /* make sure we don't keep old sequence values across restarts */
125
 
        host->last_seq = 0;
 
393
        conn->host->last_seq = 0;
126
394
 
127
 
        connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
128
 
                                      net_ip2addr(&host->ip), host->port);
129
 
        /* make sure this is the correct incoming connection */
130
 
        if (host->self) {
131
 
                /* probably we're trying to find our own ip. it's no */
132
 
                i_error("Connection from self, dropping");
 
395
        next_comm_attempt = conn->host->last_protocol_failure +
 
396
                DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS;
 
397
        if (next_comm_attempt > ioloop_time) {
 
398
                /* the director recently sent invalid protocol data,
 
399
                   don't try retrying yet */
 
400
                i_error("director(%s): Remote sent invalid protocol data recently, "
 
401
                        "waiting %u secs before allowing further communication",
 
402
                        conn->name, (unsigned int)(next_comm_attempt-ioloop_time));
133
403
                return FALSE;
134
404
        } else if (dir->left == NULL) {
135
 
                /* no conflicts yet */
136
 
        } else if (dir->left->host == host) {
137
 
                i_warning("Dropping existing connection %s "
138
 
                          "in favor of its new connection %s",
139
 
                          dir->left->host->name, host->name);
 
405
                /* a) - just in case the left is also our right side reset
 
406
                   its failed state, so we can connect to it */
 
407
                conn->host->last_network_failure = 0;
 
408
                if (!director_has_outgoing_connections(dir))
 
409
                        director_connect(dir);
 
410
        } else if (dir->left->host == conn->host) {
 
411
                /* b) */
 
412
                i_assert(dir->left != conn);
140
413
                director_connection_deinit(&dir->left);
141
 
        } else {
142
 
                if (director_host_cmp_to_self(dir->left->host, host,
143
 
                                              dir->self_host) < 0) {
144
 
                        /* the old connection is the correct one.
145
 
                           refer the client there. */
146
 
                        i_warning("Director connection %s tried to connect to "
147
 
                                  "us, should use %s instead",
148
 
                                  host->name, dir->left->host->name);
149
 
                        director_connection_send(conn, t_strdup_printf(
150
 
                                "CONNECT\t%s\t%u\n",
151
 
                                net_ip2addr(&dir->left->host->ip),
152
 
                                dir->left->host->port));
153
 
                        /* also make sure that the connection is alive */
154
 
                        director_connection_ping(dir->left);
155
 
                        return FALSE;
156
 
                }
157
 
 
158
 
                /* this new connection is the correct one. disconnect the old
159
 
                   one, but before that tell it to connect to the new one.
160
 
                   that message might not reach it, so also send the same
161
 
                   message to right side. */
162
 
                i_warning("Replacing director connection %s with %s",
163
 
                          dir->left->host->name, host->name);
 
414
        } else if (director_host_cmp_to_self(conn->host, dir->left->host,
 
415
                                             dir->self_host) < 0) {
 
416
                /* c) */
 
417
                connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
 
418
                                              net_ip2addr(&conn->host->ip),
 
419
                                              conn->host->port);
164
420
                director_connection_send(dir->left, connect_str);
165
 
                (void)o_stream_flush(dir->left->output);
166
 
                director_connection_deinit(&dir->left);
167
 
        }
168
 
        dir->left = conn;
169
 
 
170
 
        /* tell the ring's right side to connect to this new director. */
171
 
        if (dir->right != NULL) {
172
 
                if (dir->left->host != dir->right->host)
173
 
                        director_connection_send(dir->right, connect_str);
174
 
                else {
175
 
                        /* there are only two directors, and we already have
176
 
                           a connection to this server. */
177
 
                }
178
421
        } else {
179
 
                /* there are only two directors. connect to the other one. */
180
 
                (void)director_connect_host(dir, host);
 
422
                /* d) */
 
423
                dir->left->verifying_left = TRUE;
 
424
                director_connection_ping(dir->left);
181
425
        }
182
426
        return TRUE;
183
427
}
184
428
 
185
429
static bool
186
 
director_user_refresh(struct director *dir, unsigned int username_hash,
187
 
                      struct mail_host *host, time_t timestamp,
188
 
                      struct user **user_r)
 
430
director_user_refresh(struct director_connection *conn,
 
431
                      unsigned int username_hash, struct mail_host *host,
 
432
                      time_t timestamp, bool weak, struct user **user_r)
189
433
{
 
434
        struct director *dir = conn->dir;
190
435
        struct user *user;
191
 
        bool ret = FALSE;
 
436
        bool ret = FALSE, unset_weak_user = FALSE;
192
437
 
193
438
        user = user_directory_lookup(dir->users, username_hash);
194
439
        if (user == NULL) {
195
440
                *user_r = user_directory_add(dir->users, username_hash,
196
441
                                             host, timestamp);
 
442
                (*user_r)->weak = weak;
197
443
                return TRUE;
198
444
        }
199
 
        if (timestamp == ioloop_time && (time_t)user->timestamp != timestamp) {
200
 
                user_directory_refresh(dir->users, user);
 
445
 
 
446
        if (user->weak) {
 
447
                if (!weak) {
 
448
                        /* removing user's weakness */
 
449
                        unset_weak_user = TRUE;
 
450
                        user->weak = FALSE;
 
451
                        ret = TRUE;
 
452
                } else {
 
453
                        /* weak user marked again as weak */
 
454
                }
 
455
        } else if (weak &&
 
456
                   !user_directory_user_is_recently_updated(dir->users, user)) {
 
457
                /* mark the user as weak */
 
458
                user->weak = TRUE;
201
459
                ret = TRUE;
202
 
        }
203
 
 
204
 
        if (user->host != host) {
205
 
                i_error("User hash %u is being redirected to two hosts: "
206
 
                        "%s and %s", username_hash,
207
 
                        net_ip2addr(&user->host->ip),
208
 
                        net_ip2addr(&host->ip));
 
460
        } else if (user->host != host) {
 
461
                /* non-weak user received a non-weak update with
 
462
                   conflicting host. this shouldn't happen. */
 
463
                string_t *str = t_str_new(128);
 
464
 
 
465
                str_printfa(str, "User hash %u "
 
466
                            "is being redirected to two hosts: %s and %s",
 
467
                            username_hash, net_ip2addr(&user->host->ip),
 
468
                            net_ip2addr(&host->ip));
 
469
                str_printfa(str, " (old_ts=%ld", (long)user->timestamp);
 
470
 
 
471
                if (!conn->handshake_received) {
 
472
                        str_printfa(str, ",handshaking,recv_ts=%ld",
 
473
                                    (long)timestamp);
 
474
                }
 
475
                if (user->to_move != NULL)
 
476
                        str_append(str, ",moving");
 
477
                if (user->kill_state != USER_KILL_STATE_NONE)
 
478
                        str_printfa(str, ",kill_state=%d", user->kill_state);
 
479
                str_append_c(str, ')');
 
480
                i_error("%s", str_c(str));
209
481
 
210
482
                /* we want all the directors to redirect the user to same
211
483
                   server, but we don't want two directors fighting over which
214
486
                        /* change the host. we'll also need to remove the user
215
487
                           from the old host's user_count, because we can't
216
488
                           keep track of the user for more than one host */
217
 
                        user->host->user_count--;
218
 
                        user->host = host;
219
 
                        user->host->user_count++;
 
489
                } else {
 
490
                        /* keep the host */
 
491
                        host = user->host;
220
492
                }
221
493
                ret = TRUE;
222
494
        }
 
495
        if (user->host != host) {
 
496
                user->host->user_count--;
 
497
                user->host = host;
 
498
                user->host->user_count++;
 
499
                ret = TRUE;
 
500
        }
 
501
        if (timestamp == ioloop_time && (time_t)user->timestamp != timestamp) {
 
502
                user_directory_refresh(dir->users, user);
 
503
                ret = TRUE;
 
504
        }
 
505
 
 
506
        if (unset_weak_user) {
 
507
                /* user is no longer weak. handle pending requests for
 
508
                   this user if there are any */
 
509
                director_set_state_changed(conn->dir);
 
510
        }
 
511
 
223
512
        *user_r = user;
224
513
        return ret;
225
514
}
232
521
        struct ip_addr ip;
233
522
        struct mail_host *host;
234
523
        struct user *user;
 
524
        bool weak;
235
525
 
236
 
        if (str_array_length(args) != 3 ||
 
526
        if (str_array_length(args) < 3 ||
237
527
            str_to_uint(args[0], &username_hash) < 0 ||
238
528
            net_addr2ip(args[1], &ip) < 0 ||
239
529
            str_to_uint(args[2], &timestamp) < 0) {
240
 
                i_error("director(%s): Invalid USER handshake args",
241
 
                        conn->name);
 
530
                director_cmd_error(conn, "Invalid parameters");
242
531
                return FALSE;
243
532
        }
 
533
        weak = args[3] != NULL && args[3][0] == 'w';
244
534
 
245
535
        host = mail_host_lookup(conn->dir->mail_hosts, &ip);
246
536
        if (host == NULL) {
249
539
                return FALSE;
250
540
        }
251
541
 
252
 
        director_user_refresh(conn->dir, username_hash, host, timestamp, &user);
 
542
        director_user_refresh(conn, username_hash, host, timestamp, weak, &user);
253
543
        return TRUE;
254
544
}
255
545
 
256
546
static bool
257
 
director_cmd_user(struct director_connection *conn, const char *const *args)
 
547
director_cmd_user(struct director_connection *conn,
 
548
                  const char *const *args)
258
549
{
259
550
        unsigned int username_hash;
260
551
        struct ip_addr ip;
261
552
        struct mail_host *host;
262
553
        struct user *user;
263
554
 
 
555
        /* NOTE: if more parameters are added, update also
 
556
           CMD_IS_USER_HANDHAKE() macro */
264
557
        if (str_array_length(args) != 2 ||
265
558
            str_to_uint(args[0], &username_hash) < 0 ||
266
559
            net_addr2ip(args[1], &ip) < 0) {
267
 
                i_error("director(%s): Invalid USER args", conn->name);
 
560
                director_cmd_error(conn, "Invalid parameters");
268
561
                return FALSE;
269
562
        }
270
563
 
274
567
                return TRUE;
275
568
        }
276
569
 
277
 
        if (director_user_refresh(conn->dir, username_hash,
278
 
                                  host, ioloop_time, &user))
 
570
        if (director_user_refresh(conn, username_hash,
 
571
                                  host, ioloop_time, FALSE, &user)) {
 
572
                i_assert(!user->weak);
279
573
                director_update_user(conn->dir, conn->host, user);
 
574
        }
280
575
        return TRUE;
281
576
}
282
577
 
286
581
        struct director_host *host;
287
582
        struct ip_addr ip;
288
583
        unsigned int port;
 
584
        bool forward = FALSE;
289
585
 
290
586
        if (!director_args_parse_ip_port(conn, args, &ip, &port))
291
587
                return FALSE;
292
588
 
293
589
        host = director_host_lookup(conn->dir, &ip, port);
294
590
        if (host != NULL) {
295
 
                /* already have this. just reset its last_failed timestamp,
296
 
                   since it might be up now. */
297
 
                host->last_failed = 0;
298
 
                return TRUE;
299
 
        }
300
 
 
301
 
        /* save the director and forward it */
302
 
        director_host_add(conn->dir, &ip, port);
303
 
        director_connection_send(conn->dir->right,
304
 
                t_strdup_printf("DIRECTOR\t%s\t%u\n", net_ip2addr(&ip), port));
 
591
                if (host == conn->dir->self_host) {
 
592
                        /* ignore updates to ourself */
 
593
                        return TRUE;
 
594
                }
 
595
                if (host->removed) {
 
596
                        /* ignore re-adds of removed directors */
 
597
                        return TRUE;
 
598
                }
 
599
 
 
600
                /* already have this. just reset its last_network_failure
 
601
                   timestamp, since it might be up now. */
 
602
                host->last_network_failure = 0;
 
603
                if (host->last_seq != 0) {
 
604
                        /* it also may have been restarted, reset last_seq */
 
605
                        host->last_seq = 0;
 
606
                        forward = TRUE;
 
607
                }
 
608
        } else {
 
609
                /* save the director and forward it */
 
610
                host = director_host_add(conn->dir, &ip, port);
 
611
                forward = TRUE;
 
612
        }
 
613
        if (forward) {
 
614
                director_notify_ring_added(host,
 
615
                        director_connection_get_host(conn));
 
616
        }
 
617
        return TRUE;
 
618
}
 
619
 
 
620
static bool director_cmd_director_remove(struct director_connection *conn,
 
621
                                         const char *const *args)
 
622
{
 
623
        struct director_host *host;
 
624
        struct ip_addr ip;
 
625
        unsigned int port;
 
626
 
 
627
        if (!director_args_parse_ip_port(conn, args, &ip, &port))
 
628
                return FALSE;
 
629
 
 
630
        host = director_host_lookup(conn->dir, &ip, port);
 
631
        if (host != NULL && !host->removed)
 
632
                director_ring_remove(host, director_connection_get_host(conn));
305
633
        return TRUE;
306
634
}
307
635
 
313
641
        struct mail_host *const *hostp;
314
642
        unsigned int remote_ring_completed;
315
643
 
316
 
        if (args == NULL || str_to_uint(args[0], &remote_ring_completed) < 0) {
317
 
                i_error("director(%s): Invalid HOST-HAND-START args",
318
 
                        conn->name);
 
644
        if (args[0] == NULL ||
 
645
            str_to_uint(args[0], &remote_ring_completed) < 0) {
 
646
                director_cmd_error(conn, "Invalid parameters");
319
647
                return FALSE;
320
648
        }
321
649
 
348
676
            net_addr2ip(args[0], &ip) < 0 ||
349
677
            str_to_uint(args[1], &port) < 0 ||
350
678
            str_to_uint(args[2], &seq) < 0) {
351
 
                i_error("director(%s): Command is missing parameters: %s",
352
 
                        conn->name, t_strarray_join(args, " "));
 
679
                director_cmd_error(conn, "Invalid parameters");
353
680
                return -1;
354
681
        }
355
682
        *_args = args + 3;
356
683
 
357
684
        host = director_host_lookup(conn->dir, &ip, port);
358
 
        if (host == NULL) {
 
685
        if (host == NULL || host->removed) {
359
686
                /* director is already gone, but we can't be sure if this
360
687
                   command was sent everywhere. re-send it as if it was from
361
688
                   ourself. */
372
699
}
373
700
 
374
701
static bool
 
702
director_cmd_user_weak(struct director_connection *conn,
 
703
                       const char *const *args)
 
704
{
 
705
        struct director_host *dir_host;
 
706
        struct ip_addr ip;
 
707
        unsigned int username_hash;
 
708
        struct mail_host *host;
 
709
        struct user *user;
 
710
        struct director_host *src_host = conn->host;
 
711
        bool weak = TRUE;
 
712
        int ret;
 
713
 
 
714
        if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) < 0)
 
715
                return FALSE;
 
716
 
 
717
        if (str_array_length(args) != 2 ||
 
718
            str_to_uint(args[0], &username_hash) < 0 ||
 
719
            net_addr2ip(args[1], &ip) < 0) {
 
720
                director_cmd_error(conn, "Invalid parameters");
 
721
                return FALSE;
 
722
        }
 
723
 
 
724
        host = mail_host_lookup(conn->dir->mail_hosts, &ip);
 
725
        if (host == NULL) {
 
726
                /* we probably just removed this host. */
 
727
                return TRUE;
 
728
        }
 
729
 
 
730
        if (ret > 0) {
 
731
                /* The entire ring has seen this USER-WEAK.
 
732
                   make it non-weak now. */
 
733
                weak = FALSE;
 
734
                src_host = conn->dir->self_host;
 
735
        }
 
736
 
 
737
        if (director_user_refresh(conn, username_hash,
 
738
                                  host, ioloop_time, weak, &user)) {
 
739
                if (!user->weak)
 
740
                        director_update_user(conn->dir, src_host, user);
 
741
                else {
 
742
                        director_update_user_weak(conn->dir, src_host,
 
743
                                                  dir_host, user);
 
744
                }
 
745
        }
 
746
        return TRUE;
 
747
}
 
748
 
 
749
static bool
375
750
director_cmd_host_int(struct director_connection *conn, const char *const *args,
376
751
                      struct director_host *dir_host)
377
752
{
383
758
        if (str_array_length(args) != 2 ||
384
759
            net_addr2ip(args[0], &ip) < 0 ||
385
760
            str_to_uint(args[1], &vhost_count) < 0) {
386
 
                i_error("director(%s): Invalid HOST args", conn->name);
 
761
                director_cmd_error(conn, "Invalid parameters");
387
762
                return FALSE;
388
763
        }
389
764
        if (conn->ignore_host_events) {
441
816
 
442
817
        if (str_array_length(args) != 1 ||
443
818
            net_addr2ip(args[0], &ip) < 0) {
444
 
                i_error("director(%s): Invalid HOST-REMOVE args", conn->name);
 
819
                director_cmd_error(conn, "Invalid parameters");
445
820
                return FALSE;
446
821
        }
447
822
 
465
840
 
466
841
        if (str_array_length(args) != 1 ||
467
842
            net_addr2ip(args[0], &ip) < 0) {
468
 
                i_error("director(%s): Invalid HOST-FLUSH args", conn->name);
 
843
                director_cmd_error(conn, "Invalid parameters");
469
844
                return FALSE;
470
845
        }
471
846
 
491
866
        if (str_array_length(args) != 2 ||
492
867
            str_to_uint(args[0], &username_hash) < 0 ||
493
868
            net_addr2ip(args[1], &ip) < 0) {
494
 
                i_error("director(%s): Invalid USER-MOVE args", conn->name);
 
869
                director_cmd_error(conn, "Invalid parameters");
495
870
                return FALSE;
496
871
        }
497
872
 
511
886
 
512
887
        if (str_array_length(args) != 1 ||
513
888
            str_to_uint(args[0], &username_hash) < 0) {
514
 
                i_error("director(%s): Invalid USER-KILLED args", conn->name);
 
889
                director_cmd_error(conn, "Invalid parameters");
515
890
                return FALSE;
516
891
        }
517
892
 
532
907
 
533
908
        if (str_array_length(args) != 1 ||
534
909
            str_to_uint(args[0], &username_hash) < 0) {
535
 
                i_error("director(%s): Invalid USER-KILLED-EVERYWHERE args",
536
 
                        conn->name);
 
910
                director_cmd_error(conn, "Invalid parameters");
537
911
                return FALSE;
538
912
        }
539
913
 
542
916
        return TRUE;
543
917
}
544
918
 
545
 
static void director_handshake_cmd_done(struct director_connection *conn)
 
919
static bool director_handshake_cmd_done(struct director_connection *conn)
546
920
{
547
921
        struct director *dir = conn->dir;
548
922
 
549
 
        if (dir->debug)
550
 
                i_debug("Handshaked to %s", conn->host->name);
551
 
 
552
 
        conn->host->last_failed = 0;
 
923
        if (dir->debug) {
 
924
                unsigned int secs = time(NULL)-conn->created;
 
925
 
 
926
                i_debug("director(%s): Handshake took %u secs, "
 
927
                        "bytes in=%"PRIuUOFF_T" out=%"PRIuUOFF_T,
 
928
                        conn->name, secs, conn->input->v_offset,
 
929
                        conn->output->offset);
 
930
        }
 
931
 
 
932
        /* the host is up now, make sure we can connect to it immediately
 
933
           if needed */
 
934
        conn->host->last_network_failure = 0;
 
935
 
553
936
        conn->handshake_received = TRUE;
554
937
        if (conn->in) {
555
938
                /* handshaked to left side. tell it we've received the
556
939
                   whole handshake. */
557
940
                director_connection_send(conn, "DONE\n");
558
941
 
559
 
                /* tell the right director about the left one */
560
 
                if (dir->right != NULL) {
561
 
                        director_connection_send(dir->right,
562
 
                                t_strdup_printf("DIRECTOR\t%s\t%u\n",
563
 
                                                net_ip2addr(&conn->host->ip),
564
 
                                                conn->host->port));
565
 
                }
566
 
        }
567
 
 
568
 
        if (dir->left != NULL && dir->right != NULL &&
569
 
            dir->left->handshake_received && dir->right->handshake_received) {
570
 
                /* we're connected to both directors. see if the ring is
571
 
                   finished by sending a SYNC. if we get it back, it's done. */
572
 
                dir->sync_seq++;
573
 
                dir->ring_synced = FALSE;
574
 
                director_connection_send(dir->right,
575
 
                        t_strdup_printf("SYNC\t%s\t%u\t%u\n",
576
 
                                        net_ip2addr(&dir->self_ip),
577
 
                                        dir->self_port, dir->sync_seq));
578
 
        }
579
 
        if (conn->to_ping != NULL)
580
 
                timeout_remove(&conn->to_ping);
581
 
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
582
 
                                    director_connection_ping, conn);
 
942
                /* tell the "right" director about the "left" one */
 
943
                director_update_send(dir, director_connection_get_host(conn),
 
944
                        t_strdup_printf("DIRECTOR\t%s\t%u\n",
 
945
                                        net_ip2addr(&conn->host->ip),
 
946
                                        conn->host->port));
 
947
                /* this is our "left" side. */
 
948
                return director_connection_assign_left(conn);
 
949
        } else {
 
950
                /* handshaked to "right" side. */
 
951
                return director_connection_assign_right(conn);
 
952
        }
583
953
}
584
954
 
585
 
static bool
 
955
static int
586
956
director_connection_handle_handshake(struct director_connection *conn,
587
957
                                     const char *cmd, const char *const *args)
588
958
{
589
 
        struct director_host *host;
590
 
        struct ip_addr ip;
591
 
        unsigned int port;
592
 
 
593
959
        /* both incoming and outgoing connections get VERSION and ME */
594
960
        if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) {
595
961
                if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) {
596
962
                        i_error("director(%s): Wrong protocol in socket "
597
963
                                "(%s vs %s)",
598
964
                                conn->name, args[0], DIRECTOR_VERSION_NAME);
599
 
                        return FALSE;
 
965
                        return -1;
600
966
                } else if (atoi(args[1]) != DIRECTOR_VERSION_MAJOR) {
601
967
                        i_error("director(%s): Incompatible protocol version: "
602
968
                                "%u vs %u", conn->name, atoi(args[1]),
603
969
                                DIRECTOR_VERSION_MAJOR);
604
 
                        return FALSE;
 
970
                        return -1;
605
971
                }
 
972
                conn->minor_version = atoi(args[2]);
606
973
                conn->version_received = TRUE;
607
 
                return TRUE;
 
974
                return 1;
608
975
        }
609
976
        if (!conn->version_received) {
610
 
                i_error("director(%s): Incompatible protocol", conn->name);
611
 
                return FALSE;
612
 
        }
613
 
 
614
 
        if (strcmp(cmd, "ME") == 0 && !conn->me_received &&
615
 
            str_array_length(args) == 2)
616
 
                return director_cmd_me(conn, args);
617
 
 
618
 
        /* only outgoing connections get a CONNECT reference */
619
 
        if (!conn->in && strcmp(cmd, "CONNECT") == 0 &&
620
 
            str_array_length(args) == 2) {
621
 
                /* remote wants us to connect elsewhere */
622
 
                if (!director_args_parse_ip_port(conn, args, &ip, &port))
623
 
                        return FALSE;
624
 
 
625
 
                conn->dir->right = NULL;
626
 
                host = director_host_get(conn->dir, &ip, port);
627
 
                /* reset failure timestamp so we'll actually try to
628
 
                   connect there. */
629
 
                host->last_failed = 0;
630
 
                if (conn->dir->debug)
631
 
                        i_debug("Received CONNECT reference to %s", host->name);
632
 
                (void)director_connect_host(conn->dir, host);
633
 
                return FALSE;
634
 
        }
635
 
        /* only incoming connections get DIRECTOR and HOST lists */
636
 
        if (conn->in && strcmp(cmd, "DIRECTOR") == 0 && conn->me_received)
637
 
                return director_cmd_director(conn, args);
638
 
 
639
 
        if (strcmp(cmd, "HOST") == 0) {
640
 
                /* allow hosts from all connections always,
641
 
                   this could be an host update */
642
 
                if (conn->handshake_sending_hosts)
643
 
                        return director_cmd_host_handshake(conn, args);
644
 
                else
645
 
                        return director_cmd_host(conn, args);
646
 
        }
647
 
        if (conn->handshake_sending_hosts &&
648
 
            strcmp(cmd, "HOST-HAND-END") == 0) {
649
 
                conn->ignore_host_events = FALSE;
650
 
                conn->handshake_sending_hosts = FALSE;
651
 
                return TRUE;
652
 
        }
653
 
        if (conn->in && strcmp(cmd, "HOST-HAND-START") == 0 &&
654
 
            conn->me_received)
655
 
                return director_cmd_host_hand_start(conn, args);
656
 
 
657
 
        /* only incoming connections get a full USER list, but outgoing
658
 
           connections can also receive USER updates during handshake and
659
 
           it wouldn't be safe to ignore them. */
660
 
        if (strcmp(cmd, "USER") == 0 && conn->me_received) {
661
 
                if (conn->in)
662
 
                        return director_handshake_cmd_user(conn, args);
663
 
                else
664
 
                        return director_cmd_user(conn, args);
665
 
        }
 
977
                director_cmd_error(conn, "Incompatible protocol");
 
978
                return -1;
 
979
        }
 
980
 
 
981
        if (strcmp(cmd, "ME") == 0)
 
982
                return director_cmd_me(conn, args) ? 1 : -1;
 
983
        if (!conn->me_received) {
 
984
                director_cmd_error(conn, "Expecting ME command first");
 
985
                return -1;
 
986
        }
 
987
 
 
988
        /* incoming connections get a HOST list */
 
989
        if (conn->handshake_sending_hosts) {
 
990
                if (strcmp(cmd, "HOST") == 0)
 
991
                        return director_cmd_host_handshake(conn, args) ? 1 : -1;
 
992
                if (strcmp(cmd, "HOST-HAND-END") == 0) {
 
993
                        conn->ignore_host_events = FALSE;
 
994
                        conn->handshake_sending_hosts = FALSE;
 
995
                        return 1;
 
996
                }
 
997
                director_cmd_error(conn, "Unexpected command during host list");
 
998
                return -1;
 
999
        }
 
1000
        if (strcmp(cmd, "HOST-HAND-START") == 0) {
 
1001
                if (!conn->in) {
 
1002
                        director_cmd_error(conn,
 
1003
                                "Host list is only for incoming connections");
 
1004
                        return -1;
 
1005
                }
 
1006
                return director_cmd_host_hand_start(conn, args) ? 1 : -1;
 
1007
        }
 
1008
 
 
1009
        if (conn->in && strcmp(cmd, "USER") == 0 && CMD_IS_USER_HANDHAKE(args))
 
1010
                return director_handshake_cmd_user(conn, args) ? 1 : -1;
 
1011
 
666
1012
        /* both get DONE */
667
 
        if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
668
 
            !conn->handshake_sending_hosts) {
669
 
                director_handshake_cmd_done(conn);
670
 
                return TRUE;
671
 
        }
672
 
        i_error("director(%s): Invalid handshake command: %s "
673
 
                "(in=%d me_received=%d)", conn->name, cmd,
674
 
                conn->in, conn->me_received);
675
 
        return FALSE;
 
1013
        if (strcmp(cmd, "DONE") == 0)
 
1014
                return director_handshake_cmd_done(conn) ? 1 : -1;
 
1015
        return 0;
676
1016
}
677
1017
 
678
 
static bool director_connection_sync(struct director_connection *conn,
679
 
                                     const char *const *args, const char *line)
 
1018
static void
 
1019
director_connection_sync_host(struct director_connection *conn,
 
1020
                              struct director_host *host,
 
1021
                              uint32_t seq, unsigned int minor_version)
680
1022
{
681
1023
        struct director *dir = conn->dir;
682
 
        struct director_host *host;
683
 
        struct ip_addr ip;
684
 
        unsigned int port, seq;
685
1024
 
686
 
        if (str_array_length(args) != 3 ||
687
 
            !director_args_parse_ip_port(conn, args, &ip, &port) ||
688
 
            str_to_uint(args[2], &seq) < 0) {
689
 
                i_error("director(%s): Invalid SYNC args", conn->name);
690
 
                return FALSE;
 
1025
        if (minor_version > DIRECTOR_VERSION_MINOR) {
 
1026
                /* we're not up to date */
 
1027
                minor_version = DIRECTOR_VERSION_MINOR;
691
1028
        }
692
1029
 
693
 
        /* find the originating director. if we don't see it, it was already
694
 
           removed and we can ignore this sync. */
695
 
        host = director_host_lookup(dir, &ip, port);
696
 
        if (host == NULL)
697
 
                return TRUE;
698
 
 
699
1030
        if (host->self) {
700
1031
                if (dir->sync_seq != seq) {
701
1032
                        /* stale SYNC event */
702
 
                        return TRUE;
 
1033
                        return;
703
1034
                }
 
1035
                /* sync_seq increases when we get disconnected, so we must be
 
1036
                   successfully connected to both directions */
 
1037
                i_assert(dir->left != NULL && dir->right != NULL);
704
1038
 
 
1039
                dir->ring_min_version = minor_version;
705
1040
                if (!dir->ring_handshaked) {
706
1041
                        /* the ring is handshaked */
707
1042
                        director_set_ring_handshaked(dir);
708
1043
                } else if (dir->ring_synced) {
709
 
                        i_error("Received SYNC from %s (seq=%u) "
710
 
                                "while already synced", conn->name, seq);
711
 
                        return TRUE;
 
1044
                        /* duplicate SYNC (which was sent just in case the
 
1045
                           previous one got lost) */
712
1046
                } else {
713
1047
                        if (dir->debug) {
714
1048
                                i_debug("Ring is synced (%s sent seq=%u)",
716
1050
                        }
717
1051
                        director_set_ring_synced(dir);
718
1052
                }
719
 
                return TRUE;
720
 
        }
721
 
 
722
 
        /* forward it to the connection on right */
723
 
        if (dir->right != NULL) {
724
 
                director_connection_send(dir->right,
725
 
                                         t_strconcat(line, "\n", NULL));
726
 
        }
 
1053
        } else if (dir->right != NULL) {
 
1054
                /* forward it to the connection on right */
 
1055
                director_sync_send(dir, host, seq, minor_version);
 
1056
        }
 
1057
}
 
1058
 
 
1059
static bool director_connection_sync(struct director_connection *conn,
 
1060
                                     const char *const *args)
 
1061
{
 
1062
        struct director *dir = conn->dir;
 
1063
        struct director_host *host;
 
1064
        struct ip_addr ip;
 
1065
        unsigned int port, seq, minor_version = 0;
 
1066
 
 
1067
        if (str_array_length(args) < 3 ||
 
1068
            !director_args_parse_ip_port(conn, args, &ip, &port) ||
 
1069
            str_to_uint(args[2], &seq) < 0) {
 
1070
                director_cmd_error(conn, "Invalid parameters");
 
1071
                return FALSE;
 
1072
        }
 
1073
        if (args[3] != NULL)
 
1074
                minor_version = atoi(args[3]);
 
1075
 
 
1076
        /* find the originating director. if we don't see it, it was already
 
1077
           removed and we can ignore this sync. */
 
1078
        host = director_host_lookup(dir, &ip, port);
 
1079
        if (host != NULL) {
 
1080
                director_connection_sync_host(conn, host, seq,
 
1081
                                              minor_version);
 
1082
        }
 
1083
 
 
1084
        if (host == NULL || !host->self)
 
1085
                director_resend_sync(dir);
727
1086
        return TRUE;
728
1087
}
729
1088
 
737
1096
 
738
1097
        if (str_array_length(args) != 2 ||
739
1098
            !director_args_parse_ip_port(conn, args, &ip, &port)) {
740
 
                i_error("director(%s): Invalid CONNECT args", conn->name);
 
1099
                director_cmd_error(conn, "Invalid parameters");
741
1100
                return FALSE;
742
1101
        }
743
1102
 
744
 
        host = director_host_lookup(dir, &ip, port);
745
 
        if (host == NULL) {
746
 
                i_error("Received CONNECT request to unknown host %s:%u",
747
 
                        net_ip2addr(&ip), port);
748
 
                return TRUE;
749
 
        }
 
1103
        host = director_host_get(conn->dir, &ip, port);
 
1104
        /* reset failure timestamp so we'll actually try to connect there. */
 
1105
        host->last_network_failure = 0;
750
1106
 
751
1107
        /* remote suggests us to connect elsewhere */
752
1108
        if (dir->right != NULL &&
777
1133
        return TRUE;
778
1134
}
779
1135
 
 
1136
static void director_disconnect_wrong_lefts(struct director *dir)
 
1137
{
 
1138
        struct director_connection *const *connp, *conn;
 
1139
 
 
1140
        array_foreach(&dir->connections, connp) {
 
1141
                conn = *connp;
 
1142
 
 
1143
                if (conn->in && conn != dir->left && conn->me_received &&
 
1144
                    conn->to_disconnect == NULL &&
 
1145
                    director_host_cmp_to_self(dir->left->host, conn->host,
 
1146
                                              dir->self_host) < 0) {
 
1147
                        i_warning("Director connection %s tried to connect to "
 
1148
                                  "us, should use %s instead",
 
1149
                                  conn->name, dir->left->host->name);
 
1150
                        director_connection_send_connect(conn, dir->left->host);
 
1151
                }
 
1152
        }
 
1153
}
 
1154
 
780
1155
static bool director_cmd_pong(struct director_connection *conn)
781
1156
{
782
1157
        if (!conn->ping_waiting)
783
1158
                return TRUE;
784
 
 
785
1159
        conn->ping_waiting = FALSE;
786
 
        timeout_remove(&conn->to_ping);
787
 
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
788
 
                                    director_connection_ping, conn);
 
1160
        timeout_remove(&conn->to_pong);
 
1161
 
 
1162
        if (conn->verifying_left) {
 
1163
                conn->verifying_left = FALSE;
 
1164
                if (conn == conn->dir->left) {
 
1165
                        /* our left side is functional. tell all the wrong
 
1166
                           incoming connections to connect to it instead. */
 
1167
                        director_disconnect_wrong_lefts(conn->dir);
 
1168
                }
 
1169
        }
 
1170
 
 
1171
        director_connection_set_ping_timeout(conn);
789
1172
        return TRUE;
790
1173
}
791
1174
 
792
1175
static bool
793
 
director_connection_handle_line(struct director_connection *conn,
794
 
                                const char *line)
 
1176
director_connection_handle_cmd(struct director_connection *conn,
 
1177
                               const char *cmd, const char *const *args)
795
1178
{
796
 
        const char *cmd, *const *args;
 
1179
        int ret;
797
1180
 
798
 
        args = t_strsplit(line, "\t");
799
 
        cmd = args[0]; args++;
800
 
        if (cmd == NULL) {
801
 
                i_error("director(%s): Received empty line", conn->name);
802
 
                return FALSE;
 
1181
        if (!conn->handshake_received) {
 
1182
                ret = director_connection_handle_handshake(conn, cmd, args);
 
1183
                if (ret > 0)
 
1184
                        return TRUE;
 
1185
                if (ret < 0) {
 
1186
                        /* invalid commands during handshake,
 
1187
                           we probably don't want to reconnect here */
 
1188
                        return FALSE;
 
1189
                }
 
1190
                /* allow also other commands during handshake */
803
1191
        }
804
1192
 
805
 
        /* ping/pong is always handled */
806
1193
        if (strcmp(cmd, "PING") == 0) {
807
1194
                director_connection_send(conn, "PONG\n");
808
1195
                return TRUE;
809
1196
        }
810
1197
        if (strcmp(cmd, "PONG") == 0)
811
1198
                return director_cmd_pong(conn);
812
 
 
813
 
        if (!conn->handshake_received) {
814
 
                if (!director_connection_handle_handshake(conn, cmd, args)) {
815
 
                        /* invalid commands during handshake,
816
 
                           we probably don't want to reconnect here */
817
 
                        if (conn->dir->debug) {
818
 
                                i_debug("director(%s): Handshaking failed",
819
 
                                        conn->host->name);
820
 
                        }
821
 
                        if (conn->host != NULL)
822
 
                                conn->host->last_failed = ioloop_time;
823
 
                        return FALSE;
824
 
                }
825
 
                return TRUE;
826
 
        }
827
 
 
828
1199
        if (strcmp(cmd, "USER") == 0)
829
1200
                return director_cmd_user(conn, args);
 
1201
        if (strcmp(cmd, "USER-WEAK") == 0)
 
1202
                return director_cmd_user_weak(conn, args);
830
1203
        if (strcmp(cmd, "HOST") == 0)
831
1204
                return director_cmd_host(conn, args);
832
1205
        if (strcmp(cmd, "HOST-REMOVE") == 0)
841
1214
                return director_cmd_user_killed_everywhere(conn, args);
842
1215
        if (strcmp(cmd, "DIRECTOR") == 0)
843
1216
                return director_cmd_director(conn, args);
 
1217
        if (strcmp(cmd, "DIRECTOR-REMOVE") == 0)
 
1218
                return director_cmd_director_remove(conn, args);
844
1219
        if (strcmp(cmd, "SYNC") == 0)
845
 
                return director_connection_sync(conn, args, line);
 
1220
                return director_connection_sync(conn, args);
846
1221
        if (strcmp(cmd, "CONNECT") == 0)
847
1222
                return director_cmd_connect(conn, args);
848
1223
 
849
 
        i_error("director(%s): Unknown command (in this state): %s",
850
 
                conn->name, cmd);
 
1224
        director_cmd_error(conn, "Unknown command %s", cmd);
851
1225
        return FALSE;
852
1226
}
853
1227
 
 
1228
static bool
 
1229
director_connection_handle_line(struct director_connection *conn,
 
1230
                                const char *line)
 
1231
{
 
1232
        const char *cmd, *const *args;
 
1233
        bool ret;
 
1234
 
 
1235
        args = t_strsplit_tab(line);
 
1236
        cmd = args[0]; args++;
 
1237
        if (cmd == NULL) {
 
1238
                director_cmd_error(conn, "Received empty line");
 
1239
                return FALSE;
 
1240
        }
 
1241
 
 
1242
        conn->cur_cmd = cmd;
 
1243
        conn->cur_line = line;
 
1244
        ret = director_connection_handle_cmd(conn, cmd, args);
 
1245
        conn->cur_cmd = NULL;
 
1246
        conn->cur_line = NULL;
 
1247
        return ret;
 
1248
}
 
1249
 
854
1250
static void director_connection_input(struct director_connection *conn)
855
1251
{
856
1252
        struct director *dir = conn->dir;
857
1253
        char *line;
858
1254
        bool ret;
859
1255
 
860
 
        if (conn->to_ping != NULL)
861
 
                timeout_reset(conn->to_ping);
862
1256
        switch (i_stream_read(conn->input)) {
863
1257
        case 0:
864
1258
                return;
871
1265
                return;
872
1266
        case -2:
873
1267
                /* buffer full */
874
 
                i_error("BUG: Director %s sent us more than %d bytes",
875
 
                        conn->name, MAX_INBUF_SIZE);
876
 
                director_connection_disconnected(&conn);
 
1268
                director_cmd_error(conn, "Director sent us more than %d bytes",
 
1269
                                   MAX_INBUF_SIZE);
 
1270
                director_connection_reconnect(&conn);
 
1271
                return;
 
1272
        }
 
1273
 
 
1274
        if (conn->to_disconnect != NULL) {
 
1275
                /* just read everything the remote sends, and wait for it
 
1276
                   to disconnect. we mainly just want the remote to read the
 
1277
                   CONNECT we sent it. */
 
1278
                size_t size;
 
1279
 
 
1280
                (void)i_stream_get_data(conn->input, &size);
 
1281
                i_stream_skip(conn->input, size);
877
1282
                return;
878
1283
        }
879
1284
 
884
1289
                } T_END;
885
1290
 
886
1291
                if (!ret) {
887
 
                        if (dir->debug) {
888
 
                                i_debug("director(%s): Invalid input, disconnecting",
889
 
                                        conn->name);
890
 
                        }
891
 
                        director_connection_disconnected(&conn);
 
1292
                        director_connection_reconnect(&conn);
892
1293
                        break;
893
1294
                }
894
1295
        }
895
1296
        director_sync_thaw(dir);
 
1297
        if (conn != NULL)
 
1298
                timeout_reset(conn->to_ping);
896
1299
}
897
1300
 
898
1301
static void director_connection_send_directors(struct director_connection *conn,
901
1304
        struct director_host *const *hostp;
902
1305
 
903
1306
        array_foreach(&conn->dir->dir_hosts, hostp) {
 
1307
                if ((*hostp)->removed)
 
1308
                        continue;
904
1309
                str_printfa(str, "DIRECTOR\t%s\t%u\n",
905
1310
                            net_ip2addr(&(*hostp)->ip), (*hostp)->port);
906
1311
        }
924
1329
        struct user *user;
925
1330
        int ret;
926
1331
 
927
 
        o_stream_cork(conn->output);
928
1332
        while ((user = user_directory_iter_next(conn->user_iter)) != NULL) {
929
 
                if (!user_directory_user_has_connections(conn->dir->users,
930
 
                                                         user)) {
931
 
                        /* user is already expired */
932
 
                        continue;
933
 
                }
934
 
 
935
1333
                T_BEGIN {
936
 
                        const char *line;
 
1334
                        string_t *str = t_str_new(128);
937
1335
 
938
 
                        line = t_strdup_printf("USER\t%u\t%s\t%u\n",
939
 
                                               user->username_hash,
940
 
                                               net_ip2addr(&user->host->ip),
941
 
                                               user->timestamp);
942
 
                        director_connection_send(conn, line);
 
1336
                        str_printfa(str, "USER\t%u\t%s\t%u",
 
1337
                                    user->username_hash,
 
1338
                                    net_ip2addr(&user->host->ip),
 
1339
                                    user->timestamp);
 
1340
                        if (user->weak)
 
1341
                                str_append(str, "\tw");
 
1342
                        str_append_c(str, '\n');
 
1343
                        director_connection_send(conn, str_c(str));
943
1344
                } T_END;
944
1345
 
945
1346
                if (o_stream_get_buffer_used_size(conn->output) >= OUTBUF_FLUSH_THRESHOLD) {
946
1347
                        if ((ret = o_stream_flush(conn->output)) <= 0) {
947
1348
                                /* continue later */
 
1349
                                timeout_reset(conn->to_ping);
948
1350
                                return ret;
949
1351
                        }
950
1352
                }
952
1354
        user_directory_iter_deinit(&conn->user_iter);
953
1355
        director_connection_send(conn, "DONE\n");
954
1356
 
955
 
        i_assert(conn->io == NULL);
956
 
        conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
957
 
 
958
1357
        ret = o_stream_flush(conn->output);
959
 
        o_stream_uncork(conn->output);
 
1358
        timeout_reset(conn->to_ping);
960
1359
        return ret;
961
1360
}
962
1361
 
963
1362
static int director_connection_output(struct director_connection *conn)
964
1363
{
965
 
        if (conn->user_iter != NULL)
966
 
                return director_connection_send_users(conn);
967
 
        else
968
 
                return o_stream_flush(conn->output);
969
 
}
 
1364
        int ret;
970
1365
 
971
 
static void
972
 
director_connection_init_timeout(struct director_connection *conn)
973
 
{
974
 
        if (conn->host != NULL)
975
 
                conn->host->last_failed = ioloop_time;
976
 
        if (!conn->connected)
977
 
                i_error("director(%s): Connect timed out", conn->name);
978
 
        else
979
 
                i_error("director(%s): Handshaking timed out", conn->name);
980
 
        director_connection_disconnected(&conn);
 
1366
        if (conn->user_iter != NULL) {
 
1367
                /* still handshaking USER list */
 
1368
                o_stream_cork(conn->output);
 
1369
                ret = director_connection_send_users(conn);
 
1370
                o_stream_uncork(conn->output);
 
1371
                if (ret < 0)
 
1372
                        director_connection_disconnected(&conn);
 
1373
                else
 
1374
                        o_stream_set_flush_pending(conn->output, TRUE);
 
1375
                return ret;
 
1376
        }
 
1377
        return o_stream_flush(conn->output);
981
1378
}
982
1379
 
983
1380
static struct director_connection *
991
1388
        conn->dir = dir;
992
1389
        conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE, FALSE);
993
1390
        conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE);
994
 
        o_stream_set_flush_callback(conn->output,
995
 
                                    director_connection_output, conn);
996
 
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS,
 
1391
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS,
997
1392
                                    director_connection_init_timeout, conn);
998
 
        DLLIST_PREPEND(&dir->connections, conn);
 
1393
        array_append(&dir->connections, &conn, 1);
999
1394
        return conn;
1000
1395
}
1001
1396
 
1031
1426
        int err;
1032
1427
 
1033
1428
        if ((err = net_geterror(conn->fd)) != 0) {
1034
 
                conn->host->last_failed = ioloop_time;
1035
1429
                i_error("director(%s): connect() failed: %s", conn->name,
1036
1430
                        strerror(err));
1037
1431
                director_connection_disconnected(&conn);
1038
1432
                return;
1039
1433
        }
1040
1434
        conn->connected = TRUE;
1041
 
 
1042
 
        if (dir->right != NULL) {
1043
 
                /* see if we should disconnect or keep the existing
1044
 
                   connection. */
1045
 
                if (director_host_cmp_to_self(conn->host, dir->right->host,
1046
 
                                              dir->self_host) <= 0) {
1047
 
                        /* the old connection is the correct one */
1048
 
                        i_warning("Aborting incorrect outgoing connection to %s "
1049
 
                                  "(already connected to correct one: %s)",
1050
 
                                  conn->host->name, dir->right->host->name);
1051
 
                        director_connection_deinit(&conn);
1052
 
                        return;
1053
 
                }
1054
 
                i_warning("Replacing director connection %s with %s",
1055
 
                          dir->right->host->name, conn->host->name);
1056
 
                director_connection_deinit(&dir->right);
1057
 
        }
1058
 
        dir->right = conn;
1059
 
        i_free(conn->name);
1060
 
        conn->name = i_strdup_printf("%s/right", conn->host->name);
 
1435
        o_stream_set_flush_callback(conn->output,
 
1436
                                    director_connection_output, conn);
1061
1437
 
1062
1438
        io_remove(&conn->io);
1063
 
 
 
1439
        conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
 
1440
 
 
1441
        timeout_remove(&conn->to_ping);
 
1442
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS,
 
1443
                                    director_connection_init_timeout, conn);
 
1444
 
 
1445
        o_stream_cork(conn->output);
1064
1446
        director_connection_send_handshake(conn);
1065
1447
        director_connection_send_directors(conn, str);
1066
1448
        director_connection_send_hosts(conn, str);
1067
1449
        director_connection_send(conn, str_c(str));
1068
1450
 
1069
1451
        conn->user_iter = user_directory_iter_init(dir->users);
1070
 
        (void)director_connection_send_users(conn);
 
1452
        if (director_connection_send_users(conn) == 0)
 
1453
                o_stream_set_flush_pending(conn->output, TRUE);
 
1454
        o_stream_uncork(conn->output);
1071
1455
}
1072
1456
 
1073
1457
struct director_connection *
1076
1460
{
1077
1461
        struct director_connection *conn;
1078
1462
 
 
1463
        i_assert(!host->removed);
 
1464
 
1079
1465
        /* make sure we don't keep old sequence values across restarts */
1080
1466
        host->last_seq = 0;
1081
1467
 
1082
1468
        conn = director_connection_init_common(dir, fd);
1083
1469
        conn->name = i_strdup_printf("%s/out", host->name);
1084
1470
        conn->host = host;
1085
 
        /* use IO_READ instead of IO_WRITE, so that we don't assign
1086
 
           dir->right until remote has actually sent some data */
1087
 
        conn->io = io_add(conn->fd, IO_READ,
 
1471
        director_host_ref(host);
 
1472
        conn->io = io_add(conn->fd, IO_WRITE,
1088
1473
                          director_connection_connected, conn);
1089
1474
        return conn;
1090
1475
}
1091
1476
 
1092
1477
void director_connection_deinit(struct director_connection **_conn)
1093
1478
{
1094
 
        struct director_connection *conn = *_conn;
 
1479
        struct director_connection *const *conns, *conn = *_conn;
1095
1480
        struct director *dir = conn->dir;
 
1481
        unsigned int i, count;
1096
1482
 
1097
1483
        *_conn = NULL;
1098
1484
 
1099
1485
        if (dir->debug && conn->host != NULL)
1100
1486
                i_debug("Disconnecting from %s", conn->host->name);
1101
1487
 
1102
 
        if (conn->host != NULL && !conn->in &&
1103
 
            conn->created + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time)
1104
 
                conn->host->last_failed = ioloop_time;
1105
 
 
1106
 
        DLLIST_REMOVE(&dir->connections, conn);
1107
 
        if (dir->left == conn)
 
1488
        conns = array_get(&dir->connections, &count);
 
1489
        for (i = 0; i < count; i++) {
 
1490
                if (conns[i] == conn) {
 
1491
                        array_delete(&dir->connections, i, 1);
 
1492
                        break;
 
1493
                }
 
1494
        }
 
1495
        i_assert(i < count);
 
1496
        if (dir->left == conn) {
1108
1497
                dir->left = NULL;
 
1498
                /* if there is already another handshaked incoming connection,
 
1499
                   use it as the new "left" */
 
1500
                director_assign_left(dir);
 
1501
        }
1109
1502
        if (dir->right == conn)
1110
1503
                dir->right = NULL;
 
1504
        if (conn->host != NULL)
 
1505
                director_host_unref(conn->host);
1111
1506
 
1112
1507
        if (conn->user_iter != NULL)
1113
1508
                user_directory_iter_deinit(&conn->user_iter);
1114
 
        if (conn->to != NULL)
1115
 
                timeout_remove(&conn->to);
1116
 
        if (conn->to_ping != NULL)
1117
 
                timeout_remove(&conn->to_ping);
 
1509
        if (conn->to_disconnect != NULL)
 
1510
                timeout_remove(&conn->to_disconnect);
 
1511
        if (conn->to_pong != NULL)
 
1512
                timeout_remove(&conn->to_pong);
 
1513
        timeout_remove(&conn->to_ping);
1118
1514
        if (conn->io != NULL)
1119
1515
                io_remove(&conn->io);
1120
1516
        i_stream_unref(&conn->input);
1130
1526
        if (dir->left == NULL || dir->right == NULL) {
1131
1527
                /* we aren't synced until we're again connected to a ring */
1132
1528
                dir->sync_seq++;
1133
 
                dir->ring_synced = FALSE;
 
1529
                director_set_ring_unsynced(dir);
1134
1530
        }
1135
1531
}
1136
1532
 
1139
1535
        struct director_connection *conn = *_conn;
1140
1536
        struct director *dir = conn->dir;
1141
1537
 
 
1538
        if (conn->created + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time &&
 
1539
            conn->host != NULL) {
 
1540
                /* connection didn't exist for very long, assume it has a
 
1541
                   network problem */
 
1542
                conn->host->last_network_failure = ioloop_time;
 
1543
        }
 
1544
 
1142
1545
        director_connection_deinit(_conn);
1143
1546
        if (dir->right == NULL)
1144
1547
                director_connect(dir);
1145
1548
}
1146
1549
 
1147
 
static void director_connection_timeout(struct director_connection *conn)
 
1550
void director_connection_reconnect(struct director_connection **_conn)
1148
1551
{
1149
 
        director_connection_disconnected(&conn);
 
1552
        struct director_connection *conn = *_conn;
 
1553
        struct director *dir = conn->dir;
 
1554
 
 
1555
        director_connection_deinit(_conn);
 
1556
        if (dir->right == NULL)
 
1557
                director_connect(dir);
1150
1558
}
1151
1559
 
1152
1560
void director_connection_send(struct director_connection *conn,
1167
1575
                                "disconnecting", conn->name);
1168
1576
                }
1169
1577
                o_stream_close(conn->output);
1170
 
                conn->to = timeout_add(0, director_connection_timeout, conn);
1171
1578
        }
1172
1579
}
1173
1580
 
1174
 
void director_connection_send_except(struct director_connection *conn,
1175
 
                                     struct director_host *skip_host,
1176
 
                                     const char *data)
1177
 
{
1178
 
        if (conn->host != skip_host)
1179
 
                director_connection_send(conn, data);
1180
 
}
1181
 
 
1182
 
static void director_connection_ping_timeout(struct director_connection *conn)
 
1581
static void
 
1582
director_connection_ping_idle_timeout(struct director_connection *conn)
1183
1583
{
1184
1584
        i_error("director(%s): Ping timed out, disconnecting", conn->name);
1185
1585
        director_connection_disconnected(&conn);
1186
1586
}
1187
1587
 
1188
 
static void director_connection_ping(struct director_connection *conn)
1189
 
{
1190
 
        conn->sync_ping = FALSE;
 
1588
static void director_connection_pong_timeout(struct director_connection *conn)
 
1589
{
 
1590
        i_error("director(%s): PONG reply not received although other "
 
1591
                "input keeps coming, disconnecting", conn->name);
 
1592
        director_connection_disconnected(&conn);
 
1593
}
 
1594
 
 
1595
void director_connection_ping(struct director_connection *conn)
 
1596
{
1191
1597
        if (conn->ping_waiting)
1192
1598
                return;
1193
1599
 
1194
 
        if (conn->to_ping != NULL)
1195
 
                timeout_remove(&conn->to_ping);
1196
 
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
1197
 
                                    director_connection_ping_timeout, conn);
 
1600
        timeout_remove(&conn->to_ping);
 
1601
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS,
 
1602
                                    director_connection_ping_idle_timeout, conn);
 
1603
        conn->to_pong = timeout_add(DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS,
 
1604
                                    director_connection_pong_timeout, conn);
1198
1605
        director_connection_send(conn, "PING\n");
1199
1606
        conn->ping_waiting = TRUE;
1200
1607
}
1210
1617
        return conn->host;
1211
1618
}
1212
1619
 
1213
 
struct director_connection *
1214
 
director_connection_find_outgoing(struct director *dir,
1215
 
                                  struct director_host *host)
1216
 
{
1217
 
        struct director_connection *conn;
1218
 
 
1219
 
        for (conn = dir->connections; conn != NULL; conn = conn->next) {
1220
 
                if (conn->host == host && !conn->in)
1221
 
                        return conn;
1222
 
        }
1223
 
        return NULL;
 
1620
bool director_connection_is_handshaked(struct director_connection *conn)
 
1621
{
 
1622
        return conn->handshake_received;
 
1623
}
 
1624
 
 
1625
bool director_connection_is_incoming(struct director_connection *conn)
 
1626
{
 
1627
        return conn->in;
 
1628
}
 
1629
 
 
1630
unsigned int
 
1631
director_connection_get_minor_version(struct director_connection *conn)
 
1632
{
 
1633
        return conn->minor_version;
1224
1634
}
1225
1635
 
1226
1636
void director_connection_cork(struct director_connection *conn)
1233
1643
        o_stream_uncork(conn->output);
1234
1644
}
1235
1645
 
1236
 
void director_connection_wait_sync(struct director_connection *conn)
1237
 
{
1238
 
        /* switch to faster ping timeout. avoid reseting the timeout if it's
1239
 
           already fast. */
1240
 
        if (conn->ping_waiting || conn->sync_ping)
1241
 
                return;
1242
 
 
1243
 
        if (conn->to_ping != NULL)
1244
 
                timeout_remove(&conn->to_ping);
1245
 
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS,
1246
 
                                    director_connection_ping, conn);
1247
 
        conn->sync_ping = TRUE;
1248
 
}
1249
 
 
1250
 
void director_connections_deinit(struct director *dir)
1251
 
{
1252
 
        struct director_connection *conn;
1253
 
 
1254
 
        while (dir->connections != NULL) {
1255
 
                conn = dir->connections;
1256
 
                dir->connections = conn->next;
1257
 
                director_connection_deinit(&conn);
1258
 
        }
 
1646
void director_connection_set_synced(struct director_connection *conn,
 
1647
                                    bool synced)
 
1648
{
 
1649
        if (conn->synced == synced)
 
1650
                return;
 
1651
        conn->synced = synced;
 
1652
 
 
1653
        /* switch ping timeout, unless we're already waiting for PONG */
 
1654
        if (conn->ping_waiting)
 
1655
                return;
 
1656
 
 
1657
        director_connection_set_ping_timeout(conn);
1259
1658
}