1
# Copyright 2009-2011 10gen, Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
15
"""Test replica set operations and failures."""
21
from replset_tools import use_greenlets
24
from pymongo import (ReplicaSetConnection,
26
from pymongo.connection import Connection, _partition_node
27
from pymongo.errors import AutoReconnect, ConnectionFailure
30
class TestReadPreference(unittest.TestCase):
# Checks which replica-set member serves reads under each ReadPreference
# setting, before and after members are killed through replset_tools.
# NOTE(review): this text has no indentation, carries a bare integer line
# between statements, and uses names (`db`, `conn`, `sleep`) that are never
# bound in the lines shown. It looks like a lossy extraction -- compare with
# the upstream file before relying on statement order or scoping.
33
# Fixture: two default members plus one arbiter. start_replica_set() returns
# a pair that is stored as the seed and the replica set name.
# NOTE(review): these statements use `self`, so they presumably live in a
# setUp() method whose `def` line is not visible here.
members = [{}, {}, {'arbiterOnly': True}]
34
res = replset_tools.start_replica_set(members)
35
self.seed, self.name = res
37
def test_read_preference(self):
38
c = ReplicaSetConnection(
39
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
40
# The rest of the test needs at least one secondary to be known.
self.assertTrue(bool(len(c.secondaries)))
42
# NOTE(review): `db` is not assigned in the visible lines; presumably it is
# c.pymongo_test (the database name used on the direct connections below).
db.test.remove({}, safe=True, w=len(c.secondaries))
44
# Force replication...
45
# w counts every secondary plus one more node, so the insert is acknowledged
# only after all of them have the document.
w = len(c.secondaries) + 1
46
db.test.insert({'foo': 'bar'}, safe=True, w=w)
48
# Test direct connection to a secondary
49
# NOTE(review): `port` is a str fresh out of split(':'); a conversion before
# it is compared with conn.port is presumably among the missing lines.
host, port = replset_tools.get_secondaries()[0].split(':')
52
# NOTE(review): the next line is the tail of a call whose opening line is not
# visible; presumably `conn = Connection(`.
host, port, slave_okay=True, use_greenlets=use_greenlets)
53
self.assertEqual(host, conn.host)
54
self.assertEqual(port, conn.port)
55
# assert_ is a deprecated unittest alias of assertTrue.
self.assert_(conn.pymongo_test.test.find_one())
58
# NOTE(review): tail of a second connection call (opening line not visible),
# this time selecting the secondary via read_preference.
read_preference=ReadPreference.SECONDARY,
59
use_greenlets=use_greenlets)
60
self.assertEqual(host, conn.host)
61
self.assertEqual(port, conn.port)
62
self.assert_(conn.pymongo_test.test.find_one())
64
# Test direct connection to an arbiter
65
host = replset_tools.get_arbiters()[0]
67
# NOTE(review): these look like the arguments of an assertRaises(...) call
# whose opening line is not visible.
ConnectionFailure, Connection, host, use_greenlets=use_greenlets)
71
# `_Cursor__connection_id` is the name-mangled spelling of Cursor's private
# `__connection_id` attribute; the assertions below compare it with the
# connection's primary / secondaries to see which member served the query.
# NOTE(review): any loop header or cursor.next() call around these checks is
# not visible in this extraction.
cursor = db.test.find()
73
self.assertEqual(cursor._Cursor__connection_id, c.primary)
75
# Test SECONDARY with a secondary
76
db.read_preference = ReadPreference.SECONDARY
78
cursor = db.test.find()
80
self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
82
# Test SECONDARY_ONLY with a secondary
83
db.read_preference = ReadPreference.SECONDARY_ONLY
85
cursor = db.test.find()
87
self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
89
# Test SECONDARY with no secondary
90
killed = replset_tools.kill_all_secondaries()
91
sleep(5) # Let monitor thread notice change
92
# Guard against a vacuous pass: something must actually have been killed.
self.assertTrue(bool(len(killed)))
93
db.read_preference = ReadPreference.SECONDARY
95
cursor = db.test.find()
97
# With every secondary down, a SECONDARY read is expected on the primary.
self.assertEqual(cursor._Cursor__connection_id, c.primary)
99
# Test SECONDARY_ONLY with no secondary
100
db.read_preference = ReadPreference.SECONDARY_ONLY
102
cursor = db.test.find()
103
# The failure is expected when the cursor is advanced, not when it is built.
self.assertRaises(AutoReconnect, cursor.next)
105
# Bring the killed secondaries back before the primary is taken down.
replset_tools.restart_members(killed)
106
# Test PRIMARY with no primary (should raise an exception)
107
db.read_preference = ReadPreference.PRIMARY
108
cursor = db.test.find()
110
self.assertEqual(cursor._Cursor__connection_id, c.primary)
111
killed = replset_tools.kill_primary()
112
self.assertTrue(bool(len(killed)))
113
self.assertRaises(AutoReconnect, db.test.find_one)
116
# NOTE(review): presumably the body of tearDown(); its `def` line is not
# visible here.
replset_tools.kill_all_members()
119
class TestPassiveAndHidden(unittest.TestCase):
# Checks that SECONDARY reads are never served by hidden members and move to
# the primary once the passive members are killed.
# NOTE(review): indentation is missing, bare integer lines sit between
# statements, and `db` / `sleep` are never bound in the lines shown. Treat
# this as a lossy extraction and confirm against the upstream file.
122
# Fixture: one default member, one priority-0 member, one arbiter, one hidden
# priority-0 member and one priority-0 member with slaveDelay 5.
# NOTE(review): presumably the body of setUp(); its `def` line is not visible.
members = [{}, {'priority': 0}, {'arbiterOnly': True},
123
{'priority': 0, 'hidden': True}, {'priority': 0, 'slaveDelay': 5}]
124
res = replset_tools.start_replica_set(members)
125
self.seed, self.name = res
127
def test_passive_and_hidden(self):
128
c = ReplicaSetConnection(
129
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
131
# NOTE(review): `db` is not assigned in the visible lines; presumably a
# database obtained from `c`.
db.test.remove({}, safe=True, w=len(c.secondaries))
132
# w counts every secondary plus one more node, so the insert is acknowledged
# only after all of them have the document.
w = len(c.secondaries) + 1
133
db.test.insert({'foo': 'bar'}, safe=True, w=w)
134
db.read_preference = ReadPreference.SECONDARY
136
# Both member lists are passed through _partition_node so they can be
# compared with c.secondaries and with the cursor's connection id.
# NOTE(review): presumably _partition_node turns 'host:port' into a
# (host, port) pair -- verify in pymongo.connection.
passives = replset_tools.get_passives()
137
passives = [_partition_node(member) for member in passives]
138
hidden = replset_tools.get_hidden_members()
139
hidden = [_partition_node(member) for member in hidden]
140
# The connection's secondaries must be exactly the passive members.
self.assertEqual(c.secondaries, set(passives))
143
# `_Cursor__connection_id` is the name-mangled private attribute of Cursor.
# NOTE(review): any loop header or cursor.next() call around this check is
# not visible in this extraction.
cursor = db.test.find()
145
self.assertTrue(cursor._Cursor__connection_id not in hidden)
147
# NOTE(review): the meaning of the second argument (2) is not visible here;
# presumably a signal number -- confirm in replset_tools.
replset_tools.kill_members(replset_tools.get_passives(), 2)
148
sleep(5) # Let monitor thread notice change
151
cursor = db.test.find()
153
# With the passives gone, the read is expected to land on the primary.
self.assertEqual(cursor._Cursor__connection_id, c.primary)
156
# NOTE(review): presumably the body of tearDown(); its `def` line is not
# visible here.
replset_tools.kill_all_members()
158
class TestHealthMonitor(unittest.TestCase):
# Checks that a ReplicaSetConnection's view of the set (c.primary and
# c.secondaries) changes after a primary kill, a secondary kill/restart and a
# primary stepdown.
# NOTE(review): indentation is missing, bare integer lines sit between
# statements, the nested helper bodies are mostly absent, and `primary` /
# `sleep` are never bound in the lines shown. Treat this as a lossy
# extraction and confirm against the upstream file.
161
# Fixture: three default members.
# NOTE(review): presumably the body of setUp(); its `def` line is not visible.
res = replset_tools.start_replica_set([{}, {}, {}])
162
self.seed, self.name = res
164
def test_primary_failure(self):
165
c = ReplicaSetConnection(
166
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
167
self.assertTrue(bool(len(c.secondaries)))
169
# Snapshot taken before the failure, for the comparison at the end.
secondaries = c.secondaries
171
# Helper that compares c.primary with the earlier `primary` value.
# NOTE(review): only its first condition is visible; the return statements
# and any retry loop are missing from this extraction.
def primary_changed():
173
if c.primary != primary:
178
killed = replset_tools.kill_primary()
179
sleep(5) # Let monitor thread notice change
180
# Guard against a vacuous pass: something must actually have been killed.
self.assertTrue(bool(len(killed)))
181
self.assertTrue(primary_changed())
182
self.assertTrue(secondaries != c.secondaries)
184
def test_secondary_failure(self):
185
c = ReplicaSetConnection(
186
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
187
self.assertTrue(bool(len(c.secondaries)))
189
secondaries = c.secondaries
191
# Helper that compares c.secondaries with the `secondaries` snapshot.
# NOTE(review): only its first condition is visible; the rest of the body is
# missing from this extraction.
def readers_changed():
193
if c.secondaries != secondaries:
199
killed = replset_tools.kill_secondary()
200
sleep(5) # Let monitor thread notice change
201
self.assertTrue(bool(len(killed)))
202
# Losing a secondary must not move the primary.
self.assertEqual(primary, c.primary)
203
self.assertTrue(readers_changed())
204
# Re-snapshot so the next readers_changed() call compares against the
# post-failure secondaries rather than the original ones.
secondaries = c.secondaries
206
replset_tools.restart_members(killed)
207
self.assertEqual(primary, c.primary)
208
self.assertTrue(readers_changed())
210
def test_primary_stepdown(self):
211
c = ReplicaSetConnection(
212
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
213
self.assertTrue(bool(len(c.secondaries)))
215
secondaries = c.secondaries
217
# Same shape as the helper in test_primary_failure.
# NOTE(review): the body beyond the first condition is not visible.
def primary_changed():
219
if c.primary != primary:
224
# Unlike the kill-based tests, no sleep is visible after the stepdown.
# NOTE(review): presumably primary_changed() polls; confirm upstream.
replset_tools.stepdown_primary()
225
self.assertTrue(primary_changed())
226
self.assertTrue(secondaries != c.secondaries)
229
# NOTE(review): presumably the body of tearDown(); its `def` line is not
# visible here.
replset_tools.kill_all_members()
232
class TestWritesWithFailover(unittest.TestCase):
# Checks that a safe write succeeds after the primary is killed and that the
# document can be read back once a different primary has taken over.
# NOTE(review): indentation is missing, bare integer lines sit between
# statements, and `db`, `primary` and the `try_write` helper header are not
# present in the lines shown. Treat this as a lossy extraction and confirm
# against the upstream file.
235
# Fixture: three default members.
# NOTE(review): presumably the body of setUp(); its `def` line is not visible.
res = replset_tools.start_replica_set([{}, {}, {}])
236
self.seed, self.name = res
238
def test_writes_with_failover(self):
239
c = ReplicaSetConnection(
240
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
243
# w counts every secondary plus one more node, so both writes below are
# acknowledged only after all of them have applied the change.
w = len(c.secondaries) + 1
244
db.test.remove({}, safe=True, w=w)
245
db.test.insert({'foo': 'bar'}, safe=True, w=w)
246
self.assertEqual('bar', db.test.find_one()['foo'])
251
# NOTE(review): the next two lines are fragments of a helper, presumably
# `def try_write():` wrapping the insert in try/except AutoReconnect; the
# `def`, `try:` and return lines are missing from this extraction.
db.test.insert({'bar': 'baz'}, safe=True)
253
except AutoReconnect:
257
# NOTE(review): the meaning of the argument (9) is not visible here;
# presumably a signal number -- confirm in replset_tools.
killed = replset_tools.kill_primary(9)
258
# Guard against a vacuous pass: something must actually have been killed.
self.assertTrue(bool(len(killed)))
259
self.assertTrue(try_write())
260
# A different member must now be primary.
self.assertTrue(primary != c.primary)
261
self.assertEqual('baz', db.test.find_one({'bar': 'baz'})['bar'])
264
# NOTE(review): presumably the body of tearDown(); its `def` line is not
# visible here.
replset_tools.kill_all_members()
267
class TestReadWithFailover(unittest.TestCase):
# Checks that a SECONDARY-preference cursor opened before the primary is
# killed can still be iterated to the end afterwards.
# NOTE(review): indentation is missing, bare integer lines sit between
# statements, the `iter_cursor` body is absent and `db` is never bound in
# the lines shown. Treat this as a lossy extraction and confirm against the
# upstream file.
270
# Fixture: three default members.
# NOTE(review): presumably the body of setUp(); its `def` line is not visible.
res = replset_tools.start_replica_set([{}, {}, {}])
271
self.seed, self.name = res
273
def test_read_with_failover(self):
274
c = ReplicaSetConnection(
275
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
276
self.assertTrue(bool(len(c.secondaries)))
278
# Helper whose result is asserted truthy further down.
# NOTE(review): its body is not visible; presumably it drains the cursor and
# returns True.
def iter_cursor(cursor):
284
# w counts every secondary plus one more node, so the remove is acknowledged
# only after all of them have applied it.
w = len(c.secondaries) + 1
285
db.test.remove({}, safe=True, w=w)
287
# Ten documents {'foo': 0} .. {'foo': 9}; xrange means this is Python 2 code.
# NOTE(review): the continuation line that closes this call is not visible.
db.test.insert([{'foo': i} for i in xrange(10)],
289
self.assertEqual(10, db.test.count())
291
db.read_preference = ReadPreference.SECONDARY
292
# A batch size of 5 splits the ten documents across two batches.
cursor = db.test.find().batch_size(5)
294
# `_Cursor__retrieved` is the name-mangled private counter of Cursor.
# NOTE(review): whatever triggers the first batch before this assertion
# (presumably cursor.next()) is not visible.
self.assertEqual(5, cursor._Cursor__retrieved)
295
killed = replset_tools.kill_primary()
296
# Primary failure shouldn't interrupt the cursor
297
self.assertTrue(iter_cursor(cursor))
298
# All ten documents have been retrieved despite the primary being down.
self.assertEqual(10, cursor._Cursor__retrieved)
301
# NOTE(review): presumably the body of tearDown(); its `def` line is not
# visible here.
replset_tools.kill_all_members()
303
# Script entry point. The visible lines only print gevent diagnostics and
# patch the socket module.
# NOTE(review): the condition that selects the gevent path (presumably
# `use_greenlets`), the `import gevent` line, any else-branch and the call
# that actually runs the tests are not visible in this extraction.
if __name__ == '__main__':
305
print('Using Gevent')
307
print('gevent version %s' % gevent.__version__)
309
# gevent.core.get_method() is only consulted for version 0.13.6.
# NOTE(review): presumably other versions use the get_hub() line below
# instead; the `else:` is not visible.
if gevent.__version__ == '0.13.6':
310
print('method %s' % gevent.core.get_method())
312
print(gevent.hub.get_hub())
313
# The import happens here rather than at the top of the file.
# NOTE(review): presumably so the module still imports without gevent.
from gevent import monkey
314
monkey.patch_socket()