~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/slave/replication_slave.cc

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2012-06-19 10:46:49 UTC
  • mfrom: (1.1.6)
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20120619104649-e2l0ggd4oz3um0f4
Tags: upstream-7.1.36-stable
ImportĀ upstreamĀ versionĀ 7.1.36-stable

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <config.h>
22
22
#include <plugin/slave/replication_slave.h>
 
23
#include <drizzled/errmsg_print.h>
23
24
#include <drizzled/program_options/config_file.h>
24
 
#include <drizzled/errmsg_print.h>
 
25
#include <boost/lexical_cast.hpp>
25
26
#include <boost/program_options.hpp>
26
27
#include <fstream>
 
28
#include <drizzled/plugin.h>
27
29
 
28
30
using namespace std;
29
31
using namespace drizzled;
33
35
namespace slave
34
36
{
35
37
 
36
 
/* Gets called after all plugins are initialized. */
37
38
void ReplicationSlave::startup(Session &session)
38
39
{
39
40
  (void)session;
40
41
  if (not initWithConfig())
41
42
  {
42
 
    errmsg_printf(error::ERROR,
43
 
                  _("Could not start slave services: %s\n"),
44
 
                  getError().c_str());
 
43
    errmsg_printf(error::ERROR, _("Could not start slave services: %s\n"),
 
44
                  _error.c_str());
45
45
  }
46
46
  else
47
47
  {
 
48
    /* Start the IO threads */
 
49
    boost::unordered_map<uint32_t, Master *>::const_iterator it;
 
50
    for (it= _masters.begin(); it != _masters.end(); ++it)
 
51
    {
 
52
      it->second->start();
 
53
      /* Consumer must know server IDs */
 
54
      _consumer.addMasterId(it->first);
 
55
    }
 
56
 
48
57
    _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
49
 
    _producer_thread= boost::thread(&QueueProducer::run, &_producer);
50
58
  }
51
59
}
52
60
 
55
63
  po::variables_map vm;
56
64
  po::options_description slave_options("Options for the slave plugin");
57
65
 
 
66
  /* Common options */
58
67
  slave_options.add_options()
59
 
    ("master-host", po::value<string>()->default_value(""))
60
 
    ("master-port", po::value<uint16_t>()->default_value(3306))
61
 
    ("master-user", po::value<string>()->default_value(""))
62
 
    ("master-pass", po::value<string>()->default_value(""))
63
 
    ("max-reconnects", po::value<uint32_t>()->default_value(10))
64
68
    ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
65
69
    ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
66
 
    ("applier-thread-sleep", po::value<uint32_t>()->default_value(5));
 
70
    ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
 
71
    ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
 
72
 
 
73
  /* Master defining options */
 
74
  for (size_t num= 1; num <= 10; num++)
 
75
  {
 
76
    string section("master");
 
77
    section.append(boost::lexical_cast<string>(num));
 
78
    slave_options.add_options()
 
79
      ((section + ".master-host").c_str(), po::value<string>()->default_value(""))
 
80
      ((section + ".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
 
81
      ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
 
82
      ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
 
83
      ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
 
84
      ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
 
85
   }
67
86
 
68
87
  ifstream cf_stream(_config_file.c_str());
69
88
 
78
97
 
79
98
  po::notify(vm);
80
99
 
81
 
  if (vm.count("master-host"))
82
 
    _producer.setMasterHost(vm["master-host"].as<string>());
83
 
 
84
 
  if (vm.count("master-port"))
85
 
    _producer.setMasterPort(vm["master-port"].as<uint16_t>());
86
 
 
87
 
  if (vm.count("master-user"))
88
 
    _producer.setMasterUser(vm["master-user"].as<string>());
89
 
 
90
 
  if (vm.count("master-pass"))
91
 
    _producer.setMasterPassword(vm["master-pass"].as<string>());
92
 
 
93
 
  if (vm.count("max-reconnects"))
94
 
    _producer.setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
95
 
 
96
 
  if (vm.count("seconds-between-reconnects"))
97
 
    _producer.setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
98
 
 
99
 
  if (vm.count("io-thread-sleep"))
100
 
    _producer.setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
 
100
  /*
 
101
   * We will support 10 masters. This loope effectively creates the Master
 
102
   * objects as they are referenced.
 
103
   *
 
104
   * @todo Support a variable number of master hosts.
 
105
   */
 
106
  for (size_t num= 1; num <= 10; num++)
 
107
  {
 
108
    string section("master");
 
109
    section.append(boost::lexical_cast<string>(num));
 
110
 
 
111
    /* WARNING! Hack!
 
112
     * We need to be able to determine when a master host is actually defined
 
113
     * by the user vs. we are just using defaults. So if the hostname is ever
 
114
     * the default value of "", then we'll assume that this section was not
 
115
     * user defined.
 
116
     */
 
117
    if (vm[section + ".master-host"].as<string>() == "")
 
118
      continue;
 
119
 
 
120
    _masters[num]= new (std::nothrow) Master(num);
 
121
 
 
122
    if (vm.count(section + ".master-host"))
 
123
      master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
 
124
 
 
125
    if (vm.count(section + ".master-port"))
 
126
      master(num).producer().setMasterPort(vm[section + ".master-port"].as<uint16_t>());
 
127
 
 
128
    if (vm.count(section + ".master-user"))
 
129
      master(num).producer().setMasterUser(vm[section + ".master-user"].as<string>());
 
130
 
 
131
    if (vm.count(section + ".master-pass"))
 
132
      master(num).producer().setMasterPassword(vm[section + ".master-pass"].as<string>());
 
133
 
 
134
    if (vm.count(section + ".max-commit-id"))
 
135
      master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
 
136
  }
 
137
 
 
138
  boost::unordered_map<uint32_t, Master *>::const_iterator it;
 
139
 
 
140
  for (it= _masters.begin(); it != _masters.end(); ++it)
 
141
  {
 
142
    if (vm.count("max-reconnects"))
 
143
      it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
 
144
 
 
145
    if (vm.count("seconds-between-reconnects"))
 
146
      it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
 
147
 
 
148
    if (vm.count("io-thread-sleep"))
 
149
      it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
 
150
  }
101
151
 
102
152
  if (vm.count("applier-thread-sleep"))
103
153
    _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
 
154
  if (vm.count("ignore-errors"))
 
155
    _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
104
156
 
105
157
  /* setup schema and tables */
106
158
  ReplicationSchema rs;
110
162
    return false;
111
163
  }
112
164
 
113
 
  if (_initial_max_commit_id)
 
165
  for (it= _masters.begin(); it != _masters.end(); ++it)
114
166
  {
115
 
    if (not rs.setInitialMaxCommitId(_initial_max_commit_id))
 
167
    /* make certain a row exists for each master */
 
168
    rs.createInitialApplierRow(it->first);
 
169
    rs.createInitialIORow(it->first);
 
170
 
 
171
    uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
 
172
    if (cachedValue)
116
173
    {
117
 
      _error= rs.getErrorMessage();
118
 
      return false;
 
174
      if (not rs.setInitialMaxCommitId(it->first, cachedValue))
 
175
      {
 
176
        _error= rs.getErrorMessage();
 
177
        return false;
 
178
      }
119
179
    }
120
180
  }
121
181