Bug 870575 - Upgrade psutil to 0.7.1; rs=me
Archive obtained from https://psutil.googlecode.com/files/psutil-0.7.1.tar.gz and checked in with no modifications.
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: _bsd.py 1498 2012-07-24 21:41:28Z g.rodola $
|
||||
#
|
||||
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: _linux.py 1498 2012-07-24 21:41:28Z g.rodola $
|
||||
#
|
||||
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
@@ -14,6 +12,7 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
|
||||
from test_psutil import sh, get_test_subprocess
|
||||
from psutil._compat import PY3
|
||||
@@ -115,6 +114,26 @@ class LinuxSpecificTestCase(unittest.TestCase):
|
||||
free = int(lines[2].split()[3]) * 1024
|
||||
self.assert_eq_w_tol(free, psutil.swap_memory().free, TOLERANCE)
|
||||
|
||||
def test_cpu_times(self):
|
||||
fields = psutil.cpu_times()._fields
|
||||
kernel_ver = re.findall('\d.\d.\d', os.uname()[2])[0]
|
||||
kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
|
||||
# steal >= 2.6.11
|
||||
# guest >= 2.6.24
|
||||
# guest_nice >= 3.2.0
|
||||
if kernel_ver_info >= (2, 6, 11):
|
||||
assert 'steal' in fields, fields
|
||||
else:
|
||||
assert 'steal' not in fields, fields
|
||||
if kernel_ver_info >= (2, 6, 24):
|
||||
assert 'guest' in fields, fields
|
||||
else:
|
||||
assert 'guest' not in fields, fields
|
||||
if kernel_ver_info >= (3, 2, 0):
|
||||
assert 'guest_nice' in fields, fields
|
||||
else:
|
||||
assert 'guest_nice' not in fields, fields
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_suite = unittest.TestSuite()
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: _osx.py 1498 2012-07-24 21:41:28Z g.rodola $
|
||||
#
|
||||
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
@@ -22,7 +20,7 @@ from test_psutil import reap_children, get_test_subprocess, sh
|
||||
|
||||
|
||||
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
|
||||
TOLERANCE = 200 * 1024 # 200 KB
|
||||
TOLERANCE = 500 * 1024 # 500 KB
|
||||
|
||||
|
||||
def sysctl(cmdline):
|
||||
@@ -61,7 +59,7 @@ class OSXSpecificTestCase(unittest.TestCase):
|
||||
difference = abs(first - second)
|
||||
if difference <= tolerance:
|
||||
return
|
||||
msg = '%r != %r within %r delta (%r difference)' \
|
||||
msg = '%r != %r (tolerance=%r, difference=%s)' \
|
||||
% (first, second, tolerance, difference)
|
||||
raise AssertionError(msg)
|
||||
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: _posix.py 1386 2012-06-27 15:44:36Z g.rodola $
|
||||
#
|
||||
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
@@ -19,7 +17,7 @@ import psutil
|
||||
|
||||
from psutil._compat import PY3
|
||||
from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX,
|
||||
BSD, ignore_access_denied, sh, skipIf)
|
||||
BSD, skip_on_access_denied, sh, skipIf)
|
||||
|
||||
|
||||
def ps(cmd):
|
||||
@@ -72,7 +70,7 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
username_psutil = psutil.Process(self.pid).username
|
||||
self.assertEqual(username_ps, username_psutil)
|
||||
|
||||
@ignore_access_denied
|
||||
@skip_on_access_denied()
|
||||
def test_process_rss_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
@@ -81,7 +79,7 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
|
||||
self.assertEqual(rss_ps, rss_psutil)
|
||||
|
||||
@ignore_access_denied
|
||||
@skip_on_access_denied()
|
||||
def test_process_vsz_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: _windows.py 1453 2012-07-13 19:55:11Z g.rodola $
|
||||
#
|
||||
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: test_memory_leaks.py 1499 2012-07-25 12:42:31Z g.rodola $
|
||||
#
|
||||
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
@@ -149,7 +147,11 @@ class TestProcessObjectLeaks(Base):
|
||||
self.execute('get_ionice')
|
||||
|
||||
def test_set_ionice(self):
|
||||
self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE)
|
||||
if WINDOWS:
|
||||
value = psutil.Process(os.getpid()).get_ionice()
|
||||
self.execute('set_ionice', value)
|
||||
else:
|
||||
self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE)
|
||||
|
||||
def test_username(self):
|
||||
self.execute('username')
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: test_psutil.py 1525 2012-08-16 16:32:03Z g.rodola $
|
||||
#
|
||||
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
@@ -33,7 +31,7 @@ import collections
|
||||
import datetime
|
||||
|
||||
import psutil
|
||||
from psutil._compat import PY3, callable, long
|
||||
from psutil._compat import PY3, callable, long, wraps
|
||||
|
||||
|
||||
PYTHON = os.path.realpath(sys.executable)
|
||||
@@ -48,7 +46,7 @@ BSD = sys.platform.startswith("freebsd")
|
||||
|
||||
_subprocesses_started = set()
|
||||
|
||||
def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None,
|
||||
def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL,
|
||||
wait=False):
|
||||
"""Return a subprocess.Popen object to use in tests.
|
||||
By default stdout and stderr are redirected to /dev/null and the
|
||||
@@ -59,7 +57,7 @@ def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None,
|
||||
if cmd is None:
|
||||
pyline = ""
|
||||
if wait:
|
||||
pyline += "open('%s', 'w'); " % TESTFN
|
||||
pyline += "open(r'%s', 'w'); " % TESTFN
|
||||
pyline += "import time; time.sleep(3600);"
|
||||
cmd_ = [PYTHON, "-c", pyline]
|
||||
else:
|
||||
@@ -68,7 +66,7 @@ def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None,
|
||||
if wait:
|
||||
if cmd is None:
|
||||
stop_at = time.time() + 3
|
||||
while time.time() > stop_at:
|
||||
while stop_at > time.time():
|
||||
if os.path.exists(TESTFN):
|
||||
break
|
||||
time.sleep(0.001)
|
||||
@@ -166,7 +164,9 @@ def safe_remove(fname):
|
||||
try:
|
||||
os.remove(fname)
|
||||
except OSError:
|
||||
pass
|
||||
err = sys.exc_info()[1]
|
||||
if err.args[0] != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def call_until(fun, expr, timeout=1):
|
||||
"""Keep calling function for timeout secs and exit if eval()
|
||||
@@ -213,16 +213,22 @@ def skipUnless(condition, reason="", warn=False):
|
||||
return skipIf(True, reason, warn)
|
||||
return skipIf(False)
|
||||
|
||||
def ignore_access_denied(fun):
|
||||
def skip_on_access_denied(only_if=None):
|
||||
"""Decorator to Ignore AccessDenied exceptions."""
|
||||
def outer(fun, *args, **kwargs):
|
||||
def inner(self):
|
||||
def decorator(fun):
|
||||
@wraps(fun)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return fun(self, *args, **kwargs)
|
||||
return fun(*args, **kwargs)
|
||||
except psutil.AccessDenied:
|
||||
pass
|
||||
return inner
|
||||
return outer
|
||||
if only_if is not None:
|
||||
if not only_if:
|
||||
raise
|
||||
atexit.register(warn, "%r was skipped because it raised " \
|
||||
"AccessDenied" % fun.__name__)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
def supports_ipv6():
|
||||
"""Return True if IPv6 is supported on this platform."""
|
||||
@@ -320,12 +326,21 @@ class TestCase(unittest.TestCase):
|
||||
assert x > 0
|
||||
self.assertEqual(x, psutil.virtual_memory().total)
|
||||
|
||||
def test_BOOT_TIME(self):
|
||||
x = psutil.BOOT_TIME
|
||||
assert isinstance(x, float)
|
||||
assert x > 0
|
||||
def test_BOOT_TIME(self, arg=None):
|
||||
x = arg or psutil.BOOT_TIME
|
||||
assert isinstance(x, float), x
|
||||
assert x > 0, x
|
||||
assert x < time.time(), x
|
||||
|
||||
def test_get_boot_time(self):
|
||||
self.test_BOOT_TIME(psutil.get_boot_time())
|
||||
if WINDOWS:
|
||||
# work around float precision issues; give it 1 secs tolerance
|
||||
diff = abs(psutil.get_boot_time() - psutil.BOOT_TIME)
|
||||
assert diff < 1, diff
|
||||
else:
|
||||
self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME)
|
||||
|
||||
def test_NUM_CPUS(self):
|
||||
self.assertEqual(psutil.NUM_CPUS, len(psutil.cpu_times(percpu=True)))
|
||||
assert psutil.NUM_CPUS >= 1, psutil.NUM_CPUS
|
||||
@@ -342,6 +357,7 @@ class TestCase(unittest.TestCase):
|
||||
warnings.filterwarnings("error")
|
||||
p = psutil.Process(os.getpid())
|
||||
try:
|
||||
self.assertRaises(DeprecationWarning, __import__, 'psutil.error')
|
||||
self.assertRaises(DeprecationWarning, psutil.virtmem_usage)
|
||||
self.assertRaises(DeprecationWarning, psutil.used_phymem)
|
||||
self.assertRaises(DeprecationWarning, psutil.avail_phymem)
|
||||
@@ -468,6 +484,8 @@ class TestCase(unittest.TestCase):
|
||||
total += cp_time
|
||||
self.assertEqual(total, sum(times))
|
||||
str(times)
|
||||
self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
|
||||
len(psutil.cpu_times(percpu=False)))
|
||||
|
||||
def test_sys_per_cpu_times2(self):
|
||||
tot1 = psutil.cpu_times(percpu=True)
|
||||
@@ -483,24 +501,42 @@ class TestCase(unittest.TestCase):
|
||||
return
|
||||
self.fail()
|
||||
|
||||
def _test_cpu_percent(self, percent):
|
||||
assert isinstance(percent, float), percent
|
||||
assert percent >= 0.0, percent
|
||||
assert percent <= 100.0, percent
|
||||
|
||||
def test_sys_cpu_percent(self):
|
||||
psutil.cpu_percent(interval=0.001)
|
||||
psutil.cpu_percent(interval=0.001)
|
||||
for x in range(1000):
|
||||
percent = psutil.cpu_percent(interval=None)
|
||||
assert isinstance(percent, float)
|
||||
assert percent >= 0.0, percent
|
||||
assert percent <= 100.0, percent
|
||||
self._test_cpu_percent(psutil.cpu_percent(interval=None))
|
||||
|
||||
def test_sys_per_cpu_percent(self):
|
||||
psutil.cpu_percent(interval=0.001, percpu=True)
|
||||
psutil.cpu_percent(interval=0.001, percpu=True)
|
||||
self.assertEqual(len(psutil.cpu_percent(interval=0.001, percpu=True)),
|
||||
psutil.NUM_CPUS)
|
||||
for x in range(1000):
|
||||
percents = psutil.cpu_percent(interval=None, percpu=True)
|
||||
for percent in percents:
|
||||
assert isinstance(percent, float)
|
||||
assert percent >= 0.0, percent
|
||||
assert percent <= 100.0, percent
|
||||
self._test_cpu_percent(percent)
|
||||
|
||||
def test_sys_cpu_times_percent(self):
|
||||
psutil.cpu_times_percent(interval=0.001)
|
||||
for x in range(1000):
|
||||
cpu = psutil.cpu_times_percent(interval=None)
|
||||
for percent in cpu:
|
||||
self._test_cpu_percent(percent)
|
||||
self._test_cpu_percent(sum(cpu))
|
||||
|
||||
def test_sys_per_cpu_times_percent(self):
|
||||
self.assertEqual(len(psutil.cpu_times_percent(interval=0.001,
|
||||
percpu=True)),
|
||||
psutil.NUM_CPUS)
|
||||
for x in range(1000):
|
||||
cpus = psutil.cpu_times_percent(interval=None, percpu=True)
|
||||
for cpu in cpus:
|
||||
for percent in cpu:
|
||||
self._test_cpu_percent(percent)
|
||||
self._test_cpu_percent(sum(cpu))
|
||||
|
||||
def test_disk_usage(self):
|
||||
usage = psutil.disk_usage(os.getcwd())
|
||||
@@ -525,6 +561,8 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
def test_disk_partitions(self):
|
||||
for disk in psutil.disk_partitions(all=False):
|
||||
if WINDOWS and 'cdrom' in disk.opts:
|
||||
continue
|
||||
assert os.path.exists(disk.device), disk
|
||||
assert os.path.isdir(disk.mountpoint), disk
|
||||
assert disk.fstype, disk
|
||||
@@ -599,9 +637,17 @@ class TestCase(unittest.TestCase):
|
||||
ret = psutil.disk_io_counters(perdisk=False)
|
||||
check_ntuple(ret)
|
||||
ret = psutil.disk_io_counters(perdisk=True)
|
||||
# make sure there are no duplicates
|
||||
self.assertEqual(len(ret), len(set(ret)))
|
||||
for key in ret:
|
||||
assert key, key
|
||||
check_ntuple(ret[key])
|
||||
if LINUX and key[-1].isdigit():
|
||||
# if 'sda1' is listed 'sda' shouldn't, see:
|
||||
# http://code.google.com/p/psutil/issues/detail?id=338
|
||||
while key[-1].isdigit():
|
||||
key = key[:-1]
|
||||
assert key not in ret, (key, ret.keys())
|
||||
|
||||
def test_get_users(self):
|
||||
users = psutil.get_users()
|
||||
@@ -828,37 +874,53 @@ class TestCase(unittest.TestCase):
|
||||
assert io2.read_count >= io1.read_count, (io1, io2)
|
||||
assert io2.read_bytes >= io1.read_bytes, (io1, io2)
|
||||
|
||||
@skipUnless(LINUX)
|
||||
# Linux and Windows Vista+
|
||||
@skipUnless(hasattr(psutil.Process, 'get_ionice'))
|
||||
def test_get_set_ionice(self):
|
||||
from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
|
||||
IOPRIO_CLASS_IDLE)
|
||||
self.assertEqual(IOPRIO_CLASS_NONE, 0)
|
||||
self.assertEqual(IOPRIO_CLASS_RT, 1)
|
||||
self.assertEqual(IOPRIO_CLASS_BE, 2)
|
||||
self.assertEqual(IOPRIO_CLASS_IDLE, 3)
|
||||
p = psutil.Process(os.getpid())
|
||||
try:
|
||||
p.set_ionice(2)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 2)
|
||||
self.assertEqual(value, 4)
|
||||
if LINUX:
|
||||
from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
|
||||
IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
|
||||
self.assertEqual(IOPRIO_CLASS_NONE, 0)
|
||||
self.assertEqual(IOPRIO_CLASS_RT, 1)
|
||||
self.assertEqual(IOPRIO_CLASS_BE, 2)
|
||||
self.assertEqual(IOPRIO_CLASS_IDLE, 3)
|
||||
p = psutil.Process(os.getpid())
|
||||
try:
|
||||
p.set_ionice(2)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 2)
|
||||
self.assertEqual(value, 4)
|
||||
#
|
||||
p.set_ionice(3)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 3)
|
||||
self.assertEqual(value, 0)
|
||||
#
|
||||
p.set_ionice(2, 0)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 2)
|
||||
self.assertEqual(value, 0)
|
||||
p.set_ionice(2, 7)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 2)
|
||||
self.assertEqual(value, 7)
|
||||
self.assertRaises(ValueError, p.set_ionice, 2, 10)
|
||||
finally:
|
||||
p.set_ionice(IOPRIO_CLASS_NONE)
|
||||
else:
|
||||
p = psutil.Process(os.getpid())
|
||||
original = p.get_ionice()
|
||||
try:
|
||||
value = 0 # very low
|
||||
if original == value:
|
||||
value = 1 # low
|
||||
p.set_ionice(value)
|
||||
self.assertEqual(p.get_ionice(), value)
|
||||
finally:
|
||||
p.set_ionice(original)
|
||||
#
|
||||
p.set_ionice(3)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 3)
|
||||
self.assertEqual(value, 0)
|
||||
#
|
||||
p.set_ionice(2, 0)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 2)
|
||||
self.assertEqual(value, 0)
|
||||
p.set_ionice(2, 7)
|
||||
ioclass, value = p.get_ionice()
|
||||
self.assertEqual(ioclass, 2)
|
||||
self.assertEqual(value, 7)
|
||||
self.assertRaises(ValueError, p.set_ionice, 2, 10)
|
||||
finally:
|
||||
p.set_ionice(IOPRIO_CLASS_NONE)
|
||||
self.assertRaises(ValueError, p.set_ionice, 3)
|
||||
self.assertRaises(TypeError, p.set_ionice, 2, 1)
|
||||
|
||||
def test_get_num_threads(self):
|
||||
# on certain platforms such as Linux we might test for exact
|
||||
@@ -983,14 +1045,18 @@ class TestCase(unittest.TestCase):
|
||||
try:
|
||||
self.assertEqual(exe, PYTHON)
|
||||
except AssertionError:
|
||||
# certain platforms such as BSD are more accurate returning:
|
||||
# "/usr/local/bin/python2.7"
|
||||
# ...instead of:
|
||||
# "/usr/local/bin/python"
|
||||
# We do not want to consider this difference in accuracy
|
||||
# an error.
|
||||
ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
|
||||
self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
|
||||
if WINDOWS and len(exe) == len(PYTHON):
|
||||
# on Windows we don't care about case sensitivity
|
||||
self.assertEqual(exe.lower(), PYTHON.lower())
|
||||
else:
|
||||
# certain platforms such as BSD are more accurate returning:
|
||||
# "/usr/local/bin/python2.7"
|
||||
# ...instead of:
|
||||
# "/usr/local/bin/python"
|
||||
# We do not want to consider this difference in accuracy
|
||||
# an error.
|
||||
ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
|
||||
self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
|
||||
|
||||
def test_cmdline(self):
|
||||
sproc = get_test_subprocess([PYTHON, "-E"], wait=True)
|
||||
@@ -998,8 +1064,9 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
def test_name(self):
|
||||
sproc = get_test_subprocess(PYTHON, wait=True)
|
||||
name = psutil.Process(sproc.pid).name.lower()
|
||||
pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
|
||||
self.assertEqual(psutil.Process(sproc.pid).name.lower(), pyexe)
|
||||
assert pyexe.startswith(name), (pyexe, name)
|
||||
|
||||
if os.name == 'posix':
|
||||
|
||||
@@ -1367,7 +1434,7 @@ class TestCase(unittest.TestCase):
|
||||
def test_get_num_ctx_switches(self):
|
||||
p = psutil.Process(os.getpid())
|
||||
before = sum(p.get_num_ctx_switches())
|
||||
for x in range(50000):
|
||||
for x in range(500000):
|
||||
after = sum(p.get_num_ctx_switches())
|
||||
if after > before:
|
||||
return
|
||||
@@ -1428,7 +1495,7 @@ class TestCase(unittest.TestCase):
|
||||
except psutil.Error:
|
||||
pass
|
||||
# this is the one, now let's make sure there are no duplicates
|
||||
pid = max(sorted(table, key=lambda x: table[x]))
|
||||
pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
|
||||
p = psutil.Process(pid)
|
||||
try:
|
||||
c = p.get_children(recursive=True)
|
||||
@@ -1452,8 +1519,7 @@ class TestCase(unittest.TestCase):
|
||||
def test_invalid_pid(self):
|
||||
self.assertRaises(TypeError, psutil.Process, "1")
|
||||
self.assertRaises(TypeError, psutil.Process, None)
|
||||
# Refers to Issue #12
|
||||
self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
|
||||
self.assertRaises(ValueError, psutil.Process, -1)
|
||||
|
||||
def test_as_dict(self):
|
||||
sproc = get_test_subprocess()
|
||||
@@ -1569,6 +1635,21 @@ class TestCase(unittest.TestCase):
|
||||
self.assertIn(0, psutil.get_pid_list())
|
||||
self.assertTrue(psutil.pid_exists(0))
|
||||
|
||||
def test__all__(self):
|
||||
for name in dir(psutil):
|
||||
if name in ('callable', 'defaultdict', 'error', 'namedtuple'):
|
||||
continue
|
||||
if not name.startswith('_'):
|
||||
try:
|
||||
__import__(name)
|
||||
except ImportError:
|
||||
if name not in psutil.__all__:
|
||||
fun = getattr(psutil, name)
|
||||
if fun is None:
|
||||
continue
|
||||
if 'deprecated' not in fun.__doc__.lower():
|
||||
self.fail('%r not in psutil.__all__' % name)
|
||||
|
||||
def test_Popen(self):
|
||||
# Popen class test
|
||||
# XXX this test causes a ResourceWarning on Python 3 because
|
||||
@@ -1620,8 +1701,11 @@ class TestFetchAllProcesses(unittest.TestCase):
|
||||
continue
|
||||
attrs.append(name)
|
||||
|
||||
for p in psutil.process_iter():
|
||||
for name in attrs:
|
||||
default = object()
|
||||
failures = []
|
||||
for name in attrs:
|
||||
for p in psutil.process_iter():
|
||||
ret = default
|
||||
try:
|
||||
try:
|
||||
attr = getattr(p, name, None)
|
||||
@@ -1632,6 +1716,13 @@ class TestFetchAllProcesses(unittest.TestCase):
|
||||
valid_procs += 1
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
err = sys.exc_info()[1]
|
||||
if isinstance(err, psutil.NoSuchProcess):
|
||||
if psutil.pid_exists(p.pid):
|
||||
# XXX race condition; we probably need
|
||||
# to try figuring out the process
|
||||
# identity before failing
|
||||
self.fail("PID still exists but fun raised " \
|
||||
"NoSuchProcess")
|
||||
self.assertEqual(err.pid, p.pid)
|
||||
if err.name:
|
||||
# make sure exception's name attr is set
|
||||
@@ -1646,9 +1737,19 @@ class TestFetchAllProcesses(unittest.TestCase):
|
||||
meth(ret)
|
||||
except Exception:
|
||||
err = sys.exc_info()[1]
|
||||
trace = traceback.format_exc()
|
||||
self.fail('%s\nproc=%s, method=%r, retvalue=%r'
|
||||
% (trace, p, name, ret))
|
||||
s = '\n' + '=' * 70 + '\n'
|
||||
s += "FAIL: test_%s (proc=%s" % (name, p)
|
||||
if ret != default:
|
||||
s += ", ret=%s)" % repr(ret)
|
||||
s += ')\n'
|
||||
s += '-' * 70
|
||||
s += "\n%s" % traceback.format_exc()
|
||||
s = "\n".join((" " * 4) + i for i in s.splitlines())
|
||||
failures.append(s)
|
||||
break
|
||||
|
||||
if failures:
|
||||
self.fail(''.join(failures))
|
||||
|
||||
# we should always have a non-empty list, not including PID 0 etc.
|
||||
# special cases.
|
||||
@@ -1668,6 +1769,7 @@ class TestFetchAllProcesses(unittest.TestCase):
|
||||
if POSIX:
|
||||
assert os.path.isfile(ret), ret
|
||||
if hasattr(os, 'access') and hasattr(os, "X_OK"):
|
||||
# XXX may fail on OSX
|
||||
self.assertTrue(os.access(ret, os.X_OK))
|
||||
|
||||
def ppid(self, ret):
|
||||
@@ -1680,7 +1782,7 @@ class TestFetchAllProcesses(unittest.TestCase):
|
||||
def create_time(self, ret):
|
||||
self.assertTrue(ret > 0)
|
||||
if not WINDOWS:
|
||||
self.assertTrue(ret >= psutil.BOOT_TIME)
|
||||
assert ret >= psutil.BOOT_TIME, (ret, psutil.BOOT_TIME)
|
||||
# make sure returned value can be pretty printed
|
||||
# with strftime
|
||||
time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
|
||||
@@ -1712,8 +1814,12 @@ class TestFetchAllProcesses(unittest.TestCase):
|
||||
self.assertTrue(field >= 0)
|
||||
|
||||
def get_ionice(self, ret):
|
||||
self.assertTrue(ret.ioclass >= 0)
|
||||
self.assertTrue(ret.value >= 0)
|
||||
if LINUX:
|
||||
self.assertTrue(ret.ioclass >= 0)
|
||||
self.assertTrue(ret.value >= 0)
|
||||
else:
|
||||
self.assertTrue(ret >= 0)
|
||||
self.assertIn(ret, (0, 1, 2))
|
||||
|
||||
def get_num_threads(self, ret):
|
||||
self.assertTrue(ret >= 1)
|
||||
@@ -1907,6 +2013,7 @@ def cleanup():
|
||||
safe_remove(TESTFN)
|
||||
|
||||
atexit.register(cleanup)
|
||||
safe_remove(TESTFN)
|
||||
|
||||
def test_main():
|
||||
tests = []
|
||||
|
||||
Reference in New Issue
Block a user