~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/slave/queue_consumer.cc

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2012-06-19 10:46:49 UTC
  • mfrom: (1.1.6)
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20120619104649-e2l0ggd4oz3um0f4
Tags: upstream-7.1.36-stable
Import upstream version 7.1.36-stable

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <config.h>
22
22
#include <plugin/slave/queue_consumer.h>
 
23
#include <drizzled/errmsg_print.h>
 
24
#include <drizzled/execute.h>
23
25
#include <drizzled/message/transaction.pb.h>
24
26
#include <drizzled/message/statement_transform.h>
25
27
#include <drizzled/sql/result_set.h>
26
 
#include <drizzled/execute.h>
27
28
#include <string>
28
29
#include <vector>
29
30
#include <boost/thread.hpp>
51
52
 
52
53
bool QueueConsumer::process()
53
54
{
 
55
  for (size_t index= 0; index < _master_ids.size(); index++)
 
56
  {
 
57
    /* We go ahead and get the string version of the master ID
 
58
     * so we don't have to keep converting it from int to string.
 
59
     */
 
60
    const string master_id= boost::lexical_cast<string>(_master_ids[index]);
 
61
 
 
62
    if (not processSingleMaster(master_id))
 
63
      return false;
 
64
  }
 
65
 
 
66
  return true;
 
67
}
 
68
 
 
69
bool QueueConsumer::processSingleMaster(const string &master_id)
 
70
{
54
71
  TrxIdList completedTransactionIds;
55
72
 
56
 
  getListOfCompletedTransactions(completedTransactionIds);
 
73
  getListOfCompletedTransactions(master_id, completedTransactionIds);
57
74
 
58
75
  for (size_t x= 0; x < completedTransactionIds.size(); x++)
59
76
  {
60
77
    string commit_id;
 
78
    string originating_server_uuid;
 
79
    uint64_t originating_commit_id= 0;
61
80
    uint64_t trx_id= completedTransactionIds[x];
62
81
 
63
82
    vector<string> aggregate_sql;  /* final SQL to execute */
66
85
    message::Transaction transaction;
67
86
    uint32_t segment_id= 1;
68
87
 
69
 
    while (getMessage(transaction, commit_id, trx_id, segment_id++))
 
88
    while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
 
89
                      originating_commit_id, segment_id++))
70
90
    {
71
91
      convertToSQL(transaction, aggregate_sql, segmented_sql);
72
92
      transaction.Clear();
115
135
      }
116
136
    }
117
137
 
118
 
    if (not executeSQLWithCommitId(aggregate_sql, commit_id))
 
138
    if (not executeSQLWithCommitId(aggregate_sql, commit_id, 
 
139
                                   originating_server_uuid, 
 
140
                                   originating_commit_id,
 
141
                                   master_id))
119
142
    {
120
 
      return false;
 
143
      if (_ignore_errors)
 
144
      {
 
145
        clearErrorState();
 
146
 
 
147
        /* Still need to record that we handled this trx */
 
148
        vector<string> sql;
 
149
        string tmp("UPDATE `sys_replication`.`applier_state`"
 
150
                   " SET `last_applied_commit_id` = ");
 
151
        tmp.append(commit_id);
 
152
        tmp.append(" WHERE `master_id` = ");
 
153
        tmp.append(master_id);
 
154
        sql.push_back(tmp);
 
155
        executeSQL(sql);
 
156
      }
 
157
      else
 
158
      {
 
159
        return false;
 
160
      }
121
161
    }
122
162
 
123
 
    if (not deleteFromQueue(trx_id))
 
163
    if (not deleteFromQueue(master_id, trx_id))
124
164
    {
125
165
      return false;
126
166
    }
132
172
 
133
173
bool QueueConsumer::getMessage(message::Transaction &transaction,
134
174
                              string &commit_id,
 
175
                              const string &master_id,
135
176
                              uint64_t trx_id,
 
177
                              string &originating_server_uuid,
 
178
                              uint64_t &originating_commit_id,
136
179
                              uint32_t segment_id)
137
180
{
138
 
  string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
 
181
  string sql("SELECT `msg`, `commit_order`, `originating_server_uuid`, "
 
182
             "`originating_commit_id` FROM `sys_replication`.`queue`"
139
183
             " WHERE `trx_id` = ");
140
184
  sql.append(boost::lexical_cast<string>(trx_id));
141
185
  sql.append(" AND `seg_id` = ", 16);
142
186
  sql.append(boost::lexical_cast<string>(segment_id));
 
187
  sql.append(" AND `master_id` = ", 19),
 
188
  sql.append(master_id);
143
189
 
144
 
  sql::ResultSet result_set(2);
 
190
  sql::ResultSet result_set(4);
145
191
  Execute execute(*(_session.get()), true);
146
192
  
147
193
  execute.run(sql, result_set);
148
194
  
149
 
  assert(result_set.getMetaData().getColumnCount() == 2);
 
195
  assert(result_set.getMetaData().getColumnCount() == 4);
150
196
 
151
197
  /* Really should only be 1 returned row */
152
198
  uint32_t found_rows= 0;
154
200
  {
155
201
    string msg= result_set.getString(0);
156
202
    string com_id= result_set.getString(1);
 
203
    string orig_server_uuid= result_set.getString(2);
 
204
    string orig_commit_id= result_set.getString(3);
157
205
 
158
206
    if ((msg == "") || (found_rows == 1))
159
207
      break;
160
208
 
161
 
    /* Neither column should be NULL */
 
209
    /* No columns should be NULL */
162
210
    assert(result_set.isNull(0) == false);
163
211
    assert(result_set.isNull(1) == false);
 
212
    assert(result_set.isNull(2) == false);
 
213
    assert(result_set.isNull(3) == false);
 
214
 
164
215
 
165
216
    google::protobuf::TextFormat::ParseFromString(msg, &transaction);
166
217
 
167
218
    commit_id= com_id;
 
219
    originating_server_uuid= orig_server_uuid;
 
220
    originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
168
221
    found_rows++;
169
222
  }
170
223
 
174
227
  return true;
175
228
}
176
229
 
177
 
bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
 
230
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
 
231
                                                   TrxIdList &list)
178
232
{
179
233
  Execute execute(*(_session.get()), true);
180
234
  
181
235
  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
182
236
             " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
183
 
             " ORDER BY `commit_order` ASC");
 
237
             " AND `master_id` = "
 
238
             + master_id
 
239
             + " ORDER BY `commit_order` ASC");
184
240
  
185
241
  /* ResultSet size must match column count */
186
242
  sql::ResultSet result_set(1);
314
370
}
315
371
 
316
372
 
 
373
/*
 
374
 * TODO: This currently updates every row in the applier_state table.
 
375
 * This used to be a single row. With multi-master support, we now need
 
376
 * a row for every master so we can track the last applied commit ID
 
377
 * value for each. Eventually, we may want multiple consumer threads,
 
378
 * so then we'd need to update each row independently.
 
379
 */
317
380
void QueueConsumer::setApplierState(const string &err_msg, bool status)
318
381
{
319
382
  vector<string> statements;
351
414
  sql.append("'", 1);
352
415
 
353
416
  statements.push_back(sql);
 
417
  clearErrorState();
354
418
  executeSQL(statements);
355
419
}
356
420
 
357
421
 
358
422
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
359
 
                                           const string &commit_id)
 
423
                                           const string &commit_id, 
 
424
                                           const string &originating_server_uuid, 
 
425
                                           uint64_t originating_commit_id,
 
426
                                           const string &master_id)
360
427
{
361
428
  string tmp("UPDATE `sys_replication`.`applier_state`"
362
429
             " SET `last_applied_commit_id` = ");
363
430
  tmp.append(commit_id);
 
431
  tmp.append(", `originating_server_uuid` = '");
 
432
  tmp.append(originating_server_uuid);
 
433
  tmp.append("' , `originating_commit_id` = ");
 
434
  tmp.append(boost::lexical_cast<string>(originating_commit_id));
 
435
 
 
436
  tmp.append(" WHERE `master_id` = ");
 
437
  tmp.append(master_id);
 
438
 
364
439
  sql.push_back(tmp);
365
 
  
 
440
 
 
441
  _session->setOriginatingServerUUID(originating_server_uuid);
 
442
  _session->setOriginatingCommitID(originating_commit_id);
 
443
 
366
444
  return executeSQL(sql);
367
445
}
368
446
 
369
447
 
370
 
bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
 
448
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
371
449
{
372
450
  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
373
451
  sql.append(boost::lexical_cast<std::string>(trx_id));
374
452
 
 
453
  sql.append(" AND `master_id` = ");
 
454
  sql.append(master_id);
 
455
 
375
456
  vector<string> sql_vect;
376
457
  sql_vect.push_back(sql);
377
458