~posulliv/drizzle/optimizer-style-cleanup

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/datalog_xt.cc

  • Committer: Padraig O'Sullivan
  • Date: 2010-04-17 01:38:47 UTC
  • mfrom: (1237.9.238 bad-staging)
  • Revision ID: osullivan.padraig@gmail.com-20100417013847-ibjioqsfbmf5yg4g
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2005-01-24   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 */
 
23
 
 
24
#include "xt_config.h"
 
25
 
 
26
#include <stdio.h>
 
27
#ifndef XT_WIN
 
28
#include <unistd.h>
 
29
#include <signal.h>
 
30
#endif
 
31
#include <stdlib.h>
 
32
 
 
33
#ifndef DRIZZLED
 
34
#include "mysql_priv.h"
 
35
#endif
 
36
 
 
37
#include "ha_pbxt.h"
 
38
 
 
39
#include "filesys_xt.h"
 
40
#include "database_xt.h"
 
41
#include "memory_xt.h"
 
42
#include "strutil_xt.h"
 
43
#include "sortedlist_xt.h"
 
44
#include "util_xt.h"
 
45
#include "heap_xt.h"
 
46
#include "table_xt.h"
 
47
#include "trace_xt.h"
 
48
#include "myxt_xt.h"
 
49
 
 
50
/* Forward declaration: wakes the "co" thread of db (presumably the data log
 * compactor -- confirm against the definition, which is not in this section).
 * Called by XTDataLogCache::dls_set_log_state() after a log has been put on
 * the to-compact list.
 */
static void dl_wake_co_thread(XTDatabaseHPtr db);
 
51
 
 
52
/*
 
53
 * --------------------------------------------------------------------------------
 
54
 * SEQUENTIAL READING
 
55
 */
 
56
 
 
57
xtBool XTDataSeqRead::sl_seq_init(struct XTDatabase *db, size_t buffer_size)
 
58
{
 
59
        sl_db = db;
 
60
        sl_buffer_size = buffer_size;
 
61
 
 
62
        sl_log_file = NULL;
 
63
        sl_log_eof = 0;
 
64
 
 
65
        sl_buf_log_offset = 0;
 
66
        sl_buffer_len = 0;
 
67
        sl_buffer = (xtWord1 *) xt_malloc_ns(buffer_size);
 
68
 
 
69
        sl_rec_log_id = 0;
 
70
        sl_rec_log_offset = 0;
 
71
        sl_record_len = 0;
 
72
        sl_extra_garbage = 0;
 
73
 
 
74
        return sl_buffer != NULL;
 
75
}
 
76
 
 
77
void XTDataSeqRead::sl_seq_exit()
 
78
{
 
79
        if (sl_log_file) {
 
80
                xt_close_file_ns(sl_log_file);
 
81
                sl_log_file  = NULL;
 
82
        }
 
83
        if (sl_buffer) {
 
84
                xt_free_ns(sl_buffer);
 
85
                sl_buffer = NULL;
 
86
        }
 
87
}
 
88
 
 
89
XTOpenFilePtr XTDataSeqRead::sl_seq_open_file()
 
90
{
 
91
        return sl_log_file;
 
92
}
 
93
 
 
94
void XTDataSeqRead::sl_seq_pos(xtLogID *log_id, xtLogOffset *log_offset)
 
95
{
 
96
        *log_id = sl_rec_log_id;
 
97
        *log_offset = sl_rec_log_offset;
 
98
}
 
99
 
 
100
xtBool XTDataSeqRead::sl_seq_start(xtLogID log_id, xtLogOffset log_offset, xtBool missing_ok)
 
101
{
 
102
        if (sl_rec_log_id != log_id) {
 
103
                if (sl_log_file) {
 
104
                        xt_close_file_ns(sl_log_file);
 
105
                        sl_log_file  = NULL;
 
106
                }
 
107
 
 
108
                sl_rec_log_id = log_id;
 
109
                sl_buf_log_offset = sl_rec_log_offset;
 
110
                sl_buffer_len = 0;
 
111
 
 
112
                if (!sl_db->db_datalogs.dlc_open_log(&sl_log_file, log_id, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT))
 
113
                        return FAILED;
 
114
                if (sl_log_file)
 
115
                        sl_log_eof = xt_seek_eof_file(NULL, sl_log_file);
 
116
        }
 
117
        sl_rec_log_offset = log_offset;
 
118
        sl_record_len = 0;
 
119
        return OK;
 
120
}
 
121
 
 
122
xtBool XTDataSeqRead::sl_rnd_read(xtLogOffset log_offset, size_t size, xtWord1 *buffer, size_t *data_read, struct XTThread *thread)
 
123
{
 
124
        if (!sl_log_file) {
 
125
                *data_read = 0;
 
126
                return OK;
 
127
        }
 
128
        return xt_pread_file(sl_log_file, log_offset, size, 0, buffer, data_read, &thread->st_statistics.st_data, thread);
 
129
}
 
130
 
 
131
/*
 
132
 * Unlike the transaction log sequential reader, this function only returns
 
133
 * the header of a record.
 
134
 *
 
135
 * {SKIP-GAPS}
 
136
 * This function now skips gaps. This should not be required, because in normal
 
137
 * operation, no gaps should be created.
 
138
 *
 
139
 * However, if his happens there is a danger that a valid record after the
 
140
 * gap will be lost.
 
141
 *
 
142
 * So, if we find an invalid record, we scan through the log to find the next
 
143
 * valid record. Note, that there is still a danger that will will find
 
144
 * data that looks like a valid record, but is not.
 
145
 *
 
146
 * In this case, this "pseudo record" may cause the function to actually skip
 
147
 * valid records.
 
148
 *
 
149
 * Note, any such malfunction will eventually cause the record to be lost forever
 
150
 * after the garbage collector has run.
 
151
 */
 
152
xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, struct XTThread *thread)
 
153
{
 
154
        XTXactLogBufferDPtr     record;
 
155
        size_t                          tfer;
 
156
        size_t                          len = 0;
 
157
        size_t                          rec_offset;
 
158
        size_t                          max_rec_len;
 
159
        xtBool                          reread_from_buffer;
 
160
        xtWord4                         size;
 
161
        xtLogOffset                     gap_start = 0;
 
162
 
 
163
        /* Go to the next record (xseq_record_len must be initialized
 
164
         * to 0 for this to work.
 
165
         */
 
166
        retry:
 
167
        sl_rec_log_offset += sl_record_len;
 
168
        sl_record_len = 0;
 
169
 
 
170
        if (sl_rec_log_offset < sl_buf_log_offset ||
 
171
                sl_rec_log_offset >= sl_buf_log_offset + (xtLogOffset) sl_buffer_len) {
 
172
                /* The current position is nowhere near the buffer, read data into the
 
173
                 * buffer:
 
174
                 */
 
175
                tfer = sl_buffer_size;
 
176
                if (!sl_rnd_read(sl_rec_log_offset, tfer, sl_buffer, &tfer, thread))
 
177
                        return FAILED;
 
178
                sl_buf_log_offset = sl_rec_log_offset;
 
179
                sl_buffer_len = tfer;
 
180
 
 
181
                /* Should we go to the next log? */
 
182
                if (!tfer)
 
183
                        goto return_empty;
 
184
        }
 
185
 
 
186
        /* The start of the record is in the buffer: */
 
187
        read_from_buffer:
 
188
        rec_offset = (size_t) (sl_rec_log_offset - sl_buf_log_offset);
 
189
        max_rec_len = sl_buffer_len - rec_offset;
 
190
        reread_from_buffer = FALSE;
 
191
        size = 0;
 
192
 
 
193
        /* Check the type of record: */
 
194
        record = (XTXactLogBufferDPtr) (sl_buffer + rec_offset);
 
195
        switch (record->xl.xl_status_1) {
 
196
                case XT_LOG_ENT_HEADER:
 
197
                        if (sl_rec_log_offset != 0)
 
198
                                goto scan_to_next_record;
 
199
                        if (offsetof(XTXactLogHeaderDRec, xh_size_4) + 4 > max_rec_len) {
 
200
                                reread_from_buffer = TRUE;
 
201
                                goto read_more;
 
202
                        }
 
203
                        len = XT_GET_DISK_4(record->xh.xh_size_4);
 
204
                        if (len > max_rec_len) {
 
205
                                reread_from_buffer = TRUE;
 
206
                                goto read_more;
 
207
                        }
 
208
 
 
209
                        if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(sl_rec_log_id))
 
210
                                goto return_empty;
 
211
                        if (XT_LOG_HEAD_MAGIC(record, len) != XT_LOG_FILE_MAGIC)
 
212
                                goto return_empty;
 
213
                        if (len > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
 
214
                                if (XT_GET_DISK_4(record->xh.xh_log_id_4) != sl_rec_log_id)
 
215
                                        goto return_empty;
 
216
                        }
 
217
                        break;
 
218
                case XT_LOG_ENT_EXT_REC_OK:
 
219
                case XT_LOG_ENT_EXT_REC_DEL:
 
220
                        if (gap_start) {
 
221
                                xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
 
222
                                gap_start = 0;
 
223
                        }
 
224
                        len = offsetof(XTactExtRecEntryDRec, er_data);
 
225
                        if (len > max_rec_len) {
 
226
                                reread_from_buffer = TRUE;
 
227
                                goto read_more;
 
228
                        }
 
229
                        size = XT_GET_DISK_4(record->er.er_data_size_4);
 
230
                        /* Verify the record as good as we can! */
 
231
                        if (!size)
 
232
                                goto scan_to_next_record;
 
233
                        if (sl_rec_log_offset + (xtLogOffset) offsetof(XTactExtRecEntryDRec, er_data) + size > sl_log_eof)
 
234
                                goto scan_to_next_record;
 
235
                        if (!XT_GET_DISK_4(record->er.er_tab_id_4))
 
236
                                goto scan_to_next_record;
 
237
                        if (!XT_GET_DISK_4(record->er.er_rec_id_4))
 
238
                                goto scan_to_next_record;
 
239
                        break;
 
240
                default:
 
241
                        /* Note, we no longer assume EOF.
 
242
                         * Instead, we skip to the next value record. */
 
243
                        goto scan_to_next_record;
 
244
        }
 
245
 
 
246
        if (len <= max_rec_len) {
 
247
                /* The record is completely in the buffer: */
 
248
                sl_record_len = len+size;
 
249
                *ret_entry = record;
 
250
                return OK;
 
251
        }
 
252
        
 
253
        read_more:
 
254
        /* The record is partially in the buffer. */
 
255
        memmove(sl_buffer, sl_buffer + rec_offset, max_rec_len);
 
256
        sl_buf_log_offset += rec_offset;
 
257
        sl_buffer_len = max_rec_len;
 
258
 
 
259
        /* Read the rest, as far as possible: */
 
260
        tfer = sl_buffer_size - max_rec_len;
 
261
        if (!sl_rnd_read(sl_buf_log_offset + max_rec_len, tfer, sl_buffer + max_rec_len, &tfer, thread))
 
262
                return FAILED;
 
263
        sl_buffer_len += tfer;
 
264
 
 
265
        if (sl_buffer_len < len)
 
266
                /* A partial record is in the log, must be the end of the log: */
 
267
                goto return_empty;
 
268
 
 
269
        if (reread_from_buffer)
 
270
                goto read_from_buffer;
 
271
 
 
272
        /* The record is not completely in the buffer: */
 
273
        sl_record_len = len;
 
274
        *ret_entry = (XTXactLogBufferDPtr) sl_buffer;
 
275
        return OK;
 
276
 
 
277
        scan_to_next_record:
 
278
        if (!gap_start) {
 
279
                gap_start = sl_rec_log_offset;
 
280
                xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap found in data log %lu, starting at offset %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start);
 
281
        }
 
282
        sl_record_len = 1;
 
283
        sl_extra_garbage++;
 
284
        goto retry;
 
285
 
 
286
        return_empty:
 
287
        if (gap_start) {
 
288
                xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
 
289
                gap_start = 0;
 
290
        }
 
291
        *ret_entry = NULL;
 
292
        return OK;
 
293
}
 
294
 
 
295
void XTDataSeqRead::sl_seq_skip(size_t size)
 
296
{
 
297
        sl_record_len += size;
 
298
}
 
299
 
 
300
void XTDataSeqRead::sl_seq_skip_to(off_t log_offset)
 
301
{
 
302
        if (log_offset >= sl_rec_log_offset)
 
303
                sl_record_len = (size_t) (log_offset - sl_rec_log_offset);
 
304
}
 
305
 
 
306
/*
 
307
 * --------------------------------------------------------------------------------
 
308
 * STATIC UTILITIES
 
309
 */
 
310
 
 
311
static xtBool dl_create_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, XTThreadPtr thread)
 
312
{
 
313
        XTXactLogHeaderDRec     header;
 
314
 
 
315
        /* The header was not completely written, so write a new one: */
 
316
        memset(&header, 0, sizeof(XTXactLogHeaderDRec));
 
317
        header.xh_status_1 = XT_LOG_ENT_HEADER;
 
318
        header.xh_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id);
 
319
        XT_SET_DISK_4(header.xh_size_4, sizeof(XTXactLogHeaderDRec));
 
320
        XT_SET_DISK_8(header.xh_free_space_8, 0);
 
321
        XT_SET_DISK_8(header.xh_file_len_8, sizeof(XTXactLogHeaderDRec));
 
322
        XT_SET_DISK_4(header.xh_log_id_4, data_log->dlf_log_id);
 
323
        XT_SET_DISK_2(header.xh_version_2, XT_LOG_VERSION_NO);
 
324
        XT_SET_DISK_4(header.xh_magic_4, XT_LOG_FILE_MAGIC);
 
325
        if (!xt_pwrite_file(of, 0, sizeof(XTXactLogHeaderDRec), &header, &thread->st_statistics.st_data, thread))
 
326
                return FAILED;
 
327
        if (!xt_flush_file(of, &thread->st_statistics.st_data, thread))
 
328
                return FAILED;
 
329
        return OK;
 
330
}
 
331
 
 
332
static xtBool dl_write_garbage_level(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtBool flush, XTThreadPtr thread)
 
333
{
 
334
        XTXactLogHeaderDRec     header;
 
335
 
 
336
        /* The header was not completely written, so write a new one: */
 
337
        XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
 
338
        if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 8, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
 
339
                return FAILED;
 
340
        if (flush && !xt_flush_file(of, &thread->st_statistics.st_data, thread))
 
341
                return FAILED;
 
342
        return OK;
 
343
}
 
344
 
 
345
/*
 
346
 * {SKIP-GAPS}
 
347
 * Extra garbage is the amount of space skipped during recovery of the data
 
348
 * log file. We assume this space has not be counted as garbage, 
 
349
 * and add it to the garbage count.
 
350
 *
 
351
 * This may mean that our estimate of garbaged is higher than it should
 
352
 * be, but that is better than the other way around.
 
353
 *
 
354
 * The fact is, there should not be any gaps in the data log files, so
 
355
 * this is actually an exeption which should not occur.
 
356
 */
 
357
static xtBool dl_write_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtLogOffset extra_garbage, XTThreadPtr thread)
 
358
{
 
359
        XTXactLogHeaderDRec     header;
 
360
 
 
361
        XT_SET_DISK_8(header.xh_file_len_8, data_log->dlf_log_eof);
 
362
 
 
363
        if (extra_garbage) {
 
364
                data_log->dlf_garbage_count += extra_garbage;
 
365
                if (data_log->dlf_garbage_count > data_log->dlf_log_eof)
 
366
                        data_log->dlf_garbage_count = data_log->dlf_log_eof;
 
367
                XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
 
368
                if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 16, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
 
369
                        return FAILED;
 
370
        }
 
371
        else {
 
372
                if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_file_len_8), 8, (xtWord1 *) &header.xh_file_len_8, &thread->st_statistics.st_data, thread))
 
373
                        return FAILED;
 
374
        }
 
375
        if (!xt_flush_file(of, &thread->st_statistics.st_data, thread))
 
376
                return FAILED;
 
377
        return OK;
 
378
}
 
379
 
 
380
static void dl_free_seq_read(XTThreadPtr XT_UNUSED(self), XTDataSeqReadPtr seq_read)
 
381
{
 
382
        seq_read->sl_seq_exit();
 
383
}
 
384
 
 
385
static void dl_recover_log(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr data_log)
 
386
{
 
387
        XTDataSeqReadRec        seq_read;
 
388
        XTXactLogBufferDPtr     record;
 
389
 
 
390
        if (!seq_read.sl_seq_init(db, xt_db_log_buffer_size))
 
391
                xt_throw(self);
 
392
        pushr_(dl_free_seq_read, &seq_read);
 
393
 
 
394
        seq_read.sl_seq_start(data_log->dlf_log_id, 0, FALSE);
 
395
 
 
396
        for (;;) {
 
397
                if (!seq_read.sl_seq_next(&record, self))
 
398
                        xt_throw(self);
 
399
                if (!record)
 
400
                        break;
 
401
                switch (record->xh.xh_status_1) {
 
402
                        case XT_LOG_ENT_HEADER:
 
403
                                data_log->dlf_garbage_count = XT_GET_DISK_8(record->xh.xh_free_space_8);
 
404
                                data_log->dlf_start_offset = XT_GET_DISK_8(record->xh.xh_comp_pos_8);
 
405
                                seq_read.sl_seq_skip_to((off_t) XT_GET_DISK_8(record->xh.xh_file_len_8)); 
 
406
                                break;
 
407
                }
 
408
        }
 
409
 
 
410
        ASSERT_NS(seq_read.sl_log_eof == seq_read.sl_rec_log_offset);
 
411
        data_log->dlf_log_eof = seq_read.sl_rec_log_offset;
 
412
 
 
413
        if (data_log->dlf_log_eof < (off_t) sizeof(XTXactLogHeaderDRec)) {
 
414
                data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
 
415
                if (!dl_create_log_header(data_log, seq_read.sl_log_file, self))
 
416
                        xt_throw(self);
 
417
        }
 
418
        else {
 
419
                if (!dl_write_log_header(data_log, seq_read.sl_log_file, seq_read.sl_extra_garbage, self))
 
420
                        xt_throw(self);
 
421
        }
 
422
 
 
423
        freer_(); // dl_free_seq_read(&seq_read)
 
424
}
 
425
 
 
426
/*
 
427
 * --------------------------------------------------------------------------------
 
428
 * D A T A  L O G  C A C H E
 
429
 */
 
430
 
 
431
void XTDataLogCache::dls_remove_log(XTDataLogFilePtr data_log)
 
432
{
 
433
        xtLogID log_id = data_log->dlf_log_id;
 
434
 
 
435
        switch (data_log->dlf_state) {
 
436
                case XT_DL_HAS_SPACE:
 
437
                        xt_sl_delete(NULL, dlc_has_space, &log_id);
 
438
                        break;
 
439
                case XT_DL_TO_COMPACT:
 
440
                        xt_sl_delete(NULL, dlc_to_compact, &log_id);
 
441
                        break;
 
442
                case XT_DL_TO_DELETE:
 
443
                        xt_sl_delete(NULL, dlc_to_delete, &log_id);
 
444
                        break;
 
445
                case XT_DL_DELETED:
 
446
                        xt_sl_delete(NULL, dlc_deleted, &log_id);
 
447
                        break;
 
448
        }
 
449
}
 
450
 
 
451
int XTDataLogCache::dls_get_log_state(XTDataLogFilePtr data_log)
 
452
{
 
453
        if (data_log->dlf_to_much_garbage())
 
454
                return XT_DL_TO_COMPACT;
 
455
        if (data_log->dlf_space_avaliable() > 0)
 
456
                return XT_DL_HAS_SPACE;
 
457
        return XT_DL_READ_ONLY;
 
458
}
 
459
 
 
460
xtBool XTDataLogCache::dls_set_log_state(XTDataLogFilePtr data_log, int state)
 
461
{
 
462
        xtLogID log_id = data_log->dlf_log_id;
 
463
 
 
464
        xt_lock_mutex_ns(&dlc_lock);
 
465
        if (state == XT_DL_MAY_COMPACT) {
 
466
                if (data_log->dlf_state != XT_DL_UNKNOWN &&
 
467
                        data_log->dlf_state != XT_DL_HAS_SPACE &&
 
468
                        data_log->dlf_state != XT_DL_READ_ONLY)
 
469
                        goto ok;
 
470
                state = XT_DL_TO_COMPACT;
 
471
        }
 
472
        if (state == XT_DL_UNKNOWN)
 
473
                state = dls_get_log_state(data_log);
 
474
        switch (state) {
 
475
                case XT_DL_HAS_SPACE:
 
476
                        if (data_log->dlf_state != XT_DL_HAS_SPACE) {
 
477
                                dls_remove_log(data_log);
 
478
                                if (!xt_sl_insert(NULL, dlc_has_space, &log_id, &log_id))
 
479
                                        goto failed;
 
480
                        }
 
481
                        break;
 
482
                case XT_DL_TO_COMPACT:
 
483
#ifdef DEBUG_LOG_DELETE
 
484
                        printf("-- set to compact: %d\n", (int) log_id);
 
485
#endif
 
486
                        if (data_log->dlf_state != XT_DL_TO_COMPACT) {
 
487
                                dls_remove_log(data_log);
 
488
                                if (!xt_sl_insert(NULL, dlc_to_compact, &log_id, &log_id))
 
489
                                        goto failed;
 
490
                        }
 
491
                        dl_wake_co_thread(dlc_db);
 
492
                        break;
 
493
                case XT_DL_COMPACTED:
 
494
#ifdef DEBUG_LOG_DELETE
 
495
                        printf("-- set compacted: %d\n", (int) log_id);
 
496
#endif
 
497
                        if (data_log->dlf_state != state)
 
498
                                dls_remove_log(data_log);
 
499
                        break;
 
500
                case XT_DL_TO_DELETE:
 
501
#ifdef DEBUG_LOG_DELETE
 
502
                        printf("-- set to delete log: %d\n", (int) log_id);
 
503
#endif
 
504
                        if (data_log->dlf_state != XT_DL_TO_DELETE) {
 
505
                                dls_remove_log(data_log);
 
506
                                if (!xt_sl_insert(NULL, dlc_to_delete, &log_id, &log_id))
 
507
                                        goto failed;
 
508
                        }
 
509
                        break;
 
510
                case XT_DL_DELETED:
 
511
#ifdef DEBUG_LOG_DELETE
 
512
                        printf("-- set DELETED log: %d\n", (int) log_id);
 
513
#endif
 
514
                        if (data_log->dlf_state != XT_DL_DELETED) {
 
515
                                dls_remove_log(data_log);
 
516
                                if (!xt_sl_insert(NULL, dlc_deleted, &log_id, &log_id))
 
517
                                        goto failed;
 
518
                        }
 
519
                        break;
 
520
                default:
 
521
                        if (data_log->dlf_state != state)
 
522
                                dls_remove_log(data_log);
 
523
                        break;
 
524
        }
 
525
        data_log->dlf_state = state;
 
526
 
 
527
        ok:
 
528
        xt_unlock_mutex_ns(&dlc_lock);
 
529
        return OK;
 
530
 
 
531
        failed:
 
532
        xt_unlock_mutex_ns(&dlc_lock);
 
533
        return FAILED;
 
534
}
 
535
 
 
536
static int dl_cmp_log_id(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
 
537
{
 
538
        xtLogID                 log_id_a = *((xtLogID *) a);
 
539
        xtLogID                 log_id_b = *((xtLogID *) b);
 
540
 
 
541
        if (log_id_a == log_id_b)
 
542
                return 0;
 
543
        if (log_id_a < log_id_b)
 
544
                return -1;
 
545
        return 1;
 
546
}
 
547
 
 
548
void XTDataLogCache::dlc_init(XTThreadPtr self, XTDatabaseHPtr db)
 
549
{
 
550
        XTOpenDirPtr            od;
 
551
        char                            log_dir[PATH_MAX];
 
552
        char                            *file;
 
553
        xtLogID                         log_id;
 
554
        XTDataLogFilePtr        data_log= NULL;
 
555
 
 
556
        memset(this, 0, sizeof(XTDataLogCacheRec));
 
557
        dlc_db = db;
 
558
        try_(a) {
 
559
                xt_init_mutex_with_autoname(self, &dlc_lock);
 
560
                xt_init_cond(self, &dlc_cond);
 
561
                for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
 
562
                        xt_init_mutex_with_autoname(self, &dlc_segment[i].dls_lock);
 
563
                        xt_init_cond(self, &dlc_segment[i].dls_cond);
 
564
                }
 
565
                dlc_has_space = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
 
566
                dlc_to_compact = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
 
567
                dlc_to_delete = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
 
568
                dlc_deleted = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
 
569
                xt_init_mutex_with_autoname(self, &dlc_mru_lock);
 
570
                xt_init_mutex_with_autoname(self, &dlc_head_lock);
 
571
 
 
572
                xt_strcpy(PATH_MAX, log_dir, dlc_db->db_main_path);
 
573
                xt_add_data_dir(PATH_MAX, log_dir);
 
574
                if (xt_fs_exists(log_dir)) {
 
575
                        pushsr_(od, xt_dir_close, xt_dir_open(self, log_dir, NULL));
 
576
                        while (xt_dir_next(self, od)) {
 
577
                                file = xt_dir_name(self, od);
 
578
                                if (xt_ends_with(file, ".xt")) {
 
579
                                        if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
 
580
                                                if (!dlc_get_data_log(&data_log, log_id, TRUE, NULL))
 
581
                                                        xt_throw(self);
 
582
                                                dl_recover_log(self, db, data_log);
 
583
                                                if (!dls_set_log_state(data_log, XT_DL_UNKNOWN))
 
584
                                                        xt_throw(self);
 
585
                                        }
 
586
                                }
 
587
                        }
 
588
                        freer_();
 
589
                }
 
590
        }
 
591
        catch_(a) {
 
592
                dlc_exit(self);
 
593
                xt_throw(self);
 
594
        }
 
595
        cont_(a);
 
596
}
 
597
 
 
598
void XTDataLogCache::dlc_exit(XTThreadPtr self)
 
599
{
 
600
        XTDataLogFilePtr        data_log, tmp_data_log;
 
601
        XTOpenLogFilePtr        open_log, tmp_open_log;
 
602
 
 
603
        if (dlc_has_space) {
 
604
                xt_free_sortedlist(self, dlc_has_space);
 
605
                dlc_has_space = NULL;
 
606
        }
 
607
        if (dlc_to_compact) {
 
608
                xt_free_sortedlist(self, dlc_to_compact);
 
609
                dlc_to_compact = NULL;
 
610
        }
 
611
        if (dlc_to_delete) {
 
612
                xt_free_sortedlist(self, dlc_to_delete);
 
613
                dlc_to_delete = NULL;
 
614
        }
 
615
        if (dlc_deleted) {
 
616
                xt_free_sortedlist(self, dlc_deleted);
 
617
                dlc_deleted = NULL;
 
618
        }
 
619
        for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
 
620
                for (u_int j=0; j<XT_DL_SEG_HASH_TABLE_SIZE; j++) {
 
621
                        data_log = dlc_segment[i].dls_hash_table[j];
 
622
                        dlc_segment[i].dls_hash_table[j] = NULL;
 
623
                        while (data_log) {
 
624
                                if (data_log->dlf_log_file) {
 
625
                                        xt_close_file_ns(data_log->dlf_log_file);
 
626
                                        data_log->dlf_log_file = NULL;
 
627
                                }
 
628
 
 
629
                                open_log = data_log->dlf_free_list;
 
630
                                while (open_log) {
 
631
                                        if (open_log->odl_log_file)
 
632
                                                xt_close_file(self, open_log->odl_log_file);
 
633
                                        tmp_open_log = open_log;
 
634
                                        open_log = open_log->odl_next_free;
 
635
                                        xt_free(self, tmp_open_log);
 
636
                                }
 
637
                                tmp_data_log = data_log;
 
638
                                data_log = data_log->dlf_next_hash;
 
639
 
 
640
                                xt_free(self, tmp_data_log);
 
641
                        }
 
642
                }
 
643
                xt_free_mutex(&dlc_segment[i].dls_lock);
 
644
                xt_free_cond(&dlc_segment[i].dls_cond);
 
645
        }
 
646
        xt_free_mutex(&dlc_head_lock);
 
647
        xt_free_mutex(&dlc_mru_lock);
 
648
        xt_free_mutex(&dlc_lock);
 
649
        xt_free_cond(&dlc_cond);
 
650
}
 
651
 
 
652
void XTDataLogCache::dlc_name(size_t size, char *path, xtLogID log_id)
 
653
{
 
654
        char name[50];
 
655
 
 
656
        sprintf(name, "dlog-%lu.xt", (u_long) log_id);
 
657
        xt_strcpy(size, path, dlc_db->db_main_path);
 
658
        xt_add_data_dir(size, path);
 
659
        xt_add_dir_char(size, path);
 
660
        xt_strcat(size, path, name);
 
661
}
 
662
 
 
663
xtBool XTDataLogCache::dlc_open_log(XTOpenFilePtr *fh, xtLogID log_id, int mode)
 
664
{
 
665
        char log_path[PATH_MAX];
 
666
 
 
667
        dlc_name(PATH_MAX, log_path, log_id);
 
668
        return xt_open_file_ns(fh, log_path, XT_FT_STANDARD, mode, 16*1024*1024);
 
669
}
 
670
 
 
671
xtBool XTDataLogCache::dlc_unlock_log(XTDataLogFilePtr data_log)
 
672
{
 
673
        if (data_log->dlf_log_file) {
 
674
                xt_close_file_ns(data_log->dlf_log_file);
 
675
                data_log->dlf_log_file = NULL;
 
676
        }
 
677
 
 
678
        return dls_set_log_state(data_log, XT_DL_UNKNOWN);
 
679
}
 
680
 
 
681
XTDataLogFilePtr XTDataLogCache::dlc_get_log_for_writing(off_t space_required, struct XTThread *thread)
 
682
{
 
683
        xtLogID                         log_id, *log_id_ptr = NULL;
 
684
        size_t                          size;
 
685
        size_t                          idx;
 
686
        XTDataLogFilePtr        data_log = NULL;
 
687
 
 
688
        xt_lock_mutex_ns(&dlc_lock);
 
689
 
 
690
        /* Look for an existing log with enough space: */
 
691
        size = xt_sl_get_size(dlc_has_space);
 
692
        for (idx=0; idx<size; idx++) {
 
693
                log_id_ptr = (xtLogID *) xt_sl_item_at(dlc_has_space, idx);
 
694
                if (!dlc_get_data_log(&data_log, *log_id_ptr, FALSE, NULL))
 
695
                        goto failed;
 
696
                if (data_log) {
 
697
                        if (data_log->dlf_space_avaliable() >= space_required)
 
698
                                break;
 
699
                        data_log = NULL;
 
700
                }
 
701
                else {
 
702
                        ASSERT_NS(FALSE);
 
703
                        xt_sl_delete_item_at(NULL, dlc_has_space, idx);
 
704
                        idx--;
 
705
                        size--;
 
706
                }
 
707
        }
 
708
 
 
709
        if (data_log) {
 
710
                /* Found a log: */
 
711
                if (!dlc_open_log(&data_log->dlf_log_file, *log_id_ptr, XT_FS_DEFAULT))
 
712
                        goto failed;
 
713
                xt_sl_delete_item_at(NULL, dlc_has_space, idx);
 
714
        }
 
715
        else {
 
716
                /* Create a new log: */
 
717
                log_id = dlc_next_log_id;
 
718
                for (u_int i=0; i<XT_DL_MAX_LOG_ID; i++) {
 
719
                        log_id++;
 
720
                        if (log_id > XT_DL_MAX_LOG_ID)
 
721
                                log_id = 1;
 
722
                        if (!dlc_get_data_log(&data_log, log_id, FALSE, NULL))
 
723
                                goto failed;
 
724
                        if (!data_log)
 
725
                                break;
 
726
                }
 
727
                dlc_next_log_id = log_id;
 
728
                if (data_log) {
 
729
                        xt_register_ulxterr(XT_REG_CONTEXT, XT_ERR_LOG_MAX_EXCEEDED, (u_long) XT_DL_MAX_LOG_ID);
 
730
                        goto failed;
 
731
                }
 
732
                if (!dlc_get_data_log(&data_log, log_id, TRUE, NULL))
 
733
                        goto failed;
 
734
                if (!dlc_open_log(&data_log->dlf_log_file, log_id, XT_FS_CREATE | XT_FS_MAKE_PATH))
 
735
                        goto failed;
 
736
                data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
 
737
                if (!dl_create_log_header(data_log, data_log->dlf_log_file, thread)) {
 
738
                        xt_close_file_ns(data_log->dlf_log_file);
 
739
                        goto failed;
 
740
                }
 
741
                /* By setting this late we ensure that the error
 
742
                 * will be repeated.
 
743
                 */ 
 
744
                dlc_next_log_id = log_id;
 
745
        }
 
746
        data_log->dlf_state = XT_DL_EXCLUSIVE;
 
747
 
 
748
        xt_unlock_mutex_ns(&dlc_lock);
 
749
        return data_log;
 
750
 
 
751
        failed:
 
752
        xt_unlock_mutex_ns(&dlc_lock);
 
753
        return NULL;
 
754
}
 
755
 
 
756
xtBool XTDataLogCache::dlc_get_data_log(XTDataLogFilePtr *lf, xtLogID log_id, xtBool create, XTDataLogSegPtr *ret_seg)
 
757
{
 
758
        register XTDataLogSegPtr        seg;
 
759
        register u_int                          hash_idx;
 
760
        register XTDataLogFilePtr       data_log;
 
761
 
 
762
        /* Which segment, and hash index: */
 
763
        seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
 
764
        hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;
 
765
 
 
766
        /* Lock the segment: */
 
767
        xt_lock_mutex_ns(&seg->dls_lock);
 
768
 
 
769
        /* Find the log file on the hash list: */
 
770
        data_log = seg->dls_hash_table[hash_idx];
 
771
        while (data_log) {
 
772
                if (data_log->dlf_log_id == log_id)
 
773
                        break;
 
774
                data_log = data_log->dlf_next_hash;
 
775
        }
 
776
 
 
777
        if (!data_log && create) {
 
778
                /* Create a new log file structure: */
 
779
                if (!(data_log = (XTDataLogFilePtr) xt_calloc_ns(sizeof(XTDataLogFileRec))))
 
780
                        goto failed;
 
781
                data_log->dlf_log_id = log_id;
 
782
                data_log->dlf_next_hash = seg->dls_hash_table[hash_idx];
 
783
                seg->dls_hash_table[hash_idx] = data_log;
 
784
        }
 
785
 
 
786
        if (ret_seg) {
 
787
                /* This gives the caller the lock: */
 
788
                *ret_seg = seg;
 
789
                *lf = data_log;
 
790
                return OK;
 
791
        }
 
792
 
 
793
        xt_unlock_mutex_ns(&seg->dls_lock);
 
794
        *lf = data_log;
 
795
        return OK;
 
796
 
 
797
        failed:
 
798
        xt_unlock_mutex_ns(&seg->dls_lock);
 
799
        return FAILED;
 
800
}
 
801
 
 
802
/*
 
803
 * If just_close is FALSE, then a log is being deleted.
 
804
 * This means that that the log may still be in exclusive use by
 
805
 * some thread. So we just close the log!
 
806
 */
 
807
xtBool XTDataLogCache::dlc_remove_data_log(xtLogID log_id, xtBool just_close)
 
808
{
 
809
        register XTDataLogSegPtr        seg;
 
810
        register u_int                          hash_idx;
 
811
        register XTDataLogFilePtr       data_log;
 
812
        XTOpenLogFilePtr                        open_log, tmp_open_log;
 
813
 
 
814
        /* Which segment, and hash index: */
 
815
        seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
 
816
        hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;
 
817
 
 
818
        /* Lock the segment: */
 
819
        retry:
 
820
        xt_lock_mutex_ns(&seg->dls_lock);
 
821
 
 
822
        /* Find the log file on the hash list: */
 
823
        data_log = seg->dls_hash_table[hash_idx];
 
824
        while (data_log) {
 
825
                if (data_log->dlf_log_id == log_id)
 
826
                        break;
 
827
                data_log = data_log->dlf_next_hash;
 
828
        }
 
829
 
 
830
        if (data_log) {
 
831
                xt_lock_mutex_ns(&dlc_mru_lock);
 
832
 
 
833
                open_log = data_log->dlf_free_list;
 
834
                while (open_log) {
 
835
                        if (open_log->odl_log_file)
 
836
                                xt_close_file_ns(open_log->odl_log_file);
 
837
 
 
838
                        /* Remove from MRU list: */
 
839
                        if (dlc_lru_open_log == open_log) {
 
840
                                dlc_lru_open_log = open_log->odl_mr_used;
 
841
                                ASSERT_NS(!open_log->odl_lr_used);
 
842
                        }
 
843
                        else if (open_log->odl_lr_used)
 
844
                                open_log->odl_lr_used->odl_mr_used = open_log->odl_mr_used;
 
845
                        if (dlc_mru_open_log == open_log) {
 
846
                                dlc_mru_open_log = open_log->odl_lr_used;
 
847
                                ASSERT_NS(!open_log->odl_mr_used);
 
848
                        }
 
849
                        else if (open_log->odl_mr_used)
 
850
                                open_log->odl_mr_used->odl_lr_used = open_log->odl_lr_used;
 
851
 
 
852
                        data_log->dlf_open_count--;
 
853
                        tmp_open_log = open_log;
 
854
                        open_log = open_log->odl_next_free;
 
855
                        xt_free_ns(tmp_open_log);
 
856
                }
 
857
                data_log->dlf_free_list = NULL;
 
858
 
 
859
                xt_unlock_mutex_ns(&dlc_mru_lock);
 
860
 
 
861
                if (data_log->dlf_open_count) {
 
862
                        if (!xt_timed_wait_cond_ns(&seg->dls_cond, &seg->dls_lock, 2000))
 
863
                                goto failed;
 
864
                        xt_unlock_mutex_ns(&seg->dls_lock);
 
865
                        goto retry;
 
866
                }
 
867
 
 
868
                /* Close the exclusive file if required: */
 
869
                if (data_log->dlf_log_file) {
 
870
                        xt_close_file_ns(data_log->dlf_log_file);
 
871
                        data_log->dlf_log_file = NULL;
 
872
                }
 
873
 
 
874
                if (!just_close) {
 
875
                        /* Remove the log from the hash list: */
 
876
                        XTDataLogFilePtr ptr, pptr = NULL;
 
877
 
 
878
                        ptr = seg->dls_hash_table[hash_idx];
 
879
                        while (ptr) {
 
880
                                if (ptr == data_log)
 
881
                                        break;
 
882
                                pptr = ptr;
 
883
                                ptr = ptr->dlf_next_hash;
 
884
                        }
 
885
                        
 
886
                        if (ptr == data_log) {
 
887
                                if (pptr)
 
888
                                        pptr->dlf_next_hash = ptr->dlf_next_hash;
 
889
                                else
 
890
                                        seg->dls_hash_table[hash_idx] = ptr->dlf_next_hash;
 
891
                        }
 
892
 
 
893
                        xt_free_ns(data_log);
 
894
                }
 
895
        }
 
896
 
 
897
        xt_unlock_mutex_ns(&seg->dls_lock);
 
898
        return OK;
 
899
 
 
900
        failed:
 
901
        xt_unlock_mutex_ns(&seg->dls_lock);
 
902
        return FAILED;
 
903
}
 
904
 
 
905
xtBool XTDataLogCache::dlc_get_open_log(XTOpenLogFilePtr *ol, xtLogID log_id)
 
906
{
 
907
        register XTDataLogSegPtr        seg;
 
908
        register u_int                          hash_idx;
 
909
        register XTDataLogFilePtr       data_log;
 
910
        register XTOpenLogFilePtr       open_log;
 
911
        char                                            path[PATH_MAX];
 
912
 
 
913
        /* Which segment, and hash index: */
 
914
        seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
 
915
        hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;
 
916
 
 
917
        /* Lock the segment: */
 
918
        xt_lock_mutex_ns(&seg->dls_lock);
 
919
 
 
920
        /* Find the log file on the hash list: */
 
921
        data_log = seg->dls_hash_table[hash_idx];
 
922
        while (data_log) {
 
923
                if (data_log->dlf_log_id == log_id)
 
924
                        break;
 
925
                data_log = data_log->dlf_next_hash;
 
926
        }
 
927
 
 
928
        if (!data_log) {
 
929
                /* Create a new log file structure: */
 
930
                dlc_name(PATH_MAX, path, log_id);
 
931
                if (!xt_fs_exists(path)) {
 
932
                        xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_DATA_LOG_NOT_FOUND, path);
 
933
                        goto failed;
 
934
                }
 
935
                if (!(data_log = (XTDataLogFilePtr) xt_calloc_ns(sizeof(XTDataLogFileRec))))
 
936
                        goto failed;
 
937
                data_log->dlf_log_id = log_id;
 
938
                data_log->dlf_next_hash = seg->dls_hash_table[hash_idx];
 
939
                seg->dls_hash_table[hash_idx] = data_log;
 
940
        }
 
941
 
 
942
        if ((open_log = data_log->dlf_free_list)) {
 
943
                /* Remove from the free list: */
 
944
                if ((data_log->dlf_free_list = open_log->odl_next_free))
 
945
                        data_log->dlf_free_list->odl_prev_free = NULL;
 
946
 
 
947
                /* This file has been most recently used: */
 
948
                if (XT_TIME_DIFF(open_log->odl_ru_time, dlc_ru_now) > (XT_DL_LOG_POOL_SIZE >> 1)) {
 
949
                        /* Move to the front of the MRU list: */
 
950
                        xt_lock_mutex_ns(&dlc_mru_lock);
 
951
 
 
952
                        open_log->odl_ru_time = ++dlc_ru_now;
 
953
                        if (dlc_mru_open_log != open_log) {
 
954
                                /* Remove from the MRU list: */
 
955
                                if (dlc_lru_open_log == open_log) {
 
956
                                        dlc_lru_open_log = open_log->odl_mr_used;
 
957
                                        ASSERT_NS(!open_log->odl_lr_used);
 
958
                                }
 
959
                                else if (open_log->odl_lr_used)
 
960
                                        open_log->odl_lr_used->odl_mr_used = open_log->odl_mr_used;
 
961
                                if (open_log->odl_mr_used)
 
962
                                        open_log->odl_mr_used->odl_lr_used = open_log->odl_lr_used;
 
963
 
 
964
                                /* Make the file the most recently used: */
 
965
                                if ((open_log->odl_lr_used = dlc_mru_open_log))
 
966
                                        dlc_mru_open_log->odl_mr_used = open_log;
 
967
                                open_log->odl_mr_used = NULL;
 
968
                                dlc_mru_open_log = open_log;
 
969
                                if (!dlc_lru_open_log)
 
970
                                        dlc_lru_open_log = open_log;
 
971
                        }
 
972
                        xt_unlock_mutex_ns(&dlc_mru_lock);
 
973
                }
 
974
        }
 
975
        else {
 
976
                /* Create a new open file: */
 
977
                if (!(open_log = (XTOpenLogFilePtr) xt_calloc_ns(sizeof(XTOpenLogFileRec))))
 
978
                        goto failed;
 
979
                dlc_name(PATH_MAX, path, log_id);
 
980
                if (!xt_open_file_ns(&open_log->odl_log_file, path, XT_FT_STANDARD, XT_FS_DEFAULT, 16*1024*1204)) {
 
981
                        xt_free_ns(open_log);
 
982
                        goto failed;
 
983
                }
 
984
                open_log->olf_log_id = log_id;
 
985
                open_log->odl_data_log = data_log;
 
986
                data_log->dlf_open_count++;
 
987
 
 
988
                /* Make the new open file the most recently used: */
 
989
                xt_lock_mutex_ns(&dlc_mru_lock);
 
990
                open_log->odl_ru_time = ++dlc_ru_now;
 
991
                if ((open_log->odl_lr_used = dlc_mru_open_log))
 
992
                        dlc_mru_open_log->odl_mr_used = open_log;
 
993
                open_log->odl_mr_used = NULL;
 
994
                dlc_mru_open_log = open_log;
 
995
                if (!dlc_lru_open_log)
 
996
                        dlc_lru_open_log = open_log;
 
997
                dlc_open_count++;
 
998
                xt_unlock_mutex_ns(&dlc_mru_lock);
 
999
        }
 
1000
 
 
1001
        open_log->odl_in_use = TRUE;
 
1002
        xt_unlock_mutex_ns(&seg->dls_lock);
 
1003
        *ol = open_log;
 
1004
 
 
1005
        if (dlc_open_count > XT_DL_LOG_POOL_SIZE) {
 
1006
                u_int   target = XT_DL_LOG_POOL_SIZE / 4 * 3;
 
1007
                xtLogID free_log_id;
 
1008
 
 
1009
                /* Remove some open files: */
 
1010
                while (dlc_open_count > target) {
 
1011
                        XTOpenLogFilePtr to_free = dlc_lru_open_log;
 
1012
 
 
1013
                        if (!to_free || to_free->odl_in_use)
 
1014
                                break;
 
1015
 
 
1016
                        /* Dirty read the file ID: */
 
1017
                        free_log_id = to_free->olf_log_id;
 
1018
 
 
1019
                        seg = &dlc_segment[free_log_id & XT_DL_SEGMENT_MASK];
 
1020
 
 
1021
                        /* Lock the segment: */
 
1022
                        xt_lock_mutex_ns(&seg->dls_lock);
 
1023
 
 
1024
                        /* Lock the MRU list: */
 
1025
                        xt_lock_mutex_ns(&dlc_mru_lock);
 
1026
 
 
1027
                        /* Check if we have the same open file: */
 
1028
                        if (dlc_lru_open_log == to_free && !to_free->odl_in_use) {
 
1029
                                data_log = to_free->odl_data_log;
 
1030
                
 
1031
                                /* Remove from the MRU list: */
 
1032
                                dlc_lru_open_log = to_free->odl_mr_used;
 
1033
                                ASSERT_NS(!to_free->odl_lr_used);
 
1034
 
 
1035
                                if (dlc_mru_open_log == to_free) {
 
1036
                                        dlc_mru_open_log = to_free->odl_lr_used;
 
1037
                                        ASSERT_NS(!to_free->odl_mr_used);
 
1038
                                }
 
1039
                                else if (to_free->odl_mr_used)
 
1040
                                        to_free->odl_mr_used->odl_lr_used = to_free->odl_lr_used;
 
1041
 
 
1042
                                /* Remove from the free list of the file: */
 
1043
                                if (data_log->dlf_free_list == to_free) {
 
1044
                                        data_log->dlf_free_list = to_free->odl_next_free;
 
1045
                                        ASSERT_NS(!to_free->odl_prev_free);
 
1046
                                }
 
1047
                                else if (to_free->odl_prev_free)
 
1048
                                        to_free->odl_prev_free->odl_next_free = to_free->odl_next_free;
 
1049
                                if (to_free->odl_next_free)
 
1050
                                        to_free->odl_next_free->odl_prev_free = to_free->odl_prev_free;
 
1051
                                ASSERT_NS(data_log->dlf_open_count > 0);
 
1052
                                data_log->dlf_open_count--;
 
1053
                                dlc_open_count--;
 
1054
                        }
 
1055
                        else
 
1056
                                to_free = NULL;
 
1057
 
 
1058
                        xt_unlock_mutex_ns(&dlc_mru_lock);
 
1059
                        xt_unlock_mutex_ns(&seg->dls_lock);
 
1060
 
 
1061
                        if (to_free) {
 
1062
                                xt_close_file_ns(to_free->odl_log_file);
 
1063
                                xt_free_ns(to_free);
 
1064
                        }
 
1065
                }
 
1066
        }
 
1067
 
 
1068
        return OK;
 
1069
 
 
1070
        failed:
 
1071
        xt_unlock_mutex_ns(&seg->dls_lock);
 
1072
        return FAILED;
 
1073
}
 
1074
 
 
1075
void XTDataLogCache::dlc_release_open_log(XTOpenLogFilePtr open_log)
 
1076
{
 
1077
        register XTDataLogSegPtr        seg;
 
1078
        register XTDataLogFilePtr       data_log = open_log->odl_data_log;
 
1079
 
 
1080
        /* Which segment, and hash index: */
 
1081
        seg = &dlc_segment[open_log->olf_log_id & XT_DL_SEGMENT_MASK];
 
1082
 
 
1083
        xt_lock_mutex_ns(&seg->dls_lock);
 
1084
        open_log->odl_next_free = data_log->dlf_free_list;
 
1085
        open_log->odl_prev_free = NULL;
 
1086
        if (data_log->dlf_free_list)
 
1087
                data_log->dlf_free_list->odl_prev_free = open_log;
 
1088
        data_log->dlf_free_list = open_log;
 
1089
        open_log->odl_in_use = FALSE;
 
1090
 
 
1091
        /* Wakeup any exclusive lockers: */
 
1092
        if (!xt_broadcast_cond_ns(&seg->dls_cond))
 
1093
                xt_log_and_clear_exception_ns();
 
1094
 
 
1095
        xt_unlock_mutex_ns(&seg->dls_lock);
 
1096
}
 
1097
 
 
1098
/*
 
1099
 * --------------------------------------------------------------------------------
 
1100
 * D A T A   L O G   F I L E
 
1101
 */
 
1102
 
 
1103
off_t XTDataLogFile::dlf_space_avaliable()
 
1104
{
 
1105
        if (dlf_log_eof < xt_db_data_log_threshold)
 
1106
                return xt_db_data_log_threshold - dlf_log_eof;
 
1107
        return 0;
 
1108
}
 
1109
 
 
1110
xtBool XTDataLogFile::dlf_to_much_garbage()
 
1111
{
 
1112
        if (!dlf_log_eof)
 
1113
                return FALSE;
 
1114
        return dlf_garbage_count * 100 / dlf_log_eof >= xt_db_garbage_threshold;
 
1115
}
 
1116
 
 
1117
/*
 
1118
 * --------------------------------------------------------------------------------
 
1119
 * D A T A   L O G   B U F F E R
 
1120
 */
 
1121
 
 
1122
void XTDataLogBuffer::dlb_init(XTDatabaseHPtr db, size_t buffer_size)
 
1123
{
 
1124
        ASSERT_NS(!dlb_db);
 
1125
        ASSERT_NS(!dlb_buffer_size);
 
1126
        ASSERT_NS(!dlb_data_log);
 
1127
        ASSERT_NS(!dlb_log_buffer);
 
1128
        dlb_db = db;
 
1129
        dlb_buffer_size = buffer_size;
 
1130
}
 
1131
 
 
1132
void XTDataLogBuffer::dlb_exit(XTThreadPtr self)
 
1133
{
 
1134
        dlb_close_log(self);
 
1135
        if (dlb_log_buffer) {
 
1136
                xt_free(self, dlb_log_buffer);
 
1137
                dlb_log_buffer = NULL;
 
1138
        }
 
1139
        dlb_db = NULL;
 
1140
        dlb_buffer_offset = 0;
 
1141
        dlb_buffer_size = 0;
 
1142
        dlb_buffer_len = 0;
 
1143
        dlb_flush_required = FALSE;
 
1144
#ifdef DEBUG
 
1145
        dlb_max_write_offset = 0;
 
1146
#endif
 
1147
}
 
1148
 
 
1149
xtBool XTDataLogBuffer::dlb_close_log(XTThreadPtr thread)
 
1150
{
 
1151
        if (dlb_data_log) {
 
1152
                /* Flush and commit the data in the old log: */
 
1153
                if (!dlb_flush_log(TRUE, thread))
 
1154
                        return FAILED;
 
1155
 
 
1156
                if (!dlb_db->db_datalogs.dlc_unlock_log(dlb_data_log))
 
1157
                        return FAILED;
 
1158
                dlb_data_log = NULL;
 
1159
        }
 
1160
        return OK;
 
1161
}
 
1162
 
 
1163
/* When I use 'thread' instead of 'self', this means
 
1164
 * that I will not throw an error.
 
1165
 */
 
1166
xtBool XTDataLogBuffer::dlb_get_log_offset(xtLogID *log_id, xtLogOffset *out_offset, size_t XT_UNUSED(req_size), struct XTThread *thread)
 
1167
{
 
1168
        /* Note, I am allowing a log to grow beyond the threshold.
 
1169
         * The amount depends on the maximum extended record size.
 
1170
         * If I don't some logs will never fill up, because of only having
 
1171
         * a few more bytes available.
 
1172
         */
 
1173
        if (!dlb_data_log || dlb_data_log->dlf_space_avaliable() == 0) {
 
1174
                /* Release the old log: */
 
1175
                if (!dlb_close_log(thread))
 
1176
                        return FAILED;
 
1177
 
 
1178
                if (!dlb_log_buffer) {
 
1179
                        if (!(dlb_log_buffer = (xtWord1 *) xt_malloc_ns(dlb_buffer_size)))
 
1180
                                return FAILED;
 
1181
                }
 
1182
 
 
1183
                /* I could use req_size instead of 1, but this would mean some logs
 
1184
                 * are never filled up.
 
1185
                 */
 
1186
                if (!(dlb_data_log = dlb_db->db_datalogs.dlc_get_log_for_writing(1, thread)))
 
1187
                        return FAILED;
 
1188
#ifdef DEBUG
 
1189
                dlb_max_write_offset = dlb_data_log->dlf_log_eof;
 
1190
#endif
 
1191
        }
 
1192
 
 
1193
        *log_id = dlb_data_log->dlf_log_id;
 
1194
        *out_offset = dlb_data_log->dlf_log_eof;
 
1195
        return OK;
 
1196
}
 
1197
 
 
1198
xtBool XTDataLogBuffer::dlb_flush_log(xtBool commit, XTThreadPtr thread)
 
1199
{
 
1200
        if (!dlb_data_log || !dlb_data_log->dlf_log_file)
 
1201
                return OK;
 
1202
 
 
1203
        if (dlb_buffer_len) {
 
1204
                if (!xt_pwrite_file(dlb_data_log->dlf_log_file, dlb_buffer_offset, dlb_buffer_len, dlb_log_buffer, &thread->st_statistics.st_data, thread))
 
1205
                        return FAILED;
 
1206
#ifdef DEBUG
 
1207
                if (dlb_buffer_offset + dlb_buffer_len > dlb_max_write_offset)
 
1208
                        dlb_max_write_offset = dlb_buffer_offset + (xtLogOffset) dlb_buffer_len;
 
1209
#endif
 
1210
                dlb_buffer_len = 0;
 
1211
                dlb_flush_required = TRUE;
 
1212
        }
 
1213
 
 
1214
        if (commit && dlb_flush_required) {
 
1215
#ifdef DEBUG
 
1216
                /* This would normally be equal, however, in the case
 
1217
                 * where some other thread flushes the compactors
 
1218
                 * data log, the eof, can be greater than the
 
1219
                 * write offset.
 
1220
                 *
 
1221
                 * This occurs because the flush can come between the 
 
1222
                 * dlb_get_log_offset() and dlb_write_thru_log() calls.
 
1223
                 */
 
1224
                ASSERT_NS(dlb_data_log->dlf_log_eof >= dlb_max_write_offset);
 
1225
#endif
 
1226
                if (!xt_flush_file(dlb_data_log->dlf_log_file, &thread->st_statistics.st_data, thread))
 
1227
                        return FAILED;
 
1228
                dlb_flush_required = FALSE;
 
1229
        }
 
1230
        return OK;
 
1231
}
 
1232
 
 
1233
/* Testing hook: enable the define below to make dlb_write_thru_log()
 * fail with ESPIPE on its 1001st call.
 */
//#define INJECT_ERROR

#if defined(INJECT_ERROR)
int inject_when;
#endif
 
1238
 
 
1239
xtBool XTDataLogBuffer::dlb_write_thru_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
 
1240
{
 
1241
        ASSERT_NS(log_id == dlb_data_log->dlf_log_id);
 
1242
 
 
1243
        if (dlb_buffer_len)
 
1244
                dlb_flush_log(FALSE, thread);
 
1245
 
 
1246
#ifdef INJECT_ERROR
 
1247
        inject_when++;
 
1248
        if (inject_when > 1000 && inject_when < 1002)
 
1249
                return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(dlb_data_log->dlf_log_file));
 
1250
#endif
 
1251
        if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread))
 
1252
                return FAILED;
 
1253
        /* Increment of dlb_data_log->dlf_log_eof was moved here from dlb_get_log_offset()
 
1254
         * to ensure it is done after a successful update of the log, otherwise otherwise a 
 
1255
         * gap occurs in the log which cause eof to be detected  in middle of the log
 
1256
         */
 
1257
        dlb_data_log->dlf_log_eof += size;
 
1258
#ifdef DEBUG
 
1259
        if (log_offset + size > dlb_max_write_offset)
 
1260
                dlb_max_write_offset = log_offset + size;
 
1261
#endif
 
1262
        dlb_flush_required = TRUE;
 
1263
        return OK;
 
1264
}
 
1265
 
 
1266
xtBool XTDataLogBuffer::dlb_append_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
 
1267
{
 
1268
        ASSERT_NS(log_id == dlb_data_log->dlf_log_id);
 
1269
 
 
1270
        if (dlb_buffer_len) {
 
1271
                /* Should be the case, we only write by appending: */
 
1272
                ASSERT_NS(dlb_buffer_offset + (xtLogOffset) dlb_buffer_len == log_offset);
 
1273
                /* Check if we are appending to the existing value in the buffer: */
 
1274
                if (dlb_buffer_offset + (xtLogOffset) dlb_buffer_len == log_offset) {
 
1275
                        /* Can we just append: */
 
1276
                        if (dlb_buffer_size >= dlb_buffer_len + size) {
 
1277
                                memcpy(dlb_log_buffer + dlb_buffer_len, data, size);
 
1278
                                dlb_buffer_len += size;
 
1279
                                dlb_data_log->dlf_log_eof += size;
 
1280
                                return OK;
 
1281
                        }
 
1282
                }
 
1283
                if (dlb_flush_log(FALSE, thread) != OK)
 
1284
                        return FAILED;
 
1285
        }
 
1286
        
 
1287
        ASSERT_NS(dlb_buffer_len == 0);
 
1288
        
 
1289
        if (dlb_buffer_size >= size) {
 
1290
                dlb_buffer_offset = log_offset;
 
1291
                dlb_buffer_len = size;
 
1292
                memcpy(dlb_log_buffer, data, size);
 
1293
                dlb_data_log->dlf_log_eof += size;
 
1294
                return OK;
 
1295
        }
 
1296
 
 
1297
        /* Write directly: */
 
1298
        if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread))
 
1299
                return FAILED;
 
1300
#ifdef DEBUG
 
1301
        if (log_offset + size > dlb_max_write_offset)
 
1302
                dlb_max_write_offset = log_offset + size;
 
1303
#endif
 
1304
        dlb_flush_required = TRUE;
 
1305
        dlb_data_log->dlf_log_eof += size;
 
1306
        return OK;
 
1307
}
 
1308
 
 
1309
xtBool XTDataLogBuffer::dlb_read_log(xtLogID log_id, xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
 
1310
{
 
1311
        size_t                          red_size;
 
1312
        XTOpenLogFilePtr        open_log;
 
1313
 
 
1314
        if (dlb_data_log && log_id == dlb_data_log->dlf_log_id) {
 
1315
                /* Reading from the write log, I can do this quicker: */
 
1316
                if (dlb_buffer_len) {
 
1317
                        /* If it is in the buffer, then it is completely in the buffer. */
 
1318
                        if (log_offset >= dlb_buffer_offset) {
 
1319
                                if (log_offset + (xtLogOffset) size <= dlb_buffer_offset + (xtLogOffset) dlb_buffer_len) {
 
1320
                                        memcpy(data, dlb_log_buffer + (log_offset - dlb_buffer_offset), size);
 
1321
                                        return OK;
 
1322
                                }
 
1323
                                /* Should not happen, reading past EOF: */
 
1324
                                ASSERT_NS(FALSE);
 
1325
                                memset(data, 0, size);
 
1326
                                return OK;
 
1327
                        }
 
1328
                        /* In the write log, but not in the buffer,
 
1329
                         * must be completely not in the log,
 
1330
                         * because only whole records are written to the
 
1331
                         * log:
 
1332
                         */
 
1333
                        ASSERT_NS(log_offset + (xtLogOffset) size <= dlb_buffer_offset);
 
1334
                }               
 
1335
                return xt_pread_file(dlb_data_log->dlf_log_file, log_offset, size, size, data, NULL, &thread->st_statistics.st_data, thread);
 
1336
        }
 
1337
 
 
1338
        /* Read from some other log: */
 
1339
        if (!dlb_db->db_datalogs.dlc_get_open_log(&open_log, log_id))
 
1340
                return FAILED;
 
1341
 
 
1342
        if (!xt_pread_file(open_log->odl_log_file, log_offset, size, 0, data, &red_size, &thread->st_statistics.st_data, thread)) {
 
1343
                dlb_db->db_datalogs.dlc_release_open_log(open_log);
 
1344
                return FAILED;
 
1345
        }
 
1346
 
 
1347
        dlb_db->db_datalogs.dlc_release_open_log(open_log);
 
1348
 
 
1349
        if (red_size < size)
 
1350
                memset(data + red_size, 0, size - red_size);
 
1351
 
 
1352
        return OK;
 
1353
}
 
1354
 
 
1355
/*
 
1356
 * We assume that the given reference may not be valid.
 
1357
 * Only valid references actually cause a delete.
 
1358
 * Invalid references are logged, and ignored.
 
1359
 *
 
1360
 * Note this routine does not lock the compactor.
 
1361
 * This can lead to the some incorrect calculation is the
 
1362
 * amount of garbage. But nothing serious I think.
 
1363
 */
 
1364
xtBool XTDataLogBuffer::dlb_delete_log(xtLogID log_id, xtLogOffset log_offset, size_t size, xtTableID tab_id, xtRecordID rec_id, XTThreadPtr thread)
 
1365
{
 
1366
        XTactExtRecEntryDRec    record;
 
1367
        xtWord1                                 status = XT_LOG_ENT_EXT_REC_DEL;
 
1368
        XTOpenLogFilePtr                open_log;
 
1369
        xtBool                                  to_much_garbage;
 
1370
        XTDataLogFilePtr                data_log;
 
1371
 
 
1372
        if (!dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data), (xtWord1 *) &record, thread))
 
1373
                return FAILED;
 
1374
 
 
1375
        /* Already deleted: */
 
1376
        if (record.er_status_1 == XT_LOG_ENT_EXT_REC_DEL)
 
1377
                return OK;
 
1378
 
 
1379
        if (record.er_status_1 != XT_LOG_ENT_EXT_REC_OK ||
 
1380
                size != XT_GET_DISK_4(record.er_data_size_4) ||
 
1381
                tab_id != XT_GET_DISK_4(record.er_tab_id_4) ||
 
1382
                rec_id != XT_GET_DISK_4(record.er_rec_id_4)) {
 
1383
                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_EXT_RECORD);
 
1384
                return FAILED;
 
1385
        }
 
1386
 
 
1387
        if (dlb_data_log && log_id == dlb_data_log->dlf_log_id) {
 
1388
                /* Writing to the write log, I can do this quicker: */
 
1389
                if (dlb_buffer_len) {
 
1390
                        /* If it is in the buffer, then it is completely in the buffer. */
 
1391
                        if (log_offset >= dlb_buffer_offset) {
 
1392
                                if (log_offset + 1 <= dlb_buffer_offset + (xtLogOffset) dlb_buffer_len) {
 
1393
                                        *(dlb_log_buffer + (log_offset - dlb_buffer_offset)) = XT_LOG_ENT_EXT_REC_DEL;
 
1394
                                        goto inc_garbage_count;
 
1395
                                }
 
1396
                                /* Should not happen, writing past EOF: */
 
1397
                                ASSERT_NS(FALSE);
 
1398
                                return OK;
 
1399
                        }
 
1400
                        ASSERT_NS(log_offset + (xtLogOffset) size <= dlb_buffer_offset);
 
1401
                }
 
1402
 
 
1403
                if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, 1, &status, &thread->st_statistics.st_data, thread))
 
1404
                        return FAILED;
 
1405
                
 
1406
                inc_garbage_count:
 
1407
                xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
 
1408
                dlb_data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
 
1409
                ASSERT_NS(dlb_data_log->dlf_garbage_count < dlb_data_log->dlf_log_eof);
 
1410
                if (!dl_write_garbage_level(dlb_data_log, dlb_data_log->dlf_log_file, FALSE, thread)) {
 
1411
                        xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
 
1412
                        return FAILED;
 
1413
                }
 
1414
                dlb_flush_required = TRUE;
 
1415
                xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
 
1416
                return OK;
 
1417
        }
 
1418
 
 
1419
        /* Write to some other log, open the log: */
 
1420
        if (!dlb_db->db_datalogs.dlc_get_open_log(&open_log, log_id))
 
1421
                return FAILED;
 
1422
 
 
1423
        /* Write the status byte: */
 
1424
        if (!xt_pwrite_file(open_log->odl_log_file, log_offset, 1, &status, &thread->st_statistics.st_data, thread))
 
1425
                goto failed;
 
1426
 
 
1427
        data_log = open_log->odl_data_log;
 
1428
 
 
1429
        /* Adjust the garbage level in the header. */
 
1430
        xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
 
1431
        data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
 
1432
        ASSERT_NS(data_log->dlf_garbage_count < data_log->dlf_log_eof);
 
1433
        if (!dl_write_garbage_level(data_log, open_log->odl_log_file, FALSE, thread)) {
 
1434
                xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
 
1435
                goto failed;
 
1436
        }
 
1437
        to_much_garbage = data_log->dlf_to_much_garbage();
 
1438
        xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
 
1439
 
 
1440
        if (to_much_garbage &&
 
1441
                (data_log->dlf_state == XT_DL_HAS_SPACE || data_log->dlf_state == XT_DL_READ_ONLY)) {
 
1442
                /* There is too much garbage, it may be compacted. */
 
1443
                if (!dlb_db->db_datalogs.dls_set_log_state(data_log, XT_DL_MAY_COMPACT))
 
1444
                        goto failed;
 
1445
        }
 
1446
 
 
1447
        /* Release the open log: */
 
1448
        dlb_db->db_datalogs.dlc_release_open_log(open_log);
 
1449
        
 
1450
        return OK;
 
1451
 
 
1452
        failed:
 
1453
        dlb_db->db_datalogs.dlc_release_open_log(open_log);
 
1454
        return FAILED;
 
1455
}
 
1456
 
 
1457
/*
 
1458
 * Delete all the extended data belonging to a particular
 
1459
 * table.
 
1460
 */
 
1461
xtPublic void xt_dl_delete_ext_data(XTThreadPtr self, XTTableHPtr tab, xtBool XT_UNUSED(missing_ok), xtBool have_table_lock)
 
1462
{
 
1463
        XTOpenTablePtr  ot;
 
1464
        xtRecordID              page_rec_id, offs_rec_id;
 
1465
        XTTabRecExtDPtr rec_buf;
 
1466
        xtWord4                 log_over_size;
 
1467
        xtLogID                 log_id;
 
1468
        xtLogOffset             log_offset;
 
1469
        xtWord1                 *page_data;
 
1470
 
 
1471
        page_data = (xtWord1 *) xt_malloc(self, tab->tab_recs.tci_page_size);
 
1472
        pushr_(xt_free, page_data);
 
1473
 
 
1474
        /* Scan the table, and remove all exended data... */
 
1475
        if (!(ot = xt_open_table(tab))) {
 
1476
                if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
 
1477
                        XT_FILE_NOT_FOUND(self->t_exception.e_sys_err))
 
1478
                        return;
 
1479
                xt_throw(self);
 
1480
        }
 
1481
        ot->ot_thread = self;
 
1482
 
 
1483
        /* {LOCK-EXT-REC} This lock is to stop the compactor changing records 
 
1484
         * while we are doing the delete.
 
1485
         */
 
1486
        xt_lock_mutex_ns(&tab->tab_db->db_co_ext_lock);
 
1487
 
 
1488
        page_rec_id = 1;
 
1489
        while (page_rec_id < tab->tab_rec_eof_id) {
 
1490
                /* NOTE: There is a good reason for using xt_tc_read_page().
 
1491
                 * A deadlock can occur if using read, which can run out of
 
1492
                 * memory, which waits for the freeer, which may need to
 
1493
                 * open a table, which requires the db->db_tables lock,
 
1494
                 * which is owned by the this thread, when the function
 
1495
                 * is called from drop table.
 
1496
                 *
 
1497
                 * xt_tc_read_page() should work because no more changes
 
1498
                 * should happen to the table while we are dropping it.
 
1499
                 */
 
1500
                if (!tab->tab_recs.xt_tc_read_page(ot->ot_rec_file, page_rec_id, page_data, self))
 
1501
                        goto failed;
 
1502
 
 
1503
                for (offs_rec_id=0; offs_rec_id<tab->tab_recs.tci_rows_per_page && page_rec_id+offs_rec_id < tab->tab_rec_eof_id; offs_rec_id++) {
 
1504
                        rec_buf = (XTTabRecExtDPtr) (page_data + (offs_rec_id * tab->tab_recs.tci_rec_size));
 
1505
                        if (XT_REC_IS_EXT_DLOG(rec_buf->tr_rec_type_1)) {
 
1506
                                log_over_size = XT_GET_DISK_4(rec_buf->re_log_dat_siz_4);
 
1507
                                XT_GET_LOG_REF(log_id, log_offset, rec_buf);
 
1508
 
 
1509
                                if (tab->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
 
1510
                                        xt_tab_free_ext_slot(tab, log_id, log_offset, log_over_size);
 
1511
                                else {
 
1512
                                        if (!self->st_dlog_buf.dlb_delete_log(log_id, log_offset, log_over_size, tab->tab_id, page_rec_id+offs_rec_id, self)) {
 
1513
                                                if (self->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
 
1514
                                                        self->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
 
1515
                                                        xt_log_and_clear_exception(self);
 
1516
                                        }
 
1517
                                }
 
1518
                        }
 
1519
                }
 
1520
 
 
1521
                page_rec_id += tab->tab_recs.tci_rows_per_page;
 
1522
        }
 
1523
 
 
1524
        xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
 
1525
 
 
1526
        xt_close_table(ot, TRUE, have_table_lock);
 
1527
        
 
1528
        freer_(); // xt_free(page_data)
 
1529
        return;
 
1530
        
 
1531
        failed:
 
1532
        xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
 
1533
 
 
1534
        xt_close_table(ot, TRUE, have_table_lock);
 
1535
        xt_throw(self);
 
1536
}
 
1537
 
 
1538
/*
 
1539
 * --------------------------------------------------------------------------------
 
1540
 * GARBAGE COLLECTOR THREAD
 
1541
 */
 
1542
 
 
1543
xtPublic void xt_dl_init_db(XTThreadPtr self, XTDatabaseHPtr db)
 
1544
{
 
1545
        xt_init_mutex_with_autoname(self, &db->db_co_ext_lock);
 
1546
        xt_init_mutex_with_autoname(self, &db->db_co_dlog_lock);
 
1547
}
 
1548
 
 
1549
xtPublic void xt_dl_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
 
1550
{
 
1551
        xt_stop_compactor(self, db);    // Already done!
 
1552
        db->db_co_thread = NULL;
 
1553
        xt_free_mutex(&db->db_co_ext_lock);
 
1554
        xt_free_mutex(&db->db_co_dlog_lock);
 
1555
}
 
1556
 
 
1557
xtPublic void xt_dl_set_to_delete(XTThreadPtr self, XTDatabaseHPtr db, xtLogID log_id)
 
1558
{
 
1559
        XTDataLogFilePtr data_log;
 
1560
 
 
1561
        if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, FALSE, NULL))
 
1562
                xt_throw(self);
 
1563
        if (data_log) {
 
1564
                if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_DELETE))
 
1565
                        xt_throw(self);
 
1566
        }
 
1567
}
 
1568
 
 
1569
xtPublic void xt_dl_log_status(XTThreadPtr self, XTDatabaseHPtr db, XTStringBufferPtr strbuf)
 
1570
{
 
1571
        XTSortedListPtr         list;
 
1572
        XTDataLogFilePtr        data_log;
 
1573
        XTDataLogSegPtr         seg;
 
1574
        u_int                           no_of_logs;
 
1575
        xtLogID                         *log_id_ptr;
 
1576
 
 
1577
        list = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
 
1578
        pushr_(xt_free_sortedlist, list);
 
1579
 
 
1580
        for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
 
1581
                for (u_int j=0; j<XT_DL_SEG_HASH_TABLE_SIZE; j++) {
 
1582
                        seg = &db->db_datalogs.dlc_segment[i];
 
1583
                        data_log = seg->dls_hash_table[j];
 
1584
                        while (data_log) {
 
1585
                                xt_sl_insert(self, list, &data_log->dlf_log_id, &data_log->dlf_log_id);
 
1586
                                data_log = data_log->dlf_next_hash;
 
1587
                        }
 
1588
                }
 
1589
        }
 
1590
 
 
1591
        no_of_logs = xt_sl_get_size(list);
 
1592
        for (u_int i=0; i<no_of_logs; i++) {
 
1593
                log_id_ptr = (xtLogID *) xt_sl_item_at(list, i);
 
1594
                if (!db->db_datalogs.dlc_get_data_log(&data_log, *log_id_ptr, FALSE, &seg))
 
1595
                        xt_throw(self);
 
1596
                if (data_log) {
 
1597
                        xt_sb_concat(self, strbuf, "d-log: ");
 
1598
                        xt_sb_concat_int8(self, strbuf, data_log->dlf_log_id);
 
1599
                        xt_sb_concat(self, strbuf, " status=");
 
1600
                        switch (data_log->dlf_state) {
 
1601
                                case XT_DL_UNKNOWN:
 
1602
                                        xt_sb_concat(self, strbuf, "?");
 
1603
                                        break;
 
1604
                                case XT_DL_HAS_SPACE:
 
1605
                                        xt_sb_concat(self, strbuf, "has-space ");
 
1606
                                        break;
 
1607
                                case XT_DL_READ_ONLY:
 
1608
                                        xt_sb_concat(self, strbuf, "read-only ");
 
1609
                                        break;
 
1610
                                case XT_DL_TO_COMPACT:
 
1611
                                        xt_sb_concat(self, strbuf, "to-compact");
 
1612
                                        break;
 
1613
                                case XT_DL_COMPACTED:
 
1614
                                        xt_sb_concat(self, strbuf, "compacted ");
 
1615
                                        break;
 
1616
                                case XT_DL_TO_DELETE:
 
1617
                                        xt_sb_concat(self, strbuf, "to-delete ");
 
1618
                                        break;
 
1619
                                case XT_DL_DELETED:
 
1620
                                        xt_sb_concat(self, strbuf, "deleted   ");
 
1621
                                        break;
 
1622
                                case XT_DL_EXCLUSIVE:
 
1623
                                        xt_sb_concat(self, strbuf, "x-locked  ");
 
1624
                                        break;
 
1625
                        }
 
1626
                        xt_sb_concat(self, strbuf, " eof=");
 
1627
                        xt_sb_concat_int8(self, strbuf, data_log->dlf_log_eof);
 
1628
                        xt_sb_concat(self, strbuf, " garbage=");
 
1629
                        xt_sb_concat_int8(self, strbuf, data_log->dlf_garbage_count);
 
1630
                        xt_sb_concat(self, strbuf, " g%=");
 
1631
                        if (data_log->dlf_log_eof)
 
1632
                                xt_sb_concat_int8(self, strbuf, data_log->dlf_garbage_count * 100 / data_log->dlf_log_eof);
 
1633
                        else
 
1634
                                xt_sb_concat(self, strbuf, "100");
 
1635
                        xt_sb_concat(self, strbuf, " open=");
 
1636
                        xt_sb_concat_int8(self, strbuf, data_log->dlf_open_count);
 
1637
                        xt_sb_concat(self, strbuf, "\n");
 
1638
                }
 
1639
                xt_unlock_mutex_ns(&seg->dls_lock);
 
1640
        }
 
1641
 
 
1642
        freer_(); // xt_free_sortedlist(list)
 
1643
}
 
1644
 
 
1645
xtPublic void xt_dl_delete_logs(XTThreadPtr self, XTDatabaseHPtr db)
 
1646
{
 
1647
        char                    path[PATH_MAX];
 
1648
        XTOpenDirPtr    od;
 
1649
        char                    *file;
 
1650
        xtLogID                 log_id;
 
1651
 
 
1652
        xt_strcpy(PATH_MAX, path, db->db_main_path);
 
1653
        xt_add_data_dir(PATH_MAX, path);
 
1654
        if (!xt_fs_exists(path))
 
1655
                return;
 
1656
        pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
 
1657
        while (xt_dir_next(self, od)) {
 
1658
                file = xt_dir_name(self, od);
 
1659
                if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
 
1660
                        if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
 
1661
                                xt_log_and_clear_exception(self);
 
1662
                }
 
1663
                if (xt_ends_with(file, ".xt")) {
 
1664
                        xt_add_dir_char(PATH_MAX, path);
 
1665
                        xt_strcat(PATH_MAX, path, file);
 
1666
                        xt_fs_delete(self, path);
 
1667
                        xt_remove_last_name_of_path(path);
 
1668
                }
 
1669
        }
 
1670
        freer_(); // xt_dir_close(od)
 
1671
 
 
1672
        /* I no longer attach the condition: !db->db_multi_path
 
1673
         * to removing this directory. This is because
 
1674
         * the pbxt directory must now be removed explicitly
 
1675
         * by drop database, or by delete all the PBXT
 
1676
         * system tables.
 
1677
         */
 
1678
        if (!xt_fs_rmdir(NULL, path))
 
1679
                xt_log_and_clear_exception(self);
 
1680
}
 
1681
 
 
1682
typedef struct XTCompactorState {
 
1683
        XTSeqLogReadPtr                 cs_seqread;
 
1684
        XTOpenTablePtr                  cs_ot;
 
1685
        XTDataBufferRec                 cs_databuf;
 
1686
} XTCompactorStateRec, *XTCompactorStatePtr;
 
1687
 
 
1688
static void dl_free_compactor_state(XTThreadPtr self, XTCompactorStatePtr cs)
 
1689
{
 
1690
        if (cs->cs_seqread) {
 
1691
                cs->cs_seqread->sl_seq_exit();
 
1692
                delete cs->cs_seqread;
 
1693
                cs->cs_seqread = NULL;
 
1694
        }
 
1695
        if (cs->cs_ot) {
 
1696
                xt_db_return_table_to_pool(self, cs->cs_ot);
 
1697
                cs->cs_ot = NULL;
 
1698
        }
 
1699
        xt_db_set_size(self, &cs->cs_databuf, 0);
 
1700
}
 
1701
 
 
1702
static XTOpenTablePtr dl_cs_get_open_table(XTThreadPtr self, XTCompactorStatePtr cs, xtTableID tab_id)
 
1703
{
 
1704
        if (cs->cs_ot) {
 
1705
                if (cs->cs_ot->ot_table->tab_id == tab_id)
 
1706
                        return cs->cs_ot;
 
1707
 
 
1708
                xt_db_return_table_to_pool(self, cs->cs_ot);
 
1709
                cs->cs_ot = NULL;
 
1710
        }
 
1711
 
 
1712
        if (!cs->cs_ot) {
 
1713
                if (!(cs->cs_ot = xt_db_open_pool_table(self, self->st_database, tab_id, NULL, TRUE)))
 
1714
                        return NULL;
 
1715
        }
 
1716
 
 
1717
        return cs->cs_ot;
 
1718
}
 
1719
 
 
1720
static void dl_co_wait(XTThreadPtr self, XTDatabaseHPtr db, u_int secs)
 
1721
{
 
1722
        xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
 
1723
        pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
 
1724
        if (!self->t_quit)
 
1725
                xt_timed_wait_cond(self, &db->db_datalogs.dlc_cond, &db->db_datalogs.dlc_lock, secs * 1000);
 
1726
        freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
 
1727
}
 
1728
 
 
1729
/*
 
1730
 * Collect all the garbage in a file by moving all valid records
 
1731
 * into some other data log and updating the handles.
 
1732
 */
 
1733
static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr data_log)
 
1734
{
 
1735
        XTXactLogBufferDPtr     record;
 
1736
        size_t                          size;
 
1737
        xtTableID                       tab_id;
 
1738
        xtRecordID                      rec_id;
 
1739
        XTCompactorStateRec     cs;
 
1740
        XTOpenTablePtr          ot;
 
1741
        XTTableHPtr                     tab;
 
1742
        XTTabRecExtDRec         rec_buffer;
 
1743
        size_t                          src_size;
 
1744
        xtLogID                         src_log_id;
 
1745
        xtLogOffset                     src_log_offset;
 
1746
        xtLogID                         curr_log_id;
 
1747
        xtLogOffset                     curr_log_offset;
 
1748
        xtLogID                         dest_log_id;
 
1749
        xtLogOffset                     dest_log_offset;
 
1750
        off_t                           garbage_count = 0;
 
1751
 
 
1752
        memset(&cs, 0, sizeof(XTCompactorStateRec));
 
1753
 
 
1754
        if (!(cs.cs_seqread = new XTDataSeqRead()))
 
1755
                xt_throw_errno(XT_CONTEXT, XT_ENOMEM);
 
1756
 
 
1757
        if (!cs.cs_seqread->sl_seq_init(db, xt_db_log_buffer_size)) {
 
1758
                delete cs.cs_seqread;
 
1759
                xt_throw(self);
 
1760
        }
 
1761
        pushr_(dl_free_compactor_state, &cs);
 
1762
 
 
1763
        if (!cs.cs_seqread->sl_seq_start(data_log->dlf_log_id, data_log->dlf_start_offset, FALSE))
 
1764
                xt_throw(self);
 
1765
 
 
1766
        for (;;) {
 
1767
                if (self->t_quit) {
 
1768
                        /* Flush the destination log: */
 
1769
                        xt_lock_mutex(self, &db->db_co_dlog_lock);
 
1770
                        pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
 
1771
                        if (!self->st_dlog_buf.dlb_flush_log(TRUE, self))
 
1772
                                xt_throw(self);
 
1773
                        freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
 
1774
 
 
1775
                        /* Flush the transaction log. */
 
1776
                        if (!xt_xlog_flush_log(db, self))
 
1777
                                xt_throw(self);
 
1778
 
 
1779
                        xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
 
1780
                        data_log->dlf_garbage_count += garbage_count;
 
1781
                        ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
 
1782
                        if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
 
1783
                                xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
 
1784
                                xt_throw(self);
 
1785
                        }
 
1786
                        xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
 
1787
 
 
1788
                        freer_(); // dl_free_compactor_state(&cs)
 
1789
                        return FAILED;
 
1790
                }
 
1791
                if (!cs.cs_seqread->sl_seq_next(&record, self))
 
1792
                        xt_throw(self);
 
1793
                cs.cs_seqread->sl_seq_pos(&curr_log_id, &curr_log_offset);
 
1794
                if (!record) {
 
1795
                        data_log->dlf_start_offset = curr_log_offset;
 
1796
                        break;
 
1797
                }
 
1798
                switch (record->xh.xh_status_1) {
 
1799
                        case XT_LOG_ENT_EXT_REC_OK:
 
1800
                                size = XT_GET_DISK_4(record->er.er_data_size_4);
 
1801
                                tab_id = XT_GET_DISK_4(record->er.er_tab_id_4);
 
1802
                                rec_id = XT_GET_DISK_4(record->er.er_rec_id_4);
 
1803
                                
 
1804
                                if (!(ot = dl_cs_get_open_table(self, &cs, tab_id)))
 
1805
                                        break;
 
1806
                                tab = ot->ot_table;
 
1807
                                
 
1808
                                /* All this is required for a valid record address: */
 
1809
                                if (!rec_id || rec_id >= tab->tab_rec_eof_id)
 
1810
                                        break;
 
1811
 
 
1812
                                /* {LOCK-EXT-REC} It is important to prevent the compactor from modifying
 
1813
                                 * a record that has been freed (and maybe allocated again).
 
1814
                                 *
 
1815
                                 * Consider the following sequence:
 
1816
                                 *
 
1817
                                 * 1. Compactor reads the record.
 
1818
                                 * 2. The record is freed and reallocated.
 
1819
                                 * 3. The compactor updates the record.
 
1820
                                 *
 
1821
                                 * To prevent this, the compactor locks out the
 
1822
                                 * sweeper using the db_co_ext_lock lock. The db_co_ext_lock lock
 
1823
                                 * prevents a extended record from being moved and removed at the
 
1824
                                 * same time.
 
1825
                                 *
 
1826
                                 * The compactor also checks the status of the record before
 
1827
                                 * moving a record.
 
1828
                                 */
 
1829
                                xt_lock_mutex(self, &db->db_co_ext_lock);
 
1830
                                pushr_(xt_unlock_mutex, &db->db_co_ext_lock);
 
1831
 
 
1832
                                /* Read the record: */
 
1833
                                if (!xt_tab_get_rec_data(ot, rec_id, offsetof(XTTabRecExtDRec, re_data), (xtWord1 *) &rec_buffer)) {
 
1834
                                        xt_log_and_clear_warning(self);
 
1835
                                        freer_(); // xt_unlock_mutex(&db->db_co_ext_lockk)
 
1836
                                        break;
 
1837
                                }
 
1838
 
 
1839
                                /* [(7)] REMOVE is followed by FREE:
 
1840
                                if (XT_REC_IS_REMOVED(rec_buffer.tr_rec_type_1) || !XT_REC_IS_EXT_DLOG(rec_buffer.tr_rec_type_1)) {
 
1841
                                */
 
1842
                                if (!XT_REC_IS_EXT_DLOG(rec_buffer.tr_rec_type_1)) {
 
1843
                                        freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
 
1844
                                        break;
 
1845
                                }
 
1846
 
 
1847
                                XT_GET_LOG_REF(src_log_id, src_log_offset, &rec_buffer);
 
1848
                                src_size = (size_t) XT_GET_DISK_4(rec_buffer.re_log_dat_siz_4);
 
1849
 
 
1850
                                /* Does the record agree with the current position: */
 
1851
                                if (curr_log_id != src_log_id ||
 
1852
                                        curr_log_offset != src_log_offset ||
 
1853
                                        size != src_size) {
 
1854
                                        freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
 
1855
                                        break;
 
1856
                                }
 
1857
 
 
1858
                                size = offsetof(XTactExtRecEntryDRec, er_data) + size;
 
1859
 
 
1860
                                /* Allocate space in a destination log: */
 
1861
                                xt_lock_mutex(self, &db->db_co_dlog_lock);
 
1862
                                pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
 
1863
                                if (!self->st_dlog_buf.dlb_get_log_offset(&dest_log_id, &dest_log_offset, size, self))
 
1864
                                        xt_throw(self);
 
1865
                                freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
 
1866
 
 
1867
                                /* This record is referenced by the data: */
 
1868
                                xt_db_set_size(self, &cs.cs_databuf, size);
 
1869
                                if (!cs.cs_seqread->sl_rnd_read(src_log_offset, size, cs.cs_databuf.db_data, NULL, self))
 
1870
                                        xt_throw(self);
 
1871
 
 
1872
                                /* The problem with writing to the buffer here, is that other
 
1873
                                 * threads want to read the data! */
 
1874
                                xt_lock_mutex(self, &db->db_co_dlog_lock);
 
1875
                                pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
 
1876
                                if (!self->st_dlog_buf.dlb_write_thru_log(dest_log_id, dest_log_offset, size, cs.cs_databuf.db_data, self))
 
1877
                                        xt_throw(self);
 
1878
                                freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
 
1879
 
 
1880
                                /* Make sure we flush the compactor target log, before we
 
1881
                                 * flush the transaction log!!
 
1882
                                 * This is done here [(8)]
 
1883
                                 */
 
1884
 
 
1885
                                XT_SET_LOG_REF(&rec_buffer, dest_log_id, dest_log_offset);
 
1886
                                xtOpSeqNo op_seq;
 
1887
                                if (!xt_tab_put_log_rec_data(ot, XT_LOG_ENT_REC_MOVED, 0, rec_id, 8, (xtWord1 *) &rec_buffer.re_log_id_2, &op_seq))
 
1888
                                        xt_throw(self);
 
1889
                                tab->tab_co_op_seq = op_seq;
 
1890
 
 
1891
                                /* Only records that were actually moved, count as garbage now!
 
1892
                                 * This means, lost records, remain "lost" as far as the garbage
 
1893
                                 * count is concerned!
 
1894
                                 */
 
1895
                                garbage_count += size;
 
1896
                                freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
 
1897
                                break;
 
1898
                }
 
1899
                data_log->dlf_start_offset = curr_log_offset;
 
1900
        }
 
1901
 
 
1902
        /* Flush the distination log. */
 
1903
        xt_lock_mutex(self, &db->db_co_dlog_lock);
 
1904
        pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
 
1905
        if (!self->st_dlog_buf.dlb_flush_log(TRUE, self))
 
1906
                xt_throw(self);
 
1907
        freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
 
1908
        
 
1909
        /* Flush the transaction log. */
 
1910
        if (!xt_xlog_flush_log(db, self))
 
1911
                xt_throw(self);
 
1912
 
 
1913
        /* Save state in source log header. */
 
1914
        xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
 
1915
        data_log->dlf_garbage_count += garbage_count;
 
1916
        ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
 
1917
        if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
 
1918
                xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
 
1919
                xt_throw(self);
 
1920
        }
 
1921
        xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
 
1922
 
 
1923
        /* Wait for the writer to write all the changes.
 
1924
         * Then we can start the delete process for the log:
 
1925
         *
 
1926
         * Note, if we do not wait, then it could be some operations are held up,
 
1927
         * by being out of sequence. This could cause the log to be deleted
 
1928
         * before all the operations have been performed (which are on a table
 
1929
         * basis).
 
1930
         *
 
1931
         */
 
1932
        for (;;) {
 
1933
                u_int                   edx;
 
1934
                XTTableEntryPtr tab_ptr;
 
1935
                xtBool                  wait;
 
1936
 
 
1937
                if (self->t_quit) {
 
1938
                        freer_(); // dl_free_compactor_state(&cs)
 
1939
                        return FAILED;
 
1940
                }
 
1941
                wait = FALSE;
 
1942
                xt_ht_lock(self, db->db_tables);
 
1943
                pushr_(xt_ht_unlock, db->db_tables);
 
1944
                xt_enum_tables_init(&edx);
 
1945
                while ((tab_ptr = xt_enum_tables_next(self, db, &edx))) {
 
1946
                        if (tab_ptr->te_table && tab_ptr->te_table->tab_co_op_seq > tab_ptr->te_table->tab_head_op_seq) {
 
1947
                                wait = TRUE;
 
1948
                                break;
 
1949
                        }
 
1950
                }
 
1951
                freer_(); // xt_ht_unlock(db->db_tables)
 
1952
                
 
1953
                if (!wait)
 
1954
                        break;
 
1955
 
 
1956
                /* Nobody will wake me, so check again shortly! */
 
1957
                dl_co_wait(self, db, 1);                
 
1958
        }
 
1959
 
 
1960
        db->db_datalogs.dls_set_log_state(data_log, XT_DL_COMPACTED);
 
1961
 
 
1962
#ifdef DEBUG_LOG_DELETE
 
1963
        printf("-- MARK FOR DELETE IN LOG: %d\n", (int) data_log->dlf_log_id);
 
1964
#endif
 
1965
        /* Log that this log should be deleted on the next checkpoint: */
 
1966
        // transaction log...
 
1967
        XTXactNewLogEntryDRec   log_rec;
 
1968
        log_rec.xl_status_1 = XT_LOG_ENT_DEL_LOG;
 
1969
        log_rec.xl_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id);
 
1970
        XT_SET_DISK_4(log_rec.xl_log_id_4, data_log->dlf_log_id);
 
1971
        if (!xt_xlog_log_data(self, sizeof(XTXactNewLogEntryDRec), (XTXactLogBufferDPtr) &log_rec, XT_XLOG_WRITE_AND_FLUSH)) {
 
1972
                db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_COMPACT);
 
1973
                xt_throw(self);
 
1974
        }
 
1975
 
 
1976
        freer_(); // dl_free_compactor_state(&cs)
 
1977
        return OK;
 
1978
}
 
1979
 
 
1980
static void dl_co_not_busy(XTThreadPtr XT_UNUSED(self), XTDatabaseHPtr db)
 
1981
{
 
1982
        db->db_co_busy = FALSE;
 
1983
}
 
1984
 
 
1985
static void dl_co_main(XTThreadPtr self, xtBool once_off)
 
1986
{
 
1987
        XTDatabaseHPtr          db = self->st_database;
 
1988
        xtLogID                         *log_id_ptr, log_id;
 
1989
        XTDataLogFilePtr        data_log = NULL;
 
1990
 
 
1991
        xt_set_low_priority(self);
 
1992
 
 
1993
        while (!self->t_quit) {
 
1994
                while (!self->t_quit) {
 
1995
                        xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
 
1996
                        if ((log_id_ptr = (xtLogID *) xt_sl_first_item(db->db_datalogs.dlc_to_compact))) {
 
1997
                                log_id = *log_id_ptr;
 
1998
                        }
 
1999
                        else
 
2000
                                log_id = 0;
 
2001
                        xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
 
2002
                        if (!log_id)
 
2003
                                break;
 
2004
                        if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, FALSE, NULL))
 
2005
                                xt_throw(self);
 
2006
                        ASSERT(data_log);
 
2007
                        if (data_log) {
 
2008
                                db->db_co_busy = TRUE;
 
2009
                                pushr_(dl_co_not_busy, db);
 
2010
                                dl_collect_garbage(self, db, data_log);
 
2011
                                freer_(); // dl_co_not_busy(db)
 
2012
                        }
 
2013
                        else {
 
2014
                                xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
 
2015
                                xt_sl_delete(self, db->db_datalogs.dlc_to_compact, &log_id);
 
2016
                                xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
 
2017
                        }
 
2018
                }
 
2019
 
 
2020
                if (once_off)
 
2021
                        break;
 
2022
 
 
2023
                /* Wait for a signal that a data log can be collected: */
 
2024
                dl_co_wait(self, db, 120);
 
2025
        }
 
2026
}
 
2027
 
 
2028
static void *dl_run_co_thread(XTThreadPtr self)
 
2029
{
 
2030
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
 
2031
        int                             count;
 
2032
        void                    *mysql_thread;
 
2033
 
 
2034
        if (!(mysql_thread = myxt_create_thread()))
 
2035
                xt_throw(self);
 
2036
 
 
2037
        while (!self->t_quit) {
 
2038
                try_(a) {
 
2039
                        /*
 
2040
                         * The garbage collector requires that the database
 
2041
                         * is in use because.
 
2042
                         */
 
2043
                        xt_use_database(self, db, XT_FOR_COMPACTOR);
 
2044
 
 
2045
                        /* {BACKGROUND-RELEASE-DB} 
 
2046
                         * This action is both safe and required:
 
2047
                         *
 
2048
                         * safe: releasing the database is safe because as
 
2049
                         * long as this thread is running the database
 
2050
                         * reference is valid, and this reference cannot
 
2051
                         * be the only one to the database because
 
2052
                         * otherwize this thread would not be running.
 
2053
                         *
 
2054
                         * required: releasing the database is necessary
 
2055
                         * otherwise we cannot close the database
 
2056
                         * correctly because we only shutdown this
 
2057
                         * thread when the database is closed and we
 
2058
                         * only close the database when all references
 
2059
                         * are removed.
 
2060
                         */
 
2061
                        xt_heap_release(self, self->st_database);
 
2062
 
 
2063
                        dl_co_main(self, FALSE);
 
2064
                }
 
2065
                catch_(a) {
 
2066
                        if (!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
2067
                                self->t_exception.e_sys_err == SIGTERM))
 
2068
                                xt_log_and_clear_exception(self);
 
2069
                }
 
2070
                cont_(a);
 
2071
 
 
2072
                /* Avoid releasing the database (done above) */
 
2073
                self->st_database = NULL;
 
2074
                xt_unuse_database(self, self);
 
2075
 
 
2076
                /* After an exception, pause before trying again... */
 
2077
                /* Number of seconds */
 
2078
#ifdef DEBUG
 
2079
                count = 10;
 
2080
#else
 
2081
                count = 2*60;
 
2082
#endif
 
2083
                while (!self->t_quit && count > 0) {
 
2084
                        sleep(1);
 
2085
                        count--;
 
2086
                }
 
2087
        }
 
2088
 
 
2089
   /*
 
2090
        * {MYSQL-THREAD-KILL}
 
2091
        myxt_destroy_thread(mysql_thread, TRUE);
 
2092
        */
 
2093
        return NULL;
 
2094
}
 
2095
 
 
2096
static void dl_free_co_thread(XTThreadPtr self, void *data)
 
2097
{
 
2098
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
 
2099
 
 
2100
        if (db->db_co_thread) {
 
2101
                xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
 
2102
                pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
 
2103
                db->db_co_thread = NULL;
 
2104
                freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
 
2105
        }
 
2106
}
 
2107
 
 
2108
xtPublic void xt_start_compactor(XTThreadPtr self, XTDatabaseHPtr db)
 
2109
{
 
2110
        char name[PATH_MAX];
 
2111
 
 
2112
        sprintf(name, "GC-%s", xt_last_directory_of_path(db->db_main_path));
 
2113
        xt_remove_dir_char(name);
 
2114
        db->db_co_thread = xt_create_daemon(self, name);
 
2115
        xt_set_thread_data(db->db_co_thread, db, dl_free_co_thread);
 
2116
        xt_run_thread(self, db->db_co_thread, dl_run_co_thread);
 
2117
}
 
2118
 
 
2119
static void dl_wake_co_thread(XTDatabaseHPtr db)
 
2120
{
 
2121
        if (!xt_signal_cond(NULL, &db->db_datalogs.dlc_cond))
 
2122
                xt_log_and_clear_exception_ns();
 
2123
}
 
2124
 
 
2125
xtPublic void xt_stop_compactor(XTThreadPtr self, XTDatabaseHPtr db)
 
2126
{
 
2127
        XTThreadPtr thr_co;
 
2128
 
 
2129
        if (db->db_co_thread) {
 
2130
                xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
 
2131
                pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
 
2132
 
 
2133
                /* This pointer is safe as long as you have the transaction lock. */
 
2134
                if ((thr_co = db->db_co_thread)) {
 
2135
                        xtThreadID tid = thr_co->t_id;
 
2136
 
 
2137
                        /* Make sure the thread quits when woken up. */
 
2138
                        xt_terminate_thread(self, thr_co);
 
2139
 
 
2140
                        dl_wake_co_thread(db);
 
2141
        
 
2142
                        freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
 
2143
 
 
2144
                        /*
 
2145
                         * This seems to kill the whole server sometimes!!
 
2146
                         * SIGTERM is going to a different thread??!
 
2147
                        xt_kill_thread(thread);
 
2148
                         */
 
2149
                        xt_wait_for_thread_to_exit(tid, FALSE);
 
2150
        
 
2151
                        /* PMC - This should not be necessary to set the signal here, but in the
 
2152
                         * debugger the handler is not called!!?
 
2153
                        thr_co->t_delayed_signal = SIGTERM;
 
2154
                        xt_kill_thread(thread);
 
2155
                         */
 
2156
                        db->db_co_thread = NULL;
 
2157
                }
 
2158
                else
 
2159
                        freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
 
2160
        }
 
2161
}
 
2162