~jlukas79/+junk/mysql-server

« back to all changes in this revision

Viewing changes to extra/libevent/evrpc.c

manual merge 6.0-main --> 6.0-bka-review

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 * 1. Redistributions of source code must retain the above copyright
 
9
 *    notice, this list of conditions and the following disclaimer.
 
10
 * 2. Redistributions in binary form must reproduce the above copyright
 
11
 *    notice, this list of conditions and the following disclaimer in the
 
12
 *    documentation and/or other materials provided with the distribution.
 
13
 * 3. The name of the author may not be used to endorse or promote products
 
14
 *    derived from this software without specific prior written permission.
 
15
 *
 
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 
17
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
18
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
19
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 
20
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
21
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
22
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
23
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
24
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
25
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 */
 
27
#ifdef HAVE_CONFIG_H
 
28
#include "config.h"
 
29
#endif
 
30
 
 
31
#ifdef WIN32
 
32
#define WIN32_LEAN_AND_MEAN
 
33
#include <windows.h>
 
34
#include <winsock2.h>
 
35
#undef WIN32_LEAN_AND_MEAN
 
36
#include "misc.h"
 
37
#endif
 
38
 
 
39
#include <sys/types.h>
 
40
#ifndef WIN32
 
41
#include <sys/socket.h>
 
42
#endif
 
43
#ifdef HAVE_SYS_TIME_H
 
44
#include <sys/time.h>
 
45
#endif
 
46
#include <sys/queue.h>
 
47
#include <stdio.h>
 
48
#include <stdlib.h>
 
49
#ifndef WIN32
 
50
#include <unistd.h>
 
51
#endif
 
52
#include <errno.h>
 
53
#include <signal.h>
 
54
#include <string.h>
 
55
#include <assert.h>
 
56
 
 
57
#include "event.h"
 
58
#include "evrpc.h"
 
59
#include "evrpc-internal.h"
 
60
#include "evhttp.h"
 
61
#include "evutil.h"
 
62
#include "log.h"
 
63
 
 
64
struct evrpc_base *
 
65
evrpc_init(struct evhttp *http_server)
 
66
{
 
67
        struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
 
68
        if (base == NULL)
 
69
                return (NULL);
 
70
 
 
71
        /* we rely on the tagging sub system */
 
72
        evtag_init();
 
73
 
 
74
        TAILQ_INIT(&base->registered_rpcs);
 
75
        TAILQ_INIT(&base->input_hooks);
 
76
        TAILQ_INIT(&base->output_hooks);
 
77
        base->http_server = http_server;
 
78
 
 
79
        return (base);
 
80
}
 
81
 
 
82
void
 
83
evrpc_free(struct evrpc_base *base)
 
84
{
 
85
        struct evrpc *rpc;
 
86
        struct evrpc_hook *hook;
 
87
 
 
88
        while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
 
89
                assert(evrpc_unregister_rpc(base, rpc->uri));
 
90
        }
 
91
        while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
 
92
                assert(evrpc_remove_hook(base, INPUT, hook));
 
93
        }
 
94
        while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
 
95
                assert(evrpc_remove_hook(base, OUTPUT, hook));
 
96
        }
 
97
        free(base);
 
98
}
 
99
 
 
100
void *
 
101
evrpc_add_hook(void *vbase,
 
102
    enum EVRPC_HOOK_TYPE hook_type,
 
103
    int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
 
104
    void *cb_arg)
 
105
{
 
106
        struct _evrpc_hooks *base = vbase;
 
107
        struct evrpc_hook_list *head = NULL;
 
108
        struct evrpc_hook *hook = NULL;
 
109
        switch (hook_type) {
 
110
        case INPUT:
 
111
                head = &base->in_hooks;
 
112
                break;
 
113
        case OUTPUT:
 
114
                head = &base->out_hooks;
 
115
                break;
 
116
        default:
 
117
                assert(hook_type == INPUT || hook_type == OUTPUT);
 
118
        }
 
119
 
 
120
        hook = calloc(1, sizeof(struct evrpc_hook));
 
121
        assert(hook != NULL);
 
122
        
 
123
        hook->process = cb;
 
124
        hook->process_arg = cb_arg;
 
125
        TAILQ_INSERT_TAIL(head, hook, next);
 
126
 
 
127
        return (hook);
 
128
}
 
129
 
 
130
static int
 
131
evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
 
132
{
 
133
        struct evrpc_hook *hook = NULL;
 
134
        TAILQ_FOREACH(hook, head, next) {
 
135
                if (hook == handle) {
 
136
                        TAILQ_REMOVE(head, hook, next);
 
137
                        free(hook);
 
138
                        return (1);
 
139
                }
 
140
        }
 
141
 
 
142
        return (0);
 
143
}
 
144
 
 
145
/*
 
146
 * remove the hook specified by the handle
 
147
 */
 
148
 
 
149
int
 
150
evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
 
151
{
 
152
        struct _evrpc_hooks *base = vbase;
 
153
        struct evrpc_hook_list *head = NULL;
 
154
        switch (hook_type) {
 
155
        case INPUT:
 
156
                head = &base->in_hooks;
 
157
                break;
 
158
        case OUTPUT:
 
159
                head = &base->out_hooks;
 
160
                break;
 
161
        default:
 
162
                assert(hook_type == INPUT || hook_type == OUTPUT);
 
163
        }
 
164
 
 
165
        return (evrpc_remove_hook_internal(head, handle));
 
166
}
 
167
 
 
168
static int
 
169
evrpc_process_hooks(struct evrpc_hook_list *head,
 
170
    struct evhttp_request *req, struct evbuffer *evbuf)
 
171
{
 
172
        struct evrpc_hook *hook;
 
173
        TAILQ_FOREACH(hook, head, next) {
 
174
                if (hook->process(req, evbuf, hook->process_arg) == -1)
 
175
                        return (-1);
 
176
        }
 
177
 
 
178
        return (0);
 
179
}
 
180
 
 
181
static void evrpc_pool_schedule(struct evrpc_pool *pool);
 
182
static void evrpc_request_cb(struct evhttp_request *, void *);
 
183
void evrpc_request_done(struct evrpc_req_generic*);
 
184
 
 
185
/*
 
186
 * Registers a new RPC with the HTTP server.   The evrpc object is expected
 
187
 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
 
188
 * calls this function.
 
189
 */
 
190
 
 
191
static char *
 
192
evrpc_construct_uri(const char *uri)
 
193
{
 
194
        char *constructed_uri;
 
195
        int constructed_uri_len;
 
196
 
 
197
        constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
 
198
        if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
 
199
                event_err(1, "%s: failed to register rpc at %s",
 
200
                    __func__, uri);
 
201
        memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
 
202
        memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
 
203
        constructed_uri[constructed_uri_len - 1] = '\0';
 
204
 
 
205
        return (constructed_uri);
 
206
}
 
207
 
 
208
int
 
209
evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
 
210
    void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
 
211
{
 
212
        char *constructed_uri = evrpc_construct_uri(rpc->uri);
 
213
 
 
214
        rpc->base = base;
 
215
        rpc->cb = cb;
 
216
        rpc->cb_arg = cb_arg;
 
217
 
 
218
        TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
 
219
 
 
220
        evhttp_set_cb(base->http_server,
 
221
            constructed_uri,
 
222
            evrpc_request_cb,
 
223
            rpc);
 
224
        
 
225
        free(constructed_uri);
 
226
 
 
227
        return (0);
 
228
}
 
229
 
 
230
int
 
231
evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
 
232
{
 
233
        char *registered_uri = NULL;
 
234
        struct evrpc *rpc;
 
235
 
 
236
        /* find the right rpc; linear search might be slow */
 
237
        TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
 
238
                if (strcmp(rpc->uri, name) == 0)
 
239
                        break;
 
240
        }
 
241
        if (rpc == NULL) {
 
242
                /* We did not find an RPC with this name */
 
243
                return (-1);
 
244
        }
 
245
        TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
 
246
        
 
247
        free((char *)rpc->uri);
 
248
        free(rpc);
 
249
 
 
250
        registered_uri = evrpc_construct_uri(name);
 
251
 
 
252
        /* remove the http server callback */
 
253
        assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
 
254
 
 
255
        free(registered_uri);
 
256
        return (0);
 
257
}
 
258
 
 
259
static void
 
260
evrpc_request_cb(struct evhttp_request *req, void *arg)
 
261
{
 
262
        struct evrpc *rpc = arg;
 
263
        struct evrpc_req_generic *rpc_state = NULL;
 
264
 
 
265
        /* let's verify the outside parameters */
 
266
        if (req->type != EVHTTP_REQ_POST ||
 
267
            EVBUFFER_LENGTH(req->input_buffer) <= 0)
 
268
                goto error;
 
269
 
 
270
        /*
 
271
         * we might want to allow hooks to suspend the processing,
 
272
         * but at the moment, we assume that they just act as simple
 
273
         * filters.
 
274
         */
 
275
        if (evrpc_process_hooks(&rpc->base->input_hooks,
 
276
                req, req->input_buffer) == -1)
 
277
                goto error;
 
278
 
 
279
        rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
 
280
        if (rpc_state == NULL)
 
281
                goto error;
 
282
 
 
283
        /* let's check that we can parse the request */
 
284
        rpc_state->request = rpc->request_new();
 
285
        if (rpc_state->request == NULL)
 
286
                goto error;
 
287
 
 
288
        rpc_state->rpc = rpc;
 
289
 
 
290
        if (rpc->request_unmarshal(
 
291
                    rpc_state->request, req->input_buffer) == -1) {
 
292
                /* we failed to parse the request; that's a bummer */
 
293
                goto error;
 
294
        }
 
295
 
 
296
        /* at this point, we have a well formed request, prepare the reply */
 
297
 
 
298
        rpc_state->reply = rpc->reply_new();
 
299
        if (rpc_state->reply == NULL)
 
300
                goto error;
 
301
 
 
302
        rpc_state->http_req = req;
 
303
        rpc_state->done = evrpc_request_done;
 
304
 
 
305
        /* give the rpc to the user; they can deal with it */
 
306
        rpc->cb(rpc_state, rpc->cb_arg);
 
307
 
 
308
        return;
 
309
 
 
310
error:
 
311
        evrpc_reqstate_free(rpc_state);
 
312
        evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
 
313
        return;
 
314
}
 
315
 
 
316
void
 
317
evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
 
318
{
 
319
        /* clean up all memory */
 
320
        if (rpc_state != NULL) {
 
321
                struct evrpc *rpc = rpc_state->rpc;
 
322
 
 
323
                if (rpc_state->request != NULL)
 
324
                        rpc->request_free(rpc_state->request);
 
325
                if (rpc_state->reply != NULL)
 
326
                        rpc->reply_free(rpc_state->reply);
 
327
                free(rpc_state);
 
328
        }
 
329
}
 
330
 
 
331
void
 
332
evrpc_request_done(struct evrpc_req_generic* rpc_state)
 
333
{
 
334
        struct evhttp_request *req = rpc_state->http_req;
 
335
        struct evrpc *rpc = rpc_state->rpc;
 
336
        struct evbuffer* data = NULL;
 
337
 
 
338
        if (rpc->reply_complete(rpc_state->reply) == -1) {
 
339
                /* the reply was not completely filled in.  error out */
 
340
                goto error;
 
341
        }
 
342
 
 
343
        if ((data = evbuffer_new()) == NULL) {
 
344
                /* out of memory */
 
345
                goto error;
 
346
        }
 
347
 
 
348
        /* serialize the reply */
 
349
        rpc->reply_marshal(data, rpc_state->reply);
 
350
 
 
351
        /* do hook based tweaks to the request */
 
352
        if (evrpc_process_hooks(&rpc->base->output_hooks,
 
353
                req, data) == -1)
 
354
                goto error;
 
355
 
 
356
        /* on success, we are going to transmit marshaled binary data */
 
357
        if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
 
358
                evhttp_add_header(req->output_headers,
 
359
                    "Content-Type", "application/octet-stream");
 
360
        }
 
361
 
 
362
        evhttp_send_reply(req, HTTP_OK, "OK", data);
 
363
 
 
364
        evbuffer_free(data);
 
365
 
 
366
        evrpc_reqstate_free(rpc_state);
 
367
 
 
368
        return;
 
369
 
 
370
error:
 
371
        if (data != NULL)
 
372
                evbuffer_free(data);
 
373
        evrpc_reqstate_free(rpc_state);
 
374
        evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
 
375
        return;
 
376
}
 
377
 
 
378
/* Client implementation of RPC site */
 
379
 
 
380
static int evrpc_schedule_request(struct evhttp_connection *connection,
 
381
    struct evrpc_request_wrapper *ctx);
 
382
 
 
383
struct evrpc_pool *
 
384
evrpc_pool_new(struct event_base *base)
 
385
{
 
386
        struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
 
387
        if (pool == NULL)
 
388
                return (NULL);
 
389
 
 
390
        TAILQ_INIT(&pool->connections);
 
391
        TAILQ_INIT(&pool->requests);
 
392
 
 
393
        TAILQ_INIT(&pool->input_hooks);
 
394
        TAILQ_INIT(&pool->output_hooks);
 
395
 
 
396
        pool->base = base;
 
397
        pool->timeout = -1;
 
398
 
 
399
        return (pool);
 
400
}
 
401
 
 
402
static void
 
403
evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
 
404
{
 
405
        free(request->name);
 
406
        free(request);
 
407
}
 
408
 
 
409
void
 
410
evrpc_pool_free(struct evrpc_pool *pool)
 
411
{
 
412
        struct evhttp_connection *connection;
 
413
        struct evrpc_request_wrapper *request;
 
414
        struct evrpc_hook *hook;
 
415
 
 
416
        while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
 
417
                TAILQ_REMOVE(&pool->requests, request, next);
 
418
                /* if this gets more complicated we need our own function */
 
419
                evrpc_request_wrapper_free(request);
 
420
        }
 
421
 
 
422
        while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
 
423
                TAILQ_REMOVE(&pool->connections, connection, next);
 
424
                evhttp_connection_free(connection);
 
425
        }
 
426
 
 
427
        while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
 
428
                assert(evrpc_remove_hook(pool, INPUT, hook));
 
429
        }
 
430
 
 
431
        while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
 
432
                assert(evrpc_remove_hook(pool, OUTPUT, hook));
 
433
        }
 
434
 
 
435
        free(pool);
 
436
}
 
437
 
 
438
/*
 
439
 * Add a connection to the RPC pool.   A request scheduled on the pool
 
440
 * may use any available connection.
 
441
 */
 
442
 
 
443
void
 
444
evrpc_pool_add_connection(struct evrpc_pool *pool,
 
445
    struct evhttp_connection *connection) {
 
446
        assert(connection->http_server == NULL);
 
447
        TAILQ_INSERT_TAIL(&pool->connections, connection, next);
 
448
 
 
449
        /*
 
450
         * associate an event base with this connection
 
451
         */
 
452
        if (pool->base != NULL)
 
453
                evhttp_connection_set_base(connection, pool->base);
 
454
 
 
455
        /* 
 
456
         * unless a timeout was specifically set for a connection,
 
457
         * the connection inherits the timeout from the pool.
 
458
         */
 
459
        if (connection->timeout == -1)
 
460
                connection->timeout = pool->timeout;
 
461
 
 
462
        /* 
 
463
         * if we have any requests pending, schedule them with the new
 
464
         * connections.
 
465
         */
 
466
 
 
467
        if (TAILQ_FIRST(&pool->requests) != NULL) {
 
468
                struct evrpc_request_wrapper *request = 
 
469
                    TAILQ_FIRST(&pool->requests);
 
470
                TAILQ_REMOVE(&pool->requests, request, next);
 
471
                evrpc_schedule_request(connection, request);
 
472
        }
 
473
}
 
474
 
 
475
void
 
476
evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
 
477
{
 
478
        struct evhttp_connection *evcon;
 
479
        TAILQ_FOREACH(evcon, &pool->connections, next) {
 
480
                evcon->timeout = timeout_in_secs;
 
481
        }
 
482
        pool->timeout = timeout_in_secs;
 
483
}
 
484
 
 
485
 
 
486
static void evrpc_reply_done(struct evhttp_request *, void *);
 
487
static void evrpc_request_timeout(int, short, void *);
 
488
 
 
489
/*
 
490
 * Finds a connection object associated with the pool that is currently
 
491
 * idle and can be used to make a request.
 
492
 */
 
493
static struct evhttp_connection *
 
494
evrpc_pool_find_connection(struct evrpc_pool *pool)
 
495
{
 
496
        struct evhttp_connection *connection;
 
497
        TAILQ_FOREACH(connection, &pool->connections, next) {
 
498
                if (TAILQ_FIRST(&connection->requests) == NULL)
 
499
                        return (connection);
 
500
        }
 
501
 
 
502
        return (NULL);
 
503
}
 
504
 
 
505
/*
 
506
 * We assume that the ctx is no longer queued on the pool.
 
507
 */
 
508
static int
 
509
evrpc_schedule_request(struct evhttp_connection *connection,
 
510
    struct evrpc_request_wrapper *ctx)
 
511
{
 
512
        struct evhttp_request *req = NULL;
 
513
        struct evrpc_pool *pool = ctx->pool;
 
514
        struct evrpc_status status;
 
515
        char *uri = NULL;
 
516
        int res = 0;
 
517
 
 
518
        if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
 
519
                goto error;
 
520
 
 
521
        /* serialize the request data into the output buffer */
 
522
        ctx->request_marshal(req->output_buffer, ctx->request);
 
523
 
 
524
        uri = evrpc_construct_uri(ctx->name);
 
525
        if (uri == NULL)
 
526
                goto error;
 
527
 
 
528
        /* we need to know the connection that we might have to abort */
 
529
        ctx->evcon = connection;
 
530
 
 
531
        /* apply hooks to the outgoing request */
 
532
        if (evrpc_process_hooks(&pool->output_hooks,
 
533
                req, req->output_buffer) == -1)
 
534
                goto error;
 
535
 
 
536
        if (pool->timeout > 0) {
 
537
                /* 
 
538
                 * a timeout after which the whole rpc is going to be aborted.
 
539
                 */
 
540
                struct timeval tv;
 
541
                evutil_timerclear(&tv);
 
542
                tv.tv_sec = pool->timeout;
 
543
                evtimer_add(&ctx->ev_timeout, &tv);
 
544
        }
 
545
 
 
546
        /* start the request over the connection */
 
547
        res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
 
548
        free(uri);
 
549
 
 
550
        if (res == -1)
 
551
                goto error;
 
552
 
 
553
        return (0);
 
554
 
 
555
error:
 
556
        memset(&status, 0, sizeof(status));
 
557
        status.error = EVRPC_STATUS_ERR_UNSTARTED;
 
558
        (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 
559
        evrpc_request_wrapper_free(ctx);
 
560
        return (-1);
 
561
}
 
562
 
 
563
int
 
564
evrpc_make_request(struct evrpc_request_wrapper *ctx)
 
565
{
 
566
        struct evrpc_pool *pool = ctx->pool;
 
567
 
 
568
        /* initialize the event structure for this rpc */
 
569
        evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
 
570
        if (pool->base != NULL)
 
571
                event_base_set(pool->base, &ctx->ev_timeout);
 
572
 
 
573
        /* we better have some available connections on the pool */
 
574
        assert(TAILQ_FIRST(&pool->connections) != NULL);
 
575
 
 
576
        /* 
 
577
         * if no connection is available, we queue the request on the pool,
 
578
         * the next time a connection is empty, the rpc will be send on that.
 
579
         */
 
580
        TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
 
581
 
 
582
        evrpc_pool_schedule(pool);
 
583
 
 
584
        return (0);
 
585
}
 
586
 
 
587
static void
 
588
evrpc_reply_done(struct evhttp_request *req, void *arg)
 
589
{
 
590
        struct evrpc_request_wrapper *ctx = arg;
 
591
        struct evrpc_pool *pool = ctx->pool;
 
592
        struct evrpc_status status;
 
593
        int res = -1;
 
594
        
 
595
        /* cancel any timeout we might have scheduled */
 
596
        event_del(&ctx->ev_timeout);
 
597
 
 
598
        memset(&status, 0, sizeof(status));
 
599
        status.http_req = req;
 
600
 
 
601
        /* we need to get the reply now */
 
602
        if (req != NULL) {
 
603
                /* apply hooks to the incoming request */
 
604
                if (evrpc_process_hooks(&pool->input_hooks,
 
605
                        req, req->input_buffer) == -1) {
 
606
                        status.error = EVRPC_STATUS_ERR_HOOKABORTED;
 
607
                        res = -1;
 
608
                } else {
 
609
                        res = ctx->reply_unmarshal(ctx->reply,
 
610
                            req->input_buffer);
 
611
                        if (res == -1) {
 
612
                                status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
 
613
                        }
 
614
                }
 
615
        } else {
 
616
                status.error = EVRPC_STATUS_ERR_TIMEOUT;
 
617
        }
 
618
 
 
619
        if (res == -1) {
 
620
                /* clear everything that we might have written previously */
 
621
                ctx->reply_clear(ctx->reply);
 
622
        }
 
623
 
 
624
        (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 
625
        
 
626
        evrpc_request_wrapper_free(ctx);
 
627
 
 
628
        /* the http layer owns the request structure */
 
629
 
 
630
        /* see if we can schedule another request */
 
631
        evrpc_pool_schedule(pool);
 
632
}
 
633
 
 
634
static void
 
635
evrpc_pool_schedule(struct evrpc_pool *pool)
 
636
{
 
637
        struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
 
638
        struct evhttp_connection *evcon;
 
639
 
 
640
        /* if no requests are pending, we have no work */
 
641
        if (ctx == NULL)
 
642
                return;
 
643
 
 
644
        if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
 
645
                TAILQ_REMOVE(&pool->requests, ctx, next);
 
646
                evrpc_schedule_request(evcon, ctx);
 
647
        }
 
648
}
 
649
 
 
650
static void
 
651
evrpc_request_timeout(int fd, short what, void *arg)
 
652
{
 
653
        struct evrpc_request_wrapper *ctx = arg;
 
654
        struct evhttp_connection *evcon = ctx->evcon;
 
655
        assert(evcon != NULL);
 
656
 
 
657
        evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
 
658
}