1
# Copyright 2011-2012 10gen, Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
15
"""Test the replica_set_connection module."""
21
from nose.plugins.skip import SkipTest
25
from bson.son import SON
26
from pymongo.cursor import _QUERY_OPTIONS
27
from pymongo.replica_set_connection import ReplicaSetConnection
28
from pymongo.read_preferences import (ReadPreference, modes, MovingAverage,
29
secondary_ok_commands)
30
from pymongo.errors import ConfigurationError
32
from test.test_replica_set_connection import TestConnectionReplicaSetBase
33
from test.test_connection import get_connection, host, port
34
from test import version, utils
37
class TestReadPreferencesBase(TestConnectionReplicaSetBase):
39
super(TestReadPreferencesBase, self).setUp()
40
# Insert some data so we can use cursors in read_from_which_host
41
c = self._get_connection()
42
c.pymongo_test.test.drop()
43
c.pymongo_test.test.insert([{'_id': i} for i in range(10)], w=self.w)
46
super(TestReadPreferencesBase, self).tearDown()
47
c = self._get_connection()
48
c.pymongo_test.test.drop()
50
def read_from_which_host(self, connection):
51
"""Do a find() on the connection and return which host was used
53
cursor = connection.pymongo_test.test.find()
55
return cursor._Cursor__connection_id
57
def read_from_which_kind(self, connection):
58
"""Do a find() on the connection and return 'primary' or 'secondary'
59
depending on which the connection used.
61
connection_id = self.read_from_which_host(connection)
62
if connection_id == connection.primary:
64
elif connection_id in connection.secondaries:
68
'Cursor used connection id %s, expected either primary '
69
'%s or secondaries %s' % (
70
connection_id, connection.primary, connection.secondaries))
72
def assertReadsFrom(self, expected, **kwargs):
73
c = self._get_connection(**kwargs)
74
used = self.read_from_which_kind(c)
75
self.assertEqual(expected, used, 'Cursor used %s, expected %s' % (
79
class TestReadPreferences(TestReadPreferencesBase):
80
def test_mode_validation(self):
81
# 'modes' are imported from read_preferences.py
83
self.assertEqual(mode, self._get_connection(
84
read_preference=mode).read_preference)
86
self.assertRaises(ConfigurationError, self._get_connection,
87
read_preference='foo')
89
def test_tag_sets_validation(self):
90
# Can't use tags with PRIMARY
91
self.assertRaises(ConfigurationError, self._get_connection,
92
tag_sets=[{'k': 'v'}])
94
# ... but empty tag sets are ok with PRIMARY
95
self.assertEqual([{}], self._get_connection(tag_sets=[{}]).tag_sets)
97
S = ReadPreference.SECONDARY
98
self.assertEqual([{}], self._get_connection(read_preference=S).tag_sets)
100
self.assertEqual([{'k': 'v'}], self._get_connection(
101
read_preference=S, tag_sets=[{'k': 'v'}]).tag_sets)
103
self.assertEqual([{'k': 'v'}, {}], self._get_connection(
104
read_preference=S, tag_sets=[{'k': 'v'}, {}]).tag_sets)
106
self.assertRaises(ConfigurationError, self._get_connection,
107
read_preference=S, tag_sets=[])
109
# One dict not ok, must be a list of dicts
110
self.assertRaises(ConfigurationError, self._get_connection,
111
read_preference=S, tag_sets={'k': 'v'})
113
self.assertRaises(ConfigurationError, self._get_connection,
114
read_preference=S, tag_sets='foo')
116
self.assertRaises(ConfigurationError, self._get_connection,
117
read_preference=S, tag_sets=['foo'])
119
def test_latency_validation(self):
120
self.assertEqual(17, self._get_connection(
121
secondary_acceptable_latency_ms=17
122
).secondary_acceptable_latency_ms)
124
self.assertEqual(42, self._get_connection(
125
secondaryAcceptableLatencyMS=42
126
).secondary_acceptable_latency_ms)
128
self.assertEqual(666, self._get_connection(
129
secondaryacceptablelatencyms=666
130
).secondary_acceptable_latency_ms)
132
def test_primary(self):
133
self.assertReadsFrom('primary',
134
read_preference=ReadPreference.PRIMARY)
136
def test_primary_with_tags(self):
137
# Tags not allowed with PRIMARY
138
self.assertRaises(ConfigurationError,
139
self._get_connection, tag_sets=[{'dc': 'ny'}])
141
def test_primary_preferred(self):
142
self.assertReadsFrom('primary',
143
read_preference=ReadPreference.PRIMARY_PREFERRED)
145
def test_secondary(self):
146
self.assertReadsFrom('secondary',
147
read_preference=ReadPreference.SECONDARY)
149
def test_secondary_preferred(self):
150
self.assertReadsFrom('secondary',
151
read_preference=ReadPreference.SECONDARY_PREFERRED)
153
def test_secondary_only(self):
154
# Test deprecated mode SECONDARY_ONLY, which is now a synonym for
157
ReadPreference.SECONDARY, ReadPreference.SECONDARY_ONLY)
159
def test_nearest(self):
160
# With high secondaryAcceptableLatencyMS, expect to read from any
162
c = self._get_connection(
163
read_preference=ReadPreference.NEAREST,
164
secondaryAcceptableLatencyMS=10000, # 10 seconds
165
auto_start_request=False)
167
data_members = set(self.hosts).difference(set(self.arbiters))
169
# This is a probabilistic test; track which members we've read from so
170
# far, and keep reading until we've used all the members or give up.
171
# Chance of using only 2 of 3 members 10k times if there's no bug =
172
# 3 * (2/3)**10000, very low.
175
while data_members.difference(used) and i < 10000:
176
host = self.read_from_which_host(c)
180
not_used = data_members.difference(used)
181
latencies = ', '.join(
182
'%s: %dms' % (member.host, member.ping_time.get())
183
for member in c._MongoReplicaSetClient__members.values())
185
self.assertFalse(not_used,
186
"Expected to use primary and all secondaries for mode NEAREST,"
187
" but didn't use %s\nlatencies: %s" % (not_used, latencies))
190
class ReadPrefTester(ReplicaSetConnection):
191
def __init__(self, *args, **kwargs):
192
self.has_read_from = set()
193
super(ReadPrefTester, self).__init__(*args, **kwargs)
195
def _MongoReplicaSetClient__send_and_receive(self, member, *args, **kwargs):
196
self.has_read_from.add(member)
197
rsc = super(ReadPrefTester, self)
198
return rsc._MongoReplicaSetClient__send_and_receive(
199
member, *args, **kwargs)
202
class TestCommandAndReadPreference(TestConnectionReplicaSetBase):
204
super(TestCommandAndReadPreference, self).setUp()
206
# Need auto_start_request False to avoid pinning members.
207
self.c = ReadPrefTester(
208
'%s:%s' % (host, port),
209
replicaSet=self.name, auto_start_request=False,
210
# Effectively ignore members' ping times so we can test the effect
211
# of ReadPreference modes only
212
secondary_acceptable_latency_ms=1000*1000)
217
# We create a lot of collections and indexes in these tests, so drop
219
self._get_connection().drop_database('pymongo_test')
220
super(TestCommandAndReadPreference, self).tearDown()
222
def executed_on_which_member(self, connection, fn, *args, **kwargs):
223
connection.has_read_from.clear()
225
self.assertEqual(1, len(connection.has_read_from))
226
member, = connection.has_read_from
229
def assertExecutedOn(self, state, connection, fn, *args, **kwargs):
230
member = self.executed_on_which_member(connection, fn, *args, **kwargs)
231
if state == 'primary':
232
self.assertTrue(member.is_primary)
233
elif state == 'secondary':
234
self.assertFalse(member.is_primary)
236
self.fail("Bad state %s" % repr(state))
238
def _test_fn(self, obedient, fn):
241
self.c.read_preference = mode
243
# Run it a few times to make sure we don't just get lucky the
246
self.assertExecutedOn('primary', self.c, fn)
248
for mode, expected_state in [
249
(ReadPreference.PRIMARY, 'primary'),
250
(ReadPreference.PRIMARY_PREFERRED, 'primary'),
251
(ReadPreference.SECONDARY, 'secondary'),
252
(ReadPreference.SECONDARY_PREFERRED, 'secondary'),
253
(ReadPreference.NEAREST, 'any'),
255
self.c.read_preference = mode
257
if expected_state in ('primary', 'secondary'):
258
self.assertExecutedOn(expected_state, self.c, fn)
259
elif expected_state == 'any':
261
for _ in range(1000):
262
member = self.executed_on_which_member(
264
used.add(member.host)
265
if len(used) == len(self.c.secondaries) + 1:
269
unused = self.c.secondaries.union(
270
set([self.c.primary])
274
"Some members not used for NEAREST: %s" % (
277
def test_command(self):
278
# Test generic 'command' method. Some commands obey read preference,
280
# Disobedient commands, always go to primary
281
self._test_fn(False, lambda: self.c.pymongo_test.command('ping'))
282
self._test_fn(False, lambda: self.c.admin.command('buildinfo'))
285
self._test_fn(True, lambda: self.c.pymongo_test.command('group', {
286
'ns': 'test', 'key': {'a': 1}, '$reduce': 'function(obj, prev) { }',
289
self._test_fn(True, lambda: self.c.pymongo_test.command('dbStats'))
291
# collStats fails if no collection
292
self.c.pymongo_test.test.insert({}, w=self.w)
293
self._test_fn(True, lambda: self.c.pymongo_test.command(
294
'collStats', 'test'))
297
self._test_fn(True, lambda: self.c.pymongo_test.command(
299
self._test_fn(True, lambda: self.c.pymongo_test.command(
300
'count', 'test', query={'a': 1}))
301
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
302
('count', 'test'), ('query', {'a': 1})])))
305
self._test_fn(True, lambda: self.c.pymongo_test.command(
306
'distinct', 'test', key={'a': 1}))
307
self._test_fn(True, lambda: self.c.pymongo_test.command(
308
'distinct', 'test', key={'a': 1}, query={'a': 1}))
309
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
310
('distinct', 'test'), ('key', {'a': 1}), ('query', {'a': 1})])))
312
# Geo stuff. Make sure a 2d index is created and replicated
313
self.c.pymongo_test.system.indexes.insert({
314
'key' : { 'location' : '2d' }, 'ns' : 'pymongo_test.test',
315
'name' : 'location_2d' }, w=self.w)
317
self.c.pymongo_test.system.indexes.insert(SON([
318
('ns', 'pymongo_test.test'),
319
('key', SON([('location', 'geoHaystack'), ('key', 1)])),
321
('name', 'location_geoHaystack'),
324
self._test_fn(True, lambda: self.c.pymongo_test.command(
325
'geoNear', 'test', near=[0, 0]))
326
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
327
('geoNear', 'test'), ('near', [0, 0])])))
329
self._test_fn(True, lambda: self.c.pymongo_test.command(
330
'geoSearch', 'test', near=[33, 33], maxDistance=6,
331
search={'type': 'restaurant' }, limit=30))
333
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
334
('geoSearch', 'test'), ('near', [33, 33]), ('maxDistance', 6),
335
('search', {'type': 'restaurant'}), ('limit', 30)])))
337
if version.at_least(self.c, (2, 1, 0)):
338
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
339
('aggregate', 'test'),
343
def test_map_reduce_command(self):
344
# mapreduce fails if no collection
345
self.c.pymongo_test.test.insert({}, w=self.w)
347
# Non-inline mapreduce always goes to primary, doesn't obey read prefs.
348
# Test with command in a SON and with kwargs
349
self._test_fn(False, lambda: self.c.pymongo_test.command(SON([
350
('mapreduce', 'test'),
351
('map', 'function() { }'),
352
('reduce', 'function() { }'),
356
self._test_fn(False, lambda: self.c.pymongo_test.command(
357
'mapreduce', 'test', map='function() { }',
358
reduce='function() { }', out='mr_out'))
360
self._test_fn(False, lambda: self.c.pymongo_test.command(
361
'mapreduce', 'test', map='function() { }',
362
reduce='function() { }', out={'replace': 'some_collection'}))
364
# Inline mapreduce obeys read prefs
365
self._test_fn(True, lambda: self.c.pymongo_test.command(
366
'mapreduce', 'test', map='function() { }',
367
reduce='function() { }', out={'inline': True}))
369
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
370
('mapreduce', 'test'),
371
('map', 'function() { }'),
372
('reduce', 'function() { }'),
373
('out', {'inline': True})
376
def test_create_collection(self):
377
# Collections should be created on primary, obviously
378
self._test_fn(False, lambda: self.c.pymongo_test.command(
379
'create', 'some_collection%s' % random.randint(0, sys.maxint)))
381
self._test_fn(False, lambda: self.c.pymongo_test.create_collection(
382
'some_collection%s' % random.randint(0, sys.maxint)))
384
def test_drop_collection(self):
385
self._test_fn(False, lambda: self.c.pymongo_test.drop_collection(
388
self._test_fn(False, lambda: self.c.pymongo_test.some_collection.drop())
390
def test_group(self):
391
self._test_fn(True, lambda: self.c.pymongo_test.test.group(
392
{'a': 1}, {}, {}, 'function() { }'))
394
def test_map_reduce(self):
395
# mapreduce fails if no collection
396
self.c.pymongo_test.test.insert({}, w=self.w)
398
self._test_fn(False, lambda: self.c.pymongo_test.test.map_reduce(
399
'function() { }', 'function() { }', 'mr_out'))
401
self._test_fn(True, lambda: self.c.pymongo_test.test.map_reduce(
402
'function() { }', 'function() { }', {'inline': 1}))
404
def test_inline_map_reduce(self):
405
# mapreduce fails if no collection
406
self.c.pymongo_test.test.insert({}, w=self.w)
408
self._test_fn(True, lambda: self.c.pymongo_test.test.inline_map_reduce(
409
'function() { }', 'function() { }'))
411
self._test_fn(True, lambda: self.c.pymongo_test.test.inline_map_reduce(
412
'function() { }', 'function() { }', full_response=True))
414
def test_count(self):
415
self._test_fn(True, lambda: self.c.pymongo_test.test.count())
416
self._test_fn(True, lambda: self.c.pymongo_test.test.find().count())
418
def test_distinct(self):
419
self._test_fn(True, lambda: self.c.pymongo_test.test.distinct('a'))
421
lambda: self.c.pymongo_test.test.find().distinct('a'))
423
def test_aggregate(self):
424
if version.at_least(self.c, (2, 1, 0)):
425
self._test_fn(True, lambda: self.c.pymongo_test.test.aggregate([]))
428
class TestMovingAverage(unittest.TestCase):
429
def test_empty_moving_average(self):
430
avg = MovingAverage(0)
431
self.assertEqual(None, avg.get())
433
self.assertEqual(None, avg.get())
435
def test_trivial_moving_average(self):
436
avg = MovingAverage(1)
437
self.assertEqual(None, avg.get())
439
self.assertEqual(10, avg.get())
441
self.assertEqual(20, avg.get())
443
self.assertEqual(0, avg.get())
445
def test_2_sample_moving_average(self):
446
avg = MovingAverage(2)
447
self.assertEqual(None, avg.get())
449
self.assertEqual(10, avg.get())
451
self.assertEqual(15, avg.get())
453
self.assertEqual(25, avg.get())
455
self.assertEqual(-35, avg.get())
457
def test_5_sample_moving_average(self):
458
avg = MovingAverage(5)
459
self.assertEqual(None, avg.get())
461
self.assertEqual(10, avg.get())
463
self.assertEqual(15, avg.get())
465
self.assertEqual(20, avg.get())
467
self.assertEqual((10 + 20 + 30 - 100) / 4, avg.get())
469
self.assertEqual((10 + 20 + 30 - 100 + 17) / 5., avg.get())
471
self.assertEqual((20 + 30 - 100 + 17 + 43) / 5., avg.get())
473
self.assertEqual((30 - 100 + 17 + 43 - 1111) / 5., avg.get())
476
class TestMongosConnection(unittest.TestCase):
477
def test_mongos_connection(self):
479
is_mongos = utils.is_mongos(c)
481
# Test default mode, PRIMARY
482
cursor = c.pymongo_test.test.find()
484
# We only set $readPreference if it's something other than
485
# PRIMARY to avoid problems with mongos versions that don't
486
# support read preferences.
489
cursor._Cursor__query_spec().get('$readPreference')
493
'$readPreference' in cursor._Cursor__query_spec())
495
# Copy these constants for brevity
496
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
497
SECONDARY = ReadPreference.SECONDARY
498
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
499
NEAREST = ReadPreference.NEAREST
500
SLAVE_OKAY = _QUERY_OPTIONS['slave_okay']
502
# Test non-PRIMARY modes which can be combined with tags
503
for kwarg, value, mongos_mode in (
504
('read_preference', PRIMARY_PREFERRED, 'primaryPreferred'),
505
('read_preference', SECONDARY, 'secondary'),
506
('read_preference', SECONDARY_PREFERRED, 'secondaryPreferred'),
507
('read_preference', NEAREST, 'nearest'),
508
('slave_okay', True, 'secondaryPreferred'),
509
('slave_okay', False, 'primary')
514
# Create a connection e.g. with read_preference=NEAREST or
516
c = get_connection(tag_sets=tag_sets, **{kwarg: value})
518
self.assertEqual(is_mongos, c.is_mongos)
519
cursor = c.pymongo_test.test.find()
521
# We don't set $readPreference for SECONDARY_PREFERRED
522
# unless tags are in use. slaveOkay has the same effect.
523
if mongos_mode == 'secondaryPreferred':
526
cursor._Cursor__query_spec().get('$readPreference'))
529
cursor._Cursor__query_options() & SLAVE_OKAY)
531
# Don't send $readPreference for PRIMARY either
532
elif mongos_mode == 'primary':
535
cursor._Cursor__query_spec().get('$readPreference'))
538
cursor._Cursor__query_options() & SLAVE_OKAY)
541
{'mode': mongos_mode},
542
cursor._Cursor__query_spec().get('$readPreference'))
545
cursor._Cursor__query_options() & SLAVE_OKAY)
548
'$readPreference' in cursor._Cursor__query_spec())
552
[{'dc': 'la'}, {'dc': 'sf'}],
553
[{'dc': 'la'}, {'dc': 'sf'}, {}],
555
if kwarg == 'slave_okay':
556
# Can't use tags with slave_okay True or False, need a
557
# real read preference
560
get_connection, tag_sets=tag_sets, **{kwarg: value})
564
c = get_connection(tag_sets=tag_sets, **{kwarg: value})
566
self.assertEqual(is_mongos, c.is_mongos)
567
cursor = c.pymongo_test.test.find()
570
{'mode': mongos_mode, 'tags': tag_sets},
571
cursor._Cursor__query_spec().get('$readPreference'))
574
'$readPreference' in cursor._Cursor__query_spec())
576
def test_only_secondary_ok_commands_have_read_prefs(self):
577
c = get_connection(read_preference=ReadPreference.SECONDARY)
578
is_mongos = utils.is_mongos(c)
580
raise SkipTest("Only mongos have read_prefs added to the spec")
582
# Ensure secondary_ok_commands have readPreference
583
for cmd in secondary_ok_commands:
584
if cmd == 'mapreduce': # map reduce is a special case
586
command = SON([(cmd, 1)])
587
cursor = c.pymongo_test["$cmd"].find(command.copy())
588
# White-listed commands also have to be wrapped in $query
589
command = SON([('$query', command)])
590
command['$readPreference'] = {'mode': 'secondary'}
591
self.assertEqual(command, cursor._Cursor__query_spec())
593
# map_reduce inline should have read prefs
594
command = SON([('mapreduce', 'test'), ('out', {'inline': 1})])
595
cursor = c.pymongo_test["$cmd"].find(command.copy())
596
# White-listed commands also have to be wrapped in $query
597
command = SON([('$query', command)])
598
command['$readPreference'] = {'mode': 'secondary'}
599
self.assertEqual(command, cursor._Cursor__query_spec())
601
# map_reduce that outputs to a collection shouldn't have read prefs
602
command = SON([('mapreduce', 'test'), ('out', {'mrtest': 1})])
603
cursor = c.pymongo_test["$cmd"].find(command.copy())
604
self.assertEqual(command, cursor._Cursor__query_spec())
606
# Other commands shouldn't be changed
607
for cmd in ('drop', 'create', 'any-future-cmd'):
608
command = SON([(cmd, 1)])
609
cursor = c.pymongo_test["$cmd"].find(command.copy())
610
self.assertEqual(command, cursor._Cursor__query_spec())
612
if __name__ == "__main__":