~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/open_table_ms.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-06-04
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * Media Stream Tables.
 
27
 *
 
28
 */
 
29
#include "cslib/CSConfig.h"
 
30
 
 
31
#include "cslib/CSGlobal.h"
 
32
#include "cslib/CSLog.h"
 
33
#include "cslib/CSStrUtil.h"
 
34
#include "cslib/CSPath.h"
 
35
 
 
36
#include "open_table_ms.h"
 
37
#include "table_ms.h"
 
38
#include "connection_handler_ms.h"
 
39
#include "engine_ms.h"
 
40
#include "transaction_ms.h"
 
41
#include "parameters_ms.h"
 
42
 
 
43
/*
 
44
 * ---------------------------------------------------------------
 
45
 * OPEN TABLES
 
46
 */
 
47
 
 
48
/*
 * Build an open table in its initial state: flagged as in use, not yet
 * marked as a "non-table", attached to no pool, with no table file, write
 * repository, repository file or temp log file, and with cleared link
 * pointers.
 */
MSOpenTable::MSOpenTable()
	: CSRefObject(),
	  CSPooled(),
	  inUse(true),
	  isNotATable(false),
	  nextTable(NULL),
	  myPool(NULL),
	  myTableFile(NULL),
	  myWriteRepo(NULL),
	  myWriteRepoFile(NULL),
	  myTempLogFile(NULL),
	  iNextLink(NULL),
	  iPrevLink(NULL)
	  // Disabled members, kept for reference only:
	  //   iUseSize(0), iUseCount(0), iUsedBlobs(0)
{
	// Nothing else to do: all state is set by the initializer list.
}
 
66
 
 
67
/* Destroying an open table releases everything it still holds via close(). */
MSOpenTable::~MSOpenTable()
{
	this->close();
}
 
71
 
 
72
void MSOpenTable::close()
 
73
{
 
74
        enter_();
 
75
        if (myTableFile) {
 
76
                myTableFile->release();
 
77
                myTableFile = NULL;
 
78
        }
 
79
        closeForWriting();
 
80
        if (myTempLogFile) {
 
81
                myTempLogFile->release();
 
82
                myTempLogFile = NULL;
 
83
        }
 
84
/*
 
85
        if (iUsedBlobs) {
 
86
                cs_free(iUsedBlobs);
 
87
                iUsedBlobs = NULL;
 
88
        }
 
89
        iUseCount = 0;
 
90
        iUseSize = 0;
 
91
*/
 
92
        exit_();
 
93
}
 
94
 
 
95
void MSOpenTable::returnToPool()
 
96
{
 
97
        MSTableList::releaseTable(this);
 
98
}
 
99
 
 
100
void MSOpenTable::createBlob(PBMSBlobURLPtr bh, uint64_t blob_size, char *metadata, uint16_t metadata_size, CSInputStream *stream, CloudKeyPtr cloud_key, Md5Digest *checksum)
 
101
{
 
102
        uint64_t repo_offset;
 
103
        uint64_t blob_id = 0;
 
104
        uint32_t        auth_code;
 
105
        uint16_t head_size;
 
106
        uint32_t        log_id;
 
107
        uint32_t log_offset;
 
108
        uint32_t temp_time;
 
109
        uint64_t repo_size;
 
110
        uint64_t repo_id;
 
111
        Md5Digest my_checksum;
 
112
        CloudKeyRec cloud_key_rec;
 
113
        enter_();
 
114
        
 
115
        CLOBBER_PROTECT(cloud_key);
 
116
        CLOBBER_PROTECT(checksum);
 
117
 
 
118
        if (!checksum)
 
119
                checksum = &my_checksum;
 
120
                
 
121
        if (stream) push_(stream);
 
122
        openForWriting();
 
123
        ASSERT(myWriteRepo);
 
124
        auth_code = random();
 
125
        repo_size = myWriteRepo->getRepoFileSize();
 
126
        temp_time =     myWriteRepo->myLastTempTime;
 
127
 
 
128
        try_(a) {
 
129
                head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
 
130
                if (getDB()->myBlobType == MS_STANDARD_STORAGE) {
 
131
                        pop_(stream);
 
132
                        repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size, checksum, stream);
 
133
                } else {
 
134
                        ASSERT(getDB()->myBlobType == MS_CLOUD_STORAGE);
 
135
                        CloudDB *cloud = getDB()->myBlobCloud;
 
136
                        
 
137
                        if (!cloud)
 
138
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Creating cloud BLOB without cloud.");
 
139
                
 
140
                        repo_offset = repo_size + head_size;
 
141
                        memset(checksum, 0, sizeof(Md5Digest)); // The checksum is only for local storage.
 
142
                        
 
143
                        // If there is a stream then the data has not been sent to the cloud yet.
 
144
                        if (stream) { 
 
145
                                cloud_key = &cloud_key_rec;
 
146
                                cloud->cl_getNewKey(cloud_key);
 
147
                                pop_(stream);
 
148
                                cloud->cl_putData(cloud_key, stream, blob_size);
 
149
                        }
 
150
                        
 
151
                }
 
152
                
 
153
                repo_id = myWriteRepo->myRepoID;
 
154
                if (isNotATable) {      
 
155
                        getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
 
156
                        formatRepoURL(bh, repo_id, repo_offset, auth_code, blob_size);
 
157
                }
 
158
                else {
 
159
                        blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
 
160
                        getDB()->queueForDeletion(this, MS_TL_BLOB_REF, getDBTable()->myTableID, blob_id, auth_code, &log_id, &log_offset, &temp_time);
 
161
                        formatBlobURL(bh, blob_id, auth_code, blob_size, 0);
 
162
                }
 
163
                
 
164
                myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, checksum, metadata, metadata_size, blob_id, auth_code, log_id, log_offset, getDB()->myBlobType, cloud_key);
 
165
        }
 
166
        
 
167
        catch_(a);
 
168
        // In the event of an error reset the repository size to its original size.
 
169
        myWriteRepo->setRepoFileSize(this, repo_size);
 
170
        throw_();
 
171
        
 
172
        cont_(a);
 
173
        exit_();
 
174
}
 
175
 
 
176
// BLOBs created with this method are always created as standard local BLOBs. (No cloud storage)
 
177
void MSOpenTable::createBlob(PBMSBlobIDPtr blob_id, uint64_t blob_size, char *metadata, uint16_t metadata_size)
 
178
{
 
179
        uint64_t repo_size;
 
180
        uint64_t repo_offset;
 
181
        uint64_t repo_id;
 
182
        uint32_t        auth_code;
 
183
        uint16_t head_size;
 
184
        uint32_t        log_id;
 
185
        uint32_t log_offset;
 
186
        uint32_t temp_time;
 
187
        enter_();
 
188
 
 
189
        openForWriting();
 
190
        ASSERT(myWriteRepo);
 
191
        auth_code = random();
 
192
        
 
193
        repo_size = myWriteRepo->getRepoFileSize();
 
194
        try_(a) {
 
195
                head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
 
196
 
 
197
                repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size);
 
198
                repo_id = myWriteRepo->myRepoID;
 
199
                temp_time = myWriteRepo->myLastTempTime;
 
200
                getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
 
201
                myWriteRepo->myLastTempTime = temp_time;
 
202
                myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, NULL, metadata, metadata_size, 0, auth_code, log_id, log_offset, MS_STANDARD_STORAGE, NULL);
 
203
                // myWriteRepo->setRepoFileSize(this, repo_offset + head_size + blob_size);This is now set by writeBlobHead()
 
204
                
 
205
                blob_id->bi_db_id = getDB()->myDatabaseID;
 
206
                blob_id->bi_blob_id = repo_offset;
 
207
                blob_id->bi_tab_id = repo_id;
 
208
                blob_id->bi_auth_code = auth_code;
 
209
                blob_id->bi_blob_size = blob_size;
 
210
                blob_id->bi_blob_type = MS_URL_TYPE_REPO;
 
211
                blob_id->bi_blob_ref_id = 0;
 
212
        
 
213
        }
 
214
        
 
215
        catch_(a);
 
216
        // BLOBs created with this method are always created as standard local BLOBs. (No cloud storage)
 
217
        myWriteRepo->setRepoFileSize(this, repo_size);
 
218
        throw_();
 
219
        
 
220
        cont_(a);
 
221
        exit_();
 
222
}
 
223
 
 
224
void MSOpenTable::sendRepoBlob(uint64_t blob_id, uint64_t req_offset, uint64_t req_size, uint32_t auth_code, bool info_only, CSHTTPOutputStream *stream)
 
225
{
 
226
        uint32_t                repo_id;
 
227
        uint64_t                offset;
 
228
        uint64_t                size;
 
229
        uint16_t                head_size;
 
230
        MSRepoFile      *repo_file;
 
231
 
 
232
        enter_();
 
233
        openForReading();
 
234
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, true);
 
235
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
 
236
        frompool_(repo_file);
 
237
        //repo_file->sendBlob(this, offset, head_size, size, stream);
 
238
        repo_file->sendBlob(this, offset, req_offset, req_size, 0, false, info_only, stream);
 
239
        backtopool_(repo_file);
 
240
        exit_();
 
241
}
 
242
 
 
243
void MSOpenTable::freeReference(uint64_t blob_id, uint64_t blob_ref_id)
 
244
{
 
245
        uint32_t                repo_id;
 
246
        uint64_t                offset;
 
247
        uint64_t                blob_size;
 
248
        uint16_t                head_size;
 
249
        MSRepoFile      *repo_file;
 
250
        uint32_t                auth_code = 0;
 
251
 
 
252
        enter_();
 
253
        openForReading();
 
254
 
 
255
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
 
256
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
 
257
 
 
258
        frompool_(repo_file);
 
259
        repo_file->releaseBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
 
260
        backtopool_(repo_file);
 
261
 
 
262
        exit_();
 
263
}
 
264
 
 
265
void MSOpenTable::commitReference(uint64_t blob_id, uint64_t blob_ref_id)
 
266
{
 
267
        uint32_t                repo_id;
 
268
        uint64_t                offset;
 
269
        uint64_t                blob_size;
 
270
        uint16_t                head_size;
 
271
        MSRepoFile      *repo_file;
 
272
        uint32_t                auth_code = 0;
 
273
 
 
274
        enter_();
 
275
        openForReading();
 
276
        
 
277
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
 
278
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
 
279
 
 
280
        frompool_(repo_file);
 
281
        repo_file->commitBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
 
282
        backtopool_(repo_file);
 
283
 
 
284
        exit_();
 
285
}
 
286
 
 
287
void MSOpenTable::useBlob(int type, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code, uint16_t col_index, uint64_t blob_size, uint64_t blob_ref_id, PBMSBlobURLPtr ret_blob_url)
 
288
{
 
289
        MSRepoFile              *repo_file= NULL;
 
290
        MSBlobHeadRec   blob;
 
291
        CSInputStream   *stream;
 
292
        MSDatabase              *blob_db;
 
293
        int                             state;
 
294
        uint16_t                        head_size;
 
295
        uint64_t                        repo_offset;
 
296
        uint32_t                        repo_id;
 
297
 
 
298
        enter_();
 
299
 
 
300
        blob_db = getDB();
 
301
                
 
302
        if (!blob_db->isRecovering()) {
 
303
                // During recovery the only thing that needs to be done is to 
 
304
                // reset the database ID which is done when the URL is created.
 
305
                // Create the URL using the table ID passed in not the one from 
 
306
                // the table associated with this object.
 
307
 
 
308
                openForReading();
 
309
                if (type == MS_URL_TYPE_REPO) { // There is no table reference associated with this BLOB yet.
 
310
                        uint32_t                ac;
 
311
                        uint8_t         status;
 
312
                        bool            same_db = true;
 
313
 
 
314
                        if (blob_db->myDatabaseID == db_id)
 
315
                                repo_file = blob_db->getRepoFileFromPool(tab_id, false);
 
316
                        else {
 
317
                                same_db = false;
 
318
                                blob_db = MSDatabase::getDatabase(db_id);
 
319
                                push_(blob_db);
 
320
                                repo_file = blob_db->getRepoFileFromPool(tab_id, false);
 
321
                                release_(blob_db);
 
322
                                blob_db = repo_file->myRepo->myRepoDatabase;
 
323
                        }
 
324
                
 
325
                        frompool_(repo_file);
 
326
                        repo_file->read(&blob, blob_id, MS_MIN_BLOB_HEAD_SIZE, MS_MIN_BLOB_HEAD_SIZE);
 
327
 
 
328
                        repo_offset = blob_id;
 
329
                        blob_size  = CS_GET_DISK_6(blob.rb_blob_data_size_6);
 
330
                        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
331
                         
 
332
                        ac = CS_GET_DISK_4(blob.rb_auth_code_4);
 
333
                        if (auth_code != ac)
 
334
                                CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
 
335
                        status = CS_GET_DISK_1(blob.rb_status_1);
 
336
                        if ( ! IN_USE_BLOB_STATUS(status))
 
337
                                CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
 
338
 
 
339
                        if (same_db) {
 
340
                                // Create a table reference to the BLOB:
 
341
                                repo_id = tab_id;
 
342
                                blob_id = getDBTable()->createBlobHandle(this, tab_id, blob_id, blob_size, head_size, auth_code);
 
343
                                state = MS_UB_NEW_HANDLE;
 
344
                        }
 
345
                        else {
 
346
                                
 
347
                                getDB()->openWriteRepo(this);
 
348
 
 
349
                                // If either databases are using cloud storage then this is
 
350
                                // not supported yet.                   
 
351
                                if (getDB()->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
 
352
                                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
 
353
                        
 
354
                                stream = repo_file->getInputStream(repo_offset);
 
355
                                push_(stream);
 
356
                                repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);                       
 
357
                                release_(stream);
 
358
 
 
359
                                // Create a table reference to the BLOB:
 
360
                                repo_id = myWriteRepo->myRepoID;
 
361
                                blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
 
362
                                state = MS_UB_NEW_BLOB;
 
363
                        }
 
364
                        backtopool_(repo_file);
 
365
                }
 
366
                else {
 
367
 
 
368
                        if (blob_db->myDatabaseID == db_id && getDBTable()->myTableID == tab_id) {
 
369
                                getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
 
370
                                
 
371
                                state = MS_UB_SAME_TAB;
 
372
                        }
 
373
                        else {
 
374
                                MSOpenTable *blob_otab;
 
375
 
 
376
                                blob_otab = MSTableList::getOpenTableByID(db_id, tab_id);
 
377
                                frompool_(blob_otab);
 
378
                                blob_otab->getDBTable()->readBlobHandle(blob_otab, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
 
379
                                if (blob_db->myDatabaseID == db_id) {
 
380
                                        blob_id = getDBTable()->findBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
 
381
                                        if (blob_id == 0)
 
382
                                                blob_id = getDBTable()->createBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
 
383
                                        state = MS_UB_NEW_HANDLE;
 
384
                                }
 
385
                                else {
 
386
 
 
387
                                        // If either databases are using cloud storage then this is
 
388
                                        // not supported yet.                   
 
389
                                        if (blob_db->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
 
390
                                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
 
391
 
 
392
                                        // NOTE: For each BLOB reference copied from one database to another a new
 
393
                                        // BLOB will be created. This can result in multiple copies fo the same BLOB
 
394
                                        // in the destination database. One way around this would be to redisign things
 
395
                                        // so that there is one BLOB repository shared across all databases. 
 
396
                                        blob_db->openWriteRepo(this);
 
397
                                                                        
 
398
                                        stream = repo_file->getInputStream(repo_offset);
 
399
                                        push_(stream);
 
400
                                        
 
401
                                        repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);
 
402
                                        
 
403
                                        release_(stream);
 
404
 
 
405
                                        repo_id = myWriteRepo->myRepoID;
 
406
                                        blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
 
407
                                        state = MS_UB_NEW_BLOB;
 
408
                                }
 
409
                                backtopool_(blob_otab);
 
410
                        }
 
411
                        
 
412
                }
 
413
                
 
414
                blob_ref_id = blob_db->newBlobRefId();
 
415
                
 
416
                // Always use the table ID of this table because regardless of
 
417
                // where the BLOB ref came from it is being inserted into this table.
 
418
                tab_id = getDBTable()->myTableID; 
 
419
                
 
420
                // Add the BLOB reference to the repository.
 
421
                repo_file = blob_db->getRepoFileFromPool(repo_id, false);               
 
422
                frompool_(repo_file);
 
423
                repo_file->referenceBlob(this, repo_offset, head_size, tab_id, blob_id, blob_ref_id, auth_code, col_index);             
 
424
                backtopool_(repo_file);
 
425
                
 
426
                MSTransactionManager::referenceBLOB(getDB()->myDatabaseID, tab_id, blob_id, blob_ref_id);
 
427
 
 
428
        } 
 
429
        
 
430
        formatBlobURL(ret_blob_url, blob_id, auth_code, blob_size, tab_id, blob_ref_id);
 
431
                
 
432
        exit_();
 
433
}
 
434
 
 
435
void MSOpenTable::releaseReference(uint64_t blob_id, uint64_t blob_ref_id)
 
436
{
 
437
        enter_();
 
438
        
 
439
        MSTransactionManager::dereferenceBLOB(getDB()->myDatabaseID, getDBTable()->myTableID, blob_id, blob_ref_id);
 
440
 
 
441
        exit_();
 
442
}
 
443
 
 
444
void MSOpenTable::checkBlob(CSStringBuffer *buffer, uint64_t blob_id, uint32_t auth_code, uint32_t temp_log_id, uint32_t temp_log_offset)
 
445
{
 
446
        uint32_t                repo_id;
 
447
        uint64_t                offset;
 
448
        uint64_t                size;
 
449
        uint16_t                head_size;
 
450
        MSRepoFile      *repo_file;
 
451
 
 
452
        enter_();
 
453
        openForReading();
 
454
        if (getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, false)) {
 
455
                if ((repo_file = getDB()->getRepoFileFromPool(repo_id, true))) {
 
456
                        frompool_(repo_file);
 
457
                        repo_file->checkBlob(buffer, offset, auth_code, temp_log_id, temp_log_offset);
 
458
                        backtopool_(repo_file);
 
459
                }
 
460
                else
 
461
                        getDBTable()->freeBlobHandle(this, blob_id, repo_id, offset, auth_code);
 
462
        }
 
463
        exit_();
 
464
}
 
465
 
 
466
bool MSOpenTable::deleteReferences(uint32_t temp_log_id, uint32_t temp_log_offset, bool *must_quit)
 
467
{
 
468
        MSTableHeadRec          tab_head;
 
469
        off64_t                         blob_id;
 
470
        MSTableBlobRec          tab_blob;
 
471
        uint32_t                                repo_id;
 
472
        uint64_t                                repo_offset;
 
473
        uint16_t                                head_size;
 
474
        uint32_t                                auth_code;
 
475
        MSRepoFile                      *repo_file = NULL;
 
476
        bool                            result = true;
 
477
 
 
478
        enter_();
 
479
        openForReading();
 
480
        if (myTableFile->read(&tab_head, 0, offsetof(MSTableHeadRec, th_reserved_4), 0) < offsetof(MSTableHeadRec, th_reserved_4))
 
481
                /* Nothing to read, delete it ... */
 
482
                goto exit;
 
483
        if (CS_GET_DISK_4(tab_head.th_temp_log_id_4) != temp_log_id ||
 
484
                CS_GET_DISK_4(tab_head.th_temp_log_offset_4) != temp_log_offset) {
 
485
                /* Wrong delete reference (ignore): */
 
486
                result = false;
 
487
                goto exit;
 
488
        }
 
489
 
 
490
        blob_id = CS_GET_DISK_2(tab_head.th_head_size_2);
 
491
        while (blob_id + sizeof(MSTableBlobRec) <= getDBTable()->getTableFileSize()) {
 
492
                if (*must_quit) {
 
493
                        /* Bit of a waste of work, but we must quit! */
 
494
                        result = false;
 
495
                        break;
 
496
                }
 
497
                if (myTableFile->read(&tab_blob, blob_id, sizeof(MSTableBlobRec), 0) < sizeof(MSTableBlobRec))
 
498
                        break;
 
499
                repo_id = CS_GET_DISK_3(tab_blob.tb_repo_id_3);
 
500
                repo_offset = CS_GET_DISK_6(tab_blob.tb_offset_6);
 
501
                head_size = CS_GET_DISK_2(tab_blob.tb_header_size_2);
 
502
                auth_code = CS_GET_DISK_4(tab_blob.tb_auth_code_4);
 
503
                if (repo_file && repo_file->myRepo->myRepoID != repo_id) {
 
504
                        backtopool_(repo_file);
 
505
                        repo_file = NULL;
 
506
                }
 
507
                if (!repo_file) {
 
508
                        repo_file = getDB()->getRepoFileFromPool(repo_id, true);
 
509
                        if (repo_file)
 
510
                                frompool_(repo_file);
 
511
                }
 
512
                if (repo_file) 
 
513
                        repo_file->freeTableReference(this, repo_offset, head_size, getDBTable()->myTableID, blob_id, auth_code);
 
514
                
 
515
                blob_id += sizeof(MSTableBlobRec);
 
516
        }
 
517
        
 
518
        if (repo_file)
 
519
                backtopool_(repo_file);
 
520
 
 
521
        exit:
 
522
        return_(result);
 
523
}
 
524
 
 
525
void MSOpenTable::openForReading()
 
526
{
 
527
        if (!myTableFile && !isNotATable)
 
528
                myTableFile = getDBTable()->openTableFile();
 
529
}
 
530
 
 
531
void MSOpenTable::openForWriting()
 
532
{
 
533
        if (myTableFile && myWriteRepo && myWriteRepoFile)
 
534
                return;
 
535
        enter_();
 
536
        openForReading();
 
537
        if (!myWriteRepo || !myWriteRepoFile)
 
538
                getDB()->openWriteRepo(this);
 
539
        exit_();
 
540
}
 
541
 
 
542
void MSOpenTable::closeForWriting()
 
543
{
 
544
        if (myWriteRepoFile) {          
 
545
                myWriteRepoFile->myRepo->syncHead(myWriteRepoFile);
 
546
                myWriteRepoFile->release();
 
547
                myWriteRepoFile = NULL;
 
548
        }
 
549
        if (myWriteRepo) {
 
550
                myWriteRepo->unlockRepo(REPO_WRITE);
 
551
#ifndef MS_COMPACTOR_POLLS
 
552
                if (myWriteRepo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
 
553
                        if (myWriteRepo->myRepoDatabase->myCompactorThread)
 
554
                                myWriteRepo->myRepoDatabase->myCompactorThread->wakeup();
 
555
                }
 
556
#endif
 
557
                myWriteRepo->release();
 
558
                myWriteRepo = NULL;
 
559
        }
 
560
}
 
561
 
 
562
uint32_t MSOpenTable::getTableID()
 
563
{
 
564
        return myPool->myPoolTable->myTableID;
 
565
}
 
566
 
 
567
MSTable *MSOpenTable::getDBTable()
 
568
{
 
569
        return myPool->myPoolTable;
 
570
}
 
571
 
 
572
MSDatabase *MSOpenTable::getDB()
 
573
{
 
574
        return myPool->myPoolDB;
 
575
}
 
576
 
 
577
void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint32_t tab_id, uint64_t blob_ref_id)
 
578
{
 
579
        MSBlobURLRec blob;
 
580
        
 
581
        blob.bu_type = MS_URL_TYPE_BLOB;
 
582
        blob.bu_db_id = getDB()->myDatabaseID;
 
583
        blob.bu_tab_id = tab_id;
 
584
        blob.bu_blob_id = blob_id;
 
585
        blob.bu_auth_code = auth_code;
 
586
        blob.bu_server_id = PBMSParameters::getServerID();
 
587
        blob.bu_blob_size = blob_size;
 
588
        blob.bu_blob_ref_id = blob_ref_id;
 
589
        
 
590
        PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
 
591
        
 
592
}
 
593
void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint64_t blob_ref_id)
 
594
{
 
595
        formatBlobURL(blob_url, blob_id, auth_code, blob_size, getDBTable()->myTableID, blob_ref_id);
 
596
}
 
597
void MSOpenTable::formatRepoURL(PBMSBlobURLPtr blob_url, uint32_t log_id, uint64_t log_offset, uint32_t auth_code, uint64_t blob_size)
 
598
{
 
599
        MSBlobURLRec blob;
 
600
        
 
601
        blob.bu_type = MS_URL_TYPE_REPO;
 
602
        blob.bu_db_id = getDB()->myDatabaseID;
 
603
        blob.bu_tab_id = log_id;
 
604
        blob.bu_blob_id = log_offset;
 
605
        blob.bu_auth_code = auth_code;
 
606
        blob.bu_server_id = PBMSParameters::getServerID();
 
607
        blob.bu_blob_size = blob_size;
 
608
        blob.bu_blob_ref_id = 0;
 
609
        
 
610
        PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
 
611
}
 
612
 
 
613
MSOpenTable *MSOpenTable::newOpenTable(MSOpenTablePool *pool)
 
614
{
 
615
        MSOpenTable *otab;
 
616
        
 
617
        if (!(otab = new MSOpenTable()))
 
618
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
619
        if ((otab->myPool = pool))
 
620
                otab->isNotATable = pool->myPoolTable == NULL;
 
621
        else
 
622
                otab->isNotATable = false;
 
623
                
 
624
        return otab;
 
625
}
 
626
 
 
627
/*
 
628
 * ---------------------------------------------------------------
 
629
 * OPEN TABLE POOLS
 
630
 */
 
631
 
 
632
/*
 * Build an empty pool: no table ID, not being removed, bound to no table or
 * database, and with an empty free list.
 */
MSOpenTablePool::MSOpenTablePool()
	: myPoolTableID(0),
	  isRemovingTP(false),
	  myPoolTable(NULL),
	  myPoolDB(NULL),
	  iTablePool(NULL)
{
	// All state is set by the initializer list.
}
 
640
 
 
641
/*
 * Tear the pool down: flag it as being removed, drop the open tables that
 * are not in use, clear the table list (which also drops those that are in
 * use) and release the table and database the pool was bound to.
 */
MSOpenTablePool::~MSOpenTablePool()
{
	isRemovingTP = true;
	removeOpenTablesNotInUse();

	/* This also deletes the tables that are still in use! */
	iPoolTables.clear();

	if (myPoolTable != NULL)
		myPoolTable->release();
	if (myPoolDB != NULL)
		myPoolDB->release();
}
 
652
 
 
653
#ifdef DEBUG
/*
 * Debug-only consistency check between the table list (iPoolTables)
 * and the free list (iTablePool):
 *  - a listed table that is in use must not be on the free list,
 *  - a listed table that is not in use must be on it exactly once,
 *  - if the table list is empty, the free list must be empty too.
 */
void MSOpenTablePool::check()
{
	MSOpenTable *listed = (MSOpenTable *) iPoolTables.getBack();

	if (listed == NULL) {
		ASSERT(!iTablePool);
		return;
	}

	for (; listed != NULL; listed = (MSOpenTable *) listed->getNextLink()) {
		bool on_free_list = false;

		/* Scan the whole free list so that a duplicate entry is caught. */
		for (MSOpenTable *idle = iTablePool; idle != NULL; idle = idle->nextTable) {
			if (idle == listed) {
				ASSERT(!on_free_list);
				on_free_list = true;
			}
		}

		if (listed->inUse) {
			ASSERT(!on_free_list);
		}
		else {
			ASSERT(on_free_list);
		}
	}
}
#endif
 
683
 
 
684
/*
 
685
 * This returns the table referenced. So it is safe from the pool being
 
686
 * destroyed.
 
687
 */
 
688
MSOpenTable *MSOpenTablePool::getPoolTable()
 
689
{
 
690
        MSOpenTable *otab;
 
691
 
 
692
        if ((otab = iTablePool)) {
 
693
                iTablePool = otab->nextTable;
 
694
                otab->nextTable = NULL;
 
695
                ASSERT(!otab->inUse);
 
696
                otab->inUse = true;
 
697
                otab->retain();
 
698
        }
 
699
        return otab;
 
700
}
 
701
 
 
702
void MSOpenTablePool::returnOpenTable(MSOpenTable *otab)
 
703
{
 
704
        otab->inUse = false;
 
705
        otab->nextTable = iTablePool;
 
706
        iTablePool = otab;
 
707
}
 
708
 
 
709
/*
 
710
 * Add a table to the pool, but do not release it!
 
711
 */
 
712
void MSOpenTablePool::addOpenTable(MSOpenTable *otab)
 
713
{
 
714
        iPoolTables.addFront(otab);
 
715
}
 
716
 
 
717
void MSOpenTablePool::removeOpenTable(MSOpenTable *otab)
 
718
{
 
719
        otab->close();
 
720
        iPoolTables.remove(otab);
 
721
}
 
722
 
 
723
void MSOpenTablePool::removeOpenTablesNotInUse()
 
724
{
 
725
        MSOpenTable *otab, *curr_otab;
 
726
 
 
727
        iTablePool = NULL;
 
728
        /* Remove all tables that are not in use: */
 
729
        if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
 
730
                do {
 
731
                        curr_otab = otab;
 
732
                        otab = (MSOpenTable *) otab->getNextLink();
 
733
                        if (!curr_otab->inUse)
 
734
                                iPoolTables.remove(curr_otab);
 
735
                } while (otab);
 
736
        }
 
737
}
 
738
 
 
739
void MSOpenTablePool::returnToPool()
 
740
{
 
741
        MSTableList::removeTablePool(this);
 
742
}
 
743
 
 
744
MSOpenTablePool *MSOpenTablePool::newPool(uint32_t db_id, uint32_t tab_id)
 
745
{
 
746
        MSOpenTablePool *pool;
 
747
        enter_();
 
748
        
 
749
        if (!(pool = new MSOpenTablePool())) {
 
750
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
751
        }
 
752
        push_(pool);
 
753
        pool->myPoolDB = MSDatabase::getDatabase(db_id);
 
754
        pool->myPoolTableID = tab_id;
 
755
        if (tab_id)
 
756
                pool->myPoolTable = pool->myPoolDB->getTable(tab_id, false);
 
757
        pop_(pool);
 
758
        return_(pool);
 
759
}
 
760
 
 
761
/*
 
762
 * ---------------------------------------------------------------
 
763
 * TABLE LIST
 
764
 */
 
765
 
 
766
CSSyncOrderedList               *MSTableList::gPoolListByID;
 
767
 
 
768
/* Nothing to set up: the constructor body is empty. */
MSTableList::MSTableList()
{
}
 
771
 
 
772
/* Nothing to tear down: the destructor body is empty. */
MSTableList::~MSTableList()
{
}
 
775
 
 
776
void MSTableList::startUp()
 
777
{
 
778
        new_(gPoolListByID, CSSyncOrderedList);
 
779
}
 
780
 
 
781
void MSTableList::shutDown()
 
782
{
 
783
        if (gPoolListByID) {
 
784
                gPoolListByID->clear();
 
785
                gPoolListByID->release();
 
786
                gPoolListByID = NULL;
 
787
        }
 
788
}
 
789
 
 
790
class MSTableKey : public CSOrderKey {
 
791
public:
 
792
        uint32_t        myKeyDatabaseID;
 
793
        uint32_t        myKeyTableID;
 
794
 
 
795
        MSTableKey(): myKeyDatabaseID(0), myKeyTableID(0){ }
 
796
 
 
797
        virtual ~MSTableKey() {
 
798
        }
 
799
 
 
800
        int compareKey(CSObject *key) {return CSObject::compareKey(key);}
 
801
        virtual int compareKey(CSOrderKey *x) {
 
802
                MSTableKey      *key = (MSTableKey *) x;
 
803
                int                     r = 0;
 
804
 
 
805
                if (myKeyDatabaseID < key->myKeyDatabaseID)
 
806
                        r = -1;
 
807
                else if (myKeyDatabaseID > key->myKeyDatabaseID)
 
808
                        r = 1;
 
809
                        
 
810
                if (r == 0) {
 
811
                        if (myKeyTableID < key->myKeyTableID)
 
812
                                r = -1;
 
813
                        else if (myKeyTableID > key->myKeyTableID)
 
814
                                r = 1;
 
815
                }
 
816
                return r;
 
817
        }
 
818
 
 
819
public:
 
820
        static MSTableKey *newTableKey(uint32_t db_id, uint32_t tab_id)
 
821
        {
 
822
                MSTableKey *key;
 
823
 
 
824
                if (!(key = new MSTableKey())) {
 
825
                        CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
826
                }
 
827
                key->myKeyDatabaseID = db_id;
 
828
                key->myKeyTableID = tab_id;
 
829
                return key;
 
830
        }
 
831
};
 
832
 
 
833
MSOpenTable *MSTableList::getOpenTableByID(uint32_t db_id, uint32_t tab_id)
 
834
{
 
835
        MSOpenTablePool         *pool;
 
836
        MSOpenTable                     *otab = NULL;
 
837
        MSTableKey                      key;
 
838
 
 
839
        enter_();
 
840
        lock_(gPoolListByID);
 
841
        key.myKeyDatabaseID = db_id;
 
842
        key.myKeyTableID = tab_id;
 
843
        pool = (MSOpenTablePool *) gPoolListByID->find(&key);
 
844
        if (!pool) {
 
845
                MSTableKey      *key_ptr;
 
846
                pool = MSOpenTablePool::newPool(db_id, tab_id);
 
847
                key_ptr = MSTableKey::newTableKey(db_id, tab_id);
 
848
                gPoolListByID->add(key_ptr, pool);
 
849
        }
 
850
        if (!(otab = pool->getPoolTable())) {
 
851
                otab = MSOpenTable::newOpenTable(pool);
 
852
                pool->addOpenTable(otab);
 
853
                otab->retain();
 
854
        }
 
855
        unlock_(gPoolListByID);
 
856
        return_(otab);
 
857
}
 
858
 
 
859
MSOpenTable *MSTableList::getOpenTableForDB(uint32_t db_id)
 
860
{
 
861
        return(MSTableList::getOpenTableByID(db_id, 0));
 
862
}
 
863
 
 
864
 
 
865
void MSTableList::releaseTable(MSOpenTable *otab)
 
866
{
 
867
        MSOpenTablePool *pool;
 
868
 
 
869
        enter_();
 
870
        lock_(gPoolListByID);
 
871
        push_(otab);
 
872
        if ((pool = otab->myPool)) {
 
873
                if (pool->isRemovingTP) {
 
874
                        pool->removeOpenTable(otab);
 
875
                        gPoolListByID->wakeup();
 
876
                }
 
877
                else
 
878
                        pool->returnOpenTable(otab);
 
879
        }
 
880
        release_(otab);
 
881
        unlock_(gPoolListByID);
 
882
        exit_();
 
883
}
 
884
 
 
885
bool MSTableList::removeTablePoolIfEmpty(MSOpenTablePool *pool)
 
886
{
 
887
        enter_();
 
888
        if (pool->getSize() == 0) {
 
889
                MSTableKey      key;
 
890
                
 
891
                key.myKeyDatabaseID = pool->myPoolDB->myDatabaseID;
 
892
                key.myKeyTableID = pool->myPoolTableID;
 
893
                gPoolListByID->remove(&key);
 
894
                /* TODO: Remove the table from the database, if it does not exist
 
895
                 * on disk.
 
896
                 */
 
897
                return_(true);
 
898
        }
 
899
        return_(false);
 
900
}
 
901
 
 
902
void MSTableList::removeTablePool(MSOpenTablePool *pool)
 
903
{
 
904
        enter_();
 
905
        lock_(gPoolListByID);
 
906
        for (;;) {
 
907
                pool->isRemovingTP = true;
 
908
                pool->removeOpenTablesNotInUse();
 
909
                if (removeTablePoolIfEmpty(pool)) 
 
910
                        break;
 
911
 
 
912
                /*
 
913
                 * Wait for the tables that are in use to be
 
914
                 * freed.
 
915
                 */
 
916
                gPoolListByID->wait();
 
917
        }
 
918
        unlock_(gPoolListByID);
 
919
        exit_();
 
920
}
 
921
 
 
922
/*
 
923
 * Close the pool associated with this open table.
 
924
 */
 
925
void MSTableList::removeTablePool(MSOpenTable *otab)
 
926
{
 
927
        MSOpenTablePool *pool;
 
928
        MSTableKey      key;
 
929
        
 
930
        key.myKeyDatabaseID = otab->getDB()->myDatabaseID;
 
931
        key.myKeyTableID = otab->getTableID();
 
932
 
 
933
        enter_();
 
934
        frompool_(otab);
 
935
        lock_(gPoolListByID);
 
936
        for (;;) {
 
937
                if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key)))
 
938
                        break;
 
939
                pool->isRemovingTP = true;
 
940
                pool->removeOpenTablesNotInUse();
 
941
                if (removeTablePoolIfEmpty(pool))
 
942
                        break;
 
943
                /*
 
944
                 * Wait for the tables that are in use to be
 
945
                 * freed.
 
946
                 */
 
947
                gPoolListByID->wait();
 
948
        }
 
949
        unlock_(gPoolListByID);
 
950
        backtopool_(otab);
 
951
        exit_();
 
952
}
 
953
 
 
954
void MSTableList::removeDatabaseTables(MSDatabase *database)
 
955
{
 
956
        MSOpenTablePool *pool;
 
957
        uint32_t                        idx;
 
958
        
 
959
 
 
960
        enter_();
 
961
        push_(database);
 
962
        
 
963
        retry:
 
964
        lock_(gPoolListByID);
 
965
        idx = 0;
 
966
        while ((pool = (MSOpenTablePool *) gPoolListByID->itemAt(idx))) {
 
967
                if (pool->myPoolDB == database) {
 
968
                        break;
 
969
                }
 
970
                idx++;
 
971
        }
 
972
        unlock_(gPoolListByID);
 
973
 
 
974
        if (pool) {
 
975
                removeTablePool(pool);
 
976
                goto retry;
 
977
        }
 
978
        
 
979
        release_(database);
 
980
        exit_();
 
981
}
 
982
 
 
983
// lockTablePoolForDeletion() is only called to lock a pool for a table which is about  to be removed.
 
984
// When the pool is returned then it will be removed from the global pool list.
 
985
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(uint32_t db_id, uint32_t tab_id, CSString *db_name, CSString *tab_name)
 
986
{
 
987
        MSOpenTablePool *pool;
 
988
        MSTableKey              key;
 
989
 
 
990
        enter_();
 
991
 
 
992
        push_(db_name);
 
993
        if (tab_name)
 
994
                push_(tab_name);
 
995
                
 
996
        key.myKeyDatabaseID = db_id;
 
997
        key.myKeyTableID = tab_id;
 
998
        
 
999
        lock_(gPoolListByID);
 
1000
 
 
1001
        for (;;) {
 
1002
                if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key))) {
 
1003
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
1004
 
 
1005
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Table is temporarily not available: ");
 
1006
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, db_name->getCString());
 
1007
                        if(tab_name) {
 
1008
                                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ".");
 
1009
                                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, tab_name->getCString());
 
1010
                        }
 
1011
                        CSException::throwException(CS_CONTEXT, MS_ERR_TABLE_LOCKED, buffer);
 
1012
                }
 
1013
                pool->isRemovingTP = true;
 
1014
                pool->removeOpenTablesNotInUse();
 
1015
                if (pool->getSize() == 0) {
 
1016
                        // pool->retain();      Do not do this. The return to pool will free this by removing it from the list. 
 
1017
                        break;
 
1018
                }
 
1019
                /*
 
1020
                 * Wait for the tables that are in use to be
 
1021
                 * freed.
 
1022
                 */
 
1023
                gPoolListByID->wait();
 
1024
        }
 
1025
        unlock_(gPoolListByID);
 
1026
        
 
1027
        if (tab_name)
 
1028
                release_(tab_name);
 
1029
        release_(db_name);
 
1030
        return_(pool);  
 
1031
        
 
1032
}
 
1033
 
 
1034
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSTable *tab)
 
1035
{
 
1036
        CSString *tab_name = NULL, *db_name;
 
1037
        uint32_t db_id, tab_id;
 
1038
        
 
1039
        enter_();
 
1040
 
 
1041
        db_name = tab->myDatabase->myDatabaseName;
 
1042
        db_name->retain();
 
1043
 
 
1044
        tab_name = tab->myTableName;
 
1045
        tab_name->retain();
 
1046
        
 
1047
        db_id = tab->myDatabase->myDatabaseID;
 
1048
        tab_id = tab->myTableID;
 
1049
        
 
1050
        tab->release();
 
1051
        
 
1052
        return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
 
1053
}
 
1054
 
 
1055
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSOpenTable *otab)
 
1056
{
 
1057
        CSString *tab_name = NULL, *db_name;
 
1058
        uint32_t db_id, tab_id;
 
1059
        MSTable *tab;
 
1060
 
 
1061
        enter_();
 
1062
        
 
1063
        tab = otab->getDBTable();
 
1064
        if (tab) {
 
1065
                tab_name = tab->myTableName;
 
1066
                tab_name->retain();
 
1067
        }
 
1068
        
 
1069
        db_name = otab->getDB()->myDatabaseName;
 
1070
        db_name->retain();
 
1071
 
 
1072
        db_id = otab->getDB()->myDatabaseID;
 
1073
        tab_id = otab->getTableID();
 
1074
 
 
1075
        otab->returnToPool();
 
1076
 
 
1077
        return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
 
1078
        
 
1079
}
 
1080
 
 
1081