1
# -*- encoding: utf-8 -*-
3
# Copyright © 2013 Red Hat, Inc
5
# Author: Eoghan Glynn <eglynn@redhat.com>
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
18
"""Tests for ceilometer/alarm/partition/coordination.py
24
from oslo.config import cfg
26
from ceilometer.alarm.partition import coordination
27
from ceilometer.openstack.common import timeutils
28
from ceilometer.storage import models
29
from ceilometer.tests import base
32
class TestCoordinate(base.TestCase):
34
super(TestCoordinate, self).setUp()
35
self.test_interval = 120
36
cfg.CONF.set_override('evaluation_interval',
39
self.api_client = mock.Mock()
40
self.override_start = datetime.datetime(2012, 7, 2, 10, 45)
41
timeutils.utcnow.override_time = self.override_start
42
self.partition_coordinator = coordination.PartitionCoordinator()
43
self.partition_coordinator.coordination_rpc = mock.Mock()
46
super(TestCoordinate, self).tearDown()
47
timeutils.utcnow.override_time = None
50
self.api_client.alarms.list.return_value = []
52
def _some_alarms(self, count):
53
alarm_ids = [str(uuid.uuid4()) for _ in xrange(count)]
54
alarms = [self._make_alarm(aid) for aid in alarm_ids]
55
self.api_client.alarms.list.return_value = alarms
58
def _current_alarms(self):
59
return self.api_client.alarms.list.return_value
61
def _dump_alarms(self, shave):
62
alarms = self.api_client.alarms.list.return_value
63
alarms = alarms[:shave]
64
alarm_ids = [a.alarm_id for a in alarms]
65
self.api_client.alarms.list.return_value = alarms
68
def _add_alarms(self, boost):
69
new_alarm_ids = [str(uuid.uuid4()) for _ in xrange(boost)]
70
alarms = self.api_client.alarms.list.return_value
71
for aid in new_alarm_ids:
72
alarms.append(self._make_alarm(aid))
73
self.api_client.alarms.list.return_value = alarms
77
def _make_alarm(uuid):
78
return models.Alarm(name='instance_running_hot',
85
state='insufficient data',
90
insufficient_data_actions=[],
94
comparison_operator='gt',
101
def _advance_time(self, factor):
102
delta = datetime.timedelta(seconds=self.test_interval * factor)
103
timeutils.utcnow.override_time += delta
105
def _younger_by(self, offset):
106
return self.partition_coordinator.this.priority + offset
108
def _older_by(self, offset):
109
return self.partition_coordinator.this.priority - offset
111
def _check_mastership(self, expected):
112
self.partition_coordinator.check_mastership(self.test_interval,
114
self.assertEqual(expected, self.partition_coordinator.is_master)
116
def _new_partition(self, offset):
117
younger = self._younger_by(offset)
119
self.partition_coordinator.presence(pid, younger)
120
return (pid, younger)
122
def _check_assignments(self, others, alarm_ids, per_worker,
123
expect_uneffected=[]):
124
rpc = self.partition_coordinator.coordination_rpc
125
calls = rpc.assign.call_args_list
126
return self._check_distribution(others, alarm_ids, per_worker, calls,
129
def _check_allocation(self, others, alarm_ids, per_worker):
130
rpc = self.partition_coordinator.coordination_rpc
131
calls = rpc.allocate.call_args_list
132
return self._check_distribution(others, alarm_ids, per_worker, calls)
134
def _check_distribution(self, others, alarm_ids, per_worker, calls,
135
expect_uneffected=[]):
136
uneffected = [pid for pid, _ in others]
137
uneffected.extend(expect_uneffected)
138
remainder = list(alarm_ids)
141
target, alarms = args
142
self.assertTrue(target in uneffected)
143
uneffected.remove(target)
144
self.assertEqual(len(alarms), per_worker)
146
self.assertTrue(aid in remainder)
147
remainder.remove(aid)
148
self.assertEqual(set(uneffected), set(expect_uneffected))
151
def _forget_assignments(self, expected_assignments):
152
rpc = self.partition_coordinator.coordination_rpc
153
self.assertEqual(len(rpc.assign.call_args_list),
154
expected_assignments)
157
def test_mastership_not_assumed_during_warmup(self):
162
self._advance_time(0.25)
163
self._check_mastership(False)
166
self._advance_time(0.25)
167
self._check_mastership(True)
169
def test_uncontested_mastership_assumed(self):
172
self._advance_time(3)
174
self._check_mastership(True)
176
def test_contested_mastership_assumed(self):
179
self._advance_time(3)
181
for offset in xrange(1, 5):
182
younger = self._younger_by(offset)
183
self.partition_coordinator.presence(uuid.uuid4(), younger)
185
self._check_mastership(True)
187
def test_bested_mastership_relinquished(self):
190
self._advance_time(3)
192
self._check_mastership(True)
194
older = self._older_by(1)
195
self.partition_coordinator.presence(uuid.uuid4(), older)
197
self._check_mastership(False)
199
def _do_test_tie_broken_mastership(self, seed, expect_mastership):
201
self.partition_coordinator.this.uuid = uuid.UUID(int=1)
203
self._advance_time(3)
205
self._check_mastership(True)
207
tied = self.partition_coordinator.this.priority
208
self.partition_coordinator.presence(uuid.UUID(int=seed), tied)
210
self._check_mastership(expect_mastership)
212
def test_tie_broken_mastership_assumed(self):
213
self._do_test_tie_broken_mastership(2, True)
215
def test_tie_broken_mastership_relinquished(self):
216
self._do_test_tie_broken_mastership(0, False)
218
def test_fair_distribution(self):
219
alarm_ids = self._some_alarms(49)
221
self._advance_time(3)
223
others = [self._new_partition(i) for i in xrange(1, 5)]
225
self._check_mastership(True)
227
remainder = self._check_assignments(others, alarm_ids, 10)
228
self.assertEqual(set(remainder),
229
set(self.partition_coordinator.assignment))
231
def test_rebalance_on_partition_startup(self):
232
alarm_ids = self._some_alarms(49)
234
self._advance_time(3)
236
others = [self._new_partition(i) for i in xrange(1, 5)]
238
self._check_mastership(True)
240
self. _forget_assignments(4)
242
others.append(self._new_partition(5))
243
self._check_mastership(True)
245
remainder = self._check_assignments(others, alarm_ids, 9)
246
self.assertEqual(set(remainder),
247
set(self.partition_coordinator.assignment))
249
def test_rebalance_on_partition_staleness(self):
250
alarm_ids = self._some_alarms(49)
252
self._advance_time(3)
254
others = [self._new_partition(i) for i in xrange(1, 5)]
256
self._check_mastership(True)
258
self. _forget_assignments(4)
260
self._advance_time(4)
262
stale, _ = others.pop()
263
for pid, younger in others:
264
self.partition_coordinator.presence(pid, younger)
266
self._check_mastership(True)
268
remainder = self._check_assignments(others, alarm_ids, 13, [stale])
269
self.assertEqual(set(remainder),
270
set(self.partition_coordinator.assignment))
272
def test_rebalance_on_sufficient_deletion(self):
273
alarm_ids = self._some_alarms(49)
275
self._advance_time(3)
277
others = [self._new_partition(i) for i in xrange(1, 5)]
279
self._check_mastership(True)
281
self._forget_assignments(4)
283
alarm_ids = self._dump_alarms(len(alarm_ids) / 2)
285
self._check_mastership(True)
287
remainder = self._check_assignments(others, alarm_ids, 5)
288
self.assertEqual(set(remainder),
289
set(self.partition_coordinator.assignment))
291
def test_no_rebalance_on_insufficient_deletion(self):
292
alarm_ids = self._some_alarms(49)
294
self._advance_time(3)
296
others = [self._new_partition(i) for i in xrange(1, 5)]
298
self._check_mastership(True)
300
self._forget_assignments(4)
302
alarm_ids = self._dump_alarms(45)
304
self._check_mastership(True)
306
expect_uneffected = [pid for pid, _ in others]
307
self._check_assignments(others, alarm_ids, 10, expect_uneffected)
309
def test_no_rebalance_on_creation(self):
310
self._some_alarms(49)
312
self._advance_time(3)
314
others = [self._new_partition(i) for i in xrange(1, 5)]
316
self._check_mastership(True)
318
self._forget_assignments(4)
320
new_alarm_ids = self._add_alarms(8)
322
master_assignment = set(self.partition_coordinator.assignment)
323
self._check_mastership(True)
325
remainder = self._check_allocation(others, new_alarm_ids, 2)
326
self.assertEqual(len(remainder), 0)
327
self.assertEqual(master_assignment,
328
set(self.partition_coordinator.assignment))
330
def test_bail_when_overtaken_in_distribution(self):
331
self._some_alarms(49)
333
self._advance_time(3)
335
for i in xrange(1, 5):
336
self._new_partition(i)
339
self._new_partition(-1)
341
rpc = self.partition_coordinator.coordination_rpc
342
rpc.assign.side_effect = overtake
344
self._check_mastership(False)
346
self.assertEqual(len(rpc.assign.call_args_list), 1)
348
def test_assigned_alarms_no_assignment(self):
349
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
350
self.assertEqual(len(alarms), 0)
352
def test_assigned_alarms_assignment(self):
353
alarm_ids = self._some_alarms(6)
355
uuid = self.partition_coordinator.this.uuid
356
self.partition_coordinator.assign(uuid, alarm_ids)
358
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
359
self.assertEqual(alarms, self._current_alarms())
361
def test_assigned_alarms_allocation(self):
362
alarm_ids = self._some_alarms(6)
364
uuid = self.partition_coordinator.this.uuid
365
self.partition_coordinator.assign(uuid, alarm_ids)
367
new_alarm_ids = self._add_alarms(2)
368
self.partition_coordinator.allocate(uuid, new_alarm_ids)
370
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
371
self.assertEqual(alarms, self._current_alarms())
373
def test_assigned_alarms_deleted_assignment(self):
374
alarm_ids = self._some_alarms(6)
376
uuid = self.partition_coordinator.this.uuid
377
self.partition_coordinator.assign(uuid, alarm_ids)
379
self._dump_alarms(len(alarm_ids) / 2)
381
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
382
self.assertEqual(alarms, self._current_alarms())