~mysqlatfacebook/mysqlatfacebook/tools

« back to all changes in this revision

Viewing changes to prefetch/binlog.py

  • Committer: Domas Mituzas
  • Date: 2011-12-03 19:55:54 UTC
  • Revision ID: domas@facebook.com-20111203195554-xvvgeq6gw01cs4yq
Replication prefetcher:
* binlog.py - binlog reading class
* readahead.py - main chassis for replication event prefetching
* mysql.py - helper MySQL class
* rewriters.py - helper query 'rewrite' routines
* custom_query_prefetch.py - example for query-specific prefetcher
* fake_updates_prefetch.py - InnoDB fake changes based prefetcher

Licensed under Apache License 2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
#
 
3
#   Copyright 2011 Facebook
 
4
#
 
5
#   Licensed under the Apache License, Version 2.0 (the "License");
 
6
#   you may not use this file except in compliance with the License.
 
7
#   You may obtain a copy of the License at
 
8
#
 
9
#       http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
#   Unless required by applicable law or agreed to in writing, software
 
12
#   distributed under the License is distributed on an "AS IS" BASIS,
 
13
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
#   See the License for the specific language governing permissions and
 
15
#   limitations under the License.
 
16
#
 
17
 
 
18
import struct
 
19
 
 
20
UNKNOWN_EVENT = 0
 
21
START_EVENT_V3 = 1
 
22
QUERY_EVENT = 2
 
23
STOP_EVENT = 3
 
24
ROTATE_EVENT = 4
 
25
INTVAR_EVENT = 5
 
26
LOAD_EVENT = 6
 
27
SLAVE_EVENT = 7
 
28
CREATE_FILE_EVENT = 8
 
29
APPEND_BLOCK_EVENT = 9
 
30
EXEC_LOAD_EVENT = 10
 
31
DELETE_FILE_EVENT = 11
 
32
NEW_LOAD_EVENT = 12
 
33
RAND_EVENT = 13
 
34
USER_VAR_EVENT = 14
 
35
FORMAT_DESCRIPTION_EVENT = 15
 
36
XID_EVENT = 16
 
37
BEGIN_LOAD_QUERY_EVENT = 17
 
38
EXECUTE_LOAD_QUERY_EVENT = 18
 
39
TABLE_MAP_EVENT = 19
 
40
PRE_GA_WRITE_ROWS_EVENT = 20
 
41
PRE_GA_UPDATE_ROWS_EVENT = 21
 
42
PRE_GA_DELETE_ROWS_EVENT = 22
 
43
WRITE_ROWS_EVENT = 23
 
44
UPDATE_ROWS_EVENT = 24
 
45
DELETE_ROWS_EVENT = 25
 
46
INCIDENT_EVENT = 26
 
47
HEARTBEAT_LOG_EVENT = 27
 
48
 
 
49
class MalformedBinlogException (ValueError):
 
50
        pass
 
51
 
 
52
class Event(object):
 
53
    """Fixed wrapper for event data"""
 
54
    def __init__(self, pos, type, db, query, timestamp,
 
55
                 elapsed, insert_id, last_insert_id):
 
56
        self.pos = pos
 
57
        self.type = type
 
58
        self.db = db
 
59
        self.query = query
 
60
        self.timestamp = timestamp
 
61
        if elapsed < 4294967200:
 
62
            self.elapsed = elapsed
 
63
        else:
 
64
            self.elapsed = 0
 
65
 
 
66
        self.insert_id = insert_id
 
67
        self.last_insert_id = last_insert_id
 
68
 
 
69
    def __str__(self):
 
70
        db = self.db or 'None'
 
71
        return "# Binlog Event at %d DB: %s TS: %d Elapsed: %d Query:\n%s" % (
 
72
            self.pos, db, self.timestamp, self.elapsed, self.query
 
73
        )
 
74
 
 
75
class Binlog(object):
 
76
    """Implements methods to access binary log"""
 
77
    def __init__(self, filename):
 
78
        self.file = open(filename)
 
79
 
 
80
        self.filename = filename
 
81
 
 
82
        self.insert_id = None
 
83
        self.last_insert_id = None
 
84
 
 
85
        self.until = None
 
86
 
 
87
        self.max_event_size = 1024 * 1024
 
88
 
 
89
        fde = struct.unpack("<IIBIIIHH50sIB", self.file.read(4 + 76))
 
90
        (magic, timestamp, type_code, server_id, event_length,
 
91
         next_position, flags, binlog_version, server_version,
 
92
         create_timestamp, self.header_length) = fde
 
93
 
 
94
        if magic != 1852400382:    # 0xfe bin
 
95
            raise MalformedBinlogException("Bad magic byte")
 
96
 
 
97
        if type_code != FORMAT_DESCRIPTION_EVENT:
 
98
            raise MalformedBinlogException("No FDE found")
 
99
        if binlog_version != 4:
 
100
            raise NotImplementedError("Only binlog format 4 (5.x) is supported")
 
101
 
 
102
        tail = self.file.read(event_length - 76)
 
103
        self.header_lengths = (0, ) + struct.unpack("%dB" % len(tail), tail)
 
104
        self.position = next_position
 
105
        self.start_position = self.position
 
106
 
 
107
    def read_event(self):
 
108
        """Returns a dictionary with query event data
 
109
             Returns None on end-of-file conditions or False on ignored events
 
110
        """
 
111
 
 
112
        if self.until and self.position >= self.until:
 
113
            return None
 
114
 
 
115
        header_data = self.file.read(self.header_length)
 
116
        if len(header_data) < 19:
 
117
            self.file.seek(self.position)
 
118
            return None
 
119
 
 
120
        # Avoiding LLL here
 
121
        (timestamp, event_type,
 
122
         server_id, event_length,
 
123
         next_position, flags
 
124
        ) = struct.unpack("<IBIIIH", header_data)
 
125
 
 
126
        total_tail = event_length - self.header_length
 
127
 
 
128
        cur_position = self.position
 
129
        self.position += event_length
 
130
 
 
131
        # We allow very efficient skipping of large events
 
132
        if event_length > self.max_event_size:
 
133
            self.file.seek(total_tail, 1)
 
134
            return False
 
135
 
 
136
        event_data = self.file.read(total_tail)
 
137
        # Operating on end of file
 
138
        if len(event_data) < total_tail:
 
139
            self.file.seek(cur_position)
 
140
            return None
 
141
 
 
142
        if event_type == QUERY_EVENT:
 
143
            hlen = 13
 
144
            (thread_id, elapsed, db_len, error_code, status_length) = \
 
145
                struct.unpack("<IIBHH", event_data[0:hlen])
 
146
            db_offset = hlen + status_length
 
147
            db = event_data[db_offset:db_offset + db_len]
 
148
            query = event_data[db_offset + db_len + 1:]
 
149
 
 
150
            return Event(cur_position, 'query', db, query, timestamp,
 
151
                         elapsed, self.insert_id, self.last_insert_id)
 
152
 
 
153
        elif event_type in (STOP_EVENT, ROTATE_EVENT):
 
154
            return False
 
155
        elif event_type == INTVAR_EVENT:
 
156
            (intvar_type, intvar_value) = struct.unpack("<BQ", event_data)
 
157
            if intvar_type == 1:
 
158
                self.last_insert_id = intvar_value
 
159
            elif intvar_type == 2:
 
160
                self.insert_id = intvar_value
 
161
            return False
 
162
        return False
 
163
 
 
164
    def seek(self, pos):
 
165
        self.position = pos
 
166
        self.file.seek(pos, 0)
 
167
        self.insert_id = None
 
168
        self.last_insert_id = None
 
169
 
 
170
    def rewind(self):
 
171
        self.seek(self.start_position)
 
172
 
 
173
    def set_event_size_limit(self, limit):
 
174
        """Allows setting size limit for events to be skipped"""
 
175
        self.max_event_size = limit
 
176
 
 
177
    def next(self):
 
178
        """One-by-one event reader for binlog"""
 
179
        while True:
 
180
            event = self.read_event()
 
181
            if event == False:
 
182
                continue
 
183
            elif event == None:
 
184
                return
 
185
            return event
 
186
 
 
187
    # This is sad copy of above but for the iterator interface
 
188
    def events(self):
 
189
        """Iterator for binlog"""
 
190
        while True:
 
191
            event = self.read_event()
 
192
            if event == False:
 
193
                continue
 
194
            elif event == None:
 
195
                return
 
196
            yield event
 
197
 
 
198
    def __iter__(self):
 
199
        return self.events()
 
200
 
 
201
# Simple standalone testcase
 
202
if __name__ == "__main__":
 
203
    import sys
 
204
    bl = Binlog(sys.argv[1])
 
205
    for entry in bl:
 
206
        print entry