~ubuntu-branches/ubuntu/feisty/clamav/feisty

« back to all changes in this revision

Viewing changes to clamd/thrmgr.c

  • Committer: Bazaar Package Importer
  • Author(s): Kees Cook
  • Date: 2007-02-20 10:33:44 UTC
  • mto: This revision was merged to the branch mainline in revision 16.
  • Revision ID: james.westby@ubuntu.com-20070220103344-zgcu2psnx9d98fpa
Tags: upstream-0.90
Import upstream version 0.90

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*
2
 
 *  Copyright (C) 2007-2009 Sourcefire, Inc.
3
 
 *
4
 
 *  Authors: Trog, Török Edvin
 
2
 *  Copyright (C) 2004 Trog <trog@clamav.net>
5
3
 *
6
4
 *  This program is free software; you can redistribute it and/or modify
7
5
 *  it under the terms of the GNU General Public License as published by
27
25
#include <pthread.h>
28
26
#include <time.h>
29
27
#include <errno.h>
30
 
#include <string.h>
31
28
 
 
29
#include "shared/memory.h"
32
30
#include "shared/output.h"
33
31
 
34
32
#include "thrmgr.h"
35
33
#include "others.h"
36
 
#include "mpool.h"
37
 
#include "server.h"
38
 
 
39
 
#ifdef HAVE_MALLINFO
40
 
#include <malloc.h>
41
 
#endif
42
 
 
43
 
/* BSD and HP-UX need a bigger stacksize than the system default */
44
 
#if defined (C_BSD) || defined (C_HPUX) || defined(C_AIX)
45
 
#define C_BIGSTACK 1
46
 
#endif
 
34
 
 
35
#define FALSE (0)
 
36
#define TRUE (1)
47
37
 
48
38
static work_queue_t *work_queue_new(void)
49
39
{
50
40
        work_queue_t *work_q;
51
 
 
52
 
        work_q = (work_queue_t *) malloc(sizeof(work_queue_t));
 
41
        
 
42
        work_q = (work_queue_t *) mmalloc(sizeof(work_queue_t));
53
43
        if (!work_q) {
54
44
                return NULL;
55
45
        }
56
 
 
 
46
        
57
47
        work_q->head = work_q->tail = NULL;
58
48
        work_q->item_count = 0;
59
 
        work_q->popped = 0;
60
49
        return work_q;
61
50
}
62
51
 
63
52
static int work_queue_add(work_queue_t *work_q, void *data)
64
53
{
65
54
        work_item_t *work_item;
66
 
 
 
55
        
67
56
        if (!work_q) {
68
57
                return FALSE;
69
58
        }
70
 
        work_item = (work_item_t *) malloc(sizeof(work_item_t));
 
59
        work_item = (work_item_t *) mmalloc(sizeof(work_item_t));
71
60
        if (!work_item) {
72
61
                return FALSE;
73
62
        }
74
 
 
 
63
        
75
64
        work_item->next = NULL;
76
65
        work_item->data = data;
77
66
        gettimeofday(&(work_item->time_queued), NULL);
78
 
 
 
67
        
79
68
        if (work_q->head == NULL) {
80
69
                work_q->head = work_q->tail = work_item;
81
70
                work_q->item_count = 1;
91
80
{
92
81
        work_item_t *work_item;
93
82
        void *data;
94
 
 
 
83
        
95
84
        if (!work_q || !work_q->head) {
96
85
                return NULL;
97
86
        }
102
91
                work_q->tail = NULL;
103
92
        }
104
93
        free(work_item);
105
 
        work_q->item_count--;
106
94
        return data;
107
95
}
108
96
 
109
 
/* Global registry of live thread pools, walked by thrmgr_printstats(). */
static struct threadpool_list {
	threadpool_t *pool;
	struct threadpool_list *nxt;
} *pools = NULL;
/* Protects 'pools' and each pool's task-descriptor list. */
static pthread_mutex_t pools_lock = PTHREAD_MUTEX_INITIALIZER;
114
 
 
115
 
/* Register a freshly created pool on the global pools list.
 * Best effort: on allocation failure the pool simply stays
 * unregistered (it won't show up in statistics). */
static void add_topools(threadpool_t *t)
{
	struct threadpool_list *node;

	node = malloc(sizeof(*node));
	if (!node) {
		logg("!Unable to add threadpool to list\n");
		return;
	}
	node->pool = t;
	/* push onto the head of the list under the global lock */
	pthread_mutex_lock(&pools_lock);
	node->nxt = pools;
	pools = node;
	pthread_mutex_unlock(&pools_lock);
}
128
 
 
129
 
/* Unregister a pool from the global pools list and free the per-task
 * statistics descriptors attached to it.
 *
 * Bug fix: the original returned with pools_lock still held when the
 * pool was not found on the list, deadlocking every later caller that
 * takes pools_lock. */
static void remove_frompools(threadpool_t *t)
{
	struct threadpool_list *l, *prev;
	struct task_desc *desc;

	pthread_mutex_lock(&pools_lock);
	prev = NULL;
	l = pools;
	while (l && l->pool != t) {
		prev = l;
		l = l->nxt;
	}
	if (!l) {
		/* not registered: nothing to do, but do release the lock */
		pthread_mutex_unlock(&pools_lock);
		return;
	}
	if (prev)
		prev->nxt = l->nxt;
	if (l == pools)
		pools = l->nxt;
	free(l);
	/* discard the pool's task descriptors */
	desc = t->tasks;
	while (desc) {
		struct task_desc *q = desc;
		desc = desc->nxt;
		free(q);
	}
	t->tasks = NULL;
	pthread_mutex_unlock(&pools_lock);
}
156
 
 
157
 
/* Print min/max/avg queue-wait statistics (seconds) for one work queue.
 * Items whose timestamp lies in the future (negative delta) are counted
 * as INVALID instead of skewing the statistics.
 *
 * Bug fixes:
 *  - the minimum was initialised with ~0UL, which converts to -1 in a
 *    signed long, so 'delta < umin' was never true and min_wait was
 *    always reported as a negative garbage value;
 *  - avg_wait divided by cnt without guarding cnt == 0 (possible when
 *    every queued timestamp is invalid). */
static void print_queue(int f, work_queue_t *queue, struct timeval *tv_now)
{
    long umin = 0, umax = 0, usum = 0;
    unsigned invalids = 0, cnt = 0;
    work_item_t *q;

    if (!queue->head)
        return;
    for (q = queue->head; q; q = q->next) {
        long delta;
        delta = tv_now->tv_usec - q->time_queued.tv_usec;
        delta += (tv_now->tv_sec - q->time_queued.tv_sec) * 1000000;
        if (delta < 0) {
            /* timestamp in the future: clock stepped backwards? */
            invalids++;
            continue;
        }
        if (delta > umax)
            umax = delta;
        if (!cnt || delta < umin)
            umin = delta;
        usum += delta;
        ++cnt;
    }
    mdprintf(f, " min_wait: %.6f max_wait: %.6f avg_wait: %.6f",
             umin / 1e6, umax / 1e6, cnt ? usum / (1e6 * cnt) : 0.0);
    if (invalids)
        mdprintf(f, " (INVALID timestamps: %u)", invalids);
    if (cnt + invalids != (unsigned)queue->item_count)
        mdprintf(f, " (ERROR: %u != %u)", cnt + invalids,
                 (unsigned)queue->item_count);
}
188
 
 
189
 
int thrmgr_printstats(int f, char term)
190
 
{
191
 
        struct threadpool_list *l;
192
 
        unsigned cnt, pool_cnt = 0;
193
 
        size_t pool_used = 0, pool_total = 0, seen_cnt = 0, error_flag = 0;
194
 
        float mem_heap = 0, mem_mmap = 0, mem_used = 0, mem_free = 0, mem_releasable = 0;
195
 
        const struct cl_engine **seen = NULL;
196
 
        int has_libc_memstats = 0;
197
 
 
198
 
        pthread_mutex_lock(&pools_lock);
199
 
        for(cnt=0,l=pools;l;l=l->nxt) cnt++;
200
 
        mdprintf(f,"POOLS: %u\n\n", cnt);
201
 
        for(l= pools;l && !error_flag;l = l->nxt) {
202
 
                threadpool_t *pool = l->pool;
203
 
                const char *state;
204
 
                struct timeval tv_now;
205
 
                struct task_desc *task;
206
 
                cnt = 0;
207
 
 
208
 
                if(!pool) {
209
 
                        mdprintf(f,"NULL\n\n");
210
 
                        continue;
211
 
                }
212
 
                /* now we can access desc->, knowing that they won't get freed
213
 
                 * because the other tasks can't quit while pool_mutex is taken
214
 
                 */
215
 
                switch(pool->state) {
216
 
                        case POOL_INVALID:
217
 
                                state = "INVALID";
218
 
                                break;
219
 
                        case POOL_VALID:
220
 
                                state = "VALID";
221
 
                                break;
222
 
                        case POOL_EXIT:
223
 
                                state = "EXIT";
224
 
                                break;
225
 
                        default:
226
 
                                state = "??";
227
 
                                break;
228
 
                }
229
 
                mdprintf(f, "STATE: %s %s\n", state, l->nxt ? "" : "PRIMARY");
230
 
                mdprintf(f, "THREADS: live %u  idle %u max %u idle-timeout %u\n"
231
 
                                ,pool->thr_alive, pool->thr_idle, pool->thr_max,
232
 
                                pool->idle_timeout);
233
 
                /* TODO: show both queues */
234
 
                mdprintf(f,"QUEUE: %u items", pool->single_queue->item_count + pool->bulk_queue->item_count);
235
 
                gettimeofday(&tv_now, NULL);
236
 
                print_queue(f, pool->bulk_queue, &tv_now);
237
 
                print_queue(f, pool->single_queue, &tv_now);
238
 
                mdprintf(f, "\n");
239
 
                for(task = pool->tasks; task; task = task->nxt) {
240
 
                        double delta;
241
 
                        size_t used, total;
242
 
 
243
 
                        delta = tv_now.tv_usec - task->tv.tv_usec;
244
 
                        delta += (tv_now.tv_sec - task->tv.tv_sec)*1000000.0;
245
 
                        mdprintf(f,"\t%s %f %s\n",
246
 
                                        task->command ? task->command : "N/A",
247
 
                                        delta/1e6,
248
 
                                        task->filename ? task->filename:"");
249
 
                        if (task->engine) {
250
 
                                /* we usually have at most 2 engines so a linear
251
 
                                 * search is good enough */
252
 
                                size_t i;
253
 
                                for (i=0;i<seen_cnt;i++) {
254
 
                                        if (seen[i] == task->engine)
255
 
                                                break;
256
 
                                }
257
 
                                /* we need to count the memusage from the same
258
 
                                 * engine only once */
259
 
                                if (i == seen_cnt) {
260
 
                                        const struct cl_engine **s;
261
 
                                        /* new engine */
262
 
                                        ++seen_cnt;
263
 
                                        s = realloc(seen, seen_cnt * sizeof(*seen));
264
 
                                        if (!s) {
265
 
                                                error_flag = 1;
266
 
                                                break;
267
 
                                        }
268
 
                                        seen = s;
269
 
                                        seen[seen_cnt - 1] = task->engine;
270
 
 
271
 
                                        if (mpool_getstats(task->engine, &used, &total) != -1) {
272
 
                                                pool_used += used;
273
 
                                                pool_total += total;
274
 
                                                pool_cnt++;
275
 
                                        }
276
 
                                }
277
 
                        }
278
 
                }
279
 
                mdprintf(f,"\n");
280
 
        }
281
 
        free(seen);
282
 
#ifdef HAVE_MALLINFO
283
 
        {
284
 
                struct mallinfo inf = mallinfo();
285
 
                mem_heap = inf.arena/(1024*1024.0);
286
 
                mem_mmap = inf.hblkhd/(1024*1024.0);
287
 
                mem_used = (inf.usmblks + inf.uordblks)/(1024*1024.0);
288
 
                mem_free = (inf.fsmblks + inf.fordblks)/(1024*1024.0);
289
 
                mem_releasable = inf.keepcost/(1024*1024.0);
290
 
                has_libc_memstats=1;
291
 
        }
292
 
#endif
293
 
        if (error_flag) {
294
 
                mdprintf(f, "ERROR: error encountered while formatting statistics\n");
295
 
        } else {
296
 
            if (has_libc_memstats)
297
 
                mdprintf(f,"MEMSTATS: heap %.3fM mmap %.3fM used %.3fM free %.3fM releasable %.3fM pools %u pools_used %.3fM pools_total %.3fM\n",
298
 
                        mem_heap, mem_mmap, mem_used, mem_free, mem_releasable, pool_cnt,
299
 
                        pool_used/(1024*1024.0), pool_total/(1024*1024.0));
300
 
            else
301
 
                mdprintf(f,"MEMSTATS: heap N/A mmap N/A used N/A free N/A releasable N/A pools %u pools_used %.3fM pools_total %.3fM\n",
302
 
                         pool_cnt, pool_used/(1024*1024.0), pool_total/(1024*1024.0));
303
 
        }
304
 
        mdprintf(f,"END%c", term);
305
 
        pthread_mutex_unlock(&pools_lock);
306
 
        return 0;
307
 
}
308
 
 
309
97
void thrmgr_destroy(threadpool_t *threadpool)
310
98
{
311
 
        if (!threadpool) {
312
 
                return;
313
 
        }
314
 
        if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
315
 
                logg("!Mutex lock failed\n");
316
 
                exit(-1);
317
 
        }
318
 
        if(threadpool->state != POOL_VALID) {
319
 
                if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
320
 
                        logg("!Mutex unlock failed\n");
321
 
                        exit(-1);
322
 
                }
323
 
                return;
 
99
        if (!threadpool || (threadpool->state != POOL_VALID)) {
 
100
                return;
 
101
        }
 
102
        if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
 
103
                logg("!Mutex lock failed\n");
 
104
                exit(-1);
324
105
        }
325
106
        threadpool->state = POOL_EXIT;
326
 
 
 
107
        
327
108
        /* wait for threads to exit */
328
109
        if (threadpool->thr_alive > 0) {
329
110
                if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
337
118
                        return;
338
119
                }
339
120
        }
340
 
        remove_frompools(threadpool);
341
 
        if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
342
 
                logg("!Mutex unlock failed\n");
343
 
                exit(-1);
344
 
        }
345
 
 
 
121
        if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
 
122
                logg("!Mutex unlock failed\n");
 
123
                exit(-1);
 
124
        }
 
125
        
346
126
        pthread_mutex_destroy(&(threadpool->pool_mutex));
347
 
        pthread_cond_destroy(&(threadpool->idle_cond));
348
 
        pthread_cond_destroy(&(threadpool->queueable_single_cond));
349
 
        pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
350
127
        pthread_cond_destroy(&(threadpool->pool_cond));
351
128
        pthread_attr_destroy(&(threadpool->pool_attr));
352
 
        free(threadpool->single_queue);
353
 
        free(threadpool->bulk_queue);
 
129
        free(threadpool->queue);
354
130
        free(threadpool);
355
131
        return;
356
132
}
357
133
 
358
 
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void (*handler)(void *))
 
134
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
359
135
{
360
136
        threadpool_t *threadpool;
361
 
#if defined(C_BIGSTACK)
 
137
#if defined(C_BIGSTACK) || defined(C_BSD)
362
138
        size_t stacksize;
363
139
#endif
364
 
 
 
140
        
365
141
        if (max_threads <= 0) {
366
142
                return NULL;
367
143
        }
368
 
 
369
 
        threadpool = (threadpool_t *) malloc(sizeof(threadpool_t));
 
144
        
 
145
        threadpool = (threadpool_t *) mmalloc(sizeof(threadpool_t));
370
146
        if (!threadpool) {
371
147
                return NULL;
372
148
        }
373
149
 
374
 
        threadpool->single_queue = work_queue_new();
375
 
        if (!threadpool->single_queue) {
376
 
                free(threadpool);
377
 
                return NULL;
378
 
        }
379
 
        threadpool->bulk_queue = work_queue_new();
380
 
        if (!threadpool->bulk_queue) {
381
 
                free(threadpool->single_queue);
382
 
                free(threadpool);
383
 
                return NULL;
384
 
        }
385
 
 
386
 
        threadpool->queue_max = max_queue;
387
 
 
 
150
        threadpool->queue = work_queue_new();
 
151
        if (!threadpool->queue) {
 
152
                free(threadpool);
 
153
                return NULL;
 
154
        }       
388
155
        threadpool->thr_max = max_threads;
389
156
        threadpool->thr_alive = 0;
390
157
        threadpool->thr_idle = 0;
391
 
        threadpool->thr_multiscan = 0;
392
158
        threadpool->idle_timeout = idle_timeout;
393
159
        threadpool->handler = handler;
394
 
        threadpool->tasks = NULL;
395
 
 
396
 
        if(pthread_mutex_init(&(threadpool->pool_mutex), NULL)) {
397
 
                free(threadpool->single_queue);
398
 
                free(threadpool->bulk_queue);
399
 
                free(threadpool);
400
 
                return NULL;
401
 
        }
402
 
 
 
160
        
 
161
        pthread_mutex_init(&(threadpool->pool_mutex), NULL);
403
162
        if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
404
163
                pthread_mutex_destroy(&(threadpool->pool_mutex));
405
 
                free(threadpool->single_queue);
406
 
                free(threadpool->bulk_queue);
407
 
                free(threadpool);
408
 
                return NULL;
409
 
        }
410
 
 
411
 
        if (pthread_cond_init(&(threadpool->queueable_single_cond), NULL) != 0) {
412
 
                pthread_cond_destroy(&(threadpool->pool_cond));
413
 
                pthread_mutex_destroy(&(threadpool->pool_mutex));
414
 
                free(threadpool->single_queue);
415
 
                free(threadpool->bulk_queue);
416
 
                free(threadpool);
417
 
                return NULL;
418
 
        }
419
 
 
420
 
        if (pthread_cond_init(&(threadpool->queueable_bulk_cond), NULL) != 0) {
421
 
                pthread_cond_destroy(&(threadpool->queueable_single_cond));
422
 
                pthread_cond_destroy(&(threadpool->pool_cond));
423
 
                pthread_mutex_destroy(&(threadpool->pool_mutex));
424
 
                free(threadpool->single_queue);
425
 
                free(threadpool->bulk_queue);
426
 
                free(threadpool);
427
 
                return NULL;
428
 
        }
429
 
 
430
 
 
431
 
        if (pthread_cond_init(&(threadpool->idle_cond),NULL) != 0)  {
432
 
                pthread_cond_destroy(&(threadpool->queueable_single_cond));
433
 
                pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
434
 
                pthread_cond_destroy(&(threadpool->pool_cond));
435
 
                pthread_mutex_destroy(&(threadpool->pool_mutex));
436
 
                free(threadpool->single_queue);
437
 
                free(threadpool->bulk_queue);
438
 
                free(threadpool);
439
 
                return NULL;
440
 
        }
441
 
 
 
164
                free(threadpool->queue);
 
165
                free(threadpool);
 
166
                return NULL;
 
167
        }
 
168
                
442
169
        if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
443
 
                pthread_cond_destroy(&(threadpool->queueable_single_cond));
444
 
                pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
445
 
                pthread_cond_destroy(&(threadpool->idle_cond));
446
170
                pthread_cond_destroy(&(threadpool->pool_cond));
447
171
                pthread_mutex_destroy(&(threadpool->pool_mutex));
448
 
                free(threadpool->single_queue);
449
 
                free(threadpool->bulk_queue);
 
172
                free(threadpool->queue);
450
173
                free(threadpool);
451
174
                return NULL;
452
175
        }
453
 
 
 
176
        
454
177
        if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
455
 
                pthread_cond_destroy(&(threadpool->queueable_single_cond));
456
 
                pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
457
178
                pthread_attr_destroy(&(threadpool->pool_attr));
458
 
                pthread_cond_destroy(&(threadpool->idle_cond));
459
179
                pthread_cond_destroy(&(threadpool->pool_cond));
460
180
                pthread_mutex_destroy(&(threadpool->pool_mutex));
461
 
                free(threadpool->single_queue);
462
 
                free(threadpool->bulk_queue);
 
181
                free(threadpool->queue);
463
182
                free(threadpool);
464
183
                return NULL;
465
184
        }
466
185
 
467
 
#if defined(C_BIGSTACK)
 
186
#if defined(C_BIGSTACK) || defined(C_BSD)
468
187
        pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
469
188
        stacksize = stacksize + 64 * 1024;
470
189
        if (stacksize < 1048576) stacksize = 1048576; /* at least 1MB please */
471
 
        logg("Set stacksize to %lu\n", (unsigned long int) stacksize);
 
190
        logg("Set stacksize to %u\n", stacksize);
472
191
        pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
473
192
#endif
474
193
        threadpool->state = POOL_VALID;
475
194
 
476
 
        add_topools(threadpool);
477
195
        return threadpool;
478
196
}
479
197
 
480
 
/* TLS key holding each worker thread's task_desc statistics entry. */
static pthread_key_t stats_tls_key;
static pthread_once_t stats_tls_key_once = PTHREAD_ONCE_INIT;

/* pthread_once initialiser for the stats TLS key.  No destructor is
 * registered: descriptors are freed explicitly by stats_destroy(). */
static void stats_tls_key_alloc(void)
{
	pthread_key_create(&stats_tls_key, NULL);
}

/* Sentinel command string for an idle worker; compared by pointer
 * identity in thrmgr_setactivetask(). */
static const char *IDLE_TASK = "IDLE";
489
 
 
490
 
/* no mutex is needed, we are using  thread local variable */
491
 
void thrmgr_setactivetask(const char *filename, const char* cmd)
492
 
{
493
 
        struct task_desc *desc;
494
 
        pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
495
 
        desc = pthread_getspecific(stats_tls_key);
496
 
        if(!desc)
497
 
                return;
498
 
        desc->filename = filename;
499
 
        if(cmd) {
500
 
                if(cmd == IDLE_TASK && desc->command == cmd)
501
 
                        return;
502
 
                desc->command = cmd;
503
 
                gettimeofday(&desc->tv, NULL);
504
 
        }
505
 
}
506
 
 
507
 
void thrmgr_setactiveengine(const struct cl_engine *engine)
508
 
{
509
 
        struct task_desc *desc;
510
 
        pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
511
 
        desc = pthread_getspecific(stats_tls_key);
512
 
        if(!desc)
513
 
                return;
514
 
        desc->engine = engine;
515
 
}
516
 
 
517
 
/* thread pool mutex must be held on entry */
518
 
static void stats_init(threadpool_t *pool)
519
 
{
520
 
        struct task_desc *desc = calloc(1, sizeof(*desc));
521
 
        if(!desc)
522
 
                return;
523
 
        pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
524
 
        pthread_setspecific(stats_tls_key, desc);
525
 
        if(!pool->tasks)
526
 
                pool->tasks = desc;
527
 
        else {
528
 
                desc->nxt = pool->tasks;
529
 
                pool->tasks->prv = desc;
530
 
                pool->tasks = desc;
531
 
        }
532
 
}
533
 
 
534
 
/* thread pool mutex must be held on entry */
535
 
static void stats_destroy(threadpool_t *pool)
536
 
{
537
 
        struct task_desc *desc = pthread_getspecific(stats_tls_key);
538
 
        if(!desc)
539
 
                return;
540
 
        pthread_mutex_lock(&pools_lock);
541
 
        if(desc->prv)
542
 
                desc->prv->nxt = desc->nxt;
543
 
        if(desc->nxt)
544
 
                desc->nxt->prv = desc->prv;
545
 
        if(pool->tasks == desc)
546
 
                pool->tasks = desc->nxt;
547
 
        free(desc);
548
 
        pthread_setspecific(stats_tls_key, NULL);
549
 
        pthread_mutex_unlock(&pools_lock);
550
 
}
551
 
 
552
 
/* Nonzero when the pool cannot accept another item of this class.
 * Bulk items may occupy at most half of queue_max so that single
 * (interactive) items always have room to queue. */
static inline int thrmgr_contended(threadpool_t *pool, int bulk)
{
    int busy;

    if (bulk && pool->bulk_queue->item_count >= pool->queue_max/2)
        return 1;
    /* queued items plus busy threads count against the overall cap */
    busy = pool->bulk_queue->item_count + pool->single_queue->item_count
        + pool->thr_alive - pool->thr_idle;
    return busy >= pool->queue_max;
}
561
 
 
562
 
/* when both queues have tasks, it will pick 4 items from the single queue,
 * and 1 from the bulk */
#define SINGLE_BULK_RATIO 4
#define SINGLE_BULK_SUM (SINGLE_BULK_RATIO + 1)

/* must be called with pool_mutex held */
static void *thrmgr_pop(threadpool_t *pool)
{
    void *task;
    work_queue_t *first, *second;
    int ratio;

    /* prefer the single queue until it has supplied SINGLE_BULK_RATIO
     * items in a row, then give the bulk queue one turn */
    if (pool->single_queue->popped < SINGLE_BULK_RATIO) {
        first = pool->single_queue;
        second = pool->bulk_queue;
        ratio = SINGLE_BULK_RATIO;
    } else {
        second = pool->single_queue;
        first = pool->bulk_queue;
        ratio = SINGLE_BULK_SUM - SINGLE_BULK_RATIO;
    }

    task = work_queue_pop(first);
    if (task) {
        /* completing a full round resets the other queue's counter */
        if (++first->popped == ratio)
            second->popped = 0;
    } else {
        /* preferred queue empty: fall back to the other one */
        task = work_queue_pop(second);
        if (task) {
            if (++second->popped == ratio)
                first->popped = 0;
        }
    }

    /* wake any dispatcher blocked on a queue that is no longer full */
    if (!thrmgr_contended(pool, 0)) {
        logg("$THRMGR: queue (single) crossed low threshold -> signaling\n");
        pthread_cond_signal(&pool->queueable_single_cond);
    }

    if (!thrmgr_contended(pool, 1)) {
        logg("$THRMGR: queue (bulk) crossed low threshold -> signaling\n");
        pthread_cond_signal(&pool->queueable_bulk_cond);
    }

    return task;
}
608
 
 
609
 
 
610
198
static void *thrmgr_worker(void *arg)
611
199
{
612
200
        threadpool_t *threadpool = (threadpool_t *) arg;
613
201
        void *job_data;
614
 
        int retval, must_exit = FALSE, stats_inited = FALSE;
 
202
        int retval, must_exit = FALSE;
615
203
        struct timespec timeout;
616
 
 
 
204
        
617
205
        /* loop looking for work */
618
206
        for (;;) {
619
207
                if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
 
208
                        /* Fatal error */
620
209
                        logg("!Fatal: mutex lock failed\n");
621
210
                        exit(-2);
622
211
                }
623
 
                if(!stats_inited) {
624
 
                        stats_init(threadpool);
625
 
                        stats_inited = TRUE;
626
 
                }
627
 
                thrmgr_setactiveengine(NULL);
628
 
                thrmgr_setactivetask(NULL, IDLE_TASK);
629
212
                timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
630
213
                timeout.tv_nsec = 0;
631
214
                threadpool->thr_idle++;
632
 
                while (((job_data=thrmgr_pop(threadpool)) == NULL)
 
215
                while (((job_data=work_queue_pop(threadpool->queue)) == NULL)
633
216
                                && (threadpool->state != POOL_EXIT)) {
634
217
                        /* Sleep, awaiting wakeup */
635
 
                        pthread_cond_signal(&threadpool->idle_cond);
636
218
                        retval = pthread_cond_timedwait(&(threadpool->pool_cond),
637
219
                                &(threadpool->pool_mutex), &timeout);
638
220
                        if (retval == ETIMEDOUT) {
644
226
                if (threadpool->state == POOL_EXIT) {
645
227
                        must_exit = TRUE;
646
228
                }
647
 
 
 
229
                
648
230
                if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
 
231
                        /* Fatal error */
649
232
                        logg("!Fatal: mutex unlock failed\n");
650
233
                        exit(-2);
651
234
                }
665
248
                /* signal that all threads are finished */
666
249
                pthread_cond_broadcast(&threadpool->pool_cond);
667
250
        }
668
 
        stats_destroy(threadpool);
669
251
        if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
670
252
                /* Fatal error */
671
253
                logg("!Fatal: mutex unlock failed\n");
674
256
        return NULL;
675
257
}
676
258
 
677
 
static int thrmgr_dispatch_internal(threadpool_t *threadpool, void *user_data, int bulk)
 
259
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
678
260
{
679
 
        int ret = TRUE;
680
261
        pthread_t thr_id;
681
262
 
682
263
        if (!threadpool) {
689
270
                return FALSE;
690
271
        }
691
272
 
692
 
        do {
693
 
            work_queue_t *queue;
694
 
            pthread_cond_t *queueable_cond;
695
 
            int items;
696
 
 
697
 
            if (threadpool->state != POOL_VALID) {
698
 
                ret = FALSE;
699
 
                break;
700
 
            }
701
 
 
702
 
            if (bulk) {
703
 
                queue = threadpool->bulk_queue;
704
 
                queueable_cond = &threadpool->queueable_bulk_cond;
705
 
            } else {
706
 
                queue = threadpool->single_queue;
707
 
                queueable_cond = &threadpool->queueable_single_cond;
708
 
            }
709
 
 
710
 
            while (thrmgr_contended(threadpool, bulk)) {
711
 
                logg("$THRMGR: contended, sleeping\n");
712
 
                pthread_cond_wait(queueable_cond, &threadpool->pool_mutex);
713
 
                logg("$THRMGR: contended, woken\n");
714
 
            }
715
 
 
716
 
            if (!work_queue_add(queue, user_data)) {
717
 
                ret = FALSE;
718
 
                break;
719
 
            }
720
 
 
721
 
            items = threadpool->single_queue->item_count + threadpool->bulk_queue->item_count;
722
 
            if ((threadpool->thr_idle < items) &&
723
 
                (threadpool->thr_alive < threadpool->thr_max)) {
 
273
        if (threadpool->state != POOL_VALID) {
 
274
                if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
 
275
                        logg("!Mutex unlock failed\n");
 
276
                        return FALSE;
 
277
                }
 
278
                return FALSE;
 
279
        }
 
280
        if (!work_queue_add(threadpool->queue, user_data)) {
 
281
                if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
 
282
                        logg("!Mutex unlock failed\n");
 
283
                        return FALSE;
 
284
                }
 
285
                return FALSE;
 
286
        }
 
287
 
 
288
        if ((threadpool->thr_idle == 0) &&
 
289
                        (threadpool->thr_alive < threadpool->thr_max)) {
724
290
                /* Start a new thread */
725
291
                if (pthread_create(&thr_id, &(threadpool->pool_attr),
726
 
                                   thrmgr_worker, threadpool) != 0) {
727
 
                    logg("!pthread_create failed\n");
 
292
                                thrmgr_worker, threadpool) != 0) {
 
293
                        logg("!pthread_create failed\n");
728
294
                } else {
729
 
                    threadpool->thr_alive++;
 
295
                        threadpool->thr_alive++;
730
296
                }
731
 
            }
732
 
            pthread_cond_signal(&(threadpool->pool_cond));
733
 
 
734
 
        } while (0);
 
297
        }
 
298
        pthread_cond_signal(&(threadpool->pool_cond));
735
299
 
736
300
        if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
737
 
            logg("!Mutex unlock failed\n");
738
 
            return FALSE;
 
301
                logg("!Mutex unlock failed\n");
 
302
                return FALSE;
739
303
        }
740
 
        return ret;
741
 
}
742
 
 
743
 
/* Queue user_data on the single (non-bulk) queue; see
 * thrmgr_dispatch_internal for blocking/contention behaviour. */
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
{
    return thrmgr_dispatch_internal(threadpool, user_data, 0);
}
747
 
 
748
 
/* Dispatch user_data as part of a job group.  The group's job count is
 * incremented up front and rolled back if the dispatch fails, so the
 * count never undercounts a job that actually ran. */
int thrmgr_group_dispatch(threadpool_t *threadpool, jobgroup_t *group, void *user_data, int bulk)
{
    int ret;

    if (group) {
        pthread_mutex_lock(&group->mutex);
        group->jobs++;
        logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
        pthread_mutex_unlock(&group->mutex);
    }
    ret = thrmgr_dispatch_internal(threadpool, user_data, bulk);
    if (!ret && group) {
        /* dispatch failed: undo the provisional job count */
        pthread_mutex_lock(&group->mutex);
        group->jobs--;
        logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
        pthread_mutex_unlock(&group->mutex);
    }
    return ret;
}
765
 
 
766
 
/* returns
767
 
 *   0 - this was not the last thread in the group
768
 
 *   1 - this was last thread in group, group freed
769
 
 */
770
 
int thrmgr_group_finished(jobgroup_t *group, enum thrmgr_exit exitc)
771
 
{
772
 
    int ret = 0;
773
 
    if (!group) {
774
 
        /* there is no group, we are obviously the last one */
775
 
        return 1;
776
 
    }
777
 
    pthread_mutex_lock(&group->mutex);
778
 
    logg("$THRMGR: group_finished: %p, %d\n", group, group->jobs);
779
 
    group->exit_total++;
780
 
    switch (exitc) {
781
 
        case EXIT_OK:
782
 
            group->exit_ok++;
783
 
            break;
784
 
        case EXIT_ERROR:
785
 
            group->exit_error++;
786
 
            break;
787
 
        default:
788
 
            break;
789
 
    }
790
 
    if (group->jobs) {
791
 
        if (!--group->jobs) {
792
 
            ret = 1;
793
 
        } else
794
 
            logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
795
 
        if (group->jobs == 1)
796
 
            pthread_cond_signal(&group->only);
797
 
    }
798
 
    pthread_mutex_unlock(&group->mutex);
799
 
    if (ret) {
800
 
        logg("$THRMGR: group_finished: freeing %p\n", group);
801
 
        pthread_mutex_destroy(&group->mutex);
802
 
        pthread_cond_destroy(&group->only);
803
 
        free(group);
804
 
    }
805
 
    return ret;
806
 
}
807
 
 
808
 
/* Block until every other job in the group has finished (or the program is
 * exiting), then report the accumulated exit counters through ok/error/total.
 * Drops the caller's own reference; frees the group if it was the last one. */
void thrmgr_group_waitforall(jobgroup_t *group, unsigned *ok, unsigned *error, unsigned *total)
{
	int exiting = 0, lastref = 0;
	struct timespec deadline;

	pthread_mutex_lock(&group->mutex);
	for (;;) {
		if (group->jobs <= 1)
			break;
		pthread_mutex_lock(&exit_mutex);
		exiting = progexit;
		pthread_mutex_unlock(&exit_mutex);
		if (exiting)
			break;
		/* wake to check progexit */
		deadline.tv_sec = time(NULL) + 5;
		deadline.tv_nsec = 0;
		pthread_cond_timedwait(&group->only, &group->mutex, &deadline);
	}

	*ok = group->exit_ok;
	*error = group->exit_error + exiting;
	*total = group->exit_total;

	/* release our own reference on the group */
	group->jobs--;
	if (!group->jobs)
		lastref = 1;
	else
		logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
	pthread_mutex_unlock(&group->mutex);

	if (lastref) {
		logg("$THRMGR: group finished freeing %p\n", group);
		free(group);
	}
}
837
 
 
838
 
jobgroup_t *thrmgr_group_new(void)
839
 
{
840
 
    jobgroup_t *group;
841
 
 
842
 
    group = malloc(sizeof(*group));
843
 
    if (!group)
844
 
        return NULL;
845
 
    group->jobs = 1;
846
 
    group->exit_ok = group->exit_error = group->exit_total = group->force_exit = 0;
847
 
    if (pthread_mutex_init(&group->mutex, NULL)) {
848
 
        logg("^Failed to initialize group mutex");
849
 
        free(group);
850
 
        return NULL;
851
 
    }
852
 
    if (pthread_cond_init(&group->only, NULL)) {
853
 
        logg("^Failed to initialize group cond");
854
 
        pthread_mutex_destroy(&group->mutex);
855
 
        free(group);
856
 
        return NULL;
857
 
    }
858
 
    logg("$THRMGR: new group: %p\n", group);
859
 
    return group;
860
 
}
861
 
 
862
 
/* Return nonzero if this job should stop: either its group was told to
 * terminate, or the whole program is exiting (progexit). */
int thrmgr_group_need_terminate(jobgroup_t *group)
{
	int stop = 0;

	if (group) {
		pthread_mutex_lock(&group->mutex);
		stop = group->force_exit;
		pthread_mutex_unlock(&group->mutex);
	}

	/* global shutdown overrides everything */
	pthread_mutex_lock(&exit_mutex);
	stop |= progexit;
	pthread_mutex_unlock(&exit_mutex);

	return stop;
}
876
 
 
877
 
/* Ask every job in the group to stop. Only raises the flag:
 * we may not be the last active job now — the last active job
 * will free the group's resources. */
void thrmgr_group_terminate(jobgroup_t *group)
{
	if (!group)
		return;
	pthread_mutex_lock(&group->mutex);
	group->force_exit = 1;
	pthread_mutex_unlock(&group->mutex);
}
 
304
        return TRUE;
886
305
}