~stewart/drizzle/use-catalog-for-path

« back to all changes in this revision

Viewing changes to plugin/slave/queue_consumer.cc

  • Committer: Lee Bieber
  • Date: 2011-04-02 03:51:56 UTC
  • mfrom: (2260.1.1 revert_slave)
  • Revision ID: kalebral@gmail.com-20110402035156-lhfvo7o6yqtli0xs
Merge Shrews - Revert the multi-master slave plugin changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <config.h>
22
22
#include <plugin/slave/queue_consumer.h>
23
 
#include <drizzled/errmsg_print.h>
24
 
#include <drizzled/execute.h>
25
23
#include <drizzled/message/transaction.pb.h>
26
24
#include <drizzled/message/statement_transform.h>
27
25
#include <drizzled/sql/result_set.h>
 
26
#include <drizzled/execute.h>
28
27
#include <string>
29
28
#include <vector>
30
29
#include <boost/thread.hpp>
52
51
 
53
52
bool QueueConsumer::process()
54
53
{
55
 
  for (size_t index= 0; index < _master_ids.size(); index++)
56
 
  {
57
 
    /* We go ahead and get the string version of the master ID
58
 
     * so we don't have to keep converting it from int to string.
59
 
     */
60
 
    const string master_id= boost::lexical_cast<string>(_master_ids[index]);
61
 
 
62
 
    if (not processSingleMaster(master_id))
63
 
      return false;
64
 
  }
65
 
 
66
 
  return true;
67
 
}
68
 
 
69
 
bool QueueConsumer::processSingleMaster(const string &master_id)
70
 
{
71
54
  TrxIdList completedTransactionIds;
72
55
 
73
 
  getListOfCompletedTransactions(master_id, completedTransactionIds);
 
56
  getListOfCompletedTransactions(completedTransactionIds);
74
57
 
75
58
  for (size_t x= 0; x < completedTransactionIds.size(); x++)
76
59
  {
83
66
    message::Transaction transaction;
84
67
    uint32_t segment_id= 1;
85
68
 
86
 
    while (getMessage(transaction, commit_id, master_id, trx_id, segment_id++))
 
69
    while (getMessage(transaction, commit_id, trx_id, segment_id++))
87
70
    {
88
71
      convertToSQL(transaction, aggregate_sql, segmented_sql);
89
72
      transaction.Clear();
132
115
      }
133
116
    }
134
117
 
135
 
    if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id))
 
118
    if (not executeSQLWithCommitId(aggregate_sql, commit_id))
136
119
    {
137
 
      if (_ignore_errors)
138
 
      {
139
 
        clearErrorState();
140
 
 
141
 
        /* Still need to record that we handled this trx */
142
 
        vector<string> sql;
143
 
        string tmp("UPDATE `sys_replication`.`applier_state`"
144
 
                   " SET `last_applied_commit_id` = ");
145
 
        tmp.append(commit_id);
146
 
        tmp.append(" WHERE `master_id` = ");
147
 
        tmp.append(master_id);
148
 
        sql.push_back(tmp);
149
 
        executeSQL(sql);
150
 
      }
151
 
      else
152
 
      {
153
 
        return false;
154
 
      }
 
120
      return false;
155
121
    }
156
122
 
157
 
    if (not deleteFromQueue(master_id, trx_id))
 
123
    if (not deleteFromQueue(trx_id))
158
124
    {
159
125
      return false;
160
126
    }
166
132
 
167
133
bool QueueConsumer::getMessage(message::Transaction &transaction,
168
134
                              string &commit_id,
169
 
                              const string &master_id,
170
135
                              uint64_t trx_id,
171
136
                              uint32_t segment_id)
172
137
{
175
140
  sql.append(boost::lexical_cast<string>(trx_id));
176
141
  sql.append(" AND `seg_id` = ", 16);
177
142
  sql.append(boost::lexical_cast<string>(segment_id));
178
 
  sql.append(" AND `master_id` = ", 19),
179
 
  sql.append(master_id);
180
143
 
181
144
  sql::ResultSet result_set(2);
182
145
  Execute execute(*(_session.get()), true);
211
174
  return true;
212
175
}
213
176
 
214
 
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
215
 
                                                   TrxIdList &list)
 
177
bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
216
178
{
217
179
  Execute execute(*(_session.get()), true);
218
180
  
219
181
  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
220
182
             " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
221
 
             " AND `master_id` = "
222
 
             + master_id
223
 
             + " ORDER BY `commit_order` ASC");
 
183
             " ORDER BY `commit_order` ASC");
224
184
  
225
185
  /* ResultSet size must match column count */
226
186
  sql::ResultSet result_set(1);
354
314
}
355
315
 
356
316
 
357
 
/*
358
 
 * TODO: This currently updates every row in the applier_state table.
359
 
 * This use to be a single row. With multi-master support, we now need
360
 
 * a row for every master so we can track the last applied commit ID
361
 
 * value for each. Eventually, we may want multiple consumer threads,
362
 
 * so then we'd need to update each row independently.
363
 
 */
364
317
void QueueConsumer::setApplierState(const string &err_msg, bool status)
365
318
{
366
319
  vector<string> statements;
403
356
 
404
357
 
405
358
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
406
 
                                           const string &commit_id,
407
 
                                           const string &master_id)
 
359
                                           const string &commit_id)
408
360
{
409
361
  string tmp("UPDATE `sys_replication`.`applier_state`"
410
362
             " SET `last_applied_commit_id` = ");
411
363
  tmp.append(commit_id);
412
 
  tmp.append(" WHERE `master_id` = ");
413
 
  tmp.append(master_id);
414
364
  sql.push_back(tmp);
415
365
  
416
366
  return executeSQL(sql);
417
367
}
418
368
 
419
369
 
420
 
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
 
370
bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
421
371
{
422
372
  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
423
373
  sql.append(boost::lexical_cast<std::string>(trx_id));
424
 
  sql.append(" AND `master_id` = ");
425
 
  sql.append(master_id);
426
374
 
427
375
  vector<string> sql_vect;
428
376
  sql_vect.push_back(sql);