27
27
from ceilometer.openstack.common.fixture import config
28
from ceilometer.openstack.common import jsonutils
29
28
from ceilometer.openstack.common import network_utils
30
29
from ceilometer.openstack.common import test
31
30
from ceilometer.publisher import rpc
32
31
from ceilometer import sample
35
class TestSignature(test.BaseTestCase):
37
def test_compute_signature_change_key(self):
38
sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'},
40
sig2 = rpc.compute_signature({'A': 'A', 'b': 'B'},
42
self.assertNotEqual(sig1, sig2)
44
def test_compute_signature_change_value(self):
45
sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'},
47
sig2 = rpc.compute_signature({'a': 'a', 'b': 'B'},
49
self.assertNotEqual(sig1, sig2)
51
def test_compute_signature_same(self):
52
sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'},
54
sig2 = rpc.compute_signature({'a': 'A', 'b': 'B'},
56
self.assertEqual(sig1, sig2)
58
def test_compute_signature_signed(self):
59
data = {'a': 'A', 'b': 'B'}
60
sig1 = rpc.compute_signature(data, 'not-so-secret')
61
data['message_signature'] = sig1
62
sig2 = rpc.compute_signature(data, 'not-so-secret')
63
self.assertEqual(sig1, sig2)
65
def test_compute_signature_use_configured_secret(self):
66
data = {'a': 'A', 'b': 'B'}
67
sig1 = rpc.compute_signature(data, 'not-so-secret')
68
sig2 = rpc.compute_signature(data, 'different-value')
69
self.assertNotEqual(sig1, sig2)
71
def test_verify_signature_signed(self):
72
data = {'a': 'A', 'b': 'B'}
73
sig1 = rpc.compute_signature(data, 'not-so-secret')
74
data['message_signature'] = sig1
75
self.assertTrue(rpc.verify_signature(data, 'not-so-secret'))
77
def test_verify_signature_unsigned(self):
78
data = {'a': 'A', 'b': 'B'}
79
self.assertFalse(rpc.verify_signature(data, 'not-so-secret'))
81
def test_verify_signature_incorrect(self):
82
data = {'a': 'A', 'b': 'B',
83
'message_signature': 'Not the same'}
84
self.assertFalse(rpc.verify_signature(data, 'not-so-secret'))
86
def test_verify_signature_nested(self):
93
data['message_signature'] = rpc.compute_signature(
96
self.assertTrue(rpc.verify_signature(data, 'not-so-secret'))
98
def test_verify_signature_nested_json(self):
107
data['message_signature'] = rpc.compute_signature(
110
jsondata = jsonutils.loads(jsonutils.dumps(data))
111
self.assertTrue(rpc.verify_signature(jsondata, 'not-so-secret'))
114
class TestCounter(test.BaseTestCase):
116
TEST_COUNTER = sample.Sample(name='name',
121
project_id='project',
124
resource_metadata={'key': 'value'},
127
def test_meter_message_from_counter_signed(self):
128
msg = rpc.meter_message_from_counter(self.TEST_COUNTER,
130
self.assertIn('message_signature', msg)
132
def test_meter_message_from_counter_field(self):
133
def compare(f, c, msg_f, msg):
134
self.assertEqual(msg, c)
135
msg = rpc.meter_message_from_counter(self.TEST_COUNTER,
137
name_map = {'name': 'counter_name',
138
'type': 'counter_type',
139
'unit': 'counter_unit',
140
'volume': 'counter_volume'}
141
for f in self.TEST_COUNTER._fields:
142
msg_f = name_map.get(f, f)
143
yield compare, f, getattr(self.TEST_COUNTER, f), msg_f, msg[msg_f]
146
34
class TestPublish(test.BaseTestCase):
281
168
def faux_cast_wait(context, topic, msg):
282
169
self.useFixture(fixtures.MonkeyPatch(
283
"ceilometer.openstack.common.rpc.cast",
170
"ceilometer.openstack.common.rpc.cast",
285
172
# Sleep to simulate concurrency and allow other threads to work
286
173
eventlet.sleep(0)
287
174
self.published.append((topic, msg))
289
176
self.useFixture(fixtures.MonkeyPatch(
290
"ceilometer.openstack.common.rpc.cast",
177
"ceilometer.openstack.common.rpc.cast",
293
180
publisher = rpc.RPCPublisher(network_utils.urlsplit('rpc://'))
294
181
job1 = eventlet.spawn(publisher.publish_samples, None, self.test_data)