~ubuntu-branches/ubuntu/natty/gnome-keyring/natty

« back to all changes in this revision

Viewing changes to daemon/util/gkr-daemon-async.c

  • Committer: Bazaar Package Importer
  • Author(s): Sebastien Bacher
  • Date: 2010-02-16 19:00:06 UTC
  • mfrom: (1.1.58 upstream)
  • Revision ID: james.westby@ubuntu.com-20100216190006-cqpnic4zxlkmmi0o
Tags: 2.29.90git20100218-0ubuntu1
Updated to a git snapshot version

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*- */
2
 
/* gkr-daemon-async.c - some daemon async functionality
3
 
 
4
 
   Copyright (C) 2007, Nate Nielsen
5
 
 
6
 
   The Gnome Keyring Library is free software; you can redistribute it and/or
7
 
   modify it under the terms of the GNU Library General Public License as
8
 
   published by the Free Software Foundation; either version 2 of the
9
 
   License, or (at your option) any later version.
10
 
 
11
 
   The Gnome Keyring Library is distributed in the hope that it will be useful,
12
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
 
   Library General Public License for more details.
15
 
 
16
 
   You should have received a copy of the GNU Library General Public
17
 
   License along with the Gnome Library; see the file COPYING.LIB.  If not,
18
 
   write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19
 
   Boston, MA 02111-1307, USA.
20
 
 
21
 
   Author: Nate Nielsen <nielsen@memberwebs.com>
22
 
*/
23
 
 
24
 
 
25
 
#include "gkr-daemon-async.h"
26
 
 
27
 
#include <glib.h>
28
 
 
29
 
#include <unistd.h>
30
 
#include <errno.h>
31
 
#include <fcntl.h>
32
 
#include <string.h>
33
 
 
34
 
#define DEBUG_LOCKS 0
35
 
 
36
 
/* 
37
 
 * See comments on async_poll_func() on the order of the various
38
 
 * gets and sets of waiting_on_* flags.
39
 
 */
40
 
#if DEBUG_LOCKS
41
 
#define DO_LOCK(mtx) G_STMT_START { \
42
 
                g_printerr ("%s LOCK %s\n", __func__, G_STRINGIFY(mtx));  \
43
 
                g_atomic_int_inc (&waiting_on_lock); \
44
 
                if (g_atomic_int_get (&waiting_on_poll)) g_main_context_wakeup (main_ctx); \
45
 
                g_mutex_lock (mtx);  \
46
 
                g_atomic_int_add (&waiting_on_lock, -1); \
47
 
        } G_STMT_END
48
 
#define DO_UNLOCK(mtx) G_STMT_START { \
49
 
                g_printerr ("%s UNLOCK %s\n", __func__, G_STRINGIFY(mtx));  \
50
 
                g_mutex_unlock (mtx);  \
51
 
        } G_STMT_END
52
 
#else
53
 
#define DO_LOCK(mtx) G_STMT_START { \
54
 
                g_atomic_int_inc (&waiting_on_lock); \
55
 
                if (g_atomic_int_get (&waiting_on_poll)) g_main_context_wakeup (main_ctx); \
56
 
                g_mutex_lock (mtx); \
57
 
                g_atomic_int_add (&waiting_on_lock, -1); \
58
 
        } G_STMT_END
59
 
#define DO_UNLOCK(mtx) \
60
 
        g_mutex_unlock (mtx)
61
 
#endif  
62
 
        
63
 
/* 
64
 
 * Private data for the async calls to be used on a worker thread, for making 
65
 
 * calls to the main thread. 
66
 
 * 
67
 
 * This will always be null for the main thread. 
68
 
 */
69
 
GStaticPrivate thread_private = G_STATIC_PRIVATE_INIT;
70
 
 
71
 
#define ASSERT_IS_MAIN() \
72
 
        g_assert (g_static_private_get (&thread_private) == NULL)
73
 
        
74
 
#define ASSERT_IS_WORKER() \
75
 
        g_assert (g_static_private_get (&thread_private) != NULL)
76
 
 
77
 
 
78
 
static GMainContext *main_ctx = NULL;           /* The main loop we're operating on */
79
 
static GMutex *async_mutex = NULL;              /* The mutex which is used for cooperative multitasking */
80
 
static GPollFunc orig_poll_func = NULL;         /* The system poll function, which we wrap */
81
 
static gint async_source_id = 0;                /* Our GSource id for the main loop */
82
 
static GQueue *done_queue = NULL;               /* The queue of completed worker threads */ 
83
 
static GHashTable *running_workers = NULL;      /* A set of running worker threads */
84
 
static gint waiting_on_lock = 0;                /* Number of threads waiting on lock */ 
85
 
static gint waiting_on_poll = 0;                /* Whether we're waiting on the poll or not */
86
 
 
87
 
static void cleanup_done_threads (void);
88
 
 
89
 
/* -----------------------------------------------------------------------------
90
 
 * ASYNC MAINLOOP FUNCTIONS
91
 
 */
92
 
 
93
 
static gint
94
 
async_poll_func (GPollFD *ufds, guint nfsd, gint timeout)
95
 
{
96
 
        gint ret;
97
 
        
98
 
        g_assert (orig_poll_func);
99
 
 
100
 
        if (done_queue && !g_queue_is_empty (done_queue))
101
 
                cleanup_done_threads ();
102
 
 
103
 
        /* 
104
 
         * These two atomic variables are interlocked in the 
105
 
         * opposite order from those in DO_LOCK which prevents
106
 
         * race conditions in the if statements.
107
 
         */
108
 
        g_atomic_int_set (&waiting_on_poll, 1);
109
 
        if (g_atomic_int_get (&waiting_on_lock))
110
 
                timeout = 0;
111
 
        
112
 
        ret = (orig_poll_func) (ufds, nfsd, timeout);
113
 
 
114
 
        g_atomic_int_set (&waiting_on_poll, 0);
115
 
        
116
 
        return ret;
117
 
}
118
 
 
119
 
static gboolean
120
 
async_source_prepare(GSource* source, gint *timeout)
121
 
{
122
 
        gboolean have = g_atomic_int_get (&waiting_on_lock) > 0;
123
 
        *timeout = have ? 0 : -1;
124
 
        return have ? TRUE : FALSE;
125
 
}
126
 
 
127
 
static gboolean
128
 
async_source_check(GSource* source)
129
 
{
130
 
        return g_atomic_int_get (&waiting_on_lock) > 0;
131
 
}
132
 
 
133
 
static gboolean
134
 
async_source_dispatch(GSource* source, GSourceFunc callback, gpointer user_data)
135
 
{
136
 
        /* Let a worker run */
137
 
        DO_UNLOCK (async_mutex);
138
 
        g_thread_yield ();
139
 
        DO_LOCK (async_mutex);
140
 
        return TRUE;
141
 
}
142
 
 
143
 
static void
144
 
async_source_finalize(GSource* source)
145
 
{
146
 
 
147
 
}
148
 
 
149
 
static GSourceFuncs async_source_functions = {
150
 
        async_source_prepare,
151
 
        async_source_check,
152
 
        async_source_dispatch,
153
 
        async_source_finalize
154
 
};
155
 
 
156
 
void
157
 
gkr_daemon_async_workers_init (GMainLoop *mainloop)
158
 
{
159
 
        GSource *src;
160
 
        
161
 
        if (main_ctx)
162
 
                return;
163
 
 
164
 
        g_assert (mainloop);
165
 
        
166
 
        async_mutex = g_mutex_new ();
167
 
 
168
 
        g_assert (!main_ctx);
169
 
        main_ctx = g_main_loop_get_context (mainloop);
170
 
        g_assert (main_ctx);
171
 
        g_main_context_ref (main_ctx);
172
 
        
173
 
        /* Add our idle handler which processes other tasks */
174
 
        g_assert(!async_source_id);
175
 
        src = g_source_new (&async_source_functions, sizeof (GSource));
176
 
        async_source_id = g_source_attach (src, main_ctx);
177
 
        g_source_unref (src);
178
 
 
179
 
        /* Swap in our poll func */
180
 
        orig_poll_func = g_main_context_get_poll_func (main_ctx);
181
 
        g_assert (orig_poll_func);
182
 
        g_main_context_set_poll_func (main_ctx, async_poll_func);
183
 
 
184
 
        /* 
185
 
         * The mutex gets locked each time the main loop is waiting 
186
 
         * for input. See lock_step_poll_func() 
187
 
         */     
188
 
        DO_LOCK (async_mutex);
189
 
}
190
 
 
191
 
void 
192
 
gkr_daemon_async_workers_uninit (void)
193
 
{
194
 
        GSource* src;
195
 
        
196
 
        gkr_daemon_async_workers_stop_all ();
197
 
 
198
 
        DO_UNLOCK (async_mutex);
199
 
        
200
 
        /* Take out the source */
201
 
        g_assert (async_source_id);
202
 
        src = g_main_context_find_source_by_id(main_ctx, async_source_id);
203
 
        g_assert (src);
204
 
        g_source_destroy (src);
205
 
        async_source_id = 0;
206
 
                
207
 
        /* Swap back in original poll func */
208
 
        g_assert (orig_poll_func);
209
 
        g_main_context_set_poll_func (main_ctx, orig_poll_func);
210
 
                
211
 
        g_main_context_unref (main_ctx);
212
 
        main_ctx = NULL;
213
 
        
214
 
        if (async_mutex) {
215
 
                g_mutex_free (async_mutex);
216
 
                async_mutex = NULL;
217
 
        }
218
 
}
219
 
 
220
 
/* -----------------------------------------------------------------------------
221
 
 * ASYNC WORKER FUNCTIONS
222
 
 */
223
 
 
224
 
 
225
 
typedef struct _GkrCancelCallback {
226
 
        GDestroyNotify cancel_func;
227
 
        gpointer user_data;
228
 
} GkrCancelCallback;
229
 
 
230
 
struct _GkrDaemonAsyncWorker {
231
 
        GThread *thread;
232
 
        
233
 
        GThreadFunc func;
234
 
        GkrDaemonAsyncWorkerCallback callback;
235
 
        GQueue *cancel_funcs;
236
 
        
237
 
        /* The current status */
238
 
        gint cancelled;
239
 
        gint stopped;
240
 
 
241
 
        /* Arguments for callbacks and worker calls */
242
 
        gpointer user_data;
243
 
};
244
 
 
245
 
static gpointer 
246
 
async_worker_thread (gpointer data)
247
 
{
248
 
        GkrDaemonAsyncWorker *worker = (GkrDaemonAsyncWorker*)data;
249
 
        gpointer result;
250
 
        
251
 
        g_assert (worker);
252
 
        g_assert (worker->func);
253
 
 
254
 
        /* The marks this as a worker thread, setup async calls to main thread */
255
 
        g_assert (g_static_private_get (&thread_private) == NULL);
256
 
        g_static_private_set (&thread_private, worker, NULL);
257
 
        
258
 
        ASSERT_IS_WORKER ();
259
 
        
260
 
        /* 
261
 
         * Call the actual thread function. This mutex is unlocked by workers
262
 
         * when they yield, or by the main loop when it is waiting for input. 
263
 
         */
264
 
        DO_LOCK (async_mutex);
265
 
        
266
 
                result = (worker->func) (worker->user_data);
267
 
 
268
 
                /* We're all done yay, let main thread know about it */
269
 
                g_atomic_int_inc (&worker->stopped);
270
 
                
271
 
                g_assert (done_queue);
272
 
                g_queue_push_tail (done_queue, worker);
273
 
                
274
 
        DO_UNLOCK (async_mutex);
275
 
        
276
 
        g_static_private_set (&thread_private, NULL, NULL);
277
 
        
278
 
        g_main_context_wakeup (main_ctx);
279
 
        
280
 
        g_thread_exit (result);
281
 
        return result;
282
 
}
283
 
 
284
 
static gboolean
285
 
cleanup_done_thread (gpointer message, gpointer data)
286
 
{
287
 
        GkrDaemonAsyncWorker *worker = (GkrDaemonAsyncWorker*)message;
288
 
        GkrCancelCallback *cb;
289
 
        gpointer result;
290
 
 
291
 
        ASSERT_IS_MAIN ();
292
 
        
293
 
        g_assert (g_atomic_int_get (&worker->stopped));
294
 
                
295
 
        /* This shouldn't block, because worker->stopped is set */
296
 
        g_assert (worker->thread);
297
 
        result = g_thread_join (worker->thread);
298
 
                
299
 
        if (worker->callback)
300
 
                (worker->callback) (worker, result, worker->user_data);
301
 
 
302
 
        /* Free all the cancel funcs */         
303
 
        for (;;) {
304
 
                cb = g_queue_pop_tail (worker->cancel_funcs);
305
 
                if (!cb)
306
 
                        break;
307
 
                g_slice_free (GkrCancelCallback, cb);
308
 
        }
309
 
        g_queue_free (worker->cancel_funcs);
310
 
                        
311
 
        g_hash_table_remove (running_workers, worker); 
312
 
        g_free (worker);        
313
 
        
314
 
        /* Cleanup all related stuff */
315
 
        if (!g_hash_table_size (running_workers)) {
316
 
                g_queue_free (done_queue);
317
 
                done_queue = NULL;
318
 
                
319
 
                g_hash_table_destroy (running_workers);
320
 
                running_workers = NULL;
321
 
                
322
 
                g_assert (main_ctx);
323
 
                return FALSE;
324
 
        }
325
 
        
326
 
        return TRUE;
327
 
}
328
 
 
329
 
static void 
330
 
cleanup_done_threads (void)
331
 
{
332
 
        gpointer message;
333
 
        
334
 
        while (done_queue && !g_queue_is_empty (done_queue))
335
 
        {
336
 
                message = g_queue_pop_head (done_queue);
337
 
                g_assert (message);
338
 
                
339
 
                cleanup_done_thread (message, NULL);
340
 
        }
341
 
}
342
 
 
343
 
GkrDaemonAsyncWorker*
344
 
gkr_daemon_async_worker_start (GThreadFunc func, GkrDaemonAsyncWorkerCallback callback, 
345
 
                               gpointer user_data)
346
 
{
347
 
        GkrDaemonAsyncWorker *worker;
348
 
        GError *err = NULL;
349
 
        
350
 
        ASSERT_IS_MAIN ();      
351
 
        
352
 
        if (!done_queue) {
353
 
                g_assert (main_ctx);
354
 
                
355
 
                done_queue = g_queue_new ();
356
 
                g_assert (!running_workers);
357
 
                running_workers = g_hash_table_new (g_direct_hash, g_direct_equal);
358
 
        }
359
 
        
360
 
        worker = g_new0 (GkrDaemonAsyncWorker, 1);
361
 
        worker->func = func;
362
 
        worker->callback = callback;
363
 
        worker->cancel_funcs = g_queue_new ();
364
 
        worker->user_data = user_data;
365
 
        worker->cancelled = 0;
366
 
        worker->stopped = 0;
367
 
        
368
 
        /* 
369
 
         * Don't change this to a thread pool too lightly. Assumptions are made 
370
 
         * that worker threads are not shared throughout the code.
371
 
         */
372
 
        worker->thread = g_thread_create (async_worker_thread, worker, TRUE, &err);
373
 
        if (!worker->thread) {
374
 
                g_warning ("couldn't create new worker thread: %s", err->message);
375
 
                g_error_free (err);
376
 
                g_free (worker);
377
 
                return NULL;
378
 
        }
379
 
 
380
 
        g_hash_table_replace (running_workers, worker, worker); 
381
 
        return worker;
382
 
}
383
 
 
384
 
void
385
 
gkr_daemon_async_worker_cancel (GkrDaemonAsyncWorker *worker)
386
 
{
387
 
        GkrCancelCallback *cb;
388
 
        
389
 
        g_assert (gkr_daemon_async_worker_is_valid (worker));
390
 
        g_atomic_int_inc (&worker->cancelled);
391
 
        
392
 
        for (;;) {
393
 
                cb = g_queue_pop_tail (worker->cancel_funcs);
394
 
                if (!cb)
395
 
                        break;
396
 
                (cb->cancel_func) (cb->user_data);
397
 
                g_slice_free (GkrCancelCallback, cb);
398
 
        }
399
 
}
400
 
 
401
 
void
402
 
gkr_daemon_async_worker_stop (GkrDaemonAsyncWorker *worker)
403
 
{
404
 
        g_assert (gkr_daemon_async_worker_is_valid (worker));
405
 
        g_assert (worker);
406
 
        ASSERT_IS_MAIN ();
407
 
        
408
 
        gkr_daemon_async_worker_cancel (worker);
409
 
        
410
 
        while (!g_atomic_int_get (&worker->stopped)) {
411
 
                g_assert (running_workers && g_hash_table_size (running_workers) > 0);
412
 
                cleanup_done_threads ();
413
 
                gkr_daemon_async_yield ();
414
 
        }
415
 
 
416
 
        cleanup_done_threads ();
417
 
}
418
 
 
419
 
gboolean
420
 
gkr_daemon_async_worker_is_valid (GkrDaemonAsyncWorker *worker)
421
 
{
422
 
        ASSERT_IS_MAIN ();
423
 
        
424
 
        return worker && running_workers && 
425
 
               g_hash_table_lookup (running_workers, worker);   
426
 
}
427
 
 
428
 
guint
429
 
gkr_daemon_async_workers_get_n (void)
430
 
{
431
 
        ASSERT_IS_MAIN ();
432
 
        
433
 
        if (!running_workers)
434
 
                return 0;
435
 
        return g_hash_table_size (running_workers);     
436
 
}
437
 
 
438
 
static void 
439
 
cancel_each_worker (gpointer key, gpointer value, gpointer data)
440
 
{
441
 
        gkr_daemon_async_worker_cancel ((GkrDaemonAsyncWorker*)key);
442
 
}
443
 
 
444
 
void
445
 
gkr_daemon_async_workers_stop_all (void)
446
 
{
447
 
        ASSERT_IS_MAIN ();
448
 
        
449
 
        if (!running_workers)
450
 
                return;
451
 
        
452
 
        g_assert (done_queue);
453
 
        
454
 
        g_hash_table_foreach (running_workers, cancel_each_worker, NULL);
455
 
        
456
 
        while (running_workers) {
457
 
                g_assert (g_hash_table_size (running_workers) > 0);
458
 
                cleanup_done_threads ();
459
 
                gkr_daemon_async_yield ();
460
 
        }
461
 
}
462
 
 
463
 
/* -----------------------------------------------------------------------------
464
 
 * ASYNC FUNCTIONS FOR ANY THREAD
465
 
 */
466
 
 
467
 
gboolean
468
 
gkr_daemon_async_yield (void)
469
 
{
470
 
        GkrDaemonAsyncWorker *worker;
471
 
 
472
 
        g_assert (async_mutex);
473
 
        
474
 
        worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
475
 
        if (worker && g_atomic_int_get (&worker->cancelled))
476
 
                return FALSE;
477
 
 
478
 
        /* Let another worker or the main loop run */
479
 
        DO_UNLOCK (async_mutex);
480
 
        g_thread_yield ();
481
 
        DO_LOCK (async_mutex);
482
 
 
483
 
        if (worker && g_atomic_int_get (&worker->cancelled))
484
 
                return FALSE;
485
 
                
486
 
        return TRUE;
487
 
}
488
 
 
489
 
gboolean
490
 
gkr_daemon_async_is_stopping (void)
491
 
{
492
 
        GkrDaemonAsyncWorker *worker;
493
 
 
494
 
        worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
495
 
        if (worker && g_atomic_int_get (&worker->cancelled))
496
 
                return TRUE;
497
 
        
498
 
        return FALSE;
499
 
}
500
 
 
501
 
void
502
 
gkr_daemon_async_begin_concurrent (void)
503
 
{
504
 
        g_assert (async_mutex);
505
 
        
506
 
        /* Let another worker or the main loop run */
507
 
        DO_UNLOCK (async_mutex);
508
 
}
509
 
 
510
 
void
511
 
gkr_daemon_async_end_concurrent (void)
512
 
{
513
 
        g_assert (async_mutex);
514
 
        
515
 
        /* Make sure only one thread is running */
516
 
        DO_LOCK (async_mutex);
517
 
}
518
 
 
519
 
void
520
 
gkr_daemon_async_register_cancel (GDestroyNotify cancel, gpointer data)
521
 
{
522
 
        GkrCancelCallback *cb;
523
 
        GkrDaemonAsyncWorker *worker;
524
 
 
525
 
        g_assert (cancel);
526
 
        
527
 
        worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
528
 
        
529
 
        /* We don't support cancellation funcs for main thread */       
530
 
        if (!worker)
531
 
                return;
532
 
 
533
 
        cb = g_slice_new (GkrCancelCallback);
534
 
        cb->cancel_func = cancel;
535
 
        cb->user_data = data;
536
 
        
537
 
        g_queue_push_tail (worker->cancel_funcs, cb);
538
 
}
539
 
 
540
 
static gint
541
 
match_cancel_func (gconstpointer a, gconstpointer b)
542
 
{
543
 
        return memcmp (a, b, sizeof (GkrCancelCallback));
544
 
}
545
 
 
546
 
void
547
 
gkr_daemon_async_unregister_cancel (GDestroyNotify cancel, gpointer data)
548
 
{
549
 
        GkrCancelCallback match;
550
 
        GkrDaemonAsyncWorker *worker;
551
 
        GList *l;
552
 
        
553
 
        g_assert (cancel);
554
 
        
555
 
        worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
556
 
        
557
 
        /* We don't support cancellation funcs for main thread */       
558
 
        if (!worker)
559
 
                return;
560
 
                
561
 
        match.cancel_func = cancel;
562
 
        match.user_data = data;
563
 
                
564
 
        l = g_queue_find_custom (worker->cancel_funcs, &match, match_cancel_func);
565
 
        if (l) {
566
 
                g_slice_free (GkrCancelCallback, l->data);
567
 
                g_queue_delete_link (worker->cancel_funcs, l);
568
 
        } 
569
 
}
570
 
 
571
 
/* -----------------------------------------------------------------------------
572
 
 * ASYNC WAITS
573
 
 */
574
 
 
575
 
GkrDaemonAsyncWait*
576
 
gkr_daemon_async_wait_new (void)
577
 
{
578
 
        return (GkrDaemonAsyncWait*)g_cond_new ();
579
 
}
580
 
 
581
 
void
582
 
gkr_daemon_async_wait_free (GkrDaemonAsyncWait *wait)
583
 
{
584
 
        if (!wait)
585
 
                return;
586
 
        g_cond_free ((GCond*)wait);
587
 
}
588
 
 
589
 
void
590
 
gkr_daemon_async_wait (GkrDaemonAsyncWait *wait)
591
 
{
592
 
        g_assert (wait);
593
 
        g_cond_wait ((GCond*)wait, async_mutex);
594
 
}
595
 
 
596
 
void
597
 
gkr_daemon_async_notify (GkrDaemonAsyncWait *wait)
598
 
{
599
 
        g_assert (wait);
600
 
        g_cond_signal ((GCond*)wait);
601
 
}
602
 
 
603
 
void
604
 
gkr_daemon_async_usleep (gulong microseconds)
605
 
{
606
 
        g_assert (async_mutex);
607
 
        
608
 
        /* Let another worker or the main loop run */
609
 
        DO_UNLOCK (async_mutex);
610
 
        
611
 
                g_usleep (microseconds);
612
 
                
613
 
        DO_LOCK (async_mutex);
614
 
}
615
 
 
616
 
void
617
 
gkr_daemon_async_sleep (glong seconds)
618
 
{
619
 
        g_assert (async_mutex);
620
 
        
621
 
        /* Let another worker or the main loop run */
622
 
        DO_UNLOCK (async_mutex);
623
 
        
624
 
                g_usleep (G_USEC_PER_SEC * seconds);
625
 
                
626
 
        DO_LOCK (async_mutex);
627
 
}