~galfy/helenos/bird-port-mainline

« back to all changes in this revision

Viewing changes to uspace/lib/libc/generic/async.c

  • Committer: Martin Decky
  • Date: 2009-08-04 11:19:19 UTC
  • Revision ID: martin@uranus.dsrg.hide.ms.mff.cuni.cz-20090804111919-evyclddlr3v5lhmp
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2006 Ondrej Palkovsky
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions
 
7
 * are met:
 
8
 *
 
9
 * - Redistributions of source code must retain the above copyright
 
10
 *   notice, this list of conditions and the following disclaimer.
 
11
 * - Redistributions in binary form must reproduce the above copyright
 
12
 *   notice, this list of conditions and the following disclaimer in the
 
13
 *   documentation and/or other materials provided with the distribution.
 
14
 * - The name of the author may not be used to endorse or promote products
 
15
 *   derived from this software without specific prior written permission.
 
16
 *
 
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 
18
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
19
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 
20
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 
21
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 
22
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
23
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
24
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
25
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
26
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
27
 */
 
28
 
 
29
/** @addtogroup libc
 
30
 * @{
 
31
 */
 
32
/** @file
 
33
 */
 
34
 
 
35
/**
 
36
 * Asynchronous library
 
37
 *
 
38
 * The aim of this library is to provide a facility for writing programs which
 
39
 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
 
40
 * programming.
 
41
 *
 
42
 * You should be able to write very simple multithreaded programs, the async
 
43
 * framework will automatically take care of most synchronization problems.
 
44
 *
 
45
 * Default semantics:
 
46
 * - async_send_*(): Send asynchronously. If the kernel refuses to send
 
47
 *                   more messages, [ try to get responses from kernel, if
 
48
 *                   nothing found, might try synchronous ]
 
49
 *
 
50
 * Example of use (pseudo C):
 
51
 *
 
52
 * 1) Multithreaded client application
 
53
 *
 
54
 *   fibril_create(fibril1, ...);
 
55
 *   fibril_create(fibril2, ...);
 
56
 *   ...
 
57
 *
 
58
 *   int fibril1(void *arg)
 
59
 *   {
 
60
 *     conn = ipc_connect_me_to();
 
61
 *     c1 = async_send(conn);
 
62
 *     c2 = async_send(conn);
 
63
 *     async_wait_for(c1);
 
64
 *     async_wait_for(c2);
 
65
 *     ...
 
66
 *   }
 
67
 *
 
68
 *
 
69
 * 2) Multithreaded server application
 
70
 *
 
71
 *   main()
 
72
 *   {
 
73
 *     async_manager();
 
74
 *   }
 
75
 *
 
76
 *   my_client_connection(icallid, *icall)
 
77
 *   {
 
78
 *     if (want_refuse) {
 
79
 *       ipc_answer_0(icallid, ELIMIT);
 
80
 *       return;
 
81
 *     }
 
82
 *     ipc_answer_0(icallid, EOK);
 
83
 *
 
84
 *     callid = async_get_call(&call);
 
85
 *     handle_call(callid, call);
 
86
 *     ipc_answer_2(callid, 1, 2, 3);
 
87
 *
 
88
 *     callid = async_get_call(&call);
 
89
 *     ...
 
90
 *   }
 
91
 *
 
92
 */
 
93
 
 
94
#include <futex.h>
 
95
#include <async.h>
 
96
#include <fibril.h>
 
97
#include <stdio.h>
 
98
#include <adt/hash_table.h>
 
99
#include <adt/list.h>
 
100
#include <ipc/ipc.h>
 
101
#include <assert.h>
 
102
#include <errno.h>
 
103
#include <sys/time.h>
 
104
#include <arch/barrier.h>
 
105
#include <bool.h>
 
106
 
 
107
atomic_t async_futex = FUTEX_INITIALIZER;
 
108
 
 
109
/** Structures of this type represent a waiting fibril. */
 
110
typedef struct {
 
111
        /** Expiration time. */
 
112
        struct timeval expires;
 
113
        
 
114
        /** If true, this struct is in the timeout list. */
 
115
        bool inlist;
 
116
        
 
117
        /** Timeout list link. */
 
118
        link_t link;
 
119
        
 
120
        /** Identification of and link to the waiting fibril. */
 
121
        fid_t fid;
 
122
        
 
123
        /** If true, this fibril is currently active. */
 
124
        bool active;
 
125
        
 
126
        /** If true, we have timed out. */
 
127
        bool timedout;
 
128
} awaiter_t;
 
129
 
 
130
typedef struct {
 
131
        awaiter_t wdata;
 
132
        
 
133
        /** If reply was received. */
 
134
        bool done;
 
135
        
 
136
        /** Pointer to where the answer data is stored. */
 
137
        ipc_call_t *dataptr;
 
138
        
 
139
        ipcarg_t retval;
 
140
} amsg_t;
 
141
 
 
142
/**
 
143
 * Structures of this type are used to group information about a call and a
 
144
 * message queue link.
 
145
 */
 
146
typedef struct {
 
147
        link_t link;
 
148
        ipc_callid_t callid;
 
149
        ipc_call_t call;
 
150
} msg_t;
 
151
 
 
152
typedef struct {
 
153
        awaiter_t wdata;
 
154
        
 
155
        /** Hash table link. */
 
156
        link_t link;
 
157
        
 
158
        /** Incoming phone hash. */
 
159
        ipcarg_t in_phone_hash;
 
160
        
 
161
        /** Messages that should be delivered to this fibril. */
 
162
        link_t msg_queue;
 
163
        
 
164
        /** Identification of the opening call. */
 
165
        ipc_callid_t callid;
 
166
        /** Call data of the opening call. */
 
167
        ipc_call_t call;
 
168
        
 
169
        /** Identification of the closing call. */
 
170
        ipc_callid_t close_callid;
 
171
        
 
172
        /** Fibril function that will be used to handle the connection. */
 
173
        void (*cfibril)(ipc_callid_t, ipc_call_t *);
 
174
} connection_t;
 
175
 
 
176
/** Identifier of the incoming connection handled by the current fibril. */
 
177
fibril_local connection_t *FIBRIL_connection;
 
178
 
 
179
static void default_client_connection(ipc_callid_t callid, ipc_call_t *call);
 
180
static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call);
 
181
 
 
182
/**
 
183
 * Pointer to a fibril function that will be used to handle connections.
 
184
 */
 
185
static async_client_conn_t client_connection = default_client_connection;
 
186
 
 
187
/**
 
188
 * Pointer to a fibril function that will be used to handle interrupt
 
189
 * notifications.
 
190
 */
 
191
static async_client_conn_t interrupt_received = default_interrupt_received;
 
192
 
 
193
static hash_table_t conn_hash_table;
 
194
static LIST_INITIALIZE(timeout_list);
 
195
 
 
196
#define CONN_HASH_TABLE_CHAINS  32
 
197
 
 
198
/** Compute hash into the connection hash table based on the source phone hash.
 
199
 *
 
200
 * @param key Pointer to source phone hash.
 
201
 *
 
202
 * @return Index into the connection hash table.
 
203
 *
 
204
 */
 
205
static hash_index_t conn_hash(unsigned long *key)
 
206
{
 
207
        assert(key);
 
208
        return (((*key) >> 4) % CONN_HASH_TABLE_CHAINS);
 
209
}
 
210
 
 
211
/** Compare hash table item with a key.
 
212
 *
 
213
 * @param key  Array containing the source phone hash as the only item.
 
214
 * @param keys Expected 1 but ignored.
 
215
 * @param item Connection hash table item.
 
216
 *
 
217
 * @return True on match, false otherwise.
 
218
 *
 
219
 */
 
220
static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
 
221
{
 
222
        connection_t *hs = hash_table_get_instance(item, connection_t, link);
 
223
        return (key[0] == hs->in_phone_hash);
 
224
}
 
225
 
 
226
/** Connection hash table removal callback function.
 
227
 *
 
228
 * This function is called whenever a connection is removed from the connection
 
229
 * hash table.
 
230
 *
 
231
 * @param item Connection hash table item being removed.
 
232
 *
 
233
 */
 
234
static void conn_remove(link_t *item)
 
235
{
 
236
        free(hash_table_get_instance(item, connection_t, link));
 
237
}
 
238
 
 
239
 
 
240
/** Operations for the connection hash table. */
 
241
static hash_table_operations_t conn_hash_table_ops = {
 
242
        .hash = conn_hash,
 
243
        .compare = conn_compare,
 
244
        .remove_callback = conn_remove
 
245
};
 
246
 
 
247
/** Sort in current fibril's timeout request.
 
248
 *
 
249
 * @param wd Wait data of the current fibril.
 
250
 *
 
251
 */
 
252
static void insert_timeout(awaiter_t *wd)
 
253
{
 
254
        wd->timedout = false;
 
255
        wd->inlist = true;
 
256
        
 
257
        link_t *tmp = timeout_list.next;
 
258
        while (tmp != &timeout_list) {
 
259
                awaiter_t *cur = list_get_instance(tmp, awaiter_t, link);
 
260
                
 
261
                if (tv_gteq(&cur->expires, &wd->expires))
 
262
                        break;
 
263
                
 
264
                tmp = tmp->next;
 
265
        }
 
266
        
 
267
        list_append(&wd->link, tmp);
 
268
}
 
269
 
 
270
/** Try to route a call to an appropriate connection fibril.
 
271
 *
 
272
 * If the proper connection fibril is found, a message with the call is added to
 
273
 * its message queue. If the fibril was not active, it is activated and all
 
274
 * timeouts are unregistered.
 
275
 *
 
276
 * @param callid Hash of the incoming call.
 
277
 * @param call   Data of the incoming call.
 
278
 *
 
279
 * @return False if the call doesn't match any connection.
 
280
 *         True if the call was passed to the respective connection fibril.
 
281
 *
 
282
 */
 
283
static bool route_call(ipc_callid_t callid, ipc_call_t *call)
 
284
{
 
285
        futex_down(&async_futex);
 
286
        
 
287
        unsigned long key = call->in_phone_hash;
 
288
        link_t *hlp = hash_table_find(&conn_hash_table, &key);
 
289
        
 
290
        if (!hlp) {
 
291
                futex_up(&async_futex);
 
292
                return false;
 
293
        }
 
294
        
 
295
        connection_t *conn = hash_table_get_instance(hlp, connection_t, link);
 
296
        
 
297
        msg_t *msg = malloc(sizeof(*msg));
 
298
        if (!msg) {
 
299
                futex_up(&async_futex);
 
300
                return false;
 
301
        }
 
302
        
 
303
        msg->callid = callid;
 
304
        msg->call = *call;
 
305
        list_append(&msg->link, &conn->msg_queue);
 
306
        
 
307
        if (IPC_GET_METHOD(*call) == IPC_M_PHONE_HUNGUP)
 
308
                conn->close_callid = callid;
 
309
        
 
310
        /* If the connection fibril is waiting for an event, activate it */
 
311
        if (!conn->wdata.active) {
 
312
                
 
313
                /* If in timeout list, remove it */
 
314
                if (conn->wdata.inlist) {
 
315
                        conn->wdata.inlist = false;
 
316
                        list_remove(&conn->wdata.link);
 
317
                }
 
318
                
 
319
                conn->wdata.active = true;
 
320
                fibril_add_ready(conn->wdata.fid);
 
321
        }
 
322
        
 
323
        futex_up(&async_futex);
 
324
        return true;
 
325
}
 
326
 
 
327
/** Notification fibril.
 
328
 *
 
329
 * When a notification arrives, a fibril with this implementing function is
 
330
 * created. It calls interrupt_received() and does the final cleanup.
 
331
 *
 
332
 * @param arg Message structure pointer.
 
333
 *
 
334
 * @return Always zero.
 
335
 *
 
336
 */
 
337
static int notification_fibril(void *arg)
 
338
{
 
339
        msg_t *msg = (msg_t *) arg;
 
340
        interrupt_received(msg->callid, &msg->call);
 
341
        
 
342
        free(msg);
 
343
        return 0;
 
344
}
 
345
 
 
346
/** Process interrupt notification.
 
347
 *
 
348
 * A new fibril is created which would process the notification.
 
349
 *
 
350
 * @param callid Hash of the incoming call.
 
351
 * @param call   Data of the incoming call.
 
352
 *
 
353
 * @return False if an error occured.
 
354
 *         True if the call was passed to the notification fibril.
 
355
 *
 
356
 */
 
357
static bool process_notification(ipc_callid_t callid, ipc_call_t *call)
 
358
{
 
359
        futex_down(&async_futex);
 
360
        
 
361
        msg_t *msg = malloc(sizeof(*msg));
 
362
        if (!msg) {
 
363
                futex_up(&async_futex);
 
364
                return false;
 
365
        }
 
366
        
 
367
        msg->callid = callid;
 
368
        msg->call = *call;
 
369
        
 
370
        fid_t fid = fibril_create(notification_fibril, msg);
 
371
        fibril_add_ready(fid);
 
372
        
 
373
        futex_up(&async_futex);
 
374
        return true;
 
375
}
 
376
 
 
377
/** Return new incoming message for the current (fibril-local) connection.
 
378
 *
 
379
 * @param call  Storage where the incoming call data will be stored.
 
380
 * @param usecs Timeout in microseconds. Zero denotes no timeout.
 
381
 *
 
382
 * @return If no timeout was specified, then a hash of the
 
383
 *         incoming call is returned. If a timeout is specified,
 
384
 *         then a hash of the incoming call is returned unless
 
385
 *         the timeout expires prior to receiving a message. In
 
386
 *         that case zero is returned.
 
387
 *
 
388
 */
 
389
ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
 
390
{
 
391
        assert(FIBRIL_connection);
 
392
        
 
393
        /* Why doing this?
 
394
         * GCC 4.1.0 coughs on FIBRIL_connection-> dereference.
 
395
         * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot.
 
396
         *           I would never expect to find so many errors in
 
397
         *           a compiler.
 
398
         */
 
399
        connection_t *conn = FIBRIL_connection;
 
400
        
 
401
        futex_down(&async_futex);
 
402
        
 
403
        if (usecs) {
 
404
                gettimeofday(&conn->wdata.expires, NULL);
 
405
                tv_add(&conn->wdata.expires, usecs);
 
406
        } else
 
407
                conn->wdata.inlist = false;
 
408
        
 
409
        /* If nothing in queue, wait until something arrives */
 
410
        while (list_empty(&conn->msg_queue)) {
 
411
                if (usecs)
 
412
                        insert_timeout(&conn->wdata);
 
413
                
 
414
                conn->wdata.active = false;
 
415
                
 
416
                /*
 
417
                 * Note: the current fibril will be rescheduled either due to a
 
418
                 * timeout or due to an arriving message destined to it. In the
 
419
                 * former case, handle_expired_timeouts() and, in the latter
 
420
                 * case, route_call() will perform the wakeup.
 
421
                 */
 
422
                fibril_switch(FIBRIL_TO_MANAGER);
 
423
                
 
424
                /*
 
425
                 * Futex is up after getting back from async_manager.
 
426
                 * Get it again.
 
427
                 */
 
428
                futex_down(&async_futex);
 
429
                if ((usecs) && (conn->wdata.timedout)
 
430
                    && (list_empty(&conn->msg_queue))) {
 
431
                        /* If we timed out -> exit */
 
432
                        futex_up(&async_futex);
 
433
                        return 0;
 
434
                }
 
435
        }
 
436
        
 
437
        msg_t *msg = list_get_instance(conn->msg_queue.next, msg_t, link);
 
438
        list_remove(&msg->link);
 
439
        
 
440
        ipc_callid_t callid = msg->callid;
 
441
        *call = msg->call;
 
442
        free(msg);
 
443
        
 
444
        futex_up(&async_futex);
 
445
        return callid;
 
446
}
 
447
 
 
448
/** Default fibril function that gets called to handle new connection.
 
449
 *
 
450
 * This function is defined as a weak symbol - to be redefined in user code.
 
451
 *
 
452
 * @param callid Hash of the incoming call.
 
453
 * @param call   Data of the incoming call.
 
454
 *
 
455
 */
 
456
static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
 
457
{
 
458
        ipc_answer_0(callid, ENOENT);
 
459
}
 
460
 
 
461
/** Default fibril function that gets called to handle interrupt notifications.
 
462
 *
 
463
 * This function is defined as a weak symbol - to be redefined in user code.
 
464
 *
 
465
 * @param callid Hash of the incoming call.
 
466
 * @param call   Data of the incoming call.
 
467
 *
 
468
 */
 
469
static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call)
 
470
{
 
471
}
 
472
 
 
473
/** Wrapper for client connection fibril.
 
474
 *
 
475
 * When a new connection arrives, a fibril with this implementing function is
 
476
 * created. It calls client_connection() and does the final cleanup.
 
477
 *
 
478
 * @param arg Connection structure pointer.
 
479
 *
 
480
 * @return Always zero.
 
481
 *
 
482
 */
 
483
static int connection_fibril(void *arg)
 
484
{
 
485
        /*
 
486
         * Setup fibril-local connection pointer and call client_connection().
 
487
         *
 
488
         */
 
489
        FIBRIL_connection = (connection_t *) arg;
 
490
        FIBRIL_connection->cfibril(FIBRIL_connection->callid,
 
491
            &FIBRIL_connection->call);
 
492
        
 
493
        /* Remove myself from the connection hash table */
 
494
        futex_down(&async_futex);
 
495
        unsigned long key = FIBRIL_connection->in_phone_hash;
 
496
        hash_table_remove(&conn_hash_table, &key, 1);
 
497
        futex_up(&async_futex);
 
498
        
 
499
        /* Answer all remaining messages with EHANGUP */
 
500
        while (!list_empty(&FIBRIL_connection->msg_queue)) {
 
501
                msg_t *msg;
 
502
                
 
503
                msg = list_get_instance(FIBRIL_connection->msg_queue.next,
 
504
                    msg_t, link);
 
505
                list_remove(&msg->link);
 
506
                ipc_answer_0(msg->callid, EHANGUP);
 
507
                free(msg);
 
508
        }
 
509
        
 
510
        if (FIBRIL_connection->close_callid)
 
511
                ipc_answer_0(FIBRIL_connection->close_callid, EOK);
 
512
        
 
513
        return 0;
 
514
}
 
515
 
 
516
/** Create a new fibril for a new connection.
 
517
 *
 
518
 * Create new fibril for connection, fill in connection structures and inserts
 
519
 * it into the hash table, so that later we can easily do routing of messages to
 
520
 * particular fibrils.
 
521
 *
 
522
 * @param in_phone_hash Identification of the incoming connection.
 
523
 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
 
524
 *                      If callid is zero, the connection was opened by
 
525
 *                      accepting the IPC_M_CONNECT_TO_ME call and this function
 
526
 *                      is called directly by the server.
 
527
 * @param call          Call data of the opening call.
 
528
 * @param cfibril       Fibril function that should be called upon opening the
 
529
 *                      connection.
 
530
 *
 
531
 * @return New fibril id or NULL on failure.
 
532
 *
 
533
 */
 
534
fid_t async_new_connection(ipcarg_t in_phone_hash, ipc_callid_t callid,
 
535
    ipc_call_t *call, void (*cfibril)(ipc_callid_t, ipc_call_t *))
 
536
{
 
537
        connection_t *conn = malloc(sizeof(*conn));
 
538
        if (!conn) {
 
539
                if (callid)
 
540
                        ipc_answer_0(callid, ENOMEM);
 
541
                return NULL;
 
542
        }
 
543
        
 
544
        conn->in_phone_hash = in_phone_hash;
 
545
        list_initialize(&conn->msg_queue);
 
546
        conn->callid = callid;
 
547
        conn->close_callid = false;
 
548
        
 
549
        if (call)
 
550
                conn->call = *call;
 
551
        
 
552
        /* We will activate the fibril ASAP */
 
553
        conn->wdata.active = true;
 
554
        conn->cfibril = cfibril;
 
555
        conn->wdata.fid = fibril_create(connection_fibril, conn);
 
556
        
 
557
        if (!conn->wdata.fid) {
 
558
                free(conn);
 
559
                if (callid)
 
560
                        ipc_answer_0(callid, ENOMEM);
 
561
                return NULL;
 
562
        }
 
563
        
 
564
        /* Add connection to the connection hash table */
 
565
        unsigned long key = conn->in_phone_hash;
 
566
        
 
567
        futex_down(&async_futex);
 
568
        hash_table_insert(&conn_hash_table, &key, &conn->link);
 
569
        futex_up(&async_futex);
 
570
        
 
571
        fibril_add_ready(conn->wdata.fid);
 
572
        
 
573
        return conn->wdata.fid;
 
574
}
 
575
 
 
576
/** Handle a call that was received.
 
577
 *
 
578
 * If the call has the IPC_M_CONNECT_ME_TO method, a new connection is created.
 
579
 * Otherwise the call is routed to its connection fibril.
 
580
 *
 
581
 * @param callid Hash of the incoming call.
 
582
 * @param call   Data of the incoming call.
 
583
 *
 
584
 */
 
585
static void handle_call(ipc_callid_t callid, ipc_call_t *call)
 
586
{
 
587
        /* Unrouted call - do some default behaviour */
 
588
        if ((callid & IPC_CALLID_NOTIFICATION)) {
 
589
                process_notification(callid, call);
 
590
                goto out;
 
591
        }
 
592
        
 
593
        switch (IPC_GET_METHOD(*call)) {
 
594
        case IPC_M_CONNECT_ME:
 
595
        case IPC_M_CONNECT_ME_TO:
 
596
                /* Open new connection with fibril etc. */
 
597
                async_new_connection(IPC_GET_ARG5(*call), callid, call,
 
598
                    client_connection);
 
599
                goto out;
 
600
        }
 
601
        
 
602
        /* Try to route the call through the connection hash table */
 
603
        if (route_call(callid, call))
 
604
                goto out;
 
605
        
 
606
        /* Unknown call from unknown phone - hang it up */
 
607
        ipc_answer_0(callid, EHANGUP);
 
608
        return;
 
609
        
 
610
out:
 
611
        ;
 
612
}
 
613
 
 
614
/** Fire all timeouts that expired. */
 
615
static void handle_expired_timeouts(void)
 
616
{
 
617
        struct timeval tv;
 
618
        gettimeofday(&tv, NULL);
 
619
        
 
620
        futex_down(&async_futex);
 
621
        
 
622
        link_t *cur = timeout_list.next;
 
623
        while (cur != &timeout_list) {
 
624
                awaiter_t *waiter = list_get_instance(cur, awaiter_t, link);
 
625
                
 
626
                if (tv_gt(&waiter->expires, &tv))
 
627
                        break;
 
628
                
 
629
                cur = cur->next;
 
630
                
 
631
                list_remove(&waiter->link);
 
632
                waiter->inlist = false;
 
633
                waiter->timedout = true;
 
634
                
 
635
                /*
 
636
                 * Redundant condition?
 
637
                 * The fibril should not be active when it gets here.
 
638
                 */
 
639
                if (!waiter->active) {
 
640
                        waiter->active = true;
 
641
                        fibril_add_ready(waiter->fid);
 
642
                }
 
643
        }
 
644
        
 
645
        futex_up(&async_futex);
 
646
}
 
647
 
 
648
/** Endless loop dispatching incoming calls and answers.
 
649
 *
 
650
 * @return Never returns.
 
651
 *
 
652
 */
 
653
static int async_manager_worker(void)
 
654
{
 
655
        while (true) {
 
656
                if (fibril_switch(FIBRIL_FROM_MANAGER)) {
 
657
                        futex_up(&async_futex); 
 
658
                        /*
 
659
                         * async_futex is always held when entering a manager
 
660
                         * fibril.
 
661
                         */
 
662
                        continue;
 
663
                }
 
664
                
 
665
                futex_down(&async_futex);
 
666
                
 
667
                suseconds_t timeout;
 
668
                if (!list_empty(&timeout_list)) {
 
669
                        awaiter_t *waiter = list_get_instance(timeout_list.next,
 
670
                            awaiter_t, link);
 
671
                        
 
672
                        struct timeval tv;
 
673
                        gettimeofday(&tv, NULL);
 
674
                        
 
675
                        if (tv_gteq(&tv, &waiter->expires)) {
 
676
                                futex_up(&async_futex);
 
677
                                handle_expired_timeouts();
 
678
                                continue;
 
679
                        } else
 
680
                                timeout = tv_sub(&waiter->expires, &tv);
 
681
                } else
 
682
                        timeout = SYNCH_NO_TIMEOUT;
 
683
                
 
684
                futex_up(&async_futex);
 
685
                
 
686
                ipc_call_t call;
 
687
                ipc_callid_t callid = ipc_wait_cycle(&call, timeout,
 
688
                    SYNCH_FLAGS_NONE);
 
689
                
 
690
                if (!callid) {
 
691
                        handle_expired_timeouts();
 
692
                        continue;
 
693
                }
 
694
                
 
695
                if (callid & IPC_CALLID_ANSWERED)
 
696
                        continue;
 
697
                
 
698
                handle_call(callid, &call);
 
699
        }
 
700
        
 
701
        return 0;
 
702
}
 
703
 
 
704
/** Function to start async_manager as a standalone fibril.
 
705
 *
 
706
 * When more kernel threads are used, one async manager should exist per thread.
 
707
 *
 
708
 * @param arg Unused.
 
709
 * @return Never returns.
 
710
 *
 
711
 */
 
712
static int async_manager_fibril(void *arg)
 
713
{
 
714
        futex_up(&async_futex);
 
715
        
 
716
        /*
 
717
         * async_futex is always locked when entering manager
 
718
         */
 
719
        async_manager_worker();
 
720
        
 
721
        return 0;
 
722
}
 
723
 
 
724
/** Add one manager to manager list. */
 
725
void async_create_manager(void)
 
726
{
 
727
        fid_t fid = fibril_create(async_manager_fibril, NULL);
 
728
        fibril_add_manager(fid);
 
729
}
 
730
 
 
731
/** Remove one manager from manager list */
 
732
void async_destroy_manager(void)
 
733
{
 
734
        fibril_remove_manager();
 
735
}
 
736
 
 
737
/** Initialize the async framework.
 
738
 *
 
739
 * @return Zero on success or an error code.
 
740
 */
 
741
int __async_init(void)
 
742
{
 
743
        if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1,
 
744
            &conn_hash_table_ops)) {
 
745
                printf("%s: cannot create hash table\n", "async");
 
746
                return ENOMEM;
 
747
        }
 
748
        
 
749
        return 0;
 
750
}
 
751
 
 
752
/** Reply received callback.
 
753
 *
 
754
 * This function is called whenever a reply for an asynchronous message sent out
 
755
 * by the asynchronous framework is received.
 
756
 *
 
757
 * Notify the fibril which is waiting for this message that it has arrived.
 
758
 *
 
759
 * @param arg    Pointer to the asynchronous message record.
 
760
 * @param retval Value returned in the answer.
 
761
 * @param data   Call data of the answer.
 
762
 */
 
763
static void reply_received(void *arg, int retval, ipc_call_t *data)
 
764
{
 
765
        futex_down(&async_futex);
 
766
        
 
767
        amsg_t *msg = (amsg_t *) arg;
 
768
        msg->retval = retval;
 
769
        
 
770
        /* Copy data after futex_down, just in case the call was detached */
 
771
        if ((msg->dataptr) && (data))
 
772
                *msg->dataptr = *data;
 
773
        
 
774
        write_barrier();
 
775
        
 
776
        /* Remove message from timeout list */
 
777
        if (msg->wdata.inlist)
 
778
                list_remove(&msg->wdata.link);
 
779
        
 
780
        msg->done = true;
 
781
        if (!msg->wdata.active) {
 
782
                msg->wdata.active = true;
 
783
                fibril_add_ready(msg->wdata.fid);
 
784
        }
 
785
        
 
786
        futex_up(&async_futex);
 
787
}
 
788
 
 
789
/** Send message and return id of the sent message.
 
790
 *
 
791
 * The return value can be used as input for async_wait() to wait for
 
792
 * completion.
 
793
 *
 
794
 * @param phoneid Handle of the phone that will be used for the send.
 
795
 * @param method  Service-defined method.
 
796
 * @param arg1    Service-defined payload argument.
 
797
 * @param arg2    Service-defined payload argument.
 
798
 * @param arg3    Service-defined payload argument.
 
799
 * @param arg4    Service-defined payload argument.
 
800
 * @param dataptr If non-NULL, storage where the reply data will be
 
801
 *                stored.
 
802
 *
 
803
 * @return Hash of the sent message or 0 on error.
 
804
 *
 
805
 */
 
806
aid_t async_send_fast(int phoneid, ipcarg_t method, ipcarg_t arg1,
 
807
    ipcarg_t arg2, ipcarg_t arg3, ipcarg_t arg4, ipc_call_t *dataptr)
 
808
{
 
809
        amsg_t *msg = malloc(sizeof(*msg));
 
810
        
 
811
        if (!msg)
 
812
                return 0;
 
813
        
 
814
        msg->done = false;
 
815
        msg->dataptr = dataptr;
 
816
        
 
817
        msg->wdata.inlist = false;
 
818
        /* We may sleep in the next method, but it will use its own mechanism */
 
819
        msg->wdata.active = true;
 
820
        
 
821
        ipc_call_async_4(phoneid, method, arg1, arg2, arg3, arg4, msg,
 
822
            reply_received, true);
 
823
        
 
824
        return (aid_t) msg;
 
825
}
 
826
 
 
827
/** Send message and return id of the sent message
 
828
 *
 
829
 * The return value can be used as input for async_wait() to wait for
 
830
 * completion.
 
831
 *
 
832
 * @param phoneid Handle of the phone that will be used for the send.
 
833
 * @param method  Service-defined method.
 
834
 * @param arg1    Service-defined payload argument.
 
835
 * @param arg2    Service-defined payload argument.
 
836
 * @param arg3    Service-defined payload argument.
 
837
 * @param arg4    Service-defined payload argument.
 
838
 * @param arg5    Service-defined payload argument.
 
839
 * @param dataptr If non-NULL, storage where the reply data will be
 
840
 *                stored.
 
841
 *
 
842
 * @return Hash of the sent message or 0 on error.
 
843
 *
 
844
 */
 
845
aid_t async_send_slow(int phoneid, ipcarg_t method, ipcarg_t arg1,
 
846
    ipcarg_t arg2, ipcarg_t arg3, ipcarg_t arg4, ipcarg_t arg5,
 
847
    ipc_call_t *dataptr)
 
848
{
 
849
        amsg_t *msg = malloc(sizeof(*msg));
 
850
        
 
851
        if (!msg)
 
852
                return 0;
 
853
        
 
854
        msg->done = false;
 
855
        msg->dataptr = dataptr;
 
856
        
 
857
        msg->wdata.inlist = false;
 
858
        /* We may sleep in next method, but it will use its own mechanism */
 
859
        msg->wdata.active = true;
 
860
        
 
861
        ipc_call_async_5(phoneid, method, arg1, arg2, arg3, arg4, arg5, msg,
 
862
            reply_received, true);
 
863
        
 
864
        return (aid_t) msg;
 
865
}
 
866
 
 
867
/** Wait for a message sent by the async framework.
 
868
 *
 
869
 * @param amsgid Hash of the message to wait for.
 
870
 * @param retval Pointer to storage where the retval of the answer will
 
871
 *               be stored.
 
872
 *
 
873
 */
 
874
void async_wait_for(aid_t amsgid, ipcarg_t *retval)
 
875
{
 
876
        amsg_t *msg = (amsg_t *) amsgid;
 
877
        
 
878
        futex_down(&async_futex);
 
879
        if (msg->done) {
 
880
                futex_up(&async_futex);
 
881
                goto done;
 
882
        }
 
883
        
 
884
        msg->wdata.fid = fibril_get_id();
 
885
        msg->wdata.active = false;
 
886
        msg->wdata.inlist = false;
 
887
        
 
888
        /* Leave the async_futex locked when entering this function */
 
889
        fibril_switch(FIBRIL_TO_MANAGER);
 
890
        
 
891
        /* Futex is up automatically after fibril_switch */
 
892
        
 
893
done:
 
894
        if (retval)
 
895
                *retval = msg->retval;
 
896
        
 
897
        free(msg);
 
898
}
 
899
 
 
900
/** Wait for a message sent by the async framework, timeout variant.
 
901
 *
 
902
 * @param amsgid  Hash of the message to wait for.
 
903
 * @param retval  Pointer to storage where the retval of the answer will
 
904
 *                be stored.
 
905
 * @param timeout Timeout in microseconds.
 
906
 *
 
907
 * @return Zero on success, ETIMEOUT if the timeout has expired.
 
908
 *
 
909
 */
 
910
int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
 
911
{
 
912
        amsg_t *msg = (amsg_t *) amsgid;
 
913
        
 
914
        /* TODO: Let it go through the event read at least once */
 
915
        if (timeout < 0)
 
916
                return ETIMEOUT;
 
917
        
 
918
        futex_down(&async_futex);
 
919
        if (msg->done) {
 
920
                futex_up(&async_futex);
 
921
                goto done;
 
922
        }
 
923
        
 
924
        gettimeofday(&msg->wdata.expires, NULL);
 
925
        tv_add(&msg->wdata.expires, timeout);
 
926
        
 
927
        msg->wdata.fid = fibril_get_id();
 
928
        msg->wdata.active = false;
 
929
        insert_timeout(&msg->wdata);
 
930
        
 
931
        /* Leave the async_futex locked when entering this function */
 
932
        fibril_switch(FIBRIL_TO_MANAGER);
 
933
        
 
934
        /* Futex is up automatically after fibril_switch */
 
935
        
 
936
        if (!msg->done)
 
937
                return ETIMEOUT;
 
938
        
 
939
done:
 
940
        if (retval)
 
941
                *retval = msg->retval;
 
942
        
 
943
        free(msg);
 
944
        
 
945
        return 0;
 
946
}
 
947
 
 
948
/** Wait for specified time.
 
949
 *
 
950
 * The current fibril is suspended but the thread continues to execute.
 
951
 *
 
952
 * @param timeout Duration of the wait in microseconds.
 
953
 *
 
954
 */
 
955
void async_usleep(suseconds_t timeout)
 
956
{
 
957
        amsg_t *msg = malloc(sizeof(*msg));
 
958
        
 
959
        if (!msg)
 
960
                return;
 
961
        
 
962
        msg->wdata.fid = fibril_get_id();
 
963
        msg->wdata.active = false;
 
964
        
 
965
        gettimeofday(&msg->wdata.expires, NULL);
 
966
        tv_add(&msg->wdata.expires, timeout);
 
967
        
 
968
        futex_down(&async_futex);
 
969
        
 
970
        insert_timeout(&msg->wdata);
 
971
        
 
972
        /* Leave the async_futex locked when entering this function */
 
973
        fibril_switch(FIBRIL_TO_MANAGER);
 
974
        
 
975
        /* Futex is up automatically after fibril_switch() */
 
976
        
 
977
        free(msg);
 
978
}
 
979
 
 
980
/** Setter for client_connection function pointer.
 
981
 *
 
982
 * @param conn Function that will implement a new connection fibril.
 
983
 *
 
984
 */
 
985
void async_set_client_connection(async_client_conn_t conn)
 
986
{
 
987
        client_connection = conn;
 
988
}
 
989
 
 
990
/** Setter for interrupt_received function pointer.
 
991
 *
 
992
 * @param intr Function that will implement a new interrupt
 
993
 *             notification fibril.
 
994
 */
 
995
void async_set_interrupt_received(async_client_conn_t intr)
 
996
{
 
997
        interrupt_received = intr;
 
998
}
 
999
 
 
1000
/** Pseudo-synchronous message sending - fast version.
 
1001
 *
 
1002
 * Send message asynchronously and return only after the reply arrives.
 
1003
 *
 
1004
 * This function can only transfer 4 register payload arguments. For
 
1005
 * transferring more arguments, see the slower async_req_slow().
 
1006
 *
 
1007
 * @param phoneid Hash of the phone through which to make the call.
 
1008
 * @param method  Method of the call.
 
1009
 * @param arg1    Service-defined payload argument.
 
1010
 * @param arg2    Service-defined payload argument.
 
1011
 * @param arg3    Service-defined payload argument.
 
1012
 * @param arg4    Service-defined payload argument.
 
1013
 * @param r1      If non-NULL, storage for the 1st reply argument.
 
1014
 * @param r2      If non-NULL, storage for the 2nd reply argument.
 
1015
 * @param r3      If non-NULL, storage for the 3rd reply argument.
 
1016
 * @param r4      If non-NULL, storage for the 4th reply argument.
 
1017
 * @param r5      If non-NULL, storage for the 5th reply argument.
 
1018
 *
 
1019
 * @return Return code of the reply or a negative error code.
 
1020
 *
 
1021
 */
 
1022
ipcarg_t async_req_fast(int phoneid, ipcarg_t method, ipcarg_t arg1,
 
1023
    ipcarg_t arg2, ipcarg_t arg3, ipcarg_t arg4, ipcarg_t *r1, ipcarg_t *r2,
 
1024
    ipcarg_t *r3, ipcarg_t *r4, ipcarg_t *r5)
 
1025
{
 
1026
        ipc_call_t result;
 
1027
        aid_t eid = async_send_4(phoneid, method, arg1, arg2, arg3, arg4,
 
1028
            &result);
 
1029
        
 
1030
        ipcarg_t rc;
 
1031
        async_wait_for(eid, &rc);
 
1032
        
 
1033
        if (r1)
 
1034
                *r1 = IPC_GET_ARG1(result);
 
1035
        
 
1036
        if (r2)
 
1037
                *r2 = IPC_GET_ARG2(result);
 
1038
        
 
1039
        if (r3)
 
1040
                *r3 = IPC_GET_ARG3(result);
 
1041
        
 
1042
        if (r4)
 
1043
                *r4 = IPC_GET_ARG4(result);
 
1044
        
 
1045
        if (r5)
 
1046
                *r5 = IPC_GET_ARG5(result);
 
1047
        
 
1048
        return rc;
 
1049
}
 
1050
 
 
1051
/** Pseudo-synchronous message sending - slow version.
 
1052
 *
 
1053
 * Send message asynchronously and return only after the reply arrives.
 
1054
 *
 
1055
 * @param phoneid Hash of the phone through which to make the call.
 
1056
 * @param method  Method of the call.
 
1057
 * @param arg1    Service-defined payload argument.
 
1058
 * @param arg2    Service-defined payload argument.
 
1059
 * @param arg3    Service-defined payload argument.
 
1060
 * @param arg4    Service-defined payload argument.
 
1061
 * @param arg5    Service-defined payload argument.
 
1062
 * @param r1      If non-NULL, storage for the 1st reply argument.
 
1063
 * @param r2      If non-NULL, storage for the 2nd reply argument.
 
1064
 * @param r3      If non-NULL, storage for the 3rd reply argument.
 
1065
 * @param r4      If non-NULL, storage for the 4th reply argument.
 
1066
 * @param r5      If non-NULL, storage for the 5th reply argument.
 
1067
 *
 
1068
 * @return Return code of the reply or a negative error code.
 
1069
 *
 
1070
 */
 
1071
ipcarg_t async_req_slow(int phoneid, ipcarg_t method, ipcarg_t arg1,
 
1072
    ipcarg_t arg2, ipcarg_t arg3, ipcarg_t arg4, ipcarg_t arg5, ipcarg_t *r1,
 
1073
    ipcarg_t *r2, ipcarg_t *r3, ipcarg_t *r4, ipcarg_t *r5)
 
1074
{
 
1075
        ipc_call_t result;
 
1076
        aid_t eid = async_send_5(phoneid, method, arg1, arg2, arg3, arg4, arg5,
 
1077
            &result);
 
1078
        
 
1079
        ipcarg_t rc;
 
1080
        async_wait_for(eid, &rc);
 
1081
        
 
1082
        if (r1)
 
1083
                *r1 = IPC_GET_ARG1(result);
 
1084
        
 
1085
        if (r2)
 
1086
                *r2 = IPC_GET_ARG2(result);
 
1087
        
 
1088
        if (r3)
 
1089
                *r3 = IPC_GET_ARG3(result);
 
1090
        
 
1091
        if (r4)
 
1092
                *r4 = IPC_GET_ARG4(result);
 
1093
        
 
1094
        if (r5)
 
1095
                *r5 = IPC_GET_ARG5(result);
 
1096
        
 
1097
        return rc;
 
1098
}
 
1099
 
 
1100
/** @}
 
1101
 */