~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/tabcache_xt.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2007 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2007-10-30   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 *
 
23
 * The new table cache. Caches all non-index data. This includes the data
 
24
 * files and the row pointer files.
 
25
 */
 
26
 
 
27
#include "xt_config.h"
 
28
 
 
29
#ifdef DRIZZLED
 
30
#include <bitset>
 
31
#endif
 
32
 
 
33
#include <signal.h>
 
34
 
 
35
#include "pthread_xt.h"
 
36
#include "tabcache_xt.h"
 
37
#include "table_xt.h"
 
38
#include "database_xt.h"
 
39
#include "trace_xt.h"
 
40
#include "myxt_xt.h"
 
41
#include "lock_xt.h"
 
42
#include "strutil_xt.h"
 
43
 
 
44
#ifdef DEBUG
 
45
//#define TRACE_DISTRIBUTION
 
46
#endif
 
47
 
 
48
#ifdef TRACE_DISTRIBUTION
 
49
static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file);
 
50
static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id);
 
51
#endif
 
52
 
 
53
xtPublic XTTabCacheMemRec       xt_tab_cache;
 
54
 
 
55
static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs);
 
56
 
 
57
xtPublic void xt_tc_set_cache_size(size_t cache_size)
 
58
{
 
59
        xt_tab_cache.tcm_cache_size = cache_size;
 
60
        /* Multiplying by this number can overflow a 4 byte value! */
 
61
        xt_tab_cache.tcm_low_level = (size_t) ((xtWord8) cache_size * (xtWord8) 70 / (xtWord8) 100);    // Current 70%
 
62
        xt_tab_cache.tcm_high_level = (size_t) ((xtWord8) cache_size * 95 / (xtWord8) 100);                             // Current 95%
 
63
        xt_tab_cache.tcm_mid_level = (size_t) ((xtWord8) cache_size * 85 / (xtWord8) 100);                              // Current 85%
 
64
}
 
65
 
 
66
/*
 
67
 * Initialize the disk cache.
 
68
 */
 
69
xtPublic void xt_tc_init(XTThreadPtr self, size_t cache_size)
 
70
{
 
71
        xt_tc_set_cache_size(cache_size);
 
72
 
 
73
        xt_tab_cache.tcm_approx_page_count = cache_size / sizeof(XTTabCachePageRec);
 
74
        /* Determine the size of the hash table.
 
75
         * The size is set to 2* the number of pages!
 
76
         */
 
77
        xt_tab_cache.tcm_hash_size = (xt_tab_cache.tcm_approx_page_count * 2) / XT_TC_SEGMENT_COUNT;
 
78
 
 
79
        try_(a) {
 
80
                for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
 
81
                        xt_tab_cache.tcm_segment[i].tcs_cache_in_use = 0;
 
82
                        xt_tab_cache.tcm_segment[i].tcs_hash_table = (XTTabCachePagePtr *) xt_calloc(self, xt_tab_cache.tcm_hash_size * sizeof(XTTabCachePagePtr));
 
83
                        TAB_CAC_INIT_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock);
 
84
                }
 
85
 
 
86
                xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_lock);
 
87
                xt_init_cond(self, &xt_tab_cache.tcm_cond);
 
88
                xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_freeer_lock);
 
89
                xt_init_cond(self, &xt_tab_cache.tcm_freeer_cond);
 
90
        }
 
91
        catch_(a) {
 
92
                xt_tc_exit(self);
 
93
                throw_();
 
94
        }
 
95
        cont_(a);
 
96
}
 
97
 
 
98
xtPublic void xt_tc_exit(XTThreadPtr self)
 
99
{
 
100
        XTTabCacheSegPtr seg;
 
101
 
 
102
        for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
 
103
                seg = &xt_tab_cache.tcm_segment[i];
 
104
                if (seg->tcs_hash_table) {
 
105
                        XTTabCachePagePtr page, tmp_page;
 
106
 
 
107
                        for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) {
 
108
                                page = seg->tcs_hash_table[j];
 
109
                                while (page) {
 
110
                                        tmp_page = page;
 
111
                                        page = page->tcp_next;
 
112
                                        ASSERT_NS(seg->tcs_cache_in_use >= offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
 
113
                                        seg->tcs_cache_in_use -= (offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
 
114
                                        ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
 
115
                                        xt_free(self, tmp_page);
 
116
                                }
 
117
                        }
 
118
 
 
119
#ifdef CHECK_DOUBLE_READ
 
120
                        printf("reads = %d, not req. = %d %% unrequired = %.2f\n", seg->tcs_total_reads, seg->tcs_read_not_req, 
 
121
                                (double) seg->tcs_read_not_req / (double) seg->tcs_total_reads);
 
122
#endif
 
123
                        xt_free(self, seg->tcs_hash_table);
 
124
                        seg->tcs_hash_table = NULL;
 
125
                        TAB_CAC_FREE_LOCK(self, &seg->tcs_lock);
 
126
                }
 
127
                ASSERT_NS(seg->tcs_cache_in_use == 0);
 
128
        }
 
129
 
 
130
        xt_free_mutex(&xt_tab_cache.tcm_lock);
 
131
        xt_free_cond(&xt_tab_cache.tcm_cond);
 
132
        xt_free_mutex(&xt_tab_cache.tcm_freeer_lock);
 
133
        xt_free_cond(&xt_tab_cache.tcm_freeer_cond);
 
134
}
 
135
 
 
136
xtPublic xtInt8 xt_tc_get_usage()
 
137
{
 
138
        xtInt8 size = 0;
 
139
 
 
140
        for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
 
141
                size += xt_tab_cache.tcm_segment[i].tcs_cache_in_use;
 
142
        }
 
143
        return size;
 
144
}
 
145
 
 
146
xtPublic xtInt8 xt_tc_get_size()
 
147
{
 
148
        return (xtInt8) xt_tab_cache.tcm_cache_size;
 
149
}
 
150
 
 
151
xtPublic xtInt8 xt_tc_get_high()
 
152
{
 
153
        return (xtInt8) xt_tab_cache.tcm_cache_high;
 
154
}
 
155
 
 
156
#ifdef DEBUG
/*
 * Debug-only consistency check of the LRU list.
 *
 * Walks the list from the least to the most recently used page under
 * the cache lock and asserts that the back links match the forward
 * links, and that the walk ends on the MRU page. If a table is given,
 * also asserts that none of its cached pages has an operation
 * sequence number beyond the table's next sequence number.
 */
xtPublic void xt_check_table_cache(XTTableHPtr tab)
{
	XTTabCachePagePtr previous = NULL;

	xt_lock_mutex_ns(&xt_tab_cache.tcm_lock);
	for (XTTabCachePagePtr current = xt_tab_cache.tcm_lru_page; current; current = current->tcp_mr_used) {
		if (tab && current->tcp_db_id == tab->tab_db->db_id && current->tcp_tab_id == tab->tab_id) {
			ASSERT_NS(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, current->tcp_op_seq));
		}
		ASSERT_NS(current->tcp_lr_used == previous);
		previous = current;
	}
	ASSERT_NS(xt_tab_cache.tcm_mru_page == previous);
	xt_unlock_mutex_ns(&xt_tab_cache.tcm_lock);
}
#endif
 
178
 
 
179
void XTTabCache::xt_tc_setup(XTTableHPtr tab, xtBool rec_file, size_t head_size, size_t rec_size)
 
180
{
 
181
        tci_table = tab;
 
182
        tci_rec_file = rec_file;
 
183
        tci_header_size = head_size;
 
184
        tci_rec_size = rec_size;
 
185
        tci_rows_per_page = (XT_TC_PAGE_SIZE / rec_size) + 1;
 
186
        if (tci_rows_per_page < 2)
 
187
                tci_rows_per_page = 2;
 
188
        tci_page_size = tci_rows_per_page * rec_size;
 
189
 
 
190
#ifdef TRACE_DISTRIBUTION
 
191
        tabc_init_dist(this, tab->tab_rec_file);
 
192
#endif
 
193
}
 
194
 
 
195
/*
 
196
 * This function assumes that we never write past the boundary of a page.
 
197
 * This should be the case, because we should never write more than
 
198
 * a row, and there are only whole rows on a page.
 
199
 */
 
200
xtBool XTTabCache::xt_tc_write(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t inc, size_t size, xtWord1 *data, xtOpSeqNo *op_seq, xtBool read, XTThreadPtr thread)
 
201
{
 
202
        size_t                          offset;
 
203
        XTTabCachePagePtr       page;
 
204
        XTTabCacheSegPtr        seg;
 
205
 
 
206
#ifdef TRACE_DISTRIBUTION
 
207
        tabc_dist_change(file, ref_id);
 
208
#endif
 
209
        /*
 
210
        retry:
 
211
        */
 
212
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, read, thread))
 
213
                return FAILED;
 
214
        /* Don't write while there is a read lock on the page,
 
215
         * which can happen during a sequential scan...
 
216
         *
 
217
         * This will have to be OK.
 
218
         * I cannot wait for the lock because a thread locks
 
219
         * itself out when updating during a sequential scan.
 
220
         *
 
221
         * However, I don't think this is a problem, because
 
222
         * the only records that are changed, are records
 
223
         * containing uncommitted data. Such records should
 
224
         * be ignored by a sequential scan. As long as
 
225
         * we don't crash due to reading half written
 
226
         * data!
 
227
         *
 
228
        if (page->tcp_lock_count) {
 
229
                if (!xt_timed_wait_cond_ns(&seg->tcs_cond, &seg->tcs_lock, 100)) {
 
230
                        xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
 
231
                        return FAILED;
 
232
                }
 
233
                xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
 
234
                // The page may have dissappeared from the cache, while we were sleeping!
 
235
                goto retry;
 
236
        }
 
237
        */
 
238
        
 
239
        ASSERT_NS(offset + inc + 4 <= tci_page_size);
 
240
        memcpy(page->tcp_data + offset + inc, data, size);
 
241
        /* GOTCHA, this was "op_seq > page->tcp_op_seq", however
 
242
         * this does not handle overflow!
 
243
        if (XTTableSeq::xt_op_is_before(page->tcp_op_seq, op_seq))
 
244
                page->tcp_op_seq = op_seq;
 
245
         */
 
246
 
 
247
        page->tcp_dirty = TRUE;
 
248
        ASSERT_NS(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
 
249
        *op_seq = tci_table->tab_seq.ts_set_op_seq(page);
 
250
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
251
        return OK;
 
252
}
 
253
 
 
254
/*
 
255
 * This is a special version of write which is used to set the "clean" bit.
 
256
 * The alternative would be to read the record first, but this
 
257
 * is much quicker!
 
258
 *
 
259
 * This function also checks if xn_id, row_id and other data match (the checks 
 
260
 * are similar to xn_sw_cleanup_done) before modifying the record, otherwise it 
 
261
 * assumes that the record was already updated earlier and we must not set it to 
 
262
 * clean.
 
263
 *
 
264
 * If the record was not modified the function returns FALSE.
 
265
 *
 
266
 * The function has a self pointer and can throw an exception.
 
267
 */
 
268
xtBool XTTabCache::xt_tc_write_cond(XTThreadPtr self, XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 new_type, xtOpSeqNo *op_seq, 
 
269
        xtXactID xn_id, xtRowID row_id, u_int stat_id, u_int rec_type)
 
270
{
 
271
        size_t                          offset;
 
272
        XTTabCachePagePtr       page;
 
273
        XTTabCacheSegPtr        seg;
 
274
        XTTabRecHeadDPtr        rec_head;
 
275
 
 
276
#ifdef TRACE_DISTRIBUTION
 
277
        tabc_dist_change(file, ref_id);
 
278
#endif
 
279
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, self))
 
280
                xt_throw(self);
 
281
 
 
282
        ASSERT(offset + 1 <= tci_page_size);
 
283
 
 
284
        rec_head = (XTTabRecHeadDPtr)(page->tcp_data + offset);
 
285
 
 
286
        /* Transaction must match: */
 
287
        if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
 
288
                goto no_change;
 
289
 
 
290
        /* Record header must match expected value from
 
291
         * log or clean has been done, or is not required.
 
292
         *
 
293
         * For example, it is not required if a record
 
294
         * has been overwritten in a transaction.
 
295
         */
 
296
        if (rec_head->tr_rec_type_1 != rec_type ||
 
297
                rec_head->tr_stat_id_1 != stat_id)
 
298
                goto no_change;
 
299
 
 
300
        /* Row must match: */
 
301
        if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
 
302
                goto no_change;
 
303
 
 
304
        *(page->tcp_data + offset) = new_type;
 
305
 
 
306
        page->tcp_dirty = TRUE;
 
307
        ASSERT(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
 
308
        *op_seq = tci_table->tab_seq.ts_set_op_seq(page);
 
309
        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
310
        return TRUE;
 
311
 
 
312
        no_change:
 
313
        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
314
        return FALSE;
 
315
}
 
316
 
 
317
xtBool XTTabCache::xt_tc_read(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
 
318
{
 
319
#ifdef XT_USE_ROW_REC_MMAP_FILES
 
320
        return tc_read_direct(file, ref_id, size, data, thread);
 
321
#else
 
322
        size_t                          offset;
 
323
        XTTabCachePagePtr       page;
 
324
        XTTabCacheSegPtr        seg;
 
325
 
 
326
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
 
327
                return FAILED;
 
328
        /* A read must be completely on a page: */
 
329
        ASSERT_NS(offset + size <= tci_page_size);
 
330
        memcpy(data, page->tcp_data + offset, size);
 
331
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
332
        return OK;
 
333
#endif
 
334
}
 
335
 
 
336
xtBool XTTabCache::xt_tc_read_4(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord4 *value, XTThreadPtr thread)
 
337
{
 
338
#ifdef XT_USE_ROW_REC_MMAP_FILES
 
339
        register u_int                          page_idx;
 
340
        register XTTabCachePagePtr      page;
 
341
        register XTTabCacheSegPtr       seg;
 
342
        register u_int                          hash_idx;
 
343
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
 
344
        off_t                                           address;
 
345
 
 
346
        ASSERT_NS(ref_id);
 
347
        ref_id--;
 
348
        page_idx = ref_id / this->tci_rows_per_page;
 
349
        address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;
 
350
 
 
351
        hash_idx = page_idx + (file->fr_id * 223);
 
352
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
 
353
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
 
354
 
 
355
        TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
 
356
        page = seg->tcs_hash_table[hash_idx];
 
357
        while (page) {
 
358
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
 
359
                        size_t  offset;
 
360
                        xtWord1 *buffer;
 
361
 
 
362
                        offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
 
363
                        ASSERT_NS(offset + 4 <= this->tci_page_size);
 
364
                        buffer = page->tcp_data + offset;
 
365
                        *value = XT_GET_DISK_4(buffer);
 
366
                        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
367
                        return OK;
 
368
                }
 
369
                page = page->tcp_next;
 
370
        }
 
371
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
372
 
 
373
        return xt_pread_file_4(file, address, value, &thread->st_statistics.st_rec, thread);
 
374
#else
 
375
        size_t                          offset;
 
376
        XTTabCachePagePtr       page;
 
377
        XTTabCacheSegPtr        seg;
 
378
        xtWord1                         *data;
 
379
 
 
380
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
 
381
                return FAILED;
 
382
        /* A read must be completely on a page: */
 
383
        ASSERT_NS(offset + 4 <= tci_page_size);
 
384
        data = page->tcp_data + offset;
 
385
        *value = XT_GET_DISK_4(data);
 
386
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
387
        return OK;
 
388
#endif
 
389
}
 
390
 
 
391
xtBool XTTabCache::xt_tc_get_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtBool load, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread)
 
392
{
 
393
        XTTabCachePagePtr       page;
 
394
        XTTabCacheSegPtr        seg;
 
395
 
 
396
        if (load) {
 
397
                if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
 
398
                        return FAILED;
 
399
        }
 
400
        else {
 
401
                if (!tc_fetch_direct(file, ref_id, &seg, &page, offset, thread))
 
402
                        return FAILED;
 
403
                if (!seg) {
 
404
                        *ret_page = NULL;
 
405
                        return OK;
 
406
                }
 
407
        }
 
408
        page->tcp_lock_count++;
 
409
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
410
        *ret_page = page;
 
411
        return OK;
 
412
}
 
413
 
 
414
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
 
415
void XTTabCache::xt_tc_release_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, XTThreadPtr thread __attribute__((unused)))
 
416
{
 
417
        XTTabCacheSegPtr        seg;
 
418
 
 
419
        seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
 
420
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
 
421
 
 
422
#ifdef DEBUG
 
423
        XTTabCachePagePtr lpage, ppage;
 
424
 
 
425
        ppage = NULL;
 
426
        lpage = seg->tcs_hash_table[page->tcp_hash_idx];
 
427
        while (lpage) {
 
428
                if (lpage->tcp_page_idx == page->tcp_page_idx &&
 
429
                        lpage->tcp_file_id == page->tcp_file_id)
 
430
                        break;
 
431
                ppage = lpage;
 
432
                lpage = lpage->tcp_next;
 
433
        }
 
434
 
 
435
        ASSERT_NS(page == lpage);
 
436
        ASSERT_NS(page->tcp_lock_count > 0);
 
437
#endif
 
438
 
 
439
        if (page->tcp_lock_count > 0)
 
440
                page->tcp_lock_count--;
 
441
 
 
442
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
443
}
 
444
 
 
445
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
 
446
xtBool XTTabCache::xt_tc_lock_page(XT_ROW_REC_FILE_PTR file, XTTabCachePagePtr *ret_page, xtRefID ref_id, size_t *offset, XTThreadPtr thread __attribute__((unused)))
 
447
{
 
448
        XTTabCachePagePtr       page;
 
449
        XTTabCacheSegPtr        seg;
 
450
 
 
451
#ifdef TRACE_DISTRIBUTION
 
452
        tabc_dist_change(file, ref_id);
 
453
#endif
 
454
        if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
 
455
                return FAILED;
 
456
        *ret_page = page;
 
457
        return OK;
 
458
}
 
459
 
 
460
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
 
461
void XTTabCache::xt_tc_unlock_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, xtOpSeqNo *op_seq, XTThreadPtr thread __attribute__((unused)))
 
462
{
 
463
        XTTabCacheSegPtr        seg;
 
464
 
 
465
        seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
 
466
        page->tcp_dirty = TRUE;
 
467
        *op_seq = tci_table->tab_seq.ts_set_op_seq(page);
 
468
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
469
}
 
470
 
 
471
xtBool XTTabCache::xt_tc_read_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 *data, XTThreadPtr thread)
 
472
{
 
473
        return tc_read_direct(file, ref_id, this->tci_page_size, data, thread);
 
474
}
 
475
 
 
476
/* Read row and record files directly.
 
477
 * This by-passed the cache when reading, which mean
 
478
 * we rely in the OS for caching.
 
479
 * This probably only makes sense when these files
 
480
 * are memory mapped.
 
481
 */
 
482
xtBool XTTabCache::tc_read_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
 
483
{
 
484
        register u_int                          page_idx;
 
485
        register XTTabCachePagePtr      page;
 
486
        register XTTabCacheSegPtr       seg;
 
487
        register u_int                          hash_idx;
 
488
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
 
489
        size_t                                          red_size;
 
490
        off_t                                           address;
 
491
 
 
492
        ASSERT_NS(ref_id);
 
493
        ref_id--;
 
494
        page_idx = ref_id / this->tci_rows_per_page;
 
495
        address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;
 
496
 
 
497
        hash_idx = page_idx + (file->fr_id * 223);
 
498
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
 
499
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
 
500
 
 
501
        TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
 
502
        page = seg->tcs_hash_table[hash_idx];
 
503
        while (page) {
 
504
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
 
505
                        size_t offset;
 
506
 
 
507
                        offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
 
508
                        ASSERT_NS(offset + size <= this->tci_page_size);
 
509
                        memcpy(data, page->tcp_data + offset, size);
 
510
                        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
511
                        return OK;
 
512
                }
 
513
                page = page->tcp_next;
 
514
        }
 
515
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
516
        if (!XT_PREAD_RR_FILE(file, address, size, 0, data, &red_size, &thread->st_statistics.st_rec, thread))
 
517
                return FAILED;
 
518
        memset(data + red_size, 0, size - red_size);
 
519
        return OK;
 
520
}
 
521
 
 
522
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
 
523
xtBool XTTabCache::tc_fetch_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread  __attribute__((unused)))
 
524
{
 
525
        register u_int                          page_idx;
 
526
        register XTTabCachePagePtr      page;
 
527
        register XTTabCacheSegPtr       seg;
 
528
        register u_int                          hash_idx;
 
529
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
 
530
 
 
531
        ASSERT_NS(ref_id);
 
532
        ref_id--;
 
533
        page_idx = ref_id / this->tci_rows_per_page;
 
534
        *offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
 
535
 
 
536
        hash_idx = page_idx + (file->fr_id * 223);
 
537
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
 
538
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
 
539
 
 
540
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
 
541
        page = seg->tcs_hash_table[hash_idx];
 
542
        while (page) {
 
543
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
 
544
                        *ret_seg = seg;
 
545
                        *ret_page = page;
 
546
                        return OK;
 
547
                }
 
548
                page = page->tcp_next;
 
549
        }
 
550
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
551
        *ret_seg = NULL;
 
552
        *ret_page = NULL;
 
553
        return OK;
 
554
}
 
555
 
 
556
/*
 
557
 * Note, this function may return an exclusive, or a shared lock.
 
558
 * If the page is in cache it will return a shared lock of the segment.
 
559
 * If the page was just added to the cache it will return an
 
560
 * exclusive lock.
 
561
 */
 
562
xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, xtBool read, XTThreadPtr thread)
 
563
{
 
564
        register u_int                          page_idx;
 
565
        register XTTabCachePagePtr      page, new_page;
 
566
        register XTTabCacheSegPtr       seg;
 
567
        register u_int                          hash_idx;
 
568
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
 
569
        size_t                                          red_size;
 
570
        off_t                                           address;
 
571
 
 
572
        ASSERT_NS(ref_id);
 
573
        ref_id--;
 
574
        page_idx = ref_id / this->tci_rows_per_page;
 
575
        address = (off_t) page_idx * (off_t) this->tci_page_size + (off_t) this->tci_header_size;
 
576
        *offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
 
577
 
 
578
        hash_idx = page_idx + (file->fr_id * 223);
 
579
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
 
580
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
 
581
 
 
582
        TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
 
583
        page = seg->tcs_hash_table[hash_idx];
 
584
        while (page) {
 
585
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
 
586
                        /* This page has been most recently used: */
 
587
                        if (XT_TIME_DIFF(page->tcp_ru_time, dcg->tcm_ru_now) > (dcg->tcm_approx_page_count >> 1)) {
 
588
                                /* Move to the front of the MRU list: */
 
589
                                xt_lock_mutex_ns(&dcg->tcm_lock);
 
590
 
 
591
                                page->tcp_ru_time = ++dcg->tcm_ru_now;
 
592
                                if (dcg->tcm_mru_page != page) {
 
593
                                        /* Remove from the MRU list: */
 
594
                                        if (dcg->tcm_lru_page == page)
 
595
                                                dcg->tcm_lru_page = page->tcp_mr_used;
 
596
                                        if (page->tcp_lr_used)
 
597
                                                page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
 
598
                                        if (page->tcp_mr_used)
 
599
                                                page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;
 
600
        
 
601
                                        /* Make the page the most recently used: */
 
602
                                        if ((page->tcp_lr_used = dcg->tcm_mru_page))
 
603
                                                dcg->tcm_mru_page->tcp_mr_used = page;
 
604
                                        page->tcp_mr_used = NULL;
 
605
                                        dcg->tcm_mru_page = page;
 
606
                                        if (!dcg->tcm_lru_page)
 
607
                                                dcg->tcm_lru_page = page;
 
608
                                }
 
609
                                xt_unlock_mutex_ns(&dcg->tcm_lock);
 
610
                        }
 
611
                        *ret_seg = seg;
 
612
                        *ret_page = page;
 
613
                        thread->st_statistics.st_rec_cache_hit++;
 
614
                        return OK;
 
615
                }
 
616
                page = page->tcp_next;
 
617
        }
 
618
        
 
619
        size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size;
 
620
 
 
621
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 
622
        
 
623
        /* Page not found, allocate a new page: */
 
624
        if (!(new_page = (XTTabCachePagePtr) xt_malloc_ns(page_size)))
 
625
                return FAILED;
 
626
 
 
627
        /* Check the level of the cache: */
 
628
        size_t cache_used = 0;
 
629
        for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
 
630
                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
 
631
 
 
632
        if (cache_used + page_size > dcg->tcm_cache_high)
 
633
                dcg->tcm_cache_high = cache_used;
 
634
 
 
635
        if (cache_used + page_size > dcg->tcm_cache_size) {
 
636
                XTThreadPtr self;
 
637
                time_t          now;
 
638
 
 
639
                /* Wait for the cache level to go down.
 
640
                 * If this happens, then the freeer is not working fast
 
641
                 * enough!
 
642
                 */
 
643
 
 
644
                /* But before I do this, I must flush my own log because:
 
645
                 * - The freeer might be waiting for a page to be cleaned.
 
646
                 * - The page can only be cleaned once it has been written to
 
647
                 *   the database.
 
648
                 * - The writer cannot write the page data until it has been
 
649
                 *   flushed to the log.
 
650
                 * - The log won't be flushed, unless this thread does it.
 
651
                 * So there could be a deadlock if I don't flush the log!
 
652
                 */
 
653
                if ((self = xt_get_self())) {
 
654
                        if (!xt_xlog_flush_log(tci_table->tab_db, self))
 
655
                                goto failed;
 
656
                }
 
657
 
 
658
                /* Wait for the free'er thread: */
 
659
                xt_lock_mutex_ns(&dcg->tcm_freeer_lock);
 
660
                now = time(NULL);
 
661
                do {
 
662
                        /* I have set the timeout to 2 here because of the following situation:
 
663
                         * 1. Transaction allocates an op seq
 
664
                         * 2. Transaction goes to update cache, but must wait for
 
665
                         *    cache to be freed (after this, the op would be written to
 
666
                         *    the log).
 
667
                         * 3. The free'er wants to free cache, but is waiting for the writer.
 
668
                         * 4. The writer cannot continue because an op seq is missing!
 
669
                         *    So the writer is waiting for the transaction thread to write
 
670
                         *    the op seq.
 
671
                         * - So we have a deadlock situation.
 
672
                         * - However, this situation can only occur if there is not enough
 
673
                         *   cache.
 
674
                         * The timeout helps, but will not solve the problem, unless we
 
675
                         * ignore cache level here, after a while, and just continue.
 
676
                         */
 
677
 
 
678
                        /* Wake freeer before we go to sleep: */
 
679
                        if (!dcg->tcm_freeer_busy) {
 
680
                                if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
 
681
                                        xt_log_and_clear_exception_ns();
 
682
                        }
 
683
 
 
684
                        dcg->tcm_threads_waiting++;
 
685
#ifdef DEBUG
 
686
                        if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 30000)) {
 
687
                                dcg->tcm_threads_waiting--;
 
688
                                break;
 
689
                        }
 
690
#else
 
691
                        if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 1000)) {
 
692
                                dcg->tcm_threads_waiting--;
 
693
                                break;
 
694
                        }
 
695
#endif
 
696
                        dcg->tcm_threads_waiting--;
 
697
 
 
698
                        cache_used = 0;
 
699
                        for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
 
700
                                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
 
701
 
 
702
                        if (cache_used + page_size <= dcg->tcm_high_level)
 
703
                                break;
 
704
                        /*
 
705
                         * If there is too little cache we can get stuck here.
 
706
                         * The problem is that seq numbers are allocated before fetching a
 
707
                         * record to be updated.
 
708
                         *
 
709
                         * It can happen that we end up waiting for that seq number
 
710
                         * to be written to the log before we can continue here.
 
711
                         *
 
712
                         * This happens as follows:
 
713
                         * 1. This thread waits for the freeer.
 
714
                         * 2. The freeer cannot free a page because it has not been
 
715
                         *    written by the writter.
 
716
                         * 3. The writter cannot continue because it is waiting
 
717
                         *    for a missing sequence number.
 
718
                         * 4. The missing sequence number is the one allocated
 
719
                         *    before we entered this function!
 
720
                         * 
 
721
                         * So don't wait for more than 5 seconds here!
 
722
                         */
 
723
                }
 
724
                while (time(NULL) < now + 5);
 
725
                xt_unlock_mutex_ns(&dcg->tcm_freeer_lock);
 
726
        }
 
727
        else if (cache_used + page_size > dcg->tcm_high_level) {
 
728
                /* Wake up the freeer because the cache level,
 
729
                 * is higher than the high level.
 
730
                 */
 
731
                if (!dcg->tcm_freeer_busy) {
 
732
                        xt_lock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
 
733
                        if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
 
734
                                xt_log_and_clear_exception_ns();
 
735
                        xt_unlock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
 
736
                }
 
737
        }
 
738
 
 
739
        /* Read the page into memory.... */
 
740
        new_page->tcp_dirty = FALSE;
 
741
        new_page->tcp_seg = (xtWord1) ((page_idx + (file->fr_id * 223)) & XT_TC_SEGMENT_MASK);
 
742
#ifdef XT_CLUSTER_FREE_RECORDS
 
743
        new_page->tcp_free_rec = 0xFFFF;
 
744
#endif
 
745
        new_page->tcp_lock_count = 0;
 
746
        new_page->tcp_hash_idx = hash_idx;
 
747
        new_page->tcp_page_idx = page_idx;
 
748
        new_page->tcp_file_id = file->fr_id;
 
749
        new_page->tcp_db_id = this->tci_table->tab_db->db_id;
 
750
        new_page->tcp_tab_id = this->tci_table->tab_id;
 
751
        new_page->tcp_data_size = this->tci_page_size;
 
752
        new_page->tcp_op_seq = 0; // Value not used because not dirty
 
753
 
 
754
        if (read) {
 
755
                if (!XT_PREAD_RR_FILE(file, address, this->tci_page_size, 0, new_page->tcp_data, &red_size, &thread->st_statistics.st_rec, thread))
 
756
                        goto failed;
 
757
 
 
758
#ifdef XT_CLUSTER_FREE_RECORDS
 
759
                /* Find the first free record! */
 
760
                if (tci_rec_file) {
 
761
                        xtWord1 *buff_ptr;
 
762
                        xtWord1 *end_ptr;
 
763
 
 
764
                        buff_ptr = new_page->tcp_data;
 
765
                        end_ptr = new_page->tcp_data + red_size;
 
766
                        while (buff_ptr < end_ptr) {
 
767
                                if (XT_REC_IS_FREE(*buff_ptr)) {
 
768
                                        new_page->tcp_free_rec = (xtWord2) (buff_ptr - new_page->tcp_data);
 
769
                                        break;
 
770
                                }
 
771
                                buff_ptr += tci_rec_size;
 
772
                        }
 
773
                }
 
774
#endif
 
775
        }
 
776
#ifdef XT_MEMSET_UNUSED_SPACE
 
777
        else
 
778
                red_size = 0;
 
779
 
 
780
        /* Removing this is an optimization. It should not be required
 
781
         * to clear the unused space in the page.
 
782
         */
 
783
        memset(new_page->tcp_data + red_size, 0, this->tci_page_size - red_size);
 
784
#endif
 
785
 
 
786
        /* Add the page to the cache! */
 
787
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
 
788
#ifdef CHECK_DOUBLE_READ
 
789
        seg->tcs_total_reads++;
 
790
#endif
 
791
        page = seg->tcs_hash_table[hash_idx];
 
792
        while (page) {
 
793
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
 
794
                        /* Oops, someone else was faster! */
 
795
#ifdef CHECK_DOUBLE_READ
 
796
                        seg->tcs_read_not_req++;
 
797
#endif
 
798
                        xt_free_ns(new_page);
 
799
                        goto done_ok;
 
800
                }
 
801
                page = page->tcp_next;
 
802
        }
 
803
        page = new_page;
 
804
 
 
805
        /* Make the page the most recently used: */
 
806
        xt_lock_mutex_ns(&dcg->tcm_lock);
 
807
        page->tcp_ru_time = ++dcg->tcm_ru_now;
 
808
        if ((page->tcp_lr_used = dcg->tcm_mru_page))
 
809
                dcg->tcm_mru_page->tcp_mr_used = page;
 
810
        page->tcp_mr_used = NULL;
 
811
        dcg->tcm_mru_page = page;
 
812
        if (!dcg->tcm_lru_page)
 
813
                dcg->tcm_lru_page = page;
 
814
        xt_unlock_mutex_ns(&dcg->tcm_lock);
 
815
 
 
816
        /* Add the page to the hash table: */
 
817
        page->tcp_next = seg->tcs_hash_table[hash_idx];
 
818
        seg->tcs_hash_table[hash_idx] = page;
 
819
 
 
820
        /* GOTCHA! This increment was done just after the malloc!
 
821
         * So it was not protected by the segment lock!
 
822
         * The result was that this count was no longer reliable,
 
823
         * This resulted in the amount of cache being used becoming less, and
 
824
         * less, because increments were lost over time!
 
825
         */
 
826
        /* Increment cache used. */
 
827
        seg->tcs_cache_in_use += page_size;
 
828
 
 
829
        done_ok:
 
830
        *ret_seg = seg;
 
831
        *ret_page = page;
 
832
#ifdef DEBUG_CHECK_CACHE
 
833
        //XT_TC_check_cache();
 
834
#endif
 
835
        thread->st_statistics.st_rec_cache_miss++;
 
836
        return OK;
 
837
 
 
838
        failed:
 
839
        xt_free_ns(new_page);
 
840
        return FAILED;
 
841
}
 
842
 
 
843
 
 
844
/* ----------------------------------------------------------------------
 
845
 * OPERATION SEQUENCE
 
846
 */
 
847
 
 
848
xtBool XTTableSeq::ts_log_no_op(XTThreadPtr thread, xtTableID tab_id, xtOpSeqNo op_seq)
 
849
{
 
850
        XTactNoOpEntryDRec      ent_rec;
 
851
        xtWord4                         sum = (xtWord4) tab_id ^ (xtWord4) op_seq;
 
852
 
 
853
        ent_rec.no_status_1 = XT_LOG_ENT_NO_OP;
 
854
        ent_rec.no_checksum_1 = XT_CHECKSUM_1(sum);
 
855
        XT_SET_DISK_4(ent_rec.no_tab_id_4, tab_id);
 
856
        XT_SET_DISK_4(ent_rec.no_op_seq_4, op_seq);
 
857
        /* TODO - If this also fails we have a problem.
 
858
         * From this point on we should actually not generate
 
859
         * any more op IDs. The problem is that the
 
860
         * some will be missing, so the writer will not
 
861
         * be able to contniue.
 
862
         */
 
863
        return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, XT_XLOG_NO_WRITE_NO_FLUSH);
 
864
}
 
865
 
 
866
#ifdef XT_NOT_INLINE
 
867
xtOpSeqNo XTTableSeq::ts_set_op_seq(XTTabCachePagePtr page)
 
868
{
 
869
        xtOpSeqNo seq;
 
870
 
 
871
        xt_lock_mutex_ns(&ts_ns_lock);
 
872
        page->tcp_op_seq = seq = ts_next_seq++;
 
873
        xt_unlock_mutex_ns(&ts_ns_lock);
 
874
        return seq;
 
875
}
 
876
 
 
877
xtOpSeqNo XTTableSeq::ts_get_op_seq()
 
878
{
 
879
        xtOpSeqNo seq;
 
880
 
 
881
        xt_lock_mutex_ns(&ts_ns_lock);
 
882
        seq = ts_next_seq++;
 
883
        xt_unlock_mutex_ns(&ts_ns_lock);
 
884
        return seq;
 
885
}
 
886
#endif
 
887
 
 
888
#ifdef XT_NOT_INLINE
 
889
/*
 
890
 * Return TRUE if the current sequence is before the
 
891
 * target (then) sequence number. This function
 
892
 * takes into account overflow. Overflow is detected
 
893
 * by checking the difference between the 2 values.
 
894
 * If the difference is very large, then we
 
895
 * assume overflow.
 
896
 */
 
897
xtBool XTTableSeq::xt_op_is_before(register xtOpSeqNo now, register xtOpSeqNo then)
 
898
{
 
899
        ASSERT_NS(sizeof(xtOpSeqNo) == 4);
 
900
        /* The now time is being incremented.
 
901
         * If it is after the then time (which is static, then
 
902
         * it is not before!
 
903
         */
 
904
        if (now >= then) {
 
905
                if ((now - then) > (xtOpSeqNo) 0xFFFFFFFF/2)
 
906
                        return TRUE;
 
907
                return FALSE;
 
908
        }
 
909
 
 
910
        /* If it appears to be before, we still have to check
 
911
         * for overflow. If the gap is bigger then half of
 
912
         * the MAX value, then we can assume it has wrapped around
 
913
         * because we know that no then can be so far in the
 
914
         * future!
 
915
         */
 
916
        if ((then - now) > (xtOpSeqNo) 0xFFFFFFFF/2)
 
917
                return FALSE;
 
918
        return TRUE;
 
919
}
 
920
#endif
 
921
 
 
922
 
 
923
/* ----------------------------------------------------------------------
 
924
 * F R E E E R    P R O C E S S
 
925
 */
 
926
 
 
927
/*
 
928
 * Used by the writer to wake the freeer.
 
929
 */
 
930
xtPublic void xt_wr_wake_freeer(XTThreadPtr XT_UNUSED(self), XTDatabaseHPtr db)
 
931
{
 
932
        /* BUG FIX: Was using tcm_freeer_cond.
 
933
         * This is incorrect. When the freeer waits for the
 
934
         * writter, it uses the writer's condition!
 
935
         */
 
936
        xt_lock_mutex_ns(&db->db_wr_lock);
 
937
        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
 
938
                xt_log_and_clear_exception_ns();
 
939
        xt_unlock_mutex_ns(&db->db_wr_lock);
 
940
/*
 
941
        xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
 
942
        pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
 
943
        if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
 
944
                xt_log_and_clear_exception_ns();
 
945
        freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
 
946
*/
 
947
}
 
948
 
 
949
/* Wait for a transaction to quit: */
 
950
static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs)
 
951
{
 
952
        if (!self->t_quit)
 
953
                xt_timed_wait_cond(NULL, &xt_tab_cache.tcm_freeer_cond, &xt_tab_cache.tcm_freeer_lock, msecs);
 
954
}
 
955
 
 
956
typedef struct TCResource {
 
957
        XTOpenTablePtr          tc_ot;
 
958
} TCResourceRec, *TCResourcePtr;
 
959
 
 
960
static void tabc_free_fr_resources(XTThreadPtr self, TCResourcePtr tc)
 
961
{
 
962
        if (tc->tc_ot) {
 
963
                xt_db_return_table_to_pool(self, tc->tc_ot);
 
964
                tc->tc_ot = NULL;
 
965
        }
 
966
}
 
967
 
 
968
static XTTableHPtr tabc_get_table(XTThreadPtr self, TCResourcePtr tc, xtDatabaseID db_id, xtTableID tab_id)
 
969
{
 
970
        XTTableHPtr     tab;
 
971
        XTDatabaseHPtr  db;
 
972
 
 
973
        if (tc->tc_ot) {
 
974
                tab = tc->tc_ot->ot_table;
 
975
                if (tab->tab_id == tab_id && tab->tab_db->db_id == db_id)
 
976
                        return tab;
 
977
 
 
978
                xt_db_return_table_to_pool(self, tc->tc_ot);
 
979
                tc->tc_ot = NULL;
 
980
        }
 
981
 
 
982
        if (!tc->tc_ot) {
 
983
                if (!(db = xt_get_database_by_id(self, db_id)))
 
984
                        return NULL;
 
985
 
 
986
                pushr_(xt_heap_release, db);
 
987
                tc->tc_ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE);
 
988
                freer_(); // xt_heap_release(db);
 
989
                if (!tc->tc_ot)
 
990
                        return NULL;
 
991
        }
 
992
 
 
993
        return tc->tc_ot->ot_table;
 
994
}
 
995
 
 
996
/*
 
997
 * Free the given page, or the least recently used page.
 
998
 * Return the amount of bytes freed.
 
999
 */
 
1000
static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc)
 
1001
{
 
1002
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
 
1003
        XTTableHPtr                                     tab = NULL;
 
1004
        XTTabCachePagePtr                       page, lpage, ppage;
 
1005
        XTTabCacheSegPtr                        seg;
 
1006
        u_int                                           page_cnt;
 
1007
        xtBool                                          was_dirty;
 
1008
 
 
1009
#ifdef DEBUG_CHECK_CACHE
 
1010
        //XT_TC_check_cache();
 
1011
#endif
 
1012
        dcg->tcm_free_try_count = 0;
 
1013
 
 
1014
        retry:
 
1015
        /* Note, handling the page is safe because
 
1016
         * there is only one free'er thread which
 
1017
         * can remove pages from the cache!
 
1018
         */
 
1019
        page_cnt = 0;
 
1020
        if (!(page = dcg->tcm_lru_page)) {
 
1021
                dcg->tcm_free_try_count = 0;
 
1022
                return 0;
 
1023
        }
 
1024
 
 
1025
        retry_2:
 
1026
        if ((was_dirty = page->tcp_dirty)) {
 
1027
                /* Do all this stuff without a lock, because to
 
1028
                 * have a lock while doing this is too expensive!
 
1029
                 */
 
1030
        
 
1031
                /* Wait for the page to be cleaned. */
 
1032
                tab = tabc_get_table(self, tc, page->tcp_db_id, page->tcp_tab_id);
 
1033
        }
 
1034
 
 
1035
        seg = &dcg->tcm_segment[page->tcp_seg];
 
1036
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);
 
1037
 
 
1038
        if (page->tcp_dirty) {
 
1039
                if (!was_dirty) {
 
1040
                        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
1041
                        goto retry_2;
 
1042
                }
 
1043
 
 
1044
                if (tab) {
 
1045
                        ASSERT(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1));
 
1046
                        /* This should never happen. However, is has been occuring,
 
1047
                         * during multi_update test on Windows.
 
1048
                         * In particular it occurs after rename of a table, during ALTER.
 
1049
                         * As if the table was not flushed before the rename!?
 
1050
                         * To guard against an infinite loop below, I will just continue here.
 
1051
                         */
 
1052
                        if (XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1))
 
1053
                                goto go_on;
 
1054
                        /* OK, we have the table, now we check where the current
 
1055
                         * sequence number is.
 
1056
                         */
 
1057
                        if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq)) {
 
1058
                                XTDatabaseHPtr db = tab->tab_db;
 
1059
 
 
1060
                                rewait:
 
1061
                                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
1062
 
 
1063
                                /* Flush the log, in case this is holding up the
 
1064
                                 * writer!
 
1065
                                 */
 
1066
                                if (!db->db_xlog.xlog_flush(self)) {
 
1067
                                        dcg->tcm_free_try_count = 0;
 
1068
                                        xt_throw(self);
 
1069
                                }
 
1070
 
 
1071
                                xt_lock_mutex(self, &db->db_wr_lock);
 
1072
                                pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
1073
 
 
1074
                                /* The freeer is now waiting: */
 
1075
                                db->db_wr_freeer_waiting = TRUE;
 
1076
 
 
1077
                                /* If the writer is idle, wake it up. 
 
1078
                                 * The writer will commit the changes to the database
 
1079
                                 * which will allow the freeer to free up the cache.
 
1080
                                 */
 
1081
                                if (db->db_wr_idle) {
 
1082
                                        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
 
1083
                                                xt_log_and_clear_exception_ns();
 
1084
                                }
 
1085
 
 
1086
                                /* Go to sleep on the writer's condition.
 
1087
                                 * The writer will wake the free'er before it goes to
 
1088
                                 * sleep!
 
1089
                                 */
 
1090
                                tab->tab_wake_freeer_op = page->tcp_op_seq;
 
1091
                                tab->tab_wr_wake_freeer = TRUE;
 
1092
                                if (!xt_timed_wait_cond_ns(&db->db_wr_cond, &db->db_wr_lock, 30000)) {
 
1093
                                        tab->tab_wr_wake_freeer = FALSE;
 
1094
                                        db->db_wr_freeer_waiting = FALSE;
 
1095
                                        xt_throw(self);
 
1096
                                }
 
1097
                                tab->tab_wr_wake_freeer = FALSE;
 
1098
                                db->db_wr_freeer_waiting = FALSE;
 
1099
                                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
1100
 
 
1101
                                TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);
 
1102
                                if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq))
 
1103
                                        goto rewait;
 
1104
                        }
 
1105
                        go_on:;
 
1106
                }
 
1107
        }
 
1108
 
 
1109
        /* Wait if the page is being read or locked. */
 
1110
        if (page->tcp_lock_count) {
 
1111
                /* (1) If the page is being read, then we should not free
 
1112
                 *     it immediately.
 
1113
                 * (2) If a page is locked, the locker may be waiting
 
1114
                 *     for the freeer to free some cache - this
 
1115
                 *     causes a deadlock.
 
1116
                 *
 
1117
                 * Therefore, we move on, and try to free another page...
 
1118
                 */
 
1119
                if (page_cnt < (dcg->tcm_approx_page_count >> 1)) {
 
1120
                        /* Page has not changed MRU position, and we
 
1121
                         * have looked at less than half of the pages.
 
1122
                         * Go to the next page...
 
1123
                         */
 
1124
                        if ((page = page->tcp_mr_used)) {
 
1125
                                page_cnt++;
 
1126
                                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
1127
                                goto retry_2;
 
1128
                        }
 
1129
                }
 
1130
                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
1131
                dcg->tcm_free_try_count++;                              
 
1132
 
 
1133
                /* Starting to spin, free the threads: */
 
1134
                if (dcg->tcm_threads_waiting) {
 
1135
                        if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
 
1136
                                xt_log_and_clear_exception_ns();
 
1137
                }
 
1138
                goto retry;
 
1139
        }
 
1140
 
 
1141
        /* Page is clean, remove from the hash table: */
 
1142
 
 
1143
        /* Find the page on the list: */
 
1144
        u_int page_idx = page->tcp_page_idx;
 
1145
        u_int file_id = page->tcp_file_id;
 
1146
 
 
1147
        ppage = NULL;
 
1148
        lpage = seg->tcs_hash_table[page->tcp_hash_idx];
 
1149
        while (lpage) {
 
1150
                if (lpage->tcp_page_idx == page_idx && lpage->tcp_file_id == file_id)
 
1151
                        break;
 
1152
                ppage = lpage;
 
1153
                lpage = lpage->tcp_next;
 
1154
        }
 
1155
 
 
1156
        if (page == lpage) {
 
1157
                /* Should be the case! */
 
1158
                if (ppage)
 
1159
                        ppage->tcp_next = page->tcp_next;
 
1160
                else
 
1161
                        seg->tcs_hash_table[page->tcp_hash_idx] = page->tcp_next;
 
1162
        }
 
1163
#ifdef DEBUG
 
1164
        else
 
1165
                ASSERT_NS(FALSE);
 
1166
#endif
 
1167
 
 
1168
        /* Remove from the MRU list: */
 
1169
        xt_lock_mutex_ns(&dcg->tcm_lock);
 
1170
        if (dcg->tcm_lru_page == page)
 
1171
                dcg->tcm_lru_page = page->tcp_mr_used;
 
1172
        if (dcg->tcm_mru_page == page)
 
1173
                dcg->tcm_mru_page = page->tcp_lr_used;
 
1174
        if (page->tcp_lr_used)
 
1175
                page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
 
1176
        if (page->tcp_mr_used)
 
1177
                page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;
 
1178
        xt_unlock_mutex_ns(&dcg->tcm_lock);
 
1179
 
 
1180
        /* Free the page: */
 
1181
        size_t freed_space = offsetof(XTTabCachePageRec, tcp_data) + page->tcp_data_size;
 
1182
        ASSERT_NS(seg->tcs_cache_in_use >= freed_space);
 
1183
        seg->tcs_cache_in_use -= freed_space;
 
1184
        ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
 
1185
        xt_free_ns(page);
 
1186
 
 
1187
        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
1188
        self->st_statistics.st_rec_cache_frees++;
 
1189
        dcg->tcm_free_try_count = 0;
 
1190
        return freed_space;
 
1191
}
 
1192
 
 
1193
 
 
1194
/* NOTE(review): this macro expands to nothing, and no use of it is visible
 * in this part of the file - presumably a leftover; confirm before removing.
 */
#define CACHE_HIGH_LEVEL(d)                     
 
1195
 
 
1196
static void tabc_fr_main(XTThreadPtr self)
 
1197
{
 
1198
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
 
1199
        TCResourceRec                           tc = { 0 };
 
1200
        int                                                     i;
 
1201
 
 
1202
        xt_set_low_priority(self);
 
1203
        dcg->tcm_freeer_busy = TRUE;
 
1204
 
 
1205
        while (!self->t_quit) {         
 
1206
                size_t cache_used, freed;
 
1207
 
 
1208
                pushr_(tabc_free_fr_resources, &tc);
 
1209
 
 
1210
                while (!self->t_quit) {
 
1211
                        /* Total up the cache memory used: */
 
1212
                        cache_used = 0;
 
1213
                        for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
 
1214
                                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
 
1215
 
 
1216
                        if (cache_used > dcg->tcm_cache_high)
 
1217
                                dcg->tcm_cache_high = cache_used;
 
1218
 
 
1219
                        /* Check if the cache usage is over 95%: */
 
1220
                        if (self->t_quit)
 
1221
                                break;
 
1222
 
 
1223
                        /* If threads are waiting then we are more aggressive about freeing
 
1224
                         * cache.
 
1225
                         */ 
 
1226
                        if (cache_used < (dcg->tcm_threads_waiting ? dcg->tcm_mid_level : dcg->tcm_high_level))
 
1227
                                break;
 
1228
 
 
1229
                        /* Reduce cache to the 75% level: */
 
1230
                        while (!self->t_quit && cache_used > dcg->tcm_low_level) {
 
1231
                                freed = tabc_free_page(self, &tc);
 
1232
                                cache_used -= freed;
 
1233
                                if (cache_used <= dcg->tcm_high_level) {
 
1234
                                        /* Wakeup any threads that are waiting for some cache to be
 
1235
                                         * freed.
 
1236
                                         */
 
1237
                                        if (dcg->tcm_threads_waiting) {
 
1238
                                                if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
 
1239
                                                        xt_log_and_clear_exception_ns();
 
1240
                                        }
 
1241
                                }
 
1242
                        }
 
1243
                }
 
1244
 
 
1245
                freer_(); // tabc_free_fr_resources(&tc)
 
1246
 
 
1247
                xt_lock_mutex(self, &dcg->tcm_freeer_lock);
 
1248
                pushr_(xt_unlock_mutex, &dcg->tcm_freeer_lock);
 
1249
 
 
1250
                if (dcg->tcm_threads_waiting) {
 
1251
                        /* Wake threads before we go to sleep: */
 
1252
                        if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
 
1253
                                xt_log_and_clear_exception_ns();
 
1254
                }
 
1255
                        
 
1256
                /* Wait for a thread that allocates data to signal
 
1257
                 * that the cache level has exceeeded the upper limit:
 
1258
                 */
 
1259
                xt_db_approximate_time = time(NULL);
 
1260
                dcg->tcm_freeer_busy = FALSE;
 
1261
                /* No idea, why, but I am getting an uneccesarry pause here.
 
1262
                 * I run DBT2 with low record cache.
 
1263
                 *
 
1264
                 * Every now and then there is a pause where the freeer is here,
 
1265
                 * and all user threads are waiting for the freeer.
 
1266
                 *
 
1267
                 * So adding the tcm_threads_waiting condition.
 
1268
                 */
 
1269
                if (dcg->tcm_threads_waiting) {
 
1270
                        cache_used = 0;
 
1271
                        for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
 
1272
                                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
 
1273
                        if (cache_used < dcg->tcm_mid_level)
 
1274
                                tabc_fr_wait_for_cache(self, 500);
 
1275
                }
 
1276
                else
 
1277
                        tabc_fr_wait_for_cache(self, 500);
 
1278
                //tabc_fr_wait_for_cache(self, 30*1000);
 
1279
                dcg->tcm_freeer_busy = TRUE;
 
1280
                xt_db_approximate_time = time(NULL);
 
1281
                freer_(); // xt_unlock_mutex(&dcg->tcm_freeer_lock)
 
1282
        }
 
1283
}
 
1284
 
 
1285
static void *tabc_fr_run_thread(XTThreadPtr self)
 
1286
{
 
1287
        int             count;
 
1288
        void    *mysql_thread;
 
1289
 
 
1290
        myxt_wait_pbxt_plugin_slot_assigned(self);
 
1291
 
 
1292
        mysql_thread = myxt_create_thread();
 
1293
 
 
1294
        while (!self->t_quit) {
 
1295
                try_(a) {
 
1296
                        tabc_fr_main(self);
 
1297
                }
 
1298
                catch_(a) {
 
1299
                        /* This error is "normal"! */
 
1300
                        if (!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
1301
                                self->t_exception.e_sys_err == SIGTERM))
 
1302
                                xt_log_and_clear_exception(self);
 
1303
                }
 
1304
                cont_(a);
 
1305
 
 
1306
                /* After an exception, pause before trying again... */
 
1307
                /* Number of seconds */
 
1308
#ifdef DEBUG
 
1309
                count = 10;
 
1310
#else
 
1311
                count = 2*60;
 
1312
#endif
 
1313
                while (!self->t_quit && count > 0) {
 
1314
                        xt_db_approximate_time = time(NULL);
 
1315
                        sleep(1);
 
1316
                        count--;
 
1317
                }
 
1318
        }
 
1319
 
 
1320
   /*
 
1321
        * {MYSQL-THREAD-KILL}
 
1322
        myxt_destroy_thread(mysql_thread, TRUE);
 
1323
        */
 
1324
        return NULL;
 
1325
}
 
1326
 
 
1327
static void tabc_fr_free_thread(XTThreadPtr self, void *XT_UNUSED(data))
 
1328
{
 
1329
        if (xt_tab_cache.tcm_freeer_thread) {
 
1330
                xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
 
1331
                pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
 
1332
                xt_tab_cache.tcm_freeer_thread = NULL;
 
1333
                freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
 
1334
        }
 
1335
}
 
1336
 
 
1337
xtPublic void xt_start_freeer(XTThreadPtr self)
 
1338
{
 
1339
        xt_tab_cache.tcm_freeer_thread = xt_create_daemon(self, "free-er");
 
1340
        xt_set_thread_data(xt_tab_cache.tcm_freeer_thread, NULL, tabc_fr_free_thread);
 
1341
        xt_run_thread(self, xt_tab_cache.tcm_freeer_thread, tabc_fr_run_thread);
 
1342
}
 
1343
 
 
1344
xtPublic void xt_quit_freeer(XTThreadPtr self)
 
1345
{
 
1346
        if (xt_tab_cache.tcm_freeer_thread) {
 
1347
                xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
 
1348
                pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
 
1349
                xt_terminate_thread(self, xt_tab_cache.tcm_freeer_thread);
 
1350
                freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
 
1351
        }
 
1352
}
 
1353
 
 
1354
xtPublic void xt_stop_freeer(XTThreadPtr self)
 
1355
{
 
1356
        XTThreadPtr thr_fr;
 
1357
 
 
1358
        if (xt_tab_cache.tcm_freeer_thread) {
 
1359
                xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
 
1360
                pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
 
1361
 
 
1362
                /* This pointer is safe as long as you have the transaction lock. */
 
1363
                if ((thr_fr = xt_tab_cache.tcm_freeer_thread)) {
 
1364
                        xtThreadID tid = thr_fr->t_id;
 
1365
 
 
1366
                        /* Make sure the thread quits when woken up. */
 
1367
                        xt_terminate_thread(self, thr_fr);
 
1368
 
 
1369
                        /* Wake the freeer to get it to quit: */
 
1370
                        if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
 
1371
                                xt_log_and_clear_exception_ns();
 
1372
        
 
1373
                        freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
 
1374
 
 
1375
                        /*
 
1376
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
 
1377
                         * at a particular thread (in this case the sweeper) was
 
1378
                         * being caught by a different thread and killing the server
 
1379
                         * sometimes. Disconcerting.
 
1380
                         * (this may only be a problem on Mac OS X)
 
1381
                        xt_kill_thread(thread);
 
1382
                         */
 
1383
                        xt_wait_for_thread_to_exit(tid, FALSE);
 
1384
        
 
1385
                        /* PMC - This should not be necessary to set the signal here, but in the
 
1386
                         * debugger the handler is not called!!?
 
1387
                        thr_fr->t_delayed_signal = SIGTERM;
 
1388
                        xt_kill_thread(thread);
 
1389
                         */
 
1390
                        xt_tab_cache.tcm_freeer_thread = NULL;
 
1391
                }
 
1392
                else
 
1393
                        freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
 
1394
        }
 
1395
}
 
1396
 
 
1397
xtPublic void xt_load_pages(XTThreadPtr self, XTOpenTablePtr ot)
 
1398
{
 
1399
        XTTableHPtr                     tab = ot->ot_table;
 
1400
        xtRecordID                      rec_id;
 
1401
        XTTabCachePagePtr       page;
 
1402
        XTTabCacheSegPtr        seg;
 
1403
        size_t                          poffset;
 
1404
 
 
1405
        rec_id = 1;
 
1406
        while (rec_id<tab->tab_row_eof_id) {
 
1407
                if (!tab->tab_rows.tc_fetch(ot->ot_row_file, rec_id, &seg, &page, &poffset, TRUE, self))
 
1408
                        xt_throw(self);
 
1409
                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
1410
                rec_id += tab->tab_rows.tci_rows_per_page;
 
1411
        }
 
1412
 
 
1413
        rec_id = 1;
 
1414
        while (rec_id<tab->tab_rec_eof_id) {
 
1415
                if (!tab->tab_recs.tc_fetch(ot->ot_rec_file, rec_id, &seg, &page, &poffset, TRUE, self))
 
1416
                        xt_throw(self);
 
1417
                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
 
1418
                rec_id += tab->tab_recs.tci_rows_per_page;
 
1419
        }
 
1420
}
 
1421
 
 
1422
#ifdef TRACE_DISTRIBUTION
 
1423
/* ----------------------------------------------------------------------
 
1424
 * Debug-only tracing of how changes are distributed over one cache file.
 
1425
 */
 
1426
 
 
1427
/* Alternative trace targets; enable exactly one TABC_TRACE_FILE. */
//#define TABC_TRACE_FILE       "stock"
 
1428
//#define TABC_TRACE_FILE               "customer"
 
1429
/* Substring that tabc_init_dist() looks for (via strstr) in the last path
 * component of a file to decide whether that file is traced.
 */
#define TABC_TRACE_FILE         "upd_ind_PBXT"
 
1430
//#define TABC_TRACE_FILE       "order_line"
 
1431
 
 
1432
/* Initial number of slots in the change map; tabc_init_dist() also divides
 * the table EOF id by this value to get the number of ids per slot.
 */
#define TABC_DISPLAY_WIDTH      180
 
1433
 
 
1434
/* Define TRACK_ROWS to trace the cache whose record size is 4 instead of
 * the one whose record size is not 4 (presumably the row pointer cache
 * versus the record cache -- confirm).
 */
//#define TRACK_ROWS
 
1435
 
 
1436
#ifdef TRACK_ROWS
 
1437
/* True when the cache record size (s) belongs to the cache being traced. */
#define TABC_CORRECT_REC_SIZE(s)                ((s) == 4)
 
1438
#else
 
1439
/* True when the cache record size (s) belongs to the cache being traced. */
#define TABC_CORRECT_REC_SIZE(s)                ((s) != 4)
 
1440
#endif
 
1441
 
 
1442
/* Taken exclusively by tabc_dist_output() while it reads and resets the map. */
static XTSpinXSLockRec  tabc_dist_lock;
 
1443
/* fil_id of the traced file; tabc_dist_change() ignores all other files. */
static u_int                    tabc_dist_file_id;
 
1444
/* xt_trace_clock() value recorded at initialisation / the last output line. */
static xtWord8                  tabc_output_time;
 
1445
/* Number of ids that share one slot of the change map. */
static int                              tabc_recs_per_block;
 
1446
/* Number of slots printed per output line; raised by tabc_dist_change(). */
static int                              tabc_change_map_eof;
 
1447
/* Value of tabc_change_map_eof at initialisation (TABC_DISPLAY_WIDTH). */
static int                              tabc_change_map_init_eof;
 
1448
/* Number of int slots allocated for tabc_change_map. */
static int                              tabc_change_map_size;
 
1449
/* Per-slot change counters, zeroed again after every output line. */
static int                              *tabc_change_map;
 
1450
/* Output buffer of tabc_change_map_size + 1 characters, one per slot. */
static char                             *tabc_change_map_string;
 
1451
 
 
1452
static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file)
 
1453
{
 
1454
        if (!strstr(xt_last_name_of_path(file->fil_path), TABC_TRACE_FILE) || !TABC_CORRECT_REC_SIZE(cac->tci_rec_size))
 
1455
                return;
 
1456
 
 
1457
        xt_spinxslock_init_with_autoname(NULL, &tabc_dist_lock);
 
1458
        tabc_dist_file_id = file->fil_id;
 
1459
        tabc_output_time = xt_trace_clock();
 
1460
#ifdef TRACK_ROWS
 
1461
        tabc_recs_per_block = cac->tci_table->tab_row_eof_id / TABC_DISPLAY_WIDTH;
 
1462
#else
 
1463
        tabc_recs_per_block = cac->tci_table->tab_row_eof_id / TABC_DISPLAY_WIDTH;
 
1464
#endif
 
1465
        tabc_change_map_init_eof = TABC_DISPLAY_WIDTH;
 
1466
        tabc_change_map_eof = tabc_change_map_init_eof;
 
1467
        tabc_change_map_size = tabc_change_map_eof * 2;
 
1468
        tabc_change_map = (int *) malloc(tabc_change_map_size * sizeof(int));
 
1469
        memset(tabc_change_map, 0, tabc_change_map_size * sizeof(int));
 
1470
        tabc_change_map_string = (char *) malloc(tabc_change_map_size + 1);
 
1471
        printf("FILE: %s\n", file->fil_path);
 
1472
        printf("rec size:   %d\n", (int) cac->tci_rec_size);
 
1473
        printf("block size: %d\n", tabc_recs_per_block);
 
1474
        printf("EOF:        %d\n", tabc_change_map_eof);
 
1475
}
 
1476
 
 
1477
/* _.-~`'* */
 
1478
static void tabc_dist_output(xtWord8 now)
 
1479
{
 
1480
        int v;
 
1481
        int i;  
 
1482
        int time;
 
1483
 
 
1484
        time = (int) (now - tabc_output_time);
 
1485
        tabc_output_time = now;
 
1486
        xt_spinxslock_xlock(&tabc_dist_lock, FALSE, 0);
 
1487
        for (i=0; i<tabc_change_map_eof; i++) {
 
1488
                v = tabc_change_map[i];
 
1489
                tabc_change_map[i] = 0;
 
1490
                if (v == 0)
 
1491
                        tabc_change_map_string[i] = ' ';
 
1492
                else if (v <= 2)
 
1493
                        tabc_change_map_string[i] = '_';
 
1494
                else if (v <= 10)
 
1495
                        tabc_change_map_string[i] = '.';
 
1496
                else if (v <= 50)
 
1497
                        tabc_change_map_string[i] = '-';
 
1498
                else if (v <= 100)
 
1499
                        tabc_change_map_string[i] = '~';
 
1500
                else if (v <= 200)
 
1501
                        tabc_change_map_string[i] = '`';
 
1502
                else if (v <= 500)
 
1503
                        tabc_change_map_string[i] = '\'';
 
1504
                else
 
1505
                        tabc_change_map_string[i] = '*';
 
1506
        }
 
1507
        tabc_change_map_string[i] = 0;
 
1508
        printf("%5d %s\n", time / 1000, tabc_change_map_string);
 
1509
        xt_spinxslock_unlock(&tabc_dist_lock, TRUE);
 
1510
}
 
1511
 
 
1512
static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id)
 
1513
{
 
1514
        xtWord8 now;
 
1515
 
 
1516
        if (file->fr_file->fil_id != tabc_dist_file_id)
 
1517
                return;
 
1518
 
 
1519
        now = xt_trace_clock();
 
1520
        if (now - tabc_output_time >= 1000000)
 
1521
                tabc_dist_output(now);
 
1522
 
 
1523
        ref_id = ref_id / tabc_recs_per_block;
 
1524
        if (ref_id < tabc_change_map_size)
 
1525
                tabc_change_map[ref_id]++;
 
1526
        if (ref_id+1 > tabc_change_map_eof)
 
1527
                tabc_change_map_eof = ref_id+1;
 
1528
}
 
1529
 
 
1530
#endif
 
1531
 
 
1532