~ubuntu-branches/ubuntu/trusty/mozjs24/trusty-proposed

« back to all changes in this revision

Viewing changes to js/src/python/psutil/test/_bsd.py

  • Committer: Package Import Robot
  • Author(s): Tim Lunn
  • Date: 2014-02-11 21:55:34 UTC
  • Revision ID: package-import@ubuntu.com-20140211215534-m1zyq5aj59md3y07
Tags: upstream-24.2.0
ImportĀ upstreamĀ versionĀ 24.2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
 
4
# Use of this source code is governed by a BSD-style license that can be
 
5
# found in the LICENSE file.
 
6
 
 
7
"""BSD specific tests.  These are implicitly run by test_psutil.py."""
 
8
 
 
9
import unittest
 
10
import subprocess
 
11
import time
 
12
import re
 
13
import sys
 
14
import os
 
15
 
 
16
import psutil
 
17
 
 
18
from psutil._compat import PY3
 
19
from test_psutil import DEVNULL
 
20
from test_psutil import (reap_children, get_test_subprocess, sh, which,
 
21
                         skipUnless)
 
22
 
 
23
 
 
24
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
 
25
TOLERANCE = 200 * 1024  # 200 KB
 
26
MUSE_AVAILABLE = which('muse')
 
27
 
 
28
 
 
29
def sysctl(cmdline):
 
30
    """Expects a sysctl command with an argument and parse the result
 
31
    returning only the value of interest.
 
32
    """
 
33
    result = sh("sysctl " + cmdline)
 
34
    result = result[result.find(": ") + 2:]
 
35
    try:
 
36
        return int(result)
 
37
    except ValueError:
 
38
        return result
 
39
 
 
40
def muse(field):
 
41
    """Thin wrapper around 'muse' cmdline utility."""
 
42
    out = sh('muse', stderr=DEVNULL)
 
43
    for line in out.split('\n'):
 
44
        if line.startswith(field):
 
45
            break
 
46
    else:
 
47
        raise ValueError("line not found")
 
48
    return int(line.split()[1])
 
49
 
 
50
 
 
51
class BSDSpecificTestCase(unittest.TestCase):
 
52
 
 
53
    def setUp(self):
 
54
        self.pid = get_test_subprocess().pid
 
55
 
 
56
    def tearDown(self):
 
57
        reap_children()
 
58
 
 
59
    def assert_eq_w_tol(self, first, second, tolerance):
 
60
        difference = abs(first - second)
 
61
        if difference <= tolerance:
 
62
            return
 
63
        msg = '%r != %r within %r delta (%r difference)' \
 
64
              % (first, second, tolerance, difference)
 
65
        raise AssertionError(msg)
 
66
 
 
67
    def test_BOOT_TIME(self):
 
68
        s = sysctl('sysctl kern.boottime')
 
69
        s = s[s.find(" sec = ") + 7:]
 
70
        s = s[:s.find(',')]
 
71
        btime = int(s)
 
72
        self.assertEqual(btime, psutil.BOOT_TIME)
 
73
 
 
74
    def test_process_create_time(self):
 
75
        cmdline = "ps -o lstart -p %s" %self.pid
 
76
        p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
 
77
        output = p.communicate()[0]
 
78
        if PY3:
 
79
            output = str(output, sys.stdout.encoding)
 
80
        start_ps = output.replace('STARTED', '').strip()
 
81
        start_psutil = psutil.Process(self.pid).create_time
 
82
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
 
83
                                     time.localtime(start_psutil))
 
84
        self.assertEqual(start_ps, start_psutil)
 
85
 
 
86
    def test_disks(self):
 
87
        # test psutil.disk_usage() and psutil.disk_partitions()
 
88
        # against "df -a"
 
89
        def df(path):
 
90
            out = sh('df -k "%s"' % path).strip()
 
91
            lines = out.split('\n')
 
92
            lines.pop(0)
 
93
            line = lines.pop(0)
 
94
            dev, total, used, free = line.split()[:4]
 
95
            if dev == 'none':
 
96
                dev = ''
 
97
            total = int(total) * 1024
 
98
            used = int(used) * 1024
 
99
            free = int(free) * 1024
 
100
            return dev, total, used, free
 
101
 
 
102
        for part in psutil.disk_partitions(all=False):
 
103
            usage = psutil.disk_usage(part.mountpoint)
 
104
            dev, total, used, free = df(part.mountpoint)
 
105
            self.assertEqual(part.device, dev)
 
106
            self.assertEqual(usage.total, total)
 
107
            # 10 MB tollerance
 
108
            if abs(usage.free - free) > 10 * 1024 * 1024:
 
109
                self.fail("psutil=%s, df=%s" % (usage.free, free))
 
110
            if abs(usage.used - used) > 10 * 1024 * 1024:
 
111
                self.fail("psutil=%s, df=%s" % (usage.used, used))
 
112
 
 
113
    def test_memory_maps(self):
 
114
        out = sh('procstat -v %s' % self.pid)
 
115
        maps = psutil.Process(self.pid).get_memory_maps(grouped=False)
 
116
        lines = out.split('\n')[1:]
 
117
        while lines:
 
118
            line = lines.pop()
 
119
            fields = line.split()
 
120
            _, start, stop, perms, res = fields[:5]
 
121
            map = maps.pop()
 
122
            self.assertEqual("%s-%s" % (start, stop), map.addr)
 
123
            self.assertEqual(int(res), map.rss)
 
124
            if not map.path.startswith('['):
 
125
                self.assertEqual(fields[10], map.path)
 
126
 
 
127
    # --- virtual_memory(); tests against sysctl
 
128
 
 
129
    def test_vmem_total(self):
 
130
        syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE
 
131
        self.assertEqual(psutil.virtual_memory().total, syst)
 
132
 
 
133
    def test_vmem_active(self):
 
134
        syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
 
135
        self.assert_eq_w_tol(psutil.virtual_memory().active, syst, TOLERANCE)
 
136
 
 
137
    def test_vmem_inactive(self):
 
138
        syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
 
139
        self.assert_eq_w_tol(psutil.virtual_memory().inactive, syst, TOLERANCE)
 
140
 
 
141
    def test_vmem_wired(self):
 
142
        syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
 
143
        self.assert_eq_w_tol(psutil.virtual_memory().wired, syst, TOLERANCE)
 
144
 
 
145
    def test_vmem_cached(self):
 
146
        syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
 
147
        self.assert_eq_w_tol(psutil.virtual_memory().cached, syst, TOLERANCE)
 
148
 
 
149
    def test_vmem_free(self):
 
150
        syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
 
151
        self.assert_eq_w_tol(psutil.virtual_memory().free, syst, TOLERANCE)
 
152
 
 
153
    def test_vmem_buffers(self):
 
154
        syst = sysctl("vfs.bufspace")
 
155
        self.assert_eq_w_tol(psutil.virtual_memory().buffers, syst, TOLERANCE)
 
156
 
 
157
    # --- virtual_memory(); tests against muse
 
158
 
 
159
    @skipUnless(MUSE_AVAILABLE)
 
160
    def test_total(self):
 
161
        num = muse('Total')
 
162
        self.assertEqual(psutil.virtual_memory().total, num)
 
163
 
 
164
    @skipUnless(MUSE_AVAILABLE)
 
165
    def test_active(self):
 
166
        num = muse('Active')
 
167
        self.assert_eq_w_tol(psutil.virtual_memory().active, num, TOLERANCE)
 
168
 
 
169
    @skipUnless(MUSE_AVAILABLE)
 
170
    def test_inactive(self):
 
171
        num = muse('Inactive')
 
172
        self.assert_eq_w_tol(psutil.virtual_memory().inactive, num, TOLERANCE)
 
173
 
 
174
    @skipUnless(MUSE_AVAILABLE)
 
175
    def test_wired(self):
 
176
        num = muse('Wired')
 
177
        self.assert_eq_w_tol(psutil.virtual_memory().wired, num, TOLERANCE)
 
178
 
 
179
    @skipUnless(MUSE_AVAILABLE)
 
180
    def test_cached(self):
 
181
        num = muse('Cache')
 
182
        self.assert_eq_w_tol(psutil.virtual_memory().cached, num, TOLERANCE)
 
183
 
 
184
    @skipUnless(MUSE_AVAILABLE)
 
185
    def test_free(self):
 
186
        num = muse('Free')
 
187
        self.assert_eq_w_tol(psutil.virtual_memory().free, num, TOLERANCE)
 
188
 
 
189
    @skipUnless(MUSE_AVAILABLE)
 
190
    def test_buffers(self):
 
191
        num = muse('Buffer')
 
192
        self.assert_eq_w_tol(psutil.virtual_memory().buffers, num, TOLERANCE)
 
193
 
 
194
 
 
195
if __name__ == '__main__':
 
196
    test_suite = unittest.TestSuite()
 
197
    test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase))
 
198
    unittest.TextTestRunner(verbosity=2).run(test_suite)