1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27
* The backup is done by creating a new database with the same name and ID in the
28
* backup location. Then the pbms_dump table in the source database is initialized
29
* for a sequential scan for backup. This has the effect of locking all current repository
30
* files. Then the equvalent of 'insert into dst_db.pbms_dump (select * from src_db.pbms_dump);'
37
#include <drizzled/common.h>
38
#include <drizzled/session.h>
39
#include <drizzled/table.h>
40
#include <drizzled/message/table.pb.h>
41
#include "drizzled/charset_info.h"
42
#include <drizzled/table_proto.h>
43
#include <drizzled/field.h>
44
#include <drizzled/field/varstring.h>
47
#include "cslib/CSConfig.h"
49
#include <sys/types.h>
52
#include "cslib/CSGlobal.h"
53
#include "cslib/CSStrUtil.h"
54
#include "cslib/CSStorage.h"
57
#include "system_table_ms.h"
58
#include "open_table_ms.h"
60
#include "database_ms.h"
61
#include "repository_ms.h"
62
#include "backup_ms.h"
63
#include "transaction_ms.h"
64
#include "systab_variable_ms.h"
65
#include "systab_backup_ms.h"
67
uint32_t MSBackupInfo::gMaxInfoRef;
68
CSSyncSparseArray *MSBackupInfo::gBackupInfo;
70
//==========================================
71
MSBackupInfo::MSBackupInfo( uint32_t id,
78
uint32_t cloudRef_arg,
79
uint32_t cloudBackupNo_arg ):
88
cloudRef(cloudRef_arg),
89
cloudBackupNo(cloudBackupNo_arg)
91
db_name = CSString::newString(name);
92
if (location && *location)
93
backupLocation = CSString::newString(location);
96
//-------------------------------
97
MSBackupInfo::~MSBackupInfo()
103
backupLocation->release();
106
//-------------------------------
107
void MSBackupInfo::startBackup(MSDatabase *pbms_db)
114
src_db = MSDatabase::getDatabase(db_id);
117
startTime = time(NULL);
119
src_db->startBackup(RETAIN(this));
125
MSBackupTable::saveTable(pbms_db);
129
//-------------------------------
130
MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
139
ref_id = gMaxInfoRef++;
140
new_(info, MSBackupInfo(ref_id, db->myDatabaseName->getCString(), db->myDatabaseID, time(NULL), 0, true, NULL, cloud_ref, backup_no));
143
gBackupInfo->set(ref_id, RETAIN(info));
145
info->isRunning = true;
148
unlock_(gBackupInfo);
152
MSBackupTable::saveTable(db);
155
gBackupInfo->remove(ref_id);
162
//-------------------------------
163
void MSBackupInfo::backupCompleted(MSDatabase *db)
165
completionTime = time(NULL);
167
MSBackupTable::saveTable(db);
170
//-------------------------------
171
void MSBackupInfo::backupTerminated(MSDatabase *db)
177
gBackupInfo->remove(backupRefId);
178
unlock_(gBackupInfo);
181
MSBackupTable::saveTable(db);
185
//==========================================
186
MSBackup::MSBackup():
191
bu_BackupRunning(false),
192
bu_State(BU_COMPLETED),
193
bu_SourceDatabase(NULL),
201
bu_TransactionManagerSuspended(false)
205
MSBackup *MSBackup::newMSBackup(MSBackupInfo *info)
212
new_(bu, MSBackup());
214
bu->bu_Database = MSDatabase::getBackupDatabase(RETAIN(info->backupLocation), RETAIN(info->db_name), info->db_id, true);
223
void MSBackup::startBackup(MSDatabase *src_db)
225
CSSyncVector *repo_list;
226
bool compacting = false;
231
bu_SourceDatabase = src_db;
232
repo_list = bu_SourceDatabase->getRepositoryList();
233
// Suspend the compactor before locking the list.
234
bu_Compactor = bu_SourceDatabase->getCompactorThread();
236
bu_Compactor->retain();
237
bu_Compactor->suspend();
240
// Build the list of repositories to be backed up.
243
new_(bu_BackupList, CSVector(repo_list->size()));
244
for (uint32_t i = 0; i<repo_list->size(); i++) {
245
if ((repo = (MSRepository *) repo_list->get(i))) {
246
if (!repo->isRemovingFP && !repo->mustBeDeleted) {
247
bu_BackupList->add(RETAIN(repo));
248
if (repo->initBackup() == REPO_COMPACTING)
251
if (!repo->myRepoHeadSize) {
252
/* The file has not yet been opened, so the
253
* garbage count will not be known!
255
MSRepoFile *repo_file;
258
//unlock_(myRepostoryList);
260
repo_file = repo->openRepoFile();
261
repo_file->release();
263
//lock_(myRepostoryList);
267
bu_size += repo->myRepoFileSize;
273
// Copy the table list to the backup database:
274
uint32_t next_tab = 0;
276
while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
278
bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
283
// Copy over any physical PBMS system tables.
284
PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
286
// Load the system tables into the backup database. This will
287
// initialize the database for cloud storage if required.
288
PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
290
// Set the cloud backup info.
291
bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
294
// Set the backup number in the pbms_variable tabe. (This is a hidden value.)
295
// This value is used in case a drag and drop restore was done. When a data base is
296
// first loaded this value is checked and if it is not zero then the backup record
297
// will be read and any used to recover any BLOBs.
300
snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
301
MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
303
// Once the repositories are locked the compactor can be restarted
304
// unless it is in the process of compacting a repository that is
306
if (bu_Compactor && !compacting) {
307
bu_Compactor->resume();
308
bu_Compactor->release();
312
// Suspend the transaction writer while the backup is running.
313
MSTransactionManager::suspend(true);
314
bu_TransactionManagerSuspended = true;
316
// Start the backup daemon thread.
317
bu_ID = bu_start_time = time(NULL);
330
void MSBackup::completeBackup()
332
if (bu_TransactionManagerSuspended) {
333
MSTransactionManager::resume();
334
bu_TransactionManagerSuspended = false;
340
while (bu_BackupList->size()) {
341
repo = (MSRepository *) bu_BackupList->take(0);
343
repo->backupCompleted();
347
bu_BackupList->release();
348
bu_BackupList = NULL;
352
bu_Compactor->resume();
353
bu_Compactor->release();
358
if (bu_State == BU_COMPLETED)
359
bu_Database->releaseBackupDatabase();
361
MSDatabase::dropDatabase(bu_Database);
366
if (bu_SourceDatabase){
367
if (bu_State == BU_COMPLETED)
368
bu_info->backupCompleted(bu_SourceDatabase);
370
bu_info->backupTerminated(bu_SourceDatabase);
372
bu_SourceDatabase = NULL;
377
bu_BackupRunning = false;
380
bool MSBackup::doWork()
383
MSRepository *src_repo, *dst_repo;
384
MSRepoFile *src_file, *dst_file;
385
off64_t src_offset, prev_offset;
387
uint64_t blob_size, blob_data_size;
388
CSStringBuffer *head;
389
MSRepoPointersRec ptr;
390
uint32_t table_ref_count;
391
uint32_t blob_ref_count;
398
uint32_t src_repo_id;
400
uint8_t blob_storage_type;
403
char transferBuffer[MS_BACKUP_BUFFER_SIZE];
404
CloudKeyRec cloud_key;
408
bu_BackupRunning = true;
409
bu_State = BU_RUNNING;
415
myWaitTime = 5 * 1000; // Time in milli-seconds
422
new_(head, CSStringBuffer(100));
425
src_repo = (MSRepository*)bu_BackupList->get(0);
426
while (src_repo && !myMustQuit) {
428
src_file = src_repo->openRepoFile();
431
dst_repo = bu_Database->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
433
dst_file = dst_repo->openRepoFile();
436
src_repo_id = src_repo->myRepoID;
437
src_offset = src_repo->myRepoHeadSize;
439
while (src_offset < src_repo->myRepoFileSize) {
442
bu_completed += src_offset - prev_offset;
443
prev_offset = src_offset;
449
// A lock is required here because references and dereferences to the
450
// BLOBs can result in the repository record being updated while
451
// it is being copied.
452
my_lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
454
head->setLength(src_repo->myRepoBlobHeadSize);
455
if (src_file->read(head->getBuffer(0), src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
460
ptr.rp_chars = head->getBuffer(0);
461
ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
462
ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
463
head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
464
blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
465
blob_data_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_data_size_6);
466
auth_code = CS_GET_DISK_4(ptr.rp_head->rb_auth_code_4);
467
status = CS_GET_DISK_1(ptr.rp_head->rb_status_1);
468
mod_time = CS_GET_DISK_4(ptr.rp_head->rb_mod_time_4);
470
blob_storage_type = CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1);
471
if (blob_storage_type == MS_CLOUD_STORAGE) {
472
MSRepoFile::getBlobKey(ptr.rp_head, &cloud_key);
475
// If the BLOB was modified after the start of the backup
476
// then set the mod time to the backup time to ensure that
477
// a backup for update will work correctly.
478
if (mod_time > bu_start_time)
479
CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, bu_start_time);
481
// If the BLOB was moved during the time of this backup then copy
482
// it to the backup location as a referenced BLOB.
483
if ((status == MS_BLOB_MOVED) && (bu_ID == (uint32_t) CS_GET_DISK_4(ptr.rp_head->rb_backup_id_4))) {
484
status = MS_BLOB_REFERENCED;
485
CS_SET_DISK_1(ptr.rp_head->rb_status_1, status);
489
if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
490
head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
491
!VALID_BLOB_STATUS(status)) {
492
/* Can't be true. Assume this is garbage! */
499
if ((status == MS_BLOB_REFERENCED) || (status == MS_BLOB_MOVED)) {
500
head->setLength(head_size);
501
if (src_file->read(head->getBuffer(0) + src_repo->myRepoBlobHeadSize, src_offset + src_repo->myRepoBlobHeadSize, head_size - src_repo->myRepoBlobHeadSize, 0) != (head_size- src_repo->myRepoBlobHeadSize)) {
509
// Loop through all the references removing temporary references
510
// and counting table and blob references.
512
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
513
for (int count = 0; count < ref_count; count++) {
514
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
515
case MS_BLOB_FREE_REF:
517
case MS_BLOB_TABLE_REF:
518
// Unlike the compactor, table refs are not checked because
519
// they do not yet exist in the backup database.
522
case MS_BLOB_DELETE_REF:
523
// These are temporary references from the TempLog file.
524
// They are not copied to the backup.
525
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
528
// Must be a BLOB reference
530
tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
531
if (tab_index && (tab_index <= ref_count)) {
532
// Only committed references are backed up.
533
if (IS_COMMITTED(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8))) {
534
MSRepoTableRefPtr tab_ref;
535
tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (tab_index-1) * ref_size);
536
if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
539
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
543
/* Can't be true. Assume this is garbage! */
550
ptr.rp_chars += ref_size;
554
// If there are still blob references then the record needs to be backed up.
555
if (table_ref_count && blob_ref_count) {
559
dst_offset = dst_repo->myRepoFileSize;
561
/* Write the header. */
562
dst_file->write(head->getBuffer(0), dst_offset, head_size);
564
/* Copy the BLOB over: */
565
if (blob_storage_type == MS_CLOUD_STORAGE) {
566
bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
568
CSFile::transfer(dst_file, dst_offset + head_size, src_file, src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
570
/* Update the references: */
571
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
572
for (int count = 0; count < ref_count; count++) {
573
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
574
case MS_BLOB_FREE_REF:
575
case MS_BLOB_DELETE_REF:
577
case MS_BLOB_TABLE_REF:
578
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
579
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
581
if ((otab = MSTableList::getOpenTableByID(bu_Database->myDatabaseID, tab_id))) {
583
otab->getDBTable()->setBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, blob_size, head_size, auth_code);
584
//CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "What if an error ocurred here!");
592
ptr.rp_chars += ref_size;
595
dst_repo->myRepoFileSize += head_size + blob_size;
599
src_offset += head_size + blob_size;
601
bu_completed += src_offset - prev_offset;
603
// close the destination repository and cleanup.
605
backtopool_(dst_repo);
608
// release the source repository and get the next one in the list.
609
src_repo->backupCompleted();
610
bu_BackupList->remove(0);
612
src_repo = (MSRepository*)bu_BackupList->get(0);
617
bu_State = BU_TERMINATED;
619
bu_State = BU_COMPLETED;
633
void *MSBackup::completeWork()
635
if (bu_SourceDatabase || bu_BackupList || bu_Compactor || bu_info) {
636
// We shouldn't be here
637
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "MSBackup::completeBackup() not called");
638
if (bu_SourceDatabase) {
639
bu_SourceDatabase->release();
640
bu_SourceDatabase = NULL;
644
bu_BackupList->release();
645
bu_BackupList = NULL;
650
bu_Compactor->release();