~crunch.io/ubuntu/precise/pymongo/unstable

« back to all changes in this revision

Viewing changes to test/replica/replset_tools.py

  • Committer: Joseph Tate
  • Date: 2013-01-31 08:00:57 UTC
  • mfrom: (1.1.12)
  • Revision ID: jtate@dragonstrider.com-20130131080057-y7lv17xi6x8c1j5x
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2009-2011 10gen, Inc.
2
 
#
3
 
# Licensed under the Apache License, Version 2.0 (the "License");
4
 
# you may not use this file except in compliance with the License.
5
 
# You may obtain a copy of the License at
6
 
#
7
 
# http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
# Unless required by applicable law or agreed to in writing, software
10
 
# distributed under the License is distributed on an "AS IS" BASIS,
11
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 
# See the License for the specific language governing permissions and
13
 
# limitations under the License.
14
 
 
15
 
"""Tools for testing PyMongo with a replica set."""
16
 
 
17
 
import os
18
 
import random
19
 
import shutil
20
 
import signal
21
 
import socket
22
 
import subprocess
23
 
import sys
24
 
import time
25
 
 
26
 
import pymongo
27
 
 
28
 
home = os.environ.get('HOME')
29
 
default_dbpath = os.path.join(home, 'data', 'pymongo_replica_set')
30
 
dbpath = os.environ.get('DBPATH', default_dbpath)
31
 
default_logpath = os.path.join(home, 'log', 'pymongo_replica_set')
32
 
logpath = os.environ.get('LOGPATH', default_logpath)
33
 
hostname = os.environ.get('HOSTNAME', socket.gethostname())
34
 
port = int(os.environ.get('DBPORT', 27017))
35
 
mongod = os.environ.get('MONGOD', 'mongod')
36
 
set_name = os.environ.get('SETNAME', 'repl0')
37
 
use_greenlets = bool(os.environ.get('GREENLETS'))
38
 
 
39
 
nodes = {}
40
 
 
41
 
 
42
 
def kill_members(members, sig):
43
 
    for member in members:
44
 
        try:
45
 
            proc = nodes[member]['proc']
46
 
            # Not sure if cygwin makes sense here...
47
 
            if sys.platform in ('win32', 'cygwin'):
48
 
                os.kill(proc.pid, signal.CTRL_C_EVENT)
49
 
            else:
50
 
                os.kill(proc.pid, sig)
51
 
        except OSError:
52
 
            pass  # already dead
53
 
 
54
 
 
55
 
def kill_all_members():
56
 
    kill_members(nodes.keys(), 2)
57
 
 
58
 
 
59
 
def wait_for(proc, port):
60
 
    trys = 0
61
 
    while proc.poll() is None and trys < 40:  # ~10 seconds
62
 
        trys += 1
63
 
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
64
 
        try:
65
 
            try:
66
 
                s.connect((hostname, port))
67
 
                return True
68
 
            except (IOError, socket.error):
69
 
                time.sleep(0.25)
70
 
        finally:
71
 
            s.close()
72
 
 
73
 
    kill_all_members()
74
 
    return False
75
 
 
76
 
 
77
 
def start_replica_set(members, fresh=True):
78
 
    if fresh:
79
 
        if os.path.exists(dbpath):
80
 
            try:
81
 
                shutil.rmtree(dbpath)
82
 
            except OSError:
83
 
                pass
84
 
 
85
 
    for i in xrange(len(members)):
86
 
        cur_port = port + i
87
 
        host = '%s:%d' % (hostname, cur_port)
88
 
        members[i].update({'_id': i, 'host': host})
89
 
        path = os.path.join(dbpath, 'db' + str(i))
90
 
        if not os.path.exists(path):
91
 
            os.makedirs(path)
92
 
        member_logpath = os.path.join(logpath, 'db' + str(i) + '.log')
93
 
        if not os.path.exists(os.path.dirname(member_logpath)):
94
 
            os.makedirs(os.path.dirname(member_logpath))
95
 
        cmd = [mongod,
96
 
               '--dbpath', path,
97
 
               '--port', str(cur_port),
98
 
               '--replSet', set_name,
99
 
               '--journal', '--oplogSize', '1',
100
 
               '--logpath', member_logpath]
101
 
        proc = subprocess.Popen(cmd,
102
 
                                stdout=subprocess.PIPE,
103
 
                                stderr=subprocess.STDOUT)
104
 
        nodes[host] = {'proc': proc, 'cmd': cmd}
105
 
        res = wait_for(proc, cur_port)
106
 
        if not res:
107
 
            return None
108
 
    config = {'_id': set_name, 'members': members}
109
 
    primary = members[0]['host']
110
 
    c = pymongo.Connection(primary, use_greenlets=use_greenlets)
111
 
    try:
112
 
        c.admin.command('replSetInitiate', config)
113
 
    except:
114
 
        # Already initialized from a previous run?
115
 
        pass
116
 
 
117
 
    expected_arbiters = 0
118
 
    for member in members:
119
 
        if member.get('arbiterOnly'):
120
 
            expected_arbiters += 1
121
 
    expected_secondaries = len(members) - expected_arbiters - 1
122
 
 
123
 
    while True:
124
 
        time.sleep(2)
125
 
        try:
126
 
            if (len(get_primary()) == 1 and
127
 
                len(get_secondaries()) == expected_secondaries and
128
 
                len(get_arbiters()) == expected_arbiters):
129
 
                break
130
 
        except pymongo.errors.AutoReconnect:
131
 
            # Keep waiting
132
 
            pass
133
 
    return primary, set_name
134
 
 
135
 
 
136
 
def get_members_in_state(state):
137
 
    c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
138
 
    status = c.admin.command('replSetGetStatus')
139
 
    members = status['members']
140
 
    return [k['name'] for k in members if k['state'] == state]
141
 
 
142
 
 
143
 
def get_primary():
144
 
    return get_members_in_state(1)
145
 
 
146
 
 
147
 
def get_random_secondary():
148
 
    secondaries = get_members_in_state(2)
149
 
    if len(secondaries):
150
 
        return [secondaries[random.randrange(0, len(secondaries))]]
151
 
    return secondaries
152
 
 
153
 
 
154
 
def get_secondaries():
155
 
    return get_members_in_state(2)
156
 
 
157
 
 
158
 
def get_arbiters():
159
 
    return get_members_in_state(7)
160
 
 
161
 
 
162
 
def get_passives():
163
 
    c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
164
 
    return c.admin.command('ismaster').get('passives', [])
165
 
 
166
 
 
167
 
def get_hosts():
168
 
    c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
169
 
    return c.admin.command('ismaster').get('hosts', [])
170
 
 
171
 
 
172
 
def get_hidden_members():
173
 
    # Both 'hidden' and 'slaveDelay'
174
 
    secondaries = get_secondaries()
175
 
    readers = get_hosts() + get_passives()
176
 
    for member in readers:
177
 
        try:
178
 
            secondaries.remove(member)
179
 
        except:
180
 
            # Skip primary
181
 
            pass
182
 
    return secondaries
183
 
 
184
 
 
185
 
def kill_primary(sig=2):
186
 
    primary = get_primary()
187
 
    kill_members(primary, sig)
188
 
    return primary
189
 
 
190
 
 
191
 
def kill_secondary(sig=2):
192
 
    secondary = get_random_secondary()
193
 
    kill_members(secondary, sig)
194
 
    return secondary
195
 
 
196
 
 
197
 
def kill_all_secondaries(sig=2):
198
 
    secondaries = get_secondaries()
199
 
    kill_members(secondaries, sig)
200
 
    return secondaries
201
 
 
202
 
 
203
 
def stepdown_primary():
204
 
    primary = get_primary()
205
 
    if primary:
206
 
        c = pymongo.Connection(primary, use_greenlets=use_greenlets)
207
 
        # replSetStepDown causes mongod to close all connections
208
 
        try:
209
 
            c.admin.command('replSetStepDown', 20)
210
 
        except:
211
 
            pass
212
 
 
213
 
 
214
 
def restart_members(members):
215
 
    restarted = []
216
 
    for member in members:
217
 
        cmd = nodes[member]['cmd']
218
 
        proc = subprocess.Popen(cmd,
219
 
                                stdout=subprocess.PIPE,
220
 
                                stderr=subprocess.STDOUT)
221
 
        nodes[member]['proc'] = proc
222
 
        res = wait_for(proc, int(member.split(':')[1]))
223
 
        if res:
224
 
            restarted.append(member)
225
 
    return restarted