1
/* Copyright (c) 2007 SNAP Innovation GmbH
3
* BLOB Streaming for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
31
#include "CSStrUtil.h"
32
#include "CSStorage.h"
34
#include "BSTempLog.h"
35
#include "BSOpenTable.h"
37
// Definition of the static BSTempLog::gTempBlobTimeout member.
// In this file it is added to time_t values to decide whether a temp log
// record has timed out, and multiplied by 1000 to produce a wait time
// (NOTE(review): presumably seconds -- confirm against the declaration).
u_long BSTempLog::gTempBlobTimeout;
39
BSTempLogFile::BSTempLogFile():
46
BSTempLogFile::~BSTempLogFile()
53
// Factory for BSTempLogFile objects: allocates a new object and attaches it
// to the given temp log and to the path of the given file.
//   id       - ID of the temp log (its use is not shown in the statements below).
//   temp_log - temp log stored in the new object's myTempLog.
//   file     - file whose myFilePath is shared with the new object.
BSTempLogFile *BSTempLogFile::newTempLogFile(u_int id, BSTempLog *temp_log, CSFile *file)
57
// A failed allocation is reported as an OS error with ENOMEM.
if (!(f = new BSTempLogFile())) {
60
CSException::throwOSError(CS_CONTEXT, ENOMEM);
63
f->myTempLog = temp_log;
65
// The path object is shared with 'file', so retain() is called on it
// (NOTE(review): presumably this takes an extra reference -- confirm).
f->myFilePath = file->myFilePath;
66
f->myFilePath->retain();
70
// Constructs a temp log object. The initial log size (myTempLogSize) is
// taken from file_size.
BSTempLog::BSTempLog(u_int id, BSDatabase *db, off_t file_size):
73
myTempLogSize(file_size),
81
BSTempLog::~BSTempLog()
95
void BSTempLog::deleteLog()
100
// Builds the path of this temp log file. The file name is composed as
// "bs-logs", a directory character (cs_add_dir_char), "temp-", myLogID and
// ".bs", and is combined with the path of the owning database.
CSPath *BSTempLog::getLogPath()
104
// 120 is the size passed to every cs_str* helper for file_name
// (NOTE(review): assumes file_name is declared with at least 120 bytes -- confirm).
cs_strcpy(120, file_name, "bs-logs");
105
cs_add_dir_char(120, file_name);
106
cs_strcat(120, file_name, "temp-");
107
cs_strcat(120, file_name, myLogID);
108
cs_strcat(120, file_name, ".bs");
109
// The database path is retained before it is handed to newPath
// (NOTE(review): presumably newPath takes over one reference -- confirm).
iLogDatabase->myDatabasePath->retain();
110
return CSPath::newPath(iLogDatabase->myDatabasePath, file_name);
113
// Opens the file of this temp log. While myTempLogHeadSize is still zero the
// file header is loaded first: it is read from the file, or written when the
// file is too short to contain one, then validated, and the header size and
// record size are cached in this object.
BSTempLogFile *BSTempLog::openTempLog()
121
fh = BSTempLogFile::newTempLogFile(myLogID, this, CSFile::newFile(path));
124
fh->open(CSFile::DEFAULT);
126
fh->open(CSFile::CREATE);
127
// A zero myTempLogHeadSize means the header details have not been loaded yet.
if (!myTempLogHeadSize) {
128
BSTempLogHeadRec head;
130
// The header is loaded while holding the lock on the database's temp log array.
lock_(iLogDatabase->myTempLogArray);
131
/* Check again after locking: */
132
if (!myTempLogHeadSize) {
135
// Read the header fields that precede th_reserved_4. A short read means the
// file has no complete header, so a fresh one is built and written at offset 0.
if (fh->read(&head, 0, offsetof(BSTempLogHeadRec, th_reserved_4), 0) < offsetof(BSTempLogHeadRec, th_reserved_4)) {
136
CS_SET_DISK_4(head.th_magic_4, BS_TEMP_LOG_MAGIC);
137
CS_SET_DISK_2(head.th_version_2, BS_TEMP_LOG_VERSION);
138
CS_SET_DISK_2(head.th_head_size_2, BS_TEMP_LOG_HEAD_SIZE);
139
CS_SET_DISK_2(head.th_rec_size_2, sizeof(BSTempLogItemRec));
140
CS_SET_DISK_4(head.th_reserved_4, 0);
141
fh->write(&head, 0, sizeof(BSTempLogHeadRec));
145
/* Check the file header: */
146
// A wrong magic number, or a version newer than this code knows, is a file error.
if (CS_GET_DISK_4(head.th_magic_4) != BS_TEMP_LOG_MAGIC)
147
CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
148
if (CS_GET_DISK_2(head.th_version_2) > BS_TEMP_LOG_VERSION)
149
CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
151
/* Load the header details: */
152
// The sizes come from the file header, not from the compile-time constants.
myTempLogHeadSize = CS_GET_DISK_2(head.th_head_size_2);
153
myTemplogRecSize = CS_GET_DISK_2(head.th_rec_size_2);
155
/* File size, cannot be less than header size, adjust to correct offset: */
156
if (myTempLogSize < myTempLogHeadSize)
157
myTempLogSize = myTempLogHeadSize;
158
// Round the size up so that the data after the header is a whole number of records.
if ((rem = (myTempLogSize - myTempLogHeadSize) % myTemplogRecSize))
159
myTempLogSize += myTemplogRecSize - rem;
161
unlock_(iLogDatabase->myTempLogArray);
167
// Computes a wait time from a record time stamp and the current time.
//   then - time stamp of a temp log record (doWork passes the record's ti_time_4).
//   now  - current time.
time_t BSTempLog::adjustWaitTime(time_t then, time_t now)
171
// The timeout has not passed yet: wait for the remaining time, multiplied by
// 1000 (NOTE(review): presumably converts seconds to milliseconds -- confirm).
if (now < then + gTempBlobTimeout) {
172
wait = ((then + gTempBlobTimeout - now) * 1000);
175
// 120 * 1000 is a threshold on the computed wait
// (NOTE(review): looks like an upper bound of two minutes -- confirm).
else if (wait > 120 * 1000)
185
* ---------------------------------------------------------------
189
// Constructs the temp log thread for database db. wait_time is forwarded to
// the CSDaemon base class and db is stored in iTempLogDatabase.
BSTempLogThread::BSTempLogThread(time_t wait_time, BSDatabase *db):
190
CSDaemon(wait_time, NULL),
191
iTempLogDatabase(db),
198
BSTempLogThread::~BSTempLogThread()
203
// Releases the temp log file held in iTempLogFile.
void BSTempLogThread::close()
206
iTempLogFile->release();
211
// Work loop of the temp log thread. Records are read one at a time from the
// current temp log. A record whose timeout has not yet passed only adjusts
// the wait time; otherwise the reference it describes is released. When the
// end of the log is reached, the log is deleted only if a later log exists.
// The loop runs until myMustQuit is set.
bool BSTempLogThread::doWork()
214
BSTempLogItemRec log_item;
215
CSStringBuffer *buffer;
218
// Buffer that is passed to freeBlob() below.
new_(buffer, CSStringBuffer(20));
220
while (!myMustQuit) {
223
// Open a temp log if none is open; this also yields the record and header sizes
// (NOTE(review): the meaning of the 0 argument is not visible here -- confirm).
if (!(iTempLogFile = iTempLogDatabase->openTempLogFile(0, &iLogRecSize, &head_size))) {
227
// Records start directly after the file header.
iLogOffset = head_size;
230
// tfer is compared with the record size below to tell a full record from a short read.
tfer = iTempLogFile->read(&log_item, iLogOffset, sizeof(BSTempLogItemRec), 0);
232
/* No more data to be read: */
234
/* Check to see if there is a log after this: */
235
if (iTempLogDatabase->getTempLogCount() <= 1) {
236
/* The next log does not yet exist. We wait for
237
* it to be created before we delete and
238
* close the current log.
240
myWaitTime = BSTempLog::gTempBlobTimeout * 1000;
244
iTempLogFile->myTempLog->deleteLog();
245
iTempLogDatabase->removeTempLog(iTempLogFile->myTempLogID);
248
else if (tfer == sizeof(BSTempLogItemRec)) {
249
/* We have a record: */
258
* Table ID is set to zero, if the BLOB is referenced.
259
* This cancels the removal.
261
// Decode the fields of the on-disk record.
type = CS_GET_DISK_1(log_item.ti_type_1);
262
tab_id = CS_GET_DISK_4(log_item.ti_table_id_4);
263
blob_id = CS_GET_DISK_6(log_item.ti_blob_id_6);
264
auth_code = CS_GET_DISK_4(log_item.ti_auth_code_4);
265
then = CS_GET_DISK_4(log_item.ti_time_4);
268
if (now < then + BSTempLog::gTempBlobTimeout) {
269
/* Time has not yet expired, adjust wait time: */
270
myWaitTime = BSTempLog::adjustWaitTime(then, now);
275
/* Release the BLOB reference. */
278
// Repository references are freed through a repository file taken from the pool.
if (type == BS_TL_REPO_REF) {
279
BSRepoFile *repo_file;
281
if ((repo_file = iTempLogDatabase->getRepoFileFromPool(tab_id, true))) {
282
frompool_(repo_file);
283
repo_file->freeBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
284
backtopool_(repo_file);
288
// Other record types are handled through the open table identified by tab_id.
if ((otab = BSTableList::getOpenTable(iTempLogDatabase->getDatabaseNameCString(), tab_id, true))) {
290
if (type == BS_TL_BLOB_REF) {
291
otab->releaseReference(blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
295
// When deleteReferences() returns true, the table file and the table itself are removed.
if (otab->deleteReferences(iTempLogFile->myTempLogID, iLogOffset, &myMustQuit)) {
296
/* Delete the file now... */
299
BSOpenTablePool *tab_pool;
301
tab = otab->getDBTable();
302
from_path = otab->getDBTable()->getTablePath();
310
// The table pool is locked while the file and the table are removed.
tab_pool = BSTableList::lockTablePool(otab);
313
from_path->removeFile();
314
tab->myDatabase->removeTable(tab);
316
backtopool_(tab_pool); // This will unlock and close the table pool.
327
// Error handling: the reaction depends on the error code of the caught exception.
int err = self->myException.getErrorCode();
329
if (err == BS_ERR_TABLE_LOCKED) {
332
else if (err == BS_ERR_REMOVING_REPO) {
333
/* Wait for the compactor to finish: */
334
myWaitTime = 2 * 1000;
338
else if (err == BS_ERR_UNKNOWN_TABLE)
341
self->myException.log(NULL);
346
// Only part of the data read, don't wait very long to try again:
347
myWaitTime = 2 * 1000;
350
// Advance to the next record in the log.
iLogOffset += iLogRecSize;
357
void *BSTempLogThread::finalize()