~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/open_table_ms.cc

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2012-06-19 10:46:49 UTC
  • mfrom: (1.1.6)
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20120619104649-e2l0ggd4oz3um0f4
Tags: upstream-7.1.36-stable
Import upstream version 7.1.36-stable

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-06-04
23
 
 *
24
 
 * H&G2JCtL
25
 
 *
26
 
 * Media Stream Tables.
27
 
 *
28
 
 */
29
 
#include "cslib/CSConfig.h"
30
 
 
31
 
#include "defs_ms.h"
32
 
 
33
 
#include "cslib/CSGlobal.h"
34
 
#include "cslib/CSLog.h"
35
 
#include "cslib/CSStrUtil.h"
36
 
#include "cslib/CSPath.h"
37
 
 
38
 
#include "open_table_ms.h"
39
 
#include "table_ms.h"
40
 
#include "connection_handler_ms.h"
41
 
#include "engine_ms.h"
42
 
#include "transaction_ms.h"
43
 
#include "parameters_ms.h"
44
 
 
45
 
/*
46
 
 * ---------------------------------------------------------------
47
 
 * OPEN TABLES
48
 
 */
49
 
 
50
 
/*
 * Construct an open table that is flagged as in use, is not yet bound to
 * a pool and holds no table file, write repository or temp log file.
 */
MSOpenTable::MSOpenTable():
	CSRefObject(),
	CSPooled(),
	inUse(true),
	isNotATable(false),
	nextTable(NULL),
	myPool(NULL),
	myTableFile(NULL),
	myWriteRepo(NULL),
	myWriteRepoFile(NULL),
	myTempLogFile(NULL),
	iNextLink(NULL),
	iPrevLink(NULL)
{
	// Zero myOTBuffer so that valgrind does not complain about it.
	memset(myOTBuffer, 0, MS_OT_BUFFER_SIZE);
}
69
 
 
70
 
/*
 * Destroy the open table, first giving up every file and repository
 * reference it still holds (see close()).
 */
MSOpenTable::~MSOpenTable()
{
	this->close();
}
74
 
 
75
 
void MSOpenTable::close()
76
 
{
77
 
        enter_();
78
 
        if (myTableFile) {
79
 
                myTableFile->release();
80
 
                myTableFile = NULL;
81
 
        }
82
 
        closeForWriting();
83
 
        if (myTempLogFile) {
84
 
                myTempLogFile->release();
85
 
                myTempLogFile = NULL;
86
 
        }
87
 
/*
88
 
        if (iUsedBlobs) {
89
 
                cs_free(iUsedBlobs);
90
 
                iUsedBlobs = NULL;
91
 
        }
92
 
        iUseCount = 0;
93
 
        iUseSize = 0;
94
 
*/
95
 
        exit_();
96
 
}
97
 
 
98
 
void MSOpenTable::returnToPool()
99
 
{
100
 
        MSTableList::releaseTable(this);
101
 
}
102
 
 
103
 
// This cleanup class is used to reset the 
104
 
// repository size if something goes wrong.
105
 
class CreateBlobCleanUp : public CSRefObject {
106
 
        bool do_cleanup;
107
 
        uint64_t old_size;
108
 
        MSOpenTable *ot;
109
 
        MSRepository *repo;
110
 
 
111
 
        public:
112
 
        
113
 
        CreateBlobCleanUp(): CSRefObject(),
114
 
                do_cleanup(false){}
115
 
                
116
 
        ~CreateBlobCleanUp() 
117
 
        {
118
 
                if (do_cleanup) {
119
 
                        repo->setRepoFileSize(ot, old_size);
120
 
 
121
 
                }
122
 
        }
123
 
        
124
 
        void setCleanUp(MSOpenTable *ot_arg, MSRepository *repo_arg, uint64_t size)
125
 
        {
126
 
                old_size = size;
127
 
                repo = repo_arg;
128
 
                ot = ot_arg;
129
 
                do_cleanup = true;
130
 
        }
131
 
        
132
 
        void cancelCleanUp()
133
 
        {
134
 
                do_cleanup = false;
135
 
        }
136
 
        
137
 
};
138
 
 
139
 
void MSOpenTable::createBlob(PBMSBlobURLPtr bh, uint64_t blob_size, char *metadata, uint16_t metadata_size, CSInputStream *stream, CloudKeyPtr cloud_key, Md5Digest *checksum)
140
 
{
141
 
        uint64_t repo_offset;
142
 
        uint64_t blob_id = 0;
143
 
        uint32_t        auth_code;
144
 
        uint16_t head_size;
145
 
        uint32_t        log_id;
146
 
        uint32_t log_offset;
147
 
        uint32_t temp_time;
148
 
        uint64_t repo_size;
149
 
        uint64_t repo_id;
150
 
        Md5Digest my_checksum;
151
 
        CloudKeyRec cloud_key_rec;
152
 
        CreateBlobCleanUp *cleanup;
153
 
        enter_();
154
 
        
155
 
        new_(cleanup, CreateBlobCleanUp());
156
 
        push_(cleanup);
157
 
        
158
 
        if (!checksum)
159
 
                checksum = &my_checksum;
160
 
                
161
 
        if (stream) push_(stream);
162
 
        openForWriting();
163
 
        ASSERT(myWriteRepo);
164
 
        auth_code = random();
165
 
        repo_size = myWriteRepo->getRepoFileSize();
166
 
        temp_time =     myWriteRepo->myLastTempTime;
167
 
 
168
 
        // If an exception occurs the cleanup operation will be called.
169
 
        cleanup->setCleanUp(this, myWriteRepo, repo_size);
170
 
 
171
 
        head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
172
 
        if (getDB()->myBlobType == MS_STANDARD_STORAGE) {
173
 
                pop_(stream);
174
 
                repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size, checksum, stream);
175
 
        } else {
176
 
                ASSERT(getDB()->myBlobType == MS_CLOUD_STORAGE);
177
 
                CloudDB *cloud = getDB()->myBlobCloud;
178
 
                
179
 
                if (!cloud)
180
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Creating cloud BLOB without cloud.");
181
 
        
182
 
                repo_offset = repo_size + head_size;
183
 
                memset(checksum, 0, sizeof(Md5Digest)); // The checksum is only for local storage.
184
 
                
185
 
                // If there is a stream then the data has not been sent to the cloud yet.
186
 
                if (stream) { 
187
 
                        cloud_key = &cloud_key_rec;
188
 
                        cloud->cl_getNewKey(cloud_key);
189
 
                        pop_(stream);
190
 
                        cloud->cl_putData(cloud_key, stream, blob_size);
191
 
                }
192
 
                
193
 
        }
194
 
        
195
 
        repo_id = myWriteRepo->myRepoID;
196
 
        if (isNotATable) {      
197
 
                getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
198
 
                formatRepoURL(bh, repo_id, repo_offset, auth_code, blob_size);
199
 
        }
200
 
        else {
201
 
                blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
202
 
                getDB()->queueForDeletion(this, MS_TL_BLOB_REF, getDBTable()->myTableID, blob_id, auth_code, &log_id, &log_offset, &temp_time);
203
 
                formatBlobURL(bh, blob_id, auth_code, blob_size, 0);
204
 
        }
205
 
        
206
 
        myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, checksum, metadata, metadata_size, blob_id, auth_code, log_id, log_offset, getDB()->myBlobType, cloud_key);
207
 
        
208
 
        cleanup->cancelCleanUp();
209
 
        release_(cleanup);
210
 
        
211
 
        exit_();
212
 
}
213
 
 
214
 
// BLOBs created with this method are always created as standard local BLOBs. (No cloud storage)
215
 
void MSOpenTable::createBlob(PBMSBlobIDPtr blob_id, uint64_t blob_size, char *metadata, uint16_t metadata_size)
216
 
{
217
 
        uint64_t repo_size;
218
 
        uint64_t repo_offset;
219
 
        uint64_t repo_id;
220
 
        uint32_t        auth_code;
221
 
        uint16_t head_size;
222
 
        uint32_t        log_id;
223
 
        uint32_t log_offset;
224
 
        uint32_t temp_time;
225
 
        CreateBlobCleanUp *cleanup;
226
 
        enter_();
227
 
        
228
 
        new_(cleanup, CreateBlobCleanUp());
229
 
        push_(cleanup);
230
 
 
231
 
        openForWriting();
232
 
        ASSERT(myWriteRepo);
233
 
        auth_code = random();
234
 
        
235
 
        repo_size = myWriteRepo->getRepoFileSize();
236
 
        
237
 
        // If an exception occurs the cleanup operation will be called.
238
 
        cleanup->setCleanUp(this, myWriteRepo, repo_size);
239
 
 
240
 
        head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
241
 
 
242
 
        repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size);
243
 
        repo_id = myWriteRepo->myRepoID;
244
 
        temp_time = myWriteRepo->myLastTempTime;
245
 
        getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
246
 
        myWriteRepo->myLastTempTime = temp_time;
247
 
        myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, NULL, metadata, metadata_size, 0, auth_code, log_id, log_offset, MS_STANDARD_STORAGE, NULL);
248
 
        // myWriteRepo->setRepoFileSize(this, repo_offset + head_size + blob_size);This is now set by writeBlobHead()
249
 
        
250
 
        blob_id->bi_db_id = getDB()->myDatabaseID;
251
 
        blob_id->bi_blob_id = repo_offset;
252
 
        blob_id->bi_tab_id = repo_id;
253
 
        blob_id->bi_auth_code = auth_code;
254
 
        blob_id->bi_blob_size = blob_size;
255
 
        blob_id->bi_blob_type = MS_URL_TYPE_REPO;
256
 
        blob_id->bi_blob_ref_id = 0;
257
 
        
258
 
        cleanup->cancelCleanUp();
259
 
        release_(cleanup);
260
 
 
261
 
        exit_();
262
 
}
263
 
 
264
 
void MSOpenTable::sendRepoBlob(uint64_t blob_id, uint64_t req_offset, uint64_t req_size, uint32_t auth_code, bool info_only, CSHTTPOutputStream *stream)
265
 
{
266
 
        uint32_t                repo_id;
267
 
        uint64_t                offset;
268
 
        uint64_t                size;
269
 
        uint16_t                head_size;
270
 
        MSRepoFile      *repo_file;
271
 
 
272
 
        enter_();
273
 
        openForReading();
274
 
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, true);
275
 
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
276
 
        frompool_(repo_file);
277
 
        //repo_file->sendBlob(this, offset, head_size, size, stream);
278
 
        repo_file->sendBlob(this, offset, req_offset, req_size, 0, false, info_only, stream);
279
 
        backtopool_(repo_file);
280
 
        exit_();
281
 
}
282
 
 
283
 
void MSOpenTable::freeReference(uint64_t blob_id, uint64_t blob_ref_id)
284
 
{
285
 
        uint32_t                repo_id;
286
 
        uint64_t                offset;
287
 
        uint64_t                blob_size;
288
 
        uint16_t                head_size;
289
 
        MSRepoFile      *repo_file;
290
 
        uint32_t                auth_code = 0;
291
 
 
292
 
        enter_();
293
 
        openForReading();
294
 
 
295
 
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
296
 
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
297
 
 
298
 
        frompool_(repo_file);
299
 
        repo_file->releaseBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
300
 
        backtopool_(repo_file);
301
 
 
302
 
        exit_();
303
 
}
304
 
 
305
 
void MSOpenTable::commitReference(uint64_t blob_id, uint64_t blob_ref_id)
306
 
{
307
 
        uint32_t                repo_id;
308
 
        uint64_t                offset;
309
 
        uint64_t                blob_size;
310
 
        uint16_t                head_size;
311
 
        MSRepoFile      *repo_file;
312
 
        uint32_t                auth_code = 0;
313
 
 
314
 
        enter_();
315
 
        openForReading();
316
 
        
317
 
        getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
318
 
        repo_file = getDB()->getRepoFileFromPool(repo_id, false);
319
 
 
320
 
        frompool_(repo_file);
321
 
        repo_file->commitBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
322
 
        backtopool_(repo_file);
323
 
 
324
 
        exit_();
325
 
}
326
 
 
327
 
void MSOpenTable::useBlob(int type, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code, uint16_t col_index, uint64_t blob_size, uint64_t blob_ref_id, PBMSBlobURLPtr ret_blob_url)
328
 
{
329
 
        MSRepoFile              *repo_file= NULL;
330
 
        MSBlobHeadRec   blob;
331
 
        CSInputStream   *stream;
332
 
        MSDatabase              *blob_db;
333
 
        int                             state;
334
 
        uint16_t                        head_size;
335
 
        uint64_t                        repo_offset;
336
 
        uint32_t                        repo_id;
337
 
 
338
 
        enter_();
339
 
 
340
 
        blob_db = getDB();
341
 
                
342
 
        if (!blob_db->isRecovering()) {
343
 
                // During recovery the only thing that needs to be done is to 
344
 
                // reset the database ID which is done when the URL is created.
345
 
                // Create the URL using the table ID passed in not the one from 
346
 
                // the table associated with this object.
347
 
 
348
 
                openForReading();
349
 
                if (type == MS_URL_TYPE_REPO) { // There is no table reference associated with this BLOB yet.
350
 
                        uint32_t                ac;
351
 
                        uint8_t         status;
352
 
                        bool            same_db = true;
353
 
 
354
 
                        if (blob_db->myDatabaseID == db_id)
355
 
                                repo_file = blob_db->getRepoFileFromPool(tab_id, false);
356
 
                        else {
357
 
                                same_db = false;
358
 
                                blob_db = MSDatabase::getDatabase(db_id);
359
 
                                push_(blob_db);
360
 
                                repo_file = blob_db->getRepoFileFromPool(tab_id, false);
361
 
                                release_(blob_db);
362
 
                                blob_db = repo_file->myRepo->myRepoDatabase;
363
 
                        }
364
 
                
365
 
                        frompool_(repo_file);
366
 
                        repo_file->read(&blob, blob_id, MS_MIN_BLOB_HEAD_SIZE, MS_MIN_BLOB_HEAD_SIZE);
367
 
 
368
 
                        repo_offset = blob_id;
369
 
                        blob_size  = CS_GET_DISK_6(blob.rb_blob_data_size_6);
370
 
                        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
371
 
                         
372
 
                        ac = CS_GET_DISK_4(blob.rb_auth_code_4);
373
 
                        if (auth_code != ac)
374
 
                                CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
375
 
                        status = CS_GET_DISK_1(blob.rb_status_1);
376
 
                        if ( ! IN_USE_BLOB_STATUS(status))
377
 
                                CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
378
 
 
379
 
                        if (same_db) {
380
 
                                // Create a table reference to the BLOB:
381
 
                                repo_id = tab_id;
382
 
                                blob_id = getDBTable()->createBlobHandle(this, tab_id, blob_id, blob_size, head_size, auth_code);
383
 
                                state = MS_UB_NEW_HANDLE;
384
 
                        }
385
 
                        else {
386
 
                                
387
 
                                getDB()->openWriteRepo(this);
388
 
 
389
 
                                // If either databases are using cloud storage then this is
390
 
                                // not supported yet.                   
391
 
                                if (getDB()->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
392
 
                                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
393
 
                        
394
 
                                stream = repo_file->getInputStream(repo_offset);
395
 
                                push_(stream);
396
 
                                repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);                       
397
 
                                release_(stream);
398
 
 
399
 
                                // Create a table reference to the BLOB:
400
 
                                repo_id = myWriteRepo->myRepoID;
401
 
                                blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
402
 
                                state = MS_UB_NEW_BLOB;
403
 
                        }
404
 
                        backtopool_(repo_file);
405
 
                }
406
 
                else {
407
 
 
408
 
                        if (blob_db->myDatabaseID == db_id && getDBTable()->myTableID == tab_id) {
409
 
                                getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
410
 
                                
411
 
                                state = MS_UB_SAME_TAB;
412
 
                        }
413
 
                        else {
414
 
                                MSOpenTable *blob_otab;
415
 
 
416
 
                                blob_otab = MSTableList::getOpenTableByID(db_id, tab_id);
417
 
                                frompool_(blob_otab);
418
 
                                blob_otab->getDBTable()->readBlobHandle(blob_otab, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
419
 
                                if (blob_db->myDatabaseID == db_id) {
420
 
                                        blob_id = getDBTable()->findBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
421
 
                                        if (blob_id == 0)
422
 
                                                blob_id = getDBTable()->createBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
423
 
                                        state = MS_UB_NEW_HANDLE;
424
 
                                }
425
 
                                else {
426
 
 
427
 
                                        // If either databases are using cloud storage then this is
428
 
                                        // not supported yet.                   
429
 
                                        if (blob_db->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
430
 
                                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
431
 
 
432
 
                                        // NOTE: For each BLOB reference copied from one database to another a new
433
 
                                        // BLOB will be created. This can result in multiple copies fo the same BLOB
434
 
                                        // in the destination database. One way around this would be to redisign things
435
 
                                        // so that there is one BLOB repository shared across all databases. 
436
 
                                        blob_db->openWriteRepo(this);
437
 
                                                                        
438
 
                                        stream = repo_file->getInputStream(repo_offset);
439
 
                                        push_(stream);
440
 
                                        
441
 
                                        repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);
442
 
                                        
443
 
                                        release_(stream);
444
 
 
445
 
                                        repo_id = myWriteRepo->myRepoID;
446
 
                                        blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
447
 
                                        state = MS_UB_NEW_BLOB;
448
 
                                }
449
 
                                backtopool_(blob_otab);
450
 
                        }
451
 
                        
452
 
                }
453
 
                
454
 
                blob_ref_id = blob_db->newBlobRefId();
455
 
                
456
 
                // Always use the table ID of this table because regardless of
457
 
                // where the BLOB ref came from it is being inserted into this table.
458
 
                tab_id = getDBTable()->myTableID; 
459
 
                
460
 
                // Add the BLOB reference to the repository.
461
 
                repo_file = blob_db->getRepoFileFromPool(repo_id, false);               
462
 
                frompool_(repo_file);
463
 
                repo_file->referenceBlob(this, repo_offset, head_size, tab_id, blob_id, blob_ref_id, auth_code, col_index);             
464
 
                backtopool_(repo_file);
465
 
                
466
 
                MSTransactionManager::referenceBLOB(getDB()->myDatabaseID, tab_id, blob_id, blob_ref_id);
467
 
 
468
 
        } 
469
 
        
470
 
        formatBlobURL(ret_blob_url, blob_id, auth_code, blob_size, tab_id, blob_ref_id);
471
 
                
472
 
        exit_();
473
 
}
474
 
 
475
 
void MSOpenTable::releaseReference(uint64_t blob_id, uint64_t blob_ref_id)
476
 
{
477
 
        enter_();
478
 
        
479
 
        MSTransactionManager::dereferenceBLOB(getDB()->myDatabaseID, getDBTable()->myTableID, blob_id, blob_ref_id);
480
 
 
481
 
        exit_();
482
 
}
483
 
 
484
 
void MSOpenTable::checkBlob(CSStringBuffer *buffer, uint64_t blob_id, uint32_t auth_code, uint32_t temp_log_id, uint32_t temp_log_offset)
485
 
{
486
 
        uint32_t                repo_id;
487
 
        uint64_t                offset;
488
 
        uint64_t                size;
489
 
        uint16_t                head_size;
490
 
        MSRepoFile      *repo_file;
491
 
 
492
 
        enter_();
493
 
        openForReading();
494
 
        if (getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, false)) {
495
 
                if ((repo_file = getDB()->getRepoFileFromPool(repo_id, true))) {
496
 
                        frompool_(repo_file);
497
 
                        repo_file->checkBlob(buffer, offset, auth_code, temp_log_id, temp_log_offset);
498
 
                        backtopool_(repo_file);
499
 
                }
500
 
                else
501
 
                        getDBTable()->freeBlobHandle(this, blob_id, repo_id, offset, auth_code);
502
 
        }
503
 
        exit_();
504
 
}
505
 
 
506
 
bool MSOpenTable::deleteReferences(uint32_t temp_log_id, uint32_t temp_log_offset, bool *must_quit)
507
 
{
508
 
        MSTableHeadRec          tab_head;
509
 
        off64_t                         blob_id;
510
 
        MSTableBlobRec          tab_blob;
511
 
        uint32_t                                repo_id;
512
 
        uint64_t                                repo_offset;
513
 
        uint16_t                                head_size;
514
 
        uint32_t                                auth_code;
515
 
        MSRepoFile                      *repo_file = NULL;
516
 
        bool                            result = true;
517
 
 
518
 
        enter_();
519
 
        openForReading();
520
 
        if (myTableFile->read(&tab_head, 0, offsetof(MSTableHeadRec, th_reserved_4), 0) < offsetof(MSTableHeadRec, th_reserved_4))
521
 
                /* Nothing to read, delete it ... */
522
 
                goto exit;
523
 
        if (CS_GET_DISK_4(tab_head.th_temp_log_id_4) != temp_log_id ||
524
 
                CS_GET_DISK_4(tab_head.th_temp_log_offset_4) != temp_log_offset) {
525
 
                /* Wrong delete reference (ignore): */
526
 
                result = false;
527
 
                goto exit;
528
 
        }
529
 
 
530
 
        blob_id = CS_GET_DISK_2(tab_head.th_head_size_2);
531
 
        while (blob_id + sizeof(MSTableBlobRec) <= getDBTable()->getTableFileSize()) {
532
 
                if (*must_quit) {
533
 
                        /* Bit of a waste of work, but we must quit! */
534
 
                        result = false;
535
 
                        break;
536
 
                }
537
 
                if (myTableFile->read(&tab_blob, blob_id, sizeof(MSTableBlobRec), 0) < sizeof(MSTableBlobRec))
538
 
                        break;
539
 
                repo_id = CS_GET_DISK_3(tab_blob.tb_repo_id_3);
540
 
                repo_offset = CS_GET_DISK_6(tab_blob.tb_offset_6);
541
 
                head_size = CS_GET_DISK_2(tab_blob.tb_header_size_2);
542
 
                auth_code = CS_GET_DISK_4(tab_blob.tb_auth_code_4);
543
 
                if (repo_file && repo_file->myRepo->myRepoID != repo_id) {
544
 
                        backtopool_(repo_file);
545
 
                        repo_file = NULL;
546
 
                }
547
 
                if (!repo_file) {
548
 
                        repo_file = getDB()->getRepoFileFromPool(repo_id, true);
549
 
                        if (repo_file)
550
 
                                frompool_(repo_file);
551
 
                }
552
 
                if (repo_file) 
553
 
                        repo_file->freeTableReference(this, repo_offset, head_size, getDBTable()->myTableID, blob_id, auth_code);
554
 
                
555
 
                blob_id += sizeof(MSTableBlobRec);
556
 
        }
557
 
        
558
 
        if (repo_file)
559
 
                backtopool_(repo_file);
560
 
 
561
 
        exit:
562
 
        return_(result);
563
 
}
564
 
 
565
 
void MSOpenTable::openForReading()
566
 
{
567
 
        if (!myTableFile && !isNotATable)
568
 
                myTableFile = getDBTable()->openTableFile();
569
 
}
570
 
 
571
 
void MSOpenTable::openForWriting()
572
 
{
573
 
        if (myTableFile && myWriteRepo && myWriteRepoFile)
574
 
                return;
575
 
        enter_();
576
 
        openForReading();
577
 
        if (!myWriteRepo || !myWriteRepoFile)
578
 
                getDB()->openWriteRepo(this);
579
 
        exit_();
580
 
}
581
 
 
582
 
void MSOpenTable::closeForWriting()
583
 
{
584
 
        if (myWriteRepoFile) {          
585
 
                myWriteRepoFile->myRepo->syncHead(myWriteRepoFile);
586
 
                myWriteRepoFile->release();
587
 
                myWriteRepoFile = NULL;
588
 
        }
589
 
        if (myWriteRepo) {
590
 
                myWriteRepo->unlockRepo(REPO_WRITE);
591
 
#ifndef MS_COMPACTOR_POLLS
592
 
                if (myWriteRepo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
593
 
                        if (myWriteRepo->myRepoDatabase->myCompactorThread)
594
 
                                myWriteRepo->myRepoDatabase->myCompactorThread->wakeup();
595
 
                }
596
 
#endif
597
 
                myWriteRepo->release();
598
 
                myWriteRepo = NULL;
599
 
        }
600
 
}
601
 
 
602
 
uint32_t MSOpenTable::getTableID()
603
 
{
604
 
        return myPool->myPoolTable->myTableID;
605
 
}
606
 
 
607
 
MSTable *MSOpenTable::getDBTable()
608
 
{
609
 
        return myPool->myPoolTable;
610
 
}
611
 
 
612
 
MSDatabase *MSOpenTable::getDB()
613
 
{
614
 
        return myPool->myPoolDB;
615
 
}
616
 
 
617
 
void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint32_t tab_id, uint64_t blob_ref_id)
618
 
{
619
 
        MSBlobURLRec blob;
620
 
        
621
 
        blob.bu_type = MS_URL_TYPE_BLOB;
622
 
        blob.bu_db_id = getDB()->myDatabaseID;
623
 
        blob.bu_tab_id = tab_id;
624
 
        blob.bu_blob_id = blob_id;
625
 
        blob.bu_auth_code = auth_code;
626
 
        blob.bu_server_id = PBMSParameters::getServerID();
627
 
        blob.bu_blob_size = blob_size;
628
 
        blob.bu_blob_ref_id = blob_ref_id;
629
 
        
630
 
        PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
631
 
        
632
 
}
633
 
void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint64_t blob_ref_id)
634
 
{
635
 
        formatBlobURL(blob_url, blob_id, auth_code, blob_size, getDBTable()->myTableID, blob_ref_id);
636
 
}
637
 
void MSOpenTable::formatRepoURL(PBMSBlobURLPtr blob_url, uint32_t log_id, uint64_t log_offset, uint32_t auth_code, uint64_t blob_size)
638
 
{
639
 
        MSBlobURLRec blob;
640
 
        
641
 
        blob.bu_type = MS_URL_TYPE_REPO;
642
 
        blob.bu_db_id = getDB()->myDatabaseID;
643
 
        blob.bu_tab_id = log_id;
644
 
        blob.bu_blob_id = log_offset;
645
 
        blob.bu_auth_code = auth_code;
646
 
        blob.bu_server_id = PBMSParameters::getServerID();
647
 
        blob.bu_blob_size = blob_size;
648
 
        blob.bu_blob_ref_id = 0;
649
 
        
650
 
        PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
651
 
}
652
 
 
653
 
MSOpenTable *MSOpenTable::newOpenTable(MSOpenTablePool *pool)
654
 
{
655
 
        MSOpenTable *otab;
656
 
        
657
 
        if (!(otab = new MSOpenTable()))
658
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
659
 
        if ((otab->myPool = pool))
660
 
                otab->isNotATable = pool->myPoolTable == NULL;
661
 
        else
662
 
                otab->isNotATable = false;
663
 
                
664
 
        return otab;
665
 
}
666
 
 
667
 
/*
668
 
 * ---------------------------------------------------------------
669
 
 * OPEN TABLE POOLS
670
 
 */
671
 
 
672
 
/*
 * Construct an empty pool: no table ID, table or database attached, not
 * being removed, and an empty list of free open tables.
 */
MSOpenTablePool::MSOpenTablePool():
	myPoolTableID(0),
	isRemovingTP(false),
	myPoolTable(NULL),
	myPoolDB(NULL),
	iTablePool(NULL)
{
	// Nothing else to set up.
}
680
 
 
681
 
/*
 * Destroy the pool: flag it as being removed, drop the open tables that
 * are not in use, clear the list of all pool tables and release the
 * pool's table and database references.
 */
MSOpenTablePool::~MSOpenTablePool()
{
	isRemovingTP = true;
	removeOpenTablesNotInUse();

	/* With this, I also delete those that are in use!: */
	iPoolTables.clear();

	if (myPoolTable != NULL)
		myPoolTable->release();

	if (myPoolDB != NULL)
		myPoolDB->release();
}
692
 
 
693
 
#ifdef DEBUG
694
 
void MSOpenTablePool::check()
695
 
{
696
 
        MSOpenTable     *otab, *ptab;
697
 
        bool            found;
698
 
 
699
 
        if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
700
 
                do {
701
 
                        found = false;
702
 
                        ptab = iTablePool;
703
 
                        while (ptab) {
704
 
                                if (ptab == otab) {
705
 
                                        ASSERT(!found);
706
 
                                        found = true;
707
 
                                }
708
 
                                ptab = ptab->nextTable;
709
 
                        }
710
 
                        if (otab->inUse) {
711
 
                                ASSERT(!found);
712
 
                        }
713
 
                        else {
714
 
                                ASSERT(found);
715
 
                        }
716
 
                        otab = (MSOpenTable *) otab->getNextLink();
717
 
                } while (otab);
718
 
        }
719
 
        else
720
 
                ASSERT(!iTablePool);
721
 
}
722
 
#endif
723
 
 
724
 
/*
725
 
 * This returns the table referenced. So it is safe from the pool being
726
 
 * destroyed.
727
 
 */
728
 
MSOpenTable *MSOpenTablePool::getPoolTable()
729
 
{
730
 
        MSOpenTable *otab;
731
 
 
732
 
        if ((otab = iTablePool)) {
733
 
                iTablePool = otab->nextTable;
734
 
                otab->nextTable = NULL;
735
 
                ASSERT(!otab->inUse);
736
 
                otab->inUse = true;
737
 
                otab->retain();
738
 
        }
739
 
        return otab;
740
 
}
741
 
 
742
 
void MSOpenTablePool::returnOpenTable(MSOpenTable *otab)
743
 
{
744
 
        otab->inUse = false;
745
 
        otab->nextTable = iTablePool;
746
 
        iTablePool = otab;
747
 
}
748
 
 
749
 
/*
750
 
 * Add a table to the pool, but do not release it!
751
 
 */
752
 
void MSOpenTablePool::addOpenTable(MSOpenTable *otab)
753
 
{
754
 
        iPoolTables.addFront(otab);
755
 
}
756
 
 
757
 
void MSOpenTablePool::removeOpenTable(MSOpenTable *otab)
758
 
{
759
 
        otab->close();
760
 
        iPoolTables.remove(otab);
761
 
}
762
 
 
763
 
void MSOpenTablePool::removeOpenTablesNotInUse()
764
 
{
765
 
        MSOpenTable *otab, *curr_otab;
766
 
 
767
 
        iTablePool = NULL;
768
 
        /* Remove all tables that are not in use: */
769
 
        if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
770
 
                do {
771
 
                        curr_otab = otab;
772
 
                        otab = (MSOpenTable *) otab->getNextLink();
773
 
                        if (!curr_otab->inUse)
774
 
                                iPoolTables.remove(curr_otab);
775
 
                } while (otab);
776
 
        }
777
 
}
778
 
 
779
 
void MSOpenTablePool::returnToPool()
780
 
{
781
 
        MSTableList::removeTablePool(this);
782
 
}
783
 
 
784
 
MSOpenTablePool *MSOpenTablePool::newPool(uint32_t db_id, uint32_t tab_id)
785
 
{
786
 
        MSOpenTablePool *pool;
787
 
        enter_();
788
 
        
789
 
        if (!(pool = new MSOpenTablePool())) {
790
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
791
 
        }
792
 
        push_(pool);
793
 
        pool->myPoolDB = MSDatabase::getDatabase(db_id);
794
 
        pool->myPoolTableID = tab_id;
795
 
        if (tab_id)
796
 
                pool->myPoolTable = pool->myPoolDB->getTable(tab_id, false);
797
 
        pop_(pool);
798
 
        return_(pool);
799
 
}
800
 
 
801
 
/*
802
 
 * ---------------------------------------------------------------
803
 
 * TABLE LIST
804
 
 */
805
 
 
806
 
CSSyncOrderedList               *MSTableList::gPoolListByID;
807
 
 
808
 
/* The table list has nothing to set up on construction. */
MSTableList::MSTableList()
{
}
811
 
 
812
 
/* The table list has nothing to clean up on destruction. */
MSTableList::~MSTableList()
{
}
815
 
 
816
 
void MSTableList::startUp()
817
 
{
818
 
        new_(gPoolListByID, CSSyncOrderedList);
819
 
}
820
 
 
821
 
void MSTableList::shutDown()
822
 
{
823
 
        if (gPoolListByID) {
824
 
                gPoolListByID->clear();
825
 
                gPoolListByID->release();
826
 
                gPoolListByID = NULL;
827
 
        }
828
 
}
829
 
 
830
 
class MSTableKey : public CSOrderKey {
831
 
public:
832
 
        uint32_t        myKeyDatabaseID;
833
 
        uint32_t        myKeyTableID;
834
 
 
835
 
        MSTableKey(): myKeyDatabaseID(0), myKeyTableID(0){ }
836
 
 
837
 
        virtual ~MSTableKey() {
838
 
        }
839
 
 
840
 
        int compareKey(CSObject *key) {return CSObject::compareKey(key);}
841
 
        virtual int compareKey(CSOrderKey *x) {
842
 
                MSTableKey      *key = (MSTableKey *) x;
843
 
                int                     r = 0;
844
 
 
845
 
                if (myKeyDatabaseID < key->myKeyDatabaseID)
846
 
                        r = -1;
847
 
                else if (myKeyDatabaseID > key->myKeyDatabaseID)
848
 
                        r = 1;
849
 
                        
850
 
                if (r == 0) {
851
 
                        if (myKeyTableID < key->myKeyTableID)
852
 
                                r = -1;
853
 
                        else if (myKeyTableID > key->myKeyTableID)
854
 
                                r = 1;
855
 
                }
856
 
                return r;
857
 
        }
858
 
 
859
 
public:
860
 
        static MSTableKey *newTableKey(uint32_t db_id, uint32_t tab_id)
861
 
        {
862
 
                MSTableKey *key;
863
 
 
864
 
                if (!(key = new MSTableKey())) {
865
 
                        CSException::throwOSError(CS_CONTEXT, ENOMEM);
866
 
                }
867
 
                key->myKeyDatabaseID = db_id;
868
 
                key->myKeyTableID = tab_id;
869
 
                return key;
870
 
        }
871
 
};
872
 
 
873
 
MSOpenTable *MSTableList::getOpenTableByID(uint32_t db_id, uint32_t tab_id)
874
 
{
875
 
        MSOpenTablePool         *pool;
876
 
        MSOpenTable                     *otab = NULL;
877
 
        MSTableKey                      key;
878
 
 
879
 
        enter_();
880
 
        lock_(gPoolListByID);
881
 
        key.myKeyDatabaseID = db_id;
882
 
        key.myKeyTableID = tab_id;
883
 
        pool = (MSOpenTablePool *) gPoolListByID->find(&key);
884
 
        if (!pool) {
885
 
                MSTableKey      *key_ptr;
886
 
                pool = MSOpenTablePool::newPool(db_id, tab_id);
887
 
                key_ptr = MSTableKey::newTableKey(db_id, tab_id);
888
 
                gPoolListByID->add(key_ptr, pool);
889
 
        }
890
 
        if (!(otab = pool->getPoolTable())) {
891
 
                otab = MSOpenTable::newOpenTable(pool);
892
 
                pool->addOpenTable(otab);
893
 
                otab->retain();
894
 
        }
895
 
        unlock_(gPoolListByID);
896
 
        return_(otab);
897
 
}
898
 
 
899
 
MSOpenTable *MSTableList::getOpenTableForDB(uint32_t db_id)
900
 
{
901
 
        return(MSTableList::getOpenTableByID(db_id, 0));
902
 
}
903
 
 
904
 
 
905
 
void MSTableList::releaseTable(MSOpenTable *otab)
906
 
{
907
 
        MSOpenTablePool *pool;
908
 
 
909
 
        enter_();
910
 
        lock_(gPoolListByID);
911
 
        push_(otab);
912
 
        if ((pool = otab->myPool)) {
913
 
                if (pool->isRemovingTP) {
914
 
                        pool->removeOpenTable(otab);
915
 
                        gPoolListByID->wakeup();
916
 
                }
917
 
                else
918
 
                        pool->returnOpenTable(otab);
919
 
        }
920
 
        release_(otab);
921
 
        unlock_(gPoolListByID);
922
 
        exit_();
923
 
}
924
 
 
925
 
bool MSTableList::removeTablePoolIfEmpty(MSOpenTablePool *pool)
926
 
{
927
 
        enter_();
928
 
        if (pool->getSize() == 0) {
929
 
                MSTableKey      key;
930
 
                
931
 
                key.myKeyDatabaseID = pool->myPoolDB->myDatabaseID;
932
 
                key.myKeyTableID = pool->myPoolTableID;
933
 
                gPoolListByID->remove(&key);
934
 
                /* TODO: Remove the table from the database, if it does not exist
935
 
                 * on disk.
936
 
                 */
937
 
                return_(true);
938
 
        }
939
 
        return_(false);
940
 
}
941
 
 
942
 
void MSTableList::removeTablePool(MSOpenTablePool *pool)
943
 
{
944
 
        enter_();
945
 
        lock_(gPoolListByID);
946
 
        for (;;) {
947
 
                pool->isRemovingTP = true;
948
 
                pool->removeOpenTablesNotInUse();
949
 
                if (removeTablePoolIfEmpty(pool)) 
950
 
                        break;
951
 
 
952
 
                /*
953
 
                 * Wait for the tables that are in use to be
954
 
                 * freed.
955
 
                 */
956
 
                gPoolListByID->wait();
957
 
        }
958
 
        unlock_(gPoolListByID);
959
 
        exit_();
960
 
}
961
 
 
962
 
/*
963
 
 * Close the pool associated with this open table.
964
 
 */
965
 
void MSTableList::removeTablePool(MSOpenTable *otab)
966
 
{
967
 
        MSOpenTablePool *pool;
968
 
        MSTableKey      key;
969
 
        
970
 
        key.myKeyDatabaseID = otab->getDB()->myDatabaseID;
971
 
        key.myKeyTableID = otab->getTableID();
972
 
 
973
 
        enter_();
974
 
        frompool_(otab);
975
 
        lock_(gPoolListByID);
976
 
        for (;;) {
977
 
                if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key)))
978
 
                        break;
979
 
                pool->isRemovingTP = true;
980
 
                pool->removeOpenTablesNotInUse();
981
 
                if (removeTablePoolIfEmpty(pool))
982
 
                        break;
983
 
                /*
984
 
                 * Wait for the tables that are in use to be
985
 
                 * freed.
986
 
                 */
987
 
                gPoolListByID->wait();
988
 
        }
989
 
        unlock_(gPoolListByID);
990
 
        backtopool_(otab);
991
 
        exit_();
992
 
}
993
 
 
994
 
void MSTableList::removeDatabaseTables(MSDatabase *database)
995
 
{
996
 
        MSOpenTablePool *pool;
997
 
        uint32_t                        idx;
998
 
        
999
 
 
1000
 
        enter_();
1001
 
        push_(database);
1002
 
        
1003
 
        retry:
1004
 
        lock_(gPoolListByID);
1005
 
        idx = 0;
1006
 
        while ((pool = (MSOpenTablePool *) gPoolListByID->itemAt(idx))) {
1007
 
                if (pool->myPoolDB == database) {
1008
 
                        break;
1009
 
                }
1010
 
                idx++;
1011
 
        }
1012
 
        unlock_(gPoolListByID);
1013
 
 
1014
 
        if (pool) {
1015
 
                removeTablePool(pool);
1016
 
                goto retry;
1017
 
        }
1018
 
        
1019
 
        release_(database);
1020
 
        exit_();
1021
 
}
1022
 
 
1023
 
// lockTablePoolForDeletion() is only called to lock a pool for a table which is about  to be removed.
1024
 
// When the pool is returned then it will be removed from the global pool list.
1025
 
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(uint32_t db_id, uint32_t tab_id, CSString *db_name, CSString *tab_name)
1026
 
{
1027
 
        MSOpenTablePool *pool;
1028
 
        MSTableKey              key;
1029
 
 
1030
 
        enter_();
1031
 
 
1032
 
        push_(db_name);
1033
 
        if (tab_name)
1034
 
                push_(tab_name);
1035
 
                
1036
 
        key.myKeyDatabaseID = db_id;
1037
 
        key.myKeyTableID = tab_id;
1038
 
        
1039
 
        lock_(gPoolListByID);
1040
 
 
1041
 
        for (;;) {
1042
 
                if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key))) {
1043
 
                        char buffer[CS_EXC_MESSAGE_SIZE];
1044
 
 
1045
 
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Table is temporarily not available: ");
1046
 
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, db_name->getCString());
1047
 
                        if(tab_name) {
1048
 
                                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ".");
1049
 
                                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, tab_name->getCString());
1050
 
                        }
1051
 
                        CSException::throwException(CS_CONTEXT, MS_ERR_TABLE_LOCKED, buffer);
1052
 
                }
1053
 
                pool->isRemovingTP = true;
1054
 
                pool->removeOpenTablesNotInUse();
1055
 
                if (pool->getSize() == 0) {
1056
 
                        // pool->retain();      Do not do this. The return to pool will free this by removing it from the list. 
1057
 
                        break;
1058
 
                }
1059
 
                /*
1060
 
                 * Wait for the tables that are in use to be
1061
 
                 * freed.
1062
 
                 */
1063
 
                gPoolListByID->wait();
1064
 
        }
1065
 
        unlock_(gPoolListByID);
1066
 
        
1067
 
        if (tab_name)
1068
 
                release_(tab_name);
1069
 
        release_(db_name);
1070
 
        return_(pool);  
1071
 
        
1072
 
}
1073
 
 
1074
 
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSTable *tab)
1075
 
{
1076
 
        CSString *tab_name = NULL, *db_name;
1077
 
        uint32_t db_id, tab_id;
1078
 
        
1079
 
        enter_();
1080
 
 
1081
 
        db_name = tab->myDatabase->myDatabaseName;
1082
 
        db_name->retain();
1083
 
 
1084
 
        tab_name = tab->myTableName;
1085
 
        tab_name->retain();
1086
 
        
1087
 
        db_id = tab->myDatabase->myDatabaseID;
1088
 
        tab_id = tab->myTableID;
1089
 
        
1090
 
        tab->release();
1091
 
        
1092
 
        return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
1093
 
}
1094
 
 
1095
 
MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSOpenTable *otab)
1096
 
{
1097
 
        CSString *tab_name = NULL, *db_name;
1098
 
        uint32_t db_id, tab_id;
1099
 
        MSTable *tab;
1100
 
 
1101
 
        enter_();
1102
 
        
1103
 
        tab = otab->getDBTable();
1104
 
        if (tab) {
1105
 
                tab_name = tab->myTableName;
1106
 
                tab_name->retain();
1107
 
        }
1108
 
        
1109
 
        db_name = otab->getDB()->myDatabaseName;
1110
 
        db_name->retain();
1111
 
 
1112
 
        db_id = otab->getDB()->myDatabaseID;
1113
 
        tab_id = otab->getTableID();
1114
 
 
1115
 
        otab->returnToPool();
1116
 
 
1117
 
        return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
1118
 
        
1119
 
}
1120
 
 
1121