1
/* Copyright (c) 2007 SNAP Innovation GmbH
3
* BLOB Streaming for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
33
#include "CSStrUtil.h"
34
#include "CSHTTPStream.h"
36
#include "BSConnectionHandler.h"
37
#include "BSNetwork.h"
38
#include "BSOpenTable.h"
44
BSConnectionHandler::BSConnectionHandler(CSThreadList *list):
47
amWaitingToListen(false),
58
BSConnectionHandler::~BSConnectionHandler()
60
void *thd = myMySQLTHD;
64
BSEngine::closeConnection(myMySQLTHD);
67
bs_my_destroy_thread(thd);
71
void BSConnectionHandler::close()
77
BSConnectionHandler *BSConnectionHandler::newHandler(CSThreadList *list)
79
return new BSConnectionHandler(list);
82
/* Return false if not connection was openned, and the thread must quit. */
83
bool BSConnectionHandler::openStream()
90
if (!(sock = BSNetwork::openConnection(this)))
93
in = sock->getInputStream();
94
in = CSBufferedInputStream::newStream(in);
95
iInputStream = CSHTTPInputStream::newStream(in);
97
out = sock->getOutputStream();
98
out = CSBufferedOutputStream::newStream(out);
99
iOutputStream = CSHTTPOutputStream::newStream(out);
104
int BSConnectionHandler::getHTTPStatus(int err)
109
case BS_OK: code = 200; break;
110
case BS_ERR_ENGINE: code = 500; break;
111
case BS_ERR_UNKNOWN_TABLE: code = 404; break;
112
case BS_ERR_UNKNOWN_DB: code = 404; break;
113
case BS_ERR_NOT_FOUND: code = 404; break;
114
case BS_ERR_REMOVING_REPO: code - 404; break;
115
case BS_ERR_TABLE_LOCKED: code = 412; break; // Precondition Failed
116
case BS_ERR_INCORRECT_URL: code = 404; break;
117
case BS_ERR_AUTH_FAILED: code = 403; break; // Forbidden
118
default: code = 500; break;
123
void BSConnectionHandler::writeException(const char *qualifier)
128
iOutputStream->clearHeaders();
129
iOutputStream->clearBody();
130
code = getHTTPStatus(myException.getErrorCode());
131
iOutputStream->setStatus(code);
132
iOutputStream->appendBody("<HTML><HEAD><TITLE>HTTP Error ");
133
iOutputStream->appendBody(code);
134
iOutputStream->appendBody(": ");
135
iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
136
iOutputStream->appendBody("</TITLE></HEAD>");
137
iOutputStream->appendBody("<BODY><H2>HTTP Error ");
138
iOutputStream->appendBody(code);
139
iOutputStream->appendBody(": ");
140
iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
141
iOutputStream->appendBody("</H2>");
143
iOutputStream->appendBody(qualifier);
144
iOutputStream->appendBody("<P><B>");
145
iOutputStream->appendBody(myException.getMessage());
146
iOutputStream->appendBody("</B></P><PRE>");
147
iOutputStream->appendBody(myException.getStackTrace());
148
iOutputStream->appendBody("</PRE><P><font size=-1>");
149
iOutputStream->appendBody("MySQL ");
150
iOutputStream->appendBody(bs_my_version());
151
iOutputStream->appendBody(", MyBS ");
152
iOutputStream->appendBody(bs_version());
153
iOutputStream->appendBody("<br>Copyright © 2007, SNAP Innovation GmbH</font></P></BODY></HTML>");
155
replyPending = false;
156
iOutputStream->writeHead();
157
iOutputStream->writeBody();
158
iOutputStream->flush();
162
void BSConnectionHandler::writeException()
164
writeException(NULL);
167
void BSConnectionHandler::writeResult(const char *func, const char *file, int line, int err, char *engine, MyBSResultPtr result)
170
char buffer[CS_EXC_MESSAGE_SIZE];
174
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "<P>");
175
if (result->mr_code) {
176
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Error ");
177
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, code);
179
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " from ");
180
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, engine);
185
char engine_list[400];
187
cnt = BSEngine::getEngineList(400, engine_list);
189
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, "No streaming enabled engines");
191
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, "Streaming engine ");
192
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, engine_list);
195
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, "Streaming engines ");
196
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, engine_list);
199
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ":</P>");
200
myException.initException(func, file, line, err, result->mr_message);
201
writeException(buffer);
205
void BSConnectionHandler::closeStream()
217
iOutputStream->release();
218
iOutputStream = NULL;
221
iInputStream->release();
227
void BSConnectionHandler::parseRequestURI()
229
CSString *uri = iInputStream->getRequestURI();
233
pos = uri->locate(0, "://");
234
if (pos < uri->length())
237
pos = uri->skip(0, '/');
240
end = uri->locate(pos, '/');
241
end = uri->locate(uri->nextPos(end), '/');
242
iTableURI = uri->substr(pos, end - pos);
245
pos = uri->nextPos(end);
246
end = uri->locate(pos, '/');
247
iColumn = uri->substr(pos, end - pos);
250
pos = uri->nextPos(end);
251
end = uri->locate(pos, '/');
252
iCondition = uri->substr(pos);
255
void BSConnectionHandler::freeRequestURI()
258
iTableURI->release();
264
iCondition->release();
268
void BSConnectionHandler::writeFile(CSString *file_path)
272
CSInputStream *file_input;
278
path = CSPath::newPath(file_path);
281
if (path->exists()) {
282
file = path->openFile(CSFile::READONLY);
285
file_input = file->getInputStream();
288
iOutputStream->setContentLength((csWord8) path->getSize());
289
replyPending = false;
290
iOutputStream->writeHead();
291
CSStream::pipe(iOutputStream, file_input);
293
release_(file_input);
297
myException.initFileError(CS_CONTEXT, path->getCString(), ENOENT);
306
* Request URI: /<database>/<table>/<column>/<column>=<value>&<column>=<value>&...
308
void BSConnectionHandler::handleGet()
310
char *bad_url_comment = "Incorrect URL: ";
314
self->myException.setErrorCode(0);
316
iOutputStream->clearHeaders();
317
iOutputStream->clearBody();
318
iOutputStream->setStatus(200);
322
if (iTableURI->length() == 0)
325
if (iColumn->length() == 0) {
328
if (bs_parse_blob_url(&blob, iTableURI->getCString(), false)) {
329
if (blob.bu_type == BS_URL_TYPE_BLOB || blob.bu_type == BS_URL_NEW_TYPE_BLOB) {
330
otab = BSTableList::getOpenTable(blob.bu_database, blob.bu_tab_id, false);
332
otab->sendRepoBlob(blob.bu_blob_id, blob.bu_auth_code, iOutputStream);
336
BSRepoFile *repo_file;
338
if (!(otab = BSTableList::getOpenTable(blob.bu_database, true))) {
339
char buffer[CS_EXC_MESSAGE_SIZE];
341
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database: ");
342
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob.bu_database);
343
CSException::throwException(CS_CONTEXT, BS_ERR_UNKNOWN_DB, buffer);
346
repo_file = otab->getDB()->getRepoFileFromPool(blob.bu_tab_id, false);
347
frompool_(repo_file);
348
repo_file->sendBlob(otab, blob.bu_blob_id, blob.bu_auth_code, iOutputStream);
349
backtopool_(repo_file);
355
if (iTableURI->equals("favicon.ico")) {
356
writeFile(iTableURI);
363
if (iCondition->length() == 0)
366
if (!mybs_field_ref_enabled())
367
CSException::throwException(CS_CONTEXT, BS_ERR_AUTH_FAILED, "BLOB field references not permitted. Use \"set global mybs_field_references=TRUE\" or use the command line option --mybs-field-references to enable BLOB field references.");
369
if (!(otab = BSTableList::lockTable(iTableURI)))
372
otab->sendEngineBlob(iColumn, iCondition, iOutputStream);
373
iOutputStream->flush();
376
BSTableList::unlockTable(otab, self->myException.getErrorCode() != 0);
384
char buffer[CS_EXC_MESSAGE_SIZE];
386
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, bad_url_comment);
387
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
388
CSException::throwException(CS_CONTEXT, BS_ERR_INCORRECT_URL, buffer);
392
void BSConnectionHandler::handlePut()
397
self->myException.setErrorCode(0);
399
iOutputStream->clearHeaders();
400
iOutputStream->clearBody();
401
iOutputStream->setStatus(200);
405
if (iTableURI->length() == 0 ||
406
iColumn->length() != 0 ||
407
iCondition->length() != 0) {
408
char buffer[CS_EXC_MESSAGE_SIZE];
410
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
411
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
412
CSException::throwException(CS_CONTEXT, BS_ERR_INCORRECT_URL, buffer);
415
if (!(otab = BSTableList::lockTable(iTableURI)))
422
blob_len = iInputStream->getContentLength();
424
otab->createBlob(&bh, iInputStream->getHeaderValue("content-type"), blob_len, iInputStream);
425
handle_len = strlen(bh.bu_data);
426
iOutputStream->setContentLength(handle_len);
428
replyPending = false;
429
iOutputStream->writeHead();
430
iOutputStream->write(bh.bu_data, handle_len);
431
iOutputStream->flush();
433
/* After reply, sync the repository: */
434
/* This is done when the connection is closed.
435
if (otab->myWriteRepoFile)
436
otab->myWriteRepoFile->sync();
440
BSTableList::unlockTable(otab, self->myException.getErrorCode() != 0);
448
void BSConnectionHandler::serviceConnection()
451
bool threadStarted = false;
454
iInputStream->readHead();
455
if (!(method = iInputStream->getMethod()))
457
if (!threadStarted && iInputStream->keepAlive()) {
458
/* Start another service handler if no threads
459
* are waiting to listen!
461
threadStarted = true;
462
if (!BSNetwork::gWaitingToListen)
463
BSNetwork::startConnectionHandler();
466
if (strcmp(method, "GET") == 0)
468
else if (strcmp(method, "PUT") == 0 ||
469
strcmp(method, "POST") == 0)
472
CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_METHOD, method);
476
bool BSConnectionHandler::initialize()
478
myMySQLTHD = bs_my_create_thread();
483
* Return false if no connection this thread should quit!
485
bool BSConnectionHandler::doWork()
489
/* Open a connection: */
495
/* Do the work for the connection: */
498
/* Close the connection: */
504
void *BSConnectionHandler::finalize()
508
/* Close the stream, if it was openned. */
511
/* Free the MySQL thread: */
512
if ((thd = myMySQLTHD)) {
514
BSEngine::closeConnection(thd);
515
bs_my_destroy_thread(thd);
520
bool BSConnectionHandler::handleException()
523
/* Start another handler if required: */
524
if (!BSNetwork::gWaitingToListen)
525
BSNetwork::startConnectionHandler();
528
CSDaemon::handleException();