~ubuntu-branches/ubuntu/oneiric/python-psutil/oneiric

« back to all changes in this revision

Viewing changes to test/_posix.py

  • Committer: Bazaar Package Importer
  • Author(s): Sandro Tosi
  • Date: 2011-04-04 20:26:42 UTC
  • mfrom: (2.1.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110404202642-u2fyar19eabqb2mn
Tags: 0.2.1-1
* New upstream release
* debian/copyright
  - extended packaging copyright years
* debian/rules
  - use the correct PYTHONPATH when running tests, for all supported versions
* debian/control
  - it now contains also extensions, so it's an arch:any package
  - move python-support from b-d-i to b-d
  - we now need python-all-dev in b-d
  - added procps to b-d, needed to run tests
* debian/{control, pyversion}
  - removed pyversion, replaced by XS-P-V field

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
2
 
 
3
 
import unittest
4
 
import subprocess
5
 
import time
6
 
import sys
7
 
import os
8
 
 
9
 
import psutil
10
 
 
11
 
from test_psutil import kill, PYTHON, DEVNULL
12
 
 
13
 
 
14
 
def ps(cmd):
15
 
    """Expects a ps command with a -o argument and parse the result
16
 
    returning only the value of interest.
17
 
    """
18
 
    if not sys.platform.lower().startswith("linux"):
19
 
        cmd = cmd.replace(" --no-headers ", " ")
20
 
    p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
21
 
    output = p.communicate()[0].strip()
22
 
    if sys.version_info >= (3,):
23
 
        output = str(output, sys.stdout.encoding)
24
 
    if not sys.platform.lower().startswith("linux"):
25
 
        output = output.split('\n')[1]
26
 
    try:
27
 
        return int(output)
28
 
    except ValueError:
29
 
        return output
30
 
 
31
 
 
32
 
class PosixSpecificTestCase(unittest.TestCase):
33
 
    """Compare psutil results against 'ps' command line utility."""
34
 
 
35
 
    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
36
 
 
37
 
    def setUp(self):
38
 
        self.pid = subprocess.Popen([PYTHON, "-E", "-O"], stdout=DEVNULL, stderr=DEVNULL).pid
39
 
 
40
 
    def tearDown(self):
41
 
        kill(self.pid)
42
 
 
43
 
    def test_process_parent_pid(self):
44
 
        ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
45
 
        ppid_psutil = psutil.Process(self.pid).ppid
46
 
        self.assertEqual(ppid_ps, ppid_psutil)
47
 
 
48
 
    def test_process_uid(self):
49
 
        uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
50
 
        uid_psutil = psutil.Process(self.pid).uid
51
 
        self.assertEqual(uid_ps, uid_psutil)
52
 
 
53
 
    def test_process_gid(self):
54
 
        gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
55
 
        gid_psutil = psutil.Process(self.pid).gid
56
 
        self.assertEqual(gid_ps, gid_psutil)
57
 
 
58
 
    def test_process_username(self):
59
 
        username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
60
 
        username_psutil = psutil.Process(self.pid).username
61
 
        self.assertEqual(username_ps, username_psutil)
62
 
 
63
 
    def test_process_rss_memory(self):
64
 
        # give python interpreter some time to properly initialize
65
 
        # so that the results are the same
66
 
        time.sleep(0.1)
67
 
        rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
68
 
        rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
69
 
        self.assertEqual(rss_ps, rss_psutil)
70
 
 
71
 
    def test_process_vsz_memory(self):
72
 
        # give python interpreter some time to properly initialize
73
 
        # so that the results are the same
74
 
        time.sleep(0.1)
75
 
        vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
76
 
        vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
77
 
        self.assertEqual(vsz_ps, vsz_psutil)
78
 
 
79
 
    def test_process_name(self):
80
 
        # use command + arg since "comm" keyword not supported on all platforms
81
 
        name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
82
 
        # remove path if there is any, from the command
83
 
        name_ps = os.path.basename(name_ps)
84
 
        name_psutil = psutil.Process(self.pid).name
85
 
        self.assertEqual(name_ps, name_psutil)
86
 
 
87
 
    def test_process_pathname(self):
88
 
        ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
89
 
        psutil_pathname = os.path.join(psutil.Process(self.pid).path,
90
 
                                       psutil.Process(self.pid).name)
91
 
        self.assertEqual(ps_pathname, psutil_pathname)
92
 
 
93
 
    def test_process_cmdline(self):
94
 
        ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
95
 
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
96
 
        self.assertEqual(ps_cmdline, psutil_cmdline)
97
 
 
98
 
    def test_get_pids(self):
99
 
        # Note: this test might fail if the OS is starting/killing
100
 
        # other processes in the meantime
101
 
        p = subprocess.Popen(["ps", "ax", "-o", "pid"], stdout=subprocess.PIPE)
102
 
        output = p.communicate()[0].strip()
103
 
        if sys.version_info >= (3,):
104
 
            output = str(output, sys.stdout.encoding)
105
 
        output = output.replace('PID', '')
106
 
        p.wait()
107
 
        pids_ps = []
108
 
        for pid in output.split('\n'):
109
 
            if pid:
110
 
                pids_ps.append(int(pid.strip()))
111
 
        # remove ps subprocess pid which is supposed to be dead in meantime
112
 
        pids_ps.remove(p.pid)
113
 
        # not all systems include pid 0 in their list but psutil does so
114
 
        # we force it
115
 
        if 0 not in pids_ps:
116
 
            pids_ps.append(0)
117
 
 
118
 
        pids_psutil = psutil.get_pid_list()
119
 
        pids_ps.sort()
120
 
        pids_psutil.sort()
121
 
 
122
 
        if pids_ps != pids_psutil:
123
 
            difference = filter(lambda x:x not in pids_ps, pids_psutil) + \
124
 
                         filter(lambda x:x not in pids_psutil, pids_ps)
125
 
            self.fail("difference: " + str(difference))
126
 
 
127
 
 
128
 
if __name__ == '__main__':
129
 
    test_suite = unittest.TestSuite()
130
 
    test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
131
 
    unittest.TextTestRunner(verbosity=2).run(test_suite)
132
 
 
133
 
 
 
1
#!/usr/bin/env python
 
2
#
 
3
# $Id: _posix.py 935 2011-02-22 22:19:33Z g.rodola $
 
4
#
 
5
 
 
6
import unittest
 
7
import subprocess
 
8
import time
 
9
import sys
 
10
import os
 
11
 
 
12
import psutil
 
13
 
 
14
from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX,
 
15
                         ignore_access_denied)
 
16
 
 
17
 
 
18
def ps(cmd):
 
19
    """Expects a ps command with a -o argument and parse the result
 
20
    returning only the value of interest.
 
21
    """
 
22
    if not LINUX:
 
23
        cmd = cmd.replace(" --no-headers ", " ")
 
24
    p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
 
25
    output = p.communicate()[0].strip()
 
26
    if sys.version_info >= (3,):
 
27
        output = str(output, sys.stdout.encoding)
 
28
    if not LINUX:
 
29
        output = output.split('\n')[1]
 
30
    try:
 
31
        return int(output)
 
32
    except ValueError:
 
33
        return output
 
34
 
 
35
 
 
36
class PosixSpecificTestCase(unittest.TestCase):
 
37
    """Compare psutil results against 'ps' command line utility."""
 
38
 
 
39
    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
 
40
 
 
41
    def setUp(self):
 
42
        self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid
 
43
 
 
44
    def tearDown(self):
 
45
        reap_children()
 
46
 
 
47
    def test_process_parent_pid(self):
 
48
        ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
 
49
        ppid_psutil = psutil.Process(self.pid).ppid
 
50
        self.assertEqual(ppid_ps, ppid_psutil)
 
51
 
 
52
    def test_process_uid(self):
 
53
        uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
 
54
        uid_psutil = psutil.Process(self.pid).uids.real
 
55
        self.assertEqual(uid_ps, uid_psutil)
 
56
 
 
57
    def test_process_gid(self):
 
58
        gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
 
59
        gid_psutil = psutil.Process(self.pid).gids.real
 
60
        self.assertEqual(gid_ps, gid_psutil)
 
61
 
 
62
    def test_process_username(self):
 
63
        username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
 
64
        username_psutil = psutil.Process(self.pid).username
 
65
        self.assertEqual(username_ps, username_psutil)
 
66
 
 
67
    @ignore_access_denied
 
68
    def test_process_rss_memory(self):
 
69
        # give python interpreter some time to properly initialize
 
70
        # so that the results are the same
 
71
        time.sleep(0.1)
 
72
        rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
 
73
        rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
 
74
        self.assertEqual(rss_ps, rss_psutil)
 
75
 
 
76
    @ignore_access_denied
 
77
    def test_process_vsz_memory(self):
 
78
        # give python interpreter some time to properly initialize
 
79
        # so that the results are the same
 
80
        time.sleep(0.1)
 
81
        vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
 
82
        vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
 
83
        self.assertEqual(vsz_ps, vsz_psutil)
 
84
 
 
85
    def test_process_name(self):
 
86
        # use command + arg since "comm" keyword not supported on all platforms
 
87
        name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
 
88
        # remove path if there is any, from the command
 
89
        name_ps = os.path.basename(name_ps)
 
90
        name_psutil = psutil.Process(self.pid).name
 
91
        if OSX:
 
92
            self.assertEqual(name_psutil, "Python")
 
93
        else:
 
94
            self.assertEqual(name_ps, name_psutil)
 
95
 
 
96
    def test_process_exe(self):
 
97
        ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
 
98
        psutil_pathname = psutil.Process(self.pid).exe
 
99
        try:
 
100
            self.assertEqual(ps_pathname, psutil_pathname)
 
101
        except AssertionError:
 
102
            # certain platforms such as BSD are more accurate returning:
 
103
            # "/usr/local/bin/python2.7"
 
104
            # ...instead of:
 
105
            # "/usr/local/bin/python"
 
106
            # We do not want to consider this difference in accuracy
 
107
            # an error.
 
108
            ps_extended_pathname = PYTHON + "%s.%s" % (sys.version_info.major,
 
109
                                                       sys.version_info.minor)
 
110
            self.assertEqual(ps_extended_pathname, psutil_pathname)
 
111
 
 
112
    def test_process_cmdline(self):
 
113
        ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
 
114
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
 
115
        self.assertEqual(ps_cmdline, psutil_cmdline)
 
116
 
 
117
    def test_get_pids(self):
 
118
        # Note: this test might fail if the OS is starting/killing
 
119
        # other processes in the meantime
 
120
        p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIPE)
 
121
        output = p.communicate()[0].strip()
 
122
        if sys.version_info >= (3,):
 
123
            output = str(output, sys.stdout.encoding)
 
124
        output = output.replace('PID', '')
 
125
        p.wait()
 
126
        pids_ps = []
 
127
        for pid in output.split('\n'):
 
128
            if pid:
 
129
                pids_ps.append(int(pid.strip()))
 
130
        # remove ps subprocess pid which is supposed to be dead in meantime
 
131
        pids_ps.remove(p.pid)
 
132
        # not all systems include pid 0 in their list but psutil does so
 
133
        # we force it
 
134
        if 0 not in pids_ps:
 
135
            pids_ps.append(0)
 
136
 
 
137
        pids_psutil = psutil.get_pid_list()
 
138
        pids_ps.sort()
 
139
        pids_psutil.sort()
 
140
 
 
141
        if pids_ps != pids_psutil:
 
142
            difference = filter(lambda x:x not in pids_ps, pids_psutil) + \
 
143
                         filter(lambda x:x not in pids_psutil, pids_ps)
 
144
            self.fail("difference: " + str(difference))
 
145
 
 
146
 
 
147
if __name__ == '__main__':
 
148
    test_suite = unittest.TestSuite()
 
149
    test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
 
150
    unittest.TextTestRunner(verbosity=2).run(test_suite)
 
151
 
 
152