~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/alias_ms.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Barry Leslie
 
20
 *
 
21
 * 2008-12-30
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * BLOB alias index.
 
26
 *
 
27
 */
 
28
 
 
29
#ifdef HAVE_ALIAS_SUPPORT
 
30
#include "cslib/CSConfig.h"
 
31
 
 
32
#include "string.h"
 
33
 
 
34
#ifdef DRIZZLED
 
35
#include <drizzled/common.h>
 
36
#endif
 
37
 
 
38
#include "cslib/CSGlobal.h"
 
39
#include "cslib/CSLog.h"
 
40
#include "cslib/CSStrUtil.h"
 
41
#include "cslib/CSFile.h"
 
42
#include "system_table_ms.h"
 
43
#include "database_ms.h"
 
44
 
 
45
#include "alias_ms.h"
 
46
 
 
47
 
 
48
 
 
49
//------------------------
 
50
MSAlias::~MSAlias()
 
51
{
 
52
        enter_();
 
53
        
 
54
        ASSERT(iClosing);
 
55
        ASSERT(iPoolSysTables.getSize() == 0);
 
56
 
 
57
        
 
58
        if (iFilePath) {
 
59
        
 
60
                if (iDelete)
 
61
                        iFilePath->removeFile();
 
62
                        
 
63
                iFilePath->release();
 
64
        }
 
65
        
 
66
        if (iFileShare)
 
67
                iFileShare->release();
 
68
                
 
69
        exit_();
 
70
}
 
71
 
 
72
//------------------------
 
73
MSAlias::MSAlias(MSDatabase *db_noref)
 
74
{
 
75
        iClosing = false;
 
76
        iDelete = false;
 
77
        iDatabase_br = db_noref;
 
78
        iFilePath = NULL;
 
79
        iFileShare = NULL;
 
80
}
 
81
 
 
82
//------------------------
 
83
void MSAlias::ma_close()
 
84
{
 
85
        enter_();
 
86
        
 
87
        iClosing = true;
 
88
        if (iFileShare)
 
89
                iFileShare->close();
 
90
        iPoolSysTables.clear();
 
91
        exit_();
 
92
}
 
93
 
 
94
//------------------------
 
95
// Compress the index bucket chain and free unused buckets.
 
96
void MSAlias::MSAliasCompress(CSFile *fa, CSSortedList  *freeList, MSABucketLinkedList *bucketChain)
 
97
{
 
98
        // For now I will just remove empty buckets. 
 
99
        // Later this function should also compress the records also 
 
100
        // thus making the searches faster and freeing up more space.
 
101
        MSABucketInfo *b_info, *next;
 
102
        
 
103
        b_info = bucketChain->getFront();
 
104
        while (b_info) {
 
105
                next = b_info->getNextLink();
 
106
                if (b_info->getSize() == 0) {
 
107
                        bucketChain->remove(RETAIN(b_info));
 
108
                        freeList->add(b_info);
 
109
                }               
 
110
                b_info = next;
 
111
        }
 
112
                
 
113
}
 
114
 
 
115
//------------------------
 
116
void MSAlias::MSAliasLoad()
 
117
{       
 
118
        CSFile                  *fa = NULL;
 
119
        CSSortedList    freeList;
 
120
        off64_t                 fileSize;
 
121
 
 
122
        enter_();
 
123
        
 
124
        fa = CSFile::newFile(RETAIN(iFilePath));
 
125
        push_(fa);
 
126
 
 
127
        MSAliasHeadRec header;
 
128
        uint64_t free_list_offset;
 
129
        fa->open(CSFile::DEFAULT);
 
130
        fa->read(&header, 0, sizeof(header), sizeof(header));
 
131
        
 
132
        /* Check the file header: */
 
133
        if (CS_GET_DISK_4(header.ah_magic_4) != MS_ALIAS_FILE_MAGIC)
 
134
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_HEADER_MAGIC);
 
135
        if (CS_GET_DISK_2(header.ah_version_2) != MS_ALIAS_FILE_VERSION)
 
136
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_VERSION_TOO_NEW);
 
137
                
 
138
        free_list_offset = CS_GET_DISK_8(header.ah_free_list_8);
 
139
        
 
140
        fileSize = CS_GET_DISK_8(header.ah_file_size_8);
 
141
 
 
142
        // Do some sanity checks. 
 
143
        if (CS_GET_DISK_2(header.ah_head_size_2) != sizeof(header))
 
144
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
 
145
        
 
146
        if (CS_GET_DISK_2(header.ah_num_buckets_2) != BUCKET_LIST_SIZE)
 
147
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
 
148
        
 
149
        if (CS_GET_DISK_4(header.ah_bucket_size_4) != NUM_RECORDS_PER_BUCKET)
 
150
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
 
151
        
 
152
        if (fileSize != fa->getEOF()) 
 
153
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
 
154
        
 
155
        // Load the bucket headers into RAM
 
156
        MSADiskBucketHeadRec bucketHead = {0};
 
157
        uint64_t offset, start_offset;
 
158
 
 
159
        // Fist load the free list:
 
160
        if (free_list_offset) {
 
161
                start_offset = offset = free_list_offset;
 
162
                do {
 
163
                        fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
 
164
                        freeList.add(MSABucketInfo::newMSABucketInfo(offset));
 
165
                        offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);                                    
 
166
                } while (offset != start_offset);
 
167
                
 
168
        }
 
169
        for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
 
170
                uint64_t used, total_space;
 
171
                MSABucketLinkedList *bucketChain = &(iFileShare->msa_buckets[i]);
 
172
                
 
173
                start_offset = offset = sizeof(header) + i * sizeof(MSADiskBucketRec);
 
174
                used = total_space = 0;
 
175
                do {
 
176
                        uint32_t num, end_of_records;
 
177
                        
 
178
                        fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
 
179
                        num = CS_GET_DISK_4(bucketHead.ab_num_recs_4);
 
180
                        end_of_records = CS_GET_DISK_4(bucketHead.ab_eor_rec_4);
 
181
                        total_space += NUM_RECORDS_PER_BUCKET;
 
182
                        used += num;
 
183
                        bucketChain->addFront(MSABucketInfo::newMSABucketInfo(offset, num, end_of_records));
 
184
                        offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);
 
185
                        
 
186
                } while (offset != start_offset);
 
187
                
 
188
                // Pack the index if required
 
189
                if (((total_space - used) /  NUM_RECORDS_PER_BUCKET) > 1) 
 
190
                        MSAliasCompress(fa, &freeList, bucketChain); 
 
191
                        
 
192
        }
 
193
        
 
194
        // If there are free buckets try to free up some disk
 
195
        // space or add them to a free list to be reused later.
 
196
        if (freeList.getSize()) {
 
197
                uint64_t last_bucket = fileSize - sizeof(MSADiskBucketRec);
 
198
                MSABucketInfo *rec;
 
199
                bool reduce = false;
 
200
                
 
201
                // Search for freed buckets at the end of the file
 
202
                // so that they can be released and the file
 
203
                // shrunk.
 
204
                //
 
205
                // The free list has been sorted so that buckets
 
206
                // with the highest file offset are first.
 
207
                do {
 
208
                        rec = (MSABucketInfo*) freeList.itemAt(0);
 
209
                        if (rec->bi_bucket_offset != last_bucket);
 
210
                                break;
 
211
                                
 
212
                        last_bucket -= sizeof(MSADiskBucketRec);
 
213
                        freeList.remove(rec);
 
214
                        reduce = true;
 
215
                } while (freeList.getSize());
 
216
                
 
217
                if (reduce) {
 
218
                        // The file can be reduced in size.
 
219
                        fileSize = last_bucket + sizeof(MSADiskBucketRec);      
 
220
                        fa->setEOF(fileSize);
 
221
                        CS_SET_DISK_8(header.ah_file_size_8, fileSize);
 
222
                        fa->write(&header.ah_file_size_8, offsetof(MSAliasHeadRec,ah_file_size_8) , 8); 
 
223
                }
 
224
                
 
225
                // Add the empty buckets to the index file's empty bucket list.
 
226
                memset(&bucketHead, 0, sizeof(bucketHead));
 
227
                offset = 0;
 
228
                while (freeList.getSize()) { // Add the empty buckets to the empty_bucket list.
 
229
                        rec = (MSABucketInfo*) freeList.takeItemAt(0);
 
230
                        
 
231
                        // buckets are added to the front of the list.
 
232
                        fa->write(&offset, rec->bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_next_bucket_8) , 8);
 
233
                        offset =  rec->bi_bucket_offset;
 
234
                        fa->write(&offset, offsetof(MSAliasHeadRec,ah_free_list_8) , 8); 
 
235
                        
 
236
                        iFileShare->msa_empty_buckets.addFront(rec);
 
237
                }
 
238
        }
 
239
        
 
240
        iFileShare->msa_fileSize = fa->getEOF();
 
241
        
 
242
        release_(fa);
 
243
        exit_();
 
244
}
 
245
 
 
246
//------------------------
 
247
void MSAlias::buildAliasIndex()
 
248
{
 
249
        MSBlobHeadRec   blob;
 
250
        MSRepository    *repo;
 
251
        uint64_t                        blob_size, fileSize, offset;
 
252
        uint16_t                        head_size;
 
253
        MSAliasFile             *afile;
 
254
        MSAliasRec              aliasRec;
 
255
        
 
256
        enter_();
 
257
        
 
258
        afile = getAliasFile();
 
259
        frompool_(afile);
 
260
 
 
261
        afile->startLoad();
 
262
 
 
263
        CSSyncVector    *repo_list = iDatabase_br->getRepositoryList();
 
264
        
 
265
        // No locking is required since the index is loaded before the database is opened
 
266
        // and the compactor thread is started.
 
267
 
 
268
        for (uint32_t repo_index =0; repo_index<repo_list->size(); repo_index++) {
 
269
                if ((repo = (MSRepository *) repo_list->get(repo_index))) {
 
270
                        MSRepoFile      *repoFile = repo->openRepoFile();
 
271
                        push_(repoFile);
 
272
                        fileSize = repo->getRepoFileSize();
 
273
                        offset = repo->getRepoHeadSize();
 
274
                        
 
275
                        aliasRec.repo_id = repoFile->myRepo->getRepoID();
 
276
                        
 
277
                        while (offset < fileSize) {
 
278
                                if (repoFile->read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) 
 
279
                                        break;
 
280
                                        
 
281
                                if ((CS_GET_DISK_1(blob.rb_status_1) == MS_BLOB_REFERENCED) && CS_GET_DISK_2(blob.rb_alias_offset_2)) {
 
282
                                        aliasRec.repo_offset = offset;
 
283
                                        aliasRec.alias_hash = CS_GET_DISK_4(blob.rb_alias_hash_4);
 
284
                                        addAlias(afile, &aliasRec);
 
285
                                }
 
286
                                
 
287
                                head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
288
                                blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
 
289
                                offset += head_size + blob_size;
 
290
                        }
 
291
                        
 
292
                        release_(repoFile);
 
293
                }
 
294
        }
 
295
        
 
296
        afile->finishLoad();
 
297
        backtopool_(afile);
 
298
 
 
299
        exit_();
 
300
}
 
301
 
 
302
//------------------------
 
303
void MSAlias::MSAliasBuild()
 
304
{
 
305
        CSFile *fa;
 
306
        MSAliasHeadRec header = {0};
 
307
        uint64_t offset, size = sizeof(header) + BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec);
 
308
        enter_();
 
309
        
 
310
        fa = CSFile::newFile(RETAIN(iFilePath));
 
311
        push_(fa);
 
312
 
 
313
        fa->open(CSFile::CREATE | CSFile::TRUNCATE);
 
314
 
 
315
        // Create an empty index with 1 empty bucket in each bucket chain.      
 
316
 
 
317
        CS_SET_DISK_4(header.ah_magic_4,  MS_ALIAS_FILE_MAGIC);
 
318
        CS_SET_DISK_2(header.ah_version_2, MS_ALIAS_FILE_VERSION);
 
319
                
 
320
        CS_SET_DISK_2(header.ah_head_size_2, sizeof(header));
 
321
        CS_SET_DISK_8(header.ah_file_size_8, size);
 
322
        
 
323
        CS_SET_DISK_2(header.ah_num_buckets_2, BUCKET_LIST_SIZE);
 
324
        CS_SET_DISK_2(header.ah_bucket_size_4, NUM_RECORDS_PER_BUCKET);
 
325
        
 
326
        fa->setEOF(size); // Grow the file.
 
327
        fa->write(&header, 0, sizeof(header));
 
328
 
 
329
        offset = sizeof(header);
 
330
        
 
331
        // Initialize the file bucket chains.
 
332
        MSADiskBucketHeadRec bucketHead = {0};
 
333
        for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
 
334
                CS_SET_DISK_8(bucketHead.ab_prev_bucket_8, offset);
 
335
                CS_SET_DISK_8(bucketHead.ab_next_bucket_8, offset);
 
336
                fa->write(&bucketHead, offset, sizeof(MSADiskBucketHeadRec));
 
337
                // Add the bucket to the RAM based list.
 
338
                iFileShare->msa_buckets[i].addFront(MSABucketInfo::newMSABucketInfo(offset));
 
339
                offset += sizeof(MSADiskBucketRec); // NOTE: MSADiskBucketRec not MSADiskBucketHeadRec
 
340
        }
 
341
        
 
342
        fa->sync();
 
343
        
 
344
        
 
345
        
 
346
        fa->close();
 
347
        
 
348
        release_(fa);
 
349
        
 
350
        // Scan through all the BLOBs in the repository and add an entry
 
351
        // for each blob alias.
 
352
        buildAliasIndex();
 
353
 
 
354
        exit_();
 
355
}
 
356
 
 
357
//------------------------
 
358
void MSAlias::ma_open(const char *file_name)
 
359
{
 
360
        bool isdir = false;
 
361
        
 
362
        enter_();
 
363
 
 
364
        iFilePath = CSPath::newPath(RETAIN(iDatabase_br->myDatabasePath), file_name);
 
365
        
 
366
retry:
 
367
        new_(iFileShare, MSAliasFileShare(RETAIN(iFilePath)));
 
368
        
 
369
        if (iFilePath->exists(&isdir)) {
 
370
                try_(a) {
 
371
                        MSAliasLoad(); 
 
372
                }
 
373
                catch_(a) {
 
374
                        // If an error occurs delete the index and rebuild it.
 
375
                        self->myException.log(NULL);
 
376
                        iFileShare->release();
 
377
                        iFilePath->removeFile();
 
378
                        goto retry;
 
379
                }
 
380
                cont_(a);
 
381
        } else
 
382
                MSAliasBuild();
 
383
        
 
384
        
 
385
        exit_();
 
386
}
 
387
 
 
388
//------------------------
 
389
uint32_t MSAlias::hashAlias(const char *ptr)
 
390
{
 
391
        register uint32_t h = 0, g;
 
392
        
 
393
        while (*ptr) {
 
394
                h = (h << 4) + (uint32_t) toupper(*ptr++);
 
395
                if ((g = (h & 0xF0000000)))
 
396
                        h = (h ^ (g >> 24)) ^ g;
 
397
        }
 
398
 
 
399
        return (h);
 
400
}
 
401
 
 
402
//------------------------
 
403
void MSAlias::addAlias(MSAliasFile *af, MSAliasRec *rec)
 
404
{
 
405
        MSDiskAliasRec diskRec;
 
406
        CS_SET_DISK_4(diskRec.ar_repo_id_4, rec->repo_id);      
 
407
        CS_SET_DISK_8(diskRec.ar_offset_8, rec->repo_offset);   
 
408
        CS_SET_DISK_4(diskRec.ar_hash_4, rec->alias_hash);
 
409
        af->addRec(&diskRec);
 
410
 
 
411
}
 
412
 
 
413
//------------------------
 
414
uint32_t MSAlias::addAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
 
415
{
 
416
        MSDiskAliasRec diskRec;
 
417
        uint32_t hash;
 
418
        uint32_t f_repo_id;
 
419
        uint64_t f_repo_offset;
 
420
        bool referenced = false;
 
421
        enter_();
 
422
        
 
423
        hash = hashAlias(alias);
 
424
        
 
425
        // Use a lock to make sure that the same alias cannot be added at the same time.
 
426
        lock_(this);
 
427
        
 
428
        MSAliasFile *af = getAliasFile();
 
429
        frompool_(af);
 
430
 
 
431
        if (findBlobByAlias(RETAIN(af), alias, &referenced, &f_repo_id, &f_repo_offset)) {
 
432
                if ((f_repo_id == repo_id) && (f_repo_offset == repo_offset))
 
433
                        goto done; // Do not treat this as an error.
 
434
                if (!referenced) {
 
435
                        // If the alias is in use by a non referenced BLOB then delete it.
 
436
                        // This can happen because I allow newly created BLOBs to be accessed
 
437
                        // by their alias even before a reference to the BLOB has been added to
 
438
                        // the database.
 
439
                        af->deleteCurrentRec();
 
440
                } else  {
 
441
#ifdef xxDEBUG
 
442
                        CSL.log(self, CSLog::Protocol, "Alias: ");
 
443
                        CSL.log(self, CSLog::Protocol, alias);
 
444
                        CSL.log(self, CSLog::Protocol, "\n");
 
445
#endif
 
446
                        CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Alias Exists");
 
447
                }
 
448
        }
 
449
                
 
450
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
 
451
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
 
452
        CS_SET_DISK_4(diskRec.ar_hash_4, hash); 
 
453
 
 
454
        af->addRec(&diskRec);
 
455
done:
 
456
        backtopool_(af);
 
457
 
 
458
        unlock_(this);
 
459
        return_(hash);
 
460
}
 
461
 
 
462
//------------------------
 
463
void MSAlias::deleteAlias(MSDiskAliasPtr diskRec)
 
464
{
 
465
        enter_();
 
466
        
 
467
        MSAliasFile *af = getAliasFile();
 
468
        frompool_(af);
 
469
        if (af->findRec(diskRec))
 
470
                af->deleteCurrentRec();
 
471
        backtopool_(af);
 
472
 
 
473
        exit_();
 
474
}
 
475
 
 
476
//------------------------
 
477
void MSAlias::deleteAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
 
478
{
 
479
        MSDiskAliasRec diskRec;
 
480
        
 
481
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
 
482
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
 
483
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);   
 
484
        deleteAlias(&diskRec);
 
485
        
 
486
}
 
487
//------------------------
 
488
void MSAlias::resetAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
 
489
{
 
490
        MSDiskAliasRec diskRec;
 
491
        bool found;
 
492
        enter_();
 
493
        
 
494
        CS_SET_DISK_4(diskRec.ar_repo_id_4, old_repo_id);       
 
495
        CS_SET_DISK_8(diskRec.ar_offset_8, old_repo_offset);    
 
496
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);   
 
497
 
 
498
        lock_(this);
 
499
        
 
500
        MSAliasFile *af = getAliasFile();
 
501
        frompool_(af);
 
502
        found = af->findRec(&diskRec);
 
503
        CS_SET_DISK_4(diskRec.ar_repo_id_4, new_repo_id);       
 
504
        CS_SET_DISK_8(diskRec.ar_offset_8, new_repo_offset);    
 
505
 
 
506
        if (found) 
 
507
                af->updateCurrentRec(&diskRec);
 
508
        else {
 
509
                CSException::logException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Alias doesn't exists");
 
510
                af->addRec(&diskRec);
 
511
        }
 
512
                        
 
513
        backtopool_(af);
 
514
 
 
515
        unlock_(this);
 
516
        exit_();
 
517
}
 
518
 
 
519
//------------------------
 
520
// Check to see if the blob with the given repo_id
 
521
// and repo_offset has the specified alias.
 
522
bool MSAlias::hasBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias, bool *referenced)
 
523
{
 
524
        bool found = false;
 
525
        MSRepoFile *repoFile;
 
526
        MSBlobHeadRec   blob;
 
527
        uint8_t status;
 
528
        uint64_t offset;
 
529
        uint32_t alias_size = strlen(alias) +1;
 
530
        char blob_alias[BLOB_ALIAS_LENGTH +1];
 
531
        
 
532
        if (alias_size > BLOB_ALIAS_LENGTH)
 
533
                return false;
 
534
 
 
535
        enter_();
 
536
        
 
537
        repoFile = iDatabase_br->getRepoFileFromPool(repo_id, false);
 
538
        frompool_(repoFile);
 
539
 
 
540
        repoFile->read(&blob, repo_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
 
541
        status = CS_GET_DISK_1(blob.rb_status_1);
 
542
        if (IN_USE_BLOB_STATUS(status)) {
 
543
                offset = repo_offset + CS_GET_DISK_2(blob.rb_alias_offset_2);
 
544
                
 
545
                blob_alias[BLOB_ALIAS_LENGTH] = 0;
 
546
                if (repoFile->read(blob_alias, offset, alias_size, 0) == alias_size) {
 
547
                        found = !my_strcasecmp(&my_charset_utf8_general_ci, blob_alias, alias);
 
548
                        if (found)
 
549
                                *referenced = (status == MS_BLOB_REFERENCED);
 
550
                }
 
551
        } else {
 
552
                CSException::logException(CS_CONTEXT, MS_ERR_ENGINE, "Deleted BLOB alias found. (Rebuild BLOB alias index.)");
 
553
        }
 
554
        
 
555
                
 
556
        backtopool_(repoFile);  
 
557
 
 
558
        return_(found);
 
559
}
 
560
 
 
561
//------------------------
 
562
bool MSAlias::findBlobByAlias( MSAliasFile *af, const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
 
563
{
 
564
        bool found = false;
 
565
        uint32_t hash, l_repo_id, l_repo_offset;
 
566
        MSDiskAliasPtr diskRec;
 
567
        enter_();
 
568
 
 
569
        push_(af);
 
570
        
 
571
        hash = hashAlias(alias);
 
572
        diskRec = af->findRec(hash);
 
573
        
 
574
        while (diskRec && !found) {
 
575
                l_repo_id = CS_GET_DISK_4(diskRec->ar_repo_id_4);
 
576
                l_repo_offset = CS_GET_DISK_8(diskRec->ar_offset_8);
 
577
                if (hasBlobAlias(l_repo_id, l_repo_offset, alias, referenced))
 
578
                        found = true;
 
579
                else
 
580
                        diskRec = af->nextRec();
 
581
        }
 
582
                
 
583
        if (found) {
 
584
                if (repo_id)
 
585
                        *repo_id = l_repo_id;
 
586
                        
 
587
                if (repo_offset)
 
588
                        *repo_offset = l_repo_offset;
 
589
        }
 
590
        
 
591
        release_(af);
 
592
        return_(found);
 
593
}
 
594
//------------------------
 
595
bool MSAlias::findBlobByAlias( const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
 
596
{
 
597
        bool found;
 
598
        enter_();
 
599
        
 
600
        MSAliasFile *af = getAliasFile();
 
601
        frompool_(af);
 
602
        
 
603
        found = findBlobByAlias(RETAIN(af), alias, referenced, repo_id, repo_offset);
 
604
 
 
605
        backtopool_(af);
 
606
        return_(found);
 
607
}
 
608
 
 
609
//------------------------
 
610
bool MSAlias::blobAliasExists(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
 
611
{
 
612
        bool found;
 
613
        MSDiskAliasRec diskRec;
 
614
        
 
615
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
 
616
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
 
617
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);   
 
618
 
 
619
        enter_();
 
620
        
 
621
        MSAliasFile *af = getAliasFile();
 
622
        frompool_(af);
 
623
        
 
624
        found = af->findRec(&diskRec);
 
625
 
 
626
        backtopool_(af);
 
627
        return_(found);
 
628
}
 
629
 
 
630
/////////////////////////////////////
 
631
MSSysMeta::MSSysMeta(MSAlias *msa)
 
632
{
 
633
        md_myMSAlias = msa;
 
634
        md_isFileInUse = false;
 
635
        md_NextLink = md_PrevLink = NULL;
 
636
        
 
637
        mtab = MSMetaDataTable::newMSMetaDataTable(RETAIN(msa->iDatabase_br));
 
638
}
 
639
 
 
640
//------------------------
 
641
MSSysMeta::~MSSysMeta()
 
642
{
 
643
        if (mtab)
 
644
                mtab->release();
 
645
 
 
646
        if (md_myMSAlias)
 
647
                md_myMSAlias->release();
 
648
}
 
649
 
 
650
//------------------------
 
651
void MSSysMeta::returnToPool()
 
652
{
 
653
        enter_();
 
654
        push_(this);
 
655
        
 
656
                
 
657
        md_isFileInUse = false;
 
658
                
 
659
        if (!md_myMSAlias->iClosing) {
 
660
                lock_(&md_myMSAlias->iSysTablePoolLock); // It may be better if the pool had it's own lock.
 
661
                md_nextFile = md_myMSAlias->iSysTablePool;
 
662
                md_myMSAlias->iSysTablePool - this;
 
663
                unlock_(&md_myMSAlias->iSysTablePoolLock);
 
664
        }
 
665
                
 
666
        release_(this);
 
667
        exit_();
 
668
}
 
669
//------------------------
 
670
bool MSSysMeta::matchAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
 
671
{
 
672
        mtab->seqScanInit(); 
 
673
        return mtab->matchAlias(repo_id, repo_offset, alias); 
 
674
}
 
675
 
 
676
/////////////////////////////////////
 
677
/////////////////////////////////////
 
678
MSAliasFile::MSAliasFile(MSAliasFileShare *share)
 
679
{
 
680
        ba_share = share;
 
681
        ba_isFileInUse = false;
 
682
        ba_NextLink = ba_PrevLink = NULL;
 
683
        
 
684
        iCurrentRec = 0;
 
685
        iBucketCache = NULL;
 
686
        iStartBucket = iCurrentBucket = NULL;
 
687
        iBucketChain = NULL;
 
688
        iLoading = false;
 
689
        ba_nextFile = NULL;
 
690
        
 
691
        iFile = CSFile::newFile(RETAIN(ba_share->msa_filePath));        
 
692
        iFile->open(CSFile::DEFAULT);
 
693
        
 
694
        
 
695
}
 
696
 
 
697
//------------------------
 
698
MSAliasFile::~MSAliasFile()
 
699
{
 
700
        if (iFile)
 
701
                iFile->release();
 
702
                
 
703
        if (iBucketCache)
 
704
                cs_free(iBucketCache);
 
705
}
 
706
 
 
707
//------------------------
 
708
void MSAliasFile::startLoad()
 
709
{
 
710
        enter_();
 
711
        
 
712
        ASSERT(!iLoading);
 
713
        
 
714
//      iBucketCache = (MSADiskBucketRec*) cs_malloc(BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
 
715
//      memset(iBucketCache, 0, BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
 
716
        iLoading = true;
 
717
        
 
718
        exit_();
 
719
}
 
720
 
 
721
//------------------------
 
722
void MSAliasFile::finishLoad()
 
723
{
 
724
        enter_();
 
725
        ASSERT(iLoading);
 
726
        // Write the bucket cache to disk.
 
727
//      for (iCurrentBucket && iCurrentBucket->getSize()) {
 
728
                // To Be implemented.
 
729
//      }
 
730
//      cs_free(iBucketCache);
 
731
        iBucketCache = NULL;
 
732
        iLoading = false;
 
733
        exit_();
 
734
}
 
735
 
 
736
//------------------------
 
737
void MSAliasFile::returnToPool()
 
738
{
 
739
        enter_();
 
740
        push_(this);
 
741
        
 
742
        if (iLoading) {
 
743
                // If iLoading is still set then probably an exception has been thrown.
 
744
                try_(a) {
 
745
                        finishLoad();
 
746
                }
 
747
                catch_(a) 
 
748
                        iLoading = false;
 
749
                cont_(a);
 
750
        }
 
751
 
 
752
        ba_isFileInUse = false;
 
753
                
 
754
        if (!ba_share->msa_closing) {
 
755
                lock_(&ba_share->msa_poolLock);
 
756
                ba_nextFile = ba_share->msa_pool;
 
757
                ba_share->msa_pool = this;
 
758
                unlock_(&ba_share->msa_poolLock);
 
759
        }
 
760
                
 
761
        release_(this);
 
762
        exit_();
 
763
}
 
764
 
 
765
//------------------------
 
766
// The bucket chain is treated as a circular list.
 
767
bool MSAliasFile::nextBucket(bool with_space)
 
768
{
 
769
        bool have_bucket = false;
 
770
        enter_();
 
771
        
 
772
        while (!have_bucket){
 
773
                if (iCurrentBucket) {
 
774
                        iCurrentBucket = iCurrentBucket->getNextLink();
 
775
                        if (!iCurrentBucket)
 
776
                                iCurrentBucket = iBucketChain->getFront();
 
777
                        if (iCurrentBucket == iStartBucket)
 
778
                                break;
 
779
                } else {
 
780
                        iCurrentBucket = iBucketChain->getFront();
 
781
                        iStartBucket = iCurrentBucket;
 
782
                }
 
783
                
 
784
                if ((iCurrentBucket->getSize() && !with_space) || (with_space && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET))){
 
785
                        // Only read the portion of the bucket containing records.
 
786
                        iCurrentRec = iCurrentBucket->getEndOfRecords(); // The current record is set just beyond the last valid record.
 
787
                        size_t size = iCurrentRec * sizeof(MSDiskAliasRec);             
 
788
                        iFile->read(iBucket, iCurrentBucket->bi_records_offset, size, size);                    
 
789
                        have_bucket = true;
 
790
                }
 
791
        }
 
792
        
 
793
        return_(have_bucket);
 
794
}
 
795
 
 
796
//------------------------
 
797
MSDiskAliasPtr MSAliasFile::nextRec()
 
798
{
 
799
        MSDiskAliasPtr rec = NULL;
 
800
        bool have_rec;
 
801
        enter_();
 
802
        
 
803
        while ((!(have_rec = scanBucket())) && nextBucket(false));
 
804
        
 
805
        if (have_rec) 
 
806
                rec = &(iBucket[iCurrentRec]);
 
807
                
 
808
        return_(rec);
 
809
}
 
810
 
 
811
//------------------------
 
812
// When starting a search:
 
813
// If a bucket is already loaded and it is in the correct bucket chain
 
814
// then search it first. In this case then the search starts at the current
 
815
// bucket in the chain.
 
816
//
 
817
// Searches are from back to front with the idea that the more recently
 
818
// added objects will be seached for more often and they are more likely
 
819
// to be at the end of the chain.
 
820
MSDiskAliasPtr MSAliasFile::findRec(uint32_t hash)
 
821
{
 
822
        MSDiskAliasPtr rec = NULL;
 
823
        MSABucketLinkedList *list = ba_share->getBucketChain(hash);
 
824
        enter_();
 
825
        
 
826
        CS_SET_DISK_4(iDiskHash_4, hash);
 
827
        if (list == iBucketChain) {
 
828
                // The search is performed back to front.
 
829
                iCurrentRec = iCurrentBucket->getEndOfRecords();  // Position the start just beyond the last valid record.
 
830
                iStartBucket = iCurrentBucket;
 
831
                if (scanBucket()) {
 
832
                        rec = &(iBucket[iCurrentRec]);
 
833
                        goto done;
 
834
                }
 
835
        } else {
 
836
                iBucketChain = list;
 
837
                iCurrentBucket = NULL;
 
838
                iStartBucket = NULL;
 
839
        }
 
840
 
 
841
        if (nextBucket(false))
 
842
                rec = nextRec();
 
843
                
 
844
done:
 
845
        return_(rec);
 
846
}
 
847
 
 
848
//------------------------
 
849
bool MSAliasFile::findRec(MSDiskAliasPtr theRec)
 
850
{
 
851
        MSDiskAliasPtr aRec = NULL;
 
852
        bool found = false;
 
853
        enter_();
 
854
        
 
855
        aRec = findRec(CS_GET_DISK_4(theRec->ar_hash_4));
 
856
        while ( aRec && !found) {
 
857
                if (CS_EQ_DISK_4(aRec->ar_repo_id_4, theRec->ar_repo_id_4) && CS_EQ_DISK_8(aRec->ar_offset_8, theRec->ar_offset_8))
 
858
                        found = true;
 
859
                else
 
860
                        aRec = nextRec();
 
861
        }       
 
862
        return_(found);
 
863
}
 
864
 
 
865
//------------------------
 
866
void MSAliasFile::addRec(MSDiskAliasPtr new_rec)
 
867
{
 
868
        MSABucketLinkedList *list = ba_share->getBucketChain(CS_GET_DISK_4(new_rec->ar_hash_4));
 
869
        enter_();
 
870
        lock_(&ba_share->msa_writeLock);
 
871
 
 
872
        if (iBucketChain != list) {
 
873
                iBucketChain = list;
 
874
                iCurrentBucket = NULL;
 
875
                iStartBucket = NULL;
 
876
        } else 
 
877
                iStartBucket = iCurrentBucket;
 
878
 
 
879
        if ((iCurrentBucket && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET)) || nextBucket(true)) { // Find a bucket with space in it for a record.
 
880
                uint32_t size = iCurrentBucket->getSize();
 
881
                uint32_t end_of_records = iCurrentBucket->getEndOfRecords();
 
882
 
 
883
                if (size == end_of_records) { // No holes in the recored list
 
884
                        iCurrentRec = end_of_records;                   
 
885
                } else { // Search for the empty record
 
886
                        iCurrentRec = end_of_records -2;                        
 
887
                        while (iCurrentRec && !CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4))
 
888
                                iCurrentRec--;
 
889
                                
 
890
                        ASSERT(CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4));
 
891
                }
 
892
                
 
893
                memcpy(&iBucket[iCurrentRec], new_rec, sizeof(MSDiskAliasRec)); // Add the record to the cached bucket.
 
894
                
 
895
                iCurrentBucket->recAdded(iFile, iCurrentRec); // update the current bucket header.
 
896
        } else { // A new bucket must be added to the chain.
 
897
                MSADiskBucketHeadRec new_bucket = {0};
 
898
                CSDiskValue8 disk_8_value;
 
899
                uint64_t new_bucket_offset;
 
900
                MSABucketInfo *next, *prev;
 
901
                
 
902
                next = iBucketChain->getFront();
 
903
                prev = iBucketChain->getBack();
 
904
                
 
905
                // Set the next and prev bucket offsets in the new bucket record.
 
906
                CS_SET_DISK_8(new_bucket.ab_prev_bucket_8, prev->bi_bucket_offset);
 
907
                CS_SET_DISK_8(new_bucket.ab_next_bucket_8, next->bi_bucket_offset);
 
908
                
 
909
                if (ba_share->msa_empty_buckets.getSize()) { // Get a bucket from the empty bucket list.
 
910
                        MSABucketInfo *empty_bucket = ba_share->msa_empty_buckets.removeFront();
 
911
                        
 
912
                        new_bucket_offset = empty_bucket->bi_bucket_offset;                     
 
913
                        empty_bucket->release();
 
914
                        
 
915
                        // Update the index file's empty bucket list 
 
916
                        if (ba_share->msa_empty_buckets.getSize() == 0) 
 
917
                                CS_SET_NULL_DISK_8(disk_8_value);
 
918
                        else
 
919
                                CS_SET_DISK_8(disk_8_value, iBucketChain->getFront()->bi_bucket_offset);
 
920
                        
 
921
                        iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_free_list_8) , 8); 
 
922
                } else // There are no empty buckets so grow the file.
 
923
                        new_bucket_offset = ba_share->msa_fileSize;
 
924
                        
 
925
                // Write the new bucket's record header to the file
 
926
                iFile->write(&new_bucket, new_bucket_offset, sizeof(MSADiskBucketHeadRec)); 
 
927
                
 
928
                // Insert the new bucket into the bucket chain on the disk.
 
929
                CS_SET_DISK_8(disk_8_value, new_bucket_offset);
 
930
                iFile->write(&disk_8_value, prev->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_next_bucket_8), 8); 
 
931
                iFile->write(&disk_8_value, next->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_prev_bucket_8), 8); 
 
932
                
 
933
                // Update the file size in the file header if required
 
934
                if (ba_share->msa_fileSize == new_bucket_offset) {
 
935
                        ba_share->msa_fileSize += sizeof(MSADiskBucketRec); // Note this is MSADiskBucketRec not MSADiskBucketHeadRec
 
936
 
 
937
                        CS_SET_DISK_8(disk_8_value, ba_share->msa_fileSize);
 
938
                        iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_file_size_8) , 8); 
 
939
                }
 
940
                
 
941
                // Add the info rec into the bucket chain in RAM.
 
942
                iCurrentBucket = MSABucketInfo::newMSABucketInfo(new_bucket_offset, 1, 0);
 
943
                iBucketChain->addFront(iCurrentBucket);
 
944
                iCurrentRec = 0;
 
945
        }
 
946
        
 
947
        uint64_t offset;
 
948
        offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
 
949
 
 
950
        // Write the new index entry to the index file.
 
951
        iFile->write(new_rec, offset, sizeof(MSDiskAliasRec)); 
 
952
        
 
953
        unlock_(&ba_share->msa_writeLock);
 
954
        
 
955
        exit_();        
 
956
}
 
957
//------------------------
 
958
void MSAliasFile::deleteCurrentRec()
 
959
{
 
960
        MSDiskAliasPtr rec = &(iBucket[iCurrentRec]);
 
961
        uint64_t        offset;
 
962
        enter_();
 
963
        
 
964
        CS_SET_NULL_DISK_4(rec->ar_repo_id_4);
 
965
        offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
 
966
        
 
967
        lock_(&ba_share->msa_writeLock);
 
968
 
 
969
        // Update the index file. It is assumed that repo_id is the first 4 bytes of 'rec'.
 
970
        iFile->write(rec, offset, 4); 
 
971
        
 
972
        iCurrentBucket->recRemoved(iFile, iCurrentRec, iBucket);
 
973
        
 
974
        unlock_(&ba_share->msa_writeLock);
 
975
        
 
976
        exit_();        
 
977
}
 
978
 
 
979
//------------------------
 
980
void MSAliasFile::updateCurrentRec(MSDiskAliasPtr update_rec)
 
981
{
 
982
        uint64_t        offset;
 
983
        enter_();
 
984
        
 
985
        // ASSERT that the updated rec still belongs to this bucket chain.
 
986
        ASSERT(ba_share->getBucketChain(CS_GET_DISK_4(update_rec->ar_hash_4)) == iBucketChain);
 
987
        ASSERT(!CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4)); // We should not be updating a deleted record.
 
988
        
 
989
        lock_(&ba_share->msa_writeLock);
 
990
        offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
 
991
        
 
992
        // Update the record on disk.
 
993
        iFile->write(update_rec, offset, sizeof(MSDiskAliasRec));
 
994
        
 
995
        // Update the record in memory. 
 
996
        CS_COPY_DISK_4(iBucket[iCurrentRec].ar_repo_id_4, update_rec->ar_repo_id_4);    
 
997
        CS_COPY_DISK_8(iBucket[iCurrentRec].ar_offset_8, update_rec->ar_offset_8);      
 
998
 
 
999
        unlock_(&ba_share->msa_writeLock);
 
1000
        exit_();        
 
1001
}
 
1002
 
 
1003
 
 
1004
//------------------------
 
1005
MSABucketInfo *MSABucketInfo::newMSABucketInfo(uint64_t offset, uint32_t num, uint32_t last)
 
1006
{
 
1007
        MSABucketInfo *bucket;
 
1008
        new_(bucket, MSABucketInfo(offset, num, last));
 
1009
        return bucket;
 
1010
}
 
1011
//------------------------
 
1012
void MSABucketInfo::recRemoved(CSFile *iFile, uint32_t idx, MSDiskAliasRec bucket[])
 
1013
{
 
1014
        MSADiskBucketHeadRec head;
 
1015
        enter_();
 
1016
        
 
1017
        ASSERT(idx < bi_end_of_records);
 
1018
        
 
1019
        bi_num_recs--;
 
1020
        if (!bi_num_recs) {
 
1021
                // It would be nice to remove this bucket from the 
 
1022
                // bucket list and place it on the empty list.
 
1023
                // Before this can be done a locking method would
 
1024
                // be needed to block anyone from reading this
 
1025
                // bucket while it was being moved.
 
1026
                //
 
1027
                // I haven't done this because I have been trying
 
1028
                // to avoid read locks.
 
1029
                bi_end_of_records = 0;
 
1030
        } else if ((bi_end_of_records -1) == idx) {
 
1031
                while (idx && CS_IS_NULL_DISK_4(bucket[idx].ar_repo_id_4))
 
1032
                        idx--;
 
1033
                        
 
1034
                if ((idx ==0) && CS_IS_NULL_DISK_4(bucket[0].ar_repo_id_4))
 
1035
                        bi_end_of_records = 0;
 
1036
                else
 
1037
                        bi_end_of_records = idx +1;
 
1038
                
 
1039
                ASSERT(bi_end_of_records >= bi_num_recs);
 
1040
        }
 
1041
        
 
1042
        // Update the index file.
 
1043
        CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
 
1044
        CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
 
1045
        iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8); 
 
1046
        exit_();
 
1047
}
 
1048
 
 
1049
//------------------------
 
1050
void MSABucketInfo::recAdded(CSFile *iFile, uint32_t idx)
 
1051
{
 
1052
        MSADiskBucketHeadRec head;
 
1053
        enter_();
 
1054
        
 
1055
        ASSERT(bi_num_recs < NUM_RECORDS_PER_BUCKET);
 
1056
        ASSERT(idx < NUM_RECORDS_PER_BUCKET);
 
1057
 
 
1058
        bi_num_recs++;
 
1059
        if (idx == bi_end_of_records)
 
1060
                bi_end_of_records++;
 
1061
                
 
1062
        // Update the index file.
 
1063
        CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
 
1064
        CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
 
1065
        iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8); 
 
1066
        exit_();        
 
1067
}
 
1068
 
 
1069
//////////////////////////////////
 
1070
MSAliasFile *MSAliasFileShare::getPoolFile()
 
1071
{
 
1072
        MSAliasFile *af;
 
1073
        enter_();
 
1074
        
 
1075
        lock_(&msa_poolLock); 
 
1076
        if ((af = msa_pool)) {
 
1077
                msa_pool = af->ba_nextFile;
 
1078
        } else {
 
1079
                new_(af, MSAliasFile(this));
 
1080
                msa_poolFiles.addFront(af);
 
1081
        }
 
1082
        unlock_(&msa_poolLock);
 
1083
        
 
1084
        af->ba_nextFile = NULL;
 
1085
        ASSERT(!af->ba_isFileInUse);
 
1086
        af->ba_isFileInUse = true;
 
1087
        af->retain();
 
1088
        
 
1089
        return_(af);
 
1090
}
 
1091
#endif // HAVE_ALIAS_SUPPORT
 
1092