1
by Tim Lunn
Import upstream version 24.2.0 |
1 |
#!/usr/bin/env python
|
2 |
||
3 |
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
|
4 |
# Use of this source code is governed by a BSD-style license that can be
|
|
5 |
# found in the LICENSE file.
|
|
6 |
||
7 |
"""BSD specific tests. These are implicitly run by test_psutil.py."""
|
|
8 |
||
9 |
import unittest |
|
10 |
import subprocess |
|
11 |
import time |
|
12 |
import re |
|
13 |
import sys |
|
14 |
import os |
|
15 |
||
16 |
import psutil |
|
17 |
||
18 |
from psutil._compat import PY3 |
|
19 |
from test_psutil import DEVNULL |
|
20 |
from test_psutil import (reap_children, get_test_subprocess, sh, which, |
|
21 |
skipUnless) |
|
22 |
||
23 |
||
24 |
PAGESIZE = os.sysconf("SC_PAGE_SIZE") |
|
25 |
TOLERANCE = 200 * 1024 # 200 KB |
|
26 |
MUSE_AVAILABLE = which('muse') |
|
27 |
||
28 |
||
29 |
def sysctl(cmdline): |
|
30 |
"""Expects a sysctl command with an argument and parse the result
|
|
31 |
returning only the value of interest.
|
|
32 |
"""
|
|
33 |
result = sh("sysctl " + cmdline) |
|
34 |
result = result[result.find(": ") + 2:] |
|
35 |
try: |
|
36 |
return int(result) |
|
37 |
except ValueError: |
|
38 |
return result |
|
39 |
||
40 |
def muse(field): |
|
41 |
"""Thin wrapper around 'muse' cmdline utility."""
|
|
42 |
out = sh('muse', stderr=DEVNULL) |
|
43 |
for line in out.split('\n'): |
|
44 |
if line.startswith(field): |
|
45 |
break
|
|
46 |
else: |
|
47 |
raise ValueError("line not found") |
|
48 |
return int(line.split()[1]) |
|
49 |
||
50 |
||
51 |
class BSDSpecificTestCase(unittest.TestCase): |
|
52 |
||
53 |
def setUp(self): |
|
54 |
self.pid = get_test_subprocess().pid |
|
55 |
||
56 |
def tearDown(self): |
|
57 |
reap_children() |
|
58 |
||
59 |
def assert_eq_w_tol(self, first, second, tolerance): |
|
60 |
difference = abs(first - second) |
|
61 |
if difference <= tolerance: |
|
62 |
return
|
|
63 |
msg = '%r != %r within %r delta (%r difference)' \ |
|
64 |
% (first, second, tolerance, difference) |
|
65 |
raise AssertionError(msg) |
|
66 |
||
67 |
def test_BOOT_TIME(self): |
|
68 |
s = sysctl('sysctl kern.boottime') |
|
69 |
s = s[s.find(" sec = ") + 7:] |
|
70 |
s = s[:s.find(',')] |
|
71 |
btime = int(s) |
|
72 |
self.assertEqual(btime, psutil.BOOT_TIME) |
|
73 |
||
74 |
def test_process_create_time(self): |
|
75 |
cmdline = "ps -o lstart -p %s" %self.pid |
|
76 |
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
|
77 |
output = p.communicate()[0] |
|
78 |
if PY3: |
|
79 |
output = str(output, sys.stdout.encoding) |
|
80 |
start_ps = output.replace('STARTED', '').strip() |
|
81 |
start_psutil = psutil.Process(self.pid).create_time |
|
82 |
start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", |
|
83 |
time.localtime(start_psutil)) |
|
84 |
self.assertEqual(start_ps, start_psutil) |
|
85 |
||
86 |
def test_disks(self): |
|
87 |
# test psutil.disk_usage() and psutil.disk_partitions()
|
|
88 |
# against "df -a"
|
|
89 |
def df(path): |
|
90 |
out = sh('df -k "%s"' % path).strip() |
|
91 |
lines = out.split('\n') |
|
92 |
lines.pop(0) |
|
93 |
line = lines.pop(0) |
|
94 |
dev, total, used, free = line.split()[:4] |
|
95 |
if dev == 'none': |
|
96 |
dev = '' |
|
97 |
total = int(total) * 1024 |
|
98 |
used = int(used) * 1024 |
|
99 |
free = int(free) * 1024 |
|
100 |
return dev, total, used, free |
|
101 |
||
102 |
for part in psutil.disk_partitions(all=False): |
|
103 |
usage = psutil.disk_usage(part.mountpoint) |
|
104 |
dev, total, used, free = df(part.mountpoint) |
|
105 |
self.assertEqual(part.device, dev) |
|
106 |
self.assertEqual(usage.total, total) |
|
107 |
# 10 MB tollerance
|
|
108 |
if abs(usage.free - free) > 10 * 1024 * 1024: |
|
109 |
self.fail("psutil=%s, df=%s" % (usage.free, free)) |
|
110 |
if abs(usage.used - used) > 10 * 1024 * 1024: |
|
111 |
self.fail("psutil=%s, df=%s" % (usage.used, used)) |
|
112 |
||
113 |
def test_memory_maps(self): |
|
114 |
out = sh('procstat -v %s' % self.pid) |
|
115 |
maps = psutil.Process(self.pid).get_memory_maps(grouped=False) |
|
116 |
lines = out.split('\n')[1:] |
|
117 |
while lines: |
|
118 |
line = lines.pop() |
|
119 |
fields = line.split() |
|
120 |
_, start, stop, perms, res = fields[:5] |
|
121 |
map = maps.pop() |
|
122 |
self.assertEqual("%s-%s" % (start, stop), map.addr) |
|
123 |
self.assertEqual(int(res), map.rss) |
|
124 |
if not map.path.startswith('['): |
|
125 |
self.assertEqual(fields[10], map.path) |
|
126 |
||
127 |
# --- virtual_memory(); tests against sysctl
|
|
128 |
||
129 |
def test_vmem_total(self): |
|
130 |
syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE |
|
131 |
self.assertEqual(psutil.virtual_memory().total, syst) |
|
132 |
||
133 |
def test_vmem_active(self): |
|
134 |
syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE |
|
135 |
self.assert_eq_w_tol(psutil.virtual_memory().active, syst, TOLERANCE) |
|
136 |
||
137 |
def test_vmem_inactive(self): |
|
138 |
syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE |
|
139 |
self.assert_eq_w_tol(psutil.virtual_memory().inactive, syst, TOLERANCE) |
|
140 |
||
141 |
def test_vmem_wired(self): |
|
142 |
syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE |
|
143 |
self.assert_eq_w_tol(psutil.virtual_memory().wired, syst, TOLERANCE) |
|
144 |
||
145 |
def test_vmem_cached(self): |
|
146 |
syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE |
|
147 |
self.assert_eq_w_tol(psutil.virtual_memory().cached, syst, TOLERANCE) |
|
148 |
||
149 |
def test_vmem_free(self): |
|
150 |
syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE |
|
151 |
self.assert_eq_w_tol(psutil.virtual_memory().free, syst, TOLERANCE) |
|
152 |
||
153 |
def test_vmem_buffers(self): |
|
154 |
syst = sysctl("vfs.bufspace") |
|
155 |
self.assert_eq_w_tol(psutil.virtual_memory().buffers, syst, TOLERANCE) |
|
156 |
||
157 |
# --- virtual_memory(); tests against muse
|
|
158 |
||
159 |
@skipUnless(MUSE_AVAILABLE) |
|
160 |
def test_total(self): |
|
161 |
num = muse('Total') |
|
162 |
self.assertEqual(psutil.virtual_memory().total, num) |
|
163 |
||
164 |
@skipUnless(MUSE_AVAILABLE) |
|
165 |
def test_active(self): |
|
166 |
num = muse('Active') |
|
167 |
self.assert_eq_w_tol(psutil.virtual_memory().active, num, TOLERANCE) |
|
168 |
||
169 |
@skipUnless(MUSE_AVAILABLE) |
|
170 |
def test_inactive(self): |
|
171 |
num = muse('Inactive') |
|
172 |
self.assert_eq_w_tol(psutil.virtual_memory().inactive, num, TOLERANCE) |
|
173 |
||
174 |
@skipUnless(MUSE_AVAILABLE) |
|
175 |
def test_wired(self): |
|
176 |
num = muse('Wired') |
|
177 |
self.assert_eq_w_tol(psutil.virtual_memory().wired, num, TOLERANCE) |
|
178 |
||
179 |
@skipUnless(MUSE_AVAILABLE) |
|
180 |
def test_cached(self): |
|
181 |
num = muse('Cache') |
|
182 |
self.assert_eq_w_tol(psutil.virtual_memory().cached, num, TOLERANCE) |
|
183 |
||
184 |
@skipUnless(MUSE_AVAILABLE) |
|
185 |
def test_free(self): |
|
186 |
num = muse('Free') |
|
187 |
self.assert_eq_w_tol(psutil.virtual_memory().free, num, TOLERANCE) |
|
188 |
||
189 |
@skipUnless(MUSE_AVAILABLE) |
|
190 |
def test_buffers(self): |
|
191 |
num = muse('Buffer') |
|
192 |
self.assert_eq_w_tol(psutil.virtual_memory().buffers, num, TOLERANCE) |
|
193 |
||
194 |
||
195 |
if __name__ == '__main__': |
|
196 |
test_suite = unittest.TestSuite() |
|
197 |
test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) |
|
198 |
unittest.TextTestRunner(verbosity=2).run(test_suite) |