68
70
repeat_actions=True,
69
71
user_id=self.auth_headers['X-User-Id'],
70
72
project_id=self.auth_headers['X-Project-Id'],
73
time_constraints=[dict(name='testcons',
71
76
rule=dict(comparison_operator='gt',
143
150
repeat_actions=False,
144
151
user_id=self.auth_headers['X-User-Id'],
145
152
project_id=self.auth_headers['X-Project-Id'],
146
154
rule=dict(alarm_ids=['a', 'b'],
149
157
self.conn.update_alarm(alarm)
160
def _add_default_threshold_rule(alarm):
    """Default ``exclude_outliers`` to False in the alarm's threshold rule.

    Mutates ``alarm['threshold_rule']`` in place.  An ``exclude_outliers``
    entry that is already present is left untouched.  Returns None.
    """
    alarm['threshold_rule'].setdefault('exclude_outliers', False)
164
def _verify_alarm(self, json, alarm, expected_name=None):
165
if expected_name and alarm.name != expected_name:
166
self.fail("Alarm not found")
167
self._add_default_threshold_rule(json)
169
if key.endswith('_rule'):
173
self.assertEqual(getattr(alarm, storage_key),
151
176
def test_list_alarms(self):
152
177
data = self.get_json('/alarms')
153
178
self.assertEqual(4, len(data))
160
185
for r in data if 'combination_rule' in r),
188
def test_alarms_query_with_timestamp(self):
189
date_time = datetime.datetime(2012, 7, 2, 10, 41)
190
isotime = date_time.isoformat()
191
resp = self.get_json('/alarms',
192
q=[{'field': 'timestamp',
196
self.assertEqual(resp.status_code, 400)
197
self.assertEqual(jsonutils.loads(resp.body)['error_message']
199
'Unknown argument: "timestamp": '
200
'not valid for this resource')
163
202
def test_get_not_existing_alarm(self):
164
203
resp = self.get_json('/alarms/alarm-id-3', expect_errors=True)
165
204
self.assertEqual(resp.status_code, 404)
183
222
self.assertEqual(one['alarm_id'], alarms[0]['alarm_id'])
184
223
self.assertEqual(one['repeat_actions'], alarms[0]['repeat_actions'])
224
self.assertEqual(one['time_constraints'],
225
alarms[0]['time_constraints'])
186
227
def test_get_alarm_disabled(self):
187
228
alarm = models.Alarm(name='disabled',
198
239
repeat_actions=False,
199
240
user_id=self.auth_headers['X-User-Id'],
200
241
project_id=self.auth_headers['X-Project-Id'],
201
243
rule=dict(alarm_ids=['a', 'b'], operator='or'))
202
244
self.conn.update_alarm(alarm)
275
317
alarms = list(self.conn.get_alarms())
276
318
self.assertEqual(4, len(alarms))
320
def test_post_invalid_alarm_time_constraint_start(self):
322
'name': 'added_alarm_invalid_constraint_duration',
324
'time_constraints': [
332
'meter_name': 'ameter',
336
self.post_json('/alarms', params=json, expect_errors=True, status=400,
337
headers=self.auth_headers)
338
alarms = list(self.conn.get_alarms())
339
self.assertEqual(4, len(alarms))
341
def test_post_invalid_alarm_time_constraint_duration(self):
343
'name': 'added_alarm_invalid_constraint_duration',
345
'time_constraints': [
348
'start': '* 11 * * *',
353
'meter_name': 'ameter',
357
self.post_json('/alarms', params=json, expect_errors=True, status=400,
358
headers=self.auth_headers)
359
alarms = list(self.conn.get_alarms())
360
self.assertEqual(4, len(alarms))
362
def test_post_invalid_alarm_time_constraint_timezone(self):
364
'name': 'added_alarm_invalid_constraint_timezone',
366
'time_constraints': [
369
'start': '* 11 * * *',
375
'meter_name': 'ameter',
379
self.post_json('/alarms', params=json, expect_errors=True, status=400,
380
headers=self.auth_headers)
381
alarms = list(self.conn.get_alarms())
382
self.assertEqual(4, len(alarms))
278
384
def test_post_invalid_alarm_period(self):
280
386
'name': 'added_alarm_invalid_period',
367
473
'threshold_rule and combination_rule cannot '
368
474
'be set at the same time')
476
def test_post_invalid_alarm_timestamp_in_threshold_rule(self):
477
date_time = datetime.datetime(2012, 7, 2, 10, 41)
478
isotime = date_time.isoformat()
481
'name': 'invalid_alarm',
484
'meter_name': 'ameter',
485
'query': [{'field': 'timestamp',
488
'comparison_operator': 'gt',
492
resp = self.post_json('/alarms', params=json, expect_errors=True,
493
status=400, headers=self.auth_headers)
494
alarms = list(self.conn.get_alarms())
495
self.assertEqual(4, len(alarms))
497
'Unknown argument: "timestamp": '
498
'not valid for this resource',
499
resp.json['error_message']['faultstring'])
370
501
def test_post_alarm_defaults(self):
419
551
self.fail("Alarm not found")
421
def test_post_alarm(self):
424
'name': 'added_alarm',
427
'ok_actions': ['http://something/ok'],
428
'alarm_actions': ['http://something/alarm'],
429
'insufficient_data_actions': ['http://something/no'],
430
'repeat_actions': True,
432
'meter_name': 'ameter',
433
'query': [{'field': 'metadata.field',
437
'comparison_operator': 'le',
438
'statistic': 'count',
440
'evaluation_periods': '3',
553
def test_post_conflict(self):
556
'name': 'added_alarm',
559
'ok_actions': ['http://something/ok'],
560
'alarm_actions': ['http://something/alarm'],
561
'insufficient_data_actions': ['http://something/no'],
562
'repeat_actions': True,
564
'meter_name': 'ameter',
565
'query': [{'field': 'metadata.field',
569
'comparison_operator': 'le',
570
'statistic': 'count',
572
'evaluation_periods': '3',
577
self.post_json('/alarms', params=json, status=201,
578
headers=self.auth_headers)
579
self.post_json('/alarms', params=json, status=409,
580
headers=self.auth_headers)
582
def _do_test_post_alarm(self, exclude_outliers=None):
585
'name': 'added_alarm',
588
'ok_actions': ['http://something/ok'],
589
'alarm_actions': ['http://something/alarm'],
590
'insufficient_data_actions': ['http://something/no'],
591
'repeat_actions': True,
593
'meter_name': 'ameter',
594
'query': [{'field': 'metadata.field',
598
'comparison_operator': 'le',
599
'statistic': 'count',
601
'evaluation_periods': '3',
605
if exclude_outliers is not None:
606
json['threshold_rule']['exclude_outliers'] = exclude_outliers
444
608
self.post_json('/alarms', params=json, status=201,
445
609
headers=self.auth_headers)
446
610
alarms = list(self.conn.get_alarms(enabled=False))
448
612
json['threshold_rule']['query'].append({
449
613
'field': 'project_id', 'op': 'eq',
450
614
'value': self.auth_headers['X-Project-Id']})
615
# to check the IntegerType type conversion
616
json['threshold_rule']['evaluation_periods'] = 3
617
json['threshold_rule']['period'] = 180
618
self._verify_alarm(json, alarms[0], 'added_alarm')
620
def test_post_alarm_outlier_exclusion_set(self):
    """Post an alarm whose threshold rule sets exclude_outliers to True."""
    self._do_test_post_alarm(exclude_outliers=True)
623
def test_post_alarm_outlier_exclusion_clear(self):
    """Post an alarm whose threshold rule sets exclude_outliers to False."""
    self._do_test_post_alarm(exclude_outliers=False)
626
def test_post_alarm_outlier_exclusion_defaulted(self):
    """Post an alarm that leaves exclude_outliers out of its threshold rule."""
    self._do_test_post_alarm()
629
def test_post_alarm_noauth(self):
632
'name': 'added_alarm',
635
'ok_actions': ['http://something/ok'],
636
'alarm_actions': ['http://something/alarm'],
637
'insufficient_data_actions': ['http://something/no'],
638
'repeat_actions': True,
640
'meter_name': 'ameter',
641
'query': [{'field': 'metadata.field',
645
'comparison_operator': 'le',
646
'statistic': 'count',
648
'evaluation_periods': '3',
649
'exclude_outliers': False,
653
self.post_json('/alarms', params=json, status=201)
654
alarms = list(self.conn.get_alarms(enabled=False))
655
self.assertEqual(1, len(alarms))
451
656
# to check the BoundedInt type conversion
452
657
json['threshold_rule']['evaluation_periods'] = 3
453
658
json['threshold_rule']['period'] = 180
497
702
self.assertEqual(1, len(alarms))
498
703
self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine')
499
704
self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine')
705
self._add_default_threshold_rule(json)
500
706
if alarms[0].name == 'added_alarm':
502
708
if key.endswith('_rule'):
564
770
self.assertEqual(1, len(alarms))
565
771
self.assertEqual(alarms[0].user_id, self.auth_headers['X-User-Id'])
566
772
self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine')
567
if alarms[0].name == 'added_alarm':
569
if key.endswith('_rule'):
573
self.assertEqual(getattr(alarms[0], storage_key),
576
self.fail("Alarm not found")
773
self._verify_alarm(json, alarms[0], 'added_alarm')
578
775
def test_post_alarm_as_admin_no_project(self):
579
776
"""Test the creation of an alarm as admin for another project but
610
807
self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine')
611
808
self.assertEqual(alarms[0].project_id,
612
809
self.auth_headers['X-Project-Id'])
613
if alarms[0].name == 'added_alarm':
615
if key.endswith('_rule'):
619
self.assertEqual(getattr(alarms[0], storage_key),
622
self.fail("Alarm not found")
810
self._verify_alarm(json, alarms[0], 'added_alarm')
624
812
def test_post_alarm_combination(self):
674
862
an_other_user_auth = {'X-User-Id': str(uuid.uuid4()),
675
863
'X-Project-Id': str(uuid.uuid4())}
676
resp = self.post_json('/alarms', params=json, status=400,
864
resp = self.post_json('/alarms', params=json, status=404,
677
865
headers=an_other_user_auth)
678
self.assertEqual(jsonutils.loads(resp.body)['error_message']
680
"Alarm a doesn't exist")
866
self.assertEqual("Alarm a Not Found",
867
jsonutils.loads(resp.body)['error_message']
682
870
def test_post_combination_alarm_as_admin_on_behalf_of_an_other_user(self):
683
871
"""Test that post a combination alarm as admin on behalf of an other
684
user/project with a alarm_id unauthorized for this project/user
872
user/project with an alarm_id unauthorized for this project/user
687
875
'enabled': False,
705
893
headers.update(self.auth_headers)
706
894
headers['X-Roles'] = 'admin'
707
resp = self.post_json('/alarms', params=json, status=400,
895
resp = self.post_json('/alarms', params=json, status=404,
709
self.assertEqual(jsonutils.loads(resp.body)['error_message']
711
"Alarm a doesn't exist")
897
self.assertEqual("Alarm a Not Found",
898
jsonutils.loads(resp.body)['error_message']
901
def test_post_combination_alarm_with_reasonable_description(self):
902
"""Test that post a combination alarm with two blanks around the
903
operator in alarm description.
907
'name': 'added_alarm',
909
'type': 'combination',
910
'ok_actions': ['http://something/ok'],
911
'alarm_actions': ['http://something/alarm'],
912
'insufficient_data_actions': ['http://something/no'],
913
'repeat_actions': True,
914
'combination_rule': {
920
self.post_json('/alarms', params=json, status=201,
921
headers=self.auth_headers)
922
alarms = list(self.conn.get_alarms(enabled=False))
923
self.assertEqual(1, len(alarms))
924
self.assertEqual(u'Combined state of alarms a and b',
925
alarms[0].description)
713
927
def test_post_combination_alarm_as_admin_success_owner_unset(self):
714
928
self._do_post_combination_alarm_as_admin_success(False)
719
933
def _do_post_combination_alarm_as_admin_success(self, owner_is_set):
720
934
"""Test that post a combination alarm as admin on behalf of nobody
721
with a alarm_id of someone else, with owner set or not
935
with an alarm_id of someone else, with owner set or not
724
938
'enabled': False,
774
988
'operator': 'and',
777
self.post_json('/alarms', params=json, status=400,
991
self.post_json('/alarms', params=json, status=404,
778
992
headers=self.auth_headers)
779
993
alarms = list(self.conn.get_alarms(enabled=False))
780
994
self.assertEqual(0, len(alarms))
816
1030
json['threshold_rule']['query'].append({
817
1031
'field': 'project_id', 'op': 'eq',
818
1032
'value': self.auth_headers['X-Project-Id']})
820
if key.endswith('_rule'):
824
self.assertEqual(getattr(alarm, storage_key), json[key])
1033
self._verify_alarm(json, alarm)
826
1035
def test_put_alarm_as_admin(self):
868
1077
alarm = list(self.conn.get_alarms(alarm_id=alarm_id, enabled=False))[0]
869
1078
self.assertEqual(alarm.user_id, 'myuserid')
870
1079
self.assertEqual(alarm.project_id, 'myprojectid')
872
if key.endswith('_rule'):
876
self.assertEqual(getattr(alarm, storage_key), json[key])
1080
self._verify_alarm(json, alarm)
878
1082
def test_put_alarm_wrong_field(self):
879
1083
# Note: wsme will ignore unknown fields so will just not appear in
909
1113
alarm_id = data[0]['alarm_id']
911
1115
resp = self.put_json('/alarms/%s' % alarm_id,
914
1117
headers=self.auth_headers)
916
json['threshold_rule']['query'].append({
917
'field': 'project_id', 'op': 'eq',
918
'value': self.auth_headers['X-Project-Id']})
919
1118
self.assertEqual(resp.status_code, 200)
921
1120
def test_delete_alarm(self):
994
1193
self.assertIsNotNone(actual['event_id'])
996
1195
def _assert_in_json(self, expected, actual):
    """Assert that every key/value pair of ``expected`` occurs in ``actual``.

    :param expected: dict whose items must all be present
    :param actual: JSON-encoded string to search

    ``actual`` is decoded and re-encoded with sorted keys, and each expected
    pair is encoded the same way, so the substring comparison does not
    depend on dict ordering.
    """
    actual = jsonutils.dumps(jsonutils.loads(actual), sort_keys=True)
    # items() rather than iteritems() so the helper also runs on Python 3.
    for k, v in expected.items():
        # Drop the enclosing braces to get the bare '"key": value' text.
        fragment = jsonutils.dumps({k: v}, sort_keys=True)[1:-1]
        self.assertIn(fragment, actual,
                      '%s not in %s' % (fragment, actual))
1044
1244
type='creation',
1045
1245
user_id=alarm['user_id']),
1247
self._add_default_threshold_rule(new_alarm)
1047
1248
new_alarm['rule'] = new_alarm['threshold_rule']
1048
1249
del new_alarm['threshold_rule']
1049
1250
new_alarm['rule']['query'].append({
1116
1317
data = dict(state='alarm')
1117
1318
self._update_alarm(alarm, data, auth_headers=admin_auth)
1320
self._add_default_threshold_rule(new_alarm)
1119
1321
new_alarm['rule'] = new_alarm['threshold_rule']
1120
1322
del new_alarm['threshold_rule']
1192
1394
def test_get_alarm_history_ordered_by_recentness(self):
1193
1395
alarm = self._get_alarm('a')
1194
for i in xrange(10):
1396
for i in moves.xrange(10):
1195
1397
self._update_alarm(alarm, dict(name='%s' % i))
1196
1398
alarm = self._get_alarm('a')
1197
1399
self._delete_alarm(alarm)
1203
1405
alarm['rule'] = alarm['threshold_rule']
1204
1406
del alarm['threshold_rule']
1205
1407
self._assert_in_json(alarm, history[0]['detail'])
1206
for i in xrange(1, 10):
1408
for i in moves.xrange(1, 10):
1207
1409
detail = '{"name": "%s"}' % (10 - i)
1208
1410
self._assert_is_subset(dict(alarm_id=alarm['alarm_id'],