~ubuntu-branches/ubuntu/saucy/ceilometer/saucy

« back to all changes in this revision

Viewing changes to tests/alarm/partition/test_coordination.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2013-10-03 08:40:40 UTC
  • mfrom: (1.1.7)
  • Revision ID: package-import@ubuntu.com-20131003084040-47e1qrwl5s5bkwjh
Tags: 2013.2~rc1-0ubuntu1
* debian/patches/fix-setup-requirements.patch: Dropped no longer
  needed.
* debian/patches/skip-database.patch: Refreshed
* debian/control:
  - Add versioned dependency for python-pbr.
  - Bump version dependency for python-webob.
  - Add versioned dependency for alembic.
  - Bump versioned dependency for python-sqlalchemy.
  - Add versioned dependency for python-pymongo.
  - Add versioned dependency for python-eventlet.
  - Dropped python-extras dependency.
  - Bump versioned dependency for python-flask.
  - Bump versioned dependency for python-stevedore.
  - Add versioned dependency for python-glanceclient.
  - Bump versioned dependency for python-novaclient.
  - Bump versioned dependency for python-keystoneclient.
  - Bump versioned dependency for python-ceilometerclient.
  - Add versioned dependency for python-lxml.
  - Bump versioned dependency for python-wsme.
  - Dropped python-netifaces dependency.
  - Added python-httplib2 build dependency.
  - Bump versioned dependency for python-fixtures.
  - Bump versioned dependency for testrepository.
  - Added versioned dependency for python-testtools.
  - Added versioned dependency for python-swiftclient.
  - Dropped python-cinderclient dependency.
  - Dropped python-lockfile dependency.
  - Dropped python-setuptools-git dependency.
  - Dropped python-unittest2 dependency.
  - Dropped python-d2to1 dependency.
  - Added versioned dependency for python-testtools.
  - Added binary dependency for python-netaddr.
  - Add python-six as a dependency.
* debian/patches/fix-setup-requirements.patch: Bump sqlalchemy version.
* debian/ceilometer-common.install:
  - Dropped ceilometer-alarm-singleton, no longer exists.
  - Added usr/bin/ceilometer-alarm-evaluator.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- encoding: utf-8 -*-
 
2
#
 
3
# Copyright © 2013 Red Hat, Inc
 
4
#
 
5
# Author: Eoghan Glynn <eglynn@redhat.com>
 
6
#
 
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
8
# not use this file except in compliance with the License. You may obtain
 
9
# a copy of the License at
 
10
#
 
11
#      http://www.apache.org/licenses/LICENSE-2.0
 
12
#
 
13
# Unless required by applicable law or agreed to in writing, software
 
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
16
# License for the specific language governing permissions and limitations
 
17
# under the License.
 
18
"""Tests for ceilometer/alarm/partition/coordination.py
 
19
"""
 
20
import datetime
 
21
import mock
 
22
import uuid
 
23
 
 
24
from oslo.config import cfg
 
25
 
 
26
from ceilometer.alarm.partition import coordination
 
27
from ceilometer.openstack.common import timeutils
 
28
from ceilometer.storage import models
 
29
from ceilometer.tests import base
 
30
 
 
31
 
 
32
class TestCoordinate(base.TestCase):
    """Exercise PartitionCoordinator mastership and alarm distribution.

    Every test runs against a coordinator whose RPC layer and API client
    are mocks, and against a frozen clock (``timeutils.utcnow`` override)
    that is only moved forward explicitly via ``_advance_time``.
    """

    def setUp(self):
        """Freeze the clock and build a coordinator with mocked RPC."""
        super(TestCoordinate, self).setUp()
        self.test_interval = 120
        cfg.CONF.set_override('evaluation_interval',
                              self.test_interval,
                              group='alarm')
        self.api_client = mock.Mock()
        self.override_start = datetime.datetime(2012, 7, 2, 10, 45)
        timeutils.utcnow.override_time = self.override_start
        self.partition_coordinator = coordination.PartitionCoordinator()
        # Swap the RPC layer for a mock so that assign/allocate calls can
        # be inspected by the _check_* helpers below.
        self.partition_coordinator.coordination_rpc = mock.Mock()

    def tearDown(self):
        """Release the clock override, even if the parent teardown fails."""
        try:
            super(TestCoordinate, self).tearDown()
        finally:
            timeutils.utcnow.override_time = None

    def _no_alarms(self):
        """Make the mocked API client report an empty alarm list."""
        self.api_client.alarms.list.return_value = []

    def _some_alarms(self, count):
        """Make the mocked API client report ``count`` fresh alarms.

        :returns: the list of generated alarm IDs (UUID strings)
        """
        alarm_ids = [str(uuid.uuid4()) for _ in range(count)]
        self.api_client.alarms.list.return_value = [
            self._make_alarm(aid) for aid in alarm_ids
        ]
        return alarm_ids

    def _current_alarms(self):
        """Return the alarm list currently reported by the mocked client."""
        return self.api_client.alarms.list.return_value

    def _dump_alarms(self, shave):
        """Truncate the reported alarm list to its first ``shave`` entries.

        :returns: the IDs of the alarms that are still reported
        """
        surviving = self.api_client.alarms.list.return_value[:shave]
        self.api_client.alarms.list.return_value = surviving
        return [alarm.alarm_id for alarm in surviving]

    def _add_alarms(self, boost):
        """Append ``boost`` fresh alarms to the reported alarm list.

        :returns: the IDs of the newly added alarms only
        """
        new_alarm_ids = [str(uuid.uuid4()) for _ in range(boost)]
        alarms = self.api_client.alarms.list.return_value
        alarms.extend(self._make_alarm(aid) for aid in new_alarm_ids)
        self.api_client.alarms.list.return_value = alarms
        return new_alarm_ids

    @staticmethod
    def _make_alarm(alarm_id):
        """Build a threshold alarm model carrying the given alarm ID."""
        rule = dict(
            statistic='avg',
            comparison_operator='gt',
            threshold=80.0,
            evaluation_periods=5,
            period=60,
            query=[],
        )
        return models.Alarm(name='instance_running_hot',
                            type='threshold',
                            user_id='foobar',
                            project_id='snafu',
                            enabled=True,
                            description='',
                            repeat_actions=False,
                            state='insufficient data',
                            state_timestamp=None,
                            timestamp=None,
                            ok_actions=[],
                            alarm_actions=[],
                            insufficient_data_actions=[],
                            alarm_id=alarm_id,
                            rule=rule)

    def _advance_time(self, factor):
        """Move the frozen clock forward by ``factor`` test intervals."""
        delta = datetime.timedelta(seconds=self.test_interval * factor)
        timeutils.utcnow.override_time += delta

    def _younger_by(self, offset):
        """Return this partition's priority plus ``offset``."""
        return self.partition_coordinator.this.priority + offset

    def _older_by(self, offset):
        """Return this partition's priority minus ``offset``."""
        return self.partition_coordinator.this.priority - offset

    def _check_mastership(self, expected):
        """Run a mastership check and assert the resulting master flag."""
        self.partition_coordinator.check_mastership(self.test_interval,
                                                    self.api_client)
        self.assertEqual(expected, self.partition_coordinator.is_master)

    def _new_partition(self, offset):
        """Announce a new peer whose priority is ours plus ``offset``.

        :returns: ``(partition_id, priority)`` of the announced peer
        """
        younger = self._younger_by(offset)
        pid = uuid.uuid4()
        self.partition_coordinator.presence(pid, younger)
        return (pid, younger)

    def _check_assignments(self, others, alarm_ids, per_worker,
                           expect_uneffected=None):
        """Verify the recorded ``assign`` RPC calls.

        See ``_check_distribution`` for the meaning of the arguments and
        of the return value.
        """
        rpc = self.partition_coordinator.coordination_rpc
        calls = rpc.assign.call_args_list
        return self._check_distribution(others, alarm_ids, per_worker, calls,
                                        expect_uneffected)

    def _check_allocation(self, others, alarm_ids, per_worker):
        """Verify the recorded ``allocate`` RPC calls.

        See ``_check_distribution`` for the meaning of the arguments and
        of the return value.
        """
        rpc = self.partition_coordinator.coordination_rpc
        calls = rpc.allocate.call_args_list
        return self._check_distribution(others, alarm_ids, per_worker, calls)

    def _check_distribution(self, others, alarm_ids, per_worker, calls,
                            expect_uneffected=None):
        """Verify that ``calls`` spread ``alarm_ids`` over ``others``.

        Each call must target a distinct, not-yet-seen partition, carry
        exactly ``per_worker`` alarm IDs, and only hand out IDs that have
        not been handed out by an earlier call.

        :param others: ``(partition_id, priority)`` pairs of the peers
        :param alarm_ids: alarm IDs available for distribution
        :param per_worker: number of alarms expected in every call
        :param calls: recorded mock calls, each with positional arguments
                      ``(target, alarms)``
        :param expect_uneffected: partition IDs expected to receive no
                                  call at all (defaults to none)
        :returns: the alarm IDs that were not handed out by any call
        """
        # Copy rather than share: a None default avoids the mutable
        # default argument pitfall, and list() tolerates any iterable.
        expect_uneffected = list(expect_uneffected or [])
        untouched = [pid for pid, _ in others]
        untouched.extend(expect_uneffected)
        remainder = list(alarm_ids)
        for args, _ in calls:
            target, alarms = args
            self.assertTrue(target in untouched)
            untouched.remove(target)
            self.assertEqual(len(alarms), per_worker)
            for aid in alarms:
                self.assertTrue(aid in remainder)
                remainder.remove(aid)
        self.assertEqual(set(untouched), set(expect_uneffected))
        return remainder

    def _forget_assignments(self, expected_assignments):
        """Assert the number of ``assign`` calls so far, then reset the RPC
        mock so that later checks only see subsequent calls.
        """
        rpc = self.partition_coordinator.coordination_rpc
        self.assertEqual(len(rpc.assign.call_args_list),
                         expected_assignments)
        rpc.reset_mock()

    def test_mastership_not_assumed_during_warmup(self):
        """Mastership is only expected once two full intervals elapsed."""
        self._no_alarms()

        for _ in range(7):
            # still warming up
            self._advance_time(0.25)
            self._check_mastership(False)

        # now warmed up
        self._advance_time(0.25)
        self._check_mastership(True)

    def test_uncontested_mastership_assumed(self):
        """With no peers known, mastership is expected after warm-up."""
        self._no_alarms()

        self._advance_time(3)

        self._check_mastership(True)

    def test_contested_mastership_assumed(self):
        """Peers with a higher priority value do not prevent mastership."""
        self._no_alarms()

        self._advance_time(3)

        for offset in range(1, 5):
            younger = self._younger_by(offset)
            self.partition_coordinator.presence(uuid.uuid4(), younger)

        self._check_mastership(True)

    def test_bested_mastership_relinquished(self):
        """A peer with a lower priority value takes mastership away."""
        self._no_alarms()

        self._advance_time(3)

        self._check_mastership(True)

        older = self._older_by(1)
        self.partition_coordinator.presence(uuid.uuid4(), older)

        self._check_mastership(False)

    def _do_test_tie_broken_mastership(self, seed, expect_mastership):
        """Announce a peer with an identical priority and check the outcome.

        This partition's UUID is pinned to ``UUID(int=1)`` and the peer's
        to ``UUID(int=seed)``, so the UUIDs are the only difference
        between the two partitions.
        """
        self._no_alarms()
        self.partition_coordinator.this.uuid = uuid.UUID(int=1)

        self._advance_time(3)

        self._check_mastership(True)

        tied = self.partition_coordinator.this.priority
        self.partition_coordinator.presence(uuid.UUID(int=seed), tied)

        self._check_mastership(expect_mastership)

    def test_tie_broken_mastership_assumed(self):
        """A tied peer with a larger UUID does not take mastership."""
        self._do_test_tie_broken_mastership(2, True)

    def test_tie_broken_mastership_relinquished(self):
        """A tied peer with a smaller UUID takes mastership."""
        self._do_test_tie_broken_mastership(0, False)

    def test_fair_distribution(self):
        """49 alarms over 4 peers: 10 each, the rest stays with us."""
        alarm_ids = self._some_alarms(49)

        self._advance_time(3)

        others = [self._new_partition(i) for i in range(1, 5)]

        self._check_mastership(True)

        remainder = self._check_assignments(others, alarm_ids, 10)
        self.assertEqual(set(remainder),
                         set(self.partition_coordinator.assignment))

    def test_rebalance_on_partition_startup(self):
        """A fifth peer appearing triggers reassignment at 9 per peer."""
        alarm_ids = self._some_alarms(49)

        self._advance_time(3)

        others = [self._new_partition(i) for i in range(1, 5)]

        self._check_mastership(True)

        self._forget_assignments(4)

        others.append(self._new_partition(5))
        self._check_mastership(True)

        remainder = self._check_assignments(others, alarm_ids, 9)
        self.assertEqual(set(remainder),
                         set(self.partition_coordinator.assignment))

    def test_rebalance_on_partition_staleness(self):
        """A peer that stops reporting is excluded from reassignment."""
        alarm_ids = self._some_alarms(49)

        self._advance_time(3)

        others = [self._new_partition(i) for i in range(1, 5)]

        self._check_mastership(True)

        self._forget_assignments(4)

        self._advance_time(4)

        # Every peer except the last one re-announces itself after the
        # clock has moved on; the silent one is expected to get nothing.
        stale, _ = others.pop()
        for pid, younger in others:
            self.partition_coordinator.presence(pid, younger)

        self._check_mastership(True)

        remainder = self._check_assignments(others, alarm_ids, 13, [stale])
        self.assertEqual(set(remainder),
                         set(self.partition_coordinator.assignment))

    def test_rebalance_on_sufficient_deletion(self):
        """Dropping about half of the alarms triggers reassignment."""
        alarm_ids = self._some_alarms(49)

        self._advance_time(3)

        others = [self._new_partition(i) for i in range(1, 5)]

        self._check_mastership(True)

        self._forget_assignments(4)

        # Floor division keeps the slice bound an integer on any Python.
        alarm_ids = self._dump_alarms(len(alarm_ids) // 2)

        self._check_mastership(True)

        remainder = self._check_assignments(others, alarm_ids, 5)
        self.assertEqual(set(remainder),
                         set(self.partition_coordinator.assignment))

    def test_no_rebalance_on_insufficient_deletion(self):
        """Dropping only a few alarms must not trigger any assign call."""
        alarm_ids = self._some_alarms(49)

        self._advance_time(3)

        others = [self._new_partition(i) for i in range(1, 5)]

        self._check_mastership(True)

        self._forget_assignments(4)

        alarm_ids = self._dump_alarms(45)

        self._check_mastership(True)

        expect_uneffected = [pid for pid, _ in others]
        self._check_assignments(others, alarm_ids, 10, expect_uneffected)

    def test_no_rebalance_on_creation(self):
        """New alarms are allocated to peers; our own share is unchanged."""
        self._some_alarms(49)

        self._advance_time(3)

        others = [self._new_partition(i) for i in range(1, 5)]

        self._check_mastership(True)

        self._forget_assignments(4)

        new_alarm_ids = self._add_alarms(8)

        master_assignment = set(self.partition_coordinator.assignment)
        self._check_mastership(True)

        remainder = self._check_allocation(others, new_alarm_ids, 2)
        self.assertEqual(len(remainder), 0)
        self.assertEqual(master_assignment,
                         set(self.partition_coordinator.assignment))

    def test_bail_when_overtaken_in_distribution(self):
        """Distribution stops after one assign if a better peer appears."""
        self._some_alarms(49)

        self._advance_time(3)

        for i in range(1, 5):
            self._new_partition(i)

        def overtake(*args):
            # Announce a peer with a lower priority value than ours while
            # the first assign call is in flight.
            self._new_partition(-1)

        rpc = self.partition_coordinator.coordination_rpc
        rpc.assign.side_effect = overtake

        self._check_mastership(False)

        self.assertEqual(len(rpc.assign.call_args_list), 1)

    def test_assigned_alarms_no_assignment(self):
        """Without any assignment, no alarms are reported as assigned."""
        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
        self.assertEqual(len(alarms), 0)

    def test_assigned_alarms_assignment(self):
        """Alarms assigned to this partition are all reported back."""
        alarm_ids = self._some_alarms(6)

        this_uuid = self.partition_coordinator.this.uuid
        self.partition_coordinator.assign(this_uuid, alarm_ids)

        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
        self.assertEqual(alarms, self._current_alarms())

    def test_assigned_alarms_allocation(self):
        """Later allocations are reported together with the assignment."""
        alarm_ids = self._some_alarms(6)

        this_uuid = self.partition_coordinator.this.uuid
        self.partition_coordinator.assign(this_uuid, alarm_ids)

        new_alarm_ids = self._add_alarms(2)
        self.partition_coordinator.allocate(this_uuid, new_alarm_ids)

        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
        self.assertEqual(alarms, self._current_alarms())

    def test_assigned_alarms_deleted_assignment(self):
        """Assigned alarms that no longer exist are not reported."""
        alarm_ids = self._some_alarms(6)

        this_uuid = self.partition_coordinator.this.uuid
        self.partition_coordinator.assign(this_uuid, alarm_ids)

        # Floor division keeps the slice bound an integer on any Python.
        self._dump_alarms(len(alarm_ids) // 2)

        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
        self.assertEqual(alarms, self._current_alarms())