~crunch.io/ubuntu/precise/pymongo/unstable

« back to all changes in this revision

Viewing changes to test/high_availability/ha_tools.py

  • Committer: Joseph Tate
  • Date: 2013-01-31 08:00:57 UTC
  • mfrom: (1.1.12)
  • Revision ID: jtate@dragonstrider.com-20130131080057-y7lv17xi6x8c1j5x
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2009-2012 10gen, Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
# http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
"""Tools for testing high availability in PyMongo."""
 
16
 
 
17
import os
 
18
import random
 
19
import shutil
 
20
import signal
 
21
import socket
 
22
import subprocess
 
23
import sys
 
24
import time
 
25
 
 
26
from stat import S_IRUSR
 
27
 
 
28
import pymongo
 
29
import pymongo.errors
 
30
 
 
31
home = os.environ.get('HOME')
 
32
default_dbpath = os.path.join(home, 'data', 'pymongo_high_availability')
 
33
dbpath = os.environ.get('DBPATH', default_dbpath)
 
34
default_logpath = os.path.join(home, 'log', 'pymongo_high_availability')
 
35
logpath = os.environ.get('LOGPATH', default_logpath)
 
36
hostname = os.environ.get('HOSTNAME', socket.gethostname())
 
37
port = int(os.environ.get('DBPORT', 27017))
 
38
mongod = os.environ.get('MONGOD', 'mongod')
 
39
mongos = os.environ.get('MONGOS', 'mongos')
 
40
set_name = os.environ.get('SETNAME', 'repl0')
 
41
use_greenlets = bool(os.environ.get('GREENLETS'))
 
42
ha_tools_debug = bool(os.environ.get('HA_TOOLS_DEBUG'))
 
43
 
 
44
 
 
45
nodes = {}
 
46
routers = {}
 
47
cur_port = port
 
48
 
 
49
 
 
50
def kill_members(members, sig, hosts=nodes):
 
51
    for member in members:
 
52
        try:
 
53
            if ha_tools_debug:
 
54
                print 'killing', member
 
55
            proc = hosts[member]['proc']
 
56
            # Not sure if cygwin makes sense here...
 
57
            if sys.platform in ('win32', 'cygwin'):
 
58
                os.kill(proc.pid, signal.CTRL_C_EVENT)
 
59
            else:
 
60
                os.kill(proc.pid, sig)
 
61
        except OSError:
 
62
            if ha_tools_debug:
 
63
                print member, 'already dead?'
 
64
            pass  # already dead
 
65
 
 
66
 
 
67
def kill_all_members():
 
68
    kill_members(nodes.keys(), 2, nodes)
 
69
    kill_members(routers.keys(), 2, routers)
 
70
 
 
71
 
 
72
def wait_for(proc, port_num):
 
73
    trys = 0
 
74
    while proc.poll() is None and trys < 160:
 
75
        trys += 1
 
76
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
77
        try:
 
78
            try:
 
79
                s.connect((hostname, port_num))
 
80
                return True
 
81
            except (IOError, socket.error):
 
82
                time.sleep(0.25)
 
83
        finally:
 
84
            s.close()
 
85
 
 
86
    kill_all_members()
 
87
    return False
 
88
 
 
89
 
 
90
def start_replica_set(members, auth=False, fresh=True):
 
91
    global cur_port
 
92
 
 
93
    if fresh:
 
94
        if os.path.exists(dbpath):
 
95
            try:
 
96
                shutil.rmtree(dbpath)
 
97
            except OSError:
 
98
                pass
 
99
        os.makedirs(dbpath)
 
100
 
 
101
    if auth:
 
102
        key_file = os.path.join(dbpath, 'key.txt')
 
103
        if not os.path.exists(key_file):
 
104
            f = open(key_file, 'w')
 
105
            try:
 
106
                f.write("my super secret system password")
 
107
            finally:
 
108
                f.close()
 
109
            os.chmod(key_file, S_IRUSR)
 
110
 
 
111
    for i in xrange(len(members)):
 
112
        host = '%s:%d' % (hostname, cur_port)
 
113
        members[i].update({'_id': i, 'host': host})
 
114
        path = os.path.join(dbpath, 'db' + str(i))
 
115
        if not os.path.exists(path):
 
116
            os.makedirs(path)
 
117
        member_logpath = os.path.join(logpath, 'db' + str(i) + '.log')
 
118
        if not os.path.exists(os.path.dirname(member_logpath)):
 
119
            os.makedirs(os.path.dirname(member_logpath))
 
120
        cmd = [mongod,
 
121
               '--dbpath', path,
 
122
               '--port', str(cur_port),
 
123
               '--replSet', set_name,
 
124
               '--nojournal', '--oplogSize', '64',
 
125
               '--logappend', '--logpath', member_logpath]
 
126
        if auth:
 
127
            cmd += ['--keyFile', key_file]
 
128
 
 
129
        if ha_tools_debug:
 
130
            print 'starting', ' '.join(cmd)
 
131
 
 
132
        proc = subprocess.Popen(cmd,
 
133
                                stdout=subprocess.PIPE,
 
134
                                stderr=subprocess.STDOUT)
 
135
        nodes[host] = {'proc': proc, 'cmd': cmd}
 
136
        res = wait_for(proc, cur_port)
 
137
 
 
138
        cur_port += 1
 
139
 
 
140
        if not res:
 
141
            return None
 
142
 
 
143
    config = {'_id': set_name, 'members': members}
 
144
    primary = members[0]['host']
 
145
    c = pymongo.Connection(primary, use_greenlets=use_greenlets)
 
146
    try:
 
147
        if ha_tools_debug:
 
148
            print 'rs.initiate(%s)' % config
 
149
 
 
150
        c.admin.command('replSetInitiate', config)
 
151
    except pymongo.errors.OperationFailure, e:
 
152
        # Already initialized from a previous run?
 
153
        if ha_tools_debug:
 
154
            print e
 
155
 
 
156
    expected_arbiters = 0
 
157
    for member in members:
 
158
        if member.get('arbiterOnly'):
 
159
            expected_arbiters += 1
 
160
    expected_secondaries = len(members) - expected_arbiters - 1
 
161
 
 
162
    # Wait for 8 minutes for replica set to come up
 
163
    patience = 8
 
164
    for i in range(patience * 60 / 2):
 
165
        time.sleep(2)
 
166
        try:
 
167
            if (get_primary() and
 
168
                len(get_secondaries()) == expected_secondaries and
 
169
                len(get_arbiters()) == expected_arbiters):
 
170
                break
 
171
        except pymongo.errors.ConnectionFailure:
 
172
            # Keep waiting
 
173
            pass
 
174
 
 
175
        if ha_tools_debug:
 
176
            print 'waiting for RS', i
 
177
    else:
 
178
        kill_all_members()
 
179
        raise Exception(
 
180
            "Replica set still not initalized after %s minutes" % patience)
 
181
    return primary, set_name
 
182
 
 
183
 
 
184
def create_sharded_cluster(num_routers=3):
 
185
    global cur_port
 
186
 
 
187
    # Start a config server
 
188
    configdb_host = '%s:%d' % (hostname, cur_port)
 
189
    path = os.path.join(dbpath, 'configdb')
 
190
    if not os.path.exists(path):
 
191
        os.makedirs(path)
 
192
    configdb_logpath = os.path.join(logpath, 'configdb.log')
 
193
    cmd = [mongod,
 
194
           '--dbpath', path,
 
195
           '--port', str(cur_port),
 
196
           '--nojournal', '--logappend',
 
197
           '--logpath', configdb_logpath]
 
198
    proc = subprocess.Popen(cmd,
 
199
                            stdout=subprocess.PIPE,
 
200
                            stderr=subprocess.STDOUT)
 
201
    nodes[configdb_host] = {'proc': proc, 'cmd': cmd}
 
202
    res = wait_for(proc, cur_port)
 
203
    if not res:
 
204
        return None
 
205
 
 
206
    # ...and a shard server
 
207
    cur_port = cur_port + 1
 
208
    shard_host = '%s:%d' % (hostname, cur_port)
 
209
    path = os.path.join(dbpath, 'shard1')
 
210
    if not os.path.exists(path):
 
211
        os.makedirs(path)
 
212
    db_logpath = os.path.join(logpath, 'shard1.log')
 
213
    cmd = [mongod,
 
214
           '--dbpath', path,
 
215
           '--port', str(cur_port),
 
216
           '--nojournal', '--logappend',
 
217
           '--logpath', db_logpath]
 
218
    proc = subprocess.Popen(cmd,
 
219
                            stdout=subprocess.PIPE,
 
220
                            stderr=subprocess.STDOUT)
 
221
    nodes[shard_host] = {'proc': proc, 'cmd': cmd}
 
222
    res = wait_for(proc, cur_port)
 
223
    if not res:
 
224
        return None
 
225
 
 
226
    # ...and a few mongos instances
 
227
    cur_port = cur_port + 1
 
228
    for i in xrange(num_routers):
 
229
        cur_port = cur_port + i
 
230
        host = '%s:%d' % (hostname, cur_port)
 
231
        mongos_logpath = os.path.join(logpath, 'mongos' + str(i) + '.log')
 
232
        cmd = [mongos,
 
233
               '--port', str(cur_port),
 
234
               '--logappend',
 
235
               '--logpath', mongos_logpath,
 
236
               '--configdb', configdb_host]
 
237
        proc = subprocess.Popen(cmd,
 
238
                                stdout=subprocess.PIPE,
 
239
                                stderr=subprocess.STDOUT)
 
240
        routers[host] = {'proc': proc, 'cmd': cmd}
 
241
        res = wait_for(proc, cur_port)
 
242
        if not res:
 
243
            return None
 
244
 
 
245
    # Add the shard
 
246
    conn = pymongo.Connection(host)
 
247
    try:
 
248
        conn.admin.command({'addshard': shard_host})
 
249
    except pymongo.errors.OperationFailure:
 
250
        # Already configured.
 
251
        pass
 
252
 
 
253
    return get_mongos_seed_list()
 
254
 
 
255
 
 
256
# Connect to a random member
 
257
def get_connection():
 
258
    return pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
 
259
 
 
260
 
 
261
def get_mongos_seed_list():
 
262
    members = routers.keys()
 
263
    return ','.join(members)
 
264
 
 
265
 
 
266
def kill_mongos(host):
 
267
    kill_members([host], 2, hosts=routers)
 
268
    return host
 
269
 
 
270
 
 
271
def restart_mongos(host):
 
272
    restart_members([host], True)
 
273
 
 
274
 
 
275
def get_members_in_state(state):
 
276
    status = get_connection().admin.command('replSetGetStatus')
 
277
    members = status['members']
 
278
    return [k['name'] for k in members if k['state'] == state]
 
279
 
 
280
 
 
281
def get_primary():
 
282
    try:
 
283
        primaries = get_members_in_state(1)
 
284
        assert len(primaries) <= 1
 
285
        if primaries:
 
286
            return primaries[0]
 
287
    except pymongo.errors.ConnectionFailure:
 
288
        pass
 
289
 
 
290
    return None
 
291
 
 
292
 
 
293
def get_random_secondary():
 
294
    secondaries = get_members_in_state(2)
 
295
    if len(secondaries):
 
296
        return random.choice(secondaries)
 
297
    return None
 
298
 
 
299
 
 
300
def get_secondaries():
 
301
    return get_members_in_state(2)
 
302
 
 
303
 
 
304
def get_arbiters():
 
305
    return get_members_in_state(7)
 
306
 
 
307
 
 
308
def get_recovering():
 
309
    return get_members_in_state(3)
 
310
 
 
311
 
 
312
def get_passives():
 
313
    return get_connection().admin.command('ismaster').get('passives', [])
 
314
 
 
315
 
 
316
def get_hosts():
 
317
    return get_connection().admin.command('ismaster').get('hosts', [])
 
318
 
 
319
 
 
320
def get_hidden_members():
 
321
    # Both 'hidden' and 'slaveDelay'
 
322
    secondaries = get_secondaries()
 
323
    readers = get_hosts() + get_passives()
 
324
    for member in readers:
 
325
        try:
 
326
            secondaries.remove(member)
 
327
        except:
 
328
            # Skip primary
 
329
            pass
 
330
    return secondaries
 
331
 
 
332
 
 
333
def get_tags(member):
 
334
    config = get_connection().local.system.replset.find_one()
 
335
    for m in config['members']:
 
336
        if m['host'] == member:
 
337
            return m.get('tags', {})
 
338
 
 
339
    raise Exception('member %s not in config' % repr(member))
 
340
 
 
341
 
 
342
def kill_primary(sig=2):
 
343
    primary = get_primary()
 
344
    kill_members([primary], sig)
 
345
    return primary
 
346
 
 
347
 
 
348
def kill_secondary(sig=2):
 
349
    secondary = get_random_secondary()
 
350
    kill_members([secondary], sig)
 
351
    return secondary
 
352
 
 
353
 
 
354
def kill_all_secondaries(sig=2):
 
355
    secondaries = get_secondaries()
 
356
    kill_members(secondaries, sig)
 
357
    return secondaries
 
358
 
 
359
 
 
360
def stepdown_primary():
 
361
    primary = get_primary()
 
362
    if primary:
 
363
        c = pymongo.Connection(primary, use_greenlets=use_greenlets)
 
364
        # replSetStepDown causes mongod to close all connections
 
365
        try:
 
366
            c.admin.command('replSetStepDown', 20)
 
367
        except:
 
368
            pass
 
369
 
 
370
 
 
371
def set_maintenance(member, value):
 
372
    """Put a member into RECOVERING state if value is True, else normal state.
 
373
    """
 
374
    c = pymongo.Connection(member, use_greenlets=use_greenlets)
 
375
    c.admin.command('replSetMaintenance', value)
 
376
    start = time.time()
 
377
    while value != (member in get_recovering()):
 
378
        assert (time.time() - start) <= 10, (
 
379
            "Member %s never switched state" % member)
 
380
 
 
381
        time.sleep(0.25)
 
382
 
 
383
 
 
384
def restart_members(members, router=False):
 
385
    restarted = []
 
386
    for member in members:
 
387
        if router:
 
388
            cmd = routers[member]['cmd']
 
389
        else:
 
390
            cmd = nodes[member]['cmd']
 
391
        proc = subprocess.Popen(cmd,
 
392
                                stdout=subprocess.PIPE,
 
393
                                stderr=subprocess.STDOUT)
 
394
        if router:
 
395
            routers[member]['proc'] = proc
 
396
        else:
 
397
            nodes[member]['proc'] = proc
 
398
        res = wait_for(proc, int(member.split(':')[1]))
 
399
        if res:
 
400
            restarted.append(member)
 
401
    return restarted