2
* threadpool.c: a generic thread pool implementation
4
* Copyright (C) 2010 Hu Tao
5
* Copyright (C) 2010 Daniel P. Berrange
7
* This library is free software; you can redistribute it and/or
8
* modify it under the terms of the GNU Lesser General Public
9
* License as published by the Free Software Foundation; either
10
* version 2.1 of the License, or (at your option) any later version.
12
* This library is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this library; if not, write to the Free Software
19
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
* Hu Tao <hutao@cn.fujitsu.com>
23
* Daniel P. Berrange <berrange@redhat.com>
30
#include "threadpool.h"
33
#include "virterror_internal.h"
34
#include "ignore-value.h"
36
/* Error domain selected for this file. Presumably consumed by the
 * error-reporting helpers pulled in via virterror_internal.h -- TODO confirm. */
#define VIR_FROM_THIS VIR_FROM_NONE
38
/* One queued unit of work; jobs are chained through `next` into the pool's
 * job list.
 * NOTE(review): the worker also reads a `data` member of this struct (it is
 * passed to the job callback), but that member's declaration and the closing
 * brace are not visible in this excerpt. */
typedef struct _virThreadPoolJob virThreadPoolJob;
39
typedef virThreadPoolJob *virThreadPoolJobPtr;
41
struct _virThreadPoolJob {
42
/* Next job in the queue. */
virThreadPoolJobPtr next;
47
/* Queue of pending jobs.
 * `head` is the first pending job. `tail` points at the pointer slot into
 * which the next job will be stored: the pool constructor sets it to &head
 * for an empty list, and the enqueue code stores through it and then
 * advances it to the new job's `next` field, so appending never walks
 * the list. */
typedef struct _virThreadPoolJobList virThreadPoolJobList;
48
typedef virThreadPoolJobList *virThreadPoolJobListPtr;
50
struct _virThreadPoolJobList {
51
/* First pending job, or NULL when the queue is empty. */
virThreadPoolJobPtr head;
52
/* Address of the slot that receives the next appended job. */
virThreadPoolJobPtr *tail;
56
/* State of a thread pool.
 * NOTE(review): only two members are visible in this excerpt. Other code in
 * this file also accesses jobOpaque, mutex, cond, quit_cond, maxWorkers,
 * freeWorkers, nWorkers and workers on this struct; their declarations are
 * not shown here. */
struct _virThreadPool {
59
/* Callback invoked by a worker for each dequeued job. */
virThreadPoolJobFunc jobFunc;
61
/* Pending jobs waiting for a worker. */
virThreadPoolJobList jobList;
73
/* Thread body for a pool worker.
 *
 * @opaque: the pool this worker belongs to (used as a virThreadPoolPtr).
 *
 * With the pool mutex held, the worker waits on pool->cond while the job
 * list is empty, detaches the job at the head of the list, and runs the
 * pool's job callback on it with the mutex released. Before returning it
 * signals pool->quit_cond if no workers remain.
 *
 * NOTE(review): the enclosing loop header, the first half of the wait
 * condition, the handling of a failed wait, and the update of
 * pool->nWorkers are not visible in this excerpt. */
static void virThreadPoolWorker(void *opaque)
75
virThreadPoolPtr pool = opaque;
77
virMutexLock(&pool->mutex);
81
/* Tail of the wait condition: keep waiting while no job is queued. */
!pool->jobList.head) {
83
/* A negative return means the wait itself failed. */
if (virCondWait(&pool->cond, &pool->mutex) < 0) {
93
/* Detach the first job from the queue. */
virThreadPoolJobPtr job = pool->jobList.head;
94
pool->jobList.head = pool->jobList.head->next;
96
/* If the detached job was the last one, tail pointed at its `next` field;
 * reset tail to &head so that the list is consistently empty. */
if (pool->jobList.tail == &job->next)
97
pool->jobList.tail = &pool->jobList.head;
99
/* Run the callback without holding the mutex. */
virMutexUnlock(&pool->mutex);
100
(pool->jobFunc)(job->data, pool->jobOpaque);
102
virMutexLock(&pool->mutex);
107
/* No workers left: wake the thread waiting on quit_cond
 * (virThreadPoolFree waits on it). */
if (pool->nWorkers == 0)
108
virCondSignal(&pool->quit_cond);
109
virMutexUnlock(&pool->mutex);
112
/* Create a thread pool and start its initial workers.
 *
 * @minWorkers: number of worker threads to start immediately; clamped to
 *              maxWorkers if larger.
 * @func:       callback stored as the pool's job function.
 *
 * The body also uses `maxWorkers` (stored as the pool's worker limit) and
 * `opaque` (stored as pool->jobOpaque).
 *
 * NOTE(review): the declarations of `maxWorkers`, `opaque` and `i`, the
 * bodies of the failure branches, and the return statements are not visible
 * in this excerpt. */
virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
114
virThreadPoolJobFunc func,
117
virThreadPoolPtr pool;
120
/* Never start more workers than the pool is allowed to have. */
if (minWorkers > maxWorkers)
121
minWorkers = maxWorkers;
123
if (VIR_ALLOC(pool) < 0) {
128
/* Empty job queue: no head, and tail refers to the head slot. */
pool->jobList.head = NULL;
129
pool->jobList.tail = &pool->jobList.head;
131
pool->jobFunc = func;
132
pool->jobOpaque = opaque;
134
/* Synchronisation primitives: the pool mutex, the condition workers wait
 * on (cond), and the condition waited on during teardown (quit_cond). */
if (virMutexInit(&pool->mutex) < 0)
136
if (virCondInit(&pool->cond) < 0)
138
if (virCondInit(&pool->quit_cond) < 0)
141
/* Thread handle array sized for the initial workers. */
if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
144
pool->maxWorkers = maxWorkers;
145
/* Start the initial workers, one thread per slot. */
for (i = 0; i < minWorkers; i++) {
146
if (virThreadCreate(&pool->workers[i],
158
/* Presumably the error path: tear down the partially built pool
 * -- TODO confirm against the full source. */
virThreadPoolFree(pool);
163
/* Shut down a thread pool and release its resources.
 *
 * @pool: the pool to destroy.
 *
 * With the mutex held: if workers exist, wake all of them via pool->cond and
 * wait on pool->quit_cond; then unlink any jobs still queued and free the
 * worker array. Finally unlock and destroy the mutex and both conditions.
 *
 * NOTE(review): the wait on quit_cond is a single wait guarded by `if`, not
 * a loop that re-checks pool->nWorkers, and its result is ignored. Condition
 * waits may return spuriously, so confirm this cannot proceed while workers
 * are still running.
 * NOTE(review): any NULL check on `pool`, the setting of a quit flag, the
 * freeing of each unlinked job and of `pool` itself are not visible in this
 * excerpt. */
void virThreadPoolFree(virThreadPoolPtr pool)
165
virThreadPoolJobPtr job;
170
virMutexLock(&pool->mutex);
172
if (pool->nWorkers > 0) {
173
/* Wake every worker blocked on cond, then wait for the signal a worker
 * sends on quit_cond once nWorkers reaches zero. */
virCondBroadcast(&pool->cond);
174
ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
177
/* Unlink every job that was never picked up by a worker. */
while ((job = pool->jobList.head)) {
178
pool->jobList.head = pool->jobList.head->next;
182
VIR_FREE(pool->workers);
183
virMutexUnlock(&pool->mutex);
184
virMutexDestroy(&pool->mutex);
185
/* Destroy results are deliberately ignored. */
ignore_value(virCondDestroy(&pool->quit_cond));
186
ignore_value(virCondDestroy(&pool->cond));
190
/* Queue a job on the pool, starting an extra worker if none is idle.
 *
 * @pool: the pool that receives the job.
 *
 * With the mutex held: when there are no free workers and the pool is below
 * its maxWorkers limit, grow the worker array by one and start a thread in
 * the new slot. Then allocate a job node, append it at the tail of the job
 * list, signal pool->cond and unlock.
 *
 * NOTE(review): the remaining parameters, the initialisation of the job
 * node, the bodies of the failure branches and the return values are not
 * visible in this excerpt. */
int virThreadPoolSendJob(virThreadPoolPtr pool,
193
virThreadPoolJobPtr job;
195
virMutexLock(&pool->mutex);
199
/* Only grow the pool when every worker is busy and the limit allows it. */
if (pool->freeWorkers == 0 &&
200
pool->nWorkers < pool->maxWorkers) {
201
/* Presumably VIR_EXPAND_N also increments pool->nWorkers, since the new
 * slot is addressed as nWorkers - 1 just below -- TODO confirm. */
if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
206
if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
215
if (VIR_ALLOC(job) < 0) {
222
/* Append: store the job in the slot tail refers to, then advance tail to
 * the new job's `next` field. */
*pool->jobList.tail = job;
223
pool->jobList.tail = &(*pool->jobList.tail)->next;
225
/* Wake one worker waiting on cond for the new job. */
virCondSignal(&pool->cond);
226
virMutexUnlock(&pool->mutex);
231
/* Second unlock: presumably the shared error exit -- TODO confirm. */
virMutexUnlock(&pool->mutex);