~ubuntu-branches/ubuntu/wily/pymongo/wily-proposed

« back to all changes in this revision

Viewing changes to test/test_server_selection.py

  • Committer: Package Import Robot
  • Author(s): Federico Ceratto
  • Date: 2015-04-26 22:43:13 UTC
  • mfrom: (24.1.5 sid)
  • Revision ID: package-import@ubuntu.com-20150426224313-0hga2jphvf0rrmfe
Tags: 3.0.1-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2015 MongoDB, Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
# http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
"""Test server selection using the JSON scenario files."""
 
16
 
 
17
import json
 
18
import os
 
19
import sys
 
20
 
 
21
sys.path[0:0] = [""]
 
22
 
 
23
from pymongo import read_preferences
 
24
from pymongo.common import clean_node
 
25
from pymongo.errors import AutoReconnect
 
26
from pymongo.ismaster import IsMaster
 
27
from pymongo.server_description import ServerDescription
 
28
from pymongo.settings import TopologySettings
 
29
from pymongo.server_selectors import writable_server_selector
 
30
from pymongo.topology import Topology
 
31
from test import unittest
 
32
 
 
33
 
 
34
# Directory holding the JSON test specifications: the nested
# server_selection/server_selection folder next to this file.
_TEST_PATH = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    'server_selection',
    'server_selection')
 
38
 
 
39
 
 
40
class MockSocketInfo(object):
    """Stub with a no-op close() that also works in a ``with`` statement."""

    def close(self):
        """Do nothing and return None."""
        return None

    def __enter__(self):
        """Enter the ``with`` block, handing back this same object."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``with`` block; exceptions are never suppressed."""
        return None
 
49
 
 
50
 
 
51
class MockPool(object):
    """Stub pool: takes any constructor arguments and does nothing."""

    def __init__(self, *args, **kwargs):
        """Accept and ignore every positional and keyword argument."""

    def reset(self):
        """Do nothing and return None."""
 
57
 
 
58
 
 
59
class MockMonitor(object):
    """Stub monitor whose constructor and methods are all no-ops."""

    def __init__(self, server_description, topology, pool, topology_settings):
        """Accept the four constructor arguments and discard them."""

    def open(self):
        """Do nothing and return None."""

    def request_check(self):
        """Do nothing and return None."""

    def close(self):
        """Do nothing and return None."""
 
71
 
 
72
 
 
73
def get_addresses(server_list):
    """Return ``(seeds, hosts)`` built from each server's 'address'.

    ``hosts`` holds every server's raw ``'address'`` value, in order;
    ``seeds`` holds the same values after passing through ``clean_node``.
    """
    hosts = [entry['address'] for entry in server_list]
    seeds = [clean_node(address) for address in hosts]
    return seeds, hosts
 
80
 
 
81
 
 
82
def make_server_description(server, hosts):
    """Build a ServerDescription from one server entry of a JSON scenario.

    `server` is a dict providing 'tags', 'type', 'address' and
    'avg_rtt_ms'; `hosts` becomes the 'hosts' field of the fabricated
    ismaster response.
    """
    response = {
        'tags': server['tags'],
        'ok': True,
        'hosts': hosts,
    }
    kind = server['type']

    # Every type other than a standalone or a mongos gets a setName.
    # NOTE(review): the value is the boolean True rather than a set name
    # string -- confirm that is intended.
    if kind not in ("Standalone", "Mongos"):
        response['setName'] = True

    if kind == "RSPrimary":
        response['ismaster'] = True
    elif kind == "RSSecondary":
        response['secondary'] = True
    elif kind == "Mongos":
        response['msg'] = 'isdbgrid'

    # NOTE(review): 'avg_rtt_ms' is handed over unchanged as
    # round_trip_time -- confirm which unit ServerDescription expects.
    address = clean_node(server['address'])
    return ServerDescription(address,
                             IsMaster(response),
                             round_trip_time=server['avg_rtt_ms'])
 
103
 
 
104
 
 
105
class TestAllScenarios(unittest.TestCase):
    """Empty TestCase; test methods are attached to it with setattr."""
 
107
 
 
108
 
 
109
def create_test(scenario_def):
    """Return a test method that runs one server selection scenario.

    `scenario_def` is the parsed JSON scenario. The returned function takes
    the TestCase instance as `self` and reads these scenario keys:
    'topology_description' (its 'servers' list), 'operation',
    'read_preference' (unless the operation is "write"),
    'suitable_servers' and 'in_latency_window'.
    """
    def check_selection(test, topology, selector, expected_servers, hosts):
        """Assert `topology` selects exactly `expected_servers`."""
        expected = {}
        for server in expected_servers:
            expected[server['address']] = make_server_description(
                server, hosts)

        actual = {}
        selected = topology.select_servers(selector,
                                           server_selection_timeout=0)
        for s in selected:
            address = s.description.address
            actual["%s:%d" % (address[0], address[1])] = s.description

        # Compare the address sets first so that an unexpected server is
        # reported as an assertion failure instead of a KeyError below.
        test.assertEqual(sorted(expected), sorted(actual))
        for key, actual_sd in actual.items():
            expected_sd = expected[key]
            test.assertEqual(expected_sd.address, actual_sd.address)
            test.assertEqual(expected_sd.server_type, actual_sd.server_type)
            test.assertEqual(expected_sd.round_trip_time,
                             actual_sd.round_trip_time)
            test.assertEqual(expected_sd.tags, actual_sd.tags)
            test.assertEqual(expected_sd.all_hosts, actual_sd.all_hosts)

    def run_scenario(self):
        servers = scenario_def['topology_description']['servers']

        # Initialize topologies.
        seeds, hosts = get_addresses(servers)

        # "In latency window" is defined in the server selection spec as
        # the subset of suitable servers that falls within the allowable
        # latency window. This topology keeps the default local threshold.
        top_latency = Topology(
            TopologySettings(seeds=seeds, monitor_class=MockMonitor,
                             pool_class=MockPool))
        # "Suitable servers" is defined in the server selection spec as
        # the set of servers matching both the ReadPreference's mode and
        # tag sets. The very large local threshold is there so the latency
        # window does not narrow that set.
        top_suitable = Topology(
            TopologySettings(seeds=seeds, local_threshold_ms=1000000,
                             monitor_class=MockMonitor,
                             pool_class=MockPool))

        # Update topologies with server descriptions.
        for server in servers:
            server_description = make_server_description(server, hosts)
            top_suitable.on_change(server_description)
            top_latency.on_change(server_description)

        # Create server selector.
        if scenario_def["operation"] == "write":
            instance = writable_server_selector
        else:
            pref = scenario_def['read_preference']

            # Make first letter lowercase to match read_pref's modes.
            mode_string = pref['mode']
            if mode_string:
                mode_string = mode_string[:1].lower() + mode_string[1:]
            mode = read_preferences.read_pref_mode_from_name(mode_string)

            # Only pass tag sets along when the first one is non-empty;
            # a missing or empty list is treated as "no tag sets".
            tag_sets = pref.get('tag_sets')
            if not (tag_sets and tag_sets[0]):
                tag_sets = None
            instance = read_preferences.make_read_preference(mode, tag_sets)

        # Select servers. When nothing is expected, selection must fail.
        if not scenario_def['suitable_servers']:
            self.assertRaises(AutoReconnect, top_suitable.select_server,
                              instance,
                              server_selection_timeout=0)
            return

        if not scenario_def['in_latency_window']:
            self.assertRaises(AutoReconnect, top_latency.select_server,
                              instance,
                              server_selection_timeout=0)
            return

        check_selection(self, top_suitable, instance,
                        scenario_def['suitable_servers'], hosts)
        check_selection(self, top_latency, instance,
                        scenario_def['in_latency_window'], hosts)

    return run_scenario
 
211
 
 
212
 
 
213
def create_tests():
    """Attach one test method per JSON scenario file to TestAllScenarios.

    Walks _TEST_PATH recursively. Each ``.json`` file is parsed and turned
    into a method named ``test_<parent dir>_<dir>_<file stem>``. Files with
    any other extension are skipped so that stray non-JSON files in the
    scenario tree do not break module import.
    """
    for dirpath, _, filenames in os.walk(_TEST_PATH):
        # Prefix test names with the last two components of the directory.
        parent, leaf = os.path.split(dirpath)
        dirname = os.path.split(parent)[-1] + '_' + leaf

        for filename in filenames:
            stem, extension = os.path.splitext(filename)
            if extension != '.json':
                continue

            with open(os.path.join(dirpath, filename)) as scenario_stream:
                scenario_def = json.load(scenario_stream)

            # Construct test from scenario.
            new_test = create_test(scenario_def)
            new_test.__name__ = 'test_%s_%s' % (dirname, stem)
            setattr(TestAllScenarios, new_test.__name__, new_test)
 
229
 
 
230
 
 
231
# Runs at import time, so the generated test methods are attached whenever
# this module is loaded.
create_tests()

if __name__ == '__main__':
    unittest.main()