Bug 1196253 - update in-tree psutil to 3.1.1. r=gps

This commit is contained in:
Joel Maher
2015-08-20 08:03:31 -04:00
parent e27f1455ab
commit 6fabf5a7a3
73 changed files with 6544 additions and 3863 deletions

View File

@@ -1,15 +0,0 @@
- The recommended way to run tests (also on Windows) is to cd into parent
directory and run:
make test
- If you're on Python < 2.7 unittest2 module must be installed first:
https://pypi.python.org/pypi/unittest2
- The main test script is test_psutil.py, which also imports platform-specific
_*.py scripts (which should be ignored).
- test_memory_leaks.py looks for memory leaks into C extension modules and must
be run separately with:
make memtest

View File

@@ -0,0 +1,21 @@
- The recommended way to run tests (also on Windows) is to cd into parent
directory and run ``make test``
- Dependencies for running tests:
- python 2.6: ipaddress, mock, unittest2
- python 2.7: ipaddress, mock
- python 3.2: ipaddress, mock
- python 3.3: ipaddress
- python >= 3.4: no deps required
- The main test script is ``test_psutil.py``, which also imports platform-specific
``_*.py`` scripts (which should be ignored).
- ``test_memory_leaks.py`` looks for memory leaks into C extension modules and must
be run separately with ``make test-memleaks``.
- To run tests on all supported Python version install tox (pip install tox)
then run ``tox``.
- Every time a commit is pushed tests are automatically run on Travis:
https://travis-ci.org/giampaolo/psutil/

View File

@@ -8,15 +8,15 @@
"""BSD specific tests. These are implicitly run by test_psutil.py."""
import subprocess
import time
import sys
import os
import subprocess
import sys
import time
import psutil
from psutil._compat import PY3
from test_psutil import (TOLERANCE, sh, get_test_subprocess, which,
from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which,
retry_before_failing, reap_children, unittest)
@@ -50,6 +50,7 @@ def muse(field):
return int(line.split()[1])
@unittest.skipUnless(BSD, "not a BSD system")
class BSDSpecificTestCase(unittest.TestCase):
@classmethod
@@ -106,6 +107,7 @@ class BSDSpecificTestCase(unittest.TestCase):
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.used, used))
@retry_before_failing()
def test_memory_maps(self):
out = sh('procstat -v %s' % self.pid)
maps = psutil.Process(self.pid).memory_maps(grouped=False)
@@ -120,6 +122,29 @@ class BSDSpecificTestCase(unittest.TestCase):
if not map.path.startswith('['):
self.assertEqual(fields[10], map.path)
def test_exe(self):
out = sh('procstat -b %s' % self.pid)
self.assertEqual(psutil.Process(self.pid).exe(),
out.split('\n')[1].split()[-1])
def test_cmdline(self):
out = sh('procstat -c %s' % self.pid)
self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
' '.join(out.split('\n')[1].split()[2:]))
def test_uids_gids(self):
out = sh('procstat -s %s' % self.pid)
euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
p = psutil.Process(self.pid)
uids = p.uids()
gids = p.gids()
self.assertEqual(uids.real, int(ruid))
self.assertEqual(uids.effective, int(euid))
self.assertEqual(uids.saved, int(suid))
self.assertEqual(gids.real, int(rgid))
self.assertEqual(gids.effective, int(egid))
self.assertEqual(gids.saved, int(sgid))
# --- virtual_memory(); tests against sysctl
def test_vmem_total(self):
@@ -162,6 +187,10 @@ class BSDSpecificTestCase(unittest.TestCase):
self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
delta=TOLERANCE)
def test_cpu_count_logical(self):
syst = sysctl("hw.ncpu")
self.assertEqual(psutil.cpu_count(logical=True), syst)
# --- virtual_memory(); tests against muse
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
@@ -212,12 +241,12 @@ class BSDSpecificTestCase(unittest.TestCase):
delta=TOLERANCE)
def test_main():
def main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase))
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
if not main():
sys.exit(1)

View File

@@ -7,18 +7,70 @@
"""Linux specific tests. These are implicitly run by test_psutil.py."""
from __future__ import division
import contextlib
import errno
import fcntl
import io
import os
import pprint
import re
import socket
import struct
import sys
import tempfile
import time
import warnings
from test_psutil import POSIX, TOLERANCE, TRAVIS
try:
from unittest import mock # py3
except ImportError:
import mock # requires "pip install mock"
from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX
from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess,
retry_before_failing, get_kernel_version, unittest)
retry_before_failing, get_kernel_version, unittest,
which, call_until)
import psutil
import psutil._pslinux
from psutil._compat import PY3, u
SIOCGIFADDR = 0x8915
SIOCGIFCONF = 0x8912
SIOCGIFHWADDR = 0x8927
def get_ipv4_address(ifname):
ifname = ifname[:15]
if PY3:
ifname = bytes(ifname, 'ascii')
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with contextlib.closing(s):
return socket.inet_ntoa(
fcntl.ioctl(s.fileno(),
SIOCGIFADDR,
struct.pack('256s', ifname))[20:24])
def get_mac_address(ifname):
ifname = ifname[:15]
if PY3:
ifname = bytes(ifname, 'ascii')
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with contextlib.closing(s):
info = fcntl.ioctl(
s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
if PY3:
def ord(x):
return x
else:
import __builtin__
ord = __builtin__.ord
return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
@unittest.skipUnless(LINUX, "not a Linux system")
class LinuxSpecificTestCase(unittest.TestCase):
@unittest.skipIf(
@@ -139,6 +191,241 @@ class LinuxSpecificTestCase(unittest.TestCase):
else:
self.assertNotIn('guest_nice', fields)
def test_net_if_addrs_ips(self):
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == psutil.AF_LINK:
self.assertEqual(addr.address, get_mac_address(name))
elif addr.family == socket.AF_INET:
self.assertEqual(addr.address, get_ipv4_address(name))
# TODO: test for AF_INET6 family
@unittest.skipUnless(which('ip'), "'ip' utility not available")
@unittest.skipIf(TRAVIS, "skipped on Travis")
def test_net_if_names(self):
out = sh("ip addr").strip()
nics = psutil.net_if_addrs()
found = 0
for line in out.split('\n'):
line = line.strip()
if re.search("^\d+:", line):
found += 1
name = line.split(':')[1].strip()
self.assertIn(name, nics.keys())
self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
pprint.pformat(nics), out))
@unittest.skipUnless(which("nproc"), "nproc utility not available")
def test_cpu_count_logical_w_nproc(self):
num = int(sh("nproc --all"))
self.assertEqual(psutil.cpu_count(logical=True), num)
@unittest.skipUnless(which("lscpu"), "lscpu utility not available")
def test_cpu_count_logical_w_lscpu(self):
out = sh("lscpu -p")
num = len([x for x in out.split('\n') if not x.startswith('#')])
self.assertEqual(psutil.cpu_count(logical=True), num)
# --- mocked tests
def test_virtual_memory_mocked_warnings(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
with warnings.catch_warnings(record=True) as ws:
warnings.simplefilter("always")
ret = psutil._pslinux.virtual_memory()
assert m.called
self.assertEqual(len(ws), 1)
w = ws[0]
self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
self.assertIn(
"'cached', 'active' and 'inactive' memory stats couldn't "
"be determined", str(w.message))
self.assertEqual(ret.cached, 0)
self.assertEqual(ret.active, 0)
self.assertEqual(ret.inactive, 0)
def test_swap_memory_mocked_warnings(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
with warnings.catch_warnings(record=True) as ws:
warnings.simplefilter("always")
ret = psutil._pslinux.swap_memory()
assert m.called
self.assertEqual(len(ws), 1)
w = ws[0]
self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
self.assertIn(
"'sin' and 'sout' swap memory stats couldn't "
"be determined", str(w.message))
self.assertEqual(ret.sin, 0)
self.assertEqual(ret.sout, 0)
def test_cpu_count_logical_mocked(self):
import psutil._pslinux
original = psutil._pslinux.cpu_count_logical()
# Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
# order to cause the parsing of /proc/cpuinfo and /proc/stat.
with mock.patch(
'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
assert m.called
# Let's have open() return emtpy data and make sure None is
# returned ('cause we mimick os.cpu_count()).
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertIsNone(psutil._pslinux.cpu_count_logical())
self.assertEqual(m.call_count, 2)
# /proc/stat should be the last one
self.assertEqual(m.call_args[0][0], '/proc/stat')
# Let's push this a bit further and make sure /proc/cpuinfo
# parsing works as expected.
with open('/proc/cpuinfo', 'rb') as f:
cpuinfo_data = f.read()
fake_file = io.BytesIO(cpuinfo_data)
with mock.patch('psutil._pslinux.open',
return_value=fake_file, create=True) as m:
self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
def test_cpu_count_physical_mocked(self):
# Have open() return emtpy data and make sure None is returned
# ('cause we want to mimick os.cpu_count())
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertIsNone(psutil._pslinux.cpu_count_physical())
assert m.called
def test_proc_open_files_file_gone(self):
# simulates a file which gets deleted during open_files()
# execution
p = psutil.Process()
files = p.open_files()
with tempfile.NamedTemporaryFile():
# give the kernel some time to see the new file
call_until(p.open_files, "len(ret) != %i" % len(files))
with mock.patch('psutil._pslinux.os.readlink',
side_effect=OSError(errno.ENOENT, "")) as m:
files = p.open_files()
assert not files
assert m.called
# also simulate the case where os.readlink() returns EINVAL
# in which case psutil is supposed to 'continue'
with mock.patch('psutil._pslinux.os.readlink',
side_effect=OSError(errno.EINVAL, "")) as m:
self.assertEqual(p.open_files(), [])
assert m.called
def test_proc_terminal_mocked(self):
with mock.patch('psutil._pslinux._psposix._get_terminal_map',
return_value={}) as m:
self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
assert m.called
def test_proc_num_ctx_switches_mocked(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertRaises(
NotImplementedError,
psutil._pslinux.Process(os.getpid()).num_ctx_switches)
assert m.called
def test_proc_num_threads_mocked(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertRaises(
NotImplementedError,
psutil._pslinux.Process(os.getpid()).num_threads)
assert m.called
def test_proc_ppid_mocked(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertRaises(
NotImplementedError,
psutil._pslinux.Process(os.getpid()).ppid)
assert m.called
def test_proc_uids_mocked(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertRaises(
NotImplementedError,
psutil._pslinux.Process(os.getpid()).uids)
assert m.called
def test_proc_gids_mocked(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertRaises(
NotImplementedError,
psutil._pslinux.Process(os.getpid()).gids)
assert m.called
def test_proc_cmdline_mocked(self):
# see: https://github.com/giampaolo/psutil/issues/639
p = psutil.Process()
fake_file = io.StringIO(u('foo\x00bar\x00'))
with mock.patch('psutil._pslinux.open',
return_value=fake_file, create=True) as m:
p.cmdline() == ['foo', 'bar']
assert m.called
fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
with mock.patch('psutil._pslinux.open',
return_value=fake_file, create=True) as m:
p.cmdline() == ['foo', 'bar', '']
assert m.called
def test_proc_io_counters_mocked(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertRaises(
NotImplementedError,
psutil._pslinux.Process(os.getpid()).io_counters)
assert m.called
def test_boot_time_mocked(self):
with mock.patch('psutil._pslinux.open', create=True) as m:
self.assertRaises(
RuntimeError,
psutil._pslinux.boot_time)
assert m.called
def test_users_mocked(self):
# Make sure ':0' and ':0.0' (returned by C ext) are converted
# to 'localhost'.
with mock.patch('psutil._pslinux.cext.users',
return_value=[('giampaolo', 'pts/2', ':0',
1436573184.0, True)]) as m:
self.assertEqual(psutil.users()[0].host, 'localhost')
assert m.called
with mock.patch('psutil._pslinux.cext.users',
return_value=[('giampaolo', 'pts/2', ':0.0',
1436573184.0, True)]) as m:
self.assertEqual(psutil.users()[0].host, 'localhost')
assert m.called
# ...otherwise it should be returned as-is
with mock.patch('psutil._pslinux.cext.users',
return_value=[('giampaolo', 'pts/2', 'foo',
1436573184.0, True)]) as m:
self.assertEqual(psutil.users()[0].host, 'foo')
assert m.called
def test_disk_partitions_mocked(self):
# Test that ZFS partitions are returned.
with open("/proc/filesystems", "r") as f:
data = f.read()
if 'zfs' in data:
for part in psutil.disk_partitions():
if part.fstype == 'zfs':
break
else:
self.fail("couldn't find any ZFS partition")
else:
# No ZFS partitions on this system. Let's fake one.
fake_file = io.StringIO(u("nodev\tzfs\n"))
with mock.patch('psutil._pslinux.open',
return_value=fake_file, create=True) as m1:
with mock.patch(
'psutil._pslinux.cext.disk_partitions',
return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
ret = psutil.disk_partitions()
assert m1.called
assert m2.called
assert ret
self.assertEqual(ret[0].fstype, 'zfs')
# --- tests for specific kernel versions
@unittest.skipUnless(
@@ -175,12 +462,12 @@ class LinuxSpecificTestCase(unittest.TestCase):
self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING"))
def test_main():
def main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase))
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
if not main():
sys.exit(1)

View File

@@ -15,8 +15,8 @@ import time
import psutil
from psutil._compat import PY3
from test_psutil import (TOLERANCE, sh, get_test_subprocess, reap_children,
retry_before_failing, unittest)
from test_psutil import (TOLERANCE, OSX, sh, get_test_subprocess,
reap_children, retry_before_failing, unittest)
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
@@ -47,6 +47,7 @@ def vm_stat(field):
return int(re.search('\d+', line).group(0)) * PAGESIZE
@unittest.skipUnless(OSX, "not an OSX system")
class OSXSpecificTestCase(unittest.TestCase):
@classmethod
@@ -148,12 +149,12 @@ class OSXSpecificTestCase(unittest.TestCase):
self.assertEqual(tot1, tot2)
def test_main():
def main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase))
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
if not main():
sys.exit(1)

View File

@@ -14,8 +14,8 @@ import time
import psutil
from psutil._compat import PY3
from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON
from psutil._compat import PY3, callable
from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS
from test_psutil import (get_test_subprocess, skip_on_access_denied,
retry_before_failing, reap_children, sh, unittest,
get_kernel_version, wait_for_pid)
@@ -42,6 +42,7 @@ def ps(cmd):
return output
@unittest.skipUnless(POSIX, "not a POSIX system")
class PosixSpecificTestCase(unittest.TestCase):
"""Compare psutil results against 'ps' command line utility."""
@@ -111,11 +112,14 @@ class PosixSpecificTestCase(unittest.TestCase):
def test_process_create_time(self):
time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
time_psutil = psutil.Process(self.pid).create_time()
if SUNOS:
time_psutil = round(time_psutil)
time_psutil_tstamp = datetime.datetime.fromtimestamp(
time_psutil).strftime("%H:%M:%S")
self.assertEqual(time_ps, time_psutil_tstamp)
# sometimes ps shows the time rounded up instead of down, so we check
# for both possible values
round_time_psutil = round(time_psutil)
round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
round_time_psutil).strftime("%H:%M:%S")
self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
def test_process_exe(self):
ps_pathname = ps("ps --no-headers -o command -p %s" %
@@ -173,9 +177,10 @@ class PosixSpecificTestCase(unittest.TestCase):
[x for x in pids_ps if x not in pids_psutil]
self.fail("difference: " + str(difference))
# for some reason ifconfig -a does not report differente interfaces
# psutil does
# for some reason ifconfig -a does not report all interfaces
# returned by psutil
@unittest.skipIf(SUNOS, "test not reliable on SUNOS")
@unittest.skipIf(TRAVIS, "test not reliable on Travis")
def test_nic_names(self):
p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
@@ -186,7 +191,9 @@ class PosixSpecificTestCase(unittest.TestCase):
if line.startswith(nic):
break
else:
self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic)
self.fail(
"couldn't find %s nic in 'ifconfig -a' output\n%s" % (
nic, output))
@retry_before_failing()
def test_users(self):
@@ -208,8 +215,6 @@ class PosixSpecificTestCase(unittest.TestCase):
if attr is not None and callable(attr):
if name == 'rlimit':
args = (psutil.RLIMIT_NOFILE,)
elif name == 'set_rlimit':
args = (psutil.RLIMIT_NOFILE, (5, 5))
attr(*args)
else:
attr
@@ -220,11 +225,10 @@ class PosixSpecificTestCase(unittest.TestCase):
'send_signal', 'wait', 'children', 'as_dict']
if LINUX and get_kernel_version() < (2, 6, 36):
ignored_names.append('rlimit')
if LINUX and get_kernel_version() < (2, 6, 23):
ignored_names.append('num_ctx_switches')
for name in dir(psutil.Process):
if (name.startswith('_')
or name.startswith('set_')
or name.startswith('get') # deprecated APIs
or name in ignored_names):
if (name.startswith('_') or name in ignored_names):
continue
else:
try:
@@ -243,12 +247,12 @@ class PosixSpecificTestCase(unittest.TestCase):
self.fail('\n' + '\n'.join(failures))
def test_main():
def main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
if not main():
sys.exit(1)

View File

@@ -7,15 +7,17 @@
"""Sun OS specific tests. These are implicitly run by test_psutil.py."""
import sys
import os
from test_psutil import sh, unittest
from test_psutil import SUNOS, sh, unittest
import psutil
@unittest.skipUnless(SUNOS, "not a SunOS system")
class SunOSSpecificTestCase(unittest.TestCase):
def test_swap_memory(self):
out = sh('swap -l -k')
out = sh('env PATH=/usr/sbin:/sbin:%s swap -l -k' % os.environ['PATH'])
lines = out.strip().split('\n')[1:]
if not lines:
raise ValueError('no swap device(s) configured')
@@ -35,12 +37,12 @@ class SunOSSpecificTestCase(unittest.TestCase):
self.assertEqual(psutil_swap.free, free)
def test_main():
def main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(SunOSSpecificTestCase))
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
if not main():
sys.exit(1)

View File

@@ -15,8 +15,10 @@ import sys
import time
import traceback
from test_psutil import (get_test_subprocess, reap_children, unittest)
from test_psutil import APPVEYOR, WINDOWS
from test_psutil import get_test_subprocess, reap_children, unittest
import mock
try:
import wmi
except ImportError:
@@ -28,17 +30,18 @@ except ImportError:
win32api = win32con = None
from psutil._compat import PY3, callable, long
from psutil._pswindows import ACCESS_DENIED_SET
import _psutil_windows
import psutil
cext = psutil._psplatform.cext
def wrap_exceptions(fun):
def wrapper(self, *args, **kwargs):
try:
return fun(self, *args, **kwargs)
except OSError:
err = sys.exc_info()[1]
except OSError as err:
from psutil._pswindows import ACCESS_DENIED_SET
if err.errno in ACCESS_DENIED_SET:
raise psutil.AccessDenied(None, None)
if err.errno == errno.ESRCH:
@@ -47,6 +50,7 @@ def wrap_exceptions(fun):
return wrapper
@unittest.skipUnless(WINDOWS, "not a Windows system")
class WindowsSpecificTestCase(unittest.TestCase):
@classmethod
@@ -114,7 +118,9 @@ class WindowsSpecificTestCase(unittest.TestCase):
def test_process_exe(self):
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
self.assertEqual(p.exe(), w.ExecutablePath)
# Note: wmi reports the exe as a lower case string.
# Being Windows paths case-insensitive we ignore that.
self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
@unittest.skipIf(wmi is None, "wmi module is not installed")
def test_process_cmdline(self):
@@ -164,7 +170,7 @@ class WindowsSpecificTestCase(unittest.TestCase):
# --- psutil namespace functions and constants tests
@unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'),
@unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
'NUMBER_OF_PROCESSORS env var is not available')
def test_cpu_count(self):
num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
@@ -188,20 +194,16 @@ class WindowsSpecificTestCase(unittest.TestCase):
# time.localtime(p.create_time()))
#
# Note: this test is not very reliable
@unittest.skipIf(wmi is None, "wmi module is not installed")
@unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
w = wmi.WMI().Win32_Process()
wmi_pids = [x.ProcessId for x in w]
wmi_pids.sort()
psutil_pids = psutil.pids()
psutil_pids.sort()
if wmi_pids != psutil_pids:
difference = \
filter(lambda x: x not in wmi_pids, psutil_pids) + \
filter(lambda x: x not in psutil_pids, wmi_pids)
self.fail("difference: " + str(difference))
wmi_pids = set([x.ProcessId for x in w])
psutil_pids = set(psutil.pids())
self.assertEqual(wmi_pids, psutil_pids)
@unittest.skipIf(wmi is None, "wmi module is not installed")
def test_disks(self):
@@ -215,8 +217,7 @@ class WindowsSpecificTestCase(unittest.TestCase):
break
try:
usage = psutil.disk_usage(ps_part.mountpoint)
except OSError:
err = sys.exc_info()[1]
except OSError as err:
if err.errno == errno.ENOENT:
# usually this is the floppy
break
@@ -259,8 +260,6 @@ class WindowsSpecificTestCase(unittest.TestCase):
failures = []
for name in dir(psutil.Process):
if name.startswith('_') \
or name.startswith('set_') \
or name.startswith('get') \
or name in ('terminate', 'kill', 'suspend', 'resume',
'nice', 'send_signal', 'wait', 'children',
'as_dict'):
@@ -282,29 +281,42 @@ class WindowsSpecificTestCase(unittest.TestCase):
if failures:
self.fail('\n' + '\n'.join(failures))
def test_name_always_available(self):
# On Windows name() is never supposed to raise AccessDenied,
# see https://github.com/giampaolo/psutil/issues/627
for p in psutil.process_iter():
try:
p.name()
except psutil.NoSuchProcess():
pass
@unittest.skipUnless(WINDOWS, "not a Windows system")
class TestDualProcessImplementation(unittest.TestCase):
"""
Certain APIs on Windows have 2 internal implementations, one
based on documented Windows APIs, another one based
NtQuerySystemInformation() which gets called as fallback in
case the first fails because of limited permission error.
Here we test that the two methods return the exact same value,
see:
https://github.com/giampaolo/psutil/issues/304
"""
fun_names = [
# function name, tolerance
('proc_cpu_times', 0.2),
('proc_create_time', 0.5),
('proc_num_handles', 1), # 1 because impl #1 opens a handle
('proc_io_counters', 0),
('proc_memory_info', 1024), # KB
('proc_io_counters', 0),
]
def test_compare_values(self):
# Certain APIs on Windows have 2 internal implementations, one
# based on documented Windows APIs, another one based
# NtQuerySystemInformation() which gets called as fallback in
# case the first fails because of limited permission error.
# Here we test that the two methods return the exact same value,
# see:
# https://github.com/giampaolo/psutil/issues/304
def assert_ge_0(obj):
if isinstance(obj, tuple):
for value in obj:
self.assertGreaterEqual(value, 0)
self.assertGreaterEqual(value, 0, msg=obj)
elif isinstance(obj, (int, long, float)):
self.assertGreaterEqual(obj, 0)
else:
@@ -322,55 +334,125 @@ class TestDualProcessImplementation(unittest.TestCase):
diff = abs(a - b)
self.assertLessEqual(diff, tolerance)
from psutil._pswindows import ntpinfo
failures = []
for name, tolerance in self.fun_names:
meth1 = wrap_exceptions(getattr(_psutil_windows, name))
meth2 = wrap_exceptions(getattr(_psutil_windows, name + '_2'))
for p in psutil.process_iter():
for p in psutil.process_iter():
try:
nt = ntpinfo(*cext.proc_info(p.pid))
except psutil.NoSuchProcess:
continue
assert_ge_0(nt)
for name, tolerance in self.fun_names:
if name == 'proc_memory_info' and p.pid == os.getpid():
continue
#
try:
ret1 = meth1(p.pid)
except psutil.NoSuchProcess:
if name == 'proc_create_time' and p.pid in (0, 4):
continue
except psutil.AccessDenied:
ret1 = None
#
meth = wrap_exceptions(getattr(cext, name))
try:
ret2 = meth2(p.pid)
except psutil.NoSuchProcess:
# this is supposed to fail only in case of zombie process
# never for permission error
ret = meth(p.pid)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# compare values
try:
if ret1 is None:
assert_ge_0(ret2)
else:
compare_with_tolerance(ret1, ret2, tolerance)
assert_ge_0(ret1)
assert_ge_0(ret2)
if name == 'proc_cpu_times':
compare_with_tolerance(ret[0], nt.user_time, tolerance)
compare_with_tolerance(ret[1],
nt.kernel_time, tolerance)
elif name == 'proc_create_time':
compare_with_tolerance(ret, nt.create_time, tolerance)
elif name == 'proc_num_handles':
compare_with_tolerance(ret, nt.num_handles, tolerance)
elif name == 'proc_io_counters':
compare_with_tolerance(ret[0], nt.io_rcount, tolerance)
compare_with_tolerance(ret[1], nt.io_wcount, tolerance)
compare_with_tolerance(ret[2], nt.io_rbytes, tolerance)
compare_with_tolerance(ret[3], nt.io_wbytes, tolerance)
elif name == 'proc_memory_info':
try:
rawtupl = cext.proc_memory_info_2(p.pid)
except psutil.NoSuchProcess:
continue
compare_with_tolerance(ret, rawtupl, tolerance)
except AssertionError:
trace = traceback.format_exc()
msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % (
trace, p.pid, name, ret1, ret2)
trace, p.pid, name, ret, nt)
failures.append(msg)
break
if failures:
self.fail('\n\n'.join(failures))
# ---
# same tests as above but mimicks the AccessDenied failure of
# the first (fast) method failing with AD.
# TODO: currently does not take tolerance into account.
def test_name(self):
name = psutil.Process().name()
with mock.patch("psutil._psplatform.cext.proc_exe",
side_effect=psutil.AccessDenied(os.getpid())) as fun:
psutil.Process().name() == name
assert fun.called
def test_memory_info(self):
mem = psutil.Process().memory_info()
with mock.patch("psutil._psplatform.cext.proc_memory_info",
side_effect=OSError(errno.EPERM, "msg")) as fun:
psutil.Process().memory_info() == mem
assert fun.called
def test_create_time(self):
ctime = psutil.Process().create_time()
with mock.patch("psutil._psplatform.cext.proc_create_time",
side_effect=OSError(errno.EPERM, "msg")) as fun:
psutil.Process().create_time() == ctime
assert fun.called
def test_cpu_times(self):
cpu_times = psutil.Process().cpu_times()
with mock.patch("psutil._psplatform.cext.proc_cpu_times",
side_effect=OSError(errno.EPERM, "msg")) as fun:
psutil.Process().cpu_times() == cpu_times
assert fun.called
def test_io_counters(self):
io_counters = psutil.Process().io_counters()
with mock.patch("psutil._psplatform.cext.proc_io_counters",
side_effect=OSError(errno.EPERM, "msg")) as fun:
psutil.Process().io_counters() == io_counters
assert fun.called
def test_num_handles(self):
io_counters = psutil.Process().io_counters()
with mock.patch("psutil._psplatform.cext.proc_io_counters",
side_effect=OSError(errno.EPERM, "msg")) as fun:
psutil.Process().io_counters() == io_counters
assert fun.called
# --- other tests
def test_compare_name_exe(self):
for p in psutil.process_iter():
try:
a = os.path.basename(p.exe())
b = p.name()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
else:
self.assertEqual(a, b)
def test_zombies(self):
# test that NPS is raised by the 2nd implementation in case a
# process no longer exists
ZOMBIE_PID = max(psutil.pids()) + 5000
for name, _ in self.fun_names:
meth = wrap_exceptions(getattr(_psutil_windows, name))
meth = wrap_exceptions(getattr(cext, name))
self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
def test_main():
def main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase))
test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation))
@@ -378,5 +460,5 @@ def test_main():
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
if not main():
sys.exit(1)

View File

@@ -10,6 +10,7 @@ functions many times and compare process memory usage before and
after the calls. It might produce false positives.
"""
import functools
import gc
import os
import socket
@@ -17,20 +18,20 @@ import sys
import threading
import time
import psutil
import psutil._common
from psutil._compat import xrange, callable
from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN,
RLIMIT_SUPPORT, TRAVIS)
from test_psutil import (reap_children, supports_ipv6, safe_remove,
get_test_subprocess)
if sys.version_info < (2, 7):
import unittest2 as unittest # https://pypi.python.org/pypi/unittest2
else:
import unittest
import psutil
import psutil._common
from psutil._compat import callable, xrange
from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, TESTFN,
RLIMIT_SUPPORT)
from test_psutil import (reap_children, supports_ipv6, safe_remove,
get_test_subprocess)
LOOPS = 1000
TOLERANCE = 4096
@@ -43,7 +44,7 @@ def skip_if_linux():
class Base(unittest.TestCase):
proc = psutil.Process(os.getpid())
proc = psutil.Process()
def execute(self, function, *args, **kwargs):
def call_many_times():
@@ -73,7 +74,7 @@ class Base(unittest.TestCase):
# Let's keep calling fun for 3 more seconds and fail if
# we notice any difference.
stop_at = time.time() + 3
while 1:
while True:
self.call(function, *args, **kwargs)
if time.time() >= stop_at:
break
@@ -85,10 +86,14 @@ class Base(unittest.TestCase):
self.fail("rss2=%s, rss3=%s, difference=%s"
% (rss2, rss3, difference))
def get_mem(self):
return psutil.Process(os.getpid()).memory_info()[0]
def execute_w_exc(self, exc, function, *args, **kwargs):
kwargs['_exc'] = exc
self.execute(function, *args, **kwargs)
def call(self, *args, **kwargs):
def get_mem(self):
return psutil.Process().memory_info()[0]
def call(self, function, *args, **kwargs):
raise NotImplementedError("must be implemented in subclass")
@@ -102,12 +107,25 @@ class TestProcessObjectLeaks(Base):
reap_children()
def call(self, function, *args, **kwargs):
try:
obj = getattr(self.proc, function)
if callable(obj):
obj(*args, **kwargs)
except psutil.Error:
pass
if callable(function):
if '_exc' in kwargs:
exc = kwargs.pop('_exc')
self.assertRaises(exc, function, *args, **kwargs)
else:
try:
function(*args, **kwargs)
except psutil.Error:
pass
else:
meth = getattr(self.proc, function)
if '_exc' in kwargs:
exc = kwargs.pop('_exc')
self.assertRaises(exc, meth, *args, **kwargs)
else:
try:
meth(*args, **kwargs)
except psutil.Error:
pass
@skip_if_linux()
def test_name(self):
@@ -143,7 +161,7 @@ class TestProcessObjectLeaks(Base):
self.execute('nice')
def test_nice_set(self):
niceness = psutil.Process(os.getpid()).nice()
niceness = psutil.Process().nice()
self.execute('nice', niceness)
@unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
@@ -155,16 +173,20 @@ class TestProcessObjectLeaks(Base):
"Linux and Windows Vista only")
def test_ionice_set(self):
if WINDOWS:
value = psutil.Process(os.getpid()).ionice()
value = psutil.Process().ionice()
self.execute('ionice', value)
else:
from psutil._pslinux import cext
self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
self.execute_w_exc(OSError, fun)
@unittest.skipIf(OSX, "feature not supported on this platform")
@unittest.skipIf(OSX or SUNOS, "feature not supported on this platform")
@skip_if_linux()
def test_io_counters(self):
self.execute('io_counters')
@unittest.skipUnless(WINDOWS, "not worth being tested on posix")
def test_username(self):
self.execute('username')
@@ -215,23 +237,24 @@ class TestProcessObjectLeaks(Base):
def test_cwd(self):
self.execute('cwd')
@unittest.skipUnless(WINDOWS or LINUX, "Windows or Linux only")
@unittest.skipUnless(WINDOWS or LINUX or BSD,
"Windows or Linux or BSD only")
def test_cpu_affinity_get(self):
self.execute('cpu_affinity')
@unittest.skipUnless(WINDOWS or LINUX, "Windows or Linux only")
@unittest.skipUnless(WINDOWS or LINUX or BSD,
"Windows or Linux or BSD only")
def test_cpu_affinity_set(self):
affinity = psutil.Process(os.getpid()).cpu_affinity()
affinity = psutil.Process().cpu_affinity()
self.execute('cpu_affinity', affinity)
if not TRAVIS:
self.execute_w_exc(ValueError, 'cpu_affinity', [-1])
@skip_if_linux()
def test_open_files(self):
safe_remove(TESTFN) # needed after UNIX socket test has run
f = open(TESTFN, 'w')
try:
with open(TESTFN, 'w'):
self.execute('open_files')
finally:
f.close()
# OSX implementation is unbelievably slow
@unittest.skipIf(OSX, "OSX implementation is too slow")
@@ -251,6 +274,7 @@ class TestProcessObjectLeaks(Base):
def test_rlimit_set(self):
limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
self.execute_w_exc(OSError, 'rlimit', -1)
@skip_if_linux()
# Windows implementation is based on a single system-wide function
@@ -301,6 +325,12 @@ class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
"""
proc = DEAD_PROC
def call(self, *args, **kwargs):
try:
TestProcessObjectLeaks.call(self, *args, **kwargs)
except psutil.NoSuchProcess:
pass
if not POSIX:
def test_kill(self):
self.execute('kill')
@@ -325,9 +355,8 @@ class TestModuleFunctionsLeaks(Base):
gc.collect()
def call(self, function, *args, **kwargs):
obj = getattr(psutil, function)
if callable(obj):
obj(*args, **kwargs)
fun = getattr(psutil, function)
fun(*args, **kwargs)
@skip_if_linux()
def test_cpu_count_logical(self):
@@ -393,8 +422,15 @@ class TestModuleFunctionsLeaks(Base):
def test_net_connections(self):
self.execute('net_connections')
def test_net_if_addrs(self):
self.execute('net_if_addrs')
def test_main():
@unittest.skipIf(TRAVIS, "EPERM on travis")
def test_net_if_stats(self):
self.execute('net_if_stats')
def main():
test_suite = unittest.TestSuite()
tests = [TestProcessObjectLeaksZombie,
TestProcessObjectLeaks,
@@ -405,5 +441,5 @@ def test_main():
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
if not main():
sys.exit(1)

File diff suppressed because it is too large Load Diff