1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
31
#include <drizzled/common.h>
32
#include <drizzled/session.h>
36
#include "cslib/CSConfig.h"
37
#include "cslib/CSGlobal.h"
38
#include "cslib/CSStrUtil.h"
39
#include "cslib/CSThread.h"
42
#define PBMS_API pbms_internal
46
#include "engine_ms.h"
47
#include "connection_handler_ms.h"
48
#include "open_table_ms.h"
49
#include "network_ms.h"
50
#include "transaction_ms.h"
59
extern CSThread *pbms_getMySelf(THD *thd);
60
extern void pbms_setMySelf(THD *thd, CSThread *self);
65
* ---------------------------------------------------------------
66
* ENGINE CALL-IN INTERFACE
69
static PBMS_API *StreamingEngines;
70
// If PBMS support is built directly into the mysql/drizzle handler code
71
// then calls from all other handlers are ignored.
72
static bool have_handler_support = false;
75
* ---------------------------------------------------------------
76
* ENGINE CALLBACK INTERFACE
79
static void ms_register_engine(PBMSEnginePtr engine)
81
if (engine->ms_internal)
82
have_handler_support = true;
85
static void ms_deregister_engine(PBMSEnginePtr engine)
90
static int ms_create_blob(bool internal, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result)
92
if (have_handler_support && !internal) {
93
MSEngine::errorResult(CS_CONTEXT, MS_ERR_INVALID_OPERATION, "Invalid ms_create_blob() call", result);
97
return MSEngine::createBlob(db_name, tab_name, blob, blob_len, blob_url, result);
101
* ms_use_blob() may or may not alter the blob url depending on the type of URL and if the BLOB is in a
102
* different database or not. It may also add a BLOB reference to the BLOB table log if the BLOB was from
103
* a different table or no table was specified when the BLOB was uploaded.
105
* There is no need to undo this function because it will be undone automaticly if the BLOB is not retained.
107
static int ms_retain_blob(bool internal, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result)
109
if (have_handler_support && !internal) {
110
cs_strcpy(PBMS_BLOB_URL_SIZE, ret_blob_url->bu_data, blob_url); // This should have already been converted.
114
return MSEngine::referenceBlob(db_name, tab_name, ret_blob_url, blob_url, col_index, result);
117
static int ms_release_blob(bool internal, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result)
120
if (have_handler_support && !internal)
123
return MSEngine::dereferenceBlob(db_name, tab_name, blob_url, result);
126
static int ms_drop_table(bool internal, const char *db_name, const char *tab_name, PBMSResultPtr result)
128
if (have_handler_support && !internal)
131
return MSEngine::dropTable(db_name, tab_name, result);
134
static int ms_rename_table(bool internal, const char * db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
136
if (have_handler_support && !internal)
139
return MSEngine::renameTable(db_name, from_table, to_db, to_table, result);
142
static void ms_completed(bool internal, bool ok)
144
if (have_handler_support && !internal)
147
MSEngine::callCompleted(ok);
150
PBMSCallbacksRec engine_callbacks = {
153
ms_deregister_engine,
162
// =============================
163
int MSEngine::startUp(PBMSResultPtr result)
167
StreamingEngines = new PBMS_API();
168
err = StreamingEngines->PBMSStartup(&engine_callbacks, result);
170
delete StreamingEngines;
171
else { // Register the PBMS enabled engines the startup before PBMS
172
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
173
PBMSEnginePtr engine;
175
for (int i=0; i<sh_mem->sm_list_len; i++) {
176
if ((engine = sh_mem->sm_engine_list[i]))
177
ms_register_engine(engine);
183
void MSEngine::shutDown()
185
StreamingEngines->PBMSShutdown();
187
delete StreamingEngines;
190
const PBMSEnginePtr MSEngine::getEngineInfoAt(int indx)
192
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
193
PBMSEnginePtr engine = NULL;
196
for (int i=0; i<sh_mem->sm_list_len; i++) {
197
if ((engine = sh_mem->sm_engine_list[i])) {
210
int32_t MSEngine::createBlob(const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result)
216
CSInputStream *i_stream = NULL;
218
CLOBBER_PROTECT(err);
220
if ((err = enterConnectionNoThd(&self, result)))
225
otab = openTable(db_name, tab_name, true);
228
if (!otab->getDB()->isRecovering()) {
229
i_stream = CSMemoryInputStream::newStream((unsigned char *)blob, blob_len);
230
otab->createBlob(blob_url, blob_len, NULL, 0, i_stream);
232
CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot create BLOBs during repository recovery.");
237
err = exceptionToResult(&self->myException, result);
244
int32_t MSEngine::referenceBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, uint16_t col_index, PBMSResultPtr result)
252
CLOBBER_PROTECT(err);
254
if ((err = enterConnectionNoThd(&self, result)))
260
if (! PBMSBlobURLTools::couldBeURL(blob_url, &blob)){
261
char buffer[CS_EXC_MESSAGE_SIZE];
263
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
264
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
265
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
268
otab = openTable(db_name, tab_name, true);
271
otab->useBlob(blob.bu_type, blob.bu_db_id, blob.bu_tab_id, blob.bu_blob_id, blob.bu_auth_code, col_index, blob.bu_blob_size, blob.bu_blob_ref_id, ret_blob_url);
276
err = exceptionToResult(&self->myException, result);
283
int32_t MSEngine::dereferenceBlob(const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result)
290
CLOBBER_PROTECT(err);
292
if ((err = enterConnectionNoThd(&self, result)))
297
if (! PBMSBlobURLTools::couldBeURL(blob_url, &blob)){
298
char buffer[CS_EXC_MESSAGE_SIZE];
300
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
301
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
302
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
305
otab = openTable(db_name, tab_name, true);
307
if (!otab->getDB()->isRecovering()) {
308
if (otab->getTableID() == blob.bu_tab_id)
309
otab->releaseReference(blob.bu_blob_id, blob.bu_blob_ref_id);
311
char buffer[CS_EXC_MESSAGE_SIZE];
313
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect table ID: ");
314
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
315
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
319
char buffer[CS_EXC_MESSAGE_SIZE];
321
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
322
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
323
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
329
err = exceptionToResult(&self->myException, result);
335
int32_t MSEngine::dropDatabase(const char *db_name, PBMSResultPtr result)
340
CLOBBER_PROTECT(err);
342
if ((err = enterConnectionNoThd(&self, result)))
347
MSDatabase::dropDatabase(db_name);
350
err = exceptionToResult(&self->myException, result);
356
typedef struct UnDoInfo {
358
CSString *udo_toDatabaseName;
359
CSString *udo_fromDatabaseName;
360
CSString *udo_OldName;
361
CSString *udo_NewName;
362
} UnDoInfoRec, *UnDoInfoPtr;
364
int32_t MSEngine::dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
369
CLOBBER_PROTECT(err);
371
if ((err = enterConnectionNoThd(&self, result)))
380
MSOpenTablePool *tab_pool;
382
UnDoInfoPtr undo_info = NULL;
384
otab = openTable(db_name, tab_name, false);
388
// If we are recovering do not delete the table.
389
// It is normal for MySQL recovery scripts to delete any table they aare about to
390
// recover and then recreate it. If this is done after the repository has been recovered
391
// then this would delete all the recovered BLOBs in the table.
392
if (otab->getDB()->isRecovering()) {
393
otab->returnToPool();
399
// Before dropping the table the table ref file is renamed so that
400
// it is out of the way incase a new table is created before the
401
// old one is cleaned up.
403
old_path = otab->getDBTable()->getTableFile();
406
new_path = otab->getDBTable()->getTableFile(tab_name, true);
408
// Rearrage the object stack to pop the otab object
416
tab = otab->getDBTable();
420
tab_pool = MSTableList::lockTablePoolForDeletion(otab);
423
if (old_path->exists())
424
old_path->move(new_path);
425
tab->myDatabase->dropTable(RETAIN(tab));
427
/* Add the table to the temp delete list if we are not recovering... */
428
tab->prepareToDelete();
430
backtopool_(tab_pool); // The will unlock and close the table pool freeing all tables in it.
431
pop_(tab); // Returning the pool will have released this. (YUK!)
436
undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
438
undo_info->udo_WasRename = false;
439
self->myInfo = undo_info;
446
err = exceptionToResult(&self->myException, result);
455
static void completeDeleteTable(UnDoInfoPtr info, bool ok)
457
// TO DO: figure out a way to undo the delete.
460
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "Cannot undo delete table.");
464
bool MSEngine::renameTable(const char *from_db_name, const char *from_table, const char *to_db_name, const char *to_table)
469
MSOpenTablePool *tab_pool;
474
if (strcmp(to_db_name, from_db_name) != 0) {
475
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "Cannot rename tables containing BLOBs across databases (yet). Sorry!");
478
otab = openTable(from_db_name, from_table, false);
484
if (otab->getDB()->isRecovering())
485
CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot rename tables during repository recovery.");
487
from_path = otab->getDBTable()->getTableFile();
490
to_path = otab->getDBTable()->getTableFile(to_table, false);
492
// Rearrage the object stack to pop the otab object
500
otab->openForReading();
501
tab = otab->getDBTable();
506
tab_pool = MSTableList::lockTablePoolForDeletion(otab);
509
from_path->move(to_path);
510
tab->myDatabase->renameTable(tab, to_table);
512
backtopool_(tab_pool); // The will unlock and close the table pool freeing all tables in it.
513
pop_(tab); // Returning the pool will have released this. (YUK!)
521
int32_t MSEngine::renameTable(const char *from_db_name, const char *from_table, const char *to_db_name, const char *to_table, PBMSResultPtr result)
526
CLOBBER_PROTECT(err);
528
if ((err = enterConnectionNoThd(&self, result)))
533
UnDoInfoPtr undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
534
push_ptr_(undo_info);
536
undo_info->udo_WasRename = true;
537
if (renameTable(from_db_name, from_table, to_db_name, to_table)) {
538
undo_info->udo_fromDatabaseName = CSString::newString(from_db_name);
539
push_(undo_info->udo_fromDatabaseName);
541
undo_info->udo_toDatabaseName = CSString::newString(to_db_name);
542
push_(undo_info->udo_toDatabaseName);
544
undo_info->udo_OldName = CSString::newString(from_table);
545
push_(undo_info->udo_OldName);
547
undo_info->udo_NewName = CSString::newString(to_table);
549
pop_(undo_info->udo_OldName);
550
pop_(undo_info->udo_toDatabaseName);
551
pop_(undo_info->udo_fromDatabaseName);
553
undo_info->udo_fromDatabaseName = undo_info->udo_toDatabaseName = undo_info->udo_OldName = undo_info->udo_NewName = NULL;
555
self->myInfo = undo_info;
559
err = exceptionToResult(&self->myException, result);
568
void MSEngine::completeRenameTable(UnDoInfoPtr info, bool ok)
570
// Swap the paths around here to revers the rename.
571
CSString *from_db_name= info->udo_toDatabaseName;
572
CSString *to_db_name= info->udo_fromDatabaseName;
573
CSString *from_table= info->udo_NewName;
574
CSString *to_table= info->udo_OldName;
585
renameTable(from_db_name->getCString(), from_table->getCString(), to_db_name->getCString(), to_table->getCString());
588
release_(to_db_name);
589
release_(from_table);
590
release_(from_db_name);
595
void MSEngine::callCompleted(bool ok)
598
PBMSResultRec result;
600
if (enterConnectionNoThd(&self, &result))
604
UnDoInfoPtr info = (UnDoInfoPtr) self->myInfo;
605
if (info->udo_WasRename)
606
completeRenameTable(info, ok);
608
completeDeleteTable(info, ok);
612
} else if (self->myTID && (self->myIsAutoCommit || !ok)) {
616
MSTransactionManager::commit();
617
else if (self->myIsAutoCommit)
618
MSTransactionManager::rollback();
620
MSTransactionManager::rollbackToPosition(self->myStartStmt); // Rollback the last logical statement.
623
self->logException();
629
self->myStartStmt = self->myStmtCount;
633
MSOpenTable *MSEngine::openTable(const char *db_name, const char *tab_name, bool create)
635
MSOpenTable *otab = NULL;
636
uint32_t db_id, tab_id;
639
if ( MSDatabase::convertTableAndDatabaseToIDs(db_name, tab_name, &db_id, &tab_id, create))
640
otab = MSTableList::getOpenTableByID(db_id, tab_id);
646
bool MSEngine::couldBeURL(const char *blob_url, size_t length)
649
return PBMSBlobURLTools::couldBeURL(blob_url, length, &blob);
653
int MSEngine::exceptionToResult(CSException *e, PBMSResultPtr result)
655
const char *context, *trace;
657
result->mr_code = e->getErrorCode();
658
cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, e->getMessage());
659
context = e->getContext();
660
trace = e->getStackTrace();
661
if (context && *context) {
662
cs_strcpy(MS_RESULT_STACK_SIZE, result->mr_stack, context);
664
cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, "\n");
667
*result->mr_stack = 0;
669
cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, trace);
670
return MS_ERR_ENGINE;
674
int MSEngine::errorResult(const char *func, const char *file, int line, int err, const char *message, PBMSResultPtr result)
678
e.initException(func, file, line, err, message);
679
return exceptionToResult(&e, result);
683
int MSEngine::osErrorResult(const char *func, const char *file, int line, int err, PBMSResultPtr result)
687
e.initOSError(func, file, line, err);
688
return MSEngine::exceptionToResult(&e, result);
692
int MSEngine::enterConnection(THD *thd, CSThread **r_self, PBMSResultPtr result, bool doCreate)
694
CSThread *self = NULL;
697
// In drizzle there is no 1:1 relationship between pthreads and sessions
698
// so we must always get it from the session handle NOT the current pthread.
699
self = CSThread::getSelf();
703
if (!(self = pbms_getMySelf(thd))) {
705
return MS_ERR_NOT_FOUND;
707
if (!(self = CSThread::newCSThread()))
708
return osErrorResult(CS_CONTEXT, ENOMEM, result);
709
if (!CSThread::attach(self))
710
return MSEngine::exceptionToResult(&self->myException, result);
711
pbms_setMySelf(thd, self);
713
if (!CSThread::setSelf(self))
714
return MSEngine::exceptionToResult(&self->myException, result);
718
return MS_ERR_NOT_FOUND;
720
if (!(self = CSThread::newCSThread()))
721
return osErrorResult(CS_CONTEXT, ENOMEM, result);
722
if (!CSThread::attach(self))
723
return MSEngine::exceptionToResult(&self->myException, result);
732
int MSEngine::enterConnectionNoThd(CSThread **r_self, PBMSResultPtr result)
734
return enterConnection(current_thd, r_self, result, true);
738
void MSEngine::exitConnection()
740
THD *thd = (THD *) current_thd;
743
self = CSThread::getSelf();
744
if (self && self->pbms_api_owner)
749
CSThread::setSelf(NULL);
751
self = CSThread::getSelf();
752
CSThread::detach(self);
757
void MSEngine::closeConnection(THD* thd)
761
self = CSThread::getSelf();
762
if (self && self->pbms_api_owner)
766
if ((self = pbms_getMySelf(thd))) {
767
pbms_setMySelf(thd, NULL);
768
CSThread::setSelf(self);
769
CSThread::detach(self);
773
self = CSThread::getSelf();
774
CSThread::detach(self);