1
/* Copyright (C) 2007 MySQL AB & Sanja Belkin
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include "maria_def.h"
18
#include "ma_blockrec.h" /* for some constants and in-write hooks */
19
#include "ma_key_recover.h" /* For some in-write hooks */
20
#include "ma_checkpoint.h"
23
On Windows, neither my_open() nor my_sync() work for directories.
24
Also there is no need to flush filesystem changes, i.e. to sync()
28
#define sync_dir(A,B) 0
30
#define sync_dir(A,B) my_sync(A,B)
35
@brief Module which writes and reads to a transaction log
38
/* 0xFF can never be valid first byte of a chunk */
39
#define TRANSLOG_FILLER 0xFF
41
/* number of opened log files in the pagecache (should be at least 2) */
42
#define OPENED_FILES_NUM 3
43
#define CACHED_FILES_NUM 5
44
#define CACHED_FILES_NUM_DIRECT_SEARCH_LIMIT 7
45
#if CACHED_FILES_NUM > CACHED_FILES_NUM_DIRECT_SEARCH_LIMIT
50
/* transaction log file descriptor */
51
typedef struct st_translog_file
54
PAGECACHE_FILE handler;
55
my_bool was_recovered;
59
/* records buffer size (should be TRANSLOG_PAGE_SIZE * n) */
60
#define TRANSLOG_WRITE_BUFFER (1024*1024)
62
pagecache_read/write/inject() use bmove512() on their buffers so those must
63
be long-aligned, which we guarantee by using the type below:
68
uchar buffer[TRANSLOG_PAGE_SIZE];
69
} TRANSLOG_PAGE_SIZE_BUFF;
71
/* min chunk length */
72
#define TRANSLOG_MIN_CHUNK 3
74
Number of buffers used by loghandler
76
Should be at least 4, because one thread can block up to 2 buffers in
77
normal circumstances (less than half of one and full other, or just
78
switched one and other). But if we meet the end of the file in the middle and
79
have to switch buffer it will be 3. + 1 buffer for flushing/writing.
80
We have a bigger number here for higher concurrency and to make division
83
The number should be power of 2 to be fast.
85
#define TRANSLOG_BUFFERS_NO 8
86
/* number of bytes (+ header) which can be unused on first page in sequence */
87
#define TRANSLOG_MINCHUNK_CONTENT 1
88
/* version of log file */
89
#define TRANSLOG_VERSION_ID 10000 /* 1.00.00 */
91
#define TRANSLOG_PAGE_FLAGS 6 /* transaction log page flags offset */
93
/* Maximum length of compressed LSNs (the worst case of whole LSN storing) */
94
#define COMPRESSED_LSN_MAX_STORE_SIZE (2 + LSN_STORE_SIZE)
95
#define MAX_NUMBER_OF_LSNS_PER_RECORD 2
98
/*
  Max LSN calculation for a buffer: the buffer's own last_lsn, or
  prev_last_lsn when last_lsn is LSN_IMPOSSIBLE.
  B is expanded more than once, so pass an expression without side effects.
*/
#define BUFFER_MAX_LSN(B) \
  ((B)->last_lsn == LSN_IMPOSSIBLE ? (B)->prev_last_lsn : (B)->last_lsn)
102
/* log write buffer descriptor */
103
struct st_translog_buffer
106
Cache for current log. Comes first to be aligned for bmove512() in
109
uchar buffer[TRANSLOG_WRITE_BUFFER];
111
Maximum LSN of records which ends in this buffer (or IMPOSSIBLE_LSN
112
if no LSNs ends here)
115
/* last_lsn of previous buffer or IMPOSSIBLE_LSN if it is very first one */
117
/* This buffer offset in the file */
118
TRANSLOG_ADDRESS offset;
120
Next buffer offset in the file (it is not always offset + size,
121
in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE)
123
TRANSLOG_ADDRESS next_buffer_offset;
125
How much is written (or will be written when copy_to_buffer_in_progress
126
become 0) to this buffer
128
translog_size_t size;
129
/* File handler for this buffer */
131
/* Threads which are waiting for buffer filling/freeing */
132
pthread_cond_t waiting_filling_buffer;
133
/* Number of records which are in copy progress */
134
uint copy_to_buffer_in_progress;
135
/* list of waiting buffer ready threads */
136
struct st_my_thread_var *waiting_flush;
138
Pointer on the buffer which overlap with this one (due to flush of
139
loghandler, the last page of that buffer is the same as the first page
140
of this buffer) and have to be written first (because contain old
141
content of page which present in both buffers)
143
struct st_translog_buffer *overlay;
148
Current buffer also lock the whole handler (if one want lock the handler
149
one should lock the current buffer).
151
Buffers are locked only in one direction (with overflow and beginning
152
from the first buffer). If we keep lock on buffer N we can lock only
153
buffer N+1 (never N-1).
155
One thread does not lock more than 2 buffers at a time, so to make a dead
156
lock it should be N thread (where N equal number of buffers) takes one
157
buffer and try to lock next. But it is impossible because there is only
158
2 cases when thread take 2 buffers: 1) one thread finishes current
159
buffer (where horizon is) and start next (to which horizon moves). 2)
160
flush start from buffer after current (oldest) and go till the current
161
crabbing by buffer sequence. And there is only one flush in a moment
162
(they are serialised).
164
Because of the above and the number of buffers being TRANSLOG_BUFFERS_NO (8)
we can't get a dead lock (it is impossible to get all buffers locked
simultaneously).
167
pthread_mutex_t mutex;
169
Some thread is going to close the buffer and it should be
170
done only by that thread
172
my_bool is_closing_buffer;
174
Version of the buffer increases every time the buffer is flushed.
175
Together with file and offset it allows detecting buffer changes
181
struct st_buffer_cursor
183
/* pointer into the buffer */
186
struct st_translog_buffer *buffer;
187
/* How many bytes we wrote on the current page */
188
uint16 current_page_fill;
190
How many times we write the page on the disk during flushing process
191
(for sector protection).
193
uint16 write_counter;
194
/* previous write offset */
195
uint16 previous_offset;
196
/* Number of current buffer */
199
True if it is just filling buffer after advancing the pointer to
204
Is current page of the cursor already finished (sector protection
205
should be applied if it is needed)
211
typedef uint8 dirty_buffer_mask_t;
213
struct st_translog_descriptor
215
/* *** Parameters of the log handler *** */
217
/* Page cache for the log reads */
218
PAGECACHE *pagecache;
220
/* File open flags */
222
/* max size of one log size (for new logs creation) */
223
uint32 log_file_max_size;
224
uint32 server_version;
225
/* server ID (used for replication) */
227
/* Loghandler's buffer capacity in case of chunk 2 filling */
228
uint32 buffer_capacity_chunk_2;
230
Half of the buffer capacity in case of chunk 2 filling,
231
used to decide will we write a record in one group or many.
232
It is written to the variable just to avoid division every
235
uint32 half_buffer_capacity_chunk_2;
236
/* Page overhead calculated by flags (whether CRC is enabled, etc) */
237
uint16 page_overhead;
239
Page capacity ("useful load") calculated by flags
240
(TRANSLOG_PAGE_SIZE - page_overhead-1)
242
uint16 page_capacity_chunk_2;
243
/* Path to the directory where we store log store files */
244
char directory[FN_REFLEN];
246
/* *** Current state of the log handler *** */
247
/* list of opened files */
248
DYNAMIC_ARRAY open_files;
249
/* min/max number of file in the array */
250
uint32 max_file, min_file;
251
/* the opened files list guard */
252
rw_lock_t open_files_lock;
255
File descriptor of the directory where we store log files for syncing
259
/* buffers for log writing */
260
struct st_translog_buffer buffers[TRANSLOG_BUFFERS_NO];
261
/* Mask where 1 in position N mean that buffer N is not flushed */
262
dirty_buffer_mask_t dirty_buffer_mask;
263
/* The above variable protection */
264
pthread_mutex_t dirty_buffer_mask_lock;
266
horizon - visible end of the log (here is absolute end of the log:
267
position where next chunk can start
269
TRANSLOG_ADDRESS horizon;
270
/* horizon buffer cursor */
271
struct st_buffer_cursor bc;
272
/* maximum LSN of the current (not finished) file */
276
Last flushed LSN (protected by log_flush_lock).
277
Pointers in the log ordered like this:
278
last_lsn_checked <= flushed <= sent_to_disk <= in_buffers_only <=
282
/* Last LSN sent to the disk (but maybe not written yet) */
284
/* Horizon from which log started after initialization */
285
TRANSLOG_ADDRESS log_start;
286
TRANSLOG_ADDRESS previous_flush_horizon;
287
/* All what is after this address is not sent to disk yet */
288
TRANSLOG_ADDRESS in_buffers_only;
289
/* protection of sent_to_disk and in_buffers_only */
290
pthread_mutex_t sent_to_disk_lock;
292
Protect flushed (see above) and for flush serialization (will
295
pthread_mutex_t log_flush_lock;
296
pthread_cond_t log_flush_cond;
298
/* Protects changing of headers of finished files (max_lsn) */
299
pthread_mutex_t file_header_lock;
302
Sorted array (with protection) of files where we started writing process
303
and so we can't give last LSN yet
305
pthread_mutex_t unfinished_files_lock;
306
DYNAMIC_ARRAY unfinished_files;
309
minimum number of the still needed file, calculated during the last
312
uint32 min_need_file;
313
/* Purger data: minimum file in the log (or 0 if unknown) */
314
uint32 min_file_number;
315
/* Protect purger from many calls and it's data */
316
pthread_mutex_t purger_lock;
317
/* last low water mark checked */
318
LSN last_lsn_checked;
320
Must be set to 0 under loghandler lock every time a new LSN
323
my_bool is_everything_flushed;
324
/* True when flush pass is in progress */
325
my_bool flush_in_progress;
326
/* Next flush pass variables */
327
TRANSLOG_ADDRESS next_pass_max_lsn;
328
pthread_t max_lsn_requester;
331
static struct st_translog_descriptor log_descriptor;
333
ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE;
334
ulong log_file_size= TRANSLOG_FILE_SIZE;
335
ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE;
337
/* Marker for end of log */
338
static uchar end_of_log= 0;
339
#define END_OF_LOG &end_of_log
341
enum enum_translog_status translog_status= TRANSLOG_UNINITED;
344
#define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail) */
345
#define TRANSLOG_CHUNK_FIXED (1 << 6) /* 1 (pseudo)fixed record (also LSN) */
346
#define TRANSLOG_CHUNK_NOHDR (2 << 6) /* 2 no head chunk (till page end) */
347
#define TRANSLOG_CHUNK_LNGTH (3 << 6) /* 3 chunk with chunk length */
348
#define TRANSLOG_CHUNK_TYPE (3 << 6) /* Mask to get chunk type */
349
#define TRANSLOG_REC_TYPE 0x3F /* Mask to get record type */
350
#define TRANSLOG_CHUNK_0_CONT 0x3F /* the type to mark chunk 0 continue */
352
/* compressed (relative) LSN constants */
353
#define TRANSLOG_CLSN_LEN_BITS 0xC0 /* Mask to get compressed LSN length */
356
#include <my_atomic.h>
357
/* an array that maps id of a MARIA_SHARE to this MARIA_SHARE */
358
static MARIA_SHARE **id_to_share= NULL;
359
/* lock for id_to_share */
360
static my_atomic_rwlock_t LOCK_id_to_share;
362
static my_bool translog_dummy_callback(uchar *page,
363
pgcache_page_no_t page_no,
365
static my_bool translog_page_validator(uchar *page,
366
pgcache_page_no_t page_no,
369
static my_bool translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner);
370
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected);
371
LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon);
375
Initialize log_record_type_descriptors
378
LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
383
/* Assert that the calling thread owns the mutex of buffer B */
#define translog_buffer_lock_assert_owner(B) \
  safe_mutex_assert_owner(&(B)->mutex)
385
/*
  Assert that the calling thread owns the mutex of the buffer referenced
  by the horizon cursor (log_descriptor.bc.buffer).
*/
#define translog_lock_assert_owner() \
  safe_mutex_assert_owner(&log_descriptor.bc.buffer->mutex)
387
/*
  Function wrapper around translog_lock_assert_owner(), so the ownership
  assertion can be invoked as a function call rather than through the macro.
*/
void translog_lock_handler_assert_owner()
{
  translog_lock_assert_owner();
}
393
@brief check the description table validity
395
@param num how many records should be filled
398
static void check_translog_description_table(int num)
401
DBUG_ENTER("check_translog_description_table");
402
DBUG_PRINT("enter", ("last record: %d", num));
403
DBUG_ASSERT(num > 0);
404
/* last is reserved for extending the table */
405
DBUG_ASSERT(num < LOGREC_NUMBER_OF_TYPES - 1);
406
DBUG_ASSERT(log_record_type_descriptor[0].rclass == LOGRECTYPE_NOT_ALLOWED);
408
for (i= 0; i <= num; i++)
411
("record type: %d class: %d fixed: %u header: %u LSNs: %u "
413
i, log_record_type_descriptor[i].rclass,
414
(uint)log_record_type_descriptor[i].fixed_length,
415
(uint)log_record_type_descriptor[i].read_header_len,
416
(uint)log_record_type_descriptor[i].compressed_LSN,
417
log_record_type_descriptor[i].name));
418
switch (log_record_type_descriptor[i].rclass) {
419
case LOGRECTYPE_NOT_ALLOWED:
422
case LOGRECTYPE_VARIABLE_LENGTH:
423
DBUG_ASSERT(log_record_type_descriptor[i].fixed_length == 0);
424
DBUG_ASSERT((log_record_type_descriptor[i].compressed_LSN == 0) ||
425
((log_record_type_descriptor[i].compressed_LSN == 1) &&
426
(log_record_type_descriptor[i].read_header_len >=
428
((log_record_type_descriptor[i].compressed_LSN == 2) &&
429
(log_record_type_descriptor[i].read_header_len >=
430
LSN_STORE_SIZE * 2)));
432
case LOGRECTYPE_PSEUDOFIXEDLENGTH:
433
DBUG_ASSERT(log_record_type_descriptor[i].fixed_length ==
434
log_record_type_descriptor[i].read_header_len);
435
DBUG_ASSERT(log_record_type_descriptor[i].compressed_LSN > 0);
436
DBUG_ASSERT(log_record_type_descriptor[i].compressed_LSN <= 2);
438
case LOGRECTYPE_FIXEDLENGTH:
439
DBUG_ASSERT(log_record_type_descriptor[i].fixed_length ==
440
log_record_type_descriptor[i].read_header_len);
441
DBUG_ASSERT(log_record_type_descriptor[i].compressed_LSN == 0);
447
for (i= num + 1; i < LOGREC_NUMBER_OF_TYPES; i++)
449
DBUG_ASSERT(log_record_type_descriptor[i].rclass ==
450
LOGRECTYPE_NOT_ALLOWED);
455
#define translog_buffer_lock_assert_owner(B) {}
456
#define translog_lock_assert_owner() {}
459
static LOG_DESC INIT_LOGREC_RESERVED_FOR_CHUNKS23=
460
{LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0,
461
"reserved", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL };
463
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_HEAD=
464
{LOGRECTYPE_VARIABLE_LENGTH, 0,
465
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
466
write_hook_for_redo, NULL, 0,
467
"redo_insert_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
469
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_TAIL=
470
{LOGRECTYPE_VARIABLE_LENGTH, 0,
471
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
472
write_hook_for_redo, NULL, 0,
473
"redo_insert_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
475
static LOG_DESC INIT_LOGREC_REDO_NEW_ROW_HEAD=
476
{LOGRECTYPE_VARIABLE_LENGTH, 0,
477
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
478
write_hook_for_redo, NULL, 0,
479
"redo_new_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
481
static LOG_DESC INIT_LOGREC_REDO_NEW_ROW_TAIL=
482
{LOGRECTYPE_VARIABLE_LENGTH, 0,
483
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
484
write_hook_for_redo, NULL, 0,
485
"redo_new_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
487
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOBS=
488
{LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE, NULL,
489
write_hook_for_redo, NULL, 0,
490
"redo_insert_row_blobs", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
492
static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_HEAD=
493
{LOGRECTYPE_FIXEDLENGTH,
494
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
495
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
496
NULL, write_hook_for_redo, NULL, 0,
497
"redo_purge_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
499
static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_TAIL=
500
{LOGRECTYPE_FIXEDLENGTH,
501
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
502
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
503
NULL, write_hook_for_redo, NULL, 0,
504
"redo_purge_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
506
static LOG_DESC INIT_LOGREC_REDO_FREE_BLOCKS=
507
{LOGRECTYPE_VARIABLE_LENGTH, 0,
508
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
509
NULL, write_hook_for_redo, NULL, 0,
510
"redo_free_blocks", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
512
static LOG_DESC INIT_LOGREC_REDO_FREE_HEAD_OR_TAIL=
513
{LOGRECTYPE_FIXEDLENGTH,
514
FILEID_STORE_SIZE + PAGE_STORE_SIZE,
515
FILEID_STORE_SIZE + PAGE_STORE_SIZE,
516
NULL, write_hook_for_redo, NULL, 0,
517
"redo_free_head_or_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
519
/* not yet used; for when we have versioning */
520
static LOG_DESC INIT_LOGREC_REDO_DELETE_ROW=
521
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0,
522
"redo_delete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
524
/** @todo RECOVERY BUG unused, remove? */
525
static LOG_DESC INIT_LOGREC_REDO_UPDATE_ROW_HEAD=
526
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0,
527
"redo_update_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
529
static LOG_DESC INIT_LOGREC_REDO_INDEX=
530
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0,
531
"redo_index", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
533
static LOG_DESC INIT_LOGREC_REDO_INDEX_NEW_PAGE=
534
{LOGRECTYPE_VARIABLE_LENGTH, 0,
535
FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + 1,
536
NULL, write_hook_for_redo, NULL, 0,
537
"redo_index_new_page", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
539
static LOG_DESC INIT_LOGREC_REDO_INDEX_FREE_PAGE=
540
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2,
541
FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2,
542
NULL, write_hook_for_redo, NULL, 0,
543
"redo_index_free_page", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
545
static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW=
546
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0,
547
"redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
549
static LOG_DESC INIT_LOGREC_CLR_END=
550
{LOGRECTYPE_VARIABLE_LENGTH, 0, LSN_STORE_SIZE + FILEID_STORE_SIZE +
551
CLR_TYPE_STORE_SIZE, NULL, write_hook_for_clr_end, NULL, 1,
552
"clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
554
static LOG_DESC INIT_LOGREC_PURGE_END=
555
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1,
556
"purge_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
558
static LOG_DESC INIT_LOGREC_UNDO_ROW_INSERT=
559
{LOGRECTYPE_VARIABLE_LENGTH, 0,
560
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
561
NULL, write_hook_for_undo_row_insert, NULL, 1,
562
"undo_row_insert", LOGREC_LAST_IN_GROUP, NULL, NULL};
564
static LOG_DESC INIT_LOGREC_UNDO_ROW_DELETE=
565
{LOGRECTYPE_VARIABLE_LENGTH, 0,
566
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
567
NULL, write_hook_for_undo_row_delete, NULL, 1,
568
"undo_row_delete", LOGREC_LAST_IN_GROUP, NULL, NULL};
570
static LOG_DESC INIT_LOGREC_UNDO_ROW_UPDATE=
571
{LOGRECTYPE_VARIABLE_LENGTH, 0,
572
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
573
NULL, write_hook_for_undo_row_update, NULL, 1,
574
"undo_row_update", LOGREC_LAST_IN_GROUP, NULL, NULL};
576
static LOG_DESC INIT_LOGREC_UNDO_KEY_INSERT=
577
{LOGRECTYPE_VARIABLE_LENGTH, 0,
578
LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE,
579
NULL, write_hook_for_undo_key_insert, NULL, 1,
580
"undo_key_insert", LOGREC_LAST_IN_GROUP, NULL, NULL};
582
/* This will never be in the log, only in the clr */
583
static LOG_DESC INIT_LOGREC_UNDO_KEY_INSERT_WITH_ROOT=
584
{LOGRECTYPE_VARIABLE_LENGTH, 0,
585
LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE + PAGE_STORE_SIZE,
586
NULL, write_hook_for_undo_key, NULL, 1,
587
"undo_key_insert_with_root", LOGREC_LAST_IN_GROUP, NULL, NULL};
589
static LOG_DESC INIT_LOGREC_UNDO_KEY_DELETE=
590
{LOGRECTYPE_VARIABLE_LENGTH, 0,
591
LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE,
592
NULL, write_hook_for_undo_key_delete, NULL, 1,
593
"undo_key_delete", LOGREC_LAST_IN_GROUP, NULL, NULL};
595
static LOG_DESC INIT_LOGREC_UNDO_KEY_DELETE_WITH_ROOT=
596
{LOGRECTYPE_VARIABLE_LENGTH, 0,
597
LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE + PAGE_STORE_SIZE,
598
NULL, write_hook_for_undo_key_delete, NULL, 1,
599
"undo_key_delete_with_root", LOGREC_LAST_IN_GROUP, NULL, NULL};
601
static LOG_DESC INIT_LOGREC_PREPARE=
602
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
603
"prepare", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
605
static LOG_DESC INIT_LOGREC_PREPARE_WITH_UNDO_PURGE=
606
{LOGRECTYPE_VARIABLE_LENGTH, 0, LSN_STORE_SIZE, NULL, NULL, NULL, 1,
607
"prepare_with_undo_purge", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
609
static LOG_DESC INIT_LOGREC_COMMIT=
610
{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL,
611
NULL, NULL, 0, "commit", LOGREC_IS_GROUP_ITSELF, NULL,
614
static LOG_DESC INIT_LOGREC_COMMIT_WITH_UNDO_PURGE=
615
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1,
616
"commit_with_undo_purge", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
618
static LOG_DESC INIT_LOGREC_CHECKPOINT=
619
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
620
"checkpoint", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
622
static LOG_DESC INIT_LOGREC_REDO_CREATE_TABLE=
623
{LOGRECTYPE_VARIABLE_LENGTH, 0, 1 + 2, NULL, NULL, NULL, 0,
624
"redo_create_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
626
static LOG_DESC INIT_LOGREC_REDO_RENAME_TABLE=
627
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
628
"redo_rename_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
630
static LOG_DESC INIT_LOGREC_REDO_DROP_TABLE=
631
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
632
"redo_drop_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
634
static LOG_DESC INIT_LOGREC_REDO_DELETE_ALL=
635
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE, FILEID_STORE_SIZE,
636
NULL, write_hook_for_redo_delete_all, NULL, 0,
637
"redo_delete_all", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
639
static LOG_DESC INIT_LOGREC_REDO_REPAIR_TABLE=
640
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + 8 + 8, FILEID_STORE_SIZE + 8 + 8,
642
"redo_repair_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
644
static LOG_DESC INIT_LOGREC_FILE_ID=
645
{LOGRECTYPE_VARIABLE_LENGTH, 0, 2, NULL, write_hook_for_file_id, NULL, 0,
646
"file_id", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
648
static LOG_DESC INIT_LOGREC_LONG_TRANSACTION_ID=
649
{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0,
650
"long_transaction_id", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
652
static LOG_DESC INIT_LOGREC_INCOMPLETE_LOG=
653
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE, FILEID_STORE_SIZE,
655
"incomplete_log", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
657
static LOG_DESC INIT_LOGREC_INCOMPLETE_GROUP=
658
{LOGRECTYPE_FIXEDLENGTH, 0, 0,
660
"incomplete_group", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
662
static LOG_DESC INIT_LOGREC_UNDO_BULK_INSERT=
663
{LOGRECTYPE_VARIABLE_LENGTH, 0,
664
LSN_STORE_SIZE + FILEID_STORE_SIZE,
665
NULL, write_hook_for_undo_bulk_insert, NULL, 1,
666
"undo_bulk_insert", LOGREC_LAST_IN_GROUP, NULL, NULL};
668
static LOG_DESC INIT_LOGREC_REDO_BITMAP_NEW_PAGE=
669
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2,
670
FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2,
672
"redo_create_bitmap", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
674
const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL;
676
void translog_table_init()
679
log_record_type_descriptor[LOGREC_RESERVED_FOR_CHUNKS23]=
680
INIT_LOGREC_RESERVED_FOR_CHUNKS23;
681
log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_HEAD]=
682
INIT_LOGREC_REDO_INSERT_ROW_HEAD;
683
log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_TAIL]=
684
INIT_LOGREC_REDO_INSERT_ROW_TAIL;
685
log_record_type_descriptor[LOGREC_REDO_NEW_ROW_HEAD]=
686
INIT_LOGREC_REDO_NEW_ROW_HEAD;
687
log_record_type_descriptor[LOGREC_REDO_NEW_ROW_TAIL]=
688
INIT_LOGREC_REDO_NEW_ROW_TAIL;
689
log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_BLOBS]=
690
INIT_LOGREC_REDO_INSERT_ROW_BLOBS;
691
log_record_type_descriptor[LOGREC_REDO_PURGE_ROW_HEAD]=
692
INIT_LOGREC_REDO_PURGE_ROW_HEAD;
693
log_record_type_descriptor[LOGREC_REDO_PURGE_ROW_TAIL]=
694
INIT_LOGREC_REDO_PURGE_ROW_TAIL;
695
log_record_type_descriptor[LOGREC_REDO_FREE_BLOCKS]=
696
INIT_LOGREC_REDO_FREE_BLOCKS;
697
log_record_type_descriptor[LOGREC_REDO_FREE_HEAD_OR_TAIL]=
698
INIT_LOGREC_REDO_FREE_HEAD_OR_TAIL;
699
log_record_type_descriptor[LOGREC_REDO_DELETE_ROW]=
700
INIT_LOGREC_REDO_DELETE_ROW;
701
log_record_type_descriptor[LOGREC_REDO_UPDATE_ROW_HEAD]=
702
INIT_LOGREC_REDO_UPDATE_ROW_HEAD;
703
log_record_type_descriptor[LOGREC_REDO_INDEX]=
704
INIT_LOGREC_REDO_INDEX;
705
log_record_type_descriptor[LOGREC_REDO_INDEX_NEW_PAGE]=
706
INIT_LOGREC_REDO_INDEX_NEW_PAGE;
707
log_record_type_descriptor[LOGREC_REDO_INDEX_FREE_PAGE]=
708
INIT_LOGREC_REDO_INDEX_FREE_PAGE;
709
log_record_type_descriptor[LOGREC_REDO_UNDELETE_ROW]=
710
INIT_LOGREC_REDO_UNDELETE_ROW;
711
log_record_type_descriptor[LOGREC_CLR_END]=
713
log_record_type_descriptor[LOGREC_PURGE_END]=
714
INIT_LOGREC_PURGE_END;
715
log_record_type_descriptor[LOGREC_UNDO_ROW_INSERT]=
716
INIT_LOGREC_UNDO_ROW_INSERT;
717
log_record_type_descriptor[LOGREC_UNDO_ROW_DELETE]=
718
INIT_LOGREC_UNDO_ROW_DELETE;
719
log_record_type_descriptor[LOGREC_UNDO_ROW_UPDATE]=
720
INIT_LOGREC_UNDO_ROW_UPDATE;
721
log_record_type_descriptor[LOGREC_UNDO_KEY_INSERT]=
722
INIT_LOGREC_UNDO_KEY_INSERT;
723
log_record_type_descriptor[LOGREC_UNDO_KEY_INSERT_WITH_ROOT]=
724
INIT_LOGREC_UNDO_KEY_INSERT_WITH_ROOT;
725
log_record_type_descriptor[LOGREC_UNDO_KEY_DELETE]=
726
INIT_LOGREC_UNDO_KEY_DELETE;
727
log_record_type_descriptor[LOGREC_UNDO_KEY_DELETE_WITH_ROOT]=
728
INIT_LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
729
log_record_type_descriptor[LOGREC_PREPARE]=
731
log_record_type_descriptor[LOGREC_PREPARE_WITH_UNDO_PURGE]=
732
INIT_LOGREC_PREPARE_WITH_UNDO_PURGE;
733
log_record_type_descriptor[LOGREC_COMMIT]=
735
log_record_type_descriptor[LOGREC_COMMIT_WITH_UNDO_PURGE]=
736
INIT_LOGREC_COMMIT_WITH_UNDO_PURGE;
737
log_record_type_descriptor[LOGREC_CHECKPOINT]=
738
INIT_LOGREC_CHECKPOINT;
739
log_record_type_descriptor[LOGREC_REDO_CREATE_TABLE]=
740
INIT_LOGREC_REDO_CREATE_TABLE;
741
log_record_type_descriptor[LOGREC_REDO_RENAME_TABLE]=
742
INIT_LOGREC_REDO_RENAME_TABLE;
743
log_record_type_descriptor[LOGREC_REDO_DROP_TABLE]=
744
INIT_LOGREC_REDO_DROP_TABLE;
745
log_record_type_descriptor[LOGREC_REDO_DELETE_ALL]=
746
INIT_LOGREC_REDO_DELETE_ALL;
747
log_record_type_descriptor[LOGREC_REDO_REPAIR_TABLE]=
748
INIT_LOGREC_REDO_REPAIR_TABLE;
749
log_record_type_descriptor[LOGREC_FILE_ID]=
751
log_record_type_descriptor[LOGREC_LONG_TRANSACTION_ID]=
752
INIT_LOGREC_LONG_TRANSACTION_ID;
753
log_record_type_descriptor[LOGREC_INCOMPLETE_LOG]=
754
INIT_LOGREC_INCOMPLETE_LOG;
755
log_record_type_descriptor[LOGREC_INCOMPLETE_GROUP]=
756
INIT_LOGREC_INCOMPLETE_GROUP;
757
log_record_type_descriptor[LOGREC_UNDO_BULK_INSERT]=
758
INIT_LOGREC_UNDO_BULK_INSERT;
759
log_record_type_descriptor[LOGREC_REDO_BITMAP_NEW_PAGE]=
760
INIT_LOGREC_REDO_BITMAP_NEW_PAGE;
761
for (i= LOGREC_FIRST_FREE; i < LOGREC_NUMBER_OF_TYPES; i++)
762
log_record_type_descriptor[i].rclass= LOGRECTYPE_NOT_ALLOWED;
764
check_translog_description_table(LOGREC_FIRST_FREE -1);
769
/* all possible flags page overheads */
770
static uint page_overhead[TRANSLOG_FLAGS_NUM];
772
typedef struct st_translog_validator_data
774
TRANSLOG_ADDRESS *addr;
775
my_bool was_recovered;
776
} TRANSLOG_VALIDATOR_DATA;
780
Check cursor/buffer consistency
783
translog_check_cursor
784
cursor cursor which will be checked
787
static void translog_check_cursor(struct st_buffer_cursor *cursor
788
__attribute__((unused)))
790
DBUG_ASSERT(cursor->chaser ||
791
((ulong) (cursor->ptr - cursor->buffer->buffer) ==
792
cursor->buffer->size));
793
DBUG_ASSERT(cursor->buffer->buffer_no == cursor->buffer_no);
794
DBUG_ASSERT((cursor->ptr -cursor->buffer->buffer) %TRANSLOG_PAGE_SIZE ==
795
cursor->current_page_fill % TRANSLOG_PAGE_SIZE);
796
DBUG_ASSERT(cursor->current_page_fill <= TRANSLOG_PAGE_SIZE);
801
@brief switch the loghandler in read only mode in case of write error
804
void translog_stop_writing()
806
DBUG_ENTER("translog_stop_writing");
807
DBUG_PRINT("error", ("errno: %d my_errno: %d", errno, my_errno));
808
translog_status= (translog_status == TRANSLOG_SHUTDOWN ?
811
log_descriptor.is_everything_flushed= 1;
812
log_descriptor.open_flags= O_BINARY | O_RDONLY;
819
@brief Get file name of the log by log number
821
@param file_no Number of the log we want to open
822
@param path Pointer to buffer where file name will be
823
stored (must be FN_REFLEN bytes at least)
825
@return pointer to path
828
/*
  Build the file name of the log with the given number.

  The name is log_descriptor.directory followed by "maria_log." and the
  decimal file number, right-aligned over a zero-filled field.

  @param file_no  Number of the log file
  @param path     Output buffer (must be at least FN_REFLEN bytes)

  @return path

  NOTE(review): with the zero template used here, a 9-digit number (the
  assertion allows up to 0xfffffff) would start one byte earlier and
  overwrite the '.' separator; confirm the intended maximum.
*/
char *translog_filename_by_fileno(uint32 file_no, char *path)
{
  char buff[11], *end;             /* 10 decimal digits of a uint32 + NUL */
  uint length;
  DBUG_ENTER("translog_filename_by_fileno");
  DBUG_ASSERT(file_no <= 0xfffffff);

  /* log_descriptor.directory is already formatted */
  end= strxmov(path, log_descriptor.directory, "maria_log.0000000", NullS);
  length= (uint) (int10_to_str(file_no, buff, 10) - buff);
  /* Overwrite the tail of the zero template so the number is right-aligned */
  strmov(end - length +1, buff);

  DBUG_PRINT("info", ("Path: '%s' path: 0x%lx", path, (ulong) path));
  DBUG_RETURN(path);
}
846
@brief Create log file with given number without cache
848
@param file_no Number of the log we want to open
851
retval # file descriptor number
854
static File create_logfile_by_number_no_cache(uint32 file_no)
857
char path[FN_REFLEN];
858
DBUG_ENTER("create_logfile_by_number_no_cache");
860
if (translog_status != TRANSLOG_OK)
863
/* TODO: add O_DIRECT to open flags (when buffer is aligned) */
864
if ((file= my_create(translog_filename_by_fileno(file_no, path),
865
0, O_BINARY | O_RDWR, MYF(MY_WME))) < 0)
867
DBUG_PRINT("error", ("Error %d during creating file '%s'", errno, path));
868
translog_stop_writing();
871
if (sync_log_dir >= TRANSLOG_SYNC_DIR_NEWFILE &&
872
sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)))
874
DBUG_PRINT("error", ("Error %d during syncing directory '%s'",
875
errno, log_descriptor.directory));
876
translog_stop_writing();
879
DBUG_PRINT("info", ("File: '%s' handler: %d", path, file));
884
@brief Open (not create) log file with given number without cache
886
@param file_no Number of the log we want to open
889
retval # file descriptor number
892
static File open_logfile_by_number_no_cache(uint32 file_no)
895
char path[FN_REFLEN];
896
DBUG_ENTER("open_logfile_by_number_no_cache");
898
/* TODO: add O_DIRECT to open flags (when buffer is aligned) */
899
/* TODO: use my_create() */
900
if ((file= my_open(translog_filename_by_fileno(file_no, path),
901
log_descriptor.open_flags,
904
DBUG_PRINT("error", ("Error %d during opening file '%s'", errno, path));
907
DBUG_PRINT("info", ("File: '%s' handler: %d", path, file));
913
@brief get file descriptor by given number using cache
915
@param file_no Number of the log we want to open
917
retval # file descriptor
918
retval NULL file is not opened
921
/*
  Look up an already opened log file by its number.

  The lookup is done under a read lock on log_descriptor.open_files_lock.
  open_files is indexed by distance from the newest file: element 0 is
  max_file, element (max_file - file_no) is the requested one.

  @param file_no  Number of the log file

  @return descriptor of the file, or NULL if the file is not opened
*/
static TRANSLOG_FILE *get_logfile_by_number(uint32 file_no)
{
  TRANSLOG_FILE *file;
  DBUG_ENTER("get_logfile_by_number");
  rw_rdlock(&log_descriptor.open_files_lock);
  /*
    Unsigned arithmetic: a file_no above max_file wraps to a huge value
    and is rejected by the same test as a file below the opened range.
  */
  if (log_descriptor.max_file - file_no >=
      log_descriptor.open_files.elements)
  {
    DBUG_PRINT("info", ("File #%u is not opened", file_no));
    rw_unlock(&log_descriptor.open_files_lock);
    DBUG_RETURN(NULL);
  }
  DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
              log_descriptor.open_files.elements);
  DBUG_ASSERT(log_descriptor.max_file >= file_no);
  DBUG_ASSERT(log_descriptor.min_file <= file_no);

  file= *dynamic_element(&log_descriptor.open_files,
                         log_descriptor.max_file - file_no, TRANSLOG_FILE **);
  rw_unlock(&log_descriptor.open_files_lock);
  DBUG_PRINT("info", ("File 0x%lx File no: %lu, File handler: %d",
                      (ulong)file, (ulong)file_no,
                      (file ? file->handler.file : -1)));
  DBUG_ASSERT(!file || file->number == file_no);
  DBUG_RETURN(file);
}
950
@brief get current file descriptor
952
retval # file descriptor
955
static TRANSLOG_FILE *get_current_logfile()
958
rw_rdlock(&log_descriptor.open_files_lock);
959
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
960
log_descriptor.open_files.elements);
961
file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **);
962
rw_unlock(&log_descriptor.open_files_lock);
966
uchar NEAR maria_trans_file_magic[]=
967
{ (uchar) 254, (uchar) 254, (uchar) 11, '\001', 'M', 'A', 'R', 'I', 'A',
969
#define LOG_HEADER_DATA_SIZE (sizeof(maria_trans_file_magic) + \
970
8 + 4 + 4 + 4 + 2 + 3 + \
975
Write log file page header in the just opened new log file
978
translog_write_file_header();
981
First page is just a marker page; We don't store any real log data in it.
988
static my_bool translog_write_file_header()
992
uchar page_buff[TRANSLOG_PAGE_SIZE], *page= page_buff;
994
DBUG_ENTER("translog_write_file_header");
997
memcpy(page, maria_trans_file_magic, sizeof(maria_trans_file_magic));
998
page+= sizeof(maria_trans_file_magic);
1000
timestamp= my_getsystime();
1001
int8store(page, timestamp);
1004
int4store(page, TRANSLOG_VERSION_ID);
1006
/* mysql version (MYSQL_VERSION_ID) */
1007
int4store(page, log_descriptor.server_version);
1010
int4store(page, log_descriptor.server_id);
1012
/* loghandler page_size */
1013
int2store(page, TRANSLOG_PAGE_SIZE - 1);
1016
int3store(page, LSN_FILE_NO(log_descriptor.horizon));
1018
lsn_store(page, LSN_IMPOSSIBLE);
1019
page+= LSN_STORE_SIZE;
1020
memset(page, TRANSLOG_FILLER, sizeof(page_buff) - (page- page_buff));
1022
file= get_current_logfile();
1023
rc= my_pwrite(file->handler.file, page_buff, sizeof(page_buff), 0,
1024
log_write_flags) != 0;
1026
Dropping the flag in such way can make false alarm: signalling than the
1027
file in not sync when it is sync, but the situation is quite rare and
1028
protections with mutexes give much more overhead to the whole engine
1035
@brief write the new LSN on the given file header
1037
@param file The file descriptor
1038
@param lsn That LSN which should be written
1044
static my_bool translog_max_lsn_to_header(File file, LSN lsn)
1046
uchar lsn_buff[LSN_STORE_SIZE];
1047
DBUG_ENTER("translog_max_lsn_to_header");
1048
DBUG_PRINT("enter", ("File descriptor: %ld "
1051
LSN_IN_PARTS(lsn)));
1053
lsn_store(lsn_buff, lsn);
1055
DBUG_RETURN(my_pwrite(file, lsn_buff,
1057
(LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
1058
log_write_flags) != 0 ||
1059
my_sync(file, MYF(MY_WME)) != 0);
1064
Information from transaction log file header
1067
typedef struct st_loghandler_file_info
1070
LSN_IMPOSSIBLE for current file (not finished file).
1071
Maximum LSN of the record which parts stored in the
1075
ulonglong timestamp; /* Time stamp */
1076
ulong maria_version; /* Version of maria loghandler */
1077
ulong mysql_version; /* Version of mysql server */
1078
ulong server_id; /* Server ID */
1079
ulong page_size; /* Loghandler page size */
1080
ulong file_number; /* Number of the file (from the file header) */
1081
} LOGHANDLER_FILE_INFO;
1084
@brief Extract hander file information from loghandler file page
1086
@param desc header information descriptor to be filled with information
1087
@param page_buff buffer with the page content
1090
static void translog_interpret_file_header(LOGHANDLER_FILE_INFO *desc,
1095
ptr= page_buff + sizeof(maria_trans_file_magic);
1096
desc->timestamp= uint8korr(ptr);
1098
desc->maria_version= uint4korr(ptr);
1100
desc->mysql_version= uint4korr(ptr);
1102
desc->server_id= uint4korr(ptr + 4);
1104
desc->page_size= uint2korr(ptr) + 1;
1106
desc->file_number= uint3korr(ptr);
1108
desc->max_lsn= lsn_korr(ptr);
1113
@brief Read hander file information from loghandler file
1115
@param desc header information descriptor to be filled with information
1116
@param file file descriptor to read
1122
my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc, File file)
1124
uchar page_buff[LOG_HEADER_DATA_SIZE];
1125
DBUG_ENTER("translog_read_file_header");
1127
if (my_pread(file, page_buff,
1128
sizeof(page_buff), 0, MYF(MY_FNABP | MY_WME)))
1130
DBUG_PRINT("info", ("log read fail error: %d", my_errno));
1133
translog_interpret_file_header(desc, page_buff);
1134
DBUG_PRINT("info", ("timestamp: %llu maria ver: %lu mysql ver: %lu "
1135
"server id %lu page size %lu file number %lu "
1136
"max lsn: (%lu,0x%lx)",
1137
(ulonglong) desc->timestamp,
1138
(ulong) desc->maria_version,
1139
(ulong) desc->mysql_version,
1140
(ulong) desc->server_id,
1141
desc->page_size, (ulong) desc->file_number,
1142
LSN_IN_PARTS(desc->max_lsn)));
1148
@brief set the lsn to the files from_file - to_file if it is greater
1149
then written in the file
1151
@param from_file first file number (min)
1152
@param to_file last file number (max)
1153
@param lsn the lsn for writing
1154
@param is_locked true if current thread locked the log handler
1160
static my_bool translog_set_lsn_for_files(uint32 from_file, uint32 to_file,
1161
LSN lsn, my_bool is_locked)
1164
DBUG_ENTER("translog_set_lsn_for_files");
1165
DBUG_PRINT("enter", ("From: %lu to: %lu lsn: (%lu,0x%lx) locked: %d",
1166
(ulong) from_file, (ulong) to_file,
1169
DBUG_ASSERT(from_file <= to_file);
1170
DBUG_ASSERT(from_file > 0); /* we have not file 0 */
1172
/* Checks the current file (not finished yet file) */
1175
if (to_file == (uint32) LSN_FILE_NO(log_descriptor.horizon))
1177
if (likely(cmp_translog_addr(lsn, log_descriptor.max_lsn) > 0))
1178
log_descriptor.max_lsn= lsn;
1184
/* Checks finished files if they are */
1185
pthread_mutex_lock(&log_descriptor.file_header_lock);
1186
for (file= from_file; file <= to_file; file++)
1188
LOGHANDLER_FILE_INFO info;
1189
File fd= open_logfile_by_number_no_cache(file);
1191
((translog_read_file_header(&info, fd) ||
1192
(cmp_translog_addr(lsn, info.max_lsn) > 0 &&
1193
translog_max_lsn_to_header(fd, lsn))) |
1194
my_close(fd, MYF(MY_WME))))
1196
translog_stop_writing();
1200
pthread_mutex_unlock(&log_descriptor.file_header_lock);
1206
/* descriptor of file in unfinished_files */
1207
struct st_file_counter
1209
uint32 file; /* file number */
1210
uint32 counter; /* counter for started writes */
1215
@brief mark file "in progress" (for multi-group records)
1217
@param file log file number
1220
static void translog_mark_file_unfinished(uint32 file)
1223
struct st_file_counter fc, *fc_ptr;
1225
DBUG_ENTER("translog_mark_file_unfinished");
1226
DBUG_PRINT("enter", ("file: %lu", (ulong) file));
1228
fc.file= file; fc.counter= 1;
1229
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
1231
if (log_descriptor.unfinished_files.elements == 0)
1233
insert_dynamic(&log_descriptor.unfinished_files, (uchar*) &fc);
1234
DBUG_PRINT("info", ("The first element inserted"));
1238
for (place= log_descriptor.unfinished_files.elements - 1;
1242
fc_ptr= dynamic_element(&log_descriptor.unfinished_files,
1243
place, struct st_file_counter *);
1244
if (fc_ptr->file <= file)
1248
if (place >= 0 && fc_ptr->file == file)
1251
DBUG_PRINT("info", ("counter increased"));
1255
if (place == (int)log_descriptor.unfinished_files.elements)
1257
insert_dynamic(&log_descriptor.unfinished_files, (uchar*) &fc);
1258
DBUG_PRINT("info", ("The last element inserted"));
1261
/* shift and assign new element */
1262
insert_dynamic(&log_descriptor.unfinished_files,
1264
dynamic_element(&log_descriptor.unfinished_files,
1265
log_descriptor.unfinished_files.elements- 1,
1266
struct st_file_counter *));
1267
for(i= log_descriptor.unfinished_files.elements - 1; i > place; i--)
1269
/* we do not use set_dynamic() to avoid unneeded checks */
1270
memcpy(dynamic_element(&log_descriptor.unfinished_files,
1271
i, struct st_file_counter *),
1272
dynamic_element(&log_descriptor.unfinished_files,
1273
i + 1, struct st_file_counter *),
1274
sizeof(struct st_file_counter));
1276
memcpy(dynamic_element(&log_descriptor.unfinished_files,
1277
place + 1, struct st_file_counter *),
1278
&fc, sizeof(struct st_file_counter));
1280
pthread_mutex_unlock(&log_descriptor.unfinished_files_lock);
1286
@brief remove file mark "in progress" (for multi-group records)
1288
@param file log file number
1291
static void translog_mark_file_finished(uint32 file)
1294
struct st_file_counter *fc_ptr;
1295
DBUG_ENTER("translog_mark_file_finished");
1296
DBUG_PRINT("enter", ("file: %lu", (ulong) file));
1300
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
1302
DBUG_ASSERT(log_descriptor.unfinished_files.elements > 0);
1304
i < (int) log_descriptor.unfinished_files.elements;
1307
fc_ptr= dynamic_element(&log_descriptor.unfinished_files,
1308
i, struct st_file_counter *);
1309
if (fc_ptr->file == file)
1314
DBUG_ASSERT(i < (int) log_descriptor.unfinished_files.elements);
1316
if (! --fc_ptr->counter)
1317
delete_dynamic_element(&log_descriptor.unfinished_files, i);
1318
pthread_mutex_unlock(&log_descriptor.unfinished_files_lock);
1324
@brief get max LSN of the record which parts stored in this file
1326
@param file file number
1328
@return requested LSN or LSN_IMPOSSIBLE/LSN_ERROR
1329
@retval LSN_IMPOSSIBLE File is still not finished
1330
@retval LSN_ERROR Error opening file
1331
@retval # LSN of the record which parts stored in this file
1334
LSN translog_get_file_max_lsn_stored(uint32 file)
1336
uint32 limit= FILENO_IMPOSSIBLE;
1337
DBUG_ENTER("translog_get_file_max_lsn_stored");
1338
DBUG_PRINT("enter", ("file: %lu", (ulong)file));
1339
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
1340
translog_status == TRANSLOG_READONLY);
1342
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
1344
/* find file with minimum file number "in progress" */
1345
if (log_descriptor.unfinished_files.elements > 0)
1347
struct st_file_counter *fc_ptr;
1348
fc_ptr= dynamic_element(&log_descriptor.unfinished_files,
1349
0, struct st_file_counter *);
1350
limit= fc_ptr->file; /* minimal file number "in progress" */
1352
pthread_mutex_unlock(&log_descriptor.unfinished_files_lock);
1355
if there is no "in progress file" then unfinished file is in progress
1358
if (limit == FILENO_IMPOSSIBLE)
1360
TRANSLOG_ADDRESS horizon= translog_get_horizon();
1361
limit= LSN_FILE_NO(horizon);
1366
DBUG_PRINT("info", ("The file in in progress"));
1367
DBUG_RETURN(LSN_IMPOSSIBLE);
1371
LOGHANDLER_FILE_INFO info;
1372
File fd= open_logfile_by_number_no_cache(file);
1374
(translog_read_file_header(&info, fd) | my_close(fd, MYF(MY_WME))))
1376
DBUG_PRINT("error", ("Can't read file header"));
1377
DBUG_RETURN(LSN_ERROR);
1379
DBUG_PRINT("info", ("Max lsn: (%lu,0x%lx)",
1380
LSN_IN_PARTS(info.max_lsn)));
1381
DBUG_RETURN(info.max_lsn);
1386
Initialize transaction log file buffer
1389
translog_buffer_init()
1390
buffer The buffer to initialize
1397
static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
1399
DBUG_ENTER("translog_buffer_init");
1400
buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
1401
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
1403
/* This Buffer File */
1406
/* cache for current log */
1407
memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER);
1410
/* cond of thread which is waiting for buffer filling */
1411
if (pthread_cond_init(&buffer->waiting_filling_buffer, 0))
1413
/* Number of records which are in copy progress */
1414
buffer->copy_to_buffer_in_progress= 0;
1415
/* list of waiting buffer ready threads */
1416
buffer->waiting_flush= 0;
1417
/* lock for the buffer. Current buffer also lock the handler */
1418
if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST))
1420
buffer->is_closing_buffer= 0;
1427
@brief close transaction log file by descriptor
1429
@param file pagegecache file descriptor reference
1431
@return Operation status
1436
static my_bool translog_close_log_file(TRANSLOG_FILE *file)
1439
flush_pagecache_blocks(log_descriptor.pagecache, &file->handler,
1442
Sync file when we close it
1443
TODO: sync only we have changed the log
1446
rc= my_sync(file->handler.file, MYF(MY_WME));
1447
rc|= my_close(file->handler.file, MYF(MY_WME));
1448
my_free(file, MYF(0));
1454
@brief Dummy function for write failure (the log to not use
1458
void translog_dummy_write_failure(uchar *data __attribute__((unused)))
1465
@brief Initializes TRANSLOG_FILE structure
1467
@param file reference on the file to initialize
1468
@param number file number
1469
@param is_sync is file synced on disk
1472
static void translog_file_init(TRANSLOG_FILE *file, uint32 number,
1475
pagecache_file_init(file->handler, &translog_page_validator,
1476
&translog_dummy_callback,
1477
&translog_dummy_write_failure,
1478
maria_flush_log_for_page_none, file);
1479
file->number= number;
1480
file->was_recovered= 0;
1481
file->is_sync= is_sync;
1486
@brief Create and fill header of new file.
1488
@note the caller must call it right after it has increased
1489
log_descriptor.horizon to the new file
1490
(log_descriptor.horizon+= LSN_ONE_FILE)
1497
static my_bool translog_create_new_file()
1499
TRANSLOG_FILE *file= (TRANSLOG_FILE*)my_malloc(sizeof(TRANSLOG_FILE),
1502
TRANSLOG_FILE *old= get_current_logfile();
1503
uint32 file_no= LSN_FILE_NO(log_descriptor.horizon);
1504
DBUG_ENTER("translog_create_new_file");
1510
Writes max_lsn to the file header before finishing it (there is no need
1511
to lock file header buffer because it is still unfinished file, so only
1512
one thread can finish the file and nobody interested of LSN of current
1513
(unfinished) file, because no one can purge it).
1515
if (translog_max_lsn_to_header(old->handler.file, log_descriptor.max_lsn))
1518
rw_wrlock(&log_descriptor.open_files_lock);
1519
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
1520
log_descriptor.open_files.elements);
1521
DBUG_ASSERT(file_no == log_descriptor.max_file + 1);
1522
if (allocate_dynamic(&log_descriptor.open_files,
1523
log_descriptor.max_file - log_descriptor.min_file + 2))
1525
if ((file->handler.file=
1526
create_logfile_by_number_no_cache(file_no)) == -1)
1528
translog_file_init(file, file_no, 0);
1530
/* this call just expand the array */
1531
insert_dynamic(&log_descriptor.open_files, (uchar*)&file);
1532
log_descriptor.max_file++;
1534
char *start= (char*) dynamic_element(&log_descriptor.open_files, 0,
1536
memmove(start + sizeof(TRANSLOG_FILE*), start,
1537
sizeof(TRANSLOG_FILE*) *
1538
(log_descriptor.max_file - log_descriptor.min_file + 1 - 1));
1540
/* can't fail we because we expanded array */
1541
set_dynamic(&log_descriptor.open_files, (uchar*)&file, 0);
1542
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
1543
log_descriptor.open_files.elements);
1544
rw_unlock(&log_descriptor.open_files_lock);
1546
DBUG_PRINT("info", ("file_no: %lu", (ulong)file_no));
1548
if (translog_write_file_header())
1551
if (ma_control_file_write_and_force(last_checkpoint_lsn, file_no,
1552
max_trid_in_control_file,
1555
translog_stop_writing();
1562
rw_unlock(&log_descriptor.open_files_lock);
1564
translog_stop_writing();
1570
@brief Locks the loghandler buffer.
1572
@param buffer This buffer which should be locked
1574
@note See comment before buffer 'mutex' variable.
1580
static void translog_buffer_lock(struct st_translog_buffer *buffer)
1582
DBUG_ENTER("translog_buffer_lock");
1584
("Lock buffer #%u: (0x%lx)", (uint) buffer->buffer_no,
1586
pthread_mutex_lock(&buffer->mutex);
1592
Unlock the loghandler buffer
1595
translog_buffer_unlock()
1596
buffer This buffer which should be unlocked
1603
static void translog_buffer_unlock(struct st_translog_buffer *buffer)
1605
DBUG_ENTER("translog_buffer_unlock");
1606
DBUG_PRINT("enter", ("Unlock buffer... #%u (0x%lx)",
1607
(uint) buffer->buffer_no, (ulong) buffer));
1609
pthread_mutex_unlock(&buffer->mutex);
1615
Write a header on the page
1618
translog_new_page_header()
1619
horizon Where to write the page
1620
cursor Where to write the page
1623
- space for page header should be checked before
1626
static uchar translog_sector_random;
1628
static void translog_new_page_header(TRANSLOG_ADDRESS *horizon,
1629
struct st_buffer_cursor *cursor)
1633
DBUG_ENTER("translog_new_page_header");
1634
DBUG_ASSERT(cursor->ptr);
1636
cursor->protected= 0;
1640
int3store(ptr, LSN_OFFSET(*horizon) / TRANSLOG_PAGE_SIZE);
1643
int3store(ptr, LSN_FILE_NO(*horizon));
1645
DBUG_ASSERT(TRANSLOG_PAGE_FLAGS == (ptr - cursor->ptr));
1646
cursor->ptr[TRANSLOG_PAGE_FLAGS]= (uchar) log_descriptor.flags;
1648
if (log_descriptor.flags & TRANSLOG_PAGE_CRC)
1651
DBUG_PRINT("info", ("write 0x11223344 CRC to (%lu,0x%lx)",
1652
LSN_IN_PARTS(*horizon)));
1653
/* This will be overwritten by real CRC; This is just for debugging */
1654
int4store(ptr, 0x11223344);
1656
/* CRC will be put when page is finished */
1659
if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION)
1662
translog_sector_randmo works like "random" values producer because
1663
it is enough to have such "random" for this purpose and it will
1664
not interfere with higher level pseudo random value generator
1666
ptr[0]= translog_sector_random++;
1667
ptr+= TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
1670
uint len= (ptr - cursor->ptr);
1671
(*horizon)+= len; /* increasing the offset part of the address */
1672
cursor->current_page_fill= len;
1673
if (!cursor->chaser)
1674
cursor->buffer->size+= len;
1677
DBUG_PRINT("info", ("NewP buffer #%u: 0x%lx chaser: %d Size: %lu (%lu) "
1678
"Horizon: (%lu,0x%lx)",
1679
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
1680
cursor->chaser, (ulong) cursor->buffer->size,
1681
(ulong) (cursor->ptr - cursor->buffer->buffer),
1682
LSN_IN_PARTS(*horizon)));
1683
translog_check_cursor(cursor);
1689
Put sector protection on the page image
1692
translog_put_sector_protection()
1693
page reference on the page content
1694
cursor cursor of the buffer
1697
We put a sector protection on all following sectors on the page,
1698
except the first sector that is protected by page header.
1701
static void translog_put_sector_protection(uchar *page,
1702
struct st_buffer_cursor *cursor)
1704
uchar *table= page + log_descriptor.page_overhead -
1705
TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
1707
uint16 last_protected_sector= ((cursor->previous_offset - 1) /
1708
DISK_DRIVE_SECTOR_SIZE);
1709
uint16 start_sector= cursor->previous_offset / DISK_DRIVE_SECTOR_SIZE;
1710
uint8 value= table[0] + cursor->write_counter;
1711
DBUG_ENTER("translog_put_sector_protection");
1713
if (start_sector == 0)
1715
/* First sector is protected by file & page numbers in the page header. */
1719
DBUG_PRINT("enter", ("Write counter:%u value:%u offset:%u, "
1720
"last protected:%u start sector:%u",
1721
(uint) cursor->write_counter,
1723
(uint) cursor->previous_offset,
1724
(uint) last_protected_sector, (uint) start_sector));
1725
if (last_protected_sector == start_sector)
1727
i= last_protected_sector;
1728
offset= last_protected_sector * DISK_DRIVE_SECTOR_SIZE;
1729
/* restore data, because we modified sector which was protected */
1730
if (offset < cursor->previous_offset)
1731
page[offset]= table[i];
1733
for (i= start_sector, offset= start_sector * DISK_DRIVE_SECTOR_SIZE;
1734
i < TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
1735
i++, (offset+= DISK_DRIVE_SECTOR_SIZE))
1737
DBUG_PRINT("info", ("sector:%u offset:%u data 0x%x",
1738
i, offset, (uint) page[offset]));
1739
table[i]= page[offset];
1740
page[offset]= value;
1741
DBUG_PRINT("info", ("sector:%u offset:%u data 0x%x",
1742
i, offset, (uint) page[offset]));
1749
Calculate CRC32 of given area
1753
area Pointer of the area beginning
1754
length The Area length
1760
static uint32 translog_crc(uchar *area, uint length)
1762
DBUG_ENTER("translog_crc");
1763
DBUG_RETURN(crc32(0L, (unsigned char*) area, length));
1768
Finish current page with zeros
1771
translog_finish_page()
1772
horizon \ horizon & buffer pointers
1776
static void translog_finish_page(TRANSLOG_ADDRESS *horizon,
1777
struct st_buffer_cursor *cursor)
1779
uint16 left= TRANSLOG_PAGE_SIZE - cursor->current_page_fill;
1780
uchar *page= cursor->ptr - cursor->current_page_fill;
1781
DBUG_ENTER("translog_finish_page");
1782
DBUG_PRINT("enter", ("Buffer: #%u 0x%lx "
1783
"Buffer addr: (%lu,0x%lx) "
1784
"Page addr: (%lu,0x%lx) "
1785
"size:%lu (%lu) Pg:%u left:%u",
1786
(uint) cursor->buffer_no, (ulong) cursor->buffer,
1787
LSN_IN_PARTS(cursor->buffer->offset),
1788
(ulong) LSN_FILE_NO(*horizon),
1789
(ulong) (LSN_OFFSET(*horizon) -
1790
cursor->current_page_fill),
1791
(ulong) cursor->buffer->size,
1792
(ulong) (cursor->ptr -cursor->buffer->buffer),
1793
(uint) cursor->current_page_fill, (uint) left));
1794
DBUG_ASSERT(LSN_FILE_NO(*horizon) == LSN_FILE_NO(cursor->buffer->offset));
1795
translog_check_cursor(cursor);
1796
if (cursor->protected)
1798
DBUG_PRINT("info", ("Already protected and finished"));
1801
cursor->protected= 1;
1803
DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
1806
DBUG_PRINT("info", ("left: %u", (uint) left));
1807
memset(cursor->ptr, TRANSLOG_FILLER, left);
1809
(*horizon)+= left; /* offset increasing */
1810
if (!cursor->chaser)
1811
cursor->buffer->size+= left;
1812
/* We are finishing the page so reset the counter */
1813
cursor->current_page_fill= 0;
1814
DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx "
1815
"chaser: %d Size: %lu (%lu)",
1816
(uint) cursor->buffer->buffer_no,
1817
(ulong) cursor->buffer, cursor->chaser,
1818
(ulong) cursor->buffer->size,
1819
(ulong) (cursor->ptr - cursor->buffer->buffer)));
1820
translog_check_cursor(cursor);
1823
When we are finishing the page other thread might not finish the page
1824
header yet (in case if we started from the middle of the page) so we
1825
have to read log_descriptor.flags but not the flags from the page.
1827
if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION)
1829
translog_put_sector_protection(page, cursor);
1830
DBUG_PRINT("info", ("drop write_counter"));
1831
cursor->write_counter= 0;
1832
cursor->previous_offset= 0;
1834
if (log_descriptor.flags & TRANSLOG_PAGE_CRC)
1836
uint32 crc= translog_crc(page + log_descriptor.page_overhead,
1837
TRANSLOG_PAGE_SIZE -
1838
log_descriptor.page_overhead);
1839
DBUG_PRINT("info", ("CRC: %lx", (ulong) crc));
1840
/* We have page number, file number and flag before crc */
1841
int4store(page + 3 + 3 + 1, crc);
1848
@brief Wait until all threads have finished closing this buffer.
1850
@param buffer This buffer should be check
1853
static void translog_wait_for_closing(struct st_translog_buffer *buffer)
1855
DBUG_ENTER("translog_wait_for_closing");
1856
DBUG_PRINT("enter", ("Buffer #%u 0x%lx copies in progress: %u "
1857
"is closing %u File: %d size: %lu",
1858
(uint) buffer->buffer_no, (ulong) buffer,
1859
(uint) buffer->copy_to_buffer_in_progress,
1860
(uint) buffer->is_closing_buffer,
1861
(buffer->file ? buffer->file->handler.file : -1),
1862
(ulong) buffer->size));
1863
translog_buffer_lock_assert_owner(buffer);
1865
while (buffer->is_closing_buffer)
1867
DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx",
1868
(uint) buffer->buffer_no, (ulong) buffer));
1869
DBUG_ASSERT(buffer->file != NULL);
1870
pthread_cond_wait(&buffer->waiting_filling_buffer, &buffer->mutex);
1871
DBUG_PRINT("info", ("wait for writers done buffer: #%u 0x%lx",
1872
(uint) buffer->buffer_no, (ulong) buffer));
1880
@brief Wait until all threads have finished filling this buffer.
1882
@param buffer This buffer should be check
1885
static void translog_wait_for_writers(struct st_translog_buffer *buffer)
1887
DBUG_ENTER("translog_wait_for_writers");
1888
DBUG_PRINT("enter", ("Buffer #%u 0x%lx copies in progress: %u "
1889
"is closing %u File: %d size: %lu",
1890
(uint) buffer->buffer_no, (ulong) buffer,
1891
(uint) buffer->copy_to_buffer_in_progress,
1892
(uint) buffer->is_closing_buffer,
1893
(buffer->file ? buffer->file->handler.file : -1),
1894
(ulong) buffer->size));
1895
translog_buffer_lock_assert_owner(buffer);
1897
while (buffer->copy_to_buffer_in_progress)
1899
DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx",
1900
(uint) buffer->buffer_no, (ulong) buffer));
1901
DBUG_ASSERT(buffer->file != NULL);
1902
pthread_cond_wait(&buffer->waiting_filling_buffer, &buffer->mutex);
1903
DBUG_PRINT("info", ("wait for writers done buffer: #%u 0x%lx",
1904
(uint) buffer->buffer_no, (ulong) buffer));
1913
Wait for buffer to become free
1916
translog_wait_for_buffer_free()
1917
buffer The buffer we are waiting for
1920
- this buffer should be locked
1923
static void translog_wait_for_buffer_free(struct st_translog_buffer *buffer)
1925
TRANSLOG_ADDRESS offset= buffer->offset;
1926
TRANSLOG_FILE *file= buffer->file;
1927
uint8 ver= buffer->ver;
1928
DBUG_ENTER("translog_wait_for_buffer_free");
1929
DBUG_PRINT("enter", ("Buffer #%u 0x%lx copies in progress: %u "
1930
"is closing %u File: %d size: %lu",
1931
(uint) buffer->buffer_no, (ulong) buffer,
1932
(uint) buffer->copy_to_buffer_in_progress,
1933
(uint) buffer->is_closing_buffer,
1934
(buffer->file ? buffer->file->handler.file : -1),
1935
(ulong) buffer->size));
1937
translog_wait_for_writers(buffer);
1939
if (offset != buffer->offset || file != buffer->file || ver != buffer->ver)
1940
DBUG_VOID_RETURN; /* the buffer if already freed */
1942
while (buffer->file != NULL)
1944
DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx",
1945
(uint) buffer->buffer_no, (ulong) buffer));
1946
pthread_cond_wait(&buffer->waiting_filling_buffer, &buffer->mutex);
1947
DBUG_PRINT("info", ("wait for writers done. buffer: #%u 0x%lx",
1948
(uint) buffer->buffer_no, (ulong) buffer));
1950
DBUG_ASSERT(buffer->copy_to_buffer_in_progress == 0);
1956
Initialize the cursor for a buffer
1959
translog_cursor_init()
1962
buffer_no Number of buffer
1965
static void translog_cursor_init(struct st_buffer_cursor *cursor,
1966
struct st_translog_buffer *buffer,
1969
DBUG_ENTER("translog_cursor_init");
1970
cursor->ptr= buffer->buffer;
1971
cursor->buffer= buffer;
1972
cursor->buffer_no= buffer_no;
1973
cursor->current_page_fill= 0;
1974
cursor->chaser= (cursor != &log_descriptor.bc);
1975
cursor->write_counter= 0;
1976
cursor->previous_offset= 0;
1977
cursor->protected= 0;
1983
@brief Initialize buffer for the current file, and a cursor for this buffer.
1985
@param buffer The buffer
1986
@param cursor It's cursor
1987
@param buffer_no Number of buffer
1990
static void translog_start_buffer(struct st_translog_buffer *buffer,
1991
struct st_buffer_cursor *cursor,
1994
DBUG_ENTER("translog_start_buffer");
1996
("Assign buffer: #%u (0x%lx) offset: 0x%lx(%lu)",
1997
(uint) buffer->buffer_no, (ulong) buffer,
1998
(ulong) LSN_OFFSET(log_descriptor.horizon),
1999
(ulong) LSN_OFFSET(log_descriptor.horizon)));
2000
DBUG_ASSERT(buffer_no == buffer->buffer_no);
2001
buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
2002
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
2004
buffer->offset= log_descriptor.horizon;
2005
buffer->next_buffer_offset= LSN_IMPOSSIBLE;
2006
buffer->file= get_current_logfile();
2009
translog_cursor_init(cursor, buffer, buffer_no);
2010
DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx "
2011
"chaser: %d Size: %lu (%lu)",
2012
(long) (buffer->file ? buffer->file->number : 0),
2013
(buffer->file ? buffer->file->handler.file : -1),
2014
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
2015
cursor->chaser, (ulong) cursor->buffer->size,
2016
(ulong) (cursor->ptr - cursor->buffer->buffer)));
2017
translog_check_cursor(cursor);
2018
pthread_mutex_lock(&log_descriptor.dirty_buffer_mask_lock);
2019
log_descriptor.dirty_buffer_mask|= (1 << buffer->buffer_no);
2020
pthread_mutex_unlock(&log_descriptor.dirty_buffer_mask_lock);
2027
@brief Switch to the next buffer in a chain.
2029
@param horizon \ Pointers on current position in file and buffer
2031
@param new_file Also start new file
2034
- loghandler should be locked
2035
- after return new and old buffer still are locked
2041
static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
2042
struct st_buffer_cursor *cursor,
2045
uint old_buffer_no= cursor->buffer_no;
2046
uint new_buffer_no= (old_buffer_no + 1) % TRANSLOG_BUFFERS_NO;
2047
struct st_translog_buffer *new_buffer= log_descriptor.buffers + new_buffer_no;
2048
my_bool chasing= cursor->chaser;
2049
DBUG_ENTER("translog_buffer_next");
2051
DBUG_PRINT("info", ("horizon: (%lu,0x%lx) chasing: %d",
2052
LSN_IN_PARTS(log_descriptor.horizon), chasing));
2054
DBUG_ASSERT(cmp_translog_addr(log_descriptor.horizon, *horizon) >= 0);
2056
translog_finish_page(horizon, cursor);
2060
translog_buffer_lock(new_buffer);
2063
TRANSLOG_ADDRESS offset= new_buffer->offset;
2064
TRANSLOG_FILE *file= new_buffer->file;
2065
uint8 ver= new_buffer->ver;
2066
translog_lock_assert_owner();
2068
translog_wait_for_buffer_free(new_buffer);
2070
/* We keep the handler locked so nobody can start this new buffer */
2071
DBUG_ASSERT(offset == new_buffer->offset && new_buffer->file == NULL &&
2072
(file == NULL ? ver : (uint8)(ver + 1)) == new_buffer->ver);
2077
DBUG_ASSERT(new_buffer->file != NULL);
2081
/* move the horizon to the next file and its header page */
2082
(*horizon)+= LSN_ONE_FILE;
2083
(*horizon)= LSN_REPLACE_OFFSET(*horizon, TRANSLOG_PAGE_SIZE);
2084
if (!chasing && translog_create_new_file())
2090
/* prepare next page */
2092
translog_cursor_init(cursor, new_buffer, new_buffer_no);
2095
translog_lock_assert_owner();
2096
translog_start_buffer(new_buffer, cursor, new_buffer_no);
2098
log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset;
2099
new_buffer->prev_last_lsn=
2100
BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
2101
DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
2102
LSN_IN_PARTS(new_buffer->prev_last_lsn),
2103
(ulong) new_buffer));
2104
translog_new_page_header(horizon, cursor);
2110
Sets max LSN sent to file, and address from which data is only in the buffer
2113
translog_set_sent_to_disk()
2115
in_buffers to assign to in_buffers_only
2117
TODO: use atomic operations if possible (64bit architectures?)
2120
static void translog_set_sent_to_disk(LSN lsn, TRANSLOG_ADDRESS in_buffers)
2122
DBUG_ENTER("translog_set_sent_to_disk");
2123
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
2124
DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) "
2125
"in_buffers_only: (%lu,0x%lx) start: (%lu,0x%lx) "
2126
"sent_to_disk: (%lu,0x%lx)",
2128
LSN_IN_PARTS(in_buffers),
2129
LSN_IN_PARTS(log_descriptor.log_start),
2130
LSN_IN_PARTS(log_descriptor.in_buffers_only),
2131
LSN_IN_PARTS(log_descriptor.sent_to_disk)));
2133
We write sequentially (first part of following assert) but we rewrite
2134
the same page in case we started mysql and shut it down immediately
2135
(second part of the following assert)
2137
DBUG_ASSERT(cmp_translog_addr(lsn, log_descriptor.sent_to_disk) >= 0 ||
2138
cmp_translog_addr(lsn, log_descriptor.log_start) < 0);
2139
log_descriptor.sent_to_disk= lsn;
2140
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */
2141
if (cmp_translog_addr(in_buffers, log_descriptor.in_buffers_only) > 0)
2143
log_descriptor.in_buffers_only= in_buffers;
2144
DBUG_PRINT("info", ("set new in_buffers_only"));
2146
pthread_mutex_unlock(&log_descriptor.sent_to_disk_lock);
2152
Sets address from which data is only in the buffer
2155
translog_set_only_in_buffers()
2157
in_buffers to assign to in_buffers_only
2160
static void translog_set_only_in_buffers(TRANSLOG_ADDRESS in_buffers)
2162
DBUG_ENTER("translog_set_only_in_buffers");
2163
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
2164
DBUG_PRINT("enter", ("in_buffers: (%lu,0x%lx) "
2165
"in_buffers_only: (%lu,0x%lx)",
2166
LSN_IN_PARTS(in_buffers),
2167
LSN_IN_PARTS(log_descriptor.in_buffers_only)));
2168
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */
2169
if (cmp_translog_addr(in_buffers, log_descriptor.in_buffers_only) > 0)
2171
if (translog_status != TRANSLOG_OK)
2173
log_descriptor.in_buffers_only= in_buffers;
2174
DBUG_PRINT("info", ("set new in_buffers_only"));
2176
pthread_mutex_unlock(&log_descriptor.sent_to_disk_lock);
2182
Gets address from which data is only in the buffer
2185
translog_only_in_buffers()
2188
address from which data is only in the buffer
2191
static TRANSLOG_ADDRESS translog_only_in_buffers()
2193
register TRANSLOG_ADDRESS addr;
2194
DBUG_ENTER("translog_only_in_buffers");
2195
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
2196
addr= log_descriptor.in_buffers_only;
2197
pthread_mutex_unlock(&log_descriptor.sent_to_disk_lock);
2203
Get max LSN sent to file
2206
translog_get_sent_to_disk()
2209
max LSN send to file
2212
static LSN translog_get_sent_to_disk()
2215
DBUG_ENTER("translog_get_sent_to_disk");
2216
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
2217
lsn= log_descriptor.sent_to_disk;
2218
DBUG_PRINT("info", ("sent to disk up to (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
2219
pthread_mutex_unlock(&log_descriptor.sent_to_disk_lock);
2225
Get first chunk address on the given page
2228
translog_get_first_chunk_offset()
2229
page The page where to find first chunk
2235
/**
  @brief Gets the offset of the first chunk on the given page.

  @param page  the page where to find the first chunk

  @return page_overhead[] entry selected by the page flags byte

  NOTE(review): the result is an offset but is returned as my_bool;
  confirm that every page_overhead[] value fits that type.
*/

static my_bool translog_get_first_chunk_offset(uchar *page)
{
  uchar page_flags;
  DBUG_ENTER("translog_get_first_chunk_offset");

  page_flags= page[TRANSLOG_PAGE_FLAGS];
  DBUG_ASSERT(page_flags < TRANSLOG_FLAGS_NUM);

  DBUG_RETURN(page_overhead[page_flags]);
}
2244
Write coded length of record
2247
translog_write_variable_record_1group_code_len
2248
dst Destination buffer pointer
2249
length Length which should be coded
2250
header_len Calculated total header length
2254
translog_write_variable_record_1group_code_len(uchar *dst,
2255
translog_size_t length,
2258
switch (header_len) {
2259
case 6: /* (5 + 1) */
2260
DBUG_ASSERT(length <= 250);
2261
*dst= (uint8) length;
2263
case 8: /* (5 + 3) */
2264
DBUG_ASSERT(length <= 0xFFFF);
2266
int2store(dst + 1, length);
2268
case 9: /* (5 + 4) */
2269
DBUG_ASSERT(length <= (ulong) 0xFFFFFF);
2271
int3store(dst + 1, length);
2273
case 10: /* (5 + 5) */
2275
int4store(dst + 1, length);
2285
Decode record data length and advance given pointer to the next field
2288
translog_variable_record_1group_decode_len()
2289
src The pointer to the pointer to the length beginning
2295
/**
  @brief Decodes a record data length and advances the given pointer to
         the next field.

  @param src  pointer to the pointer to the beginning of the length

  @return decoded length (0 for the reserved marker values)

  NOTE(review): the marker values 251..255 were on lines missing from
  the reviewed excerpt; confirm them against the encoder.
*/

static translog_size_t translog_variable_record_1group_decode_len(uchar **src)
{
  uchar *field= *src;
  uint8 marker= (uint8) (*field);

  if (marker < 251)
  {
    /* the first byte is the length itself */
    *src= field + 1;
    return (marker);
  }
  if (marker == 251)
  {
    *src= field + 3;
    return (uint2korr(field + 1));
  }
  if (marker == 252)
  {
    *src= field + 4;
    return (uint3korr(field + 1));
  }
  if (marker == 253)
  {
    *src= field + 5;
    return (uint4korr(field + 1));
  }
  DBUG_ASSERT(0);                               /* reserved for future use */
  return (0);
}
2320
Get total length of this chunk (not only body)
2323
translog_get_total_chunk_length()
2324
page The page where chunk placed
2325
offset Offset of the chunk on this place
2328
total length of the chunk
2331
/**
  @brief Gets the total length of a chunk (header included, not only body).

  @param page    the page where the chunk is placed
  @param offset  offset of the chunk on this page

  @return total length of the chunk; 0 for an unknown chunk type
*/

static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
{
  uchar *chunk= page + offset;
  DBUG_ENTER("translog_get_total_chunk_length");

  switch (chunk[0] & TRANSLOG_CHUNK_TYPE) {
  case TRANSLOG_CHUNK_LSN:
  {
    /* 0 chunk referred as LSN (head or tail) */
    translog_size_t record_length;
    uchar *pos= chunk + 1 + 2;                  /* chunk type and short trid */
    uint16 body_length, head_length, left_on_page;
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_LSN"));
    record_length= translog_variable_record_1group_decode_len(&pos);
    body_length= uint2korr(pos);
    head_length= (uint16) (pos - chunk) + 2;
    DBUG_PRINT("info", ("rec len: %lu chunk len: %u header len: %u",
                        (ulong) record_length, (uint) body_length,
                        (uint) head_length));
    if (body_length == 0)
    {
      /* no stored chunk length: bounded by the record or the page end */
      left_on_page= TRANSLOG_PAGE_SIZE - offset;
      DBUG_PRINT("info", ("page_rest %u", (uint) left_on_page));
      if (record_length + head_length < left_on_page)
        DBUG_RETURN(record_length + head_length);
      DBUG_RETURN(left_on_page);
    }
    DBUG_PRINT("info", ("chunk len: %u + %u = %u",
                        (uint) head_length, (uint) body_length,
                        (uint) (body_length + head_length)));
    DBUG_RETURN(body_length + head_length);
  }
  case TRANSLOG_CHUNK_FIXED:
  {
    uint rec_type= chunk[0] & TRANSLOG_REC_TYPE;
    uchar *lsn_pos;
    uint total;
    int lsn_no;
    /* 1 (pseudo)fixed record (also LSN) */
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_FIXED"));
    DBUG_ASSERT(log_record_type_descriptor[rec_type].rclass ==
                LOGRECTYPE_FIXEDLENGTH ||
                log_record_type_descriptor[rec_type].rclass ==
                LOGRECTYPE_PSEUDOFIXEDLENGTH);
    if (log_record_type_descriptor[rec_type].rclass == LOGRECTYPE_FIXEDLENGTH)
    {
      DBUG_PRINT("info",
                 ("Fixed length: %u",
                  (uint) (log_record_type_descriptor[rec_type].fixed_length +
                          3)));
      DBUG_RETURN(log_record_type_descriptor[rec_type].fixed_length + 3);
    }

    lsn_pos= chunk + 3;                         /* first compressed LSN */
    total= log_record_type_descriptor[rec_type].fixed_length + 3;
    for (lsn_no= 0;
         lsn_no < log_record_type_descriptor[rec_type].compressed_LSN;
         lsn_no++)
    {
      /* first 2 bits is length - 2 */
      uint stored= (((uint8) (*lsn_pos)) >> 6) + 2;
      if (lsn_pos[0] == 0 && ((uint8) lsn_pos[1]) == 1)
        stored+= LSN_STORE_SIZE;                /* case of full LSN storing */
      lsn_pos+= stored;
      /* subtract saved bytes */
      total-= (LSN_STORE_SIZE - stored);
    }
    DBUG_PRINT("info", ("Pseudo-fixed length: %u", total));
    DBUG_RETURN(total);
  }
  case TRANSLOG_CHUNK_NOHDR:
    /* 2 no header chunk (till page end) */
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_NOHDR length: %u",
                        (uint) (TRANSLOG_PAGE_SIZE - offset)));
    DBUG_RETURN(TRANSLOG_PAGE_SIZE - offset);
  case TRANSLOG_CHUNK_LNGTH:                    /* 3 chunk with chunk length */
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_LNGTH"));
    DBUG_ASSERT(TRANSLOG_PAGE_SIZE - offset >= 3);
    DBUG_PRINT("info", ("length: %u", uint2korr(chunk + 1) + 3));
    DBUG_RETURN(uint2korr(chunk + 1) + 3);
  default:
    DBUG_ASSERT(0);
    DBUG_RETURN(0);
  }
}
2417
translog_buffer_flush()
2418
buffer This buffer should be flushed
2425
/*
  Flushes the given loghandler buffer.

  As far as the visible lines show: the function gives up early (returns 0)
  when the buffer has no file or when file/offset/ver changed while waiting,
  i.e. another thread already flushed it; it then hands the buffer to the
  page cache page by page (pagecache_inject), writes the whole buffer with
  my_pwrite, records the new "sent to disk" / "only in buffers" addresses,
  clears the buffer bit in dirty_buffer_mask and wakes up threads waiting
  on waiting_filling_buffer.

  NOTE(review): several lines of this function (braces, short statements,
  some return statements) are missing from the reviewed excerpt, so the
  exact control flow could not be confirmed.
*/
static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
2428
/* Snapshot of the buffer identity, used to detect a concurrent flush */
TRANSLOG_ADDRESS offset= buffer->offset;
2429
TRANSLOG_FILE *file= buffer->file;
2430
uint8 ver= buffer->ver;
2431
DBUG_ENTER("translog_buffer_flush");
2433
("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
2434
(uint) buffer->buffer_no, (ulong) buffer,
2435
buffer->file->handler.file,
2436
LSN_IN_PARTS(buffer->offset),
2437
(ulong) buffer->size));
2438
translog_buffer_lock_assert_owner(buffer);
2440
/*
  NOTE(review): buffer->file is dereferenced in the trace arguments above
  before this NULL check; if a NULL file is possible here, the trace
  arguments should guard against it.
*/
if (buffer->file == NULL)
2443
translog_wait_for_writers(buffer);
2445
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
2446
DBUG_RETURN(0); /* some the thread flushed the buffer already */
2448
if (buffer->is_closing_buffer)
2450
/* some other flush in progress */
2451
translog_wait_for_closing(buffer);
2454
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
2455
DBUG_RETURN(0); /* some the thread flushed the buffer already */
2457
if (buffer->overlay && buffer->overlay->file == buffer->file &&
2458
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
2459
buffer->offset) > 0)
2462
This can't happen for normal translog_flush,
2463
only during destroying the loghandler
2465
struct st_translog_buffer *overlay= buffer->overlay;
2466
TRANSLOG_ADDRESS buffer_offset= buffer->offset;
2467
TRANSLOG_FILE *fl= buffer->file;
2468
uint8 ver= buffer->ver;
2469
translog_buffer_unlock(buffer);
2470
translog_buffer_lock(overlay);
2471
/* rechecks under mutex protection that overlay is still our overlay */
2472
if (buffer->overlay->file == fl &&
2473
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
2476
translog_wait_for_buffer_free(overlay);
2478
translog_buffer_unlock(overlay);
2479
translog_buffer_lock(buffer);
2480
if (buffer->file != fl || buffer_offset != buffer->offset ||
2484
This means that somebody else flushed the buffer while we was
2485
waiting for overlay then for locking buffer again.
2492
Send page by page in the pagecache what we are going to write on the
2496
for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
2498
i+= TRANSLOG_PAGE_SIZE, pg++)
2500
TRANSLOG_ADDRESS addr= (buffer->offset + i);
2501
TRANSLOG_VALIDATOR_DATA data;
2502
DBUG_PRINT("info", ("send log form %lu till %lu address: (%lu,0x%lx) "
2503
"page #: %lu buffer size: %lu buffer: 0x%lx",
2504
(ulong) i, (ulong) (i + TRANSLOG_PAGE_SIZE),
2505
LSN_IN_PARTS(addr), (ulong) pg, (ulong) buffer->size,
2508
DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE);
2509
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
2510
if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN)
2512
if (pagecache_inject(log_descriptor.pagecache,
2513
&file->handler, pg, 3,
2515
PAGECACHE_PLAIN_PAGE,
2516
PAGECACHE_LOCK_LEFT_UNLOCKED,
2517
PAGECACHE_PIN_LEFT_UNPINNED, 0,
2521
("Can't write page (%lu,0x%lx) to pagecache, error: %d",
2522
(ulong) buffer->file->number,
2523
(ulong) (LSN_OFFSET(buffer->offset)+ i),
2525
translog_stop_writing();
2530
if (my_pwrite(file->handler.file, buffer->buffer,
2531
buffer->size, LSN_OFFSET(buffer->offset),
2534
DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
2536
(ulong) file->handler.file,
2537
(ulong) LSN_OFFSET(buffer->offset),
2538
(ulong) buffer->size, errno));
2539
translog_stop_writing();
2543
Dropping the flag in such way can make false alarm: signalling than the
2544
file in not sync when it is sync, but the situation is quite rare and
2545
protections with mutexes give much more overhead to the whole engine
2549
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
2550
translog_set_sent_to_disk(buffer->last_lsn,
2551
buffer->next_buffer_offset);
2553
translog_set_only_in_buffers(buffer->next_buffer_offset);
2558
pthread_mutex_lock(&log_descriptor.dirty_buffer_mask_lock);
2559
log_descriptor.dirty_buffer_mask&= ~(1 << buffer->buffer_no);
2560
pthread_mutex_unlock(&log_descriptor.dirty_buffer_mask_lock);
2561
pthread_cond_broadcast(&buffer->waiting_filling_buffer);
2567
Recover page with sector protection (wipe out failed chunks)
2570
translog_recover_page_up_to_sector()
2571
page reference on the page
2572
offset offset of failed sector
2579
/**
  @brief Recovers a page with sector protection (wipes out failed chunks).

  @param page    reference on the page
  @param offset  offset of the failed sector

  Chunks that start before the failed sector are trusted and must be
  consistent; after them, chunks are accepted only while they end within
  the failed sector. Everything after the last accepted chunk is
  overwritten with TRANSLOG_FILLER.

  @retval 0  OK
  @retval 1  a chunk in the trusted area is damaged
*/

static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset)
{
  uint16 chunk_offset= translog_get_first_chunk_offset(page), valid_chunk_end;
  DBUG_ENTER("translog_recover_page_up_to_sector");
  DBUG_PRINT("enter", ("offset: %u first chunk: %u",
                       (uint) offset, (uint) chunk_offset));

  /*
    The bound is tested before the byte is read: a chunk may end exactly
    at the page end, and page[TRANSLOG_PAGE_SIZE] must never be touched.
  */
  while (chunk_offset < offset && page[chunk_offset] != TRANSLOG_FILLER)
  {
    uint16 chunk_length;
    if ((chunk_length=
         translog_get_total_chunk_length(page, chunk_offset)) == 0)
    {
      DBUG_PRINT("error", ("cant get chunk length (offset %u)",
                           (uint) chunk_offset));
      DBUG_RETURN(1);
    }
    DBUG_PRINT("info", ("chunk: offset: %u length %u",
                        (uint) chunk_offset, (uint) chunk_length));
    if (((ulong) chunk_offset) + ((ulong) chunk_length) > TRANSLOG_PAGE_SIZE)
    {
      DBUG_PRINT("error", ("damaged chunk (offset %u) in trusted area",
                           (uint) chunk_offset));
      DBUG_RETURN(1);
    }
    chunk_offset+= chunk_length;
  }

  valid_chunk_end= chunk_offset;
  /* end of trusted area - sector parsing */
  while (chunk_offset < TRANSLOG_PAGE_SIZE &&
         page[chunk_offset] != TRANSLOG_FILLER)
  {
    uint16 chunk_length;
    if ((chunk_length=
         translog_get_total_chunk_length(page, chunk_offset)) == 0)
      break;

    DBUG_PRINT("info", ("chunk: offset: %u length %u",
                        (uint) chunk_offset, (uint) chunk_length));
    if (((ulong) chunk_offset) + ((ulong) chunk_length) >
        (uint) (offset + DISK_DRIVE_SECTOR_SIZE))
      break;

    chunk_offset+= chunk_length;
    valid_chunk_end= chunk_offset;
  }
  DBUG_PRINT("info", ("valid chunk end offset: %u", (uint) valid_chunk_end));

  /* wipe out everything that could not be validated */
  memset(page + valid_chunk_end, TRANSLOG_FILLER,
         TRANSLOG_PAGE_SIZE - valid_chunk_end);

  DBUG_RETURN(0);
}
2635
@brief Dummy write callback.
2639
/*
  Dummy write callback: none of its three arguments is used (each one is
  marked with __attribute__((unused))).

  NOTE(review): the return type line and the body are not visible in the
  reviewed excerpt.
*/
translog_dummy_callback(uchar *page __attribute__((unused)),
2640
pgcache_page_no_t page_no __attribute__((unused)),
2641
uchar* data_ptr __attribute__((unused)))
2648
@brief Checks and removes sector protection.
2650
@param page reference on the page content.
2651
@param file transaction log descriptor.
2658
translog_check_sector_protection(uchar *page, TRANSLOG_FILE *file)
2661
uchar *table= page + page_overhead[page[TRANSLOG_PAGE_FLAGS]] -
2662
TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
2663
uint8 current= table[0];
2664
DBUG_ENTER("translog_check_sector_protection");
2666
for (i= 1, offset= DISK_DRIVE_SECTOR_SIZE;
2667
i < TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
2668
i++, offset+= DISK_DRIVE_SECTOR_SIZE)
2671
TODO: add chunk counting for "suspecting" sectors (difference is
2672
more than 1-2), if difference more then present chunks then it is
2675
uint8 test= page[offset];
2676
DBUG_PRINT("info", ("sector: #%u offset: %u current: %lx "
2677
"read: 0x%x stored: 0x%x%x",
2678
i, offset, (ulong) current,
2679
(uint) uint2korr(page + offset), (uint) table[i],
2680
(uint) table[i + 1]));
2682
3 is minimal possible record length. So we can have "distance"
2683
between 2 sectors value more then DISK_DRIVE_SECTOR_SIZE / 3
2684
only if it is old value, i.e. the sector was not written.
2686
if (((test < current) &&
2687
((uint)(0xFFL - current + test) > DISK_DRIVE_SECTOR_SIZE / 3)) ||
2688
((test >= current) &&
2689
((uint)(test - current) > DISK_DRIVE_SECTOR_SIZE / 3)))
2691
if (translog_recover_page_up_to_sector(page, offset))
2693
file->was_recovered= 1;
2697
/* Restore value on the page */
2698
page[offset]= table[i];
2700
DBUG_PRINT("info", ("sector: #%u offset: %u current: %lx "
2701
"read: 0x%x stored: 0x%x",
2702
i, offset, (ulong) current,
2703
(uint) page[offset], (uint) table[i]));
2710
@brief Log page validator (read callback)
2712
@param page The page data to check
2713
@param page_no The page number (<offset>/<page length>)
2714
@param data_ptr Read callback data pointer (pointer to TRANSLOG_FILE)
2716
@todo: add turning loghandler to read-only mode after merging with
2723
static my_bool translog_page_validator(uchar *page,
2724
pgcache_page_no_t page_no,
2727
uint this_page_page_overhead;
2730
TRANSLOG_FILE *data= (TRANSLOG_FILE *) data_ptr;
2732
pgcache_page_no_t offset= page_no * TRANSLOG_PAGE_SIZE;
2734
DBUG_ENTER("translog_page_validator");
2736
data->was_recovered= 0;
2738
if (uint3korr(page) != page_no ||
2739
uint3korr(page + 3) != data->number)
2741
DBUG_PRINT("error", ("Page (%lu,0x%lx): "
2742
"page address written in the page is incorrect: "
2743
"File %lu instead of %lu or page %lu instead of %lu",
2744
(ulong) data->number, (ulong) offset,
2745
(ulong) uint3korr(page + 3), (ulong) data->number,
2746
(ulong) uint3korr(page),
2750
flags= (uint)(page[TRANSLOG_PAGE_FLAGS]);
2751
this_page_page_overhead= page_overhead[flags];
2752
if (flags & ~(TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION |
2753
TRANSLOG_RECORD_CRC))
2755
DBUG_PRINT("error", ("Page (%lu,0x%lx): "
2756
"Garbage in the page flags field detected : %x",
2757
(ulong) data->number, (ulong) offset,
2761
page_pos= page + (3 + 3 + 1);
2762
if (flags & TRANSLOG_PAGE_CRC)
2764
uint32 crc= translog_crc(page + this_page_page_overhead,
2765
TRANSLOG_PAGE_SIZE -
2766
this_page_page_overhead);
2767
if (crc != uint4korr(page_pos))
2769
DBUG_PRINT("error", ("Page (%lu,0x%lx): "
2770
"CRC mismatch: calculated: %lx on the page %lx",
2771
(ulong) data->number, (ulong) offset,
2772
(ulong) crc, (ulong) uint4korr(page_pos)));
2775
page_pos+= CRC_SIZE; /* Skip crc */
2777
if (flags & TRANSLOG_SECTOR_PROTECTION &&
2778
translog_check_sector_protection(page, data))
2787
@brief Locks the loghandler.
2790
void translog_lock()
2792
uint8 current_buffer;
2793
DBUG_ENTER("translog_lock");
2796
Locking the loghandler mean locking current buffer, but it can change
2797
during locking, so we should check it
2802
log_descriptor.bc.buffer_no is only one byte so its reading is
2805
current_buffer= log_descriptor.bc.buffer_no;
2806
translog_buffer_lock(log_descriptor.buffers + current_buffer);
2807
if (log_descriptor.bc.buffer_no == current_buffer)
2809
translog_buffer_unlock(log_descriptor.buffers + current_buffer);
2816
Unlock the loghandler
2826
void translog_unlock()
2828
translog_buffer_unlock(log_descriptor.bc.buffer);
2833
@brief Get log page by file number and offset of the beginning of the page
2835
@param data validator data, which contains the page address
2836
@param buffer buffer for page placing
2837
(might not be used in some cache implementations)
2838
@param direct_link if it is not NULL then caller can accept direct
2839
link to the page cache
2842
@retval # pointer to the page cache which should be used to read this page
2845
/*
  Gets a log page by the page address stored in data->addr.

  As far as the visible lines show: when the address is at or after the
  "only in buffers" address, the loghandler buffers are walked (starting
  from the current one) until the buffer holding the page is found; the
  page is copied into 'buffer', sector protection of the last unfinished
  page is undone by hand, other pages are checked with
  translog_page_validator() against a private copy of the file descriptor.
  Otherwise the page is read through the page cache and
  data->was_recovered is taken from the file descriptor.

  NOTE(review): many lines of this function (braces, short statements,
  the locking calls between the visible lines) are missing from the
  reviewed excerpt, so the exact control flow could not be confirmed.
*/
static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
2846
PAGECACHE_BLOCK_LINK **direct_link)
2848
TRANSLOG_ADDRESS addr= *(data->addr), in_buffers;
2849
uint32 file_no= LSN_FILE_NO(addr);
2850
TRANSLOG_FILE *file;
2851
DBUG_ENTER("translog_get_page");
2852
DBUG_PRINT("enter", ("File: %lu Offset: %lu(0x%lx)",
2854
(ulong) LSN_OFFSET(addr),
2855
(ulong) LSN_OFFSET(addr)));
2857
/* it is really page address */
2858
DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0);
2864
/* First, unlocked look at the address from which data is only in buffers */
in_buffers= translog_only_in_buffers();
2865
DBUG_PRINT("info", ("in_buffers: (%lu,0x%lx)",
2866
LSN_IN_PARTS(in_buffers)));
2867
if (in_buffers != LSN_IMPOSSIBLE &&
2868
cmp_translog_addr(addr, in_buffers) >= 0)
2871
DBUG_ASSERT(cmp_translog_addr(addr, log_descriptor.horizon) < 0);
2872
/* recheck with locked loghandler */
2873
in_buffers= translog_only_in_buffers();
2874
if (cmp_translog_addr(addr, in_buffers) >= 0)
2876
uint16 buffer_no= log_descriptor.bc.buffer_no;
2878
uint16 buffer_start= buffer_no;
2880
struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer;
2881
struct st_translog_buffer *curr_buffer= log_descriptor.bc.buffer;
2885
if the page is in the buffer and it is the last version of the
2886
page (in case of division the page by buffer flush)
2888
if (curr_buffer->file != NULL &&
2889
cmp_translog_addr(addr, curr_buffer->offset) >= 0 &&
2890
cmp_translog_addr(addr,
2891
(curr_buffer->next_buffer_offset ?
2892
curr_buffer->next_buffer_offset:
2893
curr_buffer->offset + curr_buffer->size)) < 0)
2895
/* Snapshot of the buffer identity, rechecked after waiting for writers */
TRANSLOG_ADDRESS offset= curr_buffer->offset;
2896
TRANSLOG_FILE *fl= curr_buffer->file;
2897
uchar *from, *table= NULL;
2898
int is_last_unfinished_page;
2899
uint last_protected_sector= 0;
2900
TRANSLOG_FILE file_copy;
2901
uint8 ver= curr_buffer->ver;
2902
translog_wait_for_writers(curr_buffer);
2903
if (offset != curr_buffer->offset || fl != curr_buffer->file ||
2904
ver != curr_buffer->ver)
2906
DBUG_ASSERT(buffer_unlock == curr_buffer);
2907
translog_buffer_unlock(buffer_unlock);
2910
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
2911
from= curr_buffer->buffer + (addr - curr_buffer->offset);
2912
memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
2914
We can use copy then in translog_page_validator() because it
2915
do not put it permanently somewhere.
2916
We have to use copy because after releasing log lock we can't
2917
guaranty that the file still be present (in real life it will be
2918
present but theoretically possible that it will be released
2919
already from last files cache);
2921
file_copy= *(curr_buffer->file);
2922
file_copy.handler.callback_data= (uchar*) &file_copy;
2923
is_last_unfinished_page= ((log_descriptor.bc.buffer ==
2925
(log_descriptor.bc.ptr >= from) &&
2926
(log_descriptor.bc.ptr <
2927
from + TRANSLOG_PAGE_SIZE));
2928
if (is_last_unfinished_page &&
2929
(buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_SECTOR_PROTECTION))
2931
last_protected_sector= ((log_descriptor.bc.previous_offset - 1) /
2932
DISK_DRIVE_SECTOR_SIZE);
2933
table= buffer + log_descriptor.page_overhead -
2934
TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
2937
DBUG_ASSERT(buffer_unlock == curr_buffer);
2938
translog_buffer_unlock(buffer_unlock);
2939
if (is_last_unfinished_page)
2943
This is last unfinished page => we should not check CRC and
2944
remove only that protection which already installed (no need
2947
We do not check the flag of sector protection, because if
2948
(buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_SECTOR_PROTECTION) is
2949
not set then last_protected_sector will be 0 so following loop
2950
will be never executed
2952
DBUG_PRINT("info", ("This is last unfinished page, "
2953
"last protected sector %u",
2954
last_protected_sector));
2955
for (i= 1; i <= last_protected_sector; i++)
2957
uint offset= i * DISK_DRIVE_SECTOR_SIZE;
2958
DBUG_PRINT("info", ("Sector %u: 0x%02x <- 0x%02x",
2961
buffer[offset]= table[i];
2967
This IF should be true because we use in-memory data which
2968
supposed to be correct.
2970
if (translog_page_validator((uchar*) buffer,
2971
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
2972
(uchar*) &file_copy))
2978
DBUG_RETURN(buffer);
2980
buffer_no= (buffer_no + 1) % TRANSLOG_BUFFERS_NO;
2981
curr_buffer= log_descriptor.buffers + buffer_no;
2982
translog_buffer_lock(curr_buffer);
2983
translog_buffer_unlock(buffer_unlock);
2984
buffer_unlock= curr_buffer;
2985
/* we can't make a full circle */
2986
DBUG_ASSERT(buffer_start != buffer_no);
2991
file= get_logfile_by_number(file_no);
2992
DBUG_ASSERT(file != NULL);
2994
(uchar*) pagecache_read(log_descriptor.pagecache, &file->handler,
2995
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
2996
3, (direct_link ? NULL : buffer),
2997
PAGECACHE_PLAIN_PAGE,
2999
PAGECACHE_LOCK_READ :
3000
PAGECACHE_LOCK_LEFT_UNLOCKED),
3002
DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx",
3003
(ulong) direct_link,
3004
(ulong)(direct_link ? *direct_link : NULL)));
3005
data->was_recovered= file->was_recovered;
3006
DBUG_RETURN(buffer);
3011
@brief free direct log page link
3013
@param direct_link the direct log page link to be freed
3017
/**
  @brief Frees a direct log page link.

  @param direct_link  the direct log page link to be freed; a NULL link
                      is ignored
*/

static void translog_free_link(PAGECACHE_BLOCK_LINK *direct_link)
{
  DBUG_ENTER("translog_free_link");
  DBUG_PRINT("info", ("Direct link: 0x%lx",
                      (ulong) direct_link));

  if (direct_link == NULL)
  {
    /* nothing was pinned */
    DBUG_VOID_RETURN;
  }

  /* release the read lock and the pin taken for the link */
  pagecache_unlock_by_link(log_descriptor.pagecache, direct_link,
                           PAGECACHE_LOCK_READ_UNLOCK, PAGECACHE_UNPIN,
                           LSN_IMPOSSIBLE, LSN_IMPOSSIBLE, 0);
  DBUG_VOID_RETURN;
}
3031
@brief Finds last full page of the given log file.
3033
@param addr address structure to fill with data, which contain
3034
file number of the log file
3035
@param last_page_ok Result of the check whether last page OK.
3036
(for now only we check only that file length
3037
divisible on page length).
3038
@param no_errors suppress messages about non-critical errors
3044
static my_bool translog_get_last_page_addr(TRANSLOG_ADDRESS *addr,
3045
my_bool *last_page_ok,
3048
char path[FN_REFLEN];
3051
uint32 file_no= LSN_FILE_NO(*addr);
3052
TRANSLOG_FILE *file;
3056
DBUG_ENTER("translog_get_last_page_addr");
3058
if (likely((file= get_logfile_by_number(file_no)) != NULL))
3061
This function used only during initialization of loghandler or in
3062
scanner (which mean we need read that part of the log), so the
3063
requested log file have to be opened and can't be freed after
3064
returning pointer on it (file_size).
3066
file_size= my_seek(file->handler.file, 0, SEEK_END, MYF(0));
3071
This branch is used only during very early initialization
3072
when files are not opened.
3075
if ((fd= my_open(translog_filename_by_fileno(file_no, path),
3076
O_RDONLY, (no_errors ? MYF(0) : MYF(MY_WME)))) < 0)
3079
DBUG_PRINT("error", ("Error %d during opening file #%d",
3083
file_size= my_seek(fd, 0, SEEK_END, MYF(0));
3084
my_close(fd, MYF(0));
3086
DBUG_PRINT("info", ("File size: %s", llstr(file_size, buff)));
3087
if (file_size == MY_FILEPOS_ERROR)
3089
DBUG_ASSERT(file_size < ULL(0xffffffff));
3090
if (((uint32)file_size) > TRANSLOG_PAGE_SIZE)
3092
rec_offset= (((((uint32)file_size) / TRANSLOG_PAGE_SIZE) - 1) *
3093
TRANSLOG_PAGE_SIZE);
3094
*last_page_ok= (((uint32)file_size) == rec_offset + TRANSLOG_PAGE_SIZE);
3101
*addr= MAKE_LSN(file_no, rec_offset);
3102
DBUG_PRINT("info", ("Last page: 0x%lx ok: %d", (ulong) rec_offset,
3109
@brief Get number bytes for record length storing
3111
@param length Record length which will be encoded
3113
@return 1,3,4,5 - number of bytes to store given length
3116
/**
  @brief Gets the number of bytes needed to store a record length.

  @param length  record length which will be encoded

  @return 1,3,4,5 - number of bytes to store the given length

  NOTE(review): the first threshold (250) was on a line missing from the
  reviewed excerpt; confirm it.
*/

static uint translog_variable_record_length_bytes(translog_size_t length)
{
  return (length < 250 ? 1 :
          length < 0xFFFF ? 3 :
          length < (ulong) 0xFFFFFF ? 4 : 5);
}
3129
@brief Gets header of this chunk.
3131
@param chunk The pointer to the chunk beginning
3133
@retval # length of the chunk header
3137
/**
  @brief Gets the header length of a chunk.

  @param chunk  the pointer to the chunk beginning

  @retval #  length of the chunk header
  @retval 0  error (unknown chunk type, or an LSN chunk with zero chunk
             length)
*/

static uint16 translog_get_chunk_header_length(uchar *chunk)
{
  uint16 result;
  DBUG_ENTER("translog_get_chunk_header_length");

  switch (*chunk & TRANSLOG_CHUNK_TYPE) {
  case TRANSLOG_CHUNK_LSN:
  {
    /* 0 chunk referred as LSN (head or tail) */
    translog_size_t record_length;
    uchar *pos= chunk + 1 + 2;
    uint16 body_length;
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_LSN"));
    record_length= translog_variable_record_1group_decode_len(&pos);
    body_length= uint2korr(pos);
    result= (uint16) (pos - chunk) + 2;
    DBUG_PRINT("info", ("rec len: %lu chunk len: %u header len: %u",
                        (ulong) record_length, (uint) body_length,
                        (uint) result));
    if (body_length == 0)
    {
      /* TODO: fine header end */
      /*
        The last chunk of multi-group record can be base for it header
        calculation (we skip to the first group to read the header) so if we
        stuck here something is wrong.
      */
      DBUG_ASSERT(0);
      result= 0;                                /* Keep compiler happy */
    }
    break;
  }
  case TRANSLOG_CHUNK_FIXED:
    /* 1 (pseudo)fixed record (also LSN) */
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_FIXED = 3"));
    result= 3;
    break;
  case TRANSLOG_CHUNK_NOHDR:
    /* 2 no header chunk (till page end) */
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_NOHDR = 1"));
    result= 1;
    break;
  case TRANSLOG_CHUNK_LNGTH:
    /* 3 chunk with chunk length */
    DBUG_PRINT("info", ("TRANSLOG_CHUNK_LNGTH = 3"));
    result= 3;
    break;
  default:
    DBUG_ASSERT(0);
    result= 0;                                  /* Keep compiler happy */
    break;
  }
  DBUG_RETURN(result);
}
3191
@brief Truncate the log to the given address. Used during the startup if the
3192
end of log is corrupted.
3194
@param addr new horizon
3200
/**
  @brief Truncates the log to the given address. Used during the startup
         if the end of the log is corrupted.

  @param addr  new horizon

  Steps: delete the log files after the file of addr, cut the file of
  addr at the end of the page containing addr and fill the rest of that
  page with TRANSLOG_FILLER, move the horizon to addr, then reload the
  last page into the first loghandler buffer and point the buffer cursor
  at addr.

  NOTE(review): the statements executed when a file can not be deleted
  were on lines missing from the reviewed excerpt; they are restored here
  as "unlock the loghandler and fail" - confirm.

  @retval 0  OK
  @retval 1  error
*/

static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
{
  uchar *page;
  TRANSLOG_ADDRESS current_page;
  uint32 next_page_offset, page_rest;
  uint32 i;
  File fd;
  TRANSLOG_VALIDATOR_DATA data;
  char path[FN_REFLEN];
  uchar page_buff[TRANSLOG_PAGE_SIZE];
  DBUG_ENTER("translog_truncate_log");
  /* TODO: write warning to the client */
  DBUG_PRINT("warning", ("removing all records from (%lu,0x%lx) "
                         "till (%lu,0x%lx)",
                         LSN_IN_PARTS(addr),
                         LSN_IN_PARTS(log_descriptor.horizon)));
  DBUG_ASSERT(cmp_translog_addr(addr, log_descriptor.horizon) < 0);
  /* remove files between the address and horizon */
  for (i= LSN_FILE_NO(addr) + 1; i <= LSN_FILE_NO(log_descriptor.horizon); i++)
    if (my_delete(translog_filename_by_fileno(i, path), MYF(MY_WME)))
    {
      translog_unlock();
      DBUG_RETURN(1);
    }

  /* truncate the last file up to the last page */
  next_page_offset= LSN_OFFSET(addr);
  next_page_offset= (next_page_offset -
                     ((next_page_offset - 1) % TRANSLOG_PAGE_SIZE + 1) +
                     TRANSLOG_PAGE_SIZE);
  page_rest= next_page_offset - LSN_OFFSET(addr);
  memset(page_buff, TRANSLOG_FILLER, page_rest);
  /*
    The single '|' before my_close() is intentional: the file is closed
    even when resizing, writing or syncing it failed.
  */
  if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
      ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
        (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
                                log_write_flags)) ||
        my_sync(fd, MYF(MY_WME))) |
       my_close(fd, MYF(MY_WME))) ||
      (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
       sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD))))
    DBUG_RETURN(1);

  /* fix the horizon */
  log_descriptor.horizon= addr;
  /* fix the buffer data */
  current_page= MAKE_LSN(LSN_FILE_NO(addr), (next_page_offset -
                                             TRANSLOG_PAGE_SIZE));
  data.addr= &current_page;
  if ((page= translog_get_page(&data, log_descriptor.buffers->buffer, NULL)) ==
      NULL)
    DBUG_RETURN(1);
  if (page != log_descriptor.buffers->buffer)
    memcpy(log_descriptor.buffers->buffer, page, TRANSLOG_PAGE_SIZE);
  log_descriptor.bc.buffer->offset= current_page;
  log_descriptor.bc.buffer->size= LSN_OFFSET(addr) - LSN_OFFSET(current_page);
  log_descriptor.bc.ptr=
    log_descriptor.buffers->buffer + log_descriptor.bc.buffer->size;
  log_descriptor.bc.current_page_fill= log_descriptor.bc.buffer->size;
  DBUG_RETURN(0);
}
3263
Applies function 'callback' to all files (in a directory) which
3264
name looks like a log's name (maria_log.[0-9]{8}).
3265
If 'callback' returns TRUE this interrupts the walk and returns
3266
TRUE. Otherwise FALSE is returned after processing all log files.
3267
It cannot just use log_descriptor.directory because that may not yet have
3270
@param directory directory to scan
3271
@param callback function to apply; is passed directory and base
3275
/**
  @brief Applies 'callback' to every file of a directory whose name looks
         like a log file name ("maria_log." followed by exactly 8 digits).

  @param directory  directory to scan
  @param callback   function to apply; is passed the directory and the
                    base name of the found file

  @retval TRUE   the callback returned TRUE for some file (the walk stops
                 at that file)
  @retval FALSE  all log files were processed, or the directory could not
                 be read
*/

my_bool translog_walk_filenames(const char *directory,
                                my_bool (*callback)(const char *,
                                                    const char *))
{
  MY_DIR *dirp;
  uint idx;
  my_bool interrupted= FALSE;

  /* Read the (unsorted) list of the directory entries */
  if (!(dirp = my_dir(directory, MYF(MY_DONT_SORT))))
    return FALSE;

  for (idx= 0; idx < dirp->number_off_files; idx++)
  {
    char *file= dirp->dir_entry[idx].name;
    uint pos;

    if (strncmp(file, "maria_log.", 10) != 0)
      continue;
    /* positions 10..17 must be digits; a short name stops at its '\0' */
    for (pos= 10; pos < 18 && file[pos] >= '0' && file[pos] <= '9'; pos++)
    {}
    if (pos == 18 && file[18] == '\0' && (*callback)(directory, file))
    {
      interrupted= TRUE;
      break;
    }
  }
  my_dirend(dirp);
  return interrupted;
}
3311
@brief Fills table of dependence length of page header from page flags
3314
static void translog_fill_overhead_table()
3317
for (i= 0; i < TRANSLOG_FLAGS_NUM; i++)
3319
page_overhead[i]= 7;
3320
if (i & TRANSLOG_PAGE_CRC)
3321
page_overhead[i]+= CRC_SIZE;
3322
if (i & TRANSLOG_SECTOR_PROTECTION)
3323
page_overhead[i]+= TRANSLOG_PAGE_SIZE /
3324
DISK_DRIVE_SECTOR_SIZE;
3330
Callback to find first log in directory.
3333
/*
  Callback to find the first log in a directory; neither argument is used
  (both are marked with __attribute__((unused))).

  NOTE(review): the body is not visible in the reviewed excerpt;
  presumably it reports a match for any file it is called with - confirm.
*/
static my_bool translog_callback_search_first(const char *directory
3334
__attribute__((unused)),
3335
const char *filename
3336
__attribute__((unused)))
3343
@brief Checks that chunk is LSN one
3345
@param type type of the chunk
3347
@retval 1 the chunk is LSN
3348
@retval 0 the chunk is not LSN
3351
/**
  @brief Checks that a chunk is an LSN one.

  @param type  type byte of the chunk

  @retval 1  the chunk is LSN: either a TRANSLOG_CHUNK_FIXED chunk, or a
             TRANSLOG_CHUNK_LSN chunk whose record type is not
             TRANSLOG_CHUNK_0_CONT
  @retval 0  the chunk is not LSN
*/

static my_bool translog_is_LSN_chunk(uchar type)
{
  uint chunk_class= type & TRANSLOG_CHUNK_TYPE;
  my_bool is_lsn;
  DBUG_ENTER("translog_is_LSN_chunk");
  DBUG_PRINT("info", ("byte: %x chunk type: %u record type: %u",
                      type, type >> 6, type & TRANSLOG_REC_TYPE));

  if (chunk_class == TRANSLOG_CHUNK_FIXED)
    is_lsn= 1;
  else if (chunk_class == TRANSLOG_CHUNK_LSN)
    is_lsn= ((type & TRANSLOG_REC_TYPE) != TRANSLOG_CHUNK_0_CONT);
  else
    is_lsn= 0;

  DBUG_RETURN(is_lsn);
}
3363
@brief Initialize transaction log
3365
@param directory Directory where log files are put
3366
@param log_file_max_size max size of one log size (for new logs creation)
3367
@param server_version version of MySQL server (MYSQL_VERSION_ID)
3368
@param server_id server ID (replication & Co)
3369
@param pagecache Page cache for the log reads
3370
@param flags flags (TRANSLOG_PAGE_CRC, TRANSLOG_SECTOR_PROTECTION
3371
TRANSLOG_RECORD_CRC)
3372
@param read_only Put transaction log in read-only mode
3373
@param init_table_func function to initialize record descriptors table
3374
@param no_errors suppress messages about non-critical errors
3377
Free used resources in case of error.
3383
/*
  Initializes the global log_descriptor and brings the transaction log
  into a usable state.

  Visible steps, in order:
    - reset descriptor fields and call init_table_func;
    - create locks, the flush condition and the dynamic arrays;
    - open the log directory and store sizes, versions, flags and the
      derived page/buffer capacities;
    - initialize the write buffers;
    - if the control file recorded a last log number, locate the last
      page, open the existing files and validate pages from sure_page on;
    - either continue the old log (the filled part of the last valid page
      is copied into buffer 0), or create a new log file;
    - set log_start/sent_to_disk/flushed from the horizon;
    - allocate the id_to_share table;
    - find the last LSN record and check that its header and body can be
      read, cutting the log at that record otherwise.

  NOTE(review): this listing has lost short lines (braces, some
  declarations, `goto`/`else`/`do` lines, the last parameter); read it
  together with the complete file before changing any logic.
*/
my_bool translog_init_with_table(const char *directory,
3384
uint32 log_file_max_size,
3385
uint32 server_version,
3386
uint32 server_id, PAGECACHE *pagecache,
3387
uint flags, my_bool readonly,
3388
void (*init_table_func)(),
3392
int old_log_was_recovered= 0, logs_found= 0;
3393
uint old_flags= flags;
3394
uint32 start_file_num= 1;
3395
TRANSLOG_ADDRESS sure_page, last_page, last_valid_page, checkpoint_lsn;
3396
my_bool version_changed= 0;
3397
DBUG_ENTER("translog_init_with_table");
3400
/* Reset descriptor fields to their initial values */
log_descriptor.directory_fd= -1;
3401
log_descriptor.is_everything_flushed= 1;
3402
log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
3404
(*init_table_func)();
3405
compile_time_assert(sizeof(log_descriptor.dirty_buffer_mask) * 8 >=
3406
TRANSLOG_BUFFERS_NO);
3407
log_descriptor.dirty_buffer_mask= 0;
3409
log_descriptor.open_flags= O_BINARY | O_RDONLY;
3411
log_descriptor.open_flags= O_BINARY | O_RDWR;
3412
/* Create the locks, the flush condition and the two dynamic arrays */
if (pthread_mutex_init(&log_descriptor.sent_to_disk_lock,
3413
MY_MUTEX_INIT_FAST) ||
3414
pthread_mutex_init(&log_descriptor.file_header_lock,
3415
MY_MUTEX_INIT_FAST) ||
3416
pthread_mutex_init(&log_descriptor.unfinished_files_lock,
3417
MY_MUTEX_INIT_FAST) ||
3418
pthread_mutex_init(&log_descriptor.purger_lock,
3419
MY_MUTEX_INIT_FAST) ||
3420
pthread_mutex_init(&log_descriptor.log_flush_lock,
3421
MY_MUTEX_INIT_FAST) ||
3422
pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock,
3423
MY_MUTEX_INIT_FAST) ||
3424
pthread_cond_init(&log_descriptor.log_flush_cond, 0) ||
3425
my_rwlock_init(&log_descriptor.open_files_lock,
3427
my_init_dynamic_array(&log_descriptor.open_files,
3428
sizeof(TRANSLOG_FILE*), 10, 10) ||
3429
my_init_dynamic_array(&log_descriptor.unfinished_files,
3430
sizeof(struct st_file_counter),
3433
log_descriptor.min_need_file= 0;
3434
log_descriptor.min_file_number= 0;
3435
log_descriptor.last_lsn_checked= LSN_IMPOSSIBLE;
3437
/* Directory to store files */
3438
unpack_dirname(log_descriptor.directory, directory);
3440
if ((log_descriptor.directory_fd= my_open(log_descriptor.directory,
3441
O_RDONLY, MYF(MY_WME))) < 0)
3444
DBUG_PRINT("error", ("Error %d during opening directory '%s'",
3445
errno, log_descriptor.directory));
3449
log_descriptor.in_buffers_only= LSN_IMPOSSIBLE;
3450
DBUG_ASSERT(log_file_max_size % TRANSLOG_PAGE_SIZE == 0 &&
3451
log_file_max_size >= TRANSLOG_MIN_FILE_SIZE);
3452
/* max size of one log size (for new logs creation) */
3453
log_file_size= log_descriptor.log_file_max_size=
3455
/* server version */
3456
log_descriptor.server_version= server_version;
3458
log_descriptor.server_id= server_id;
3459
/* Page cache for the log reads */
3460
log_descriptor.pagecache= pagecache;
3462
DBUG_ASSERT((flags &
3463
~(TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION |
3464
TRANSLOG_RECORD_CRC)) == 0);
3465
log_descriptor.flags= flags;
3466
/* Derive the page header overhead and the chunk capacities from flags */
translog_fill_overhead_table();
3467
log_descriptor.page_overhead= page_overhead[flags];
3468
log_descriptor.page_capacity_chunk_2=
3469
TRANSLOG_PAGE_SIZE - log_descriptor.page_overhead - 1;
3470
compile_time_assert(TRANSLOG_WRITE_BUFFER % TRANSLOG_PAGE_SIZE == 0);
3471
log_descriptor.buffer_capacity_chunk_2=
3472
(TRANSLOG_WRITE_BUFFER / TRANSLOG_PAGE_SIZE) *
3473
log_descriptor.page_capacity_chunk_2;
3474
log_descriptor.half_buffer_capacity_chunk_2=
3475
log_descriptor.buffer_capacity_chunk_2 / 2;
3477
("Overhead: %u pc2: %u bc2: %u, bc2/2: %u",
3478
log_descriptor.page_overhead,
3479
log_descriptor.page_capacity_chunk_2,
3480
log_descriptor.buffer_capacity_chunk_2,
3481
log_descriptor.half_buffer_capacity_chunk_2));
3483
/* Just to init it somehow (hack for bootstrap)*/
3485
TRANSLOG_FILE *file= 0;
3486
log_descriptor.min_file = log_descriptor.max_file= 1;
3487
insert_dynamic(&log_descriptor.open_files, (uchar *)&file);
3488
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
3489
pop_dynamic(&log_descriptor.open_files);
3492
/* Buffers for log writing */
3493
for (i= 0; i < TRANSLOG_BUFFERS_NO; i++)
3495
if (translog_buffer_init(log_descriptor.buffers + i))
3497
log_descriptor.buffers[i].buffer_no= (uint8) i;
3498
DBUG_PRINT("info", ("translog_buffer buffer #%u: 0x%lx",
3499
i, (ulong) log_descriptor.buffers + i));
3503
last_logno and last_checkpoint_lsn were set in
3504
ma_control_file_create_or_open()
3506
/* Logs are considered present when a last log number is known */
logs_found= (last_logno != FILENO_IMPOSSIBLE);
3508
translog_status= (readonly ? TRANSLOG_READONLY : TRANSLOG_OK);
3509
checkpoint_lsn= last_checkpoint_lsn;
3514
DBUG_PRINT("info", ("log found..."));
3516
TODO: scan directory for maria_log.XXXXXXXX files and find
3517
highest XXXXXXXX & set logs_found
3518
TODO: check that last checkpoint within present log addresses space
3522
if (LSN_FILE_NO(last_checkpoint_lsn) == FILENO_IMPOSSIBLE)
3524
DBUG_ASSERT(LSN_OFFSET(last_checkpoint_lsn) == 0);
3525
/* only last log needs to be checked */
3526
sure_page= MAKE_LSN(last_logno, TRANSLOG_PAGE_SIZE);
3530
sure_page= last_checkpoint_lsn;
3531
DBUG_ASSERT(LSN_OFFSET(sure_page) % TRANSLOG_PAGE_SIZE != 0);
3532
sure_page-= LSN_OFFSET(sure_page) % TRANSLOG_PAGE_SIZE;
3534
/* Set horizon to the beginning of the last file first */
3535
log_descriptor.horizon= last_page= MAKE_LSN(last_logno, 0);
3536
if (translog_get_last_page_addr(&last_page, &pageok, no_errors))
3538
if (!translog_walk_filenames(log_descriptor.directory,
3539
&translog_callback_search_first))
3542
Files was deleted, just start from the next log number, so that
3543
existing tables are in the past.
3545
start_file_num= last_logno + 1;
3546
checkpoint_lsn= LSN_IMPOSSIBLE; /* no log so no checkpoint */
3552
else if (LSN_OFFSET(last_page) == 0)
3554
if (LSN_FILE_NO(last_page) == 1)
3556
logs_found= 0; /* file #1 has no pages */
3557
DBUG_PRINT("info", ("log found. But is is empty => no log assumed"));
3561
last_page-= LSN_ONE_FILE;
3562
if (translog_get_last_page_addr(&last_page, &pageok, 0))
3569
log_descriptor.min_file= translog_first_file(log_descriptor.horizon, 1);
3570
log_descriptor.max_file= last_logno;
3571
/* Open all files */
3572
if (allocate_dynamic(&log_descriptor.open_files,
3573
log_descriptor.max_file -
3574
log_descriptor.min_file + 1))
3576
for (i = log_descriptor.max_file; i >= log_descriptor.min_file; i--)
3579
We can't allocate all file together because they will be freed
3582
TRANSLOG_FILE *file= (TRANSLOG_FILE *)my_malloc(sizeof(TRANSLOG_FILE),
3585
compile_time_assert(MY_FILEPOS_ERROR > ULL(0xffffffff));
3587
(file->handler.file=
3588
open_logfile_by_number_no_cache(i)) < 0 ||
3589
my_seek(file->handler.file, 0, SEEK_END, MYF(0)) >=
3593
for (j= i - log_descriptor.min_file - 1; j > 0; j--)
3596
*dynamic_element(&log_descriptor.open_files, j,
3598
my_close(el->handler.file, MYF(MY_WME));
3599
my_free(el, MYF(0));
3609
translog_file_init(file, i, 1);
3610
/* we allocated space so it can't fail */
3611
insert_dynamic(&log_descriptor.open_files, (uchar *)&file);
3613
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
3614
log_descriptor.open_files.elements);
3619
/* There is no logs and there is read-only mode => nothing to read */
3620
DBUG_PRINT("error", ("No logs and read-only mode"));
3626
TRANSLOG_ADDRESS current_page= sure_page;
3629
DBUG_PRINT("info", ("The log is really present"));
3630
DBUG_ASSERT(sure_page <= last_page);
3632
/* TODO: check page size */
3634
last_valid_page= LSN_IMPOSSIBLE;
3636
Scans and validate pages. We need it to show "outside" only for sure
3637
valid part of the log. If the log was damaged then fixed we have to
3638
cut off damaged part before some other process start write something
3643
TRANSLOG_ADDRESS current_file_last_page;
3644
current_file_last_page= current_page;
3645
if (translog_get_last_page_addr(¤t_file_last_page, &pageok, 0))
3649
DBUG_PRINT("error", ("File %lu have no complete last page",
3650
(ulong) LSN_FILE_NO(current_file_last_page)));
3651
old_log_was_recovered= 1;
3652
/* This file is not written till the end so it should be last */
3653
last_page= current_file_last_page;
3654
/* TODO: issue warning */
3658
TRANSLOG_VALIDATOR_DATA data;
3659
TRANSLOG_PAGE_SIZE_BUFF psize_buff;
3661
data.addr= ¤t_page;
3662
if ((page= translog_get_page(&data, psize_buff.buffer, NULL)) == NULL)
3664
if (data.was_recovered)
3666
DBUG_PRINT("error", ("file no: %lu (%d) "
3667
"rec_offset: 0x%lx (%lu) (%d)",
3668
(ulong) LSN_FILE_NO(current_page),
3669
(uint3korr(page + 3) !=
3670
LSN_FILE_NO(current_page)),
3671
(ulong) LSN_OFFSET(current_page),
3672
(ulong) (LSN_OFFSET(current_page) /
3673
TRANSLOG_PAGE_SIZE),
3675
LSN_OFFSET(current_page) /
3676
TRANSLOG_PAGE_SIZE)));
3677
old_log_was_recovered= 1;
3680
/* Remember the flags and address of the page that was just read */
old_flags= page[TRANSLOG_PAGE_FLAGS];
3681
last_valid_page= current_page;
3682
current_page+= TRANSLOG_PAGE_SIZE; /* increase offset */
3683
} while (current_page <= current_file_last_page);
3684
current_page+= LSN_ONE_FILE;
3685
current_page= LSN_REPLACE_OFFSET(current_page, TRANSLOG_PAGE_SIZE);
3686
} while (LSN_FILE_NO(current_page) <= LSN_FILE_NO(last_page) &&
3687
!old_log_was_recovered);
3688
if (last_valid_page == LSN_IMPOSSIBLE)
3690
/* Panic!!! Even page which should be valid is invalid */
3691
/* TODO: issue error */
3694
DBUG_PRINT("info", ("Last valid page is in file: %lu "
3695
"offset: %lu (0x%lx) "
3696
"Logs found: %d was recovered: %d "
3698
(ulong) LSN_FILE_NO(last_valid_page),
3699
(ulong) LSN_OFFSET(last_valid_page),
3700
(ulong) LSN_OFFSET(last_valid_page),
3701
logs_found, old_log_was_recovered,
3702
(old_flags == flags)));
3704
/* TODO: check server ID */
3705
if (logs_found && !old_log_was_recovered && old_flags == flags)
3707
TRANSLOG_VALIDATOR_DATA data;
3708
TRANSLOG_PAGE_SIZE_BUFF psize_buff;
3710
uint16 chunk_offset;
3711
data.addr= &last_valid_page;
3712
/* continue old log */
3713
DBUG_ASSERT(LSN_FILE_NO(last_valid_page)==
3714
LSN_FILE_NO(log_descriptor.horizon));
3715
if ((page= translog_get_page(&data, psize_buff.buffer, NULL)) == NULL ||
3716
(chunk_offset= translog_get_first_chunk_offset(page)) == 0)
3719
/* Puts filled part of old page in the buffer */
3720
log_descriptor.horizon= last_valid_page;
3721
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
3723
Free space if filled with TRANSLOG_FILLER and first uchar of
3724
real chunk can't be TRANSLOG_FILLER
3726
while (chunk_offset < TRANSLOG_PAGE_SIZE &&
3727
page[chunk_offset] != TRANSLOG_FILLER)
3729
uint16 chunk_length;
3731
translog_get_total_chunk_length(page, chunk_offset)) == 0)
3733
DBUG_PRINT("info", ("chunk: offset: %u length: %u",
3734
(uint) chunk_offset, (uint) chunk_length));
3735
chunk_offset+= chunk_length;
3737
/* chunk can't cross the page border */
3738
DBUG_ASSERT(chunk_offset <= TRANSLOG_PAGE_SIZE);
3740
/* Copy the used part of the page into buffer 0 and move the cursor past it */
memcpy(log_descriptor.buffers->buffer, page, chunk_offset);
3741
log_descriptor.bc.buffer->size+= chunk_offset;
3742
log_descriptor.bc.ptr+= chunk_offset;
3743
log_descriptor.bc.current_page_fill= chunk_offset;
3744
log_descriptor.horizon= LSN_REPLACE_OFFSET(log_descriptor.horizon,
3746
LSN_OFFSET(last_valid_page)));
3747
DBUG_PRINT("info", ("Move Page #%u: 0x%lx chaser: %d Size: %lu (%lu)",
3748
(uint) log_descriptor.bc.buffer_no,
3749
(ulong) log_descriptor.bc.buffer,
3750
log_descriptor.bc.chaser,
3751
(ulong) log_descriptor.bc.buffer->size,
3752
(ulong) (log_descriptor.bc.ptr - log_descriptor.bc.
3754
translog_check_cursor(&log_descriptor.bc);
3756
if (!old_log_was_recovered && old_flags == flags)
3758
LOGHANDLER_FILE_INFO info;
3760
Accessing &log_descriptor.open_files without mutex is safe
3761
because it is initialization
3763
if (translog_read_file_header(&info,
3764
(*dynamic_element(&log_descriptor.
3766
0, TRANSLOG_FILE **))->
3769
version_changed= (info.maria_version != TRANSLOG_VERSION_ID);
3772
DBUG_PRINT("info", ("Logs found: %d was recovered: %d",
3773
logs_found, old_log_was_recovered));
3776
TRANSLOG_FILE *file= (TRANSLOG_FILE*)my_malloc(sizeof(TRANSLOG_FILE),
3778
DBUG_PRINT("info", ("The log is not found => we will create new log"));
3781
/* Start new log system from scratch */
3782
log_descriptor.horizon= MAKE_LSN(start_file_num,
3783
TRANSLOG_PAGE_SIZE); /* header page */
3784
if ((file->handler.file=
3785
create_logfile_by_number_no_cache(start_file_num)) == -1)
3787
translog_file_init(file, start_file_num, 0);
3788
if (insert_dynamic(&log_descriptor.open_files, (uchar*)&file))
3790
log_descriptor.min_file= log_descriptor.max_file= start_file_num;
3791
if (translog_write_file_header())
3793
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
3794
log_descriptor.open_files.elements);
3796
if (ma_control_file_write_and_force(checkpoint_lsn, start_file_num,
3797
max_trid_in_control_file,
3800
/* assign buffer 0 */
3801
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
3802
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
3804
else if ((old_log_was_recovered || old_flags != flags || version_changed) &&
3807
/* leave the damaged file untouched */
3808
log_descriptor.horizon+= LSN_ONE_FILE;
3810
log_descriptor.horizon= LSN_REPLACE_OFFSET(log_descriptor.horizon,
3811
TRANSLOG_PAGE_SIZE);
3812
if (translog_create_new_file())
3815
Buffer system left untouched after recovery => we should init it
3816
(starting from buffer 0)
3818
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
3819
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
3822
/* all LSNs that are on disk are flushed */
3823
log_descriptor.log_start= log_descriptor.sent_to_disk=
3824
log_descriptor.flushed= log_descriptor.horizon;
3825
log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
3826
log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
3827
log_descriptor.previous_flush_horizon= log_descriptor.horizon;
3829
Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially)
3830
address of the next LSN and we want indicate that all LSNs that are
3831
already on the disk are flushed so we need decrease horizon on 1 (we are
3832
sure that there is no LSN on the disk which is greater then 'flushed'
3833
and there will not be LSN created that is equal or less then the value
3836
log_descriptor.flushed--; /* offset decreased */
3837
log_descriptor.sent_to_disk--; /* offset decreased */
3839
Log records will refer to a MARIA_SHARE by a unique 2-byte id; set up
3840
structures for generating 2-byte ids:
3842
my_atomic_rwlock_init(&LOCK_id_to_share);
3843
id_to_share= (MARIA_SHARE **) my_malloc(SHARE_ID_MAX * sizeof(MARIA_SHARE*),
3844
MYF(MY_WME | MY_ZEROFILL));
3845
if (unlikely(!id_to_share))
3847
id_to_share--; /* min id is 1 */
3849
/* Check the last LSN record integrity */
3852
TRANSLOG_SCANNER_DATA scanner;
3853
TRANSLOG_ADDRESS page_addr;
3854
LSN last_lsn= LSN_IMPOSSIBLE;
3856
take very last page address and try to find LSN record on it
3857
if it fail take address of previous page and so on
3859
page_addr= (log_descriptor.horizon -
3860
((log_descriptor.horizon - 1) % TRANSLOG_PAGE_SIZE + 1));
3861
if (translog_scanner_init(page_addr, 1, &scanner, 1))
3863
scanner.page_offset= page_overhead[scanner.page[TRANSLOG_PAGE_FLAGS]];
3867
chunk_1byte= scanner.page[scanner.page_offset];
3868
while (!translog_is_LSN_chunk(chunk_1byte) &&
3869
scanner.page != END_OF_LOG &&
3870
scanner.page[scanner.page_offset] != TRANSLOG_FILLER &&
3871
scanner.page_addr == page_addr)
3873
if (translog_get_next_chunk(&scanner))
3875
translog_destroy_scanner(&scanner);
3878
if (scanner.page != END_OF_LOG)
3879
chunk_1byte= scanner.page[scanner.page_offset];
3881
if (translog_is_LSN_chunk(chunk_1byte))
3883
last_lsn= scanner.page_addr + scanner.page_offset;
3884
if (translog_get_next_chunk(&scanner))
3886
translog_destroy_scanner(&scanner);
3889
if (scanner.page == END_OF_LOG)
3890
break; /* it was the last record */
3891
chunk_1byte= scanner.page[scanner.page_offset];
3892
continue; /* try to find other record on this page */
3895
if (last_lsn != LSN_IMPOSSIBLE)
3896
break; /* there is no more records on the page */
3898
/* We have to make step back */
3899
if (unlikely(LSN_OFFSET(page_addr) == TRANSLOG_PAGE_SIZE))
3901
uint32 file_no= LSN_FILE_NO(page_addr);
3902
my_bool last_page_ok;
3903
/* it is beginning of the current file */
3904
if (unlikely(file_no == 1))
3907
It is beginning of the log => there is no LSNs in the log =>
3908
There is no harm in leaving it "as-is".
3913
page_addr= MAKE_LSN(file_no, TRANSLOG_PAGE_SIZE);
3914
translog_get_last_page_addr(&page_addr, &last_page_ok, 0);
3915
/* page should be OK as it is not the last file */
3916
DBUG_ASSERT(last_page_ok);
3920
page_addr-= TRANSLOG_PAGE_SIZE;
3922
translog_destroy_scanner(&scanner);
3923
if (translog_scanner_init(page_addr, 1, &scanner, 1))
3925
scanner.page_offset= page_overhead[scanner.page[TRANSLOG_PAGE_FLAGS]];
3927
translog_destroy_scanner(&scanner);
3929
/* Now scanner points to the last LSN chunk, lets check it */
3931
TRANSLOG_HEADER_BUFFER rec;
3932
translog_size_t rec_len;
3935
DBUG_PRINT("info", ("going to check the last found record (%lu,0x%lx)",
3936
LSN_IN_PARTS(last_lsn)));
3939
translog_read_record_header(last_lsn, &rec);
3940
if (unlikely (len == RECHEADER_READ_ERROR ||
3941
len == RECHEADER_READ_EOF))
3943
DBUG_PRINT("error", ("unexpected end of log or record during "
3944
"reading record header: (%lu,0x%lx) len: %d",
3945
LSN_IN_PARTS(last_lsn), len));
3947
log_descriptor.log_start= log_descriptor.horizon= last_lsn;
3948
else if (translog_truncate_log(last_lsn))
3950
translog_free_record_header(&rec);
3956
DBUG_ASSERT(last_lsn == rec.lsn);
3957
if (likely(rec.record_length != 0))
3960
Reading the last byte of record will trigger scanning all
3961
record chunks for now
3963
rec_len= translog_read_record(rec.lsn, rec.record_length - 1, 1,
3967
DBUG_PRINT("error", ("unexpected end of log or record during "
3968
"reading record body: (%lu,0x%lx) len: %d",
3969
LSN_IN_PARTS(rec.lsn),
3972
log_descriptor.log_start= log_descriptor.horizon= last_lsn;
3974
else if (translog_truncate_log(last_lsn))
3976
translog_free_record_header(&rec);
3982
translog_free_record_header(&rec);
3987
/* NOTE(review): presumably reached only on the error path; the label is not visible here */
ma_message_no_user(0, "log initialization failed");
3993
@brief Free transaction log file buffer.
3995
@param buffer_no The buffer to free
3998
static void translog_buffer_destroy(struct st_translog_buffer *buffer)
4000
DBUG_ENTER("translog_buffer_destroy");
4002
("Buffer #%u: 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
4003
(uint) buffer->buffer_no, (ulong) buffer,
4004
(buffer->file ? buffer->file->handler.file : -1),
4005
LSN_IN_PARTS(buffer->offset),
4006
(ulong) buffer->size));
4007
if (buffer->file != NULL)
4010
We ignore errors here, because we can't do something about it
4011
(it is shutting down)
4013
We also have to take the locks even if there can't be any other
4014
threads running, because translog_buffer_flush()
4015
requires that we have the buffer locked.
4017
translog_buffer_lock(buffer);
4018
translog_buffer_flush(buffer);
4019
translog_buffer_unlock(buffer);
4021
DBUG_PRINT("info", ("Destroy mutex: 0x%lx", (ulong) &buffer->mutex));
4022
pthread_mutex_destroy(&buffer->mutex);
4023
pthread_cond_destroy(&buffer->waiting_filling_buffer);
4029
Free log handler resources
4035
/*
  Releases everything owned by the log handler.

  Expects translog_status to be TRANSLOG_OK or TRANSLOG_READONLY. The
  current page is finished when the current buffer has a file attached.
  Buffers are destroyed starting with the one after the current buffer, so
  the current buffer is handled last. Then the open files are closed, the
  locks, condition and dynamic arrays are destroyed, the directory
  descriptor is closed and the id_to_share table is freed.

  NOTE(review): some short lines (declarations, the ternary branches that
  pick the new status, possibly lock/unlock calls) are missing from this
  listing.
*/
void translog_destroy()
4037
TRANSLOG_FILE **file;
4039
uint8 current_buffer;
4040
DBUG_ENTER("translog_destroy");
4042
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
4043
translog_status == TRANSLOG_READONLY);
4045
current_buffer= log_descriptor.bc.buffer_no;
4046
translog_status= (translog_status == TRANSLOG_READONLY ?
4049
if (log_descriptor.bc.buffer->file != NULL)
4050
translog_finish_page(&log_descriptor.horizon, &log_descriptor.bc);
4053
/* Walk all buffers, beginning with the one after the current buffer */
for (i= 0; i < TRANSLOG_BUFFERS_NO; i++)
4055
struct st_translog_buffer *buffer= (log_descriptor.buffers +
4056
((i + current_buffer + 1) %
4057
TRANSLOG_BUFFERS_NO));
4058
translog_buffer_destroy(buffer);
4060
translog_status= TRANSLOG_UNINITED;
4063
/* Close every file still registered in open_files */
while ((file= (TRANSLOG_FILE **)pop_dynamic(&log_descriptor.open_files)))
4064
translog_close_log_file(*file);
4065
pthread_mutex_destroy(&log_descriptor.sent_to_disk_lock);
4066
pthread_mutex_destroy(&log_descriptor.file_header_lock);
4067
pthread_mutex_destroy(&log_descriptor.unfinished_files_lock);
4068
pthread_mutex_destroy(&log_descriptor.purger_lock);
4069
pthread_mutex_destroy(&log_descriptor.log_flush_lock);
4070
pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock);
4071
pthread_cond_destroy(&log_descriptor.log_flush_cond);
4072
rwlock_destroy(&log_descriptor.open_files_lock);
4073
delete_dynamic(&log_descriptor.open_files);
4074
delete_dynamic(&log_descriptor.unfinished_files);
4076
if (log_descriptor.directory_fd >= 0)
4077
my_close(log_descriptor.directory_fd, MYF(MY_WME));
4078
my_atomic_rwlock_destroy(&LOCK_id_to_share);
4079
/* The pointer is kept one element before the allocation (min id is 1), so free id_to_share + 1 */
if (id_to_share != NULL)
4080
my_free((uchar*)(id_to_share + 1), MYF(MY_WME));
4086
@brief Starts new page.
4088
@param horizon \ Position in file and buffer where we are
4090
@param prev_buffer Buffer which should be flushed will be assigned here.
4091
This is always set (to NULL if nothing to flush).
4093
@note We do not want to flush the buffer immediately because we want to
4094
let caller of this function first advance 'horizon' pointer and unlock the
4095
loghandler and only then flush the log which can take some time.
4101
/**
  @brief Starts new page.

  Switches to the next buffer when the current buffer has no room for
  another page or the file offset has passed the last page that fits in
  the file; otherwise finishes the current page and writes a new page
  header in the same buffer.

  @param horizon         \ Position in file and buffer where we are
  @param cursor          /
  @param prev_buffer     Buffer which should be flushed will be assigned here.
                         This is always set (to NULL if nothing to flush).

  @retval 0 OK
  @retval 1 Error (switching to the next buffer failed)
*/

static my_bool translog_page_next(TRANSLOG_ADDRESS *horizon,
                                  struct st_buffer_cursor *cursor,
                                  struct st_translog_buffer **prev_buffer)
{
  struct st_translog_buffer *buffer= cursor->buffer;
  my_bool buffer_is_full, file_is_full;
  DBUG_ENTER("translog_page_next");

  /* Callers test this unconditionally, so it must never stay unset */
  *prev_buffer= NULL;

  buffer_is_full= (cursor->ptr + TRANSLOG_PAGE_SIZE >
                   cursor->buffer->buffer + TRANSLOG_WRITE_BUFFER);
  file_is_full= (LSN_OFFSET(*horizon) >
                 log_descriptor.log_file_max_size - TRANSLOG_PAGE_SIZE);

  if (buffer_is_full || file_is_full)
  {
    DBUG_PRINT("info", ("Switch to next buffer Buffer Size: %lu (%lu) => %d "
                        "File size: %lu max: %lu => %d",
                        (ulong) cursor->buffer->size,
                        (ulong) (cursor->ptr - cursor->buffer->buffer),
                        (int) buffer_is_full,
                        (ulong) LSN_OFFSET(*horizon),
                        (ulong) log_descriptor.log_file_max_size,
                        (int) file_is_full));
    if (translog_buffer_next(horizon, cursor, file_is_full))
      DBUG_RETURN(1);
    *prev_buffer= buffer;
    DBUG_PRINT("info", ("Buffer #%u (0x%lx): have to be flushed",
                        (uint) buffer->buffer_no, (ulong) buffer));
  }
  else
  {
    DBUG_PRINT("info", ("Use the same buffer #%u (0x%lx): "
                        "Buffer Size: %lu (%lu)",
                        (uint) buffer->buffer_no,
                        (ulong) buffer,
                        (ulong) cursor->buffer->size,
                        (ulong) (cursor->ptr - cursor->buffer->buffer)));
    translog_finish_page(horizon, cursor);
    translog_new_page_header(horizon, cursor);
  }
  DBUG_RETURN(0);
}
4150
Write data of given length to the current page
4153
translog_write_data_on_page()
4154
horizon \ Pointers on file and buffer
4156
length IN length of the chunk
4157
buffer buffer with data
4164
/*
  Copies `length` bytes from `buffer` to the cursor position and advances
  the cursor pointer, the horizon and the page fill counter by `length`.
  The buffer size is increased only when the cursor is not a chaser.
  The asserts require length > 0 and that the data fits both the current
  page and the write buffer.

  NOTE(review): the last parameter line and the return statement are
  missing from this listing.
*/
static my_bool translog_write_data_on_page(TRANSLOG_ADDRESS *horizon,
4165
struct st_buffer_cursor *cursor,
4166
translog_size_t length,
4169
DBUG_ENTER("translog_write_data_on_page");
4170
DBUG_PRINT("enter", ("Chunk length: %lu Page size %u",
4171
(ulong) length, (uint) cursor->current_page_fill));
4172
DBUG_ASSERT(length > 0);
4173
DBUG_ASSERT(length + cursor->current_page_fill <= TRANSLOG_PAGE_SIZE);
4174
DBUG_ASSERT(length + cursor->ptr <= cursor->buffer->buffer +
4175
TRANSLOG_WRITE_BUFFER);
4177
memcpy(cursor->ptr, buffer, length);
4178
cursor->ptr+= length;
4179
(*horizon)+= length; /* adds offset */
4180
cursor->current_page_fill+= length;
4181
/* A chaser cursor does not grow the buffer size */
if (!cursor->chaser)
4182
cursor->buffer->size+= length;
4183
DBUG_PRINT("info", ("Write data buffer #%u: 0x%lx "
4184
"chaser: %d Size: %lu (%lu)",
4185
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
4186
cursor->chaser, (ulong) cursor->buffer->size,
4187
(ulong) (cursor->ptr - cursor->buffer->buffer)));
4188
translog_check_cursor(cursor);
4195
Write data from parts of given length to the current page
4198
translog_write_parts_on_page()
4199
horizon \ Pointers on file and buffer
4201
length IN length of the chunk
4202
parts IN/OUT chunk source
4209
/*
  Writes `length` bytes taken from consecutive entries of parts->parts,
  starting at index parts->current, to the cursor position. When a part is
  longer than what is left to write, only part of it is consumed. After
  copying, parts->current is updated and the horizon and page fill counter
  advance by `length`; the buffer size grows only when the cursor is not
  a chaser.

  NOTE(review): the loop skeleton and the statements that adjust the part
  and `left` are missing from this listing; see the complete file for the
  exact bookkeeping.
*/
static my_bool translog_write_parts_on_page(TRANSLOG_ADDRESS *horizon,
4210
struct st_buffer_cursor *cursor,
4211
translog_size_t length,
4212
struct st_translog_parts *parts)
4214
translog_size_t left= length;
4215
uint cur= (uint) parts->current;
4216
DBUG_ENTER("translog_write_parts_on_page");
4217
DBUG_PRINT("enter", ("Chunk length: %lu parts: %u of %u. Page size: %u "
4218
"Buffer size: %lu (%lu)",
4220
(uint) (cur + 1), (uint) parts->elements,
4221
(uint) cursor->current_page_fill,
4222
(ulong) cursor->buffer->size,
4223
(ulong) (cursor->ptr - cursor->buffer->buffer)));
4224
DBUG_ASSERT(length > 0);
4225
DBUG_ASSERT(length + cursor->current_page_fill <= TRANSLOG_PAGE_SIZE);
4226
DBUG_ASSERT(length + cursor->ptr <= cursor->buffer->buffer +
4227
TRANSLOG_WRITE_BUFFER);
4231
translog_size_t len;
4235
DBUG_ASSERT(cur < parts->elements);
4236
part= parts->parts + cur;
4238
DBUG_PRINT("info", ("Part: %u Length: %lu left: %lu buff: 0x%lx",
4239
(uint) (cur + 1), (ulong) part->length, (ulong) left,
4242
if (part->length > left)
4244
/* we should write less then the current part */
4248
DBUG_PRINT("info", ("Set new part: %u Length: %lu",
4249
(uint) (cur + 1), (ulong) part->length));
4253
len= (translog_size_t) part->length;
4255
DBUG_PRINT("info", ("moved to next part (len: %lu)", (ulong) len));
4257
DBUG_PRINT("info", ("copy: 0x%lx <- 0x%lx %u",
4258
(ulong) cursor->ptr, (ulong)buff, (uint)len));
4261
memcpy(cursor->ptr, buff, len);
4267
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx) Length %lu(0x%lx)",
4268
LSN_IN_PARTS(*horizon),
4269
(ulong) length, (ulong) length));
4270
/* Publish the new part index and advance horizon/page fill by the chunk length */
parts->current= cur;
4271
(*horizon)+= length; /* offset increasing */
4272
cursor->current_page_fill+= length;
4273
if (!cursor->chaser)
4274
cursor->buffer->size+= length;
4276
We do not not updating parts->total_record_length here because it is
4277
need only before writing record to have total length
4279
DBUG_PRINT("info", ("Write parts buffer #%u: 0x%lx "
4280
"chaser: %d Size: %lu (%lu) "
4281
"Horizon: (%lu,0x%lx) buff offset: 0x%lx",
4282
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
4283
cursor->chaser, (ulong) cursor->buffer->size,
4284
(ulong) (cursor->ptr - cursor->buffer->buffer),
4285
LSN_IN_PARTS(*horizon),
4286
(ulong) (LSN_OFFSET(cursor->buffer->offset) +
4287
cursor->buffer->size)));
4288
translog_check_cursor(cursor);
4295
Put 1 group chunk type 0 header into parts array
4298
translog_write_variable_record_1group_header()
4299
parts Descriptor of record source parts
4300
type The log record type
4301
short_trid Short transaction ID or 0 if it has no sense
4302
header_length Calculated header length of chunk type 0
4303
chunk0_header Buffer for the chunk header writing
4307
/*
  Builds the chunk type 0 header of a one-group record in chunk0_header
  and registers it as a new part in front of the current parts.

  The header is laid out as: byte 0 = type | TRANSLOG_CHUNK_LSN, bytes 1-2
  = short_trid, from byte 3 the encoded record length, and a 2-byte zero
  in the last two bytes of the header (zero chunk length marks a one-group
  record). parts->current is decremented to make room for the header part
  and parts->total_record_length grows by header_length.

  NOTE(review): the return type line and the declaration of `part` are
  missing from this listing.
*/
translog_write_variable_record_1group_header(struct st_translog_parts *parts,
4308
enum translog_record_type type,
4309
SHORT_TRANSACTION_ID short_trid,
4310
uint16 header_length,
4311
uchar *chunk0_header)
4314
DBUG_ASSERT(parts->current != 0); /* first part is left for header */
4315
part= parts->parts + (--parts->current);
4316
parts->total_record_length+= (translog_size_t) (part->length= header_length);
4317
part->str= chunk0_header;
4318
/* puts chunk type */
4319
*chunk0_header= (uchar) (type | TRANSLOG_CHUNK_LSN);
4320
int2store(chunk0_header + 1, short_trid);
4321
/* puts record length */
4322
translog_write_variable_record_1group_code_len(chunk0_header + 3,
4323
parts->record_length,
4325
/* puts 0 as chunk length which indicate 1 group record */
4326
int2store(chunk0_header + header_length - 2, 0);
4331
Increase number of writers for this buffer
4334
translog_buffer_increase_writers()
4335
buffer target buffer
4339
translog_buffer_increase_writers(struct st_translog_buffer *buffer)
4341
DBUG_ENTER("translog_buffer_increase_writers");
4342
translog_buffer_lock_assert_owner(buffer);
4343
buffer->copy_to_buffer_in_progress++;
4344
DBUG_PRINT("info", ("copy_to_buffer_in_progress. Buffer #%u 0x%lx progress: %d",
4345
(uint) buffer->buffer_no, (ulong) buffer,
4346
buffer->copy_to_buffer_in_progress));
4352
Decrease number of writers for this buffer
4355
translog_buffer_decrease_writers()
4356
buffer target buffer
4359
static void translog_buffer_decrease_writers(struct st_translog_buffer *buffer)
4361
DBUG_ENTER("translog_buffer_decrease_writers");
4362
translog_buffer_lock_assert_owner(buffer);
4363
buffer->copy_to_buffer_in_progress--;
4365
("copy_to_buffer_in_progress. Buffer #%u 0x%lx progress: %d",
4366
(uint) buffer->buffer_no, (ulong) buffer,
4367
buffer->copy_to_buffer_in_progress));
4368
if (buffer->copy_to_buffer_in_progress == 0)
4369
pthread_cond_broadcast(&buffer->waiting_filling_buffer);
4375
@brief Skip to the next page for chaser (thread which advanced horizon
4376
pointer and now filling the buffer)
4378
@param horizon \ Pointers on file position and buffer
4385
/**
  @brief Skip to the next page for chaser (thread which advanced horizon
  pointer and now filling the buffer)

  When moving to the next page left a buffer behind, this thread stops
  being a writer of that buffer and flushes it.

  @param horizon         \ Pointers on file position and buffer
  @param cursor          /

  @retval 0 OK
  @retval 1 Error (page switch or buffer flush failed)
*/

static my_bool translog_chaser_page_next(TRANSLOG_ADDRESS *horizon,
                                         struct st_buffer_cursor *cursor)
{
  struct st_translog_buffer *buffer_to_flush;
  my_bool rc;
  DBUG_ENTER("translog_chaser_page_next");
  DBUG_ASSERT(cursor->chaser);
  rc= translog_page_next(horizon, cursor, &buffer_to_flush);
  if (buffer_to_flush != NULL)
  {
    translog_buffer_lock(buffer_to_flush);
    translog_buffer_decrease_writers(buffer_to_flush);
    /* Keep an earlier failure: flush only decides rc when page_next succeeded */
    if (!rc)
      rc= translog_buffer_flush(buffer_to_flush);
    translog_buffer_unlock(buffer_to_flush);
  }
  DBUG_RETURN(rc);
}
4405
Put chunk 2 from new page beginning
4408
translog_write_variable_record_chunk2_page()
4409
parts Descriptor of record source parts
4410
horizon \ Pointers on file position and buffer
4419
translog_write_variable_record_chunk2_page(struct st_translog_parts *parts,
4420
TRANSLOG_ADDRESS *horizon,
4421
struct st_buffer_cursor *cursor)
4423
uchar chunk2_header[1];
4424
DBUG_ENTER("translog_write_variable_record_chunk2_page");
4425
chunk2_header[0]= TRANSLOG_CHUNK_NOHDR;
4427
if (translog_chaser_page_next(horizon, cursor))
4430
/* Puts chunk type */
4431
translog_write_data_on_page(horizon, cursor, 1, chunk2_header);
4432
/* Puts chunk body */
4433
translog_write_parts_on_page(horizon, cursor,
4434
log_descriptor.page_capacity_chunk_2, parts);
4440
Put chunk 3 of requested length in the buffer from new page beginning
4443
translog_write_variable_record_chunk3_page()
4444
parts Descriptor of record source parts
4445
length Length of this chunk
4446
horizon \ Pointers on file position and buffer
4455
/*
  Moves the chaser to the next page and writes a chunk of type
  TRANSLOG_CHUNK_LNGTH there: a 3-byte header (type byte plus 2-byte
  length) is registered as a new part in front of the current parts and
  `length + 1 + 2` bytes are then written from the parts. One visible
  branch only creates the page header and writes no chunk data.

  NOTE(review): the return type line, the `length` parameter line, the
  declaration of `part` and the condition guarding the header-only branch
  are missing from this listing.
*/
translog_write_variable_record_chunk3_page(struct st_translog_parts *parts,
4457
TRANSLOG_ADDRESS *horizon,
4458
struct st_buffer_cursor *cursor)
4461
uchar chunk3_header[1 + 2];
4462
DBUG_ENTER("translog_write_variable_record_chunk3_page");
4464
if (translog_chaser_page_next(horizon, cursor))
4469
/* It was call to write page header only (no data for chunk 3) */
4470
DBUG_PRINT("info", ("It is a call to make page header only"));
4474
DBUG_ASSERT(parts->current != 0); /* first part is left for header */
4475
part= parts->parts + (--parts->current);
4476
parts->total_record_length+= (translog_size_t) (part->length= 1 + 2);
4477
part->str= chunk3_header;
4478
/* Puts chunk type */
4479
*chunk3_header= (uchar) (TRANSLOG_CHUNK_LNGTH);
4480
/* Puts chunk length */
4481
int2store(chunk3_header + 1, length);
4483
translog_write_parts_on_page(horizon, cursor, length + 1 + 2, parts);
4488
Move log pointer (horizon) on given number pages starting from next page,
4489
and given offset on the last page
4492
translog_advance_pointer()
4493
pages Number of full pages starting from the next one
4494
last_page_data Plus this data on the last page
4501
/*
  Reserves log space by moving the shared write position
  (log_descriptor.horizon and the cursor log_descriptor.bc) forward.
  The distance ('offset') is: the rest of the current page, plus 'pages'
  full pages, plus last_page_offset on the final page.  While that distance
  does not fit into the current buffer or the current file, the current
  buffer is filled up to the nearer limit, the next buffer in the ring is
  started (and a new file is created when the file limit was hit), and the
  remainder is carried over.  The log handler lock must be held
  (translog_lock_assert_owner()).
  NOTE(review): several lines of this function (braces, loop header, some
  guards and return statements) are not visible in this listing.
*/
static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
4503
translog_size_t last_page_offset= (log_descriptor.page_overhead +
4505
/* Total number of bytes the pointer has to move */
translog_size_t offset= (TRANSLOG_PAGE_SIZE -
4506
log_descriptor.bc.current_page_fill +
4507
pages * TRANSLOG_PAGE_SIZE + last_page_offset);
4508
translog_size_t buffer_end_offset, file_end_offset, min_offset;
4509
DBUG_ENTER("translog_advance_pointer");
4510
DBUG_PRINT("enter", ("Pointer: (%lu, 0x%lx) + %u + %u pages + %u + %u",
4511
LSN_IN_PARTS(log_descriptor.horizon),
4512
(uint) (TRANSLOG_PAGE_SIZE -
4513
log_descriptor.bc.current_page_fill),
4514
pages, (uint) log_descriptor.page_overhead,
4515
(uint) last_page_data));
4516
translog_lock_assert_owner();
4521
It is special case when we advance the pointer on the same page.
4522
It can happened when we write last part of multi-group record.
4524
/*
  Same-page case: only last_page_data bytes are added to the current page,
  so both the distance and the resulting page fill are recomputed.
  NOTE(review): the condition selecting this case is not visible here.
*/
DBUG_ASSERT(last_page_data + log_descriptor.bc.current_page_fill <=
4525
TRANSLOG_PAGE_SIZE);
4526
offset= last_page_data;
4527
last_page_offset= log_descriptor.bc.current_page_fill + last_page_data;
4530
DBUG_PRINT("info", ("last_page_offset %lu", (ulong) last_page_offset));
4531
DBUG_ASSERT(last_page_offset <= TRANSLOG_PAGE_SIZE);
4534
The loop will be executed 1-3 times. Usually we advance the
4535
pointer to fill only the current buffer (if we have more then 1/2 of
4536
buffer free or 2 buffers (rest of current and all next). In case of
4537
really huge record end where we write last group with "table of
4538
content" of all groups and ignore buffer borders we can occupy
4543
uint8 new_buffer_no;
4544
struct st_translog_buffer *new_buffer;
4545
struct st_translog_buffer *old_buffer;
4546
/* Free bytes left in the current buffer */
buffer_end_offset= TRANSLOG_WRITE_BUFFER - log_descriptor.bc.buffer->size;
4547
/* Free bytes left in the current file before log_file_max_size is reached */
if (likely(log_descriptor.log_file_max_size >=
4548
LSN_OFFSET(log_descriptor.horizon)))
4549
file_end_offset= (log_descriptor.log_file_max_size -
4550
LSN_OFFSET(log_descriptor.horizon));
4554
We already have written more then current file limit allow,
4555
So we will finish this page and start new file
4557
file_end_offset= (TRANSLOG_PAGE_SIZE -
4558
log_descriptor.bc.current_page_fill);
4560
DBUG_PRINT("info", ("offset: %lu buffer_end_offs: %lu, "
4561
"file_end_offs: %lu",
4562
(ulong) offset, (ulong) buffer_end_offset,
4563
(ulong) file_end_offset));
4564
DBUG_PRINT("info", ("Buff #%u %u (0x%lx) offset 0x%lx + size 0x%lx = "
4566
(uint) log_descriptor.bc.buffer->buffer_no,
4567
(uint) log_descriptor.bc.buffer_no,
4568
(ulong) log_descriptor.bc.buffer,
4569
(ulong) LSN_OFFSET(log_descriptor.bc.buffer->offset),
4570
(ulong) log_descriptor.bc.buffer->size,
4571
(ulong) (LSN_OFFSET(log_descriptor.bc.buffer->offset) +
4572
log_descriptor.bc.buffer->size),
4573
(ulong) LSN_OFFSET(log_descriptor.horizon)));
4574
/* Invariant: the buffer's start offset plus its size is the current horizon */
DBUG_ASSERT(LSN_OFFSET(log_descriptor.bc.buffer->offset) +
4575
log_descriptor.bc.buffer->size ==
4576
LSN_OFFSET(log_descriptor.horizon));
4578
/*
  The remaining distance fits into both the current buffer and the current
  file; the statement guarded by this check is not visible here
  (presumably it leaves the loop).
*/
if (offset <= buffer_end_offset && offset <= file_end_offset)
4580
/* Otherwise move on to the next buffer of the ring */
old_buffer= log_descriptor.bc.buffer;
4581
new_buffer_no= (log_descriptor.bc.buffer_no + 1) % TRANSLOG_BUFFERS_NO;
4582
new_buffer= log_descriptor.buffers + new_buffer_no;
4584
translog_buffer_lock(new_buffer);
4587
/*
  NOTE: this 'offset' is a separate local that shadows the byte count
  declared at function scope; together with 'file' and 'ver' it snapshots
  the new buffer's state so it can be checked after the wait below.
*/
TRANSLOG_ADDRESS offset= new_buffer->offset;
4588
TRANSLOG_FILE *file= new_buffer->file;
4589
uint8 ver= new_buffer->ver;
4590
translog_lock_assert_owner();
4592
translog_wait_for_buffer_free(new_buffer);
4594
/* We keep the handler locked so nobody can start this new buffer */
4595
DBUG_ASSERT(offset == new_buffer->offset && new_buffer->file == NULL &&
4596
(file == NULL ? ver : (uint8)(ver + 1)) == new_buffer->ver);
4600
/* Consume the current buffer up to whichever limit comes first */
min_offset= min(buffer_end_offset, file_end_offset);
4601
/* TODO: check is it ptr or size enough */
4602
log_descriptor.bc.buffer->size+= min_offset;
4603
log_descriptor.bc.ptr+= min_offset;
4604
DBUG_PRINT("info", ("NewP buffer #%u: 0x%lx chaser: %d Size: %lu (%lu)",
4605
(uint) log_descriptor.bc.buffer->buffer_no,
4606
(ulong) log_descriptor.bc.buffer,
4607
log_descriptor.bc.chaser,
4608
(ulong) log_descriptor.bc.buffer->size,
4609
(ulong) (log_descriptor.bc.ptr -log_descriptor.bc.
4611
DBUG_ASSERT((ulong) (log_descriptor.bc.ptr -
4612
log_descriptor.bc.buffer->buffer) ==
4613
log_descriptor.bc.buffer->size);
4614
DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no ==
4615
log_descriptor.bc.buffer_no);
4616
/* The caller will fill the reserved area later, so register it as a writer */
translog_buffer_increase_writers(log_descriptor.bc.buffer);
4618
/* The file limit is reached no later than the buffer limit: switch file */
if (file_end_offset <= buffer_end_offset)
4620
log_descriptor.horizon+= LSN_ONE_FILE;
4621
/* In the new file the horizon offset is set to TRANSLOG_PAGE_SIZE */
log_descriptor.horizon= LSN_REPLACE_OFFSET(log_descriptor.horizon,
4622
TRANSLOG_PAGE_SIZE);
4623
DBUG_PRINT("info", ("New file: %lu",
4624
(ulong) LSN_FILE_NO(log_descriptor.horizon)));
4625
if (translog_create_new_file())
4632
DBUG_PRINT("info", ("The same file"));
4633
log_descriptor.horizon+= min_offset; /* offset increasing */
4635
/* Make new_buffer the current one and link the old buffer to it */
translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
4636
old_buffer->next_buffer_offset= new_buffer->offset;
4637
translog_buffer_unlock(old_buffer);
4638
/* What was consumed in the old buffer no longer has to be advanced */
offset-= min_offset;
4640
DBUG_PRINT("info", ("drop write_counter"));
4641
log_descriptor.bc.write_counter= 0;
4642
log_descriptor.bc.previous_offset= 0;
4644
/* Final step: the remaining distance is applied within the current buffer */
log_descriptor.bc.ptr+= offset;
4645
log_descriptor.bc.buffer->size+= offset;
4646
translog_buffer_increase_writers(log_descriptor.bc.buffer);
4647
log_descriptor.horizon+= offset; /* offset increasing */
4648
log_descriptor.bc.current_page_fill= last_page_offset;
4649
DBUG_PRINT("info", ("NewP buffer #%u: 0x%lx chaser: %d Size: %lu (%lu) "
4650
"offset: %u last page: %u",
4651
(uint) log_descriptor.bc.buffer->buffer_no,
4652
(ulong) log_descriptor.bc.buffer,
4653
log_descriptor.bc.chaser,
4654
(ulong) log_descriptor.bc.buffer->size,
4655
(ulong) (log_descriptor.bc.ptr -
4656
log_descriptor.bc.buffer->
4657
buffer), (uint) offset,
4658
(uint) last_page_offset));
4660
("pointer moved to: (%lu, 0x%lx)",
4661
LSN_IN_PARTS(log_descriptor.horizon)));
4662
translog_check_cursor(&log_descriptor.bc);
4663
log_descriptor.bc.protected= 0;
4672
translog_get_current_page_rest()
4674
NOTE loghandler should be locked
4677
number of bytes left on the current page
4680
/*
  Returns the number of bytes still free on the current page of the shared
  cursor: TRANSLOG_PAGE_SIZE minus log_descriptor.bc.current_page_fill.
*/
static uint translog_get_current_page_rest()
4682
return (TRANSLOG_PAGE_SIZE - log_descriptor.bc.current_page_fill);
4687
Get buffer rest in full pages
4690
translog_get_current_buffer_rest()
4692
NOTE loghandler should be locked
4695
number of full pages left on the current buffer
4698
/*
  Returns how many whole pages fit between the shared cursor's write pointer
  (log_descriptor.bc.ptr) and the end of its buffer
  (buffer start + TRANSLOG_WRITE_BUFFER); the division truncates, so a
  partially free page is not counted.
*/
static uint translog_get_current_buffer_rest()
4700
return ((log_descriptor.bc.buffer->buffer + TRANSLOG_WRITE_BUFFER -
4701
log_descriptor.bc.ptr) /
4702
TRANSLOG_PAGE_SIZE);
4706
Calculate possible group size without first (current) page
4709
translog_get_current_group_size()
4711
NOTE loghandler should be locked
4714
group size without first (current) page
4717
/*
  Returns the number of data bytes that a group may occupy beyond the
  current page: the full pages left in the current buffer, converted to
  chunk-2 payload bytes, and - when that is less than
  log_descriptor.half_buffer_capacity_chunk_2 - extended by the payload
  capacity of one more whole buffer.
*/
static translog_size_t translog_get_current_group_size()
4719
/* buffer rest in full pages */
4720
translog_size_t buffer_rest= translog_get_current_buffer_rest();
4721
DBUG_ENTER("translog_get_current_group_size");
4722
DBUG_PRINT("info", ("buffer_rest in pages: %u", buffer_rest));
4724
/* Convert pages to payload bytes (each full page holds one type-2 chunk) */
buffer_rest*= log_descriptor.page_capacity_chunk_2;
4725
/* in case of only half of buffer free we can write this and next buffer */
4726
if (buffer_rest < log_descriptor.half_buffer_capacity_chunk_2)
4728
DBUG_PRINT("info", ("buffer_rest: %lu -> add %lu",
4729
(ulong) buffer_rest,
4730
(ulong) log_descriptor.buffer_capacity_chunk_2));
4731
buffer_rest+= log_descriptor.buffer_capacity_chunk_2;
4734
DBUG_PRINT("info", ("buffer_rest: %lu", (ulong) buffer_rest));
4736
DBUG_RETURN(buffer_rest);
4740
/*
  Bookkeeping done whenever a new LSN is produced: requires the log handler
  lock and clears log_descriptor.is_everything_flushed, because a new record
  means the log now has unflushed content.
  NOTE(review): the store of 'value' into '*lsn' is not visible in this
  listing; callers read *lsn right after the call, so it is presumably
  assigned here - confirm against the full source.
*/
static inline void set_lsn(LSN *lsn, LSN value)
4742
DBUG_ENTER("set_lsn");
4743
translog_lock_assert_owner();
4745
/* we generate LSN so something is not flushed in log */
4746
log_descriptor.is_everything_flushed= 0;
4747
DBUG_PRINT("info", ("new LSN appeared: (%lu,0x%lx)", LSN_IN_PARTS(value)));
4753
@brief Write variable record in 1 group.
4755
@param lsn LSN of the record will be written here
4756
@param type the log record type
4757
@param short_trid Short transaction ID or 0 if it has no sense
4758
@param parts Descriptor of record source parts
4759
@param buffer_to_flush Buffer which has to be flushed if it is not 0
4760
@param header_length Calculated header length of chunk type 0
4761
@param trn Transaction structure pointer for hooks by
4762
record log type, for short_id
4763
@param hook_arg Argument which will be passed to pre-write and
4764
in-write hooks of this record.
4767
We must have a translog_lock() when entering this function
4768
We must have buffer_to_flush locked (if not null)
4770
@return Operation status
4776
/*
  Writes a variable-length record as a single group.
  Visible flow:
    - the record's LSN is the current horizon; horizon and cursor are copied
      into locals;
    - the space for the whole record is reserved with
      translog_advance_pointer() (rest of first page, full_pages type-2
      pages, optional extra type-3 page, and the tail + 3 bytes of type-3
      overhead);
    - buffer_to_flush, if given, is flushed and unlocked;
    - the data is then written through the local horizon/cursor: chunk 0 on
      the first page, type-2 chunks on the full pages, type-3 chunk(s) for
      the tail;
    - finally the writer count of the cursor's buffer is decreased.
  NOTE(review): the return type, some declarations (rc, i, tbl_info), braces,
  error exits and the unlock of the log handler are not visible in this
  listing.
*/
translog_write_variable_record_1group(LSN *lsn,
4777
enum translog_record_type type,
4779
SHORT_TRANSACTION_ID short_trid,
4780
struct st_translog_parts *parts,
4781
struct st_translog_buffer
4782
*buffer_to_flush, uint16 header_length,
4783
TRN *trn, void *hook_arg)
4785
TRANSLOG_ADDRESS horizon;
4786
struct st_buffer_cursor cursor;
4789
translog_size_t record_rest, full_pages, first_page;
4790
uint additional_chunk3_page= 0;
4791
uchar chunk0_header[1 + 2 + 5 + 2];
4792
DBUG_ENTER("translog_write_variable_record_1group");
4793
translog_lock_assert_owner();
4794
if (buffer_to_flush)
4795
translog_buffer_lock_assert_owner(buffer_to_flush);
4797
/* The record starts at the current horizon; keep a private copy of it */
set_lsn(lsn, horizon= log_descriptor.horizon);
4798
/* Either the file LSN bookkeeping or the record type's in-write hook may fail */
if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
4800
(log_record_type_descriptor[type].inwrite_hook &&
4801
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
4807
/* Private copy of the cursor: used for writing after the shared one has moved on */
cursor= log_descriptor.bc;
4810
/* Advance pointer to be able unlock the loghandler */
4811
first_page= translog_get_current_page_rest();
4812
/* Data left after the first page, which also has to hold the chunk-0 header */
record_rest= parts->record_length - (first_page - header_length);
4813
full_pages= record_rest / log_descriptor.page_capacity_chunk_2;
4814
record_rest= (record_rest % log_descriptor.page_capacity_chunk_2);
4816
/*
  A tail exactly one byte short of a full type-2 payload is written as two
  type-3 chunks: the first carries page_capacity_chunk_2 - 2 bytes (see the
  call further below).
*/
if (record_rest + 1 == log_descriptor.page_capacity_chunk_2)
4818
DBUG_PRINT("info", ("2 chunks type 3 is needed"));
4819
/* We will write 2 chunks type 3 at the end of this group */
4820
additional_chunk3_page= 1;
4824
DBUG_PRINT("info", ("first_page: %u (%u) full_pages: %u (%lu) "
4825
"additional: %u (%u) rest %u = %u",
4826
first_page, first_page - header_length,
4828
(ulong) full_pages *
4829
log_descriptor.page_capacity_chunk_2,
4830
additional_chunk3_page,
4831
additional_chunk3_page *
4832
(log_descriptor.page_capacity_chunk_2 - 1),
4833
record_rest, parts->record_length));
4834
/* record_rest + 3 is chunk type 3 overhead + record_rest */
4835
rc|= translog_advance_pointer((int)(full_pages + additional_chunk3_page),
4836
(record_rest ? record_rest + 3 : 0));
4837
/* The buffer where the reservation ends records this LSN as its last one */
log_descriptor.bc.buffer->last_lsn= *lsn;
4838
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
4839
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
4840
(ulong) log_descriptor.bc.buffer));
4845
Check if we switched buffer and need process it (current buffer is
4846
unlocked already => we will not delay other threads
4848
if (buffer_to_flush != NULL)
4851
rc= translog_buffer_flush(buffer_to_flush);
4852
translog_buffer_unlock(buffer_to_flush);
4857
/* Build the chunk-0 header into chunk0_header */
translog_write_variable_record_1group_header(parts, type, short_trid,
4858
header_length, chunk0_header);
4860
/* fill the pages */
4861
translog_write_parts_on_page(&horizon, &cursor, first_page, parts);
4863
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
4864
LSN_IN_PARTS(log_descriptor.horizon),
4865
LSN_IN_PARTS(horizon)));
4867
/* One type-2 chunk per full page */
for (i= 0; i < full_pages; i++)
4869
if (translog_write_variable_record_chunk2_page(parts, &horizon, &cursor))
4872
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
4873
LSN_IN_PARTS(log_descriptor.horizon),
4874
LSN_IN_PARTS(horizon)));
4877
/* Extra type-3 chunk that fills a whole page (asserted below) */
if (additional_chunk3_page)
4879
if (translog_write_variable_record_chunk3_page(parts,
4881
page_capacity_chunk_2 - 2,
4884
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
4885
LSN_IN_PARTS(log_descriptor.horizon),
4886
LSN_IN_PARTS(horizon)));
4887
DBUG_ASSERT(cursor.current_page_fill == TRANSLOG_PAGE_SIZE);
4890
/* Final type-3 chunk; its arguments are not visible in this listing */
if (translog_write_variable_record_chunk3_page(parts,
4894
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
4895
(ulong) LSN_FILE_NO(log_descriptor.horizon),
4896
(ulong) LSN_OFFSET(log_descriptor.horizon),
4897
(ulong) LSN_FILE_NO(horizon),
4898
(ulong) LSN_OFFSET(horizon)));
4900
/* Writing is finished: drop the writer reference taken when space was reserved */
translog_buffer_lock(cursor.buffer);
4901
translog_buffer_decrease_writers(cursor.buffer);
4902
translog_buffer_unlock(cursor.buffer);
4908
@brief Write variable record in 1 chunk.
4910
@param lsn LSN of the record will be written here
4911
@param type the log record type
4912
@param short_trid Short transaction ID or 0 if it has no sense
4913
@param parts Descriptor of record source parts
4914
@param buffer_to_flush Buffer which has to be flushed if it is not 0
4915
@param header_length Calculated header length of chunk type 0
4916
@param trn Transaction structure pointer for hooks by
4917
record log type, for short_id
4918
@param hook_arg Argument which will be passed to pre-write and
4919
in-write hooks of this record.
4922
We must have a translog_lock() when entering this function
4923
We must have buffer_to_flush locked (if not null)
4925
@return Operation status
4931
/*
  Writes a variable-length record that fits into a single chunk on the
  current page.  Unlike the 1group variant no private horizon/cursor copy is
  made: after the chunk-0 header is prepared and the LSN is set to the
  current horizon, parts->total_record_length bytes are written directly at
  log_descriptor.horizon.  buffer_to_flush, if given, is flushed and unlocked
  afterwards.
  NOTE(review): the return type, the declarations of rc/tbl_info, braces,
  error exits and the unlock of the log handler are not visible in this
  listing.
*/
translog_write_variable_record_1chunk(LSN *lsn,
4932
enum translog_record_type type,
4934
SHORT_TRANSACTION_ID short_trid,
4935
struct st_translog_parts *parts,
4936
struct st_translog_buffer
4937
*buffer_to_flush, uint16 header_length,
4938
TRN *trn, void *hook_arg)
4941
uchar chunk0_header[1 + 2 + 5 + 2];
4942
DBUG_ENTER("translog_write_variable_record_1chunk");
4943
translog_lock_assert_owner();
4944
if (buffer_to_flush)
4945
translog_buffer_lock_assert_owner(buffer_to_flush);
4947
/* Build the chunk-0 header into chunk0_header */
translog_write_variable_record_1group_header(parts, type, short_trid,
4948
header_length, chunk0_header);
4949
/* The record's LSN is the current horizon */
set_lsn(lsn, log_descriptor.horizon);
4950
/* Either the file LSN bookkeeping or the record type's in-write hook may fail */
if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
4952
(log_record_type_descriptor[type].inwrite_hook &&
4953
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
4960
/* Header and data go out in one call, directly at the shared horizon */
rc= translog_write_parts_on_page(&log_descriptor.horizon,
4962
parts->total_record_length, parts);
4963
log_descriptor.bc.buffer->last_lsn= *lsn;
4964
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
4965
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
4966
(ulong) log_descriptor.bc.buffer));
4970
check if we switched buffer and need process it (current buffer is
4971
unlocked already => we will not delay other threads
4973
if (buffer_to_flush != NULL)
4976
rc= translog_buffer_flush(buffer_to_flush);
4977
translog_buffer_unlock(buffer_to_flush);
4985
@brief Calculates and write LSN difference (compressed LSN).
4987
@param base_lsn LSN from which we calculate difference
4988
@param lsn LSN to encode
4989
@param dst Result will be written to dst[-pack_length] .. dst[-1]
4991
@note To store an LSN in a compact way we will use the following compression:
4992
If a log record has LSN1, and it contains the LSN2 as a back reference,
4993
Instead of LSN2 we write LSN1-LSN2, encoded as:
4994
two bits the number N (see below)
4997
That is, LSN is encoded in 2..5 bytes, and the number of bytes minus 2
4998
is stored in the first two bits.
5000
@note function made to write the result in backward direction with no
5001
special meaning or tricks; both directions are equal in complexity
5003
@retval # pointer on coded LSN
5006
/*
  Encodes 'lsn' relative to 'base_lsn' (base_lsn must be greater) as the
  difference base_lsn - lsn.  The size of the difference selects the form:
    first byte 00xxxxxx + 1 byte   (2 bytes total)
    first byte 01xxxxxx + 2 bytes  (up to 0x3FFFFF)
    first byte 10xxxxxx + 3 bytes  (up to 0x3FFFFFFF)
    first byte 11xxxxxx + 4 bytes  (up to 0x3FFFFFFFFF)
  and, when the difference is larger still, a 2-byte escape followed by the
  full LSN (LSN_STORE_SIZE bytes).
  NOTE(review): the declaration of 'diff', the first 'if', the adjustments
  of 'dst' for the short forms, the escape bytes and the return statement
  are not visible in this listing.
*/
static uchar *translog_put_LSN_diff(LSN base_lsn, LSN lsn, uchar *dst)
5009
DBUG_ENTER("translog_put_LSN_diff");
5010
DBUG_PRINT("enter", ("Base: (%lu,0x%lx) val: (%lu,0x%lx) dst: 0x%lx",
5011
LSN_IN_PARTS(base_lsn), LSN_IN_PARTS(lsn),
5013
DBUG_ASSERT(base_lsn > lsn);
5014
diff= base_lsn - lsn;
5015
DBUG_PRINT("info", ("Diff: 0x%llx", (ulonglong) diff));
5020
Note we store this high uchar first to ensure that first uchar has
5021
0 in the 3 upper bits.
5023
/* 2-byte form: high byte first, no length bits set in the first byte */
dst[0]= (uchar)(diff >> 8);
5024
dst[1]= (uchar)(diff & 0xFF);
5026
else if (diff <= 0x3FFFFFL)
5029
/* 3-byte form: 0x40 marks it; top 6 bits of diff share the first byte */
dst[0]= (uchar)(0x40 | (diff >> 16));
5030
int2store(dst + 1, diff & 0xFFFF);
5032
else if (diff <= 0x3FFFFFFFL)
5035
/* 4-byte form: 0x80 marks it */
dst[0]= (uchar)(0x80 | (diff >> 24));
5036
int3store(dst + 1, diff & 0xFFFFFFL);
5038
else if (diff <= LL(0x3FFFFFFFFF))
5042
/* 5-byte form: 0xC0 marks it */
dst[0]= (uchar)(0xC0 | (diff >> 32));
5043
int4store(dst + 1, diff & 0xFFFFFFFFL);
5048
It is full LSN after special 1 diff (which is impossible
5051
/* Escape form: 2 marker bytes followed by the uncompressed LSN */
dst-= 2 + LSN_STORE_SIZE;
5054
lsn_store(dst + 2, lsn);
5056
DBUG_PRINT("info", ("new dst: 0x%lx", (ulong) dst));
5062
Get LSN from LSN-difference (compressed LSN)
5065
translog_get_LSN_from_diff()
5066
base_lsn LSN from which we calculate difference
5067
src pointer to coded lsn
5068
dst pointer to buffer where to write 7byte LSN
5071
To store an LSN in a compact way we will use the following compression:
5073
If a log record has LSN1, and it contains the LSN2 as a back reference,
5074
Instead of LSN2 we write LSN1-LSN2, encoded as:
5076
two bits the number N (see below)
5080
That is, LSN is encoded in 2..5 bytes, and the number of bytes minus 2
5081
is stored in the first two bits.
5084
pointer to buffer after decoded LSN
5087
/*
  Decodes a compressed LSN found at 'src' (relative to 'base_lsn') and stores
  the full LSN into 'dst'.  The two most significant bits of the first byte
  give the form ('code'); the difference is then subtracted from the base
  offset.  A first byte of 0 followed by a byte of 1 is the escape that
  means an uncompressed LSN follows, which is copied as is.
  NOTE(review): the declarations of lsn/diff/first_byte/code, the masking of
  the length bits out of first_byte, the switch/case lines, the advancing
  of 'src' past the payload and the final return are not visible in this
  listing.
*/
static uchar *translog_get_LSN_from_diff(LSN base_lsn, uchar *src, uchar *dst)
5092
uint32 file_no, rec_offset;
5094
DBUG_ENTER("translog_get_LSN_from_diff");
5095
DBUG_PRINT("enter", ("Base: (%lu,0x%lx) src: 0x%lx dst 0x%lx",
5096
LSN_IN_PARTS(base_lsn), (ulong) src, (ulong) dst));
5097
first_byte= *((uint8*) src);
5098
code= first_byte >> 6; /* Length is in 2 most significant bits */
5100
src++; /* Skip length + encode */
5101
file_no= LSN_FILE_NO(base_lsn); /* Assume relative */
5102
DBUG_PRINT("info", ("code: %u first byte: %lu",
5103
(uint) code, (ulong) first_byte));
5106
/* Escape sequence 0x00 0x01: an uncompressed LSN follows */
if (first_byte == 0 && *((uint8*)src) == 1)
5109
It is full LSN after special 1 diff (which is impossible
5112
memcpy(dst, src + 1, LSN_STORE_SIZE);
5113
DBUG_PRINT("info", ("Special case of full LSN, new src: 0x%lx",
5114
(ulong) (src + 1 + LSN_STORE_SIZE)));
5115
DBUG_RETURN(src + 1 + LSN_STORE_SIZE);
5117
/* 2-byte form: difference is first_byte (high) and the next byte (low) */
rec_offset= LSN_OFFSET(base_lsn) - ((first_byte << 8) + *((uint8*)src));
5120
/* 3-byte form */
diff= uint2korr(src);
5121
rec_offset= LSN_OFFSET(base_lsn) - ((first_byte << 16) + diff);
5124
/* 4-byte form */
diff= uint3korr(src);
5125
rec_offset= LSN_OFFSET(base_lsn) - ((first_byte << 24) + diff);
5129
/*
  5-byte form: the 4 payload bytes are an offset difference and first_byte
  is subtracted from the file number; when the difference exceeds the base
  offset, 2^32 is borrowed into base_offset.
*/
ulonglong base_offset= LSN_OFFSET(base_lsn);
5130
diff= uint4korr(src);
5131
if (diff > LSN_OFFSET(base_lsn))
5133
/* take 1 from file offset */
5135
base_offset+= LL(0x100000000);
5137
file_no= LSN_FILE_NO(base_lsn) - first_byte;
5138
DBUG_ASSERT(base_offset - diff <= UINT_MAX);
5139
rec_offset= (uint32)(base_offset - diff);
5146
/* Reassemble the full LSN and store it in the output buffer */
lsn= MAKE_LSN(file_no, rec_offset);
5148
lsn_store(dst, lsn);
5149
DBUG_PRINT("info", ("new src: 0x%lx", (ulong) src));
5155
@brief Encodes relative LSNs listed in the parameters.
5157
@param parts Parts list with encoded LSN(s)
5158
@param base_lsn LSN which is base for encoding
5159
@param lsns number of LSN(s) to encode
5160
@param compressed_LSNs buffer which can be used for storing compressed LSN(s)
5163
/*
  Replaces the 'lsns' full LSNs that open the record data (starting at part
  parts->current) with their compressed form relative to base_lsn.
  Visible flow:
    - if the first part is shorter than lsns * LSN_STORE_SIZE, the LSN bytes
      are gathered from the following parts into a local buffer, consuming
      those parts as they are copied;
    - otherwise the LSN bytes are skipped in place in the first part;
    - each LSN is then encoded with translog_put_LSN_diff(), going from the
      last LSN to the first and filling compressed_LSNs from its end
      backwards;
    - the current part is pointed at the compressed bytes and the record
      lengths are reduced by the bytes saved ('economy').
  NOTE(review): the base_lsn parameter line, several declarations (part,
  ref, economy), braces, the 'do' keyword and some else-branches are not
  visible in this listing.
*/
static void translog_relative_LSN_encode(struct st_translog_parts *parts,
5165
uint lsns, uchar *compressed_LSNs)
5168
uint lsns_len= lsns * LSN_STORE_SIZE;
5169
char buffer_src[MAX_NUMBER_OF_LSNS_PER_RECORD * LSN_STORE_SIZE];
5170
char *buffer= buffer_src;
5171
const char *cbuffer;
5173
DBUG_ENTER("translog_relative_LSN_encode");
5175
DBUG_ASSERT(parts->current != 0);
5176
part= parts->parts + parts->current;
5178
/* collect all LSN(s) in one chunk if it (they) is (are) divided */
5179
if (part->length < lsns_len)
5181
uint copied= part->length;
5182
LEX_CUSTRING *next_part;
5183
DBUG_PRINT("info", ("Using buffer: 0x%lx", (ulong) compressed_LSNs));
5184
memcpy(buffer, part->str, part->length);
5185
next_part= parts->parts + parts->current + 1;
5188
DBUG_ASSERT(next_part < parts->parts + parts->elements);
5189
/* The next part is consumed entirely and still does not complete the LSNs */
if ((next_part->length + copied) < lsns_len)
5191
memcpy(buffer + copied, next_part->str,
5193
copied+= next_part->length;
5194
next_part->length= 0; next_part->str= 0;
5195
/* delete_dynamic_element(&parts->parts, parts->current + 1); */
5198
part= parts->parts + parts->current;
5202
/* Take only the bytes still missing and leave the rest in next_part */
uint len= lsns_len - copied;
5203
memcpy(buffer + copied, next_part->str, len);
5205
next_part->str+= len;
5206
next_part->length-= len;
5208
} while (copied < lsns_len);
5214
/* LSNs are contiguous in the first part: just step over them */
part->str+= lsns_len;
5215
part->length-= lsns_len;
5217
part= parts->parts + parts->current;
5224
const uchar *src_ptr;
5225
/* Start one past the end of the output area; the encoder moves it backwards */
uchar *dst_ptr= compressed_LSNs + (MAX_NUMBER_OF_LSNS_PER_RECORD *
5226
COMPRESSED_LSN_MAX_STORE_SIZE);
5228
We write the result in backward direction with no special sense or
5229
tricks both directions are equal in complicity
5231
for (src_ptr= cbuffer + lsns_len - LSN_STORE_SIZE;
5232
src_ptr >= (const uchar*)cbuffer;
5233
src_ptr-= LSN_STORE_SIZE)
5235
ref= lsn_korr(src_ptr);
5236
dst_ptr= translog_put_LSN_diff(base_lsn, ref, dst_ptr);
5238
/* Compressed length = distance from the final dst_ptr to the end of the area */
part->length= (uint)((compressed_LSNs +
5239
(MAX_NUMBER_OF_LSNS_PER_RECORD *
5240
COMPRESSED_LSN_MAX_STORE_SIZE)) -
5242
/* Both length counters shrink by the number of bytes saved */
parts->record_length-= (economy= lsns_len - part->length);
5243
DBUG_PRINT("info", ("new length of LSNs: %lu economy: %d",
5244
(ulong)part->length, economy));
5245
parts->total_record_length-= economy;
5246
part->str= (char*)dst_ptr;
5253
@brief Write multi-group variable-size record.
5255
@param lsn LSN of the record will be written here
5256
@param type the log record type
5257
@param short_trid Short transaction ID or 0 if it has no sense
5258
@param parts Descriptor of record source parts
5259
@param buffer_to_flush Buffer which has to be flushed if it is not 0
5260
@param header_length Header length calculated for 1 group
5261
@param buffer_rest Beginning from which we plan to write in full pages
5262
@param trn Transaction structure pointer for hooks by
5263
record log type, for short_id
5264
@param hook_arg Argument which will be passed to pre-write and
5265
in-write hooks of this record.
5268
We must have a translog_lock() when entering this function
5270
We must have buffer_to_flush locked (if not null)
5271
buffer_to_flush should *NOT* be locked when calling this function.
5272
(This note is here as this is different from most other
5273
translog_write...() functions which require the buffer to be locked)
5275
@return Operation status
5281
/*
  Writes a variable-length record that does not fit into one group.
  Visible flow:
    - a dynamic array 'groups' collects one descriptor (start address and
      chunk count) per group written;
    - a do/while loop writes whole groups (a type-2 chunk finishing the
      current page plus full type-2 pages) for as long as the remaining
      data does not fit into first_page + buffer_rest;
    - the last group is then laid out: optional type-2 / type-3 data chunks,
      followed by one or more type-0 chunks that carry the record header,
      the list of group descriptors and, on the last of them, the record
      tail;
    - the LSN of the record is the address of the first type-0 chunk
      (set_lsn(lsn, horizon) below).
  The first group's file is marked unfinished at the start and finished at
  the end, on both the normal and the cleanup path.
  NOTE(review): many lines of this function (return type, the
  buffer_to_flush parameter name, several declarations, braces, labels,
  else-branches and error exits) are not visible in this listing; comments
  below describe only what the visible statements do.
*/
translog_write_variable_record_mgroup(LSN *lsn,
5282
enum translog_record_type type,
5284
SHORT_TRANSACTION_ID short_trid,
5285
struct st_translog_parts *parts,
5286
struct st_translog_buffer
5288
uint16 header_length,
5289
translog_size_t buffer_rest,
5290
TRN *trn, void *hook_arg)
5292
TRANSLOG_ADDRESS horizon;
5293
struct st_buffer_cursor cursor;
5295
uint i, chunk2_page, full_pages;
5297
translog_size_t record_rest, first_page, chunk3_pages, chunk0_pages= 1;
5298
translog_size_t done= 0;
5299
struct st_translog_group_descriptor group;
5300
DYNAMIC_ARRAY groups;
5302
/* Payload of a page when the chunk type byte is counted as payload too */
uint16 page_capacity= log_descriptor.page_capacity_chunk_2 + 1;
5303
uint16 last_page_capacity;
5304
my_bool new_page_before_chunk0= 1, first_chunk0= 1;
5305
uchar chunk0_header[1 + 2 + 5 + 2 + 2], group_desc[7 + 1];
5306
uchar chunk2_header[1];
5307
/* Chunk-0 header plus the 2-byte group counter written after it */
uint header_fixed_part= header_length + 2;
5308
/* Each group descriptor takes 7 + 1 bytes (address + chunk count) */
uint groups_per_page= (page_capacity - header_fixed_part) / (7 + 1);
5309
uint file_of_the_first_group;
5311
struct st_translog_buffer *buffer_of_last_lsn;
5312
DBUG_ENTER("translog_write_variable_record_mgroup");
5313
translog_lock_assert_owner();
5315
chunk2_header[0]= TRANSLOG_CHUNK_NOHDR;
5317
if (my_init_dynamic_array(&groups,
5318
sizeof(struct st_translog_group_descriptor),
5322
DBUG_PRINT("error", ("init array failed"));
5326
first_page= translog_get_current_page_rest();
5327
/* One byte of the first page goes to the type-2 chunk marker */
record_rest= parts->record_length - (first_page - 1);
5328
DBUG_PRINT("info", ("Record Rest: %lu", (ulong) record_rest));
5330
if (record_rest < buffer_rest)
5333
The record (group 1 type) is larger than the free space on the page
5334
- we need to split it in two. But when we split it in two, the first
5335
part is big enough to hold all the data of the record (because the
5336
header of the first part of the split is smaller than the header of
5337
the record as a whole when it takes only one chunk)
5339
DBUG_PRINT("info", ("too many free space because changing header"));
5340
/* Give up one page of the first group so that data is left for the last one */
buffer_rest-= log_descriptor.page_capacity_chunk_2;
5341
DBUG_ASSERT(record_rest >= buffer_rest);
5344
file_of_the_first_group= LSN_FILE_NO(log_descriptor.horizon);
5345
translog_mark_file_unfinished(file_of_the_first_group);
5348
/* Start of a full group: remember its address and take private copies */
group.addr= horizon= log_descriptor.horizon;
5349
cursor= log_descriptor.bc;
5351
if ((full_pages= buffer_rest / log_descriptor.page_capacity_chunk_2) > 255)
5353
/* sizeof(uint8) == 256 is max number of chunk in multi-chunks group */
5355
/* Keep buffer_rest consistent with the (possibly capped) page count */
buffer_rest= full_pages * log_descriptor.page_capacity_chunk_2;
5359
full pages + first page (which actually can be full, too).
5360
But here we assign number of chunks - 1
5362
group.num= full_pages;
5363
if (insert_dynamic(&groups, (uchar*) &group))
5365
DBUG_PRINT("error", ("insert into array failed"));
5369
DBUG_PRINT("info", ("chunk: #%u first_page: %u (%u) "
5370
"full_pages: %lu (%lu) "
5373
first_page, first_page - 1,
5375
(ulong) (full_pages *
5376
log_descriptor.page_capacity_chunk_2),
5377
(ulong)(parts->record_length - (first_page - 1 +
5380
/* Reserve the rest of this page and full_pages more pages for the group */
rc|= translog_advance_pointer((int)full_pages, 0);
5384
/* The pending buffer is handled once, after the first reservation */
if (buffer_to_flush != NULL)
5386
translog_buffer_decrease_writers(buffer_to_flush);
5388
rc= translog_buffer_flush(buffer_to_flush);
5389
translog_buffer_unlock(buffer_to_flush);
5390
buffer_to_flush= NULL;
5394
DBUG_PRINT("error", ("flush of unlock buffer failed"));
5398
/* Fill the rest of the first page with a type-2 chunk */
translog_write_data_on_page(&horizon, &cursor, 1, chunk2_header);
5399
translog_write_parts_on_page(&horizon, &cursor, first_page - 1, parts);
5400
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx) "
5402
LSN_IN_PARTS(log_descriptor.horizon),
5403
LSN_IN_PARTS(horizon),
5404
(ulong) (parts->record_length - (first_page - 1) -
5407
for (i= 0; i < full_pages; i++)
5409
if (translog_write_variable_record_chunk2_page(parts, &horizon, &cursor))
5412
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) "
5413
"local: (%lu,0x%lx) "
5415
LSN_IN_PARTS(log_descriptor.horizon),
5416
LSN_IN_PARTS(horizon),
5417
(ulong) (parts->record_length - (first_page - 1) -
5418
i * log_descriptor.page_capacity_chunk_2 -
5422
/* Account for the data written by this group */
done+= (first_page - 1 + buffer_rest);
5424
if (translog_chaser_page_next(&horizon, &cursor))
5426
DBUG_PRINT("error", ("flush of unlock buffer failed"));
5429
/* This group is complete: release the writer reference on its last buffer */
translog_buffer_lock(cursor.buffer);
5430
translog_buffer_decrease_writers(cursor.buffer);
5431
translog_buffer_unlock(cursor.buffer);
5435
/* Check that we have place for chunk type 2 */
5436
first_page= translog_get_current_page_rest();
5437
/* A page with at most one free byte cannot hold a marker plus data */
if (first_page <= 1)
5439
if (translog_page_next(&log_descriptor.horizon, &log_descriptor.bc,
5442
first_page= translog_get_current_page_rest();
5444
buffer_rest= translog_get_current_group_size();
5445
/* Repeat while the remaining data does not fit into one more group */
} while ((translog_size_t)(first_page + buffer_rest) <
5446
(translog_size_t)(parts->record_length - done));
5448
/* Last group: it gets its own descriptor and private horizon/cursor copies */
group.addr= horizon= log_descriptor.horizon;
5449
cursor= log_descriptor.bc;
5451
group.num= 0; /* 0 because it does not matter */
5452
if (insert_dynamic(&groups, (uchar*) &group))
5454
DBUG_PRINT("error", ("insert into array failed"));
5457
record_rest= parts->record_length - done;
5458
DBUG_PRINT("info", ("Record rest: %lu", (ulong) record_rest));
5459
if (first_page > record_rest + 1)
5462
We have not so much data to fill all first page
5463
(no speaking about full pages)
5467
<chunk0>...<chunk0><chunk0 <data>>
5469
<chunk3 <data>><chunk0>...<chunk0><chunk0 <possible data of 1 byte>>
5471
chunk2_page= full_pages= 0;
5472
last_page_capacity= first_page;
5479
<chunk2 <data>>...<chunk2 <data>><chunk0 <data>>
5481
<chunk2 <data>>...<chunk2 <data>><chunk0>...<chunk0><chunk0 <data>>
5483
<chunk3 <data>><chunk0>...<chunk0><chunk0 <possible data of 1 byte>>
5486
/* The first page is finished by a type-2 chunk; the rest is split into pages */
record_rest-= (first_page - 1);
5487
pages_to_skip= full_pages=
5488
record_rest / log_descriptor.page_capacity_chunk_2;
5489
record_rest= (record_rest % log_descriptor.page_capacity_chunk_2);
5490
last_page_capacity= page_capacity;
5494
if (last_page_capacity > record_rest + 1 && record_rest != 0)
5496
/* Does the tail fit on the page together with header and all descriptors? */
if (last_page_capacity >
5497
record_rest + header_fixed_part + groups.elements * (7 + 1))
5499
/* 1 record of type 0 */
5506
if (record_rest + 2 == last_page_capacity)
5508
chunk3_size= record_rest - 1;
5513
chunk3_size= record_rest;
5519
A first non-full page will hold type 0 chunk only if it fit in it with
5522
/*
  Loop condition: the tail, the header and the descriptors not yet assigned
  to earlier type-0 pages still exceed one page (loop body not visible here).
*/
while (page_capacity <
5523
record_rest + header_fixed_part +
5524
(groups.elements - groups_per_page * (chunk0_pages - 1)) * (7 + 1))
5526
DBUG_PRINT("info", ("chunk0_pages: %u groups %u groups per full page: %u "
5527
"Group on last page: %u",
5528
chunk0_pages, groups.elements,
5531
((page_capacity - header_fixed_part) / (7 + 1)) *
5532
(chunk0_pages - 1))));
5533
DBUG_PRINT("info", ("first_page: %u chunk2: %u full_pages: %u (%lu) "
5534
"chunk3: %u (%u) rest: %u",
5536
chunk2_page, full_pages,
5537
(ulong) full_pages *
5538
log_descriptor.page_capacity_chunk_2,
5539
chunk3_pages, (uint) chunk3_size, (uint) record_rest));
5540
/* Reserve the space for the whole last group in one step */
rc= translog_advance_pointer(pages_to_skip + (int)(chunk0_pages - 1),
5541
record_rest + header_fixed_part +
5544
header_fixed_part) / (7 + 1)) *
5545
(chunk0_pages - 1)) * (7 + 1));
5546
/* Remember where the reservation ended; last_lsn is stored there later */
buffer_of_last_lsn= log_descriptor.bc.buffer;
5549
if (buffer_to_flush != NULL)
5551
translog_buffer_decrease_writers(buffer_to_flush);
5553
rc= translog_buffer_flush(buffer_to_flush);
5554
translog_buffer_unlock(buffer_to_flush);
5555
buffer_to_flush= NULL;
5559
DBUG_PRINT("error", ("flush of unlock buffer failed"));
5568
DBUG_PRINT("info", ("chunk 2 to finish first page"));
5569
translog_write_data_on_page(&horizon, &cursor, 1, chunk2_header);
5570
translog_write_parts_on_page(&horizon, &cursor, first_page - 1, parts);
5571
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx) "
5573
LSN_IN_PARTS(log_descriptor.horizon),
5574
LSN_IN_PARTS(horizon),
5575
(ulong) (parts->record_length - (first_page - 1) -
5578
else if (chunk3_pages)
5580
uchar chunk3_header[3];
5581
DBUG_PRINT("info", ("chunk 3"));
5582
DBUG_ASSERT(full_pages == 0);
5584
/* Type-3 chunk written on the current page: type byte + 2-byte length + data */
chunk3_header[0]= TRANSLOG_CHUNK_LNGTH;
5585
int2store(chunk3_header + 1, chunk3_size);
5586
translog_write_data_on_page(&horizon, &cursor, 3, chunk3_header);
5587
translog_write_parts_on_page(&horizon, &cursor, chunk3_size, parts);
5588
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx) "
5590
LSN_IN_PARTS(log_descriptor.horizon),
5591
LSN_IN_PARTS(horizon),
5592
(ulong) (parts->record_length - chunk3_size - done)));
5596
DBUG_PRINT("info", ("no new_page_before_chunk0"));
5597
/* Chunk 0 will start on the current page instead of a fresh one */
new_page_before_chunk0= 0;
5600
for (i= 0; i < full_pages; i++)
5602
DBUG_ASSERT(chunk2_page != 0);
5603
if (translog_write_variable_record_chunk2_page(parts, &horizon, &cursor))
5606
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx) "
5608
LSN_IN_PARTS(log_descriptor.horizon),
5609
LSN_IN_PARTS(horizon),
5610
(ulong) (parts->record_length - (first_page - 1) -
5611
i * log_descriptor.page_capacity_chunk_2 -
5616
translog_write_variable_record_chunk3_page(parts,
5620
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
5621
LSN_IN_PARTS(log_descriptor.horizon),
5622
LSN_IN_PARTS(horizon)));
5624
/* Chunk-0 header: type byte, short transaction id, encoded record length */
*chunk0_header= (uchar) (type | TRANSLOG_CHUNK_LSN);
5625
int2store(chunk0_header + 1, short_trid);
5626
translog_write_variable_record_1group_code_len(chunk0_header + 3,
5627
parts->record_length,
5632
if (new_page_before_chunk0 &&
5633
translog_chaser_page_next(&horizon, &cursor))
5635
DBUG_PRINT("error", ("flush of unlock buffer failed"));
5638
/* Every following type-0 chunk starts on a new page */
new_page_before_chunk0= 1;
5645
We can drop "log_descriptor.is_everything_flushed" earlier when have
5646
lock on loghandler and assign initial value of "horizon" variable or
5647
before unlocking loghandler (because we will increase writers
5648
counter on the buffer and every thread which wanted flush the buffer
5649
will wait till we finish with it). But IMHO better here take short
5650
lock and do not bother other threads with waiting.
5653
/* The record's LSN is the address of its first type-0 chunk */
set_lsn(lsn, horizon);
5654
buffer_of_last_lsn->last_lsn= *lsn;
5655
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
5656
LSN_IN_PARTS(buffer_of_last_lsn->last_lsn),
5657
(ulong) buffer_of_last_lsn));
5658
if (log_record_type_descriptor[type].inwrite_hook &&
5659
(*log_record_type_descriptor[type].inwrite_hook) (type, trn,
5667
A first non-full page will hold type 0 chunk only if it fit in it with
5668
all its headers => the fist page is full or number of groups less then
5669
possible number of full page.
5671
/* Number of descriptors on this page: all remaining ones, capped at one page's worth */
limit= (groups_per_page < groups.elements - curr_group ?
5672
groups_per_page : groups.elements - curr_group);
5673
DBUG_PRINT("info", ("Groups: %u curr: %u limit: %u",
5674
(uint) groups.elements, (uint) curr_group,
5677
/* Only the last type-0 chunk carries the record tail, so its length includes it */
if (chunk0_pages == 1)
5679
DBUG_PRINT("info", ("chunk_len: 2 + %u * (7+1) + %u = %u",
5680
(uint) limit, (uint) record_rest,
5681
(uint) (2 + limit * (7 + 1) + record_rest)));
5682
int2store(chunk0_header + header_length - 2,
5683
2 + limit * (7 + 1) + record_rest);
5687
DBUG_PRINT("info", ("chunk_len: 2 + %u * (7+1) = %u",
5688
(uint) limit, (uint) (2 + limit * (7 + 1))));
5689
int2store(chunk0_header + header_length - 2, 2 + limit * (7 + 1));
5691
/* Number of group descriptors still to come, this page included */
int2store(chunk0_header + header_length, groups.elements - curr_group);
5692
translog_write_data_on_page(&horizon, &cursor, header_fixed_part,
5694
for (i= curr_group; i < limit + curr_group; i++)
5696
struct st_translog_group_descriptor *grp_ptr;
5697
grp_ptr= dynamic_element(&groups, i,
5698
struct st_translog_group_descriptor *);
5699
/* Descriptor on disk: group start address, then the chunk count in byte 7 */
lsn_store(group_desc, grp_ptr->addr);
5700
group_desc[7]= grp_ptr->num;
5701
translog_write_data_on_page(&horizon, &cursor, (7 + 1), group_desc);
5704
if (chunk0_pages == 1 && record_rest != 0)
5705
translog_write_parts_on_page(&horizon, &cursor, record_rest, parts);
5709
/* put special type to indicate that it is not LSN chunk */
5710
*chunk0_header= (uchar) (TRANSLOG_CHUNK_LSN | TRANSLOG_CHUNK_0_CONT);
5711
} while (chunk0_pages != 0);
5712
/* All chunks written: release the writer reference on the last buffer */
translog_buffer_lock(cursor.buffer);
5713
translog_buffer_decrease_writers(cursor.buffer);
5714
translog_buffer_unlock(cursor.buffer);
5717
if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn),
5721
translog_mark_file_finished(file_of_the_first_group);
5723
delete_dynamic(&groups);
5731
/*
  NOTE(review): the statements below look like the error-exit path (its label
  and return are not visible here): the pending buffer is released, the
  first group's file is marked finished and the groups array is freed.
*/
if (buffer_to_flush != NULL)
5733
/* This is to prevent locking buffer forever in case of error */
5734
translog_buffer_decrease_writers(buffer_to_flush);
5736
rc= translog_buffer_flush(buffer_to_flush);
5737
translog_buffer_unlock(buffer_to_flush);
5738
buffer_to_flush= NULL;
5742
translog_mark_file_finished(file_of_the_first_group);
5744
delete_dynamic(&groups);
5750
@brief Write the variable length log record.
5752
@param lsn LSN of the record will be written here
5753
@param type the log record type
5754
@param short_trid Short transaction ID or 0 if it has no sense
5755
@param parts Descriptor of record source parts
5756
@param trn Transaction structure pointer for hooks by
5757
record log type, for short_id
5758
@param hook_arg Argument which will be passed to pre-write and
5759
in-write hooks of this record.
5761
@return Operation status
5766
static my_bool translog_write_variable_record(LSN *lsn,
5767
enum translog_record_type type,
5769
SHORT_TRANSACTION_ID short_trid,
5770
struct st_translog_parts *parts,
5771
TRN *trn, void *hook_arg)
5773
struct st_translog_buffer *buffer_to_flush= NULL;
5774
uint header_length1= 1 + 2 + 2 +
5775
translog_variable_record_length_bytes(parts->record_length);
5778
/* Max number of such LSNs per record is 2 */
5779
uchar compressed_LSNs[MAX_NUMBER_OF_LSNS_PER_RECORD *
5780
COMPRESSED_LSN_MAX_STORE_SIZE];
5782
DBUG_ENTER("translog_write_variable_record");
5785
DBUG_PRINT("info", ("horizon: (%lu,0x%lx)",
5786
LSN_IN_PARTS(log_descriptor.horizon)));
5787
page_rest= TRANSLOG_PAGE_SIZE - log_descriptor.bc.current_page_fill;
5788
DBUG_PRINT("info", ("header length: %u page_rest: %u",
5789
header_length1, page_rest));
5792
header and part which we should read have to fit in one chunk
5793
TODO: allow to divide readable header
5796
(header_length1 + log_record_type_descriptor[type].read_header_len))
5799
("Next page, size: %u header: %u + %u",
5800
log_descriptor.bc.current_page_fill,
5802
log_record_type_descriptor[type].read_header_len));
5803
translog_page_next(&log_descriptor.horizon, &log_descriptor.bc,
5805
/* Chunk 2 header is 1 byte, so full page capacity will be one uchar more */
5806
page_rest= log_descriptor.page_capacity_chunk_2 + 1;
5807
DBUG_PRINT("info", ("page_rest: %u", page_rest));
5811
To minimize compressed size we will compress always relative to
5812
very first chunk address (log_descriptor.horizon for now)
5814
if (log_record_type_descriptor[type].compressed_LSN > 0)
5816
translog_relative_LSN_encode(parts, log_descriptor.horizon,
5817
log_record_type_descriptor[type].
5818
compressed_LSN, compressed_LSNs);
5819
/* recalculate header length after compression */
5820
header_length1= 1 + 2 + 2 +
5821
translog_variable_record_length_bytes(parts->record_length);
5822
DBUG_PRINT("info", ("after compressing LSN(s) header length: %u "
5823
"record length: %lu",
5824
header_length1, (ulong)parts->record_length));
5827
/* TODO: check space on current page for header + few bytes */
5828
if (page_rest >= parts->record_length + header_length1)
5830
/* following function makes translog_unlock(); */
5831
res= translog_write_variable_record_1chunk(lsn, type, tbl_info,
5833
parts, buffer_to_flush,
5834
header_length1, trn, hook_arg);
5838
buffer_rest= translog_get_current_group_size();
5840
if (buffer_rest >= parts->record_length + header_length1 - page_rest)
5842
/* following function makes translog_unlock(); */
5843
res= translog_write_variable_record_1group(lsn, type, tbl_info,
5845
parts, buffer_to_flush,
5846
header_length1, trn, hook_arg);
5849
/* following function makes translog_unlock(); */
5850
res= translog_write_variable_record_mgroup(lsn, type, tbl_info,
5852
parts, buffer_to_flush,
5854
buffer_rest, trn, hook_arg);
5860
@brief Write the fixed and pseudo-fixed log record.
5862
@param lsn LSN of the record will be written here
5863
@param type the log record type
5864
@param short_trid Short transaction ID or 0 if it has no sense
5865
@param parts Descriptor of record source parts
5866
@param trn Transaction structure pointer for hooks by
5867
record log type, for short_id
5868
@param hook_arg Argument which will be passed to pre-write and
5869
in-write hooks of this record.
5871
@return Operation status
5876
static my_bool translog_write_fixed_record(LSN *lsn,
5877
enum translog_record_type type,
5879
SHORT_TRANSACTION_ID short_trid,
5880
struct st_translog_parts *parts,
5881
TRN *trn, void *hook_arg)
5883
struct st_translog_buffer *buffer_to_flush= NULL;
5884
uchar chunk1_header[1 + 2];
5885
/* Max number of such LSNs per record is 2 */
5886
uchar compressed_LSNs[MAX_NUMBER_OF_LSNS_PER_RECORD *
5887
COMPRESSED_LSN_MAX_STORE_SIZE];
5890
DBUG_ENTER("translog_write_fixed_record");
5891
DBUG_ASSERT((log_record_type_descriptor[type].rclass ==
5892
LOGRECTYPE_FIXEDLENGTH &&
5893
parts->record_length ==
5894
log_record_type_descriptor[type].fixed_length) ||
5895
(log_record_type_descriptor[type].rclass ==
5896
LOGRECTYPE_PSEUDOFIXEDLENGTH &&
5897
parts->record_length ==
5898
log_record_type_descriptor[type].fixed_length));
5901
DBUG_PRINT("info", ("horizon: (%lu,0x%lx)",
5902
LSN_IN_PARTS(log_descriptor.horizon)));
5904
DBUG_ASSERT(log_descriptor.bc.current_page_fill <= TRANSLOG_PAGE_SIZE);
5906
("Page size: %u record: %u next cond: %d",
5907
log_descriptor.bc.current_page_fill,
5908
(parts->record_length +
5909
log_record_type_descriptor[type].compressed_LSN * 2 + 3),
5910
((((uint) log_descriptor.bc.current_page_fill) +
5911
(parts->record_length +
5912
log_record_type_descriptor[type].compressed_LSN * 2 + 3)) >
5913
TRANSLOG_PAGE_SIZE)));
5915
check that there is enough place on current page.
5916
NOTE: compressing may increase page LSN size on two bytes for every LSN
5918
if ((((uint) log_descriptor.bc.current_page_fill) +
5919
(parts->record_length +
5920
log_record_type_descriptor[type].compressed_LSN * 2 + 3)) >
5923
DBUG_PRINT("info", ("Next page"));
5924
if (translog_page_next(&log_descriptor.horizon, &log_descriptor.bc,
5926
goto err; /* rc == 1 */
5927
if (buffer_to_flush)
5928
translog_buffer_lock_assert_owner(buffer_to_flush);
5931
set_lsn(lsn, log_descriptor.horizon);
5932
if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
5934
(log_record_type_descriptor[type].inwrite_hook &&
5935
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
5940
if (log_record_type_descriptor[type].rclass ==
5941
LOGRECTYPE_PSEUDOFIXEDLENGTH)
5943
DBUG_ASSERT(log_record_type_descriptor[type].compressed_LSN > 0);
5944
translog_relative_LSN_encode(parts, *lsn,
5945
log_record_type_descriptor[type].
5946
compressed_LSN, compressed_LSNs);
5950
Write the whole record at once (we know that there is enough place on
5951
the destination page)
5953
DBUG_ASSERT(parts->current != 0); /* first part is left for header */
5954
part= parts->parts + (--parts->current);
5955
parts->total_record_length+= (translog_size_t) (part->length= 1 + 2);
5956
part->str= (char*)chunk1_header;
5957
*chunk1_header= (uchar) (type | TRANSLOG_CHUNK_FIXED);
5958
int2store(chunk1_header + 1, short_trid);
5960
rc= translog_write_parts_on_page(&log_descriptor.horizon,
5962
parts->total_record_length, parts);
5964
log_descriptor.bc.buffer->last_lsn= *lsn;
5965
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
5966
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
5967
(ulong) log_descriptor.bc.buffer));
5973
check if we switched buffer and need process it (current buffer is
5974
unlocked already => we will not delay other threads
5976
if (buffer_to_flush != NULL)
5979
rc= translog_buffer_flush(buffer_to_flush);
5980
translog_buffer_unlock(buffer_to_flush);
5988
@brief Writes the log record
5990
If share has no 2-byte-id yet, gives an id to the share and logs
5991
LOGREC_FILE_ID. If transaction has not logged LOGREC_LONG_TRANSACTION_ID
5994
@param lsn LSN of the record will be written here
5995
@param type the log record type
5996
@param trn Transaction structure pointer for hooks by
5997
record log type, for short_id
5998
@param tbl_info MARIA_HA of table or NULL
5999
@param rec_len record length or 0 (count it)
6000
@param part_no number of parts or 0 (count it)
6001
@param parts_data zero ended (in case of number of parts is 0)
6002
array of LEX_STRINGs (parts), first
6003
TRANSLOG_INTERNAL_PARTS positions in the log
6004
should be unused (need for loghandler)
6005
@param store_share_id if tbl_info!=NULL then share's id will
6006
automatically be stored in the two first bytes
6007
pointed (so pointer is assumed to be !=NULL)
6008
@param hook_arg argument which will be passed to pre-write and
6009
in-write hooks of this record.
6011
@return Operation status
6016
my_bool translog_write_record(LSN *lsn,
6017
enum translog_record_type type,
6018
TRN *trn, MARIA_HA *tbl_info,
6019
translog_size_t rec_len,
6021
LEX_CUSTRING *parts_data,
6022
uchar *store_share_id,
6025
struct st_translog_parts parts;
6028
uint short_trid= trn->short_id;
6029
DBUG_ENTER("translog_write_record");
6030
DBUG_PRINT("enter", ("type: %u (%s) ShortTrID: %u rec_len: %lu",
6031
(uint) type, log_record_type_descriptor[type].name,
6032
(uint) short_trid, (ulong) rec_len));
6033
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6034
translog_status == TRANSLOG_READONLY);
6035
if (unlikely(translog_status != TRANSLOG_OK))
6037
DBUG_PRINT("error", ("Transaction log is write protected"));
6043
MARIA_SHARE *share= tbl_info->s;
6044
DBUG_ASSERT(share->now_transactional);
6045
if (unlikely(share->id == 0))
6048
First log write for this MARIA_SHARE; give it a short id.
6049
When the lock manager is enabled and needs a short id, it should be
6050
assigned in the lock manager (because row locks will be taken before
6051
log records are written; for example SELECT FOR UPDATE takes locks but
6052
writes no log record.
6054
if (unlikely(translog_assign_id_to_share(tbl_info, trn)))
6057
fileid_store(store_share_id, share->id);
6059
if (unlikely(!(trn->first_undo_lsn & TRANSACTION_LOGGED_LONG_ID)))
6062
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
6064
int6store(log_data, trn->trid);
6065
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
6066
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
6067
trn->first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; /* no recursion */
6068
if (unlikely(translog_write_record(&dummy_lsn, LOGREC_LONG_TRANSACTION_ID,
6069
trn, NULL, sizeof(log_data),
6070
sizeof(log_array)/sizeof(log_array[0]),
6071
log_array, NULL, NULL)))
6075
parts.parts= parts_data;
6077
/* count parts if they are not counted by upper level */
6080
for (part_no= TRANSLOG_INTERNAL_PARTS;
6081
parts_data[part_no].length != 0;
6084
parts.elements= part_no;
6085
parts.current= TRANSLOG_INTERNAL_PARTS;
6087
/* clear TRANSLOG_INTERNAL_PARTS */
6088
compile_time_assert(TRANSLOG_INTERNAL_PARTS != 0);
6089
parts_data[0].str= 0;
6090
parts_data[0].length= 0;
6092
/* count length of the record */
6095
for(part= parts_data + TRANSLOG_INTERNAL_PARTS;\
6096
part < parts_data + part_no;
6099
rec_len+= (translog_size_t) part->length;
6102
parts.record_length= rec_len;
6109
ha_checksum checksum= 0;
6111
for (i= TRANSLOG_INTERNAL_PARTS; i < part_no; i++)
6114
/* Find unitialized bytes early */
6115
checksum+= my_checksum(checksum, parts_data[i].str,
6116
parts_data[i].length);
6118
len+= parts_data[i].length;
6120
DBUG_ASSERT(len == rec_len);
6124
Start total_record_length from record_length then overhead will
6127
parts.total_record_length= parts.record_length;
6128
DBUG_PRINT("info", ("record length: %lu", (ulong) parts.record_length));
6130
/* process this parts */
6131
if (!(rc= (log_record_type_descriptor[type].prewrite_hook &&
6132
(*log_record_type_descriptor[type].prewrite_hook) (type, trn,
6136
switch (log_record_type_descriptor[type].rclass) {
6137
case LOGRECTYPE_VARIABLE_LENGTH:
6138
rc= translog_write_variable_record(lsn, type, tbl_info,
6139
short_trid, &parts, trn, hook_arg);
6141
case LOGRECTYPE_PSEUDOFIXEDLENGTH:
6142
case LOGRECTYPE_FIXEDLENGTH:
6143
rc= translog_write_fixed_record(lsn, type, tbl_info,
6144
short_trid, &parts, trn, hook_arg);
6146
case LOGRECTYPE_NOT_ALLOWED:
6153
DBUG_PRINT("info", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(*lsn)));
6159
Decode compressed (relative) LSN(s)
6162
translog_relative_lsn_decode()
6163
base_lsn LSN for encoding
6164
src Decode LSN(s) from here
6165
dst Put decoded LSNs here
6166
lsns number of LSN(s)
6169
position in sources after decoded LSN(s)
6172
static uchar *translog_relative_LSN_decode(LSN base_lsn,
6173
uchar *src, uchar *dst, uint lsns)
6176
for (i= 0; i < lsns; i++, dst+= LSN_STORE_SIZE)
6178
src= translog_get_LSN_from_diff(base_lsn, src, dst);
6184
@brief Get header of fixed/pseudo length record and call hook for
6187
@param page Pointer to the buffer with page where LSN chunk is
6189
@param page_offset Offset of the first chunk in the page
6190
@param buff Buffer to be filled with header data
6192
@return Length of header or operation status
6193
@retval # number of bytes in TRANSLOG_HEADER_BUFFER::header where
6194
stored decoded part of the header
6197
static int translog_fixed_length_header(uchar *page,
6198
translog_size_t page_offset,
6199
TRANSLOG_HEADER_BUFFER *buff)
6201
struct st_log_record_type_descriptor *desc=
6202
log_record_type_descriptor + buff->type;
6203
uchar *src= page + page_offset + 3;
6204
uchar *dst= buff->header;
6206
int lsns= desc->compressed_LSN;
6207
uint length= desc->fixed_length;
6208
DBUG_ENTER("translog_fixed_length_header");
6210
buff->record_length= length;
6212
if (desc->rclass == LOGRECTYPE_PSEUDOFIXEDLENGTH)
6214
DBUG_ASSERT(lsns > 0);
6215
src= translog_relative_LSN_decode(buff->lsn, src, dst, lsns);
6216
lsns*= LSN_STORE_SIZE;
6219
buff->compressed_LSN_economy= (lsns - (int) (src - start));
6222
buff->compressed_LSN_economy= 0;
6224
memcpy(dst, src, length);
6225
buff->non_header_data_start_offset= (uint16) (page_offset +
6227
(page + page_offset)));
6228
buff->non_header_data_len= 0;
6229
DBUG_RETURN(buff->record_length);
6234
Free resources used by TRANSLOG_HEADER_BUFFER
6237
translog_free_record_header();
6240
void translog_free_record_header(TRANSLOG_HEADER_BUFFER *buff)
6242
DBUG_ENTER("translog_free_record_header");
6243
if (buff->groups_no != 0)
6245
my_free((uchar*) buff->groups, MYF(0));
6253
@brief Returns the current horizon at the end of the current log
6256
@retval LSN_ERROR error
6260
TRANSLOG_ADDRESS translog_get_horizon()
6262
TRANSLOG_ADDRESS res;
6263
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6264
translog_status == TRANSLOG_READONLY);
6266
res= log_descriptor.horizon;
6273
@brief Returns the current horizon at the end of the current log, caller is
6274
assumed to already hold the lock
6277
@retval LSN_ERROR error
6281
TRANSLOG_ADDRESS translog_get_horizon_no_lock()
6283
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6284
translog_status == TRANSLOG_READONLY);
6285
translog_lock_assert_owner();
6286
return log_descriptor.horizon;
6291
Set last page in the scanner data structure
6294
translog_scanner_set_last_page()
6295
scanner Information about current chunk during scanning
6302
static my_bool translog_scanner_set_last_page(TRANSLOG_SCANNER_DATA *scanner)
6305
if (LSN_FILE_NO(scanner->page_addr) == LSN_FILE_NO(scanner->horizon))
6307
/* It is last file => we can easy find last page address by horizon */
6308
uint pagegrest= LSN_OFFSET(scanner->horizon) % TRANSLOG_PAGE_SIZE;
6309
scanner->last_file_page= (scanner->horizon -
6310
(pagegrest ? pagegrest : TRANSLOG_PAGE_SIZE));
6313
scanner->last_file_page= scanner->page_addr;
6314
return (translog_get_last_page_addr(&scanner->last_file_page, &page_ok, 0));
6319
@brief Get page from page cache according to requested method
6321
@param scanner The scanner data
6323
@return operation status
6329
translog_scanner_get_page(TRANSLOG_SCANNER_DATA *scanner)
6331
TRANSLOG_VALIDATOR_DATA data;
6332
DBUG_ENTER("translog_scanner_get_page");
6333
data.addr= &scanner->page_addr;
6334
data.was_recovered= 0;
6335
DBUG_RETURN((scanner->page=
6336
translog_get_page(&data, scanner->buffer,
6337
(scanner->use_direct_link ?
6338
&scanner->direct_link :
6345
@brief Initialize reader scanner.
6347
@param lsn LSN with which it have to be inited
6348
@param fixed_horizon true if it is OK do not read records which was written
6349
after scanning beginning
6350
@param scanner scanner which have to be inited
6351
@param use_direct prefer using direct lings from page handler
6352
where it is possible.
6354
@note If direct link was used translog_destroy_scanner should be
6355
called after it using
6357
@return status of the operation
6362
my_bool translog_scanner_init(LSN lsn,
6363
my_bool fixed_horizon,
6364
TRANSLOG_SCANNER_DATA *scanner,
6367
TRANSLOG_VALIDATOR_DATA data;
6368
DBUG_ENTER("translog_scanner_init");
6369
DBUG_PRINT("enter", ("Scanner: 0x%lx LSN: (%lu,0x%lx)",
6370
(ulong) scanner, LSN_IN_PARTS(lsn)));
6371
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6372
translog_status == TRANSLOG_READONLY);
6374
data.addr= &scanner->page_addr;
6375
data.was_recovered= 0;
6377
scanner->page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
6379
scanner->fixed_horizon= fixed_horizon;
6380
scanner->use_direct_link= use_direct;
6381
scanner->direct_link= NULL;
6383
scanner->horizon= translog_get_horizon();
6384
DBUG_PRINT("info", ("horizon: (%lu,0x%lx)", LSN_IN_PARTS(scanner->horizon)));
6387
DBUG_ASSERT(lsn <= scanner->horizon);
6389
scanner->page_addr= lsn;
6390
scanner->page_addr-= scanner->page_offset; /*decrease offset */
6392
if (translog_scanner_set_last_page(scanner))
6395
if (translog_scanner_get_page(scanner))
6402
@brief Destroy scanner object;
6404
@param scanner The scanner object to destroy
6407
void translog_destroy_scanner(TRANSLOG_SCANNER_DATA *scanner)
6409
DBUG_ENTER("translog_destroy_scanner");
6410
DBUG_PRINT("enter", ("Scanner: 0x%lx", (ulong)scanner));
6411
translog_free_link(scanner->direct_link);
6417
Checks End of the Log
6420
translog_scanner_eol()
6421
scanner Information about current chunk during scanning
6428
static my_bool translog_scanner_eol(TRANSLOG_SCANNER_DATA *scanner)
6430
DBUG_ENTER("translog_scanner_eol");
6432
("Horizon: (%lu, 0x%lx) Current: (%lu, 0x%lx+0x%x=0x%lx)",
6433
LSN_IN_PARTS(scanner->horizon),
6434
LSN_IN_PARTS(scanner->page_addr),
6435
(uint) scanner->page_offset,
6436
(ulong) (LSN_OFFSET(scanner->page_addr) + scanner->page_offset)));
6437
if (scanner->horizon > (scanner->page_addr +
6438
scanner->page_offset))
6440
DBUG_PRINT("info", ("Horizon is not reached"));
6443
if (scanner->fixed_horizon)
6445
DBUG_PRINT("info", ("Horizon is fixed and reached"));
6448
scanner->horizon= translog_get_horizon();
6450
("Horizon is re-read, EOL: %d",
6451
scanner->horizon <= (scanner->page_addr +
6452
scanner->page_offset)));
6453
DBUG_RETURN(scanner->horizon <= (scanner->page_addr +
6454
scanner->page_offset));
6459
@brief Cheks End of the Page
6461
@param scanner Information about current chunk during scanning
6463
@retval 1 End of the Page
6467
static my_bool translog_scanner_eop(TRANSLOG_SCANNER_DATA *scanner)
6469
DBUG_ENTER("translog_scanner_eop");
6470
DBUG_RETURN(scanner->page_offset >= TRANSLOG_PAGE_SIZE ||
6471
scanner->page[scanner->page_offset] == TRANSLOG_FILLER);
6476
@brief Checks End of the File (i.e. we are scanning last page, which do not
6477
mean end of this page)
6479
@param scanner Information about current chunk during scanning
6481
@retval 1 End of the File
6485
static my_bool translog_scanner_eof(TRANSLOG_SCANNER_DATA *scanner)
6487
DBUG_ENTER("translog_scanner_eof");
6488
DBUG_ASSERT(LSN_FILE_NO(scanner->page_addr) ==
6489
LSN_FILE_NO(scanner->last_file_page));
6490
DBUG_PRINT("enter", ("curr Page: 0x%lx last page: 0x%lx "
6492
(ulong) LSN_OFFSET(scanner->page_addr),
6493
(ulong) LSN_OFFSET(scanner->last_file_page),
6494
LSN_OFFSET(scanner->page_addr) ==
6495
LSN_OFFSET(scanner->last_file_page)));
6497
TODO: detect damaged file EOF,
6498
TODO: issue warning if damaged file EOF detected
6500
DBUG_RETURN(scanner->page_addr ==
6501
scanner->last_file_page);
6505
Move scanner to the next chunk
6508
translog_get_next_chunk()
6509
scanner Information about current chunk during scanning
6517
translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
6520
DBUG_ENTER("translog_get_next_chunk");
6522
if (translog_scanner_eop(scanner))
6523
len= TRANSLOG_PAGE_SIZE - scanner->page_offset;
6524
else if ((len= translog_get_total_chunk_length(scanner->page,
6525
scanner->page_offset)) == 0)
6527
scanner->page_offset+= len;
6529
if (translog_scanner_eol(scanner))
6531
scanner->page= END_OF_LOG;
6532
scanner->page_offset= 0;
6535
if (translog_scanner_eop(scanner))
6537
/* before reading next page we should unpin current one if it was pinned */
6538
translog_free_link(scanner->direct_link);
6539
if (translog_scanner_eof(scanner))
6541
DBUG_PRINT("info", ("horizon: (%lu,0x%lx) pageaddr: (%lu,0x%lx)",
6542
LSN_IN_PARTS(scanner->horizon),
6543
LSN_IN_PARTS(scanner->page_addr)));
6544
/* if it is log end it have to be caught before */
6545
DBUG_ASSERT(LSN_FILE_NO(scanner->horizon) >
6546
LSN_FILE_NO(scanner->page_addr));
6547
scanner->page_addr+= LSN_ONE_FILE;
6548
scanner->page_addr= LSN_REPLACE_OFFSET(scanner->page_addr,
6549
TRANSLOG_PAGE_SIZE);
6550
if (translog_scanner_set_last_page(scanner))
6555
scanner->page_addr+= TRANSLOG_PAGE_SIZE; /* offset increased */
6558
if (translog_scanner_get_page(scanner))
6561
scanner->page_offset= translog_get_first_chunk_offset(scanner->page);
6562
if (translog_scanner_eol(scanner))
6564
scanner->page= END_OF_LOG;
6565
scanner->page_offset= 0;
6568
DBUG_ASSERT(scanner->page[scanner->page_offset] != TRANSLOG_FILLER);
6575
@brief Get header of variable length record and call hook for it processing
6577
@param page Pointer to the buffer with page where LSN chunk is
6579
@param page_offset Offset of the first chunk in the page
6580
@param buff Buffer to be filled with header data
6581
@param scanner If present should be moved to the header page if
6582
it differ from LSN page
6584
@return Length of header or operation status
6585
@retval RECHEADER_READ_ERROR error
6586
@retval RECHEADER_READ_EOF End of the log reached during the read
6587
@retval # number of bytes in
6588
TRANSLOG_HEADER_BUFFER::header where
6589
stored decoded part of the header
6593
translog_variable_length_header(uchar *page, translog_size_t page_offset,
6594
TRANSLOG_HEADER_BUFFER *buff,
6595
TRANSLOG_SCANNER_DATA *scanner)
6597
struct st_log_record_type_descriptor *desc= (log_record_type_descriptor +
6599
uchar *src= page + page_offset + 1 + 2;
6600
uchar *dst= buff->header;
6602
uint lsns= desc->compressed_LSN;
6604
uint16 length= desc->read_header_len;
6605
uint16 buffer_length= length;
6608
TRANSLOG_SCANNER_DATA internal_scanner;
6609
DBUG_ENTER("translog_variable_length_header");
6611
buff->record_length= translog_variable_record_1group_decode_len(&src);
6612
chunk_len= uint2korr(src);
6613
DBUG_PRINT("info", ("rec len: %lu chunk len: %u length: %u bufflen: %u",
6614
(ulong) buff->record_length, (uint) chunk_len,
6615
(uint) length, (uint) buffer_length));
6619
DBUG_PRINT("info", ("1 group"));
6621
page_rest= (uint16) (TRANSLOG_PAGE_SIZE - (src - page));
6623
base_lsn= buff->lsn;
6624
body_len= min(page_rest, buff->record_length);
6629
uint header_to_skip;
6632
DBUG_PRINT("info", ("multi-group"));
6633
grp_no= buff->groups_no= uint2korr(src + 2);
6635
(TRANSLOG_GROUP*) my_malloc(sizeof(TRANSLOG_GROUP) * grp_no,
6637
DBUG_RETURN(RECHEADER_READ_ERROR);
6638
DBUG_PRINT("info", ("Groups: %u", (uint) grp_no));
6640
page_rest= (uint16) (TRANSLOG_PAGE_SIZE - (src - page));
6642
header_to_skip= src - (page + page_offset);
6643
buff->chunk0_pages= 0;
6647
uint i, read_length= grp_no;
6649
buff->chunk0_pages++;
6650
if (page_rest < grp_no * (7 + 1))
6651
read_length= page_rest / (7 + 1);
6652
DBUG_PRINT("info", ("Read chunk0 page#%u read: %u left: %u "
6654
buff->chunk0_pages, read_length, grp_no, curr));
6655
for (i= 0; i < read_length; i++, curr++)
6657
DBUG_ASSERT(curr < buff->groups_no);
6658
buff->groups[curr].addr= lsn_korr(src + i * (7 + 1));
6659
buff->groups[curr].num= src[i * (7 + 1) + 7];
6660
DBUG_PRINT("info", ("group #%u (%lu,0x%lx) chunks: %u",
6662
LSN_IN_PARTS(buff->groups[curr].addr),
6663
(uint) buff->groups[curr].num));
6665
grp_no-= read_length;
6670
buff->chunk0_data_addr= scanner->page_addr;
6671
/* offset increased */
6672
buff->chunk0_data_addr+= (page_offset + header_to_skip +
6673
read_length * (7 + 1));
6677
buff->chunk0_data_addr= buff->lsn;
6678
/* offset increased */
6679
buff->chunk0_data_addr+= (header_to_skip + read_length * (7 + 1));
6681
buff->chunk0_data_len= chunk_len - 2 - read_length * (7 + 1);
6682
DBUG_PRINT("info", ("Data address: (%lu,0x%lx) len: %u",
6683
LSN_IN_PARTS(buff->chunk0_data_addr),
6684
buff->chunk0_data_len));
6687
if (scanner == NULL)
6689
DBUG_PRINT("info", ("use internal scanner for header reading"));
6690
scanner= &internal_scanner;
6691
if (translog_scanner_init(buff->lsn, 1, scanner, 0))
6693
rc= RECHEADER_READ_ERROR;
6697
if (translog_get_next_chunk(scanner))
6699
if (scanner == &internal_scanner)
6700
translog_destroy_scanner(scanner);
6701
rc= RECHEADER_READ_ERROR;
6704
if (scanner->page == END_OF_LOG)
6706
if (scanner == &internal_scanner)
6707
translog_destroy_scanner(scanner);
6708
rc= RECHEADER_READ_EOF;
6711
page= scanner->page;
6712
page_offset= scanner->page_offset;
6713
src= page + page_offset + header_to_skip;
6714
chunk_len= uint2korr(src - 2 - 2);
6715
DBUG_PRINT("info", ("Chunk len: %u", (uint) chunk_len));
6716
page_rest= (uint16) (TRANSLOG_PAGE_SIZE - (src - page));
6719
if (scanner == NULL)
6721
DBUG_PRINT("info", ("use internal scanner"));
6722
scanner= &internal_scanner;
6726
translog_destroy_scanner(scanner);
6728
base_lsn= buff->groups[0].addr;
6729
translog_scanner_init(base_lsn, 1, scanner, scanner == &internal_scanner);
6730
/* first group chunk is always chunk type 2 */
6731
page= scanner->page;
6732
page_offset= scanner->page_offset;
6733
src= page + page_offset + 1;
6734
page_rest= (uint16) (TRANSLOG_PAGE_SIZE - (src - page));
6735
body_len= page_rest;
6736
if (scanner == &internal_scanner)
6737
translog_destroy_scanner(scanner);
6742
src= translog_relative_LSN_decode(base_lsn, src, dst, lsns);
6743
lsns*= LSN_STORE_SIZE;
6746
buff->record_length+= (buff->compressed_LSN_economy=
6747
(int) (lsns - (src - start)));
6748
DBUG_PRINT("info", ("lsns: %u length: %u economy: %d new length: %lu",
6749
lsns / LSN_STORE_SIZE, (uint) length,
6750
(int) buff->compressed_LSN_economy,
6751
(ulong) buff->record_length));
6752
body_len-= (uint16) (src - start);
6755
buff->compressed_LSN_economy= 0;
6757
DBUG_ASSERT(body_len >= length);
6759
memcpy(dst, src, length);
6760
buff->non_header_data_start_offset= (uint16) (src + length - page);
6761
buff->non_header_data_len= body_len;
6762
DBUG_PRINT("info", ("non_header_data_start_offset: %u len: %u buffer: %u",
6763
buff->non_header_data_start_offset,
6764
buff->non_header_data_len, buffer_length));
6765
DBUG_RETURN(buffer_length);
6768
my_free(buff->groups, MYF(0));
6769
buff->groups_no= 0; /* prevent try to use of buff->groups */
6775
@brief Read record header from the given buffer
6777
@param page page content buffer
6778
@param page_offset offset of the chunk in the page
6779
@param buff destination buffer
6780
@param scanner If this is set the scanner will be moved to the
6781
record header page (differ from LSN page in case of
6782
multi-group records)
6784
@return Length of header or operation status
6785
@retval RECHEADER_READ_ERROR error
6786
@retval # number of bytes in
6787
TRANSLOG_HEADER_BUFFER::header where
6788
stored decoded part of the header
6791
int translog_read_record_header_from_buffer(uchar *page,
6793
TRANSLOG_HEADER_BUFFER *buff,
6794
TRANSLOG_SCANNER_DATA *scanner)
6796
translog_size_t res;
6797
DBUG_ENTER("translog_read_record_header_from_buffer");
6798
DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset]));
6799
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6800
translog_status == TRANSLOG_READONLY);
6801
DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
6802
(uint) page[page_offset], (uint) page_offset));
6803
buff->type= (page[page_offset] & TRANSLOG_REC_TYPE);
6804
buff->short_trid= uint2korr(page + page_offset + 1);
6805
DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)",
6806
(uint) buff->type, (uint)buff->short_trid,
6807
LSN_IN_PARTS(buff->lsn)));
6808
/* Read required bytes from the header and call hook */
6809
switch (log_record_type_descriptor[buff->type].rclass) {
6810
case LOGRECTYPE_VARIABLE_LENGTH:
6811
res= translog_variable_length_header(page, page_offset, buff,
6814
case LOGRECTYPE_PSEUDOFIXEDLENGTH:
6815
case LOGRECTYPE_FIXEDLENGTH:
6816
res= translog_fixed_length_header(page, page_offset, buff);
6819
DBUG_ASSERT(0); /* we read some junk (got no LSN) */
6820
res= RECHEADER_READ_ERROR;
6827
@brief Read record header and some fixed part of a record (the part depend
6830
@param lsn log record serial number (address of the record)
6831
@param buff log record header buffer
6833
@note Some type of record can be read completely by this call
6834
@note "Decoded" header stored in TRANSLOG_HEADER_BUFFER::header (relative
6835
LSN can be translated to absolute one), some fields can be added (like
6836
actual header length in the record if the header has variable length)
6838
@return Length of header or operation status
6839
@retval RECHEADER_READ_ERROR error
6840
@retval # number of bytes in
6841
TRANSLOG_HEADER_BUFFER::header where
6842
stored decoded part of the header
6845
int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff)
6847
TRANSLOG_PAGE_SIZE_BUFF psize_buff;
6849
translog_size_t res, page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
6850
PAGECACHE_BLOCK_LINK *direct_link;
6851
TRANSLOG_ADDRESS addr;
6852
TRANSLOG_VALIDATOR_DATA data;
6853
DBUG_ENTER("translog_read_record_header");
6854
DBUG_PRINT("enter", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
6855
DBUG_ASSERT(LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE != 0);
6856
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6857
translog_status == TRANSLOG_READONLY);
6862
data.was_recovered= 0;
6864
addr-= page_offset; /* offset decreasing */
6865
res= (!(page= translog_get_page(&data, psize_buff.buffer, &direct_link))) ?
6866
RECHEADER_READ_ERROR :
6867
translog_read_record_header_from_buffer(page, page_offset, buff, 0);
6868
translog_free_link(direct_link);
6874
@brief Read record header and some fixed part of a record (the part depend
6877
@param scan scanner position to read
6878
@param buff log record header buffer
6879
@param move_scanner request to move scanner to the header position
6881
@note Some type of record can be read completely by this call
6882
@note "Decoded" header stored in TRANSLOG_HEADER_BUFFER::header (relative
6883
LSN can be translated to absolute one), some fields can be added (like
6884
actual header length in the record if the header has variable length)
6886
@return Length of header or operation status
6887
@retval RECHEADER_READ_ERROR error
6888
@retval # number of bytes in
6889
TRANSLOG_HEADER_BUFFER::header where the
6890
decoded part of the header is stored
6893
int translog_read_record_header_scan(TRANSLOG_SCANNER_DATA *scanner,
6894
TRANSLOG_HEADER_BUFFER *buff,
6895
my_bool move_scanner)
6897
translog_size_t res;
6898
DBUG_ENTER("translog_read_record_header_scan");
6899
DBUG_PRINT("enter", ("Scanner: Cur: (%lu,0x%lx) Hrz: (%lu,0x%lx) "
6900
"Lst: (%lu,0x%lx) Offset: %u(%x) fixed %d",
6901
LSN_IN_PARTS(scanner->page_addr),
6902
LSN_IN_PARTS(scanner->horizon),
6903
LSN_IN_PARTS(scanner->last_file_page),
6904
(uint) scanner->page_offset,
6905
(uint) scanner->page_offset, scanner->fixed_horizon));
6906
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6907
translog_status == TRANSLOG_READONLY);
6909
buff->lsn= scanner->page_addr;
6910
buff->lsn+= scanner->page_offset; /* offset increasing */
6911
res= translog_read_record_header_from_buffer(scanner->page,
6912
scanner->page_offset,
6921
@brief Read record header and some fixed part of the next record (the part
6922
depends on record type).
6924
@param scanner scanner data; scanning continues from the position
6925
stored in it.
6926
The scanner must not be NULL (it is dereferenced without a check).
6928
@param buff log record header buffer
6930
@return Length of header or operation status
6931
@retval RECHEADER_READ_ERROR error
6932
@retval RECHEADER_READ_EOF EOF
6933
@retval # number of bytes in
6934
TRANSLOG_HEADER_BUFFER::header where
6935
the decoded part of the header is stored
6938
int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
6939
TRANSLOG_HEADER_BUFFER *buff)
6941
translog_size_t res;
6943
DBUG_ENTER("translog_read_next_record_header");
6944
buff->groups_no= 0; /* to be sure that we will free it right */
6945
DBUG_PRINT("enter", ("scanner: 0x%lx", (ulong) scanner));
6946
DBUG_PRINT("info", ("Scanner: Cur: (%lu,0x%lx) Hrz: (%lu,0x%lx) "
6947
"Lst: (%lu,0x%lx) Offset: %u(%x) fixed: %d",
6948
LSN_IN_PARTS(scanner->page_addr),
6949
LSN_IN_PARTS(scanner->horizon),
6950
LSN_IN_PARTS(scanner->last_file_page),
6951
(uint) scanner->page_offset,
6952
(uint) scanner->page_offset, scanner->fixed_horizon));
6953
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
6954
translog_status == TRANSLOG_READONLY);
6958
if (translog_get_next_chunk(scanner))
6959
DBUG_RETURN(RECHEADER_READ_ERROR);
6960
if (scanner->page == END_OF_LOG)
6962
DBUG_PRINT("info", ("End of file from the scanner"));
6963
/* Last record was read */
6964
buff->lsn= LSN_IMPOSSIBLE;
6965
DBUG_RETURN(RECHEADER_READ_EOF);
6967
DBUG_PRINT("info", ("Page: (%lu,0x%lx) offset: %lu byte: %x",
6968
LSN_IN_PARTS(scanner->page_addr),
6969
(ulong) scanner->page_offset,
6970
(uint) scanner->page[scanner->page_offset]));
6971
} while (!translog_is_LSN_chunk(scanner->page[scanner->page_offset]) &&
6972
scanner->page[scanner->page_offset] != TRANSLOG_FILLER);
6974
if (scanner->page[scanner->page_offset] == TRANSLOG_FILLER)
6976
DBUG_PRINT("info", ("End of file"));
6977
/* Last record was read */
6978
buff->lsn= LSN_IMPOSSIBLE;
6979
/* Return 'end of log' marker */
6980
res= RECHEADER_READ_EOF;
6983
res= translog_read_record_header_scan(scanner, buff, 0);
6989
Moves record data reader to the next chunk and fills in the data reader
6990
information about that chunk.
6993
translog_record_read_next_chunk()
7001
static my_bool translog_record_read_next_chunk(TRANSLOG_READER_DATA *data)
7003
translog_size_t new_current_offset= data->current_offset + data->chunk_size;
7004
uint16 chunk_header_len, chunk_len;
7006
DBUG_ENTER("translog_record_read_next_chunk");
7010
DBUG_PRINT("info", ("end of the record flag set"));
7014
if (data->header.groups_no &&
7015
data->header.groups_no - 1 != data->current_group &&
7016
data->header.groups[data->current_group].num == data->current_chunk)
7018
/* Goto next group */
7019
data->current_group++;
7020
data->current_chunk= 0;
7021
DBUG_PRINT("info", ("skip to group: #%u", data->current_group));
7022
translog_destroy_scanner(&data->scanner);
7023
translog_scanner_init(data->header.groups[data->current_group].addr,
7024
1, &data->scanner, 1);
7028
data->current_chunk++;
7029
if (translog_get_next_chunk(&data->scanner))
7031
if (data->scanner.page == END_OF_LOG)
7034
Actually it should not happen, but we want to quit nicely in case
7040
type= data->scanner.page[data->scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
7042
if (type == TRANSLOG_CHUNK_LSN && data->header.groups_no)
7045
("Last chunk: data len: %u offset: %u group: %u of %u",
7046
data->header.chunk0_data_len, data->scanner.page_offset,
7047
data->current_group, data->header.groups_no - 1));
7048
DBUG_ASSERT(data->header.groups_no - 1 == data->current_group);
7049
DBUG_ASSERT(data->header.lsn ==
7050
data->scanner.page_addr + data->scanner.page_offset);
7051
translog_destroy_scanner(&data->scanner);
7052
translog_scanner_init(data->header.chunk0_data_addr, 1, &data->scanner, 1);
7053
data->chunk_size= data->header.chunk0_data_len;
7054
data->body_offset= data->scanner.page_offset;
7055
data->current_offset= new_current_offset;
7060
if (type == TRANSLOG_CHUNK_LSN || type == TRANSLOG_CHUNK_FIXED)
7063
DBUG_RETURN(1); /* End of record */
7067
translog_get_chunk_header_length(data->scanner.page +
7068
data->scanner.page_offset);
7069
chunk_len= translog_get_total_chunk_length(data->scanner.page,
7070
data->scanner.page_offset);
7071
data->chunk_size= chunk_len - chunk_header_len;
7072
data->body_offset= data->scanner.page_offset + chunk_header_len;
7073
data->current_offset= new_current_offset;
7074
DBUG_PRINT("info", ("grp: %u chunk: %u body_offset: %u chunk_size: %u "
7075
"current_offset: %lu",
7076
(uint) data->current_group,
7077
(uint) data->current_chunk,
7078
(uint) data->body_offset,
7079
(uint) data->chunk_size, (ulong) data->current_offset));
7085
Initialize record reader data from LSN
7088
translog_init_reader_data()
7089
lsn reference to LSN we should start from
7090
data reader data to initialize
7097
static my_bool translog_init_reader_data(LSN lsn,
7098
TRANSLOG_READER_DATA *data)
7101
DBUG_ENTER("translog_init_reader_data");
7102
if (translog_scanner_init(lsn, 1, &data->scanner, 1) ||
7104
translog_read_record_header_scan(&data->scanner, &data->header, 1))
7105
== RECHEADER_READ_ERROR))
7107
data->read_header= read_header;
7108
data->body_offset= data->header.non_header_data_start_offset;
7109
data->chunk_size= data->header.non_header_data_len;
7110
data->current_offset= data->read_header;
7111
data->current_group= 0;
7112
data->current_chunk= 0;
7114
DBUG_PRINT("info", ("read_header: %u "
7115
"body_offset: %u chunk_size: %u current_offset: %lu",
7116
(uint) data->read_header,
7117
(uint) data->body_offset,
7118
(uint) data->chunk_size, (ulong) data->current_offset));
7124
@brief Destroy reader data object
7127
/*
  Tears down a TRANSLOG_READER_DATA: destroys the scanner embedded in it and
  then frees the record header held in the same structure.
*/
static void translog_destroy_reader_data(TRANSLOG_READER_DATA *data)
7129
/* Both calls operate on members of *data; data itself is not freed here */
translog_destroy_scanner(&data->scanner);
7130
translog_free_record_header(&data->header);
7135
Read a part of the record.
7138
translog_read_record()
7139
lsn log record serial number (address of the record)
7140
offset Offset from the beginning of the record (the beginning is read
7141
by translog_read_record_header).
7142
length Length of record part which has to be read.
7143
buffer Buffer to read the record part into (has to be at
7144
least 'length' bytes long)
7147
length of data actually read
7150
translog_size_t translog_read_record(LSN lsn,
7151
translog_size_t offset,
7152
translog_size_t length,
7154
TRANSLOG_READER_DATA *data)
7156
translog_size_t requested_length= length;
7157
translog_size_t end= offset + length;
7158
TRANSLOG_READER_DATA internal_data;
7159
DBUG_ENTER("translog_read_record");
7160
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
7161
translog_status == TRANSLOG_READONLY);
7165
DBUG_ASSERT(lsn != LSN_IMPOSSIBLE);
7166
data= &internal_data;
7169
(offset < data->current_offset &&
7170
!(offset < data->read_header && offset + length < data->read_header)))
7172
if (translog_init_reader_data(lsn, data))
7175
DBUG_PRINT("info", ("Offset: %lu length: %lu "
7176
"Scanner: Cur: (%lu,0x%lx) Hrz: (%lu,0x%lx) "
7177
"Lst: (%lu,0x%lx) Offset: %u(%x) fixed: %d",
7178
(ulong) offset, (ulong) length,
7179
LSN_IN_PARTS(data->scanner.page_addr),
7180
LSN_IN_PARTS(data->scanner.horizon),
7181
LSN_IN_PARTS(data->scanner.last_file_page),
7182
(uint) data->scanner.page_offset,
7183
(uint) data->scanner.page_offset,
7184
data->scanner.fixed_horizon));
7185
if (offset < data->read_header)
7187
uint16 len= min(data->read_header, end) - offset;
7189
("enter header offset: %lu length: %lu",
7190
(ulong) offset, (ulong) length));
7191
memcpy(buffer, data->header.header + offset, len);
7195
translog_destroy_reader_data(data);
7196
DBUG_RETURN(requested_length);
7201
("len: %u offset: %lu curr: %lu length: %lu",
7202
len, (ulong) offset, (ulong) data->current_offset,
7205
/* TODO: find first page which we should read by offset */
7207
/* read the record chunk by chunk */
7210
uint page_end= data->current_offset + data->chunk_size;
7212
("enter body offset: %lu curr: %lu "
7213
"length: %lu page_end: %lu",
7214
(ulong) offset, (ulong) data->current_offset, (ulong) length,
7216
if (offset < page_end)
7218
uint len= page_end - offset;
7219
set_if_smaller(len, length); /* in case we read beyond record's end */
7220
DBUG_ASSERT(offset >= data->current_offset);
7222
data->scanner.page + data->body_offset +
7223
(offset - data->current_offset), len);
7227
translog_destroy_reader_data(data);
7228
DBUG_RETURN(requested_length);
7233
("len: %u offset: %lu curr: %lu length: %lu",
7234
len, (ulong) offset, (ulong) data->current_offset,
7237
if (translog_record_read_next_chunk(data))
7239
translog_destroy_reader_data(data);
7240
DBUG_RETURN(requested_length - length);
7247
@brief Force skipping to the next buffer
7249
@todo Do not copy old page content if all page protections are switched off
7250
(because we do not need to calculate anything or change old parts of the page)
7253
static void translog_force_current_buffer_to_finish()
7255
TRANSLOG_ADDRESS new_buff_beginning;
7256
uint16 old_buffer_no= log_descriptor.bc.buffer_no;
7257
uint16 new_buffer_no= (old_buffer_no + 1) % TRANSLOG_BUFFERS_NO;
7258
struct st_translog_buffer *new_buffer= (log_descriptor.buffers +
7260
struct st_translog_buffer *old_buffer= log_descriptor.bc.buffer;
7261
uchar *data= log_descriptor.bc.ptr - log_descriptor.bc.current_page_fill;
7262
uint16 left= TRANSLOG_PAGE_SIZE - log_descriptor.bc.current_page_fill;
7263
uint16 current_page_fill, write_counter, previous_offset;
7264
DBUG_ENTER("translog_force_current_buffer_to_finish");
7265
DBUG_PRINT("enter", ("Buffer #%u 0x%lx "
7266
"Buffer addr: (%lu,0x%lx) "
7267
"Page addr: (%lu,0x%lx) "
7268
"size: %lu (%lu) Pg: %u left: %u in progress %u",
7269
(uint) log_descriptor.bc.buffer_no,
7270
(ulong) log_descriptor.bc.buffer,
7271
LSN_IN_PARTS(log_descriptor.bc.buffer->offset),
7272
(ulong) LSN_FILE_NO(log_descriptor.horizon),
7273
(ulong) (LSN_OFFSET(log_descriptor.horizon) -
7274
log_descriptor.bc.current_page_fill),
7275
(ulong) log_descriptor.bc.buffer->size,
7276
(ulong) (log_descriptor.bc.ptr -log_descriptor.bc.
7278
(uint) log_descriptor.bc.current_page_fill,
7280
(uint) log_descriptor.bc.buffer->
7281
copy_to_buffer_in_progress));
7282
translog_lock_assert_owner();
7283
LINT_INIT(current_page_fill);
7284
new_buff_beginning= log_descriptor.bc.buffer->offset;
7285
new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */
7287
DBUG_ASSERT(log_descriptor.bc.ptr !=NULL);
7288
DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) ==
7289
LSN_FILE_NO(log_descriptor.bc.buffer->offset));
7290
translog_check_cursor(&log_descriptor.bc);
7291
DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
7295
TODO: if 'left' is so small that it can't hold any other record
7296
then do not move the page
7298
DBUG_PRINT("info", ("left: %u", (uint) left));
7300
/* decrease offset */
7301
new_buff_beginning-= log_descriptor.bc.current_page_fill;
7302
current_page_fill= log_descriptor.bc.current_page_fill;
7304
memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left);
7305
log_descriptor.bc.buffer->size+= left;
7306
DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx "
7308
(uint) log_descriptor.bc.buffer->buffer_no,
7309
(ulong) log_descriptor.bc.buffer,
7310
(ulong) log_descriptor.bc.buffer->size));
7311
DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no ==
7312
log_descriptor.bc.buffer_no);
7316
log_descriptor.bc.current_page_fill= 0;
7319
translog_buffer_lock(new_buffer);
7322
TRANSLOG_ADDRESS offset= new_buffer->offset;
7323
TRANSLOG_FILE *file= new_buffer->file;
7324
uint8 ver= new_buffer->ver;
7325
translog_lock_assert_owner();
7327
translog_wait_for_buffer_free(new_buffer);
7329
/* We keep the handler locked so nobody can start this new buffer */
7330
DBUG_ASSERT(offset == new_buffer->offset && new_buffer->file == NULL &&
7331
(file == NULL ? ver : (uint8)(ver + 1)) == new_buffer->ver);
7335
write_counter= log_descriptor.bc.write_counter;
7336
previous_offset= log_descriptor.bc.previous_offset;
7337
translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
7338
/* Fix buffer offset (which was incorrectly set to horizon) */
7339
log_descriptor.bc.buffer->offset= new_buff_beginning;
7340
log_descriptor.bc.write_counter= write_counter;
7341
log_descriptor.bc.previous_offset= previous_offset;
7344
Advance this log pointer, increase writers and let other threads
7345
write to the log while we process the old page content
7349
log_descriptor.bc.ptr+= current_page_fill;
7350
log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill=
7352
new_buffer->overlay= old_buffer;
7355
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
7356
translog_buffer_increase_writers(new_buffer);
7357
translog_buffer_unlock(new_buffer);
7360
We have to wait until all writers finish before we start changing the
7361
pages by applying protection and copying the page content in the
7366
TRANSLOG_ADDRESS offset= old_buffer->offset;
7367
TRANSLOG_FILE *file= old_buffer->file;
7368
uint8 ver= old_buffer->ver;
7371
Now only one thread can flush the log (a buffer can be flushed by many
7372
threads, but the log flush that uses this function is done by only one thread)
7373
so no other thread can set is_closing_buffer.
7375
DBUG_ASSERT(!old_buffer->is_closing_buffer);
7376
old_buffer->is_closing_buffer= 1; /* Other flushes will wait */
7377
DBUG_PRINT("enter", ("Buffer #%u 0x%lx is_closing_buffer set",
7378
(uint) old_buffer->buffer_no, (ulong) old_buffer));
7379
translog_wait_for_writers(old_buffer);
7381
/* We blocked flushing this buffer so the buffer should not have changed */
7382
DBUG_ASSERT(offset == old_buffer->offset && file == old_buffer->file &&
7383
ver == old_buffer->ver);
7387
if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION)
7389
translog_put_sector_protection(data, &log_descriptor.bc);
7392
log_descriptor.bc.write_counter++;
7393
log_descriptor.bc.previous_offset= current_page_fill;
7397
DBUG_PRINT("info", ("drop write_counter"));
7398
log_descriptor.bc.write_counter= 0;
7399
log_descriptor.bc.previous_offset= 0;
7403
if (log_descriptor.flags & TRANSLOG_PAGE_CRC)
7405
uint32 crc= translog_crc(data + log_descriptor.page_overhead,
7406
TRANSLOG_PAGE_SIZE -
7407
log_descriptor.page_overhead);
7408
DBUG_PRINT("info", ("CRC: 0x%lx", (ulong) crc));
7409
int4store(data + 3 + 3 + 1, crc);
7411
old_buffer->is_closing_buffer= 0;
7412
DBUG_PRINT("enter", ("Buffer #%u 0x%lx is_closing_buffer cleared",
7413
(uint) old_buffer->buffer_no, (ulong) old_buffer));
7414
pthread_cond_broadcast(&old_buffer->waiting_filling_buffer);
7419
TODO: do not copy beginning of the page if we have no CRC or sector
7422
memcpy(new_buffer->buffer, data, current_page_fill);
7424
old_buffer->next_buffer_offset= new_buffer->offset;
7426
translog_buffer_lock(new_buffer);
7427
translog_buffer_decrease_writers(new_buffer);
7428
translog_buffer_unlock(new_buffer);
7435
@brief Waits until the given lsn has been flushed
7437
@param lsn log record serial number up to which (inclusive)
7438
the log has to be flushed
7441
/*
  Blocks the caller until log_descriptor.flushed is no longer below lsn.
  The caller must already hold log_descriptor.log_flush_lock (asserted below).
*/
void translog_flush_wait_for_end(LSN lsn)
7443
DBUG_ENTER("translog_flush_wait_for_end");
7444
DBUG_PRINT("enter", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
7445
safe_mutex_assert_owner(&log_descriptor.log_flush_lock);
7446
/*
  Re-test the predicate after every wakeup: log_flush_cond is also waited on
  for flush_in_progress, so a wakeup alone does not mean lsn is flushed.
  pthread_cond_wait() releases log_flush_lock while blocked and holds it
  again on return.
*/
while (cmp_translog_addr(log_descriptor.flushed, lsn) < 0)
7447
pthread_cond_wait(&log_descriptor.log_flush_cond,
7448
&log_descriptor.log_flush_lock);
7454
@brief Sets goal for the next flush pass and waits for the end of the running pass.
7456
@param lsn log record serial number up to which (inclusive)
7457
the log has to be flushed
7460
/*
  Registers lsn as the goal of the next flush pass if it is above the goal
  already recorded, then sleeps on log_flush_cond for as long as
  log_descriptor.flush_in_progress is set.
  The caller must already hold log_descriptor.log_flush_lock (asserted below).
*/
void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn)
7462
DBUG_ENTER("translog_flush_set_new_goal_and_wait");
7463
DBUG_PRINT("enter", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
7464
safe_mutex_assert_owner(&log_descriptor.log_flush_lock);
7465
/* Only raise the goal; remember which thread asked for the highest LSN */
if (cmp_translog_addr(lsn, log_descriptor.next_pass_max_lsn) > 0)
7467
log_descriptor.next_pass_max_lsn= lsn;
7468
log_descriptor.max_lsn_requester= pthread_self();
7470
/*
  pthread_cond_wait() releases log_flush_lock while blocked and holds it
  again on return, so flush_in_progress is always read under the lock.
*/
while (log_descriptor.flush_in_progress)
7472
pthread_cond_wait(&log_descriptor.log_flush_cond,
7473
&log_descriptor.log_flush_lock);
7480
@brief Flush the log up to given LSN (included)
7482
@param lsn log record serial number up to which (inclusive)
7483
the log has to be flushed
7485
@return Operation status
7491
my_bool translog_flush(TRANSLOG_ADDRESS lsn)
7493
LSN sent_to_disk= LSN_IMPOSSIBLE;
7494
TRANSLOG_ADDRESS flush_horizon;
7496
dirty_buffer_mask_t dirty_buffer_mask;
7497
uint8 last_buffer_no, start_buffer_no;
7499
DBUG_ENTER("translog_flush");
7500
DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
7501
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
7502
translog_status == TRANSLOG_READONLY);
7503
LINT_INIT(sent_to_disk);
7505
pthread_mutex_lock(&log_descriptor.log_flush_lock);
7506
DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
7507
LSN_IN_PARTS(log_descriptor.flushed)));
7508
if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
7510
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
7513
if (log_descriptor.flush_in_progress)
7515
translog_flush_set_new_goal_and_wait(lsn);
7516
if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
7518
/* fix lsn if it was horizon */
7519
if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
7520
lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
7521
translog_flush_wait_for_end(lsn);
7522
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
7525
log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
7527
log_descriptor.flush_in_progress= 1;
7528
flush_horizon= log_descriptor.previous_flush_horizon;
7529
DBUG_PRINT("info", ("flush_in_progress is set"));
7530
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
7533
if (log_descriptor.is_everything_flushed)
7535
DBUG_PRINT("info", ("everything is flushed"));
7536
rc= (translog_status == TRANSLOG_READONLY);
7542
We will recheck the information when we lock the buffers one by
7543
one, so we can use an unprotected read here (this is just to
7544
speed up buffer processing)
7546
dirty_buffer_mask= log_descriptor.dirty_buffer_mask;
7547
DBUG_PRINT("info", ("Dirty buffer mask: %lx current buffer: %u",
7548
(ulong) dirty_buffer_mask,
7549
(uint) log_descriptor.bc.buffer_no));
7550
for (i= (log_descriptor.bc.buffer_no + 1) % TRANSLOG_BUFFERS_NO;
7551
i != log_descriptor.bc.buffer_no && !(dirty_buffer_mask & (1 << i));
7552
i= (i + 1) % TRANSLOG_BUFFERS_NO) {}
7556
("start from: %u current: %u prev last lsn: (%lu,0x%lx)",
7557
(uint) start_buffer_no, (uint) log_descriptor.bc.buffer_no,
7558
LSN_IN_PARTS(log_descriptor.bc.buffer->prev_last_lsn)));
7562
If the LSN up to which we have to flush is bigger than the maximum LSN of the
7563
previous buffer and at least one LSN was saved in the current buffer (last_lsn !=
7564
LSN_IMPOSSIBLE), then we had better finish the current buffer.
7566
if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
7567
log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE)
7569
struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
7570
lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
7571
DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)",
7572
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
7573
last_buffer_no= log_descriptor.bc.buffer_no;
7574
log_descriptor.is_everything_flushed= 1;
7575
translog_force_current_buffer_to_finish();
7576
translog_buffer_unlock(buffer);
7580
last_buffer_no= ((log_descriptor.bc.buffer_no + TRANSLOG_BUFFERS_NO -1) %
7581
TRANSLOG_BUFFERS_NO);
7584
sent_to_disk= translog_get_sent_to_disk();
7585
if (cmp_translog_addr(lsn, sent_to_disk) > 0)
7588
DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u",
7589
(uint) start_buffer_no, (uint) last_buffer_no));
7590
last_buffer_no= (last_buffer_no + 1) % TRANSLOG_BUFFERS_NO;
7594
struct st_translog_buffer *buffer= log_descriptor.buffers + i;
7595
translog_buffer_lock(buffer);
7596
DBUG_PRINT("info", ("Check buffer: 0x%lx #: %u "
7597
"prev last LSN: (%lu,0x%lx) "
7598
"last LSN: (%lu,0x%lx) status: %s",
7601
LSN_IN_PARTS(buffer->prev_last_lsn),
7602
LSN_IN_PARTS(buffer->last_lsn),
7604
"dirty" : "closed")));
7605
if (buffer->prev_last_lsn <= lsn &&
7606
buffer->file != NULL)
7608
DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
7609
flush_horizon= buffer->offset + buffer->size;
7610
translog_buffer_flush(buffer);
7612
translog_buffer_unlock(buffer);
7613
i= (i + 1) % TRANSLOG_BUFFERS_NO;
7614
} while (i != last_buffer_no);
7615
sent_to_disk= translog_get_sent_to_disk();
7618
/* sync files from previous flush till current one */
7619
for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++)
7621
TRANSLOG_FILE *file= get_logfile_by_number(fn);
7622
DBUG_ASSERT(file != NULL);
7625
if (my_sync(file->handler.file, MYF(MY_WME)))
7628
translog_stop_writing();
7629
sent_to_disk= LSN_IMPOSSIBLE;
7636
if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
7637
(LSN_FILE_NO(log_descriptor.previous_flush_horizon) !=
7638
LSN_FILE_NO(flush_horizon) ||
7639
((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) /
7640
TRANSLOG_PAGE_SIZE) !=
7641
((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE)))
7642
rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
7643
log_descriptor.previous_flush_horizon= flush_horizon;
7645
pthread_mutex_lock(&log_descriptor.log_flush_lock);
7646
if (sent_to_disk != LSN_IMPOSSIBLE)
7647
log_descriptor.flushed= sent_to_disk;
7648
log_descriptor.flush_in_progress= 0;
7649
DBUG_PRINT("info", ("flush_in_progress is dropped"));
7650
pthread_mutex_unlock(&log_descriptor.log_flush_lock);\
7651
pthread_cond_broadcast(&log_descriptor.log_flush_cond);
7657
@brief Gives a 2-byte-id to MARIA_SHARE and logs this fact
7659
If a MARIA_SHARE does not yet have a 2-byte-id (unique over all currently
7660
open MARIA_SHAREs), give it one and record this assignment in the log
7661
(LOGREC_FILE_ID log record).
7663
@param tbl_info table
7664
@param trn calling transaction
7666
@return Operation status
7670
@note Can be called even if share already has an id (then will do nothing)
7673
int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn)
7675
MARIA_SHARE *share= tbl_info->s;
7677
If you give an id to a non-BLOCK_RECORD table, you also need to release
7678
this id somewhere. Then you can change the assertion.
7680
DBUG_ASSERT(share->data_file_type == BLOCK_RECORD);
7681
/* re-check under mutex to avoid having 2 ids for the same share */
7682
pthread_mutex_lock(&share->intern_lock);
7683
if (likely(share->id == 0))
7686
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
7687
uchar log_data[FILEID_STORE_SIZE];
7688
/* Inspired by set_short_trid() of trnman.c */
7689
uint i= share->kfile.file % SHARE_ID_MAX + 1;
7692
my_atomic_rwlock_wrlock(&LOCK_id_to_share);
7693
for ( ; i <= SHARE_ID_MAX ; i++) /* the range is [1..SHARE_ID_MAX] */
7696
if (id_to_share[i] == NULL &&
7697
my_atomic_casptr((void **)&id_to_share[i], &tmp, share))
7699
share->id= (uint16)i;
7703
my_atomic_rwlock_wrunlock(&LOCK_id_to_share);
7704
i= 1; /* scan the whole array */
7705
} while (share->id == 0);
7706
DBUG_PRINT("info", ("id_to_share: 0x%lx -> %u", (ulong)share, share->id));
7707
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
7708
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
7710
open_file_name is an unresolved name (symlinks are not resolved, datadir
7711
is not realpath-ed, etc) which is good: the log can be moved to another
7712
directory and continue working.
7714
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= share->open_file_name;
7716
@todo if we had the name's length in MARIA_SHARE we could avoid this
7719
log_array[TRANSLOG_INTERNAL_PARTS + 1].length=
7720
strlen(share->open_file_name) + 1;
7721
if (unlikely(translog_write_record(&lsn, LOGREC_FILE_ID, trn, tbl_info,
7724
log_array[TRANSLOG_INTERNAL_PARTS +
7726
sizeof(log_array)/sizeof(log_array[0]),
7727
log_array, log_data, NULL)))
7729
pthread_mutex_unlock(&share->intern_lock);
7733
pthread_mutex_unlock(&share->intern_lock);
7739
@brief Recycles a MARIA_SHARE's short id.
7743
@note Must be called only if share has an id (i.e. id != 0)
7746
/*
  Clears the id_to_share slot indexed by share->id and resets
  share->lsn_of_file_id. The caller must hold share->intern_lock
  (asserted below).
*/
void translog_deassign_id_from_share(MARIA_SHARE *share)
7748
DBUG_PRINT("info", ("id_to_share: 0x%lx id %u -> 0",
7749
(ulong)share, share->id));
7751
We don't need any mutex as we are called only when closing the last
7752
instance of the table or at the end of REPAIR: no writes can be
7753
happening. But a Checkpoint may be reading share->id, so we require this
7756
safe_mutex_assert_owner(&share->intern_lock);
7757
/* Empty this share's slot with an atomic pointer store */
my_atomic_rwlock_rdlock(&LOCK_id_to_share);
7758
my_atomic_storeptr((void **)&id_to_share[share->id], 0);
7759
my_atomic_rwlock_rdunlock(&LOCK_id_to_share);
7761
/* Not strictly needed, but reset for safety: */
7762
share->lsn_of_file_id= LSN_IMPOSSIBLE;
7766
/*
  Registers share in id_to_share under the given id and stores that id in
  the share. The assertions restrict this to single-threaded recovery, to a
  BLOCK_RECORD share that has no id yet, and to a slot that is still free.
*/
void translog_assign_id_to_share_from_recovery(MARIA_SHARE *share,
7769
DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
7770
DBUG_ASSERT(share->data_file_type == BLOCK_RECORD);
7771
DBUG_ASSERT(share->id == 0);
7772
DBUG_ASSERT(id_to_share[id] == NULL);
7773
/* One expression: share->id becomes id, and that value indexes the table */
id_to_share[share->id= id]= share;
7778
@brief Checks whether a log file with the given number exists
7780
@param file_no number of the file to test
7782
@retval 0 no such file
7783
@retval 1 there is a file with such number
7786
/*
  Builds the name of log file file_no and reports whether my_stat() succeeds
  on it.
*/
my_bool translog_is_file(uint file_no)
7789
char path[FN_REFLEN];
7790
/*
  translog_filename_by_fileno() writes the name into path; MYF(0) passes no
  flags to my_stat(). test() turns the my_stat() result into the my_bool
  answer.
*/
return (test(my_stat(translog_filename_by_fileno(file_no, path),
7791
&stat_buff, MYF(0))));
7796
@brief returns minimum log file number
7798
@param horizon the end of the log
7799
@param is_protected true if it is under purge_log protection
7801
@retval minimum file number
7802
@retval 0 no files found
7805
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
7807
uint min_file= 0, max_file;
7808
DBUG_ENTER("translog_first_file");
7810
pthread_mutex_lock(&log_descriptor.purger_lock);
7811
if (log_descriptor.min_file_number &&
7812
translog_is_file(log_descriptor.min_file_number))
7814
DBUG_PRINT("info", ("cached %lu",
7815
(ulong) log_descriptor.min_file_number));
7817
pthread_mutex_unlock(&log_descriptor.purger_lock);
7818
DBUG_RETURN(log_descriptor.min_file_number);
7821
max_file= LSN_FILE_NO(horizon);
7823
/* binary search for last file */
7824
while (min_file != max_file && min_file != (max_file - 1))
7826
uint test= (min_file + max_file) / 2;
7827
DBUG_PRINT("info", ("min_file: %u test: %u max_file: %u",
7828
min_file, test, max_file));
7829
if (test == max_file)
7831
if (translog_is_file(test))
7836
log_descriptor.min_file_number= max_file;
7838
pthread_mutex_unlock(&log_descriptor.purger_lock);
7839
DBUG_PRINT("info", ("first file :%lu", (ulong) max_file));
7840
DBUG_ASSERT(max_file >= 1);
7841
DBUG_RETURN(max_file);
7846
@brief Returns the closest LSN higher than the given chunk address
7848
@param addr the chunk address to start from
7849
@param horizon the horizon if it is known or LSN_IMPOSSIBLE
7851
@retval LSN_ERROR Error
7852
@retval LSN_IMPOSSIBLE no LSNs after the address
7853
@retval # the closest LSN higher than the given chunk address
7856
LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
7858
TRANSLOG_SCANNER_DATA scanner;
7860
DBUG_ENTER("translog_next_LSN");
7862
if (horizon == LSN_IMPOSSIBLE)
7863
horizon= translog_get_horizon();
7865
if (addr == horizon)
7866
DBUG_RETURN(LSN_IMPOSSIBLE);
7868
translog_scanner_init(addr, 0, &scanner, 1);
7870
addr can point not to a chunk beginning but page end so next
7873
if (addr % TRANSLOG_PAGE_SIZE == 0)
7876
We are emulating the page end which caused such a horizon value to
7877
trigger translog_scanner_eop().
7879
We can't just increase addr by the page header overhead because it
7880
can be the file end, so we let translog_get_next_chunk() skip
7881
to the next page in the correct way
7883
scanner.page_addr-= TRANSLOG_PAGE_SIZE;
7884
scanner.page_offset= TRANSLOG_PAGE_SIZE;
7886
scanner.page= NULL; /* prevent using incorrect page content */
7889
/* addr can point not to a chunk beginning but to a page end */
7890
if (translog_scanner_eop(&scanner))
7892
if (translog_get_next_chunk(&scanner))
7897
if (scanner.page == END_OF_LOG)
7899
result= LSN_IMPOSSIBLE;
7904
while (!translog_is_LSN_chunk(scanner.page[scanner.page_offset]) &&
7905
scanner.page[scanner.page_offset] != TRANSLOG_FILLER)
7907
if (translog_get_next_chunk(&scanner))
7912
if (scanner.page == END_OF_LOG)
7914
result= LSN_IMPOSSIBLE;
7919
if (scanner.page[scanner.page_offset] == TRANSLOG_FILLER)
7920
result= LSN_IMPOSSIBLE; /* reached page filler */
7922
result= scanner.page_addr + scanner.page_offset;
7924
translog_destroy_scanner(&scanner);
7925
DBUG_RETURN(result);
7930
@brief returns the LSN of the first record starting in this log
7932
@retval LSN_ERROR Error
7933
@retval LSN_IMPOSSIBLE no log or the log is empty
7934
@retval # LSN of the first record
7937
/*
  Finds the first existing log file, reads its first page, moves to the first
  chunk of that page and lets translog_next_LSN() scan from there up to the
  horizon sampled on entry.
*/
LSN translog_first_lsn_in_log()
7939
TRANSLOG_ADDRESS addr, horizon= translog_get_horizon();
7940
TRANSLOG_VALIDATOR_DATA data;
7942
uint16 chunk_offset;
7944
DBUG_ENTER("translog_first_lsn_in_log");
7945
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(horizon)));
7946
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
7947
translog_status == TRANSLOG_READONLY);
7949
/* A zero file number from translog_first_file() is handled as "no log" */
if (!(file= translog_first_file(horizon, 0)))
7951
/* log has no records yet */
7952
DBUG_RETURN(LSN_IMPOSSIBLE);
7955
addr= MAKE_LSN(file, TRANSLOG_PAGE_SIZE); /* the first page of the file */
7958
TRANSLOG_PAGE_SIZE_BUFF psize_buff;
7959
/* A page that cannot be read, or a first-chunk offset of 0, is an error */
if ((page= translog_get_page(&data, psize_buff.buffer, NULL)) == NULL ||
7960
(chunk_offset= translog_get_first_chunk_offset(page)) == 0)
7961
DBUG_RETURN(LSN_ERROR);
7963
/* Step from the page start to its first chunk before scanning for an LSN */
addr+= chunk_offset;
7965
DBUG_RETURN(translog_next_LSN(addr, horizon));
7970
@brief Returns theoretical first LSN if first log is present
7972
@retval LSN_ERROR Error
7973
@retval LSN_IMPOSSIBLE no log
7974
@retval # LSN of the first record
7977
/*
  Computes the address right after the page header of the first page of log
  file 1, i.e. where the first record of the log would start. Log file 1 must
  exist, otherwise LSN_IMPOSSIBLE is returned.
*/
LSN translog_first_theoretical_lsn()
7979
TRANSLOG_ADDRESS addr= translog_get_horizon();
7980
TRANSLOG_PAGE_SIZE_BUFF psize_buff;
7982
TRANSLOG_VALIDATOR_DATA data;
7983
DBUG_ENTER("translog_first_theoretical_lsn");
7984
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr)));
7985
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
7986
translog_status == TRANSLOG_READONLY);
7988
if (!translog_is_file(1))
7989
DBUG_RETURN(LSN_IMPOSSIBLE);
7990
/*
  Horizon still at the start of the first page of file 1: use the header
  size currently configured in log_descriptor.page_overhead.
*/
if (addr == MAKE_LSN(1, TRANSLOG_PAGE_SIZE))
7992
/* log has no records yet */
7993
DBUG_RETURN(MAKE_LSN(1, TRANSLOG_PAGE_SIZE +
7994
log_descriptor.page_overhead));
7997
addr= MAKE_LSN(1, TRANSLOG_PAGE_SIZE); /* the first page of the file */
7999
if ((page= translog_get_page(&data, psize_buff.buffer, NULL)) == NULL)
8000
DBUG_RETURN(LSN_ERROR);
8002
/* Otherwise the header size is looked up from the flags byte of the page read */
DBUG_RETURN(MAKE_LSN(1, TRANSLOG_PAGE_SIZE +
8003
page_overhead[page[TRANSLOG_PAGE_FLAGS]]));
8008
@brief Checks the given low-water mark and purges files if needed
8010
@param low the last (minimum) address which is need
8016
my_bool translog_purge(TRANSLOG_ADDRESS low)
8018
uint32 last_need_file= LSN_FILE_NO(low);
8019
TRANSLOG_ADDRESS horizon= translog_get_horizon();
8021
DBUG_ENTER("translog_purge");
8022
DBUG_PRINT("enter", ("low: (%lu,0x%lx)", LSN_IN_PARTS(low)));
8023
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
8024
translog_status == TRANSLOG_READONLY);
8026
pthread_mutex_lock(&log_descriptor.purger_lock);
8027
if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file)
8030
uint32 min_file= translog_first_file(horizon, 1);
8031
DBUG_ASSERT(min_file != 0); /* log is already started */
8032
for(i= min_file; i < last_need_file && rc == 0; i++)
8034
LSN lsn= translog_get_file_max_lsn_stored(i);
8035
if (lsn == LSN_IMPOSSIBLE)
8036
break; /* files are still in writing */
8037
if (lsn == LSN_ERROR)
8042
if (cmp_translog_addr(lsn, low) >= 0)
8045
DBUG_PRINT("info", ("purge file %lu", (ulong) i));
8047
/* remove file descriptor from the cache */
8049
log_descriptor.min_file can be changed only here during execution
8050
and the function is serialized, so we can access it without problems
8052
if (i >= log_descriptor.min_file)
8054
TRANSLOG_FILE *file;
8055
rw_wrlock(&log_descriptor.open_files_lock);
8056
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
8057
log_descriptor.open_files.elements);
8058
DBUG_ASSERT(log_descriptor.min_file == i);
8059
file= *((TRANSLOG_FILE **)pop_dynamic(&log_descriptor.open_files));
8060
DBUG_PRINT("info", ("Files : %d", log_descriptor.open_files.elements));
8061
DBUG_ASSERT(i == file->number);
8062
log_descriptor.min_file++;
8063
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
8064
log_descriptor.open_files.elements);
8065
rw_unlock(&log_descriptor.open_files_lock);
8066
translog_close_log_file(file);
8068
if (log_purge_type == TRANSLOG_PURGE_IMMIDIATE)
8070
char path[FN_REFLEN], *file_name;
8071
file_name= translog_filename_by_fileno(i, path);
8072
rc= test(my_delete(file_name, MYF(MY_WME)));
8075
if (unlikely(rc == 1))
8076
log_descriptor.min_need_file= 0; /* impossible value */
8078
log_descriptor.min_need_file= i;
8081
pthread_mutex_unlock(&log_descriptor.purger_lock);
8087
@brief Purges files by stored min need file in case of
8088
"ondemend" purge type
8090
@note This function do real work only if it is "ondemend" purge type
8091
and translog_purge() was called at least once and last time without
8098
my_bool translog_purge_at_flush()
8102
DBUG_ENTER("translog_purge_at_flush");
8103
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
8104
translog_status == TRANSLOG_READONLY);
8106
if (unlikely(translog_status == TRANSLOG_READONLY))
8108
DBUG_PRINT("info", ("The log is read only => exit"));
8112
if (log_purge_type != TRANSLOG_PURGE_ONDEMAND)
8114
DBUG_PRINT("info", ("It is not \"at_flush\" => exit"));
8118
pthread_mutex_lock(&log_descriptor.purger_lock);
8120
if (unlikely(log_descriptor.min_need_file == 0))
8122
DBUG_PRINT("info", ("No info about min need file => exit"));
8123
pthread_mutex_unlock(&log_descriptor.purger_lock);
8127
min_file= translog_first_file(translog_get_horizon(), 1);
8128
DBUG_ASSERT(min_file != 0); /* log is already started */
8129
for(i= min_file; i < log_descriptor.min_need_file && rc == 0; i++)
8131
char path[FN_REFLEN], *file_name;
8132
DBUG_PRINT("info", ("purge file %lu\n", (ulong) i));
8133
file_name= translog_filename_by_fileno(i, path);
8134
rc= test(my_delete(file_name, MYF(MY_WME)));
8137
pthread_mutex_unlock(&log_descriptor.purger_lock);
8143
@brief Gets min file number
8145
@param horizon the end of the log
8147
@retval minimum file number
8148
@retval 0 no files found
8151
uint32 translog_get_first_file(TRANSLOG_ADDRESS horizon)
8153
return translog_first_file(horizon, 0);
8158
@brief Gets min file number which is needed
8160
@retval minimum file number
8164
uint32 translog_get_first_needed_file()
8167
pthread_mutex_lock(&log_descriptor.purger_lock);
8168
file_no= log_descriptor.min_need_file;
8169
pthread_mutex_unlock(&log_descriptor.purger_lock);
8175
@brief Gets transaction log file size
8177
@return transaction log file size
8180
uint32 translog_get_file_size()
8184
res= log_descriptor.log_file_max_size;
8191
@brief Sets transaction log file size
8193
@return Returns actually set transaction log size
8196
void translog_set_file_size(uint32 size)
8198
struct st_translog_buffer *old_buffer= NULL;
8199
DBUG_ENTER("translog_set_file_size");
8201
DBUG_PRINT("enter", ("Size: %lu", (ulong) size));
8202
DBUG_ASSERT(size % TRANSLOG_PAGE_SIZE == 0 &&
8203
size >= TRANSLOG_MIN_FILE_SIZE);
8204
log_descriptor.log_file_max_size= size;
8205
/* if current file longer then finish it*/
8206
if (LSN_OFFSET(log_descriptor.horizon) >= log_descriptor.log_file_max_size)
8208
old_buffer= log_descriptor.bc.buffer;
8209
translog_buffer_next(&log_descriptor.horizon, &log_descriptor.bc, 1);
8210
translog_buffer_unlock(old_buffer);
8215
translog_buffer_lock(old_buffer);
8216
translog_buffer_flush(old_buffer);
8217
translog_buffer_unlock(old_buffer);
#ifdef MARIA_DUMP_LOG
/*
  Stand-alone log dump utility: everything from here on is compiled only
  when MARIA_DUMP_LOG is defined.
*/
#include <my_getopt.h>
extern void translog_example_table_init();
static const char *load_default_groups[]= { "maria_dump_log",0 };
static void get_options(int *argc,char * * *argv);
#ifndef DBUG_OFF
/* default trace destination, used when --debug is given without a value */
#if defined(__WIN__)
const char *default_dbug_option= "d:t:i:O,\\maria_dump_log.trace";
#else
const char *default_dbug_option= "d:t:i:o,/tmp/maria_dump_log.trace";
#endif
#endif
static ulonglong opt_offset;            /* --offset: where to start reading */
static ulong opt_pages;                 /* --pages: how many pages to dump */
static const char *opt_file= NULL;      /* --file: log file to read */
static File handler= -1;                /* descriptor of the opened log file */
static my_bool opt_unit= 0;             /* use the unit test record table */
8239
/*
  Command line options of the dump tool, in my_getopt format.

  NOTE(review): this block comes from a damaged extraction. The bare numbers
  are line-number residue and every short line is missing, including the
  opening/closing braces, the preprocessor guards and the first line of the
  two entries whose help texts are "Print chunk body dump" and "Use unit
  test record table ...". 'opt_body' is not declared in the visible part of
  the file, so that entry is presumably compiled out - TODO confirm.
*/
static struct my_option my_long_options[] =
8243
"Print chunk body dump",
8244
(uchar **) &opt_body, (uchar **) &opt_body, 0,
8245
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
8248
{"debug", '#', "Output debug log. Often the argument is 'd:t:o,filename'.",
8249
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
8251
{"file", 'f', "Path to file which will be read",
8252
(uchar**) &opt_file, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
8253
{"help", '?', "Display this help and exit.",
8254
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
8255
{ "offset", 'o', "Start reading log from this offset",
8256
(uchar**) &opt_offset, (uchar**) &opt_offset,
8257
0, GET_ULL, REQUIRED_ARG, 0, 0, ~(longlong) 0, 0, 0, 0 },
8258
{ "pages", 'n', "Number of pages to read",
8259
(uchar**) &opt_pages, (uchar**) &opt_pages, 0,
8260
GET_ULONG, REQUIRED_ARG, (long) ~(ulong) 0,
8261
(long) 1, (long) ~(ulong) 0, (long) 0,
8264
"Use unit test record table (for logs created by unittests",
8265
(uchar **) &opt_unit, (uchar **) &opt_unit, 0,
8266
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
8267
{"version", 'V', "Print version and exit.",
8268
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
8269
/* all-zero entry terminates the table */
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
8273
static void print_version(void)
8275
(void)(printf("%s Ver 1.0 for %s on %s\n",
8276
my_progname_short, SYSTEM_TYPE, MACHINE_TYPE));
8277
NETWARE_SET_SCREEN_MODE(1);
8281
static void usage(void)
8284
puts("Copyright (C) 2008 MySQL AB");
8285
puts("This software comes with ABSOLUTELY NO WARRANTY. This is free software,");
8286
puts("and you are welcome to modify and redistribute it under the GPL license\n");
8288
puts("Dump content of maria log pages.");
8289
(void)(printf("\nUsage: %s -f file OPTIONS\n", my_progname_short));
8290
my_print_help(my_long_options);
8291
print_defaults("my", load_default_groups);
8292
my_print_variables(my_long_options);
8297
get_one_option(int optid __attribute__((unused)),
8298
const struct my_option *opt __attribute__((unused)),
8299
char *argument __attribute__((unused)))
8310
DBUG_SET_INITIAL(argument ? argument : default_dbug_option);
8318
static void get_options(int *argc,char ***argv)
8322
if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option)))
8325
if (opt_file == NULL)
/**
  @brief Dump information about file header page.
*/

/*
  Decodes 'buff' as a log file header with translog_interpret_file_header()
  and prints the decoded fields, warning when the stored page size differs
  from the compiled-in TRANSLOG_PAGE_SIZE.

  NOTE(review): this block comes from a damaged extraction. The bare numbers
  are line-number residue and short lines are missing (braces, the 'strbuff'
  declaration, parts of the format string and most printf arguments), so
  the text below does not compile as it stands.
*/
static void dump_header_page(uchar *buff)
8339
LOGHANDLER_FILE_INFO desc;
8341
translog_interpret_file_header(&desc, buff);
8342
printf(" This can be header page:\n"
8344
" Maria log version: %lu\n"
8345
" Server version: %lu\n"
8348
llstr(desc.timestamp, strbuff),
8353
/* a different page size means later page dumps are not reliable */
if (desc.page_size != TRANSLOG_PAGE_SIZE)
8354
printf(" WARNING: page size is not equal compiled in one %lu!!!\n",
8355
(ulong) TRANSLOG_PAGE_SIZE);
8356
printf(" File number %lu\n"
8357
" Max lsn: (%lu,0x%lx)\n",
8359
LSN_IN_PARTS(desc.max_lsn));
/*
  Printable names of the log record classes, used by the chunk dumper and
  indexed by the 'rclass' member of a record type descriptor.
*/
static const char *record_class_string[]=
{
  "LOGRECTYPE_NOT_ALLOWED",
  "LOGRECTYPE_VARIABLE_LENGTH",
  "LOGRECTYPE_PSEUDOFIXEDLENGTH",
  "LOGRECTYPE_FIXEDLENGTH"
};
/**
  @brief dump information about transaction log chunk

  @param buffer          reference to the whole page
  @param ptr             pointer to the chunk

  @retval #     reference to the next chunk
  @retval NULL  can't interpret data
*/

/*
  Prints a description of the chunk at 'ptr' inside the page 'buffer'.
  The first byte selects the handling: a filler byte, an invalid first byte,
  or one of the four chunk types chosen by (ptr[0] & TRANSLOG_CHUNK_TYPE).
  The chunk length is finally taken from translog_get_total_chunk_length().

  NOTE(review): this block comes from a damaged extraction. The bare numbers
  are line-number residue and every short line is missing (braces, 'else',
  'break', 'return', local declarations, some printf arguments), so the
  exact control flow cannot be read from the text below.
*/
static uchar *dump_chunk(uchar *buffer, uchar *ptr)
8384
/* filler: every byte up to the page end is expected to be filler as well */
if (*ptr == TRANSLOG_FILLER)
8386
printf(" Filler till the page end\n");
8387
for (; ptr < buffer + TRANSLOG_PAGE_SIZE; ptr++)
8389
if (*ptr != TRANSLOG_FILLER)
8391
/* NOTE(review): this warning has no trailing newline, unlike most others */
printf(" WARNING: non filler character met before page end "
8392
"(page + 0x%04x: 0x%02x) (stop interpretation)!!!",
8393
(uint) (ptr - buffer), (uint) ptr[0]);
8399
/* NOTE(review): the message below mentions only 0x0, the test also has 0xFF */
if (*ptr == 0 || *ptr == 0xFF)
8401
printf(" WARNING: chunk can't start from 0x0 "
8402
"(stop interpretation)!!!\n");
8405
switch (ptr[0] & TRANSLOG_CHUNK_TYPE) {
8406
case TRANSLOG_CHUNK_LSN:
8407
printf(" LSN chunk type 0 (variable length)\n");
8408
/* a record type other than TRANSLOG_CHUNK_0_CONT starts a new record */
if (likely((ptr[0] & TRANSLOG_REC_TYPE) != TRANSLOG_CHUNK_0_CONT))
8410
printf(" Record type %u: %s record class %s compressed LSNs: %u\n",
8411
ptr[0] & TRANSLOG_REC_TYPE,
8412
(log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name ?
8413
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name :
8415
record_class_string[log_record_type_descriptor[ptr[0] &
8418
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].
8420
/* only variable length records may be stored in a type 0 chunk */
if (log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].rclass !=
8421
LOGRECTYPE_VARIABLE_LENGTH)
8423
printf(" WARNING: this record class here can't be used "
8424
"(stop interpretation)!!!\n");
8429
printf(" Continuation of previous chunk 0 header \n");
8430
printf(" Short transaction id: %u\n", (uint) uint2korr(ptr + 1));
8432
uchar *hdr_ptr= ptr + 1 + 2; /* chunk type and short trid */
8434
printf (" Record length: %lu\n",
8435
(ulong) translog_variable_record_1group_decode_len(&hdr_ptr));
8436
chunk_len= uint2korr(hdr_ptr);
8438
printf (" It is 1 group record (chunk length == 0)\n");
8443
printf (" Chunk length %u\n", (uint) chunk_len);
8444
groups= uint2korr(hdr_ptr + 2);
8446
printf (" Number of groups left to the end %u:\n", (uint) groups);
8448
/* each group entry is a stored LSN followed by one byte (printed as pages) */
i < groups && hdr_ptr < buffer + TRANSLOG_PAGE_SIZE;
8449
i++, hdr_ptr+= LSN_STORE_SIZE + 1)
8451
TRANSLOG_ADDRESS gpr_addr= lsn_korr(hdr_ptr);
8452
uint pages= hdr_ptr[LSN_STORE_SIZE];
8453
printf (" Group +#%u: (%lu,0x%lx) pages: %u\n",
8454
(uint) i, LSN_IN_PARTS(gpr_addr), pages);
8459
case TRANSLOG_CHUNK_FIXED:
8460
printf(" LSN chunk type 1 (fixed size)\n");
8461
printf(" Record type %u: %s record class %s compressed LSNs: %u\n",
8462
ptr[0] & TRANSLOG_REC_TYPE,
8463
(log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name ?
8464
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name :
8466
record_class_string[log_record_type_descriptor[ptr[0] &
8469
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].
8471
/* only (pseudo) fixed length records may be stored in a type 1 chunk */
if (log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].rclass !=
8472
LOGRECTYPE_PSEUDOFIXEDLENGTH &&
8473
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].rclass !=
8474
LOGRECTYPE_FIXEDLENGTH)
8476
printf(" WARNING: this record class here can't be used "
8477
"(stop interpretation)!!!\n");
8479
printf(" Short transaction id: %u\n", (uint) uint2korr(ptr + 1));
8481
case TRANSLOG_CHUNK_NOHDR:
8482
printf(" No header chunk type 2(till the end of the page)\n");
8483
/* chunk types 2 and 3 carry no record type: any set bit is reported */
if (ptr[0] & TRANSLOG_REC_TYPE)
8485
/* NOTE(review): "dtop" looks like a typo of "stop"; no trailing newline */
printf(" WARNING: chunk header content record type: 0x%02x "
8486
"(dtop interpretation)!!!",
8491
case TRANSLOG_CHUNK_LNGTH:
8492
printf(" Chunk with length type 3\n");
8493
if (ptr[0] & TRANSLOG_REC_TYPE)
8495
/* NOTE(review): same "dtop" typo and missing newline as for type 2 */
printf(" WARNING: chunk header content record type: 0x%02x "
8496
"(dtop interpretation)!!!",
8503
/* the length helper takes a 16-bit offset within the page */
intptr offset= ptr - buffer;
8504
DBUG_ASSERT(offset >= 0 && offset <= UINT_MAX16);
8505
length= translog_get_total_chunk_length(buffer, (uint16)offset);
8507
printf(" Length %u\n", length);
/**
  @brief Dump information about page with data.
*/

/*
  Prints the header of a data page (page number, file number, flags, header
  length), checks the page CRC and the sector protection table when the
  corresponding flags are set, then passes every chunk of the page to
  dump_chunk() until it returns NULL or the page end is reached.

  NOTE(review): this block comes from a damaged extraction. The bare numbers
  are line-number residue and every short line is missing (braces, 'else',
  'return', local declarations, some tests and printf arguments).
*/
static void dump_datapage(uchar *buffer)
8523
/* NOTE(review): %ld is used with (ulong) arguments; %lu would match */
printf(" Page: %ld File number: %ld\n",
8524
(ulong) (page= uint3korr(buffer)),
8525
(ulong) (file= uint3korr(buffer + 3)));
8527
printf(" WARNING: page == 0!!!\n");
8529
printf(" WARNING: file == 0!!!\n");
8530
/* offset of this page inside its file, used for the chunk addresses */
offset= page * TRANSLOG_PAGE_SIZE;
8531
printf(" Flags (0x%x):\n", (uint) buffer[TRANSLOG_PAGE_FLAGS]);
8532
if (buffer[TRANSLOG_PAGE_FLAGS])
8534
if (buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_PAGE_CRC)
8535
printf(" Page CRC\n");
8536
if (buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_SECTOR_PROTECTION)
8537
printf(" Sector protection\n");
8538
if (buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_RECORD_CRC)
8539
printf(" Record CRC (WARNING: not yet implemented!!!)\n");
8540
/* any bit outside the three known flags makes the page uninterpretable */
if (buffer[TRANSLOG_PAGE_FLAGS] & ~(TRANSLOG_PAGE_CRC |
8541
TRANSLOG_SECTOR_PROTECTION |
8542
TRANSLOG_RECORD_CRC))
8544
printf(" WARNING: unknown flags (stop interpretation)!!!\n");
8549
printf(" No flags\n");
8550
/* header length depends on the flags: look it up in page_overhead[] */
printf(" Page header length: %u\n",
8551
(header_len= page_overhead[buffer[TRANSLOG_PAGE_FLAGS]]));
8552
/*
  NOTE(review): this branch prints and verifies the *page* CRC but tests
  TRANSLOG_RECORD_CRC; TRANSLOG_PAGE_CRC is probably what was intended -
  TODO confirm and fix.
*/
if (buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_RECORD_CRC)
8554
uint32 crc= uint4korr(buffer + TRANSLOG_PAGE_FLAGS + 1);
8556
printf (" Page CRC 0x%04lx\n", (ulong) crc);
8557
/* the CRC is recomputed over everything after the page header */
ccrc= translog_crc(buffer + header_len, TRANSLOG_PAGE_SIZE - header_len);
8559
printf(" WARNING: calculated CRC: 0x%04lx!!!\n", (ulong) ccrc);
8561
if (buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_SECTOR_PROTECTION)
8563
TRANSLOG_FILE tfile;
8565
/* the protection table occupies the last bytes of the page header */
uchar *table= buffer + header_len -
8566
TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
8568
printf(" Sector protection current value: 0x%02x\n", (uint) table[0]);
8569
for (i= 1; i < TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE; i++)
8571
printf(" Sector protection in sector: 0x%02x saved value 0x%02x\n",
8572
(uint)buffer[i * DISK_DRIVE_SECTOR_SIZE],
8577
/* a temporary file descriptor, set up only for the protection check below */
tfile.handler.file= handler;
8578
pagecache_file_init(tfile.handler, NULL, NULL, NULL, NULL, NULL);
8579
tfile.was_recovered= 0;
8581
if (translog_check_sector_protection(buffer, &tfile))
8582
printf(" WARNING: sector protection found problems!!!\n");
8584
/* walk the chunks; dump_chunk() returns NULL when it cannot go on */
ptr= buffer + header_len;
8585
while (ptr && ptr < buffer + TRANSLOG_PAGE_SIZE)
8587
printf(" Chunk (%lu,0x%lx):\n",
8588
(ulong)file, (ulong) offset + (ptr - buffer));
8589
ptr= dump_chunk(buffer, ptr);
8595
@brief Dump information about page.
8598
static void dump_page(uchar *buffer)
8600
printf("Page by offset %llu (0x%llx)\n", opt_offset, opt_offset);
8601
if (strncmp((char*)maria_trans_file_magic, (char*)buffer,
8602
sizeof(maria_trans_file_magic)) == 0)
8604
dump_header_page(buffer);
8606
dump_datapage(buffer);
/**
  @brief maria_dump_log main function.
*/

/*
  Entry point of the dump tool: loads the defaults, parses the options,
  initialises the record type and page overhead tables, opens the file
  named by --file, seeks to --offset and reads the file page by page,
  advancing opt_offset and decrementing opt_pages after each page.

  NOTE(review): this block comes from a damaged extraction. The bare numbers
  are line-number residue and every short line is missing (braces, the loop
  header, 'goto'/labels, the call that dumps each page, and presumably the
  assignment of 'default_argv'), so the error paths cannot be read from the
  text below.
*/
int main(int argc, char **argv)
8616
char **default_argv;
8617
uchar buffer[TRANSLOG_PAGE_SIZE];
8620
load_defaults("my", load_default_groups, &argc, &argv);
8622
get_options(&argc, &argv);
8625
translog_example_table_init();
8627
translog_table_init();
8628
translog_fill_overhead_table();
8630
maria_data_root= (char *)".";
8632
if ((handler= my_open(opt_file, O_RDONLY, MYF(MY_WME))) < 0)
8634
fprintf(stderr, "Can't open file: '%s' errno: %d\n",
8635
opt_file, my_errno);
8638
if (my_seek(handler, opt_offset, SEEK_SET, MYF(MY_WME)) !=
8641
/* NOTE(review): opt_offset is unsigned (ulonglong) but printed with %lld */
fprintf(stderr, "Can't set position %lld file: '%s' errno: %d\n",
8642
opt_offset, opt_file, my_errno);
8647
opt_offset+= TRANSLOG_PAGE_SIZE, opt_pages--)
8649
if (my_pread(handler, buffer, TRANSLOG_PAGE_SIZE, opt_offset,
8652
/* presumably a short read means the end of the file - TODO confirm */
if (my_errno == HA_ERR_FILE_TOO_SHORT)
8654
fprintf(stderr, "Can't read page at position %lld file: '%s' "
8655
"errno: %d\n", opt_offset, opt_file, my_errno);
8662
/* normal exit: release the file and the defaults */
my_close(handler, MYF(0));
8663
free_defaults(default_argv);
8665
return 0; /* No compiler warning */
8668
/* failure exit: same cleanup plus a FAILED message on stderr */
my_close(handler, MYF(0));
8669
fprintf(stderr, "%s: FAILED\n", my_progname_short);
8670
free_defaults(default_argv);
8674
#include "ma_check_standalone.h"