2
* Unix SMB/CIFS implementation.
3
* thread pool implementation
4
* Copyright (C) Volker Lendecke 2009
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 3 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program. If not, see <http://www.gnu.org/licenses/>.
31
#include "pthreadpool.h"
32
#include "lib/util/dlinklist.h"
34
struct pthreadpool_job {
35
struct pthreadpool_job *next;
37
void (*fn)(void *private_data);
43
* List pthreadpools for fork safety
45
struct pthreadpool *prev, *next;
48
* Control access to this struct
50
pthread_mutex_t mutex;
53
* Threads waiting for work do so here
55
pthread_cond_t condvar;
60
struct pthreadpool_job *jobs, *last_job;
68
* indicator to worker threads that they should shut down
73
* maximum number of threads
83
* Number of idle threads
88
* An array of threads that require joining.
91
pthread_t *exited; /* We alloc more */
94
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
95
static struct pthreadpool *pthreadpools = NULL;
96
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
98
static void pthreadpool_prep_atfork(void);
101
* Initialize a thread pool
104
int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
106
struct pthreadpool *pool;
109
pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
114
ret = pipe(pool->sig_pipe);
121
ret = pthread_mutex_init(&pool->mutex, NULL);
123
close(pool->sig_pipe[0]);
124
close(pool->sig_pipe[1]);
129
ret = pthread_cond_init(&pool->condvar, NULL);
131
pthread_mutex_destroy(&pool->mutex);
132
close(pool->sig_pipe[0]);
133
close(pool->sig_pipe[1]);
139
pool->jobs = pool->last_job = NULL;
140
pool->num_threads = 0;
141
pool->num_exited = 0;
143
pool->max_threads = max_threads;
146
ret = pthread_mutex_lock(&pthreadpools_mutex);
148
pthread_cond_destroy(&pool->condvar);
149
pthread_mutex_destroy(&pool->mutex);
150
close(pool->sig_pipe[0]);
151
close(pool->sig_pipe[1]);
155
DLIST_ADD(pthreadpools, pool);
157
ret = pthread_mutex_unlock(&pthreadpools_mutex);
160
pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
167
static void pthreadpool_prepare(void)
170
struct pthreadpool *pool;
172
ret = pthread_mutex_lock(&pthreadpools_mutex);
177
while (pool != NULL) {
178
ret = pthread_mutex_lock(&pool->mutex);
184
static void pthreadpool_parent(void)
187
struct pthreadpool *pool;
189
pool = DLIST_TAIL(pthreadpools);
192
ret = pthread_mutex_unlock(&pool->mutex);
195
if (pool == pthreadpools) {
201
ret = pthread_mutex_unlock(&pthreadpools_mutex);
205
static void pthreadpool_child(void)
208
struct pthreadpool *pool;
210
pool = DLIST_TAIL(pthreadpools);
213
close(pool->sig_pipe[0]);
214
close(pool->sig_pipe[1]);
216
ret = pipe(pool->sig_pipe);
219
pool->num_threads = 0;
221
pool->num_exited = 0;
227
while (pool->jobs != NULL) {
228
struct pthreadpool_job *job;
230
pool->jobs = job->next;
233
pool->last_job = NULL;
235
ret = pthread_mutex_unlock(&pool->mutex);
238
if (pool == pthreadpools) {
244
ret = pthread_mutex_unlock(&pthreadpools_mutex);
248
static void pthreadpool_prep_atfork(void)
250
pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
255
* Return the file descriptor which becomes readable when a job has
259
int pthreadpool_signal_fd(struct pthreadpool *pool)
261
return pool->sig_pipe[0];
265
* Do a pthread_join() on all children that have exited, pool->mutex must be
268
static void pthreadpool_join_children(struct pthreadpool *pool)
272
for (i=0; i<pool->num_exited; i++) {
273
pthread_join(pool->exited[i], NULL);
275
pool->num_exited = 0;
278
* Deliberately not free and NULL pool->exited. That will be
279
* re-used by realloc later.
284
* Fetch a finished job number from the signal pipe
287
int pthreadpool_finished_job(struct pthreadpool *pool)
295
while ((nread == -1) && (errno == EINTR)) {
296
nread = read(pool->sig_pipe[0], &result, sizeof(int));
301
if (nread != sizeof(int)) {
308
* Destroy a thread pool, finishing all threads working for it
311
int pthreadpool_destroy(struct pthreadpool *pool)
315
ret = pthread_mutex_lock(&pool->mutex);
320
if ((pool->jobs != NULL) || pool->shutdown) {
321
ret = pthread_mutex_unlock(&pool->mutex);
326
if (pool->num_threads > 0) {
328
* We have active threads, tell them to finish, wait for that.
333
if (pool->num_idle > 0) {
335
* Wake the idle threads. They will find pool->quit to
336
* be set and exit themselves
338
ret = pthread_cond_broadcast(&pool->condvar);
340
pthread_mutex_unlock(&pool->mutex);
345
while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
347
if (pool->num_exited > 0) {
348
pthreadpool_join_children(pool);
352
* A thread that shuts down will also signal
355
ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
357
pthread_mutex_unlock(&pool->mutex);
363
ret = pthread_mutex_unlock(&pool->mutex);
367
ret = pthread_mutex_destroy(&pool->mutex);
368
ret1 = pthread_cond_destroy(&pool->condvar);
377
ret = pthread_mutex_lock(&pthreadpools_mutex);
381
DLIST_REMOVE(pthreadpools, pool);
382
ret = pthread_mutex_unlock(&pthreadpools_mutex);
385
close(pool->sig_pipe[0]);
386
pool->sig_pipe[0] = -1;
388
close(pool->sig_pipe[1]);
389
pool->sig_pipe[1] = -1;
398
* Prepare for pthread_exit(), pool->mutex must be locked
400
static void pthreadpool_server_exit(struct pthreadpool *pool)
404
pool->num_threads -= 1;
406
exited = (pthread_t *)realloc(
407
pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
409
if (exited == NULL) {
410
/* lost a thread status */
413
pool->exited = exited;
415
pool->exited[pool->num_exited] = pthread_self();
416
pool->num_exited += 1;
419
static void *pthreadpool_server(void *arg)
421
struct pthreadpool *pool = (struct pthreadpool *)arg;
424
res = pthread_mutex_lock(&pool->mutex);
432
struct pthreadpool_job *job;
435
* idle-wait at most 1 second. If nothing happens in that
436
* time, exit this thread.
439
gettimeofday(&tv, NULL);
440
ts.tv_sec = tv.tv_sec + 1;
441
ts.tv_nsec = tv.tv_usec*1000;
443
while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
446
res = pthread_cond_timedwait(
447
&pool->condvar, &pool->mutex, &ts);
450
if (res == ETIMEDOUT) {
452
if (pool->jobs == NULL) {
454
* we timed out and still no work for
457
pthreadpool_server_exit(pool);
458
pthread_mutex_unlock(&pool->mutex);
473
* Ok, there's work for us to do, remove the job from
474
* the pthreadpool list
476
pool->jobs = job->next;
477
if (pool->last_job == job) {
478
pool->last_job = NULL;
482
* Do the work with the mutex unlocked
485
res = pthread_mutex_unlock(&pool->mutex);
488
job->fn(job->private_data);
490
written = write(pool->sig_pipe[1], &job->id,
495
res = pthread_mutex_lock(&pool->mutex);
498
if (written != sizeof(int)) {
499
pthreadpool_server_exit(pool);
500
pthread_mutex_unlock(&pool->mutex);
505
if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
507
* No more work to do and we're asked to shut down, so
510
pthreadpool_server_exit(pool);
512
if (pool->num_threads == 0) {
514
* Ping the main thread waiting for all of us
515
* workers to have quit.
517
pthread_cond_broadcast(&pool->condvar);
520
pthread_mutex_unlock(&pool->mutex);
526
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
527
void (*fn)(void *private_data), void *private_data)
529
struct pthreadpool_job *job;
532
sigset_t mask, omask;
534
job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
540
job->private_data = private_data;
544
res = pthread_mutex_lock(&pool->mutex);
550
if (pool->shutdown) {
552
* Protect against the pool being shut down while
553
* trying to add a job
555
res = pthread_mutex_unlock(&pool->mutex);
562
* Just some cleanup under the mutex
564
pthreadpool_join_children(pool);
567
* Add job to the end of the queue
569
if (pool->jobs == NULL) {
573
pool->last_job->next = job;
575
pool->last_job = job;
577
if (pool->num_idle > 0) {
579
* We have idle threads, wake one.
581
res = pthread_cond_signal(&pool->condvar);
582
pthread_mutex_unlock(&pool->mutex);
586
if ((pool->max_threads != 0) &&
587
(pool->num_threads >= pool->max_threads)) {
589
* No more new threads, we just queue the request
591
pthread_mutex_unlock(&pool->mutex);
596
* Create a new worker thread. It should not receive any signals.
601
res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
603
pthread_mutex_unlock(&pool->mutex);
607
res = pthread_create(&thread_id, NULL, pthreadpool_server,
610
pool->num_threads += 1;
613
assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
615
pthread_mutex_unlock(&pool->mutex);