~mysqlatfacebook/mysqlatfacebook/tools

« back to all changes in this revision

Viewing changes to prefetch/readahead.py

  • Committer: Domas Mituzas
  • Date: 2011-12-03 19:55:54 UTC
  • Revision ID: domas@facebook.com-20111203195554-xvvgeq6gw01cs4yq
Replication prefetcher:
* binlog.py - binlog reading class
* readahead.py - main chassis for replication event prefetching
* mysql.py - helper MySQL class
* rewriters.py - helper query 'rewrite' routines
* custom_query_prefetch.py - example for query-specific prefetcher
* fake_updates_prefetch.py - InnoDB fake changes based prefetcher

Licensed under Apache License 2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
#
 
3
#   Copyright 2011 Facebook
 
4
#
 
5
#   Licensed under the Apache License, Version 2.0 (the "License");
 
6
#   you may not use this file except in compliance with the License.
 
7
#   You may obtain a copy of the License at
 
8
#
 
9
#       http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
#   Unless required by applicable law or agreed to in writing, software
 
12
#   distributed under the License is distributed on an "AS IS" BASIS,
 
13
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
#   See the License for the specific language governing permissions and
 
15
#   limitations under the License.
 
16
#
 
17
 
 
18
import sys
 
19
import os
 
20
import mysql
 
21
import traceback
 
22
import time
 
23
 
 
24
from threading import Thread
 
25
import Queue
 
26
from binlog import Binlog
 
27
import rewriters
 
28
import re
 
29
 
 
30
# Return whole string after multiple comment groups
 
31
# Leading whitespace, any number of /* ... */ comment groups, then the rest.
# DOTALL lets both the comments and the remainder span several lines, so a
# multi-line statement is returned whole instead of being cut at its first
# newline.
initial_comment_re = re.compile(r"^\s*(/\*.*?\*/\s*)*(.*)", re.DOTALL)


def strip_initial_comment(query):
    """Return `query` without its leading whitespace and /* ... */ comments.

    Everything after the last leading comment group is returned unchanged,
    including embedded newlines and any comments further inside the
    statement. An unterminated leading comment is left in place.
    """
    # Every part of the pattern is optional, so match() always succeeds.
    return initial_comment_re.match(query).group(2)
 
34
 
 
35
 
 
36
# Debugging output
 
37
def d(s):
    """Debug hook; silent by default.

    To trace the prefetcher, replace the body with a print of `s`
    (for example prefixed with "DEBUG:").
    """
    return None
 
40
 
 
41
 
 
42
class Executor(object):
    """Base class for rewriters that want to execute events themselves.

    A rewriter that is an instance of this class is handed the event and a
    database connection through run(), instead of being called to produce
    queries.
    """

    def run(self, event, db):
        """Handle `event` using connection `db`.

        The base implementation does nothing; subclasses override it.
        """
        return None
 
47
 
 
48
 
 
49
class Slave(mysql.MySQL):
    """MySQL connection with helpers used when following a replication slave."""

    def slave_status(self):
        """Return the first row of SHOW SLAVE STATUS, or None.

        None is returned when the statement yields nothing, and also when
        the first row is itself falsy.
        """
        rows = self.q("SHOW SLAVE STATUS")
        if not rows:
            return None
        return rows[0] or None

    def sleep(self, threshold, comment=None):
        """Run SELECT SLEEP(threshold) on the server.

        A non-empty `comment` is sent along as a leading /* ... */ comment
        in front of the statement.
        """
        if comment:
            statement = "/* %s */ SELECT SLEEP(%f)" % (comment, threshold)
        else:
            statement = "SELECT SLEEP(%f)" % threshold
        self.q(statement)
 
59
 
 
60
 
 
61
class Runner(Thread):
    """Worker thread that runs events placed on a queue.

    Each worker takes events off the prefetcher's queue, asks the
    prefetcher's detect() which rewriter applies, and runs the outcome on
    its own database connection.
    """

    def __init__(self, prefetcher):
        """Bind this worker to the queue and detect() of `prefetcher`."""
        # The connection is opened lazily inside run().
        self.db = None
        self.queue = prefetcher.queue
        self.detect = prefetcher.detect
        Thread.__init__(self)
        # Daemon thread: it does not keep the process alive on its own.
        self.daemon = True

    def run(self):
        """Process queued events forever.

        A mysql.Error raised while handling an event is answered with a
        ROLLBACK and the loop carries on. Any other failure prints a
        traceback and kills the whole process.
        """
        try:
            if not self.db:
                # NOTE(review): opened with default arguments, so the
                # prefetcher's worker_init_connect is not applied to worker
                # connections - confirm that this is intended.
                self.db = Slave()

            while True:
                event = self.queue.get(block=True)
                rewriter = self.detect(event)
                if rewriter is None:
                    continue

                try:
                    # We give up full control to Executors
                    if isinstance(rewriter, Executor):
                        rewriter.run(event, self.db)
                        continue

                    queries = rewriter(event)
                    if queries is None:
                        continue

                    # A lone query string is treated as a one-item batch.
                    # isinstance() also covers str subclasses, which would
                    # otherwise be iterated character by character.
                    if isinstance(queries, str):
                        queries = (queries, )
                    for query in queries:
                        self.db.q("/* prefetching at %d */ %s" %
                                    (event.pos, query))

                except mysql.Error:
                    self.db.q("ROLLBACK")
        except:
            # Deliberately catches everything: a worker that dies takes the
            # whole process down by sending signal 9 to our own pid.
            traceback.print_exc()
            os.kill(os.getpid(), 9)
 
102
 
 
103
 
 
104
class Prefetch:
    """Main prefetching chassis.

    Watches the slave's position in the relay log, reads the events lying
    ahead of the SQL thread and queues them for Runner threads. Behaviour
    is tuned through the attributes set in __init__.
    """

    def __init__(self):
        # Number of runner threads
        self.runners = 4
        # How much do we lag before we step in
        self.threshold = 1.0
        # how much do we jump to future from actual execution (in seconds)
        self.window_start = 1
        # how much ahead we actually work (in seconds)
        self.window_stop = 240
        # Time limit (seconds) - based on elapsed time on master,
        # should we try prefetching
        self.elapsed_limit = 4
        # Where are all the logs
        self.logpath = "/var/lib/mysql/"
        # How often should checks run (hz)
        self.frequency = 10
        # Should comments be stripped from query inside event
        self.strip_comments = False
        # Custom rewriters for specific queries
        self.prefixes = [
          # ("INSERT INTO customtable", rewriters.custom_table_rewriter),
        ]
        # Rewriter used when no entry of self.prefixes matches
        self.rewriter = rewriters.rollback
        # Keep polling (instead of raising) while replication is stopped
        self.wait_for_replication = True
        # init_connect passed to the connection opened in prefetch()
        self.worker_init_connect = "SET SESSION long_query_time=60"

        # Better not to override this from outside
        self.queue = None

    def detect(self, event):
        """Return rewriting method for event, or None to skip the event.

        Empty queries and BEGIN/COMMIT/ROLLBACK are skipped. The first entry
        of self.prefixes that matches the comment-stripped query wins; a
        prefix is either a plain string or an object with a match() method.
        Otherwise the default self.rewriter is returned.
        """
        if event.query in (None, "", "BEGIN", "COMMIT", "ROLLBACK"):
            return None

        # Allow custom per-prefix rewriter
        # NOTE(review): strip_comments only takes effect when prefixes are
        # configured - confirm that this is intended.
        if self.prefixes:
            query = strip_initial_comment(event.query)
            if self.strip_comments:
                event.query = query
            for prefix, rewriter in self.prefixes:
                if isinstance(prefix, str):
                    if query.startswith(prefix):
                        return rewriter
                elif prefix.match(query):
                    return rewriter

        return self.rewriter

    def binlog_from_status(self, status):
        """Open binlog object based on SHOW SLAVE STATUS.

        The file name is self.logpath followed by Relay_Log_File, and the
        returned Binlog is already positioned at Relay_Log_Pos.
        """
        filepath = self.logpath + status["Relay_Log_File"]
        pos = int(status["Relay_Log_Pos"])
        binlog = Binlog(filepath)
        binlog.seek(pos)
        return binlog

    def prefetch(self):
        """Main service routine to glue everything together.

        Starts the runner threads and then loops forever: check the slave
        status, and once lag exceeds the threshold read events ahead of the
        SQL thread and queue them for the runners.

        Raises EnvironmentError when replication is not running and
        wait_for_replication is false.
        """
        slave = Slave(init_connect=self.worker_init_connect)
        prefetched = None
        cycles_count = 0

        # Bounded queue: put() below times out when runners cannot keep up.
        self.queue = Queue.Queue(self.runners * 4)
        for _ in range(self.runners):
            Runner(self).start()

        while True:
            d("Running prefetch check")

            st = slave.slave_status()
            if not st or st['Slave_SQL_Running'] != "Yes":
                if not self.wait_for_replication:
                    raise EnvironmentError("Replication not running! Bye")
                else:
                    time.sleep(10)
                    continue

            if st['Seconds_Behind_Master'] is not None:
                # We compensate for negative lag here
                lag = max(int(st['Seconds_Behind_Master']), 0)

                if lag <= self.threshold:
                    d("Skipping for now, lag is below threshold")
                    slave.sleep(1.0 / self.frequency,
                                "Lag (%d) is below threshold (%d)" % \
                                (lag, self.threshold))
                    continue

            binlog = self.binlog_from_status(st)
            # Look at where we are
            event = binlog.next()

            # Though this should not happen usually...
            if not event:
                slave.sleep(1.0 / self.frequency, "Reached the end of binlog")
                continue

            # Timestamp of the event at the slave's current position; the
            # prefetch window below is measured from it.
            sql_time = event.timestamp

            # Jump ahead if we have already prefetched on this file
            if prefetched and prefetched['file'] == binlog.filename and \
                    prefetched['pos'] > event.pos:

                d("Jump to %d" % prefetched['pos'])
                binlog.seek(prefetched['pos'])

            # Iterate through the stuff in front
            for event in binlog:
                # Events without a query (None) are skipped together with
                # very short ones instead of failing on len(None).
                if not event.query or len(event.query) < 10:
                    continue
                # Skip few entries, leave them for SQL thread
                if event.timestamp < sql_time + self.window_start:
                    d("Skipping, too close to SQL thread")
                    continue

                if event.timestamp > sql_time + self.window_stop:
                    d("Breaking, too far from SQL thread")
                    break

                if event.elapsed > self.elapsed_limit:
                    d("Skipping, elapsed too long")
                    continue

                try:
                    self.queue.put(event, block=True, timeout=1)
                except Queue.Full:
                    d("Queue full, breaking out of binlog")
                    break
                # Return to the status check after every 10000 queued events
                cycles_count += 1
                if not cycles_count % 10000:
                    break

            d("Got ahead to %d" % binlog.position)
            # Remember how far we read so the next pass can skip ahead
            prefetched = {'pos': binlog.position, 'file': binlog.filename}
            slave.sleep(1.0 / self.frequency,
                "Got ahead to %d" % binlog.position)

    def run(self):
        """Run prefetch(); on failure print the traceback and exit.

        The process exits with status 1 so a failure is distinguishable
        from a clean exit.
        """
        try:
            self.prefetch()
        except Exception:
            traceback.print_exc()
            sys.exit(1)
 
249
 
 
250
if __name__ == "__main__":
    # As standalone application it will start rollback-based prefetcher
    prefetcher = Prefetch()
    prefetcher.run()