/* This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . Authors: Domas Mituzas, Facebook ( domas at fb dot com ) Mark Leith, Oracle Corporation (mark dot leith at oracle dot com) Andrew Hutchings, SkySQL (andrew at skysql dot com) */ #define _LARGEFILE64_SOURCE #define _FILE_OFFSET_BITS 64 #include #include #include #include #include #include #include #include #include #include #include #include #include #include "binlog.h" #include "server_detect.h" #include "common.h" #include "g_unix_signal.h" #include "config.h" char *regexstring=NULL; const char DIRECTORY[]= "export"; const char BINLOG_DIRECTORY[]= "binlog_snapshot"; const char DAEMON_BINLOGS[]= "binlogs"; static GMutex * init_mutex = NULL; /* Program options */ gchar *output_directory= NULL; guint statement_size= 1000000; guint rows_per_file= 0; int longquery= 60; int build_empty_files= 0; int need_dummy_read= 0; int compress_output= 0; int killqueries= 0; int detected_server= 0; guint snapshot_interval= 60; gboolean daemon_mode= FALSE; gchar *ignore_engines= NULL; char **ignore= NULL; gchar *tables_list= NULL; char **tables= NULL; gboolean need_binlogs= FALSE; gchar *binlog_directory= NULL; gchar *daemon_binlog_directory= NULL; gchar *logfile= NULL; FILE *logoutfile= NULL; gboolean no_schemas= FALSE; gboolean no_locks= FALSE; GList *innodb_tables= NULL; GList *non_innodb_table= NULL; GList *table_schemas= NULL; gint non_innodb_table_counter= 0; gint non_innodb_done= 0; // For daemon mode, 0 or 
1 guint dump_number= 0; guint binlog_connect_id= 0; gboolean shutdown_triggered= FALSE; GAsyncQueue *start_scheduled_dump; GMainLoop *m1; int errors; static GOptionEntry entries[] = { { "database", 'B', 0, G_OPTION_ARG_STRING, &db, "Database to dump", NULL }, { "tables-list", 'T', 0, G_OPTION_ARG_STRING, &tables_list, "Comma delimited table list to dump (does not exclude regex option)", NULL }, { "outputdir", 'o', 0, G_OPTION_ARG_FILENAME, &output_directory, "Directory to output files to", NULL }, { "statement-size", 's', 0, G_OPTION_ARG_INT, &statement_size, "Attempted size of INSERT statement in bytes, default 1000000", NULL}, { "rows", 'r', 0, G_OPTION_ARG_INT, &rows_per_file, "Try to split tables into chunks of this many rows", NULL}, { "compress", 'c', 0, G_OPTION_ARG_NONE, &compress_output, "Compress output files", NULL}, { "build-empty-files", 'e', 0, G_OPTION_ARG_NONE, &build_empty_files, "Build dump files even if no data available from table", NULL}, { "regex", 'x', 0, G_OPTION_ARG_STRING, ®exstring, "Regular expression for 'db.table' matching", NULL}, { "ignore-engines", 'i', 0, G_OPTION_ARG_STRING, &ignore_engines, "Comma delimited list of storage engines to ignore", NULL }, { "no-schemas", 'm', 0, G_OPTION_ARG_NONE, &no_schemas, "Do not dump table schemas with the data", NULL }, { "no-locks", 'k', 0, G_OPTION_ARG_NONE, &no_locks, "Do not execute the temporary shared read lock. 
WARNING: This will cause inconsistent backups", NULL }, { "long-query-guard", 'l', 0, G_OPTION_ARG_INT, &longquery, "Set long query timer in seconds, default 60", NULL }, { "kill-long-queries", 'k', 0, G_OPTION_ARG_NONE, &killqueries, "Kill long running queries (instead of aborting)", NULL }, { "binlogs", 'b', 0, G_OPTION_ARG_NONE, &need_binlogs, "Get a snapshot of the binary logs as well as dump data", NULL }, { "daemon", 'D', 0, G_OPTION_ARG_NONE, &daemon_mode, "Enable daemon mode", NULL }, { "snapshot-interval", 'I', 0, G_OPTION_ARG_INT, &snapshot_interval, "Interval between each dump snapshot (in minutes), requires --daemon, default 60", NULL }, { "logfile", 'L', 0, G_OPTION_ARG_FILENAME, &logfile, "Log file name to use, by default stdout is used", NULL }, { NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL } }; struct tm tval; void dump_schema_data(MYSQL *conn, char *database, char *table, char *filename); void dump_schema(char *database, char *table, struct configuration *conf); void dump_table(MYSQL *conn, char *database, char *table, struct configuration *conf, gboolean is_innodb); guint64 dump_table_data(MYSQL *, FILE *, char *, char *, char *); void dump_database(MYSQL *, char *); GList * get_chunks_for_table(MYSQL *, char *, char*, struct configuration *conf); guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field, char *from, char *to); void dump_table_data_file(MYSQL *conn, char *database, char *table, char *where, char *filename); void create_backup_dir(char *directory); gboolean write_data(FILE *,GString*); gboolean check_regex(char *database, char *table); void no_log(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer user_data); void set_verbose(guint verbosity); MYSQL *reconnect_for_binlog(MYSQL *thrconn); void start_dump(MYSQL *conn); MYSQL *create_main_connection(); void *binlog_thread(void *data); void *exec_thread(void *data); void write_log_file(const gchar *log_domain, GLogLevelFlags 
log_level, const gchar *message, gpointer user_data); void no_log(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer user_data) { (void) log_domain; (void) log_level; (void) message; (void) user_data; } void set_verbose(guint verbosity) { if (logfile) { logoutfile = g_fopen(logfile, "w"); if (!logoutfile) { g_critical("Could not open log file '%s' for writing: %d", logfile, errno); exit(EXIT_FAILURE); } } switch (verbosity) { case 0: g_log_set_handler(NULL, (GLogLevelFlags)(G_LOG_LEVEL_MASK), no_log, NULL); break; case 1: g_log_set_handler(NULL, (GLogLevelFlags)(G_LOG_LEVEL_WARNING | G_LOG_LEVEL_MESSAGE), no_log, NULL); if (logfile) g_log_set_handler(NULL, (GLogLevelFlags)(G_LOG_LEVEL_ERROR | G_LOG_LEVEL_CRITICAL), write_log_file, NULL); break; case 2: g_log_set_handler(NULL, (GLogLevelFlags)(G_LOG_LEVEL_MESSAGE), no_log, NULL); if (logfile) g_log_set_handler(NULL, (GLogLevelFlags)(G_LOG_LEVEL_WARNING | G_LOG_LEVEL_ERROR | G_LOG_LEVEL_WARNING | G_LOG_LEVEL_ERROR | G_LOG_LEVEL_CRITICAL), write_log_file, NULL); break; default: if (logfile) g_log_set_handler(NULL, (GLogLevelFlags)(G_LOG_LEVEL_MASK), write_log_file, NULL); break; } } gboolean sig_triggered(gpointer user_data) { (void) user_data; g_message("Shutting down gracefully"); shutdown_triggered= TRUE; g_main_loop_quit(m1); return FALSE; } void clear_dump_directory() { GError *error= NULL; char* dump_directory= g_strdup_printf("%s/%d", output_directory, dump_number); GDir* dir= g_dir_open(dump_directory, 0, &error); if (error) { g_critical("cannot open directory %s, %s\n", dump_directory, error->message); errors++; return; } const gchar* filename= NULL; while((filename= g_dir_read_name(dir))) { gchar* path= g_build_filename(dump_directory, filename, NULL); if (g_unlink(path) == -1) { g_critical("error removing file %s (%d)\n", path, errno); errors++; return; } g_free(path); } g_dir_close(dir); g_free(dump_directory); } gboolean run_snapshot(gpointer *data) { (void) data; 
g_async_queue_push(start_scheduled_dump,GINT_TO_POINTER(1)); return (shutdown_triggered) ? FALSE : TRUE; } /* Check database.table string against regular expression */ gboolean check_regex(char *database, char *table) { /* This is not going to be used in threads */ static pcre *re = NULL; int rc; int ovector[9]= {0}; const char *error; int erroroffset; char *p; /* Let's compile the RE before we do anything */ if (!re) { re = pcre_compile(regexstring,PCRE_CASELESS|PCRE_MULTILINE,&error,&erroroffset,NULL); if(!re) { g_critical("Regular expression fail: %s", error); exit(EXIT_FAILURE); } } p=g_strdup_printf("%s.%s",database,table); rc = pcre_exec(re,NULL,p,strlen(p),0,0,ovector,9); g_free(p); return (rc>0)?TRUE:FALSE; } /* Write some stuff we know about snapshot, before it changes */ void write_snapshot_info(MYSQL *conn, FILE *file) { MYSQL_RES *master=NULL, *slave=NULL; MYSQL_FIELD *fields; MYSQL_ROW row; char *masterlog=NULL; char *masterpos=NULL; char *slavehost=NULL; char *slavelog=NULL; char *slavepos=NULL; mysql_query(conn,"SHOW MASTER STATUS"); master=mysql_store_result(conn); if (master && (row=mysql_fetch_row(master))) { masterlog=row[0]; masterpos=row[1]; } mysql_query(conn, "SHOW SLAVE STATUS"); slave=mysql_store_result(conn); guint i; if (slave && (row=mysql_fetch_row(slave))) { fields=mysql_fetch_fields(slave); for (i=0; iconf; // mysql_init is not thread safe, especially in Connector/C g_mutex_lock(init_mutex); MYSQL *thrconn = mysql_init(NULL); g_mutex_unlock(init_mutex); mysql_options(thrconn,MYSQL_READ_DEFAULT_GROUP,"mydumper"); if (compress_protocol) mysql_options(thrconn,MYSQL_OPT_COMPRESS,NULL); if (!mysql_real_connect(thrconn, hostname, username, password, NULL, port, socket_path, 0)) { g_critical("Failed to connect to database: %s", mysql_error(thrconn)); exit(EXIT_FAILURE); } else { g_message("Thread %d connected using MySQL connection ID %lu", td->thread_id, mysql_thread_id(thrconn)); } if ((detected_server == SERVER_TYPE_MYSQL) && 
mysql_query(thrconn, "SET SESSION wait_timeout = 2147483")){ g_warning("Failed to increase wait_timeout: %s", mysql_error(thrconn)); } if (mysql_query(thrconn, "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")) { g_warning("Failed to set isolation level: %s", mysql_error(thrconn)); } if (mysql_query(thrconn, "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */")) { g_critical("Failed to start consistent snapshot: %s",mysql_error(thrconn)); errors++; } /* Unfortunately version before 4.1.8 did not support consistent snapshot transaction starts, so we cheat */ if (need_dummy_read) { mysql_query(thrconn,"SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy"); MYSQL_RES *res=mysql_store_result(thrconn); if (res) mysql_free_result(res); } mysql_query(thrconn, "/*!40101 SET NAMES binary*/"); g_async_queue_push(conf->ready,GINT_TO_POINTER(1)); struct job* job= NULL; struct table_job* tj= NULL; struct schema_job* sj= NULL; struct binlog_job* bj= NULL; for(;;) { GTimeVal tv; g_get_current_time(&tv); g_time_val_add(&tv,1000*1000*1); job=(struct job *)g_async_queue_pop(conf->queue); if (shutdown_triggered && (job->type != JOB_SHUTDOWN)) { continue; } switch (job->type) { case JOB_DUMP: tj=(struct table_job *)job->job_data; if (tj->where) g_message("Thread %d dumping data for `%s`.`%s` where %s", td->thread_id, tj->database, tj->table, tj->where); else g_message("Thread %d dumping data for `%s`.`%s`", td->thread_id, tj->database, tj->table); dump_table_data_file(thrconn, tj->database, tj->table, tj->where, tj->filename); if(tj->database) g_free(tj->database); if(tj->table) g_free(tj->table); if(tj->where) g_free(tj->where); if(tj->filename) g_free(tj->filename); g_free(tj); g_free(job); break; case JOB_DUMP_NON_INNODB: tj=(struct table_job *)job->job_data; if (tj->where) g_message("Thread %d dumping data for `%s`.`%s` where %s", td->thread_id, tj->database, tj->table, tj->where); else g_message("Thread %d dumping data for `%s`.`%s`", td->thread_id, 
tj->database, tj->table); dump_table_data_file(thrconn, tj->database, tj->table, tj->where, tj->filename); if(tj->database) g_free(tj->database); if(tj->table) g_free(tj->table); if(tj->where) g_free(tj->where); if(tj->filename) g_free(tj->filename); g_free(tj); g_free(job); if (g_atomic_int_dec_and_test(&non_innodb_table_counter) && g_atomic_int_get(&non_innodb_done)) { g_async_queue_push(conf->unlock_tables, GINT_TO_POINTER(1)); } break; case JOB_SCHEMA: sj=(struct schema_job *)job->job_data; g_message("Thread %d dumping schema for `%s`.`%s`", td->thread_id, sj->database, sj->table); dump_schema_data(thrconn, sj->database, sj->table, sj->filename); if(sj->database) g_free(sj->database); if(sj->table) g_free(sj->table); if(sj->filename) g_free(sj->filename); g_free(sj); g_free(job); break; case JOB_BINLOG: thrconn= reconnect_for_binlog(thrconn); g_message("Thread %d connected using MySQL connection ID %lu (in binlog mode)", td->thread_id, mysql_thread_id(thrconn)); bj=(struct binlog_job *)job->job_data; g_message("Thread %d dumping binary log file %s", td->thread_id, bj->filename); get_binlog_file(thrconn, bj->filename, binlog_directory, bj->start_position, bj->stop_position, FALSE); if(bj->filename) g_free(bj->filename); g_free(bj); g_free(job); break; case JOB_SHUTDOWN: g_message("Thread %d shutting down", td->thread_id); if (thrconn) mysql_close(thrconn); g_free(job); mysql_thread_end(); return NULL; break; default: g_critical("Something very bad happened!"); exit(EXIT_FAILURE); } } if (thrconn) mysql_close(thrconn); mysql_thread_end(); return NULL; } MYSQL *reconnect_for_binlog(MYSQL *thrconn) { if (thrconn) { mysql_close(thrconn); } g_mutex_lock(init_mutex); thrconn= mysql_init(NULL); g_mutex_unlock(init_mutex); if (compress_protocol) mysql_options(thrconn,MYSQL_OPT_COMPRESS,NULL); int timeout= 1; mysql_options(thrconn, MYSQL_OPT_READ_TIMEOUT, (const char*)&timeout); if (!mysql_real_connect(thrconn, hostname, username, password, NULL, port, socket_path, 0)) { 
g_critical("Failed to re-connect to database: %s", mysql_error(thrconn)); exit(EXIT_FAILURE); } return thrconn; } int main(int argc, char *argv[]) { GError *error = NULL; GOptionContext *context; g_thread_init(NULL); init_mutex = g_mutex_new(); context = g_option_context_new("multi-threaded MySQL dumping"); GOptionGroup *main_group= g_option_group_new("main", "Main Options", "Main Options", NULL, NULL); g_option_group_add_entries(main_group, entries); g_option_group_add_entries(main_group, common_entries); g_option_context_set_main_group(context, main_group); if (!g_option_context_parse(context, &argc, &argv, &error)) { g_print ("option parsing failed: %s, try --help\n", error->message); exit (EXIT_FAILURE); } g_option_context_free(context); if (program_version) { g_print("mydumper %s, built against MySQL %s\n", VERSION, MYSQL_SERVER_VERSION); exit (EXIT_SUCCESS); } set_verbose(verbose); time_t t; time(&t);localtime_r(&t,&tval); if (!output_directory) output_directory = g_strdup_printf("%s-%04d%02d%02d-%02d%02d%02d",DIRECTORY, tval.tm_year+1900, tval.tm_mon+1, tval.tm_mday, tval.tm_hour, tval.tm_min, tval.tm_sec); create_backup_dir(output_directory); if (daemon_mode) { pid_t pid, sid; pid= fork(); if (pid < 0) exit(EXIT_FAILURE); else if (pid > 0) exit(EXIT_SUCCESS); umask(0); sid= setsid(); if (sid < 0) exit(EXIT_FAILURE); char *dump_directory= g_strdup_printf("%s/0", output_directory); create_backup_dir(dump_directory); g_free(dump_directory); dump_directory= g_strdup_printf("%s/1", output_directory); create_backup_dir(dump_directory); g_free(dump_directory); daemon_binlog_directory= g_strdup_printf("%s/%s", output_directory, DAEMON_BINLOGS); create_backup_dir(daemon_binlog_directory); } if (need_binlogs) { binlog_directory = g_strdup_printf("%s/%s", output_directory, BINLOG_DIRECTORY); create_backup_dir(binlog_directory); } /* Give ourselves an array of engines to ignore */ if (ignore_engines) ignore = g_strsplit(ignore_engines, ",", 0); /* Give ourselves an 
array of tables to dump */ if (tables_list) tables = g_strsplit(tables_list, ",", 0); if (daemon_mode) { GError* terror; GThread *bthread= g_thread_create(binlog_thread, GINT_TO_POINTER(1), FALSE, &terror); if (bthread == NULL) { g_critical("Could not create binlog thread: %s", terror->message); g_error_free(terror); exit(EXIT_FAILURE); } start_scheduled_dump= g_async_queue_new(); GThread *ethread= g_thread_create(exec_thread, GINT_TO_POINTER(1), FALSE, &terror); if (ethread == NULL) { g_critical("Could not create exec thread: %s", terror->message); g_error_free(terror); exit(EXIT_FAILURE); } // Run initial snapshot run_snapshot(NULL); #if GLIB_MINOR_VERSION < 14 g_timeout_add(snapshot_interval*60*1000, (GSourceFunc) run_snapshot, NULL); #else g_timeout_add_seconds(snapshot_interval*60, (GSourceFunc) run_snapshot, NULL); #endif guint sigsource= g_unix_signal_add(SIGINT, sig_triggered, NULL); sigsource= g_unix_signal_add(SIGTERM, sig_triggered, NULL); m1= g_main_loop_new(NULL, TRUE); g_main_loop_run(m1); g_source_remove(sigsource); } else { MYSQL *conn= create_main_connection(); start_dump(conn); mysql_close(conn); } sleep(5); mysql_thread_end(); mysql_library_end(); g_free(output_directory); g_strfreev(ignore); g_strfreev(tables); if (logoutfile) { fclose(logoutfile); } exit(errors ? 
EXIT_FAILURE : EXIT_SUCCESS); } MYSQL *create_main_connection() { MYSQL *conn; conn = mysql_init(NULL); mysql_options(conn,MYSQL_READ_DEFAULT_GROUP,"mydumper"); if (!mysql_real_connect(conn, hostname, username, password, db, port, socket_path, 0)) { g_critical("Error connecting to database: %s", mysql_error(conn)); exit(EXIT_FAILURE); } if ((detected_server == SERVER_TYPE_MYSQL) && mysql_query(conn, "SET SESSION wait_timeout = 2147483")){ g_warning("Failed to increase wait_timeout: %s", mysql_error(conn)); } if ((detected_server == SERVER_TYPE_MYSQL) && mysql_query(conn, "SET SESSION net_write_timeout = 2147483")){ g_warning("Failed to increase net_write_timeout: %s", mysql_error(conn)); } detected_server= detect_server(conn); switch (detected_server) { case SERVER_TYPE_MYSQL: g_message("Connected to a MySQL server"); break; case SERVER_TYPE_DRIZZLE: g_message("Connected to a Drizzle server"); break; default: g_critical("Cannot detect server type"); exit(EXIT_FAILURE); break; } return conn; } void *exec_thread(void *data) { (void) data; while(1) { g_async_queue_pop(start_scheduled_dump); clear_dump_directory(); MYSQL *conn= create_main_connection(); start_dump(conn); mysql_close(conn); mysql_thread_end(); // Don't switch the symlink on shutdown because the dump is probably incomplete. if (!shutdown_triggered) { const char *dump_symlink_source= (dump_number == 0) ? "0" : "1"; char *dump_symlink_dest= g_strdup_printf("%s/last_dump", output_directory); // We don't care if this fails g_unlink(dump_symlink_dest); if (symlink(dump_symlink_source, dump_symlink_dest) == -1) { g_critical("error setting last good dump symlink %s, %d", dump_symlink_dest, errno); } g_free(dump_symlink_dest); dump_number= (dump_number == 1) ? 
0 : 1; } } return NULL; } void *binlog_thread(void *data) { (void) data; MYSQL_RES *master= NULL; MYSQL_ROW row; MYSQL *conn; conn = mysql_init(NULL); mysql_options(conn,MYSQL_READ_DEFAULT_GROUP,"mydumper"); if (!mysql_real_connect(conn, hostname, username, password, db, port, socket_path, 0)) { g_critical("Error connecting to database: %s", mysql_error(conn)); exit(EXIT_FAILURE); } mysql_query(conn,"SHOW MASTER STATUS"); master= mysql_store_result(conn); if (master && (row= mysql_fetch_row(master))) { MYSQL *binlog_connection= NULL; binlog_connection= reconnect_for_binlog(binlog_connection); binlog_connect_id= mysql_thread_id(binlog_connection); guint64 start_position= g_ascii_strtoull(row[1], NULL, 10); gchar* filename= g_strdup(row[0]); mysql_free_result(master); mysql_close(conn); g_message("Continuous binlog thread connected using MySQL connection ID %lu", mysql_thread_id(binlog_connection)); get_binlog_file(binlog_connection, filename, daemon_binlog_directory, start_position, 0, TRUE); g_free(filename); mysql_close(binlog_connection); } else { mysql_free_result(master); mysql_close(conn); } g_message("Continuous binlog thread shutdown"); mysql_thread_end(); return NULL; } void start_dump(MYSQL *conn) { struct configuration conf = { 1, NULL, NULL, NULL, NULL, 0 }; char *p; time_t t; if (daemon_mode) p= g_strdup_printf("%s/%d/metadata", output_directory, dump_number); else p= g_strdup_printf("%s/metadata", output_directory); FILE* mdfile=g_fopen(p,"w"); g_free(p); if(!mdfile) { g_critical("Couldn't write metadata file (%d)",errno); exit(EXIT_FAILURE); } /* We check SHOW PROCESSLIST, and if there're queries larger than preset value, we terminate the process. 
This avoids stalling whole server with flush */ if (mysql_query(conn, "SHOW PROCESSLIST")) { g_warning("Could not check PROCESSLIST, no long query guard enabled: %s", mysql_error(conn)); } else { MYSQL_RES *res = mysql_store_result(conn); MYSQL_ROW row; /* Just in case PROCESSLIST output column order changes */ MYSQL_FIELD *fields = mysql_fetch_fields(res); guint i; int tcol=-1, ccol=-1, icol=-1; for(i=0; ilongquery) { if (killqueries) { if (mysql_query(conn,p=g_strdup_printf("KILL %lu",atol(row[icol])))) g_warning("Could not KILL slow query: %s",mysql_error(conn)); else g_warning("Killed a query that was running for %ss",row[tcol]); g_free(p); } else { g_critical("There are queries in PROCESSLIST running longer than %us, aborting dump,\n\t" "use --long-query-guard to change the guard value, kill queries (--kill-long-queries) or use \n\tdifferent server for dump", longquery); exit(EXIT_FAILURE); } } } mysql_free_result(res); } if (!no_locks) { if (mysql_query(conn, "FLUSH TABLES WITH READ LOCK")) { g_critical("Couldn't acquire global lock, snapshots will not be consistent: %s",mysql_error(conn)); errors++; } } else { g_warning("Executing in no-locks mode, snapshot will notbe consistent"); } if (mysql_get_server_version(conn)) { mysql_query(conn, "CREATE TABLE IF NOT EXISTS mysql.mydumperdummy (a INT) ENGINE=INNODB"); need_dummy_read=1; } mysql_query(conn, "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */"); if (need_dummy_read) { mysql_query(conn,"SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy"); MYSQL_RES *res=mysql_store_result(conn); if (res) mysql_free_result(res); } time(&t); localtime_r(&t,&tval); fprintf(mdfile,"Started dump at: %04d-%02d-%02d %02d:%02d:%02d\n", tval.tm_year+1900, tval.tm_mon+1, tval.tm_mday, tval.tm_hour, tval.tm_min, tval.tm_sec); g_message("Started dump at: %04d-%02d-%02d %02d:%02d:%02d\n", tval.tm_year+1900, tval.tm_mon+1, tval.tm_mday, tval.tm_hour, tval.tm_min, tval.tm_sec); if (detected_server == SERVER_TYPE_MYSQL) { 
mysql_query(conn, "/*!40101 SET NAMES binary*/"); write_snapshot_info(conn, mdfile); } conf.queue = g_async_queue_new(); conf.ready = g_async_queue_new(); conf.unlock_tables= g_async_queue_new(); guint n; GThread **threads = g_new(GThread*,num_threads); struct thread_data *td= g_new(struct thread_data, num_threads); for (n=0; ndata; dump_table(conn, dbt->database, dbt->table, &conf, FALSE); g_atomic_int_inc(&non_innodb_table_counter); } g_list_free(g_list_first(non_innodb_table)); g_atomic_int_inc(&non_innodb_done); for (innodb_tables= g_list_first(innodb_tables); innodb_tables; innodb_tables= g_list_next(innodb_tables)) { dbt= (struct db_table*) innodb_tables->data; dump_table(conn, dbt->database, dbt->table, &conf, TRUE); } g_list_free(g_list_first(innodb_tables)); for (table_schemas= g_list_first(table_schemas); table_schemas; table_schemas= g_list_next(table_schemas)) { dbt= (struct db_table*) table_schemas->data; dump_schema(dbt->database, dbt->table, &conf); g_free(dbt->table); g_free(dbt->database); g_free(dbt); } g_list_free(g_list_first(table_schemas)); if (need_binlogs) { get_binlogs(conn, &conf); } for (n=0; ntype = JOB_SHUTDOWN; g_async_queue_push(conf.queue,j); } g_async_queue_pop(conf.unlock_tables); if (!no_locks) { g_message("Non-InnoDB dump complete, unlocking tables"); mysql_query(conn, "UNLOCK TABLES"); } for (n=0; nuse_any_index) { guint64 max_cardinality=0; guint64 cardinality=0; mysql_data_seek(indexes,0); while ((row=mysql_fetch_row(indexes))) { if(!strcmp(row[3],"1")) { if (row[6]) cardinality = strtoll(row[6],NULL,10); if (cardinality>max_cardinality) { field=row[4]; max_cardinality=cardinality; } } } } /* Oh well, no chunks today - no suitable index */ if (!field) goto cleanup; /* Get minimum/maximum */ mysql_query(conn, query=g_strdup_printf("SELECT %s MIN(`%s`),MAX(`%s`) FROM `%s`.`%s`", (detected_server == SERVER_TYPE_MYSQL) ? 
"/*!40001 SQL_NO_CACHE */" : "", field, field, database, table)); g_free(query); minmax=mysql_store_result(conn); if (!minmax) goto cleanup; row=mysql_fetch_row(minmax); MYSQL_FIELD * fields=mysql_fetch_fields(minmax); char *min=row[0]; char *max=row[1]; /* Got total number of rows, skip chunk logic if estimates are low */ guint64 rows = estimate_count(conn, database, table, field, NULL, NULL); if (rows <= rows_per_file) goto cleanup; /* This is estimate, not to use as guarantee! Every chunk would have eventual adjustments */ guint64 estimated_chunks = rows / rows_per_file; guint64 estimated_step, nmin, nmax, cutoff; /* Support just bigger INTs for now, very dumb, no verify approach */ switch (fields[0].type) { case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_INT24: /* static stepping */ nmin = strtoll(min,NULL,10); nmax = strtoll(max,NULL,10); estimated_step = (nmax-nmin)/estimated_chunks+1; cutoff = nmin; while(cutoff<=nmax) { chunks=g_list_append(chunks,g_strdup_printf("%s%s(`%s` >= %llu AND `%s` < %llu)", !showed_nulls?field:"", !showed_nulls?" 
IS NULL OR ":"", field, (unsigned long long)cutoff, field, (unsigned long long)(cutoff+estimated_step))); cutoff+=estimated_step; showed_nulls=1; } default: goto cleanup; } cleanup: if (indexes) mysql_free_result(indexes); if (minmax) mysql_free_result(minmax); if (total) mysql_free_result(total); return chunks; } /* Try to get EXPLAIN'ed estimates of row in resultset */ guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field, char *from, char *to) { char *querybase, *query; int ret; g_assert(conn && database && table); querybase = g_strdup_printf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", (field?field:"*"), database, table); if (from || to) { g_assert(field != NULL); char *fromclause=NULL, *toclause=NULL; char *escaped; if (from) { escaped=g_new(char,strlen(from)*2+1); mysql_real_escape_string(conn,escaped,from,strlen(from)); fromclause = g_strdup_printf(" `%s` >= \"%s\" ", field, escaped); g_free(escaped); } if (to) { escaped=g_new(char,strlen(to)*2+1); mysql_real_escape_string(conn,escaped,from,strlen(from)); toclause = g_strdup_printf( " `%s` <= \"%s\"", field, escaped); g_free(escaped); } query = g_strdup_printf("%s WHERE `%s` %s %s", querybase, (from?fromclause:""), ((from&&to)?"AND":""), (to?toclause:"")); if (toclause) g_free(toclause); if (fromclause) g_free(fromclause); ret=mysql_query(conn,query); g_free(querybase); g_free(query); } else { ret=mysql_query(conn,querybase); g_free(querybase); } if (ret) { g_warning("Unable to get estimates for %s.%s: %s",database,table,mysql_error(conn)); } MYSQL_RES * result = mysql_store_result(conn); MYSQL_FIELD * fields = mysql_fetch_fields(result); guint i; for (i=0; i1 kicks in only in case of 5.0 SHOW FULL TABLES or SHOW TABLE STATUS row[1] == NULL if it is a view in 5.0 'SHOW TABLE STATUS' row[1] == "VIEW" if it is a view in 5.0 'SHOW FULL TABLES' */ if ((detected_server == SERVER_TYPE_MYSQL) && ( row[ccol] == NULL || !strcmp(row[ccol],"VIEW") )) continue; /* Skip ignored engines, handy for avoiding 
Merge, Federated or Blackhole :-) dumps */ if (ignore) { for (i = 0; ignore[i] != NULL; i++) { if (g_ascii_strcasecmp(ignore[i], row[ecol]) == 0) { dump = 0; break; } } } if (!dump) continue; /* In case of table-list option is enabled, check if table is part of the list */ if (tables) { int table_found=0; for (i = 0; tables[i] != NULL; i++) if (g_ascii_strcasecmp(tables[i], row[0]) == 0) table_found = 1; if (!table_found) dump = 0; } if (!dump) continue; /* Checks PCRE expressions on 'database.table' string */ if (regexstring && !check_regex(database,row[0])) continue; /* Green light! */ struct db_table *dbt = g_new(struct db_table, 1); dbt->database= g_strdup(database); dbt->table= g_strdup(row[0]); if (!g_ascii_strcasecmp("InnoDB", row[ecol])) { innodb_tables= g_list_append(innodb_tables, dbt); } else { non_innodb_table= g_list_append(non_innodb_table, dbt); } if (!no_schemas) { table_schemas= g_list_append(table_schemas, dbt); } } mysql_free_result(result); } void dump_schema_data(MYSQL *conn, char *database, char *table, char *filename) { void *outfile; char *query = NULL; MYSQL_RES *result = NULL; MYSQL_ROW row; if (!compress_output) outfile= g_fopen(filename, "w"); else outfile= gzopen(filename, "w"); if (!outfile) { g_critical("Error: DB: %s Could not create output file %s (%d)", database, filename, errno); errors++; return; } GString* statement = g_string_sized_new(statement_size); if (detected_server == SERVER_TYPE_MYSQL) { g_string_printf(statement,"/*!40101 SET NAMES binary*/;\n"); g_string_append(statement,"/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n\n"); } else { g_string_printf(statement, "SET FOREIGN_KEY_CHECKS=0;\n"); } if (!write_data((FILE *)outfile,statement)) { g_critical("Could not write schema data for %s.%s", database, table); errors++; return; } query= g_strdup_printf("SHOW CREATE TABLE `%s`.`%s`", database, table); if (mysql_query(conn, query) || !(result= mysql_use_result(conn))) { g_critical("Error dumping schemas (%s.%s): %s", database, 
table, mysql_error(conn)); g_free(query); errors++; return; } g_string_set_size(statement, 0); /* There should never be more than one row */ row = mysql_fetch_row(result); g_string_append(statement, row[1]); g_string_append(statement, ";\n"); if (!write_data((FILE *)outfile, statement)) { g_critical("Could not write schema for %s.%s", database, table); errors++; } g_free(query); if (!compress_output) fclose((FILE *)outfile); else gzclose(outfile); g_string_free(statement, TRUE); if (result) mysql_free_result(result); return; } void dump_table_data_file(MYSQL *conn, char *database, char *table, char *where, char *filename) { void *outfile; if (!compress_output) outfile = g_fopen(filename, "w"); else outfile = gzopen(filename, "w"); if (!outfile) { g_critical("Error: DB: %s TABLE: %s Could not create output file %s (%d)", database, table, filename, errno); errors++; return; } guint64 rows_count = dump_table_data(conn, (FILE *)outfile, database, table, where); if (!compress_output) fclose((FILE *)outfile); else gzclose(outfile); if (!rows_count && !build_empty_files) { // dropping the useless file if (remove(filename)) { g_warning("failed to remove empty file : %s\n", filename); return; } } } void dump_schema(char *database, char *table, struct configuration *conf) { struct job *j = g_new0(struct job,1); struct schema_job *sj = g_new0(struct schema_job,1); j->job_data=(void*) sj; sj->database=g_strdup(database); sj->table=g_strdup(table); j->conf=conf; j->type=JOB_SCHEMA; if (daemon_mode) sj->filename = g_strdup_printf("%s/%d/%s.%s-schema.sql%s", output_directory, dump_number, database, table, (compress_output?".gz":"")); else sj->filename = g_strdup_printf("%s/%s.%s-schema.sql%s", output_directory, database, table, (compress_output?".gz":"")); g_async_queue_push(conf->queue,j); return; } void dump_table(MYSQL *conn, char *database, char *table, struct configuration *conf, gboolean is_innodb) { GList * chunks = NULL; if (rows_per_file) chunks = 
get_chunks_for_table(conn, database, table, conf);

	if (chunks) {
		/*
		 * One job per chunk. Iterate with a separate cursor and keep the
		 * list head, so that the list nodes can actually be released once
		 * all jobs are queued. Each chunk string itself is handed over to
		 * its job as tj->where.
		 */
		GList *head = g_list_first(chunks);
		GList *iter;
		int nchunk=0;

		for (iter = head; iter; iter = g_list_next(iter)) {
			struct job *j = g_new0(struct job,1);
			struct table_job *tj = g_new0(struct table_job,1);
			j->job_data=(void*) tj;
			tj->database=g_strdup(database);
			tj->table=g_strdup(table);
			j->conf=conf;
			j->type= is_innodb ? JOB_DUMP : JOB_DUMP_NON_INNODB;
			if (daemon_mode)
				tj->filename=g_strdup_printf("%s/%d/%s.%s.%05d.sql%s", output_directory, dump_number, database, table, nchunk,(compress_output?".gz":""));
			else
				tj->filename=g_strdup_printf("%s/%s.%s.%05d.sql%s", output_directory, database, table, nchunk,(compress_output?".gz":""));
			tj->where=(char *)iter->data;
			g_async_queue_push(conf->queue,j);
			nchunk++;
		}
		g_list_free(head);
	} else {
		/* No chunking: a single job covers the whole table */
		struct job *j = g_new0(struct job,1);
		struct table_job *tj = g_new0(struct table_job,1);
		j->job_data=(void*) tj;
		tj->database=g_strdup(database);
		tj->table=g_strdup(table);
		j->conf=conf;
		j->type= is_innodb ?
JOB_DUMP : JOB_DUMP_NON_INNODB; if (daemon_mode) tj->filename = g_strdup_printf("%s/%d/%s.%s.sql%s", output_directory, dump_number, database, table,(compress_output?".gz":"")); else tj->filename = g_strdup_printf("%s/%s.%s.sql%s", output_directory, database, table,(compress_output?".gz":"")); g_async_queue_push(conf->queue,j); return; } } /* Do actual data chunk reading/writing magic */ guint64 dump_table_data(MYSQL * conn, FILE *file, char *database, char *table, char *where) { guint i; guint num_fields = 0; guint64 num_rows = 0; MYSQL_RES *result = NULL; char *query = NULL; /* Ghm, not sure if this should be statement_size - but default isn't too big for now */ GString* statement = g_string_sized_new(statement_size); if (detected_server == SERVER_TYPE_MYSQL) { g_string_printf(statement,"/*!40101 SET NAMES binary*/;\n"); g_string_append(statement,"/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n"); } else { g_string_printf(statement,"SET FOREIGN_KEY_CHECKS=0;\n"); } if (!write_data(file,statement)) { g_critical("Could not write out data for %s.%s", database, table); return num_rows; } /* Poor man's database code */ query = g_strdup_printf("SELECT %s * FROM `%s`.`%s` %s %s", (detected_server == SERVER_TYPE_MYSQL) ? 
"/*!40001 SQL_NO_CACHE */" : "", database, table, where?"WHERE":"",where?where:""); if (mysql_query(conn, query) || !(result=mysql_use_result(conn))) { g_critical("Error dumping table (%s.%s) data: %s ",database, table, mysql_error(conn)); g_free(query); errors++; return num_rows; } num_fields = mysql_num_fields(result); MYSQL_FIELD *fields = mysql_fetch_fields(result); /* Buffer for escaping field values */ GString *escaped = g_string_sized_new(3000); MYSQL_ROW row; g_string_set_size(statement,0); /* Poor man's data dump code */ while ((row = mysql_fetch_row(result))) { gulong *lengths = mysql_fetch_lengths(result); num_rows++; if (!statement->len) g_string_printf(statement, "INSERT INTO `%s` VALUES\n(", table); else g_string_append(statement, ",\n("); for (i = 0; i < num_fields; i++) { /* Don't escape safe formats, saves some time */ if (!row[i]) { g_string_append(statement, "NULL"); } else if (fields[i].flags & NUM_FLAG) { g_string_append(statement, row[i]); } else { /* We reuse buffers for string escaping, growing is expensive just at the beginning */ g_string_set_size(escaped, lengths[i]*2+1); mysql_real_escape_string(conn, escaped->str, row[i], lengths[i]); g_string_append_c(statement,'\"'); g_string_append(statement,escaped->str); g_string_append_c(statement,'\"'); } if (i < num_fields - 1) { g_string_append_c(statement,','); } else { /* INSERT statement is closed once over limit */ if (statement->len > statement_size) { g_string_append(statement,");\n"); if (!write_data(file,statement)) { g_critical("Could not write out data for %s.%s", database, table); goto cleanup; } g_string_set_size(statement,0); } else { g_string_append_c(statement,')'); } } } } if (mysql_errno(conn)) { g_critical("Could not read data from %s.%s: %s", database, table, mysql_error(conn)); } if (statement->len > 0) { if (!write_data(file,statement)) { g_critical("Could not write out data for %s.%s", database, table); goto cleanup; } g_string_printf(statement,";\n"); if 
(!write_data(file,statement)) { g_critical("Could not write out closing newline for %s.%s, now this is sad!", database, table); goto cleanup; } } cleanup: g_free(query); g_string_free(escaped,TRUE); g_string_free(statement,TRUE); if (result) { mysql_free_result(result); } return num_rows; } gboolean write_data(FILE* file,GString * data) { size_t written= 0; ssize_t r= 0; while (written < data->len) { if (!compress_output) r = write(fileno(file), data->str + written, data->len); else r = gzwrite((gzFile)file, data->str + written, data->len); if (r < 0) { g_critical("Couldn't write data to a file: %s", strerror(errno)); errors++; return FALSE; } written += r; } return TRUE; } void write_log_file(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer user_data) { (void) log_domain; (void) user_data; gchar date[20]; time_t rawtime; struct tm timeinfo; time(&rawtime); localtime_r(&rawtime, &timeinfo); strftime(date, 20, "%Y-%m-%d %H:%M:%S", &timeinfo); GString* message_out = g_string_new(date); if (log_level & G_LOG_LEVEL_DEBUG) { g_string_append(message_out, " [DEBUG] - "); } else if ((log_level & G_LOG_LEVEL_INFO) || (log_level & G_LOG_LEVEL_MESSAGE)) { g_string_append(message_out, " [INFO] - "); } else if (log_level & G_LOG_LEVEL_WARNING) { g_string_append(message_out, " [WARNING] - "); } else if ((log_level & G_LOG_LEVEL_ERROR) || (log_level & G_LOG_LEVEL_CRITICAL)) { g_string_append(message_out, " [ERROR] - "); } g_string_append_printf(message_out, "%s\n", message); if (write(fileno(logoutfile), message_out->str, message_out->len) <= 0) { fprintf(stderr, "Cannot write to log file with error %d. Exiting...", errno); } g_string_free(message_out, TRUE); }