~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/systab_cloud_ms.cc

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2012-06-19 10:46:49 UTC
  • mfrom: (1.1.6)
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20120619104649-e2l0ggd4oz3um0f4
Tags: upstream-7.1.36-stable
Import upstream version 7.1.36-stable

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-10-21
22
 
 *
23
 
 * System cloud storage info table.
24
 
 *
25
 
 */
26
 
#ifdef DRIZZLED
27
 
#include <config.h>
28
 
#include <drizzled/common.h>
29
 
#include <drizzled/session.h>
30
 
#endif
31
 
 
32
 
#include "cslib/CSConfig.h"
33
 
#include <inttypes.h>
34
 
 
35
 
#include <sys/types.h>
36
 
#include <sys/stat.h>
37
 
#include <stdlib.h>
38
 
#include <time.h>
39
 
 
40
 
//#include "mysql_priv.h"
41
 
#include "cslib/CSGlobal.h"
42
 
#include "cslib/CSStrUtil.h"
43
 
#include "cslib/CSLog.h"
44
 
#include "cslib/CSPath.h"
45
 
#include "cslib/CSDirectory.h"
46
 
 
47
 
#include "ha_pbms.h"
48
 
//#include <plugin.h>
49
 
 
50
 
#include "mysql_ms.h"
51
 
#include "database_ms.h"
52
 
#include "open_table_ms.h"
53
 
#include "discover_ms.h"
54
 
#include "systab_util_ms.h"
55
 
 
56
 
#include "systab_cloud_ms.h"
57
 
 
58
 
DT_FIELD_INFO pbms_cloud_info[]=
59
 
{
60
 
        {"Id",                  NOVAL,  NULL, MYSQL_TYPE_LONG,          NULL,                   NOT_NULL_FLAG,  "The Cloud storage reference ID"},
61
 
        {"Server",              1024,   NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 server name"},
62
 
        {"Bucket",              124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 bucket name"},
63
 
        {"PublicKey",   124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 public key"},
64
 
        {"PrivateKey",  124,    NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 private key"},
65
 
        {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
66
 
};
67
 
 
68
 
DT_KEY_INFO pbms_cloud_keys[]=
69
 
{
70
 
        {"pbms_cloud_pk", PRI_KEY_FLAG, {"Id", NULL}},
71
 
        {NULL, 0, {NULL}}
72
 
};
73
 
 
74
 
// Size argument handed to getSysFile() for the cloud table file; presumably the
// smallest size a usable table file can have -- TODO confirm against getSysFile().
#define MIN_CLOUD_TABLE_SIZE 4
75
 
 
76
 
//----------------------------
77
 
void MSCloudTable::startUp()
78
 
{
79
 
        MSCloudInfo::startUp();
80
 
}
81
 
 
82
 
//----------------------------
83
 
void MSCloudTable::shutDown()
84
 
{
85
 
        MSCloudInfo::shutDown();
86
 
}
87
 
 
88
 
//----------------------------
89
 
void MSCloudTable::loadTable(MSDatabase *db)
90
 
{
91
 
 
92
 
        enter_();
93
 
        
94
 
        push_(db);
95
 
        lock_(MSCloudInfo::gCloudInfo);
96
 
        
97
 
        if (MSCloudInfo::gMaxInfoRef == 0) {
98
 
                CSPath  *path;
99
 
                path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
100
 
                push_(path);
101
 
 
102
 
                if (path->exists()) {
103
 
                        CSFile          *file;
104
 
                        SysTabRec       *cloudData;
105
 
                        const char      *server, *bucket, *pubKey, *privKey;
106
 
                        uint32_t                info_id;
107
 
                        MSCloudInfo     *info;
108
 
                        size_t          size;
109
 
                        
110
 
                        new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
111
 
                        push_(cloudData);
112
 
 
113
 
                        file = path->openFile(CSFile::READONLY);
114
 
                        push_(file);
115
 
                        size = file->getEOF();
116
 
                        cloudData->setLength(size);
117
 
                        file->read(cloudData->getBuffer(0), 0, size, size);
118
 
                        release_(file);
119
 
                        
120
 
                        cloudData->firstRecord();
121
 
                        MSCloudInfo::gMaxInfoRef = cloudData->getInt4Field();
122
 
                        
123
 
                        if (! cloudData->isValidRecord()) 
124
 
                                MSCloudInfo::gMaxInfoRef = 1;
125
 
                        
126
 
                        while (cloudData->nextRecord()) {
127
 
                                info_id = cloudData->getInt4Field();
128
 
                                server = cloudData->getStringField();
129
 
                                bucket = cloudData->getStringField();
130
 
                                pubKey = cloudData->getStringField();
131
 
                                privKey = cloudData->getStringField();
132
 
                                
133
 
                                if (cloudData->isValidRecord()) {
134
 
                                        if (info_id > MSCloudInfo::gMaxInfoRef) {
135
 
                                                char msg[80];
136
 
                                                snprintf(msg, 80, "Cloud info id (%"PRIu32") larger than expected (%"PRIu32")\n", info_id, MSCloudInfo::gMaxInfoRef);
137
 
                                                CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
138
 
                                                CSL.log(self, CSLog::Warning, msg);
139
 
                                                MSCloudInfo::gMaxInfoRef = info_id +1;
140
 
                                        }
141
 
                                        if ( MSCloudInfo::gCloudInfo->get(info_id)) {
142
 
                                                char msg[80];
143
 
                                                snprintf(msg, 80, "Duplicate Cloud info id (%"PRIu32") being ignored\n", info_id);
144
 
                                                CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
145
 
                                                CSL.log(self, CSLog::Warning, msg);
146
 
                                        } else {
147
 
                                                new_(info, MSCloudInfo( info_id, server, bucket, pubKey, privKey));
148
 
                                                MSCloudInfo::gCloudInfo->set(info_id, info);
149
 
                                        }
150
 
                                }
151
 
                        }
152
 
                        release_(cloudData); cloudData = NULL;
153
 
                        
154
 
                } else
155
 
                        MSCloudInfo::gMaxInfoRef = 1;
156
 
                
157
 
                release_(path);
158
 
                
159
 
        }
160
 
        unlock_(MSCloudInfo::gCloudInfo);
161
 
 
162
 
        release_(db);
163
 
 
164
 
        exit_();
165
 
}
166
 
 
167
 
void MSCloudTable::saveTable(MSDatabase *db)
168
 
{
169
 
        SysTabRec               *cloudData;
170
 
        MSCloudInfo             *info;
171
 
        enter_();
172
 
        
173
 
        push_(db);
174
 
        
175
 
        new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
176
 
        push_(cloudData);
177
 
        
178
 
        // Build the table records
179
 
        cloudData->clear();
180
 
        lock_(MSCloudInfo::gCloudInfo);
181
 
        
182
 
        cloudData->beginRecord();       
183
 
        cloudData->setInt4Field(MSCloudInfo::gMaxInfoRef);
184
 
        cloudData->endRecord(); 
185
 
        for  (int i = 0;(info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->itemAt(i)); i++) { // info is not referenced.
186
 
                
187
 
                cloudData->beginRecord();       
188
 
                cloudData->setInt4Field(info->getCloudRefId());
189
 
                cloudData->setStringField(info->getServer());
190
 
                cloudData->setStringField(info->getBucket());
191
 
                cloudData->setStringField(info->getPublicKey());
192
 
                cloudData->setStringField(info->getPrivateKey());
193
 
                cloudData->endRecord();                 
194
 
        }
195
 
        unlock_(MSCloudInfo::gCloudInfo);
196
 
 
197
 
        restoreTable(RETAIN(db), cloudData->getBuffer(0), cloudData->length(), false);
198
 
        
199
 
        release_(cloudData);
200
 
        release_(db);
201
 
        exit_();
202
 
}
203
 
 
204
 
 
205
 
// Construct a cloud table cursor on top of MSOpenSystemTable; the scan
// position iCloudIndex starts at 0.
MSCloudTable::MSCloudTable(MSSystemTableShare *share, TABLE *table)
	: MSOpenSystemTable(share, table),
	  iCloudIndex(0)
{
}
210
 
 
211
 
// Destructor: performs no work of its own. The unuse() call is kept
// commented out, as in the original.
MSCloudTable::~MSCloudTable()
{
	//unuse();
}
215
 
 
216
 
void MSCloudTable::use()
217
 
{
218
 
        MSCloudInfo::gCloudInfo->lock();
219
 
}
220
 
 
221
 
void MSCloudTable::unuse()
222
 
{
223
 
        MSCloudInfo::gCloudInfo->unlock();
224
 
        
225
 
}
226
 
 
227
 
 
228
 
void MSCloudTable::seqScanInit()
229
 
{
230
 
        iCloudIndex = 0;
231
 
}
232
 
 
233
 
#define MAX_PASSWORD ((int32_t)64)
234
 
bool MSCloudTable::seqScanNext(char *buf)
235
 
{
236
 
        char            passwd[MAX_PASSWORD +1];
237
 
        TABLE           *table = mySQLTable;
238
 
        Field           *curr_field;
239
 
        byte            *save;
240
 
        MY_BITMAP       *save_write_set;
241
 
        MSCloudInfo     *info;
242
 
        const char      *val;
243
 
        
244
 
        enter_();
245
 
        
246
 
        info = (MSCloudInfo     *) MSCloudInfo::gCloudInfo->itemAt(iCloudIndex++); // Object is not referenced.
247
 
        if (!info)
248
 
                return_(false);
249
 
        
250
 
        save_write_set = table->write_set;
251
 
        table->write_set = NULL;
252
 
 
253
 
#ifdef DRIZZLED
254
 
        memset(buf, 0xFF, table->getNullBytes());
255
 
#else
256
 
        memset(buf, 0xFF, table->s->null_bytes);
257
 
#endif
258
 
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
259
 
                curr_field = *field;
260
 
                save = curr_field->ptr;
261
 
#if MYSQL_VERSION_ID < 50114
262
 
                curr_field->ptr = (byte *) buf + curr_field->offset();
263
 
#else
264
 
#ifdef DRIZZLED
265
 
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
266
 
#else
267
 
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
268
 
#endif
269
 
#endif
270
 
                switch (curr_field->field_name[0]) {
271
 
                        case 'I':
272
 
                                ASSERT(strcmp(curr_field->field_name, "Id") == 0);
273
 
                                curr_field->store(info->getCloudRefId(), true);
274
 
                                break;
275
 
 
276
 
                        case 'S':
277
 
                                ASSERT(strcmp(curr_field->field_name, "Server") == 0);
278
 
                                val = info->getServer();
279
 
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
280
 
                                setNotNullInRecord(curr_field, buf);
281
 
                                break;
282
 
 
283
 
                        case 'B': 
284
 
                                ASSERT(strcmp(curr_field->field_name, "Bucket") == 0);
285
 
                                val = info->getBucket();
286
 
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
287
 
                                setNotNullInRecord(curr_field, buf);
288
 
                                break;
289
 
 
290
 
                        case 'P': 
291
 
                                if (curr_field->field_name[1] == 'u') {
292
 
                                        ASSERT(strcmp(curr_field->field_name, "PublicKey") == 0);
293
 
                                        val = info->getPublicKey();
294
 
                                } else if (curr_field->field_name[1] == 'r') {
295
 
                                        ASSERT(strcmp(curr_field->field_name, "PrivateKey") == 0);
296
 
                                        val = info->getPrivateKey();
297
 
                                        
298
 
                                        int32_t i;
299
 
                                        for (i = 0; (i < MAX_PASSWORD) && (i < (int32_t)strlen(val)); i++) passwd[i] = '*';
300
 
                                        passwd[i] = 0;
301
 
                                        val = passwd;
302
 
                                } else {
303
 
                                        ASSERT(false);
304
 
                                        break;
305
 
                                }
306
 
                                curr_field->store(val, strlen(val), &UTF8_CHARSET);
307
 
                                setNotNullInRecord(curr_field, buf);
308
 
                                break;
309
 
                                
310
 
                        default:
311
 
                                ASSERT(false);
312
 
                }
313
 
                curr_field->ptr = save;
314
 
        }
315
 
 
316
 
        table->write_set = save_write_set;
317
 
        
318
 
        return_(true);
319
 
}
320
 
 
321
 
void MSCloudTable::seqScanPos(unsigned char *pos )
322
 
{
323
 
        int32_t index = iCloudIndex -1;
324
 
        if (index < 0)
325
 
                index = 0; // This is probably an error condition.
326
 
                
327
 
        mi_int4store(pos, index);
328
 
}
329
 
 
330
 
void MSCloudTable::seqScanRead(unsigned char *pos , char *buf)
331
 
{
332
 
        iCloudIndex = mi_uint4korr(pos);
333
 
        seqScanNext(buf);
334
 
}
335
 
 
336
 
void MSCloudTable::updateRow(char *old_data, char *new_data) 
337
 
{
338
 
        uint32_t n_id, o_id, o_indx, n_indx;
339
 
        const char *realPrivKey;
340
 
        String server, bucket, pubKey, privKey;
341
 
        String o_server, o_bucket, o_pubKey, o_privKey;
342
 
        MSCloudInfo *info;
343
 
 
344
 
        enter_();
345
 
        
346
 
        getFieldValue(new_data, 0, &n_id);
347
 
        getFieldValue(new_data, 1, &server);
348
 
        getFieldValue(new_data, 2, &bucket);
349
 
        getFieldValue(new_data, 3, &pubKey);
350
 
        getFieldValue(new_data, 4, &privKey);
351
 
 
352
 
        getFieldValue(old_data, 0, &o_id);
353
 
        getFieldValue(old_data, 1, &o_server);
354
 
        getFieldValue(old_data, 2, &o_bucket);
355
 
        getFieldValue(old_data, 3, &o_pubKey);
356
 
        getFieldValue(old_data, 4, &o_privKey);
357
 
 
358
 
        // The cloud ID must be unique
359
 
        if ((o_id !=  n_id) && MSCloudInfo::gCloudInfo->get(n_id)) {
360
 
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to update a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
361
 
        }
362
 
 
363
 
        // The private key is masked when returned to the caller, so
364
 
        // unless the caller has updated it we need to get the real 
365
 
        // private key from the old record.
366
 
        if (strcmp(privKey.c_ptr(), o_privKey.c_ptr()))
367
 
                realPrivKey = privKey.c_ptr();
368
 
        else {
369
 
                info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->get(o_id); // unreference pointer
370
 
                realPrivKey = info->getPrivateKey();
371
 
        }
372
 
        
373
 
        new_(info, MSCloudInfo( n_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), realPrivKey));
374
 
        push_(info);
375
 
        
376
 
        o_indx = MSCloudInfo::gCloudInfo->getIndex(o_id);
377
 
 
378
 
        MSCloudInfo::gCloudInfo->remove(o_id);
379
 
        pop_(info);
380
 
        MSCloudInfo::gCloudInfo->set(n_id, info);
381
 
        n_indx = MSCloudInfo::gCloudInfo->getIndex(n_id);
382
 
        
383
 
        // Adjust the current position in the array if required.
384
 
        if (o_indx < n_indx )
385
 
                iCloudIndex--;
386
 
                
387
 
        saveTable(RETAIN(myShare->mySysDatabase));
388
 
        exit_();
389
 
}
390
 
 
391
 
void MSCloudTable::insertRow(char *data) 
392
 
{
393
 
        uint32_t ref_id;
394
 
        String server, bucket, pubKey, privKey;
395
 
        MSCloudInfo *info;
396
 
 
397
 
        enter_();
398
 
        
399
 
        getFieldValue(data, 0, &ref_id);
400
 
                
401
 
        // The cloud ID must be unique
402
 
        if (ref_id && MSCloudInfo::gCloudInfo->get(ref_id)) {
403
 
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to insert a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
404
 
        }
405
 
        
406
 
        getFieldValue(data, 1, &server);
407
 
        getFieldValue(data, 2, &bucket);
408
 
        getFieldValue(data, 3, &pubKey);
409
 
        getFieldValue(data, 4, &privKey);
410
 
 
411
 
        if (ref_id == 0)
412
 
                ref_id = MSCloudInfo::gMaxInfoRef++;
413
 
        else if (ref_id >= MSCloudInfo::gMaxInfoRef)
414
 
                MSCloudInfo::gMaxInfoRef = ref_id +1;
415
 
                
416
 
        new_(info, MSCloudInfo( ref_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), privKey.c_ptr()));
417
 
        MSCloudInfo::gCloudInfo->set(ref_id, info);
418
 
        
419
 
        saveTable(RETAIN(myShare->mySysDatabase));
420
 
        exit_();
421
 
}
422
 
 
423
 
void MSCloudTable::deleteRow(char *data) 
424
 
{
425
 
        uint32_t ref_id, indx;
426
 
 
427
 
        enter_();
428
 
        
429
 
        getFieldValue(data, 0, &ref_id);
430
 
                
431
 
        // Adjust the current position in the array if required.
432
 
        indx = MSCloudInfo::gCloudInfo->getIndex(ref_id);
433
 
        if (indx <= iCloudIndex)
434
 
                iCloudIndex--;
435
 
 
436
 
        MSCloudInfo::gCloudInfo->remove(ref_id);
437
 
        saveTable(RETAIN(myShare->mySysDatabase));
438
 
        exit_();
439
 
}
440
 
 
441
 
void MSCloudTable::transferTable(MSDatabase *to_db, MSDatabase *from_db)
442
 
{
443
 
        CSPath  *path;
444
 
        enter_();
445
 
        
446
 
        push_(from_db);
447
 
        push_(to_db);
448
 
        
449
 
        path = CSPath::newPath(getPBMSPath(RETAIN(from_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
450
 
        push_(path);
451
 
        if (path->exists()) {
452
 
                CSPath  *bu_path;
453
 
                bu_path = CSPath::newPath(getPBMSPath(RETAIN(to_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
454
 
                path->copyTo(bu_path, true);
455
 
        }
456
 
        
457
 
        release_(path);
458
 
        release_(to_db);
459
 
        release_(from_db);
460
 
        
461
 
        exit_();
462
 
}
463
 
 
464
 
CSStringBuffer *MSCloudTable::dumpTable(MSDatabase *db)
465
 
{
466
 
 
467
 
        CSPath                  *path;
468
 
        CSStringBuffer  *dump;
469
 
 
470
 
        enter_();
471
 
        
472
 
        push_(db);
473
 
        path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
474
 
        release_(db);
475
 
        
476
 
        push_(path);
477
 
        new_(dump, CSStringBuffer(20));
478
 
        push_(dump);
479
 
 
480
 
        if (path->exists()) {
481
 
                CSFile  *file;
482
 
                size_t  size;
483
 
                
484
 
                file = path->openFile(CSFile::READONLY);
485
 
                push_(file);
486
 
                
487
 
                size = file->getEOF();
488
 
                dump->setLength(size);
489
 
                file->read(dump->getBuffer(0), 0, size, size);
490
 
                release_(file);
491
 
        }
492
 
        
493
 
        pop_(dump);
494
 
        release_(path);
495
 
        return_(dump);
496
 
}
497
 
 
498
 
void MSCloudTable::restoreTable(MSDatabase *db, const char *data, size_t size, bool reload)
499
 
{
500
 
        CSPath  *path;
501
 
        CSFile  *file;
502
 
 
503
 
        enter_();
504
 
        
505
 
        push_(db);
506
 
        path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
507
 
        push_(path);
508
 
        
509
 
        file = path->openFile(CSFile::CREATE | CSFile::TRUNCATE);
510
 
        push_(file);
511
 
        
512
 
        file->write(data, 0, size);
513
 
        file->close();
514
 
        release_(file);
515
 
        
516
 
        release_(path);
517
 
        
518
 
        pop_(db);
519
 
        if (reload)
520
 
                loadTable(db);
521
 
        else
522
 
                db->release();
523
 
                
524
 
        exit_();
525
 
}
526
 
 
527
 
void MSCloudTable::removeTable(CSString *db_path)
528
 
{
529
 
        CSPath  *path;
530
 
        char pbms_path[PATH_MAX];
531
 
        
532
 
        enter_();
533
 
        
534
 
        push_(db_path); 
535
 
        cs_strcpy(PATH_MAX, pbms_path, db_path->getCString());
536
 
        release_(db_path);
537
 
        
538
 
        if (strcmp(cs_last_name_of_path(pbms_path), "pbms")  != 0)
539
 
                exit_();
540
 
                
541
 
        cs_remove_last_name_of_path(pbms_path);
542
 
 
543
 
        path = getSysFile(CSString::newString(pbms_path), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
544
 
        push_(path);
545
 
        
546
 
        if (path->exists())
547
 
                path->removeFile();
548
 
        release_(path);
549
 
        
550
 
        exit_();
551
 
}
552