~mysqlatfacebook/mysqlatfacebook/tools

« back to all changes in this revision

Viewing changes to faker/faker.c

  • Committer: Domas Mituzas
  • Date: 2012-09-04 10:46:59 UTC
  • mto: This revision was merged to the branch mainline in revision 16.
  • Revision ID: domas@fb.com-20120904104659-pvzx1973dli3iv6u
Faker: C implementation of InnoDB fake changes based replication prefetcher

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   Copyright 2012 Facebook
 
3
 
 
4
   Licensed under the Apache License, Version 2.0 (the "License");
 
5
   you may not use this file except in compliance with the License.
 
6
   You may obtain a copy of the License at
 
7
 
 
8
       http://www.apache.org/licenses/LICENSE-2.0
 
9
 
 
10
   Unless required by applicable law or agreed to in writing, software
 
11
   distributed under the License is distributed on an "AS IS" BASIS,
 
12
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
   See the License for the specific language governing permissions and
 
14
   limitations under the License.
 
15
*/
 
16
 
 
17
#include "faker.h"
 
18
 
 
19
char *db_username = "root";
 
20
char *db_password = "";
 
21
char *log_directory = "/var/lib/mysql";
 
22
 
 
23
char *properties_file = NULL;
 
24
 
 
25
unsigned int db_port = 3306;
 
26
 
 
27
int prefetch_threshold = 1;
 
28
int prefetch_frequency = 10;
 
29
int prefetch_window_start = 2;
 
30
int prefetch_window_stop = 60;
 
31
 
 
32
int nthreads = 4;
 
33
gboolean debug, foreground, verbose;
 
34
 
 
35
void *queue;
 
36
 
 
37
void *worker(void *queue)
 
38
{
 
39
    EVENT *ev;
 
40
    mysql_thread_init();
 
41
    MYSQL *slave = init_slave("SET SESSION long_query_time=60, "
 
42
                              "innodb_fake_changes=1, sql_log_bin=0,"
 
43
                              "wait_timeout=5");
 
44
 
 
45
    GString *query = g_string_sized_new(100 * 1024);
 
46
    while ((ev = sq_pop(queue))) {
 
47
        // Build our query
 
48
        g_string_truncate(query, 0);
 
49
        g_string_append_printf(query, "USE %s;", ev->database);
 
50
        if (ev->insert_id)
 
51
            g_string_append_printf(query, "SET INSERT_ID=%llu;",
 
52
                                   (unsigned long long) ev->insert_id);
 
53
        if (ev->last_insert_id)
 
54
            g_string_append_printf(query, "SET LAST_INSERT_ID=%llu;",
 
55
                                   (unsigned long long)
 
56
                                   ev->last_insert_id);
 
57
        g_string_append_printf(query, "/* pos:%d */ ", ev->log_position);
 
58
        g_string_append_len(query, ev->query, ev->query_length);
 
59
        free_event(ev);
 
60
 
 
61
        if (debug) {
 
62
            printf("Would run: %*s\n", (int) query->len, query->str);
 
63
            slave_sleep(slave, 0.0001, "dry run");
 
64
            continue;
 
65
        }
 
66
        // Use our query
 
67
        MYSQL_RES *res = slave_query(slave, query->str, query->len);
 
68
        if (res)
 
69
            mysql_free_result(res);
 
70
    }
 
71
    mysql_close(slave);
 
72
    mysql_thread_end();
 
73
    return NULL;
 
74
}
 
75
 
 
76
gint64 get_monotonic_time()
 
77
{
 
78
    struct timespec tp;
 
79
    clock_gettime(CLOCK_MONOTONIC, &tp);
 
80
    return tp.tv_sec * 1000000 + tp.tv_nsec / 1000;
 
81
}
 
82
 
 
83
 
 
84
/* This skips comment blocks and gets to the first character in SQL query */
 
85
const char *skip_initial_comments(const char *query)
 
86
{
 
87
    char in_comment = 0, slash_before = 0, star_before = 0;
 
88
    const char *p;
 
89
 
 
90
    for (p = query; *p; p++) {
 
91
        if (isspace(*p))
 
92
            continue;
 
93
        if (!in_comment) {
 
94
            if (isalpha(*p))
 
95
                return p;
 
96
            if (slash_before && *p == '*') {
 
97
                in_comment = 1;
 
98
            }
 
99
            slash_before = (*p == '/');
 
100
        } else {
 
101
            if (star_before && *p == '/') {
 
102
                in_comment = 0;
 
103
            }
 
104
            star_before = (*p == '*');
 
105
        }
 
106
    }
 
107
    return *p ? p : NULL;
 
108
}
 
109
 
 
110
gboolean valid_event(EVENT * event)
 
111
{
 
112
    const char *prefix = skip_initial_comments(event->query);
 
113
    if (!prefix)
 
114
        return FALSE;
 
115
    if (!strncasecmp(prefix, "UPDATE", sizeof("UPDATE") - 1) ||
 
116
        !strncasecmp(prefix, "INSERT", sizeof("INSERT") - 1) ||
 
117
        !strncasecmp(prefix, "DELETE", sizeof("DELETE") - 1)
 
118
        || !strncasecmp(prefix, "REPLACE", sizeof("REPLACE") - 1))
 
119
        return TRUE;
 
120
    else
 
121
        return FALSE;
 
122
}
 
123
 
 
124
static GOptionEntry entries[] = {
 
125
    {"port", 'P', 0, G_OPTION_ARG_INT, &db_port, "MySQL port number", NULL},
 
126
    {"username", 'u', 0, G_OPTION_ARG_STRING, &db_username, "MySQL username",
 
127
     NULL},
 
128
    {"password", 'p', 0, G_OPTION_ARG_STRING, &db_password, "MySQL password",
 
129
     NULL},
 
130
    {"threads", 't', 0, G_OPTION_ARG_INT, &nthreads,
 
131
     "Number of parallel threads", NULL},
 
132
    {"debug", 'd', 0, G_OPTION_ARG_NONE, &debug, "Debug (dry run) mode", NULL},
 
133
    {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose,
 
134
     "Print lots of verbose stuff", NULL},
 
135
    {"foreground", 'F', 0, G_OPTION_ARG_NONE, &foreground,
 
136
     "Run in foreground, don't daemonize", NULL},
 
137
    {"properties", 'f', 0, G_OPTION_ARG_FILENAME, &properties_file,
 
138
     "File with authentication properties", NULL},
 
139
    {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}
 
140
};
 
141
 
 
142
int main(int ac, char **av)
 
143
{
 
144
    BINLOG *relaylog = NULL;
 
145
    EVENT *event;
 
146
    SLAVE_STATUS st;
 
147
 
 
148
    GError *error = NULL;
 
149
 
 
150
    char *prefetched_file = NULL;
 
151
    guint prefetched_pos = 0;
 
152
    guint cycles = 0;
 
153
 
 
154
    g_thread_init(NULL);
 
155
    queue = sq_init(50);
 
156
 
 
157
    mysql_library_init(0, NULL, NULL);
 
158
    mysql_thread_init();
 
159
 
 
160
    GOptionContext *context =
 
161
        g_option_context_new("fake changes based replication prefetcher");
 
162
    g_option_context_add_main_entries(context, entries, NULL);
 
163
    if (!g_option_context_parse(context, &ac, &av, &error)) {
 
164
        g_print("option parsing failed: %s, try --help\n", error->message);
 
165
        exit(EXIT_FAILURE);
 
166
    }
 
167
    g_option_context_free(context);
 
168
 
 
169
    // We allow k/v file passed for configuration
 
170
    if (properties_file) {
 
171
        FILE *pf = fopen(properties_file, "r");
 
172
        char lb[128];
 
173
        char *key, *value;
 
174
 
 
175
        if (!pf) {
 
176
            g_print("Could not open properties file\n");
 
177
            exit(EXIT_FAILURE);
 
178
        }
 
179
 
 
180
        while (fgets(lb, sizeof(lb) - 1, pf)) {
 
181
            key = strtok(lb, " \t\n");
 
182
            value = strtok(NULL, " \t\n");
 
183
 
 
184
            if (!key || !value)
 
185
                continue;
 
186
 
 
187
            if (!strcmp(key, "mysql_user"))
 
188
                db_username = strdup(value);
 
189
            else if (!strcmp(key, "mysql_pass"))
 
190
                db_password = strdup(value);
 
191
        }
 
192
        fclose(pf);
 
193
    }
 
194
 
 
195
    MYSQL *slave = init_slave("SET SESSION long_query_time=60, "
 
196
                              "innodb_fake_changes=1");
 
197
 
 
198
    if (!foreground && !daemon(0, 0))
 
199
        g_warning("Couldn't daemonize");
 
200
 
 
201
 
 
202
    while (nthreads--)
 
203
        g_thread_create(worker, queue, 0, NULL);
 
204
 
 
205
    for (;;) {
 
206
        slave_status(slave, &st);
 
207
        if (!st.sql_running) {
 
208
            slave_sleep(slave, 10, "Waiting for replication");
 
209
            continue;
 
210
        }
 
211
 
 
212
        if (st.lag > 0 && st.lag <= prefetch_threshold) {
 
213
            if (verbose)
 
214
                printf("Lag (%d) is below threshold\n", st.lag);
 
215
 
 
216
            slave_sleep(slave, 1.0 / prefetch_frequency,
 
217
                        "Lag is below threshold");
 
218
            continue;
 
219
        }
 
220
 
 
221
        relaylog = relaylog_from_slave(slave, relaylog);
 
222
        if (!relaylog) {
 
223
            slave_sleep(slave, 1.0 / prefetch_frequency, "Log ran away");
 
224
            continue;
 
225
        }
 
226
 
 
227
        event = read_binlog(relaylog);
 
228
        if (!event) {
 
229
            if (verbose)
 
230
                printf("No more events returned from relay log, "
 
231
                       "prefetched position: %d \n", prefetched_pos);
 
232
 
 
233
            slave_sleep(slave, 1.0 / prefetch_frequency,
 
234
                        "Reached the end of binlog");
 
235
            continue;
 
236
        }
 
237
 
 
238
        time_t sql_time = event->timestamp;
 
239
 
 
240
        // Save position of a new file
 
241
        if (!prefetched_file || strcmp(relaylog->filename, prefetched_file)) {
 
242
            if (verbose)
 
243
                printf("Looking at a new (%s)  file instead of old one (%s)\n",
 
244
                       relaylog->filename, prefetched_file);
 
245
 
 
246
            if (prefetched_file)
 
247
                free(prefetched_file);
 
248
            prefetched_file = strdup(relaylog->filename);
 
249
            prefetched_pos = relaylog->position;
 
250
        } else if (prefetched_pos > relaylog->position) {
 
251
            // We allow ourselves to jump to position ahead, if filename matches
 
252
            if (verbose)
 
253
                printf("Jumping to %s:%d\n", prefetched_file, prefetched_pos);
 
254
            seek_binlog(relaylog, prefetched_pos);
 
255
        }
 
256
        if (verbose)
 
257
            printf("Starting binlog read at %d, prefetched at : %d\n",
 
258
                   relaylog->position, prefetched_pos);
 
259
 
 
260
        gboolean nosleep = FALSE;
 
261
        while ((event = read_binlog(relaylog))) {
 
262
            prefetched_pos = event->log_position;
 
263
            // Skip too short or too early events
 
264
            if (event->timestamp < sql_time + prefetch_window_start) {
 
265
                if (verbose)
 
266
                    printf("Event timestamp (%d) is too close to "
 
267
                           "replication thread timestamp (%d)\n",
 
268
                           (int) event->timestamp, (int) sql_time);
 
269
                free_event(event);
 
270
                continue;
 
271
            }
 
272
            // Break from the loop if events are too far
 
273
            if (event->timestamp > sql_time + prefetch_window_stop) {
 
274
                if (verbose)
 
275
                    printf("Event timestamp (%d) is way beyond replication "
 
276
                           "thread timestamp (%d)\n",
 
277
                           (int) event->timestamp, (int) sql_time);
 
278
                free_event(event);
 
279
                break;
 
280
            }
 
281
            // Validate event by known prefixes
 
282
            if (!valid_event(event)) {
 
283
                free_event(event);
 
284
                continue;
 
285
            }
 
286
            // We go async here!
 
287
            gint64 start = get_monotonic_time();
 
288
            if (verbose)
 
289
                printf("Executing event at %s:%d\n", relaylog->filename,
 
290
                       event->log_position);
 
291
            sq_push(queue, event);
 
292
 
 
293
            // Queue blocked us for too long, or too many events. re-evaluate!
 
294
            if (get_monotonic_time() - start > 2000000 || !(++cycles % 1000)) {
 
295
                if (verbose)
 
296
                    printf("Blocked for too long or cycle limits kicked in\n");
 
297
 
 
298
                nosleep = TRUE;
 
299
                break;
 
300
            }
 
301
        }
 
302
 
 
303
        if (!nosleep) {
 
304
            char *sleep_state =
 
305
                g_strdup_printf("Got ahead to %d", prefetched_pos);
 
306
            slave_sleep(slave, 1.0 / prefetch_frequency, sleep_state);
 
307
            g_free(sleep_state);
 
308
        }
 
309
        if (verbose)
 
310
            printf("Reached end of relay log at %s:%d\n", relaylog->filename,
 
311
                   prefetched_pos);
 
312
    }
 
313
 
 
314
}