1
from landscape.monitor.loadaverage import LoadAverage
2
from landscape.tests.helpers import LandscapeTest, MonitorHelper
3
from landscape.tests.mocker import ANY
6
def get_load_average():
    """
    Generate an endless stream of fake load-average samples.

    Each item is a 3-tuple whose first element grows by one on every
    draw (1.0, 2.0, 3.0, ...) while the other two stay fixed at 1.0 and
    500.238.  The tests pass the generator's "next" operation to the
    plugin as its C{get_load_average} callable, so the tuple presumably
    mirrors the C{(1min, 5min, 15min)} shape of C{os.getloadavg()}.
    """
    # NOTE(review): the counter starts at 1 because the averages asserted
    # in test_exchange_messages (10.5 and 30.5) are the means of 1..20 and
    # 21..40 -- confirm against the plugin's sampling interval.
    i = 1
    while True:
        yield (float(i), 1.0, 500.238)
        i += 1
13
class LoadAveragePluginTest(LandscapeTest):
    """Tests for the L{LoadAverage} monitor plugin."""

    helpers = [MonitorHelper]

    def _add_plugin(self, **kwargs):
        """
        Create a L{LoadAverage} plugin, register it and return it.

        The plugin is always driven by the test reactor's clock; any
        extra keyword arguments are passed straight to L{LoadAverage}.
        """
        plugin = LoadAverage(create_time=self.reactor.time, **kwargs)
        self.monitor.add(plugin)
        return plugin

    def _add_sampled_plugin(self):
        """
        Register and return a plugin fed by a fresh
        C{get_load_average()} sample generator.
        """
        samples = get_load_average()
        # Use the next() builtin instead of the generator's ".next" bound
        # method, which only exists on Python 2.
        return self._add_plugin(get_load_average=lambda: next(samples))

    def test_real_load_average(self):
        """
        When the load average plugin runs it calls os.getloadavg() to
        retrieve current load average data.  This test makes sure that
        os.getloadavg() is called without failing and that messages
        with the expected datatypes are generated.
        """
        plugin = self._add_plugin()

        self.reactor.advance(self.monitor.step_size)

        message = plugin.create_message()
        self.assertIn("type", message)
        self.assertEqual(message["type"], "load-average")
        self.assertIn("load-averages", message)

        load_averages = message["load-averages"]
        self.assertEqual(len(load_averages), 1)

        load_average = load_averages[0]
        self.assertEqual(load_average[0], self.monitor.step_size)
        self.assertIsInstance(load_average[0], int)
        self.assertIsInstance(load_average[1], float)

    def test_sample_load_average(self):
        """
        Sample data is used to ensure that the load average included
        in the message is calculated correctly.
        """
        plugin = self._add_plugin(get_load_average=lambda: (0.15, 1, 500))

        self.reactor.advance(self.monitor.step_size)

        message = plugin.create_message()
        load_averages = message["load-averages"]
        self.assertEqual(len(load_averages), 1)
        self.assertEqual(load_averages[0], (self.monitor.step_size, 0.15))

    def test_ranges_remain_contiguous_after_flush(self):
        """
        The load average plugin uses the accumulate function to queue
        messages.  Timestamps should always be contiguous, and always
        fall on a step boundary.
        """
        plugin = self._add_sampled_plugin()

        for i in range(1, 10):
            self.reactor.advance(self.monitor.step_size)
            message = plugin.create_message()
            load_averages = message["load-averages"]
            self.assertEqual(len(load_averages), 1)
            self.assertEqual(load_averages[0][0],
                             self.monitor.step_size * i)

    def test_messaging_flushes(self):
        """
        Duplicate messages should never be created.  If no data is
        available, a message with an empty C{load-averages} list is
        expected.
        """
        plugin = self._add_sampled_plugin()

        self.reactor.advance(self.monitor.step_size)

        message = plugin.create_message()
        self.assertEqual(len(message["load-averages"]), 1)

        # Nothing new was accumulated since the previous message.
        message = plugin.create_message()
        self.assertEqual(len(message["load-averages"]), 0)

    def test_never_exchange_empty_messages(self):
        """
        The plugin will create a message with an empty
        C{load-averages} list when no data is available.  If an empty
        message is created during exchange, it should not be queued.
        """
        self.mstore.set_accepted_types(["load-average"])

        self._add_sampled_plugin()

        # The reactor is never advanced, so no sample has been taken yet.
        self.monitor.exchange()
        self.assertEqual(len(self.mstore.get_pending_messages()), 0)

    def test_exchange_messages(self):
        """
        The load average plugin queues messages when manager.exchange()
        is called.  Each message should be aligned to a step boundary;
        messages collected between exchange periods should be
        delivered in a single message.
        """
        self.mstore.set_accepted_types(["load-average"])

        self._add_sampled_plugin()

        self.reactor.advance(self.monitor.step_size * 2)
        self.monitor.exchange()

        self.assertMessages(self.mstore.get_pending_messages(),
                            [{"type": "load-average",
                              "load-averages": [(300, 10.5), (600, 30.5)]}])

    def test_call_on_accepted(self):
        """
        When acceptance of the C{load-average} message type changes, the
        plugin sends its message through the broker with C{urgent=True}.
        """
        self._add_sampled_plugin()

        self.reactor.advance(self.monitor.step_size)

        remote_broker_mock = self.mocker.replace(self.remote)
        remote_broker_mock.send_message(ANY, urgent=True)
        # NOTE(review): the recorded expectation is only checked once
        # mocker leaves record mode; replay() restored here -- confirm.
        self.mocker.replay()

        # NOTE(review): the second argument was cut off in the source;
        # True ("type is now accepted") is inferred from the test name.
        self.reactor.fire(("message-type-acceptance-changed", "load-average"),
                          True)

    def test_no_message_if_not_accepted(self):
        """
        Don't add any messages at all if the broker isn't currently
        accepting their type.
        """
        self._add_sampled_plugin()

        self.reactor.advance(self.monitor.step_size * 2)
        self.monitor.exchange()

        # Accepting the type only after the exchange must not
        # retroactively queue anything.
        self.mstore.set_accepted_types(["load-average"])
        self.assertMessages(list(self.mstore.get_pending_messages()), [])