68
74
#define MS_ERR_UNKNOWN_DB 8
69
75
#define MS_ERR_REMOVING_REPO 9
70
76
#define MS_ERR_DATABASE_DELETED 10
77
#define MS_ERR_DUPLICATE 11 /* Attempt to insert a duplicate key into a system table. */
78
#define MS_ERR_INVALID_RECORD 12
79
#define MS_ERR_RECOVERY_IN_PROGRESS 13
80
#define MS_ERR_DUPLICATE_DB 14
81
#define MS_ERR_DUPLICATE_DB_ID 15
82
#define MS_ERR_INVALID_OPERATION 16
72
84
#define MS_LOCK_NONE 0
73
85
#define MS_LOCK_READONLY 1
74
86
#define MS_LOCK_READ_WRITE 2
76
#define MS_XACT_NONE 0
77
#define MS_XACT_BEGIN 1
78
#define MS_XACT_COMMIT 2
79
#define MS_XACT_ROLLBACK 3
81
#define PBMS_ENGINE_REF_LEN 8
82
#define PBMS_BLOB_URL_SIZE 200
88
#define PBMS_BLOB_URL_SIZE 120
84
90
#define PBMS_FIELD_COL_SIZE 128
85
91
#define PBMS_FIELD_COND_SIZE 300
93
#define MS_RESULT_MESSAGE_SIZE 300
94
#define MS_RESULT_STACK_SIZE 200
96
typedef struct PBMSResultRec {
97
int mr_code; /* Engine specific error code. */
98
char mr_message[MS_RESULT_MESSAGE_SIZE]; /* Error message, required if non-zero return code. */
99
char mr_stack[MS_RESULT_STACK_SIZE]; /* Trace information about where the error occurred. */
100
} PBMSResultRec, *PBMSResultPtr;
88
104
typedef struct PBMSBlobID {
89
106
u_int64_t bi_blob_size;
90
107
u_int64_t bi_blob_id; // or repo file offset if type = REPO
108
u_int64_t bi_blob_ref_id;
91
109
u_int32_t bi_tab_id; // or repo ID if type = REPO
92
110
u_int32_t bi_auth_code;
93
111
u_int32_t bi_blob_type;
94
112
} PBMSBlobIDRec, *PBMSBlobIDPtr;
96
typedef struct PBMSResultRec {
97
int mr_code; /* Engine specific error code. */
98
char mr_message[MS_RESULT_MESSAGE_SIZE]; /* Error message, required if non-zero return code. */
99
char mr_stack[MS_RESULT_STACK_SIZE]; /* Trace information about where the error occurred. */
100
} PBMSResultRec, *PBMSResultPtr;
102
typedef struct PBMSEngineRefRec {
103
unsigned char er_data[PBMS_ENGINE_REF_LEN];
104
} PBMSEngineRefRec, *PBMSEngineRefPtr;
106
114
typedef struct PBMSBlobURL {
107
115
char bu_data[PBMS_BLOB_URL_SIZE];
108
116
} PBMSBlobURLRec, *PBMSBlobURLPtr;
110
typedef struct PBMSFieldRef {
111
char fr_column[PBMS_FIELD_COL_SIZE];
112
char fr_cond[PBMS_FIELD_COND_SIZE];
113
} PBMSFieldRefRec, *PBMSFieldRefPtr;
115
* The engine must free its resources for the given thread.
117
typedef void (*MSCloseConnFunc)(void *thd);
119
/* Before access BLOBs of a table, the streaming engine will open the table.
120
* Open tables are managed as a pool by the streaming engine.
121
* When a request is received, the streaming engine will ask all
122
* registered engine to open the table. The engine must return a NULL
123
* open_table pointer if it does not handle the table.
124
* A callback allows an engine to request all open tables to be
125
* closed by the streaming engine.
127
typedef int (*MSOpenTableFunc)(void *thd, const char *table_url, void **open_table, PBMSResultPtr result);
128
typedef void (*MSCloseTableFunc)(void *thd, void *open_table);
131
* When the streaming engine wants to use an open table handle from the
132
* pool, it calls the lock table function.
134
typedef int (*MSLockTableFunc)(void *thd, int *xact, void *open_table, int lock_type, PBMSResultPtr result);
135
typedef int (*MSUnlockTableFunc)(void *thd, int xact, void *open_table, PBMSResultPtr result);
137
/* This function is used to locate and send a BLOB on the given stream.
139
typedef int (*MSSendBLOBFunc)(void *thd, void *open_table, const char *blob_column, const char *blob_url, void *stream, PBMSResultPtr result);
142
* Lookup and engine reference, and return readable text.
144
typedef int (*MSLookupRefFunc)(void *thd, void *open_table, unsigned short col_index, PBMSEngineRefPtr eng_ref, PBMSFieldRefPtr feild_ref, PBMSResultPtr result);
146
118
typedef struct PBMSEngineRec {
147
119
int ms_version; /* MS_ENGINE_VERSION */
148
120
int ms_index; /* The index into the engine list. */
149
121
int ms_removing; /* TRUE (1) if the engine is being removed. */
150
const char *ms_engine_name;
151
void *ms_engine_info;
152
MSCloseConnFunc ms_close_conn;
153
MSOpenTableFunc ms_open_table;
154
MSCloseTableFunc ms_close_table;
155
MSLockTableFunc ms_lock_table;
156
MSUnlockTableFunc ms_unlock_table;
157
MSSendBLOBFunc ms_send_blob;
158
MSLookupRefFunc ms_lookup_ref;
122
int ms_internal; /* TRUE (1) if the engine is supported directly in the mysq/drizzle handler code . */
123
char ms_engine_name[32];
159
124
} PBMSEngineRec, *PBMSEnginePtr;
162
127
* This function should never be called directly, it is called
163
128
* by deregisterEngine() below.
130
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
165
132
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
167
typedef void (*ECTableCloseAllFunc)(const char *table_url);
169
typedef int (*ECSetContentLenFunc)(void *stream, off_t len, PBMSResultPtr result);
171
typedef int (*ECWriteHeadFunc)(void *stream, PBMSResultPtr result);
173
typedef int (*ECWriteStreamFunc)(void *stream, void *buffer, size_t len, PBMSResultPtr result);
176
* The engine should call this function from
177
* its own close connection function!
179
typedef int (*ECCloseConnFunc)(void *thd, PBMSResultPtr result);
182
* Call this function before retaining or releasing BLOBs in a row.
184
typedef int (*ECOpenTableFunc)(void **open_table, char *table_path, PBMSResultPtr result);
187
* Call this function when the operation is complete.
189
typedef void (*ECCloseTableFunc)(void *open_table);
135
* Call this function to store a BLOB in the repository the BLOB's
136
* URL will be returned. The returned URL buffer is expected to be atleast
137
* PBMS_BLOB_URL_SIZE long.
139
* The BLOB URL must still be retained or it will automaticly be deleted after a timeout expires.
141
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, char *blob_url, unsigned short col_index, PBMSResultPtr result);
192
144
* Call this function for each BLOB to be retained. When a BLOB is used, the
193
* URL may be changed. The returned URL is valid as long as the the
145
* URL may be changed. The returned URL buffer is expected to be atleast
146
* PBMS_BLOB_URL_SIZE long.
196
148
* The returned URL must be inserted into the row in place of the given
199
typedef int (*ECUseBlobFunc)(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
202
* Reference Blobs that has been uploaded to the streaming engine.
204
* All BLOBs specified by the use blob function are retained by
207
* The engine reference is a (unaligned) 8 byte value which
208
* identifies the row that the BLOBs are in.
210
typedef int (*ECRetainBlobsFunc)(void *open_table, PBMSEngineRefPtr eng_ref, PBMSResultPtr result);
151
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
213
154
* If a row containing a BLOB is deleted, then the BLOBs in the
216
157
* Note: if a table is dropped, all the BLOBs referenced by the
217
158
* table are automatically released.
219
typedef int (*ECReleaseBlobFunc)(void *open_table, char *blob_url, unsigned short col_index, PBMSEngineRefPtr eng_ref, PBMSResultPtr result);
221
typedef int (*ECDropTable)(const char *table_path, PBMSResultPtr result);
223
typedef int (*ECRenameTable)(const char *from_table, const char *to_table, PBMSResultPtr result);
160
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
162
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
164
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result);
166
typedef void (*ECCallCompleted)(bool built_in, bool ok);
225
168
typedef struct PBMSCallbacksRec {
226
169
int cb_version; /* MS_CALLBACK_VERSION */
170
ECRegisterdFunc cb_register;
227
171
ECDeregisterdFunc cb_deregister;
228
ECTableCloseAllFunc cb_table_close_all;
229
ECSetContentLenFunc cb_set_cont_len;
230
ECWriteHeadFunc cb_write_head;
231
ECWriteStreamFunc cb_write_stream;
232
ECCloseConnFunc cb_close_conn;
233
ECOpenTableFunc cb_open_table;
234
ECCloseTableFunc cb_close_table;
235
ECUseBlobFunc cb_use_blob;
236
ECRetainBlobsFunc cb_retain_blobs;
172
ECCreateBlobsFunc cb_create_blob;
173
ECRetainBlobsFunc cb_retain_blob;
237
174
ECReleaseBlobFunc cb_release_blob;
238
175
ECDropTable cb_drop_table;
239
176
ECRenameTable cb_rename_table;
177
ECCallCompleted cb_completed;
240
178
} PBMSCallbacksRec, *PBMSCallbacksPtr;
242
180
typedef struct PBMSSharedMemoryRec {
348
sharedMemory->sm_magic = 0;
351
const char **prefix = temp_prefix;
353
getTempFileName(temp_file, *prefix, getpid());
320
removeSharedMemory();
360
void closeAllTables(const char *table_url)
323
void removeSharedMemory()
362
PBMSResultRec result;
365
if ((err = getSharedMemory(true, &result)))
325
const char **prefix = temp_prefix;
328
// Do not remove the sharfed memory until after
329
// the PBMS engine has shutdown.
368
330
if (sharedMemory->sm_callbacks)
369
sharedMemory->sm_callbacks->cb_table_close_all(table_url);
372
int setContentLength(void *stream, off_t len, PBMSResultPtr result)
376
if ((err = getSharedMemory(true, result)))
379
return sharedMemory->sm_callbacks->cb_set_cont_len(stream, len, result);
382
int writeHead(void *stream, PBMSResultPtr result)
386
if ((err = getSharedMemory(true, result)))
389
return sharedMemory->sm_callbacks->cb_write_head(stream, result);
392
int writeStream(void *stream, void *buffer, size_t len, PBMSResultPtr result)
396
if ((err = getSharedMemory(true, result)))
399
return sharedMemory->sm_callbacks->cb_write_stream(stream, buffer, len, result);
402
int closeConn(void *thd, PBMSResultPtr result)
406
if ((err = getSharedMemory(true, result)))
409
if (!sharedMemory->sm_callbacks)
412
return sharedMemory->sm_callbacks->cb_close_conn(thd, result);
415
int openTable(void **open_table, char *table_path, PBMSResultPtr result)
419
if ((err = getSharedMemory(true, result)))
422
if (!sharedMemory->sm_callbacks) {
333
sharedMemory->sm_magic = 0;
338
getTempFileName(temp_file, *prefix, getpid());
427
return sharedMemory->sm_callbacks->cb_open_table(open_table, table_path, result);
430
int closeTable(void *open_table, PBMSResultPtr result)
434
if ((err = getSharedMemory(true, result)))
437
if (sharedMemory->sm_callbacks && open_table)
438
sharedMemory->sm_callbacks->cb_close_table(open_table);
442
344
int couldBeURL(char *blob_url)
443
/* ~*test/~1-150-2b5e0a7-0[*<blob size>][.ext] */
444
/* ~*test/_1-150-2b5e0a7-0[*<blob size>][.ext] */
448
bool have_blob_size = false;
451
if ((len = strlen(blob_url))) {
456
/* Required prefix: */
457
/* NOTE: ~> is deprecated v0.5.4+, now use ~* */
458
if (*blob_url != '~' || (*(blob_url + 1) != '>' && *(blob_url + 1) != '*'))
461
ptr = blob_url + len - 1;
463
/* Allow for an optional extension: */
464
if (!isdigit(*ptr)) {
465
while (ptr > blob_url && *ptr != '/' && *ptr != '.')
467
if (ptr == blob_url || *ptr != '.')
469
if (ptr == blob_url || !isdigit(*ptr))
473
// field 1: server id OR blob size
475
while (ptr > blob_url && isdigit(*ptr))
478
if (ptr != blob_url && *ptr == '*' && !have_blob_size) {
480
have_blob_size = true;
484
if (ptr == blob_url || *ptr != '-')
488
// field 2: Authoration code
493
while (ptr > blob_url && isxdigit(*ptr))
496
if (ptr == blob_url || *ptr != '-')
504
while (ptr > blob_url && isdigit(*ptr))
507
if (ptr == blob_url || *ptr != '-')
516
while (ptr > blob_url && isdigit(*ptr))
519
/* NOTE: ^ and : are deprecated v0.5.4+, now use ! and ~ */
520
if (ptr == blob_url || (*ptr != '^' && *ptr != ':' && *ptr != '_' && *ptr != '~'))
524
if (ptr == blob_url || *ptr != '/')
348
u_int32_t tab_id = 0;
349
u_int64_t blob_id = 0;
350
u_int64_t blob_ref_id = 0;
351
u_int64_t blob_size = 0;
352
u_int32_t auth_code = 0;
353
u_int32_t server_id = 0;
358
scanned = sscanf(blob_url, URL_FMT"%4s", &db_id, &type, &tab_id, &blob_id, &auth_code, &server_id, &blob_ref_id, &blob_size, junk);
359
if (scanned != 8) // If junk is found at the end this will also result in an invalid URL.
362
if (junk[0] || (type != '~' && type != '_'))
535
int useBlob(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result)
371
int retainBlob(const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
539
if ((err = getSharedMemory(true, result)))
375
if ((err = getSharedMemory(false, result)))
542
378
if (!couldBeURL(blob_url)) {
543
*ret_blob_url = NULL;
380
if (!sharedMemory->sm_callbacks) {
384
err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, col_index, result);
388
blob_url = ret_blob_url;
547
391
if (!sharedMemory->sm_callbacks) {
580
411
if (!couldBeURL(blob_url))
583
return sharedMemory->sm_callbacks->cb_release_blob(open_table, blob_url, col_index, eng_ref, result);
586
int dropTable(const char *table_path, PBMSResultPtr result)
590
if ((err = getSharedMemory(true, result)))
593
if (!sharedMemory->sm_callbacks)
596
return sharedMemory->sm_callbacks->cb_drop_table(table_path, result);
599
int renameTable(const char *from_table, const char *to_table, PBMSResultPtr result)
603
if ((err = getSharedMemory(true, result)))
606
if (!sharedMemory->sm_callbacks)
609
return sharedMemory->sm_callbacks->cb_rename_table(from_table, to_table, result);
414
return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
417
int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
421
if ((err = getSharedMemory(false, result)))
424
if (!sharedMemory->sm_callbacks)
427
return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
430
int renameTable(const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result)
434
if ((err = getSharedMemory(false, result)))
437
if (!sharedMemory->sm_callbacks)
440
return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_table, result);
443
void completed(int ok)
445
PBMSResultRec result;
447
if (getSharedMemory(false, &result))
450
if (!sharedMemory->sm_callbacks)
453
sharedMemory->sm_callbacks->cb_completed(built_in, ok);
612
456
volatile PBMSSharedMemoryPtr sharedMemory;
838
680
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
841
* PBMSCreateBlob():Creates a new blob in the database of the given size. cont_type can be NULL.
683
* PBMSCreateBlob():Creates a new blob in the database of the given size.
843
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, char *cont_type, u_int64_t size);
685
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
846
688
* PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chuncks of
847
689
* data written to the blob must match the size specified when the blob was created.
849
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *database_name, char *data, size_t size, size_t offset);
691
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
852
694
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
854
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *database_name, char *buffer, size_t *size, size_t offset);
696
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
857
699
* PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be atleast PBMS_BLOB_URL_SIZE bytes in size.
859
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *database_name, char *url);
701
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
862
704
* PBMSIDToURL():Convert a blob URL to a blob ID.