50
50
* ENGINE CALL-IN INTERFACE
53
MYBS_API *StreamingEngines;
53
PBMS_API *StreamingEngines;
59
void BSEngine::startUp()
59
void MSEngine::startUp()
61
StreamingEngines = new MYBS_API();
61
StreamingEngines = new PBMS_API();
64
void BSEngine::shutDown()
64
void MSEngine::shutDown()
66
66
delete StreamingEngines;
69
int BSEngine::getEngineCount()
69
int MSEngine::getEngineCount()
71
MyBSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
71
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
74
74
return sh_mem->sm_list_len;
78
MyBSEnginePtr BSEngine::getEngine(int i)
78
PBMSEnginePtr MSEngine::getEngine(int i)
80
MyBSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
80
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
83
83
if (sh_mem && i>=0 && i<sh_mem->sm_list_len) {
84
84
if ((engine = sh_mem->sm_engine_list[i]))
90
void BSEngine::closeConnection(void *thd)
90
void MSEngine::closeConnection(void *thd)
92
MyBSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
92
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
95
95
if (thd && sh_mem) {
96
96
for (int i=0; i<sh_mem->sm_list_len; i++) {
97
97
if ((engine = sh_mem->sm_engine_list[i])) {
98
if (engine->bs_close_conn)
99
engine->bs_close_conn(thd);
98
if (engine->ms_close_conn)
99
engine->ms_close_conn(thd);
105
void BSEngine::closeConnections(MyBSEnginePtr engine)
105
void MSEngine::closeConnections(PBMSEnginePtr engine)
110
lock_(BSNetwork::gHandlerList);
110
lock_(MSNetwork::gHandlerList);
112
ptr = (CSThread *) BSNetwork::gHandlerList->getBack();
112
ptr = (CSThread *) MSNetwork::gHandlerList->getBack();
116
if ((thd = ((BSConnectionHandler *) ptr)->myMySQLTHD))
117
engine->bs_close_conn(thd);
116
if ((thd = ((MSConnectionHandler *) ptr)->myMySQLTHD))
117
engine->ms_close_conn(thd);
118
118
ptr = (CSThread *) ptr->getNextLink();
121
unlock_(BSNetwork::gHandlerList);
121
unlock_(MSNetwork::gHandlerList);
125
int BSEngine::getEngineList(size_t len, char *engine_list)
125
int MSEngine::getEngineList(size_t len, char *engine_list)
127
MyBSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
127
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
129
MyBSEnginePtr engine;
129
PBMSEnginePtr engine;
131
131
*engine_list = 0;
133
133
for (i=0; i<sh_mem->sm_list_len; i++) {
134
134
if ((engine = sh_mem->sm_engine_list[i])) {
135
if (engine->bs_close_conn) {
135
if (engine->ms_close_conn) {
137
137
cs_strcat(len, engine_list, ", ");
138
cs_strcat(len, engine_list, engine->bs_engine_name);
138
cs_strcat(len, engine_list, engine->ms_engine_name);
159
159
CSException::throwException(func, file, line, err, buffer, stack);
162
void BSEngine::throwEngineError(const char *func, const char *file, int line, int err, char *engine, MyBSResultPtr result)
162
void MSEngine::throwEngineError(const char *func, const char *file, int line, int err, char *engine, PBMSResultPtr result)
164
164
throwEngineError(func, file, line, err, engine, result->mr_code, result->mr_message, result->mr_stack);
167
void BSEngine::throwEngineListError(const char *func, const char *file, int line, int err, const char *message, const char *item)
167
void MSEngine::throwEngineListError(const char *func, const char *file, int line, int err, const char *message, const char *item)
169
169
char buffer[CS_EXC_MESSAGE_SIZE];
180
180
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ", No streaming enabled engines");
181
181
else if (cnt == 1) {
182
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ", Streaming engine: ");
182
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ", Stream engine: ");
183
183
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, list);
186
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ", Streaming engines: ");
186
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ", Stream engines: ");
187
187
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, list);
189
189
CSException::throwException(func, file, line, err, buffer);
255
255
CSThread::detach(thread);
258
static int bs_set_content_len(void *stream, off_t len, MyBSResultPtr result)
258
static int ms_set_content_len(void *stream, off_t len, PBMSResultPtr result)
260
260
CSHTTPOutputStream *out;
262
262
out = (CSHTTPOutputStream *) stream;
263
263
out->setContentLength((csWord8) len);
267
static int bs_write_head(void *stream, MyBSResultPtr result)
267
static int ms_write_head(void *stream, PBMSResultPtr result)
270
270
CSHTTPOutputStream *out;
271
BSConnectionHandler *me;
271
MSConnectionHandler *me;
274
me = (BSConnectionHandler *) self;
274
me = (MSConnectionHandler *) self;
275
275
out = (CSHTTPOutputStream *) stream;
277
277
me->replyPending = false;
278
278
out->writeHead();
281
r = mybs_exception_to_result(&self->myException, result);
281
r = pbms_exception_to_result(&self->myException, result);
287
static int bs_write_stream(void *stream, void *buffer, size_t len, MyBSResultPtr result)
287
static int ms_write_stream(void *stream, void *buffer, size_t len, PBMSResultPtr result)
290
290
CSHTTPOutputStream *out;
295
295
out->write((char *) buffer, len);
298
r = mybs_exception_to_result(&self->myException, result);
298
r = pbms_exception_to_result(&self->myException, result);
304
static int bs_open_table(void **open_table, char *table_path, MyBSResultPtr result)
304
static int ms_open_table(void **open_table, char *table_path, PBMSResultPtr result)
310
if ((err = mybs_enter_conn_no_thd(&self, result, NULL)))
310
if ((err = pbms_enter_conn_no_thd(&self, result, NULL)))
316
otab = BSTableList::getOpenTableByStr(table_path, true);
316
otab = MSTableList::getOpenTableByStr(table_path, true);
317
317
*open_table = otab;
320
if (self->myException.getErrorCode() == BS_ERR_DATABASE_DELETED)
320
if (self->myException.getErrorCode() == MS_ERR_DATABASE_DELETED)
322
err = mybs_exception_to_result(&self->myException, result);
322
err = pbms_exception_to_result(&self->myException, result);
328
static void bs_close_table(void *open_table)
328
static void ms_close_table(void *open_table)
332
otab = (BSOpenTable *) open_table;
332
otab = (MSOpenTable *) open_table;
333
333
otab->unUseBlobs();
334
334
otab->returnToPool();
338
static int bs_use_blob(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index, MyBSResultPtr result)
338
static int ms_use_blob(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result)
345
otab = (BSOpenTable *) open_table;
345
otab = (MSOpenTable *) open_table;
347
if (bs_parse_blob_url(&blob, blob_url, false))
347
if (ms_parse_blob_url(&blob, blob_url, false))
348
348
otab->useBlob(blob.bu_type, blob.bu_database, blob.bu_tab_id, blob.bu_blob_id, blob.bu_auth_code, col_index, ret_blob_url);
350
350
char buffer[CS_EXC_MESSAGE_SIZE];
352
352
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
353
353
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
354
CSException::throwException(CS_CONTEXT, BS_ERR_INCORRECT_URL, buffer);
354
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
358
err = mybs_exception_to_result(&self->myException, result);
358
err = pbms_exception_to_result(&self->myException, result);
364
static int bs_retain_blobs(void *open_table, MyBSEngineRefPtr eng_ref, MyBSResultPtr result)
364
static int ms_retain_blobs(void *open_table, PBMSEngineRefPtr eng_ref, PBMSResultPtr result)
371
otab = (BSOpenTable *) open_table;
371
otab = (MSOpenTable *) open_table;
372
372
otab->retainReferences(eng_ref);
375
err = mybs_exception_to_result(&self->myException, result);
375
err = pbms_exception_to_result(&self->myException, result);
381
static int bs_release_blob(void *open_table, char *blob_url, unsigned short col_index, MyBSEngineRefPtr eng_ref, MyBSResultPtr result)
381
static int ms_release_blob(void *open_table, char *blob_url, unsigned short col_index, PBMSEngineRefPtr eng_ref, PBMSResultPtr result)
389
otab = (BSOpenTable *) open_table;
390
if (bs_parse_blob_url(&blob, blob_url, true)) {
389
otab = (MSOpenTable *) open_table;
390
if (ms_parse_blob_url(&blob, blob_url, true)) {
391
391
otab->openForReading();
392
392
if (otab->getTableID() == blob.bu_tab_id)
393
393
otab->releaseReference(blob.bu_blob_id, blob.bu_auth_code, col_index, eng_ref);
405
405
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
406
406
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
407
CSException::throwException(CS_CONTEXT, BS_ERR_INCORRECT_URL, buffer);
407
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
411
err = mybs_exception_to_result(&self->myException, result);
411
err = pbms_exception_to_result(&self->myException, result);
417
static int bs_drop_table(const char *table_path, MyBSResultPtr result)
417
static int ms_drop_table(const char *table_path, PBMSResultPtr result)
422
if ((err = mybs_enter_conn_no_thd(&self, result, NULL)))
422
if ((err = pbms_enter_conn_no_thd(&self, result, NULL)))
426
426
CSPath *from_path;
429
BSOpenTablePool *tab_pool;
431
BSDatabase *database_to_drop = NULL;
429
MSOpenTablePool *tab_pool;
431
MSDatabase *database_to_drop = NULL;
433
433
// getOpenTable() must 'create' the table if it doesn't exist because if it doesn't
434
434
// it will fail even if the table exists but was not open. We need an open table to delete it.
435
if (!(otab = BSTableList::getOpenTableByStr(table_path, true)))
435
if (!(otab = MSTableList::getOpenTableByStr(table_path, true)))
480
err = mybs_exception_to_result(&self->myException, result);
480
err = pbms_exception_to_result(&self->myException, result);
488
static int be_rename_table(const char *from_table, const char *to_table, MyBSResultPtr result)
488
static int ms_rename_table(const char *from_table, const char *to_table, PBMSResultPtr result)
493
if ((err = mybs_enter_conn_no_thd(&self, result, NULL)))
493
if ((err = pbms_enter_conn_no_thd(&self, result, NULL)))
498
498
char from_path_buffer[PATH_MAX];
499
499
char to_path_buffer[PATH_MAX];
500
500
CSPath *from_path;
502
BSOpenTablePool *tab_pool;
502
MSOpenTablePool *tab_pool;
505
505
/* Check the databases: */
506
506
cs_strcpy(PATH_MAX, from_path_buffer, from_table);