~pbms-core/pbms/5.11-beta

« back to all changes in this revision

Viewing changes to mybs/src/BSConnectionHandler.cc

  • Committer: paul-mccullagh
  • Date: 2008-03-26 11:35:17 UTC
  • Revision ID: paul-mccullagh-afb1610c21464a577ae428d72fc725eb986c05a5
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2007 SNAP Innovation GmbH
 
2
 *
 
3
 * BLOB Streaming for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Paul McCullagh
 
20
 *
 
21
 * 2007-05-25
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * Network interface.
 
26
 *
 
27
 */
 
28
 
 
29
#include "CSConfig.h"
 
30
 
 
31
#include "CSGlobal.h"
 
32
#include "CSSocket.h"
 
33
#include "CSStrUtil.h"
 
34
#include "CSHTTPStream.h"
 
35
 
 
36
#include "BSConnectionHandler.h"
 
37
#include "BSNetwork.h"
 
38
#include "BSOpenTable.h"
 
39
#include "BSUtil.h"
 
40
#include "BSEngine.h"
 
41
 
 
42
#include "bs_mysql.h"
 
43
 
 
44
/*
 * Construct an idle connection handler attached to the given thread list.
 * No MySQL thread handle, streams or parsed URI parts exist yet; they are
 * set up later by initialize(), openStream() and parseRequestURI().
 */
BSConnectionHandler::BSConnectionHandler(CSThreadList *list)
        : CSDaemon(list),
          myMySQLTHD(NULL),
          amWaitingToListen(false),
          lastUse(0),
          iInputStream(NULL),
          iOutputStream(NULL),
          replyPending(false),
          iTableURI(NULL),
          iColumn(NULL),
          iCondition(NULL)
{
        /* Nothing else to do: all state is set in the initializer list. */
}
 
57
 
 
58
/*
 * Tear down the handler: close the engine connection bound to the MySQL
 * thread handle (if one was created), release the streams and URI parts,
 * and finally destroy the MySQL thread handle itself.
 */
BSConnectionHandler::~BSConnectionHandler()
{
        void *thd = myMySQLTHD;

        /* Detach the handle from the object before using the saved copy. */
        myMySQLTHD = NULL;
        if (thd)
                /* Pass the saved handle: myMySQLTHD has already been cleared. */
                BSEngine::closeConnection(thd);
        close();
        if (thd)
                bs_my_destroy_thread(thd);
}
 
69
 
 
70
 
 
71
void BSConnectionHandler::close()
 
72
{
 
73
        closeStream();
 
74
        freeRequestURI();
 
75
}
 
76
 
 
77
BSConnectionHandler *BSConnectionHandler::newHandler(CSThreadList *list)
 
78
{
 
79
        return new BSConnectionHandler(list);
 
80
}
 
81
 
 
82
/* Return false if not connection was openned, and the thread must quit. */
 
83
bool BSConnectionHandler::openStream()
 
84
{
 
85
        CSSocket                *sock;
 
86
        CSInputStream   *in;
 
87
        CSOutputStream  *out;
 
88
        
 
89
        enter_();
 
90
        if (!(sock = BSNetwork::openConnection(this)))
 
91
                return_(false);
 
92
        push_(sock);
 
93
        in = sock->getInputStream();
 
94
        in = CSBufferedInputStream::newStream(in);
 
95
        iInputStream = CSHTTPInputStream::newStream(in);
 
96
 
 
97
        out = sock->getOutputStream();
 
98
        out = CSBufferedOutputStream::newStream(out);
 
99
        iOutputStream = CSHTTPOutputStream::newStream(out);
 
100
        release_(sock);
 
101
        return_(true);
 
102
}
 
103
 
 
104
int BSConnectionHandler::getHTTPStatus(int err)
 
105
{
 
106
        int code;
 
107
 
 
108
        switch (err) {
 
109
                case BS_OK:                                     code = 200; break;
 
110
                case BS_ERR_ENGINE:                     code = 500; break;
 
111
                case BS_ERR_UNKNOWN_TABLE:      code = 404; break;
 
112
                case BS_ERR_UNKNOWN_DB:         code = 404; break;
 
113
                case BS_ERR_NOT_FOUND:          code = 404; break;
 
114
                case BS_ERR_REMOVING_REPO:      code - 404; break;
 
115
                case BS_ERR_TABLE_LOCKED:       code = 412; break; // Precondition Failed
 
116
                case BS_ERR_INCORRECT_URL:      code = 404; break;
 
117
                case BS_ERR_AUTH_FAILED:        code = 403; break; // Forbidden
 
118
                default:                                        code = 500; break;
 
119
        }
 
120
        return code;
 
121
}
 
122
 
 
123
void BSConnectionHandler::writeException(const char *qualifier)
 
124
{
 
125
        int code;
 
126
 
 
127
        enter_();
 
128
        iOutputStream->clearHeaders();
 
129
        iOutputStream->clearBody();
 
130
        code = getHTTPStatus(myException.getErrorCode());
 
131
        iOutputStream->setStatus(code);
 
132
        iOutputStream->appendBody("<HTML><HEAD><TITLE>HTTP Error ");
 
133
        iOutputStream->appendBody(code);
 
134
        iOutputStream->appendBody(": ");
 
135
        iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
 
136
        iOutputStream->appendBody("</TITLE></HEAD>");
 
137
        iOutputStream->appendBody("<BODY><H2>HTTP Error ");
 
138
        iOutputStream->appendBody(code);
 
139
        iOutputStream->appendBody(": ");
 
140
        iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
 
141
        iOutputStream->appendBody("</H2>");
 
142
        if (qualifier)
 
143
                iOutputStream->appendBody(qualifier);
 
144
        iOutputStream->appendBody("<P><B>");
 
145
        iOutputStream->appendBody(myException.getMessage());
 
146
        iOutputStream->appendBody("</B></P><PRE>");
 
147
        iOutputStream->appendBody(myException.getStackTrace());
 
148
        iOutputStream->appendBody("</PRE><P><font size=-1>");
 
149
        iOutputStream->appendBody("MySQL ");
 
150
        iOutputStream->appendBody(bs_my_version());
 
151
        iOutputStream->appendBody(", MyBS ");
 
152
        iOutputStream->appendBody(bs_version());
 
153
        iOutputStream->appendBody("<br>Copyright &#169; 2007, SNAP Innovation GmbH</font></P></BODY></HTML>");
 
154
 
 
155
        replyPending = false;
 
156
        iOutputStream->writeHead();
 
157
        iOutputStream->writeBody();
 
158
        iOutputStream->flush();
 
159
        exit_();
 
160
}
 
161
 
 
162
void BSConnectionHandler::writeException()
 
163
{
 
164
        writeException(NULL);
 
165
}
 
166
 
 
167
void BSConnectionHandler::writeResult(const char *func, const char *file, int line, int err, char *engine, MyBSResultPtr result)
 
168
{
 
169
        int             code;
 
170
        char    buffer[CS_EXC_MESSAGE_SIZE];
 
171
 
 
172
        enter_();
 
173
 
 
174
        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "<P>");
 
175
        if (result->mr_code) {
 
176
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Error ");
 
177
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, code);
 
178
                if (engine) {
 
179
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " from ");
 
180
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, engine);
 
181
                }
 
182
        }
 
183
        else {
 
184
                int             cnt;
 
185
                char    engine_list[400];
 
186
 
 
187
                cnt = BSEngine::getEngineList(400, engine_list);
 
188
                if (cnt == 0)
 
189
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, "No streaming enabled engines");
 
190
                else if (cnt == 1) {
 
191
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, "Streaming engine ");
 
192
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, engine_list);
 
193
                }
 
194
                else {
 
195
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, "Streaming engines ");
 
196
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, engine_list);
 
197
                }
 
198
        }
 
199
        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ":</P>");
 
200
        myException.initException(func, file, line, err, result->mr_message);
 
201
        writeException(buffer);
 
202
        exit_();
 
203
}
 
204
 
 
205
void BSConnectionHandler::closeStream()
 
206
{
 
207
        enter_();
 
208
        if (iOutputStream) {
 
209
                if (replyPending) {
 
210
                        try_(a) {
 
211
                                writeException();
 
212
                        }
 
213
                        catch_(a) {
 
214
                        }
 
215
                        cont_(a);
 
216
                }
 
217
                iOutputStream->release();
 
218
                iOutputStream = NULL;
 
219
        }
 
220
        if (iInputStream) {
 
221
                iInputStream->release();
 
222
                iInputStream = NULL;
 
223
        }
 
224
        exit_();
 
225
}
 
226
 
 
227
void BSConnectionHandler::parseRequestURI()
 
228
{
 
229
        CSString        *uri = iInputStream->getRequestURI();
 
230
        u_int           pos = 0, end;
 
231
 
 
232
        freeRequestURI();
 
233
        pos = uri->locate(0, "://");
 
234
        if (pos < uri->length())
 
235
                pos += 3;
 
236
        else
 
237
                pos = uri->skip(0, '/');
 
238
 
 
239
        // Table URI
 
240
        end = uri->locate(pos, '/');
 
241
        end = uri->locate(uri->nextPos(end), '/');
 
242
        iTableURI = uri->substr(pos, end - pos);
 
243
 
 
244
        // column
 
245
        pos = uri->nextPos(end);
 
246
        end = uri->locate(pos, '/');
 
247
        iColumn = uri->substr(pos, end - pos);
 
248
 
 
249
        // condition
 
250
        pos = uri->nextPos(end);
 
251
        end = uri->locate(pos, '/');
 
252
        iCondition = uri->substr(pos);
 
253
}
 
254
 
 
255
void BSConnectionHandler::freeRequestURI()
 
256
{
 
257
        if (iTableURI)
 
258
                iTableURI->release();
 
259
        iTableURI = NULL;
 
260
        if (iColumn)
 
261
                iColumn->release();
 
262
        iColumn = NULL;
 
263
        if (iCondition)
 
264
                iCondition->release();
 
265
        iCondition = NULL;
 
266
}
 
267
 
 
268
void BSConnectionHandler::writeFile(CSString *file_path)
 
269
{
 
270
        CSPath                  *path;
 
271
        CSFile                  *file;
 
272
        CSInputStream   *file_input;
 
273
 
 
274
        enter_();
 
275
        file_path->retain();
 
276
        push_(file_path);
 
277
 
 
278
        path = CSPath::newPath(file_path);
 
279
        pop_(file_path);
 
280
        push_(path);
 
281
        if (path->exists()) {
 
282
                file = path->openFile(CSFile::READONLY);
 
283
                push_(file);
 
284
 
 
285
                file_input = file->getInputStream();
 
286
                push_(file_input);
 
287
 
 
288
                iOutputStream->setContentLength((csWord8) path->getSize());
 
289
                replyPending = false;
 
290
                iOutputStream->writeHead();
 
291
                CSStream::pipe(iOutputStream, file_input);
 
292
 
 
293
                release_(file_input);
 
294
                release_(file);
 
295
        }
 
296
        else {
 
297
                myException.initFileError(CS_CONTEXT, path->getCString(), ENOENT);
 
298
                writeException();
 
299
        }
 
300
        release_(path);
 
301
 
 
302
        exit_();
 
303
}
 
304
 
 
305
/*
 
306
 * Request URI: /<database>/<table>/<column>/<column>=<value>&<column>=<value>&...
 
307
 */
 
308
void BSConnectionHandler::handleGet()
 
309
{
 
310
        char            *bad_url_comment = "Incorrect URL: ";
 
311
        BSOpenTable     *otab;
 
312
 
 
313
        enter_();
 
314
        self->myException.setErrorCode(0);
 
315
 
 
316
        iOutputStream->clearHeaders();
 
317
        iOutputStream->clearBody();
 
318
        iOutputStream->setStatus(200);
 
319
        
 
320
        parseRequestURI();
 
321
 
 
322
        if (iTableURI->length() == 0)
 
323
                goto bad_url;
 
324
        
 
325
        if (iColumn->length() == 0) {
 
326
                BSBlobURLRec blob;
 
327
 
 
328
                if (bs_parse_blob_url(&blob, iTableURI->getCString(), false)) {
 
329
                        if (blob.bu_type == BS_URL_TYPE_BLOB || blob.bu_type == BS_URL_NEW_TYPE_BLOB) {
 
330
                                otab = BSTableList::getOpenTable(blob.bu_database, blob.bu_tab_id, false);
 
331
                                frompool_(otab);
 
332
                                otab->sendRepoBlob(blob.bu_blob_id, blob.bu_auth_code, iOutputStream);
 
333
                                backtopool_(otab);
 
334
                        }
 
335
                        else {
 
336
                                BSRepoFile      *repo_file;
 
337
 
 
338
                                if (!(otab = BSTableList::getOpenTable(blob.bu_database, true))) {
 
339
                                        char buffer[CS_EXC_MESSAGE_SIZE];
 
340
 
 
341
                                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database: ");
 
342
                                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob.bu_database);
 
343
                                        CSException::throwException(CS_CONTEXT, BS_ERR_UNKNOWN_DB, buffer);
 
344
                                }
 
345
                                frompool_(otab);
 
346
                                repo_file = otab->getDB()->getRepoFileFromPool(blob.bu_tab_id, false);
 
347
                                frompool_(repo_file);
 
348
                                repo_file->sendBlob(otab, blob.bu_blob_id, blob.bu_auth_code, iOutputStream);
 
349
                                backtopool_(repo_file);
 
350
                                backtopool_(otab);
 
351
                        }
 
352
                        goto exit;
 
353
                }
 
354
 
 
355
                if (iTableURI->equals("favicon.ico")) {
 
356
                        writeFile(iTableURI);
 
357
                        goto exit;
 
358
                }
 
359
 
 
360
                goto bad_url;
 
361
        }
 
362
 
 
363
        if (iCondition->length() == 0)
 
364
                goto bad_url;
 
365
 
 
366
        if (!mybs_field_ref_enabled())
 
367
                CSException::throwException(CS_CONTEXT, BS_ERR_AUTH_FAILED, "BLOB field references not permitted. Use \"set global mybs_field_references=TRUE\" or use the command line option --mybs-field-references to enable BLOB field references.");
 
368
 
 
369
        if (!(otab = BSTableList::lockTable(iTableURI)))
 
370
                goto exit;
 
371
        try_(b) {
 
372
                otab->sendEngineBlob(iColumn, iCondition, iOutputStream);
 
373
                iOutputStream->flush();
 
374
        }
 
375
        finally_(b) {
 
376
                BSTableList::unlockTable(otab, self->myException.getErrorCode() != 0);
 
377
        }
 
378
        cont_(b);
 
379
 
 
380
        exit:
 
381
        exit_();
 
382
 
 
383
        bad_url:
 
384
        char buffer[CS_EXC_MESSAGE_SIZE];
 
385
 
 
386
        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, bad_url_comment);
 
387
        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
 
388
        CSException::throwException(CS_CONTEXT, BS_ERR_INCORRECT_URL, buffer);
 
389
        exit_();
 
390
}
 
391
 
 
392
void BSConnectionHandler::handlePut()
 
393
{
 
394
        BSOpenTable *otab;
 
395
 
 
396
        enter_();
 
397
        self->myException.setErrorCode(0);
 
398
 
 
399
        iOutputStream->clearHeaders();
 
400
        iOutputStream->clearBody();
 
401
        iOutputStream->setStatus(200);
 
402
        
 
403
        parseRequestURI();
 
404
 
 
405
        if (iTableURI->length() == 0 ||
 
406
                iColumn->length() != 0 ||
 
407
                iCondition->length() != 0) {
 
408
                char buffer[CS_EXC_MESSAGE_SIZE];
 
409
 
 
410
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
 
411
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
 
412
                CSException::throwException(CS_CONTEXT, BS_ERR_INCORRECT_URL, buffer);
 
413
        }
 
414
 
 
415
        if (!(otab = BSTableList::lockTable(iTableURI)))
 
416
                goto exit;
 
417
        try_(a) {
 
418
                csWord8                 blob_len;
 
419
                MyBSBlobURLRec  bh;
 
420
                size_t                  handle_len;
 
421
 
 
422
                blob_len = iInputStream->getContentLength();
 
423
 
 
424
                otab->createBlob(&bh, iInputStream->getHeaderValue("content-type"), blob_len, iInputStream);
 
425
                handle_len = strlen(bh.bu_data);
 
426
                iOutputStream->setContentLength(handle_len);
 
427
 
 
428
                replyPending = false;
 
429
                iOutputStream->writeHead();
 
430
                iOutputStream->write(bh.bu_data, handle_len);
 
431
                iOutputStream->flush();
 
432
 
 
433
                /* After reply, sync the repository: */
 
434
                /* This is done when the connection is closed.
 
435
                if (otab->myWriteRepoFile)
 
436
                        otab->myWriteRepoFile->sync();
 
437
                */
 
438
        }
 
439
        finally_(a) {
 
440
                BSTableList::unlockTable(otab, self->myException.getErrorCode() != 0);
 
441
        }
 
442
        cont_(a);
 
443
 
 
444
        exit:
 
445
        exit_();
 
446
}
 
447
 
 
448
void BSConnectionHandler::serviceConnection()
 
449
{
 
450
        const char      *method;
 
451
        bool            threadStarted = false;
 
452
 
 
453
        for (;;) {
 
454
                iInputStream->readHead();
 
455
                if (!(method = iInputStream->getMethod()))
 
456
                        break;
 
457
                if (!threadStarted && iInputStream->keepAlive()) {
 
458
                        /* Start another service handler if no threads
 
459
                         * are waiting to listen!
 
460
                         */
 
461
                        threadStarted = true;
 
462
                        if (!BSNetwork::gWaitingToListen)
 
463
                                BSNetwork::startConnectionHandler();
 
464
                }
 
465
                replyPending = true;
 
466
                if (strcmp(method, "GET") == 0)
 
467
                        handleGet();
 
468
                else if (strcmp(method, "PUT") == 0 ||
 
469
                        strcmp(method, "POST") == 0)
 
470
                        handlePut();
 
471
                else
 
472
                        CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_METHOD, method);
 
473
        }
 
474
}
 
475
 
 
476
bool BSConnectionHandler::initialize()
 
477
{
 
478
        myMySQLTHD = bs_my_create_thread();
 
479
        return true;
 
480
}
 
481
 
 
482
/*
 
483
 * Return false if no connection this thread should quit!
 
484
 */
 
485
bool BSConnectionHandler::doWork()
 
486
{
 
487
        enter_();
 
488
 
 
489
        /* Open a connection: */
 
490
        if (!openStream()) {
 
491
                myMustQuit = true;
 
492
                return_(false);
 
493
        }
 
494
 
 
495
        /* Do the work for the connection: */
 
496
        serviceConnection();
 
497
 
 
498
        /* Close the connection: */
 
499
        close();
 
500
 
 
501
        return_(false);
 
502
}
 
503
 
 
504
void *BSConnectionHandler::finalize()
 
505
{
 
506
        void *thd;
 
507
 
 
508
        /* Close the stream, if it was openned. */
 
509
        close();
 
510
 
 
511
        /* Free the MySQL thread: */
 
512
        if ((thd = myMySQLTHD)) {
 
513
                myMySQLTHD = NULL;
 
514
                BSEngine::closeConnection(thd);
 
515
                bs_my_destroy_thread(thd);
 
516
        }
 
517
        return NULL;
 
518
}
 
519
 
 
520
bool BSConnectionHandler::handleException()
 
521
{
 
522
        if (!myMustQuit) {
 
523
                /* Start another handler if required: */
 
524
                if (!BSNetwork::gWaitingToListen)
 
525
                        BSNetwork::startConnectionHandler();
 
526
        }
 
527
        close();
 
528
        CSDaemon::handleException();
 
529
        return false;
 
530
}
 
531
 
 
532