1
/* Copyright (c) 2009-2012 Dovecot authors, see the included COPYING file */
5
#include "fd-set-nonblock.h"
8
#include "dsync-worker.h"
9
#include "dsync-proxy.h"
10
#include "dsync-proxy-server.h"
15
/* Read the next complete line from the proxy client's input stream into
   *line_r, and consume the client's greeting line the first time one arrives.

   NOTE(review): the return-type line, braces and the non-recursive return
   statements are not visible here. The only visible caller loops while the
   result is > 0, so presumably >0 means "a line was returned" - confirm. */
proxy_server_read_line(struct dsync_proxy_server *server,
18
*line_r = i_stream_read_next_line(server->input);
19
/* No complete line is available: check whether that is due to a stream
   error or EOF. */
if (*line_r == NULL) {
20
if (server->input->stream_errno != 0) {
21
/* copy the stream's error into errno before logging; presumably this is
   what makes %m in the message below describe the stream error */
errno = server->input->stream_errno;
22
i_error("read() from proxy client failed: %m");
23
io_loop_stop(current_ioloop);
26
if (server->input->eof) {
27
/* EOF is logged as an error only if the server hasn't been marked
   finished */
if (!server->finished)
28
i_error("read() from proxy client failed: EOF");
29
io_loop_stop(current_ioloop);
36
/* The first line received must be the client greeting. */
if (!server->handshake_received) {
37
if (strcmp(*line_r, DSYNC_PROXY_CLIENT_GREETING_LINE) != 0) {
38
i_error("Invalid client handshake: %s", *line_r);
39
io_loop_stop(current_ioloop);
42
server->handshake_received = TRUE;
43
/* the greeting itself isn't handed to the caller; recurse to fetch the
   line following it */
return proxy_server_read_line(server, line_r);
48
/* Invoke the current command's handler (server->cur_cmd->func) with the
   current argument list, and clear cur_cmd/cur_args afterwards.

   NOTE(review): the branch bodies and return statements are not visible
   here. A handler result of 0 is tested first; since proxy_server_output()
   re-runs a command that is still set, 0 presumably means "not finished
   yet" and skips the clearing below - confirm against the full source. */
static int proxy_server_run_cmd(struct dsync_proxy_server *server)
52
if ((ret = server->cur_cmd->func(server, server->cur_args)) == 0)
55
i_error("command %s failed", server->cur_cmd->name);
59
/* no command is pending anymore */
server->cur_cmd = NULL;
60
server->cur_args = NULL;
65
/* Parse one tab-separated input line from the proxy client, look up the
   command named by its first field, build the argument array and run the
   command. Returns the result of proxy_server_run_cmd() on the path visible
   here.

   NOTE(review): the return-type line and the returns after the two error
   messages are not visible in this listing. */
proxy_server_input_line(struct dsync_proxy_server *server, const char *line)
67
const char *const *args;
68
const char **cmd_args;
69
unsigned int i, count;
71
/* a new line must not be parsed while a command is still pending */
i_assert(server->cur_cmd == NULL);
73
/* everything allocated for this command comes from cmd_pool, which is
   cleared here before the new line is parsed */
p_clear(server->cmd_pool);
74
args = (const char *const *)p_strsplit(server->cmd_pool, line, "\t");
75
if (args[0] == NULL) {
76
i_error("proxy client sent invalid input: %s", line);
80
server->cur_cmd = dsync_proxy_server_command_find(args[0]);
81
if (server->cur_cmd == NULL) {
82
i_error("proxy client sent unknown command: %s", args[0]);
86
count = str_array_length(args);
88
/* one extra slot beyond count - presumably left as a NULL terminator
   (assumes p_new() zero-fills; TODO confirm) */
cmd_args = p_new(server->cmd_pool, const char *, count + 1);
89
for (i = 0; i < count; i++) {
90
/* each argument is a tab-unescaped copy allocated from cmd_pool; the
   source string passed to p_strdup() is on a line not visible here */
cmd_args[i] = str_tabunescape(p_strdup(server->cmd_pool,
94
server->cur_args = cmd_args;
95
return proxy_server_run_cmd(server);
99
/* Input handler registered with io_add() for the proxy's input fd: reads the
   lines currently available and runs each through proxy_server_input_line().

   NOTE(review): local declarations, the rest of the loop body and the
   condition guarding the io_loop_stop() call are not visible here. */
static void proxy_server_input(struct dsync_proxy_server *server)
104
if (server->cur_cmd != NULL) {
105
/* wait until command handling is finished */
106
io_remove(&server->io);
110
/* cork the output while the input lines are processed; it is uncorked
   after the loop */
o_stream_cork(server->output);
111
while (proxy_server_read_line(server, &line) > 0) {
113
ret = proxy_server_input_line(server, line);
118
o_stream_uncork(server->output);
119
if (server->output->closed)
123
io_loop_stop(current_ioloop);
124
/* restart the timeout after handling input */
timeout_reset(server->to);
127
/* Flush callback for the proxy's output stream: flushes buffered output and
   re-runs the pending command, if any. Once no command is pending, input
   handling is resumed.

   NOTE(review): the body of the flush-failure branch, the condition guarding
   io_loop_stop() and the return statement are not visible here. */
static int proxy_server_output(struct dsync_proxy_server *server)
129
struct ostream *output = server->output;
132
if ((ret = o_stream_flush(output)) < 0)
134
else if (server->cur_cmd != NULL) {
135
/* give the pending command another run, with the output corked around
   the call */
o_stream_cork(output);
136
(void)proxy_server_run_cmd(server);
137
o_stream_uncork(output);
139
if (server->cur_cmd == NULL) {
140
/* proxy_server_input() removes the io while a command is pending;
   add it back now that none is */
if (server->io == NULL) {
141
server->io = io_add(server->fd_in, IO_READ,
142
proxy_server_input, server);
144
/* handle pending input */
145
proxy_server_input(server);
149
io_loop_stop(current_ioloop);
150
/* restart the timeout after handling output */
timeout_reset(server->to);
154
/* Timeout callback registered with timeout_add(): logs an error and stops
   the current ioloop. The context argument is unused. */
static void dsync_proxy_server_timeout(void *context ATTR_UNUSED)
156
i_error("proxy server timed out");
157
io_loop_stop(current_ioloop);
160
/* Create a proxy server that reads client input from fd_in, writes to
   fd_out and is associated with the given worker. Sets up the command pool,
   the input handler, the I/O streams, the timeout and the output flush
   callback, then queues the server greeting line.

   NOTE(review): the remaining arguments of o_stream_set_flush_callback() and
   the return statement are not visible here. */
struct dsync_proxy_server *
161
dsync_proxy_server_init(int fd_in, int fd_out, struct dsync_worker *worker)
163
struct dsync_proxy_server *server;
165
server = i_new(struct dsync_proxy_server, 1);
166
server->worker = worker;
167
server->fd_in = fd_in;
168
server->fd_out = fd_out;
170
/* pool for per-command allocations; proxy_server_input_line() clears it
   before parsing each line */
server->cmd_pool = pool_alloconly_create("worker server cmd", 2048);
171
server->io = io_add(fd_in, IO_READ, proxy_server_input, server);
172
/* presumably (size_t)-1 means no buffer size limit and FALSE means the
   streams don't close the fd themselves (deinit close()s the fds
   explicitly) - TODO confirm against the stream API */
server->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
173
server->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
174
server->to = timeout_add(DSYNC_PROXY_SERVER_TIMEOUT_MSECS,
175
dsync_proxy_server_timeout, NULL);
176
o_stream_set_flush_callback(server->output, proxy_server_output,
178
/* send the server greeting line */
o_stream_send_str(server->output, DSYNC_PROXY_SERVER_GREETING_LINE"\n");
179
fd_set_nonblock(fd_in, TRUE);
180
fd_set_nonblock(fd_out, TRUE);
184
/* Tear down a proxy server: release its streams, pool, timeout and io
   handler, and close its file descriptors.

   NOTE(review): the end of this function is not visible here, so whether
   the struct is freed and *_server is cleared can't be confirmed. */
void dsync_proxy_server_deinit(struct dsync_proxy_server **_server)
186
struct dsync_proxy_server *server = *_server;
190
if (server->get_input != NULL)
191
i_stream_unref(&server->get_input);
192
pool_unref(&server->cmd_pool);
193
timeout_remove(&server->to);
194
/* io may already be NULL: proxy_server_input() removes it while a command
   is pending */
if (server->io != NULL)
195
io_remove(&server->io);
196
i_stream_destroy(&server->input);
197
o_stream_destroy(&server->output);
198
if (close(server->fd_in) < 0)
199
i_error("close(proxy input) failed: %m");
200
/* when input and output share one fd, it must not be closed twice */
if (server->fd_in != server->fd_out) {
201
if (close(server->fd_out) < 0)
202
i_error("close(proxy output) failed: %m");