~ubuntu-branches/ubuntu/wily/dovecot/wily

« back to all changes in this revision

Viewing changes to src/doveadm/dsync/dsync-proxy-server.c

  • Committer: Package Import Robot
  • Author(s): Jaldhar H. Vyas
  • Date: 2013-09-09 00:57:32 UTC
  • mfrom: (1.13.11)
  • mto: (4.8.5 experimental) (1.16.1)
  • mto: This revision was merged to the branch mainline in revision 97.
  • Revision ID: package-import@ubuntu.com-20130909005732-dn1eell8srqbhh0e
Tags: upstream-2.2.5
Import upstream version 2.2.5

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2009-2012 Dovecot authors, see the included COPYING file */
2
 
 
3
 
#include "lib.h"
4
 
#include "strescape.h"
5
 
#include "fd-set-nonblock.h"
6
 
#include "istream.h"
7
 
#include "ostream.h"
8
 
#include "dsync-worker.h"
9
 
#include "dsync-proxy.h"
10
 
#include "dsync-proxy-server.h"
11
 
 
12
 
#include <stdlib.h>
13
 
 
14
 
static int
15
 
proxy_server_read_line(struct dsync_proxy_server *server,
16
 
                       const char **line_r)
17
 
{
18
 
        *line_r = i_stream_read_next_line(server->input);
19
 
        if (*line_r == NULL) {
20
 
                if (server->input->stream_errno != 0) {
21
 
                        errno = server->input->stream_errno;
22
 
                        i_error("read() from proxy client failed: %m");
23
 
                        io_loop_stop(current_ioloop);
24
 
                        return -1;
25
 
                }
26
 
                if (server->input->eof) {
27
 
                        if (!server->finished)
28
 
                                i_error("read() from proxy client failed: EOF");
29
 
                        io_loop_stop(current_ioloop);
30
 
                        return -1;
31
 
                }
32
 
        }
33
 
        if (*line_r == NULL)
34
 
                return 0;
35
 
 
36
 
        if (!server->handshake_received) {
37
 
                if (strcmp(*line_r, DSYNC_PROXY_CLIENT_GREETING_LINE) != 0) {
38
 
                        i_error("Invalid client handshake: %s", *line_r);
39
 
                        io_loop_stop(current_ioloop);
40
 
                        return -1;
41
 
                }
42
 
                server->handshake_received = TRUE;
43
 
                return proxy_server_read_line(server, line_r);
44
 
        }
45
 
        return 1;
46
 
}
47
 
 
48
 
static int proxy_server_run_cmd(struct dsync_proxy_server *server)
49
 
{
50
 
        int ret;
51
 
 
52
 
        if ((ret = server->cur_cmd->func(server, server->cur_args)) == 0)
53
 
                return 0;
54
 
        if (ret < 0) {
55
 
                i_error("command %s failed", server->cur_cmd->name);
56
 
                return -1;
57
 
        }
58
 
 
59
 
        server->cur_cmd = NULL;
60
 
        server->cur_args = NULL;
61
 
        return 1;
62
 
}
63
 
 
64
 
static int
65
 
proxy_server_input_line(struct dsync_proxy_server *server, const char *line)
66
 
{
67
 
        const char *const *args;
68
 
        const char **cmd_args;
69
 
        unsigned int i, count;
70
 
 
71
 
        i_assert(server->cur_cmd == NULL);
72
 
 
73
 
        p_clear(server->cmd_pool);
74
 
        args = (const char *const *)p_strsplit(server->cmd_pool, line, "\t");
75
 
        if (args[0] == NULL) {
76
 
                i_error("proxy client sent invalid input: %s", line);
77
 
                return -1;
78
 
        }
79
 
 
80
 
        server->cur_cmd = dsync_proxy_server_command_find(args[0]);
81
 
        if (server->cur_cmd == NULL) {
82
 
                i_error("proxy client sent unknown command: %s", args[0]);
83
 
                return -1;
84
 
        } else {
85
 
                args++;
86
 
                count = str_array_length(args);
87
 
 
88
 
                cmd_args = p_new(server->cmd_pool, const char *, count + 1);
89
 
                for (i = 0; i < count; i++) {
90
 
                        cmd_args[i] = str_tabunescape(p_strdup(server->cmd_pool,
91
 
                                                               args[i]));
92
 
                }
93
 
 
94
 
                server->cur_args = cmd_args;
95
 
                return proxy_server_run_cmd(server);
96
 
        }
97
 
}
98
 
 
99
 
static void proxy_server_input(struct dsync_proxy_server *server)
100
 
{
101
 
        const char *line;
102
 
        int ret = 0;
103
 
 
104
 
        if (server->cur_cmd != NULL) {
105
 
                /* wait until command handling is finished */
106
 
                io_remove(&server->io);
107
 
                return;
108
 
        }
109
 
 
110
 
        o_stream_cork(server->output);
111
 
        while (proxy_server_read_line(server, &line) > 0) {
112
 
                T_BEGIN {
113
 
                        ret = proxy_server_input_line(server, line);
114
 
                } T_END;
115
 
                if (ret <= 0)
116
 
                        break;
117
 
        }
118
 
        o_stream_uncork(server->output);
119
 
        if (server->output->closed)
120
 
                ret = -1;
121
 
 
122
 
        if (ret < 0)
123
 
                io_loop_stop(current_ioloop);
124
 
        timeout_reset(server->to);
125
 
}
126
 
 
127
 
static int proxy_server_output(struct dsync_proxy_server *server)
128
 
{
129
 
        struct ostream *output = server->output;
130
 
        int ret;
131
 
 
132
 
        if ((ret = o_stream_flush(output)) < 0)
133
 
                ret = 1;
134
 
        else if (server->cur_cmd != NULL) {
135
 
                o_stream_cork(output);
136
 
                (void)proxy_server_run_cmd(server);
137
 
                o_stream_uncork(output);
138
 
 
139
 
                if (server->cur_cmd == NULL) {
140
 
                        if (server->io == NULL) {
141
 
                                server->io = io_add(server->fd_in, IO_READ,
142
 
                                                    proxy_server_input, server);
143
 
                        }
144
 
                        /* handle pending input */
145
 
                        proxy_server_input(server);
146
 
                }
147
 
        }
148
 
        if (output->closed)
149
 
                io_loop_stop(current_ioloop);
150
 
        timeout_reset(server->to);
151
 
        return ret;
152
 
}
153
 
 
154
 
/* Timeout callback: log the timeout and stop the current ioloop.
   The context argument is not used. */
static void
dsync_proxy_server_timeout(void *context ATTR_UNUSED)
{
	i_error("proxy server timed out");

	io_loop_stop(current_ioloop);
}
159
 
 
160
 
struct dsync_proxy_server *
161
 
dsync_proxy_server_init(int fd_in, int fd_out, struct dsync_worker *worker)
162
 
{
163
 
        struct dsync_proxy_server *server;
164
 
 
165
 
        server = i_new(struct dsync_proxy_server, 1);
166
 
        server->worker = worker;
167
 
        server->fd_in = fd_in;
168
 
        server->fd_out = fd_out;
169
 
 
170
 
        server->cmd_pool = pool_alloconly_create("worker server cmd", 2048);
171
 
        server->io = io_add(fd_in, IO_READ, proxy_server_input, server);
172
 
        server->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
173
 
        server->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
174
 
        server->to = timeout_add(DSYNC_PROXY_SERVER_TIMEOUT_MSECS,
175
 
                                 dsync_proxy_server_timeout, NULL);
176
 
        o_stream_set_flush_callback(server->output, proxy_server_output,
177
 
                                    server);
178
 
        o_stream_send_str(server->output, DSYNC_PROXY_SERVER_GREETING_LINE"\n");
179
 
        fd_set_nonblock(fd_in, TRUE);
180
 
        fd_set_nonblock(fd_out, TRUE);
181
 
        return server;
182
 
}
183
 
 
184
 
void dsync_proxy_server_deinit(struct dsync_proxy_server **_server)
185
 
{
186
 
        struct dsync_proxy_server *server = *_server;
187
 
 
188
 
        *_server = NULL;
189
 
 
190
 
        if (server->get_input != NULL)
191
 
                i_stream_unref(&server->get_input);
192
 
        pool_unref(&server->cmd_pool);
193
 
        timeout_remove(&server->to);
194
 
        if (server->io != NULL)
195
 
                io_remove(&server->io);
196
 
        i_stream_destroy(&server->input);
197
 
        o_stream_destroy(&server->output);
198
 
        if (close(server->fd_in) < 0)
199
 
                i_error("close(proxy input) failed: %m");
200
 
        if (server->fd_in != server->fd_out) {
201
 
                if (close(server->fd_out) < 0)
202
 
                        i_error("close(proxy output) failed: %m");
203
 
        }
204
 
        i_free(server);
205
 
}