2
* Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions
8
* 1. Redistributions of source code must retain the above copyright
9
* notice, this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright
11
* notice, this list of conditions and the following disclaimer in the
12
* documentation and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32
#define WIN32_LEAN_AND_MEAN
35
#undef WIN32_LEAN_AND_MEAN
39
#include <sys/types.h>
41
#include <sys/socket.h>
43
#ifdef HAVE_SYS_TIME_H
46
#include <sys/queue.h>
59
#include "evrpc-internal.h"
65
evrpc_init(struct evhttp *http_server)
67
struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
71
/* we rely on the tagging sub system */
74
TAILQ_INIT(&base->registered_rpcs);
75
TAILQ_INIT(&base->input_hooks);
76
TAILQ_INIT(&base->output_hooks);
77
base->http_server = http_server;
83
evrpc_free(struct evrpc_base *base)
86
struct evrpc_hook *hook;
88
while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
89
assert(evrpc_unregister_rpc(base, rpc->uri));
91
while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
92
assert(evrpc_remove_hook(base, INPUT, hook));
94
while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
95
assert(evrpc_remove_hook(base, OUTPUT, hook));
101
evrpc_add_hook(void *vbase,
102
enum EVRPC_HOOK_TYPE hook_type,
103
int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
106
struct _evrpc_hooks *base = vbase;
107
struct evrpc_hook_list *head = NULL;
108
struct evrpc_hook *hook = NULL;
111
head = &base->in_hooks;
114
head = &base->out_hooks;
117
assert(hook_type == INPUT || hook_type == OUTPUT);
120
hook = calloc(1, sizeof(struct evrpc_hook));
121
assert(hook != NULL);
124
hook->process_arg = cb_arg;
125
TAILQ_INSERT_TAIL(head, hook, next);
131
evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
133
struct evrpc_hook *hook = NULL;
134
TAILQ_FOREACH(hook, head, next) {
135
if (hook == handle) {
136
TAILQ_REMOVE(head, hook, next);
146
* remove the hook specified by the handle
150
evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
152
struct _evrpc_hooks *base = vbase;
153
struct evrpc_hook_list *head = NULL;
156
head = &base->in_hooks;
159
head = &base->out_hooks;
162
assert(hook_type == INPUT || hook_type == OUTPUT);
165
return (evrpc_remove_hook_internal(head, handle));
169
evrpc_process_hooks(struct evrpc_hook_list *head,
170
struct evhttp_request *req, struct evbuffer *evbuf)
172
struct evrpc_hook *hook;
173
TAILQ_FOREACH(hook, head, next) {
174
if (hook->process(req, evbuf, hook->process_arg) == -1)
181
static void evrpc_pool_schedule(struct evrpc_pool *pool);
182
static void evrpc_request_cb(struct evhttp_request *, void *);
183
void evrpc_request_done(struct evrpc_req_generic*);
186
* Registers a new RPC with the HTTP server. The evrpc object is expected
187
* to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
188
* calls this function.
192
evrpc_construct_uri(const char *uri)
194
char *constructed_uri;
195
int constructed_uri_len;
197
constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
198
if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
199
event_err(1, "%s: failed to register rpc at %s",
201
memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
202
memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
203
constructed_uri[constructed_uri_len - 1] = '\0';
205
return (constructed_uri);
209
evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
210
void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
212
char *constructed_uri = evrpc_construct_uri(rpc->uri);
216
rpc->cb_arg = cb_arg;
218
TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
220
evhttp_set_cb(base->http_server,
225
free(constructed_uri);
231
evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
233
char *registered_uri = NULL;
236
/* find the right rpc; linear search might be slow */
237
TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
238
if (strcmp(rpc->uri, name) == 0)
242
/* We did not find an RPC with this name */
245
TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
247
free((char *)rpc->uri);
250
registered_uri = evrpc_construct_uri(name);
252
/* remove the http server callback */
253
assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
255
free(registered_uri);
260
evrpc_request_cb(struct evhttp_request *req, void *arg)
262
struct evrpc *rpc = arg;
263
struct evrpc_req_generic *rpc_state = NULL;
265
/* let's verify the outside parameters */
266
if (req->type != EVHTTP_REQ_POST ||
267
EVBUFFER_LENGTH(req->input_buffer) <= 0)
271
* we might want to allow hooks to suspend the processing,
272
* but at the moment, we assume that they just act as simple
275
if (evrpc_process_hooks(&rpc->base->input_hooks,
276
req, req->input_buffer) == -1)
279
rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
280
if (rpc_state == NULL)
283
/* let's check that we can parse the request */
284
rpc_state->request = rpc->request_new();
285
if (rpc_state->request == NULL)
288
rpc_state->rpc = rpc;
290
if (rpc->request_unmarshal(
291
rpc_state->request, req->input_buffer) == -1) {
292
/* we failed to parse the request; that's a bummer */
296
/* at this point, we have a well formed request, prepare the reply */
298
rpc_state->reply = rpc->reply_new();
299
if (rpc_state->reply == NULL)
302
rpc_state->http_req = req;
303
rpc_state->done = evrpc_request_done;
305
/* give the rpc to the user; they can deal with it */
306
rpc->cb(rpc_state, rpc->cb_arg);
311
evrpc_reqstate_free(rpc_state);
312
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
317
evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
319
/* clean up all memory */
320
if (rpc_state != NULL) {
321
struct evrpc *rpc = rpc_state->rpc;
323
if (rpc_state->request != NULL)
324
rpc->request_free(rpc_state->request);
325
if (rpc_state->reply != NULL)
326
rpc->reply_free(rpc_state->reply);
332
evrpc_request_done(struct evrpc_req_generic* rpc_state)
334
struct evhttp_request *req = rpc_state->http_req;
335
struct evrpc *rpc = rpc_state->rpc;
336
struct evbuffer* data = NULL;
338
if (rpc->reply_complete(rpc_state->reply) == -1) {
339
/* the reply was not completely filled in. error out */
343
if ((data = evbuffer_new()) == NULL) {
348
/* serialize the reply */
349
rpc->reply_marshal(data, rpc_state->reply);
351
/* do hook based tweaks to the request */
352
if (evrpc_process_hooks(&rpc->base->output_hooks,
356
/* on success, we are going to transmit marshaled binary data */
357
if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
358
evhttp_add_header(req->output_headers,
359
"Content-Type", "application/octet-stream");
362
evhttp_send_reply(req, HTTP_OK, "OK", data);
366
evrpc_reqstate_free(rpc_state);
373
evrpc_reqstate_free(rpc_state);
374
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
378
/* Client implementation of RPC site */
380
static int evrpc_schedule_request(struct evhttp_connection *connection,
381
struct evrpc_request_wrapper *ctx);
384
evrpc_pool_new(struct event_base *base)
386
struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
390
TAILQ_INIT(&pool->connections);
391
TAILQ_INIT(&pool->requests);
393
TAILQ_INIT(&pool->input_hooks);
394
TAILQ_INIT(&pool->output_hooks);
403
evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
410
evrpc_pool_free(struct evrpc_pool *pool)
412
struct evhttp_connection *connection;
413
struct evrpc_request_wrapper *request;
414
struct evrpc_hook *hook;
416
while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
417
TAILQ_REMOVE(&pool->requests, request, next);
418
/* if this gets more complicated we need our own function */
419
evrpc_request_wrapper_free(request);
422
while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
423
TAILQ_REMOVE(&pool->connections, connection, next);
424
evhttp_connection_free(connection);
427
while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
428
assert(evrpc_remove_hook(pool, INPUT, hook));
431
while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
432
assert(evrpc_remove_hook(pool, OUTPUT, hook));
439
* Add a connection to the RPC pool. A request scheduled on the pool
440
* may use any available connection.
444
evrpc_pool_add_connection(struct evrpc_pool *pool,
445
struct evhttp_connection *connection) {
446
assert(connection->http_server == NULL);
447
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
450
* associate an event base with this connection
452
if (pool->base != NULL)
453
evhttp_connection_set_base(connection, pool->base);
456
* unless a timeout was specifically set for a connection,
457
* the connection inherits the timeout from the pool.
459
if (connection->timeout == -1)
460
connection->timeout = pool->timeout;
463
* if we have any requests pending, schedule them with the new
467
if (TAILQ_FIRST(&pool->requests) != NULL) {
468
struct evrpc_request_wrapper *request =
469
TAILQ_FIRST(&pool->requests);
470
TAILQ_REMOVE(&pool->requests, request, next);
471
evrpc_schedule_request(connection, request);
476
evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
478
struct evhttp_connection *evcon;
479
TAILQ_FOREACH(evcon, &pool->connections, next) {
480
evcon->timeout = timeout_in_secs;
482
pool->timeout = timeout_in_secs;
486
static void evrpc_reply_done(struct evhttp_request *, void *);
487
static void evrpc_request_timeout(int, short, void *);
490
* Finds a connection object associated with the pool that is currently
491
* idle and can be used to make a request.
493
static struct evhttp_connection *
494
evrpc_pool_find_connection(struct evrpc_pool *pool)
496
struct evhttp_connection *connection;
497
TAILQ_FOREACH(connection, &pool->connections, next) {
498
if (TAILQ_FIRST(&connection->requests) == NULL)
506
* We assume that the ctx is no longer queued on the pool.
509
evrpc_schedule_request(struct evhttp_connection *connection,
510
struct evrpc_request_wrapper *ctx)
512
struct evhttp_request *req = NULL;
513
struct evrpc_pool *pool = ctx->pool;
514
struct evrpc_status status;
518
if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
521
/* serialize the request data into the output buffer */
522
ctx->request_marshal(req->output_buffer, ctx->request);
524
uri = evrpc_construct_uri(ctx->name);
528
/* we need to know the connection that we might have to abort */
529
ctx->evcon = connection;
531
/* apply hooks to the outgoing request */
532
if (evrpc_process_hooks(&pool->output_hooks,
533
req, req->output_buffer) == -1)
536
if (pool->timeout > 0) {
538
* a timeout after which the whole rpc is going to be aborted.
541
evutil_timerclear(&tv);
542
tv.tv_sec = pool->timeout;
543
evtimer_add(&ctx->ev_timeout, &tv);
546
/* start the request over the connection */
547
res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
556
memset(&status, 0, sizeof(status));
557
status.error = EVRPC_STATUS_ERR_UNSTARTED;
558
(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
559
evrpc_request_wrapper_free(ctx);
564
evrpc_make_request(struct evrpc_request_wrapper *ctx)
566
struct evrpc_pool *pool = ctx->pool;
568
/* initialize the event structure for this rpc */
569
evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
570
if (pool->base != NULL)
571
event_base_set(pool->base, &ctx->ev_timeout);
573
/* we better have some available connections on the pool */
574
assert(TAILQ_FIRST(&pool->connections) != NULL);
577
* if no connection is available, we queue the request on the pool,
578
* the next time a connection is empty, the rpc will be send on that.
580
TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
582
evrpc_pool_schedule(pool);
588
evrpc_reply_done(struct evhttp_request *req, void *arg)
590
struct evrpc_request_wrapper *ctx = arg;
591
struct evrpc_pool *pool = ctx->pool;
592
struct evrpc_status status;
595
/* cancel any timeout we might have scheduled */
596
event_del(&ctx->ev_timeout);
598
memset(&status, 0, sizeof(status));
599
status.http_req = req;
601
/* we need to get the reply now */
603
/* apply hooks to the incoming request */
604
if (evrpc_process_hooks(&pool->input_hooks,
605
req, req->input_buffer) == -1) {
606
status.error = EVRPC_STATUS_ERR_HOOKABORTED;
609
res = ctx->reply_unmarshal(ctx->reply,
612
status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
616
status.error = EVRPC_STATUS_ERR_TIMEOUT;
620
/* clear everything that we might have written previously */
621
ctx->reply_clear(ctx->reply);
624
(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
626
evrpc_request_wrapper_free(ctx);
628
/* the http layer owns the request structure */
630
/* see if we can schedule another request */
631
evrpc_pool_schedule(pool);
635
evrpc_pool_schedule(struct evrpc_pool *pool)
637
struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
638
struct evhttp_connection *evcon;
640
/* if no requests are pending, we have no work */
644
if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
645
TAILQ_REMOVE(&pool->requests, ctx, next);
646
evrpc_schedule_request(evcon, ctx);
651
evrpc_request_timeout(int fd, short what, void *arg)
653
struct evrpc_request_wrapper *ctx = arg;
654
struct evhttp_connection *evcon = ctx->evcon;
655
assert(evcon != NULL);
657
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);