2
* Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions
8
* 1. Redistributions of source code must retain the above copyright
9
* notice, this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright
11
* notice, this list of conditions and the following disclaimer in the
12
* documentation and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32
#define WIN32_LEAN_AND_MEAN
35
#undef WIN32_LEAN_AND_MEAN
39
#include <sys/types.h>
42
#include <sys/socket.h>
44
#ifdef HAVE_SYS_TIME_H
47
#include <sys/_time.h>
49
#include <sys/queue.h>
62
#include "evrpc-internal.h"
67
/*
 * evrpc_init: allocates a zero-filled evrpc_base with calloc(), initializes
 * its registered_rpcs tail queue and records http_server in the base.
 *
 * NOTE(review): the allocation-failure check, the tagging set-up hinted at
 * by the comment below, and the return statement are not visible in this
 * view - confirm against the complete file.
 */
evrpc_init(struct evhttp *http_server)
69
struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
73
/* we rely on the tagging sub system */
76
TAILQ_INIT(&base->registered_rpcs);
77
base->http_server = http_server;
83
/*
 * evrpc_free: drains base->registered_rpcs by unregistering the entry at
 * the head of the queue until the queue is empty.
 *
 * NOTE(review): the unregister call is the argument of assert(). When
 * NDEBUG is defined assert() expands to nothing, so the call disappears
 * and the loop condition can never become false.
 * NOTE(review): the assertion requires a non-zero return value - confirm
 * that this matches what evrpc_unregister_rpc() returns on success (its
 * return statements are not visible in this view).
 */
evrpc_free(struct evrpc_base *base)
87
while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
88
assert(evrpc_unregister_rpc(base, rpc->uri));
94
/* Forward declarations for functions that are defined later in this file. */
static void evrpc_pool_schedule(struct evrpc_pool *pool);
95
static void evrpc_request_cb(struct evhttp_request *, void *);
96
void evrpc_request_done(struct evrpc_req_generic*);
99
* Registers a new RPC with the HTTP server. The evrpc object is expected
100
* to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
101
* calls this function.
105
/*
 * evrpc_construct_uri: returns a newly malloc'ed, NUL-terminated string
 * consisting of EVRPC_URI_PREFIX followed by uri. The callers in this
 * file release the result with free(). When malloc() fails, event_err()
 * is called with the message below.
 *
 * NOTE(review): the length is kept in an int although strlen() yields
 * size_t; consider size_t for constructed_uri_len.
 */
evrpc_construct_uri(const char *uri)
107
char *constructed_uri;
108
int constructed_uri_len;
110
constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
111
if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
112
event_err(1, "%s: failed to register rpc at %s",
114
memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
115
memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
116
/* the extra byte counted in constructed_uri_len holds the terminator */
constructed_uri[constructed_uri_len - 1] = '\0';
118
return (constructed_uri);
122
/*
 * evrpc_register_rpc: stores cb_arg in rpc, appends rpc to
 * base->registered_rpcs and installs a callback on base->http_server with
 * evhttp_set_cb(). The string obtained from evrpc_construct_uri(rpc->uri)
 * is released with free() afterwards.
 *
 * NOTE(review): the remaining evhttp_set_cb() arguments, the handling of
 * cb, and the return value are not visible in this view; presumably the
 * constructed URI is the path that is registered - confirm.
 */
evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
123
void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
125
char *constructed_uri = evrpc_construct_uri(rpc->uri);
128
rpc->cb_arg = cb_arg;
130
TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
132
evhttp_set_cb(base->http_server,
137
free(constructed_uri);
143
/*
 * evrpc_unregister_rpc: scans base->registered_rpcs for the entry whose
 * uri equals name, unlinks it from the queue, frees its uri string, and
 * deletes the callback registered on base->http_server under the URI
 * built from name.
 *
 * NOTE(review): rpc->uri is freed before name is handed to
 * evrpc_construct_uri(). evrpc_free() calls this function with
 * name == rpc->uri, in which case name is read after it has been freed;
 * building registered_uri before the free() would avoid that.
 * NOTE(review): evhttp_del_cb() is invoked inside assert(), so the HTTP
 * callback is not removed when NDEBUG is defined.
 * NOTE(review): the not-found branch and the return values are not
 * visible in this view.
 */
evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
145
char *registered_uri = NULL;
148
/* find the right rpc; linear search might be slow */
149
TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
150
if (strcmp(rpc->uri, name) == 0)
154
/* We did not find an RPC with this name */
157
TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
159
free((char *)rpc->uri);
162
registered_uri = evrpc_construct_uri(name);
164
/* remove the http server callback */
165
assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
167
free(registered_uri);
172
/*
 * evrpc_request_cb: HTTP request callback; arg is the struct evrpc the
 * request is addressed to. Steps visible here:
 *   - check that the request is a POST with a non-empty input buffer;
 *   - allocate a zero-filled evrpc_req_generic as per-request state;
 *   - create the request object and unmarshal req->input_buffer into it;
 *   - create the reply object;
 *   - record the HTTP request and the completion hook, then pass the
 *     state to the user callback rpc->cb together with rpc->cb_arg.
 *
 * The final two statements free the state and answer with
 * HTTP_SERVUNAVAIL "Service Error" - presumably the shared error exit;
 * the label and the branches that reach it are not visible in this view.
 */
evrpc_request_cb(struct evhttp_request *req, void *arg)
174
struct evrpc *rpc = arg;
175
struct evrpc_req_generic *rpc_state = NULL;
177
/* let's verify the outside parameters */
178
if (req->type != EVHTTP_REQ_POST ||
179
EVBUFFER_LENGTH(req->input_buffer) <= 0)
182
rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
183
if (rpc_state == NULL)
186
/* let's check that we can parse the request */
187
rpc_state->request = rpc->request_new();
188
if (rpc_state->request == NULL)
191
rpc_state->rpc = rpc;
193
if (rpc->request_unmarshal(
194
rpc_state->request, req->input_buffer) == -1) {
195
/* we failed to parse the request; that's a bummer */
199
/* at this point, we have a well formed request, prepare the reply */
201
rpc_state->reply = rpc->reply_new();
202
if (rpc_state->reply == NULL)
205
rpc_state->http_req = req;
206
rpc_state->done = evrpc_request_done;
208
/* give the rpc to the user; they can deal with it */
209
rpc->cb(rpc_state, rpc->cb_arg);
214
evrpc_reqstate_free(rpc_state);
215
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
220
/*
 * evrpc_reqstate_free: cleanup for a per-request state; a NULL rpc_state
 * is accepted and ignored. The request and reply objects, when set, are
 * released through the request_free/reply_free hooks of the owning rpc.
 *
 * NOTE(review): releasing rpc_state itself is not visible in this view.
 */
evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
222
/* clean up all memory */
223
if (rpc_state != NULL) {
224
struct evrpc *rpc = rpc_state->rpc;
226
if (rpc_state->request != NULL)
227
rpc->request_free(rpc_state->request);
228
if (rpc_state->reply != NULL)
229
rpc->reply_free(rpc_state->reply);
235
/*
 * evrpc_request_done: completion hook that evrpc_request_cb() stores in
 * rpc_state->done. It checks the reply with rpc->reply_complete(),
 * marshals it into a freshly created evbuffer, sends that buffer as an
 * HTTP_OK "OK" response and frees the request state.
 *
 * The second evrpc_reqstate_free()/evhttp_send_error() pair at the end is
 * presumably the error exit used when the reply is incomplete or the
 * buffer cannot be allocated; the label and the jumps to it are not
 * visible in this view.
 * NOTE(review): releasing the evbuffer after the reply is sent is not
 * visible here - confirm against the complete file.
 */
evrpc_request_done(struct evrpc_req_generic* rpc_state)
237
struct evhttp_request *req = rpc_state->http_req;
238
struct evrpc *rpc = rpc_state->rpc;
239
struct evbuffer* data;
241
if (rpc->reply_complete(rpc_state->reply) == -1) {
242
/* the reply was not completely filled in. error out */
246
if ((data = evbuffer_new()) == NULL) {
251
/* serialize the reply */
252
rpc->reply_marshal(data, rpc_state->reply);
254
evhttp_send_reply(req, HTTP_OK, "OK", data);
258
evrpc_reqstate_free(rpc_state);
263
evrpc_reqstate_free(rpc_state);
264
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
268
/* Client-side implementation of RPC */
270
/*
 * Forward declaration: evrpc_schedule_request() is used by
 * evrpc_pool_add_connection() before its definition further down.
 */
static int evrpc_schedule_request(struct evhttp_connection *connection,
271
struct evrpc_request_wrapper *ctx);
276
/*
 * NOTE(review): the header of this function is not visible in this view;
 * presumably it is the pool constructor. The visible statements allocate
 * a zero-filled evrpc_pool and initialize its connections and requests
 * tail queues. The allocation-failure check and the return statement are
 * not visible either.
 */
struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
280
TAILQ_INIT(&pool->connections);
281
TAILQ_INIT(&pool->requests);
289
/*
 * evrpc_request_wrapper_free: disposes of a client request wrapper.
 * Elsewhere in this file it is called after the wrapper's callback has
 * run and while a pool is being emptied.
 *
 * NOTE(review): the body is not visible in this view, so what exactly is
 * released cannot be confirmed here.
 */
evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
296
/*
 * evrpc_pool_free: empties the pool. Each queued request wrapper is
 * unlinked from pool->requests and passed to
 * evrpc_request_wrapper_free(); each connection is unlinked from
 * pool->connections and released with evhttp_connection_free().
 *
 * NOTE(review): releasing the pool structure itself is not visible in
 * this view.
 */
evrpc_pool_free(struct evrpc_pool *pool)
298
struct evhttp_connection *connection;
299
struct evrpc_request_wrapper *request;
301
while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
302
TAILQ_REMOVE(&pool->requests, request, next);
303
/* if this gets more complicated we need our own function */
304
evrpc_request_wrapper_free(request);
307
while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
308
TAILQ_REMOVE(&pool->connections, connection, next);
309
evhttp_connection_free(connection);
316
* Add a connection to the RPC pool. A request scheduled on the pool
317
* may use any available connection.
321
/*
 * evrpc_pool_add_connection: asserts that connection->http_server is
 * NULL, appends the connection to pool->connections, copies the pool
 * timeout onto the connection when the connection's own timeout is -1,
 * and, if a request is waiting on the pool, dequeues the first one and
 * schedules it on this connection.
 *
 * NOTE(review): the return value of evrpc_schedule_request() is not used
 * in the visible code.
 */
evrpc_pool_add_connection(struct evrpc_pool *pool,
322
struct evhttp_connection *connection) {
323
assert(connection->http_server == NULL);
324
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
327
* unless a timeout was specifically set for a connection,
328
* the connection inherits the timeout from the pool.
330
if (connection->timeout == -1)
331
connection->timeout = pool->timeout;
334
* if we have any requests pending, schedule them with the new
338
if (TAILQ_FIRST(&pool->requests) != NULL) {
339
struct evrpc_request_wrapper *request =
340
TAILQ_FIRST(&pool->requests);
341
TAILQ_REMOVE(&pool->requests, request, next);
342
evrpc_schedule_request(connection, request);
347
/*
 * evrpc_pool_set_timeout: writes timeout_in_secs to every connection
 * currently in pool->connections and to the pool itself. Connections
 * added afterwards take the pool value in evrpc_pool_add_connection()
 * when their own timeout is -1.
 */
evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
349
struct evhttp_connection *evcon;
350
TAILQ_FOREACH(evcon, &pool->connections, next) {
351
evcon->timeout = timeout_in_secs;
353
pool->timeout = timeout_in_secs;
357
/*
 * Forward declarations for the reply and timeout callbacks that are
 * defined later in this file.
 */
static void evrpc_reply_done(struct evhttp_request *, void *);
358
static void evrpc_request_timeout(int, short, void *);
361
* Finds a connection object associated with the pool that is currently
362
* idle and can be used to make a request.
364
/*
 * Walks pool->connections and tests each connection for an empty
 * requests queue. The only caller in this file, evrpc_pool_schedule(),
 * treats a non-NULL result as a usable connection.
 *
 * NOTE(review): the return statements are not visible in this view.
 */
static struct evhttp_connection *
365
evrpc_pool_find_connection(struct evrpc_pool *pool)
367
struct evhttp_connection *connection;
368
TAILQ_FOREACH(connection, &pool->connections, next) {
369
if (TAILQ_FIRST(&connection->requests) == NULL)
377
* We assume that the ctx is no longer queued on the pool.
380
/*
 * evrpc_schedule_request: sends the RPC described by ctx over connection.
 * Steps visible here:
 *   - create an evhttp request whose completion callback is
 *     evrpc_reply_done() with ctx as argument;
 *   - marshal ctx->request into the request's output buffer;
 *   - build the target URI from ctx->name;
 *   - remember the connection in ctx->evcon;
 *   - when pool->timeout is positive, arm ctx->ev_timeout with that many
 *     seconds;
 *   - issue the POST with evhttp_make_request().
 *
 * The trailing statements report EVRPC_STATUS_ERR_UNSTARTED to the user
 * callback and free the wrapper - presumably the error exit; the label,
 * the jumps to it, the release of uri and the return values are not
 * visible in this view.
 */
evrpc_schedule_request(struct evhttp_connection *connection,
381
struct evrpc_request_wrapper *ctx)
383
struct evhttp_request *req = NULL;
384
struct evrpc_pool *pool = ctx->pool;
385
struct evrpc_status status;
389
if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
392
/* serialize the request data into the output buffer */
393
ctx->request_marshal(req->output_buffer, ctx->request);
395
uri = evrpc_construct_uri(ctx->name);
399
/* we need to know the connection that we might have to abort */
400
ctx->evcon = connection;
402
if (pool->timeout > 0) {
404
* a timeout after which the whole rpc is going to be aborted.
408
tv.tv_sec = pool->timeout;
409
evtimer_add(&ctx->ev_timeout, &tv);
412
/* start the request over the connection */
413
res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
422
memset(&status, 0, sizeof(status));
423
status.error = EVRPC_STATUS_ERR_UNSTARTED;
424
(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
425
evrpc_request_wrapper_free(ctx);
430
/*
 * evrpc_make_request: entry point for issuing a client RPC. It prepares
 * ctx->ev_timeout so that evrpc_request_timeout() fires with ctx as its
 * argument, asserts that the pool has at least one connection, appends
 * ctx to pool->requests and asks evrpc_pool_schedule() to dispatch
 * pending work.
 *
 * NOTE(review): the return statement is not visible in this view.
 */
evrpc_make_request(struct evrpc_request_wrapper *ctx)
432
struct evrpc_pool *pool = ctx->pool;
434
/* initialize the event structure for this rpc */
435
evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
437
/* we better have some available connections on the pool */
438
assert(TAILQ_FIRST(&pool->connections) != NULL);
441
* if no connection is available, we queue the request on the pool,
442
* the next time a connection is empty, the rpc will be send on that.
444
TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
446
evrpc_pool_schedule(pool);
452
/*
 * evrpc_reply_done: completion callback installed by
 * evrpc_schedule_request(); arg is the request wrapper. It cancels the
 * wrapper's timeout event, unmarshals req->input_buffer into ctx->reply,
 * fills in an evrpc_status, invokes the user callback with the status,
 * request and reply, frees the wrapper and then asks the pool to schedule
 * further pending requests.
 *
 * NOTE(review): the conditions that select EVRPC_STATUS_ERR_BADPAYLOAD,
 * EVRPC_STATUS_ERR_TIMEOUT and the reply_clear() call are not visible in
 * this view; confirm in particular how a NULL req is handled before
 * req->input_buffer is read.
 */
evrpc_reply_done(struct evhttp_request *req, void *arg)
454
struct evrpc_request_wrapper *ctx = arg;
455
struct evrpc_pool *pool = ctx->pool;
456
struct evrpc_status status;
459
/* cancel any timeout we might have scheduled */
460
event_del(&ctx->ev_timeout);
462
memset(&status, 0, sizeof(status));
463
/* we need to get the reply now */
465
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
467
status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
470
status.error = EVRPC_STATUS_ERR_TIMEOUT;
473
/* clear everything that we might have written previously */
474
ctx->reply_clear(ctx->reply);
477
(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
479
evrpc_request_wrapper_free(ctx);
481
/* the http layer owns the request structure */
483
/* see if we can schedule another request */
484
evrpc_pool_schedule(pool);
488
/*
 * evrpc_pool_schedule: takes the first pending wrapper on pool->requests
 * and, when evrpc_pool_find_connection() yields a connection, unlinks the
 * wrapper from the queue and schedules it on that connection.
 *
 * NOTE(review): the early exit for an empty request queue that the
 * comment below refers to is not visible in this view; without it ctx
 * would be NULL when passed to TAILQ_REMOVE.
 */
evrpc_pool_schedule(struct evrpc_pool *pool)
490
struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
491
struct evhttp_connection *evcon;
493
/* if no requests are pending, we have no work */
497
if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
498
TAILQ_REMOVE(&pool->requests, ctx, next);
499
evrpc_schedule_request(evcon, ctx);
504
/*
 * evrpc_request_timeout: timer callback registered by
 * evrpc_make_request(); arg is the request wrapper. It asserts that the
 * wrapper has a connection recorded in ctx->evcon and fails that
 * connection with EVCON_HTTP_TIMEOUT. The fd and what parameters are not
 * used in the visible code.
 */
evrpc_request_timeout(int fd, short what, void *arg)
506
struct evrpc_request_wrapper *ctx = arg;
507
struct evhttp_connection *evcon = ctx->evcon;
508
assert(evcon != NULL);
510
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);