17
17
# under the License.
18
18
"""Tests for Ceilometer notify daemon."""
23
22
from stevedore import extension
24
23
from stevedore.tests import manager as test_manager
26
from ceilometer.compute import notifications
25
from ceilometer.compute.notifications import instance
27
26
from ceilometer import notification
28
27
from ceilometer.openstack.common.fixture import config
29
from ceilometer.openstack.common import timeutils
30
28
from ceilometer.storage import models
31
29
from ceilometer.tests import base as tests_base
89
87
self.CONF.set_override("connection", "log://", group='database')
91
89
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
90
@mock.patch('ceilometer.event.converter.setup_events', mock.MagicMock())
92
91
def test_process_notification(self):
93
92
# If we try to create a real RPC connection, init_host() never
94
93
# returns. Mock it out so we can establish the service
124
123
self.srv.process_notification({})
125
124
self.assertTrue(fake_msg_to_event.called)
127
def test_message_to_event_missing_keys(self):
128
now = timeutils.utcnow()
129
timeutils.set_time_override(now)
130
message = {'event_type': "foo",
134
mock_dispatcher = mock.MagicMock()
135
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
136
[extension.Extension('test',
143
self.srv._message_to_event(message)
144
events = mock_dispatcher.record_events.call_args[0]
145
self.assertEqual(1, len(events))
147
self.assertEqual("foo", event.event_type)
148
self.assertEqual(now, event.generated)
149
self.assertEqual(1, len(event.traits))
151
126
def test_message_to_event_duplicate(self):
152
127
self.CONF.set_override("store_events", True, group="notification")
153
128
mock_dispatcher = mock.MagicMock()
129
self.srv.event_converter = mock.MagicMock()
130
self.srv.event_converter.to_event.return_value = mock.MagicMock(
131
event_type='test.test')
154
132
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
155
133
[extension.Extension('test',
166
144
def test_message_to_event_bad_event(self):
167
145
self.CONF.set_override("store_events", True, group="notification")
168
146
mock_dispatcher = mock.MagicMock()
147
self.srv.event_converter = mock.MagicMock()
148
self.srv.event_converter.to_event.return_value = mock.MagicMock(
149
event_type='test.test')
169
150
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
170
151
[extension.Extension('test',
178
159
message = {'event_type': "foo", 'message_id': "abc"}
179
160
self.assertRaises(notification.UnableToSaveEventException,
180
161
self.srv._message_to_event, message)
182
def test_extract_when(self):
183
now = timeutils.utcnow()
184
modified = now + datetime.timedelta(minutes=1)
185
timeutils.set_time_override(now)
187
body = {"timestamp": str(modified)}
188
when = notification.NotificationService._extract_when(body)
189
self.assertTimestampEqual(modified, when)
191
body = {"_context_timestamp": str(modified)}
192
when = notification.NotificationService._extract_when(body)
193
self.assertTimestampEqual(modified, when)
195
then = now + datetime.timedelta(hours=1)
196
body = {"timestamp": str(modified), "_context_timestamp": str(then)}
197
when = notification.NotificationService._extract_when(body)
198
self.assertTimestampEqual(modified, when)
200
when = notification.NotificationService._extract_when({})
201
self.assertTimestampEqual(now, when)