~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/database_ms.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-05-25
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * Network interface.
 
27
 *
 
28
 */
 
29
 
 
30
#ifdef DRIZZLED
 
31
#include "config.h"
 
32
#include <drizzled/common.h>
 
33
#include <drizzled/session.h>
 
34
#include <drizzled/table.h>
 
35
#include <drizzled/message/table.pb.h>
 
36
#include "drizzled/charset_info.h"
 
37
#include <drizzled/table_proto.h>
 
38
#include <drizzled/field.h>
 
39
#endif
 
40
 
 
41
#include "cslib/CSConfig.h"
 
42
 
 
43
#include <string.h>
 
44
#include <ctype.h>
 
45
#include <inttypes.h>
 
46
 
 
47
#include "string.h"
 
48
 
 
49
#include "cslib/CSGlobal.h"
 
50
#include "cslib/CSLog.h"
 
51
#include "cslib/CSDirectory.h"
 
52
#include "cslib/CSStrUtil.h"
 
53
 
 
54
#include "database_ms.h"
 
55
#include "open_table_ms.h"
 
56
#include "backup_ms.h"
 
57
#include "table_ms.h"
 
58
#include "temp_log_ms.h"
 
59
#include "network_ms.h"
 
60
#include "mysql_ms.h"
 
61
#include "pbmslib.h"
 
62
#include "transaction_ms.h"
 
63
//#include "systab_variable_ms.h"
 
64
#include "systab_httpheader_ms.h"
 
65
#include "parameters_ms.h"
 
66
#include "pbmsdaemon_ms.h"
 
67
 
 
68
 
 
69
 
 
70
CSSyncSortedList        *MSDatabase::gDatabaseList;
 
71
CSSparseArray           *MSDatabase::gDatabaseArray;
 
72
/*
 
73
 * -------------------------------------------------------------------------
 
74
 * PBMS DATABASES
 
75
 */
 
76
 
 
77
/*
 * Construct an empty database object: every pointer member starts out
 * NULL, every flag false and every counter/ID zero. The blob storage
 * type defaults to MS_STANDARD_STORAGE.
 */
MSDatabase::MSDatabase()
	: myIsPBMS(false),
	  myDatabaseID(0),
	  myDatabaseName(NULL),
	  myDatabasePath(NULL),
	  myTempLogArray(NULL),
	  myCompactorThread(NULL),
	  myTempLogThread(NULL),
	  myRepostoryList(NULL),
	  myBlobCloud(NULL),
	  myBlobType(MS_STANDARD_STORAGE),
	  isBackup(false),
	  iBackupThread(NULL),
	  iBackupTime(0),
	  iRecovering(false),
#ifdef HAVE_ALIAS_SUPPORT
	  iBlobAliases(NULL),
#endif
	  iClosing(false),
	  iTableList(NULL),
	  iTableArray(NULL),
	  iMaxTableID(0),
	  iWriteTempLog(NULL),
	  iDropping(false),
	  iNextBlobRefId(0)
{
	//startTracking();
}
 
105
 
 
106
MSDatabase::~MSDatabase()
 
107
{
 
108
        iClosing = true;
 
109
        if (iBackupThread) {
 
110
                iBackupThread->stop();
 
111
                iBackupThread->release();
 
112
                iBackupThread = NULL;
 
113
        }
 
114
        
 
115
        if (myTempLogThread) {
 
116
                myTempLogThread->stop();
 
117
                myTempLogThread->release();
 
118
                myTempLogThread = NULL;
 
119
        }
 
120
        if (myCompactorThread) {
 
121
                myRepostoryList->wakeup(); // The compator thread waits on this.
 
122
                myCompactorThread->stop();
 
123
                myCompactorThread->release();
 
124
                myCompactorThread = NULL;
 
125
        }
 
126
        
 
127
        if (myDatabasePath)
 
128
                myDatabasePath->release();
 
129
                
 
130
        if (myDatabaseName)
 
131
                myDatabaseName->release();
 
132
                
 
133
        iWriteTempLog = NULL;
 
134
        if (myTempLogArray) {
 
135
                myTempLogArray->clear();
 
136
                myTempLogArray->release();
 
137
        }
 
138
        if (iTableList) {
 
139
                iTableList->clear();
 
140
                iTableList->release();
 
141
        }
 
142
        if (iTableArray){
 
143
                iTableArray->clear();
 
144
                iTableArray->release();
 
145
        }
 
146
        if (myRepostoryList) {
 
147
                myRepostoryList->clear();
 
148
                myRepostoryList->release();
 
149
        }
 
150
#ifdef HAVE_ALIAS_SUPPORT
 
151
        if (iBlobAliases) {
 
152
                iBlobAliases->ma_close();
 
153
                iBlobAliases->release();
 
154
        }
 
155
#endif
 
156
        if (myBlobCloud) {
 
157
                myBlobCloud->release();
 
158
        }
 
159
}
 
160
 
 
161
uint32_t MSDatabase::fileToTableId(const char *file_name, const char *name_part)
 
162
{
 
163
        uint32_t value = 0;
 
164
 
 
165
        if (file_name) {
 
166
                const char *num = file_name +  strlen(file_name) - 1;
 
167
                
 
168
                while (num >= file_name && *num != '-')
 
169
                        num--;
 
170
                if (name_part) {
 
171
                        /* Check the name part of the file: */
 
172
                        int len = strlen(name_part);
 
173
                        
 
174
                        if (len != num - file_name)
 
175
                                return 0;
 
176
                        if (strncmp(file_name, name_part, len) != 0)
 
177
                                return 0;
 
178
                }
 
179
                num++;
 
180
                if (isdigit(*num))
 
181
                        sscanf(num, "%"PRIu32"", &value);
 
182
        }
 
183
        return value;
 
184
}
 
185
 
 
186
const char *MSDatabase::fileToTableName(size_t size, char *tab_name, const char *file_name)
 
187
{
 
188
        const char      *cptr;
 
189
        size_t          len;
 
190
 
 
191
        file_name = cs_last_name_of_path(file_name);
 
192
        cptr = file_name + strlen(file_name) - 1;
 
193
        while (cptr > file_name && *cptr != '.')
 
194
                cptr--;
 
195
        if (cptr > file_name && *cptr == '.') {
 
196
                if (strncmp(cptr, ".bs", 2) == 0) {
 
197
                        cptr--;
 
198
                        while (cptr > file_name && isdigit(*cptr))
 
199
                                cptr--;
 
200
                }
 
201
        }
 
202
 
 
203
        len = cptr - file_name;
 
204
        if (len > size-1)
 
205
                len = size-1;
 
206
 
 
207
        memcpy(tab_name, file_name, len);
 
208
        tab_name[len] = 0;
 
209
 
 
210
        /* Return a pointer to what was removed! */
 
211
        return file_name + len;
 
212
}
 
213
 
 
214
 
 
215
const char *MSDatabase::getDatabaseNameCString()
 
216
{
 
217
        return myDatabaseName->getCString();
 
218
}
 
219
 
 
220
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
 
221
{
 
222
        MSTable *tab;
 
223
        
 
224
        enter_();
 
225
        push_(tab_name);
 
226
        lock_(iTableList);
 
227
        if (!(tab = (MSTable *) iTableList->find(tab_name))) {
 
228
 
 
229
                if (create) {
 
230
                        /* Create a new table: */
 
231
                        tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off64_t) 0, false);
 
232
                        iTableList->add(tab);
 
233
                        iTableArray->set(iMaxTableID+1, RETAIN(tab));
 
234
                        iMaxTableID++;
 
235
                }
 
236
        }
 
237
        if (tab)
 
238
                tab->retain();
 
239
        unlock_(iTableList);
 
240
        release_(tab_name);
 
241
        return_(tab);
 
242
}
 
243
 
 
244
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
 
245
{
 
246
        return getTable(CSString::newString(tab_name), create);
 
247
}
 
248
 
 
249
 
 
250
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
 
251
{
 
252
        MSTable *tab;
 
253
        
 
254
        enter_();
 
255
        lock_(iTableList);
 
256
        if (!(tab = (MSTable *) iTableArray->get((uint32_t) tab_id))) {
 
257
                if (missing_ok) {
 
258
                        unlock_(iTableList);
 
259
                        return_(NULL);
 
260
                }
 
261
                char buffer[CS_EXC_MESSAGE_SIZE];
 
262
 
 
263
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
 
264
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) tab_id);
 
265
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
 
266
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
 
267
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
 
268
        }
 
269
        tab->retain();
 
270
        unlock_(iTableList);
 
271
        return_(tab);
 
272
}
 
273
 
 
274
MSTable *MSDatabase::getNextTable(uint32_t *pos)
 
275
{
 
276
        uint32_t i = *pos;
 
277
        MSTable *tab = NULL;
 
278
        
 
279
        enter_();
 
280
        lock_(iTableList);
 
281
        while (i < iTableList->getSize()) {
 
282
                tab = (MSTable *) iTableList->itemAt(i++);
 
283
                if (!tab->isToDelete())
 
284
                        break;
 
285
                tab = NULL;
 
286
        }
 
287
        if (tab)
 
288
                tab->retain();
 
289
        unlock_(iTableList);
 
290
        *pos = i;
 
291
        return_(tab);
 
292
}
 
293
 
 
294
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off64_t file_size, bool to_delete)
 
295
{
 
296
        MSTable *tab;
 
297
 
 
298
        if (tab_id > iMaxTableID)
 
299
                iMaxTableID = tab_id;
 
300
        tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
 
301
        iTableList->add(tab);
 
302
        iTableArray->set(tab_id, RETAIN(tab));
 
303
}
 
304
 
 
305
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
 
306
{
 
307
        off64_t file_size;
 
308
        uint32_t        file_id;
 
309
        char    tab_name[MS_TABLE_NAME_SIZE];
 
310
 
 
311
        dir->info(NULL, &file_size, NULL);
 
312
        file_id = fileToTableId(file_name);
 
313
        fileToTableName(MS_TABLE_NAME_SIZE, tab_name, file_name);
 
314
        addTable(file_id, tab_name, file_size, to_delete);
 
315
}
 
316
 
 
317
void MSDatabase::removeTable(MSTable *tab)
 
318
{
 
319
        enter_();
 
320
        push_(tab);
 
321
        lock_(iTableList);
 
322
        iTableList->remove(tab->myTableName);
 
323
        iTableArray->remove(tab->myTableID);
 
324
        unlock_(iTableList);
 
325
        release_(tab);
 
326
        exit_();
 
327
}
 
328
 
 
329
void MSDatabase::dropTable(MSTable *tab)
 
330
{
 
331
        enter_();
 
332
        push_(tab);
 
333
        lock_(iTableList);
 
334
        iTableList->remove(tab->myTableName);
 
335
        iTableArray->remove(tab->myTableID);
 
336
 
 
337
        // Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
 
338
        addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
 
339
 
 
340
        unlock_(iTableList);
 
341
        release_(tab);
 
342
        exit_();
 
343
}
 
344
 
 
345
// This function is used when dropping tables from a database before
 
346
// dropping the database itself. 
 
347
CSString *MSDatabase::getATableName()
 
348
{
 
349
        uint32_t i = 0;
 
350
        MSTable *tab;
 
351
        CSString *name = NULL;
 
352
        
 
353
        enter_();
 
354
        lock_(iTableList);
 
355
 
 
356
        while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
 
357
        if (tab) {
 
358
                name = tab->getTableName();
 
359
                name->retain();
 
360
        }
 
361
        unlock_(iTableList);
 
362
        return_(name);
 
363
}
 
364
 
 
365
uint32_t MSDatabase::getTableCount()
 
366
{
 
367
        uint32_t cnt = 0, i = 0;
 
368
        MSTable *tab;
 
369
        
 
370
        enter_();
 
371
        lock_(iTableList);
 
372
 
 
373
        while ((tab = (MSTable *) iTableList->itemAt(i++))) {
 
374
                if (!tab->isToDelete())
 
375
                        cnt++;
 
376
        }
 
377
 
 
378
        unlock_(iTableList);
 
379
        return_(cnt);
 
380
}
 
381
 
 
382
 
 
383
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
 
384
{
 
385
        enter_();
 
386
        lock_(iTableList);
 
387
        iTableList->remove(tab->myTableName);
 
388
        iTableArray->remove(tab->myTableID);
 
389
 
 
390
        addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
 
391
 
 
392
        unlock_(iTableList);
 
393
        exit_();
 
394
}
 
395
 
 
396
void MSDatabase::openWriteRepo(MSOpenTable *otab)
 
397
{
 
398
        if (otab->myWriteRepo && otab->myWriteRepoFile)
 
399
                return;
 
400
 
 
401
        enter_();
 
402
        if (!otab->myWriteRepo)
 
403
                otab->myWriteRepo = lockRepo(0);
 
404
 
 
405
        /* Now open the repo file for the open table: */
 
406
        otab->myWriteRepo->openRepoFileForWriting(otab);
 
407
        exit_();
 
408
}
 
409
 
 
410
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
 
411
{
 
412
        MSRepository    *repo = NULL;
 
413
        time_t                  wait_time = 0;
 
414
        
 
415
        if (ret_wait_time)
 
416
                wait_time = *ret_wait_time;
 
417
        enter_();
 
418
        lock_(myRepostoryList);
 
419
        for (uint32_t i=0; i<myRepostoryList->size(); i++) {
 
420
                retry:
 
421
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
 
422
                        if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
 
423
                                if (!repo->myRepoHeadSize) {
 
424
                                        /* The file has not yet been opened, so the
 
425
                                         * garbage count will not be known!
 
426
                                         */
 
427
                                        MSRepoFile *repo_file;
 
428
 
 
429
                                        repo->retain();
 
430
                                        unlock_(myRepostoryList);
 
431
                                        push_(repo);
 
432
                                        repo_file = repo->openRepoFile();
 
433
                                        repo_file->release();
 
434
                                        release_(repo);
 
435
                                        lock_(myRepostoryList);
 
436
                                        goto retry;
 
437
                                }
 
438
                                if (repo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
 
439
                                        /* Make sure there are not temp BLOBs in this repository that have
 
440
                                         * not yet timed out:
 
441
                                         */
 
442
                                        time_t now = time(NULL);
 
443
                                        time_t then = repo->myLastTempTime;
 
444
 
 
445
                                        /* Check if there are any temp BLOBs to be removed: */
 
446
                                        if (now > (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
 
447
                                                repo->lockRepo(REPO_COMPACTING); 
 
448
                                                repo->retain();
 
449
                                                break;
 
450
                                        }
 
451
                                        else {
 
452
                                                /* There are temp BLOBs to wait for... */
 
453
                                                if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
 
454
                                                        wait_time = MSTempLog::adjustWaitTime(then, now);
 
455
                                        }
 
456
                                }
 
457
                        }
 
458
                        repo = NULL;
 
459
                }
 
460
        }
 
461
        unlock_(myRepostoryList);
 
462
        if (ret_wait_time)
 
463
                *ret_wait_time = wait_time;
 
464
        return_(repo);
 
465
}
 
466
 
 
467
MSRepository *MSDatabase::lockRepo(off64_t size)
 
468
{
 
469
        MSRepository    *repo;
 
470
        uint32_t                        free_slot;
 
471
 
 
472
        enter_();
 
473
        lock_(myRepostoryList);
 
474
        free_slot = myRepostoryList->size();
 
475
        /* Find an unlocked repository file that is below the write threshold: */
 
476
        for (uint32_t i=0; i<myRepostoryList->size(); i++) {
 
477
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
 
478
                        if ((!repo->isRepoLocked()) && (!repo->isRemovingFP) && (!repo->mustBeDeleted) &&
 
479
                                ((repo->myRepoFileSize + size) < PBMSParameters::getRepoThreshold()) 
 
480
                                /**/ && (repo->getGarbageLevel() < PBMSParameters::getGarbageThreshold()))
 
481
                                goto found1;
 
482
                }
 
483
                else {
 
484
                        if (i < free_slot)
 
485
                                free_slot = i;
 
486
                }
 
487
        }
 
488
 
 
489
        /* None found, create a new repo file: */
 
490
        new_(repo, MSRepository(free_slot + 1, this, 0));
 
491
        myRepostoryList->set(free_slot, repo);
 
492
 
 
493
        found1:
 
494
        repo->retain();
 
495
        repo->lockRepo(REPO_WRITE);  // <- The MSRepository::backToPool() will unlock this.
 
496
        unlock_(myRepostoryList);
 
497
        return_(repo);
 
498
}
 
499
 
 
500
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
 
501
{
 
502
        MSRepository    *repo;
 
503
        MSRepoFile              *file;
 
504
 
 
505
        enter_();
 
506
        lock_(myRepostoryList);
 
507
        if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
 
508
                if (!missing_ok) {
 
509
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
510
 
 
511
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
 
512
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
 
513
                        CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
 
514
                }
 
515
                unlock_(myRepostoryList);
 
516
                return_(NULL);
 
517
        }
 
518
        if (repo->isRemovingFP) {
 
519
                char buffer[CS_EXC_MESSAGE_SIZE];
 
520
 
 
521
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
 
522
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
 
523
                CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
 
524
        }
 
525
        repo->retain(); /* Release is here: [++] */
 
526
        file = repo->getRepoFile();
 
527
        unlock_(myRepostoryList);
 
528
 
 
529
        if (!file) {
 
530
                file = repo->openRepoFile();
 
531
                lock_(myRepostoryList);
 
532
                repo->addRepoFile(file);
 
533
                file->retain();
 
534
                unlock_(myRepostoryList);
 
535
        }
 
536
        return_(file);
 
537
}
 
538
 
 
539
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
 
540
{
 
541
        MSRepository    *repo;
 
542
 
 
543
        enter_();
 
544
        lock_(myRepostoryList);
 
545
        push_(file);
 
546
        if ((repo = file->myRepo)) {
 
547
                if (repo->isRemovingFP) {
 
548
                        repo->removeRepoFile(file);
 
549
                        myRepostoryList->wakeup();
 
550
                }
 
551
                else
 
552
                        repo->returnRepoFile(file);
 
553
                repo->release(); /* [++] here is the release.  */
 
554
        }
 
555
        release_(file);
 
556
        unlock_(myRepostoryList);
 
557
        exit_();
 
558
}
 
559
 
 
560
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
 
561
{
 
562
        MSRepository *repo;
 
563
 
 
564
        enter_();
 
565
        lock_(myRepostoryList);
 
566
        while ((!mustQuit || !*mustQuit) && !iClosing) {
 
567
                if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
 
568
                        break;
 
569
                repo->isRemovingFP = true;
 
570
                if (repo->removeRepoFilesNotInUse()) {
 
571
                        myRepostoryList->set(repo_id - 1, NULL);
 
572
                        break;
 
573
                }
 
574
                /*
 
575
                 * Wait for the files that are in use to be
 
576
                 * freed.
 
577
                 */
 
578
                myRepostoryList->wait();
 
579
        }
 
580
        unlock_(myRepostoryList);
 
581
        exit_();
 
582
}
 
583
 
 
584
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
 
585
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
 
586
{
 
587
        MSTempLogItemRec        item;
 
588
        uint32_t                                timev;
 
589
 
 
590
        // Each otab object holds an handle to an instance of an OPEN
 
591
        // temp log. This is so that each thread has it's own open temp log 
 
592
        // and doesn't need to be opened and close it constantly.
 
593
        
 
594
        enter_();
 
595
        lock_(myTempLogArray);
 
596
        if (!iWriteTempLog) {
 
597
                iWriteTempLog = (MSTempLog *) myTempLogArray->last();
 
598
                if (!iWriteTempLog) {
 
599
                        new_(iWriteTempLog, MSTempLog(1, this, 0));
 
600
                        myTempLogArray->set(1, iWriteTempLog);
 
601
                }
 
602
        }
 
603
        if (!otab->myTempLogFile)
 
604
                otab->myTempLogFile = iWriteTempLog->openTempLog();
 
605
        else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
 
606
                otab->myTempLogFile->release();
 
607
                otab->myTempLogFile = NULL;
 
608
                otab->myTempLogFile = iWriteTempLog->openTempLog();
 
609
        }
 
610
 
 
611
        if (iWriteTempLog->myTempLogSize >= PBMSParameters::getTempLogThreshold()) {
 
612
                uint32_t tmp_log_id = iWriteTempLog->myLogID + 1;
 
613
 
 
614
                new_(iWriteTempLog, MSTempLog(tmp_log_id, this, 0));
 
615
                myTempLogArray->set(tmp_log_id, iWriteTempLog);
 
616
 
 
617
                otab->myTempLogFile->release();
 
618
                otab->myTempLogFile = NULL;
 
619
                otab->myTempLogFile = iWriteTempLog->openTempLog();
 
620
 
 
621
        }
 
622
 
 
623
        timev = time(NULL);
 
624
        *log_id = iWriteTempLog->myLogID;
 
625
        *log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
 
626
        if (q_time)
 
627
                *q_time = timev;
 
628
        iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
 
629
        unlock_(myTempLogArray);
 
630
 
 
631
        CS_SET_DISK_1(item.ti_type_1, type);
 
632
        CS_SET_DISK_4(item.ti_table_id_4, tab_id);
 
633
        CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
 
634
        CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
 
635
        CS_SET_DISK_4(item.ti_time_4, timev);
 
636
        otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
 
637
        
 
638
        
 
639
        exit_();
 
640
}
 
641
 
 
642
#ifdef HAVE_ALIAS_SUPPORT
 
643
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
 
644
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
 
645
{
 
646
        enter_();
 
647
        
 
648
        queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
 
649
                
 
650
        // If it has an alias remove it from the ailias index.
 
651
        if (aliasDiskRec) {
 
652
                try_(a) {
 
653
                        deleteBlobAlias(aliasDiskRec);
 
654
                }
 
655
                catch_(a);
 
656
                self->logException();
 
657
                cont_(a);
 
658
        }
 
659
        
 
660
        exit_();
 
661
}
 
662
#endif
 
663
 
 
664
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
 
665
{
 
666
        MSTempLog               *log;
 
667
        MSTempLogFile   *log_file = NULL;
 
668
 
 
669
        enter_();
 
670
        lock_(myTempLogArray);
 
671
        if (log_id)
 
672
                log = (MSTempLog *) myTempLogArray->get(log_id);
 
673
        else
 
674
                log = (MSTempLog *) myTempLogArray->first();
 
675
        if (log) {
 
676
                log_file = log->openTempLog();
 
677
                if (log_rec_size)
 
678
                        *log_rec_size = log->myTemplogRecSize;
 
679
                if (log_head_size)
 
680
                        *log_head_size = log->myTempLogHeadSize;
 
681
        }
 
682
        unlock_(myTempLogArray);
 
683
        return_(log_file);
 
684
}
 
685
 
 
686
uint32_t MSDatabase::getTempLogCount()
 
687
{
 
688
        uint32_t count;
 
689
 
 
690
        enter_();
 
691
        lock_(myTempLogArray);
 
692
        count = myTempLogArray->size();
 
693
        unlock_(myTempLogArray);
 
694
        return_(count);
 
695
}
 
696
 
 
697
void MSDatabase::removeTempLog(uint32_t log_id)
 
698
{
 
699
        enter_();
 
700
        lock_(myTempLogArray);
 
701
        myTempLogArray->remove(log_id);
 
702
        unlock_(myTempLogArray);
 
703
        exit_();
 
704
}
 
705
 
 
706
CSObject *MSDatabase::getKey()
 
707
{
 
708
        return (CSObject *) myDatabaseName;
 
709
}
 
710
 
 
711
int MSDatabase::compareKey(CSObject *key)
 
712
{
 
713
        return myDatabaseName->compare((CSString *) key);
 
714
}
 
715
 
 
716
MSCompactorThread *MSDatabase::getCompactorThread()
 
717
{
 
718
        return myCompactorThread;
 
719
}
 
720
 
 
721
CSSyncVector *MSDatabase::getRepositoryList()
 
722
{       
 
723
        return myRepostoryList;
 
724
}
 
725
 
 
726
#ifdef HAVE_ALIAS_SUPPORT
 
727
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
 
728
{
 
729
        uint32_t hash;
 
730
        bool can_retry = true;
 
731
        enter_();
 
732
        
 
733
retry:
 
734
        lock_(&iBlobAliaseLock);
 
735
        
 
736
        try_(a) {
 
737
                hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
 
738
        }
 
739
        
 
740
        catch_(a) {
 
741
                unlock_(&iBlobAliaseLock);
 
742
                if (can_retry) {
 
743
                        // It can be that a duplicater alias exists that was deleted
 
744
                        // but the transaction has not been written to the repository yet.
 
745
                        // Flush all committed transactions to the repository file.
 
746
                        MSTransactionManager::flush();
 
747
                        can_retry = false;
 
748
                        goto retry;
 
749
                }
 
750
                throw_();
 
751
        }
 
752
        
 
753
        cont_(a);
 
754
        unlock_(&iBlobAliaseLock);
 
755
        return_(hash);
 
756
}
 
757
 
 
758
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
 
759
{
 
760
        uint32_t new_hash;
 
761
        enter_();
 
762
        lock_(&iBlobAliaseLock);
 
763
        
 
764
        new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
 
765
        iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
 
766
 
 
767
        unlock_(&iBlobAliaseLock);
 
768
        return_(new_hash);
 
769
}
 
770
 
 
771
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
 
772
{
 
773
        enter_();
 
774
        lock_(&iBlobAliaseLock);
 
775
        iBlobAliases->deleteAlias(diskRec);
 
776
        unlock_(&iBlobAliaseLock);
 
777
        exit_();
 
778
}
 
779
 
 
780
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
 
781
{
 
782
        MSDiskAliasRec diskRec;
 
783
        
 
784
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
 
785
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
 
786
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
 
787
        deleteBlobAlias(&diskRec);
 
788
}
 
789
 
 
790
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
 
791
{
 
792
        enter_();
 
793
        lock_(&iBlobAliaseLock);
 
794
        iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
 
795
        unlock_(&iBlobAliaseLock);
 
796
        exit_();
 
797
}
 
798
#endif
 
799
 
 
800
bool MSDatabase::isValidHeaderField(const char *name)
 
801
{
 
802
        bool is_valid = false;
 
803
        CSString                *header;
 
804
        enter_();
 
805
 
 
806
        if (name && *name) {
 
807
                if (strcasecmp(name, MS_ALIAS_TAG)) {
 
808
                        lock_(&iHTTPMetaDataHeaders);
 
809
                        header = CSString::newString(name);
 
810
                        push_(header);
 
811
                                
 
812
                        is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
 
813
                        release_(header);
 
814
                        
 
815
                        unlock_(&iHTTPMetaDataHeaders);
 
816
                } else 
 
817
                        is_valid = true;
 
818
        }
 
819
        
 
820
        return_(is_valid);
 
821
}
 
822
 
 
823
void MSDatabase::startUp(const char *default_http_headers)
 
824
{
 
825
        enter_();
 
826
        
 
827
        new_(gDatabaseList, CSSyncSortedList);
 
828
        new_(gDatabaseArray, CSSparseArray(5));
 
829
        MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
 
830
        PBMSSystemTables::systemTablesStartUp();
 
831
        PBMSParameters::setBackupDatabaseID(1);
 
832
        exit_();
 
833
}
 
834
 
 
835
void MSDatabase::stopThreads()
 
836
{
 
837
        MSDatabase *db;
 
838
 
 
839
        enter_();
 
840
        if (gDatabaseList) {
 
841
                lock_(gDatabaseList);
 
842
                for (int i=0;;i++) {
 
843
                        if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
 
844
                                break;
 
845
                        db->iClosing = true;
 
846
                        
 
847
                        if (db->myTempLogThread) {
 
848
                                db->myTempLogThread->stop();
 
849
                                db->myTempLogThread->release();
 
850
                                db->myTempLogThread = NULL;
 
851
                        }
 
852
                        if (db->myCompactorThread) {
 
853
                                db->myRepostoryList->wakeup(); // The compator thread waits on this.
 
854
                                db->myCompactorThread->stop();
 
855
                                db->myCompactorThread->release();
 
856
                                db->myCompactorThread = NULL;
 
857
                        }
 
858
                        
 
859
                        if (db->iBackupThread) {
 
860
                                db->iBackupThread->stop();
 
861
                                db->iBackupThread->release();
 
862
                                db->iBackupThread = NULL;
 
863
                        }
 
864
        
 
865
                }
 
866
                
 
867
                unlock_(gDatabaseList);
 
868
        }
 
869
        exit_();
 
870
}
 
871
 
 
872
void MSDatabase::shutDown()
 
873
{
 
874
                
 
875
        if (gDatabaseArray) {
 
876
                gDatabaseArray->clear();
 
877
                gDatabaseArray->release();
 
878
                gDatabaseArray = NULL;
 
879
        }
 
880
        
 
881
        if (gDatabaseList) {
 
882
                gDatabaseList->clear();
 
883
                gDatabaseList->release();
 
884
                gDatabaseList = NULL;
 
885
        }
 
886
        
 
887
        MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
 
888
        PBMSSystemTables::systemTableShutDown();
 
889
}
 
890
 
 
891
void MSDatabase::setBackupDatabase()
 
892
{
 
893
        enter_();
 
894
        // I need to give the backup database a unique fake database ID.
 
895
        // This is so that it is not confused with the database being backed
 
896
        // backed up when opening tables.
 
897
        
 
898
        // Normally database IDs are generated by time(NULL) so small database IDs
 
899
        // are safe to use as fake IDs.
 
900
         
 
901
        lock_(gDatabaseList);
 
902
        myDatabaseID = PBMSParameters::getBackupDatabaseID() +1;
 
903
        PBMSParameters::setBackupDatabaseID(myDatabaseID);
 
904
        gDatabaseArray->set(myDatabaseID, RETAIN(this));
 
905
        isBackup = true;
 
906
        
 
907
        // Notify the cloud storage, if any, that it is a backup.
 
908
        // This is important because if the backup database is dropped
 
909
        // we need to be sure that only the BLOBs belonging to the
 
910
        // backup are removed from the cloud.
 
911
        myBlobCloud->cl_setCloudIsBackup(); 
 
912
        
 
913
        unlock_(gDatabaseList);
 
914
        
 
915
        // Rename the database path so that it is obviouse that this is an incomplete backup database.
 
916
        // When the backup is completed it will be renamed back.
 
917
        CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
 
918
        push_(new_path);
 
919
        
 
920
        if (new_path->exists()) 
 
921
                new_path->remove();
 
922
        
 
923
        CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
 
924
        push_(db_path);
 
925
        
 
926
        db_path->rename(new_path->getNameCString());
 
927
        myDatabasePath->release();
 
928
        myDatabasePath = new_path->getString();
 
929
        myDatabasePath->retain();
 
930
        
 
931
        release_(db_path);
 
932
        release_(new_path);
 
933
        
 
934
        
 
935
        exit_();
 
936
}
 
937
 
 
938
void MSDatabase::releaseBackupDatabase()
 
939
{
 
940
        enter_();
 
941
 
 
942
        
 
943
        // The backup has completed succefully, rename the path to the correct name.
 
944
        CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
 
945
        push_(db_path);
 
946
        
 
947
        myDatabasePath->setLength(myDatabasePath->length()-1);
 
948
        db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
 
949
        release_(db_path);
 
950
        
 
951
        // Remove the backup database object.
 
952
        lock_(gDatabaseList);
 
953
        gDatabaseArray->remove(myDatabaseID);
 
954
        MSTableList::removeDatabaseTables(this); // Will also release the database object.
 
955
        unlock_(gDatabaseList);
 
956
        
 
957
        
 
958
        exit_();
 
959
}
 
960
 
 
961
void MSDatabase::startBackup(MSBackupInfo *backup_info)
 
962
{
 
963
        enter_();
 
964
 
 
965
        push_(backup_info);
 
966
        if (iBackupThread) {
 
967
                if (iBackupThread->isRunning()) {
 
968
                        CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
 
969
                }
 
970
                iBackupThread->release();
 
971
                iBackupThread = NULL;
 
972
        }
 
973
        
 
974
        pop_(backup_info);
 
975
        iBackupThread = MSBackup::newMSBackup(backup_info);
 
976
        
 
977
        try_(a) {
 
978
                iBackupThread->startBackup(RETAIN(this));
 
979
        }
 
980
        
 
981
        catch_(a) {
 
982
                iBackupThread->release();
 
983
                iBackupThread = NULL;
 
984
                throw_();
 
985
        }
 
986
        cont_(a);
 
987
        
 
988
        exit_();
 
989
}
 
990
 
 
991
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
 
992
{
 
993
        bool done;
 
994
        
 
995
        enter_();
 
996
        
 
997
        if (iBackupThread) {
 
998
                *total = iBackupThread->getBackupSize();
 
999
                *completed = iBackupThread->getBackupCompletedSize();
 
1000
                done = !iBackupThread->isRunning();
 
1001
                *completed = (iBackupThread->getStatus() == 0);
 
1002
        } else {
 
1003
                *completed_ok = done = true;
 
1004
                *total = *completed = 0;                        
 
1005
        }
 
1006
                
 
1007
        return_(done);
 
1008
}
 
1009
 
 
1010
uint32_t MSDatabase::backupID()
 
1011
 
1012
        return (iBackupThread)?iBackupThread->backupID(): 0;
 
1013
}
 
1014
 
 
1015
void MSDatabase::terminateBackup()
 
1016
{
 
1017
        if (iBackupThread) {
 
1018
                iBackupThread->stop();
 
1019
                iBackupThread->release();
 
1020
                iBackupThread = NULL;
 
1021
        }
 
1022
}
 
1023
 
 
1024
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
 
1025
{
 
1026
        MSDatabase *db;
 
1027
        enter_();
 
1028
        push_(db_name);
 
1029
        
 
1030
        
 
1031
        lock_(gDatabaseList);
 
1032
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
 
1033
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
 
1034
                if (!db)
 
1035
                        goto exit;
 
1036
        } else
 
1037
                db->retain();
 
1038
        
 
1039
        exit:
 
1040
        unlock_(gDatabaseList);
 
1041
        release_(db_name);
 
1042
        return_(db);
 
1043
}
 
1044
 
 
1045
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
 
1046
{
 
1047
        return getDatabase(CSString::newString(db_name), create);
 
1048
}
 
1049
 
 
1050
MSDatabase *MSDatabase::getDatabase(uint32_t db_id)
 
1051
{
 
1052
        MSDatabase *db;
 
1053
        
 
1054
        enter_();
 
1055
        lock_(gDatabaseList);
 
1056
        if ((db = (MSDatabase *) gDatabaseArray->get((uint32_t) db_id))) 
 
1057
                db->retain();
 
1058
        else {
 
1059
                // Look for the database folder with the correct ID:
 
1060
                CSPath *path = CSPath::newPath(PBMSDaemon::getPBMSDir());
 
1061
                push_(path);
 
1062
                if (path->exists()) {
 
1063
                        CSDirectory *dir;
 
1064
                        dir = CSDirectory::newDirectory(RETAIN(path));
 
1065
                        push_(dir);
 
1066
                        dir->open();
 
1067
                        
 
1068
                        while (dir->next() && !db) {
 
1069
                                if (!dir->isFile()) {
 
1070
                                        const char *ptr, *dir_name  = dir->name();
 
1071
                                        ptr = dir_name + strlen(dir_name) -1;
 
1072
                                        
 
1073
                                        while (ptr > dir_name && *ptr != '-') ptr--;
 
1074
                                        
 
1075
                                        if (*ptr ==  '-') {
 
1076
                                                int len = ptr - dir_name;
 
1077
                                                ptr++;
 
1078
                                                if ((strtoul(ptr, NULL, 10) == db_id) && len) {
 
1079
                                                        db = getDatabase(CSCString::newString(dir_name, len), true);
 
1080
                                                        ASSERT(db->myDatabaseID == db_id);
 
1081
                                                }
 
1082
                                        }
 
1083
                                }
 
1084
                        }
 
1085
                        release_(dir);
 
1086
                }
 
1087
                release_(path);         
 
1088
        }
 
1089
        unlock_(gDatabaseList);
 
1090
        
 
1091
        if (!db) {
 
1092
                char buffer[CS_EXC_MESSAGE_SIZE];
 
1093
 
 
1094
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
 
1095
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) db_id);
 
1096
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
 
1097
        }
 
1098
        return_(db);
 
1099
}
 
1100
 
 
1101
void MSDatabase::wakeTempLogThreads()
 
1102
{
 
1103
        MSDatabase *db;
 
1104
 
 
1105
        if (!gDatabaseList)
 
1106
                return;
 
1107
        enter_();
 
1108
        lock_(gDatabaseList);
 
1109
        for (int i=0;;i++) {
 
1110
                if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
 
1111
                        break;
 
1112
                if (db->myTempLogThread)
 
1113
                        db->myTempLogThread->wakeup();
 
1114
        }
 
1115
        unlock_(gDatabaseList);
 
1116
        exit_();
 
1117
}
 
1118
 
 
1119
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
 
1120
{
 
1121
        CSDirectory             *dir;
 
1122
        uint32_t                        db_id = 0;
 
1123
        int                             len = db_name->length();
 
1124
        const char              *ptr;
 
1125
        
 
1126
        enter_();
 
1127
        push_(db_name);
 
1128
        push_(path);
 
1129
        
 
1130
        // Search for the ID of the database
 
1131
        dir = CSDirectory::newDirectory(RETAIN(path));
 
1132
        push_(dir);
 
1133
        dir->open();
 
1134
        while (dir->next() && !db_id) 
 
1135
                {
 
1136
                if (!dir->isFile()){
 
1137
                        ptr = dir->name() + strlen(dir->name()) -1;
 
1138
                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
 
1139
                        if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
 
1140
                                db_id = atol(ptr+1);                            
 
1141
                        }
 
1142
                }
 
1143
        }
 
1144
        release_(dir);
 
1145
        
 
1146
        if (!db_id) {
 
1147
                db_id = time(NULL);
 
1148
 
 
1149
                while (1) { // search for a unique db_id
 
1150
                        dir = CSDirectory::newDirectory(RETAIN(path));
 
1151
                        push_(dir);
 
1152
                        dir->open();
 
1153
                        while (db_id && dir->next()) {
 
1154
                                if (!dir->isFile()) {
 
1155
                                        ptr = dir->name() + strlen(dir->name()) -1;
 
1156
                                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
 
1157
                                        if ((*ptr == '-') && (db_id == strtoul(ptr+1, NULL, 10))) {
 
1158
                                                db_id = 0;                              
 
1159
                                        }
 
1160
                                }
 
1161
                        }
 
1162
                        release_(dir);
 
1163
                        if (db_id)
 
1164
                                break;
 
1165
                        sleep(1); // Allow 1 second to pass.
 
1166
                        db_id = time(NULL);
 
1167
                } 
 
1168
        }
 
1169
        
 
1170
        release_(path);
 
1171
        release_(db_name);
 
1172
        return_(db_id);
 
1173
}
 
1174
 
 
1175
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
 
1176
{
 
1177
        bool create_path = *create;     
 
1178
        CSPath *path = NULL;
 
1179
        char name_buffer[MS_DATABASE_NAME_SIZE + 40];
 
1180
        uint32_t db_id;
 
1181
        enter_();
 
1182
        
 
1183
        push_(db_name);
 
1184
        *create = false;
 
1185
        path = CSPath::newPath(location, "pbms");
 
1186
        push_(path);
 
1187
        if (!path->exists()) {
 
1188
                if (!create_path){
 
1189
                        release_(path);
 
1190
                        path = NULL;
 
1191
                        goto done;
 
1192
                }
 
1193
                        
 
1194
                *create = true;
 
1195
                path->makeDir();
 
1196
        }
 
1197
 
 
1198
        // If this is the pbms database then nothing more is to be done.
 
1199
        if (is_pbms)
 
1200
                goto done;
 
1201
        
 
1202
        if ((!db_id_ptr) || !*db_id_ptr) {
 
1203
                db_id = getDBID(RETAIN(path), RETAIN(db_name));
 
1204
                if (db_id_ptr)
 
1205
                        *db_id_ptr = db_id;
 
1206
        } else
 
1207
                db_id = *db_id_ptr;
 
1208
 
 
1209
        // Create the PBMS database name with ID
 
1210
        cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
 
1211
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
 
1212
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (uint32_t) db_id);
 
1213
                        
 
1214
        pop_(path);
 
1215
        path = CSPath::newPath(path, name_buffer);
 
1216
        push_(path);
 
1217
        if (!path->exists()) {
 
1218
                if (create_path) {
 
1219
                        *create = true;
 
1220
                        path->makeDir();
 
1221
                } else {
 
1222
                        release_(path);
 
1223
                        path = NULL;
 
1224
                }
 
1225
                        
 
1226
        }
 
1227
        
 
1228
done:
 
1229
        if (path)
 
1230
                pop_(path);
 
1231
        release_(db_name);
 
1232
        return_(path);
 
1233
}
 
1234
 
 
1235
 
 
1236
 
 
1237
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t        db_id, bool create)
 
1238
{
 
1239
        MSDatabase              *db = NULL;
 
1240
        CSDirectory             *dir;
 
1241
        MSRepository    *repo;
 
1242
        CSPath                  *path;
 
1243
        const char              *file_name;
 
1244
        uint32_t                file_id;
 
1245
        off64_t                 file_size;
 
1246
        MSTempLog               *log;
 
1247
        uint32_t                to_delete = 0;
 
1248
        CSString                *db_path;
 
1249
        bool                    is_pbms = false;
 
1250
 
 
1251
        enter_();
 
1252
 
 
1253
        push_(db_name);
 
1254
 
 
1255
        //is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
 
1256
        
 
1257
        /*
 
1258
         * Block the creation of the pbms database if there is no MySQL database. 
 
1259
         * The database name is case sensitive here if the file system names are
 
1260
         * case sensitive. This is desirable.
 
1261
         */
 
1262
        path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
 
1263
        push_(path);
 
1264
        if (create && !path->exists()) {
 
1265
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
 
1266
        }
 
1267
        release_(path);
 
1268
        
 
1269
         // Create the database path, if 'create' == false then it can return NULL
 
1270
        path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
 
1271
        if (!path) {
 
1272
                release_(db_name);
 
1273
                return_(NULL);
 
1274
        }
 
1275
        push_(path);
 
1276
        
 
1277
        // Create the database object and initialize it.
 
1278
        if (!(db = new MSDatabase())) {
 
1279
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
1280
        }
 
1281
        
 
1282
        db->myIsPBMS = is_pbms;
 
1283
        db_path = path->getString();
 
1284
        db_path->retain();
 
1285
        release_(path);
 
1286
        path = NULL;
 
1287
                
 
1288
        db->iNextBlobRefId = (uint32_t) time(NULL);
 
1289
        db->iNextBlobRefId <<= 32;
 
1290
        db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
 
1291
        db->iNextBlobRefId++;
 
1292
        
 
1293
        db->myDatabaseID = db_id;
 
1294
        db->myDatabasePath = db_path;
 
1295
        db->myDatabaseName = db_name;
 
1296
        new_(db->myBlobCloud, CloudDB(db_id));
 
1297
 
 
1298
        pop_(db_name);  
 
1299
        
 
1300
        push_(db);
 
1301
        
 
1302
        
 
1303
        new_(db->myTempLogArray, CSSyncSparseArray(20));
 
1304
        new_(db->iTableList, CSSyncSortedList());
 
1305
        new_(db->iTableArray, CSSparseArray(20));
 
1306
        new_(db->myRepostoryList, CSSyncVector(20));
 
1307
        
 
1308
        if (!is_pbms) { //
 
1309
#ifdef HAVE_ALIAS_SUPPORT
 
1310
                //db->retain(); no retain here, MSAlias() takes a back ref.
 
1311
                new_(db->iBlobAliases, MSAlias(db));
 
1312
#endif          
 
1313
                /* "Load" the database: */
 
1314
 
 
1315
                /* Get the max table ID: */
 
1316
                dir = CSDirectory::newDirectory(RETAIN(db_path));
 
1317
                push_(dir);
 
1318
                dir->open();
 
1319
                while (dir->next()) {
 
1320
                        file_name = dir->name();
 
1321
                        if (dir->isFile() && cs_is_extension(file_name, "bst"))
 
1322
                                db->addTableFromFile(dir, file_name, false);
 
1323
                }
 
1324
                release_(dir);
 
1325
 
 
1326
                path = CSPath::newPath(RETAIN(db_path), "bs-repository");
 
1327
                if (path->exists()) {
 
1328
                        dir = CSDirectory::newDirectory(path);
 
1329
                        push_(dir);
 
1330
                        dir->open();
 
1331
                        while (dir->next()) {
 
1332
                                file_name = dir->name();
 
1333
                                if (dir->isFile() && cs_is_extension(file_name, "bs")) {
 
1334
                                        if ((file_id = fileToTableId(file_name, "repo"))) {
 
1335
                                                dir->info(NULL, &file_size, NULL);
 
1336
                                                new_(repo, MSRepository(file_id, db, file_size));
 
1337
                                                db->myRepostoryList->set(file_id - 1, repo);
 
1338
                                        }
 
1339
                                }
 
1340
                        }
 
1341
                        release_(dir);
 
1342
                }
 
1343
                else {
 
1344
                        path->makeDir();
 
1345
                        path->release();
 
1346
                }
 
1347
 
 
1348
                path = CSPath::newPath(RETAIN(db_path), "bs-logs");
 
1349
                if (path->exists()) {
 
1350
                        dir = CSDirectory::newDirectory(path);
 
1351
                        push_(dir);
 
1352
                        dir->open();
 
1353
                        while (dir->next()) {
 
1354
                                file_name = dir->name();
 
1355
                                if (dir->isFile()) {
 
1356
                                        if (cs_is_extension(file_name, "bs")) {
 
1357
                                                if ((file_id = fileToTableId(file_name, "temp"))) {
 
1358
                                                        dir->info(NULL, &file_size, NULL);
 
1359
                                                        new_(log, MSTempLog(file_id, db, file_size));
 
1360
                                                        db->myTempLogArray->set(file_id, log);
 
1361
                                                }
 
1362
                                        }
 
1363
                                        else if (cs_is_extension(file_name, "bst")) {
 
1364
                                                db->addTableFromFile(dir, file_name, true);
 
1365
                                                to_delete++;
 
1366
                                        }
 
1367
                                }
 
1368
                        }
 
1369
                        release_(dir);
 
1370
                }
 
1371
                else {
 
1372
                        path->makeDir();
 
1373
                        path->release();
 
1374
                }
 
1375
 
 
1376
                if (to_delete) {
 
1377
                        /* Go through and prepare all the tables that are to
 
1378
                         * be deleted:
 
1379
                         */
 
1380
                        uint32_t        i = 0;
 
1381
                        MSTable *tab;
 
1382
                        
 
1383
                        while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
 
1384
                                if (tab->isToDelete())
 
1385
                                        tab->prepareToDelete();
 
1386
                                i++;
 
1387
                        }
 
1388
                }
 
1389
 
 
1390
        }
 
1391
        pop_(db);
 
1392
 
 
1393
        return_(db);
 
1394
}
 
1395
 
 
1396
void MSDatabase::startThreads()
 
1397
{
 
1398
        enter_();
 
1399
 
 
1400
        if (myIsPBMS)
 
1401
                exit_();
 
1402
                
 
1403
#ifdef HAVE_ALIAS_SUPPORT
 
1404
        // iBlobAliases->ma_open() must be called before starting any threads.
 
1405
        iBlobAliases->ma_open(); 
 
1406
#endif
 
1407
 
 
1408
        new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
 
1409
        myTempLogThread->start();
 
1410
 
 
1411
#ifdef MS_COMPACTOR_POLLS
 
1412
        new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
 
1413
#else
 
1414
        new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
 
1415
#endif
 
1416
 
 
1417
        
 
1418
        myCompactorThread->start();
 
1419
        exit_();
 
1420
}
 
1421
 
 
1422
 
 
1423
void MSDatabase::dropDatabase() 
 
1424
{
 
1425
        enter_();
 
1426
 
 
1427
        iDropping = true;
 
1428
        iClosing = true;
 
1429
        
 
1430
        if (iBackupThread) {
 
1431
                iBackupThread->stop();
 
1432
                iBackupThread->release();
 
1433
                iBackupThread = NULL;
 
1434
        }
 
1435
        
 
1436
        if (myTempLogThread) {
 
1437
                myTempLogThread->stop();
 
1438
                myTempLogThread->release();
 
1439
                myTempLogThread = NULL;
 
1440
        }
 
1441
        
 
1442
        if (myCompactorThread) {
 
1443
                myRepostoryList->wakeup(); // The compator thread waits on this.
 
1444
                myCompactorThread->stop();
 
1445
                myCompactorThread->release();
 
1446
                myCompactorThread = NULL;
 
1447
        }
 
1448
        
 
1449
        // Call cloud drop database even if the database is not currrently
 
1450
        // using cloud storage just in case to was in the past. If the connection
 
1451
        // to the cloud is not setup then nothing will be done.
 
1452
        try_(a) {
 
1453
                myBlobCloud->cl_dropDB();
 
1454
        }
 
1455
        catch_(a) {
 
1456
                self->logException();
 
1457
        }
 
1458
        cont_(a);
 
1459
        exit_();
 
1460
}
 
1461
 
 
1462
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath ) 
 
1463
{
 
1464
        CSPath *path = NULL;
 
1465
        CSDirectory *dir = NULL;
 
1466
        const char *file_name;
 
1467
        enter_();
 
1468
        
 
1469
        push_(doomedDatabasePath);
 
1470
        
 
1471
        // Delete repository files
 
1472
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
 
1473
        push_(path);
 
1474
        if (path->exists()) {
 
1475
                dir = CSDirectory::newDirectory(RETAIN(path));
 
1476
                push_(dir);
 
1477
                dir->open();
 
1478
                while (dir->next()) {
 
1479
                        file_name = dir->name();
 
1480
                        if (dir->isFile() && cs_is_extension(file_name, "bs")) {
 
1481
                                dir->deleteEntry();
 
1482
                        }
 
1483
                }
 
1484
                release_(dir);
 
1485
                if (path->isEmpty())
 
1486
                        path->removeDir();
 
1487
        }
 
1488
        release_(path);
 
1489
 
 
1490
        // Delete temp log files.
 
1491
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
 
1492
        push_(path);
 
1493
        if (path->exists()) {
 
1494
                dir = CSDirectory::newDirectory(RETAIN(path));
 
1495
                push_(dir);
 
1496
                dir->open();
 
1497
                while (dir->next()) {
 
1498
                        file_name = dir->name();
 
1499
                        if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
 
1500
                                dir->deleteEntry();
 
1501
                        }
 
1502
                }
 
1503
                release_(dir);
 
1504
                if (path->isEmpty())
 
1505
                        path->removeDir();
 
1506
        }
 
1507
        release_(path);
 
1508
 
 
1509
        // Delete table reference files.
 
1510
        dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
 
1511
        push_(dir);
 
1512
        dir->open();
 
1513
        while (dir->next()) {
 
1514
                file_name = dir->name();
 
1515
                if (dir->isFile() && cs_is_extension(file_name, "bst"))
 
1516
                        dir->deleteEntry();
 
1517
        }
 
1518
        release_(dir);
 
1519
 
 
1520
#ifdef HAVE_ALIAS_SUPPORT
 
1521
        path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
 
1522
        push_(path);
 
1523
        path->removeFile();
 
1524
        release_(path);
 
1525
#endif
 
1526
 
 
1527
        PBMSSystemTables::removeSystemTables(RETAIN(doomedDatabasePath));
 
1528
        
 
1529
        path = CSPath::newPath(RETAIN(doomedDatabasePath));
 
1530
        push_(path);
 
1531
        if (path->isEmpty() && !path->isLink()) {
 
1532
                path->removeDir();
 
1533
        } else { 
 
1534
                CSStringBuffer *new_name;
 
1535
                // If the database folder is not empty we rename it to get it out of the way.
 
1536
                // If it is not renamed it will be reused if a database with the same name is
 
1537
                // created again wich will result in the database ID being reused which may
 
1538
                // have some bad side effects.
 
1539
                new_(new_name, CSStringBuffer());
 
1540
                push_(new_name);
 
1541
                new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
 
1542
                new_name->append("_DROPPED");
 
1543
                path->rename(new_name->getCString());
 
1544
                release_(new_name);
 
1545
        }
 
1546
        release_(path);
 
1547
 
 
1548
        release_(doomedDatabasePath);
 
1549
        
 
1550
        path = CSPath::newPath(PBMSDaemon::getPBMSDir());
 
1551
        push_(path);
 
1552
        if (path->isEmpty() && !path->isLink()) {
 
1553
                path->removeDir();
 
1554
        }
 
1555
        release_(path);
 
1556
 
 
1557
        exit_();
 
1558
}
 
1559
 
 
1560
/* Drop the PBMS database if it exists.
 
1561
 * The root folder 'pbms' will be deleted also
 
1562
 * if it is empty and not a symbolic link.
 
1563
 * The database folder in 'pbms' is deleted if it is empty and
 
1564
 * it is not a symbolic link.
 
1565
 */
 
1566
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name ) 
 
1567
{
 
1568
        CSString *doomedDatabasePath = NULL;
 
1569
 
 
1570
        enter_();
 
1571
        
 
1572
        if (doomedDatabase) {
 
1573
                push_(doomedDatabase);
 
1574
                
 
1575
                // Remove any pending transactions for the dropped database.
 
1576
                // This is important because if the database is restored it will have the
 
1577
                // same database ID and the old transactions would be applied to it.
 
1578
                MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
 
1579
                
 
1580
                doomedDatabasePath = doomedDatabase->myDatabasePath;
 
1581
                doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
 
1582
                
 
1583
                MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
 
1584
                MSSystemTableShare::removeDatabaseSystemTables(RETAIN(doomedDatabase));
 
1585
 
 
1586
                doomedDatabase->dropDatabase(); // Shutdown database threads.
 
1587
                
 
1588
                // To avoid a deadlock a lock is not taken on the database list
 
1589
                // if shutdown is in progress. The only database that would be
 
1590
                // dropped during a shutdown is an incomplete backup database.
 
1591
                ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
 
1592
                if (!self->myMustQuit) 
 
1593
                        lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
 
1594
                        
 
1595
                gDatabaseArray->remove(doomedDatabase->myDatabaseID);
 
1596
                if (!doomedDatabase->isBackup)
 
1597
                        gDatabaseList->remove(doomedDatabase->getKey());
 
1598
                if (!self->myMustQuit) 
 
1599
                        unlock_(gDatabaseList); 
 
1600
                ASSERT(doomedDatabase->iRefCount == 1);
 
1601
                release_(doomedDatabase);
 
1602
                
 
1603
        } else {
 
1604
                CSPath *path;
 
1605
                bool create = false;
 
1606
                uint32_t db_id;
 
1607
                
 
1608
                path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
 
1609
                
 
1610
                if (path) {
 
1611
                        MSTransactionManager::dropDatabase(db_id);
 
1612
 
 
1613
                        push_(path);
 
1614
                        doomedDatabasePath = path->getString();
 
1615
                        doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
 
1616
                        release_(path);
 
1617
                }
 
1618
        }
 
1619
        
 
1620
        if (doomedDatabasePath)
 
1621
                removeDatabasePath(doomedDatabasePath);
 
1622
        
 
1623
        exit_();
 
1624
}
 
1625
 
 
1626
void MSDatabase::dropDatabase(const char *db_name ) 
 
1627
{
 
1628
        enter_();
 
1629
        dropDatabase(getDatabase(db_name, false), db_name);
 
1630
        exit_();
 
1631
}
 
1632
 
 
1633
// The table_path can be several things here:
 
1634
// 1: <absalute path>/<database>/<table>
 
1635
// 2: <absalute path>/<database>
 
1636
// 3: <database>/<table>
 
1637
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create) 
 
1638
{
 
1639
        const char      *base = ms_my_get_mysql_home_path();
 
1640
        CSString        *table_url;
 
1641
        CSString        *db_path = NULL;
 
1642
        CSString        *db_name = NULL;
 
1643
        CSString        *tab_name = NULL;
 
1644
        MSDatabase      *db;
 
1645
        enter_();
 
1646
        
 
1647
        *db_id = 0;
 
1648
        *tab_id = 0;
 
1649
        
 
1650
        table_url = CSString::newString(table_path);
 
1651
        if (table_url->startsWith(base)) {
 
1652
                table_url = table_url->right(base);
 
1653
        }
 
1654
        push_(table_url);
 
1655
 
 
1656
 
 
1657
        db_path = table_url->left("/", -1);
 
1658
        push_(db_path);
 
1659
        tab_name = table_url->right("/", -1);
 
1660
        
 
1661
        pop_(db_path);
 
1662
        release_(table_url);
 
1663
 
 
1664
        if (db_path->length() == 0) { // Only a database name was supplied.
 
1665
                db_path->release();
 
1666
                db_name = tab_name;
 
1667
                tab_name = NULL;
 
1668
        } else {
 
1669
                if (tab_name->length() == 0) {
 
1670
                        tab_name->release();
 
1671
                        tab_name = NULL;
 
1672
                } else
 
1673
                        push_(tab_name);
 
1674
                push_(db_path);
 
1675
                db_name = db_path->right("/", -1);
 
1676
                pop_(db_path);
 
1677
                if (db_name->length() == 0) {
 
1678
                        db_name->release();
 
1679
                        db_name = db_path;
 
1680
                } else {
 
1681
                        db_path->release();
 
1682
                        db_path = NULL;
 
1683
                }
 
1684
        }
 
1685
        
 
1686
        db = MSDatabase::getDatabase(db_name, create); // This will release db_name
 
1687
        if (db) {
 
1688
                *db_id = db->myDatabaseID;
 
1689
                if (tab_name) {
 
1690
                        MSTable         *tab;
 
1691
                        pop_(tab_name);
 
1692
                        push_(db);
 
1693
                        tab = db->getTable(tab_name, create);// This will release tab_name
 
1694
                        pop_(db);
 
1695
                        if (tab) {
 
1696
                                *tab_id = tab->myTableID;
 
1697
                                tab->release();
 
1698
                        }
 
1699
                }
 
1700
                        
 
1701
                db->release();
 
1702
        }
 
1703
        
 
1704
        return_((*tab_id > 0) && (*db_id > 0));
 
1705
}
 
1706
 
 
1707
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create) 
 
1708
{
 
1709
        MSDatabase      *db;
 
1710
        enter_();
 
1711
        
 
1712
        *db_id = 0;
 
1713
        *tab_id = 0;
 
1714
        
 
1715
        db = MSDatabase::getDatabase(db_name, create); 
 
1716
        if (db) {
 
1717
                push_(db);
 
1718
                *db_id = db->myDatabaseID;
 
1719
                if (tab_name) {
 
1720
                        MSTable         *tab;
 
1721
                        tab = db->getTable(tab_name, create);
 
1722
                        if (tab) {
 
1723
                                *tab_id = tab->myTableID;
 
1724
                                tab->release();
 
1725
                        }
 
1726
                }
 
1727
                        
 
1728
                release_(db);
 
1729
        }
 
1730
        
 
1731
        return_((*tab_id > 0) && (*db_id > 0));
 
1732
}
 
1733
 
 
1734
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
 
1735
{
 
1736
        MSDatabase *db;
 
1737
        enter_();
 
1738
        
 
1739
        db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
 
1740
        
 
1741
        if (db) {
 
1742
                push_(db);
 
1743
                
 
1744
                gDatabaseList->add(RETAIN(db));
 
1745
                
 
1746
                gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
 
1747
                db->startThreads();
 
1748
                PBMSSystemTables::loadSystemTables(RETAIN(db));
 
1749
                        
 
1750
                pop_(db);
 
1751
        }
 
1752
        return_(db);
 
1753
}
 
1754
 
 
1755
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
 
1756
{
 
1757
        MSDatabase *db;
 
1758
        uint32_t id = 0;
 
1759
        enter_();
 
1760
        push_(db_name);
 
1761
        
 
1762
        
 
1763
        lock_(gDatabaseList);
 
1764
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
 
1765
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
 
1766
                if (!db)
 
1767
                        goto exit;
 
1768
                id = db->myDatabaseID;
 
1769
                db->release();
 
1770
        } else
 
1771
                id = db->myDatabaseID;
 
1772
        
 
1773
        exit:
 
1774
        unlock_(gDatabaseList);
 
1775
        release_(db_name);
 
1776
        return_(id);
 
1777
}
 
1778
 
 
1779
 
 
1780
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
 
1781
{
 
1782
        return getDatabaseID(CSString::newString(db_name), create);
 
1783
}
 
1784
 
 
1785
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
 
1786
{
 
1787
        bool was_created = create; 
 
1788
        CSPath *path;
 
1789
        MSDatabase *db;
 
1790
        enter_();
 
1791
        
 
1792
        push_(db_location);
 
1793
        push_(db_name);
 
1794
        // If the db already exists and 'create' == true then the existing db
 
1795
        // must be deleted.
 
1796
        // Create the database path, if 'create' == false then it can return NULL
 
1797
        path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
 
1798
        if (!path) {
 
1799
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
 
1800
        }
 
1801
        push_(path);
 
1802
        
 
1803
        // If we wanted to create it but it already exists then throw an error.
 
1804
        if ( create && !was_created) {
 
1805
                char str[120];
 
1806
                snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
 
1807
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
 
1808
        }
 
1809
                
 
1810
        release_(path);
 
1811
        pop_(db_name);
 
1812
        // everything looks OK
 
1813
        db = newDatabase(db_location->getCString(), db_name, db_id, create);
 
1814
        db->setBackupDatabase();
 
1815
        release_(db_location);
 
1816
        return_(db);
 
1817
}
 
1818