1
/* Copyright 2000-2005 The Apache Software Foundation or its licensors, as
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
8
* http://www.apache.org/licenses/LICENSE-2.0
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
14
* limitations under the License.
30
#include "apr_portable.h"
31
#include "apr_thread_mutex.h"
32
#include "apr_thread_cond.h"
33
#include "apr_errno.h"
34
#include "apr_queue.h"
38
* define this to get debug messages
45
unsigned int nelts; /**< # elements */
46
unsigned int in; /**< next empty location */
47
unsigned int out; /**< next filled location */
48
unsigned int bounds;/**< max size of queue */
49
unsigned int full_waiters; /**< # threads in apr_queue_push() currently waiting on not_full */
50
unsigned int empty_waiters; /**< # threads in apr_queue_pop() currently waiting on not_empty */
51
apr_thread_mutex_t *one_big_mutex; /**< locked by push/trypush/pop/trypop/interrupt_all/term around queue state */
52
apr_thread_cond_t *not_empty; /**< signalled after a push when empty_waiters != 0; broadcast on interrupt */
53
apr_thread_cond_t *not_full; /**< signalled after a pop when full_waiters != 0; broadcast on interrupt */
58
/* Debug trace helper: writes one line to stderr containing the current OS
 * thread, the queue's nelts/in/out counters and a trailing string
 * (presumably msg; the final argument is not visible in this view).
 * NOTE(review): "%ld" is paired with apr_os_thread_current() and "%d" with
 * unsigned int counters; confirm these specifiers match on every
 * supported platform. */
static void Q_DBG(char*msg, apr_queue_t *q) {
59
fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n",
60
apr_os_thread_current(),
61
q->nelts, q->in, q->out,
70
* Detects when the apr_queue_t is full. This utility function is expected
71
* to be called from within critical sections, and is not threadsafe.
73
/* Non-zero when the element count has reached the queue's capacity.
 * Takes no lock and evaluates its argument twice. */
#define apr_queue_full(queue) ((queue)->bounds == (queue)->nelts)
76
* Detects when the apr_queue_t is empty. This utility function is expected
77
* to be called from within critical sections, and is not threadsafe.
79
/* Non-zero when the queue currently holds no elements. Takes no lock. */
#define apr_queue_empty(queue) (0 == (queue)->nelts)
82
* Callback routine that is called to destroy this
83
* apr_queue_t when its pool is destroyed.
85
static apr_status_t queue_destroy(void *data)
87
/* data is the apr_queue_t that apr_queue_create() registered this cleanup for. */
apr_queue_t *queue = data;
89
/* Ignore errors here, we can't do anything about them anyway. */
91
/* Tear down both condition variables first, then the mutex. */
apr_thread_cond_destroy(queue->not_empty);
92
apr_thread_cond_destroy(queue->not_full);
93
apr_thread_mutex_destroy(queue->one_big_mutex);
99
* Initialize the apr_queue_t.
101
APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q,
102
unsigned int queue_capacity,
107
/* The queue header is taken from pool a with apr_palloc(), which does not
 * zero memory; the fields are therefore assigned individually below. */
queue = apr_palloc(a, sizeof(apr_queue_t));
110
/* nested doesn't work ;( */
111
rv = apr_thread_mutex_create(&queue->one_big_mutex,
112
APR_THREAD_MUTEX_UNNESTED,
114
if (rv != APR_SUCCESS) {
118
rv = apr_thread_cond_create(&queue->not_empty, a);
119
if (rv != APR_SUCCESS) {
123
rv = apr_thread_cond_create(&queue->not_full, a);
124
if (rv != APR_SUCCESS) {
128
/* Set all the data in the queue to NULL */
129
/* NOTE(review): queue_capacity * sizeof(void*) is not checked for overflow
 * in the lines visible here; confirm callers bound queue_capacity. */
queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
130
queue->bounds = queue_capacity;
134
queue->terminated = 0;
135
queue->full_waiters = 0;
136
queue->empty_waiters = 0;
138
/* Have pool a run queue_destroy() on this queue when the pool is cleaned up. */
apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
144
* Push new data onto the queue. Blocks if the queue is full. Once
145
* the push operation has completed, it signals other threads waiting
146
* in apr_queue_pop() that they may continue consuming elements.
148
APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
152
/* Early check made before the mutex is taken; terminated is looked at
 * again further down once the lock is held. */
if (queue->terminated) {
153
return APR_EOF; /* no more elements ever again */
156
rv = apr_thread_mutex_lock(queue->one_big_mutex);
157
if (rv != APR_SUCCESS) {
161
/* The full test is an if, not a loop: the thread waits at most once, and
 * a wake-up that still finds the queue full is handled as an interruption. */
if (apr_queue_full(queue)) {
162
if (!queue->terminated) {
163
/* full_waiters tells apr_queue_pop()/apr_queue_trypop() that a signal
 * on not_full is needed. */
queue->full_waiters++;
164
rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
165
queue->full_waiters--;
166
if (rv != APR_SUCCESS) {
167
apr_thread_mutex_unlock(queue->one_big_mutex);
171
/* If we wake up and it's still full, then we were interrupted */
172
if (apr_queue_full(queue)) {
173
Q_DBG("queue full (intr)", queue);
174
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
175
if (rv != APR_SUCCESS) {
178
if (queue->terminated) {
179
return APR_EOF; /* no more elements ever again */
187
/* Store at the write index, then advance it, wrapping around at bounds. */
queue->data[queue->in] = data;
188
queue->in = (queue->in + 1) % queue->bounds;
191
/* Signal only when a popper has registered itself as waiting. */
if (queue->empty_waiters) {
192
Q_DBG("sig !empty", queue);
193
rv = apr_thread_cond_signal(queue->not_empty);
194
if (rv != APR_SUCCESS) {
195
apr_thread_mutex_unlock(queue->one_big_mutex);
200
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
205
* Push new data onto the queue without blocking. If the queue is full,
206
* the call gives up immediately instead of waiting. After a successful push,
207
* it signals a thread waiting in apr_queue_pop() that an element is available.
209
APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
213
/* Early check made before the mutex is taken. */
if (queue->terminated) {
214
return APR_EOF; /* no more elements ever again */
217
rv = apr_thread_mutex_lock(queue->one_big_mutex);
218
if (rv != APR_SUCCESS) {
222
/* Unlike apr_queue_push(), a full queue is never waited on: the mutex is
 * released straight away.
 * NOTE(review): presumably this path reports APR_EAGAIN, mirroring the
 * documented apr_queue_trypop() contract; confirm. */
if (apr_queue_full(queue)) {
223
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
227
/* Store at the write index, then advance it, wrapping around at bounds. */
queue->data[queue->in] = data;
228
queue->in = (queue->in + 1) % queue->bounds;
231
/* Signal only when a popper has registered itself as waiting. */
if (queue->empty_waiters) {
232
Q_DBG("sig !empty", queue);
233
rv = apr_thread_cond_signal(queue->not_empty);
234
if (rv != APR_SUCCESS) {
235
apr_thread_mutex_unlock(queue->one_big_mutex);
240
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
247
APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
252
* Retrieves the next item from the queue. If there are no
253
* items available, it will block until one becomes available.
254
* Once retrieved, the item is placed into the address specified by
257
APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
261
/* Early check made before the mutex is taken; terminated is looked at
 * again further down once the lock is held. */
if (queue->terminated) {
262
return APR_EOF; /* no more elements ever again */
265
rv = apr_thread_mutex_lock(queue->one_big_mutex);
266
if (rv != APR_SUCCESS) {
270
/* If the queue is empty, wait once on not_empty. This is an if, not a loop:
 * a wake-up that still finds the queue empty is handled as an interruption. */
271
if (apr_queue_empty(queue)) {
272
if (!queue->terminated) {
273
/* empty_waiters tells apr_queue_push()/apr_queue_trypush() that a signal
 * on not_empty is needed. */
queue->empty_waiters++;
274
rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
275
queue->empty_waiters--;
276
if (rv != APR_SUCCESS) {
277
apr_thread_mutex_unlock(queue->one_big_mutex);
281
/* If we wake up and it's still empty, then we were interrupted */
282
if (apr_queue_empty(queue)) {
283
Q_DBG("queue empty (intr)", queue);
284
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
285
if (rv != APR_SUCCESS) {
288
if (queue->terminated) {
289
return APR_EOF; /* no more elements ever again */
297
/* Hand the element at the read index to the caller, then advance the
 * index, wrapping around at bounds. */
*data = queue->data[queue->out];
300
queue->out = (queue->out + 1) % queue->bounds;
301
/* Signal only when a pusher has registered itself as waiting. */
if (queue->full_waiters) {
302
Q_DBG("signal !full", queue);
303
rv = apr_thread_cond_signal(queue->not_full);
304
if (rv != APR_SUCCESS) {
305
apr_thread_mutex_unlock(queue->one_big_mutex);
310
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
315
* Retrieves the next item from the queue. If there are no
316
* items available, return APR_EAGAIN. Once retrieved,
317
* the item is placed into the address specified by 'data'.
319
APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
323
/* Early check made before the mutex is taken. */
if (queue->terminated) {
324
return APR_EOF; /* no more elements ever again */
327
rv = apr_thread_mutex_lock(queue->one_big_mutex);
328
if (rv != APR_SUCCESS) {
332
/* Unlike apr_queue_pop(), an empty queue is never waited on: the mutex is
 * released straight away (the header comment names APR_EAGAIN as the
 * result for this case). */
if (apr_queue_empty(queue)) {
333
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
337
/* Hand the element at the read index to the caller, then advance the
 * index, wrapping around at bounds. */
*data = queue->data[queue->out];
340
queue->out = (queue->out + 1) % queue->bounds;
341
/* Signal only when a pusher has registered itself as waiting. */
if (queue->full_waiters) {
342
Q_DBG("signal !full", queue);
343
rv = apr_thread_cond_signal(queue->not_full);
344
if (rv != APR_SUCCESS) {
345
apr_thread_mutex_unlock(queue->one_big_mutex);
350
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
354
/* Wakes every thread waiting on the queue's not_empty or not_full condition. */
APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
357
/* NOTE(review): this trace runs before the mutex is taken, so when Q_DBG
 * is the tracing function it reads the queue counters unlocked. */
Q_DBG("intr all", queue);
358
if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
361
/* Broadcast on both conditions while holding the mutex; the broadcast
 * return values are not checked. */
apr_thread_cond_broadcast(queue->not_empty);
362
apr_thread_cond_broadcast(queue->not_full);
364
if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
371
/* Marks the queue as terminated and wakes all waiters: the terminated flag
 * is set while one_big_mutex is held, the mutex is released, and the result
 * of apr_queue_interrupt_all() is returned. Once the flag is set, the
 * push/pop entry points return APR_EOF from their terminated checks. */
APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
375
if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
379
/* we must hold one_big_mutex when setting this... otherwise,
380
* we could end up setting it and waking everybody up just after a
381
* would-be popper checks it but right before they block
383
queue->terminated = 1;
384
if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
387
return apr_queue_interrupt_all(queue);
390
#endif /* APR_HAS_THREADS */