Viewing changes to plugin/pbxt/src/tabcache_xt.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2011-03-15 10:41:18 UTC
  • mfrom: (1.2.10 upstream)
  • Revision ID: james.westby@ubuntu.com-20110315104118-eaf0hvlytjdl4zrf
Tags: 2011.03.13-0ubuntu1
* New upstream release.
* Added slave plugin.
* Removed archive, blackhole and blitzdb plugins.
* Moved location of libdrizzle headers.
* Removed drizzleadmin manpage patch.
* Add drizzle_safe_write_string to symbols.

/* Copyright (C) 2007 PrimeBase Technologies GmbH
 *
 * PrimeBase XT
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * 2007-10-30	Paul McCullagh
 *
 * H&G2JCtL
 *
 * The new table cache. Caches all non-index data. This includes the data
 * files and the row pointer files.
 */

#include "xt_config.h"

#ifdef DRIZZLED
#include <bitset>
#endif

#include <signal.h>

#include "pthread_xt.h"
#include "tabcache_xt.h"
#include "table_xt.h"
#include "database_xt.h"
#include "trace_xt.h"
#include "myxt_xt.h"
#include "lock_xt.h"
#include "strutil_xt.h"

#ifdef DEBUG
//#define TRACE_DISTRIBUTION
#endif

#ifdef TRACE_DISTRIBUTION
static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file);
static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id);
#endif

xtPublic XTTabCacheMemRec	xt_tab_cache;

static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs);

xtPublic void xt_tc_set_cache_size(size_t cache_size)
{
	xt_tab_cache.tcm_cache_size = cache_size;
	/* Multiplying by this number can overflow a 4 byte value! */
	xt_tab_cache.tcm_low_level = (size_t) ((xtWord8) cache_size * (xtWord8) 70 / (xtWord8) 100);	// Current 70%
	xt_tab_cache.tcm_high_level = (size_t) ((xtWord8) cache_size * 95 / (xtWord8) 100);		// Current 95%
	xt_tab_cache.tcm_mid_level = (size_t) ((xtWord8) cache_size * 85 / (xtWord8) 100);		// Current 85%
}
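
/* Added worked example (not part of the original source): the three
 * watermarks for a hypothetical 64 MB cache, computed as above. The
 * 64-bit (xtWord8) arithmetic matters because the intermediate product
 * already overflows 32 bits:
 *
 *   cache_size = 64 * 1024 * 1024      = 67108864
 *   67108864 * 95                      = 6375342080   (> 0xFFFFFFFF)
 *   tcm_low_level  = 67108864 * 70/100 = 46976204     (~44.8 MB)
 *   tcm_mid_level  = 67108864 * 85/100 = 57042534     (~54.4 MB)
 *   tcm_high_level = 67108864 * 95/100 = 63753420     (~60.8 MB)
 *
 * The freeer thread (tabc_fr_main below) starts freeing at the high
 * (or mid) level and frees down to the low level.
 */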

/*
 * Initialize the disk cache.
 */
xtPublic void xt_tc_init(XTThreadPtr self, size_t cache_size)
{
	xt_tc_set_cache_size(cache_size);

	xt_tab_cache.tcm_approx_page_count = cache_size / sizeof(XTTabCachePageRec);
	/* Determine the size of the hash table.
	 * The size is set to 2 times the number of pages!
	 */
	xt_tab_cache.tcm_hash_size = (xt_tab_cache.tcm_approx_page_count * 2) / XT_TC_SEGMENT_COUNT;

	try_(a) {
		for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
			xt_tab_cache.tcm_segment[i].tcs_cache_in_use = 0;
			xt_tab_cache.tcm_segment[i].tcs_hash_table = (XTTabCachePagePtr *) xt_calloc(self, xt_tab_cache.tcm_hash_size * sizeof(XTTabCachePagePtr));
			TAB_CAC_INIT_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock);
		}

		xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_lock);
		xt_init_cond(self, &xt_tab_cache.tcm_cond);
		xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_freeer_lock);
		xt_init_cond(self, &xt_tab_cache.tcm_freeer_cond);
	}
	catch_(a) {
		xt_tc_exit(self);
		throw_();
	}
	cont_(a);
}

xtPublic void xt_tc_exit(XTThreadPtr self)
{
	XTTabCacheSegPtr seg;

	for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
		seg = &xt_tab_cache.tcm_segment[i];
		if (seg->tcs_hash_table) {
			XTTabCachePagePtr page, tmp_page;

			for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) {
				page = seg->tcs_hash_table[j];
				while (page) {
					tmp_page = page;
					page = page->tcp_next;
					ASSERT_NS(seg->tcs_cache_in_use >= offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
					seg->tcs_cache_in_use -= (offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
					ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
					xt_free(self, tmp_page);
				}
			}

#ifdef CHECK_DOUBLE_READ
			printf("reads = %d, not req. = %d %% unrequired = %.2f\n", seg->tcs_total_reads, seg->tcs_read_not_req,
				(double) seg->tcs_read_not_req / (double) seg->tcs_total_reads);
#endif
			xt_free(self, seg->tcs_hash_table);
			seg->tcs_hash_table = NULL;
			TAB_CAC_FREE_LOCK(self, &seg->tcs_lock);
		}
		ASSERT_NS(seg->tcs_cache_in_use == 0);
	}

	xt_free_mutex(&xt_tab_cache.tcm_lock);
	xt_free_cond(&xt_tab_cache.tcm_cond);
	xt_free_mutex(&xt_tab_cache.tcm_freeer_lock);
	xt_free_cond(&xt_tab_cache.tcm_freeer_cond);
}

xtPublic xtInt8 xt_tc_get_usage()
{
	xtInt8 size = 0;

	for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
		size += xt_tab_cache.tcm_segment[i].tcs_cache_in_use;
	}
	return size;
}
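
/* Added note (not in the original source): the per-segment
 * tcs_cache_in_use counters are summed without taking the segment
 * locks, so under concurrent load the total is only approximate.
 * tc_fetch() and the freeer below compute the same lock-free sum when
 * checking the cache level, where an approximate value suffices.
 */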

xtPublic xtInt8 xt_tc_get_size()
{
	return (xtInt8) xt_tab_cache.tcm_cache_size;
}

xtPublic xtInt8 xt_tc_get_high()
{
	return (xtInt8) xt_tab_cache.tcm_cache_high;
}

#ifdef DEBUG
xtPublic void xt_check_table_cache(XTTableHPtr tab)
{
	XTTabCachePagePtr page, ppage;

	xt_lock_mutex_ns(&xt_tab_cache.tcm_lock);
	ppage = NULL;
	page = xt_tab_cache.tcm_lru_page;
	while (page) {
		if (tab) {
			if (page->tcp_db_id == tab->tab_db->db_id && page->tcp_tab_id == tab->tab_id) {
				ASSERT_NS(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq));
			}
		}
		ASSERT_NS(page->tcp_lr_used == ppage);
		ppage = page;
		page = page->tcp_mr_used;
	}
	ASSERT_NS(xt_tab_cache.tcm_mru_page == ppage);
	xt_unlock_mutex_ns(&xt_tab_cache.tcm_lock);
}
#endif

void XTTabCache::xt_tc_setup(XTTableHPtr tab, xtBool rec_file, size_t head_size, size_t rec_size)
{
	tci_table = tab;
	tci_rec_file = rec_file;
	tci_header_size = head_size;
	tci_rec_size = rec_size;
	tci_rows_per_page = (XT_TC_PAGE_SIZE / rec_size) + 1;
	if (tci_rows_per_page < 2)
		tci_rows_per_page = 2;
	tci_page_size = tci_rows_per_page * rec_size;

#ifdef TRACE_DISTRIBUTION
	tabc_init_dist(this, tab->tab_rec_file);
#endif
}
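
/* Added worked example (assumed numbers, not from the original source):
 * if XT_TC_PAGE_SIZE were 32768 and a table's record size 300 bytes,
 * then tci_rows_per_page = 32768/300 + 1 = 110 and tci_page_size =
 * 110 * 300 = 33000. A cache page thus always holds a whole number of
 * records and may be slightly larger than the nominal page size; the
 * minimum of 2 rows handles records bigger than XT_TC_PAGE_SIZE.
 */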

/*
 * This function assumes that we never write past the boundary of a page.
 * This should be the case, because we should never write more than
 * a row, and there are only whole rows on a page.
 */
xtBool XTTabCache::xt_tc_write(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t inc, size_t size, xtWord1 *data, xtOpSeqNo *op_seq, xtBool read, XTThreadPtr thread)
{
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

#ifdef TRACE_DISTRIBUTION
	tabc_dist_change(file, ref_id);
#endif
	/*
	retry:
	*/
	if (!tc_fetch(file, ref_id, &seg, &page, &offset, read, thread))
		return FAILED;
	/* Don't write while there is a read lock on the page,
	 * which can happen during a sequential scan...
	 *
	 * This will have to be OK.
	 * I cannot wait for the lock because a thread locks
	 * itself out when updating during a sequential scan.
	 *
	 * However, I don't think this is a problem, because
	 * the only records that are changed are records
	 * containing uncommitted data. Such records should
	 * be ignored by a sequential scan. As long as
	 * we don't crash due to reading half-written
	 * data!
	 *
	if (page->tcp_lock_count) {
		if (!xt_timed_wait_cond_ns(&seg->tcs_cond, &seg->tcs_lock, 100)) {
			xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
			return FAILED;
		}
		xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
		// The page may have disappeared from the cache while we were sleeping!
		goto retry;
	}
	*/

	ASSERT_NS(offset + inc + 4 <= tci_page_size);
	memcpy(page->tcp_data + offset + inc, data, size);
	/* GOTCHA, this was "op_seq > page->tcp_op_seq", however
	 * this does not handle overflow!
	if (XTTableSeq::xt_op_is_before(page->tcp_op_seq, op_seq))
		page->tcp_op_seq = op_seq;
	 */

	page->tcp_dirty = TRUE;
	ASSERT_NS(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
	*op_seq = tci_table->tab_seq.ts_set_op_seq(page);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	return OK;
}

/*
 * This is a special version of write which is used to set the "clean" bit.
 * The alternative would be to read the record first, but this
 * is much quicker!
 *
 * This function also checks that xn_id, row_id and other data match (the checks
 * are similar to xn_sw_cleanup_done) before modifying the record; otherwise it
 * assumes that the record was already updated earlier and must not be set to
 * clean.
 *
 * If the record was not modified the function returns FALSE.
 *
 * The function has a self pointer and can throw an exception.
 */
xtBool XTTabCache::xt_tc_write_cond(XTThreadPtr self, XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 new_type, xtOpSeqNo *op_seq,
	xtXactID xn_id, xtRowID row_id, u_int stat_id, u_int rec_type)
{
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;
	XTTabRecHeadDPtr	rec_head;

#ifdef TRACE_DISTRIBUTION
	tabc_dist_change(file, ref_id);
#endif
	if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, self))
		xt_throw(self);

	ASSERT(offset + 1 <= tci_page_size);

	rec_head = (XTTabRecHeadDPtr)(page->tcp_data + offset);

	/* Transaction must match: */
	if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
		goto no_change;

	/* Record header must match the expected value from the
	 * log, or clean has been done, or is not required.
	 *
	 * For example, it is not required if a record
	 * has been overwritten in a transaction.
	 */
	if (rec_head->tr_rec_type_1 != rec_type ||
		rec_head->tr_stat_id_1 != stat_id)
		goto no_change;

	/* Row must match: */
	if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
		goto no_change;

	*(page->tcp_data + offset) = new_type;

	page->tcp_dirty = TRUE;
	ASSERT(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
	*op_seq = tci_table->tab_seq.ts_set_op_seq(page);
	TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
	return TRUE;

	no_change:
	TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
	return FALSE;
}

xtBool XTTabCache::xt_tc_read(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
{
#ifdef XT_USE_ROW_REC_MMAP_FILES
	return tc_read_direct(file, ref_id, size, data, thread);
#else
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

	if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
		return FAILED;
	/* A read must be completely on a page: */
	ASSERT_NS(offset + size <= tci_page_size);
	memcpy(data, page->tcp_data + offset, size);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	return OK;
#endif
}

xtBool XTTabCache::xt_tc_read_4(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord4 *value, XTThreadPtr thread)
{
#ifdef XT_USE_ROW_REC_MMAP_FILES
	register u_int				page_idx;
	register XTTabCachePagePtr	page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	off_t						address;

	ASSERT_NS(ref_id);
	ref_id--;
	page_idx = ref_id / this->tci_rows_per_page;
	address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			size_t	offset;
			xtWord1 *buffer;

			offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
			ASSERT_NS(offset + 4 <= this->tci_page_size);
			buffer = page->tcp_data + offset;
			*value = XT_GET_DISK_4(buffer);
			TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
			return OK;
		}
		page = page->tcp_next;
	}
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);

	return xt_pread_file_4(file, address, value, &thread->st_statistics.st_rec, thread);
#else
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;
	xtWord1				*data;

	if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
		return FAILED;
	/* A read must be completely on a page: */
	ASSERT_NS(offset + 4 <= tci_page_size);
	data = page->tcp_data + offset;
	*value = XT_GET_DISK_4(data);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	return OK;
#endif
}
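
/* Added note on the two-level hash used above and in tc_fetch()/
 * tc_fetch_direct() below (not in the original source): the combined
 * key (page_idx + file->fr_id * 223) selects a lock segment from its
 * low bits (& XT_TC_SEGMENT_MASK) and a bucket in that segment's
 * private hash table from the remaining bits. For example, assuming
 * 16 segments (mask 0xF, shift 4), key 0x1234 maps to segment
 * 0x1234 & 0xF = 4 and bucket (0x1234 >> 4) % tcm_hash_size.
 * Partitioning the lock this way lets readers of different segments
 * proceed without contending on one global cache mutex.
 */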

xtBool XTTabCache::xt_tc_get_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtBool load, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread)
{
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

	if (load) {
		if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
			return FAILED;
	}
	else {
		if (!tc_fetch_direct(file, ref_id, &seg, &page, offset, thread))
			return FAILED;
		if (!seg) {
			*ret_page = NULL;
			return OK;
		}
	}
	page->tcp_lock_count++;
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	*ret_page = page;
	return OK;
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
void XTTabCache::xt_tc_release_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, XTThreadPtr thread __attribute__((unused)))
{
	XTTabCacheSegPtr	seg;

	seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
	TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);

#ifdef DEBUG
	XTTabCachePagePtr lpage, ppage;

	ppage = NULL;
	lpage = seg->tcs_hash_table[page->tcp_hash_idx];
	while (lpage) {
		if (lpage->tcp_page_idx == page->tcp_page_idx &&
			lpage->tcp_file_id == page->tcp_file_id)
			break;
		ppage = lpage;
		lpage = lpage->tcp_next;
	}

	ASSERT_NS(page == lpage);
	ASSERT_NS(page->tcp_lock_count > 0);
#endif

	if (page->tcp_lock_count > 0)
		page->tcp_lock_count--;

	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
xtBool XTTabCache::xt_tc_lock_page(XT_ROW_REC_FILE_PTR file, XTTabCachePagePtr *ret_page, xtRefID ref_id, size_t *offset, XTThreadPtr thread __attribute__((unused)))
{
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

#ifdef TRACE_DISTRIBUTION
	tabc_dist_change(file, ref_id);
#endif
	if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
		return FAILED;
	*ret_page = page;
	return OK;
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
void XTTabCache::xt_tc_unlock_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, xtOpSeqNo *op_seq, XTThreadPtr thread __attribute__((unused)))
{
	XTTabCacheSegPtr	seg;

	seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
	page->tcp_dirty = TRUE;
	*op_seq = tci_table->tab_seq.ts_set_op_seq(page);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
}

xtBool XTTabCache::xt_tc_read_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 *data, XTThreadPtr thread)
{
	return tc_read_direct(file, ref_id, this->tci_page_size, data, thread);
}

/* Read row and record files directly.
 * This bypasses the cache when reading, which means
 * we rely on the OS for caching.
 * This probably only makes sense when these files
 * are memory mapped.
 */
xtBool XTTabCache::tc_read_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
{
	register u_int				page_idx;
	register XTTabCachePagePtr	page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	size_t						red_size;
	off_t						address;

	ASSERT_NS(ref_id);
	ref_id--;
	page_idx = ref_id / this->tci_rows_per_page;
	address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			size_t offset;

			offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
			ASSERT_NS(offset + size <= this->tci_page_size);
			memcpy(data, page->tcp_data + offset, size);
			TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
			return OK;
		}
		page = page->tcp_next;
	}
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	if (!XT_PREAD_RR_FILE(file, address, size, 0, data, &red_size, &thread->st_statistics.st_rec, thread))
		return FAILED;
	memset(data + red_size, 0, size - red_size);
	return OK;
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
xtBool XTTabCache::tc_fetch_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread __attribute__((unused)))
{
	register u_int				page_idx;
	register XTTabCachePagePtr	page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;

	ASSERT_NS(ref_id);
	ref_id--;
	page_idx = ref_id / this->tci_rows_per_page;
	*offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			*ret_seg = seg;
			*ret_page = page;
			return OK;
		}
		page = page->tcp_next;
	}
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	*ret_seg = NULL;
	*ret_page = NULL;
	return OK;
}

/*
 * Note, this function may return an exclusive, or a shared lock.
 * If the page is in cache it will return a shared lock on the segment.
 * If the page was just added to the cache it will return an
 * exclusive lock.
 */
xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, xtBool read, XTThreadPtr thread)
{
	register u_int				page_idx;
	register XTTabCachePagePtr	page, new_page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	size_t						red_size;
	off_t						address;

	ASSERT_NS(ref_id);
	ref_id--;
	page_idx = ref_id / this->tci_rows_per_page;
	address = (off_t) page_idx * (off_t) this->tci_page_size + (off_t) this->tci_header_size;
	*offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			/* This page has been most recently used: */
			if (XT_TIME_DIFF(page->tcp_ru_time, dcg->tcm_ru_now) > (dcg->tcm_approx_page_count >> 1)) {
				/* Move to the front of the MRU list: */
				xt_lock_mutex_ns(&dcg->tcm_lock);

				page->tcp_ru_time = ++dcg->tcm_ru_now;
				if (dcg->tcm_mru_page != page) {
					/* Remove from the MRU list: */
					if (dcg->tcm_lru_page == page)
						dcg->tcm_lru_page = page->tcp_mr_used;
					if (page->tcp_lr_used)
						page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
					if (page->tcp_mr_used)
						page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;

					/* Make the page the most recently used: */
					if ((page->tcp_lr_used = dcg->tcm_mru_page))
						dcg->tcm_mru_page->tcp_mr_used = page;
					page->tcp_mr_used = NULL;
					dcg->tcm_mru_page = page;
					if (!dcg->tcm_lru_page)
						dcg->tcm_lru_page = page;
				}
				xt_unlock_mutex_ns(&dcg->tcm_lock);
			}
			*ret_seg = seg;
			*ret_page = page;
			thread->st_statistics.st_rec_cache_hit++;
			return OK;
		}
		page = page->tcp_next;
	}

	size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size;

	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);

	/* Page not found, allocate a new page: */
	if (!(new_page = (XTTabCachePagePtr) xt_malloc_ns(page_size)))
		return FAILED;

	/* Check the level of the cache: */
	size_t cache_used = 0;
	for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
		cache_used += dcg->tcm_segment[i].tcs_cache_in_use;

	if (cache_used + page_size > dcg->tcm_cache_high)
		dcg->tcm_cache_high = cache_used;

	if (cache_used + page_size > dcg->tcm_cache_size) {
		XTThreadPtr self;
		time_t		now;

		/* Wait for the cache level to go down.
		 * If this happens, then the freeer is not working fast
		 * enough!
		 */

		/* But before I do this, I must flush my own log because:
		 * - The freeer might be waiting for a page to be cleaned.
		 * - The page can only be cleaned once it has been written to
		 *   the database.
		 * - The writer cannot write the page data until it has been
		 *   flushed to the log.
		 * - The log won't be flushed, unless this thread does it.
		 * So there could be a deadlock if I don't flush the log!
		 */
		if ((self = xt_get_self())) {
			if (!xt_xlog_flush_log(tci_table->tab_db, self))
				goto failed;
		}

		/* Wait for the free'er thread: */
		xt_lock_mutex_ns(&dcg->tcm_freeer_lock);
		now = time(NULL);
		do {
			/* I have set the timeout to 2 here because of the following situation:
			 * 1. Transaction allocates an op seq
			 * 2. Transaction goes to update cache, but must wait for
			 *    cache to be freed (after this, the op would be written to
			 *    the log).
			 * 3. The free'er wants to free cache, but is waiting for the writer.
			 * 4. The writer cannot continue because an op seq is missing!
			 *    So the writer is waiting for the transaction thread to write
			 *    the op seq.
			 * - So we have a deadlock situation.
			 * - However, this situation can only occur if there is not enough
			 *   cache.
			 * The timeout helps, but will not solve the problem, unless we
			 * ignore the cache level here, after a while, and just continue.
			 */

			/* Wake freeer before we go to sleep: */
			if (!dcg->tcm_freeer_busy) {
				if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
					xt_log_and_clear_exception_ns();
			}

			dcg->tcm_threads_waiting++;
#ifdef DEBUG
			if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 30000)) {
				dcg->tcm_threads_waiting--;
				break;
			}
#else
			if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 1000)) {
				dcg->tcm_threads_waiting--;
				break;
			}
#endif
			dcg->tcm_threads_waiting--;

			cache_used = 0;
			for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
				cache_used += dcg->tcm_segment[i].tcs_cache_in_use;

			if (cache_used + page_size <= dcg->tcm_high_level)
				break;
			/*
			 * If there is too little cache we can get stuck here.
			 * The problem is that seq numbers are allocated before fetching a
			 * record to be updated.
			 *
			 * It can happen that we end up waiting for that seq number
			 * to be written to the log before we can continue here.
			 *
			 * This happens as follows:
			 * 1. This thread waits for the freeer.
			 * 2. The freeer cannot free a page because it has not been
			 *    written by the writer.
			 * 3. The writer cannot continue because it is waiting
			 *    for a missing sequence number.
			 * 4. The missing sequence number is the one allocated
			 *    before we entered this function!
			 *
			 * So don't wait for more than 5 seconds here!
			 */
		}
		while (time(NULL) < now + 5);
		xt_unlock_mutex_ns(&dcg->tcm_freeer_lock);
	}
	else if (cache_used + page_size > dcg->tcm_high_level) {
		/* Wake up the freeer because the cache level
		 * is higher than the high level.
		 */
		if (!dcg->tcm_freeer_busy) {
			xt_lock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
			if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
				xt_log_and_clear_exception_ns();
			xt_unlock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
		}
	}

	/* Read the page into memory.... */
	new_page->tcp_dirty = FALSE;
	new_page->tcp_seg = (xtWord1) ((page_idx + (file->fr_id * 223)) & XT_TC_SEGMENT_MASK);
#ifdef XT_CLUSTER_FREE_RECORDS
	new_page->tcp_free_rec = 0xFFFF;
#endif
	new_page->tcp_lock_count = 0;
	new_page->tcp_hash_idx = hash_idx;
	new_page->tcp_page_idx = page_idx;
	new_page->tcp_file_id = file->fr_id;
	new_page->tcp_db_id = this->tci_table->tab_db->db_id;
	new_page->tcp_tab_id = this->tci_table->tab_id;
	new_page->tcp_data_size = this->tci_page_size;
	new_page->tcp_op_seq = 0; // Value not used because not dirty

	if (read) {
		if (!XT_PREAD_RR_FILE(file, address, this->tci_page_size, 0, new_page->tcp_data, &red_size, &thread->st_statistics.st_rec, thread))
			goto failed;

#ifdef XT_CLUSTER_FREE_RECORDS
		/* Find the first free record! */
		if (tci_rec_file) {
			xtWord1 *buff_ptr;
			xtWord1 *end_ptr;

			buff_ptr = new_page->tcp_data;
			end_ptr = new_page->tcp_data + red_size;
			while (buff_ptr < end_ptr) {
				if (XT_REC_IS_FREE(*buff_ptr)) {
					new_page->tcp_free_rec = (xtWord2) (buff_ptr - new_page->tcp_data);
					break;
				}
				buff_ptr += tci_rec_size;
			}
		}
#endif
	}
#ifdef XT_MEMSET_UNUSED_SPACE
	else
		red_size = 0;

	/* Removing this is an optimization. It should not be required
	 * to clear the unused space in the page.
	 */
	memset(new_page->tcp_data + red_size, 0, this->tci_page_size - red_size);
#endif

	/* Add the page to the cache! */
	TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
#ifdef CHECK_DOUBLE_READ
	seg->tcs_total_reads++;
#endif
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			/* Oops, someone else was faster! */
#ifdef CHECK_DOUBLE_READ
			seg->tcs_read_not_req++;
#endif
			xt_free_ns(new_page);
			goto done_ok;
		}
		page = page->tcp_next;
	}
	page = new_page;

	/* Make the page the most recently used: */
	xt_lock_mutex_ns(&dcg->tcm_lock);
	page->tcp_ru_time = ++dcg->tcm_ru_now;
	if ((page->tcp_lr_used = dcg->tcm_mru_page))
		dcg->tcm_mru_page->tcp_mr_used = page;
	page->tcp_mr_used = NULL;
	dcg->tcm_mru_page = page;
	if (!dcg->tcm_lru_page)
		dcg->tcm_lru_page = page;
	xt_unlock_mutex_ns(&dcg->tcm_lock);

	/* Add the page to the hash table: */
	page->tcp_next = seg->tcs_hash_table[hash_idx];
	seg->tcs_hash_table[hash_idx] = page;

	/* GOTCHA! This increment was done just after the malloc!
	 * So it was not protected by the segment lock!
	 * The result was that this count was no longer reliable:
	 * the amount of cache reported as in use became less and
	 * less, because increments were lost over time!
	 */
	/* Increment cache used. */
	seg->tcs_cache_in_use += page_size;

	done_ok:
	*ret_seg = seg;
	*ret_page = page;
#ifdef DEBUG_CHECK_CACHE
	//XT_TC_check_cache();
#endif
	thread->st_statistics.st_rec_cache_miss++;
	return OK;

	failed:
	xt_free_ns(new_page);
	return FAILED;
}
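
/* Added usage sketch (illustrative, not part of the original source):
 * a successful tc_fetch() returns with seg->tcs_lock still held
 * (shared on a cache hit, exclusive on a miss), so a caller copies or
 * modifies the page data and then releases the segment lock:
 *
 *	XTTabCacheSegPtr	seg;
 *	XTTabCachePagePtr	page;
 *	size_t				offset;
 *
 *	if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
 *		return FAILED;
 *	memcpy(buffer, page->tcp_data + offset, rec_size);	// lock still held
 *	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);		// always release
 *
 * ('buffer' and 'rec_size' are placeholders.) xt_tc_read(),
 * xt_tc_write() and xt_tc_read_4() above all follow this
 * fetch/copy/unlock pattern.
 */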


/* ----------------------------------------------------------------------
 * OPERATION SEQUENCE
 */

xtBool XTTableSeq::ts_log_no_op(XTThreadPtr thread, xtTableID tab_id, xtOpSeqNo op_seq)
{
	XTactNoOpEntryDRec	ent_rec;
	xtWord4				sum = (xtWord4) tab_id ^ (xtWord4) op_seq;

	ent_rec.no_status_1 = XT_LOG_ENT_NO_OP;
	ent_rec.no_checksum_1 = XT_CHECKSUM_1(sum);
	XT_SET_DISK_4(ent_rec.no_tab_id_4, tab_id);
	XT_SET_DISK_4(ent_rec.no_op_seq_4, op_seq);
	/* TODO - If this also fails we have a problem.
	 * From this point on we should actually not generate
	 * any more op IDs. The problem is that some
	 * will then be missing, so the writer will not
	 * be able to continue.
	 */
	return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, XT_XLOG_NO_WRITE_NO_FLUSH);
}
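
/* Added background note (not in the original source): the writer
 * applies logged operations to a table strictly in op-seq order, so a
 * sequence number that was allocated but never logged would stall it
 * indefinitely (see the deadlock discussion in tc_fetch() above).
 * Logging an XT_LOG_ENT_NO_OP entry for an otherwise unused sequence
 * number plugs such a hole and lets the writer advance past it.
 */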

#ifdef XT_NOT_INLINE
xtOpSeqNo XTTableSeq::ts_set_op_seq(XTTabCachePagePtr page)
{
	xtOpSeqNo seq;

	xt_lock_mutex_ns(&ts_ns_lock);
	page->tcp_op_seq = seq = ts_next_seq++;
	xt_unlock_mutex_ns(&ts_ns_lock);
	return seq;
}

xtOpSeqNo XTTableSeq::ts_get_op_seq()
{
	xtOpSeqNo seq;

	xt_lock_mutex_ns(&ts_ns_lock);
	seq = ts_next_seq++;
	xt_unlock_mutex_ns(&ts_ns_lock);
	return seq;
}
#endif

#ifdef XT_NOT_INLINE
/*
 * Return TRUE if the current sequence is before the
 * target (then) sequence number. This function
 * takes into account overflow. Overflow is detected
 * by checking the difference between the 2 values.
 * If the difference is very large, then we
 * assume overflow.
 */
xtBool XTTableSeq::xt_op_is_before(register xtOpSeqNo now, register xtOpSeqNo then)
{
	ASSERT_NS(sizeof(xtOpSeqNo) == 4);
	/* The now time is being incremented.
	 * If it is after the then time (which is static), then
	 * it is not before!
	 */
	if (now >= then) {
		if ((now - then) > (xtOpSeqNo) 0xFFFFFFFF/2)
			return TRUE;
		return FALSE;
	}

	/* If it appears to be before, we still have to check
	 * for overflow. If the gap is bigger than half of
	 * the MAX value, then we can assume it has wrapped around,
	 * because we know that no then can be so far in the
	 * future!
	 */
	if ((then - now) > (xtOpSeqNo) 0xFFFFFFFF/2)
		return FALSE;
	return TRUE;
}
#endif
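
/* Added worked example (not in the original source): with 32-bit
 * sequence numbers, take now = 0xFFFFFFF0 and then = 0x00000010.
 * Although now >= then, the difference 0xFFFFFFE0 exceeds half the
 * range (0x7FFFFFFF), so "then" is taken to have wrapped past zero and
 * xt_op_is_before() returns TRUE: 0xFFFFFFF0 really is before
 * 0x00000010 + 2^32. For now = 5, then = 10 the gap is small, so the
 * plain answer (TRUE) stands. This is the same serial-number
 * arithmetic idea used for TCP sequence numbers (cf. RFC 1982).
 */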
921
 
 
922
 
 
923
 
/* ----------------------------------------------------------------------
924
 
 * F R E E E R    P R O C E S S
925
 
 */
926
 
 
927
 
/*
928
 
 * Used by the writer to wake the freeer.
929
 
 */
930
 
xtPublic void xt_wr_wake_freeer(XTThreadPtr XT_UNUSED(self), XTDatabaseHPtr db)
931
 
{
932
 
        /* BUG FIX: Was using tcm_freeer_cond.
933
 
         * This is incorrect. When the freeer waits for the
934
 
         * writter, it uses the writer's condition!
935
 
         */
936
 
        xt_lock_mutex_ns(&db->db_wr_lock);
937
 
        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
938
 
                xt_log_and_clear_exception_ns();
939
 
        xt_unlock_mutex_ns(&db->db_wr_lock);
940
 
/*
941
 
        xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
942
 
        pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
943
 
        if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
944
 
                xt_log_and_clear_exception_ns();
945
 
        freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
946
 
*/
947
 
}
948
 
 
949
 
/* Wait for a transaction to quit: */
950
 
static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs)
951
 
{
952
 
        if (!self->t_quit)
953
 
                xt_timed_wait_cond(NULL, &xt_tab_cache.tcm_freeer_cond, &xt_tab_cache.tcm_freeer_lock, msecs);
954
 
}
955
 
 
956
 
typedef struct TCResource {
957
 
        XTOpenTablePtr          tc_ot;
958
 
} TCResourceRec, *TCResourcePtr;
959
 
 
960
 
static void tabc_free_fr_resources(XTThreadPtr self, TCResourcePtr tc)
961
 
{
962
 
        if (tc->tc_ot) {
963
 
                xt_db_return_table_to_pool(self, tc->tc_ot);
964
 
                tc->tc_ot = NULL;
965
 
        }
966
 
}
967
 
 
968
 
static XTTableHPtr tabc_get_table(XTThreadPtr self, TCResourcePtr tc, xtDatabaseID db_id, xtTableID tab_id)
969
 
{
970
 
        XTTableHPtr     tab;
971
 
        XTDatabaseHPtr  db;
972
 
 
973
 
        if (tc->tc_ot) {
974
 
                tab = tc->tc_ot->ot_table;
975
 
                if (tab->tab_id == tab_id && tab->tab_db->db_id == db_id)
976
 
                        return tab;
977
 
 
978
 
                xt_db_return_table_to_pool(self, tc->tc_ot);
979
 
                tc->tc_ot = NULL;
980
 
        }
981
 
 
982
 
        if (!tc->tc_ot) {
983
 
                if (!(db = xt_get_database_by_id(self, db_id)))
984
 
                        return NULL;
985
 
 
986
 
                pushr_(xt_heap_release, db);
987
 
                tc->tc_ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE);
988
 
                freer_(); // xt_heap_release(db);
989
 
                if (!tc->tc_ot)
990
 
                        return NULL;
991
 
        }
992
 
 
993
 
        return tc->tc_ot->ot_table;
994
 
}
995
 
 
996
 
/*
997
 
 * Free the given page, or the least recently used page.
998
 
 * Return the amount of bytes freed.
999
 
 */
1000
 
static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc)
1001
 
{
1002
 
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
1003
 
        XTTableHPtr                                     tab = NULL;
1004
 
        XTTabCachePagePtr                       page, lpage, ppage;
1005
 
        XTTabCacheSegPtr                        seg;
1006
 
        u_int                                           page_cnt;
1007
 
        xtBool                                          was_dirty;
1008
 
 
1009
 
#ifdef DEBUG_CHECK_CACHE
1010
 
        //XT_TC_check_cache();
1011
 
#endif
1012
 
        dcg->tcm_free_try_count = 0;
1013
 
 
1014
 
        retry:
1015
 
        /* Note, handling the page is safe because
1016
 
         * there is only one free'er thread which
1017
 
         * can remove pages from the cache!
1018
 
         */
1019
 
        page_cnt = 0;
1020
 
        if (!(page = dcg->tcm_lru_page)) {
1021
 
                dcg->tcm_free_try_count = 0;
1022
 
                return 0;
1023
 
        }
1024
 
 
1025
 
        retry_2:
1026
 
        if ((was_dirty = page->tcp_dirty)) {
1027
 
                /* Do all this stuff without a lock, because to
1028
 
                 * have a lock while doing this is too expensive!
1029
 
                 */
1030
 
        
1031
 
                /* Wait for the page to be cleaned. */
1032
 
                tab = tabc_get_table(self, tc, page->tcp_db_id, page->tcp_tab_id);
1033
 
        }
1034
 
 
1035
 
        seg = &dcg->tcm_segment[page->tcp_seg];
1036
 
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);
1037
 
 
1038
 
        if (page->tcp_dirty) {
1039
 
                if (!was_dirty) {
1040
 
                        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1041
 
                        goto retry_2;
1042
 
                }
1043
 
 
1044
 
                if (tab) {
1045
 
                        ASSERT(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1));
1046
 
                        /* This should never happen. However, is has been occuring,
1047
 
                         * during multi_update test on Windows.
1048
 
                         * In particular it occurs after rename of a table, during ALTER.
1049
 
                         * As if the table was not flushed before the rename!?
1050
 
                         * To guard against an infinite loop below, I will just continue here.
1051
 
                         */
1052
 
                        if (XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1))
1053
 
                                goto go_on;
1054
 
                        /* OK, we have the table, now we check where the current
1055
 
                         * sequence number is.
1056
 
                         */
1057
 
                        if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq)) {
1058
 
                                XTDatabaseHPtr db = tab->tab_db;
1059
 
 
1060
 
                                rewait:
1061
 
                                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1062
 
 
1063
 
                                /* Flush the log, in case this is holding up the
1064
 
                                 * writer!
1065
 
                                 */
1066
 
                                if (!db->db_xlog.xlog_flush(self)) {
1067
 
                                        dcg->tcm_free_try_count = 0;
1068
 
                                        xt_throw(self);
1069
 
                                }
1070
 
 
1071
 
                                xt_lock_mutex(self, &db->db_wr_lock);
1072
 
                                pushr_(xt_unlock_mutex, &db->db_wr_lock);
1073
 
 
1074
 
                                /* The freeer is now waiting: */
1075
 
                                db->db_wr_freeer_waiting = TRUE;
1076
 
 
1077
 
                                /* If the writer is idle, wake it up. 
1078
 
                                 * The writer will commit the changes to the database
1079
 
                                 * which will allow the freeer to free up the cache.
1080
 
                                 */
1081
 
                                if (db->db_wr_idle) {
1082
 
                                        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
1083
 
                                                xt_log_and_clear_exception_ns();
1084
 
                                }
1085
 
 
1086
 
                                /* Go to sleep on the writer's condition.
1087
 
                                 * The writer will wake the free'er before it goes to
1088
 
                                 * sleep!
1089
 
                                 */
1090
 
                                tab->tab_wake_freeer_op = page->tcp_op_seq;
1091
 
                                tab->tab_wr_wake_freeer = TRUE;
1092
 
                                if (!xt_timed_wait_cond_ns(&db->db_wr_cond, &db->db_wr_lock, 30000)) {
1093
 
                                        tab->tab_wr_wake_freeer = FALSE;
1094
 
                                        db->db_wr_freeer_waiting = FALSE;
1095
 
                                        xt_throw(self);
1096
 
                                }
1097
 
                                tab->tab_wr_wake_freeer = FALSE;
1098
 
                                db->db_wr_freeer_waiting = FALSE;
1099
 
                                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
1100
 
 
1101
 
                                TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);
1102
 
                                if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq))
1103
 
                                        goto rewait;
1104
 
                        }
1105
 
                        go_on:;
1106
 
                }
1107
 
        }
1108
 
 
1109
 
        /* Wait if the page is being read or locked. */
1110
 
        if (page->tcp_lock_count) {
1111
 
                /* (1) If the page is being read, then we should not free
1112
 
                 *     it immediately.
1113
 
                 * (2) If a page is locked, the locker may be waiting
1114
 
                 *     for the freeer to free some cache - this
1115
 
                 *     causes a deadlock.
1116
 
                 *
1117
 
                 * Therefore, we move on, and try to free another page...
1118
 
                 */
1119
 
                if (page_cnt < (dcg->tcm_approx_page_count >> 1)) {
1120
 
                        /* Page has not changed MRU position, and we
1121
 
                         * have looked at less than half of the pages.
1122
 
                         * Go to the next page...
1123
 
                         */
1124
 
                        if ((page = page->tcp_mr_used)) {
1125
 
                                page_cnt++;
1126
 
                                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1127
 
                                goto retry_2;
1128
 
                        }
1129
 
                }
1130
 
                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1131
 
                dcg->tcm_free_try_count++;                              
1132
 
 
1133
 
                /* Starting to spin, free the threads: */
1134
 
                if (dcg->tcm_threads_waiting) {
1135
 
                        if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
1136
 
                                xt_log_and_clear_exception_ns();
1137
 
                }
1138
 
                goto retry;
1139
 
        }
1140
 
 
1141
 
        /* Page is clean, remove from the hash table: */
1142
 
 
1143
 
        /* Find the page on the list: */
1144
 
        u_int page_idx = page->tcp_page_idx;
1145
 
        u_int file_id = page->tcp_file_id;
1146
 
 
1147
 
        ppage = NULL;
1148
 
        lpage = seg->tcs_hash_table[page->tcp_hash_idx];
1149
 
        while (lpage) {
1150
 
                if (lpage->tcp_page_idx == page_idx && lpage->tcp_file_id == file_id)
1151
 
                        break;
1152
 
                ppage = lpage;
1153
 
                lpage = lpage->tcp_next;
1154
 
        }
1155
 
 
1156
 
        if (page == lpage) {
1157
 
                /* Should be the case! */
1158
 
                if (ppage)
1159
 
                        ppage->tcp_next = page->tcp_next;
1160
 
                else
1161
 
                        seg->tcs_hash_table[page->tcp_hash_idx] = page->tcp_next;
1162
 
        }
1163
 
#ifdef DEBUG
1164
 
        else
1165
 
                ASSERT_NS(FALSE);
1166
 
#endif
1167
 
 
1168
 
        /* Remove from the MRU list: */
1169
 
        xt_lock_mutex_ns(&dcg->tcm_lock);
1170
 
        if (dcg->tcm_lru_page == page)
1171
 
                dcg->tcm_lru_page = page->tcp_mr_used;
1172
 
        if (dcg->tcm_mru_page == page)
1173
 
                dcg->tcm_mru_page = page->tcp_lr_used;
1174
 
        if (page->tcp_lr_used)
1175
 
                page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
1176
 
        if (page->tcp_mr_used)
1177
 
                page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;
1178
 
        xt_unlock_mutex_ns(&dcg->tcm_lock);
1179
 
 
1180
 
        /* Free the page: */
1181
 
        size_t freed_space = offsetof(XTTabCachePageRec, tcp_data) + page->tcp_data_size;
1182
 
        ASSERT_NS(seg->tcs_cache_in_use >= freed_space);
1183
 
        seg->tcs_cache_in_use -= freed_space;
1184
 
        ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
1185
 
        xt_free_ns(page);
1186
 
 
1187
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1188
 
        self->st_statistics.st_rec_cache_frees++;
1189
 
        dcg->tcm_free_try_count = 0;
1190
 
        return freed_space;
1191
 
}


#define CACHE_HIGH_LEVEL(d)
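
/* Added summary (not in the original source) of the freeer policy
 * below, in terms of the watermarks set by xt_tc_set_cache_size():
 * - Sleep in ~500 ms intervals while usage stays below the high level
 *   (95%), or below the mid level (85%) when threads are waiting.
 * - When over the threshold, evict LRU pages (tabc_free_page) until
 *   usage drops to the low level (70%).
 * - While evicting, broadcast tcm_freeer_cond once usage falls under
 *   the high level, so threads blocked in tc_fetch() can continue.
 */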
1195
 
 
1196
 
static void tabc_fr_main(XTThreadPtr self)
{
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	TCResourceRec				tc = { 0 };
	int							i;

	xt_set_low_priority(self);
	dcg->tcm_freeer_busy = TRUE;

	while (!self->t_quit) {
		size_t cache_used, freed;

		pushr_(tabc_free_fr_resources, &tc);

		while (!self->t_quit) {
			/* Total up the cache memory used: */
			cache_used = 0;
			for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
				cache_used += dcg->tcm_segment[i].tcs_cache_in_use;

			if (cache_used > dcg->tcm_cache_high)
				dcg->tcm_cache_high = cache_used;

			if (self->t_quit)
				break;

			/* Check if the cache usage is over the high level (95%).
			 * If threads are waiting then we are more aggressive about
			 * freeing cache, and already act at the mid level (85%).
			 */
			if (cache_used < (dcg->tcm_threads_waiting ? dcg->tcm_mid_level : dcg->tcm_high_level))
				break;

			/* Reduce cache to the low level (currently 70%): */
			while (!self->t_quit && cache_used > dcg->tcm_low_level) {
				freed = tabc_free_page(self, &tc);
				cache_used -= freed;
				if (cache_used <= dcg->tcm_high_level) {
					/* Wake up any threads that are waiting for some cache
					 * to be freed.
					 */
					if (dcg->tcm_threads_waiting) {
						if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
							xt_log_and_clear_exception_ns();
					}
				}
			}
		}

		freer_(); // tabc_free_fr_resources(&tc)

		xt_lock_mutex(self, &dcg->tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &dcg->tcm_freeer_lock);

		if (dcg->tcm_threads_waiting) {
			/* Wake threads before we go to sleep: */
			if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
				xt_log_and_clear_exception_ns();
		}

		/* Wait for a thread that allocates data to signal
		 * that the cache level has exceeded the upper limit:
		 */
		xt_db_approximate_time = time(NULL);
		dcg->tcm_freeer_busy = FALSE;
		/* No idea why, but I am getting an unnecessary pause here
		 * when I run DBT2 with a low record cache.
		 *
		 * Every now and then there is a pause where the freeer is here,
		 * and all user threads are waiting for the freeer.
		 *
		 * So adding the tcm_threads_waiting condition.
		 */
		if (dcg->tcm_threads_waiting) {
			cache_used = 0;
			for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
				cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
			if (cache_used < dcg->tcm_mid_level)
				tabc_fr_wait_for_cache(self, 500);
		}
		else
			tabc_fr_wait_for_cache(self, 500);
		//tabc_fr_wait_for_cache(self, 30*1000);
		dcg->tcm_freeer_busy = TRUE;
		xt_db_approximate_time = time(NULL);
		freer_(); // xt_unlock_mutex(&dcg->tcm_freeer_lock)
	}
}
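/* tabc_fr_main() implements a watermark scheme: eviction starts when
 * usage crosses the high level (95% of the cache, or already at the mid
 * level, 85%, when allocator threads are blocked waiting for space) and
 * continues until usage falls to the low level (70%). The gap between
 * the start and stop levels gives the freeer hysteresis, so it does not
 * thrash around a single threshold. A stand-alone sketch of the policy
 * with made-up numbers (kept out of the build with #if 0; none of these
 * names are engine code):
 */
#if 0
#include <stdio.h>

#define CACHE_SIZE	1000
#define HIGH_LEVEL	(CACHE_SIZE * 95 / 100)
#define MID_LEVEL	(CACHE_SIZE * 85 / 100)
#define LOW_LEVEL	(CACHE_SIZE * 70 / 100)

/* Should an eviction pass start, given current usage and whether any
 * allocator threads are blocked? */
static int should_evict(int used, int threads_waiting)
{
	return used >= (threads_waiting ? MID_LEVEL : HIGH_LEVEL);
}

int main(void)
{
	int used = 960;

	if (should_evict(used, 0)) {
		/* Evict until back at the low watermark, like the inner
		 * loop of tabc_fr_main(): */
		while (used > LOW_LEVEL)
			used -= 25;		/* pretend each freed page is 25 units */
	}
	printf("usage after eviction pass: %d\n", used);
	return 0;
}
#endif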
 
static void *tabc_fr_run_thread(XTThreadPtr self)
{
	int		count;
	void	*mysql_thread;

	myxt_wait_pbxt_plugin_slot_assigned(self);

	mysql_thread = myxt_create_thread();

	while (!self->t_quit) {
		try_(a) {
			tabc_fr_main(self);
		}
		catch_(a) {
			/* This error is "normal"! */
			if (!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
				self->t_exception.e_sys_err == SIGTERM))
				xt_log_and_clear_exception(self);
		}
		cont_(a);

		/* After an exception, pause before trying again... */
		/* Number of seconds */
#ifdef DEBUG
		count = 10;
#else
		count = 2*60;
#endif
		while (!self->t_quit && count > 0) {
			xt_db_approximate_time = time(NULL);
			sleep(1);
			count--;
		}
	}

	/*
	 * {MYSQL-THREAD-KILL}
	myxt_destroy_thread(mysql_thread, TRUE);
	 */
	return NULL;
}
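/* After an exception, tabc_fr_run_thread() pauses before restarting the
 * main loop, sleeping in one-second slices so that a quit request is
 * noticed promptly and xt_db_approximate_time keeps ticking. A
 * stand-alone sketch of that interruptible-backoff idiom (the 'quit'
 * flag here is illustrative, not the engine's; kept out of the build
 * with #if 0):
 */
#if 0
#include <stdio.h>
#include <unistd.h>

static volatile int quit = 0;

/* Sleep for 'secs' seconds, one second at a time, aborting early
 * if the quit flag is raised: */
static void interruptible_pause(int secs)
{
	while (!quit && secs > 0) {
		sleep(1);
		secs--;
	}
}

int main(void)
{
	interruptible_pause(2);
	printf("paused (or quit early)\n");
	return 0;
}
#endif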
 
static void tabc_fr_free_thread(XTThreadPtr self, void *XT_UNUSED(data))
{
	if (xt_tab_cache.tcm_freeer_thread) {
		xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
		xt_tab_cache.tcm_freeer_thread = NULL;
		freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
	}
}
 
xtPublic void xt_start_freeer(XTThreadPtr self)
{
	xt_tab_cache.tcm_freeer_thread = xt_create_daemon(self, "free-er");
	xt_set_thread_data(xt_tab_cache.tcm_freeer_thread, NULL, tabc_fr_free_thread);
	xt_run_thread(self, xt_tab_cache.tcm_freeer_thread, tabc_fr_run_thread);
}
 
xtPublic void xt_quit_freeer(XTThreadPtr self)
{
	if (xt_tab_cache.tcm_freeer_thread) {
		xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
		xt_terminate_thread(self, xt_tab_cache.tcm_freeer_thread);
		freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
	}
}
 
xtPublic void xt_stop_freeer(XTThreadPtr self)
{
	XTThreadPtr thr_fr;

	if (xt_tab_cache.tcm_freeer_thread) {
		xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);

		/* This pointer is safe as long as you have the transaction lock. */
		if ((thr_fr = xt_tab_cache.tcm_freeer_thread)) {
			xtThreadID tid = thr_fr->t_id;

			/* Make sure the thread quits when woken up. */
			xt_terminate_thread(self, thr_fr);

			/* Wake the freeer to get it to quit: */
			if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
				xt_log_and_clear_exception_ns();

			freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)

			/*
			 * GOTCHA: This is a weird thing, but a SIGTERM directed
			 * at a particular thread (in this case the sweeper) was
			 * sometimes being caught by a different thread and killing
			 * the server. Disconcerting.
			 * (This may only be a problem on Mac OS X.)
			xt_kill_thread(thread);
			 */
			xt_wait_for_thread_to_exit(tid, FALSE);

			/* PMC - It should not be necessary to set the signal here, but in the
			 * debugger the handler is not called!?
			thr_fr->t_delayed_signal = SIGTERM;
			xt_kill_thread(thread);
			 */
			xt_tab_cache.tcm_freeer_thread = NULL;
		}
		else
			freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
	}
}
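/* xt_stop_freeer() uses a standard shutdown handshake: raise the quit
 * flag while holding the mutex the daemon sleeps under, broadcast the
 * condition variable to wake it, then wait for the thread to exit. A
 * minimal pthread sketch of the same handshake (illustrative names, not
 * the XT thread API; kept out of the build with #if 0):
 */
#if 0
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t	shutdown_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t	shutdown_cond = PTHREAD_COND_INITIALIZER;
static int				shutdown_quit = 0;

static void *daemon_main(void *arg)
{
	(void) arg;
	pthread_mutex_lock(&shutdown_lock);
	while (!shutdown_quit)
		pthread_cond_wait(&shutdown_cond, &shutdown_lock);	/* sleeps until woken */
	pthread_mutex_unlock(&shutdown_lock);
	return NULL;
}

int main(void)
{
	pthread_t thr;

	pthread_create(&thr, NULL, daemon_main, NULL);

	pthread_mutex_lock(&shutdown_lock);
	shutdown_quit = 1;						/* make sure it quits when woken */
	pthread_cond_broadcast(&shutdown_cond);	/* wake it to get it to quit */
	pthread_mutex_unlock(&shutdown_lock);

	pthread_join(thr, NULL);				/* wait for the thread to exit */
	printf("daemon stopped\n");
	return 0;
}
#endif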
 
xtPublic void xt_load_pages(XTThreadPtr self, XTOpenTablePtr ot)
{
	XTTableHPtr			tab = ot->ot_table;
	xtRecordID			rec_id;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;
	size_t				poffset;

	/* Pre-load the row pointer file, one fetch per cache page: */
	rec_id = 1;
	while (rec_id < tab->tab_row_eof_id) {
		if (!tab->tab_rows.tc_fetch(ot->ot_row_file, rec_id, &seg, &page, &poffset, TRUE, self))
			xt_throw(self);
		TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
		rec_id += tab->tab_rows.tci_rows_per_page;
	}

	/* Pre-load the record (data) file in the same way: */
	rec_id = 1;
	while (rec_id < tab->tab_rec_eof_id) {
		if (!tab->tab_recs.tc_fetch(ot->ot_rec_file, rec_id, &seg, &page, &poffset, TRUE, self))
			xt_throw(self);
		TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
		rec_id += tab->tab_recs.tci_rows_per_page;
	}
}
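/* xt_load_pages() warms the cache with one fetch per page: stepping
 * rec_id by tci_rows_per_page means each tc_fetch() lands on a new cache
 * page, so the whole file is pulled in with the minimum number of probes.
 * The stride arithmetic as a stand-alone sketch (constants are made up;
 * kept out of the build with #if 0):
 */
#if 0
#include <stdio.h>

int main(void)
{
	unsigned rows_per_page = 170;	/* e.g. cache page size / record size */
	unsigned eof_id = 1000;			/* first record ID beyond the file */
	unsigned pages_touched = 0;

	/* One probe per page, like the two loops in xt_load_pages(): */
	for (unsigned rec_id = 1; rec_id < eof_id; rec_id += rows_per_page)
		pages_touched++;

	printf("pages touched: %u\n", pages_touched);	/* prints 6 */
	return 0;
}
#endif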
 
#ifdef TRACE_DISTRIBUTION
/* ----------------------------------------------------------------------
 * Trace the distribution of changes over a file (debug builds only).
 */

//#define TABC_TRACE_FILE		"stock"
//#define TABC_TRACE_FILE		"customer"
#define TABC_TRACE_FILE			"upd_ind_PBXT"
//#define TABC_TRACE_FILE		"order_line"

#define TABC_DISPLAY_WIDTH		180

//#define TRACK_ROWS

#ifdef TRACK_ROWS
#define TABC_CORRECT_REC_SIZE(s)		((s) == 4)
#else
#define TABC_CORRECT_REC_SIZE(s)		((s) != 4)
#endif

static XTSpinXSLockRec	tabc_dist_lock;
static u_int			tabc_dist_file_id;
static xtWord8			tabc_output_time;
static int				tabc_recs_per_block;
static int				tabc_change_map_eof;
static int				tabc_change_map_init_eof;
static int				tabc_change_map_size;
static int				*tabc_change_map;
static char				*tabc_change_map_string;

static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file)
{
	if (!strstr(xt_last_name_of_path(file->fil_path), TABC_TRACE_FILE) || !TABC_CORRECT_REC_SIZE(cac->tci_rec_size))
		return;

	xt_spinxslock_init_with_autoname(NULL, &tabc_dist_lock);
	tabc_dist_file_id = file->fil_id;
	tabc_output_time = xt_trace_clock();
 
#ifdef TRACK_ROWS
	tabc_recs_per_block = cac->tci_table->tab_row_eof_id / TABC_DISPLAY_WIDTH;
#else
	/* Tracking records, not rows, so scale by the record-file EOF: */
	tabc_recs_per_block = cac->tci_table->tab_rec_eof_id / TABC_DISPLAY_WIDTH;
#endif
 
	tabc_change_map_init_eof = TABC_DISPLAY_WIDTH;
	tabc_change_map_eof = tabc_change_map_init_eof;
	tabc_change_map_size = tabc_change_map_eof * 2;
	tabc_change_map = (int *) malloc(tabc_change_map_size * sizeof(int));
	memset(tabc_change_map, 0, tabc_change_map_size * sizeof(int));
	tabc_change_map_string = (char *) malloc(tabc_change_map_size + 1);
	printf("FILE: %s\n", file->fil_path);
	printf("rec size:   %d\n", (int) cac->tci_rec_size);
	printf("block size: %d\n", tabc_recs_per_block);
	printf("EOF:        %d\n", tabc_change_map_eof);
}

/* Density legend, lightest to heaviest: _.-~`'* */
static void tabc_dist_output(xtWord8 now)
{
	int v;
	int i;
	int time;

	time = (int) (now - tabc_output_time);
	tabc_output_time = now;
	xt_spinxslock_xlock(&tabc_dist_lock, FALSE, 0);
	for (i=0; i<tabc_change_map_eof; i++) {
		v = tabc_change_map[i];
		tabc_change_map[i] = 0;
		if (v == 0)
			tabc_change_map_string[i] = ' ';
		else if (v <= 2)
			tabc_change_map_string[i] = '_';
		else if (v <= 10)
			tabc_change_map_string[i] = '.';
		else if (v <= 50)
			tabc_change_map_string[i] = '-';
		else if (v <= 100)
			tabc_change_map_string[i] = '~';
		else if (v <= 200)
			tabc_change_map_string[i] = '`';
		else if (v <= 500)
			tabc_change_map_string[i] = '\'';
		else
			tabc_change_map_string[i] = '*';
	}
	tabc_change_map_string[i] = 0;
	printf("%5d %s\n", time / 1000, tabc_change_map_string);
	xt_spinxslock_unlock(&tabc_dist_lock, TRUE);
}

static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id)
{
	xtWord8 now;

	if (file->fr_file->fil_id != tabc_dist_file_id)
		return;

	now = xt_trace_clock();
	if (now - tabc_output_time >= 1000000)
		tabc_dist_output(now);

	ref_id = ref_id / tabc_recs_per_block;
	if (ref_id < tabc_change_map_size)
		tabc_change_map[ref_id]++;
	if (ref_id+1 > tabc_change_map_eof)
		tabc_change_map_eof = ref_id+1;
}

#endif
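/* The distribution tracer renders write density as ASCII art: each
 * column of the output is a block of record IDs, and the character
 * encodes how many changes hit that block since the last line was
 * printed. A tiny sketch of the count-to-character mapping (thresholds
 * copied from tabc_dist_output(); the sample counts are made up, and
 * the block is kept out of the build with #if 0):
 */
#if 0
#include <stdio.h>

static char density_char(int v)
{
	if (v == 0)		return ' ';
	if (v <= 2)		return '_';
	if (v <= 10)	return '.';
	if (v <= 50)	return '-';
	if (v <= 100)	return '~';
	if (v <= 200)	return '`';
	if (v <= 500)	return '\'';
	return '*';
}

int main(void)
{
	int		counts[8] = { 0, 1, 7, 30, 80, 150, 400, 900 };
	char	line[9];

	for (int i = 0; i < 8; i++)
		line[i] = density_char(counts[i]);
	line[8] = 0;
	printf("[%s]\n", line);	/* prints [ _.-~`'*] */
	return 0;
}
#endif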