~clint-fewbar/ubuntu/precise/squid3/ignore-sighup-early

« back to all changes in this revision

Viewing changes to src/DiskIO/DiskThreads/aiops_win32.cc

  • Committer: Bazaar Package Importer
  • Author(s): Luigi Gangitano
  • Date: 2006-11-11 10:32:06 UTC
  • Revision ID: james.westby@ubuntu.com-20061111103206-f3p0r9g0vq44rp3r
Tags: upstream-3.0.PRE5
ImportĀ upstreamĀ versionĀ 3.0.PRE5

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * $Id: aiops_win32.cc,v 1.2 2006/09/09 15:29:59 serassio Exp $
 
3
 *
 
4
 * DEBUG: section 43    Windows AIOPS
 
5
 * AUTHOR: Stewart Forster <slf@connect.com.au>
 
6
 * AUTHOR: Robert Collins <robertc@squid-cache.org>
 
7
 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
 
8
 *
 
9
 * SQUID Web Proxy Cache          http://www.squid-cache.org/
 
10
 * ----------------------------------------------------------
 
11
 *
 
12
 *  Squid is the result of efforts by numerous individuals from
 
13
 *  the Internet community; see the CONTRIBUTORS file for full
 
14
 *  details.   Many organizations have provided support for Squid's
 
15
 *  development; see the SPONSORS file for full details.  Squid is
 
16
 *  Copyrighted (C) 2001 by the Regents of the University of
 
17
 *  California; see the COPYRIGHT file for full details.  Squid
 
18
 *  incorporates software developed and/or copyrighted by other
 
19
 *  sources; see the CREDITS file for full details.
 
20
 *
 
21
 *  This program is free software; you can redistribute it and/or modify
 
22
 *  it under the terms of the GNU General Public License as published by
 
23
 *  the Free Software Foundation; either version 2 of the License, or
 
24
 *  (at your option) any later version.
 
25
 *  
 
26
 *  This program is distributed in the hope that it will be useful,
 
27
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
28
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
29
 *  GNU General Public License for more details.
 
30
 *  
 
31
 *  You should have received a copy of the GNU General Public License
 
32
 *  along with this program; if not, write to the Free Software
 
33
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 
34
 *
 
35
 */
 
36
 
 
37
#include "squid.h"
 
38
#include "squid_windows.h"
 
39
#include "DiskThreads.h"
 
40
 
 
41
#include        <stdio.h>
 
42
#include        <sys/types.h>
 
43
#include        <sys/stat.h>
 
44
#include        <fcntl.h>
 
45
#include        <errno.h>
 
46
#include        <dirent.h>
 
47
#include        <signal.h>
 
48
#include "CommIO.h"
 
49
#include "SquidTime.h"
 
50
#include "Store.h"
 
51
 
 
52
#define RIDICULOUS_LENGTH       4096
 
53
 
 
54
enum _squidaio_thread_status {
 
55
    _THREAD_STARTING = 0,
 
56
    _THREAD_WAITING,
 
57
    _THREAD_BUSY,
 
58
    _THREAD_FAILED,
 
59
    _THREAD_DONE
 
60
};
 
61
typedef enum _squidaio_thread_status squidaio_thread_status;
 
62
 
 
63
typedef struct squidaio_request_t
 
64
{
 
65
 
 
66
    struct squidaio_request_t *next;
 
67
    squidaio_request_type request_type;
 
68
    int cancelled;
 
69
    char *path;
 
70
    int oflag;
 
71
    mode_t mode;
 
72
    int fd;
 
73
    char *bufferp;
 
74
    char *tmpbufp;
 
75
    int buflen;
 
76
    off_t offset;
 
77
    int whence;
 
78
    int ret;
 
79
    int err;
 
80
 
 
81
    struct stat *tmpstatp;
 
82
 
 
83
    struct stat *statp;
 
84
    squidaio_result_t *resultp;
 
85
}
 
86
 
 
87
squidaio_request_t;
 
88
 
 
89
typedef struct squidaio_request_queue_t
 
90
{
 
91
    HANDLE mutex;
 
92
    HANDLE cond; /* See Event objects */
 
93
    squidaio_request_t *volatile head;
 
94
    squidaio_request_t *volatile *volatile tailp;
 
95
    unsigned long requests;
 
96
    unsigned long blocked;      /* main failed to lock the queue */
 
97
}
 
98
 
 
99
squidaio_request_queue_t;
 
100
 
 
101
typedef struct squidaio_thread_t squidaio_thread_t;
 
102
 
 
103
struct squidaio_thread_t
 
104
{
 
105
    squidaio_thread_t *next;
 
106
    HANDLE thread;
 
107
    DWORD dwThreadId; /* thread ID */
 
108
    squidaio_thread_status status;
 
109
 
 
110
    struct squidaio_request_t *current_req;
 
111
    unsigned long requests;
 
112
    int volatile exit;
 
113
};
 
114
 
 
115
static void squidaio_queue_request(squidaio_request_t *);
 
116
static void squidaio_cleanup_request(squidaio_request_t *);
 
117
static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
 
118
static void squidaio_do_open(squidaio_request_t *);
 
119
static void squidaio_do_read(squidaio_request_t *);
 
120
static void squidaio_do_write(squidaio_request_t *);
 
121
static void squidaio_do_close(squidaio_request_t *);
 
122
static void squidaio_do_stat(squidaio_request_t *);
 
123
#if USE_TRUNCATE
 
124
static void squidaio_do_truncate(squidaio_request_t *);
 
125
#else
 
126
static void squidaio_do_unlink(squidaio_request_t *);
 
127
#endif
 
128
#if AIO_OPENDIR
 
129
static void *squidaio_do_opendir(squidaio_request_t *);
 
130
#endif
 
131
static void squidaio_debug(squidaio_request_t *);
 
132
static void squidaio_poll_queues(void);
 
133
 
 
134
static squidaio_thread_t *threads = NULL;
 
135
static int squidaio_initialised = 0;
 
136
 
 
137
 
 
138
#define AIO_LARGE_BUFS  16384
 
139
#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
 
140
#define AIO_SMALL_BUFS  AIO_LARGE_BUFS >> 2
 
141
#define AIO_TINY_BUFS   AIO_LARGE_BUFS >> 3
 
142
#define AIO_MICRO_BUFS  128
 
143
 
 
144
static MemAllocator *squidaio_large_bufs = NULL;        /* 16K */
 
145
static MemAllocator *squidaio_medium_bufs = NULL;       /* 8K */
 
146
static MemAllocator *squidaio_small_bufs = NULL;        /* 4K */
 
147
static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
 
148
static MemAllocator *squidaio_micro_bufs = NULL;        /* 128K */
 
149
 
 
150
static int request_queue_len = 0;
 
151
static MemAllocator *squidaio_request_pool = NULL;
 
152
static MemAllocator *squidaio_thread_pool = NULL;
 
153
static squidaio_request_queue_t request_queue;
 
154
 
 
155
static struct
 
156
{
 
157
    squidaio_request_t *head, **tailp;
 
158
}
 
159
 
 
160
request_queue2 = {
 
161
 
 
162
                     NULL, &request_queue2.head
 
163
                 };
 
164
static squidaio_request_queue_t done_queue;
 
165
 
 
166
static struct
 
167
{
 
168
    squidaio_request_t *head, **tailp;
 
169
}
 
170
 
 
171
done_requests = {
 
172
 
 
173
                    NULL, &done_requests.head
 
174
                };
 
175
 
 
176
static HANDLE main_thread;
 
177
 
 
178
static MemAllocator *
 
179
squidaio_get_pool(int size)
 
180
{
 
181
    if (size <= AIO_LARGE_BUFS) {
 
182
        if (size <= AIO_MICRO_BUFS)
 
183
            return squidaio_micro_bufs;
 
184
        else if (size <= AIO_TINY_BUFS)
 
185
            return squidaio_tiny_bufs;
 
186
        else if (size <= AIO_SMALL_BUFS)
 
187
            return squidaio_small_bufs;
 
188
        else if (size <= AIO_MEDIUM_BUFS)
 
189
            return squidaio_medium_bufs;
 
190
        else
 
191
            return squidaio_large_bufs;
 
192
    }
 
193
 
 
194
    return NULL;
 
195
}
 
196
 
 
197
void *
 
198
squidaio_xmalloc(int size)
 
199
{
 
200
    void *p;
 
201
    MemAllocator *pool;
 
202
 
 
203
    if ((pool = squidaio_get_pool(size)) != NULL) {
 
204
        p = pool->alloc();
 
205
    } else
 
206
        p = xmalloc(size);
 
207
 
 
208
    return p;
 
209
}
 
210
 
 
211
static char *
 
212
squidaio_xstrdup(const char *str)
 
213
{
 
214
    char *p;
 
215
    int len = strlen(str) + 1;
 
216
 
 
217
    p = (char *)squidaio_xmalloc(len);
 
218
    strncpy(p, str, len);
 
219
 
 
220
    return p;
 
221
}
 
222
 
 
223
void
 
224
squidaio_xfree(void *p, int size)
 
225
{
 
226
    MemAllocator *pool;
 
227
 
 
228
    if ((pool = squidaio_get_pool(size)) != NULL) {
 
229
        pool->free(p);
 
230
    } else
 
231
        xfree(p);
 
232
}
 
233
 
 
234
static void
 
235
squidaio_xstrfree(char *str)
 
236
{
 
237
    MemAllocator *pool;
 
238
    int len = strlen(str) + 1;
 
239
 
 
240
    if ((pool = squidaio_get_pool(len)) != NULL) {
 
241
        pool->free(str);
 
242
    } else
 
243
        xfree(str);
 
244
}
 
245
 
 
246
void
 
247
squidaio_init(void)
 
248
{
 
249
    int i;
 
250
    squidaio_thread_t *threadp;
 
251
 
 
252
    if (squidaio_initialised)
 
253
        return;
 
254
 
 
255
    if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
 
256
                         GetCurrentThread(),  /* pseudo handle to copy */
 
257
                         GetCurrentProcess(), /* pseudo handle, don't close */
 
258
                         &main_thread,
 
259
                         0,                   /* required access */
 
260
                         FALSE,               /* child process's don't inherit the handle */
 
261
                         DUPLICATE_SAME_ACCESS)) {
 
262
        /* spit errors */
 
263
        fatal("Couldn't get current thread handle");
 
264
    }
 
265
 
 
266
    /* Initialize request queue */
 
267
    if ((request_queue.mutex = CreateMutex(NULL,    /* no inheritance */
 
268
                                           FALSE,   /* start unowned (as per mutex_init) */
 
269
                                           NULL)    /* no name */
 
270
        ) == NULL) {
 
271
        fatal("Failed to create mutex");
 
272
    }
 
273
 
 
274
    if ((request_queue.cond = CreateEvent(NULL,     /* no inheritance */
 
275
                                          FALSE,    /* auto signal reset - which I think is pthreads like ? */
 
276
                                          FALSE,    /* start non signaled */
 
277
                                          NULL)     /* no name */
 
278
        ) == NULL) {
 
279
        fatal("Failed to create condition variable");
 
280
    }
 
281
 
 
282
    request_queue.head = NULL;
 
283
 
 
284
    request_queue.tailp = &request_queue.head;
 
285
 
 
286
    request_queue.requests = 0;
 
287
 
 
288
    request_queue.blocked = 0;
 
289
 
 
290
    /* Initialize done queue */
 
291
 
 
292
    if ((done_queue.mutex = CreateMutex(NULL,  /* no inheritance */
 
293
                                        FALSE, /* start unowned (as per mutex_init) */
 
294
                                        NULL)  /* no name */
 
295
        ) == NULL) {
 
296
        fatal("Failed to create mutex");
 
297
    }
 
298
 
 
299
    if ((done_queue.cond = CreateEvent(NULL,  /* no inheritance */
 
300
                                       TRUE,  /* manually signaled - which I think is pthreads like ? */
 
301
                                       FALSE, /* start non signaled */
 
302
                                       NULL)  /* no name */
 
303
        ) == NULL) {
 
304
        fatal("Failed to create condition variable");
 
305
    }
 
306
 
 
307
    done_queue.head = NULL;
 
308
 
 
309
    done_queue.tailp = &done_queue.head;
 
310
 
 
311
    done_queue.requests = 0;
 
312
 
 
313
    done_queue.blocked = 0;
 
314
 
 
315
    CommIO::NotifyIOCompleted();
 
316
 
 
317
    /* Create threads and get them to sit in their wait loop */
 
318
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
 
319
 
 
320
    assert(NUMTHREADS);
 
321
 
 
322
    for (i = 0; i < NUMTHREADS; i++) {
 
323
        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
 
324
        threadp->status = _THREAD_STARTING;
 
325
        threadp->current_req = NULL;
 
326
        threadp->requests = 0;
 
327
        threadp->next = threads;
 
328
        threads = threadp;
 
329
 
 
330
        if ((threadp->thread = CreateThread(NULL,                   /* no security attributes */
 
331
                                            0,                      /* use default stack size */
 
332
                                            squidaio_thread_loop,   /* thread function */
 
333
                                            threadp,                /* argument to thread function */
 
334
                                            0,                      /* use default creation flags */
 
335
                                            &(threadp->dwThreadId)) /* returns the thread identifier */
 
336
            ) == NULL) {
 
337
            fprintf(stderr, "Thread creation failed\n");
 
338
            threadp->status = _THREAD_FAILED;
 
339
            continue;
 
340
        }
 
341
 
 
342
        /* Set the new thread priority above parent process */
 
343
        SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
 
344
    }
 
345
 
 
346
    /* Create request pool */
 
347
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
 
348
 
 
349
    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
 
350
 
 
351
    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
 
352
 
 
353
    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
 
354
 
 
355
    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
 
356
 
 
357
    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
 
358
 
 
359
    squidaio_initialised = 1;
 
360
}
 
361
 
 
362
void
 
363
squidaio_shutdown(void)
 
364
{
 
365
    squidaio_thread_t *threadp;
 
366
    int i;
 
367
    HANDLE * hthreads;
 
368
 
 
369
    if (!squidaio_initialised)
 
370
        return;
 
371
 
 
372
    /* This is the same as in squidaio_sync */
 
373
    do {
 
374
        squidaio_poll_queues();
 
375
    } while (request_queue_len > 0);
 
376
 
 
377
    hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
 
378
 
 
379
    threadp = threads;
 
380
 
 
381
    for (i = 0; i < NUMTHREADS; i++) {
 
382
        threadp->exit = 1;
 
383
        hthreads[i] = threadp->thread;
 
384
        threadp = threadp->next;
 
385
    }
 
386
 
 
387
    ReleaseMutex(request_queue.mutex);
 
388
    ResetEvent(request_queue.cond);
 
389
    ReleaseMutex(done_queue.mutex);
 
390
    ResetEvent(done_queue.cond);
 
391
    Sleep(0);
 
392
 
 
393
    WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
 
394
 
 
395
    for (i = 0; i < NUMTHREADS; i++) {
 
396
        CloseHandle(hthreads[i]);
 
397
    }
 
398
 
 
399
    CloseHandle(main_thread);
 
400
    CommIO::NotifyIOClose();
 
401
 
 
402
    squidaio_initialised = 0;
 
403
    xfree(hthreads);
 
404
}
 
405
 
 
406
static DWORD WINAPI
 
407
squidaio_thread_loop(LPVOID lpParam)
 
408
{
 
409
    squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
 
410
    squidaio_request_t *request;
 
411
    HANDLE cond; /* local copy of the event queue because win32 event handles
 
412
                              * don't atomically release the mutex as cond variables do. */
 
413
 
 
414
    /* lock the thread info */
 
415
 
 
416
    if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
 
417
        fatal("Can't get ownership of mutex\n");
 
418
    }
 
419
 
 
420
    /* duplicate the handle */
 
421
    if (!DuplicateHandle(GetCurrentProcess(),    /* pseudo handle, don't close */
 
422
                         request_queue.cond,     /* handle to copy */
 
423
                         GetCurrentProcess(),    /* pseudo handle, don't close */
 
424
                         &cond,
 
425
                         0,                      /* required access */
 
426
                         FALSE,                  /* child process's don't inherit the handle */
 
427
                         DUPLICATE_SAME_ACCESS))
 
428
        fatal("Can't duplicate mutex handle\n");
 
429
 
 
430
    if (!ReleaseMutex(request_queue.mutex)) {
 
431
        CloseHandle(cond);
 
432
        fatal("Can't release mutex\n");
 
433
    }
 
434
 
 
435
    Sleep(0);
 
436
 
 
437
    while (1) {
 
438
        DWORD rv;
 
439
        threadp->current_req = request = NULL;
 
440
        request = NULL;
 
441
        /* Get a request to process */
 
442
        threadp->status = _THREAD_WAITING;
 
443
 
 
444
        if (threadp->exit) {
 
445
            CloseHandle(request_queue.mutex);
 
446
            CloseHandle(cond);
 
447
            return 0;
 
448
        }
 
449
 
 
450
        rv = WaitForSingleObject(request_queue.mutex, INFINITE);
 
451
 
 
452
        if (rv == WAIT_FAILED) {
 
453
            CloseHandle(cond);
 
454
            return 1;
 
455
        }
 
456
 
 
457
        while (!request_queue.head) {
 
458
            if (!ReleaseMutex(request_queue.mutex)) {
 
459
                CloseHandle(cond);
 
460
                threadp->status = _THREAD_FAILED;
 
461
                return 1;
 
462
            }
 
463
 
 
464
            Sleep(0);
 
465
            rv = WaitForSingleObject(cond, INFINITE);
 
466
 
 
467
            if (rv == WAIT_FAILED) {
 
468
                CloseHandle(cond);
 
469
                return 1;
 
470
            }
 
471
 
 
472
            rv = WaitForSingleObject(request_queue.mutex, INFINITE);
 
473
 
 
474
            if (rv == WAIT_FAILED) {
 
475
                CloseHandle(cond);
 
476
                return 1;
 
477
            }
 
478
        }
 
479
 
 
480
        request = request_queue.head;
 
481
 
 
482
        if (request)
 
483
            request_queue.head = request->next;
 
484
 
 
485
        if (!request_queue.head)
 
486
            request_queue.tailp = &request_queue.head;
 
487
 
 
488
        if (!ReleaseMutex(request_queue.mutex)) {
 
489
            CloseHandle(cond);
 
490
            return 1;
 
491
        }
 
492
 
 
493
        Sleep(0);
 
494
 
 
495
        /* process the request */
 
496
        threadp->status = _THREAD_BUSY;
 
497
 
 
498
        request->next = NULL;
 
499
 
 
500
        threadp->current_req = request;
 
501
 
 
502
        errno = 0;
 
503
 
 
504
        if (!request->cancelled) {
 
505
            switch (request->request_type) {
 
506
 
 
507
            case _AIO_OP_OPEN:
 
508
                squidaio_do_open(request);
 
509
                break;
 
510
 
 
511
            case _AIO_OP_READ:
 
512
                squidaio_do_read(request);
 
513
                break;
 
514
 
 
515
            case _AIO_OP_WRITE:
 
516
                squidaio_do_write(request);
 
517
                break;
 
518
 
 
519
            case _AIO_OP_CLOSE:
 
520
                squidaio_do_close(request);
 
521
                break;
 
522
 
 
523
#if USE_TRUNCATE
 
524
 
 
525
            case _AIO_OP_TRUNCATE:
 
526
                squidaio_do_truncate(request);
 
527
                break;
 
528
#else
 
529
 
 
530
            case _AIO_OP_UNLINK:
 
531
                squidaio_do_unlink(request);
 
532
                break;
 
533
 
 
534
#endif
 
535
#if AIO_OPENDIR                 /* Opendir not implemented yet */
 
536
 
 
537
            case _AIO_OP_OPENDIR:
 
538
                squidaio_do_opendir(request);
 
539
                break;
 
540
#endif
 
541
 
 
542
            case _AIO_OP_STAT:
 
543
                squidaio_do_stat(request);
 
544
                break;
 
545
 
 
546
            default:
 
547
                request->ret = -1;
 
548
                request->err = EINVAL;
 
549
                break;
 
550
            }
 
551
        } else {                /* cancelled */
 
552
            request->ret = -1;
 
553
            request->err = EINTR;
 
554
        }
 
555
 
 
556
        threadp->status = _THREAD_DONE;
 
557
        /* put the request in the done queue */
 
558
        rv = WaitForSingleObject(done_queue.mutex, INFINITE);
 
559
 
 
560
        if (rv == WAIT_FAILED) {
 
561
            CloseHandle(cond);
 
562
            return 1;
 
563
        }
 
564
 
 
565
        *done_queue.tailp = request;
 
566
        done_queue.tailp = &request->next;
 
567
 
 
568
        if (!ReleaseMutex(done_queue.mutex)) {
 
569
            CloseHandle(cond);
 
570
            return 1;
 
571
        }
 
572
 
 
573
        CommIO::NotifyIOCompleted();
 
574
        Sleep(0);
 
575
        threadp->requests++;
 
576
    }                           /* while forever */
 
577
 
 
578
    CloseHandle(cond);
 
579
 
 
580
    return 0;
 
581
}                               /* squidaio_thread_loop */
 
582
 
 
583
static void
 
584
squidaio_queue_request(squidaio_request_t * request)
 
585
{
 
586
    static int high_start = 0;
 
587
    debug(43, 9) ("squidaio_queue_request: %p type=%d result=%p\n",
 
588
                  request, request->request_type, request->resultp);
 
589
    /* Mark it as not executed (failing result, no error) */
 
590
    request->ret = -1;
 
591
    request->err = 0;
 
592
    /* Internal housekeeping */
 
593
    request_queue_len += 1;
 
594
    request->resultp->_data = request;
 
595
    /* Play some tricks with the request_queue2 queue */
 
596
    request->next = NULL;
 
597
 
 
598
    if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
 
599
        if (request_queue2.head) {
 
600
            /* Grab blocked requests */
 
601
            *request_queue.tailp = request_queue2.head;
 
602
            request_queue.tailp = request_queue2.tailp;
 
603
        }
 
604
 
 
605
        /* Enqueue request */
 
606
        *request_queue.tailp = request;
 
607
 
 
608
        request_queue.tailp = &request->next;
 
609
 
 
610
        if (!SetEvent(request_queue.cond))
 
611
            fatal("Couldn't push queue");
 
612
 
 
613
        if (!ReleaseMutex(request_queue.mutex)) {
 
614
            /* unexpected error */
 
615
            fatal("Couldn't push queue");
 
616
        }
 
617
 
 
618
        Sleep(0);
 
619
 
 
620
        if (request_queue2.head) {
 
621
            /* Clear queue of blocked requests */
 
622
            request_queue2.head = NULL;
 
623
            request_queue2.tailp = &request_queue2.head;
 
624
        }
 
625
    } else {
 
626
        /* Oops, the request queue is blocked, use request_queue2 */
 
627
        *request_queue2.tailp = request;
 
628
        request_queue2.tailp = &request->next;
 
629
    }
 
630
 
 
631
    if (request_queue2.head) {
 
632
        static int filter = 0;
 
633
        static int filter_limit = 8;
 
634
 
 
635
        if (++filter >= filter_limit) {
 
636
            filter_limit += filter;
 
637
            filter = 0;
 
638
            debug(43, 1) ("squidaio_queue_request: WARNING - Queue congestion\n");
 
639
        }
 
640
    }
 
641
 
 
642
    /* Warn if out of threads */
 
643
    if (request_queue_len > MAGIC1) {
 
644
        static int last_warn = 0;
 
645
        static int queue_high, queue_low;
 
646
 
 
647
        if (high_start == 0) {
 
648
            high_start = (int)squid_curtime;
 
649
            queue_high = request_queue_len;
 
650
            queue_low = request_queue_len;
 
651
        }
 
652
 
 
653
        if (request_queue_len > queue_high)
 
654
            queue_high = request_queue_len;
 
655
 
 
656
        if (request_queue_len < queue_low)
 
657
            queue_low = request_queue_len;
 
658
 
 
659
        if (squid_curtime >= (last_warn + 15) &&
 
660
                squid_curtime >= (high_start + 5)) {
 
661
            debug(43, 1) ("squidaio_queue_request: WARNING - Disk I/O overloading\n");
 
662
 
 
663
            if (squid_curtime >= (high_start + 15))
 
664
                debug(43, 1) ("squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n",
 
665
                              request_queue_len, queue_high, queue_low, (long int) (squid_curtime - high_start));
 
666
 
 
667
            last_warn = (int)squid_curtime;
 
668
        }
 
669
    } else {
 
670
        high_start = 0;
 
671
    }
 
672
 
 
673
    /* Warn if seriously overloaded */
 
674
    if (request_queue_len > RIDICULOUS_LENGTH) {
 
675
        debug(43, 0) ("squidaio_queue_request: Async request queue growing uncontrollably!\n");
 
676
        debug(43, 0) ("squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n");
 
677
        squidaio_sync();
 
678
        debug(43, 0) ("squidaio_queue_request: Synced\n");
 
679
    }
 
680
}                               /* squidaio_queue_request */
 
681
 
 
682
static void
 
683
squidaio_cleanup_request(squidaio_request_t * requestp)
 
684
{
 
685
    squidaio_result_t *resultp = requestp->resultp;
 
686
    int cancelled = requestp->cancelled;
 
687
 
 
688
    /* Free allocated structures and copy data back to user space if the */
 
689
    /* request hasn't been cancelled */
 
690
 
 
691
    switch (requestp->request_type) {
 
692
 
 
693
    case _AIO_OP_STAT:
 
694
 
 
695
        if (!cancelled && requestp->ret == 0)
 
696
 
 
697
            xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
 
698
 
 
699
        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
 
700
 
 
701
        squidaio_xstrfree(requestp->path);
 
702
 
 
703
        break;
 
704
 
 
705
    case _AIO_OP_OPEN:
 
706
        if (cancelled && requestp->ret >= 0)
 
707
            /* The open() was cancelled but completed */
 
708
            close(requestp->ret);
 
709
 
 
710
        squidaio_xstrfree(requestp->path);
 
711
 
 
712
        break;
 
713
 
 
714
    case _AIO_OP_CLOSE:
 
715
        if (cancelled && requestp->ret < 0)
 
716
            /* The close() was cancelled and never got executed */
 
717
            close(requestp->fd);
 
718
 
 
719
        break;
 
720
 
 
721
    case _AIO_OP_UNLINK:
 
722
 
 
723
    case _AIO_OP_TRUNCATE:
 
724
 
 
725
    case _AIO_OP_OPENDIR:
 
726
        squidaio_xstrfree(requestp->path);
 
727
 
 
728
        break;
 
729
 
 
730
    case _AIO_OP_READ:
 
731
        break;
 
732
 
 
733
    case _AIO_OP_WRITE:
 
734
        break;
 
735
 
 
736
    default:
 
737
        break;
 
738
    }
 
739
 
 
740
    if (resultp != NULL && !cancelled) {
 
741
        resultp->aio_return = requestp->ret;
 
742
        resultp->aio_errno = requestp->err;
 
743
    }
 
744
 
 
745
    squidaio_request_pool->free(requestp);
 
746
}                               /* squidaio_cleanup_request */
 
747
 
 
748
 
 
749
int
 
750
squidaio_cancel(squidaio_result_t * resultp)
 
751
{
 
752
    squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
 
753
 
 
754
    if (request && request->resultp == resultp) {
 
755
        debug(43, 9) ("squidaio_cancel: %p type=%d result=%p\n",
 
756
                      request, request->request_type, request->resultp);
 
757
        request->cancelled = 1;
 
758
        request->resultp = NULL;
 
759
        resultp->_data = NULL;
 
760
        resultp->result_type = _AIO_OP_NONE;
 
761
        return 0;
 
762
    }
 
763
 
 
764
    return 1;
 
765
}                               /* squidaio_cancel */
 
766
 
 
767
 
 
768
int
 
769
squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
 
770
{
 
771
    squidaio_init();
 
772
    squidaio_request_t *requestp;
 
773
 
 
774
    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
 
775
 
 
776
    requestp->path = (char *) squidaio_xstrdup(path);
 
777
 
 
778
    requestp->oflag = oflag;
 
779
 
 
780
    requestp->mode = mode;
 
781
 
 
782
    requestp->resultp = resultp;
 
783
 
 
784
    requestp->request_type = _AIO_OP_OPEN;
 
785
 
 
786
    requestp->cancelled = 0;
 
787
 
 
788
    resultp->result_type = _AIO_OP_OPEN;
 
789
 
 
790
    squidaio_queue_request(requestp);
 
791
 
 
792
    return 0;
 
793
}
 
794
 
 
795
 
 
796
static void
 
797
squidaio_do_open(squidaio_request_t * requestp)
 
798
{
 
799
    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
 
800
    requestp->err = errno;
 
801
}
 
802
 
 
803
 
 
804
int
 
805
squidaio_read(int fd, char *bufp, int bufs, off_t offset, int whence, squidaio_result_t * resultp)
 
806
{
 
807
    squidaio_request_t *requestp;
 
808
 
 
809
    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
 
810
 
 
811
    requestp->fd = fd;
 
812
 
 
813
    requestp->bufferp = bufp;
 
814
 
 
815
    requestp->buflen = bufs;
 
816
 
 
817
    requestp->offset = offset;
 
818
 
 
819
    requestp->whence = whence;
 
820
 
 
821
    requestp->resultp = resultp;
 
822
 
 
823
    requestp->request_type = _AIO_OP_READ;
 
824
 
 
825
    requestp->cancelled = 0;
 
826
 
 
827
    resultp->result_type = _AIO_OP_READ;
 
828
 
 
829
    squidaio_queue_request(requestp);
 
830
 
 
831
    return 0;
 
832
}
 
833
 
 
834
 
 
835
static void
 
836
squidaio_do_read(squidaio_request_t * requestp)
 
837
{
 
838
    lseek(requestp->fd, requestp->offset, requestp->whence);
 
839
 
 
840
    if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
 
841
                  requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
 
842
        WIN32_maperror(GetLastError());
 
843
        requestp->ret = -1;
 
844
    }
 
845
 
 
846
    requestp->err = errno;
 
847
}
 
848
 
 
849
 
 
850
int
 
851
squidaio_write(int fd, char *bufp, int bufs, off_t offset, int whence, squidaio_result_t * resultp)
 
852
{
 
853
    squidaio_request_t *requestp;
 
854
 
 
855
    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
 
856
 
 
857
    requestp->fd = fd;
 
858
 
 
859
    requestp->bufferp = bufp;
 
860
 
 
861
    requestp->buflen = bufs;
 
862
 
 
863
    requestp->offset = offset;
 
864
 
 
865
    requestp->whence = whence;
 
866
 
 
867
    requestp->resultp = resultp;
 
868
 
 
869
    requestp->request_type = _AIO_OP_WRITE;
 
870
 
 
871
    requestp->cancelled = 0;
 
872
 
 
873
    resultp->result_type = _AIO_OP_WRITE;
 
874
 
 
875
    squidaio_queue_request(requestp);
 
876
 
 
877
    return 0;
 
878
}
 
879
 
 
880
 
 
881
static void
 
882
squidaio_do_write(squidaio_request_t * requestp)
 
883
{
 
884
    if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
 
885
                   requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
 
886
        WIN32_maperror(GetLastError());
 
887
        requestp->ret = -1;
 
888
    }
 
889
 
 
890
    requestp->err = errno;
 
891
}
 
892
 
 
893
 
 
894
int
 
895
squidaio_close(int fd, squidaio_result_t * resultp)
 
896
{
 
897
    squidaio_request_t *requestp;
 
898
 
 
899
    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
 
900
 
 
901
    requestp->fd = fd;
 
902
 
 
903
    requestp->resultp = resultp;
 
904
 
 
905
    requestp->request_type = _AIO_OP_CLOSE;
 
906
 
 
907
    requestp->cancelled = 0;
 
908
 
 
909
    resultp->result_type = _AIO_OP_CLOSE;
 
910
 
 
911
    squidaio_queue_request(requestp);
 
912
 
 
913
    return 0;
 
914
}
 
915
 
 
916
 
 
917
static void
 
918
squidaio_do_close(squidaio_request_t * requestp)
 
919
{
 
920
    if((requestp->ret = close(requestp->fd)) < 0) {
 
921
        debug(43, 0) ("squidaio_do_close: FD %d, errno %d\n", requestp->fd, errno);
 
922
        close(requestp->fd);
 
923
    }
 
924
 
 
925
    requestp->err = errno;
 
926
}
 
927
 
 
928
 
 
929
int
 
930
 
 
931
squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
 
932
{
 
933
    squidaio_init();
 
934
    squidaio_request_t *requestp;
 
935
 
 
936
    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
 
937
 
 
938
    requestp->path = (char *) squidaio_xstrdup(path);
 
939
 
 
940
    requestp->statp = sb;
 
941
 
 
942
    requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
 
943
 
 
944
    requestp->resultp = resultp;
 
945
 
 
946
    requestp->request_type = _AIO_OP_STAT;
 
947
 
 
948
    requestp->cancelled = 0;
 
949
 
 
950
    resultp->result_type = _AIO_OP_STAT;
 
951
 
 
952
    squidaio_queue_request(requestp);
 
953
 
 
954
    return 0;
 
955
}
 
956
 
 
957
 
 
958
static void
 
959
squidaio_do_stat(squidaio_request_t * requestp)
 
960
{
 
961
    requestp->ret = stat(requestp->path, requestp->tmpstatp);
 
962
    requestp->err = errno;
 
963
}
 
964
 
 
965
 
 
966
#if USE_TRUNCATE
 
967
int
 
968
squidaio_truncate(const char *path, off_t length, squidaio_result_t * resultp)
 
969
{
 
970
    squidaio_init();
 
971
    squidaio_request_t *requestp;
 
972
 
 
973
    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
 
974
 
 
975
    requestp->path = (char *) squidaio_xstrdup(path);
 
976
 
 
977
    requestp->offset = length;
 
978
 
 
979
    requestp->resultp = resultp;
 
980
 
 
981
    requestp->request_type = _AIO_OP_TRUNCATE;
 
982
 
 
983
    requestp->cancelled = 0;
 
984
 
 
985
    resultp->result_type = _AIO_OP_TRUNCATE;
 
986
 
 
987
    squidaio_queue_request(requestp);
 
988
 
 
989
    return 0;
 
990
}
 
991
 
 
992
 
 
993
static void
 
994
squidaio_do_truncate(squidaio_request_t * requestp)
 
995
{
 
996
    requestp->ret = truncate(requestp->path, requestp->offset);
 
997
    requestp->err = errno;
 
998
}
 
999
 
 
1000
 
 
1001
#else
 
1002
int
 
1003
squidaio_unlink(const char *path, squidaio_result_t * resultp)
 
1004
{
 
1005
    squidaio_init();
 
1006
    squidaio_request_t *requestp;
 
1007
 
 
1008
    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
 
1009
 
 
1010
    requestp->path = squidaio_xstrdup(path);
 
1011
 
 
1012
    requestp->resultp = resultp;
 
1013
 
 
1014
    requestp->request_type = _AIO_OP_UNLINK;
 
1015
 
 
1016
    requestp->cancelled = 0;
 
1017
 
 
1018
    resultp->result_type = _AIO_OP_UNLINK;
 
1019
 
 
1020
    squidaio_queue_request(requestp);
 
1021
 
 
1022
    return 0;
 
1023
}
 
1024
 
 
1025
 
 
1026
static void
 
1027
squidaio_do_unlink(squidaio_request_t * requestp)
 
1028
{
 
1029
    requestp->ret = unlink(requestp->path);
 
1030
    requestp->err = errno;
 
1031
}
 
1032
 
 
1033
#endif
 
1034
 
 
1035
#if AIO_OPENDIR
 
1036
/* XXX squidaio_opendir NOT implemented yet.. */
 
1037
 
 
1038
int
 
1039
squidaio_opendir(const char *path, squidaio_result_t * resultp)
 
1040
{
 
1041
    squidaio_request_t *requestp;
 
1042
    int len;
 
1043
 
 
1044
    requestp = squidaio_request_pool->alloc();
 
1045
 
 
1046
    resultp->result_type = _AIO_OP_OPENDIR;
 
1047
 
 
1048
    return -1;
 
1049
}
 
1050
 
 
1051
static void
 
1052
squidaio_do_opendir(squidaio_request_t * requestp)
 
1053
{
 
1054
    /* NOT IMPLEMENTED */
 
1055
}
 
1056
 
 
1057
#endif
 
1058
 
 
1059
static void
 
1060
squidaio_poll_queues(void)
 
1061
{
 
1062
    /* kick "overflow" request queue */
 
1063
 
 
1064
    if (request_queue2.head &&
 
1065
            (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
 
1066
        *request_queue.tailp = request_queue2.head;
 
1067
        request_queue.tailp = request_queue2.tailp;
 
1068
 
 
1069
        if (!SetEvent(request_queue.cond))
 
1070
            fatal("couldn't push queue\n");
 
1071
 
 
1072
        if (!ReleaseMutex(request_queue.mutex)) {
 
1073
            /* unexpected error */
 
1074
        }
 
1075
 
 
1076
        Sleep(0);
 
1077
        request_queue2.head = NULL;
 
1078
        request_queue2.tailp = &request_queue2.head;
 
1079
    }
 
1080
 
 
1081
    /* poll done queue */
 
1082
    if (done_queue.head &&
 
1083
            (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
 
1084
 
 
1085
        struct squidaio_request_t *requests = done_queue.head;
 
1086
        done_queue.head = NULL;
 
1087
        done_queue.tailp = &done_queue.head;
 
1088
 
 
1089
        if (!ReleaseMutex(done_queue.mutex)) {
 
1090
            /* unexpected error */
 
1091
        }
 
1092
 
 
1093
        Sleep(0);
 
1094
        *done_requests.tailp = requests;
 
1095
        request_queue_len -= 1;
 
1096
 
 
1097
        while (requests->next) {
 
1098
            requests = requests->next;
 
1099
            request_queue_len -= 1;
 
1100
        }
 
1101
 
 
1102
        done_requests.tailp = &requests->next;
 
1103
    }
 
1104
}
 
1105
 
 
1106
squidaio_result_t *
 
1107
squidaio_poll_done(void)
 
1108
{
 
1109
    squidaio_request_t *request;
 
1110
    squidaio_result_t *resultp;
 
1111
    int cancelled;
 
1112
    int polled = 0;
 
1113
 
 
1114
AIO_REPOLL:
 
1115
    request = done_requests.head;
 
1116
 
 
1117
    if (request == NULL && !polled) {
 
1118
        CommIO::ResetNotifications();
 
1119
        squidaio_poll_queues();
 
1120
        polled = 1;
 
1121
        request = done_requests.head;
 
1122
    }
 
1123
 
 
1124
    if (!request) {
 
1125
        return NULL;
 
1126
    }
 
1127
 
 
1128
    debug(43, 9) ("squidaio_poll_done: %p type=%d result=%p\n",
 
1129
                  request, request->request_type, request->resultp);
 
1130
    done_requests.head = request->next;
 
1131
 
 
1132
    if (!done_requests.head)
 
1133
        done_requests.tailp = &done_requests.head;
 
1134
 
 
1135
    resultp = request->resultp;
 
1136
 
 
1137
    cancelled = request->cancelled;
 
1138
 
 
1139
    squidaio_debug(request);
 
1140
 
 
1141
    debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err);
 
1142
 
 
1143
    squidaio_cleanup_request(request);
 
1144
 
 
1145
    if (cancelled)
 
1146
        goto AIO_REPOLL;
 
1147
 
 
1148
    return resultp;
 
1149
}                               /* squidaio_poll_done */
 
1150
 
 
1151
int
 
1152
squidaio_operations_pending(void)
 
1153
{
 
1154
    return request_queue_len + (done_requests.head ? 1 : 0);
 
1155
}
 
1156
 
 
1157
int
 
1158
squidaio_sync(void)
 
1159
{
 
1160
    /* XXX This might take a while if the queue is large.. */
 
1161
 
 
1162
    do {
 
1163
        squidaio_poll_queues();
 
1164
    } while (request_queue_len > 0);
 
1165
 
 
1166
    return squidaio_operations_pending();
 
1167
}
 
1168
 
 
1169
int
 
1170
squidaio_get_queue_len(void)
 
1171
{
 
1172
    return request_queue_len;
 
1173
}
 
1174
 
 
1175
static void
 
1176
squidaio_debug(squidaio_request_t * request)
 
1177
{
 
1178
    switch (request->request_type) {
 
1179
 
 
1180
    case _AIO_OP_OPEN:
 
1181
        debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret);
 
1182
        break;
 
1183
 
 
1184
    case _AIO_OP_READ:
 
1185
        debug(43, 5) ("READ on fd: %d\n", request->fd);
 
1186
        break;
 
1187
 
 
1188
    case _AIO_OP_WRITE:
 
1189
        debug(43, 5) ("WRITE on fd: %d\n", request->fd);
 
1190
        break;
 
1191
 
 
1192
    case _AIO_OP_CLOSE:
 
1193
        debug(43, 5) ("CLOSE of fd: %d\n", request->fd);
 
1194
        break;
 
1195
 
 
1196
    case _AIO_OP_UNLINK:
 
1197
        debug(43, 5) ("UNLINK of %s\n", request->path);
 
1198
        break;
 
1199
 
 
1200
    case _AIO_OP_TRUNCATE:
 
1201
        debug(43, 5) ("UNLINK of %s\n", request->path);
 
1202
        break;
 
1203
 
 
1204
    default:
 
1205
        break;
 
1206
    }
 
1207
}
 
1208
 
 
1209
void
 
1210
squidaio_stats(StoreEntry * sentry)
 
1211
{
 
1212
    squidaio_thread_t *threadp;
 
1213
    int i;
 
1214
 
 
1215
    if (!squidaio_initialised)
 
1216
        return;
 
1217
 
 
1218
    storeAppendPrintf(sentry, "\n\nThreads Status:\n");
 
1219
 
 
1220
    storeAppendPrintf(sentry, "#\tID\t# Requests\n");
 
1221
 
 
1222
    threadp = threads;
 
1223
 
 
1224
    for (i = 0; i < NUMTHREADS; i++) {
 
1225
        storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
 
1226
        threadp = threadp->next;
 
1227
    }
 
1228
}