~posulliv/drizzle/optimizer-style-cleanup

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/backup_xt.cc

  • Committer: Padraig O'Sullivan
  • Date: 2010-04-17 01:38:47 UTC
  • mfrom: (1237.9.238 bad-staging)
  • Revision ID: osullivan.padraig@gmail.com-20100417013847-ibjioqsfbmf5yg4g
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2009-09-07   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 */
 
23
 
 
24
#include "xt_config.h"
 
25
 
 
26
#ifdef MYSQL_SUPPORTS_BACKUP
 
27
 
 
28
#include <string.h>
 
29
#include <stdio.h>
 
30
#include <stdlib.h>
 
31
#include <time.h>
 
32
#include <ctype.h>
 
33
 
 
34
#include "mysql_priv.h"
 
35
#include <backup/api_types.h>
 
36
#include <backup/backup_engine.h>
 
37
#include <backup/backup_aux.h>         // for build_table_list()
 
38
#include <hash.h>
 
39
 
 
40
#include "ha_pbxt.h"
 
41
 
 
42
#include "backup_xt.h"
 
43
#include "pthread_xt.h"
 
44
#include "filesys_xt.h"
 
45
#include "database_xt.h"
 
46
#include "strutil_xt.h"
 
47
#include "memory_xt.h"
 
48
#include "trace_xt.h"
 
49
#include "myxt_xt.h"
 
50
 
 
51
#ifdef OK
 
52
#undef OK
 
53
#endif
 
54
 
 
55
#ifdef byte
 
56
#undef byte
 
57
#endif
 
58
 
 
59
#ifdef DEBUG
 
60
//#define TRACE_BACKUP_CALLS
 
61
//#define TEST_SMALL_BLOCK                      100000
 
62
#endif
 
63
 
 
64
using backup::byte;
 
65
using backup::result_t;
 
66
using backup::version_t;
 
67
using backup::Table_list;
 
68
using backup::Table_ref;
 
69
using backup::Buffer;
 
70
 
 
71
#ifdef TRACE_BACKUP_CALLS
 
72
#define XT_TRACE_CALL()                         ha_trace_function(__FUNC__, NULL)
 
73
#else
 
74
#define XT_TRACE_CALL()
 
75
#endif
 
76
 
 
77
#define XT_RESTORE_BATCH_SIZE           10000
 
78
 
 
79
#define BUP_STATE_BEFORE_LOCK           0
 
80
#define BUP_STATE_AFTER_LOCK            1
 
81
 
 
82
#define BUP_STANDARD_VAR_RECORD         1
 
83
#define BUP_RECORD_BLOCK_4_START        2                       // Part of a record, with a 4 byte total length, and 4 byte data length
 
84
#define BUP_RECORD_BLOCK_4                      3                       // Part of a record, with a 4 byte length
 
85
#define BUP_RECORD_BLOCK_4_END          4                       // Last part of a record with a 4 byte length
 
86
 
 
87
/*
 
88
 * -----------------------------------------------------------------------
 
89
 * UTILITIES
 
90
 */
 
91
 
 
92
#ifdef TRACE_BACKUP_CALLS
 
93
static void ha_trace_function(const char *function, char *table)
 
94
{
 
95
        char            func_buf[50], *ptr;
 
96
        XTThreadPtr     thread = xt_get_self(); 
 
97
        
 
98
        if ((ptr = strchr(function, '('))) {
 
99
                ptr--;
 
100
                while (ptr > function) {
 
101
                        if (!(isalnum(*ptr) || *ptr == '_'))
 
102
                                break;
 
103
                        ptr--;
 
104
                }
 
105
                ptr++;
 
106
                xt_strcpy(50, func_buf, ptr);
 
107
                if ((ptr = strchr(func_buf, '(')))
 
108
                        *ptr = 0;
 
109
        }
 
110
        else
 
111
                xt_strcpy(50, func_buf, function);
 
112
        if (table)
 
113
                printf("%s %s (%s)\n", thread ? thread->t_name : "-unknown-", func_buf, table);
 
114
        else
 
115
                printf("%s %s\n", thread ? thread->t_name : "-unknown-", func_buf);
 
116
}
 
117
#endif
 
118
 
 
119
/*
 
120
 * -----------------------------------------------------------------------
 
121
 * BACKUP DRIVER
 
122
 */
 
123
 
 
124
class PBXTBackupDriver: public Backup_driver
 
125
{
 
126
        public:
 
127
        PBXTBackupDriver(const Table_list &);
 
128
        virtual ~PBXTBackupDriver();
 
129
 
 
130
        virtual size_t          size();
 
131
        virtual size_t          init_size();
 
132
        virtual result_t        begin(const size_t);
 
133
        virtual result_t        end();
 
134
        virtual result_t        get_data(Buffer &);
 
135
        virtual result_t        prelock();
 
136
        virtual result_t        lock();
 
137
        virtual result_t        unlock();
 
138
        virtual result_t        cancel();
 
139
        virtual void            free();
 
140
        void                            lock_tables_TL_READ_NO_INSERT();
 
141
 
 
142
        private:
 
143
        XTThreadPtr             bd_thread;
 
144
        int                             bd_state;
 
145
        u_int                   bd_table_no;
 
146
        XTOpenTablePtr  bd_ot;
 
147
        xtWord1                 *bd_row_buf;
 
148
 
 
149
        /* Non-zero if we last returned only part of
 
150
         * a row.
 
151
         */
 
152
        xtWord1                 *db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *size, xtWord4 row_len);
 
153
        xtWord1                 *db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *size, xtWord4 total_len, xtWord4 row_len);
 
154
 
 
155
        xtWord4                 bd_row_offset;
 
156
        xtWord4                 bd_row_size;
 
157
};
 
158
 
 
159
 
 
160
PBXTBackupDriver::PBXTBackupDriver(const Table_list &tables):
 
161
Backup_driver(tables),
 
162
bd_state(BUP_STATE_BEFORE_LOCK),
 
163
bd_table_no(0),
 
164
bd_ot(NULL),
 
165
bd_row_buf(NULL),
 
166
bd_row_offset(0),
 
167
bd_row_size(0)
 
168
{
 
169
}
 
170
 
 
171
PBXTBackupDriver::~PBXTBackupDriver()
 
172
{
 
173
}
 
174
 
 
175
/** Estimates total size of backup. @todo improve it */
 
176
size_t PBXTBackupDriver::size()
 
177
{
 
178
        XT_TRACE_CALL();
 
179
        return UNKNOWN_SIZE;
 
180
}
 
181
 
 
182
/** Estimates size of backup before lock. @todo improve it */
 
183
size_t PBXTBackupDriver::init_size()
 
184
{
 
185
        XT_TRACE_CALL();
 
186
        return 0;
 
187
}
 
188
 
 
189
result_t PBXTBackupDriver::begin(const size_t)
 
190
{
 
191
        THD                             *thd = current_thd;
 
192
        XTExceptionRec  e;
 
193
 
 
194
        XT_TRACE_CALL();
 
195
        
 
196
        if (!(bd_thread = xt_ha_set_current_thread(thd, &e))) {
 
197
                xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
 
198
                return backup::ERROR;
 
199
        }
 
200
        
 
201
        return backup::OK;
 
202
}
 
203
 
 
204
result_t PBXTBackupDriver::end()
 
205
{
 
206
        XT_TRACE_CALL();
 
207
        if (bd_ot) {
 
208
                xt_tab_seq_exit(bd_ot);
 
209
                xt_db_return_table_to_pool_ns(bd_ot);
 
210
                bd_ot = NULL;
 
211
        }
 
212
        if (bd_thread->st_xact_data) {
 
213
                if (!xt_xn_commit(bd_thread))
 
214
                        return backup::ERROR;
 
215
        }
 
216
        return backup::OK;
 
217
}
 
218
 
 
219
xtWord1 *PBXTBackupDriver::db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *ret_size, xtWord4 row_len)
 
220
{
 
221
        register size_t size = *ret_size;
 
222
 
 
223
        *buffer = bup_type;     // Record type identifier.
 
224
        buffer++;
 
225
        size--;
 
226
        memcpy(buffer, bd_ot->ot_row_wbuffer, row_len);
 
227
        buffer += row_len;
 
228
        size -= row_len;
 
229
        *ret_size = size;
 
230
        return buffer;
 
231
}
 
232
 
 
233
xtWord1 *PBXTBackupDriver::db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *ret_size, xtWord4 total_len, xtWord4 row_len)
 
234
{
 
235
        register size_t size = *ret_size;
 
236
 
 
237
        *buffer = bup_type;     // Record type identifier.
 
238
        buffer++;
 
239
        size--;
 
240
        if (bup_type == BUP_RECORD_BLOCK_4_START) {
 
241
                XT_SET_DISK_4(buffer, total_len);
 
242
                buffer += 4;
 
243
                size -= 4;
 
244
        }
 
245
        XT_SET_DISK_4(buffer, row_len);
 
246
        buffer += 4;
 
247
        size -= 4;
 
248
        memcpy(buffer, bd_ot->ot_row_wbuffer+bd_row_offset, row_len);
 
249
        buffer += row_len;
 
250
        size -= row_len;
 
251
        bd_row_size -= row_len;
 
252
        bd_row_offset += row_len;
 
253
        *ret_size = size;
 
254
        return buffer;
 
255
}
 
256
 
 
257
result_t PBXTBackupDriver::get_data(Buffer &buf)
 
258
{
 
259
        xtBool  eof = FALSE;
 
260
        size_t  size;
 
261
        xtWord4 row_len;
 
262
        xtWord1 *buffer;
 
263
 
 
264
        XT_TRACE_CALL();
 
265
 
 
266
        if (bd_state == BUP_STATE_BEFORE_LOCK) {
 
267
                buf.table_num = 0;
 
268
                buf.size = 0;
 
269
                buf.last = FALSE;
 
270
                return backup::READY;
 
271
        }
 
272
 
 
273
        /* Open the backup table: */
 
274
        if (!bd_ot) {
 
275
                XTThreadPtr             self = bd_thread;
 
276
                XTTableHPtr             tab;
 
277
                char                    path[PATH_MAX];
 
278
        
 
279
                if (bd_table_no == m_tables.count()) {
 
280
                        buf.size = 0;
 
281
                        buf.table_num = 0;
 
282
                        buf.last = TRUE;
 
283
                        return backup::DONE;
 
284
                }
 
285
                
 
286
                m_tables[bd_table_no].internal_name(path, sizeof(path));
 
287
                bd_table_no++;
 
288
                try_(a) {
 
289
                        xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
 
290
                        tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE);
 
291
                        pushr_(xt_heap_release, tab);
 
292
                        if (!(bd_ot = xt_db_open_table_using_tab(tab, bd_thread)))
 
293
                                xt_throw(self);
 
294
                        freer_(); // xt_heap_release(tab)
 
295
 
 
296
                        /* Prepare the seqential scan: */
 
297
                        xt_tab_seq_exit(bd_ot);
 
298
                        if (!xt_tab_seq_init(bd_ot))
 
299
                                xt_throw(self);
 
300
                        
 
301
                        if (bd_row_buf) {
 
302
                                xt_free(self, bd_row_buf);
 
303
                                bd_row_buf = NULL;
 
304
                        }
 
305
                        bd_row_buf = (xtWord1 *) xt_malloc(self, bd_ot->ot_table->tab_dic.dic_mysql_buf_size);
 
306
                        bd_ot->ot_cols_req = bd_ot->ot_table->tab_dic.dic_no_of_cols;
 
307
                }
 
308
                catch_(a) {
 
309
                        ;
 
310
                }
 
311
                cont_(a);
 
312
 
 
313
                if (!bd_ot)
 
314
                        goto failed;
 
315
        }
 
316
 
 
317
        buf.table_num = bd_table_no;
 
318
#ifdef TEST_SMALL_BLOCK
 
319
        buf.size = TEST_SMALL_BLOCK;
 
320
#endif
 
321
        size = buf.size;
 
322
        buffer = (xtWord1 *) buf.data;
 
323
        ASSERT_NS(size > 9);
 
324
 
 
325
        /* First check of a record was partically written
 
326
         * last time.
 
327
         */
 
328
        write_row:
 
329
        if (bd_row_size > 0) {
 
330
                row_len = bd_row_size;
 
331
                if (bd_row_offset == 0) {
 
332
                        if (row_len+1 > size) {
 
333
                                ASSERT_NS(size > 9);
 
334
                                row_len = size - 9;
 
335
                                buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4_START, &size, bd_row_size, row_len);
 
336
                                goto done;
 
337
                        }
 
338
                        buffer = db_write_block(buffer, BUP_STANDARD_VAR_RECORD, &size, row_len);
 
339
                        bd_row_size = 0;
 
340
                }
 
341
                else {
 
342
                        if (row_len+5 > size) {
 
343
                                row_len = size - 5;
 
344
                                buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4, &size, 0, row_len);
 
345
                                goto done;
 
346
                        }
 
347
                        buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4_END, &size, 0, row_len);
 
348
                }
 
349
        }
 
350
 
 
351
        /* Now continue with the sequential scan. */
 
352
        while (size > 1) {
 
353
                if (!xt_tab_seq_next(bd_ot, bd_row_buf, &eof))
 
354
                        goto failed;
 
355
                if (eof) {
 
356
                        /* We will go the next table, on the next call. */
 
357
                        xt_tab_seq_exit(bd_ot);
 
358
                        xt_db_return_table_to_pool_ns(bd_ot);
 
359
                        bd_ot = NULL;
 
360
                        break;
 
361
                }
 
362
                if (!(row_len = myxt_store_row_data(bd_ot, 0, (char *) bd_row_buf)))
 
363
                        goto failed;
 
364
                if (row_len+1 > size) {
 
365
                        /* Does not fit: */
 
366
                        bd_row_offset = 0;
 
367
                        bd_row_size = row_len;
 
368
                        /* Only add part of the row, if there is still
 
369
                         * quite a bit of space left:
 
370
                         */
 
371
                        if (size >= (32 * 1024))
 
372
                                goto write_row;
 
373
                        break;
 
374
                }
 
375
                buffer = db_write_block(buffer, BUP_STANDARD_VAR_RECORD, &size, row_len);
 
376
        }
 
377
 
 
378
        done:
 
379
        buf.size = buf.size - size;
 
380
        /* This indicates wnd of data for a table! */
 
381
    buf.last = eof;
 
382
 
 
383
        return backup::OK;
 
384
 
 
385
        failed:
 
386
        xt_log_and_clear_exception(bd_thread);
 
387
        return backup::ERROR;
 
388
}
 
389
 
 
390
result_t PBXTBackupDriver::prelock()
 
391
{
 
392
        XT_TRACE_CALL();
 
393
        return backup::READY;
 
394
}
 
395
 
 
396
result_t PBXTBackupDriver::lock()
 
397
{
 
398
        XT_TRACE_CALL();
 
399
        bd_thread->st_xact_mode = XT_XACT_COMMITTED_READ;
 
400
        bd_thread->st_ignore_fkeys = FALSE;
 
401
        bd_thread->st_auto_commit = FALSE;
 
402
        bd_thread->st_table_trans = FALSE;
 
403
        bd_thread->st_abort_trans = FALSE;
 
404
        bd_thread->st_stat_ended = FALSE;
 
405
        bd_thread->st_stat_trans = FALSE;
 
406
        bd_thread->st_is_update = NULL;
 
407
        if (!xt_xn_begin(bd_thread))
 
408
                return backup::ERROR;
 
409
        bd_state = BUP_STATE_AFTER_LOCK;
 
410
        return backup::OK;
 
411
}
 
412
 
 
413
result_t PBXTBackupDriver::unlock()
 
414
{
 
415
        XT_TRACE_CALL();
 
416
        return backup::OK;
 
417
}
 
418
 
 
419
result_t PBXTBackupDriver::cancel()
 
420
{
 
421
        XT_TRACE_CALL();
 
422
        return backup::OK; // free() will be called and suffice
 
423
}
 
424
 
 
425
void PBXTBackupDriver::free()
 
426
{
 
427
        XT_TRACE_CALL();
 
428
        if (bd_ot) {
 
429
                xt_tab_seq_exit(bd_ot);
 
430
                xt_db_return_table_to_pool_ns(bd_ot);
 
431
                bd_ot = NULL;
 
432
        }
 
433
        if (bd_row_buf) {
 
434
                xt_free_ns(bd_row_buf);
 
435
                bd_row_buf = NULL;
 
436
        }
 
437
        if (bd_thread->st_xact_data)
 
438
                xt_xn_rollback(bd_thread);
 
439
        delete this;
 
440
}
 
441
 
 
442
void PBXTBackupDriver::lock_tables_TL_READ_NO_INSERT()
 
443
{
 
444
        XT_TRACE_CALL();
 
445
}
 
446
 
 
447
/*
 
448
 * -----------------------------------------------------------------------
 
449
 * BACKUP DRIVER
 
450
 */
 
451
 
 
452
class PBXTRestoreDriver: public Restore_driver
 
453
{
 
454
        public:
 
455
        PBXTRestoreDriver(const Table_list &tables);
 
456
        virtual ~PBXTRestoreDriver();
 
457
 
 
458
        virtual result_t  begin(const size_t);
 
459
        virtual result_t  end();
 
460
        virtual result_t  send_data(Buffer &buf);
 
461
        virtual result_t  cancel();
 
462
        virtual void      free();
 
463
        
 
464
        private:
 
465
        XTThreadPtr             rd_thread;
 
466
        u_int                   rd_table_no;
 
467
        XTOpenTablePtr  rd_ot;
 
468
        STRUCT_TABLE    *rd_my_table;
 
469
        xtWord1                 *rb_row_buf;
 
470
        u_int                   rb_col_cnt;
 
471
        u_int                   rb_insert_count;
 
472
 
 
473
        /* Long rows are accumulated here: */
 
474
        xtWord4                 rb_row_len;
 
475
        xtWord4                 rb_data_size;
 
476
        xtWord1                 *rb_row_data;
 
477
};
 
478
 
 
479
PBXTRestoreDriver::PBXTRestoreDriver(const Table_list &tables):
 
480
Restore_driver(tables),
 
481
rd_thread(NULL),
 
482
rd_table_no(0),
 
483
rd_ot(NULL),
 
484
rb_row_buf(NULL),
 
485
rb_row_len(0),
 
486
rb_data_size(0),
 
487
rb_row_data(NULL)
 
488
{
 
489
}
 
490
 
 
491
PBXTRestoreDriver::~PBXTRestoreDriver()
 
492
{
 
493
}
 
494
 
 
495
result_t PBXTRestoreDriver::begin(const size_t)
 
496
{
 
497
        THD                             *thd = current_thd;
 
498
        XTExceptionRec  e;
 
499
        
 
500
        XT_TRACE_CALL();
 
501
        
 
502
        if (!(rd_thread = xt_ha_set_current_thread(thd, &e))) {
 
503
                xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
 
504
                return backup::ERROR;
 
505
        }
 
506
        
 
507
        return backup::OK;
 
508
}
 
509
 
 
510
result_t PBXTRestoreDriver::end()
 
511
{
 
512
        XT_TRACE_CALL();
 
513
        if (rd_ot) {
 
514
                xt_db_return_table_to_pool_ns(rd_ot);
 
515
                rd_ot = NULL;
 
516
        }
 
517
        //if (rb_row_buf) {
 
518
        //      xt_free_ns(rb_row_buf);
 
519
        //      rb_row_buf = NULL;
 
520
        //}
 
521
        if (rb_row_data) {
 
522
                xt_free_ns(rb_row_data);
 
523
                rb_row_data = NULL;
 
524
        }
 
525
        if (rd_thread->st_xact_data) {
 
526
                if (!xt_xn_commit(rd_thread))
 
527
                        return backup::ERROR;
 
528
        }
 
529
        return backup::OK;
 
530
}
 
531
 
 
532
 
 
533
result_t PBXTRestoreDriver::send_data(Buffer &buf)
 
534
{
 
535
        size_t  size;
 
536
        xtWord1 type;
 
537
        xtWord1 *buffer;
 
538
        xtWord4 row_len;
 
539
        xtWord1 *rec_data;
 
540
 
 
541
        XT_TRACE_CALL();
 
542
 
 
543
        if (buf.table_num != rd_table_no) {
 
544
                XTThreadPtr             self = rd_thread;
 
545
                XTTableHPtr             tab;
 
546
                char                    path[PATH_MAX];
 
547
                
 
548
                if (rd_ot) {
 
549
                        xt_db_return_table_to_pool_ns(rd_ot);
 
550
                        rd_ot = NULL;
 
551
                }
 
552
 
 
553
                if (rd_thread->st_xact_data) {
 
554
                        if (!xt_xn_commit(rd_thread))
 
555
                                goto failed;
 
556
                }
 
557
                if (!xt_xn_begin(rd_thread))
 
558
                        goto failed;
 
559
                rb_insert_count = 0;
 
560
                
 
561
                rd_table_no = buf.table_num;
 
562
                m_tables[rd_table_no-1].internal_name(path, sizeof(path));
 
563
                try_(a) {
 
564
                        xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
 
565
                        tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE);
 
566
                        pushr_(xt_heap_release, tab);
 
567
                        if (!(rd_ot = xt_db_open_table_using_tab(tab, rd_thread)))
 
568
                                xt_throw(self);
 
569
                        freer_(); // xt_heap_release(tab)
 
570
 
 
571
                        rd_my_table = rd_ot->ot_table->tab_dic.dic_my_table;
 
572
                        if (rd_my_table->found_next_number_field) {
 
573
                                rd_my_table->in_use = current_thd;
 
574
                                rd_my_table->next_number_field = rd_my_table->found_next_number_field;
 
575
                                rd_my_table->mark_columns_used_by_index_no_reset(rd_my_table->s->next_number_index, rd_my_table->read_set);
 
576
                        }
 
577
 
 
578
                        /* This is safe because only one thread can restore a table at 
 
579
                         * a time!
 
580
                         */
 
581
                        rb_row_buf = (xtWord1 *) rd_my_table->record[0];
 
582
                        //if (rb_row_buf) {
 
583
                        //      xt_free(self, rb_row_buf);
 
584
                        //      rb_row_buf = NULL;
 
585
                        //}
 
586
                        //rb_row_buf = (xtWord1 *) xt_malloc(self, rd_ot->ot_table->tab_dic.dic_mysql_buf_size);
 
587
        
 
588
                        rb_col_cnt = rd_ot->ot_table->tab_dic.dic_no_of_cols;
 
589
 
 
590
                }
 
591
                catch_(a) {
 
592
                        ;
 
593
                }
 
594
                cont_(a);
 
595
                
 
596
                if (!rd_ot)
 
597
                        goto failed;
 
598
        }
 
599
 
 
600
        buffer = (xtWord1 *) buf.data;
 
601
        size = buf.size;
 
602
 
 
603
        while (size > 0) {
 
604
                type = *buffer;
 
605
                switch (type) {
 
606
                        case BUP_STANDARD_VAR_RECORD:
 
607
                                rec_data = buffer + 1;
 
608
                                break;
 
609
                        case BUP_RECORD_BLOCK_4_START:
 
610
                                buffer++;
 
611
                                row_len = XT_GET_DISK_4(buffer);
 
612
                                buffer += 4;
 
613
                                if (rb_data_size < row_len) {
 
614
                                        if (!xt_realloc_ns((void **) &rb_row_data, row_len))
 
615
                                                goto failed;
 
616
                                        rb_data_size = row_len;
 
617
                                }
 
618
                                row_len = XT_GET_DISK_4(buffer);
 
619
                                buffer += 4;
 
620
                                ASSERT_NS(row_len <= rb_data_size);
 
621
                                if (row_len > rb_data_size) {
 
622
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
623
                                        goto failed;
 
624
                                }
 
625
                                memcpy(rb_row_data, buffer, row_len);
 
626
                                rb_row_len = row_len;
 
627
                                buffer += row_len;
 
628
                                if (row_len + 9 > size) {
 
629
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
630
                                        goto failed;
 
631
                                }
 
632
                                size -= row_len + 9;
 
633
                                continue;
 
634
                        case BUP_RECORD_BLOCK_4:
 
635
                                buffer++;
 
636
                                row_len = XT_GET_DISK_4(buffer);
 
637
                                buffer += 4;
 
638
                                ASSERT_NS(rb_row_len + row_len <= rb_data_size);
 
639
                                if (rb_row_len + row_len > rb_data_size) {
 
640
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
641
                                        goto failed;
 
642
                                }
 
643
                                memcpy(rb_row_data + rb_row_len, buffer, row_len);
 
644
                                rb_row_len += row_len;
 
645
                                buffer += row_len;
 
646
                                if (row_len + 5 > size) {
 
647
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
648
                                        goto failed;
 
649
                                }
 
650
                                size -= row_len + 5;
 
651
                                continue;
 
652
                        case BUP_RECORD_BLOCK_4_END:
 
653
                                buffer++;
 
654
                                row_len = XT_GET_DISK_4(buffer);
 
655
                                buffer += 4;
 
656
                                ASSERT_NS(rb_row_len + row_len <= rb_data_size);
 
657
                                if (rb_row_len + row_len > rb_data_size) {
 
658
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
659
                                        goto failed;
 
660
                                }
 
661
                                memcpy(rb_row_data + rb_row_len, buffer, row_len);
 
662
                                buffer += row_len;
 
663
                                if (row_len + 5 > size) {
 
664
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
665
                                        goto failed;
 
666
                                }
 
667
                                size -= row_len + 5;
 
668
                                rec_data = rb_row_data;
 
669
                                break;
 
670
                        default:
 
671
                                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
672
                                goto failed;
 
673
                }
 
674
                
 
675
                if (!(row_len = myxt_load_row_data(rd_ot, rec_data, rb_row_buf, rb_col_cnt)))
 
676
                        goto failed;
 
677
 
 
678
                if (rd_ot->ot_table->tab_dic.dic_my_table->found_next_number_field)
 
679
                        ha_set_auto_increment(rd_ot, rd_ot->ot_table->tab_dic.dic_my_table->found_next_number_field);
 
680
 
 
681
                if (!xt_tab_new_record(rd_ot, rb_row_buf))
 
682
                        goto failed;
 
683
 
 
684
                if (type == BUP_STANDARD_VAR_RECORD) {
 
685
                        buffer += row_len+1;
 
686
                        if (row_len + 1 > size) {
 
687
                                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
 
688
                                goto failed;
 
689
                        }
 
690
                        size -= row_len + 1;
 
691
                }
 
692
 
 
693
                rb_insert_count++;
 
694
                if (rb_insert_count == XT_RESTORE_BATCH_SIZE) {
 
695
                        if (!xt_xn_commit(rd_thread))
 
696
                                goto failed;
 
697
                        if (!xt_xn_begin(rd_thread))
 
698
                                goto failed;
 
699
                        rb_insert_count = 0;
 
700
                }
 
701
        }
 
702
 
 
703
        return backup::OK;
 
704
        
 
705
        failed:
 
706
        xt_log_and_clear_exception(rd_thread);
 
707
        return backup::ERROR;
 
708
}
 
709
 
 
710
 
 
711
result_t PBXTRestoreDriver::cancel()
 
712
{
 
713
        XT_TRACE_CALL();
 
714
        /* Nothing to do in cancel(); free() will suffice */
 
715
        return backup::OK;
 
716
}
 
717
 
 
718
void PBXTRestoreDriver::free()
 
719
{
 
720
        XT_TRACE_CALL();
 
721
        if (rd_ot) {
 
722
                xt_db_return_table_to_pool_ns(rd_ot);
 
723
                rd_ot = NULL;
 
724
        }
 
725
        //if (rb_row_buf) {
 
726
        //      xt_free_ns(rb_row_buf);
 
727
        //      rb_row_buf = NULL;
 
728
        //}
 
729
        if (rb_row_data) {
 
730
                xt_free_ns(rb_row_data);
 
731
                rb_row_data = NULL;
 
732
        }
 
733
        if (rd_thread->st_xact_data)
 
734
                xt_xn_rollback(rd_thread);
 
735
        delete this;
 
736
}
 
737
 
 
738
/*
 
739
 * -----------------------------------------------------------------------
 
740
 * BACKUP ENGINE FACTORY
 
741
 */
 
742
 
 
743
#define PBXT_BACKUP_VERSION 1
 
744
 
 
745
 
 
746
class PBXTBackupEngine: public Backup_engine
 
747
{
 
748
        public:
 
749
        PBXTBackupEngine() { };
 
750
 
 
751
        virtual version_t version() const {
 
752
                return PBXT_BACKUP_VERSION;
 
753
        };
 
754
 
 
755
        virtual result_t get_backup(const uint32, const Table_list &, Backup_driver* &);
 
756
 
 
757
        virtual result_t get_restore(const version_t, const uint32, const Table_list &,Restore_driver* &);
 
758
 
 
759
        virtual void free()
 
760
        {
 
761
                delete this;
 
762
        }
 
763
};
 
764
 
 
765
result_t PBXTBackupEngine::get_backup(const u_int count, const Table_list &tables, Backup_driver* &drv)
 
766
{
 
767
        PBXTBackupDriver *ptr = new PBXTBackupDriver(tables);
 
768
 
 
769
        if (!ptr)
 
770
                return backup::ERROR;
 
771
        drv = ptr;
 
772
        return backup::OK;
 
773
}
 
774
 
 
775
result_t PBXTBackupEngine::get_restore(const version_t ver, const uint32,
 
776
                             const Table_list &tables, Restore_driver* &drv)
 
777
{
 
778
        if (ver > PBXT_BACKUP_VERSION)
 
779
        {
 
780
                return backup::ERROR;    
 
781
        }
 
782
        
 
783
        PBXTRestoreDriver *ptr = new PBXTRestoreDriver(tables);
 
784
 
 
785
        if (!ptr)
 
786
                return backup::ERROR;
 
787
        drv = (Restore_driver *) ptr;
 
788
        return backup::OK;
 
789
}
 
790
 
 
791
 
 
792
Backup_result_t pbxt_backup_engine(handlerton *self, Backup_engine* &be)
 
793
{
 
794
        be = new PBXTBackupEngine();
 
795
        
 
796
        if (!be)
 
797
                return backup::ERROR;
 
798
        
 
799
        return backup::OK;
 
800
}
 
801
 
 
802
#endif