2
# Copyright 2014 Red Hat, Inc.
4
# Author: Nejc Saje <nsaje@redhat.com>
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
20
from oslo.config import fixture as fixture_config
21
import tooz.coordination
23
from ceilometer import coordination
24
from ceilometer.tests import base
25
from ceilometer import utils
28
class MockToozCoordinator(object):
29
def __init__(self, member_id, shared_storage):
30
self._member_id = member_id
31
self._groups = shared_storage
39
def create_group(self, group_id):
40
if group_id in self._groups:
41
return MockAsyncError(
42
tooz.coordination.GroupAlreadyExist(group_id))
43
self._groups[group_id] = {}
44
return MockAsyncResult(None)
46
def join_group(self, group_id, capabilities=b''):
47
if group_id not in self._groups:
48
return MockAsyncError(
49
tooz.coordination.GroupNotCreated(group_id))
50
if self._member_id in self._groups[group_id]:
51
return MockAsyncError(
52
tooz.coordination.MemberAlreadyExist(group_id,
54
self._groups[group_id][self._member_id] = {
55
"capabilities": capabilities,
57
return MockAsyncResult(None)
59
def get_members(self, group_id):
60
if group_id not in self._groups:
61
return MockAsyncError(
62
tooz.coordination.GroupNotCreated(group_id))
63
return MockAsyncResult(self._groups[group_id])
66
class MockToozCoordExceptionRaiser(MockToozCoordinator):
68
raise tooz.coordination.ToozError('error')
71
raise tooz.coordination.ToozError('error')
73
def join_group(self, group_id, capabilities=b''):
74
raise tooz.coordination.ToozError('error')
76
def get_members(self, group_id):
77
raise tooz.coordination.ToozError('error')
80
class MockAsyncResult(tooz.coordination.CoordAsyncResult):
81
def __init__(self, result):
84
def get(self, timeout=0):
92
class MockAsyncError(tooz.coordination.CoordAsyncResult):
93
def __init__(self, error):
96
def get(self, timeout=0):
104
class MockLoggingHandler(logging.Handler):
105
"""Mock logging handler to check for expected logs."""
107
def __init__(self, *args, **kwargs):
109
logging.Handler.__init__(self, *args, **kwargs)
111
def emit(self, record):
112
self.messages[record.levelname.lower()].append(record.getMessage())
115
self.messages = {'debug': [],
122
class TestPartitioning(base.BaseTestCase):
125
super(TestPartitioning, self).setUp()
126
self.CONF = self.useFixture(fixture_config.Config()).conf
127
self.str_handler = MockLoggingHandler()
128
coordination.LOG.logger.addHandler(self.str_handler)
129
self.shared_storage = {}
131
def _get_new_started_coordinator(self, shared_storage, agent_id=None,
132
coordinator_cls=None):
133
coordinator_cls = coordinator_cls or MockToozCoordinator
134
self.CONF.set_override('backend_url', 'xxx://yyy',
135
group='coordination')
136
with mock.patch('tooz.coordination.get_coordinator',
138
coordinator_cls(member_id, shared_storage)):
139
pc = coordination.PartitionCoordinator(agent_id)
143
def _usage_simulation(self, *agents_kwargs):
144
partition_coordinators = []
145
for kwargs in agents_kwargs:
146
partition_coordinator = self._get_new_started_coordinator(
147
self.shared_storage, kwargs['agent_id'], kwargs.get(
149
partition_coordinator.join_group(kwargs['group_id'])
150
partition_coordinators.append(partition_coordinator)
152
for i, kwargs in enumerate(agents_kwargs):
153
all_resources = kwargs.get('all_resources', [])
154
expected_resources = kwargs.get('expected_resources', [])
155
actual_resources = partition_coordinators[i].extract_my_subset(
156
kwargs['group_id'], all_resources)
157
self.assertEqual(expected_resources, actual_resources)
159
def test_single_group(self):
160
agents = [dict(agent_id='agent1', group_id='group'),
161
dict(agent_id='agent2', group_id='group')]
162
self._usage_simulation(*agents)
164
self.assertEqual(sorted(self.shared_storage.keys()), ['group'])
165
self.assertEqual(sorted(self.shared_storage['group'].keys()),
166
['agent1', 'agent2'])
168
def test_multiple_groups(self):
169
agents = [dict(agent_id='agent1', group_id='group1'),
170
dict(agent_id='agent2', group_id='group2')]
171
self._usage_simulation(*agents)
173
self.assertEqual(sorted(self.shared_storage.keys()), ['group1',
176
def test_partitioning(self):
177
all_resources = ['resource_%s' % i for i in range(1000)]
178
agents = ['agent_%s' % i for i in range(10)]
180
expected_resources = [list() for _ in range(len(agents))]
181
hr = utils.HashRing(agents)
182
for r in all_resources:
183
key = agents.index(hr.get_node(r))
184
expected_resources[key].append(r)
187
for i, agent in enumerate(agents):
188
agents_kwargs.append(dict(agent_id=agent,
190
all_resources=all_resources,
191
expected_resources=expected_resources[i]))
192
self._usage_simulation(*agents_kwargs)
194
def test_coordination_backend_offline(self):
195
agents = [dict(agent_id='agent1',
197
all_resources=['res1', 'res2'],
198
expected_resources=[],
199
coordinator_cls=MockToozCoordExceptionRaiser)]
200
self._usage_simulation(*agents)
201
expected_errors = ['Error getting group membership info from '
202
'coordination backend.',
203
'Error connecting to coordination backend.']
204
for e in expected_errors:
205
self.assertIn(e, self.str_handler.messages['error'])
207
def test_reconnect(self):
208
coord = self._get_new_started_coordinator({}, 'a',
209
MockToozCoordExceptionRaiser)
210
with mock.patch('tooz.coordination.get_coordinator',
211
return_value=MockToozCoordExceptionRaiser('a', {})):
213
expected_errors = ['Error connecting to coordination backend.',
214
'Error sending a heartbeat to coordination '
216
for e in expected_errors:
217
self.assertIn(e, self.str_handler.messages['error'])
219
self.str_handler.messages['error'] = []
220
with mock.patch('tooz.coordination.get_coordinator',
221
return_value=MockToozCoordinator('a', {})):
223
for e in expected_errors:
224
self.assertNotIn(e, self.str_handler.messages['error'])