~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/temp_log_ms.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-07-03
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * Network interface.
 
27
 *
 
28
 */
 
29
 
 
30
#include "cslib/CSConfig.h"
 
31
 
 
32
#include <stddef.h>
 
33
 
 
34
#include "cslib/CSGlobal.h"
 
35
#include "cslib/CSStrUtil.h"
 
36
#include "cslib/CSStorage.h"
 
37
 
 
38
#include "temp_log_ms.h"
 
39
#include "open_table_ms.h"
 
40
#include "trans_log_ms.h"
 
41
#include "transaction_ms.h"
 
42
#include "parameters_ms.h"
 
43
 
 
44
 
 
45
// Search the transaction log for a MS_ReferenceTxn record for the given BLOB.
 
46
// Just search the log file and not the cache. Seaching the cache may be faster but
 
47
// it would require locks that could block the writers or reader threads and in the worse
 
48
// case it will still require the reading of the log anyway.
 
49
//
 
50
// This search doesn't distinguish between transactions that are still running and
 
51
// transactions that are rolled back.
 
52
class SearchTXNLog : ReadTXNLog {
 
53
        public:
 
54
        SearchTXNLog(uint32_t db_id, MSTrans *log): ReadTXNLog(log), st_db_id(db_id) {}
 
55
        
 
56
        bool    st_found;
 
57
        bool    st_terminated;
 
58
        bool    st_commited;
 
59
        uint32_t st_tid; 
 
60
        uint32_t st_db_id; 
 
61
        uint32_t st_tab_id; 
 
62
        uint64_t st_blob_id;
 
63
        
 
64
        virtual bool rl_CanContinue() { return ((!st_found) || !st_terminated);}
 
65
        virtual void rl_Load(uint64_t log_position, MSTransPtr rec) 
 
66
        {
 
67
                (void) log_position;
 
68
                
 
69
                if ( !st_found && (TRANS_TYPE(rec->tr_type) != MS_ReferenceTxn))
 
70
                        return;
 
71
                
 
72
                if (!st_found) {
 
73
                        if  ((rec->tr_db_id == st_db_id) && (rec->tr_tab_id == st_tab_id) && (rec->tr_blob_id == st_blob_id)) {
 
74
                                st_found = true;
 
75
                                st_tid = rec->tr_id;
 
76
                        } else
 
77
                                return;
 
78
                }
 
79
                st_terminated = TRANS_IS_TERMINATED(rec->tr_type);
 
80
                if (st_terminated)
 
81
                        st_commited = (TRANS_IS_AUTOCOMMIT(rec->tr_type) || (TRANS_TYPE(rec->tr_type) == MS_CommitTxn));
 
82
        }
 
83
        
 
84
        bool st_FindBlobRef(bool *committed, uint32_t tab_id, uint64_t blob_id)
 
85
        {
 
86
                enter_();
 
87
                st_found = st_terminated = st_commited = false;
 
88
                st_tab_id = tab_id;
 
89
                st_blob_id = blob_id;   
 
90
                
 
91
                rl_ReadLog(rl_log->txn_GetStartPosition(), false);
 
92
                *committed = st_commited;
 
93
                return_(st_found);
 
94
        }
 
95
};
 
96
 
 
97
/* Construct an unattached temp log file handle: log ID 0 and no owning MSTempLog. */
MSTempLogFile::MSTempLogFile()
	: CSReadBufferedFile()
	, myTempLogID(0)
	, myTempLog(NULL)
{
}
 
103
 
 
104
/* Close the file, then drop the reference held on the owning temp log (if any). */
MSTempLogFile::~MSTempLogFile()
{
	close();
	if (!myTempLog)
		return;
	myTempLog->release();
}
 
110
 
 
111
MSTempLogFile *MSTempLogFile::newTempLogFile(uint32_t id, MSTempLog *temp_log, CSFile *file)
 
112
{
 
113
        MSTempLogFile *f;
 
114
        
 
115
        if (!(f = new MSTempLogFile())) {
 
116
                temp_log->release();
 
117
                file->release();
 
118
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
119
        }
 
120
        f->myTempLogID = id;
 
121
        f->myTempLog = temp_log;
 
122
        f->myFile = file;
 
123
        f->myFilePath = file->myFilePath;
 
124
        f->myFilePath->retain();
 
125
        return f;
 
126
}
 
127
 
 
128
/*
 * Describe temp log 'id' of database 'db'. 'file_size' is the currently known
 * size of the log file; the header and record sizes stay 0 until the header
 * has been loaded by openTempLog().
 */
MSTempLog::MSTempLog(uint32_t id, MSDatabase *db, off64_t file_size)
	: CSRefObject()
	, myLogID(id)
	, myTempLogSize(file_size)
	, myTemplogRecSize(0)
	, myTempLogHeadSize(0)
	, iLogDatabase(db)
	, iDeleteLog(false)
{
}
 
138
 
 
139
MSTempLog::~MSTempLog()
 
140
{
 
141
        enter_();
 
142
        if (iDeleteLog) {
 
143
                CSPath *path;
 
144
 
 
145
                path = getLogPath();
 
146
                push_(path);
 
147
                path->removeFile();
 
148
                release_(path);
 
149
        }
 
150
        exit_();
 
151
}
 
152
 
 
153
void MSTempLog::deleteLog()
 
154
{
 
155
        iDeleteLog = true;
 
156
}
 
157
 
 
158
CSPath *MSTempLog::getLogPath()
 
159
{
 
160
        char file_name[120];
 
161
 
 
162
        cs_strcpy(120, file_name, "bs-logs");
 
163
        cs_add_dir_char(120, file_name);
 
164
        cs_strcat(120, file_name, "temp-");
 
165
        cs_strcat(120, file_name, myLogID);
 
166
        cs_strcat(120, file_name, ".bs");
 
167
        return CSPath::newPath(RETAIN(iLogDatabase->myDatabasePath), file_name);
 
168
}
 
169
 
 
170
MSTempLogFile *MSTempLog::openTempLog()
 
171
{
 
172
        CSPath                  *path;
 
173
        MSTempLogFile   *fh;
 
174
 
 
175
        enter_();
 
176
        path = getLogPath();
 
177
        retain();
 
178
        fh = MSTempLogFile::newTempLogFile(myLogID, this, CSFile::newFile(path));
 
179
        push_(fh);
 
180
        if (myTempLogSize)
 
181
                fh->open(CSFile::DEFAULT);
 
182
        else
 
183
                fh->open(CSFile::CREATE);
 
184
        if (!myTempLogHeadSize) {
 
185
                MSTempLogHeadRec        head;
 
186
 
 
187
                lock_(iLogDatabase->myTempLogArray);
 
188
                /* Check again after locking: */
 
189
                if (!myTempLogHeadSize) {
 
190
                        size_t rem;
 
191
 
 
192
                        if (fh->read(&head, 0, offsetof(MSTempLogHeadRec, th_reserved_4), 0) < offsetof(MSTempLogHeadRec, th_reserved_4)) {
 
193
                                CS_SET_DISK_4(head.th_magic_4, MS_TEMP_LOG_MAGIC);
 
194
                                CS_SET_DISK_2(head.th_version_2, MS_TEMP_LOG_VERSION);
 
195
                                CS_SET_DISK_2(head.th_head_size_2, MS_TEMP_LOG_HEAD_SIZE);
 
196
                                CS_SET_DISK_2(head.th_rec_size_2, sizeof(MSTempLogItemRec));
 
197
                                CS_SET_DISK_4(head.th_reserved_4, 0);
 
198
                                fh->write(&head, 0, sizeof(MSTempLogHeadRec));
 
199
                                fh->flush();
 
200
                        }
 
201
                        
 
202
                        /* Check the file header: */
 
203
                        if (CS_GET_DISK_4(head.th_magic_4) != MS_TEMP_LOG_MAGIC)
 
204
                                CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
 
205
                        if (CS_GET_DISK_2(head.th_version_2) > MS_TEMP_LOG_VERSION)
 
206
                                CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
 
207
 
 
208
                        /* Load the header details: */
 
209
                        myTempLogHeadSize = CS_GET_DISK_2(head.th_head_size_2);
 
210
                        myTemplogRecSize = CS_GET_DISK_2(head.th_rec_size_2);
 
211
 
 
212
                        /* File size, cannot be less than header size, adjust to correct offset: */
 
213
                        if (myTempLogSize < myTempLogHeadSize)
 
214
                                myTempLogSize = myTempLogHeadSize;
 
215
                        if ((rem = (myTempLogSize - myTempLogHeadSize) % myTemplogRecSize))
 
216
                                myTempLogSize += myTemplogRecSize - rem;
 
217
                }
 
218
                unlock_(iLogDatabase->myTempLogArray);
 
219
        }
 
220
        pop_(fh);
 
221
        return_(fh);
 
222
}
 
223
 
 
224
time_t MSTempLog::adjustWaitTime(time_t then, time_t now)
 
225
{
 
226
        time_t wait;
 
227
 
 
228
        if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
 
229
                wait = ((then + PBMSParameters::getTempBlobTimeout() - now) * 1000);
 
230
                if (wait < 2000)
 
231
                        wait = 2000;
 
232
                else if (wait > 120 * 1000)
 
233
                        wait = 120 * 1000;
 
234
        }
 
235
        else
 
236
                wait = 1;
 
237
                        
 
238
        return wait;
 
239
}
 
240
 
 
241
/*
 
242
 * ---------------------------------------------------------------
 
243
 * TEMP LOG THREAD
 
244
 */
 
245
 
 
246
/*
 * Create the temp log daemon for database 'db' with the given initial wait
 * time. No log file is open yet; doWork() opens one on demand.
 */
MSTempLogThread::MSTempLogThread(time_t wait_time, MSDatabase *db)
	: CSDaemon(wait_time, NULL)
	, iTempLogDatabase(db)
	, iTempLogFile(NULL)
	, iLogRecSize(0)
	, iLogOffset(0)
{
}
 
254
 
 
255
 
 
256
void MSTempLogThread::close()
 
257
{
 
258
        if (iTempLogFile) {
 
259
                iTempLogFile->release();
 
260
                iTempLogFile = NULL;
 
261
        }
 
262
}
 
263
 
 
264
bool MSTempLogThread::doWork()
 
265
{
 
266
        size_t                          tfer;
 
267
        MSTempLogItemRec        log_item;
 
268
        CSStringBuffer          *buffer;
 
269
        SearchTXNLog            txn_log(iTempLogDatabase->myDatabaseID, MSTransactionManager::tm_Log);
 
270
 
 
271
        enter_();
 
272
        new_(buffer, CSStringBuffer(20));
 
273
        push_(buffer);
 
274
        while (!myMustQuit) {
 
275
                if (!iTempLogFile) {
 
276
                        size_t head_size;
 
277
                        if (!(iTempLogFile = iTempLogDatabase->openTempLogFile(0, &iLogRecSize, &head_size))) {
 
278
                                release_(buffer);
 
279
                                return_(true);
 
280
                        }
 
281
                        iLogOffset = head_size;
 
282
                }
 
283
 
 
284
                tfer = iTempLogFile->read(&log_item, iLogOffset, sizeof(MSTempLogItemRec), 0);
 
285
                if (tfer == 0) {
 
286
                        /* No more data to be read: */
 
287
 
 
288
                        /* Check to see if there is a log after this: */
 
289
                        if (iTempLogDatabase->getTempLogCount() <= 1) {
 
290
                                /* The next log does not yet exist. We wait for
 
291
                                 * it to be created before we delete and
 
292
                                 * close the current log.
 
293
                                 */
 
294
                                myWaitTime = PBMSParameters::getTempBlobTimeout() * 1000;
 
295
                                break;
 
296
                        }
 
297
 
 
298
                        iTempLogFile->myTempLog->deleteLog();
 
299
                        iTempLogDatabase->removeTempLog(iTempLogFile->myTempLogID);
 
300
                        close();
 
301
                }
 
302
                else if (tfer == sizeof(MSTempLogItemRec)) {
 
303
                        /* We have a record: */
 
304
                        int             type;
 
305
                        uint32_t tab_id;
 
306
                        uint64_t blob_id= 0;
 
307
                        uint32_t auth_code;
 
308
                        uint32_t then;
 
309
                        time_t  now;
 
310
 
 
311
                        CLOBBER_PROTECT(blob_id);
 
312
                        /*
 
313
                         * Items in the temp log are never updated.
 
314
                         * If a temp operation is canceled then the object 
 
315
                         * records this itself and when the temp operation 
 
316
                         * is attempted it will recognize by the templog
 
317
                         * id and offset that it is no longer a valid 
 
318
                         * operation.
 
319
                         */
 
320
                        tab_id = CS_GET_DISK_4(log_item.ti_table_id_4);
 
321
                                
 
322
                        type = CS_GET_DISK_1(log_item.ti_type_1);
 
323
                        blob_id = CS_GET_DISK_6(log_item.ti_blob_id_6);
 
324
                        auth_code = CS_GET_DISK_4(log_item.ti_auth_code_4);
 
325
                        then = CS_GET_DISK_4(log_item.ti_time_4);
 
326
 
 
327
                        now = time(NULL);
 
328
                        if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
 
329
                                /* Time has not yet exired, adjust wait time: */
 
330
                                myWaitTime = MSTempLog::adjustWaitTime(then, now);
 
331
                                break;
 
332
                        }
 
333
 
 
334
                        try_(a) {
 
335
                                /* Release the BLOB reference. */
 
336
                                MSOpenTable *otab;
 
337
 
 
338
                                if (type == MS_TL_REPO_REF) {
 
339
                                        MSRepoFile      *repo_file;
 
340
 
 
341
                                        if ((repo_file = iTempLogDatabase->getRepoFileFromPool(tab_id, true))) {
 
342
                                                frompool_(repo_file);
 
343
                                                repo_file->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
 
344
                                                backtopool_(repo_file);
 
345
                                        }
 
346
                                }
 
347
                                else {
 
348
                                        if ((otab = MSTableList::getOpenTableByID(iTempLogDatabase->myDatabaseID, tab_id))) {
 
349
                                                frompool_(otab);
 
350
                                                if (type == MS_TL_BLOB_REF) {
 
351
                                                        otab->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
 
352
                                                        backtopool_(otab);
 
353
                                                }
 
354
                                                else {
 
355
                                                        ASSERT(type == MS_TL_TABLE_REF);
 
356
                                                        if ((type == MS_TL_TABLE_REF) && otab->deleteReferences(iTempLogFile->myTempLogID, iLogOffset, &myMustQuit)) {
 
357
                                                                /* Delete the file now... */
 
358
                                                                MSTable                 *tab;
 
359
                                                                CSPath                  *from_path;
 
360
                                                                MSOpenTablePool *tab_pool;
 
361
 
 
362
                                                                tab = otab->getDBTable();
 
363
                                                                from_path = otab->getDBTable()->getTableFile();
 
364
 
 
365
                                                                pop_(otab);
 
366
 
 
367
                                                                push_(from_path);
 
368
                                                                tab->retain();
 
369
                                                                push_(tab);
 
370
 
 
371
                                                                tab_pool = MSTableList::lockTablePoolForDeletion(otab); // This returns otab to the pool.
 
372
                                                                frompool_(tab_pool);
 
373
 
 
374
                                                                from_path->removeFile();
 
375
                                                                tab->myDatabase->removeTable(tab);
 
376
 
 
377
                                                                backtopool_(tab_pool); // The will unlock and close the table pool freeing all tables in it.
 
378
                                                                pop_(tab);                              // Returning the pool will have released this. (YUK!)
 
379
                                                                release_(from_path);
 
380
                                                        }
 
381
                                                        else 
 
382
                                                                backtopool_(otab);
 
383
                                                }
 
384
                                        }
 
385
                                }
 
386
                        }
 
387
                        catch_(a) {
 
388
                                int err = self->myException.getErrorCode();
 
389
                                
 
390
                                if (err == MS_ERR_TABLE_LOCKED) {
 
391
                                        throw_();
 
392
                                }
 
393
                                else if (err == MS_ERR_REMOVING_REPO) {
 
394
                                        /* Wait for the compactor to finish: */
 
395
                                        myWaitTime = 2 * 1000;
 
396
                                        release_(buffer);
 
397
                                        return_(true);
 
398
                                }
 
399
                                else if ((err == MS_ERR_UNKNOWN_TABLE) || (err == MS_ERR_DATABASE_DELETED))
 
400
                                        ;
 
401
                                else
 
402
                                        self->myException.log(NULL);
 
403
                        }
 
404
                        cont_(a);
 
405
                }
 
406
                else {
 
407
                        // Only part of the data read, don't wait very long to try again:
 
408
                        myWaitTime = 2 * 1000;
 
409
                        break;
 
410
                }
 
411
                iLogOffset += iLogRecSize;
 
412
        }
 
413
 
 
414
        release_(buffer);
 
415
        return_(true);
 
416
}
 
417
 
 
418
void *MSTempLogThread::completeWork()
 
419
{
 
420
        close();
 
421
        return NULL;
 
422
}
 
423