~pbms-core/pbms/async_read

« back to all changes in this revision

Viewing changes to mybs/src/BSTempLog.cc

  • Committer: paul-mccullagh
  • Date: 2008-03-26 11:35:17 UTC
  • Revision ID: paul-mccullagh-afb1610c21464a577ae428d72fc725eb986c05a5
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2007 SNAP Innovation GmbH
 
2
 *
 
3
 * BLOB Streaming for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Paul McCullagh
 
20
 *
 
21
 * 2007-07-03
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * Temporary BLOB log: temp log files and the temp log daemon thread.
 
26
 *
 
27
 */
 
28
 
 
29
#include "CSConfig.h"
 
30
#include "CSGlobal.h"
 
31
#include "CSStrUtil.h"
 
32
#include "CSStorage.h"
 
33
 
 
34
#include "BSTempLog.h"
 
35
#include "BSOpenTable.h"
 
36
 
 
37
/*
 * Timeout applied to temp log entries. An entry is only processed once
 * its stored time plus this value is no longer in the future; the
 * comparison is made against time(NULL), and the value is multiplied by
 * 1000 when it is turned into a daemon wait time.
 * No initializer is given here; presumably the value is assigned elsewhere.
 */
u_long                          BSTempLog::gTempBlobTimeout;
 
38
 
 
39
/*
 * Construct a file handle that is not yet attached to a temp log:
 * the log ID is zero and there is no owning log. Both are filled in
 * by newTempLogFile().
 */
BSTempLogFile::BSTempLogFile()
	: CSReadBufferedFile(), myTempLogID(0), myTempLog(NULL)
{
}
 
45
 
 
46
/*
 * Close the file, then drop the reference held on the owning
 * temp log (if one was attached).
 */
BSTempLogFile::~BSTempLogFile()
{
	close();
	if (myTempLog != NULL)
		myTempLog->release();
}
 
52
 
 
53
BSTempLogFile *BSTempLogFile::newTempLogFile(u_int id, BSTempLog *temp_log, CSFile *file)
 
54
{
 
55
        BSTempLogFile *f;
 
56
        
 
57
        if (!(f = new BSTempLogFile())) {
 
58
                temp_log->release();
 
59
                file->release();
 
60
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
61
        }
 
62
        f->myTempLogID = id;
 
63
        f->myTempLog = temp_log;
 
64
        f->myFile = file;
 
65
        f->myFilePath = file->myFilePath;
 
66
        f->myFilePath->retain();
 
67
        return f;
 
68
}
 
69
 
 
70
/*
 * Describe the temp log 'id' of database 'db', whose file currently
 * has the size 'file_size'.
 *
 * The record and header sizes start at zero; they are loaded from the
 * file header by openTempLog(). The log is not marked for deletion.
 */
BSTempLog::BSTempLog(u_int id, BSDatabase *db, off_t file_size)
	: CSRefObject(),
	  myLogID(id),
	  myTempLogSize(file_size),
	  myTemplogRecSize(0),
	  myTempLogHeadSize(0),
	  iLogDatabase(db),
	  iDeleteLog(false)
{
}
 
80
 
 
81
BSTempLog::~BSTempLog()
 
82
{
 
83
        enter_();
 
84
        if (iDeleteLog) {
 
85
                CSPath *path;
 
86
 
 
87
                path = getLogPath();
 
88
                push_(path);
 
89
                path->removeFile();
 
90
                release_(path);
 
91
        }
 
92
        exit_();
 
93
}
 
94
 
 
95
void BSTempLog::deleteLog()
 
96
{
 
97
        iDeleteLog = true;
 
98
}
 
99
 
 
100
CSPath *BSTempLog::getLogPath()
 
101
{
 
102
        char file_name[120];
 
103
 
 
104
        cs_strcpy(120, file_name, "bs-logs");
 
105
        cs_add_dir_char(120, file_name);
 
106
        cs_strcat(120, file_name, "temp-");
 
107
        cs_strcat(120, file_name, myLogID);
 
108
        cs_strcat(120, file_name, ".bs");
 
109
        iLogDatabase->myDatabasePath->retain();
 
110
        return CSPath::newPath(iLogDatabase->myDatabasePath, file_name);
 
111
}
 
112
 
 
113
BSTempLogFile *BSTempLog::openTempLog()
 
114
{
 
115
        CSPath                  *path;
 
116
        BSTempLogFile   *fh;
 
117
 
 
118
        enter_();
 
119
        path = getLogPath();
 
120
        retain();
 
121
        fh = BSTempLogFile::newTempLogFile(myLogID, this, CSFile::newFile(path));
 
122
        push_(fh);
 
123
        if (myTempLogSize)
 
124
                fh->open(CSFile::DEFAULT);
 
125
        else
 
126
                fh->open(CSFile::CREATE);
 
127
        if (!myTempLogHeadSize) {
 
128
                BSTempLogHeadRec        head;
 
129
 
 
130
                lock_(iLogDatabase->myTempLogArray);
 
131
                /* Check again after locking: */
 
132
                if (!myTempLogHeadSize) {
 
133
                        size_t rem;
 
134
 
 
135
                        if (fh->read(&head, 0, offsetof(BSTempLogHeadRec, th_reserved_4), 0) < offsetof(BSTempLogHeadRec, th_reserved_4)) {
 
136
                                CS_SET_DISK_4(head.th_magic_4, BS_TEMP_LOG_MAGIC);
 
137
                                CS_SET_DISK_2(head.th_version_2, BS_TEMP_LOG_VERSION);
 
138
                                CS_SET_DISK_2(head.th_head_size_2, BS_TEMP_LOG_HEAD_SIZE);
 
139
                                CS_SET_DISK_2(head.th_rec_size_2, sizeof(BSTempLogItemRec));
 
140
                                CS_SET_DISK_4(head.th_reserved_4, 0);
 
141
                                fh->write(&head, 0, sizeof(BSTempLogHeadRec));
 
142
                                fh->flush();
 
143
                        }
 
144
                        
 
145
                        /* Check the file header: */
 
146
                        if (CS_GET_DISK_4(head.th_magic_4) != BS_TEMP_LOG_MAGIC)
 
147
                                CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
 
148
                        if (CS_GET_DISK_2(head.th_version_2) > BS_TEMP_LOG_VERSION)
 
149
                                CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
 
150
 
 
151
                        /* Load the header details: */
 
152
                        myTempLogHeadSize = CS_GET_DISK_2(head.th_head_size_2);
 
153
                        myTemplogRecSize = CS_GET_DISK_2(head.th_rec_size_2);
 
154
 
 
155
                        /* File size, cannot be less than header size, adjust to correct offset: */
 
156
                        if (myTempLogSize < myTempLogHeadSize)
 
157
                                myTempLogSize = myTempLogHeadSize;
 
158
                        if ((rem = (myTempLogSize - myTempLogHeadSize) % myTemplogRecSize))
 
159
                                myTempLogSize += myTemplogRecSize - rem;
 
160
                }
 
161
                unlock_(iLogDatabase->myTempLogArray);
 
162
        }
 
163
        pop_(fh);
 
164
        return_(fh);
 
165
}
 
166
 
 
167
time_t BSTempLog::adjustWaitTime(time_t then, time_t now)
 
168
{
 
169
        time_t wait;
 
170
 
 
171
        if (now < then + gTempBlobTimeout) {
 
172
                wait = ((then + gTempBlobTimeout - now) * 1000);
 
173
                if (wait < 2000)
 
174
                        wait = 2000;
 
175
                else if (wait > 120 * 1000)
 
176
                        wait = 120 * 1000;
 
177
        }
 
178
        else
 
179
                wait = 1;
 
180
                        
 
181
        return wait;
 
182
}
 
183
 
 
184
/*
 
185
 * ---------------------------------------------------------------
 
186
 * TEMP LOG THREAD
 
187
 */
 
188
 
 
189
/*
 * Create the temp log daemon for database 'db'. The wait time is
 * passed on to the CSDaemon base class. No log file is open yet; the
 * record size and read offset start at zero.
 */
BSTempLogThread::BSTempLogThread(time_t wait_time, BSDatabase *db)
	: CSDaemon(wait_time, NULL),
	  iTempLogDatabase(db),
	  iTempLogFile(NULL),
	  iLogRecSize(0),
	  iLogOffset(0)
{
}
 
197
 
 
198
/*
 * Release the log file, if one is still open.
 */
BSTempLogThread::~BSTempLogThread()
{
	this->close();
}
 
202
 
 
203
void BSTempLogThread::close()
 
204
{
 
205
        if (iTempLogFile) {
 
206
                iTempLogFile->release();
 
207
                iTempLogFile = NULL;
 
208
        }
 
209
}
 
210
 
 
211
bool BSTempLogThread::doWork()
 
212
{
 
213
        size_t                          tfer;
 
214
        BSTempLogItemRec        log_item;
 
215
        CSStringBuffer          *buffer;
 
216
 
 
217
        enter_();
 
218
        new_(buffer, CSStringBuffer(20));
 
219
        push_(buffer);
 
220
        while (!myMustQuit) {
 
221
                if (!iTempLogFile) {
 
222
                        size_t head_size;
 
223
                        if (!(iTempLogFile = iTempLogDatabase->openTempLogFile(0, &iLogRecSize, &head_size))) {
 
224
                                release_(buffer);
 
225
                                return_(true);
 
226
                        }
 
227
                        iLogOffset = head_size;
 
228
                }
 
229
 
 
230
                tfer = iTempLogFile->read(&log_item, iLogOffset, sizeof(BSTempLogItemRec), 0);
 
231
                if (tfer == 0) {
 
232
                        /* No more data to be read: */
 
233
 
 
234
                        /* Check to see if there is a log after this: */
 
235
                        if (iTempLogDatabase->getTempLogCount() <= 1) {
 
236
                                /* The next log does not yet exist. We wait for
 
237
                                 * it to be created before we delete and
 
238
                                 * close the current log.
 
239
                                 */
 
240
                                myWaitTime = BSTempLog::gTempBlobTimeout * 1000;
 
241
                                break;
 
242
                        }
 
243
 
 
244
                        iTempLogFile->myTempLog->deleteLog();
 
245
                        iTempLogDatabase->removeTempLog(iTempLogFile->myTempLogID);
 
246
                        close();
 
247
                }
 
248
                else if (tfer == sizeof(BSTempLogItemRec)) {
 
249
                        /* We have a record: */
 
250
                        int             type;
 
251
                        csWord4 tab_id;
 
252
                        csWord8 blob_id;
 
253
                        csWord4 auth_code;
 
254
                        csWord4 then;
 
255
                        time_t  now;
 
256
 
 
257
                        /*
 
258
                         * Table ID is set to zero, if the BLOB is referenced.
 
259
                         * This cancels the removal.
 
260
                         */
 
261
                        type = CS_GET_DISK_1(log_item.ti_type_1);
 
262
                        tab_id = CS_GET_DISK_4(log_item.ti_table_id_4);
 
263
                        blob_id = CS_GET_DISK_6(log_item.ti_blob_id_6);
 
264
                        auth_code = CS_GET_DISK_4(log_item.ti_auth_code_4);
 
265
                        then = CS_GET_DISK_4(log_item.ti_time_4);
 
266
 
 
267
                        now = time(NULL);
 
268
                        if (now < then + BSTempLog::gTempBlobTimeout) {
 
269
                                /* Time has not yet exired, adjust wait time: */
 
270
                                myWaitTime = BSTempLog::adjustWaitTime(then, now);
 
271
                                break;
 
272
                        }
 
273
 
 
274
                        try_(a) {
 
275
                                /* Release the BLOB reference. */
 
276
                                BSOpenTable *otab;
 
277
 
 
278
                                if (type == BS_TL_REPO_REF) {
 
279
                                        BSRepoFile      *repo_file;
 
280
 
 
281
                                        if ((repo_file = iTempLogDatabase->getRepoFileFromPool(tab_id, true))) {
 
282
                                                frompool_(repo_file);
 
283
                                                repo_file->freeBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
 
284
                                                backtopool_(repo_file);
 
285
                                        }
 
286
                                }
 
287
                                else {
 
288
                                        if ((otab = BSTableList::getOpenTable(iTempLogDatabase->getDatabaseNameCString(), tab_id, true))) {
 
289
                                                frompool_(otab);
 
290
                                                if (type == BS_TL_BLOB_REF) {
 
291
                                                        otab->releaseReference(blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
 
292
                                                        backtopool_(otab);
 
293
                                                }
 
294
                                                else {
 
295
                                                        if (otab->deleteReferences(iTempLogFile->myTempLogID, iLogOffset, &myMustQuit)) {
 
296
                                                                /* Delete the file now... */
 
297
                                                                BSTable                 *tab;
 
298
                                                                CSPath                  *from_path;
 
299
                                                                BSOpenTablePool *tab_pool;
 
300
 
 
301
                                                                tab = otab->getDBTable();
 
302
                                                                from_path = otab->getDBTable()->getTablePath();
 
303
 
 
304
                                                                pop_(otab);
 
305
 
 
306
                                                                push_(from_path);
 
307
                                                                tab->retain();
 
308
                                                                push_(tab);
 
309
 
 
310
                                                                tab_pool = BSTableList::lockTablePool(otab);
 
311
                                                                frompool_(tab_pool);
 
312
 
 
313
                                                                from_path->removeFile();
 
314
                                                                tab->myDatabase->removeTable(tab);
 
315
 
 
316
                                                                backtopool_(tab_pool); // The will unlock and close the table pool.
 
317
                                                                release_(tab);
 
318
                                                                release_(from_path);
 
319
                                                        }
 
320
                                                        else 
 
321
                                                                backtopool_(otab);
 
322
                                                }
 
323
                                        }
 
324
                                }
 
325
                        }
 
326
                        catch_(a) {
 
327
                                int err = self->myException.getErrorCode();
 
328
                                
 
329
                                if (err == BS_ERR_TABLE_LOCKED) {
 
330
                                        throw_();
 
331
                                }
 
332
                                else if (err == BS_ERR_REMOVING_REPO) {
 
333
                                        /* Wait for the compactor to finish: */
 
334
                                        myWaitTime = 2 * 1000;
 
335
                                        release_(buffer);
 
336
                                        return_(true);
 
337
                                }
 
338
                                else if (err == BS_ERR_UNKNOWN_TABLE) 
 
339
                                        ;
 
340
                                else
 
341
                                        self->myException.log(NULL);
 
342
                        }
 
343
                        cont_(a);
 
344
                }
 
345
                else {
 
346
                        // Only part of the data read, don't wait very long to try again:
 
347
                        myWaitTime = 2 * 1000;
 
348
                        break;
 
349
                }
 
350
                iLogOffset += iLogRecSize;
 
351
        }
 
352
 
 
353
        release_(buffer);
 
354
        return_(true);
 
355
}
 
356
 
 
357
void *BSTempLogThread::finalize()
 
358
{
 
359
        close();
 
360
        return NULL;
 
361
};
 
362