1
from landscape.monitor.memoryinfo import MemoryInfo
2
from landscape.tests.helpers import LandscapeTest, MakePathHelper, MonitorHelper
3
from landscape.tests.mocker import ANY
6
# Test case for the MemoryInfo monitor plugin, using the MonitorHelper and
# MakePathHelper test helpers.
# NOTE(review): this region is damaged in this view -- the bare integers are
# leftover line numbers, indentation is lost, and lines are missing. The
# "Name: value kB" lines are presumably fragments of the SAMPLE_DATA string
# that the test methods pass to make_path(), and the super() call presumably
# belongs to a setUp() method whose "def" line is absent; restore both from
# the original file before editing.
class MemoryInfoTest(LandscapeTest):
8
helpers = [MonitorHelper, MakePathHelper]
28
CommitLimit: 2395740 kB
29
Committed_AS: 1566888 kB
31
VmallocTotal: 516088 kB
33
VmallocChunk: 510252 kB
37
super(MemoryInfoTest, self).setUp()
39
def test_read_proc_meminfo(self):
    """
    Run the plugin against the live /proc/meminfo (no source file is
    supplied) and check the shape of the message it creates.

    After one monitor step the message must have type "memory-info"
    and carry exactly one sample: a 3-tuple whose items are all ints.
    """
    plugin = MemoryInfo(create_time=self.reactor.time)
    self.monitor.add(plugin)

    self.reactor.advance(self.monitor.step_size)

    message = plugin.create_message()
    self.assertTrue("type" in message)
    self.assertEquals(message["type"], "memory-info")
    self.assertTrue("memory-info" in message)

    memory_info = message["memory-info"]
    self.assertEquals(len(memory_info), 1)

    sample = memory_info[0]
    self.assertTrue(isinstance(sample, tuple))
    # The sample's arity must be compared explicitly: the previous
    # assertTrue(len(memory_info), 3) used 3 as the failure message and
    # therefore passed for any non-empty list.
    self.assertEquals(len(sample), 3)
    for value in sample:
        self.assertTrue(isinstance(value, int))
64
def test_read_sample_data(self):
    """
    Feed the plugin canned /proc/meminfo content and verify that the
    first sample reports the expected free memory and free swap.
    """
    sample_path = self.make_path(self.SAMPLE_DATA)
    plugin = MemoryInfo(source_filename=sample_path,
                        create_time=self.reactor.time)
    interval = self.monitor.step_size
    self.monitor.add(plugin)

    self.reactor.advance(interval)

    first_sample = plugin.create_message()["memory-info"][0]
    self.assertEquals(first_sample, (interval, 503, 1567))
80
def test_messaging_flushes(self):
    """
    Creating a message drains the accumulated samples: the first
    message holds one sample, and an immediate second message holds
    an empty C{memory-info} list rather than a duplicate.
    """
    sample_path = self.make_path(self.SAMPLE_DATA)
    plugin = MemoryInfo(source_filename=sample_path,
                        create_time=self.reactor.time)
    self.monitor.add(plugin)

    self.reactor.advance(self.monitor.step_size)

    # First call returns the single queued sample, second call nothing.
    for expected_count in (1, 0):
        message = plugin.create_message()
        self.assertEquals(len(message["memory-info"]), expected_count)
99
def test_ranges_remain_contiguous_after_flush(self):
    """
    Flushing after every step must not disturb the timestamps: each
    message carries exactly one sample, stamped on the next step
    boundary.
    """
    sample_path = self.make_path(self.SAMPLE_DATA)
    plugin = MemoryInfo(source_filename=sample_path,
                        create_time=self.reactor.time)
    self.monitor.add(plugin)

    interval = self.monitor.step_size
    for step_number in range(1, 10):
        self.reactor.advance(interval)
        samples = plugin.create_message()["memory-info"]
        self.assertEquals(len(samples), 1)
        timestamp = samples[0][0]
        self.assertEquals(timestamp, interval * step_number)
118
def test_never_exchange_empty_messages(self):
    """
    With no data collected yet the plugin can only build a message
    with an empty C{memory-info} list; an exchange at that point must
    leave nothing pending in the message store.
    """
    self.mstore.set_accepted_types(["memory-info"])

    sample_path = self.make_path(self.SAMPLE_DATA)
    plugin = MemoryInfo(source_filename=sample_path,
                        create_time=self.reactor.time)
    self.monitor.add(plugin)
    self.monitor.exchange()

    pending = self.mstore.get_pending_messages()
    self.assertEquals(len(pending), 0)
133
def test_exchange_messages(self):
    """
    Samples gathered over two steps are queued by a single exchange
    as one message, each sample aligned to its step boundary.
    """
    self.mstore.set_accepted_types(["memory-info"])

    sample_path = self.make_path(self.SAMPLE_DATA)
    plugin = MemoryInfo(source_filename=sample_path,
                        create_time=self.reactor.time)
    interval = self.monitor.step_size
    self.monitor.add(plugin)

    self.reactor.advance(interval * 2)
    self.monitor.exchange()

    expected_message = {
        "type": "memory-info",
        "memory-info": [(interval * count, 503, 1567)
                        for count in (1, 2)],
    }
    self.assertMessages(self.mstore.get_pending_messages(),
                        [expected_message])
156
# Exercises the plugin around a "message-type-acceptance-changed" event for
# "memory-info": after one monitor step the remote broker is replaced with a
# mock on which an urgent send_message() call is recorded, then the event is
# fired on the reactor.
# NOTE(review): this method is damaged in this view -- the bare integers are
# leftover line numbers, indentation is lost, and the final reactor.fire(...)
# call is cut off before its remaining argument(s). Presumably a mocker
# replay step also sits between the recording and fire(); restore from the
# original file and confirm.
def test_call_on_accepted(self):
157
plugin = MemoryInfo(source_filename=self.make_path(self.SAMPLE_DATA),
158
create_time=self.reactor.time)
159
self.monitor.add(plugin)
161
self.reactor.advance(self.monitor.step_size * 1)
163
remote_broker_mock = self.mocker.replace(self.remote)
164
remote_broker_mock.send_message(ANY, urgent=True)
167
self.reactor.fire(("message-type-acceptance-changed", "memory-info"),
170
def test_no_message_if_not_accepted(self):
    """
    Nothing may be queued while the broker does not accept the
    plugin's message type.
    """
    plugin = MemoryInfo(create_time=self.reactor.time)
    self.monitor.add(plugin)
    self.reactor.advance(self.monitor.step_size)
    self.monitor.exchange()

    # The type is accepted only after the exchange has already run, so
    # that the store is then queried for "memory-info" messages.
    self.mstore.set_accepted_types(["memory-info"])
    pending = list(self.mstore.get_pending_messages())
    self.assertMessages(pending, [])