~ubuntu-branches/ubuntu/vivid/ceilometer/vivid

« back to all changes in this revision

Viewing changes to ceilometer/tests/key_value_storage/test_notifications.py

  • Committer: Package Import Robot
  • Author(s): James Page, Corey Bryant, James Page
  • Date: 2015-02-19 14:59:07 UTC
  • mfrom: (1.2.3)
  • Revision ID: package-import@ubuntu.com-20150219145907-9jojybdsl64zcn14
Tags: 2015.1~b2-0ubuntu1
[ Corey Bryant ]
* New upstream release.
  - d/control: Align requirements with upstream.
  - d/p/skip-test.patch: Rebased.

[ James Page ]
* d/rules,d/p/skip-gabbi.patch: Skip tests that rely on python-gabbi until
  packaging and MIR is complete.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
2
# not use this file except in compliance with the License. You may obtain
 
3
# a copy of the License at
 
4
#
 
5
#      http://www.apache.org/licenses/LICENSE-2.0
 
6
#
 
7
# Unless required by applicable law or agreed to in writing, software
 
8
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
9
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
10
# License for the specific language governing permissions and limitations
 
11
# under the License.
 
12
 
 
13
import datetime
 
14
 
 
15
import mock
 
16
from oslotest import base
 
17
 
 
18
from ceilometer.key_value_storage import notifications
 
19
from ceilometer import sample
 
20
 
 
21
 
 
22
def fake_uuid(x):
 
23
    return '%s-%s-%s-%s' % (x * 8, x * 4, x * 4, x * 12)
 
24
 
 
25
 
 
26
NOW = datetime.datetime.isoformat(datetime.datetime.utcnow())
 
27
 
 
28
 
 
29
TABLE_CREATE_PAYLOAD = {
 
30
    u'table_uuid': fake_uuid('r'),
 
31
    u'index_count': 2,
 
32
    u'table_name': u'email_data'
 
33
    }
 
34
 
 
35
TABLE_DELETE_PAYLOAD = {
 
36
    u'table_uuid': fake_uuid('r'),
 
37
    u'table_name': u'email_data'
 
38
    }
 
39
 
 
40
NOTIFICATION_TABLE_CREATE = {
 
41
    u'_context_request_id': u'req-d6e9b7ec-976f-443f-ba6e-e2b89b18aa75',
 
42
    u'_context_tenant': fake_uuid('t'),
 
43
    u'_context_user': fake_uuid('u'),
 
44
    u'_context_auth_token': u'',
 
45
    u'_context_show_deleted': False,
 
46
    u'_context_is_admin': u'False',
 
47
    u'_context_read_only': False,
 
48
    'payload': TABLE_CREATE_PAYLOAD,
 
49
    'publisher_id': u'magnetodb.winterfell.com',
 
50
    'message_id': u'3d71fb8a-f1d7-4a4e-b29f-7a711a761ba1',
 
51
    'event_type': u'magnetodb.table.create.end',
 
52
    'timestamp': NOW,
 
53
    'priority': 'info'
 
54
    }
 
55
 
 
56
NOTIFICATION_TABLE_DELETE = {
 
57
    u'_context_request_id': u'req-d6e9b7ec-976f-443f-ba6e-e2b89b18aa75',
 
58
    u'_context_tenant': fake_uuid('t'),
 
59
    u'_context_user': fake_uuid('u'),
 
60
    u'_context_auth_token': u'',
 
61
    u'_context_show_deleted': False,
 
62
    u'_context_is_admin': u'False',
 
63
    u'_context_read_only': False,
 
64
    'payload': TABLE_DELETE_PAYLOAD,
 
65
    'publisher_id': u'magnetodb.winterfell.com',
 
66
    'message_id': u'4c8f5940-3c90-41af-ac16-f0e3055a305d',
 
67
    'event_type': u'magnetodb.table.delete.end',
 
68
    'timestamp': NOW,
 
69
    'priority': 'info'
 
70
    }
 
71
 
 
72
 
 
73
class TestNotification(base.BaseTestCase):
 
74
 
 
75
    def _verify_common_counter(self, c, name, volume):
 
76
        self.assertIsNotNone(c)
 
77
        self.assertEqual(name, c.name)
 
78
        self.assertEqual(fake_uuid('r'), c.resource_id)
 
79
        self.assertEqual(NOW, c.timestamp)
 
80
        self.assertEqual(volume, c.volume)
 
81
        metadata = c.resource_metadata
 
82
        self.assertEqual(u'magnetodb.winterfell.com', metadata.get('host'))
 
83
 
 
84
    def test_create_table(self):
 
85
        handler = notifications.Table(mock.Mock())
 
86
        counters = list(handler.process_notification(
 
87
                        NOTIFICATION_TABLE_CREATE))
 
88
        self.assertEqual(1, len(counters))
 
89
        table = counters[0]
 
90
        self._verify_common_counter(table, 'magnetodb.table.create', 1)
 
91
        self.assertEqual(fake_uuid('u'), table.user_id)
 
92
        self.assertEqual(fake_uuid('t'), table.project_id)
 
93
        self.assertEqual(sample.TYPE_GAUGE, table.type)
 
94
 
 
95
    def test_delete_table(self):
 
96
        handler = notifications.Table(mock.Mock())
 
97
        counters = list(handler.process_notification(
 
98
                        NOTIFICATION_TABLE_DELETE))
 
99
        self.assertEqual(1, len(counters))
 
100
        table = counters[0]
 
101
        self._verify_common_counter(table, 'magnetodb.table.delete', 1)
 
102
        self.assertEqual(fake_uuid('u'), table.user_id)
 
103
        self.assertEqual(fake_uuid('t'), table.project_id)
 
104
        self.assertEqual(sample.TYPE_GAUGE, table.type)
 
105
 
 
106
    def test_index_count(self):
 
107
        handler = notifications.Index(mock.Mock())
 
108
        counters = list(handler.process_notification(
 
109
                        NOTIFICATION_TABLE_CREATE))
 
110
        self.assertEqual(1, len(counters))
 
111
        table = counters[0]
 
112
        self._verify_common_counter(table, 'magnetodb.table.index.count', 2)
 
113
        self.assertEqual(fake_uuid('u'), table.user_id)
 
114
        self.assertEqual(fake_uuid('t'), table.project_id)
 
115
        self.assertEqual(sample.TYPE_GAUGE, table.type)