~ubuntu-branches/ubuntu/hardy/transmission/hardy-updates

« back to all changes in this revision

Viewing changes to third-party/libevent/evrpc.c

  • Committer: Bazaar Package Importer
  • Author(s): Philipp Benner
  • Date: 2007-10-26 16:02:39 UTC
  • mto: This revision was merged to the branch mainline in revision 6.
  • Revision ID: james.westby@ubuntu.com-20071026160239-2c0agn7q1ken0xsp
Tags: upstream-0.90.dfsg
ImportĀ upstreamĀ versionĀ 0.90.dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 * 3. The name of the author may not be used to endorse or promote products
 
14
 *    derived from this software without specific prior written permission.
 
15
 *
 
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 
17
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
18
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
19
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 
20
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
21
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
22
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
23
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
24
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
25
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 */
 
27
#ifdef HAVE_CONFIG_H
 
28
#include "config.h"
 
29
#endif
 
30
 
 
31
#ifdef WIN32
 
32
#define WIN32_LEAN_AND_MEAN
 
33
#include <windows.h>
 
34
#include <winsock2.h>
 
35
#undef WIN32_LEAN_AND_MEAN
 
36
#include "misc.h"
 
37
#endif
 
38
 
 
39
#include <sys/types.h>
 
40
#include <sys/tree.h>
 
41
#ifndef WIN32
 
42
#include <sys/socket.h>
 
43
#endif
 
44
#ifdef HAVE_SYS_TIME_H
 
45
#include <sys/time.h>
 
46
#else
 
47
#include <sys/_time.h>
 
48
#endif
 
49
#include <sys/queue.h>
 
50
#include <stdio.h>
 
51
#include <stdlib.h>
 
52
#ifndef WIN32
 
53
#include <unistd.h>
 
54
#endif
 
55
#include <errno.h>
 
56
#include <signal.h>
 
57
#include <string.h>
 
58
#include <assert.h>
 
59
 
 
60
#include "event.h"
 
61
#include "evrpc.h"
 
62
#include "evrpc-internal.h"
 
63
#include "evhttp.h"
 
64
#include "log.h"
 
65
 
 
66
struct evrpc_base *
 
67
evrpc_init(struct evhttp *http_server)
 
68
{
 
69
        struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
 
70
        if (base == NULL)
 
71
                return (NULL);
 
72
 
 
73
        /* we rely on the tagging sub system */
 
74
        evtag_init();
 
75
 
 
76
        TAILQ_INIT(&base->registered_rpcs);
 
77
        base->http_server = http_server;
 
78
 
 
79
        return (base);
 
80
}
 
81
 
 
82
void
 
83
evrpc_free(struct evrpc_base *base)
 
84
{
 
85
        struct evrpc *rpc;
 
86
        
 
87
        while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
 
88
                assert(evrpc_unregister_rpc(base, rpc->uri));
 
89
        }
 
90
 
 
91
        free(base);
 
92
}
 
93
 
 
94
static void evrpc_pool_schedule(struct evrpc_pool *pool);
 
95
static void evrpc_request_cb(struct evhttp_request *, void *);
 
96
void evrpc_request_done(struct evrpc_req_generic*);
 
97
 
 
98
/*
 
99
 * Registers a new RPC with the HTTP server.   The evrpc object is expected
 
100
 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
 
101
 * calls this function.
 
102
 */
 
103
 
 
104
char *
 
105
evrpc_construct_uri(const char *uri)
 
106
{
 
107
        char *constructed_uri;
 
108
        int constructed_uri_len;
 
109
 
 
110
        constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
 
111
        if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
 
112
                event_err(1, "%s: failed to register rpc at %s",
 
113
                    __func__, uri);
 
114
        memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
 
115
        memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
 
116
        constructed_uri[constructed_uri_len - 1] = '\0';
 
117
 
 
118
        return (constructed_uri);
 
119
}
 
120
 
 
121
int
 
122
evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
 
123
    void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
 
124
{
 
125
        char *constructed_uri = evrpc_construct_uri(rpc->uri);
 
126
 
 
127
        rpc->cb = cb;
 
128
        rpc->cb_arg = cb_arg;
 
129
 
 
130
        TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
 
131
 
 
132
        evhttp_set_cb(base->http_server,
 
133
            constructed_uri,
 
134
            evrpc_request_cb,
 
135
            rpc);
 
136
        
 
137
        free(constructed_uri);
 
138
 
 
139
        return (0);
 
140
}
 
141
 
 
142
int
 
143
evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
 
144
{
 
145
        char *registered_uri = NULL;
 
146
        struct evrpc *rpc;
 
147
 
 
148
        /* find the right rpc; linear search might be slow */
 
149
        TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
 
150
                if (strcmp(rpc->uri, name) == 0)
 
151
                        break;
 
152
        }
 
153
        if (rpc == NULL) {
 
154
                /* We did not find an RPC with this name */
 
155
                return (-1);
 
156
        }
 
157
        TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
 
158
        
 
159
        free((char *)rpc->uri);
 
160
        free(rpc);
 
161
 
 
162
        registered_uri = evrpc_construct_uri(name);
 
163
 
 
164
        /* remove the http server callback */
 
165
        assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
 
166
 
 
167
        free(registered_uri);
 
168
        return (0);
 
169
}
 
170
 
 
171
static void
 
172
evrpc_request_cb(struct evhttp_request *req, void *arg)
 
173
{
 
174
        struct evrpc *rpc = arg;
 
175
        struct evrpc_req_generic *rpc_state = NULL;
 
176
 
 
177
        /* let's verify the outside parameters */
 
178
        if (req->type != EVHTTP_REQ_POST ||
 
179
            EVBUFFER_LENGTH(req->input_buffer) <= 0)
 
180
                goto error;
 
181
 
 
182
        rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
 
183
        if (rpc_state == NULL)
 
184
                goto error;
 
185
 
 
186
        /* let's check that we can parse the request */
 
187
        rpc_state->request = rpc->request_new();
 
188
        if (rpc_state->request == NULL)
 
189
                goto error;
 
190
 
 
191
        rpc_state->rpc = rpc;
 
192
 
 
193
        if (rpc->request_unmarshal(
 
194
                    rpc_state->request, req->input_buffer) == -1) {
 
195
                /* we failed to parse the request; that's a bummer */
 
196
                goto error;
 
197
        }
 
198
 
 
199
        /* at this point, we have a well formed request, prepare the reply */
 
200
 
 
201
        rpc_state->reply = rpc->reply_new();
 
202
        if (rpc_state->reply == NULL)
 
203
                goto error;
 
204
 
 
205
        rpc_state->http_req = req;
 
206
        rpc_state->done = evrpc_request_done;
 
207
 
 
208
        /* give the rpc to the user; they can deal with it */
 
209
        rpc->cb(rpc_state, rpc->cb_arg);
 
210
 
 
211
        return;
 
212
 
 
213
error:
 
214
        evrpc_reqstate_free(rpc_state);
 
215
        evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
 
216
        return;
 
217
}
 
218
 
 
219
void
 
220
evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
 
221
{
 
222
        /* clean up all memory */
 
223
        if (rpc_state != NULL) {
 
224
                struct evrpc *rpc = rpc_state->rpc;
 
225
 
 
226
                if (rpc_state->request != NULL)
 
227
                        rpc->request_free(rpc_state->request);
 
228
                if (rpc_state->reply != NULL)
 
229
                        rpc->reply_free(rpc_state->reply);
 
230
                free(rpc_state);
 
231
        }
 
232
}
 
233
 
 
234
void
 
235
evrpc_request_done(struct evrpc_req_generic* rpc_state)
 
236
{
 
237
        struct evhttp_request *req = rpc_state->http_req;
 
238
        struct evrpc *rpc = rpc_state->rpc;
 
239
        struct evbuffer* data;
 
240
 
 
241
        if (rpc->reply_complete(rpc_state->reply) == -1) {
 
242
                /* the reply was not completely filled in.  error out */
 
243
                goto error;
 
244
        }
 
245
 
 
246
        if ((data = evbuffer_new()) == NULL) {
 
247
                /* out of memory */
 
248
                goto error;
 
249
        }
 
250
 
 
251
        /* serialize the reply */
 
252
        rpc->reply_marshal(data, rpc_state->reply);
 
253
 
 
254
        evhttp_send_reply(req, HTTP_OK, "OK", data);
 
255
 
 
256
        evbuffer_free(data);
 
257
 
 
258
        evrpc_reqstate_free(rpc_state);
 
259
 
 
260
        return;
 
261
 
 
262
error:
 
263
        evrpc_reqstate_free(rpc_state);
 
264
        evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
 
265
        return;
 
266
}
 
267
 
 
268
/* Client implementation of RPC site */
 
269
 
 
270
static int evrpc_schedule_request(struct evhttp_connection *connection,
 
271
    struct evrpc_request_wrapper *ctx);
 
272
 
 
273
struct evrpc_pool *
 
274
evrpc_pool_new(void)
 
275
{
 
276
        struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
 
277
        if (pool == NULL)
 
278
                return (NULL);
 
279
 
 
280
        TAILQ_INIT(&pool->connections);
 
281
        TAILQ_INIT(&pool->requests);
 
282
 
 
283
        pool->timeout = -1;
 
284
 
 
285
        return (pool);
 
286
}
 
287
 
 
288
static void
 
289
evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
 
290
{
 
291
        free(request->name);
 
292
        free(request);
 
293
}
 
294
 
 
295
void
 
296
evrpc_pool_free(struct evrpc_pool *pool)
 
297
{
 
298
        struct evhttp_connection *connection;
 
299
        struct evrpc_request_wrapper *request;
 
300
 
 
301
        while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
 
302
                TAILQ_REMOVE(&pool->requests, request, next);
 
303
                /* if this gets more complicated we need our own function */
 
304
                evrpc_request_wrapper_free(request);
 
305
        }
 
306
 
 
307
        while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
 
308
                TAILQ_REMOVE(&pool->connections, connection, next);
 
309
                evhttp_connection_free(connection);
 
310
        }
 
311
 
 
312
        free(pool);
 
313
}
 
314
 
 
315
/*
 
316
 * Add a connection to the RPC pool.   A request scheduled on the pool
 
317
 * may use any available connection.
 
318
 */
 
319
 
 
320
void
 
321
evrpc_pool_add_connection(struct evrpc_pool *pool,
 
322
    struct evhttp_connection *connection) {
 
323
        assert(connection->http_server == NULL);
 
324
        TAILQ_INSERT_TAIL(&pool->connections, connection, next);
 
325
 
 
326
        /* 
 
327
         * unless a timeout was specifically set for a connection,
 
328
         * the connection inherits the timeout from the pool.
 
329
         */
 
330
        if (connection->timeout == -1)
 
331
                connection->timeout = pool->timeout;
 
332
 
 
333
        /* 
 
334
         * if we have any requests pending, schedule them with the new
 
335
         * connections.
 
336
         */
 
337
 
 
338
        if (TAILQ_FIRST(&pool->requests) != NULL) {
 
339
                struct evrpc_request_wrapper *request = 
 
340
                    TAILQ_FIRST(&pool->requests);
 
341
                TAILQ_REMOVE(&pool->requests, request, next);
 
342
                evrpc_schedule_request(connection, request);
 
343
        }
 
344
}
 
345
 
 
346
void
 
347
evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
 
348
{
 
349
        struct evhttp_connection *evcon;
 
350
        TAILQ_FOREACH(evcon, &pool->connections, next) {
 
351
                evcon->timeout = timeout_in_secs;
 
352
        }
 
353
        pool->timeout = timeout_in_secs;
 
354
}
 
355
 
 
356
 
 
357
static void evrpc_reply_done(struct evhttp_request *, void *);
 
358
static void evrpc_request_timeout(int, short, void *);
 
359
 
 
360
/*
 
361
 * Finds a connection object associated with the pool that is currently
 
362
 * idle and can be used to make a request.
 
363
 */
 
364
static struct evhttp_connection *
 
365
evrpc_pool_find_connection(struct evrpc_pool *pool)
 
366
{
 
367
        struct evhttp_connection *connection;
 
368
        TAILQ_FOREACH(connection, &pool->connections, next) {
 
369
                if (TAILQ_FIRST(&connection->requests) == NULL)
 
370
                        return (connection);
 
371
        }
 
372
 
 
373
        return (NULL);
 
374
}
 
375
 
 
376
/*
 
377
 * We assume that the ctx is no longer queued on the pool.
 
378
 */
 
379
static int
 
380
evrpc_schedule_request(struct evhttp_connection *connection,
 
381
    struct evrpc_request_wrapper *ctx)
 
382
{
 
383
        struct evhttp_request *req = NULL;
 
384
        struct evrpc_pool *pool = ctx->pool;
 
385
        struct evrpc_status status;
 
386
        char *uri = NULL;
 
387
        int res = 0;
 
388
 
 
389
        if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
 
390
                goto error;
 
391
 
 
392
        /* serialize the request data into the output buffer */
 
393
        ctx->request_marshal(req->output_buffer, ctx->request);
 
394
 
 
395
        uri = evrpc_construct_uri(ctx->name);
 
396
        if (uri == NULL)
 
397
                goto error;
 
398
 
 
399
        /* we need to know the connection that we might have to abort */
 
400
        ctx->evcon = connection;
 
401
 
 
402
        if (pool->timeout > 0) {
 
403
                /* 
 
404
                 * a timeout after which the whole rpc is going to be aborted.
 
405
                 */
 
406
                struct timeval tv;
 
407
                timerclear(&tv);
 
408
                tv.tv_sec = pool->timeout;
 
409
                evtimer_add(&ctx->ev_timeout, &tv);
 
410
        }
 
411
 
 
412
        /* start the request over the connection */
 
413
        res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
 
414
        free(uri);
 
415
 
 
416
        if (res == -1)
 
417
                goto error;
 
418
 
 
419
        return (0);
 
420
 
 
421
error:
 
422
        memset(&status, 0, sizeof(status));
 
423
        status.error = EVRPC_STATUS_ERR_UNSTARTED;
 
424
        (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 
425
        evrpc_request_wrapper_free(ctx);
 
426
        return (-1);
 
427
}
 
428
 
 
429
int
 
430
evrpc_make_request(struct evrpc_request_wrapper *ctx)
 
431
{
 
432
        struct evrpc_pool *pool = ctx->pool;
 
433
 
 
434
        /* initialize the event structure for this rpc */
 
435
        evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
 
436
 
 
437
        /* we better have some available connections on the pool */
 
438
        assert(TAILQ_FIRST(&pool->connections) != NULL);
 
439
 
 
440
        /* 
 
441
         * if no connection is available, we queue the request on the pool,
 
442
         * the next time a connection is empty, the rpc will be send on that.
 
443
         */
 
444
        TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
 
445
 
 
446
        evrpc_pool_schedule(pool);
 
447
 
 
448
        return (0);
 
449
}
 
450
 
 
451
static void
 
452
evrpc_reply_done(struct evhttp_request *req, void *arg)
 
453
{
 
454
        struct evrpc_request_wrapper *ctx = arg;
 
455
        struct evrpc_pool *pool = ctx->pool;
 
456
        struct evrpc_status status;
 
457
        int res = -1;
 
458
        
 
459
        /* cancel any timeout we might have scheduled */
 
460
        event_del(&ctx->ev_timeout);
 
461
 
 
462
        memset(&status, 0, sizeof(status));
 
463
        /* we need to get the reply now */
 
464
        if (req != NULL) {
 
465
                res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
 
466
                if (res == -1) {
 
467
                        status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
 
468
                }
 
469
        } else {
 
470
                status.error = EVRPC_STATUS_ERR_TIMEOUT;
 
471
        }
 
472
        if (res == -1) {
 
473
                /* clear everything that we might have written previously */
 
474
                ctx->reply_clear(ctx->reply);
 
475
        }
 
476
 
 
477
        (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 
478
        
 
479
        evrpc_request_wrapper_free(ctx);
 
480
 
 
481
        /* the http layer owns the request structure */
 
482
 
 
483
        /* see if we can schedule another request */
 
484
        evrpc_pool_schedule(pool);
 
485
}
 
486
 
 
487
static void
 
488
evrpc_pool_schedule(struct evrpc_pool *pool)
 
489
{
 
490
        struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
 
491
        struct evhttp_connection *evcon;
 
492
 
 
493
        /* if no requests are pending, we have no work */
 
494
        if (ctx == NULL)
 
495
                return;
 
496
 
 
497
        if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
 
498
                TAILQ_REMOVE(&pool->requests, ctx, next);
 
499
                evrpc_schedule_request(evcon, ctx);
 
500
        }
 
501
}
 
502
 
 
503
static void
 
504
evrpc_request_timeout(int fd, short what, void *arg)
 
505
{
 
506
        struct evrpc_request_wrapper *ctx = arg;
 
507
        struct evhttp_connection *evcon = ctx->evcon;
 
508
        assert(evcon != NULL);
 
509
 
 
510
        evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
 
511
}