3
# Copyright 2011 Facebook
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
24
from threading import Thread
26
from binlog import Binlog
30
# Skip leading whitespace and any number of /* ... */ comment groups, then
# capture everything that follows.  DOTALL lets both the leading comments
# and the captured remainder span multiple lines.
initial_comment_re = re.compile(r"^\s*(/\*.*?\*/\s*)*(.*)", re.DOTALL)


def strip_initial_comment(query):
    """Return `query` with its leading /* ... */ comments removed.

    Leading whitespace and every comment group at the start of the string
    are dropped; the remainder (including later comments and newlines) is
    returned unchanged.
    """
    # Every part of the pattern may match the empty string, so match()
    # always succeeds and group 2 always holds the remainder.
    return initial_comment_re.match(query).group(2)
42
class Executor(object):
    """Base class for rewriters that execute events themselves.

    A rewriter inheriting from Executor is handed the event together with
    the database connection and is free to run whatever it needs.
    """

    def run(self, event, db):
        """Handle `event` using `db`; this base implementation is a no-op.

        Subclasses must override run() for anything to happen.
        """
        return None
49
class Slave (mysql.MySQL):
    """MySQL connection with helpers for querying slave state and sleeping."""

    def slave_status(self):
        """Return the first SHOW SLAVE STATUS row, or None if there is none."""
        status = self.q("SHOW SLAVE STATUS")
        return status[0] if status else None

    def sleep(self, threshold, comment=None):
        """Run SELECT SLEEP() on the server for `threshold` seconds.

        When `comment` is given it is embedded as a leading SQL comment in
        the statement; otherwise the bare SELECT SLEEP() is sent.  Exactly
        one statement is issued per call.
        """
        if comment:
            self.q("/* %s */ SELECT SLEEP(%f)" % (comment, threshold))
        else:
            self.q("SELECT SLEEP(%f)" % threshold)
62
"""Worker thread that runs events placed on a queue"""
63
def __init__(self, prefetcher):
65
self.queue = prefetcher.queue
66
self.detect = prefetcher.detect
76
event = self.queue.get(block=True)
77
rewriter = self.detect(event)
82
# We give up full control to Executors
83
if isinstance(rewriter, Executor):
84
rewriter.run(event, self.db)
87
queries = rewriter(event)
91
if type(queries) == str:
94
self.db.q("/* prefetching at %d */ %s" %
100
traceback.print_exc()
101
os.kill(os.getpid(), 9)
105
"""Main prefetching chassis"""
107
# Number of runner threads
109
# How much do we lag before we step in
111
# How far ahead of the SQL thread's position prefetching starts (in seconds)
112
self.window_start = 1
113
# How far ahead of the SQL thread's position prefetching stops (in seconds)
114
self.window_stop = 240
115
# Time limit (seconds) - based on elapsed time on master,
116
# should we try prefetching
117
self.elapsed_limit = 4
118
# Where are all the logs
119
self.logpath = "/var/lib/mysql/"
120
# How often should checks run (hz)
122
# Should comments be stripped from query inside event
123
self.strip_comments = False
124
# Custom rewriters for specific queries
126
# ("INSERT INTO customtable", rewriters.custom_table_rewriter),
128
self.rewriter = rewriters.rollback
129
self.wait_for_replication = True
130
self.worker_init_connect = "SET SESSION long_query_time=60"
132
# Better not to override this from outside
135
def detect(self, event):
136
"""Return rewriting method for event"""
137
if event.query in (None, "", "BEGIN", "COMMIT", "ROLLBACK"):
140
# Allow custom per-prefix rewriter
142
query = strip_initial_comment(event.query)
143
if self.strip_comments:
145
for prefix, rewriter in self.prefixes:
146
if isinstance(prefix, str):
147
if query.startswith(prefix):
149
elif prefix.match(query):
154
def binlog_from_status(self, status):
155
""" Open binlog object based on SHOW SLAVE STATUS """
156
filepath = self.logpath + status["Relay_Log_File"]
157
pos = int(status["Relay_Log_Pos"])
158
binlog = Binlog(filepath)
163
"""Main service routine to glue everything together"""
164
slave = Slave(init_connect=self.worker_init_connect)
168
self.queue = Queue.Queue(self.runners * 4)
169
for thread in range(self.runners):
173
d("Running prefetch check")
175
st = slave.slave_status()
176
if not st or st['Slave_SQL_Running'] != "Yes":
177
if not self.wait_for_replication:
178
raise EnvironmentError("Replication not running! Bye")
183
if st['Seconds_Behind_Master'] is not None:
184
# We compensate for negative lag here
185
lag = max(int(st['Seconds_Behind_Master']), 0)
187
if lag <= self.threshold:
188
d("Skipping for now, lag is below threshold")
189
slave.sleep(1.0 / self.frequency,
190
"Lag (%d) is below threshold (%d)" % \
191
(lag, self.threshold))
194
binlog = self.binlog_from_status(st)
195
# Look at where we are
196
event = binlog.next()
198
# Though this should not happen usually...
200
slave.sleep(1.0 / self.frequency, "Reached the end of binlog")
203
sql_time = event.timestamp
205
# Jump ahead if we have already prefetched on this file
206
if prefetched and prefetched['file'] == binlog.filename and \
207
prefetched['pos'] > event.pos:
209
d("Jump to %d" % prefetched['pos'])
210
binlog.seek(prefetched['pos'])
212
# Iterate through the stuff in front
214
if len(event.query) < 10:
216
# Skip few entries, leave them for SQL thread
217
if event.timestamp < sql_time + self.window_start:
218
d("Skipping, too close to SQL thread")
221
if event.timestamp > sql_time + self.window_stop:
222
d("Breaking, too far from SQL thread")
225
if event.elapsed > self.elapsed_limit:
226
d("Skipping, elapsed too long")
230
self.queue.put(event, block=True, timeout=1)
232
d("Queue full, breaking out of binlog")
235
if not cycles_count % 10000:
238
d("Got ahead to %d" % binlog.position)
239
prefetched = {'pos': binlog.position, 'file': binlog.filename}
240
slave.sleep(1.0 / self.frequency,
241
"Got ahead to %d" % binlog.position)
247
traceback.print_exc()
250
if __name__ == "__main__":
251
"""As standalone application it will start rollback-based prefetcher"""