~jlukas79/+junk/mysql-server

« back to all changes in this revision

Viewing changes to storage/maria/ma_recovery.c

manual merge 6.0-main --> 6.0-bka-review

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2006, 2007 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  WL#3072 Maria recovery
 
18
  First version written by Guilhem Bichot on 2006-04-27.
 
19
*/
 
20
 
 
21
/* Here is the implementation of this module */
 
22
 
 
23
#include "maria_def.h"
 
24
#include "ma_recovery.h"
 
25
#include "ma_blockrec.h"
 
26
#include "ma_checkpoint.h"
 
27
#include "trnman.h"
 
28
#include "ma_key_recover.h"
 
29
#include "ma_recovery_util.h"
 
30
 
 
31
struct st_trn_for_recovery /* used only in the REDO phase */
 
32
{
 
33
  LSN group_start_lsn, undo_lsn, first_undo_lsn;
 
34
  TrID long_trid;
 
35
};
 
36
struct st_table_for_recovery /* used in the REDO and UNDO phase */
 
37
{
 
38
  MARIA_HA *info;
 
39
};
 
40
/* Variables used by all functions of this module. Ok as single-threaded */
 
41
static struct st_trn_for_recovery *all_active_trans;
 
42
static struct st_table_for_recovery *all_tables;
 
43
static struct st_dirty_page *dirty_pages_pool;
 
44
static LSN current_group_end_lsn;
 
45
#ifndef DBUG_OFF
 
46
/** Current group of REDOs is about this table and only this one */
 
47
static MARIA_HA *current_group_table;
 
48
#endif
 
49
static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
 
50
static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
 
51
/** @brief to avoid writing a checkpoint if recovery did nothing. */
 
52
static my_bool checkpoint_useful;
 
53
static my_bool in_redo_phase;
 
54
static my_bool trns_created;
 
55
static ulong skipped_undo_phase;
 
56
static ulonglong now; /**< for tracking execution time of phases */
 
57
static void (*save_error_handler_hook)(uint, const char *,myf);
 
58
static uint recovery_warnings; /**< count of warnings */
 
59
 
 
60
#define prototype_redo_exec_hook(R)                                          \
 
61
  static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
 
62
 
 
63
#define prototype_redo_exec_hook_dummy(R)                                    \
 
64
  static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec        \
 
65
                               __attribute__ ((unused)))
 
66
 
 
67
#define prototype_undo_exec_hook(R)                                          \
 
68
  static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)
 
69
 
 
70
prototype_redo_exec_hook(LONG_TRANSACTION_ID);
 
71
prototype_redo_exec_hook_dummy(CHECKPOINT);
 
72
prototype_redo_exec_hook(REDO_CREATE_TABLE);
 
73
prototype_redo_exec_hook(REDO_RENAME_TABLE);
 
74
prototype_redo_exec_hook(REDO_REPAIR_TABLE);
 
75
prototype_redo_exec_hook(REDO_DROP_TABLE);
 
76
prototype_redo_exec_hook(FILE_ID);
 
77
prototype_redo_exec_hook(INCOMPLETE_LOG);
 
78
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
 
79
prototype_redo_exec_hook(UNDO_BULK_INSERT);
 
80
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
 
81
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
 
82
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
 
83
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
 
84
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
 
85
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
 
86
prototype_redo_exec_hook(REDO_FREE_BLOCKS);
 
87
prototype_redo_exec_hook(REDO_DELETE_ALL);
 
88
prototype_redo_exec_hook(REDO_INDEX);
 
89
prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE);
 
90
prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE);
 
91
prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
 
92
prototype_redo_exec_hook(UNDO_ROW_INSERT);
 
93
prototype_redo_exec_hook(UNDO_ROW_DELETE);
 
94
prototype_redo_exec_hook(UNDO_ROW_UPDATE);
 
95
prototype_redo_exec_hook(UNDO_KEY_INSERT);
 
96
prototype_redo_exec_hook(UNDO_KEY_DELETE);
 
97
prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
 
98
prototype_redo_exec_hook(COMMIT);
 
99
prototype_redo_exec_hook(CLR_END);
 
100
prototype_undo_exec_hook(UNDO_ROW_INSERT);
 
101
prototype_undo_exec_hook(UNDO_ROW_DELETE);
 
102
prototype_undo_exec_hook(UNDO_ROW_UPDATE);
 
103
prototype_undo_exec_hook(UNDO_KEY_INSERT);
 
104
prototype_undo_exec_hook(UNDO_KEY_DELETE);
 
105
prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
 
106
prototype_undo_exec_hook(UNDO_BULK_INSERT);
 
107
 
 
108
static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply);
 
109
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
 
110
static int run_undo_phase(uint uncommitted);
 
111
static void display_record_position(const LOG_DESC *log_desc,
 
112
                                    const TRANSLOG_HEADER_BUFFER *rec,
 
113
                                    uint number);
 
114
static int display_and_apply_record(const LOG_DESC *log_desc,
 
115
                                    const TRANSLOG_HEADER_BUFFER *rec);
 
116
static MARIA_HA *get_MARIA_HA_from_REDO_record(const
 
117
                                               TRANSLOG_HEADER_BUFFER *rec);
 
118
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
 
119
                                               TRANSLOG_HEADER_BUFFER *rec);
 
120
static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon);
 
121
static LSN parse_checkpoint_record(LSN lsn);
 
122
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
 
123
                            LSN first_undo_lsn);
 
124
static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id);
 
125
static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
 
126
                    struct st_dirty_page *dirty_page);
 
127
static int close_all_tables(void);
 
128
static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
 
129
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
 
130
static void delete_all_transactions();
 
131
 
 
132
/** @brief global [out] buffer for translog_read_record(); never shrinks */
 
133
static struct
 
134
{
 
135
  /*
 
136
    uchar* is more adapted (less casts) than char*, thus we don't use
 
137
    LEX_STRING.
 
138
  */
 
139
  uchar *str;
 
140
  size_t length;
 
141
} log_record_buffer;
 
142
static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec)
 
143
{
 
144
  if (log_record_buffer.length < rec->record_length)
 
145
  {
 
146
    log_record_buffer.length= rec->record_length;
 
147
    log_record_buffer.str= my_realloc(log_record_buffer.str,
 
148
                                      rec->record_length,
 
149
                                      MYF(MY_WME | MY_ALLOW_ZERO_PTR));
 
150
  }
 
151
}
 
152
/** @brief Tells what kind of progress message was printed to the error log */
 
153
static enum recovery_message_type
 
154
{
 
155
  REC_MSG_NONE= 0, REC_MSG_REDO, REC_MSG_UNDO, REC_MSG_FLUSH
 
156
} recovery_message_printed;
 
157
 
 
158
 
 
159
/* Hook to ensure we get nicer output if we get an error */
 
160
 
 
161
void maria_recover_error_handler_hook(uint error, const char *str,
 
162
                                      myf flags)
 
163
{
 
164
  if (procent_printed)
 
165
  {
 
166
    procent_printed= 0;
 
167
    fputc('\n', stderr);
 
168
    fflush(stderr);
 
169
  }
 
170
  (*save_error_handler_hook)(error, str, flags);
 
171
}
 
172
 
 
173
#define ALERT_USER() DBUG_ASSERT(0)
 
174
 
 
175
static void print_preamble()
 
176
{
 
177
  ma_message_no_user(ME_JUST_INFO, "starting recovery");
 
178
}
 
179
 
 
180
 
 
181
/**
 
182
   @brief Recovers from the last checkpoint.
 
183
 
 
184
   Runs the REDO phase using special structures, then sets up the playground
 
185
   of runtime: recreates transactions inside trnman, open tables with their
 
186
   two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
 
187
   tables.
 
188
 
 
189
   @return Operation status
 
190
     @retval 0      OK
 
191
     @retval !=0    Error
 
192
*/
 
193
 
 
194
int maria_recovery_from_log(void)
 
195
{
 
196
  int res= 1;
 
197
  FILE *trace_file;
 
198
  uint warnings_count;
 
199
  DBUG_ENTER("maria_recovery_from_log");
 
200
 
 
201
  DBUG_ASSERT(!maria_in_recovery);
 
202
  maria_in_recovery= TRUE;
 
203
 
 
204
#ifdef EXTRA_DEBUG
 
205
  trace_file= fopen("maria_recovery.trace", "a+");
 
206
#else
 
207
  trace_file= NULL; /* no trace file for being fast */
 
208
#endif
 
209
  tprint(trace_file, "TRACE of the last MARIA recovery from mysqld\n");
 
210
  DBUG_ASSERT(maria_pagecache->inited);
 
211
  res= maria_apply_log(LSN_IMPOSSIBLE, MARIA_LOG_APPLY, trace_file,
 
212
                       TRUE, TRUE, TRUE, &warnings_count);
 
213
  if (!res)
 
214
  {
 
215
    if (warnings_count == 0)
 
216
      tprint(trace_file, "SUCCESS\n");
 
217
    else
 
218
      tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n",
 
219
             warnings_count);
 
220
  }
 
221
  if (trace_file)
 
222
    fclose(trace_file);
 
223
  maria_in_recovery= FALSE;
 
224
  DBUG_RETURN(res);
 
225
}
 
226
 
 
227
 
 
228
/**
 
229
   @brief Displays and/or applies the log
 
230
 
 
231
   @param  from_lsn        LSN from which log reading/applying should start;
 
232
                           LSN_IMPOSSIBLE means "use last checkpoint"
 
233
   @param  apply           how log records should be applied or not
 
234
   @param  trace_file      trace file where progress/debug messages will go
 
235
   @param  skip_DDLs_arg   Should DDL records (CREATE/RENAME/DROP/REPAIR)
 
236
                           be skipped by the REDO phase or not
 
237
   @param  take_checkpoints Should we take checkpoints or not.
 
238
   @param[out] warnings_count Count of warnings will be put there
 
239
 
 
240
   @todo This trace_file thing is primitive; soon we will make it similar to
 
241
   ma_check_print_warning() etc, and a successful recovery does not need to
 
242
   create a trace file. But for debugging now it is useful.
 
243
 
 
244
   @return Operation status
 
245
     @retval 0      OK
 
246
     @retval !=0    Error
 
247
*/
 
248
 
 
249
int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
 
250
                    FILE *trace_file,
 
251
                    my_bool should_run_undo_phase, my_bool skip_DDLs_arg,
 
252
                    my_bool take_checkpoints, uint *warnings_count)
 
253
{
 
254
  int error= 0;
 
255
  uint uncommitted_trans;
 
256
  ulonglong old_now;
 
257
  DBUG_ENTER("maria_apply_log");
 
258
 
 
259
  DBUG_ASSERT(apply == MARIA_LOG_APPLY || !should_run_undo_phase);
 
260
  DBUG_ASSERT(!maria_multi_threaded);
 
261
  recovery_warnings= 0;
 
262
  /* checkpoints can happen only if TRNs have been built */
 
263
  DBUG_ASSERT(should_run_undo_phase || !take_checkpoints);
 
264
  all_active_trans= (struct st_trn_for_recovery *)
 
265
    my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
 
266
              MYF(MY_ZEROFILL));
 
267
  all_tables= (struct st_table_for_recovery *)
 
268
    my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
 
269
              MYF(MY_ZEROFILL));
 
270
 
 
271
  save_error_handler_hook= error_handler_hook;
 
272
  error_handler_hook= maria_recover_error_handler_hook;
 
273
 
 
274
  if (!all_active_trans || !all_tables)
 
275
    goto err;
 
276
 
 
277
  if (take_checkpoints && ma_checkpoint_init(0))
 
278
    goto err;
 
279
 
 
280
  recovery_message_printed= REC_MSG_NONE;
 
281
  checkpoint_useful= trns_created= FALSE;
 
282
  tracef= trace_file;
 
283
#ifdef INSTANT_FLUSH_OF_MESSAGES
 
284
  /* enable this for instant flush of messages to trace file */
 
285
  setbuf(tracef, NULL);
 
286
#endif
 
287
  skip_DDLs= skip_DDLs_arg;
 
288
  skipped_undo_phase= 0;
 
289
 
 
290
  if (from_lsn == LSN_IMPOSSIBLE)
 
291
  {
 
292
    if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
 
293
    {
 
294
      from_lsn= translog_first_lsn_in_log();
 
295
      if (unlikely(from_lsn == LSN_ERROR))
 
296
        goto err;
 
297
    }
 
298
    else
 
299
    {
 
300
      from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
 
301
      if (from_lsn == LSN_ERROR)
 
302
        goto err;
 
303
    }
 
304
  }
 
305
 
 
306
  now= my_getsystime();
 
307
  in_redo_phase= TRUE;
 
308
  if (run_redo_phase(from_lsn, apply))
 
309
  {
 
310
    ma_message_no_user(0, "Redo phase failed");
 
311
    goto err;
 
312
  }
 
313
 
 
314
  if ((uncommitted_trans=
 
315
       end_of_redo_phase(should_run_undo_phase)) == (uint)-1)
 
316
  {
 
317
    ma_message_no_user(0, "End of redo phase failed");
 
318
    goto err;
 
319
  }
 
320
  in_redo_phase= FALSE;
 
321
 
 
322
  old_now= now;
 
323
  now= my_getsystime();
 
324
  if (recovery_message_printed == REC_MSG_REDO)
 
325
  {
 
326
    double phase_took= (now - old_now)/10000000.0;
 
327
    /*
 
328
      Detailed progress info goes to stderr, because ma_message_no_user()
 
329
      cannot put several messages on one line.
 
330
    */
 
331
    procent_printed= 1;
 
332
    fprintf(stderr, " (%.1f seconds); ", phase_took);
 
333
    fflush(stderr);
 
334
  }
 
335
 
 
336
  /**
 
337
     REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be
 
338
     wrong: if a future recovery used it, the REDO phase would always
 
339
     start from the checkpoint and never from before, wrongly skipping REDOs
 
340
     (tested). Another problem is that the REDO phase uses
 
341
     PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE.
 
342
 
 
343
     @todo fix this. pagecache_write() now can have a rec_lsn argument. And we
 
344
     could make a function which goes through pages at end of REDO phase and
 
345
     changes their type.
 
346
  */
 
347
#ifdef FIX_AND_ENABLE_LATER
 
348
  if (take_checkpoints && checkpoint_useful)
 
349
  {
 
350
    /*
 
351
      We take a checkpoint as it can save future recovery work if we crash
 
352
      during the UNDO phase. But we don't flush pages, as UNDOs will change
 
353
      them again probably.
 
354
      If we wanted to take checkpoints in the middle of the REDO phase, at a
 
355
      moment when we haven't reached the end of log so don't have exact data
 
356
      about transactions, we could write a special checkpoint: containing only
 
357
      the list of dirty pages, otherwise to be treated as if it was at the
 
358
      same LSN as the last checkpoint.
 
359
    */
 
360
    if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE))
 
361
      goto err;
 
362
  }
 
363
#endif
 
364
 
 
365
  if (should_run_undo_phase)
 
366
  {
 
367
    if (run_undo_phase(uncommitted_trans))
 
368
    {
 
369
      ma_message_no_user(0, "Undo phase failed");
 
370
      goto err;
 
371
    }
 
372
  }
 
373
  else if (uncommitted_trans > 0)
 
374
  {
 
375
    eprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
 
376
           " be left inconsistent!***", uncommitted_trans);
 
377
    recovery_warnings++;
 
378
  }
 
379
 
 
380
  if (skipped_undo_phase)
 
381
  {
 
382
    /*
 
383
      We could want to print a list of tables for which UNDOs were skipped,
 
384
      but not one line per skipped UNDO.
 
385
    */
 
386
    eprint(tracef, "***WARNING: %lu UNDO records skipped in UNDO phase; some"
 
387
           " tables may be left inconsistent!***", skipped_undo_phase);
 
388
    recovery_warnings++;
 
389
  }
 
390
 
 
391
  old_now= now;
 
392
  now= my_getsystime();
 
393
  if (recovery_message_printed == REC_MSG_UNDO)
 
394
  {
 
395
    double phase_took= (now - old_now)/10000000.0;
 
396
    procent_printed= 1;
 
397
    fprintf(stderr, " (%.1f seconds); ", phase_took);
 
398
    fflush(stderr);
 
399
  }
 
400
 
 
401
  /*
 
402
    we don't use maria_panic() because it would maria_end(), and Recovery does
 
403
    not want that (we want to keep some modules initialized for runtime).
 
404
  */
 
405
  if (close_all_tables())
 
406
  {
 
407
    ma_message_no_user(0, "closing of tables failed");
 
408
    goto err;
 
409
  }
 
410
 
 
411
  old_now= now;
 
412
  now= my_getsystime();
 
413
  if (recovery_message_printed == REC_MSG_FLUSH)
 
414
  {
 
415
    double phase_took= (now - old_now)/10000000.0;
 
416
    procent_printed= 1;
 
417
    fprintf(stderr, " (%.1f seconds); ", phase_took);
 
418
    fflush(stderr);
 
419
  }
 
420
 
 
421
  if (take_checkpoints && checkpoint_useful)
 
422
  {
 
423
    /* No dirty pages, all tables are closed, no active transactions, save: */
 
424
    if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE))
 
425
      goto err;
 
426
  }
 
427
 
 
428
  goto end;
 
429
err:
 
430
  error= 1;
 
431
  tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n");
 
432
  if (trns_created)
 
433
    delete_all_transactions();
 
434
end:
 
435
  error_handler_hook= save_error_handler_hook;
 
436
  hash_free(&all_dirty_pages);
 
437
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
 
438
  my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
 
439
  dirty_pages_pool= NULL;
 
440
  my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
 
441
  all_tables= NULL;
 
442
  my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
 
443
  all_active_trans= NULL;
 
444
  my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
 
445
  log_record_buffer.str= NULL;
 
446
  log_record_buffer.length= 0;
 
447
  ma_checkpoint_end();
 
448
  *warnings_count= recovery_warnings;
 
449
  if (recovery_message_printed != REC_MSG_NONE)
 
450
  {
 
451
    if (procent_printed)
 
452
    {
 
453
      procent_printed= 0;
 
454
      fprintf(stderr, "\n");
 
455
      fflush(stderr);
 
456
    }
 
457
    if (!error)
 
458
      ma_message_no_user(ME_JUST_INFO, "recovery done");
 
459
  }
 
460
  if (error)
 
461
    my_message(HA_ERR_INITIALIZATION,
 
462
               "Maria recovery failed. Please run maria_chk -r on all maria "
 
463
               "tables and delete all maria_log.######## files", MYF(0));
 
464
  procent_printed= 0;
 
465
  /*
 
466
    We don't cleanly close tables if we hit some error (may corrupt them by
 
467
    flushing some wrong blocks made from wrong REDOs). It also leaves their
 
468
    open_count>0, which ensures that --maria-recover, if used, will try to
 
469
    repair them.
 
470
  */
 
471
  DBUG_RETURN(error);
 
472
}
 
473
 
 
474
 
 
475
/* very basic info about the record's header */
 
476
static void display_record_position(const LOG_DESC *log_desc,
 
477
                                    const TRANSLOG_HEADER_BUFFER *rec,
 
478
                                    uint number)
 
479
{
 
480
  /*
 
481
    if number==0, we're going over records which we had already seen and which
 
482
    form a group, so we indent below the group's end record
 
483
  */
 
484
  tprint(tracef,
 
485
         "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
 
486
         number ? "" : "   ", number, LSN_IN_PARTS(rec->lsn),
 
487
         rec->short_trid, log_desc->name, rec->type,
 
488
         (ulong)rec->record_length);
 
489
}
 
490
 
 
491
 
 
492
static int display_and_apply_record(const LOG_DESC *log_desc,
 
493
                                    const TRANSLOG_HEADER_BUFFER *rec)
 
494
{
 
495
  int error;
 
496
  if (log_desc->record_execute_in_redo_phase == NULL)
 
497
  {
 
498
    /* die on all not-yet-handled records :) */
 
499
    DBUG_ASSERT("one more hook" == "to write");
 
500
    return 1;
 
501
  }
 
502
  if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
 
503
    eprint(tracef, "Got error %d when executing record %s",
 
504
           my_errno, log_desc->name);
 
505
  return error;
 
506
}
 
507
 
 
508
 
 
509
prototype_redo_exec_hook(LONG_TRANSACTION_ID)
 
510
{
 
511
  uint16 sid= rec->short_trid;
 
512
  TrID long_trid= all_active_trans[sid].long_trid;
 
513
  /*
 
514
    Any incomplete group should be of an old crash which already had a
 
515
    recovery and thus has logged INCOMPLETE_GROUP which we must have seen.
 
516
  */
 
517
  DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE);
 
518
  if (long_trid != 0)
 
519
  {
 
520
    LSN ulsn= all_active_trans[sid].undo_lsn;
 
521
    /*
 
522
      If the first record of that transaction is after 'rec', it's probably
 
523
      because that transaction was found in the checkpoint record, and then
 
524
      it's ok, we can forget about that transaction (we'll meet it later
 
525
      again in the REDO phase) and replace it with the one in 'rec'.
 
526
    */
 
527
    if ((ulsn != LSN_IMPOSSIBLE) &&
 
528
        (cmp_translog_addr(ulsn, rec->lsn) < 0))
 
529
    {
 
530
      char llbuf[22];
 
531
      llstr(long_trid, llbuf);
 
532
      eprint(tracef, "Found an old transaction long_trid %s short_trid %u"
 
533
             " with same short id as this new transaction, and has neither"
 
534
             " committed nor rollback (undo_lsn: (%lu,0x%lx))",
 
535
             llbuf, sid, LSN_IN_PARTS(ulsn));
 
536
      goto err;
 
537
    }
 
538
  }
 
539
  long_trid= uint6korr(rec->header);
 
540
  new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
 
541
  goto end;
 
542
err:
 
543
  ALERT_USER();
 
544
  return 1;
 
545
end:
 
546
  return 0;
 
547
}
 
548
 
 
549
 
 
550
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
 
551
                            LSN first_undo_lsn)
 
552
{
 
553
  char llbuf[22];
 
554
  all_active_trans[sid].long_trid= long_id;
 
555
  llstr(long_id, llbuf);
 
556
  tprint(tracef, "Transaction long_trid %s short_trid %u starts\n",
 
557
         llbuf, sid);
 
558
  all_active_trans[sid].undo_lsn= undo_lsn;
 
559
  all_active_trans[sid].first_undo_lsn= first_undo_lsn;
 
560
  set_if_bigger(max_long_trid, long_id);
 
561
}
 
562
 
 
563
 
 
564
prototype_redo_exec_hook_dummy(CHECKPOINT)
 
565
{
 
566
  /* the only checkpoint we care about was found via control file, ignore */
 
567
  return 0;
 
568
}
 
569
 
 
570
 
 
571
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
 
572
{
 
573
  /* abortion was already made */
 
574
  return 0;
 
575
}
 
576
 
 
577
 
 
578
prototype_redo_exec_hook(INCOMPLETE_LOG)
 
579
{
 
580
  MARIA_HA *info;
 
581
  if (skip_DDLs)
 
582
  {
 
583
    tprint(tracef, "we skip DDLs\n");
 
584
    return 0;
 
585
  }
 
586
  if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
 
587
  {
 
588
    /* no such table, don't need to warn */
 
589
    return 0;
 
590
  }
 
591
  /*
 
592
    Example of what can go wrong when replaying DDLs:
 
593
    CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged);
 
594
    ALTER TABLE t ... which does
 
595
    CREATE a temporary table #sql... (logged)
 
596
    INSERT data from t into #sql... (not logged)
 
597
    RENAME #sql TO t (logged)
 
598
    Removing tables by hand and replaying the log will leave in the
 
599
    end an empty table "t": missing records. If after the RENAME an INSERT
 
600
    into t was done, that row had number 1 in its page, executing the
 
601
    REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion
 
602
    failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is
 
603
    created whereas rownr is not 0).
 
604
    So when the server disables logging for ALTER TABLE or CREATE SELECT, it
 
605
    logs LOGREC_INCOMPLETE_LOG to warn maria_read_log and then the user.
 
606
 
 
607
    Another issue is that replaying of DDLs is not correct enough to work if
 
608
    there was a crash during a DDL (see comment in execution of
 
609
    REDO_RENAME_TABLE ).
 
610
  */
 
611
  tprint(tracef, "***WARNING: MySQL server currently logs no records"
 
612
         " about insertion of data by ALTER TABLE and CREATE SELECT,"
 
613
         " as they are not necessary for recovery;"
 
614
         " present applying of log records may well not work.***\n");
 
615
  recovery_warnings++;
 
616
  return 0;
 
617
}
 
618
 
 
619
 
 
620
prototype_redo_exec_hook(REDO_CREATE_TABLE)
 
621
{
 
622
  File dfile= -1, kfile= -1;
 
623
  char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *ptr2,
 
624
    *data_file_name, *index_file_name;
 
625
  uchar *kfile_header;
 
626
  myf create_flag;
 
627
  uint flags;
 
628
  int error= 1, create_mode= O_RDWR | O_TRUNC, i;
 
629
  MARIA_HA *info= NULL;
 
630
  uint kfile_size_before_extension, keystart;
 
631
 
 
632
  if (skip_DDLs)
 
633
  {
 
634
    tprint(tracef, "we skip DDLs\n");
 
635
    return 0;
 
636
  }
 
637
  enlarge_buffer(rec);
 
638
  if (log_record_buffer.str == NULL ||
 
639
      translog_read_record(rec->lsn, 0, rec->record_length,
 
640
                           log_record_buffer.str, NULL) !=
 
641
      rec->record_length)
 
642
  {
 
643
    eprint(tracef, "Failed to read record");
 
644
    goto end;
 
645
  }
 
646
  name= (char *)log_record_buffer.str;
 
647
  /*
 
648
    TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can
 
649
    find a REDO_CREATE_TABLE for a table which we have open, that's why we
 
650
    need to look for any open instances and close them first.
 
651
  */
 
652
  if (close_one_table(name, rec->lsn))
 
653
  {
 
654
    eprint(tracef, "Table '%s' got error %d on close", name, my_errno);
 
655
    ALERT_USER();
 
656
    goto end;
 
657
  }
 
658
  /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
 
659
  info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
 
660
  if (info)
 
661
  {
 
662
    MARIA_SHARE *share= info->s;
 
663
    /* check that we're not already using it */
 
664
    if (share->reopen != 1)
 
665
    {
 
666
      eprint(tracef, "Table '%s is already open (reopen=%u)",
 
667
             name, share->reopen);
 
668
      ALERT_USER();
 
669
      goto end;
 
670
    }
 
671
    DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
 
672
    if (!share->base.born_transactional)
 
673
    {
 
674
      /*
 
675
        could be that transactional table was later dropped, and a non-trans
 
676
        one was renamed to its name, thus create_rename_lsn is 0 and should
 
677
        not be trusted.
 
678
      */
 
679
      tprint(tracef, "Table '%s' is not transactional, ignoring creation\n",
 
680
             name);
 
681
      ALERT_USER();
 
682
      error= 0;
 
683
      goto end;
 
684
    }
 
685
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
 
686
    {
 
687
      tprint(tracef, "Table '%s' has create_rename_lsn (%lu,0x%lx) more "
 
688
             "recent than record, ignoring creation",
 
689
             name, LSN_IN_PARTS(share->state.create_rename_lsn));
 
690
      error= 0;
 
691
      goto end;
 
692
    }
 
693
    if (maria_is_crashed(info))
 
694
    {
 
695
      eprint(tracef, "Table '%s' is crashed, can't recreate it", name);
 
696
      ALERT_USER();
 
697
      goto end;
 
698
    }
 
699
    maria_close(info);
 
700
    info= NULL;
 
701
  }
 
702
  else /* one or two files absent, or header corrupted... */
 
703
    tprint(tracef, "Table '%s' can't be opened, probably does not exist\n",
 
704
           name);
 
705
  /* if does not exist, or is older, overwrite it */
 
706
  ptr= name + strlen(name) + 1;
 
707
  if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
 
708
    tprint(tracef, ", we will only touch index file");
 
709
  ptr++;
 
710
  kfile_size_before_extension= uint2korr(ptr);
 
711
  ptr+= 2;
 
712
  keystart= uint2korr(ptr);
 
713
  ptr+= 2;
 
714
  kfile_header= (uchar *)ptr;
 
715
  ptr+= kfile_size_before_extension;
 
716
  /* set header lsns */
 
717
  ptr2= (char *) kfile_header + sizeof(info->s->state.header) +
 
718
    MARIA_FILE_CREATE_RENAME_LSN_OFFSET;
 
719
  for (i= 0; i<3; i++)
 
720
  {
 
721
    lsn_store(ptr2, rec->lsn);
 
722
    ptr2+= LSN_STORE_SIZE;
 
723
  }
 
724
  data_file_name= ptr;
 
725
  ptr+= strlen(data_file_name) + 1;
 
726
  index_file_name= ptr;
 
727
  ptr+= strlen(index_file_name) + 1;
 
728
  /** @todo handle symlinks */
 
729
  if (data_file_name[0] || index_file_name[0])
 
730
  {
 
731
    eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled",
 
732
           name);
 
733
    goto end;
 
734
  }
 
735
  fn_format(filename, name, "", MARIA_NAME_IEXT,
 
736
            (MY_UNPACK_FILENAME |
 
737
             (flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) |
 
738
            MY_APPEND_EXT);
 
739
  linkname_ptr= NULL;
 
740
  create_flag= MY_DELETE_OLD;
 
741
  tprint(tracef, "Table '%s' creating as '%s'\n", name, filename);
 
742
  if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
 
743
                                     MYF(MY_WME|create_flag))) < 0)
 
744
  {
 
745
    eprint(tracef, "Failed to create index file");
 
746
    goto end;
 
747
  }
 
748
  if (my_pwrite(kfile, kfile_header,
 
749
                kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
 
750
      my_chsize(kfile, keystart, 0, MYF(MY_WME)))
 
751
  {
 
752
    eprint(tracef, "Failed to write to index file");
 
753
    goto end;
 
754
  }
 
755
  if (!(flags & HA_DONT_TOUCH_DATA))
 
756
  {
 
757
    fn_format(filename,name,"", MARIA_NAME_DEXT,
 
758
              MY_UNPACK_FILENAME | MY_APPEND_EXT);
 
759
    linkname_ptr= NULL;
 
760
    create_flag=MY_DELETE_OLD;
 
761
    if (((dfile=
 
762
          my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
 
763
                                 MYF(MY_WME | create_flag))) < 0) ||
 
764
        my_close(dfile, MYF(MY_WME)))
 
765
    {
 
766
      eprint(tracef, "Failed to create data file");
 
767
      goto end;
 
768
    }
 
769
    /*
 
770
      we now have an empty data file. To be able to
 
771
      _ma_initialize_data_file() we need some pieces of the share to be
 
772
      correctly filled. So we just open the table (fortunately, an empty
 
773
      data file does not preclude this).
 
774
    */
 
775
    if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
 
776
        _ma_initialize_data_file(info->s, info->dfile.file))
 
777
    {
 
778
      eprint(tracef, "Failed to open new table or write to data file");
 
779
      goto end;
 
780
    }
 
781
  }
 
782
  error= 0;
 
783
end:
 
784
  if (kfile >= 0)
 
785
    error|= my_close(kfile, MYF(MY_WME));
 
786
  if (info != NULL)
 
787
    error|= maria_close(info);
 
788
  return error;
 
789
}
 
790
 
 
791
 
 
792
prototype_redo_exec_hook(REDO_RENAME_TABLE)
 
793
{
 
794
  char *old_name, *new_name;
 
795
  int error= 1;
 
796
  MARIA_HA *info= NULL;
 
797
  if (skip_DDLs)
 
798
  {
 
799
    tprint(tracef, "we skip DDLs\n");
 
800
    return 0;
 
801
  }
 
802
  enlarge_buffer(rec);
 
803
  if (log_record_buffer.str == NULL ||
 
804
      translog_read_record(rec->lsn, 0, rec->record_length,
 
805
                           log_record_buffer.str, NULL) !=
 
806
      rec->record_length)
 
807
  {
 
808
    eprint(tracef, "Failed to read record");
 
809
    goto end;
 
810
  }
 
811
  old_name= (char *)log_record_buffer.str;
 
812
  new_name= old_name + strlen(old_name) + 1;
 
813
  tprint(tracef, "Table '%s' to rename to '%s'; old-name table ", old_name,
 
814
         new_name);
 
815
  /*
 
816
    Here is why we skip CREATE/DROP/RENAME when doing a recovery from
 
817
    ha_maria (whereas we do when called from maria_read_log). Consider:
 
818
    CREATE TABLE t;
 
819
    RENAME TABLE t to u;
 
820
    DROP TABLE u;
 
821
    RENAME TABLE v to u; # crash between index rename and data rename.
 
822
    And do a Recovery (not removing tables beforehand).
 
823
    Recovery replays CREATE, then RENAME: the maria_open("t") works,
 
824
    maria_open("u") does not (no data file) so table "u" is considered
 
825
    inexistent and so maria_rename() is done which overwrites u's index file,
 
826
    which is lost. Ok, the data file (v.MAD) is still available, but only a
 
827
    REPAIR USE_FRM can rebuild the index, which is unsafe and downtime.
 
828
    So it is preferrable to not execute RENAME, and leave the "mess" of files,
 
829
    rather than possibly destroy a file. DBA will manually rename files.
 
830
    A safe recovery method would probably require checking the existence of
 
831
    the index file and of the data file separately (not via maria_open()), and
 
832
    maybe also to store a create_rename_lsn in the data file too
 
833
    For now, all we risk is to leave the mess (half-renamed files) left by the
 
834
    crash. We however sync files and directories at each file rename. The SQL
 
835
    layer is anyway not crash-safe for DDLs (except the repartioning-related
 
836
    ones).
 
837
    We replay DDLs in maria_read_log to be able to recreate tables from
 
838
    scratch. It means that "maria_read_log -a" should not be used on a
 
839
    database which just crashed during a DDL. And also ALTER TABLE does not
 
840
    log insertions of records into the temporary table, so replaying may
 
841
    fail (grep for INCOMPLETE_LOG in files).
 
842
  */
 
843
  info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
 
844
  if (info)
 
845
  {
 
846
    MARIA_SHARE *share= info->s;
 
847
    if (!share->base.born_transactional)
 
848
    {
 
849
      tprint(tracef, ", is not transactional, ignoring renaming\n");
 
850
      ALERT_USER();
 
851
      error= 0;
 
852
      goto end;
 
853
    }
 
854
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
 
855
    {
 
856
      tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
 
857
             " record, ignoring renaming",
 
858
             LSN_IN_PARTS(share->state.create_rename_lsn));
 
859
      error= 0;
 
860
      goto end;
 
861
    }
 
862
    if (maria_is_crashed(info))
 
863
    {
 
864
      tprint(tracef, ", is crashed, can't rename it");
 
865
      ALERT_USER();
 
866
      goto end;
 
867
    }
 
868
    if (close_one_table(info->s->open_file_name, rec->lsn) ||
 
869
        maria_close(info))
 
870
      goto end;
 
871
    info= NULL;
 
872
    tprint(tracef, ", is ok for renaming; new-name table ");
 
873
  }
 
874
  else /* one or two files absent, or header corrupted... */
 
875
  {
 
876
    tprint(tracef, ", can't be opened, probably does not exist");
 
877
    error= 0;
 
878
    goto end;
 
879
  }
 
880
  /*
 
881
    We must also check the create_rename_lsn of the 'new_name' table if it
 
882
    exists: otherwise we may, with our rename which overwrites, destroy
 
883
    another table. For example:
 
884
    CREATE TABLE t;
 
885
    RENAME t to u;
 
886
    DROP TABLE u;
 
887
    RENAME v to u; # v is an old table, its creation/insertions not in log
 
888
    And start executing the log (without removing tables beforehand): creates
 
889
    t, renames it to u (if not testing create_rename_lsn) thus overwriting
 
890
    old-named v, drops u, and we are stuck, we have lost data.
 
891
  */
 
892
  info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
 
893
  if (info)
 
894
  {
 
895
    MARIA_SHARE *share= info->s;
 
896
    /* We should not have open instances on this table. */
 
897
    if (share->reopen != 1)
 
898
    {
 
899
      tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
 
900
      ALERT_USER();
 
901
      goto end;
 
902
    }
 
903
    if (!share->base.born_transactional)
 
904
    {
 
905
      tprint(tracef, ", is not transactional, ignoring renaming\n");
 
906
      ALERT_USER();
 
907
      goto drop;
 
908
    }
 
909
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
 
910
    {
 
911
      tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
 
912
             " record, ignoring renaming",
 
913
             LSN_IN_PARTS(share->state.create_rename_lsn));
 
914
      /*
 
915
        We have to drop the old_name table. Consider:
 
916
        CREATE TABLE t;
 
917
        CREATE TABLE v;
 
918
        RENAME TABLE t to u;
 
919
        DROP TABLE u;
 
920
        RENAME TABLE v to u;
 
921
        and apply the log without removing tables beforehand. t will be
 
922
        created, v too; in REDO_RENAME u will be more recent, but we still
 
923
        have to drop t otherwise it stays.
 
924
      */
 
925
      goto drop;
 
926
    }
 
927
    if (maria_is_crashed(info))
 
928
    {
 
929
      tprint(tracef, ", is crashed, can't rename it");
 
930
      ALERT_USER();
 
931
      goto end;
 
932
    }
 
933
    if (maria_close(info))
 
934
      goto end;
 
935
    info= NULL;
 
936
    /* abnormal situation */
 
937
    tprint(tracef, ", exists but is older than record, can't rename it");
 
938
    goto end;
 
939
  }
 
940
  else /* one or two files absent, or header corrupted... */
 
941
    tprint(tracef, ", can't be opened, probably does not exist");
 
942
  tprint(tracef, ", renaming '%s'", old_name);
 
943
  if (maria_rename(old_name, new_name))
 
944
  {
 
945
    eprint(tracef, "Failed to rename table");
 
946
    goto end;
 
947
  }
 
948
  info= maria_open(new_name, O_RDONLY, 0);
 
949
  if (info == NULL)
 
950
  {
 
951
    eprint(tracef, "Failed to open renamed table");
 
952
    goto end;
 
953
  }
 
954
  if (_ma_update_state_lsns(info->s, rec->lsn, info->s->state.create_trid,
 
955
                            TRUE, TRUE))
 
956
    goto end;
 
957
  if (maria_close(info))
 
958
    goto end;
 
959
  info= NULL;
 
960
  error= 0;
 
961
  goto end;
 
962
drop:
 
963
  tprint(tracef, ", only dropping '%s'", old_name);
 
964
  if (maria_delete_table(old_name))
 
965
  {
 
966
    eprint(tracef, "Failed to drop table");
 
967
    goto end;
 
968
  }
 
969
  error= 0;
 
970
  goto end;
 
971
end:
 
972
  tprint(tracef, "\n");
 
973
  if (info != NULL)
 
974
    error|= maria_close(info);
 
975
  return error;
 
976
}
 
977
 
 
978
 
 
979
/*
 
980
  The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE.
 
981
*/
 
982
prototype_redo_exec_hook(REDO_REPAIR_TABLE)
 
983
{
 
984
  int error= 1;
 
985
  MARIA_HA *info;
 
986
  HA_CHECK param;
 
987
  char *name;
 
988
  my_bool quick_repair;
 
989
  DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE");
 
990
 
 
991
  if (skip_DDLs)
 
992
  {
 
993
    /*
 
994
      REPAIR is not exactly a DDL, but it manipulates files without logging
 
995
      insertions into them.
 
996
    */
 
997
    tprint(tracef, "we skip DDLs\n");
 
998
    DBUG_RETURN(0);
 
999
  }
 
1000
  if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
 
1001
    DBUG_RETURN(0);
 
1002
 
 
1003
  /*
 
1004
    Otherwise, the mapping is newer than the table, and our record is newer
 
1005
    than the mapping, so we can repair.
 
1006
  */
 
1007
  tprint(tracef, "   repairing...\n");
 
1008
 
 
1009
  maria_chk_init(&param);
 
1010
  param.isam_file_name= name= info->s->open_file_name;
 
1011
  param.testflag= uint8korr(rec->header + FILEID_STORE_SIZE);
 
1012
  param.tmpdir= maria_tmpdir;
 
1013
  DBUG_ASSERT(maria_tmpdir);
 
1014
 
 
1015
  info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 8);
 
1016
  quick_repair= test(param.testflag & T_QUICK);
 
1017
 
 
1018
  if (param.testflag & T_REP_PARALLEL)
 
1019
  {
 
1020
    if (maria_repair_parallel(&param, info, name, quick_repair))
 
1021
      goto end;
 
1022
  }
 
1023
  else if (param.testflag & T_REP_BY_SORT)
 
1024
  {
 
1025
    if (maria_repair_by_sort(&param, info, name, quick_repair))
 
1026
      goto end;
 
1027
  }
 
1028
  else if (maria_repair(&param, info, name, quick_repair))
 
1029
    goto end;
 
1030
 
 
1031
  if (_ma_update_state_lsns(info->s, rec->lsn, trnman_get_min_safe_trid(),
 
1032
                            TRUE, !(param.testflag & T_NO_CREATE_RENAME_LSN)))
 
1033
    goto end;
 
1034
  error= 0;
 
1035
 
 
1036
end:
 
1037
  DBUG_RETURN(error);
 
1038
}
 
1039
 
 
1040
 
 
1041
prototype_redo_exec_hook(REDO_DROP_TABLE)
 
1042
{
 
1043
  char *name;
 
1044
  int error= 1;
 
1045
  MARIA_HA *info;
 
1046
  if (skip_DDLs)
 
1047
  {
 
1048
    tprint(tracef, "we skip DDLs\n");
 
1049
    return 0;
 
1050
  }
 
1051
  enlarge_buffer(rec);
 
1052
  if (log_record_buffer.str == NULL ||
 
1053
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1054
                           log_record_buffer.str, NULL) !=
 
1055
      rec->record_length)
 
1056
  {
 
1057
    eprint(tracef, "Failed to read record");
 
1058
    return 1;
 
1059
  }
 
1060
  name= (char *)log_record_buffer.str;
 
1061
  tprint(tracef, "Table '%s'", name);
 
1062
  info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
 
1063
  if (info)
 
1064
  {
 
1065
    MARIA_SHARE *share= info->s;
 
1066
    if (!share->base.born_transactional)
 
1067
    {
 
1068
      tprint(tracef, ", is not transactional, ignoring removal\n");
 
1069
      ALERT_USER();
 
1070
      error= 0;
 
1071
      goto end;
 
1072
    }
 
1073
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
 
1074
    {
 
1075
      tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
 
1076
             " record, ignoring removal",
 
1077
             LSN_IN_PARTS(share->state.create_rename_lsn));
 
1078
      error= 0;
 
1079
      goto end;
 
1080
    }
 
1081
    if (maria_is_crashed(info))
 
1082
    {
 
1083
      tprint(tracef, ", is crashed, can't drop it");
 
1084
      ALERT_USER();
 
1085
      goto end;
 
1086
    }
 
1087
    if (close_one_table(info->s->open_file_name, rec->lsn) ||
 
1088
        maria_close(info))
 
1089
      goto end;
 
1090
    info= NULL;
 
1091
    /* if it is older, or its header is corrupted, drop it */
 
1092
    tprint(tracef, ", dropping '%s'", name);
 
1093
    if (maria_delete_table(name))
 
1094
    {
 
1095
      eprint(tracef, "Failed to drop table");
 
1096
      goto end;
 
1097
    }
 
1098
  }
 
1099
  else /* one or two files absent, or header corrupted... */
 
1100
    tprint(tracef,", can't be opened, probably does not exist");
 
1101
  error= 0;
 
1102
end:
 
1103
  tprint(tracef, "\n");
 
1104
  if (info != NULL)
 
1105
    error|= maria_close(info);
 
1106
  return error;
 
1107
}
 
1108
 
 
1109
 
 
1110
prototype_redo_exec_hook(FILE_ID)
 
1111
{
 
1112
  uint16 sid;
 
1113
  int error= 1;
 
1114
  const char *name;
 
1115
  MARIA_HA *info;
 
1116
  DBUG_ENTER("exec_REDO_LOGREC_FILE_ID");
 
1117
 
 
1118
  if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
 
1119
  {
 
1120
    /*
 
1121
      If that mapping was still true at checkpoint time, it was found in
 
1122
      checkpoint record, no need to recreate it. If that mapping had ended at
 
1123
      checkpoint time (table was closed or repaired), a flush and force
 
1124
      happened and so mapping is not needed.
 
1125
    */
 
1126
    tprint(tracef, "ignoring because before checkpoint\n");
 
1127
    DBUG_RETURN(0);
 
1128
  }
 
1129
 
 
1130
  enlarge_buffer(rec);
 
1131
  if (log_record_buffer.str == NULL ||
 
1132
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1133
                           log_record_buffer.str, NULL) !=
 
1134
       rec->record_length)
 
1135
  {
 
1136
    eprint(tracef, "Failed to read record");
 
1137
    goto end;
 
1138
  }
 
1139
  sid= fileid_korr(log_record_buffer.str);
 
1140
  info= all_tables[sid].info;
 
1141
  if (info != NULL)
 
1142
  {
 
1143
    tprint(tracef, "   Closing table '%s'\n", info->s->open_file_name);
 
1144
    prepare_table_for_close(info, rec->lsn);
 
1145
    if (maria_close(info))
 
1146
    {
 
1147
      eprint(tracef, "Failed to close table");
 
1148
      goto end;
 
1149
    }
 
1150
    all_tables[sid].info= NULL;
 
1151
  }
 
1152
  name= (char *)log_record_buffer.str + FILEID_STORE_SIZE;
 
1153
  if (new_table(sid, name, rec->lsn))
 
1154
    goto end;
 
1155
  error= 0;
 
1156
end:
 
1157
  DBUG_RETURN(error);
 
1158
}
 
1159
 
 
1160
 
 
1161
static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
 
1162
{
 
1163
  /*
 
1164
    -1 (skip table): close table and return 0;
 
1165
    1 (error): close table and return 1;
 
1166
    0 (success): leave table open and return 0.
 
1167
  */
 
1168
  int error= 1;
 
1169
  MARIA_HA *info;
 
1170
  MARIA_SHARE *share;
 
1171
  my_off_t dfile_len, kfile_len;
 
1172
 
 
1173
  checkpoint_useful= TRUE;
 
1174
  if ((name == NULL) || (name[0] == 0))
 
1175
  {
 
1176
    /*
 
1177
      we didn't use DBUG_ASSERT() because such record corruption could
 
1178
      silently pass in the "info == NULL" test below.
 
1179
    */
 
1180
    tprint(tracef, ", record is corrupted");
 
1181
    info= NULL;
 
1182
    goto end;
 
1183
  }
 
1184
  tprint(tracef, "Table '%s', id %u", name, sid);
 
1185
  info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
 
1186
  if (info == NULL)
 
1187
  {
 
1188
    tprint(tracef, ", is absent (must have been dropped later?)"
 
1189
           " or its header is so corrupted that we cannot open it;"
 
1190
           " we skip it");
 
1191
    error= 0;
 
1192
    goto end;
 
1193
  }
 
1194
  share= info->s;
 
1195
  /* check that we're not already using it */
 
1196
  if (share->reopen != 1)
 
1197
  {
 
1198
    tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
 
1199
    /*
 
1200
      It could be that we have in the log
 
1201
      FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12);
 
1202
    */
 
1203
    if (close_one_table(share->open_file_name, lsn_of_file_id))
 
1204
      goto end;
 
1205
  }
 
1206
  if (!share->base.born_transactional)
 
1207
  {
 
1208
    /*
 
1209
      This can happen if one converts a transactional table to a
 
1210
      not transactional table
 
1211
    */
 
1212
    tprint(tracef, ", is not transactional.  Ignoring open request");
 
1213
    error= -1;
 
1214
    goto end;
 
1215
  }
 
1216
  if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0)
 
1217
  {
 
1218
    tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
 
1219
           " LOGREC_FILE_ID's LSN (%lu,0x%lx), ignoring open request",
 
1220
           LSN_IN_PARTS(share->state.create_rename_lsn),
 
1221
           LSN_IN_PARTS(lsn_of_file_id));
 
1222
    error= -1;
 
1223
    goto end;
 
1224
    /*
 
1225
      Note that we tested that before testing corruption; a recent corrupted
 
1226
      table is not a blocker for the present log record.
 
1227
    */
 
1228
  }
 
1229
  if (maria_is_crashed(info))
 
1230
  {
 
1231
    eprint(tracef, "Table '%s' is crashed, skipping it. Please repair it with"
 
1232
           " maria_chk -r", share->open_file_name);
 
1233
    error= -1; /* not fatal, try with other tables */
 
1234
    goto end;
 
1235
    /*
 
1236
      Note that if a first recovery fails to apply a REDO, it marks the table
 
1237
      corrupted and stops the entire recovery. A second recovery will find the
 
1238
      table is marked corrupted and skip it (and thus possibly handle other
 
1239
      tables).
 
1240
    */
 
1241
  }
 
1242
  /* don't log any records for this work */
 
1243
  _ma_tmp_disable_logging_for_table(info, FALSE);
 
1244
  /* execution of some REDO records relies on data_file_length */
 
1245
  dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
 
1246
  kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME));
 
1247
  if ((dfile_len == MY_FILEPOS_ERROR) ||
 
1248
      (kfile_len == MY_FILEPOS_ERROR))
 
1249
  {
 
1250
    tprint(tracef, ", length unknown\n");
 
1251
    goto end;
 
1252
  }
 
1253
  if (share->state.state.data_file_length != dfile_len)
 
1254
  {
 
1255
    tprint(tracef, ", has wrong state.data_file_length (fixing it)");
 
1256
    share->state.state.data_file_length= dfile_len;
 
1257
  }
 
1258
  if (share->state.state.key_file_length != kfile_len)
 
1259
  {
 
1260
    tprint(tracef, ", has wrong state.key_file_length (fixing it)");
 
1261
    share->state.state.key_file_length= kfile_len;
 
1262
  }
 
1263
  if ((dfile_len % share->block_size) || (kfile_len % share->block_size))
 
1264
  {
 
1265
    tprint(tracef, ", has too short last page\n");
 
1266
    /* Recovery will fix this, no error */
 
1267
    ALERT_USER();
 
1268
  }
 
1269
  /*
 
1270
    This LSN serves in this situation; assume log is:
 
1271
    FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1")
 
1272
    then crash, checkpoint record is parsed and opens "t1" with id 6; assume
 
1273
    REDO phase starts from the REDO_INSERT above: it will wrongly try to
 
1274
    update a page of "t1". With this LSN below, REDO_INSERT can realize the
 
1275
    mapping is newer than itself, and not execute.
 
1276
    Same example is possible with UNDO_INSERT (update of the state).
 
1277
  */
 
1278
  info->s->lsn_of_file_id= lsn_of_file_id;
 
1279
  all_tables[sid].info= info;
 
1280
  /*
 
1281
    We don't set info->s->id, it would be useless (no logging in REDO phase);
 
1282
    if you change that, know that some records in REDO phase call
 
1283
    _ma_update_state_lsns() which resets info->s->id.
 
1284
  */
 
1285
  tprint(tracef, ", opened");
 
1286
  error= 0;
 
1287
end:
 
1288
  tprint(tracef, "\n");
 
1289
  if (error)
 
1290
  {
 
1291
    if (info != NULL)
 
1292
      maria_close(info);
 
1293
    if (error == -1)
 
1294
      error= 0;
 
1295
  }
 
1296
  return error;
 
1297
}
 
1298
 
 
1299
/*
 
1300
  NOTE
 
1301
  This is called for REDO_INSERT_ROW_HEAD and READ_NEW_ROW_HEAD
 
1302
*/
 
1303
 
 
1304
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
 
1305
{
 
1306
  int error= 1;
 
1307
  uchar *buff= NULL;
 
1308
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1309
  if (info == NULL)
 
1310
  {
 
1311
    /*
 
1312
      Table was skipped at open time (because later dropped/renamed, not
 
1313
      transactional, or create_rename_lsn newer than LOGREC_FILE_ID), or
 
1314
      record was skipped due to skip_redo_lsn; it is not an error.
 
1315
    */
 
1316
    return 0;
 
1317
  }
 
1318
  /*
 
1319
    Note that REDO is per page, we still consider it if its transaction
 
1320
    committed long ago and is unknown.
 
1321
  */
 
1322
  /*
 
1323
    If REDO's LSN is > page's LSN (read from disk), we are going to modify the
 
1324
    page and change its LSN. The normal runtime code stores the UNDO's LSN
 
1325
    into the page. Here storing the REDO's LSN (rec->lsn) would work
 
1326
    (we are not writing to the log here, so don't have to "flush up to UNDO's
 
1327
    LSN"). But in a test scenario where we do updates at runtime, then remove
 
1328
    tables, apply the log and check that this results in the same table as at
 
1329
    runtime, putting the same LSN as runtime had done will decrease
 
1330
    differences. So we use the UNDO's LSN which is current_group_end_lsn.
 
1331
  */
 
1332
  enlarge_buffer(rec);
 
1333
  if (log_record_buffer.str == NULL)
 
1334
  {
 
1335
    eprint(tracef, "Failed to read allocate buffer for record");
 
1336
    goto end;
 
1337
  }
 
1338
  if (translog_read_record(rec->lsn, 0, rec->record_length,
 
1339
                           log_record_buffer.str, NULL) !=
 
1340
      rec->record_length)
 
1341
  {
 
1342
    eprint(tracef, "Failed to read record");
 
1343
    goto end;
 
1344
  }
 
1345
  buff= log_record_buffer.str;
 
1346
  if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
 
1347
                                             HEAD_PAGE,
 
1348
                                             (rec->type ==
 
1349
                                              LOGREC_REDO_NEW_ROW_HEAD),
 
1350
                                             buff + FILEID_STORE_SIZE,
 
1351
                                             buff +
 
1352
                                             FILEID_STORE_SIZE +
 
1353
                                             PAGE_STORE_SIZE +
 
1354
                                             DIRPOS_STORE_SIZE,
 
1355
                                             rec->record_length -
 
1356
                                             (FILEID_STORE_SIZE +
 
1357
                                              PAGE_STORE_SIZE +
 
1358
                                              DIRPOS_STORE_SIZE)))
 
1359
    goto end;
 
1360
  error= 0;
 
1361
end:
 
1362
  return error;
 
1363
}
 
1364
 
 
1365
/*
 
1366
  NOTE
 
1367
  This is called for REDO_INSERT_ROW_TAIL and READ_NEW_ROW_TAIL
 
1368
*/
 
1369
 
 
1370
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
 
1371
{
 
1372
  int error= 1;
 
1373
  uchar *buff;
 
1374
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1375
  if (info == NULL)
 
1376
    return 0;
 
1377
  enlarge_buffer(rec);
 
1378
  if (log_record_buffer.str == NULL ||
 
1379
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1380
                           log_record_buffer.str, NULL) !=
 
1381
       rec->record_length)
 
1382
  {
 
1383
    eprint(tracef, "Failed to read record");
 
1384
    goto end;
 
1385
  }
 
1386
  buff= log_record_buffer.str;
 
1387
  if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
 
1388
                                             TAIL_PAGE,
 
1389
                                             (rec->type ==
 
1390
                                              LOGREC_REDO_NEW_ROW_TAIL),
 
1391
                                             buff + FILEID_STORE_SIZE,
 
1392
                                             buff +
 
1393
                                             FILEID_STORE_SIZE +
 
1394
                                             PAGE_STORE_SIZE +
 
1395
                                             DIRPOS_STORE_SIZE,
 
1396
                                             rec->record_length -
 
1397
                                             (FILEID_STORE_SIZE +
 
1398
                                              PAGE_STORE_SIZE +
 
1399
                                              DIRPOS_STORE_SIZE)))
 
1400
    goto end;
 
1401
  error= 0;
 
1402
 
 
1403
end:
 
1404
  return error;
 
1405
}
 
1406
 
 
1407
 
 
1408
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
 
1409
{
 
1410
  int error= 1;
 
1411
  uchar *buff;
 
1412
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1413
  if (info == NULL)
 
1414
    return 0;
 
1415
  enlarge_buffer(rec);
 
1416
  if (log_record_buffer.str == NULL ||
 
1417
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1418
                           log_record_buffer.str, NULL) !=
 
1419
       rec->record_length)
 
1420
  {
 
1421
    eprint(tracef, "Failed to read record");
 
1422
    goto end;
 
1423
  }
 
1424
  buff= log_record_buffer.str;
 
1425
  if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
 
1426
                                      buff, rec->lsn))
 
1427
    goto end;
 
1428
  error= 0;
 
1429
 
 
1430
end:
 
1431
  return error;
 
1432
}
 
1433
 
 
1434
 
 
1435
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
 
1436
{
 
1437
  int error= 1;
 
1438
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1439
  if (info == NULL)
 
1440
    return 0;
 
1441
  if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
 
1442
                                            HEAD_PAGE,
 
1443
                                            rec->header + FILEID_STORE_SIZE))
 
1444
    goto end;
 
1445
  error= 0;
 
1446
end:
 
1447
  return error;
 
1448
}
 
1449
 
 
1450
 
 
1451
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
 
1452
{
 
1453
  int error= 1;
 
1454
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1455
  if (info == NULL)
 
1456
    return 0;
 
1457
  if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
 
1458
                                            TAIL_PAGE,
 
1459
                                            rec->header + FILEID_STORE_SIZE))
 
1460
    goto end;
 
1461
  error= 0;
 
1462
end:
 
1463
  return error;
 
1464
}
 
1465
 
 
1466
 
 
1467
prototype_redo_exec_hook(REDO_FREE_BLOCKS)
 
1468
{
 
1469
  int error= 1;
 
1470
  uchar *buff;
 
1471
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1472
  if (info == NULL)
 
1473
    return 0;
 
1474
  enlarge_buffer(rec);
 
1475
 
 
1476
  if (log_record_buffer.str == NULL ||
 
1477
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1478
                           log_record_buffer.str, NULL) !=
 
1479
       rec->record_length)
 
1480
  {
 
1481
    eprint(tracef, "Failed to read record");
 
1482
    goto end;
 
1483
  }
 
1484
 
 
1485
  buff= log_record_buffer.str;
 
1486
  if (_ma_apply_redo_free_blocks(info, current_group_end_lsn,
 
1487
                                 buff + FILEID_STORE_SIZE))
 
1488
    goto end;
 
1489
  error= 0;
 
1490
end:
 
1491
  return error;
 
1492
}
 
1493
 
 
1494
 
 
1495
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)
 
1496
{
 
1497
  int error= 1;
 
1498
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1499
  if (info == NULL)
 
1500
    return 0;
 
1501
 
 
1502
  if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn,
 
1503
                                       rec->header + FILEID_STORE_SIZE))
 
1504
    goto end;
 
1505
  error= 0;
 
1506
end:
 
1507
  return error;
 
1508
}
 
1509
 
 
1510
 
 
1511
prototype_redo_exec_hook(REDO_DELETE_ALL)
 
1512
{
 
1513
  int error= 1;
 
1514
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1515
  if (info == NULL)
 
1516
    return 0;
 
1517
  tprint(tracef, "   deleting all %lu rows\n",
 
1518
         (ulong)info->s->state.state.records);
 
1519
  if (maria_delete_all_rows(info))
 
1520
    goto end;
 
1521
  error= 0;
 
1522
end:
 
1523
  return error;
 
1524
}
 
1525
 
 
1526
 
 
1527
prototype_redo_exec_hook(REDO_INDEX)
 
1528
{
 
1529
  int error= 1;
 
1530
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1531
  if (info == NULL)
 
1532
    return 0;
 
1533
  enlarge_buffer(rec);
 
1534
 
 
1535
  if (log_record_buffer.str == NULL ||
 
1536
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1537
                           log_record_buffer.str, NULL) !=
 
1538
       rec->record_length)
 
1539
  {
 
1540
    eprint(tracef, "Failed to read record");
 
1541
    goto end;
 
1542
  }
 
1543
 
 
1544
  if (_ma_apply_redo_index(info, current_group_end_lsn,
 
1545
                           log_record_buffer.str + FILEID_STORE_SIZE,
 
1546
                           rec->record_length - FILEID_STORE_SIZE))
 
1547
    goto end;
 
1548
  error= 0;
 
1549
end:
 
1550
  return error;
 
1551
}
 
1552
 
 
1553
prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)
 
1554
{
 
1555
  int error= 1;
 
1556
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1557
  if (info == NULL)
 
1558
    return 0;
 
1559
  enlarge_buffer(rec);
 
1560
 
 
1561
  if (log_record_buffer.str == NULL ||
 
1562
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1563
                           log_record_buffer.str, NULL) !=
 
1564
       rec->record_length)
 
1565
  {
 
1566
    eprint(tracef, "Failed to read record");
 
1567
    goto end;
 
1568
  }
 
1569
 
 
1570
  if (_ma_apply_redo_index_new_page(info, current_group_end_lsn,
 
1571
                                    log_record_buffer.str + FILEID_STORE_SIZE,
 
1572
                                    rec->record_length - FILEID_STORE_SIZE))
 
1573
    goto end;
 
1574
  error= 0;
 
1575
end:
 
1576
  return error;
 
1577
}
 
1578
 
 
1579
 
 
1580
prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)
 
1581
{
 
1582
  int error= 1;
 
1583
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1584
  if (info == NULL)
 
1585
    return 0;
 
1586
 
 
1587
  if (_ma_apply_redo_index_free_page(info, current_group_end_lsn,
 
1588
                                     rec->header + FILEID_STORE_SIZE))
 
1589
    goto end;
 
1590
  error= 0;
 
1591
end:
 
1592
  return error;
 
1593
}
 
1594
 
 
1595
 
 
1596
prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)
 
1597
{
 
1598
  int error= 1;
 
1599
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
 
1600
  if (info == NULL)
 
1601
    return 0;
 
1602
  enlarge_buffer(rec);
 
1603
 
 
1604
  if (log_record_buffer.str == NULL ||
 
1605
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1606
                           log_record_buffer.str, NULL) !=
 
1607
       rec->record_length)
 
1608
  {
 
1609
    eprint(tracef, "Failed to read record");
 
1610
    goto end;
 
1611
  }
 
1612
 
 
1613
  if (cmp_translog_addr(rec->lsn, checkpoint_start) >= 0)
 
1614
  {
 
1615
    /*
 
1616
      Record is potentially after the bitmap flush made by Checkpoint, so has
 
1617
      to be replayed. It may overwrite a more recent state but that will be
 
1618
      corrected by all upcoming REDOs for data pages.
 
1619
      If the condition is false, we must not apply the record: it is unneeded
 
1620
      and nocive (may not be corrected as REDOs can be skipped due to
 
1621
      dirty-pages list).
 
1622
    */
 
1623
    if (_ma_apply_redo_bitmap_new_page(info, current_group_end_lsn,
 
1624
                                       log_record_buffer.str +
 
1625
                                       FILEID_STORE_SIZE))
 
1626
      goto end;
 
1627
  }
 
1628
  error= 0;
 
1629
end:
 
1630
  return error;
 
1631
}
 
1632
 
 
1633
 
 
1634
static inline void set_undo_lsn_for_active_trans(uint16 short_trid, LSN lsn)
 
1635
{
 
1636
  if (all_active_trans[short_trid].long_trid == 0)
 
1637
  {
 
1638
    /* transaction unknown, so has committed or fully rolled back long ago */
 
1639
    return;
 
1640
  }
 
1641
  all_active_trans[short_trid].undo_lsn= lsn;
 
1642
  if (all_active_trans[short_trid].first_undo_lsn == LSN_IMPOSSIBLE)
 
1643
    all_active_trans[short_trid].first_undo_lsn= lsn;
 
1644
}
 
1645
 
 
1646
 
 
1647
prototype_redo_exec_hook(UNDO_ROW_INSERT)
 
1648
{
 
1649
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
1650
  MARIA_SHARE *share;
 
1651
 
 
1652
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
 
1653
  if (info == NULL)
 
1654
  {
 
1655
    /*
 
1656
      Note that we set undo_lsn anyway. So that if the transaction is later
 
1657
      rolled back, this UNDO is tried for execution and we get a warning (as
 
1658
      it would then be abnormal that info==NULL).
 
1659
    */
 
1660
    return 0;
 
1661
  }
 
1662
  share= info->s;
 
1663
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
 
1664
  {
 
1665
    tprint(tracef, "   state has LSN (%lu,0x%lx) older than record, updating"
 
1666
           " rows' count\n", LSN_IN_PARTS(share->state.is_of_horizon));
 
1667
    share->state.state.records++;
 
1668
    if (share->calc_checksum)
 
1669
    {
 
1670
      uchar buff[HA_CHECKSUM_STORE_SIZE];
 
1671
      if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
 
1672
                               PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
 
1673
                               HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
 
1674
          HA_CHECKSUM_STORE_SIZE)
 
1675
      {
 
1676
        eprint(tracef, "Failed to read record");
 
1677
        return 1;
 
1678
      }
 
1679
      share->state.state.checksum+= ha_checksum_korr(buff);
 
1680
    }
 
1681
    info->s->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
1682
                              STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
1683
  }
 
1684
  tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
 
1685
  /* Unpin all pages, stamp them with UNDO's LSN */
 
1686
  _ma_unpin_all_pages(info, rec->lsn);
 
1687
  return 0;
 
1688
}
 
1689
 
 
1690
 
 
1691
prototype_redo_exec_hook(UNDO_ROW_DELETE)
 
1692
{
 
1693
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
1694
  MARIA_SHARE *share;
 
1695
 
 
1696
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
 
1697
  if (info == NULL)
 
1698
    return 0;
 
1699
  share= info->s;
 
1700
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
 
1701
  {
 
1702
    tprint(tracef, "   state older than record\n");
 
1703
    share->state.state.records--;
 
1704
    if (share->calc_checksum)
 
1705
    {
 
1706
      uchar buff[HA_CHECKSUM_STORE_SIZE];
 
1707
      if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
 
1708
                               PAGE_STORE_SIZE + DIRPOS_STORE_SIZE + 2 +
 
1709
                               PAGERANGE_STORE_SIZE,
 
1710
                               HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
 
1711
          HA_CHECKSUM_STORE_SIZE)
 
1712
      {
 
1713
        eprint(tracef, "Failed to read record");
 
1714
        return 1;
 
1715
      }
 
1716
      share->state.state.checksum+= ha_checksum_korr(buff);
 
1717
    }
 
1718
    share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
1719
                            STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
 
1720
                            STATE_NOT_MOVABLE);
 
1721
  }
 
1722
  tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
 
1723
  _ma_unpin_all_pages(info, rec->lsn);
 
1724
  return 0;
 
1725
}
 
1726
 
 
1727
 
 
1728
prototype_redo_exec_hook(UNDO_ROW_UPDATE)
 
1729
{
 
1730
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
1731
  MARIA_SHARE *share;
 
1732
 
 
1733
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
 
1734
  if (info == NULL)
 
1735
    return 0;
 
1736
  share= info->s;
 
1737
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
 
1738
  {
 
1739
    if (share->calc_checksum)
 
1740
    {
 
1741
      uchar buff[HA_CHECKSUM_STORE_SIZE];
 
1742
      if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
 
1743
                               PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
 
1744
                               HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
 
1745
          HA_CHECKSUM_STORE_SIZE)
 
1746
      {
 
1747
        eprint(tracef, "Failed to read record");
 
1748
        return 1;
 
1749
      }
 
1750
      share->state.state.checksum+= ha_checksum_korr(buff);
 
1751
    }
 
1752
    share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
1753
                            STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
1754
  }
 
1755
  _ma_unpin_all_pages(info, rec->lsn);
 
1756
  return 0;
 
1757
}
 
1758
 
 
1759
 
 
1760
prototype_redo_exec_hook(UNDO_KEY_INSERT)
 
1761
{
 
1762
  MARIA_HA *info;
 
1763
  MARIA_SHARE *share;
 
1764
 
 
1765
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
 
1766
  if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
 
1767
    return 0;
 
1768
  share= info->s;
 
1769
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
 
1770
  {
 
1771
    const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE;
 
1772
    uint keynr= key_nr_korr(ptr);
 
1773
    if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */
 
1774
    {
 
1775
      const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg;
 
1776
      ulonglong value;
 
1777
      char llbuf[22];
 
1778
      uchar *to;
 
1779
      tprint(tracef, "   state older than record\n");
 
1780
      /* we read the record to find the auto_increment value */
 
1781
      enlarge_buffer(rec);
 
1782
      if (log_record_buffer.str == NULL ||
 
1783
          translog_read_record(rec->lsn, 0, rec->record_length,
 
1784
                               log_record_buffer.str, NULL) !=
 
1785
          rec->record_length)
 
1786
      {
 
1787
        eprint(tracef, "Failed to read record");
 
1788
        return 1;
 
1789
      }
 
1790
      to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
 
1791
        KEY_NR_STORE_SIZE;
 
1792
      if (keyseg->flag & HA_SWAP_KEY)
 
1793
      {
 
1794
        /* We put key from log record to "data record" packing format... */
 
1795
        uchar reversed[MARIA_MAX_KEY_BUFF];
 
1796
        uchar *key_ptr= to;
 
1797
        uchar *key_end= key_ptr + keyseg->length;
 
1798
        to= reversed + keyseg->length;
 
1799
        do
 
1800
        {
 
1801
          *--to= *key_ptr++;
 
1802
        } while (key_ptr != key_end);
 
1803
        /* ... so that we can read it with: */
 
1804
      }
 
1805
      value= ma_retrieve_auto_increment(to, keyseg->type);
 
1806
      set_if_bigger(share->state.auto_increment, value);
 
1807
      llstr(share->state.auto_increment, llbuf);
 
1808
      tprint(tracef, "   auto-inc %s\n", llbuf);
 
1809
    }
 
1810
  }
 
1811
  _ma_unpin_all_pages(info, rec->lsn);
 
1812
  return 0;
 
1813
}
 
1814
 
 
1815
 
 
1816
prototype_redo_exec_hook(UNDO_KEY_DELETE)
 
1817
{
 
1818
  MARIA_HA *info;
 
1819
 
 
1820
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
 
1821
  if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
 
1822
    return 0;
 
1823
  _ma_unpin_all_pages(info, rec->lsn);
 
1824
  return 0;
 
1825
}
 
1826
 
 
1827
 
 
1828
prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
 
1829
{
 
1830
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
1831
  MARIA_SHARE *share;
 
1832
 
 
1833
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
 
1834
  if (info == NULL)
 
1835
    return 0;
 
1836
  share= info->s;
 
1837
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
 
1838
  {
 
1839
    uint key_nr;
 
1840
    my_off_t page;
 
1841
    key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
 
1842
    page=  page_korr(rec->header +  LSN_STORE_SIZE + FILEID_STORE_SIZE +
 
1843
                     KEY_NR_STORE_SIZE);
 
1844
    share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
 
1845
                                    HA_OFFSET_ERROR :
 
1846
                                    page * share->block_size);
 
1847
  }
 
1848
  _ma_unpin_all_pages(info, rec->lsn);
 
1849
  return 0;
 
1850
}
 
1851
 
 
1852
 
 
1853
prototype_redo_exec_hook(UNDO_BULK_INSERT)
 
1854
{
 
1855
  /*
 
1856
    If the repair finished it wrote and sync the state. If it didn't finish,
 
1857
    we are going to empty the table and that will fix the state.
 
1858
  */
 
1859
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
 
1860
  return 0;
 
1861
}
 
1862
 
 
1863
 
 
1864
prototype_redo_exec_hook(COMMIT)
 
1865
{
 
1866
  uint16 sid= rec->short_trid;
 
1867
  TrID long_trid= all_active_trans[sid].long_trid;
 
1868
  char llbuf[22];
 
1869
  if (long_trid == 0)
 
1870
  {
 
1871
    tprint(tracef, "We don't know about transaction with short_trid %u;"
 
1872
           "it probably committed long ago, forget it\n", sid);
 
1873
    bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
 
1874
    return 0;
 
1875
  }
 
1876
  llstr(long_trid, llbuf);
 
1877
  tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
 
1878
         llbuf, sid);
 
1879
  bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
 
1880
#ifdef MARIA_VERSIONING
 
1881
  /*
 
1882
    if real recovery:
 
1883
    transaction was committed, move it to some separate list for later
 
1884
    purging (but don't purge now! purging may have been started before, we
 
1885
    may find REDO_PURGE records soon).
 
1886
  */
 
1887
#endif
 
1888
  return 0;
 
1889
}
 
1890
 
 
1891
prototype_redo_exec_hook(CLR_END)
 
1892
{
 
1893
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
1894
  MARIA_SHARE *share;
 
1895
  LSN previous_undo_lsn;
 
1896
  enum translog_record_type undone_record_type;
 
1897
  const LOG_DESC *log_desc;
 
1898
  my_bool row_entry= 0;
 
1899
  uchar *logpos;
 
1900
  DBUG_ENTER("exec_REDO_LOGREC_CLR_END");
 
1901
 
 
1902
  previous_undo_lsn= lsn_korr(rec->header);
 
1903
  undone_record_type=
 
1904
    clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
 
1905
  log_desc= &log_record_type_descriptor[undone_record_type];
 
1906
 
 
1907
  set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn);
 
1908
  if (info == NULL)
 
1909
    DBUG_RETURN(0);
 
1910
  share= info->s;
 
1911
  tprint(tracef, "   CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n",
 
1912
         log_desc->name, LSN_IN_PARTS(previous_undo_lsn));
 
1913
 
 
1914
  enlarge_buffer(rec);
 
1915
  if (log_record_buffer.str == NULL ||
 
1916
      translog_read_record(rec->lsn, 0, rec->record_length,
 
1917
                           log_record_buffer.str, NULL) !=
 
1918
      rec->record_length)
 
1919
  {
 
1920
    eprint(tracef, "Failed to read record");
 
1921
    return 1;
 
1922
  }
 
1923
  logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
 
1924
           CLR_TYPE_STORE_SIZE);
 
1925
 
 
1926
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
 
1927
  {
 
1928
    tprint(tracef, "   state older than record\n");
 
1929
    switch (undone_record_type) {
 
1930
    case LOGREC_UNDO_ROW_DELETE:
 
1931
      row_entry= 1;
 
1932
      share->state.state.records++;
 
1933
      break;
 
1934
    case LOGREC_UNDO_ROW_INSERT:
 
1935
      share->state.state.records--;
 
1936
      share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
 
1937
      row_entry= 1;
 
1938
      break;
 
1939
    case LOGREC_UNDO_ROW_UPDATE:
 
1940
      row_entry= 1;
 
1941
      break;
 
1942
    case LOGREC_UNDO_KEY_INSERT:
 
1943
    case LOGREC_UNDO_KEY_DELETE:
 
1944
      break;
 
1945
    case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
 
1946
    case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
 
1947
    {
 
1948
      uint key_nr;
 
1949
      my_off_t page;
 
1950
      key_nr= key_nr_korr(logpos);
 
1951
      page=  page_korr(logpos + KEY_NR_STORE_SIZE);
 
1952
      share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
 
1953
                                      HA_OFFSET_ERROR :
 
1954
                                      page * share->block_size);
 
1955
      break;
 
1956
    }
 
1957
    case LOGREC_UNDO_BULK_INSERT:
 
1958
      break;
 
1959
    default:
 
1960
      DBUG_ASSERT(0);
 
1961
    }
 
1962
    if (row_entry && share->calc_checksum)
 
1963
      share->state.state.checksum+= ha_checksum_korr(logpos);
 
1964
    share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
1965
                            STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
1966
  }
 
1967
  if (row_entry)
 
1968
    tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
 
1969
  _ma_unpin_all_pages(info, rec->lsn);
 
1970
  DBUG_RETURN(0);
 
1971
}
 
1972
 
 
1973
 
 
1974
/**
 
1975
  In some cases we have to skip execution of an UNDO record during the UNDO
 
1976
  phase.
 
1977
*/
 
1978
 
 
1979
static void skip_undo_record(LSN previous_undo_lsn, TRN *trn)
 
1980
{
 
1981
  trn->undo_lsn= previous_undo_lsn;
 
1982
  if (previous_undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
 
1983
    trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
 
1984
  skipped_undo_phase++;
 
1985
}
 
1986
 
 
1987
 
 
1988
prototype_undo_exec_hook(UNDO_ROW_INSERT)
 
1989
{
 
1990
  my_bool error;
 
1991
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
1992
  LSN previous_undo_lsn= lsn_korr(rec->header);
 
1993
  MARIA_SHARE *share;
 
1994
  const uchar *record_ptr;
 
1995
 
 
1996
  if (info == NULL)
 
1997
  {
 
1998
    /*
 
1999
      Unlike for REDOs, if the table was skipped it is abnormal; we have a
 
2000
      transaction to rollback which used this table, as it is not rolled back
 
2001
      it was supposed to hold this table and so the table should still be
 
2002
      there. Skip it (user may have repaired the table with maria_chk because
 
2003
      it was so badly corrupted that a previous recovery failed) but warn.
 
2004
    */
 
2005
    skip_undo_record(previous_undo_lsn, trn);
 
2006
    return 0;
 
2007
  }
 
2008
  share= info->s;
 
2009
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
2010
                          STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
 
2011
                          STATE_NOT_MOVABLE);
 
2012
  record_ptr= rec->header;
 
2013
  if (share->calc_checksum)
 
2014
  {
 
2015
    /*
 
2016
      We need to read more of the record to put the checksum into the record
 
2017
      buffer used by _ma_apply_undo_row_insert().
 
2018
      If the table has no live checksum, rec->header will be enough.
 
2019
    */
 
2020
    enlarge_buffer(rec);
 
2021
    if (log_record_buffer.str == NULL ||
 
2022
        translog_read_record(rec->lsn, 0, rec->record_length,
 
2023
                             log_record_buffer.str, NULL) !=
 
2024
        rec->record_length)
 
2025
    {
 
2026
      eprint(tracef, "Failed to read record");
 
2027
      return 1;
 
2028
    }
 
2029
    record_ptr= log_record_buffer.str;
 
2030
  }
 
2031
 
 
2032
  info->trn= trn;
 
2033
  error= _ma_apply_undo_row_insert(info, previous_undo_lsn,
 
2034
                                   record_ptr + LSN_STORE_SIZE +
 
2035
                                   FILEID_STORE_SIZE);
 
2036
  info->trn= 0;
 
2037
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
 
2038
  tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
 
2039
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
 
2040
         LSN_IN_PARTS(trn->undo_lsn));
 
2041
  return error;
 
2042
}
 
2043
 
 
2044
 
 
2045
prototype_undo_exec_hook(UNDO_ROW_DELETE)
 
2046
{
 
2047
  my_bool error;
 
2048
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
2049
  LSN previous_undo_lsn= lsn_korr(rec->header);
 
2050
  MARIA_SHARE *share;
 
2051
 
 
2052
  if (info == NULL)
 
2053
  {
 
2054
    skip_undo_record(previous_undo_lsn, trn);
 
2055
    return 0;
 
2056
  }
 
2057
 
 
2058
  share= info->s;
 
2059
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
2060
                          STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
2061
  enlarge_buffer(rec);
 
2062
  if (log_record_buffer.str == NULL ||
 
2063
      translog_read_record(rec->lsn, 0, rec->record_length,
 
2064
                           log_record_buffer.str, NULL) !=
 
2065
       rec->record_length)
 
2066
  {
 
2067
    eprint(tracef, "Failed to read record");
 
2068
    return 1;
 
2069
  }
 
2070
 
 
2071
  info->trn= trn;
 
2072
  error= _ma_apply_undo_row_delete(info, previous_undo_lsn,
 
2073
                                   log_record_buffer.str + LSN_STORE_SIZE +
 
2074
                                   FILEID_STORE_SIZE,
 
2075
                                   rec->record_length -
 
2076
                                   (LSN_STORE_SIZE + FILEID_STORE_SIZE));
 
2077
  info->trn= 0;
 
2078
  tprint(tracef, "   rows' count %lu\n   undo_lsn now LSN (%lu,0x%lx)\n",
 
2079
         (ulong)share->state.state.records, LSN_IN_PARTS(trn->undo_lsn));
 
2080
  return error;
 
2081
}
 
2082
 
 
2083
 
 
2084
prototype_undo_exec_hook(UNDO_ROW_UPDATE)
 
2085
{
 
2086
  my_bool error;
 
2087
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
2088
  LSN previous_undo_lsn= lsn_korr(rec->header);
 
2089
  MARIA_SHARE *share;
 
2090
 
 
2091
  if (info == NULL)
 
2092
  {
 
2093
    skip_undo_record(previous_undo_lsn, trn);
 
2094
    return 0;
 
2095
  }
 
2096
 
 
2097
  share= info->s;
 
2098
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
2099
                          STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
2100
  enlarge_buffer(rec);
 
2101
  if (log_record_buffer.str == NULL ||
 
2102
      translog_read_record(rec->lsn, 0, rec->record_length,
 
2103
                           log_record_buffer.str, NULL) !=
 
2104
       rec->record_length)
 
2105
  {
 
2106
    eprint(tracef, "Failed to read record");
 
2107
    return 1;
 
2108
  }
 
2109
 
 
2110
  info->trn= trn;
 
2111
  error= _ma_apply_undo_row_update(info, previous_undo_lsn,
 
2112
                                   log_record_buffer.str + LSN_STORE_SIZE +
 
2113
                                   FILEID_STORE_SIZE,
 
2114
                                   rec->record_length -
 
2115
                                   (LSN_STORE_SIZE + FILEID_STORE_SIZE));
 
2116
  info->trn= 0;
 
2117
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
 
2118
         LSN_IN_PARTS(trn->undo_lsn));
 
2119
  return error;
 
2120
}
 
2121
 
 
2122
 
 
2123
prototype_undo_exec_hook(UNDO_KEY_INSERT)
 
2124
{
 
2125
  my_bool error;
 
2126
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
2127
  LSN previous_undo_lsn= lsn_korr(rec->header);
 
2128
  MARIA_SHARE *share;
 
2129
 
 
2130
  if (info == NULL)
 
2131
  {
 
2132
    skip_undo_record(previous_undo_lsn, trn);
 
2133
    return 0;
 
2134
  }
 
2135
 
 
2136
  share= info->s;
 
2137
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
2138
                          STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
2139
 
 
2140
  enlarge_buffer(rec);
 
2141
  if (log_record_buffer.str == NULL ||
 
2142
      translog_read_record(rec->lsn, 0, rec->record_length,
 
2143
                           log_record_buffer.str, NULL) !=
 
2144
        rec->record_length)
 
2145
  {
 
2146
    eprint(tracef, "Failed to read record");
 
2147
    return 1;
 
2148
  }
 
2149
 
 
2150
  info->trn= trn;
 
2151
  error= _ma_apply_undo_key_insert(info, previous_undo_lsn,
 
2152
                                   log_record_buffer.str + LSN_STORE_SIZE +
 
2153
                                   FILEID_STORE_SIZE,
 
2154
                                   rec->record_length - LSN_STORE_SIZE -
 
2155
                                   FILEID_STORE_SIZE);
 
2156
  info->trn= 0;
 
2157
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
 
2158
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
 
2159
         LSN_IN_PARTS(trn->undo_lsn));
 
2160
  return error;
 
2161
}
 
2162
 
 
2163
 
 
2164
prototype_undo_exec_hook(UNDO_KEY_DELETE)
 
2165
{
 
2166
  my_bool error;
 
2167
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
2168
  LSN previous_undo_lsn= lsn_korr(rec->header);
 
2169
  MARIA_SHARE *share;
 
2170
 
 
2171
  if (info == NULL)
 
2172
  {
 
2173
    skip_undo_record(previous_undo_lsn, trn);
 
2174
    return 0;
 
2175
  }
 
2176
 
 
2177
  share= info->s;
 
2178
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
2179
                          STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
2180
 
 
2181
  enlarge_buffer(rec);
 
2182
  if (log_record_buffer.str == NULL ||
 
2183
      translog_read_record(rec->lsn, 0, rec->record_length,
 
2184
                           log_record_buffer.str, NULL) !=
 
2185
        rec->record_length)
 
2186
  {
 
2187
    eprint(tracef, "Failed to read record");
 
2188
    return 1;
 
2189
  }
 
2190
 
 
2191
  info->trn= trn;
 
2192
  error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
 
2193
                                   log_record_buffer.str + LSN_STORE_SIZE +
 
2194
                                   FILEID_STORE_SIZE,
 
2195
                                   rec->record_length - LSN_STORE_SIZE -
 
2196
                                   FILEID_STORE_SIZE, FALSE);
 
2197
  info->trn= 0;
 
2198
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
 
2199
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
 
2200
         LSN_IN_PARTS(trn->undo_lsn));
 
2201
  return error;
 
2202
}
 
2203
 
 
2204
 
 
2205
prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
 
2206
{
 
2207
  my_bool error;
 
2208
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
2209
  LSN previous_undo_lsn= lsn_korr(rec->header);
 
2210
  MARIA_SHARE *share;
 
2211
 
 
2212
  if (info == NULL)
 
2213
  {
 
2214
    skip_undo_record(previous_undo_lsn, trn);
 
2215
    return 0;
 
2216
  }
 
2217
 
 
2218
  share= info->s;
 
2219
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
2220
                          STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
2221
 
 
2222
  enlarge_buffer(rec);
 
2223
  if (log_record_buffer.str == NULL ||
 
2224
      translog_read_record(rec->lsn, 0, rec->record_length,
 
2225
                           log_record_buffer.str, NULL) !=
 
2226
        rec->record_length)
 
2227
  {
 
2228
    eprint(tracef, "Failed to read record");
 
2229
    return 1;
 
2230
  }
 
2231
 
 
2232
  info->trn= trn;
 
2233
  error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
 
2234
                                   log_record_buffer.str + LSN_STORE_SIZE +
 
2235
                                   FILEID_STORE_SIZE,
 
2236
                                   rec->record_length - LSN_STORE_SIZE -
 
2237
                                   FILEID_STORE_SIZE, TRUE);
 
2238
  info->trn= 0;
 
2239
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
 
2240
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
 
2241
         LSN_IN_PARTS(trn->undo_lsn));
 
2242
  return error;
 
2243
}
 
2244
 
 
2245
 
 
2246
prototype_undo_exec_hook(UNDO_BULK_INSERT)
 
2247
{
 
2248
  my_bool error;
 
2249
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
 
2250
  LSN previous_undo_lsn= lsn_korr(rec->header);
 
2251
  MARIA_SHARE *share;
 
2252
 
 
2253
  if (info == NULL)
 
2254
  {
 
2255
    skip_undo_record(previous_undo_lsn, trn);
 
2256
    return 0;
 
2257
  }
 
2258
 
 
2259
  share= info->s;
 
2260
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
 
2261
                          STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
 
2262
 
 
2263
  info->trn= trn;
 
2264
  error= _ma_apply_undo_bulk_insert(info, previous_undo_lsn);
 
2265
  info->trn= 0;
 
2266
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
 
2267
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
 
2268
         LSN_IN_PARTS(trn->undo_lsn));
 
2269
  return error;
 
2270
}
 
2271
 
 
2272
 
 
2273
static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
 
2274
{
 
2275
  TRANSLOG_HEADER_BUFFER rec;
 
2276
  struct st_translog_scanner_data scanner;
 
2277
  int len;
 
2278
  uint i;
 
2279
 
 
2280
  /* install hooks for execution */
 
2281
#define install_redo_exec_hook(R)                                        \
 
2282
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
 
2283
    exec_REDO_LOGREC_ ## R;
 
2284
#define install_redo_exec_hook_shared(R,S)                               \
 
2285
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
 
2286
    exec_REDO_LOGREC_ ## S;
 
2287
#define install_undo_exec_hook(R)                                        \
 
2288
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
 
2289
    exec_UNDO_LOGREC_ ## R;
 
2290
  install_redo_exec_hook(LONG_TRANSACTION_ID);
 
2291
  install_redo_exec_hook(CHECKPOINT);
 
2292
  install_redo_exec_hook(REDO_CREATE_TABLE);
 
2293
  install_redo_exec_hook(REDO_RENAME_TABLE);
 
2294
  install_redo_exec_hook(REDO_REPAIR_TABLE);
 
2295
  install_redo_exec_hook(REDO_DROP_TABLE);
 
2296
  install_redo_exec_hook(FILE_ID);
 
2297
  install_redo_exec_hook(INCOMPLETE_LOG);
 
2298
  install_redo_exec_hook(INCOMPLETE_GROUP);
 
2299
  install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
 
2300
  install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
 
2301
  install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
 
2302
  install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
 
2303
  install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
 
2304
  install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
 
2305
  install_redo_exec_hook(REDO_FREE_BLOCKS);
 
2306
  install_redo_exec_hook(REDO_DELETE_ALL);
 
2307
  install_redo_exec_hook(REDO_INDEX);
 
2308
  install_redo_exec_hook(REDO_INDEX_NEW_PAGE);
 
2309
  install_redo_exec_hook(REDO_INDEX_FREE_PAGE);
 
2310
  install_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
 
2311
  install_redo_exec_hook(UNDO_ROW_INSERT);
 
2312
  install_redo_exec_hook(UNDO_ROW_DELETE);
 
2313
  install_redo_exec_hook(UNDO_ROW_UPDATE);
 
2314
  install_redo_exec_hook(UNDO_KEY_INSERT);
 
2315
  install_redo_exec_hook(UNDO_KEY_DELETE);
 
2316
  install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
 
2317
  install_redo_exec_hook(COMMIT);
 
2318
  install_redo_exec_hook(CLR_END);
 
2319
  install_undo_exec_hook(UNDO_ROW_INSERT);
 
2320
  install_undo_exec_hook(UNDO_ROW_DELETE);
 
2321
  install_undo_exec_hook(UNDO_ROW_UPDATE);
 
2322
  install_undo_exec_hook(UNDO_KEY_INSERT);
 
2323
  install_undo_exec_hook(UNDO_KEY_DELETE);
 
2324
  install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
 
2325
  /* REDO_NEW_ROW_HEAD shares entry with REDO_INSERT_ROW_HEAD */
 
2326
  install_redo_exec_hook_shared(REDO_NEW_ROW_HEAD, REDO_INSERT_ROW_HEAD);
 
2327
  /* REDO_NEW_ROW_TAIL shares entry with REDO_INSERT_ROW_TAIL */
 
2328
  install_redo_exec_hook_shared(REDO_NEW_ROW_TAIL, REDO_INSERT_ROW_TAIL);
 
2329
  install_redo_exec_hook(UNDO_BULK_INSERT);
 
2330
  install_undo_exec_hook(UNDO_BULK_INSERT);
 
2331
 
 
2332
  current_group_end_lsn= LSN_IMPOSSIBLE;
 
2333
#ifndef DBUG_OFF
 
2334
  current_group_table= NULL;
 
2335
#endif
 
2336
 
 
2337
  if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
 
2338
  {
 
2339
    tprint(tracef, "checkpoint address refers to the log end log or "
 
2340
           "log is empty, nothing to do.\n");
 
2341
    return 0;
 
2342
  }
 
2343
 
 
2344
  len= translog_read_record_header(lsn, &rec);
 
2345
 
 
2346
  if (len == RECHEADER_READ_ERROR)
 
2347
  {
 
2348
    eprint(tracef, "Failed to read header of the first record.");
 
2349
    return 1;
 
2350
  }
 
2351
  if (translog_scanner_init(lsn, 1, &scanner, 1))
 
2352
  {
 
2353
    tprint(tracef, "Scanner init failed\n");
 
2354
    return 1;
 
2355
  }
 
2356
  for (i= 1;;i++)
 
2357
  {
 
2358
    uint16 sid= rec.short_trid;
 
2359
    const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
 
2360
    display_record_position(log_desc, &rec, i);
 
2361
    /*
 
2362
      A complete group is a set of log records with an "end mark" record
 
2363
      (e.g. a set of REDOs for an operation, terminated by an UNDO for this
 
2364
      operation); if there is no "end mark" record the group is incomplete and
 
2365
      won't be executed.
 
2366
    */
 
2367
    if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
 
2368
        (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
 
2369
    {
 
2370
      if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
 
2371
      {
 
2372
        if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
 
2373
        {
 
2374
          /*
 
2375
            Can happen if the transaction got a table write error, then
 
2376
            unlocked tables thus wrote a COMMIT record. Or can be an
 
2377
            INCOMPLETE_GROUP record written by a previous recovery.
 
2378
          */
 
2379
          tprint(tracef, "\nDiscarding incomplete group before this record\n");
 
2380
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
 
2381
        }
 
2382
        else
 
2383
        {
 
2384
          struct st_translog_scanner_data scanner2;
 
2385
          TRANSLOG_HEADER_BUFFER rec2;
 
2386
          /*
 
2387
            There is a complete group for this transaction, containing more
 
2388
            than this event.
 
2389
          */
 
2390
          tprint(tracef, "   ends a group:\n");
 
2391
          len=
 
2392
            translog_read_record_header(all_active_trans[sid].group_start_lsn,
 
2393
                                        &rec2);
 
2394
          if (len < 0) /* EOF or error */
 
2395
          {
 
2396
            tprint(tracef, "Cannot find record where it should be\n");
 
2397
            goto err;
 
2398
          }
 
2399
          if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1))
 
2400
          {
 
2401
            tprint(tracef, "Scanner2 init failed\n");
 
2402
            goto err;
 
2403
          }
 
2404
          current_group_end_lsn= rec.lsn;
 
2405
          do
 
2406
          {
 
2407
            if (rec2.short_trid == sid) /* it's in our group */
 
2408
            {
 
2409
              const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
 
2410
              display_record_position(log_desc2, &rec2, 0);
 
2411
              if (apply == MARIA_LOG_CHECK)
 
2412
              {
 
2413
                translog_size_t read_len;
 
2414
                enlarge_buffer(&rec2);
 
2415
                read_len=
 
2416
                  translog_read_record(rec2.lsn, 0, rec2.record_length,
 
2417
                                       log_record_buffer.str, NULL);
 
2418
                if (read_len != rec2.record_length)
 
2419
                {
 
2420
                  tprint(tracef, "Cannot read record's body: read %u of"
 
2421
                         " %u bytes\n", read_len, rec2.record_length);
 
2422
                  translog_destroy_scanner(&scanner2);
 
2423
                  translog_free_record_header(&rec2);
 
2424
                  goto err;
 
2425
                }
 
2426
              }
 
2427
              if (apply == MARIA_LOG_APPLY &&
 
2428
                  display_and_apply_record(log_desc2, &rec2))
 
2429
              {
 
2430
                translog_destroy_scanner(&scanner2);
 
2431
                translog_free_record_header(&rec2);
 
2432
                goto err;
 
2433
              }
 
2434
            }
 
2435
            translog_free_record_header(&rec2);
 
2436
            len= translog_read_next_record_header(&scanner2, &rec2);
 
2437
            if (len < 0) /* EOF or error */
 
2438
            {
 
2439
              tprint(tracef, "Cannot find record where it should be\n");
 
2440
              translog_destroy_scanner(&scanner2);
 
2441
              translog_free_record_header(&rec2);
 
2442
              goto err;
 
2443
            }
 
2444
          }
 
2445
          while (rec2.lsn < rec.lsn);
 
2446
          /* group finished */
 
2447
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
 
2448
          current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
 
2449
          display_record_position(log_desc, &rec, 0);
 
2450
          translog_destroy_scanner(&scanner2);
 
2451
          translog_free_record_header(&rec2);
 
2452
        }
 
2453
      }
 
2454
      if (apply == MARIA_LOG_APPLY &&
 
2455
          display_and_apply_record(log_desc, &rec))
 
2456
        goto err;
 
2457
#ifndef DBUG_OFF
 
2458
      current_group_table= NULL;
 
2459
#endif
 
2460
    }
 
2461
    else /* record does not end group */
 
2462
    {
 
2463
      /* just record the fact, can't know if can execute yet */
 
2464
      if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
 
2465
      {
 
2466
        /* group not yet started */
 
2467
        all_active_trans[sid].group_start_lsn= rec.lsn;
 
2468
      }
 
2469
    }
 
2470
    translog_free_record_header(&rec);
 
2471
    len= translog_read_next_record_header(&scanner, &rec);
 
2472
    if (len < 0)
 
2473
    {
 
2474
      switch (len)
 
2475
      {
 
2476
      case RECHEADER_READ_EOF:
 
2477
        tprint(tracef, "EOF on the log\n");
 
2478
        break;
 
2479
      case RECHEADER_READ_ERROR:
 
2480
        tprint(tracef, "Error reading log\n");
 
2481
        goto err;
 
2482
      }
 
2483
      break;
 
2484
    }
 
2485
  }
 
2486
  translog_destroy_scanner(&scanner);
 
2487
  translog_free_record_header(&rec);
 
2488
  if (recovery_message_printed == REC_MSG_REDO)
 
2489
  {
 
2490
    fprintf(stderr, " 100%%");
 
2491
    fflush(stderr);
 
2492
    procent_printed= 1;
 
2493
  }
 
2494
  return 0;
 
2495
 
 
2496
err:
 
2497
  translog_destroy_scanner(&scanner);
 
2498
  translog_free_record_header(&rec);
 
2499
  return 1;
 
2500
}
 
2501
 
 
2502
 
 
2503
/**
 
2504
   @brief Informs about any aborted groups or uncommitted transactions,
 
2505
   prepares for the UNDO phase if needed.
 
2506
 
 
2507
   @note Observe that it may init trnman.
 
2508
*/
 
2509
static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
 
2510
{
 
2511
  uint sid, uncommitted= 0;
 
2512
  char llbuf[22];
 
2513
  LSN addr;
 
2514
 
 
2515
  hash_free(&all_dirty_pages);
 
2516
  /*
 
2517
    hash_free() can be called multiple times probably, but be safe if that
 
2518
    changes
 
2519
  */
 
2520
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
 
2521
  my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
 
2522
  dirty_pages_pool= NULL;
 
2523
 
 
2524
  llstr(max_long_trid, llbuf);
 
2525
  tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf);
 
2526
  llstr(max_trid_in_control_file, llbuf);
 
2527
  tprint(tracef, "Maximum transaction long id seen in control file: %s\n",
 
2528
         llbuf);
 
2529
  /*
 
2530
    If logs were deleted, or lost, trid in control file is needed to set
 
2531
    trnman's generator:
 
2532
  */
 
2533
  set_if_bigger(max_long_trid, max_trid_in_control_file);
 
2534
  if (prepare_for_undo_phase && trnman_init(max_long_trid))
 
2535
    return -1;
 
2536
 
 
2537
  trns_created= TRUE;
 
2538
 
 
2539
  for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
 
2540
  {
 
2541
    TrID long_trid= all_active_trans[sid].long_trid;
 
2542
    LSN gslsn= all_active_trans[sid].group_start_lsn;
 
2543
    TRN *trn;
 
2544
    if (gslsn != LSN_IMPOSSIBLE)
 
2545
    {
 
2546
      tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n",
 
2547
             LSN_IN_PARTS(gslsn), sid);
 
2548
      all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
 
2549
    }
 
2550
    if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
 
2551
    {
 
2552
      llstr(long_trid, llbuf);
 
2553
      tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
 
2554
             llbuf, sid);
 
2555
      /*
 
2556
        dummy_transaction_object serves only for DDLs, where there is never a
 
2557
        rollback or incomplete group. And unknown transactions (which have
 
2558
        long_trid==0) should have undo_lsn==LSN_IMPOSSIBLE.
 
2559
      */
 
2560
      if (long_trid ==0)
 
2561
      {
 
2562
        eprint(tracef, "Transaction with long_trid 0 should not roll back");
 
2563
        ALERT_USER();
 
2564
        return -1;
 
2565
      }
 
2566
      if (prepare_for_undo_phase)
 
2567
      {
 
2568
        if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
 
2569
          return -1;
 
2570
        trn->undo_lsn= all_active_trans[sid].undo_lsn;
 
2571
        trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
 
2572
          TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
 
2573
        if (gslsn != LSN_IMPOSSIBLE)
 
2574
        {
 
2575
          /*
 
2576
            UNDO phase will log some records. So, a future recovery may see:
 
2577
            REDO(from incomplete group) - REDO(from rollback) - CLR_END
 
2578
            and thus execute the first REDO (finding it in "a complete
 
2579
            group"). To prevent that:
 
2580
          */
 
2581
          LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS];
 
2582
          LSN lsn;
 
2583
          if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP,
 
2584
                                    trn, NULL, 0,
 
2585
                                    TRANSLOG_INTERNAL_PARTS, log_array,
 
2586
                                    NULL, NULL))
 
2587
            return -1;
 
2588
        }
 
2589
      }
 
2590
      uncommitted++;
 
2591
    }
 
2592
#ifdef MARIA_VERSIONING
 
2593
    /*
 
2594
      If real recovery: if transaction was committed, move it to some separate
 
2595
      list for soon purging.
 
2596
    */
 
2597
#endif
 
2598
  }
 
2599
 
 
2600
  my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
 
2601
  all_active_trans= NULL;
 
2602
 
 
2603
  /*
 
2604
    The UNDO phase uses some normal run-time code of ROLLBACK: generates log
 
2605
    records, etc; prepare tables for that
 
2606
  */
 
2607
  addr= translog_get_horizon();
 
2608
  for (sid= 0; sid <= SHARE_ID_MAX; sid++)
 
2609
  {
 
2610
    MARIA_HA *info= all_tables[sid].info;
 
2611
    if (info != NULL)
 
2612
    {
 
2613
      prepare_table_for_close(info, addr);
 
2614
      /*
 
2615
        But we don't close it; we leave it available for the UNDO phase;
 
2616
        it's likely that the UNDO phase will need it.
 
2617
      */
 
2618
      if (prepare_for_undo_phase)
 
2619
        translog_assign_id_to_share_from_recovery(info->s, sid);
 
2620
    }
 
2621
  }
 
2622
  return uncommitted;
 
2623
}
 
2624
 
 
2625
 
 
2626
static int run_undo_phase(uint uncommitted)
 
2627
{
 
2628
  LSN last_undo;
 
2629
  DBUG_ENTER("run_undo_phase");
 
2630
 
 
2631
  if (uncommitted > 0)
 
2632
  {
 
2633
    checkpoint_useful= TRUE;
 
2634
    if (tracef != stdout)
 
2635
    {
 
2636
      if (recovery_message_printed == REC_MSG_NONE)
 
2637
        print_preamble();
 
2638
      fprintf(stderr, "transactions to roll back:");
 
2639
      recovery_message_printed= REC_MSG_UNDO;
 
2640
    }
 
2641
    tprint(tracef, "%u transactions will be rolled back\n", uncommitted);
 
2642
    procent_printed= 1;
 
2643
    for( ; ; )
 
2644
    {
 
2645
      char llbuf[22];
 
2646
      TRN *trn;
 
2647
      if (recovery_message_printed == REC_MSG_UNDO)
 
2648
      {
 
2649
        fprintf(stderr, " %u", uncommitted);
 
2650
        fflush(stderr);
 
2651
      }
 
2652
      if ((uncommitted--) == 0)
 
2653
        break;
 
2654
      trn= trnman_get_any_trn();
 
2655
      DBUG_ASSERT(trn != NULL);
 
2656
      llstr(trn->trid, llbuf);
 
2657
      tprint(tracef, "Rolling back transaction of long id %s\n", llbuf);
 
2658
      last_undo= trn->undo_lsn + 1;
 
2659
 
 
2660
      /* Execute all undo entries */
 
2661
      while (trn->undo_lsn)
 
2662
      {
 
2663
        TRANSLOG_HEADER_BUFFER rec;
 
2664
        LOG_DESC *log_desc;
 
2665
        DBUG_ASSERT(trn->undo_lsn < last_undo);
 
2666
        last_undo= trn->undo_lsn;
 
2667
 
 
2668
        if (translog_read_record_header(trn->undo_lsn, &rec) ==
 
2669
            RECHEADER_READ_ERROR)
 
2670
          DBUG_RETURN(1);
 
2671
        log_desc= &log_record_type_descriptor[rec.type];
 
2672
        display_record_position(log_desc, &rec, 0);
 
2673
        if (log_desc->record_execute_in_undo_phase(&rec, trn))
 
2674
        {
 
2675
          eprint(tracef, "Got error %d when executing undo %s", my_errno,
 
2676
                 log_desc->name);
 
2677
          translog_free_record_header(&rec);
 
2678
          DBUG_RETURN(1);
 
2679
        }
 
2680
        translog_free_record_header(&rec);
 
2681
      }
 
2682
 
 
2683
      if (trnman_rollback_trn(trn))
 
2684
        DBUG_RETURN(1);
 
2685
      /* We could want to span a few threads (4?) instead of 1 */
 
2686
      /* In the future, we want to have this phase *online* */
 
2687
    }
 
2688
  }
 
2689
  procent_printed= 0;
 
2690
  DBUG_RETURN(0);
 
2691
}
 
2692
 
 
2693
 
 
2694
/**
 
2695
  In case of error in recovery, deletes all transactions from the transaction
 
2696
  manager so that this module does not assert.
 
2697
 
 
2698
  @note no checkpoint should be taken as those transactions matter for the
 
2699
  next recovery (they still haven't been properly dealt with).
 
2700
*/
 
2701
 
 
2702
static void delete_all_transactions()
 
2703
{
 
2704
  for( ; ; )
 
2705
  {
 
2706
    TRN *trn= trnman_get_any_trn();
 
2707
    if (trn == NULL)
 
2708
      break;
 
2709
    trn->undo_lsn= trn->first_undo_lsn= LSN_IMPOSSIBLE;
 
2710
    trnman_rollback_trn(trn); /* ignore error */
 
2711
  }
 
2712
}
 
2713
 
 
2714
 
 
2715
/**
 
2716
   @brief re-enables transactionality, updates is_of_horizon
 
2717
 
 
2718
   @param  info                table
 
2719
   @param  horizon             address to set is_of_horizon
 
2720
*/
 
2721
 
 
2722
static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon)
 
2723
{
 
2724
  MARIA_SHARE *share= info->s;
 
2725
  /*
 
2726
    In a fully-forward REDO phase (no checkpoint record),
 
2727
    state is now at least as new as the LSN of the current record. It may be
 
2728
    newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a
 
2729
    table, but that table was later modified further in the log.
 
2730
    But if we parsed a checkpoint record, it may be this way in the log:
 
2731
    FILE_ID(6->t2)... FILE_ID(6->t1)... CHECKPOINT(6->t1)
 
2732
    Checkpoint parsing opened t1 with id 6; first FILE_ID above is going to
 
2733
    make t1 close; the first condition below is however false (when checkpoint
 
2734
    was taken it increased is_of_horizon) and so it works. For safety we
 
2735
    add the second condition.
 
2736
  */
 
2737
  if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0 &&
 
2738
      cmp_translog_addr(share->lsn_of_file_id, horizon) < 0)
 
2739
  {
 
2740
    share->state.is_of_horizon= horizon;
 
2741
    _ma_state_info_write_sub(share->kfile.file, &share->state, 1);
 
2742
  }
 
2743
 
 
2744
  /*
 
2745
   Ensure that info->state is up to date as
 
2746
   _ma_renable_logging_for_table() is depending on this
 
2747
  */
 
2748
  *info->state= info->s->state.state;
 
2749
 
 
2750
  /*
 
2751
    This leaves PAGECACHE_PLAIN_PAGE pages into the cache, while the table is
 
2752
    going to switch back to transactional. So the table will be a mix of
 
2753
    pages, which is ok as long as we don't take any checkpoints until all
 
2754
    tables get closed at the end of the UNDO phase.
 
2755
  */
 
2756
  _ma_reenable_logging_for_table(info, FALSE);
 
2757
  info->trn= NULL; /* safety */
 
2758
}
 
2759
 
 
2760
 
 
2761
static MARIA_HA *get_MARIA_HA_from_REDO_record(const
 
2762
                                               TRANSLOG_HEADER_BUFFER *rec)
 
2763
{
 
2764
  uint16 sid;
 
2765
  pgcache_page_no_t page;
 
2766
  MARIA_HA *info;
 
2767
  MARIA_SHARE *share;
 
2768
  char llbuf[22];
 
2769
  my_bool index_page_redo_entry= FALSE, page_redo_entry= FALSE;
 
2770
  LINT_INIT(page);
 
2771
 
 
2772
  print_redo_phase_progress(rec->lsn);
 
2773
  sid= fileid_korr(rec->header);
 
2774
  switch (rec->type) {
 
2775
    /* not all REDO records have a page: */
 
2776
  case LOGREC_REDO_INDEX_NEW_PAGE:
 
2777
  case LOGREC_REDO_INDEX:
 
2778
  case LOGREC_REDO_INDEX_FREE_PAGE:
 
2779
    index_page_redo_entry= 1;
 
2780
    /* Fall trough*/
 
2781
  case LOGREC_REDO_INSERT_ROW_HEAD:
 
2782
  case LOGREC_REDO_INSERT_ROW_TAIL:
 
2783
  case LOGREC_REDO_PURGE_ROW_HEAD:
 
2784
  case LOGREC_REDO_PURGE_ROW_TAIL:
 
2785
  case LOGREC_REDO_NEW_ROW_HEAD:
 
2786
  case LOGREC_REDO_NEW_ROW_TAIL:
 
2787
  case LOGREC_REDO_FREE_HEAD_OR_TAIL:
 
2788
    page_redo_entry= TRUE;
 
2789
    page= page_korr(rec->header + FILEID_STORE_SIZE);
 
2790
    llstr(page, llbuf);
 
2791
    break;
 
2792
    /*
 
2793
      For REDO_FREE_BLOCKS, no need to look at dirty pages list: it does not
 
2794
      read data pages, only reads/modifies bitmap page(s) which is cheap.
 
2795
    */
 
2796
  default:
 
2797
    break;
 
2798
  }
 
2799
  tprint(tracef, "   For table of short id %u", sid);
 
2800
  info= all_tables[sid].info;
 
2801
#ifndef DBUG_OFF
 
2802
  DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
 
2803
  current_group_table= info;
 
2804
#endif
 
2805
  if (info == NULL)
 
2806
  {
 
2807
    tprint(tracef, ", table skipped, so skipping record\n");
 
2808
    return NULL;
 
2809
  }
 
2810
  share= info->s;
 
2811
  tprint(tracef, ", '%s'", share->open_file_name);
 
2812
  DBUG_ASSERT(in_redo_phase);
 
2813
  if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
 
2814
  {
 
2815
    /*
 
2816
      This can happen only if processing a record before the checkpoint
 
2817
      record.
 
2818
      id->name mapping is newer than REDO record: for sure the table subject
 
2819
      of the REDO has been flushed and forced (id re-assignment implies this);
 
2820
      REDO can be ignored (and must be, as we don't know what this subject
 
2821
      table was).
 
2822
    */
 
2823
    DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0);
 
2824
    tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
 
2825
           " than record, skipping record",
 
2826
           LSN_IN_PARTS(share->lsn_of_file_id));
 
2827
    return NULL;
 
2828
  }
 
2829
  if (cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
 
2830
  {
 
2831
    /* probably a bulk insert repair */
 
2832
    tprint(tracef, ", has skip_redo_lsn (%lu,0x%lx) more recent than"
 
2833
           " record, skipping record\n",
 
2834
           LSN_IN_PARTS(share->state.skip_redo_lsn));
 
2835
    return NULL;
 
2836
  }
 
2837
  /* detect if an open instance of a dropped table (internal bug) */
 
2838
  DBUG_ASSERT(share->last_version != 0);
 
2839
  if (page_redo_entry)
 
2840
  {
 
2841
    /*
 
2842
      Consult dirty pages list.
 
2843
      REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several
 
2844
      pages.
 
2845
    */
 
2846
    tprint(tracef, " page %s", llbuf);
 
2847
    if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
 
2848
                                     index_page_redo_entry))
 
2849
      return NULL;
 
2850
  }
 
2851
  /*
 
2852
    So we are going to read the page, and if its LSN is older than the
 
2853
    record's we will modify the page
 
2854
  */
 
2855
  tprint(tracef, ", applying record\n");
 
2856
  _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
 
2857
  return info;
 
2858
}
 
2859
 
 
2860
 
 
2861
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
 
2862
                                               TRANSLOG_HEADER_BUFFER *rec)
 
2863
{
 
2864
  uint16 sid;
 
2865
  MARIA_HA *info;
 
2866
  MARIA_SHARE *share;
 
2867
 
 
2868
  sid= fileid_korr(rec->header + LSN_STORE_SIZE);
 
2869
  tprint(tracef, "   For table of short id %u", sid);
 
2870
  info= all_tables[sid].info;
 
2871
#ifndef DBUG_OFF
 
2872
  DBUG_ASSERT(!in_redo_phase ||
 
2873
              current_group_table == NULL || current_group_table == info);
 
2874
  current_group_table= info;
 
2875
#endif
 
2876
  if (info == NULL)
 
2877
  {
 
2878
    tprint(tracef, ", table skipped, so skipping record\n");
 
2879
    return NULL;
 
2880
  }
 
2881
  share= info->s;
 
2882
  tprint(tracef, ", '%s'", share->open_file_name);
 
2883
  if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
 
2884
  {
 
2885
    tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
 
2886
           " than record, skipping record",
 
2887
           LSN_IN_PARTS(share->lsn_of_file_id));
 
2888
    return NULL;
 
2889
  }
 
2890
  if (in_redo_phase &&
 
2891
      cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
 
2892
  {
 
2893
    /* probably a bulk insert repair */
 
2894
    tprint(tracef, ", has skip_redo_lsn (%lu,0x%lx) more recent than"
 
2895
           " record, skipping record\n",
 
2896
           LSN_IN_PARTS(share->state.skip_redo_lsn));
 
2897
    return NULL;
 
2898
  }
 
2899
  DBUG_ASSERT(share->last_version != 0);
 
2900
  _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
 
2901
  tprint(tracef, ", applying record\n");
 
2902
  return info;
 
2903
}
 
2904
 
 
2905
 
 
2906
/**
 
2907
   @brief Parses checkpoint record.
 
2908
 
 
2909
   Builds from it the dirty_pages list (a hash), opens tables and maps them to
 
2910
   their 2-byte IDs, recreates transactions (not real TRNs though).
 
2911
 
 
2912
   @return LSN from where in the log the REDO phase should start
 
2913
     @retval LSN_ERROR error
 
2914
     @retval other     ok
 
2915
*/
 
2916
 
 
2917
static LSN parse_checkpoint_record(LSN lsn)
 
2918
{
 
2919
  ulong i;
 
2920
  ulonglong nb_dirty_pages;
 
2921
  TRANSLOG_HEADER_BUFFER rec;
 
2922
  TRANSLOG_ADDRESS start_address;
 
2923
  int len;
 
2924
  uint nb_active_transactions, nb_committed_transactions, nb_tables;
 
2925
  uchar *ptr;
 
2926
  LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages;
 
2927
  struct st_dirty_page *next_dirty_page_in_pool;
 
2928
 
 
2929
  tprint(tracef, "Loading data from checkpoint record at LSN (%lu,0x%lx)\n",
 
2930
         LSN_IN_PARTS(lsn));
 
2931
  if ((len= translog_read_record_header(lsn, &rec)) == RECHEADER_READ_ERROR)
 
2932
  {
 
2933
    tprint(tracef, "Cannot find checkpoint record where it should be\n");
 
2934
    return LSN_ERROR;
 
2935
  }
 
2936
 
 
2937
  enlarge_buffer(&rec);
 
2938
  if (log_record_buffer.str == NULL ||
 
2939
      translog_read_record(rec.lsn, 0, rec.record_length,
 
2940
                           log_record_buffer.str, NULL) !=
 
2941
      rec.record_length)
 
2942
  {
 
2943
    eprint(tracef, "Failed to read record");
 
2944
    return LSN_ERROR;
 
2945
  }
 
2946
 
 
2947
  ptr= log_record_buffer.str;
 
2948
  start_address= lsn_korr(ptr);
 
2949
  ptr+= LSN_STORE_SIZE;
 
2950
 
 
2951
  /* transactions */
 
2952
  nb_active_transactions= uint2korr(ptr);
 
2953
  ptr+= 2;
 
2954
  tprint(tracef, "%u active transactions\n", nb_active_transactions);
 
2955
  minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
 
2956
  ptr+= LSN_STORE_SIZE;
 
2957
  max_long_trid= transid_korr(ptr);
 
2958
  ptr+= TRANSID_SIZE;
 
2959
 
 
2960
  /*
 
2961
    how much brain juice and discussions there was to come to writing this
 
2962
    line. It may make start_address slightly decrease (only by the time it
 
2963
    takes to write one or a few rows, roughly).
 
2964
  */
 
2965
  set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions);
 
2966
 
 
2967
  for (i= 0; i < nb_active_transactions; i++)
 
2968
  {
 
2969
    uint16 sid= uint2korr(ptr);
 
2970
    TrID long_id;
 
2971
    LSN undo_lsn, first_undo_lsn;
 
2972
    ptr+= 2;
 
2973
    long_id= uint6korr(ptr);
 
2974
    ptr+= 6;
 
2975
    DBUG_ASSERT(sid > 0 && long_id > 0);
 
2976
    undo_lsn= lsn_korr(ptr);
 
2977
    ptr+= LSN_STORE_SIZE;
 
2978
    first_undo_lsn= lsn_korr(ptr);
 
2979
    ptr+= LSN_STORE_SIZE;
 
2980
    new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
 
2981
  }
 
2982
  nb_committed_transactions= uint4korr(ptr);
 
2983
  ptr+= 4;
 
2984
  tprint(tracef, "%lu committed transactions\n",
 
2985
         (ulong)nb_committed_transactions);
 
2986
  /* no purging => committed transactions are not important */
 
2987
  ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;
 
2988
 
 
2989
  /* tables  */
 
2990
  nb_tables= uint4korr(ptr);
 
2991
  ptr+= 4;
 
2992
  tprint(tracef, "%u open tables\n", nb_tables);
 
2993
  for (i= 0; i< nb_tables; i++)
 
2994
  {
 
2995
    char name[FN_REFLEN];
 
2996
    LSN first_log_write_lsn;
 
2997
    uint name_len;
 
2998
    uint16 sid= uint2korr(ptr);
 
2999
    ptr+= 2;
 
3000
    DBUG_ASSERT(sid > 0);
 
3001
    first_log_write_lsn= lsn_korr(ptr);
 
3002
    ptr+= LSN_STORE_SIZE;
 
3003
    name_len= strlen((char *)ptr) + 1;
 
3004
    strmake(name, (char *)ptr, sizeof(name)-1);
 
3005
    ptr+= name_len;
 
3006
    if (new_table(sid, name, first_log_write_lsn))
 
3007
      return LSN_ERROR;
 
3008
  }
 
3009
 
 
3010
  /* dirty pages */
 
3011
  nb_dirty_pages= uint8korr(ptr);
 
3012
 
 
3013
  /* Ensure casts later will not loose significant bits. */
 
3014
  DBUG_ASSERT((nb_dirty_pages <= SIZE_T_MAX/sizeof(struct st_dirty_page)) &&
 
3015
              (nb_dirty_pages <= ULONG_MAX));
 
3016
 
 
3017
  ptr+= 8;
 
3018
  tprint(tracef, "%lu dirty pages\n", (ulong) nb_dirty_pages);
 
3019
  if (hash_init(&all_dirty_pages, &my_charset_bin, (ulong)nb_dirty_pages,
 
3020
                offsetof(struct st_dirty_page, file_and_page_id),
 
3021
                sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
 
3022
                NULL, NULL, 0))
 
3023
    return LSN_ERROR;
 
3024
  dirty_pages_pool=
 
3025
    (struct st_dirty_page *)my_malloc((size_t)nb_dirty_pages *
 
3026
                                      sizeof(struct st_dirty_page),
 
3027
                                      MYF(MY_WME));
 
3028
  if (unlikely(dirty_pages_pool == NULL))
 
3029
    return LSN_ERROR;
 
3030
  next_dirty_page_in_pool= dirty_pages_pool;
 
3031
  minimum_rec_lsn_of_dirty_pages= LSN_MAX;
 
3032
  for (i= 0; i < nb_dirty_pages ; i++)
 
3033
  {
 
3034
    pgcache_page_no_t page_id;
 
3035
    LSN rec_lsn;
 
3036
    uint32 is_index;
 
3037
    uint16 table_id= uint2korr(ptr);
 
3038
    ptr+= 2;
 
3039
    is_index= ptr[0];
 
3040
    ptr++;
 
3041
    page_id= page_korr(ptr);
 
3042
    ptr+= PAGE_STORE_SIZE;
 
3043
    rec_lsn= lsn_korr(ptr);
 
3044
    ptr+= LSN_STORE_SIZE;
 
3045
    if (new_page((is_index << 16) | table_id,
 
3046
                 page_id, rec_lsn, next_dirty_page_in_pool++))
 
3047
      return LSN_ERROR;
 
3048
    set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
 
3049
  }
 
3050
  /* after that, there will be no insert/delete into the hash */
 
3051
  /*
 
3052
    sanity check on record (did we screw up with all those "ptr+=", did the
 
3053
    checkpoint write code and checkpoint read code go out of sync?).
 
3054
  */
 
3055
  if (ptr != (log_record_buffer.str + log_record_buffer.length))
 
3056
  {
 
3057
    eprint(tracef, "checkpoint record corrupted\n");
 
3058
    return LSN_ERROR;
 
3059
  }
 
3060
 
 
3061
  /*
 
3062
    start_address is now from where the dirty pages list can be ignored.
 
3063
    Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for
 
3064
    translog_read_record() functions.
 
3065
  */
 
3066
  start_address= checkpoint_start=
 
3067
    translog_next_LSN(start_address, LSN_IMPOSSIBLE);
 
3068
  if (checkpoint_start == LSN_IMPOSSIBLE)
 
3069
  {
 
3070
    /*
 
3071
      There must be a problem, as our checkpoint record exists and is >= the
 
3072
      address which is stored in its first bytes, which is >= start_address.
 
3073
    */
 
3074
    return LSN_ERROR;
 
3075
  }
 
3076
  /* now, where the REDO phase should start reading log: */
 
3077
  set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);
 
3078
  DBUG_PRINT("info",
 
3079
             ("checkpoint_start: (%lu,0x%lx) start_address: (%lu,0x%lx)",
 
3080
              LSN_IN_PARTS(checkpoint_start), LSN_IN_PARTS(start_address)));
 
3081
  return start_address;
 
3082
}
 
3083
 
 
3084
 
 
3085
static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
 
3086
                    struct st_dirty_page *dirty_page)
 
3087
{
 
3088
  /* serves as hash key */
 
3089
  dirty_page->file_and_page_id= (((uint64)fileid) << 40) | pageid;
 
3090
  dirty_page->rec_lsn= rec_lsn;
 
3091
  return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
 
3092
}
 
3093
 
 
3094
 
 
3095
static int close_all_tables(void)
 
3096
{
 
3097
  int error= 0;
 
3098
  uint count= 0;
 
3099
  LIST *list_element, *next_open;
 
3100
  MARIA_HA *info;
 
3101
  TRANSLOG_ADDRESS addr;
 
3102
  DBUG_ENTER("close_all_tables");
 
3103
 
 
3104
  pthread_mutex_lock(&THR_LOCK_maria);
 
3105
  if (maria_open_list == NULL)
 
3106
    goto end;
 
3107
  tprint(tracef, "Closing all tables\n");
 
3108
  if (tracef != stdout)
 
3109
  {
 
3110
    if (recovery_message_printed == REC_MSG_NONE)
 
3111
      print_preamble();
 
3112
    for (count= 0, list_element= maria_open_list ;
 
3113
         list_element ; count++, (list_element= list_element->next))
 
3114
      ;
 
3115
    fprintf(stderr, "tables to flush:");
 
3116
    recovery_message_printed= REC_MSG_FLUSH;
 
3117
  }
 
3118
  /*
 
3119
    Since the end of end_of_redo_phase(), we may have written new records
 
3120
    (if UNDO phase ran)  and thus the state is newer than at
 
3121
    end_of_redo_phase(), we need to bump is_of_horizon again.
 
3122
  */
 
3123
  addr= translog_get_horizon();
 
3124
  for (list_element= maria_open_list ; ; list_element= next_open)
 
3125
  {
 
3126
    if (recovery_message_printed == REC_MSG_FLUSH)
 
3127
    {
 
3128
      fprintf(stderr, " %u", count--);
 
3129
      fflush(stderr);
 
3130
    }
 
3131
    if (list_element == NULL)
 
3132
      break;
 
3133
    next_open= list_element->next;
 
3134
    info= (MARIA_HA*)list_element->data;
 
3135
    pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
 
3136
    /*
 
3137
      Tables which we see here are exactly those which were open at time of
 
3138
      crash. They might have open_count>0 as Checkpoint maybe flushed their
 
3139
      state while they were used. As Recovery corrected them, don't alarm the
 
3140
      user, don't ask for a table check:
 
3141
    */
 
3142
    info->s->state.open_count= 0;
 
3143
    prepare_table_for_close(info, addr);
 
3144
    error|= maria_close(info);
 
3145
    pthread_mutex_lock(&THR_LOCK_maria);
 
3146
  }
 
3147
end:
 
3148
  pthread_mutex_unlock(&THR_LOCK_maria);
 
3149
  DBUG_RETURN(error);
 
3150
}
 
3151
 
 
3152
 
 
3153
/**
 
3154
   @brief Close all table instances with a certain name which are present in
 
3155
   all_tables.
 
3156
 
 
3157
   @param  name                Name of table
 
3158
   @param  addr                Log address passed to prepare_table_for_close()
 
3159
*/
 
3160
 
 
3161
static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr)
 
3162
{
 
3163
  my_bool res= 0;
 
3164
  /* There are no other threads using the tables, so we don't need any locks */
 
3165
  struct st_table_for_recovery *internal_table, *end;
 
3166
  for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
 
3167
       internal_table < end ;
 
3168
       internal_table++)
 
3169
  {
 
3170
    MARIA_HA *info= internal_table->info;
 
3171
    if ((info != NULL) && !strcmp(info->s->open_file_name, name))
 
3172
    {
 
3173
      prepare_table_for_close(info, addr);
 
3174
      if (maria_close(info))
 
3175
        res= 1;
 
3176
      internal_table->info= NULL;
 
3177
    }
 
3178
  }
 
3179
  return res;
 
3180
}
 
3181
 
 
3182
 
 
3183
/**
 
3184
   Temporarily disables logging for this table.
 
3185
 
 
3186
   If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log
 
3187
   to warn log readers.
 
3188
 
 
3189
   @param  info            table
 
3190
   @param  log_incomplete  if that disabling makes the log incomplete
 
3191
 
 
3192
   @note for example in the REDO phase we disable logging but that does not
 
3193
   make the log incomplete.
 
3194
*/
 
3195
 
 
3196
void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
 
3197
                                       my_bool log_incomplete)
 
3198
{
 
3199
  MARIA_SHARE *share= info->s;
 
3200
  DBUG_ENTER("_ma_tmp_disable_logging_for_table");
 
3201
  if (log_incomplete)
 
3202
  {
 
3203
    uchar log_data[FILEID_STORE_SIZE];
 
3204
    LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
 
3205
    LSN lsn;
 
3206
    log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
 
3207
    log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
 
3208
    translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG,
 
3209
                          &dummy_transaction_object, info,
 
3210
                          (translog_size_t) sizeof(log_data),
 
3211
                          TRANSLOG_INTERNAL_PARTS + 1, log_array,
 
3212
                          log_data, NULL);
 
3213
  }
 
3214
 
 
3215
  /* if we disabled before writing the record, record wouldn't reach log */
 
3216
  share->now_transactional= FALSE;
 
3217
  /*
 
3218
    Some code in ma_blockrec.c assumes a trn even if !now_transactional but in
 
3219
    this case it only reads trn->rec_lsn, which has to be LSN_IMPOSSIBLE and
 
3220
    should be now. info->trn may be NULL in maria_chk.
 
3221
  */
 
3222
  if (info->trn == NULL)
 
3223
    info->trn= &dummy_transaction_object;
 
3224
  DBUG_ASSERT(info->trn->rec_lsn == LSN_IMPOSSIBLE);
 
3225
  share->page_type= PAGECACHE_PLAIN_PAGE;
 
3226
  /* Functions below will pick up now_transactional and change callbacks */
 
3227
  _ma_set_data_pagecache_callbacks(&info->dfile, share);
 
3228
  _ma_set_index_pagecache_callbacks(&share->kfile, share);
 
3229
  _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
 
3230
  DBUG_VOID_RETURN;
 
3231
}
 
3232
 
 
3233
 
 
3234
/**
 
3235
   Re-enables logging for a table which had it temporarily disabled.
 
3236
 
 
3237
   @param  info            table
 
3238
   @param  flush_pages     if function needs to flush pages first
 
3239
*/
 
3240
 
 
3241
my_bool _ma_reenable_logging_for_table(MARIA_HA *info, my_bool flush_pages)
 
3242
{
 
3243
  MARIA_SHARE *share= info->s;
 
3244
  DBUG_ENTER("_ma_reenable_logging_for_table");
 
3245
 
 
3246
  if (share->now_transactional == share->base.born_transactional)
 
3247
    DBUG_RETURN(0);
 
3248
 
 
3249
  if ((share->now_transactional= share->base.born_transactional))
 
3250
  {
 
3251
    share->page_type= PAGECACHE_LSN_PAGE;
 
3252
 
 
3253
    /*
 
3254
      Copy state information that where updated while the table was used
 
3255
      in not transactional mode
 
3256
    */
 
3257
    _ma_copy_nontrans_state_information(info);
 
3258
 
 
3259
    if (flush_pages)
 
3260
    {
 
3261
      /*
 
3262
        We are going to change callbacks; if a page is flushed at this moment
 
3263
        this can cause race conditions, that's one reason to flush pages
 
3264
        now. Other reasons: a checkpoint could be running and miss pages. As
 
3265
        there are no REDOs for pages, them, bitmaps and the state also have to
 
3266
        be flushed and synced. Leaving non-dirty pages in cache is ok, when
 
3267
        they become dirty again they will have their type corrected.
 
3268
      */
 
3269
      if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
 
3270
                                FLUSH_KEEP, FLUSH_KEEP) ||
 
3271
          _ma_state_info_write(share, 1|4) ||
 
3272
          _ma_sync_table_files(info))
 
3273
        DBUG_RETURN(1);
 
3274
    }
 
3275
    else if (!maria_in_recovery)
 
3276
    {
 
3277
      /*
 
3278
        Except in Recovery, we mustn't leave dirty pages (see comments above).
 
3279
        Note that this does not verify that the state was flushed, but hey.
 
3280
      */
 
3281
      pagecache_file_no_dirty_page(share->pagecache, &info->dfile);
 
3282
      pagecache_file_no_dirty_page(share->pagecache, &share->kfile);
 
3283
    }
 
3284
    _ma_set_data_pagecache_callbacks(&info->dfile, share);
 
3285
    _ma_set_index_pagecache_callbacks(&share->kfile, share);
 
3286
    _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
 
3287
    /*
 
3288
      info->trn was not changed in the disable/enable combo, so that it's
 
3289
      still usable in this kind of combination:
 
3290
      external_lock;
 
3291
      start_bulk_insert; # table is empty, disables logging
 
3292
      end_bulk_insert;   # enables logging
 
3293
      start_bulk_insert; # table is not empty, logging stays
 
3294
                         # so rows insertion needs the real trn.
 
3295
      as happens during row-based replication on the slave.
 
3296
    */
 
3297
  }
 
3298
  DBUG_RETURN(0);
 
3299
}
 
3300
 
 
3301
 
 
3302
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
 
3303
{
 
3304
  static uint end_logno= FILENO_IMPOSSIBLE, percentage_printed= 0;
 
3305
  static ulong end_offset;
 
3306
  static ulonglong initial_remainder= ~(ulonglong) 0;
 
3307
 
 
3308
  uint cur_logno;
 
3309
  ulong cur_offset;
 
3310
  ulonglong local_remainder;
 
3311
  uint percentage_done;
 
3312
 
 
3313
  if (tracef == stdout)
 
3314
    return;
 
3315
  if (recovery_message_printed == REC_MSG_NONE)
 
3316
  {
 
3317
    print_preamble();
 
3318
    fprintf(stderr, "recovered pages: 0%%");
 
3319
    fflush(stderr);
 
3320
    procent_printed= 1;
 
3321
    recovery_message_printed= REC_MSG_REDO;
 
3322
  }
 
3323
  if (end_logno == FILENO_IMPOSSIBLE)
 
3324
  {
 
3325
    LSN end_addr= translog_get_horizon();
 
3326
    end_logno= LSN_FILE_NO(end_addr);
 
3327
    end_offset= LSN_OFFSET(end_addr);
 
3328
  }
 
3329
  cur_logno= LSN_FILE_NO(addr);
 
3330
  cur_offset= LSN_OFFSET(addr);
 
3331
  local_remainder= (cur_logno == end_logno) ? (end_offset - cur_offset) :
 
3332
    (((longlong)log_file_size) - cur_offset +
 
3333
     max(end_logno - cur_logno - 1, 0) * ((longlong)log_file_size) +
 
3334
     end_offset);
 
3335
  if (initial_remainder == (ulonglong)(-1))
 
3336
    initial_remainder= local_remainder;
 
3337
  percentage_done= (uint) ((initial_remainder - local_remainder) * ULL(100) /
 
3338
                           initial_remainder);
 
3339
  if ((percentage_done - percentage_printed) >= 10)
 
3340
  {
 
3341
    percentage_printed= percentage_done;
 
3342
    fprintf(stderr, " %u%%", percentage_done);
 
3343
    fflush(stderr);
 
3344
    procent_printed= 1;
 
3345
  }
 
3346
}
 
3347
 
 
3348
#ifdef MARIA_EXTERNAL_LOCKING
 
3349
#error Marias Checkpoint and Recovery are really not ready for it
 
3350
#endif
 
3351
 
 
3352
/*
 
3353
Recovery of the state :  how it works
 
3354
=====================================
 
3355
 
 
3356
Here we ignore Checkpoints for a start.
 
3357
 
 
3358
The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in
 
3359
memory frequently (at least at every row write/update/delete) but goes
 
3360
to disk at few moments: maria_close() when closing the last open
 
3361
instance, and a few rare places like CHECK/REPAIR/ALTER
 
3362
(non-transactional tables also do it at maria_lock_database() but we
 
3363
needn't cover them here).
 
3364
 
 
3365
In case of crash, state on disk is likely to be older than what it was
 
3366
in memory, the REDO phase needs to recreate the state as it was in
 
3367
memory at the time of crash. When we say Recovery here we will always
 
3368
mean "REDO phase".
 
3369
 
 
3370
For example MARIA_STATUS_INFO::records (count of records). It is updated at
 
3371
the end of every row write/update/delete/delete_all. When Recovery sees the
 
3372
sign of such row operation (UNDO or REDO), it may need to update the records'
 
3373
count if that count does not reflect that operation (is older). How to know
 
3374
the age of the state compared to the log record: every time the state
 
3375
goes to disk at runtime, its member "is_of_horizon" is updated to the
 
3376
current end-of-log horizon. So Recovery just needs to compare is_of_horizon
 
3377
and the record's LSN to know if it should modify "records".
 
3378
 
 
3379
Other operations like ALTER TABLE DISABLE KEYS update the state but
 
3380
don't write log records, thus the REDO phase cannot repeat their
 
3381
effect on the state in case of crash. But we make them sync the state
 
3382
as soon as they have finished. This reduces the window for a problem.
 
3383
 
 
3384
It looks like only one thread at a time updates the state in memory or
 
3385
on disk. We assume that the upper level (normally MySQL) has protection
 
3386
against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these
 
3387
are not issued while there are any running transactions on the given table.
 
3388
If this is not done, we may write a corrupted state to disk.
 
3389
 
 
3390
With checkpoints
 
3391
================
 
3392
 
 
3393
Checkpoint module needs to read the state in memory and write it to
 
3394
disk. This may happen while some other thread is modifying the state
 
3395
in memory or on disk. Checkpoint thus may be reading changing data, it
 
3396
needs a mutex to not have it corrupted, and concurrent modifiers of
 
3397
the state need that mutex too for the same reason.
 
3398
"records" is modified for every row write/update/delete, we don't want
 
3399
to add a mutex lock/unlock there. So we re-use the mutex lock/unlock
 
3400
which is already present in these moments, namely the log's mutex which is
 
3401
taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in
 
3402
under-log-mutex hooks when writing these records (thus "records" is
 
3403
not updated at the end of maria_write/update/delete() anymore).
 
3404
Thus Checkpoint takes the log's lock and can read "records" from
 
3405
memory an write it to disk and release log's lock.
 
3406
We however want to avoid having the disk write under the log's
 
3407
lock. So it has to be under another mutex, natural choice is
 
3408
intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile,
 
3409
and as maria_close() takes it too). All state writes to disk are
 
3410
changed to be protected with intern_lock.
 
3411
So Checkpoint takes intern_lock, log's lock, reads "records" from
 
3412
memory, releases log's lock, updates is_of_horizon and writes "records" to
 
3413
disk, release intern_lock.
 
3414
In practice, not only "records" needs to be written but the full
 
3415
state. So, Checkpoint reads the full state from memory. Some other
 
3416
thread may at this moment be modifying in memory some pieces of the
 
3417
state which are not protected by the lock's log (see ma_extra.c
 
3418
HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state
 
3419
from memory; to guard against that we extend the intern_lock-zone to
 
3420
changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and
 
3421
also any change made in memory to create_rename_lsn/state_is_of_horizon.
 
3422
Last, we don't want in Checkpoint to do
 
3423
 log lock; read state from memory; release log lock;
 
3424
for each table, it may hold the log's lock too much in total.
 
3425
So, we instead do
 
3426
 log lock; read N states from memory; release log lock;
 
3427
Thus, the sequence above happens outside of any intern_lock.
 
3428
But this re-introduces the problem that some other thread may be changing the
 
3429
state in memory and on disk under intern_lock, without log's lock, like
 
3430
HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later
 
3431
comes to handling the table under intern_lock, which is serialized with
 
3432
HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher then when the state
 
3433
was read from memory under log's lock, and thus can decide to not flush the
 
3434
obsolete state it has, knowing that the other thread flushed a more recent
 
3435
state already. If on the other hand is_of_horizon is not higher, the read
 
3436
state is current and can be flushed. So we have a per-table sequence:
 
3437
 lock intern_lock; test if is_of_horizon is higher than when we read the state
 
3438
 under log's lock; if no then flush the read state to disk.
 
3439
*/
 
3440
 
 
3441
/* some comments and pseudo-code which we keep for later */
 
3442
#if 0
 
3443
  /*
 
3444
    MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
 
3445
    after a certain amount of log records have been executed. This helps
 
3446
    against repeated crashes. Those checkpoints could not be user-requested
 
3447
    (as engine is not communicating during the REDO phase), so they would be
 
3448
    automatic: this changes the original assumption that we don't write to the
 
3449
    log while in the REDO phase, but why not. How often should we checkpoint?
 
3450
  */
 
3451
 
 
3452
  /*
 
3453
    We want to have two steps:
 
3454
    engine->recover_with_max_memory();
 
3455
    next_engine->recover_with_max_memory();
 
3456
    engine->init_with_normal_memory();
 
3457
    next_engine->init_with_normal_memory();
 
3458
    So: in recover_with_max_memory() allocate a giant page cache, do REDO
 
3459
    phase, then all page cache is flushed and emptied and freed (only retain
 
3460
    small structures like TM): take full checkpoint, which is useful if
 
3461
    next engine crashes in its recovery the next second.
 
3462
    Destroy all shares (maria_close()), then at init_with_normal_memory() we
 
3463
    do this:
 
3464
  */
 
3465
 
 
3466
  /**** UNDO PHASE *****/
 
3467
 
 
3468
  /*
 
3469
    Launch one or more threads to do the background rollback. Don't wait for
 
3470
    them to complete their rollback (background rollback; for debugging, we
 
3471
    can have an option which waits). Set a counter (total_of_rollback_threads)
 
3472
    to the number of threads to lauch.
 
3473
 
 
3474
    Note that InnoDB's rollback-in-background works as long as InnoDB is the
 
3475
    last engine to recover, otherwise MySQL will refuse new connections until
 
3476
    the last engine has recovered so it's not "background" from the user's
 
3477
    point of view. InnoDB is near top of sys_table_types so all others
 
3478
    (e.g. BDB) recover after it... So it's really "online rollback" only if
 
3479
    InnoDB is the only engine.
 
3480
  */
 
3481
 
 
3482
  /* wake up delete/update handler */
 
3483
  /* tell the TM that it can now accept new transactions */
 
3484
 
 
3485
  /*
 
3486
    mark that checkpoint requests are now allowed.
 
3487
  */
 
3488
#endif