~stewart/drizzle/bug911643

« back to all changes in this revision

Viewing changes to plugin/pbms/src/open_table_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-06-04
23
 
 *
24
 
 * H&G2JCtL
25
 
 *
26
 
 * Media Stream Tables.
27
 
 *
28
 
 */
29
 
#include "cslib/CSConfig.h"
30
 
 
31
 
#include "defs_ms.h"
32
 
 
33
 
#include "cslib/CSGlobal.h"
34
 
#include "cslib/CSLog.h"
35
 
#include "cslib/CSStrUtil.h"
36
 
#include "cslib/CSPath.h"
37
 
 
38
 
#include "open_table_ms.h"
39
 
#include "table_ms.h"
40
 
#include "connection_handler_ms.h"
41
 
#include "engine_ms.h"
42
 
#include "transaction_ms.h"
43
 
#include "parameters_ms.h"
44
 
 
45
 
/*
46
 
 * ---------------------------------------------------------------
47
 
 * OPEN TABLES
48
 
 */
49
 
 
50
 
MSOpenTable::MSOpenTable():
51
 
CSRefObject(),
52
 
CSPooled(),
53
 
inUse(true),
54
 
isNotATable(false),
55
 
nextTable(NULL),
56
 
myPool(NULL),
57
 
myTableFile(NULL),
58
 
myWriteRepo(NULL),
59
 
myWriteRepoFile(NULL),
60
 
myTempLogFile(NULL),
61
 
iNextLink(NULL),
62
 
iPrevLink(NULL)
63
 
//iUseSize(0),
64
 
//iUseCount(0),
65
 
//iUsedBlobs(0)
66
 
{
67
 
        memset(myOTBuffer, 0, MS_OT_BUFFER_SIZE); // wipe this to make valgrind happy.
68
 
}
69
 
 
70
 
MSOpenTable::~MSOpenTable()
71
 
{
72
 
        close();
73
 
}
74
 
 
75
 
void MSOpenTable::close()
76
 
{
77
 
        enter_();
78
 
        if (myTableFile) {
79
 
                myTableFile->release();
80
 
                myTableFile = NULL;
81
 
        }
82
 
        closeForWriting();
83
 
        if (myTempLogFile) {
84
 
                myTempLogFile->release();
85
 
                myTempLogFile = NULL;
86
 
        }
87
 
/*
88
 
        if (iUsedBlobs) {
89
 
                cs_free(iUsedBlobs);
90
 
                iUsedBlobs = NULL;
91
 
        }
92
 
        iUseCount = 0;
93
 
        iUseSize = 0;
94
 
*/
95
 
        exit_();
96
 
}
97
 
 
98
 
void MSOpenTable::returnToPool()
99
 
{
100
 
        MSTableList::releaseTable(this);
101
 
}
102
 
 
103
 
// This cleanup class is used to reset the 
104
 
// repository size if something goes wrong.
105
 
class CreateBlobCleanUp : public CSRefObject {
106
 
        bool do_cleanup;
107
 
        uint64_t old_size;
108
 
        MSOpenTable *ot;
109
 
        MSRepository *repo;
110
 
 
111
 
        public:
112
 
        
113
 
        CreateBlobCleanUp(): CSRefObject(),
114
 
                do_cleanup(false){}
115
 
                
116
 
        ~CreateBlobCleanUp() 
117
 
        {
118
 
                if (do_cleanup) {
119
 
                        repo->setRepoFileSize(ot, old_size);
120
 
 
121
 
                }
122
 
        }
123
 
        
124
 
        void setCleanUp(MSOpenTable *ot_arg, MSRepository *repo_arg, uint64_t size)
125
 
        {
126
 
                old_size = size;
127
 
                repo = repo_arg;
128
 
                ot = ot_arg;
129
 
                do_cleanup = true;
130
 
        }
131
 
        
132
 
        void cancelCleanUp()
133
 
        {
134
 
                do_cleanup = false;
135
 
        }
136
 
        
137
 
};
138
 
 
139
 
void MSOpenTable::createBlob(PBMSBlobURLPtr bh, uint64_t blob_size, char *metadata, uint16_t metadata_size, CSInputStream *stream, CloudKeyPtr cloud_key, Md5Digest *checksum)
140
 
{
141
 
        uint64_t repo_offset;
142
 
        uint64_t blob_id = 0;
143
 
        uint32_t        auth_code;
144
 
        uint16_t head_size;
145
 
        uint32_t        log_id;
146
 
        uint32_t log_offset;
147
 
        uint32_t temp_time;
148
 
        uint64_t repo_size;
149
 
        uint64_t repo_id;
150
 
        Md5Digest my_checksum;
151
 
        CloudKeyRec cloud_key_rec;
152
 
        CreateBlobCleanUp *cleanup;
153
 
        enter_();
154
 
        
155
 
        new_(cleanup, CreateBlobCleanUp());
156
 
        push_(cleanup);
157
 
        
158
 
        if (!checksum)
159
 
                checksum = &my_checksum;
160
 
                
161
 
        if (stream) push_(stream);
162
 
        openForWriting();
163
 
        ASSERT(myWriteRepo);
164
 
        auth_code = random();
165
 
        repo_size = myWriteRepo->getRepoFileSize();
166
 
        temp_time =     myWriteRepo->myLastTempTime;
167
 
 
168
 
        // If an exception occurs the cleanup operation will be called.
169
 
        cleanup->setCleanUp(this, myWriteRepo, repo_size);
170
 
 
171
 
        head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
172
 
        if (getDB()->myBlobType == MS_STANDARD_STORAGE) {
173
 
                pop_(stream);
174
 
                repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size, checksum, stream);
175
 
        } else {
176
 
                ASSERT(getDB()->myBlobType == MS_CLOUD_STORAGE);
177
 
                CloudDB *cloud = getDB()->myBlobCloud;
178
 
                
179
 
                if (!cloud)
180
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Creating cloud BLOB without cloud.");
181
 
        
182
 
                repo_offset = repo_size + head_size;
183
 
                memset(checksum, 0, sizeof(Md5Digest)); // The checksum is only for local storage.
184
 
                
185
 
                // If there is a stream then the data has not been sent to the cloud yet.
186
 
                if (stream) { 
187
 
                        cloud_key = &cloud_key_rec;
188
 
                        cloud->cl_getNewKey(cloud_key);
189
 
                        pop_(stream);
190
 
                        cloud->cl_putData(cloud_key, stream, blob_size);
191
 
                }
192
 
                
193
 
        }
194
 
        
195
 
        repo_id = myWriteRepo->myRepoID;
196
 
        if (isNotATable) {      
197
 
                getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
198
 
                formatRepoURL(bh, repo_id, repo_offset, auth_code, blob_size);
199
 
        }
200
 
        else {
201
 
                blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
202
 
                getDB()->queueForDeletion(this, MS_TL_BLOB_REF, getDBTable()->myTableID, blob_id, auth_code, &log_id, &log_offset, &temp_time);
203
 
                formatBlobURL(bh, blob_id, auth_code, blob_size, 0);
204
 
        }
205
 
        
206
 
        myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, checksum, metadata, metadata_size, blob_id, auth_code, log_id, log_offset, getDB()->myBlobType, cloud_key);
207
 
        
208
 
        cleanup->cancelCleanUp();
209
 
        release_(cleanup);
210
 
        
211
 
        exit_();
212
 
}
213
 
 
214
 
// BLOBs created with this method are always created as standard local BLOBs. (No cloud storage)
215
 
void MSOpenTable::createBlob(PBMSBlobIDPtr blob_id, uint64_t blob_size, char *metadata, uint16_t metadata_size)
216
 
{
217
 
        uint64_t repo_size;
218
 
        uint64_t repo_offset;
219
 
        uint64_t repo_id;
220
 
        uint32_t        auth_code;
221
 
        uint16_t head_size;
222
 
        uint32_t        log_id;
223
 
        uint32_t log_offset;
224
 
        uint32_t temp_time;
225
 
        CreateBlobCleanUp *cleanup;
226
 
        enter_();
227
 
        
228
 
        new_(cleanup, CreateBlobCleanUp());
229
 
        push_(cleanup);
230
 
 
231
 
        openForWriting();
232
 
        ASSERT(myWriteRepo);
233
 
        auth_code = random();
234
 
        
235
 
        repo_size = myWriteRepo->getRepoFileSize();
236
 
        
237
 
        // If an exception occurs the cleanup operation will be called.
238
 
        cleanup->setCleanUp(this, myWriteRepo, repo_size);
239
 
 
240
 
        head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
241
 
 
242
 
        repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size);
243
 
        repo_id = myWriteRepo->myRepoID;
244
 
        temp_time = myWriteRepo->myLastTempTime;
245
 
        getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
246
 
        myWriteRepo->myLastTempTime = temp_time;
247
 
        myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, NULL, metadata, metadata_size, 0, auth_code, log_id, log_offset, MS_STANDARD_STORAGE, NULL);
248
 
        // myWriteRepo->setRepoFileSize(this, repo_offset + head_size + blob_size);This is now set by writeBlobHead()
249
 
        
250
 
        blob_id->bi_db_id = getDB()->myDatabaseID;
251
 
        blob_id->bi_blob_id = repo_offset;
252
 
        blob_id->bi_tab_id = repo_id;
253
 
        blob_id->bi_auth_code = auth_code;
254
 
        blob_id->bi_blob_size = blob_size;
255
 
        blob_id->bi_blob_type = MS_URL_TYPE_REPO;
256
 
        blob_id->bi_blob_ref_id = 0;
257
 
        
258
 
        cleanup->cancelCleanUp();
259
 
        release_(cleanup);
260
 
 
261
 
        exit_();
262
 
}
263
 
 
264
 
void MSOpenTable::sendRepoBlob(uint64_t blob_id, uint64_t req_offset, uint64_t req_size, uint32_t auth_code, bool info_only, CSHTTPOutputStream *stream)
265
 
{
266
 
        uint32_t                repo_id;
267
 
        uint64_t                offset;
268
 
        uint64_t                size;
269
 
        uint16_t                head_size;
270
 
        MSRepoFile      *repo_file;
271
 
 
272
 
        enter_();
273
 
        openForReading();
274
 
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, true);
275
 
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
276
 
        frompool_(repo_file);
277
 
        //repo_file->sendBlob(this, offset, head_size, size, stream);
278
 
        repo_file->sendBlob(this, offset, req_offset, req_size, 0, false, info_only, stream);
279
 
        backtopool_(repo_file);
280
 
        exit_();
281
 
}
282
 
 
283
 
void MSOpenTable::freeReference(uint64_t blob_id, uint64_t blob_ref_id)
284
 
{
285
 
        uint32_t                repo_id;
286
 
        uint64_t                offset;
287
 
        uint64_t                blob_size;
288
 
        uint16_t                head_size;
289
 
        MSRepoFile      *repo_file;
290
 
        uint32_t                auth_code = 0;
291
 
 
292
 
        enter_();
293
 
        openForReading();
294
 
 
295
 
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
296
 
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
297
 
 
298
 
        frompool_(repo_file);
299
 
        repo_file->releaseBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
300
 
        backtopool_(repo_file);
301
 
 
302
 
        exit_();
303
 
}
304
 
 
305
 
void MSOpenTable::commitReference(uint64_t blob_id, uint64_t blob_ref_id)
306
 
{
307
 
        uint32_t                repo_id;
308
 
        uint64_t                offset;
309
 
        uint64_t                blob_size;
310
 
        uint16_t                head_size;
311
 
        MSRepoFile      *repo_file;
312
 
        uint32_t                auth_code = 0;
313
 
 
314
 
        enter_();
315
 
        openForReading();
316
 
        
317
 
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
318
 
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
319
 
 
320
 
        frompool_(repo_file);
321
 
        repo_file->commitBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
322
 
        backtopool_(repo_file);
323
 
 
324
 
        exit_();
325
 
}
326
 
 
327
 
void MSOpenTable::useBlob(int type, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code, uint16_t col_index, uint64_t blob_size, uint64_t blob_ref_id, PBMSBlobURLPtr ret_blob_url)
328
 
{
329
 
        MSRepoFile              *repo_file= NULL;
330
 
        MSBlobHeadRec   blob;
331
 
        CSInputStream   *stream;
332
 
        MSDatabase              *blob_db;
333
 
        int                             state;
334
 
        uint16_t                        head_size;
335
 
        uint64_t                        repo_offset;
336
 
        uint32_t                        repo_id;
337
 
 
338
 
        enter_();
339
 
        (void)state;
340
 
 
341
 
        blob_db = getDB();
342
 
                
343
 
        if (!blob_db->isRecovering()) {
344
 
                // During recovery the only thing that needs to be done is to 
345
 
                // reset the database ID which is done when the URL is created.
346
 
                // Create the URL using the table ID passed in not the one from 
347
 
                // the table associated with this object.
348
 
 
349
 
                openForReading();
350
 
                if (type == MS_URL_TYPE_REPO) { // There is no table reference associated with this BLOB yet.
351
 
                        uint32_t                ac;
352
 
                        uint8_t         status;
353
 
                        bool            same_db = true;
354
 
 
355
 
                        if (blob_db->myDatabaseID == db_id)
356
 
                                repo_file = blob_db->getRepoFileFromPool(tab_id, false);
357
 
                        else {
358
 
                                same_db = false;
359
 
                                blob_db = MSDatabase::getDatabase(db_id);
360
 
                                push_(blob_db);
361
 
                                repo_file = blob_db->getRepoFileFromPool(tab_id, false);
362
 
                                release_(blob_db);
363
 
                                blob_db = repo_file->myRepo->myRepoDatabase;
364
 
                        }
365
 
                
366
 
                        frompool_(repo_file);
367
 
                        repo_file->read(&blob, blob_id, MS_MIN_BLOB_HEAD_SIZE, MS_MIN_BLOB_HEAD_SIZE);
368
 
 
369
 
                        repo_offset = blob_id;
370
 
                        blob_size  = CS_GET_DISK_6(blob.rb_blob_data_size_6);
371
 
                        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
372
 
                         
373
 
                        ac = CS_GET_DISK_4(blob.rb_auth_code_4);
374
 
                        if (auth_code != ac)
375
 
                                CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
376
 
                        status = CS_GET_DISK_1(blob.rb_status_1);
377
 
                        if ( ! IN_USE_BLOB_STATUS(status))
378
 
                                CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
379
 
 
380
 
                        if (same_db) {
381
 
                                // Create a table reference to the BLOB:
382
 
                                repo_id = tab_id;
383
 
                                blob_id = getDBTable()->createBlobHandle(this, tab_id, blob_id, blob_size, head_size, auth_code);
384
 
                                state = MS_UB_NEW_HANDLE;
385
 
                        }
386
 
                        else {
387
 
                                
388
 
                                getDB()->openWriteRepo(this);
389
 
 
390
 
                                // If either databases are using cloud storage then this is
391
 
                                // not supported yet.                   
392
 
                                if (getDB()->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
393
 
                                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
394
 
                        
395
 
                                stream = repo_file->getInputStream(repo_offset);
396
 
                                push_(stream);
397
 
                                repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);                       
398
 
                                release_(stream);
399
 
 
400
 
                                // Create a table reference to the BLOB:
401
 
                                repo_id = myWriteRepo->myRepoID;
402
 
                                blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
403
 
                                state = MS_UB_NEW_BLOB;
404
 
                        }
405
 
                        backtopool_(repo_file);
406
 
                }
407
 
                else {
408
 
 
409
 
                        if (blob_db->myDatabaseID == db_id && getDBTable()->myTableID == tab_id) {
410
 
                                getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
411
 
                                
412
 
                                state = MS_UB_SAME_TAB;
413
 
                        }
414
 
                        else {
415
 
                                MSOpenTable *blob_otab;
416
 
 
417
 
                                blob_otab = MSTableList::getOpenTableByID(db_id, tab_id);
418
 
                                frompool_(blob_otab);
419
 
                                blob_otab->getDBTable()->readBlobHandle(blob_otab, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
420
 
                                if (blob_db->myDatabaseID == db_id) {
421
 
                                        blob_id = getDBTable()->findBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
422
 
                                        if (blob_id == 0)
423
 
                                                blob_id = getDBTable()->createBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
424
 
                                        state = MS_UB_NEW_HANDLE;
425
 
                                }
426
 
                                else {
427
 
 
428
 
                                        // If either databases are using cloud storage then this is
429
 
                                        // not supported yet.                   
430
 
                                        if (blob_db->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
431
 
                                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
432
 
 
433
 
                                        // NOTE: For each BLOB reference copied from one database to another a new
434
 
                                        // BLOB will be created. This can result in multiple copies fo the same BLOB
435
 
                                        // in the destination database. One way around this would be to redisign things
436
 
                                        // so that there is one BLOB repository shared across all databases. 
437
 
                                        blob_db->openWriteRepo(this);
438
 
                                                                        
439
 
                                        stream = repo_file->getInputStream(repo_offset);
440
 
                                        push_(stream);
441
 
                                        
442
 
                                        repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);
443
 
                                        
444
 
                                        release_(stream);
445
 
 
446
 
                                        repo_id = myWriteRepo->myRepoID;
447
 
                                        blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
448
 
                                        state = MS_UB_NEW_BLOB;
449
 
                                }
450
 
                                backtopool_(blob_otab);
451
 
                        }
452
 
                        
453
 
                }
454
 
                
455
 
                blob_ref_id = blob_db->newBlobRefId();
456
 
                
457
 
                // Always use the table ID of this table because regardless of
458
 
                // where the BLOB ref came from it is being inserted into this table.
459
 
                tab_id = getDBTable()->myTableID; 
460
 
                
461
 
                // Add the BLOB reference to the repository.
462
 
                repo_file = blob_db->getRepoFileFromPool(repo_id, false);               
463
 
                frompool_(repo_file);
464
 
                repo_file->referenceBlob(this, repo_offset, head_size, tab_id, blob_id, blob_ref_id, auth_code, col_index);             
465
 
                backtopool_(repo_file);
466
 
                
467
 
                MSTransactionManager::referenceBLOB(getDB()->myDatabaseID, tab_id, blob_id, blob_ref_id);
468
 
 
469
 
        } 
470
 
        
471
 
        formatBlobURL(ret_blob_url, blob_id, auth_code, blob_size, tab_id, blob_ref_id);
472
 
                
473
 
        exit_();
474
 
}
475
 
 
476
 
void MSOpenTable::releaseReference(uint64_t blob_id, uint64_t blob_ref_id)
477
 
{
478
 
        enter_();
479
 
        
480
 
        MSTransactionManager::dereferenceBLOB(getDB()->myDatabaseID, getDBTable()->myTableID, blob_id, blob_ref_id);
481
 
 
482
 
        exit_();
483
 
}
484
 
 
485
 
void MSOpenTable::checkBlob(CSStringBuffer *buffer, uint64_t blob_id, uint32_t auth_code, uint32_t temp_log_id, uint32_t temp_log_offset)
486
 
{
487
 
        uint32_t                repo_id;
488
 
        uint64_t                offset;
489
 
        uint64_t                size;
490
 
        uint16_t                head_size;
491
 
        MSRepoFile      *repo_file;
492
 
 
493
 
        enter_();
494
 
        openForReading();
495
 
        if (getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, false)) {
496
 
                if ((repo_file = getDB()->getRepoFileFromPool(repo_id, true))) {
497
 
                        frompool_(repo_file);
498
 
                        repo_file->checkBlob(buffer, offset, auth_code, temp_log_id, temp_log_offset);
499
 
                        backtopool_(repo_file);
500
 
                }
501
 
                else
502
 
                        getDBTable()->freeBlobHandle(this, blob_id, repo_id, offset, auth_code);
503
 
        }
504
 
        exit_();
505
 
}
506
 
 
507
 
bool MSOpenTable::deleteReferences(uint32_t temp_log_id, uint32_t temp_log_offset, bool *must_quit)
508
 
{
509
 
        MSTableHeadRec          tab_head;
510
 
        off64_t                         blob_id;
511
 
        MSTableBlobRec          tab_blob;
512
 
        uint32_t                                repo_id;
513
 
        uint64_t                                repo_offset;
514
 
        uint16_t                                head_size;
515
 
        uint32_t                                auth_code;
516
 
        MSRepoFile                      *repo_file = NULL;
517
 
        bool                            result = true;
518
 
 
519
 
        enter_();
520
 
        openForReading();
521
 
        if (myTableFile->read(&tab_head, 0, offsetof(MSTableHeadRec, th_reserved_4), 0) < offsetof(MSTableHeadRec, th_reserved_4))
522
 
                /* Nothing to read, delete it ... */
523
 
                goto exit;
524
 
        if (CS_GET_DISK_4(tab_head.th_temp_log_id_4) != temp_log_id ||
525
 
                CS_GET_DISK_4(tab_head.th_temp_log_offset_4) != temp_log_offset) {
526
 
                /* Wrong delete reference (ignore): */
527
 
                result = false;
528
 
                goto exit;
529
 
        }
530
 
 
531
 
        blob_id = CS_GET_DISK_2(tab_head.th_head_size_2);
532
 
        while (blob_id + sizeof(MSTableBlobRec) <= getDBTable()->getTableFileSize()) {
533
 
                if (*must_quit) {
534
 
                        /* Bit of a waste of work, but we must quit! */
535
 
                        result = false;
536
 
                        break;
537
 
                }
538
 
                if (myTableFile->read(&tab_blob, blob_id, sizeof(MSTableBlobRec), 0) < sizeof(MSTableBlobRec))
539
 
                        break;
540
 
                repo_id = CS_GET_DISK_3(tab_blob.tb_repo_id_3);
541
 
                repo_offset = CS_GET_DISK_6(tab_blob.tb_offset_6);
542
 
                head_size = CS_GET_DISK_2(tab_blob.tb_header_size_2);
543
 
                auth_code = CS_GET_DISK_4(tab_blob.tb_auth_code_4);
544
 
                if (repo_file && repo_file->myRepo->myRepoID != repo_id) {
545
 
                        backtopool_(repo_file);
546
 
                        repo_file = NULL;
547
 
                }
548
 
                if (!repo_file) {
549
 
                        repo_file = getDB()->getRepoFileFromPool(repo_id, true);
550
 
                        if (repo_file)
551
 
                                frompool_(repo_file);
552
 
                }
553
 
                if (repo_file) 
554
 
                        repo_file->freeTableReference(this, repo_offset, head_size, getDBTable()->myTableID, blob_id, auth_code);
555
 
                
556
 
                blob_id += sizeof(MSTableBlobRec);
557
 
        }
558
 
        
559
 
        if (repo_file)
560
 
                backtopool_(repo_file);
561
 
 
562
 
        exit:
563
 
        return_(result);
564
 
}
565
 
 
566
 
void MSOpenTable::openForReading()
567
 
{
568
 
        if (!myTableFile && !isNotATable)
569
 
                myTableFile = getDBTable()->openTableFile();
570
 
}
571
 
 
572
 
void MSOpenTable::openForWriting()
573
 
{
574
 
        if (myTableFile && myWriteRepo && myWriteRepoFile)
575
 
                return;
576
 
        enter_();
577
 
        openForReading();
578
 
        if (!myWriteRepo || !myWriteRepoFile)
579
 
                getDB()->openWriteRepo(this);
580
 
        exit_();
581
 
}
582
 
 
583
 
void MSOpenTable::closeForWriting()
584
 
{
585
 
        if (myWriteRepoFile) {          
586
 
                myWriteRepoFile->myRepo->syncHead(myWriteRepoFile);
587
 
                myWriteRepoFile->release();
588
 
                myWriteRepoFile = NULL;
589
 
        }
590
 
        if (myWriteRepo) {
591
 
                myWriteRepo->unlockRepo(REPO_WRITE);
592
 
#ifndef MS_COMPACTOR_POLLS
593
 
                if (myWriteRepo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
594
 
                        if (myWriteRepo->myRepoDatabase->myCompactorThread)
595
 
                                myWriteRepo->myRepoDatabase->myCompactorThread->wakeup();
596
 
                }
597
 
#endif
598
 
                myWriteRepo->release();
599
 
                myWriteRepo = NULL;
600
 
        }
601
 
}
602
 
 
603
 
uint32_t MSOpenTable::getTableID()
604
 
{
605
 
        return myPool->myPoolTable->myTableID;
606
 
}
607
 
 
608
 
MSTable *MSOpenTable::getDBTable()
609
 
{
610
 
        return myPool->myPoolTable;
611
 
}
612
 
 
613
 
MSDatabase *MSOpenTable::getDB()
614
 
{
615
 
        return myPool->myPoolDB;
616
 
}
617
 
 
618
 
void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint32_t tab_id, uint64_t blob_ref_id)
619
 
{
620
 
        MSBlobURLRec blob;
621
 
        
622
 
        blob.bu_type = MS_URL_TYPE_BLOB;
623
 
        blob.bu_db_id = getDB()->myDatabaseID;
624
 
        blob.bu_tab_id = tab_id;
625
 
        blob.bu_blob_id = blob_id;
626
 
        blob.bu_auth_code = auth_code;
627
 
        blob.bu_server_id = PBMSParameters::getServerID();
628
 
        blob.bu_blob_size = blob_size;
629
 
        blob.bu_blob_ref_id = blob_ref_id;
630
 
        
631
 
        PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
632
 
        
633
 
}
634
 
void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint64_t blob_ref_id)
635
 
{
636
 
        formatBlobURL(blob_url, blob_id, auth_code, blob_size, getDBTable()->myTableID, blob_ref_id);
637
 
}
638
 
void MSOpenTable::formatRepoURL(PBMSBlobURLPtr blob_url, uint32_t log_id, uint64_t log_offset, uint32_t auth_code, uint64_t blob_size)
639
 
{
640
 
        MSBlobURLRec blob;
641
 
        
642
 
        blob.bu_type = MS_URL_TYPE_REPO;
643
 
        blob.bu_db_id = getDB()->myDatabaseID;
644
 
        blob.bu_tab_id = log_id;
645
 
        blob.bu_blob_id = log_offset;
646
 
        blob.bu_auth_code = auth_code;
647
 
        blob.bu_server_id = PBMSParameters::getServerID();
648
 
        blob.bu_blob_size = blob_size;
649
 
        blob.bu_blob_ref_id = 0;
650
 
        
651
 
        PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
652
 
}
653
 
 
654
 
MSOpenTable *MSOpenTable::newOpenTable(MSOpenTablePool *pool)
655
 
{
656
 
        MSOpenTable *otab;
657
 
        
658
 
        if (!(otab = new MSOpenTable()))
659
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
660
 
        if ((otab->myPool = pool))
661
 
                otab->isNotATable = pool->myPoolTable == NULL;
662
 
        else
663
 
                otab->isNotATable = false;
664
 
                
665
 
        return otab;
666
 
}
667
 
 
668
 
/*
669
 
 * ---------------------------------------------------------------
670
 
 * OPEN TABLE POOLS
671
 
 */
672
 
 
673
 
MSOpenTablePool::MSOpenTablePool():
674
 
myPoolTableID(0),
675
 
isRemovingTP(false),
676
 
myPoolTable(NULL),
677
 
myPoolDB(NULL),
678
 
iTablePool(NULL)
679
 
{
680
 
}
681
 
 
682
 
MSOpenTablePool::~MSOpenTablePool()
683
 
{
684
 
        isRemovingTP = true;
685
 
        removeOpenTablesNotInUse();
686
 
        /* With this, I also delete those that are in use!: */
687
 
        iPoolTables.clear();
688
 
        if (myPoolTable)
689
 
                myPoolTable->release();
690
 
        if (myPoolDB)
691
 
                myPoolDB->release();
692
 
}
693
 
 
694
 
#ifdef DEBUG
695
 
void MSOpenTablePool::check()
696
 
{
697
 
        MSOpenTable     *otab, *ptab;
698
 
 
699
 
        if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
700
 
        bool found;
701
 
                do {
702
 
                        found = false;
703
 
                        ptab = iTablePool;
704
 
                        while (ptab) {
705
 
                                if (ptab == otab) {
706
 
                                        ASSERT(!found);
707
 
                                        found = true;
708
 
                                }
709
 
                                ptab = ptab->nextTable;
710
 
                        }
711
 
                        if (otab->inUse) {
712
 
                                ASSERT(!found);
713
 
                        }
714
 
                        else {
715
 
                                ASSERT(found);
716
 
                        }
717
 
                        otab = (MSOpenTable *) otab->getNextLink();
718
 
                } while (otab);
719
 
        }
720
 
        else
721
 
                ASSERT(!iTablePool);
722
 
}
723
 
#endif
724
 
 
725
 
/*
726
 
 * This returns the table referenced. So it is safe from the pool being
727
 
 * destroyed.
728
 
 */
729
 
MSOpenTable *MSOpenTablePool::getPoolTable()
730
 
{
731
 
        MSOpenTable *otab;
732
 
 
733
 
        if ((otab = iTablePool)) {
734
 
                iTablePool = otab->nextTable;
735
 
                otab->nextTable = NULL;
736
 
                ASSERT(!otab->inUse);
737
 
                otab->inUse = true;
738
 
                otab->retain();
739
 
        }
740
 
        return otab;
741
 
}
742
 
 
743
 
void MSOpenTablePool::returnOpenTable(MSOpenTable *otab)
744
 
{
745
 
        otab->inUse = false;
746
 
        otab->nextTable = iTablePool;
747
 
        iTablePool = otab;
748
 
}
749
 
 
750
 
/*
751
 
 * Add a table to the pool, but do not release it!
752
 
 */
753
 
void MSOpenTablePool::addOpenTable(MSOpenTable *otab)
754
 
{
755
 
        iPoolTables.addFront(otab);
756
 
}
757
 
 
758
 
void MSOpenTablePool::removeOpenTable(MSOpenTable *otab)
759
 
{
760
 
        otab->close();
761
 
        iPoolTables.remove(otab);
762
 
}
763
 
 
764
 
void MSOpenTablePool::removeOpenTablesNotInUse()
765
 
{
766
 
        MSOpenTable *otab, *curr_otab;
767
 
 
768
 
        iTablePool = NULL;
769
 
        /* Remove all tables that are not in use: */
770
 
        if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
771
 
                do {
772
 
                        curr_otab = otab;
773
 
                        otab = (MSOpenTable *) otab->getNextLink();
774
 
                        if (!curr_otab->inUse)
775
 
                                iPoolTables.remove(curr_otab);
776
 
                } while (otab);
777
 
        }
778
 
}
779
 
 
780
 
void MSOpenTablePool::returnToPool()
781
 
{
782
 
        MSTableList::removeTablePool(this);
783
 
}
784
 
 
785
 
MSOpenTablePool *MSOpenTablePool::newPool(uint32_t db_id, uint32_t tab_id)
786
 
{
787
 
        MSOpenTablePool *pool;
788
 
        enter_();
789
 
        
790
 
        if (!(pool = new MSOpenTablePool())) {
791
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
792
 
        }
793
 
        push_(pool);
794
 
        pool->myPoolDB = MSDatabase::getDatabase(db_id);
795
 
        pool->myPoolTableID = tab_id;
796
 
        if (tab_id)
797
 
                pool->myPoolTable = pool->myPoolDB->getTable(tab_id, false);
798
 
        pop_(pool);
799
 
        return_(pool);
800
 
}
801
 
 
802
 
/*
803
 
 * ---------------------------------------------------------------
804
 
 * TABLE LIST
805
 
 */
806
 
 
807
 
CSSyncOrderedList               *MSTableList::gPoolListByID;
808
 
 
809
 
MSTableList::MSTableList()
810
 
{       
811
 
}
812
 
 
813
 
MSTableList::~MSTableList()
814
 
{
815
 
}
816
 
 
817
 
void MSTableList::startUp()
818
 
{
819
 
        new_(gPoolListByID, CSSyncOrderedList);
820
 
}
821
 
 
822
 
void MSTableList::shutDown()
823
 
{
824
 
        if (gPoolListByID) {
825
 
                gPoolListByID->clear();
826
 
                gPoolListByID->release();
827
 
                gPoolListByID = NULL;
828
 
        }
829
 
}
830
 
 
831
 
class MSTableKey : public CSOrderKey {
832
 
public:
833
 
        uint32_t        myKeyDatabaseID;
834
 
        uint32_t        myKeyTableID;
835
 
 
836
 
        MSTableKey(): myKeyDatabaseID(0), myKeyTableID(0){ }
837
 
 
838
 
        virtual ~MSTableKey() {
839
 
        }
840
 
 
841
 
        int compareKey(CSObject *key) {return CSObject::compareKey(key);}
842
 
        virtual int compareKey(CSOrderKey *x) {
843
 
                MSTableKey      *key = (MSTableKey *) x;
844
 
                int                     r = 0;
845
 
 
846
 
                if (myKeyDatabaseID < key->myKeyDatabaseID)
847
 
                        r = -1;
848
 
                else if (myKeyDatabaseID > key->myKeyDatabaseID)
849
 
                        r = 1;
850
 
                        
851
 
                if (r == 0) {
852
 
                        if (myKeyTableID < key->myKeyTableID)
853
 
                                r = -1;
854
 
                        else if (myKeyTableID > key->myKeyTableID)
855
 
                                r = 1;
856
 
                }
857
 
                return r;
858
 
        }
859
 
 
860
 
public:
861
 
        static MSTableKey *newTableKey(uint32_t db_id, uint32_t tab_id)
862
 
        {
863
 
                MSTableKey *key;
864
 
 
865
 
                if (!(key = new MSTableKey())) {
866
 
                        CSException::throwOSError(CS_CONTEXT, ENOMEM);
867
 
                }
868
 
                key->myKeyDatabaseID = db_id;
869
 
                key->myKeyTableID = tab_id;
870
 
                return key;
871
 
        }
872
 
};
873
 
 
874
 
MSOpenTable *MSTableList::getOpenTableByID(uint32_t db_id, uint32_t tab_id)
875
 
{
876
 
        MSOpenTablePool         *pool;
877
 
        MSOpenTable                     *otab = NULL;
878
 
        MSTableKey                      key;
879
 
 
880
 
        enter_();
881
 
        lock_(gPoolListByID);
882
 
        key.myKeyDatabaseID = db_id;
883
 
        key.myKeyTableID = tab_id;
884
 
        pool = (MSOpenTablePool *) gPoolListByID->find(&key);
885
 
        if (!pool) {
886
 
                MSTableKey      *key_ptr;
887
 
                pool = MSOpenTablePool::newPool(db_id, tab_id);
888
 
                key_ptr = MSTableKey::newTableKey(db_id, tab_id);
889
 
                gPoolListByID->add(key_ptr, pool);
890
 
        }
891
 
        if (!(otab = pool->getPoolTable())) {
892
 
                otab = MSOpenTable::newOpenTable(pool);
893
 
                pool->addOpenTable(otab);
894
 
                otab->retain();
895
 
        }
896
 
        unlock_(gPoolListByID);
897
 
        return_(otab);
898
 
}
899
 
 
900
 
MSOpenTable *MSTableList::getOpenTableForDB(uint32_t db_id)
901
 
{
902
 
        return(MSTableList::getOpenTableByID(db_id, 0));
903
 
}
904
 
 
905
 
 
906
 
void MSTableList::releaseTable(MSOpenTable *otab)
907
 
{
908
 
        MSOpenTablePool *pool;
909
 
 
910
 
        enter_();
911
 
        lock_(gPoolListByID);
912
 
        push_(otab);
913
 
        if ((pool = otab->myPool)) {
914
 
                if (pool->isRemovingTP) {
915
 
                        pool->removeOpenTable(otab);
916
 
                        gPoolListByID->wakeup();
917
 
                }
918
 
                else
919
 
                        pool->returnOpenTable(otab);
920
 
        }
921
 
        release_(otab);
922
 
        unlock_(gPoolListByID);
923
 
        exit_();
924
 
}
925
 
 
926
 
bool MSTableList::removeTablePoolIfEmpty(MSOpenTablePool *pool)
927
 
{
928
 
        enter_();
929
 
        if (pool->getSize() == 0) {
930
 
                MSTableKey      key;
931
 
                
932
 
                key.myKeyDatabaseID = pool->myPoolDB->myDatabaseID;
933
 
                key.myKeyTableID = pool->myPoolTableID;
934
 
                gPoolListByID->remove(&key);
935
 
                /* TODO: Remove the table from the database, if it does not exist
936
 
                 * on disk.
937
 
                 */
938
 
                return_(true);
939
 
        }
940
 
        return_(false);
941
 
}
942
 
 
943
 
void MSTableList::removeTablePool(MSOpenTablePool *pool)
944
 
{
945
 
        enter_();
946
 
        lock_(gPoolListByID);
947
 
        for (;;) {
948
 
                pool->isRemovingTP = true;
949
 
                pool->removeOpenTablesNotInUse();
950
 
                if (removeTablePoolIfEmpty(pool)) 
951
 
                        break;
952
 
 
953
 
                /*
954
 
                 * Wait for the tables that are in use to be
955
 
                 * freed.
956
 
                 */
957
 
                gPoolListByID->wait();
958
 
        }
959
 
        unlock_(gPoolListByID);
960
 
        exit_();
961
 
}
962
 
 
963
 
/*
964
 
 * Close the pool associated with this open table.
965
 
 */
966
 
void MSTableList::removeTablePool(MSOpenTable *otab)
967
 
{
968
 
        MSOpenTablePool *pool;
969
 
        MSTableKey      key;
970
 
        
971
 
        key.myKeyDatabaseID = otab->getDB()->myDatabaseID;
972
 
        key.myKeyTableID = otab->getTableID();
973
 
 
974
 
        enter_();
975
 
        frompool_(otab);
976
 
        lock_(gPoolListByID);
977
 
        for (;;) {
978
 
                if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key)))
979
 
                        break;
980
 
                pool->isRemovingTP = true;
981
 
                pool->removeOpenTablesNotInUse();
982
 
                if (removeTablePoolIfEmpty(pool))
983
 
                        break;
984
 
                /*
985
 
                 * Wait for the tables that are in use to be
986
 
                 * freed.
987
 
                 */
988
 
                gPoolListByID->wait();
989
 
        }
990
 
        unlock_(gPoolListByID);
991
 
        backtopool_(otab);
992
 
        exit_();
993
 
}
994
 
 
995
 
void MSTableList::removeDatabaseTables(MSDatabase *database)
996
 
{
997
 
        MSOpenTablePool *pool;
998
 
        uint32_t                        idx;
999
 
        
1000
 
 
1001
 
        enter_();
1002
 
        push_(database);
1003
 
        
1004
 
        retry:
1005
 
        lock_(gPoolListByID);
1006
 
        idx = 0;
1007
 
        while ((pool = (MSOpenTablePool *) gPoolListByID->itemAt(idx))) {
1008
 
                if (pool->myPoolDB == database) {
1009
 
                        break;
1010
 
                }
1011
 
                idx++;
1012
 
        }
1013
 
        unlock_(gPoolListByID);
1014
 
 
1015
 
        if (pool) {
1016
 
                removeTablePool(pool);
1017
 
                goto retry;
1018
 
        }
1019
 
        
1020
 
        release_(database);
1021
 
        exit_();
1022
 
}
1023
 
 
1024
 
// lockTablePoolForDeletion() is only called to lock a pool for a table which is about  to be removed.
1025
 
// When the pool is returned then it will be removed from the global pool list.
1026
 
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(uint32_t db_id, uint32_t tab_id, CSString *db_name, CSString *tab_name)
1027
 
{
1028
 
        MSOpenTablePool *pool;
1029
 
        MSTableKey              key;
1030
 
 
1031
 
        enter_();
1032
 
 
1033
 
        push_(db_name);
1034
 
        if (tab_name)
1035
 
                push_(tab_name);
1036
 
                
1037
 
        key.myKeyDatabaseID = db_id;
1038
 
        key.myKeyTableID = tab_id;
1039
 
        
1040
 
        lock_(gPoolListByID);
1041
 
 
1042
 
        for (;;) {
1043
 
                if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key))) {
1044
 
                        char buffer[CS_EXC_MESSAGE_SIZE];
1045
 
 
1046
 
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Table is temporarily not available: ");
1047
 
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, db_name->getCString());
1048
 
                        if(tab_name) {
1049
 
                                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ".");
1050
 
                                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, tab_name->getCString());
1051
 
                        }
1052
 
                        CSException::throwException(CS_CONTEXT, MS_ERR_TABLE_LOCKED, buffer);
1053
 
                }
1054
 
                pool->isRemovingTP = true;
1055
 
                pool->removeOpenTablesNotInUse();
1056
 
                if (pool->getSize() == 0) {
1057
 
                        // pool->retain();      Do not do this. The return to pool will free this by removing it from the list. 
1058
 
                        break;
1059
 
                }
1060
 
                /*
1061
 
                 * Wait for the tables that are in use to be
1062
 
                 * freed.
1063
 
                 */
1064
 
                gPoolListByID->wait();
1065
 
        }
1066
 
        unlock_(gPoolListByID);
1067
 
        
1068
 
        if (tab_name)
1069
 
                release_(tab_name);
1070
 
        release_(db_name);
1071
 
        return_(pool);  
1072
 
        
1073
 
}
1074
 
 
1075
 
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSTable *tab)
1076
 
{
1077
 
        CSString *tab_name = NULL, *db_name;
1078
 
        uint32_t db_id, tab_id;
1079
 
        
1080
 
        enter_();
1081
 
 
1082
 
        db_name = tab->myDatabase->myDatabaseName;
1083
 
        db_name->retain();
1084
 
 
1085
 
        tab_name = tab->myTableName;
1086
 
        tab_name->retain();
1087
 
        
1088
 
        db_id = tab->myDatabase->myDatabaseID;
1089
 
        tab_id = tab->myTableID;
1090
 
        
1091
 
        tab->release();
1092
 
        
1093
 
        return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
1094
 
}
1095
 
 
1096
 
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSOpenTable *otab)
1097
 
{
1098
 
        CSString *tab_name = NULL, *db_name;
1099
 
        uint32_t db_id, tab_id;
1100
 
        MSTable *tab;
1101
 
 
1102
 
        enter_();
1103
 
        
1104
 
        tab = otab->getDBTable();
1105
 
        if (tab) {
1106
 
                tab_name = tab->myTableName;
1107
 
                tab_name->retain();
1108
 
        }
1109
 
        
1110
 
        db_name = otab->getDB()->myDatabaseName;
1111
 
        db_name->retain();
1112
 
 
1113
 
        db_id = otab->getDB()->myDatabaseID;
1114
 
        tab_id = otab->getTableID();
1115
 
 
1116
 
        otab->returnToPool();
1117
 
 
1118
 
        return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
1119
 
        
1120
 
}
1121
 
 
1122