1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#include "cslib/CSConfig.h"
34
#include "cslib/CSGlobal.h"
35
#include "cslib/CSStrUtil.h"
36
#include "cslib/CSStorage.h"
38
#include "temp_log_ms.h"
39
#include "open_table_ms.h"
40
#include "trans_log_ms.h"
41
#include "transaction_ms.h"
42
#include "parameters_ms.h"
45
// Search the transaction log for a MS_ReferenceTxn record for the given BLOB.
46
// Just search the log file and not the cache. Searching the cache may be faster but
47
// it would require locks that could block the writer or reader threads and in the worst
48
// case it will still require the reading of the log anyway.
50
// This search doesn't distinguish between transactions that are still running and
51
// transactions that are rolled back.
52
// Transaction-log reader that looks for a record whose database, table and
// BLOB ids match the ones being searched for, and records whether that
// record is terminated and whether it counts as committed.
// NOTE(review): the member declarations, braces and some statements of this
// class are not visible in this excerpt; the comments below describe only
// the statements that are shown.
class SearchTXNLog : ReadTXNLog {
54
// Passes 'log' to the ReadTXNLog base and stores the database id to match.
SearchTXNLog(uint32_t db_id, MSTrans *log): ReadTXNLog(log), st_db_id(db_id) {}
64
// Reading may continue while no match has been found, or while the match
// that was found is not yet terminated.
virtual bool rl_CanContinue() { return ((!st_found) || !st_terminated);}
65
// Record callback. Presumably invoked by rl_ReadLog() once per log record
// -- confirm against ReadTXNLog.
virtual void rl_Load(uint64_t log_position, MSTransPtr rec)
69
// Before a match is found, records that are not of type MS_ReferenceTxn are
// singled out here (presumably skipped -- the branch body is not visible).
if ( !st_found && (TRANS_TYPE(rec->tr_type) != MS_ReferenceTxn))
73
// A record matches only when database, table and BLOB ids all agree.
if ((rec->tr_db_id == st_db_id) && (rec->tr_tab_id == st_tab_id) && (rec->tr_blob_id == st_blob_id)) {
79
st_terminated = TRANS_IS_TERMINATED(rec->tr_type);
81
// Committed: either an autocommit record or an explicit MS_CommitTxn record.
st_commited = (TRANS_IS_AUTOCOMMIT(rec->tr_type) || (TRANS_TYPE(rec->tr_type) == MS_CommitTxn));
84
// Entry point of the search for (tab_id, blob_id). The committed state of the
// search is written to *committed.
// NOTE(review): the return value and the assignment of st_tab_id/st_blob_id
// are not visible in this excerpt.
bool st_FindBlobRef(bool *committed, uint32_t tab_id, uint64_t blob_id)
87
// Reset the search state before reading the log.
st_found = st_terminated = st_commited = false;
91
// Read the log beginning at the position reported by txn_GetStartPosition().
rl_ReadLog(rl_log->txn_GetStartPosition(), false);
92
*committed = st_commited;
97
MSTempLogFile::MSTempLogFile():
104
// Destructor: calls release() on myTempLog.
// NOTE(review): any guard around the release (e.g. a null check) is not
// visible in this excerpt.
MSTempLogFile::~MSTempLogFile()
108
myTempLog->release();
111
// Factory for MSTempLogFile objects: allocates a new instance and fills in
// its temp log pointer and file path from the arguments.
// Throws an OS error with ENOMEM if the allocation yields NULL.
// NOTE(review): the handling of 'id', the release of 'file' and the return
// statement are not visible in this excerpt.
MSTempLogFile *MSTempLogFile::newTempLogFile(uint32_t id, MSTempLog *temp_log, CSFile *file)
115
if (!(f = new MSTempLogFile())) {
118
CSException::throwOSError(CS_CONTEXT, ENOMEM);
121
f->myTempLog = temp_log;
123
// The path object of 'file' is shared with the new object, so an extra
// reference is taken on it.
f->myFilePath = file->myFilePath;
124
f->myFilePath->retain();
128
// Constructor: records the given file size as the temp log size and starts
// with a header size of 0. openTempLog() tests myTempLogHeadSize for zero
// to decide whether the file header still has to be read.
// NOTE(review): the remaining member initializers are not visible in this
// excerpt.
MSTempLog::MSTempLog(uint32_t id, MSDatabase *db, off64_t file_size):
131
myTempLogSize(file_size),
133
myTempLogHeadSize(0),
139
MSTempLog::~MSTempLog()
153
void MSTempLog::deleteLog()
158
// Builds the path of this temp log file and returns it as a new CSPath.
// The file name has the form "bs-logs" <dir char> "temp-" <myLogID> ".bs"
// and is resolved against the database path of iLogDatabase.
// NOTE(review): the declaration of 'file_name' is not visible here; the
// string helpers are all called with a size limit of 120.
CSPath *MSTempLog::getLogPath()
162
cs_strcpy(120, file_name, "bs-logs");
163
cs_add_dir_char(120, file_name);
164
cs_strcat(120, file_name, "temp-");
165
cs_strcat(120, file_name, myLogID);
166
cs_strcat(120, file_name, ".bs");
167
// The database path is retained before being handed to CSPath::newPath().
return CSPath::newPath(RETAIN(iLogDatabase->myDatabasePath), file_name);
170
// Opens a file handle on this temp log. If the header details have not been
// loaded yet (myTempLogHeadSize is 0), the file header is read -- or written
// when the file is too short to contain one -- validated, and used to set
// the header size, record size and an adjusted log size.
// NOTE(review): local declarations, the conditions selecting between the two
// open() calls, and the return statement are not visible in this excerpt.
MSTempLogFile *MSTempLog::openTempLog()
178
fh = MSTempLogFile::newTempLogFile(myLogID, this, CSFile::newFile(path));
181
fh->open(CSFile::DEFAULT);
183
fh->open(CSFile::CREATE);
184
// A header size of 0 means the header details have not been loaded yet.
if (!myTempLogHeadSize) {
185
MSTempLogHeadRec head;
187
lock_(iLogDatabase->myTempLogArray);
188
/* Check again after locking: */
189
if (!myTempLogHeadSize) {
192
// A short read of the header fields up to th_reserved_4 is handled by
// filling in a fresh header and writing it to offset 0.
if (fh->read(&head, 0, offsetof(MSTempLogHeadRec, th_reserved_4), 0) < offsetof(MSTempLogHeadRec, th_reserved_4)) {
193
CS_SET_DISK_4(head.th_magic_4, MS_TEMP_LOG_MAGIC);
194
CS_SET_DISK_2(head.th_version_2, MS_TEMP_LOG_VERSION);
195
CS_SET_DISK_2(head.th_head_size_2, MS_TEMP_LOG_HEAD_SIZE);
196
CS_SET_DISK_2(head.th_rec_size_2, sizeof(MSTempLogItemRec));
197
CS_SET_DISK_4(head.th_reserved_4, 0);
198
fh->write(&head, 0, sizeof(MSTempLogHeadRec));
202
/* Check the file header: */
203
if (CS_GET_DISK_4(head.th_magic_4) != MS_TEMP_LOG_MAGIC)
204
CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
205
// Only versions newer than MS_TEMP_LOG_VERSION are rejected.
if (CS_GET_DISK_2(head.th_version_2) > MS_TEMP_LOG_VERSION)
206
CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
208
/* Load the header details: */
209
myTempLogHeadSize = CS_GET_DISK_2(head.th_head_size_2);
210
myTemplogRecSize = CS_GET_DISK_2(head.th_rec_size_2);
212
/* File size, cannot be less than header size, adjust to correct offset: */
213
if (myTempLogSize < myTempLogHeadSize)
214
myTempLogSize = myTempLogHeadSize;
215
// Round the size up so that the data after the header is a whole number
// of records.
if ((rem = (myTempLogSize - myTempLogHeadSize) % myTemplogRecSize))
216
myTempLogSize += myTemplogRecSize - rem;
218
unlock_(iLogDatabase->myTempLogArray);
224
// Derives a wait time from a record time stamp 'then' and the current time
// 'now', relative to PBMSParameters::getTempBlobTimeout().
// NOTE(review): the declaration of 'wait', the bodies of the remaining
// branches and the return statement are not visible in this excerpt.
time_t MSTempLog::adjustWaitTime(time_t then, time_t now)
228
// Timeout not yet reached: the remaining time is scaled by 1000
// (presumably seconds to milliseconds -- TODO confirm).
if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
229
wait = ((then + PBMSParameters::getTempBlobTimeout() - now) * 1000);
232
// 120 * 1000 looks like an upper bound on the wait -- TODO confirm against
// the branch body.
else if (wait > 120 * 1000)
242
* ---------------------------------------------------------------
246
// Constructor: forwards 'wait_time' (with a NULL second argument) to the
// CSDaemon base and stores the database this thread works on.
// NOTE(review): the remaining member initializers are not visible in this
// excerpt.
MSTempLogThread::MSTempLogThread(time_t wait_time, MSDatabase *db):
247
CSDaemon(wait_time, NULL),
248
iTempLogDatabase(db),
256
// Calls release() on the temp log file handle held by this thread.
// NOTE(review): any null check or reset of iTempLogFile is not visible in
// this excerpt.
void MSTempLogThread::close()
259
iTempLogFile->release();
264
// Work function of the temp log thread. It reads the temp log one record at
// a time; for a record whose timeout has passed it calls checkBlob() on the
// repository file or open table named by the record, or deleteReferences()
// for a table reference, and it deletes a log once a later log exists.
// Errors raised while handling a record are examined by error code.
// NOTE(review): many lines of this function (local declarations, try/catch
// scaffolding, braces, break/return statements) are not visible in this
// excerpt; the comments describe only the statements that are shown.
bool MSTempLogThread::doWork()
267
MSTempLogItemRec log_item;
268
CSStringBuffer *buffer;
269
// NOTE(review): the use of txn_log is not visible in this excerpt.
SearchTXNLog txn_log(iTempLogDatabase->myDatabaseID, MSTransactionManager::tm_Log);
272
new_(buffer, CSStringBuffer(20));
274
// Keep going until the thread is asked to quit.
while (!myMustQuit) {
277
// Obtain a temp log file handle together with its record and header sizes.
if (!(iTempLogFile = iTempLogDatabase->openTempLogFile(0, &iLogRecSize, &head_size))) {
281
// Records start directly after the file header.
iLogOffset = head_size;
284
// Read one record at the current offset.
tfer = iTempLogFile->read(&log_item, iLogOffset, sizeof(MSTempLogItemRec), 0);
286
/* No more data to be read: */
288
/* Check to see if there is a log after this: */
289
if (iTempLogDatabase->getTempLogCount() <= 1) {
290
/* The next log does not yet exist. We wait for
291
* it to be created before we delete and
292
* close the current log.
294
myWaitTime = PBMSParameters::getTempBlobTimeout() * 1000;
298
iTempLogFile->myTempLog->deleteLog();
299
iTempLogDatabase->removeTempLog(iTempLogFile->myTempLogID);
302
else if (tfer == sizeof(MSTempLogItemRec)) {
303
/* We have a record: */
311
CLOBBER_PROTECT(blob_id);
313
* Items in the temp log are never updated.
314
* If a temp operation is canceled then the object
315
* records this itself and when the temp operation
316
* is attempted it will recognize by the templog
317
* id and offset that it is no longer a valid
320
tab_id = CS_GET_DISK_4(log_item.ti_table_id_4);
322
type = CS_GET_DISK_1(log_item.ti_type_1);
323
blob_id = CS_GET_DISK_6(log_item.ti_blob_id_6);
324
auth_code = CS_GET_DISK_4(log_item.ti_auth_code_4);
325
then = CS_GET_DISK_4(log_item.ti_time_4);
328
if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
329
/* Time has not yet expired, adjust wait time: */
330
myWaitTime = MSTempLog::adjustWaitTime(then, now);
335
/* Release the BLOB reference. */
338
// Repository reference: the check is done through a repository file taken
// from the pool and handed back afterwards.
if (type == MS_TL_REPO_REF) {
339
MSRepoFile *repo_file;
341
if ((repo_file = iTempLogDatabase->getRepoFileFromPool(tab_id, true))) {
342
frompool_(repo_file);
343
repo_file->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
344
backtopool_(repo_file);
348
// Look up the open table for this record's table id.
if ((otab = MSTableList::getOpenTableByID(iTempLogDatabase->myDatabaseID, tab_id))) {
350
if (type == MS_TL_BLOB_REF) {
351
otab->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
355
ASSERT(type == MS_TL_TABLE_REF);
356
// The table file is removed only when deleteReferences() reports success.
if ((type == MS_TL_TABLE_REF) && otab->deleteReferences(iTempLogFile->myTempLogID, iLogOffset, &myMustQuit)) {
357
/* Delete the file now... */
360
MSOpenTablePool *tab_pool;
362
tab = otab->getDBTable();
363
from_path = otab->getDBTable()->getTableFile();
371
tab_pool = MSTableList::lockTablePoolForDeletion(otab); // This returns otab to the pool.
374
from_path->removeFile();
375
tab->myDatabase->removeTable(tab);
377
backtopool_(tab_pool); // This will unlock and close the table pool freeing all tables in it.
378
pop_(tab); // Returning the pool will have released this. (YUK!)
388
// Error handling: the reaction depends on the error code of the exception
// recorded on 'self'.
int err = self->myException.getErrorCode();
390
if (err == MS_ERR_TABLE_LOCKED) {
393
else if (err == MS_ERR_REMOVING_REPO) {
394
/* Wait for the compactor to finish: */
395
myWaitTime = 2 * 1000;
399
else if ((err == MS_ERR_UNKNOWN_TABLE) || (err == MS_ERR_DATABASE_DELETED))
402
self->myException.log(NULL);
407
// Only part of the data read, don't wait very long to try again:
408
myWaitTime = 2 * 1000;
411
// Advance to the next record in the log.
iLogOffset += iLogRecSize;
418
void *MSTempLogThread::completeWork()