~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/system_table_ms.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-07-18
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * System tables.
 
27
 *
 
28
 */
 
29
 
 
30
#ifdef DRIZZLED
 
31
#include "config.h"
 
32
#include <drizzled/common.h>
 
33
#include <drizzled/session.h>
 
34
#include <drizzled/table.h>
 
35
#include <drizzled/field.h>
 
36
#include <drizzled/field/blob.h>
 
37
 
 
38
#include <drizzled/message/table.pb.h>
 
39
#include "drizzled/charset_info.h"
 
40
#include <drizzled/table_proto.h>
 
41
#endif
 
42
 
 
43
 
 
44
#include "cslib/CSConfig.h"
 
45
#include <inttypes.h>
 
46
 
 
47
#include <sys/types.h>
 
48
#include <sys/stat.h>
 
49
#include <stdlib.h>
 
50
#include <time.h>
 
51
 
 
52
//#include "mysql_priv.h"
 
53
//#include <plugin.h>
 
54
 
 
55
#include "cslib/CSGlobal.h"
 
56
#include "cslib/CSStrUtil.h"
 
57
#include "ha_pbms.h"
 
58
 
 
59
#include "mysql_ms.h"
 
60
#include "engine_ms.h"
 
61
#include "system_table_ms.h"
 
62
#include "repository_ms.h"
 
63
#include "database_ms.h"
 
64
#include "compactor_ms.h"
 
65
#include "open_table_ms.h"
 
66
#include "metadata_ms.h"
 
67
#ifdef HAVE_ALIAS_SUPPORT
 
68
#include "alias_ms.h"
 
69
#endif
 
70
#include "cloud_ms.h"
 
71
#include "transaction_ms.h"
 
72
 
 
73
#include "systab_httpheader_ms.h"
 
74
#include "systab_dump_ms.h"
 
75
#include "systab_variable_ms.h"
 
76
#include "systab_cloud_ms.h"
 
77
#include "systab_backup_ms.h"
 
78
#ifndef DRIZZLED
 
79
#include "systab_enabled_ms.h"
 
80
#endif
 
81
#include "discover_ms.h"
 
82
#include "parameters_ms.h"
 
83
 
 
84
///* Note: mysql_priv.h messes with new, which caused a crash. */
 
85
//#ifdef new
 
86
//#undef new
 
87
//#endif
 
88
 
 
89
/* Definitions for PBMS table discovery: */
 
90
//--------------------------------
 
91
static DT_FIELD_INFO pbms_repository_info[]=
 
92
{
 
93
#ifdef HAVE_ALIAS_SUPPORT
 
94
        {"Blob_alias",                  BLOB_ALIAS_LENGTH, NULL, MYSQL_TYPE_VARCHAR,            &my_charset_utf8_bin,   0,                                                              "The BLOB alias"},
 
95
#endif
 
96
        {"Repository_id",               NULL, NULL, MYSQL_TYPE_LONG,            NULL,                                   NOT_NULL_FLAG,                                  "The repository file number"},
 
97
        {"Repo_blob_offset",    NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                                   NOT_NULL_FLAG,                                  "The offset of the BLOB in the repository file"},
 
98
        {"Blob_size",                   NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                                   NOT_NULL_FLAG,                                  "The size of the BLOB in bytes"},
 
99
        {"MD5_Checksum",                32,   NULL,     MYSQL_TYPE_VARCHAR,             system_charset_info,    0,                                                              "The MD5 Digest of the BLOB data."},
 
100
        {"Head_size",                   NULL, NULL, MYSQL_TYPE_SHORT,           NULL,                                   NOT_NULL_FLAG | UNSIGNED_FLAG,  "The size of the BLOB header - proceeds the BLOB data"},
 
101
        {"Access_code",                 NULL, NULL, MYSQL_TYPE_LONG,            NULL,                                   NOT_NULL_FLAG,                                  "The 4-byte authorisation code required to access the BLOB - part of the BLOB URL"},
 
102
        {"Creation_time",               NULL, NULL, MYSQL_TYPE_TIMESTAMP,       NULL,                                   NOT_NULL_FLAG,                                  "The time the BLOB was created"},
 
103
        {"Last_ref_time",               NULL, NULL, MYSQL_TYPE_TIMESTAMP,       NULL,                                   0,                                                              "The last time the BLOB was referenced"},
 
104
        {"Last_access_time",    NULL, NULL, MYSQL_TYPE_TIMESTAMP,       NULL,                                   0,                                                              "The last time the BLOB was accessed (read)"},
 
105
        {"Access_count",                NULL, NULL, MYSQL_TYPE_LONG,            NULL,                                   NOT_NULL_FLAG,                                  "The count of the number of times the BLOB has been read"},
 
106
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
107
};
 
108
 
 
109
#ifdef PBMS_HAS_KEYS
 
110
static DT_KEY_INFO pbms_repository_keys[]=
 
111
{
 
112
        {"pbms_repository_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
113
        {NULL, 0, {NULL}}
 
114
};
 
115
#endif
 
116
 
 
117
static DT_FIELD_INFO pbms_metadata_info[]=
 
118
{
 
119
        {"Repository_id",               NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                                   NOT_NULL_FLAG,  "The repository file number"},
 
120
        {"Repo_blob_offset",    NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                                   NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
 
121
        {"Name",                                MS_META_NAME_SIZE,      NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,  NOT_NULL_FLAG,  "Metadata name"},
 
122
        {"Value",                               MS_META_VALUE_SIZE,     NULL, MYSQL_TYPE_VARCHAR,       &UTF8_CHARSET,                  NOT_NULL_FLAG,  "Metadata value"},
 
123
        {NULL,                                  NULL,                                   NULL, MYSQL_TYPE_STRING,        NULL, 0, NULL}
 
124
};
 
125
 
 
126
#ifdef PBMS_HAS_KEYS
 
127
static DT_KEY_INFO pbms_metadata_keys[]=
 
128
{
 
129
        {"pbms_metadata_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
130
        {NULL, 0, {NULL}}
 
131
};
 
132
#endif
 
133
 
 
134
 
 
135
#ifdef HAVE_ALIAS_SUPPORT
 
136
static DT_FIELD_INFO pbms_alias_info[]=
 
137
{
 
138
        {"Repository_id",               NULL, NULL, MYSQL_TYPE_LONG,            NULL,                           NOT_NULL_FLAG,  "The repository file number"},
 
139
        {"Repo_blob_offset",    NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                           NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
 
140
        {"Blob_alias",                  BLOB_ALIAS_LENGTH, NULL, MYSQL_TYPE_VARCHAR,            &my_charset_utf8_bin,   NOT_NULL_FLAG,                  "The BLOB alias"},
 
141
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
142
};
 
143
 
 
144
static DT_KEY_INFO pbms_alias_keys[]=
 
145
{
 
146
        {"pbms_alias_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
147
        {NULL, 0, {NULL}}
 
148
};
 
149
#endif
 
150
 
 
151
static DT_FIELD_INFO pbms_blobs_info[]=
 
152
{
 
153
        {"Repository_id",               NULL, NULL, MYSQL_TYPE_LONG,            NULL,                           NOT_NULL_FLAG,  "The repository file number"},
 
154
        {"Repo_blob_offset",    NULL, NULL, MYSQL_TYPE_LONGLONG,        NULL,                           NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
 
155
        {"Blob_data",                   NULL, NULL, MYSQL_TYPE_LONG_BLOB,       &my_charset_bin,        NOT_NULL_FLAG,  "The data of this BLOB"},
 
156
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
157
};
 
158
 
 
159
#ifdef PBMS_HAS_KEYS
 
160
static DT_KEY_INFO pbms_blobs_keys[]=
 
161
{
 
162
        {"pbms_blobs_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
163
        {NULL, 0, {NULL}}
 
164
};
 
165
#endif
 
166
 
 
167
static DT_FIELD_INFO pbms_reference_info[]=
 
168
{
 
169
        {"Table_name",          MS_TABLE_NAME_SIZE,             NULL, MYSQL_TYPE_STRING,        system_charset_info,    0,      "The name of the referencing table"},
 
170
        {"Column_ordinal",      NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   0,      "The column ordinal of the referencing field"},
 
171
        {"Blob_id",                     NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   NOT_NULL_FLAG,  "The BLOB reference number - part of the BLOB URL"},
 
172
        {"Blob_url",            PBMS_BLOB_URL_SIZE,             NULL, MYSQL_TYPE_VARCHAR,       system_charset_info,    0,      "The BLOB URL for HTTP GET access"},
 
173
        {"Repository_id",       NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   NOT_NULL_FLAG,  "The repository file number of the BLOB"},
 
174
        {"Repo_blob_offset",NULL,                                       NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   NOT_NULL_FLAG,  "The offset in the repository file"},
 
175
        {"Blob_size",           NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   NOT_NULL_FLAG,  "The size of the BLOB in bytes"},
 
176
        {"Deletion_time",       NULL,                                   NULL, MYSQL_TYPE_TIMESTAMP,     NULL,                                   0,                              "The time the BLOB was deleted"},
 
177
        {"Remove_in",           NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   0,                              "The number of seconds before the reference/BLOB is removed perminently"},
 
178
        {"Temp_log_id",         NULL,                                   NULL, MYSQL_TYPE_LONG,          NULL,                                   0,                              "Temporary log number of the referencing deletion entry"},
 
179
        {"Temp_log_offset",     NULL,                                   NULL, MYSQL_TYPE_LONGLONG,      NULL,                                   0,                              "Temporary log offset of the referencing deletion entry"},
 
180
        {NULL,NULL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
 
181
};
 
182
 
 
183
#ifdef PBMS_HAS_KEYS
 
184
static DT_KEY_INFO pbms_reference_keys[]=
 
185
{
 
186
        {"pbms_reference_pk", PRI_KEY_FLAG, {"Table_name", "Blob_id", NULL}},
 
187
        {"pbms_reference_k", MULTIPLE_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
 
188
        {NULL, 0, {NULL}}
 
189
};
 
190
#endif
 
191
 
 
192
 
 
193
typedef enum {  SYS_REP = 0, 
 
194
                                SYS_REF, 
 
195
                                SYS_BLOB, 
 
196
                                SYS_DUMP, 
 
197
                                SYS_META, 
 
198
                                SYS_HTTP, 
 
199
#ifdef HAVE_ALIAS_SUPPORT
 
200
                                SYS_ALIAS, 
 
201
#endif
 
202
                                SYS_VARIABLE, 
 
203
                                SYS_CLOUD, 
 
204
                                SYS_BACKUP, 
 
205
#ifndef DRIZZLED
 
206
                                SYS_ENABLED, 
 
207
#endif
 
208
                                SYS_UNKNOWN} SysTableType;
 
209
                                
 
210
static const char *sysTableNames[] = {
 
211
        "pbms_repository",
 
212
        "pbms_reference",
 
213
        "pbms_blob",
 
214
        "pbms_dump",
 
215
        "pbms_metadata",
 
216
        METADATA_HEADER_NAME,
 
217
#ifdef HAVE_ALIAS_SUPPORT
 
218
        "pbms_alias",
 
219
#endif
 
220
        VARIABLES_TABLE_NAME,
 
221
        CLOUD_TABLE_NAME,
 
222
        BACKUP_TABLE_NAME,
 
223
#ifndef DRIZZLED
 
224
        ENABLED_TABLE_NAME,
 
225
#endif
 
226
        NULL
 
227
};
 
228
 
 
229
static INTERRNAL_TABLE_INFO pbms_internal_tables[]=
 
230
{
 
231
#ifdef PBMS_HAS_KEYS
 
232
        { false, sysTableNames[SYS_REP],pbms_repository_info, pbms_repository_keys},
 
233
        { false, sysTableNames[SYS_REF], pbms_reference_info, pbms_reference_keys},
 
234
        { false, sysTableNames[SYS_BLOB], pbms_blobs_info, pbms_blobs_keys},
 
235
        { false, sysTableNames[SYS_DUMP], pbms_dump_info, pbms_dump_keys},
 
236
        { false, sysTableNames[SYS_META], pbms_metadata_info, pbms_metadata_keys},
 
237
        { false, sysTableNames[SYS_HTTP], pbms_metadata_headers_info, pbms_metadata_headers_keys},
 
238
#ifdef HAVE_ALIAS_SUPPORT
 
239
        { false, sysTableNames[SYS_ALIAS], pbms_alias_info, pbms_alias_keys},
 
240
#endif
 
241
        { false, sysTableNames[SYS_VARIABLE], pbms_variable_info, pbms_variable_keys},
 
242
        { true, sysTableNames[SYS_CLOUD], pbms_cloud_info, pbms_cloud_keys},
 
243
        { true, sysTableNames[SYS_BACKUP], pbms_backup_info, pbms_backup_keys},
 
244
#ifndef DRIZZLED
 
245
        { true, sysTableNames[SYS_ENABLED], pbms_enabled_info, pbms_enabled_keys},
 
246
#endif
 
247
#else
 
248
        { false, sysTableNames[SYS_REP], pbms_repository_info, NULL},
 
249
        { false, sysTableNames[SYS_REF], pbms_reference_info, NULL},
 
250
        { false, sysTableNames[SYS_BLOB], pbms_blobs_info, NULL},
 
251
        { false, sysTableNames[SYS_DUMP], pbms_dump_info, NULL},
 
252
        { false, sysTableNames[SYS_META], pbms_metadata_info, NULL},
 
253
        { false, sysTableNames[SYS_HTTP], pbms_metadata_headers_info, NULL},
 
254
#ifdef HAVE_ALIAS_SUPPORT
 
255
        { false, sysTableNames[SYS_ALIAS], pbms_alias_info, NULL},
 
256
#endif
 
257
        { false, sysTableNames[SYS_VARIABLE], pbms_variable_info, NULL},
 
258
        { true, sysTableNames[SYS_CLOUD], pbms_cloud_info, NULL},
 
259
        { true, sysTableNames[SYS_BACKUP], pbms_backup_info, NULL},
 
260
#ifndef DRIZZLED
 
261
        { true, sysTableNames[SYS_ENABLED], pbms_enabled_info, NULL},
 
262
#endif
 
263
#endif
 
264
 
 
265
        { false, NULL, NULL, NULL}
 
266
        
 
267
};
 
268
 
 
269
//--------------------------
 
270
static SysTableType pbms_systable_type(const char *table)
 
271
{
 
272
        int i = 0;
 
273
        
 
274
        while ((i < SYS_UNKNOWN) && strcasecmp(table, sysTableNames[i])) i++;
 
275
        
 
276
        return((SysTableType) i );
 
277
}
 
278
 
 
279
//--------------------------
 
280
bool PBMSSystemTables::isSystemTable(bool isPBMS, const char *table)
 
281
{
 
282
        SysTableType i;
 
283
        
 
284
        i = pbms_systable_type(table);
 
285
 
 
286
        if (i == SYS_UNKNOWN)
 
287
                return false;
 
288
                
 
289
        return (pbms_internal_tables[i].is_pbms == isPBMS);
 
290
}
 
291
 
 
292
//--------------------------
 
293
#ifdef DRIZZLED
 
294
using namespace std;
 
295
using namespace drizzled;
 
296
#undef TABLE
 
297
#undef Field
 
298
static int pbms_create_proto_table(const char *engine_name, const char *name, DT_FIELD_INFO *info, DT_KEY_INFO *keys, drizzled::message::Table &table)
 
299
{
 
300
        message::Table::Field *field;
 
301
        message::Table::Field::FieldConstraints *field_constraints;
 
302
        message::Table::Field::StringFieldOptions *string_field_options;
 
303
        message::Table::TableOptions *table_options;
 
304
 
 
305
        table.set_name(name);
 
306
        table.set_name(name);
 
307
        table.set_type(message::Table::STANDARD);
 
308
        table.mutable_engine()->set_name(engine_name);
 
309
        
 
310
        table_options = table.mutable_options();
 
311
        table_options->set_collation_id(my_charset_utf8_bin.number);
 
312
        table_options->set_collation(my_charset_utf8_bin.name);
 
313
        
 
314
        while (info->field_name) {      
 
315
                field= table.add_field();
 
316
                
 
317
                field->set_name(info->field_name);
 
318
                if (info->comment)
 
319
                        field->set_comment(info->comment);
 
320
                        
 
321
                field_constraints= field->mutable_constraints();
 
322
                if (info->field_flags & NOT_NULL_FLAG)
 
323
                        field_constraints->set_is_nullable(false);
 
324
                else
 
325
                        field_constraints->set_is_nullable(true);
 
326
                
 
327
                if (info->field_flags & UNSIGNED_FLAG)
 
328
                        field_constraints->set_is_unsigned(true);
 
329
                else
 
330
                        field_constraints->set_is_unsigned(false);
 
331
 
 
332
                switch (info->field_type) {
 
333
                        case DRIZZLE_TYPE_VARCHAR:
 
334
                                string_field_options = field->mutable_string_options();
 
335
                                
 
336
                                field->set_type(message::Table::Field::VARCHAR);
 
337
                                string_field_options->set_length(info->field_length);
 
338
                                if (info->field_charset) {
 
339
                                        string_field_options->set_collation(info->field_charset->name);
 
340
                                        string_field_options->set_collation_id(info->field_charset->number);
 
341
                                }
 
342
                                break;
 
343
                                
 
344
                        case DRIZZLE_TYPE_LONG:
 
345
                                field->set_type(message::Table::Field::INTEGER);
 
346
                                break;
 
347
                                
 
348
                        case DRIZZLE_TYPE_DOUBLE:
 
349
                                field->set_type(message::Table::Field::DOUBLE);
 
350
                                break;
 
351
                                
 
352
                        case DRIZZLE_TYPE_LONGLONG:
 
353
                                field->set_type(message::Table::Field::BIGINT);
 
354
                                break;
 
355
                                
 
356
                        case DRIZZLE_TYPE_TIMESTAMP:
 
357
                                field->set_type(message::Table::Field::TIMESTAMP);
 
358
                                break;
 
359
                                
 
360
                        case DRIZZLE_TYPE_BLOB:
 
361
                                field->set_type(message::Table::Field::BLOB);
 
362
                                if (info->field_charset) {
 
363
                                        string_field_options = field->mutable_string_options();
 
364
                                        string_field_options->set_collation(info->field_charset->name);
 
365
                                        string_field_options->set_collation_id(info->field_charset->number);
 
366
                                }
 
367
                                break;
 
368
                                
 
369
                        default:
 
370
                                assert(0); 
 
371
                }
 
372
                info++;
 
373
        }
 
374
        
 
375
                        
 
376
        if (keys) {
 
377
                while (keys->key_name) {
 
378
                        // To be done later. (maybe)
 
379
                        keys++;
 
380
                }
 
381
        }
 
382
 
 
383
        return 0;
 
384
}
 
385
#define TABLE                                                           drizzled::Table
 
386
#define Field                                                           drizzled::Field
 
387
 
 
388
int PBMSSystemTables::getSystemTableInfo(const char *name, drizzled::message::Table &table)
 
389
{
 
390
        int err = 1, i = 0;
 
391
                        
 
392
        while (pbms_internal_tables[i].name) {
 
393
                if (strcasecmp(name, pbms_internal_tables[i].name) == 0){
 
394
                        err = pbms_create_proto_table("PBMS", name, pbms_internal_tables[i].info, pbms_internal_tables[i].keys, table);
 
395
                        break;
 
396
                }
 
397
                i++;
 
398
        }
 
399
        
 
400
        return err;
 
401
}
 
402
 
 
403
void PBMSSystemTables::getSystemTableNames(bool isPBMS, std::set<std::string> &set_of_names)
 
404
{
 
405
        int i = 0;
 
406
                        
 
407
        while (pbms_internal_tables[i].name) {
 
408
                if ( isPBMS == pbms_internal_tables[i].is_pbms){
 
409
                        set_of_names.insert(pbms_internal_tables[i].name);
 
410
                }
 
411
                i++;
 
412
        }
 
413
        
 
414
}
 
415
 
 
416
#else // DRIZZLED
 
417
//--------------------------
 
418
static bool pbms_database_follder_exists( const char *db)
 
419
{
 
420
        struct stat stat_info;  
 
421
        char path[PATH_MAX];
 
422
        
 
423
        if (!db)
 
424
                return false;
 
425
                
 
426
        cs_strcpy(PATH_MAX, path, ms_my_get_mysql_home_path());
 
427
        cs_add_name_to_path(PATH_MAX, path, db);
 
428
        
 
429
        if (stat(path, &stat_info) == 0)
 
430
                return(stat_info.st_mode & S_IFDIR);
 
431
                
 
432
        return false;
 
433
}
 
434
 
 
435
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen)
 
436
{
 
437
        int err = 1, i = 0;
 
438
        bool is_pbms = false;
 
439
 
 
440
        // Check that the database exists!
 
441
        if (!pbms_database_follder_exists(db))
 
442
                return err;
 
443
                
 
444
        is_pbms = (strcmp(db, "pbms") == 0);
 
445
                
 
446
        
 
447
        while (pbms_internal_tables[i].name) {
 
448
                if ((!strcasecmp(name, pbms_internal_tables[i].name)) && ( is_pbms == pbms_internal_tables[i].is_pbms)){
 
449
                        err = ms_create_table_frm(hton, thd, db, name, pbms_internal_tables[i].info, pbms_internal_tables[i].keys, frmblob, frmlen);
 
450
                        break;
 
451
                }
 
452
                i++;
 
453
        }
 
454
        
 
455
        return err;
 
456
}
 
457
#endif // DRIZZLED
 
458
 
 
459
// Transfer any physical PBMS ststem tables to another database.
 
460
void PBMSSystemTables::transferSystemTables(MSDatabase *dst_db, MSDatabase *src_db)
 
461
{
 
462
        enter_();
 
463
        push_(dst_db);
 
464
        push_(src_db);
 
465
        
 
466
        MSHTTPHeaderTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
467
        MSVariableTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
468
        MSCloudTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
469
        MSBackupTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
 
470
        
 
471
        release_(src_db);
 
472
        release_(dst_db);
 
473
        exit_();
 
474
}
 
475
 
 
476
//----------------
 
477
void PBMSSystemTables::removeSystemTables(CSString *db_path)
 
478
{
 
479
        enter_();
 
480
        push_(db_path);
 
481
        
 
482
        MSHTTPHeaderTable::removeTable(RETAIN(db_path));
 
483
        MSVariableTable::removeTable(RETAIN(db_path));
 
484
        MSCloudTable::removeTable(RETAIN(db_path));
 
485
        MSBackupTable::removeTable(RETAIN(db_path));
 
486
        
 
487
        release_(db_path);
 
488
        exit_();
 
489
}
 
490
 
 
491
//----------------
 
492
void PBMSSystemTables::loadSystemTables(MSDatabase *db)
 
493
{
 
494
        int i = 0;
 
495
        
 
496
        CLOBBER_PROTECT(i);
 
497
        enter_();
 
498
        push_(db);
 
499
        
 
500
        for ( i = 0; i < 4; i++) {
 
501
                try_(a) {
 
502
                        switch (i) {
 
503
                                case 0:
 
504
                                        MSHTTPHeaderTable::loadTable(RETAIN(db));
 
505
                                        break;
 
506
                                case 1:
 
507
                                        MSCloudTable::loadTable(RETAIN(db));
 
508
                                        break;
 
509
                                case 2:
 
510
                                        MSBackupTable::loadTable(RETAIN(db));
 
511
                                        break;
 
512
                                case 3:
 
513
                                        // Variable must be loaded after cloud and backup info
 
514
                                        // incase BLOB recovery is required.
 
515
                                        MSVariableTable::loadTable(RETAIN(db)); 
 
516
                                        break;
 
517
                                        
 
518
                                default:
 
519
                                        ASSERT(false);
 
520
                        }
 
521
                }
 
522
                catch_(a) {
 
523
                        self->logException();
 
524
                }
 
525
                cont_(a);
 
526
        }
 
527
        
 
528
        release_(db);
 
529
        exit_();
 
530
}
 
531
 
 
532
//----------------
 
533
// Dump all the system tables into one buffer.
 
534
typedef struct {
 
535
        CSDiskValue4 size_4;
 
536
        CSDiskValue1 tab_id_1;
 
537
        CSDiskValue4 tab_version_4;
 
538
} DumpHeaderRec, *DumpHeaderPtr;
 
539
 
 
540
typedef union {
 
541
                char *rec_chars;
 
542
                const char *rec_cchars;
 
543
                DumpHeaderPtr dumpHeader;
 
544
} DumpDiskData;
 
545
 
 
546
CSStringBuffer *PBMSSystemTables::dumpSystemTables(MSDatabase *db)
 
547
{
 
548
        CSStringBuffer *sysDump, *tabDump = NULL;
 
549
        uint32_t size, pos, tab_version;
 
550
        uint8_t tab_id = 0;
 
551
        DumpDiskData    d;
 
552
 
 
553
        enter_();
 
554
        push_(db);
 
555
        new_(sysDump, CSStringBuffer());
 
556
        push_(sysDump);
 
557
        pos = 0;
 
558
        
 
559
        for ( int i = 0; i < 4; i++) {
 
560
                switch (i) {
 
561
                        case 0:
 
562
                                tabDump = MSHTTPHeaderTable::dumpTable(RETAIN(db));
 
563
                                tab_id = MSHTTPHeaderTable::tableID;
 
564
                                tab_version = MSHTTPHeaderTable::tableVersion;
 
565
                                break;
 
566
                        case 1:
 
567
                                tabDump = MSCloudTable::dumpTable(RETAIN(db));
 
568
                                tab_id = MSCloudTable::tableID;
 
569
                                tab_version = MSCloudTable::tableVersion;
 
570
                                break;
 
571
                        case 2:
 
572
                                tabDump = MSBackupTable::dumpTable(RETAIN(db));
 
573
                                tab_id = MSBackupTable::tableID;
 
574
                                tab_version = MSBackupTable::tableVersion;
 
575
                                break;
 
576
                        case 3:
 
577
                                tabDump = MSVariableTable::dumpTable(RETAIN(db)); // Dump the variables table last.
 
578
                                tab_id = MSVariableTable::tableID;
 
579
                                tab_version = MSVariableTable::tableVersion;
 
580
                                break;
 
581
                                
 
582
                        default:
 
583
                                ASSERT(false);
 
584
                }
 
585
                
 
586
                push_(tabDump);
 
587
                size = tabDump->length();
 
588
                
 
589
                // Grow the buffer for the header
 
590
                sysDump->setLength(pos + sizeof(DumpHeaderRec));
 
591
                
 
592
                // Add the dump header 
 
593
                d.rec_chars = sysDump->getBuffer(pos); 
 
594
                CS_SET_DISK_4(d.dumpHeader->size_4, size);;             
 
595
                CS_SET_DISK_1(d.dumpHeader->tab_id_1, tab_id);          
 
596
                CS_SET_DISK_4(d.dumpHeader->tab_version_4, tab_version);        
 
597
        
 
598
                sysDump->append(tabDump->getBuffer(0), size);
 
599
                pos += size + sizeof(DumpHeaderRec);
 
600
                release_(tabDump);
 
601
        }
 
602
        
 
603
        
 
604
        pop_(sysDump);
 
605
        release_(db);
 
606
        return_(sysDump);
 
607
}
 
608
 
 
609
//----------------
 
610
void PBMSSystemTables::restoreSystemTables(MSDatabase *db, const char *data, size_t size)
 
611
{
 
612
        uint32_t tab_size, tab_version;
 
613
        uint8_t tab_id;
 
614
        DumpDiskData    d;
 
615
 
 
616
        enter_();
 
617
        push_(db);
 
618
        
 
619
        while  ( size >= sizeof(DumpHeaderRec)) {
 
620
                d.rec_cchars = data;
 
621
                tab_size = CS_GET_DISK_4(d.dumpHeader->size_4);
 
622
                tab_id = CS_GET_DISK_1(d.dumpHeader->tab_id_1);                 
 
623
                tab_version = CS_GET_DISK_4(d.dumpHeader->tab_version_4);       
 
624
                data += sizeof(DumpHeaderRec);
 
625
                size -= sizeof(DumpHeaderRec);
 
626
                
 
627
                if (size < tab_size) {
 
628
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "PBMS system table restore data truncated.");
 
629
                }
 
630
                
 
631
                switch (tab_id) {
 
632
                        case MSHTTPHeaderTable::tableID:
 
633
                                if (MSHTTPHeaderTable::tableVersion == tab_version)
 
634
                                        MSHTTPHeaderTable::restoreTable(RETAIN(db), data, tab_size);
 
635
                                else
 
636
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "METADATA_HEADER_NAME" failed, incompatable table version" );
 
637
                                break;
 
638
                        case MSCloudTable::tableID:
 
639
                                if (MSCloudTable::tableVersion == tab_version)
 
640
                                        MSCloudTable::restoreTable(RETAIN(db), data, tab_size);
 
641
                                else
 
642
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "CLOUD_TABLE_NAME" failed, incompatable table version" );
 
643
                                break;
 
644
                        case MSBackupTable::tableID:
 
645
                                if (MSBackupTable::tableVersion == tab_version)
 
646
                                        MSBackupTable::restoreTable(RETAIN(db), data, tab_size);
 
647
                                else
 
648
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "BACKUP_TABLE_NAME" failed, incompatable table version" );
 
649
                                break;                          
 
650
                        case MSVariableTable::tableID:
 
651
                                if (MSVariableTable::tableVersion == tab_version)
 
652
                                        MSVariableTable::restoreTable(RETAIN(db), data, tab_size);
 
653
                                else
 
654
                                        CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "VARIABLES_TABLE_NAME" failed, incompatable table version" );
 
655
                                break;
 
656
                        default:
 
657
                                ASSERT(false);
 
658
                }
 
659
                 data += tab_size;
 
660
                 size -= tab_size;
 
661
        }
 
662
        
 
663
        if (size) {
 
664
                CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "PBMS trailing garbage in system table restore data being ignored.");
 
665
        }
 
666
                
 
667
        release_(db);
 
668
        exit_();
 
669
}
 
670
 
 
671
/*
 
672
 * -------------------------------------------------------------------------
 
673
 * MYSQL UTILITIES
 
674
 */
 
675
 
 
676
void MSOpenSystemTable::setNotNullInRecord(Field *field, char *record)
 
677
{
 
678
        if (field->null_ptr)
 
679
                record[(uint) (field->null_ptr - (uchar *) field->getTable()->getInsertRecord())] &= (uchar) ~field->null_bit;
 
680
}
 
681
 
 
682
/*
 
683
 * -------------------------------------------------------------------------
 
684
 * OPEN SYSTEM TABLES
 
685
 */
 
686
 
 
687
MSOpenSystemTable::MSOpenSystemTable(MSSystemTableShare *share, TABLE *table):
 
688
CSRefObject()
 
689
{
 
690
        myShare = share;
 
691
        mySQLTable = table;
 
692
}
 
693
 
 
694
MSOpenSystemTable::~MSOpenSystemTable()
 
695
{
 
696
        MSSystemTableShare::releaseSystemTable(this);
 
697
}
 
698
 
 
699
 
 
700
/*
 
701
 * -------------------------------------------------------------------------
 
702
 * REPOSITORY TABLE
 
703
 */
 
704
 
 
705
MSRepositoryTable::MSRepositoryTable(MSSystemTableShare *share, TABLE *table):
 
706
MSOpenSystemTable(share, table),
 
707
iCompactor(NULL),
 
708
iRepoFile(NULL),
 
709
iBlobBuffer(NULL)
 
710
{
 
711
}
 
712
 
 
713
//-----------------------
 
714
MSRepositoryTable::~MSRepositoryTable()
 
715
{
 
716
        unuse();
 
717
        
 
718
        if (iBlobBuffer)
 
719
                iBlobBuffer->release();
 
720
}
 
721
 
 
722
//-----------------------
 
723
void MSRepositoryTable::use()
 
724
{
 
725
}
 
726
 
 
727
//-----------------------
 
728
void MSRepositoryTable::unuse()
 
729
{
 
730
        if (iCompactor) {
 
731
                iCompactor->resume();
 
732
                iCompactor->release();
 
733
                iCompactor = NULL;
 
734
        }
 
735
        if (iRepoFile) {
 
736
                iRepoFile->release();
 
737
                iRepoFile = NULL;
 
738
        }
 
739
        if (iBlobBuffer)
 
740
                iBlobBuffer->clear();
 
741
}
 
742
 
 
743
 
 
744
//-----------------------
 
745
void MSRepositoryTable::seqScanInit()
 
746
{
 
747
        MSDatabase *db;
 
748
 
 
749
        enter_();
 
750
        
 
751
        // Flush all committed transactions to the repository file.
 
752
        MSTransactionManager::flush();
 
753
 
 
754
        iRepoIndex = 0;
 
755
        iRepoOffset = 0;
 
756
        if (!iBlobBuffer)
 
757
                new_(iBlobBuffer, CSStringBuffer(20));
 
758
        db = myShare->mySysDatabase;
 
759
        if ((iCompactor = db->getCompactorThread())) {
 
760
                if (iCompactor->isMe(self))
 
761
                        iCompactor = NULL;
 
762
                else {
 
763
                        iCompactor->retain();
 
764
                        iCompactor->suspend();
 
765
                }
 
766
        }
 
767
                
 
768
        exit_();
 
769
}
 
770
 
 
771
//-----------------------
 
772
bool MSRepositoryTable::resetScan(bool positioned, uint32_t repo_index)
 
773
{
 
774
        if (positioned) {
 
775
                if (iRepoFile && (repo_index != iRepoIndex)) {
 
776
                        iRepoFile->release();
 
777
                        iRepoFile = NULL;
 
778
                }
 
779
                
 
780
                iRepoIndex = repo_index;
 
781
        }
 
782
        if (iRepoFile) 
 
783
                return true;
 
784
                
 
785
        enter_();
 
786
        MSRepository    *repo = NULL;
 
787
        CSSyncVector    *repo_list = myShare->mySysDatabase->getRepositoryList();
 
788
 
 
789
        lock_(repo_list);
 
790
        for (; iRepoIndex<repo_list->size(); iRepoIndex++) {
 
791
                if ((repo = (MSRepository *) repo_list->get(iRepoIndex))) {
 
792
                        iRepoFile = repo->openRepoFile();
 
793
                        break;
 
794
                }
 
795
        }
 
796
        unlock_(repo_list);
 
797
        
 
798
        if (!iRepoFile)
 
799
                return_(false);
 
800
        iRepoFileSize = repo->getRepoFileSize();
 
801
        if ((!iRepoOffset) || !positioned) 
 
802
                iRepoOffset = repo->getRepoHeadSize();
 
803
                
 
804
        iRepoCurrentOffset = iRepoOffset;
 
805
                
 
806
        return_(true);
 
807
}
 
808
 
 
809
//-----------------------
 
810
bool MSRepositoryTable::seqScanNext(char *buf)
 
811
{
 
812
 
 
813
        enter_();
 
814
        iRepoCurrentOffset = iRepoOffset;
 
815
        
 
816
        if (returnSubRecord(buf))
 
817
                goto exit;
 
818
 
 
819
        restart:
 
820
        if ((!iRepoFile) && !MSRepositoryTable::resetScan(false))
 
821
                return false;
 
822
 
 
823
        while (iRepoOffset < iRepoFileSize) {
 
824
                if (returnRecord(buf))
 
825
                        goto exit;
 
826
        }
 
827
 
 
828
        if (iRepoFile) {
 
829
                iRepoFile->release();
 
830
                iRepoFile = NULL;
 
831
                iRepoOffset = 0;
 
832
        }
 
833
        iRepoIndex++;
 
834
        goto restart;
 
835
 
 
836
        exit:
 
837
        return_(true);
 
838
}
 
839
 
 
840
//-----------------------
 
841
int     MSRepositoryTable::getRefLen()
 
842
{
 
843
        return sizeof(iRepoIndex) + sizeof(iRepoOffset);
 
844
}
 
845
 
 
846
 
 
847
//-----------------------
 
848
void MSRepositoryTable::seqScanPos(uint8_t *pos)
 
849
{
 
850
        mi_int4store(pos, iRepoIndex); pos +=4;
 
851
        mi_int8store(pos, iRepoCurrentOffset);
 
852
}
 
853
 
 
854
//-----------------------
 
855
void MSRepositoryTable::seqScanRead(uint32_t repo, uint64_t offset, char *buf)
 
856
{
 
857
        iRepoOffset = offset;
 
858
 
 
859
        if (!resetScan(true, repo))
 
860
                return;
 
861
                
 
862
        seqScanNext(buf);
 
863
}
 
864
 
 
865
//-----------------------
 
866
void MSRepositoryTable::seqScanRead(uint8_t *pos, char *buf)
 
867
{
 
868
        seqScanRead(mi_uint4korr( pos), mi_uint8korr(pos +4), buf);
 
869
}
 
870
 
 
871
//-----------------------
 
872
bool MSRepositoryTable::returnRecord(char *buf)
 
873
{
 
874
        CSMutex                 *lock;
 
875
        MSBlobHeadRec   blob;
 
876
        uint16_t                        head_size;
 
877
        uint64_t                        blob_size;
 
878
        int                             ref_count;
 
879
        size_t                  ref_size;
 
880
        uint8_t                 status;
 
881
 
 
882
        enter_();
 
883
        retry_read:
 
884
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
885
        lock_(lock);
 
886
        if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
 
887
                unlock_(lock);
 
888
                iRepoOffset = iRepoFileSize;
 
889
                return_(false);
 
890
        }
 
891
        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
892
        blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
 
893
        ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
 
894
        ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
 
895
        status = CS_GET_DISK_1(blob.rb_status_1);
 
896
        if (ref_count <= 0 || ref_size == 0 ||
 
897
                head_size < iRepoFile->myRepo->getRepoBlobHeadSize() + ref_count * ref_size || 
 
898
                !VALID_BLOB_STATUS(status)) {
 
899
                /* Can't be true. Assume this is garbage! */
 
900
                unlock_(lock);
 
901
                iRepoOffset++;
 
902
                goto retry_read;
 
903
        }
 
904
 
 
905
        if (IN_USE_BLOB_STATUS(status)) {
 
906
                unlock_(lock);
 
907
                if (!returnRow(&blob, buf)) {
 
908
                        /* This record may not have had any data of interest. */
 
909
                        iRepoOffset++;
 
910
                        goto retry_read;
 
911
                }
 
912
                iRepoOffset += head_size + blob_size;
 
913
                return_(true);
 
914
        }
 
915
        unlock_(lock);
 
916
        iRepoOffset += head_size + blob_size;
 
917
        return_(false);
 
918
}
 
919
 
 
920
//-----------------------
 
921
bool MSRepositoryTable::returnSubRecord(char *)
 
922
{
 
923
        return false;
 
924
}
 
925
 
 
926
//-----------------------
 
927
bool MSRepositoryTable::returnRow(MSBlobHeadPtr blob, char *buf)
 
928
{
 
929
        TABLE           *table = mySQLTable;
 
930
        uint8_t         storage_type;
 
931
        uint32_t                access_count;
 
932
        uint32_t                last_access;
 
933
        uint32_t                last_ref;
 
934
        uint32_t                creation_time;
 
935
        uint32_t                access_code;
 
936
        uint16_t                head_size;
 
937
        uint16_t                alias_offset;
 
938
        uint64_t                blob_size;
 
939
        Field           *curr_field;
 
940
        byte            *save;
 
941
        MY_BITMAP       *save_write_set;
 
942
        
 
943
        enter_();
 
944
 
 
945
        storage_type = CS_GET_DISK_1(blob->rb_storage_type_1);
 
946
        last_access = CS_GET_DISK_4(blob->rb_last_access_4);
 
947
        access_count = CS_GET_DISK_4(blob->rb_access_count_4);
 
948
        last_ref = CS_GET_DISK_4(blob->rb_last_ref_4);
 
949
        creation_time = CS_GET_DISK_4(blob->rb_create_time_4);
 
950
        head_size = CS_GET_DISK_2(blob->rb_head_size_2);
 
951
        blob_size = CS_GET_DISK_6(blob->rb_blob_repo_size_6);
 
952
        access_code = CS_GET_DISK_4(blob->rb_auth_code_4);
 
953
        alias_offset = CS_GET_DISK_2(blob->rb_alias_offset_2);
 
954
 
 
955
        /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
 
956
         * I use store()!??
 
957
         * But I want to use it! :(
 
958
         */
 
959
        save_write_set = table->write_set;
 
960
        table->write_set = NULL;
 
961
 
 
962
        memset(buf, 0xFF, table->s->null_bytes);
 
963
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
964
                curr_field = *field;
 
965
 
 
966
                save = curr_field->ptr;
 
967
#if MYSQL_VERSION_ID < 50114
 
968
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
969
#else
 
970
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
 
971
#endif
 
972
                switch (curr_field->field_name[0]) {
 
973
                        case 'A':
 
974
                                switch (curr_field->field_name[9]) {
 
975
                                        case 'd':
 
976
                                                ASSERT(strcmp(curr_field->field_name, "Access_code") == 0);
 
977
                                                curr_field->store(access_code, true);
 
978
                                                setNotNullInRecord(curr_field, buf);
 
979
                                                break;
 
980
                                        case 'u':
 
981
                                                ASSERT(strcmp(curr_field->field_name, "Access_count") == 0);
 
982
                                                curr_field->store(access_count, true);
 
983
                                                setNotNullInRecord(curr_field, buf);
 
984
                                                break;
 
985
                                }
 
986
                                break;
 
987
                        case 'R':
 
988
                                switch (curr_field->field_name[6]) {
 
989
                                        case 't':
 
990
                                                // Repository_id     INT
 
991
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
992
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
993
                                                setNotNullInRecord(curr_field, buf);
 
994
                                                break;
 
995
                                        case 'l':
 
996
                                                // Repo_blob_offset  BIGINT
 
997
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
998
                                                curr_field->store(iRepoOffset, true);
 
999
                                                setNotNullInRecord(curr_field, buf);
 
1000
                                                break;
 
1001
                                }
 
1002
                                break;
 
1003
                        case 'B':
 
1004
                                switch (curr_field->field_name[5]) {
 
1005
                                        case 's':
 
1006
                                                // Blob_size         BIGINT
 
1007
                                                ASSERT(strcmp(curr_field->field_name, "Blob_size") == 0);
 
1008
                                                curr_field->store(blob_size, true);
 
1009
                                                setNotNullInRecord(curr_field, buf);
 
1010
                                                break;
 
1011
                                        case 'a':
 
1012
                                                // Blob_alias         
 
1013
                                                ASSERT(strcmp(curr_field->field_name, "Blob_alias") == 0);
 
1014
#ifdef HAVE_ALIAS_SUPPORT
 
1015
                                                if (alias_offset) {
 
1016
                                                        char blob_alias[BLOB_ALIAS_LENGTH +1];
 
1017
                                                        CSMutex *lock;
 
1018
                                                        uint64_t rsize;
 
1019
                                                        
 
1020
                                                        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1021
                                                        lock_(lock);
 
1022
                                                        rsize = iRepoFile->read(blob_alias, iRepoOffset + alias_offset, BLOB_ALIAS_LENGTH, 0);
 
1023
                                                        unlock_(lock);
 
1024
                                                        blob_alias[rsize] = 0;
 
1025
                                                        curr_field->store(blob_alias, strlen(blob_alias), &my_charset_utf8_general_ci);
 
1026
                                                        setNotNullInRecord(curr_field, buf);
 
1027
                                                } else {
 
1028
                                                        curr_field->store((uint64_t) 0, true);
 
1029
                                                }
 
1030
#else
 
1031
                                                curr_field->store((uint64_t) 0, true);
 
1032
#endif
 
1033
                                                
 
1034
                                                break;
 
1035
                                }
 
1036
 
 
1037
                                break;
 
1038
                        case 'M': // MD5_Checksum
 
1039
                                if (storage_type == MS_STANDARD_STORAGE) {
 
1040
                                        char checksum[sizeof(Md5Digest) *2];
 
1041
                                        
 
1042
                                        ASSERT(strcmp(curr_field->field_name, "MD5_Checksum") == 0);
 
1043
                                        cs_bin_to_hex(sizeof(Md5Digest) *2, checksum, sizeof(Md5Digest), &(blob->rb_blob_checksum_md5d));
 
1044
                                        curr_field->store(checksum, sizeof(Md5Digest) *2, system_charset_info);
 
1045
                                        setNotNullInRecord(curr_field, buf);
 
1046
                                        
 
1047
                                } else
 
1048
                                        curr_field->store((uint64_t) 0, true);
 
1049
                        
 
1050
                                break;
 
1051
                        case 'H':
 
1052
                                // Head_size         SMALLINT UNSIGNED
 
1053
                                ASSERT(strcmp(curr_field->field_name, "Head_size") == 0);
 
1054
                                curr_field->store(head_size, true);
 
1055
                                setNotNullInRecord(curr_field, buf);
 
1056
                                break;
 
1057
                        case 'C':
 
1058
                                // Creation_time     TIMESTAMP
 
1059
                                ASSERT(strcmp(curr_field->field_name, "Creation_time") == 0);
 
1060
                                curr_field->store(ms_my_1970_to_mysql_time(creation_time), true);
 
1061
                                setNotNullInRecord(curr_field, buf);
 
1062
                                break;
 
1063
                        case 'L':
 
1064
                                switch (curr_field->field_name[5]) {
 
1065
                                        case 'r':
 
1066
                                                // Last_ref_time     TIMESTAMP
 
1067
                                                ASSERT(strcmp(curr_field->field_name, "Last_ref_time") == 0);
 
1068
                                                curr_field->store(ms_my_1970_to_mysql_time(last_ref), true);
 
1069
                                                setNotNullInRecord(curr_field, buf);
 
1070
                                                break;
 
1071
                                        case 'a':
 
1072
                                                // Last_access_time  TIMESTAMP
 
1073
                                                ASSERT(strcmp(curr_field->field_name, "Last_access_time") == 0);
 
1074
                                                curr_field->store(ms_my_1970_to_mysql_time(last_access), true);
 
1075
                                                setNotNullInRecord(curr_field, buf);
 
1076
                                                break;
 
1077
                                }
 
1078
                                break;
 
1079
                }
 
1080
                curr_field->ptr = save;
 
1081
        }
 
1082
 
 
1083
        table->write_set = save_write_set;
 
1084
        return_(true);
 
1085
}
 
1086
 
 
1087
/*
 
1088
 * -------------------------------------------------------------------------
 
1089
 * BLOB DATA TABLE
 
1090
 */
 
1091
//-----------------------
 
1092
MSBlobDataTable::MSBlobDataTable(MSSystemTableShare *share, TABLE *table):MSRepositoryTable(share, table)
 
1093
{
 
1094
}
 
1095
 
 
1096
//-----------------------
 
1097
MSBlobDataTable::~MSBlobDataTable()
 
1098
{       
 
1099
}
 
1100
 
 
1101
//-----------------------
 
1102
bool MSBlobDataTable::returnRow(MSBlobHeadPtr blob, char *buf)
 
1103
{
 
1104
        TABLE           *table = mySQLTable;
 
1105
        uint8_t         blob_type;
 
1106
        uint16_t                head_size;
 
1107
        uint64_t                blob_size;
 
1108
        uint32          len;
 
1109
        Field           *curr_field;
 
1110
        byte            *save;
 
1111
        MY_BITMAP       *save_write_set;
 
1112
 
 
1113
        head_size = CS_GET_DISK_2(blob->rb_head_size_2);
 
1114
        blob_size = CS_GET_DISK_6(blob->rb_blob_repo_size_6);
 
1115
        blob_type = CS_GET_DISK_1(blob->rb_storage_type_1);
 
1116
 
 
1117
        /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
 
1118
         * I use store()!??
 
1119
         * But I want to use it! :(
 
1120
         */
 
1121
        save_write_set = table->write_set;
 
1122
        table->write_set = NULL;
 
1123
 
 
1124
        memset(buf, 0xFF, table->s->null_bytes);
 
1125
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1126
                curr_field = *field;
 
1127
 
 
1128
                save = curr_field->ptr;
 
1129
#if MYSQL_VERSION_ID < 50114
 
1130
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1131
#else
 
1132
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
 
1133
#endif
 
1134
                switch (curr_field->field_name[0]) {
 
1135
                        case 'R':
 
1136
                                switch (curr_field->field_name[6]) {
 
1137
                                        case 't':
 
1138
                                                // Repository_id     INT
 
1139
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1140
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
1141
                                                setNotNullInRecord(curr_field, buf);
 
1142
                                                break;
 
1143
                                        case 'l':
 
1144
                                                // Repo_blob_offset  BIGINT
 
1145
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1146
                                                curr_field->store(iRepoOffset, true);
 
1147
                                                setNotNullInRecord(curr_field, buf);
 
1148
                                                break;
 
1149
                                }
 
1150
                                break;
 
1151
                        case 'B':
 
1152
                                // Blob_data         LONGBLOB
 
1153
                                ASSERT(strcmp(curr_field->field_name, "Blob_data") == 0);
 
1154
                                if (blob_size <= 0xFFFFFFF) {
 
1155
                                        iBlobBuffer->setLength((uint32_t) blob_size);
 
1156
                                        if (BLOB_IN_REPOSITORY(blob_type)) {
 
1157
                                                len = iRepoFile->read(iBlobBuffer->getBuffer(0), iRepoOffset + head_size, (size_t) blob_size, 0);
 
1158
                                        } else {
 
1159
                                                CloudDB *blobCloud = myShare->mySysDatabase->myBlobCloud;
 
1160
                                                CloudKeyRec key;
 
1161
                                                ASSERT(blobCloud != NULL);
 
1162
                                                
 
1163
                                                MSRepoFile::getBlobKey(blob, &key);
 
1164
                                                
 
1165
                                                len = blobCloud->cl_getData(&key, iBlobBuffer->getBuffer(0), blob_size);
 
1166
                                        }
 
1167
                                        ((Field_blob *) curr_field)->set_ptr(len, (byte *) iBlobBuffer->getBuffer(0));
 
1168
                                        setNotNullInRecord(curr_field, buf);
 
1169
                                }
 
1170
                                break;
 
1171
                }
 
1172
                curr_field->ptr = save;
 
1173
        }
 
1174
 
 
1175
        table->write_set = save_write_set;
 
1176
        return true;
 
1177
}
 
1178
 
 
1179
#ifdef HAVE_ALIAS_SUPPORT
 
1180
/*
 
1181
 * -------------------------------------------------------------------------
 
1182
 * Alias DATA TABLE
 
1183
 */
 
1184
bool MSBlobAliasTable::returnRow(MSBlobHeadPtr blob, char *buf)
 
1185
{
 
1186
        TABLE           *table = mySQLTable;
 
1187
        Field           *curr_field;
 
1188
        byte            *save;
 
1189
        MY_BITMAP       *save_write_set;
 
1190
        uint16_t                alias_offset; 
 
1191
        char            blob_alias[BLOB_ALIAS_LENGTH +1];
 
1192
        CSMutex         *lock;
 
1193
        uint64_t                rsize;
 
1194
        enter_();
 
1195
        
 
1196
        alias_offset = CS_GET_DISK_2(blob->rb_alias_offset_2);
 
1197
 
 
1198
        if (!alias_offset)
 
1199
                return_(false);
 
1200
 
 
1201
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1202
        lock_(lock);
 
1203
        rsize = iRepoFile->read(blob_alias, iRepoOffset + alias_offset, BLOB_ALIAS_LENGTH, 0);
 
1204
        unlock_(lock);
 
1205
        blob_alias[rsize] = 0;
 
1206
        
 
1207
        /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
 
1208
         * I use store()!??
 
1209
         * But I want to use it! :(
 
1210
         */
 
1211
        save_write_set = table->write_set;
 
1212
        table->write_set = NULL;
 
1213
 
 
1214
        memset(buf, 0xFF, table->s->null_bytes);
 
1215
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1216
                curr_field = *field;
 
1217
 
 
1218
                save = curr_field->ptr;
 
1219
#if MYSQL_VERSION_ID < 50114
 
1220
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1221
#else
 
1222
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->getInsertRecord());
 
1223
#endif
 
1224
                switch (curr_field->field_name[0]) {
 
1225
                        case 'R':
 
1226
                                switch (curr_field->field_name[6]) {
 
1227
                                        case 't':
 
1228
                                                // Repository_id     INT
 
1229
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1230
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
1231
                                                setNotNullInRecord(curr_field, buf);
 
1232
                                                break;
 
1233
                                        case 'l':
 
1234
                                                // Repo_blob_offset  BIGINT
 
1235
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1236
                                                curr_field->store(iRepoOffset, true);
 
1237
                                                setNotNullInRecord(curr_field, buf);
 
1238
                                                break;
 
1239
                                }
 
1240
                                break;
 
1241
                        case 'B':
 
1242
                                // Blob_alias         
 
1243
                                ASSERT(strcmp(curr_field->field_name, "Blob_alias") == 0);
 
1244
                                curr_field->store(blob_alias, strlen(blob_alias), &UTF8_CHARSET);
 
1245
                                setNotNullInRecord(curr_field, buf);
 
1246
                                
 
1247
                                break;
 
1248
                }
 
1249
                curr_field->ptr = save;
 
1250
        }
 
1251
 
 
1252
        table->write_set = save_write_set;
 
1253
        return_(true);
 
1254
}
 
1255
 
 
1256
#endif
 
1257
 
 
1258
/*
 
1259
 * -------------------------------------------------------------------------
 
1260
 * REFERENCE TABLE
 
1261
 */
 
1262
 
 
1263
MSReferenceTable::MSReferenceTable(MSSystemTableShare *share, TABLE *table):
 
1264
MSRepositoryTable(share, table),
 
1265
iRefDataList(NULL), 
 
1266
iRefDataSize(0), 
 
1267
iRefDataUsed(0),
 
1268
iRefDataPos(0),
 
1269
iRefOpenTable(NULL),
 
1270
iRefTempLog(NULL)
 
1271
{
 
1272
}
 
1273
 
 
1274
MSReferenceTable::~MSReferenceTable()
 
1275
{
 
1276
        if (iRefDataList)
 
1277
                cs_free(iRefDataList);
 
1278
        if (iRefOpenTable)
 
1279
                iRefOpenTable->returnToPool();
 
1280
        if (iRefTempLog)
 
1281
                iRefTempLog->release();
 
1282
}
 
1283
 
 
1284
void MSReferenceTable::unuse()
 
1285
{
 
1286
        MSRepositoryTable::unuse();
 
1287
        if (iRefDataList) {
 
1288
                cs_free(iRefDataList);
 
1289
                iRefDataList = NULL;
 
1290
        }
 
1291
        iRefDataSize = 0;
 
1292
        if (iRefOpenTable) {
 
1293
                iRefOpenTable->returnToPool();
 
1294
                iRefOpenTable = NULL;
 
1295
        }
 
1296
        if (iRefTempLog) {
 
1297
                iRefTempLog->release();
 
1298
                iRefTempLog = NULL;
 
1299
        }
 
1300
}
 
1301
 
 
1302
 
 
1303
void MSReferenceTable::seqScanInit()
 
1304
{
 
1305
        MSRepositoryTable::seqScanInit();
 
1306
        iRefDataUsed = 0;
 
1307
        iRefDataPos = 0;
 
1308
}
 
1309
 
 
1310
int     MSReferenceTable::getRefLen()
 
1311
{
 
1312
        return sizeof(iRefDataUsed) + sizeof(iRefDataPos) + sizeof(iRefCurrentIndex) + sizeof(iRefCurrentOffset);
 
1313
}
 
1314
 
 
1315
void MSReferenceTable::seqScanPos(uint8_t *pos)
 
1316
{
 
1317
        mi_int4store(pos, iRefCurrentDataPos); pos +=4;
 
1318
        mi_int4store(pos, iRefCurrentDataUsed);pos +=4; 
 
1319
        mi_int4store(pos, iRefCurrentIndex); pos +=4;
 
1320
        mi_int8store(pos, iRefCurrentOffset);
 
1321
}
 
1322
 
 
1323
void MSReferenceTable::seqScanRead(uint8_t *pos, char *buf)
 
1324
{
 
1325
        iRefDataPos = mi_uint4korr( pos); pos +=4;
 
1326
        iRefDataUsed = mi_uint4korr(pos); pos +=4;
 
1327
        iRefBlobRepo = mi_uint4korr(pos); pos +=4;
 
1328
        iRefBlobOffset = mi_uint8korr(pos);
 
1329
        MSRepositoryTable::seqScanRead(iRefBlobRepo, iRefBlobOffset, buf);
 
1330
}
 
1331
 
 
1332
bool MSReferenceTable::seqScanNext(char *buf)
 
1333
{
 
1334
        iRefCurrentDataPos = iRefDataPos;
 
1335
        iRefCurrentDataUsed = iRefDataUsed;
 
1336
        
 
1337
        // Reset the position
 
1338
        return MSRepositoryTable::seqScanNext(buf);
 
1339
}
 
1340
// select * from pbms_reference order by blob_size DESC;
 
1341
// select * from pbms_reference order by Table_name DESC;
 
1342
 
 
1343
bool MSReferenceTable::resetScan(bool positioned, uint32_t repo_index)
 
1344
{
 
1345
        CSMutex                         *lock;
 
1346
        MSBlobHeadRec           blob;
 
1347
        uint16_t                        head_size;
 
1348
        uint64_t                        blob_size;
 
1349
        MSRepoPointersRec       ptr;
 
1350
        size_t                          ref_size;
 
1351
        uint32_t                        tab_index;
 
1352
        uint32_t                        ref_count;
 
1353
        uint8_t                         status;
 
1354
 
 
1355
        enter_();
 
1356
        
 
1357
        if (!MSRepositoryTable::resetScan(positioned, repo_index))
 
1358
                return_(false);
 
1359
        
 
1360
        retry_read:
 
1361
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1362
        lock_(lock);
 
1363
        if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
 
1364
                unlock_(lock);
 
1365
                iRepoOffset = iRepoFileSize;
 
1366
                return_(false);
 
1367
        }
 
1368
        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
1369
        blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
 
1370
        ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
 
1371
        ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
 
1372
        status = CS_GET_DISK_1(blob.rb_status_1);
 
1373
        if (ref_count <= 0 || ref_size == 0 ||
 
1374
                head_size < iRepoFile->myRepo->getRepoBlobHeadSize() + ref_count * ref_size || 
 
1375
                !VALID_BLOB_STATUS(status)) {
 
1376
                /* Can't be true. Assume this is garbage! */
 
1377
                unlock_(lock);
 
1378
                iRepoOffset++;
 
1379
                goto retry_read;
 
1380
        }
 
1381
 
 
1382
        if (IN_USE_BLOB_STATUS(status)) {
 
1383
                iBlobBuffer->setLength((uint32_t) head_size);
 
1384
                if (iRepoFile->read(iBlobBuffer->getBuffer(0), iRepoOffset, head_size, 0) < head_size) {
 
1385
                        unlock_(lock);
 
1386
                        iRepoOffset = iRepoFileSize;
 
1387
                        return_(false);
 
1388
                }
 
1389
                unlock_(lock);
 
1390
 
 
1391
                iRefAuthCode = CS_GET_DISK_4(blob.rb_auth_code_4);
 
1392
                iRefBlobSize = CS_GET_DISK_6(blob.rb_blob_data_size_6);;
 
1393
                iRefBlobRepo = iRepoFile->myRepo->getRepoID();
 
1394
                iRefBlobOffset = iRepoOffset;
 
1395
 
 
1396
                if (ref_count > iRefDataSize) {
 
1397
                        cs_realloc((void **) &iRefDataList, sizeof(MSRefDataRec) * ref_count);
 
1398
                        iRefDataSize = ref_count;
 
1399
                }
 
1400
                
 
1401
                if (!positioned) 
 
1402
                        iRefDataPos = 0;
 
1403
 
 
1404
                // When ever the data position is reset the current location information
 
1405
                // must also be reset so that it is consisent with the data position.
 
1406
                iRefCurrentDataPos = iRefDataPos;
 
1407
                iRefCurrentOffset = iRepoOffset;
 
1408
                iRefCurrentIndex = iRepoIndex;
 
1409
                iRefCurrentDataUsed = iRefDataUsed = ref_count;
 
1410
                memset(iRefDataList, 0, sizeof(MSRefDataRec) * ref_count);
 
1411
 
 
1412
                uint32_t h = iRepoFile->myRepo->getRepoBlobHeadSize();
 
1413
                ptr.rp_chars = iBlobBuffer->getBuffer(0) + h;
 
1414
                for (uint32_t i=0; i<ref_count; i++) {
 
1415
                        switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
 
1416
                                case MS_BLOB_FREE_REF:
 
1417
                                        break;
 
1418
                                case MS_BLOB_TABLE_REF: // The blob is intended for a file but has not been inserted yet.
 
1419
                                        iRefDataList[i].rd_tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
 
1420
                                        iRefDataList[i].rd_blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
 
1421
                                        iRefDataList[i].rd_col_index = INVALID_INDEX;  // Means not used
 
1422
                                        break;
 
1423
                                case MS_BLOB_DELETE_REF:
 
1424
                                        tab_index = CS_GET_DISK_2(ptr.rp_temp_ref->tp_del_ref_2);
 
1425
                                        if (tab_index && (tab_index <= ref_count)) {
 
1426
                                                tab_index--;
 
1427
                                                iRefDataList[tab_index].rd_temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
 
1428
                                                iRefDataList[tab_index].rd_temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
 
1429
                                        }
 
1430
                                        else if (tab_index == INVALID_INDEX) {
 
1431
                                                /* The is a reference from the temporary log only!! */
 
1432
                                                iRefDataList[i].rd_tab_id = 0xFFFFFFFF;  // Indicates no table
 
1433
                                                iRefDataList[i].rd_blob_id = iRepoOffset;
 
1434
                                                iRefDataList[i].rd_blob_ref_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);;
 
1435
                                                iRefDataList[i].rd_col_index = INVALID_INDEX;  // Means not used
 
1436
                                                iRefDataList[i].rd_temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
 
1437
                                                iRefDataList[i].rd_temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
 
1438
                                        }
 
1439
                                        break;
 
1440
                                default:
 
1441
                                        MSRepoTableRefPtr       tab_ref;
 
1442
 
 
1443
                                        tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1;
 
1444
                                        if (tab_index < ref_count) {
 
1445
                                                tab_ref = (MSRepoTableRefPtr) (iBlobBuffer->getBuffer(0) + iRepoFile->myRepo->getRepoBlobHeadSize() + tab_index * ref_size);
 
1446
        
 
1447
                                                iRefDataList[i].rd_tab_id = CS_GET_DISK_4(tab_ref->tr_table_id_4);
 
1448
                                                iRefDataList[i].rd_blob_id = CS_GET_DISK_6(tab_ref->tr_blob_id_6);
 
1449
                                                iRefDataList[i].rd_col_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_col_index_2);
 
1450
                                                iRefDataList[i].rd_blob_ref_id = COMMIT_MASK(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8));
 
1451
 
 
1452
                                                iRefDataList[tab_index].rd_ref_count++;
 
1453
                                        }
 
1454
                                        else {
 
1455
                                                /* Can't be true. Assume this is garbage! */
 
1456
                                                unlock_(lock);
 
1457
                                                iRepoOffset++;
 
1458
                                                goto retry_read;
 
1459
                                        }
 
1460
                                        break;
 
1461
                        }
 
1462
                        ptr.rp_chars += ref_size;
 
1463
                }
 
1464
 
 
1465
                iRepoOffset += head_size + blob_size;
 
1466
 
 
1467
                return_(true);
 
1468
        }
 
1469
        unlock_(lock);
 
1470
        iRepoOffset += head_size + blob_size;
 
1471
        return_(false);
 
1472
}
 
1473
 
 
1474
bool MSReferenceTable::returnRecord(char *buf)
 
1475
{
 
1476
        if (!resetScan(false))
 
1477
                return false;
 
1478
                
 
1479
        return(returnSubRecord(buf));
 
1480
}
 
1481
 
 
1482
bool MSReferenceTable::returnSubRecord(char *buf)
 
1483
{
 
1484
        uint32_t i;
 
1485
        
 
1486
        while (iRefDataPos < iRefDataUsed) {
 
1487
                i = iRefDataPos++;
 
1488
                if (iRefDataList[i].rd_tab_id) {
 
1489
                        if (iRefDataList[i].rd_col_index == INVALID_INDEX) {
 
1490
                                /* This is a table reference */
 
1491
                                if (!iRefDataList[i].rd_ref_count || iRefDataList[i].rd_temp_log_id) {
 
1492
                                        returnRow(&iRefDataList[i], buf);
 
1493
                                        return true;
 
1494
                                }
 
1495
                        }
 
1496
                        else {
 
1497
                                /* This is an engine reference */
 
1498
                                returnRow(&iRefDataList[i], buf);
 
1499
                                return true;
 
1500
                        }
 
1501
                }
 
1502
        }
 
1503
 
 
1504
        return false;
 
1505
}
 
1506
 
 
1507
void MSReferenceTable::returnRow(MSRefDataPtr ref_data, char *buf)
 
1508
{
 
1509
        TABLE                   *table = mySQLTable;
 
1510
        Field                   *curr_field;
 
1511
        byte                    *save;
 
1512
        MY_BITMAP               *save_write_set;
 
1513
        MY_BITMAP               *save_read_set;
 
1514
        bool                    have_times = false;
 
1515
        time_t                  delete_time;
 
1516
        int32_t                 countdown = 0;
 
1517
 
 
1518
        if (iRefOpenTable) {
 
1519
                if (iRefOpenTable->getDBTable()->myTableID != ref_data->rd_tab_id) {
 
1520
                        iRefOpenTable->returnToPool();
 
1521
                        iRefOpenTable = NULL;
 
1522
                }
 
1523
        }
 
1524
 
 
1525
        if (!iRefOpenTable && ref_data->rd_tab_id != 0xFFFFFFFF)
 
1526
                iRefOpenTable = MSTableList::getOpenTableByID(myShare->mySysDatabase->myDatabaseID, ref_data->rd_tab_id);
 
1527
 
 
1528
        if (ref_data->rd_temp_log_id) {
 
1529
                if (iRefTempLog) {
 
1530
                        if (iRefTempLog->myTempLogID != ref_data->rd_temp_log_id) {
 
1531
                                iRefTempLog->release();
 
1532
                                iRefTempLog = NULL;
 
1533
                        }
 
1534
                }
 
1535
                if (!iRefTempLog)
 
1536
                        iRefTempLog = myShare->mySysDatabase->openTempLogFile(ref_data->rd_temp_log_id, NULL, NULL);
 
1537
 
 
1538
                if (iRefTempLog) {
 
1539
                        MSTempLogItemRec        log_item;
 
1540
 
 
1541
                        if (iRefTempLog->read(&log_item, ref_data->rd_temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
 
1542
                                have_times = true;
 
1543
                                delete_time = CS_GET_DISK_4(log_item.ti_time_4);
 
1544
                                countdown = (int32_t) (delete_time + PBMSParameters::getTempBlobTimeout()) - (int32_t) time(NULL);
 
1545
                        }
 
1546
                }
 
1547
        }
 
1548
 
 
1549
        if (ref_data->rd_col_index != INVALID_INDEX) {
 
1550
                if (iRefOpenTable) {
 
1551
                        if (iRefOpenTable->getDBTable()->isToDelete()) {
 
1552
                                have_times = true;
 
1553
                                iRefOpenTable->getDBTable()->getDeleteInfo(&ref_data->rd_temp_log_id, &ref_data->rd_temp_log_offset, &delete_time);
 
1554
                                ref_data->rd_col_index = INVALID_INDEX;
 
1555
                                countdown = (int32_t) (delete_time + PBMSParameters::getTempBlobTimeout()) - (int32_t) time(NULL);
 
1556
                        }
 
1557
                }
 
1558
                else
 
1559
                        ref_data->rd_col_index = INVALID_INDEX;
 
1560
        }
 
1561
 
 
1562
        save_write_set = table->write_set;
 
1563
        save_read_set = table->read_set;
 
1564
        table->write_set = NULL;
 
1565
        table->read_set = NULL;
 
1566
 
 
1567
        memset(buf, 0xFF, table->s->null_bytes);
 
1568
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1569
                curr_field = *field;
 
1570
 
 
1571
                save = curr_field->ptr;
 
1572
#if MYSQL_VERSION_ID < 50114
 
1573
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1574
#else
 
1575
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
 
1576
#endif
 
1577
                switch (curr_field->field_name[0]) {
 
1578
                        case 'B':
 
1579
                                switch (curr_field->field_name[5]) {
 
1580
                                        case 'i':
 
1581
                                                // Blob_id           BIGINT,
 
1582
                                                ASSERT(strcmp(curr_field->field_name, "Blob_id") == 0);
 
1583
                                                if (ref_data->rd_tab_id == 0xFFFFFFFF)
 
1584
                                                        curr_field->store(0, true);
 
1585
                                                else {
 
1586
                                                        curr_field->store(ref_data->rd_blob_id, true);
 
1587
                                                        setNotNullInRecord(curr_field, buf);
 
1588
                                                }
 
1589
                                                break;
 
1590
                                        case 'u':
 
1591
                                                // Blob_url          VARCHAR(120),
 
1592
                                                PBMSBlobURLRec blob_url;
 
1593
 
 
1594
                                                ASSERT(strcmp(curr_field->field_name, "Blob_url") == 0);
 
1595
                                                if (ref_data->rd_tab_id != 0xFFFFFFFF) {
 
1596
                                                        iRefOpenTable->formatBlobURL(&blob_url, ref_data->rd_blob_id, iRefAuthCode, iRefBlobSize, ref_data->rd_blob_ref_id);
 
1597
                                                        curr_field->store(blob_url.bu_data, strlen(blob_url.bu_data) +1, &UTF8_CHARSET); // Include the null char in the url. This is the way it is stored in user tables.
 
1598
                                                        setNotNullInRecord(curr_field, buf);
 
1599
                                                }
 
1600
                                                break;
 
1601
                                        case 's':
 
1602
                                                // Blob_size         BIGINT,
 
1603
                                                ASSERT(strcmp(curr_field->field_name, "Blob_size") == 0);
 
1604
                                                curr_field->store(iRefBlobSize, true);
 
1605
                                                setNotNullInRecord(curr_field, buf);
 
1606
                                                break;
 
1607
                                }
 
1608
                                break;
 
1609
                        case 'C':
 
1610
                                // Column_ordinal       INT,
 
1611
                                ASSERT(strcmp(curr_field->field_name, "Column_ordinal") == 0);
 
1612
                                if (ref_data->rd_col_index != INVALID_INDEX) {
 
1613
                                        curr_field->store(ref_data->rd_col_index +1, true);
 
1614
                                        setNotNullInRecord(curr_field, buf);
 
1615
                                }
 
1616
                                break;
 
1617
                        case 'D':
 
1618
                                // Deletion_time     TIMESTAMP,
 
1619
                                ASSERT(strcmp(curr_field->field_name, "Deletion_time") == 0);
 
1620
                                if (have_times) {
 
1621
                                        curr_field->store(ms_my_1970_to_mysql_time(delete_time), true);
 
1622
                                        setNotNullInRecord(curr_field, buf);
 
1623
                                }
 
1624
                                break;
 
1625
                        case 'R':
 
1626
                                switch (curr_field->field_name[5]) {
 
1627
                                        case 'i':
 
1628
                                                // Repository_id     INT,
 
1629
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1630
                                                curr_field->store(iRefBlobRepo, true);
 
1631
                                                setNotNullInRecord(curr_field, buf);
 
1632
                                                break;
 
1633
                                        case 'b':
 
1634
                                                // Repo_blob_offset  BIGINT,
 
1635
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1636
                                                curr_field->store(iRefBlobOffset, true);
 
1637
                                                setNotNullInRecord(curr_field, buf);
 
1638
                                                break;
 
1639
                                        case 'e':
 
1640
                                                // Remove_in INT
 
1641
                                                ASSERT(strcmp(curr_field->field_name, "Remove_in") == 0);
 
1642
                                                if (have_times) {
 
1643
                                                        curr_field->store(countdown, false);
 
1644
                                                        setNotNullInRecord(curr_field, buf);
 
1645
                                                }
 
1646
                                                break;
 
1647
                                }
 
1648
                                break;
 
1649
                        case 'T':
 
1650
                                switch (curr_field->field_name[9]) {
 
1651
                                        case 'e':
 
1652
                                                // Table_name        CHAR(64),
 
1653
                                                ASSERT(strcmp(curr_field->field_name, "Table_name") == 0);
 
1654
                                                if (ref_data->rd_tab_id != 0xFFFFFFFF) {
 
1655
                                                        if (iRefOpenTable) {
 
1656
                                                                CSString *table_name = iRefOpenTable->getDBTable()->getTableName();
 
1657
                                                                curr_field->store(table_name->getCString(), table_name->length(), &UTF8_CHARSET);
 
1658
                                                        }
 
1659
                                                        else {
 
1660
                                                                char buffer[MS_TABLE_NAME_SIZE];
 
1661
                                                                
 
1662
                                                                snprintf(buffer, MS_TABLE_NAME_SIZE, "Table #%"PRIu32"", ref_data->rd_tab_id);
 
1663
                                                                curr_field->store(buffer, strlen(buffer), &UTF8_CHARSET);
 
1664
                                                        }
 
1665
                                                        setNotNullInRecord(curr_field, buf);
 
1666
                                                }
 
1667
                                                break;
 
1668
                                        case 'i':
 
1669
                                                // Temp_log_id       INT,
 
1670
                                                ASSERT(strcmp(curr_field->field_name, "Temp_log_id") == 0);
 
1671
                                                if (ref_data->rd_temp_log_id) {
 
1672
                                                        curr_field->store(ref_data->rd_temp_log_id, true);
 
1673
                                                        setNotNullInRecord(curr_field, buf);
 
1674
                                                }
 
1675
                                                break;
 
1676
                                        case 'o':
 
1677
                                                // Temp_log_offset   BIGINT
 
1678
                                                ASSERT(strcmp(curr_field->field_name, "Temp_log_offset") == 0);
 
1679
                                                if (ref_data->rd_temp_log_id) {
 
1680
                                                        curr_field->store(ref_data->rd_temp_log_offset, true);
 
1681
                                                        setNotNullInRecord(curr_field, buf);
 
1682
                                                }
 
1683
                                                break;
 
1684
                                }
 
1685
                                break;
 
1686
                }
 
1687
                curr_field->ptr = save;
 
1688
        }
 
1689
 
 
1690
        table->write_set = save_write_set;
 
1691
        table->read_set = save_read_set;
 
1692
}
 
1693
 
 
1694
/*
 
1695
 * -------------------------------------------------------------------------
 
1696
 * META DATA TABLE
 
1697
 */
 
1698
MSMetaDataTable::MSMetaDataTable(MSSystemTableShare *share, TABLE *table):
 
1699
MSRepositoryTable(share, table),
 
1700
iMetData(NULL), 
 
1701
iMetCurrentBlobRepo(0),
 
1702
iMetCurrentBlobOffset(0),
 
1703
iMetCurrentDataPos(0),
 
1704
iMetCurrentDataSize(0),
 
1705
iMetDataPos(0),
 
1706
iMetDataSize(0), 
 
1707
iMetBlobRepo(0),
 
1708
iMetBlobOffset(0),
 
1709
iMetStateSaved(false)
 
1710
{
 
1711
}
 
1712
 
 
1713
MSMetaDataTable::~MSMetaDataTable()
 
1714
{
 
1715
        if (iMetData) {
 
1716
                iMetData->release();
 
1717
                iMetData = NULL;
 
1718
        }
 
1719
}
 
1720
 
 
1721
MSMetaDataTable *MSMetaDataTable::newMSMetaDataTable(MSDatabase *db)
 
1722
{
 
1723
        char path[PATH_MAX];
 
1724
        
 
1725
        cs_strcpy(PATH_MAX, path, db->myDatabasePath->getCString());
 
1726
        db->release();
 
1727
        cs_add_dir_char(PATH_MAX, path);
 
1728
        cs_strcat(PATH_MAX, path, sysTableNames[SYS_META]);
 
1729
        
 
1730
        return  (MSMetaDataTable*) MSSystemTableShare::openSystemTable(path, NULL);
 
1731
}
 
1732
 
 
1733
void MSMetaDataTable::unuse()
 
1734
{
 
1735
        MSRepositoryTable::unuse();
 
1736
        if (iMetData) {
 
1737
                iMetData->release();
 
1738
                iMetData = NULL;
 
1739
        }
 
1740
        iMetDataSize = 0;
 
1741
}
 
1742
 
 
1743
 
 
1744
void MSMetaDataTable::seqScanInit()
 
1745
{
 
1746
        MSRepositoryTable::seqScanInit();
 
1747
        iMetDataSize = 0;
 
1748
        iMetDataPos = 0;
 
1749
        iMetBlobRepo = 0;
 
1750
        iMetBlobOffset = 0;
 
1751
        new_(iMetData, CSStringBuffer(80));
 
1752
        iMetStateSaved = false;
 
1753
}
 
1754
 
 
1755
void MSMetaDataTable::seqScanReset()
 
1756
{
 
1757
        seqScanPos(iMetState);
 
1758
        seqScanInit();
 
1759
        iMetStateSaved = true;
 
1760
}
 
1761
 
 
1762
int     MSMetaDataTable::getRefLen()
 
1763
{
 
1764
        return sizeof(iMetCurrentDataPos) + sizeof(iMetCurrentDataSize) + sizeof(iMetCurrentBlobRepo) + sizeof(iMetCurrentBlobOffset);
 
1765
}
 
1766
 
 
1767
void MSMetaDataTable::seqScanPos(uint8_t *pos)
 
1768
{
 
1769
        mi_int4store(pos, iMetCurrentDataPos); pos +=4;
 
1770
        mi_int4store(pos, iMetCurrentDataSize);pos +=4; 
 
1771
        mi_int4store(pos, iMetCurrentBlobRepo); pos +=4;
 
1772
        mi_int8store(pos, iMetCurrentBlobOffset);
 
1773
}
 
1774
 
 
1775
void MSMetaDataTable::seqScanRead(uint8_t *pos, char *buf)
 
1776
{
 
1777
        iMetStateSaved = false;
 
1778
        iMetDataPos = mi_uint4korr( pos); pos +=4;
 
1779
        iMetDataSize = mi_uint4korr(pos); pos +=4;
 
1780
        iMetBlobRepo = mi_uint4korr(pos); pos +=4;
 
1781
        iMetBlobOffset = mi_uint8korr(pos);
 
1782
        MSRepositoryTable::seqScanRead(iMetBlobRepo, iMetBlobOffset, buf);
 
1783
}
 
1784
 
 
1785
bool MSMetaDataTable::seqScanNext(char *buf)
 
1786
{
 
1787
        if (iMetStateSaved) {
 
1788
                bool have_data;
 
1789
                uint8_t *pos = iMetState;
 
1790
                iMetDataPos = mi_uint4korr( pos); pos +=4;
 
1791
                // Do not reset the meta data size.
 
1792
                /*iMetDataSize = mi_uint4korr(pos); */pos +=4;
 
1793
                iMetBlobRepo = mi_uint4korr(pos); pos +=4;
 
1794
                iMetBlobOffset = mi_uint8korr(pos);
 
1795
                iMetStateSaved = false;
 
1796
                resetScan(true, &have_data, iMetBlobRepo);
 
1797
        }
 
1798
        
 
1799
        iMetCurrentDataPos = iMetDataPos;
 
1800
        iMetCurrentDataSize = iMetDataSize;
 
1801
        
 
1802
        return MSRepositoryTable::seqScanNext(buf);
 
1803
}
 
1804
 
 
1805
bool MSMetaDataTable::resetScan(bool positioned, bool *have_data, uint32_t repo_index)
 
1806
{
 
1807
        CSMutex                         *lock;
 
1808
        MSBlobHeadRec           blob;
 
1809
        uint16_t                                head_size;
 
1810
        uint64_t                                blob_size;
 
1811
        size_t                          mdata_size, mdata_offset;
 
1812
        uint8_t                         status;
 
1813
 
 
1814
        enter_();
 
1815
        
 
1816
        *have_data = false;
 
1817
        if (!MSRepositoryTable::resetScan(positioned, repo_index))
 
1818
                return_(false);
 
1819
        
 
1820
        retry_read:
 
1821
        lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
 
1822
        lock_(lock);
 
1823
        if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
 
1824
                unlock_(lock);
 
1825
                iRepoOffset = iRepoFileSize;
 
1826
                return_(false);
 
1827
        }
 
1828
        
 
1829
        head_size = CS_GET_DISK_2(blob.rb_head_size_2);
 
1830
        blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
 
1831
        mdata_size = CS_GET_DISK_2(blob.rb_mdata_size_2);
 
1832
        mdata_offset = CS_GET_DISK_2(blob.rb_mdata_offset_2);
 
1833
        
 
1834
        status = CS_GET_DISK_1(blob.rb_status_1);
 
1835
        if ((head_size < (mdata_offset + mdata_size)) || !VALID_BLOB_STATUS(status)) {
 
1836
                /* Can't be true. Assume this is garbage! */
 
1837
                unlock_(lock);
 
1838
                iRepoOffset++;
 
1839
                goto retry_read;
 
1840
        }
 
1841
 
 
1842
        if (mdata_size && IN_USE_BLOB_STATUS(status)) {
 
1843
                iMetData->setLength((uint32_t) mdata_size);
 
1844
                if (iRepoFile->read(iMetData->getBuffer(0), iRepoOffset + mdata_offset, mdata_size, 0) < mdata_size) {
 
1845
                        unlock_(lock);
 
1846
                        iRepoOffset = iRepoFileSize;
 
1847
                        return_(false);
 
1848
                }
 
1849
 
 
1850
                iMetBlobRepo = iRepoFile->myRepo->getRepoID();
 
1851
                iMetBlobOffset = iRepoOffset;
 
1852
 
 
1853
                if (!positioned) 
 
1854
                        iMetDataPos = 0;
 
1855
 
 
1856
                iMetDataSize = mdata_size;
 
1857
                
 
1858
                // When ever the data position is reset the current location information
 
1859
                // must also be reset to that it is consisent with the data position.
 
1860
                iMetCurrentBlobOffset = iRepoOffset;
 
1861
                iMetCurrentBlobRepo = iRepoIndex;               
 
1862
                iMetCurrentDataPos = iMetDataPos;
 
1863
                iMetCurrentDataSize = iMetDataSize;
 
1864
                
 
1865
                *have_data = true;
 
1866
        }
 
1867
        unlock_(lock);
 
1868
        iRepoOffset += head_size + blob_size;
 
1869
        return_(true);
 
1870
}
 
1871
 
 
1872
bool MSMetaDataTable::returnRecord(char *buf)
 
1873
{
 
1874
        bool have_data = false;
 
1875
 
 
1876
        if (resetScan(false, &have_data) && have_data)
 
1877
                return(returnSubRecord(buf));
 
1878
                
 
1879
        return false;
 
1880
}
 
1881
 
 
1882
bool MSMetaDataTable::nextRecord(char **name, char **value)
 
1883
{
 
1884
        if (iMetDataPos < iMetDataSize) {
 
1885
                char *data = iMetData->getBuffer(iMetDataPos);
 
1886
                
 
1887
                *name = data;
 
1888
                data += strlen(*name) +1;
 
1889
                *value = data;
 
1890
                data += strlen(*value) +1;
 
1891
                
 
1892
                iMetDataPos += data - *name;
 
1893
                ASSERT(iMetDataPos <= iMetDataSize);
 
1894
                
 
1895
                return true;            
 
1896
        }
 
1897
 
 
1898
        return false;
 
1899
        
 
1900
}
 
1901
 
 
1902
bool MSMetaDataTable::returnSubRecord(char *buf)
 
1903
{
 
1904
        char *name, *value;
 
1905
        
 
1906
        if (nextRecord(&name, &value)) {
 
1907
                returnRow(name, value, buf);            
 
1908
                return true;            
 
1909
        }
 
1910
 
 
1911
        return false;
 
1912
}
 
1913
 
 
1914
void MSMetaDataTable::returnRow(char *name, char *value, char *buf)
 
1915
{
 
1916
        TABLE           *table = mySQLTable;
 
1917
        Field           *curr_field;
 
1918
        byte            *save;
 
1919
        MY_BITMAP       *save_write_set;
 
1920
 
 
1921
        save_write_set = table->write_set;
 
1922
        table->write_set = NULL;
 
1923
 
 
1924
        memset(buf, 0xFF, table->s->null_bytes);
 
1925
        for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
 
1926
                curr_field = *field;
 
1927
 
 
1928
                save = curr_field->ptr;
 
1929
#if MYSQL_VERSION_ID < 50114
 
1930
                curr_field->ptr = (byte *) buf + curr_field->offset();
 
1931
#else
 
1932
                curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
 
1933
#endif
 
1934
                switch (curr_field->field_name[0]) {
 
1935
                        case 'R':
 
1936
                                switch (curr_field->field_name[6]) {
 
1937
                                        case 't':
 
1938
                                                // Repository_id     INT
 
1939
                                                ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
 
1940
                                                curr_field->store(iRepoFile->myRepo->getRepoID(), true);
 
1941
                                                setNotNullInRecord(curr_field, buf);
 
1942
                                                break;
 
1943
                                        case 'l':
 
1944
                                                // Repo_blob_offset  BIGINT
 
1945
                                                ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
 
1946
                                                curr_field->store(iMetCurrentBlobOffset, true);
 
1947
                                                setNotNullInRecord(curr_field, buf);
 
1948
                                                break;
 
1949
                                }
 
1950
                                break;
 
1951
                        case 'N':
 
1952
                                // Name        
 
1953
                                ASSERT(strcmp(curr_field->field_name, "Name") == 0);
 
1954
                                curr_field->store(name, strlen(name), &UTF8_CHARSET);
 
1955
                                setNotNullInRecord(curr_field, buf);
 
1956
                                break;
 
1957
                        case 'V':
 
1958
                                // Value        
 
1959
                                ASSERT(strcmp(curr_field->field_name, "Value") == 0);
 
1960
                                curr_field->store(value, strlen(value), &my_charset_utf8_bin);
 
1961
                                setNotNullInRecord(curr_field, buf);
 
1962
                                break;
 
1963
                }
 
1964
                curr_field->ptr = save;
 
1965
        }
 
1966
 
 
1967
        table->write_set = save_write_set;
 
1968
}
 
1969
 
 
1970
 
 
1971
#ifdef HAVE_ALIAS_SUPPORT
 
1972
bool MSMetaDataTable::matchAlias(uint32_t repo_id, uint64_t offset, const char *alias)
 
1973
{
 
1974
        bool matched = false, have_data = false;
 
1975
        enter_();
 
1976
        
 
1977
        if (resetScan(true, &have_data, repo_id) && have_data) {
 
1978
                const char *blob_alias;
 
1979
                MetaData md(iMetData->getBuffer(0), iMetDataSize);
 
1980
                
 
1981
                blob_alias = md.findAlias();
 
1982
                matched = (blob_alias && !my_strcasecmp(&UTF8_CHARSET, blob_alias, alias));
 
1983
        }
 
1984
        
 
1985
        return_(matched);
 
1986
}
 
1987
#endif
 
1988
 
 
1989
void MSMetaDataTable::insertRow(char *buf)
 
1990
{
 
1991
        uint32_t repo_index;
 
1992
        String meta_name, meta_value;   
 
1993
        uint16_t data_len;
 
1994
        uint64_t repo_offset;
 
1995
        char *data;             
 
1996
        bool have_data, reset_alias = false;
 
1997
        
 
1998
        enter_();
 
1999
        
 
2000
        // Metadata inserts are ignored during reovery.
 
2001
        // They will be restored from the dump table.
 
2002
        if (myShare->mySysDatabase->isRecovering())
 
2003
                exit_();
 
2004
                
 
2005
        seqScanReset();
 
2006
 
 
2007
        getFieldValue(buf, 0, &repo_index);
 
2008
        getFieldValue(buf, 1, &repo_offset);
 
2009
        getFieldValue(buf, 2, &meta_name);
 
2010
        getFieldValue(buf, 3, &meta_value);
 
2011
        
 
2012
        if (!repo_index)
 
2013
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
 
2014
                
 
2015
        iRepoOffset = repo_offset;
 
2016
        if (!resetScan(true, &have_data, repo_index -1))
 
2017
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
 
2018
        
 
2019
        const char *alias = NULL, *tag = meta_name.c_ptr_safe();
 
2020
        
 
2021
        if (iMetDataSize) {
 
2022
                MetaData md(iMetData->getBuffer(0), iMetDataSize);
 
2023
                // Check to see it this is a duplicate name.
 
2024
                if (md.findName(tag))
 
2025
                        CSException::throwException(CS_CONTEXT, HA_ERR_FOUND_DUPP_KEY, "Meta data tag already exists.");
 
2026
                        
 
2027
#ifdef HAVE_ALIAS_SUPPORT
 
2028
                alias = md.findAlias();
 
2029
#endif
 
2030
        }
 
2031
        
 
2032
        // Create the meta data record.
 
2033
#ifdef HAVE_ALIAS_SUPPORT
 
2034
        if (alias)
 
2035
                data = iMetData->getBuffer(0); // This is to be able to test if the alias pointer needs to be reset.
 
2036
        else
 
2037
#endif
 
2038
                data = NULL;
 
2039
                
 
2040
        iMetData->setLength(iMetDataSize + meta_name.length() + meta_value.length()  + 2);
 
2041
        
 
2042
#ifdef HAVE_ALIAS_SUPPORT
 
2043
        if (alias && (data != iMetData->getBuffer(0))) // The buffer moved, adjust the alias.
 
2044
                alias += (iMetData->getBuffer(0) - data);
 
2045
#endif
 
2046
        
 
2047
        data = iMetData->getBuffer(0);
 
2048
        data_len = iMetDataSize;
 
2049
        
 
2050
#ifdef HAVE_ALIAS_SUPPORT
 
2051
        if ((!alias) && !my_strcasecmp(&UTF8_CHARSET, MS_ALIAS_TAG, tag)) {
 
2052
                reset_alias = true;
 
2053
                memcpy(data + data_len, MS_ALIAS_TAG, meta_name->length()); // Use my alias tag so we do not need to wory about case.
 
2054
                alias = data + data_len + meta_name->length() + 1; // Set the alias to the value location.
 
2055
        } else 
 
2056
#endif
 
2057
                memcpy(data + data_len, meta_name.c_ptr_quick(), meta_name.length());
 
2058
                
 
2059
        data_len += meta_name.length();
 
2060
        data[data_len] = 0;
 
2061
        data_len++;
 
2062
 
 
2063
        memcpy(data + data_len, meta_value.c_ptr_quick(), meta_value.length());
 
2064
        data_len += meta_value.length();
 
2065
        data[data_len] = 0;
 
2066
        data_len++;
 
2067
        
 
2068
        // Update the blob header with the new meta data.
 
2069
        MSOpenTable     *otab = MSOpenTable::newOpenTable(NULL);
 
2070
        push_(otab);
 
2071
        iRepoFile->setBlobMetaData(otab, repo_offset, data, data_len, reset_alias, alias);
 
2072
        release_(otab);
 
2073
                
 
2074
        exit_();
 
2075
}
 
2076
/*
 
2077
insert into pbms_mata_data values(1, 921, "ATAG 3", "xx");
 
2078
insert into pbms_mata_data values(1, 921, "ATAG 1", "A VALUE 1");
 
2079
insert into pbms_mata_data values(1, 921, "ATAG 2", "xx");
 
2080
insert into pbms_mata_data values(1, 383, "ATAG 2", "xx");
 
2081
select * from pbms_mata_data;
 
2082
 
 
2083
delete from pbms_mata_data where value = "xx";
 
2084
select * from pbms_mata_data;
 
2085
 
 
2086
update pbms_mata_data set value = "!!" where name = "ATAG 3";
 
2087
update pbms_mata_data set Repo_blob_offset = 383 where value = "!!";
 
2088
 
 
2089
delete from pbms_mata_data where Repo_blob_offset = 921;
 
2090
*/
 
2091
//insert into pbms_mata_data values(1, 921, "blob_ALIAs", "My_alias");
 
2092
//select * from pbms_mata_data;
 
2093
//select * from pbms_repository;
 
2094
 
 
2095
 
 
2096
void MSMetaDataTable::deleteRow(char *buf)
 
2097
{
 
2098
        uint32_t repo_index;
 
2099
        String meta_name, meta_value;   
 
2100
        uint16_t record_size;
 
2101
        uint64_t repo_offset;
 
2102
        char *data;
 
2103
        bool have_data, reset_alias = false;
 
2104
        
 
2105
        enter_();
 
2106
        
 
2107
        seqScanReset();
 
2108
 
 
2109
        getFieldValue(buf, 0, &repo_index);
 
2110
        getFieldValue(buf, 1, &repo_offset);
 
2111
        getFieldValue(buf, 2, &meta_name);
 
2112
        getFieldValue(buf, 3, &meta_value);
 
2113
 
 
2114
        if (!repo_index)
 
2115
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
 
2116
                
 
2117
        iRepoOffset = repo_offset;
 
2118
        if (!resetScan(true, &have_data, repo_index -1))
 
2119
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
 
2120
        
 
2121
        const char *alias = NULL, *value = NULL, *tag = meta_name.c_ptr_safe();
 
2122
        char *location;
 
2123
        
 
2124
        // Check to see name exists.
 
2125
        MetaData md;
 
2126
 
 
2127
        md.use_data(iMetData->getBuffer(0), iMetDataSize);
 
2128
        if (iMetDataSize) 
 
2129
                value = md.findName(tag);
 
2130
        
 
2131
        if (value == NULL)
 
2132
                CSException::throwException(CS_CONTEXT, HA_ERR_KEY_NOT_FOUND, "Meta data tag dosn't exists.");
 
2133
                        
 
2134
#ifdef HAVE_ALIAS_SUPPORT
 
2135
        alias = md.findAlias();
 
2136
        
 
2137
        if (alias == value) {
 
2138
                reset_alias = true;
 
2139
                alias = NULL;
 
2140
        }
 
2141
#endif
 
2142
        
 
2143
        // Create the meta data record.
 
2144
        data = md.getBuffer();
 
2145
        location = md.findNamePosition(tag);
 
2146
        record_size = MetaData::recSize(location);
 
2147
        iMetDataSize -= record_size;
 
2148
        memmove(location, location + record_size, iMetDataSize - (location - data)); // Shift the meta data down
 
2149
 
 
2150
#ifdef HAVE_ALIAS_SUPPORT
 
2151
        // Get the alias again incase it moved.
 
2152
        if (alias)
 
2153
                alias = md.findAlias();
 
2154
#endif
 
2155
        
 
2156
        // Update the blob header with the new meta data.
 
2157
        MSOpenTable     *otab = MSOpenTable::newOpenTable(NULL);
 
2158
        push_(otab);
 
2159
        iRepoFile->setBlobMetaData(otab, repo_offset, data, iMetDataSize, reset_alias, alias);
 
2160
        release_(otab);
 
2161
 
 
2162
        exit_();
 
2163
}
 
2164
 
 
2165
void MSMetaDataTable::updateRow(char *old_data, char *new_data)
 
2166
{
 
2167
        uint32_t o_repo_index, n_repo_index;
 
2168
        String n_meta_name, n_meta_value;       
 
2169
        String o_meta_name, o_meta_value;       
 
2170
        uint16_t record_size;
 
2171
        uint64_t o_repo_offset, n_repo_offset;
 
2172
        char *data;     
 
2173
        bool have_data, reset_alias = false;
 
2174
        
 
2175
        enter_();
 
2176
        
 
2177
        seqScanReset();
 
2178
 
 
2179
        getFieldValue(new_data, 0, &n_repo_index);
 
2180
        getFieldValue(new_data, 1, &n_repo_offset);
 
2181
        getFieldValue(new_data, 2, &n_meta_name);
 
2182
        getFieldValue(new_data, 3, &n_meta_value);
 
2183
 
 
2184
        getFieldValue(old_data, 0, &o_repo_index);
 
2185
        getFieldValue(old_data, 1, &o_repo_offset);
 
2186
        getFieldValue(old_data, 2, &o_meta_name);
 
2187
        getFieldValue(old_data, 3, &o_meta_value);
 
2188
 
 
2189
        if ((!o_repo_index) || (!n_repo_index))
 
2190
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
 
2191
        
 
2192
        // If the meta data is not for the same BLOB then we do an insert and delete.
 
2193
        if ((n_repo_index != o_repo_index) || (n_repo_offset != o_repo_offset)) {
 
2194
                insertRow(new_data);
 
2195
                try_(a) {
 
2196
                        deleteRow(old_data);
 
2197
                }
 
2198
                catch_(a) {
 
2199
                        deleteRow(new_data);
 
2200
                        throw_();
 
2201
                }
 
2202
                cont_(a);
 
2203
                exit_();
 
2204
        }
 
2205
        
 
2206
        iRepoOffset = n_repo_offset;
 
2207
        if (!resetScan(true, &have_data, n_repo_index -1))
 
2208
                CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
 
2209
        
 
2210
        char *location;
 
2211
        const char *value, *alias = NULL, *n_tag = n_meta_name.c_ptr_safe(), *o_tag = o_meta_name.c_ptr_safe();
 
2212
        
 
2213
        if (!my_strcasecmp(&UTF8_CHARSET, o_tag, n_tag))
 
2214
                n_tag = NULL;
 
2215
                
 
2216
        MetaData md;
 
2217
 
 
2218
        md.use_data(iMetData->getBuffer(0), iMetDataSize);
 
2219
        
 
2220
        if ((!iMetDataSize) || ((value = md.findName(o_tag)) == NULL))
 
2221
                CSException::throwException(CS_CONTEXT, HA_ERR_KEY_NOT_FOUND, "Meta data tag dosn't exists.");
 
2222
                        
 
2223
        if (n_tag && (md.findName(n_tag) != NULL))
 
2224
                CSException::throwException(CS_CONTEXT, HA_ERR_FOUND_DUPP_KEY, "Meta data tag already exists.");
 
2225
                
 
2226
#ifdef HAVE_ALIAS_SUPPORT
 
2227
        alias = md.findAlias();
 
2228
 
 
2229
        if (alias == value) {
 
2230
                reset_alias = true;
 
2231
                alias = NULL; // The alias is being deleted.
 
2232
        }
 
2233
#endif
 
2234
        
 
2235
        if (!n_tag)
 
2236
                n_tag = o_tag;
 
2237
                
 
2238
        // Create the meta data record.
 
2239
        data = md.getBuffer();
 
2240
        location = md.findNamePosition(o_tag);
 
2241
        record_size = MetaData::recSize(location);
 
2242
        iMetDataSize -= record_size;
 
2243
        memmove(location, location + record_size, iMetDataSize - (location - data)); // Shift the meta data down
 
2244
        
 
2245
        // Add the updated meta data to the end of the block.
 
2246
        iMetData->setLength(iMetDataSize + n_meta_name.length() + n_meta_value.length()  + 2);
 
2247
        
 
2248
        md.use_data(iMetData->getBuffer(0), iMetDataSize); // Reset this incase the buffer moved.
 
2249
#ifdef HAVE_ALIAS_SUPPORT
 
2250
        // Get the alias again incase it moved.
 
2251
        if (alias)
 
2252
                alias = md.findAlias();
 
2253
#endif
 
2254
        
 
2255
        data = iMetData->getBuffer(0);
 
2256
                
 
2257
#ifdef HAVE_ALIAS_SUPPORT
 
2258
        if ((!alias) && !my_strcasecmp(&UTF8_CHARSET, MS_ALIAS_TAG, n_tag)) {
 
2259
                reset_alias = true;
 
2260
                memcpy(data + iMetDataSize, MS_ALIAS_TAG, n_meta_name.length()); // Use my alias tag so we do not need to wory about case.
 
2261
                alias = data + iMetDataSize + n_meta_name.length() + 1; // Set the alias to the value location.
 
2262
        } else 
 
2263
#endif
 
2264
                memcpy(data + iMetDataSize, n_meta_name.c_ptr_quick(), n_meta_name.length());
 
2265
                
 
2266
        iMetDataSize += n_meta_name.length();
 
2267
        data[iMetDataSize] = 0;
 
2268
        iMetDataSize++;
 
2269
 
 
2270
        memcpy(data + iMetDataSize, n_meta_value.c_ptr_quick(), n_meta_value.length());
 
2271
        iMetDataSize += n_meta_value.length();
 
2272
        data[iMetDataSize] = 0;
 
2273
        iMetDataSize++;
 
2274
        
 
2275
        
 
2276
        // Update the blob header with the new meta data.
 
2277
        MSOpenTable     *otab = MSOpenTable::newOpenTable(NULL);
 
2278
        push_(otab);
 
2279
        iRepoFile->setBlobMetaData(otab, n_repo_offset, data, iMetDataSize, reset_alias, alias);
 
2280
        release_(otab);
 
2281
                
 
2282
        exit_();
 
2283
}
 
2284
 
 
2285
/*
 
2286
 * -------------------------------------------------------------------------
 
2287
 * SYSTEM TABLE SHARES
 
2288
 */
 
2289
 
 
2290
CSSyncSortedList *MSSystemTableShare::gSystemTableList;
 
2291
 
 
2292
MSSystemTableShare::MSSystemTableShare():
 
2293
CSRefObject(),
 
2294
myTablePath(NULL),
 
2295
mySysDatabase(NULL),
 
2296
iOpenCount(0)
 
2297
{
 
2298
        thr_lock_init(&myThrLock);
 
2299
}
 
2300
 
 
2301
MSSystemTableShare::~MSSystemTableShare()
 
2302
{
 
2303
        myThrLock.unlock();
 
2304
        if (myTablePath) {
 
2305
                myTablePath->release();
 
2306
                myTablePath = NULL;
 
2307
        }
 
2308
        if (mySysDatabase) {
 
2309
                mySysDatabase->release();
 
2310
                mySysDatabase = NULL;
 
2311
        }
 
2312
}
 
2313
 
 
2314
CSObject *MSSystemTableShare::getKey()
 
2315
{
 
2316
        return (CSObject *) myTablePath;
 
2317
}
 
2318
 
 
2319
int MSSystemTableShare::compareKey(CSObject *key)
 
2320
{
 
2321
        return myTablePath->compare((CSString *) key);
 
2322
}
 
2323
 
 
2324
void MSSystemTableShare::startUp()
 
2325
{
 
2326
        new_(gSystemTableList, CSSyncSortedList);
 
2327
}
 
2328
 
 
2329
void MSSystemTableShare::shutDown()
 
2330
{
 
2331
        if (gSystemTableList) {
 
2332
                gSystemTableList->release();
 
2333
                gSystemTableList = NULL;
 
2334
        }
 
2335
}
 
2336
 
 
2337
MSOpenSystemTable *MSSystemTableShare::openSystemTable(const char *table_path, TABLE *table)
 
2338
{
 
2339
        CSString                        *table_url;
 
2340
        MSSystemTableShare      *share;
 
2341
        MSOpenSystemTable       *otab = NULL;
 
2342
        SysTableType            table_type;
 
2343
 
 
2344
        enter_();
 
2345
        
 
2346
        table_type =  pbms_systable_type(cs_last_name_of_path(table_path));
 
2347
        if (table_type == SYS_UNKNOWN)
 
2348
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, "Table not found");
 
2349
        
 
2350
        table_path = cs_last_name_of_path(table_path, 2);
 
2351
        table_url = CSString::newString(table_path);
 
2352
        push_(table_url);
 
2353
 
 
2354
        lock_(gSystemTableList);
 
2355
        if (!(share = (MSSystemTableShare *) gSystemTableList->find(table_url))) {
 
2356
                share = MSSystemTableShare::newTableShare(RETAIN(table_url));
 
2357
                gSystemTableList->add(share);
 
2358
        }
 
2359
        
 
2360
        switch (table_type) {
 
2361
                case SYS_REP:
 
2362
                        new_(otab, MSRepositoryTable(share, table));
 
2363
                        break;
 
2364
                case SYS_REF:
 
2365
                        new_(otab, MSReferenceTable(share, table));
 
2366
                        break;
 
2367
                case SYS_BLOB:
 
2368
                        new_(otab, MSBlobDataTable(share, table));
 
2369
                        break;
 
2370
                case SYS_DUMP:
 
2371
                        new_(otab, MSDumpTable(share, table));
 
2372
                        break;
 
2373
                case SYS_META:
 
2374
                        new_(otab, MSMetaDataTable(share, table));
 
2375
                        break;
 
2376
                case SYS_HTTP:
 
2377
                        new_(otab, MSHTTPHeaderTable(share, table));
 
2378
                        break;
 
2379
#ifdef HAVE_ALIAS_SUPPORT
 
2380
                case SYS_ALIAS:
 
2381
                        new_(otab, MSBlobAliasTable(share, table));
 
2382
                        break;
 
2383
#endif
 
2384
                case SYS_VARIABLE:
 
2385
                        new_(otab, MSVariableTable(share, table));
 
2386
                        break;
 
2387
                case SYS_CLOUD:
 
2388
                        new_(otab, MSCloudTable(share, table));
 
2389
                        break;
 
2390
                case SYS_BACKUP:
 
2391
                        new_(otab, MSBackupTable(share, table));
 
2392
                        break;
 
2393
#ifndef DRIZZLED
 
2394
                case SYS_ENABLED:
 
2395
                        new_(otab, MSEnabledTable(share, table));
 
2396
                        break;
 
2397
#endif
 
2398
                case SYS_UNKNOWN:
 
2399
                        break;
 
2400
        }
 
2401
        
 
2402
        share->iOpenCount++;
 
2403
        unlock_(gSystemTableList);
 
2404
 
 
2405
        release_(table_url);
 
2406
        return_(otab);
 
2407
}
 
2408
 
 
2409
void MSSystemTableShare::removeDatabaseSystemTables(MSDatabase *doomed_db)
 
2410
{
 
2411
        MSSystemTableShare      *share;
 
2412
        uint32_t i= 0;
 
2413
        enter_();
 
2414
        
 
2415
        push_(doomed_db);
 
2416
        lock_(gSystemTableList);
 
2417
        while ((share = (MSSystemTableShare *) gSystemTableList->itemAt(i))) {
 
2418
                if (share->mySysDatabase == doomed_db) {
 
2419
                        gSystemTableList->remove(share->myTablePath);
 
2420
                } else
 
2421
                        i++;
 
2422
        }
 
2423
        
 
2424
        unlock_(gSystemTableList);
 
2425
        release_(doomed_db);
 
2426
        exit_();
 
2427
}
 
2428
 
 
2429
void MSSystemTableShare::releaseSystemTable(MSOpenSystemTable *tab)
 
2430
{
 
2431
        enter_();
 
2432
        lock_(gSystemTableList);
 
2433
        tab->myShare->iOpenCount--;
 
2434
        if (!tab->myShare->iOpenCount) {
 
2435
                gSystemTableList->remove(tab->myShare->myTablePath);
 
2436
        }
 
2437
        unlock_(gSystemTableList);
 
2438
        exit_();
 
2439
}
 
2440
 
 
2441
MSSystemTableShare *MSSystemTableShare::newTableShare(CSString *table_path)
 
2442
{
 
2443
        MSSystemTableShare *tab;
 
2444
 
 
2445
        enter_();
 
2446
        if (!(tab = new MSSystemTableShare())) {
 
2447
                table_path->release();
 
2448
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
2449
        }
 
2450
        push_(tab);
 
2451
        tab->myTablePath = table_path;
 
2452
        tab->mySysDatabase = MSDatabase::getDatabase(table_path->left("/", -1), true);
 
2453
        pop_(tab);
 
2454
        return_(tab);
 
2455
}
 
2456
 
 
2457
void PBMSSystemTables::systemTablesStartUp()
 
2458
{
 
2459
        MSCloudTable::startUp();
 
2460
        MSBackupTable::startUp();
 
2461
}
 
2462
 
 
2463
void PBMSSystemTables::systemTableShutDown()
 
2464
{
 
2465
        MSBackupTable::shutDown();
 
2466
        MSCloudTable::shutDown();
 
2467
}
 
2468