~ubuntu-branches/ubuntu/feisty/apache2/feisty

« back to all changes in this revision

Viewing changes to srclib/apr-util/misc/apr_queue.c

  • Committer: Bazaar Package Importer
  • Author(s): Andreas Barth
  • Date: 2006-12-09 21:05:45 UTC
  • mfrom: (0.6.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20061209210545-h70s0xaqc2v8vqr2
Tags: 2.2.3-3.2
* Non-maintainer upload.
* 043_ajp_connection_reuse: Patch from upstream Bugzilla, fixing a critical
  issue with regard to connection reuse in mod_proxy_ajp.
  Closes: #396265

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright 2000-2005 The Apache Software Foundation or its licensors, as
 
2
 * applicable.
 
3
 *
 
4
 * Licensed under the Apache License, Version 2.0 (the "License");
 
5
 * you may not use this file except in compliance with the License.
 
6
 * You may obtain a copy of the License at
 
7
 *
 
8
 *     http://www.apache.org/licenses/LICENSE-2.0
 
9
 *
 
10
 * Unless required by applicable law or agreed to in writing, software
 
11
 * distributed under the License is distributed on an "AS IS" BASIS,
 
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
 * See the License for the specific language governing permissions and
 
14
 * limitations under the License.
 
15
 */
 
16
 
 
17
#include "apr.h"
 
18
 
 
19
#if APR_HAVE_STDIO_H
 
20
#include <stdio.h>
 
21
#endif
 
22
#if APR_HAVE_STDLIB_H
 
23
#include <stdlib.h>
 
24
#endif
 
25
#if APR_HAVE_UNISTD_H
 
26
#include <unistd.h>
 
27
#endif
 
28
 
 
29
#include "apu.h"
 
30
#include "apr_portable.h"
 
31
#include "apr_thread_mutex.h"
 
32
#include "apr_thread_cond.h"
 
33
#include "apr_errno.h"
 
34
#include "apr_queue.h"
 
35
 
 
36
#if APR_HAS_THREADS
 
37
/* 
 
38
 * define this to get debug messages
 
39
 *
 
40
#define QUEUE_DEBUG
 
41
 */
 
42
 
 
43
struct apr_queue_t {
 
44
    void              **data;
 
45
    unsigned int        nelts; /**< # elements */
 
46
    unsigned int        in;    /**< next empty location */
 
47
    unsigned int        out;   /**< next filled location */
 
48
    unsigned int        bounds;/**< max size of queue */
 
49
    unsigned int        full_waiters;
 
50
    unsigned int        empty_waiters;
 
51
    apr_thread_mutex_t *one_big_mutex;
 
52
    apr_thread_cond_t  *not_empty;
 
53
    apr_thread_cond_t  *not_full;
 
54
    int                 terminated;
 
55
};
 
56
 
 
57
#ifdef QUEUE_DEBUG
 
58
static void Q_DBG(char*msg, apr_queue_t *q) {
 
59
    fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n", 
 
60
                    apr_os_thread_current(),
 
61
                    q->nelts, q->in, q->out,
 
62
                    msg
 
63
                    );
 
64
}
 
65
#else
 
66
#define Q_DBG(x,y) 
 
67
#endif
 
68
 
 
69
/**
 
70
 * Detects when the apr_queue_t is full. This utility function is expected
 
71
 * to be called from within critical sections, and is not threadsafe.
 
72
 */
 
73
#define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds)
 
74
 
 
75
/**
 
76
 * Detects when the apr_queue_t is empty. This utility function is expected
 
77
 * to be called from within critical sections, and is not threadsafe.
 
78
 */
 
79
#define apr_queue_empty(queue) ((queue)->nelts == 0)
 
80
 
 
81
/**
 
82
 * Callback routine that is called to destroy this
 
83
 * apr_queue_t when its pool is destroyed.
 
84
 */
 
85
static apr_status_t queue_destroy(void *data) 
 
86
{
 
87
    apr_queue_t *queue = data;
 
88
 
 
89
    /* Ignore errors here, we can't do anything about them anyway. */
 
90
 
 
91
    apr_thread_cond_destroy(queue->not_empty);
 
92
    apr_thread_cond_destroy(queue->not_full);
 
93
    apr_thread_mutex_destroy(queue->one_big_mutex);
 
94
 
 
95
    return APR_SUCCESS;
 
96
}
 
97
 
 
98
/**
 
99
 * Initialize the apr_queue_t.
 
100
 */
 
101
APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q, 
 
102
                                           unsigned int queue_capacity, 
 
103
                                           apr_pool_t *a)
 
104
{
 
105
    apr_status_t rv;
 
106
    apr_queue_t *queue;
 
107
    queue = apr_palloc(a, sizeof(apr_queue_t));
 
108
    *q = queue;
 
109
 
 
110
    /* nested doesn't work ;( */
 
111
    rv = apr_thread_mutex_create(&queue->one_big_mutex,
 
112
                                 APR_THREAD_MUTEX_UNNESTED,
 
113
                                 a);
 
114
    if (rv != APR_SUCCESS) {
 
115
        return rv;
 
116
    }
 
117
 
 
118
    rv = apr_thread_cond_create(&queue->not_empty, a);
 
119
    if (rv != APR_SUCCESS) {
 
120
        return rv;
 
121
    }
 
122
 
 
123
    rv = apr_thread_cond_create(&queue->not_full, a);
 
124
    if (rv != APR_SUCCESS) {
 
125
        return rv;
 
126
    }
 
127
 
 
128
    /* Set all the data in the queue to NULL */
 
129
    queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
 
130
    queue->bounds = queue_capacity;
 
131
    queue->nelts = 0;
 
132
    queue->in = 0;
 
133
    queue->out = 0;
 
134
    queue->terminated = 0;
 
135
    queue->full_waiters = 0;
 
136
    queue->empty_waiters = 0;
 
137
 
 
138
    apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
 
139
 
 
140
    return APR_SUCCESS;
 
141
}
 
142
 
 
143
/**
 
144
 * Push new data onto the queue. Blocks if the queue is full. Once
 
145
 * the push operation has completed, it signals other threads waiting
 
146
 * in apr_queue_pop() that they may continue consuming sockets.
 
147
 */
 
148
APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
 
149
{
 
150
    apr_status_t rv;
 
151
 
 
152
    if (queue->terminated) {
 
153
        return APR_EOF; /* no more elements ever again */
 
154
    }
 
155
 
 
156
    rv = apr_thread_mutex_lock(queue->one_big_mutex);
 
157
    if (rv != APR_SUCCESS) {
 
158
        return rv;
 
159
    }
 
160
 
 
161
    if (apr_queue_full(queue)) {
 
162
        if (!queue->terminated) {
 
163
            queue->full_waiters++;
 
164
            rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
 
165
            queue->full_waiters--;
 
166
            if (rv != APR_SUCCESS) {
 
167
                apr_thread_mutex_unlock(queue->one_big_mutex);
 
168
                return rv;
 
169
            }
 
170
        }
 
171
        /* If we wake up and it's still empty, then we were interrupted */
 
172
        if (apr_queue_full(queue)) {
 
173
            Q_DBG("queue full (intr)", queue);
 
174
            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
175
            if (rv != APR_SUCCESS) {
 
176
                return rv;
 
177
            }
 
178
            if (queue->terminated) {
 
179
                return APR_EOF; /* no more elements ever again */
 
180
            }
 
181
            else {
 
182
                return APR_EINTR;
 
183
            }
 
184
        }
 
185
    }
 
186
 
 
187
    queue->data[queue->in] = data;
 
188
    queue->in = (queue->in + 1) % queue->bounds;
 
189
    queue->nelts++;
 
190
 
 
191
    if (queue->empty_waiters) {
 
192
        Q_DBG("sig !empty", queue);
 
193
        rv = apr_thread_cond_signal(queue->not_empty);
 
194
        if (rv != APR_SUCCESS) {
 
195
            apr_thread_mutex_unlock(queue->one_big_mutex);
 
196
            return rv;
 
197
        }
 
198
    }
 
199
 
 
200
    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
201
    return rv;
 
202
}
 
203
 
 
204
/**
 
205
 * Push new data onto the queue. Blocks if the queue is full. Once
 
206
 * the push operation has completed, it signals other threads waiting
 
207
 * in apr_queue_pop() that they may continue consuming sockets.
 
208
 */
 
209
APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
 
210
{
 
211
    apr_status_t rv;
 
212
 
 
213
    if (queue->terminated) {
 
214
        return APR_EOF; /* no more elements ever again */
 
215
    }
 
216
 
 
217
    rv = apr_thread_mutex_lock(queue->one_big_mutex);
 
218
    if (rv != APR_SUCCESS) {
 
219
        return rv;
 
220
    }
 
221
 
 
222
    if (apr_queue_full(queue)) {
 
223
        rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
224
        return APR_EAGAIN;
 
225
    }
 
226
    
 
227
    queue->data[queue->in] = data;
 
228
    queue->in = (queue->in + 1) % queue->bounds;
 
229
    queue->nelts++;
 
230
 
 
231
    if (queue->empty_waiters) {
 
232
        Q_DBG("sig !empty", queue);
 
233
        rv  = apr_thread_cond_signal(queue->not_empty);
 
234
        if (rv != APR_SUCCESS) {
 
235
            apr_thread_mutex_unlock(queue->one_big_mutex);
 
236
            return rv;
 
237
        }
 
238
    }
 
239
 
 
240
    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
241
    return rv;
 
242
}
 
243
 
 
244
/**
 
245
 * not thread safe
 
246
 */
 
247
APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
 
248
    return queue->nelts;
 
249
}
 
250
 
 
251
/**
 
252
 * Retrieves the next item from the queue. If there are no
 
253
 * items available, it will block until one becomes available.
 
254
 * Once retrieved, the item is placed into the address specified by
 
255
 * 'data'.
 
256
 */
 
257
APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
 
258
{
 
259
    apr_status_t rv;
 
260
 
 
261
    if (queue->terminated) {
 
262
        return APR_EOF; /* no more elements ever again */
 
263
    }
 
264
 
 
265
    rv = apr_thread_mutex_lock(queue->one_big_mutex);
 
266
    if (rv != APR_SUCCESS) {
 
267
        return rv;
 
268
    }
 
269
 
 
270
    /* Keep waiting until we wake up and find that the queue is not empty. */
 
271
    if (apr_queue_empty(queue)) {
 
272
        if (!queue->terminated) {
 
273
            queue->empty_waiters++;
 
274
            rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
 
275
            queue->empty_waiters--;
 
276
            if (rv != APR_SUCCESS) {
 
277
                apr_thread_mutex_unlock(queue->one_big_mutex);
 
278
                return rv;
 
279
            }
 
280
        }
 
281
        /* If we wake up and it's still empty, then we were interrupted */
 
282
        if (apr_queue_empty(queue)) {
 
283
            Q_DBG("queue empty (intr)", queue);
 
284
            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
285
            if (rv != APR_SUCCESS) {
 
286
                return rv;
 
287
            }
 
288
            if (queue->terminated) {
 
289
                return APR_EOF; /* no more elements ever again */
 
290
            }
 
291
            else {
 
292
                return APR_EINTR;
 
293
            }
 
294
        }
 
295
    } 
 
296
 
 
297
    *data = queue->data[queue->out];
 
298
    queue->nelts--;
 
299
 
 
300
    queue->out = (queue->out + 1) % queue->bounds;
 
301
    if (queue->full_waiters) {
 
302
        Q_DBG("signal !full", queue);
 
303
        rv = apr_thread_cond_signal(queue->not_full);
 
304
        if (rv != APR_SUCCESS) {
 
305
            apr_thread_mutex_unlock(queue->one_big_mutex);
 
306
            return rv;
 
307
        }
 
308
    }
 
309
 
 
310
    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
311
    return rv;
 
312
}
 
313
 
 
314
/**
 
315
 * Retrieves the next item from the queue. If there are no
 
316
 * items available, return APR_EAGAIN.  Once retrieved,
 
317
 * the item is placed into the address specified by 'data'.
 
318
 */
 
319
APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
 
320
{
 
321
    apr_status_t rv;
 
322
 
 
323
    if (queue->terminated) {
 
324
        return APR_EOF; /* no more elements ever again */
 
325
    }
 
326
 
 
327
    rv = apr_thread_mutex_lock(queue->one_big_mutex);
 
328
    if (rv != APR_SUCCESS) {
 
329
        return rv;
 
330
    }
 
331
 
 
332
    if (apr_queue_empty(queue)) {
 
333
        rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
334
        return APR_EAGAIN;
 
335
    } 
 
336
 
 
337
    *data = queue->data[queue->out];
 
338
    queue->nelts--;
 
339
 
 
340
    queue->out = (queue->out + 1) % queue->bounds;
 
341
    if (queue->full_waiters) {
 
342
        Q_DBG("signal !full", queue);
 
343
        rv = apr_thread_cond_signal(queue->not_full);
 
344
        if (rv != APR_SUCCESS) {
 
345
            apr_thread_mutex_unlock(queue->one_big_mutex);
 
346
            return rv;
 
347
        }
 
348
    }
 
349
 
 
350
    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
 
351
    return rv;
 
352
}
 
353
 
 
354
APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
 
355
{
 
356
    apr_status_t rv;
 
357
    Q_DBG("intr all", queue);    
 
358
    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
 
359
        return rv;
 
360
    }
 
361
    apr_thread_cond_broadcast(queue->not_empty);
 
362
    apr_thread_cond_broadcast(queue->not_full);
 
363
 
 
364
    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
 
365
        return rv;
 
366
    }
 
367
 
 
368
    return APR_SUCCESS;
 
369
}
 
370
 
 
371
APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
 
372
{
 
373
    apr_status_t rv;
 
374
 
 
375
    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
 
376
        return rv;
 
377
    }
 
378
 
 
379
    /* we must hold one_big_mutex when setting this... otherwise,
 
380
     * we could end up setting it and waking everybody up just after a 
 
381
     * would-be popper checks it but right before they block
 
382
     */
 
383
    queue->terminated = 1;
 
384
    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
 
385
        return rv;
 
386
    }
 
387
    return apr_queue_interrupt_all(queue);
 
388
}
 
389
 
 
390
#endif /* APR_HAS_THREADS */