~pbms-core/pbms/5.11-beta

« back to all changes in this revision

Viewing changes to mybs/src/pbmslib.cc

  • Committer: barry_leslie
  • Date: 2008-09-12 14:22:01 UTC
  • Revision ID: barry_leslie-71db134102f081d5227c64cc38ca9d28e72a6c9f
Internal name changes from MyBS to PBMS.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 *  mybslib.cc
3
 
 *  MyBSLib
4
 
 *
5
 
 *  Created by Leslie on 7/14/08.
6
 
 *  Copyright 2008 PrimeBase Technologies GmbH. All rights reserved.
7
 
 *
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 *  PrimeBase Media Stream (PBMS)
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2008-09-10   Barry Leslie
 
20
 *
 
21
 * H&G2JCtL
8
22
 */
9
 
 
10
23
#include "CSConfig.h"
11
24
 
12
25
#include "mysql.h"
13
 
#include "mybslib.h"
14
 
#define MYBS_CLIENT_API
15
 
#include "mybs.h"
 
26
#include "pbmslib.h"
 
27
#define PBMS_CLIENT_API
 
28
#include "pbms.h"
16
29
#include "CSGlobal.h"
17
30
#include "CSThread.h"
18
31
#include "CSString.h"
19
32
#include "CSStrUtil.h"
20
33
#include "CSSocket.h"
21
34
#include "CSHTTPStream.h"
22
 
#include "BSUtil.h"
 
35
#include "Util_ms.h"
23
36
 
24
37
#define CLEAR_SELF()    CSThread::setSelf(NULL)
25
38
 
26
39
static int global_errno;
27
 
static char global_err_message[BS_RESULT_MESSAGE_SIZE];
 
40
static char global_err_message[MS_RESULT_MESSAGE_SIZE];
28
41
 
29
42
static unsigned long init_count = 0;
30
 
static CSThreadList     *mybsi_thread_list;
31
 
static  CSThread *bslib_cleanup_thread = NULL, *bslib_global_thread = NULL;
32
 
static bool bslib_cleanup_thread_running = false;
 
43
static CSThreadList     *pbms_thread_list;
 
44
static  CSThread *mslib_cleanup_thread = NULL, *mslib_global_thread = NULL;
 
45
static bool mslib_cleanup_thread_running = false;
33
46
 
34
 
static void report_global_bse_error(class  CSThread *thd);
 
47
static void report_global_mse_error(class  CSThread *thd);
35
48
 
36
49
#define DFLT_CONNECTION_TIMEOUT 10      // Changing this required a documentation update.
37
50
 
38
 
class  MYBS_Ref: public CSThread, public CSMutex {
 
51
class  PBMS_Ref: public CSThread, public CSMutex {
39
52
        public:
40
53
        
41
 
        MYBS_Ref():
42
 
                CSThread(mybsi_thread_list),
43
 
                be_host(NULL),
44
 
                be_port(0),
45
 
                be_timeout(DFLT_CONNECTION_TIMEOUT),
46
 
                be_transmition_timeout(0),
47
 
                be_connected(false),
48
 
                be_input(NULL),
49
 
                be_output(NULL)
 
54
        PBMS_Ref():
 
55
                CSThread(pbms_thread_list),
 
56
                ms_host(NULL),
 
57
                ms_port(0),
 
58
                ms_timeout(DFLT_CONNECTION_TIMEOUT),
 
59
                ms_transmition_timeout(0),
 
60
                ms_connected(false),
 
61
                ms_input(NULL),
 
62
                ms_output(NULL)
50
63
        {
51
64
                }
52
65
                
53
 
        ~MYBS_Ref() {
54
 
                if (be_socket) {
55
 
                        if (be_input) 
56
 
                                be_input->release();
57
 
                                
58
 
                        if (be_output) 
59
 
                                be_output->release();
60
 
                                
61
 
                        if (be_connected)
62
 
                                be_disconnect();
63
 
                                
64
 
                        if (be_header)
65
 
                                be_header->release();
 
66
        ~PBMS_Ref() {
 
67
                if (ms_socket) {
 
68
                        if (ms_input) 
 
69
                                ms_input->release();
 
70
                                
 
71
                        if (ms_output) 
 
72
                                ms_output->release();
 
73
                                
 
74
                        if (ms_connected)
 
75
                                ms_disconnect();
 
76
                                
 
77
                        if (ms_header)
 
78
                                ms_header->release();
66
79
                        
67
 
                        be_socket->release();
 
80
                        ms_socket->release();
68
81
                }
69
 
                if (be_host) 
70
 
                        be_host->release();
 
82
                if (ms_host) 
 
83
                        ms_host->release();
71
84
        }
72
 
        CSString        *be_host;
73
 
        unsigned int be_port;
74
 
        unsigned int be_timeout;
75
 
        unsigned int be_transmition_timeout; // In the future this may have some effect but for now it is always be 0 (no timeout).
 
85
        CSString        *ms_host;
 
86
        unsigned int ms_port;
 
87
        unsigned int ms_timeout;
 
88
        unsigned int ms_transmition_timeout; // In the future this may have some effect but for now it is always be 0 (no timeout).
76
89
        
77
 
        void be_init_Streams(const char* host, unsigned int port)
 
90
        void ms_init_Streams(const char* host, unsigned int port)
78
91
        {
79
92
                CSThread *self = this;
80
93
                
81
 
                be_port = port;
82
 
                mybs_api_owner = true;
 
94
                ms_port = port;
 
95
                pbms_api_owner = true;
83
96
                try_(a) {
84
 
                        be_socket = CSSocket::newSocket();
85
 
                        be_host = CSString::newString(host);
86
 
                        
87
 
                        be_socket->retain();
88
 
                        be_output = CSHTTPOutputStream::newStream(CSSocketOutputStream::newStream(be_socket));
89
 
                        
90
 
                        be_socket->retain();
91
 
                        be_input = CSHTTPInputStream::newStream(CSBufferedInputStream::newStream(CSSocketInputStream::newStream(be_socket)));
92
 
                        be_header = new CSStringBuffer(50);
 
97
                        ms_socket = CSSocket::newSocket();
 
98
                        ms_host = CSString::newString(host);
 
99
                        
 
100
                        ms_socket->retain();
 
101
                        ms_output = CSHTTPOutputStream::newStream(CSSocketOutputStream::newStream(ms_socket));
 
102
                        
 
103
                        ms_socket->retain();
 
104
                        ms_input = CSHTTPInputStream::newStream(CSBufferedInputStream::newStream(CSSocketInputStream::newStream(ms_socket)));
 
105
                        ms_header = new CSStringBuffer(50);
93
106
                        
94
107
                }
95
108
                
96
109
                catch_(a) {
97
 
                        report_global_bse_error(this);
 
110
                        report_global_mse_error(this);
98
111
                }
99
112
                
100
113
                cont_(a);
101
114
        }
102
115
        
103
 
        void be_setSelf()
 
116
        void ms_setSelf()
104
117
        {
105
118
                setSelf(this);
106
119
        }
107
120
 
108
 
        void be_connect()
 
121
        void ms_connect()
109
122
        {
110
 
                if (!be_connected) {
111
 
                        be_socket->open((char *) be_host->getCString(), be_port);
112
 
                        be_connected = true;
 
123
                if (!ms_connected) {
 
124
                        ms_socket->open((char *) ms_host->getCString(), ms_port);
 
125
                        ms_connected = true;
113
126
                }
114
 
                be_connect_timeout = time(NULL) + be_timeout;
 
127
                ms_connect_timeout = time(NULL) + ms_timeout;
115
128
        }
116
129
        
117
 
        void be_disconnect()
 
130
        void ms_disconnect()
118
131
        {
119
 
                be_socket->close();
120
 
                be_connected = false;
 
132
                ms_socket->close();
 
133
                ms_connected = false;
121
134
        }
122
135
 
123
 
        void be_init_put_blob(int size, const char *database, const char *table)
 
136
        void ms_init_put_blob(int size, const char *database, const char *table)
124
137
        {
125
 
                be_header->setLength(0);
126
 
                be_header->append("POST /");
127
 
                be_header->append(database);
 
138
                ms_header->setLength(0);
 
139
                ms_header->append("POST /");
 
140
                ms_header->append(database);
128
141
                if (table) {
129
 
                        be_header->append("/");
130
 
                        be_header->append(table);
 
142
                        ms_header->append("/");
 
143
                        ms_header->append(table);
131
144
                }
132
 
                be_header->append(" HTTP/1.1 \r\nContent-Length: ");
133
 
                be_header->append(size);
134
 
                be_header->append("\r\nConnection: Keep-Alive\r\n\r\n");
135
 
                be_output->print(be_header->getCString());
 
145
                ms_header->append(" HTTP/1.1 \r\nContent-Length: ");
 
146
                ms_header->append(size);
 
147
                ms_header->append("\r\nConnection: Keep-Alive\r\n\r\n");
 
148
                ms_output->print(ms_header->getCString());
136
149
                
137
150
        }
138
151
        
139
 
        void be_init_get_blob(MYBS_BLOB_REF *ref)
 
152
        void ms_init_get_blob(PBMS_BLOB_REF *ref)
140
153
        {
141
 
                be_header->setLength(0);
142
 
                be_header->append("GET ");
143
 
                be_header->append((char *) ref);
144
 
                be_header->append(" HTTP/1.1 \r\nContent-Length: ");
145
 
                be_header->append(0);
146
 
                be_header->append("\r\nConnection: Keep-Alive\r\n\r\n");
147
 
                be_output->print(be_header->getCString());
 
154
                ms_header->setLength(0);
 
155
                ms_header->append("GET ");
 
156
                ms_header->append((char *) ref);
 
157
                ms_header->append(" HTTP/1.1 \r\nContent-Length: ");
 
158
                ms_header->append(0);
 
159
                ms_header->append("\r\nConnection: Keep-Alive\r\n\r\n");
 
160
                ms_output->print(ms_header->getCString());
148
161
                
149
162
        }
150
163
        
151
 
        size_t be_read(char *ptr, size_t size)
 
164
        size_t ms_read(char *ptr, size_t size)
152
165
        {
153
166
                size_t len = size;
154
167
                
155
168
                while (size && len) {
156
 
                        len = be_input->read(ptr, size);
 
169
                        len = ms_input->read(ptr, size);
157
170
                        size -= len;
158
171
                        ptr += len;
159
172
                }
161
174
                return size; // returns the unread data size.
162
175
        }
163
176
 
164
 
        bool be_connected;
165
 
        time_t be_connect_timeout;
 
177
        bool ms_connected;
 
178
        time_t ms_connect_timeout;
166
179
        
167
 
        CSHTTPInputStream *be_input;
168
 
        CSHTTPOutputStream *be_output;
 
180
        CSHTTPInputStream *ms_input;
 
181
        CSHTTPOutputStream *ms_output;
169
182
        
170
183
        private:
171
 
        CSSocket *be_socket;
172
 
        CSStringBuffer *be_header;
 
184
        CSSocket *ms_socket;
 
185
        CSStringBuffer *ms_header;
173
186
};
174
187
 
175
188
 
176
189
//------------------------------------------------
177
 
static void throw_http_reply_exception(MYBS_Ref *bse)
 
190
static void throw_http_reply_exception(PBMS_Ref *mse)
178
191
{
179
192
        char *reply_buffer = NULL;
180
193
        CSString *reply = NULL, *error_text = NULL;
182
195
        enter_();
183
196
        
184
197
        try_(a) {
185
 
                size_t size = bse->be_input->getContentLength();
 
198
                size_t size = mse->ms_input->getContentLength();
186
199
                
187
200
                if (!size) {
188
 
                        error_text = CSString::newString("Missing HTTP reply: possible BLOB Streaming engine connection failure.");
189
 
                        bse->be_disconnect();
 
201
                        error_text = CSString::newString("Missing HTTP reply: possible Media Stream engine connection failure.");
 
202
                        mse->ms_disconnect();
190
203
                } else {
191
204
                        u_int start, end;
192
205
                
193
206
                        reply_buffer = (char*) cs_malloc(size);
194
207
 
195
208
                        *reply_buffer = 0;
196
 
                        bse->be_read(reply_buffer, size); 
 
209
                        mse->ms_read(reply_buffer, size); 
197
210
                                
198
211
                        reply = CSString::newString(reply_buffer);
199
212
                        
221
234
}
222
235
 
223
236
//------------------------------------------------
224
 
static void report_global_bse_error(CSThread *thd)
 
237
static void report_global_mse_error(CSThread *thd)
225
238
{
226
239
        if (global_errno)
227
240
                return;
228
241
        global_errno = thd->myException.getErrorCode();
229
 
        cs_strcpy(BS_RESULT_MESSAGE_SIZE, global_err_message,  thd->myException.getMessage());
 
242
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message,  thd->myException.getMessage());
230
243
}
231
244
 
232
245
//------------------------------------------------
237
250
        if (global_errno)
238
251
                return;
239
252
        global_errno = err;
240
 
        cs_strcpy(BS_RESULT_MESSAGE_SIZE, global_err_message,  message);
 
253
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message,  message);
241
254
        sprintf(line_no, ": line %d", line);
242
 
        cs_strcat(BS_RESULT_MESSAGE_SIZE, global_err_message,  line_no);
 
255
        cs_strcat(MS_RESULT_MESSAGE_SIZE, global_err_message,  line_no);
243
256
}
244
257
 
245
258
//------------------------------------------------
252
265
#define ONE_SECOND (1000)
253
266
#define MAX_SLEEP (60 * ONE_SECOND)
254
267
//------------------------------------------------
255
 
static void *bslib_cleanup_thread_run()
 
268
static void *mslib_cleanup_thread_run()
256
269
{
257
270
        unsigned long sleep_time =  MAX_SLEEP; 
258
271
        
259
272
        enter_();
260
273
        try_(a) {
261
 
                mybsi_thread_list = new CSThreadList();
262
 
                bslib_cleanup_thread_running = true;
 
274
                pbms_thread_list = new CSThreadList();
 
275
                mslib_cleanup_thread_running = true;
263
276
                
264
 
                while (bslib_cleanup_thread_running) {
 
277
                while (mslib_cleanup_thread_running) {
265
278
                        if (init_count)
266
279
                                CSThread::sleep(sleep_time);
267
280
                        
268
281
                        // Scan the thread lists for connections that should be closed.
269
282
                        sleep_time = MAX_SLEEP;
270
283
                        
271
 
                        lock_(mybsi_thread_list);
 
284
                        lock_(pbms_thread_list);
272
285
                        time_t now = time(NULL);
273
286
                        
274
287
                        if (!init_count)
275
 
                                bslib_cleanup_thread_running = false;
 
288
                                mslib_cleanup_thread_running = false;
276
289
                                
277
 
                        MYBS_Ref *ptr = (MYBS_Ref *) mybsi_thread_list->getBack();
 
290
                        PBMS_Ref *ptr = (PBMS_Ref *) pbms_thread_list->getBack();
278
291
                        while (ptr) {
279
 
                                unsigned long timeout = ptr->be_timeout * ONE_SECOND;
 
292
                                unsigned long timeout = ptr->ms_timeout * ONE_SECOND;
280
293
                                lock_(ptr);
281
 
                                if (ptr->be_connected) {
282
 
                                        if (init_count && ptr->be_connect_timeout > now) {
283
 
                                                timeout = (ptr->be_connect_timeout - now) * ONE_SECOND;
 
294
                                if (ptr->ms_connected) {
 
295
                                        if (init_count && ptr->ms_connect_timeout > now) {
 
296
                                                timeout = (ptr->ms_connect_timeout - now) * ONE_SECOND;
284
297
                                                if (timeout < sleep_time)
285
298
                                                        sleep_time = timeout;
286
299
                                        } else 
287
 
                                                ptr->be_disconnect();
 
300
                                                ptr->ms_disconnect();
288
301
                                        
289
302
                                }
290
303
                                unlock_(ptr);
291
304
                                
292
305
                                if (timeout < sleep_time)
293
306
                                        sleep_time = timeout;
294
 
                                ptr = (MYBS_Ref *) ptr->getNextLink();
 
307
                                ptr = (PBMS_Ref *) ptr->getNextLink();
295
308
                        }
296
309
                        
297
 
                        unlock_(mybsi_thread_list);
 
310
                        unlock_(pbms_thread_list);
298
311
                }               
299
312
        }
300
313
        
301
314
        catch_(a) {
302
 
                report_global_bse_error(self);
 
315
                report_global_mse_error(self);
303
316
        }
304
317
        
305
318
        cont_(a);
306
319
        
307
 
        if (mybsi_thread_list)
308
 
                mybsi_thread_list->release();
 
320
        if (pbms_thread_list)
 
321
                pbms_thread_list->release();
309
322
                
310
 
        bslib_cleanup_thread_running = false;
 
323
        mslib_cleanup_thread_running = false;
311
324
        
312
325
        return_(NULL);
313
326
}
314
327
 
315
328
 
316
329
//------------------------------------------------
317
 
mybs_bool mybs_library_init(mybs_bool thread_safe)
 
330
pbms_bool pbms_library_init(pbms_bool thread_safe)
318
331
{
319
332
        clear_global_error();
320
333
        init_count++;
330
343
                
331
344
                cs_init_memory();
332
345
                
333
 
                bslib_global_thread = new CSThread( NULL);
334
 
                CSThread::setSelf(bslib_global_thread);
 
346
                mslib_global_thread = new CSThread( NULL);
 
347
                CSThread::setSelf(mslib_global_thread);
335
348
                
336
 
                CSThread *self = bslib_global_thread;
337
 
                if (!bslib_global_thread) {
 
349
                CSThread *self = mslib_global_thread;
 
350
                if (!mslib_global_thread) {
338
351
                        report_global_error(ENOMEM, "new CSThread( NULL) failed.", __LINE__);
339
352
                        init_count  = 0;
340
353
                         CSThread::shutDown();
342
355
                }
343
356
        
344
357
                try_(a) {
345
 
                        bslib_cleanup_thread = CSThread::newThread(CSString::newString("bslib_cleanup_thread"), bslib_cleanup_thread_run, NULL);
346
 
                        bslib_cleanup_thread->ignoreSignals = true;
347
 
                        bslib_cleanup_thread->start();
 
358
                        mslib_cleanup_thread = CSThread::newThread(CSString::newString("mslib_cleanup_thread"), mslib_cleanup_thread_run, NULL);
 
359
                        mslib_cleanup_thread->ignoreSignals = true;
 
360
                        mslib_cleanup_thread->start();
348
361
                        
349
362
                        time_t timeout = time(NULL) + 5;
350
 
                        while (!bslib_cleanup_thread_running) {
 
363
                        while (!mslib_cleanup_thread_running) {
351
364
                                usleep(100);
352
365
                                if (time(NULL) > timeout)
353
366
                                        break;
354
367
                        }
355
 
                        if (!bslib_cleanup_thread_running) 
356
 
                                report_global_error(ETIMEDOUT, "Timeout while starting bslib_cleanup_thread.", __LINE__);
 
368
                        if (!mslib_cleanup_thread_running) 
 
369
                                report_global_error(ETIMEDOUT, "Timeout while starting mslib_cleanup_thread.", __LINE__);
357
370
                }
358
371
                catch_(a);
359
 
                        report_global_bse_error(bslib_cleanup_thread);
 
372
                        report_global_mse_error(mslib_cleanup_thread);
360
373
                        
361
374
                cont_(a);
362
 
                if (!bslib_cleanup_thread_running) {
 
375
                if (!mslib_cleanup_thread_running) {
363
376
                        init_count  = 0;
364
 
                        if ( bslib_cleanup_thread) {
365
 
                                bslib_cleanup_thread->signal(0); // wakeup the thread.
366
 
                                bslib_cleanup_thread->join();
367
 
                                bslib_cleanup_thread->release();
368
 
                                bslib_cleanup_thread = NULL;
 
377
                        if ( mslib_cleanup_thread) {
 
378
                                mslib_cleanup_thread->signal(0); // wakeup the thread.
 
379
                                mslib_cleanup_thread->join();
 
380
                                mslib_cleanup_thread->release();
 
381
                                mslib_cleanup_thread = NULL;
369
382
                        }
370
383
                        
371
 
                        bslib_global_thread->release();
372
 
                        bslib_global_thread = NULL;
 
384
                        mslib_global_thread->release();
 
385
                        mslib_global_thread = NULL;
373
386
                        cs_exit_memory();
374
387
                        CSThread::shutDown();
375
388
                }
376
389
        }
377
390
        
378
 
        return(bslib_cleanup_thread_running);
 
391
        return(mslib_cleanup_thread_running);
379
392
}
380
393
 
381
394
//------------------------------------------------
382
 
void mybs_library_end()
 
395
void pbms_library_end()
383
396
{
384
397
         
385
398
        if (init_count == 1 ) {
386
399
                init_count  = 0; // This will cause the main thread to terminate.
387
400
                        
388
 
                CSThread::setSelf(bslib_global_thread);
389
 
                
390
 
                bslib_cleanup_thread->signal(0); // wakeup the thread.
391
 
                bslib_cleanup_thread->join();
392
 
                bslib_cleanup_thread->release();
393
 
                bslib_cleanup_thread = NULL;
394
 
                
395
 
                bslib_global_thread->release();
396
 
                bslib_global_thread = NULL;
 
401
                CSThread::setSelf(mslib_global_thread);
 
402
                
 
403
                mslib_cleanup_thread->signal(0); // wakeup the thread.
 
404
                mslib_cleanup_thread->join();
 
405
                mslib_cleanup_thread->release();
 
406
                mslib_cleanup_thread = NULL;
 
407
                
 
408
                mslib_global_thread->release();
 
409
                mslib_global_thread = NULL;
397
410
                cs_exit_memory();
398
411
                CSThread::shutDown();
399
412
        }
403
416
}
404
417
 
405
418
//------------------------------------------------
406
 
int mybs_errno(MYBS mybs)
 
419
int pbms_errno(PBMS mybs)
407
420
{
408
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
409
 
        if (bse)
410
 
                return bse->myException.getErrorCode();
 
421
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
422
        if (mse)
 
423
                return mse->myException.getErrorCode();
411
424
                
412
425
        return global_errno;
413
426
}
414
427
 
415
428
//------------------------------------------------
416
 
const char *mybs_error(MYBS mybs)
 
429
const char *pbms_error(PBMS mybs)
417
430
{
418
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
419
 
        if (bse)
420
 
                return  bse->myException.getMessage();
 
431
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
432
        if (mse)
 
433
                return  mse->myException.getMessage();
421
434
                
422
435
        return global_err_message;
423
436
}
424
437
 
425
438
//------------------------------------------------
426
 
MYBS mybs_connect(const char* host, unsigned int port)
 
439
PBMS pbms_connect(const char* host, unsigned int port)
427
440
{
428
 
        MYBS_Ref *bse = NULL;
 
441
        PBMS_Ref *mse = NULL;
429
442
        CLEAR_SELF(); 
430
443
        
431
444
        clear_global_error();
432
445
 
433
 
        if ((!(bse = new MYBS_Ref())) || !CSThread::attach(bse)) {
434
 
                report_global_error(ENOMEM, "new MYBS_Ref() failed.", __LINE__);
435
 
                if (bse) {
436
 
                        bse->release();
437
 
                        bse = NULL;
 
446
        if ((!(mse = new PBMS_Ref())) || !CSThread::attach(mse)) {
 
447
                report_global_error(ENOMEM, "new PBMS_Ref() failed.", __LINE__);
 
448
                if (mse) {
 
449
                        mse->release();
 
450
                        mse = NULL;
438
451
                }
439
452
        } else {
440
 
                CSThread *self = bse;
 
453
                CSThread *self = mse;
441
454
                
442
455
                try_(a) {
443
 
                                bse->be_init_Streams(host, port);
 
456
                                mse->ms_init_Streams(host, port);
444
457
                        }
445
458
                catch_(a);
446
 
                        report_global_bse_error(bse);
447
 
                        bse->release();
448
 
                        bse = NULL;
 
459
                        report_global_mse_error(mse);
 
460
                        mse->release();
 
461
                        mse = NULL;
449
462
                        
450
463
                cont_(a);
451
464
        }
452
465
                
453
 
        return(bse);
 
466
        return(mse);
454
467
}
455
468
 
456
469
//------------------------------------------------
457
 
MYBS mybs_mysql_connect(MYSQL *mysql)
 
470
PBMS pbms_mysql_connect(MYSQL *mysql)
458
471
{
459
472
        MYSQL_RES *results = NULL;            
460
473
        MYSQL_ROW record;
461
474
        int port;
462
475
        char *host = mysql->host;
463
 
        MYBS_Ref *bse = NULL;
 
476
        PBMS_Ref *mse = NULL;
464
477
        CLEAR_SELF(); 
465
478
        
466
479
        clear_global_error();
470
483
                goto done;
471
484
        }
472
485
        
473
 
        if (mysql_query(mysql, "show variables like  \"mybs_port\";")){
 
486
        if (mysql_query(mysql, "show variables like  \"pbms_port\";")){
474
487
                report_global_error((int) mysql_errno(mysql), (char *) mysql_error(mysql), __LINE__);
475
488
                goto done;
476
489
        }
482
495
        }
483
496
 
484
497
        if (mysql_num_rows(results) != 1) {
485
 
                report_global_error(CS_ERR_GENERIC_ERROR, "No MyBS blob streaming server associated with the MYSQL connection handle." , __LINE__);
 
498
                report_global_error(CS_ERR_GENERIC_ERROR, "No PBMS blob streaming server associated with the MYSQL connection handle." , __LINE__);
486
499
                goto done;
487
500
        }
488
501
        
489
502
        record = mysql_fetch_row(results);
490
503
        port = atol(record[1]);
491
504
        
492
 
        if ((!(bse = new MYBS_Ref())) || !CSThread::attach(bse)) {
493
 
                report_global_error(ENOMEM, "new MYBS_Ref() failed.", __LINE__);
494
 
                if (bse) {
495
 
                        bse->release();
496
 
                        bse = NULL;
 
505
        if ((!(mse = new PBMS_Ref())) || !CSThread::attach(mse)) {
 
506
                report_global_error(ENOMEM, "new PBMS_Ref() failed.", __LINE__);
 
507
                if (mse) {
 
508
                        mse->release();
 
509
                        mse = NULL;
497
510
                }
498
511
        }
499
512
        
500
 
        CSThread *self = bse;
 
513
        CSThread *self = mse;
501
514
        try_(a) {
502
 
                bse->be_init_Streams(host, port);
 
515
                mse->ms_init_Streams(host, port);
503
516
        }
504
517
        catch_(a);
505
 
        report_global_bse_error(bse);
506
 
        bse->release();
507
 
        bse = NULL;
 
518
        report_global_mse_error(mse);
 
519
        mse->release();
 
520
        mse = NULL;
508
521
        goto done;
509
522
                
510
523
        cont_(a);
513
526
        if (results)
514
527
                mysql_free_result(results);
515
528
                
516
 
        return bse;
517
 
}
518
 
 
519
 
//------------------------------------------------
520
 
void mybs_close(MYBS mybs)
521
 
{
522
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
523
 
        bse->be_setSelf();
524
 
 
525
 
        CSThread::detach(bse);
526
 
}
527
 
 
528
 
//------------------------------------------------
529
 
mybs_bool mybs_is_blob_reference(MYBS mybs, MYBS_BLOB_REF *ref)
530
 
{
531
 
        mybs_bool ok = false;
532
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
533
 
        bse->be_setSelf();      
 
529
        return mse;
 
530
}
 
531
 
 
532
//------------------------------------------------
 
533
void pbms_close(PBMS mybs)
 
534
{
 
535
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
536
        mse->ms_setSelf();
 
537
 
 
538
        CSThread::detach(mse);
 
539
}
 
540
 
 
541
//------------------------------------------------
 
542
pbms_bool pbms_is_blob_reference(PBMS mybs, PBMS_BLOB_REF *ref)
 
543
{
 
544
        pbms_bool ok = false;
 
545
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
546
        mse->ms_setSelf();      
534
547
        enter_();
535
548
        
536
549
        try_(a) {
537
 
                BSBlobURLRec blob;
538
 
                if (bs_parse_blob_url(&blob, (char *)ref, true)) 
 
550
                MSBlobURLRec blob;
 
551
                if (ms_parse_blob_url(&blob, (char *)ref, true)) 
539
552
                        ok = true;
540
553
        }
541
554
        
545
558
}
546
559
 
547
560
//------------------------------------------------
548
 
size_t mybs_get_blob_size(MYBS mybs, MYBS_BLOB_REF *ref)
 
561
size_t pbms_get_blob_size(PBMS mybs, PBMS_BLOB_REF *ref)
549
562
{
550
563
        size_t size = 0;
551
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
552
 
        bse->be_setSelf();      
 
564
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
565
        mse->ms_setSelf();      
553
566
        enter_();
554
567
        
555
568
        try_(a) {
556
 
                BSBlobURLRec blob;
557
 
                if (bs_parse_blob_url(&blob, (char *)ref, true)) 
 
569
                MSBlobURLRec blob;
 
570
                if (ms_parse_blob_url(&blob, (char *)ref, true)) 
558
571
                        size = blob.bu_blob_size;
559
572
        }
560
573
        
564
577
}
565
578
 
566
579
//------------------------------------------------
567
 
mybs_bool mybs_put_data(MYBS mybs, size_t size, const char *database, const char *table, MYBS_BLOB_REF *ref, char *data, size_t data_size, MYBS_CALLBACK_FUNC cb, void *caller_data)
 
580
pbms_bool pbms_put_data(PBMS mybs, size_t size, const char *database, const char *table, PBMS_BLOB_REF *ref, char *data, size_t data_size, PBMS_CALLBACK_FUNC cb, void *caller_data)
568
581
{
569
 
        mybs_bool ok = true;
570
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
571
 
        bse->be_setSelf();      
 
582
        pbms_bool ok = true;
 
583
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
584
        mse->ms_setSelf();      
572
585
        enter_();
573
586
        
574
587
        try_(a) {
575
588
        
576
 
                lock_(bse);
 
589
                lock_(mse);
577
590
                
578
 
                bse->be_connect();
 
591
                mse->ms_connect();
579
592
                        
580
 
                bse->be_init_put_blob(size, database, table);
 
593
                mse->ms_init_put_blob(size, database, table);
581
594
                
582
595
                if (data && data_size) {
583
 
                        bse->be_output->write(data, data_size);
 
596
                        mse->ms_output->write(data, data_size);
584
597
                        size -= data_size;
585
598
                        }
586
599
                        
593
606
                        }
594
607
                        
595
608
                                if (data_size)
596
 
                                        bse->be_output->write(data, data_size);
 
609
                                        mse->ms_output->write(data, data_size);
597
610
                                
598
611
                                size -= data_size;
599
612
                        }
602
615
                if (size) 
603
616
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unexpected end of data.");
604
617
 
605
 
                bse->be_output->flush();
 
618
                mse->ms_output->flush();
606
619
                
607
 
                bse->be_input->readHead();
608
 
                if ( bse->be_input->getStatus() != 200) {
609
 
                        throw_http_reply_exception(bse);
 
620
                mse->ms_input->readHead();
 
621
                if ( mse->ms_input->getStatus() != 200) {
 
622
                        throw_http_reply_exception(mse);
610
623
                }
611
624
                
612
 
                size_t len = bse->be_input->getContentLength();
613
 
                size = sizeof(MYBS_BLOB_REF) -1;
 
625
                size_t len = mse->ms_input->getContentLength();
 
626
                size = sizeof(PBMS_BLOB_REF) -1;
614
627
                if ( len > size) 
615
628
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unexpected Blob refernce returned, blob reference was too large.");
616
629
                else
618
631
                
619
632
                char *ptr = (char*) ref;
620
633
                
621
 
                if (bse->be_read(ptr, size)) 
 
634
                if (mse->ms_read(ptr, size)) 
622
635
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Incomplete Blob refernce returned.");
623
636
 
624
637
                ptr[size] = 0;
625
 
                unlock_(bse);
 
638
                unlock_(mse);
626
639
        }
627
640
        
628
641
        catch_(a);
629
 
        bse->be_disconnect();
 
642
        mse->ms_disconnect();
630
643
        ok = false;
631
644
        cont_(a);
632
645
        return_(ok);
633
646
}
634
647
 
635
648
//------------------------------------------------
636
 
mybs_bool mybs_get_data(MYBS mybs, MYBS_BLOB_REF *ref, char *buffer, size_t buffer_size, MYBS_CALLBACK_FUNC cb, void *caller_data)
 
649
pbms_bool pbms_get_data(PBMS mybs, PBMS_BLOB_REF *ref, char *buffer, size_t buffer_size, PBMS_CALLBACK_FUNC cb, void *caller_data)
637
650
{
638
 
        mybs_bool ok = true;
639
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
640
 
        bse->be_setSelf();      
 
651
        pbms_bool ok = true;
 
652
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
653
        mse->ms_setSelf();      
641
654
        enter_();
642
655
        
643
656
        try_(a) {
644
657
        
645
 
                lock_(bse);
 
658
                lock_(mse);
646
659
                
647
 
                bse->be_connect();
 
660
                mse->ms_connect();
648
661
                        
649
 
                bse->be_init_get_blob(ref);
650
 
                bse->be_output->flush();
 
662
                mse->ms_init_get_blob(ref);
 
663
                mse->ms_output->flush();
651
664
 
652
 
                bse->be_input->readHead();
653
 
                if ( bse->be_input->getStatus() != 200) {
654
 
                        throw_http_reply_exception(bse);
 
665
                mse->ms_input->readHead();
 
666
                if ( mse->ms_input->getStatus() != 200) {
 
667
                        throw_http_reply_exception(mse);
655
668
                }
656
669
                
657
 
                size_t data_len = bse->be_input->getContentLength();
 
670
                size_t data_len = mse->ms_input->getContentLength();
658
671
                
659
672
                if (buffer_size && buffer) {
660
673
                        if (buffer_size > data_len)
661
674
                                buffer_size = data_len;
662
675
                                
663
 
                        if (bse->be_read(buffer, buffer_size)) 
 
676
                        if (mse->ms_read(buffer, buffer_size)) 
664
677
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Incomplete Blob data returned.");
665
678
                                
666
679
                        data_len -= buffer_size;
671
684
                                if (buffer_size > data_len)
672
685
                                        buffer_size = data_len;
673
686
                                        
674
 
                                if (buffer_size && bse->be_read(buffer, buffer_size)) 
 
687
                                if (buffer_size && mse->ms_read(buffer, buffer_size)) 
675
688
                                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Incomplete Blob data returned.");
676
689
 
677
690
 
682
695
                if (data_len)
683
696
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Incomplete Blob data returned.");
684
697
                
685
 
                unlock_(bse);
 
698
                unlock_(mse);
686
699
        }
687
700
        
688
701
        catch_(a);
689
 
        bse->be_disconnect();
 
702
        mse->ms_disconnect();
690
703
        ok = false;
691
704
        cont_(a);
692
705
        return_(ok);
694
707
 
695
708
 
696
709
//------------------------------------------------
697
 
mybs_bool mybs_set_option(MYBS mybs, enum enum_mybs_option option, const void *in_value)
 
710
pbms_bool pbms_set_option(PBMS mybs, enum pbms_option option, const void *in_value)
698
711
{
699
 
        mybs_bool ok = true;
700
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
701
 
        bse->be_setSelf();      
 
712
        pbms_bool ok = true;
 
713
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
714
        mse->ms_setSelf();      
702
715
        enter_();
703
716
        
704
717
        try_(a) {
705
718
 
706
719
                switch (option) {
707
 
                        case MYBS_OPTION_CONNECTION_TIMEOUT:
708
 
                                bse->be_timeout = *((unsigned int*)in_value);
 
720
                        case PBMS_OPTION_CONNECTION_TIMEOUT:
 
721
                                mse->ms_timeout = *((unsigned int*)in_value);
709
722
                                break;
710
723
                                
711
 
                        case MYBS_OPTION_TRANSMITION_TIMEOUT:
712
 
                                bse->be_transmition_timeout = *((unsigned int*)in_value);
 
724
                        case PBMS_OPTION_TRANSMITION_TIMEOUT:
 
725
                                mse->ms_transmition_timeout = *((unsigned int*)in_value);
713
726
                                break; 
714
727
                        
715
 
                        case MYBS_OPTION_HOST:
716
 
                        case MYBS_OPTION_PORT:
 
728
                        case PBMS_OPTION_HOST:
 
729
                        case PBMS_OPTION_PORT:
717
730
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Option is ReadOnly.");
718
731
                                break;
719
732
 
730
743
}
731
744
 
732
745
//------------------------------------------------
733
 
mybs_bool mybs_get_option(MYBS mybs, enum enum_mybs_option option, void *out_value)
 
746
pbms_bool pbms_get_option(PBMS mybs, enum pbms_option option, void *out_value)
734
747
{
735
 
        mybs_bool ok = true;
736
 
        MYBS_Ref *bse = (MYBS_Ref*) mybs;
737
 
        bse->be_setSelf();      
 
748
        pbms_bool ok = true;
 
749
        PBMS_Ref *mse = (PBMS_Ref*) mybs;
 
750
        mse->ms_setSelf();      
738
751
        enter_();
739
752
        
740
753
        try_(a) {
741
754
 
742
755
                switch (option) {
743
 
                        case MYBS_OPTION_CONNECTION_TIMEOUT:
744
 
                                *((unsigned int*)out_value) = bse->be_timeout;
 
756
                        case PBMS_OPTION_CONNECTION_TIMEOUT:
 
757
                                *((unsigned int*)out_value) = mse->ms_timeout;
745
758
                                break;
746
759
                        
747
 
                        case MYBS_OPTION_TRANSMITION_TIMEOUT:
748
 
                                *((unsigned int*)out_value) = bse->be_transmition_timeout;
 
760
                        case PBMS_OPTION_TRANSMITION_TIMEOUT:
 
761
                                *((unsigned int*)out_value) = mse->ms_transmition_timeout;
749
762
                                break; 
750
763
                        
751
 
                        case MYBS_OPTION_HOST:
752
 
                                *((const char**)out_value) = bse->be_host->getCString();
 
764
                        case PBMS_OPTION_HOST:
 
765
                                *((const char**)out_value) = mse->ms_host->getCString();
753
766
                                break;
754
767
                                
755
 
                        case MYBS_OPTION_PORT:
756
 
                                *((unsigned int*)out_value) = bse->be_port;
 
768
                        case PBMS_OPTION_PORT:
 
769
                                *((unsigned int*)out_value) = mse->ms_port;
757
770
                                break;
758
771
 
759
772
                        default: