~ubuntu-branches/ubuntu/trusty/drizzle/trusty

Viewing changes to plugin/pbxt/src/cache_xt.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

 
1
/* Copyright (c) 2005 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2005-05-24   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 */
 
23
 
 
24
#include "xt_config.h"
 
25
 
 
26
#ifdef DRIZZLED
 
27
#include <bitset>
 
28
#endif
 
29
 
 
30
#ifndef XT_WIN
 
31
#include <unistd.h>
 
32
#endif
 
33
 
 
34
#include <stdio.h>
 
35
#include <time.h>
 
36
 
 
37
#include "pthread_xt.h"
 
38
#include "thread_xt.h"
 
39
#include "filesys_xt.h"
 
40
#include "cache_xt.h"
 
41
#include "table_xt.h"
 
42
#include "trace_xt.h"
 
43
#include "util_xt.h"
 
44
 
 
45
#define XT_TIME_DIFF(start, now) (\
 
46
        ((xtWord4) (now) < (xtWord4) (start)) ? \
 
47
        ((xtWord4) 0XFFFFFFFF - ((xtWord4) (start) - (xtWord4) (now))) : \
 
48
        ((xtWord4) (now) - (xtWord4) (start)))
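/* Worked example of the wrap-around arithmetic above (an added illustration,
 * not part of the original source):
 *   no wrap:  start = 10,         now = 50   ->  50 - 10 = 40
 *   wrapped:  start = 0xFFFFFFF0, now = 0x10 ->  0xFFFFFFFF - 0xFFFFFFE0 = 0x1F
 * The wrapped result is one less than the exact modulo-2^32 difference (0x20),
 * which is close enough for the relative-age comparisons the macro is used for
 * in ind_cac_fetch() below.
 */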
 
49
 
 
50
/*
 
51
 * -----------------------------------------------------------------------
 
52
 * D I S K   C A C H E
 
53
 */
 
54
 
 
55
#define IDX_CAC_SEGMENT_COUNT           ((off_t) 1 << XT_INDEX_CACHE_SEGMENT_SHIFTS)
 
56
#define IDX_CAC_SEGMENT_MASK            (IDX_CAC_SEGMENT_COUNT - 1)
 
57
 
 
58
#ifdef XT_NO_ATOMICS
 
59
#define IDX_CAC_USE_PTHREAD_RW
 
60
#else
 
61
//#define IDX_CAC_USE_PTHREAD_RW
 
62
#define IDX_CAC_USE_XSMUTEX
 
63
//#define IDX_USE_SPINXSLOCK
 
64
#endif
 
65
 
 
66
#if defined(IDX_CAC_USE_PTHREAD_RW)
 
67
#define IDX_CAC_LOCK_TYPE                               xt_rwlock_type
 
68
#define IDX_CAC_INIT_LOCK(s, i)                 xt_init_rwlock_with_autoname(s, &(i)->cs_lock)
 
69
#define IDX_CAC_FREE_LOCK(s, i)                 xt_free_rwlock(&(i)->cs_lock)   
 
70
#define IDX_CAC_READ_LOCK(i, o)                 xt_slock_rwlock_ns(&(i)->cs_lock)
 
71
#define IDX_CAC_WRITE_LOCK(i, o)                xt_xlock_rwlock_ns(&(i)->cs_lock)
 
72
#define IDX_CAC_UNLOCK(i, o)                    xt_unlock_rwlock_ns(&(i)->cs_lock)
 
73
#elif defined(IDX_CAC_USE_XSMUTEX)
 
74
#define IDX_CAC_LOCK_TYPE                               XTMutexXSLockRec
 
75
#define IDX_CAC_INIT_LOCK(s, i)                 xt_xsmutex_init_with_autoname(s, &(i)->cs_lock)
 
76
#define IDX_CAC_FREE_LOCK(s, i)                 xt_xsmutex_free(s, &(i)->cs_lock)       
 
77
#define IDX_CAC_READ_LOCK(i, o)                 xt_xsmutex_slock(&(i)->cs_lock, (o)->t_id)
 
78
#define IDX_CAC_WRITE_LOCK(i, o)                xt_xsmutex_xlock(&(i)->cs_lock, (o)->t_id)
 
79
#define IDX_CAC_UNLOCK(i, o)                    xt_xsmutex_unlock(&(i)->cs_lock, (o)->t_id)
 
80
#elif defined(IDX_CAC_USE_SPINXSLOCK)
 
81
#define IDX_CAC_LOCK_TYPE                               XTSpinXSLockRec
 
82
#define IDX_CAC_INIT_LOCK(s, i)                 xt_spinxslock_init_with_autoname(s, &(i)->cs_lock)
 
83
#define IDX_CAC_FREE_LOCK(s, i)                 xt_spinxslock_free(s, &(i)->cs_lock)    
 
84
#define IDX_CAC_READ_LOCK(i, s)                 xt_spinxslock_slock(&(i)->cs_lock, (s)->t_id)
 
85
#define IDX_CAC_WRITE_LOCK(i, s)                xt_spinxslock_xlock(&(i)->cs_lock, FALSE, (s)->t_id)
 
86
#define IDX_CAC_UNLOCK(i, s)                    xt_spinxslock_unlock(&(i)->cs_lock, (s)->t_id)
 
87
#else
 
88
#error Please define the lock type
 
89
#endif
 
90
 
 
91
#ifdef XT_NO_ATOMICS
 
92
#define ID_HANDLE_USE_PTHREAD_RW
 
93
#else
 
94
//#define ID_HANDLE_USE_PTHREAD_RW
 
95
#define ID_HANDLE_USE_SPINLOCK
 
96
#endif
 
97
 
 
98
#if defined(ID_HANDLE_USE_PTHREAD_RW)
 
99
#define ID_HANDLE_LOCK_TYPE                             xt_mutex_type
 
100
#define ID_HANDLE_INIT_LOCK(s, i)               xt_init_mutex_with_autoname(s, i)
 
101
#define ID_HANDLE_FREE_LOCK(s, i)               xt_free_mutex(i)        
 
102
#define ID_HANDLE_LOCK(i)                               xt_lock_mutex_ns(i)
 
103
#define ID_HANDLE_UNLOCK(i)                             xt_unlock_mutex_ns(i)
 
104
#elif defined(ID_HANDLE_USE_SPINLOCK)
 
105
#define ID_HANDLE_LOCK_TYPE                             XTSpinLockRec
 
106
#define ID_HANDLE_INIT_LOCK(s, i)               xt_spinlock_init_with_autoname(s, i)
 
107
#define ID_HANDLE_FREE_LOCK(s, i)               xt_spinlock_free(s, i)  
 
108
#define ID_HANDLE_LOCK(i)                               xt_spinlock_lock(i)
 
109
#define ID_HANDLE_UNLOCK(i)                             xt_spinlock_unlock(i)
 
110
#endif
 
111
 
 
112
#define XT_HANDLE_SLOTS                                 37
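/* Added note: a handle slot is selected by cb_address % XT_HANDLE_SLOTS (see
 * xt_ind_get_handle() and xt_ind_release_handle() below). 37 is prime, which
 * helps avoid clustering when block addresses arrive in regular strides; for
 * example, addresses 74, 75 and 76 map to slots 0, 1 and 2.
 */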
 
113
 
 
114
/*
 
115
#ifdef DEBUG
 
116
#define XT_INIT_HANDLE_COUNT                    0
 
117
#define XT_INIT_HANDLE_BLOCKS                   0
 
118
#else
 
119
#define XT_INIT_HANDLE_COUNT                    40
 
120
#define XT_INIT_HANDLE_BLOCKS                   10
 
121
#endif
 
122
*/
 
123
 
 
124
/* A disk cache segment. The cache is divided into a number of segments
 
125
 * to improve concurrency.
 
126
 */
 
127
typedef struct DcSegment {
 
128
        IDX_CAC_LOCK_TYPE       cs_lock;                                                /* The cache segment lock. */
 
129
        XTIndBlockPtr           *cs_hash_table;
 
130
} DcSegmentRec, *DcSegmentPtr;
 
131
 
 
132
typedef struct DcHandleSlot {
 
133
        ID_HANDLE_LOCK_TYPE     hs_handles_lock;
 
134
        XTIndHandleBlockPtr     hs_free_blocks;
 
135
        XTIndHandlePtr          hs_free_handles;
 
136
        XTIndHandlePtr          hs_used_handles;
 
137
} DcHandleSlotRec, *DcHandleSlotPtr;
 
138
 
 
139
typedef struct DcGlobals {
 
140
        xt_mutex_type           cg_lock;                                                /* The public cache lock. */
 
141
        DcSegmentRec            cg_segment[IDX_CAC_SEGMENT_COUNT];
 
142
        XTIndBlockPtr           cg_blocks;
 
143
#ifdef XT_USE_DIRECT_IO_ON_INDEX
 
144
        xtWord1                         *cg_buffer;
 
145
#endif
 
146
        XTIndBlockPtr           cg_free_list;
 
147
        xtWord4                         cg_free_count;
 
148
        xtWord4                         cg_ru_now;                                              /* A counter as described by Jim Starkey (my thanks) */
 
149
        XTIndBlockPtr           cg_lru_block;
 
150
        XTIndBlockPtr           cg_mru_block;
 
151
        xtWord4                         cg_hash_size;
 
152
        xtWord4                         cg_block_count;
 
153
        xtWord4                         cg_max_free;
 
154
#ifdef DEBUG_CHECK_IND_CACHE
 
155
        u_int                           cg_reserved_by_ots;                             /* Number of blocks reserved by open tables. */
 
156
        u_int                           cg_read_count;                                  /* Number of blocks being read. */
 
157
#endif
 
158
 
 
159
        /* Index cache handles: */
 
160
        DcHandleSlotRec         cg_handle_slot[XT_HANDLE_SLOTS];
 
161
} DcGlobalsRec;
 
162
 
 
163
static DcGlobalsRec     ind_cac_globals;
 
164
 
 
165
#ifdef XT_USE_MYSYS
 
166
#ifdef xtPublic
 
167
#undef xtPublic
 
168
#endif
 
169
#include "my_global.h"
 
170
#include "my_sys.h"
 
171
#include "keycache.h"
 
172
KEY_CACHE my_cache;
 
173
#undef  pthread_rwlock_rdlock
 
174
#undef  pthread_rwlock_wrlock
 
175
#undef  pthread_rwlock_try_wrlock
 
176
#undef  pthread_rwlock_unlock
 
177
#undef  pthread_mutex_lock
 
178
#undef  pthread_mutex_unlock
 
179
#undef  pthread_cond_wait
 
180
#undef  pthread_cond_broadcast
 
181
#undef  xt_mutex_type
 
182
#define xtPublic
 
183
#endif
 
184
 
 
185
/*
 
186
 * -----------------------------------------------------------------------
 
187
 * INDEX CACHE HANDLES
 
188
 */
 
189
 
 
190
static XTIndHandlePtr ind_alloc_handle()
 
191
{
 
192
        XTIndHandlePtr handle;
 
193
 
 
194
        if (!(handle = (XTIndHandlePtr) xt_calloc_ns(sizeof(XTIndHandleRec))))
 
195
                return NULL;
 
196
        xt_spinlock_init_with_autoname(NULL, &handle->ih_lock);
 
197
        return handle;
 
198
}
 
199
 
 
200
static void ind_free_handle(XTIndHandlePtr handle)
 
201
{
 
202
        xt_spinlock_free(NULL, &handle->ih_lock);
 
203
        xt_free_ns(handle);
 
204
}
 
205
 
 
206
static void ind_handle_exit(XTThreadPtr self)
 
207
{
 
208
        DcHandleSlotPtr         hs;
 
209
        XTIndHandlePtr          handle;
 
210
        XTIndHandleBlockPtr     hptr;
 
211
 
 
212
        for (int i=0; i<XT_HANDLE_SLOTS; i++) {
 
213
                hs = &ind_cac_globals.cg_handle_slot[i];
 
214
 
 
215
                while (hs->hs_used_handles) {
 
216
                        handle = hs->hs_used_handles;
 
217
                        xt_ind_release_handle(handle, FALSE, self);
 
218
                }
 
219
 
 
220
                while (hs->hs_free_blocks) {
 
221
                        hptr = hs->hs_free_blocks;
 
222
                        hs->hs_free_blocks = hptr->hb_next;
 
223
                        xt_free(self, hptr);
 
224
                }
 
225
 
 
226
                while (hs->hs_free_handles) {
 
227
                        handle = hs->hs_free_handles;
 
228
                        hs->hs_free_handles = handle->ih_next;
 
229
                        ind_free_handle(handle);
 
230
                }
 
231
 
 
232
                ID_HANDLE_FREE_LOCK(self, &hs->hs_handles_lock);
 
233
        }
 
234
}
 
235
 
 
236
static void ind_handle_init(XTThreadPtr self)
 
237
{
 
238
        DcHandleSlotPtr         hs;
 
239
 
 
240
        for (int i=0; i<XT_HANDLE_SLOTS; i++) {
 
241
                hs = &ind_cac_globals.cg_handle_slot[i];
 
242
                memset(hs, 0, sizeof(DcHandleSlotRec));
 
243
                ID_HANDLE_INIT_LOCK(self, &hs->hs_handles_lock);
 
244
        }
 
245
}
 
246
 
 
247
//#define CHECK_HANDLE_STRUCTS
 
248
 
 
249
#ifdef CHECK_HANDLE_STRUCTS
 
250
static int gdummy = 0;
 
251
 
 
252
static void ic_stop_here()
 
253
{
 
254
        gdummy = gdummy + 1;
 
255
        printf("Nooo %d!\n", gdummy);
 
256
}
 
257
 
 
258
static void ic_check_handle_structs()
 
259
{
 
260
        XTIndHandlePtr          handle, phandle;
 
261
        XTIndHandleBlockPtr     hptr, phptr;
 
262
        int                                     count = 0;
 
263
        int                                     ctest;
 
264
 
 
265
        phandle = NULL;
 
266
        handle = ind_cac_globals.cg_used_handles;
 
267
        while (handle) {
 
268
                if (handle == phandle)
 
269
                        ic_stop_here();
 
270
                if (handle->ih_prev != phandle)
 
271
                        ic_stop_here();
 
272
                if (handle->ih_cache_reference) {
 
273
                        ctest = handle->x.ih_cache_block->cb_handle_count;
 
274
                        if (ctest == 0 || ctest > 100)
 
275
                                ic_stop_here();
 
276
                }
 
277
                else {
 
278
                        ctest = handle->x.ih_handle_block->hb_ref_count;
 
279
                        if (ctest == 0 || ctest > 100)
 
280
                                ic_stop_here();
 
281
                }
 
282
                phandle = handle;
 
283
                handle = handle->ih_next;
 
284
                count++;
 
285
                if (count > 1000)
 
286
                        ic_stop_here();
 
287
        }
 
288
 
 
289
        count = 0;
 
290
        hptr = ind_cac_globals.cg_free_blocks;
 
291
        while (hptr) {
 
292
                if (hptr == phptr)
 
293
                        ic_stop_here();
 
294
                phptr = hptr;
 
295
                hptr = hptr->hb_next;
 
296
                count++;
 
297
                if (count > 1000)
 
298
                        ic_stop_here();
 
299
        }
 
300
 
 
301
        count = 0;
 
302
        handle = ind_cac_globals.cg_free_handles;
 
303
        while (handle) {
 
304
                if (handle == phandle)
 
305
                        ic_stop_here();
 
306
                phandle = handle;
 
307
                handle = handle->ih_next;
 
308
                count++;
 
309
                if (count > 1000)
 
310
                        ic_stop_here();
 
311
        }
 
312
}
 
313
#endif
 
314
 
 
315
/*
 
316
 * Get a handle to the index block.
 
317
 * This function is called by index scanners (readers).
 
318
 */
 
319
xtPublic XTIndHandlePtr xt_ind_get_handle(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref)
 
320
{
 
321
        DcHandleSlotPtr hs;
 
322
        XTIndHandlePtr  handle;
 
323
 
 
324
        hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];
 
325
 
 
326
        ASSERT_NS(iref->ir_xlock == FALSE);
 
327
        ASSERT_NS(iref->ir_updated == FALSE);
 
328
        ID_HANDLE_LOCK(&hs->hs_handles_lock);
 
329
#ifdef CHECK_HANDLE_STRUCTS
 
330
        ic_check_handle_structs();
 
331
#endif
 
332
        if ((handle = hs->hs_free_handles))
 
333
                hs->hs_free_handles = handle->ih_next;
 
334
        else {
 
335
                if (!(handle = ind_alloc_handle())) {
 
336
                        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
 
337
                        xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
 
338
                        return NULL;
 
339
                }
 
340
        }
 
341
        if (hs->hs_used_handles)
 
342
                hs->hs_used_handles->ih_prev = handle;
 
343
        handle->ih_next = hs->hs_used_handles;
 
344
        handle->ih_prev = NULL;
 
345
        handle->ih_address = iref->ir_block->cb_address;
 
346
        handle->ih_cache_reference = TRUE;
 
347
        handle->x.ih_cache_block = iref->ir_block;
 
348
        handle->ih_branch = iref->ir_branch;
 
349
        /* {HANDLE-COUNT-USAGE}
 
350
         * This is safe because:
 
351
         *
 
352
         * I have an Slock on the cache block, and I have
 
353
         * at least an Slock on the index.
 
354
         * So this excludes anyone who is reading 
 
355
         * cb_handle_count in the index.
 
356
         * (all cache block writers, and the freer).
 
357
         *
 
358
         * The increment is safe because I have the list
 
359
         * lock (hs_handles_lock), which is required by anyone else
 
360
         * who increments or decrements this value.
 
361
         */
 
362
        iref->ir_block->cb_handle_count++;
 
363
        hs->hs_used_handles = handle;
 
364
#ifdef CHECK_HANDLE_STRUCTS
 
365
        ic_check_handle_structs();
 
366
#endif
 
367
        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
 
368
        xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
 
369
        return handle;
 
370
}
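/* Added usage sketch (an assumed caller pattern, based only on the signatures
 * in this file, not taken from the original source): a reader that already
 * holds an S-locked cache reference converts it into a handle, reads the
 * branch under the handle lock, and releases the handle when done.
 *
 *     XTIndHandlePtr h;
 *
 *     if ((h = xt_ind_get_handle(ot, ind, &iref))) {  // releases iref's S-lock
 *         xt_ind_lock_handle(h);
 *         // ... read h->ih_branch ...
 *         xt_ind_unlock_handle(h);
 *         xt_ind_release_handle(h, FALSE, ot->ot_thread);
 *     }
 */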
 
371
 
 
372
xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTThreadPtr thread)
 
373
{
 
374
        DcHandleSlotPtr hs;
 
375
        XTIndBlockPtr   block = NULL;
 
376
        u_int                   hash_idx = 0;
 
377
        DcSegmentPtr    seg = NULL;
 
378
        XTIndBlockPtr   xblock;
 
379
 
 
380
        (void) thread; /*DRIZZLED*/
 
381
 
 
382
        /* The lock order is:
 
383
         * 1. Cache segment (cs_lock) - This is only taken by ind_free_block()!
 
384
         * 1. S/Slock cache block (cb_lock)
 
385
         * 2. List lock (hs_handles_lock).
 
386
         * 3. Handle lock (ih_lock)
 
387
         */
 
388
        if (!have_lock)
 
389
                xt_spinlock_lock(&handle->ih_lock);
 
390
 
 
391
        /* Get the lock on the cache page if required: */
 
392
        if (handle->ih_cache_reference) {
 
393
                u_int                   file_id;
 
394
                xtIndexNodeID   address;
 
395
 
 
396
                block = handle->x.ih_cache_block;
 
397
 
 
398
                file_id = block->cb_file_id;
 
399
                address = block->cb_address;
 
400
                hash_idx = XT_NODE_ID(address) + (file_id * 223);
 
401
                seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
 
402
                hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;
 
403
        }
 
404
 
 
405
        xt_spinlock_unlock(&handle->ih_lock);
 
406
 
 
407
        /* Because of the lock order, I have to release the
 
408
         * handle before I get a lock on the cache block.
 
409
         *
 
410
         * But, by doing this, this cache block may be gone!
 
411
         */
 
412
        if (block) {
 
413
                IDX_CAC_READ_LOCK(seg, thread);
 
414
                xblock = seg->cs_hash_table[hash_idx];
 
415
                while (xblock) {
 
416
                        if (block == xblock) {
 
417
                                /* Found the block... 
 
418
                                 * {HANDLE-COUNT-SLOCK}
 
419
                                 * 04.05.2009, changed to slock.
 
420
                                 * The xlock causes too much contention
 
421
                                 * on the cache block for read only loads.
 
422
                                 *
 
423
                                 * Is it safe?
 
424
                                 * See below...
 
425
                                 */
 
426
                                XT_IPAGE_READ_LOCK(&block->cb_lock);
 
427
                                goto block_found;
 
428
                        }
 
429
                        xblock = xblock->cb_next;
 
430
                }
 
431
                block = NULL;
 
432
                block_found:
 
433
                IDX_CAC_UNLOCK(seg, thread);
 
434
        }
 
435
 
 
436
        hs = &ind_cac_globals.cg_handle_slot[handle->ih_address % XT_HANDLE_SLOTS];
 
437
 
 
438
        ID_HANDLE_LOCK(&hs->hs_handles_lock);
 
439
#ifdef CHECK_HANDLE_STRUCTS
 
440
        ic_check_handle_structs();
 
441
#endif
 
442
 
 
443
        /* I don't need to lock the handle because I have locked
 
444
         * the list, and no other thread can change the
 
445
         * handle without first getting a lock on the list.
 
446
         *
 
447
         * In addition, the caller is the only owner of the
 
448
         * handle, and the only thread with an independent
 
449
         * reference to the handle.
 
450
         * All other access occurs over the list.
 
451
         */
 
452
 
 
453
        /* Remove the reference to the cache or a handle block: */
 
454
        if (handle->ih_cache_reference) {
 
455
                ASSERT_NS(block == handle->x.ih_cache_block);
 
456
                ASSERT_NS(block && block->cb_handle_count > 0);
 
457
                /* {HANDLE-COUNT-USAGE}
 
458
                 * This is safe here because I have excluded
 
459
                 * all readers by taking an Xlock on the
 
460
                 * cache block (CHANGED - see below).
 
461
                 *
 
462
                 * {HANDLE-COUNT-SLOCK}
 
463
                 * 04.05.2009, changed to slock.
 
464
                 * Should be OK, because:
 
465
                 * I have a lock on the list lock (hs_handles_lock),
 
466
                 * which prevents concurrent updates to cb_handle_count.
 
467
                 *
 
468
                 * I also have a read lock on the cache block
 
469
                 * but not a lock on the index. As a result, we cannot
 
470
                 * exclude all index writers (and readers of
 
471
                 * cb_handle_count).
 
472
                 */
 
473
                block->cb_handle_count--;
 
474
        }
 
475
        else {
 
476
                XTIndHandleBlockPtr     hptr = handle->x.ih_handle_block;
 
477
 
 
478
                ASSERT_NS(!handle->ih_cache_reference);
 
479
                ASSERT_NS(hptr->hb_ref_count > 0);
 
480
                hptr->hb_ref_count--;
 
481
                if (!hptr->hb_ref_count) {
 
482
                        /* Put it back on the free list: */
 
483
                        hptr->hb_next = hs->hs_free_blocks;
 
484
                        hs->hs_free_blocks = hptr;
 
485
                }
 
486
        }
 
487
 
 
488
        /* Unlink the handle: */
 
489
        if (handle->ih_next)
 
490
                handle->ih_next->ih_prev = handle->ih_prev;
 
491
        if (handle->ih_prev)
 
492
                handle->ih_prev->ih_next = handle->ih_next;
 
493
        if (hs->hs_used_handles == handle)
 
494
                hs->hs_used_handles = handle->ih_next;
 
495
 
 
496
        /* Put it on the free list: */
 
497
        handle->ih_next = hs->hs_free_handles;
 
498
        hs->hs_free_handles = handle;
 
499
 
 
500
#ifdef CHECK_HANDLE_STRUCTS
 
501
        ic_check_handle_structs();
 
502
#endif
 
503
        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
 
504
 
 
505
        if (block)
 
506
                XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
 
507
}
 
508
 
 
509
/* Call this function before a referenced cache block is modified!
 
510
 * This function is called by index updaters.
 
511
 */
 
512
xtPublic xtBool xt_ind_copy_on_write(XTIndReferencePtr iref)
 
513
{
 
514
        DcHandleSlotPtr         hs;
 
515
        XTIndHandleBlockPtr     hptr;
 
516
        u_int                           branch_size;
 
517
        XTIndHandlePtr          handle;
 
518
        u_int                           i = 0;
 
519
 
 
520
        hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];
 
521
 
 
522
        ID_HANDLE_LOCK(&hs->hs_handles_lock);
 
523
 
 
524
        /* {HANDLE-COUNT-USAGE}
 
525
         * This is only called by updaters of this index block, or
 
526
         * the freer, which holds an Xlock on the index block.
 
527
         * These are all mutually exclusive for the index block.
 
528
         *
 
529
         * {HANDLE-COUNT-SLOCK}
 
530
         * Do this check again, after we have the list lock (hs_handles_lock).
 
531
         * There is a small chance that the count has changed since we last
 
532
         * checked, because xt_ind_release_handle() only holds
 
533
         * an slock on the index page.
 
534
         *
 
535
         * An updater can sometimes have an XLOCK on the index and an slock
 
536
         * on the cache block. In this case xt_ind_release_handle()
 
537
         * could have run through.
 
538
         */
 
539
        if (!iref->ir_block->cb_handle_count) {
 
540
                ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
 
541
                return OK;
 
542
        }
 
543
 
 
544
#ifdef CHECK_HANDLE_STRUCTS
 
545
        ic_check_handle_structs();
 
546
#endif
 
547
        if ((hptr = hs->hs_free_blocks))
 
548
                hs->hs_free_blocks = hptr->hb_next;
 
549
        else {
 
550
                if (!(hptr = (XTIndHandleBlockPtr) xt_malloc_ns(sizeof(XTIndHandleBlockRec)))) {
 
551
                        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
 
552
                        return FAILED;
 
553
                }
 
554
        }
 
555
 
 
556
        branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref->ir_branch->tb_size_2));
 
557
        memcpy(&hptr->hb_branch, iref->ir_branch, branch_size);
 
558
        hptr->hb_ref_count = iref->ir_block->cb_handle_count;
 
559
 
 
560
        handle = hs->hs_used_handles;
 
561
        while (handle) {
 
562
                if (handle->ih_branch == iref->ir_branch) {
 
563
                        i++;
 
564
                        xt_spinlock_lock(&handle->ih_lock);
 
565
                        ASSERT_NS(handle->ih_cache_reference);
 
566
                        handle->ih_cache_reference = FALSE;
 
567
                        handle->x.ih_handle_block = hptr;
 
568
                        handle->ih_branch = &hptr->hb_branch;
 
569
                        xt_spinlock_unlock(&handle->ih_lock);
 
570
#ifndef DEBUG
 
571
                        if (i == hptr->hb_ref_count)
 
572
                                break;
 
573
#endif
 
574
                }
 
575
                handle = handle->ih_next;
 
576
        }
 
577
#ifdef DEBUG
 
578
        ASSERT_NS(hptr->hb_ref_count == i);
 
579
#endif
 
580
        /* {HANDLE-COUNT-USAGE}
 
581
         * It is safe to modify cb_handle_count when I have the
 
582
         * list lock, and I have excluded all readers!
 
583
         */
 
584
        iref->ir_block->cb_handle_count = 0;
 
585
#ifdef CHECK_HANDLE_STRUCTS
 
586
        ic_check_handle_structs();
 
587
#endif
 
588
        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
 
589
 
 
590
        return OK;
 
591
}
 
592
 
 
593
xtPublic void xt_ind_lock_handle(XTIndHandlePtr handle)
 
594
{
 
595
        xt_spinlock_lock(&handle->ih_lock);
 
596
}
 
597
 
 
598
xtPublic void xt_ind_unlock_handle(XTIndHandlePtr handle)
 
599
{
 
600
        xt_spinlock_unlock(&handle->ih_lock);
 
601
}
 
602
 
 
603
/*
 
604
 * -----------------------------------------------------------------------
 
605
 * INIT/EXIT
 
606
 */
 
607
 
 
608
/*
 
609
 * Initialize the disk cache.
 
610
 */
 
611
xtPublic void xt_ind_init(XTThreadPtr self, size_t cache_size)
 
612
{
 
613
        XTIndBlockPtr   block;
 
614
 
 
615
#ifdef XT_USE_MYSYS
 
616
        init_key_cache(&my_cache, 1024, cache_size, 100, 300);
 
617
#endif
 
618
        /* Memory is devoted to the page data alone; I no longer count the size of the directory,
 
619
         * or the page overhead: */
 
620
        ind_cac_globals.cg_block_count = cache_size / XT_INDEX_PAGE_SIZE;
 
621
        ind_cac_globals.cg_hash_size = ind_cac_globals.cg_block_count / (IDX_CAC_SEGMENT_COUNT >> 1);
 
622
        ind_cac_globals.cg_max_free = ind_cac_globals.cg_block_count / 10;
 
623
        if (ind_cac_globals.cg_max_free < 8)
 
624
                ind_cac_globals.cg_max_free = 8;
 
625
        if (ind_cac_globals.cg_max_free > 128)
 
626
                ind_cac_globals.cg_max_free = 128;
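        /* Added sizing example (assumed values: XT_INDEX_PAGE_SIZE = 16 KB and
         * IDX_CAC_SEGMENT_COUNT = 8; the real values come from the headers):
         * with cache_size = 32 MB,
         *   cg_block_count = 32 MB / 16 KB   = 2048 blocks
         *   cg_hash_size   = 2048 / (8 >> 1) = 512 buckets per segment
         *   cg_max_free    = 2048 / 10 = 204, clamped to the maximum of 128
         */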
 
627
 
 
628
        try_(a) {
 
629
                for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
 
630
                        ind_cac_globals.cg_segment[i].cs_hash_table = (XTIndBlockPtr *) xt_calloc(self, ind_cac_globals.cg_hash_size * sizeof(XTIndBlockPtr));
 
631
                        IDX_CAC_INIT_LOCK(self, &ind_cac_globals.cg_segment[i]);
 
632
                }
 
633
 
 
634
                block = (XTIndBlockPtr) xt_malloc(self, ind_cac_globals.cg_block_count * sizeof(XTIndBlockRec));
 
635
                ind_cac_globals.cg_blocks = block;
 
636
                xt_init_mutex_with_autoname(self, &ind_cac_globals.cg_lock);
 
637
#ifdef XT_USE_DIRECT_IO_ON_INDEX
 
638
                xtWord1 *buffer;
 
639
#ifdef XT_WIN
 
640
                size_t  psize = 512;
 
641
#else
 
642
                size_t  psize = getpagesize();
 
643
#endif
 
644
                size_t  diff;
 
645
 
 
646
                buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE));
 
647
                diff = (size_t) buffer % psize;
 
648
                if (diff != 0) {
 
649
                        xt_free(self, buffer);
 
650
                        buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE) + psize);
 
651
                        diff = (size_t) buffer % psize;
 
652
                        if (diff != 0)
 
653
                                diff = psize - diff;
 
654
                }
 
655
                ind_cac_globals.cg_buffer = buffer;
 
656
                buffer += diff;
 
657
#endif
 
658
 
 
659
                for (u_int i=0; i<ind_cac_globals.cg_block_count; i++) {
 
660
                        XT_IPAGE_INIT_LOCK(self, &block->cb_lock);
 
661
                        block->cb_state = IDX_CAC_BLOCK_FREE;
 
662
                        block->cb_next = ind_cac_globals.cg_free_list;
 
663
#ifdef XT_USE_DIRECT_IO_ON_INDEX
 
664
                        block->cb_data = buffer;
 
665
                        buffer += XT_INDEX_PAGE_SIZE;
 
666
#endif
 
667
#ifdef CHECK_BLOCK_TRAILERS
 
668
                        XT_SET_DISK_4(block->cp_check, 0xDEADBEEF);
 
669
#endif
 
670
                        ind_cac_globals.cg_free_list = block;
 
671
                        block++;
 
672
                }
 
673
                ind_cac_globals.cg_free_count = ind_cac_globals.cg_block_count;
 
674
#ifdef DEBUG_CHECK_IND_CACHE
 
675
                ind_cac_globals.cg_reserved_by_ots = 0;
 
676
#endif
 
677
                ind_handle_init(self);
 
678
        }
 
679
        catch_(a) {
 
680
                xt_ind_exit(self);
 
681
                throw_();
 
682
        }
 
683
        cont_(a);
 
684
}
 
685
 
 
686
#ifdef CHECK_BLOCK_TRAILERS
 
687
xtPublic void check_block_trailers()
 
688
{
 
689
        XTIndBlockPtr   block;
 
690
 
 
691
        block = ind_cac_globals.cg_blocks;
 
692
        for (u_int i=0; i<ind_cac_globals.cg_block_count; i++) {
 
693
                ASSERT_NS(XT_GET_DISK_4(block->cp_check) == 0xDEADBEEF);
 
694
                block++;
 
695
        }
 
696
}
 
697
#endif
 
698
 
 
699
xtPublic void xt_ind_exit(XTThreadPtr self)
 
700
{
 
701
#ifdef XT_USE_MYSYS
 
702
        end_key_cache(&my_cache, 1);
 
703
#endif
 
704
        for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
 
705
                if (ind_cac_globals.cg_segment[i].cs_hash_table) {
 
706
                        xt_free(self, ind_cac_globals.cg_segment[i].cs_hash_table);
 
707
                        ind_cac_globals.cg_segment[i].cs_hash_table = NULL;
 
708
                        IDX_CAC_FREE_LOCK(self, &ind_cac_globals.cg_segment[i]);
 
709
                }
 
710
        }
 
711
 
 
712
        /* Must be done before freeing the blocks! */
 
713
        ind_handle_exit(self);
 
714
 
 
715
        if (ind_cac_globals.cg_blocks) {
 
716
                xt_free(self, ind_cac_globals.cg_blocks);
 
717
                ind_cac_globals.cg_blocks = NULL;
 
718
                xt_free_mutex(&ind_cac_globals.cg_lock);
 
719
        }
 
720
#ifdef XT_USE_DIRECT_IO_ON_INDEX
 
721
        if (ind_cac_globals.cg_buffer) {
 
722
                xt_free(self, ind_cac_globals.cg_buffer);
 
723
                ind_cac_globals.cg_buffer = NULL;
 
724
        }
 
725
#endif
 
726
 
 
727
        memset(&ind_cac_globals, 0, sizeof(ind_cac_globals));
 
728
}
 
729
 
 
730
xtPublic xtInt8 xt_ind_get_usage()
 
731
{
 
732
        xtInt8 size = 0;
 
733
 
 
734
        size = (xtInt8) (ind_cac_globals.cg_block_count - ind_cac_globals.cg_free_count) * (xtInt8) XT_INDEX_PAGE_SIZE;
 
735
        return size;
 
736
}
 
737
 
 
738
xtPublic xtInt8 xt_ind_get_size()
 
739
{
 
740
        xtInt8 size = 0;
 
741
 
 
742
        size = (xtInt8) ind_cac_globals.cg_block_count * (xtInt8) XT_INDEX_PAGE_SIZE;
 
743
        return size;
 
744
}
 
745
 
 
746
xtPublic u_int xt_ind_get_blocks()
 
747
{
 
748
        return ind_cac_globals.cg_block_count;
 
749
}
 
750
 
 
751
/*
 
752
 * -----------------------------------------------------------------------
 
753
 * INDEX CHECKING
 
754
 */
 
755
 
 
756
xtPublic void xt_ind_check_cache(XTIndexPtr ind)
 
757
{
 
758
        XTIndBlockPtr   block;
 
759
        u_int                   free_count, inuse_count, clean_count;
 
760
        xtBool                  check_count = FALSE;
 
761
 
 
762
        if (ind == (XTIndex *) 1) {
 
763
                ind = NULL;
 
764
                check_count = TRUE;
 
765
        }
 
766
 
 
767
        // Check the dirty list:
 
768
        if (ind) {
 
769
                u_int cnt = 0;
 
770
 
 
771
                block = ind->mi_dirty_list;
 
772
                while (block) {
 
773
                        cnt++;
 
774
                        ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
 
775
                        block = block->cb_dirty_next;
 
776
                }
 
777
                ASSERT_NS(ind->mi_dirty_blocks == cnt);
 
778
        }
 
779
 
 
780
        xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
 
781
 
 
782
        // Check the free list:
 
783
        free_count = 0;
 
784
        block = ind_cac_globals.cg_free_list;
 
785
        while (block) {
 
786
                free_count++;
 
787
                ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
 
788
                block = block->cb_next;
 
789
        }
 
790
        ASSERT_NS(ind_cac_globals.cg_free_count == free_count);
 
791
 
 
792
        /* Check the LRU list: */
 
793
        XTIndBlockPtr list_block, plist_block;
 
794
        
 
795
        plist_block = NULL;
 
796
        list_block = ind_cac_globals.cg_lru_block;
 
797
        if (list_block) {
 
798
                ASSERT_NS(ind_cac_globals.cg_mru_block != NULL);
 
799
                ASSERT_NS(ind_cac_globals.cg_mru_block->cb_mr_used == NULL);
 
800
                ASSERT_NS(list_block->cb_lr_used == NULL);
 
801
                inuse_count = 0;
 
802
                clean_count = 0;
 
803
                while (list_block) {
 
804
                        inuse_count++;
 
805
                        ASSERT_NS(IDX_CAC_NOT_FREE(list_block->cb_state));
 
806
                        if (list_block->cb_state == IDX_CAC_BLOCK_CLEAN)
 
807
                                clean_count++;
 
808
                        ASSERT_NS(block != list_block);
 
809
                        ASSERT_NS(list_block->cb_lr_used == plist_block);
 
810
                        plist_block = list_block;
 
811
                        list_block = list_block->cb_mr_used;
 
812
                }
 
813
                ASSERT_NS(ind_cac_globals.cg_mru_block == plist_block);
 
814
        }
 
815
        else {
 
816
                inuse_count = 0;
 
817
                clean_count = 0;
 
818
                ASSERT_NS(ind_cac_globals.cg_mru_block == NULL);
 
819
        }
 
820
 
 
821
#ifdef DEBUG_CHECK_IND_CACHE
 
822
        ASSERT_NS(free_count + inuse_count + ind_cac_globals.cg_reserved_by_ots + ind_cac_globals.cg_read_count == ind_cac_globals.cg_block_count);
 
823
#endif
 
824
        xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
 
825
        if (check_count) {
 
826
                /* We have just flushed, check how much is now free/clean. */
 
827
                if (free_count + clean_count < 10) {
 
828
                        /* This could be a problem: */
 
829
                        printf("Cache very low!\n");
 
830
                }
 
831
        }
 
832
}
 
833
 
 
834
/*
 
835
 * -----------------------------------------------------------------------
 
836
 * FREEING INDEX CACHE
 
837
 */
 
838
 
 
839
/*
 
840
 * This function returns TRUE if the block is freed.
 
841
 * This function returns FALSE if the block cannot be found, or the
 
842
 * block is not clean.
 
843
 *
 
844
 * We also return FALSE if we cannot copy the block to the handle
 
845
 * (if this is required). This will be due to out-of-memory!
 
846
 */
 
847
static xtBool ind_free_block(XTOpenTablePtr ot, XTIndBlockPtr block)
 
848
{
 
849
        XTIndBlockPtr   xblock, pxblock;
 
850
        u_int                   hash_idx;
 
851
        u_int                   file_id;
 
852
        xtIndexNodeID   address;
 
853
        DcSegmentPtr    seg;
 
854
 
 
855
        (void) ot; /*DRIZZLED*/
 
856
 
 
857
#ifdef DEBUG_CHECK_IND_CACHE
 
858
        xt_ind_check_cache(NULL);
 
859
#endif
 
860
        file_id = block->cb_file_id;
 
861
        address = block->cb_address;
 
862
 
 
863
        hash_idx = XT_NODE_ID(address) + (file_id * 223);
 
864
        seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
 
865
        hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;
 
866
 
 
867
        IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);
 
868
 
 
869
        pxblock = NULL;
 
870
        xblock = seg->cs_hash_table[hash_idx];
 
871
        while (xblock) {
 
872
                if (block == xblock) {
 
873
                        /* Found the block... */
 
874
                        /* It is possible that a thread enters this code holding a
 
875
                         * lock on a page. This can cause a deadlock:
 
876
                         *
 
877
                         * #0   0x91faa2ce in semaphore_wait_signal_trap
 
878
                         * #1   0x91fb1da5 in pthread_mutex_lock
 
879
                         * #2   0x00e2ec13 in xt_p_mutex_lock at pthread_xt.cc:544
 
880
                         * #3   0x00e6c30a in xt_xsmutex_xlock at lock_xt.cc:1547
 
881
                         * #4   0x00dee402 in ind_free_block at cache_xt.cc:879
 
882
                         * #5   0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
 
883
                         * #6   0x00def8d1 in xt_ind_reserve at cache_xt.cc:1513
 
884
                         * #7   0x00e22118 in xt_idx_insert at index_xt.cc:2047
 
885
                         * #8   0x00e4d7ee in xt_tab_new_record at table_xt.cc:4702
 
886
                         * #9   0x00e0ff0b in ha_pbxt::write_row at ha_pbxt.cc:2340
 
887
                         * #10  0x0023a00f in handler::ha_write_row at handler.cc:4570
 
888
                         * #11  0x001a32c8 in write_record at sql_insert.cc:1568
 
889
                         * #12  0x001ab635 in mysql_insert at sql_insert.cc:812
 
890
                         * #13  0x0010e068 in mysql_execute_command at sql_parse.cc:3066
 
891
                         * #14  0x0011480d in mysql_parse at sql_parse.cc:5787
 
892
                         * #15  0x00115afb in dispatch_command at sql_parse.cc:1200
 
893
                         * #16  0x00116de2 in do_command at sql_parse.cc:857
 
894
                         * #17  0x00101ee4 in handle_one_connection at sql_connect.cc:1115
 
895
                         * #18  0x91fdb155 in _pthread_start
 
896
                         * #19  0x91fdb012 in thread_start
 
897
                         * 
 
898
                         * #0   0x91fb146e in __semwait_signal
 
899
                         * #1   0x91fb12ef in nanosleep$UNIX2003
 
900
                         * #2   0x91fb1236 in usleep$UNIX2003
 
901
                         * #3   0x00e52112 in xt_yield at thread_xt.cc:1274
 
902
                         * #4   0x00e6c0eb in xt_spinxslock_xlock at lock_xt.cc:1456
 
903
                         * #5   0x00dee444 in ind_free_block at cache_xt.cc:886
 
904
                         * #6   0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
 
905
                         * #7   0x00deeaf0 in ind_cac_fetch at cache_xt.cc:1130
 
906
                         * #8   0x00def604 in xt_ind_fetch at cache_xt.cc:1386
 
907
                         * #9   0x00e2159a in xt_idx_update_row_id at index_xt.cc:2489
 
908
                         * #10  0x00e603c8 in xn_sw_clean_indices at xaction_xt.cc:1932
 
909
                         * #11  0x00e606d4 in xn_sw_cleanup_variation at xaction_xt.cc:2056
 
910
                         * #12  0x00e60e29 in xn_sw_cleanup_xact at xaction_xt.cc:2276
 
911
                         * #13  0x00e615ed in xn_sw_main at xaction_xt.cc:2433
 
912
                         * #14  0x00e61919 in xn_sw_run_thread at xaction_xt.cc:2564
 
913
                         * #15  0x00e53f80 in thr_main at thread_xt.cc:1017
 
914
                         * #16  0x91fdb155 in _pthread_start
 
915
                         * #17  0x91fdb012 in thread_start
 
916
                         *
 
917
                         * So we back off if a lock is held!
 
918
                         */
 
919
                        if (!XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)) {
 
920
                                IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
921
#ifdef DEBUG_CHECK_IND_CACHE
 
922
                                xt_ind_check_cache(NULL);
 
923
#endif
 
924
                                return FALSE;
 
925
                        }
 
926
                        if (block->cb_state != IDX_CAC_BLOCK_CLEAN) {
 
927
                                /* This block cannot be freed: */
 
928
                                XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
929
                                IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
930
#ifdef DEBUG_CHECK_IND_CACHE
 
931
                                xt_ind_check_cache(NULL);
 
932
#endif
 
933
                                return FALSE;
 
934
                        }
 
935
                        
 
936
                        goto free_the_block;
 
937
                }
 
938
                pxblock = xblock;
 
939
                xblock = xblock->cb_next;
 
940
        }
 
941
 
 
942
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
943
 
 
944
        /* Not found (this can happen if the block was freed by another thread) */
 
945
#ifdef DEBUG_CHECK_IND_CACHE
 
946
        xt_ind_check_cache(NULL);
 
947
#endif
 
948
        return FALSE;
 
949
 
 
950
        free_the_block:
 
951
 
 
952
        /* If the block is referenced by a handle, then we
 
953
         * have to copy the data to the handle before we
 
954
         * free the page:
 
955
         */
 
956
        /* {HANDLE-COUNT-USAGE}
 
957
         * This access is safe because:
 
958
         *
 
959
         * We have an Xlock on the cache block, which excludes
 
960
         * all other writers that want to change the cache block
 
961
         * and also all readers of the cache block, because
 
962
         * they all have at least an Slock on the cache block.
 
963
         */
 
964
        if (block->cb_handle_count) {
 
965
                XTIndReferenceRec       iref;
 
966
                
 
967
                iref.ir_xlock = TRUE;
 
968
                iref.ir_updated = FALSE;
 
969
                iref.ir_block = block;
 
970
                iref.ir_branch = (XTIdxBranchDPtr) block->cb_data;
 
971
                if (!xt_ind_copy_on_write(&iref)) {
 
972
                        XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
973
                        return FALSE;
 
974
                }
 
975
        }
 
976
 
 
977
        /* Block is clean, remove from the hash table: */
 
978
        if (pxblock)
 
979
                pxblock->cb_next = block->cb_next;
 
980
        else
 
981
                seg->cs_hash_table[hash_idx] = block->cb_next;
 
982
 
 
983
        xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
 
984
 
 
985
        /* Remove from the MRU list: */
 
986
        if (ind_cac_globals.cg_lru_block == block)
 
987
                ind_cac_globals.cg_lru_block = block->cb_mr_used;
 
988
        if (ind_cac_globals.cg_mru_block == block)
 
989
                ind_cac_globals.cg_mru_block = block->cb_lr_used;
 
990
        
 
991
        /* Note, I am updating blocks for which I have no lock
 
992
         * here. But I think this is OK because I have a lock
 
993
         * for the MRU list.
 
994
         */
 
995
        if (block->cb_lr_used)
 
996
                block->cb_lr_used->cb_mr_used = block->cb_mr_used;
 
997
        if (block->cb_mr_used)
 
998
                block->cb_mr_used->cb_lr_used = block->cb_lr_used;
 
999
 
 
1000
        /* The block is now free: */
 
1001
        block->cb_next = ind_cac_globals.cg_free_list;
 
1002
        ind_cac_globals.cg_free_list = block;
 
1003
        ind_cac_globals.cg_free_count++;
 
1004
        ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
 
1005
        block->cb_state = IDX_CAC_BLOCK_FREE;
 
1006
        IDX_TRACE("%d- f%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(block->cb_data));
 
1007
 
 
1008
        /* Unlock BEFORE the block is reused! */
 
1009
        XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
1010
 
 
1011
        xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
 
1012
 
 
1013
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1014
 
 
1015
#ifdef DEBUG_CHECK_IND_CACHE
 
1016
        xt_ind_check_cache(NULL);
 
1017
#endif
 
1018
        return TRUE;
 
1019
}
 
1020
 
 
1021
#define IND_CACHE_MAX_BLOCKS_TO_FREE            100
 
1022
 
 
1023
/*
 
1024
 * Return the number of blocks freed.
 
1025
 *
 
1026
 * The idea is to grab a list of blocks to free.
 
1027
 * The list consists of the LRU blocks that are
 
1028
 * clean.
 
1029
 *
 
1030
 * Free as many as possible (up to max of blocks_required)
 
1031
 * from the list, even if LRU position has changed
 
1032
 * (or we have a race if there are too few blocks).
 
1033
 * However, if the block cannot be found, or is dirty,
 
1034
 * we must skip it.
 
1035
 *
 
1036
 * Repeat until we find no blocks for the list, or
 
1037
 * we have freed 'blocks_required'.
 
1038
 *
 
1039
 * 'not_this' is a block that must not be freed because
 
1040
 * it is locked by the calling thread!
 
1041
 */
 
1042
static u_int ind_cac_free_lru_blocks(XTOpenTablePtr ot, u_int blocks_required, XTIdxBranchDPtr not_this)
 
1043
{
 
1044
        register DcGlobalsRec   *dcg = &ind_cac_globals;
 
1045
        XTIndBlockPtr                   to_free[IND_CACHE_MAX_BLOCKS_TO_FREE];
 
1046
        int                                             count;
 
1047
        XTIndBlockPtr                   block;
 
1048
        u_int                                   blocks_freed = 0;
 
1049
        XTIndBlockPtr                   locked_block;
 
1050
 
 
1051
#ifdef XT_USE_DIRECT_IO_ON_INDEX
 
1052
#error This will not work!
 
1053
#endif
 
1054
        locked_block = (XTIndBlockPtr) ((xtWord1 *) not_this - offsetof(XTIndBlockRec, cb_data));
 
1055
 
 
1056
        retry:
 
1057
        xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
 
1058
        block = dcg->cg_lru_block;
 
1059
        count = 0;
 
1060
        while (block && count < IND_CACHE_MAX_BLOCKS_TO_FREE) {
 
1061
                if (block != locked_block && block->cb_state == IDX_CAC_BLOCK_CLEAN) {
 
1062
                        to_free[count] = block;
 
1063
                        count++;
 
1064
                }
 
1065
                block = block->cb_mr_used;
 
1066
        }
 
1067
        xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
 
1068
 
 
1069
        if (!count)
 
1070
                return blocks_freed;
 
1071
 
 
1072
        for (int i=0; i<count; i++) {
 
1073
                if (ind_free_block(ot, to_free[i]))
 
1074
                        blocks_freed++;
 
1075
                if (blocks_freed >= blocks_required &&
 
1076
                        ind_cac_globals.cg_free_count >= ind_cac_globals.cg_max_free + blocks_required)
 
1077
                return blocks_freed;
 
1078
        }
 
1079
 
 
1080
        goto retry;
 
1081
}
 
1082
 
 
1083
/*
 
1084
 * -----------------------------------------------------------------------
 
1085
 * MAIN CACHE FUNCTIONS
 
1086
 */
 
1087
 
 
1088
/*
 
1089
 * Fetch the block. Note, if we are about to write the block
 
1090
 * then there is no need to read it from disk!
 
1091
 */
 
1092
static XTIndBlockPtr ind_cac_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, DcSegmentPtr *ret_seg, xtBool read_data)
 
1093
{
 
1094
        register XTOpenFilePtr  file = ot->ot_ind_file;
 
1095
        register XTIndBlockPtr  block, new_block;
 
1096
        register DcSegmentPtr   seg;
 
1097
        register u_int                  hash_idx;
 
1098
        register DcGlobalsRec   *dcg = &ind_cac_globals;
 
1099
        size_t                                  red_size;
 
1100
 
 
1101
#ifdef DEBUG_CHECK_IND_CACHE
 
1102
        xt_ind_check_cache(NULL);
 
1103
#endif
 
1104
        /* Address, plus file ID multiplied by my favorite prime number! */
 
1105
        hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
 
1106
        seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
 
1107
        hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;
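        /* Added worked example of the two-level lookup above (assumed values:
         * XT_INDEX_CACHE_SEGMENT_SHIFTS = 3, so IDX_CAC_SEGMENT_COUNT = 8 and the
         * mask is 0x7, with cg_hash_size = 512; the real values come from the headers):
         * for node ID 1000 in file 2,
         *   hash_idx = 1000 + 2 * 223 = 1446
         *   segment  = 1446 & 0x7        = 6
         *   bucket   = (1446 >> 3) % 512 = 180
         * The low bits select the segment (and its lock); the remaining bits select
         * the bucket within that segment's hash table.
         */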
 
1108
 
 
1109
        IDX_CAC_READ_LOCK(seg, ot->ot_thread);
 
1110
        block = seg->cs_hash_table[hash_idx];
 
1111
        while (block) {
 
1112
                if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
 
1113
                        ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);
 
1114
 
 
1115
                        /* Check how recently this page has been used: */
 
1116
                        if (XT_TIME_DIFF(block->cb_ru_time, dcg->cg_ru_now) > (dcg->cg_block_count >> 1)) {
 
1117
                                xt_lock_mutex_ns(&dcg->cg_lock);
 
1118
 
 
1119
                                /* Move to the front of the MRU list: */
 
1120
                                block->cb_ru_time = ++dcg->cg_ru_now;
 
1121
                                if (dcg->cg_mru_block != block) {
 
1122
                                        /* Remove from the MRU list: */
 
1123
                                        if (dcg->cg_lru_block == block)
 
1124
                                                dcg->cg_lru_block = block->cb_mr_used;
 
1125
                                        if (block->cb_lr_used)
 
1126
                                                block->cb_lr_used->cb_mr_used = block->cb_mr_used;
 
1127
                                        if (block->cb_mr_used)
 
1128
                                                block->cb_mr_used->cb_lr_used = block->cb_lr_used;
 
1129
 
 
1130
                                        /* Make the block the most recently used: */
 
1131
                                        if ((block->cb_lr_used = dcg->cg_mru_block))
 
1132
                                                dcg->cg_mru_block->cb_mr_used = block;
 
1133
                                        block->cb_mr_used = NULL;
 
1134
                                        dcg->cg_mru_block = block;
 
1135
                                        if (!dcg->cg_lru_block)
 
1136
                                                dcg->cg_lru_block = block;
 
1137
                                }
 
1138
 
 
1139
                                xt_unlock_mutex_ns(&dcg->cg_lock);
 
1140
                        }
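                        /* Added note (illustration only): the half-cache threshold above
                         * (cg_block_count >> 1) means a hit only takes cg_lock and moves the
                         * block to the MRU end when the block was last promoted more than half
                         * a cache's worth of accesses ago. E.g. with 2048 blocks (an assumed
                         * value), a block stamped at cg_ru_now = 5000 is not moved again until
                         * cg_ru_now exceeds 6024.
                         */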
 
1141
                
 
1142
                        *ret_seg = seg;
 
1143
#ifdef DEBUG_CHECK_IND_CACHE
 
1144
                        xt_ind_check_cache(NULL);
 
1145
#endif
 
1146
                        ot->ot_thread->st_statistics.st_ind_cache_hit++;
 
1147
                        return block;
 
1148
                }
 
1149
                block = block->cb_next;
 
1150
        }
 
1151
        
 
1152
        /* Block not found... */
 
1153
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1154
 
 
1155
        /* Check the open table reserve list first: */
 
1156
        if ((new_block = ot->ot_ind_res_bufs)) {
 
1157
                ot->ot_ind_res_bufs = new_block->cb_next;
 
1158
                ot->ot_ind_res_count--;
 
1159
#ifdef DEBUG_CHECK_IND_CACHE
 
1160
                xt_lock_mutex_ns(&dcg->cg_lock);
 
1161
                dcg->cg_reserved_by_ots--;
 
1162
                dcg->cg_read_count++;
 
1163
                xt_unlock_mutex_ns(&dcg->cg_lock);
 
1164
#endif
 
1165
                goto use_free_block;
 
1166
        }
 
1167
 
 
1168
        free_some_blocks:
 
1169
        if (!dcg->cg_free_list) {
 
1170
                if (!ind_cac_free_lru_blocks(ot, 1, NULL)) {
 
1171
                        if (!dcg->cg_free_list) {
 
1172
                                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
 
1173
#ifdef DEBUG_CHECK_IND_CACHE
 
1174
                                xt_ind_check_cache(NULL);
 
1175
#endif
 
1176
                                return NULL;
 
1177
                        }
 
1178
                }
 
1179
        }
 
1180
 
 
1181
        /* Get a free block: */
 
1182
        xt_lock_mutex_ns(&dcg->cg_lock);
 
1183
        if (!(new_block = dcg->cg_free_list)) {
 
1184
                xt_unlock_mutex_ns(&dcg->cg_lock);
 
1185
                goto free_some_blocks;
 
1186
        }
 
1187
        ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_FREE);
 
1188
        dcg->cg_free_list = new_block->cb_next;
 
1189
        dcg->cg_free_count--;
 
1190
#ifdef DEBUG_CHECK_IND_CACHE
 
1191
        dcg->cg_read_count++;
 
1192
#endif
 
1193
        xt_unlock_mutex_ns(&dcg->cg_lock);
 
1194
 
 
1195
        use_free_block:
 
1196
        new_block->cb_address = address;
 
1197
        new_block->cb_file_id = file->fr_id;
 
1198
        ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_FREE);
 
1199
        new_block->cb_state = IDX_CAC_BLOCK_CLEAN;
 
1200
        new_block->cb_handle_count = 0;
 
1201
        new_block->cp_del_count = 0;
 
1202
        new_block->cb_dirty_next = NULL;
 
1203
        new_block->cb_dirty_prev = NULL;
 
1204
#ifdef IND_OPT_DATA_WRITTEN
 
1205
        new_block->cb_header = FALSE;
 
1206
        new_block->cb_min_pos = 0xFFFF;
 
1207
        new_block->cb_max_pos = 0;
 
1208
#endif
 
1209
 
 
1210
        if (read_data) {
 
1211
                if (!xt_pread_file(file, xt_ind_node_to_offset(ot->ot_table, address), XT_INDEX_PAGE_SIZE, 0, new_block->cb_data, &red_size, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread)) {
 
1212
                        xt_lock_mutex_ns(&dcg->cg_lock);
 
1213
                        new_block->cb_next = dcg->cg_free_list;
 
1214
                        dcg->cg_free_list = new_block;
 
1215
                        dcg->cg_free_count++;
 
1216
#ifdef DEBUG_CHECK_IND_CACHE
 
1217
                        dcg->cg_read_count--;
 
1218
#endif
 
1219
                        ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_CLEAN);
 
1220
                        new_block->cb_state = IDX_CAC_BLOCK_FREE;
 
1221
                        IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
 
1222
                        xt_unlock_mutex_ns(&dcg->cg_lock);
 
1223
#ifdef DEBUG_CHECK_IND_CACHE
 
1224
                        xt_ind_check_cache(NULL);
 
1225
#endif
 
1226
                        return NULL;
 
1227
                }
 
1228
                IDX_TRACE("%d- R%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
 
1229
                ot->ot_thread->st_statistics.st_ind_cache_miss++;
 
1230
        }
 
1231
        else
 
1232
                red_size = 0;
 
1233
        // PMC - I don't think this is required! memset(new_block->cb_data + red_size, 0, XT_INDEX_PAGE_SIZE - red_size);
 
1234
 
 
1235
        IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);
 
1236
        block = seg->cs_hash_table[hash_idx];
 
1237
        while (block) {
 
1238
                if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
 
1239
                        /* Oops, someone else was faster! */
 
1240
                        xt_lock_mutex_ns(&dcg->cg_lock);
 
1241
                        new_block->cb_next = dcg->cg_free_list;
 
1242
                        dcg->cg_free_list = new_block;
 
1243
                        dcg->cg_free_count++;
 
1244
#ifdef DEBUG_CHECK_IND_CACHE
 
1245
                        dcg->cg_read_count--;
 
1246
#endif
 
1247
                        ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_CLEAN);
 
1248
                        new_block->cb_state = IDX_CAC_BLOCK_FREE;
 
1249
                        IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
 
1250
                        xt_unlock_mutex_ns(&dcg->cg_lock);
 
1251
                        goto done_ok;
 
1252
                }
 
1253
                block = block->cb_next;
 
1254
        }
 
1255
        block = new_block;
 
1256
 
 
1257
        /* Make the block the most recently used: */
 
1258
        xt_lock_mutex_ns(&dcg->cg_lock);
 
1259
        block->cb_ru_time = ++dcg->cg_ru_now;
 
1260
        if ((block->cb_lr_used = dcg->cg_mru_block))
 
1261
                dcg->cg_mru_block->cb_mr_used = block;
 
1262
        block->cb_mr_used = NULL;
 
1263
        dcg->cg_mru_block = block;
 
1264
        if (!dcg->cg_lru_block)
 
1265
                dcg->cg_lru_block = block;
 
1266
#ifdef DEBUG_CHECK_IND_CACHE
 
1267
        dcg->cg_read_count--;
 
1268
#endif
 
1269
        xt_unlock_mutex_ns(&dcg->cg_lock);
 
1270
 
 
1271
        /* {LAZY-DEL-INDEX-ITEMS}
 
1272
         * Conditionally count the number of deleted entries in the index:
 
1273
         * We do this before other threads can read the block.
 
1274
         */
 
1275
        if (ind && ind->mi_lazy_delete && read_data)
 
1276
                xt_ind_count_deleted_items(ot->ot_table, ind, block);
 
1277
 
 
1278
        /* Add to the hash table: */
 
1279
        block->cb_next = seg->cs_hash_table[hash_idx];
 
1280
        seg->cs_hash_table[hash_idx] = block;
 
1281
 
 
1282
        done_ok:
 
1283
        *ret_seg = seg;
 
1284
#ifdef DEBUG_CHECK_IND_CACHE
 
1285
        xt_ind_check_cache(NULL);
 
1286
#endif
 
1287
        return block;
 
1288
}
 
1289
 
 
1290
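/*
 * Look up an index block in the cache hash table without reading from
 * disk. If the block is found, the segment read lock is left held and
 * returned via ret_seg; otherwise the lock is released and both
 * ret_seg and ret_block are set to NULL. Returns OK in both cases.
 */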
static xtBool ind_cac_get(XTOpenTablePtr ot, xtIndexNodeID address, DcSegmentPtr *ret_seg, XTIndBlockPtr *ret_block)
 
1291
{
 
1292
        register XTOpenFilePtr  file = ot->ot_ind_file;
 
1293
        register XTIndBlockPtr  block;
 
1294
        register DcSegmentPtr   seg;
 
1295
        register u_int                  hash_idx;
 
1296
        register DcGlobalsRec   *dcg = &ind_cac_globals;
 
1297
 
 
1298
        hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
 
1299
        seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
 
1300
        hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;
 
1301
 
 
1302
        IDX_CAC_READ_LOCK(seg, ot->ot_thread);
 
1303
        block = seg->cs_hash_table[hash_idx];
 
1304
        while (block) {
 
1305
                if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
 
1306
                        ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);
 
1307
 
 
1308
                        *ret_seg = seg;
 
1309
                        *ret_block = block;
 
1310
                        return OK;
 
1311
                }
 
1312
                block = block->cb_next;
 
1313
        }
 
1314
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1315
        
 
1316
        /* Block not found: */
 
1317
        *ret_seg = NULL;
 
1318
        *ret_block = NULL;
 
1319
        return OK;
 
1320
}
 
1321
 
 
1322
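/*
 * Overwrite a cached index page with a complete new image. The block is
 * fetched (without reading from disk), write locked and copied over, and
 * is placed on the index's dirty list if it was not already dirty. If the
 * block is currently being flushed, it is first written to the index
 * flush log.
 */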
xtPublic xtBool xt_ind_write(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
 
1323
{
 
1324
        XTIndBlockPtr   block;
 
1325
        DcSegmentPtr    seg;
 
1326
 
 
1327
        if (!(block = ind_cac_fetch(ot, ind, address, &seg, FALSE)))
 
1328
                return FAILED;
 
1329
 
 
1330
        XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
 
1331
        if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
 
1332
                if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
 
1333
                        XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
1334
                        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1335
                        return FAILED;
 
1336
                }
 
1337
        }
 
1338
#ifdef IND_OPT_DATA_WRITTEN
 
1339
        block->cb_header = TRUE;
 
1340
        block->cb_min_pos = 0;
 
1341
        if (size-XT_INDEX_PAGE_HEAD_SIZE > block->cb_max_pos)
 
1342
                block->cb_max_pos = size-XT_INDEX_PAGE_HEAD_SIZE;
 
1343
        ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-XT_INDEX_PAGE_HEAD_SIZE);
 
1344
        ASSERT_NS(block->cb_min_pos < block->cb_max_pos);
 
1345
#endif
 
1346
        ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));
 
1347
        memcpy(block->cb_data, data, size);
 
1348
        if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
 
1349
                TRACK_BLOCK_WRITE(offset);
 
1350
                xt_spinlock_lock(&ind->mi_dirty_lock);
 
1351
                if ((block->cb_dirty_next = ind->mi_dirty_list))
 
1352
                        ind->mi_dirty_list->cb_dirty_prev = block;
 
1353
                block->cb_dirty_prev = NULL;
 
1354
                ind->mi_dirty_list = block;
 
1355
                ind->mi_dirty_blocks++;
 
1356
                xt_spinlock_unlock(&ind->mi_dirty_lock);
 
1357
                if (block->cb_state != IDX_CAC_BLOCK_LOGGED) {
 
1358
                        ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
 
1359
                        ot->ot_thread->st_statistics.st_ind_cache_dirty++;
 
1360
                }
 
1361
                block->cb_state = IDX_CAC_BLOCK_DIRTY;
 
1362
        }
 
1363
        XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
1364
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1365
#ifdef XT_TRACK_INDEX_UPDATES
 
1366
        ot->ot_ind_changed++;
 
1367
#endif
 
1368
#ifdef CHECK_BLOCK_TRAILERS
 
1369
        check_block_trailers();
 
1370
#endif
 
1371
        return OK;
 
1372
}
 
1373
 
 
1374
/*
 
1375
 * Update the cache, if in RAM.
 
1376
 */
 
1377
xtPublic xtBool xt_ind_write_cache(XTOpenTablePtr ot, xtIndexNodeID address, size_t size, xtWord1 *data)
 
1378
{
 
1379
        XTIndBlockPtr   block;
 
1380
        DcSegmentPtr    seg;
 
1381
 
 
1382
        if (!ind_cac_get(ot, address, &seg, &block))
 
1383
                return FAILED;
 
1384
 
 
1385
        if (block) {
 
1386
                XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
 
1387
                /* This should only be done to pages that are free, which
 
1388
                 * are not on the dirty list, so they must be clean!
 
1389
                 */
 
1390
                ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
 
1391
                memcpy(block->cb_data, data, size);
 
1392
 
 
1393
                XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
1394
                IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1395
        }
 
1396
 
 
1397
        return OK;
 
1398
}
 
1399
 
 
1400
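/*
 * Return a cached page, exclusively locked, in iref. If the page is not
 * in the cache, iref->ir_block and iref->ir_branch are set to NULL; no
 * disk read is performed.
 */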
xtPublic xtBool xt_ind_get(XTOpenTablePtr ot, xtIndexNodeID address, XTIndReferencePtr iref)
 
1401
{
 
1402
        XTIndBlockPtr   block;
 
1403
        DcSegmentPtr    seg;
 
1404
 
 
1405
        if (!ind_cac_get(ot, address, &seg, &block))
 
1406
                return FAILED;
 
1407
 
 
1408
        if (block) {
 
1409
                XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
 
1410
                ASSERT_NS(IDX_CAC_NOT_FREE(block->cb_state));
 
1411
                IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1412
                iref->ir_block = block;
 
1413
                iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
 
1414
        }
 
1415
        else {
 
1416
                iref->ir_block = NULL;
 
1417
                iref->ir_branch = NULL;
 
1418
        }
 
1419
        iref->ir_xlock = TRUE;
 
1420
        iref->ir_updated = FALSE;
 
1421
 
 
1422
        return OK;
 
1423
}
 
1424
 
 
1425
/* 
 
1426
 * Note, this function may only be called if the block has
 
1427
 * been freed.
 
1428
 */
 
1429
xtPublic xtBool xt_ind_free_block(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address)
 
1430
{
 
1431
        XTIndBlockPtr   block;
 
1432
        DcSegmentPtr    seg;
 
1433
 
 
1434
        if (!ind_cac_get(ot, address, &seg, &block))
 
1435
                return FAILED;
 
1436
        if (block) {
 
1437
                XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
 
1438
 
 
1439
                if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
 
1440
                        if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
 
1441
                                XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
1442
                                IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1443
                                return FAILED;
 
1444
                        }
 
1445
                }
 
1446
 
 
1447
                /* {PAGE-NO-IN-INDEX-FILE}
 
1448
                 * This is the one exception to the rule that a block
 
1449
                 * that is in the IDX_CAC_BLOCK_LOGGED may be released
 
1450
                 * from the cache!
 
1451
                 */
 
1452
                ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));
 
1453
 
 
1454
                if (block->cb_state == IDX_CAC_BLOCK_DIRTY) {
 
1455
                        /* Take the block off the dirty list: */
 
1456
                        xt_spinlock_lock(&ind->mi_dirty_lock);
 
1457
                        if (block->cb_dirty_next)
 
1458
                                block->cb_dirty_next->cb_dirty_prev = block->cb_dirty_prev;
 
1459
                        if (block->cb_dirty_prev)
 
1460
                                block->cb_dirty_prev->cb_dirty_next = block->cb_dirty_next;
 
1461
                        if (ind->mi_dirty_list == block)
 
1462
                                ind->mi_dirty_list = block->cb_dirty_next;
 
1463
                        ind->mi_dirty_blocks--;
 
1464
                        xt_spinlock_unlock(&ind->mi_dirty_lock);
 
1465
                        block->cb_state = IDX_CAC_BLOCK_CLEAN;
 
1466
                        ot->ot_thread->st_statistics.st_ind_cache_dirty--;
 
1467
#ifdef IND_OPT_DATA_WRITTEN
 
1468
                        block->cb_header = FALSE;
 
1469
                        block->cb_min_pos = 0xFFFF;
 
1470
                        block->cb_max_pos = 0;
 
1471
#endif
 
1472
                }
 
1473
                XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
 
1474
 
 
1475
                IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1476
        }
 
1477
 
 
1478
        return OK;
 
1479
}
 
1480
 
 
1481
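/*
 * Copy the first 'size' bytes of an index page into 'data', loading the
 * page into the cache from disk if required.
 */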
xtPublic xtBool xt_ind_read_bytes(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
 
1482
{
 
1483
        XTIndBlockPtr   block;
 
1484
        DcSegmentPtr    seg;
 
1485
 
 
1486
        if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
 
1487
                return FAILED;
 
1488
 
 
1489
        XT_IPAGE_READ_LOCK(&block->cb_lock);
 
1490
        memcpy(data, block->cb_data, size);
 
1491
        XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
 
1492
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1493
        return OK;
 
1494
}
 
1495
 
 
1496
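/*
 * Fetch an index page (reading it from disk if necessary), verify that
 * the branch size in the page header is plausible, and lock the page
 * according to 'ltype'. For XT_XLOCK_DEL_LEAF only a shared lock is taken
 * on a leaf when a lazy delete is possible; otherwise leaves are
 * exclusively locked for modification.
 */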
xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, XTPageLockType ltype, XTIndReferencePtr iref)
 
1497
{
 
1498
        register XTIndBlockPtr  block;
 
1499
        DcSegmentPtr                    seg;
 
1500
        xtWord2                                 branch_size;
 
1501
        u_int                                   rec_size;
 
1502
        xtBool                                  xlock = FALSE;
 
1503
 
 
1504
#ifdef DEBUG
 
1505
        ASSERT_NS(iref->ir_xlock == 2);
 
1506
        ASSERT_NS(iref->ir_updated == 2);
 
1507
#endif
 
1508
        if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
 
1509
                return FAILED;
 
1510
 
 
1511
        branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
 
1512
        rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size);
 
1513
        if (rec_size < 2 || rec_size > XT_INDEX_PAGE_SIZE)
 
1514
                goto failed_corrupt;
 
1515
        if (ind->mi_fix_key) {
 
1516
                rec_size -= 2;
 
1517
                if (XT_IS_NODE(branch_size)) {
 
1518
                        if (rec_size != 0) {
 
1519
                                if (rec_size < XT_NODE_REF_SIZE)
 
1520
                                        goto failed_corrupt;
 
1521
                                rec_size -= XT_NODE_REF_SIZE;
 
1522
                                if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE + XT_NODE_REF_SIZE)) != 0)
 
1523
                                        goto failed_corrupt;
 
1524
                        }
 
1525
                }
 
1526
                else {
 
1527
                        if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE)) != 0)
 
1528
                                goto failed_corrupt;
 
1529
                }
 
1530
        }
 
1531
 
 
1532
        switch (ltype) {
 
1533
                case XT_LOCK_READ:
 
1534
                        break;
 
1535
                case XT_LOCK_WRITE:
 
1536
                        xlock = TRUE;
 
1537
                        break;
 
1538
                case XT_XLOCK_LEAF:
 
1539
                        if (!XT_IS_NODE(branch_size))
 
1540
                                xlock = TRUE;
 
1541
                        break;
 
1542
                case XT_XLOCK_DEL_LEAF:
 
1543
                        if (!XT_IS_NODE(branch_size)) {
 
1544
                                if (ot->ot_table->tab_dic.dic_no_lazy_delete)
 
1545
                                        xlock = TRUE;
 
1546
                                else {
 
1547
                                        /*
 
1548
                                         * {LAZY-DEL-INDEX-ITEMS}
 
1549
                                         *
 
1550
                                         * We are fetching a page for delete purposes.
 
1551
                                         * We decide here whether we plan to do a lazy delete,
 
1552
                                         * or whether we plan to compact the node.
 
1553
                                         *
 
1554
                                         * A lazy delete just requires a shared lock.
 
1555
                                         *
 
1556
                                         */
 
1557
                                        if (ind->mi_lazy_delete) {
 
1558
                                                /* If the number of deleted items is greater than
 
1559
                                                 * half of the number of items that can fit in the
 
1560
                                                 * page, then we will compact the node.
 
1561
                                                 */
 
1562
                                                if (!xt_idx_lazy_delete_on_leaf(ind, block, XT_GET_INDEX_BLOCK_LEN(branch_size)))
 
1563
                                                        xlock = TRUE;
 
1564
                                        }
 
1565
                                        else
 
1566
                                                xlock = TRUE;
 
1567
                                }
 
1568
                        }
 
1569
                        break;
 
1570
        }
 
1571
 
 
1572
        if ((iref->ir_xlock = xlock))
 
1573
                XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
 
1574
        else
 
1575
                XT_IPAGE_READ_LOCK(&block->cb_lock);
 
1576
 
 
1577
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1578
 
 
1579
        /* {DIRECT-IO}
 
1580
         * Direct I/O requires that the buffer is 512-byte aligned.
 
1581
         * To do this, cb_data is turned into a pointer, instead
 
1582
         * of an array.
 
1583
         * As a result, we need to pass a pointer to both the
 
1584
         * cache block and the cache block data:
 
1585
         */
 
1586
        iref->ir_updated = FALSE;
 
1587
        iref->ir_block = block;
 
1588
        iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
 
1589
        return OK;
 
1590
 
 
1591
        failed_corrupt:
 
1592
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
 
1593
        xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
 
1594
        return FAILED;
 
1595
}
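
/*
 * Minimal usage sketch for the fetch/release pair (illustrative only, not
 * taken from this file; XTIndReferenceRec is assumed here from the usual
 * Rec/Ptr naming convention of this code base):
 *
 *      XTIndReferenceRec       iref;
 *
 *      // In DEBUG builds the reference must be in the released state (2):
 *      iref.ir_xlock = 2;
 *      iref.ir_updated = 2;
 *      if (!xt_ind_fetch(ot, ind, address, XT_LOCK_READ, &iref))
 *              return FAILED;
 *      // ... examine iref.ir_branch, set iref.ir_updated on modification ...
 *      xt_ind_release(ot, ind, iref.ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, &iref);
 */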
 
1596
 
 
1597
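/*
 * Release a page reference obtained from xt_ind_fetch() or xt_ind_get().
 * If the caller marked the reference as updated, the block is moved onto
 * the index's dirty list (if not already dirty) before the page lock is
 * dropped.
 */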
xtPublic xtBool xt_ind_release(XTOpenTablePtr ot, XTIndexPtr ind, XTPageUnlockType XT_NDEBUG_UNUSED(utype), XTIndReferencePtr iref)
 
1598
{
 
1599
        register XTIndBlockPtr  block;
 
1600
 
 
1601
        block = iref->ir_block;
 
1602
 
 
1603
#ifdef DEBUG
 
1604
        ASSERT_NS(iref->ir_xlock != 2);
 
1605
        ASSERT_NS(iref->ir_updated != 2);
 
1606
        if (iref->ir_updated)
 
1607
                ASSERT_NS(utype == XT_UNLOCK_R_UPDATE || utype == XT_UNLOCK_W_UPDATE);
 
1608
        else
 
1609
                ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_WRITE);
 
1610
        if (iref->ir_xlock)
 
1611
                ASSERT_NS(utype == XT_UNLOCK_WRITE || utype == XT_UNLOCK_W_UPDATE);
 
1612
        else
 
1613
                ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_R_UPDATE);
 
1614
#endif
 
1615
        if (iref->ir_updated) {
 
1616
#ifdef DEBUG
 
1617
#ifdef IND_OPT_DATA_WRITTEN
 
1618
                xtWord2 branch_size;
 
1619
                u_int   rec_size;
 
1620
 
 
1621
                branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
 
1622
                rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size);
 
1623
 
 
1624
                ASSERT_NS(block->cb_min_pos <= rec_size-2);
 
1625
                ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
 
1626
                ASSERT_NS(block->cb_max_pos <= rec_size-2);
 
1627
                ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
 
1628
#endif
 
1629
#endif
 
1630
                /* The page was updated: */
 
1631
                ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));
 
1632
                if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
 
1633
                        TRACK_BLOCK_WRITE(offset);
 
1634
                        xt_spinlock_lock(&ind->mi_dirty_lock);
 
1635
                        if ((block->cb_dirty_next = ind->mi_dirty_list))
 
1636
                                ind->mi_dirty_list->cb_dirty_prev = block;
 
1637
                        block->cb_dirty_prev = NULL;
 
1638
                        ind->mi_dirty_list = block;
 
1639
                        ind->mi_dirty_blocks++;
 
1640
                        xt_spinlock_unlock(&ind->mi_dirty_lock);
 
1641
                        if (block->cb_state != IDX_CAC_BLOCK_LOGGED) {
 
1642
                                ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
 
1643
                                ot->ot_thread->st_statistics.st_ind_cache_dirty++;
 
1644
                        }
 
1645
                        block->cb_state = IDX_CAC_BLOCK_DIRTY;
 
1646
                }
 
1647
        }
 
1648
 
 
1649
        XT_IPAGE_UNLOCK(&block->cb_lock, iref->ir_xlock);
 
1650
#ifdef DEBUG
 
1651
        iref->ir_xlock = 2;
 
1652
        iref->ir_updated = 2;
 
1653
#endif
 
1654
        return OK;
 
1655
}
 
1656
 
 
1657
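/*
 * Reserve 'count' free cache blocks for the current open table, evicting
 * least recently used blocks (excluding 'not_this') when the global free
 * list runs empty. Fails with XT_ERR_NO_INDEX_CACHE if no blocks can be
 * freed.
 */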
xtPublic xtBool xt_ind_reserve(XTOpenTablePtr ot, u_int count, XTIdxBranchDPtr not_this)
 
1658
{
 
1659
        register XTIndBlockPtr  block;
 
1660
        register DcGlobalsRec   *dcg = &ind_cac_globals;
 
1661
 
 
1662
#ifdef XT_TRACK_INDEX_UPDATES
 
1663
        ot->ot_ind_reserved = count;
 
1664
        ot->ot_ind_reads = 0;
 
1665
#endif
 
1666
#ifdef DEBUG_CHECK_IND_CACHE
 
1667
        xt_ind_check_cache(NULL);
 
1668
#endif
 
1669
        while (ot->ot_ind_res_count < count) {
 
1670
                if (!dcg->cg_free_list) {
 
1671
                        if (!ind_cac_free_lru_blocks(ot, count - ot->ot_ind_res_count, not_this)) {
 
1672
                                if (!dcg->cg_free_list) {
 
1673
                                        xt_ind_free_reserved(ot);
 
1674
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
 
1675
#ifdef DEBUG_CHECK_IND_CACHE
 
1676
                                        xt_ind_check_cache(NULL);
 
1677
#endif
 
1678
                                        return FAILED;
 
1679
                                }
 
1680
                        }
 
1681
                }
 
1682
 
 
1683
                /* Get a free block: */
 
1684
                xt_lock_mutex_ns(&dcg->cg_lock);
 
1685
                while (ot->ot_ind_res_count < count && (block = dcg->cg_free_list)) {
 
1686
                        ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
 
1687
                        dcg->cg_free_list = block->cb_next;
 
1688
                        dcg->cg_free_count--;
 
1689
                        block->cb_next = ot->ot_ind_res_bufs;
 
1690
                        ot->ot_ind_res_bufs = block;
 
1691
                        ot->ot_ind_res_count++;
 
1692
#ifdef DEBUG_CHECK_IND_CACHE
 
1693
                        dcg->cg_reserved_by_ots++;
 
1694
#endif
 
1695
                }
 
1696
                xt_unlock_mutex_ns(&dcg->cg_lock);
 
1697
        }
 
1698
#ifdef DEBUG_CHECK_IND_CACHE
 
1699
        xt_ind_check_cache(NULL);
 
1700
#endif
 
1701
        return OK;
 
1702
}
 
1703
 
 
1704
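/*
 * Return all blocks reserved by xt_ind_reserve() to the global free list.
 */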
xtPublic void xt_ind_free_reserved(XTOpenTablePtr ot)
 
1705
{
 
1706
#ifdef DEBUG_CHECK_IND_CACHE
 
1707
        xt_ind_check_cache(NULL);
 
1708
#endif
 
1709
        if (ot->ot_ind_res_bufs) {
 
1710
                register XTIndBlockPtr  block, fblock;
 
1711
                register DcGlobalsRec   *dcg = &ind_cac_globals;
 
1712
 
 
1713
                xt_lock_mutex_ns(&dcg->cg_lock);
 
1714
                block = ot->ot_ind_res_bufs;
 
1715
                while (block) {
 
1716
                        fblock = block;
 
1717
                        block = block->cb_next;
 
1718
 
 
1719
                        fblock->cb_next = dcg->cg_free_list;
 
1720
                        dcg->cg_free_list = fblock;
 
1721
#ifdef DEBUG_CHECK_IND_CACHE
 
1722
                        dcg->cg_reserved_by_ots--;
 
1723
#endif
 
1724
                        dcg->cg_free_count++;
 
1725
                }
 
1726
                xt_unlock_mutex_ns(&dcg->cg_lock);
 
1727
                ot->ot_ind_res_bufs = NULL;
 
1728
                ot->ot_ind_res_count = 0;
 
1729
        }
 
1730
#ifdef DEBUG_CHECK_IND_CACHE
 
1731
        xt_ind_check_cache(NULL);
 
1732
#endif
 
1733
}
 
1734
 
 
1735
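/*
 * Give reserved blocks back to the cache, but only when the global free
 * list is empty.
 */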
xtPublic void xt_ind_unreserve(XTOpenTablePtr ot)
 
1736
{
 
1737
        if (!ind_cac_globals.cg_free_list)
 
1738
                xt_ind_free_reserved(ot);
 
1739
}
 
1740