1
# Copyright 2009-2012 10gen, Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
15
"""Tools for testing high availability in PyMongo."""
26
from stat import S_IRUSR
31
# Runtime configuration for the high-availability test tools. Each setting
# is read once at import time and can be overridden through the environment
# variable named in the corresponding os.environ.get() call.

# Base directory for the default data and log locations. HOME may be unset
# (commonly the case on Windows); os.path.join() cannot take None, so fall
# back to the platform's idea of the user's home directory instead of
# failing at import time.
home = os.environ.get('HOME')
if home is None:
    home = os.path.expanduser('~')

# Data directory: DBPATH, else <home>/data/pymongo_high_availability.
default_dbpath = os.path.join(home, 'data', 'pymongo_high_availability')
dbpath = os.environ.get('DBPATH', default_dbpath)

# Log directory: LOGPATH, else <home>/log/pymongo_high_availability.
default_logpath = os.path.join(home, 'log', 'pymongo_high_availability')
logpath = os.environ.get('LOGPATH', default_logpath)

# Host name used to build the 'host:port' strings for started processes.
hostname = os.environ.get('HOSTNAME', socket.gethostname())

# Port number taken from DBPORT (default 27017), always as an int.
port = int(os.environ.get('DBPORT', 27017))

# Server executable names/paths -- presumably the binaries that get
# launched; the launch command lines are assembled elsewhere in the file.
mongod = os.environ.get('MONGOD', 'mongod')
mongos = os.environ.get('MONGOS', 'mongos')

# Replica set name, passed to the servers via --replSet.
set_name = os.environ.get('SETNAME', 'repl0')

# Any non-empty GREENLETS value enables use_greenlets on the
# pymongo.Connection objects created by this module.
use_greenlets = bool(os.environ.get('GREENLETS'))

# Any non-empty HA_TOOLS_DEBUG value turns the debug flag on.
ha_tools_debug = bool(os.environ.get('HA_TOOLS_DEBUG'))
50
# Signal the process recorded for each name in `members`.
#
# members -- iterable of member names (keys of `hosts`).
# sig     -- signal number handed to os.kill().
# hosts   -- registry mapping a member name to a dict whose 'proc' entry
#            exposes a `.pid`; defaults to the module-level `nodes`.
#
# NOTE(review): the original line numbering of this function has gaps
# (52-53, 59, 61-62 and after 63), so some control-flow lines are not
# visible in this view -- confirm against the full file before editing.
def kill_members(members, sig, hosts=nodes):
51
for member in members:
54
print 'killing', member
55
# Look up the process handle stored for this member.
proc = hosts[member]['proc']
56
# Not sure if cygwin makes sense here...
57
if sys.platform in ('win32', 'cygwin'):
58
# On win32/cygwin the process is sent CTRL_C_EVENT.
os.kill(proc.pid, signal.CTRL_C_EVENT)
60
os.kill(proc.pid, sig)
63
print member, 'already dead?'
67
def kill_all_members():
    """Send signal 2 to every registered node, then to every router."""
    for registry in (nodes, routers):
        kill_members(registry.keys(), 2, registry)
72
# Wait for a started process to accept TCP connections.
#
# proc     -- object with a poll() method (callers pass subprocess.Popen
#             instances).
# port_num -- port to connect to on `hostname`.
#
# NOTE(review): several lines of this function are missing from this view
# (the counter initialisation, the try/return statements), so the return
# value cannot be documented from here; callers store it in `res`.
def wait_for(proc, port_num):
74
# Keep trying while the process is still running (poll() is None) and the
# attempt counter is below 160.
while proc.poll() is None and trys < 160:
76
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
79
s.connect((hostname, port_num))
81
except (IOError, socket.error):
90
# Launch one server process per entry of `members`, initiate them as the
# replica set named `set_name`, and wait for the set to settle.
#
# members -- list of member-config dicts; each one is updated in place with
#            its '_id' and 'host'. An 'arbiterOnly' entry marks an arbiter.
# auth    -- NOTE(review): presumably enables the --keyFile option; the
#            lines that test it are not visible here -- confirm.
# fresh   -- NOTE(review): not referenced by any visible line -- confirm.
#
# Returns a (primary_host, set_name) tuple, where primary_host is the host
# string of members[0].
#
# NOTE(review): many lines of this function are missing from this view
# (gaps in the original numbering), including the start of the command
# list and the try/else/sleep statements.
def start_replica_set(members, auth=False, fresh=True):
94
if os.path.exists(dbpath):
102
key_file = os.path.join(dbpath, 'key.txt')
103
if not os.path.exists(key_file):
104
f = open(key_file, 'w')
106
f.write("my super secret system password")
109
# Restrict the key file to owner-read only.
os.chmod(key_file, S_IRUSR)
111
for i in xrange(len(members)):
112
host = '%s:%d' % (hostname, cur_port)
113
# The caller's member dict is mutated: it receives its id and host string.
members[i].update({'_id': i, 'host': host})
114
path = os.path.join(dbpath, 'db' + str(i))
115
if not os.path.exists(path):
117
member_logpath = os.path.join(logpath, 'db' + str(i) + '.log')
118
if not os.path.exists(os.path.dirname(member_logpath)):
119
os.makedirs(os.path.dirname(member_logpath))
122
'--port', str(cur_port),
123
'--replSet', set_name,
124
'--nojournal', '--oplogSize', '64',
125
'--logappend', '--logpath', member_logpath]
127
cmd += ['--keyFile', key_file]
130
print 'starting', ' '.join(cmd)
132
proc = subprocess.Popen(cmd,
133
stdout=subprocess.PIPE,
134
stderr=subprocess.STDOUT)
135
# Remember both the process and its command line under the host string.
nodes[host] = {'proc': proc, 'cmd': cmd}
136
res = wait_for(proc, cur_port)
143
config = {'_id': set_name, 'members': members}
144
# The first member's host is the one connected to for replSetInitiate and
# the one returned as the primary.
primary = members[0]['host']
145
c = pymongo.Connection(primary, use_greenlets=use_greenlets)
148
print 'rs.initiate(%s)' % config
150
c.admin.command('replSetInitiate', config)
151
except pymongo.errors.OperationFailure, e:
152
# Already initialized from a previous run?
156
expected_arbiters = 0
157
for member in members:
158
if member.get('arbiterOnly'):
159
expected_arbiters += 1
160
# Everything that is not an arbiter, minus one for the primary.
expected_secondaries = len(members) - expected_arbiters - 1
162
# Wait for 8 minutes for replica set to come up
164
# NOTE(review): `/` yields an int here only under Python 2 semantics.
for i in range(patience * 60 / 2):
167
# The set counts as up once there is a primary and the secondary and
# arbiter counts match the expected numbers.
if (get_primary() and
168
len(get_secondaries()) == expected_secondaries and
169
len(get_arbiters()) == expected_arbiters):
171
except pymongo.errors.ConnectionFailure:
176
print 'waiting for RS', i
180
"Replica set still not initalized after %s minutes" % patience)
181
return primary, set_name
184
# Start a config server, one shard server and `num_routers` mongos routers,
# register the shard, and return the mongos seed list.
#
# num_routers -- how many router processes to launch (default 3).
#
# Returns the value of get_mongos_seed_list().
#
# The config and shard processes are recorded in `nodes`, the routers in
# `routers`, each as {'proc': ..., 'cmd': ...} keyed by 'host:port'.
#
# NOTE(review): many lines of this function are missing from this view
# (gaps in the original numbering), including the start of each command
# list and the try statement around the addshard call.
def create_sharded_cluster(num_routers=3):
187
# Start a config server
188
configdb_host = '%s:%d' % (hostname, cur_port)
189
path = os.path.join(dbpath, 'configdb')
190
if not os.path.exists(path):
192
configdb_logpath = os.path.join(logpath, 'configdb.log')
195
'--port', str(cur_port),
196
'--nojournal', '--logappend',
197
'--logpath', configdb_logpath]
198
proc = subprocess.Popen(cmd,
199
stdout=subprocess.PIPE,
200
stderr=subprocess.STDOUT)
201
nodes[configdb_host] = {'proc': proc, 'cmd': cmd}
202
res = wait_for(proc, cur_port)
206
# ...and a shard server
207
cur_port = cur_port + 1
208
shard_host = '%s:%d' % (hostname, cur_port)
209
path = os.path.join(dbpath, 'shard1')
210
if not os.path.exists(path):
212
db_logpath = os.path.join(logpath, 'shard1.log')
215
'--port', str(cur_port),
216
'--nojournal', '--logappend',
217
'--logpath', db_logpath]
218
proc = subprocess.Popen(cmd,
219
stdout=subprocess.PIPE,
220
stderr=subprocess.STDOUT)
221
nodes[shard_host] = {'proc': proc, 'cmd': cmd}
222
res = wait_for(proc, cur_port)
226
# ...and a few mongos instances
227
cur_port = cur_port + 1
228
for i in xrange(num_routers):
229
# NOTE(review): the increment is the loop index, not 1, so as far as the
# visible lines show, successive routers get ports base, base+1, base+3,
# ... (distinct but not contiguous) -- confirm this is intended.
cur_port = cur_port + i
230
host = '%s:%d' % (hostname, cur_port)
231
mongos_logpath = os.path.join(logpath, 'mongos' + str(i) + '.log')
233
'--port', str(cur_port),
235
'--logpath', mongos_logpath,
236
'--configdb', configdb_host]
237
proc = subprocess.Popen(cmd,
238
stdout=subprocess.PIPE,
239
stderr=subprocess.STDOUT)
240
routers[host] = {'proc': proc, 'cmd': cmd}
241
res = wait_for(proc, cur_port)
246
conn = pymongo.Connection(host)
248
# Register the shard server through the router connection.
conn.admin.command({'addshard': shard_host})
249
except pymongo.errors.OperationFailure:
250
# Already configured.
253
return get_mongos_seed_list()
256
# Connect to a random member
def get_connection():
    """Return a pymongo.Connection seeded with every host in ``nodes``.

    The connection is opened with ``slave_okay=True`` and the module-wide
    ``use_greenlets`` setting.
    """
    seed_hosts = nodes.keys()
    return pymongo.Connection(
        seed_hosts, slave_okay=True, use_greenlets=use_greenlets)
261
def get_mongos_seed_list():
    """Return the keys of ``routers`` joined into one comma-separated string."""
    return ','.join(routers.keys())
266
# Send signal 2 to the single router `host`, looked up in `routers`.
#
# NOTE(review): the original numbering skips one more line than the usual
# blank-line gap after this call, so a trailing statement (possibly a
# return) may be missing from this view -- confirm against the full file.
def kill_mongos(host):
267
kill_members([host], 2, hosts=routers)
271
def restart_mongos(host):
    """Restart the single router ``host`` by delegating to restart_members."""
    restart_members([host], router=True)
275
def get_members_in_state(state):
    """Return the names of replica-set members whose state equals ``state``.

    The member list comes from the 'members' field of a replSetGetStatus
    command run against the admin database of get_connection().
    """
    rs_status = get_connection().admin.command('replSetGetStatus')
    matching = []
    for entry in rs_status['members']:
        if entry['state'] == state:
            matching.append(entry['name'])
    return matching
283
primaries = get_members_in_state(1)
284
assert len(primaries) <= 1
287
except pymongo.errors.ConnectionFailure:
293
# Pick one member at random from those currently in state 2.
#
# NOTE(review): one line between the lookup and the return is missing from
# this view (original line 295), so any guard for an empty list cannot be
# documented from here -- confirm against the full file.
def get_random_secondary():
294
secondaries = get_members_in_state(2)
296
return random.choice(secondaries)
300
def get_secondaries():
    """Return the names of the members currently reporting state 2."""
    secondary_state = 2
    return get_members_in_state(secondary_state)
305
return get_members_in_state(7)
308
def get_recovering():
    """Return the names of the members currently reporting state 3."""
    recovering_state = 3
    return get_members_in_state(recovering_state)
313
return get_connection().admin.command('ismaster').get('passives', [])
317
return get_connection().admin.command('ismaster').get('hosts', [])
320
# Work out which secondaries are not advertised as hosts or passives.
#
# NOTE(review): lines are missing from this view (original line 325 and
# everything after 326), so the handling around remove() and the return
# value cannot be documented from here -- confirm against the full file.
def get_hidden_members():
321
# Both 'hidden' and 'slaveDelay'
322
secondaries = get_secondaries()
323
readers = get_hosts() + get_passives()
324
for member in readers:
326
# Drop every advertised reader from the list of secondaries.
secondaries.remove(member)
333
def get_tags(member):
    """Return the 'tags' dict configured for ``member``.

    The replica-set config document is read from local.system.replset.
    A member without tags yields an empty dict; a member that is not in
    the config raises Exception.
    """
    replset_config = get_connection().local.system.replset.find_one()
    for entry in replset_config['members']:
        if entry['host'] != member:
            continue
        return entry.get('tags', {})

    raise Exception('member %s not in config' % repr(member))
342
# Send `sig` (default 2) to the member returned by get_primary().
#
# NOTE(review): the original numbering skips one more line than the usual
# blank-line gap after the kill_members call, so a trailing statement
# (possibly a return) may be missing from this view -- confirm.
def kill_primary(sig=2):
343
primary = get_primary()
344
kill_members([primary], sig)
348
# Send `sig` (default 2) to the member chosen by get_random_secondary().
#
# NOTE(review): the original numbering skips one more line than the usual
# blank-line gap after the kill_members call, so a trailing statement
# (possibly a return) may be missing from this view -- confirm.
def kill_secondary(sig=2):
349
secondary = get_random_secondary()
350
kill_members([secondary], sig)
354
# Send `sig` (default 2) to every member returned by get_secondaries().
#
# NOTE(review): the original numbering skips one more line than the usual
# blank-line gap after the kill_members call, so a trailing statement
# (possibly a return) may be missing from this view -- confirm.
def kill_all_secondaries(sig=2):
355
secondaries = get_secondaries()
356
kill_members(secondaries, sig)
360
# Connect directly to the current primary and issue replSetStepDown with an
# argument of 20.
#
# NOTE(review): lines are missing from this view (original lines 362, 365
# and everything after 366), so the error handling around the command
# cannot be documented from here -- confirm against the full file.
def stepdown_primary():
361
primary = get_primary()
363
c = pymongo.Connection(primary, use_greenlets=use_greenlets)
364
# replSetStepDown causes mongod to close all connections
366
c.admin.command('replSetStepDown', 20)
371
# Run replSetMaintenance with `value` on `member`, then loop until the
# member's presence in get_recovering() equals `value`; the assertion in
# the loop fails once more than 10 seconds have passed since `start`.
#
# NOTE(review): lines are missing from this view (the docstring terminator,
# the assignment of `start`, and the end of the loop body) -- confirm
# against the full file before editing.
def set_maintenance(member, value):
372
"""Put a member into RECOVERING state if value is True, else normal state.
374
c = pymongo.Connection(member, use_greenlets=use_greenlets)
375
c.admin.command('replSetMaintenance', value)
377
while value != (member in get_recovering()):
378
assert (time.time() - start) <= 10, (
379
"Member %s never switched state" % member)
384
# Relaunch each member in `members` from its recorded command line.
#
# members -- iterable of 'host:port' names; the port is parsed from the
#            text after the ':' and passed to wait_for().
# router  -- NOTE(review): presumably selects the `routers` registry
#            instead of `nodes`; the branching lines are not visible in
#            this view -- confirm.
#
# NOTE(review): lines are missing from this view (gaps in the original
# numbering) and the function may continue past the last visible line, so
# the return value cannot be documented from here.
def restart_members(members, router=False):
386
for member in members:
388
cmd = routers[member]['cmd']
390
cmd = nodes[member]['cmd']
391
proc = subprocess.Popen(cmd,
392
stdout=subprocess.PIPE,
393
stderr=subprocess.STDOUT)
395
# Store the new process handle in place of the old one.
routers[member]['proc'] = proc
397
nodes[member]['proc'] = proc
398
res = wait_for(proc, int(member.split(':')[1]))
400
restarted.append(member)