~crunch.io/ubuntu/precise/pymongo/unstable

« back to all changes in this revision

Viewing changes to test/test_read_preferences.py

  • Committer: Joseph Tate
  • Date: 2013-01-31 08:00:57 UTC
  • mfrom: (1.1.12)
  • Revision ID: jtate@dragonstrider.com-20130131080057-y7lv17xi6x8c1j5x
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2011-2012 10gen, Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
# http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
"""Test the replica_set_connection module."""
 
16
import random
 
17
 
 
18
import sys
 
19
import unittest
 
20
 
 
21
from nose.plugins.skip import SkipTest
 
22
 
 
23
sys.path[0:0] = [""]
 
24
 
 
25
from bson.son import SON
 
26
from pymongo.cursor import _QUERY_OPTIONS
 
27
from pymongo.replica_set_connection import ReplicaSetConnection
 
28
from pymongo.read_preferences import (ReadPreference, modes, MovingAverage,
 
29
                                      secondary_ok_commands)
 
30
from pymongo.errors import ConfigurationError
 
31
 
 
32
from test.test_replica_set_connection import TestConnectionReplicaSetBase
 
33
from test.test_connection import get_connection, host, port
 
34
from test import version, utils
 
35
 
 
36
 
 
37
class TestReadPreferencesBase(TestConnectionReplicaSetBase):
 
38
    def setUp(self):
 
39
        super(TestReadPreferencesBase, self).setUp()
 
40
        # Insert some data so we can use cursors in read_from_which_host
 
41
        c = self._get_connection()
 
42
        c.pymongo_test.test.drop()
 
43
        c.pymongo_test.test.insert([{'_id': i} for i in range(10)], w=self.w)
 
44
 
 
45
    def tearDown(self):
 
46
        super(TestReadPreferencesBase, self).tearDown()
 
47
        c = self._get_connection()
 
48
        c.pymongo_test.test.drop()
 
49
 
 
50
    def read_from_which_host(self, connection):
 
51
        """Do a find() on the connection and return which host was used
 
52
        """
 
53
        cursor = connection.pymongo_test.test.find()
 
54
        cursor.next()
 
55
        return cursor._Cursor__connection_id
 
56
 
 
57
    def read_from_which_kind(self, connection):
 
58
        """Do a find() on the connection and return 'primary' or 'secondary'
 
59
           depending on which the connection used.
 
60
        """
 
61
        connection_id = self.read_from_which_host(connection)
 
62
        if connection_id == connection.primary:
 
63
            return 'primary'
 
64
        elif connection_id in connection.secondaries:
 
65
            return 'secondary'
 
66
        else:
 
67
            self.fail(
 
68
                'Cursor used connection id %s, expected either primary '
 
69
                '%s or secondaries %s' % (
 
70
                    connection_id, connection.primary, connection.secondaries))
 
71
 
 
72
    def assertReadsFrom(self, expected, **kwargs):
 
73
        c = self._get_connection(**kwargs)
 
74
        used = self.read_from_which_kind(c)
 
75
        self.assertEqual(expected, used, 'Cursor used %s, expected %s' % (
 
76
            expected, used))
 
77
 
 
78
 
 
79
class TestReadPreferences(TestReadPreferencesBase):
 
80
    def test_mode_validation(self):
 
81
        # 'modes' are imported from read_preferences.py
 
82
        for mode in modes:
 
83
            self.assertEqual(mode, self._get_connection(
 
84
                read_preference=mode).read_preference)
 
85
 
 
86
        self.assertRaises(ConfigurationError, self._get_connection,
 
87
            read_preference='foo')
 
88
 
 
89
    def test_tag_sets_validation(self):
 
90
        # Can't use tags with PRIMARY
 
91
        self.assertRaises(ConfigurationError, self._get_connection,
 
92
            tag_sets=[{'k': 'v'}])
 
93
 
 
94
        # ... but empty tag sets are ok with PRIMARY
 
95
        self.assertEqual([{}], self._get_connection(tag_sets=[{}]).tag_sets)
 
96
 
 
97
        S = ReadPreference.SECONDARY
 
98
        self.assertEqual([{}], self._get_connection(read_preference=S).tag_sets)
 
99
 
 
100
        self.assertEqual([{'k': 'v'}], self._get_connection(
 
101
            read_preference=S, tag_sets=[{'k': 'v'}]).tag_sets)
 
102
 
 
103
        self.assertEqual([{'k': 'v'}, {}], self._get_connection(
 
104
            read_preference=S, tag_sets=[{'k': 'v'}, {}]).tag_sets)
 
105
 
 
106
        self.assertRaises(ConfigurationError, self._get_connection,
 
107
            read_preference=S, tag_sets=[])
 
108
 
 
109
        # One dict not ok, must be a list of dicts
 
110
        self.assertRaises(ConfigurationError, self._get_connection,
 
111
            read_preference=S, tag_sets={'k': 'v'})
 
112
 
 
113
        self.assertRaises(ConfigurationError, self._get_connection,
 
114
            read_preference=S, tag_sets='foo')
 
115
 
 
116
        self.assertRaises(ConfigurationError, self._get_connection,
 
117
            read_preference=S, tag_sets=['foo'])
 
118
 
 
119
    def test_latency_validation(self):
 
120
        self.assertEqual(17, self._get_connection(
 
121
            secondary_acceptable_latency_ms=17
 
122
        ).secondary_acceptable_latency_ms)
 
123
 
 
124
        self.assertEqual(42, self._get_connection(
 
125
            secondaryAcceptableLatencyMS=42
 
126
        ).secondary_acceptable_latency_ms)
 
127
 
 
128
        self.assertEqual(666, self._get_connection(
 
129
            secondaryacceptablelatencyms=666
 
130
        ).secondary_acceptable_latency_ms)
 
131
 
 
132
    def test_primary(self):
 
133
        self.assertReadsFrom('primary',
 
134
            read_preference=ReadPreference.PRIMARY)
 
135
 
 
136
    def test_primary_with_tags(self):
 
137
        # Tags not allowed with PRIMARY
 
138
        self.assertRaises(ConfigurationError,
 
139
            self._get_connection, tag_sets=[{'dc': 'ny'}])
 
140
 
 
141
    def test_primary_preferred(self):
 
142
        self.assertReadsFrom('primary',
 
143
            read_preference=ReadPreference.PRIMARY_PREFERRED)
 
144
 
 
145
    def test_secondary(self):
 
146
        self.assertReadsFrom('secondary',
 
147
            read_preference=ReadPreference.SECONDARY)
 
148
 
 
149
    def test_secondary_preferred(self):
 
150
        self.assertReadsFrom('secondary',
 
151
            read_preference=ReadPreference.SECONDARY_PREFERRED)
 
152
 
 
153
    def test_secondary_only(self):
 
154
        # Test deprecated mode SECONDARY_ONLY, which is now a synonym for
 
155
        # SECONDARY
 
156
        self.assertEqual(
 
157
            ReadPreference.SECONDARY, ReadPreference.SECONDARY_ONLY)
 
158
 
 
159
    def test_nearest(self):
 
160
        # With high secondaryAcceptableLatencyMS, expect to read from any
 
161
        # member
 
162
        c = self._get_connection(
 
163
            read_preference=ReadPreference.NEAREST,
 
164
            secondaryAcceptableLatencyMS=10000, # 10 seconds
 
165
            auto_start_request=False)
 
166
 
 
167
        data_members = set(self.hosts).difference(set(self.arbiters))
 
168
 
 
169
        # This is a probabilistic test; track which members we've read from so
 
170
        # far, and keep reading until we've used all the members or give up.
 
171
        # Chance of using only 2 of 3 members 10k times if there's no bug =
 
172
        # 3 * (2/3)**10000, very low.
 
173
        used = set()
 
174
        i = 0
 
175
        while data_members.difference(used) and i < 10000:
 
176
            host = self.read_from_which_host(c)
 
177
            used.add(host)
 
178
            i += 1
 
179
 
 
180
        not_used = data_members.difference(used)
 
181
        latencies = ', '.join(
 
182
            '%s: %dms' % (member.host, member.ping_time.get())
 
183
            for member in c._MongoReplicaSetClient__members.values())
 
184
 
 
185
        self.assertFalse(not_used,
 
186
            "Expected to use primary and all secondaries for mode NEAREST,"
 
187
            " but didn't use %s\nlatencies: %s" % (not_used, latencies))
 
188
 
 
189
 
 
190
class ReadPrefTester(ReplicaSetConnection):
 
191
    def __init__(self, *args, **kwargs):
 
192
        self.has_read_from = set()
 
193
        super(ReadPrefTester, self).__init__(*args, **kwargs)
 
194
 
 
195
    def _MongoReplicaSetClient__send_and_receive(self, member, *args, **kwargs):
 
196
        self.has_read_from.add(member)
 
197
        rsc = super(ReadPrefTester, self)
 
198
        return rsc._MongoReplicaSetClient__send_and_receive(
 
199
            member, *args, **kwargs)
 
200
 
 
201
 
 
202
class TestCommandAndReadPreference(TestConnectionReplicaSetBase):
 
203
    def setUp(self):
 
204
        super(TestCommandAndReadPreference, self).setUp()
 
205
 
 
206
        # Need auto_start_request False to avoid pinning members.
 
207
        self.c = ReadPrefTester(
 
208
            '%s:%s' % (host, port),
 
209
            replicaSet=self.name, auto_start_request=False,
 
210
            # Effectively ignore members' ping times so we can test the effect
 
211
            # of ReadPreference modes only
 
212
            secondary_acceptable_latency_ms=1000*1000)
 
213
 
 
214
    def tearDown(self):
 
215
        self.c.close()
 
216
 
 
217
        # We create a lot of collections and indexes in these tests, so drop
 
218
        # the database
 
219
        self._get_connection().drop_database('pymongo_test')
 
220
        super(TestCommandAndReadPreference, self).tearDown()
 
221
 
 
222
    def executed_on_which_member(self, connection, fn, *args, **kwargs):
 
223
        connection.has_read_from.clear()
 
224
        fn(*args, **kwargs)
 
225
        self.assertEqual(1, len(connection.has_read_from))
 
226
        member, = connection.has_read_from
 
227
        return member
 
228
 
 
229
    def assertExecutedOn(self, state, connection, fn, *args, **kwargs):
 
230
        member = self.executed_on_which_member(connection, fn, *args, **kwargs)
 
231
        if state == 'primary':
 
232
            self.assertTrue(member.is_primary)
 
233
        elif state == 'secondary':
 
234
            self.assertFalse(member.is_primary)
 
235
        else:
 
236
            self.fail("Bad state %s" % repr(state))
 
237
 
 
238
    def _test_fn(self, obedient, fn):
 
239
        if not obedient:
 
240
            for mode in modes:
 
241
                self.c.read_preference = mode
 
242
 
 
243
                # Run it a few times to make sure we don't just get lucky the
 
244
                # first time.
 
245
                for _ in range(10):
 
246
                    self.assertExecutedOn('primary', self.c, fn)
 
247
        else:
 
248
            for mode, expected_state in [
 
249
                (ReadPreference.PRIMARY, 'primary'),
 
250
                (ReadPreference.PRIMARY_PREFERRED, 'primary'),
 
251
                (ReadPreference.SECONDARY, 'secondary'),
 
252
                (ReadPreference.SECONDARY_PREFERRED, 'secondary'),
 
253
                (ReadPreference.NEAREST, 'any'),
 
254
            ]:
 
255
                self.c.read_preference = mode
 
256
                for _ in range(10):
 
257
                    if expected_state in ('primary', 'secondary'):
 
258
                        self.assertExecutedOn(expected_state, self.c, fn)
 
259
                    elif expected_state == 'any':
 
260
                        used = set()
 
261
                        for _ in range(1000):
 
262
                            member = self.executed_on_which_member(
 
263
                                self.c, fn)
 
264
                            used.add(member.host)
 
265
                            if len(used) == len(self.c.secondaries) + 1:
 
266
                                # Success
 
267
                                break
 
268
 
 
269
                        unused = self.c.secondaries.union(
 
270
                            set([self.c.primary])
 
271
                        ).difference(used)
 
272
                        if unused:
 
273
                            self.fail(
 
274
                                "Some members not used for NEAREST: %s" % (
 
275
                                    unused))
 
276
 
 
277
    def test_command(self):
 
278
        # Test generic 'command' method. Some commands obey read preference,
 
279
        # most don't.
 
280
        # Disobedient commands, always go to primary
 
281
        self._test_fn(False, lambda: self.c.pymongo_test.command('ping'))
 
282
        self._test_fn(False, lambda: self.c.admin.command('buildinfo'))
 
283
 
 
284
        # Obedient commands.
 
285
        self._test_fn(True, lambda: self.c.pymongo_test.command('group', {
 
286
            'ns': 'test', 'key': {'a': 1}, '$reduce': 'function(obj, prev) { }',
 
287
            'initial': {}}))
 
288
 
 
289
        self._test_fn(True, lambda: self.c.pymongo_test.command('dbStats'))
 
290
 
 
291
        # collStats fails if no collection
 
292
        self.c.pymongo_test.test.insert({}, w=self.w)
 
293
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
294
            'collStats', 'test'))
 
295
 
 
296
        # Count
 
297
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
298
            'count', 'test'))
 
299
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
300
            'count', 'test', query={'a': 1}))
 
301
        self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
 
302
            ('count', 'test'), ('query', {'a': 1})])))
 
303
 
 
304
        # Distinct
 
305
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
306
            'distinct', 'test', key={'a': 1}))
 
307
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
308
            'distinct', 'test', key={'a': 1}, query={'a': 1}))
 
309
        self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
 
310
            ('distinct', 'test'), ('key', {'a': 1}), ('query', {'a': 1})])))
 
311
 
 
312
        # Geo stuff. Make sure a 2d index is created and replicated
 
313
        self.c.pymongo_test.system.indexes.insert({
 
314
            'key' : { 'location' : '2d' }, 'ns' : 'pymongo_test.test',
 
315
            'name' : 'location_2d' }, w=self.w)
 
316
 
 
317
        self.c.pymongo_test.system.indexes.insert(SON([
 
318
            ('ns', 'pymongo_test.test'),
 
319
            ('key', SON([('location', 'geoHaystack'), ('key', 1)])),
 
320
            ('bucketSize', 100),
 
321
            ('name', 'location_geoHaystack'),
 
322
        ]), w=self.w)
 
323
 
 
324
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
325
            'geoNear', 'test', near=[0, 0]))
 
326
        self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
 
327
            ('geoNear', 'test'), ('near', [0, 0])])))
 
328
 
 
329
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
330
            'geoSearch', 'test', near=[33, 33], maxDistance=6,
 
331
            search={'type': 'restaurant' }, limit=30))
 
332
 
 
333
        self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
 
334
            ('geoSearch', 'test'), ('near', [33, 33]), ('maxDistance', 6),
 
335
            ('search', {'type': 'restaurant'}), ('limit', 30)])))
 
336
 
 
337
        if version.at_least(self.c, (2, 1, 0)):
 
338
            self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
 
339
                ('aggregate', 'test'),
 
340
                ('pipeline', [])
 
341
            ])))
 
342
 
 
343
    def test_map_reduce_command(self):
 
344
        # mapreduce fails if no collection
 
345
        self.c.pymongo_test.test.insert({}, w=self.w)
 
346
 
 
347
        # Non-inline mapreduce always goes to primary, doesn't obey read prefs.
 
348
        # Test with command in a SON and with kwargs
 
349
        self._test_fn(False, lambda: self.c.pymongo_test.command(SON([
 
350
            ('mapreduce', 'test'),
 
351
            ('map', 'function() { }'),
 
352
            ('reduce', 'function() { }'),
 
353
            ('out', 'mr_out')
 
354
        ])))
 
355
 
 
356
        self._test_fn(False, lambda: self.c.pymongo_test.command(
 
357
            'mapreduce', 'test', map='function() { }',
 
358
            reduce='function() { }', out='mr_out'))
 
359
 
 
360
        self._test_fn(False, lambda: self.c.pymongo_test.command(
 
361
            'mapreduce', 'test', map='function() { }',
 
362
            reduce='function() { }', out={'replace': 'some_collection'}))
 
363
 
 
364
        # Inline mapreduce obeys read prefs
 
365
        self._test_fn(True, lambda: self.c.pymongo_test.command(
 
366
            'mapreduce', 'test', map='function() { }',
 
367
            reduce='function() { }', out={'inline': True}))
 
368
 
 
369
        self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
 
370
            ('mapreduce', 'test'),
 
371
            ('map', 'function() { }'),
 
372
            ('reduce', 'function() { }'),
 
373
            ('out', {'inline': True})
 
374
        ])))
 
375
 
 
376
    def test_create_collection(self):
 
377
        # Collections should be created on primary, obviously
 
378
        self._test_fn(False, lambda: self.c.pymongo_test.command(
 
379
            'create', 'some_collection%s' % random.randint(0, sys.maxint)))
 
380
 
 
381
        self._test_fn(False, lambda: self.c.pymongo_test.create_collection(
 
382
            'some_collection%s' % random.randint(0, sys.maxint)))
 
383
 
 
384
    def test_drop_collection(self):
 
385
        self._test_fn(False, lambda: self.c.pymongo_test.drop_collection(
 
386
            'some_collection'))
 
387
 
 
388
        self._test_fn(False, lambda: self.c.pymongo_test.some_collection.drop())
 
389
 
 
390
    def test_group(self):
 
391
        self._test_fn(True, lambda: self.c.pymongo_test.test.group(
 
392
            {'a': 1}, {}, {}, 'function() { }'))
 
393
 
 
394
    def test_map_reduce(self):
 
395
        # mapreduce fails if no collection
 
396
        self.c.pymongo_test.test.insert({}, w=self.w)
 
397
 
 
398
        self._test_fn(False, lambda: self.c.pymongo_test.test.map_reduce(
 
399
            'function() { }', 'function() { }', 'mr_out'))
 
400
 
 
401
        self._test_fn(True, lambda: self.c.pymongo_test.test.map_reduce(
 
402
            'function() { }', 'function() { }', {'inline': 1}))
 
403
 
 
404
    def test_inline_map_reduce(self):
 
405
        # mapreduce fails if no collection
 
406
        self.c.pymongo_test.test.insert({}, w=self.w)
 
407
 
 
408
        self._test_fn(True, lambda: self.c.pymongo_test.test.inline_map_reduce(
 
409
            'function() { }', 'function() { }'))
 
410
 
 
411
        self._test_fn(True, lambda: self.c.pymongo_test.test.inline_map_reduce(
 
412
            'function() { }', 'function() { }', full_response=True))
 
413
 
 
414
    def test_count(self):
 
415
        self._test_fn(True, lambda: self.c.pymongo_test.test.count())
 
416
        self._test_fn(True, lambda: self.c.pymongo_test.test.find().count())
 
417
 
 
418
    def test_distinct(self):
 
419
        self._test_fn(True, lambda: self.c.pymongo_test.test.distinct('a'))
 
420
        self._test_fn(True,
 
421
            lambda: self.c.pymongo_test.test.find().distinct('a'))
 
422
 
 
423
    def test_aggregate(self):
 
424
        if version.at_least(self.c, (2, 1, 0)):
 
425
            self._test_fn(True, lambda: self.c.pymongo_test.test.aggregate([]))
 
426
 
 
427
 
 
428
class TestMovingAverage(unittest.TestCase):
 
429
    def test_empty_moving_average(self):
 
430
        avg = MovingAverage(0)
 
431
        self.assertEqual(None, avg.get())
 
432
        avg.update(10)
 
433
        self.assertEqual(None, avg.get())
 
434
 
 
435
    def test_trivial_moving_average(self):
 
436
        avg = MovingAverage(1)
 
437
        self.assertEqual(None, avg.get())
 
438
        avg.update(10)
 
439
        self.assertEqual(10, avg.get())
 
440
        avg.update(20)
 
441
        self.assertEqual(20, avg.get())
 
442
        avg.update(0)
 
443
        self.assertEqual(0, avg.get())
 
444
 
 
445
    def test_2_sample_moving_average(self):
 
446
        avg = MovingAverage(2)
 
447
        self.assertEqual(None, avg.get())
 
448
        avg.update(10)
 
449
        self.assertEqual(10, avg.get())
 
450
        avg.update(20)
 
451
        self.assertEqual(15, avg.get())
 
452
        avg.update(30)
 
453
        self.assertEqual(25, avg.get())
 
454
        avg.update(-100)
 
455
        self.assertEqual(-35, avg.get())
 
456
 
 
457
    def test_5_sample_moving_average(self):
 
458
        avg = MovingAverage(5)
 
459
        self.assertEqual(None, avg.get())
 
460
        avg.update(10)
 
461
        self.assertEqual(10, avg.get())
 
462
        avg.update(20)
 
463
        self.assertEqual(15, avg.get())
 
464
        avg.update(30)
 
465
        self.assertEqual(20, avg.get())
 
466
        avg.update(-100)
 
467
        self.assertEqual((10 + 20 + 30 - 100) / 4, avg.get())
 
468
        avg.update(17)
 
469
        self.assertEqual((10 + 20 + 30 - 100 + 17) / 5., avg.get())
 
470
        avg.update(43)
 
471
        self.assertEqual((20 + 30 - 100 + 17 + 43) / 5., avg.get())
 
472
        avg.update(-1111)
 
473
        self.assertEqual((30 - 100 + 17 + 43 - 1111) / 5., avg.get())
 
474
 
 
475
 
 
476
class TestMongosConnection(unittest.TestCase):
 
477
    def test_mongos_connection(self):
 
478
        c = get_connection()
 
479
        is_mongos = utils.is_mongos(c)
 
480
 
 
481
        # Test default mode, PRIMARY
 
482
        cursor = c.pymongo_test.test.find()
 
483
        if is_mongos:
 
484
            # We only set $readPreference if it's something other than
 
485
            # PRIMARY to avoid problems with mongos versions that don't
 
486
            # support read preferences.
 
487
            self.assertEqual(
 
488
                None,
 
489
                cursor._Cursor__query_spec().get('$readPreference')
 
490
            )
 
491
        else:
 
492
            self.assertFalse(
 
493
                '$readPreference' in cursor._Cursor__query_spec())
 
494
 
 
495
        # Copy these constants for brevity
 
496
        PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
 
497
        SECONDARY = ReadPreference.SECONDARY
 
498
        SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
 
499
        NEAREST = ReadPreference.NEAREST
 
500
        SLAVE_OKAY = _QUERY_OPTIONS['slave_okay']
 
501
 
 
502
        # Test non-PRIMARY modes which can be combined with tags
 
503
        for kwarg, value, mongos_mode in (
 
504
            ('read_preference', PRIMARY_PREFERRED, 'primaryPreferred'),
 
505
            ('read_preference', SECONDARY, 'secondary'),
 
506
            ('read_preference', SECONDARY_PREFERRED, 'secondaryPreferred'),
 
507
            ('read_preference', NEAREST, 'nearest'),
 
508
            ('slave_okay', True, 'secondaryPreferred'),
 
509
            ('slave_okay', False, 'primary')
 
510
        ):
 
511
            for tag_sets in (
 
512
                None, [{}]
 
513
            ):
 
514
                # Create a connection e.g. with read_preference=NEAREST or
 
515
                # slave_okay=True
 
516
                c = get_connection(tag_sets=tag_sets, **{kwarg: value})
 
517
 
 
518
                self.assertEqual(is_mongos, c.is_mongos)
 
519
                cursor = c.pymongo_test.test.find()
 
520
                if is_mongos:
 
521
                    # We don't set $readPreference for SECONDARY_PREFERRED
 
522
                    # unless tags are in use. slaveOkay has the same effect.
 
523
                    if mongos_mode == 'secondaryPreferred':
 
524
                        self.assertEqual(
 
525
                            None,
 
526
                            cursor._Cursor__query_spec().get('$readPreference'))
 
527
 
 
528
                        self.assertTrue(
 
529
                            cursor._Cursor__query_options() & SLAVE_OKAY)
 
530
 
 
531
                    # Don't send $readPreference for PRIMARY either
 
532
                    elif mongos_mode == 'primary':
 
533
                        self.assertEqual(
 
534
                            None,
 
535
                            cursor._Cursor__query_spec().get('$readPreference'))
 
536
 
 
537
                        self.assertFalse(
 
538
                            cursor._Cursor__query_options() & SLAVE_OKAY)
 
539
                    else:
 
540
                        self.assertEqual(
 
541
                            {'mode': mongos_mode},
 
542
                            cursor._Cursor__query_spec().get('$readPreference'))
 
543
 
 
544
                        self.assertTrue(
 
545
                            cursor._Cursor__query_options() & SLAVE_OKAY)
 
546
                else:
 
547
                    self.assertFalse(
 
548
                        '$readPreference' in cursor._Cursor__query_spec())
 
549
 
 
550
            for tag_sets in (
 
551
                [{'dc': 'la'}],
 
552
                [{'dc': 'la'}, {'dc': 'sf'}],
 
553
                [{'dc': 'la'}, {'dc': 'sf'}, {}],
 
554
            ):
 
555
                if kwarg == 'slave_okay':
 
556
                    # Can't use tags with slave_okay True or False, need a
 
557
                    # real read preference
 
558
                    self.assertRaises(
 
559
                        ConfigurationError,
 
560
                        get_connection, tag_sets=tag_sets, **{kwarg: value})
 
561
 
 
562
                    continue
 
563
 
 
564
                c = get_connection(tag_sets=tag_sets, **{kwarg: value})
 
565
 
 
566
                self.assertEqual(is_mongos, c.is_mongos)
 
567
                cursor = c.pymongo_test.test.find()
 
568
                if is_mongos:
 
569
                    self.assertEqual(
 
570
                        {'mode': mongos_mode, 'tags': tag_sets},
 
571
                        cursor._Cursor__query_spec().get('$readPreference'))
 
572
                else:
 
573
                    self.assertFalse(
 
574
                        '$readPreference' in cursor._Cursor__query_spec())
 
575
 
 
576
    def test_only_secondary_ok_commands_have_read_prefs(self):
 
577
        c = get_connection(read_preference=ReadPreference.SECONDARY)
 
578
        is_mongos = utils.is_mongos(c)
 
579
        if not is_mongos:
 
580
            raise SkipTest("Only mongos have read_prefs added to the spec")
 
581
 
 
582
        # Ensure secondary_ok_commands have readPreference
 
583
        for cmd in secondary_ok_commands:
 
584
            if cmd == 'mapreduce':  # map reduce is a special case
 
585
                continue
 
586
            command = SON([(cmd, 1)])
 
587
            cursor = c.pymongo_test["$cmd"].find(command.copy())
 
588
            # White-listed commands also have to be wrapped in $query
 
589
            command = SON([('$query', command)])
 
590
            command['$readPreference'] = {'mode': 'secondary'}
 
591
            self.assertEqual(command, cursor._Cursor__query_spec())
 
592
 
 
593
        # map_reduce inline should have read prefs
 
594
        command = SON([('mapreduce', 'test'), ('out', {'inline': 1})])
 
595
        cursor = c.pymongo_test["$cmd"].find(command.copy())
 
596
        # White-listed commands also have to be wrapped in $query
 
597
        command = SON([('$query', command)])
 
598
        command['$readPreference'] = {'mode': 'secondary'}
 
599
        self.assertEqual(command, cursor._Cursor__query_spec())
 
600
 
 
601
        # map_reduce that outputs to a collection shouldn't have read prefs
 
602
        command = SON([('mapreduce', 'test'), ('out', {'mrtest': 1})])
 
603
        cursor = c.pymongo_test["$cmd"].find(command.copy())
 
604
        self.assertEqual(command, cursor._Cursor__query_spec())
 
605
 
 
606
        # Other commands shouldn't be changed
 
607
        for cmd in ('drop', 'create', 'any-future-cmd'):
 
608
            command = SON([(cmd, 1)])
 
609
            cursor = c.pymongo_test["$cmd"].find(command.copy())
 
610
            self.assertEqual(command, cursor._Cursor__query_spec())
 
611
 
 
612
if __name__ == "__main__":
 
613
    unittest.main()