1
/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*- */
2
/* gkr-daemon-async.c - some daemon async functionality
4
Copyright (C) 2007, Nate Nielsen
6
The Gnome Keyring Library is free software; you can redistribute it and/or
7
modify it under the terms of the GNU Library General Public License as
8
published by the Free Software Foundation; either version 2 of the
9
License, or (at your option) any later version.
11
The Gnome Keyring Library is distributed in the hope that it will be useful,
12
but WITHOUT ANY WARRANTY; without even the implied warranty of
13
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14
Library General Public License for more details.
16
You should have received a copy of the GNU Library General Public
17
License along with the Gnome Library; see the file COPYING.LIB. If not,
18
write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19
Boston, MA 02111-1307, USA.
21
Author: Nate Nielsen <nielsen@memberwebs.com>
25
#include "gkr-daemon-async.h"
37
* See comments on async_poll_func() on the order of the various
38
* gets and sets of waiting_on_* flags.
41
#define DO_LOCK(mtx) G_STMT_START { \
42
g_printerr ("%s LOCK %s\n", __func__, G_STRINGIFY(mtx)); \
43
g_atomic_int_inc (&waiting_on_lock); \
44
if (g_atomic_int_get (&waiting_on_poll)) g_main_context_wakeup (main_ctx); \
46
g_atomic_int_add (&waiting_on_lock, -1); \
48
#define DO_UNLOCK(mtx) G_STMT_START { \
49
g_printerr ("%s UNLOCK %s\n", __func__, G_STRINGIFY(mtx)); \
50
g_mutex_unlock (mtx); \
53
#define DO_LOCK(mtx) G_STMT_START { \
54
g_atomic_int_inc (&waiting_on_lock); \
55
if (g_atomic_int_get (&waiting_on_poll)) g_main_context_wakeup (main_ctx); \
57
g_atomic_int_add (&waiting_on_lock, -1); \
59
#define DO_UNLOCK(mtx) \
64
* Private data for the async calls to be used on a worker thread, for making
65
* calls to the main thread.
67
* This will always be null for the main thread.
69
GStaticPrivate thread_private = G_STATIC_PRIVATE_INIT;
71
#define ASSERT_IS_MAIN() \
72
g_assert (g_static_private_get (&thread_private) == NULL)
74
#define ASSERT_IS_WORKER() \
75
g_assert (g_static_private_get (&thread_private) != NULL)
78
static GMainContext *main_ctx = NULL; /* The main loop we're operating on */
79
static GMutex *async_mutex = NULL; /* The mutex which is used for cooperative multitasking */
80
static GPollFunc orig_poll_func = NULL; /* The system poll function, which we wrap */
81
static gint async_source_id = 0; /* Our GSource id for the main loop */
82
static GQueue *done_queue = NULL; /* The queue of completed worker threads */
83
static GHashTable *running_workers = NULL; /* A set of running worker threads */
84
static gint waiting_on_lock = 0; /* Number of threads waiting on lock */
85
static gint waiting_on_poll = 0; /* Whether we're waiting on the poll or not */
87
static void cleanup_done_threads (void);
89
/* -----------------------------------------------------------------------------
90
* ASYNC MAINLOOP FUNCTIONS
94
async_poll_func (GPollFD *ufds, guint nfsd, gint timeout)
98
g_assert (orig_poll_func);
100
if (done_queue && !g_queue_is_empty (done_queue))
101
cleanup_done_threads ();
104
* These two atomic variables are interlocked in the
105
* opposite order from those in DO_LOCK which prevents
106
* race conditions in the if statements.
108
g_atomic_int_set (&waiting_on_poll, 1);
109
if (g_atomic_int_get (&waiting_on_lock))
112
ret = (orig_poll_func) (ufds, nfsd, timeout);
114
g_atomic_int_set (&waiting_on_poll, 0);
120
async_source_prepare(GSource* source, gint *timeout)
122
gboolean have = g_atomic_int_get (&waiting_on_lock) > 0;
123
*timeout = have ? 0 : -1;
124
return have ? TRUE : FALSE;
128
async_source_check(GSource* source)
130
return g_atomic_int_get (&waiting_on_lock) > 0;
134
async_source_dispatch(GSource* source, GSourceFunc callback, gpointer user_data)
136
/* Let a worker run */
137
DO_UNLOCK (async_mutex);
139
DO_LOCK (async_mutex);
144
async_source_finalize(GSource* source)
149
static GSourceFuncs async_source_functions = {
150
async_source_prepare,
152
async_source_dispatch,
153
async_source_finalize
157
gkr_daemon_async_workers_init (GMainLoop *mainloop)
166
async_mutex = g_mutex_new ();
168
g_assert (!main_ctx);
169
main_ctx = g_main_loop_get_context (mainloop);
171
g_main_context_ref (main_ctx);
173
/* Add our idle handler which processes other tasks */
174
g_assert(!async_source_id);
175
src = g_source_new (&async_source_functions, sizeof (GSource));
176
async_source_id = g_source_attach (src, main_ctx);
177
g_source_unref (src);
179
/* Swap in our poll func */
180
orig_poll_func = g_main_context_get_poll_func (main_ctx);
181
g_assert (orig_poll_func);
182
g_main_context_set_poll_func (main_ctx, async_poll_func);
185
* The mutex gets locked each time the main loop is waiting
186
* for input. See lock_step_poll_func()
188
DO_LOCK (async_mutex);
192
gkr_daemon_async_workers_uninit (void)
196
gkr_daemon_async_workers_stop_all ();
198
DO_UNLOCK (async_mutex);
200
/* Take out the source */
201
g_assert (async_source_id);
202
src = g_main_context_find_source_by_id(main_ctx, async_source_id);
204
g_source_destroy (src);
207
/* Swap back in original poll func */
208
g_assert (orig_poll_func);
209
g_main_context_set_poll_func (main_ctx, orig_poll_func);
211
g_main_context_unref (main_ctx);
215
g_mutex_free (async_mutex);
220
/* -----------------------------------------------------------------------------
221
* ASYNC WORKER FUNCTIONS
225
typedef struct _GkrCancelCallback {
226
GDestroyNotify cancel_func;
230
struct _GkrDaemonAsyncWorker {
234
GkrDaemonAsyncWorkerCallback callback;
235
GQueue *cancel_funcs;
237
/* The current status */
241
/* Arguments for callbacks and worker calls */
246
async_worker_thread (gpointer data)
248
GkrDaemonAsyncWorker *worker = (GkrDaemonAsyncWorker*)data;
252
g_assert (worker->func);
254
/* The marks this as a worker thread, setup async calls to main thread */
255
g_assert (g_static_private_get (&thread_private) == NULL);
256
g_static_private_set (&thread_private, worker, NULL);
261
* Call the actual thread function. This mutex is unlocked by workers
262
* when they yield, or by the main loop when it is waiting for input.
264
DO_LOCK (async_mutex);
266
result = (worker->func) (worker->user_data);
268
/* We're all done yay, let main thread know about it */
269
g_atomic_int_inc (&worker->stopped);
271
g_assert (done_queue);
272
g_queue_push_tail (done_queue, worker);
274
DO_UNLOCK (async_mutex);
276
g_static_private_set (&thread_private, NULL, NULL);
278
g_main_context_wakeup (main_ctx);
280
g_thread_exit (result);
285
cleanup_done_thread (gpointer message, gpointer data)
287
GkrDaemonAsyncWorker *worker = (GkrDaemonAsyncWorker*)message;
288
GkrCancelCallback *cb;
293
g_assert (g_atomic_int_get (&worker->stopped));
295
/* This shouldn't block, because worker->stopped is set */
296
g_assert (worker->thread);
297
result = g_thread_join (worker->thread);
299
if (worker->callback)
300
(worker->callback) (worker, result, worker->user_data);
302
/* Free all the cancel funcs */
304
cb = g_queue_pop_tail (worker->cancel_funcs);
307
g_slice_free (GkrCancelCallback, cb);
309
g_queue_free (worker->cancel_funcs);
311
g_hash_table_remove (running_workers, worker);
314
/* Cleanup all related stuff */
315
if (!g_hash_table_size (running_workers)) {
316
g_queue_free (done_queue);
319
g_hash_table_destroy (running_workers);
320
running_workers = NULL;
330
cleanup_done_threads (void)
334
while (done_queue && !g_queue_is_empty (done_queue))
336
message = g_queue_pop_head (done_queue);
339
cleanup_done_thread (message, NULL);
343
GkrDaemonAsyncWorker*
344
gkr_daemon_async_worker_start (GThreadFunc func, GkrDaemonAsyncWorkerCallback callback,
347
GkrDaemonAsyncWorker *worker;
355
done_queue = g_queue_new ();
356
g_assert (!running_workers);
357
running_workers = g_hash_table_new (g_direct_hash, g_direct_equal);
360
worker = g_new0 (GkrDaemonAsyncWorker, 1);
362
worker->callback = callback;
363
worker->cancel_funcs = g_queue_new ();
364
worker->user_data = user_data;
365
worker->cancelled = 0;
369
* Don't change this to a thread pool too lightly. Assumptions are made
370
* that worker threads are not shared throughout the code.
372
worker->thread = g_thread_create (async_worker_thread, worker, TRUE, &err);
373
if (!worker->thread) {
374
g_warning ("couldn't create new worker thread: %s", err->message);
380
g_hash_table_replace (running_workers, worker, worker);
385
gkr_daemon_async_worker_cancel (GkrDaemonAsyncWorker *worker)
387
GkrCancelCallback *cb;
389
g_assert (gkr_daemon_async_worker_is_valid (worker));
390
g_atomic_int_inc (&worker->cancelled);
393
cb = g_queue_pop_tail (worker->cancel_funcs);
396
(cb->cancel_func) (cb->user_data);
397
g_slice_free (GkrCancelCallback, cb);
402
gkr_daemon_async_worker_stop (GkrDaemonAsyncWorker *worker)
404
g_assert (gkr_daemon_async_worker_is_valid (worker));
408
gkr_daemon_async_worker_cancel (worker);
410
while (!g_atomic_int_get (&worker->stopped)) {
411
g_assert (running_workers && g_hash_table_size (running_workers) > 0);
412
cleanup_done_threads ();
413
gkr_daemon_async_yield ();
416
cleanup_done_threads ();
420
gkr_daemon_async_worker_is_valid (GkrDaemonAsyncWorker *worker)
424
return worker && running_workers &&
425
g_hash_table_lookup (running_workers, worker);
429
gkr_daemon_async_workers_get_n (void)
433
if (!running_workers)
435
return g_hash_table_size (running_workers);
439
cancel_each_worker (gpointer key, gpointer value, gpointer data)
441
gkr_daemon_async_worker_cancel ((GkrDaemonAsyncWorker*)key);
445
gkr_daemon_async_workers_stop_all (void)
449
if (!running_workers)
452
g_assert (done_queue);
454
g_hash_table_foreach (running_workers, cancel_each_worker, NULL);
456
while (running_workers) {
457
g_assert (g_hash_table_size (running_workers) > 0);
458
cleanup_done_threads ();
459
gkr_daemon_async_yield ();
463
/* -----------------------------------------------------------------------------
464
* ASYNC FUNCTIONS FOR ANY THREAD
468
gkr_daemon_async_yield (void)
470
GkrDaemonAsyncWorker *worker;
472
g_assert (async_mutex);
474
worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
475
if (worker && g_atomic_int_get (&worker->cancelled))
478
/* Let another worker or the main loop run */
479
DO_UNLOCK (async_mutex);
481
DO_LOCK (async_mutex);
483
if (worker && g_atomic_int_get (&worker->cancelled))
490
gkr_daemon_async_is_stopping (void)
492
GkrDaemonAsyncWorker *worker;
494
worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
495
if (worker && g_atomic_int_get (&worker->cancelled))
502
gkr_daemon_async_begin_concurrent (void)
504
g_assert (async_mutex);
506
/* Let another worker or the main loop run */
507
DO_UNLOCK (async_mutex);
511
gkr_daemon_async_end_concurrent (void)
513
g_assert (async_mutex);
515
/* Make sure only one thread is running */
516
DO_LOCK (async_mutex);
520
gkr_daemon_async_register_cancel (GDestroyNotify cancel, gpointer data)
522
GkrCancelCallback *cb;
523
GkrDaemonAsyncWorker *worker;
527
worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
529
/* We don't support cancellation funcs for main thread */
533
cb = g_slice_new (GkrCancelCallback);
534
cb->cancel_func = cancel;
535
cb->user_data = data;
537
g_queue_push_tail (worker->cancel_funcs, cb);
541
match_cancel_func (gconstpointer a, gconstpointer b)
543
return memcmp (a, b, sizeof (GkrCancelCallback));
547
gkr_daemon_async_unregister_cancel (GDestroyNotify cancel, gpointer data)
549
GkrCancelCallback match;
550
GkrDaemonAsyncWorker *worker;
555
worker = (GkrDaemonAsyncWorker*)g_static_private_get (&thread_private);
557
/* We don't support cancellation funcs for main thread */
561
match.cancel_func = cancel;
562
match.user_data = data;
564
l = g_queue_find_custom (worker->cancel_funcs, &match, match_cancel_func);
566
g_slice_free (GkrCancelCallback, l->data);
567
g_queue_delete_link (worker->cancel_funcs, l);
571
/* -----------------------------------------------------------------------------
576
gkr_daemon_async_wait_new (void)
578
return (GkrDaemonAsyncWait*)g_cond_new ();
582
gkr_daemon_async_wait_free (GkrDaemonAsyncWait *wait)
586
g_cond_free ((GCond*)wait);
590
gkr_daemon_async_wait (GkrDaemonAsyncWait *wait)
593
g_cond_wait ((GCond*)wait, async_mutex);
597
gkr_daemon_async_notify (GkrDaemonAsyncWait *wait)
600
g_cond_signal ((GCond*)wait);
604
gkr_daemon_async_usleep (gulong microseconds)
606
g_assert (async_mutex);
608
/* Let another worker or the main loop run */
609
DO_UNLOCK (async_mutex);
611
g_usleep (microseconds);
613
DO_LOCK (async_mutex);
617
gkr_daemon_async_sleep (glong seconds)
619
g_assert (async_mutex);
621
/* Let another worker or the main loop run */
622
DO_UNLOCK (async_mutex);
624
g_usleep (G_USEC_PER_SEC * seconds);
626
DO_LOCK (async_mutex);