~mysqlatfacebook/mysqlatfacebook/tools

« back to all changes in this revision

Viewing changes to faker/binlog.c

  • Committer: Domas Mituzas
  • Date: 2012-09-04 10:46:59 UTC
  • mto: This revision was merged to the branch mainline in revision 16.
  • Revision ID: domas@fb.com-20120904104659-pvzx1973dli3iv6u
Faker: C implementation of InnoDB fake changes based replication prefetcher

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   Copyright 2012 Facebook
 
3
 
 
4
   Licensed under the Apache License, Version 2.0 (the "License");
 
5
   you may not use this file except in compliance with the License.
 
6
   You may obtain a copy of the License at
 
7
 
 
8
       http://www.apache.org/licenses/LICENSE-2.0
 
9
 
 
10
   Unless required by applicable law or agreed to in writing, software
 
11
   distributed under the License is distributed on an "AS IS" BASIS,
 
12
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
   See the License for the specific language governing permissions and
 
14
   limitations under the License.
 
15
*/
 
16
 
 
17
#include "faker.h"
 
18
 
 
19
#define BINLOG_BUFFER 1024*1024
 
20
 
 
21
BINLOG *open_binlog(char *filename)
 
22
{
 
23
    BINLOG *binlog;
 
24
    FILE *fd = fopen(filename, "r");
 
25
    if (!fd) {
 
26
        g_warning("Couldn't open binlog (%d)", errno);
 
27
        return NULL;
 
28
    }
 
29
 
 
30
 
 
31
    binlog = g_new0(BINLOG, 1);
 
32
    binlog->fd = fd;
 
33
    binlog->buffer = malloc(MAX_EVENT_SIZE);
 
34
    binlog->vbuffer = malloc(BINLOG_BUFFER);
 
35
    setvbuf(fd, binlog->vbuffer, _IOFBF, BINLOG_BUFFER);
 
36
 
 
37
    if (!fread(&binlog->fde, sizeof(binlog->fde), 1, binlog->fd)
 
38
        || binlog->fde.magic != 1852400382
 
39
        || binlog->fde.event_type != FORMAT_DESCRIPTION_EVENT
 
40
        || binlog->fde.binlog_version != 4)
 
41
        goto err;
 
42
 
 
43
    binlog->filename = filename;
 
44
    binlog->position = binlog->fde.next_position;
 
45
 
 
46
    fseek(binlog->fd, binlog->fde.next_position, SEEK_SET);
 
47
    return binlog;
 
48
 
 
49
  err:
 
50
    g_warning("Invalid binlog");
 
51
    if (binlog->fd)
 
52
        fclose(binlog->fd);
 
53
    free(binlog->buffer);
 
54
    free(binlog->vbuffer);
 
55
    g_free(binlog);
 
56
    return NULL;
 
57
}
 
58
 
 
59
EVENT *read_binlog(BINLOG * binlog)
 
60
{
 
61
    struct EH eh;
 
62
    struct QEH *qeh;
 
63
    struct IntvarEvent *iv;
 
64
    EVENT *event;
 
65
 
 
66
    for (;;) {
 
67
        if (!fread(&eh, sizeof(eh), 1, binlog->fd))
 
68
            return NULL;
 
69
 
 
70
        guint32 body_length = eh.event_length - binlog->fde.header_length;
 
71
 
 
72
        // We will skip events that are way too large for us
 
73
        if (body_length > MAX_EVENT_SIZE) {
 
74
            fseek(binlog->fd, body_length, SEEK_CUR);
 
75
            continue;
 
76
        }
 
77
 
 
78
        if (!fread(binlog->buffer, body_length, 1, binlog->fd))
 
79
            return NULL;
 
80
 
 
81
        switch (eh.event_type) {
 
82
        case QUERY_EVENT:
 
83
            /* Figure out where in binlog buffer query and database sit */
 
84
            qeh = binlog->buffer;
 
85
            char *db = binlog->buffer + sizeof(*qeh) + qeh->status_length;
 
86
            void *query = db + qeh->db_length + 1;
 
87
            guint32 query_length = body_length - (query - binlog->buffer);
 
88
 
 
89
            // Fill in event structure, return it too
 
90
            event = g_new0(EVENT, 1);
 
91
            event->log_file = strdup(binlog->filename);
 
92
            event->log_position = binlog->position;
 
93
            event->database = strndup(db, qeh->db_length);
 
94
            event->timestamp = eh.timestamp;
 
95
 
 
96
            // We use binary-compatible query storage
 
97
            event->query = malloc(query_length);
 
98
            event->query_length = query_length;
 
99
            memcpy(event->query, query, query_length);
 
100
            if (binlog->insert_id)
 
101
                event->insert_id = binlog->insert_id;
 
102
            if (binlog->last_insert_id)
 
103
                event->last_insert_id = binlog->last_insert_id;
 
104
 
 
105
            binlog->position += eh.event_length;
 
106
            return event;
 
107
        case STOP_EVENT:
 
108
        case ROTATE_EVENT:
 
109
            return NULL;
 
110
        case INTVAR_EVENT:
 
111
            iv = binlog->buffer;
 
112
            switch (iv->type) {
 
113
            case 1:
 
114
                binlog->insert_id = iv->value;
 
115
                break;
 
116
            case 2:
 
117
                binlog->last_insert_id = iv->value;
 
118
                break;
 
119
            }
 
120
            break;
 
121
        }
 
122
        binlog->position += eh.event_length;
 
123
    }
 
124
}
 
125
 
 
126
void seek_binlog(BINLOG * binlog, guint32 position)
 
127
{
 
128
    fseek(binlog->fd, position, SEEK_SET);
 
129
    binlog->position = position;
 
130
    binlog->insert_id = binlog->last_insert_id = 0;
 
131
}
 
132
 
 
133
void free_event(EVENT * event)
 
134
{
 
135
    free(event->log_file);
 
136
    free(event->database);
 
137
    free(event->query);
 
138
    g_free(event);
 
139
}
 
140
 
 
141
void free_binlog(BINLOG * binlog)
 
142
{
 
143
    g_free(binlog->filename);
 
144
    free(binlog->buffer);
 
145
    free(binlog->vbuffer);
 
146
    fclose(binlog->fd);
 
147
    g_free(binlog);
 
148
}