~skinny.moey/drizzle/bug755201

« back to all changes in this revision

Viewing changes to plugin/slave/replication_schema.cc

  • Committer: Lee Bieber
  • Date: 2011-03-28 18:11:45 UTC
  • mfrom: (2254.1.2 build)
  • Revision ID: kalebral@gmail.com-20110328181145-tfsb6s5ozhuvhfoq
Merge Patrick - Tweaked dbqp so that the existing slave tests work.
Merge Stewart - remove over 1000 lines of mysys

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
   * Create our IO thread state information table if we need to.
48
48
   */
49
49
 
 
50
  /*
 
51
   * Table: io_state
 
52
   * Version 1.0: Initial definition
 
53
   * Version 1.1: Added master_id and PK on master_id
 
54
   */
 
55
 
50
56
  sql.clear();
51
57
  sql.push_back("COMMIT");
52
58
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
 
59
                " `master_id` BIGINT NOT NULL,"
53
60
                " `status` VARCHAR(20) NOT NULL,"
54
 
                " `error_msg` VARCHAR(250))"
55
 
                " COMMENT = 'VERSION 1.0'");
 
61
                " `error_msg` VARCHAR(250),"
 
62
                " PRIMARY KEY(`master_id`))"
 
63
                " COMMENT = 'VERSION 1.1'");
56
64
 
57
65
  if (not executeSQL(sql))
58
66
    return false;
59
67
 
60
 
  sql.clear();
61
 
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
62
 
 
63
 
  {
64
 
    sql::ResultSet result_set(1);
65
 
    Execute execute(*(_session.get()), true);
66
 
    execute.run(sql[0], result_set);
67
 
    result_set.next();
68
 
    string count= result_set.getString(0);
69
 
 
70
 
    /* Must always be at least one row in the table */
71
 
    if (count == "0")
72
 
    {
73
 
      sql.clear();
74
 
      sql.push_back("INSERT INTO `sys_replication`.`io_state` (`status`)"
75
 
                    " VALUES ('STOPPED')");
76
 
      if (not executeSQL(sql))
77
 
        return false;
78
 
    }
79
 
  }
80
 
 
81
68
  /*
82
69
   * Create our applier thread state information table if we need to.
83
70
   */
84
71
 
 
72
  /*
 
73
   * Table: applier_state
 
74
   * Version 1.0: Initial definition
 
75
   * Version 1.1: Added master_id and changed PK to master_id
 
76
   */
 
77
 
85
78
  sql.clear();
86
79
  sql.push_back("COMMIT");
87
 
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
88
 
                " (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
 
80
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` ("
 
81
                " `master_id` BIGINT NOT NULL,"
 
82
                " `last_applied_commit_id` BIGINT NOT NULL,"
89
83
                " `status` VARCHAR(20) NOT NULL,"
90
 
                " `error_msg` VARCHAR(250))"
91
 
                " COMMENT = 'VERSION 1.0'");
 
84
                " `error_msg` VARCHAR(250),"
 
85
                " PRIMARY KEY(`master_id`))"
 
86
                " COMMENT = 'VERSION 1.1'");
92
87
 
93
88
  if (not executeSQL(sql))
94
89
    return false;
95
90
 
96
 
  sql.clear();
97
 
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
98
 
 
99
 
  {
100
 
    sql::ResultSet result_set(1);
101
 
    Execute execute(*(_session.get()), true);
102
 
    execute.run(sql[0], result_set);
103
 
    result_set.next();
104
 
    string count= result_set.getString(0);
105
 
 
106
 
    /* Must always be at least one row in the table */
107
 
    if (count == "0")
108
 
    {
109
 
      sql.clear();
110
 
      sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
111
 
                    " (`last_applied_commit_id`, `status`)"
112
 
                    " VALUES (0, 'STOPPED')");
113
 
      if (not executeSQL(sql))
114
 
        return false;
115
 
    }
116
 
  }
117
 
 
118
91
  /*
119
92
   * Create our message queue table if we need to.
 
93
   * Version 1.0: Initial definition
 
94
   * Version 1.1: Added master_id and changed PK to (master_id, trx_id, seg_id)
120
95
   */
121
96
 
122
97
  sql.clear();
123
98
  sql.push_back("COMMIT");
124
 
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue`"
125
 
                " (`trx_id` BIGINT NOT NULL, `seg_id` INT NOT NULL,"
126
 
                " `commit_order` BIGINT, `msg` BLOB,"
127
 
                " PRIMARY KEY(`trx_id`, `seg_id`))"
128
 
                " COMMENT = 'VERSION 1.0'");
 
99
  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue` ("
 
100
                " `master_id` BIGINT NOT NULL,"
 
101
                " `trx_id` BIGINT NOT NULL,"
 
102
                " `seg_id` INT NOT NULL,"
 
103
                " `commit_order` BIGINT,"
 
104
                " `msg` BLOB,"
 
105
                " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"
 
106
                " COMMENT = 'VERSION 1.1'");
129
107
  if (not executeSQL(sql))
130
108
    return false;
131
109
 
132
110
  return true;
133
111
}
134
112
 
135
 
bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
 
113
bool ReplicationSchema::createInitialIORow(uint32_t master_id)
 
114
{
 
115
  vector<string> sql;
 
116
 
 
117
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
 
118
 
 
119
  sql::ResultSet result_set(1);
 
120
  Execute execute(*(_session.get()), true);
 
121
  execute.run(sql[0], result_set);
 
122
  result_set.next();
 
123
  string count= result_set.getString(0);
 
124
 
 
125
  if (count == "0")
 
126
  {
 
127
    sql.clear();
 
128
    sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
 
129
                  + boost::lexical_cast<string>(master_id)
 
130
                  + ", 'STOPPED')");
 
131
    if (not executeSQL(sql))
 
132
      return false;
 
133
  }
 
134
 
 
135
  return true;
 
136
}
 
137
 
 
138
bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
 
139
{
 
140
  vector<string> sql;
 
141
 
 
142
  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
 
143
 
 
144
  sql::ResultSet result_set(1);
 
145
  Execute execute(*(_session.get()), true);
 
146
  execute.run(sql[0], result_set);
 
147
  result_set.next();
 
148
  string count= result_set.getString(0);
 
149
 
 
150
  if (count == "0")
 
151
  {
 
152
    sql.clear();
 
153
    sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
 
154
                  " (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
 
155
                  + boost::lexical_cast<string>(master_id)
 
156
                  + ",0 , 'STOPPED')");
 
157
    if (not executeSQL(sql))
 
158
      return false;
 
159
  }
 
160
 
 
161
  return true;
 
162
}
 
163
 
 
164
bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
136
165
{
137
166
  vector<string> sql;
138
167
 
139
168
  sql.push_back("UPDATE `sys_replication`.`applier_state`"
140
169
                " SET `last_applied_commit_id` = "
141
 
                + lexical_cast<string>(value));
 
170
                + lexical_cast<string>(value)
 
171
                + " WHERE `master_id` = "
 
172
                + lexical_cast<string>(master_id));
142
173
 
143
174
  return executeSQL(sql);
144
175
}