~james-page/ubuntu/saucy/openvswitch/1.12-snapshot

« back to all changes in this revision

Viewing changes to lib/worker.c

  • Committer: James Page
  • Date: 2013-08-21 10:16:57 UTC
  • mfrom: (1.1.20)
  • Revision ID: james.page@canonical.com-20130821101657-3o0z0qeiv5zkwlzi
New upstream snapshot

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2012, 2013 Nicira, Inc.
2
 
 *
3
 
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 
 * you may not use this file except in compliance with the License.
5
 
 * You may obtain a copy of the License at:
6
 
 *
7
 
 *     http://www.apache.org/licenses/LICENSE-2.0
8
 
 *
9
 
 * Unless required by applicable law or agreed to in writing, software
10
 
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 
 * See the License for the specific language governing permissions and
13
 
 * limitations under the License.
14
 
 */
15
 
 
16
 
#include <config.h>
17
 
 
18
 
#include "worker.h"
19
 
 
20
 
#include <assert.h>
21
 
#include <errno.h>
22
 
#include <stdlib.h>
23
 
#include <string.h>
24
 
#include <sys/socket.h>
25
 
#include <sys/types.h>
26
 
#include <sys/uio.h>
27
 
#include <sys/wait.h>
28
 
#include <unistd.h>
29
 
 
30
 
#include "command-line.h"
31
 
#include "daemon.h"
32
 
#include "ofpbuf.h"
33
 
#include "poll-loop.h"
34
 
#include "socket-util.h"
35
 
#include "util.h"
36
 
#include "vlog.h"
37
 
 
38
 
VLOG_DEFINE_THIS_MODULE(worker);
39
 
 
40
 
/* ovs_assert() logs the assertion message and logging sometimes goes through a
41
 
 * worker, so using ovs_assert() in this source file could cause recursion. */
42
 
#undef ovs_assert
43
 
#define ovs_assert use_assert_instead_of_ovs_assert_in_this_module
44
 
 
45
 
/* Header for an RPC request. */
46
 
struct worker_request {
47
 
    size_t request_len;              /* Length of the payload in bytes. */
48
 
    worker_request_func *request_cb; /* Function to call in worker process. */
49
 
    worker_reply_func *reply_cb;     /* Function to call in main process. */
50
 
    void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
51
 
};
52
 
 
53
 
/* Header for an RPC reply. */
54
 
struct worker_reply {
55
 
    size_t reply_len;            /* Length of the payload in bytes. */
56
 
    worker_reply_func *reply_cb; /* Function to call in main process. */
57
 
    void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
58
 
};
59
 
 
60
 
/* Receive buffer for a RPC request or reply. */
61
 
struct rxbuf {
62
 
    /* Header. */
63
 
    struct ofpbuf header;       /* Header data. */
64
 
    int fds[SOUTIL_MAX_FDS];    /* File descriptors. */
65
 
    size_t n_fds;
66
 
 
67
 
    /* Payload. */
68
 
    struct ofpbuf payload;      /* Payload data. */
69
 
};
70
 
 
71
 
static int client_sock = -1;
72
 
static struct rxbuf client_rx;
73
 
 
74
 
static void rxbuf_init(struct rxbuf *);
75
 
static void rxbuf_clear(struct rxbuf *);
76
 
static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
77
 
 
78
 
static struct iovec *prefix_iov(void *data, size_t len,
79
 
                                const struct iovec *iovs, size_t n_iovs);
80
 
 
81
 
static void worker_broke(void);
82
 
 
83
 
static void worker_main(int fd) NO_RETURN;
84
 
 
85
 
/* Starts a worker process as a subprocess of the current process.  Currently
86
 
 * only a single worker process is supported, so this function may only be
87
 
 * called once.
88
 
 *
89
 
 * The client should call worker_run() and worker_wait() from its main loop.
90
 
 *
91
 
 * Call this function between daemonize_start() and daemonize_complete(). */
92
 
void
93
 
worker_start(void)
94
 
{
95
 
    int work_fds[2];
96
 
 
97
 
    assert(client_sock < 0);
98
 
 
99
 
    /* Create non-blocking socket pair. */
100
 
    xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
101
 
    xset_nonblocking(work_fds[0]);
102
 
    xset_nonblocking(work_fds[1]);
103
 
 
104
 
    /* Don't let the worker process own the responsibility to delete
105
 
     * the pidfile.  Register it again after the fork. */
106
 
    remove_pidfile_from_unlink();
107
 
    if (!fork_and_clean_up()) {
108
 
        /* In child (worker) process. */
109
 
        daemonize_post_detach();
110
 
        close(work_fds[0]);
111
 
        worker_main(work_fds[1]);
112
 
        NOT_REACHED();
113
 
    }
114
 
 
115
 
    /* In parent (main) process. */
116
 
    add_pidfile_to_unlink();
117
 
    close(work_fds[1]);
118
 
    client_sock = work_fds[0];
119
 
    rxbuf_init(&client_rx);
120
 
}
121
 
 
122
 
/* Returns true if this process has started a worker and the worker is not
123
 
 * known to have malfunctioned. */
124
 
bool
125
 
worker_is_running(void)
126
 
{
127
 
    return client_sock >= 0;
128
 
}
129
 
 
130
 
/* If a worker process was started, processes RPC replies from it, calling the
131
 
 * registered 'reply_cb' callbacks.
132
 
 *
133
 
 * If the worker process died or malfunctioned, aborts. */
134
 
void
135
 
worker_run(void)
136
 
{
137
 
    if (worker_is_running()) {
138
 
        int error;
139
 
 
140
 
        error = rxbuf_run(&client_rx, client_sock,
141
 
                          sizeof(struct worker_reply));
142
 
        if (!error) {
143
 
            struct worker_reply *reply = client_rx.header.data;
144
 
            reply->reply_cb(&client_rx.payload, client_rx.fds,
145
 
                            client_rx.n_fds, reply->reply_aux);
146
 
            rxbuf_clear(&client_rx);
147
 
        } else if (error != EAGAIN) {
148
 
            worker_broke();
149
 
            VLOG_ABORT("receive from worker failed (%s)",
150
 
                       ovs_retval_to_string(error));
151
 
        }
152
 
    }
153
 
}
154
 
 
155
 
/* Causes the poll loop to wake up if we need to process RPC replies. */
156
 
void
157
 
worker_wait(void)
158
 
{
159
 
    if (worker_is_running()) {
160
 
        poll_fd_wait(client_sock, POLLIN);
161
 
    }
162
 
}
163
 
 
164
 
/* Interface for main process to interact with the worker. */
165
 
 
166
 
/* Sends an RPC request to the worker process.  The worker process will call
167
 
 * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
168
 
 * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
169
 
 * in 'fds'.
170
 
 *
171
 
 * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
172
 
 * or worker_reply_iovec() with a reply.  The main process will later call
173
 
 * 'reply_cb' with the reply data (if any) and file descriptors (if any).
174
 
 *
175
 
 * 'request_cb' receives copies (as if by dup()) of the file descriptors in
176
 
 * fds[].  'request_cb' takes ownership of these copies, and the caller of
177
 
 * worker_request() retains its ownership of the originals.
178
 
 *
179
 
 * This function may block until the RPC request has been sent (if the socket
180
 
 * buffer fills up) but it does not wait for the reply (if any).  If this
181
 
 * function blocks, it may invoke reply callbacks for previous requests.
182
 
 *
183
 
 * The worker process executes RPC requests in strict order of submission and
184
 
 * runs each request to completion before beginning the next request.  The main
185
 
 * process invokes reply callbacks in strict order of request submission. */
186
 
void
187
 
worker_request(const void *data, size_t size,
188
 
               const int fds[], size_t n_fds,
189
 
               worker_request_func *request_cb,
190
 
               worker_reply_func *reply_cb, void *aux)
191
 
{
192
 
    if (size > 0) {
193
 
        struct iovec iov;
194
 
 
195
 
        iov.iov_base = (void *) data;
196
 
        iov.iov_len = size;
197
 
        worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
198
 
    } else {
199
 
        worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
200
 
    }
201
 
}
202
 
 
203
 
static int
204
 
worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
205
 
                  const int fds[], size_t n_fds)
206
 
{
207
 
    size_t sent = 0;
208
 
 
209
 
    for (;;) {
210
 
        struct pollfd pfd;
211
 
        int error;
212
 
 
213
 
        /* Try to send the rest of the request. */
214
 
        error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
215
 
                                         fds, n_fds, sent, &sent);
216
 
        if (error != EAGAIN) {
217
 
            return error;
218
 
        }
219
 
 
220
 
        /* Process replies to avoid deadlock. */
221
 
        worker_run();
222
 
 
223
 
        /* Wait for 'client_sock' to become ready before trying again.  We
224
 
         * can't use poll_block() because it sometimes calls into vlog, which
225
 
         * calls indirectly into worker_send_iovec().  To be usable here,
226
 
         * poll_block() would therefore need to be reentrant, but it isn't
227
 
         * (calling it recursively causes memory corruption and an eventual
228
 
         * crash). */
229
 
        pfd.fd = client_sock;
230
 
        pfd.events = POLLIN | POLLOUT;
231
 
        do {
232
 
            error = poll(&pfd, 1, -1) < 0 ? errno : 0;
233
 
        } while (error == EINTR);
234
 
        if (error) {
235
 
            worker_broke();
236
 
            VLOG_ABORT("poll failed (%s)", strerror(error));
237
 
        }
238
 
    }
239
 
}
240
 
 
241
 
/* Same as worker_request() except that the data to send is specified as an
242
 
 * array of iovecs. */
243
 
void
244
 
worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
245
 
                     const int fds[], size_t n_fds,
246
 
                     worker_request_func *request_cb,
247
 
                     worker_reply_func *reply_cb, void *aux)
248
 
{
249
 
    static bool recursing = false;
250
 
    struct worker_request rq;
251
 
    struct iovec *all_iovs;
252
 
    int error;
253
 
 
254
 
    assert(worker_is_running());
255
 
    assert(!recursing);
256
 
    recursing = true;
257
 
 
258
 
    rq.request_len = iovec_len(iovs, n_iovs);
259
 
    rq.request_cb = request_cb;
260
 
    rq.reply_cb = reply_cb;
261
 
    rq.reply_aux = aux;
262
 
 
263
 
    all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
264
 
    error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
265
 
    if (error) {
266
 
        worker_broke();
267
 
        VLOG_ABORT("send failed (%s)", strerror(error));
268
 
    }
269
 
    free(all_iovs);
270
 
 
271
 
    recursing = false;
272
 
}
273
 
 
274
 
/* Closes the client socket, if any, so that worker_is_running() will return
275
 
 * false.
276
 
 *
277
 
 * The client does this just before aborting if the worker process dies or
278
 
 * malfunctions, to prevent the logging subsystem from trying to use the
279
 
 * worker to log the failure. */
280
 
static void
281
 
worker_broke(void)
282
 
{
283
 
    if (client_sock >= 0) {
284
 
        close(client_sock);
285
 
        client_sock = -1;
286
 
    }
287
 
}
288
 
 
289
 
/* Interfaces for RPC implementations (running in the worker process). */
290
 
 
291
 
static int server_sock = -1;
292
 
static bool expect_reply;
293
 
static struct worker_request request;
294
 
 
295
 
/* When a call to worker_request() or worker_request_iovec() provides a
296
 
 * 'reply_cb' callback, the 'request_cb' implementation must call this function
297
 
 * to send its reply.  The main process will call 'reply_cb' passing the
298
 
 * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
299
 
 * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
300
 
 *
301
 
 * If a call to worker_request() or worker_request_iovec() provides no
302
 
 * 'reply_cb' callback, the 'request_cb' implementation must not call this
303
 
 * function.
304
 
 *
305
 
 * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
306
 
 * fds[].  'reply_cb' takes ownership of these copies, and the caller of
307
 
 * worker_reply() retains its ownership of the originals.
308
 
 *
309
 
 * This function blocks until the RPC reply has been sent (if the socket buffer
310
 
 * fills up) but it does not wait for the main process to receive or to process
311
 
 * the reply. */
312
 
void
313
 
worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
314
 
{
315
 
    if (size > 0) {
316
 
        struct iovec iov;
317
 
 
318
 
        iov.iov_base = (void *) data;
319
 
        iov.iov_len = size;
320
 
        worker_reply_iovec(&iov, 1, fds, n_fds);
321
 
    } else {
322
 
        worker_reply_iovec(NULL, 0, fds, n_fds);
323
 
    }
324
 
}
325
 
 
326
 
/* Same as worker_reply() except that the data to send is specified as an array
327
 
 * of iovecs. */
328
 
void
329
 
worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
330
 
                       const int fds[], size_t n_fds)
331
 
{
332
 
    struct worker_reply reply;
333
 
    struct iovec *all_iovs;
334
 
    int error;
335
 
 
336
 
    assert(expect_reply);
337
 
    expect_reply = false;
338
 
 
339
 
    reply.reply_len = iovec_len(iovs, n_iovs);
340
 
    reply.reply_cb = request.reply_cb;
341
 
    reply.reply_aux = request.reply_aux;
342
 
 
343
 
    all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
344
 
 
345
 
    error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
346
 
                                           fds, n_fds);
347
 
    if (error == EPIPE) {
348
 
        /* Parent probably died.  Continue processing any RPCs still buffered,
349
 
         * to avoid missing log messages. */
350
 
        VLOG_INFO("send failed (%s)", strerror(error));
351
 
    } else if (error) {
352
 
        VLOG_FATAL("send failed (%s)", strerror(error));
353
 
    }
354
 
 
355
 
    free(all_iovs);
356
 
}
357
 
 
358
 
static void
359
 
worker_main(int fd)
360
 
{
361
 
    struct rxbuf rx;
362
 
 
363
 
    server_sock = fd;
364
 
 
365
 
    subprogram_name = "worker";
366
 
    proctitle_set("worker process for pid %lu", (unsigned long int) getppid());
367
 
    VLOG_INFO("worker process started");
368
 
 
369
 
    rxbuf_init(&rx);
370
 
    for (;;) {
371
 
        int error;
372
 
 
373
 
        error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
374
 
        if (!error) {
375
 
            request = *(struct worker_request *) rx.header.data;
376
 
 
377
 
            expect_reply = request.reply_cb != NULL;
378
 
            request.request_cb(&rx.payload, rx.fds, rx.n_fds);
379
 
            assert(!expect_reply);
380
 
 
381
 
            rxbuf_clear(&rx);
382
 
        } else if (error == EOF && !rx.header.size) {
383
 
            /* Main process closed the IPC socket.  Exit cleanly. */
384
 
            break;
385
 
        } else if (error != EAGAIN) {
386
 
            VLOG_FATAL("RPC receive failed (%s)", ovs_retval_to_string(error));
387
 
        }
388
 
 
389
 
        poll_fd_wait(server_sock, POLLIN);
390
 
        poll_block();
391
 
    }
392
 
 
393
 
    VLOG_INFO("worker process exiting");
394
 
    exit(0);
395
 
}
396
 
 
397
 
static void
398
 
rxbuf_init(struct rxbuf *rx)
399
 
{
400
 
    ofpbuf_init(&rx->header, 0);
401
 
    rx->n_fds = 0;
402
 
    ofpbuf_init(&rx->payload, 0);
403
 
}
404
 
 
405
 
static void
406
 
rxbuf_clear(struct rxbuf *rx)
407
 
{
408
 
    ofpbuf_clear(&rx->header);
409
 
    rx->n_fds = 0;
410
 
    ofpbuf_clear(&rx->payload);
411
 
}
412
 
 
413
 
static int
414
 
rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
415
 
{
416
 
    for (;;) {
417
 
        if (!rx->header.size) {
418
 
            int retval;
419
 
 
420
 
            ofpbuf_clear(&rx->header);
421
 
            ofpbuf_prealloc_tailroom(&rx->header, header_len);
422
 
 
423
 
            retval = recv_data_and_fds(sock, rx->header.data, header_len,
424
 
                                       rx->fds, &rx->n_fds);
425
 
            if (retval <= 0) {
426
 
                return retval ? -retval : EOF;
427
 
            }
428
 
            rx->header.size += retval;
429
 
        } else if (rx->header.size < header_len) {
430
 
            size_t bytes_read;
431
 
            int error;
432
 
 
433
 
            error = read_fully(sock, ofpbuf_tail(&rx->header),
434
 
                               header_len - rx->header.size, &bytes_read);
435
 
            rx->header.size += bytes_read;
436
 
            if (error) {
437
 
                return error;
438
 
            }
439
 
        } else {
440
 
            size_t payload_len = *(size_t *) rx->header.data;
441
 
 
442
 
            if (rx->payload.size < payload_len) {
443
 
                size_t left = payload_len - rx->payload.size;
444
 
                size_t bytes_read;
445
 
                int error;
446
 
 
447
 
                ofpbuf_prealloc_tailroom(&rx->payload, left);
448
 
                error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
449
 
                                   &bytes_read);
450
 
                rx->payload.size += bytes_read;
451
 
                if (error) {
452
 
                    return error;
453
 
                }
454
 
            } else {
455
 
                return 0;
456
 
            }
457
 
        }
458
 
    }
459
 
}
460
 
 
461
 
static struct iovec *
462
 
prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
463
 
{
464
 
    struct iovec *dst;
465
 
 
466
 
    dst = xmalloc((n_iovs + 1) * sizeof *dst);
467
 
    dst[0].iov_base = data;
468
 
    dst[0].iov_len = len;
469
 
    memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
470
 
 
471
 
    return dst;
472
 
}