~ubuntu-branches/ubuntu/trusty/apr-util/trusty

« back to all changes in this revision

Viewing changes to misc/apr_thread_pool.c

  • Committer: Bazaar Package Importer
  • Author(s): Ryan Niebur
  • Date: 2009-03-26 22:25:48 UTC
  • mto: (4.1.1 squeeze) (20.1.2 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20090326222548-v103269kb84vo0ub
Tags: upstream-1.3.4+dfsg
ImportĀ upstreamĀ versionĀ 1.3.4+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
 
3
 * contributor license agreements.  See the NOTICE file distributed
 
4
 * with this work for additional information regarding copyright
 
5
 * ownership.  The ASF licenses this file to you under the Apache
 
6
 * License, Version 2.0 (the "License"); you may not use this file
 
7
 * except in compliance with the License.  You may obtain a copy of
 
8
 * the License at
 
9
 *
 
10
 *     http://www.apache.org/licenses/LICENSE-2.0
 
11
 *
 
12
 * Unless required by applicable law or agreed to in writing, software
 
13
 * distributed under the License is distributed on an "AS IS" BASIS,
 
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
15
 * implied.  See the License for the specific language governing
 
16
 * permissions and limitations under the License.
 
17
 */
 
18
 
 
19
#include <assert.h>
 
20
#include "apr_thread_pool.h"
 
21
#include "apr_ring.h"
 
22
#include "apr_thread_cond.h"
 
23
#include "apr_portable.h"
 
24
 
 
25
#if APR_HAS_THREADS
 
26
 
 
27
#define TASK_PRIORITY_SEGS 4
 
28
#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
 
29
 
 
30
typedef struct apr_thread_pool_task
 
31
{
 
32
    APR_RING_ENTRY(apr_thread_pool_task) link;
 
33
    apr_thread_start_t func;
 
34
    void *param;
 
35
    void *owner;
 
36
    union
 
37
    {
 
38
        apr_byte_t priority;
 
39
        apr_time_t time;
 
40
    } dispatch;
 
41
} apr_thread_pool_task_t;
 
42
 
 
43
APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
 
44
 
 
45
struct apr_thread_list_elt
 
46
{
 
47
    APR_RING_ENTRY(apr_thread_list_elt) link;
 
48
    apr_thread_t *thd;
 
49
    volatile void *current_owner;
 
50
    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
 
51
};
 
52
 
 
53
APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
 
54
 
 
55
struct apr_thread_pool
 
56
{
 
57
    apr_pool_t *pool;
 
58
    volatile apr_size_t thd_max;
 
59
    volatile apr_size_t idle_max;
 
60
    volatile apr_interval_time_t idle_wait;
 
61
    volatile apr_size_t thd_cnt;
 
62
    volatile apr_size_t idle_cnt;
 
63
    volatile apr_size_t task_cnt;
 
64
    volatile apr_size_t scheduled_task_cnt;
 
65
    volatile apr_size_t threshold;
 
66
    volatile apr_size_t tasks_run;
 
67
    volatile apr_size_t tasks_high;
 
68
    volatile apr_size_t thd_high;
 
69
    volatile apr_size_t thd_timed_out;
 
70
    struct apr_thread_pool_tasks *tasks;
 
71
    struct apr_thread_pool_tasks *scheduled_tasks;
 
72
    struct apr_thread_list *busy_thds;
 
73
    struct apr_thread_list *idle_thds;
 
74
    apr_thread_mutex_t *lock;
 
75
    apr_thread_mutex_t *cond_lock;
 
76
    apr_thread_cond_t *cond;
 
77
    volatile int terminated;
 
78
    struct apr_thread_pool_tasks *recycled_tasks;
 
79
    struct apr_thread_list *recycled_thds;
 
80
    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
 
81
};
 
82
 
 
83
static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
 
84
                                          apr_size_t init_threads,
 
85
                                          apr_size_t max_threads)
 
86
{
 
87
    apr_status_t rv;
 
88
    int i;
 
89
 
 
90
    me->thd_max = max_threads;
 
91
    me->idle_max = init_threads;
 
92
    me->threshold = init_threads / 2;
 
93
    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
 
94
                                 me->pool);
 
95
    if (APR_SUCCESS != rv) {
 
96
        return rv;
 
97
    }
 
98
    rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED,
 
99
                                 me->pool);
 
100
    if (APR_SUCCESS != rv) {
 
101
        apr_thread_mutex_destroy(me->lock);
 
102
        return rv;
 
103
    }
 
104
    rv = apr_thread_cond_create(&me->cond, me->pool);
 
105
    if (APR_SUCCESS != rv) {
 
106
        apr_thread_mutex_destroy(me->lock);
 
107
        apr_thread_mutex_destroy(me->cond_lock);
 
108
        return rv;
 
109
    }
 
110
    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
 
111
    if (!me->tasks) {
 
112
        goto CATCH_ENOMEM;
 
113
    }
 
114
    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
 
115
    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
 
116
    if (!me->scheduled_tasks) {
 
117
        goto CATCH_ENOMEM;
 
118
    }
 
119
    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
 
120
    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
 
121
    if (!me->recycled_tasks) {
 
122
        goto CATCH_ENOMEM;
 
123
    }
 
124
    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
 
125
    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
 
126
    if (!me->busy_thds) {
 
127
        goto CATCH_ENOMEM;
 
128
    }
 
129
    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
 
130
    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
 
131
    if (!me->idle_thds) {
 
132
        goto CATCH_ENOMEM;
 
133
    }
 
134
    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
 
135
    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
 
136
    if (!me->recycled_thds) {
 
137
        goto CATCH_ENOMEM;
 
138
    }
 
139
    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
 
140
    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
 
141
    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
 
142
    me->idle_wait = 0;
 
143
    me->terminated = 0;
 
144
    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
 
145
        me->task_idx[i] = NULL;
 
146
    }
 
147
    goto FINAL_EXIT;
 
148
  CATCH_ENOMEM:
 
149
    rv = APR_ENOMEM;
 
150
    apr_thread_mutex_destroy(me->lock);
 
151
    apr_thread_mutex_destroy(me->cond_lock);
 
152
    apr_thread_cond_destroy(me->cond);
 
153
  FINAL_EXIT:
 
154
    return rv;
 
155
}
 
156
 
 
157
/*
 
158
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 
159
 */
 
160
static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
 
161
{
 
162
    apr_thread_pool_task_t *task = NULL;
 
163
    int seg;
 
164
 
 
165
    /* check for scheduled tasks */
 
166
    if (me->scheduled_task_cnt > 0) {
 
167
        task = APR_RING_FIRST(me->scheduled_tasks);
 
168
        assert(task != NULL);
 
169
        assert(task !=
 
170
               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
 
171
                                 link));
 
172
        /* if it's time */
 
173
        if (task->dispatch.time <= apr_time_now()) {
 
174
            --me->scheduled_task_cnt;
 
175
            APR_RING_REMOVE(task, link);
 
176
            return task;
 
177
        }
 
178
    }
 
179
    /* check for normal tasks if we're not returning a scheduled task */
 
180
    if (me->task_cnt == 0) {
 
181
        return NULL;
 
182
    }
 
183
 
 
184
    task = APR_RING_FIRST(me->tasks);
 
185
    assert(task != NULL);
 
186
    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
 
187
    --me->task_cnt;
 
188
    seg = TASK_PRIORITY_SEG(task);
 
189
    if (task == me->task_idx[seg]) {
 
190
        me->task_idx[seg] = APR_RING_NEXT(task, link);
 
191
        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
 
192
                                                   apr_thread_pool_task, link)
 
193
            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
 
194
            me->task_idx[seg] = NULL;
 
195
        }
 
196
    }
 
197
    APR_RING_REMOVE(task, link);
 
198
    return task;
 
199
}
 
200
 
 
201
static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
 
202
{
 
203
    apr_thread_pool_task_t *task = NULL;
 
204
 
 
205
    task = APR_RING_FIRST(me->scheduled_tasks);
 
206
    assert(task != NULL);
 
207
    assert(task !=
 
208
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
 
209
                             link));
 
210
    return task->dispatch.time - apr_time_now();
 
211
}
 
212
 
 
213
/*
 
214
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 
215
 */
 
216
static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
 
217
                                           apr_thread_t * t)
 
218
{
 
219
    struct apr_thread_list_elt *elt;
 
220
 
 
221
    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
 
222
        elt = apr_pcalloc(me->pool, sizeof(*elt));
 
223
        if (NULL == elt) {
 
224
            return NULL;
 
225
        }
 
226
    }
 
227
    else {
 
228
        elt = APR_RING_FIRST(me->recycled_thds);
 
229
        APR_RING_REMOVE(elt, link);
 
230
    }
 
231
 
 
232
    APR_RING_ELEM_INIT(elt, link);
 
233
    elt->thd = t;
 
234
    elt->current_owner = NULL;
 
235
    elt->state = TH_RUN;
 
236
    return elt;
 
237
}
 
238
 
 
239
/*
 
240
 * The worker thread function. Take a task from the queue and perform it if
 
241
 * there is any. Otherwise, put itself into the idle thread list and waiting
 
242
 * for signal to wake up.
 
243
 * The thread terminate directly by detach and exit when it is asked to stop
 
244
 * after finishing a task. Otherwise, the thread should be in idle thread list
 
245
 * and should be joined.
 
246
 */
 
247
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
 
248
{
 
249
    apr_status_t rv = APR_SUCCESS;
 
250
    apr_thread_pool_t *me = param;
 
251
    apr_thread_pool_task_t *task = NULL;
 
252
    apr_interval_time_t wait;
 
253
    struct apr_thread_list_elt *elt;
 
254
 
 
255
    apr_thread_mutex_lock(me->lock);
 
256
    elt = elt_new(me, t);
 
257
    if (!elt) {
 
258
        apr_thread_mutex_unlock(me->lock);
 
259
        apr_thread_exit(t, APR_ENOMEM);
 
260
    }
 
261
 
 
262
    while (!me->terminated && elt->state != TH_STOP) {
 
263
        /* Test if not new element, it is awakened from idle */
 
264
        if (APR_RING_NEXT(elt, link) != elt) {
 
265
            --me->idle_cnt;
 
266
            APR_RING_REMOVE(elt, link);
 
267
        }
 
268
 
 
269
        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
 
270
        task = pop_task(me);
 
271
        while (NULL != task && !me->terminated) {
 
272
            ++me->tasks_run;
 
273
            elt->current_owner = task->owner;
 
274
            apr_thread_mutex_unlock(me->lock);
 
275
            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
 
276
            task->func(t, task->param);
 
277
            apr_thread_mutex_lock(me->lock);
 
278
            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
 
279
                                 apr_thread_pool_task, link);
 
280
            elt->current_owner = NULL;
 
281
            if (TH_STOP == elt->state) {
 
282
                break;
 
283
            }
 
284
            task = pop_task(me);
 
285
        }
 
286
        assert(NULL == elt->current_owner);
 
287
        if (TH_STOP != elt->state)
 
288
            APR_RING_REMOVE(elt, link);
 
289
 
 
290
        /* Test if a busy thread been asked to stop, which is not joinable */
 
291
        if ((me->idle_cnt >= me->idle_max
 
292
             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
 
293
             && !me->idle_wait)
 
294
            || me->terminated || elt->state != TH_RUN) {
 
295
            --me->thd_cnt;
 
296
            if ((TH_PROBATION == elt->state) && me->idle_wait)
 
297
                ++me->thd_timed_out;
 
298
            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
 
299
                                 apr_thread_list_elt, link);
 
300
            apr_thread_mutex_unlock(me->lock);
 
301
            apr_thread_detach(t);
 
302
            apr_thread_exit(t, APR_SUCCESS);
 
303
            return NULL;        /* should not be here, safe net */
 
304
        }
 
305
 
 
306
        /* busy thread become idle */
 
307
        ++me->idle_cnt;
 
308
        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
 
309
 
 
310
        /* 
 
311
         * If there is a scheduled task, always scheduled to perform that task.
 
312
         * Since there is no guarantee that current idle threads are scheduled
 
313
         * for next scheduled task.
 
314
         */
 
315
        if (me->scheduled_task_cnt)
 
316
            wait = waiting_time(me);
 
317
        else if (me->idle_cnt > me->idle_max) {
 
318
            wait = me->idle_wait;
 
319
            elt->state = TH_PROBATION;
 
320
        }
 
321
        else
 
322
            wait = -1;
 
323
 
 
324
        apr_thread_mutex_unlock(me->lock);
 
325
        apr_thread_mutex_lock(me->cond_lock);
 
326
        if (wait >= 0) {
 
327
            rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait);
 
328
        }
 
329
        else {
 
330
            rv = apr_thread_cond_wait(me->cond, me->cond_lock);
 
331
        }
 
332
        apr_thread_mutex_unlock(me->cond_lock);
 
333
        apr_thread_mutex_lock(me->lock);
 
334
    }
 
335
 
 
336
    /* idle thread been asked to stop, will be joined */
 
337
    --me->thd_cnt;
 
338
    apr_thread_mutex_unlock(me->lock);
 
339
    apr_thread_exit(t, APR_SUCCESS);
 
340
    return NULL;                /* should not be here, safe net */
 
341
}
 
342
 
 
343
static apr_status_t thread_pool_cleanup(void *me)
 
344
{
 
345
    apr_thread_pool_t *_self = me;
 
346
 
 
347
    _self->terminated = 1;
 
348
    apr_thread_pool_idle_max_set(_self, 0);
 
349
    while (_self->thd_cnt) {
 
350
        apr_sleep(20 * 1000);   /* spin lock with 20 ms */
 
351
    }
 
352
    apr_thread_mutex_destroy(_self->lock);
 
353
    apr_thread_mutex_destroy(_self->cond_lock);
 
354
    apr_thread_cond_destroy(_self->cond);
 
355
    return APR_SUCCESS;
 
356
}
 
357
 
 
358
APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
 
359
                                                 apr_size_t init_threads,
 
360
                                                 apr_size_t max_threads,
 
361
                                                 apr_pool_t * pool)
 
362
{
 
363
    apr_thread_t *t;
 
364
    apr_status_t rv = APR_SUCCESS;
 
365
 
 
366
    *me = apr_pcalloc(pool, sizeof(**me));
 
367
    if (!*me) {
 
368
        return APR_ENOMEM;
 
369
    }
 
370
 
 
371
    (*me)->pool = pool;
 
372
 
 
373
    rv = thread_pool_construct(*me, init_threads, max_threads);
 
374
    if (APR_SUCCESS != rv) {
 
375
        *me = NULL;
 
376
        return rv;
 
377
    }
 
378
    apr_pool_cleanup_register(pool, *me, thread_pool_cleanup,
 
379
                              apr_pool_cleanup_null);
 
380
 
 
381
    while (init_threads) {
 
382
        rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool);
 
383
        if (APR_SUCCESS != rv) {
 
384
            break;
 
385
        }
 
386
        ++(*me)->thd_cnt;
 
387
        if ((*me)->thd_cnt > (*me)->thd_high)
 
388
            (*me)->thd_high = (*me)->thd_cnt;
 
389
        --init_threads;
 
390
    }
 
391
 
 
392
    return rv;
 
393
}
 
394
 
 
395
APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
 
396
{
 
397
    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
 
398
}
 
399
 
 
400
/*
 
401
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 
402
 */
 
403
static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
 
404
                                        apr_thread_start_t func,
 
405
                                        void *param, apr_byte_t priority,
 
406
                                        void *owner, apr_time_t time)
 
407
{
 
408
    apr_thread_pool_task_t *t;
 
409
 
 
410
    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
 
411
        t = apr_pcalloc(me->pool, sizeof(*t));
 
412
        if (NULL == t) {
 
413
            return NULL;
 
414
        }
 
415
    }
 
416
    else {
 
417
        t = APR_RING_FIRST(me->recycled_tasks);
 
418
        APR_RING_REMOVE(t, link);
 
419
    }
 
420
 
 
421
    APR_RING_ELEM_INIT(t, link);
 
422
    t->func = func;
 
423
    t->param = param;
 
424
    t->owner = owner;
 
425
    if (time > 0) {
 
426
        t->dispatch.time = apr_time_now() + time;
 
427
    }
 
428
    else {
 
429
        t->dispatch.priority = priority;
 
430
    }
 
431
    return t;
 
432
}
 
433
 
 
434
/*
 
435
 * Test it the task is the only one within the priority segment. 
 
436
 * If it is not, return the first element with same or lower priority. 
 
437
 * Otherwise, add the task into the queue and return NULL.
 
438
 *
 
439
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 
440
 */
 
441
static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
 
442
                                            apr_thread_pool_task_t * const t)
 
443
{
 
444
    int seg;
 
445
    int next;
 
446
    apr_thread_pool_task_t *t_next;
 
447
 
 
448
    seg = TASK_PRIORITY_SEG(t);
 
449
    if (me->task_idx[seg]) {
 
450
        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
 
451
               me->task_idx[seg]);
 
452
        t_next = me->task_idx[seg];
 
453
        while (t_next->dispatch.priority > t->dispatch.priority) {
 
454
            t_next = APR_RING_NEXT(t_next, link);
 
455
            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
 
456
                t_next) {
 
457
                return t_next;
 
458
            }
 
459
        }
 
460
        return t_next;
 
461
    }
 
462
 
 
463
    for (next = seg - 1; next >= 0; next--) {
 
464
        if (me->task_idx[next]) {
 
465
            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
 
466
            break;
 
467
        }
 
468
    }
 
469
    if (0 > next) {
 
470
        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
 
471
    }
 
472
    me->task_idx[seg] = t;
 
473
    return NULL;
 
474
}
 
475
 
 
476
/*
 
477
*   schedule a task to run in "time" microseconds. Find the spot in the ring where
 
478
*   the time fits. Adjust the short_time so the thread wakes up when the time is reached.
 
479
*/
 
480
static apr_status_t schedule_task(apr_thread_pool_t *me,
 
481
                                  apr_thread_start_t func, void *param,
 
482
                                  void *owner, apr_interval_time_t time)
 
483
{
 
484
    apr_thread_pool_task_t *t;
 
485
    apr_thread_pool_task_t *t_loc;
 
486
    apr_thread_t *thd;
 
487
    apr_status_t rv = APR_SUCCESS;
 
488
    apr_thread_mutex_lock(me->lock);
 
489
 
 
490
    t = task_new(me, func, param, 0, owner, time);
 
491
    if (NULL == t) {
 
492
        apr_thread_mutex_unlock(me->lock);
 
493
        return APR_ENOMEM;
 
494
    }
 
495
    t_loc = APR_RING_FIRST(me->scheduled_tasks);
 
496
    while (NULL != t_loc) {
 
497
        /* if the time is less than the entry insert ahead of it */
 
498
        if (t->dispatch.time < t_loc->dispatch.time) {
 
499
            ++me->scheduled_task_cnt;
 
500
            APR_RING_INSERT_BEFORE(t_loc, t, link);
 
501
            break;
 
502
        }
 
503
        else {
 
504
            t_loc = APR_RING_NEXT(t_loc, link);
 
505
            if (t_loc ==
 
506
                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
 
507
                                  link)) {
 
508
                ++me->scheduled_task_cnt;
 
509
                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
 
510
                                     apr_thread_pool_task, link);
 
511
                break;
 
512
            }
 
513
        }
 
514
    }
 
515
    /* there should be at least one thread for scheduled tasks */
 
516
    if (0 == me->thd_cnt) {
 
517
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
 
518
        if (APR_SUCCESS == rv) {
 
519
            ++me->thd_cnt;
 
520
            if (me->thd_cnt > me->thd_high)
 
521
                me->thd_high = me->thd_cnt;
 
522
        }
 
523
    }
 
524
    apr_thread_mutex_unlock(me->lock);
 
525
    apr_thread_mutex_lock(me->cond_lock);
 
526
    apr_thread_cond_signal(me->cond);
 
527
    apr_thread_mutex_unlock(me->cond_lock);
 
528
    return rv;
 
529
}
 
530
 
 
531
static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
 
532
                             void *param, apr_byte_t priority, int push,
 
533
                             void *owner)
 
534
{
 
535
    apr_thread_pool_task_t *t;
 
536
    apr_thread_pool_task_t *t_loc;
 
537
    apr_thread_t *thd;
 
538
    apr_status_t rv = APR_SUCCESS;
 
539
 
 
540
    apr_thread_mutex_lock(me->lock);
 
541
 
 
542
    t = task_new(me, func, param, priority, owner, 0);
 
543
    if (NULL == t) {
 
544
        apr_thread_mutex_unlock(me->lock);
 
545
        return APR_ENOMEM;
 
546
    }
 
547
 
 
548
    t_loc = add_if_empty(me, t);
 
549
    if (NULL == t_loc) {
 
550
        goto FINAL_EXIT;
 
551
    }
 
552
 
 
553
    if (push) {
 
554
        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
 
555
               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
 
556
            t_loc = APR_RING_NEXT(t_loc, link);
 
557
        }
 
558
    }
 
559
    APR_RING_INSERT_BEFORE(t_loc, t, link);
 
560
    if (!push) {
 
561
        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
 
562
            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
 
563
        }
 
564
    }
 
565
 
 
566
  FINAL_EXIT:
 
567
    me->task_cnt++;
 
568
    if (me->task_cnt > me->tasks_high)
 
569
        me->tasks_high = me->task_cnt;
 
570
    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
 
571
                             me->task_cnt > me->threshold)) {
 
572
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
 
573
        if (APR_SUCCESS == rv) {
 
574
            ++me->thd_cnt;
 
575
            if (me->thd_cnt > me->thd_high)
 
576
                me->thd_high = me->thd_cnt;
 
577
        }
 
578
    }
 
579
    apr_thread_mutex_unlock(me->lock);
 
580
 
 
581
    apr_thread_mutex_lock(me->cond_lock);
 
582
    apr_thread_cond_signal(me->cond);
 
583
    apr_thread_mutex_unlock(me->cond_lock);
 
584
 
 
585
    return rv;
 
586
}
 
587
 
 
588
APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
 
589
                                               apr_thread_start_t func,
 
590
                                               void *param,
 
591
                                               apr_byte_t priority,
 
592
                                               void *owner)
 
593
{
 
594
    return add_task(me, func, param, priority, 1, owner);
 
595
}
 
596
 
 
597
APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
 
598
                                                   apr_thread_start_t func,
 
599
                                                   void *param,
 
600
                                                   apr_interval_time_t time,
 
601
                                                   void *owner)
 
602
{
 
603
    return schedule_task(me, func, param, owner, time);
 
604
}
 
605
 
 
606
APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
 
607
                                              apr_thread_start_t func,
 
608
                                              void *param,
 
609
                                              apr_byte_t priority,
 
610
                                              void *owner)
 
611
{
 
612
    return add_task(me, func, param, priority, 0, owner);
 
613
}
 
614
 
 
615
static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
 
616
                                           void *owner)
 
617
{
 
618
    apr_thread_pool_task_t *t_loc;
 
619
    apr_thread_pool_task_t *next;
 
620
 
 
621
    t_loc = APR_RING_FIRST(me->scheduled_tasks);
 
622
    while (t_loc !=
 
623
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
 
624
                             link)) {
 
625
        next = APR_RING_NEXT(t_loc, link);
 
626
        /* if this is the owner remove it */
 
627
        if (t_loc->owner == owner) {
 
628
            --me->scheduled_task_cnt;
 
629
            APR_RING_REMOVE(t_loc, link);
 
630
        }
 
631
        t_loc = next;
 
632
    }
 
633
    return APR_SUCCESS;
 
634
}
 
635
 
 
636
static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
 
637
{
 
638
    apr_thread_pool_task_t *t_loc;
 
639
    apr_thread_pool_task_t *next;
 
640
    int seg;
 
641
 
 
642
    t_loc = APR_RING_FIRST(me->tasks);
 
643
    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
 
644
        next = APR_RING_NEXT(t_loc, link);
 
645
        if (t_loc->owner == owner) {
 
646
            --me->task_cnt;
 
647
            seg = TASK_PRIORITY_SEG(t_loc);
 
648
            if (t_loc == me->task_idx[seg]) {
 
649
                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
 
650
                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
 
651
                                                           apr_thread_pool_task,
 
652
                                                           link)
 
653
                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
 
654
                    me->task_idx[seg] = NULL;
 
655
                }
 
656
            }
 
657
            APR_RING_REMOVE(t_loc, link);
 
658
        }
 
659
        t_loc = next;
 
660
    }
 
661
    return APR_SUCCESS;
 
662
}
 
663
 
 
664
static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
 
665
{
 
666
#ifndef NDEBUG
 
667
    apr_os_thread_t *os_thread;
 
668
#endif
 
669
    struct apr_thread_list_elt *elt;
 
670
    apr_thread_mutex_lock(me->lock);
 
671
    elt = APR_RING_FIRST(me->busy_thds);
 
672
    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
 
673
        if (elt->current_owner != owner) {
 
674
            elt = APR_RING_NEXT(elt, link);
 
675
            continue;
 
676
        }
 
677
#ifndef NDEBUG
 
678
        /* make sure the thread is not the one calling tasks_cancel */
 
679
        apr_os_thread_get(&os_thread, elt->thd);
 
680
#ifdef WIN32
 
681
        /* hack for apr win32 bug */
 
682
        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
 
683
#else
 
684
        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
 
685
#endif
 
686
#endif
 
687
        while (elt->current_owner == owner) {
 
688
            apr_thread_mutex_unlock(me->lock);
 
689
            apr_sleep(200 * 1000);
 
690
            apr_thread_mutex_lock(me->lock);
 
691
        }
 
692
        elt = APR_RING_FIRST(me->busy_thds);
 
693
    }
 
694
    apr_thread_mutex_unlock(me->lock);
 
695
    return;
 
696
}
 
697
 
 
698
APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
 
699
                                                       void *owner)
 
700
{
 
701
    apr_status_t rv = APR_SUCCESS;
 
702
 
 
703
    apr_thread_mutex_lock(me->lock);
 
704
    if (me->task_cnt > 0) {
 
705
        rv = remove_tasks(me, owner);
 
706
    }
 
707
    if (me->scheduled_task_cnt > 0) {
 
708
        rv = remove_scheduled_tasks(me, owner);
 
709
    }
 
710
    apr_thread_mutex_unlock(me->lock);
 
711
    wait_on_busy_threads(me, owner);
 
712
 
 
713
    return rv;
 
714
}
 
715
 
 
716
APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
 
717
{
 
718
    return me->task_cnt;
 
719
}
 
720
 
 
721
APU_DECLARE(apr_size_t)
 
722
    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
 
723
{
 
724
    return me->scheduled_task_cnt;
 
725
}
 
726
 
 
727
APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
 
728
{
 
729
    return me->thd_cnt;
 
730
}
 
731
 
 
732
APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
 
733
{
 
734
    return me->thd_cnt - me->idle_cnt;
 
735
}
 
736
 
 
737
APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
 
738
{
 
739
    return me->idle_cnt;
 
740
}
 
741
 
 
742
APU_DECLARE(apr_size_t)
 
743
    apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
 
744
{
 
745
    return me->tasks_run;
 
746
}
 
747
 
 
748
APU_DECLARE(apr_size_t)
 
749
    apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
 
750
{
 
751
    return me->tasks_high;
 
752
}
 
753
 
 
754
APU_DECLARE(apr_size_t)
 
755
    apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
 
756
{
 
757
    return me->thd_high;
 
758
}
 
759
 
 
760
APU_DECLARE(apr_size_t)
 
761
    apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
 
762
{
 
763
    return me->thd_timed_out;
 
764
}
 
765
 
 
766
 
 
767
APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
 
768
{
 
769
    return me->idle_max;
 
770
}
 
771
 
 
772
APU_DECLARE(apr_interval_time_t)
 
773
    apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
 
774
{
 
775
    return me->idle_wait;
 
776
}
 
777
 
 
778
/*
 
779
 * This function stop extra idle threads to the cnt.
 
780
 * @return the number of threads stopped
 
781
 * NOTE: There could be busy threads become idle during this function
 
782
 */
 
783
static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
 
784
                                                apr_size_t *cnt, int idle)
 
785
{
 
786
    struct apr_thread_list *thds;
 
787
    apr_size_t n, n_dbg, i;
 
788
    struct apr_thread_list_elt *head, *tail, *elt;
 
789
 
 
790
    apr_thread_mutex_lock(me->lock);
 
791
    if (idle) {
 
792
        thds = me->idle_thds;
 
793
        n = me->idle_cnt;
 
794
    }
 
795
    else {
 
796
        thds = me->busy_thds;
 
797
        n = me->thd_cnt - me->idle_cnt;
 
798
    }
 
799
    if (n <= *cnt) {
 
800
        apr_thread_mutex_unlock(me->lock);
 
801
        *cnt = 0;
 
802
        return NULL;
 
803
    }
 
804
    n -= *cnt;
 
805
 
 
806
    head = APR_RING_FIRST(thds);
 
807
    for (i = 0; i < *cnt; i++) {
 
808
        head = APR_RING_NEXT(head, link);
 
809
    }
 
810
    tail = APR_RING_LAST(thds);
 
811
    if (idle) {
 
812
        APR_RING_UNSPLICE(head, tail, link);
 
813
        me->idle_cnt = *cnt;
 
814
    }
 
815
 
 
816
    n_dbg = 0;
 
817
    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
 
818
        elt->state = TH_STOP;
 
819
        n_dbg++;
 
820
    }
 
821
    elt->state = TH_STOP;
 
822
    n_dbg++;
 
823
    assert(n == n_dbg);
 
824
    *cnt = n;
 
825
 
 
826
    apr_thread_mutex_unlock(me->lock);
 
827
 
 
828
    APR_RING_PREV(head, link) = NULL;
 
829
    APR_RING_NEXT(tail, link) = NULL;
 
830
    return head;
 
831
}
 
832
 
 
833
static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
 
834
{
 
835
    apr_size_t n_dbg;
 
836
    struct apr_thread_list_elt *elt, *head, *tail;
 
837
    apr_status_t rv;
 
838
 
 
839
    elt = trim_threads(me, &cnt, 1);
 
840
 
 
841
    apr_thread_mutex_lock(me->cond_lock);
 
842
    apr_thread_cond_broadcast(me->cond);
 
843
    apr_thread_mutex_unlock(me->cond_lock);
 
844
 
 
845
    n_dbg = 0;
 
846
    if (NULL != (head = elt)) {
 
847
        while (elt) {
 
848
            tail = elt;
 
849
            apr_thread_join(&rv, elt->thd);
 
850
            elt = APR_RING_NEXT(elt, link);
 
851
            ++n_dbg;
 
852
        }
 
853
        apr_thread_mutex_lock(me->lock);
 
854
        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
 
855
                             apr_thread_list_elt, link);
 
856
        apr_thread_mutex_unlock(me->lock);
 
857
    }
 
858
    assert(cnt == n_dbg);
 
859
 
 
860
    return cnt;
 
861
}
 
862
 
 
863
/* don't join on busy threads for performance reasons, who knows how long will
 
864
 * the task takes to perform
 
865
 */
 
866
static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
 
867
{
 
868
    trim_threads(me, &cnt, 0);
 
869
    return cnt;
 
870
}
 
871
 
 
872
APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
 
873
                                                     apr_size_t cnt)
 
874
{
 
875
    me->idle_max = cnt;
 
876
    cnt = trim_idle_threads(me, cnt);
 
877
    return cnt;
 
878
}
 
879
 
 
880
APU_DECLARE(apr_interval_time_t)
 
881
    apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
 
882
                                  apr_interval_time_t timeout)
 
883
{
 
884
    apr_interval_time_t oldtime;
 
885
 
 
886
    oldtime = me->idle_wait;
 
887
    me->idle_wait = timeout;
 
888
 
 
889
    return oldtime;
 
890
}
 
891
 
 
892
APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
 
893
{
 
894
    return me->thd_max;
 
895
}
 
896
 
 
897
/*
 
898
 * This function stop extra working threads to the new limit.
 
899
 * NOTE: There could be busy threads become idle during this function
 
900
 */
 
901
APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
 
902
                                                       apr_size_t cnt)
 
903
{
 
904
    unsigned int n;
 
905
 
 
906
    me->thd_max = cnt;
 
907
    if (0 == cnt || me->thd_cnt <= cnt) {
 
908
        return 0;
 
909
    }
 
910
 
 
911
    n = me->thd_cnt - cnt;
 
912
    if (n >= me->idle_cnt) {
 
913
        trim_busy_threads(me, n - me->idle_cnt);
 
914
        trim_idle_threads(me, 0);
 
915
    }
 
916
    else {
 
917
        trim_idle_threads(me, me->idle_cnt - n);
 
918
    }
 
919
    return n;
 
920
}
 
921
 
 
922
APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
 
923
{
 
924
    return me->threshold;
 
925
}
 
926
 
 
927
APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
 
928
                                                      apr_size_t val)
 
929
{
 
930
    apr_size_t ov;
 
931
 
 
932
    ov = me->threshold;
 
933
    me->threshold = val;
 
934
    return ov;
 
935
}
 
936
 
 
937
APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
 
938
                                                         void **owner)
 
939
{
 
940
    apr_status_t rv;
 
941
    apr_thread_pool_task_t *task;
 
942
    void *data;
 
943
 
 
944
    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
 
945
    if (rv != APR_SUCCESS) {
 
946
        return rv;
 
947
    }
 
948
 
 
949
    task = data;
 
950
    if (!task) {
 
951
        *owner = NULL;
 
952
        return APR_BADARG;
 
953
    }
 
954
 
 
955
    *owner = task->owner;
 
956
    return APR_SUCCESS;
 
957
}
 
958
 
 
959
#endif /* APR_HAS_THREADS */
 
960
 
 
961
/* vim: set ts=4 sw=4 et cin tw=80: */