7
from twisted.internet.defer import fail
9
from landscape.monitor.activeprocessinfo import ActiveProcessInfo
10
from landscape.tests.helpers import (LandscapeTest, MakePathHelper,
11
MonitorHelper, ProcessDataBuilder)
12
from landscape.tests.mocker import ANY
15
class ActiveProcessInfoTest(LandscapeTest):
    """Active process info plugin tests."""

    # NOTE(review): presumably LandscapeTest instantiates these helpers to
    # provide fixtures such as self.monitor and self.mstore -- confirm.
    helpers = [MonitorHelper, MakePathHelper]
def setUp(self):
    """Initialize helpers and sample data builder."""
    LandscapeTest.setUp(self)
    # Scratch directory that stands in for /proc; it is removed again
    # with shutil.rmtree during teardown.
    self.sample_dir = tempfile.mkdtemp()
    self.builder = ProcessDataBuilder(self.sample_dir)
    self.mstore.set_accepted_types(["active-process-info"])
def tearDown(self):
    """Clean up sample data artifacts."""
    # Remove the scratch directory before the base class tears down.
    shutil.rmtree(self.sample_dir)
    LandscapeTest.tearDown(self)
32
def test_first_run_includes_kill_message(self):
    """Test ensures that the first run queues a kill-processes message."""
    plugin = ActiveProcessInfo(uptime=10)
    self.monitor.add(plugin)
    # Registering the plugin queues nothing by itself; an exchange has
    # to run before a message can be pending.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertEqual(message["kill-all-processes"], True)
    self.assertIn("add-processes", message)
43
def test_only_first_run_includes_kill_message(self):
    """Test ensures that only the first run queues a kill message."""
    self.builder.create_data(672, self.builder.TRACING_STOP,
                             uid=1000, gid=1000, started_after_boot=10,
                             process_name="blarpy")

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=10)
    self.monitor.add(plugin)
    self.monitor.exchange()

    # A second process shows up between the two exchanges so that the
    # second run has something to report.
    self.builder.create_data(671, self.builder.STOPPED, uid=1000,
                             gid=1000, started_after_boot=15,
                             process_name="blargh")
    self.monitor.exchange()
    messages = self.mstore.get_pending_messages()
    self.assertEqual(len(messages), 2)

    first_run = messages[0]
    self.assertEqual(first_run["type"], "active-process-info")
    self.assertIn("kill-all-processes", first_run)
    self.assertIn("add-processes", first_run)

    second_run = messages[1]
    self.assertEqual(second_run["type"], "active-process-info")
    # The kill flag belongs to the first run only.
    self.assertNotIn("kill-all-processes", second_run)
    self.assertIn("add-processes", second_run)
68
def test_terminating_process_race(self):
    """Test that the plugin handles process termination races.

    There is a potential race in the time between getting a list
    of process directories in C{/proc} and reading
    C{/proc/<process-id>/status} or C{/proc/<process-id>/stat}.
    The process with C{<process-id>} may terminate, causing
    status (or stat) to be removed in this window and resulting in
    a file-not-found IOError.

    This test simulates race behaviour by creating a directory for
    a process without a C{status} or C{stat} file.
    """
    directory = tempfile.mkdtemp()
    try:
        # A numeric directory with no status/stat files inside it.
        os.mkdir(os.path.join(directory, "42"))
        plugin = ActiveProcessInfo(proc_dir=directory, uptime=10)
        self.monitor.add(plugin)
        # The exchange is what reads the directory; the test passes
        # when it completes without raising.
        plugin.exchange()
    finally:
        # Always remove the scratch directory, even if the run fails.
        shutil.rmtree(directory)
90
def test_read_proc(self):
    """Test reading from /proc."""
    # No proc_dir is given, so the plugin uses its default location.
    plugin = ActiveProcessInfo(uptime=10)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    messages = self.mstore.get_pending_messages()
    self.assertTrue(len(messages) > 0)
    self.assertIn("add-processes", messages[0])
99
def test_read_sample_data(self):
    """Test reading a sample set of process data."""
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=1030, process_name="init")
    self.builder.create_data(671, self.builder.STOPPED, uid=1000,
                             gid=1000, started_after_boot=1110,
                             process_name="blargh")
    self.builder.create_data(672, self.builder.TRACING_STOP,
                             uid=1000, gid=1000, started_after_boot=1120,
                             process_name="blarpy")

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertIn("add-processes", message)

    expected_processes = [
        {"state": "R", "gid": 0, "pid": 1,
         "vm-size": 11676, "name": "init", "uid": 0,
         "start-time": 103, "percent-cpu": 0.0},
        {"state": "T", "gid": 1000, "pid": 671,
         "vm-size": 11676, "name": "blargh", "uid": 1000,
         "start-time": 111, "percent-cpu": 0.0},
        {"state": "I", "gid": 1000, "pid": 672,
         "vm-size": 11676, "name": "blarpy", "uid": 1000,
         "start-time": 112, "percent-cpu": 0.0},
    ]
    # Compare in pid order so the check does not depend on the order
    # in which the plugin lists processes.
    reported = sorted(message["add-processes"],
                      key=operator.itemgetter("pid"))
    self.assertEqual(reported, expected_processes)
132
def test_skip_non_numeric_subdirs(self):
    """Test ensures the plugin doesn't touch non-process dirs in /proc."""
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=1120, process_name="init")

    # A non-numeric directory next to the process directory; it has to
    # be created here for the sanity check below to hold.
    directory = os.path.join(self.sample_dir, "acpi")
    os.mkdir(directory)
    self.assertTrue(os.path.isdir(directory))

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertIn("add-processes", message)

    # Only pid 1 is reported; "acpi" must not produce an entry.
    expected_process = {"pid": 1, "state": "R", "name": "init",
                        "vm-size": 11676, "uid": 0, "gid": 0,
                        "start-time": 112, "percent-cpu": 0.0}
    self.assertEqual(message["add-processes"], [expected_process])
155
def test_plugin_manager(self):
    """Test plugin manager integration."""
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=1100, process_name="init")

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Drive the plugin through the monitor rather than calling it
    # directly.
    self.monitor.exchange()

    expected_messages = [
        {"type": "active-process-info",
         "kill-all-processes": True,
         "add-processes": [{"pid": 1, "state": "R", "name": "init",
                            "vm-size": 11676, "uid": 0, "gid": 0,
                            "start-time": 110, "percent-cpu": 0.0}]}]
    self.assertMessages(self.mstore.get_pending_messages(),
                        expected_messages)
173
def test_process_terminated(self):
    """Test that the plugin handles process changes in a diff-like way."""
    # This test is *too big*
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=1010, process_name="init")
    self.builder.create_data(671, self.builder.STOPPED, uid=1000,
                             gid=1000, started_after_boot=1020,
                             process_name="blargh")
    self.builder.create_data(672, self.builder.TRACING_STOP,
                             uid=1000, gid=1000, started_after_boot=1040,
                             process_name="blarpy")

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # First run: reports the three initial processes.
    plugin.exchange()

    # Terminate a process and start another.
    self.builder.remove_data(671)
    self.builder.create_data(12753, self.builder.RUNNING,
                             uid=0, gid=0, started_after_boot=1070,
                             process_name="wubble")
    # Second run: reports only the differences.
    plugin.exchange()

    messages = self.mstore.get_pending_messages()
    self.assertEqual(len(messages), 2)

    # The first time the plugin runs we expect all known processes
    # to be reported.
    message = messages[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertEqual(message["kill-all-processes"], True)
    self.assertIn("add-processes", message)
    expected_processes = [
        {"state": "R", "gid": 0, "pid": 1,
         "vm-size": 11676, "name": "init",
         "uid": 0, "start-time": 101,
         "percent-cpu": 0.0},
        {"state": "T", "gid": 1000, "pid": 671,
         "vm-size": 11676, "name": "blargh",
         "uid": 1000, "start-time": 102,
         "percent-cpu": 0.0},
        {"state": "I", "gid": 1000, "pid": 672,
         "vm-size": 11676, "name": "blarpy",
         "uid": 1000, "start-time": 104,
         "percent-cpu": 0.0},
    ]
    # Compare in pid order so listing order does not matter.
    reported = sorted(message["add-processes"],
                      key=operator.itemgetter("pid"))
    self.assertEqual(reported, expected_processes)

    # Report diff-like changes to processes, such as terminated
    # processes and new processes.
    message = messages[1]
    self.assertEqual(message["type"], "active-process-info")
    self.assertNotIn("kill-all-processes", message)

    self.assertIn("add-processes", message)
    self.assertEqual(len(message["add-processes"]), 1)
    expected_process = {"state": "R", "gid": 0, "pid": 12753,
                        "vm-size": 11676, "name": "wubble",
                        "uid": 0, "start-time": 107,
                        "percent-cpu": 0.0}
    self.assertEqual(message["add-processes"], [expected_process])

    self.assertIn("kill-processes", message)
    self.assertEqual(len(message["kill-processes"]), 1)
    self.assertEqual(message["kill-processes"], [671])
241
def test_only_queue_message_when_process_data_is_available(self):
    """Test ensures that messages are only queued when data changes."""
    self.builder.create_data(672, self.builder.TRACING_STOP,
                             uid=1000, gid=1000, started_after_boot=10,
                             process_name="blarpy")

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=10)
    self.monitor.add(plugin)

    # The first run has data to report and queues one message.
    plugin.exchange()
    self.assertEqual(len(self.mstore.get_pending_messages()), 1)

    # Nothing changed in between, so a second run must not add another.
    plugin.exchange()
    self.assertEqual(len(self.mstore.get_pending_messages()), 1)
256
def test_only_report_active_processes(self):
    """Test ensures the plugin only reports active processes."""
    # One sample process per state: (pid, state, start offset, name).
    samples = [
        (672, self.builder.DEAD, 10, "blarpy"),
        (673, self.builder.ZOMBIE, 12, "blarpitty"),
        (674, self.builder.RUNNING, 13, "blarpie"),
        (675, self.builder.STOPPED, 14, "blarping"),
        (676, self.builder.TRACING_STOP, 15, "floerp"),
        (677, self.builder.DISK_SLEEP, 18, "floerpidity"),
        (678, self.builder.SLEEPING, 21, "floerpiditting"),
    ]
    for pid, state, started, name in samples:
        self.builder.create_data(pid, state,
                                 uid=1000, gid=1000,
                                 started_after_boot=started,
                                 process_name=name)

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=10)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    messages = self.mstore.get_pending_messages()
    self.assertEqual(len(messages), 1)

    message = messages[0]
    self.assertIn("kill-all-processes", message)
    self.assertNotIn("kill-processes", message)
    self.assertIn("add-processes", message)

    # Sorted so the comparison does not depend on listing order.  Every
    # sample except the DEAD one (672) is expected.
    pids = sorted(process["pid"] for process in message["add-processes"])
    self.assertEqual(pids, [673, 674, 675, 676, 677, 678])
296
def test_report_interesting_state_changes(self):
    """Test ensures that interesting state changes are reported."""
    self.builder.create_data(672, self.builder.RUNNING,
                             uid=1000, gid=1000, started_after_boot=10,
                             process_name="blarpy")

    # Report a running process.
    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=10)
    self.monitor.add(plugin)
    plugin.exchange()

    messages = self.mstore.get_pending_messages()
    self.assertEqual(len(messages), 1)
    message = messages[0]

    self.assertIn("kill-all-processes", message)
    self.assertNotIn("kill-processes", message)
    self.assertIn("add-processes", message)
    self.assertEqual(message["add-processes"][0]["pid"], 672)
    self.assertEqual(message["add-processes"][0]["state"], u"R")

    # Convert the process to a zombie and ensure it gets reported.
    self.builder.remove_data(672)
    self.builder.create_data(672, self.builder.ZOMBIE,
                             uid=1000, gid=1000, started_after_boot=10,
                             process_name="blarpy")
    plugin.exchange()

    messages = self.mstore.get_pending_messages()
    self.assertEqual(len(messages), 2)
    message = messages[1]

    # The same pid with a new state arrives as an update, not as a
    # fresh full report.
    self.assertNotIn("kill-all-processes", message)
    self.assertIn("update-processes", message)
    self.assertEqual(message["update-processes"][0]["state"], u"Z")
335
def test_call_on_accepted(self):
    """
    L{MonitorPlugin}-based plugins can provide a callable to call
    when a message type becomes accepted.
    """
    # NOTE(review): the jiffies/boot_time arguments were cut off in this
    # copy; they are restored to match every other uptime=100 construction
    # in this file -- confirm.
    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Adding the plugin must not queue anything by itself.
    self.assertEqual(len(self.mstore.get_pending_messages()), 0)
    self.broker_service.reactor.fire(("message-type-acceptance-changed",
                                      "active-process-info"), True)
    # The acceptance event is what triggers the message.
    self.assertEqual(len(self.mstore.get_pending_messages()), 1)
348
def test_resynchronize_event(self):
    """
    When a C{resynchronize} event occurs we should clear the information
    held in memory by the activeprocess monitor.
    """
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=1030, process_name="init")
    self.builder.create_data(671, self.builder.STOPPED, uid=1000,
                             gid=1000, started_after_boot=1110,
                             process_name="blargh")
    self.builder.create_data(672, self.builder.TRACING_STOP,
                             uid=1000, gid=1000, started_after_boot=1120,
                             process_name="blarpy")

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)

    def pending_messages():
        # Fetch pending messages with each process list put in pid
        # order, so the comparison does not depend on listing order.
        pending = self.mstore.get_pending_messages()
        for pending_message in pending:
            pending_message.get("add-processes", []).sort(
                key=operator.itemgetter("pid"))
        return pending

    plugin.exchange()
    messages = pending_messages()

    # NOTE(review): the process entries were cut off in this copy of the
    # file.  They are rebuilt from test_read_sample_data, which uses the
    # same sample data and plugin arguments -- confirm.
    expected_messages = [{"add-processes": [
                              {"gid": 0,
                               "name": "init",
                               "pid": 1,
                               "start-time": 103,
                               "state": "R",
                               "uid": 0,
                               "vm-size": 11676,
                               "percent-cpu": 0.0},
                              {"gid": 1000,
                               "name": "blargh",
                               "pid": 671,
                               "start-time": 111,
                               "state": "T",
                               "uid": 1000,
                               "vm-size": 11676,
                               "percent-cpu": 0.0},
                              {"gid": 1000,
                               "name": "blarpy",
                               "pid": 672,
                               "start-time": 112,
                               "state": "I",
                               "uid": 1000,
                               "vm-size": 11676,
                               "percent-cpu": 0.0}],
                          "kill-all-processes": True,
                          "type": "active-process-info"}]

    self.assertMessages(messages, expected_messages)

    plugin.exchange()
    messages = pending_messages()
    # No new messages should be pending
    self.assertMessages(messages, expected_messages)

    self.reactor.fire("resynchronize")
    plugin.exchange()

    messages = pending_messages()
    # The resynchronisation should cause the same messages to be generated
    # again.
    expected_messages.extend(expected_messages)
    self.assertMessages(messages, expected_messages)
413
def test_do_not_persist_changes_when_send_message_fails(self):
    """
    When the plugin is run it persists data that it uses on
    subsequent checks to calculate the delta to send. It should
    only persist data when the broker confirms that the message
    sent by the plugin has been sent.
    """
    class MyException(Exception):
        pass

    self.log_helper.ignore_errors(MyException)

    self.builder.create_data(672, self.builder.RUNNING,
                             uid=1000, gid=1000, started_after_boot=10,
                             process_name="python")
    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=10)
    self.monitor.add(plugin)

    # Make every send_message call on the broker fail.
    broker_mock = self.mocker.replace(self.monitor.broker)
    broker_mock.send_message(ANY, urgent=ANY)
    self.mocker.result(fail(MyException()))
    # Leave record mode so the expectation above takes effect.
    self.mocker.replay()

    # What the plugin would send before any exchange has happened.
    message = plugin.get_message()

    def assert_message(message_id):
        # A failed send must leave the next message unchanged.
        self.assertEqual(message, plugin.get_message())

    result = plugin.exchange()
    result.addCallback(assert_message)
    # Hand the Deferred back so the test runner waits for the callback.
    return result
443
def test_process_updates(self):
444
"""Test updates to processes are successfully reported."""
# NOTE(review): this copy of the method is incomplete. The bare integers
# are leftover line numbers, and several original lines are absent, so
# the code below is kept verbatim rather than repaired by guesswork.
445
self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
446
started_after_boot=1100, process_name="init",)
448
plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
449
jiffies=10, boot_time=0)
450
self.monitor.add(plugin)
# NOTE(review): an exchange presumably ran here in the original; the
# assertion below expects one pending message -- confirm.
453
messages = self.mstore.get_pending_messages()
454
self.assertEquals(len(messages), 1)
456
# Replace the sample data for pid 1 before the second check.
self.builder.remove_data(1)
457
self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
458
started_after_boot=1100, process_name="init",
# NOTE(review): the call above is cut off; whichever argument differs
# from the first create_data call is not visible here.
462
messages = self.mstore.get_pending_messages()
463
self.assertEquals(len(messages), 2)
464
# The first expected message is a full report ("kill-all-processes" plus
# "add-processes"); the second carries "update-processes".
self.assertMessages(messages, [{"timestamp": 0,
466
"type": "active-process-info",
467
"kill-all-processes": True,
468
"add-processes": [{"start-time": 110,
# NOTE(review): the rest of this process entry is missing.
478
"type": "active-process-info",
479
"update-processes": [{"start-time": 110,
# NOTE(review): the rest of the expected update entry and the closing
# brackets of the assertMessages call are missing.
488
class PluginManagerIntegrationTest(LandscapeTest):
    """Tests of the process details L{ActiveProcessInfo} reports."""

    # NOTE(review): presumably LandscapeTest instantiates these helpers to
    # provide fixtures such as self.monitor and self.mstore -- confirm.
    helpers = [MonitorHelper, MakePathHelper]
493
LandscapeTest.setUp(self)
# NOTE(review): the "def setUp(self):" header that presumably precedes the
# line above is missing from this copy, and the bare integers are leftover
# line numbers.
494
# Directory that holds the sample process data for the tests.
# NOTE(review): make_dir() presumably comes from MakePathHelper -- confirm.
self.sample_dir = self.make_dir()
495
self.builder = ProcessDataBuilder(self.sample_dir)
496
# NOTE(review): this list is cut off; any further accepted message types
# and the closing brackets are not visible here.
self.mstore.set_accepted_types(["active-process-info",
499
def get_missing_pid(self):
    """Return the pid of a process that has already exited.

    A short-lived C{hostname} process is started and run to completion;
    its pid is returned once it is gone.
    """
    popen = subprocess.Popen(["hostname"], stdout=subprocess.PIPE)
    # communicate() drains and closes the pipe and waits for the exit.
    popen.communicate()
    return popen.pid
504
def get_active_process(self):
    """Start and return a child process that stays alive until fed input.

    The child blocks reading one line from its stdin, so it keeps running
    until the caller writes to or closes C{stdin}.

    @return: the L{subprocess.Popen} object, with stdin, stdout and
        stderr all connected to pipes.
    """
    import sys

    # Use the running interpreter instead of whatever "python" resolves
    # to, and a read that exists on both Python 2 and 3; raw_input() is
    # Python 2 only and makes a Python 3 child exit at once.
    interpreter = sys.executable or "python"
    return subprocess.Popen(
        [interpreter, "-c", "import sys; sys.stdin.readline()"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
510
def test_read_long_process_name(self):
    """Test reading a process with a long name."""
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=1030,
                             process_name="NetworkManagerDaemon")

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=2000,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertIn("add-processes", message)
    # The full name is expected, not a shortened form.
    expected_process = {"state": "R", "gid": 0, "pid": 1,
                        "vm-size": 11676, "name": "NetworkManagerDaemon",
                        "uid": 0, "start-time": 103, "percent-cpu": 0.0}
    self.assertEqual(message["add-processes"], [expected_process])
530
def test_strip_command_line_name_whitespace(self):
    """Whitespace should be stripped from command-line names."""
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=30,
                             process_name=" postgres: writer process ")
    # NOTE(review): the jiffies/boot_time arguments were cut off in this
    # copy; they are restored to match every other uptime=100 construction
    # in this file -- confirm.
    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["add-processes"][0]["name"],
                     u"postgres: writer process")
543
def test_read_process_with_no_cmdline(self):
    """Test reading a process without a cmdline file."""
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=1030,
                             process_name="ProcessWithLongName",
                             generate_cmd_line=False)

    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=100,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertIn("add-processes", message)
    # With no command line to read, the expected name is the shortened
    # "ProcessWithLong" rather than the full process name.
    expected_process = {"state": "R", "gid": 0, "pid": 1,
                        "vm-size": 11676, "name": "ProcessWithLong",
                        "uid": 0, "start-time": 103, "percent-cpu": 0.0}
    self.assertEqual(message["add-processes"], [expected_process])
564
def test_generate_cpu_usage(self):
    """
    Test that we can calculate the CPU usage from system information and
    the /proc/<pid>/stat file.
    """
    # In the stat layout, utime and stime (fields 14 and 15) are 20
    # jiffies each and starttime (field 22) is 3000 jiffies.
    stat_data = ("1 Process S 1 0 0 0 0 0 0 0 "
                 "0 0 20 20 0 0 0 0 0 0 3000 0 "
                 "0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0")

    # NOTE(review): the end of this call was cut off in this copy;
    # passing the string as stat_data= is inferred from the otherwise
    # unused local -- confirm the keyword name.
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=None,
                             process_name="Process",
                             generate_cmd_line=False,
                             stat_data=stat_data)
    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=400,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertIn("add-processes", message)
    # NOTE(review): the percent-cpu value was cut off in this copy. 4.00
    # is derived as 40 jiffies / 10 per second = 4s of CPU over the 100s
    # between start (300) and uptime (400) -- confirm.
    expected_process = {"state": "R", "gid": 0, "pid": 1,
                        "vm-size": 11676, "name": u"Process",
                        "uid": 0, "start-time": 300,
                        "percent-cpu": 4.00}
    self.assertEqual(message["add-processes"], [expected_process])
594
def test_generate_cpu_usage_capped(self):
    """
    Test that we can calculate the CPU usage from system information and
    the /proc/<pid>/stat file, the CPU usage should be capped at 99%.
    """
    # In the stat layout, utime and stime (fields 14 and 15) are 500
    # jiffies each and starttime (field 22) is 3000 jiffies.
    stat_data = ("1 Process S 1 0 0 0 0 0 0 0 "
                 "0 0 500 500 0 0 0 0 0 0 3000 0 "
                 "0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0")

    # NOTE(review): the end of this call was cut off in this copy;
    # passing the string as stat_data= is inferred from the otherwise
    # unused local -- confirm the keyword name.
    self.builder.create_data(1, self.builder.RUNNING, uid=0, gid=0,
                             started_after_boot=None,
                             process_name="Process",
                             generate_cmd_line=False,
                             stat_data=stat_data)
    plugin = ActiveProcessInfo(proc_dir=self.sample_dir, uptime=400,
                               jiffies=10, boot_time=0)
    self.monitor.add(plugin)
    # Nothing is queued until an exchange has run.
    plugin.exchange()

    message = self.mstore.get_pending_messages()[0]
    self.assertEqual(message["type"], "active-process-info")
    self.assertIn("kill-all-processes", message)
    self.assertIn("add-processes", message)
    # 1000 jiffies of CPU over a 100s lifetime would be 100%; the
    # reported value is expected to be capped at 99.
    expected_process = {"state": "R", "gid": 0, "pid": 1,
                        "vm-size": 11676, "name": u"Process",
                        "uid": 0, "start-time": 300,
                        "percent-cpu": 99.00}
    self.assertEqual(message["add-processes"], [expected_process])