1
/*********************************************************
2
* Copyright (C) 2010 VMware, Inc. All rights reserved.
4
* This program is free software; you can redistribute it and/or modify it
5
* under the terms of the GNU Lesser General Public License as published
6
* by the Free Software Foundation version 2.1 and no later version.
8
* This program is distributed in the hope that it will be useful, but
9
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10
* or FITNESS FOR A PARTICULAR PURPOSE. See the Lesser GNU General Public
11
* License for more details.
13
* You should have received a copy of the GNU Lesser General Public License
14
* along with this program; if not, write to the Free Software Foundation, Inc.,
15
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17
*********************************************************/
22
* Implementation of the shared thread pool defined in threadPool.h.
28
#include "toolsCoreInt.h"
29
#include "serviceObj.h"
30
#include "vmware/tools/threadPool.h"
32
/*
 * Fallback for the "pool.maxIdleTime" config key, used when the key cannot be
 * read or is <= 0. The value is handed to g_thread_pool_set_max_idle_time(),
 * which GLib documents as taking milliseconds.
 */
#define DEFAULT_MAX_IDLE_TIME 5000
33
/*
 * Fallback for the "pool.maxThreads" config key. A resulting value <= 0 means
 * no GLib thread pool is created.
 */
#define DEFAULT_MAX_THREADS 5
34
/*
 * Fallback for the "pool.maxUnusedThreads" config key, used when the key
 * cannot be read or is negative. The value is handed to
 * g_thread_pool_set_max_unused_threads().
 */
#define DEFAULT_MAX_UNUSED_THREADS 0
36
typedef struct ThreadPoolState {
48
typedef struct WorkerTask {
57
typedef struct StandaloneTask {
60
ToolsCorePoolCb interrupt;
67
static ThreadPoolState gState;
71
*******************************************************************************
72
* ToolsCorePoolCompareTask -- */ /**
74
* Compares two WorkerTask instances.
76
* @param[in] p1 Pointer to WorkerTask.
77
* @param[in] p2 Pointer to WorkerTask.
79
* @return > 0, 0, < 0 if p1's ID is less than, equal to, or greater than p2's.
81
*******************************************************************************
85
ToolsCorePoolCompareTask(gconstpointer p1,
88
const WorkerTask *t1 = p1;
89
const WorkerTask *t2 = p2;
91
if (t1 != NULL && t2 != NULL) {
92
return (t2->id - t1->id);
95
if (t1 == NULL && t2 == NULL) {
99
return (t1 != NULL) ? -1 : 1;
104
*******************************************************************************
105
* ToolsCorePoolDestroyThread -- */ /**
107
* Releases resources associated with a StandaloneTask, joining the thread
108
* that's executing it.
110
* @param[in] data A StandaloneTask.
112
*******************************************************************************
116
ToolsCorePoolDestroyThread(gpointer data)
118
StandaloneTask *task = data;
119
g_thread_join(task->thread);
120
if (task->dtor != NULL) {
121
task->dtor(task->data);
128
*******************************************************************************
129
* ToolsCorePoolDestroyTask -- */ /**
131
* Frees memory associated with a WorkerTask, calling its destructor if one is
134
* @param[in] data A WorkerTask.
136
*******************************************************************************
140
ToolsCorePoolDestroyTask(gpointer data)
142
WorkerTask *work = data;
143
if (work->dtor != NULL) {
144
work->dtor(work->data);
151
*******************************************************************************
152
* ToolsCorePoolDoWork -- */ /**
154
* Execute a work item.
156
* @param[in] data A WorkerTask.
160
*******************************************************************************
164
ToolsCorePoolDoWork(gpointer data)
166
WorkerTask *work = data;
169
* In single threaded mode, remove the task being executed from the queue.
170
* In multi-threaded mode, the thread pool callback already did this.
172
if (gState.pool == NULL) {
173
g_mutex_lock(gState.lock);
174
g_queue_remove(gState.workQueue, work);
175
g_mutex_unlock(gState.lock);
178
work->cb(gState.ctx, work->data);
184
*******************************************************************************
185
* ToolsCorePoolNoOp -- */ /**
187
* Idle callback for destroying a standalone thread. Does nothing, since the
188
* actual destruction is done by ToolsCorePoolDestroyThread.
190
* @param[in] data Unused.
194
*******************************************************************************
198
ToolsCorePoolNoOp(gpointer data)
205
*******************************************************************************
206
* ToolsCorePoolRunThread -- */ /**
208
* Standalone thread runner. Executes the task associated with the thread, and
209
* schedules a task to clean up the thread state when done.
211
* @param[in] data A StandaloneTask.
215
*******************************************************************************
219
ToolsCorePoolRunThread(gpointer data)
221
StandaloneTask *task = data;
223
task->cb(gState.ctx, task->data);
224
task->active = FALSE;
226
g_mutex_lock(gState.lock);
227
/* If not active, the shutdown function will clean things up. */
229
g_ptr_array_remove(gState.threads, task);
230
g_idle_add_full(G_PRIORITY_DEFAULT_IDLE,
233
ToolsCorePoolDestroyThread);
235
g_mutex_unlock(gState.lock);
242
*******************************************************************************
243
* ToolsCorePoolRunWorker -- */ /**
245
* Thread pool callback function. Dequeues the next work item from the work
246
* queue and executes it.
248
* @param[in] state Per-item pointer from g_thread_pool_push(); a dummy value (&gState), not the task.
249
* @param[in] clientData User data given to g_thread_pool_new() (NULL for this pool).
251
*******************************************************************************
255
ToolsCorePoolRunWorker(gpointer state,
260
g_mutex_lock(gState.lock);
261
work = g_queue_pop_tail(gState.workQueue);
262
g_mutex_unlock(gState.lock);
264
ASSERT(work != NULL);
266
ToolsCorePoolDoWork(work);
267
ToolsCorePoolDestroyTask(work);
272
*******************************************************************************
273
* ToolsCorePoolSubmit -- */ /**
275
* Submits a new task for execution in one of the shared worker threads.
277
* @see ToolsCorePool_SubmitTask()
279
* @param[in] ctx Application context.
280
* @param[in] cb Function to execute the task.
281
* @param[in] data Opaque data for the task.
282
* @param[in] dtor Destructor for the task data.
284
* @return New task's ID, or 0 on error.
286
*******************************************************************************
290
ToolsCorePoolSubmit(ToolsAppCtx *ctx,
296
WorkerTask *task = g_malloc0(sizeof *task);
303
g_mutex_lock(gState.lock);
305
if (!gState.active) {
311
* XXX: a reeeeeeeeeally long running task could cause clashes (e.g., reusing
312
* the same task ID after the counter wraps). That shouldn't really happen in
313
* practice (and is an abuse of the thread pool, and could cause issues if
314
* someone sets the pool size to 0 or 1), but it might be good to have more
315
* fail-safe code here.
317
if (gState.nextWorkId + 1 == UINT_MAX) {
319
gState.nextWorkId = 0;
321
task->id = ++gState.nextWorkId;
327
* We always add the task to the queue, even in single threaded mode, so
328
* that it can be canceled. In single threaded mode, it's unlikely someone
329
* will be able to cancel it before it runs, but they can try.
331
g_queue_push_head(gState.workQueue, task);
333
if (gState.pool != NULL) {
336
/* The client data pointer is bogus, just to avoid passing NULL. */
337
g_thread_pool_push(gState.pool, &gState, &err);
341
g_warning("error sending work request, executing in service thread: %s",
347
/* Run the task in the service's thread. */
348
task->srcId = g_idle_add_full(G_PRIORITY_DEFAULT_IDLE,
351
ToolsCorePoolDestroyTask);
354
g_mutex_unlock(gState.lock);
360
*******************************************************************************
361
* ToolsCorePoolCancel -- */ /**
363
* Cancels a queued task.
365
* @see ToolsCorePool_CancelTask()
367
* @param[in] id Task ID.
369
*******************************************************************************
373
ToolsCorePoolCancel(guint id)
376
WorkerTask *task = NULL;
377
WorkerTask search = { id, };
379
g_return_if_fail(id != 0);
381
g_mutex_lock(gState.lock);
382
if (!gState.active) {
386
taskLnk = g_queue_find_custom(gState.workQueue, &search, ToolsCorePoolCompareTask);
387
if (taskLnk != NULL) {
388
task = taskLnk->data;
389
g_queue_delete_link(gState.workQueue, taskLnk);
393
g_mutex_unlock(gState.lock);
396
if (task->srcId > 0) {
397
g_source_remove(task->srcId);
399
ToolsCorePoolDestroyTask(task);
406
*******************************************************************************
407
* ToolsCorePoolStart -- */ /**
409
* Start a new task in a dedicated thread.
411
* @see ToolsCorePool_StartThread()
413
* @param[in] ctx Application context.
414
* @param[in] cb Callback that executes the task.
415
* @param[in] interrupt Callback that interrupts the task.
416
* @param[in] data Opaque data.
417
* @param[in] dtor Destructor for the task data.
419
* @return TRUE iff thread was successfully started.
421
*******************************************************************************
425
ToolsCorePoolStart(ToolsAppCtx *ctx,
427
ToolsCorePoolCb interrupt,
432
StandaloneTask *task = NULL;
434
g_mutex_lock(gState.lock);
435
if (!gState.active) {
439
task = g_malloc0(sizeof *task);
442
task->interrupt = interrupt;
445
task->thread = g_thread_create(ToolsCorePoolRunThread, task, TRUE, &err);
448
g_ptr_array_add(gState.threads, task);
450
g_warning("failed to start thread: %s.", err->message);
457
g_mutex_unlock(gState.lock);
463
*******************************************************************************
464
* ToolsCorePool_Init -- */ /**
466
* Initializes the shared thread pool. Reads configuration data from the
467
* container-specific section of the config dictionary, so different containers
468
* can have different configuration. Exports the thread pool functions through
469
* the service's object.
471
* @param[in] ctx Application context.
473
*******************************************************************************
477
ToolsCorePool_Init(ToolsAppCtx *ctx)
482
ToolsServiceProperty prop = { TOOLS_CORE_PROP_TPOOL };
484
gState.funcs.submit = ToolsCorePoolSubmit;
485
gState.funcs.cancel = ToolsCorePoolCancel;
486
gState.funcs.start = ToolsCorePoolStart;
489
maxThreads = g_key_file_get_integer(ctx->config, ctx->name,
490
"pool.maxThreads", &err);
492
maxThreads = DEFAULT_MAX_THREADS;
496
if (maxThreads > 0) {
497
gState.pool = g_thread_pool_new(ToolsCorePoolRunWorker,
498
NULL, maxThreads, FALSE, &err);
500
#if GLIB_CHECK_VERSION(2, 10, 0)
504
maxIdleTime = g_key_file_get_integer(ctx->config, ctx->name,
505
"pool.maxIdleTime", &err);
506
if (err != NULL || maxIdleTime <= 0) {
507
maxIdleTime = DEFAULT_MAX_IDLE_TIME;
511
maxUnused = g_key_file_get_integer(ctx->config, ctx->name,
512
"pool.maxUnusedThreads", &err);
513
if (err != NULL || maxUnused < 0) {
514
maxUnused = DEFAULT_MAX_UNUSED_THREADS;
518
g_thread_pool_set_max_idle_time(maxIdleTime);
519
g_thread_pool_set_max_unused_threads(maxUnused);
522
g_warning("error initializing thread pool, running single threaded: %s",
528
gState.active = TRUE;
529
gState.lock = g_mutex_new();
530
gState.threads = g_ptr_array_new();
531
gState.workQueue = g_queue_new();
533
ToolsCoreService_RegisterProperty(ctx->serviceObj, &prop);
534
g_object_set(ctx->serviceObj, TOOLS_CORE_PROP_TPOOL, &gState.funcs, NULL);
539
*******************************************************************************
540
* ToolsCorePool_Shutdown -- */ /**
542
* Shuts down the shared thread pool. This function will interrupt any running
543
* threads (by calling their registered interrupt function), and wait for all
544
* running tasks to finish before cleaning the remaining tasks and shared state.
546
* @param[in] ctx Application context.
548
*******************************************************************************
552
ToolsCorePool_Shutdown(ToolsAppCtx *ctx)
556
g_mutex_lock(gState.lock);
557
gState.active = FALSE;
558
g_mutex_unlock(gState.lock);
560
/* Notify all spawned threads to stop. */
561
for (i = 0; i < gState.threads->len; i++) {
562
StandaloneTask *task = g_ptr_array_index(gState.threads, i);
563
if (task->active && task->interrupt) {
564
task->interrupt(gState.ctx, task->data);
568
/* Stop the thread pool. */
569
if (gState.pool != NULL) {
570
g_thread_pool_free(gState.pool, TRUE, TRUE);
573
/* Join all spawned threads. */
574
for (i = 0; i < gState.threads->len; i++) {
575
StandaloneTask *task = g_ptr_array_index(gState.threads, i);
576
ToolsCorePoolDestroyThread(task);
579
/* Destroy all pending tasks. */
581
WorkerTask *task = g_queue_pop_tail(gState.workQueue);
583
ToolsCorePoolDestroyTask(task);
590
g_ptr_array_free(gState.threads, TRUE);
591
g_queue_free(gState.workQueue);
592
g_mutex_free(gState.lock);
593
memset(&gState, 0, sizeof gState);
594
g_object_set(ctx->serviceObj, TOOLS_CORE_PROP_TPOOL, NULL, NULL);