102
91
work_q->tail = NULL;
105
work_q->item_count--;
109
static struct threadpool_list {
111
struct threadpool_list *nxt;
113
static pthread_mutex_t pools_lock = PTHREAD_MUTEX_INITIALIZER;
115
static void add_topools(threadpool_t *t)
117
struct threadpool_list *new = malloc(sizeof(*new));
119
logg("!Unable to add threadpool to list\n");
123
pthread_mutex_lock(&pools_lock);
126
pthread_mutex_unlock(&pools_lock);
129
static void remove_frompools(threadpool_t *t)
131
struct threadpool_list *l, *prev;
132
struct task_desc *desc;
133
pthread_mutex_lock(&pools_lock);
136
while(l && l->pool != t) {
149
struct task_desc *q = desc;
154
pthread_mutex_unlock(&pools_lock);
157
static void print_queue(int f, work_queue_t *queue, struct timeval *tv_now)
159
long umin=~0UL, umax=0, usum=0;
160
unsigned invalids = 0, cnt = 0;
165
for(q=queue->head;q;q=q->next) {
167
delta = tv_now->tv_usec - q->time_queued.tv_usec;
168
delta += (tv_now->tv_sec - q->time_queued.tv_sec)*1000000;
180
mdprintf(f," min_wait: %.6f max_wait: %.6f avg_wait: %.6f",
181
umin/1e6, umax/1e6, usum /(1e6*cnt));
183
mdprintf(f," (INVALID timestamps: %u)", invalids);
184
if(cnt + invalids != (unsigned)queue->item_count)
185
mdprintf(f," (ERROR: %u != %u)", cnt + invalids,
186
(unsigned)queue->item_count);
189
int thrmgr_printstats(int f, char term)
191
struct threadpool_list *l;
192
unsigned cnt, pool_cnt = 0;
193
size_t pool_used = 0, pool_total = 0, seen_cnt = 0, error_flag = 0;
194
float mem_heap = 0, mem_mmap = 0, mem_used = 0, mem_free = 0, mem_releasable = 0;
195
const struct cl_engine **seen = NULL;
196
int has_libc_memstats = 0;
198
pthread_mutex_lock(&pools_lock);
199
for(cnt=0,l=pools;l;l=l->nxt) cnt++;
200
mdprintf(f,"POOLS: %u\n\n", cnt);
201
for(l= pools;l && !error_flag;l = l->nxt) {
202
threadpool_t *pool = l->pool;
204
struct timeval tv_now;
205
struct task_desc *task;
209
mdprintf(f,"NULL\n\n");
212
/* now we can access desc->, knowing that they won't get freed
213
* because the other tasks can't quit while pool_mutex is taken
215
switch(pool->state) {
229
mdprintf(f, "STATE: %s %s\n", state, l->nxt ? "" : "PRIMARY");
230
mdprintf(f, "THREADS: live %u idle %u max %u idle-timeout %u\n"
231
,pool->thr_alive, pool->thr_idle, pool->thr_max,
233
/* TODO: show both queues */
234
mdprintf(f,"QUEUE: %u items", pool->single_queue->item_count + pool->bulk_queue->item_count);
235
gettimeofday(&tv_now, NULL);
236
print_queue(f, pool->bulk_queue, &tv_now);
237
print_queue(f, pool->single_queue, &tv_now);
239
for(task = pool->tasks; task; task = task->nxt) {
243
delta = tv_now.tv_usec - task->tv.tv_usec;
244
delta += (tv_now.tv_sec - task->tv.tv_sec)*1000000.0;
245
mdprintf(f,"\t%s %f %s\n",
246
task->command ? task->command : "N/A",
248
task->filename ? task->filename:"");
250
/* we usually have at most 2 engines so a linear
251
* search is good enough */
253
for (i=0;i<seen_cnt;i++) {
254
if (seen[i] == task->engine)
257
/* we need to count the memusage from the same
258
* engine only once */
260
const struct cl_engine **s;
263
s = realloc(seen, seen_cnt * sizeof(*seen));
269
seen[seen_cnt - 1] = task->engine;
271
if (mpool_getstats(task->engine, &used, &total) != -1) {
284
struct mallinfo inf = mallinfo();
285
mem_heap = inf.arena/(1024*1024.0);
286
mem_mmap = inf.hblkhd/(1024*1024.0);
287
mem_used = (inf.usmblks + inf.uordblks)/(1024*1024.0);
288
mem_free = (inf.fsmblks + inf.fordblks)/(1024*1024.0);
289
mem_releasable = inf.keepcost/(1024*1024.0);
294
mdprintf(f, "ERROR: error encountered while formatting statistics\n");
296
if (has_libc_memstats)
297
mdprintf(f,"MEMSTATS: heap %.3fM mmap %.3fM used %.3fM free %.3fM releasable %.3fM pools %u pools_used %.3fM pools_total %.3fM\n",
298
mem_heap, mem_mmap, mem_used, mem_free, mem_releasable, pool_cnt,
299
pool_used/(1024*1024.0), pool_total/(1024*1024.0));
301
mdprintf(f,"MEMSTATS: heap N/A mmap N/A used N/A free N/A releasable N/A pools %u pools_used %.3fM pools_total %.3fM\n",
302
pool_cnt, pool_used/(1024*1024.0), pool_total/(1024*1024.0));
304
mdprintf(f,"END%c", term);
305
pthread_mutex_unlock(&pools_lock);
309
97
void thrmgr_destroy(threadpool_t *threadpool)
314
if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
315
logg("!Mutex lock failed\n");
318
if(threadpool->state != POOL_VALID) {
319
if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
320
logg("!Mutex unlock failed\n");
99
if (!threadpool || (threadpool->state != POOL_VALID)) {
102
if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
103
logg("!Mutex lock failed\n");
325
106
threadpool->state = POOL_EXIT;
327
108
/* wait for threads to exit */
328
109
if (threadpool->thr_alive > 0) {
329
110
if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
340
remove_frompools(threadpool);
341
if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
342
logg("!Mutex unlock failed\n");
121
if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
122
logg("!Mutex unlock failed\n");
346
126
pthread_mutex_destroy(&(threadpool->pool_mutex));
347
pthread_cond_destroy(&(threadpool->idle_cond));
348
pthread_cond_destroy(&(threadpool->queueable_single_cond));
349
pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
350
127
pthread_cond_destroy(&(threadpool->pool_cond));
351
128
pthread_attr_destroy(&(threadpool->pool_attr));
352
free(threadpool->single_queue);
353
free(threadpool->bulk_queue);
129
free(threadpool->queue);
354
130
free(threadpool);
358
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void (*handler)(void *))
134
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
360
136
threadpool_t *threadpool;
361
#if defined(C_BIGSTACK)
137
#if defined(C_BIGSTACK) || defined(C_BSD)
362
138
size_t stacksize;
365
141
if (max_threads <= 0) {
369
threadpool = (threadpool_t *) malloc(sizeof(threadpool_t));
145
threadpool = (threadpool_t *) mmalloc(sizeof(threadpool_t));
370
146
if (!threadpool) {
374
threadpool->single_queue = work_queue_new();
375
if (!threadpool->single_queue) {
379
threadpool->bulk_queue = work_queue_new();
380
if (!threadpool->bulk_queue) {
381
free(threadpool->single_queue);
386
threadpool->queue_max = max_queue;
150
threadpool->queue = work_queue_new();
151
if (!threadpool->queue) {
388
155
threadpool->thr_max = max_threads;
389
156
threadpool->thr_alive = 0;
390
157
threadpool->thr_idle = 0;
391
threadpool->thr_multiscan = 0;
392
158
threadpool->idle_timeout = idle_timeout;
393
159
threadpool->handler = handler;
394
threadpool->tasks = NULL;
396
if(pthread_mutex_init(&(threadpool->pool_mutex), NULL)) {
397
free(threadpool->single_queue);
398
free(threadpool->bulk_queue);
161
pthread_mutex_init(&(threadpool->pool_mutex), NULL);
403
162
if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
404
163
pthread_mutex_destroy(&(threadpool->pool_mutex));
405
free(threadpool->single_queue);
406
free(threadpool->bulk_queue);
411
if (pthread_cond_init(&(threadpool->queueable_single_cond), NULL) != 0) {
412
pthread_cond_destroy(&(threadpool->pool_cond));
413
pthread_mutex_destroy(&(threadpool->pool_mutex));
414
free(threadpool->single_queue);
415
free(threadpool->bulk_queue);
420
if (pthread_cond_init(&(threadpool->queueable_bulk_cond), NULL) != 0) {
421
pthread_cond_destroy(&(threadpool->queueable_single_cond));
422
pthread_cond_destroy(&(threadpool->pool_cond));
423
pthread_mutex_destroy(&(threadpool->pool_mutex));
424
free(threadpool->single_queue);
425
free(threadpool->bulk_queue);
431
if (pthread_cond_init(&(threadpool->idle_cond),NULL) != 0) {
432
pthread_cond_destroy(&(threadpool->queueable_single_cond));
433
pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
434
pthread_cond_destroy(&(threadpool->pool_cond));
435
pthread_mutex_destroy(&(threadpool->pool_mutex));
436
free(threadpool->single_queue);
437
free(threadpool->bulk_queue);
164
free(threadpool->queue);
442
169
if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
443
pthread_cond_destroy(&(threadpool->queueable_single_cond));
444
pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
445
pthread_cond_destroy(&(threadpool->idle_cond));
446
170
pthread_cond_destroy(&(threadpool->pool_cond));
447
171
pthread_mutex_destroy(&(threadpool->pool_mutex));
448
free(threadpool->single_queue);
449
free(threadpool->bulk_queue);
172
free(threadpool->queue);
450
173
free(threadpool);
454
177
if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
455
pthread_cond_destroy(&(threadpool->queueable_single_cond));
456
pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
457
178
pthread_attr_destroy(&(threadpool->pool_attr));
458
pthread_cond_destroy(&(threadpool->idle_cond));
459
179
pthread_cond_destroy(&(threadpool->pool_cond));
460
180
pthread_mutex_destroy(&(threadpool->pool_mutex));
461
free(threadpool->single_queue);
462
free(threadpool->bulk_queue);
181
free(threadpool->queue);
463
182
free(threadpool);
467
#if defined(C_BIGSTACK)
186
#if defined(C_BIGSTACK) || defined(C_BSD)
468
187
pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
469
188
stacksize = stacksize + 64 * 1024;
470
189
if (stacksize < 1048576) stacksize = 1048576; /* at least 1MB please */
471
logg("Set stacksize to %lu\n", (unsigned long int) stacksize);
190
logg("Set stacksize to %u\n", stacksize);
472
191
pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
474
193
threadpool->state = POOL_VALID;
476
add_topools(threadpool);
477
195
return threadpool;
480
static pthread_key_t stats_tls_key;
481
static pthread_once_t stats_tls_key_once = PTHREAD_ONCE_INIT;
483
static void stats_tls_key_alloc(void)
485
pthread_key_create(&stats_tls_key, NULL);
488
static const char *IDLE_TASK = "IDLE";
490
/* no mutex is needed, we are using thread local variable */
491
void thrmgr_setactivetask(const char *filename, const char* cmd)
493
struct task_desc *desc;
494
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
495
desc = pthread_getspecific(stats_tls_key);
498
desc->filename = filename;
500
if(cmd == IDLE_TASK && desc->command == cmd)
503
gettimeofday(&desc->tv, NULL);
507
void thrmgr_setactiveengine(const struct cl_engine *engine)
509
struct task_desc *desc;
510
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
511
desc = pthread_getspecific(stats_tls_key);
514
desc->engine = engine;
517
/* thread pool mutex must be held on entry */
518
static void stats_init(threadpool_t *pool)
520
struct task_desc *desc = calloc(1, sizeof(*desc));
523
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
524
pthread_setspecific(stats_tls_key, desc);
528
desc->nxt = pool->tasks;
529
pool->tasks->prv = desc;
534
/* thread pool mutex must be held on entry */
535
static void stats_destroy(threadpool_t *pool)
537
struct task_desc *desc = pthread_getspecific(stats_tls_key);
540
pthread_mutex_lock(&pools_lock);
542
desc->prv->nxt = desc->nxt;
544
desc->nxt->prv = desc->prv;
545
if(pool->tasks == desc)
546
pool->tasks = desc->nxt;
548
pthread_setspecific(stats_tls_key, NULL);
549
pthread_mutex_unlock(&pools_lock);
552
static inline int thrmgr_contended(threadpool_t *pool, int bulk)
554
/* don't allow bulk items to exceed 50% of queue, so that
555
* non-bulk items get a chance to be in the queue */
556
if (bulk && pool->bulk_queue->item_count >= pool->queue_max/2)
558
return pool->bulk_queue->item_count + pool->single_queue->item_count
559
+ pool->thr_alive - pool->thr_idle >= pool->queue_max;
562
/* when both queues have tasks, it will pick 4 items from the single queue,
563
* and 1 from the bulk */
564
#define SINGLE_BULK_RATIO 4
565
#define SINGLE_BULK_SUM (SINGLE_BULK_RATIO + 1)
567
/* must be called with pool_mutex held */
568
static void *thrmgr_pop(threadpool_t *pool)
571
work_queue_t *first, *second;
574
if (pool->single_queue->popped < SINGLE_BULK_RATIO) {
575
first = pool->single_queue;
576
second = pool->bulk_queue;
577
ratio = SINGLE_BULK_RATIO;
579
second = pool->single_queue;
580
first = pool->bulk_queue;
581
ratio = SINGLE_BULK_SUM - SINGLE_BULK_RATIO;
584
task = work_queue_pop(first);
586
if (++first->popped == ratio)
589
task = work_queue_pop(second);
591
if (++second->popped == ratio)
596
if (!thrmgr_contended(pool, 0)) {
597
logg("$THRMGR: queue (single) crossed low threshold -> signaling\n");
598
pthread_cond_signal(&pool->queueable_single_cond);
601
if (!thrmgr_contended(pool, 1)) {
602
logg("$THRMGR: queue (bulk) crossed low threshold -> signaling\n");
603
pthread_cond_signal(&pool->queueable_bulk_cond);
610
198
static void *thrmgr_worker(void *arg)
612
200
threadpool_t *threadpool = (threadpool_t *) arg;
614
int retval, must_exit = FALSE, stats_inited = FALSE;
202
int retval, must_exit = FALSE;
615
203
struct timespec timeout;
617
205
/* loop looking for work */
619
207
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
620
209
logg("!Fatal: mutex lock failed\n");
624
stats_init(threadpool);
627
thrmgr_setactiveengine(NULL);
628
thrmgr_setactivetask(NULL, IDLE_TASK);
629
212
timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
630
213
timeout.tv_nsec = 0;
631
214
threadpool->thr_idle++;
632
while (((job_data=thrmgr_pop(threadpool)) == NULL)
215
while (((job_data=work_queue_pop(threadpool->queue)) == NULL)
633
216
&& (threadpool->state != POOL_EXIT)) {
634
217
/* Sleep, awaiting wakeup */
635
pthread_cond_signal(&threadpool->idle_cond);
636
218
retval = pthread_cond_timedwait(&(threadpool->pool_cond),
637
219
&(threadpool->pool_mutex), &timeout);
638
220
if (retval == ETIMEDOUT) {
694
pthread_cond_t *queueable_cond;
697
if (threadpool->state != POOL_VALID) {
703
queue = threadpool->bulk_queue;
704
queueable_cond = &threadpool->queueable_bulk_cond;
706
queue = threadpool->single_queue;
707
queueable_cond = &threadpool->queueable_single_cond;
710
while (thrmgr_contended(threadpool, bulk)) {
711
logg("$THRMGR: contended, sleeping\n");
712
pthread_cond_wait(queueable_cond, &threadpool->pool_mutex);
713
logg("$THRMGR: contended, woken\n");
716
if (!work_queue_add(queue, user_data)) {
721
items = threadpool->single_queue->item_count + threadpool->bulk_queue->item_count;
722
if ((threadpool->thr_idle < items) &&
723
(threadpool->thr_alive < threadpool->thr_max)) {
273
if (threadpool->state != POOL_VALID) {
274
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
275
logg("!Mutex unlock failed\n");
280
if (!work_queue_add(threadpool->queue, user_data)) {
281
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
282
logg("!Mutex unlock failed\n");
288
if ((threadpool->thr_idle == 0) &&
289
(threadpool->thr_alive < threadpool->thr_max)) {
724
290
/* Start a new thread */
725
291
if (pthread_create(&thr_id, &(threadpool->pool_attr),
726
thrmgr_worker, threadpool) != 0) {
727
logg("!pthread_create failed\n");
292
thrmgr_worker, threadpool) != 0) {
293
logg("!pthread_create failed\n");
729
threadpool->thr_alive++;
295
threadpool->thr_alive++;
732
pthread_cond_signal(&(threadpool->pool_cond));
298
pthread_cond_signal(&(threadpool->pool_cond));
736
300
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
737
logg("!Mutex unlock failed\n");
301
logg("!Mutex unlock failed\n");
743
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
745
return thrmgr_dispatch_internal(threadpool, user_data, 0);
748
int thrmgr_group_dispatch(threadpool_t *threadpool, jobgroup_t *group, void *user_data, int bulk)
752
pthread_mutex_lock(&group->mutex);
754
logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
755
pthread_mutex_unlock(&group->mutex);
757
if (!(ret = thrmgr_dispatch_internal(threadpool, user_data, bulk)) && group) {
758
pthread_mutex_lock(&group->mutex);
760
logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
761
pthread_mutex_unlock(&group->mutex);
767
* 0 - this was not the last thread in the group
768
* 1 - this was last thread in group, group freed
770
int thrmgr_group_finished(jobgroup_t *group, enum thrmgr_exit exitc)
774
/* there is no group, we are obviously the last one */
777
pthread_mutex_lock(&group->mutex);
778
logg("$THRMGR: group_finished: %p, %d\n", group, group->jobs);
791
if (!--group->jobs) {
794
logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
795
if (group->jobs == 1)
796
pthread_cond_signal(&group->only);
798
pthread_mutex_unlock(&group->mutex);
800
logg("$THRMGR: group_finished: freeing %p\n", group);
801
pthread_mutex_destroy(&group->mutex);
802
pthread_cond_destroy(&group->only);
808
void thrmgr_group_waitforall(jobgroup_t *group, unsigned *ok, unsigned *error, unsigned *total)
810
int needexit = 0, needfree = 0;
811
struct timespec timeout;
812
pthread_mutex_lock(&group->mutex);
813
while (group->jobs > 1) {
814
pthread_mutex_lock(&exit_mutex);
816
pthread_mutex_unlock(&exit_mutex);
819
/* wake to check progexit */
820
timeout.tv_sec = time(NULL) + 5;
822
pthread_cond_timedwait(&group->only, &group->mutex, &timeout);
824
*ok = group->exit_ok;
825
*error = group->exit_error + needexit;
826
*total = group->exit_total;
830
logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
831
pthread_mutex_unlock(&group->mutex);
833
logg("$THRMGR: group finished freeing %p\n", group);
838
jobgroup_t *thrmgr_group_new(void)
842
group = malloc(sizeof(*group));
846
group->exit_ok = group->exit_error = group->exit_total = group->force_exit = 0;
847
if (pthread_mutex_init(&group->mutex, NULL)) {
848
logg("^Failed to initialize group mutex");
852
if (pthread_cond_init(&group->only, NULL)) {
853
logg("^Failed to initialize group cond");
854
pthread_mutex_destroy(&group->mutex);
858
logg("$THRMGR: new group: %p\n", group);
862
int thrmgr_group_need_terminate(jobgroup_t *group)
866
pthread_mutex_lock(&group->mutex);
867
ret = group->force_exit;
868
pthread_mutex_unlock(&group->mutex);
871
pthread_mutex_lock(&exit_mutex);
873
pthread_mutex_unlock(&exit_mutex);
877
void thrmgr_group_terminate(jobgroup_t *group)
880
/* we may not be the last active job, now
881
* the last active job will free resources */
882
pthread_mutex_lock(&group->mutex);
883
group->force_exit = 1;
884
pthread_mutex_unlock(&group->mutex);