~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/backup_ms.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-11-12 12:26:01 UTC
  • mfrom: (1.1.1 upstream)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101112122601-myppfj3tfmlkccuq
Tags: upstream-2010.11.03
Import upstream version 2010.11.03

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
18
 *
 
19
 * Barry Leslie
 
20
 *
 
21
 * 2009-05-29
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * Repository backup.
 
26
 *
 
27
 * The backup is done by creating a new database with the same name and ID in the 
 
28
 * backup location. Then the pbms_dump table in the source database is initialized
 
29
 * for a sequential scan for backup. This has the effect of locking all current repository
 
30
 * files. Then the equivalent of 'insert into dst_db.pbms_dump (select * from src_db.pbms_dump);'
 
31
 * is performed. 
 
32
 *
 
33
 */
 
34
 
 
35
#ifdef DRIZZLED
 
36
#include "config.h"
 
37
#include <drizzled/common.h>
 
38
#include <drizzled/session.h>
 
39
#include <drizzled/table.h>
 
40
#include <drizzled/message/table.pb.h>
 
41
#include "drizzled/charset_info.h"
 
42
#include <drizzled/table_proto.h>
 
43
#include <drizzled/field.h>
 
44
#include <drizzled/field/varstring.h>
 
45
#endif
 
46
 
 
47
#include "cslib/CSConfig.h"
 
48
 
 
49
#include <sys/types.h>
 
50
#include <inttypes.h>
 
51
 
 
52
#include "cslib/CSGlobal.h"
 
53
#include "cslib/CSStrUtil.h"
 
54
#include "cslib/CSStorage.h"
 
55
 
 
56
#include "defs_ms.h"
 
57
#include "system_table_ms.h"
 
58
#include "open_table_ms.h"
 
59
#include "table_ms.h"
 
60
#include "database_ms.h"
 
61
#include "repository_ms.h"
 
62
#include "backup_ms.h"
 
63
#include "transaction_ms.h"
 
64
#include "systab_variable_ms.h"
 
65
#include "systab_backup_ms.h"
 
66
 
 
67
// Next backup reference id to hand out; startDump() post-increments it while holding the gBackupInfo lock.
uint32_t MSBackupInfo::gMaxInfoRef;
 
68
// Registry of MSBackupInfo records indexed by backup reference id (filled via set(), emptied via remove() in this file).
CSSyncSparseArray *MSBackupInfo::gBackupInfo;
 
69
 
 
70
//==========================================
 
71
MSBackupInfo::MSBackupInfo(     uint32_t id, 
 
72
                                                        const char *name, 
 
73
                                                        uint32_t db_id_arg, 
 
74
                                                        time_t start, 
 
75
                                                        time_t end, 
 
76
                                                        bool _isDump, 
 
77
                                                        const char *location, 
 
78
                                                        uint32_t cloudRef_arg, 
 
79
                                                        uint32_t cloudBackupNo_arg ):
 
80
        backupRefId(id),
 
81
        db_name(NULL),
 
82
        db_id(db_id_arg),
 
83
        startTime(start),
 
84
        completionTime(end),
 
85
        dump(_isDump),
 
86
        isRunning(false),
 
87
        backupLocation(NULL),
 
88
        cloudRef(cloudRef_arg),
 
89
        cloudBackupNo(cloudBackupNo_arg)
 
90
{
 
91
        db_name = CSString::newString(name);
 
92
        if (location && *location)              
 
93
                backupLocation = CSString::newString(location);         
 
94
}
 
95
 
 
96
//-------------------------------
 
97
/* Drop the references this record holds on its name and location strings. */
MSBackupInfo::~MSBackupInfo()
{
	if (db_name != NULL)
		db_name->release();

	if (backupLocation != NULL)
		backupLocation->release();
}
 
105
 
 
106
//-------------------------------
 
107
void MSBackupInfo::startBackup(MSDatabase *pbms_db)
 
108
{
 
109
        MSDatabase *src_db;
 
110
        
 
111
        enter_();
 
112
        push_(pbms_db);
 
113
        
 
114
        src_db = MSDatabase::getDatabase(db_id);
 
115
        push_(src_db);
 
116
        
 
117
        startTime = time(NULL);
 
118
        
 
119
        src_db->startBackup(RETAIN(this));
 
120
        release_(src_db);
 
121
        
 
122
        isRunning = true;
 
123
        
 
124
        pop_(pbms_db);
 
125
        MSBackupTable::saveTable(pbms_db);
 
126
        exit_();
 
127
}
 
128
 
 
129
//-------------------------------
 
130
class StartDumpCleanUp : public CSRefObject {
 
131
        bool do_cleanup;
 
132
        uint32_t ref_id;
 
133
 
 
134
        public:
 
135
        
 
136
        StartDumpCleanUp(): CSRefObject(),
 
137
                do_cleanup(false){}
 
138
                
 
139
        ~StartDumpCleanUp() 
 
140
        {
 
141
                if (do_cleanup) {
 
142
                        MSBackupInfo::gBackupInfo->remove(ref_id);
 
143
                }
 
144
        }
 
145
        
 
146
        void setCleanUp(uint32_t id)
 
147
        {
 
148
                ref_id = id;
 
149
                do_cleanup = true;
 
150
        }
 
151
        
 
152
        void cancelCleanUp()
 
153
        {
 
154
                do_cleanup = false;
 
155
        }
 
156
        
 
157
};
 
158
 
 
159
MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
 
160
{
 
161
        MSBackupInfo *info;
 
162
        uint32_t ref_id;
 
163
        StartDumpCleanUp *cleanup;
 
164
        
 
165
        enter_();
 
166
        push_(db);
 
167
        lock_(gBackupInfo);
 
168
        
 
169
        ref_id = gMaxInfoRef++;
 
170
        new_(info, MSBackupInfo(ref_id, db->myDatabaseName->getCString(), db->myDatabaseID, time(NULL), 0, true, NULL, cloud_ref, backup_no));
 
171
        push_(info);
 
172
        
 
173
        gBackupInfo->set(ref_id, RETAIN(info));
 
174
        
 
175
        info->isRunning = true;
 
176
 
 
177
        pop_(info);
 
178
        unlock_(gBackupInfo);
 
179
        push_(info);
 
180
        
 
181
        // Create a cleanup object to handle cleanup
 
182
        // after a possible exception.
 
183
        new_(cleanup, StartDumpCleanUp());
 
184
        push_(cleanup);
 
185
        cleanup->setCleanUp(ref_id);
 
186
        
 
187
        MSBackupTable::saveTable(RETAIN(db));
 
188
        
 
189
        cleanup->cancelCleanUp();
 
190
        release_(cleanup);
 
191
        
 
192
        pop_(info);
 
193
        release_(db);
 
194
 
 
195
        return_(info);
 
196
}
 
197
//-------------------------------
 
198
void MSBackupInfo::backupCompleted(MSDatabase *db)
 
199
{
 
200
        completionTime = time(NULL);    
 
201
        isRunning = false;
 
202
        MSBackupTable::saveTable(db);
 
203
}
 
204
 
 
205
//-------------------------------
 
206
void MSBackupInfo::backupTerminated(MSDatabase *db)
 
207
{
 
208
        enter_();
 
209
        push_(db);
 
210
        lock_(gBackupInfo);
 
211
        
 
212
        gBackupInfo->remove(backupRefId);
 
213
        unlock_(gBackupInfo);
 
214
        
 
215
        pop_(db);
 
216
        MSBackupTable::saveTable(db);
 
217
        exit_();
 
218
}
 
219
 
 
220
//==========================================
 
221
/*
 * Construct an idle backup daemon: no info record, no databases, no
 * repository list, no compactor, zeroed progress counters, and the
 * transaction manager not suspended. The state starts out as BU_COMPLETED.
 */
MSBackup::MSBackup():
	CSDaemon(NULL),
	bu_info(NULL),
	bu_BackupList(NULL),
	bu_Compactor(NULL),
	bu_BackupRunning(false),
	bu_State(BU_COMPLETED),
	bu_SourceDatabase(NULL),
	bu_Database(NULL),
	bu_dst_dump(NULL),
	bu_src_dump(NULL),
	bu_size(0),
	bu_completed(0),
	bu_ID(0),
	bu_start_time(0),
	bu_TransactionManagerSuspended(false)
{
}
 
239
 
 
240
MSBackup *MSBackup::newMSBackup(MSBackupInfo *info)
 
241
{
 
242
        MSBackup *bu;
 
243
        enter_();
 
244
        
 
245
        push_(info);
 
246
        
 
247
        new_(bu, MSBackup());
 
248
        push_(bu);
 
249
        bu->bu_Database = MSDatabase::getBackupDatabase(RETAIN(info->backupLocation), RETAIN(info->db_name), info->db_id, true);
 
250
        pop_(bu);
 
251
        
 
252
        bu->bu_info = info;
 
253
        pop_(info);
 
254
 
 
255
        return_(bu);
 
256
}
 
257
 
 
258
//-------------------------------
 
259
class StartBackupCleanUp : public CSRefObject {
 
260
        bool do_cleanup;
 
261
        MSBackup *backup;
 
262
 
 
263
        public:
 
264
        
 
265
        StartBackupCleanUp(): CSRefObject(),
 
266
                do_cleanup(false){}
 
267
                
 
268
        ~StartBackupCleanUp() 
 
269
        {
 
270
                if (do_cleanup) {
 
271
                        backup->completeBackup();
 
272
                }
 
273
        }
 
274
        
 
275
        void setCleanUp(MSBackup *bup)
 
276
        {
 
277
                backup = bup;
 
278
                do_cleanup = true;
 
279
        }
 
280
        
 
281
        void cancelCleanUp()
 
282
        {
 
283
                do_cleanup = false;
 
284
        }
 
285
        
 
286
};
 
287
 
 
288
void MSBackup::startBackup(MSDatabase *src_db)
 
289
{
 
290
        CSSyncVector    *repo_list;
 
291
        bool                    compacting = false;
 
292
        MSRepository    *repo;
 
293
        StartBackupCleanUp *cleanup;
 
294
        enter_();
 
295
 
 
296
        // Create a cleanup object to handle cleanup
 
297
        // after a possible exception.
 
298
        new_(cleanup, StartBackupCleanUp());
 
299
        push_(cleanup);
 
300
        cleanup->setCleanUp(this);
 
301
 
 
302
        bu_SourceDatabase = src_db;
 
303
        repo_list = bu_SourceDatabase->getRepositoryList();
 
304
        // Suspend the compactor before locking the list.
 
305
        bu_Compactor = bu_SourceDatabase->getCompactorThread();
 
306
        if (bu_Compactor) {
 
307
                bu_Compactor->retain();
 
308
                bu_Compactor->suspend();
 
309
        }
 
310
 
 
311
        // Build the list of repositories to be backed up.
 
312
        lock_(repo_list);
 
313
 
 
314
        new_(bu_BackupList, CSVector(repo_list->size()));
 
315
        for (uint32_t i = 0; i<repo_list->size(); i++) {
 
316
                if ((repo = (MSRepository *) repo_list->get(i))) {
 
317
                        if (!repo->isRemovingFP && !repo->mustBeDeleted) {
 
318
                                bu_BackupList->add(RETAIN(repo));
 
319
                                if (repo->initBackup() == REPO_COMPACTING) 
 
320
                                        compacting = true; 
 
321
                                
 
322
                                if (!repo->myRepoHeadSize) {
 
323
                                        /* The file has not yet been opened, so the
 
324
                                         * garbage count will not be known!
 
325
                                         */
 
326
                                        MSRepoFile *repo_file;
 
327
 
 
328
                                        //repo->retain();
 
329
                                        //unlock_(myRepostoryList);
 
330
                                        //push_(repo);
 
331
                                        repo_file = repo->openRepoFile();
 
332
                                        repo_file->release();
 
333
                                        //release_(repo);
 
334
                                        //lock_(myRepostoryList);
 
335
                                        //goto retry;
 
336
                                }
 
337
                                
 
338
                                bu_size += repo->myRepoFileSize; 
 
339
 
 
340
                        }
 
341
                }
 
342
        }
 
343
        
 
344
        // Copy the table list to the backup database:
 
345
        uint32_t                next_tab = 0;
 
346
        MSTable         *tab;
 
347
        while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
 
348
                push_(tab);
 
349
                bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
 
350
                release_(tab);
 
351
        }
 
352
        unlock_(repo_list);
 
353
        
 
354
        // Copy over any physical PBMS system tables.
 
355
        PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
 
356
 
 
357
        // Load the system tables into the backup database. This will
 
358
        // initialize the database for cloud storage if required.
 
359
        PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
 
360
        
 
361
        // Set the cloud backup info.
 
362
        bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
 
363
        
 
364
        
 
365
        // Set the backup number in the pbms_variable tabe. (This is a hidden value.)
 
366
        // This value is used in case a drag and drop restore was done. When a data base is
 
367
        // first loaded this value is checked and if it is not zero then the backup record
 
368
        // will be read and any used to recover any BLOBs.
 
369
        // 
 
370
        char value[20];
 
371
        snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
 
372
        MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
 
373
        
 
374
        // Once the repositories are locked the compactor can be restarted
 
375
        // unless it is in the process of compacting a repository that is
 
376
        // being backed up.
 
377
        if (bu_Compactor && !compacting) {      
 
378
                bu_Compactor->resume();         
 
379
                bu_Compactor->release();                
 
380
                bu_Compactor = NULL;            
 
381
        }
 
382
        
 
383
        // Suspend the transaction writer while the backup is running.
 
384
        MSTransactionManager::suspend(true);
 
385
        bu_TransactionManagerSuspended = true;
 
386
        
 
387
        // Start the backup daemon thread.
 
388
        bu_ID = bu_start_time = time(NULL);
 
389
        start();
 
390
        
 
391
        cleanup->cancelCleanUp();
 
392
        release_(cleanup);
 
393
 
 
394
        exit_();
 
395
}
 
396
 
 
397
void MSBackup::completeBackup()
 
398
{
 
399
        if (bu_TransactionManagerSuspended) {   
 
400
                MSTransactionManager::resume();
 
401
                bu_TransactionManagerSuspended = false;
 
402
        }
 
403
 
 
404
        if (bu_BackupList) {
 
405
                MSRepository *repo;             
 
406
                
 
407
                while (bu_BackupList->size()) {
 
408
                        repo = (MSRepository *) bu_BackupList->take(0);
 
409
                        if (repo) {                             
 
410
                                repo->backupCompleted();
 
411
                                repo->release();                                
 
412
                        }
 
413
                }
 
414
                bu_BackupList->release();
 
415
                bu_BackupList = NULL;
 
416
        }
 
417
                
 
418
        if (bu_Compactor) {
 
419
                bu_Compactor->resume();
 
420
                bu_Compactor->release();
 
421
                bu_Compactor = NULL;
 
422
        }
 
423
        
 
424
        if (bu_Database) {
 
425
                if (bu_State == BU_COMPLETED)
 
426
                        bu_Database->releaseBackupDatabase();
 
427
                else 
 
428
                        MSDatabase::dropDatabase(bu_Database);
 
429
                        
 
430
                bu_Database = NULL;
 
431
        }
 
432
 
 
433
        if (bu_SourceDatabase){
 
434
                if (bu_State == BU_COMPLETED) 
 
435
                        bu_info->backupCompleted(bu_SourceDatabase);
 
436
                else 
 
437
                        bu_info->backupTerminated(bu_SourceDatabase);
 
438
                
 
439
                bu_SourceDatabase = NULL;
 
440
                bu_info->release();
 
441
                bu_info = NULL;
 
442
        }
 
443
        
 
444
        bu_BackupRunning = false;
 
445
}
 
446
 
 
447
bool MSBackup::doWork()
 
448
{
 
449
        enter_();
 
450
        try_(a) {
 
451
                CSMutex                         *my_lock;
 
452
                MSRepository            *src_repo, *dst_repo;
 
453
                MSRepoFile                      *src_file, *dst_file;
 
454
                off64_t                         src_offset, prev_offset;
 
455
                uint16_t                                head_size;
 
456
                uint64_t                                blob_size, blob_data_size;
 
457
                CSStringBuffer          *head;
 
458
                MSRepoPointersRec       ptr;
 
459
                uint32_t                                table_ref_count;
 
460
                uint32_t                                blob_ref_count;
 
461
                int                                     ref_count;
 
462
                size_t                          ref_size;
 
463
                uint32_t                                auth_code;
 
464
                uint32_t                                tab_id;
 
465
                uint64_t                                blob_id;
 
466
                MSOpenTable                     *otab;
 
467
                uint32_t                                src_repo_id;
 
468
                uint8_t                         status;
 
469
                uint8_t                         blob_storage_type;
 
470
                uint16_t                                tab_index;
 
471
                uint32_t                                mod_time;
 
472
                char                            transferBuffer[MS_BACKUP_BUFFER_SIZE];
 
473
                CloudKeyRec                     cloud_key;
 
474
 
 
475
        
 
476
                bu_BackupRunning = true;
 
477
                bu_State = BU_RUNNING; 
 
478
 
 
479
        /*
 
480
                // For testing:
 
481
                {
 
482
                        int blockit = 0;
 
483
                        myWaitTime = 5 * 1000;  // Time in milli-seconds
 
484
                        while (blockit)
 
485
                                return_(true);
 
486
                }
 
487
        */
 
488
        
 
489
                new_(head, CSStringBuffer(100));
 
490
                push_(head);
 
491
 
 
492
                src_repo = (MSRepository*)bu_BackupList->get(0);
 
493
                while (src_repo && !myMustQuit) {
 
494
                        src_offset = 0;
 
495
                        src_file = src_repo->openRepoFile();
 
496
                        push_(src_file);
 
497
 
 
498
                        dst_repo = bu_Database->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
 
499
                        frompool_(dst_repo);
 
500
                        dst_file = dst_repo->openRepoFile();
 
501
                        push_(dst_file);
 
502
                        
 
503
                        src_repo_id = src_repo->myRepoID;
 
504
                        src_offset = src_repo->myRepoHeadSize;
 
505
                        prev_offset = 0;
 
506
                        while (src_offset < src_repo->myRepoFileSize) { 
 
507
        retry_read:
 
508
                                        
 
509
                                bu_completed += src_offset - prev_offset;
 
510
                                prev_offset = src_offset;
 
511
                                suspended();
 
512
 
 
513
                                if (myMustQuit)
 
514
                                        break;
 
515
                                
 
516
                                // A lock is required here because references and dereferences to the
 
517
                                // BLOBs can result in the repository record being updated while 
 
518
                                // it is being copied.
 
519
                                my_lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
 
520
                                lock_(my_lock);
 
521
                                head->setLength(src_repo->myRepoBlobHeadSize);
 
522
                                if (src_file->read(head->getBuffer(0), src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) { 
 
523
                                        unlock_(my_lock);
 
524
                                        break;
 
525
                                }
 
526
                                        
 
527
                                ptr.rp_chars = head->getBuffer(0);
 
528
                                ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
 
529
                                ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
 
530
                                head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
 
531
                                blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
 
532
                                blob_data_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_data_size_6);
 
533
                                auth_code = CS_GET_DISK_4(ptr.rp_head->rb_auth_code_4);
 
534
                                status = CS_GET_DISK_1(ptr.rp_head->rb_status_1);
 
535
                                mod_time = CS_GET_DISK_4(ptr.rp_head->rb_mod_time_4);
 
536
                                
 
537
                                blob_storage_type = CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1);
 
538
                                if (blob_storage_type == MS_CLOUD_STORAGE) {
 
539
                                        MSRepoFile::getBlobKey(ptr.rp_head, &cloud_key);
 
540
                                }
 
541
 
 
542
                                // If the BLOB was modified after the start of the backup
 
543
                                // then set the mod time to the backup time to ensure that
 
544
                                // a backup for update will work correctly.
 
545
                                if (mod_time > bu_start_time)
 
546
                                        CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, bu_start_time);
 
547
                                        
 
548
                                // If the BLOB was moved during the time of this backup then copy
 
549
                                // it to the backup location as a referenced BLOB.
 
550
                                if ((status == MS_BLOB_MOVED)  && (bu_ID == (uint32_t) CS_GET_DISK_4(ptr.rp_head->rb_backup_id_4))) {
 
551
                                        status = MS_BLOB_REFERENCED;
 
552
                                        CS_SET_DISK_1(ptr.rp_head->rb_status_1, status);
 
553
                                }
 
554
                                
 
555
                                // sanity check
 
556
                                if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
 
557
                                        head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
 
558
                                        !VALID_BLOB_STATUS(status)) {
 
559
                                        /* Can't be true. Assume this is garbage! */
 
560
                                        src_offset++;
 
561
                                        unlock_(my_lock);
 
562
                                        continue;
 
563
                                }
 
564
                                
 
565
                                
 
566
                                if ((status == MS_BLOB_REFERENCED) || (status == MS_BLOB_MOVED)) {
 
567
                                        head->setLength(head_size);
 
568
                                        if (src_file->read(head->getBuffer(0) + src_repo->myRepoBlobHeadSize, src_offset + src_repo->myRepoBlobHeadSize, head_size  - src_repo->myRepoBlobHeadSize, 0) != (head_size- src_repo->myRepoBlobHeadSize)) {
 
569
                                                unlock_(my_lock);
 
570
                                                break;
 
571
                                        }
 
572
 
 
573
                                        table_ref_count = 0;
 
574
                                        blob_ref_count = 0;
 
575
                                        
 
576
                                        // Loop through all the references removing temporary references 
 
577
                                        // and counting table and blob references.
 
578
                                        
 
579
                                        ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
 
580
                                        for (int count = 0; count < ref_count; count++) {
 
581
                                                switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
 
582
                                                        case MS_BLOB_FREE_REF:
 
583
                                                                break;
 
584
                                                        case MS_BLOB_TABLE_REF:
 
585
                                                                // Unlike the compactor, table refs are not checked because
 
586
                                                                // they do not yet exist in the backup database.
 
587
                                                                table_ref_count++;
 
588
                                                                break;
 
589
                                                        case MS_BLOB_DELETE_REF:
 
590
                                                                // These are temporary references from the TempLog file. 
 
591
                                                                // They are not copied to the backup. 
 
592
                                                                CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
 
593
                                                                break;
 
594
                                                        default:
 
595
                                                                // Must be a BLOB reference
 
596
                                                                
 
597
                                                                tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
 
598
                                                                if (tab_index && (tab_index <= ref_count)) {
 
599
                                                                        // Only committed references are backed up.
 
600
                                                                        if (IS_COMMITTED(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8))) {
 
601
                                                                                MSRepoTableRefPtr       tab_ref;
 
602
                                                                                tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (tab_index-1) * ref_size);
 
603
                                                                                if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
 
604
                                                                                        blob_ref_count++;
 
605
                                                                        } else {
 
606
                                                                                CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
 
607
                                                                        }
 
608
                                                                
 
609
                                                                } else {
 
610
                                                                        /* Can't be true. Assume this is garbage! */
 
611
                                                                        src_offset++;
 
612
                                                                        unlock_(my_lock);
 
613
                                                                        goto retry_read;
 
614
                                                                }
 
615
                                                                break;
 
616
                                                }
 
617
                                                ptr.rp_chars += ref_size;
 
618
                                        }
 
619
 
 
620
 
 
621
                                        // If there are still blob references then the record needs to be backed up.
 
622
                                        if (table_ref_count && blob_ref_count) {
 
623
 
 
624
                                                off64_t dst_offset;
 
625
 
 
626
                                                dst_offset = dst_repo->myRepoFileSize;
 
627
                                                
 
628
                                                /* Write the header. */
 
629
                                                dst_file->write(head->getBuffer(0), dst_offset, head_size);
 
630
 
 
631
                                                /* Copy the BLOB over: */
 
632
                                                if (blob_storage_type == MS_CLOUD_STORAGE) { 
 
633
                                                        bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
 
634
                                                } else
 
635
                                                        CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
 
636
                                        
 
637
                                                /* Update the references: */
 
638
                                                ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
 
639
                                                for (int count = 0; count < ref_count; count++) {
 
640
                                                        switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
 
641
                                                                case MS_BLOB_FREE_REF:
 
642
                                                                case MS_BLOB_DELETE_REF:
 
643
                                                                        break;
 
644
                                                                case MS_BLOB_TABLE_REF:
 
645
                                                                        tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
 
646
                                                                        blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
 
647
 
 
648
                                                                        if ((otab = MSTableList::getOpenTableByID(bu_Database->myDatabaseID, tab_id))) {
 
649
                                                                                frompool_(otab);
 
650
                                                                                otab->getDBTable()->setBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, blob_size, head_size, auth_code);
 
651
//CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "What if an error ocurred here!");
 
652
 
 
653
                                                                                backtopool_(otab);
 
654
                                                                        }
 
655
                                                                        break;
 
656
                                                                default:
 
657
                                                                        break;
 
658
                                                        }
 
659
                                                        ptr.rp_chars += ref_size;
 
660
                                                }
 
661
 
 
662
                                                dst_repo->myRepoFileSize += head_size + blob_size;
 
663
                                        }
 
664
                                }
 
665
                                unlock_(my_lock);
 
666
                                src_offset += head_size + blob_size;
 
667
                        }
 
668
                        bu_completed += src_offset - prev_offset;
 
669
                        
 
670
                        // close the destination repository and cleanup.
 
671
                        release_(dst_file);
 
672
                        backtopool_(dst_repo);
 
673
                        release_(src_file);
 
674
                        
 
675
                        // release the source repository and get the next one in the list.
 
676
                        src_repo->backupCompleted();
 
677
                        bu_BackupList->remove(0);
 
678
                        
 
679
                        src_repo = (MSRepository*)bu_BackupList->get(0);
 
680
                }
 
681
                                
 
682
                release_(head);
 
683
                if (myMustQuit)
 
684
                        bu_State = BU_TERMINATED; 
 
685
                else
 
686
                        bu_State = BU_COMPLETED; 
 
687
                        
 
688
        }       
 
689
        
 
690
        catch_(a) {
 
691
                logException();
 
692
        }
 
693
        
 
694
        cont_(a);       
 
695
        completeBackup();
 
696
        myMustQuit = true;
 
697
        return_(true);
 
698
}
 
699
 
 
700
void *MSBackup::completeWork()
 
701
{
 
702
        if (bu_SourceDatabase || bu_BackupList || bu_Compactor || bu_info) {
 
703
                // We shouldn't be here
 
704
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "MSBackup::completeBackup() not called");
 
705
                if (bu_SourceDatabase) {
 
706
                         bu_SourceDatabase->release();
 
707
                         bu_SourceDatabase = NULL;
 
708
                }
 
709
                        
 
710
                if (bu_BackupList) {
 
711
                         bu_BackupList->release();
 
712
                         bu_BackupList = NULL;
 
713
                }
 
714
 
 
715
                        
 
716
                if (bu_Compactor) {
 
717
                         bu_Compactor->release();
 
718
                         bu_Compactor = NULL;
 
719
                }
 
720
 
 
721
                        
 
722
                if (bu_info) {
 
723
                         bu_info->release();
 
724
                         bu_info = NULL;
 
725
                }
 
726
 
 
727
        }
 
728
        return NULL;
 
729
}