1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
* System cloud storage info table.
28
#include <drizzled/common.h>
29
#include <drizzled/session.h>
32
#include "cslib/CSConfig.h"
35
#include <sys/types.h>
40
//#include "mysql_priv.h"
41
#include "cslib/CSGlobal.h"
42
#include "cslib/CSStrUtil.h"
43
#include "cslib/CSLog.h"
44
#include "cslib/CSPath.h"
45
#include "cslib/CSDirectory.h"
51
#include "database_ms.h"
52
#include "open_table_ms.h"
53
#include "discover_ms.h"
54
#include "systab_util_ms.h"
56
#include "systab_cloud_ms.h"
58
// Column descriptions for the cloud info system table. There is one
// initializer per column (Id, Server, Bucket, PublicKey, PrivateKey); the list
// is closed by an entry whose name is NULL.
// Id is an integer column (MYSQL_TYPE_LONG); the other four are UTF-8 VARCHAR
// columns. Every real column carries NOT_NULL_FLAG.
// NOTE(review): the second member (1024 / 124) is presumably the maximum
// column length and the last member a human-readable column comment --
// TODO confirm against the DT_FIELD_INFO declaration.
// NOTE(review): the bare numeric lines and the absent braces around the
// initializer list look like extraction damage -- confirm against upstream.
DT_FIELD_INFO pbms_cloud_info[]=
60
{"Id", NOVAL, NULL, MYSQL_TYPE_LONG, NULL, NOT_NULL_FLAG, "The Cloud storage reference ID"},
61
{"Server", 1024, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 server name"},
62
{"Bucket", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 bucket name"},
63
{"PublicKey", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 public key"},
64
{"PrivateKey", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 private key"},
65
// Entry with a NULL name: closes the list.
{NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
68
// Key descriptions for the cloud info system table: a single key named
// "pbms_cloud_pk", flagged PRI_KEY_FLAG, over the "Id" column.
// NOTE(review): the braces around the initializer list (and any closing
// entry) are not visible in this listing -- confirm against upstream.
DT_KEY_INFO pbms_cloud_keys[]=
70
{"pbms_cloud_pk", PRI_KEY_FLAG, {"Id", NULL}},
74
// Passed as the last argument to getSysFile() wherever this file resolves the
// cloud table's path.
// NOTE(review): presumably the smallest file size that is treated as a usable
// table file -- confirm against getSysFile() in systab_util_ms.
#define MIN_CLOUD_TABLE_SIZE 4
76
//----------------------------
77
void MSCloudTable::startUp()
79
MSCloudInfo::startUp();
82
//----------------------------
83
void MSCloudTable::shutDown()
85
MSCloudInfo::shutDown();
88
//----------------------------
89
// Loads the cloud info records from the cloud table file into
// MSCloudInfo::gCloudInfo.
//
// The file is only read while MSCloudInfo::gMaxInfoRef is still 0. The first
// record supplies gMaxInfoRef; every following record supplies an id followed
// by the server, bucket, public key and private key strings.
//
// db: database whose myDatabasePath is used to locate the cloud table file.
//
// NOTE(review): the bare numeric lines below, the missing braces and the
// missing declarations (path, file, size, info, info_id, msg) look like
// extraction damage; some statements may be missing too -- confirm against
// upstream before relying on the exact control flow.
void MSCloudTable::loadTable(MSDatabase *db)
95
// lock_/unlock_ on gCloudInfo bracket the whole load.
lock_(MSCloudInfo::gCloudInfo);
97
// NOTE(review): presumably a zero gMaxInfoRef means "not loaded yet".
if (MSCloudInfo::gMaxInfoRef == 0) {
99
path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
102
if (path->exists()) {
104
SysTabRec *cloudData;
105
const char *server, *bucket, *pubKey, *privKey;
110
new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
113
// Read the entire file into the record buffer.
file = path->openFile(CSFile::READONLY);
115
size = file->getEOF();
116
cloudData->setLength(size);
117
file->read(cloudData->getBuffer(0), 0, size, size);
120
// The first record holds the saved gMaxInfoRef value.
cloudData->firstRecord();
121
MSCloudInfo::gMaxInfoRef = cloudData->getInt4Field();
123
// If that record is not valid, fall back to 1.
if (! cloudData->isValidRecord())
124
MSCloudInfo::gMaxInfoRef = 1;
126
// Each remaining record describes one cloud info entry; the fields are read
// in the same order saveTable() writes them.
while (cloudData->nextRecord()) {
127
info_id = cloudData->getInt4Field();
128
server = cloudData->getStringField();
129
bucket = cloudData->getStringField();
130
pubKey = cloudData->getStringField();
131
privKey = cloudData->getStringField();
133
if (cloudData->isValidRecord()) {
134
// An id above the recorded maximum is logged as possible damage and the
// maximum is raised past it.
if (info_id > MSCloudInfo::gMaxInfoRef) {
136
// snprintf limits the formatted message to 80 bytes including the terminator.
// NOTE(review): msg's declaration is not visible; presumably at least 80 bytes.
snprintf(msg, 80, "Cloud info id (%"PRIu32") larger than expected (%"PRIu32")\n", info_id, MSCloudInfo::gMaxInfoRef);
137
CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
138
CSL.log(self, CSLog::Warning, msg);
139
MSCloudInfo::gMaxInfoRef = info_id +1;
141
// An id that is already registered is logged as a duplicate.
if ( MSCloudInfo::gCloudInfo->get(info_id)) {
143
snprintf(msg, 80, "Duplicate Cloud info id (%"PRIu32") being ignored\n", info_id);
144
CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
145
CSL.log(self, CSLog::Warning, msg);
147
// NOTE(review): presumably the else-branch of the duplicate check (the log
// text says duplicates are ignored) -- the brace/else line is not visible.
new_(info, MSCloudInfo( info_id, server, bucket, pubKey, privKey));
148
MSCloudInfo::gCloudInfo->set(info_id, info);
152
release_(cloudData); cloudData = NULL;
155
// NOTE(review): presumably the branch taken when the file does not exist --
// the else line is not visible.
MSCloudInfo::gMaxInfoRef = 1;
160
unlock_(MSCloudInfo::gCloudInfo);
167
// Serializes the in-memory cloud info list into a SysTabRec buffer and hands
// that buffer to restoreTable() to be written out.
//
// Layout written: one record holding gMaxInfoRef, then one record per cloud
// info entry (id, server, bucket, public key, private key).
//
// db: database the table file belongs to; passed on to restoreTable().
//
// NOTE(review): the bare numeric lines, the missing braces and the missing
// declaration of info look like extraction damage -- confirm against upstream.
void MSCloudTable::saveTable(MSDatabase *db)
169
SysTabRec *cloudData;
175
new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
178
// Build the table records
180
// gCloudInfo stays locked while the records are built.
lock_(MSCloudInfo::gCloudInfo);
182
// First record: the current gMaxInfoRef (loadTable() reads it back first).
cloudData->beginRecord();
183
cloudData->setInt4Field(MSCloudInfo::gMaxInfoRef);
184
cloudData->endRecord();
185
// Walk the entries by index; the loop stops when itemAt() yields a null value.
for (int i = 0;(info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->itemAt(i)); i++) { // info is not referenced.
187
cloudData->beginRecord();
188
cloudData->setInt4Field(info->getCloudRefId());
189
cloudData->setStringField(info->getServer());
190
cloudData->setStringField(info->getBucket());
191
cloudData->setStringField(info->getPublicKey());
192
cloudData->setStringField(info->getPrivateKey());
193
cloudData->endRecord();
195
unlock_(MSCloudInfo::gCloudInfo);
197
// Write the buffer out; the final argument is restoreTable()'s reload flag.
restoreTable(RETAIN(db), cloudData->getBuffer(0), cloudData->length(), false);
205
// Constructor: forwards share and table to the MSOpenSystemTable base class.
// NOTE(review): the initializer list ends on a trailing comma and no body is
// visible, so further member initializers and the body appear to be missing
// from this listing -- confirm against upstream.
MSCloudTable::MSCloudTable(MSSystemTableShare *share, TABLE *table):
206
MSOpenSystemTable(share, table),
211
// Destructor.
// NOTE(review): the body is not visible in this listing -- confirm against
// upstream.
MSCloudTable::~MSCloudTable()
216
void MSCloudTable::use()
218
MSCloudInfo::gCloudInfo->lock();
221
void MSCloudTable::unuse()
223
MSCloudInfo::gCloudInfo->unlock();
228
// Sequential-scan initialisation hook.
// NOTE(review): the body is not visible in this listing; presumably it resets
// iCloudIndex, the cursor seqScanNext() advances -- confirm against upstream.
void MSCloudTable::seqScanInit()
233
// Upper bound on the masked private-key text built in seqScanNext(): the
// passwd buffer there is MAX_PASSWORD + 1 chars and at most MAX_PASSWORD '*'
// characters are written into it.
#define MAX_PASSWORD ((int32_t)64)
234
// Produces the next row of a sequential scan: takes the cloud info entry at
// iCloudIndex (advancing the cursor) and stores its columns into the row
// buffer buf through the table's Field objects.
//
// buf: row buffer to fill.
//
// NOTE(review): this listing is damaged. The bare numeric lines are residue;
// braces, case labels, the declarations of info/curr_field/save/val/i and the
// return statements are not visible. The two memset lines and the three
// curr_field->ptr assignments look like alternative build variants whose
// #ifdef/#else/#endif lines are missing (only one #if survives) -- confirm
// against upstream.
bool MSCloudTable::seqScanNext(char *buf)
236
// Scratch buffer for the masked private key ('*' characters plus terminator).
char passwd[MAX_PASSWORD +1];
237
TABLE *table = mySQLTable;
240
MY_BITMAP *save_write_set;
246
// Fetch the entry at the cursor and advance the cursor by one.
info = (MSCloudInfo *) MSCloudInfo::gCloudInfo->itemAt(iCloudIndex++); // Object is not referenced.
250
// write_set is cleared while the fields are stored and restored at the end.
save_write_set = table->write_set;
251
table->write_set = NULL;
254
// Fill the null-flag bytes at the start of the row with 0xFF.
// NOTE(review): presumably this marks every column NULL until
// setNotNullInRecord() is called for it -- confirm.
memset(buf, 0xFF, table->getNullBytes());
256
memset(buf, 0xFF, table->s->null_bytes);
258
for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
260
// Temporarily point the field at its slot inside buf; restored after the
// switch.
save = curr_field->ptr;
261
#if MYSQL_VERSION_ID < 50114
262
curr_field->ptr = (byte *) buf + curr_field->offset();
265
curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
267
curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
270
// Dispatch on the first letter of the column name; the ASSERTs check the
// full name.
switch (curr_field->field_name[0]) {
272
ASSERT(strcmp(curr_field->field_name, "Id") == 0);
273
curr_field->store(info->getCloudRefId(), true);
277
ASSERT(strcmp(curr_field->field_name, "Server") == 0);
278
val = info->getServer();
279
curr_field->store(val, strlen(val), &UTF8_CHARSET);
280
setNotNullInRecord(curr_field, buf);
284
ASSERT(strcmp(curr_field->field_name, "Bucket") == 0);
285
val = info->getBucket();
286
curr_field->store(val, strlen(val), &UTF8_CHARSET);
287
setNotNullInRecord(curr_field, buf);
291
// PublicKey and PrivateKey share a first letter; the second letter tells
// them apart.
if (curr_field->field_name[1] == 'u') {
292
ASSERT(strcmp(curr_field->field_name, "PublicKey") == 0);
293
val = info->getPublicKey();
294
} else if (curr_field->field_name[1] == 'r') {
295
ASSERT(strcmp(curr_field->field_name, "PrivateKey") == 0);
296
val = info->getPrivateKey();
299
// Write one '*' per character of the real key, capped at MAX_PASSWORD.
// NOTE(review): the lines that terminate passwd and point val at it are not
// visible; as listed, the store below would still use the real key -- confirm
// the masking is applied upstream.
for (i = 0; (i < MAX_PASSWORD) && (i < (int32_t)strlen(val)); i++) passwd[i] = '*';
306
curr_field->store(val, strlen(val), &UTF8_CHARSET);
307
setNotNullInRecord(curr_field, buf);
313
curr_field->ptr = save;
316
table->write_set = save_write_set;
321
// Stores the current scan position into pos as a 4-byte value
// (mi_int4store). The position is iCloudIndex - 1, because seqScanNext()
// post-increments the cursor after fetching a row.
//
// pos: destination for the encoded position.
//
// NOTE(review): the guard in front of "index = 0" is not visible; presumably
// it is "if (index < 0)" -- confirm against upstream.
void MSCloudTable::seqScanPos(unsigned char *pos )
323
int32_t index = iCloudIndex -1;
325
index = 0; // This is probably an error condition.
327
mi_int4store(pos, index);
330
// Repositions the scan cursor: decodes the 4-byte value in pos
// (mi_uint4korr) into iCloudIndex.
//
// pos: position previously encoded by seqScanPos().
// buf: row buffer.
//
// NOTE(review): buf is not used in the visible lines; presumably the row is
// then read into it by a statement missing from this listing -- confirm.
void MSCloudTable::seqScanRead(unsigned char *pos , char *buf)
332
iCloudIndex = mi_uint4korr(pos);
336
// Replaces one cloud info entry: reads the five columns from both row images,
// rejects a changed id that collides with an existing entry, builds a new
// MSCloudInfo, swaps it in for the old one and saves the table.
//
// old_data: row image before the update.
// new_data: row image after the update.
// Throws (CSException, MS_ERR_DUPLICATE) when the new id is already in use.
//
// NOTE(review): the bare numeric lines, missing braces, the missing
// declaration of info and some statements (e.g. the consequent of the final
// index comparison) look like extraction damage -- confirm against upstream.
void MSCloudTable::updateRow(char *old_data, char *new_data)
338
uint32_t n_id, o_id, o_indx, n_indx;
339
const char *realPrivKey;
340
String server, bucket, pubKey, privKey;
341
String o_server, o_bucket, o_pubKey, o_privKey;
346
// Columns 0..4 of the new row: Id, Server, Bucket, PublicKey, PrivateKey.
getFieldValue(new_data, 0, &n_id);
347
getFieldValue(new_data, 1, &server);
348
getFieldValue(new_data, 2, &bucket);
349
getFieldValue(new_data, 3, &pubKey);
350
getFieldValue(new_data, 4, &privKey);
352
// The same columns from the old row.
getFieldValue(old_data, 0, &o_id);
353
getFieldValue(old_data, 1, &o_server);
354
getFieldValue(old_data, 2, &o_bucket);
355
getFieldValue(old_data, 3, &o_pubKey);
356
getFieldValue(old_data, 4, &o_privKey);
358
// The cloud ID must be unique
359
if ((o_id != n_id) && MSCloudInfo::gCloudInfo->get(n_id)) {
360
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to update a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
363
// The private key is masked when returned to the caller, so
364
// unless the caller has updated it we need to get the real
365
// private key from the old record.
366
// strcmp() != 0: the caller supplied a different key, so use it as given.
if (strcmp(privKey.c_ptr(), o_privKey.c_ptr()))
367
realPrivKey = privKey.c_ptr();
369
// NOTE(review): presumably the else-branch (key unchanged) -- the else line
// is not visible. The result of get() is used without a null check.
info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->get(o_id); // unreference pointer
370
realPrivKey = info->getPrivateKey();
373
new_(info, MSCloudInfo( n_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), realPrivKey));
376
// Note the old entry's index before removing it, then register the new one.
o_indx = MSCloudInfo::gCloudInfo->getIndex(o_id);
378
MSCloudInfo::gCloudInfo->remove(o_id);
380
MSCloudInfo::gCloudInfo->set(n_id, info);
381
n_indx = MSCloudInfo::gCloudInfo->getIndex(n_id);
383
// Adjust the current position in the array if required.
384
// NOTE(review): the statement controlled by this test is not visible;
// presumably it adjusts iCloudIndex -- confirm against upstream.
if (o_indx < n_indx )
387
saveTable(RETAIN(myShare->mySysDatabase));
391
// Adds a cloud info entry from a row image, then saves the table.
//
// data: row image holding Id, Server, Bucket, PublicKey, PrivateKey in
//       columns 0..4.
// Throws (CSException, MS_ERR_DUPLICATE) when a non-zero id is already in
// use.
//
// NOTE(review): the bare numeric lines, missing braces and the missing
// declarations of ref_id and info look like extraction damage -- confirm
// against upstream.
void MSCloudTable::insertRow(char *data)
394
String server, bucket, pubKey, privKey;
399
getFieldValue(data, 0, &ref_id);
401
// The cloud ID must be unique
402
// A zero id skips the duplicate check.
if (ref_id && MSCloudInfo::gCloudInfo->get(ref_id)) {
403
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to insert a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
406
getFieldValue(data, 1, &server);
407
getFieldValue(data, 2, &bucket);
408
getFieldValue(data, 3, &pubKey);
409
getFieldValue(data, 4, &privKey);
412
// NOTE(review): presumably guarded by "if (ref_id == 0)" (the condition line
// is not visible): an unset id takes the next value of gMaxInfoRef.
ref_id = MSCloudInfo::gMaxInfoRef++;
413
// An explicit id at or above the counter pushes the counter past it.
else if (ref_id >= MSCloudInfo::gMaxInfoRef)
414
MSCloudInfo::gMaxInfoRef = ref_id +1;
416
new_(info, MSCloudInfo( ref_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), privKey.c_ptr()));
417
MSCloudInfo::gCloudInfo->set(ref_id, info);
419
saveTable(RETAIN(myShare->mySysDatabase));
423
// Removes the cloud info entry whose id is stored in column 0 of the row
// image, then saves the table.
//
// data: row image of the row being deleted.
//
// NOTE(review): the bare numeric lines, missing braces and the statement
// controlled by the index comparison are not visible -- confirm against
// upstream.
void MSCloudTable::deleteRow(char *data)
425
uint32_t ref_id, indx;
429
getFieldValue(data, 0, &ref_id);
431
// Adjust the current position in the array if required.
432
indx = MSCloudInfo::gCloudInfo->getIndex(ref_id);
433
// NOTE(review): the consequent is missing; presumably it moves iCloudIndex
// back by one when the removed entry sits at or before the cursor.
if (indx <= iCloudIndex)
436
MSCloudInfo::gCloudInfo->remove(ref_id);
437
saveTable(RETAIN(myShare->mySysDatabase));
441
// Copies the cloud table file (CLOUD_TABLE_NAME ".dat") from from_db's PBMS
// directory to to_db's PBMS directory, if the source file exists.
//
// to_db:   database receiving the copy.
// from_db: database the file is copied from.
//
// NOTE(review): the bare numeric lines, missing braces and the declarations
// of path and bu_path are not visible -- confirm against upstream.
void MSCloudTable::transferTable(MSDatabase *to_db, MSDatabase *from_db)
449
path = CSPath::newPath(getPBMSPath(RETAIN(from_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
451
if (path->exists()) {
453
bu_path = CSPath::newPath(getPBMSPath(RETAIN(to_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
454
// NOTE(review): the second argument is presumably an overwrite flag --
// confirm against CSPath::copyTo().
path->copyTo(bu_path, true);
464
// Reads the raw contents of the cloud table file into a newly created
// CSStringBuffer. When the file does not exist nothing is read into the
// buffer.
//
// db: database whose myDatabasePath is used to locate the file.
// Returns a CSStringBuffer pointer per the signature.
//
// NOTE(review): the return statement, braces and the declarations of path,
// file and size are not visible in this listing -- confirm against upstream.
CSStringBuffer *MSCloudTable::dumpTable(MSDatabase *db)
468
CSStringBuffer *dump;
473
path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
477
new_(dump, CSStringBuffer(20));
480
if (path->exists()) {
484
file = path->openFile(CSFile::READONLY);
487
// Size the buffer to the file length and read the whole file into it.
size = file->getEOF();
488
dump->setLength(size);
489
file->read(dump->getBuffer(0), 0, size, size);
498
// Writes size bytes from data into the cloud table file, opening it with
// CSFile::CREATE | CSFile::TRUNCATE.
//
// db:     database whose myDatabasePath is used to locate the file.
// data:   bytes to write.
// size:   number of bytes to write.
// reload: not used in the visible lines.
//
// NOTE(review): presumably reload triggers a re-read of the table after the
// write, in statements missing from this listing -- confirm against upstream.
// Braces and the declarations of path and file are also not visible.
void MSCloudTable::restoreTable(MSDatabase *db, const char *data, size_t size, bool reload)
506
path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
509
file = path->openFile(CSFile::CREATE | CSFile::TRUNCATE);
512
// NOTE(review): the middle argument is presumably the file offset -- confirm
// against CSFile::write().
file->write(data, 0, size);
527
// Resolves the cloud table file for the database directory db_path.
//
// db_path: path of the database directory.
//
// NOTE(review): only the start of this function is visible. The statement
// controlled by the "pbms" comparison and everything after the getSysFile()
// call (presumably the actual file removal) are missing from this listing --
// confirm against upstream.
void MSCloudTable::removeTable(CSString *db_path)
530
char pbms_path[PATH_MAX];
535
// Work on a local, bounded copy of the path.
cs_strcpy(PATH_MAX, pbms_path, db_path->getCString());
538
// Test whether the last path component is something other than "pbms".
if (strcmp(cs_last_name_of_path(pbms_path), "pbms") != 0)
541
cs_remove_last_name_of_path(pbms_path);
543
path = getSysFile(CSString::newString(pbms_path), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);