1
from datetime import datetime
4
from landscape.monitor.computeruptime import (LoginInfo, LoginInfoReader,
5
ComputerUptime, BootTimes,
7
from landscape.tests.helpers import (LandscapeTest, MakePathHelper,
9
from landscape.tests.mocker import ANY
12
def append_login_data(filename, login_type=0, pid=0, tty_device="/dev/",
13
id="", username="", hostname="", termination_status=0,
14
exit_status=0, session_id=0, entry_time_seconds=0,
15
entry_time_milliseconds=0,
16
remote_ip_address=[0, 0, 0, 0]):
17
"""Append binary login data to the specified filename."""
18
file = open(filename, "ab")
20
file.write(struct.pack(LoginInfo.RAW_FORMAT, login_type, pid,
21
tty_device, id, username, hostname,
22
termination_status, exit_status, session_id,
23
entry_time_seconds, entry_time_milliseconds,
24
remote_ip_address[0], remote_ip_address[1],
25
remote_ip_address[2], remote_ip_address[3], ""))
30
class UptimeTest(LandscapeTest):
31
"""Test for parsing /proc/uptime data."""
33
def test_valid_uptime_file(self):
34
"""Test ensures that we can read a valid /proc/uptime file."""
35
proc_file = self.make_path("17608.24 16179.25")
36
self.assertEquals("%0.2f" % get_uptime(proc_file),
40
class LoginInfoReaderTest(LandscapeTest):
41
"""Tests for login info file reader."""
43
helpers = [MakePathHelper]
45
def test_read_empty_file(self):
46
"""Test ensures the reader is resilient to empty files."""
47
filename = self.make_path("")
49
file = open(filename, "rb")
51
reader = LoginInfoReader(file)
52
self.assertEquals(reader.read_next(), None)
56
def test_read_login_info(self):
57
"""Test ensures the reader can read login info."""
58
filename = self.make_path("")
59
append_login_data(filename, login_type=1, pid=100, tty_device="/dev/",
60
id="1", username="jkakar", hostname="localhost",
61
termination_status=0, exit_status=0, session_id=1,
62
entry_time_seconds=105, entry_time_milliseconds=10,
63
remote_ip_address=[192, 168, 42, 102])
64
append_login_data(filename, login_type=1, pid=101, tty_device="/dev/",
65
id="1", username="root", hostname="localhost",
66
termination_status=0, exit_status=0, session_id=2,
67
entry_time_seconds=235, entry_time_milliseconds=17,
68
remote_ip_address=[192, 168, 42, 102])
70
file = open(filename, "rb")
72
reader = LoginInfoReader(file)
74
info = reader.read_next()
75
self.assertEquals(info.login_type, 1)
76
self.assertEquals(info.pid, 100)
77
self.assertEquals(info.tty_device, "/dev/")
78
self.assertEquals(info.id, "1")
79
self.assertEquals(info.username, "jkakar")
80
self.assertEquals(info.hostname, "localhost")
81
self.assertEquals(info.termination_status, 0)
82
self.assertEquals(info.exit_status, 0)
83
self.assertEquals(info.session_id, 1)
84
self.assertEquals(info.entry_time, datetime.utcfromtimestamp(105))
85
# FIXME Test IP address handling. -jk
87
info = reader.read_next()
88
self.assertEquals(info.login_type, 1)
89
self.assertEquals(info.pid, 101)
90
self.assertEquals(info.tty_device, "/dev/")
91
self.assertEquals(info.id, "1")
92
self.assertEquals(info.username, "root")
93
self.assertEquals(info.hostname, "localhost")
94
self.assertEquals(info.termination_status, 0)
95
self.assertEquals(info.exit_status, 0)
96
self.assertEquals(info.session_id, 2)
97
self.assertEquals(info.entry_time, datetime.utcfromtimestamp(235))
98
# FIXME Test IP address handling. -jk
100
info = reader.read_next()
101
self.assertEquals(info, None)
105
def test_login_info_iterator(self):
106
"""Test ensures iteration behaves correctly."""
107
filename = self.make_path("")
108
append_login_data(filename)
109
append_login_data(filename)
111
file = open(filename, "rb")
113
reader = LoginInfoReader(file)
116
for info in reader.login_info():
119
self.assertEquals(count, 2)
124
class ComputerUptimeTest(LandscapeTest):
125
"""Tests for the computer-uptime plugin."""
127
helpers = [MakePathHelper, MonitorHelper]
130
LandscapeTest.setUp(self)
131
self.mstore.set_accepted_types(["computer-uptime"])
133
def test_deliver_message(self):
134
"""Test delivering a message with the boot and shutdown times."""
135
wtmp_filename = self.make_path("")
136
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
137
entry_time_seconds=535)
138
plugin = ComputerUptime(wtmp_file=wtmp_filename)
139
self.monitor.add(plugin)
142
message = self.mstore.get_pending_messages()[0]
143
self.assertTrue("type" in message)
144
self.assertEquals(message["type"], "computer-uptime")
145
self.assertTrue("shutdown-times" in message)
146
self.assertEquals(message["shutdown-times"], [535])
148
def test_only_deliver_unique_shutdown_messages(self):
149
"""Test that only unique shutdown messages are generated."""
150
wtmp_filename = self.make_path("")
151
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
152
entry_time_seconds=535)
154
plugin = ComputerUptime(wtmp_file=wtmp_filename)
155
self.monitor.add(plugin)
158
message = self.mstore.get_pending_messages()[0]
159
self.assertTrue("type" in message)
160
self.assertEquals(message["type"], "computer-uptime")
161
self.assertTrue("shutdown-times" in message)
162
self.assertEquals(message["shutdown-times"], [535])
164
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
165
entry_time_seconds=3212)
168
message = self.mstore.get_pending_messages()[1]
169
self.assertTrue("type" in message)
170
self.assertEquals(message["type"], "computer-uptime")
171
self.assertTrue("shutdown-times" in message)
172
self.assertEquals(message["shutdown-times"], [3212])
174
def test_only_queue_messages_with_data(self):
175
"""Test ensures that messages without data are not queued."""
176
wtmp_filename = self.make_path("")
177
append_login_data(wtmp_filename, tty_device="~", username="reboot",
178
entry_time_seconds=3212)
179
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
180
entry_time_seconds=3562)
181
plugin = ComputerUptime(wtmp_file=wtmp_filename)
182
self.monitor.add(plugin)
185
self.assertEquals(len(self.mstore.get_pending_messages()), 1)
188
self.assertEquals(len(self.mstore.get_pending_messages()), 1)
190
def test_missing_wtmp_file(self):
191
wtmp_filename = self.make_path()
192
plugin = ComputerUptime(wtmp_file=wtmp_filename)
193
self.monitor.add(plugin)
195
self.assertEquals(len(self.mstore.get_pending_messages()), 0)
197
def test_boot_time_same_as_last_known_startup_time(self):
198
"""Ensure one message is queued for duplicate startup times."""
199
wtmp_filename = self.make_path("")
200
append_login_data(wtmp_filename, tty_device="~", username="reboot",
201
entry_time_seconds=3212)
202
plugin = ComputerUptime(wtmp_file=wtmp_filename)
203
self.monitor.add(plugin)
206
messages = self.mstore.get_pending_messages()
207
self.assertEquals(len(messages), 1)
208
self.assertEquals(messages[0]["type"], "computer-uptime")
209
self.assertEquals(messages[0]["startup-times"], [3212])
211
def test_new_startup_time_replaces_old_startup_time(self):
213
Test ensures startup times are not duplicated even across restarts of
214
the client. This is simulated by creating a new instance of the plugin.
216
wtmp_filename = self.make_path("")
217
append_login_data(wtmp_filename, tty_device="~", username="reboot",
218
entry_time_seconds=3212)
219
plugin1 = ComputerUptime(wtmp_file=wtmp_filename)
220
self.monitor.add(plugin1)
223
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
224
entry_time_seconds=3871)
225
append_login_data(wtmp_filename, tty_device="~", username="reboot",
226
entry_time_seconds=4657)
227
plugin2 = ComputerUptime(wtmp_file=wtmp_filename)
228
self.monitor.add(plugin2)
231
messages = self.mstore.get_pending_messages()
232
self.assertEquals(len(messages), 2)
233
self.assertEquals(messages[0]["type"], "computer-uptime")
234
self.assertEquals(messages[0]["startup-times"], [3212])
235
self.assertEquals(messages[1]["type"], "computer-uptime")
236
self.assertEquals(messages[1]["startup-times"], [4657])
238
def test_check_last_logrotated_file(self):
239
"""Test ensures reading falls back to logrotated files."""
240
wtmp_filename = self.make_path("")
241
logrotated_filename = self.make_path("", wtmp_filename + ".1")
242
append_login_data(logrotated_filename, tty_device="~",
243
username="reboot", entry_time_seconds=125)
244
append_login_data(logrotated_filename, tty_device="~",
245
username="shutdown", entry_time_seconds=535)
247
plugin = ComputerUptime(wtmp_file=wtmp_filename)
248
self.monitor.add(plugin)
251
message = self.mstore.get_pending_messages()[0]
252
self.assertTrue("type" in message)
253
self.assertEquals(message["type"], "computer-uptime")
254
self.assertTrue("startup-times" in message)
255
self.assertEquals(message["startup-times"], [125])
256
self.assertTrue("shutdown-times" in message)
257
self.assertEquals(message["shutdown-times"], [535])
259
def test_check_logrotate_spillover(self):
260
"""Test ensures reading falls back to logrotated files."""
261
wtmp_filename = self.make_path("")
262
logrotated_filename = self.make_path("", wtmp_filename + ".1")
263
append_login_data(logrotated_filename, tty_device="~",
264
username="reboot", entry_time_seconds=125)
265
append_login_data(logrotated_filename, tty_device="~",
266
username="shutdown", entry_time_seconds=535)
267
append_login_data(wtmp_filename, tty_device="~",
268
username="reboot", entry_time_seconds=1025)
269
append_login_data(wtmp_filename, tty_device="~",
270
username="shutdown", entry_time_seconds=1150)
272
plugin = ComputerUptime(wtmp_file=wtmp_filename)
273
self.monitor.add(plugin)
276
messages = self.mstore.get_pending_messages()
277
self.assertEquals(len(messages), 2)
279
message = messages[0]
280
self.assertTrue("type" in message)
281
self.assertEquals(message["type"], "computer-uptime")
282
self.assertTrue("startup-times" in message)
283
self.assertEquals(message["startup-times"], [125])
284
self.assertTrue("shutdown-times" in message)
285
self.assertEquals(message["shutdown-times"], [535])
287
message = messages[1]
288
self.assertTrue("type" in message)
289
self.assertEquals(message["type"], "computer-uptime")
290
self.assertTrue("startup-times" in message)
291
self.assertEquals(message["startup-times"], [1025])
292
self.assertTrue("shutdown-times" in message)
293
self.assertEquals(message["shutdown-times"], [1150])
295
def test_call_on_accepted(self):
296
wtmp_filename = self.make_path("")
297
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
298
entry_time_seconds=535)
299
plugin = ComputerUptime(wtmp_file=wtmp_filename)
300
self.monitor.add(plugin)
302
remote_broker_mock = self.mocker.replace(self.remote)
303
remote_broker_mock.send_message(ANY, urgent=True)
306
self.reactor.fire(("message-type-acceptance-changed",
310
def test_no_message_if_not_accepted(self):
312
Don't add any messages at all if the broker isn't currently
313
accepting their type.
315
self.mstore.set_accepted_types([])
316
wtmp_filename = self.make_path("")
317
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
318
entry_time_seconds=535)
319
plugin = ComputerUptime(wtmp_file=wtmp_filename)
320
self.monitor.add(plugin)
322
self.mstore.set_accepted_types(["computer-uptime"])
323
self.assertMessages(list(self.mstore.get_pending_messages()), [])
326
class BootTimesTest(LandscapeTest):
328
helpers = [MakePathHelper]
330
def test_fallback_to_uptime(self):
332
When no data is available in C{/var/log/wtmp}
333
L{BootTimes.get_last_boot_time} falls back to C{/proc/uptime}.
335
wtmp_filename = self.make_path("")
336
append_login_data(wtmp_filename, tty_device="~", username="shutdown",
337
entry_time_seconds=535)
338
self.assertTrue(BootTimes(filename=wtmp_filename).get_last_boot_time())