~ubuntu-branches/ubuntu/utopic/ceilometer/utopic-proposed

« back to all changes in this revision

Viewing changes to ceilometer/tests/test_coordination.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-10-16 14:07:11 UTC
  • mfrom: (1.2.1) (28.1.5 utopic-proposed)
  • Revision ID: package-import@ubuntu.com-20141016140711-95mki6bdkivvfr2x
Tags: 2014.2-0ubuntu1
New upstream release. 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Copyright 2014 Red Hat, Inc.
 
3
#
 
4
# Author: Nejc Saje <nsaje@redhat.com>
 
5
#
 
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
# not use this file except in compliance with the License. You may obtain
 
8
# a copy of the License at
 
9
#
 
10
#      http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
# Unless required by applicable law or agreed to in writing, software
 
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
# License for the specific language governing permissions and limitations
 
16
# under the License.
 
17
 
 
18
import logging
 
19
import mock
 
20
from oslo.config import fixture as fixture_config
 
21
import tooz.coordination
 
22
 
 
23
from ceilometer import coordination
 
24
from ceilometer.tests import base
 
25
from ceilometer import utils
 
26
 
 
27
 
 
28
class MockToozCoordinator(object):
    """In-memory stand-in for a tooz coordinator.

    Group membership lives in the ``shared_storage`` mapping handed to the
    constructor (``group_id -> {member_id -> {"capabilities": ...}}``), so
    several instances given the same mapping see each other's groups and
    members. Group operations return a ``MockAsyncResult`` on success or a
    ``MockAsyncError`` wrapping the matching tooz exception on failure.
    """

    def __init__(self, member_id, shared_storage):
        """Remember this member's id and the shared group mapping."""
        self._groups = shared_storage
        self._member_id = member_id

    def start(self):
        """Do nothing; the mock has no connection to open."""

    def heartbeat(self):
        """Do nothing; the mock has no liveness to report."""

    def create_group(self, group_id):
        """Register an empty group, or report that it already exists."""
        if group_id not in self._groups:
            self._groups[group_id] = {}
            return MockAsyncResult(None)
        already_there = tooz.coordination.GroupAlreadyExist(group_id)
        return MockAsyncError(already_there)

    def join_group(self, group_id, capabilities=b''):
        """Add this member to ``group_id`` together with its capabilities.

        Yields an error result when the group has not been created or when
        this member is already part of it.
        """
        if group_id not in self._groups:
            missing = tooz.coordination.GroupNotCreated(group_id)
            return MockAsyncError(missing)

        members = self._groups[group_id]
        if self._member_id in members:
            duplicate = tooz.coordination.MemberAlreadyExist(
                group_id, self._member_id)
            return MockAsyncError(duplicate)

        members[self._member_id] = {"capabilities": capabilities}
        return MockAsyncResult(None)

    def get_members(self, group_id):
        """Return the member mapping of ``group_id`` as an async result."""
        if group_id in self._groups:
            return MockAsyncResult(self._groups[group_id])
        missing = tooz.coordination.GroupNotCreated(group_id)
        return MockAsyncError(missing)
 
64
 
 
65
 
 
66
class MockToozCoordExceptionRaiser(MockToozCoordinator):
    """Coordinator mock whose backend-facing calls always fail.

    ``start``, ``heartbeat``, ``join_group`` and ``get_members`` each raise
    ``tooz.coordination.ToozError('error')``; everything else is inherited
    from ``MockToozCoordinator``.
    """

    @staticmethod
    def _fail():
        """Raise the error shared by every overridden operation."""
        raise tooz.coordination.ToozError('error')

    def start(self):
        """Always raise ``ToozError``."""
        self._fail()

    def heartbeat(self):
        """Always raise ``ToozError``."""
        self._fail()

    def join_group(self, group_id, capabilities=b''):
        """Always raise ``ToozError``; the arguments are ignored."""
        self._fail()

    def get_members(self, group_id):
        """Always raise ``ToozError``; the argument is ignored."""
        self._fail()
 
78
 
 
79
 
 
80
class MockAsyncResult(tooz.coordination.CoordAsyncResult):
    """Already-completed async result that carries a fixed value."""

    def __init__(self, result):
        """Store the value that ``get`` will hand back."""
        self.result = result

    def get(self, timeout=0):
        """Return the stored value immediately; ``timeout`` is ignored."""
        return self.result

    @staticmethod
    def done():
        """Report completion; the mock result is always ready."""
        return True
 
90
 
 
91
 
 
92
class MockAsyncError(tooz.coordination.CoordAsyncResult):
    """Already-completed async result that carries a failure."""

    def __init__(self, error):
        """Store the exception that ``get`` will raise."""
        self.error = error

    def get(self, timeout=0):
        """Raise the stored exception; ``timeout`` is ignored."""
        raise self.error

    @staticmethod
    def done():
        """Report completion; the mock result is always ready."""
        return True
 
102
 
 
103
 
 
104
class MockLoggingHandler(logging.Handler):
    """Mock logging handler to check for expected logs.

    Formatted messages are collected in ``self.messages``, a dict that maps
    a lower-cased level name (``'debug'``, ``'info'``, ``'warning'``,
    ``'error'``, ``'critical'``) to the list of messages seen at that level.
    """

    def __init__(self, *args, **kwargs):
        """Create empty message buckets, then initialise the base handler."""
        # Buckets are created first so ``messages`` exists before the base
        # initialiser runs.
        self.reset()
        logging.Handler.__init__(self, *args, **kwargs)

    def emit(self, record):
        """Append the record's formatted message to its level's bucket."""
        level = record.levelname.lower()
        # A record may carry a level name outside the five standard ones
        # (e.g. one registered through ``logging.addLevelName``). Create the
        # bucket on demand instead of failing with KeyError inside the
        # logging call that is under test.
        self.messages.setdefault(level, []).append(record.getMessage())

    def reset(self):
        """Discard everything captured so far and start with empty buckets."""
        self.messages = {'debug': [],
                         'info': [],
                         'warning': [],
                         'error': [],
                         'critical': []}
 
120
 
 
121
 
 
122
class TestPartitioning(base.BaseTestCase):
    """Tests for ``coordination.PartitionCoordinator`` on mocked backends.

    ``tooz.coordination.get_coordinator`` is patched so that the partition
    coordinator talks to the in-memory mocks defined in this module.
    """

    def setUp(self):
        """Install the config fixture, a capturing log handler and storage."""
        super(TestPartitioning, self).setUp()
        self.CONF = self.useFixture(fixture_config.Config()).conf
        self.str_handler = MockLoggingHandler()
        coordination.LOG.logger.addHandler(self.str_handler)
        # The logger is module-level state shared by every test; detach the
        # handler afterwards so handlers do not pile up from test to test.
        self.addCleanup(coordination.LOG.logger.removeHandler,
                        self.str_handler)
        self.shared_storage = {}

    def _get_new_started_coordinator(self, shared_storage, agent_id=None,
                                     coordinator_cls=None):
        """Build and start a PartitionCoordinator backed by a mock.

        :param shared_storage: dict passed to the mock coordinator as its
                               group storage
        :param agent_id: id passed to ``PartitionCoordinator``
        :param coordinator_cls: mock coordinator class to instantiate;
                                defaults to ``MockToozCoordinator``
        :returns: the started ``PartitionCoordinator``
        """
        coordinator_cls = coordinator_cls or MockToozCoordinator
        self.CONF.set_override('backend_url', 'xxx://yyy',
                               group='coordination')
        # The first positional argument of get_coordinator is ignored; only
        # the member id is forwarded to the mock.
        with mock.patch('tooz.coordination.get_coordinator',
                        lambda _, member_id:
                        coordinator_cls(member_id, shared_storage)):
            pc = coordination.PartitionCoordinator(agent_id)
            pc.start()
            return pc

    def _usage_simulation(self, *agents_kwargs):
        """Start one coordinator per agent, then check each agent's subset.

        Every positional argument is a dict with the keys ``agent_id`` and
        ``group_id`` and, optionally, ``coordinator_cls``, ``all_resources``
        and ``expected_resources`` (the latter two default to ``[]``).
        All agents join their group before any subset is extracted.
        """
        partition_coordinators = []
        for kwargs in agents_kwargs:
            partition_coordinator = self._get_new_started_coordinator(
                self.shared_storage, kwargs['agent_id'],
                kwargs.get('coordinator_cls'))
            partition_coordinator.join_group(kwargs['group_id'])
            partition_coordinators.append(partition_coordinator)

        for kwargs, partition_coordinator in zip(agents_kwargs,
                                                 partition_coordinators):
            all_resources = kwargs.get('all_resources', [])
            expected_resources = kwargs.get('expected_resources', [])
            actual_resources = partition_coordinator.extract_my_subset(
                kwargs['group_id'], all_resources)
            self.assertEqual(expected_resources, actual_resources)

    def test_single_group(self):
        """Two agents joining one group both show up as its members."""
        agents = [dict(agent_id='agent1', group_id='group'),
                  dict(agent_id='agent2', group_id='group')]
        self._usage_simulation(*agents)

        self.assertEqual(['group'], sorted(self.shared_storage.keys()))
        self.assertEqual(['agent1', 'agent2'],
                         sorted(self.shared_storage['group'].keys()))

    def test_multiple_groups(self):
        """Agents joining different groups leave both groups in storage."""
        agents = [dict(agent_id='agent1', group_id='group1'),
                  dict(agent_id='agent2', group_id='group2')]
        self._usage_simulation(*agents)

        self.assertEqual(['group1', 'group2'],
                         sorted(self.shared_storage.keys()))

    def test_partitioning(self):
        """Each agent's subset matches what ``utils.HashRing`` assigns it."""
        all_resources = ['resource_%s' % i for i in range(1000)]
        agents = ['agent_%s' % i for i in range(10)]

        # Compute the expected split independently with a hash ring built
        # over the same agent ids.
        expected_resources = [[] for _ in agents]
        hr = utils.HashRing(agents)
        for r in all_resources:
            key = agents.index(hr.get_node(r))
            expected_resources[key].append(r)

        agents_kwargs = [dict(agent_id=agent,
                              group_id='group',
                              all_resources=all_resources,
                              expected_resources=expected)
                         for agent, expected in zip(agents,
                                                    expected_resources)]
        self._usage_simulation(*agents_kwargs)

    def test_coordination_backend_offline(self):
        """A failing backend yields an empty subset and logged errors."""
        agents = [dict(agent_id='agent1',
                       group_id='group',
                       all_resources=['res1', 'res2'],
                       expected_resources=[],
                       coordinator_cls=MockToozCoordExceptionRaiser)]
        self._usage_simulation(*agents)
        expected_errors = ['Error getting group membership info from '
                           'coordination backend.',
                           'Error connecting to coordination backend.']
        for e in expected_errors:
            self.assertIn(e, self.str_handler.messages['error'])

    def test_reconnect(self):
        """Heartbeat logs errors while failing, none once a backend works."""
        coord = self._get_new_started_coordinator({}, 'a',
                                                  MockToozCoordExceptionRaiser)
        with mock.patch('tooz.coordination.get_coordinator',
                        return_value=MockToozCoordExceptionRaiser('a', {})):
            coord.heartbeat()
        expected_errors = ['Error connecting to coordination backend.',
                           'Error sending a heartbeat to coordination '
                           'backend.']
        for e in expected_errors:
            self.assertIn(e, self.str_handler.messages['error'])

        # Forget the errors logged so far, then retry with a coordinator
        # that does not raise.
        self.str_handler.messages['error'] = []
        with mock.patch('tooz.coordination.get_coordinator',
                        return_value=MockToozCoordinator('a', {})):
            coord.heartbeat()
        for e in expected_errors:
            self.assertNotIn(e, self.str_handler.messages['error'])